mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-02 15:14:12 +02:00
Add pulse_id stream to streamer
The pulse_id stream can be used to synchronize components. For example, the writer can listen for the current pulse_id and write with 1 second delay to a file. This will help us eliminate the need to wait for the run to complete to start the data write request.
This commit is contained in:
@@ -20,6 +20,7 @@ struct LiveStreamConfig {
|
||||
const std::string GAIN_FILENAME;
|
||||
const std::string DETECTOR_NAME;
|
||||
const int n_modules;
|
||||
const std::string pulse_address;
|
||||
};
|
||||
|
||||
LiveStreamConfig read_json_config(const std::string filename);
|
||||
@@ -30,6 +31,7 @@ class ZmqLiveSender {
|
||||
|
||||
void* socket_streamvis_;
|
||||
void* socket_live_;
|
||||
void* socket_pulse_;
|
||||
|
||||
public:
|
||||
ZmqLiveSender(void* ctx,
|
||||
|
||||
@@ -10,4 +10,6 @@ namespace stream_config
|
||||
const uint64_t PULSE_OFFSET_LIMIT = 1000;
|
||||
// SNDHWM for live processing socket.
|
||||
const int PROCESSING_ZMQ_SNDHWM = 10;
|
||||
// Keep the last second of pulses in the buffer.
|
||||
const int PULSE_ZMQ_SNDHWM = 100;
|
||||
}
|
||||
|
||||
@@ -22,7 +22,8 @@ LiveStreamConfig read_json_config(const std::string filename)
|
||||
config_parameters["pedestal_file"].GetString(),
|
||||
config_parameters["gain_file"].GetString(),
|
||||
config_parameters["detector_name"].GetString(),
|
||||
config_parameters["n_modules"].GetInt()
|
||||
config_parameters["n_modules"].GetInt(),
|
||||
"tcp://127.0.0.1:51234"
|
||||
};
|
||||
}
|
||||
|
||||
@@ -34,27 +35,51 @@ ZmqLiveSender::ZmqLiveSender(
|
||||
{
|
||||
// TODO: Set also LINGER and SNDHWM.
|
||||
socket_streamvis_ = zmq_socket(ctx, ZMQ_PUB);
|
||||
|
||||
if (zmq_bind(socket_streamvis_, config.streamvis_address.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
socket_live_ = zmq_socket(ctx, ZMQ_PUSH);
|
||||
{
|
||||
socket_live_ = zmq_socket(ctx, ZMQ_PUSH);
|
||||
|
||||
const int sndhwm = PROCESSING_ZMQ_SNDHWM;
|
||||
if (zmq_setsockopt(
|
||||
socket_live_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
const int sndhwm = PROCESSING_ZMQ_SNDHWM;
|
||||
if (zmq_setsockopt(
|
||||
socket_live_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
const int linger = 0;
|
||||
if (zmq_setsockopt(
|
||||
socket_live_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
const int linger = 0;
|
||||
if (zmq_setsockopt(
|
||||
socket_live_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
{
|
||||
socket_pulse_ = zmq_socket(ctx, ZMQ_PUB);
|
||||
|
||||
if (zmq_bind(socket_pulse_, config.pulse_address.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
const int sndhwm = PULSE_ZMQ_SNDHWM;
|
||||
if (zmq_setsockopt(
|
||||
socket_pulse_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
const int linger = 0;
|
||||
if (zmq_setsockopt(
|
||||
socket_pulse_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
if (zmq_bind(socket_live_, config.live_analysis_address.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
ZmqLiveSender::~ZmqLiveSender()
|
||||
@@ -96,6 +121,10 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data)
|
||||
}
|
||||
}
|
||||
|
||||
if(zmq_send(socket_pulse_, &pulse_id, sizeof(pulse_id), 0) == -1) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
|
||||
|
||||
header.AddMember("frame", frame_index, header_alloc);
|
||||
|
||||
Reference in New Issue
Block a user