diff --git a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp index d6317aa58..40498e5d5 100644 --- a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp +++ b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.cpp @@ -268,7 +268,12 @@ multiSlsDetector::multiSlsDetector(int id) : slsDetectorUtils(), shmId(-1) getNMods(); getMaxMods(); - threadStarted = false; + dataSocketsStarted = false; + for(int i=0;inumberOfDetectors; + //number of sockets + int numSockets = thisMultiDetector->numberOfDetectors; if(getDetectorsType() == EIGER) - numReadouts *= 2; + numSockets *= 2; - //reset masks - killAllReceivingDataThreads = false; - - //destroy if(destroy){ -#ifdef DEBUG - cout << "Destroying Receiving Data Thread(s)" << endl; -#endif - killAllReceivingDataThreads = true; - for(int i = 0; i < numReadouts; ++i){ - sem_post(&sem_singlewait[i]); - pthread_join(receivingDataThreads[i],NULL); - sem_destroy(&sem_singlewait[i]); - sem_destroy(&sem_singledone[i]); -#ifdef DEBUG - cout << "." << flush << endl; -#endif - } - killAllReceivingDataThreads = false; - threadStarted = false; + cprintf(MAGENTA,"Going to destroy data sockets\n"); + //close socket + for(int i=0;igetReceiver()); + cout<<"rx_hostname:"<h_addr)); + } + //add port + sprintf(dataSocketServerDetails[i],"%s:%d",dataSocketServerDetails[i],DEFAULT_ZMQ_PORTNO + i); + + //create context + context[i] = zmq_ctx_new(); + //create socket + zmqsocket[i] = zmq_socket(context[i], ZMQ_PULL); + //connect socket + zmq_connect(zmqsocket[i], dataSocketServerDetails[i]); + //int hwmval = 10; + //zmq_setsockopt(zmqsocket[i],ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets) + cout << "ZMQ Client[" << i << "] from " << dataSocketServerDetails[i] << endl; + } + + dataSocketsStarted = true; + cout << "Receiving Data Socket(s) created" << endl; + return OK; +} + + + + + + + + + +int multiSlsDetector::getData(int isocket, bool masking, int* image, int size, int &acqIndex, int &frameIndex, int &subframeIndex, string &filename){ + + zmq_msg_t message; + + //scan header------------------------------------------------------------------- + zmq_msg_init (&message); + int len = zmq_msg_recv(&message, zmqsocket[isocket], 0); + if (len == -1) { + cprintf(BG_RED,"Could not read header for socket %d\n",isocket); + zmq_msg_close(&message); + cprintf(RED, "%d message null\n",isocket); + return FAIL; + } + + + // error if you print it + // cout << isocket << " header len:"<startReceivingDataThread(); - //while(true); - - return this_pointer; -} - - -void multiSlsDetector::startReceivingDataThread(){ - - int ithread = currentThreadIndex; //set current thread value index - - //initializations - int numReadoutPerDetector = 1; - bool jungfrau = false; - if(getDetectorsType() == EIGER){ - numReadoutPerDetector = 2; - }else if(getDetectorsType() == JUNGFRAU){ - jungfrau = true; - //expectedsize = 8192*128; - } - int singleDatabytes = detectors[ithread/numReadoutPerDetector]->getDataBytes(); - int nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int); - int expectedsize = singleDatabytes/numReadoutPerDetector;//8192*128; //1024*256 - int* image = new int[nel]; - int len,idet = 0; - singleframe[ithread]=NULL; - - - char hostname[100] = "tcp://"; - char rx_hostname[100]; - strcpy(rx_hostname, detectors[ithread/numReadoutPerDetector]->getReceiver()); - cout<<"rx_hostname:"<h_addr)); - } - strcat(hostname,":"); - //server details - //char hostname[100] = "tcp://127.0.0.1:"; - int portno = DEFAULT_ZMQ_PORTNO + ithread; - sprintf(hostname,"%s%d",hostname,portno); - - //socket details - zmq_msg_t message; - void *context; - void *zmqsocket; - context = zmq_ctx_new(); - zmqsocket = zmq_socket(context, ZMQ_PULL); - //int hwmval = 10; - //zmq_setsockopt(zmqsocket,ZMQ_RCVHWM,&hwmval,sizeof(hwmval)); //set receive HIGH WATER MARK (8-9ms slower//should not drop last packets) - cprintf(RED,"connect ret:%d\n",zmq_connect(zmqsocket, hostname)); - cout << "ZMQ Client of " << ithread << " at " << hostname << endl; - cprintf(BLUE,"%d Created socket\n",ithread); - - - - threadStarted = true; //let calling function know thread started and obtained current - - - //infinite loop, exited only (if gui restarted/ enabledatastreaming called) - while(true){ - - //cprintf(GREEN,"%d waiting to copy\n",ithread); - sem_wait(&sem_singlewait[ithread]); //wait for it to be copied - //cprintf(GREEN,"%d gonna copy\n",ithread); - //check to exit thread - if(killAllReceivingDataThreads) - break; - - //scan header------------------------------------------------------------------- - zmq_msg_init (&message); - len = zmq_msg_recv(&message, zmqsocket, 0); - if (len == -1) { - cprintf(BG_RED,"Could not read header for socket %d\n",ithread); - zmq_msg_close(&message); - cprintf(RED, "%d message null\n",ithread); - continue; - } - - - // error if you print it - // cout << ithread << " header len:"<getDataBytes(); - nel=(singleDatabytes/numReadoutPerDetector)/sizeof(int); - delete [] image; - image = new int[nel]; - expectedsize = singleDatabytes/numReadoutPerDetector; - } - singleframe[ithread]=image; - // close the message - zmq_msg_close(&message); - - - //scan data------------------------------------------------------------------- - zmq_msg_init (&message); - len = zmq_msg_recv(&message, zmqsocket, 0); - - //cprintf(BLUE,"%d data %d\n",ithread,len); - //end of socket ("end") - if (len < expectedsize ) { - if(len == 3){ - //cprintf(RED,"%d Received end of acquisition\n", ithread); - singleframe[ithread] = NULL; - //break; - }else{ - cprintf(RED,"Received weird packet size %d in socket for %d\n", len, ithread); - memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector); - } - - } - else{ - //actual data - //cprintf(BLUE,"%d actual dataaa\n",ithread); - //memset((char*)(singleframe[ithread]),0xFF,singleDatabytes/numReadoutPerDetector); - memcpy((char*)(singleframe[ithread]),(char*)zmq_msg_data(&message),singleDatabytes/numReadoutPerDetector); - //cprintf(GREEN,"%d copied data %d\n",ithread,singleDatabytes/numReadoutPerDetector); - - //jungfrau masking adcval - if(jungfrau){ - for(unsigned int i=0;inumberOfDetectors; + int numSocketsPerSLSDetector = 1; + bool jungfrau = false; + switch(getDetectorsType()){ + case EIGER: + numSocketsPerSLSDetector = 2; + numSockets *= numSocketsPerSLSDetector; maxX = thisMultiDetector->numberOfChannel[X]; maxY = thisMultiDetector->numberOfChannel[Y]; + break; + case JUNGFRAU: + jungfrau = true; + break; + default: + break; } - int numReadouts = numReadoutPerDetector * thisMultiDetector->numberOfDetectors; - //initializing variables - currentFileName=""; - currentAcquisitionIndex = -1; - currentFrameIndex = -1; - currentSubFrameIndex = -1; + //gui variables + int currentAcquisitionIndex = -1; + int currentFrameIndex = -1; + int currentSubFrameIndex = -1; + string currentFileName = ""; - - //getting values - int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0; + //getting sls values + int slsdatabytes = 0, slsmaxchannels = 0, bytesperchannel = 0, slsmaxX = 0, slsmaxY=0, nx=0, ny=0; if(detectors[0]){ slsdatabytes = detectors[0]->getDataBytes(); slsmaxchannels = detectors[0]->getMaxNumberOfChannels(); bytesperchannel = slsdatabytes/slsmaxchannels; slsmaxX = detectors[0]->getTotalNumberOfChannels(X); slsmaxY = detectors[0]->getTotalNumberOfChannels(Y); - //cprintf(BLUE,"slsdatabytes:%d slsmaxchannels:%d bytesperchannel:%d slsmaxX:%d slsmaxY:%d\n", - // slsdatabytes,slsmaxchannels,bytesperchannel,slsmaxX,slsmaxY); } + //getting multi values + nx = getTotalNumberOfChannels(slsDetectorDefs::X); + ny = getTotalNumberOfChannels(slsDetectorDefs::Y); + //calculating offsets (for eiger interleaving ports) + int offsetX[numSockets]; int offsetY[numSockets]; + if(maxX){ + for(int i=0; ioffsetY[i/numSocketsPerSLSDetector] + slsmaxY)) * maxX * bytesperchannel; + //the left half or right half + if(!(i%numSocketsPerSLSDetector)) + offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector]; + else + offsetX[i] = thisMultiDetector->offsetX[i/numSocketsPerSLSDetector] + (slsmaxX/numSocketsPerSLSDetector); + offsetX[i] *= bytesperchannel; + } + } + + int expectedslssize = slsdatabytes/numSocketsPerSLSDetector; + int* image = new int[(expectedslssize/sizeof(int))](); int nel=(thisMultiDetector->dataBytes)/sizeof(int); - //cprintf(BLUE,"multi databytes:%d\n",thisMultiDetector->dataBytes); if(nel <= 0){ cprintf(RED,"Error: Multislsdetector databytes not valid : %d\n", thisMultiDetector->dataBytes); return; } - int* multiframe=new int[nel]; - int idet,offsetY,offsetX; - int halfreadoutoffset = (slsmaxX/numReadoutPerDetector); - int nx =getTotalNumberOfChannels(slsDetectorDefs::X); - int ny =getTotalNumberOfChannels(slsDetectorDefs::Y); + int* multiframe=new int[nel](); + volatile uint64_t dataThreadMask = 0x0; - for(int i = 0; i < numReadouts; ++i) + for(int i = 0; i < numSockets; ++i) dataThreadMask|=(1<numberOfDetectors); //reset frame memory - //post all of them to start - for(int ireadout=0; ireadoutoffsetY[idet] + slsmaxY)) * maxX * bytesperchannel; - //the left half or right half - if(!(ireadout%numReadoutPerDetector)) - offsetX = thisMultiDetector->offsetX[idet]; - else - offsetX = thisMultiDetector->offsetX[idet] + halfreadoutoffset; - offsetX *= bytesperchannel; - //interleaving with other detectors - //bottom - if(((idet+1)%2) == 0){ + if((((isocket/numSocketsPerSLSDetector)+1)%2) == 0){ for(int i=0;i= 0){ - - if(threadStarted != enable){ + if(dataSocketsStarted != enable){ //destroy data threads - if(threadStarted) - createReceivingDataThreads(true); + if(dataSocketsStarted) + createReceivingDataSockets(true); //create data threads if(enable > 0){ - if(createReceivingDataThreads() == FAIL){ - std::cout << "Could not create data threads in client. Aborting creating data threads in receiver" << std::endl; + if(createReceivingDataSockets() == FAIL){ + std::cout << "Could not create data threads in client. Aborting creating data sockets in receiver" << std::endl; //only for the first det as theres no general one setErrorMask(getErrorMask()|(1<<0)); detectors[0]->setErrorMask((detectors[0]->getErrorMask())|(DATA_STREAMING)); @@ -5674,7 +5599,7 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){ } } - }else enable = threadStarted; + }else enable = dataSocketsStarted; int ret=-100, ret1; for (int idet=0; idetnumberOfDetectors; idet++) { @@ -5690,9 +5615,9 @@ int multiSlsDetector::enableDataStreamingFromReceiver(int enable){ } /* if(enable == -1) - return threadStarted; + return dataSocketsStarted; */ - return (threadStarted & ret); + return (dataSocketsStarted & ret); } int multiSlsDetector::enableReceiverCompression(int i){ diff --git a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h index 4228b47a4..1cc895fa5 100644 --- a/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h +++ b/slsDetectorSoftware/multiSlsDetector/multiSlsDetector.h @@ -1196,11 +1196,11 @@ class multiSlsDetector : public slsDetectorUtils { int resetFramesCaught(); /** - * Create Receiving Data Threads - * @param destroy is true to destroy all the threads + * Create Receiving Data Sockets + * @param destroy is true to destroy all the sockets * @return OK or FAIL */ - int createReceivingDataThreads(bool destroy = false); + int createReceivingDataSockets(bool destroy = false); @@ -1392,35 +1392,17 @@ class multiSlsDetector : public slsDetectorUtils { private: - /** - * Static function - Starts Data Thread of this object - * @param this_pointer pointer to this object - */ - static void* staticstartReceivingDataThread(void *this_pointer); /** - * Thread that receives data packets from receiver + * Gets data from socket */ - void startReceivingDataThread(); + int getData(int isocket, bool masking, int* image, int size, int &acqIndex, int &frameIndex, int &subframeIndex, string &filename); - /* synchronizing between zmq threads */ - sem_t sem_singledone[MAXDET]; - sem_t sem_singlewait[MAXDET]; - int* singleframe[MAXDET]; - - /* Parameters given to the gui picked up from zmq threads*/ - int currentAcquisitionIndex; - int currentFrameIndex; - int currentSubFrameIndex; - string currentFileName; - - pthread_t receivingDataThreads[MAXDET]; - /** Ensures if threads created successfully */ - bool threadStarted; - /** Current Thread Index*/ - int currentThreadIndex; - /** Set to self-terminate data receiving threads waiting for semaphores */ - bool killAllReceivingDataThreads; + /** Ensures if sockets created successfully */ + bool dataSocketsStarted; + void *context[MAXDET]; + void *zmqsocket[MAXDET]; + char dataSocketServerDetails[MAXDET][100]; protected: diff --git a/slsDetectorSoftware/slsDetector/slsDetector.cpp b/slsDetectorSoftware/slsDetector/slsDetector.cpp index 6ae228dd2..fe93af9d6 100644 --- a/slsDetectorSoftware/slsDetector/slsDetector.cpp +++ b/slsDetectorSoftware/slsDetector/slsDetector.cpp @@ -5730,7 +5730,7 @@ int slsDetector::setUDPConnection(){ int ret = FAIL; int fnum = F_SETUP_RECEIVER_UDP; - char args[3][MAX_STR_LENGTH]; + char args[3][MAX_STR_LENGTH]={"","",""}; char retval[MAX_STR_LENGTH]=""; //called before set up @@ -5806,7 +5806,7 @@ int slsDetector::configureMAC(){ int ret=FAIL; int fnum=F_CONFIGURE_MAC,fnum2=F_RECEIVER_SHORT_FRAME; char mess[MAX_STR_LENGTH]=""; - char arg[6][50]; + char arg[6][50]={"","","","","",""}; char cword[50]="", *pcword; string sword; int retval=-1; @@ -7141,7 +7141,7 @@ int slsDetector::setReceiverTCPSocket(string const name, int const receiver_port string slsDetector::setFilePath(string s) { int fnum = F_SET_RECEIVER_FILE_PATH; int ret = FAIL; - char arg[MAX_STR_LENGTH]; + char arg[MAX_STR_LENGTH]=""; char retval[MAX_STR_LENGTH] = ""; struct stat st; @@ -7192,7 +7192,7 @@ string slsDetector::setFilePath(string s) { string slsDetector::setFileName(string s) { int fnum=F_SET_RECEIVER_FILE_NAME; int ret = FAIL; - char arg[MAX_STR_LENGTH]; + char arg[MAX_STR_LENGTH]=""; char retval[MAX_STR_LENGTH]=""; if(!s.empty()){ diff --git a/slsDetectorSoftware/slsDetector/slsDetector.h b/slsDetectorSoftware/slsDetector/slsDetector.h index e00e03ebb..508b28000 100644 --- a/slsDetectorSoftware/slsDetector/slsDetector.h +++ b/slsDetectorSoftware/slsDetector/slsDetector.h @@ -1586,11 +1586,11 @@ class slsDetector : public slsDetectorUtils, public energyConversion { int resetFramesCaught(); /** - * Create Receiving Data Threads - * @param destroy is true to destroy all the threads + * Create Receiving Data Sockets + * @param destroy is true to destroy all the sockets * @return OK or FAIL */ - int createReceivingDataThreads(bool destroy = false){return 0;}; + int createReceivingDataSockets(bool destroy = false){return 0;}; /** Reads frames from receiver through a constant socket diff --git a/slsDetectorSoftware/slsDetector/slsDetectorUtils.h b/slsDetectorSoftware/slsDetector/slsDetectorUtils.h index 053a91c01..18ec00524 100644 --- a/slsDetectorSoftware/slsDetector/slsDetectorUtils.h +++ b/slsDetectorSoftware/slsDetector/slsDetectorUtils.h @@ -656,11 +656,11 @@ virtual int getReceiverCurrentFrameIndex()=0; virtual int resetFramesCaught()=0; /** - * Create Receiving Data Threads - * @param destroy is true to destroy all the threads + * Create Receiving Data Sockets + * @param destroy is true to destroy all the sockets * @return OK or FAIL */ -virtual int createReceivingDataThreads(bool destroy = false)=0; +virtual int createReceivingDataSockets(bool destroy = false)=0; /** Reads frames from receiver through a constant socket