Rx: refactor memory structure and listener (#496)

* gui message doesnt show if it has a '>' symbol in error msg

* minor refactoring for readability (size_t calc fifo size)

* refactoring listening udp socket code: activated and datastream dont create udp sockets anyway, rc<=- should be discarded in any case

* wip

* refactoring memory structure access

* wip: bugfix write header + data to binary

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* portRoi no roi effecto on progress

* fail at receiver progress, wip

* segfaults for char pointer in struct

* reference to header to get header and data

* refactoring

* use const defined for size of header of fifo

* updated release notes

* refactoring from review: fwrite, static_cast
This commit is contained in:
Dhanya Thattil 2022-07-22 15:32:41 +02:00 committed by GitHub
parent 26cbfbdb30
commit 4117cda79b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 469 additions and 668 deletions

View File

@ -83,6 +83,7 @@ This document describes the differences between v7.0.0 and v6.x.x
-udp_srcip and udp_Srcip2: can set to auto (for virtual or 1g data networks)
- set dataset name for all hdf5 files to "data" only
- number of storage cells is not updated in teh receiver. done. and also allowing it to be modified in running status
- refactored memory structure in receiver and listener code (maybe resolves stuck issue, need to check)
2. Resolved Issues
==================

View File

@ -65,9 +65,7 @@ void BinaryDataFile::CreateFile() {
}
}
void BinaryDataFile::WriteToFile(char *buffer, const int buffersize,
const uint64_t currentFrameNumber,
const uint32_t numPacketsCaught) {
void BinaryDataFile::WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber, const uint32_t numPacketsCaught) {
// check if maxframesperfile = 0 for infinite
if (maxFramesPerFile_ && (numFramesInFile_ >= maxFramesPerFile_)) {
CloseFile();
@ -77,37 +75,34 @@ void BinaryDataFile::WriteToFile(char *buffer, const int buffersize,
++numFramesInFile_;
// write to file
int ret = 0;
size_t ret = 0;
// contiguous bitset
// contiguous bitset (write header + image)
if (sizeof(sls_bitset) == sizeof(bitset_storage)) {
ret = fwrite(buffer, 1, buffersize, fd_);
ret = fwrite(&header, sizeof(sls_receiver_header) + imageSize, 1, fd_);
}
// not contiguous bitset
else {
// write detector header
ret = fwrite(buffer, 1, sizeof(sls_detector_header), fd_);
ret = fwrite(&header, sizeof(sls_detector_header), 1, fd_);
// get contiguous representation of bit mask
bitset_storage storage;
memset(storage, 0, sizeof(bitset_storage));
sls_bitset bits = *(sls_bitset *)(buffer + sizeof(sls_detector_header));
sls_bitset bits = header.packetsMask;
for (int i = 0; i < MAX_NUM_PACKETS; ++i)
storage[i >> 3] |= (bits[i] << (i & 7));
// write bitmask
ret += fwrite((char *)storage, 1, sizeof(bitset_storage), fd_);
ret += fwrite(storage, sizeof(bitset_storage), 1, fd_);
// write data
ret += fwrite(buffer + sizeof(sls_detector_header), 1,
buffersize - sizeof(sls_receiver_header), fd_);
ret += fwrite(imageData, imageSize, 1, fd_);
}
// if write error
if (ret != buffersize) {
throw RuntimeError(std::to_string(index_) +
" : Write to file failed for image number " +
std::to_string(currentFrameNumber));
if (ret != imageSize + sizeof(sls_receiver_header)) {
throw RuntimeError(std::to_string(index_) + " : Write to file failed for image number " + std::to_string(currentFrameNumber) + ". Wrote " + std::to_string(ret) + " bytes instead of " + std::to_string(imageSize + sizeof(sls_receiver_header)));
}
}

View File

@ -22,9 +22,7 @@ class BinaryDataFile : private virtual slsDetectorDefs, public File {
const uint32_t udpPortNumber,
const uint32_t maxFramesPerFile) override;
void WriteToFile(char *buffer, const int buffersize,
const uint64_t currentFrameNumber,
const uint32_t numPacketsCaught) override;
void WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber, const uint32_t numPacketsCaught) override;
private:
void CreateFile();

View File

@ -1176,7 +1176,7 @@ int ClientInterface::get_additional_json_header(Interface &socket) {
int ClientInterface::set_udp_socket_buffer_size(Interface &socket) {
auto size = socket.Receive<int>();
if (size == 0) {
throw RuntimeError("Receiver socket buffer size must be > 0.");
throw RuntimeError("Receiver socket buffer size must be greater than 0.");
}
if (size > 0) {
verifyIdle(socket);

View File

@ -26,72 +26,72 @@
namespace sls {
const std::string DataProcessor::typeName_ = "DataProcessor";
const std::string DataProcessor::typeName = "DataProcessor";
DataProcessor::DataProcessor(int index, detectorType detectorType, Fifo *fifo,
bool *dataStreamEnable,
uint32_t *streamingFrequency,
uint32_t *streamingTimerInMs,
uint32_t *streamingStartFnum, bool *framePadding,
std::vector<int> *ctbDbitList, int *ctbDbitOffset,
int *ctbAnalogDataBytes)
: ThreadObject(index, typeName_), fifo_(fifo), detectorType_(detectorType),
dataStreamEnable_(dataStreamEnable),
streamingFrequency_(streamingFrequency),
streamingTimerInMs_(streamingTimerInMs),
streamingStartFnum_(streamingStartFnum), framePadding_(framePadding),
ctbDbitList_(ctbDbitList), ctbDbitOffset_(ctbDbitOffset),
ctbAnalogDataBytes_(ctbAnalogDataBytes) {
DataProcessor::DataProcessor(int index, detectorType dType, Fifo *f,
bool *dse,
uint32_t *sf,
uint32_t *st,
uint32_t *sfnum, bool *fp,
std::vector<int> *ctblist, int *ctboff,
int *ctbad)
: ThreadObject(index, typeName), fifo(f), detType(dType),
dataStreamEnable(dse),
streamingFrequency(sf),
streamingTimerInMs(st),
streamingStartFnum(sfnum), framePadding(fp),
ctbDbitList(ctblist), ctbDbitOffset(ctboff),
ctbAnalogDataBytes(ctbad) {
LOG(logDEBUG) << "DataProcessor " << index << " created";
}
DataProcessor::~DataProcessor() { DeleteFiles(); }
bool DataProcessor::GetStartedFlag() const { return startedFlag_; }
bool DataProcessor::GetStartedFlag() const { return startedFlag; }
void DataProcessor::SetFifo(Fifo *fifo) { fifo_ = fifo; }
void DataProcessor::SetFifo(Fifo *fifo) { fifo = fifo; }
void DataProcessor::SetActivate(bool enable) { activated_ = enable; }
void DataProcessor::SetActivate(bool enable) { activated = enable; }
void DataProcessor::SetReceiverROI(ROI roi) {
receiverRoi_ = roi;
receiverRoiEnabled_ = receiverRoi_.completeRoi() ? false : true;
receiverNoRoi_ = receiverRoi_.noRoi();
receiverRoi = roi;
receiverRoiEnabled = receiverRoi.completeRoi() ? false : true;
receiverNoRoi = receiverRoi.noRoi();
}
void DataProcessor::ResetParametersforNewAcquisition() {
StopRunning();
startedFlag_ = false;
numFramesCaught_ = 0;
firstIndex_ = 0;
currentFrameIndex_ = 0;
firstStreamerFrame_ = true;
streamCurrentFrame_ = false;
completeImageToStreamBeforeCropping = make_unique<char[]>(generalData_->imageSize);
startedFlag = false;
numFramesCaught = 0;
firstIndex = 0;
currentFrameIndex = 0;
firstStreamerFrame = true;
streamCurrentFrame = false;
completeImageToStreamBeforeCropping = make_unique<char[]>(generalData->imageSize);
}
void DataProcessor::RecordFirstIndex(uint64_t fnum) {
// listen to this fnum, later +1
currentFrameIndex_ = fnum;
startedFlag_ = true;
firstIndex_ = fnum;
LOG(logDEBUG1) << index << " First Index:" << firstIndex_;
currentFrameIndex = fnum;
startedFlag = true;
firstIndex = fnum;
LOG(logDEBUG1) << index << " First Index:" << firstIndex;
}
void DataProcessor::SetGeneralData(GeneralData *generalData) {
generalData_ = generalData;
void DataProcessor::SetGeneralData(GeneralData *g) {
generalData = g;
}
void DataProcessor::CloseFiles() {
if (dataFile_)
dataFile_->CloseFile();
if (dataFile)
dataFile->CloseFile();
}
void DataProcessor::DeleteFiles() {
CloseFiles();
delete dataFile_;
dataFile_ = nullptr;
delete dataFile;
dataFile = nullptr;
}
void DataProcessor::SetupFileWriter(const bool filewriteEnable,
const fileFormat fileFormatType,
@ -101,11 +101,11 @@ void DataProcessor::SetupFileWriter(const bool filewriteEnable,
switch (fileFormatType) {
#ifdef HDF5C
case HDF5:
dataFile_ = new HDF5DataFile(index, hdf5LibMutex);
dataFile = new HDF5DataFile(index, hdf5LibMutex);
break;
#endif
case BINARY:
dataFile_ = new BinaryDataFile(index);
dataFile = new BinaryDataFile(index);
break;
default:
throw RuntimeError(
@ -121,38 +121,38 @@ void DataProcessor::CreateFirstFiles(
const uint32_t udpPortNumber, const uint32_t maxFramesPerFile,
const uint64_t numImages, const uint32_t dynamicRange,
const bool detectorDataStream) {
if (dataFile_ == nullptr) {
if (dataFile == nullptr) {
throw RuntimeError("file object not contstructed");
}
CloseFiles();
// deactivated (half module/ single port or no roi), dont write file
if (!activated_ || !detectorDataStream || receiverNoRoi_) {
if (!activated || !detectorDataStream || receiverNoRoi) {
return;
}
#ifdef HDF5C
int nx = generalData_->nPixelsX;
int ny = generalData_->nPixelsY;
if (receiverRoiEnabled_) {
nx = receiverRoi_.xmax - receiverRoi_.xmin + 1;
ny = receiverRoi_.ymax - receiverRoi_.ymin + 1;
if (receiverRoi_.ymax == -1 || receiverRoi_.ymin == -1) {
int nx = generalData->nPixelsX;
int ny = generalData->nPixelsY;
if (receiverRoiEnabled) {
nx = receiverRoi.xmax - receiverRoi.xmin + 1;
ny = receiverRoi.ymax - receiverRoi.ymin + 1;
if (receiverRoi.ymax == -1 || receiverRoi.ymin == -1) {
ny = 1;
}
}
#endif
switch (dataFile_->GetFileFormat()) {
switch (dataFile->GetFileFormat()) {
#ifdef HDF5C
case HDF5:
dataFile_->CreateFirstHDF5DataFile(
dataFile->CreateFirstHDF5DataFile(
filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile,
numImages, nx, ny, dynamicRange);
break;
#endif
case BINARY:
dataFile_->CreateFirstBinaryDataFile(
dataFile->CreateFirstBinaryDataFile(
filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile);
break;
@ -163,11 +163,11 @@ void DataProcessor::CreateFirstFiles(
#ifdef HDF5C
uint32_t DataProcessor::GetFilesInAcquisition() const {
if (dataFile_ == nullptr) {
if (dataFile == nullptr) {
throw RuntimeError("No data file object created to get number of "
"files in acquiistion");
}
return dataFile_->GetFilesInAcquisition();
return dataFile->GetFilesInAcquisition();
}
std::string DataProcessor::CreateVirtualFile(
@ -178,17 +178,17 @@ std::string DataProcessor::CreateVirtualFile(
const int numModX, const int numModY, const uint32_t dynamicRange,
std::mutex *hdf5LibMutex) {
if (receiverRoiEnabled_) {
if (receiverRoiEnabled) {
throw std::runtime_error("Skipping virtual hdf5 file since rx_roi is enabled.");
}
bool gotthard25um =
((detectorType_ == GOTTHARD || detectorType_ == GOTTHARD2) &&
((detType == GOTTHARD || detType == GOTTHARD2) &&
(numModX * numModY) == 2);
// maxframesperfile = 0 for infinite files
uint32_t framesPerFile =
((maxFramesPerFile == 0) ? numFramesCaught_ : maxFramesPerFile);
((maxFramesPerFile == 0) ? numFramesCaught : maxFramesPerFile);
// TODO: assumption 1: create virtual file even if no data in other
// files (they exist anyway) assumption2: virtual file max frame index
@ -197,9 +197,9 @@ std::string DataProcessor::CreateVirtualFile(
return masterFileUtility::CreateVirtualHDF5File(
filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
modulePos, numUnitsPerReadout, framesPerFile,
generalData_->nPixelsX, generalData_->nPixelsY, dynamicRange,
numFramesCaught_, numModX, numModY, dataFile_->GetPDataType(),
dataFile_->GetParameterNames(), dataFile_->GetParameterDataTypes(),
generalData->nPixelsX, generalData->nPixelsY, dynamicRange,
numFramesCaught, numModX, numModY, dataFile->GetPDataType(),
dataFile->GetParameterNames(), dataFile->GetParameterDataTypes(),
hdf5LibMutex, gotthard25um);
}
@ -208,16 +208,16 @@ void DataProcessor::LinkFileInMaster(const std::string &masterFileName,
const bool silentMode,
std::mutex *hdf5LibMutex) {
if (receiverRoiEnabled_) {
if (receiverRoiEnabled) {
throw std::runtime_error("Should not be here, roi with hdf5 virtual should throw.");
}
std::string fname{virtualFileName}, masterfname{masterFileName};
// if no virtual file, link data file
if (virtualFileName.empty()) {
fname = dataFile_->GetFileName();
fname = dataFile->GetFileName();
}
masterFileUtility::LinkHDF5FileInMaster(masterfname, fname,
dataFile_->GetParameterNames(),
dataFile->GetParameterNames(),
silentMode, hdf5LibMutex);
}
#endif
@ -228,7 +228,7 @@ std::string DataProcessor::CreateMasterFile(
const fileFormat fileFormatType, MasterAttributes *attr,
std::mutex *hdf5LibMutex) {
attr->framesInFile = numFramesCaught_;
attr->framesInFile = numFramesCaught;
std::unique_ptr<File> masterFile{nullptr};
switch (fileFormatType) {
@ -249,35 +249,35 @@ std::string DataProcessor::CreateMasterFile(
void DataProcessor::ThreadExecution() {
char *buffer = nullptr;
fifo_->PopAddress(buffer);
fifo->PopAddress(buffer);
LOG(logDEBUG5) << "DataProcessor " << index << ", " << std::hex
<< static_cast<void *>(buffer) << std::dec << ":" << buffer;
auto *memImage = reinterpret_cast<image_structure *>(buffer);
// check dummy
auto numBytes = *reinterpret_cast<uint32_t *>(buffer);
LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << numBytes;
if (numBytes == DUMMY_PACKET_VALUE) {
LOG(logDEBUG1) << "DataProcessor " << index << ", Numbytes:" << memImage->size;
if (memImage->size == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
}
try {
ProcessAnImage(buffer);
ProcessAnImage(memImage->header, memImage->size, memImage->firstIndex, memImage->data);
} catch (const std::exception &e) {
fifo_->FreeAddress(buffer);
fifo->FreeAddress(buffer);
return;
}
// stream (if time/freq to stream) or free
if (streamCurrentFrame_) {
if (streamCurrentFrame) {
// copy the complete image back if roi enabled
if (receiverRoiEnabled_) {
(*((uint32_t *)buffer)) = generalData_->imageSize;
memcpy(buffer + generalData_->fifoBufferHeaderSize, &completeImageToStreamBeforeCropping[0], generalData_->imageSize);
if (receiverRoiEnabled) {
memImage->size = generalData->imageSize;
memcpy(memImage->data, &completeImageToStreamBeforeCropping[0], generalData->imageSize);
}
fifo_->PushAddressToStream(buffer);
fifo->PushAddressToStream(buffer);
} else {
fifo_->FreeAddress(buffer);
fifo->FreeAddress(buffer);
}
}
@ -285,92 +285,76 @@ void DataProcessor::StopProcessing(char *buf) {
LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy";
// stream or free
if (*dataStreamEnable_)
fifo_->PushAddressToStream(buf);
if (*dataStreamEnable)
fifo->PushAddressToStream(buf);
else
fifo_->FreeAddress(buf);
fifo->FreeAddress(buf);
CloseFiles();
StopRunning();
LOG(logDEBUG1) << index << ": Processing Completed";
}
void DataProcessor::ProcessAnImage(char *buf) {
auto *rheader =
reinterpret_cast<sls_receiver_header *>(buf + FIFO_HEADER_NUMBYTES);
sls_detector_header header = rheader->detHeader;
uint64_t fnum = header.frameNumber;
currentFrameIndex_ = fnum;
numFramesCaught_++;
uint32_t nump = header.packetNumber;
void DataProcessor::ProcessAnImage(sls_receiver_header & header, size_t &size, size_t &firstImageIndex, char* data) {
uint64_t fnum = header.detHeader.frameNumber;
LOG(logDEBUG1) << "DataProcessing " << index << ": fnum:" << fnum;
currentFrameIndex = fnum;
numFramesCaught++;
uint32_t nump = header.detHeader.packetNumber;
if (!startedFlag_) {
if (!startedFlag) {
RecordFirstIndex(fnum);
if (*dataStreamEnable_) {
if (*dataStreamEnable) {
// restart timer
clock_gettime(CLOCK_REALTIME, &timerbegin_);
timerbegin_.tv_sec -= (*streamingTimerInMs_) / 1000;
timerbegin_.tv_nsec -= ((*streamingTimerInMs_) % 1000) * 1000000;
clock_gettime(CLOCK_REALTIME, &timerbegin);
timerbegin.tv_sec -= (*streamingTimerInMs) / 1000;
timerbegin.tv_nsec -= ((*streamingTimerInMs) % 1000) * 1000000;
// to send first image
currentFreqCount_ = *streamingFrequency_ - *streamingStartFnum_;
currentFreqCount = *streamingFrequency - *streamingStartFnum;
}
}
// frame padding
if (activated_ && *framePadding_ && nump < generalData_->packetsPerFrame)
PadMissingPackets(buf);
if (*framePadding && nump < generalData->packetsPerFrame)
PadMissingPackets(header, data);
// rearrange ctb digital bits (if ctbDbitlist is not empty)
if (!(*ctbDbitList_).empty()) {
RearrangeDbitData(buf);
if (!(*ctbDbitList).empty()) {
RearrangeDbitData(size, data);
}
// 'stream Image' check has to be done here before crop image
// stream (if time/freq to stream) or free
if (*dataStreamEnable_ && SendToStreamer()) {
// if first frame to stream, add frame index to fifo header (might
// not be the first)
if (firstStreamerFrame_) {
firstStreamerFrame_ = false;
(*((uint32_t *)(buf + FIFO_DATASIZE_NUMBYTES))) =
(uint32_t)(fnum - firstIndex_);
if (*dataStreamEnable && SendToStreamer()) {
if (firstStreamerFrame) {
firstStreamerFrame = false;
// write to memory structure of first streamer frame
firstImageIndex = firstIndex;
}
streamCurrentFrame_ = true;
streamCurrentFrame = true;
} else {
streamCurrentFrame_ = false;
streamCurrentFrame = false;
}
if (receiverRoiEnabled_) {
if (receiverRoiEnabled) {
// copy the complete image to stream before cropping
if (streamCurrentFrame_) {
memcpy(&completeImageToStreamBeforeCropping[0], buf + generalData_->fifoBufferHeaderSize, generalData_->imageSize);
if (streamCurrentFrame) {
memcpy(&completeImageToStreamBeforeCropping[0], data, generalData->imageSize);
}
CropImage(buf);
CropImage(size, data);
}
try {
// normal call back
if (rawDataReadyCallBack != nullptr) {
std::size_t dsize = *reinterpret_cast<uint32_t *>(buf);
rawDataReadyCallBack(rheader,
buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
dsize, pRawDataReady);
rawDataReadyCallBack(&header, data, size, pRawDataReady);
}
// call back with modified size
else if (rawDataModifyReadyCallBack != nullptr) {
std::size_t revsize = *reinterpret_cast<uint32_t *>(buf);
rawDataModifyReadyCallBack(rheader,
buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
revsize, pRawDataReady);
(*((uint32_t *)buf)) = revsize;
rawDataModifyReadyCallBack(&header, data, size, pRawDataReady);
}
} catch (const std::exception &e) {
throw RuntimeError("Get Data Callback Error: " +
@ -378,14 +362,9 @@ void DataProcessor::ProcessAnImage(char *buf) {
}
// write to file
if (dataFile_) {
if (dataFile) {
try {
dataFile_->WriteToFile(
buf + FIFO_HEADER_NUMBYTES,
sizeof(sls_receiver_header) +
(uint32_t)(*((uint32_t *)buf)), //+ size of data (resizable
// from previous call back
fnum - firstIndex_, nump);
dataFile->WriteToFile(data, header, size, fnum - firstIndex, nump);
} catch (const RuntimeError &e) {
; // ignore write exception for now (TODO: send error message
// via stopReceiver tcp)
@ -395,7 +374,7 @@ void DataProcessor::ProcessAnImage(char *buf) {
bool DataProcessor::SendToStreamer() {
// skip
if ((*streamingFrequency_) == 0u) {
if ((*streamingFrequency) == 0u) {
if (!CheckTimer())
return false;
} else {
@ -409,9 +388,9 @@ bool DataProcessor::CheckTimer() {
struct timespec end;
clock_gettime(CLOCK_REALTIME, &end);
auto elapsed_s = (end.tv_sec - timerbegin_.tv_sec) +
(end.tv_nsec - timerbegin_.tv_nsec) / 1e9;
double timer_s = *streamingTimerInMs_ / 1e3;
auto elapsed_s = (end.tv_sec - timerbegin.tv_sec) +
(end.tv_nsec - timerbegin.tv_nsec) / 1e9;
double timer_s = *streamingTimerInMs / 1e3;
LOG(logDEBUG1) << index << " Timer elapsed time:" << elapsed_s
<< " seconds";
@ -421,16 +400,16 @@ bool DataProcessor::CheckTimer() {
return false;
// restart timer
clock_gettime(CLOCK_REALTIME, &timerbegin_);
clock_gettime(CLOCK_REALTIME, &timerbegin);
return true;
}
bool DataProcessor::CheckCount() {
if (currentFreqCount_ == *streamingFrequency_) {
currentFreqCount_ = 1;
if (currentFreqCount == *streamingFrequency) {
currentFreqCount = 1;
return true;
}
currentFreqCount_++;
currentFreqCount++;
return false;
}
@ -446,22 +425,20 @@ void DataProcessor::registerCallBackRawDataModifyReady(
pRawDataReady = arg;
}
void DataProcessor::PadMissingPackets(char *buf) {
void DataProcessor::PadMissingPackets(sls_receiver_header header, char* data) {
LOG(logDEBUG) << index << ": Padding Missing Packets";
uint32_t pperFrame = generalData_->packetsPerFrame;
auto *header =
reinterpret_cast<sls_receiver_header *>(buf + FIFO_HEADER_NUMBYTES);
uint32_t nmissing = pperFrame - header->detHeader.packetNumber;
sls_bitset pmask = header->packetsMask;
uint32_t pperFrame = generalData->packetsPerFrame;
uint32_t dsize = generalData_->dataSize;
if (detectorType_ == GOTTHARD2 && index != 0) {
dsize = generalData_->vetoDataSize;
uint32_t nmissing = pperFrame - header.detHeader.packetNumber;
sls_bitset pmask = header.packetsMask;
uint32_t dsize = generalData->dataSize;
if (detType == GOTTHARD2 && index != 0) {
dsize = generalData->vetoDataSize;
}
uint32_t fifohsize = generalData_->fifoBufferHeaderSize;
uint32_t corrected_dsize =
dsize - ((pperFrame * dsize) - generalData_->imageSize);
dsize - ((pperFrame * dsize) - generalData->imageSize);
LOG(logDEBUG1) << "bitmask: " << pmask.to_string();
for (unsigned int pnum = 0; pnum < pperFrame; ++pnum) {
@ -478,26 +455,26 @@ void DataProcessor::PadMissingPackets(char *buf) {
<< std::endl;
// missing packet
switch (detectorType_) {
switch (detType) {
// for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes
// data
// 2nd packet: 4 bytes fnum, previous 1*2 bytes data +
// 640*2 bytes data !!
case GOTTHARD:
if (pnum == 0u)
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize - 2);
memset(data + (pnum * dsize), 0xFF, dsize - 2);
else
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize + 2);
memset(data + (pnum * dsize), 0xFF, dsize + 2);
break;
case CHIPTESTBOARD:
case MOENCH:
if (pnum == (pperFrame - 1))
memset(buf + fifohsize + (pnum * dsize), 0xFF, corrected_dsize);
memset(data + (pnum * dsize), 0xFF, corrected_dsize);
else
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize);
memset(data + (pnum * dsize), 0xFF, dsize);
break;
default:
memset(buf + fifohsize + (pnum * dsize), 0xFF, dsize);
memset(data + (pnum * dsize), 0xFF, dsize);
break;
}
--nmissing;
@ -505,11 +482,9 @@ void DataProcessor::PadMissingPackets(char *buf) {
}
/** ctb specific */
void DataProcessor::RearrangeDbitData(char *buf) {
void DataProcessor::RearrangeDbitData(size_t & size, char *data) {
// TODO! (Erik) Refactor and add tests
int totalSize = (int)(*((uint32_t *)buf));
int ctbDigitalDataBytes =
totalSize - (*ctbAnalogDataBytes_) - (*ctbDbitOffset_);
int ctbDigitalDataBytes = size - (*ctbAnalogDataBytes) - (*ctbDbitOffset);
// no digital data
if (ctbDigitalDataBytes == 0) {
@ -519,20 +494,18 @@ void DataProcessor::RearrangeDbitData(char *buf) {
}
const int numSamples = (ctbDigitalDataBytes / sizeof(uint64_t));
const int digOffset = FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header) +
(*ctbAnalogDataBytes_);
// ceil as numResult8Bits could be decimal
const int numResult8Bits =
ceil((numSamples * (*ctbDbitList_).size()) / 8.00);
ceil((numSamples * (*ctbDbitList).size()) / 8.00);
std::vector<uint8_t> result(numResult8Bits);
uint8_t *dest = &result[0];
auto *source = (uint64_t *)(buf + digOffset + (*ctbDbitOffset_));
auto *source = (uint64_t *)(data + (*ctbAnalogDataBytes) + (*ctbDbitOffset));
// loop through digital bit enable vector
int bitoffset = 0;
for (auto bi : (*ctbDbitList_)) {
for (auto bi : (*ctbDbitList)) {
// where numbits * numsamples is not a multiple of 8
if (bitoffset != 0) {
bitoffset = 0;
@ -553,18 +526,18 @@ void DataProcessor::RearrangeDbitData(char *buf) {
}
}
// copy back to buf and update size
memcpy(buf + digOffset, result.data(), numResult8Bits * sizeof(uint8_t));
(*((uint32_t *)buf)) = numResult8Bits * sizeof(uint8_t);
// copy back to memory and update size
memcpy(data + (*ctbAnalogDataBytes), result.data(), numResult8Bits * sizeof(uint8_t));
size = numResult8Bits * sizeof(uint8_t);
}
void DataProcessor::CropImage(char *buf) {
LOG(logDEBUG) << "Cropping Image to ROI " << ToString(receiverRoi_);
int nPixelsX = generalData_->nPixelsX;
int xmin = receiverRoi_.xmin;
int xmax = receiverRoi_.xmax;
int ymin = receiverRoi_.ymin;
int ymax = receiverRoi_.ymax;
void DataProcessor::CropImage(size_t & size, char *data) {
LOG(logDEBUG) << "Cropping Image to ROI " << ToString(receiverRoi);
int nPixelsX = generalData->nPixelsX;
int xmin = receiverRoi.xmin;
int xmax = receiverRoi.xmax;
int ymin = receiverRoi.ymin;
int ymax = receiverRoi.ymax;
int xwidth = xmax - xmin + 1;
int ywidth = ymax - ymin + 1;
if (ymin == -1 || ymax == -1) {
@ -573,16 +546,16 @@ void DataProcessor::CropImage(char *buf) {
}
// calculate total roi size
double bytesPerPixel = generalData_->dynamicRange / 8.00;
double bytesPerPixel = generalData->dynamicRange / 8.00;
int startOffset = (int)((nPixelsX * ymin + xmin) * bytesPerPixel);
// write size into fifo buffer header
// write size into memory
std::size_t roiImageSize = xwidth * ywidth * bytesPerPixel;
LOG(logDEBUG) << "roiImageSize:" << roiImageSize;
(*((uint32_t *)buf)) = roiImageSize;
size = roiImageSize;
// copy the roi to the beginning of the image
char *dstOffset = buf + generalData_->fifoBufferHeaderSize;
char *dstOffset = data;
char *srcOffset = dstOffset + startOffset;
// entire width
@ -594,7 +567,7 @@ void DataProcessor::CropImage(char *buf) {
for (int y = 0; y != ywidth; ++y) {
memcpy(dstOffset, srcOffset, xwidth * bytesPerPixel);
dstOffset += (int)(xwidth * bytesPerPixel);
srcOffset += (int)(generalData_->nPixelsX * bytesPerPixel);
srcOffset += (int)(generalData->nPixelsX * bytesPerPixel);
}
}
}

View File

@ -29,11 +29,11 @@ struct MasterAttributes;
class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
public:
DataProcessor(int index, detectorType detectorType, Fifo *fifo,
bool *dataStreamEnable, uint32_t *streamingFrequency,
uint32_t *streamingTimerInMs, uint32_t *streamingStartFnum,
bool *framePadding, std::vector<int> *ctbDbitList,
int *ctbDbitOffset, int *ctbAnalogDataBytes);
DataProcessor(int index, detectorType dType, Fifo *f,
bool *dse, uint32_t *sf,
uint32_t *st, uint32_t *sfnum,
bool *fp, std::vector<int> *ctblist,
int *ctboff, int *ctbad);
~DataProcessor() override;
@ -114,7 +114,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* Process an image popped from fifo,
* write to file if fw enabled & update parameters
*/
void ProcessAnImage(char *buf);
void ProcessAnImage(sls_receiver_header & header, size_t &size, size_t &firstImageIndex, char* data);
/**
* Calls CheckTimer and CheckCount for streaming frequency and timer
@ -137,52 +137,52 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
*/
bool CheckCount();
void PadMissingPackets(char *buf);
void PadMissingPackets(sls_receiver_header header, char* data);
/**
* Align corresponding digital bits together (CTB only if ctbDbitlist is not
* empty)
*/
void RearrangeDbitData(char *buf);
void RearrangeDbitData(size_t & size, char *data);
void CropImage(char *buf);
void CropImage(size_t & size, char *data);
static const std::string typeName_;
static const std::string typeName;
const GeneralData *generalData_{nullptr};
Fifo *fifo_;
detectorType detectorType_;
bool *dataStreamEnable_;
bool activated_{false};
ROI receiverRoi_{};
bool receiverRoiEnabled_{false};
bool receiverNoRoi_{false};
const GeneralData *generalData{nullptr};
Fifo *fifo;
detectorType detType;
bool *dataStreamEnable;
bool activated{false};
ROI receiverRoi{};
bool receiverRoiEnabled{false};
bool receiverNoRoi{false};
std::unique_ptr<char[]> completeImageToStreamBeforeCropping;
/** if 0, sending random images with a timer */
uint32_t *streamingFrequency_;
uint32_t *streamingTimerInMs_;
uint32_t *streamingStartFnum_;
uint32_t currentFreqCount_{0};
struct timespec timerbegin_ {};
bool *framePadding_;
std::vector<int> *ctbDbitList_;
int *ctbDbitOffset_;
int *ctbAnalogDataBytes_;
std::atomic<bool> startedFlag_{false};
std::atomic<uint64_t> firstIndex_{0};
uint32_t *streamingFrequency;
uint32_t *streamingTimerInMs;
uint32_t *streamingStartFnum;
uint32_t currentFreqCount{0};
struct timespec timerbegin {};
bool *framePadding;
std::vector<int> *ctbDbitList;
int *ctbDbitOffset;
int *ctbAnalogDataBytes;
std::atomic<bool> startedFlag{false};
std::atomic<uint64_t> firstIndex{0};
// for statistics
uint64_t numFramesCaught_{0};
uint64_t numFramesCaught{0};
/** Frame Number of latest processed frame number */
std::atomic<uint64_t> currentFrameIndex_{0};
std::atomic<uint64_t> currentFrameIndex{0};
/** first streamer frame to add frame index in fifo header */
bool firstStreamerFrame_{false};
bool firstStreamerFrame{false};
bool streamCurrentFrame_{false};
bool streamCurrentFrame{false};
File *dataFile_{nullptr};
File *dataFile{nullptr};
// call back
/**

View File

@ -52,12 +52,9 @@ void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) {
}
}
void DataStreamer::RecordFirstIndex(uint64_t fnum, char *buf) {
void DataStreamer::RecordFirstIndex(uint64_t fnum, size_t firstImageIndex) {
startedFlag = true;
// streamer first index needn't be
uint64_t firstVal = fnum - (*((uint32_t *)(buf + FIFO_DATASIZE_NUMBYTES)));
firstIndex = firstVal;
firstIndex = firstImageIndex;
LOG(logDEBUG1) << index << " First Index: " << firstIndex
<< ", First Streamer Index:" << fnum;
}
@ -110,20 +107,23 @@ void DataStreamer::CloseZmqSocket() {
void DataStreamer::ThreadExecution() {
char *buffer = nullptr;
fifo->PopAddressToStream(buffer);
LOG(logDEBUG5) << "DataStreamer " << index
<< ", "
"pop 0x"
LOG(logDEBUG5) << "DataStreamer " << index << ", pop 0x"
<< std::hex << (void *)(buffer) << std::dec << ":" << buffer;
auto *memImage = reinterpret_cast<image_structure *>(buffer);
// check dummy
auto numBytes = *reinterpret_cast<uint32_t *>(buffer);
LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << numBytes;
if (numBytes == DUMMY_PACKET_VALUE) {
LOG(logDEBUG1) << "DataStreamer " << index << ", Numbytes:" << memImage->size ;
if (memImage->size == DUMMY_PACKET_VALUE) {
StopProcessing(buffer);
return;
}
ProcessAnImage(buffer);
// streamer first index needn't be the very first index
if (!startedFlag) {
RecordFirstIndex(memImage->header.detHeader.frameNumber, memImage->firstIndex);
}
ProcessAnImage(memImage->header.detHeader, memImage->size, memImage->data);
// free
fifo->FreeAddress(buffer);
@ -131,12 +131,9 @@ void DataStreamer::ThreadExecution() {
void DataStreamer::StopProcessing(char *buf) {
LOG(logDEBUG1) << "DataStreamer " << index << ": Dummy";
sls_receiver_header *header = (sls_receiver_header *)(buf);
// send dummy header and data
if (!SendHeader(header, 0, 0, 0, true)) {
LOG(logERROR) << "Could not send zmq dummy header for streamer "
<< index;
if (!SendDummyHeader()) {
LOG(logERROR) << "Could not send zmq dummy header for streamer for port "
<< zmqSocket->GetPortNumber();
}
fifo->FreeAddress(buf);
@ -145,38 +142,26 @@ void DataStreamer::StopProcessing(char *buf) {
}
/** buf includes only the standard header */
void DataStreamer::ProcessAnImage(char *buf) {
sls_receiver_header *header =
(sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
uint64_t fnum = header->detHeader.frameNumber;
void DataStreamer::ProcessAnImage(sls_detector_header header, size_t size, char* data) {
uint64_t fnum = header.frameNumber;
LOG(logDEBUG1) << "DataStreamer " << index << ": fnum:" << fnum;
if (!startedFlag) {
RecordFirstIndex(fnum, buf);
}
auto numBytes = *reinterpret_cast<uint32_t *>(buf);
// shortframe gotthard
if (completeBuffer) {
// disregarding the size modified from callback (always using
// imageSizeComplete
// instead of buf (32 bit) because gui needs imagesizecomplete and
// listener
// write imagesize
// imageSizeComplete instead of size because gui needs
// imagesizecomplete and listener writes imagesize to size
if (!SendHeader(header, generalData->imageSizeComplete,
generalData->nPixelsXComplete,
generalData->nPixelsYComplete, false)) {
if (!SendDataHeader(header, generalData->imageSizeComplete,
generalData->nPixelsXComplete, generalData->nPixelsYComplete)) {
LOG(logERROR) << "Could not send zmq header for fnum " << fnum
<< " and streamer " << index;
}
memcpy(completeBuffer + ((generalData->imageSize) * adcConfigured),
buf + FIFO_HEADER_NUMBYTES + sizeof(sls_receiver_header),
numBytes);
data, size);
if (!zmqSocket->SendData(completeBuffer,
generalData->imageSizeComplete)) {
if (!zmqSocket->SendData(completeBuffer, generalData->imageSizeComplete)) {
LOG(logERROR) << "Could not send zmq data for fnum " << fnum
<< " and streamer " << index;
}
@ -185,33 +170,29 @@ void DataStreamer::ProcessAnImage(char *buf) {
// normal
else {
if (!SendHeader(header, numBytes, generalData->nPixelsX,
generalData->nPixelsY,
false)) { // new size possibly from callback
if (!SendDataHeader(header, size, generalData->nPixelsX, generalData->nPixelsY)) {
LOG(logERROR) << "Could not send zmq header for fnum " << fnum
<< " and streamer " << index;
}
if (!zmqSocket->SendData(buf + FIFO_HEADER_NUMBYTES +
sizeof(sls_receiver_header),
numBytes)) { // new size possibly from callback
if (!zmqSocket->SendData(data, size)) {
LOG(logERROR) << "Could not send zmq data for fnum " << fnum
<< " and streamer " << index;
}
}
}
int DataStreamer::SendHeader(sls_receiver_header *rheader, uint32_t size,
uint32_t nx, uint32_t ny, bool dummy) {
int DataStreamer::SendDummyHeader() {
zmqHeader zHeader;
zHeader.data = !dummy;
zHeader.data = false;
zHeader.jsonversion = SLS_DETECTOR_JSON_HEADER_VERSION;
return zmqSocket->SendHeader(index, zHeader);
}
if (dummy) {
return zmqSocket->SendHeader(index, zHeader);
}
sls_detector_header header = rheader->detHeader;
int DataStreamer::SendDataHeader(sls_detector_header header, uint32_t size,
uint32_t nx, uint32_t ny) {
zmqHeader zHeader;
zHeader.data = true;
zHeader.jsonversion = SLS_DETECTOR_JSON_HEADER_VERSION;
uint64_t frameIndex = header.frameNumber - firstIndex;
uint64_t acquisitionIndex = header.frameNumber;
@ -258,12 +239,7 @@ int DataStreamer::SendHeader(sls_receiver_header *rheader, uint32_t size,
}
void DataStreamer::RestreamStop() {
// send dummy header
zmqHeader zHeader;
zHeader.data = false;
zHeader.jsonversion = SLS_DETECTOR_JSON_HEADER_VERSION;
int ret = zmqSocket->SendHeader(index, zHeader);
if (!ret) {
if (!SendDummyHeader()) {
throw RuntimeError(
"Could not restream Dummy Header via ZMQ for port " +
std::to_string(zmqSocket->GetPortNumber()));

View File

@ -72,10 +72,9 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
private:
/**
* Record First Index
* @param fnum current frame number
* @param buf get frame index from buffer to calculate first index to record
*/
void RecordFirstIndex(uint64_t fnum, char *buf);
void RecordFirstIndex(uint64_t fnum, size_t firstImageIndex);
void ThreadExecution();
/**
@ -88,19 +87,21 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
* Process an image popped from fifo,
* write to file if fw enabled & update parameters
*/
void ProcessAnImage(char *buf);
void ProcessAnImage(sls_detector_header header, size_t size, char* data);
int SendDummyHeader();
/**
* Create and send Json Header
* @param rheader header of image
* @param size data size (could have been modified in call back)
* @param nx number of pixels in x dim
* @param ny number of pixels in y dim
* @param dummy true if its a dummy header
* @returns 0 if error, else 1
*/
int SendHeader(sls_receiver_header *rheader, uint32_t size = 0,
uint32_t nx = 0, uint32_t ny = 0, bool dummy = true);
int SendDataHeader(sls_detector_header header, uint32_t size = 0,
uint32_t nx = 0, uint32_t ny = 0);
static const std::string TypeName;
const GeneralData *generalData{nullptr};

View File

@ -17,7 +17,7 @@
namespace sls {
Fifo::Fifo(int ind, uint32_t fifoItemSize, uint32_t depth)
Fifo::Fifo(int ind, size_t fifoItemSize, uint32_t depth)
: index(ind), memory(nullptr), fifoBound(nullptr), fifoFree(nullptr),
fifoStream(nullptr), fifoDepth(depth), status_fifoBound(0),
status_fifoFree(depth) {
@ -30,7 +30,7 @@ Fifo::~Fifo() {
DestroyFifos();
}
void Fifo::CreateFifos(uint32_t fifoItemSize) {
void Fifo::CreateFifos(size_t fifoItemSize) {
LOG(logDEBUG3) << __SHORT_AT__ << " called";
// destroy if not already
@ -41,7 +41,7 @@ void Fifo::CreateFifos(uint32_t fifoItemSize) {
fifoFree = new CircularFifo<char>(fifoDepth);
fifoStream = new CircularFifo<char>(fifoDepth);
// allocate memory
size_t mem_len = (size_t)fifoItemSize * (size_t)fifoDepth * sizeof(char);
size_t mem_len = fifoItemSize * (size_t)fifoDepth * sizeof(char);
memory = (char *)malloc(mem_len);
if (memory == nullptr) {
throw RuntimeError("Could not allocate memory for fifos");

View File

@ -28,7 +28,7 @@ class Fifo : private virtual slsDetectorDefs {
* @param fifoItemSize size of each fifo item
* @param depth fifo depth
*/
Fifo(int ind, uint32_t fifoItemSize, uint32_t depth);
Fifo(int ind, size_t fifoItemSize, uint32_t depth);
/**
* Destructor
@ -82,7 +82,7 @@ class Fifo : private virtual slsDetectorDefs {
* Create Fifos, allocate memory & push addresses into fifo
* @param fifoItemSize size of each fifo item
*/
void CreateFifos(uint32_t fifoItemSize);
void CreateFifos(size_t fifoItemSize);
/**
* Destroy Fifos and deallocate memory

View File

@ -81,9 +81,7 @@ class File : private virtual slsDetectorDefs {
"should be overloaded by a derived class";
};
virtual void WriteToFile(char *buffer, const int buffersize,
const uint64_t currentFrameNumber,
const uint32_t numPacketsCaught) = 0;
virtual void WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber,const uint32_t numPacketsCaught) = 0;
protected:
slsDetectorDefs::fileFormat format_;

View File

@ -37,8 +37,6 @@ class GeneralData {
uint32_t packetIndexMask{0};
uint32_t packetIndexOffset{0};
uint32_t maxFramesPerFile{0};
/** Header size of data saved into fifo buffer at a time*/
uint32_t fifoBufferHeaderSize{0};
uint32_t defaultFifoDepth{0};
uint32_t numUDPInterfaces{1};
uint32_t headerPacketSize{0};
@ -167,8 +165,6 @@ class GotthardData : public GeneralData {
nPixelsY = 1;
headerSizeinPacket = 6;
maxFramesPerFile = MAX_FRAMES_PER_FILE;
fifoBufferHeaderSize =
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
UpdateImageSize();
};
@ -297,8 +293,6 @@ class EigerData : public GeneralData {
myDetectorType = slsDetectorDefs::EIGER;
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE;
fifoBufferHeaderSize =
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
numUDPInterfaces = 2;
headerPacketSize = 40;
standardheader = true;
@ -337,8 +331,6 @@ class JungfrauData : public GeneralData {
dataSize = 8192;
packetSize = headerSizeinPacket + dataSize;
maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE;
fifoBufferHeaderSize =
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
defaultFifoDepth = 2500;
standardheader = true;
maxRowsPerReadout = 512;
@ -371,8 +363,6 @@ class Mythen3Data : public GeneralData {
nPixelsY = 1;
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
maxFramesPerFile = MYTHEN3_MAX_FRAMES_PER_FILE;
fifoBufferHeaderSize =
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
defaultFifoDepth = 50000;
standardheader = true;
defaultUdpSocketBufferSize = (1000 * 1024 * 1024);
@ -443,8 +433,6 @@ class Gotthard2Data : public GeneralData {
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
dataSize = 2560; // 1280 channels * 2 bytes
maxFramesPerFile = GOTTHARD2_MAX_FRAMES_PER_FILE;
fifoBufferHeaderSize =
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
defaultFifoDepth = 50000;
standardheader = true;
vetoDataSize = 160;
@ -501,8 +489,6 @@ class ChipTestBoardData : public GeneralData {
frameIndexOffset = 8; // 10g
packetIndexMask = 0xFF; // 10g
maxFramesPerFile = CTB_MAX_FRAMES_PER_FILE;
fifoBufferHeaderSize =
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
defaultFifoDepth = 2500;
standardheader = true;
UpdateImageSize();
@ -590,8 +576,6 @@ class MoenchData : public GeneralData {
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
frameIndexMask = 0xFFFFFF;
maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE;
fifoBufferHeaderSize =
FIFO_HEADER_NUMBYTES + sizeof(slsDetectorDefs::sls_receiver_header);
defaultFifoDepth = 2500;
standardheader = true;
UpdateImageSize();

View File

@ -222,9 +222,7 @@ void HDF5DataFile::CreateFile() {
}
}
void HDF5DataFile::WriteToFile(char *buffer, const int buffersize,
const uint64_t currentFrameNumber,
const uint32_t numPacketsCaught) {
void HDF5DataFile::WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber, const uint32_t numPacketsCaught) {
// check if maxframesperfile = 0 for infinite
if (maxFramesPerFile_ && (numFramesInFile_ >= maxFramesPerFile_)) {
@ -240,8 +238,8 @@ void HDF5DataFile::WriteToFile(char *buffer, const int buffersize,
ExtendDataset();
}
WriteDataFile(currentFrameNumber, buffer + sizeof(sls_receiver_header));
WriteParameterDatasets(currentFrameNumber, (sls_receiver_header *)(buffer));
WriteDataFile(currentFrameNumber, imageData);
WriteParameterDatasets(currentFrameNumber, header);
}
void HDF5DataFile::Convert12to16Bit(uint16_t *dst, uint8_t *src) {
@ -301,14 +299,14 @@ void HDF5DataFile::WriteDataFile(const uint64_t currentFrameNumber,
}
void HDF5DataFile::WriteParameterDatasets(const uint64_t currentFrameNumber,
sls_receiver_header *rheader) {
sls_receiver_header rheader) {
std::lock_guard<std::mutex> lock(*hdf5Lib_);
uint64_t fnum =
((maxFramesPerFile_ == 0) ? currentFrameNumber
: currentFrameNumber % maxFramesPerFile_);
sls_detector_header header = rheader->detHeader;
sls_detector_header header = rheader.detHeader;
hsize_t count[1] = {1};
hsize_t start[1] = {fnum};
int i = 0;
@ -358,7 +356,7 @@ void HDF5DataFile::WriteParameterDatasets(const uint64_t currentFrameNumber,
// contiguous bitset
if (sizeof(sls_bitset) == sizeof(bitset_storage)) {
dataSetPara_[13]->write((char *)&(rheader->packetsMask),
dataSetPara_[13]->write((char *)&(rheader.packetsMask),
parameterDataTypes_[13], memspace,
*dataSpacePara_);
}
@ -368,7 +366,7 @@ void HDF5DataFile::WriteParameterDatasets(const uint64_t currentFrameNumber,
// get contiguous representation of bit mask
bitset_storage storage;
memset(storage, 0, sizeof(bitset_storage));
sls_bitset bits = rheader->packetsMask;
sls_bitset bits = rheader.packetsMask;
for (int i = 0; i < MAX_NUM_PACKETS; ++i)
storage[i >> 3] |= (bits[i] << (i & 7));
// write bitmask

View File

@ -31,16 +31,14 @@ class HDF5DataFile : private virtual slsDetectorDefs, public File {
const uint32_t nPixelsX, const uint32_t nPixelsY,
const uint32_t dynamicRange) override;
void WriteToFile(char *buffer, const int buffersize,
const uint64_t currentFrameNumber,
const uint32_t numPacketsCaught) override;
void WriteToFile(char *imageData, sls_receiver_header& header, const int imageSize, const uint64_t currentFrameNumber, const uint32_t numPacketsCaught) override;
private:
void CreateFile();
void Convert12to16Bit(uint16_t *dst, uint8_t *src);
void WriteDataFile(const uint64_t currentFrameNumber, char *buffer);
void WriteParameterDatasets(const uint64_t currentFrameNumber,
sls_receiver_header *rheader);
sls_receiver_header rheader);
void ExtendDataset();
int index_;

View File

@ -67,16 +67,16 @@ void Implementation::SetThreadPriorities() {
void Implementation::SetupFifoStructure() {
fifo.clear();
for (int i = 0; i < numUDPInterfaces; ++i) {
uint32_t datasize = generalData->imageSize;
size_t datasize = generalData->imageSize;
// veto data size
if (detType == GOTTHARD2 && i != 0) {
datasize = generalData->vetoImageSize;
}
datasize += IMAGE_STRUCTURE_HEADER_SIZE;
// create fifo structure
try {
fifo.push_back(sls::make_unique<Fifo>(
i, datasize + (generalData->fifoBufferHeaderSize), fifoDepth));
fifo.push_back(sls::make_unique<Fifo>(i, datasize, fifoDepth));
} catch (...) {
fifo.clear();
fifoDepth = 0;
@ -93,9 +93,7 @@ void Implementation::SetupFifoStructure() {
dataStreamer[i]->SetFifo(fifo[i].get());
LOG(logINFO) << "Memory Allocated for Fifo " << i << ": "
<< (double)(((size_t)(datasize) +
(size_t)(generalData->fifoBufferHeaderSize)) *
(size_t)fifoDepth) /
<< (double)(datasize * (size_t)fifoDepth) /
(double)(1024 * 1024)
<< " MB";
}
@ -181,8 +179,7 @@ void Implementation::setDetectorType(const detectorType d) {
listener.push_back(sls::make_unique<Listener>(
i, detType, fifo_ptr, &status, &udpPortNum[i], &eth[i],
&udpSocketBufferSize, &actualUDPSocketBufferSize,
&framesPerFile, &frameDiscardMode, &detectorDataStream[i],
&silentMode));
&framesPerFile, &frameDiscardMode, &silentMode));
int ctbAnalogDataBytes = 0;
if (detType == CHIPTESTBOARD) {
ctbAnalogDataBytes = generalData->GetNumberOfAnalogDatabytes();
@ -201,9 +198,10 @@ void Implementation::setDetectorType(const detectorType d) {
}
// set up writer and callbacks
for (const auto &it : listener) {
it->SetGeneralData(generalData);
it->SetActivate(activated);
for (int i = 0; i != (int)listener.size(); ++i) {
listener[i]->SetGeneralData(generalData);
listener[i]->SetActivate(activated);
listener[i]->SetDetectorDatastream(detectorDataStream[i]);
}
for (const auto &it : dataProcessor) {
it->SetGeneralData(generalData);
@ -580,15 +578,25 @@ std::vector<int64_t> Implementation::getCurrentFrameIndex() const {
}
double Implementation::getProgress() const {
if (!activated || (!detectorDataStream[0] && !detectorDataStream[1])) {
std::vector<bool> disabledPort;
for (auto &it : listener) {
disabledPort.push_back(it->isPortDisabled());
}
// all ports disabled
if (allEqualTo<bool>(disabledPort, true)) {
return 100.00;
}
// if disabled, considering only 1 port
// any disabled
double totalFrames = (double)(numberOfTotalFrames * listener.size());
if (!detectorDataStream[0] || !detectorDataStream[1]) {
totalFrames /= 2;
}
if (anyEqualTo<bool>(disabledPort, true)) {
for (auto it : disabledPort) {
if (it) {
totalFrames /= 2;
}
}
}
double progress = 0;
int index = 0;
@ -1003,11 +1011,11 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
listener.push_back(sls::make_unique<Listener>(
i, detType, fifo_ptr, &status, &udpPortNum[i], &eth[i],
&udpSocketBufferSize, &actualUDPSocketBufferSize,
&framesPerFile, &frameDiscardMode, &detectorDataStream[i],
&silentMode));
&framesPerFile, &frameDiscardMode, &silentMode));
listener[i]->SetGeneralData(generalData);
listener[i]->SetActivate(activated);
listener[i]->SetNoRoi(portRois[i].noRoi());
listener[i]->SetDetectorDatastream(detectorDataStream[i]);
int ctbAnalogDataBytes = 0;
if (detType == CHIPTESTBOARD) {

View File

@ -18,6 +18,7 @@
#include <cerrno>
#include <cstring>
#include <iostream>
#include <unistd.h>
namespace sls {
@ -26,16 +27,20 @@ const std::string Listener::TypeName = "Listener";
Listener::Listener(int ind, detectorType dtype, Fifo *f,
std::atomic<runStatus> *s, uint32_t *portno, std::string *e,
int *us, int *as, uint32_t *fpf, frameDiscardPolicy *fdp,
bool *detds, bool *sm)
bool *sm)
: ThreadObject(ind, TypeName), fifo(f), myDetectorType(dtype), status(s),
udpPortNumber(portno), eth(e), udpSocketBufferSize(us),
actualUDPSocketBufferSize(as), framesPerFile(fpf), frameDiscardMode(fdp),
detectorDataStream(detds), silentMode(sm) {
actualUDPSocketBufferSize(as), framesPerFile(fpf), frameDiscardMode(fdp), silentMode(sm) {
LOG(logDEBUG) << "Listener " << ind << " created";
}
Listener::~Listener() = default;
bool Listener::isPortDisabled() const {
return disabledPort;
}
uint64_t Listener::GetPacketsCaught() const { return numPacketsCaught; }
uint64_t Listener::GetNumCompleteFramesCaught() const {
@ -48,7 +53,7 @@ uint64_t Listener::GetLastFrameIndexCaught() const {
int64_t Listener::GetNumMissingPacket(bool stoppedFlag,
uint64_t numPackets) const {
if (!activated || !(*detectorDataStream) || noRoi) {
if (disabledPort) {
return 0;
}
if (!stoppedFlag) {
@ -101,7 +106,6 @@ void Listener::RecordFirstIndex(uint64_t fnum) {
// listen to this fnum, later +1
currentFrameIndex = fnum;
lastCaughtFrameIndex = fnum;
startedFlag = true;
firstIndex = fnum;
@ -114,12 +118,23 @@ void Listener::RecordFirstIndex(uint64_t fnum) {
void Listener::SetGeneralData(GeneralData *g) { generalData = g; }
void Listener::SetActivate(bool enable) { activated = enable; }
void Listener::SetActivate(bool enable) {
activated = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::SetNoRoi(bool enable) {noRoi = enable; }
void Listener::SetDetectorDatastream(bool enable) {
detectorDataStream = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::SetNoRoi(bool enable) {
noRoi = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::CreateUDPSockets() {
if (!activated || !(*detectorDataStream) || noRoi) {
if (disabledPort) {
return;
}
@ -160,6 +175,8 @@ void Listener::CreateUDPSockets() {
void Listener::ShutDownUDPSocket() {
if (udpSocket) {
udpSocketAlive = false;
// give other thread time after udpSocketAlive is changed
usleep(0);
udpSocket->Shutdown();
LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber;
}
@ -169,7 +186,7 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int s) {
LOG(logINFO) << "Testing UDP Socket Buffer size " << s << " with test port "
<< *udpPortNumber;
if (!activated || !(*detectorDataStream) || noRoi) {
if (disabledPort) {
*actualUDPSocketBufferSize = (s * 2);
return;
}
@ -219,50 +236,29 @@ void Listener::SetHardCodedPosition(uint16_t r, uint16_t c) {
void Listener::ThreadExecution() {
char *buffer;
int rc = 0;
fifo->GetNewAddress(buffer);
LOG(logDEBUG5) << "Listener " << index
<< ", "
"pop 0x"
LOG(logDEBUG5) << "Listener " << index << ", pop 0x"
<< std::hex << (void *)(buffer) << std::dec << ":" << buffer;
auto *memImage = reinterpret_cast<image_structure *>(buffer);
// udpsocket doesnt exist
if (activated && *detectorDataStream && !noRoi &&!udpSocketAlive && !carryOverFlag) {
// LOG(logERROR) << "Listening_Thread " << index << ": UDP Socket not
// created or shut down earlier";
(*((uint32_t *)buffer)) = 0;
StopListening(buffer);
if ((*status == TRANSMITTING || !udpSocketAlive) && !carryOverFlag) {
StopListening(buffer, memImage->size);
return;
}
// get data
if ((*status != TRANSMITTING &&
(!activated || !(*detectorDataStream) || noRoi || udpSocketAlive)) ||
carryOverFlag) {
rc = ListenToAnImage(buffer);
// reset header and size and get data
memset(memImage, 0, IMAGE_STRUCTURE_HEADER_SIZE);
int rc = ListenToAnImage(memImage->header, memImage->data);
// end of acquisition or discarding image
if (rc <= 0) {
fifo->FreeAddress(buffer);
return;
}
// error check, (should not be here) if not transmitting yet (previous if)
// rc should be > 0
if (rc == 0) {
if (!udpSocketAlive) {
(*((uint32_t *)buffer)) = 0;
StopListening(buffer);
} else
fifo->FreeAddress(buffer);
return;
}
// discarding image
else if (rc < 0) {
fifo->FreeAddress(buffer);
return;
}
(*((uint32_t *)buffer)) = rc;
// push into fifo
// valid image, set size and push into fifo
memImage->size = rc;
fifo->PushAddress(buffer);
// Statistics
@ -276,69 +272,45 @@ void Listener::ThreadExecution() {
}
}
void Listener::StopListening(char *buf) {
(*((uint32_t *)buf)) = DUMMY_PACKET_VALUE;
void Listener::StopListening(char *buf, size_t & size) {
size = DUMMY_PACKET_VALUE;
fifo->PushAddress(buf);
StopRunning();
LOG(logDEBUG1) << index << ": Listening Packets (" << *udpPortNumber
LOG(logDEBUG1) << index << ": Listening Completed. Packets (" << *udpPortNumber
<< ") : " << numPacketsCaught;
LOG(logDEBUG1) << index << ": Listening Completed";
}
/* buf includes the fifo header and packet header */
uint32_t Listener::ListenToAnImage(char *buf) {
uint32_t Listener::ListenToAnImage(sls_receiver_header & dstHeader, char *dstData) {
int rc = 0;
uint64_t fnum = 0;
uint32_t pnum = 0;
uint64_t bnum = 0;
uint32_t numpackets = 0;
uint32_t dsize = generalData->dataSize;
uint32_t imageSize = generalData->imageSize;
uint32_t packetSize = generalData->packetSize;
uint32_t hsize = generalData->headerSizeinPacket;
uint32_t fifohsize = generalData->fifoBufferHeaderSize;
bool standardheader = generalData->standardheader;
bool standardHeader = generalData->standardheader;
if (myDetectorType == GOTTHARD2 && index != 0) {
dsize = generalData->vetoDataSize;
imageSize = generalData->vetoImageSize;
packetSize = generalData->vetoPacketSize;
hsize = generalData->vetoHsize;
standardheader = false;
standardHeader = false;
}
uint32_t pperFrame = generalData->packetsPerFrame;
bool isHeaderEmpty = true;
sls_detector_header *old_header = nullptr;
sls_receiver_header *new_header = nullptr;
uint32_t corrected_dsize = dsize - ((pperFrame * dsize) - imageSize);
sls_detector_header *srcDetHeader = nullptr;
// reset to -1
memset(buf, 0, fifohsize);
new_header = (sls_receiver_header *)(buf + FIFO_HEADER_NUMBYTES);
// deactivated port (eiger) or deactivated (eiger)
if (!(*detectorDataStream) || !activated || noRoi) {
return 0;
}
// look for carry over
// carry over packet
if (carryOverFlag) {
LOG(logDEBUG3) << index << "carry flag";
// check if its the current image packet
// -------------------------- new header
// ----------------------------------------------------------------------
if (standardheader) {
old_header = (sls_detector_header *)(&carryOverPacket[0]);
fnum = old_header->frameNumber;
pnum = old_header->packetNumber;
}
// -------------------old header
// -----------------------------------------------------------------------------
else {
generalData->GetHeaderInfo(index, &carryOverPacket[0],
oddStartingPacket, fnum, pnum, bnum);
}
//------------------------------------------------------------------------------------------------------------
GetPacketIndices(fnum, pnum, bnum, standardHeader, carryOverPacket.get(), srcDetHeader);
// future packet
if (fnum != currentFrameIndex) {
if (fnum < currentFrameIndex) {
LOG(logERROR)
@ -347,156 +319,31 @@ uint32_t Listener::ListenToAnImage(char *buf) {
carryOverFlag = false;
return 0;
}
switch (*frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
if (!numpackets) {
LOG(logDEBUG)
<< index << " Skipped fnum:" << currentFrameIndex;
currentFrameIndex = fnum;
return -1;
}
break;
case DISCARD_PARTIAL_FRAMES:
LOG(logDEBUG)
<< index << " discarding fnum:" << currentFrameIndex;
currentFrameIndex = fnum;
return -1;
default:
break;
}
new_header->detHeader.packetNumber = numpackets;
if (isHeaderEmpty) {
new_header->detHeader.row = row;
new_header->detHeader.column = column;
}
new_header->detHeader.frameNumber = currentFrameIndex;
++currentFrameIndex;
return imageSize;
}
// copy packet
switch (myDetectorType) {
// for gotthard, 1st packet: 4 bytes fnum, CACA
// + CACA, 639*2 bytes data 2nd packet: 4
// bytes fnum, previous 1*2 bytes data + 640*2 bytes data !!
case GOTTHARD:
if (!pnum)
memcpy(buf + fifohsize, &carryOverPacket[hsize + 4], dsize - 2);
else
memcpy(buf + fifohsize + dsize - 2, &carryOverPacket[hsize],
dsize + 2);
break;
case CHIPTESTBOARD:
case MOENCH:
if (pnum == (pperFrame - 1))
memcpy(buf + fifohsize + (pnum * dsize),
&carryOverPacket[hsize], corrected_dsize);
else
memcpy(buf + fifohsize + (pnum * dsize),
&carryOverPacket[hsize], dsize);
break;
default:
memcpy(buf + fifohsize + (pnum * dsize), &carryOverPacket[hsize],
dsize);
break;
return HandleFuturePacket(false, numpackets, fnum, isHeaderEmpty, imageSize, dstHeader);
}
CopyPacket(dstData, carryOverPacket.get(), dsize, hsize, corrected_dsize, numpackets, isHeaderEmpty, standardHeader, dstHeader, srcDetHeader, pnum, bnum);
carryOverFlag = false;
++numpackets; // number of packets in this image (each time its copied
// to buf)
new_header->packetsMask[(
(pnum < MAX_NUM_PACKETS) ? pnum : MAX_NUM_PACKETS - 1)] = 1;
// writer header
if (isHeaderEmpty) {
// -------------------------- new header
// ----------------------------------------------------------------------
if (standardheader) {
memcpy((char *)new_header, (char *)old_header,
sizeof(sls_detector_header));
}
// -------------------old header
// ------------------------------------------------------------------------------
else {
new_header->detHeader.frameNumber = fnum;
new_header->detHeader.bunchId = bnum;
new_header->detHeader.row = row;
new_header->detHeader.column = column;
new_header->detHeader.detType =
(uint8_t)generalData->myDetectorType;
new_header->detHeader.version =
(uint8_t)SLS_DETECTOR_HEADER_VERSION;
}
//------------------------------------------------------------------------------------------------------------
isHeaderEmpty = false;
}
}
// until last packet isHeaderEmpty to account for gotthard short frame, else
// never entering this loop)
while (numpackets < pperFrame) {
// listen to new packet
rc = 0;
int rc = 0;
if (udpSocketAlive) {
rc = udpSocket->ReceiveDataOnly(&listeningPacket[0]);
}
// end of acquisition
if (rc <= 0) {
if (numpackets == 0)
return 0; // empty image
switch (*frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
if (!numpackets) {
return -1;
}
break;
case DISCARD_PARTIAL_FRAMES:
// empty packet now, but not empty image (EOA)
if (numpackets) {
LOG(logDEBUG)
<< index << " discarding fnum:" << currentFrameIndex;
}
return -1;
default:
break;
}
new_header->detHeader.packetNumber =
numpackets; // number of packets caught
if (isHeaderEmpty) {
new_header->detHeader.row = row;
new_header->detHeader.column = column;
}
new_header->detHeader.frameNumber = currentFrameIndex;
return imageSize; // empty packet now, but not empty image (EOA)
return 0;
return HandleFuturePacket(true, numpackets, fnum, isHeaderEmpty, imageSize, dstHeader);
}
// update parameters
numPacketsCaught++; // record immediately to get more time before socket
// shutdown
numPacketsCaught++;
numPacketsStatistic++;
// -------------------------- new header
// ----------------------------------------------------------------------
if (standardheader) {
old_header = (sls_detector_header *)(&listeningPacket[0]);
fnum = old_header->frameNumber;
pnum = old_header->packetNumber;
}
// -------------------old header
// -----------------------------------------------------------------------------
else {
// set first packet to be odd or even (check required when switching
// from roi to no roi)
if (myDetectorType == GOTTHARD && !startedFlag) {
oddStartingPacket = generalData->SetOddStartingPacket(
index, &listeningPacket[0]);
}
generalData->GetHeaderInfo(index, &listeningPacket[0],
oddStartingPacket, fnum, pnum, bnum);
}
//------------------------------------------------------------------------------------------------------------
GetPacketIndices(fnum, pnum, bnum, standardHeader, listeningPacket.get(), srcDetHeader);
// Eiger Firmware in a weird state
if (myDetectorType == EIGER && fnum == 0) {
@ -504,120 +351,37 @@ uint32_t Listener::ListenToAnImage(char *buf) {
<< "]: Got Frame Number "
"Zero from Firmware. Discarding Packet";
numPacketsCaught--;
numPacketsStatistic--;
return 0;
}
lastCaughtFrameIndex = fnum;
LOG(logDEBUG1) << "Listening " << index
<< ": currentfindex:" << currentFrameIndex
<< ", fnum:" << fnum << ", pnum:" << pnum
<< ", numpackets:" << numpackets;
if (!startedFlag)
RecordFirstIndex(fnum);
// bad packet
if (pnum >= pperFrame) {
LOG(logERROR) << "Bad packet " << pnum << "(fnum: " << fnum
<< "), throwing away. "
"Packets caught so far: "
<< "), throwing away. Packets caught so far: "
<< numpackets;
return 0; // bad packet
return 0;
}
// future packet by looking at image number (all other
// detectors)
// future packet
if (fnum != currentFrameIndex) {
carryOverFlag = true;
memcpy(carryOverPacket.get(), &listeningPacket[0], packetSize);
switch (*frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
if (!numpackets) {
LOG(logDEBUG)
<< index << " Skipped fnum:" << currentFrameIndex;
currentFrameIndex = fnum;
return -1;
}
break;
case DISCARD_PARTIAL_FRAMES:
LOG(logDEBUG)
<< index << " discarding fnum:" << currentFrameIndex;
currentFrameIndex = fnum;
return -1;
default:
break;
}
new_header->detHeader.packetNumber =
numpackets; // number of packets caught
if (isHeaderEmpty) {
new_header->detHeader.row = row;
new_header->detHeader.column = column;
}
new_header->detHeader.frameNumber = currentFrameIndex;
++currentFrameIndex;
return imageSize;
}
// copy packet
switch (myDetectorType) {
// for gotthard, 1st packet: 4 bytes fnum, CACA
// + CACA, 639*2 bytes data 2nd packet: 4
// bytes fnum, previous 1*2 bytes data + 640*2 bytes data !!
case GOTTHARD:
if (!pnum)
memcpy(buf + fifohsize + (pnum * dsize),
&listeningPacket[hsize + 4], dsize - 2);
else
memcpy(buf + fifohsize + (pnum * dsize) - 2,
&listeningPacket[hsize], dsize + 2);
break;
case CHIPTESTBOARD:
case MOENCH:
if (pnum == (pperFrame - 1))
memcpy(buf + fifohsize + (pnum * dsize),
&listeningPacket[hsize], corrected_dsize);
else
memcpy(buf + fifohsize + (pnum * dsize),
&listeningPacket[hsize], dsize);
break;
default:
memcpy(buf + fifohsize + (pnum * dsize), &listeningPacket[hsize],
dsize);
break;
}
++numpackets; // number of packets in this image (each time its copied
// to buf)
new_header->packetsMask[(
(pnum < MAX_NUM_PACKETS) ? pnum : MAX_NUM_PACKETS - 1)] = 1;
if (isHeaderEmpty) {
// -------------------------- new header
// ----------------------------------------------------------------------
if (standardheader) {
memcpy((char *)new_header, (char *)old_header,
sizeof(sls_detector_header));
}
// -------------------old header
// ------------------------------------------------------------------------------
else {
new_header->detHeader.frameNumber = fnum;
new_header->detHeader.bunchId = bnum;
new_header->detHeader.row = row;
new_header->detHeader.column = column;
new_header->detHeader.detType =
(uint8_t)generalData->myDetectorType;
new_header->detHeader.version =
(uint8_t)SLS_DETECTOR_HEADER_VERSION;
}
//------------------------------------------------------------------------------------------------------------
isHeaderEmpty = false;
return HandleFuturePacket(false, numpackets, fnum, isHeaderEmpty, imageSize, dstHeader);
}
CopyPacket(dstData, listeningPacket.get(), dsize, hsize, corrected_dsize, numpackets, isHeaderEmpty, standardHeader, dstHeader, srcDetHeader, pnum, bnum);
}
// complete image
new_header->detHeader.packetNumber = numpackets; // number of packets caught
new_header->detHeader.frameNumber = currentFrameIndex;
dstHeader.detHeader.packetNumber = numpackets;
if (numpackets == pperFrame) {
++numCompleteFramesCaught;
}
@ -625,6 +389,104 @@ uint32_t Listener::ListenToAnImage(char *buf) {
return imageSize;
}
size_t Listener::HandleFuturePacket(bool EOA, uint32_t numpackets, uint64_t fnum, bool isHeaderEmpty, size_t imageSize, sls_receiver_header& dstHeader) {
switch (*frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
if (!numpackets) {
if (!EOA) {
LOG(logDEBUG) << index << " Skipped fnum:" << currentFrameIndex;
currentFrameIndex = fnum;
}
return -1;
}
break;
case DISCARD_PARTIAL_FRAMES:
LOG(logDEBUG) << index << " discarding fnum:" << currentFrameIndex;
if (!EOA) {
currentFrameIndex = fnum;
}
return -1;
default:
break;
}
dstHeader.detHeader.packetNumber = numpackets;
// for empty frames (padded)
if (isHeaderEmpty) {
dstHeader.detHeader.frameNumber = currentFrameIndex;
// no packet to get bnum
dstHeader.detHeader.row = row;
dstHeader.detHeader.column = column;
dstHeader.detHeader.detType = static_cast<uint8_t>(generalData->myDetectorType);
dstHeader.detHeader.version = static_cast<uint8_t>(SLS_DETECTOR_HEADER_VERSION);
}
if (!EOA) {
++currentFrameIndex;
}
return imageSize;
}
void Listener::CopyPacket(char* dst, char* src, uint32_t dataSize, uint32_t detHeaderSize, uint32_t correctedDataSize, uint32_t &numpackets, bool &isHeaderEmpty, bool standardHeader, sls_receiver_header& dstHeader, sls_detector_header * srcDetHeader, uint32_t pnum, uint64_t bnum) {
// copy packet data
switch (myDetectorType) {
// for gotthard, 1st packet: 4 bytes fnum, CACA
// + CACA, 639*2 bytes data 2nd packet: 4
// bytes fnum, previous 1*2 bytes data + 640*2 bytes data !!
case GOTTHARD:
if (!pnum)
memcpy(dst, &src[detHeaderSize + 4], dataSize - 2);
else
memcpy(dst + dataSize - 2, &src[detHeaderSize], dataSize + 2);
break;
case CHIPTESTBOARD:
case MOENCH:
if (pnum == (generalData->packetsPerFrame - 1))
memcpy(dst + (pnum * dataSize), &src[detHeaderSize], correctedDataSize);
else
memcpy(dst + (pnum * dataSize), &src[detHeaderSize], dataSize);
break;
default:
memcpy(dst + (pnum * dataSize), &src[detHeaderSize], dataSize);
break;
}
++numpackets;
dstHeader.packetsMask[(
(pnum < MAX_NUM_PACKETS) ? pnum : MAX_NUM_PACKETS - 1)] = 1;
// writer header
if (isHeaderEmpty) {
if (standardHeader) {
memcpy((char *)&dstHeader, (char *)srcDetHeader, sizeof(sls_detector_header));
} else {
dstHeader.detHeader.frameNumber = currentFrameIndex;
dstHeader.detHeader.bunchId = bnum;
dstHeader.detHeader.row = row;
dstHeader.detHeader.column = column;
dstHeader.detHeader.detType = static_cast<uint8_t>(generalData->myDetectorType);
dstHeader.detHeader.version = static_cast<uint8_t>(SLS_DETECTOR_HEADER_VERSION);
}
isHeaderEmpty = false;
}
}
void Listener::GetPacketIndices(uint64_t &fnum, uint32_t &pnum, uint64_t &bnum, bool standardHeader, char* packet, sls_detector_header*& header) {
if (standardHeader) {
header = (sls_detector_header *)(&packet[0]);
fnum = header->frameNumber;
pnum = header->packetNumber;
} else {
// set first packet to be odd or even (check required when switching
// from roi to no roi)
if (myDetectorType == GOTTHARD && !startedFlag) {
oddStartingPacket = generalData->SetOddStartingPacket(index, &packet[0]);
}
generalData->GetHeaderInfo(index, &packet[0], oddStartingPacket, fnum, pnum, bnum);
}
}
void Listener::PrintFifoStatistics() {
LOG(logDEBUG1) << "numFramesStatistic:" << numFramesStatistic
<< " numPacketsStatistic:" << numPacketsStatistic

View File

@ -39,12 +39,11 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
* @param as pointer to actual udp socket buffer size
* @param fpf pointer to frames per file
* @param fdp frame discard policy
* @param detds pointer to detector data stream
* @param sm pointer to silent mode
*/
Listener(int ind, detectorType dtype, Fifo *f, std::atomic<runStatus> *s,
uint32_t *portno, std::string *e, int *us, int *as, uint32_t *fpf,
frameDiscardPolicy *fdp, bool *detds, bool *sm);
frameDiscardPolicy *fdp, bool *sm);
/**
* Destructor
@ -52,6 +51,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
*/
~Listener();
bool isPortDisabled() const;
uint64_t GetPacketsCaught() const;
uint64_t GetNumCompleteFramesCaught() const;
uint64_t GetLastFrameIndexCaught() const;
@ -65,6 +65,7 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
void ResetParametersforNewAcquisition();
void SetGeneralData(GeneralData *g);
void SetActivate(bool enable);
void SetDetectorDatastream(bool enable);
void SetNoRoi(bool enable);
void CreateUDPSockets();
void ShutDownUDPSocket();
@ -98,18 +99,22 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
* Pushes non empty buffers into fifo/ frees empty buffer,
* pushes dummy buffer into fifo
* and reset running mask by calling StopRunning()
* @param buf address of buffer
*/
void StopListening(char *buf);
void StopListening(char *buf, size_t& size);
/**
* Listen to the UDP Socket for an image,
* place them in the right order
* @param buf address of buffer
* @returns number of bytes of relevant data, can be image size or 0 (stop
* acquisition) or -1 to discard image
*/
uint32_t ListenToAnImage(char *buf);
uint32_t ListenToAnImage(sls_receiver_header & dstHeader, char *dstData);
size_t HandleFuturePacket(bool EOA, uint32_t numpackets, uint64_t fnum, bool isHeaderEmpty, size_t imageSize, sls_receiver_header& rxHeader);
void CopyPacket(char* dst, char* src, uint32_t dataSize, uint32_t detHeaderSize, uint32_t correctedDataSize, uint32_t &numpackets, bool &isHeaderEmpty, bool standardHeader, sls_receiver_header& rxHeader, sls_detector_header* detHeader, uint32_t pnum, uint64_t bnum);
void GetPacketIndices(uint64_t &fnum, uint32_t &pnum, uint64_t &bnum, bool standardHeader, char* packet, sls_detector_header*& header);
void PrintFifoStatistics();
@ -129,9 +134,10 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
uint32_t *framesPerFile;
frameDiscardPolicy *frameDiscardMode;
bool activated{false};
bool *detectorDataStream;
bool detectorDataStream{true};
bool noRoi{false};
bool *silentMode;
bool disabledPort{false};
/** row hardcoded as 1D or 2d,
* if detector does not send them yet or

View File

@ -36,11 +36,14 @@ namespace sls {
// binary
#define FILE_BUFFER_SIZE (16 * 1024 * 1024) // 16mb
// fifo
#define FIFO_HEADER_NUMBYTES (16)
#define FIFO_DATASIZE_NUMBYTES (4)
#define FIFO_PADDING_NUMBYTES \
(4) // for 8 byte alignment due to sls_receiver_header structure
// fifo
struct image_structure {
size_t size;
size_t firstIndex;
slsDetectorDefs::sls_receiver_header header;
char data[];
};
#define IMAGE_STRUCTURE_HEADER_SIZE (sizeof(size_t) + sizeof(size_t) + sizeof(slsDetectorDefs::sls_receiver_header))
// hdf5
#define MAX_CHUNKED_IMAGES (1)