removed zmq threads from client and made it receive packets via for loop

This commit is contained in:
Dhanya Maliakal 2016-11-22 11:05:42 +01:00
parent 0ed9ee0e5d
commit 394526a9c3
5 changed files with 244 additions and 337 deletions

View File

@ -268,7 +268,12 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1)
getNMods(); getNMods();
getMaxMods(); getMaxMods();
threadStarted = false; dataSocketsStarted = false;
for(int i=0;i<MAXDET;++i){
context[i] = NULL;
zmqsocket[i] = NULL;
strcpy(dataSocketServerDetails[i],"");
}
threadpool = 0; threadpool = 0;
if(createThreadPool() == FAIL) if(createThreadPool() == FAIL)
exit(-1); exit(-1);
@ -5028,361 +5033,282 @@ int multiSlsDetector::resetFramesCaught() {
} }
int multiSlsDetector::createReceivingDataThreads(bool destroy){
if(!destroy) cprintf(MAGENTA,"Going to create data threads\n");
else cprintf(MAGENTA,"Going to destroy data threads\n");
int multiSlsDetector::createReceivingDataSockets(bool destroy){
int numReadouts = thisMultiDetector->numberOfDetectors; //number of sockets
int numSockets = thisMultiDetector->numberOfDetectors;
if(getDetectorsType() == EIGER) if(getDetectorsType() == EIGER)
numReadouts *= 2; numSockets *= 2;
//reset masks
killAllReceivingDataThreads = false;
//destroy
if(destroy){ if(destroy){
#ifdef DEBUG cprintf(MAGENTA,"Going to destroy data sockets\n");
cout << "Destroying Receiving Data Thread(s)" << endl; //close socket
#endif for(int i=0;i<numSockets; ++i){
killAllReceivingDataThreads = true; if(strlen(dataSocketServerDetails[i])){
for(int i = 0; i < numReadouts; ++i){ zmq_disconnect(zmqsocket[i], dataSocketServerDetails[i]);
sem_post(&sem_singlewait[i]); zmq_close(zmqsocket[i]);
pthread_join(receivingDataThreads[i],NULL); zmq_ctx_destroy(context[i]);
sem_destroy(&sem_singlewait[i]); context[i] = NULL;
sem_destroy(&sem_singledone[i]); zmqsocket[i] = NULL;
#ifdef DEBUG strcpy(dataSocketServerDetails[i],"");
cout << "." << flush << endl;
#endif
} }
killAllReceivingDataThreads = false;
threadStarted = false;
cout << "Destroyed Receiving Data Thread(s)" << endl;
}
//create
else{
#ifdef DEBUG
cout << "Creating Receiving Data Thread(s)" << endl;
#endif
//reset current index
currentThreadIndex = -1;
for(int i = 0; i < numReadouts; ++i){
sem_init(&sem_singlewait[i],1,0);
sem_init(&sem_singledone[i],1,0);
threadStarted = false;
currentThreadIndex = i;
if(pthread_create(&receivingDataThreads[i], NULL,staticstartReceivingDataThread, (void*) this)){
cprintf(RED, "Could not create receiving data thread with index %d\n",i);
return FAIL;
}
while(!threadStarted);
#ifdef DEBUG
cout << "." << flush << endl;
#endif
}
cout << "Receiving Data Thread(s) created" << endl;
} }
dataSocketsStarted = false;
cout << "Destroyed Receiving Data Socket(s)" << endl;
return OK; return OK;
}
void* multiSlsDetector::staticstartReceivingDataThread(void* this_pointer){
((multiSlsDetector*)this_pointer)->startReceivingDataThread();
//while(true);
return this_pointer;
}
void multiSlsDetector::startReceivingDataThread(){
int ithread = currentThreadIndex; //set current thread value index
//initializations
int numReadoutPerDetector = 1;
bool jungfrau = false;
if(getDetectorsType() == EIGER){
numReadoutPerDetector = 2;
}else if(getDetectorsType() == JUNGFRAU){
jungfrau = true;
//expectedsize = 8192*128;
} }
int singleDatabytes = detectors[ithread/numReadoutPerDetector]->getDataBytes();
int nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int);
int expectedsize = singleDatabytes/numReadoutPerDetector;//8192*128; //1024*256
int* image = new int[nel];
int len,idet = 0;
singleframe[ithread]=NULL;
cprintf(MAGENTA,"Going to create data sockets\n");
char hostname[100] = "tcp://"; for(int i=0;i<numSockets; ++i){
//get name of rx_hostname
char rx_hostname[100]; char rx_hostname[100];
strcpy(rx_hostname, detectors[ithread/numReadoutPerDetector]->getReceiver()); strcpy(dataSocketServerDetails[i],"tcp://");
strcpy(rx_hostname, detectors[i/numSockets]->getReceiver());
cout<<"rx_hostname:"<<rx_hostname<<endl; cout<<"rx_hostname:"<<rx_hostname<<endl;
//append it (first into ip) to tcp://
if(strchr(rx_hostname,'.')!=NULL) if(strchr(rx_hostname,'.')!=NULL)
strcat(hostname,rx_hostname); strcat(dataSocketServerDetails[i],rx_hostname);
else{ else{
//convert hostname to ip
struct hostent *he = gethostbyname(rx_hostname); struct hostent *he = gethostbyname(rx_hostname);
if (he == NULL){ if (he == NULL){
cprintf(RED,"ERROR: could not convert receiver hostname to ip\n"); cprintf(RED,"ERROR: could not convert receiver hostname to ip\n");
exit(-1); exit(-1);
}else }else
strcat(hostname,inet_ntoa(*(struct in_addr*)he->h_addr)); strcat(dataSocketServerDetails[i],inet_ntoa(*(struct in_addr*)he->h_addr));
} }
strcat(hostname,":"); //add port
//server details sprintf(dataSocketServerDetails[i],"%s:%d",dataSocketServerDetails[i],DEFAULT_ZMQ_PORTNO + i);
//char hostname[100] = "tcp://127.0.0.1:";
int portno = DEFAULT_ZMQ_PORTNO + ithread;
sprintf(hostname,"%s%d",hostname,portno);
//socket details //create context
zmq_msg_t message; context[i] = zmq_ctx_new();
void *context; //create socket
void *zmqsocket; zmqsocket[i] = zmq_socket(context[i], ZMQ_PULL);
context = zmq_ctx_new(); //connect socket
zmqsocket = zmq_socket(context, ZMQ_PULL); zmq_connect(zmqsocket[i], dataSocketServerDetails[i]);
//int hwmval = 10; //int hwmval = 10;
//zmq_setsockopt(zmqsocket,ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets) //zmq_setsockopt(zmqsocket[i],ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets)
cprintf(RED,"connect ret:%d\n",zmq_connect(zmqsocket, hostname)); cout << "ZMQ Client[" << i << "] from " << dataSocketServerDetails[i] << endl;
cout << "ZMQ Client of " << ithread << " at " << hostname << endl; }
cprintf(BLUE,"%d Created socket\n",ithread);
dataSocketsStarted = true;
cout << "Receiving Data Socket(s) created" << endl;
return OK;
}
threadStarted = true; //let calling function know thread started and obtained current
//infinite loop, exited only (if gui restarted/ enabledatastreaming called)
while(true){
//cprintf(GREEN,"%d waiting to copy\n",ithread);
sem_wait(&sem_singlewait[ithread]); //wait for it to be copied
//cprintf(GREEN,"%d gonna copy\n",ithread);
//check to exit thread int multiSlsDetector::getData(int isocket, bool masking, int* image, int size, int &acqIndex, int &frameIndex, int &subframeIndex, string &filename){
if(killAllReceivingDataThreads)
break; zmq_msg_t message;
//scan header------------------------------------------------------------------- //scan header-------------------------------------------------------------------
zmq_msg_init (&message); zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0); int len = zmq_msg_recv(&message, zmqsocket[isocket], 0);
if (len == -1) { if (len == -1) {
cprintf(BG_RED,"Could not read header for socket %d\n",ithread); cprintf(BG_RED,"Could not read header for socket %d\n",isocket);
zmq_msg_close(&message); zmq_msg_close(&message);
cprintf(RED, "%d message null\n",ithread); cprintf(RED, "%d message null\n",isocket);
continue; return FAIL;
} }
// error if you print it // error if you print it
// cout << ithread << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl; // cout << isocket << " header len:"<<len<<" value:"<< (char*)zmq_msg_data(&message)<<endl;
//cprintf(BLUE,"%d header %d\n",ithread,len); //cprintf(BLUE,"%d header %d\n",isocket,len);
rapidjson::Document d; rapidjson::Document d;
d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message)); d.Parse( (char*)zmq_msg_data(&message), zmq_msg_size(&message));
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
// htype is an array of strings // htype is an array of strings
rapidjson::Value::Array htype = d["htype"].GetArray(); rapidjson::Value::Array htype = d["htype"].GetArray();
for(int i=0; i< htype.Size(); i++) for(int i=0; i< htype.Size(); i++)
std::cout << ithread << "htype: " << htype[i].GetString() << std::endl; std::cout << isocket << "htype: " << htype[i].GetString() << std::endl;
// shape is an array of ints // shape is an array of ints
rapidjson::Value::Array shape = d["shape"].GetArray(); rapidjson::Value::Array shape = d["shape"].GetArray();
cout << ithread << "shape: "; cout << isocket << "shape: ";
for(int i=0; i< shape.Size(); i++) for(int i=0; i< shape.Size(); i++)
cout << ithread << shape[i].GetInt() << " "; cout << isocket << shape[i].GetInt() << " ";
cout << endl; cout << endl;
cout << ithread << "type: " << d["type"].GetString() << endl; cout << isocket << "type: " << d["type"].GetString() << endl;
#endif #endif
if(!ithread && (d["acqIndex"].GetInt()!=-9)){ if(d["acqIndex"].GetInt()!=-9){ //!isocket &&
currentAcquisitionIndex = d["acqIndex"].GetInt(); acqIndex = d["acqIndex"].GetInt();
currentFrameIndex = d["fIndex"].GetInt(); frameIndex = d["fIndex"].GetInt();
currentSubFrameIndex = d["subfnum"].GetInt(); subframeIndex = d["subfnum"].GetInt();
currentFileName = d["fname"].GetString(); filename = d["fname"].GetString();
#ifdef VERYVERBOSE #ifdef VERYVERBOSE
cout << "Acquisition index: " << currentAcquisitionIndex << endl; cout << "Acquisition index: " << acqIndex << endl;
cout << "Frame index: " << currentFrameIndex << endl; cout << "Frame index: " << frameIndex << endl;
cout << "Subframe index: " << currentSubFrameIndex << endl; cout << "Subframe index: " << subframeIndex << endl;
cout << "File name: " << currentFileName << endl; cout << "File name: " << filename << endl;
#endif #endif
if(currentFrameIndex ==-1) cprintf(RED,"multi frame index -1!!\n"); if(frameIndex ==-1) cprintf(RED,"multi frame index -1!!\n");
} }
if(singleframe[ithread]==NULL){
singleDatabytes = detectors[ithread/numReadoutPerDetector]->getDataBytes();
nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int);
delete [] image;
image = new int[nel];
expectedsize = singleDatabytes/numReadoutPerDetector;
}
singleframe[ithread]=image;
// close the message // close the message
zmq_msg_close(&message); zmq_msg_close(&message);
//scan data------------------------------------------------------------------- //scan data-------------------------------------------------------------------
zmq_msg_init (&message); zmq_msg_init (&message);
len = zmq_msg_recv(&message, zmqsocket, 0); len = zmq_msg_recv(&message, zmqsocket[isocket], 0);
//cprintf(BLUE,"%d data %d\n",isocket,len);
//cprintf(BLUE,"%d data %d\n",ithread,len);
//end of socket ("end") //end of socket ("end")
if (len < expectedsize ) {
if(len == 3){ if(len == 3){
//cprintf(RED,"%d Received end of acquisition\n", ithread); //cprintf(RED,"%d Received end of acquisition\n", isocket);
singleframe[ithread] = NULL; return FAIL;
//break;
}else{
cprintf(RED,"Received weird packet size %d in socket for %d\n", len, ithread);
memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector);
} }
//crappy image
if (len < size ) {
cprintf(RED,"Received weird packet size %d in socket for %d\n", len, isocket);
memset((char*)image,0xFF,size);
} }
//actual image
else{ else{
//actual data //actual data
//cprintf(BLUE,"%d actual dataaa\n",ithread); //cprintf(BLUE,"%d actual dataaa\n",isocket);
//memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector); memcpy((char*)image,(char*)zmq_msg_data(&message),size);
memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector);
//cprintf(GREEN,"%d copied data %d\n",ithread,singleDatabytes/numReadoutPerDetector);
//jungfrau masking adcval //jungfrau masking adcval
if(jungfrau){ if(masking){
for(unsigned int i=0;i<nel;i++){ int snel = size/sizeof(int);
singleframe[ithread][i] = (singleframe[ithread][i] & 0x3FFF3FFF); for(unsigned int i=0;i<snel;++i){
image[i] = (image[i] & 0x3FFF3FFF);
} }
} }
} }
sem_post(&sem_singledone[ithread]);//let multi know is ready
zmq_msg_close(&message); // close the message zmq_msg_close(&message); // close the message
}
cprintf(RED,"%d Closing socket\n",ithread);
//close socket
zmq_disconnect(zmqsocket, hostname);
zmq_close(zmqsocket);
zmq_ctx_destroy(context);
//free resources
delete [] image;
#ifdef DEBUG
cprintf(MAGENTA,"Receiving Data Thread %d:Goodbye!\n",ithread);
#endif
return OK;
} }
void multiSlsDetector::readFrameFromReceiver(){ void multiSlsDetector::readFrameFromReceiver(){
//determine number of half readouts and maxX and maxY //determine number of half readouts and maxX and maxY
int maxX=0,maxY=0; int maxX=0,maxY=0;
int numReadoutPerDetector = 1; int numSockets = thisMultiDetector->numberOfDetectors;
if(getDetectorsType() == EIGER){ int numSocketsPerSLSDetector = 1;
numReadoutPerDetector = 2; bool jungfrau = false;
switch(getDetectorsType()){
case EIGER:
numSocketsPerSLSDetector = 2;
numSockets *= numSocketsPerSLSDetector;
maxX = thisMultiDetector->numberOfChannel[X]; maxX = thisMultiDetector->numberOfChannel[X];
maxY = thisMultiDetector->numberOfChannel[Y]; maxY = thisMultiDetector->numberOfChannel[Y];
break;
case JUNGFRAU:
jungfrau = true;
break;
default:
break;
} }
int numReadouts = numReadoutPerDetector * thisMultiDetector->numberOfDetectors;
//initializing variables //gui variables
currentFileName=""; int currentAcquisitionIndex = -1;
currentAcquisitionIndex = -1; int currentFrameIndex = -1;
currentFrameIndex = -1; int currentSubFrameIndex = -1;
currentSubFrameIndex = -1; string currentFileName = "";
//getting sls values
//getting values int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0, nx=0, ny=0;
int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0;
if(detectors[0]){ if(detectors[0]){
slsdatabytes = detectors[0]->getDataBytes(); slsdatabytes = detectors[0]->getDataBytes();
slsmaxchannels = detectors[0]->getMaxNumberOfChannels(); slsmaxchannels = detectors[0]->getMaxNumberOfChannels();
bytesperchannel = slsdatabytes/slsmaxchannels; bytesperchannel = slsdatabytes/slsmaxchannels;
slsmaxX = detectors[0]->getTotalNumberOfChannels(X); slsmaxX = detectors[0]->getTotalNumberOfChannels(X);
slsmaxY = detectors[0]->getTotalNumberOfChannels(Y); slsmaxY = detectors[0]->getTotalNumberOfChannels(Y);
//cprintf(BLUE,"slsdatabytes:%d slsmaxchannels:%d bytesperchannel:%d slsmaxX:%d slsmaxY:%d\n",
// slsdatabytes,slsmaxchannels,bytesperchannel,slsmaxX,slsmaxY);
} }
//getting multi values
nx = getTotalNumberOfChannels(slsDetectorDefs::X);
ny = getTotalNumberOfChannels(slsDetectorDefs::Y);
//calculating offsets (for eiger interleaving ports)
int offsetX[numSockets]; int offsetY[numSockets];
if(maxX){
for(int i=0; i<numSockets; ++i){
offsetY[i] = (maxY - (thisMultiDetector->offsetY[i/numSocketsPerSLSDetector] + slsmaxY)) * maxX * bytesperchannel;
//the left half or right half
if(!(i%numSocketsPerSLSDetector))
offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector];
else
offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector] + (slsmaxX/numSocketsPerSLSDetector);
offsetX[i] *= bytesperchannel;
}
}
int expectedslssize = slsdatabytes/numSocketsPerSLSDetector;
int* image = new int[(expectedslssize/sizeof(int))]();
int nel=(thisMultiDetector->dataBytes)/sizeof(int); int nel=(thisMultiDetector->dataBytes)/sizeof(int);
//cprintf(BLUE,"multi databytes:%d\n",thisMultiDetector->dataBytes);
if(nel <= 0){ if(nel <= 0){
cprintf(RED,"Error: Multislsdetector databytes not valid : %d\n", thisMultiDetector->dataBytes); cprintf(RED,"Error: Multislsdetector databytes not valid : %d\n", thisMultiDetector->dataBytes);
return; return;
} }
int* multiframe=new int[nel]; int* multiframe=new int[nel]();
int idet,offsetY,offsetX;
int halfreadoutoffset = (slsmaxX/numReadoutPerDetector);
int nx =getTotalNumberOfChannels(slsDetectorDefs::X);
int ny =getTotalNumberOfChannels(slsDetectorDefs::Y);
volatile uint64_t dataThreadMask = 0x0; volatile uint64_t dataThreadMask = 0x0;
for(int i = 0; i < numReadouts; ++i) for(int i = 0; i < numSockets; ++i)
dataThreadMask|=(1<<i); dataThreadMask|=(1<<i);
//exit when last message for each socket received
//construct complete image and send to callback
while(true){ while(true){
//memset(((char*)multiframe),0xFF,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory //memset(((char*)multiframe),0xFF,slsdatabytes*thisMultiDetector->numberOfDetectors); //reset frame memory
//post all of them to start
for(int ireadout=0; ireadout<numReadouts; ++ireadout){
if((1 << ireadout) & dataThreadMask){
sem_post(&sem_singlewait[ireadout]); //sls to continue
}
}
//get each frame //get each frame
for(int ireadout=0; ireadout<numReadouts; ++ireadout){ for(int isocket=0; isocket<numSockets; ++isocket){
//cprintf(BLUE,"multi checking %d mask:0x%x\n",ireadout,receivingDataThreadMask);
idet = ireadout/numReadoutPerDetector;
if((1 << ireadout) & dataThreadMask){ //if running
sem_wait(&sem_singledone[ireadout]); //wait for sls to copy //if running
if((1 << isocket) & dataThreadMask){
//this socket closed //get individual images
if(singleframe[ireadout] == NULL){ //if got nothing if(FAIL == getData(isocket, jungfrau, image, expectedslssize, currentAcquisitionIndex,currentFrameIndex,currentSubFrameIndex,currentFileName)){
dataThreadMask^=(1<<ireadout); dataThreadMask^=(1<<isocket);
continue; continue;
} }
//assemble data //assemble data with interleaving
if(maxX){ if(maxX){
//eiger, so interleaving between ports in one readout itself
offsetY = (maxY - (thisMultiDetector->offsetY[idet] + slsmaxY)) * maxX * bytesperchannel;
//the left half or right half
if(!(ireadout%numReadoutPerDetector))
offsetX = thisMultiDetector->offsetX[idet];
else
offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset;
offsetX *= bytesperchannel;
//interleaving with other detectors
//bottom //bottom
if(((idet+1)%2) == 0){ if((((isocket/numSocketsPerSLSDetector)+1)%2) == 0){
for(int i=0;i<slsmaxY;++i){ for(int i=0;i<slsmaxY;++i){
memcpy(((char*)multiframe) + offsetY + offsetX + ((slsmaxY-1-i)*maxX*bytesperchannel), memcpy(((char*)multiframe) + offsetY[isocket] + offsetX[isocket] + ((slsmaxY-1-i)*maxX*bytesperchannel),
(char*)singleframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel, (char*)image+ i*(slsmaxX/numSocketsPerSLSDetector)*bytesperchannel,
(slsmaxX/numReadoutPerDetector)*bytesperchannel); (slsmaxX/numSocketsPerSLSDetector)*bytesperchannel);
} }
} }
//top //top
else{ else{
for(int i=0;i<slsmaxY;++i){ for(int i=0;i<slsmaxY;++i){
memcpy(((char*)multiframe) + offsetY + offsetX + (i*maxX*bytesperchannel), memcpy(((char*)multiframe) + offsetY[isocket] + offsetX[isocket] + (i*maxX*bytesperchannel),
(char*)singleframe[ireadout]+ i*(slsmaxX/numReadoutPerDetector)*bytesperchannel, (char*)image+ i*(slsmaxX/numSocketsPerSLSDetector)*bytesperchannel,
(slsmaxX/numReadoutPerDetector)*bytesperchannel); (slsmaxX/numSocketsPerSLSDetector)*bytesperchannel);
} }
} }
} }
//no interleaving, just add to the end
//numReadout always 1 here //assemble data with no interleaving, assumed detectors appended vertically
else{ else{
memcpy((char*)multiframe+slsdatabytes*ireadout,(char*)singleframe[ireadout],slsdatabytes); memcpy((char*)multiframe+slsdatabytes*isocket,(char*)image,slsdatabytes);
} }
} }
} }
//all done //all done
@ -5391,8 +5317,6 @@ void multiSlsDetector::readFrameFromReceiver(){
//send data to callback //send data to callback
fdata = decodeData(multiframe); fdata = decodeData(multiframe);
if ((fdata) && (dataReady)){ if ((fdata) && (dataReady)){
thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentFileName.c_str(),nx,ny); thisData = new detectorData(fdata,NULL,NULL,getCurrentProgress(),currentFileName.c_str(),nx,ny);
dataReady(thisData, currentFrameIndex, currentSubFrameIndex, pCallbackArg); dataReady(thisData, currentFrameIndex, currentSubFrameIndex, pCallbackArg);
@ -5401,9 +5325,11 @@ void multiSlsDetector::readFrameFromReceiver(){
//cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl; //cout<<"Send frame #"<< currentFrameIndex << " to gui"<<endl;
} }
setCurrentProgress(currentAcquisitionIndex+1); setCurrentProgress(currentAcquisitionIndex+1);
} }
//free resources //free resources
delete [] image;
delete[] multiframe; delete[] multiframe;
} }
@ -5656,16 +5582,15 @@ int multiSlsDetector::setReceiverReadTimer(int time_in_ms){
int multiSlsDetector::enableDataStreamingFromReceiver(int enable){ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
if(enable >= 0){ if(enable >= 0){
if(dataSocketsStarted != enable){
if(threadStarted != enable){
//destroy data threads //destroy data threads
if(threadStarted) if(dataSocketsStarted)
createReceivingDataThreads(true); createReceivingDataSockets(true);
//create data threads //create data threads
if(enable > 0){ if(enable > 0){
if(createReceivingDataThreads() == FAIL){ if(createReceivingDataSockets() == FAIL){
std::cout << "Could not create data threads in client. Aborting creating data threads in receiver" << std::endl; std::cout << "Could not create data threads in client. Aborting creating data sockets in receiver" << std::endl;
//only for the first det as theres no general one //only for the first det as theres no general one
setErrorMask(getErrorMask()|(1<<0)); setErrorMask(getErrorMask()|(1<<0));
detectors[0]->setErrorMask((detectors[0]->getErrorMask())|(DATA_STREAMING)); detectors[0]->setErrorMask((detectors[0]->getErrorMask())|(DATA_STREAMING));
@ -5674,7 +5599,7 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
} }
} }
}else enable = threadStarted; }else enable = dataSocketsStarted;
int ret=-100, ret1; int ret=-100, ret1;
for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) { for (int idet=0; idet<thisMultiDetector->numberOfDetectors; idet++) {
@ -5690,9 +5615,9 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){
} }
/* /*
if(enable == -1) if(enable == -1)
return threadStarted; return dataSocketsStarted;
*/ */
return (threadStarted & ret); return (dataSocketsStarted & ret);
} }
int multiSlsDetector::enableReceiverCompression(int i){ int multiSlsDetector::enableReceiverCompression(int i){

View File

@ -1196,11 +1196,11 @@ class multiSlsDetector : public slsDetectorUtils {
int resetFramesCaught(); int resetFramesCaught();
/** /**
* Create Receiving Data Threads * Create Receiving Data Sockets
* @param destroy is true to destroy all the threads * @param destroy is true to destroy all the sockets
* @return OK or FAIL * @return OK or FAIL
*/ */
int createReceivingDataThreads(bool destroy = false); int createReceivingDataSockets(bool destroy = false);
@ -1392,35 +1392,17 @@ class multiSlsDetector : public slsDetectorUtils {
private: private:
/**
* Static function - Starts Data Thread of this object
* @param this_pointer pointer to this object
*/
static void* staticstartReceivingDataThread(void *this_pointer);
/** /**
* Thread that receives data packets from receiver * Gets data from socket
*/ */
void startReceivingDataThread(); int getData(int isocket, bool masking, int* image, int size, int &acqIndex, int &frameIndex, int &subframeIndex, string &filename);
/* synchronizing between zmq threads */ /** Ensures if sockets created successfully */
sem_t sem_singledone[MAXDET]; bool dataSocketsStarted;
sem_t sem_singlewait[MAXDET]; void *context[MAXDET];
int* singleframe[MAXDET]; void *zmqsocket[MAXDET];
char dataSocketServerDetails[MAXDET][100];
/* Parameters given to the gui picked up from zmq threads*/
int currentAcquisitionIndex;
int currentFrameIndex;
int currentSubFrameIndex;
string currentFileName;
pthread_t receivingDataThreads[MAXDET];
/** Ensures if threads created successfully */
bool threadStarted;
/** Current Thread Index*/
int currentThreadIndex;
/** Set to self-terminate data receiving threads waiting for semaphores */
bool killAllReceivingDataThreads;
protected: protected:

View File

@ -5730,7 +5730,7 @@ int slsDetector::setUDPConnection(){
int ret = FAIL; int ret = FAIL;
int fnum = F_SETUP_RECEIVER_UDP; int fnum = F_SETUP_RECEIVER_UDP;
char args[3][MAX_STR_LENGTH]; char args[3][MAX_STR_LENGTH]={"","",""};
char retval[MAX_STR_LENGTH]=""; char retval[MAX_STR_LENGTH]="";
//called before set up //called before set up
@ -5806,7 +5806,7 @@ int slsDetector::configureMAC(){
int ret=FAIL; int ret=FAIL;
int fnum=F_CONFIGURE_MAC,fnum2=F_RECEIVER_SHORT_FRAME; int fnum=F_CONFIGURE_MAC,fnum2=F_RECEIVER_SHORT_FRAME;
char mess[MAX_STR_LENGTH]=""; char mess[MAX_STR_LENGTH]="";
char arg[6][50]; char arg[6][50]={"","","","","",""};
char cword[50]="", *pcword; char cword[50]="", *pcword;
string sword; string sword;
int retval=-1; int retval=-1;
@ -7141,7 +7141,7 @@ int slsDetector::setReceiverTCPSocket(string const name, int const receiver_port
string slsDetector::setFilePath(string s) { string slsDetector::setFilePath(string s) {
int fnum = F_SET_RECEIVER_FILE_PATH; int fnum = F_SET_RECEIVER_FILE_PATH;
int ret = FAIL; int ret = FAIL;
char arg[MAX_STR_LENGTH]; char arg[MAX_STR_LENGTH]="";
char retval[MAX_STR_LENGTH] = ""; char retval[MAX_STR_LENGTH] = "";
struct stat st; struct stat st;
@ -7192,7 +7192,7 @@ string slsDetector::setFilePath(string s) {
string slsDetector::setFileName(string s) { string slsDetector::setFileName(string s) {
int fnum=F_SET_RECEIVER_FILE_NAME; int fnum=F_SET_RECEIVER_FILE_NAME;
int ret = FAIL; int ret = FAIL;
char arg[MAX_STR_LENGTH]; char arg[MAX_STR_LENGTH]="";
char retval[MAX_STR_LENGTH]=""; char retval[MAX_STR_LENGTH]="";
if(!s.empty()){ if(!s.empty()){

View File

@ -1586,11 +1586,11 @@ class slsDetector : public slsDetectorUtils, public energyConversion {
int resetFramesCaught(); int resetFramesCaught();
/** /**
* Create Receiving Data Threads * Create Receiving Data Sockets
* @param destroy is true to destroy all the threads * @param destroy is true to destroy all the sockets
* @return OK or FAIL * @return OK or FAIL
*/ */
int createReceivingDataThreads(bool destroy = false){return 0;}; int createReceivingDataSockets(bool destroy = false){return 0;};
/** Reads frames from receiver through a constant socket /** Reads frames from receiver through a constant socket

View File

@ -656,11 +656,11 @@ virtual int getReceiverCurrentFrameIndex()=0;
virtual int resetFramesCaught()=0; virtual int resetFramesCaught()=0;
/** /**
* Create Receiving Data Threads * Create Receiving Data Sockets
* @param destroy is true to destroy all the threads * @param destroy is true to destroy all the sockets
* @return OK or FAIL * @return OK or FAIL
*/ */
virtual int createReceivingDataThreads(bool destroy = false)=0; virtual int createReceivingDataSockets(bool destroy = false)=0;
/** Reads frames from receiver through a constant socket /** Reads frames from receiver through a constant socket