rx_zmqstartfnum added to have an offset of streaming frame numbers

This commit is contained in:
2020-07-17 11:11:19 +02:00
parent f70d28b175
commit 3bdf02a23c
5 changed files with 62 additions and 22 deletions

View File

@ -24,17 +24,17 @@ const std::string DataProcessor::TypeName = "DataProcessor";
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo *f,
fileFormat *ftype, bool fwenable, bool *mfwenable,
bool *dsEnable, uint32_t *dr, uint32_t *freq,
uint32_t *timer, uint32_t *sfnum, bool *fp, bool *act,
bool *depaden, bool *sm, bool *qe,
uint32_t *timer, uint32_t *sfnum, bool *fp,
bool *act, bool *depaden, bool *sm, bool *qe,
std::vector<int> *cdl, int *cdo, int *cad)
: ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype),
dataStreamEnable(dsEnable), fileFormatType(ftype),
fileWriteEnable(fwenable), masterFileWriteEnable(mfwenable),
dynamicRange(dr), streamingFrequency(freq), streamingTimerInMs(timer),
streamingStartFnum(sfnum),
activated(act), deactivatedPaddingEnable(depaden), silentMode(sm),
quadEnable(qe), framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo),
ctbAnalogDataBytes(cad) {
streamingStartFnum(sfnum), activated(act),
deactivatedPaddingEnable(depaden), silentMode(sm), quadEnable(qe),
framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo),
ctbAnalogDataBytes(cad), firstStreamerFrame(false) {
LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void *)&timerBegin, 0, sizeof(timespec));
}
@ -61,6 +61,7 @@ void DataProcessor::ResetParametersforNewAcquisition() {
numFramesCaught = 0;
firstIndex = 0;
currentFrameIndex = 0;
firstStreamerFrame = true;
}
void DataProcessor::RecordFirstIndex(uint64_t fnum) {
@ -183,13 +184,21 @@ void DataProcessor::ThreadExecution() {
return;
}
ProcessAnImage(buffer);
uint64_t fnum = ProcessAnImage(buffer);
// stream (if time/freq to stream) or free
if (*dataStreamEnable && SendToStreamer())
if (*dataStreamEnable && SendToStreamer()) {
// if first frame to stream, add frame index to fifo header (might not
// be the first)
if (firstStreamerFrame) {
firstStreamerFrame = false;
(*((uint32_t *)(buffer + FIFO_DATASIZE_NUMBYTES))) =
(uint32_t)(fnum - firstIndex);
}
fifo->PushAddressToStream(buffer);
else
} else {
fifo->FreeAddress(buffer);
}
}
void DataProcessor::StopProcessing(char *buf) {
@ -207,7 +216,7 @@ void DataProcessor::StopProcessing(char *buf) {
LOG(logDEBUG1) << index << ": Processing Completed";
}
void DataProcessor::ProcessAnImage(char *buf) {
uint64_t DataProcessor::ProcessAnImage(char *buf) {
auto *rheader = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
sls_detector_header header = rheader->detHeader;
@ -222,7 +231,6 @@ void DataProcessor::ProcessAnImage(char *buf) {
if (!startedFlag) {
RecordFirstIndex(fnum);
if (*dataStreamEnable) {
// restart timer
clock_gettime(CLOCK_REALTIME, &timerBegin);
@ -279,6 +287,7 @@ void DataProcessor::ProcessAnImage(char *buf) {
// stopReceiver tcp)
}
}
return fnum;
}
bool DataProcessor::SendToStreamer() {