diff --git a/CMakeLists.txt b/CMakeLists.txt index 74c7972..dbef01c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,7 @@ add_subdirectory("core-buffer") add_subdirectory("jf-udp-recv") add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") +add_subdirectory("jfj-udp-recv") add_subdirectory("sf-stream") add_subdirectory("sf-writer") diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp new file mode 100644 index 0000000..6d219ca --- /dev/null +++ b/core-buffer/include/jungfraujoch.hpp @@ -0,0 +1,37 @@ +#ifndef JUNGFRAUJOCH_HPP +#define JUNGFRAUJOCH_HPP + +#include + +#define JFJOCH_N_MODULES 32 +#define JFJOCH_BYTES_PER_PACKET 8240 +#define JFJOCH_DATA_BYTES_PER_PACKET 8192 +#define JFJOCH_N_PACKETS_PER_FRAME (JFJOCH_N_MODULES * 128) +#define JFJOCH_DATA_BYTES_PER_FRAME (JFJOCH_N_MODULES * 1048576) + +// 48 bytes + 8192 bytes = 8240 bytes +#pragma pack(push) +#pragma pack(2) +struct jfjoch_packet_t { + uint64_t framenum; + uint32_t exptime; + uint32_t packetnum; + + int64_t bunchid; + uint64_t timestamp; + + uint16_t moduleID; + uint16_t xCoord; + uint16_t yCoord; + uint16_t zCoord; + + uint32_t debug; + uint16_t roundRobin; + uint8_t detectortype; + uint8_t headerVersion; + char data[JFJOCH_DATA_BYTES_PER_PACKET]; +}; +#pragma pack(pop) + + +#endif diff --git a/core-buffer/test/test_PacketBuffer.cpp b/core-buffer/test/test_PacketBuffer.cpp new file mode 100644 index 0000000..110e6ca --- /dev/null +++ b/core-buffer/test/test_PacketBuffer.cpp @@ -0,0 +1,254 @@ +#include +#include +#include "gtest/gtest.h" +#include "PacketBuffer.hpp" + +#include +#include +#include + +using namespace std; + +template +class MockReceiver{ + public: + int idx_packet = 42000; + int packet_per_frame = 512; + int num_bunches = 100; + int num_packets =50; + + int receive_many(mmsghdr* msgs, const size_t n_msgs){ + // Receive 'num_packets numner of packets' + + for(int ii=0; ii(mmsghdr[ii].msg_hdr.msg_iov->iov_base); + refer.bunchid = idx_packet / packet_per_frame; + refer.packetnum = idx_packet % packet_per_frame; + idx_packet++; + } + return std::min(num_packets, n_msgs); + }; +}; + +// +// +// +// +//TEST(BufferUdpReceiver, simple_recv) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 5; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_middle_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port, source_id); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_first_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_last_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// // n_frames -1 because the last frame is not complete. +// for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} diff --git a/jfj-udp-recv/CMakeLists.txt b/jfj-udp-recv/CMakeLists.txt new file mode 100644 index 0000000..8a19873 --- /dev/null +++ b/jfj-udp-recv/CMakeLists.txt @@ -0,0 +1,12 @@ +file(GLOB SOURCES src/*.cpp) + +add_library(jfj-udp-recv-lib STATIC ${SOURCES}) +target_include_directories(jfj-udp-recv-lib PUBLIC include/) +target_link_libraries(jfj-udp-recv-lib external core-buffer-lib) + +add_executable(jfj-udp-recv src/main.cpp) +set_target_properties(jfj-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) +target_link_libraries(jfj-udp-recv jfj-udp-recv-lib zmq rt) + +enable_testing() +add_subdirectory(test/) diff --git a/jfj-udp-recv/README.md b/jfj-udp-recv/README.md new file mode 100644 index 0000000..b51f999 --- /dev/null +++ b/jfj-udp-recv/README.md @@ -0,0 +1,164 @@ +# sf-buffer +sf-buffer is the component that receives the detector data in form of UDP +packages and writes them down to disk to a binary format. In addition, it +sends a copy of the module frame to sf-stream via ZMQ. + +Each sf-buffer process is taking care of a single detector module. The +processes are all independent and do not rely on any external data input +to maximize isolation and possible interactions in our system. + +The main design principle is simplicity and decoupling: + +- No interprocess dependencies/communication. +- No dependencies on external libraries (as much as possible). +- Using POSIX as much as possible. + +We are optimizing for maintainability and long term stability. Performance is +of concern only if the performance criteria are not met. + +## Overview + +![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg) + +sf-buffer is a single threaded application (without counting the ZMQ IO threads) +that does both receiving, assembling, writing and sending in the same thread. + +### UDP receiving + +Each process listens to one udp port. Packets coming to this udp port are +assembled into frames. Frames (either complete or with missing packets) are +passed forward. The number of received packets is saved so we can later +(at image assembly time) determine if the frame is valid or not. At this point +we do no validation. + +We are currently using **recvmmsg** to minimize the number of switches to +kernel mode. + +We expect all packets to come in order or not come at all. Once we see the +package for the next pulse_id we can assume no more packages are coming for +the previous one, and send the assembled frame down the program. + +### File writing + +Files are written to disk in frames - one write to disk per frame. This gives +us a relaxed 10ms interval of 1 MB writes. + +#### File format + +The binary file on disk is just a serialization of multiple +**BufferBinaryFormat** structs: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) + +#pragma pack(push) +#pragma pack(1) +struct BufferBinaryFormat { + const char FORMAT_MARKER = 0xBE; + ModuleFrame meta; + char data[buffer_config::MODULE_N_BYTES]; +}; +#pragma pack(pop) +``` + +![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg) + +Each frame is composed by: + +- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. +- **ModuleFrame** - frame meta used in image assembly phase. +- **Data** - assembled frame from a single module. + +Frames are written one after another to a specific offset in the file. The +offset is calculated based on the pulse_id, so each frame has a specific place +in the file and there is no need to have an index for frame retrieval. + +The offset where a specific pulse_id is written in a file is calculated: + +```c++ +// We save 1000 pulses in each file. +const uint64_t FILE_MOD = 1000 + +// Relative index of pulse_id inside file. +size_t file_base = pulse_id % FILE_MOD; +// Offset in bytes of relative index in file. +size_t file_offset = file_base * sizeof(BufferBinaryFormat); +``` + +We now know where to look for data inside the file, but we still don't know +inside which file to look. For this we need to discuss the folder structure. + +#### Folder structure + +The folder (as well as file) structure is deterministic in the sense that given +a specific pulse_id, we can directly calculate the folder, file, and file +offset where the data is stored. This allows us to have independent writing +and reading from the buffer without building any indexes. + +The binary files written by sf_buffer are saved to: + +[detector_folder]/[module_folder]/[data_folder]/[data_file].bin + +- **detector\_folder** should always be passed as an absolute path. This is the +container that holds all data related to a specific detector. +- **module\_folder** is usually composed like "M00", "M01". It separates data +from different modules of one detector. +- **data\_folder** and **data\_file** are automatically calculated based on the +current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our +index for accessing data. + +![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg) + +```c++ +// FOLDER_MOD = 100000 +int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD; +// FILE_MOD = 1000 +int data_file = (pulse_id % FILE_MOD) * FILE_MOD; +``` + +The data_folder and data_file folders are named as the first pulse_id that +should be stored inside them. + +FOLDER_MOD == 100000 means that each data_folder will contain data for 100000 +pulses, while FILE_MOD == 1000 means that each file inside the data_folder +will contain 1000 pulses. The total number of data_files in each data_folder +will therefore be **FILE\_MOD / FOLDER\_MOD = 100**. + +#### Analyzing the buffer on disk +In **sf-utils** there is a Python module that allows you to read directly the +buffer in order to debug it or to verify the consistency between the HDF5 file +and the received data. + +- VerifyH5DataConsistency.py checks the consistency between the H5 file and +buffer. +- BinaryBufferReader.py reads the buffer and prints meta. The class inside +can also be used in external scripts. + +### ZMQ sending + +A copy of the data written to disk is also send via ZMQ to the sf-stream. This +is used to provide live viewing / processing capabilities. Each module data is +sent separately, and this is later assembled in the sf-stream. + +We use the PUB/SUB mechanism for distributing this data - we cannot control the +rate of the producer, and we would like to avoid distributed image assembly +if possible, so PUSH/PULL does not make sense in this case. + +We provide no guarantees on live data delivery, but in practice the number of +dropped or incomplete frames in currently negligible. + +The protocol is a serialization of the same data structures we use to +write on disk (no need for additional memory operations before sending out +data). It uses a 2 part multipart ZMQ message: + +- The first part is a serialization of the ModuleFrame struct (see above). +- The second part is the data field in the BufferBinaryFormat struct (the frame +data). diff --git a/jfj-udp-recv/include/JfjFrameStats.hpp b/jfj-udp-recv/include/JfjFrameStats.hpp new file mode 100644 index 0000000..7ab6636 --- /dev/null +++ b/jfj-udp-recv/include/JfjFrameStats.hpp @@ -0,0 +1,31 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class FrameStats { + const std::string detector_name_; + const int module_id_; + size_t stats_time_; + + int frames_counter_; + int n_missed_packets_; + int n_corrupted_frames_; + int n_corrupted_pulse_id_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + FrameStats(const std::string &detector_name, + const int module_id, + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp new file mode 100644 index 0000000..aad5962 --- /dev/null +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -0,0 +1,32 @@ +#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP +#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP + +#include +#include "PacketUdpReceiver.hpp" +#include "formats.hpp" +#include "buffer_config.hpp" +#include "PacketBuffer.hpp" +#include "jungfraujoch.hpp" + +/** JungfrauJoch UDP receiver + + Wrapper class to capture frames from the UDP stream of the JungfrauJoch FPGA card. + NOTE: This design will not scale well for higher frame rates... +**/ +class JfjFrameUdpReceiver { + PacketUdpReceiver m_udp_receiver; + uint64_t m_frame_index; + + PacketBuffer m_buffer; + + inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet); + inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer); + +public: + JfjFrameUdpReceiver(const uint16_t port); + virtual ~JfjFrameUdpReceiver(); + uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); +}; + + +#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp new file mode 100644 index 0000000..0697a5f --- /dev/null +++ b/jfj-udp-recv/include/PacketBuffer.hpp @@ -0,0 +1,112 @@ +#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP +#define CIRCULAR_BUFFER_TEMPLATE_HPP + +#include +#include +#include +#include +#include +#include + + +/** Linear data buffer (NOT FIFO) + + Simplified data buffer that provides pop and push operations and + bundles the actual container with metadata required by . + It stores the actual data in an accessible C-style array. **/ +template +class PacketBuffer{ +public: + PacketBuffer() { + for (int i = 0; i < CAPACITY; i++) { + m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]); + m_recv_buff_ptr[i].iov_len = sizeof(T); + + // C-structure as expected by + m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; + m_msgs[i].msg_hdr.msg_iovlen = 1; + m_msgs[i].msg_hdr.msg_name = &m_sock_from[i]; + m_msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + }; + // ~PacketBuffer() {}; + + /**Diagnostics**/ + int size() const { return ( idx_write-idx_read ); } + int capacity() const { return m_capacity; } + bool is_full() const { return bool(idx_write >= m_capacity); } + bool is_empty() const { return bool(idx_write <= idx_read); } + + /**Operators**/ + void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer + T& container(){ return m_container; }; // Direct container reference + mmsghdr& msgs(){ return m_msgs; }; + + /**Element access**/ + T& pop_front(); //Destructive read + const T& peek_front(); //Non-destructive read + void push_back(T item); //Write new element to buffer + + /**Fill from UDP receiver**/ + template + void fill_from(TY& recv){ + std::lock_guard g_guard(m_mutex); + this->idx_write = recv.receive_many(m_msgs, this->capacity()); + // Returns -1 with errno=11 if no data received + if(idx_write==-1){ idx_write = 0; } + this->idx_read = 0; + } + +private: + // Main container + T m_container[CAPACITY]; + const size_t m_capacity = CAPACITY; + /**Guards**/ + std::mutex m_mutex; + /**Read and write index**/ + int idx_write = 0; + int idx_read = 0; + + // C-structures as expected by + mmsghdr m_msgs[CAPACITY]; + iovec m_recv_buff_ptr[CAPACITY]; + sockaddr_in m_sock_from[CAPACITY]; +}; + + +/*********************************************************************/ +/*********************************************************************/ +/*********************************************************************/ + +/** Destructive read + Standard read access to queues (i.e. progress the read pointer). + Throws 'std::length_error' if container is empty. **/ +template +T& PacketBuffer::pop_front(){ + std::lock_guard g_guard(m_mutex); + if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } + idx_read++; + return m_container[idx_read-1]; +} + +/** Non-destructive read + Standard, non-destructive read access (does not progress the read pointer). + Throws 'std::length_error' if container is empty. **/ +template +const T& PacketBuffer::peek_front(){ + std::lock_guard g_guard(m_mutex); + if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } + return m_container[idx_read]; +} + + +/** Push an element into the end of the buffer**/ +template +void PacketBuffer::push_back(T item){ + std::lock_guard g_guard(m_mutex); + if(this->is_full()){ throw std::out_of_range("Attempted to write a full buffer!"); } + m_container[idx_write] = item; + idx_write++; +} + +#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/include/PacketUdpReceiver.hpp b/jfj-udp-recv/include/PacketUdpReceiver.hpp new file mode 100644 index 0000000..cc7e093 --- /dev/null +++ b/jfj-udp-recv/include/PacketUdpReceiver.hpp @@ -0,0 +1,22 @@ +#ifndef UDPRECEIVER_H +#define UDPRECEIVER_H + +#include + +class PacketUdpReceiver { + + int socket_fd_; + +public: + PacketUdpReceiver(); + virtual ~PacketUdpReceiver(); + + bool receive(void* buffer, const size_t buffer_n_bytes); + int receive_many(mmsghdr* msgs, const size_t n_msgs); + + void bind(const uint16_t port); + void disconnect(); +}; + + +#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/jfj-udp-recv/src/JfjFrameStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp new file mode 100644 index 0000000..ee48423 --- /dev/null +++ b/jfj-udp-recv/src/JfjFrameStats.cpp @@ -0,0 +1,71 @@ +#include +#include "JfjFrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats( + const std::string &detector_name, + const int module_id, + const size_t stats_time) : + detector_name_(detector_name), + module_id_(module_id), + stats_time_(stats_time) +{ + reset_counters(); +} + +void FrameStats::reset_counters() +{ + frames_counter_ = 0; + n_missed_packets_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) +{ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { + n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_udp_recv"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "n_missed_packets=" << n_missed_packets_ << "i"; + cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp new file mode 100644 index 0000000..059da2b --- /dev/null +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -0,0 +1,86 @@ +#include +#include "JfjFrameUdpReceiver.hpp" + +using namespace std; +using namespace buffer_config; + +std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) { + os << "Frame number: " << packet.framenum << std::endl; + os << "Packet number: " << packet.packetnum << std::endl; + os << "Bunch id: " << packet.bunchid << std::endl; + os << std::endl; + return os; +} + +JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { + m_udp_receiver.bind(port); +} + +JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { + m_udp_receiver.disconnect(); +} + +inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { + // std::cout << c_packet; + + metadata.pulse_id = c_packet.bunchid; + metadata.frame_index = c_packet.framenum; + metadata.daq_rec = (uint64_t) c_packet.debug; + metadata.module_id = (int64_t) 0; +} + +inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char* frame_buffer){ + + while(!m_buffer.is_empty()){ + // Happens if the last packet from the previous frame gets lost. + if (m_frame_index != m_buffer.peek_front().framenum) { + m_frame_index = m_buffer.peek_front().framenum; + std::cout << "Peeked pulse: " << metadata.pulse_id << std::endl; + return metadata.pulse_id; + } + + // Otherwise pop the queue (and set current frame index) + jfjoch_packet_t& c_packet = m_buffer.pop_front(); + m_frame_index = c_packet.framenum; + + // Always copy metadata (otherwise problem when 0th packet gets lost) + this->init_frame(metadata, c_packet); + + // Copy data to frame buffer + size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum; + memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); + metadata.n_recv_packets++; + + // Last frame packet received. Frame finished. + if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){ + return metadata.pulse_id; + } + } + + // We emptied the buffer. + // m_buffer.reset(); + return 0; +} + +uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ + // Reset the metadata and frame buffer for the next frame. (really needed?) + metadata.pulse_id = 0; + metadata.n_recv_packets = 0; + memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME); + // Process leftover packages in the buffer + if (!m_buffer.is_empty()) { + auto pulse_id = process_packets(metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } + + while (true) { + // Receive new packages (pass if none)... + m_buffer.reset(); + m_buffer.fill_from(m_udp_receiver); + if (m_buffer.is_empty()) { continue; } + + // ... and process them + auto pulse_id = process_packets(metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } +} diff --git a/jfj-udp-recv/src/PacketUdpReceiver.cpp b/jfj-udp-recv/src/PacketUdpReceiver.cpp new file mode 100644 index 0000000..713dd98 --- /dev/null +++ b/jfj-udp-recv/src/PacketUdpReceiver.cpp @@ -0,0 +1,66 @@ +#include +#include +#include "PacketUdpReceiver.hpp" +#include "jungfrau.hpp" +#include +#include +#include "buffer_config.hpp" + +using namespace std; +using namespace buffer_config; + +PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { } + +PacketUdpReceiver::~PacketUdpReceiver() { + disconnect(); +} + +void PacketUdpReceiver::bind(const uint16_t port){ + if (socket_fd_ > -1) { + throw runtime_error("Socket already bound."); + } + + socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); + if (socket_fd_ < 0) { + throw runtime_error("Cannot open socket."); + } + + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(port); + + timeval udp_socket_timeout; + udp_socket_timeout.tv_sec = 0; + udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) { + throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno))); + } + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { + throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno))); + }; + //TODO: try to set SO_RCVLOWAT + + auto bind_result = ::bind(socket_fd_, reinterpret_cast(&server_address), sizeof(server_address)); + + if (bind_result < 0) { + throw runtime_error("Cannot bind socket."); + } +} + +int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){ + return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); +} + +bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){ + auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); + + return (data_len == buffer_n_bytes) ? true : false; +} + +void PacketUdpReceiver::disconnect(){ + close(socket_fd_); + socket_fd_ = -1; +} diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp new file mode 100644 index 0000000..2afb451 --- /dev/null +++ b/jfj-udp-recv/src/main.cpp @@ -0,0 +1,74 @@ +#include +#include +#include +#include + +#include "formats.hpp" +#include "buffer_config.hpp" +#include "JfjFrameUdpReceiver.hpp" +#include "BufferUtils.hpp" +#include "JfjFrameStats.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace BufferUtils; + +int main (int argc, char *argv[]) { + + if (argc != 3) { + cout << endl; + cout << "Usage: jfj_udp_recv [detector_json_filename]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << endl; + + exit(-1); + } + + const auto config = read_json_config(string(argv[1])); + + const auto udp_port = config.start_udp_port; + JfjFrameUdpReceiver receiver(udp_port); + RamBuffer buffer(config.detector_name, config.n_modules); + FrameStats stats(config.detector_name, 0, STATS_TIME); + + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS); + auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch"); + + // Might be better creating a structure for double buffering + ModuleFrame frameMeta; + ImageMetadata imageMeta; + char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_FRAME]; + + uint64_t pulse_id_previous = 0; + uint64_t frame_index_previous = 0; + + + while (true) { + // NOTE: Needs to be pipelined for really high frame rates + auto pulse_id = receiver.get_frame_from_udp(frameMeta, dataBuffer); + + bool bad_pulse_id = false; + + if ( ( frameMeta.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { + bad_pulse_id = true; + } else { + imageMeta.pulse_id = frameMeta.pulse_id; + imageMeta.frame_index = frameMeta.frame_index; + imageMeta.daq_rec = frameMeta.daq_rec; + imageMeta.is_good_image = true; + + buffer.write_frame(frameMeta, dataBuffer); + zmq_send(sender, &imageMeta, sizeof(imageMeta), 0); + } + + stats.record_stats(frameMeta, bad_pulse_id); + + pulse_id_previous = pulse_id; + frame_index_previous = frameMeta.frame_index; + + } + + delete[] dataBuffer; +} diff --git a/jfj-udp-recv/test/CMakeLists.txt b/jfj-udp-recv/test/CMakeLists.txt new file mode 100644 index 0000000..77eeb3a --- /dev/null +++ b/jfj-udp-recv/test/CMakeLists.txt @@ -0,0 +1,8 @@ +add_executable(jfj-udp-recv-tests main.cpp) + +target_link_libraries(jfj-udp-recv-tests + core-buffer-lib + jfj-udp-recv-lib + gtest + ) + diff --git a/jfj-udp-recv/test/main.cpp b/jfj-udp-recv/test/main.cpp new file mode 100644 index 0000000..8f2cd01 --- /dev/null +++ b/jfj-udp-recv/test/main.cpp @@ -0,0 +1,10 @@ +#include "gtest/gtest.h" +#include "test_PacketUdpReceiver.cpp" +#include "test_FrameUdpReceiver.cpp" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/jfj-udp-recv/test/mock/udp.hpp b/jfj-udp-recv/test/mock/udp.hpp new file mode 100644 index 0000000..1cef271 --- /dev/null +++ b/jfj-udp-recv/test/mock/udp.hpp @@ -0,0 +1,16 @@ +#ifndef MOCK_UDP_H +#define MOCK_UDP_H + +const int MOCK_UDP_PORT(13000); + +sockaddr_in get_server_address(uint16_t udp_port) +{ + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(udp_port); + + return server_address; +} + +#endif \ No newline at end of file diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp new file mode 100644 index 0000000..3f490c7 --- /dev/null +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -0,0 +1,230 @@ +#include +#include +#include "gtest/gtest.h" +#include "JfjFrameUdpReceiver.hpp" +#include "mock/udp.hpp" + +#include +#include +#include + +using namespace std; + +TEST(BufferUdpReceiver, simple_recv) +{ + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; + int n_frames = 5; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_middle_packet) +{ + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_first_packet) +{ + auto n_packets = JFJOCH_N_PACKETS_PER_FRAME; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_last_packet) +{ + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JFJOCH_DATA_BYTES_PER_FRAME); + + // n_frames -1 because the last frame is not complete. + for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + } + + ::close(send_socket_fd); +} diff --git a/jfj-udp-recv/test/test_PacketUdpReceiver.cpp b/jfj-udp-recv/test/test_PacketUdpReceiver.cpp new file mode 100644 index 0000000..1be343a --- /dev/null +++ b/jfj-udp-recv/test/test_PacketUdpReceiver.cpp @@ -0,0 +1,170 @@ +#include +#include +#include "gtest/gtest.h" +#include "mock/udp.hpp" +#include "PacketUdpReceiver.hpp" + +#include +#include + +using namespace std; + +TEST(PacketUdpReceiver, simple_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + send_udp_buffer.packetnum = 91; + send_udp_buffer.framenum = 92; + send_udp_buffer.bunchid = 93; + send_udp_buffer.debug = 94; + + auto server_address = get_server_address(udp_port); + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(100)); + + jungfrau_packet recv_udp_buffer; + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum); + EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum); + EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid); + EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(PacketUdpReceiver, false_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + jungfrau_packet recv_udp_buffer; + + auto server_address = get_server_address(udp_port); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(PacketUdpReceiver, receive_many) +{ + auto n_msg_buffer = JF_N_PACKETS_PER_FRAME; + jungfrau_packet recv_buffer[n_msg_buffer]; + iovec recv_buff_ptr[n_msg_buffer]; + struct mmsghdr msgs[n_msg_buffer]; + struct sockaddr_in sockFrom[n_msg_buffer]; + + for (int i = 0; i < n_msg_buffer; i++) { + recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]); + recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); + + msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; + msgs[i].msg_hdr.msg_iovlen = 1; + msgs[i].msg_hdr.msg_name = &sockFrom[i]; + msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + + auto server_address = get_server_address(udp_port); + + send_udp_buffer.bunchid = 0; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + send_udp_buffer.bunchid = 1; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(10)); + + auto n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME); + ASSERT_EQ(n_msgs, 2); + + for (size_t i=0;i