From cd25fe50b8e05364488c263736af397282665e6d Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 11 May 2020 11:39:18 +0200 Subject: [PATCH] Rewrite logic to be function oriented --- sf-buffer/src/sf_buffer.cpp | 108 +++++++++++++++++++++++------------- 1 file changed, 70 insertions(+), 38 deletions(-) diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index b44aa60..b5183a3 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -38,6 +38,68 @@ inline void save_and_send( zmq_send(socket, data, MODULE_N_BYTES, 0); } +inline void get_frame_from_udp( + UdpReceiver& udp_receiver, + ModuleFrame& metadata, + jungfrau_packet& packet_buffer, + BufferH5Writer& writer, + uint64_t source_id, + char *frame_buffer + ) +{ + // Reset the metadata and frame buffer for the next frame. + metadata.pulse_id = 0; + metadata.n_received_packets = 0; + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + // The buffer contains a valid packet. Use it. + if (packet_buffer.bunchid != 0) { + init_frame(metadata, packet_buffer, source_id); + + size_t frame_buffer_offset = + JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; + memcpy( + (void*) (frame_buffer + frame_buffer_offset), + packet_buffer.data, + JUNGFRAU_DATA_BYTES_PER_PACKET); + + metadata.n_received_packets++; + } + + while (true) { + + if (!udp_receiver.receive( + &packet_buffer, + JUNGFRAU_BYTES_PER_PACKET)) { + continue; + } + + // First packet for this frame. + if (metadata.pulse_id == 0) { + init_frame(metadata, packet_buffer, source_id); + + // Happens if the last packet from the previous frame gets lost. + } else if (metadata.pulse_id != packet_buffer.bunchid) { + return; + } + + size_t frame_buffer_offset = + JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; + memcpy( + (void*) (frame_buffer + frame_buffer_offset), + packet_buffer.data, + JUNGFRAU_DATA_BYTES_PER_PACKET); + + metadata.n_received_packets++; + + // Last frame packet received. Frame finished. + if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) + { + return; + } + } +} + int main (int argc, char *argv[]) { if (argc != 5) { cout << endl; @@ -88,50 +150,20 @@ int main (int argc, char *argv[]) { BufferH5Writer writer(device_name, root_folder); jungfrau_packet packet_buffer; + packet_buffer.bunchid = 0; + ModuleFrame metadata; auto frame_buffer = new char[MODULE_N_BYTES * JUNGFRAU_N_MODULES]; + metadata.pulse_id = 0; + metadata.n_received_packets = 0; + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + while (true) { - if (!udp_receiver.receive( - &packet_buffer, - JUNGFRAU_BYTES_PER_PACKET)) { - continue; - } - - // First packet for this frame. - if (metadata.pulse_id == 0) { - init_frame(metadata, packet_buffer, source_id); - - // Happens if the last packet from the previous frame gets lost. - } else if (metadata.pulse_id != packet_buffer.bunchid) { - save_and_send(writer, socket, &metadata, frame_buffer); - - metadata.pulse_id = 0; - metadata.n_received_packets = 0; - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - init_frame(metadata, packet_buffer, source_id); - } - - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; - memcpy( - (void*) (frame_buffer + frame_buffer_offset), - packet_buffer.data, - JUNGFRAU_DATA_BYTES_PER_PACKET); - - metadata.n_received_packets++; - - // Last frame packet received. Frame finished. - if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) - { - save_and_send(writer, socket, &metadata, frame_buffer); - metadata.pulse_id = 0; - metadata.n_received_packets = 0; - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - } + get_frame_from_udp(); + save_and_send(writer, socket, &metadata, frame_buffer); // TODO: Make real statistics, please.