From 20e7376615cfb7fe33675103c0475d4f40047721 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Tue, 1 Jun 2021 10:22:10 +0200 Subject: [PATCH] Refactored to linear buffer --- core-buffer/include/jungfraujoch.hpp | 2 +- core-buffer/test/test_PacketBuffer.cpp | 254 ++++++++++++++++++ jfj-udp-recv/include/DataBuffer.hpp | 92 ------- jfj-udp-recv/include/JFJochUdpReceiver.hpp | 33 --- .../{ImageStats.hpp => JfjFrameStats.hpp} | 0 jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 31 +++ jfj-udp-recv/include/PacketBuffer.hpp | 110 ++++++++ jfj-udp-recv/mockmain.cpp | 15 ++ jfj-udp-recv/src/JFJochUdpReceiver.cpp | 112 -------- .../src/{ImageStats.cpp => JfjFrameStats.cpp} | 0 jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 77 ++++++ jfj-udp-recv/test/test_FrameUdpReceiver.cpp | 21 +- 12 files changed, 494 insertions(+), 253 deletions(-) create mode 100644 core-buffer/test/test_PacketBuffer.cpp delete mode 100644 jfj-udp-recv/include/DataBuffer.hpp delete mode 100644 jfj-udp-recv/include/JFJochUdpReceiver.hpp rename jfj-udp-recv/include/{ImageStats.hpp => JfjFrameStats.hpp} (100%) create mode 100644 jfj-udp-recv/include/JfjFrameUdpReceiver.hpp create mode 100644 jfj-udp-recv/include/PacketBuffer.hpp create mode 100644 jfj-udp-recv/mockmain.cpp delete mode 100644 jfj-udp-recv/src/JFJochUdpReceiver.cpp rename jfj-udp-recv/src/{ImageStats.cpp => JfjFrameStats.cpp} (100%) create mode 100644 jfj-udp-recv/src/JfjFrameUdpReceiver.cpp diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp index bbfbb08..fd549b2 100644 --- a/core-buffer/include/jungfraujoch.hpp +++ b/core-buffer/include/jungfraujoch.hpp @@ -17,7 +17,7 @@ struct jfjoch_packet_t { uint32_t exptime; uint32_t packetnum; - double bunchid; + uint64_t bunchid; uint64_t timestamp; uint16_t moduleID; diff --git a/core-buffer/test/test_PacketBuffer.cpp b/core-buffer/test/test_PacketBuffer.cpp new file mode 100644 index 0000000..110e6ca --- /dev/null +++ b/core-buffer/test/test_PacketBuffer.cpp @@ -0,0 +1,254 @@ +#include +#include +#include "gtest/gtest.h" +#include "PacketBuffer.hpp" + +#include +#include +#include + +using namespace std; + +template +class MockReceiver{ + public: + int idx_packet = 42000; + int packet_per_frame = 512; + int num_bunches = 100; + int num_packets =50; + + int receive_many(mmsghdr* msgs, const size_t n_msgs){ + // Receive 'num_packets numner of packets' + + for(int ii=0; ii(mmsghdr[ii].msg_hdr.msg_iov->iov_base); + refer.bunchid = idx_packet / packet_per_frame; + refer.packetnum = idx_packet % packet_per_frame; + idx_packet++; + } + return std::min(num_packets, n_msgs); + }; +}; + +// +// +// +// +//TEST(BufferUdpReceiver, simple_recv) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 5; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_middle_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port, source_id); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_first_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_last_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// // n_frames -1 because the last frame is not complete. +// for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} diff --git a/jfj-udp-recv/include/DataBuffer.hpp b/jfj-udp-recv/include/DataBuffer.hpp deleted file mode 100644 index 1d22d73..0000000 --- a/jfj-udp-recv/include/DataBuffer.hpp +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP -#define CIRCULAR_BUFFER_TEMPLATE_HPP - -#include -#include -#include -#include -#include - -/**Linear data buffer - -A simplified version of FIFO. -**/ -template -class DataBuffer{ -public: - DataBuffer() {}; - ~DataBuffer() {}; - - /**Diagnostics**/ - size_t size() const { return ( _write-_read ); } - size_t capacity() const { return _capacity; } - bool is_full(){ return ( (_write - _read)<_capacity ); } - bool is_empty(){ return (_write ==_read); } - - /**Operators**/ - void zero(){ memset(m_cont, 0, sizeof(m_cont)); } - T& operator[](size_t index); // Array subscript operator - T& container(){ return (_cont; } // Direct container reference - - /**Element access**/ - const T& pop_front(); //Destructive read - const T& get_front(); //Non-destructive read - void push_back(T item); //Write new element to buffer - - /**Guards**/ - std::mutex g_mutex; -private: - T m_cont[CAP]; - const size_t m_capacity = CAP; - size_t ptr_write = 0; - size_t ptr_read = 0; -}; - -/** Array subscript operator - Throws 'std::length_error' if out of range. -**/ -template -T& DataBuffer::operator[](size_t idx){ - if(idx > m_capacity){ - std::string msg = "Buffer index '" + std::to_string(idx) + "' is out of range with capacity '" + std::to_sting(m_capacity) + "'" + std::endl; - throw std::out_of_range(msg); - } - - return m_buffer[idx]; -} - -template -T& DataBuffer::container(){ - return m_buffer; -} - -/*********************************************************************/ - -/** Destructive read (i.e. progress the read pointer) **/ -template -const T& DataBuffer::pop_front(){ - std::lock_guard g_guard; - ptr_read++; - return _buffer[ptr_read-1]; -} - -/**Push a new element to the ringbuffer (do not progress read pointer)**/ -template -const T& DataBuffer::peek_front(){ - return m_buffer[ptr_read]; -} - - -/**Push a new element to the ringbuffer**/ -template -void DataBuffer::push_back(T item){ - std::lock_guard g_guard; - if(ptr_write==m_capacity-1){ - std::string msg = "Buffer with '" + std::to_sting(m_capacity) + "' capacity is full" + std::endl; - throw std::out_of_range(msg); - } - m_buffer[ptr_write] = item; - ptr_write++; -} - -#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/include/JFJochUdpReceiver.hpp b/jfj-udp-recv/include/JFJochUdpReceiver.hpp deleted file mode 100644 index 4d38dd4..0000000 --- a/jfj-udp-recv/include/JFJochUdpReceiver.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP -#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP - -#include -#include "PacketUdpReceiver.hpp" -#include "formats.hpp" -#include "buffer_config.hpp" - -class JFJochUdpReceiver { - PacketUdpReceiver m_udp_receiver; - - // Incoming packet buffers - jfjoch_packet_t m_packet_buffer[buffer_config::BUFFER_UDP_N_RECV_MSG]; - iovec m_recv_buff_ptr[buffer_config::BUFFER_UDP_N_RECV_MSG]; - mmsghdr m_msgs[buffer_config::BUFFER_UDP_N_RECV_MSG]; - sockaddr_in m_sock_from[buffer_config::BUFFER_UDP_N_RECV_MSG]; - - bool packet_buffer_loaded_ = false; - int packet_buffer_n_packets_ = 0; - int packet_buffer_offset_ = 0; - - inline void init_frame(ImageMetadata& frame_metadata, const int i_packet); - inline void copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int i_packet); - inline uint64_t m_process_packets(const int n_packets, ImageMetadata& metadata, char* frame_buffer); - -public: - JFJochUdpReceiver(const uint16_t port, const int module_id); - virtual ~JFJochUdpReceiver(); - uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer); -}; - - -#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/ImageStats.hpp b/jfj-udp-recv/include/JfjFrameStats.hpp similarity index 100% rename from jfj-udp-recv/include/ImageStats.hpp rename to jfj-udp-recv/include/JfjFrameStats.hpp diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp new file mode 100644 index 0000000..d5c5ddf --- /dev/null +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -0,0 +1,31 @@ +#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP +#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP + +#include +#include "PacketUdpReceiver.hpp" +#include "formats.hpp" +#include "buffer_config.hpp" +#include "PacketBuffer.hpp" + + +/** JungfrauJoch UDP receiver + + Wrapper class to capture frames from the UDP stream of the JungfrauJoch FPGA card. + NOTE: This design will not scale well for higher frame rates... +**/ +class JfjFrameUdpReceiver { + PacketUdpReceiver udp_receiver_; + + PacketBuffer m_buffer; + + inline void init_frame(ImageMetadata& frame_metadata, jfjoch_packet_t& c_packet); + inline uint64_t process_packets(ImageMetadata& metadata, char* frame_buffer); + +public: + JfjFrameUdpReceiver(const uint16_t port); + virtual ~JfjFrameUdpReceiver(); + uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer); +}; + + +#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp new file mode 100644 index 0000000..46958e6 --- /dev/null +++ b/jfj-udp-recv/include/PacketBuffer.hpp @@ -0,0 +1,110 @@ +#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP +#define CIRCULAR_BUFFER_TEMPLATE_HPP + +#include +#include +#include +#include +#include +#include + + +/** Linear data buffer (NOT FIFO) + + Simplified data buffer that provides pop and push operations and + bundles the actual container with metadata required by . + It stores the actual data in an accessible C-style array. **/ +template +class PacketBuffer{ +public: + PacketBuffer() { + for (int i = 0; i < CAPACITY; i++) { + m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]); + m_recv_buff_ptr[i].iov_len = sizeof(T); + + // C-structure as expected by + m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; + m_msgs[i].msg_hdr.msg_iovlen = 1; + m_msgs[i].msg_hdr.msg_name = &m_sock_from[i]; + m_msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + }; + // ~PacketBuffer() {}; + + /**Diagnostics**/ + size_t size() const { return ( idx_write-idx_read ); } + size_t capacity() const { return m_capacity; } + bool is_full() const { return (idx_write >= m_capacity); } + bool is_empty() const { return (idx_write <= idx_read); } + + /**Operators**/ + void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer + T& container(){ return m_container; }; // Direct container reference + mmsghdr& msgs(){ return m_msgs; }; + + /**Element access**/ + const T& pop_front(); //Destructive read + const T& peek_front(); //Non-destructive read + void push_back(T item); //Write new element to buffer + + /**Fill from UDP receiver**/ + template + void fill_fom(TY& recv){ + std::lock_guard g_guard(m_mutex); + this->idx_write = recv.receive_many(this->msgs(), this->capacity()); + this->idx_read = 0; + } + +private: + // Main container + T m_container[CAPACITY]; + const size_t m_capacity = CAPACITY; + /**Guards**/ + std::mutex m_mutex; + /**Read and write index**/ + size_t idx_write = 0; + size_t idx_read = 0; + + // C-structures as expected by + mmsghdr m_msgs[CAPACITY]; + iovec m_recv_buff_ptr[CAPACITY]; + sockaddr_in m_sock_from[CAPACITY]; +}; + + +/*********************************************************************/ +/*********************************************************************/ +/*********************************************************************/ + +/** Destructive read + Standard read access to queues (i.e. progress the read pointer). + Throws 'std::length_error' if container is empty. **/ +template +const T& PacketBuffer::pop_front(){ + std::lock_guard g_guard(m_mutex); + if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } + idx_read++; + return m_container[idx_read-1]; +} + +/** Non-destructive read + Standard, non-destructive read access (does not progress the read pointer). + Throws 'std::length_error' if container is empty. **/ +template +const T& PacketBuffer::peek_front(){ + std::lock_guard g_guard(m_mutex); + if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } + return m_container[idx_read]; +} + + +/** Push an element into the end of the buffer**/ +template +void PacketBuffer::push_back(T item){ + std::lock_guard g_guard(m_mutex); + if(this->is_full()){ throw std::out_of_range("Attempted to write a full buffer!"); } + m_container[idx_write] = item; + idx_write++; +} + +#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/mockmain.cpp b/jfj-udp-recv/mockmain.cpp new file mode 100644 index 0000000..1d33d2b --- /dev/null +++ b/jfj-udp-recv/mockmain.cpp @@ -0,0 +1,15 @@ + +#include "include/PacketBuffer.hpp" + + +struct DummyContainer{ + uint64_t index; + uint64_t timestamp; + uint16_t data[32]; +}; + + +int main (int argc, char *argv[]) { + PacketBuffer b; + +} diff --git a/jfj-udp-recv/src/JFJochUdpReceiver.cpp b/jfj-udp-recv/src/JFJochUdpReceiver.cpp deleted file mode 100644 index 24ed15e..0000000 --- a/jfj-udp-recv/src/JFJochUdpReceiver.cpp +++ /dev/null @@ -1,112 +0,0 @@ -#include -#include -#include "JFJochUdpReceiver.hpp" - -using namespace std; -using namespace buffer_config; - -JFJochUdpReceiver::JFJochUdpReceiver(const uint16_t port, const int module_id) : module_id_(module_id){ - udp_receiver_.bind(port); - - for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { - m_recv_buff_ptr[i].iov_base = (void*) &(m_packet_buffer[i]); - m_recv_buff_ptr[i].iov_len = sizeof(jfjoch_packet_t); - - msgs_[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; - msgs_[i].msg_hdr.msg_iovlen = 1; - msgs_[i].msg_hdr.msg_name = &m_sock_from[i]; - msgs_[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); - } -} - -JFJochUdpReceiver::~JFJochUdpReceiver() { - m_udp_receiver.disconnect(); -} - -inline void JFJochUdpReceiver::init_frame(ImageMetadata& image_metadata, const int i_packet) { - image_metadata.pulse_id = m_packet_buffer[i_packet].bunchid; - image_metadata.frame_index = m_packet_buffer[i_packet].framenum; - image_metadata.daq_rec = m_packet_buffer[i_packet].debug; - image_metadata.is_good_image = 0; -} - -inline void JFJochUdpReceiver::copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int idx_packet){ - - size_t buffer_offset = JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_[idx_packet].packetnum; - - memcpy((void*) (frame_buffer + buffer_offset), m_packet_buffer[idx_packet].data, JUNGFRAU_DATA_BYTES_PER_PACKET); - - metadata.n_recv_packets++; -} - - - -/** Copy the contents of the packet buffer into a single assembled image - NOTE: In the jungfrau_packet, framenum is the trigger number - NOTE: Even partial frames are valid -**/ -inline uint64_t JFJochUdpReceiver::m_process_packets(const int start_offset, ImageMetadata& metadata, char* frame_buffer){ - - for (int i_packet=start_offset; i_packet < packet_buffer_n_packets_; i_packet++) { - - // First packet for this frame (sucks if this one is missed) - if (metadata.pulse_id == 0) { - init_frame(metadata, i_packet); - } - // Unexpected jump (if the last packet from the previous frame got lost) - if (metadata.frame_index != m_packet_buffer[i_packet].framenum) { - // Save queue status (lazy fifo queue) - m_packet_buffer_loaded_ = true; - m_packet_buffer_offset_ = i_packet; - // Even partial frames are valid? - return metadata.pulse_id; - } - - copy_packet_to_buffers(metadata, frame_buffer, i_packet); - - // Last frame packet received (frame finished) - if (packet_buffer_[i_packet].packetnum == JFJ_N_PACKETS_PER_FRAME - 1) { - // Buffer is loaded only if this is not the last message. - if (i_packet+1 != packet_buffer_n_packets_) { - // Continue on next packet - m_packet_buffer_loaded = true; - m_packet_buffer_offset = i_packet + 1; - - // If i_packet is the last packet the buffer is empty. - } else { - m_packet_buffer_loaded = true; - m_packet_buffer_offset = 0; - } - - return metadata.pulse_id; - } - } - // We emptied the buffer. - m_packet_buffer_loaded = false; - m_packet_buffer_offset = 0; - - return 0; -} - -uint64_t JFJochUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ - // Reset the metadata and frame buffer for the next frame. - metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - // Happens when last packet from previous frame was missed. - if (packet_buffer_loaded_) { - auto pulse_id = m_process_packets(packet_buffer_offset_, metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } - } - - // Otherwise read a new one - while (true) { - packet_buffer_n_packets_ = udp_receiver_.receive_many(msgs_, BUFFER_UDP_N_RECV_MSG); - - if (packet_buffer_n_packets_ > 0) { - auto pulse_id = m_process_packets(0, metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } - } - } -} diff --git a/jfj-udp-recv/src/ImageStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp similarity index 100% rename from jfj-udp-recv/src/ImageStats.cpp rename to jfj-udp-recv/src/JfjFrameStats.cpp diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp new file mode 100644 index 0000000..aa38ec9 --- /dev/null +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -0,0 +1,77 @@ +#include +#include +#include "JfjFrameUdpReceiver.hpp" + +using namespace std; +using namespace buffer_config; + +JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { + udp_receiver_.bind(port); +} + +JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { + udp_receiver_.disconnect(); +} + +inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet) { + frame_metadata.pulse_id = c_packet.timestamp; + frame_metadata.frame_index = c_packet.framenum; + frame_metadata.daq_rec = (uint32_t) c_packet.debug; + frame_metadata.is_good_image = (int32_t) true; +} + +inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, char* frame_buffer){ + + while(!m_buffer.is_empty()){ + // Happens if the last packet from the previous frame gets lost. + if (m_frame_index != m_buffer.peek_front().framenum) { + m_frame_index = m_buffer.peek_front().framenum; + frame_metadata.is_good_image = (int32_t) false; + return metadata.pulse_id; + } + + // Otherwise pop the queue (and set current frame index) + jfjoch_packet_t& c_packet = m_buffer.pop_front(); + m_frame_index = c_packet.framenum; + + // Always copy metadata (otherwise problem when 0th packet gets lost) + this->init_frame(metadata, c_packet); + + // Copy data to frame buffer + size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum; + memcpy( (void*) (frame_buffer + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET); + metadata.n_recv_packets++; + + // Last frame packet received. Frame finished. + if (c_packet.packetnum == JFJ_N_PACKETS_PER_FRAME - 1){ + return metadata.pulse_id; + } + } + + // We emptied the buffer. + m_buffer.reset(); + return 0; +} + +uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ + // Reset the metadata and frame buffer for the next frame. + metadata.pulse_id = 0; + metadata.n_recv_packets = 0; + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + // Process leftover packages in the buffer + if (!m_buffer.is_empty()) { + auto pulse_id = process_packets(metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } + + while (true) { + // Receive new packages (pass if none)... + m_buffer.fill_from(m_udp_receiver); + if (m_buffer.is_empty()) { continue; } + + // ... and process them + auto pulse_id = process_packets(metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } +} diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp index 0d1c9ee..e1e86ab 100644 --- a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -13,7 +13,6 @@ using namespace std; TEST(BufferUdpReceiver, simple_recv) { auto n_packets = JF_N_PACKETS_PER_FRAME; - int source_id = 1234; int n_frames = 5; uint16_t udp_port = MOCK_UDP_PORT; @@ -21,7 +20,7 @@ TEST(BufferUdpReceiver, simple_recv) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -57,7 +56,6 @@ TEST(BufferUdpReceiver, simple_recv) ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets); - ASSERT_EQ(metadata.module_id, source_id); } ::close(send_socket_fd); @@ -66,7 +64,6 @@ TEST(BufferUdpReceiver, simple_recv) TEST(BufferUdpReceiver, missing_middle_packet) { auto n_packets = JF_N_PACKETS_PER_FRAME; - int source_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -74,7 +71,7 @@ TEST(BufferUdpReceiver, missing_middle_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port, source_id); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -115,7 +112,6 @@ TEST(BufferUdpReceiver, missing_middle_packet) ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); } ::close(send_socket_fd); @@ -124,7 +120,6 @@ TEST(BufferUdpReceiver, missing_middle_packet) TEST(BufferUdpReceiver, missing_first_packet) { auto n_packets = JF_N_PACKETS_PER_FRAME; - int source_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -132,7 +127,7 @@ TEST(BufferUdpReceiver, missing_first_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -173,7 +168,6 @@ TEST(BufferUdpReceiver, missing_first_packet) ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); } ::close(send_socket_fd); @@ -182,7 +176,6 @@ TEST(BufferUdpReceiver, missing_first_packet) TEST(BufferUdpReceiver, missing_last_packet) { auto n_packets = JF_N_PACKETS_PER_FRAME; - int source_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -190,7 +183,7 @@ TEST(BufferUdpReceiver, missing_last_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -224,16 +217,14 @@ TEST(BufferUdpReceiver, missing_last_packet) // n_frames -1 because the last frame is not complete. for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); ASSERT_EQ(i_frame + 1, pulse_id); ASSERT_EQ(metadata.frame_index, i_frame + 1000); ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); } ::close(send_socket_fd); -} \ No newline at end of file +}