Dev/frame synchronizer (#968)

* skeleton of structure for callbacks and funcitons updated

* updated callback header structures and implemented in receiver

* fixed bugs

* minor

* formatting

* wip: draft of frame synchronizer, semaphores not done yet

* signal handler not affecting semaphore inside lambda function

* finally works with sem

* install targets cmake error fix

* removed modified callback and instead passing by reference instead of value to the oriignal receiver data callback

* reducing the number of data call backs. incoming from developer

* added json header to receiver start acquiistion call back

* WIP: of synchronisation (#969)

* WIP of synchronisation

* working so far if everything goes right

* added all information into json headers

* valid json

* allow frame synchronizer to have access to static libzmq when compiling on conda (libzeromq-devel not installed by default

* upto date with multirecieverapp for invalid arguments and help

* formatting

* remove warnings

* changes to print

* removed prints

* no need for print frames to be called

* minor

* commnet

* adding json header in start callback, imagesize in data callback and formatted

* startcallback returns an unused int (changed to exceptions and forgotten in last modification to callbacks likely)

* fixed sanitizer issues. 1 left for ctrl+C
- zmq_msg_t should be deleted, not freed. Same with the char arrays and semaphores.

* fixed sanitizer issues and made it more readable

* moving clearing old frames to new startacq just in case it has to process soem frames before the callback

* fix cherry-pick merge of fixing sanitizer thread issues but has start callbacks signature change.fixed

---------

Co-authored-by: Felix Engelmann <felix-github@nlogn.org>
This commit is contained in:
maliakal_d 2025-02-18 11:28:21 +01:00 committed by GitHub
parent f1f369b48c
commit e933a25453
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 714 additions and 35 deletions

View File

@ -249,7 +249,7 @@ endif()
if(SLS_USE_SANITIZER) if(SLS_USE_SANITIZER)
target_compile_options(slsProjectOptions INTERFACE -fsanitize=address,undefined -fno-omit-frame-pointer) target_compile_options(slsProjectOptions INTERFACE -fsanitize=address,undefined -fno-omit-frame-pointer)
target_link_libraries(slsProjectOptions INTERFACE -fsanitize=address,undefined) target_link_libraries(slsProjectOptions INTERFACE -fsanitize=address,undefined)
# target_compile_options(slsProjectOptions INTERFACE -fsanitize=thread) #target_compile_options(slsProjectOptions INTERFACE -fsanitize=thread -fno-omit-frame-pointer)
#target_link_libraries(slsProjectOptions INTERFACE -fsanitize=thread) #target_link_libraries(slsProjectOptions INTERFACE -fsanitize=thread)
endif() endif()

View File

@ -0,0 +1,19 @@
# SPDX-License-Identifier: LGPL-3.0-or-other
# Copyright (C) 2021 Contributors to the SLS Detector Package
# Script to get combined zmq packets from frame synchronizer using pull zmq sockets
import json
import zmq
c = zmq.Context()
s = c.socket(zmq.PULL)
s.connect("tcp://127.0.0.1:5555")
while True:
m = s.recv_multipart()
for p in m:
if p.startswith(b"{"):
print(p.decode().strip())
else:
print("binary")
print("--------")

View File

@ -138,7 +138,28 @@ if (SLS_USE_RECEIVER_BINARIES)
slsProjectWarnings slsProjectWarnings
) )
install(TARGETS slsReceiver slsMultiReceiver add_executable(slsFrameSynchronizer
src/FrameSynchronizerApp.cpp
)
set_target_properties(slsFrameSynchronizer PROPERTIES
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
)
if((CMAKE_BUILD_TYPE STREQUAL "Release") AND SLS_LTO_AVAILABLE)
set_property(TARGET slsFrameSynchronizer PROPERTY INTERPROCEDURAL_OPTIMIZATION True)
endif()
target_link_libraries(slsFrameSynchronizer
PUBLIC
slsReceiverStatic
pthread
rt
PRIVATE
slsProjectWarnings
"$<BUILD_INTERFACE:libzmq-static>"
)
install(TARGETS slsReceiver slsMultiReceiver slsFrameSynchronizer
EXPORT "${TARGETS_EXPORT_NAME}" EXPORT "${TARGETS_EXPORT_NAME}"
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}

View File

@ -45,9 +45,8 @@ class Receiver : private virtual slsDetectorDefs {
* Call back arguments are: * Call back arguments are:
* - startCallbackHeader metadata * - startCallbackHeader metadata
*/ */
void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader, void registerCallBackStartAcquisition(
void *), void (*func)(const startCallbackHeader, void *), void *arg);
void *arg);
/** /**
* Call back for acquisition finished * Call back for acquisition finished

View File

@ -33,6 +33,8 @@ using Interface = ServerInterface;
#define gettid() syscall(SYS_gettid) #define gettid() syscall(SYS_gettid)
#endif #endif
std::mutex ClientInterface::callbackMutex;
ClientInterface::~ClientInterface() { ClientInterface::~ClientInterface() {
killTcpThread = true; killTcpThread = true;
LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber; LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber;
@ -54,13 +56,15 @@ std::string ClientInterface::getReceiverVersion() { return APIRECEIVER; }
/***callback functions***/ /***callback functions***/
void ClientInterface::registerCallBackStartAcquisition( void ClientInterface::registerCallBackStartAcquisition(
int (*func)(const startCallbackHeader, void *), void *arg) { void (*func)(const startCallbackHeader, void *), void *arg) {
std::lock_guard<std::mutex> lock(callbackMutex);
startAcquisitionCallBack = func; startAcquisitionCallBack = func;
pStartAcquisition = arg; pStartAcquisition = arg;
} }
void ClientInterface::registerCallBackAcquisitionFinished( void ClientInterface::registerCallBackAcquisitionFinished(
void (*func)(const endCallbackHeader, void *), void *arg) { void (*func)(const endCallbackHeader, void *), void *arg) {
std::lock_guard<std::mutex> lock(callbackMutex);
acquisitionFinishedCallBack = func; acquisitionFinishedCallBack = func;
pAcquisitionFinished = arg; pAcquisitionFinished = arg;
} }
@ -69,6 +73,7 @@ void ClientInterface::registerCallBackRawDataReady(
void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &, void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &,
void *), void *),
void *arg) { void *arg) {
std::lock_guard<std::mutex> lock(callbackMutex);
rawDataReadyCallBack = func; rawDataReadyCallBack = func;
pRawDataReady = arg; pRawDataReady = arg;
} }
@ -461,15 +466,18 @@ void ClientInterface::setDetectorType(detectorType arg) {
std::string(e.what()) + ']'); std::string(e.what()) + ']');
} }
// callbacks after (in setdetectortype, the object is reinitialized) // callbacks after (in setdetectortype, the object is reinitialized)
{
std::lock_guard<std::mutex> lock(callbackMutex);
if (startAcquisitionCallBack != nullptr) if (startAcquisitionCallBack != nullptr)
impl()->registerCallBackStartAcquisition(startAcquisitionCallBack, impl()->registerCallBackStartAcquisition(startAcquisitionCallBack,
pStartAcquisition); pStartAcquisition);
if (acquisitionFinishedCallBack != nullptr) if (acquisitionFinishedCallBack != nullptr)
impl()->registerCallBackAcquisitionFinished(acquisitionFinishedCallBack, impl()->registerCallBackAcquisitionFinished(
pAcquisitionFinished); acquisitionFinishedCallBack, pAcquisitionFinished);
if (rawDataReadyCallBack != nullptr) if (rawDataReadyCallBack != nullptr)
impl()->registerCallBackRawDataReady(rawDataReadyCallBack, impl()->registerCallBackRawDataReady(rawDataReadyCallBack,
pRawDataReady); pRawDataReady);
}
impl()->setThreadIds(parentThreadId, tcpThreadId); impl()->setThreadIds(parentThreadId, tcpThreadId);
} }

View File

@ -34,9 +34,8 @@ class ClientInterface : private virtual slsDetectorDefs {
//***callback functions*** //***callback functions***
/** params: file path, file name, file index, image size */ /** params: file path, file name, file index, image size */
void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader, void registerCallBackStartAcquisition(
void *), void (*func)(const startCallbackHeader, void *), void *arg);
void *arg);
/** params: total frames caught */ /** params: total frames caught */
void registerCallBackAcquisitionFinished( void registerCallBackAcquisitionFinished(
@ -180,7 +179,7 @@ class ClientInterface : private virtual slsDetectorDefs {
//***callback parameters*** //***callback parameters***
int (*startAcquisitionCallBack)(const startCallbackHeader, void (*startAcquisitionCallBack)(const startCallbackHeader,
void *) = nullptr; void *) = nullptr;
void *pStartAcquisition{nullptr}; void *pStartAcquisition{nullptr};
void (*acquisitionFinishedCallBack)(const endCallbackHeader, void (*acquisitionFinishedCallBack)(const endCallbackHeader,
@ -194,6 +193,8 @@ class ClientInterface : private virtual slsDetectorDefs {
pid_t tcpThreadId{0}; pid_t tcpThreadId{0};
std::vector<std::string> udpips = std::vector<std::string> udpips =
std::vector<std::string>(MAX_NUMBER_OF_LISTENING_THREADS); std::vector<std::string>(MAX_NUMBER_OF_LISTENING_THREADS);
// necessary if Receiver objects using threads with callbacks
static std::mutex callbackMutex;
}; };
} // namespace sls } // namespace sls

View File

@ -0,0 +1,621 @@
// SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package
/* Creates the slsFrameSynchronizer for running multiple receivers in different
* threads form a single binary that will spit out zmq streams without
* reconstructing image. Sample python script for pull socket for this combiner
* in python/scripts folder. TODO: Not handling empty frames from one socket
*/
#include "sls/Receiver.h"
#include "sls/ToString.h"
#include "sls/container_utils.h"
#include "sls/logger.h"
#include "sls/sls_detector_defs.h"
#include <csignal> //SIGINT
#include <cstdio>
#include <cstring>
#include <iostream>
#include <mutex>
#include <ostream>
#include <semaphore.h>
#include <sys/socket.h>
#include <sys/wait.h> //wait
#include <thread>
#include <unistd.h>
#include <set>
#include <vector>
#include <zmq.h>
std::vector<std::thread> threads;
std::vector<sem_t *> semaphores;
sls::TLogLevel printHeadersLevel = sls::logDEBUG;
/** Define Colors to print data call back in different colors for different
* recievers */
#define PRINT_IN_COLOR(c, f, ...) \
printf("\033[%dm" f RESET, 30 + c + 1, ##__VA_ARGS__)
/** Structure handling different threads */
using ZmqMsgList = std::vector<zmq_msg_t *>;
using FrameMap = std::map<uint64_t, ZmqMsgList>;
using PortFrameMap = std::map<uint16_t, FrameMap>;
struct FrameStatus {
bool starting = true;
bool terminate = false;
int num_receivers;
sem_t available;
std::mutex mtx;
ZmqMsgList headers;
PortFrameMap frames;
ZmqMsgList ends;
FrameStatus(bool start, bool term, int num_recv)
: starting(start), terminate(term), num_receivers(num_recv) {
sem_init(&available, 0, 0);
}
};
FrameStatus *global_frame_status = nullptr;
/**
* Control+C Interrupt Handler
* to let all the processes know to exit properly
*/
void sigInterruptHandler(int p) {
for (size_t i = 0; i != semaphores.size(); ++i) {
sem_post(semaphores[i]);
}
}
void cleanup() {
if (global_frame_status) {
std::lock_guard<std::mutex> lock(global_frame_status->mtx);
for (auto &outer_pair : global_frame_status->frames) {
for (auto &inner_pair : outer_pair.second) {
for (zmq_msg_t *msg : inner_pair.second) {
if (msg) {
zmq_msg_close(msg);
delete msg;
}
}
inner_pair.second.clear();
}
outer_pair.second.clear();
}
global_frame_status->frames.clear();
}
}
/**
* prints usage of this example program
*/
std::string getHelpMessage() {
std::ostringstream os;
os << "\nUsage:\n"
"./slsFrameSynchronizer [start tcp port] [num recevers] [print "
"callback headers (optional)]\n"
<< "\t - tcp port has to be non-zero and 16 bit\n"
<< "\t - print callback headers option is 0 (disabled) by default\n";
return os.str();
}
void zmq_free(void *data, void *hint) { delete[] static_cast<char *>(data); }
void print_frames(const PortFrameMap &frame_port_map) {
LOG(sls::logDEBUG) << "Printing frames";
for (const auto &it : frame_port_map) {
uint16_t udpPort = it.first;
const auto &frame_map = it.second;
LOG(sls::logDEBUG) << "UDP port: " << udpPort;
for (const auto &frame : frame_map) {
uint64_t fnum = frame.first;
const auto &msg_list = frame.second;
LOG(sls::logDEBUG)
<< " acq index: " << fnum << '[' << msg_list.size() << ']';
}
}
}
/** Valid frame numbers mean they exist across all ports or
* has at least a larger fnum in the port with the missing fnum **/
std::set<uint64_t> get_valid_fnums(const PortFrameMap &port_frame_map) {
// empty list
std::set<uint64_t> valid_fnums;
if (port_frame_map.empty()) {
return valid_fnums;
}
// collect all unique frame numbers from all ports
std::set<uint64_t> unique_fnums;
for (auto it = port_frame_map.begin(); it != port_frame_map.begin(); ++it) {
const FrameMap &frame_map = it->second;
for (auto frame = frame_map.begin(); frame != frame_map.end();
++frame) {
unique_fnums.insert(frame->first);
}
}
// collect valid frame numbers
for (auto &fnum : unique_fnums) {
bool is_valid = true;
for (auto it = port_frame_map.begin(); it != port_frame_map.end();
++it) {
uint16_t port = it->first;
const FrameMap &frame_map = it->second;
auto frame = frame_map.find(fnum);
// invalid: fnum missing in one port
if (frame == frame_map.end()) {
LOG(sls::logDEBUG)
<< "Fnum " << fnum << " is missing in port " << port;
// invalid: fnum greater than all in that port
auto last_frame = std::prev(frame_map.end());
auto last_fnum = last_frame->first;
if (fnum > last_fnum) {
LOG(sls::logDEBUG) << "And no larger fnum found. Fnum "
<< fnum << " is invalid.\n";
is_valid = false;
break;
}
}
}
if (is_valid) {
valid_fnums.insert(fnum);
}
}
return valid_fnums;
}
int zmq_send_multipart(void *socket, const ZmqMsgList &messages) {
size_t num_messages = messages.size();
for (size_t i = 0; i != num_messages; ++i) {
zmq_msg_t *msg = messages[i];
// determine flags: ZMQ_SNDMORE for all messages except the last
int flags = (i == num_messages - 1) ? 0 : ZMQ_SNDMORE;
if (zmq_msg_send(msg, socket, flags) == -1) {
LOG(sls::logERROR)
<< "Error sending message: " << zmq_strerror(zmq_errno());
return slsDetectorDefs::FAIL;
}
}
return slsDetectorDefs::OK;
}
void Correlate(FrameStatus *stat) {
void *context = zmq_ctx_new();
void *socket = zmq_socket(context, ZMQ_PUSH);
int rc = zmq_bind(socket, "tcp://*:5555");
if (rc != 0) {
LOG(sls::logERROR) << "failed to bind";
}
while (true) {
sem_wait(&(stat->available));
{
std::lock_guard<std::mutex> lock(stat->mtx);
if (stat->terminate) {
break;
}
if (stat->starting) {
// sending all start packets
if (static_cast<int>(stat->headers.size()) ==
stat->num_receivers) {
stat->starting = false;
// clean up
zmq_send_multipart(socket, stat->headers);
for (zmq_msg_t *msg : stat->headers) {
if (msg) {
zmq_msg_close(msg);
delete msg;
}
}
stat->headers.clear();
}
} else {
// print_frames(stat->frames);
auto valid_fnums = get_valid_fnums(stat->frames);
// sending all valid fnum data packets
for (const auto &fnum : valid_fnums) {
ZmqMsgList msg_list;
PortFrameMap &port_frame_map = stat->frames;
for (auto it = port_frame_map.begin();
it != port_frame_map.end(); ++it) {
uint16_t port = it->first;
const FrameMap &frame_map = it->second;
auto frame = frame_map.find(fnum);
if (frame != frame_map.end()) {
msg_list.insert(msg_list.end(),
stat->frames[port][fnum].begin(),
stat->frames[port][fnum].end());
// clean up
for (zmq_msg_t *msg : stat->frames[port][fnum]) {
if (msg) {
zmq_msg_close(msg);
delete msg;
}
}
stat->frames[port].erase(fnum);
}
}
LOG(printHeadersLevel)
<< "Sending data packets for fnum " << fnum;
zmq_send_multipart(socket, msg_list);
}
}
// sending all end packets
if (static_cast<int>(stat->ends.size()) == stat->num_receivers) {
zmq_send_multipart(socket, stat->ends);
// clean up
for (zmq_msg_t *msg : stat->ends) {
if (msg) {
zmq_msg_close(msg);
delete msg;
}
}
stat->ends.clear();
}
}
}
zmq_close(socket);
zmq_ctx_destroy(context);
}
void StartAcquisitionCallback(
const slsDetectorDefs::startCallbackHeader callbackHeader,
void *objectPointer) {
LOG(printHeadersLevel)
<< "Start Acquisition:"
<< "\n\t["
<< "\n\tUDP Port : " << sls::ToString(callbackHeader.udpPort)
<< "\n\tDynamic Range : " << callbackHeader.dynamicRange
<< "\n\tDetector Shape : "
<< sls::ToString(callbackHeader.detectorShape)
<< "\n\tImage Size : " << callbackHeader.imageSize
<< "\n\tFile Path : " << callbackHeader.filePath
<< "\n\tFile Name : " << callbackHeader.fileName
<< "\n\tFile Index : " << callbackHeader.fileIndex
<< "\n\tQuad Enable : " << callbackHeader.quad
<< "\n\tAdditional Json Header : "
<< sls::ToString(callbackHeader.addJsonHeader).c_str() << "\n\t]";
std::ostringstream oss;
oss << "{\"htype\":\"header\""
<< ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort)
<< ", \"bitmode\":" << callbackHeader.dynamicRange
<< ", \"filePath\":\"" << callbackHeader.filePath
<< "\", \"fileName\":\"" << callbackHeader.fileName
<< "\", \"fileIndex\":" << callbackHeader.fileIndex
<< ", \"detshape\":" << sls::ToString(callbackHeader.detectorShape)
<< ", \"size\":" << callbackHeader.imageSize
<< ", \"quad\":" << (callbackHeader.quad ? 1 : 0);
if (!callbackHeader.addJsonHeader.empty()) {
oss << ", \"addJsonHeader\": {";
for (auto it = callbackHeader.addJsonHeader.begin();
it != callbackHeader.addJsonHeader.end(); ++it) {
if (it != callbackHeader.addJsonHeader.begin()) {
oss << ", ";
}
oss << "\"" << it->first.c_str() << "\":\"" << it->second.c_str()
<< "\"";
}
oss << " } ";
}
oss << "}\n";
std::string message = oss.str();
LOG(sls::logDEBUG) << "Start Acquisition message:" << std::endl << message;
int length = message.length();
char *hdata = new char[length];
memcpy(hdata, message.c_str(), length);
zmq_msg_t *hmsg = new zmq_msg_t;
zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL);
// push zmq msg into stat to be processed
FrameStatus *stat = static_cast<FrameStatus *>(objectPointer);
{
std::lock_guard<std::mutex> lock(stat->mtx);
stat->headers.push_back(hmsg);
stat->starting = true;
// clean up old frames
for (int port : callbackHeader.udpPort) {
for (auto &frame_map : stat->frames[port]) {
for (zmq_msg_t *msg : frame_map.second) {
if (msg) {
zmq_msg_close(msg);
delete msg;
}
}
frame_map.second.clear();
}
stat->frames[port].clear();
}
}
sem_post(&stat->available);
}
void AcquisitionFinishedCallback(
const slsDetectorDefs::endCallbackHeader callbackHeader,
void *objectPointer) {
LOG(printHeadersLevel) << "Acquisition Finished:"
<< "\n\t["
<< "\n\tUDP Port : "
<< sls::ToString(callbackHeader.udpPort)
<< "\n\tComplete Frames : "
<< sls::ToString(callbackHeader.completeFrames)
<< "\n\tLast Frame Index : "
<< sls::ToString(callbackHeader.lastFrameIndex)
<< "\n\t]";
std::ostringstream oss;
oss << "{\"htype\":\"series_end\""
<< ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort)
<< ", \"comleteFrames\":"
<< sls::ToString(callbackHeader.completeFrames)
<< ", \"lastFrameIndex\":"
<< sls::ToString(callbackHeader.lastFrameIndex) << "}\n";
std::string message = oss.str();
int length = message.length();
LOG(sls::logDEBUG) << "Acquisition Finished message:" << std::endl
<< message;
char *hdata = new char[length];
memcpy(hdata, message.c_str(), length);
zmq_msg_t *hmsg = new zmq_msg_t;
zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL);
// push zmq msg into stat to be processed
FrameStatus *stat = static_cast<FrameStatus *>(objectPointer);
{
std::lock_guard<std::mutex> lock(stat->mtx);
stat->ends.push_back(hmsg);
}
sem_post(&stat->available);
}
void GetDataCallback(slsDetectorDefs::sls_receiver_header &header,
slsDetectorDefs::dataCallbackHeader callbackHeader,
char *dataPointer, size_t &imageSize,
void *objectPointer) {
slsDetectorDefs::sls_detector_header detectorHeader = header.detHeader;
if (printHeadersLevel < sls::logDEBUG) {
// print in different color for each udp port
PRINT_IN_COLOR((callbackHeader.udpPort % 10),
"Data: "
"\n\tCallback Header: "
"\n\t["
"\n\tUDP Port: %u"
"\n\tShape: [%u, %u]"
"\n\tAcq Index : %lu"
"\n\tFrame Index :%lu"
"\n\tProgress : %.2f%%"
"\n\tComplete Image :%s"
"\n\tFlip Rows :%s"
"\n\tAdditional Json Header : %s"
"\n\t]"
"\n\ttReceiver Header: "
"\n\t["
"\n\tFrame Number : %lu"
"\n\tExposure Length :%u"
"\n\tPackets Caught :%u"
"\n\tDetector Specific 1: %lu"
"\n\tTimestamp : %lu"
"\n\tModule Id :%u"
"\n\tRow : %u"
"\n\tColumn :%u"
"\n\tDetector Specific 2 : %u"
"\n\tDetector Specific 3 : %u"
"\n\tDetector Specific 4 : %u"
"\n\tDetector Type : %s"
"\n\tVersion: %u"
"\n\t]"
"\n\tFirst Byte Data: 0x%x"
"\n\tImage Size: %zu\n\n",
callbackHeader.udpPort, callbackHeader.shape.x,
callbackHeader.shape.y, callbackHeader.acqIndex,
callbackHeader.frameIndex, callbackHeader.progress,
sls::ToString(callbackHeader.completeImage).c_str(),
sls::ToString(callbackHeader.flipRows).c_str(),
sls::ToString(callbackHeader.addJsonHeader).c_str(),
detectorHeader.frameNumber, detectorHeader.expLength,
detectorHeader.packetNumber, detectorHeader.detSpec1,
detectorHeader.timestamp, detectorHeader.modId,
detectorHeader.row, detectorHeader.column,
detectorHeader.detSpec2, detectorHeader.detSpec3,
detectorHeader.detSpec4,
sls::ToString(detectorHeader.detType).c_str(),
detectorHeader.version,
// header->packetsMask.to_string().c_str(),
((uint8_t)(*((uint8_t *)(dataPointer)))), imageSize);
}
std::ostringstream oss;
oss << "{\"htype\":\"module\""
<< ", \"port\":" << callbackHeader.udpPort
<< ", \"shape\":" << sls::ToString(callbackHeader.shape)
<< ", \"acqIndex\":" << callbackHeader.acqIndex
<< ", \"frameIndex\":" << callbackHeader.frameIndex
<< ", \"flipRows\":" << (callbackHeader.flipRows ? 1 : 0)
<< ", \"progress\":" << callbackHeader.progress
<< ", \"completeImage\":" << (callbackHeader.completeImage ? 1 : 0)
<< ", \"imageSize\":" << imageSize
<< ", \"frameNumber\":" << detectorHeader.frameNumber
<< ", \"expLength\":" << detectorHeader.expLength
<< ", \"packetNumber\":" << detectorHeader.packetNumber
<< ", \"detSpec1\":" << detectorHeader.detSpec1
<< ", \"timestamp\":" << detectorHeader.timestamp
<< ", \"modId\":" << detectorHeader.modId
<< ", \"row\":" << detectorHeader.row
<< ", \"column\":" << detectorHeader.column
<< ", \"detSpec2\":" << detectorHeader.detSpec2
<< ", \"detSpec3\":" << detectorHeader.detSpec3
<< ", \"detSpec4\":" << detectorHeader.detSpec4
<< ", \"detType\":" << static_cast<int>(detectorHeader.detType)
<< ", \"version\":" << static_cast<int>(detectorHeader.version);
if (!callbackHeader.addJsonHeader.empty()) {
oss << ", \"addJsonHeader\": {";
for (auto it = callbackHeader.addJsonHeader.begin();
it != callbackHeader.addJsonHeader.end(); ++it) {
if (it != callbackHeader.addJsonHeader.begin()) {
oss << ", ";
}
oss << "\"" << it->first.c_str() << "\":\"" << it->second.c_str()
<< "\"";
}
oss << " } ";
}
oss << "}\n";
std::string message = oss.str();
LOG(sls::logDEBUG) << "Data message:" << std::endl << message;
// creating header part of data packet
int length = message.length();
char *hdata = new char[length];
memcpy(hdata, message.c_str(), length);
zmq_msg_t *hmsg = new zmq_msg_t;
zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL);
// created data part of data packet
char *data = new char[imageSize];
zmq_msg_t *msg = new zmq_msg_t;
zmq_msg_init_data(msg, data, imageSize, zmq_free, NULL);
// push both parts into stat to be processed
FrameStatus *stat = static_cast<FrameStatus *>(objectPointer);
{
std::lock_guard<std::mutex> lock(stat->mtx);
stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber]
.push_back(hmsg);
stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber]
.push_back(msg);
}
sem_post(&stat->available);
}
/**
* Example of main program using the Receiver class
*
* - Defines in file for:
* - Default Number of receivers is 1
* - Default Start TCP port is 1954
*/
int main(int argc, char *argv[]) {
/** - set default values */
int numReceivers = 1;
uint16_t startTCPPort = DEFAULT_TCP_RX_PORTNO;
bool printHeaders = false;
/** - get number of receivers and start tcp port from command line
* arguments */
if (argc > 1) {
try {
if (argc == 3 || argc == 4) {
startTCPPort = sls::StringTo<uint16_t>(argv[1]);
if (startTCPPort == 0) {
throw std::runtime_error("Invalid start tcp port");
}
numReceivers = std::stoi(argv[2]);
if (numReceivers > 1024) {
cprintf(RED,
"Did you mix up the order of the arguments?\n%s\n",
getHelpMessage().c_str());
return EXIT_FAILURE;
}
if (numReceivers == 0) {
cprintf(RED, "Invalid number of receivers.\n%s\n",
getHelpMessage().c_str());
return EXIT_FAILURE;
}
if (argc == 4) {
printHeaders = sls::StringTo<bool>(argv[3]);
if (printHeaders) {
printHeadersLevel = sls::logINFOBLUE;
}
}
} else
throw std::runtime_error("Invalid number of arguments");
} catch (const std::exception &e) {
cprintf(RED, "Error: %s\n%s\n", e.what(), getHelpMessage().c_str());
return EXIT_FAILURE;
}
}
cprintf(RESET, "Number of Receivers: %d\n", numReceivers);
cprintf(RESET, "Start TCP Port: %hu\n", startTCPPort);
cprintf(RESET, "Print Callback Headers: %s\n\n",
(printHeaders ? "Enabled" : "Disabled"));
/** - Catch signal SIGINT to close files and call destructors properly */
struct sigaction sa;
sa.sa_flags = 0; // no flags
sa.sa_handler = sigInterruptHandler; // handler function
sigemptyset(&sa.sa_mask); // dont block additional signals during invocation
// of handler
if (sigaction(SIGINT, &sa, nullptr) == -1) {
cprintf(RED, "Could not set handler function for SIGINT\n");
}
/** - Ignore SIG_PIPE, prevents global signal handler, handle locally,
instead of a server crashing due to client crash when writing, it just
gives error */
struct sigaction asa;
asa.sa_flags = 0; // no flags
asa.sa_handler = SIG_IGN; // handler function
sigemptyset(&asa.sa_mask); // dont block additional signals during
// invocation of handler
if (sigaction(SIGPIPE, &asa, nullptr) == -1) {
cprintf(RED, "Could not set handler function for SIGPIPE\n");
}
FrameStatus stat{true, false, numReceivers};
// store pointer for signal handler
global_frame_status = &stat;
// thread synchronizing all packets
void *user_data = static_cast<void *>(&stat);
std::thread combinerThread(Correlate, &stat);
for (int i = 0; i != numReceivers; ++i) {
sem_t *semaphore = new sem_t;
sem_init(semaphore, 1, 0);
semaphores.push_back(semaphore);
uint16_t port = startTCPPort + i;
threads.emplace_back([i, semaphore, port, user_data]() {
sls::Receiver receiver(port);
receiver.registerCallBackStartAcquisition(StartAcquisitionCallback,
user_data);
receiver.registerCallBackAcquisitionFinished(
AcquisitionFinishedCallback, user_data);
receiver.registerCallBackRawDataReady(GetDataCallback, user_data);
/** - as long as no Ctrl+C */
sem_wait(semaphore);
sem_destroy(semaphore);
delete semaphore;
// clean up frames
if (i == 0)
cleanup();
});
}
for (auto &thread : threads) {
thread.join();
}
{
std::lock_guard<std::mutex> lock(stat.mtx);
stat.terminate = true;
sem_post(&stat.available);
}
combinerThread.join();
sem_destroy(&stat.available);
LOG(sls::logINFOBLUE) << "Goodbye!";
return EXIT_SUCCESS;
}

View File

@ -1802,7 +1802,7 @@ void Implementation::setTransceiverEnableMask(uint32_t mask) {
* * * *
* ************************************************/ * ************************************************/
void Implementation::registerCallBackStartAcquisition( void Implementation::registerCallBackStartAcquisition(
int (*func)(const startCallbackHeader, void *), void *arg) { void (*func)(const startCallbackHeader, void *), void *arg) {
startAcquisitionCallBack = func; startAcquisitionCallBack = func;
pStartAcquisition = arg; pStartAcquisition = arg;
} }

View File

@ -265,9 +265,8 @@ class Implementation : private virtual slsDetectorDefs {
* * * *
* ************************************************/ * ************************************************/
/** params: file path, file name, file index, image size */ /** params: file path, file name, file index, image size */
void registerCallBackStartAcquisition(int (*func)(const startCallbackHeader, void registerCallBackStartAcquisition(
void *), void (*func)(const startCallbackHeader, void *), void *arg);
void *arg);
/** params: total frames caught */ /** params: total frames caught */
void registerCallBackAcquisitionFinished( void registerCallBackAcquisitionFinished(
void (*func)(const endCallbackHeader, void *), void *arg); void (*func)(const endCallbackHeader, void *), void *arg);
@ -374,7 +373,8 @@ class Implementation : private virtual slsDetectorDefs {
int ctbDbitOffset{0}; int ctbDbitOffset{0};
// callbacks // callbacks
int (*startAcquisitionCallBack)(const startCallbackHeader, void *){nullptr}; void (*startAcquisitionCallBack)(const startCallbackHeader,
void *){nullptr};
void *pStartAcquisition{nullptr}; void *pStartAcquisition{nullptr};
void (*acquisitionFinishedCallBack)(const endCallbackHeader, void (*acquisitionFinishedCallBack)(const endCallbackHeader,
void *){nullptr}; void *){nullptr};

View File

@ -53,7 +53,7 @@ std::string getHelpMessage() {
* enabled) if registerCallBackRawDataReady or * enabled) if registerCallBackRawDataReady or
* registerCallBackRawDataModifyReady registered, users get data * registerCallBackRawDataModifyReady registered, users get data
*/ */
int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader, void StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader,
void *objectPointer) { void *objectPointer) {
LOG(sls::logINFOBLUE) << "#### Start Acquisition:" LOG(sls::logINFOBLUE) << "#### Start Acquisition:"
<< "\n\t[" << "\n\t["
@ -71,7 +71,6 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader,
<< "\n\tAdditional Json Header : " << "\n\tAdditional Json Header : "
<< sls::ToString(callbackHeader.addJsonHeader) << sls::ToString(callbackHeader.addJsonHeader)
<< "\n\t]"; << "\n\t]";
return 0;
} }
/** Acquisition Finished Call back */ /** Acquisition Finished Call back */

View File

@ -134,7 +134,7 @@ std::string Receiver::getReceiverVersion() {
} }
void Receiver::registerCallBackStartAcquisition( void Receiver::registerCallBackStartAcquisition(
int (*func)(const startCallbackHeader, void *), void *arg) { void (*func)(const startCallbackHeader, void *), void *arg) {
tcpipInterface->registerCallBackStartAcquisition(func, arg); tcpipInterface->registerCallBackStartAcquisition(func, arg);
} }

View File

@ -16,6 +16,8 @@ enum TLogLevel {
logINFOBLUE, logINFOBLUE,
logINFOGREEN, logINFOGREEN,
logINFORED, logINFORED,
logINFOCYAN,
logINFOMAGENTA,
logINFO, logINFO,
logDEBUG, logDEBUG,
logDEBUG1, logDEBUG1,
@ -60,16 +62,25 @@ class Logger {
// Danger this buffer need as many elements as TLogLevel // Danger this buffer need as many elements as TLogLevel
static const char *Color(TLogLevel level) noexcept { static const char *Color(TLogLevel level) noexcept {
static const char *const colors[] = { static const char *const colors[] = {
RED BOLD, YELLOW BOLD, BLUE, GREEN, RED, RESET, RED BOLD, YELLOW BOLD, BLUE, GREEN, RED, CYAN, MAGENTA,
RESET, RESET, RESET, RESET, RESET, RESET}; RESET, RESET, RESET, RESET, RESET, RESET, RESET};
// out of bounds
if (level < 0 || level >= sizeof(colors) / sizeof(colors[0])) {
return RESET;
}
return colors[level]; return colors[level];
} }
// Danger this buffer need as many elements as TLogLevel // Danger this buffer need as many elements as TLogLevel
static std::string ToString(TLogLevel level) { static std::string ToString(TLogLevel level) {
static const char *const buffer[] = { static const char *const buffer[] = {
"ERROR", "WARNING", "INFO", "INFO", "INFO", "INFO", "ERROR", "WARNING", "INFO", "INFO", "INFO",
"DEBUG", "DEBUG1", "DEBUG2", "DEBUG3", "DEBUG4", "DEBUG5"}; "INFO", "INFO", "INFO", "DEBUG", "DEBUG1",
"DEBUG2", "DEBUG3", "DEBUG4", "DEBUG5"};
// out of bounds
if (level < 0 || level >= sizeof(buffer) / sizeof(buffer[0])) {
return "UNKNOWN";
}
return buffer[level]; return buffer[level];
} }