From 3295d36f469a832e0edd6a7cfd623bdab9a5b4bc Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Mon, 28 Jun 2021 12:17:46 +0200 Subject: [PATCH] wip --- slsReceiverSoftware/src/BinaryFile_copy.cpp | 173 ++ slsReceiverSoftware/src/BinaryFile_copy.h | 58 + slsReceiverSoftware/src/DataProcessor.cpp | 139 +- slsReceiverSoftware/src/DataProcessor.h | 37 - .../src/DataProcessor_copy.cpp | 515 +++++ slsReceiverSoftware/src/DataProcessor_copy.h | 343 ++++ slsReceiverSoftware/src/HDF5File_copy.cpp | 830 ++++++++ slsReceiverSoftware/src/HDF5File_copy.h | 92 + .../src/Implementation_copy.cpp | 1717 +++++++++++++++++ 9 files changed, 3730 insertions(+), 174 deletions(-) create mode 100644 slsReceiverSoftware/src/BinaryFile_copy.cpp create mode 100644 slsReceiverSoftware/src/BinaryFile_copy.h create mode 100644 slsReceiverSoftware/src/DataProcessor_copy.cpp create mode 100644 slsReceiverSoftware/src/DataProcessor_copy.h create mode 100644 slsReceiverSoftware/src/HDF5File_copy.cpp create mode 100644 slsReceiverSoftware/src/HDF5File_copy.h create mode 100644 slsReceiverSoftware/src/Implementation_copy.cpp diff --git a/slsReceiverSoftware/src/BinaryFile_copy.cpp b/slsReceiverSoftware/src/BinaryFile_copy.cpp new file mode 100644 index 000000000..1e58f9910 --- /dev/null +++ b/slsReceiverSoftware/src/BinaryFile_copy.cpp @@ -0,0 +1,173 @@ +/************************************************ + * @file BinaryFile.cpp + * @short sets/gets properties for the binary file, + * creates/closes the file and writes data to it + ***********************************************/ + +#include "BinaryFile.h" +#include "Fifo.h" +#include "MasterAttributes.h" +#include "receiver_defs.h" + +#include +#include +#include + +FILE *BinaryFile::masterfd = nullptr; + +BinaryFile::BinaryFile(int ind, uint32_t *maxf, int *nd, std::string *fname, + std::string *fpath, uint64_t *findex, bool *owenable, + int *dindex, int *nunits, uint64_t *nf, uint32_t *dr, + uint32_t *portno, bool *smode) + : File(ind, BINARY, maxf, nd, fname, fpath, findex, owenable, dindex, + nunits, nf, dr, portno, smode) { +#ifdef VERBOSE + PrintMembers(); +#endif +} + +BinaryFile::~BinaryFile() { CloseAllFiles(); } + +void BinaryFile::PrintMembers(TLogLevel level) { + File::PrintMembers(level); + LOG(logINFO) << "Max Frames Per File: " << *maxFramesPerFile; + LOG(logINFO) << "Number of Frames in File: " << numFramesInFile; +} + +void BinaryFile::CreateFile() { + numFramesInFile = 0; + + std::ostringstream os; + os << *filePath << "/" << *fileNamePrefix << "_d" + << (*detIndex * (*numUnitsPerDetector) + index) << "_f" << subFileIndex + << '_' << *fileIndex << ".raw"; + currentFileName = os.str(); + + if (!(*overWriteEnable)) { + if (nullptr == + (filefd = fopen((const char *)currentFileName.c_str(), "wx"))) { + filefd = nullptr; + throw sls::RuntimeError("Could not create/overwrite file " + + currentFileName); + } + } else if (nullptr == + (filefd = fopen((const char *)currentFileName.c_str(), "w"))) { + filefd = nullptr; + throw sls::RuntimeError("Could not create file " + currentFileName); + } + // setting to no file buffering + setvbuf(filefd, nullptr, _IONBF, 0); + + if (!(*silentMode)) { + LOG(logINFO) << "[" << *udpPortNumber + << "]: Binary File created: " << currentFileName; + } +} + +void BinaryFile::CloseAllFiles() { + CloseCurrentDataFile(); + CloseMasterFile(); +} + +void BinaryFile::CloseMasterFile() { + if (master) { + if (masterfd) + fclose(masterfd); + masterfd = nullptr; + } +} + +void BinaryFile::CloseCurrentDataFile() { + if (filefd) + fclose(filefd); + filefd = nullptr; +} + +int BinaryFile::WriteData(char *buf, int bsize) { + if (!filefd) + return 0; + return fwrite(buf, 1, bsize, filefd); +} + +void BinaryFile::WriteToFile(char *buffer, int buffersize, + uint64_t currentFrameNumber, + uint32_t numPacketsCaught) { + // check if maxframesperfile = 0 for infinite + if ((*maxFramesPerFile) && (numFramesInFile >= (*maxFramesPerFile))) { + CloseCurrentFile(); + ++subFileIndex; + CreateFile(); + } + numFramesInFile++; + + // write to file + int ret = 0; + + // contiguous bitset + if (sizeof(sls_bitset) == sizeof(bitset_storage)) { + ret = WriteData(buffer, buffersize); + } + + // not contiguous bitset + else { + // write detector header + ret = WriteData(buffer, sizeof(sls_detector_header)); + + // get contiguous representation of bit mask + bitset_storage storage; + memset(storage, 0, sizeof(bitset_storage)); + sls_bitset bits = *(sls_bitset *)(buffer + sizeof(sls_detector_header)); + for (int i = 0; i < MAX_NUM_PACKETS; ++i) + storage[i >> 3] |= (bits[i] << (i & 7)); + // write bitmask + ret += WriteData((char *)storage, sizeof(bitset_storage)); + + // write data + ret += WriteData(buffer + sizeof(sls_detector_header), + buffersize - sizeof(sls_receiver_header)); + } + + // if write error + if (ret != buffersize) { + throw sls::RuntimeError(std::to_string(index) + + " : Write to file failed for image number " + + std::to_string(currentFrameNumber)); + } +} + +void BinaryFile::CreateMasterFile(MasterAttributes *attr) { + if (master) { + std::ostringstream os; + os << *filePath << "/" << *fileNamePrefix << "_master" + << "_" << *fileIndex << ".raw"; + masterFileName = os.str(); + if (!(*silentMode)) { + LOG(logINFO) << "Master File: " << masterFileName; + } + + // create master file + if (!(*overWriteEnable)) { + if (nullptr == (masterfd = fopen( + (const char *)masterFileName.c_str(), "wx"))) { + masterfd = nullptr; + throw sls::RuntimeError("Could not create binary master file " + "(without overwrite enable) " + + masterFileName); + } + } else if (nullptr == + (masterfd = + fopen((const char *)masterFileName.c_str(), "w"))) { + masterfd = nullptr; + throw sls::RuntimeError("Could not create binary master file " + "(with overwrite enable) " + + masterFileName); + } + + attr->WriteMasterBinaryAttributes(masterfd); + if (masterfd) + fclose(masterfd); + masterfd = nullptr; + } +} + +void BinaryFile::StartofAcquisition() { numFramesInFile = 0; } diff --git a/slsReceiverSoftware/src/BinaryFile_copy.h b/slsReceiverSoftware/src/BinaryFile_copy.h new file mode 100644 index 000000000..fbd321a56 --- /dev/null +++ b/slsReceiverSoftware/src/BinaryFile_copy.h @@ -0,0 +1,58 @@ +#pragma once +/************************************************ + * @file BinaryFile.h + * @short sets/gets properties for the binary file, + * creates/closes the file and writes data to it + ***********************************************/ +/** + *@short sets/gets properties for the binary file, creates/closes the file and + *writes data to it + */ + +#include "File.h" + +#include + +class BinaryFile : private virtual slsDetectorDefs, public File { + + public: + /** + * Constructor + * creates the File Writer + * @param ind self index + * @param maxf pointer to max frames per file + * @param nd pointer to number of detectors in each dimension + * @param fname pointer to file name prefix + * @param fpath pointer to file path + * @param findex pointer to file index + * @param owenable pointer to over write enable + * @param dindex pointer to detector index + * @param nunits pointer to number of theads/ units per detector + * @param nf pointer to number of images in acquisition + * @param dr pointer to dynamic range + * @param portno pointer to udp port number for logging + * @param smode pointer to silent mode + */ + BinaryFile(int ind, uint32_t *maxf, int *nd, std::string *fname, + std::string *fpath, uint64_t *findex, bool *owenable, + int *dindex, int *nunits, uint64_t *nf, uint32_t *dr, + uint32_t *portno, bool *smode); + ~BinaryFile(); + + void PrintMembers(TLogLevel level = logDEBUG1) override; + void CreateFile() override; + void CreateMasterFile(MasterAttributes *attr) override; + void StartofAcquisition() override; + void CloseAllFiles() override; + void CloseCurrentDataFile() override; + void CloseMasterFile() override; + void WriteToFile(char *buffer, int buffersize, uint64_t currentFrameNumber, + uint32_t numPacketsCaught) override; + + private: + int WriteData(char *buf, int bsize); + + FILE *filefd = nullptr; + static FILE *masterfd; + uint32_t numFramesInFile = 0; +}; \ No newline at end of file diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index b8205f362..6a64ca706 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -37,11 +37,7 @@ DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo *f, bool act, memset((void *)&timerBegin, 0, sizeof(timespec)); } -DataProcessor::~DataProcessor() { - delete file; - delete masterFile; - delete virtualFile; -} +DataProcessor::~DataProcessor() {} /** getters */ @@ -76,138 +72,7 @@ void DataProcessor::RecordFirstIndex(uint64_t fnum) { LOG(logDEBUG1) << index << " First Index:" << firstIndex; } -void DataProcessor::SetGeneralData(GeneralData *g) { - generalData = g; - if (file != nullptr) { - if (file->GetFileType() == HDF5) { - file->SetNumberofPixels(generalData->nPixelsX, - generalData->nPixelsY); - } - } - if (masterFile != nullptr) { - if (masterFile->GetFileType() == HDF5) { - masterFile->SetNumberofPixels(generalData->nPixelsX, - generalData->nPixelsY); - } - } - if (virtualFile != nullptr) { - if (virtualFile->GetFileType() == HDF5) { - virtualFile->SetNumberofPixels(generalData->nPixelsX, - generalData->nPixelsY); - } - } -} - -void DataProcessor::SetupFileWriter(fileFormat ftype, bool fwe, int act, - int depaden, int *nd, uint32_t *maxf, - std::string *fname, std::string *fpath, - uint64_t *findex, bool *owenable, - int *dindex, int *nunits, uint64_t *nf, - uint32_t *dr, uint32_t *portno, - GeneralData *g) { - activated = act; - deactivatedPaddingEnable = depaden; - if (g != nullptr) - generalData = g; - - // close existing file objects - if (file != nullptr) { - delete file; - file = nullptr; - } - if (masterFile != nullptr) { - delete masterFile; - masterFile = nullptr; - } - if (virtualFile != nullptr) { - delete virtualFile; - virtualFile = nullptr; - } - // skip data file writing for deactivated non padded parts - bool skipDataFileWriting = false; - if (myDetectorType == EIGER && !activated && !deactivatedPaddingEnable) { - skipDataFileWriting = true; - } - - // create file objects - if (fwe) { - switch (fileFormatType) { -#ifdef HDF5C - case HDF5: - // data file - if (!skipDataFileWriting) { - file = new HDF5File(index, maxf, nd, fname, fpath, findex, - owenable, dindex, nunits, nf, dr, portno, - generalData->nPixelsX, - generalData->nPixelsY, silentMode); - } - // master file - if ((index == 0) && (*dindex == 0)) { - masterFile = new HDF5File(index, maxf, nd, fname, fpath, findex, - owenable, dindex, nunits, nf, dr, - portno, generalData->nPixelsX, - generalData->nPixelsY, silentMode); - virtualFile = new HDF5File(index, maxf, nd, fname, fpath, - findex, owenable, dindex, nunits, nf, - dr, portno, generalData->nPixelsX, - generalData->nPixelsY, silentMode); - } - break; -#endif - default: - // data file - if (!skipDataFileWriting) { - file = new BinaryFile(index, maxf, nd, fname, fpath, findex, - owenable, dindex, nunits, nf, dr, portno, - silentMode); - } - // master file - if ((index == 0) && (*dindex == 0)) { - masterFile = new BinaryFile(index, maxf, nd, fname, fpath, - findex, owenable, dindex, nunits, - nf, dr, portno, silentMode); - } - break; - } - } -} - -void DataProcessor::CreateMasterFile(MasterAttributes *attr) { - if (masterFile == nullptr) { - throw sls::RuntimeError("master file object not contstructed"); - } - masterFile->CloseMasterFile(); - masterFile->CreateMasterFile(attr); -} - -void DataProcessor::CreateFirstDataFile() { - if (file == nullptr) { - throw sls::RuntimeError("file object not contstructed"); - } - file->CloseCurrentDataFile(); - file->resetSubFileIndex(); - file->StartofAcquisition(); - // do not create file if deactivated and no padding - if (myDetectorType == EIGER && !activated && !deactivatedPaddingEnable) { - return; - } - file->CreateFile(); -} - -void DataProcessor::CloseFiles() { - if (file != nullptr) - file->CloseAllFiles(); -} - -void DataProcessor::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) { - if ((file != nullptr) && file->GetFileType() == HDF5) { - try { - file->EndofAcquisition(anyPacketsCaught, numf); - } catch (const sls::RuntimeError &e) { - ; // ignore for now //TODO: send error to client via stop receiver - } - } -} +void DataProcessor::SetGeneralData(GeneralData *g) { generalData = g; } void DataProcessor::ThreadExecution() { char *buffer = nullptr; diff --git a/slsReceiverSoftware/src/DataProcessor.h b/slsReceiverSoftware/src/DataProcessor.h index 944931c88..c49c312fb 100644 --- a/slsReceiverSoftware/src/DataProcessor.h +++ b/slsReceiverSoftware/src/DataProcessor.h @@ -119,34 +119,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { * @param portno pointer to udp port number * @param g address of GeneralData (Detector Data) pointer */ - void SetupFileWriter(fileFormat ftype, bool fwe, int act, int depaden, - int *nd, uint32_t *maxf, std::string *fname, - std::string *fpath, uint64_t *findex, bool *owenable, - int *dindex, int *nunits, uint64_t *nf, uint32_t *dr, - uint32_t *portno, GeneralData *g = nullptr); - - /** - * Create Master File (also virtual if hdf5) - * @param attr master file attributes - */ - void CreateMasterFile(MasterAttributes *attr); - - /** - * Create First Data File - */ - void CreatFirsteDataFile(); - - /** - * Closes files - */ - void CloseFiles(); - - /** - * End of Acquisition - * @param anyPacketsCaught true if any packets are caught, else false - * @param numf number of images caught - */ - void EndofAcquisition(bool anyPacketsCaught, uint64_t numf); /** * Update pixel dimensions in file writer @@ -252,15 +224,6 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { /** Detector Type */ detectorType myDetectorType; - /** File writer implemented as binary or hdf5 File */ - File *file{nullptr}; - - /** master file */ - File *masterFile{nullptr}; - - /** virtual file (for hdf5) */ - File *virtualFile{nullptr}; - /** Data Stream Enable */ bool *dataStreamEnable; diff --git a/slsReceiverSoftware/src/DataProcessor_copy.cpp b/slsReceiverSoftware/src/DataProcessor_copy.cpp new file mode 100644 index 000000000..b8205f362 --- /dev/null +++ b/slsReceiverSoftware/src/DataProcessor_copy.cpp @@ -0,0 +1,515 @@ +/************************************************ + * @file DataProcessor.cpp + * @short creates data processor thread that + * pulls pointers to memory addresses from fifos + * and processes data stored in them & writes them to file + ***********************************************/ + +#include "DataProcessor.h" +#include "BinaryFile.h" +#include "Fifo.h" +#include "GeneralData.h" +#include "MasterAttributes.h" +#ifdef HDF5C +#include "HDF5File.h" +#endif +#include "DataStreamer.h" +#include "sls/sls_detector_exceptions.h" + +#include +#include +#include + +const std::string DataProcessor::TypeName = "DataProcessor"; + +DataProcessor::DataProcessor(int ind, detectorType dtype, Fifo *f, bool act, + bool depaden, bool *dsEnable, uint32_t *freq, + uint32_t *timer, uint32_t *sfnum, bool *fp, + bool *sm, std::vector *cdl, int *cdo, + int *cad) + : ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype), + dataStreamEnable(dsEnable), activated(act), + deactivatedPaddingEnable(depaden), streamingFrequency(freq), + streamingTimerInMs(timer), streamingStartFnum(sfnum), silentMode(sm), + framePadding(fp), ctbDbitList(cdl), ctbDbitOffset(cdo), + ctbAnalogDataBytes(cad), firstStreamerFrame(false) { + LOG(logDEBUG) << "DataProcessor " << ind << " created"; + memset((void *)&timerBegin, 0, sizeof(timespec)); +} + +DataProcessor::~DataProcessor() { + delete file; + delete masterFile; + delete virtualFile; +} + +/** getters */ + +bool DataProcessor::GetStartedFlag() { return startedFlag; } + +uint64_t DataProcessor::GetNumFramesCaught() { return numFramesCaught; } + +uint64_t DataProcessor::GetCurrentFrameIndex() { return currentFrameIndex; } + +uint64_t DataProcessor::GetProcessedIndex() { + return currentFrameIndex - firstIndex; +} + +void DataProcessor::SetFifo(Fifo *f) { fifo = f; } + +void DataProcessor::ResetParametersforNewAcquisition() { + StopRunning(); + startedFlag = false; + numFramesCaught = 0; + firstIndex = 0; + currentFrameIndex = 0; + firstStreamerFrame = true; +} + +void DataProcessor::RecordFirstIndex(uint64_t fnum) { + // listen to this fnum, later +1 + currentFrameIndex = fnum; + + startedFlag = true; + firstIndex = fnum; + + LOG(logDEBUG1) << index << " First Index:" << firstIndex; +} + +void DataProcessor::SetGeneralData(GeneralData *g) { + generalData = g; + if (file != nullptr) { + if (file->GetFileType() == HDF5) { + file->SetNumberofPixels(generalData->nPixelsX, + generalData->nPixelsY); + } + } + if (masterFile != nullptr) { + if (masterFile->GetFileType() == HDF5) { + masterFile->SetNumberofPixels(generalData->nPixelsX, + generalData->nPixelsY); + } + } + if (virtualFile != nullptr) { + if (virtualFile->GetFileType() == HDF5) { + virtualFile->SetNumberofPixels(generalData->nPixelsX, + generalData->nPixelsY); + } + } +} + +void DataProcessor::SetupFileWriter(fileFormat ftype, bool fwe, int act, + int depaden, int *nd, uint32_t *maxf, + std::string *fname, std::string *fpath, + uint64_t *findex, bool *owenable, + int *dindex, int *nunits, uint64_t *nf, + uint32_t *dr, uint32_t *portno, + GeneralData *g) { + activated = act; + deactivatedPaddingEnable = depaden; + if (g != nullptr) + generalData = g; + + // close existing file objects + if (file != nullptr) { + delete file; + file = nullptr; + } + if (masterFile != nullptr) { + delete masterFile; + masterFile = nullptr; + } + if (virtualFile != nullptr) { + delete virtualFile; + virtualFile = nullptr; + } + // skip data file writing for deactivated non padded parts + bool skipDataFileWriting = false; + if (myDetectorType == EIGER && !activated && !deactivatedPaddingEnable) { + skipDataFileWriting = true; + } + + // create file objects + if (fwe) { + switch (fileFormatType) { +#ifdef HDF5C + case HDF5: + // data file + if (!skipDataFileWriting) { + file = new HDF5File(index, maxf, nd, fname, fpath, findex, + owenable, dindex, nunits, nf, dr, portno, + generalData->nPixelsX, + generalData->nPixelsY, silentMode); + } + // master file + if ((index == 0) && (*dindex == 0)) { + masterFile = new HDF5File(index, maxf, nd, fname, fpath, findex, + owenable, dindex, nunits, nf, dr, + portno, generalData->nPixelsX, + generalData->nPixelsY, silentMode); + virtualFile = new HDF5File(index, maxf, nd, fname, fpath, + findex, owenable, dindex, nunits, nf, + dr, portno, generalData->nPixelsX, + generalData->nPixelsY, silentMode); + } + break; +#endif + default: + // data file + if (!skipDataFileWriting) { + file = new BinaryFile(index, maxf, nd, fname, fpath, findex, + owenable, dindex, nunits, nf, dr, portno, + silentMode); + } + // master file + if ((index == 0) && (*dindex == 0)) { + masterFile = new BinaryFile(index, maxf, nd, fname, fpath, + findex, owenable, dindex, nunits, + nf, dr, portno, silentMode); + } + break; + } + } +} + +void DataProcessor::CreateMasterFile(MasterAttributes *attr) { + if (masterFile == nullptr) { + throw sls::RuntimeError("master file object not contstructed"); + } + masterFile->CloseMasterFile(); + masterFile->CreateMasterFile(attr); +} + +void DataProcessor::CreateFirstDataFile() { + if (file == nullptr) { + throw sls::RuntimeError("file object not contstructed"); + } + file->CloseCurrentDataFile(); + file->resetSubFileIndex(); + file->StartofAcquisition(); + // do not create file if deactivated and no padding + if (myDetectorType == EIGER && !activated && !deactivatedPaddingEnable) { + return; + } + file->CreateFile(); +} + +void DataProcessor::CloseFiles() { + if (file != nullptr) + file->CloseAllFiles(); +} + +void DataProcessor::EndofAcquisition(bool anyPacketsCaught, uint64_t numf) { + if ((file != nullptr) && file->GetFileType() == HDF5) { + try { + file->EndofAcquisition(anyPacketsCaught, numf); + } catch (const sls::RuntimeError &e) { + ; // ignore for now //TODO: send error to client via stop receiver + } + } +} + +void DataProcessor::ThreadExecution() { + char *buffer = nullptr; + fifo->PopAddress(buffer); + LOG(logDEBUG5) << "DataProcessor " << index + << ", " + "pop 0x" + << std::hex << (void *)(buffer) << std::dec << ":" << buffer; + + // check dummy + auto numBytes = (uint32_t)(*((uint32_t *)buffer)); + LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes; + if (numBytes == DUMMY_PACKET_VALUE) { + StopProcessing(buffer); + return; + } + + uint64_t fnum = 0; + try { + fnum = ProcessAnImage(buffer); + } catch (const std::exception &e) { + fifo->FreeAddress(buffer); + return; + } + // stream (if time/freq to stream) or free + if (*dataStreamEnable && SendToStreamer()) { + // if first frame to stream, add frame index to fifo header (might + // not be the first) + if (firstStreamerFrame) { + firstStreamerFrame = false; + (*((uint32_t *)(buffer + FIFO_DATASIZE_NUMBYTES))) = + (uint32_t)(fnum - firstIndex); + } + fifo->PushAddressToStream(buffer); + } else { + fifo->FreeAddress(buffer); + } +} + +void DataProcessor::StopProcessing(char *buf) { + LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy"; + + // stream or free + if (*dataStreamEnable) + fifo->PushAddressToStream(buf); + else + fifo->FreeAddress(buf); + + if (file != nullptr) + file->CloseCurrentFile(); + StopRunning(); + LOG(logDEBUG1) << index << ": Processing Completed"; +} + +uint64_t DataProcessor::ProcessAnImage(char *buf) { + + auto *rheader = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES); + sls_detector_header header = rheader->detHeader; + uint64_t fnum = header.frameNumber; + currentFrameIndex = fnum; + uint32_t nump = header.packetNumber; + if (nump == generalData->packetsPerFrame) { + numFramesCaught++; + } + + LOG(logDEBUG1) << "DataProcessing " << index << ": fnum:" << fnum; + + if (!startedFlag) { + RecordFirstIndex(fnum); + if (*dataStreamEnable) { + // restart timer + clock_gettime(CLOCK_REALTIME, &timerBegin); + timerBegin.tv_sec -= (*streamingTimerInMs) / 1000; + timerBegin.tv_nsec -= ((*streamingTimerInMs) % 1000) * 1000000; + + // to send first image + currentFreqCount = *streamingFrequency - *streamingStartFnum; + } + } + + // frame padding + if (activated && *framePadding && nump < generalData->packetsPerFrame) + PadMissingPackets(buf); + + // deactivated and padding enabled + else if (!activated && deactivatedPaddingEnable) + PadMissingPackets(buf); + + // rearrange ctb digital bits (if ctbDbitlist is not empty) + if (!(*ctbDbitList).empty()) { + RearrangeDbitData(buf); + } + + try { + // normal call back + if (rawDataReadyCallBack != nullptr) { + rawDataReadyCallBack((char *)rheader, + buf + FIFO_HEADER_NUMBYTES + + sizeof(sls_receiver_header), + (uint32_t)(*((uint32_t *)buf)), pRawDataReady); + } + + // call back with modified size + else if (rawDataModifyReadyCallBack != nullptr) { + auto revsize = (uint32_t)(*((uint32_t *)buf)); + rawDataModifyReadyCallBack((char *)rheader, + buf + FIFO_HEADER_NUMBYTES + + sizeof(sls_receiver_header), + revsize, pRawDataReady); + (*((uint32_t *)buf)) = revsize; + } + } catch (const std::exception &e) { + throw sls::RuntimeError("Get Data Callback Error: " + + std::string(e.what())); + } + + // write to file + if (file != nullptr) { + try { + file->WriteToFile( + buf + FIFO_HEADER_NUMBYTES, + sizeof(sls_receiver_header) + + (uint32_t)(*((uint32_t *)buf)), //+ size of data (resizable + // from previous call back + fnum - firstIndex, nump); + } catch (const sls::RuntimeError &e) { + ; // ignore write exception for now (TODO: send error message + // via stopReceiver tcp) + } + } + return fnum; +} + +bool DataProcessor::SendToStreamer() { + // skip + if ((*streamingFrequency) == 0u) { + if (!CheckTimer()) + return false; + } else { + if (!CheckCount()) + return false; + } + return true; +} + +bool DataProcessor::CheckTimer() { + struct timespec end; + clock_gettime(CLOCK_REALTIME, &end); + + LOG(logDEBUG1) << index << " Timer elapsed time:" + << ((end.tv_sec - timerBegin.tv_sec) + + (end.tv_nsec - timerBegin.tv_nsec) / 1000000000.0) + << " seconds"; + // still less than streaming timer, keep waiting + if (((end.tv_sec - timerBegin.tv_sec) + + (end.tv_nsec - timerBegin.tv_nsec) / 1000000000.0) < + ((double)*streamingTimerInMs / 1000.00)) + return false; + + // restart timer + clock_gettime(CLOCK_REALTIME, &timerBegin); + return true; +} + +bool DataProcessor::CheckCount() { + if (currentFreqCount == *streamingFrequency) { + currentFreqCount = 1; + return true; + } + currentFreqCount++; + return false; +} + +void DataProcessor::SetPixelDimension() { + if (file != nullptr) { + if (file->GetFileType() == HDF5) { + file->SetNumberofPixels(generalData->nPixelsX, + generalData->nPixelsY); + } + } +} + +void DataProcessor::registerCallBackRawDataReady(void (*func)(char *, char *, + uint32_t, void *), + void *arg) { + rawDataReadyCallBack = func; + pRawDataReady = arg; +} + +void DataProcessor::registerCallBackRawDataModifyReady( + void (*func)(char *, char *, uint32_t &, void *), void *arg) { + rawDataModifyReadyCallBack = func; + pRawDataReady = arg; +} + +void DataProcessor::PadMissingPackets(char *buf) { + LOG(logDEBUG) << index << ": Padding Missing Packets"; + + uint32_t pperFrame = generalData->packetsPerFrame; + auto *header = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES); + uint32_t nmissing = pperFrame - header->detHeader.packetNumber; + sls_bitset pmask = header->packetsMask; + + uint32_t dsize = generalData->dataSize; + if (myDetectorType == GOTTHARD2 && index != 0) { + dsize = generalData->vetoDataSize; + } + uint32_t fifohsize = generalData->fifoBufferHeaderSize; + uint32_t corrected_dsize = + dsize - ((pperFrame * dsize) - generalData->imageSize); + LOG(logDEBUG1) << "bitmask: " << pmask.to_string(); + + for (unsigned int pnum = 0; pnum < pperFrame; ++pnum) { + + // not missing packet + if (pmask[pnum]) + continue; + + // done with padding, exit loop earlier + if (nmissing == 0u) + break; + + LOG(logDEBUG) << "padding for " << index << " for pnum: " << pnum + << std::endl; + + // missing packet + switch (myDetectorType) { + // for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes + // data + // 2nd packet: 4 bytes fnum, previous 1*2 bytes data + + // 640*2 bytes data !! + case GOTTHARD: + if (pnum == 0u) + memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize - 2); + else + memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize + 2); + break; + case CHIPTESTBOARD: + case MOENCH: + if (pnum == (pperFrame - 1)) + memset(buf + fifohsize + (pnum * dsize), 0xFF, corrected_dsize); + else + memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize); + break; + default: + memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize); + break; + } + --nmissing; + } +} + +/** ctb specific */ +void DataProcessor::RearrangeDbitData(char *buf) { + // TODO! (Erik) Refactor and add tests + int totalSize = (int)(*((uint32_t *)buf)); + int ctbDigitalDataBytes = + totalSize - (*ctbAnalogDataBytes) - (*ctbDbitOffset); + + // no digital data + if (ctbDigitalDataBytes == 0) { + LOG(logWARNING) + << "No digital data for call back, yet dbitlist is not empty."; + return; + } + + const int numSamples = (ctbDigitalDataBytes / sizeof(uint64_t)); + const int digOffset = FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header) + + (*ctbAnalogDataBytes); + + // ceil as numResult8Bits could be decimal + const int numResult8Bits = + ceil((double)(numSamples * (*ctbDbitList).size()) / 8.00); + std::vector result(numResult8Bits); + uint8_t *dest = &result[0]; + + auto *source = (uint64_t *)(buf + digOffset + (*ctbDbitOffset)); + + // loop through digital bit enable vector + int bitoffset = 0; + for (auto bi : (*ctbDbitList)) { + // where numbits * numsamples is not a multiple of 8 + if (bitoffset != 0) { + bitoffset = 0; + ++dest; + } + + // loop through the frame digital data + for (auto ptr = source; ptr < (source + numSamples);) { + // get selected bit from each 8 bit + uint8_t bit = (*ptr++ >> bi) & 1; + *dest |= bit << bitoffset; + ++bitoffset; + // extract destination in 8 bit batches + if (bitoffset == 8) { + bitoffset = 0; + ++dest; + } + } + } + + // copy back to buf and update size + memcpy(buf + digOffset, result.data(), numResult8Bits * sizeof(uint8_t)); + (*((uint32_t *)buf)) = numResult8Bits * sizeof(uint8_t); +} diff --git a/slsReceiverSoftware/src/DataProcessor_copy.h b/slsReceiverSoftware/src/DataProcessor_copy.h new file mode 100644 index 000000000..944931c88 --- /dev/null +++ b/slsReceiverSoftware/src/DataProcessor_copy.h @@ -0,0 +1,343 @@ +#pragma once +/************************************************ + * @file DataProcessor.h + * @short creates data processor thread that + * pulls pointers to memory addresses from fifos + * and processes data stored in them & writes them to file + ***********************************************/ +/** + *@short creates & manages a data processor thread each + */ + +#include "ThreadObject.h" +#include "receiver_defs.h" + +class GeneralData; +class Fifo; +class File; +class DataStreamer; +struct MasterAttributes; + +#include +#include + +class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { + + public: + /** + * Constructor + * Calls Base Class CreateThread(), sets ErrorMask if error and increments + * NumberofDataProcessors + * @param ind self index + * @param dtype detector type + * @param f address of Fifo pointer + * @param act activated + * @param depaden deactivated padding enable + * @param dsEnable pointer to data stream enable + * @param dr pointer to dynamic range + * @param freq pointer to streaming frequency + * @param timer pointer to timer if streaming frequency is random + * @param sfnum pointer to streaming starting fnum + * @param fp pointer to frame padding enable + * @param sm pointer to silent mode + * @param qe pointer to quad Enable + * @param cdl pointer to vector or ctb digital bits enable + * @param cdo pointer to digital bits offset + * @param cad pointer to ctb analog databytes + */ + DataProcessor(int ind, detectorType dtype, Fifo *f, bool act, bool depaden, + bool *dsEnable, uint32_t *freq, uint32_t *timer, + uint32_t *sfnum, bool *fp, bool *sm, std::vector *cdl, + int *cdo, int *cad); + + /** + * Destructor + * Calls Base Class DestroyThread() and decrements NumberofDataProcessors + */ + ~DataProcessor() override; + + //*** getters *** + + /** + * Get acquisition started flag + * @return acquisition started flag + */ + bool GetStartedFlag(); + + /** + * Get Frames Complete Caught + * @return number of frames + */ + uint64_t GetNumFramesCaught(); + + /** + * Gets Actual Current Frame Index (that has not been subtracted from + * firstIndex) thats been processed + * @return -1 if no frames have been caught, else current frame index + */ + uint64_t GetCurrentFrameIndex(); + + /** + * Get Current Frame Index thats been processed + * @return -1 if no frames have been caught, else current frame index + */ + uint64_t GetProcessedIndex(); + + /** + * Set Fifo pointer to the one given + * @param f address of Fifo pointer + */ + void SetFifo(Fifo *f); + + /** + * Reset parameters for new acquisition + */ + void ResetParametersforNewAcquisition(); + + /** + * Set GeneralData pointer to the one given + * @param g address of GeneralData (Detector Data) pointer + */ + void SetGeneralData(GeneralData *g); + + /** + * Set up file writer object and call backs + * @param ftype file format + * @param fwe file write enable + * @param act activated + * @param depad deactivated padding enable + * @param nd pointer to number of detectors in each dimension + * @param maxf pointer to max frames per file + * @param fname pointer to file name prefix + * @param fpath pointer to file path + * @param findex pointer to file index + * @param owenable pointer to over write enable + * @param dindex pointer to detector index + * @param nunits pointer to number of threads/ units per detector + * @param nf pointer to number of images in acquisition + * @param dr pointer to dynamic range + * @param portno pointer to udp port number + * @param g address of GeneralData (Detector Data) pointer + */ + void SetupFileWriter(fileFormat ftype, bool fwe, int act, int depaden, + int *nd, uint32_t *maxf, std::string *fname, + std::string *fpath, uint64_t *findex, bool *owenable, + int *dindex, int *nunits, uint64_t *nf, uint32_t *dr, + uint32_t *portno, GeneralData *g = nullptr); + + /** + * Create Master File (also virtual if hdf5) + * @param attr master file attributes + */ + void CreateMasterFile(MasterAttributes *attr); + + /** + * Create First Data File + */ + void CreatFirsteDataFile(); + + /** + * Closes files + */ + void CloseFiles(); + + /** + * End of Acquisition + * @param anyPacketsCaught true if any packets are caught, else false + * @param numf number of images caught + */ + void EndofAcquisition(bool anyPacketsCaught, uint64_t numf); + + /** + * Update pixel dimensions in file writer + */ + void SetPixelDimension(); + + /** + * Call back for raw data + * args to raw data ready callback are + * sls_receiver_header frame metadata + * dataPointer is the pointer to the data + * dataSize in bytes is the size of the data in bytes. + */ + void registerCallBackRawDataReady(void (*func)(char *, char *, uint32_t, + void *), + void *arg); + + /** + * Call back for raw data (modified) + * args to raw data ready callback are + * sls_receiver_header frame metadata + * dataPointer is the pointer to the data + * revDatasize is the reference of data size in bytes. + * Can be modified to the new size to be written/streamed. (only smaller + * value). + */ + void registerCallBackRawDataModifyReady(void (*func)(char *, char *, + uint32_t &, void *), + void *arg); + + private: + /** + * Record First Index + * @param fnum frame index to record + */ + void RecordFirstIndex(uint64_t fnum); + + /** + * Thread Exeution for DataProcessor Class + * Pop bound addresses, process them, + * write to file if needed & free the address + */ + void ThreadExecution() override; + + /** + * Frees dummy buffer, + * reset running mask by calling StopRunning() + * @param buf address of pointer + */ + void StopProcessing(char *buf); + + /** + * Process an image popped from fifo, + * write to file if fw enabled & update parameters + * @param buf address of pointer + * @returns frame number + */ + uint64_t ProcessAnImage(char *buf); + + /** + * Calls CheckTimer and CheckCount for streaming frequency and timer + * and determines if the current image should be sent to streamer + * @returns true if it should to streamer, else false + */ + bool SendToStreamer(); + + /** + * This function should be called only in random frequency mode + * Checks if timer is done and ready to send to stream + * @returns true if ready to send to stream, else false + */ + bool CheckTimer(); + + /** + * This function should be called only in non random frequency mode + * Checks if count is done and ready to send to stream + * @returns true if ready to send to stream, else false + */ + bool CheckCount(); + + /** + * Pad Missing Packets from the bit mask + * @param buf buffer + */ + void PadMissingPackets(char *buf); + + /** + * Align corresponding digital bits together (CTB only if ctbDbitlist is not + * empty) + */ + void RearrangeDbitData(char *buf); + + /** type of thread */ + static const std::string TypeName; + + /** GeneralData (Detector Data) object */ + const GeneralData *generalData{nullptr}; + + /** Fifo structure */ + Fifo *fifo; + + // individual members + /** Detector Type */ + detectorType myDetectorType; + + /** File writer implemented as binary or hdf5 File */ + File *file{nullptr}; + + /** master file */ + File *masterFile{nullptr}; + + /** virtual file (for hdf5) */ + File *virtualFile{nullptr}; + + /** Data Stream Enable */ + bool *dataStreamEnable; + + /** Activated/Deactivated */ + bool activated; + + /** Deactivated padding enable */ + bool deactivatedPaddingEnable; + + /** Pointer to Streaming frequency, if 0, sending random images with a timer + */ + uint32_t *streamingFrequency; + + /** Pointer to the timer if Streaming frequency is random */ + uint32_t *streamingTimerInMs; + + /** Pointer to streaming starting fnum */ + uint32_t *streamingStartFnum; + + /** Current frequency count */ + uint32_t currentFreqCount{0}; + + /** timer beginning stamp for random streaming */ + struct timespec timerBegin; + + /** Silent Mode */ + bool *silentMode; + + /** frame padding */ + bool *framePadding; + + /** ctb digital bits enable list */ + std::vector *ctbDbitList; + + /** ctb digital bits offset */ + int *ctbDbitOffset; + + /** ctb analog databytes */ + int *ctbAnalogDataBytes; + + // acquisition start + /** Aquisition Started flag */ + std::atomic startedFlag{false}; + + /** Frame Number of First Frame */ + std::atomic firstIndex{0}; + + // for statistics + /** Number of complete frames caught */ + uint64_t numFramesCaught{0}; + + /** Frame Number of latest processed frame number */ + std::atomic currentFrameIndex{0}; + + /** first streamer frame to add frame index in fifo header */ + bool firstStreamerFrame{false}; + + // call back + /** + * Call back for raw data + * args to raw data ready callback are + * sls_receiver_header frame metadata + * dataPointer is the pointer to the data + * dataSize in bytes is the size of the data in bytes. + */ + void (*rawDataReadyCallBack)(char *, char *, uint32_t, void *) = nullptr; + + /** + * Call back for raw data (modified) + * args to raw data ready callback are + * sls_receiver_header frame metadata + * dataPointer is the pointer to the data + * revDatasize is the reference of data size in bytes. Can be modified to + * the new size to be written/streamed. (only smaller value). + */ + void (*rawDataModifyReadyCallBack)(char *, char *, uint32_t &, + void *) = nullptr; + + void *pRawDataReady{nullptr}; +}; diff --git a/slsReceiverSoftware/src/HDF5File_copy.cpp b/slsReceiverSoftware/src/HDF5File_copy.cpp new file mode 100644 index 000000000..c27284d1f --- /dev/null +++ b/slsReceiverSoftware/src/HDF5File_copy.cpp @@ -0,0 +1,830 @@ +/************************************************ + * @file HDF5File.cpp + * @short sets/gets properties for the HDF5 file, + * creates/closes the file and writes data to it + ***********************************************/ +#include "HDF5File.h" +#include "Fifo.h" +#include "MasterAttributes.h" +#include "receiver_defs.h" + +#include +#include +#include //basename +#include + +std::mutex HDF5File::hdf5Lib; + +HDF5File::HDF5File(int ind, uint32_t *maxf, int *nd, std::string *fname, + std::string *fpath, uint64_t *findex, bool *owenable, + int *dindex, int *nunits, uint64_t *nf, uint32_t *dr, + uint32_t *portno, uint32_t nx, uint32_t ny, bool *smode) + : + + File(ind, HDF5, maxf, nd, fname, fpath, findex, owenable, dindex, nunits, + nf, dr, portno, smode), + masterfd(nullptr), virtualfd(0), filefd(nullptr), dataspace(nullptr), + dataset(nullptr), datatype(PredType::STD_U16LE), nPixelsX(nx), + nPixelsY(ny), numFramesInFile(0), numFilesinAcquisition(0), + dataspace_para(nullptr), extNumImages(0) { + PrintMembers(); + dataset_para.clear(); + parameterNames.clear(); + parameterDataTypes.clear(); + + parameterNames = std::vector{ + "frame number", + "exp length or sub exposure time", + "packets caught", + "bunch id", + "timestamp", + "mod id", + "row", + "column", + "reserved", + "debug", + "round robin number", + "detector type", + "detector header version", + "packets caught bit mask", + }; + StrType strdatatype(PredType::C_S1, sizeof(bitset_storage)); + parameterDataTypes = std::vector{ + PredType::STD_U64LE, PredType::STD_U32LE, PredType::STD_U32LE, + PredType::STD_U64LE, PredType::STD_U64LE, PredType::STD_U16LE, + PredType::STD_U16LE, PredType::STD_U16LE, PredType::STD_U16LE, + PredType::STD_U32LE, PredType::STD_U16LE, PredType::STD_U8LE, + PredType::STD_U8LE, strdatatype}; +} + +HDF5File::~HDF5File() { CloseAllFiles(); } + +void HDF5File::SetNumberofPixels(uint32_t nx, uint32_t ny) { + nPixelsX = nx; + nPixelsY = ny; +} + +void HDF5File::CreateFile() { + numFilesinAcquisition++; + numFramesInFile = 0; + + // first time + if (subFileIndex == 0u) { + switch (*dynamicRange) { + case 16: + datatype = PredType::STD_U16LE; + break; + case 32: + datatype = PredType::STD_U32LE; + break; + default: + datatype = PredType::STD_U8LE; + break; + } + } + CreateDataFile(); +} + +void HDF5File::CloseAllFiles() { + CloseCurrentDataFile(); + CloseMasterFile(); +} + +void HDF5File::CloseMasterFile() { + if (master) { + CloseFile(masterfd, true); + // close virtual file + // c code due to only c implementation of H5Pset_virtual available + if (virtualfd != 0) { + if (H5Fclose(virtualfd) < 0) { + LOG(logERROR) << "Could not close virtual HDF5 handles"; + } + virtualfd = 0; + } + } +} + +void HDF5File::CloseCurrentDataFile() { + CloseFile(filefd, false); + for (unsigned int i = 0; i < dataset_para.size(); ++i) + delete dataset_para[i]; + dataset_para.clear(); + if (dataspace_para) { + delete dataspace_para; + dataspace_para = nullptr; + } + if (dataset) { + delete dataset; + dataset = nullptr; + } + if (dataspace) { + delete dataspace; + dataspace = nullptr; + } +} + +void HDF5File::WriteToFile(char *buffer, int bufferSize, + uint64_t currentFrameNumber, + uint32_t numPacketsCaught) { + + // check if maxframesperfile = 0 for infinite + if ((*maxFramesPerFile) && (numFramesInFile >= (*maxFramesPerFile))) { + CloseCurrentFile(); + ++subFileIndex; + CreateFile(); + } + numFramesInFile++; + + // extend dataset (when receiver start followed by many status starts + // (jungfrau))) + if (currentFrameNumber >= extNumImages) { + ExtendDataset(); + } + + WriteDataFile(currentFrameNumber, buffer + sizeof(sls_receiver_header)); + WriteParameterDatasets(currentFrameNumber, (sls_receiver_header *)(buffer)); +} + +void HDF5File::CreateMasterFile(MasterAttributes *attr) { + if (master) { + virtualfd = 0; + CreateMasterDataFile(attr); + } +} + +void HDF5File::StartofAcquisition() { + numFilesinAcquisition = 0; + numFramesInFile = 0; + extNumImages = *numImages; +} + +void HDF5File::EndofAcquisition(bool anyPacketsCaught, + uint64_t numImagesCaught) { + // not created before + if (!virtualfd && anyPacketsCaught) { + // called only by the one maser receiver + if (master && masterfd != nullptr) { + // only one file and one sub image (link current file in master) + if (((numFilesinAcquisition == 1) && (numDetY * numDetX) == 1)) { + // dataset name + std::ostringstream oss; + oss << "/data"; + if ((*numImages > 1)) { + oss << "_f" << std::setfill('0') << std::setw(12) << 0; + } + std::string dsetname = oss.str(); + + LinkVirtualInMaster(currentFileName, dsetname); + } + // create virutal file + else { + CreateVirtualDataFile( + // infinite images in 1 file, then maxfrperfile = + // numImagesCaught + ((*maxFramesPerFile == 0) ? numImagesCaught + 1 + : *maxFramesPerFile), + numImagesCaught + 1); + } + } + } + numFilesinAcquisition = 0; +} + +void HDF5File::CloseFile(H5File *&fd, bool masterFile) { + std::lock_guard lock(HDF5File::hdf5Lib); + try { + Exception::dontPrint(); // to handle errors + if (fd) { + fd->close(); + delete fd; + fd = nullptr; + } + } catch (const Exception &error) { + LOG(logERROR) << "Could not close " << (masterFile ? "master" : "data") + << " HDF5 handles of index " << index; + error.printErrorStack(); + } +} + +void HDF5File::WriteDataFile(uint64_t currentFrameNumber, char *buffer) { + std::lock_guard lock(HDF5File::hdf5Lib); + + uint64_t nDimx = + ((*maxFramesPerFile == 0) ? currentFrameNumber + : currentFrameNumber % (*maxFramesPerFile)); + uint32_t nDimy = nPixelsY; + uint32_t nDimz = ((*dynamicRange == 4) ? (nPixelsX / 2) : nPixelsX); + + hsize_t count[3] = {1, nDimy, nDimz}; + hsize_t start[3] = {nDimx, 0, 0}; + hsize_t dims2[2] = {nDimy, nDimz}; + try { + Exception::dontPrint(); // to handle errors + + dataspace->selectHyperslab(H5S_SELECT_SET, count, start); + DataSpace memspace(2, dims2); + dataset->write(buffer, datatype, memspace, *dataspace); + memspace.close(); + } catch (const Exception &error) { + LOG(logERROR) << "Could not write to file in object " << index; + error.printErrorStack(); + throw sls::RuntimeError("Could not write to file in object " + + std::to_string(index)); + } +} + +void HDF5File::WriteParameterDatasets(uint64_t currentFrameNumber, + sls_receiver_header *rheader) { + std::lock_guard lock(HDF5File::hdf5Lib); + + uint64_t fnum = + ((*maxFramesPerFile == 0) ? currentFrameNumber + : currentFrameNumber % (*maxFramesPerFile)); + + sls_detector_header header = rheader->detHeader; + hsize_t count[1] = {1}; + hsize_t start[1] = {fnum}; + int i = 0; + try { + Exception::dontPrint(); // to handle errors + dataspace_para->selectHyperslab(H5S_SELECT_SET, count, start); + DataSpace memspace(H5S_SCALAR); + dataset_para[0]->write(&header.frameNumber, parameterDataTypes[0], + memspace, *dataspace_para); + i = 1; + dataset_para[1]->write(&header.expLength, parameterDataTypes[1], + memspace, *dataspace_para); + i = 2; + dataset_para[2]->write(&header.packetNumber, parameterDataTypes[2], + memspace, *dataspace_para); + i = 3; + dataset_para[3]->write(&header.bunchId, parameterDataTypes[3], memspace, + *dataspace_para); + i = 4; + dataset_para[4]->write(&header.timestamp, parameterDataTypes[4], + memspace, *dataspace_para); + i = 5; + dataset_para[5]->write(&header.modId, parameterDataTypes[5], memspace, + *dataspace_para); + i = 6; + dataset_para[6]->write(&header.row, parameterDataTypes[6], memspace, + *dataspace_para); + i = 7; + dataset_para[7]->write(&header.column, parameterDataTypes[7], memspace, + *dataspace_para); + i = 8; + dataset_para[8]->write(&header.reserved, parameterDataTypes[8], + memspace, *dataspace_para); + i = 9; + dataset_para[9]->write(&header.debug, parameterDataTypes[9], memspace, + *dataspace_para); + i = 10; + dataset_para[10]->write(&header.roundRNumber, parameterDataTypes[10], + memspace, *dataspace_para); + i = 11; + dataset_para[11]->write(&header.detType, parameterDataTypes[11], + memspace, *dataspace_para); + i = 12; + dataset_para[12]->write(&header.version, parameterDataTypes[12], + memspace, *dataspace_para); + i = 13; + + // contiguous bitset + if (sizeof(sls_bitset) == sizeof(bitset_storage)) { + dataset_para[13]->write((char *)&(rheader->packetsMask), + parameterDataTypes[13], memspace, + *dataspace_para); + } + + // not contiguous bitset + else { + // get contiguous representation of bit mask + bitset_storage storage; + memset(storage, 0, sizeof(bitset_storage)); + sls_bitset bits = rheader->packetsMask; + for (int i = 0; i < MAX_NUM_PACKETS; ++i) + storage[i >> 3] |= (bits[i] << (i & 7)); + // write bitmask + dataset_para[13]->write((char *)storage, parameterDataTypes[13], + memspace, *dataspace_para); + } + i = 14; + } catch (const Exception &error) { + error.printErrorStack(); + throw sls::RuntimeError( + "Could not write parameters (index:" + std::to_string(i) + + ") to file in object " + std::to_string(index)); + } +} + +void HDF5File::ExtendDataset() { + std::lock_guard lock(HDF5File::hdf5Lib); + + try { + Exception::dontPrint(); // to handle errors + + hsize_t dims[3]; + dataspace->getSimpleExtentDims(dims); + dims[0] += *numImages; + + dataset->extend(dims); + delete dataspace; + dataspace = nullptr; + dataspace = new DataSpace(dataset->getSpace()); + + hsize_t dims_para[1] = {dims[0]}; + for (unsigned int i = 0; i < dataset_para.size(); ++i) + dataset_para[i]->extend(dims_para); + delete dataspace_para; + dataspace_para = nullptr; + dataspace_para = new DataSpace(dataset_para[0]->getSpace()); + + } catch (const Exception &error) { + error.printErrorStack(); + throw sls::RuntimeError("Could not extend dataset in object " + + std::to_string(index)); + } + if (!(*silentMode)) { + LOG(logINFO) << index << " Extending HDF5 dataset by " << extNumImages + << ", Total x Dimension: " << (extNumImages + *numImages); + } + extNumImages += *numImages; +} + +void HDF5File::CreateDataFile() { + + std::ostringstream os; + os << *filePath << "/" << *fileNamePrefix << "_d" + << (*detIndex * (*numUnitsPerDetector) + index) << "_f" << subFileIndex + << '_' << *fileIndex << ".h5"; + currentFileName = os.str(); + + std::lock_guard lock(HDF5File::hdf5Lib); + + uint64_t framestosave = + ((*maxFramesPerFile == 0) ? *numImages : // infinite images + (((extNumImages - subFileIndex) > (*maxFramesPerFile)) + ? // save up to maximum at a time + (*maxFramesPerFile) + : (extNumImages - subFileIndex))); + + uint64_t nDimx = framestosave; + uint32_t nDimy = nPixelsY; + uint32_t nDimz = ((*dynamicRange == 4) ? (nPixelsX / 2) : nPixelsX); + + try { + Exception::dontPrint(); // to handle errors + + // file + FileAccPropList fapl; + fapl.setFcloseDegree(H5F_CLOSE_STRONG); + filefd = nullptr; + if (!(*overWriteEnable)) + filefd = new H5File(currentFileName.c_str(), H5F_ACC_EXCL, + FileCreatPropList::DEFAULT, fapl); + else + filefd = new H5File(currentFileName.c_str(), H5F_ACC_TRUNC, + FileCreatPropList::DEFAULT, fapl); + + // attributes - version + double dValue = HDF5_WRITER_VERSION; + DataSpace dataspace_attr = DataSpace(H5S_SCALAR); + Attribute attribute = filefd->createAttribute( + "version", PredType::NATIVE_DOUBLE, dataspace_attr); + attribute.write(PredType::NATIVE_DOUBLE, &dValue); + + // dataspace + hsize_t srcdims[3] = {nDimx, nDimy, nDimz}; + hsize_t srcdimsmax[3] = {H5S_UNLIMITED, nDimy, nDimz}; + dataspace = nullptr; + dataspace = new DataSpace(3, srcdims, srcdimsmax); + + // dataset name + std::ostringstream osfn; + osfn << "/data"; + if (*numImages > 1) + osfn << "_f" << std::setfill('0') << std::setw(12) << subFileIndex; + std::string dsetname = osfn.str(); + + // dataset + // fill value + DSetCreatPropList plist; + int fill_value = -1; + plist.setFillValue(datatype, &fill_value); + // always create chunked dataset as unlimited is only + // supported with chunked layout + hsize_t chunk_dims[3] = {MAX_CHUNKED_IMAGES, nDimy, nDimz}; + plist.setChunk(3, chunk_dims); + dataset = nullptr; + dataset = new DataSet(filefd->createDataSet(dsetname.c_str(), datatype, + *dataspace, plist)); + + // create parameter datasets + hsize_t dims[1] = {nDimx}; + hsize_t dimsmax[1] = {H5S_UNLIMITED}; + dataspace_para = nullptr; + dataspace_para = new DataSpace(1, dims, dimsmax); + + // always create chunked dataset as unlimited is only + // supported with chunked layout + DSetCreatPropList paralist; + hsize_t chunkpara_dims[3] = {MAX_CHUNKED_IMAGES}; + paralist.setChunk(1, chunkpara_dims); + + for (unsigned int i = 0; i < parameterNames.size(); ++i) { + DataSet *ds = new DataSet(filefd->createDataSet( + parameterNames[i].c_str(), parameterDataTypes[i], + *dataspace_para, paralist)); + dataset_para.push_back(ds); + } + } catch (const Exception &error) { + error.printErrorStack(); + if (filefd) { + filefd->close(); + } + throw sls::RuntimeError("Could not create HDF5 handles in object " + + index); + } + if (!(*silentMode)) { + LOG(logINFO) << *udpPortNumber + << ": HDF5 File created: " << currentFileName; + } +} + +void HDF5File::CreateMasterDataFile(MasterAttributes *attr) { + + std::ostringstream os; + os << *filePath << "/" << *fileNamePrefix << "_master" + << "_" << *fileIndex << ".h5"; + masterFileName = os.str(); + + if (!(*silentMode)) { + LOG(logINFO) << "Master File: " << masterFileName; + } + + std::lock_guard lock(HDF5File::hdf5Lib); + + try { + Exception::dontPrint(); // to handle errors + + FileAccPropList flist; + flist.setFcloseDegree(H5F_CLOSE_STRONG); + masterfd = nullptr; + if (!(*overWriteEnable)) + masterfd = new H5File(masterFileName.c_str(), H5F_ACC_EXCL, + FileCreatPropList::DEFAULT, flist); + else + masterfd = new H5File(masterFileName.c_str(), H5F_ACC_TRUNC, + FileCreatPropList::DEFAULT, flist); + + // Create a group in the file + Group group1(masterfd->createGroup("entry")); + Group group2(group1.createGroup("data")); + Group group3(group1.createGroup("instrument")); + Group group4(group3.createGroup("beam")); + Group group5(group3.createGroup("detector")); + Group group6(group1.createGroup("sample")); + + attr->WriteMasterHDF5Attributes(masterfd, &group5); + masterfd->close(); + + } catch (const Exception &error) { + error.printErrorStack(); + if (masterfd) { + masterfd->close(); + } + throw sls::RuntimeError("Could not create master HDF5 handles"); + } +} + +void HDF5File::CreateVirtualDataFile(uint32_t maxFramesPerFile, uint64_t numf) { + + std::ostringstream osfn; + osfn << *filePath << "/" << *fileNamePrefix; + osfn << "_virtual"; + osfn << "_" << *fileIndex; + osfn << ".h5"; + std::string vname = osfn.str(); + + if (!(*silentMode)) { + LOG(logINFO) << "Virtual File: " << vname; + } + + int numDetz = numDetX; + uint32_t nDimy = nPixelsY; + uint32_t nDimz = ((*dynamicRange == 4) ? (nPixelsX / 2) : nPixelsX); + + std::lock_guard lock(HDF5File::hdf5Lib); + + try { + // file + hid_t dfal = H5Pcreate(H5P_FILE_ACCESS); + if (dfal < 0) + throw sls::RuntimeError( + "Could not create file access property for virtual file " + + vname); + if (H5Pset_fclose_degree(dfal, H5F_CLOSE_STRONG) < 0) + throw sls::RuntimeError( + "Could not set strong file close degree for virtual file " + + vname); + virtualfd = H5Fcreate(vname.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, dfal); + if (virtualfd < 0) + throw sls::RuntimeError("Could not create virtual file " + vname); + + // attributes - version + hid_t dataspace_attr = H5Screate(H5S_SCALAR); + if (dataspace_attr < 0) + throw sls::RuntimeError( + "Could not create dataspace for attribute in virtual file " + + vname); + hid_t attrid = H5Acreate2(virtualfd, "version", H5T_NATIVE_DOUBLE, + dataspace_attr, H5P_DEFAULT, H5P_DEFAULT); + if (attrid < 0) + throw sls::RuntimeError( + "Could not create attribute in virtual file " + vname); + double attr_data = HDF5_WRITER_VERSION; + if (H5Awrite(attrid, H5T_NATIVE_DOUBLE, &attr_data) < 0) + throw sls::RuntimeError( + "Could not write attribute in virtual file " + vname); + if (H5Aclose(attrid) < 0) + throw sls::RuntimeError( + "Could not close attribute in virtual file " + vname); + + // virtual dataspace + hsize_t vdsdims[3] = {numf, numDetY * nDimy, numDetz * nDimz}; + hid_t vdsDataspace = H5Screate_simple(3, vdsdims, nullptr); + if (vdsDataspace < 0) + throw sls::RuntimeError( + "Could not create virtual dataspace in virtual file " + vname); + hsize_t vdsdims_para[2] = {numf, (unsigned int)numDetY * numDetz}; + hid_t vdsDataspace_para = H5Screate_simple(2, vdsdims_para, nullptr); + if (vdsDataspace_para < 0) + throw sls::RuntimeError("Could not create virtual dataspace " + "(parameters) in virtual file " + + vname); + + // fill values + hid_t dcpl = H5Pcreate(H5P_DATASET_CREATE); + if (dcpl < 0) + throw sls::RuntimeError( + "Could not create file creation properties in virtual file " + + vname); + int fill_value = -1; + if (H5Pset_fill_value(dcpl, GetDataTypeinC(datatype), &fill_value) < 0) + throw sls::RuntimeError( + "Could not create fill value in virtual file " + vname); + std::vector dcpl_para(parameterNames.size()); + for (unsigned int i = 0; i < parameterNames.size(); ++i) { + dcpl_para[i] = H5Pcreate(H5P_DATASET_CREATE); + if (dcpl_para[i] < 0) + throw sls::RuntimeError( + "Could not create file creation properties (parameters) in " + "virtual file " + + vname); + if (H5Pset_fill_value(dcpl_para[i], + GetDataTypeinC(parameterDataTypes[i]), + &fill_value) < 0) + throw sls::RuntimeError("Could not create fill value " + "(parameters) in virtual file " + + vname); + } + + // hyperslab + int numMajorHyperslab = numf / maxFramesPerFile; + if (numf % maxFramesPerFile) + numMajorHyperslab++; + uint64_t framesSaved = 0; + for (int j = 0; j < numMajorHyperslab; j++) { + + uint64_t nDimx = ((numf - framesSaved) > maxFramesPerFile) + ? maxFramesPerFile + : (numf - framesSaved); + hsize_t offset[3] = {framesSaved, 0, 0}; + hsize_t count[3] = {nDimx, nDimy, nDimz}; + hsize_t offset_para[2] = {framesSaved, 0}; + hsize_t count_para[2] = {nDimx, 1}; + + for (int i = 0; i < numDetY * numDetz; ++i) { + + // setect hyperslabs + if (H5Sselect_hyperslab(vdsDataspace, H5S_SELECT_SET, offset, + nullptr, count, nullptr) < 0) { + throw sls::RuntimeError("Could not select hyperslab"); + } + if (H5Sselect_hyperslab(vdsDataspace_para, H5S_SELECT_SET, + offset_para, nullptr, count_para, + nullptr) < 0) { + throw sls::RuntimeError( + "Could not select hyperslab for parameters"); + } + + // source file name + std::ostringstream os; + os << *filePath << "/" << *fileNamePrefix << "_d" + << (*detIndex * (*numUnitsPerDetector) + i) << "_f" << j + << '_' << *fileIndex << ".h5"; + std::string srcFileName = os.str(); + + LOG(logDEBUG1) << srcFileName; + // find relative path + std::string relative_srcFileName = srcFileName; + { + size_t i = srcFileName.rfind('/', srcFileName.length()); + if (i != std::string::npos) + relative_srcFileName = (srcFileName.substr( + i + 1, srcFileName.length() - i)); + } + + // source dataset name + std::ostringstream osfn; + osfn << "/data"; + if (*numImages > 1) + osfn << "_f" << std::setfill('0') << std::setw(12) << j; + std::string srcDatasetName = osfn.str(); + + // source dataspace + hsize_t srcdims[3] = {nDimx, nDimy, nDimz}; + hsize_t srcdimsmax[3] = {H5S_UNLIMITED, nDimy, nDimz}; + hid_t srcDataspace = H5Screate_simple(3, srcdims, srcdimsmax); + if (srcDataspace < 0) + throw sls::RuntimeError( + "Could not create source dataspace in virtual file " + + vname); + hsize_t srcdims_para[1] = {nDimx}; + hsize_t srcdimsmax_para[1] = {H5S_UNLIMITED}; + hid_t srcDataspace_para = + H5Screate_simple(1, srcdims_para, srcdimsmax_para); + if (srcDataspace_para < 0) + throw sls::RuntimeError("Could not create source dataspace " + "(parameters) in virtual file " + + vname); + + // mapping + if (H5Pset_virtual(dcpl, vdsDataspace, + relative_srcFileName.c_str(), + srcDatasetName.c_str(), srcDataspace) < 0) { + throw sls::RuntimeError( + "Could not set mapping for paramter 1"); + } + + for (unsigned int k = 0; k < parameterNames.size(); ++k) { + if (H5Pset_virtual(dcpl_para[k], vdsDataspace_para, + relative_srcFileName.c_str(), + parameterNames[k].c_str(), + srcDataspace_para) < 0) { + throw sls::RuntimeError( + "Could not set mapping for paramter " + + std::to_string(k)); + } + } + + // H5Sclose(srcDataspace); + // H5Sclose(srcDataspace_para); + offset[2] += nDimz; + if (offset[2] >= (numDetz * nDimz)) { + offset[2] = 0; + offset[1] += nDimy; + } + offset_para[1]++; + } + framesSaved += nDimx; + } + + // dataset + std::string virtualDatasetName = "data"; + hid_t vdsdataset = H5Dcreate2(virtualfd, virtualDatasetName.c_str(), + GetDataTypeinC(datatype), vdsDataspace, + H5P_DEFAULT, dcpl, H5P_DEFAULT); + if (vdsdataset < 0) + throw sls::RuntimeError( + "Could not create virutal dataset in virtual file " + vname); + + // virtual parameter dataset + for (unsigned int i = 0; i < parameterNames.size(); ++i) { + hid_t vdsdataset_para = H5Dcreate2( + virtualfd, parameterNames[i].c_str(), + GetDataTypeinC(parameterDataTypes[i]), vdsDataspace_para, + H5P_DEFAULT, dcpl_para[i], H5P_DEFAULT); + if (vdsdataset_para < 0) + throw sls::RuntimeError("Could not create virutal dataset " + "(parameters) in virtual file " + + vname); + } + + // close + H5Fclose(virtualfd); + virtualfd = 0; + + // link + LinkVirtualInMaster(vname, virtualDatasetName); + } catch (const sls::RuntimeError &e) { + if (virtualfd > 0) + H5Fclose(virtualfd); + virtualfd = 0; + } +} + +void HDF5File::LinkVirtualInMaster(std::string fname, std::string dsetname) { + + if (fname == currentFileName) { + std::lock_guard lock(HDF5File::hdf5Lib); + } + + char linkname[100]; + hid_t vfd = 0; + + try { + hid_t dfal = H5Pcreate(H5P_FILE_ACCESS); + if (dfal < 0) + throw sls::RuntimeError( + "Could not create file access property for link"); + if (H5Pset_fclose_degree(dfal, H5F_CLOSE_STRONG) < 0) + throw sls::RuntimeError( + "Could not set strong file close degree for link"); + + // open master file + hid_t mfd = H5Fopen(masterFileName.c_str(), H5F_ACC_RDWR, dfal); + if (mfd < 0) + throw sls::RuntimeError("Could not open master file"); + + // open virtual file + vfd = H5Fopen(fname.c_str(), H5F_ACC_RDWR, dfal); + if (vfd < 0) { + H5Fclose(mfd); + mfd = 0; + throw sls::RuntimeError("Could not open virtual file"); + } + + // find relative path + std::string relative_virtualfname = fname; + { + size_t i = fname.rfind('/', fname.length()); + if (i != std::string::npos) + relative_virtualfname = + (fname.substr(i + 1, fname.length() - i)); + } + + //**data dataset** + hid_t vdset = H5Dopen2(vfd, dsetname.c_str(), H5P_DEFAULT); + if (vdset < 0) { + H5Fclose(mfd); + throw sls::RuntimeError("Could not open virtual data dataset"); + } + sprintf(linkname, "/entry/data/%s", dsetname.c_str()); + if (H5Lcreate_external(relative_virtualfname.c_str(), dsetname.c_str(), + mfd, linkname, H5P_DEFAULT, H5P_DEFAULT) < 0) { + H5Fclose(mfd); + mfd = 0; + throw sls::RuntimeError("Could not create link to data dataset"); + } + H5Dclose(vdset); + + //**paramter datasets** + for (unsigned int i = 0; i < parameterNames.size(); ++i) { + hid_t vdset_para = H5Dopen2( + vfd, (std::string(parameterNames[i])).c_str(), H5P_DEFAULT); + if (vdset_para < 0) { + H5Fclose(mfd); + mfd = 0; + throw sls::RuntimeError( + "Could not open virtual parameter dataset to create link"); + } + sprintf(linkname, "/entry/data/%s", + (std::string(parameterNames[i])).c_str()); + + if (H5Lcreate_external(relative_virtualfname.c_str(), + parameterNames[i].c_str(), mfd, linkname, + H5P_DEFAULT, H5P_DEFAULT) < 0) { + H5Fclose(mfd); + mfd = 0; + throw sls::RuntimeError( + "Could not create link to virtual parameter dataset"); + } + } + + H5Fclose(mfd); + mfd = 0; + H5Fclose(vfd); + vfd = 0; + } catch (...) { + if (vfd > 0) + H5Fclose(vfd); + vfd = 0; + } +} + +hid_t HDF5File::GetDataTypeinC(DataType dtype) { + if (dtype == PredType::STD_U8LE) + return H5T_STD_U8LE; + else if (dtype == PredType::STD_U16LE) + return H5T_STD_U16LE; + else if (dtype == PredType::STD_U32LE) + return H5T_STD_U32LE; + else if (dtype == PredType::STD_U64LE) + return H5T_STD_U64LE; + else { + hid_t s = H5Tcopy(H5T_C_S1); + H5Tset_size(s, MAX_NUM_PACKETS); + return s; + } +} \ No newline at end of file diff --git a/slsReceiverSoftware/src/HDF5File_copy.h b/slsReceiverSoftware/src/HDF5File_copy.h new file mode 100644 index 000000000..f10d06370 --- /dev/null +++ b/slsReceiverSoftware/src/HDF5File_copy.h @@ -0,0 +1,92 @@ +#pragma once +/************************************************ + * @file HDF5File.h + * @short sets/gets properties for the HDF5 file, + * creates/closes the file and writes data to it + ***********************************************/ +/** + *@short sets/gets properties for the HDF5 file, creates/closes the file and + *writes data to it + */ + +#include "File.h" + +#include "H5Cpp.h" +#ifndef H5_NO_NAMESPACE +using namespace H5; +#endif +#include + +class HDF5File : private virtual slsDetectorDefs, public File { + + public: + /** + * Constructor + * creates the File Writer + * @param ind self index + * @param maxf pointer to max frames per file + * @param nd pointer to number of detectors in each dimension + * @param fname pointer to file name prefix + * @param fpath pointer to file path + * @param findex pointer to file index + * @param owenable pointer to over write enable + * @param dindex pointer to detector index + * @param nunits pointer to number of theads/ units per detector + * @param nf pointer to number of images in acquisition + * @param dr pointer to dynamic range + * @param portno pointer to udp port number for logging + * @param nx number of pixels in x direction + * @param ny number of pixels in y direction + * @param smode pointer to silent mode + */ + HDF5File(int ind, uint32_t *maxf, int *nd, std::string *fname, + std::string *fpath, uint64_t *findex, bool *owenable, int *dindex, + int *nunits, uint64_t *nf, uint32_t *dr, uint32_t *portno, + uint32_t nx, uint32_t ny, bool *smode); + ~HDF5File(); + void SetNumberofPixels(uint32_t nx, uint32_t ny); + void CreateFile() override; + void CloseAllFiles() override; + void CloseCurrentDataFile() override; + void CloseMasterFile() override; + void WriteToFile(char *buffer, int bufferSize, uint64_t currentFrameNumber, + uint32_t numPacketsCaught) override; + void CreateMasterFile(MasterAttributes *attr) override; + void StartofAcquisition() override; + void EndofAcquisition(bool anyPacketsCaught, uint64_t numImagesCaught); + + private: + void CloseFile(H5File *&fd, bool masterFile); + void WriteDataFile(uint64_t currentFrameNumber, char *buffer); + void WriteParameterDatasets(uint64_t currentFrameNumber, + sls_receiver_header *rheader); + void ExtendDataset(); + void CreateDataFile(); + void CreateMasterDataFile(MasterAttributes *attr); + void CreateVirtualDataFile(uint32_t maxFramesPerFile, uint64_t numf); + void LinkVirtualInMaster(std::string fname, std::string dsetname); + hid_t GetDataTypeinC(DataType dtype); + + static std::mutex hdf5Lib; + + H5File *masterfd; + /** Virtual File handle ( only file name because + code in C as H5Pset_virtual doesnt exist yet in C++) */ + hid_t virtualfd; + H5File *filefd; + DataSpace *dataspace; + DataSet *dataset; + DataType datatype; + + uint32_t nPixelsX; + uint32_t nPixelsY; + uint32_t numFramesInFile; + int numFilesinAcquisition; + + std::vector parameterNames; + std::vector parameterDataTypes; + DataSpace *dataspace_para; + std::vector dataset_para; + + uint64_t extNumImages; +}; diff --git a/slsReceiverSoftware/src/Implementation_copy.cpp b/slsReceiverSoftware/src/Implementation_copy.cpp new file mode 100644 index 000000000..da7d972b1 --- /dev/null +++ b/slsReceiverSoftware/src/Implementation_copy.cpp @@ -0,0 +1,1717 @@ +#include "Implementation.h" +#include "DataProcessor.h" +#include "DataStreamer.h" +#include "Fifo.h" +#include "GeneralData.h" +#include "Listener.h" +#include "MasterAttributes.h" +#include "sls/ToString.h" +#include "sls/ZmqSocket.h" //just for the zmq port define +#include "sls/file_utils.h" + +#include //eperm +#include +#include //system +#include +#include //strcpy +#include +#include +#include // stat +#include +#include + +/** cosntructor & destructor */ + +Implementation::Implementation(const detectorType d) { setDetectorType(d); } + +Implementation::~Implementation() { + delete generalData; + generalData = nullptr; +} + +void Implementation::SetLocalNetworkParameters() { + // to increase Max length of input packet queue + int max_back_log; + const char *proc_file_name = "/proc/sys/net/core/netdev_max_backlog"; + { + std::ifstream proc_file(proc_file_name); + proc_file >> max_back_log; + } + + if (max_back_log < MAX_SOCKET_INPUT_PACKET_QUEUE) { + std::ofstream proc_file(proc_file_name); + if (proc_file.good()) { + proc_file << MAX_SOCKET_INPUT_PACKET_QUEUE << std::endl; + LOG(logINFOBLUE) + << "Max length of input packet queue " + "[/proc/sys/net/core/netdev_max_backlog] modified to " + << MAX_SOCKET_INPUT_PACKET_QUEUE; + } else { + LOG(logWARNING) + << "Could not change max length of " + "input packet queue [net.core.netdev_max_backlog]. (No Root " + "Privileges?)"; + } + } +} + +void Implementation::SetThreadPriorities() { + for (const auto &it : listener) + it->SetThreadPriority(LISTENER_PRIORITY); +} + +void Implementation::SetupFifoStructure() { + fifo.clear(); + for (int i = 0; i < numThreads; ++i) { + uint32_t datasize = generalData->imageSize; + // veto data size + if (myDetectorType == GOTTHARD2 && i != 0) { + datasize = generalData->vetoImageSize; + } + + // create fifo structure + try { + fifo.push_back(sls::make_unique( + i, datasize + (generalData->fifoBufferHeaderSize), fifoDepth)); + } catch (...) { + fifo.clear(); + fifoDepth = 0; + throw sls::RuntimeError( + "Could not allocate memory for fifo structure " + + std::to_string(i) + ". FifoDepth is now 0."); + } + // set the listener & dataprocessor threads to point to the right fifo + if (listener.size()) + listener[i]->SetFifo(fifo[i].get()); + if (dataProcessor.size()) + dataProcessor[i]->SetFifo(fifo[i].get()); + if (dataStreamer.size()) + dataStreamer[i]->SetFifo(fifo[i].get()); + + LOG(logINFO) << "Memory Allocated for Fifo " << i << ": " + << (double)(((size_t)(datasize) + + (size_t)(generalData->fifoBufferHeaderSize)) * + (size_t)fifoDepth) / + (double)(1024 * 1024) + << " MB"; + } + LOG(logINFO) << numThreads << " Fifo structure(s) reconstructed"; +} + +/************************************************** + * * + * Configuration Parameters * + * * + * ************************************************/ + +void Implementation::setDetectorType(const detectorType d) { + myDetectorType = d; + switch (myDetectorType) { + case GOTTHARD: + case EIGER: + case JUNGFRAU: + case CHIPTESTBOARD: + case MOENCH: + case MYTHEN3: + case GOTTHARD2: + LOG(logINFO) << " ***** " << sls::ToString(d) << " Receiver *****"; + break; + default: + throw sls::RuntimeError("This is an unknown receiver type " + + std::to_string(static_cast(d))); + } + + delete generalData; + generalData = nullptr; + + // set detector specific variables + switch (myDetectorType) { + case GOTTHARD: + generalData = new GotthardData(); + break; + case EIGER: + generalData = new EigerData(); + break; + case JUNGFRAU: + generalData = new JungfrauData(); + break; + case CHIPTESTBOARD: + generalData = new ChipTestBoardData(); + break; + case MOENCH: + generalData = new MoenchData(); + break; + case MYTHEN3: + generalData = new Mythen3Data(); + break; + case GOTTHARD2: + generalData = new Gotthard2Data(); + break; + default: + break; + } + numThreads = generalData->threadsPerReceiver; + fifoDepth = generalData->defaultFifoDepth; + udpSocketBufferSize = generalData->defaultUdpSocketBufferSize; + framesPerFile = generalData->maxFramesPerFile; + + SetLocalNetworkParameters(); + SetupFifoStructure(); + + // create threads + for (int i = 0; i < numThreads; ++i) { + + try { + auto fifo_ptr = fifo[i].get(); + listener.push_back(sls::make_unique( + i, myDetectorType, fifo_ptr, &status, &udpPortNum[i], ð[i], + &numberOfTotalFrames, &udpSocketBufferSize, + &actualUDPSocketBufferSize, &framesPerFile, &frameDiscardMode, + &activated, &deactivatedPaddingEnable, &silentMode)); + dataProcessor.push_back(sls::make_unique( + i, myDetectorType, fifo_ptr, activated, + deactivatedPaddingEnable, &dataStreamEnable, + &streamingFrequency, &streamingTimerInMs, &streamingStartFnum, + &framePadding, &silentMode, &ctbDbitList, &ctbDbitOffset, + &ctbAnalogDataBytes)); + } catch (...) { + listener.clear(); + dataProcessor.clear(); + throw sls::RuntimeError( + "Could not create listener/dataprocessor threads (index:" + + std::to_string(i) + ")"); + } + } + + // set up writer and callbacks + for (const auto &it : listener) + it->SetGeneralData(generalData); + for (const auto &it : dataProcessor) + it->SetGeneralData(generalData); + SetThreadPriorities(); + + LOG(logDEBUG) << " Detector type set to " << sls::ToString(d); +} + +int *Implementation::getDetectorSize() const { return (int *)numDet; } + +void Implementation::setDetectorSize(const int *size) { + std::string log_message = "Detector Size (ports): ("; + for (int i = 0; i < MAX_DIMENSIONS; ++i) { + // x dir (colums) each udp port + if (myDetectorType == EIGER && i == X) + numDet[i] = size[i] * 2; + // y dir (rows) each udp port + else if (numUDPInterfaces == 2 && i == Y) + numDet[i] = size[i] * 2; + else + numDet[i] = size[i]; + log_message += std::to_string(numDet[i]); + if (i < MAX_DIMENSIONS - 1) + log_message += ", "; + } + log_message += ")"; + + int nd[2] = {numDet[0], numDet[1]}; + if (quadEnable) { + nd[0] = 1; + nd[1] = 2; + } + for (const auto &it : dataStreamer) { + it->SetNumberofDetectors(nd); + } + + LOG(logINFO) << log_message; +} + +int Implementation::getModulePositionId() const { return modulePos; } + +void Implementation::setModulePositionId(const int id) { + modulePos = id; + LOG(logINFO) << "Module Position Id:" << modulePos; + + // update zmq port + streamingPort = + DEFAULT_ZMQ_RX_PORTNO + (modulePos * (myDetectorType == EIGER ? 2 : 1)); + + for (unsigned int i = 0; i < dataProcessor.size(); ++i) { + dataProcessor[i]->SetupFileWriter( + fileFormatType, fileWriteEnable, masterFileWriteEnable, activated, + deactivatedPaddingEnable, (int *)numDet, &framesPerFile, &fileName, + &filePath, &fileIndex, &overwriteEnable, &modulePos, &numThreads, + &numberOfTotalFrames, &dynamicRange, &udpPortNum[i], generalData); + } + assert(numDet[1] != 0); + for (unsigned int i = 0; i < listener.size(); ++i) { + uint16_t row = 0, col = 0; + row = + (modulePos % numDet[1]) * ((numUDPInterfaces == 2) ? 2 : 1); // row + col = (modulePos / numDet[1]) * ((myDetectorType == EIGER) ? 2 : 1) + + i; // col for horiz. udp ports + listener[i]->SetHardCodedPosition(row, col); + } +} + +std::string Implementation::getDetectorHostname() const { return detHostname; } + +void Implementation::setDetectorHostname(const std::string &c) { + if (!c.empty()) + detHostname = c; + LOG(logINFO) << "Detector Hostname: " << detHostname; +} + +bool Implementation::getSilentMode() const { return silentMode; } + +void Implementation::setSilentMode(const bool i) { + silentMode = i; + LOG(logINFO) << "Silent Mode: " << i; +} + +uint32_t Implementation::getFifoDepth() const { return fifoDepth; } + +void Implementation::setFifoDepth(const uint32_t i) { + if (fifoDepth != i) { + fifoDepth = i; + SetupFifoStructure(); + } + LOG(logINFO) << "Fifo Depth: " << i; +} + +slsDetectorDefs::frameDiscardPolicy +Implementation::getFrameDiscardPolicy() const { + return frameDiscardMode; +} + +void Implementation::setFrameDiscardPolicy(const frameDiscardPolicy i) { + frameDiscardMode = i; + LOG(logINFO) << "Frame Discard Policy: " << sls::ToString(frameDiscardMode); +} + +bool Implementation::getFramePaddingEnable() const { return framePadding; } + +void Implementation::setFramePaddingEnable(const bool i) { + framePadding = i; + LOG(logINFO) << "Frame Padding: " << framePadding; +} + +void Implementation::setThreadIds(const pid_t parentTid, const pid_t tcpTid) { + parentThreadId = parentTid; + tcpThreadId = tcpTid; +} + +std::array Implementation::getThreadIds() const { + std::array retval{}; + int id = 0; + retval[id++] = parentThreadId; + retval[id++] = tcpThreadId; + retval[id++] = listener[0]->GetThreadId(); + retval[id++] = dataProcessor[0]->GetThreadId(); + if (dataStreamEnable) { + retval[id++] = dataStreamer[0]->GetThreadId(); + } else { + retval[id++] = 0; + } + if (numThreads == 2) { + retval[id++] = listener[1]->GetThreadId(); + retval[id++] = dataProcessor[1]->GetThreadId(); + if (dataStreamEnable) { + retval[id++] = dataStreamer[1]->GetThreadId(); + } else { + retval[id++] = 0; + } + } + return retval; +} + +/************************************************** + * * + * File Parameters * + * * + * ************************************************/ +slsDetectorDefs::fileFormat Implementation::getFileFormat() const { + return fileFormatType; +} + +void Implementation::setFileFormat(const fileFormat f) { + if (f != fileFormatType) { + switch (f) { +#ifdef HDF5C + case HDF5: + fileFormatType = HDF5; + break; +#endif + default: + fileFormatType = BINARY; + break; + } + if (fileWriteEnable) { + for (unsigned int i = 0; i < dataProcessor.size(); ++i) { + dataProcessor[i]->SetupFileWriter( + fileFormatType, fileWriteEnable, masterFileWriteEnable, + activated, deactivatedPaddingEnable, (int *)numDet, + &framesPerFile, &fileName, &filePath, &fileIndex, + &overwriteEnable, &modulePos, &numThreads, + &numberOfTotalFrames, &dynamicRange, &udpPortNum[i], + generalData); + } + } + } + + LOG(logINFO) << "File Format: " << sls::ToString(fileFormatType); +} + +std::string Implementation::getFilePath() const { return filePath; } + +void Implementation::setFilePath(const std::string &c) { + if (!c.empty()) { + mkdir_p(c); // throws if it can't create + filePath = c; + } + LOG(logINFO) << "File path: " << filePath; +} + +std::string Implementation::getFileName() const { return fileName; } + +void Implementation::setFileName(const std::string &c) { + fileName = c; + LOG(logINFO) << "File name: " << fileName; +} + +uint64_t Implementation::getFileIndex() const { return fileIndex; } + +void Implementation::setFileIndex(const uint64_t i) { + fileIndex = i; + LOG(logINFO) << "File Index: " << fileIndex; +} + +bool Implementation::getFileWriteEnable() const { return fileWriteEnable; } + +void Implementation::setFileWriteEnable(const bool b) { + if (fileWriteEnable != b) { + fileWriteEnable = b; + for (unsigned int i = 0; i < dataProcessor.size(); ++i) { + dataProcessor[i]->SetupFileWriter( + fileFormatType, fileWriteEnable, masterFileWriteEnable, + activated, deactivatedPaddingEnable, (int *)numDet, + &framesPerFile, &fileName, &filePath, &fileIndex, + &overwriteEnable, &modulePos, &numThreads, &numberOfTotalFrames, + &dynamicRange, &udpPortNum[i], generalData); + } + } + LOG(logINFO) << "File Write Enable: " + << (fileWriteEnable ? "enabled" : "disabled"); +} + +bool Implementation::getMasterFileWriteEnable() const { + return masterFileWriteEnable; +} + +void Implementation::setMasterFileWriteEnable(const bool b) { + if (masterFileWriteEnable != b) { + masterFileWriteEnable = b; + for (unsigned int i = 0; i < dataProcessor.size(); ++i) { + dataProcessor[i]->SetupFileWriter( + fileFormatType, fileWriteEnable, masterFileWriteEnable, + activated, deactivatedPaddingEnable, (int *)numDet, + &framesPerFile, &fileName, &filePath, &fileIndex, + &overwriteEnable, &modulePos, &numThreads, &numberOfTotalFrames, + &dynamicRange, &udpPortNum[i], generalData); + } + } + LOG(logINFO) << "Master File Write Enable: " + << (masterFileWriteEnable ? "enabled" : "disabled"); +} + +bool Implementation::getOverwriteEnable() const { return overwriteEnable; } + +void Implementation::setOverwriteEnable(const bool b) { + overwriteEnable = b; + LOG(logINFO) << "Overwrite Enable: " + << (overwriteEnable ? "enabled" : "disabled"); +} + +uint32_t Implementation::getFramesPerFile() const { return framesPerFile; } + +void Implementation::setFramesPerFile(const uint32_t i) { + framesPerFile = i; + LOG(logINFO) << "Frames per file: " << framesPerFile; +} + +/************************************************** + * * + * Acquisition * + * * + * ************************************************/ +slsDetectorDefs::runStatus Implementation::getStatus() const { return status; } + +uint64_t Implementation::getFramesCaught() const { + uint64_t min = -1; + uint32_t flagsum = 0; + + for (const auto &it : dataProcessor) { + flagsum += it->GetStartedFlag(); + min = std::min(min, it->GetNumFramesCaught()); + } + // no data processed + if (flagsum != dataProcessor.size()) + return 0; + + return min; +} + +uint64_t Implementation::getAcquisitionIndex() const { + uint64_t min = -1; + uint32_t flagsum = 0; + + for (const auto &it : dataProcessor) { + flagsum += it->GetStartedFlag(); + min = std::min(min, it->GetCurrentFrameIndex()); + } + // no data processed + if (flagsum != dataProcessor.size()) + return 0; + return min; +} + +double Implementation::getProgress() const { + // get minimum of processed frame indices + uint64_t currentFrameIndex = -1; + uint32_t flagsum = 0; + + for (const auto &it : dataProcessor) { + flagsum += it->GetStartedFlag(); + currentFrameIndex = + std::min(currentFrameIndex, it->GetProcessedIndex()); + } + // no data processed + if (flagsum != dataProcessor.size()) { + currentFrameIndex = -1; + } + + return (100.00 * + ((double)(currentFrameIndex + 1) / (double)numberOfTotalFrames)); +} + +std::vector Implementation::getNumMissingPackets() const { + std::vector mp(numThreads); + for (int i = 0; i < numThreads; i++) { + int np = generalData->packetsPerFrame; + uint64_t totnp = np; + // partial readout + if (numLinesReadout != MAX_EIGER_ROWS_PER_READOUT) { + totnp = ((numLinesReadout * np) / MAX_EIGER_ROWS_PER_READOUT); + } + totnp *= numberOfTotalFrames; + mp[i] = listener[i]->GetNumMissingPacket(stoppedFlag, totnp); + } + return mp; +} + +void Implementation::setScan(slsDetectorDefs::scanParameters s) { + scanParams = s; + LOG(logINFO) << "Scan parameters: " << sls::ToString(scanParams); +} + +void Implementation::startReceiver() { + LOG(logINFO) << "Starting Receiver"; + stoppedFlag = false; + ResetParametersforNewAcquisition(); + + // listener + CreateUDPSockets(); + + // callbacks + if (startAcquisitionCallBack) { + try { + startAcquisitionCallBack(filePath, fileName, fileIndex, + (generalData->imageSize) + + (generalData->fifoBufferHeaderSize), + pStartAcquisition); + } catch (const std::exception &e) { + throw sls::RuntimeError("Start Acquisition Callback Error: " + + std::string(e.what())); + } + if (rawDataReadyCallBack != nullptr) { + LOG(logINFO) << "Data Write has been defined externally"; + } + } + + // processor->writer + if (fileWriteEnable) { + SetupWriter(); + } else + LOG(logINFO) << "File Write Disabled"; + + LOG(logINFO) << "Ready ..."; + + // status + status = RUNNING; + + // Let Threads continue to be ready for acquisition + StartRunning(); + + LOG(logINFO) << "Receiver Started"; + LOG(logINFO) << "Status: " << sls::ToString(status); +} + +void Implementation::setStoppedFlag(bool stopped) { stoppedFlag = stopped; } + +void Implementation::stopReceiver() { + LOG(logINFO) << "Stopping Receiver"; + + // set status to transmitting + startReadout(); + + // wait for the processes (Listener and DataProcessor) to be done + bool running = true; + while (running) { + running = false; + for (const auto &it : listener) + if (it->IsRunning()) + running = true; + + for (const auto &it : dataProcessor) + if (it->IsRunning()) + running = true; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + + // create virtual file + if (fileWriteEnable && fileFormatType == HDF5) { + uint64_t maxIndexCaught = 0; + bool anycaught = false; + for (const auto &it : dataProcessor) { + maxIndexCaught = std::max(maxIndexCaught, it->GetProcessedIndex()); + if (it->GetStartedFlag()) + anycaught = true; + } + // to create virtual file & set files/acquisition to 0 (only hdf5 at the + // moment) + dataProcessor[0]->EndofAcquisition(anycaught, maxIndexCaught); + } + + // wait for the processes (dataStreamer) to be done + running = true; + while (running) { + running = false; + for (const auto &it : dataStreamer) + if (it->IsRunning()) + running = true; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + + status = RUN_FINISHED; + LOG(logINFO) << "Status: " << sls::ToString(status); + + { // statistics + std::vector mp = getNumMissingPackets(); + uint64_t tot = 0; + for (int i = 0; i < numThreads; i++) { + int nf = dataProcessor[i]->GetNumFramesCaught(); + tot += nf; + std::string mpMessage = std::to_string((int64_t)mp[i]); + if ((int64_t)mp[i] < 0) { + mpMessage = + std::to_string(abs(mp[i])) + std::string(" (Extra)"); + } + + TLogLevel lev = (((int64_t)mp[i]) > 0) ? logINFORED : logINFOGREEN; + LOG(lev) << + // udp port number could be the second if selected interface is + // 2 for jungfrau + "Summary of Port " << udpPortNum[i] + << "\n\tMissing Packets\t\t: " << mpMessage + << "\n\tComplete Frames\t\t: " << nf + << "\n\tLast Frame Caught\t: " + << listener[i]->GetLastFrameIndexCaught(); + } + if (!activated) { + LOG(logINFORED) << "Deactivated Receiver"; + } + // callback + if (acquisitionFinishedCallBack) { + try { + acquisitionFinishedCallBack((tot / numThreads), + pAcquisitionFinished); + } catch (const std::exception &e) { + // change status + status = IDLE; + LOG(logINFO) << "Receiver Stopped"; + LOG(logINFO) << "Status: " << sls::ToString(status); + throw sls::RuntimeError( + "Acquisition Finished Callback Error: " + + std::string(e.what())); + } + } + } + + // change status + status = IDLE; + LOG(logINFO) << "Receiver Stopped"; + LOG(logINFO) << "Status: " << sls::ToString(status); +} + +void Implementation::startReadout() { + if (status == RUNNING) { + // wait for incoming delayed packets + int totalPacketsReceived = 0; + int previousValue = -1; + for (const auto &it : listener) + totalPacketsReceived += it->GetPacketsCaught(); + + // wait for all packets + const int numPacketsToReceive = numberOfTotalFrames * + generalData->packetsPerFrame * + listener.size(); + if (totalPacketsReceived != numPacketsToReceive) { + while (totalPacketsReceived != previousValue) { + LOG(logDEBUG3) + << "waiting for all packets, previousValue:" + << previousValue + << " totalPacketsReceived: " << totalPacketsReceived; + /* TODO! Need to find optimal time **/ + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + previousValue = totalPacketsReceived; + totalPacketsReceived = 0; + for (const auto &it : listener) + totalPacketsReceived += it->GetPacketsCaught(); + + LOG(logDEBUG3) << "\tupdated: totalPacketsReceived:" + << totalPacketsReceived; + } + } + status = TRANSMITTING; + LOG(logINFO) << "Status: Transmitting"; + } + // shut down udp sockets to make listeners push dummy (end) packets for + // processors + shutDownUDPSockets(); +} + +void Implementation::shutDownUDPSockets() { + for (const auto &it : listener) + it->ShutDownUDPSocket(); +} + +void Implementation::closeFiles() { + uint64_t maxIndexCaught = 0; + bool anycaught = false; + for (const auto &it : dataProcessor) { + it->CloseFiles(); + maxIndexCaught = std::max(maxIndexCaught, it->GetProcessedIndex()); + if (it->GetStartedFlag()) + anycaught = true; + } + // to create virtual file & set files/acquisition to 0 (only hdf5 at the + // moment) + dataProcessor[0]->EndofAcquisition(anycaught, maxIndexCaught); +} + +void Implementation::restreamStop() { + for (const auto &it : dataStreamer) + it->RestreamStop(); + LOG(logINFO) << "Restreaming Dummy Header via ZMQ successful"; +} + +void Implementation::ResetParametersforNewAcquisition() { + for (const auto &it : listener) + it->ResetParametersforNewAcquisition(); + for (const auto &it : dataProcessor) + it->ResetParametersforNewAcquisition(); + + if (dataStreamEnable) { + std::ostringstream os; + os << filePath << '/' << fileName; + std::string fnametostream = os.str(); + for (const auto &it : dataStreamer) + it->ResetParametersforNewAcquisition(fnametostream); + } +} + +void Implementation::CreateUDPSockets() { + try { + for (unsigned int i = 0; i < listener.size(); ++i) { + listener[i]->CreateUDPSockets(); + } + } catch (const sls::RuntimeError &e) { + shutDownUDPSockets(); + throw sls::RuntimeError("Could not create UDP Socket(s)."); + } + LOG(logDEBUG) << "UDP socket(s) created successfully."; +} + +void Implementation::SetupWriter() { + // master file + if (masterFileWriteEnable) { + std::unique_ptr masterAttributes; + switch (myDetectorType) { + case GOTTHARD: + masterAttributes = sls::make_unique(); + break; + case JUNGFRAU: + masterAttributes = sls::make_unique(); + break; + case EIGER: + masterAttributes = sls::make_unique(); + break; + case MYTHEN3: + masterAttributes = sls::make_unique(); + break; + case GOTTHARD2: + masterAttributes = sls::make_unique(); + break; + case MOENCH: + masterAttributes = sls::make_unique(); + break; + case CHIPTESTBOARD: + masterAttributes = sls::make_unique(); + break; + default: + throw sls::RuntimeError( + "Unknown detector type to set up master file attributes"); + } + masterAttributes->detType = myDetectorType; + masterAttributes->timingMode = timingMode; + masterAttributes->imageSize = generalData->imageSize; + masterAttributes->nPixels = + xy(generalData->nPixelsX, generalData->nPixelsY); + masterAttributes->maxFramesPerFile = framesPerFile; + masterAttributes->frameDiscardMode = frameDiscardMode; + masterAttributes->framePadding = framePadding; + masterAttributes->scanParams = scanParams; + masterAttributes->totalFrames = numberOfTotalFrames; + masterAttributes->exptime = acquisitionTime; + masterAttributes->period = acquisitionPeriod; + masterAttributes->burstMode = burstMode; + masterAttributes->numUDPInterfaces = numUDPInterfaces; + masterAttributes->dynamicRange = dynamicRange; + masterAttributes->tenGiga = tengigaEnable; + masterAttributes->thresholdEnergyeV = thresholdEnergyeV; + masterAttributes->thresholdAllEnergyeV = thresholdAllEnergyeV; + masterAttributes->subExptime = subExpTime; + masterAttributes->subPeriod = subPeriod; + masterAttributes->quad = quadEnable; + masterAttributes->numLinesReadout = numLinesReadout; + masterAttributes->ratecorr = rateCorrections; + masterAttributes->adcmask = + tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga; + masterAttributes->analog = + (readoutType == ANALOG_ONLY || readoutType == ANALOG_AND_DIGITAL) + ? 1 + : 0; + masterAttributes->analogSamples = numberOfAnalogSamples; + masterAttributes->digital = + (readoutType == DIGITAL_ONLY || readoutType == ANALOG_AND_DIGITAL) + ? 1 + : 0; + masterAttributes->digitalSamples = numberOfDigitalSamples; + masterAttributes->dbitoffset = ctbDbitOffset; + masterAttributes->dbitlist = 0; + for (auto &i : ctbDbitList) { + masterAttributes->dbitlist |= (1 << i); + } + masterAttributes->roi = roi; + masterAttributes->counterMask = counterMask; + masterAttributes->exptime1 = acquisitionTime1; + masterAttributes->exptime2 = acquisitionTime2; + masterAttributes->exptime3 = acquisitionTime3; + masterAttributes->gateDelay1 = gateDelay1; + masterAttributes->gateDelay2 = gateDelay2; + masterAttributes->gateDelay3 = gateDelay3; + masterAttributes->gates = numberOfGates; + masterAttributes->additionalJsonHeader = additionalJsonHeader; + try { + dataProcessor[0]->CreateMasterFile(masterAttributes.get()); + } catch (const sls::RuntimeError &e) { + shutDownUDPSockets(); + closeFiles(); + throw sls::RuntimeError("Could not create master file."); + } + } + + // first data file + //->startofacquisition(which has all the start, and createfirstdatafile) + try { + for (unsigned int i = 0; i < dataProcessor.size(); ++i) { + dataProcessor[i]->CreateFirstDataFile(); + } + } catch (const sls::RuntimeError &e) { + shutDownUDPSockets(); + closeFiles(); + throw sls::RuntimeError("Could not create first data file."); + } +} + +void Implementation::StartRunning() { + + // set running mask and post semaphore to start the inner loop in execution + // thread + for (const auto &it : listener) { + it->StartRunning(); + it->Continue(); + } + for (const auto &it : dataProcessor) { + it->StartRunning(); + it->Continue(); + } + for (const auto &it : dataStreamer) { + it->StartRunning(); + it->Continue(); + } +} + +/************************************************** + * * + * Network Configuration (UDP) * + * * + * ************************************************/ +int Implementation::getNumberofUDPInterfaces() const { + return numUDPInterfaces; +} + +void Implementation::setNumberofUDPInterfaces(const int n) { + + if (numUDPInterfaces != n) { + + // reduce number of detectors in y dir (rows) if it had 2 interfaces + // before + if (numUDPInterfaces == 2) + numDet[Y] /= 2; + + numUDPInterfaces = n; + + // clear all threads and fifos + listener.clear(); + dataProcessor.clear(); + dataStreamer.clear(); + fifo.clear(); + + // set local variables + generalData->SetNumberofInterfaces(n); + numThreads = generalData->threadsPerReceiver; + udpSocketBufferSize = generalData->defaultUdpSocketBufferSize; + + // fifo + SetupFifoStructure(); + + // create threads + for (int i = 0; i < numThreads; ++i) { + // listener and dataprocessor threads + try { + auto fifo_ptr = fifo[i].get(); + listener.push_back(sls::make_unique( + i, myDetectorType, fifo_ptr, &status, &udpPortNum[i], + ð[i], &numberOfTotalFrames, &udpSocketBufferSize, + &actualUDPSocketBufferSize, &framesPerFile, + &frameDiscardMode, &activated, &deactivatedPaddingEnable, + &silentMode)); + listener[i]->SetGeneralData(generalData); + + dataProcessor.push_back(sls::make_unique( + i, myDetectorType, fifo_ptr, activated, + deactivatedPaddingEnable, &dataStreamEnable, + &streamingFrequency, &streamingTimerInMs, + &streamingStartFnum, &framePadding, &silentMode, + &ctbDbitList, &ctbDbitOffset, &ctbAnalogDataBytes)); + dataProcessor[i]->SetGeneralData(generalData); + } catch (...) { + listener.clear(); + dataProcessor.clear(); + throw sls::RuntimeError( + "Could not create listener/dataprocessor threads (index:" + + std::to_string(i) + ")"); + } + // streamer threads + if (dataStreamEnable) { + try { + int fd = flippedDataX; + int nd[2] = {numDet[0], numDet[1]}; + if (quadEnable) { + fd = i; + nd[0] = 1; + nd[1] = 2; + } + dataStreamer.push_back(sls::make_unique( + i, fifo[i].get(), &dynamicRange, &roi, &fileIndex, fd, + (int *)nd, &quadEnable, &numberOfTotalFrames)); + dataStreamer[i]->SetGeneralData(generalData); + dataStreamer[i]->CreateZmqSockets( + &numThreads, streamingPort, streamingSrcIP, + streamingHwm); + dataStreamer[i]->SetAdditionalJsonHeader( + additionalJsonHeader); + + } catch (...) { + if (dataStreamEnable) { + dataStreamer.clear(); + dataStreamEnable = false; + } + throw sls::RuntimeError( + "Could not create datastreamer threads (index:" + + std::to_string(i) + ")"); + } + } + } + + SetThreadPriorities(); + + // update (from 1 to 2 interface) & also for printout + setDetectorSize(numDet); + // update row and column in dataprocessor + setModulePositionId(modulePos); + + // update call backs + if (rawDataReadyCallBack) { + for (const auto &it : dataProcessor) + it->registerCallBackRawDataReady(rawDataReadyCallBack, + pRawDataReady); + } + if (rawDataModifyReadyCallBack) { + for (const auto &it : dataProcessor) + it->registerCallBackRawDataModifyReady( + rawDataModifyReadyCallBack, pRawDataReady); + } + + // test socket buffer size with current set up + setUDPSocketBufferSize(0); + } + + LOG(logINFO) << "Number of Interfaces: " << numUDPInterfaces; +} + +std::string Implementation::getEthernetInterface() const { return eth[0]; } + +void Implementation::setEthernetInterface(const std::string &c) { + eth[0] = c; + LOG(logINFO) << "Ethernet Interface: " << eth[0]; +} + +std::string Implementation::getEthernetInterface2() const { return eth[1]; } + +void Implementation::setEthernetInterface2(const std::string &c) { + eth[1] = c; + LOG(logINFO) << "Ethernet Interface 2: " << eth[1]; +} + +uint32_t Implementation::getUDPPortNumber() const { return udpPortNum[0]; } + +void Implementation::setUDPPortNumber(const uint32_t i) { + udpPortNum[0] = i; + LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum[0]; +} + +uint32_t Implementation::getUDPPortNumber2() const { return udpPortNum[1]; } + +void Implementation::setUDPPortNumber2(const uint32_t i) { + udpPortNum[1] = i; + LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1]; +} + +int Implementation::getUDPSocketBufferSize() const { + return udpSocketBufferSize; +} + +void Implementation::setUDPSocketBufferSize(const int s) { + // custom setup is not 0 (must complain if set up didnt work) + // testing default setup at startup, argument is 0 to use default values + int size = (s == 0) ? udpSocketBufferSize : s; + size_t listSize = listener.size(); + if (myDetectorType == JUNGFRAU && (int)listSize != numUDPInterfaces) { + throw sls::RuntimeError( + "Number of Interfaces " + std::to_string(numUDPInterfaces) + + " do not match listener size " + std::to_string(listSize)); + } + + for (auto &l : listener) { + l->CreateDummySocketForUDPSocketBufferSize(size); + } + // custom and didnt set, throw error + if (s != 0 && udpSocketBufferSize != s) { + throw sls::RuntimeError("Could not set udp socket buffer size. (No " + "CAP_NET_ADMIN privileges?)"); + } +} + +int Implementation::getActualUDPSocketBufferSize() const { + return actualUDPSocketBufferSize; +} + +/************************************************** + * * + * ZMQ Streaming Parameters (ZMQ) * + * * + * ************************************************/ +bool Implementation::getDataStreamEnable() const { return dataStreamEnable; } + +void Implementation::setDataStreamEnable(const bool enable) { + if (dataStreamEnable != enable) { + dataStreamEnable = enable; + + // data sockets have to be created again as the client ones are + dataStreamer.clear(); + + if (enable) { + for (int i = 0; i < numThreads; ++i) { + try { + int fd = flippedDataX; + int nd[2] = {numDet[0], numDet[1]}; + if (quadEnable) { + fd = i; + nd[0] = 1; + nd[1] = 2; + } + dataStreamer.push_back(sls::make_unique( + i, fifo[i].get(), &dynamicRange, &roi, &fileIndex, fd, + (int *)nd, &quadEnable, &numberOfTotalFrames)); + dataStreamer[i]->SetGeneralData(generalData); + dataStreamer[i]->CreateZmqSockets( + &numThreads, streamingPort, streamingSrcIP, + streamingHwm); + dataStreamer[i]->SetAdditionalJsonHeader( + additionalJsonHeader); + } catch (...) { + dataStreamer.clear(); + dataStreamEnable = false; + throw sls::RuntimeError( + "Could not set data stream enable."); + } + } + SetThreadPriorities(); + } + } + LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable; +} + +uint32_t Implementation::getStreamingFrequency() const { + return streamingFrequency; +} + +void Implementation::setStreamingFrequency(const uint32_t freq) { + streamingFrequency = freq; + LOG(logINFO) << "Streaming Frequency: " << streamingFrequency; +} + +uint32_t Implementation::getStreamingTimer() const { + return streamingTimerInMs; +} + +void Implementation::setStreamingTimer(const uint32_t time_in_ms) { + streamingTimerInMs = time_in_ms; + LOG(logINFO) << "Streamer Timer: " << streamingTimerInMs; +} + +uint32_t Implementation::getStreamingStartingFrameNumber() const { + return streamingStartFnum; +} + +void Implementation::setStreamingStartingFrameNumber(const uint32_t fnum) { + streamingStartFnum = fnum; + LOG(logINFO) << "Streaming Start Frame num: " << streamingStartFnum; +} + +uint32_t Implementation::getStreamingPort() const { return streamingPort; } + +void Implementation::setStreamingPort(const uint32_t i) { + streamingPort = i; + LOG(logINFO) << "Streaming Port: " << streamingPort; +} + +sls::IpAddr Implementation::getStreamingSourceIP() const { + return streamingSrcIP; +} + +void Implementation::setStreamingSourceIP(const sls::IpAddr ip) { + streamingSrcIP = ip; + LOG(logINFO) << "Streaming Source IP: " << streamingSrcIP; +} + +int Implementation::getStreamingHwm() const { return streamingHwm; } + +void Implementation::setStreamingHwm(const int i) { + streamingHwm = i; + LOG(logINFO) << "Streaming Hwm: " + << (i == -1 ? "Default (-1)" : std::to_string(streamingHwm)); +} + +std::map +Implementation::getAdditionalJsonHeader() const { + return additionalJsonHeader; +} + +void Implementation::setAdditionalJsonHeader( + const std::map &c) { + + additionalJsonHeader = c; + for (const auto &it : dataStreamer) { + it->SetAdditionalJsonHeader(c); + } + LOG(logINFO) << "Additional JSON Header: " + << sls::ToString(additionalJsonHeader); +} + +std::string +Implementation::getAdditionalJsonParameter(const std::string &key) const { + if (additionalJsonHeader.find(key) != additionalJsonHeader.end()) { + return additionalJsonHeader.at(key); + } + throw sls::RuntimeError("No key " + key + + " found in additional json header"); +} + +void Implementation::setAdditionalJsonParameter(const std::string &key, + const std::string &value) { + auto pos = additionalJsonHeader.find(key); + // if value is empty, delete + if (value.empty()) { + // doesnt exist + if (pos == additionalJsonHeader.end()) { + LOG(logINFO) << "Additional json parameter (" << key + << ") does not exist anyway"; + } else { + LOG(logINFO) << "Deleting additional json parameter (" << key + << ")"; + additionalJsonHeader.erase(pos); + } + } + // if found, set it + else if (pos != additionalJsonHeader.end()) { + additionalJsonHeader[key] = value; + LOG(logINFO) << "Setting additional json parameter (" << key << ") to " + << value; + } + // append if not found + else { + additionalJsonHeader[key] = value; + LOG(logINFO) << "Adding additional json parameter (" << key << ") to " + << value; + } + for (const auto &it : dataStreamer) { + it->SetAdditionalJsonHeader(additionalJsonHeader); + } + LOG(logINFO) << "Additional JSON Header: " + << sls::ToString(additionalJsonHeader); +} + +/************************************************** + * * + * Detector Parameters * + * * + * ************************************************/ +void Implementation::updateTotalNumberOfFrames() { + int64_t repeats = numberOfTriggers; + int64_t numFrames = numberOfFrames; + // gotthard2 + if (myDetectorType == GOTTHARD2) { + // auto + if (timingMode == AUTO_TIMING) { + // burst mode, repeats = #bursts + if (burstMode == BURST_INTERNAL || burstMode == BURST_EXTERNAL) { + repeats = numberOfBursts; + } + // continuous, repeats = 1 (no trigger as well) + else { + repeats = 1; + } + } + // trigger + else { + // continuous, numFrames is limited + if (burstMode == CONTINUOUS_INTERNAL || + burstMode == CONTINUOUS_EXTERNAL) { + numFrames = 1; + } + } + } + numberOfTotalFrames = + numFrames * repeats * (int64_t)(numberOfAdditionalStorageCells + 1); + if (numberOfTotalFrames == 0) { + throw sls::RuntimeError("Invalid total number of frames to receive: 0"); + } + LOG(logINFO) << "Total Number of Frames: " << numberOfTotalFrames; +} + +uint64_t Implementation::getNumberOfFrames() const { return numberOfFrames; } + +void Implementation::setNumberOfFrames(const uint64_t i) { + numberOfFrames = i; + LOG(logINFO) << "Number of Frames: " << numberOfFrames; + updateTotalNumberOfFrames(); +} + +uint64_t Implementation::getNumberOfTriggers() const { + return numberOfTriggers; +} + +void Implementation::setNumberOfTriggers(const uint64_t i) { + numberOfTriggers = i; + LOG(logINFO) << "Number of Triggers: " << numberOfTriggers; + updateTotalNumberOfFrames(); +} + +uint64_t Implementation::getNumberOfBursts() const { return numberOfBursts; } + +void Implementation::setNumberOfBursts(const uint64_t i) { + numberOfBursts = i; + LOG(logINFO) << "Number of Bursts: " << numberOfBursts; + updateTotalNumberOfFrames(); +} + +int Implementation::getNumberOfAdditionalStorageCells() const { + return numberOfAdditionalStorageCells; +} + +void Implementation::setNumberOfAdditionalStorageCells(const int i) { + numberOfAdditionalStorageCells = i; + LOG(logINFO) << "Number of Additional Storage Cells: " + << numberOfAdditionalStorageCells; + updateTotalNumberOfFrames(); +} + +void Implementation::setNumberOfGates(const int i) { + numberOfGates = i; + LOG(logINFO) << "Number of Gates: " << numberOfGates; +} + +slsDetectorDefs::timingMode Implementation::getTimingMode() const { + return timingMode; +} + +void Implementation::setTimingMode(const slsDetectorDefs::timingMode i) { + timingMode = i; + LOG(logINFO) << "Timing Mode: " << timingMode; + updateTotalNumberOfFrames(); +} + +slsDetectorDefs::burstMode Implementation::getBurstMode() const { + return burstMode; +} + +void Implementation::setBurstMode(const slsDetectorDefs::burstMode i) { + burstMode = i; + LOG(logINFO) << "Burst Mode: " << burstMode; + updateTotalNumberOfFrames(); +} + +ns Implementation::getAcquisitionPeriod() const { return acquisitionPeriod; } + +void Implementation::setAcquisitionPeriod(const ns i) { + acquisitionPeriod = i; + LOG(logINFO) << "Acquisition Period: " << sls::ToString(acquisitionPeriod); +} + +ns Implementation::getAcquisitionTime() const { return acquisitionTime; } + +void Implementation::updateAcquisitionTime() { + if (acquisitionTime1 == acquisitionTime2 && + acquisitionTime2 == acquisitionTime3) { + acquisitionTime = acquisitionTime1; + } else { + acquisitionTime = std::chrono::nanoseconds(0); + } +} + +void Implementation::setAcquisitionTime(const ns i) { + acquisitionTime = i; + LOG(logINFO) << "Acquisition Time: " << sls::ToString(acquisitionTime); +} + +void Implementation::setAcquisitionTime1(const ns i) { + acquisitionTime1 = i; + LOG(logINFO) << "Acquisition Time1: " << sls::ToString(acquisitionTime1); + updateAcquisitionTime(); +} + +void Implementation::setAcquisitionTime2(const ns i) { + acquisitionTime2 = i; + LOG(logINFO) << "Acquisition Time2: " << sls::ToString(acquisitionTime2); + updateAcquisitionTime(); +} + +void Implementation::setAcquisitionTime3(const ns i) { + acquisitionTime3 = i; + LOG(logINFO) << "Acquisition Time3: " << sls::ToString(acquisitionTime3); + updateAcquisitionTime(); +} + +void Implementation::setGateDelay1(const ns i) { + gateDelay1 = i; + LOG(logINFO) << "Gate Delay1: " << sls::ToString(gateDelay1); +} + +void Implementation::setGateDelay2(const ns i) { + gateDelay2 = i; + LOG(logINFO) << "Gate Delay2: " << sls::ToString(gateDelay2); +} + +void Implementation::setGateDelay3(const ns i) { + gateDelay3 = i; + LOG(logINFO) << "Gate Delay3: " << sls::ToString(gateDelay3); +} + +ns Implementation::getSubExpTime() const { return subExpTime; } + +void Implementation::setSubExpTime(const ns i) { + subExpTime = i; + LOG(logINFO) << "Sub Exposure Time: " << sls::ToString(subExpTime); +} + +ns Implementation::getSubPeriod() const { return subPeriod; } + +void Implementation::setSubPeriod(const ns i) { + subPeriod = i; + LOG(logINFO) << "Sub Period: " << sls::ToString(subPeriod); +} + +uint32_t Implementation::getNumberofAnalogSamples() const { + return numberOfAnalogSamples; +} + +void Implementation::setNumberofAnalogSamples(const uint32_t i) { + if (numberOfAnalogSamples != i) { + numberOfAnalogSamples = i; + + ctbAnalogDataBytes = generalData->setImageSize( + tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga, + numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, + readoutType); + + for (const auto &it : dataProcessor) + it->SetPixelDimension(); + SetupFifoStructure(); + } + LOG(logINFO) << "Number of Analog Samples: " << numberOfAnalogSamples; + LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); +} + +uint32_t Implementation::getNumberofDigitalSamples() const { + return numberOfDigitalSamples; +} + +void Implementation::setNumberofDigitalSamples(const uint32_t i) { + if (numberOfDigitalSamples != i) { + numberOfDigitalSamples = i; + + ctbAnalogDataBytes = generalData->setImageSize( + tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga, + numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, + readoutType); + + for (const auto &it : dataProcessor) + it->SetPixelDimension(); + SetupFifoStructure(); + } + LOG(logINFO) << "Number of Digital Samples: " << numberOfDigitalSamples; + LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); +} + +uint32_t Implementation::getCounterMask() const { return counterMask; } + +void Implementation::setCounterMask(const uint32_t i) { + if (counterMask != i) { + int ncounters = __builtin_popcount(i); + if (ncounters < 1 || ncounters > 3) { + throw sls::RuntimeError("Invalid number of counters " + + std::to_string(ncounters) + + ". Expected 1-3."); + } + counterMask = i; + generalData->SetNumberofCounters(ncounters, dynamicRange, + tengigaEnable); + // to update npixelsx, npixelsy in file writer + for (const auto &it : dataProcessor) + it->SetPixelDimension(); + SetupFifoStructure(); + } + LOG(logINFO) << "Counter mask: " << sls::ToStringHex(counterMask); + int ncounters = __builtin_popcount(counterMask); + LOG(logINFO) << "Number of counters: " << ncounters; +} + +uint32_t Implementation::getDynamicRange() const { return dynamicRange; } + +void Implementation::setDynamicRange(const uint32_t i) { + if (dynamicRange != i) { + dynamicRange = i; + + if (myDetectorType == EIGER || myDetectorType == MYTHEN3) { + + if (myDetectorType == EIGER) { + generalData->SetDynamicRange(i, tengigaEnable); + } else { + int ncounters = __builtin_popcount(counterMask); + generalData->SetNumberofCounters(ncounters, i, tengigaEnable); + } + + // to update npixelsx, npixelsy in file writer + for (const auto &it : dataProcessor) + it->SetPixelDimension(); + fifoDepth = generalData->defaultFifoDepth; + SetupFifoStructure(); + } + } + LOG(logINFO) << "Dynamic Range: " << dynamicRange; +} + +slsDetectorDefs::ROI Implementation::getROI() const { return roi; } + +void Implementation::setROI(slsDetectorDefs::ROI arg) { + if (roi.xmin != arg.xmin || roi.xmax != arg.xmax) { + roi.xmin = arg.xmin; + roi.xmax = arg.xmax; + + // only for gotthard + generalData->SetROI(arg); + framesPerFile = generalData->maxFramesPerFile; + for (const auto &it : dataProcessor) + it->SetPixelDimension(); + SetupFifoStructure(); + } + + LOG(logINFO) << "ROI: [" << roi.xmin << ", " << roi.xmax << "]"; + LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); +} + +bool Implementation::getTenGigaEnable() const { return tengigaEnable; } + +void Implementation::setTenGigaEnable(const bool b) { + if (tengigaEnable != b) { + tengigaEnable = b; + int ncounters = __builtin_popcount(counterMask); + // side effects + switch (myDetectorType) { + case EIGER: + generalData->SetTenGigaEnable(b, dynamicRange); + break; + case MYTHEN3: + generalData->SetNumberofCounters(ncounters, dynamicRange, b); + break; + case MOENCH: + case CHIPTESTBOARD: + ctbAnalogDataBytes = generalData->setImageSize( + tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga, + numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, + readoutType); + break; + default: + break; + } + SetupFifoStructure(); + } + LOG(logINFO) << "Ten Giga: " << (tengigaEnable ? "enabled" : "disabled"); + LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); +} + +int Implementation::getFlippedDataX() const { return flippedDataX; } + +void Implementation::setFlippedDataX(int enable) { + flippedDataX = (enable == 0) ? 0 : 1; + + if (!quadEnable) { + for (const auto &it : dataStreamer) { + it->SetFlippedDataX(flippedDataX); + } + } else { + if (dataStreamer.size() == 2) { + dataStreamer[0]->SetFlippedDataX(0); + dataStreamer[1]->SetFlippedDataX(1); + } + } + LOG(logINFO) << "Flipped Data X: " << flippedDataX; +} + +bool Implementation::getQuad() const { return quadEnable; } + +void Implementation::setQuad(const bool b) { + if (quadEnable != b) { + quadEnable = b; + + if (!quadEnable) { + for (const auto &it : dataStreamer) { + it->SetNumberofDetectors(numDet); + it->SetFlippedDataX(flippedDataX); + } + } else { + int size[2] = {1, 2}; + for (const auto &it : dataStreamer) { + it->SetNumberofDetectors(size); + } + if (dataStreamer.size() == 2) { + dataStreamer[0]->SetFlippedDataX(0); + dataStreamer[1]->SetFlippedDataX(1); + } + } + } + LOG(logINFO) << "Quad Enable: " << quadEnable; +} + +bool Implementation::getActivate() const { return activated; } + +bool Implementation::setActivate(bool enable) { + if (activated != enable) { + activated = enable; + // disable file writing if deactivated and no padding + for (unsigned int i = 0; i < dataProcessor.size(); ++i) { + dataProcessor[i]->SetupFileWriter( + fileFormatType, fileWriteEnable, masterFileWriteEnable, + activated, deactivatedPaddingEnable, (int *)numDet, + &framesPerFile, &fileName, &filePath, &fileIndex, + &overwriteEnable, &modulePos, &numThreads, &numberOfTotalFrames, + &dynamicRange, &udpPortNum[i], generalData); + } + } + + LOG(logINFO) << "Activation: " << (activated ? "enabled" : "disabled"); + return activated; +} + +bool Implementation::getDeactivatedPadding() const { + return deactivatedPaddingEnable; +} + +void Implementation::setDeactivatedPadding(bool enable) { + if (deactivatedPaddingEnable != enable) { + deactivatedPaddingEnable = enable; + // disable file writing if deactivated and no padding + for (unsigned int i = 0; i < dataProcessor.size(); ++i) { + dataProcessor[i]->SetupFileWriter( + fileFormatType, fileWriteEnable, masterFileWriteEnable, + activated, deactivatedPaddingEnable, (int *)numDet, + &framesPerFile, &fileName, &filePath, &fileIndex, + &overwriteEnable, &modulePos, &numThreads, &numberOfTotalFrames, + &dynamicRange, &udpPortNum[i], generalData); + } + } + LOG(logINFO) << "Deactivated Padding Enable: " + << (deactivatedPaddingEnable ? "enabled" : "disabled"); +} + +int Implementation::getReadNLines() const { return numLinesReadout; } + +void Implementation::setReadNLines(const int value) { + numLinesReadout = value; + LOG(logINFO) << "Number of Lines to readout: " << numLinesReadout; +} + +void Implementation::setThresholdEnergy(const int value) { + thresholdEnergyeV = value; + LOG(logINFO) << "Threshold Energy: " << thresholdEnergyeV << " eV"; +} + +void Implementation::setThresholdEnergy(const std::array value) { + thresholdAllEnergyeV = value; + LOG(logINFO) << "Threshold Energy (eV): " + << sls::ToString(thresholdAllEnergyeV); +} + +void Implementation::setRateCorrections(const std::vector &t) { + rateCorrections = t; + LOG(logINFO) << "Rate Corrections: " << sls::ToString(rateCorrections); +} + +slsDetectorDefs::readoutMode Implementation::getReadoutMode() const { + return readoutType; +} + +void Implementation::setReadoutMode(const readoutMode f) { + if (readoutType != f) { + readoutType = f; + + // side effects + ctbAnalogDataBytes = generalData->setImageSize( + tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga, + numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, + readoutType); + for (const auto &it : dataProcessor) + it->SetPixelDimension(); + SetupFifoStructure(); + } + LOG(logINFO) << "Readout Mode: " << sls::ToString(f); + LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); +} + +uint32_t Implementation::getADCEnableMask() const { + return adcEnableMaskOneGiga; +} + +void Implementation::setADCEnableMask(uint32_t mask) { + if (adcEnableMaskOneGiga != mask) { + adcEnableMaskOneGiga = mask; + ctbAnalogDataBytes = generalData->setImageSize( + tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga, + numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, + readoutType); + + for (const auto &it : dataProcessor) + it->SetPixelDimension(); + SetupFifoStructure(); + } + LOG(logINFO) << "ADC Enable Mask for 1Gb mode: 0x" << std::hex + << adcEnableMaskOneGiga << std::dec; + LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); +} + +uint32_t Implementation::getTenGigaADCEnableMask() const { + return adcEnableMaskTenGiga; +} + +void Implementation::setTenGigaADCEnableMask(uint32_t mask) { + if (adcEnableMaskTenGiga != mask) { + adcEnableMaskTenGiga = mask; + + ctbAnalogDataBytes = generalData->setImageSize( + tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga, + numberOfAnalogSamples, numberOfDigitalSamples, tengigaEnable, + readoutType); + + for (const auto &it : dataProcessor) + it->SetPixelDimension(); + SetupFifoStructure(); + } + LOG(logINFO) << "ADC Enable Mask for 10Gb mode: 0x" << std::hex + << adcEnableMaskTenGiga << std::dec; + LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); +} + +std::vector Implementation::getDbitList() const { return ctbDbitList; } + +void Implementation::setDbitList(const std::vector &v) { ctbDbitList = v; } + +int Implementation::getDbitOffset() const { return ctbDbitOffset; } + +void Implementation::setDbitOffset(const int s) { ctbDbitOffset = s; } + +/************************************************** + * * + * Callbacks * + * * + * ************************************************/ +void Implementation::registerCallBackStartAcquisition( + int (*func)(std::string, std::string, uint64_t, uint32_t, void *), + void *arg) { + startAcquisitionCallBack = func; + pStartAcquisition = arg; +} + +void Implementation::registerCallBackAcquisitionFinished(void (*func)(uint64_t, + void *), + void *arg) { + acquisitionFinishedCallBack = func; + pAcquisitionFinished = arg; +} + +void Implementation::registerCallBackRawDataReady( + void (*func)(char *, char *, uint32_t, void *), void *arg) { + rawDataReadyCallBack = func; + pRawDataReady = arg; + for (const auto &it : dataProcessor) + it->registerCallBackRawDataReady(rawDataReadyCallBack, pRawDataReady); +} + +void Implementation::registerCallBackRawDataModifyReady( + void (*func)(char *, char *, uint32_t &, void *), void *arg) { + rawDataModifyReadyCallBack = func; + pRawDataReady = arg; + for (const auto &it : dataProcessor) + it->registerCallBackRawDataModifyReady(rawDataModifyReadyCallBack, + pRawDataReady); +}