From 4117cda79bd206c65985cc3824a7fd75cbfc3197 Mon Sep 17 00:00:00 2001 From: Dhanya Thattil <33750417+thattil@users.noreply.github.com> Date: Fri, 22 Jul 2022 15:32:41 +0200 Subject: [PATCH] Rx: refactor memory structure and listener (#496) * gui message doesnt show if it has a '>' symbol in error msg * minor refactoring for readability (size_t calc fifo size) * refactoring listening udp socket code: activated and datastream dont create udp sockets anyway, rc<=- should be discarded in any case * wip * refactoring memory structure access * wip: bugfix write header + data to binary * wip * wip * wip * wip * wip * wip * wip * wip * wip * portRoi no roi effecto on progress * fail at receiver progress, wip * segfaults for char pointer in struct * reference to header to get header and data * refactoring * use const defined for size of header of fifo * updated release notes * refactoring from review: fwrite, static_cast --- RELEASE.txt | 1 + slsReceiverSoftware/src/BinaryDataFile.cpp | 25 +- slsReceiverSoftware/src/BinaryDataFile.h | 4 +- slsReceiverSoftware/src/ClientInterface.cpp | 2 +- slsReceiverSoftware/src/DataProcessor.cpp | 329 +++++++------- slsReceiverSoftware/src/DataProcessor.h | 68 +-- slsReceiverSoftware/src/DataStreamer.cpp | 96 ++-- slsReceiverSoftware/src/DataStreamer.h | 15 +- slsReceiverSoftware/src/Fifo.cpp | 6 +- slsReceiverSoftware/src/Fifo.h | 4 +- slsReceiverSoftware/src/File.h | 4 +- slsReceiverSoftware/src/GeneralData.h | 16 - slsReceiverSoftware/src/HDF5DataFile.cpp | 16 +- slsReceiverSoftware/src/HDF5DataFile.h | 6 +- slsReceiverSoftware/src/Implementation.cpp | 44 +- slsReceiverSoftware/src/Listener.cpp | 468 +++++++------------- slsReceiverSoftware/src/Listener.h | 20 +- slsReceiverSoftware/src/receiver_defs.h | 13 +- 18 files changed, 469 insertions(+), 668 deletions(-) diff --git a/RELEASE.txt b/RELEASE.txt index 0cd4c5f26..d48f5d6cc 100755 --- a/RELEASE.txt +++ b/RELEASE.txt @@ -83,6 +83,7 @@ This document describes the differences between v7.0.0 and v6.x.x -udp_srcip and udp_Srcip2: can set to auto (for virtual or 1g data networks) - set dataset name for all hdf5 files to "data" only - number of storage cells is not updated in teh receiver. done. and also allowing it to be modified in running status +- refactored memory structure in receiver and listener code (maybe resolves stuck issue, need to check) 2. Resolved Issues ================== diff --git a/slsReceiverSoftware/src/BinaryDataFile.cpp b/slsReceiverSoftware/src/BinaryDataFile.cpp index 98103068a..b21b89284 100644 --- a/slsReceiverSoftware/src/BinaryDataFile.cpp +++ b/slsReceiverSoftware/src/BinaryDataFile.cpp @@ -65,9 +65,7 @@ void BinaryDataFile::CreateFile() { } } -void BinaryDataFile::WriteToFile(char *buffer, const int buffersize, - const uint64_t currentFrameNumber, - const uint32_t numPacketsCaught) { +void BinaryDataFile::WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber, const uint32_t numPacketsCaught) { // check if maxframesperfile = 0 for infinite if (maxFramesPerFile_ && (numFramesInFile_ >= maxFramesPerFile_)) { CloseFile(); @@ -77,37 +75,34 @@ void BinaryDataFile::WriteToFile(char *buffer, const int buffersize, ++numFramesInFile_; // write to file - int ret = 0; + size_t ret = 0; - // contiguous bitset + // contiguous bitset (write header + image) if (sizeof(sls_bitset) == sizeof(bitset_storage)) { - ret = fwrite(buffer, 1, buffersize, fd_); + ret = fwrite(&header, sizeof(sls_receiver_header) + imageSize, 1, fd_); } // not contiguous bitset else { // write detector header - ret = fwrite(buffer, 1, sizeof(sls_detector_header), fd_); + ret = fwrite(&header, sizeof(sls_detector_header), 1, fd_); // get contiguous representation of bit mask bitset_storage storage; memset(storage, 0, sizeof(bitset_storage)); - sls_bitset bits = *(sls_bitset *)(buffer + sizeof(sls_detector_header)); + sls_bitset bits = header.packetsMask; for (int i = 0; i < MAX_NUM_PACKETS; ++i) storage[i >> 3] |= (bits[i] << (i & 7)); // write bitmask - ret += fwrite((char *)storage, 1, sizeof(bitset_storage), fd_); + ret += fwrite(storage, sizeof(bitset_storage), 1, fd_); // write data - ret += fwrite(buffer + sizeof(sls_detector_header), 1, - buffersize - sizeof(sls_receiver_header), fd_); + ret += fwrite(imageData, imageSize, 1, fd_); } // if write error - if (ret != buffersize) { - throw RuntimeError(std::to_string(index_) + - " : Write to file failed for image number " + - std::to_string(currentFrameNumber)); + if (ret != imageSize + sizeof(sls_receiver_header)) { + throw RuntimeError(std::to_string(index_) + " : Write to file failed for image number " + std::to_string(currentFrameNumber) + ". Wrote " + std::to_string(ret) + " bytes instead of " + std::to_string(imageSize + sizeof(sls_receiver_header))); } } diff --git a/slsReceiverSoftware/src/BinaryDataFile.h b/slsReceiverSoftware/src/BinaryDataFile.h index b2de445de..c06008ea3 100644 --- a/slsReceiverSoftware/src/BinaryDataFile.h +++ b/slsReceiverSoftware/src/BinaryDataFile.h @@ -22,9 +22,7 @@ class BinaryDataFile : private virtual slsDetectorDefs, public File { const uint32_t udpPortNumber, const uint32_t maxFramesPerFile) override; - void WriteToFile(char *buffer, const int buffersize, - const uint64_t currentFrameNumber, - const uint32_t numPacketsCaught) override; + void WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber, const uint32_t numPacketsCaught) override; private: void CreateFile(); diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp index c17accb2a..316172394 100644 --- a/slsReceiverSoftware/src/ClientInterface.cpp +++ b/slsReceiverSoftware/src/ClientInterface.cpp @@ -1176,7 +1176,7 @@ int ClientInterface::get_additional_json_header(Interface &socket) { int ClientInterface::set_udp_socket_buffer_size(Interface &socket) { auto size = socket.Receive(); if (size == 0) { - throw RuntimeError("Receiver socket buffer size must be > 0."); + throw RuntimeError("Receiver socket buffer size must be greater than 0."); } if (size > 0) { verifyIdle(socket); diff --git a/slsReceiverSoftware/src/DataProcessor.cpp b/slsReceiverSoftware/src/DataProcessor.cpp index 638b0b99b..8bef7fd26 100644 --- a/slsReceiverSoftware/src/DataProcessor.cpp +++ b/slsReceiverSoftware/src/DataProcessor.cpp @@ -26,72 +26,72 @@ namespace sls { -const std::string DataProcessor::typeName_ = "DataProcessor"; +const std::string DataProcessor::typeName = "DataProcessor"; -DataProcessor::DataProcessor(int index, detectorType detectorType, Fifo *fifo, - bool *dataStreamEnable, - uint32_t *streamingFrequency, - uint32_t *streamingTimerInMs, - uint32_t *streamingStartFnum, bool *framePadding, - std::vector *ctbDbitList, int *ctbDbitOffset, - int *ctbAnalogDataBytes) - : ThreadObject(index, typeName_), fifo_(fifo), detectorType_(detectorType), - dataStreamEnable_(dataStreamEnable), - streamingFrequency_(streamingFrequency), - streamingTimerInMs_(streamingTimerInMs), - streamingStartFnum_(streamingStartFnum), framePadding_(framePadding), - ctbDbitList_(ctbDbitList), ctbDbitOffset_(ctbDbitOffset), - ctbAnalogDataBytes_(ctbAnalogDataBytes) { +DataProcessor::DataProcessor(int index, detectorType dType, Fifo *f, + bool *dse, + uint32_t *sf, + uint32_t *st, + uint32_t *sfnum, bool *fp, + std::vector *ctblist, int *ctboff, + int *ctbad) + : ThreadObject(index, typeName), fifo(f), detType(dType), + dataStreamEnable(dse), + streamingFrequency(sf), + streamingTimerInMs(st), + streamingStartFnum(sfnum), framePadding(fp), + ctbDbitList(ctblist), ctbDbitOffset(ctboff), + ctbAnalogDataBytes(ctbad) { LOG(logDEBUG) << "DataProcessor " << index << " created"; } DataProcessor::~DataProcessor() { DeleteFiles(); } -bool DataProcessor::GetStartedFlag() const { return startedFlag_; } +bool DataProcessor::GetStartedFlag() const { return startedFlag; } -void DataProcessor::SetFifo(Fifo *fifo) { fifo_ = fifo; } +void DataProcessor::SetFifo(Fifo *fifo) { fifo = fifo; } -void DataProcessor::SetActivate(bool enable) { activated_ = enable; } +void DataProcessor::SetActivate(bool enable) { activated = enable; } void DataProcessor::SetReceiverROI(ROI roi) { - receiverRoi_ = roi; - receiverRoiEnabled_ = receiverRoi_.completeRoi() ? false : true; - receiverNoRoi_ = receiverRoi_.noRoi(); + receiverRoi = roi; + receiverRoiEnabled = receiverRoi.completeRoi() ? false : true; + receiverNoRoi = receiverRoi.noRoi(); } void DataProcessor::ResetParametersforNewAcquisition() { StopRunning(); - startedFlag_ = false; - numFramesCaught_ = 0; - firstIndex_ = 0; - currentFrameIndex_ = 0; - firstStreamerFrame_ = true; - streamCurrentFrame_ = false; - completeImageToStreamBeforeCropping = make_unique(generalData_->imageSize); + startedFlag = false; + numFramesCaught = 0; + firstIndex = 0; + currentFrameIndex = 0; + firstStreamerFrame = true; + streamCurrentFrame = false; + completeImageToStreamBeforeCropping = make_unique(generalData->imageSize); } void DataProcessor::RecordFirstIndex(uint64_t fnum) { // listen to this fnum, later +1 - currentFrameIndex_ = fnum; - startedFlag_ = true; - firstIndex_ = fnum; - LOG(logDEBUG1) << index << " First Index:" << firstIndex_; + currentFrameIndex = fnum; + startedFlag = true; + firstIndex = fnum; + LOG(logDEBUG1) << index << " First Index:" << firstIndex; } -void DataProcessor::SetGeneralData(GeneralData *generalData) { - generalData_ = generalData; +void DataProcessor::SetGeneralData(GeneralData *g) { + generalData = g; } void DataProcessor::CloseFiles() { - if (dataFile_) - dataFile_->CloseFile(); + if (dataFile) + dataFile->CloseFile(); } void DataProcessor::DeleteFiles() { CloseFiles(); - delete dataFile_; - dataFile_ = nullptr; + delete dataFile; + dataFile = nullptr; } void DataProcessor::SetupFileWriter(const bool filewriteEnable, const fileFormat fileFormatType, @@ -101,11 +101,11 @@ void DataProcessor::SetupFileWriter(const bool filewriteEnable, switch (fileFormatType) { #ifdef HDF5C case HDF5: - dataFile_ = new HDF5DataFile(index, hdf5LibMutex); + dataFile = new HDF5DataFile(index, hdf5LibMutex); break; #endif case BINARY: - dataFile_ = new BinaryDataFile(index); + dataFile = new BinaryDataFile(index); break; default: throw RuntimeError( @@ -121,38 +121,38 @@ void DataProcessor::CreateFirstFiles( const uint32_t udpPortNumber, const uint32_t maxFramesPerFile, const uint64_t numImages, const uint32_t dynamicRange, const bool detectorDataStream) { - if (dataFile_ == nullptr) { + if (dataFile == nullptr) { throw RuntimeError("file object not contstructed"); } CloseFiles(); // deactivated (half module/ single port or no roi), dont write file - if (!activated_ || !detectorDataStream || receiverNoRoi_) { + if (!activated || !detectorDataStream || receiverNoRoi) { return; } #ifdef HDF5C - int nx = generalData_->nPixelsX; - int ny = generalData_->nPixelsY; - if (receiverRoiEnabled_) { - nx = receiverRoi_.xmax - receiverRoi_.xmin + 1; - ny = receiverRoi_.ymax - receiverRoi_.ymin + 1; - if (receiverRoi_.ymax == -1 || receiverRoi_.ymin == -1) { + int nx = generalData->nPixelsX; + int ny = generalData->nPixelsY; + if (receiverRoiEnabled) { + nx = receiverRoi.xmax - receiverRoi.xmin + 1; + ny = receiverRoi.ymax - receiverRoi.ymin + 1; + if (receiverRoi.ymax == -1 || receiverRoi.ymin == -1) { ny = 1; } } #endif - switch (dataFile_->GetFileFormat()) { + switch (dataFile->GetFileFormat()) { #ifdef HDF5C case HDF5: - dataFile_->CreateFirstHDF5DataFile( + dataFile->CreateFirstHDF5DataFile( filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode, modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile, numImages, nx, ny, dynamicRange); break; #endif case BINARY: - dataFile_->CreateFirstBinaryDataFile( + dataFile->CreateFirstBinaryDataFile( filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode, modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile); break; @@ -163,11 +163,11 @@ void DataProcessor::CreateFirstFiles( #ifdef HDF5C uint32_t DataProcessor::GetFilesInAcquisition() const { - if (dataFile_ == nullptr) { + if (dataFile == nullptr) { throw RuntimeError("No data file object created to get number of " "files in acquiistion"); } - return dataFile_->GetFilesInAcquisition(); + return dataFile->GetFilesInAcquisition(); } std::string DataProcessor::CreateVirtualFile( @@ -178,17 +178,17 @@ std::string DataProcessor::CreateVirtualFile( const int numModX, const int numModY, const uint32_t dynamicRange, std::mutex *hdf5LibMutex) { - if (receiverRoiEnabled_) { + if (receiverRoiEnabled) { throw std::runtime_error("Skipping virtual hdf5 file since rx_roi is enabled."); } bool gotthard25um = - ((detectorType_ == GOTTHARD || detectorType_ == GOTTHARD2) && + ((detType == GOTTHARD || detType == GOTTHARD2) && (numModX * numModY) == 2); // maxframesperfile = 0 for infinite files uint32_t framesPerFile = - ((maxFramesPerFile == 0) ? numFramesCaught_ : maxFramesPerFile); + ((maxFramesPerFile == 0) ? numFramesCaught : maxFramesPerFile); // TODO: assumption 1: create virtual file even if no data in other // files (they exist anyway) assumption2: virtual file max frame index @@ -197,9 +197,9 @@ std::string DataProcessor::CreateVirtualFile( return masterFileUtility::CreateVirtualHDF5File( filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode, modulePos, numUnitsPerReadout, framesPerFile, - generalData_->nPixelsX, generalData_->nPixelsY, dynamicRange, - numFramesCaught_, numModX, numModY, dataFile_->GetPDataType(), - dataFile_->GetParameterNames(), dataFile_->GetParameterDataTypes(), + generalData->nPixelsX, generalData->nPixelsY, dynamicRange, + numFramesCaught, numModX, numModY, dataFile->GetPDataType(), + dataFile->GetParameterNames(), dataFile->GetParameterDataTypes(), hdf5LibMutex, gotthard25um); } @@ -208,16 +208,16 @@ void DataProcessor::LinkFileInMaster(const std::string &masterFileName, const bool silentMode, std::mutex *hdf5LibMutex) { - if (receiverRoiEnabled_) { + if (receiverRoiEnabled) { throw std::runtime_error("Should not be here, roi with hdf5 virtual should throw."); } std::string fname{virtualFileName}, masterfname{masterFileName}; // if no virtual file, link data file if (virtualFileName.empty()) { - fname = dataFile_->GetFileName(); + fname = dataFile->GetFileName(); } masterFileUtility::LinkHDF5FileInMaster(masterfname, fname, - dataFile_->GetParameterNames(), + dataFile->GetParameterNames(), silentMode, hdf5LibMutex); } #endif @@ -228,7 +228,7 @@ std::string DataProcessor::CreateMasterFile( const fileFormat fileFormatType, MasterAttributes *attr, std::mutex *hdf5LibMutex) { - attr->framesInFile = numFramesCaught_; + attr->framesInFile = numFramesCaught; std::unique_ptr masterFile{nullptr}; switch (fileFormatType) { @@ -249,35 +249,35 @@ std::string DataProcessor::CreateMasterFile( void DataProcessor::ThreadExecution() { char *buffer = nullptr; - fifo_->PopAddress(buffer); + fifo->PopAddress(buffer); LOG(logDEBUG5) << "DataProcessor " << index << ", " << std::hex << static_cast(buffer) << std::dec << ":" << buffer; + auto *memImage = reinterpret_cast(buffer); // check dummy - auto numBytes = *reinterpret_cast(buffer); - LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes; - if (numBytes == DUMMY_PACKET_VALUE) { + LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << memImage->size; + if (memImage->size == DUMMY_PACKET_VALUE) { StopProcessing(buffer); return; } try { - ProcessAnImage(buffer); + ProcessAnImage(memImage->header, memImage->size, memImage->firstIndex, memImage->data); } catch (const std::exception &e) { - fifo_->FreeAddress(buffer); + fifo->FreeAddress(buffer); return; } // stream (if time/freq to stream) or free - if (streamCurrentFrame_) { + if (streamCurrentFrame) { // copy the complete image back if roi enabled - if (receiverRoiEnabled_) { - (*((uint32_t *)buffer)) = generalData_->imageSize; - memcpy(buffer + generalData_->fifoBufferHeaderSize, &completeImageToStreamBeforeCropping[0], generalData_->imageSize); + if (receiverRoiEnabled) { + memImage->size = generalData->imageSize; + memcpy(memImage->data, &completeImageToStreamBeforeCropping[0], generalData->imageSize); } - fifo_->PushAddressToStream(buffer); + fifo->PushAddressToStream(buffer); } else { - fifo_->FreeAddress(buffer); + fifo->FreeAddress(buffer); } } @@ -285,92 +285,76 @@ void DataProcessor::StopProcessing(char *buf) { LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy"; // stream or free - if (*dataStreamEnable_) - fifo_->PushAddressToStream(buf); + if (*dataStreamEnable) + fifo->PushAddressToStream(buf); else - fifo_->FreeAddress(buf); + fifo->FreeAddress(buf); CloseFiles(); StopRunning(); LOG(logDEBUG1) << index << ": Processing Completed"; } -void DataProcessor::ProcessAnImage(char *buf) { - - auto *rheader = - reinterpret_cast(buf + FIFO_HEADER_NUMBYTES); - sls_detector_header header = rheader->detHeader; - uint64_t fnum = header.frameNumber; - currentFrameIndex_ = fnum; - numFramesCaught_++; - uint32_t nump = header.packetNumber; - +void DataProcessor::ProcessAnImage(sls_receiver_header & header, size_t &size, size_t &firstImageIndex, char* data) { + uint64_t fnum = header.detHeader.frameNumber; LOG(logDEBUG1) << "DataProcessing " << index << ": fnum:" << fnum; + currentFrameIndex = fnum; + numFramesCaught++; + uint32_t nump = header.detHeader.packetNumber; - if (!startedFlag_) { + if (!startedFlag) { RecordFirstIndex(fnum); - if (*dataStreamEnable_) { + if (*dataStreamEnable) { // restart timer - clock_gettime(CLOCK_REALTIME, &timerbegin_); - timerbegin_.tv_sec -= (*streamingTimerInMs_) / 1000; - timerbegin_.tv_nsec -= ((*streamingTimerInMs_) % 1000) * 1000000; + clock_gettime(CLOCK_REALTIME, &timerbegin); + timerbegin.tv_sec -= (*streamingTimerInMs) / 1000; + timerbegin.tv_nsec -= ((*streamingTimerInMs) % 1000) * 1000000; // to send first image - currentFreqCount_ = *streamingFrequency_ - *streamingStartFnum_; + currentFreqCount = *streamingFrequency - *streamingStartFnum; } } // frame padding - if (activated_ && *framePadding_ && nump < generalData_->packetsPerFrame) - PadMissingPackets(buf); + if (*framePadding && nump < generalData->packetsPerFrame) + PadMissingPackets(header, data); // rearrange ctb digital bits (if ctbDbitlist is not empty) - if (!(*ctbDbitList_).empty()) { - RearrangeDbitData(buf); + if (!(*ctbDbitList).empty()) { + RearrangeDbitData(size, data); } // 'stream Image' check has to be done here before crop image // stream (if time/freq to stream) or free - if (*dataStreamEnable_ && SendToStreamer()) { - // if first frame to stream, add frame index to fifo header (might - // not be the first) - if (firstStreamerFrame_) { - firstStreamerFrame_ = false; - (*((uint32_t *)(buf + FIFO_DATASIZE_NUMBYTES))) = - (uint32_t)(fnum - firstIndex_); + if (*dataStreamEnable && SendToStreamer()) { + if (firstStreamerFrame) { + firstStreamerFrame = false; + // write to memory structure of first streamer frame + firstImageIndex = firstIndex; } - streamCurrentFrame_ = true; + streamCurrentFrame = true; } else { - streamCurrentFrame_ = false; + streamCurrentFrame = false; } - if (receiverRoiEnabled_) { + if (receiverRoiEnabled) { // copy the complete image to stream before cropping - if (streamCurrentFrame_) { - memcpy(&completeImageToStreamBeforeCropping[0], buf + generalData_->fifoBufferHeaderSize, generalData_->imageSize); + if (streamCurrentFrame) { + memcpy(&completeImageToStreamBeforeCropping[0], data, generalData->imageSize); } - CropImage(buf); + CropImage(size, data); } try { // normal call back if (rawDataReadyCallBack != nullptr) { - std::size_t dsize = *reinterpret_cast(buf); - rawDataReadyCallBack(rheader, - buf + FIFO_HEADER_NUMBYTES + - sizeof(sls_receiver_header), - dsize, pRawDataReady); + rawDataReadyCallBack(&header, data, size, pRawDataReady); } // call back with modified size else if (rawDataModifyReadyCallBack != nullptr) { - std::size_t revsize = *reinterpret_cast(buf); - rawDataModifyReadyCallBack(rheader, - buf + FIFO_HEADER_NUMBYTES + - sizeof(sls_receiver_header), - revsize, pRawDataReady); - (*((uint32_t *)buf)) = revsize; + rawDataModifyReadyCallBack(&header, data, size, pRawDataReady); } } catch (const std::exception &e) { throw RuntimeError("Get Data Callback Error: " + @@ -378,14 +362,9 @@ void DataProcessor::ProcessAnImage(char *buf) { } // write to file - if (dataFile_) { + if (dataFile) { try { - dataFile_->WriteToFile( - buf + FIFO_HEADER_NUMBYTES, - sizeof(sls_receiver_header) + - (uint32_t)(*((uint32_t *)buf)), //+ size of data (resizable - // from previous call back - fnum - firstIndex_, nump); + dataFile->WriteToFile(data, header, size, fnum - firstIndex, nump); } catch (const RuntimeError &e) { ; // ignore write exception for now (TODO: send error message // via stopReceiver tcp) @@ -395,7 +374,7 @@ void DataProcessor::ProcessAnImage(char *buf) { bool DataProcessor::SendToStreamer() { // skip - if ((*streamingFrequency_) == 0u) { + if ((*streamingFrequency) == 0u) { if (!CheckTimer()) return false; } else { @@ -409,9 +388,9 @@ bool DataProcessor::CheckTimer() { struct timespec end; clock_gettime(CLOCK_REALTIME, &end); - auto elapsed_s = (end.tv_sec - timerbegin_.tv_sec) + - (end.tv_nsec - timerbegin_.tv_nsec) / 1e9; - double timer_s = *streamingTimerInMs_ / 1e3; + auto elapsed_s = (end.tv_sec - timerbegin.tv_sec) + + (end.tv_nsec - timerbegin.tv_nsec) / 1e9; + double timer_s = *streamingTimerInMs / 1e3; LOG(logDEBUG1) << index << " Timer elapsed time:" << elapsed_s << " seconds"; @@ -421,16 +400,16 @@ bool DataProcessor::CheckTimer() { return false; // restart timer - clock_gettime(CLOCK_REALTIME, &timerbegin_); + clock_gettime(CLOCK_REALTIME, &timerbegin); return true; } bool DataProcessor::CheckCount() { - if (currentFreqCount_ == *streamingFrequency_) { - currentFreqCount_ = 1; + if (currentFreqCount == *streamingFrequency) { + currentFreqCount = 1; return true; } - currentFreqCount_++; + currentFreqCount++; return false; } @@ -446,22 +425,20 @@ void DataProcessor::registerCallBackRawDataModifyReady( pRawDataReady = arg; } -void DataProcessor::PadMissingPackets(char *buf) { +void DataProcessor::PadMissingPackets(sls_receiver_header header, char* data) { LOG(logDEBUG) << index << ": Padding Missing Packets"; - uint32_t pperFrame = generalData_->packetsPerFrame; - auto *header = - reinterpret_cast(buf + FIFO_HEADER_NUMBYTES); - uint32_t nmissing = pperFrame - header->detHeader.packetNumber; - sls_bitset pmask = header->packetsMask; + uint32_t pperFrame = generalData->packetsPerFrame; - uint32_t dsize = generalData_->dataSize; - if (detectorType_ == GOTTHARD2 && index != 0) { - dsize = generalData_->vetoDataSize; + uint32_t nmissing = pperFrame - header.detHeader.packetNumber; + sls_bitset pmask = header.packetsMask; + + uint32_t dsize = generalData->dataSize; + if (detType == GOTTHARD2 && index != 0) { + dsize = generalData->vetoDataSize; } - uint32_t fifohsize = generalData_->fifoBufferHeaderSize; uint32_t corrected_dsize = - dsize - ((pperFrame * dsize) - generalData_->imageSize); + dsize - ((pperFrame * dsize) - generalData->imageSize); LOG(logDEBUG1) << "bitmask: " << pmask.to_string(); for (unsigned int pnum = 0; pnum < pperFrame; ++pnum) { @@ -478,26 +455,26 @@ void DataProcessor::PadMissingPackets(char *buf) { << std::endl; // missing packet - switch (detectorType_) { + switch (detType) { // for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes // data // 2nd packet: 4 bytes fnum, previous 1*2 bytes data + // 640*2 bytes data !! case GOTTHARD: if (pnum == 0u) - memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize - 2); + memset(data + (pnum * dsize), 0xFF, dsize - 2); else - memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize + 2); + memset(data + (pnum * dsize), 0xFF, dsize + 2); break; case CHIPTESTBOARD: case MOENCH: if (pnum == (pperFrame - 1)) - memset(buf + fifohsize + (pnum * dsize), 0xFF, corrected_dsize); + memset(data + (pnum * dsize), 0xFF, corrected_dsize); else - memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize); + memset(data + (pnum * dsize), 0xFF, dsize); break; default: - memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize); + memset(data + (pnum * dsize), 0xFF, dsize); break; } --nmissing; @@ -505,11 +482,9 @@ void DataProcessor::PadMissingPackets(char *buf) { } /** ctb specific */ -void DataProcessor::RearrangeDbitData(char *buf) { +void DataProcessor::RearrangeDbitData(size_t & size, char *data) { // TODO! (Erik) Refactor and add tests - int totalSize = (int)(*((uint32_t *)buf)); - int ctbDigitalDataBytes = - totalSize - (*ctbAnalogDataBytes_) - (*ctbDbitOffset_); + int ctbDigitalDataBytes = size - (*ctbAnalogDataBytes) - (*ctbDbitOffset); // no digital data if (ctbDigitalDataBytes == 0) { @@ -519,20 +494,18 @@ void DataProcessor::RearrangeDbitData(char *buf) { } const int numSamples = (ctbDigitalDataBytes / sizeof(uint64_t)); - const int digOffset = FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header) + - (*ctbAnalogDataBytes_); // ceil as numResult8Bits could be decimal const int numResult8Bits = - ceil((numSamples * (*ctbDbitList_).size()) / 8.00); + ceil((numSamples * (*ctbDbitList).size()) / 8.00); std::vector result(numResult8Bits); uint8_t *dest = &result[0]; - auto *source = (uint64_t *)(buf + digOffset + (*ctbDbitOffset_)); + auto *source = (uint64_t *)(data + (*ctbAnalogDataBytes) + (*ctbDbitOffset)); // loop through digital bit enable vector int bitoffset = 0; - for (auto bi : (*ctbDbitList_)) { + for (auto bi : (*ctbDbitList)) { // where numbits * numsamples is not a multiple of 8 if (bitoffset != 0) { bitoffset = 0; @@ -553,18 +526,18 @@ void DataProcessor::RearrangeDbitData(char *buf) { } } - // copy back to buf and update size - memcpy(buf + digOffset, result.data(), numResult8Bits * sizeof(uint8_t)); - (*((uint32_t *)buf)) = numResult8Bits * sizeof(uint8_t); + // copy back to memory and update size + memcpy(data + (*ctbAnalogDataBytes), result.data(), numResult8Bits * sizeof(uint8_t)); + size = numResult8Bits * sizeof(uint8_t); } -void DataProcessor::CropImage(char *buf) { - LOG(logDEBUG) << "Cropping Image to ROI " << ToString(receiverRoi_); - int nPixelsX = generalData_->nPixelsX; - int xmin = receiverRoi_.xmin; - int xmax = receiverRoi_.xmax; - int ymin = receiverRoi_.ymin; - int ymax = receiverRoi_.ymax; +void DataProcessor::CropImage(size_t & size, char *data) { + LOG(logDEBUG) << "Cropping Image to ROI " << ToString(receiverRoi); + int nPixelsX = generalData->nPixelsX; + int xmin = receiverRoi.xmin; + int xmax = receiverRoi.xmax; + int ymin = receiverRoi.ymin; + int ymax = receiverRoi.ymax; int xwidth = xmax - xmin + 1; int ywidth = ymax - ymin + 1; if (ymin == -1 || ymax == -1) { @@ -573,16 +546,16 @@ void DataProcessor::CropImage(char *buf) { } // calculate total roi size - double bytesPerPixel = generalData_->dynamicRange / 8.00; + double bytesPerPixel = generalData->dynamicRange / 8.00; int startOffset = (int)((nPixelsX * ymin + xmin) * bytesPerPixel); - // write size into fifo buffer header + // write size into memory std::size_t roiImageSize = xwidth * ywidth * bytesPerPixel; LOG(logDEBUG) << "roiImageSize:" << roiImageSize; - (*((uint32_t *)buf)) = roiImageSize; + size = roiImageSize; // copy the roi to the beginning of the image - char *dstOffset = buf + generalData_->fifoBufferHeaderSize; + char *dstOffset = data; char *srcOffset = dstOffset + startOffset; // entire width @@ -594,7 +567,7 @@ void DataProcessor::CropImage(char *buf) { for (int y = 0; y != ywidth; ++y) { memcpy(dstOffset, srcOffset, xwidth * bytesPerPixel); dstOffset += (int)(xwidth * bytesPerPixel); - srcOffset += (int)(generalData_->nPixelsX * bytesPerPixel); + srcOffset += (int)(generalData->nPixelsX * bytesPerPixel); } } } diff --git a/slsReceiverSoftware/src/DataProcessor.h b/slsReceiverSoftware/src/DataProcessor.h index ce54e0b3d..82ec80779 100644 --- a/slsReceiverSoftware/src/DataProcessor.h +++ b/slsReceiverSoftware/src/DataProcessor.h @@ -29,11 +29,11 @@ struct MasterAttributes; class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { public: - DataProcessor(int index, detectorType detectorType, Fifo *fifo, - bool *dataStreamEnable, uint32_t *streamingFrequency, - uint32_t *streamingTimerInMs, uint32_t *streamingStartFnum, - bool *framePadding, std::vector *ctbDbitList, - int *ctbDbitOffset, int *ctbAnalogDataBytes); + DataProcessor(int index, detectorType dType, Fifo *f, + bool *dse, uint32_t *sf, + uint32_t *st, uint32_t *sfnum, + bool *fp, std::vector *ctblist, + int *ctboff, int *ctbad); ~DataProcessor() override; @@ -114,7 +114,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { * Process an image popped from fifo, * write to file if fw enabled & update parameters */ - void ProcessAnImage(char *buf); + void ProcessAnImage(sls_receiver_header & header, size_t &size, size_t &firstImageIndex, char* data); /** * Calls CheckTimer and CheckCount for streaming frequency and timer @@ -137,52 +137,52 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject { */ bool CheckCount(); - void PadMissingPackets(char *buf); + void PadMissingPackets(sls_receiver_header header, char* data); /** * Align corresponding digital bits together (CTB only if ctbDbitlist is not * empty) */ - void RearrangeDbitData(char *buf); + void RearrangeDbitData(size_t & size, char *data); - void CropImage(char *buf); + void CropImage(size_t & size, char *data); - static const std::string typeName_; + static const std::string typeName; - const GeneralData *generalData_{nullptr}; - Fifo *fifo_; - detectorType detectorType_; - bool *dataStreamEnable_; - bool activated_{false}; - ROI receiverRoi_{}; - bool receiverRoiEnabled_{false}; - bool receiverNoRoi_{false}; + const GeneralData *generalData{nullptr}; + Fifo *fifo; + detectorType detType; + bool *dataStreamEnable; + bool activated{false}; + ROI receiverRoi{}; + bool receiverRoiEnabled{false}; + bool receiverNoRoi{false}; std::unique_ptr completeImageToStreamBeforeCropping; /** if 0, sending random images with a timer */ - uint32_t *streamingFrequency_; - uint32_t *streamingTimerInMs_; - uint32_t *streamingStartFnum_; - uint32_t currentFreqCount_{0}; - struct timespec timerbegin_ {}; - bool *framePadding_; - std::vector *ctbDbitList_; - int *ctbDbitOffset_; - int *ctbAnalogDataBytes_; - std::atomic startedFlag_{false}; - std::atomic firstIndex_{0}; + uint32_t *streamingFrequency; + uint32_t *streamingTimerInMs; + uint32_t *streamingStartFnum; + uint32_t currentFreqCount{0}; + struct timespec timerbegin {}; + bool *framePadding; + std::vector *ctbDbitList; + int *ctbDbitOffset; + int *ctbAnalogDataBytes; + std::atomic startedFlag{false}; + std::atomic firstIndex{0}; // for statistics - uint64_t numFramesCaught_{0}; + uint64_t numFramesCaught{0}; /** Frame Number of latest processed frame number */ - std::atomic currentFrameIndex_{0}; + std::atomic currentFrameIndex{0}; /** first streamer frame to add frame index in fifo header */ - bool firstStreamerFrame_{false}; + bool firstStreamerFrame{false}; - bool streamCurrentFrame_{false}; + bool streamCurrentFrame{false}; - File *dataFile_{nullptr}; + File *dataFile{nullptr}; // call back /** diff --git a/slsReceiverSoftware/src/DataStreamer.cpp b/slsReceiverSoftware/src/DataStreamer.cpp index 5d89b0126..bb600c849 100644 --- a/slsReceiverSoftware/src/DataStreamer.cpp +++ b/slsReceiverSoftware/src/DataStreamer.cpp @@ -52,12 +52,9 @@ void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) { } } -void DataStreamer::RecordFirstIndex(uint64_t fnum, char *buf) { +void DataStreamer::RecordFirstIndex(uint64_t fnum, size_t firstImageIndex) { startedFlag = true; - // streamer first index needn't be - uint64_t firstVal = fnum - (*((uint32_t *)(buf + FIFO_DATASIZE_NUMBYTES))); - - firstIndex = firstVal; + firstIndex = firstImageIndex; LOG(logDEBUG1) << index << " First Index: " << firstIndex << ", First Streamer Index:" << fnum; } @@ -110,20 +107,23 @@ void DataStreamer::CloseZmqSocket() { void DataStreamer::ThreadExecution() { char *buffer = nullptr; fifo->PopAddressToStream(buffer); - LOG(logDEBUG5) << "DataStreamer " << index - << ", " - "pop 0x" + LOG(logDEBUG5) << "DataStreamer " << index << ", pop 0x" << std::hex << (void *)(buffer) << std::dec << ":" << buffer; + auto *memImage = reinterpret_cast(buffer); // check dummy - auto numBytes = *reinterpret_cast(buffer); - LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << numBytes; - if (numBytes == DUMMY_PACKET_VALUE) { + LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << memImage->size ; + if (memImage->size == DUMMY_PACKET_VALUE) { StopProcessing(buffer); return; } - ProcessAnImage(buffer); + // streamer first index needn't be the very first index + if (!startedFlag) { + RecordFirstIndex(memImage->header.detHeader.frameNumber, memImage->firstIndex); + } + + ProcessAnImage(memImage->header.detHeader, memImage->size, memImage->data); // free fifo->FreeAddress(buffer); @@ -131,12 +131,9 @@ void DataStreamer::ThreadExecution() { void DataStreamer::StopProcessing(char *buf) { LOG(logDEBUG1) << "DataStreamer " << index << ": Dummy"; - - sls_receiver_header *header = (sls_receiver_header *)(buf); - // send dummy header and data - if (!SendHeader(header, 0, 0, 0, true)) { - LOG(logERROR) << "Could not send zmq dummy header for streamer " - << index; + if (!SendDummyHeader()) { + LOG(logERROR) << "Could not send zmq dummy header for streamer for port " + << zmqSocket->GetPortNumber(); } fifo->FreeAddress(buf); @@ -145,38 +142,26 @@ void DataStreamer::StopProcessing(char *buf) { } /** buf includes only the standard header */ -void DataStreamer::ProcessAnImage(char *buf) { - - sls_receiver_header *header = - (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES); - uint64_t fnum = header->detHeader.frameNumber; +void DataStreamer::ProcessAnImage(sls_detector_header header, size_t size, char* data) { + + uint64_t fnum = header.frameNumber; LOG(logDEBUG1) << "DataStreamer " << index << ": fnum:" << fnum; - if (!startedFlag) { - RecordFirstIndex(fnum, buf); - } - auto numBytes = *reinterpret_cast(buf); - // shortframe gotthard if (completeBuffer) { // disregarding the size modified from callback (always using - // imageSizeComplete - // instead of buf (32 bit) because gui needs imagesizecomplete and - // listener - // write imagesize + // imageSizeComplete instead of size because gui needs + // imagesizecomplete and listener writes imagesize to size - if (!SendHeader(header, generalData->imageSizeComplete, - generalData->nPixelsXComplete, - generalData->nPixelsYComplete, false)) { + if (!SendDataHeader(header, generalData->imageSizeComplete, + generalData->nPixelsXComplete, generalData->nPixelsYComplete)) { LOG(logERROR) << "Could not send zmq header for fnum " << fnum << " and streamer " << index; } memcpy(completeBuffer + ((generalData->imageSize) * adcConfigured), - buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header), - numBytes); + data, size); - if (!zmqSocket->SendData(completeBuffer, - generalData->imageSizeComplete)) { + if (!zmqSocket->SendData(completeBuffer, generalData->imageSizeComplete)) { LOG(logERROR) << "Could not send zmq data for fnum " << fnum << " and streamer " << index; } @@ -185,33 +170,29 @@ void DataStreamer::ProcessAnImage(char *buf) { // normal else { - if (!SendHeader(header, numBytes, generalData->nPixelsX, - generalData->nPixelsY, - false)) { // new size possibly from callback + if (!SendDataHeader(header, size, generalData->nPixelsX, generalData->nPixelsY)) { LOG(logERROR) << "Could not send zmq header for fnum " << fnum << " and streamer " << index; } - if (!zmqSocket->SendData(buf + FIFO_HEADER_NUMBYTES + - sizeof(sls_receiver_header), - numBytes)) { // new size possibly from callback + if (!zmqSocket->SendData(data, size)) { LOG(logERROR) << "Could not send zmq data for fnum " << fnum << " and streamer " << index; } } } -int DataStreamer::SendHeader(sls_receiver_header *rheader, uint32_t size, - uint32_t nx, uint32_t ny, bool dummy) { - +int DataStreamer::SendDummyHeader() { zmqHeader zHeader; - zHeader.data = !dummy; + zHeader.data = false; zHeader.jsonversion = SLS_DETECTOR_JSON_HEADER_VERSION; + return zmqSocket->SendHeader(index, zHeader); +} - if (dummy) { - return zmqSocket->SendHeader(index, zHeader); - } - - sls_detector_header header = rheader->detHeader; +int DataStreamer::SendDataHeader(sls_detector_header header, uint32_t size, + uint32_t nx, uint32_t ny) { + zmqHeader zHeader; + zHeader.data = true; + zHeader.jsonversion = SLS_DETECTOR_JSON_HEADER_VERSION; uint64_t frameIndex = header.frameNumber - firstIndex; uint64_t acquisitionIndex = header.frameNumber; @@ -258,12 +239,7 @@ int DataStreamer::SendHeader(sls_receiver_header *rheader, uint32_t size, } void DataStreamer::RestreamStop() { - // send dummy header - zmqHeader zHeader; - zHeader.data = false; - zHeader.jsonversion = SLS_DETECTOR_JSON_HEADER_VERSION; - int ret = zmqSocket->SendHeader(index, zHeader); - if (!ret) { + if (!SendDummyHeader()) { throw RuntimeError( "Could not restream Dummy Header via ZMQ for port " + std::to_string(zmqSocket->GetPortNumber())); diff --git a/slsReceiverSoftware/src/DataStreamer.h b/slsReceiverSoftware/src/DataStreamer.h index 49c3c3d81..4279f7db6 100644 --- a/slsReceiverSoftware/src/DataStreamer.h +++ b/slsReceiverSoftware/src/DataStreamer.h @@ -72,10 +72,9 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { private: /** * Record First Index - * @param fnum current frame number - * @param buf get frame index from buffer to calculate first index to record */ - void RecordFirstIndex(uint64_t fnum, char *buf); + void RecordFirstIndex(uint64_t fnum, size_t firstImageIndex); + void ThreadExecution(); /** @@ -88,19 +87,21 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject { * Process an image popped from fifo, * write to file if fw enabled & update parameters */ - void ProcessAnImage(char *buf); + void ProcessAnImage(sls_detector_header header, size_t size, char* data); + int SendDummyHeader(); + /** * Create and send Json Header * @param rheader header of image * @param size data size (could have been modified in call back) * @param nx number of pixels in x dim * @param ny number of pixels in y dim - * @param dummy true if its a dummy header * @returns 0 if error, else 1 */ - int SendHeader(sls_receiver_header *rheader, uint32_t size = 0, - uint32_t nx = 0, uint32_t ny = 0, bool dummy = true); + int SendDataHeader(sls_detector_header header, uint32_t size = 0, + uint32_t nx = 0, uint32_t ny = 0); + static const std::string TypeName; const GeneralData *generalData{nullptr}; diff --git a/slsReceiverSoftware/src/Fifo.cpp b/slsReceiverSoftware/src/Fifo.cpp index ccb01108d..25772dd54 100644 --- a/slsReceiverSoftware/src/Fifo.cpp +++ b/slsReceiverSoftware/src/Fifo.cpp @@ -17,7 +17,7 @@ namespace sls { -Fifo::Fifo(int ind, uint32_t fifoItemSize, uint32_t depth) +Fifo::Fifo(int ind, size_t fifoItemSize, uint32_t depth) : index(ind), memory(nullptr), fifoBound(nullptr), fifoFree(nullptr), fifoStream(nullptr), fifoDepth(depth), status_fifoBound(0), status_fifoFree(depth) { @@ -30,7 +30,7 @@ Fifo::~Fifo() { DestroyFifos(); } -void Fifo::CreateFifos(uint32_t fifoItemSize) { +void Fifo::CreateFifos(size_t fifoItemSize) { LOG(logDEBUG3) << __SHORT_AT__ << " called"; // destroy if not already @@ -41,7 +41,7 @@ void Fifo::CreateFifos(uint32_t fifoItemSize) { fifoFree = new CircularFifo(fifoDepth); fifoStream = new CircularFifo(fifoDepth); // allocate memory - size_t mem_len = (size_t)fifoItemSize * (size_t)fifoDepth * sizeof(char); + size_t mem_len = fifoItemSize * (size_t)fifoDepth * sizeof(char); memory = (char *)malloc(mem_len); if (memory == nullptr) { throw RuntimeError("Could not allocate memory for fifos"); diff --git a/slsReceiverSoftware/src/Fifo.h b/slsReceiverSoftware/src/Fifo.h index c55e6fc9f..ec7d032cf 100644 --- a/slsReceiverSoftware/src/Fifo.h +++ b/slsReceiverSoftware/src/Fifo.h @@ -28,7 +28,7 @@ class Fifo : private virtual slsDetectorDefs { * @param fifoItemSize size of each fifo item * @param depth fifo depth */ - Fifo(int ind, uint32_t fifoItemSize, uint32_t depth); + Fifo(int ind, size_t fifoItemSize, uint32_t depth); /** * Destructor @@ -82,7 +82,7 @@ class Fifo : private virtual slsDetectorDefs { * Create Fifos, allocate memory & push addresses into fifo * @param fifoItemSize size of each fifo item */ - void CreateFifos(uint32_t fifoItemSize); + void CreateFifos(size_t fifoItemSize); /** * Destroy Fifos and deallocate memory diff --git a/slsReceiverSoftware/src/File.h b/slsReceiverSoftware/src/File.h index 957d414cd..ca832ca1e 100644 --- a/slsReceiverSoftware/src/File.h +++ b/slsReceiverSoftware/src/File.h @@ -81,9 +81,7 @@ class File : private virtual slsDetectorDefs { "should be overloaded by a derived class"; }; - virtual void WriteToFile(char *buffer, const int buffersize, - const uint64_t currentFrameNumber, - const uint32_t numPacketsCaught) = 0; + virtual void WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber,const uint32_t numPacketsCaught) = 0; protected: slsDetectorDefs::fileFormat format_; diff --git a/slsReceiverSoftware/src/GeneralData.h b/slsReceiverSoftware/src/GeneralData.h index 4c71f8f15..2230e4ffd 100644 --- a/slsReceiverSoftware/src/GeneralData.h +++ b/slsReceiverSoftware/src/GeneralData.h @@ -37,8 +37,6 @@ class GeneralData { uint32_t packetIndexMask{0}; uint32_t packetIndexOffset{0}; uint32_t maxFramesPerFile{0}; - /** Header size of data saved into fifo buffer at a time*/ - uint32_t fifoBufferHeaderSize{0}; uint32_t defaultFifoDepth{0}; uint32_t numUDPInterfaces{1}; uint32_t headerPacketSize{0}; @@ -167,8 +165,6 @@ class GotthardData : public GeneralData { nPixelsY = 1; headerSizeinPacket = 6; maxFramesPerFile = MAX_FRAMES_PER_FILE; - fifoBufferHeaderSize = - FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header); UpdateImageSize(); }; @@ -297,8 +293,6 @@ class EigerData : public GeneralData { myDetectorType = slsDetectorDefs::EIGER; headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE; - fifoBufferHeaderSize = - FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header); numUDPInterfaces = 2; headerPacketSize = 40; standardheader = true; @@ -337,8 +331,6 @@ class JungfrauData : public GeneralData { dataSize = 8192; packetSize = headerSizeinPacket + dataSize; maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE; - fifoBufferHeaderSize = - FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header); defaultFifoDepth = 2500; standardheader = true; maxRowsPerReadout = 512; @@ -371,8 +363,6 @@ class Mythen3Data : public GeneralData { nPixelsY = 1; headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); maxFramesPerFile = MYTHEN3_MAX_FRAMES_PER_FILE; - fifoBufferHeaderSize = - FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header); defaultFifoDepth = 50000; standardheader = true; defaultUdpSocketBufferSize = (1000 * 1024 * 1024); @@ -443,8 +433,6 @@ class Gotthard2Data : public GeneralData { headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); dataSize = 2560; // 1280 channels * 2 bytes maxFramesPerFile = GOTTHARD2_MAX_FRAMES_PER_FILE; - fifoBufferHeaderSize = - FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header); defaultFifoDepth = 50000; standardheader = true; vetoDataSize = 160; @@ -501,8 +489,6 @@ class ChipTestBoardData : public GeneralData { frameIndexOffset = 8; // 10g packetIndexMask = 0xFF; // 10g maxFramesPerFile = CTB_MAX_FRAMES_PER_FILE; - fifoBufferHeaderSize = - FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header); defaultFifoDepth = 2500; standardheader = true; UpdateImageSize(); @@ -590,8 +576,6 @@ class MoenchData : public GeneralData { headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header); frameIndexMask = 0xFFFFFF; maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE; - fifoBufferHeaderSize = - FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header); defaultFifoDepth = 2500; standardheader = true; UpdateImageSize(); diff --git a/slsReceiverSoftware/src/HDF5DataFile.cpp b/slsReceiverSoftware/src/HDF5DataFile.cpp index 3af531145..572b3eecc 100644 --- a/slsReceiverSoftware/src/HDF5DataFile.cpp +++ b/slsReceiverSoftware/src/HDF5DataFile.cpp @@ -222,9 +222,7 @@ void HDF5DataFile::CreateFile() { } } -void HDF5DataFile::WriteToFile(char *buffer, const int buffersize, - const uint64_t currentFrameNumber, - const uint32_t numPacketsCaught) { +void HDF5DataFile::WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber, const uint32_t numPacketsCaught) { // check if maxframesperfile = 0 for infinite if (maxFramesPerFile_ && (numFramesInFile_ >= maxFramesPerFile_)) { @@ -240,8 +238,8 @@ void HDF5DataFile::WriteToFile(char *buffer, const int buffersize, ExtendDataset(); } - WriteDataFile(currentFrameNumber, buffer + sizeof(sls_receiver_header)); - WriteParameterDatasets(currentFrameNumber, (sls_receiver_header *)(buffer)); + WriteDataFile(currentFrameNumber, imageData); + WriteParameterDatasets(currentFrameNumber, header); } void HDF5DataFile::Convert12to16Bit(uint16_t *dst, uint8_t *src) { @@ -301,14 +299,14 @@ void HDF5DataFile::WriteDataFile(const uint64_t currentFrameNumber, } void HDF5DataFile::WriteParameterDatasets(const uint64_t currentFrameNumber, - sls_receiver_header *rheader) { + sls_receiver_header rheader) { std::lock_guard lock(*hdf5Lib_); uint64_t fnum = ((maxFramesPerFile_ == 0) ? currentFrameNumber : currentFrameNumber % maxFramesPerFile_); - sls_detector_header header = rheader->detHeader; + sls_detector_header header = rheader.detHeader; hsize_t count[1] = {1}; hsize_t start[1] = {fnum}; int i = 0; @@ -358,7 +356,7 @@ void HDF5DataFile::WriteParameterDatasets(const uint64_t currentFrameNumber, // contiguous bitset if (sizeof(sls_bitset) == sizeof(bitset_storage)) { - dataSetPara_[13]->write((char *)&(rheader->packetsMask), + dataSetPara_[13]->write((char *)&(rheader.packetsMask), parameterDataTypes_[13], memspace, *dataSpacePara_); } @@ -368,7 +366,7 @@ void HDF5DataFile::WriteParameterDatasets(const uint64_t currentFrameNumber, // get contiguous representation of bit mask bitset_storage storage; memset(storage, 0, sizeof(bitset_storage)); - sls_bitset bits = rheader->packetsMask; + sls_bitset bits = rheader.packetsMask; for (int i = 0; i < MAX_NUM_PACKETS; ++i) storage[i >> 3] |= (bits[i] << (i & 7)); // write bitmask diff --git a/slsReceiverSoftware/src/HDF5DataFile.h b/slsReceiverSoftware/src/HDF5DataFile.h index a6d32ae15..c2e11d9b3 100644 --- a/slsReceiverSoftware/src/HDF5DataFile.h +++ b/slsReceiverSoftware/src/HDF5DataFile.h @@ -31,16 +31,14 @@ class HDF5DataFile : private virtual slsDetectorDefs, public File { const uint32_t nPixelsX, const uint32_t nPixelsY, const uint32_t dynamicRange) override; - void WriteToFile(char *buffer, const int buffersize, - const uint64_t currentFrameNumber, - const uint32_t numPacketsCaught) override; + void WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber, const uint32_t numPacketsCaught) override; private: void CreateFile(); void Convert12to16Bit(uint16_t *dst, uint8_t *src); void WriteDataFile(const uint64_t currentFrameNumber, char *buffer); void WriteParameterDatasets(const uint64_t currentFrameNumber, - sls_receiver_header *rheader); + sls_receiver_header rheader); void ExtendDataset(); int index_; diff --git a/slsReceiverSoftware/src/Implementation.cpp b/slsReceiverSoftware/src/Implementation.cpp index 54721701d..007b8901e 100644 --- a/slsReceiverSoftware/src/Implementation.cpp +++ b/slsReceiverSoftware/src/Implementation.cpp @@ -67,16 +67,16 @@ void Implementation::SetThreadPriorities() { void Implementation::SetupFifoStructure() { fifo.clear(); for (int i = 0; i < numUDPInterfaces; ++i) { - uint32_t datasize = generalData->imageSize; + size_t datasize = generalData->imageSize; // veto data size if (detType == GOTTHARD2 && i != 0) { datasize = generalData->vetoImageSize; } + datasize += IMAGE_STRUCTURE_HEADER_SIZE; // create fifo structure try { - fifo.push_back(sls::make_unique( - i, datasize + (generalData->fifoBufferHeaderSize), fifoDepth)); + fifo.push_back(sls::make_unique(i, datasize, fifoDepth)); } catch (...) { fifo.clear(); fifoDepth = 0; @@ -93,9 +93,7 @@ void Implementation::SetupFifoStructure() { dataStreamer[i]->SetFifo(fifo[i].get()); LOG(logINFO) << "Memory Allocated for Fifo " << i << ": " - << (double)(((size_t)(datasize) + - (size_t)(generalData->fifoBufferHeaderSize)) * - (size_t)fifoDepth) / + << (double)(datasize * (size_t)fifoDepth) / (double)(1024 * 1024) << " MB"; } @@ -181,8 +179,7 @@ void Implementation::setDetectorType(const detectorType d) { listener.push_back(sls::make_unique( i, detType, fifo_ptr, &status, &udpPortNum[i], ð[i], &udpSocketBufferSize, &actualUDPSocketBufferSize, - &framesPerFile, &frameDiscardMode, &detectorDataStream[i], - &silentMode)); + &framesPerFile, &frameDiscardMode, &silentMode)); int ctbAnalogDataBytes = 0; if (detType == CHIPTESTBOARD) { ctbAnalogDataBytes = generalData->GetNumberOfAnalogDatabytes(); @@ -201,9 +198,10 @@ void Implementation::setDetectorType(const detectorType d) { } // set up writer and callbacks - for (const auto &it : listener) { - it->SetGeneralData(generalData); - it->SetActivate(activated); + for (int i = 0; i != (int)listener.size(); ++i) { + listener[i]->SetGeneralData(generalData); + listener[i]->SetActivate(activated); + listener[i]->SetDetectorDatastream(detectorDataStream[i]); } for (const auto &it : dataProcessor) { it->SetGeneralData(generalData); @@ -580,15 +578,25 @@ std::vector Implementation::getCurrentFrameIndex() const { } double Implementation::getProgress() const { - if (!activated || (!detectorDataStream[0] && !detectorDataStream[1])) { + std::vector disabledPort; + for (auto &it : listener) { + disabledPort.push_back(it->isPortDisabled()); + } + + // all ports disabled + if (allEqualTo(disabledPort, true)) { return 100.00; } - // if disabled, considering only 1 port + // any disabled double totalFrames = (double)(numberOfTotalFrames * listener.size()); - if (!detectorDataStream[0] || !detectorDataStream[1]) { - totalFrames /= 2; - } + if (anyEqualTo(disabledPort, true)) { + for (auto it : disabledPort) { + if (it) { + totalFrames /= 2; + } + } + } double progress = 0; int index = 0; @@ -1003,11 +1011,11 @@ void Implementation::setNumberofUDPInterfaces(const int n) { listener.push_back(sls::make_unique( i, detType, fifo_ptr, &status, &udpPortNum[i], ð[i], &udpSocketBufferSize, &actualUDPSocketBufferSize, - &framesPerFile, &frameDiscardMode, &detectorDataStream[i], - &silentMode)); + &framesPerFile, &frameDiscardMode, &silentMode)); listener[i]->SetGeneralData(generalData); listener[i]->SetActivate(activated); listener[i]->SetNoRoi(portRois[i].noRoi()); + listener[i]->SetDetectorDatastream(detectorDataStream[i]); int ctbAnalogDataBytes = 0; if (detType == CHIPTESTBOARD) { diff --git a/slsReceiverSoftware/src/Listener.cpp b/slsReceiverSoftware/src/Listener.cpp index d10598a9b..6ba287016 100644 --- a/slsReceiverSoftware/src/Listener.cpp +++ b/slsReceiverSoftware/src/Listener.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace sls { @@ -26,16 +27,20 @@ const std::string Listener::TypeName = "Listener"; Listener::Listener(int ind, detectorType dtype, Fifo *f, std::atomic *s, uint32_t *portno, std::string *e, int *us, int *as, uint32_t *fpf, frameDiscardPolicy *fdp, - bool *detds, bool *sm) + bool *sm) : ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype), status(s), udpPortNumber(portno), eth(e), udpSocketBufferSize(us), - actualUDPSocketBufferSize(as), framesPerFile(fpf), frameDiscardMode(fdp), - detectorDataStream(detds), silentMode(sm) { + actualUDPSocketBufferSize(as), framesPerFile(fpf), frameDiscardMode(fdp), silentMode(sm) { LOG(logDEBUG) << "Listener " << ind << " created"; } Listener::~Listener() = default; +bool Listener::isPortDisabled() const { + return disabledPort; +} + + uint64_t Listener::GetPacketsCaught() const { return numPacketsCaught; } uint64_t Listener::GetNumCompleteFramesCaught() const { @@ -48,7 +53,7 @@ uint64_t Listener::GetLastFrameIndexCaught() const { int64_t Listener::GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const { - if (!activated || !(*detectorDataStream) || noRoi) { + if (disabledPort) { return 0; } if (!stoppedFlag) { @@ -101,7 +106,6 @@ void Listener::RecordFirstIndex(uint64_t fnum) { // listen to this fnum, later +1 currentFrameIndex = fnum; lastCaughtFrameIndex = fnum; - startedFlag = true; firstIndex = fnum; @@ -114,12 +118,23 @@ void Listener::RecordFirstIndex(uint64_t fnum) { void Listener::SetGeneralData(GeneralData *g) { generalData = g; } -void Listener::SetActivate(bool enable) { activated = enable; } +void Listener::SetActivate(bool enable) { + activated = enable; + disabledPort = (!activated || !detectorDataStream || noRoi); +} -void Listener::SetNoRoi(bool enable) {noRoi = enable; } +void Listener::SetDetectorDatastream(bool enable) { + detectorDataStream = enable; + disabledPort = (!activated || !detectorDataStream || noRoi); +} + +void Listener::SetNoRoi(bool enable) { + noRoi = enable; + disabledPort = (!activated || !detectorDataStream || noRoi); +} void Listener::CreateUDPSockets() { - if (!activated || !(*detectorDataStream) || noRoi) { + if (disabledPort) { return; } @@ -160,6 +175,8 @@ void Listener::CreateUDPSockets() { void Listener::ShutDownUDPSocket() { if (udpSocket) { udpSocketAlive = false; + // give other thread time after udpSocketAlive is changed + usleep(0); udpSocket->Shutdown(); LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber; } @@ -169,7 +186,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int s) { LOG(logINFO) << "Testing UDP Socket Buffer size " << s << " with test port " << *udpPortNumber; - if (!activated || !(*detectorDataStream) || noRoi) { + if (disabledPort) { *actualUDPSocketBufferSize = (s * 2); return; } @@ -219,50 +236,29 @@ void Listener::SetHardCodedPosition(uint16_t r, uint16_t c) { void Listener::ThreadExecution() { char *buffer; - int rc = 0; - fifo->GetNewAddress(buffer); - LOG(logDEBUG5) << "Listener " << index - << ", " - "pop 0x" + LOG(logDEBUG5) << "Listener " << index << ", pop 0x" << std::hex << (void *)(buffer) << std::dec << ":" << buffer; + auto *memImage = reinterpret_cast(buffer); // udpsocket doesnt exist - if (activated && *detectorDataStream && !noRoi &&!udpSocketAlive && !carryOverFlag) { - // LOG(logERROR) << "Listening_Thread " << index << ": UDP Socket not - // created or shut down earlier"; - (*((uint32_t *)buffer)) = 0; - StopListening(buffer); + if ((*status == TRANSMITTING || !udpSocketAlive) && !carryOverFlag) { + StopListening(buffer, memImage->size); return; } - // get data - if ((*status != TRANSMITTING && - (!activated || !(*detectorDataStream) || noRoi || udpSocketAlive)) || - carryOverFlag) { - rc = ListenToAnImage(buffer); + // reset header and size and get data + memset(memImage, 0, IMAGE_STRUCTURE_HEADER_SIZE); + int rc = ListenToAnImage(memImage->header, memImage->data); + + // end of acquisition or discarding image + if (rc <= 0) { + fifo->FreeAddress(buffer); + return; } - // error check, (should not be here) if not transmitting yet (previous if) - // rc should be > 0 - if (rc == 0) { - if (!udpSocketAlive) { - (*((uint32_t *)buffer)) = 0; - StopListening(buffer); - } else - fifo->FreeAddress(buffer); - return; - } - - // discarding image - else if (rc < 0) { - fifo->FreeAddress(buffer); - return; - } - - (*((uint32_t *)buffer)) = rc; - - // push into fifo + // valid image, set size and push into fifo + memImage->size = rc; fifo->PushAddress(buffer); // Statistics @@ -276,69 +272,45 @@ void Listener::ThreadExecution() { } } -void Listener::StopListening(char *buf) { - (*((uint32_t *)buf)) = DUMMY_PACKET_VALUE; +void Listener::StopListening(char *buf, size_t & size) { + size = DUMMY_PACKET_VALUE; fifo->PushAddress(buf); StopRunning(); - LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber + LOG(logDEBUG1) << index << ": Listening Completed. Packets (" << *udpPortNumber << ") : " << numPacketsCaught; - LOG(logDEBUG1) << index << ": Listening Completed"; } /* buf includes the fifo header and packet header */ -uint32_t Listener::ListenToAnImage(char *buf) { +uint32_t Listener::ListenToAnImage(sls_receiver_header & dstHeader, char *dstData) { - int rc = 0; uint64_t fnum = 0; uint32_t pnum = 0; uint64_t bnum = 0; uint32_t numpackets = 0; + uint32_t dsize = generalData->dataSize; uint32_t imageSize = generalData->imageSize; uint32_t packetSize = generalData->packetSize; uint32_t hsize = generalData->headerSizeinPacket; - uint32_t fifohsize = generalData->fifoBufferHeaderSize; - bool standardheader = generalData->standardheader; + bool standardHeader = generalData->standardheader; if (myDetectorType == GOTTHARD2 && index != 0) { dsize = generalData->vetoDataSize; imageSize = generalData->vetoImageSize; packetSize = generalData->vetoPacketSize; hsize = generalData->vetoHsize; - standardheader = false; + standardHeader = false; } uint32_t pperFrame = generalData->packetsPerFrame; bool isHeaderEmpty = true; - sls_detector_header *old_header = nullptr; - sls_receiver_header *new_header = nullptr; uint32_t corrected_dsize = dsize - ((pperFrame * dsize) - imageSize); + sls_detector_header *srcDetHeader = nullptr; - // reset to -1 - memset(buf, 0, fifohsize); - new_header = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES); - - // deactivated port (eiger) or deactivated (eiger) - if (!(*detectorDataStream) || !activated || noRoi) { - return 0; - } - - // look for carry over + // carry over packet if (carryOverFlag) { LOG(logDEBUG3) << index << "carry flag"; - // check if its the current image packet - // -------------------------- new header - // ---------------------------------------------------------------------- - if (standardheader) { - old_header = (sls_detector_header *)(&carryOverPacket[0]); - fnum = old_header->frameNumber; - pnum = old_header->packetNumber; - } - // -------------------old header - // ----------------------------------------------------------------------------- - else { - generalData->GetHeaderInfo(index, &carryOverPacket[0], - oddStartingPacket, fnum, pnum, bnum); - } - //------------------------------------------------------------------------------------------------------------ + GetPacketIndices(fnum, pnum, bnum, standardHeader, carryOverPacket.get(), srcDetHeader); + + // future packet if (fnum != currentFrameIndex) { if (fnum < currentFrameIndex) { LOG(logERROR) @@ -347,156 +319,31 @@ uint32_t Listener::ListenToAnImage(char *buf) { carryOverFlag = false; return 0; } - switch (*frameDiscardMode) { - case DISCARD_EMPTY_FRAMES: - if (!numpackets) { - LOG(logDEBUG) - << index << " Skipped fnum:" << currentFrameIndex; - currentFrameIndex = fnum; - return -1; - } - break; - case DISCARD_PARTIAL_FRAMES: - LOG(logDEBUG) - << index << " discarding fnum:" << currentFrameIndex; - currentFrameIndex = fnum; - return -1; - default: - break; - } - new_header->detHeader.packetNumber = numpackets; - if (isHeaderEmpty) { - new_header->detHeader.row = row; - new_header->detHeader.column = column; - } - new_header->detHeader.frameNumber = currentFrameIndex; - ++currentFrameIndex; - return imageSize; - } - - // copy packet - switch (myDetectorType) { - // for gotthard, 1st packet: 4 bytes fnum, CACA - // + CACA, 639*2 bytes data 2nd packet: 4 - // bytes fnum, previous 1*2 bytes data + 640*2 bytes data !! - case GOTTHARD: - if (!pnum) - memcpy(buf + fifohsize, &carryOverPacket[hsize + 4], dsize - 2); - else - memcpy(buf + fifohsize + dsize - 2, &carryOverPacket[hsize], - dsize + 2); - break; - case CHIPTESTBOARD: - case MOENCH: - if (pnum == (pperFrame - 1)) - memcpy(buf + fifohsize + (pnum * dsize), - &carryOverPacket[hsize], corrected_dsize); - else - memcpy(buf + fifohsize + (pnum * dsize), - &carryOverPacket[hsize], dsize); - break; - default: - memcpy(buf + fifohsize + (pnum * dsize), &carryOverPacket[hsize], - dsize); - break; + return HandleFuturePacket(false, numpackets, fnum, isHeaderEmpty, imageSize, dstHeader); } + CopyPacket(dstData, carryOverPacket.get(), dsize, hsize, corrected_dsize, numpackets, isHeaderEmpty, standardHeader, dstHeader, srcDetHeader, pnum, bnum); carryOverFlag = false; - ++numpackets; // number of packets in this image (each time its copied - // to buf) - new_header->packetsMask[( - (pnum < MAX_NUM_PACKETS) ? pnum : MAX_NUM_PACKETS - 1)] = 1; - - // writer header - if (isHeaderEmpty) { - // -------------------------- new header - // ---------------------------------------------------------------------- - if (standardheader) { - memcpy((char *)new_header, (char *)old_header, - sizeof(sls_detector_header)); - } - // -------------------old header - // ------------------------------------------------------------------------------ - else { - new_header->detHeader.frameNumber = fnum; - new_header->detHeader.bunchId = bnum; - new_header->detHeader.row = row; - new_header->detHeader.column = column; - new_header->detHeader.detType = - (uint8_t)generalData->myDetectorType; - new_header->detHeader.version = - (uint8_t)SLS_DETECTOR_HEADER_VERSION; - } - //------------------------------------------------------------------------------------------------------------ - isHeaderEmpty = false; - } } // until last packet isHeaderEmpty to account for gotthard short frame, else // never entering this loop) while (numpackets < pperFrame) { // listen to new packet - rc = 0; + int rc = 0; if (udpSocketAlive) { rc = udpSocket->ReceiveDataOnly(&listeningPacket[0]); } // end of acquisition if (rc <= 0) { if (numpackets == 0) - return 0; // empty image - - switch (*frameDiscardMode) { - case DISCARD_EMPTY_FRAMES: - if (!numpackets) { - return -1; - } - break; - case DISCARD_PARTIAL_FRAMES: - // empty packet now, but not empty image (EOA) - if (numpackets) { - LOG(logDEBUG) - << index << " discarding fnum:" << currentFrameIndex; - } - return -1; - default: - break; - } - new_header->detHeader.packetNumber = - numpackets; // number of packets caught - if (isHeaderEmpty) { - new_header->detHeader.row = row; - new_header->detHeader.column = column; - } - new_header->detHeader.frameNumber = currentFrameIndex; - return imageSize; // empty packet now, but not empty image (EOA) + return 0; + return HandleFuturePacket(true, numpackets, fnum, isHeaderEmpty, imageSize, dstHeader); } - // update parameters - numPacketsCaught++; // record immediately to get more time before socket - // shutdown + numPacketsCaught++; numPacketsStatistic++; - - // -------------------------- new header - // ---------------------------------------------------------------------- - if (standardheader) { - old_header = (sls_detector_header *)(&listeningPacket[0]); - fnum = old_header->frameNumber; - pnum = old_header->packetNumber; - } - // -------------------old header - // ----------------------------------------------------------------------------- - else { - // set first packet to be odd or even (check required when switching - // from roi to no roi) - if (myDetectorType == GOTTHARD && !startedFlag) { - oddStartingPacket = generalData->SetOddStartingPacket( - index, &listeningPacket[0]); - } - - generalData->GetHeaderInfo(index, &listeningPacket[0], - oddStartingPacket, fnum, pnum, bnum); - } - //------------------------------------------------------------------------------------------------------------ + GetPacketIndices(fnum, pnum, bnum, standardHeader, listeningPacket.get(), srcDetHeader); // Eiger Firmware in a weird state if (myDetectorType == EIGER && fnum == 0) { @@ -504,120 +351,37 @@ uint32_t Listener::ListenToAnImage(char *buf) { << "]: Got Frame Number " "Zero from Firmware. Discarding Packet"; numPacketsCaught--; + numPacketsStatistic--; return 0; } lastCaughtFrameIndex = fnum; - LOG(logDEBUG1) << "Listening " << index << ": currentfindex:" << currentFrameIndex << ", fnum:" << fnum << ", pnum:" << pnum << ", numpackets:" << numpackets; - if (!startedFlag) RecordFirstIndex(fnum); + // bad packet if (pnum >= pperFrame) { LOG(logERROR) << "Bad packet " << pnum << "(fnum: " << fnum - << "), throwing away. " - "Packets caught so far: " + << "), throwing away. Packets caught so far: " << numpackets; - return 0; // bad packet + return 0; } - // future packet by looking at image number (all other - // detectors) + // future packet if (fnum != currentFrameIndex) { carryOverFlag = true; memcpy(carryOverPacket.get(), &listeningPacket[0], packetSize); - - switch (*frameDiscardMode) { - case DISCARD_EMPTY_FRAMES: - if (!numpackets) { - LOG(logDEBUG) - << index << " Skipped fnum:" << currentFrameIndex; - currentFrameIndex = fnum; - return -1; - } - break; - case DISCARD_PARTIAL_FRAMES: - LOG(logDEBUG) - << index << " discarding fnum:" << currentFrameIndex; - currentFrameIndex = fnum; - return -1; - default: - break; - } - new_header->detHeader.packetNumber = - numpackets; // number of packets caught - if (isHeaderEmpty) { - new_header->detHeader.row = row; - new_header->detHeader.column = column; - } - new_header->detHeader.frameNumber = currentFrameIndex; - ++currentFrameIndex; - return imageSize; - } - - // copy packet - switch (myDetectorType) { - // for gotthard, 1st packet: 4 bytes fnum, CACA - // + CACA, 639*2 bytes data 2nd packet: 4 - // bytes fnum, previous 1*2 bytes data + 640*2 bytes data !! - case GOTTHARD: - if (!pnum) - memcpy(buf + fifohsize + (pnum * dsize), - &listeningPacket[hsize + 4], dsize - 2); - else - memcpy(buf + fifohsize + (pnum * dsize) - 2, - &listeningPacket[hsize], dsize + 2); - break; - case CHIPTESTBOARD: - case MOENCH: - if (pnum == (pperFrame - 1)) - memcpy(buf + fifohsize + (pnum * dsize), - &listeningPacket[hsize], corrected_dsize); - else - memcpy(buf + fifohsize + (pnum * dsize), - &listeningPacket[hsize], dsize); - break; - default: - memcpy(buf + fifohsize + (pnum * dsize), &listeningPacket[hsize], - dsize); - break; - } - ++numpackets; // number of packets in this image (each time its copied - // to buf) - new_header->packetsMask[( - (pnum < MAX_NUM_PACKETS) ? pnum : MAX_NUM_PACKETS - 1)] = 1; - - if (isHeaderEmpty) { - // -------------------------- new header - // ---------------------------------------------------------------------- - if (standardheader) { - memcpy((char *)new_header, (char *)old_header, - sizeof(sls_detector_header)); - } - // -------------------old header - // ------------------------------------------------------------------------------ - else { - new_header->detHeader.frameNumber = fnum; - new_header->detHeader.bunchId = bnum; - new_header->detHeader.row = row; - new_header->detHeader.column = column; - new_header->detHeader.detType = - (uint8_t)generalData->myDetectorType; - new_header->detHeader.version = - (uint8_t)SLS_DETECTOR_HEADER_VERSION; - } - //------------------------------------------------------------------------------------------------------------ - isHeaderEmpty = false; + return HandleFuturePacket(false, numpackets, fnum, isHeaderEmpty, imageSize, dstHeader); } + CopyPacket(dstData, listeningPacket.get(), dsize, hsize, corrected_dsize, numpackets, isHeaderEmpty, standardHeader, dstHeader, srcDetHeader, pnum, bnum); } // complete image - new_header->detHeader.packetNumber = numpackets; // number of packets caught - new_header->detHeader.frameNumber = currentFrameIndex; + dstHeader.detHeader.packetNumber = numpackets; if (numpackets == pperFrame) { ++numCompleteFramesCaught; } @@ -625,6 +389,104 @@ uint32_t Listener::ListenToAnImage(char *buf) { return imageSize; } +size_t Listener::HandleFuturePacket(bool EOA, uint32_t numpackets, uint64_t fnum, bool isHeaderEmpty, size_t imageSize, sls_receiver_header& dstHeader) { + switch (*frameDiscardMode) { + case DISCARD_EMPTY_FRAMES: + if (!numpackets) { + if (!EOA) { + LOG(logDEBUG) << index << " Skipped fnum:" << currentFrameIndex; + currentFrameIndex = fnum; + } + return -1; + } + break; + case DISCARD_PARTIAL_FRAMES: + LOG(logDEBUG) << index << " discarding fnum:" << currentFrameIndex; + if (!EOA) { + currentFrameIndex = fnum; + } + return -1; + default: + break; + } + dstHeader.detHeader.packetNumber = numpackets; + // for empty frames (padded) + if (isHeaderEmpty) { + dstHeader.detHeader.frameNumber = currentFrameIndex; + // no packet to get bnum + dstHeader.detHeader.row = row; + dstHeader.detHeader.column = column; + dstHeader.detHeader.detType = static_cast(generalData->myDetectorType); + dstHeader.detHeader.version = static_cast(SLS_DETECTOR_HEADER_VERSION); + } + if (!EOA) { + ++currentFrameIndex; + } + return imageSize; +} + +void Listener::CopyPacket(char* dst, char* src, uint32_t dataSize, uint32_t detHeaderSize, uint32_t correctedDataSize, uint32_t &numpackets, bool &isHeaderEmpty, bool standardHeader, sls_receiver_header& dstHeader, sls_detector_header * srcDetHeader, uint32_t pnum, uint64_t bnum) { + + // copy packet data + switch (myDetectorType) { + // for gotthard, 1st packet: 4 bytes fnum, CACA + // + CACA, 639*2 bytes data 2nd packet: 4 + // bytes fnum, previous 1*2 bytes data + 640*2 bytes data !! + case GOTTHARD: + if (!pnum) + memcpy(dst, &src[detHeaderSize + 4], dataSize - 2); + else + memcpy(dst + dataSize - 2, &src[detHeaderSize], dataSize + 2); + break; + case CHIPTESTBOARD: + case MOENCH: + if (pnum == (generalData->packetsPerFrame - 1)) + memcpy(dst + (pnum * dataSize), &src[detHeaderSize], correctedDataSize); + else + memcpy(dst + (pnum * dataSize), &src[detHeaderSize], dataSize); + break; + default: + memcpy(dst + (pnum * dataSize), &src[detHeaderSize], dataSize); + break; + } + + ++numpackets; + dstHeader.packetsMask[( + (pnum < MAX_NUM_PACKETS) ? pnum : MAX_NUM_PACKETS - 1)] = 1; + + // writer header + if (isHeaderEmpty) { + if (standardHeader) { + memcpy((char *)&dstHeader, (char *)srcDetHeader, sizeof(sls_detector_header)); + } else { + dstHeader.detHeader.frameNumber = currentFrameIndex; + dstHeader.detHeader.bunchId = bnum; + dstHeader.detHeader.row = row; + dstHeader.detHeader.column = column; + dstHeader.detHeader.detType = static_cast(generalData->myDetectorType); + dstHeader.detHeader.version = static_cast(SLS_DETECTOR_HEADER_VERSION); + } + isHeaderEmpty = false; + } +} + +void Listener::GetPacketIndices(uint64_t &fnum, uint32_t &pnum, uint64_t &bnum, bool standardHeader, char* packet, sls_detector_header*& header) { + if (standardHeader) { + header = (sls_detector_header *)(&packet[0]); + fnum = header->frameNumber; + pnum = header->packetNumber; + } else { + // set first packet to be odd or even (check required when switching + // from roi to no roi) + if (myDetectorType == GOTTHARD && !startedFlag) { + oddStartingPacket = generalData->SetOddStartingPacket(index, &packet[0]); + } + generalData->GetHeaderInfo(index, &packet[0], oddStartingPacket, fnum, pnum, bnum); + } +} + + + void Listener::PrintFifoStatistics() { LOG(logDEBUG1) << "numFramesStatistic:" << numFramesStatistic << " numPacketsStatistic:" << numPacketsStatistic diff --git a/slsReceiverSoftware/src/Listener.h b/slsReceiverSoftware/src/Listener.h index 2257f54cd..b5bcc130e 100644 --- a/slsReceiverSoftware/src/Listener.h +++ b/slsReceiverSoftware/src/Listener.h @@ -39,12 +39,11 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { * @param as pointer to actual udp socket buffer size * @param fpf pointer to frames per file * @param fdp frame discard policy - * @param detds pointer to detector data stream * @param sm pointer to silent mode */ Listener(int ind, detectorType dtype, Fifo *f, std::atomic *s, uint32_t *portno, std::string *e, int *us, int *as, uint32_t *fpf, - frameDiscardPolicy *fdp, bool *detds, bool *sm); + frameDiscardPolicy *fdp, bool *sm); /** * Destructor @@ -52,6 +51,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { */ ~Listener(); + bool isPortDisabled() const; uint64_t GetPacketsCaught() const; uint64_t GetNumCompleteFramesCaught() const; uint64_t GetLastFrameIndexCaught() const; @@ -65,6 +65,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { void ResetParametersforNewAcquisition(); void SetGeneralData(GeneralData *g); void SetActivate(bool enable); + void SetDetectorDatastream(bool enable); void SetNoRoi(bool enable); void CreateUDPSockets(); void ShutDownUDPSocket(); @@ -98,18 +99,22 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { * Pushes non empty buffers into fifo/ frees empty buffer, * pushes dummy buffer into fifo * and reset running mask by calling StopRunning() - * @param buf address of buffer */ - void StopListening(char *buf); + void StopListening(char *buf, size_t& size); /** * Listen to the UDP Socket for an image, * place them in the right order - * @param buf address of buffer * @returns number of bytes of relevant data, can be image size or 0 (stop * acquisition) or -1 to discard image */ - uint32_t ListenToAnImage(char *buf); + uint32_t ListenToAnImage(sls_receiver_header & dstHeader, char *dstData); + + size_t HandleFuturePacket(bool EOA, uint32_t numpackets, uint64_t fnum, bool isHeaderEmpty, size_t imageSize, sls_receiver_header& rxHeader); + + void CopyPacket(char* dst, char* src, uint32_t dataSize, uint32_t detHeaderSize, uint32_t correctedDataSize, uint32_t &numpackets, bool &isHeaderEmpty, bool standardHeader, sls_receiver_header& rxHeader, sls_detector_header* detHeader, uint32_t pnum, uint64_t bnum); + + void GetPacketIndices(uint64_t &fnum, uint32_t &pnum, uint64_t &bnum, bool standardHeader, char* packet, sls_detector_header*& header); void PrintFifoStatistics(); @@ -129,9 +134,10 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject { uint32_t *framesPerFile; frameDiscardPolicy *frameDiscardMode; bool activated{false}; - bool *detectorDataStream; + bool detectorDataStream{true}; bool noRoi{false}; bool *silentMode; + bool disabledPort{false}; /** row hardcoded as 1D or 2d, * if detector does not send them yet or diff --git a/slsReceiverSoftware/src/receiver_defs.h b/slsReceiverSoftware/src/receiver_defs.h index 21dfeaa1a..b6b880851 100644 --- a/slsReceiverSoftware/src/receiver_defs.h +++ b/slsReceiverSoftware/src/receiver_defs.h @@ -36,11 +36,14 @@ namespace sls { // binary #define FILE_BUFFER_SIZE (16 * 1024 * 1024) // 16mb -// fifo -#define FIFO_HEADER_NUMBYTES (16) -#define FIFO_DATASIZE_NUMBYTES (4) -#define FIFO_PADDING_NUMBYTES \ - (4) // for 8 byte alignment due to sls_receiver_header structure +// fifo +struct image_structure { + size_t size; + size_t firstIndex; + slsDetectorDefs::sls_receiver_header header; + char data[]; +}; +#define IMAGE_STRUCTURE_HEADER_SIZE (sizeof(size_t) + sizeof(size_t) + sizeof(slsDetectorDefs::sls_receiver_header)) // hdf5 #define MAX_CHUNKED_IMAGES (1)