From 4e3ec6af1f3e9089505a451702287facc2cb8f25 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 17 Apr 2020 17:58:20 +0200 Subject: [PATCH] Revert "Try to use recvmmsg" This reverts commit 365c15a6 --- core-buffer/src/UdpRecvModule.cpp | 104 ++++++++++++------------------ 1 file changed, 42 insertions(+), 62 deletions(-) diff --git a/core-buffer/src/UdpRecvModule.cpp b/core-buffer/src/UdpRecvModule.cpp index 6aaa228..49a77fc 100644 --- a/core-buffer/src/UdpRecvModule.cpp +++ b/core-buffer/src/UdpRecvModule.cpp @@ -2,7 +2,6 @@ #include "jungfrau.hpp" #include #include -#include using namespace std; @@ -89,84 +88,65 @@ void UdpRecvModule::receive_thread( char* frame_buffer = ring_buffer_.reserve(metadata); - auto n_msgs = JUNGFRAU_N_PACKETS_PER_FRAME; - jungfrau_packet recv_buffers[n_msgs]; - iovec recv_buff_ptr[n_msgs]; - struct mmsghdr msgs[n_msgs]; - struct sockaddr_in sockFrom[n_msgs]; - - for (int i = 0; i < n_msgs; i++) { - recv_buff_ptr[i].iov_base = (void*) &(recv_buffers[i]); - recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); - - msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; - msgs[i].msg_hdr.msg_iovlen = 1; - msgs[i].msg_hdr.msg_name = &sockFrom[i]; - msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in ); - } + jungfrau_packet packet_buffer; while (is_receiving_.load(memory_order_relaxed)) { - auto n_packets = udp_receiver.receive_many(msgs, n_msgs); - - if (n_packets < 1) { + if (!udp_receiver.receive( + &packet_buffer, + JUNGFRAU_BYTES_PER_PACKET)) { continue; } - for (int i_packet=0; i_packetpulse_id == 0) { + frame_metadata->frame_index = packet_buffer.framenum; + frame_metadata->pulse_id = packet_buffer.bunchid; + frame_metadata->daq_rec = packet_buffer.debug; + // Packet from new frame, while we lost the last packet of + // previous frame. + } else if (frame_metadata->pulse_id != packet_buffer.bunchid) { + ring_buffer_.commit(metadata); - // First packet for this frame. - if (frame_metadata->pulse_id == 0) { - frame_metadata->frame_index = packet_buffer->framenum; - frame_metadata->pulse_id = packet_buffer->bunchid; - frame_metadata->daq_rec = packet_buffer->debug; - // Packet from new frame, while we lost the last packet of - // previous frame. + metadata = make_shared(); + metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; + metadata->pulse_id = 0; + metadata->n_recv_packets = 0; - } else if (frame_metadata->pulse_id != packet_buffer->bunchid) { - ring_buffer_.commit(metadata); + frame_buffer = ring_buffer_.reserve(metadata); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; + frame_metadata->frame_index = packet_buffer.framenum; + frame_metadata->pulse_id = packet_buffer.bunchid; + frame_metadata->daq_rec = packet_buffer.debug; + } - frame_buffer = ring_buffer_.reserve(metadata); - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + size_t frame_buffer_offset = + JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; - frame_metadata->frame_index = packet_buffer->framenum; - frame_metadata->pulse_id = packet_buffer->bunchid; - frame_metadata->daq_rec = packet_buffer->debug; - } + memcpy( + (void*) (frame_buffer + frame_buffer_offset), + packet_buffer.data, + JUNGFRAU_DATA_BYTES_PER_PACKET); - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer->packetnum; + frame_metadata->n_recv_packets++; - memcpy( - (void*) (frame_buffer + frame_buffer_offset), - packet_buffer->data, - JUNGFRAU_DATA_BYTES_PER_PACKET); + // Frame finished with last packet. + if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) + { + ring_buffer_.commit(metadata); - frame_metadata->n_recv_packets++; + metadata = make_shared(); + metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; + metadata->pulse_id = 0; + metadata->n_recv_packets = 0; - // Frame finished with last packet. - if (packet_buffer->packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) - { - ring_buffer_.commit(metadata); - - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; - - frame_buffer = ring_buffer_.reserve(metadata); - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - } + frame_buffer = ring_buffer_.reserve(metadata); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); } }