diff --git a/CMakeLists.txt b/CMakeLists.txt index 74c7972..dbef01c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,7 @@ add_subdirectory("core-buffer") add_subdirectory("jf-udp-recv") add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") +add_subdirectory("jfj-udp-recv") add_subdirectory("sf-stream") add_subdirectory("sf-writer") diff --git a/core-buffer/include/jungfraujoch.hpp b/core-buffer/include/jungfraujoch.hpp new file mode 100644 index 0000000..bbfbb08 --- /dev/null +++ b/core-buffer/include/jungfraujoch.hpp @@ -0,0 +1,37 @@ +#ifndef JUNGFRAUJOCH_H +#define JUNGFRAUJOCH_H + +#include + +#define JFJOCH_N_MODULES 32 +#define JFJOCH_BYTES_PER_PACKET 8240 +#define JFJOCH_DATA_BYTES_PER_PACKET 8192 +#define JFJOCH_N_PACKETS_PER_FRAME JFJOCH_N_MODULES * 128 +#define JFJOCH_DATA_BYTES_PER_FRAME JFJOCH_N_MODULES * 1048576 + +// 48 bytes + 8192 bytes = 8240 bytes +#pragma pack(push) +#pragma pack(2) +struct jfjoch_packet_t { + uint64_t framenum; + uint32_t exptime; + uint32_t packetnum; + + double bunchid; + uint64_t timestamp; + + uint16_t moduleID; + uint16_t xCoord; + uint16_t yCoord; + uint16_t zCoord; + + uint32_t debug; + uint16_t roundRobin; + uint8_t detectortype; + uint8_t headerVersion; + char data[JFJOCH_DATA_BYTES_PER_PACKET]; +}; +#pragma pack(pop) + + +#endif diff --git a/jfj-udp-recv/CMakeLists.txt b/jfj-udp-recv/CMakeLists.txt new file mode 100644 index 0000000..3c83127 --- /dev/null +++ b/jfj-udp-recv/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-udp-recv-lib STATIC ${SOURCES}) +target_include_directories(jf-udp-recv-lib PUBLIC include/) +target_link_libraries(jf-udp-recv-lib + external + core-buffer-lib) + +add_executable(jf-udp-recv src/main.cpp) +set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) +target_link_libraries(jf-udp-recv + jf-udp-recv-lib + zmq + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/jfj-udp-recv/README.md b/jfj-udp-recv/README.md new file mode 100644 index 0000000..504281f --- /dev/null +++ b/jfj-udp-recv/README.md @@ -0,0 +1,164 @@ +# sf-buffer +sf-buffer is the component that receives the detector data in form of UDP +packages and writes them down to disk to a binary format. In addition, it +sends a copy of the module frame to sf-stream via ZMQ. + +Each sf-buffer process is taking care of a single detector module. The +processes are all independent and do not rely on any external data input +to maximize isolation and possible interactions in our system. + +The main design principle is simplicity and decoupling: + +- No interprocess dependencies/communication. +- No dependencies on external libraries (as much as possible). +- Using POSIX as much as possible. + +We are optimizing for maintainability and long term stability. Performance is +of concern only if the performance criteria are not met. + +## Overview + +![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg) + +sf-buffer is a single threaded application (without counting the ZMQ IO threads) +that does both receiving, assembling, writing and sending in the same thread. + +### UDP receiving + +Each process listens to one udp port. Packets coming to this udp port are +assembled into frames. Frames (either complete or with missing packets) are +passed forward. The number of received packets is saved so we can later +(at image assembly time) determine if the frame is valid or not. At this point +we do no validation. + +We are currently using **recvmmsg** to minimize the number of switches to +kernel mode. + +We expect all packets to come in order or not come at all. Once we see the +package for the next pulse_id we can assume no more packages are coming for +the previous one, and send the assembled frame down the program. + +### File writing + +Files are written to disk in frames - one write to disk per frame. This gives +us a relaxed 10ms interval of 1 MB writes. + +#### File format + +The binary file on disk is just a serialization of multiple +**BufferBinaryFormat** structs: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) + +#pragma pack(push) +#pragma pack(1) +struct BufferBinaryFormat { + const char FORMAT_MARKER = 0xBE; + ModuleFrame meta; + char data[buffer_config::MODULE_N_BYTES]; +}; +#pragma pack(pop) +``` + +![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg) + +Each frame is composed by: + +- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. +- **ModuleFrame** - frame meta used in image assembly phase. +- **Data** - assembled frame from a single module. + +Frames are written one after another to a specific offset in the file. The +offset is calculated based on the pulse_id, so each frame has a specific place +in the file and there is no need to have an index for frame retrieval. + +The offset where a specific pulse_id is written in a file is calculated: + +```c++ +// We save 1000 pulses in each file. +const uint64_t FILE_MOD = 1000 + +// Relative index of pulse_id inside file. +size_t file_base = pulse_id % FILE_MOD; +// Offset in bytes of relative index in file. +size_t file_offset = file_base * sizeof(BufferBinaryFormat); +``` + +We now know where to look for data inside the file, but we still don't know +inside which file to look. For this we need to discuss the folder structure. + +#### Folder structure + +The folder (as well as file) structure is deterministic in the sense that given +a specific pulse_id, we can directly calculate the folder, file, and file +offset where the data is stored. This allows us to have independent writing +and reading from the buffer without building any indexes. + +The binary files written by sf_buffer are saved to: + +[detector_folder]/[module_folder]/[data_folder]/[data_file].bin + +- **detector\_folder** should always be passed as an absolute path. This is the +container that holds all data related to a specific detector. +- **module\_folder** is usually composed like "M00", "M01". It separates data +from different modules of one detector. +- **data\_folder** and **data\_file** are automatically calculated based on the +current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our +index for accessing data. + +![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg) + +```c++ +// FOLDER_MOD = 100000 +int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD; +// FILE_MOD = 1000 +int data_file = (pulse_id % FILE_MOD) * FILE_MOD; +``` + +The data_folder and data_file folders are named as the first pulse_id that +should be stored inside them. + +FOLDER_MOD == 100000 means that each data_folder will contain data for 100000 +pulses, while FILE_MOD == 1000 means that each file inside the data_folder +will contain 1000 pulses. The total number of data_files in each data_folder +will therefore be **FILE\_MOD / FOLDER\_MOD = 100**. + +#### Analyzing the buffer on disk +In **sf-utils** there is a Python module that allows you to read directly the +buffer in order to debug it or to verify the consistency between the HDF5 file +and the received data. + +- VerifyH5DataConsistency.py checks the consistency between the H5 file and +buffer. +- BinaryBufferReader.py reads the buffer and prints meta. The class inside +can also be used in external scripts. + +### ZMQ sending + +A copy of the data written to disk is also send via ZMQ to the sf-stream. This +is used to provide live viewing / processing capabilities. Each module data is +sent separately, and this is later assembled in the sf-stream. + +We use the PUB/SUB mechanism for distributing this data - we cannot control the +rate of the producer, and we would like to avoid distributed image assembly +if possible, so PUSH/PULL does not make sense in this case. + +We provide no guarantees on live data delivery, but in practice the number of +dropped or incomplete frames in currently negligible. + +The protocol is a serialization of the same data structures we use to +write on disk (no need for additional memory operations before sending out +data). It uses a 2 part multipart ZMQ message: + +- The first part is a serialization of the ModuleFrame struct (see above). +- The second part is the data field in the BufferBinaryFormat struct (the frame +data). diff --git a/jfj-udp-recv/include/DataBuffer.hpp b/jfj-udp-recv/include/DataBuffer.hpp new file mode 100644 index 0000000..1d22d73 --- /dev/null +++ b/jfj-udp-recv/include/DataBuffer.hpp @@ -0,0 +1,92 @@ +#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP +#define CIRCULAR_BUFFER_TEMPLATE_HPP + +#include +#include +#include +#include +#include + +/**Linear data buffer + +A simplified version of FIFO. +**/ +template +class DataBuffer{ +public: + DataBuffer() {}; + ~DataBuffer() {}; + + /**Diagnostics**/ + size_t size() const { return ( _write-_read ); } + size_t capacity() const { return _capacity; } + bool is_full(){ return ( (_write - _read)<_capacity ); } + bool is_empty(){ return (_write ==_read); } + + /**Operators**/ + void zero(){ memset(m_cont, 0, sizeof(m_cont)); } + T& operator[](size_t index); // Array subscript operator + T& container(){ return (_cont; } // Direct container reference + + /**Element access**/ + const T& pop_front(); //Destructive read + const T& get_front(); //Non-destructive read + void push_back(T item); //Write new element to buffer + + /**Guards**/ + std::mutex g_mutex; +private: + T m_cont[CAP]; + const size_t m_capacity = CAP; + size_t ptr_write = 0; + size_t ptr_read = 0; +}; + +/** Array subscript operator + Throws 'std::length_error' if out of range. +**/ +template +T& DataBuffer::operator[](size_t idx){ + if(idx > m_capacity){ + std::string msg = "Buffer index '" + std::to_string(idx) + "' is out of range with capacity '" + std::to_sting(m_capacity) + "'" + std::endl; + throw std::out_of_range(msg); + } + + return m_buffer[idx]; +} + +template +T& DataBuffer::container(){ + return m_buffer; +} + +/*********************************************************************/ + +/** Destructive read (i.e. progress the read pointer) **/ +template +const T& DataBuffer::pop_front(){ + std::lock_guard g_guard; + ptr_read++; + return _buffer[ptr_read-1]; +} + +/**Push a new element to the ringbuffer (do not progress read pointer)**/ +template +const T& DataBuffer::peek_front(){ + return m_buffer[ptr_read]; +} + + +/**Push a new element to the ringbuffer**/ +template +void DataBuffer::push_back(T item){ + std::lock_guard g_guard; + if(ptr_write==m_capacity-1){ + std::string msg = "Buffer with '" + std::to_sting(m_capacity) + "' capacity is full" + std::endl; + throw std::out_of_range(msg); + } + m_buffer[ptr_write] = item; + ptr_write++; +} + +#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-udp-recv/include/ImageStats.hpp b/jfj-udp-recv/include/ImageStats.hpp new file mode 100644 index 0000000..7839a38 --- /dev/null +++ b/jfj-udp-recv/include/ImageStats.hpp @@ -0,0 +1,31 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class FrameStats { + const std::string detector_name_; + const int module_id_; + size_t stats_time_; + + int frames_counter_; + int n_missed_packets_; + int n_corrupted_frames_; + int n_corrupted_pulse_id_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + FrameStats(const std::string &detector_name, + const int module_id, + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JFJochUdpReceiver.hpp b/jfj-udp-recv/include/JFJochUdpReceiver.hpp new file mode 100644 index 0000000..4d38dd4 --- /dev/null +++ b/jfj-udp-recv/include/JFJochUdpReceiver.hpp @@ -0,0 +1,33 @@ +#ifndef SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP +#define SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP + +#include +#include "PacketUdpReceiver.hpp" +#include "formats.hpp" +#include "buffer_config.hpp" + +class JFJochUdpReceiver { + PacketUdpReceiver m_udp_receiver; + + // Incoming packet buffers + jfjoch_packet_t m_packet_buffer[buffer_config::BUFFER_UDP_N_RECV_MSG]; + iovec m_recv_buff_ptr[buffer_config::BUFFER_UDP_N_RECV_MSG]; + mmsghdr m_msgs[buffer_config::BUFFER_UDP_N_RECV_MSG]; + sockaddr_in m_sock_from[buffer_config::BUFFER_UDP_N_RECV_MSG]; + + bool packet_buffer_loaded_ = false; + int packet_buffer_n_packets_ = 0; + int packet_buffer_offset_ = 0; + + inline void init_frame(ImageMetadata& frame_metadata, const int i_packet); + inline void copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int i_packet); + inline uint64_t m_process_packets(const int n_packets, ImageMetadata& metadata, char* frame_buffer); + +public: + JFJochUdpReceiver(const uint16_t port, const int module_id); + virtual ~JFJochUdpReceiver(); + uint64_t get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer); +}; + + +#endif //SF_DAQ_BUFFER_JOCHUDPRECEIVER_HPP diff --git a/jfj-udp-recv/include/PacketUdpReceiver.hpp b/jfj-udp-recv/include/PacketUdpReceiver.hpp new file mode 100644 index 0000000..da92d85 --- /dev/null +++ b/jfj-udp-recv/include/PacketUdpReceiver.hpp @@ -0,0 +1,22 @@ +#ifndef UDPRECEIVER_H +#define UDPRECEIVER_H + +#include + +class PacketUdpReceiver { + + int socket_fd_; + +public: + PacketUdpReceiver(); + virtual ~PacketUdpReceiver(); + + bool receive(void* buffer, const size_t buffer_n_bytes); + int receive_many(mmsghdr* msgs, const size_t n_msgs); + + void bind(const uint16_t port); + void disconnect(); +}; + + +#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/jfj-udp-recv/src/ImageStats.cpp b/jfj-udp-recv/src/ImageStats.cpp new file mode 100644 index 0000000..28161c7 --- /dev/null +++ b/jfj-udp-recv/src/ImageStats.cpp @@ -0,0 +1,71 @@ +#include +#include "FrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats( + const std::string &detector_name, + const int module_id, + const size_t stats_time) : + detector_name_(detector_name), + module_id_(module_id), + stats_time_(stats_time) +{ + reset_counters(); +} + +void FrameStats::reset_counters() +{ + frames_counter_ = 0; + n_missed_packets_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) +{ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { + n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_udp_recv"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "n_missed_packets=" << n_missed_packets_ << "i"; + cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jfj-udp-recv/src/JFJochUdpReceiver.cpp b/jfj-udp-recv/src/JFJochUdpReceiver.cpp new file mode 100644 index 0000000..24ed15e --- /dev/null +++ b/jfj-udp-recv/src/JFJochUdpReceiver.cpp @@ -0,0 +1,112 @@ +#include +#include +#include "JFJochUdpReceiver.hpp" + +using namespace std; +using namespace buffer_config; + +JFJochUdpReceiver::JFJochUdpReceiver(const uint16_t port, const int module_id) : module_id_(module_id){ + udp_receiver_.bind(port); + + for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { + m_recv_buff_ptr[i].iov_base = (void*) &(m_packet_buffer[i]); + m_recv_buff_ptr[i].iov_len = sizeof(jfjoch_packet_t); + + msgs_[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; + msgs_[i].msg_hdr.msg_iovlen = 1; + msgs_[i].msg_hdr.msg_name = &m_sock_from[i]; + msgs_[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } +} + +JFJochUdpReceiver::~JFJochUdpReceiver() { + m_udp_receiver.disconnect(); +} + +inline void JFJochUdpReceiver::init_frame(ImageMetadata& image_metadata, const int i_packet) { + image_metadata.pulse_id = m_packet_buffer[i_packet].bunchid; + image_metadata.frame_index = m_packet_buffer[i_packet].framenum; + image_metadata.daq_rec = m_packet_buffer[i_packet].debug; + image_metadata.is_good_image = 0; +} + +inline void JFJochUdpReceiver::copy_packet_to_buffers(ImageMetadata& metadata, char* frame_buffer, const int idx_packet){ + + size_t buffer_offset = JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_[idx_packet].packetnum; + + memcpy((void*) (frame_buffer + buffer_offset), m_packet_buffer[idx_packet].data, JUNGFRAU_DATA_BYTES_PER_PACKET); + + metadata.n_recv_packets++; +} + + + +/** Copy the contents of the packet buffer into a single assembled image + NOTE: In the jungfrau_packet, framenum is the trigger number + NOTE: Even partial frames are valid +**/ +inline uint64_t JFJochUdpReceiver::m_process_packets(const int start_offset, ImageMetadata& metadata, char* frame_buffer){ + + for (int i_packet=start_offset; i_packet < packet_buffer_n_packets_; i_packet++) { + + // First packet for this frame (sucks if this one is missed) + if (metadata.pulse_id == 0) { + init_frame(metadata, i_packet); + } + // Unexpected jump (if the last packet from the previous frame got lost) + if (metadata.frame_index != m_packet_buffer[i_packet].framenum) { + // Save queue status (lazy fifo queue) + m_packet_buffer_loaded_ = true; + m_packet_buffer_offset_ = i_packet; + // Even partial frames are valid? + return metadata.pulse_id; + } + + copy_packet_to_buffers(metadata, frame_buffer, i_packet); + + // Last frame packet received (frame finished) + if (packet_buffer_[i_packet].packetnum == JFJ_N_PACKETS_PER_FRAME - 1) { + // Buffer is loaded only if this is not the last message. + if (i_packet+1 != packet_buffer_n_packets_) { + // Continue on next packet + m_packet_buffer_loaded = true; + m_packet_buffer_offset = i_packet + 1; + + // If i_packet is the last packet the buffer is empty. + } else { + m_packet_buffer_loaded = true; + m_packet_buffer_offset = 0; + } + + return metadata.pulse_id; + } + } + // We emptied the buffer. + m_packet_buffer_loaded = false; + m_packet_buffer_offset = 0; + + return 0; +} + +uint64_t JFJochUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ + // Reset the metadata and frame buffer for the next frame. + metadata.pulse_id = 0; + metadata.n_recv_packets = 0; + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + // Happens when last packet from previous frame was missed. + if (packet_buffer_loaded_) { + auto pulse_id = m_process_packets(packet_buffer_offset_, metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } + + // Otherwise read a new one + while (true) { + packet_buffer_n_packets_ = udp_receiver_.receive_many(msgs_, BUFFER_UDP_N_RECV_MSG); + + if (packet_buffer_n_packets_ > 0) { + auto pulse_id = m_process_packets(0, metadata, frame_buffer); + if (pulse_id != 0) { return pulse_id; } + } + } +} diff --git a/jfj-udp-recv/src/PacketUdpReceiver.cpp b/jfj-udp-recv/src/PacketUdpReceiver.cpp new file mode 100644 index 0000000..713dd98 --- /dev/null +++ b/jfj-udp-recv/src/PacketUdpReceiver.cpp @@ -0,0 +1,66 @@ +#include +#include +#include "PacketUdpReceiver.hpp" +#include "jungfrau.hpp" +#include +#include +#include "buffer_config.hpp" + +using namespace std; +using namespace buffer_config; + +PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { } + +PacketUdpReceiver::~PacketUdpReceiver() { + disconnect(); +} + +void PacketUdpReceiver::bind(const uint16_t port){ + if (socket_fd_ > -1) { + throw runtime_error("Socket already bound."); + } + + socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); + if (socket_fd_ < 0) { + throw runtime_error("Cannot open socket."); + } + + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(port); + + timeval udp_socket_timeout; + udp_socket_timeout.tv_sec = 0; + udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) { + throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno))); + } + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { + throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno))); + }; + //TODO: try to set SO_RCVLOWAT + + auto bind_result = ::bind(socket_fd_, reinterpret_cast(&server_address), sizeof(server_address)); + + if (bind_result < 0) { + throw runtime_error("Cannot bind socket."); + } +} + +int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){ + return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); +} + +bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){ + auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); + + return (data_len == buffer_n_bytes) ? true : false; +} + +void PacketUdpReceiver::disconnect(){ + close(socket_fd_); + socket_fd_ = -1; +} diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp new file mode 100644 index 0000000..4bd2a17 --- /dev/null +++ b/jfj-udp-recv/src/main.cpp @@ -0,0 +1,78 @@ +#include +#include +#include +#include + +#include "formats.hpp" +#include "buffer_config.hpp" +#include "FrameUdpReceiver.hpp" +#include "BufferUtils.hpp" +#include "FrameStats.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace BufferUtils; + +int main (int argc, char *argv[]) { + + if (argc != 3) { + cout << endl; + cout << "Usage: jf_udp_recv [detector_json_filename] [module_id]"; + cout << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tmodule_id: id of the module for this process." << endl; + cout << endl; + + exit(-1); + } + + const auto config = read_json_config(string(argv[1])); + const int module_id = atoi(argv[2]); + + const auto udp_port = config.start_udp_port + module_id; + ImageUdpReceiver receiver(udp_port, module_id); + RamBuffer buffer(config.detector_name, config.n_modules); + ImageStats stats(config.detector_name, module_id, STATS_TIME); + + auto ctx = zmq_ctx_new(); + auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); + + + // Might be better creating a structure for double buffering + ImageMetadata metaBufferA; + char* dataBufferA = new char[IMAGE_N_BYTES]; + + uint64_t pulse_id_previous = 0; + uint64_t frame_index_previous = 0; + + + while (true) { + // NOTE: Needs to be pipelined for really high frame rates + auto pulse_id = receiver.get_image_from_udp(metaBufferA, dataBufferA); + + bool bad_pulse_id = false; + + if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || + ( (pulse_id-pulse_id_previous) < 0 ) || + ( (pulse_id-pulse_id_previous) > 1000 ) ) { + + bad_pulse_id = true; + + } else { + + buffer.write_frame(metaBufferA, dataBufferA); + + zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + + } + + stats.record_stats(metaBufferA, bad_pulse_id); + + pulse_id_previous = pulse_id; + frame_index_previous = metaBufferA.frame_index; + + } + + delete[] data; +} diff --git a/jfj-udp-recv/test/CMakeLists.txt b/jfj-udp-recv/test/CMakeLists.txt new file mode 100644 index 0000000..25c729a --- /dev/null +++ b/jfj-udp-recv/test/CMakeLists.txt @@ -0,0 +1,8 @@ +add_executable(jf-udp-recv-tests main.cpp) + +target_link_libraries(jf-udp-recv-tests + core-buffer-lib + jf-udp-recv-lib + gtest + ) + diff --git a/jfj-udp-recv/test/main.cpp b/jfj-udp-recv/test/main.cpp new file mode 100644 index 0000000..8f2cd01 --- /dev/null +++ b/jfj-udp-recv/test/main.cpp @@ -0,0 +1,10 @@ +#include "gtest/gtest.h" +#include "test_PacketUdpReceiver.cpp" +#include "test_FrameUdpReceiver.cpp" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/jfj-udp-recv/test/mock/udp.hpp b/jfj-udp-recv/test/mock/udp.hpp new file mode 100644 index 0000000..1cef271 --- /dev/null +++ b/jfj-udp-recv/test/mock/udp.hpp @@ -0,0 +1,16 @@ +#ifndef MOCK_UDP_H +#define MOCK_UDP_H + +const int MOCK_UDP_PORT(13000); + +sockaddr_in get_server_address(uint16_t udp_port) +{ + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(udp_port); + + return server_address; +} + +#endif \ No newline at end of file diff --git a/jfj-udp-recv/test/test_FrameUdpReceiver.cpp b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp new file mode 100644 index 0000000..0d1c9ee --- /dev/null +++ b/jfj-udp-recv/test/test_FrameUdpReceiver.cpp @@ -0,0 +1,239 @@ +#include +#include +#include "gtest/gtest.h" +#include "FrameUdpReceiver.hpp" +#include "mock/udp.hpp" + +#include +#include +#include + +using namespace std; + +TEST(BufferUdpReceiver, simple_recv) +{ + auto n_packets = JF_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 5; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + FrameUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_middle_packet) +{ + auto n_packets = JF_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + FrameUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_first_packet) +{ + auto n_packets = JF_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + FrameUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_last_packet) +{ + auto n_packets = JF_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + FrameUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + // n_frames -1 because the last frame is not complete. + for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} \ No newline at end of file diff --git a/jfj-udp-recv/test/test_PacketUdpReceiver.cpp b/jfj-udp-recv/test/test_PacketUdpReceiver.cpp new file mode 100644 index 0000000..1be343a --- /dev/null +++ b/jfj-udp-recv/test/test_PacketUdpReceiver.cpp @@ -0,0 +1,170 @@ +#include +#include +#include "gtest/gtest.h" +#include "mock/udp.hpp" +#include "PacketUdpReceiver.hpp" + +#include +#include + +using namespace std; + +TEST(PacketUdpReceiver, simple_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + send_udp_buffer.packetnum = 91; + send_udp_buffer.framenum = 92; + send_udp_buffer.bunchid = 93; + send_udp_buffer.debug = 94; + + auto server_address = get_server_address(udp_port); + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(100)); + + jungfrau_packet recv_udp_buffer; + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum); + EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum); + EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid); + EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(PacketUdpReceiver, false_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + jungfrau_packet recv_udp_buffer; + + auto server_address = get_server_address(udp_port); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(PacketUdpReceiver, receive_many) +{ + auto n_msg_buffer = JF_N_PACKETS_PER_FRAME; + jungfrau_packet recv_buffer[n_msg_buffer]; + iovec recv_buff_ptr[n_msg_buffer]; + struct mmsghdr msgs[n_msg_buffer]; + struct sockaddr_in sockFrom[n_msg_buffer]; + + for (int i = 0; i < n_msg_buffer; i++) { + recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]); + recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); + + msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; + msgs[i].msg_hdr.msg_iovlen = 1; + msgs[i].msg_hdr.msg_name = &sockFrom[i]; + msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + + auto server_address = get_server_address(udp_port); + + send_udp_buffer.bunchid = 0; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + send_udp_buffer.bunchid = 1; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(10)); + + auto n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME); + ASSERT_EQ(n_msgs, 2); + + for (size_t i=0;i