diff --git a/core-writer/src/module/UdpRecvModule.cpp b/core-writer/src/module/UdpRecvModule.cpp index 1f2769f..abe2e54 100644 --- a/core-writer/src/module/UdpRecvModule.cpp +++ b/core-writer/src/module/UdpRecvModule.cpp @@ -81,9 +81,13 @@ void UdpRecvModule::receive_thread( UdpReceiver udp_receiver; udp_receiver.bind(udp_port); + auto metadata = make_shared(); + metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; + metadata->pulse_id = 0; + + char* frame_buffer = ring_buffer_.reserve(metadata); + jungfrau_packet packet_buffer; - char* frame_buffer = nullptr; - shared_ptr metadata = nullptr; while (is_receiving_.load(memory_order_relaxed)) { @@ -93,27 +97,34 @@ void UdpRecvModule::receive_thread( continue; } - auto* current_metadata = metadata.get(); + auto* frame_metadata = metadata.get(); - if (current_metadata == nullptr || - packet_buffer.framenum != current_metadata->frame_index) { + // TODO: Horrible. Breake it down into methods. - if (frame_buffer != nullptr) { - ring_buffer_.commit(metadata); - } + // First packet for this frame. + if (frame_metadata->pulse_id == 0) { + frame_metadata->frame_index = packet_buffer.framenum; + frame_metadata->pulse_id = packet_buffer.bunchid; + frame_metadata->recv_packets_1 = ~(uint64_t)0; + frame_metadata->recv_packets_2 = ~(uint64_t)0; + frame_metadata->daq_rec = packet_buffer.debug; + // Packet from new frame, while we lost the last packet of + // previous frame. + } else if (frame_metadata->pulse_id != packet_buffer.bunchid) { + ring_buffer_.commit(metadata); metadata = make_shared(); - current_metadata = metadata.get(); - - current_metadata->frame_index = packet_buffer.framenum; - current_metadata->pulse_id = packet_buffer.bunchid; - current_metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - current_metadata->recv_packets_1 = ~(uint64_t)0; - current_metadata->recv_packets_2 = ~(uint64_t)0; - current_metadata->daq_rec = packet_buffer.debug; + metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; + metadata->pulse_id = 0; frame_buffer = ring_buffer_.reserve(metadata); memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + frame_metadata->frame_index = packet_buffer.framenum; + frame_metadata->pulse_id = packet_buffer.bunchid; + frame_metadata->recv_packets_1 = ~(uint64_t)0; + frame_metadata->recv_packets_2 = ~(uint64_t)0; + frame_metadata->daq_rec = packet_buffer.debug; } size_t frame_buffer_offset = @@ -125,12 +136,25 @@ void UdpRecvModule::receive_thread( JUNGFRAU_DATA_BYTES_PER_PACKET); if (packet_buffer.packetnum < 64) { - current_metadata->recv_packets_1 ^= + frame_metadata->recv_packets_1 ^= (uint64_t)1 << packet_buffer.packetnum; } else { - current_metadata->recv_packets_2 ^= + frame_metadata->recv_packets_2 ^= (uint64_t)1 << (packet_buffer.packetnum - 64); } + + // Frame finished with last packet. + if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) + { + ring_buffer_.commit(metadata); + + metadata = make_shared(); + metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; + metadata->pulse_id = 0; + + frame_buffer = ring_buffer_.reserve(metadata); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + } } } catch (const std::exception& e) {