diff --git a/slsReceiverSoftware/include/DataProcessor.h b/slsReceiverSoftware/include/DataProcessor.h index 433cd81a5..0f20f23e1 100644 --- a/slsReceiverSoftware/include/DataProcessor.h +++ b/slsReceiverSoftware/include/DataProcessor.h @@ -28,10 +28,13 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { * @param ftype pointer to file format type * @param fwenable pointer to file writer enable * @param dsEnable pointer to data stream enable + * @param freq pointer to streaming frequency + * @param timer pointer to timer if streaming frequency is random * @param dataReadycb pointer to data ready call back function * @param pDataReadycb pointer to arguments of data ready call back function */ DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, + uint32_t* freq, uint32_t* timer, void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t, char*, uint32_t, void*), void *pDataReadycb); @@ -240,7 +243,26 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { */ void ProcessAnImage(char* buf); + /** + * Calls CheckTimer and CheckCount for streaming frequency and timer + * and determines if the current image should be sent to streamer + * @returns true if it should to streamer, else false + */ + bool SendToStreamer(); + /** + * This function should be called only in random frequency mode + * Checks if timer is done and ready to send to stream + * @returns true if ready to send to stream, else false + */ + bool CheckTimer(); + + /** + * This function should be called only in non random frequency mode + * Checks if count is done and ready to send to stream + * @returns true if ready to send to stream, else false + */ + bool CheckCount(); /** type of thread */ static const std::string TypeName; @@ -277,6 +299,19 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { /** File Write Enable */ bool* fileWriteEnable; + /** Pointer to Streaming frequency, if 0, sending random images with a timer */ + uint32_t* streamingFrequency; + + /** Pointer to the timer if Streaming frequency is random */ + uint32_t* streamingTimerInMs; + + /** Current frequency count */ + uint32_t currentFreqCount; + + /** timer beginning stamp for random streaming */ + struct timespec timerBegin; + + //acquisition start /** Aquisition Started flag */ @@ -304,8 +339,6 @@ class DataProcessor : private virtual slsReceiverDefs, public ThreadObject { - - //call back /** * Call back for raw data diff --git a/slsReceiverSoftware/include/DataStreamer.h b/slsReceiverSoftware/include/DataStreamer.h index 1a2201a05..50d3f2e14 100644 --- a/slsReceiverSoftware/include/DataStreamer.h +++ b/slsReceiverSoftware/include/DataStreamer.h @@ -22,11 +22,9 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { * Calls Base Class CreateThread(), sets ErrorMask if error and increments NumberofDataStreamers * @param f address of Fifo pointer * @param dr pointer to dynamic range - * @param freq pointer to streaming frequency - * @param timer pointer to timer if streaming frequency is random * @param sEnable pointer to short frame enable */ - DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable); + DataStreamer(Fifo*& f, uint32_t* dr, int* sEnable); /** * Destructor @@ -151,20 +149,6 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { */ void ProcessAnImage(char* buf); - /** - * This function should be called only in random frequency mode - * Checks if timer is done and ready to send data - * @returns true if ready to send data, else false - */ - bool CheckTimer(); - - /** - * This function should be called only in non random frequency mode - * Checks if count is done and ready to send data - * @returns true if ready to send data, else false - */ - bool CheckCount(); - /** * Create and send Json Header * @param header header of image @@ -203,18 +187,6 @@ class DataStreamer : private virtual slsReceiverDefs, public ThreadObject { /** Pointer to short frame enable */ int* shortFrameEnable; - /** Pointer to Streaming frequency, if 0, sending random images with a timer */ - uint32_t* streamingFrequency; - - /** Pointer to the timer if Streaming frequency is random */ - uint32_t* streamingTimerInMs; - - /** Current frequency count */ - uint32_t currentFreqCount; - - /** timer beginning stamp for random streaming */ - struct timespec timerBegin; - /** Aquisition Started flag */ bool acquisitionStartedFlag; diff --git a/slsReceiverSoftware/include/UDPBaseImplementation.h b/slsReceiverSoftware/include/UDPBaseImplementation.h index 9c0766e30..e24a305c5 100644 --- a/slsReceiverSoftware/include/UDPBaseImplementation.h +++ b/slsReceiverSoftware/include/UDPBaseImplementation.h @@ -650,7 +650,6 @@ class UDPBaseImplementation : protected virtual slsReceiverDefs, public UDPInter uint32_t frameToGuiTimerinMS; /** Data Stream Enable from Receiver */ bool dataStreamEnable; - static const int DEFAULT_STREAMING_TIMER = 500; /** streaming port */ uint32_t streamingPort; diff --git a/slsReceiverSoftware/include/sls_receiver_defs.h b/slsReceiverSoftware/include/sls_receiver_defs.h index e2387b416..1221e6386 100755 --- a/slsReceiverSoftware/include/sls_receiver_defs.h +++ b/slsReceiverSoftware/include/sls_receiver_defs.h @@ -27,6 +27,8 @@ typedef int int32_t; #define JFCTB_MAX_FRAMES_PER_FILE 100000 +#define DEFAULT_STREAMING_TIMER_IN_MS 500 + /** \file sls_receiver_defs.h This file contains all the basic definitions common to the slsReceiver class diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index ce611c723..ff7548e89 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -32,6 +32,7 @@ pthread_mutex_t DataProcessor::Mutex = PTHREAD_MUTEX_INITIALIZER; DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dsEnable, + uint32_t* freq, uint32_t* timer, void (*dataReadycb)(uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint16_t, uint16_t, uint16_t, uint16_t, uint32_t, uint16_t, uint8_t, uint8_t, char*, uint32_t, void*), void *pDataReadycb) : @@ -43,6 +44,9 @@ DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* dataStreamEnable(dsEnable), fileFormatType(ftype), fileWriteEnable(fwenable), + streamingFrequency(freq), + streamingTimerInMs(timer), + currentFreqCount(0), acquisitionStartedFlag(false), measurementStartedFlag(false), firstAcquisitionIndex(0), @@ -61,6 +65,8 @@ DataProcessor::DataProcessor(Fifo*& f, fileFormat* ftype, bool* fwenable, bool* NumberofDataProcessors++; FILE_LOG (logDEBUG) << "Number of DataProcessors: " << NumberofDataProcessors; + + memset((void*)&timerBegin, 0, sizeof(timespec)); } @@ -289,15 +295,14 @@ void DataProcessor::ThreadExecution() { ProcessAnImage(buffer + FIFO_HEADER_NUMBYTES); - //stream or free - if (*dataStreamEnable) + //stream (if time/freq to stream) or free + if (*dataStreamEnable && SendToStreamer()) fifo->PushAddressToStream(buffer); else fifo->FreeAddress(buffer); } - void DataProcessor::StopProcessing(char* buf) { #ifdef VERBOSE if (!index) @@ -339,6 +344,13 @@ void DataProcessor::ProcessAnImage(char* buf) { if (!index) bprintf(BLUE,"DataProcessing %d: fnum:%lu\n", index, fnum); #endif RecordFirstIndices(fnum); + + if (*dataStreamEnable) { + //restart timer + clock_gettime(CLOCK_REALTIME, &timerBegin); + //to send first image + currentFreqCount = *streamingFrequency; + } } @@ -367,3 +379,43 @@ void DataProcessor::ProcessAnImage(char* buf) { } + + +bool DataProcessor::SendToStreamer() { + //skip + if (!(*streamingFrequency)) { + if (!CheckTimer()) + return false; + } else { + if (!CheckCount()) + return false; + } + return true; +} + + +bool DataProcessor::CheckTimer() { + struct timespec end; + clock_gettime(CLOCK_REALTIME, &end); +#ifdef VERBOSE + bprintf(BLUE,"%d Timer elapsed time:%f seconds\n", index, ( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0); +#endif + //still less than streaming timer, keep waiting + if((( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0) < ((double)*streamingTimerInMs/1000.00)) + return false; + + //restart timer + clock_gettime(CLOCK_REALTIME, &timerBegin); + return true; +} + + +bool DataProcessor::CheckCount() { + if (currentFreqCount == *streamingFrequency ) { + currentFreqCount = 1; + return true; + } + currentFreqCount++; + return false; +} + diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index f4cb49d2e..5823adf63 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -24,16 +24,13 @@ uint64_t DataStreamer::RunningMask(0x0); pthread_mutex_t DataStreamer::Mutex = PTHREAD_MUTEX_INITIALIZER; -DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* timer, int* sEnable) : +DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, int* sEnable) : ThreadObject(NumberofDataStreamers), generalData(0), fifo(f), zmqSocket(0), dynamicRange(dr), shortFrameEnable(sEnable), - streamingFrequency(freq), - streamingTimerInMs(timer), - currentFreqCount(0), acquisitionStartedFlag(false), measurementStartedFlag(false), firstAcquisitionIndex(0), @@ -49,7 +46,6 @@ DataStreamer::DataStreamer(Fifo*& f, uint32_t* dr, uint32_t* freq, uint32_t* tim NumberofDataStreamers++; FILE_LOG (logDEBUG) << "Number of DataStreamers: " << NumberofDataStreamers; - memset((void*)&timerBegin, 0, sizeof(timespec)); strcpy(fileNametoStream, ""); } @@ -236,19 +232,6 @@ void DataStreamer::ProcessAnImage(char* buf) { if (!index) bprintf(MAGENTA,"DataStreamer %d: fnum:%lu\n", index, fnum); #endif RecordFirstIndices(fnum); - //restart timer - clock_gettime(CLOCK_REALTIME, &timerBegin); - //to send first image - currentFreqCount = *streamingFrequency; - } - - //skip - if (!(*streamingFrequency)) { - if (!CheckTimer()) - return; - } else { - if (!CheckCount()) - return; } if (!SendHeader(header)) @@ -272,32 +255,6 @@ void DataStreamer::ProcessAnImage(char* buf) { -bool DataStreamer::CheckTimer() { - struct timespec end; - clock_gettime(CLOCK_REALTIME, &end); -#ifdef VERBOSE - bprintf(BLUE,"%d Timer elapsed time:%f seconds\n", index, ( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0); -#endif - //still less than streaming timer, keep waiting - if((( end.tv_sec - timerBegin.tv_sec ) + ( end.tv_nsec - timerBegin.tv_nsec ) / 1000000000.0) < (*streamingTimerInMs/1000)) - return false; - - //restart timer - clock_gettime(CLOCK_REALTIME, &timerBegin); - return true; -} - - -bool DataStreamer::CheckCount() { - if (currentFreqCount == *streamingFrequency ) { - currentFreqCount = 1; - return true; - } - currentFreqCount++; - return false; -} - - int DataStreamer::SendHeader(sls_detector_header* header, bool dummy) { if (dummy) diff --git a/slsReceiverSoftware/src/UDPBaseImplementation.cpp b/slsReceiverSoftware/src/UDPBaseImplementation.cpp index 19db43028..66e84d3a5 100644 --- a/slsReceiverSoftware/src/UDPBaseImplementation.cpp +++ b/slsReceiverSoftware/src/UDPBaseImplementation.cpp @@ -77,7 +77,7 @@ void UDPBaseImplementation::initializeMembers(){ //***acquisition parameters*** shortFrameEnable = -1; frameToGuiFrequency = 0; - frameToGuiTimerinMS = DEFAULT_STREAMING_TIMER; + frameToGuiTimerinMS = DEFAULT_STREAMING_TIMER_IN_MS; dataStreamEnable = false; streamingPort = 0; } diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index 844ed0d61..759dea8e9 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -194,7 +194,7 @@ int UDPStandardImplementation::setDataStreamEnable(const bool enable) {\ if (enable) { bool error = false; for ( int i = 0; i < numThreads; ++i ) { - dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &frameToGuiFrequency, &frameToGuiTimerinMS, &shortFrameEnable)); + dataStreamer.push_back(new DataStreamer(fifo[i], &dynamicRange, &shortFrameEnable)); dataStreamer[i]->SetGeneralData(generalData); // check again if (streamingPort == 0) @@ -314,7 +314,8 @@ int UDPStandardImplementation::setDetectorType(const detectorType d) { //create threads for ( int i=0; i < numThreads; ++i ) { listener.push_back(new Listener(myDetectorType, fifo[i], &status, &udpPortNum[i], eth, &activated, &numberOfFrames, &dynamicRange)); - dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, &fileWriteEnable, &dataStreamEnable, + dataProcessor.push_back(new DataProcessor(fifo[i], &fileFormatType, + &fileWriteEnable, &dataStreamEnable, &frameToGuiFrequency, &frameToGuiTimerinMS, rawDataReadyCallBack,pRawDataReady)); if (Listener::GetErrorMask() || DataProcessor::GetErrorMask()) { FILE_LOG (logERROR) << "Error: Could not creates listener/dataprocessor threads (index:" << i << ")";