Dev/rx callbacks (#966)

* changed rxr callback signatures to all include structs
* removed datamodify call back as size can be changed in the original data call back now
* bringing some parameters (set functions) to dataProcessor class for its callback (namely udpport, quad, fliprows, totalframes, jsonheader), resulting in also removing totalframes from 2 other function signatures

* updated MultiReceiverApp to reflect the new callback signatures
This commit is contained in:
maliakal_d 2024-09-30 16:30:13 +02:00 committed by GitHub
parent 007330caa7
commit a44ba4dc35
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 319 additions and 257 deletions

View File

@ -41,49 +41,34 @@ class Receiver : private virtual slsDetectorDefs {
/** /**
* Start Acquisition Call back (slsMultiReceiver writes data if file write * Start Acquisition Call back (slsMultiReceiver writes data if file write
* enabled) if registerCallBackRawDataReady or * enabled) if registerCallBackRawDataReady or
* registerCallBackRawDataModifyReady registered, users get data callback * registerCallBackRawDataModifyReady registered
* arguments are: * Call back arguments are:
* - file path * - startCallbackHeader metadata
* - file name prefix
* - file index
* - image size in bytes
*/ */
void registerCallBackStartAcquisition(int (*func)(const std::string &, void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader,
const std::string &, void *),
uint64_t, size_t, void *),
void *arg); void *arg);
/** /**
* Call back for acquisition finished * Call back for acquisition finished
* callback argument is: * callback argument is:
* - total frames caught * - startCallbackHeader metadata
*/ */
void registerCallBackAcquisitionFinished(void (*func)(uint64_t, void *), void registerCallBackAcquisitionFinished(
void *arg); void (*func)(const endCallbackHeader, void *), void *arg);
/** /**
* Call back for raw data * Call back for raw data
* args to raw data ready callback are: * args to raw data ready callback are:
* - sls_receiver_header frame metadata, * - sls_receiver_header frame metadata,
* - dataCallbackHeader metadata
* - pointer to data * - pointer to data
* - image size in bytes * - image size in bytes. Can be modified to the new size to be
* written/streamed. (only smaller value allowed).
*/ */
void registerCallBackRawDataReady(void (*func)(sls_receiver_header &, void registerCallBackRawDataReady(void (*func)(sls_receiver_header &,
char *, size_t, void *), const dataCallbackHeader,
void *arg); char *, size_t &, void *),
/**
* Call back for raw data (modified)
* args to raw data ready callback are:
* - sls_receiver_header frame metadata,
* - pointer to data
* - revDatasize is the reference of data size in bytes.
* Can be modified to the new size to be written/streamed. (only smaller
* value allowed).
*/
void registerCallBackRawDataModifyReady(void (*func)(sls_receiver_header &,
char *, size_t &,
void *),
void *arg); void *arg);
private: private:

View File

@ -54,32 +54,25 @@ std::string ClientInterface::getReceiverVersion() { return APIRECEIVER; }
/***callback functions***/ /***callback functions***/
void ClientInterface::registerCallBackStartAcquisition( void ClientInterface::registerCallBackStartAcquisition(
int (*func)(const std::string &, const std::string &, uint64_t, size_t, int (*func)(const startCallbackHeader, void *), void *arg) {
void *),
void *arg) {
startAcquisitionCallBack = func; startAcquisitionCallBack = func;
pStartAcquisition = arg; pStartAcquisition = arg;
} }
void ClientInterface::registerCallBackAcquisitionFinished(void (*func)(uint64_t, void ClientInterface::registerCallBackAcquisitionFinished(
void *), void (*func)(const endCallbackHeader, void *), void *arg) {
void *arg) {
acquisitionFinishedCallBack = func; acquisitionFinishedCallBack = func;
pAcquisitionFinished = arg; pAcquisitionFinished = arg;
} }
void ClientInterface::registerCallBackRawDataReady( void ClientInterface::registerCallBackRawDataReady(
void (*func)(sls_receiver_header &, char *, size_t, void *), void *arg) { void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &,
void *),
void *arg) {
rawDataReadyCallBack = func; rawDataReadyCallBack = func;
pRawDataReady = arg; pRawDataReady = arg;
} }
void ClientInterface::registerCallBackRawDataModifyReady(
void (*func)(sls_receiver_header &, char *, size_t &, void *), void *arg) {
rawDataModifyReadyCallBack = func;
pRawDataReady = arg;
}
void ClientInterface::startTCPServer() { void ClientInterface::startTCPServer() {
tcpThreadId = gettid(); tcpThreadId = gettid();
LOG(logINFOBLUE) << "Created [ TCP server Tid: " << tcpThreadId << "]"; LOG(logINFOBLUE) << "Created [ TCP server Tid: " << tcpThreadId << "]";
@ -477,9 +470,6 @@ void ClientInterface::setDetectorType(detectorType arg) {
if (rawDataReadyCallBack != nullptr) if (rawDataReadyCallBack != nullptr)
impl()->registerCallBackRawDataReady(rawDataReadyCallBack, impl()->registerCallBackRawDataReady(rawDataReadyCallBack,
pRawDataReady); pRawDataReady);
if (rawDataModifyReadyCallBack != nullptr)
impl()->registerCallBackRawDataModifyReady(rawDataModifyReadyCallBack,
pRawDataReady);
impl()->setThreadIds(parentThreadId, tcpThreadId); impl()->setThreadIds(parentThreadId, tcpThreadId);
} }

View File

@ -34,24 +34,18 @@ class ClientInterface : private virtual slsDetectorDefs {
//***callback functions*** //***callback functions***
/** params: file path, file name, file index, image size */ /** params: file path, file name, file index, image size */
void registerCallBackStartAcquisition(int (*func)(const std::string &, void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader,
const std::string &, void *),
uint64_t, size_t, void *),
void *arg); void *arg);
/** params: total frames caught */ /** params: total frames caught */
void registerCallBackAcquisitionFinished(void (*func)(uint64_t, void *), void registerCallBackAcquisitionFinished(
void *arg); void (*func)(const endCallbackHeader, void *), void *arg);
/** params: sls_receiver_header, pointer to data, image size */ /** params: sls_receiver_header, pointer to data, image size */
void registerCallBackRawDataReady(void (*func)(sls_receiver_header &, void registerCallBackRawDataReady(void (*func)(sls_receiver_header &,
char *, size_t, void *), const dataCallbackHeader,
void *arg); char *, size_t &, void *),
/** params: sls_receiver_header, pointer to data, reference to image size */
void registerCallBackRawDataModifyReady(void (*func)(sls_receiver_header &,
char *, size_t &,
void *),
void *arg); void *arg);
private: private:
@ -186,15 +180,14 @@ class ClientInterface : private virtual slsDetectorDefs {
//***callback parameters*** //***callback parameters***
int (*startAcquisitionCallBack)(const std::string &, const std::string &, int (*startAcquisitionCallBack)(const startCallbackHeader,
uint64_t, size_t, void *) = nullptr; void *) = nullptr;
void *pStartAcquisition{nullptr}; void *pStartAcquisition{nullptr};
void (*acquisitionFinishedCallBack)(uint64_t, void *) = nullptr; void (*acquisitionFinishedCallBack)(const endCallbackHeader,
void *) = nullptr;
void *pAcquisitionFinished{nullptr}; void *pAcquisitionFinished{nullptr};
void (*rawDataReadyCallBack)(sls_receiver_header &, char *, size_t, void (*rawDataReadyCallBack)(sls_receiver_header &, dataCallbackHeader,
void *) = nullptr; char *, size_t &, void *) = nullptr;
void (*rawDataModifyReadyCallBack)(sls_receiver_header &, char *, size_t &,
void *) = nullptr;
void *pRawDataReady{nullptr}; void *pRawDataReady{nullptr};
pid_t parentThreadId{0}; pid_t parentThreadId{0};

View File

@ -41,6 +41,10 @@ void DataProcessor::SetFifo(Fifo *f) { fifo = f; }
void DataProcessor::SetGeneralData(GeneralData *g) { generalData = g; } void DataProcessor::SetGeneralData(GeneralData *g) { generalData = g; }
void DataProcessor::SetUdpPortNumber(const uint16_t portNumber) {
udpPortNumber = portNumber;
}
void DataProcessor::SetActivate(bool enable) { activated = enable; } void DataProcessor::SetActivate(bool enable) { activated = enable; }
void DataProcessor::SetReceiverROI(ROI roi) { void DataProcessor::SetReceiverROI(ROI roi) {
@ -73,6 +77,27 @@ void DataProcessor::SetCtbDbitList(std::vector<int> value) {
void DataProcessor::SetCtbDbitOffset(int value) { ctbDbitOffset = value; } void DataProcessor::SetCtbDbitOffset(int value) { ctbDbitOffset = value; }
void DataProcessor::SetQuadEnable(bool value) { quadEnable = value; }
void DataProcessor::SetFlipRows(bool fd) {
flipRows = fd;
// flip only right port of quad
if (quadEnable) {
flipRows = (index == 1 ? true : false);
}
}
void DataProcessor::SetNumberofTotalFrames(uint64_t value) {
nTotalFrames = value;
}
void DataProcessor::SetAdditionalJsonHeader(
const std::map<std::string, std::string> &json) {
std::lock_guard<std::mutex> lock(additionalJsonMutex);
additionalJsonHeader = json;
isAdditionalJsonUpdated = true;
}
void DataProcessor::ResetParametersforNewAcquisition() { void DataProcessor::ResetParametersforNewAcquisition() {
StopRunning(); StopRunning();
startedFlag = false; startedFlag = false;
@ -127,8 +152,6 @@ void DataProcessor::CreateFirstFiles(const std::string &fileNamePrefix,
const uint64_t fileIndex, const uint64_t fileIndex,
const bool overWriteEnable, const bool overWriteEnable,
const bool silentMode, const bool silentMode,
const uint16_t udpPortNumber,
const uint64_t numImages,
const bool detectorDataStream) { const bool detectorDataStream) {
if (dataFile == nullptr) { if (dataFile == nullptr) {
throw RuntimeError("file object not contstructed"); throw RuntimeError("file object not contstructed");
@ -156,7 +179,7 @@ void DataProcessor::CreateFirstFiles(const std::string &fileNamePrefix,
case HDF5: case HDF5:
dataFile->CreateFirstHDF5DataFile( dataFile->CreateFirstHDF5DataFile(
fileNamePrefix, fileIndex, overWriteEnable, silentMode, fileNamePrefix, fileIndex, overWriteEnable, silentMode,
udpPortNumber, generalData->framesPerFile, numImages, nx, ny, udpPortNumber, generalData->framesPerFile, nTotalFrames, nx, ny,
generalData->dynamicRange); generalData->dynamicRange);
break; break;
#endif #endif
@ -182,8 +205,8 @@ uint32_t DataProcessor::GetFilesInAcquisition() const {
std::string DataProcessor::CreateVirtualFile( std::string DataProcessor::CreateVirtualFile(
const std::string &filePath, const std::string &fileNamePrefix, const std::string &filePath, const std::string &fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable, const bool silentMode, const uint64_t fileIndex, const bool overWriteEnable, const bool silentMode,
const int modulePos, const uint64_t numImages, const int numModX, const int modulePos, const int numModX, const int numModY,
const int numModY, std::mutex *hdf5LibMutex) { std::mutex *hdf5LibMutex) {
if (receiverRoiEnabled) { if (receiverRoiEnabled) {
throw std::runtime_error( throw std::runtime_error(
@ -361,14 +384,31 @@ void DataProcessor::ProcessAnImage(sls_receiver_header &header, size_t &size,
} }
try { try {
// normal call back // callbacks
if (rawDataReadyCallBack != nullptr) { if (rawDataReadyCallBack != nullptr) {
rawDataReadyCallBack(header, data, size, pRawDataReady);
uint64_t frameIndex = fnum - firstIndex;
// update local copy only if it was updated (to prevent locking each
// time)
if (isAdditionalJsonUpdated) {
std::lock_guard<std::mutex> lock(additionalJsonMutex);
localAdditionalJsonHeader = additionalJsonHeader;
isAdditionalJsonUpdated = false;
} }
// call back with modified size dataCallbackHeader callbackHeader = {
else if (rawDataModifyReadyCallBack != nullptr) { udpPortNumber,
rawDataModifyReadyCallBack(header, data, size, pRawDataReady); {static_cast<int>(generalData->nPixelsX),
static_cast<int>(generalData->nPixelsY)},
fnum,
frameIndex,
(100 * ((double)(frameIndex + 1) / (double)(nTotalFrames))),
(nump == generalData->packetsPerFrame ? true : false),
flipRows,
localAdditionalJsonHeader};
rawDataReadyCallBack(header, callbackHeader, data, size,
pRawDataReady);
} }
} catch (const std::exception &e) { } catch (const std::exception &e) {
throw RuntimeError("Get Data Callback Error: " + std::string(e.what())); throw RuntimeError("Get Data Callback Error: " + std::string(e.what()));
@ -427,17 +467,13 @@ bool DataProcessor::CheckCount() {
} }
void DataProcessor::registerCallBackRawDataReady( void DataProcessor::registerCallBackRawDataReady(
void (*func)(sls_receiver_header &, char *, size_t, void *), void *arg) { void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &,
void *),
void *arg) {
rawDataReadyCallBack = func; rawDataReadyCallBack = func;
pRawDataReady = arg; pRawDataReady = arg;
} }
void DataProcessor::registerCallBackRawDataModifyReady(
void (*func)(sls_receiver_header &, char *, size_t &, void *), void *arg) {
rawDataModifyReadyCallBack = func;
pRawDataReady = arg;
}
void DataProcessor::PadMissingPackets(sls_receiver_header header, char *data) { void DataProcessor::PadMissingPackets(sls_receiver_header header, char *data) {
LOG(logDEBUG) << index << ": Padding Missing Packets"; LOG(logDEBUG) << index << ": Padding Missing Packets";

View File

@ -37,6 +37,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
void SetFifo(Fifo *f); void SetFifo(Fifo *f);
void SetGeneralData(GeneralData *generalData); void SetGeneralData(GeneralData *generalData);
void SetUdpPortNumber(const uint16_t portNumber);
void SetActivate(bool enable); void SetActivate(bool enable);
void SetReceiverROI(ROI roi); void SetReceiverROI(ROI roi);
void SetDataStreamEnable(bool enable); void SetDataStreamEnable(bool enable);
@ -46,6 +47,11 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
void SetFramePadding(bool enable); void SetFramePadding(bool enable);
void SetCtbDbitList(std::vector<int> value); void SetCtbDbitList(std::vector<int> value);
void SetCtbDbitOffset(int value); void SetCtbDbitOffset(int value);
void SetQuadEnable(bool value);
void SetFlipRows(bool fd);
void SetNumberofTotalFrames(uint64_t value);
void
SetAdditionalJsonHeader(const std::map<std::string, std::string> &json);
void ResetParametersforNewAcquisition(); void ResetParametersforNewAcquisition();
void CloseFiles(); void CloseFiles();
@ -56,9 +62,7 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
void CreateFirstFiles(const std::string &fileNamePrefix, void CreateFirstFiles(const std::string &fileNamePrefix,
const uint64_t fileIndex, const bool overWriteEnable, const uint64_t fileIndex, const bool overWriteEnable,
const bool silentMode, const uint16_t udpPortNumber, const bool silentMode, const bool detectorDataStream);
const uint64_t numImages,
const bool detectorDataStream);
#ifdef HDF5C #ifdef HDF5C
uint32_t GetFilesInAcquisition() const; uint32_t GetFilesInAcquisition() const;
std::string CreateVirtualFile(const std::string &filePath, std::string CreateVirtualFile(const std::string &filePath,
@ -66,8 +70,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
const uint64_t fileIndex, const uint64_t fileIndex,
const bool overWriteEnable, const bool overWriteEnable,
const bool silentMode, const int modulePos, const bool silentMode, const int modulePos,
const uint64_t numImages, const int numModX, const int numModX, const int numModY,
const int numModY, std::mutex *hdf5LibMutex); std::mutex *hdf5LibMutex);
void LinkFileInMaster(const std::string &masterFileName, void LinkFileInMaster(const std::string &masterFileName,
const std::string &virtualFileName, const std::string &virtualFileName,
const bool silentMode, std::mutex *hdf5LibMutex); const bool silentMode, std::mutex *hdf5LibMutex);
@ -83,13 +87,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
/** params: sls_receiver_header, pointer to data, image size */ /** params: sls_receiver_header, pointer to data, image size */
void registerCallBackRawDataReady(void (*func)(sls_receiver_header &, void registerCallBackRawDataReady(void (*func)(sls_receiver_header &,
char *, size_t, void *), dataCallbackHeader, char *,
void *arg); size_t &, void *),
/** params: sls_receiver_header, pointer to data, reference to image size */
void registerCallBackRawDataModifyReady(void (*func)(sls_receiver_header &,
char *, size_t &,
void *),
void *arg); void *arg);
private: private:
@ -150,6 +149,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
GeneralData *generalData{nullptr}; GeneralData *generalData{nullptr};
Fifo *fifo; Fifo *fifo;
uint16_t udpPortNumber{0};
bool dataStreamEnable; bool dataStreamEnable;
bool activated{false}; bool activated{false};
ROI receiverRoi{}; ROI receiverRoi{};
@ -167,6 +168,18 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
int ctbDbitOffset; int ctbDbitOffset;
std::atomic<bool> startedFlag{false}; std::atomic<bool> startedFlag{false};
std::atomic<uint64_t> firstIndex{0}; std::atomic<uint64_t> firstIndex{0};
bool quadEnable{false};
bool flipRows{false};
uint64_t nTotalFrames{0};
std::map<std::string, std::string> additionalJsonHeader;
/** Used by streamer thread to update local copy (reduce number of locks
* during streaming) */
std::atomic<bool> isAdditionalJsonUpdated{false};
/** mutex to update json and to read and update local copy */
mutable std::mutex additionalJsonMutex;
/** local copy of additional json header (it can be update on the fly) */
std::map<std::string, std::string> localAdditionalJsonHeader;
// for statistics // for statistics
uint64_t numFramesCaught{0}; uint64_t numFramesCaught{0};
@ -189,19 +202,8 @@ class DataProcessor : private virtual slsDetectorDefs, public ThreadObject {
* dataPointer is the pointer to the data * dataPointer is the pointer to the data
* dataSize in bytes is the size of the data in bytes. * dataSize in bytes is the size of the data in bytes.
*/ */
void (*rawDataReadyCallBack)(sls_receiver_header &, char *, size_t, void (*rawDataReadyCallBack)(sls_receiver_header &, dataCallbackHeader,
void *) = nullptr; char *, size_t &, void *) = nullptr;
/**
* Call back for raw data (modified)
* args to raw data ready callback are
* sls_receiver_header frame metadata
* dataPointer is the pointer to the data
* revDatasize is the reference of data size in bytes. Can be modified to
* the new size to be written/streamed. (only smaller value).
*/
void (*rawDataModifyReadyCallBack)(sls_receiver_header &, char *, size_t &,
void *) = nullptr;
void *pRawDataReady{nullptr}; void *pRawDataReady{nullptr};
}; };

View File

@ -196,6 +196,7 @@ void Implementation::SetupListener(int i) {
void Implementation::SetupDataProcessor(int i) { void Implementation::SetupDataProcessor(int i) {
dataProcessor[i]->SetFifo(fifo[i].get()); dataProcessor[i]->SetFifo(fifo[i].get());
dataProcessor[i]->SetGeneralData(generalData); dataProcessor[i]->SetGeneralData(generalData);
dataProcessor[i]->SetUdpPortNumber(udpPortNum[i]);
dataProcessor[i]->SetActivate(activated); dataProcessor[i]->SetActivate(activated);
dataProcessor[i]->SetReceiverROI(portRois[i]); dataProcessor[i]->SetReceiverROI(portRois[i]);
dataProcessor[i]->SetDataStreamEnable(dataStreamEnable); dataProcessor[i]->SetDataStreamEnable(dataStreamEnable);
@ -205,6 +206,10 @@ void Implementation::SetupDataProcessor(int i) {
dataProcessor[i]->SetFramePadding(framePadding); dataProcessor[i]->SetFramePadding(framePadding);
dataProcessor[i]->SetCtbDbitList(ctbDbitList); dataProcessor[i]->SetCtbDbitList(ctbDbitList);
dataProcessor[i]->SetCtbDbitOffset(ctbDbitOffset); dataProcessor[i]->SetCtbDbitOffset(ctbDbitOffset);
dataProcessor[i]->SetQuadEnable(quadEnable);
dataProcessor[i]->SetFlipRows(flipRows);
dataProcessor[i]->SetNumberofTotalFrames(numberOfTotalFrames);
dataProcessor[i]->SetAdditionalJsonHeader(additionalJsonHeader);
} }
void Implementation::SetupDataStreamer(int i) { void Implementation::SetupDataStreamer(int i) {
@ -216,7 +221,6 @@ void Implementation::SetupDataStreamer(int i) {
dataStreamer[i]->SetQuadEnable(quadEnable); dataStreamer[i]->SetQuadEnable(quadEnable);
dataStreamer[i]->SetFlipRows(flipRows); dataStreamer[i]->SetFlipRows(flipRows);
dataStreamer[i]->SetNumberofPorts(numPorts); dataStreamer[i]->SetNumberofPorts(numPorts);
dataStreamer[i]->SetQuadEnable(quadEnable);
dataStreamer[i]->SetNumberofTotalFrames(numberOfTotalFrames); dataStreamer[i]->SetNumberofTotalFrames(numberOfTotalFrames);
dataStreamer[i]->SetReceiverROI( dataStreamer[i]->SetReceiverROI(
portRois[i].completeRoi() ? GetMaxROIPerPort() : portRois[i]); portRois[i].completeRoi() ? GetMaxROIPerPort() : portRois[i]);
@ -685,18 +689,26 @@ void Implementation::startReceiver() {
// callbacks // callbacks
if (startAcquisitionCallBack) { if (startAcquisitionCallBack) {
try { try {
std::size_t imageSize = std::vector<uint32_t> udpPort;
static_cast<uint32_t>(generalData->imageSize); for (size_t i = 0; i != listener.size(); ++i) {
startAcquisitionCallBack(filePath, fileName, fileIndex, imageSize, udpPort.push_back(udpPortNum[i]);
pStartAcquisition); }
startCallbackHeader callbackHeader = {
udpPort,
generalData->dynamicRange,
numPorts,
static_cast<size_t>(generalData->imageSize),
filePath,
fileName,
fileIndex,
quadEnable,
additionalJsonHeader};
startAcquisitionCallBack(callbackHeader, pStartAcquisition);
} catch (const std::exception &e) { } catch (const std::exception &e) {
std::ostringstream oss; std::ostringstream oss;
oss << "Start Acquisition Callback Error: " << e.what(); oss << "Start Acquisition Callback Error: " << e.what();
throw RuntimeError(oss.str()); throw RuntimeError(oss.str());
} }
if (rawDataReadyCallBack != nullptr) {
LOG(logINFO) << "Data Write has been defined externally";
}
} }
// processor->writer // processor->writer
@ -799,9 +811,19 @@ void Implementation::stopReceiver() {
// callback // callback
if (acquisitionFinishedCallBack) { if (acquisitionFinishedCallBack) {
try { try {
acquisitionFinishedCallBack( std::vector<uint32_t> udpPort;
(tot / generalData->numUDPInterfaces), std::vector<uint64_t> completeFramesCaught;
pAcquisitionFinished); std::vector<uint64_t> lastFrameIndexCaught;
for (size_t i = 0; i != listener.size(); ++i) {
udpPort.push_back(udpPortNum[i]);
completeFramesCaught.push_back(
listener[i]->GetNumCompleteFramesCaught());
lastFrameIndexCaught.push_back(
listener[i]->GetLastFrameIndexCaught());
}
endCallbackHeader callHeader = {udpPort, completeFramesCaught,
lastFrameIndexCaught};
acquisitionFinishedCallBack(callHeader, pAcquisitionFinished);
} catch (const std::exception &e) { } catch (const std::exception &e) {
// change status // change status
status = IDLE; status = IDLE;
@ -905,9 +927,9 @@ void Implementation::SetupWriter() {
os << filePath << "/" << fileName << "_d" os << filePath << "/" << fileName << "_d"
<< (modulePos * generalData->numUDPInterfaces + i); << (modulePos * generalData->numUDPInterfaces + i);
std::string fileNamePrefix = os.str(); std::string fileNamePrefix = os.str();
dataProcessor[i]->CreateFirstFiles( dataProcessor[i]->CreateFirstFiles(fileNamePrefix, fileIndex,
fileNamePrefix, fileIndex, overwriteEnable, silentMode, overwriteEnable, silentMode,
udpPortNum[i], numberOfTotalFrames, detectorDataStream[i]); detectorDataStream[i]);
} }
} catch (const RuntimeError &e) { } catch (const RuntimeError &e) {
shutDownUDPSockets(); shutDownUDPSockets();
@ -1003,8 +1025,7 @@ void Implementation::StartMasterWriter() {
(numPorts.x * numPorts.y) > 1) { (numPorts.x * numPorts.y) > 1) {
virtualFileName = dataProcessor[0]->CreateVirtualFile( virtualFileName = dataProcessor[0]->CreateVirtualFile(
filePath, fileName, fileIndex, overwriteEnable, silentMode, filePath, fileName, fileIndex, overwriteEnable, silentMode,
modulePos, numberOfTotalFrames, numPorts.x, numPorts.y, modulePos, numPorts.x, numPorts.y, &hdf5LibMutex);
&hdf5LibMutex);
} }
// link file in master // link file in master
if (masterFileWriteEnable) { if (masterFileWriteEnable) {
@ -1112,11 +1133,6 @@ void Implementation::setNumberofUDPInterfaces(const int n) {
it->registerCallBackRawDataReady(rawDataReadyCallBack, it->registerCallBackRawDataReady(rawDataReadyCallBack,
pRawDataReady); pRawDataReady);
} }
if (rawDataModifyReadyCallBack) {
for (const auto &it : dataProcessor)
it->registerCallBackRawDataModifyReady(
rawDataModifyReadyCallBack, pRawDataReady);
}
// test socket buffer size with current set up // test socket buffer size with current set up
setUDPSocketBufferSize(0); setUDPSocketBufferSize(0);
@ -1148,6 +1164,7 @@ uint16_t Implementation::getUDPPortNumber() const { return udpPortNum[0]; }
void Implementation::setUDPPortNumber(const uint16_t i) { void Implementation::setUDPPortNumber(const uint16_t i) {
udpPortNum[0] = i; udpPortNum[0] = i;
listener[0]->SetUdpPortNumber(i); listener[0]->SetUdpPortNumber(i);
dataProcessor[0]->SetUdpPortNumber(i);
LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum[0]; LOG(logINFO) << "UDP Port Number[0]: " << udpPortNum[0];
} }
@ -1157,6 +1174,7 @@ void Implementation::setUDPPortNumber2(const uint16_t i) {
udpPortNum[1] = i; udpPortNum[1] = i;
if (listener.size() > 1) { if (listener.size() > 1) {
listener[1]->SetUdpPortNumber(i); listener[1]->SetUdpPortNumber(i);
dataProcessor[1]->SetUdpPortNumber(i);
} }
LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1]; LOG(logINFO) << "UDP Port Number[1]: " << udpPortNum[1];
} }
@ -1278,6 +1296,9 @@ void Implementation::setAdditionalJsonHeader(
const std::map<std::string, std::string> &c) { const std::map<std::string, std::string> &c) {
additionalJsonHeader = c; additionalJsonHeader = c;
for (const auto &it : dataProcessor) {
it->SetAdditionalJsonHeader(c);
}
for (const auto &it : dataStreamer) { for (const auto &it : dataStreamer) {
it->SetAdditionalJsonHeader(c); it->SetAdditionalJsonHeader(c);
} }
@ -1320,6 +1341,9 @@ void Implementation::setAdditionalJsonParameter(const std::string &key,
LOG(logINFO) << "Adding additional json parameter (" << key << ") to " LOG(logINFO) << "Adding additional json parameter (" << key << ") to "
<< value; << value;
} }
for (const auto &it : dataProcessor) {
it->SetAdditionalJsonHeader(additionalJsonHeader);
}
for (const auto &it : dataStreamer) { for (const auto &it : dataStreamer) {
it->SetAdditionalJsonHeader(additionalJsonHeader); it->SetAdditionalJsonHeader(additionalJsonHeader);
} }
@ -1359,6 +1383,8 @@ void Implementation::updateTotalNumberOfFrames() {
} }
numberOfTotalFrames = numberOfTotalFrames =
numFrames * repeats * (int64_t)(numberOfAdditionalStorageCells + 1); numFrames * repeats * (int64_t)(numberOfAdditionalStorageCells + 1);
for (const auto &it : dataProcessor)
it->SetNumberofTotalFrames(numberOfTotalFrames);
for (const auto &it : dataStreamer) for (const auto &it : dataStreamer)
it->SetNumberofTotalFrames(numberOfTotalFrames); it->SetNumberofTotalFrames(numberOfTotalFrames);
if (numberOfTotalFrames == 0) { if (numberOfTotalFrames == 0) {
@ -1617,6 +1643,8 @@ bool Implementation::getFlipRows() const { return flipRows; }
void Implementation::setFlipRows(bool enable) { void Implementation::setFlipRows(bool enable) {
flipRows = enable; flipRows = enable;
for (const auto &it : dataProcessor)
it->SetFlipRows(flipRows);
for (const auto &it : dataStreamer) for (const auto &it : dataStreamer)
it->SetFlipRows(flipRows); it->SetFlipRows(flipRows);
LOG(logINFO) << "Flip Rows: " << flipRows; LOG(logINFO) << "Flip Rows: " << flipRows;
@ -1628,6 +1656,10 @@ void Implementation::setQuad(const bool b) {
if (quadEnable != b) { if (quadEnable != b) {
quadEnable = b; quadEnable = b;
setDetectorSize(numModules); setDetectorSize(numModules);
for (const auto &it : dataProcessor) {
it->SetQuadEnable(quadEnable);
it->SetFlipRows(flipRows);
}
for (const auto &it : dataStreamer) { for (const auto &it : dataStreamer) {
it->SetQuadEnable(quadEnable); it->SetQuadEnable(quadEnable);
it->SetFlipRows(flipRows); it->SetFlipRows(flipRows);
@ -1769,35 +1801,25 @@ void Implementation::setTransceiverEnableMask(uint32_t mask) {
* * * *
* ************************************************/ * ************************************************/
void Implementation::registerCallBackStartAcquisition( void Implementation::registerCallBackStartAcquisition(
int (*func)(const std::string &, const std::string &, uint64_t, size_t, int (*func)(const startCallbackHeader, void *), void *arg) {
void *),
void *arg) {
startAcquisitionCallBack = func; startAcquisitionCallBack = func;
pStartAcquisition = arg; pStartAcquisition = arg;
} }
void Implementation::registerCallBackAcquisitionFinished(void (*func)(uint64_t, void Implementation::registerCallBackAcquisitionFinished(
void *), void (*func)(const endCallbackHeader, void *), void *arg) {
void *arg) {
acquisitionFinishedCallBack = func; acquisitionFinishedCallBack = func;
pAcquisitionFinished = arg; pAcquisitionFinished = arg;
} }
void Implementation::registerCallBackRawDataReady( void Implementation::registerCallBackRawDataReady(
void (*func)(sls_receiver_header &, char *, size_t, void *), void *arg) { void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &,
void *),
void *arg) {
rawDataReadyCallBack = func; rawDataReadyCallBack = func;
pRawDataReady = arg; pRawDataReady = arg;
for (const auto &it : dataProcessor) for (const auto &it : dataProcessor)
it->registerCallBackRawDataReady(rawDataReadyCallBack, pRawDataReady); it->registerCallBackRawDataReady(rawDataReadyCallBack, pRawDataReady);
} }
void Implementation::registerCallBackRawDataModifyReady(
void (*func)(sls_receiver_header &, char *, size_t &, void *), void *arg) {
rawDataModifyReadyCallBack = func;
pRawDataReady = arg;
for (const auto &it : dataProcessor)
it->registerCallBackRawDataModifyReady(rawDataModifyReadyCallBack,
pRawDataReady);
}
} // namespace sls } // namespace sls

View File

@ -265,21 +265,16 @@ class Implementation : private virtual slsDetectorDefs {
* * * *
* ************************************************/ * ************************************************/
/** params: file path, file name, file index, image size */ /** params: file path, file name, file index, image size */
void registerCallBackStartAcquisition(int (*func)(const std::string &, void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader,
const std::string &, void *),
uint64_t, size_t, void *),
void *arg); void *arg);
/** params: total frames caught */ /** params: total frames caught */
void registerCallBackAcquisitionFinished(void (*func)(uint64_t, void *), void registerCallBackAcquisitionFinished(
void *arg); void (*func)(const endCallbackHeader, void *), void *arg);
/** params: sls_receiver_header, pointer to data, image size */ /** params: sls_receiver_header, pointer to data, image size */
void registerCallBackRawDataReady(void (*func)(sls_receiver_header &, void registerCallBackRawDataReady(void (*func)(sls_receiver_header &,
char *, size_t, void *), dataCallbackHeader, char *,
void *arg); size_t &, void *),
/** params: sls_receiver_header, pointer to data, reference to image size */
void registerCallBackRawDataModifyReady(void (*func)(sls_receiver_header &,
char *, size_t &,
void *),
void *arg); void *arg);
private: private:
@ -379,15 +374,13 @@ class Implementation : private virtual slsDetectorDefs {
int ctbDbitOffset{0}; int ctbDbitOffset{0};
// callbacks // callbacks
int (*startAcquisitionCallBack)(const std::string &, const std::string &, int (*startAcquisitionCallBack)(const startCallbackHeader, void *){nullptr};
uint64_t, size_t, void *){nullptr};
void *pStartAcquisition{nullptr}; void *pStartAcquisition{nullptr};
void (*acquisitionFinishedCallBack)(uint64_t, void *){nullptr}; void (*acquisitionFinishedCallBack)(const endCallbackHeader,
void *){nullptr};
void *pAcquisitionFinished{nullptr}; void *pAcquisitionFinished{nullptr};
void (*rawDataReadyCallBack)(sls_receiver_header &, char *, size_t, void (*rawDataReadyCallBack)(sls_receiver_header &, dataCallbackHeader,
void *){nullptr}; char *, size_t &, void *){nullptr};
void (*rawDataModifyReadyCallBack)(sls_receiver_header &, char *, size_t &,
void *){nullptr};
void *pRawDataReady{nullptr}; void *pRawDataReady{nullptr};
// class objects // class objects

View File

@ -50,19 +50,40 @@ std::string getHelpMessage() {
* enabled) if registerCallBackRawDataReady or * enabled) if registerCallBackRawDataReady or
* registerCallBackRawDataModifyReady registered, users get data * registerCallBackRawDataModifyReady registered, users get data
*/ */
int StartAcq(const std::string &filePath, const std::string &fileName, int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader,
uint64_t fileIndex, size_t imageSize, void *objectPointer) { void *objectPointer) {
LOG(sls::logINFOBLUE) << "#### StartAcq: filePath:" << filePath LOG(sls::logINFOBLUE) << "#### Start Acquisition:"
<< " fileName:" << fileName << "\n\t["
<< " fileIndex:" << fileIndex << "\n\tUDP Port : "
<< " imageSize:" << imageSize << " ####"; << sls::ToString(callbackHeader.udpPort)
<< "\n\tDynamic Range : "
<< callbackHeader.dynamicRange
<< "\n\tDetector Shape : "
<< sls::ToString(callbackHeader.detectorShape)
<< "\n\tImage Size : " << callbackHeader.imageSize
<< "\n\tFile Path : " << callbackHeader.filePath
<< "\n\tFile Name : " << callbackHeader.fileName
<< "\n\tFile Index : " << callbackHeader.fileIndex
<< "\n\tQuad Enable : " << callbackHeader.quad
<< "\n\tAdditional Json Header : "
<< sls::ToString(callbackHeader.addJsonHeader)
<< "\n\t]";
return 0; return 0;
} }
/** Acquisition Finished Call back */ /** Acquisition Finished Call back */
void AcquisitionFinished(uint64_t framesCaught, void *objectPointer) { void AcquisitionFinished(
LOG(sls::logINFOBLUE) << "#### AcquisitionFinished: framesCaught:" const slsDetectorDefs::endCallbackHeader callbackHeader,
<< framesCaught << " ####"; void *objectPointer) {
LOG(sls::logINFOBLUE) << "#### AcquisitionFinished:"
<< "\n\t["
<< "\n\tUDP Port : "
<< sls::ToString(callbackHeader.udpPort)
<< "\n\tComplete Frames : "
<< sls::ToString(callbackHeader.completeFrames)
<< "\n\tLast Frame Index : "
<< sls::ToString(callbackHeader.lastFrameIndex)
<< "\n\t]";
} }
/** /**
@ -70,62 +91,61 @@ void AcquisitionFinished(uint64_t framesCaught, void *objectPointer) {
* Prints in different colors(for each receiver process) the different headers * Prints in different colors(for each receiver process) the different headers
* for each image call back. * for each image call back.
*/ */
void GetData(slsDetectorDefs::sls_receiver_header &header, char *dataPointer, void GetData(slsDetectorDefs::sls_receiver_header &header,
size_t imageSize, void *objectPointer) { slsDetectorDefs::dataCallbackHeader callbackHeader,
char *dataPointer, size_t &imageSize, void *objectPointer) {
slsDetectorDefs::sls_detector_header detectorHeader = header.detHeader; slsDetectorDefs::sls_detector_header detectorHeader = header.detHeader;
PRINT_IN_COLOR( PRINT_IN_COLOR(
detectorHeader.modId ? detectorHeader.modId : detectorHeader.row, (callbackHeader.udpPort % 10),
"#### %d %d GetData: ####\n" "#### GetData: "
"frameNumber: %lu\t\texpLength: %u\t\tpacketNumber: %u\t\tdetSpec1: %lu" "\n\tCallback Header: "
"\t\ttimestamp: %lu\t\tmodId: %u\t\t" "\n\t["
"row: %u\t\tcolumn: %u\t\tdetSpec2: %u\t\tdetSpec3: %u" "\n\tUDP Port: %u"
"\t\tdetSpec4: %u\t\tdetType: %u\t\tversion: %u" "\n\tShape: [%u, %u]"
//"\t\tpacketsMask:%s" "\n\tAcq Index : %lu"
"\t\tfirstbytedata: 0x%x\t\tdatsize: %zu\n\n", "\n\tFrame Index :%lu"
detectorHeader.column, detectorHeader.row, "\n\tProgress : %.2f%%"
(long unsigned int)detectorHeader.frameNumber, detectorHeader.expLength, "\n\tCompelte Image :%s"
detectorHeader.packetNumber, (long unsigned int)detectorHeader.detSpec1, "\n\tFlip Rows :%s"
(long unsigned int)detectorHeader.timestamp, detectorHeader.modId, "\n\tAdditional Json Header : %s"
detectorHeader.row, detectorHeader.column, detectorHeader.detSpec2, "\n\t]"
detectorHeader.detSpec3, detectorHeader.detSpec4, "\n\ttReceiver Header: "
detectorHeader.detType, detectorHeader.version, "\n\t["
"\n\tFrame Number : %lu"
"\n\tExposure Length :%u"
"\n\tPackets Caught :%u"
"\n\tDetector Specific 1: %lu"
"\n\tTimestamp : %lu"
"\n\tModule Id :%u"
"\n\tRow : %u"
"\n\tColumn :%u"
"\n\tDetector Specific 2 : %u"
"\n\tDetector Specific 3 : %u"
"\n\tDetector Specific 4 : %u"
"\n\tDetector Type : %s"
"\n\tVersion: %u"
"\n\t]"
"\n\tFirst Byte Data: 0x%x"
"\n\tImage Size: %zu\n\n",
callbackHeader.udpPort, callbackHeader.shape.x, callbackHeader.shape.y,
callbackHeader.acqIndex, callbackHeader.frameIndex,
callbackHeader.progress,
sls::ToString(callbackHeader.completeImage).c_str(),
sls::ToString(callbackHeader.flipRows).c_str(),
sls::ToString(callbackHeader.addJsonHeader).c_str(),
detectorHeader.frameNumber, detectorHeader.expLength,
detectorHeader.packetNumber, detectorHeader.detSpec1,
detectorHeader.timestamp, detectorHeader.modId, detectorHeader.row,
detectorHeader.column, detectorHeader.detSpec2, detectorHeader.detSpec3,
detectorHeader.detSpec4, sls::ToString(detectorHeader.detType).c_str(),
detectorHeader.version,
// header->packetsMask.to_string().c_str(), // header->packetsMask.to_string().c_str(),
((uint8_t)(*((uint8_t *)(dataPointer)))), imageSize); ((uint8_t)(*((uint8_t *)(dataPointer)))), imageSize);
}
/**
* Get Receiver Data Call back (modified)
* Prints in different colors(for each receiver process) the different headers
* for each image call back.
* @param modifiedImageSize new data size in bytes after the callback.
* This will be the size written/streamed. (only smaller value is allowed).
*/
void GetData(slsDetectorDefs::sls_receiver_header &header, char *dataPointer,
size_t &modifiedImageSize, void *objectPointer) {
slsDetectorDefs::sls_detector_header detectorHeader = header.detHeader;
PRINT_IN_COLOR(
detectorHeader.modId ? detectorHeader.modId : detectorHeader.row,
"#### %d %d GetData: ####\n"
"frameNumber: %lu\t\texpLength: %u\t\tpacketNumber: %u\t\tdetSpec1: %lu"
"\t\ttimestamp: %lu\t\tmodId: %u\t\t"
"row: %u\t\tcolumn: %u\t\tdetSpec2: %u\t\tdetSpec3: %u"
"\t\tdetSpec4: %u\t\tdetType: %u\t\tversion: %u"
//"\t\tpacketsMask:%s"
"\t\tfirstbytedata: 0x%x\t\tdatsize: %zu\n\n",
detectorHeader.column, detectorHeader.row,
(long unsigned int)detectorHeader.frameNumber, detectorHeader.expLength,
detectorHeader.packetNumber, (long unsigned int)detectorHeader.detSpec1,
(long unsigned int)detectorHeader.timestamp, detectorHeader.modId,
detectorHeader.row, detectorHeader.column, detectorHeader.detSpec2,
detectorHeader.detSpec3, detectorHeader.detSpec4,
detectorHeader.detType, detectorHeader.version,
// header->packetsMask.to_string().c_str(),
*reinterpret_cast<uint8_t *>(dataPointer), modifiedImageSize);
// if data is modified, eg ROI and size is reduced // if data is modified, eg ROI and size is reduced
modifiedImageSize = 26000; imageSize = 26000;
} }
/** /**
@ -214,8 +234,8 @@ int main(int argc, char *argv[]) {
throw; throw;
} }
/** - register callbacks. remember to set file write enable to 0 /** - register callbacks. remember to set file write enable to 0
(using the client) if we should not write files and you will write data * (using the client) if we should not write files and you will
using the callbacks */ * write data using the callbacks */
if (withCallback) { if (withCallback) {
/** - Call back for start acquisition */ /** - Call back for start acquisition */
@ -229,11 +249,7 @@ int main(int argc, char *argv[]) {
/* - Call back for raw data */ /* - Call back for raw data */
cprintf(BLUE, "Registering GetData() \n"); cprintf(BLUE, "Registering GetData() \n");
if (withCallback == 1)
receiver->registerCallBackRawDataReady(GetData, nullptr); receiver->registerCallBackRawDataReady(GetData, nullptr);
else if (withCallback == 2)
receiver->registerCallBackRawDataModifyReady(GetData,
nullptr);
} }
/** - as long as no Ctrl+C */ /** - as long as no Ctrl+C */

View File

@ -133,28 +133,21 @@ std::string Receiver::getReceiverVersion() {
return tcpipInterface->getReceiverVersion(); return tcpipInterface->getReceiverVersion();
} }
void Receiver::registerCallBackStartAcquisition(int (*func)(const std::string &, void Receiver::registerCallBackStartAcquisition(
const std::string &, int (*func)(const startCallbackHeader, void *), void *arg) {
uint64_t, size_t,
void *),
void *arg) {
tcpipInterface->registerCallBackStartAcquisition(func, arg); tcpipInterface->registerCallBackStartAcquisition(func, arg);
} }
void Receiver::registerCallBackAcquisitionFinished(void (*func)(uint64_t, void Receiver::registerCallBackAcquisitionFinished(
void *), void (*func)(const endCallbackHeader, void *), void *arg) {
void *arg) {
tcpipInterface->registerCallBackAcquisitionFinished(func, arg); tcpipInterface->registerCallBackAcquisitionFinished(func, arg);
} }
void Receiver::registerCallBackRawDataReady( void Receiver::registerCallBackRawDataReady(
void (*func)(sls_receiver_header &, char *, size_t, void *), void *arg) { void (*func)(sls_receiver_header &, const dataCallbackHeader, char *,
size_t &, void *),
void *arg) {
tcpipInterface->registerCallBackRawDataReady(func, arg); tcpipInterface->registerCallBackRawDataReady(func, arg);
} }
void Receiver::registerCallBackRawDataModifyReady(
void (*func)(sls_receiver_header &, char *, size_t &, void *), void *arg) {
tcpipInterface->registerCallBackRawDataModifyReady(func, arg);
}
} // namespace sls } // namespace sls

View File

@ -25,7 +25,9 @@
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>
#include <cstring> #include <cstring>
#include <map>
#include <string> #include <string>
#include <vector>
#else #else
// C includes // C includes
#include <stdint.h> #include <stdint.h>
@ -113,6 +115,20 @@ class slsDetectorDefs {
STOPPED STOPPED
}; };
/**
dimension indexes
*/
enum dimension { X, Y };
#ifdef __cplusplus
struct xy {
int x{0};
int y{0};
xy() = default;
xy(int x, int y) : x(x), y(y){};
} __attribute__((packed));
#endif
/** /**
@short structure for a Detector Packet or Image Header @short structure for a Detector Packet or Image Header
Details at https://slsdetectorgroup.github.io/devdoc/udpheader.html Details at https://slsdetectorgroup.github.io/devdoc/udpheader.html
@ -160,6 +176,36 @@ class slsDetectorDefs {
sls_detector_header detHeader; /**< is the detector header */ sls_detector_header detHeader; /**< is the detector header */
sls_bitset packetsMask; /**< is the packets caught bit mask */ sls_bitset packetsMask; /**< is the packets caught bit mask */
}; };
struct startCallbackHeader {
std::vector<uint32_t> udpPort;
uint32_t dynamicRange;
xy detectorShape;
size_t imageSize;
std::string filePath;
std::string fileName;
uint64_t fileIndex;
bool quad;
std::map<std::string, std::string> addJsonHeader;
};
struct endCallbackHeader {
std::vector<uint32_t> udpPort;
std::vector<uint64_t> completeFrames;
std::vector<uint64_t> lastFrameIndex;
};
struct dataCallbackHeader {
uint32_t udpPort;
xy shape;
uint64_t acqIndex;
uint64_t frameIndex;
double progress;
bool completeImage;
bool flipRows;
std::map<std::string, std::string> addJsonHeader;
};
#endif #endif
enum frameDiscardPolicy { enum frameDiscardPolicy {
NO_DISCARD, NO_DISCARD,
@ -224,20 +270,6 @@ typedef struct {
READOUT_ZMQ_ACTION READOUT_ZMQ_ACTION
}; };
/**
dimension indexes
*/
enum dimension { X, Y };
#ifdef __cplusplus
struct xy {
int x{0};
int y{0};
xy() = default;
xy(int x, int y) : x(x), y(y){};
} __attribute__((packed));
#endif
/** /**
use of the external signals use of the external signals
*/ */