Change processing socket from PUB to PUSH

Since processing of live data can be done at 100Hz it would
be more difficult to stream the full rate with a PUB/SUB mode.

To support distributed live analysis we move to PUSH/PULL
model, since we can support only one live processing at a time
anyway (network constraints when working with large detectors).
This commit is contained in:
2020-07-15 10:29:21 +02:00
parent eb685aeea6
commit 6129b711ba
2 changed files with 19 additions and 4 deletions
+2
View File
@@ -8,4 +8,6 @@ namespace stream_config
const int STREAM_FASTQUEUE_SLOTS = 5;
// If the modules are offset more than 1000 pulses, crush.
const uint64_t PULSE_OFFSET_LIMIT = 1000;
// SNDHWM for live processing socket.
const int PROCESSING_ZMQ_SNDHWM = 10;
}
+17 -4
View File
@@ -1,9 +1,11 @@
#include "ZmqLiveSender.hpp"
#include "stream_config.hpp"
#include "zmq.h"
#include <stdexcept>
using namespace std;
using namespace stream_config;
LiveStreamConfig read_json_config(const std::string filename)
{
@@ -31,14 +33,25 @@ ZmqLiveSender::ZmqLiveSender(
config_(config)
{
// TODO: Set also LINGER and SNDHWM.
// 0mq sockets to streamvis and live analysis
socket_streamvis_ = zmq_socket(ctx, ZMQ_PUB);
if (zmq_bind(socket_streamvis_, config.streamvis_address.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
socket_live_ = zmq_socket(ctx, ZMQ_PUB);
socket_live_ = zmq_socket(ctx, ZMQ_PUSH);
const int sndhwm = PROCESSING_ZMQ_SNDHWM;
if (zmq_setsockopt(
socket_live_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
const int linger = 0;
if (zmq_setsockopt(
socket_live_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
@@ -184,7 +197,7 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data)
zmq_send(socket_live_,
text_header.c_str(),
text_header.size(),
ZMQ_SNDMORE);
ZMQ_SNDMORE | ZMQ_NOBLOCK);
if ( send_live_analysis == 0 ) {
zmq_send(socket_live_,