Extend sf_buffer with streaming capabilities

This commit is contained in:
2020-05-04 09:16:49 +02:00
parent c001faac9a
commit 487efb3f01
+40 -1
View File
@@ -3,7 +3,7 @@
#include <RingBuffer.hpp>
#include <UdpRecvModule.hpp>
#include <FastH5Writer.hpp>
#include "zmq.h"
#include "buffer_config.hpp"
#include "jungfrau.hpp"
@@ -16,10 +16,12 @@ int main (int argc, char *argv[]) {
if (argc != 4) {
cout << endl;
cout << "Usage: sf_buffer [device_name] [udp_port] [root_folder]";
cout << "[source_id]";
cout << endl;
cout << "\tdevice_name: Name to write to disk.";
cout << "\tudp_port: UDP port to connect to." << endl;
cout << "\troot_folder: FS root folder." << endl;
cout << "\tsource_id: ID of the source for live stream." << endl;
cout << endl;
exit(-1);
@@ -28,6 +30,25 @@ int main (int argc, char *argv[]) {
string device_name = string(argv[1]);
int udp_port = atoi(argv[2]);
string root_folder = string(argv[3]);
int source_id = atoi(argv[2]);
stringstream ipc_stream;
ipc_stream << "ipc://sf-live-" << source_id;
const auto ipc_address = ipc_stream.str();
auto ctx = zmq_ctx_new();
auto socket = zmq_socket(ctx, ZMQ_PUB);
const int sndhwm = BUFFER_LIVE_SEND_HWM;
if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0)
throw runtime_error(strerror (errno));
const int linger_ms = 0;
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0)
throw runtime_error(strerror (errno));
if (zmq_connect(socket, ipc_address.c_str()) != 0)
throw runtime_error(strerror (errno));
RingBuffer<UdpFrameMetadata> ring_buffer(BUFFER_RB_SIZE);
@@ -78,6 +99,24 @@ int main (int argc, char *argv[]) {
"received_packets",
&(data.first->n_recv_packets));
ModuleFrame metadata = {
metadata.pulse_id,
metadata.frame_index,
metadata.daq_rec,
metadata.n_received_packets,
(uint16_t) source_id
};
zmq_send(socket,
&metadata,
sizeof(ModuleFrame),
ZMQ_SNDMORE);
zmq_send(socket,
data.second,
MODULE_N_BYTES,
0);
ring_buffer.release(data.first->buffer_slot_index);
// TODO: Make real statistics, please.