merged to developer

This commit is contained in:
2016-12-19 12:26:53 +01:00
29 changed files with 1113 additions and 868 deletions

View File

@ -251,7 +251,7 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
#ifdef VERBOSE
cout << thisMultiDetector->detectorIds[i] << endl;
#endif
detectors[i]=new slsDetector(thisMultiDetector->detectorIds[i], this);
detectors[i]=new slsDetector(i, thisMultiDetector->detectorIds[i], this);
// setAngularConversionPointer(detectors[i]->getAngularConversionPointer(),detectors[i]->getNModsPointer(),detectors[i]->getNChans()*detectors[i]->getNChips(), i);
@ -268,7 +268,12 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
getNMods();
getMaxMods();
threadStarted = false;
dataSocketsStarted = false;
for(int i=0;i<MAXDET;++i){
context[i] = NULL;
zmqsocket[i] = NULL;
strcpy(dataSocketServerDetails[i],"");
}
threadpool = 0;
if(createThreadPool() == FAIL)
exit(-1);
@ -367,7 +372,7 @@ int multiSlsDetector::addSlsDetector(int id, int pos) {
cout << "Creating new detector " << pos << endl;
#endif
detectors[pos]=new slsDetector(id, this);
detectors[pos]=new slsDetector(pos, id, this);
thisMultiDetector->detectorIds[pos]=detectors[pos]->getDetectorId();
thisMultiDetector->numberOfDetectors++;
@ -682,7 +687,7 @@ int multiSlsDetector::addSlsDetector(const char *name, int pos) {
#ifdef VERBOSE
cout << "Detector " << id << " already exists" << endl;
#endif
s=new slsDetector(id, this);
s=new slsDetector(pos, id, this);
if (s->getHostname()==string(name))
break;
delete s;
@ -723,7 +728,7 @@ int multiSlsDetector::addSlsDetector(const char *name, int pos) {
#ifdef VERBOSE
cout << "Creating detector " << id << " of type " << getDetectorType(t) << endl;
#endif
s=new slsDetector(t, id, this);
s=new slsDetector(pos, t, id, this);
if (online) {
s->setTCPSocket(name);
setOnline(ONLINE_FLAG);
@ -756,7 +761,7 @@ int multiSlsDetector::addSlsDetector(detectorType t, int pos) {
#ifdef VERBOSE
cout << "Creating detector " << id << " of type " << getDetectorType(t) << endl;
#endif
slsDetector *s=new slsDetector(t, id, this);
slsDetector *s=new slsDetector(pos, t, id, this);
s=NULL;
#ifdef VERBOSE
cout << "Adding it to the multi detector structure" << endl;
@ -1415,41 +1420,46 @@ int multiSlsDetector::startReadOut(){
int* multiSlsDetector::getDataFromDetector() {
int nel=thisMultiDetector->dataBytes/sizeof(int);
int n;
int* retval=new int[nel];
int n = 0;
int* retval= NULL;
int *retdet, *p=retval;
int nodata=1, nodatadet=-1;;
int nodata=1, nodatadet=-1;
int nodatadetectortype = false;
detectorType types = getDetectorsType();
if(types == EIGER || types == JUNGFRAU){
nodatadetectortype = true;
}
if(!nodatadetectortype)
retval=new int[nel];
// cout << "multi: " << thisMultiDetector->dataBytes << endl;
for (int id=0; id<thisMultiDetector->numberOfDetectors; id++) {
if (detectors[id]) {
retdet=detectors[id]->getDataFromDetector(p);
n=detectors[id]->getDataBytes();
if(detectors[id]->getErrorMask())
setErrorMask(getErrorMask()|(1<<id));
if (retdet) {
nodata=0;
if(!nodatadetectortype){
n=detectors[id]->getDataBytes();
if (retdet) {
nodata=0;
#ifdef VERBOSE
cout << "Detector " << id << " returned " << n << " bytes " << endl;
cout << "Detector " << id << " returned " << n << " bytes " << endl;
#endif
} else {
nodatadet=id;
} else {
nodatadet=id;
#ifdef VERBOSE
cout << "Detector " << id << " does not have data left " << endl;
cout << "Detector " << id << " does not have data left " << endl;
#endif
/*if((detectors[id]->getDetectorsType() != EIGER)||(detectors[id]->getDetectorsType() != JUNGFRAU))
break;*/
}
p+=n/sizeof(int);
}
p+=n/sizeof(int);
}
}
//eiger returns only null
detectorType types = getDetectorsType();
if(types == EIGER || types == JUNGFRAU){
delete [] retval;
if(nodatadetectortype){
return NULL;
}
@ -2280,6 +2290,7 @@ double* multiSlsDetector::decodeData(int *datain, int &nn, double *fdata) {
}
}
return dataout;
}
@ -3803,6 +3814,20 @@ int multiSlsDetector::getMaxNumberOfModules(dimension d) {
}
int multiSlsDetector::getFlippedData(dimension d){
int ret=-100,ret1;
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++)
if (detectors[idet]){
ret1=detectors[idet]->getFlippedData(d);
if(ret==-100)
ret=ret1;
else if (ret!=ret1)
ret=-1;
}
return ret;
}
int multiSlsDetector::setNumberOfModules(int p, dimension d) {
int ret=0;//, ret1;
@ -3842,6 +3867,24 @@ int multiSlsDetector::setNumberOfModules(int p, dimension d) {
}
int multiSlsDetector::setFlippedData(dimension d, int value){
int ret=-100,ret1;
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++)
if (detectors[idet]){
ret1=detectors[idet]->setFlippedData(d,value);
if(ret==-100)
ret=ret1;
else if (ret!=ret1)
ret=-1;
}
return ret;
}
int multiSlsDetector::decodeNMod(int i, int &id, int &im) {
#ifdef VERBOSE
cout << " Module " << i << " belongs to detector " << id << endl;;
@ -4331,6 +4374,7 @@ int multiSlsDetector::readConfigurationFile(string const fname){
// char ext[100];
setAcquiringFlag(false);
clearAllErrorMask();
string ans;
string str;
@ -5042,429 +5086,336 @@ int multiSlsDetector::resetFramesCaught() {
}
int multiSlsDetector::createReceivingDataThreads(bool destroy){
if(!destroy) cprintf(MAGENTA,"Going to create data threads\n");
else cprintf(MAGENTA,"Going to destroy data threads\n");
int multiSlsDetector::createReceivingDataSockets(const bool destroy){
int numReadouts = thisMultiDetector->numberOfDetectors;
if(getDetectorsType() == EIGER)
numReadouts *= 2;
//number of sockets
int numSockets = thisMultiDetector->numberOfDetectors;
int numSocketsPerDetector = 1;
if(getDetectorsType() == EIGER){
numSocketsPerDetector = 2;
}
numSockets *= numSocketsPerDetector;
//reset masks
killAllReceivingDataThreads = false;
//destroy
if(destroy){
#ifdef DEBUG
cout << "Destroying Receiving Data Thread(s)" << endl;
#endif
killAllReceivingDataThreads = true;
for(int i = 0; i < numReadouts; ++i){
sem_post(&sem_singlewait[i]);
pthread_join(receivingDataThreads[i],NULL);
sem_destroy(&sem_singlewait[i]);
sem_destroy(&sem_singledone[i]);
#ifdef DEBUG
cout << "." << flush << endl;
#endif
}
killAllReceivingDataThreads = false;
threadStarted = false;
cprintf(MAGENTA,"Going to destroy data sockets\n");
//close socket
for(int i=0;i<numSockets; ++i){
if(strlen(dataSocketServerDetails[i])){
zmq_disconnect(zmqsocket[i], dataSocketServerDetails[i]);
zmq_close(zmqsocket[i]);
zmq_ctx_destroy(context[i]);
context[i] = NULL;
zmqsocket[i] = NULL;
strcpy(dataSocketServerDetails[i],"");
}
}
cout << "Destroyed Receiving Data Thread(s)" << endl;
dataSocketsStarted = false;
cout << "Destroyed Receiving Data Socket(s)" << endl;
return OK;
}
//create
cprintf(MAGENTA,"Going to create data sockets\n");
for(int i=0;i<numSockets; ++i){
//get name of rx_hostname
char rx_hostname[100];
strcpy(dataSocketServerDetails[i],"tcp://");
strcpy(rx_hostname, detectors[i/numSocketsPerDetector]->getReceiver());
cout<<"rx_hostname:"<<rx_hostname<<endl;
//append it (first into ip) to tcp://
if(strchr(rx_hostname,'.')!=NULL)
strcat(dataSocketServerDetails[i],rx_hostname);
else{
//convert hostname to ip
struct hostent *he = gethostbyname(rx_hostname);
if (he == NULL){
cprintf(RED,"ERROR: could not convert receiver hostname to ip\n");
exit(-1);
}else
strcat(dataSocketServerDetails[i],inet_ntoa(*(struct in_addr*)he->h_addr));
}
//add port
sprintf(dataSocketServerDetails[i],"%s:%d",dataSocketServerDetails[i],DEFAULT_ZMQ_PORTNO +
(i/numSocketsPerDetector)*numSocketsPerDetector + (i%numSocketsPerDetector));//using this instead of i in the offchance, detid doesnt start at 0 (shmget error)
//create context
context[i] = zmq_ctx_new();
//create socket
zmqsocket[i] = zmq_socket(context[i], ZMQ_PULL);
//connect socket
zmq_connect(zmqsocket[i], dataSocketServerDetails[i]);
//int hwmval = 10;
//zmq_setsockopt(zmqsocket[i],ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets)
cout << "ZMQ Client[" << i << "] from " << dataSocketServerDetails[i] << endl;
}
dataSocketsStarted = true;
cout << "Receiving Data Socket(s) created" << endl;
return OK;
}
int multiSlsDetector::getData(const int isocket, const bool masking, int* image, const int size, int &acqIndex, int &frameIndex, int &subframeIndex, string &filename){
zmq_msg_t message;
//scan header-------------------------------------------------------------------
zmq_msg_init (&message);
int len = zmq_msg_recv(&message, zmqsocket[isocket], 0);
if (len == -1) {
cprintf(BG_RED,"Could not read header for socket %d\n",isocket);
zmq_msg_close(&message);
cprintf(RED, "%d message null\n",isocket);
return FAIL;
}
// error if you print it
// cout << isocket << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
//cprintf(BLUE,"%d header %d\n",isocket,len);
rapidjson::Document d;
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
#ifdef VERYVERBOSE
// htype is an array of strings
rapidjson::Value::Array htype = d["htype"].GetArray();
for(int i=0; i< htype.Size(); i++)
std::cout << isocket << "htype: " << htype[i].GetString() << std::endl;
// shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray();
cout << isocket << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << isocket << shape[i].GetInt() << " ";
cout << endl;
cout << isocket << "type: " << d["type"].GetString() << endl;
#endif
if(d["acqIndex"].GetInt()!=-9){ //!isocket &&
acqIndex = d["acqIndex"].GetInt();
frameIndex = d["fIndex"].GetInt();
subframeIndex = d["subfnum"].GetInt();
filename = d["fname"].GetString();
#ifdef VERYVERBOSE
cout << "Acquisition index: " << acqIndex << endl;
cout << "Frame index: " << frameIndex << endl;
cout << "Subframe index: " << subframeIndex << endl;
cout << "File name: " << filename << endl;
#endif
if(frameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
}
// close the message
zmq_msg_close(&message);
//scan data-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket[isocket], 0);
//cprintf(BLUE,"%d data %d\n",isocket,len);
//end of socket ("end")
if(len == 3){
//cprintf(RED,"%d Received end of acquisition\n", isocket);
return FAIL;
}
//crappy image
if (len < size ) {
cprintf(RED,"Received weird packet size %d in socket for %d\n", len, isocket);
memset((char*)image,0xFF,size);
}
//actual image
else{
#ifdef DEBUG
cout << "Creating Receiving Data Thread(s)" << endl;
#endif
//reset current index
currentThreadIndex = -1;
for(int i = 0; i < numReadouts; ++i){
sem_init(&sem_singlewait[i],1,0);
sem_init(&sem_singledone[i],1,0);
threadStarted = false;
currentThreadIndex = i;
if(pthread_create(&receivingDataThreads[i], NULL,staticstartReceivingDataThread, (void*) this)){
cprintf(RED, "Could not create receiving data thread with index %d\n",i);
return FAIL;
//actual data
//cprintf(BLUE,"%d actual dataaa\n",isocket);
memcpy((char*)image,(char*)zmq_msg_data(&message),size);
//jungfrau masking adcval
if(masking){
int snel = size/sizeof(int);
for(unsigned int i=0;i<snel;++i){
image[i] = (image[i] & 0x3FFF3FFF);
}
while(!threadStarted);
#ifdef DEBUG
cout << "." << flush << endl;
#endif
}
cout << "Receiving Data Thread(s) created" << endl;
}
zmq_msg_close(&message); // close the message
return OK;
}
void* multiSlsDetector::staticstartReceivingDataThread(void* this_pointer){
((multiSlsDetector*)this_pointer)->startReceivingDataThread();
//while(true);
return this_pointer;
}
void multiSlsDetector::startReceivingDataThread(){
void multiSlsDetector::readFrameFromReceiver(){
int ithread = currentThreadIndex; //set current thread value index
char hostname[100] = "tcp://";
char rx_hostname[100];
strcpy(rx_hostname, detectors[ithread]->getReceiver());
cout<<"rx_hostname:"<<rx_hostname<<endl;
if(strchr(rx_hostname,'.')!=NULL)
strcat(hostname,rx_hostname);
else{
struct hostent *he = gethostbyname(rx_hostname);
if (he == NULL){
cprintf(RED,"ERROR: could not convert receiver hostname to ip\n");
exit(-1);
}else
strcat(hostname,inet_ntoa(*(struct in_addr*)he->h_addr));
}
strcat(hostname,":");
//server details
//char hostname[100] = "tcp://127.0.0.1:";
int portno = DEFAULT_ZMQ_PORTNO + ithread;
sprintf(hostname,"%s%d",hostname,portno);
//socket details
zmq_msg_t message;
void *context;
void *zmqsocket;
context = zmq_ctx_new();
zmqsocket = zmq_socket(context, ZMQ_PULL);
//int hwmval = 10;
//zmq_setsockopt(zmqsocket,ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets)
cprintf(RED,"connect ret:%d\n",zmq_connect(zmqsocket, hostname));
cout << "ZMQ Client of " << ithread << " at " << hostname << endl;
cprintf(BLUE,"%d Created socket\n",ithread);
//initializations
int numReadoutPerDetector = 1;
//determine number of half readouts and maxX and maxY
int maxX=0,maxY=0;
int numSockets = thisMultiDetector->numberOfDetectors;
int numSocketsPerSLSDetector = 1;
bool jungfrau = false;
int expectedsize = 1024*256;/**shouldnt work for other bit modes or anythign*/
if(getDetectorsType() == EIGER){
numReadoutPerDetector = 2;
expectedsize = 1024*256;
}else if(getDetectorsType() == JUNGFRAU){
switch(getDetectorsType()){
case EIGER:
numSocketsPerSLSDetector = 2;
numSockets *= numSocketsPerSLSDetector;
maxX = thisMultiDetector->numberOfChannel[X];
maxY = thisMultiDetector->numberOfChannel[Y];
break;
case JUNGFRAU:
jungfrau = true;
expectedsize = 8192*128;
break;
default:
break;
}
int singleDatabytes = detectors[ithread/numReadoutPerDetector]->getDataBytes();
int nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int);
int* image = new int[nel];
int len,idet = 0;
singleframe[ithread]=NULL;
threadStarted = true; //let calling function know thread started and obtained current
//gui variables
int currentAcquisitionIndex = -1;
int currentFrameIndex = -1;
int currentSubFrameIndex = -1;
string currentFileName = "";
//infinite loop, exited only (if gui restarted/ enabledatastreaming called)
while(true){
sem_wait(&sem_singlewait[ithread]); //wait for it to be copied
//check to exit thread
if(killAllReceivingDataThreads)
break;
//scan header-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
if (len == -1) {
cprintf(BG_RED,"Could not read header for socket %d\n",ithread);
zmq_msg_close(&message);
cprintf(RED, "%d message null\n",ithread);
continue;
//getting sls values
int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0, nx=0, ny=0;
if(detectors[0]){
slsdatabytes = detectors[0]->getDataBytes();
slsmaxchannels = detectors[0]->getMaxNumberOfChannels();
bytesperchannel = slsdatabytes/slsmaxchannels;
slsmaxX = detectors[0]->getTotalNumberOfChannels(X);
slsmaxY = detectors[0]->getTotalNumberOfChannels(Y);
}
//getting multi values
nx = getTotalNumberOfChannels(slsDetectorDefs::X);
ny = getTotalNumberOfChannels(slsDetectorDefs::Y);
//calculating offsets (for eiger interleaving ports)
int offsetX[numSockets]; int offsetY[numSockets];
int bottom[numSockets];
if(maxX){
for(int i=0; i<numSockets; ++i){
offsetY[i] = (maxY - (thisMultiDetector->offsetY[i/numSocketsPerSLSDetector] + slsmaxY)) * maxX * bytesperchannel;
//the left half or right half
if(!(i%numSocketsPerSLSDetector))
offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector];
else
offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector] + (slsmaxX/numSocketsPerSLSDetector);
offsetX[i] *= bytesperchannel;
bottom[i] = detectors[i/numSocketsPerSLSDetector]->getFlippedData(X);/*only for eiger*/
}
}
int expectedslssize = slsdatabytes/numSocketsPerSLSDetector;
int* image = new int[(expectedslssize/sizeof(int))]();
int nel=(thisMultiDetector->dataBytes)/sizeof(int);
if(nel <= 0){
cprintf(RED,"Error: Multislsdetector databytes not valid : %d\n", thisMultiDetector->dataBytes);
return;
}
int* multiframe=new int[nel]();
// error if you print it
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
//cprintf(BLUE,"%d header %d\n",ithread,len);
rapidjson::Document d;
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
#ifdef VERYVERBOSE
// htype is an array of strings
rapidjson::Value::Array htype = d["htype"].GetArray();
for(int i=0; i< htype.Size(); i++)
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl;
// shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray();
cout << ithread << "shape: ";
for(int i=0; i< shape.Size(); i++)
cout << ithread << shape[i].GetInt() << " ";
cout << endl;
volatile uint64_t dataThreadMask = 0x0;
cout << ithread << "type: " << d["type"].GetString() << endl;
//wait for real time acquisition to start
bool running = true;
sem_wait(&sem_newRTAcquisition);
if(checkJoinThread())
running = false;
#endif
if(!ithread && (d["acqIndex"].GetInt()!=-9)){
currentAcquisitionIndex = d["acqIndex"].GetInt();
currentFrameIndex = d["fIndex"].GetInt();
currentSubFrameIndex = d["subfnum"].GetInt();
currentFileName = d["fname"].GetString();
#ifdef VERYVERBOSE
cout << "Acquisition index: " << currentAcquisitionIndex << endl;
cout << "Frame index: " << currentFrameIndex << endl;
cout << "Subframe index: " << currentSubFrameIndex << endl;
cout << "File name: " << currentFileName << endl;
#endif
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
}
singleframe[ithread]=image;
for(int i = 0; i < numSockets; ++i)
dataThreadMask|=(1<<i);
// close the message
zmq_msg_close(&message);
//exit when last message for each socket received
while(running){
//memset(((char*)multiframe),0xFF,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory
//get each frame
for(int isocket=0; isocket<numSockets; ++isocket){
//scan data-------------------------------------------------------------------
zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0);
//if running
if((1 << isocket) & dataThreadMask){
//cprintf(BLUE,"%d data %d\n",ithread,len);
//end of socket ("end")
if (len < expectedsize ) {
if(len == 3){
//cprintf(RED,"%d Received end of acquisition\n", ithread);
singleframe[ithread] = NULL;
//break;
}else{
cprintf(RED,"Received weird packet size %d in socket for %d\n", len, ithread);
memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector);
}
//get individual images
if(FAIL == getData(isocket, jungfrau, image, expectedslssize, currentAcquisitionIndex,currentFrameIndex,currentSubFrameIndex,currentFileName)){
dataThreadMask^=(1<<isocket);
continue;
}
}
else{
//actual data
//cprintf(BLUE,"%d actual dataaa\n",ithread);
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
//assemble data with interleaving
if(maxX){
//bottom
if(bottom[isocket]){
//if((((isocket/numSocketsPerSLSDetector)+1)%2) == 0){
for(int i=0;i<slsmaxY;++i){
memcpy(((char*)multiframe) + offsetY[isocket] + offsetX[isocket] + ((slsmaxY-1-i)*maxX*bytesperchannel),
(char*)image+ i*(slsmaxX/numSocketsPerSLSDetector)*bytesperchannel,
(slsmaxX/numSocketsPerSLSDetector)*bytesperchannel);
}
}
//top
else{
for(int i=0;i<slsmaxY;++i){
memcpy(((char*)multiframe) + offsetY[isocket] + offsetX[isocket] + (i*maxX*bytesperchannel),
(char*)image+ i*(slsmaxX/numSocketsPerSLSDetector)*bytesperchannel,
(slsmaxX/numSocketsPerSLSDetector)*bytesperchannel);
}
}
}
//jungfrau masking adcval
if(jungfrau){
for(unsigned int i=0;i<nel;i++){
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF);
//assemble data with no interleaving, assumed detectors appended vertically
else{
memcpy((char*)multiframe+slsdatabytes*isocket,(char*)image,slsdatabytes);
}
}
}
sem_post(&sem_singledone[ithread]);//let multi know is ready
zmq_msg_close(&message); // close the message
}
cprintf(RED,"%d Closing socket\n",ithread);
//close socket
zmq_disconnect(zmqsocket, hostname);
zmq_close(zmqsocket);
zmq_ctx_destroy(context);
//all done
if(!dataThreadMask){
sem_wait(&sem_newRTAcquisition);
//done with complete acquisition
if(checkJoinThread())
break;
else{
//starting a new scan/measurement
for(int i = 0; i < numSockets; ++i)
dataThreadMask|=(1<<i);
running = false;
}
}
//send data to callback
if(running){
fdata = decodeData(multiframe);
if ((fdata) && (dataReady)){
thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentFileName.c_str(),nx,ny);
dataReady(thisData, currentFrameIndex, currentSubFrameIndex, pCallbackArg);
delete thisData;
fdata = NULL;
//cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl;
}
setCurrentProgress(currentAcquisitionIndex+1);
}
//setting it back for each scan/measurement
running = true;
}
//free resources
delete [] image;
#ifdef DEBUG
cprintf(MAGENTA,"Receiving Data Thread %d:Goodbye!\n",ithread);
#endif
delete[] multiframe;
}
// void multiSlsDetector::readFrameFromReceiver(){
// //determine number of half readouts and maxX and maxY
// int maxX=0,maxY=0;
// int numReadoutPerDetector = 1;
// if(getDetectorsType() == EIGER){
// numReadoutPerDetector = 2;
// maxX = thisMultiDetector->numberOfChannel[X];
// maxY = thisMultiDetector->numberOfChannel[Y];
// }
// int numReadouts = numReadoutPerDetector * thisMultiDetector->numberOfDetectors;
// //initializing variables
// currentFileName="";
// currentAcquisitionIndex = -1;
// currentFrameIndex = -1;
// currentSubFrameIndex = -1;
// //getting values
// int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0;
// if(detectors[0]){
// slsdatabytes = detectors[0]->getDataBytes();
// slsmaxchannels = detectors[0]->getMaxNumberOfChannels();
// bytesperchannel = slsdatabytes/slsmaxchannels;
// slsmaxX = detectors[0]->getTotalNumberOfChannels(X);
// slsmaxY = detectors[0]->getTotalNumberOfChannels(Y);
// }
// int nel=(thisMultiDetector->dataBytes)/sizeof(int);
// if(nel <= 0){
// cprintf(RED,"Error: Multislsdetector databytes not valid : %d\n", thisMultiDetector->dataBytes);
// return;
// }
// int* multiframe=new int[nel];
// int idet,offsetY,offsetX;
// int halfreadoutoffset = (slsmaxX/numReadoutPerDetector);
// int nx =getTotalNumberOfChannels(slsDetectorDefs::X);
// int ny =getTotalNumberOfChannels(slsDetectorDefs::Y);
// volatile uint64_t dataThreadMask = 0x0;
// for(int i = 0; i < numReadouts; ++i)
// dataThreadMask|=(1<<i);
// //construct complete image and send to callback
// while(true){
// //memset(((char*)multiframe),0xFF,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory
// //post all of them to start
// for(int ireadout=0; ireadout<numReadouts; ++ireadout){
// if((1 << ireadout) & dataThreadMask){
// sem_post(&sem_singlewait[ireadout]); //sls to continue
// }
// }
// //get each frame
// for(int ireadout=0; ireadout<numReadouts; ++ireadout){
// //cprintf(BLUE,"multi checking %d mask:0x%x\n",ireadout,receivingDataThreadMask);
// idet = ireadout/numReadoutPerDetector;
// if((1 << ireadout) & dataThreadMask){ //if running
// sem_wait(&sem_singledone[ireadout]); //wait for sls to copy
// //this socket closed
// if(singleframe[ireadout] == NULL){ //if got nothing
// dataThreadMask^=(1<<ireadout);
// continue;
// }
// //assemble data
// if(maxX){ //eiger, so interleaving between ports in one readout itself
// offsetY = (maxY - (thisMultiDetector->offsetY[idet] + slsmaxY)) * maxX * bytesperchannel;
// //the left half or right half
// if(!(ireadout%numReadoutPerDetector))
// offsetX = thisMultiDetector->offsetX[idet];
// else
// offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset;
// offsetX *= bytesperchannel;
// //cprintf(BLUE,"offsetx:%d offsety:%d maxx:%d slsmaxX:%d slsmaxY:%d bytesperchannel:%d\n",
// // offsetX,offsetY,maxX,slsmaxX,slsmaxY,bytesperchannel);
// // cprintf(BLUE,"copying bytes:%d\n", (slsmaxX/numReadoutPerDetector)*bytesperchannel);
// //itnerleaving with other detectors
// //bottom
// if(((idet+1)%2) == 0){
// for(int i=0;i<slsmaxY;++i)
// memcpy(((char*)multiframe) + offsetY + offsetX + ((slsmaxY-i)*maxX*bytesperchannel),
// (char*)singleframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel,
// (slsmaxX/numReadoutPerDetector)*bytesperchannel);
// }
// //top
// else{
// for(int i=0;i<slsmaxY;++i)
// memcpy(((char*)multiframe) + offsetY + offsetX + (i*maxX*bytesperchannel),
// (char*)singleframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel,
// (slsmaxX/numReadoutPerDetector)*bytesperchannel);
// }
// }
// //no interleaving, just add to the end
// //numReadout always 1 here
// else{
// memcpy((char*)multiframe+slsdatabytes*ireadout,(char*)singleframe[ireadout],slsdatabytes);
// }
// }
// }
// //all done
// if(!dataThreadMask)
// break;
// //send data to callback
// fdata = decodeData(multiframe);
// if ((fdata) && (dataReady)){
// thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentFileName.c_str(),nx,ny);
// dataReady(thisData, currentFrameIndex, currentSubFrameIndex, pCallbackArg);
// delete thisData;
// fdata = NULL;
// //cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl;
// }
// setCurrentProgress(currentAcquisitionIndex+1);
// }
// //free resources
// delete[] multiframe;
// }
int* multiSlsDetector::readFrameFromReceiver(char* fName, int &acquisitionIndex, int &frameIndex, int &subFrameIndex){ int nel=(thisMultiDetector->dataBytes)/sizeof(int); if(nel <= 0){ cout << "Multislsdetector databytes not valid :" << thisMultiDetector->dataBytes << endl; acquisitionIndex = -1; return NULL; } int n,complete=OK; int i,k,offsetX, offsetY, maxX, maxY; double dr; int* retval=new int[nel]; int *retdet = NULL, *p=retval; string fullFName=""; string ext=""; int index=-1,f_index=-1,p_index=-1,det_index=-1; double sv0=-1,sv1=-1; if(getDetectorsType() == EIGER){ maxX = thisMultiDetector->numberOfChannel[X]; maxY = thisMultiDetector->numberOfChannel[Y]; } for (int id=0; id<thisMultiDetector->numberOfDetectors; id++) { if (detectors[id]) { n=detectors[id]->getDataBytes(); retdet=detectors[id]->readFrameFromReceiver(fName, acquisitionIndex, frameIndex, subFrameIndex); if(detectors[id]->getErrorMask()) setErrorMask(getErrorMask()|(1<<id)); if (retdet){ if (acquisitionIndex==-1){ complete = FAIL; delete [] retdet; }else{ n=detectors[id]->getDataBytes(); if(getDetectorsType() == EIGER){
//cout << "fname:"<<fName<<" findex:"<<fIndex<<endl;//cout<<"n:"<<n<<endl;//cout<<"maxchan:"<<detectors[id]->getMaxNumberOfChannels()<<" n:"<<n<<endl;
dr = (double)n/detectors[id]->getMaxNumberOfChannels();
k=(int)(detectors[id]->getMaxNumberOfChannels(X)*dr);
//bit mode
//cout << "dr:"<<dr<<endl;//cout << "k:"<<k<<endl;
offsetY = (int)(((maxY - (thisMultiDetector->offsetY[id] + detectors[id]->getMaxNumberOfChannels(Y))) * maxX)*dr);//bit mode
offsetX = (int)(thisMultiDetector->offsetX[id]*dr);
//cout << "offsetY"<<offsetY<< " offsetX:"<<offsetX<<endl;
for(i=0; i< 256;i++){
memcpy((((char*)p) + offsetY + offsetX + ((int)(i*maxX*dr))) ,(((char*)retdet) + (i*k)),k);//bit mode
}
} else{
memcpy(p,retdet,n);
p+=n/sizeof(int);
}
delete [] retdet;
//concatenate filenames
if(!fullFName.length()){
//assign file prefix
fullFName.assign(fileIO::getFileName());
if (strrchr(fName,'.')!=NULL){
ext.assign(fName);
size_t dot = ext.rfind(".");
if(dot != string::npos)
ext = ext.erase(0,dot);
else
ext = "";
//get variables
fileIOStatic::getVariablesFromFileName(fName,index, f_index, p_index, sv0, sv1, det_index);
//append scan and det variables
fullFName.append(fileIOStatic::getReceiverFileNameToConcatenate(fName));
}
}
//append only if scan variables are different
if(!fileIOStatic::verifySameFrame(fName,index,f_index, p_index, sv0, sv1, det_index)){
fullFName.append(fileIOStatic::getReceiverFileNameToConcatenate(fName));
}
}
}
else {
#ifdef VERBOSE
cout << "Receiver for detector " << id << " does not have data left " << endl;
#endif
delete [] retval;
return NULL;
}
}
}
//append extension
fullFName.append(ext);
strcpy(fName,fullFName.c_str());
//if some of the receivers did not give data, dont count it
if((getDetectorsType() == EIGER) &&(complete ==FAIL))
acquisitionIndex = -1;
return retval;
};
int multiSlsDetector::lockReceiver(int lock) {
int ret=-100, ret1;
@ -5687,20 +5638,39 @@ int multiSlsDetector::setReadReceiverFrequency(int getFromReceiver, int freq){
}
int multiSlsDetector::setReceiverReadTimer(int time_in_ms){
int ret=-100, ret1;
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
if (detectors[idet]) {
ret1=detectors[idet]->setReceiverReadTimer(time_in_ms);
if(detectors[idet]->getErrorMask())
setErrorMask(getErrorMask()|(1<<idet));
if (ret==-100)
ret=ret1;
else if (ret!=ret1)
ret=-1;
}
}
return ret;
}
// only called from gui or that wants zmq data packets
int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
if(enable >= 0){
if(threadStarted != enable){
if(dataSocketsStarted != enable){
//destroy data threads
if(threadStarted)
createReceivingDataThreads(true);
if(dataSocketsStarted)
createReceivingDataSockets(true);
//create data threads
if(enable > 0){
if(createReceivingDataThreads() == FAIL){
std::cout << "Could not create data threads in client. Aborting creating data threads in receiver" << std::endl;
if(createReceivingDataSockets() == FAIL){
std::cout << "Could not create data threads in client. Aborting creating data sockets in receiver" << std::endl;
//only for the first det as theres no general one
setErrorMask(getErrorMask()|(1<<0));
detectors[0]->setErrorMask((detectors[0]->getErrorMask())|(DATA_STREAMING));
@ -5709,7 +5679,7 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
}
}
}else enable = threadStarted;
}else enable = dataSocketsStarted;
int ret=-100, ret1;
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
@ -5725,9 +5695,9 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
}
/*
if(enable == -1)
return threadStarted;
return dataSocketsStarted;
*/
return (threadStarted & ret);
return (dataSocketsStarted & ret);
}
int multiSlsDetector::enableReceiverCompression(int i){

View File

@ -340,6 +340,12 @@ class multiSlsDetector : public slsDetectorUtils {
int getMaxNumberOfChannelsPerDetector(dimension d){return thisMultiDetector->maxNumberOfChannelsPerDetector[d];};
/** returns the enable if data will be flipped across x or y axis
* \param d axis across which data is flipped
* returns 1 or 0
*/
int getFlippedData(dimension d=X);
int setMaxNumberOfChannelsPerDetector(dimension d,int i){thisMultiDetector->maxNumberOfChannelsPerDetector[d]=i; return thisMultiDetector->maxNumberOfChannelsPerDetector[d];};
double getScanStep(int index, int istep){return thisMultiDetector->scanSteps[index][istep];};
@ -979,6 +985,14 @@ class multiSlsDetector : public slsDetectorUtils {
int configureMAC();
int setNumberOfModules(int i=-1, dimension d=X);
/** sets the enable which determines if data will be flipped across x or y axis
* \param d axis across which data is flipped
* \param value 0 or 1 to reset/set or -1 to get value
* \return enable flipped data across x or y axis
*/
int setFlippedData(dimension d=X, int value=-1);
int getMaxNumberOfModules(dimension d=X);
int setDynamicRange(int i=-1);
@ -1196,19 +1210,18 @@ class multiSlsDetector : public slsDetectorUtils {
int resetFramesCaught();
/**
* Create Receiving Data Threads
* @param destroy is true to destroy all the threads
* Create Receiving Data Sockets
* @param destroy is true to destroy all the sockets
* @return OK or FAIL
*/
int createReceivingDataThreads(bool destroy = false);
int createReceivingDataSockets(const bool destroy = false);
/** Reads frames from receiver through a constant socket
*/
//void readFrameFromReceiver();
int* readFrameFromReceiver(char*, int&, int&, int&);
/** Locks/Unlocks the connection to the receiver
void readFrameFromReceiver();
/** Locks/Unlocks the connection to the receiver
/param lock sets (1), usets (0), gets (-1) the lock
/returns lock status of the receiver
*/
@ -1276,6 +1289,14 @@ class multiSlsDetector : public slsDetectorUtils {
*/
int setReadReceiverFrequency(int getFromReceiver, int freq=-1);
/** Sets the read receiver timer
if data required from receiver randomly readRxrFrequency=0,
then the timer between each data stream is set with time_in_ms
@param time_in_ms timer between frames
/returns read receiver timer
*/
int setReceiverReadTimer(int time_in_ms=500);
/** Enable or disable streaming data from receiver to client
* @param enable 0 to disable 1 to enable -1 to only get the value
* @returns data streaming
@ -1384,35 +1405,17 @@ class multiSlsDetector : public slsDetectorUtils {
private:
/**
* Static function - Starts Data Thread of this object
* @param this_pointer pointer to this object
*/
static void* staticstartReceivingDataThread(void *this_pointer);
/**
* Thread that receives data packets from receiver
* Gets data from socket
*/
void startReceivingDataThread();
int getData(const int isocket, const bool masking, int* image, const int size, int &acqIndex, int &frameIndex, int &subframeIndex, string &filename);
/* synchronizing between zmq threads */
sem_t sem_singledone[MAXDET];
sem_t sem_singlewait[MAXDET];
int* singleframe[MAXDET];
/* Parameters given to the gui picked up from zmq threads*/
int currentAcquisitionIndex;
int currentFrameIndex;
int currentSubFrameIndex;
string currentFileName;
pthread_t receivingDataThreads[MAXDET];
/** Ensures if threads created successfully */
bool threadStarted;
/** Current Thread Index*/
int currentThreadIndex;
/** Set to self-terminate data receiving threads waiting for semaphores */
bool killAllReceivingDataThreads;
/** Ensures if sockets created successfully */
bool dataSocketsStarted;
void *context[MAXDET];
void *zmqsocket[MAXDET];
char dataSocketServerDetails[MAXDET][100];
protected: