From d013bf6da3df1679f14215a24c0e828be147b263 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 15 Apr 2020 13:25:31 +0200 Subject: [PATCH] First implementation of UDP receiver core --- core-writer/include/UdpRecvModule.hpp | 32 ++++++ core-writer/src/module/UdpRecvModule.cpp | 124 +++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 core-writer/include/UdpRecvModule.hpp create mode 100644 core-writer/src/module/UdpRecvModule.cpp diff --git a/core-writer/include/UdpRecvModule.hpp b/core-writer/include/UdpRecvModule.hpp new file mode 100644 index 0000000..0be6633 --- /dev/null +++ b/core-writer/include/UdpRecvModule.hpp @@ -0,0 +1,32 @@ +#ifndef UDPRECVMODULE_HPP +#define UDPRECVMODULE_HPP + +#include "RingBuffer.hpp" +#include + +class UdpRecvModule { + + RingBuffer& ring_buffer_; + + std::atomic_bool is_receiving_; + std::thread receiving_thread_; + + protected: + void receive_thread( + const uint16_t udp_port, + const size_t udp_buffer_n_bytes); + + public: + UdpRecvModule(RingBuffer& ring_buffer); + + virtual ~UdpRecvModule() = default; + + void start_recv( + const uint16_t udp_port, + const size_t udp_buffer_n_bytes); + void stop_recv(); + bool is_receiving(); +}; + + +#endif // UDPRECVMODULE_HPP diff --git a/core-writer/src/module/UdpRecvModule.cpp b/core-writer/src/module/UdpRecvModule.cpp new file mode 100644 index 0000000..ed9ed9a --- /dev/null +++ b/core-writer/src/module/UdpRecvModule.cpp @@ -0,0 +1,124 @@ +#include "UdpRecvModule.hpp" +#include "jungfrau.hpp" +#include +#include + +using namespace std; + +UdpRecvModule::UdpRecvModule(RingBuffer& ring_buffer) : + ring_buffer_(ring_buffer), + is_receiving_(false) +{ + +} + +void UdpRecvModule::start_recv( + const uint16_t udp_port, + const size_t udp_buffer_n_bytes) +{ + if (is_receiving_ == true) { + std::stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[UdpRecvModule::start_recv]"; + err_msg << " Receivers already running." << endl; + + throw runtime_error(err_msg.str()); + } + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[UdpRecvModule::start_recv]"; + cout << " Starting with "; + cout << "udp_port " << udp_port << endl; + #endif + + is_receiving_ = true; + + if (receiving_thread_.joinable()) { + receiving_thread_.join(); + } + + receiving_thread_ = thread( + &UdpRecvModule::receive_thread, this, + udp_port, + udp_buffer_n_bytes); +} + +void UdpRecvModule::receive_thread( + const uint16_t udp_port, + const size_t udp_buffer_n_bytes) +{ + try { + ring_buffer_.initialize(udp_buffer_n_bytes); + + UdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet packet_buffer; + char* frame_buffer = nullptr; + shared_ptr metadata = nullptr; + + while (is_receiving_.load(memory_order_relaxed)) { + + if (!udp_receiver.receive( + &packet_buffer, + JUNGFRAU_BYTES_PER_PACKET)) { + continue; + } + + auto* current_metadata = metadata.get(); + + if (packet_buffer.framenum != current_metadata->frame_index) { + if (frame_buffer != nullptr) { + ring_buffer_.commit(metadata); + } + + metadata = make_shared(); + current_metadata = metadata.get(); + + current_metadata->frame_index = packet_buffer.framenum; + current_metadata->pulse_id = packet_buffer.bunchid; + current_metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; + current_metadata->recv_packets_1 = 0; + current_metadata->recv_packets_2 = 0; + + frame_buffer = ring_buffer_.reserve(metadata); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + } + + size_t frame_buffer_offset = + JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; + + memcpy( + (void*) (frame_buffer + frame_buffer_offset), + packet_buffer.data, + JUNGFRAU_DATA_BYTES_PER_PACKET); + + if (packet_buffer.packetnum < 64) { + current_metadata->recv_packets_1 ^= + (uint64_t)1 << packet_buffer.packetnum; + } else { + current_metadata->recv_packets_2 ^= + (uint64_t)1 << (packet_buffer.packetnum - 64); + } + } + + } catch (const std::exception& e) { + is_receiving_ = false; + + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[UdpRecvModule::receive_thread]"; + cout << " Stopped because of exception: " << endl; + cout << e.what() << endl; + + throw; + } +} \ No newline at end of file