diff --git a/sf-stream/include/ZmqLiveSender.hpp b/sf-stream/include/ZmqLiveSender.hpp index 6db77fa..ee197d5 100644 --- a/sf-stream/include/ZmqLiveSender.hpp +++ b/sf-stream/include/ZmqLiveSender.hpp @@ -20,6 +20,7 @@ struct LiveStreamConfig { const std::string GAIN_FILENAME; const std::string DETECTOR_NAME; const int n_modules; + const std::string pulse_address; }; LiveStreamConfig read_json_config(const std::string filename); @@ -30,6 +31,7 @@ class ZmqLiveSender { void* socket_streamvis_; void* socket_live_; + void* socket_pulse_; public: ZmqLiveSender(void* ctx, diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index 411c8bf..ac28d86 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -10,4 +10,6 @@ namespace stream_config const uint64_t PULSE_OFFSET_LIMIT = 1000; // SNDHWM for live processing socket. const int PROCESSING_ZMQ_SNDHWM = 10; + // Keep the last second of pulses in the buffer. + const int PULSE_ZMQ_SNDHWM = 100; } diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index 716bb37..897c6c2 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -22,7 +22,8 @@ LiveStreamConfig read_json_config(const std::string filename) config_parameters["pedestal_file"].GetString(), config_parameters["gain_file"].GetString(), config_parameters["detector_name"].GetString(), - config_parameters["n_modules"].GetInt() + config_parameters["n_modules"].GetInt(), + "tcp://127.0.0.1:51234" }; } @@ -34,27 +35,51 @@ ZmqLiveSender::ZmqLiveSender( { // TODO: Set also LINGER and SNDHWM. socket_streamvis_ = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_streamvis_, config.streamvis_address.c_str()) != 0) { throw runtime_error(zmq_strerror(errno)); } - socket_live_ = zmq_socket(ctx, ZMQ_PUSH); + { + socket_live_ = zmq_socket(ctx, ZMQ_PUSH); - const int sndhwm = PROCESSING_ZMQ_SNDHWM; - if (zmq_setsockopt( - socket_live_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); + const int sndhwm = PROCESSING_ZMQ_SNDHWM; + if (zmq_setsockopt( + socket_live_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt( + socket_live_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } } - const int linger = 0; - if (zmq_setsockopt( - socket_live_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); + { + socket_pulse_ = zmq_socket(ctx, ZMQ_PUB); + + if (zmq_bind(socket_pulse_, config.pulse_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int sndhwm = PULSE_ZMQ_SNDHWM; + if (zmq_setsockopt( + socket_pulse_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt( + socket_pulse_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } } - if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } } ZmqLiveSender::~ZmqLiveSender() @@ -96,6 +121,10 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) } } + if(zmq_send(socket_pulse_, &pulse_id, sizeof(pulse_id), 0) == -1) { + throw runtime_error(zmq_strerror(errno)); + } + // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) header.AddMember("frame", frame_index, header_alloc);