This commit is contained in:
2021-07-01 14:23:33 +02:00
parent 0e7c643cf9
commit 9d8c68b1d0
15 changed files with 370 additions and 320 deletions

View File

@ -23,63 +23,162 @@
#include <cstring>
#include <iostream>
const std::string DataProcessor::TypeName = "DataProcessor";
const std::string DataProcessor::typeName_ = "DataProcessor";
DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo *f, bool act,
bool depaden, bool *dsEnable, uint32_t *freq,
uint32_t *timer, uint32_t *sfnum, bool *fp,
bool *sm, std::vector<int> *cdl, int *cdo,
int *cad)
: ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype),
dataStreamEnable(dsEnable), activated(act),
deactivatedPaddingEnable(depaden), streamingFrequency(freq),
streamingTimerInMs(timer), streamingStartFnum(sfnum), silentMode(sm),
framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo),
ctbAnalogDataBytes(cad), firstStreamerFrame(false) {
LOG(logDEBUG) << "DataProcessor " << ind << " created";
memset((void *)&timerBegin, 0, sizeof(timespec));
DataProcessor::DataProcessor(int index, detectorType detectorType, Fifo *fifo,
bool *activated, bool *deactivatedPaddingEnable,
bool *dataStreamEnable,
uint32_t *streamingFrequency,
uint32_t *streamingTimerInMs,
uint32_t *streamingStartFnum, bool *framePadding,
std::vector<int> *ctbDbitList, int *ctbDbitOffset,
int *ctbAnalogDataBytes, std::mutex *hdf5Lib)
: ThreadObject(index, typeName_), fifo_(fifo), detectorType_(detectorType),
dataStreamEnable_(dataStreamEnable), activated_(activated),
deactivatedPaddingEnable_(deactivatedPaddingEnable),
streamingFrequency_(streamingFrequency),
streamingTimerInMs_(streamingTimerInMs),
streamingStartFnum_(streamingStartFnum), framePadding_(framePadding),
ctbDbitList_(ctbDbitList), ctbDbitOffset_(ctbDbitOffset),
ctbAnalogDataBytes_(ctbAnalogDataBytes), firstStreamerFrame_(false),
hdf5Lib_(hdf5Lib) {
LOG(logDEBUG) << "DataProcessor " << index << " created";
memset((void *)&timerbegin_, 0, sizeof(timespec));
}
DataProcessor::~DataProcessor() {}
DataProcessor::~DataProcessor() { DeleteFiles(); }
/** getters */
bool DataProcessor::GetStartedFlag() { return startedFlag; }
bool DataProcessor::GetStartedFlag() { return startedFlag_; }
uint64_t DataProcessor::GetNumFramesCaught() { return numFramesCaught; }
uint64_t DataProcessor::GetNumFramesCaught() { return numFramesCaught_; }
uint64_t DataProcessor::GetCurrentFrameIndex() { return currentFrameIndex; }
uint64_t DataProcessor::GetCurrentFrameIndex() { return currentFrameIndex_; }
uint64_t DataProcessor::GetProcessedIndex() {
return currentFrameIndex - firstIndex;
return currentFrameIndex_ - firstIndex_;
}
void DataProcessor::SetFifo(Fifo *f) { fifo = f; }
void DataProcessor::SetFifo(Fifo *fifo) { fifo_ = fifo; }
void DataProcessor::ResetParametersforNewAcquisition() {
StopRunning();
startedFlag = false;
numFramesCaught = 0;
firstIndex = 0;
currentFrameIndex = 0;
firstStreamerFrame = true;
startedFlag_ = false;
numFramesCaught_ = 0;
firstIndex_ = 0;
currentFrameIndex_ = 0;
firstStreamerFrame_ = true;
}
void DataProcessor::RecordFirstIndex(uint64_t fnum) {
// listen to this fnum, later +1
currentFrameIndex = fnum;
currentFrameIndex_ = fnum;
startedFlag = true;
firstIndex = fnum;
startedFlag_ = true;
firstIndex_ = fnum;
LOG(logDEBUG1) << index << " First Index:" << firstIndex;
LOG(logDEBUG1) << index << " First Index:" << firstIndex_;
}
void DataProcessor::SetGeneralData(GeneralData *g) { generalData = g; }
void DataProcessor::SetGeneralData(GeneralData *generalData) {
generalData_ = generalData;
}
void DataProcessor::DeleteFiles() {
if (dataFile_ != nullptr) {
delete dataFile_;
dataFile_ = nullptr;
}
if (masterFile_ != nullptr) {
delete masterFile_;
masterFile_ = nullptr;
}
}
void DataProcessor::SetupFileWriter(const bool filewriteEnable,
const bool masterFilewriteEnable,
const fileFormat fileFormatType,
const int modulePos) {
DeleteFiles();
if (filewriteEnable) {
switch (fileFormatType) {
#ifdef HDF5C
case HDF5:
dataFile_ = new HDF5DataFile(index, hdf5Lib_);
if (modulePos == 0 && index == 0) {
if (masterFilewriteEnable) {
masterFile_ = new HDF5MasterFile(hdf5Lib_);
}
// virtual file
}
break;
#endif
case BINARY:
dataFile_ = new BinaryDataFile(index);
if (modulePos == 0 && index == 0 && masterFilewriteEnable) {
masterFile_ = new BinaryMasterFile();
}
break;
default:
throw sls::RuntimeError(
"Unknown file format (compile with hdf5 flags");
}
}
}
void DataProcessor::CreateFirstFiles(
MasterAttributes *attr, const std::string filePath,
const std::string fileNamePrefix, const uint64_t fileIndex,
const bool overWriteEnable, const bool silentMode, const int modulePos,
const int numUnitsPerReadout, const uint32_t udpPortNumber,
const uint32_t maxFramesPerFile, const uint64_t numImages,
const uint32_t nPIxelsX, const uint32_t nPIxelsY,
const uint32_t dynamicRange) {
if (dataFile_ == nullptr) {
throw sls::RuntimeError("file object not contstructed");
}
dataFile_->CloseFile();
/*
#ifdef HDF5C
if (virtualFile_) {
virtualFile_->CloseFile();
}
#endif
*/
// master file write enabled
if (masterFile_) {
masterFile_->CloseFile();
masterFile_->CreateMasterFile(filePath, fileNamePrefix, fileIndex,
overWriteEnable, silentMode, attr);
}
switch (dataFile_->GetFileFormat()) {
#ifdef HDF5C
case HDF5:
dataFile_->CreateFirstHDF5DataFile(
filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile,
numImages, nPIxelsX, nPIxelsY, dynamicRange);
/*if (virtualFile_) {
virtualFile_->CreateFile();
}*/
break;
#endif
case BINARY:
dataFile_->CreateFirstBinaryDataFile(
filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile);
break;
default:
throw sls::RuntimeError("Unknown file format (compile with hdf5 flags");
}
}
void DataProcessor::ThreadExecution() {
char *buffer = nullptr;
fifo->PopAddress(buffer);
fifo_->PopAddress(buffer);
LOG(logDEBUG5) << "DataProcessor " << index
<< ", "
"pop 0x"
@ -97,21 +196,21 @@ void DataProcessor::ThreadExecution() {
try {
fnum = ProcessAnImage(buffer);
} catch (const std::exception &e) {
fifo->FreeAddress(buffer);
fifo_->FreeAddress(buffer);
return;
}
// stream (if time/freq to stream) or free
if (*dataStreamEnable && SendToStreamer()) {
if (*dataStreamEnable_ && SendToStreamer()) {
// if first frame to stream, add frame index to fifo header (might
// not be the first)
if (firstStreamerFrame) {
firstStreamerFrame = false;
if (firstStreamerFrame_) {
firstStreamerFrame_ = false;
(*((uint32_t *)(buffer + FIFO_DATASIZE_NUMBYTES))) =
(uint32_t)(fnum - firstIndex);
(uint32_t)(fnum - firstIndex_);
}
fifo->PushAddressToStream(buffer);
fifo_->PushAddressToStream(buffer);
} else {
fifo->FreeAddress(buffer);
fifo_->FreeAddress(buffer);
}
}
@ -119,10 +218,10 @@ void DataProcessor::StopProcessing(char *buf) {
LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy";
// stream or free
if (*dataStreamEnable)
fifo->PushAddressToStream(buf);
if (*dataStreamEnable_)
fifo_->PushAddressToStream(buf);
else
fifo->FreeAddress(buf);
fifo_->FreeAddress(buf);
StopRunning();
LOG(logDEBUG1) << index << ": Processing Completed";
@ -133,37 +232,37 @@ uint64_t DataProcessor::ProcessAnImage(char *buf) {
auto *rheader = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
sls_detector_header header = rheader->detHeader;
uint64_t fnum = header.frameNumber;
currentFrameIndex = fnum;
currentFrameIndex_ = fnum;
uint32_t nump = header.packetNumber;
if (nump == generalData->packetsPerFrame) {
numFramesCaught++;
if (nump == generalData_->packetsPerFrame) {
numFramesCaught_++;
}
LOG(logDEBUG1) << "DataProcessing " << index << ": fnum:" << fnum;
if (!startedFlag) {
if (!startedFlag_) {
RecordFirstIndex(fnum);
if (*dataStreamEnable) {
if (*dataStreamEnable_) {
// restart timer
clock_gettime(CLOCK_REALTIME, &timerBegin);
timerBegin.tv_sec -= (*streamingTimerInMs) / 1000;
timerBegin.tv_nsec -= ((*streamingTimerInMs) % 1000) * 1000000;
clock_gettime(CLOCK_REALTIME, &timerbegin_);
timerbegin_.tv_sec -= (*streamingTimerInMs_) / 1000;
timerbegin_.tv_nsec -= ((*streamingTimerInMs_) % 1000) * 1000000;
// to send first image
currentFreqCount = *streamingFrequency - *streamingStartFnum;
currentFreqCount_ = *streamingFrequency_ - *streamingStartFnum_;
}
}
// frame padding
if (activated && *framePadding && nump < generalData->packetsPerFrame)
if (*activated_ && *framePadding_ && nump < generalData_->packetsPerFrame)
PadMissingPackets(buf);
// deactivated and padding enabled
else if (!activated && deactivatedPaddingEnable)
else if (!*activated_ && *deactivatedPaddingEnable_)
PadMissingPackets(buf);
// rearrange ctb digital bits (if ctbDbitlist is not empty)
if (!(*ctbDbitList).empty()) {
if (!(*ctbDbitList_).empty()) {
RearrangeDbitData(buf);
}
@ -195,7 +294,7 @@ uint64_t DataProcessor::ProcessAnImage(char *buf) {
bool DataProcessor::SendToStreamer() {
// skip
if ((*streamingFrequency) == 0u) {
if ((*streamingFrequency_) == 0u) {
if (!CheckTimer())
return false;
} else {
@ -210,26 +309,26 @@ bool DataProcessor::CheckTimer() {
clock_gettime(CLOCK_REALTIME, &end);
LOG(logDEBUG1) << index << " Timer elapsed time:"
<< ((end.tv_sec - timerBegin.tv_sec) +
(end.tv_nsec - timerBegin.tv_nsec) / 1000000000.0)
<< ((end.tv_sec - timerbegin_.tv_sec) +
(end.tv_nsec - timerbegin_.tv_nsec) / 1000000000.0)
<< " seconds";
// still less than streaming timer, keep waiting
if (((end.tv_sec - timerBegin.tv_sec) +
(end.tv_nsec - timerBegin.tv_nsec) / 1000000000.0) <
((double)*streamingTimerInMs / 1000.00))
if (((end.tv_sec - timerbegin_.tv_sec) +
(end.tv_nsec - timerbegin_.tv_nsec) / 1000000000.0) <
((double)*streamingTimerInMs_ / 1000.00))
return false;
// restart timer
clock_gettime(CLOCK_REALTIME, &timerBegin);
clock_gettime(CLOCK_REALTIME, &timerbegin_);
return true;
}
bool DataProcessor::CheckCount() {
if (currentFreqCount == *streamingFrequency) {
currentFreqCount = 1;
if (currentFreqCount_ == *streamingFrequency_) {
currentFreqCount_ = 1;
return true;
}
currentFreqCount++;
currentFreqCount_++;
return false;
}
@ -249,18 +348,18 @@ void DataProcessor::registerCallBackRawDataModifyReady(
void DataProcessor::PadMissingPackets(char *buf) {
LOG(logDEBUG) << index << ": Padding Missing Packets";
uint32_t pperFrame = generalData->packetsPerFrame;
uint32_t pperFrame = generalData_->packetsPerFrame;
auto *header = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
uint32_t nmissing = pperFrame - header->detHeader.packetNumber;
sls_bitset pmask = header->packetsMask;
uint32_t dsize = generalData->dataSize;
if (myDetectorType == GOTTHARD2 && index != 0) {
dsize = generalData->vetoDataSize;
uint32_t dsize = generalData_->dataSize;
if (detectorType_ == GOTTHARD2 && index != 0) {
dsize = generalData_->vetoDataSize;
}
uint32_t fifohsize = generalData->fifoBufferHeaderSize;
uint32_t fifohsize = generalData_->fifoBufferHeaderSize;
uint32_t corrected_dsize =
dsize - ((pperFrame * dsize) - generalData->imageSize);
dsize - ((pperFrame * dsize) - generalData_->imageSize);
LOG(logDEBUG1) << "bitmask: " << pmask.to_string();
for (unsigned int pnum = 0; pnum < pperFrame; ++pnum) {
@ -277,7 +376,7 @@ void DataProcessor::PadMissingPackets(char *buf) {
<< std::endl;
// missing packet
switch (myDetectorType) {
switch (detectorType_) {
// for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes
// data
// 2nd packet: 4 bytes fnum, previous 1*2 bytes data +
@ -308,7 +407,7 @@ void DataProcessor::RearrangeDbitData(char *buf) {
// TODO! (Erik) Refactor and add tests
int totalSize = (int)(*((uint32_t *)buf));
int ctbDigitalDataBytes =
totalSize - (*ctbAnalogDataBytes) - (*ctbDbitOffset);
totalSize - (*ctbAnalogDataBytes_) - (*ctbDbitOffset_);
// no digital data
if (ctbDigitalDataBytes == 0) {
@ -319,19 +418,19 @@ void DataProcessor::RearrangeDbitData(char *buf) {
const int numSamples = (ctbDigitalDataBytes / sizeof(uint64_t));
const int digOffset = FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header) +
(*ctbAnalogDataBytes);
(*ctbAnalogDataBytes_);
// ceil as numResult8Bits could be decimal
const int numResult8Bits =
ceil((double)(numSamples * (*ctbDbitList).size()) / 8.00);
ceil((double)(numSamples * (*ctbDbitList_).size()) / 8.00);
std::vector<uint8_t> result(numResult8Bits);
uint8_t *dest = &result[0];
auto *source = (uint64_t *)(buf + digOffset + (*ctbDbitOffset));
auto *source = (uint64_t *)(buf + digOffset + (*ctbDbitOffset_));
// loop through digital bit enable vector
int bitoffset = 0;
for (auto bi : (*ctbDbitList)) {
for (auto bi : (*ctbDbitList_)) {
// where numbits * numsamples is not a multiple of 8
if (bitoffset != 0) {
bitoffset = 0;