From c3de2ccb406b040e99b0feeb50b26f825d8adbe9 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 20 Apr 2020 19:55:17 +0200 Subject: [PATCH] New single threaded sf_buffer --- sf-buffer/src/sf_buffer.cpp | 131 ++++++++++++++++++++++++++++-------- 1 file changed, 103 insertions(+), 28 deletions(-) diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index d4ec12f..80b0820 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -3,14 +3,40 @@ #include #include #include +#include #include "config.hpp" #include "jungfrau.hpp" #include "BufferUtils.hpp" - using namespace std; +void write_frame( + FastH5Writer &writer, + const uint64_t pulse_id, + const UdpFrameMetadata* frame_metadata, + const char* frame_buffer) +{ + writer.set_pulse_id(pulse_id); + + writer.write_data(frame_buffer); + + writer.write_scalar_metadata( + "pulse_id", &(frame_metadata->pulse_id)); + + writer.write_scalar_metadata( + "frame_id", + &(frame_metadata->frame_index)); + + writer.write_scalar_metadata( + "daq_rec", + &(frame_metadata->daq_rec)); + + writer.write_scalar_metadata( + "received_packets", + &(frame_metadata->n_recv_packets)); +} + int main (int argc, char *argv[]) { if (argc != 4) { @@ -47,42 +73,91 @@ int main (int argc, char *argv[]) { writer.add_scalar_metadata("daq_rec"); writer.add_scalar_metadata("received_packets"); - while (true) { - auto data = ring_buffer.read(); + jungfrau_packet packet_buffer; + UdpReceiver udp_receiver; + udp_receiver.bind(udp_port); - if (data.first == nullptr) { - this_thread::sleep_for(chrono::milliseconds(10)); - continue; + char* previous_frame_buffer = new char[2*512*1024]; + memset(previous_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + UdpFrameMetadata previous_metadata; + previous_metadata.pulse_id = 0; + previous_metadata.n_recv_packets = 0; + previous_metadata.daq_rec = 0; + previous_metadata.n_recv_packets = 0; + + char* current_frame_buffer = new char[2*512*1024]; + memset(current_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + UdpFrameMetadata current_metadata; + current_metadata.pulse_id = 0; + current_metadata.n_recv_packets = 0; + current_metadata.daq_rec = 0; + current_metadata.n_recv_packets = 0; + + while (true) { + + while (true) { + + if (!udp_receiver.receive( + &packet_buffer, + JUNGFRAU_BYTES_PER_PACKET)) { + continue; + } + + if (current_metadata.pulse_id != packet_buffer.bunchid) { + if (current_metadata.pulse_id != 0) { + // Commit + previous_metadata = current_metadata; + swap(previous_frame_buffer, current_frame_buffer); + } + + // Init current_metadata + current_metadata.frame_index = packet_buffer.framenum; + current_metadata.pulse_id = packet_buffer.bunchid; + current_metadata.daq_rec = packet_buffer.debug; + current_metadata.n_recv_packets = 0; + memset(current_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + } + + size_t frame_buffer_offset = + JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; + + memcpy((void*) (current_frame_buffer + frame_buffer_offset), + packet_buffer.data, + JUNGFRAU_DATA_BYTES_PER_PACKET); + + current_metadata.n_recv_packets++; + + // Frame finished with last packet. + if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) + { + // Commit. + previous_metadata = current_metadata; + swap(previous_frame_buffer, current_frame_buffer); + + // This will cause a reset on the next iteration. + current_metadata.pulse_id = 0; + } + + if (previous_metadata.pulse_id != 0) { + break; + } } - auto pulse_id = data.first->pulse_id; - writer.set_pulse_id(pulse_id); + uint64_t pulse_id = previous_metadata.pulse_id; + write_frame(writer, pulse_id, &previous_metadata, previous_frame_buffer); - writer.write_data(data.second); - - writer.write_scalar_metadata( - "pulse_id", &(data.first->pulse_id)); - - writer.write_scalar_metadata( - "frame_id", - &(data.first->frame_index)); - - writer.write_scalar_metadata( - "daq_rec", - &(data.first->daq_rec)); - - writer.write_scalar_metadata( - "received_packets", - &(data.first->n_recv_packets)); - - ring_buffer.release(data.first->buffer_slot_index); + // Indicates that is processed. + previous_metadata.pulse_id = 0; // TODO: Make real statistics, please. n_stat_out++; - if (data.first->n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { + if (previous_metadata.n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { n_frames_with_missing_packets += - JUNGFRAU_N_PACKETS_PER_FRAME - data.first->n_recv_packets; + JUNGFRAU_N_PACKETS_PER_FRAME - + previous_metadata.n_recv_packets; } if (last_pulse_id>0) {