Rxpointers (#504)

* gui message doesnt show if it has a '>' symbol in error msg

* minor refactoring for readability (size_t calc fifo size)

* refactoring listening udp socket code: activated and datastream dont create udp sockets anyway, rc<=- should be discarded in any case

* wip

* refactoring memory structure access

* wip: bugfix write header + data to binary

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* portRoi no roi effecto on progress

* fail at receiver progress, wip

* segfaults for char pointer in struct

* reference to header to get header and data

* refactoring

* use const defined for size of header of fifo

* updated release notes

* remove pointer in callback for sls_receiver_header pointer

* rx same name arguments in constructors

* rx: same name arguments in constructor

* rx: removing the '_' suffix in class data members

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* diff undo for clang later

* wip

* Wip

* const string&
This commit is contained in:
Dhanya Thattil 2022-08-05 09:08:18 +02:00 committed by GitHub
parent 9ac8dab8af
commit 89e293cb5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 491 additions and 514 deletions

View File

@ -20,22 +20,17 @@ void BinaryDataFile::CloseFile() {
}
void BinaryDataFile::CreateFirstBinaryDataFile(
const std::string fPath, const std::string fNamePrefix,
const std::string& fNamePrefix,
const uint64_t fIndex, const bool ovEnable, const bool sMode,
const int modulePos, const int nUnitsPerReadout,
const uint32_t uPortNumber, const uint32_t mFramesPerFile) {
subFileIndex = 0;
numFramesInFile = 0;
filePath = fPath;
fileNamePrefix = fNamePrefix;
fileIndex = fIndex;
overWriteEnable = ovEnable;
silentMode = sMode;
detIndex = modulePos;
numUnitsPerReadout = nUnitsPerReadout;
udpPortNumber = uPortNumber;
maxFramesPerFile = mFramesPerFile;
@ -46,8 +41,7 @@ void BinaryDataFile::CreateFile() {
numFramesInFile = 0;
std::ostringstream os;
os << filePath << "/" << fileNamePrefix << "_d"
<< (detIndex * numUnitsPerReadout + index) << "_f" << subFileIndex
os << fileNamePrefix << "_f" << subFileIndex
<< '_' << fileIndex << ".raw";
fileName = os.str();

View File

@ -14,12 +14,10 @@ class BinaryDataFile : private virtual slsDetectorDefs, public File {
fileFormat GetFileFormat() const override;
void CloseFile() override;
void CreateFirstBinaryDataFile(const std::string fPath,
const std::string fNamePrefix,
void CreateFirstBinaryDataFile(const std::string& fNamePrefix,
const uint64_t fIndex,
const bool ovEnable,
const bool sMode, const int modulePos,
const int nUnitsPerReadout,
const bool sMode,
const uint32_t uPortNumber,
const uint32_t mFramesPerFile) override;
@ -34,13 +32,10 @@ class BinaryDataFile : private virtual slsDetectorDefs, public File {
uint32_t numFramesInFile{0};
uint32_t subFileIndex{0};
std::string filePath;
std::string fileNamePrefix;
uint64_t fileIndex{0};
bool overWriteEnable{false};
bool silentMode{false};
int detIndex{0};
int numUnitsPerReadout{0};
uint32_t udpPortNumber{0};
uint32_t maxFramesPerFile{0};
};

View File

@ -28,8 +28,8 @@ namespace sls {
const std::string DataProcessor::typeName = "DataProcessor";
DataProcessor::DataProcessor(int index, detectorType detType, Fifo *fifo, bool *dataStreamEnable, uint32_t *streamingFrequency, uint32_t *streamingTimerInMs, uint32_t *streamingStartFnum, bool *framePadding, std::vector<int> *ctbDbitList, int *ctbDbitOffset, int *ctbAnalogDataBytes)
: ThreadObject(index, typeName), fifo(fifo), detType(detType), dataStreamEnable(dataStreamEnable), streamingFrequency(streamingFrequency), streamingTimerInMs(streamingTimerInMs), streamingStartFnum(streamingStartFnum), framePadding(framePadding), ctbDbitList(ctbDbitList), ctbDbitOffset(ctbDbitOffset), ctbAnalogDataBytes(ctbAnalogDataBytes) {
DataProcessor::DataProcessor(int index)
: ThreadObject(index, typeName) {
LOG(logDEBUG) << "DataProcessor " << index << " created";
}
@ -38,7 +38,11 @@ DataProcessor::~DataProcessor() { DeleteFiles(); }
bool DataProcessor::GetStartedFlag() const { return startedFlag; }
void DataProcessor::SetFifo(Fifo *fifo) { fifo = fifo; }
void DataProcessor::SetFifo(Fifo *f) { fifo = f; }
void DataProcessor::SetGeneralData(GeneralData *g) {
generalData = g;
}
void DataProcessor::SetActivate(bool enable) { activated = enable; }
@ -48,6 +52,30 @@ void DataProcessor::SetReceiverROI(ROI roi) {
receiverNoRoi = receiverRoi.noRoi();
}
void DataProcessor::SetDataStreamEnable(bool enable) { dataStreamEnable = enable; }
void DataProcessor::SetStreamingFrequency(uint32_t value) {
streamingFrequency = value;
}
void DataProcessor::SetStreamingTimerInMs(uint32_t value) {
streamingTimerInMs = value;
}
void DataProcessor::SetStreamingStartFnum(uint32_t value) {
streamingStartFnum = value;
}
void DataProcessor::SetFramePadding(bool enable) { framePadding = enable; }
void DataProcessor::SetCtbDbitList(std::vector<int> value) {
ctbDbitList = value;
}
void DataProcessor::SetCtbDbitOffset(int value) {
ctbDbitOffset = value;
}
void DataProcessor::ResetParametersforNewAcquisition() {
StopRunning();
startedFlag = false;
@ -67,10 +95,6 @@ void DataProcessor::RecordFirstIndex(uint64_t fnum) {
LOG(logDEBUG1) << index << " First Index:" << firstIndex;
}
void DataProcessor::SetGeneralData(GeneralData *g) {
generalData = g;
}
void DataProcessor::CloseFiles() {
if (dataFile)
dataFile->CloseFile();
@ -103,11 +127,10 @@ void DataProcessor::SetupFileWriter(const bool filewriteEnable,
}
void DataProcessor::CreateFirstFiles(
const std::string &filePath, const std::string &fileNamePrefix,
const std::string &fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable, const bool silentMode,
const int modulePos, const int numUnitsPerReadout,
const uint32_t udpPortNumber, const uint32_t maxFramesPerFile,
const uint64_t numImages, const uint32_t dynamicRange,
const uint32_t udpPortNumber,
const uint64_t numImages,
const bool detectorDataStream) {
if (dataFile == nullptr) {
throw RuntimeError("file object not contstructed");
@ -134,15 +157,15 @@ void DataProcessor::CreateFirstFiles(
#ifdef HDF5C
case HDF5:
dataFile->CreateFirstHDF5DataFile(
filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile,
numImages, nx, ny, dynamicRange);
fileNamePrefix, fileIndex, overWriteEnable, silentMode,
udpPortNumber, generalData->framesPerFile,
numImages, nx, ny, generalData->dynamicRange);
break;
#endif
case BINARY:
dataFile->CreateFirstBinaryDataFile(
filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
modulePos, numUnitsPerReadout, udpPortNumber, maxFramesPerFile);
fileNamePrefix, fileIndex, overWriteEnable, silentMode,
udpPortNumber, generalData->framesPerFile);
break;
default:
throw RuntimeError("Unknown file format (compile with hdf5 flags");
@ -161,9 +184,9 @@ uint32_t DataProcessor::GetFilesInAcquisition() const {
std::string DataProcessor::CreateVirtualFile(
const std::string &filePath, const std::string &fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable, const bool silentMode,
const int modulePos, const int numUnitsPerReadout,
const uint32_t maxFramesPerFile, const uint64_t numImages,
const int numModX, const int numModY, const uint32_t dynamicRange,
const int modulePos,
const uint64_t numImages,
const int numModX, const int numModY,
std::mutex *hdf5LibMutex) {
if (receiverRoiEnabled) {
@ -171,12 +194,12 @@ std::string DataProcessor::CreateVirtualFile(
}
bool gotthard25um =
((detType == GOTTHARD || detType == GOTTHARD2) &&
((generalData->detType == GOTTHARD || generalData->detType == GOTTHARD2) &&
(numModX * numModY) == 2);
// maxframesperfile = 0 for infinite files
// 0 for infinite files
uint32_t framesPerFile =
((maxFramesPerFile == 0) ? numFramesCaught : maxFramesPerFile);
((generalData->framesPerFile == 0) ? numFramesCaught : generalData->framesPerFile);
// TODO: assumption 1: create virtual file even if no data in other
// files (they exist anyway) assumption2: virtual file max frame index
@ -184,8 +207,8 @@ std::string DataProcessor::CreateVirtualFile(
// stop acquisition)
return masterFileUtility::CreateVirtualHDF5File(
filePath, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
modulePos, numUnitsPerReadout, framesPerFile,
generalData->nPixelsX, generalData->nPixelsY, dynamicRange,
modulePos, generalData->numUDPInterfaces, framesPerFile,
generalData->nPixelsX, generalData->nPixelsY, generalData->dynamicRange,
numFramesCaught, numModX, numModY, dataFile->GetPDataType(),
dataFile->GetParameterNames(), dataFile->GetParameterDataTypes(),
hdf5LibMutex, gotthard25um);
@ -273,7 +296,7 @@ void DataProcessor::StopProcessing(char *buf) {
LOG(logDEBUG1) << "DataProcessing " << index << ": Dummy";
// stream or free
if (*dataStreamEnable)
if (dataStreamEnable)
fifo->PushAddressToStream(buf);
else
fifo->FreeAddress(buf);
@ -292,29 +315,29 @@ void DataProcessor::ProcessAnImage(sls_receiver_header & header, size_t &size, s
if (!startedFlag) {
RecordFirstIndex(fnum);
if (*dataStreamEnable) {
if (dataStreamEnable) {
// restart timer
clock_gettime(CLOCK_REALTIME, &timerbegin);
timerbegin.tv_sec -= (*streamingTimerInMs) / 1000;
timerbegin.tv_nsec -= ((*streamingTimerInMs) % 1000) * 1000000;
timerbegin.tv_sec -= streamingTimerInMs / 1000;
timerbegin.tv_nsec -= (streamingTimerInMs % 1000) * 1000000;
// to send first image
currentFreqCount = *streamingFrequency - *streamingStartFnum;
currentFreqCount = streamingFrequency - streamingStartFnum;
}
}
// frame padding
if (*framePadding && nump < generalData->packetsPerFrame)
if (framePadding && nump < generalData->packetsPerFrame)
PadMissingPackets(header, data);
// rearrange ctb digital bits (if ctbDbitlist is not empty)
if (!(*ctbDbitList).empty()) {
if (!ctbDbitList.empty()) {
RearrangeDbitData(size, data);
}
// 'stream Image' check has to be done here before crop image
// stream (if time/freq to stream) or free
if (*dataStreamEnable && SendToStreamer()) {
if (dataStreamEnable && SendToStreamer()) {
if (firstStreamerFrame) {
firstStreamerFrame = false;
// write to memory structure of first streamer frame
@ -362,7 +385,7 @@ void DataProcessor::ProcessAnImage(sls_receiver_header & header, size_t &size, s
bool DataProcessor::SendToStreamer() {
// skip
if ((*streamingFrequency) == 0u) {
if (streamingFrequency == 0u) {
if (!CheckTimer())
return false;
} else {
@ -378,7 +401,7 @@ bool DataProcessor::CheckTimer() {
auto elapsed_s = (end.tv_sec - timerbegin.tv_sec) +
(end.tv_nsec - timerbegin.tv_nsec) / 1e9;
double timer_s = *streamingTimerInMs / 1e3;
double timer_s = streamingTimerInMs / 1e3;
LOG(logDEBUG1) << index << " Timer elapsed time:" << elapsed_s
<< " seconds";
@ -393,7 +416,7 @@ bool DataProcessor::CheckTimer() {
}
bool DataProcessor::CheckCount() {
if (currentFreqCount == *streamingFrequency) {
if (currentFreqCount == streamingFrequency) {
currentFreqCount = 1;
return true;
}
@ -422,7 +445,7 @@ void DataProcessor::PadMissingPackets(sls_receiver_header header, char* data) {
sls_bitset pmask = header.packetsMask;
uint32_t dsize = generalData->dataSize;
if (detType == GOTTHARD2 && index != 0) {
if (generalData->detType == GOTTHARD2 && index != 0) {
dsize = generalData->vetoDataSize;
}
uint32_t corrected_dsize =
@ -443,7 +466,7 @@ void DataProcessor::PadMissingPackets(sls_receiver_header header, char* data) {
<< std::endl;
// missing packet
switch (detType) {
switch (generalData->detType) {
// for gotthard, 1st packet: 4 bytes fnum, CACA + CACA, 639*2 bytes
// data
// 2nd packet: 4 bytes fnum, previous 1*2 bytes data +
@ -471,8 +494,9 @@ void DataProcessor::PadMissingPackets(sls_receiver_header header, char* data) {
/** ctb specific */
void DataProcessor::RearrangeDbitData(size_t & size, char *data) {
int nAnalogDataBytes = generalData->GetNumberOfAnalogDatabytes();
// TODO! (Erik) Refactor and add tests
int ctbDigitalDataBytes = size - (*ctbAnalogDataBytes) - (*ctbDbitOffset);
int ctbDigitalDataBytes = size - nAnalogDataBytes - ctbDbitOffset;
// no digital data
if (ctbDigitalDataBytes == 0) {
@ -485,15 +509,15 @@ void DataProcessor::RearrangeDbitData(size_t & size, char *data) {
// ceil as numResult8Bits could be decimal
const int numResult8Bits =
ceil((numSamples * (*ctbDbitList).size()) / 8.00);
ceil((numSamples * ctbDbitList.size()) / 8.00);
std::vector<uint8_t> result(numResult8Bits);
uint8_t *dest = &result[0];
auto *source = (uint64_t *)(data + (*ctbAnalogDataBytes) + (*ctbDbitOffset));
auto *source = (uint64_t *)(data + nAnalogDataBytes + ctbDbitOffset);
// loop through digital bit enable vector
int bitoffset = 0;
for (auto bi : (*ctbDbitList)) {
for (auto bi : ctbDbitList) {
// where numbits * numsamples is not a multiple of 8
if (bitoffset != 0) {
bitoffset = 0;
@ -515,7 +539,7 @@ void DataProcessor::RearrangeDbitData(size_t & size, char *data) {
}
// copy back to memory and update size
memcpy(data + (*ctbAnalogDataBytes), result.data(), numResult8Bits * sizeof(uint8_t));
memcpy(data + nAnalogDataBytes, result.data(), numResult8Bits * sizeof(uint8_t));
size = numResult8Bits * sizeof(uint8_t);
}

View File

@ -29,31 +29,36 @@ struct MasterAttributes;
class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
public:
DataProcessor(int index, detectorType detType, Fifo *fifo, bool *dataStreamEnable, uint32_t *streamingFrequency, uint32_t *streamingTimerInMs, uint32_t *streamingStartFnum, bool *framePadding, std::vector<int> *ctbDbitList, int *ctbDbitOffset, int *ctbAnalogDataBytes);
DataProcessor(int index);
~DataProcessor() override;
bool GetStartedFlag() const;
void SetFifo(Fifo *f);
void SetActivate(bool enable);
void SetReceiverROI(ROI roi);
void ResetParametersforNewAcquisition();
void SetGeneralData(GeneralData *generalData);
void SetActivate(bool enable);
void SetReceiverROI(ROI roi);
void SetDataStreamEnable(bool enable);
void SetStreamingFrequency(uint32_t value);
void SetStreamingTimerInMs(uint32_t value);
void SetStreamingStartFnum(uint32_t value);
void SetFramePadding(bool enable);
void SetCtbDbitList(std::vector<int> value);
void SetCtbDbitOffset(int value);
void ResetParametersforNewAcquisition();
void CloseFiles();
void DeleteFiles();
void SetupFileWriter(const bool filewriteEnable,
const fileFormat fileFormatType,
std::mutex *hdf5LibMutex);
void CreateFirstFiles(const std::string &filePath,
const std::string &fileNamePrefix,
void CreateFirstFiles(const std::string &fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable,
const bool silentMode, const int modulePos,
const int numUnitsPerReadout,
const bool silentMode,
const uint32_t udpPortNumber,
const uint32_t maxFramesPerFile,
const uint64_t numImages, const uint32_t dynamicRange,
const uint64_t numImages,
const bool detectorDataStream);
#ifdef HDF5C
uint32_t GetFilesInAcquisition() const;
@ -61,9 +66,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
const std::string &filePath, const std::string &fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable,
const bool silentMode, const int modulePos,
const int numUnitsPerReadout, const uint32_t maxFramesPerFile,
const uint64_t numImages, const int numModX, const int numModY,
const uint32_t dynamicRange, std::mutex *hdf5LibMutex);
std::mutex *hdf5LibMutex);
void LinkFileInMaster(const std::string &masterFileName,
const std::string &virtualFileName,
const bool silentMode, std::mutex *hdf5LibMutex);
@ -137,25 +141,23 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
static const std::string typeName;
const GeneralData *generalData{nullptr};
GeneralData *generalData{nullptr};
Fifo *fifo;
detectorType detType;
bool *dataStreamEnable;
bool dataStreamEnable;
bool activated{false};
ROI receiverRoi{};
bool receiverRoiEnabled{false};
bool receiverNoRoi{false};
std::unique_ptr<char[]> completeImageToStreamBeforeCropping;
/** if 0, sending random images with a timer */
uint32_t *streamingFrequency;
uint32_t *streamingTimerInMs;
uint32_t *streamingStartFnum;
uint32_t streamingFrequency;
uint32_t streamingTimerInMs;
uint32_t streamingStartFnum;
uint32_t currentFreqCount{0};
struct timespec timerbegin {};
bool *framePadding;
std::vector<int> *ctbDbitList;
int *ctbDbitOffset;
int *ctbAnalogDataBytes;
bool framePadding;
std::vector<int> ctbDbitList;
int ctbDbitOffset;
std::atomic<bool> startedFlag{false};
std::atomic<uint64_t> firstIndex{0};

View File

@ -18,9 +18,7 @@ namespace sls {
const std::string DataStreamer::TypeName = "DataStreamer";
DataStreamer::DataStreamer(int index, Fifo *fifo, uint32_t *dynamicRange, ROI *detectorRoi, uint64_t *fileIndex, bool flipRows, slsDetectorDefs::xy numPorts, bool *quadEnable, uint64_t *totalNumFrames)
: ThreadObject(index, TypeName), fifo(fifo), dynamicRange(dynamicRange), detectorRoi(detectorRoi), fileIndex(fileIndex), flipRows(flipRows), numPorts(numPorts), quadEnable(quadEnable), totalNumFrames(totalNumFrames) {
DataStreamer::DataStreamer(int index) : ThreadObject(index, TypeName) {
LOG(logDEBUG) << "DataStreamer " << index << " created";
}
@ -31,6 +29,35 @@ DataStreamer::~DataStreamer() {
void DataStreamer::SetFifo(Fifo *f) { fifo = f; }
void DataStreamer::SetGeneralData(GeneralData *g) { generalData = g; }
void DataStreamer::SetFileIndex(uint64_t value) {
fileIndex = value;
}
void DataStreamer::SetNumberofPorts(xy np) { numPorts = np; }
void DataStreamer::SetFlipRows(bool fd) {
flipRows = fd;
// flip only right port of quad
if (quadEnable) {
flipRows = (index == 1 ? true : false);
}
}
void DataStreamer::SetQuadEnable(bool value) { quadEnable = value; }
void DataStreamer::SetNumberofTotalFrames(uint64_t value) {
nTotalFrames = value;
}
void DataStreamer::SetAdditionalJsonHeader(
const std::map<std::string, std::string> &json) {
std::lock_guard<std::mutex> lock(additionalJsonMutex);
additionalJsonHeader = json;
isAdditionalJsonUpdated = true;
}
void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) {
StopRunning();
startedFlag = false;
@ -41,8 +68,8 @@ void DataStreamer::ResetParametersforNewAcquisition(const std::string &fname) {
delete[] completeBuffer;
completeBuffer = nullptr;
}
if (generalData->detType == GOTTHARD && detectorRoi->xmin != -1) {
adcConfigured = generalData->GetAdcConfigured(index, *detectorRoi);
if (generalData->detType == GOTTHARD && generalData->detectorRoi.xmin != -1) {
adcConfigured = generalData->GetAdcConfigured(index, generalData->detectorRoi);
completeBuffer = new char[generalData->imageSizeComplete];
memset(completeBuffer, 0, generalData->imageSizeComplete);
}
@ -55,20 +82,7 @@ void DataStreamer::RecordFirstIndex(uint64_t fnum, size_t firstImageIndex) {
<< ", First Streamer Index:" << fnum;
}
void DataStreamer::SetGeneralData(GeneralData *g) { generalData = g; }
void DataStreamer::SetNumberofPorts(xy np) { numPorts = np; }
void DataStreamer::SetFlipRows(bool fd) { flipRows = fd; }
void DataStreamer::SetAdditionalJsonHeader(
const std::map<std::string, std::string> &json) {
std::lock_guard<std::mutex> lock(additionalJsonMutex);
additionalJsonHeader = json;
isAdditionalJsonUpdated = true;
}
void DataStreamer::CreateZmqSockets(int *nunits, uint32_t port,
void DataStreamer::CreateZmqSockets(uint32_t port,
const IpAddr ip, int hwm) {
uint32_t portnum = port + index;
std::string sip = ip.str();
@ -193,8 +207,8 @@ int DataStreamer::SendDataHeader(sls_detector_header header, uint32_t size,
uint64_t frameIndex = header.frameNumber - firstIndex;
uint64_t acquisitionIndex = header.frameNumber;
zHeader.dynamicRange = *dynamicRange;
zHeader.fileIndex = *fileIndex;
zHeader.dynamicRange = generalData->dynamicRange;
zHeader.fileIndex = fileIndex;
zHeader.ndetx = numPorts.x;
zHeader.ndety = numPorts.y;
zHeader.npixelsx = nx;
@ -203,7 +217,7 @@ int DataStreamer::SendDataHeader(sls_detector_header header, uint32_t size,
zHeader.acqIndex = acquisitionIndex;
zHeader.frameIndex = frameIndex;
zHeader.progress =
100 * ((double)(frameIndex + 1) / (double)(*totalNumFrames));
100 * ((double)(frameIndex + 1) / (double)(nTotalFrames));
zHeader.fname = fileNametoStream;
zHeader.frameNumber = header.frameNumber;
zHeader.expLength = header.expLength;
@ -219,7 +233,7 @@ int DataStreamer::SendDataHeader(sls_detector_header header, uint32_t size,
zHeader.detType = header.detType;
zHeader.version = header.version;
zHeader.flipRows = static_cast<int>(flipRows);
zHeader.quad = *quadEnable;
zHeader.quad = quadEnable;
zHeader.completeImage =
(header.packetNumber < generalData->packetsPerFrame ? false : true);

View File

@ -25,26 +25,29 @@ class ZmqSocket;
class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
public:
DataStreamer(int index, Fifo *fifo, uint32_t *dynamicRange, ROI *detectorRoi, uint64_t *fileIndex, bool flipRows, slsDetectorDefs::xy numPorts, bool *quadEnable, uint64_t *totalNumFrames);
DataStreamer(int index);
~DataStreamer();
void SetFifo(Fifo *f);
void ResetParametersforNewAcquisition(const std::string &fname);
void SetGeneralData(GeneralData *g);
void SetFileIndex(uint64_t value);
void SetNumberofPorts(xy np);
void SetFlipRows(bool fd);
void SetQuadEnable(bool value);
void SetNumberofTotalFrames(uint64_t value);
void
SetAdditionalJsonHeader(const std::map<std::string, std::string> &json);
void ResetParametersforNewAcquisition(const std::string &fname);
/**
* Creates Zmq Sockets
* (throws an exception if it couldnt create zmq sockets)
* @param nunits pointer to number of theads/ units per detector
* @param port streaming port start index
* @param ip streaming source ip
* @param hwm streaming high water mark
*/
void CreateZmqSockets(int *nunits, uint32_t port, const IpAddr ip,
void CreateZmqSockets(uint32_t port, const IpAddr ip,
int hwm);
void CloseZmqSocket();
void RestreamStop();
@ -85,13 +88,11 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
static const std::string TypeName;
const GeneralData *generalData{nullptr};
Fifo *fifo;
Fifo *fifo{nullptr};
ZmqSocket *zmqSocket{nullptr};
uint32_t *dynamicRange;
ROI *detectorRoi;
int adcConfigured{-1};
uint64_t *fileIndex;
bool flipRows;
uint64_t fileIndex{0};
bool flipRows{false};
std::map<std::string, std::string> additionalJsonHeader;
/** Used by streamer thread to update local copy (reduce number of locks
@ -111,8 +112,8 @@ class DataStreamer : private virtual slsDetectorDefs, public ThreadObject {
char *completeBuffer{nullptr};
xy numPorts{1, 1};
bool *quadEnable;
uint64_t *totalNumFrames;
bool quadEnable{false};
uint64_t nTotalFrames{0};
};
} // namespace sls

View File

@ -60,10 +60,9 @@ class File : private virtual slsDetectorDefs {
};
virtual void CreateFirstHDF5DataFile(
const std::string filePath, const std::string fileNamePrefix,
const std::string& fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable,
const bool silentMode, const int modulePos,
const int numUnitsPerReadout, const uint32_t udpPortNumber,
const bool silentMode, const uint32_t udpPortNumber,
const uint32_t maxFramesPerFile, const uint64_t numImages,
const uint32_t nPixelsX, const uint32_t nPixelsY,
const uint32_t dynamicRange) {
@ -72,10 +71,9 @@ class File : private virtual slsDetectorDefs {
};
#endif
virtual void CreateFirstBinaryDataFile(
const std::string filePath, const std::string fileNamePrefix,
const std::string& fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable,
const bool silentMode, const int modulePos,
const int numUnitsPerReadout, const uint32_t udpPortNumber,
const bool silentMode, const uint32_t udpPortNumber,
const uint32_t maxFramesPerFile) {
LOG(logERROR) << "This is a generic function CreateFirstBinaryDataFile that "
"should be overloaded by a derived class";

View File

@ -36,9 +36,9 @@ class GeneralData {
uint32_t frameIndexOffset{0};
uint32_t packetIndexMask{0};
uint32_t packetIndexOffset{0};
uint32_t maxFramesPerFile{0};
uint32_t defaultFifoDepth{0};
uint32_t numUDPInterfaces{1};
uint32_t framesPerFile{0};
uint32_t fifoDepth{0};
int numUDPInterfaces{1};
uint32_t headerPacketSize{0};
/** Streaming (for ROI - mainly short Gotthard) */
uint32_t nPixelsXComplete{0};
@ -48,7 +48,7 @@ class GeneralData {
uint32_t imageSizeComplete{0};
/** if standard header implemented in firmware */
bool standardheader{false};
uint32_t defaultUdpSocketBufferSize{RECEIVE_SOCKET_BUFFER_SIZE};
uint32_t udpSocketBufferSize{RECEIVE_SOCKET_BUFFER_SIZE};
uint32_t vetoDataSize{0};
uint32_t vetoPacketSize{0};
uint32_t vetoImageSize{0};
@ -61,7 +61,7 @@ class GeneralData {
slsDetectorDefs::readoutMode readoutType{slsDetectorDefs::ANALOG_ONLY};
uint32_t adcEnableMaskOneGiga{BIT32_MASK};
uint32_t adcEnableMaskTenGiga{BIT32_MASK};
slsDetectorDefs::ROI roi{};
slsDetectorDefs::ROI detectorRoi{};
uint32_t counterMask{0};
GeneralData(){};
@ -164,7 +164,7 @@ class GotthardData : public GeneralData {
detType = slsDetectorDefs::GOTTHARD;
nPixelsY = 1;
headerSizeinPacket = 6;
maxFramesPerFile = MAX_FRAMES_PER_FILE;
framesPerFile = MAX_FRAMES_PER_FILE;
UpdateImageSize();
};
@ -247,7 +247,7 @@ class GotthardData : public GeneralData {
};
void SetDetectorROI(slsDetectorDefs::ROI i) {
roi = i;
detectorRoi = i;
UpdateImageSize();
};
@ -255,18 +255,18 @@ class GotthardData : public GeneralData {
void UpdateImageSize() {
// all adcs
if (roi.xmin == -1) {
if (detectorRoi.xmin == -1) {
nPixelsX = 1280;
dataSize = 1280;
packetsPerFrame = 2;
frameIndexMask = 0xFFFFFFFE;
frameIndexOffset = 1;
packetIndexMask = 1;
maxFramesPerFile = MAX_FRAMES_PER_FILE;
framesPerFile = MAX_FRAMES_PER_FILE;
nPixelsXComplete = 0;
nPixelsYComplete = 0;
imageSizeComplete = 0;
defaultFifoDepth = 50000;
fifoDepth = 50000;
} else {
nPixelsX = 256;
dataSize = 512;
@ -274,11 +274,11 @@ class GotthardData : public GeneralData {
frameIndexMask = 0xFFFFFFFF;
frameIndexOffset = 0;
packetIndexMask = 0;
maxFramesPerFile = SHORT_MAX_FRAMES_PER_FILE;
framesPerFile = SHORT_MAX_FRAMES_PER_FILE;
nPixelsXComplete = 1280;
nPixelsYComplete = 1;
imageSizeComplete = 1280 * 2;
defaultFifoDepth = 75000;
fifoDepth = 75000;
}
imageSize = int(nPixelsX * nPixelsY * GetPixelDepth());
packetSize = headerSizeinPacket + dataSize;
@ -292,7 +292,7 @@ class EigerData : public GeneralData {
EigerData() {
detType = slsDetectorDefs::EIGER;
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
maxFramesPerFile = EIGER_MAX_FRAMES_PER_FILE;
framesPerFile = EIGER_MAX_FRAMES_PER_FILE;
numUDPInterfaces = 2;
headerPacketSize = 40;
standardheader = true;
@ -318,7 +318,7 @@ class EigerData : public GeneralData {
packetSize = headerSizeinPacket + dataSize;
imageSize = int(nPixelsX * nPixelsY * GetPixelDepth());
packetsPerFrame = imageSize / dataSize;
defaultFifoDepth = (dynamicRange == 32 ? 100 : 1000);
fifoDepth = (dynamicRange == 32 ? 100 : 1000);
};
};
@ -330,8 +330,8 @@ class JungfrauData : public GeneralData {
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
dataSize = 8192;
packetSize = headerSizeinPacket + dataSize;
maxFramesPerFile = JFRAU_MAX_FRAMES_PER_FILE;
defaultFifoDepth = 2500;
framesPerFile = JFRAU_MAX_FRAMES_PER_FILE;
fifoDepth = 2500;
standardheader = true;
maxRowsPerReadout = 512;
UpdateImageSize();
@ -348,7 +348,7 @@ class JungfrauData : public GeneralData {
nPixelsY = (256 * 2) / numUDPInterfaces;
imageSize = int(nPixelsX * nPixelsY * GetPixelDepth());
packetsPerFrame = imageSize / dataSize;
defaultUdpSocketBufferSize = (1000 * 1024 * 1024) / numUDPInterfaces;
udpSocketBufferSize = (1000 * 1024 * 1024) / numUDPInterfaces;
};
};
@ -362,10 +362,10 @@ class Mythen3Data : public GeneralData {
detType = slsDetectorDefs::MYTHEN3;
nPixelsY = 1;
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
maxFramesPerFile = MYTHEN3_MAX_FRAMES_PER_FILE;
defaultFifoDepth = 50000;
framesPerFile = MYTHEN3_MAX_FRAMES_PER_FILE;
fifoDepth = 50000;
standardheader = true;
defaultUdpSocketBufferSize = (1000 * 1024 * 1024);
udpSocketBufferSize = (1000 * 1024 * 1024);
dynamicRange = 32;
tengigaEnable = true;
SetCounterMask(0x7);
@ -432,8 +432,8 @@ class Gotthard2Data : public GeneralData {
nPixelsY = 1;
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
dataSize = 2560; // 1280 channels * 2 bytes
maxFramesPerFile = GOTTHARD2_MAX_FRAMES_PER_FILE;
defaultFifoDepth = 50000;
framesPerFile = GOTTHARD2_MAX_FRAMES_PER_FILE;
fifoDepth = 50000;
standardheader = true;
vetoDataSize = 160;
vetoHsize = 16;
@ -469,7 +469,7 @@ class Gotthard2Data : public GeneralData {
packetsPerFrame = imageSize / dataSize;
vetoPacketSize = vetoHsize + vetoDataSize;
vetoImageSize = vetoDataSize * packetsPerFrame;
defaultUdpSocketBufferSize = (1000 * 1024 * 1024) / numUDPInterfaces;
udpSocketBufferSize = (1000 * 1024 * 1024) / numUDPInterfaces;
};
};
@ -488,8 +488,8 @@ class ChipTestBoardData : public GeneralData {
frameIndexMask = 0xFFFFFF; // 10g
frameIndexOffset = 8; // 10g
packetIndexMask = 0xFF; // 10g
maxFramesPerFile = CTB_MAX_FRAMES_PER_FILE;
defaultFifoDepth = 2500;
framesPerFile = CTB_MAX_FRAMES_PER_FILE;
fifoDepth = 2500;
standardheader = true;
UpdateImageSize();
};
@ -575,8 +575,8 @@ class MoenchData : public GeneralData {
detType = slsDetectorDefs::MOENCH;
headerSizeinPacket = sizeof(slsDetectorDefs::sls_detector_header);
frameIndexMask = 0xFFFFFF;
maxFramesPerFile = MOENCH_MAX_FRAMES_PER_FILE;
defaultFifoDepth = 2500;
framesPerFile = MOENCH_MAX_FRAMES_PER_FILE;
fifoDepth = 2500;
standardheader = true;
UpdateImageSize();
};

View File

@ -90,9 +90,8 @@ void HDF5DataFile::CloseFile() {
}
void HDF5DataFile::CreateFirstHDF5DataFile(
const std::string fPath, const std::string fNamePrefix,
const std::string& fNamePrefix,
const uint64_t fIndex, const bool owEnable, const bool sMode,
const int modulePos, const int nUnitsPerReadout,
const uint32_t uPortNumber, const uint32_t mFramesPerFile,
const uint64_t nImages, const uint32_t nX, const uint32_t nY,
const uint32_t dr) {
@ -108,13 +107,10 @@ void HDF5DataFile::CreateFirstHDF5DataFile(
nPixelsY = nY;
dynamicRange = dr;
filePath = fPath;
fileNamePrefix = fNamePrefix;
fileIndex = fIndex;
overWriteEnable = owEnable;
silentMode = sMode;
detIndex = modulePos;
numUnitsPerReadout = nUnitsPerReadout;
udpPortNumber = uPortNumber;
switch (dynamicRange) {
@ -138,8 +134,7 @@ void HDF5DataFile::CreateFile() {
numFilesInAcquisition++;
std::ostringstream os;
os << filePath << "/" << fileNamePrefix << "_d"
<< (detIndex * numUnitsPerReadout + index) << "_f" << subFileIndex
os << fileNamePrefix << "_f" << subFileIndex
<< '_' << fileIndex << ".h5";
fileName = os.str();

View File

@ -24,10 +24,9 @@ class HDF5DataFile : private virtual slsDetectorDefs, public File {
void CloseFile() override;
void CreateFirstHDF5DataFile(
const std::string fPath, const std::string fNamePrefix,
const std::string& fNamePrefix,
const uint64_t fIndex, const bool owEnable,
const bool sMode, const int modulePos,
const int nUnitsPerReadout, const uint32_t uPortNumber,
const bool sMode, const uint32_t uPortNumber,
const uint32_t mFramesPerFile, const uint64_t nImages,
const uint32_t nX, const uint32_t nY,
const uint32_t dr) override;
@ -65,13 +64,10 @@ class HDF5DataFile : private virtual slsDetectorDefs, public File {
uint32_t nPixelsY{0};
uint32_t dynamicRange{0};
std::string filePath;
std::string fileNamePrefix;
uint64_t fileIndex{0};
bool overWriteEnable{false};
bool silentMode{false};
int detIndex{0};
int numUnitsPerReadout{0};
uint32_t udpPortNumber{0};
static const int EIGER_NUM_PIXELS{256 * 2 * 256};

View File

@ -66,20 +66,20 @@ void Implementation::SetThreadPriorities() {
void Implementation::SetupFifoStructure() {
fifo.clear();
for (int i = 0; i < numUDPInterfaces; ++i) {
for (int i = 0; i < generalData->numUDPInterfaces; ++i) {
size_t datasize = generalData->imageSize;
// veto data size
if (detType == GOTTHARD2 && i != 0) {
if (generalData->detType == GOTTHARD2 && i != 0) {
datasize = generalData->vetoImageSize;
}
datasize += IMAGE_STRUCTURE_HEADER_SIZE;
// create fifo structure
try {
fifo.push_back(sls::make_unique<Fifo>(i, datasize, fifoDepth));
fifo.push_back(sls::make_unique<Fifo>(i, datasize, generalData->fifoDepth));
} catch (...) {
fifo.clear();
fifoDepth = 0;
generalData->fifoDepth = 0;
throw RuntimeError(
"Could not allocate memory for fifo structure " +
std::to_string(i) + ". FifoDepth is now 0.");
@ -93,11 +93,11 @@ void Implementation::SetupFifoStructure() {
dataStreamer[i]->SetFifo(fifo[i].get());
LOG(logINFO) << "Memory Allocated for Fifo " << i << ": "
<< (double)(datasize * (size_t)fifoDepth) /
<< (double)(datasize * (size_t)generalData->fifoDepth) /
(double)(1024 * 1024)
<< " MB";
}
LOG(logINFO) << numUDPInterfaces << " Fifo structure(s) reconstructed";
LOG(logINFO) << generalData->numUDPInterfaces << " Fifo structure(s) reconstructed";
}
/**************************************************
@ -108,8 +108,7 @@ void Implementation::SetupFifoStructure() {
void Implementation::setDetectorType(const detectorType d) {
detType = d;
switch (detType) {
switch (d) {
case GOTTHARD:
case EIGER:
case JUNGFRAU:
@ -128,7 +127,7 @@ void Implementation::setDetectorType(const detectorType d) {
generalData = nullptr;
// set detector specific variables
switch (detType) {
switch (d) {
case GOTTHARD:
generalData = new GotthardData();
break;
@ -154,40 +153,18 @@ void Implementation::setDetectorType(const detectorType d) {
break;
}
framesPerFile = generalData->maxFramesPerFile;
fifoDepth = generalData->defaultFifoDepth;
numUDPInterfaces = generalData->numUDPInterfaces;
udpSocketBufferSize = generalData->defaultUdpSocketBufferSize;
dynamicRange = generalData->dynamicRange;
tengigaEnable = generalData->tengigaEnable;
numberOfAnalogSamples = generalData->nAnalogSamples;
numberOfDigitalSamples = generalData->nDigitalSamples;
readoutType = generalData->readoutType;
adcEnableMaskOneGiga = generalData->adcEnableMaskOneGiga;
adcEnableMaskTenGiga = generalData->adcEnableMaskTenGiga;
detectorRoi = generalData->roi;
counterMask = generalData->counterMask;
SetLocalNetworkParameters();
SetupFifoStructure();
// create threads
for (int i = 0; i < numUDPInterfaces; ++i) {
for (int i = 0; i < generalData->numUDPInterfaces; ++i) {
try {
auto fifo_ptr = fifo[i].get();
listener.push_back(sls::make_unique<Listener>(
i, fifo_ptr, &status, &udpPortNum[i], &eth[i],
&udpSocketBufferSize, &actualUDPSocketBufferSize,
&framesPerFile, &frameDiscardMode, &silentMode));
int ctbAnalogDataBytes = 0;
if (detType == CHIPTESTBOARD) {
ctbAnalogDataBytes = generalData->GetNumberOfAnalogDatabytes();
}
dataProcessor.push_back(sls::make_unique<DataProcessor>(
i, detType, fifo_ptr, &dataStreamEnable, &streamingFrequency,
&streamingTimerInMs, &streamingStartFnum, &framePadding,
&ctbDbitList, &ctbDbitOffset, &ctbAnalogDataBytes));
i, &status));
SetupListener(i);
dataProcessor.push_back(sls::make_unique<DataProcessor>(i));
SetupDataProcessor(i);
} catch (...) {
listener.clear();
dataProcessor.clear();
@ -197,31 +174,59 @@ void Implementation::setDetectorType(const detectorType d) {
}
}
// set up writer and callbacks
for (int i = 0; i != (int)listener.size(); ++i) {
listener[i]->SetGeneralData(generalData);
listener[i]->SetActivate(activated);
listener[i]->SetDetectorDatastream(detectorDataStream[i]);
}
for (const auto &it : dataProcessor) {
it->SetGeneralData(generalData);
it->SetActivate(activated);
}
SetThreadPriorities();
LOG(logDEBUG) << " Detector type set to " << ToString(d);
}
void Implementation::SetupListener(int i) {
listener[i]->SetFifo(fifo[i].get());
listener[i]->SetGeneralData(generalData);
listener[i]->SetUdpPortNumber(udpPortNum[i]);
listener[i]->SetEthernetInterface(eth[i]);
listener[i]->SetActivate(activated);
listener[i]->SetNoRoi(portRois[i].noRoi());
listener[i]->SetDetectorDatastream(detectorDataStream[i]);
listener[i]->SetFrameDiscardPolicy(frameDiscardMode);
listener[i]->SetSilentMode(silentMode);
}
void Implementation::SetupDataProcessor(int i) {
dataProcessor[i]->SetFifo(fifo[i].get());
dataProcessor[i]->SetGeneralData(generalData);
dataProcessor[i]->SetActivate(activated);
dataProcessor[i]->SetReceiverROI(portRois[i]);
dataProcessor[i]->SetDataStreamEnable(dataStreamEnable);
dataProcessor[i]->SetStreamingFrequency(streamingFrequency);
dataProcessor[i]->SetStreamingTimerInMs(streamingTimerInMs);
dataProcessor[i]->SetStreamingStartFnum(streamingStartFnum);
dataProcessor[i]->SetFramePadding(framePadding);
dataProcessor[i]->SetCtbDbitList(ctbDbitList);
dataProcessor[i]->SetCtbDbitOffset(ctbDbitOffset);
}
void Implementation::SetupDataStreamer(int i) {
dataStreamer[i]->SetFifo(fifo[i].get());
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(streamingPort, streamingSrcIP, streamingHwm);
dataStreamer[i]->SetAdditionalJsonHeader(additionalJsonHeader);
dataStreamer[i]->SetFileIndex(fileIndex);
dataStreamer[i]->SetFlipRows(flipRows);
dataStreamer[i]->SetNumberofPorts(numPorts);
dataStreamer[i]->SetQuadEnable(quadEnable);
dataStreamer[i]->SetNumberofTotalFrames(numberOfTotalFrames);
}
slsDetectorDefs::xy Implementation::getDetectorSize() const {
return numModules;
}
const slsDetectorDefs::xy Implementation::GetPortGeometry() const {
xy portGeometry{1, 1};
if (detType == EIGER)
portGeometry.x = numUDPInterfaces;
else if (detType == JUNGFRAU)
portGeometry.y = numUDPInterfaces;
if (generalData->detType == EIGER)
portGeometry.x = generalData->numUDPInterfaces;
else if (generalData->detType == JUNGFRAU)
portGeometry.y = generalData->numUDPInterfaces;
return portGeometry;
}
@ -283,14 +288,16 @@ bool Implementation::getSilentMode() const { return silentMode; }
void Implementation::setSilentMode(const bool i) {
silentMode = i;
for (const auto &it : listener)
it->SetSilentMode(silentMode);
LOG(logINFO) << "Silent Mode: " << i;
}
uint32_t Implementation::getFifoDepth() const { return fifoDepth; }
uint32_t Implementation::getFifoDepth() const { return generalData->fifoDepth; }
void Implementation::setFifoDepth(const uint32_t i) {
if (fifoDepth != i) {
fifoDepth = i;
if (generalData->fifoDepth != i) {
generalData->fifoDepth = i;
SetupFifoStructure();
}
LOG(logINFO) << "Fifo Depth: " << i;
@ -303,6 +310,8 @@ Implementation::getFrameDiscardPolicy() const {
void Implementation::setFrameDiscardPolicy(const frameDiscardPolicy i) {
frameDiscardMode = i;
for (const auto &it : listener)
it->SetFrameDiscardPolicy(frameDiscardMode);
LOG(logINFO) << "Frame Discard Policy: " << ToString(frameDiscardMode);
}
@ -310,6 +319,8 @@ bool Implementation::getFramePaddingEnable() const { return framePadding; }
void Implementation::setFramePaddingEnable(const bool i) {
framePadding = i;
for (const auto &it : dataProcessor)
it->SetFramePadding(framePadding);
LOG(logINFO) << "Frame Padding: " << framePadding;
}
@ -330,7 +341,7 @@ std::array<pid_t, NUM_RX_THREAD_IDS> Implementation::getThreadIds() const {
} else {
retval[id++] = 0;
}
if (numUDPInterfaces == 2) {
if (generalData->numUDPInterfaces == 2) {
retval[id++] = listener[1]->GetThreadId();
retval[id++] = dataProcessor[1]->GetThreadId();
if (dataStreamEnable) {
@ -354,9 +365,9 @@ void Implementation::setArping(const bool i,
arping.StopThread();
} else {
// setup interface
for (int i = 0; i != numUDPInterfaces; ++i) {
for (int i = 0; i != generalData->numUDPInterfaces; ++i) {
// ignore eiger with 2 interfaces (only udp port)
if (i == 1 && (numUDPInterfaces == 1 || detType == EIGER)) {
if (i == 1 && (generalData->numUDPInterfaces == 1 || generalData->detType == EIGER)) {
break;
}
arping.SetInterfacesAndIps(i, eth[i], ips[i]);
@ -373,13 +384,13 @@ slsDetectorDefs::ROI Implementation::getReceiverROI() const {
void Implementation::setReceiverROI(const slsDetectorDefs::ROI arg) {
receiverRoi = arg;
if (numUDPInterfaces == 1 || detType == slsDetectorDefs::GOTTHARD2) {
if (generalData->numUDPInterfaces == 1 || generalData->detType == slsDetectorDefs::GOTTHARD2) {
portRois[0] = arg;
} else {
slsDetectorDefs::xy nPortDim(generalData->nPixelsX,
generalData->nPixelsY);
for (int iPort = 0; iPort != numUDPInterfaces; ++iPort) {
for (int iPort = 0; iPort != generalData->numUDPInterfaces; ++iPort) {
// default init = complete roi
slsDetectorDefs::ROI portRoi{};
@ -443,7 +454,7 @@ void Implementation::setReceiverROI(const slsDetectorDefs::ROI arg) {
for (size_t i = 0; i != dataProcessor.size(); ++i)
dataProcessor[i]->SetReceiverROI(portRois[i]);
LOG(logINFO) << "receiver roi: " << ToString(receiverRoi);
if (numUDPInterfaces == 2 && detType != slsDetectorDefs::GOTTHARD2) {
if (generalData->numUDPInterfaces == 2 && generalData->detType != slsDetectorDefs::GOTTHARD2) {
LOG(logINFO) << "port rois: " << ToString(portRois);
}
}
@ -504,6 +515,8 @@ uint64_t Implementation::getFileIndex() const { return fileIndex; }
void Implementation::setFileIndex(const uint64_t i) {
fileIndex = i;
for (const auto &it : dataStreamer)
it->SetFileIndex(fileIndex);
LOG(logINFO) << "File Index: " << fileIndex;
}
@ -539,11 +552,11 @@ void Implementation::setOverwriteEnable(const bool b) {
<< (overwriteEnable ? "enabled" : "disabled");
}
uint32_t Implementation::getFramesPerFile() const { return framesPerFile; }
uint32_t Implementation::getFramesPerFile() const { return generalData->framesPerFile; }
void Implementation::setFramesPerFile(const uint32_t i) {
framesPerFile = i;
LOG(logINFO) << "Frames per file: " << framesPerFile;
generalData->framesPerFile = i;
LOG(logINFO) << "Frames per file: " << generalData->framesPerFile;
}
/**************************************************
@ -554,7 +567,7 @@ void Implementation::setFramesPerFile(const uint32_t i) {
slsDetectorDefs::runStatus Implementation::getStatus() const { return status; }
std::vector<int64_t> Implementation::getFramesCaught() const {
std::vector<int64_t> numFramesCaught(numUDPInterfaces);
std::vector<int64_t> numFramesCaught(generalData->numUDPInterfaces);
int index = 0;
for (const auto &it : listener) {
if (it->GetStartedFlag()) {
@ -566,7 +579,7 @@ std::vector<int64_t> Implementation::getFramesCaught() const {
}
std::vector<int64_t> Implementation::getCurrentFrameIndex() const {
std::vector<int64_t> frameIndex(numUDPInterfaces);
std::vector<int64_t> frameIndex(generalData->numUDPInterfaces);
int index = 0;
for (const auto &it : listener) {
if (it->GetStartedFlag()) {
@ -611,8 +624,8 @@ double Implementation::getProgress() const {
}
std::vector<int64_t> Implementation::getNumMissingPackets() const {
std::vector<int64_t> mp(numUDPInterfaces);
for (int i = 0; i < numUDPInterfaces; ++i) {
std::vector<int64_t> mp(generalData->numUDPInterfaces);
for (int i = 0; i < generalData->numUDPInterfaces; ++i) {
int np = generalData->packetsPerFrame;
uint64_t totnp = np;
// ReadNRows
@ -716,7 +729,7 @@ void Implementation::stopReceiver() {
auto mp = getNumMissingPackets();
// print summary
uint64_t tot = 0;
for (int i = 0; i < numUDPInterfaces; i++) {
for (int i = 0; i < generalData->numUDPInterfaces; i++) {
int nf = listener[i]->GetNumCompleteFramesCaught();
tot += nf;
std::string mpMessage = std::to_string(mp[i]);
@ -750,7 +763,7 @@ void Implementation::stopReceiver() {
// callback
if (acquisitionFinishedCallBack) {
try {
acquisitionFinishedCallBack((tot / numUDPInterfaces),
acquisitionFinishedCallBack((tot / generalData->numUDPInterfaces),
pAcquisitionFinished);
} catch (const std::exception &e) {
// change status
@ -836,7 +849,7 @@ void Implementation::ResetParametersforNewAcquisition() {
void Implementation::CreateUDPSockets() {
try {
for (unsigned int i = 0; i < listener.size(); ++i) {
listener[i]->CreateUDPSockets();
listener[i]->CreateUDPSocket(actualUDPSocketBufferSize);
}
} catch (const RuntimeError &e) {
shutDownUDPSockets();
@ -848,10 +861,12 @@ void Implementation::CreateUDPSockets() {
void Implementation::SetupWriter() {
try {
for (unsigned int i = 0; i < dataProcessor.size(); ++i) {
std::ostringstream os;
os << filePath << "/" << fileName << "_d" << (modulePos * generalData->numUDPInterfaces + i);
std::string fileNamePrefix = os.str();
dataProcessor[i]->CreateFirstFiles(
filePath, fileName, fileIndex, overwriteEnable, silentMode,
modulePos, numUDPInterfaces, udpPortNum[i], framesPerFile,
numberOfTotalFrames, dynamicRange, detectorDataStream[i]);
fileNamePrefix, fileIndex, overwriteEnable, silentMode,
udpPortNum[i], numberOfTotalFrames, detectorDataStream[i]);
}
} catch (const RuntimeError &e) {
shutDownUDPSockets();
@ -867,13 +882,13 @@ void Implementation::StartMasterWriter() {
// master file
if (masterFileWriteEnable) {
MasterAttributes masterAttributes;
masterAttributes.detType = detType;
masterAttributes.detType = generalData->detType;
masterAttributes.timingMode = timingMode;
masterAttributes.geometry = numPorts;
masterAttributes.imageSize = generalData->imageSize;
masterAttributes.nPixels =
xy(generalData->nPixelsX, generalData->nPixelsY);
masterAttributes.maxFramesPerFile = framesPerFile;
masterAttributes.maxFramesPerFile = generalData->framesPerFile;
masterAttributes.frameDiscardMode = frameDiscardMode;
masterAttributes.framePadding = framePadding;
masterAttributes.scanParams = scanParams;
@ -883,9 +898,9 @@ void Implementation::StartMasterWriter() {
masterAttributes.exptime = acquisitionTime;
masterAttributes.period = acquisitionPeriod;
masterAttributes.burstMode = burstMode;
masterAttributes.numUDPInterfaces = numUDPInterfaces;
masterAttributes.dynamicRange = dynamicRange;
masterAttributes.tenGiga = tengigaEnable;
masterAttributes.numUDPInterfaces = generalData->numUDPInterfaces;
masterAttributes.dynamicRange = generalData->dynamicRange;
masterAttributes.tenGiga = generalData->tengigaEnable;
masterAttributes.thresholdEnergyeV = thresholdEnergyeV;
masterAttributes.thresholdAllEnergyeV = thresholdAllEnergyeV;
masterAttributes.subExptime = subExpTime;
@ -894,24 +909,24 @@ void Implementation::StartMasterWriter() {
masterAttributes.readNRows = readNRows;
masterAttributes.ratecorr = rateCorrections;
masterAttributes.adcmask =
tengigaEnable ? adcEnableMaskTenGiga : adcEnableMaskOneGiga;
masterAttributes.analog = (readoutType == ANALOG_ONLY ||
readoutType == ANALOG_AND_DIGITAL)
generalData->tengigaEnable ? generalData->adcEnableMaskTenGiga : generalData->adcEnableMaskOneGiga;
masterAttributes.analog = (generalData->readoutType == ANALOG_ONLY ||
generalData->readoutType == ANALOG_AND_DIGITAL)
? 1
: 0;
masterAttributes.analogSamples = numberOfAnalogSamples;
masterAttributes.digital = (readoutType == DIGITAL_ONLY ||
readoutType == ANALOG_AND_DIGITAL)
masterAttributes.analogSamples = generalData->nAnalogSamples;
masterAttributes.digital = (generalData->readoutType == DIGITAL_ONLY ||
generalData->readoutType == ANALOG_AND_DIGITAL)
? 1
: 0;
masterAttributes.digitalSamples = numberOfDigitalSamples;
masterAttributes.digitalSamples = generalData->nDigitalSamples;
masterAttributes.dbitoffset = ctbDbitOffset;
masterAttributes.dbitlist = 0;
for (auto &i : ctbDbitList) {
masterAttributes.dbitlist |= (1 << i);
}
masterAttributes.detectorRoi = detectorRoi;
masterAttributes.counterMask = counterMask;
masterAttributes.detectorRoi = generalData->detectorRoi;
masterAttributes.counterMask = generalData->counterMask;
masterAttributes.exptimeArray[0] = acquisitionTime1;
masterAttributes.exptimeArray[1] = acquisitionTime2;
masterAttributes.exptimeArray[2] = acquisitionTime3;
@ -935,9 +950,8 @@ void Implementation::StartMasterWriter() {
virtualFileName =
dataProcessor[0]->CreateVirtualFile(
filePath, fileName, fileIndex, overwriteEnable,
silentMode, modulePos, numUDPInterfaces, framesPerFile,
numberOfTotalFrames, numPorts.x, numPorts.y,
dynamicRange, &hdf5LibMutex);
silentMode, modulePos, numberOfTotalFrames, numPorts.x, numPorts.y,
&hdf5LibMutex);
}
// link file in master
if (masterFileWriteEnable) {
@ -976,17 +990,17 @@ void Implementation::StartRunning() {
* *
* ************************************************/
int Implementation::getNumberofUDPInterfaces() const {
return numUDPInterfaces;
return generalData->numUDPInterfaces;
}
// not Eiger
void Implementation::setNumberofUDPInterfaces(const int n) {
if (detType == EIGER) {
if (generalData->detType == EIGER) {
throw RuntimeError(
"Cannot set number of UDP interfaces for Eiger");
}
if (numUDPInterfaces != n) {
if (generalData->numUDPInterfaces != n) {
// clear all threads and fifos
listener.clear();
dataProcessor.clear();
@ -995,41 +1009,22 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
// set local variables
generalData->SetNumberofInterfaces(n);
numUDPInterfaces = n;
generalData->numUDPInterfaces = n;
// fifo
udpSocketBufferSize = generalData->defaultUdpSocketBufferSize;
SetupFifoStructure();
// recalculate port rois
setReceiverROI(receiverRoi);
// create threads
for (int i = 0; i < numUDPInterfaces; ++i) {
for (int i = 0; i < generalData->numUDPInterfaces; ++i) {
// listener and dataprocessor threads
try {
auto fifo_ptr = fifo[i].get();
listener.push_back(sls::make_unique<Listener>(
i, fifo_ptr, &status, &udpPortNum[i], &eth[i],
&udpSocketBufferSize, &actualUDPSocketBufferSize,
&framesPerFile, &frameDiscardMode, &silentMode));
listener[i]->SetGeneralData(generalData);
listener[i]->SetActivate(activated);
listener[i]->SetNoRoi(portRois[i].noRoi());
listener[i]->SetDetectorDatastream(detectorDataStream[i]);
int ctbAnalogDataBytes = 0;
if (detType == CHIPTESTBOARD) {
ctbAnalogDataBytes =
generalData->GetNumberOfAnalogDatabytes();
}
dataProcessor.push_back(sls::make_unique<DataProcessor>(
i, detType, fifo_ptr, &dataStreamEnable,
&streamingFrequency, &streamingTimerInMs,
&streamingStartFnum, &framePadding, &ctbDbitList,
&ctbDbitOffset, &ctbAnalogDataBytes));
dataProcessor[i]->SetGeneralData(generalData);
dataProcessor[i]->SetActivate(activated);
dataProcessor[i]->SetReceiverROI(portRois[i]);
i, &status));
SetupListener(i);
dataProcessor.push_back(sls::make_unique<DataProcessor>(i));
SetupDataProcessor(i);
} catch (...) {
listener.clear();
dataProcessor.clear();
@ -1037,27 +1032,18 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
"Could not create listener/dataprocessor threads (index:" +
std::to_string(i) + ")");
}
// streamer threads
if (dataStreamEnable) {
try {
bool flip = flipRows;
if (quadEnable) {
flip = (i == 1 ? true : false);
}
dataStreamer.push_back(sls::make_unique<DataStreamer>(
i, fifo[i].get(), &dynamicRange, &detectorRoi,
&fileIndex, flip, numPorts, &quadEnable,
&numberOfTotalFrames));
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(
&numUDPInterfaces, streamingPort, streamingSrcIP,
streamingHwm);
dataStreamer[i]->SetAdditionalJsonHeader(
additionalJsonHeader);
dataStreamer.push_back(sls::make_unique<DataStreamer>(i));
SetupDataStreamer(i);
} catch (...) {
if (dataStreamEnable) {
dataStreamer.clear();
dataStreamEnable = false;
for (const auto &it : dataProcessor)
it->SetDataStreamEnable(dataStreamEnable);
}
throw RuntimeError(
"Could not create datastreamer threads (index:" +
@ -1089,13 +1075,14 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
setUDPSocketBufferSize(0);
}
LOG(logINFO) << "Number of Interfaces: " << numUDPInterfaces;
LOG(logINFO) << "Number of Interfaces: " << generalData->numUDPInterfaces;
}
std::string Implementation::getEthernetInterface() const { return eth[0]; }
void Implementation::setEthernetInterface(const std::string &c) {
eth[0] = c;
listener[0]->SetEthernetInterface(c);
LOG(logINFO) << "Ethernet Interface: " << eth[0];
}
@ -1103,6 +1090,9 @@ std::string Implementation::getEthernetInterface2() const { return eth[1]; }
void Implementation::setEthernetInterface2(const std::string &c) {
eth[1] = c;
if (listener.size() > 1) {
listener[1]->SetEthernetInterface(c);
}
LOG(logINFO) << "Ethernet Interface 2: " << eth[1];
}
@ -1110,6 +1100,7 @@ uint32_t Implementation::getUDPPortNumber() const { return udpPortNum[0]; }
void Implementation::setUDPPortNumber(const uint32_t i) {
udpPortNum[0] = i;
listener[0]->SetUdpPortNumber(i);
LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum[0];
}
@ -1117,32 +1108,27 @@ uint32_t Implementation::getUDPPortNumber2() const { return udpPortNum[1]; }
void Implementation::setUDPPortNumber2(const uint32_t i) {
udpPortNum[1] = i;
if (listener.size() > 1) {
listener[1]->SetUdpPortNumber(i);
}
LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1];
}
int Implementation::getUDPSocketBufferSize() const {
return udpSocketBufferSize;
return generalData->udpSocketBufferSize;
}
void Implementation::setUDPSocketBufferSize(const int s) {
// custom setup is not 0 (must complain if set up didnt work)
// testing default setup at startup, argument is 0 to use default values
int size = (s == 0) ? udpSocketBufferSize : s;
size_t listSize = listener.size();
if ((detType == JUNGFRAU || detType == GOTTHARD2) &&
(int)listSize != numUDPInterfaces) {
if ((generalData->detType == JUNGFRAU || generalData->detType == GOTTHARD2) &&
(int)listSize != generalData->numUDPInterfaces) {
throw RuntimeError(
"Number of Interfaces " + std::to_string(numUDPInterfaces) +
"Number of Interfaces " + std::to_string(generalData->numUDPInterfaces) +
" do not match listener size " + std::to_string(listSize));
}
for (auto &l : listener) {
l->CreateDummySocketForUDPSocketBufferSize(size);
}
// custom and didnt set, throw error
if (s != 0 && udpSocketBufferSize != s) {
throw RuntimeError("Could not set udp socket buffer size. (No "
"CAP_NET_ADMIN privileges?)");
l->CreateDummySocketForUDPSocketBufferSize(s, actualUDPSocketBufferSize);
}
}
@ -1165,31 +1151,23 @@ void Implementation::setDataStreamEnable(const bool enable) {
dataStreamer.clear();
if (enable) {
for (int i = 0; i < numUDPInterfaces; ++i) {
for (int i = 0; i < generalData->numUDPInterfaces; ++i) {
try {
bool flip = flipRows;
if (quadEnable) {
flip = (i == 1 ? true : false);
}
dataStreamer.push_back(sls::make_unique<DataStreamer>(
i, fifo[i].get(), &dynamicRange, &detectorRoi,
&fileIndex, flip, numPorts, &quadEnable,
&numberOfTotalFrames));
dataStreamer[i]->SetGeneralData(generalData);
dataStreamer[i]->CreateZmqSockets(
&numUDPInterfaces, streamingPort, streamingSrcIP,
streamingHwm);
dataStreamer[i]->SetAdditionalJsonHeader(
additionalJsonHeader);
dataStreamer.push_back(sls::make_unique<DataStreamer>(i));
SetupDataStreamer(i);
} catch (...) {
dataStreamer.clear();
dataStreamEnable = false;
for (const auto &it : dataProcessor)
it->SetDataStreamEnable(dataStreamEnable);
throw RuntimeError(
"Could not set data stream enable.");
}
}
SetThreadPriorities();
}
for (const auto &it : dataProcessor)
it->SetDataStreamEnable(dataStreamEnable);
}
LOG(logINFO) << "Data Send to Gui: " << dataStreamEnable;
}
@ -1200,6 +1178,8 @@ uint32_t Implementation::getStreamingFrequency() const {
void Implementation::setStreamingFrequency(const uint32_t freq) {
streamingFrequency = freq;
for (const auto &it : dataProcessor)
it->SetStreamingFrequency(streamingFrequency);
LOG(logINFO) << "Streaming Frequency: " << streamingFrequency;
}
@ -1209,6 +1189,8 @@ uint32_t Implementation::getStreamingTimer() const {
void Implementation::setStreamingTimer(const uint32_t time_in_ms) {
streamingTimerInMs = time_in_ms;
for (const auto &it : dataProcessor)
it->SetStreamingTimerInMs(streamingTimerInMs);
LOG(logINFO) << "Streamer Timer: " << streamingTimerInMs;
}
@ -1218,6 +1200,8 @@ uint32_t Implementation::getStreamingStartingFrameNumber() const {
void Implementation::setStreamingStartingFrameNumber(const uint32_t fnum) {
streamingStartFnum = fnum;
for (const auto &it : dataProcessor)
it->SetStreamingStartFnum(streamingStartFnum);
LOG(logINFO) << "Streaming Start Frame num: " << streamingStartFnum;
}
@ -1313,7 +1297,7 @@ void Implementation::updateTotalNumberOfFrames() {
int64_t repeats = numberOfTriggers;
int64_t numFrames = numberOfFrames;
// gotthard2
if (detType == GOTTHARD2) {
if (generalData->detType == GOTTHARD2) {
// auto
if (timingMode == AUTO_TIMING) {
// burst mode, repeats = #bursts
@ -1336,6 +1320,8 @@ void Implementation::updateTotalNumberOfFrames() {
}
numberOfTotalFrames =
numFrames * repeats * (int64_t)(numberOfAdditionalStorageCells + 1);
for (const auto &it : dataStreamer)
it->SetNumberofTotalFrames(numberOfTotalFrames);
if (numberOfTotalFrames == 0) {
throw RuntimeError("Invalid total number of frames to receive: 0");
}
@ -1475,91 +1461,77 @@ void Implementation::setSubPeriod(const ns i) {
}
uint32_t Implementation::getNumberofAnalogSamples() const {
return numberOfAnalogSamples;
return generalData->nAnalogSamples;
}
void Implementation::setNumberofAnalogSamples(const uint32_t i) {
if (numberOfAnalogSamples != i) {
numberOfAnalogSamples = i;
if ( generalData->nAnalogSamples != i) {
generalData->SetNumberOfAnalogSamples(i);
SetupFifoStructure();
}
LOG(logINFO) << "Number of Analog Samples: " << numberOfAnalogSamples;
LOG(logINFO) << "Number of Analog Samples: " << generalData->nAnalogSamples;
LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
}
uint32_t Implementation::getNumberofDigitalSamples() const {
return numberOfDigitalSamples;
return generalData->nDigitalSamples;
}
void Implementation::setNumberofDigitalSamples(const uint32_t i) {
if (numberOfDigitalSamples != i) {
numberOfDigitalSamples = i;
if ( generalData->nDigitalSamples != i) {
generalData->SetNumberOfDigitalSamples(i);
SetupFifoStructure();
}
LOG(logINFO) << "Number of Digital Samples: " << numberOfDigitalSamples;
LOG(logINFO) << "Number of Digital Samples: " << generalData->nDigitalSamples;
LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
}
uint32_t Implementation::getCounterMask() const { return counterMask; }
uint32_t Implementation::getCounterMask() const { return generalData->counterMask; }
void Implementation::setCounterMask(const uint32_t i) {
if (counterMask != i) {
if (generalData->counterMask != i) {
generalData->SetCounterMask(i);
counterMask = i;
SetupFifoStructure();
}
LOG(logINFO) << "Counter mask: " << ToStringHex(counterMask);
int ncounters = __builtin_popcount(counterMask);
LOG(logINFO) << "Counter mask: " << ToStringHex(generalData->counterMask);
int ncounters = __builtin_popcount(generalData->counterMask);
LOG(logINFO) << "Number of counters: " << ncounters;
}
uint32_t Implementation::getDynamicRange() const { return dynamicRange; }
uint32_t Implementation::getDynamicRange() const { return generalData->dynamicRange; }
void Implementation::setDynamicRange(const uint32_t i) {
if (dynamicRange != i) {
dynamicRange = i;
if (detType == EIGER || detType == MYTHEN3) {
if (generalData->dynamicRange != i) {
if (generalData->detType == EIGER || generalData->detType == MYTHEN3) {
generalData->SetDynamicRange(i);
fifoDepth = generalData->defaultFifoDepth;
SetupFifoStructure();
}
}
LOG(logINFO) << "Dynamic Range: " << dynamicRange;
LOG(logINFO) << "Dynamic Range: " << generalData->dynamicRange;
}
slsDetectorDefs::ROI Implementation::getROI() const { return detectorRoi; }
slsDetectorDefs::ROI Implementation::getROI() const { return generalData->detectorRoi; }
void Implementation::setDetectorROI(slsDetectorDefs::ROI arg) {
if (detectorRoi.xmin != arg.xmin || detectorRoi.xmax != arg.xmax) {
detectorRoi.xmin = arg.xmin;
detectorRoi.xmax = arg.xmax;
if (generalData->detectorRoi.xmin != arg.xmin || generalData->detectorRoi.xmax != arg.xmax) {
// only for gotthard
generalData->SetDetectorROI(arg);
framesPerFile = generalData->maxFramesPerFile;
SetupFifoStructure();
}
LOG(logINFO) << "Detector ROI: " << ToString(detectorRoi);
LOG(logINFO) << "Detector ROI: " << ToString(generalData->detectorRoi);
LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
}
bool Implementation::getTenGigaEnable() const { return tengigaEnable; }
bool Implementation::getTenGigaEnable() const { return generalData->tengigaEnable; }
void Implementation::setTenGigaEnable(const bool b) {
if (tengigaEnable != b) {
tengigaEnable = b;
if ( generalData->tengigaEnable != b) {
generalData->SetTenGigaEnable(b);
SetupFifoStructure();
// datastream can be disabled/enabled only for Eiger 10GbE
if (detType == EIGER) {
if (generalData->detType == EIGER) {
if (!b) {
detectorDataStream[LEFT] = 1;
detectorDataStream[RIGHT] = 1;
@ -1573,7 +1545,7 @@ void Implementation::setTenGigaEnable(const bool b) {
<< ToString(detectorDataStream[RIGHT]) << "]";
}
}
LOG(logINFO) << "Ten Giga: " << (tengigaEnable ? "enabled" : "disabled");
LOG(logINFO) << "Ten Giga: " << ( generalData->tengigaEnable ? "enabled" : "disabled");
LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
}
@ -1581,19 +1553,8 @@ bool Implementation::getFlipRows() const { return flipRows; }
void Implementation::setFlipRows(bool enable) {
flipRows = enable;
if (!quadEnable) {
for (const auto &it : dataStreamer) {
it->SetFlipRows(flipRows);
}
}
// quad
else {
if (dataStreamer.size() == 2) {
dataStreamer[0]->SetFlipRows(false);
dataStreamer[1]->SetFlipRows(true);
}
}
for (const auto &it : dataStreamer)
it->SetFlipRows(flipRows);
LOG(logINFO) << "Flip Rows: " << flipRows;
}
@ -1603,15 +1564,9 @@ void Implementation::setQuad(const bool b) {
if (quadEnable != b) {
quadEnable = b;
setDetectorSize(numModules);
if (!quadEnable) {
for (const auto &it : dataStreamer) {
it->SetFlipRows(flipRows);
}
} else {
if (dataStreamer.size() == 2) {
dataStreamer[0]->SetFlipRows(false);
dataStreamer[1]->SetFlipRows(true);
}
for (const auto &it : dataStreamer) {
it->SetQuadEnable(quadEnable);
it->SetFlipRows(flipRows);
}
}
LOG(logINFO) << "Quad Enable: " << quadEnable;
@ -1641,7 +1596,7 @@ void Implementation::setDetectorDataStream(const portPosition port,
LOG(logINFO) << "Detector 10GbE datastream (" << ToString(port)
<< " Port): " << ToString(detectorDataStream10GbE[index]);
// update datastream for 10g
if (tengigaEnable) {
if ( generalData->tengigaEnable) {
detectorDataStream[index] = detectorDataStream10GbE[index];
LOG(logDEBUG) << "Detector datastream updated ["
<< (index == 0 ? "Left" : "Right")
@ -1673,59 +1628,63 @@ void Implementation::setRateCorrections(const std::vector<int64_t> &t) {
}
slsDetectorDefs::readoutMode Implementation::getReadoutMode() const {
return readoutType;
return generalData->readoutType;
}
void Implementation::setReadoutMode(const readoutMode f) {
if (readoutType != f) {
readoutType = f;
if (generalData->readoutType != f) {
generalData->SetReadoutMode(f);
SetupFifoStructure();
}
LOG(logINFO) << "Readout Mode: " << ToString(f);
LOG(logINFO) << "Readout Mode: " << ToString(generalData->readoutType);
LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
}
uint32_t Implementation::getADCEnableMask() const {
return adcEnableMaskOneGiga;
return generalData->adcEnableMaskOneGiga;
}
void Implementation::setADCEnableMask(uint32_t mask) {
if (adcEnableMaskOneGiga != mask) {
adcEnableMaskOneGiga = mask;
if (generalData->adcEnableMaskOneGiga != mask) {
generalData->SetOneGigaAdcEnableMask(mask);
SetupFifoStructure();
}
LOG(logINFO) << "ADC Enable Mask for 1Gb mode: 0x" << std::hex
<< adcEnableMaskOneGiga << std::dec;
<< generalData->adcEnableMaskOneGiga << std::dec;
LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
}
uint32_t Implementation::getTenGigaADCEnableMask() const {
return adcEnableMaskTenGiga;
return generalData->adcEnableMaskTenGiga;
}
void Implementation::setTenGigaADCEnableMask(uint32_t mask) {
if (adcEnableMaskTenGiga != mask) {
adcEnableMaskTenGiga = mask;
if (generalData->adcEnableMaskTenGiga != mask) {
generalData->SetTenGigaAdcEnableMask(mask);
SetupFifoStructure();
}
LOG(logINFO) << "ADC Enable Mask for 10Gb mode: 0x" << std::hex
<< adcEnableMaskTenGiga << std::dec;
<< generalData->adcEnableMaskTenGiga << std::dec;
LOG(logINFO) << "Packets per Frame: " << (generalData->packetsPerFrame);
}
std::vector<int> Implementation::getDbitList() const { return ctbDbitList; }
void Implementation::setDbitList(const std::vector<int> &v) { ctbDbitList = v; }
void Implementation::setDbitList(const std::vector<int> &v) {
ctbDbitList = v;
for (const auto &it : dataProcessor)
it->SetCtbDbitList(ctbDbitList);
LOG(logINFO) << "Dbit list: " << ToString(ctbDbitList);
}
int Implementation::getDbitOffset() const { return ctbDbitOffset; }
void Implementation::setDbitOffset(const int s) { ctbDbitOffset = s; }
void Implementation::setDbitOffset(const int s) {
ctbDbitOffset = s;
for (const auto &it : dataProcessor)
it->SetCtbDbitOffset(ctbDbitOffset);
LOG(logINFO) << "Dbit offset: " << ctbDbitOffset;
}
/**************************************************
* *

View File

@ -287,6 +287,9 @@ class Implementation : private virtual slsDetectorDefs {
void SetupWriter();
void StartMasterWriter();
void StartRunning();
void SetupListener(int i);
void SetupDataProcessor(int i);
void SetupDataStreamer(int i);
/**************************************************
* *
@ -295,13 +298,11 @@ class Implementation : private virtual slsDetectorDefs {
* ************************************************/
// config parameters
detectorType detType{GENERIC};
xy numModules{1, 1};
xy numPorts{1, 1};
int modulePos{0};
std::string detHostname;
bool silentMode{false};
uint32_t fifoDepth{0};
frameDiscardPolicy frameDiscardMode{NO_DISCARD};
bool framePadding{true};
pid_t parentThreadId;
@ -319,7 +320,6 @@ class Implementation : private virtual slsDetectorDefs {
bool fileWriteEnable{false};
bool masterFileWriteEnable{true};
bool overwriteEnable{true};
uint32_t framesPerFile{0};
// acquisition
std::atomic<runStatus> status{IDLE};
@ -327,11 +327,9 @@ class Implementation : private virtual slsDetectorDefs {
scanParameters scanParams{};
// network configuration (UDP)
int numUDPInterfaces{1};
std::array<std::string, MAX_NUMBER_OF_LISTENING_THREADS> eth;
std::array<uint32_t, MAX_NUMBER_OF_LISTENING_THREADS> udpPortNum{
{DEFAULT_UDP_PORTNO, DEFAULT_UDP_PORTNO + 1}};
int udpSocketBufferSize{0};
int actualUDPSocketBufferSize{0};
// zmq parameters
@ -363,12 +361,6 @@ class Implementation : private virtual slsDetectorDefs {
ns gateDelay3 = std::chrono::nanoseconds(0);
ns subExpTime = std::chrono::nanoseconds(0);
ns subPeriod = std::chrono::nanoseconds(0);
uint32_t numberOfAnalogSamples{0};
uint32_t numberOfDigitalSamples{0};
uint32_t counterMask{0};
uint32_t dynamicRange{16};
ROI detectorRoi{};
bool tengigaEnable{false};
bool flipRows{false};
bool quadEnable{false};
bool activated{true};
@ -379,9 +371,6 @@ class Implementation : private virtual slsDetectorDefs {
int thresholdEnergyeV{-1};
std::array<int, 3> thresholdAllEnergyeV = {{-1, -1, -1}};
std::vector<int64_t> rateCorrections;
readoutMode readoutType{ANALOG_ONLY};
uint32_t adcEnableMaskOneGiga{BIT32_MASK};
uint32_t adcEnableMaskTenGiga{BIT32_MASK};
std::vector<int> ctbDbitList;
int ctbDbitOffset{0};

View File

@ -24,16 +24,15 @@ namespace sls {
const std::string Listener::TypeName = "Listener";
Listener::Listener(int index, Fifo *fifo, std::atomic<runStatus> *status, uint32_t *udpPortNumber, std::string *eth, int *udpSocketBufferSize, int *actualUDPSocketBufferSize, uint32_t *framesPerFile, frameDiscardPolicy *frameDiscardMode, bool *silentMode)
: ThreadObject(index, TypeName), fifo(fifo), status(status),
udpPortNumber(udpPortNumber), eth(eth), udpSocketBufferSize(udpSocketBufferSize), actualUDPSocketBufferSize(actualUDPSocketBufferSize), framesPerFile(framesPerFile), frameDiscardMode(frameDiscardMode), silentMode(silentMode) {
Listener::Listener(int index, std::atomic<runStatus> *status)
: ThreadObject(index, TypeName), status(status) {
LOG(logDEBUG) << "Listener " << index << " created";
}
Listener::~Listener() = default;
bool Listener::isPortDisabled() const {
return disabledPort;
bool Listener::isPortDisabled() const {
return disabledPort;
}
uint64_t Listener::GetPacketsCaught() const { return numPacketsCaught; }
@ -72,6 +71,47 @@ uint64_t Listener::GetListenedIndex() const {
void Listener::SetFifo(Fifo *f) { fifo = f; }
void Listener::SetGeneralData(GeneralData *g) { generalData = g; }
void Listener::SetUdpPortNumber(const uint32_t portNumber) {
udpPortNumber = portNumber;
}
void Listener::SetEthernetInterface(const std::string e) {
eth = e;
// if eth is mistaken with ip address
if (eth.find('.') != std::string::npos) {
eth = "";
}
if (!eth.length()) {
LOG(logWARNING) << "ethernet interface for udp port " << udpPortNumber << " is empty. Listening to all";
}
}
void Listener::SetActivate(bool enable) {
activated = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::SetDetectorDatastream(bool enable) {
detectorDataStream = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::SetNoRoi(bool enable) {
noRoi = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::SetFrameDiscardPolicy(frameDiscardPolicy value) {
frameDiscardMode = value;
}
void Listener::SetSilentMode(bool enable) {
silentMode = enable;
}
void Listener::ResetParametersforNewAcquisition() {
StopRunning();
startedFlag = false;
@ -104,43 +144,17 @@ void Listener::RecordFirstIndex(uint64_t fnum) {
startedFlag = true;
firstIndex = fnum;
if (!(*silentMode)) {
if (!silentMode) {
if (!index) {
LOG(logINFOBLUE) << index << " First Index: " << firstIndex;
}
}
}
void Listener::SetGeneralData(GeneralData *g) { generalData = g; }
void Listener::SetActivate(bool enable) {
activated = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::SetDetectorDatastream(bool enable) {
detectorDataStream = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::SetNoRoi(bool enable) {
noRoi = enable;
disabledPort = (!activated || !detectorDataStream || noRoi);
}
void Listener::CreateUDPSockets() {
void Listener::CreateUDPSocket(int& actualSize) {
if (disabledPort) {
return;
}
// if eth is mistaken with ip address
if ((*eth).find('.') != std::string::npos) {
(*eth) = "";
}
if (!(*eth).length()) {
LOG(logWARNING) << "eth is empty. Listening to all";
}
ShutDownUDPSocket();
uint32_t packetSize = generalData->packetSize;
@ -148,23 +162,20 @@ void Listener::CreateUDPSockets() {
packetSize = generalData->vetoPacketSize;
}
// InterfaceNameToIp(eth).str().c_str()
try {
udpSocket = make_unique<UdpRxSocket>(
*udpPortNumber, packetSize,
((*eth).length() ? InterfaceNameToIp(*eth).str().c_str()
: nullptr),
*udpSocketBufferSize);
LOG(logINFO) << index << ": UDP port opened at port " << *udpPortNumber;
udpSocket = make_unique<UdpRxSocket>(udpPortNumber, packetSize,
(eth.length() ? InterfaceNameToIp(eth).str().c_str()
: nullptr), generalData->udpSocketBufferSize);
LOG(logINFO) << index << ": UDP port opened at port " << udpPortNumber;
} catch (...) {
throw RuntimeError("Could not create UDP socket on port " +
std::to_string(*udpPortNumber));
std::to_string(udpPortNumber));
}
udpSocketAlive = true;
// doubled due to kernel bookkeeping (could also be less due to permissions)
*actualUDPSocketBufferSize = udpSocket->getBufferSize();
actualSize = udpSocket->getBufferSize();
}
void Listener::ShutDownUDPSocket() {
@ -173,27 +184,24 @@ void Listener::ShutDownUDPSocket() {
// give other thread time after udpSocketAlive is changed
usleep(0);
udpSocket->Shutdown();
LOG(logINFO) << "Shut down of UDP port " << *udpPortNumber;
LOG(logINFO) << "Shut down of UDP port " << udpPortNumber;
}
}
void Listener::CreateDummySocketForUDPSocketBufferSize(int s) {
LOG(logINFO) << "Testing UDP Socket Buffer size " << s << " with test port "
<< *udpPortNumber;
void Listener::CreateDummySocketForUDPSocketBufferSize(int s, int& actualSize) {
// custom setup (s != 0)
// default setup at startup (s = 0)
int size = (s == 0 ? generalData->udpSocketBufferSize : s);
LOG(logINFO) << "Testing UDP Socket Buffer size " << size << " with test port "
<< udpPortNumber;
int previousSize = generalData->udpSocketBufferSize;
generalData->udpSocketBufferSize = size;
if (disabledPort) {
*actualUDPSocketBufferSize = (s * 2);
actualSize = (generalData->udpSocketBufferSize * 2);
return;
}
int temp = *udpSocketBufferSize;
*udpSocketBufferSize = s;
// if eth is mistaken with ip address
if ((*eth).find('.') != std::string::npos) {
(*eth) = "";
}
uint32_t packetSize = generalData->packetSize;
if (generalData->detType == GOTTHARD2 && index != 0) {
packetSize = generalData->vetoPacketSize;
@ -201,24 +209,29 @@ void Listener::CreateDummySocketForUDPSocketBufferSize(int s) {
// create dummy socket
try {
UdpRxSocket g(*udpPortNumber, packetSize,
((*eth).length()
? InterfaceNameToIp(*eth).str().c_str()
: nullptr),
*udpSocketBufferSize);
UdpRxSocket g(udpPortNumber, packetSize,
(eth.length()
? InterfaceNameToIp(eth).str().c_str()
: nullptr), generalData->udpSocketBufferSize);
// doubled due to kernel bookkeeping (could also be less due to
// permissions)
*actualUDPSocketBufferSize = g.getBufferSize();
if (*actualUDPSocketBufferSize == -1) {
*udpSocketBufferSize = temp;
actualSize = g.getBufferSize();
if (actualSize == -1) {
generalData->udpSocketBufferSize = previousSize;
} else {
*udpSocketBufferSize = (*actualUDPSocketBufferSize) / 2;
generalData->udpSocketBufferSize = actualSize / 2;
}
} catch (...) {
throw RuntimeError("Could not create a test UDP socket on port " +
std::to_string(*udpPortNumber));
std::to_string(udpPortNumber));
}
// custom and didnt set, throw error
if (s != 0 && static_cast<int>(generalData->udpSocketBufferSize) != s) {
throw RuntimeError("Could not set udp socket buffer size. (No "
"CAP_NET_ADMIN privileges?)");
}
}
@ -257,12 +270,12 @@ void Listener::ThreadExecution() {
fifo->PushAddress(buffer);
// Statistics
if (!(*silentMode)) {
if (!silentMode) {
numFramesStatistic++;
if (numFramesStatistic >=
// second condition also for infinite #number of frames
(((*framesPerFile) == 0) ? STATISTIC_FRAMENUMBER_INFINITE
: (*framesPerFile)))
(generalData->framesPerFile == 0 ? STATISTIC_FRAMENUMBER_INFINITE
: generalData->framesPerFile))
PrintFifoStatistics();
}
}
@ -271,7 +284,7 @@ void Listener::StopListening(char *buf, size_t & size) {
size = DUMMY_PACKET_VALUE;
fifo->PushAddress(buf);
StopRunning();
LOG(logDEBUG1) << index << ": Listening Completed. Packets (" << *udpPortNumber
LOG(logDEBUG1) << index << ": Listening Completed. Packets (" << udpPortNumber
<< ") : " << numPacketsCaught;
}
@ -342,7 +355,7 @@ uint32_t Listener::ListenToAnImage(sls_receiver_header & dstHeader, char *dstDat
// Eiger Firmware in a weird state
if (generalData->detType == EIGER && fnum == 0) {
LOG(logERROR) << "[" << *udpPortNumber
LOG(logERROR) << "[" << udpPortNumber
<< "]: Got Frame Number "
"Zero from Firmware. Discarding Packet";
numPacketsCaught--;
@ -385,7 +398,7 @@ uint32_t Listener::ListenToAnImage(sls_receiver_header & dstHeader, char *dstDat
}
size_t Listener::HandleFuturePacket(bool EOA, uint32_t numpackets, uint64_t fnum, bool isHeaderEmpty, size_t imageSize, sls_receiver_header& dstHeader) {
switch (*frameDiscardMode) {
switch (frameDiscardMode) {
case DISCARD_EMPTY_FRAMES:
if (!numpackets) {
if (!EOA) {
@ -495,7 +508,7 @@ void Listener::PrintFifoStatistics() {
numFramesStatistic = 0;
const auto color = loss ? logINFORED : logINFOGREEN;
LOG(color) << "[" << *udpPortNumber
LOG(color) << "[" << udpPortNumber
<< "]: "
"Packet_Loss:"
<< loss << " (" << lossPercent << "%)"

View File

@ -24,35 +24,35 @@ class Fifo;
class Listener : private virtual slsDetectorDefs, public ThreadObject {
public:
Listener(int index, Fifo *fifo, std::atomic<runStatus> *status, uint32_t *udpPortNumber, std::string *eth, int *udpSocketBufferSize, int *actualUDPSocketBufferSize, uint32_t *framesPerFile, frameDiscardPolicy *frameDiscardMode, bool *silentMode);
Listener(int index, std::atomic<runStatus> *status);
~Listener();
bool isPortDisabled() const;
uint64_t GetPacketsCaught() const;
uint64_t GetNumCompleteFramesCaught() const;
uint64_t GetLastFrameIndexCaught() const;
/** negative values in case of extra packets */
int64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const;
bool GetStartedFlag() const;
uint64_t GetCurrentFrameIndex() const;
uint64_t GetListenedIndex() const;
bool isPortDisabled() const;
uint64_t GetPacketsCaught() const;
uint64_t GetNumCompleteFramesCaught() const;
uint64_t GetLastFrameIndexCaught() const;
/** negative values in case of extra packets */
int64_t GetNumMissingPacket(bool stoppedFlag, uint64_t numPackets) const;
bool GetStartedFlag() const;
uint64_t GetCurrentFrameIndex() const;
uint64_t GetListenedIndex() const;
void SetFifo(Fifo *f);
void ResetParametersforNewAcquisition();
void SetGeneralData(GeneralData *g);
void SetActivate(bool enable);
void SetDetectorDatastream(bool enable);
void SetNoRoi(bool enable);
void CreateUDPSockets();
void ShutDownUDPSocket();
void SetFifo(Fifo *f);
void SetGeneralData(GeneralData *g);
void SetUdpPortNumber(const uint32_t portNumber);
void SetEthernetInterface(const std::string e);
void SetActivate(bool enable);
void SetDetectorDatastream(bool enable);
void SetNoRoi(bool enable);
void SetFrameDiscardPolicy(frameDiscardPolicy value);
void SetSilentMode(bool enable);
/**
* Create & closes a dummy UDP socket
* to set & get actual buffer size
* @param s UDP socket buffer size to be set
*/
void CreateDummySocketForUDPSocketBufferSize(int s);
void ResetParametersforNewAcquisition();
void CreateUDPSocket(int& actualSize);
void ShutDownUDPSocket();
/** to set & get actual buffer size */
void CreateDummySocketForUDPSocketBufferSize(int s, int & actualSize);
/**
* Set hard coded (calculated but not from detector) row and column
@ -102,17 +102,14 @@ class Listener : private virtual slsDetectorDefs, public ThreadObject {
// individual members
std::atomic<runStatus> *status;
std::unique_ptr<UdpRxSocket> udpSocket{nullptr};
uint32_t *udpPortNumber;
std::string *eth;
int *udpSocketBufferSize;
/** double due to kernel bookkeeping */
int *actualUDPSocketBufferSize;
uint32_t *framesPerFile;
frameDiscardPolicy *frameDiscardMode;
uint32_t udpPortNumber{0};
std::string eth;
frameDiscardPolicy frameDiscardMode;
bool activated{false};
bool detectorDataStream{true};
bool noRoi{false};
bool *silentMode;
bool silentMode;
bool disabledPort{false};
/** row hardcoded as 1D or 2d,