From a6d065a285692e29da8f0414d3f6b389a3e86407 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 4 May 2020 12:17:54 +0200 Subject: [PATCH] Make UdpReceiver quicker and more simple --- core-buffer/include/UdpRecvModule.hpp | 8 ++ core-buffer/src/UdpRecvModule.cpp | 109 +++++++++----------------- 2 files changed, 45 insertions(+), 72 deletions(-) diff --git a/core-buffer/include/UdpRecvModule.hpp b/core-buffer/include/UdpRecvModule.hpp index 723d35c..36e9afc 100644 --- a/core-buffer/include/UdpRecvModule.hpp +++ b/core-buffer/include/UdpRecvModule.hpp @@ -12,6 +12,14 @@ class UdpRecvModule { std::thread receiving_thread_; std::atomic_bool is_receiving_; + inline void init_frame( + ModuleFrame* frame_metadata, + jungfrau_packet& packet_buffer); + + inline void reserve_next_frame_buffers( + ModuleFrame*& frame_metadata, + char*& frame_buffer); + protected: void receive_thread(const uint16_t udp_port); diff --git a/core-buffer/src/UdpRecvModule.cpp b/core-buffer/src/UdpRecvModule.cpp index c4f2934..f633f29 100644 --- a/core-buffer/src/UdpRecvModule.cpp +++ b/core-buffer/src/UdpRecvModule.cpp @@ -31,6 +31,30 @@ UdpRecvModule::~UdpRecvModule() receiving_thread_.join(); } +inline void UdpRecvModule::init_frame ( + ModuleFrame* frame_metadata, + jungfrau_packet& packet_buffer) +{ + frame_metadata->frame_index = packet_buffer.framenum; + frame_metadata->pulse_id = packet_buffer.bunchid; + frame_metadata->daq_rec = packet_buffer.debug; +} + +inline void UdpRecvModule::reserve_next_frame_buffers( + ModuleFrame*& frame_metadata, + char*& frame_buffer) +{ + int slot_id; + if ((slot_id = queue_.reserve()) == -1) + throw runtime_error("Queue is full."); + + frame_metadata = queue_.get_metadata_buffer(slot_id); + frame_metadata->pulse_id=0; + frame_metadata->n_received_packets=0; + + frame_buffer = queue_.get_data_buffer(slot_id); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); +} void UdpRecvModule::receive_thread(const uint16_t udp_port) { @@ -38,27 +62,12 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port) UdpReceiver udp_receiver; udp_receiver.bind(udp_port); - ModuleFrame* module_frame; - module_frame->pulse_id = 0; - module_frame->n_received_packets = 0; + ModuleFrame* frame_metadata; + char* frame_buffer; + reserve_next_frame_buffers(frame_metadata, frame_buffer); jungfrau_packet packet_buffer; - auto slot_id = queue_.reserve(); - - if (slot_id == -1) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Queue is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - while (is_receiving_.load(memory_order_relaxed)) { if (!udp_receiver.receive( @@ -67,41 +76,16 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port) continue; } - // TODO: Horrible. Breake it down into methods. - // First packet for this frame. if (frame_metadata->pulse_id == 0) { - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->daq_rec = packet_buffer.debug; - // Packet from new frame, while we lost the last packet of - // previous frame. + init_frame(frame_metadata, packet_buffer); + + // Happens if the last packet from the previous frame gets lost. } else if (frame_metadata->pulse_id != packet_buffer.bunchid) { - ring_buffer_.commit(metadata); + queue_.commit(); + reserve_next_frame_buffers(frame_metadata, frame_buffer); - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; - - frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->daq_rec = packet_buffer.debug; + init_frame(frame_metadata, packet_buffer); } size_t frame_buffer_offset = @@ -112,32 +96,13 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port) packet_buffer.data, JUNGFRAU_DATA_BYTES_PER_PACKET); - frame_metadata->n_recv_packets++; + frame_metadata->n_received_packets++; - // Frame finished with last packet. + // Last frame packet received. Frame finished. if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) { - ring_buffer_.commit(metadata); - - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; - - frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + queue_.commit(); + reserve_next_frame_buffers(frame_metadata, frame_buffer); } }