From 72d31d027e22c9915b2d761ccd68075785cd0288 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 11 May 2020 12:31:46 +0200 Subject: [PATCH] Rewrite sf_buffer to use BufferUdpReceiver --- sf-buffer/src/sf_buffer.cpp | 106 ++++-------------------------------- 1 file changed, 11 insertions(+), 95 deletions(-) diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index e4b5336..df88c63 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -3,104 +3,15 @@ #include #include #include -#include #include "zmq.h" #include "buffer_config.hpp" #include "jungfrau.hpp" +#include "BufferUdpReceiver.hpp" using namespace std; using namespace core_buffer; -inline void init_frame ( - ModuleFrame& frame_metadata, - jungfrau_packet& packet_buffer, - uint64_t source_id) -{ - frame_metadata.pulse_id = packet_buffer.bunchid; - frame_metadata.frame_index = packet_buffer.framenum; - frame_metadata.daq_rec = (uint64_t)packet_buffer.debug; - frame_metadata.module_id = source_id; -} - -inline void save_and_send( - BufferH5Writer& writer, - void* socket, - ModuleFrame *metadata, - char *data) -{ - // Write to file. - writer.set_pulse_id(metadata->pulse_id); - writer.write(metadata, data); - - // Live stream. - zmq_send(socket, metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); - zmq_send(socket, data, MODULE_N_BYTES, 0); -} - -inline void get_frame_from_udp( - UdpReceiver& udp_receiver, - ModuleFrame& metadata, - char *frame_buffer, - uint64_t source_id) -{ - static jungfrau_packet packet_buffer = {}; - - // Reset the metadata and frame buffer for the next frame. - metadata.pulse_id = 0; - metadata.n_received_packets = 0; - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - // The buffer contains a valid packet. Use it. - if (packet_buffer.bunchid != 0) { - init_frame(metadata, packet_buffer, source_id); - - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; - memcpy( - (void*) (frame_buffer + frame_buffer_offset), - packet_buffer.data, - JUNGFRAU_DATA_BYTES_PER_PACKET); - - metadata.n_received_packets++; - } - - while (true) { - - if (!udp_receiver.receive( - &packet_buffer, - JUNGFRAU_BYTES_PER_PACKET)) { - continue; - } - - // First packet for this frame. - if (metadata.pulse_id == 0) { - init_frame(metadata, packet_buffer, source_id); - - // Happens if the last packet from the previous frame gets lost. - } else if (metadata.pulse_id != packet_buffer.bunchid) { - return; - } - - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; - memcpy( - (void*) (frame_buffer + frame_buffer_offset), - packet_buffer.data, - JUNGFRAU_DATA_BYTES_PER_PACKET); - - metadata.n_received_packets++; - - // Last frame packet received. Frame finished. - if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) - { - // Indicates that the packet has already been consumed. - packet_buffer.bunchid = 0; - return; - } - } -} - int main (int argc, char *argv[]) { if (argc != 5) { cout << endl; @@ -122,6 +33,7 @@ int main (int argc, char *argv[]) { int source_id = atoi(argv[4]); stringstream ipc_stream; + // TODO: Move this into config. ipc_stream << "ipc:///tmp/sf-live-" << source_id; const auto ipc_address = ipc_stream.str(); @@ -145,20 +57,22 @@ int main (int argc, char *argv[]) { uint64_t n_corrupted_frames = 0; uint64_t last_pulse_id = 0; - UdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - BufferH5Writer writer(device_name, root_folder); + BufferUdpReceiver receiver(source_id); + receiver.bind(udp_port); ModuleFrame metadata; auto frame_buffer = new char[MODULE_N_BYTES * JUNGFRAU_N_MODULES]; while (true) { - get_frame_from_udp(udp_receiver, metadata, frame_buffer, source_id); + receiver.get_frame_from_udp(metadata, frame_buffer); - save_and_send(writer, socket, &metadata, frame_buffer); + writer.set_pulse_id(metadata.pulse_id); + writer.write(&metadata, frame_buffer); + zmq_send(socket, &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); + zmq_send(socket, frame_buffer, MODULE_N_BYTES, 0); // TODO: Make real statistics, please. auto pulse_id = metadata.pulse_id; @@ -189,4 +103,6 @@ int main (int argc, char *argv[]) { n_missed_frames = 0; } } + + delete[] frame_buffer; }