From 14a50fb08ece60f767e34bf7777fca5374169e60 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 12 May 2020 09:54:43 +0200 Subject: [PATCH] Implement recvmmsg in BufferUdpReceiver --- core-buffer/include/BufferUdpReceiver.hpp | 19 +++- core-buffer/src/BufferUdpReceiver.cpp | 112 ++++++++++++++++------ 2 files changed, 98 insertions(+), 33 deletions(-) diff --git a/core-buffer/include/BufferUdpReceiver.hpp b/core-buffer/include/BufferUdpReceiver.hpp index 25d65e7..2d97a47 100644 --- a/core-buffer/include/BufferUdpReceiver.hpp +++ b/core-buffer/include/BufferUdpReceiver.hpp @@ -1,19 +1,30 @@ #ifndef SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP #define SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP +#include #include "UdpReceiver.hpp" #include "jungfrau.hpp" +#include "buffer_config.hpp" class BufferUdpReceiver { const int source_id_; UdpReceiver udp_receiver_; - jungfrau_packet packet_buffer_ = {}; - bool packet_buffer_loaded_ = false; - inline void init_frame(ModuleFrame& frame_metadata); + jungfrau_packet packet_buffer_[core_buffer::BUFFER_UDP_N_RECV_MSG]; + iovec recv_buff_ptr_[core_buffer::BUFFER_UDP_N_RECV_MSG]; + mmsghdr msgs_[core_buffer::BUFFER_UDP_N_RECV_MSG]; + sockaddr_in sock_from_[core_buffer::BUFFER_UDP_N_RECV_MSG]; + + bool packet_buffer_loaded_ = false; + int packet_buffer_n_packets_ = 0; + int packet_buffer_offset_ = 0; + + inline void init_frame(ModuleFrame& frame_metadata, const int i_packet); inline void copy_packet_to_buffers( - ModuleFrame& metadata, char* frame_buffer); + ModuleFrame& metadata, char* frame_buffer, const int i_packet); + inline uint64_t process_packets( + const int n_packets, ModuleFrame& metadata, char* frame_buffer); public: BufferUdpReceiver(const uint16_t port, const int source_id); diff --git a/core-buffer/src/BufferUdpReceiver.cpp b/core-buffer/src/BufferUdpReceiver.cpp index fb42745..68b51dc 100644 --- a/core-buffer/src/BufferUdpReceiver.cpp +++ b/core-buffer/src/BufferUdpReceiver.cpp @@ -3,6 +3,7 @@ #include "BufferUdpReceiver.hpp" using namespace std; +using namespace core_buffer; BufferUdpReceiver::BufferUdpReceiver( const uint16_t port, @@ -10,33 +11,95 @@ BufferUdpReceiver::BufferUdpReceiver( source_id_(source_id) { udp_receiver_.bind(port); + + for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { + recv_buff_ptr_[i].iov_base = (void*) &(packet_buffer_[i]); + recv_buff_ptr_[i].iov_len = sizeof(jungfrau_packet); + + msgs_[i].msg_hdr.msg_iov = &recv_buff_ptr_[i]; + msgs_[i].msg_hdr.msg_iovlen = 1; + msgs_[i].msg_hdr.msg_name = &sock_from_[i]; + msgs_[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } } BufferUdpReceiver::~BufferUdpReceiver() { udp_receiver_.disconnect(); } -inline void BufferUdpReceiver::init_frame(ModuleFrame& frame_metadata) +inline void BufferUdpReceiver::init_frame( + ModuleFrame& frame_metadata, const int i_packet) { - frame_metadata.pulse_id = packet_buffer_.bunchid; - frame_metadata.frame_index = packet_buffer_.framenum; - frame_metadata.daq_rec = (uint64_t) packet_buffer_.debug; + frame_metadata.pulse_id = packet_buffer_[i_packet].bunchid; + frame_metadata.frame_index = packet_buffer_[i_packet].framenum; + frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug; frame_metadata.module_id = (int64_t) source_id_; } inline void BufferUdpReceiver::copy_packet_to_buffers( - ModuleFrame& metadata, char* frame_buffer) + ModuleFrame& metadata, char* frame_buffer, const int i_packet) { size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_.packetnum; + JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_[i_packet].packetnum; memcpy( (void*) (frame_buffer + frame_buffer_offset), - packet_buffer_.data, + packet_buffer_[i_packet].data, JUNGFRAU_DATA_BYTES_PER_PACKET); metadata.n_received_packets++; } +inline uint64_t BufferUdpReceiver::process_packets( + const int start_offset, + ModuleFrame& metadata, + char* frame_buffer) +{ + for ( + int i_packet=start_offset; + i_packet < packet_buffer_n_packets_; + i_packet++) { + + // First packet for this frame. + if (metadata.pulse_id == 0) { + init_frame(metadata, i_packet); + + // Happens if the last packet from the previous frame gets lost. + } else if (metadata.pulse_id != packet_buffer_[i_packet].bunchid) { + packet_buffer_loaded_ = true; + // Continue on this packet. + packet_buffer_offset_ = i_packet; + + return metadata.pulse_id; + } + + copy_packet_to_buffers(metadata, frame_buffer, i_packet); + + // Last frame packet received. Frame finished. + if (packet_buffer_[i_packet].packetnum == + JUNGFRAU_N_PACKETS_PER_FRAME-1) + { + // Buffer is loaded only if this is not the last message. + if (i_packet+1 != packet_buffer_n_packets_) { + packet_buffer_loaded_ = true; + // Continue on next packet. + packet_buffer_offset_ = i_packet + 1; + + // If i_packet is the last packet the buffer is empty. + } else { + packet_buffer_loaded_ = false; + packet_buffer_offset_ = 0; + } + + return metadata.pulse_id; + } + } + // We emptied the buffer. + packet_buffer_loaded_ = false; + packet_buffer_offset_ = 0; + + return 0; +} + uint64_t BufferUdpReceiver::get_frame_from_udp( ModuleFrame& metadata, char* frame_buffer) { @@ -47,37 +110,28 @@ uint64_t BufferUdpReceiver::get_frame_from_udp( // Happens when last packet from previous frame was missed. if (packet_buffer_loaded_) { - packet_buffer_loaded_ = false; - init_frame(metadata); - copy_packet_to_buffers(metadata, frame_buffer); + auto pulse_id = process_packets( + packet_buffer_offset_, metadata, frame_buffer); + + if (pulse_id != 0) { + return pulse_id; + } } while (true) { - if (!udp_receiver_.receive( - &packet_buffer_, - JUNGFRAU_BYTES_PER_PACKET)) { + packet_buffer_n_packets_ = udp_receiver_.receive_many( + msgs_, BUFFER_UDP_N_RECV_MSG); + + if (packet_buffer_n_packets_ == 0) { continue; } - // First packet for this frame. - if (metadata.pulse_id == 0) { - init_frame(metadata); + auto pulse_id = process_packets(0, metadata, frame_buffer); - // Happens if the last packet from the previous frame gets lost. - } else if (metadata.pulse_id != packet_buffer_.bunchid) { - packet_buffer_loaded_ = true; - - return metadata.pulse_id; - } - - copy_packet_to_buffers(metadata, frame_buffer); - - // Last frame packet received. Frame finished. - if (packet_buffer_.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) - { - return metadata.pulse_id; + if (pulse_id != 0) { + return pulse_id; } } } \ No newline at end of file