diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index 8d8b18b..411c8bf 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -8,4 +8,6 @@ namespace stream_config const int STREAM_FASTQUEUE_SLOTS = 5; // If the modules are offset more than 1000 pulses, crush. const uint64_t PULSE_OFFSET_LIMIT = 1000; + // SNDHWM for live processing socket. + const int PROCESSING_ZMQ_SNDHWM = 10; } diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index 6538f64..2262892 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -1,9 +1,11 @@ #include "ZmqLiveSender.hpp" +#include "stream_config.hpp" #include "zmq.h" #include using namespace std; +using namespace stream_config; LiveStreamConfig read_json_config(const std::string filename) { @@ -31,14 +33,25 @@ ZmqLiveSender::ZmqLiveSender( config_(config) { // TODO: Set also LINGER and SNDHWM. - - // 0mq sockets to streamvis and live analysis socket_streamvis_ = zmq_socket(ctx, ZMQ_PUB); if (zmq_bind(socket_streamvis_, config.streamvis_address.c_str()) != 0) { throw runtime_error(zmq_strerror(errno)); } - socket_live_ = zmq_socket(ctx, ZMQ_PUB); + socket_live_ = zmq_socket(ctx, ZMQ_PUSH); + + const int sndhwm = PROCESSING_ZMQ_SNDHWM; + if (zmq_setsockopt( + socket_live_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt( + socket_live_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) { throw runtime_error(zmq_strerror(errno)); } @@ -184,7 +197,7 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) zmq_send(socket_live_, text_header.c_str(), text_header.size(), - ZMQ_SNDMORE); + ZMQ_SNDMORE | ZMQ_NOBLOCK); if ( send_live_analysis == 0 ) { zmq_send(socket_live_,