From 6129b711ba38d473e89b32875a0cb8e492ad10aa Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 15 Jul 2020 10:29:21 +0200 Subject: [PATCH] Change processing socket from PUB to PUSH Since processing of live data can be done at 100Hz it would be more difficult to stream the full rate with a PUB/SUB mode. To support distributed live analysis we move to PUSH/PULL model, since we can support only one live processing at a time anyway (network constraints when working with large detectors). --- sf-stream/include/stream_config.hpp | 2 ++ sf-stream/src/ZmqLiveSender.cpp | 21 +++++++++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index 8d8b18b..411c8bf 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -8,4 +8,6 @@ namespace stream_config const int STREAM_FASTQUEUE_SLOTS = 5; // If the modules are offset more than 1000 pulses, crush. const uint64_t PULSE_OFFSET_LIMIT = 1000; + // SNDHWM for live processing socket. + const int PROCESSING_ZMQ_SNDHWM = 10; } diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index 6538f64..2262892 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -1,9 +1,11 @@ #include "ZmqLiveSender.hpp" +#include "stream_config.hpp" #include "zmq.h" #include using namespace std; +using namespace stream_config; LiveStreamConfig read_json_config(const std::string filename) { @@ -31,14 +33,25 @@ ZmqLiveSender::ZmqLiveSender( config_(config) { // TODO: Set also LINGER and SNDHWM. - - // 0mq sockets to streamvis and live analysis socket_streamvis_ = zmq_socket(ctx, ZMQ_PUB); if (zmq_bind(socket_streamvis_, config.streamvis_address.c_str()) != 0) { throw runtime_error(zmq_strerror(errno)); } - socket_live_ = zmq_socket(ctx, ZMQ_PUB); + socket_live_ = zmq_socket(ctx, ZMQ_PUSH); + + const int sndhwm = PROCESSING_ZMQ_SNDHWM; + if (zmq_setsockopt( + socket_live_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt( + socket_live_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) { throw runtime_error(zmq_strerror(errno)); } @@ -184,7 +197,7 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) zmq_send(socket_live_, text_header.c_str(), text_header.size(), - ZMQ_SNDMORE); + ZMQ_SNDMORE | ZMQ_NOBLOCK); if ( send_live_analysis == 0 ) { zmq_send(socket_live_,