diff --git a/slsReceiverSoftware/include/UDPStandardImplementation.h b/slsReceiverSoftware/include/UDPStandardImplementation.h index b043eeedf..6674c62d9 100644 --- a/slsReceiverSoftware/include/UDPStandardImplementation.h +++ b/slsReceiverSoftware/include/UDPStandardImplementation.h @@ -289,6 +289,12 @@ private: /************************************************************************* * Listening and Writing Threads ***************************************** *************************************************************************/ + /** + * Create Data Call Back Threads + * @param destroy is true to destroy all the threads + * @return OK or FAIL + */ + int createDataCallbackThreads(bool destroy = false); /** * Create Listening Threads @@ -303,6 +309,9 @@ private: */ int createWriterThreads(bool destroy = false); + + + /** * Set Thread Priorities */ @@ -336,6 +345,12 @@ private: */ int createCompressionFile(int ithread, int iframe); + /** + * Static function - Starts Data Callback Thread of this object + * @param this_pointer pointer to this object + */ + static void* startDataCallbackThread(void *this_pointer); + /** * Static function - Starts Listening Thread of this object * @param this_pointer pointer to this object @@ -348,6 +363,11 @@ private: */ static void* startWritingThread(void *this_pointer); + /** + * Thread that sends data packets to client + */ + void startDataCallback(); + /** * Thread that listens to packets * It pops the fifofree for free addresses, listens to packets and pushes them into the fifo @@ -652,6 +672,24 @@ private: + //***data call back thread parameters*** + /** Number of data callback Threads */ + int numberofDataCallbackThreads; + + /** Data Callback Threads */ + pthread_t dataCallbackThreads[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** Semaphores Synchronizing DataCallback Threads */ + sem_t dataCallbackSemaphore[MAX_NUMBER_OF_LISTENING_THREADS]; + + /** Mask with each bit indicating status of each data callback thread */ + volatile uint32_t dataCallbackThreadsMask; + + /** Set to self-terminate data callback threads waiting for semaphores */ + bool killAllDataCallbackThreads; + + bool dataCallbackEnabled; + //***general and listening thread parameters*** /** Ensures if threads created successfully */ @@ -669,9 +707,6 @@ private: /** Semaphores Synchronizing Listening Threads */ sem_t listenSemaphore[MAX_NUMBER_OF_LISTENING_THREADS]; - /** Current Listening Thread Index*/ - int currentListeningThreadIndex; - /** Mask with each bit indicating status of each listening thread */ volatile uint32_t listeningThreadsMask; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 7f649d5c3..983fd1fb8 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -186,6 +186,12 @@ void UDPStandardImplementation::initializeMembers(){ frametoGuiCounter[i] = 0; } + //***data callback thread parameters*** + numberofDataCallbackThreads = 1; + dataCallbackThreadsMask = 0x0; + killAllDataCallbackThreads = false; + dataCallbackEnabled = true; /**false*/ + //***general and listening thread parameters*** threadStarted = false; currentThreadIndex = -1; @@ -767,14 +773,17 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){ //delete threads and set number of listening threads if(myDetectorType == EIGER){ pthread_mutex_lock(&statusMutex); + dataCallbackThreadsMask = 0x0; listeningThreadsMask = 0x0; writerThreadsMask = 0x0; pthread_mutex_unlock(&(statusMutex)); if(threadStarted){ createListeningThreads(true); + createDataCallbackThreads(true); createWriterThreads(true); } numberofListeningThreads = MAX_NUMBER_OF_LISTENING_THREADS; + numberofDataCallbackThreads = MAX_NUMBER_OF_LISTENING_THREADS; numberofWriterThreads = MAX_NUMBER_OF_WRITER_THREADS; } @@ -793,7 +802,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d){ //updates File Header if(myDetectorType == EIGER){ for(int i=0; istartDataCallback(); + return this_pointer; +} + + + void* UDPStandardImplementation::startListeningThread(void* this_pointer){ FILE_LOG(logDEBUG) << __AT__ << " called"; ((UDPStandardImplementation*)this_pointer)->startListening(); @@ -1560,6 +1643,62 @@ void* UDPStandardImplementation::startWritingThread(void* this_pointer){ +void UDPStandardImplementation::startDataCallback(){ + FILE_LOG(logDEBUG) << __AT__ << " called"; + + //set current thread value index + int ithread = currentThreadIndex; + //let calling function know thread started and obtained current + threadStarted = 1; + char* buffer; + // server address to bind + const char *hostName = "tcp://127.0.0.1:70001";/**increment this by ithread and detid*/ + + /* outer loop - loops once for each acquisition */ + //infinite loop, exited only to change dynamic range, 10G parameters etc (then recreated again) + while(true){ + + void *context = zmq_ctx_new(); + // create a publisher + socket = zmq_socket(context, ZMQ_PUB); + // bind + zmq_bind(socket,hostName);/**increment this by 1*/ + + /* inner loop - loop for each buffer */ + //until mask reset (udp sockets shut down by client) + while((1 << ithread) & dataCallbackThreadsMask){ + + //wait for data + sem_wait(&dataCallbackSemaphore[ithread]); + if(status == TRANSMITTING) + continue; + int numpackets = (uint32_t)(*( (uint32_t*) latestData)); /*latestdata should be size of one buffer*/ + memcpy(buffer, latestData, numpackets*onePacketSize);/**read first bytes to get numpackets*/ + + /*check if it should be added to previous (processlistening buffer for datacompression)*/ + + zmq_send(socket, buffer.data(), buffer.size(), 0); + + }/*--end of loop for each buffer (inner loop)*/ + + //end of acquisition, wait for next acquisition/change of parameters + sem_wait(&dataCallbackSemaphore[ithread]); + + //check to exit thread (for change of parameters) - only EXIT possibility + if(killAllDataCallbackThreads){ + cprintf(BLUE,"DataCallback_Thread %d:Goodbye!\n",ithread); + //free resources at exit + zmq_unbind(socket, hostName); + zmq_close(socket); + zmq_ctx_destroy(context); + pthread_exit(NULL); + } + + }/*--end of loop for each acquisition (outer loop) */ + +} + + void UDPStandardImplementation::startListening(){ @@ -2222,6 +2361,8 @@ void UDPStandardImplementation::stopWriting(int ithread, char* wbuffer){ //ensure listening threads done before updating status as it returns to client (from stopReceiver) while(listeningThreadsMask) usleep(5000); + while(dataCallbackThreadsMask) + usleep(5000); //update status pthread_mutex_lock(&statusMutex); status = RUN_FINISHED; @@ -2485,22 +2626,19 @@ void UDPStandardImplementation::copyFrameToGui(int ithread, char* buffer, uint32 FILE_LOG(logDEBUG) << __AT__ << " called"; - //random read (gui not ready) - //need to toggle guiDataReady or the second frame wont be copied - if((!FrameToGuiFrequency) && (!guiData[ithread])){ -#ifdef DEBUG4 - cprintf(GREEN,"Writing_Thread: CopyingFrame: Resetting guiDataReady\n"); -#endif - pthread_mutex_lock(&dataReadyMutex); - guiDataReady[ithread]=0; - pthread_mutex_unlock(&dataReadyMutex); - } //if nthe frame, wait for your turn (1st frame always shown as its zero) - else if(FrameToGuiFrequency && ((frametoGuiCounter[ithread])%FrameToGuiFrequency)); + if(FrameToGuiFrequency && ((frametoGuiCounter[ithread])%FrameToGuiFrequency)); //random read (gui ready) or nth frame read: gui needs data now or it is the first frame else{ + + //tell datacallback to pick up data + sem_post(&dataCallbackSemaphore[ithread]); + + + memcpy(latestData[ithread],buffer , numpackets*onePacketSize); + #ifdef DEBUG4 cprintf(GREEN,"Writing_Thread: CopyingFrame: Gui needs data now OR 1st frame\n"); #endif