From 89e293cb5ab4cc21b357e1ca463c46eee252d19a Mon Sep 17 00:00:00 2001 From: Dhanya Thattil <33750417+thattil@users.noreply.github.com> Date: Fri, 5 Aug 2022 09:08:18 +0200 Subject: [PATCH] Rxpointers (#504) * gui message doesnt show if it has a '>' symbol in error msg * minor refactoring for readability (size_t calc fifo size) * refactoring listening udp socket code: activated and datastream dont create udp sockets anyway, rc<=- should be discarded in any case * wip * refactoring memory structure access * wip: bugfix write header + data to binary * wip * wip * wip * wip * wip * wip * wip * wip * wip * portRoi no roi effecto on progress * fail at receiver progress, wip * segfaults for char pointer in struct * reference to header to get header and data * refactoring * use const defined for size of header of fifo * updated release notes * remove pointer in callback for sls_receiver_header pointer * rx same name arguments in constructors * rx: same name arguments in constructor * rx: removing the '_' suffix in class data members * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * diff undo for clang later * wip * Wip * const string& --- slsReceiverSoftware/src/BinaryDataFile.cpp | 10 +- slsReceiverSoftware/src/BinaryDataFile.h | 9 +- slsReceiverSoftware/src/DataProcessor.cpp | 108 ++++-- slsReceiverSoftware/src/DataProcessor.h | 46 +-- slsReceiverSoftware/src/DataStreamer.cpp | 60 +-- slsReceiverSoftware/src/DataStreamer.h | 23 +- slsReceiverSoftware/src/File.h | 10 +- slsReceiverSoftware/src/GeneralData.h | 54 +-- slsReceiverSoftware/src/HDF5DataFile.cpp | 9 +- slsReceiverSoftware/src/HDF5DataFile.h | 8 +- slsReceiverSoftware/src/Implementation.cpp | 431 ++++++++++----------- slsReceiverSoftware/src/Implementation.h | 17 +- slsReceiverSoftware/src/Listener.cpp | 157 ++++---- slsReceiverSoftware/src/Listener.h | 63 ++- 14 files changed, 491 insertions(+), 514 deletions(-) diff --git a/slsReceiverSoftware/src/BinaryDataFile.cpp b/slsReceiverSoftware/src/BinaryDataFile.cpp index a549819be..e13bbeada 100644 --- a/slsReceiverSoftware/src/BinaryDataFile.cpp +++ b/slsReceiverSoftware/src/BinaryDataFile.cpp @@ -20,22 +20,17 @@ void BinaryDataFile::CloseFile() { } void BinaryDataFile::CreateFirstBinaryDataFile( - const std::string fPath, const std::string fNamePrefix, + const std::string& fNamePrefix, const uint64_t fIndex, const bool ovEnable, const bool sMode, - const int modulePos, const int nUnitsPerReadout, const uint32_t uPortNumber, const uint32_t mFramesPerFile) { subFileIndex = 0; numFramesInFile = 0; - filePath = fPath; fileNamePrefix = fNamePrefix; fileIndex = fIndex; overWriteEnable = ovEnable; - silentMode = sMode; - detIndex = modulePos; - numUnitsPerReadout = nUnitsPerReadout; udpPortNumber = uPortNumber; maxFramesPerFile = mFramesPerFile; @@ -46,8 +41,7 @@ void BinaryDataFile::CreateFile() { numFramesInFile = 0; std::ostringstream os; - os << filePath << "/" << fileNamePrefix << "_d" - << (detIndex * numUnitsPerReadout + index) << "_f" << subFileIndex + os << fileNamePrefix << "_f" << subFileIndex << '_' << fileIndex << ".raw"; fileName = os.str(); diff --git a/slsReceiverSoftware/src/BinaryDataFile.h b/slsReceiverSoftware/src/BinaryDataFile.h index 7b5884e3c..50b002ad7 100644 --- a/slsReceiverSoftware/src/BinaryDataFile.h +++ b/slsReceiverSoftware/src/BinaryDataFile.h @@ -14,12 +14,10 @@ class BinaryDataFile : private virtual slsDetectorDefs, public File { fileFormat GetFileFormat() const override; void CloseFile() override; - void CreateFirstBinaryDataFile(const std::string fPath, - const std::string fNamePrefix, + void CreateFirstBinaryDataFile(const std::string& fNamePrefix, const uint64_t fIndex, const bool ovEnable, - const bool sMode, const int modulePos, - const int nUnitsPerReadout, + const bool sMode, const uint32_t uPortNumber, const uint32_t mFramesPerFile) override; @@ -34,13 +32,10 @@ class BinaryDataFile : private virtual slsDetectorDefs, public File { uint32_t numFramesInFile{0}; uint32_t subFileIndex{0}; - std::string filePath; std::string fileNamePrefix; uint64_t fileIndex{0}; bool overWriteEnable{false}; bool silentMode{false}; - int detIndex{0}; - int numUnitsPerReadout{0}; uint32_t udpPortNumber{0}; uint32_t maxFramesPerFile{0}; }; diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index db558732a..b2afa47b4 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -28,8 +28,8 @@ namespace sls { const std::string DataProcessor::typeName = "DataProcessor"; -DataProcessor::DataProcessor(int index, detectorType detType, Fifo *fifo, bool *dataStreamEnable, uint32_t *streamingFrequency, uint32_t *streamingTimerInMs, uint32_t *streamingStartFnum, bool *framePadding, std::vector *ctbDbitList, int *ctbDbitOffset, int *ctbAnalogDataBytes) - : ThreadObject(index, typeName), fifo(fifo), detType(detType), dataStreamEnable(dataStreamEnable), streamingFrequency(streamingFrequency), streamingTimerInMs(streamingTimerInMs), streamingStartFnum(streamingStartFnum), framePadding(framePadding), ctbDbitList(ctbDbitList), ctbDbitOffset(ctbDbitOffset), ctbAnalogDataBytes(ctbAnalogDataBytes) { +DataProcessor::DataProcessor(int index) + : ThreadObject(index, typeName) { LOG(logDEBUG) << "DataProcessor " << index << " created"; } @@ -38,7 +38,11 @@ DataProcessor::~DataProcessor() { DeleteFiles(); } bool DataProcessor::GetStartedFlag() const { return startedFlag; } -void DataProcessor::SetFifo(Fifo *fifo) { fifo = fifo; } +void DataProcessor::SetFifo(Fifo *f) { fifo = f; } + +void DataProcessor::SetGeneralData(GeneralData *g) { + generalData = g; +} void DataProcessor::SetActivate(bool enable) { activated = enable; } @@ -48,6 +52,30 @@ void DataProcessor::SetReceiverROI(ROI roi) { receiverNoRoi = receiverRoi.noRoi(); } +void DataProcessor::SetDataStreamEnable(bool enable) { dataStreamEnable = enable; } + +void DataProcessor::SetStreamingFrequency(uint32_t value) { + streamingFrequency = value; +} + +void DataProcessor::SetStreamingTimerInMs(uint32_t value) { + streamingTimerInMs = value; +} + +void DataProcessor::SetStreamingStartFnum(uint32_t value) { + streamingStartFnum = value; +} + +void DataProcessor::SetFramePadding(bool enable) { framePadding = enable; } + +void DataProcessor::SetCtbDbitList(std::vector value) { + ctbDbitList = value; +} + +void DataProcessor::SetCtbDbitOffset(int value) { + ctbDbitOffset = value; +} + void DataProcessor::ResetParametersforNewAcquisition() { StopRunning(); startedFlag = false; @@ -67,10 +95,6 @@ void DataProcessor::RecordFirstIndex(uint64_t fnum) { LOG(logDEBUG1) << index << " First Index:" << firstIndex; } -void DataProcessor::SetGeneralData(GeneralData *g) { - generalData = g; -} - void DataProcessor::CloseFiles() { if (dataFile) dataFile->CloseFile(); @@ -103,11 +127,10 @@ void DataProcessor::SetupFileWriter(const bool filewriteEnable, } void DataProcessor::CreateFirstFiles( - const std::string &filePath, const std::string &fileNamePrefix, + const std::string &fileNamePrefix, const uint64_t fileIndex, const bool overWriteEnable, const bool silentMode, - const int modulePos, const int numUnitsPerReadout, - const uint32_t udpPortNumber, const uint32_t maxFramesPerFile, - const uint64_t numImages, const uint32_t dynamicRange, + const uint32_t udpPortNumber, + const uint64_t numImages, const bool detectorDataStream) { if (dataFile == nullptr) { throw RuntimeError("file object not contstructed"); @@ -134,15 +157,15 @@ void DataProcessor::CreateFirstFiles( #ifdef HDF5C case HDF5: dataFile->CreateFirstHDF5DataFile( - filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode, - modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile, - numImages, nx, ny, dynamicRange); + fileNamePrefix, fileIndex, overWriteEnable, silentMode, + udpPortNumber, generalData->framesPerFile, + numImages, nx, ny, generalData->dynamicRange); break; #endif case BINARY: dataFile->CreateFirstBinaryDataFile( - filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode, - modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile); + fileNamePrefix, fileIndex, overWriteEnable, silentMode, + udpPortNumber, generalData->framesPerFile); break; default: throw RuntimeError("Unknown file format (compile with hdf5 flags"); @@ -161,9 +184,9 @@ uint32_t DataProcessor::GetFilesInAcquisition() const { std::string DataProcessor::CreateVirtualFile( const std::string &filePath, const std::string &fileNamePrefix, const uint64_t fileIndex, const bool overWriteEnable, const bool silentMode, - const int modulePos, const int numUnitsPerReadout, - const uint32_t maxFramesPerFile, const uint64_t numImages, - const int numModX, const int numModY, const uint32_t dynamicRange, + const int modulePos, + const uint64_t numImages, + const int numModX, const int numModY, std::mutex *hdf5LibMutex) { if (receiverRoiEnabled) { @@ -171,12 +194,12 @@ std::string DataProcessor::CreateVirtualFile( } bool gotthard25um = - ((detType == GOTTHARD || detType == GOTTHARD2) && + ((generalData->detType == GOTTHARD || generalData->detType == GOTTHARD2) && (numModX * numModY) == 2); - // maxframesperfile = 0 for infinite files + // 0 for infinite files uint32_t framesPerFile = - ((maxFramesPerFile == 0) ? numFramesCaught : maxFramesPerFile); + ((generalData->framesPerFile == 0) ? numFramesCaught : generalData->framesPerFile); // TODO: assumption 1: create virtual file even if no data in other // files (they exist anyway) assumption2: virtual file max frame index @@ -184,8 +207,8 @@ std::string DataProcessor::CreateVirtualFile( // stop acquisition) return masterFileUtility::CreateVirtualHDF5File( filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode, - modulePos, numUnitsPerReadout, framesPerFile, - generalData->nPixelsX, generalData->nPixelsY, dynamicRange, + modulePos, generalData->numUDPInterfaces, framesPerFile, + generalData->nPixelsX, generalData->nPixelsY, generalData->dynamicRange, numFramesCaught, numModX, numModY, dataFile->GetPDataType(), dataFile->GetParameterNames(), dataFile->GetParameterDataTypes(), hdf5LibMutex, gotthard25um); @@ -273,7 +296,7 @@ void DataProcessor::StopProcessing(char *buf) { LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy"; // stream or free - if (*dataStreamEnable) + if (dataStreamEnable) fifo->PushAddressToStream(buf); else fifo->FreeAddress(buf); @@ -292,29 +315,29 @@ void DataProcessor::ProcessAnImage(sls_receiver_header & header, size_t &size, s if (!startedFlag) { RecordFirstIndex(fnum); - if (*dataStreamEnable) { + if (dataStreamEnable) { // restart timer clock_gettime(CLOCK_REALTIME, &timerbegin); - timerbegin.tv_sec -= (*streamingTimerInMs) / 1000; - timerbegin.tv_nsec -= ((*streamingTimerInMs) % 1000) * 1000000; + timerbegin.tv_sec -= streamingTimerInMs / 1000; + timerbegin.tv_nsec -= (streamingTimerInMs % 1000) * 1000000; // to send first image - currentFreqCount = *streamingFrequency - *streamingStartFnum; + currentFreqCount = streamingFrequency - streamingStartFnum; } } // frame padding - if (*framePadding && nump < generalData->packetsPerFrame) + if (framePadding && nump < generalData->packetsPerFrame) PadMissingPackets(header, data); // rearrange ctb digital bits (if ctbDbitlist is not empty) - if (!(*ctbDbitList).empty()) { + if (!ctbDbitList.empty()) { RearrangeDbitData(size, data); } // 'stream Image' check has to be done here before crop image // stream (if time/freq to stream) or free - if (*dataStreamEnable && SendToStreamer()) { + if (dataStreamEnable && SendToStreamer()) { if (firstStreamerFrame) { firstStreamerFrame = false; // write to memory structure of first streamer frame @@ -362,7 +385,7 @@ void DataProcessor::ProcessAnImage(sls_receiver_header & header, size_t &size, s bool DataProcessor::SendToStreamer() { // skip - if ((*streamingFrequency) == 0u) { + if (streamingFrequency == 0u) { if (!CheckTimer()) return false; } else { @@ -378,7 +401,7 @@ bool DataProcessor::CheckTimer() { auto elapsed_s = (end.tv_sec - timerbegin.tv_sec) + (end.tv_nsec - timerbegin.tv_nsec) / 1e9; - double timer_s = *streamingTimerInMs / 1e3; + double timer_s = streamingTimerInMs / 1e3; LOG(logDEBUG1) << index << " Timer elapsed time:" << elapsed_s << " seconds"; @@ -393,7 +416,7 @@ bool DataProcessor::CheckTimer() { } bool DataProcessor::CheckCount() { - if (currentFreqCount == *streamingFrequency) { + if (currentFreqCount == streamingFrequency) { currentFreqCount = 1; return true; } @@ -422,7 +445,7 @@ void DataProcessor::PadMissingPackets(sls_receiver_header header, char* data) { sls_bitset pmask = header.packetsMask; uint32_t dsize = generalData->dataSize; - if (detType == GOTTHARD2 && index != 0) { + if (generalData->detType == GOTTHARD2 && index != 0) { dsize = generalData->vetoDataSize; } uint32_t corrected_dsize = @@ -443,7 +466,7 @@ void DataProcessor::PadMissingPackets(sls_receiver_header header, char* data) { << std::endl; // missing packet - switch (detType) { + switch (generalData->detType) { // for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes // data // 2nd packet: 4 bytes fnum, previous 1*2 bytes data + @@ -471,8 +494,9 @@ void DataProcessor::PadMissingPackets(sls_receiver_header header, char* data) { /** ctb specific */ void DataProcessor::RearrangeDbitData(size_t & size, char *data) { + int nAnalogDataBytes = generalData->GetNumberOfAnalogDatabytes(); // TODO! (Erik) Refactor and add tests - int ctbDigitalDataBytes = size - (*ctbAnalogDataBytes) - (*ctbDbitOffset); + int ctbDigitalDataBytes = size - nAnalogDataBytes - ctbDbitOffset; // no digital data if (ctbDigitalDataBytes == 0) { @@ -485,15 +509,15 @@ void DataProcessor::RearrangeDbitData(size_t & size, char *data) { // ceil as numResult8Bits could be decimal const int numResult8Bits = - ceil((numSamples * (*ctbDbitList).size()) / 8.00); + ceil((numSamples * ctbDbitList.size()) / 8.00); std::vector result(numResult8Bits); uint8_t *dest = &result[0]; - auto *source = (uint64_t *)(data + (*ctbAnalogDataBytes) + (*ctbDbitOffset)); + auto *source = (uint64_t *)(data + nAnalogDataBytes + ctbDbitOffset); // loop through digital bit enable vector int bitoffset = 0; - for (auto bi : (*ctbDbitList)) { + for (auto bi : ctbDbitList) { // where numbits * numsamples is not a multiple of 8 if (bitoffset != 0) { bitoffset = 0; @@ -515,7 +539,7 @@ void DataProcessor::RearrangeDbitData(size_t & size, char *data) { } // copy back to memory and update size - memcpy(data + (*ctbAnalogDataBytes), result.data(), numResult8Bits * sizeof(uint8_t)); + memcpy(data + nAnalogDataBytes, result.data(), numResult8Bits * sizeof(uint8_t)); size = numResult8Bits * sizeof(uint8_t); } diff --git a/slsReceiverSoftware/src/DataProcessor.h b/slsReceiverSoftware/src/DataProcessor.h index e29a38459..e3c797d59 100644 --- a/slsReceiverSoftware/src/DataProcessor.h +++ b/slsReceiverSoftware/src/DataProcessor.h @@ -29,31 +29,36 @@ struct MasterAttributes; class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { public: - DataProcessor(int index, detectorType detType, Fifo *fifo, bool *dataStreamEnable, uint32_t *streamingFrequency, uint32_t *streamingTimerInMs, uint32_t *streamingStartFnum, bool *framePadding, std::vector *ctbDbitList, int *ctbDbitOffset, int *ctbAnalogDataBytes); + DataProcessor(int index); ~DataProcessor() override; bool GetStartedFlag() const; void SetFifo(Fifo *f); - void SetActivate(bool enable); - void SetReceiverROI(ROI roi); - void ResetParametersforNewAcquisition(); void SetGeneralData(GeneralData *generalData); + void SetActivate(bool enable); + void SetReceiverROI(ROI roi); + void SetDataStreamEnable(bool enable); + void SetStreamingFrequency(uint32_t value); + void SetStreamingTimerInMs(uint32_t value); + void SetStreamingStartFnum(uint32_t value); + void SetFramePadding(bool enable); + void SetCtbDbitList(std::vector value); + void SetCtbDbitOffset(int value); + + void ResetParametersforNewAcquisition(); void CloseFiles(); void DeleteFiles(); void SetupFileWriter(const bool filewriteEnable, const fileFormat fileFormatType, std::mutex *hdf5LibMutex); - void CreateFirstFiles(const std::string &filePath, - const std::string &fileNamePrefix, + void CreateFirstFiles(const std::string &fileNamePrefix, const uint64_t fileIndex, const bool overWriteEnable, - const bool silentMode, const int modulePos, - const int numUnitsPerReadout, + const bool silentMode, const uint32_t udpPortNumber, - const uint32_t maxFramesPerFile, - const uint64_t numImages, const uint32_t dynamicRange, + const uint64_t numImages, const bool detectorDataStream); #ifdef HDF5C uint32_t GetFilesInAcquisition() const; @@ -61,9 +66,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { const std::string &filePath, const std::string &fileNamePrefix, const uint64_t fileIndex, const bool overWriteEnable, const bool silentMode, const int modulePos, - const int numUnitsPerReadout, const uint32_t maxFramesPerFile, const uint64_t numImages, const int numModX, const int numModY, - const uint32_t dynamicRange, std::mutex *hdf5LibMutex); + std::mutex *hdf5LibMutex); void LinkFileInMaster(const std::string &masterFileName, const std::string &virtualFileName, const bool silentMode, std::mutex *hdf5LibMutex); @@ -137,25 +141,23 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { static const std::string typeName; - const GeneralData *generalData{nullptr}; + GeneralData *generalData{nullptr}; Fifo *fifo; - detectorType detType; - bool *dataStreamEnable; + bool dataStreamEnable; bool activated{false}; ROI receiverRoi{}; bool receiverRoiEnabled{false}; bool receiverNoRoi{false}; std::unique_ptr completeImageToStreamBeforeCropping; /** if 0, sending random images with a timer */ - uint32_t *streamingFrequency; - uint32_t *streamingTimerInMs; - uint32_t *streamingStartFnum; + uint32_t streamingFrequency; + uint32_t streamingTimerInMs; + uint32_t streamingStartFnum; uint32_t currentFreqCount{0}; struct timespec timerbegin {}; - bool *framePadding; - std::vector *ctbDbitList; - int *ctbDbitOffset; - int *ctbAnalogDataBytes; + bool framePadding; + std::vector ctbDbitList; + int ctbDbitOffset; std::atomic startedFlag{false}; std::atomic firstIndex{0}; diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index c95eff075..9f8f38fc4 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -18,9 +18,7 @@ namespace sls { const std::string DataStreamer::TypeName = "DataStreamer"; -DataStreamer::DataStreamer(int index, Fifo *fifo, uint32_t *dynamicRange, ROI *detectorRoi, uint64_t *fileIndex, bool flipRows, slsDetectorDefs::xy numPorts, bool *quadEnable, uint64_t *totalNumFrames) - : ThreadObject(index, TypeName), fifo(fifo), dynamicRange(dynamicRange), detectorRoi(detectorRoi), fileIndex(fileIndex), flipRows(flipRows), numPorts(numPorts), quadEnable(quadEnable), totalNumFrames(totalNumFrames) { - +DataStreamer::DataStreamer(int index) : ThreadObject(index, TypeName) { LOG(logDEBUG) << "DataStreamer " << index << " created"; } @@ -31,6 +29,35 @@ DataStreamer::~DataStreamer() { void DataStreamer::SetFifo(Fifo *f) { fifo = f; } +void DataStreamer::SetGeneralData(GeneralData *g) { generalData = g; } + +void DataStreamer::SetFileIndex(uint64_t value) { + fileIndex = value; +} + +void DataStreamer::SetNumberofPorts(xy np) { numPorts = np; } + +void DataStreamer::SetFlipRows(bool fd) { + flipRows = fd; + // flip only right port of quad + if (quadEnable) { + flipRows = (index == 1 ? true : false); + } +} + +void DataStreamer::SetQuadEnable(bool value) { quadEnable = value; } + +void DataStreamer::SetNumberofTotalFrames(uint64_t value) { + nTotalFrames = value; +} + +void DataStreamer::SetAdditionalJsonHeader( + const std::map &json) { + std::lock_guard lock(additionalJsonMutex); + additionalJsonHeader = json; + isAdditionalJsonUpdated = true; +} + void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) { StopRunning(); startedFlag = false; @@ -41,8 +68,8 @@ void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) { delete[] completeBuffer; completeBuffer = nullptr; } - if (generalData->detType == GOTTHARD && detectorRoi->xmin != -1) { - adcConfigured = generalData->GetAdcConfigured(index, *detectorRoi); + if (generalData->detType == GOTTHARD && generalData->detectorRoi.xmin != -1) { + adcConfigured = generalData->GetAdcConfigured(index, generalData->detectorRoi); completeBuffer = new char[generalData->imageSizeComplete]; memset(completeBuffer, 0, generalData->imageSizeComplete); } @@ -55,20 +82,7 @@ void DataStreamer::RecordFirstIndex(uint64_t fnum, size_t firstImageIndex) { << ", First Streamer Index:" << fnum; } -void DataStreamer::SetGeneralData(GeneralData *g) { generalData = g; } - -void DataStreamer::SetNumberofPorts(xy np) { numPorts = np; } - -void DataStreamer::SetFlipRows(bool fd) { flipRows = fd; } - -void DataStreamer::SetAdditionalJsonHeader( - const std::map &json) { - std::lock_guard lock(additionalJsonMutex); - additionalJsonHeader = json; - isAdditionalJsonUpdated = true; -} - -void DataStreamer::CreateZmqSockets(int *nunits, uint32_t port, +void DataStreamer::CreateZmqSockets(uint32_t port, const IpAddr ip, int hwm) { uint32_t portnum = port + index; std::string sip = ip.str(); @@ -193,8 +207,8 @@ int DataStreamer::SendDataHeader(sls_detector_header header, uint32_t size, uint64_t frameIndex = header.frameNumber - firstIndex; uint64_t acquisitionIndex = header.frameNumber; - zHeader.dynamicRange = *dynamicRange; - zHeader.fileIndex = *fileIndex; + zHeader.dynamicRange = generalData->dynamicRange; + zHeader.fileIndex = fileIndex; zHeader.ndetx = numPorts.x; zHeader.ndety = numPorts.y; zHeader.npixelsx = nx; @@ -203,7 +217,7 @@ int DataStreamer::SendDataHeader(sls_detector_header header, uint32_t size, zHeader.acqIndex = acquisitionIndex; zHeader.frameIndex = frameIndex; zHeader.progress = - 100 * ((double)(frameIndex + 1) / (double)(*totalNumFrames)); + 100 * ((double)(frameIndex + 1) / (double)(nTotalFrames)); zHeader.fname = fileNametoStream; zHeader.frameNumber = header.frameNumber; zHeader.expLength = header.expLength; @@ -219,7 +233,7 @@ int DataStreamer::SendDataHeader(sls_detector_header header, uint32_t size, zHeader.detType = header.detType; zHeader.version = header.version; zHeader.flipRows = static_cast(flipRows); - zHeader.quad = *quadEnable; + zHeader.quad = quadEnable; zHeader.completeImage = (header.packetNumber < generalData->packetsPerFrame ? false : true); diff --git a/slsReceiverSoftware/src/DataStreamer.h b/slsReceiverSoftware/src/DataStreamer.h index 2bd8c2b32..d98392bd8 100644 --- a/slsReceiverSoftware/src/DataStreamer.h +++ b/slsReceiverSoftware/src/DataStreamer.h @@ -25,26 +25,29 @@ class ZmqSocket; class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { public: - DataStreamer(int index, Fifo *fifo, uint32_t *dynamicRange, ROI *detectorRoi, uint64_t *fileIndex, bool flipRows, slsDetectorDefs::xy numPorts, bool *quadEnable, uint64_t *totalNumFrames); + DataStreamer(int index); ~DataStreamer(); void SetFifo(Fifo *f); - void ResetParametersforNewAcquisition(const std::string &fname); void SetGeneralData(GeneralData *g); + + void SetFileIndex(uint64_t value); void SetNumberofPorts(xy np); void SetFlipRows(bool fd); + void SetQuadEnable(bool value); + void SetNumberofTotalFrames(uint64_t value); void SetAdditionalJsonHeader(const std::map &json); + void ResetParametersforNewAcquisition(const std::string &fname); /** * Creates Zmq Sockets * (throws an exception if it couldnt create zmq sockets) - * @param nunits pointer to number of theads/ units per detector * @param port streaming port start index * @param ip streaming source ip * @param hwm streaming high water mark */ - void CreateZmqSockets(int *nunits, uint32_t port, const IpAddr ip, + void CreateZmqSockets(uint32_t port, const IpAddr ip, int hwm); void CloseZmqSocket(); void RestreamStop(); @@ -85,13 +88,11 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { static const std::string TypeName; const GeneralData *generalData{nullptr}; - Fifo *fifo; + Fifo *fifo{nullptr}; ZmqSocket *zmqSocket{nullptr}; - uint32_t *dynamicRange; - ROI *detectorRoi; int adcConfigured{-1}; - uint64_t *fileIndex; - bool flipRows; + uint64_t fileIndex{0}; + bool flipRows{false}; std::map additionalJsonHeader; /** Used by streamer thread to update local copy (reduce number of locks @@ -111,8 +112,8 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { char *completeBuffer{nullptr}; xy numPorts{1, 1}; - bool *quadEnable; - uint64_t *totalNumFrames; + bool quadEnable{false}; + uint64_t nTotalFrames{0}; }; } // namespace sls diff --git a/slsReceiverSoftware/src/File.h b/slsReceiverSoftware/src/File.h index 3ff76c3ae..ab2c48d0b 100644 --- a/slsReceiverSoftware/src/File.h +++ b/slsReceiverSoftware/src/File.h @@ -60,10 +60,9 @@ class File : private virtual slsDetectorDefs { }; virtual void CreateFirstHDF5DataFile( - const std::string filePath, const std::string fileNamePrefix, + const std::string& fileNamePrefix, const uint64_t fileIndex, const bool overWriteEnable, - const bool silentMode, const int modulePos, - const int numUnitsPerReadout, const uint32_t udpPortNumber, + const bool silentMode, const uint32_t udpPortNumber, const uint32_t maxFramesPerFile, const uint64_t numImages, const uint32_t nPixelsX, const uint32_t nPixelsY, const uint32_t dynamicRange) { @@ -72,10 +71,9 @@ class File : private virtual slsDetectorDefs { }; #endif virtual void CreateFirstBinaryDataFile( - const std::string filePath, const std::string fileNamePrefix, + const std::string& fileNamePrefix, const uint64_t fileIndex, const bool overWriteEnable, - const bool silentMode, const int modulePos, - const int numUnitsPerReadout, const uint32_t udpPortNumber, + const bool silentMode, const uint32_t udpPortNumber, const uint32_t maxFramesPerFile) { LOG(logERROR) << "This is a generic function CreateFirstBinaryDataFile that " "should be overloaded by a derived class"; diff --git a/slsReceiverSoftware/src/GeneralData.h b/slsReceiverSoftware/src/GeneralData.h index dc80d7403..9ee8ece51 100644 --- a/slsReceiverSoftware/src/GeneralData.h +++ b/slsReceiverSoftware/src/GeneralData.h @@ -36,9 +36,9 @@ class GeneralData { uint32_t frameIndexOffset{0}; uint32_t packetIndexMask{0}; uint32_t packetIndexOffset{0}; - uint32_t maxFramesPerFile{0}; - uint32_t defaultFifoDepth{0}; - uint32_t numUDPInterfaces{1}; + uint32_t framesPerFile{0}; + uint32_t fifoDepth{0}; + int numUDPInterfaces{1}; uint32_t headerPacketSize{0}; /** Streaming (for ROI - mainly short Gotthard) */ uint32_t nPixelsXComplete{0}; @@ -48,7 +48,7 @@ class GeneralData { uint32_t imageSizeComplete{0}; /** if standard header implemented in firmware */ bool standardheader{false}; - uint32_t defaultUdpSocketBufferSize{RECEIVE_SOCKET_BUFFER_SIZE}; + uint32_t udpSocketBufferSize{RECEIVE_SOCKET_BUFFER_SIZE}; uint32_t vetoDataSize{0}; uint32_t vetoPacketSize{0}; uint32_t vetoImageSize{0}; @@ -61,7 +61,7 @@ class GeneralData { slsDetectorDefs::readoutMode readoutType{slsDetectorDefs::ANALOG_ONLY}; uint32_t adcEnableMaskOneGiga{BIT32_MASK}; uint32_t adcEnableMaskTenGiga{BIT32_MASK}; - slsDetectorDefs::ROI roi{}; + slsDetectorDefs::ROI detectorRoi{}; uint32_t counterMask{0}; GeneralData(){}; @@ -164,7 +164,7 @@ class GotthardData : public GeneralData { detType = slsDetectorDefs::GOTTHARD; nPixelsY = 1; headerSizeinPacket = 6; - maxFramesPerFile = MAX_FRAMES_PER_FILE; + framesPerFile = MAX_FRAMES_PER_FILE; UpdateImageSize(); }; @@ -247,7 +247,7 @@ class GotthardData : public GeneralData { }; void SetDetectorROI(slsDetectorDefs::ROI i) { - roi = i; + detectorRoi = i; UpdateImageSize(); }; @@ -255,18 +255,18 @@ class GotthardData : public GeneralData { void UpdateImageSize() { // all adcs - if (roi.xmin == -1) { + if (detectorRoi.xmin == -1) { nPixelsX = 1280; dataSize = 1280; packetsPerFrame = 2; frameIndexMask = 0xFFFFFFFE; frameIndexOffset = 1; packetIndexMask = 1; - maxFramesPerFile = MAX_FRAMES_PER_FILE; + framesPerFile = MAX_FRAMES_PER_FILE; nPixelsXComplete = 0; nPixelsYComplete = 0; imageSizeComplete = 0; - defaultFifoDepth = 50000; + fifoDepth = 50000; } else { nPixelsX = 256; dataSize = 512; @@ -274,11 +274,11 @@ class GotthardData : public GeneralData { frameIndexMask = 0xFFFFFFFF; frameIndexOffset = 0; packetIndexMask = 0; - maxFramesPerFile = SHORT_MAX_FRAMES_PER_FILE; + framesPerFile = SHORT_MAX_FRAMES_PER_FILE; nPixelsXComplete = 1280; nPixelsYComplete = 1; imageSizeComplete = 1280 * 2; - defaultFifoDepth = 75000; + fifoDepth = 75000; } imageSize = int(nPixelsX * nPixelsY * GetPixelDepth()); packetSize = headerSizeinPacket + dataSize; @@ -292,7 +292,7 @@ class EigerData : public GeneralData { EigerData() { detType = slsDetectorDefs::EIGER; headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); - maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE; + framesPerFile = EIGER_MAX_FRAMES_PER_FILE; numUDPInterfaces = 2; headerPacketSize = 40; standardheader = true; @@ -318,7 +318,7 @@ class EigerData : public GeneralData { packetSize = headerSizeinPacket + dataSize; imageSize = int(nPixelsX * nPixelsY * GetPixelDepth()); packetsPerFrame = imageSize / dataSize; - defaultFifoDepth = (dynamicRange == 32 ? 100 : 1000); + fifoDepth = (dynamicRange == 32 ? 100 : 1000); }; }; @@ -330,8 +330,8 @@ class JungfrauData : public GeneralData { headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); dataSize = 8192; packetSize = headerSizeinPacket + dataSize; - maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE; - defaultFifoDepth = 2500; + framesPerFile = JFRAU_MAX_FRAMES_PER_FILE; + fifoDepth = 2500; standardheader = true; maxRowsPerReadout = 512; UpdateImageSize(); @@ -348,7 +348,7 @@ class JungfrauData : public GeneralData { nPixelsY = (256 * 2) / numUDPInterfaces; imageSize = int(nPixelsX * nPixelsY * GetPixelDepth()); packetsPerFrame = imageSize / dataSize; - defaultUdpSocketBufferSize = (1000 * 1024 * 1024) / numUDPInterfaces; + udpSocketBufferSize = (1000 * 1024 * 1024) / numUDPInterfaces; }; }; @@ -362,10 +362,10 @@ class Mythen3Data : public GeneralData { detType = slsDetectorDefs::MYTHEN3; nPixelsY = 1; headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); - maxFramesPerFile = MYTHEN3_MAX_FRAMES_PER_FILE; - defaultFifoDepth = 50000; + framesPerFile = MYTHEN3_MAX_FRAMES_PER_FILE; + fifoDepth = 50000; standardheader = true; - defaultUdpSocketBufferSize = (1000 * 1024 * 1024); + udpSocketBufferSize = (1000 * 1024 * 1024); dynamicRange = 32; tengigaEnable = true; SetCounterMask(0x7); @@ -432,8 +432,8 @@ class Gotthard2Data : public GeneralData { nPixelsY = 1; headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); dataSize = 2560; // 1280 channels * 2 bytes - maxFramesPerFile = GOTTHARD2_MAX_FRAMES_PER_FILE; - defaultFifoDepth = 50000; + framesPerFile = GOTTHARD2_MAX_FRAMES_PER_FILE; + fifoDepth = 50000; standardheader = true; vetoDataSize = 160; vetoHsize = 16; @@ -469,7 +469,7 @@ class Gotthard2Data : public GeneralData { packetsPerFrame = imageSize / dataSize; vetoPacketSize = vetoHsize + vetoDataSize; vetoImageSize = vetoDataSize * packetsPerFrame; - defaultUdpSocketBufferSize = (1000 * 1024 * 1024) / numUDPInterfaces; + udpSocketBufferSize = (1000 * 1024 * 1024) / numUDPInterfaces; }; }; @@ -488,8 +488,8 @@ class ChipTestBoardData : public GeneralData { frameIndexMask = 0xFFFFFF; // 10g frameIndexOffset = 8; // 10g packetIndexMask = 0xFF; // 10g - maxFramesPerFile = CTB_MAX_FRAMES_PER_FILE; - defaultFifoDepth = 2500; + framesPerFile = CTB_MAX_FRAMES_PER_FILE; + fifoDepth = 2500; standardheader = true; UpdateImageSize(); }; @@ -575,8 +575,8 @@ class MoenchData : public GeneralData { detType = slsDetectorDefs::MOENCH; headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); frameIndexMask = 0xFFFFFF; - maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE; - defaultFifoDepth = 2500; + framesPerFile = MOENCH_MAX_FRAMES_PER_FILE; + fifoDepth = 2500; standardheader = true; UpdateImageSize(); }; diff --git a/slsReceiverSoftware/src/HDF5DataFile.cpp b/slsReceiverSoftware/src/HDF5DataFile.cpp index ae11eaf37..bf4c7d0c1 100644 --- a/slsReceiverSoftware/src/HDF5DataFile.cpp +++ b/slsReceiverSoftware/src/HDF5DataFile.cpp @@ -90,9 +90,8 @@ void HDF5DataFile::CloseFile() { } void HDF5DataFile::CreateFirstHDF5DataFile( - const std::string fPath, const std::string fNamePrefix, + const std::string& fNamePrefix, const uint64_t fIndex, const bool owEnable, const bool sMode, - const int modulePos, const int nUnitsPerReadout, const uint32_t uPortNumber, const uint32_t mFramesPerFile, const uint64_t nImages, const uint32_t nX, const uint32_t nY, const uint32_t dr) { @@ -108,13 +107,10 @@ void HDF5DataFile::CreateFirstHDF5DataFile( nPixelsY = nY; dynamicRange = dr; - filePath = fPath; fileNamePrefix = fNamePrefix; fileIndex = fIndex; overWriteEnable = owEnable; silentMode = sMode; - detIndex = modulePos; - numUnitsPerReadout = nUnitsPerReadout; udpPortNumber = uPortNumber; switch (dynamicRange) { @@ -138,8 +134,7 @@ void HDF5DataFile::CreateFile() { numFilesInAcquisition++; std::ostringstream os; - os << filePath << "/" << fileNamePrefix << "_d" - << (detIndex * numUnitsPerReadout + index) << "_f" << subFileIndex + os << fileNamePrefix << "_f" << subFileIndex << '_' << fileIndex << ".h5"; fileName = os.str(); diff --git a/slsReceiverSoftware/src/HDF5DataFile.h b/slsReceiverSoftware/src/HDF5DataFile.h index 6135bb211..dcb3bb635 100644 --- a/slsReceiverSoftware/src/HDF5DataFile.h +++ b/slsReceiverSoftware/src/HDF5DataFile.h @@ -24,10 +24,9 @@ class HDF5DataFile : private virtual slsDetectorDefs, public File { void CloseFile() override; void CreateFirstHDF5DataFile( - const std::string fPath, const std::string fNamePrefix, + const std::string& fNamePrefix, const uint64_t fIndex, const bool owEnable, - const bool sMode, const int modulePos, - const int nUnitsPerReadout, const uint32_t uPortNumber, + const bool sMode, const uint32_t uPortNumber, const uint32_t mFramesPerFile, const uint64_t nImages, const uint32_t nX, const uint32_t nY, const uint32_t dr) override; @@ -65,13 +64,10 @@ class HDF5DataFile : private virtual slsDetectorDefs, public File { uint32_t nPixelsY{0}; uint32_t dynamicRange{0}; - std::string filePath; std::string fileNamePrefix; uint64_t fileIndex{0}; bool overWriteEnable{false}; bool silentMode{false}; - int detIndex{0}; - int numUnitsPerReadout{0}; uint32_t udpPortNumber{0}; static const int EIGER_NUM_PIXELS{256 * 2 * 256}; diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index 19f6b1c89..680f74ede 100644 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -66,20 +66,20 @@ void Implementation::SetThreadPriorities() { void Implementation::SetupFifoStructure() { fifo.clear(); - for (int i = 0; i < numUDPInterfaces; ++i) { + for (int i = 0; i < generalData->numUDPInterfaces; ++i) { size_t datasize = generalData->imageSize; // veto data size - if (detType == GOTTHARD2 && i != 0) { + if (generalData->detType == GOTTHARD2 && i != 0) { datasize = generalData->vetoImageSize; } datasize += IMAGE_STRUCTURE_HEADER_SIZE; // create fifo structure try { - fifo.push_back(sls::make_unique(i, datasize, fifoDepth)); + fifo.push_back(sls::make_unique(i, datasize, generalData->fifoDepth)); } catch (...) { fifo.clear(); - fifoDepth = 0; + generalData->fifoDepth = 0; throw RuntimeError( "Could not allocate memory for fifo structure " + std::to_string(i) + ". FifoDepth is now 0."); @@ -93,11 +93,11 @@ void Implementation::SetupFifoStructure() { dataStreamer[i]->SetFifo(fifo[i].get()); LOG(logINFO) << "Memory Allocated for Fifo " << i << ": " - << (double)(datasize * (size_t)fifoDepth) / + << (double)(datasize * (size_t)generalData->fifoDepth) / (double)(1024 * 1024) << " MB"; } - LOG(logINFO) << numUDPInterfaces << " Fifo structure(s) reconstructed"; + LOG(logINFO) << generalData->numUDPInterfaces << " Fifo structure(s) reconstructed"; } /************************************************** @@ -108,8 +108,7 @@ void Implementation::SetupFifoStructure() { void Implementation::setDetectorType(const detectorType d) { - detType = d; - switch (detType) { + switch (d) { case GOTTHARD: case EIGER: case JUNGFRAU: @@ -128,7 +127,7 @@ void Implementation::setDetectorType(const detectorType d) { generalData = nullptr; // set detector specific variables - switch (detType) { + switch (d) { case GOTTHARD: generalData = new GotthardData(); break; @@ -154,40 +153,18 @@ void Implementation::setDetectorType(const detectorType d) { break; } - framesPerFile = generalData->maxFramesPerFile; - fifoDepth = generalData->defaultFifoDepth; - numUDPInterfaces = generalData->numUDPInterfaces; - udpSocketBufferSize = generalData->defaultUdpSocketBufferSize; - dynamicRange = generalData->dynamicRange; - tengigaEnable = generalData->tengigaEnable; - numberOfAnalogSamples = generalData->nAnalogSamples; - numberOfDigitalSamples = generalData->nDigitalSamples; - readoutType = generalData->readoutType; - adcEnableMaskOneGiga = generalData->adcEnableMaskOneGiga; - adcEnableMaskTenGiga = generalData->adcEnableMaskTenGiga; - detectorRoi = generalData->roi; - counterMask = generalData->counterMask; - SetLocalNetworkParameters(); SetupFifoStructure(); // create threads - for (int i = 0; i < numUDPInterfaces; ++i) { + for (int i = 0; i < generalData->numUDPInterfaces; ++i) { try { - auto fifo_ptr = fifo[i].get(); listener.push_back(sls::make_unique( - i, fifo_ptr, &status, &udpPortNum[i], ð[i], - &udpSocketBufferSize, &actualUDPSocketBufferSize, - &framesPerFile, &frameDiscardMode, &silentMode)); - int ctbAnalogDataBytes = 0; - if (detType == CHIPTESTBOARD) { - ctbAnalogDataBytes = generalData->GetNumberOfAnalogDatabytes(); - } - dataProcessor.push_back(sls::make_unique( - i, detType, fifo_ptr, &dataStreamEnable, &streamingFrequency, - &streamingTimerInMs, &streamingStartFnum, &framePadding, - &ctbDbitList, &ctbDbitOffset, &ctbAnalogDataBytes)); + i, &status)); + SetupListener(i); + dataProcessor.push_back(sls::make_unique(i)); + SetupDataProcessor(i); } catch (...) { listener.clear(); dataProcessor.clear(); @@ -197,31 +174,59 @@ void Implementation::setDetectorType(const detectorType d) { } } - // set up writer and callbacks - for (int i = 0; i != (int)listener.size(); ++i) { - listener[i]->SetGeneralData(generalData); - listener[i]->SetActivate(activated); - listener[i]->SetDetectorDatastream(detectorDataStream[i]); - } - for (const auto &it : dataProcessor) { - it->SetGeneralData(generalData); - it->SetActivate(activated); - } SetThreadPriorities(); LOG(logDEBUG) << " Detector type set to " << ToString(d); } +void Implementation::SetupListener(int i) { + listener[i]->SetFifo(fifo[i].get()); + listener[i]->SetGeneralData(generalData); + listener[i]->SetUdpPortNumber(udpPortNum[i]); + listener[i]->SetEthernetInterface(eth[i]); + listener[i]->SetActivate(activated); + listener[i]->SetNoRoi(portRois[i].noRoi()); + listener[i]->SetDetectorDatastream(detectorDataStream[i]); + listener[i]->SetFrameDiscardPolicy(frameDiscardMode); + listener[i]->SetSilentMode(silentMode); +} + +void Implementation::SetupDataProcessor(int i) { + dataProcessor[i]->SetFifo(fifo[i].get()); + dataProcessor[i]->SetGeneralData(generalData); + dataProcessor[i]->SetActivate(activated); + dataProcessor[i]->SetReceiverROI(portRois[i]); + dataProcessor[i]->SetDataStreamEnable(dataStreamEnable); + dataProcessor[i]->SetStreamingFrequency(streamingFrequency); + dataProcessor[i]->SetStreamingTimerInMs(streamingTimerInMs); + dataProcessor[i]->SetStreamingStartFnum(streamingStartFnum); + dataProcessor[i]->SetFramePadding(framePadding); + dataProcessor[i]->SetCtbDbitList(ctbDbitList); + dataProcessor[i]->SetCtbDbitOffset(ctbDbitOffset); +} + +void Implementation::SetupDataStreamer(int i) { + dataStreamer[i]->SetFifo(fifo[i].get()); + dataStreamer[i]->SetGeneralData(generalData); + dataStreamer[i]->CreateZmqSockets(streamingPort, streamingSrcIP, streamingHwm); + dataStreamer[i]->SetAdditionalJsonHeader(additionalJsonHeader); + dataStreamer[i]->SetFileIndex(fileIndex); + dataStreamer[i]->SetFlipRows(flipRows); + dataStreamer[i]->SetNumberofPorts(numPorts); + dataStreamer[i]->SetQuadEnable(quadEnable); + dataStreamer[i]->SetNumberofTotalFrames(numberOfTotalFrames); +} + slsDetectorDefs::xy Implementation::getDetectorSize() const { return numModules; } const slsDetectorDefs::xy Implementation::GetPortGeometry() const { xy portGeometry{1, 1}; - if (detType == EIGER) - portGeometry.x = numUDPInterfaces; - else if (detType == JUNGFRAU) - portGeometry.y = numUDPInterfaces; + if (generalData->detType == EIGER) + portGeometry.x = generalData->numUDPInterfaces; + else if (generalData->detType == JUNGFRAU) + portGeometry.y = generalData->numUDPInterfaces; return portGeometry; } @@ -283,14 +288,16 @@ bool Implementation::getSilentMode() const { return silentMode; } void Implementation::setSilentMode(const bool i) { silentMode = i; + for (const auto &it : listener) + it->SetSilentMode(silentMode); LOG(logINFO) << "Silent Mode: " << i; } -uint32_t Implementation::getFifoDepth() const { return fifoDepth; } +uint32_t Implementation::getFifoDepth() const { return generalData->fifoDepth; } void Implementation::setFifoDepth(const uint32_t i) { - if (fifoDepth != i) { - fifoDepth = i; + if (generalData->fifoDepth != i) { + generalData->fifoDepth = i; SetupFifoStructure(); } LOG(logINFO) << "Fifo Depth: " << i; @@ -303,6 +310,8 @@ Implementation::getFrameDiscardPolicy() const { void Implementation::setFrameDiscardPolicy(const frameDiscardPolicy i) { frameDiscardMode = i; + for (const auto &it : listener) + it->SetFrameDiscardPolicy(frameDiscardMode); LOG(logINFO) << "Frame Discard Policy: " << ToString(frameDiscardMode); } @@ -310,6 +319,8 @@ bool Implementation::getFramePaddingEnable() const { return framePadding; } void Implementation::setFramePaddingEnable(const bool i) { framePadding = i; + for (const auto &it : dataProcessor) + it->SetFramePadding(framePadding); LOG(logINFO) << "Frame Padding: " << framePadding; } @@ -330,7 +341,7 @@ std::array Implementation::getThreadIds() const { } else { retval[id++] = 0; } - if (numUDPInterfaces == 2) { + if (generalData->numUDPInterfaces == 2) { retval[id++] = listener[1]->GetThreadId(); retval[id++] = dataProcessor[1]->GetThreadId(); if (dataStreamEnable) { @@ -354,9 +365,9 @@ void Implementation::setArping(const bool i, arping.StopThread(); } else { // setup interface - for (int i = 0; i != numUDPInterfaces; ++i) { + for (int i = 0; i != generalData->numUDPInterfaces; ++i) { // ignore eiger with 2 interfaces (only udp port) - if (i == 1 && (numUDPInterfaces == 1 || detType == EIGER)) { + if (i == 1 && (generalData->numUDPInterfaces == 1 || generalData->detType == EIGER)) { break; } arping.SetInterfacesAndIps(i, eth[i], ips[i]); @@ -373,13 +384,13 @@ slsDetectorDefs::ROI Implementation::getReceiverROI() const { void Implementation::setReceiverROI(const slsDetectorDefs::ROI arg) { receiverRoi = arg; - if (numUDPInterfaces == 1 || detType == slsDetectorDefs::GOTTHARD2) { + if (generalData->numUDPInterfaces == 1 || generalData->detType == slsDetectorDefs::GOTTHARD2) { portRois[0] = arg; } else { slsDetectorDefs::xy nPortDim(generalData->nPixelsX, generalData->nPixelsY); - for (int iPort = 0; iPort != numUDPInterfaces; ++iPort) { + for (int iPort = 0; iPort != generalData->numUDPInterfaces; ++iPort) { // default init = complete roi slsDetectorDefs::ROI portRoi{}; @@ -443,7 +454,7 @@ void Implementation::setReceiverROI(const slsDetectorDefs::ROI arg) { for (size_t i = 0; i != dataProcessor.size(); ++i) dataProcessor[i]->SetReceiverROI(portRois[i]); LOG(logINFO) << "receiver roi: " << ToString(receiverRoi); - if (numUDPInterfaces == 2 && detType != slsDetectorDefs::GOTTHARD2) { + if (generalData->numUDPInterfaces == 2 && generalData->detType != slsDetectorDefs::GOTTHARD2) { LOG(logINFO) << "port rois: " << ToString(portRois); } } @@ -504,6 +515,8 @@ uint64_t Implementation::getFileIndex() const { return fileIndex; } void Implementation::setFileIndex(const uint64_t i) { fileIndex = i; + for (const auto &it : dataStreamer) + it->SetFileIndex(fileIndex); LOG(logINFO) << "File Index: " << fileIndex; } @@ -539,11 +552,11 @@ void Implementation::setOverwriteEnable(const bool b) { << (overwriteEnable ? "enabled" : "disabled"); } -uint32_t Implementation::getFramesPerFile() const { return framesPerFile; } +uint32_t Implementation::getFramesPerFile() const { return generalData->framesPerFile; } void Implementation::setFramesPerFile(const uint32_t i) { - framesPerFile = i; - LOG(logINFO) << "Frames per file: " << framesPerFile; + generalData->framesPerFile = i; + LOG(logINFO) << "Frames per file: " << generalData->framesPerFile; } /************************************************** @@ -554,7 +567,7 @@ void Implementation::setFramesPerFile(const uint32_t i) { slsDetectorDefs::runStatus Implementation::getStatus() const { return status; } std::vector Implementation::getFramesCaught() const { - std::vector numFramesCaught(numUDPInterfaces); + std::vector numFramesCaught(generalData->numUDPInterfaces); int index = 0; for (const auto &it : listener) { if (it->GetStartedFlag()) { @@ -566,7 +579,7 @@ std::vector Implementation::getFramesCaught() const { } std::vector Implementation::getCurrentFrameIndex() const { - std::vector frameIndex(numUDPInterfaces); + std::vector frameIndex(generalData->numUDPInterfaces); int index = 0; for (const auto &it : listener) { if (it->GetStartedFlag()) { @@ -611,8 +624,8 @@ double Implementation::getProgress() const { } std::vector Implementation::getNumMissingPackets() const { - std::vector mp(numUDPInterfaces); - for (int i = 0; i < numUDPInterfaces; ++i) { + std::vector mp(generalData->numUDPInterfaces); + for (int i = 0; i < generalData->numUDPInterfaces; ++i) { int np = generalData->packetsPerFrame; uint64_t totnp = np; // ReadNRows @@ -716,7 +729,7 @@ void Implementation::stopReceiver() { auto mp = getNumMissingPackets(); // print summary uint64_t tot = 0; - for (int i = 0; i < numUDPInterfaces; i++) { + for (int i = 0; i < generalData->numUDPInterfaces; i++) { int nf = listener[i]->GetNumCompleteFramesCaught(); tot += nf; std::string mpMessage = std::to_string(mp[i]); @@ -750,7 +763,7 @@ void Implementation::stopReceiver() { // callback if (acquisitionFinishedCallBack) { try { - acquisitionFinishedCallBack((tot / numUDPInterfaces), + acquisitionFinishedCallBack((tot / generalData->numUDPInterfaces), pAcquisitionFinished); } catch (const std::exception &e) { // change status @@ -836,7 +849,7 @@ void Implementation::ResetParametersforNewAcquisition() { void Implementation::CreateUDPSockets() { try { for (unsigned int i = 0; i < listener.size(); ++i) { - listener[i]->CreateUDPSockets(); + listener[i]->CreateUDPSocket(actualUDPSocketBufferSize); } } catch (const RuntimeError &e) { shutDownUDPSockets(); @@ -848,10 +861,12 @@ void Implementation::CreateUDPSockets() { void Implementation::SetupWriter() { try { for (unsigned int i = 0; i < dataProcessor.size(); ++i) { + std::ostringstream os; + os << filePath << "/" << fileName << "_d" << (modulePos * generalData->numUDPInterfaces + i); + std::string fileNamePrefix = os.str(); dataProcessor[i]->CreateFirstFiles( - filePath, fileName, fileIndex, overwriteEnable, silentMode, - modulePos, numUDPInterfaces, udpPortNum[i], framesPerFile, - numberOfTotalFrames, dynamicRange, detectorDataStream[i]); + fileNamePrefix, fileIndex, overwriteEnable, silentMode, + udpPortNum[i], numberOfTotalFrames, detectorDataStream[i]); } } catch (const RuntimeError &e) { shutDownUDPSockets(); @@ -867,13 +882,13 @@ void Implementation::StartMasterWriter() { // master file if (masterFileWriteEnable) { MasterAttributes masterAttributes; - masterAttributes.detType = detType; + masterAttributes.detType = generalData->detType; masterAttributes.timingMode = timingMode; masterAttributes.geometry = numPorts; masterAttributes.imageSize = generalData->imageSize; masterAttributes.nPixels = xy(generalData->nPixelsX, generalData->nPixelsY); - masterAttributes.maxFramesPerFile = framesPerFile; + masterAttributes.maxFramesPerFile = generalData->framesPerFile; masterAttributes.frameDiscardMode = frameDiscardMode; masterAttributes.framePadding = framePadding; masterAttributes.scanParams = scanParams; @@ -883,9 +898,9 @@ void Implementation::StartMasterWriter() { masterAttributes.exptime = acquisitionTime; masterAttributes.period = acquisitionPeriod; masterAttributes.burstMode = burstMode; - masterAttributes.numUDPInterfaces = numUDPInterfaces; - masterAttributes.dynamicRange = dynamicRange; - masterAttributes.tenGiga = tengigaEnable; + masterAttributes.numUDPInterfaces = generalData->numUDPInterfaces; + masterAttributes.dynamicRange = generalData->dynamicRange; + masterAttributes.tenGiga = generalData->tengigaEnable; masterAttributes.thresholdEnergyeV = thresholdEnergyeV; masterAttributes.thresholdAllEnergyeV = thresholdAllEnergyeV; masterAttributes.subExptime = subExpTime; @@ -894,24 +909,24 @@ void Implementation::StartMasterWriter() { masterAttributes.readNRows = readNRows; masterAttributes.ratecorr = rateCorrections; masterAttributes.adcmask = - tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga; - masterAttributes.analog = (readoutType == ANALOG_ONLY || - readoutType == ANALOG_AND_DIGITAL) + generalData->tengigaEnable ? generalData->adcEnableMaskTenGiga : generalData->adcEnableMaskOneGiga; + masterAttributes.analog = (generalData->readoutType == ANALOG_ONLY || + generalData->readoutType == ANALOG_AND_DIGITAL) ? 1 : 0; - masterAttributes.analogSamples = numberOfAnalogSamples; - masterAttributes.digital = (readoutType == DIGITAL_ONLY || - readoutType == ANALOG_AND_DIGITAL) + masterAttributes.analogSamples = generalData->nAnalogSamples; + masterAttributes.digital = (generalData->readoutType == DIGITAL_ONLY || + generalData->readoutType == ANALOG_AND_DIGITAL) ? 1 : 0; - masterAttributes.digitalSamples = numberOfDigitalSamples; + masterAttributes.digitalSamples = generalData->nDigitalSamples; masterAttributes.dbitoffset = ctbDbitOffset; masterAttributes.dbitlist = 0; for (auto &i : ctbDbitList) { masterAttributes.dbitlist |= (1 << i); } - masterAttributes.detectorRoi = detectorRoi; - masterAttributes.counterMask = counterMask; + masterAttributes.detectorRoi = generalData->detectorRoi; + masterAttributes.counterMask = generalData->counterMask; masterAttributes.exptimeArray[0] = acquisitionTime1; masterAttributes.exptimeArray[1] = acquisitionTime2; masterAttributes.exptimeArray[2] = acquisitionTime3; @@ -935,9 +950,8 @@ void Implementation::StartMasterWriter() { virtualFileName = dataProcessor[0]->CreateVirtualFile( filePath, fileName, fileIndex, overwriteEnable, - silentMode, modulePos, numUDPInterfaces, framesPerFile, - numberOfTotalFrames, numPorts.x, numPorts.y, - dynamicRange, &hdf5LibMutex); + silentMode, modulePos, numberOfTotalFrames, numPorts.x, numPorts.y, + &hdf5LibMutex); } // link file in master if (masterFileWriteEnable) { @@ -976,17 +990,17 @@ void Implementation::StartRunning() { * * * ************************************************/ int Implementation::getNumberofUDPInterfaces() const { - return numUDPInterfaces; + return generalData->numUDPInterfaces; } // not Eiger void Implementation::setNumberofUDPInterfaces(const int n) { - if (detType == EIGER) { + if (generalData->detType == EIGER) { throw RuntimeError( "Cannot set number of UDP interfaces for Eiger"); } - if (numUDPInterfaces != n) { + if (generalData->numUDPInterfaces != n) { // clear all threads and fifos listener.clear(); dataProcessor.clear(); @@ -995,41 +1009,22 @@ void Implementation::setNumberofUDPInterfaces(const int n) { // set local variables generalData->SetNumberofInterfaces(n); - numUDPInterfaces = n; + generalData->numUDPInterfaces = n; // fifo - udpSocketBufferSize = generalData->defaultUdpSocketBufferSize; SetupFifoStructure(); // recalculate port rois setReceiverROI(receiverRoi); // create threads - for (int i = 0; i < numUDPInterfaces; ++i) { + for (int i = 0; i < generalData->numUDPInterfaces; ++i) { // listener and dataprocessor threads try { - auto fifo_ptr = fifo[i].get(); listener.push_back(sls::make_unique( - i, fifo_ptr, &status, &udpPortNum[i], ð[i], - &udpSocketBufferSize, &actualUDPSocketBufferSize, - &framesPerFile, &frameDiscardMode, &silentMode)); - listener[i]->SetGeneralData(generalData); - listener[i]->SetActivate(activated); - listener[i]->SetNoRoi(portRois[i].noRoi()); - listener[i]->SetDetectorDatastream(detectorDataStream[i]); - - int ctbAnalogDataBytes = 0; - if (detType == CHIPTESTBOARD) { - ctbAnalogDataBytes = - generalData->GetNumberOfAnalogDatabytes(); - } - dataProcessor.push_back(sls::make_unique( - i, detType, fifo_ptr, &dataStreamEnable, - &streamingFrequency, &streamingTimerInMs, - &streamingStartFnum, &framePadding, &ctbDbitList, - &ctbDbitOffset, &ctbAnalogDataBytes)); - dataProcessor[i]->SetGeneralData(generalData); - dataProcessor[i]->SetActivate(activated); - dataProcessor[i]->SetReceiverROI(portRois[i]); + i, &status)); + SetupListener(i); + dataProcessor.push_back(sls::make_unique(i)); + SetupDataProcessor(i); } catch (...) { listener.clear(); dataProcessor.clear(); @@ -1037,27 +1032,18 @@ void Implementation::setNumberofUDPInterfaces(const int n) { "Could not create listener/dataprocessor threads (index:" + std::to_string(i) + ")"); } + // streamer threads if (dataStreamEnable) { try { - bool flip = flipRows; - if (quadEnable) { - flip = (i == 1 ? true : false); - } - dataStreamer.push_back(sls::make_unique( - i, fifo[i].get(), &dynamicRange, &detectorRoi, - &fileIndex, flip, numPorts, &quadEnable, - &numberOfTotalFrames)); - dataStreamer[i]->SetGeneralData(generalData); - dataStreamer[i]->CreateZmqSockets( - &numUDPInterfaces, streamingPort, streamingSrcIP, - streamingHwm); - dataStreamer[i]->SetAdditionalJsonHeader( - additionalJsonHeader); + dataStreamer.push_back(sls::make_unique(i)); + SetupDataStreamer(i); } catch (...) { if (dataStreamEnable) { dataStreamer.clear(); dataStreamEnable = false; + for (const auto &it : dataProcessor) + it->SetDataStreamEnable(dataStreamEnable); } throw RuntimeError( "Could not create datastreamer threads (index:" + @@ -1089,13 +1075,14 @@ void Implementation::setNumberofUDPInterfaces(const int n) { setUDPSocketBufferSize(0); } - LOG(logINFO) << "Number of Interfaces: " << numUDPInterfaces; + LOG(logINFO) << "Number of Interfaces: " << generalData->numUDPInterfaces; } std::string Implementation::getEthernetInterface() const { return eth[0]; } void Implementation::setEthernetInterface(const std::string &c) { eth[0] = c; + listener[0]->SetEthernetInterface(c); LOG(logINFO) << "Ethernet Interface: " << eth[0]; } @@ -1103,6 +1090,9 @@ std::string Implementation::getEthernetInterface2() const { return eth[1]; } void Implementation::setEthernetInterface2(const std::string &c) { eth[1] = c; + if (listener.size() > 1) { + listener[1]->SetEthernetInterface(c); + } LOG(logINFO) << "Ethernet Interface 2: " << eth[1]; } @@ -1110,6 +1100,7 @@ uint32_t Implementation::getUDPPortNumber() const { return udpPortNum[0]; } void Implementation::setUDPPortNumber(const uint32_t i) { udpPortNum[0] = i; + listener[0]->SetUdpPortNumber(i); LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum[0]; } @@ -1117,32 +1108,27 @@ uint32_t Implementation::getUDPPortNumber2() const { return udpPortNum[1]; } void Implementation::setUDPPortNumber2(const uint32_t i) { udpPortNum[1] = i; + if (listener.size() > 1) { + listener[1]->SetUdpPortNumber(i); + } LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1]; } int Implementation::getUDPSocketBufferSize() const { - return udpSocketBufferSize; + return generalData->udpSocketBufferSize; } void Implementation::setUDPSocketBufferSize(const int s) { - // custom setup is not 0 (must complain if set up didnt work) - // testing default setup at startup, argument is 0 to use default values - int size = (s == 0) ? udpSocketBufferSize : s; size_t listSize = listener.size(); - if ((detType == JUNGFRAU || detType == GOTTHARD2) && - (int)listSize != numUDPInterfaces) { + if ((generalData->detType == JUNGFRAU || generalData->detType == GOTTHARD2) && + (int)listSize != generalData->numUDPInterfaces) { throw RuntimeError( - "Number of Interfaces " + std::to_string(numUDPInterfaces) + + "Number of Interfaces " + std::to_string(generalData->numUDPInterfaces) + " do not match listener size " + std::to_string(listSize)); } for (auto &l : listener) { - l->CreateDummySocketForUDPSocketBufferSize(size); - } - // custom and didnt set, throw error - if (s != 0 && udpSocketBufferSize != s) { - throw RuntimeError("Could not set udp socket buffer size. (No " - "CAP_NET_ADMIN privileges?)"); + l->CreateDummySocketForUDPSocketBufferSize(s, actualUDPSocketBufferSize); } } @@ -1165,31 +1151,23 @@ void Implementation::setDataStreamEnable(const bool enable) { dataStreamer.clear(); if (enable) { - for (int i = 0; i < numUDPInterfaces; ++i) { + for (int i = 0; i < generalData->numUDPInterfaces; ++i) { try { - bool flip = flipRows; - if (quadEnable) { - flip = (i == 1 ? true : false); - } - dataStreamer.push_back(sls::make_unique( - i, fifo[i].get(), &dynamicRange, &detectorRoi, - &fileIndex, flip, numPorts, &quadEnable, - &numberOfTotalFrames)); - dataStreamer[i]->SetGeneralData(generalData); - dataStreamer[i]->CreateZmqSockets( - &numUDPInterfaces, streamingPort, streamingSrcIP, - streamingHwm); - dataStreamer[i]->SetAdditionalJsonHeader( - additionalJsonHeader); + dataStreamer.push_back(sls::make_unique(i)); + SetupDataStreamer(i); } catch (...) { dataStreamer.clear(); dataStreamEnable = false; + for (const auto &it : dataProcessor) + it->SetDataStreamEnable(dataStreamEnable); throw RuntimeError( "Could not set data stream enable."); } } SetThreadPriorities(); } + for (const auto &it : dataProcessor) + it->SetDataStreamEnable(dataStreamEnable); } LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable; } @@ -1200,6 +1178,8 @@ uint32_t Implementation::getStreamingFrequency() const { void Implementation::setStreamingFrequency(const uint32_t freq) { streamingFrequency = freq; + for (const auto &it : dataProcessor) + it->SetStreamingFrequency(streamingFrequency); LOG(logINFO) << "Streaming Frequency: " << streamingFrequency; } @@ -1209,6 +1189,8 @@ uint32_t Implementation::getStreamingTimer() const { void Implementation::setStreamingTimer(const uint32_t time_in_ms) { streamingTimerInMs = time_in_ms; + for (const auto &it : dataProcessor) + it->SetStreamingTimerInMs(streamingTimerInMs); LOG(logINFO) << "Streamer Timer: " << streamingTimerInMs; } @@ -1218,6 +1200,8 @@ uint32_t Implementation::getStreamingStartingFrameNumber() const { void Implementation::setStreamingStartingFrameNumber(const uint32_t fnum) { streamingStartFnum = fnum; + for (const auto &it : dataProcessor) + it->SetStreamingStartFnum(streamingStartFnum); LOG(logINFO) << "Streaming Start Frame num: " << streamingStartFnum; } @@ -1313,7 +1297,7 @@ void Implementation::updateTotalNumberOfFrames() { int64_t repeats = numberOfTriggers; int64_t numFrames = numberOfFrames; // gotthard2 - if (detType == GOTTHARD2) { + if (generalData->detType == GOTTHARD2) { // auto if (timingMode == AUTO_TIMING) { // burst mode, repeats = #bursts @@ -1336,6 +1320,8 @@ void Implementation::updateTotalNumberOfFrames() { } numberOfTotalFrames = numFrames * repeats * (int64_t)(numberOfAdditionalStorageCells + 1); + for (const auto &it : dataStreamer) + it->SetNumberofTotalFrames(numberOfTotalFrames); if (numberOfTotalFrames == 0) { throw RuntimeError("Invalid total number of frames to receive: 0"); } @@ -1475,91 +1461,77 @@ void Implementation::setSubPeriod(const ns i) { } uint32_t Implementation::getNumberofAnalogSamples() const { - return numberOfAnalogSamples; + return generalData->nAnalogSamples; } void Implementation::setNumberofAnalogSamples(const uint32_t i) { - if (numberOfAnalogSamples != i) { - numberOfAnalogSamples = i; - + if ( generalData->nAnalogSamples != i) { generalData->SetNumberOfAnalogSamples(i); SetupFifoStructure(); } - LOG(logINFO) << "Number of Analog Samples: " << numberOfAnalogSamples; + LOG(logINFO) << "Number of Analog Samples: " << generalData->nAnalogSamples; LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); } uint32_t Implementation::getNumberofDigitalSamples() const { - return numberOfDigitalSamples; + return generalData->nDigitalSamples; } void Implementation::setNumberofDigitalSamples(const uint32_t i) { - if (numberOfDigitalSamples != i) { - numberOfDigitalSamples = i; - + if ( generalData->nDigitalSamples != i) { generalData->SetNumberOfDigitalSamples(i); SetupFifoStructure(); } - LOG(logINFO) << "Number of Digital Samples: " << numberOfDigitalSamples; + LOG(logINFO) << "Number of Digital Samples: " << generalData->nDigitalSamples; LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); } -uint32_t Implementation::getCounterMask() const { return counterMask; } +uint32_t Implementation::getCounterMask() const { return generalData->counterMask; } void Implementation::setCounterMask(const uint32_t i) { - if (counterMask != i) { + if (generalData->counterMask != i) { generalData->SetCounterMask(i); - counterMask = i; SetupFifoStructure(); } - LOG(logINFO) << "Counter mask: " << ToStringHex(counterMask); - int ncounters = __builtin_popcount(counterMask); + LOG(logINFO) << "Counter mask: " << ToStringHex(generalData->counterMask); + int ncounters = __builtin_popcount(generalData->counterMask); LOG(logINFO) << "Number of counters: " << ncounters; } -uint32_t Implementation::getDynamicRange() const { return dynamicRange; } +uint32_t Implementation::getDynamicRange() const { return generalData->dynamicRange; } void Implementation::setDynamicRange(const uint32_t i) { - if (dynamicRange != i) { - dynamicRange = i; - - if (detType == EIGER || detType == MYTHEN3) { + if (generalData->dynamicRange != i) { + if (generalData->detType == EIGER || generalData->detType == MYTHEN3) { generalData->SetDynamicRange(i); - fifoDepth = generalData->defaultFifoDepth; SetupFifoStructure(); } } - LOG(logINFO) << "Dynamic Range: " << dynamicRange; + LOG(logINFO) << "Dynamic Range: " << generalData->dynamicRange; } -slsDetectorDefs::ROI Implementation::getROI() const { return detectorRoi; } +slsDetectorDefs::ROI Implementation::getROI() const { return generalData->detectorRoi; } void Implementation::setDetectorROI(slsDetectorDefs::ROI arg) { - if (detectorRoi.xmin != arg.xmin || detectorRoi.xmax != arg.xmax) { - detectorRoi.xmin = arg.xmin; - detectorRoi.xmax = arg.xmax; - + if (generalData->detectorRoi.xmin != arg.xmin || generalData->detectorRoi.xmax != arg.xmax) { // only for gotthard generalData->SetDetectorROI(arg); - framesPerFile = generalData->maxFramesPerFile; SetupFifoStructure(); } - LOG(logINFO) << "Detector ROI: " << ToString(detectorRoi); + LOG(logINFO) << "Detector ROI: " << ToString(generalData->detectorRoi); LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); } -bool Implementation::getTenGigaEnable() const { return tengigaEnable; } +bool Implementation::getTenGigaEnable() const { return generalData->tengigaEnable; } void Implementation::setTenGigaEnable(const bool b) { - if (tengigaEnable != b) { - tengigaEnable = b; - + if ( generalData->tengigaEnable != b) { generalData->SetTenGigaEnable(b); SetupFifoStructure(); // datastream can be disabled/enabled only for Eiger 10GbE - if (detType == EIGER) { + if (generalData->detType == EIGER) { if (!b) { detectorDataStream[LEFT] = 1; detectorDataStream[RIGHT] = 1; @@ -1573,7 +1545,7 @@ void Implementation::setTenGigaEnable(const bool b) { << ToString(detectorDataStream[RIGHT]) << "]"; } } - LOG(logINFO) << "Ten Giga: " << (tengigaEnable ? "enabled" : "disabled"); + LOG(logINFO) << "Ten Giga: " << ( generalData->tengigaEnable ? "enabled" : "disabled"); LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); } @@ -1581,19 +1553,8 @@ bool Implementation::getFlipRows() const { return flipRows; } void Implementation::setFlipRows(bool enable) { flipRows = enable; - - if (!quadEnable) { - for (const auto &it : dataStreamer) { - it->SetFlipRows(flipRows); - } - } - // quad - else { - if (dataStreamer.size() == 2) { - dataStreamer[0]->SetFlipRows(false); - dataStreamer[1]->SetFlipRows(true); - } - } + for (const auto &it : dataStreamer) + it->SetFlipRows(flipRows); LOG(logINFO) << "Flip Rows: " << flipRows; } @@ -1603,15 +1564,9 @@ void Implementation::setQuad(const bool b) { if (quadEnable != b) { quadEnable = b; setDetectorSize(numModules); - if (!quadEnable) { - for (const auto &it : dataStreamer) { - it->SetFlipRows(flipRows); - } - } else { - if (dataStreamer.size() == 2) { - dataStreamer[0]->SetFlipRows(false); - dataStreamer[1]->SetFlipRows(true); - } + for (const auto &it : dataStreamer) { + it->SetQuadEnable(quadEnable); + it->SetFlipRows(flipRows); } } LOG(logINFO) << "Quad Enable: " << quadEnable; @@ -1641,7 +1596,7 @@ void Implementation::setDetectorDataStream(const portPosition port, LOG(logINFO) << "Detector 10GbE datastream (" << ToString(port) << " Port): " << ToString(detectorDataStream10GbE[index]); // update datastream for 10g - if (tengigaEnable) { + if ( generalData->tengigaEnable) { detectorDataStream[index] = detectorDataStream10GbE[index]; LOG(logDEBUG) << "Detector datastream updated [" << (index == 0 ? "Left" : "Right") @@ -1673,59 +1628,63 @@ void Implementation::setRateCorrections(const std::vector &t) { } slsDetectorDefs::readoutMode Implementation::getReadoutMode() const { - return readoutType; + return generalData->readoutType; } void Implementation::setReadoutMode(const readoutMode f) { - if (readoutType != f) { - readoutType = f; - + if (generalData->readoutType != f) { generalData->SetReadoutMode(f); SetupFifoStructure(); } - LOG(logINFO) << "Readout Mode: " << ToString(f); + LOG(logINFO) << "Readout Mode: " << ToString(generalData->readoutType); LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); } uint32_t Implementation::getADCEnableMask() const { - return adcEnableMaskOneGiga; + return generalData->adcEnableMaskOneGiga; } void Implementation::setADCEnableMask(uint32_t mask) { - if (adcEnableMaskOneGiga != mask) { - adcEnableMaskOneGiga = mask; - + if (generalData->adcEnableMaskOneGiga != mask) { generalData->SetOneGigaAdcEnableMask(mask); SetupFifoStructure(); } LOG(logINFO) << "ADC Enable Mask for 1Gb mode: 0x" << std::hex - << adcEnableMaskOneGiga << std::dec; + << generalData->adcEnableMaskOneGiga << std::dec; LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); } uint32_t Implementation::getTenGigaADCEnableMask() const { - return adcEnableMaskTenGiga; + return generalData->adcEnableMaskTenGiga; } void Implementation::setTenGigaADCEnableMask(uint32_t mask) { - if (adcEnableMaskTenGiga != mask) { - adcEnableMaskTenGiga = mask; - + if (generalData->adcEnableMaskTenGiga != mask) { generalData->SetTenGigaAdcEnableMask(mask); SetupFifoStructure(); } LOG(logINFO) << "ADC Enable Mask for 10Gb mode: 0x" << std::hex - << adcEnableMaskTenGiga << std::dec; + << generalData->adcEnableMaskTenGiga << std::dec; LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame); } std::vector Implementation::getDbitList() const { return ctbDbitList; } -void Implementation::setDbitList(const std::vector &v) { ctbDbitList = v; } +void Implementation::setDbitList(const std::vector &v) { + ctbDbitList = v; + for (const auto &it : dataProcessor) + it->SetCtbDbitList(ctbDbitList); + LOG(logINFO) << "Dbit list: " << ToString(ctbDbitList); +} int Implementation::getDbitOffset() const { return ctbDbitOffset; } -void Implementation::setDbitOffset(const int s) { ctbDbitOffset = s; } +void Implementation::setDbitOffset(const int s) { + ctbDbitOffset = s; + for (const auto &it : dataProcessor) + it->SetCtbDbitOffset(ctbDbitOffset); + LOG(logINFO) << "Dbit offset: " << ctbDbitOffset; +} /************************************************** * * diff --git a/slsReceiverSoftware/src/Implementation.h b/slsReceiverSoftware/src/Implementation.h index 0550c74f2..3e9e2a405 100644 --- a/slsReceiverSoftware/src/Implementation.h +++ b/slsReceiverSoftware/src/Implementation.h @@ -287,6 +287,9 @@ class Implementation : private virtual slsDetectorDefs { void SetupWriter(); void StartMasterWriter(); void StartRunning(); + void SetupListener(int i); + void SetupDataProcessor(int i); + void SetupDataStreamer(int i); /************************************************** * * @@ -295,13 +298,11 @@ class Implementation : private virtual slsDetectorDefs { * ************************************************/ // config parameters - detectorType detType{GENERIC}; xy numModules{1, 1}; xy numPorts{1, 1}; int modulePos{0}; std::string detHostname; bool silentMode{false}; - uint32_t fifoDepth{0}; frameDiscardPolicy frameDiscardMode{NO_DISCARD}; bool framePadding{true}; pid_t parentThreadId; @@ -319,7 +320,6 @@ class Implementation : private virtual slsDetectorDefs { bool fileWriteEnable{false}; bool masterFileWriteEnable{true}; bool overwriteEnable{true}; - uint32_t framesPerFile{0}; // acquisition std::atomic status{IDLE}; @@ -327,11 +327,9 @@ class Implementation : private virtual slsDetectorDefs { scanParameters scanParams{}; // network configuration (UDP) - int numUDPInterfaces{1}; std::array eth; std::array udpPortNum{ {DEFAULT_UDP_PORTNO, DEFAULT_UDP_PORTNO + 1}}; - int udpSocketBufferSize{0}; int actualUDPSocketBufferSize{0}; // zmq parameters @@ -363,12 +361,6 @@ class Implementation : private virtual slsDetectorDefs { ns gateDelay3 = std::chrono::nanoseconds(0); ns subExpTime = std::chrono::nanoseconds(0); ns subPeriod = std::chrono::nanoseconds(0); - uint32_t numberOfAnalogSamples{0}; - uint32_t numberOfDigitalSamples{0}; - uint32_t counterMask{0}; - uint32_t dynamicRange{16}; - ROI detectorRoi{}; - bool tengigaEnable{false}; bool flipRows{false}; bool quadEnable{false}; bool activated{true}; @@ -379,9 +371,6 @@ class Implementation : private virtual slsDetectorDefs { int thresholdEnergyeV{-1}; std::array thresholdAllEnergyeV = {{-1, -1, -1}}; std::vector rateCorrections; - readoutMode readoutType{ANALOG_ONLY}; - uint32_t adcEnableMaskOneGiga{BIT32_MASK}; - uint32_t adcEnableMaskTenGiga{BIT32_MASK}; std::vector ctbDbitList; int ctbDbitOffset{0}; diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index 174c049cf..cd8a260b1 100644 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -24,16 +24,15 @@ namespace sls { const std::string Listener::TypeName = "Listener"; -Listener::Listener(int index, Fifo *fifo, std::atomic *status, uint32_t *udpPortNumber, std::string *eth, int *udpSocketBufferSize, int *actualUDPSocketBufferSize, uint32_t *framesPerFile, frameDiscardPolicy *frameDiscardMode, bool *silentMode) - : ThreadObject(index, TypeName), fifo(fifo), status(status), - udpPortNumber(udpPortNumber), eth(eth), udpSocketBufferSize(udpSocketBufferSize), actualUDPSocketBufferSize(actualUDPSocketBufferSize), framesPerFile(framesPerFile), frameDiscardMode(frameDiscardMode), silentMode(silentMode) { +Listener::Listener(int index, std::atomic *status) + : ThreadObject(index, TypeName), status(status) { LOG(logDEBUG) << "Listener " << index << " created"; } Listener::~Listener() = default; -bool Listener::isPortDisabled() const { - return disabledPort; +bool Listener::isPortDisabled() const { + return disabledPort; } uint64_t Listener::GetPacketsCaught() const { return numPacketsCaught; } @@ -72,6 +71,47 @@ uint64_t Listener::GetListenedIndex() const { void Listener::SetFifo(Fifo *f) { fifo = f; } +void Listener::SetGeneralData(GeneralData *g) { generalData = g; } + +void Listener::SetUdpPortNumber(const uint32_t portNumber) { + udpPortNumber = portNumber; +} + +void Listener::SetEthernetInterface(const std::string e) { + eth = e; + // if eth is mistaken with ip address + if (eth.find('.') != std::string::npos) { + eth = ""; + } + if (!eth.length()) { + LOG(logWARNING) << "ethernet interface for udp port " << udpPortNumber << " is empty. Listening to all"; + } +} + +void Listener::SetActivate(bool enable) { + activated = enable; + disabledPort = (!activated || !detectorDataStream || noRoi); +} + +void Listener::SetDetectorDatastream(bool enable) { + detectorDataStream = enable; + disabledPort = (!activated || !detectorDataStream || noRoi); +} + +void Listener::SetNoRoi(bool enable) { + noRoi = enable; + disabledPort = (!activated || !detectorDataStream || noRoi); +} + +void Listener::SetFrameDiscardPolicy(frameDiscardPolicy value) { + frameDiscardMode = value; +} + +void Listener::SetSilentMode(bool enable) { + silentMode = enable; +} + + void Listener::ResetParametersforNewAcquisition() { StopRunning(); startedFlag = false; @@ -104,43 +144,17 @@ void Listener::RecordFirstIndex(uint64_t fnum) { startedFlag = true; firstIndex = fnum; - if (!(*silentMode)) { + if (!silentMode) { if (!index) { LOG(logINFOBLUE) << index << " First Index: " << firstIndex; } } } -void Listener::SetGeneralData(GeneralData *g) { generalData = g; } - -void Listener::SetActivate(bool enable) { - activated = enable; - disabledPort = (!activated || !detectorDataStream || noRoi); -} - -void Listener::SetDetectorDatastream(bool enable) { - detectorDataStream = enable; - disabledPort = (!activated || !detectorDataStream || noRoi); -} - -void Listener::SetNoRoi(bool enable) { - noRoi = enable; - disabledPort = (!activated || !detectorDataStream || noRoi); -} - -void Listener::CreateUDPSockets() { +void Listener::CreateUDPSocket(int& actualSize) { if (disabledPort) { return; } - - // if eth is mistaken with ip address - if ((*eth).find('.') != std::string::npos) { - (*eth) = ""; - } - if (!(*eth).length()) { - LOG(logWARNING) << "eth is empty. Listening to all"; - } - ShutDownUDPSocket(); uint32_t packetSize = generalData->packetSize; @@ -148,23 +162,20 @@ void Listener::CreateUDPSockets() { packetSize = generalData->vetoPacketSize; } - // InterfaceNameToIp(eth).str().c_str() try { - udpSocket = make_unique( - *udpPortNumber, packetSize, - ((*eth).length() ? InterfaceNameToIp(*eth).str().c_str() - : nullptr), - *udpSocketBufferSize); - LOG(logINFO) << index << ": UDP port opened at port " << *udpPortNumber; + udpSocket = make_unique(udpPortNumber, packetSize, + (eth.length() ? InterfaceNameToIp(eth).str().c_str() + : nullptr), generalData->udpSocketBufferSize); + LOG(logINFO) << index << ": UDP port opened at port " << udpPortNumber; } catch (...) { throw RuntimeError("Could not create UDP socket on port " + - std::to_string(*udpPortNumber)); + std::to_string(udpPortNumber)); } udpSocketAlive = true; // doubled due to kernel bookkeeping (could also be less due to permissions) - *actualUDPSocketBufferSize = udpSocket->getBufferSize(); + actualSize = udpSocket->getBufferSize(); } void Listener::ShutDownUDPSocket() { @@ -173,27 +184,24 @@ void Listener::ShutDownUDPSocket() { // give other thread time after udpSocketAlive is changed usleep(0); udpSocket->Shutdown(); - LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber; + LOG(logINFO) << "Shut down of UDP port " << udpPortNumber; } } -void Listener::CreateDummySocketForUDPSocketBufferSize(int s) { - LOG(logINFO) << "Testing UDP Socket Buffer size " << s << " with test port " - << *udpPortNumber; +void Listener::CreateDummySocketForUDPSocketBufferSize(int s, int& actualSize) { + // custom setup (s != 0) + // default setup at startup (s = 0) + int size = (s == 0 ? generalData->udpSocketBufferSize : s); + LOG(logINFO) << "Testing UDP Socket Buffer size " << size << " with test port " + << udpPortNumber; + int previousSize = generalData->udpSocketBufferSize; + generalData->udpSocketBufferSize = size; if (disabledPort) { - *actualUDPSocketBufferSize = (s * 2); + actualSize = (generalData->udpSocketBufferSize * 2); return; } - int temp = *udpSocketBufferSize; - *udpSocketBufferSize = s; - - // if eth is mistaken with ip address - if ((*eth).find('.') != std::string::npos) { - (*eth) = ""; - } - uint32_t packetSize = generalData->packetSize; if (generalData->detType == GOTTHARD2 && index != 0) { packetSize = generalData->vetoPacketSize; @@ -201,24 +209,29 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int s) { // create dummy socket try { - UdpRxSocket g(*udpPortNumber, packetSize, - ((*eth).length() - ? InterfaceNameToIp(*eth).str().c_str() - : nullptr), - *udpSocketBufferSize); + UdpRxSocket g(udpPortNumber, packetSize, + (eth.length() + ? InterfaceNameToIp(eth).str().c_str() + : nullptr), generalData->udpSocketBufferSize); // doubled due to kernel bookkeeping (could also be less due to // permissions) - *actualUDPSocketBufferSize = g.getBufferSize(); - if (*actualUDPSocketBufferSize == -1) { - *udpSocketBufferSize = temp; + actualSize = g.getBufferSize(); + if (actualSize == -1) { + generalData->udpSocketBufferSize = previousSize; } else { - *udpSocketBufferSize = (*actualUDPSocketBufferSize) / 2; + generalData->udpSocketBufferSize = actualSize / 2; } } catch (...) { throw RuntimeError("Could not create a test UDP socket on port " + - std::to_string(*udpPortNumber)); + std::to_string(udpPortNumber)); + } + + // custom and didnt set, throw error + if (s != 0 && static_cast(generalData->udpSocketBufferSize) != s) { + throw RuntimeError("Could not set udp socket buffer size. (No " + "CAP_NET_ADMIN privileges?)"); } } @@ -257,12 +270,12 @@ void Listener::ThreadExecution() { fifo->PushAddress(buffer); // Statistics - if (!(*silentMode)) { + if (!silentMode) { numFramesStatistic++; if (numFramesStatistic >= // second condition also for infinite #number of frames - (((*framesPerFile) == 0) ? STATISTIC_FRAMENUMBER_INFINITE - : (*framesPerFile))) + (generalData->framesPerFile == 0 ? STATISTIC_FRAMENUMBER_INFINITE + : generalData->framesPerFile)) PrintFifoStatistics(); } } @@ -271,7 +284,7 @@ void Listener::StopListening(char *buf, size_t & size) { size = DUMMY_PACKET_VALUE; fifo->PushAddress(buf); StopRunning(); - LOG(logDEBUG1) << index << ": Listening Completed. Packets (" << *udpPortNumber + LOG(logDEBUG1) << index << ": Listening Completed. Packets (" << udpPortNumber << ") : " << numPacketsCaught; } @@ -342,7 +355,7 @@ uint32_t Listener::ListenToAnImage(sls_receiver_header & dstHeader, char *dstDat // Eiger Firmware in a weird state if (generalData->detType == EIGER && fnum == 0) { - LOG(logERROR) << "[" << *udpPortNumber + LOG(logERROR) << "[" << udpPortNumber << "]: Got Frame Number " "Zero from Firmware. Discarding Packet"; numPacketsCaught--; @@ -385,7 +398,7 @@ uint32_t Listener::ListenToAnImage(sls_receiver_header & dstHeader, char *dstDat } size_t Listener::HandleFuturePacket(bool EOA, uint32_t numpackets, uint64_t fnum, bool isHeaderEmpty, size_t imageSize, sls_receiver_header& dstHeader) { - switch (*frameDiscardMode) { + switch (frameDiscardMode) { case DISCARD_EMPTY_FRAMES: if (!numpackets) { if (!EOA) { @@ -495,7 +508,7 @@ void Listener::PrintFifoStatistics() { numFramesStatistic = 0; const auto color = loss ? logINFORED : logINFOGREEN; - LOG(color) << "[" << *udpPortNumber + LOG(color) << "[" << udpPortNumber << "]: " "Packet_Loss:" << loss << " (" << lossPercent << "%)" diff --git a/slsReceiverSoftware/src/Listener.h b/slsReceiverSoftware/src/Listener.h index 6ca0c9c26..07d48e796 100644 --- a/slsReceiverSoftware/src/Listener.h +++ b/slsReceiverSoftware/src/Listener.h @@ -24,35 +24,35 @@ class Fifo; class Listener : private virtual slsDetectorDefs, public ThreadObject { public: - - Listener(int index, Fifo *fifo, std::atomic *status, uint32_t *udpPortNumber, std::string *eth, int *udpSocketBufferSize, int *actualUDPSocketBufferSize, uint32_t *framesPerFile, frameDiscardPolicy *frameDiscardMode, bool *silentMode); + Listener(int index, std::atomic *status); ~Listener(); - bool isPortDisabled() const; - uint64_t GetPacketsCaught() const; - uint64_t GetNumCompleteFramesCaught() const; - uint64_t GetLastFrameIndexCaught() const; - /** negative values in case of extra packets */ - int64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const; - bool GetStartedFlag() const; - uint64_t GetCurrentFrameIndex() const; - uint64_t GetListenedIndex() const; + bool isPortDisabled() const; + uint64_t GetPacketsCaught() const; + uint64_t GetNumCompleteFramesCaught() const; + uint64_t GetLastFrameIndexCaught() const; + /** negative values in case of extra packets */ + int64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const; + bool GetStartedFlag() const; + uint64_t GetCurrentFrameIndex() const; + uint64_t GetListenedIndex() const; - void SetFifo(Fifo *f); - void ResetParametersforNewAcquisition(); - void SetGeneralData(GeneralData *g); - void SetActivate(bool enable); - void SetDetectorDatastream(bool enable); - void SetNoRoi(bool enable); - void CreateUDPSockets(); - void ShutDownUDPSocket(); + void SetFifo(Fifo *f); + void SetGeneralData(GeneralData *g); + void SetUdpPortNumber(const uint32_t portNumber); + void SetEthernetInterface(const std::string e); + void SetActivate(bool enable); + void SetDetectorDatastream(bool enable); + void SetNoRoi(bool enable); + void SetFrameDiscardPolicy(frameDiscardPolicy value); + void SetSilentMode(bool enable); - /** - * Create & closes a dummy UDP socket - * to set & get actual buffer size - * @param s UDP socket buffer size to be set - */ - void CreateDummySocketForUDPSocketBufferSize(int s); + + void ResetParametersforNewAcquisition(); + void CreateUDPSocket(int& actualSize); + void ShutDownUDPSocket(); + /** to set & get actual buffer size */ + void CreateDummySocketForUDPSocketBufferSize(int s, int & actualSize); /** * Set hard coded (calculated but not from detector) row and column @@ -102,17 +102,14 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { // individual members std::atomic *status; std::unique_ptr udpSocket{nullptr}; - uint32_t *udpPortNumber; - std::string *eth; - int *udpSocketBufferSize; - /** double due to kernel bookkeeping */ - int *actualUDPSocketBufferSize; - uint32_t *framesPerFile; - frameDiscardPolicy *frameDiscardMode; + + uint32_t udpPortNumber{0}; + std::string eth; + frameDiscardPolicy frameDiscardMode; bool activated{false}; bool detectorDataStream{true}; bool noRoi{false}; - bool *silentMode; + bool silentMode; bool disabledPort{false}; /** row hardcoded as 1D or 2d,