From 487efb3f017b835edc7806146414bf6257966736 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 09:16:49 +0200 Subject: [PATCH] Extend sf_buffer with streaming capabilities --- sf-buffer/src/sf_buffer.cpp | 41 ++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 5515f9f..8914968 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -3,7 +3,7 @@ #include #include #include - +#include "zmq.h" #include "buffer_config.hpp" #include "jungfrau.hpp" @@ -16,10 +16,12 @@ int main (int argc, char *argv[]) { if (argc != 4) { cout << endl; cout << "Usage: sf_buffer [device_name] [udp_port] [root_folder]"; + cout << "[source_id]"; cout << endl; cout << "\tdevice_name: Name to write to disk."; cout << "\tudp_port: UDP port to connect to." << endl; cout << "\troot_folder: FS root folder." << endl; + cout << "\tsource_id: ID of the source for live stream." << endl; cout << endl; exit(-1); @@ -28,6 +30,25 @@ int main (int argc, char *argv[]) { string device_name = string(argv[1]); int udp_port = atoi(argv[2]); string root_folder = string(argv[3]); + int source_id = atoi(argv[2]); + + stringstream ipc_stream; + ipc_stream << "ipc://sf-live-" << source_id; + const auto ipc_address = ipc_stream.str(); + + auto ctx = zmq_ctx_new(); + auto socket = zmq_socket(ctx, ZMQ_PUB); + + const int sndhwm = BUFFER_LIVE_SEND_HWM; + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) + throw runtime_error(strerror (errno)); + + const int linger_ms = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) + throw runtime_error(strerror (errno)); + + if (zmq_connect(socket, ipc_address.c_str()) != 0) + throw runtime_error(strerror (errno)); RingBuffer ring_buffer(BUFFER_RB_SIZE); @@ -78,6 +99,24 @@ int main (int argc, char *argv[]) { "received_packets", &(data.first->n_recv_packets)); + ModuleFrame metadata = { + metadata.pulse_id, + metadata.frame_index, + metadata.daq_rec, + metadata.n_received_packets, + (uint16_t) source_id + }; + + zmq_send(socket, + &metadata, + sizeof(ModuleFrame), + ZMQ_SNDMORE); + + zmq_send(socket, + data.second, + MODULE_N_BYTES, + 0); + ring_buffer.release(data.first->buffer_slot_index); // TODO: Make real statistics, please.