From 7494e418629543c60bdee3c4151c6167c3a553a0 Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Tue, 18 Feb 2025 11:19:00 +0100 Subject: [PATCH] 910/frame synchronizer (#1086) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wip: draft of frame synchronizer, semaphores not done yet * signal handler not affecting semaphore inside lambda function * finally works with sem * install targets cmake error fix * reducing the number of data call backs. incoming from developer * WIP: of synchronisation (#969) * WIP of synchronisation * working so far if everything goes right * added all information into json headers * valid json * allow frame synchronizer to have access to static libzmq when compiling on conda (libzeromq-devel not installed by default * upto date with multirecieverapp for invalid arguments and help * formatting * remove warnings * changes to print * removed prints * no need for print frames to be called * minor * commnet * adding json header in start callback, imagesize in data callback and formatted * fixed sanitizer issues. 1 left for ctrl+C - zmq_msg_t should be deleted, not freed. Same with the char arrays and semaphores. * fixed sanitizer issues and made it more readable * moving clearing old frames to new startacq just in case it has to process soem frames before the callback * created a callback mutex to register the callbacks when using threads for different Receiver objects instead of child processes, remove the clean up of the frames (deleting from a signal is thread unsafe) from the siginterrupt handler, reading or setting terminate should also be inside the mutex, pass receiver object index so that only the first oneĀ  cleans up the shared structure --------- Co-authored-by: Felix Engelmann --- CMakeLists.txt | 4 +- python/scripts/frameSynchronizerPullSocket.py | 19 + slsReceiverSoftware/CMakeLists.txt | 25 +- slsReceiverSoftware/src/ClientInterface.cpp | 26 +- slsReceiverSoftware/src/ClientInterface.h | 2 + .../src/FrameSynchronizerApp.cpp | 622 ++++++++++++++++++ slsSupportLib/include/sls/logger.h | 19 +- 7 files changed, 700 insertions(+), 17 deletions(-) create mode 100644 python/scripts/frameSynchronizerPullSocket.py create mode 100644 slsReceiverSoftware/src/FrameSynchronizerApp.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1313c48bb..5676ec92d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -249,8 +249,8 @@ endif() if(SLS_USE_SANITIZER) target_compile_options(slsProjectOptions INTERFACE -fsanitize=address,undefined -fno-omit-frame-pointer) target_link_libraries(slsProjectOptions INTERFACE -fsanitize=address,undefined) - # target_compile_options(slsProjectOptions INTERFACE -fsanitize=thread) - # target_link_libraries(slsProjectOptions INTERFACE -fsanitize=thread) + #target_compile_options(slsProjectOptions INTERFACE -fsanitize=thread -fno-omit-frame-pointer) + #target_link_libraries(slsProjectOptions INTERFACE -fsanitize=thread) endif() diff --git a/python/scripts/frameSynchronizerPullSocket.py b/python/scripts/frameSynchronizerPullSocket.py new file mode 100644 index 000000000..4c13815ab --- /dev/null +++ b/python/scripts/frameSynchronizerPullSocket.py @@ -0,0 +1,19 @@ +# SPDX-License-Identifier: LGPL-3.0-or-other +# Copyright (C) 2021 Contributors to the SLS Detector Package +# Script to get combined zmq packets from frame synchronizer using pull zmq sockets +import json +import zmq + +c = zmq.Context() +s = c.socket(zmq.PULL) +s.connect("tcp://127.0.0.1:5555") + +while True: + m = s.recv_multipart() + for p in m: + if p.startswith(b"{"): + print(p.decode().strip()) + else: + print("binary") + print("--------") + diff --git a/slsReceiverSoftware/CMakeLists.txt b/slsReceiverSoftware/CMakeLists.txt index 8ff2b1e6b..543ff7467 100755 --- a/slsReceiverSoftware/CMakeLists.txt +++ b/slsReceiverSoftware/CMakeLists.txt @@ -138,13 +138,34 @@ if (SLS_USE_RECEIVER_BINARIES) slsProjectWarnings ) - install(TARGETS slsReceiver slsMultiReceiver + add_executable(slsFrameSynchronizer + src/FrameSynchronizerApp.cpp + ) + + set_target_properties(slsFrameSynchronizer PROPERTIES + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin + ) + if((CMAKE_BUILD_TYPE STREQUAL "Release") AND SLS_LTO_AVAILABLE) + set_property(TARGET slsFrameSynchronizer PROPERTY INTERPROCEDURAL_OPTIMIZATION True) + endif() + + target_link_libraries(slsFrameSynchronizer + PUBLIC + slsReceiverStatic + pthread + rt + PRIVATE + slsProjectWarnings + "$" + ) + + install(TARGETS slsReceiver slsMultiReceiver slsFrameSynchronizer EXPORT "${TARGETS_EXPORT_NAME}" RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/sls -) + ) endif(SLS_USE_RECEIVER_BINARIES) diff --git a/slsReceiverSoftware/src/ClientInterface.cpp b/slsReceiverSoftware/src/ClientInterface.cpp index ce8702151..4332970a5 100644 --- a/slsReceiverSoftware/src/ClientInterface.cpp +++ b/slsReceiverSoftware/src/ClientInterface.cpp @@ -33,6 +33,8 @@ using Interface = ServerInterface; #define gettid() syscall(SYS_gettid) #endif +std::mutex ClientInterface::callbackMutex; + ClientInterface::~ClientInterface() { killTcpThread = true; LOG(logINFO) << "Shutting down TCP Socket on port " << portNumber; @@ -55,12 +57,14 @@ std::string ClientInterface::getReceiverVersion() { return APIRECEIVER; } /***callback functions***/ void ClientInterface::registerCallBackStartAcquisition( int (*func)(const startCallbackHeader, void *), void *arg) { + std::lock_guard lock(callbackMutex); startAcquisitionCallBack = func; pStartAcquisition = arg; } void ClientInterface::registerCallBackAcquisitionFinished( void (*func)(const endCallbackHeader, void *), void *arg) { + std::lock_guard lock(callbackMutex); acquisitionFinishedCallBack = func; pAcquisitionFinished = arg; } @@ -69,6 +73,7 @@ void ClientInterface::registerCallBackRawDataReady( void (*func)(sls_receiver_header &, dataCallbackHeader, char *, size_t &, void *), void *arg) { + std::lock_guard lock(callbackMutex); rawDataReadyCallBack = func; pRawDataReady = arg; } @@ -461,15 +466,18 @@ void ClientInterface::setDetectorType(detectorType arg) { std::string(e.what()) + ']'); } // callbacks after (in setdetectortype, the object is reinitialized) - if (startAcquisitionCallBack != nullptr) - impl()->registerCallBackStartAcquisition(startAcquisitionCallBack, - pStartAcquisition); - if (acquisitionFinishedCallBack != nullptr) - impl()->registerCallBackAcquisitionFinished(acquisitionFinishedCallBack, - pAcquisitionFinished); - if (rawDataReadyCallBack != nullptr) - impl()->registerCallBackRawDataReady(rawDataReadyCallBack, - pRawDataReady); + { + std::lock_guard lock(callbackMutex); + if (startAcquisitionCallBack != nullptr) + impl()->registerCallBackStartAcquisition(startAcquisitionCallBack, + pStartAcquisition); + if (acquisitionFinishedCallBack != nullptr) + impl()->registerCallBackAcquisitionFinished( + acquisitionFinishedCallBack, pAcquisitionFinished); + if (rawDataReadyCallBack != nullptr) + impl()->registerCallBackRawDataReady(rawDataReadyCallBack, + pRawDataReady); + } impl()->setThreadIds(parentThreadId, tcpThreadId); } diff --git a/slsReceiverSoftware/src/ClientInterface.h b/slsReceiverSoftware/src/ClientInterface.h index 8bde90c64..e52fb415a 100644 --- a/slsReceiverSoftware/src/ClientInterface.h +++ b/slsReceiverSoftware/src/ClientInterface.h @@ -194,6 +194,8 @@ class ClientInterface : private virtual slsDetectorDefs { pid_t tcpThreadId{0}; std::vector udpips = std::vector(MAX_NUMBER_OF_LISTENING_THREADS); + // necessary if Receiver objects using threads with callbacks + static std::mutex callbackMutex; }; } // namespace sls diff --git a/slsReceiverSoftware/src/FrameSynchronizerApp.cpp b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp new file mode 100644 index 000000000..fdf6a6778 --- /dev/null +++ b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp @@ -0,0 +1,622 @@ +// SPDX-License-Identifier: LGPL-3.0-or-other +// Copyright (C) 2021 Contributors to the SLS Detector Package +/* Creates the slsFrameSynchronizer for running multiple receivers in different + * threads form a single binary that will spit out zmq streams without + * reconstructing image. Sample python script for pull socket for this combiner + * in python/scripts folder. TODO: Not handling empty frames from one socket + */ +#include "sls/Receiver.h" +#include "sls/ToString.h" +#include "sls/container_utils.h" +#include "sls/logger.h" +#include "sls/sls_detector_defs.h" + +#include //SIGINT +#include +#include +#include +#include +#include +#include +#include +#include //wait +#include +#include + +#include +#include +#include + +std::vector threads; +std::vector semaphores; +sls::TLogLevel printHeadersLevel = sls::logDEBUG; + +/** Define Colors to print data call back in different colors for different + * recievers */ +#define PRINT_IN_COLOR(c, f, ...) \ + printf("\033[%dm" f RESET, 30 + c + 1, ##__VA_ARGS__) + +/** Structure handling different threads */ +using ZmqMsgList = std::vector; +using FrameMap = std::map; +using PortFrameMap = std::map; +struct FrameStatus { + bool starting = true; + bool terminate = false; + int num_receivers; + sem_t available; + std::mutex mtx; + ZmqMsgList headers; + PortFrameMap frames; + ZmqMsgList ends; + + FrameStatus(bool start, bool term, int num_recv) + : starting(start), terminate(term), num_receivers(num_recv) { + sem_init(&available, 0, 0); + } +}; +FrameStatus *global_frame_status = nullptr; + +/** + * Control+C Interrupt Handler + * to let all the processes know to exit properly + */ +void sigInterruptHandler(int p) { + for (size_t i = 0; i != semaphores.size(); ++i) { + sem_post(semaphores[i]); + } +} + +void cleanup() { + if (global_frame_status) { + std::lock_guard lock(global_frame_status->mtx); + for (auto &outer_pair : global_frame_status->frames) { + for (auto &inner_pair : outer_pair.second) { + for (zmq_msg_t *msg : inner_pair.second) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + inner_pair.second.clear(); + } + outer_pair.second.clear(); + } + global_frame_status->frames.clear(); + } +} + +/** + * prints usage of this example program + */ +std::string getHelpMessage() { + std::ostringstream os; + os << "\nUsage:\n" + "./slsFrameSynchronizer [start tcp port] [num recevers] [print " + "callback headers (optional)]\n" + << "\t - tcp port has to be non-zero and 16 bit\n" + << "\t - print callback headers option is 0 (disabled) by default\n"; + return os.str(); +} + +void zmq_free(void *data, void *hint) { delete[] static_cast(data); } + +void print_frames(const PortFrameMap &frame_port_map) { + LOG(sls::logDEBUG) << "Printing frames"; + for (const auto &it : frame_port_map) { + uint16_t udpPort = it.first; + const auto &frame_map = it.second; + LOG(sls::logDEBUG) << "UDP port: " << udpPort; + for (const auto &frame : frame_map) { + uint64_t fnum = frame.first; + const auto &msg_list = frame.second; + LOG(sls::logDEBUG) + << " acq index: " << fnum << '[' << msg_list.size() << ']'; + } + } +} + +/** Valid frame numbers mean they exist across all ports or + * has at least a larger fnum in the port with the missing fnum **/ +std::set get_valid_fnums(const PortFrameMap &port_frame_map) { + // empty list + std::set valid_fnums; + if (port_frame_map.empty()) { + return valid_fnums; + } + + // collect all unique frame numbers from all ports + std::set unique_fnums; + for (auto it = port_frame_map.begin(); it != port_frame_map.begin(); ++it) { + const FrameMap &frame_map = it->second; + for (auto frame = frame_map.begin(); frame != frame_map.end(); + ++frame) { + unique_fnums.insert(frame->first); + } + } + + // collect valid frame numbers + for (auto &fnum : unique_fnums) { + bool is_valid = true; + for (auto it = port_frame_map.begin(); it != port_frame_map.end(); + ++it) { + uint16_t port = it->first; + const FrameMap &frame_map = it->second; + auto frame = frame_map.find(fnum); + // invalid: fnum missing in one port + if (frame == frame_map.end()) { + LOG(sls::logDEBUG) + << "Fnum " << fnum << " is missing in port " << port; + // invalid: fnum greater than all in that port + auto last_frame = std::prev(frame_map.end()); + auto last_fnum = last_frame->first; + if (fnum > last_fnum) { + LOG(sls::logDEBUG) << "And no larger fnum found. Fnum " + << fnum << " is invalid.\n"; + is_valid = false; + break; + } + } + } + if (is_valid) { + valid_fnums.insert(fnum); + } + } + + return valid_fnums; +} + +int zmq_send_multipart(void *socket, const ZmqMsgList &messages) { + size_t num_messages = messages.size(); + for (size_t i = 0; i != num_messages; ++i) { + zmq_msg_t *msg = messages[i]; + // determine flags: ZMQ_SNDMORE for all messages except the last + int flags = (i == num_messages - 1) ? 0 : ZMQ_SNDMORE; + if (zmq_msg_send(msg, socket, flags) == -1) { + LOG(sls::logERROR) + << "Error sending message: " << zmq_strerror(zmq_errno()); + return slsDetectorDefs::FAIL; + } + } + return slsDetectorDefs::OK; +} + +void Correlate(FrameStatus *stat) { + void *context = zmq_ctx_new(); + + void *socket = zmq_socket(context, ZMQ_PUSH); + int rc = zmq_bind(socket, "tcp://*:5555"); + if (rc != 0) { + LOG(sls::logERROR) << "failed to bind"; + } + + while (true) { + sem_wait(&(stat->available)); + { + std::lock_guard lock(stat->mtx); + + if (stat->terminate) { + break; + } + + if (stat->starting) { + // sending all start packets + if (static_cast(stat->headers.size()) == + stat->num_receivers) { + stat->starting = false; + // clean up + zmq_send_multipart(socket, stat->headers); + for (zmq_msg_t *msg : stat->headers) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + stat->headers.clear(); + } + } else { + // print_frames(stat->frames); + auto valid_fnums = get_valid_fnums(stat->frames); + // sending all valid fnum data packets + for (const auto &fnum : valid_fnums) { + ZmqMsgList msg_list; + PortFrameMap &port_frame_map = stat->frames; + for (auto it = port_frame_map.begin(); + it != port_frame_map.end(); ++it) { + uint16_t port = it->first; + const FrameMap &frame_map = it->second; + auto frame = frame_map.find(fnum); + if (frame != frame_map.end()) { + msg_list.insert(msg_list.end(), + stat->frames[port][fnum].begin(), + stat->frames[port][fnum].end()); + // clean up + for (zmq_msg_t *msg : stat->frames[port][fnum]) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + stat->frames[port].erase(fnum); + } + } + LOG(printHeadersLevel) + << "Sending data packets for fnum " << fnum; + zmq_send_multipart(socket, msg_list); + } + } + // sending all end packets + if (static_cast(stat->ends.size()) == stat->num_receivers) { + zmq_send_multipart(socket, stat->ends); + // clean up + for (zmq_msg_t *msg : stat->ends) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + stat->ends.clear(); + } + } + } + zmq_close(socket); + zmq_ctx_destroy(context); +} + +int StartAcquisitionCallback( + const slsDetectorDefs::startCallbackHeader callbackHeader, + void *objectPointer) { + LOG(printHeadersLevel) + << "Start Acquisition:" + << "\n\t[" + << "\n\tUDP Port : " << sls::ToString(callbackHeader.udpPort) + << "\n\tDynamic Range : " << callbackHeader.dynamicRange + << "\n\tDetector Shape : " + << sls::ToString(callbackHeader.detectorShape) + << "\n\tImage Size : " << callbackHeader.imageSize + << "\n\tFile Path : " << callbackHeader.filePath + << "\n\tFile Name : " << callbackHeader.fileName + << "\n\tFile Index : " << callbackHeader.fileIndex + << "\n\tQuad Enable : " << callbackHeader.quad + << "\n\tAdditional Json Header : " + << sls::ToString(callbackHeader.addJsonHeader).c_str() << "\n\t]"; + std::ostringstream oss; + oss << "{\"htype\":\"header\"" + << ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort) + << ", \"bitmode\":" << callbackHeader.dynamicRange + << ", \"filePath\":\"" << callbackHeader.filePath + << "\", \"fileName\":\"" << callbackHeader.fileName + << "\", \"fileIndex\":" << callbackHeader.fileIndex + << ", \"detshape\":" << sls::ToString(callbackHeader.detectorShape) + << ", \"size\":" << callbackHeader.imageSize + << ", \"quad\":" << (callbackHeader.quad ? 1 : 0); + + if (!callbackHeader.addJsonHeader.empty()) { + oss << ", \"addJsonHeader\": {"; + for (auto it = callbackHeader.addJsonHeader.begin(); + it != callbackHeader.addJsonHeader.end(); ++it) { + if (it != callbackHeader.addJsonHeader.begin()) { + oss << ", "; + } + oss << "\"" << it->first.c_str() << "\":\"" << it->second.c_str() + << "\""; + } + oss << " } "; + } + oss << "}\n"; + + std::string message = oss.str(); + LOG(sls::logDEBUG) << "Start Acquisition message:" << std::endl << message; + + int length = message.length(); + char *hdata = new char[length]; + memcpy(hdata, message.c_str(), length); + zmq_msg_t *hmsg = new zmq_msg_t; + zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL); + + // push zmq msg into stat to be processed + FrameStatus *stat = static_cast(objectPointer); + { + std::lock_guard lock(stat->mtx); + stat->headers.push_back(hmsg); + stat->starting = true; + // clean up old frames + for (int port : callbackHeader.udpPort) { + for (auto &frame_map : stat->frames[port]) { + for (zmq_msg_t *msg : frame_map.second) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + frame_map.second.clear(); + } + stat->frames[port].clear(); + } + } + sem_post(&stat->available); + return slsDetectorDefs::OK; // TODO: change return to void +} + +void AcquisitionFinishedCallback( + const slsDetectorDefs::endCallbackHeader callbackHeader, + void *objectPointer) { + LOG(printHeadersLevel) << "Acquisition Finished:" + << "\n\t[" + << "\n\tUDP Port : " + << sls::ToString(callbackHeader.udpPort) + << "\n\tComplete Frames : " + << sls::ToString(callbackHeader.completeFrames) + << "\n\tLast Frame Index : " + << sls::ToString(callbackHeader.lastFrameIndex) + << "\n\t]"; + std::ostringstream oss; + oss << "{\"htype\":\"series_end\"" + << ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort) + << ", \"comleteFrames\":" + << sls::ToString(callbackHeader.completeFrames) + << ", \"lastFrameIndex\":" + << sls::ToString(callbackHeader.lastFrameIndex) << "}\n"; + std::string message = oss.str(); + int length = message.length(); + LOG(sls::logDEBUG) << "Acquisition Finished message:" << std::endl + << message; + + char *hdata = new char[length]; + memcpy(hdata, message.c_str(), length); + zmq_msg_t *hmsg = new zmq_msg_t; + zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL); + + // push zmq msg into stat to be processed + FrameStatus *stat = static_cast(objectPointer); + { + std::lock_guard lock(stat->mtx); + stat->ends.push_back(hmsg); + } + sem_post(&stat->available); +} + +void GetDataCallback(slsDetectorDefs::sls_receiver_header &header, + slsDetectorDefs::dataCallbackHeader callbackHeader, + char *dataPointer, size_t &imageSize, + void *objectPointer) { + + slsDetectorDefs::sls_detector_header detectorHeader = header.detHeader; + + if (printHeadersLevel < sls::logDEBUG) { + // print in different color for each udp port + PRINT_IN_COLOR((callbackHeader.udpPort % 10), + "Data: " + "\n\tCallback Header: " + "\n\t[" + "\n\tUDP Port: %u" + "\n\tShape: [%u, %u]" + "\n\tAcq Index : %lu" + "\n\tFrame Index :%lu" + "\n\tProgress : %.2f%%" + "\n\tComplete Image :%s" + "\n\tFlip Rows :%s" + "\n\tAdditional Json Header : %s" + "\n\t]" + "\n\ttReceiver Header: " + "\n\t[" + "\n\tFrame Number : %lu" + "\n\tExposure Length :%u" + "\n\tPackets Caught :%u" + "\n\tDetector Specific 1: %lu" + "\n\tTimestamp : %lu" + "\n\tModule Id :%u" + "\n\tRow : %u" + "\n\tColumn :%u" + "\n\tDetector Specific 2 : %u" + "\n\tDetector Specific 3 : %u" + "\n\tDetector Specific 4 : %u" + "\n\tDetector Type : %s" + "\n\tVersion: %u" + "\n\t]" + "\n\tFirst Byte Data: 0x%x" + "\n\tImage Size: %zu\n\n", + callbackHeader.udpPort, callbackHeader.shape.x, + callbackHeader.shape.y, callbackHeader.acqIndex, + callbackHeader.frameIndex, callbackHeader.progress, + sls::ToString(callbackHeader.completeImage).c_str(), + sls::ToString(callbackHeader.flipRows).c_str(), + sls::ToString(callbackHeader.addJsonHeader).c_str(), + detectorHeader.frameNumber, detectorHeader.expLength, + detectorHeader.packetNumber, detectorHeader.detSpec1, + detectorHeader.timestamp, detectorHeader.modId, + detectorHeader.row, detectorHeader.column, + detectorHeader.detSpec2, detectorHeader.detSpec3, + detectorHeader.detSpec4, + sls::ToString(detectorHeader.detType).c_str(), + detectorHeader.version, + // header->packetsMask.to_string().c_str(), + ((uint8_t)(*((uint8_t *)(dataPointer)))), imageSize); + } + std::ostringstream oss; + oss << "{\"htype\":\"module\"" + << ", \"port\":" << callbackHeader.udpPort + << ", \"shape\":" << sls::ToString(callbackHeader.shape) + << ", \"acqIndex\":" << callbackHeader.acqIndex + << ", \"frameIndex\":" << callbackHeader.frameIndex + << ", \"flipRows\":" << (callbackHeader.flipRows ? 1 : 0) + << ", \"progress\":" << callbackHeader.progress + << ", \"completeImage\":" << (callbackHeader.completeImage ? 1 : 0) + << ", \"imageSize\":" << imageSize + << ", \"frameNumber\":" << detectorHeader.frameNumber + << ", \"expLength\":" << detectorHeader.expLength + << ", \"packetNumber\":" << detectorHeader.packetNumber + << ", \"detSpec1\":" << detectorHeader.detSpec1 + << ", \"timestamp\":" << detectorHeader.timestamp + << ", \"modId\":" << detectorHeader.modId + << ", \"row\":" << detectorHeader.row + << ", \"column\":" << detectorHeader.column + << ", \"detSpec2\":" << detectorHeader.detSpec2 + << ", \"detSpec3\":" << detectorHeader.detSpec3 + << ", \"detSpec4\":" << detectorHeader.detSpec4 + << ", \"detType\":" << static_cast(detectorHeader.detType) + << ", \"version\":" << static_cast(detectorHeader.version); + + if (!callbackHeader.addJsonHeader.empty()) { + oss << ", \"addJsonHeader\": {"; + for (auto it = callbackHeader.addJsonHeader.begin(); + it != callbackHeader.addJsonHeader.end(); ++it) { + if (it != callbackHeader.addJsonHeader.begin()) { + oss << ", "; + } + oss << "\"" << it->first.c_str() << "\":\"" << it->second.c_str() + << "\""; + } + oss << " } "; + } + oss << "}\n"; + std::string message = oss.str(); + LOG(sls::logDEBUG) << "Data message:" << std::endl << message; + + // creating header part of data packet + int length = message.length(); + char *hdata = new char[length]; + memcpy(hdata, message.c_str(), length); + zmq_msg_t *hmsg = new zmq_msg_t; + zmq_msg_init_data(hmsg, hdata, length, zmq_free, NULL); + + // created data part of data packet + char *data = new char[imageSize]; + zmq_msg_t *msg = new zmq_msg_t; + zmq_msg_init_data(msg, data, imageSize, zmq_free, NULL); + + // push both parts into stat to be processed + FrameStatus *stat = static_cast(objectPointer); + { + std::lock_guard lock(stat->mtx); + stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber] + .push_back(hmsg); + stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber] + .push_back(msg); + } + sem_post(&stat->available); +} + +/** + * Example of main program using the Receiver class + * + * - Defines in file for: + * - Default Number of receivers is 1 + * - Default Start TCP port is 1954 + */ +int main(int argc, char *argv[]) { + + /** - set default values */ + int numReceivers = 1; + uint16_t startTCPPort = DEFAULT_TCP_RX_PORTNO; + bool printHeaders = false; + + /** - get number of receivers and start tcp port from command line + * arguments */ + if (argc > 1) { + try { + if (argc == 3 || argc == 4) { + startTCPPort = sls::StringTo(argv[1]); + if (startTCPPort == 0) { + throw std::runtime_error("Invalid start tcp port"); + } + numReceivers = std::stoi(argv[2]); + if (numReceivers > 1024) { + cprintf(RED, + "Did you mix up the order of the arguments?\n%s\n", + getHelpMessage().c_str()); + return EXIT_FAILURE; + } + if (numReceivers == 0) { + cprintf(RED, "Invalid number of receivers.\n%s\n", + getHelpMessage().c_str()); + return EXIT_FAILURE; + } + if (argc == 4) { + printHeaders = sls::StringTo(argv[3]); + if (printHeaders) { + printHeadersLevel = sls::logINFOBLUE; + } + } + } else + throw std::runtime_error("Invalid number of arguments"); + } catch (const std::exception &e) { + cprintf(RED, "Error: %s\n%s\n", e.what(), getHelpMessage().c_str()); + return EXIT_FAILURE; + } + } + + cprintf(RESET, "Number of Receivers: %d\n", numReceivers); + cprintf(RESET, "Start TCP Port: %hu\n", startTCPPort); + cprintf(RESET, "Print Callback Headers: %s\n\n", + (printHeaders ? "Enabled" : "Disabled")); + + /** - Catch signal SIGINT to close files and call destructors properly */ + struct sigaction sa; + sa.sa_flags = 0; // no flags + sa.sa_handler = sigInterruptHandler; // handler function + sigemptyset(&sa.sa_mask); // dont block additional signals during invocation + // of handler + if (sigaction(SIGINT, &sa, nullptr) == -1) { + cprintf(RED, "Could not set handler function for SIGINT\n"); + } + + /** - Ignore SIG_PIPE, prevents global signal handler, handle locally, + instead of a server crashing due to client crash when writing, it just + gives error */ + struct sigaction asa; + asa.sa_flags = 0; // no flags + asa.sa_handler = SIG_IGN; // handler function + sigemptyset(&asa.sa_mask); // dont block additional signals during + // invocation of handler + if (sigaction(SIGPIPE, &asa, nullptr) == -1) { + cprintf(RED, "Could not set handler function for SIGPIPE\n"); + } + + FrameStatus stat{true, false, numReceivers}; + // store pointer for signal handler + global_frame_status = &stat; + + // thread synchronizing all packets + void *user_data = static_cast(&stat); + std::thread combinerThread(Correlate, &stat); + + for (int i = 0; i != numReceivers; ++i) { + sem_t *semaphore = new sem_t; + sem_init(semaphore, 1, 0); + semaphores.push_back(semaphore); + + uint16_t port = startTCPPort + i; + threads.emplace_back([i, semaphore, port, user_data]() { + sls::Receiver receiver(port); + receiver.registerCallBackStartAcquisition(StartAcquisitionCallback, + user_data); + receiver.registerCallBackAcquisitionFinished( + AcquisitionFinishedCallback, user_data); + receiver.registerCallBackRawDataReady(GetDataCallback, user_data); + /** - as long as no Ctrl+C */ + sem_wait(semaphore); + sem_destroy(semaphore); + delete semaphore; + + // clean up frames + if (i == 0) + cleanup(); + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + { + std::lock_guard lock(stat.mtx); + stat.terminate = true; + sem_post(&stat.available); + } + combinerThread.join(); + sem_destroy(&stat.available); + + LOG(sls::logINFOBLUE) << "Goodbye!"; + return EXIT_SUCCESS; +} diff --git a/slsSupportLib/include/sls/logger.h b/slsSupportLib/include/sls/logger.h index 437f7e905..9b949d802 100644 --- a/slsSupportLib/include/sls/logger.h +++ b/slsSupportLib/include/sls/logger.h @@ -16,6 +16,8 @@ enum TLogLevel { logINFOBLUE, logINFOGREEN, logINFORED, + logINFOCYAN, + logINFOMAGENTA, logINFO, logDEBUG, logDEBUG1, @@ -60,16 +62,25 @@ class Logger { // Danger this buffer need as many elements as TLogLevel static const char *Color(TLogLevel level) noexcept { static const char *const colors[] = { - RED BOLD, YELLOW BOLD, BLUE, GREEN, RED, RESET, - RESET, RESET, RESET, RESET, RESET, RESET}; + RED BOLD, YELLOW BOLD, BLUE, GREEN, RED, CYAN, MAGENTA, + RESET, RESET, RESET, RESET, RESET, RESET, RESET}; + // out of bounds + if (level < 0 || level >= sizeof(colors) / sizeof(colors[0])) { + return RESET; + } return colors[level]; } // Danger this buffer need as many elements as TLogLevel static std::string ToString(TLogLevel level) { static const char *const buffer[] = { - "ERROR", "WARNING", "INFO", "INFO", "INFO", "INFO", - "DEBUG", "DEBUG1", "DEBUG2", "DEBUG3", "DEBUG4", "DEBUG5"}; + "ERROR", "WARNING", "INFO", "INFO", "INFO", + "INFO", "INFO", "INFO", "DEBUG", "DEBUG1", + "DEBUG2", "DEBUG3", "DEBUG4", "DEBUG5"}; + // out of bounds + if (level < 0 || level >= sizeof(buffer) / sizeof(buffer[0])) { + return "UNKNOWN"; + } return buffer[level]; }