From 0ae9eb1e886f5e7e140f88bc5f621ea258ae0197 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Fri, 28 May 2021 17:44:15 +0200 Subject: [PATCH 1/5] Placeholder based on jungfrau --- CMakeLists.txt | 1 + core-buffer/include/jungfraujoch.hpp | 37 +++ jfj-udp-recv/CMakeLists.txt | 18 ++ jfj-udp-recv/README.md | 164 +++++++++++++ jfj-udp-recv/include/DataBuffer.hpp | 92 +++++++ jfj-udp-recv/include/ImageStats.hpp | 31 +++ jfj-udp-recv/include/JFJochUdpReceiver.hpp | 33 +++ jfj-udp-recv/include/PacketUdpReceiver.hpp | 22 ++ jfj-udp-recv/src/ImageStats.cpp | 71 ++++++ jfj-udp-recv/src/JFJochUdpReceiver.cpp | 112 +++++++++ jfj-udp-recv/src/PacketUdpReceiver.cpp | 66 +++++ jfj-udp-recv/src/main.cpp | 78 ++++++ jfj-udp-recv/test/CMakeLists.txt | 8 + jfj-udp-recv/test/main.cpp | 10 + jfj-udp-recv/test/mock/udp.hpp | 16 ++ jfj-udp-recv/test/test_FrameUdpReceiver.cpp | 239 +++++++++++++++++++ jfj-udp-recv/test/test_PacketUdpReceiver.cpp | 170 +++++++++++++ 17 files changed, 1168 insertions(+) create mode 100644 core-buffer/include/jungfraujoch.hpp create mode 100644 jfj-udp-recv/CMakeLists.txt create mode 100644 jfj-udp-recv/README.md create mode 100644 jfj-udp-recv/include/DataBuffer.hpp create mode 100644 jfj-udp-recv/include/ImageStats.hpp create mode 100644 jfj-udp-recv/include/JFJochUdpReceiver.hpp create mode 100644 jfj-udp-recv/include/PacketUdpReceiver.hpp create mode 100644 jfj-udp-recv/src/ImageStats.cpp create mode 100644 jfj-udp-recv/src/JFJochUdpReceiver.cpp create mode 100644 jfj-udp-recv/src/PacketUdpReceiver.cpp create mode 100644 jfj-udp-recv/src/main.cpp create mode 100644 jfj-udp-recv/test/CMakeLists.txt create mode 100644 jfj-udp-recv/test/main.cpp create mode 100644 jfj-udp-recv/test/mock/udp.hpp create mode 100644 jfj-udp-recv/test/test_FrameUdpReceiver.cpp create mode 100644 jfj-udp-recv/test/test_PacketUdpReceiver.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 74c7972..dbef01c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,7 @@ add_subdirectory("core-buffer") add_subdirectory("jf-udp-recv") add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") +add_subdirectory("jfj-udp-recv") add_subdirectory("sf-stream") add_subdirectory("sf-writer") diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp new file mode 100644 index 0000000..bbfbb08 --- /dev/null +++ b/core-buffer/include/jungfraujoch.hpp @@ -0,0 +1,37 @@ +#ifndef JUNGFRAUJOCH_H +#define JUNGFRAUJOCH_H + +#include + +#define JFJOCH_N_MODULES 32 +#define JFJOCH_BYTES_PER_PACKET 8240 +#define JFJOCH_DATA_BYTES_PER_PACKET 8192 +#define JFJOCH_N_PACKETS_PER_FRAME JFJOCH_N_MODULES * 128 +#define JFJOCH_DATA_BYTES_PER_FRAME JFJOCH_N_MODULES * 1048576 + +// 48 bytes + 8192 bytes = 8240 bytes +#pragma pack(push) +#pragma pack(2) +struct jfjoch_packet_t { + uint64_t framenum; + uint32_t exptime; + uint32_t packetnum; + + double bunchid; + uint64_t timestamp; + + uint16_t moduleID; + uint16_t xCoord; + uint16_t yCoord; + uint16_t zCoord; + + uint32_t debug; + uint16_t roundRobin; + uint8_t detectortype; + uint8_t headerVersion; + char data[JFJOCH_DATA_BYTES_PER_PACKET]; +}; +#pragma pack(pop) + + +#endif diff --git a/jfj-udp-recv/CMakeLists.txt b/jfj-udp-recv/CMakeLists.txt new file mode 100644 index 0000000..3c83127 --- /dev/null +++ b/jfj-udp-recv/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-udp-recv-lib STATIC ${SOURCES}) +target_include_directories(jf-udp-recv-lib PUBLIC include/) +target_link_libraries(jf-udp-recv-lib + external + core-buffer-lib) + +add_executable(jf-udp-recv src/main.cpp) +set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) +target_link_libraries(jf-udp-recv + jf-udp-recv-lib + zmq + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/jfj-udp-recv/README.md b/jfj-udp-recv/README.md new file mode 100644 index 0000000..504281f --- /dev/null +++ b/jfj-udp-recv/README.md @@ -0,0 +1,164 @@ +# sf-buffer +sf-buffer is the component that receives the detector data in form of UDP +packages and writes them down to disk to a binary format. In addition, it +sends a copy of the module frame to sf-stream via ZMQ. + +Each sf-buffer process is taking care of a single detector module. The +processes are all independent and do not rely on any external data input +to maximize isolation and possible interactions in our system. + +The main design principle is simplicity and decoupling: + +- No interprocess dependencies/communication. +- No dependencies on external libraries (as much as possible). +- Using POSIX as much as possible. + +We are optimizing for maintainability and long term stability. Performance is +of concern only if the performance criteria are not met. + +## Overview + +![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg) + +sf-buffer is a single threaded application (without counting the ZMQ IO threads) +that does both receiving, assembling, writing and sending in the same thread. + +### UDP receiving + +Each process listens to one udp port. Packets coming to this udp port are +assembled into frames. Frames (either complete or with missing packets) are +passed forward. The number of received packets is saved so we can later +(at image assembly time) determine if the frame is valid or not. At this point +we do no validation. + +We are currently using **recvmmsg** to minimize the number of switches to +kernel mode. + +We expect all packets to come in order or not come at all. Once we see the +package for the next pulse_id we can assume no more packages are coming for +the previous one, and send the assembled frame down the program. + +### File writing + +Files are written to disk in frames - one write to disk per frame. This gives +us a relaxed 10ms interval of 1 MB writes. + +#### File format + +The binary file on disk is just a serialization of multiple +**BufferBinaryFormat** structs: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) + +#pragma pack(push) +#pragma pack(1) +struct BufferBinaryFormat { + const char FORMAT_MARKER = 0xBE; + ModuleFrame meta; + char data[buffer_config::MODULE_N_BYTES]; +}; +#pragma pack(pop) +``` + +![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg) + +Each frame is composed by: + +- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. +- **ModuleFrame** - frame meta used in image assembly phase. +- **Data** - assembled frame from a single module. + +Frames are written one after another to a specific offset in the file. The +offset is calculated based on the pulse_id, so each frame has a specific place +in the file and there is no need to have an index for frame retrieval. + +The offset where a specific pulse_id is written in a file is calculated: + +```c++ +// We save 1000 pulses in each file. +const uint64_t FILE_MOD = 1000 + +// Relative index of pulse_id inside file. +size_t file_base = pulse_id % FILE_MOD; +// Offset in bytes of relative index in file. +size_t file_offset = file_base * sizeof(BufferBinaryFormat); +``` + +We now know where to look for data inside the file, but we still don't know +inside which file to look. For this we need to discuss the folder structure. + +#### Folder structure + +The folder (as well as file) structure is deterministic in the sense that given +a specific pulse_id, we can directly calculate the folder, file, and file +offset where the data is stored. This allows us to have independent writing +and reading from the buffer without building any indexes. + +The binary files written by sf_buffer are saved to: + +[detector_folder]/[module_folder]/[data_folder]/[data_file].bin + +- **detector\_folder** should always be passed as an absolute path. This is the +container that holds all data related to a specific detector. +- **module\_folder** is usually composed like "M00", "M01". It separates data +from different modules of one detector. +- **data\_folder** and **data\_file** are automatically calculated based on the +current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our +index for accessing data. + +![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg) + +```c++ +// FOLDER_MOD = 100000 +int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD; +// FILE_MOD = 1000 +int data_file = (pulse_id % FILE_MOD) * FILE_MOD; +``` + +The data_folder and data_file folders are named as the first pulse_id that +should be stored inside them. + +FOLDER_MOD == 100000 means that each data_folder will contain data for 100000 +pulses, while FILE_MOD == 1000 means that each file inside the data_folder +will contain 1000 pulses. The total number of data_files in each data_folder +will therefore be **FILE\_MOD / FOLDER\_MOD = 100**. + +#### Analyzing the buffer on disk +In **sf-utils** there is a Python module that allows you to read directly the +buffer in order to debug it or to verify the consistency between the HDF5 file +and the received data. + +- VerifyH5DataConsistency.py checks the consistency between the H5 file and +buffer. +- BinaryBufferReader.py reads the buffer and prints meta. The class inside +can also be used in external scripts. + +### ZMQ sending + +A copy of the data written to disk is also send via ZMQ to the sf-stream. This +is used to provide live viewing / processing capabilities. Each module data is +sent separately, and this is later assembled in the sf-stream. + +We use the PUB/SUB mechanism for distributing this data - we cannot control the +rate of the producer, and we would like to avoid distributed image assembly +if possible, so PUSH/PULL does not make sense in this case. + +We provide no guarantees on live data delivery, but in practice the number of +dropped or incomplete frames in currently negligible. + +The protocol is a serialization of the same data structures we use to +write on disk (no need for additional memory operations before sending out +data). It uses a 2 part multipart ZMQ message: + +- The first part is a serialization of the ModuleFrame struct (see above). +- The second part is the data field in the BufferBinaryFormat struct (the frame +data). diff --git a/jfj-udp-recv/include/DataBuffer.hpp b/jfj-udp-recv/include/DataBuffer.hpp new file mode 100644 index 0000000..1d22d73 --- /dev/null +++ b/jfj-udp-recv/include/DataBuffer.hpp @@ -0,0 +1,92 @@ +#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP +#define CIRCULAR_BUFFER_TEMPLATE_HPP + +#include +#include +#include +#include +#include + +/**Linear data buffer + +A simplified version of FIFO. +**/ +template +class DataBuffer{ +public: + DataBuffer() {}; + ~DataBuffer() {}; + + /**Diagnostics**/ + size_t size() const { return ( _write-_read ); } + size_t capacity() const { return _capacity; } + bool is_full(){ return ( (_write - _read)<_capacity ); } + bool is_empty(){ return (_write ==_read); } + + /**Operators**/ + void zero(){ memset(m_cont, 0, sizeof(m_cont)); } + T& operator[](size_t index); // Array subscript operator + T& container(){ return (_cont; } // Direct container reference + + /**Element access**/ + const T& pop_front(); //Destructive read + const T& get_front(); //Non-destructive read + void push_back(T item); //Write new element to buffer + + /**Guards**/ + std::mutex g_mutex; +private: + T m_cont[CAP]; + const size_t m_capacity = CAP; + size_t ptr_write = 0; + size_t ptr_read = 0; +}; + +/** Array subscript operator + Throws 'std::length_error' if out of range. +**/ +template +T& DataBuffer::operator[](size_t idx){ + if(idx > m_capacity){ + std::string msg = "Buffer index '" + std::to_string(idx) + "' is out of range with capacity '" + std::to_sting(m_capacity) + "'" + std::endl; + throw std::out_of_range(msg); + } + + return m_buffer[idx]; +} + +template +T& DataBuffer::container(){ + return m_buffer; +} + +/*********************************************************************/ + +/** Destructive read (i.e. progress the read pointer) **/ +template +const T& DataBuffer::pop_front(){ + std::lock_guard g_guard; + ptr_read++; + return _buffer[ptr_read-1]; +} + +/**Push a new element to the ringbuffer (do not progress read pointer)**/ +template +const T& DataBuffer::peek_front(){ + return m_buffer[ptr_read]; +} + + +/**Push a new element to the ringbuffer**/ +template +void DataBuffer::push_back(T item){ + std::lock_guard g_guard; + if(ptr_write==m_capacity-1){ + std::string msg = "Buffer with '" + std::to_sting(m_capacity) + "' capacity is full" + std::endl; + throw std::out_of_range(msg); + } + m_buffer[ptr_write] = item; + ptr_write++; +} + +#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/include/ImageStats.hpp b/jfj-udp-recv/include/ImageStats.hpp new file mode 100644 index 0000000..7839a38 --- /dev/null +++ b/jfj-udp-recv/include/ImageStats.hpp @@ -0,0 +1,31 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class FrameStats { + const std::string detector_name_; + const int module_id_; + size_t stats_time_; + + int frames_counter_; + int n_missed_packets_; + int n_corrupted_frames_; + int n_corrupted_pulse_id_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + FrameStats(const std::string &detector_name, + const int module_id, + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JFJochUdpReceiver.hpp b/jfj-udp-recv/include/JFJochUdpReceiver.hpp new file mode 100644 index 0000000..4d38dd4 --- /dev/null +++ b/jfj-udp-recv/include/JFJochUdpReceiver.hpp @@ -0,0 +1,33 @@ +#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP +#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP + +#include +#include "PacketUdpReceiver.hpp" +#include "formats.hpp" +#include "buffer_config.hpp" + +class JFJochUdpReceiver { + PacketUdpReceiver m_udp_receiver; + + // Incoming packet buffers + jfjoch_packet_t m_packet_buffer[buffer_config::BUFFER_UDP_N_RECV_MSG]; + iovec m_recv_buff_ptr[buffer_config::BUFFER_UDP_N_RECV_MSG]; + mmsghdr m_msgs[buffer_config::BUFFER_UDP_N_RECV_MSG]; + sockaddr_in m_sock_from[buffer_config::BUFFER_UDP_N_RECV_MSG]; + + bool packet_buffer_loaded_ = false; + int packet_buffer_n_packets_ = 0; + int packet_buffer_offset_ = 0; + + inline void init_frame(ImageMetadata& frame_metadata, const int i_packet); + inline void copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int i_packet); + inline uint64_t m_process_packets(const int n_packets, ImageMetadata& metadata, char* frame_buffer); + +public: + JFJochUdpReceiver(const uint16_t port, const int module_id); + virtual ~JFJochUdpReceiver(); + uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer); +}; + + +#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/PacketUdpReceiver.hpp b/jfj-udp-recv/include/PacketUdpReceiver.hpp new file mode 100644 index 0000000..da92d85 --- /dev/null +++ b/jfj-udp-recv/include/PacketUdpReceiver.hpp @@ -0,0 +1,22 @@ +#ifndef UDPRECEIVER_H +#define UDPRECEIVER_H + +#include + +class PacketUdpReceiver { + + int socket_fd_; + +public: + PacketUdpReceiver(); + virtual ~PacketUdpReceiver(); + + bool receive(void* buffer, const size_t buffer_n_bytes); + int receive_many(mmsghdr* msgs, const size_t n_msgs); + + void bind(const uint16_t port); + void disconnect(); +}; + + +#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/jfj-udp-recv/src/ImageStats.cpp b/jfj-udp-recv/src/ImageStats.cpp new file mode 100644 index 0000000..28161c7 --- /dev/null +++ b/jfj-udp-recv/src/ImageStats.cpp @@ -0,0 +1,71 @@ +#include +#include "FrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats( + const std::string &detector_name, + const int module_id, + const size_t stats_time) : + detector_name_(detector_name), + module_id_(module_id), + stats_time_(stats_time) +{ + reset_counters(); +} + +void FrameStats::reset_counters() +{ + frames_counter_ = 0; + n_missed_packets_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) +{ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { + n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_udp_recv"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "n_missed_packets=" << n_missed_packets_ << "i"; + cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jfj-udp-recv/src/JFJochUdpReceiver.cpp b/jfj-udp-recv/src/JFJochUdpReceiver.cpp new file mode 100644 index 0000000..24ed15e --- /dev/null +++ b/jfj-udp-recv/src/JFJochUdpReceiver.cpp @@ -0,0 +1,112 @@ +#include +#include +#include "JFJochUdpReceiver.hpp" + +using namespace std; +using namespace buffer_config; + +JFJochUdpReceiver::JFJochUdpReceiver(const uint16_t port, const int module_id) : module_id_(module_id){ + udp_receiver_.bind(port); + + for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { + m_recv_buff_ptr[i].iov_base = (void*) &(m_packet_buffer[i]); + m_recv_buff_ptr[i].iov_len = sizeof(jfjoch_packet_t); + + msgs_[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; + msgs_[i].msg_hdr.msg_iovlen = 1; + msgs_[i].msg_hdr.msg_name = &m_sock_from[i]; + msgs_[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } +} + +JFJochUdpReceiver::~JFJochUdpReceiver() { + m_udp_receiver.disconnect(); +} + +inline void JFJochUdpReceiver::init_frame(ImageMetadata& image_metadata, const int i_packet) { + image_metadata.pulse_id = m_packet_buffer[i_packet].bunchid; + image_metadata.frame_index = m_packet_buffer[i_packet].framenum; + image_metadata.daq_rec = m_packet_buffer[i_packet].debug; + image_metadata.is_good_image = 0; +} + +inline void JFJochUdpReceiver::copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int idx_packet){ + + size_t buffer_offset = JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_[idx_packet].packetnum; + + memcpy((void*) (frame_buffer + buffer_offset), m_packet_buffer[idx_packet].data, JUNGFRAU_DATA_BYTES_PER_PACKET); + + metadata.n_recv_packets++; +} + + + +/** Copy the contents of the packet buffer into a single assembled image + NOTE: In the jungfrau_packet, framenum is the trigger number + NOTE: Even partial frames are valid +**/ +inline uint64_t JFJochUdpReceiver::m_process_packets(const int start_offset, ImageMetadata& metadata, char* frame_buffer){ + + for (int i_packet=start_offset; i_packet < packet_buffer_n_packets_; i_packet++) { + + // First packet for this frame (sucks if this one is missed) + if (metadata.pulse_id == 0) { + init_frame(metadata, i_packet); + } + // Unexpected jump (if the last packet from the previous frame got lost) + if (metadata.frame_index != m_packet_buffer[i_packet].framenum) { + // Save queue status (lazy fifo queue) + m_packet_buffer_loaded_ = true; + m_packet_buffer_offset_ = i_packet; + // Even partial frames are valid? + return metadata.pulse_id; + } + + copy_packet_to_buffers(metadata, frame_buffer, i_packet); + + // Last frame packet received (frame finished) + if (packet_buffer_[i_packet].packetnum == JFJ_N_PACKETS_PER_FRAME - 1) { + // Buffer is loaded only if this is not the last message. + if (i_packet+1 != packet_buffer_n_packets_) { + // Continue on next packet + m_packet_buffer_loaded = true; + m_packet_buffer_offset = i_packet + 1; + + // If i_packet is the last packet the buffer is empty. + } else { + m_packet_buffer_loaded = true; + m_packet_buffer_offset = 0; + } + + return metadata.pulse_id; + } + } + // We emptied the buffer. + m_packet_buffer_loaded = false; + m_packet_buffer_offset = 0; + + return 0; +} + +uint64_t JFJochUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ + // Reset the metadata and frame buffer for the next frame. + metadata.pulse_id = 0; + metadata.n_recv_packets = 0; + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + // Happens when last packet from previous frame was missed. + if (packet_buffer_loaded_) { + auto pulse_id = m_process_packets(packet_buffer_offset_, metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } + + // Otherwise read a new one + while (true) { + packet_buffer_n_packets_ = udp_receiver_.receive_many(msgs_, BUFFER_UDP_N_RECV_MSG); + + if (packet_buffer_n_packets_ > 0) { + auto pulse_id = m_process_packets(0, metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } + } +} diff --git a/jfj-udp-recv/src/PacketUdpReceiver.cpp b/jfj-udp-recv/src/PacketUdpReceiver.cpp new file mode 100644 index 0000000..713dd98 --- /dev/null +++ b/jfj-udp-recv/src/PacketUdpReceiver.cpp @@ -0,0 +1,66 @@ +#include +#include +#include "PacketUdpReceiver.hpp" +#include "jungfrau.hpp" +#include +#include +#include "buffer_config.hpp" + +using namespace std; +using namespace buffer_config; + +PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { } + +PacketUdpReceiver::~PacketUdpReceiver() { + disconnect(); +} + +void PacketUdpReceiver::bind(const uint16_t port){ + if (socket_fd_ > -1) { + throw runtime_error("Socket already bound."); + } + + socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); + if (socket_fd_ < 0) { + throw runtime_error("Cannot open socket."); + } + + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(port); + + timeval udp_socket_timeout; + udp_socket_timeout.tv_sec = 0; + udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) { + throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno))); + } + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { + throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno))); + }; + //TODO: try to set SO_RCVLOWAT + + auto bind_result = ::bind(socket_fd_, reinterpret_cast(&server_address), sizeof(server_address)); + + if (bind_result < 0) { + throw runtime_error("Cannot bind socket."); + } +} + +int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){ + return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); +} + +bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){ + auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); + + return (data_len == buffer_n_bytes) ? true : false; +} + +void PacketUdpReceiver::disconnect(){ + close(socket_fd_); + socket_fd_ = -1; +} diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp new file mode 100644 index 0000000..4bd2a17 --- /dev/null +++ b/jfj-udp-recv/src/main.cpp @@ -0,0 +1,78 @@ +#include +#include +#include +#include + +#include "formats.hpp" +#include "buffer_config.hpp" +#include "FrameUdpReceiver.hpp" +#include "BufferUtils.hpp" +#include "FrameStats.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace BufferUtils; + +int main (int argc, char *argv[]) { + + if (argc != 3) { + cout << endl; + cout << "Usage: jf_udp_recv [detector_json_filename] [module_id]"; + cout << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tmodule_id: id of the module for this process." << endl; + cout << endl; + + exit(-1); + } + + const auto config = read_json_config(string(argv[1])); + const int module_id = atoi(argv[2]); + + const auto udp_port = config.start_udp_port + module_id; + ImageUdpReceiver receiver(udp_port, module_id); + RamBuffer buffer(config.detector_name, config.n_modules); + ImageStats stats(config.detector_name, module_id, STATS_TIME); + + auto ctx = zmq_ctx_new(); + auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); + + + // Might be better creating a structure for double buffering + ImageMetadata metaBufferA; + char* dataBufferA = new char[IMAGE_N_BYTES]; + + uint64_t pulse_id_previous = 0; + uint64_t frame_index_previous = 0; + + + while (true) { + // NOTE: Needs to be pipelined for really high frame rates + auto pulse_id = receiver.get_image_from_udp(metaBufferA, dataBufferA); + + bool bad_pulse_id = false; + + if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || + ( (pulse_id-pulse_id_previous) < 0 ) || + ( (pulse_id-pulse_id_previous) > 1000 ) ) { + + bad_pulse_id = true; + + } else { + + buffer.write_frame(metaBufferA, dataBufferA); + + zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + + } + + stats.record_stats(metaBufferA, bad_pulse_id); + + pulse_id_previous = pulse_id; + frame_index_previous = metaBufferA.frame_index; + + } + + delete[] data; +} diff --git a/jfj-udp-recv/test/CMakeLists.txt b/jfj-udp-recv/test/CMakeLists.txt new file mode 100644 index 0000000..25c729a --- /dev/null +++ b/jfj-udp-recv/test/CMakeLists.txt @@ -0,0 +1,8 @@ +add_executable(jf-udp-recv-tests main.cpp) + +target_link_libraries(jf-udp-recv-tests + core-buffer-lib + jf-udp-recv-lib + gtest + ) + diff --git a/jfj-udp-recv/test/main.cpp b/jfj-udp-recv/test/main.cpp new file mode 100644 index 0000000..8f2cd01 --- /dev/null +++ b/jfj-udp-recv/test/main.cpp @@ -0,0 +1,10 @@ +#include "gtest/gtest.h" +#include "test_PacketUdpReceiver.cpp" +#include "test_FrameUdpReceiver.cpp" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/jfj-udp-recv/test/mock/udp.hpp b/jfj-udp-recv/test/mock/udp.hpp new file mode 100644 index 0000000..1cef271 --- /dev/null +++ b/jfj-udp-recv/test/mock/udp.hpp @@ -0,0 +1,16 @@ +#ifndef MOCK_UDP_H +#define MOCK_UDP_H + +const int MOCK_UDP_PORT(13000); + +sockaddr_in get_server_address(uint16_t udp_port) +{ + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(udp_port); + + return server_address; +} + +#endif \ No newline at end of file diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp new file mode 100644 index 0000000..0d1c9ee --- /dev/null +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -0,0 +1,239 @@ +#include +#include +#include "gtest/gtest.h" +#include "FrameUdpReceiver.hpp" +#include "mock/udp.hpp" + +#include +#include +#include + +using namespace std; + +TEST(BufferUdpReceiver, simple_recv) +{ + auto n_packets = JF_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 5; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + FrameUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_middle_packet) +{ + auto n_packets = JF_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + FrameUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_first_packet) +{ + auto n_packets = JF_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + FrameUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_last_packet) +{ + auto n_packets = JF_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + FrameUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + // n_frames -1 because the last frame is not complete. + for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} \ No newline at end of file diff --git a/jfj-udp-recv/test/test_PacketUdpReceiver.cpp b/jfj-udp-recv/test/test_PacketUdpReceiver.cpp new file mode 100644 index 0000000..1be343a --- /dev/null +++ b/jfj-udp-recv/test/test_PacketUdpReceiver.cpp @@ -0,0 +1,170 @@ +#include +#include +#include "gtest/gtest.h" +#include "mock/udp.hpp" +#include "PacketUdpReceiver.hpp" + +#include +#include + +using namespace std; + +TEST(PacketUdpReceiver, simple_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + send_udp_buffer.packetnum = 91; + send_udp_buffer.framenum = 92; + send_udp_buffer.bunchid = 93; + send_udp_buffer.debug = 94; + + auto server_address = get_server_address(udp_port); + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(100)); + + jungfrau_packet recv_udp_buffer; + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum); + EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum); + EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid); + EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(PacketUdpReceiver, false_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + jungfrau_packet recv_udp_buffer; + + auto server_address = get_server_address(udp_port); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(PacketUdpReceiver, receive_many) +{ + auto n_msg_buffer = JF_N_PACKETS_PER_FRAME; + jungfrau_packet recv_buffer[n_msg_buffer]; + iovec recv_buff_ptr[n_msg_buffer]; + struct mmsghdr msgs[n_msg_buffer]; + struct sockaddr_in sockFrom[n_msg_buffer]; + + for (int i = 0; i < n_msg_buffer; i++) { + recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]); + recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); + + msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; + msgs[i].msg_hdr.msg_iovlen = 1; + msgs[i].msg_hdr.msg_name = &sockFrom[i]; + msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + + auto server_address = get_server_address(udp_port); + + send_udp_buffer.bunchid = 0; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + send_udp_buffer.bunchid = 1; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(10)); + + auto n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME); + ASSERT_EQ(n_msgs, 2); + + for (size_t i=0;i Date: Tue, 1 Jun 2021 10:22:10 +0200 Subject: [PATCH 2/5] Refactored to linear buffer --- core-buffer/include/jungfraujoch.hpp | 2 +- core-buffer/test/test_PacketBuffer.cpp | 254 ++++++++++++++++++ jfj-udp-recv/include/DataBuffer.hpp | 92 ------- jfj-udp-recv/include/JFJochUdpReceiver.hpp | 33 --- .../{ImageStats.hpp => JfjFrameStats.hpp} | 0 jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 31 +++ jfj-udp-recv/include/PacketBuffer.hpp | 110 ++++++++ jfj-udp-recv/mockmain.cpp | 15 ++ jfj-udp-recv/src/JFJochUdpReceiver.cpp | 112 -------- .../src/{ImageStats.cpp => JfjFrameStats.cpp} | 0 jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 77 ++++++ jfj-udp-recv/test/test_FrameUdpReceiver.cpp | 21 +- 12 files changed, 494 insertions(+), 253 deletions(-) create mode 100644 core-buffer/test/test_PacketBuffer.cpp delete mode 100644 jfj-udp-recv/include/DataBuffer.hpp delete mode 100644 jfj-udp-recv/include/JFJochUdpReceiver.hpp rename jfj-udp-recv/include/{ImageStats.hpp => JfjFrameStats.hpp} (100%) create mode 100644 jfj-udp-recv/include/JfjFrameUdpReceiver.hpp create mode 100644 jfj-udp-recv/include/PacketBuffer.hpp create mode 100644 jfj-udp-recv/mockmain.cpp delete mode 100644 jfj-udp-recv/src/JFJochUdpReceiver.cpp rename jfj-udp-recv/src/{ImageStats.cpp => JfjFrameStats.cpp} (100%) create mode 100644 jfj-udp-recv/src/JfjFrameUdpReceiver.cpp diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp index bbfbb08..fd549b2 100644 --- a/core-buffer/include/jungfraujoch.hpp +++ b/core-buffer/include/jungfraujoch.hpp @@ -17,7 +17,7 @@ struct jfjoch_packet_t { uint32_t exptime; uint32_t packetnum; - double bunchid; + uint64_t bunchid; uint64_t timestamp; uint16_t moduleID; diff --git a/core-buffer/test/test_PacketBuffer.cpp b/core-buffer/test/test_PacketBuffer.cpp new file mode 100644 index 0000000..110e6ca --- /dev/null +++ b/core-buffer/test/test_PacketBuffer.cpp @@ -0,0 +1,254 @@ +#include +#include +#include "gtest/gtest.h" +#include "PacketBuffer.hpp" + +#include +#include +#include + +using namespace std; + +template +class MockReceiver{ + public: + int idx_packet = 42000; + int packet_per_frame = 512; + int num_bunches = 100; + int num_packets =50; + + int receive_many(mmsghdr* msgs, const size_t n_msgs){ + // Receive 'num_packets numner of packets' + + for(int ii=0; ii(mmsghdr[ii].msg_hdr.msg_iov->iov_base); + refer.bunchid = idx_packet / packet_per_frame; + refer.packetnum = idx_packet % packet_per_frame; + idx_packet++; + } + return std::min(num_packets, n_msgs); + }; +}; + +// +// +// +// +//TEST(BufferUdpReceiver, simple_recv) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 5; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_middle_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port, source_id); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_first_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// for (int i_frame=0; i_frame < n_frames; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp( +// metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} +// +//TEST(BufferUdpReceiver, missing_last_packet) +//{ +// auto n_packets = JF_N_PACKETS_PER_FRAME; +// int n_frames = 3; +// +// uint16_t udp_port = MOCK_UDP_PORT; +// auto server_address = get_server_address(udp_port); +// auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); +// ASSERT_TRUE(send_socket_fd >= 0); +// +// JfjFrameUdpReceiver udp_receiver(udp_port); +// +// auto handle = async(launch::async, [&](){ +// for (int i_frame=0; i_frame < n_frames; i_frame++){ +// for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); +// +// // n_frames -1 because the last frame is not complete. +// for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { +// auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); +// +// ASSERT_EQ(i_frame + 1, pulse_id); +// ASSERT_EQ(metadata.frame_index, i_frame + 1000); +// ASSERT_EQ(metadata.daq_rec, i_frame + 10000); +// // -1 because we skipped a packet. +// ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); +// } +// +// ::close(send_socket_fd); +//} diff --git a/jfj-udp-recv/include/DataBuffer.hpp b/jfj-udp-recv/include/DataBuffer.hpp deleted file mode 100644 index 1d22d73..0000000 --- a/jfj-udp-recv/include/DataBuffer.hpp +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP -#define CIRCULAR_BUFFER_TEMPLATE_HPP - -#include -#include -#include -#include -#include - -/**Linear data buffer - -A simplified version of FIFO. -**/ -template -class DataBuffer{ -public: - DataBuffer() {}; - ~DataBuffer() {}; - - /**Diagnostics**/ - size_t size() const { return ( _write-_read ); } - size_t capacity() const { return _capacity; } - bool is_full(){ return ( (_write - _read)<_capacity ); } - bool is_empty(){ return (_write ==_read); } - - /**Operators**/ - void zero(){ memset(m_cont, 0, sizeof(m_cont)); } - T& operator[](size_t index); // Array subscript operator - T& container(){ return (_cont; } // Direct container reference - - /**Element access**/ - const T& pop_front(); //Destructive read - const T& get_front(); //Non-destructive read - void push_back(T item); //Write new element to buffer - - /**Guards**/ - std::mutex g_mutex; -private: - T m_cont[CAP]; - const size_t m_capacity = CAP; - size_t ptr_write = 0; - size_t ptr_read = 0; -}; - -/** Array subscript operator - Throws 'std::length_error' if out of range. -**/ -template -T& DataBuffer::operator[](size_t idx){ - if(idx > m_capacity){ - std::string msg = "Buffer index '" + std::to_string(idx) + "' is out of range with capacity '" + std::to_sting(m_capacity) + "'" + std::endl; - throw std::out_of_range(msg); - } - - return m_buffer[idx]; -} - -template -T& DataBuffer::container(){ - return m_buffer; -} - -/*********************************************************************/ - -/** Destructive read (i.e. progress the read pointer) **/ -template -const T& DataBuffer::pop_front(){ - std::lock_guard g_guard; - ptr_read++; - return _buffer[ptr_read-1]; -} - -/**Push a new element to the ringbuffer (do not progress read pointer)**/ -template -const T& DataBuffer::peek_front(){ - return m_buffer[ptr_read]; -} - - -/**Push a new element to the ringbuffer**/ -template -void DataBuffer::push_back(T item){ - std::lock_guard g_guard; - if(ptr_write==m_capacity-1){ - std::string msg = "Buffer with '" + std::to_sting(m_capacity) + "' capacity is full" + std::endl; - throw std::out_of_range(msg); - } - m_buffer[ptr_write] = item; - ptr_write++; -} - -#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/include/JFJochUdpReceiver.hpp b/jfj-udp-recv/include/JFJochUdpReceiver.hpp deleted file mode 100644 index 4d38dd4..0000000 --- a/jfj-udp-recv/include/JFJochUdpReceiver.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP -#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP - -#include -#include "PacketUdpReceiver.hpp" -#include "formats.hpp" -#include "buffer_config.hpp" - -class JFJochUdpReceiver { - PacketUdpReceiver m_udp_receiver; - - // Incoming packet buffers - jfjoch_packet_t m_packet_buffer[buffer_config::BUFFER_UDP_N_RECV_MSG]; - iovec m_recv_buff_ptr[buffer_config::BUFFER_UDP_N_RECV_MSG]; - mmsghdr m_msgs[buffer_config::BUFFER_UDP_N_RECV_MSG]; - sockaddr_in m_sock_from[buffer_config::BUFFER_UDP_N_RECV_MSG]; - - bool packet_buffer_loaded_ = false; - int packet_buffer_n_packets_ = 0; - int packet_buffer_offset_ = 0; - - inline void init_frame(ImageMetadata& frame_metadata, const int i_packet); - inline void copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int i_packet); - inline uint64_t m_process_packets(const int n_packets, ImageMetadata& metadata, char* frame_buffer); - -public: - JFJochUdpReceiver(const uint16_t port, const int module_id); - virtual ~JFJochUdpReceiver(); - uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer); -}; - - -#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/ImageStats.hpp b/jfj-udp-recv/include/JfjFrameStats.hpp similarity index 100% rename from jfj-udp-recv/include/ImageStats.hpp rename to jfj-udp-recv/include/JfjFrameStats.hpp diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp new file mode 100644 index 0000000..d5c5ddf --- /dev/null +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -0,0 +1,31 @@ +#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP +#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP + +#include +#include "PacketUdpReceiver.hpp" +#include "formats.hpp" +#include "buffer_config.hpp" +#include "PacketBuffer.hpp" + + +/** JungfrauJoch UDP receiver + + Wrapper class to capture frames from the UDP stream of the JungfrauJoch FPGA card. + NOTE: This design will not scale well for higher frame rates... +**/ +class JfjFrameUdpReceiver { + PacketUdpReceiver udp_receiver_; + + PacketBuffer m_buffer; + + inline void init_frame(ImageMetadata& frame_metadata, jfjoch_packet_t& c_packet); + inline uint64_t process_packets(ImageMetadata& metadata, char* frame_buffer); + +public: + JfjFrameUdpReceiver(const uint16_t port); + virtual ~JfjFrameUdpReceiver(); + uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer); +}; + + +#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp new file mode 100644 index 0000000..46958e6 --- /dev/null +++ b/jfj-udp-recv/include/PacketBuffer.hpp @@ -0,0 +1,110 @@ +#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP +#define CIRCULAR_BUFFER_TEMPLATE_HPP + +#include +#include +#include +#include +#include +#include + + +/** Linear data buffer (NOT FIFO) + + Simplified data buffer that provides pop and push operations and + bundles the actual container with metadata required by . + It stores the actual data in an accessible C-style array. **/ +template +class PacketBuffer{ +public: + PacketBuffer() { + for (int i = 0; i < CAPACITY; i++) { + m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]); + m_recv_buff_ptr[i].iov_len = sizeof(T); + + // C-structure as expected by + m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; + m_msgs[i].msg_hdr.msg_iovlen = 1; + m_msgs[i].msg_hdr.msg_name = &m_sock_from[i]; + m_msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + }; + // ~PacketBuffer() {}; + + /**Diagnostics**/ + size_t size() const { return ( idx_write-idx_read ); } + size_t capacity() const { return m_capacity; } + bool is_full() const { return (idx_write >= m_capacity); } + bool is_empty() const { return (idx_write <= idx_read); } + + /**Operators**/ + void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer + T& container(){ return m_container; }; // Direct container reference + mmsghdr& msgs(){ return m_msgs; }; + + /**Element access**/ + const T& pop_front(); //Destructive read + const T& peek_front(); //Non-destructive read + void push_back(T item); //Write new element to buffer + + /**Fill from UDP receiver**/ + template + void fill_fom(TY& recv){ + std::lock_guard g_guard(m_mutex); + this->idx_write = recv.receive_many(this->msgs(), this->capacity()); + this->idx_read = 0; + } + +private: + // Main container + T m_container[CAPACITY]; + const size_t m_capacity = CAPACITY; + /**Guards**/ + std::mutex m_mutex; + /**Read and write index**/ + size_t idx_write = 0; + size_t idx_read = 0; + + // C-structures as expected by + mmsghdr m_msgs[CAPACITY]; + iovec m_recv_buff_ptr[CAPACITY]; + sockaddr_in m_sock_from[CAPACITY]; +}; + + +/*********************************************************************/ +/*********************************************************************/ +/*********************************************************************/ + +/** Destructive read + Standard read access to queues (i.e. progress the read pointer). + Throws 'std::length_error' if container is empty. **/ +template +const T& PacketBuffer::pop_front(){ + std::lock_guard g_guard(m_mutex); + if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } + idx_read++; + return m_container[idx_read-1]; +} + +/** Non-destructive read + Standard, non-destructive read access (does not progress the read pointer). + Throws 'std::length_error' if container is empty. **/ +template +const T& PacketBuffer::peek_front(){ + std::lock_guard g_guard(m_mutex); + if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } + return m_container[idx_read]; +} + + +/** Push an element into the end of the buffer**/ +template +void PacketBuffer::push_back(T item){ + std::lock_guard g_guard(m_mutex); + if(this->is_full()){ throw std::out_of_range("Attempted to write a full buffer!"); } + m_container[idx_write] = item; + idx_write++; +} + +#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/mockmain.cpp b/jfj-udp-recv/mockmain.cpp new file mode 100644 index 0000000..1d33d2b --- /dev/null +++ b/jfj-udp-recv/mockmain.cpp @@ -0,0 +1,15 @@ + +#include "include/PacketBuffer.hpp" + + +struct DummyContainer{ + uint64_t index; + uint64_t timestamp; + uint16_t data[32]; +}; + + +int main (int argc, char *argv[]) { + PacketBuffer b; + +} diff --git a/jfj-udp-recv/src/JFJochUdpReceiver.cpp b/jfj-udp-recv/src/JFJochUdpReceiver.cpp deleted file mode 100644 index 24ed15e..0000000 --- a/jfj-udp-recv/src/JFJochUdpReceiver.cpp +++ /dev/null @@ -1,112 +0,0 @@ -#include -#include -#include "JFJochUdpReceiver.hpp" - -using namespace std; -using namespace buffer_config; - -JFJochUdpReceiver::JFJochUdpReceiver(const uint16_t port, const int module_id) : module_id_(module_id){ - udp_receiver_.bind(port); - - for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { - m_recv_buff_ptr[i].iov_base = (void*) &(m_packet_buffer[i]); - m_recv_buff_ptr[i].iov_len = sizeof(jfjoch_packet_t); - - msgs_[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; - msgs_[i].msg_hdr.msg_iovlen = 1; - msgs_[i].msg_hdr.msg_name = &m_sock_from[i]; - msgs_[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); - } -} - -JFJochUdpReceiver::~JFJochUdpReceiver() { - m_udp_receiver.disconnect(); -} - -inline void JFJochUdpReceiver::init_frame(ImageMetadata& image_metadata, const int i_packet) { - image_metadata.pulse_id = m_packet_buffer[i_packet].bunchid; - image_metadata.frame_index = m_packet_buffer[i_packet].framenum; - image_metadata.daq_rec = m_packet_buffer[i_packet].debug; - image_metadata.is_good_image = 0; -} - -inline void JFJochUdpReceiver::copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int idx_packet){ - - size_t buffer_offset = JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_[idx_packet].packetnum; - - memcpy((void*) (frame_buffer + buffer_offset), m_packet_buffer[idx_packet].data, JUNGFRAU_DATA_BYTES_PER_PACKET); - - metadata.n_recv_packets++; -} - - - -/** Copy the contents of the packet buffer into a single assembled image - NOTE: In the jungfrau_packet, framenum is the trigger number - NOTE: Even partial frames are valid -**/ -inline uint64_t JFJochUdpReceiver::m_process_packets(const int start_offset, ImageMetadata& metadata, char* frame_buffer){ - - for (int i_packet=start_offset; i_packet < packet_buffer_n_packets_; i_packet++) { - - // First packet for this frame (sucks if this one is missed) - if (metadata.pulse_id == 0) { - init_frame(metadata, i_packet); - } - // Unexpected jump (if the last packet from the previous frame got lost) - if (metadata.frame_index != m_packet_buffer[i_packet].framenum) { - // Save queue status (lazy fifo queue) - m_packet_buffer_loaded_ = true; - m_packet_buffer_offset_ = i_packet; - // Even partial frames are valid? - return metadata.pulse_id; - } - - copy_packet_to_buffers(metadata, frame_buffer, i_packet); - - // Last frame packet received (frame finished) - if (packet_buffer_[i_packet].packetnum == JFJ_N_PACKETS_PER_FRAME - 1) { - // Buffer is loaded only if this is not the last message. - if (i_packet+1 != packet_buffer_n_packets_) { - // Continue on next packet - m_packet_buffer_loaded = true; - m_packet_buffer_offset = i_packet + 1; - - // If i_packet is the last packet the buffer is empty. - } else { - m_packet_buffer_loaded = true; - m_packet_buffer_offset = 0; - } - - return metadata.pulse_id; - } - } - // We emptied the buffer. - m_packet_buffer_loaded = false; - m_packet_buffer_offset = 0; - - return 0; -} - -uint64_t JFJochUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ - // Reset the metadata and frame buffer for the next frame. - metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - // Happens when last packet from previous frame was missed. - if (packet_buffer_loaded_) { - auto pulse_id = m_process_packets(packet_buffer_offset_, metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } - } - - // Otherwise read a new one - while (true) { - packet_buffer_n_packets_ = udp_receiver_.receive_many(msgs_, BUFFER_UDP_N_RECV_MSG); - - if (packet_buffer_n_packets_ > 0) { - auto pulse_id = m_process_packets(0, metadata, frame_buffer); - if (pulse_id != 0) { return pulse_id; } - } - } -} diff --git a/jfj-udp-recv/src/ImageStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp similarity index 100% rename from jfj-udp-recv/src/ImageStats.cpp rename to jfj-udp-recv/src/JfjFrameStats.cpp diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp new file mode 100644 index 0000000..aa38ec9 --- /dev/null +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -0,0 +1,77 @@ +#include +#include +#include "JfjFrameUdpReceiver.hpp" + +using namespace std; +using namespace buffer_config; + +JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { + udp_receiver_.bind(port); +} + +JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { + udp_receiver_.disconnect(); +} + +inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet) { + frame_metadata.pulse_id = c_packet.timestamp; + frame_metadata.frame_index = c_packet.framenum; + frame_metadata.daq_rec = (uint32_t) c_packet.debug; + frame_metadata.is_good_image = (int32_t) true; +} + +inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, char* frame_buffer){ + + while(!m_buffer.is_empty()){ + // Happens if the last packet from the previous frame gets lost. + if (m_frame_index != m_buffer.peek_front().framenum) { + m_frame_index = m_buffer.peek_front().framenum; + frame_metadata.is_good_image = (int32_t) false; + return metadata.pulse_id; + } + + // Otherwise pop the queue (and set current frame index) + jfjoch_packet_t& c_packet = m_buffer.pop_front(); + m_frame_index = c_packet.framenum; + + // Always copy metadata (otherwise problem when 0th packet gets lost) + this->init_frame(metadata, c_packet); + + // Copy data to frame buffer + size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum; + memcpy( (void*) (frame_buffer + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET); + metadata.n_recv_packets++; + + // Last frame packet received. Frame finished. + if (c_packet.packetnum == JFJ_N_PACKETS_PER_FRAME - 1){ + return metadata.pulse_id; + } + } + + // We emptied the buffer. + m_buffer.reset(); + return 0; +} + +uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ + // Reset the metadata and frame buffer for the next frame. + metadata.pulse_id = 0; + metadata.n_recv_packets = 0; + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + // Process leftover packages in the buffer + if (!m_buffer.is_empty()) { + auto pulse_id = process_packets(metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } + + while (true) { + // Receive new packages (pass if none)... + m_buffer.fill_from(m_udp_receiver); + if (m_buffer.is_empty()) { continue; } + + // ... and process them + auto pulse_id = process_packets(metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } +} diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp index 0d1c9ee..e1e86ab 100644 --- a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -13,7 +13,6 @@ using namespace std; TEST(BufferUdpReceiver, simple_recv) { auto n_packets = JF_N_PACKETS_PER_FRAME; - int source_id = 1234; int n_frames = 5; uint16_t udp_port = MOCK_UDP_PORT; @@ -21,7 +20,7 @@ TEST(BufferUdpReceiver, simple_recv) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -57,7 +56,6 @@ TEST(BufferUdpReceiver, simple_recv) ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets); - ASSERT_EQ(metadata.module_id, source_id); } ::close(send_socket_fd); @@ -66,7 +64,6 @@ TEST(BufferUdpReceiver, simple_recv) TEST(BufferUdpReceiver, missing_middle_packet) { auto n_packets = JF_N_PACKETS_PER_FRAME; - int source_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -74,7 +71,7 @@ TEST(BufferUdpReceiver, missing_middle_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port, source_id); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -115,7 +112,6 @@ TEST(BufferUdpReceiver, missing_middle_packet) ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); } ::close(send_socket_fd); @@ -124,7 +120,6 @@ TEST(BufferUdpReceiver, missing_middle_packet) TEST(BufferUdpReceiver, missing_first_packet) { auto n_packets = JF_N_PACKETS_PER_FRAME; - int source_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -132,7 +127,7 @@ TEST(BufferUdpReceiver, missing_first_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -173,7 +168,6 @@ TEST(BufferUdpReceiver, missing_first_packet) ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); } ::close(send_socket_fd); @@ -182,7 +176,6 @@ TEST(BufferUdpReceiver, missing_first_packet) TEST(BufferUdpReceiver, missing_last_packet) { auto n_packets = JF_N_PACKETS_PER_FRAME; - int source_id = 1234; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -190,7 +183,7 @@ TEST(BufferUdpReceiver, missing_last_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - FrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ for (int i_frame=0; i_frame < n_frames; i_frame++){ @@ -224,16 +217,14 @@ TEST(BufferUdpReceiver, missing_last_packet) // n_frames -1 because the last frame is not complete. for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); ASSERT_EQ(i_frame + 1, pulse_id); ASSERT_EQ(metadata.frame_index, i_frame + 1000); ASSERT_EQ(metadata.daq_rec, i_frame + 10000); // -1 because we skipped a packet. ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); - ASSERT_EQ(metadata.module_id, source_id); } ::close(send_socket_fd); -} \ No newline at end of file +} From 5be091eee842bd5bb6043a8bbcd8901300028fb1 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Tue, 1 Jun 2021 12:11:15 +0200 Subject: [PATCH 3/5] Almost compiles but needs more metadata --- jfj-udp-recv/CMakeLists.txt | 30 +- jfj-udp-recv/README.md | 328 +++++++++---------- jfj-udp-recv/include/JfjFrameStats.hpp | 59 ++-- jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 7 +- jfj-udp-recv/include/PacketBuffer.hpp | 12 +- jfj-udp-recv/include/PacketUdpReceiver.hpp | 44 +-- jfj-udp-recv/mockmain.cpp | 15 - jfj-udp-recv/src/JfjFrameStats.cpp | 125 +++---- jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 30 +- jfj-udp-recv/src/main.cpp | 33 +- 10 files changed, 317 insertions(+), 366 deletions(-) delete mode 100644 jfj-udp-recv/mockmain.cpp diff --git a/jfj-udp-recv/CMakeLists.txt b/jfj-udp-recv/CMakeLists.txt index 3c83127..7380471 100644 --- a/jfj-udp-recv/CMakeLists.txt +++ b/jfj-udp-recv/CMakeLists.txt @@ -1,18 +1,12 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(jf-udp-recv-lib STATIC ${SOURCES}) -target_include_directories(jf-udp-recv-lib PUBLIC include/) -target_link_libraries(jf-udp-recv-lib - external - core-buffer-lib) - -add_executable(jf-udp-recv src/main.cpp) -set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) -target_link_libraries(jf-udp-recv - jf-udp-recv-lib - zmq - rt) - -enable_testing() -add_subdirectory(test/) +file(GLOB SOURCES src/*.cpp) + +add_library(jfj-udp-recv-lib STATIC ${SOURCES}) +target_include_directories(jfj-udp-recv-lib PUBLIC include/) +target_link_libraries(jfj-udp-recv-lib external core-buffer-lib) + +add_executable(jfj-udp-recv src/main.cpp) +set_target_properties(jfj-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) +target_link_libraries(jfj-udp-recv jfj-udp-recv-lib zmq rt) + +enable_testing() +# add_subdirectory(test/) diff --git a/jfj-udp-recv/README.md b/jfj-udp-recv/README.md index 504281f..b51f999 100644 --- a/jfj-udp-recv/README.md +++ b/jfj-udp-recv/README.md @@ -1,164 +1,164 @@ -# sf-buffer -sf-buffer is the component that receives the detector data in form of UDP -packages and writes them down to disk to a binary format. In addition, it -sends a copy of the module frame to sf-stream via ZMQ. - -Each sf-buffer process is taking care of a single detector module. The -processes are all independent and do not rely on any external data input -to maximize isolation and possible interactions in our system. - -The main design principle is simplicity and decoupling: - -- No interprocess dependencies/communication. -- No dependencies on external libraries (as much as possible). -- Using POSIX as much as possible. - -We are optimizing for maintainability and long term stability. Performance is -of concern only if the performance criteria are not met. - -## Overview - -![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg) - -sf-buffer is a single threaded application (without counting the ZMQ IO threads) -that does both receiving, assembling, writing and sending in the same thread. - -### UDP receiving - -Each process listens to one udp port. Packets coming to this udp port are -assembled into frames. Frames (either complete or with missing packets) are -passed forward. The number of received packets is saved so we can later -(at image assembly time) determine if the frame is valid or not. At this point -we do no validation. - -We are currently using **recvmmsg** to minimize the number of switches to -kernel mode. - -We expect all packets to come in order or not come at all. Once we see the -package for the next pulse_id we can assume no more packages are coming for -the previous one, and send the assembled frame down the program. - -### File writing - -Files are written to disk in frames - one write to disk per frame. This gives -us a relaxed 10ms interval of 1 MB writes. - -#### File format - -The binary file on disk is just a serialization of multiple -**BufferBinaryFormat** structs: -```c++ -#pragma pack(push) -#pragma pack(1) -struct ModuleFrame { - uint64_t pulse_id; - uint64_t frame_index; - uint64_t daq_rec; - uint64_t n_recv_packets; - uint64_t module_id; -}; -#pragma pack(pop) - -#pragma pack(push) -#pragma pack(1) -struct BufferBinaryFormat { - const char FORMAT_MARKER = 0xBE; - ModuleFrame meta; - char data[buffer_config::MODULE_N_BYTES]; -}; -#pragma pack(pop) -``` - -![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg) - -Each frame is composed by: - -- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. -- **ModuleFrame** - frame meta used in image assembly phase. -- **Data** - assembled frame from a single module. - -Frames are written one after another to a specific offset in the file. The -offset is calculated based on the pulse_id, so each frame has a specific place -in the file and there is no need to have an index for frame retrieval. - -The offset where a specific pulse_id is written in a file is calculated: - -```c++ -// We save 1000 pulses in each file. -const uint64_t FILE_MOD = 1000 - -// Relative index of pulse_id inside file. -size_t file_base = pulse_id % FILE_MOD; -// Offset in bytes of relative index in file. -size_t file_offset = file_base * sizeof(BufferBinaryFormat); -``` - -We now know where to look for data inside the file, but we still don't know -inside which file to look. For this we need to discuss the folder structure. - -#### Folder structure - -The folder (as well as file) structure is deterministic in the sense that given -a specific pulse_id, we can directly calculate the folder, file, and file -offset where the data is stored. This allows us to have independent writing -and reading from the buffer without building any indexes. - -The binary files written by sf_buffer are saved to: - -[detector_folder]/[module_folder]/[data_folder]/[data_file].bin - -- **detector\_folder** should always be passed as an absolute path. This is the -container that holds all data related to a specific detector. -- **module\_folder** is usually composed like "M00", "M01". It separates data -from different modules of one detector. -- **data\_folder** and **data\_file** are automatically calculated based on the -current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our -index for accessing data. - -![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg) - -```c++ -// FOLDER_MOD = 100000 -int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD; -// FILE_MOD = 1000 -int data_file = (pulse_id % FILE_MOD) * FILE_MOD; -``` - -The data_folder and data_file folders are named as the first pulse_id that -should be stored inside them. - -FOLDER_MOD == 100000 means that each data_folder will contain data for 100000 -pulses, while FILE_MOD == 1000 means that each file inside the data_folder -will contain 1000 pulses. The total number of data_files in each data_folder -will therefore be **FILE\_MOD / FOLDER\_MOD = 100**. - -#### Analyzing the buffer on disk -In **sf-utils** there is a Python module that allows you to read directly the -buffer in order to debug it or to verify the consistency between the HDF5 file -and the received data. - -- VerifyH5DataConsistency.py checks the consistency between the H5 file and -buffer. -- BinaryBufferReader.py reads the buffer and prints meta. The class inside -can also be used in external scripts. - -### ZMQ sending - -A copy of the data written to disk is also send via ZMQ to the sf-stream. This -is used to provide live viewing / processing capabilities. Each module data is -sent separately, and this is later assembled in the sf-stream. - -We use the PUB/SUB mechanism for distributing this data - we cannot control the -rate of the producer, and we would like to avoid distributed image assembly -if possible, so PUSH/PULL does not make sense in this case. - -We provide no guarantees on live data delivery, but in practice the number of -dropped or incomplete frames in currently negligible. - -The protocol is a serialization of the same data structures we use to -write on disk (no need for additional memory operations before sending out -data). It uses a 2 part multipart ZMQ message: - -- The first part is a serialization of the ModuleFrame struct (see above). -- The second part is the data field in the BufferBinaryFormat struct (the frame -data). +# sf-buffer +sf-buffer is the component that receives the detector data in form of UDP +packages and writes them down to disk to a binary format. In addition, it +sends a copy of the module frame to sf-stream via ZMQ. + +Each sf-buffer process is taking care of a single detector module. The +processes are all independent and do not rely on any external data input +to maximize isolation and possible interactions in our system. + +The main design principle is simplicity and decoupling: + +- No interprocess dependencies/communication. +- No dependencies on external libraries (as much as possible). +- Using POSIX as much as possible. + +We are optimizing for maintainability and long term stability. Performance is +of concern only if the performance criteria are not met. + +## Overview + +![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg) + +sf-buffer is a single threaded application (without counting the ZMQ IO threads) +that does both receiving, assembling, writing and sending in the same thread. + +### UDP receiving + +Each process listens to one udp port. Packets coming to this udp port are +assembled into frames. Frames (either complete or with missing packets) are +passed forward. The number of received packets is saved so we can later +(at image assembly time) determine if the frame is valid or not. At this point +we do no validation. + +We are currently using **recvmmsg** to minimize the number of switches to +kernel mode. + +We expect all packets to come in order or not come at all. Once we see the +package for the next pulse_id we can assume no more packages are coming for +the previous one, and send the assembled frame down the program. + +### File writing + +Files are written to disk in frames - one write to disk per frame. This gives +us a relaxed 10ms interval of 1 MB writes. + +#### File format + +The binary file on disk is just a serialization of multiple +**BufferBinaryFormat** structs: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) + +#pragma pack(push) +#pragma pack(1) +struct BufferBinaryFormat { + const char FORMAT_MARKER = 0xBE; + ModuleFrame meta; + char data[buffer_config::MODULE_N_BYTES]; +}; +#pragma pack(pop) +``` + +![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg) + +Each frame is composed by: + +- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. +- **ModuleFrame** - frame meta used in image assembly phase. +- **Data** - assembled frame from a single module. + +Frames are written one after another to a specific offset in the file. The +offset is calculated based on the pulse_id, so each frame has a specific place +in the file and there is no need to have an index for frame retrieval. + +The offset where a specific pulse_id is written in a file is calculated: + +```c++ +// We save 1000 pulses in each file. +const uint64_t FILE_MOD = 1000 + +// Relative index of pulse_id inside file. +size_t file_base = pulse_id % FILE_MOD; +// Offset in bytes of relative index in file. +size_t file_offset = file_base * sizeof(BufferBinaryFormat); +``` + +We now know where to look for data inside the file, but we still don't know +inside which file to look. For this we need to discuss the folder structure. + +#### Folder structure + +The folder (as well as file) structure is deterministic in the sense that given +a specific pulse_id, we can directly calculate the folder, file, and file +offset where the data is stored. This allows us to have independent writing +and reading from the buffer without building any indexes. + +The binary files written by sf_buffer are saved to: + +[detector_folder]/[module_folder]/[data_folder]/[data_file].bin + +- **detector\_folder** should always be passed as an absolute path. This is the +container that holds all data related to a specific detector. +- **module\_folder** is usually composed like "M00", "M01". It separates data +from different modules of one detector. +- **data\_folder** and **data\_file** are automatically calculated based on the +current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our +index for accessing data. + +![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg) + +```c++ +// FOLDER_MOD = 100000 +int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD; +// FILE_MOD = 1000 +int data_file = (pulse_id % FILE_MOD) * FILE_MOD; +``` + +The data_folder and data_file folders are named as the first pulse_id that +should be stored inside them. + +FOLDER_MOD == 100000 means that each data_folder will contain data for 100000 +pulses, while FILE_MOD == 1000 means that each file inside the data_folder +will contain 1000 pulses. The total number of data_files in each data_folder +will therefore be **FILE\_MOD / FOLDER\_MOD = 100**. + +#### Analyzing the buffer on disk +In **sf-utils** there is a Python module that allows you to read directly the +buffer in order to debug it or to verify the consistency between the HDF5 file +and the received data. + +- VerifyH5DataConsistency.py checks the consistency between the H5 file and +buffer. +- BinaryBufferReader.py reads the buffer and prints meta. The class inside +can also be used in external scripts. + +### ZMQ sending + +A copy of the data written to disk is also send via ZMQ to the sf-stream. This +is used to provide live viewing / processing capabilities. Each module data is +sent separately, and this is later assembled in the sf-stream. + +We use the PUB/SUB mechanism for distributing this data - we cannot control the +rate of the producer, and we would like to avoid distributed image assembly +if possible, so PUSH/PULL does not make sense in this case. + +We provide no guarantees on live data delivery, but in practice the number of +dropped or incomplete frames in currently negligible. + +The protocol is a serialization of the same data structures we use to +write on disk (no need for additional memory operations before sending out +data). It uses a 2 part multipart ZMQ message: + +- The first part is a serialization of the ModuleFrame struct (see above). +- The second part is the data field in the BufferBinaryFormat struct (the frame +data). diff --git a/jfj-udp-recv/include/JfjFrameStats.hpp b/jfj-udp-recv/include/JfjFrameStats.hpp index 7839a38..7d53b6c 100644 --- a/jfj-udp-recv/include/JfjFrameStats.hpp +++ b/jfj-udp-recv/include/JfjFrameStats.hpp @@ -1,31 +1,28 @@ -#include -#include -#include - -#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP -#define SF_DAQ_BUFFER_FRAMESTATS_HPP - - -class FrameStats { - const std::string detector_name_; - const int module_id_; - size_t stats_time_; - - int frames_counter_; - int n_missed_packets_; - int n_corrupted_frames_; - int n_corrupted_pulse_id_; - std::chrono::time_point stats_interval_start_; - - void reset_counters(); - void print_stats(); - -public: - FrameStats(const std::string &detector_name, - const int module_id, - const size_t stats_time); - void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); -}; - - -#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP + + +class FrameStats { + const std::string detector_name_; + size_t stats_time_; + + int frames_counter_; + int n_missed_packets_; + int n_corrupted_frames_; + int n_corrupted_pulse_id_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + FrameStats(const std::string &detector_name, const size_t stats_time); + void record_stats(const ImageMetadata &meta, const bool bad_pulse_id); +}; + + +#endif //SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp index d5c5ddf..fc9a284 100644 --- a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -6,7 +6,7 @@ #include "formats.hpp" #include "buffer_config.hpp" #include "PacketBuffer.hpp" - +#include "jungfraujoch.hpp" /** JungfrauJoch UDP receiver @@ -14,11 +14,12 @@ NOTE: This design will not scale well for higher frame rates... **/ class JfjFrameUdpReceiver { - PacketUdpReceiver udp_receiver_; + PacketUdpReceiver m_udp_receiver; + uint64_t m_frame_index; PacketBuffer m_buffer; - inline void init_frame(ImageMetadata& frame_metadata, jfjoch_packet_t& c_packet); + inline void init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet); inline uint64_t process_packets(ImageMetadata& metadata, char* frame_buffer); public: diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp index 46958e6..b2be33b 100644 --- a/jfj-udp-recv/include/PacketBuffer.hpp +++ b/jfj-udp-recv/include/PacketBuffer.hpp @@ -34,8 +34,8 @@ public: /**Diagnostics**/ size_t size() const { return ( idx_write-idx_read ); } size_t capacity() const { return m_capacity; } - bool is_full() const { return (idx_write >= m_capacity); } - bool is_empty() const { return (idx_write <= idx_read); } + bool is_full() const { return bool(idx_write >= m_capacity); } + bool is_empty() const { return bool(idx_write <= idx_read); } /**Operators**/ void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer @@ -43,15 +43,15 @@ public: mmsghdr& msgs(){ return m_msgs; }; /**Element access**/ - const T& pop_front(); //Destructive read + T& pop_front(); //Destructive read const T& peek_front(); //Non-destructive read void push_back(T item); //Write new element to buffer /**Fill from UDP receiver**/ template - void fill_fom(TY& recv){ + void fill_from(TY& recv){ std::lock_guard g_guard(m_mutex); - this->idx_write = recv.receive_many(this->msgs(), this->capacity()); + this->idx_write = recv.receive_many(m_msgs, this->capacity()); this->idx_read = 0; } @@ -80,7 +80,7 @@ private: Standard read access to queues (i.e. progress the read pointer). Throws 'std::length_error' if container is empty. **/ template -const T& PacketBuffer::pop_front(){ +T& PacketBuffer::pop_front(){ std::lock_guard g_guard(m_mutex); if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } idx_read++; diff --git a/jfj-udp-recv/include/PacketUdpReceiver.hpp b/jfj-udp-recv/include/PacketUdpReceiver.hpp index da92d85..cc7e093 100644 --- a/jfj-udp-recv/include/PacketUdpReceiver.hpp +++ b/jfj-udp-recv/include/PacketUdpReceiver.hpp @@ -1,22 +1,22 @@ -#ifndef UDPRECEIVER_H -#define UDPRECEIVER_H - -#include - -class PacketUdpReceiver { - - int socket_fd_; - -public: - PacketUdpReceiver(); - virtual ~PacketUdpReceiver(); - - bool receive(void* buffer, const size_t buffer_n_bytes); - int receive_many(mmsghdr* msgs, const size_t n_msgs); - - void bind(const uint16_t port); - void disconnect(); -}; - - -#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H +#ifndef UDPRECEIVER_H +#define UDPRECEIVER_H + +#include + +class PacketUdpReceiver { + + int socket_fd_; + +public: + PacketUdpReceiver(); + virtual ~PacketUdpReceiver(); + + bool receive(void* buffer, const size_t buffer_n_bytes); + int receive_many(mmsghdr* msgs, const size_t n_msgs); + + void bind(const uint16_t port); + void disconnect(); +}; + + +#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/jfj-udp-recv/mockmain.cpp b/jfj-udp-recv/mockmain.cpp deleted file mode 100644 index 1d33d2b..0000000 --- a/jfj-udp-recv/mockmain.cpp +++ /dev/null @@ -1,15 +0,0 @@ - -#include "include/PacketBuffer.hpp" - - -struct DummyContainer{ - uint64_t index; - uint64_t timestamp; - uint16_t data[32]; -}; - - -int main (int argc, char *argv[]) { - PacketBuffer b; - -} diff --git a/jfj-udp-recv/src/JfjFrameStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp index 28161c7..36bb614 100644 --- a/jfj-udp-recv/src/JfjFrameStats.cpp +++ b/jfj-udp-recv/src/JfjFrameStats.cpp @@ -1,71 +1,54 @@ -#include -#include "FrameStats.hpp" - -using namespace std; -using namespace chrono; - -FrameStats::FrameStats( - const std::string &detector_name, - const int module_id, - const size_t stats_time) : - detector_name_(detector_name), - module_id_(module_id), - stats_time_(stats_time) -{ - reset_counters(); -} - -void FrameStats::reset_counters() -{ - frames_counter_ = 0; - n_missed_packets_ = 0; - n_corrupted_frames_ = 0; - n_corrupted_pulse_id_ = 0; - stats_interval_start_ = steady_clock::now(); -} - -void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) -{ - - if (bad_pulse_id) { - n_corrupted_pulse_id_++; - } - - if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { - n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; - n_corrupted_frames_++; - } - - frames_counter_++; - - auto time_passed = duration_cast( - steady_clock::now()-stats_interval_start_).count(); - - if (time_passed >= stats_time_*1000) { - print_stats(); - reset_counters(); - } -} - -void FrameStats::print_stats() -{ - auto interval_ms_duration = duration_cast( - steady_clock::now()-stats_interval_start_).count(); - // * 1000 because milliseconds, + 250 because of truncation. - int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; - uint64_t timestamp = time_point_cast( - system_clock::now()).time_since_epoch().count(); - - // Output in InfluxDB line protocol - cout << "jf_udp_recv"; - cout << ",detector_name=" << detector_name_; - cout << ",module_name=M" << module_id_; - cout << " "; - cout << "n_missed_packets=" << n_missed_packets_ << "i"; - cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; - cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; - cout << " "; - cout << timestamp; - cout << endl; -} +#include +#include "JfjFrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats(const std::string &detector_name, const size_t stats_time) : + detector_name_(detector_name), stats_time_(stats_time) { + reset_counters(); +} + +void FrameStats::reset_counters() +{ + frames_counter_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ImageMetadata &meta, const bool bad_pulse_id) +{ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast(steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats(){ + auto interval_ms_duration = duration_cast(steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast(system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jfj_udp_recv"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp index aa38ec9..88b9ec0 100644 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -1,23 +1,23 @@ #include -#include +#include #include "JfjFrameUdpReceiver.hpp" using namespace std; using namespace buffer_config; JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { - udp_receiver_.bind(port); + m_udp_receiver.bind(port); } JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { - udp_receiver_.disconnect(); + m_udp_receiver.disconnect(); } -inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet) { - frame_metadata.pulse_id = c_packet.timestamp; - frame_metadata.frame_index = c_packet.framenum; - frame_metadata.daq_rec = (uint32_t) c_packet.debug; - frame_metadata.is_good_image = (int32_t) true; +inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& metadata, const jfjoch_packet_t& c_packet) { + metadata.pulse_id = c_packet.timestamp; + metadata.frame_index = c_packet.framenum; + metadata.daq_rec = (uint32_t) c_packet.debug; + metadata.is_good_image = (int32_t) true; } inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, char* frame_buffer){ @@ -26,7 +26,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch // Happens if the last packet from the previous frame gets lost. if (m_frame_index != m_buffer.peek_front().framenum) { m_frame_index = m_buffer.peek_front().framenum; - frame_metadata.is_good_image = (int32_t) false; + metadata.is_good_image = (int32_t) false; return metadata.pulse_id; } @@ -38,12 +38,11 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch this->init_frame(metadata, c_packet); // Copy data to frame buffer - size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum; - memcpy( (void*) (frame_buffer + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET); - metadata.n_recv_packets++; + size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum; + memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); // Last frame packet received. Frame finished. - if (c_packet.packetnum == JFJ_N_PACKETS_PER_FRAME - 1){ + if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){ return metadata.pulse_id; } } @@ -54,10 +53,9 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch } uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ - // Reset the metadata and frame buffer for the next frame. + // Reset the metadata and frame buffer for the next frame. (really needed?) metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET); // Process leftover packages in the buffer if (!m_buffer.is_empty()) { diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp index 4bd2a17..df9d0fa 100644 --- a/jfj-udp-recv/src/main.cpp +++ b/jfj-udp-recv/src/main.cpp @@ -5,9 +5,9 @@ #include "formats.hpp" #include "buffer_config.hpp" -#include "FrameUdpReceiver.hpp" +#include "JfjFrameUdpReceiver.hpp" #include "BufferUtils.hpp" -#include "FrameStats.hpp" +#include "JfjFrameStats.hpp" using namespace std; using namespace chrono; @@ -18,7 +18,7 @@ int main (int argc, char *argv[]) { if (argc != 3) { cout << endl; - cout << "Usage: jf_udp_recv [detector_json_filename] [module_id]"; + cout << "Usage: jfj_udp_recv [detector_json_filename] [module_id]"; cout << endl; cout << "\tdetector_json_filename: detector config file path." << endl; cout << "\tmodule_id: id of the module for this process." << endl; @@ -30,18 +30,18 @@ int main (int argc, char *argv[]) { const auto config = read_json_config(string(argv[1])); const int module_id = atoi(argv[2]); - const auto udp_port = config.start_udp_port + module_id; - ImageUdpReceiver receiver(udp_port, module_id); + const auto udp_port = config.start_udp_port; + JfjFrameUdpReceiver receiver(udp_port); RamBuffer buffer(config.detector_name, config.n_modules); - ImageStats stats(config.detector_name, module_id, STATS_TIME); + FrameStats stats(config.detector_name, STATS_TIME); auto ctx = zmq_ctx_new(); - auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); - + zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS); + auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch"); // Might be better creating a structure for double buffering ImageMetadata metaBufferA; - char* dataBufferA = new char[IMAGE_N_BYTES]; + char* dataBufferA = new char[JFJOCH_DATA_BYTES_PER_FRAME]; uint64_t pulse_id_previous = 0; uint64_t frame_index_previous = 0; @@ -49,22 +49,15 @@ int main (int argc, char *argv[]) { while (true) { // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = receiver.get_image_from_udp(metaBufferA, dataBufferA); + auto pulse_id = receiver.get_frame_from_udp(metaBufferA, dataBufferA); bool bad_pulse_id = false; - if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || - ( (pulse_id-pulse_id_previous) < 0 ) || - ( (pulse_id-pulse_id_previous) > 1000 ) ) { - + if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { bad_pulse_id = true; - } else { - buffer.write_frame(metaBufferA, dataBufferA); - - zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); - + zmq_send(sender, &metaBufferA, sizeof(metaBufferA), 0); } stats.record_stats(metaBufferA, bad_pulse_id); @@ -74,5 +67,5 @@ int main (int argc, char *argv[]) { } - delete[] data; + delete[] dataBufferA; } From 53abc8e867c2adb5e9f58543ab5d55e1622ef651 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Tue, 1 Jun 2021 13:49:30 +0200 Subject: [PATCH 4/5] It compiles --- jfj-udp-recv/include/JfjFrameStats.hpp | 13 +- jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 6 +- jfj-udp-recv/src/JfjFrameStats.cpp | 125 +++++++++++-------- jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 17 +-- jfj-udp-recv/src/main.cpp | 30 +++-- 5 files changed, 108 insertions(+), 83 deletions(-) diff --git a/jfj-udp-recv/include/JfjFrameStats.hpp b/jfj-udp-recv/include/JfjFrameStats.hpp index 7d53b6c..7ab6636 100644 --- a/jfj-udp-recv/include/JfjFrameStats.hpp +++ b/jfj-udp-recv/include/JfjFrameStats.hpp @@ -2,12 +2,13 @@ #include #include -#ifndef SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP -#define SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP class FrameStats { const std::string detector_name_; + const int module_id_; size_t stats_time_; int frames_counter_; @@ -20,9 +21,11 @@ class FrameStats { void print_stats(); public: - FrameStats(const std::string &detector_name, const size_t stats_time); - void record_stats(const ImageMetadata &meta, const bool bad_pulse_id); + FrameStats(const std::string &detector_name, + const int module_id, + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); }; -#endif //SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp index fc9a284..aad5962 100644 --- a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -19,13 +19,13 @@ class JfjFrameUdpReceiver { PacketBuffer m_buffer; - inline void init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet); - inline uint64_t process_packets(ImageMetadata& metadata, char* frame_buffer); + inline void init_frame(ModuleFrame& frame_metadata, const jfjoch_packet_t& c_packet); + inline uint64_t process_packets(ModuleFrame& metadata, char* frame_buffer); public: JfjFrameUdpReceiver(const uint16_t port); virtual ~JfjFrameUdpReceiver(); - uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer); + uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); }; diff --git a/jfj-udp-recv/src/JfjFrameStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp index 36bb614..ee48423 100644 --- a/jfj-udp-recv/src/JfjFrameStats.cpp +++ b/jfj-udp-recv/src/JfjFrameStats.cpp @@ -1,54 +1,71 @@ -#include -#include "JfjFrameStats.hpp" - -using namespace std; -using namespace chrono; - -FrameStats::FrameStats(const std::string &detector_name, const size_t stats_time) : - detector_name_(detector_name), stats_time_(stats_time) { - reset_counters(); -} - -void FrameStats::reset_counters() -{ - frames_counter_ = 0; - n_corrupted_frames_ = 0; - n_corrupted_pulse_id_ = 0; - stats_interval_start_ = steady_clock::now(); -} - -void FrameStats::record_stats(const ImageMetadata &meta, const bool bad_pulse_id) -{ - - if (bad_pulse_id) { - n_corrupted_pulse_id_++; - n_corrupted_frames_++; - } - - frames_counter_++; - - auto time_passed = duration_cast(steady_clock::now()-stats_interval_start_).count(); - - if (time_passed >= stats_time_*1000) { - print_stats(); - reset_counters(); - } -} - -void FrameStats::print_stats(){ - auto interval_ms_duration = duration_cast(steady_clock::now()-stats_interval_start_).count(); - // * 1000 because milliseconds, + 250 because of truncation. - int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; - uint64_t timestamp = time_point_cast(system_clock::now()).time_since_epoch().count(); - - // Output in InfluxDB line protocol - cout << "jfj_udp_recv"; - cout << ",detector_name=" << detector_name_; - cout << " "; - cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; - cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; - cout << " "; - cout << timestamp; - cout << endl; -} +#include +#include "JfjFrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats( + const std::string &detector_name, + const int module_id, + const size_t stats_time) : + detector_name_(detector_name), + module_id_(module_id), + stats_time_(stats_time) +{ + reset_counters(); +} + +void FrameStats::reset_counters() +{ + frames_counter_ = 0; + n_missed_packets_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) +{ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { + n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_udp_recv"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "n_missed_packets=" << n_missed_packets_ << "i"; + cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp index 88b9ec0..c7c6724 100644 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -13,20 +13,19 @@ JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { m_udp_receiver.disconnect(); } -inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& metadata, const jfjoch_packet_t& c_packet) { - metadata.pulse_id = c_packet.timestamp; - metadata.frame_index = c_packet.framenum; - metadata.daq_rec = (uint32_t) c_packet.debug; - metadata.is_good_image = (int32_t) true; +inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { + metadata.pulse_id = c_packet.bunchid; + metadata.frame_index = c_packet.framenum; + metadata.daq_rec = (uint64_t) c_packet.debug; + metadata.module_id = (int64_t) 0; } -inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, char* frame_buffer){ +inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char* frame_buffer){ while(!m_buffer.is_empty()){ // Happens if the last packet from the previous frame gets lost. if (m_frame_index != m_buffer.peek_front().framenum) { m_frame_index = m_buffer.peek_front().framenum; - metadata.is_good_image = (int32_t) false; return metadata.pulse_id; } @@ -40,6 +39,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch // Copy data to frame buffer size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum; memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); + metadata.n_recv_packets++; // Last frame packet received. Frame finished. if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){ @@ -52,9 +52,10 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch return 0; } -uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ +uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer){ // Reset the metadata and frame buffer for the next frame. (really needed?) metadata.pulse_id = 0; + metadata.n_recv_packets = 0; memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET); // Process leftover packages in the buffer diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp index df9d0fa..2c52f25 100644 --- a/jfj-udp-recv/src/main.cpp +++ b/jfj-udp-recv/src/main.cpp @@ -18,30 +18,29 @@ int main (int argc, char *argv[]) { if (argc != 3) { cout << endl; - cout << "Usage: jfj_udp_recv [detector_json_filename] [module_id]"; + cout << "Usage: jfj_udp_recv [detector_json_filename]"; cout << endl; cout << "\tdetector_json_filename: detector config file path." << endl; - cout << "\tmodule_id: id of the module for this process." << endl; cout << endl; exit(-1); } const auto config = read_json_config(string(argv[1])); - const int module_id = atoi(argv[2]); const auto udp_port = config.start_udp_port; JfjFrameUdpReceiver receiver(udp_port); RamBuffer buffer(config.detector_name, config.n_modules); - FrameStats stats(config.detector_name, STATS_TIME); + FrameStats stats(config.detector_name, 0, STATS_TIME); auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS); auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch"); // Might be better creating a structure for double buffering - ImageMetadata metaBufferA; - char* dataBufferA = new char[JFJOCH_DATA_BYTES_PER_FRAME]; + ModuleFrame frameMeta; + ImageMetadata imageMeta; + char* dataBuffer = new char[JFJOCH_DATA_BYTES_PER_FRAME]; uint64_t pulse_id_previous = 0; uint64_t frame_index_previous = 0; @@ -49,23 +48,28 @@ int main (int argc, char *argv[]) { while (true) { // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = receiver.get_frame_from_udp(metaBufferA, dataBufferA); + auto pulse_id = receiver.get_frame_from_udp(frameMeta, dataBuffer); bool bad_pulse_id = false; - if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { + if ( ( frameMeta.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { bad_pulse_id = true; } else { - buffer.write_frame(metaBufferA, dataBufferA); - zmq_send(sender, &metaBufferA, sizeof(metaBufferA), 0); + imageMeta.pulse_id = frameMeta.pulse_id; + imageMeta.frame_index = frameMeta.frame_index; + imageMeta.daq_rec = frameMeta.daq_rec; + imageMeta.is_good_image = true; + + buffer.write_frame(frameMeta, dataBuffer); + zmq_send(sender, &imageMeta, sizeof(imageMeta), 0); } - stats.record_stats(metaBufferA, bad_pulse_id); + stats.record_stats(frameMeta, bad_pulse_id); pulse_id_previous = pulse_id; - frame_index_previous = metaBufferA.frame_index; + frame_index_previous = frameMeta.frame_index; } - delete[] dataBufferA; + delete[] dataBuffer; } From 1406284d41ca159405adee03c251d2a0f66a9c5d Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Wed, 2 Jun 2021 13:41:57 +0200 Subject: [PATCH 5/5] Tests and bugfix --- core-buffer/include/jungfraujoch.hpp | 10 ++--- jfj-udp-recv/CMakeLists.txt | 2 +- jfj-udp-recv/include/PacketBuffer.hpp | 10 +++-- jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 22 ++++++++--- jfj-udp-recv/src/main.cpp | 3 +- jfj-udp-recv/test/CMakeLists.txt | 6 +-- jfj-udp-recv/test/test_FrameUdpReceiver.cpp | 42 ++++++++++----------- 7 files changed, 53 insertions(+), 42 deletions(-) diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp index fd549b2..6d219ca 100644 --- a/core-buffer/include/jungfraujoch.hpp +++ b/core-buffer/include/jungfraujoch.hpp @@ -1,13 +1,13 @@ -#ifndef JUNGFRAUJOCH_H -#define JUNGFRAUJOCH_H +#ifndef JUNGFRAUJOCH_HPP +#define JUNGFRAUJOCH_HPP #include #define JFJOCH_N_MODULES 32 #define JFJOCH_BYTES_PER_PACKET 8240 #define JFJOCH_DATA_BYTES_PER_PACKET 8192 -#define JFJOCH_N_PACKETS_PER_FRAME JFJOCH_N_MODULES * 128 -#define JFJOCH_DATA_BYTES_PER_FRAME JFJOCH_N_MODULES * 1048576 +#define JFJOCH_N_PACKETS_PER_FRAME (JFJOCH_N_MODULES * 128) +#define JFJOCH_DATA_BYTES_PER_FRAME (JFJOCH_N_MODULES * 1048576) // 48 bytes + 8192 bytes = 8240 bytes #pragma pack(push) @@ -17,7 +17,7 @@ struct jfjoch_packet_t { uint32_t exptime; uint32_t packetnum; - uint64_t bunchid; + int64_t bunchid; uint64_t timestamp; uint16_t moduleID; diff --git a/jfj-udp-recv/CMakeLists.txt b/jfj-udp-recv/CMakeLists.txt index 7380471..8a19873 100644 --- a/jfj-udp-recv/CMakeLists.txt +++ b/jfj-udp-recv/CMakeLists.txt @@ -9,4 +9,4 @@ set_target_properties(jfj-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) target_link_libraries(jfj-udp-recv jfj-udp-recv-lib zmq rt) enable_testing() -# add_subdirectory(test/) +add_subdirectory(test/) diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp index b2be33b..0697a5f 100644 --- a/jfj-udp-recv/include/PacketBuffer.hpp +++ b/jfj-udp-recv/include/PacketBuffer.hpp @@ -32,8 +32,8 @@ public: // ~PacketBuffer() {}; /**Diagnostics**/ - size_t size() const { return ( idx_write-idx_read ); } - size_t capacity() const { return m_capacity; } + int size() const { return ( idx_write-idx_read ); } + int capacity() const { return m_capacity; } bool is_full() const { return bool(idx_write >= m_capacity); } bool is_empty() const { return bool(idx_write <= idx_read); } @@ -52,6 +52,8 @@ public: void fill_from(TY& recv){ std::lock_guard g_guard(m_mutex); this->idx_write = recv.receive_many(m_msgs, this->capacity()); + // Returns -1 with errno=11 if no data received + if(idx_write==-1){ idx_write = 0; } this->idx_read = 0; } @@ -62,8 +64,8 @@ private: /**Guards**/ std::mutex m_mutex; /**Read and write index**/ - size_t idx_write = 0; - size_t idx_read = 0; + int idx_write = 0; + int idx_read = 0; // C-structures as expected by mmsghdr m_msgs[CAPACITY]; diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp index c7c6724..059da2b 100644 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -1,10 +1,17 @@ #include -#include #include "JfjFrameUdpReceiver.hpp" using namespace std; using namespace buffer_config; +std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) { + os << "Frame number: " << packet.framenum << std::endl; + os << "Packet number: " << packet.packetnum << std::endl; + os << "Bunch id: " << packet.bunchid << std::endl; + os << std::endl; + return os; +} + JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { m_udp_receiver.bind(port); } @@ -13,7 +20,9 @@ JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { m_udp_receiver.disconnect(); } -inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { +inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) { + // std::cout << c_packet; + metadata.pulse_id = c_packet.bunchid; metadata.frame_index = c_packet.framenum; metadata.daq_rec = (uint64_t) c_packet.debug; @@ -26,6 +35,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char // Happens if the last packet from the previous frame gets lost. if (m_frame_index != m_buffer.peek_front().framenum) { m_frame_index = m_buffer.peek_front().framenum; + std::cout << "Peeked pulse: " << metadata.pulse_id << std::endl; return metadata.pulse_id; } @@ -48,7 +58,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char } // We emptied the buffer. - m_buffer.reset(); + // m_buffer.reset(); return 0; } @@ -56,8 +66,7 @@ uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* fr // Reset the metadata and frame buffer for the next frame. (really needed?) metadata.pulse_id = 0; metadata.n_recv_packets = 0; - memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET); - + memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME); // Process leftover packages in the buffer if (!m_buffer.is_empty()) { auto pulse_id = process_packets(metadata, frame_buffer); @@ -66,7 +75,8 @@ uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* fr while (true) { // Receive new packages (pass if none)... - m_buffer.fill_from(m_udp_receiver); + m_buffer.reset(); + m_buffer.fill_from(m_udp_receiver); if (m_buffer.is_empty()) { continue; } // ... and process them diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp index 2c52f25..2afb451 100644 --- a/jfj-udp-recv/src/main.cpp +++ b/jfj-udp-recv/src/main.cpp @@ -18,8 +18,7 @@ int main (int argc, char *argv[]) { if (argc != 3) { cout << endl; - cout << "Usage: jfj_udp_recv [detector_json_filename]"; - cout << endl; + cout << "Usage: jfj_udp_recv [detector_json_filename]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; diff --git a/jfj-udp-recv/test/CMakeLists.txt b/jfj-udp-recv/test/CMakeLists.txt index 25c729a..77eeb3a 100644 --- a/jfj-udp-recv/test/CMakeLists.txt +++ b/jfj-udp-recv/test/CMakeLists.txt @@ -1,8 +1,8 @@ -add_executable(jf-udp-recv-tests main.cpp) +add_executable(jfj-udp-recv-tests main.cpp) -target_link_libraries(jf-udp-recv-tests +target_link_libraries(jfj-udp-recv-tests core-buffer-lib - jf-udp-recv-lib + jfj-udp-recv-lib gtest ) diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp index e1e86ab..3f490c7 100644 --- a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -1,7 +1,7 @@ #include -#include +#include #include "gtest/gtest.h" -#include "FrameUdpReceiver.hpp" +#include "JfjFrameUdpReceiver.hpp" #include "mock/udp.hpp" #include @@ -12,7 +12,7 @@ using namespace std; TEST(BufferUdpReceiver, simple_recv) { - auto n_packets = JF_N_PACKETS_PER_FRAME; + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; int n_frames = 5; uint16_t udp_port = MOCK_UDP_PORT; @@ -23,9 +23,9 @@ TEST(BufferUdpReceiver, simple_recv) JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { auto pulse_id = udp_receiver.get_frame_from_udp( @@ -63,7 +63,7 @@ TEST(BufferUdpReceiver, simple_recv) TEST(BufferUdpReceiver, missing_middle_packet) { - auto n_packets = JF_N_PACKETS_PER_FRAME; + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -71,17 +71,17 @@ TEST(BufferUdpReceiver, missing_middle_packet) auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); ASSERT_TRUE(send_socket_fd >= 0); - JfjFrameUdpReceiver udp_receiver(udp_port, source_id); + JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { auto pulse_id = udp_receiver.get_frame_from_udp( @@ -119,7 +119,7 @@ TEST(BufferUdpReceiver, missing_middle_packet) TEST(BufferUdpReceiver, missing_first_packet) { - auto n_packets = JF_N_PACKETS_PER_FRAME; + auto n_packets = JFJOCH_N_PACKETS_PER_FRAME; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -130,14 +130,14 @@ TEST(BufferUdpReceiver, missing_first_packet) JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); for (int i_frame=0; i_frame < n_frames; i_frame++) { auto pulse_id = udp_receiver.get_frame_from_udp( @@ -175,7 +175,7 @@ TEST(BufferUdpReceiver, missing_first_packet) TEST(BufferUdpReceiver, missing_last_packet) { - auto n_packets = JF_N_PACKETS_PER_FRAME; + int n_packets = JFJOCH_N_PACKETS_PER_FRAME; int n_frames = 3; uint16_t udp_port = MOCK_UDP_PORT; @@ -186,14 +186,14 @@ TEST(BufferUdpReceiver, missing_last_packet) JfjFrameUdpReceiver udp_receiver(udp_port); auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + auto frame_buffer = make_unique(JFJOCH_DATA_BYTES_PER_FRAME); // n_frames -1 because the last frame is not complete. for (int i_frame=0; i_frame < n_frames - 1; i_frame++) {