diff --git a/core-buffer/src/UdpRecvModule.cpp b/core-buffer/src/UdpRecvModule.cpp index c0514af..c4f2934 100644 --- a/core-buffer/src/UdpRecvModule.cpp +++ b/core-buffer/src/UdpRecvModule.cpp @@ -5,103 +5,60 @@ using namespace std; -UdpRecvModule::UdpRecvModule(RingBuffer& ring_buffer) : - ring_buffer_(ring_buffer), - is_receiving_(false) +UdpRecvModule::UdpRecvModule( + FastQueue& queue, + const uint16_t udp_port) : + queue_(queue), + is_receiving_(true) { - -} - -UdpRecvModule::~UdpRecvModule() -{ - stop_recv(); -} - -void UdpRecvModule::start_recv( - const uint16_t udp_port, - const size_t frame_n_bytes) -{ - if (is_receiving_ == true) { - std::stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::start_recv]"; - err_msg << " Receivers already running." << endl; - - throw runtime_error(err_msg.str()); - } - #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[UdpRecvModule::start_recv]"; + cout << "[UdpRecvModule::UdpRecvModule]"; cout << " Starting with "; cout << "udp_port " << udp_port << endl; #endif - is_receiving_ = true; - - if (receiving_thread_.joinable()) { - receiving_thread_.join(); - } - receiving_thread_ = thread( &UdpRecvModule::receive_thread, this, - udp_port, - frame_n_bytes); + udp_port); } -void UdpRecvModule::stop_recv() +UdpRecvModule::~UdpRecvModule() { -#ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - cout << "[" << system_clock::now() << "]"; - cout << "UdpRecvModule::stop_recv"; - cout << " Stop receiving." << endl; -#endif - is_receiving_ = false; - - if (receiving_thread_.joinable()) { - receiving_thread_.join(); - } + receiving_thread_.join(); } -void UdpRecvModule::receive_thread( - const uint16_t udp_port, - const size_t frame_size) + +void UdpRecvModule::receive_thread(const uint16_t udp_port) { try { - ring_buffer_.initialize(frame_size); - UdpReceiver udp_receiver; udp_receiver.bind(udp_port); - auto metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; + ModuleFrame* module_frame; + module_frame->pulse_id = 0; + module_frame->n_received_packets = 0; - char* frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { + jungfrau_packet packet_buffer; + + auto slot_id = queue_.reserve(); + + if (slot_id == -1) { stringstream err_msg; using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; + err_msg << " Queue is full."; err_msg << endl; throw runtime_error(err_msg.str()); } - jungfrau_packet packet_buffer; - while (is_receiving_.load(memory_order_relaxed)) { if (!udp_receiver.receive( @@ -110,8 +67,6 @@ void UdpRecvModule::receive_thread( continue; } - auto* frame_metadata = metadata.get(); - // TODO: Horrible. Breake it down into methods. // First packet for this frame.