diff --git a/core-buffer/src/UdpRecvModule.cpp b/core-buffer/src/UdpRecvModule.cpp index 49a77fc..6aaa228 100644 --- a/core-buffer/src/UdpRecvModule.cpp +++ b/core-buffer/src/UdpRecvModule.cpp @@ -2,6 +2,7 @@ #include "jungfrau.hpp" #include #include +#include using namespace std; @@ -88,65 +89,84 @@ void UdpRecvModule::receive_thread( char* frame_buffer = ring_buffer_.reserve(metadata); - jungfrau_packet packet_buffer; + auto n_msgs = JUNGFRAU_N_PACKETS_PER_FRAME; + jungfrau_packet recv_buffers[n_msgs]; + iovec recv_buff_ptr[n_msgs]; + struct mmsghdr msgs[n_msgs]; + struct sockaddr_in sockFrom[n_msgs]; + + for (int i = 0; i < n_msgs; i++) { + recv_buff_ptr[i].iov_base = (void*) &(recv_buffers[i]); + recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); + + msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; + msgs[i].msg_hdr.msg_iovlen = 1; + msgs[i].msg_hdr.msg_name = &sockFrom[i]; + msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in ); + } while (is_receiving_.load(memory_order_relaxed)) { - if (!udp_receiver.receive( - &packet_buffer, - JUNGFRAU_BYTES_PER_PACKET)) { + auto n_packets = udp_receiver.receive_many(msgs, n_msgs); + + if (n_packets < 1) { continue; } - auto* frame_metadata = metadata.get(); + for (int i_packet=0; i_packetpulse_id == 0) { - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->daq_rec = packet_buffer.debug; - // Packet from new frame, while we lost the last packet of - // previous frame. - } else if (frame_metadata->pulse_id != packet_buffer.bunchid) { - ring_buffer_.commit(metadata); + // TODO: Horrible. Breake it down into methods. - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; + // First packet for this frame. + if (frame_metadata->pulse_id == 0) { + frame_metadata->frame_index = packet_buffer->framenum; + frame_metadata->pulse_id = packet_buffer->bunchid; + frame_metadata->daq_rec = packet_buffer->debug; + // Packet from new frame, while we lost the last packet of + // previous frame. - frame_buffer = ring_buffer_.reserve(metadata); - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + } else if (frame_metadata->pulse_id != packet_buffer->bunchid) { + ring_buffer_.commit(metadata); - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->daq_rec = packet_buffer.debug; - } + metadata = make_shared(); + metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; + metadata->pulse_id = 0; + metadata->n_recv_packets = 0; - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; + frame_buffer = ring_buffer_.reserve(metadata); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - memcpy( - (void*) (frame_buffer + frame_buffer_offset), - packet_buffer.data, - JUNGFRAU_DATA_BYTES_PER_PACKET); + frame_metadata->frame_index = packet_buffer->framenum; + frame_metadata->pulse_id = packet_buffer->bunchid; + frame_metadata->daq_rec = packet_buffer->debug; + } - frame_metadata->n_recv_packets++; + size_t frame_buffer_offset = + JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer->packetnum; - // Frame finished with last packet. - if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) - { - ring_buffer_.commit(metadata); + memcpy( + (void*) (frame_buffer + frame_buffer_offset), + packet_buffer->data, + JUNGFRAU_DATA_BYTES_PER_PACKET); - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; + frame_metadata->n_recv_packets++; - frame_buffer = ring_buffer_.reserve(metadata); - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + // Frame finished with last packet. + if (packet_buffer->packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) + { + ring_buffer_.commit(metadata); + + metadata = make_shared(); + metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; + metadata->pulse_id = 0; + metadata->n_recv_packets = 0; + + frame_buffer = ring_buffer_.reserve(metadata); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + } } }