diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 77e704c..d4ec12f 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -3,40 +3,14 @@ #include #include #include -#include #include "config.hpp" #include "jungfrau.hpp" #include "BufferUtils.hpp" + using namespace std; -void write_frame( - FastH5Writer &writer, - const uint64_t pulse_id, - const UdpFrameMetadata* frame_metadata, - const char* frame_buffer) -{ - writer.set_pulse_id(pulse_id); - - writer.write_data(frame_buffer); - - writer.write_scalar_metadata( - "pulse_id", &(frame_metadata->pulse_id)); - - writer.write_scalar_metadata( - "frame_id", - &(frame_metadata->frame_index)); - - writer.write_scalar_metadata( - "daq_rec", - &(frame_metadata->daq_rec)); - - writer.write_scalar_metadata( - "received_packets", - &(frame_metadata->n_recv_packets)); -} - int main (int argc, char *argv[]) { if (argc != 4) { @@ -55,6 +29,11 @@ int main (int argc, char *argv[]) { int udp_port = atoi(argv[2]); string root_folder = string(argv[3]); + RingBuffer ring_buffer(config::ring_buffer_n_slots); + + UdpRecvModule udp_module(ring_buffer); + udp_module.start_recv(udp_port, JUNGFRAU_DATA_BYTES_PER_FRAME); + uint64_t n_stat_out(0); uint64_t n_frames_with_missing_packets = 0; uint64_t n_missed_frames = 0; @@ -68,91 +47,42 @@ int main (int argc, char *argv[]) { writer.add_scalar_metadata("daq_rec"); writer.add_scalar_metadata("received_packets"); - jungfrau_packet packet_buffer; - UdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - char* previous_frame_buffer = new char[2*512*1024]; - memset(previous_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - UdpFrameMetadata previous_metadata; - previous_metadata.pulse_id = 0; - previous_metadata.n_recv_packets = 0; - previous_metadata.daq_rec = 0; - previous_metadata.n_recv_packets = 0; - - char* current_frame_buffer = new char[2*512*1024]; - memset(current_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - UdpFrameMetadata current_metadata; - current_metadata.pulse_id = 0; - current_metadata.n_recv_packets = 0; - current_metadata.daq_rec = 0; - current_metadata.n_recv_packets = 0; - while (true) { + auto data = ring_buffer.read(); - while (true) { - - if (!udp_receiver.receive( - &packet_buffer, - JUNGFRAU_BYTES_PER_PACKET)) { - continue; - } - - if (current_metadata.pulse_id != packet_buffer.bunchid) { - if (current_metadata.pulse_id != 0) { - // Commit - previous_metadata = current_metadata; - swap(previous_frame_buffer, current_frame_buffer); - } - - // Init current_metadata - current_metadata.frame_index = packet_buffer.framenum; - current_metadata.pulse_id = packet_buffer.bunchid; - current_metadata.daq_rec = packet_buffer.debug; - current_metadata.n_recv_packets = 0; - memset(current_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - } - - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; - - memcpy((void*) (current_frame_buffer + frame_buffer_offset), - packet_buffer.data, - JUNGFRAU_DATA_BYTES_PER_PACKET); - - current_metadata.n_recv_packets++; - - // Frame finished with last packet. - if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) - { - // Commit. - previous_metadata = current_metadata; - swap(previous_frame_buffer, current_frame_buffer); - - // This will cause a reset on the next iteration. - current_metadata.pulse_id = 0; - } - - if (previous_metadata.pulse_id != 0) { - break; - } + if (data.first == nullptr) { + this_thread::sleep_for(chrono::milliseconds(10)); + continue; } - uint64_t pulse_id = previous_metadata.pulse_id; - write_frame(writer, pulse_id, &previous_metadata, previous_frame_buffer); + auto pulse_id = data.first->pulse_id; + writer.set_pulse_id(pulse_id); - // Indicates that is processed. - previous_metadata.pulse_id = 0; + writer.write_data(data.second); + + writer.write_scalar_metadata( + "pulse_id", &(data.first->pulse_id)); + + writer.write_scalar_metadata( + "frame_id", + &(data.first->frame_index)); + + writer.write_scalar_metadata( + "daq_rec", + &(data.first->daq_rec)); + + writer.write_scalar_metadata( + "received_packets", + &(data.first->n_recv_packets)); + + ring_buffer.release(data.first->buffer_slot_index); // TODO: Make real statistics, please. n_stat_out++; - if (previous_metadata.n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { + if (data.first->n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { n_frames_with_missing_packets += - JUNGFRAU_N_PACKETS_PER_FRAME - - previous_metadata.n_recv_packets; + JUNGFRAU_N_PACKETS_PER_FRAME - data.first->n_recv_packets; } if (last_pulse_id>0) { @@ -172,7 +102,5 @@ int main (int argc, char *argv[]) { n_frames_with_missing_packets = 0; n_missed_frames = 0; } - - this_thread::yield(); } }