diff --git a/slsDetectorSoftware/tests/test-CmdProxy-rx.cpp b/slsDetectorSoftware/tests/test-CmdProxy-rx.cpp index d283a4531..93cd63def 100644 --- a/slsDetectorSoftware/tests/test-CmdProxy-rx.cpp +++ b/slsDetectorSoftware/tests/test-CmdProxy-rx.cpp @@ -620,6 +620,30 @@ TEST_CASE("rx_readfreq", "[.cmd][.rx][.new]") { } } +TEST_CASE("rx_zmqstartfnum", "[.cmd][.rx][.new]") { + Detector det; + CmdProxy proxy(&det); + auto prev_val = det.getRxZmqStartingFrame(); + { + std::ostringstream oss; + proxy.Call("rx_zmqstartfnum", {"5"}, -1, PUT, oss); + REQUIRE(oss.str() == "rx_zmqstartfnum 5\n"); + } + { + std::ostringstream oss; + proxy.Call("rx_zmqstartfnum", {}, -1, GET, oss); + REQUIRE(oss.str() == "rx_zmqstartfnum 5\n"); + } + { + std::ostringstream oss; + proxy.Call("rx_zmqstartfnum", {"0"}, -1, PUT, oss); + REQUIRE(oss.str() == "rx_zmqstartfnum 0\n"); + } + for (int i = 0; i != det.size(); ++i) { + det.setRxZmqStartingFrame(prev_val[i], {i}); + } +} + TEST_CASE("rx_zmqport", "[.cmd][.rx][.new]") { Detector det; CmdProxy proxy(&det); diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index be6df647a..e15243136 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -24,17 +24,17 @@ const std::string DataProcessor::TypeName = "DataProcessor"; DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo *f, fileFormat *ftype, bool fwenable, bool *mfwenable, bool *dsEnable, uint32_t *dr, uint32_t *freq, - uint32_t *timer, uint32_t *sfnum, bool *fp, bool *act, - bool *depaden, bool *sm, bool *qe, + uint32_t *timer, uint32_t *sfnum, bool *fp, + bool *act, bool *depaden, bool *sm, bool *qe, std::vector *cdl, int *cdo, int *cad) : ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype), dataStreamEnable(dsEnable), fileFormatType(ftype), fileWriteEnable(fwenable), masterFileWriteEnable(mfwenable), dynamicRange(dr), streamingFrequency(freq), streamingTimerInMs(timer), - streamingStartFnum(sfnum), - activated(act), deactivatedPaddingEnable(depaden), silentMode(sm), - quadEnable(qe), framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo), - ctbAnalogDataBytes(cad) { + streamingStartFnum(sfnum), activated(act), + deactivatedPaddingEnable(depaden), silentMode(sm), quadEnable(qe), + framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo), + ctbAnalogDataBytes(cad), firstStreamerFrame(false) { LOG(logDEBUG) << "DataProcessor " << ind << " created"; memset((void *)&timerBegin, 0, sizeof(timespec)); } @@ -61,6 +61,7 @@ void DataProcessor::ResetParametersforNewAcquisition() { numFramesCaught = 0; firstIndex = 0; currentFrameIndex = 0; + firstStreamerFrame = true; } void DataProcessor::RecordFirstIndex(uint64_t fnum) { @@ -183,13 +184,21 @@ void DataProcessor::ThreadExecution() { return; } - ProcessAnImage(buffer); + uint64_t fnum = ProcessAnImage(buffer); // stream (if time/freq to stream) or free - if (*dataStreamEnable && SendToStreamer()) + if (*dataStreamEnable && SendToStreamer()) { + // if first frame to stream, add frame index to fifo header (might not + // be the first) + if (firstStreamerFrame) { + firstStreamerFrame = false; + (*((uint32_t *)(buffer + FIFO_DATASIZE_NUMBYTES))) = + (uint32_t)(fnum - firstIndex); + } fifo->PushAddressToStream(buffer); - else + } else { fifo->FreeAddress(buffer); + } } void DataProcessor::StopProcessing(char *buf) { @@ -207,7 +216,7 @@ void DataProcessor::StopProcessing(char *buf) { LOG(logDEBUG1) << index << ": Processing Completed"; } -void DataProcessor::ProcessAnImage(char *buf) { +uint64_t DataProcessor::ProcessAnImage(char *buf) { auto *rheader = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES); sls_detector_header header = rheader->detHeader; @@ -222,7 +231,6 @@ void DataProcessor::ProcessAnImage(char *buf) { if (!startedFlag) { RecordFirstIndex(fnum); - if (*dataStreamEnable) { // restart timer clock_gettime(CLOCK_REALTIME, &timerBegin); @@ -279,6 +287,7 @@ void DataProcessor::ProcessAnImage(char *buf) { // stopReceiver tcp) } } + return fnum; } bool DataProcessor::SendToStreamer() { diff --git a/slsReceiverSoftware/src/DataProcessor.h b/slsReceiverSoftware/src/DataProcessor.h index 28a65a970..26917a947 100644 --- a/slsReceiverSoftware/src/DataProcessor.h +++ b/slsReceiverSoftware/src/DataProcessor.h @@ -49,10 +49,9 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { */ DataProcessor(int ind, detectorType dtype, Fifo *f, fileFormat *ftype, bool fwenable, bool *mfwenable, bool *dsEnable, uint32_t *dr, - uint32_t *freq, uint32_t *timer, - uint32_t *sfnum, bool *fp, bool *act, - bool *depaden, bool *sm, bool *qe, std::vector *cdl, - int *cdo, int *cad); + uint32_t *freq, uint32_t *timer, uint32_t *sfnum, bool *fp, + bool *act, bool *depaden, bool *sm, bool *qe, + std::vector *cdl, int *cdo, int *cad); /** * Destructor @@ -203,8 +202,9 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { * Process an image popped from fifo, * write to file if fw enabled & update parameters * @param buf address of pointer + * @returns frame number */ - void ProcessAnImage(char *buf); + uint64_t ProcessAnImage(char *buf); /** * Calls CheckTimer and CheckCount for streaming frequency and timer @@ -324,6 +324,9 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { /** Frame Number of latest processed frame number */ std::atomic currentFrameIndex{0}; + /** first streamer frame to add frame index in fifo header */ + bool firstStreamerFrame{false}; + // call back /** * Call back for raw data diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 3b06dcd46..9c55803a7 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -48,11 +48,14 @@ void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) { } } -void DataStreamer::RecordFirstIndex(uint64_t fnum) { +void DataStreamer::RecordFirstIndex(uint64_t fnum, char *buf) { startedFlag = true; - firstIndex = fnum; + // streamer first index needn't be + uint64_t firstVal = fnum - (*((uint32_t *)(buf + FIFO_DATASIZE_NUMBYTES))); - LOG(logDEBUG1) << index << " First Index: " << firstIndex; + firstIndex = firstVal; + LOG(logDEBUG1) << index << " First Index: " << firstIndex + << ", First Streamer Index:" << fnum; } void DataStreamer::SetGeneralData(GeneralData *g) { generalData = g; } @@ -137,7 +140,7 @@ void DataStreamer::ProcessAnImage(char *buf) { LOG(logDEBUG1) << "DataStreamer " << index << ": fnum:" << fnum; if (!startedFlag) { - RecordFirstIndex(fnum); + RecordFirstIndex(fnum, buf); } // shortframe gotthard diff --git a/slsReceiverSoftware/src/DataStreamer.h b/slsReceiverSoftware/src/DataStreamer.h index 6ad4faf80..bbfc5aba9 100644 --- a/slsReceiverSoftware/src/DataStreamer.h +++ b/slsReceiverSoftware/src/DataStreamer.h @@ -101,9 +101,10 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { private: /** * Record First Index - * @param fnum frame index to record + * @param fnum current frame number + * @param buf get frame index from buffer to calculate first index to record */ - void RecordFirstIndex(uint64_t fnum); + void RecordFirstIndex(uint64_t fnum, char *buf); /** * Thread Exeution for DataStreamer Class