From ad634a8c8c1bb53224af14844dec7a4235ef15cc Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Tue, 14 Sep 2021 10:43:38 +0200 Subject: [PATCH] ZMQ streamer for jungfrau --- jfj-combined/CMakeLists.txt | 12 ++ jfj-combined/README.md | 164 +++++++++++++++ jfj-combined/include/FrameCache.hpp | 177 +++++++++++++++++ jfj-combined/include/FrameStats.hpp | 31 +++ jfj-combined/include/FrameWorker.hpp | 48 +++++ jfj-combined/include/PacketBuffer.hpp | 117 +++++++++++ jfj-combined/include/PacketUdpReceiver.hpp | 27 +++ jfj-combined/include/Watchdog.hpp | 74 +++++++ jfj-combined/include/ZmqImagePublisher.hpp | 140 +++++++++++++ jfj-combined/src/JfjFrameStats.cpp | 68 +++++++ jfj-combined/src/JfjFrameWorker.cpp | 107 ++++++++++ jfj-combined/src/PacketUdpReceiver.cpp | 66 ++++++ jfj-combined/src/main.cpp | 60 ++++++ jfj-combined/test/CMakeLists.txt | 8 + jfj-combined/test/main.cpp | 11 + jfj-combined/test/mock/dummy_detector.json | 13 ++ jfj-combined/test/mock/udp.hpp | 16 ++ jfj-combined/test/test_FrameUdpReceiver.cpp | 199 +++++++++++++++++++ jfj-combined/test/test_PacketBuffer.cpp | 76 +++++++ jfj-combined/test/test_PacketUdpReceiver.cpp | 170 ++++++++++++++++ jfj-combined/test/test_Watchdog.cpp | 51 +++++ 21 files changed, 1635 insertions(+) create mode 100644 jfj-combined/CMakeLists.txt create mode 100644 jfj-combined/README.md create mode 100644 jfj-combined/include/FrameCache.hpp create mode 100644 jfj-combined/include/FrameStats.hpp create mode 100644 jfj-combined/include/FrameWorker.hpp create mode 100644 jfj-combined/include/PacketBuffer.hpp create mode 100644 jfj-combined/include/PacketUdpReceiver.hpp create mode 100644 jfj-combined/include/Watchdog.hpp create mode 100644 jfj-combined/include/ZmqImagePublisher.hpp create mode 100644 jfj-combined/src/JfjFrameStats.cpp create mode 100644 jfj-combined/src/JfjFrameWorker.cpp create mode 100644 jfj-combined/src/PacketUdpReceiver.cpp create mode 100644 jfj-combined/src/main.cpp create mode 100644 jfj-combined/test/CMakeLists.txt create mode 100644 jfj-combined/test/main.cpp create mode 100644 jfj-combined/test/mock/dummy_detector.json create mode 100644 jfj-combined/test/mock/udp.hpp create mode 100644 jfj-combined/test/test_FrameUdpReceiver.cpp create mode 100644 jfj-combined/test/test_PacketBuffer.cpp create mode 100644 jfj-combined/test/test_PacketUdpReceiver.cpp create mode 100644 jfj-combined/test/test_Watchdog.cpp diff --git a/jfj-combined/CMakeLists.txt b/jfj-combined/CMakeLists.txt new file mode 100644 index 0000000..4932c5d --- /dev/null +++ b/jfj-combined/CMakeLists.txt @@ -0,0 +1,12 @@ +file(GLOB SOURCES src/*.cpp) + +add_library(jfj-combined-lib STATIC ${SOURCES}) +target_include_directories(jfj-combined-lib PUBLIC include/) +target_link_libraries(jfj-combined-lib external core-buffer-lib) + +add_executable(jfj-combined src/main.cpp) +set_target_properties(jfj-combined PROPERTIES OUTPUT_NAME jfj_combined) +target_link_libraries(jfj-combined jfj-combined-lib zmq rt pthread) + +enable_testing() +add_subdirectory(test/) diff --git a/jfj-combined/README.md b/jfj-combined/README.md new file mode 100644 index 0000000..504281f --- /dev/null +++ b/jfj-combined/README.md @@ -0,0 +1,164 @@ +# sf-buffer +sf-buffer is the component that receives the detector data in form of UDP +packages and writes them down to disk to a binary format. In addition, it +sends a copy of the module frame to sf-stream via ZMQ. + +Each sf-buffer process is taking care of a single detector module. The +processes are all independent and do not rely on any external data input +to maximize isolation and possible interactions in our system. + +The main design principle is simplicity and decoupling: + +- No interprocess dependencies/communication. +- No dependencies on external libraries (as much as possible). +- Using POSIX as much as possible. + +We are optimizing for maintainability and long term stability. Performance is +of concern only if the performance criteria are not met. + +## Overview + +![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg) + +sf-buffer is a single threaded application (without counting the ZMQ IO threads) +that does both receiving, assembling, writing and sending in the same thread. + +### UDP receiving + +Each process listens to one udp port. Packets coming to this udp port are +assembled into frames. Frames (either complete or with missing packets) are +passed forward. The number of received packets is saved so we can later +(at image assembly time) determine if the frame is valid or not. At this point +we do no validation. + +We are currently using **recvmmsg** to minimize the number of switches to +kernel mode. + +We expect all packets to come in order or not come at all. Once we see the +package for the next pulse_id we can assume no more packages are coming for +the previous one, and send the assembled frame down the program. + +### File writing + +Files are written to disk in frames - one write to disk per frame. This gives +us a relaxed 10ms interval of 1 MB writes. + +#### File format + +The binary file on disk is just a serialization of multiple +**BufferBinaryFormat** structs: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) + +#pragma pack(push) +#pragma pack(1) +struct BufferBinaryFormat { + const char FORMAT_MARKER = 0xBE; + ModuleFrame meta; + char data[buffer_config::MODULE_N_BYTES]; +}; +#pragma pack(pop) +``` + +![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg) + +Each frame is composed by: + +- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. +- **ModuleFrame** - frame meta used in image assembly phase. +- **Data** - assembled frame from a single module. + +Frames are written one after another to a specific offset in the file. The +offset is calculated based on the pulse_id, so each frame has a specific place +in the file and there is no need to have an index for frame retrieval. + +The offset where a specific pulse_id is written in a file is calculated: + +```c++ +// We save 1000 pulses in each file. +const uint64_t FILE_MOD = 1000 + +// Relative index of pulse_id inside file. +size_t file_base = pulse_id % FILE_MOD; +// Offset in bytes of relative index in file. +size_t file_offset = file_base * sizeof(BufferBinaryFormat); +``` + +We now know where to look for data inside the file, but we still don't know +inside which file to look. For this we need to discuss the folder structure. + +#### Folder structure + +The folder (as well as file) structure is deterministic in the sense that given +a specific pulse_id, we can directly calculate the folder, file, and file +offset where the data is stored. This allows us to have independent writing +and reading from the buffer without building any indexes. + +The binary files written by sf_buffer are saved to: + +[detector_folder]/[module_folder]/[data_folder]/[data_file].bin + +- **detector\_folder** should always be passed as an absolute path. This is the +container that holds all data related to a specific detector. +- **module\_folder** is usually composed like "M00", "M01". It separates data +from different modules of one detector. +- **data\_folder** and **data\_file** are automatically calculated based on the +current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our +index for accessing data. + +![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg) + +```c++ +// FOLDER_MOD = 100000 +int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD; +// FILE_MOD = 1000 +int data_file = (pulse_id % FILE_MOD) * FILE_MOD; +``` + +The data_folder and data_file folders are named as the first pulse_id that +should be stored inside them. + +FOLDER_MOD == 100000 means that each data_folder will contain data for 100000 +pulses, while FILE_MOD == 1000 means that each file inside the data_folder +will contain 1000 pulses. The total number of data_files in each data_folder +will therefore be **FILE\_MOD / FOLDER\_MOD = 100**. + +#### Analyzing the buffer on disk +In **sf-utils** there is a Python module that allows you to read directly the +buffer in order to debug it or to verify the consistency between the HDF5 file +and the received data. + +- VerifyH5DataConsistency.py checks the consistency between the H5 file and +buffer. +- BinaryBufferReader.py reads the buffer and prints meta. The class inside +can also be used in external scripts. + +### ZMQ sending + +A copy of the data written to disk is also send via ZMQ to the sf-stream. This +is used to provide live viewing / processing capabilities. Each module data is +sent separately, and this is later assembled in the sf-stream. + +We use the PUB/SUB mechanism for distributing this data - we cannot control the +rate of the producer, and we would like to avoid distributed image assembly +if possible, so PUSH/PULL does not make sense in this case. + +We provide no guarantees on live data delivery, but in practice the number of +dropped or incomplete frames in currently negligible. + +The protocol is a serialization of the same data structures we use to +write on disk (no need for additional memory operations before sending out +data). It uses a 2 part multipart ZMQ message: + +- The first part is a serialization of the ModuleFrame struct (see above). +- The second part is the data field in the BufferBinaryFormat struct (the frame +data). diff --git a/jfj-combined/include/FrameCache.hpp b/jfj-combined/include/FrameCache.hpp new file mode 100644 index 0000000..263577e --- /dev/null +++ b/jfj-combined/include/FrameCache.hpp @@ -0,0 +1,177 @@ +#ifndef SF_DAQ_FRAME_CACHE_HPP +#define SF_DAQ_FRAME_CACHE_HPP + +#include +#include +#include +#include +#include +#include +#include + +#include "../../core-buffer/include/EpicsFieldTypes.hpp" +#include "../../core-buffer/include/formats.hpp" +#include "Watchdog.hpp" + +#define MAX_FIFO_LENGTH 32 + + +/** Frame Cache + + Similar to a thread-safe RamBuffer that handles concurrency internally via mutexes. + The class operates on in-memory arrays via pointer/reference access. It uses a + linearly increasing pulseID index for cache addressing. The standard placement method + ensures that no data corruption occurs, lines are always flushed before overwrite. + A large-enough buffer should ensure that there is sufficient time to retrieve all + data from all detector modules. + + The cache line is flushed on three occasions: + - A new frame is about to overwrite it (by the frame-worker thread) + - Complete frames are queued for flushing internally (by internal worker) + - Incomplete frames are flushed by a watchdog after a timeout (by watchdog worker) + + NOTE: The class is header-only for future template-refactoring. + TODO: Multiple queue workers + **/ +class FrameCache{ +public: + FrameCache(uint64_t N_CAP, uint64_t modX, uint64_t modY, std::function callback): + m_capacity(N_CAP), m_modX(modX), m_modY(modY), m_mod(modX*modY), m_valid(N_CAP, 0), m_fill(N_CAP, 0), m_lock(N_CAP), + m_buffer(N_CAP, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))), + f_send(callback) { + // Initialize buffer metadata + for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); } + + // Initialize the watchdog + std::function wd_callback = std::bind(&FrameCache::flush_all, this); + m_watchdog = new Watchdog(500, wd_callback); + m_watchdog->Start(); + + // Start drain worker + m_drainer = std::thread(&FrameCache::drain_loop, this); + }; + + + /** Emplace + + Place a recorded frame to it's corresponding module location. + This simultaneously handles buffering, assembly and flushing. + Also handles concurrency (shared and unique mutexes). + + NOTE: Forced flushing is performed by the current thread. + **/ + void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& inc_frame){ + // Cache-line index + const uint64_t idx = pulseID % m_capacity; + + // A new frame is starting + if(inc_frame.meta.pulse_id != m_buffer[idx].meta.id){ + // Unique lock to flush and start a new one + std::unique_lock p_guard(m_lock[idx]); + // Check if condition persists after getting the mutex + if(inc_frame.meta.pulse_id != m_buffer[idx].meta.id){ + start_line(idx, inc_frame.meta); + } + } + + // Shared lock for concurrent PUT operations + std::shared_lock s_guard(m_lock[idx]); + + // Calculate destination pointer and copy data + char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize; + std::memcpy((void*)ptr_dest, (void*)&inc_frame.data, m_blocksize); + m_fill[idx]++; + m_watchdog->Kick(); + + // Queue for draining + if(m_fill[idx]==m_mod-1){ + if(m_fill.size() > MAX_FIFO_LENGTH) { + m_drain_queue.push_back(idx); + } + } + } + + +protected: + /** Flush and start a new line + + Flushes a valid cache line and starts another one from the provided metadata. + NOTE : It does not lock, that must be done externally! **/ + void start_line(uint64_t idx, ModuleFrame& inc_frame){ + // 1. Flush + if(m_valid[idx]){ f_send(m_buffer[idx]); } + + // 2. Init new frame + m_buffer[idx].meta.id = inc_frame.pulse_id; + m_buffer[idx].meta.width = 1024 * m_modX; + m_buffer[idx].meta.height = 512 * m_modY; + m_buffer[idx].meta.dtype = (int)DBF_USHORT; + m_buffer[idx].meta.user_1 = inc_frame.frame_index; + m_buffer[idx].meta.user_2 = inc_frame.daq_rec; + m_buffer[idx].meta.status = true; + m_fill[idx] = 0; + m_valid[idx] = 1; + } + + /** Flush and invalidate a line + + Flushes a valid cache line and invalidates the associated buffer. + NOTE : It does not lock, that must be done externally! **/ + void flush_line(uint64_t idx){ + if(m_valid[idx]){ + f_send(m_buffer[idx]); + m_fill[idx] = 0; + m_valid[idx] = 0; + } + } + + /** Flush all lines in the buffer**/ + void flush_all(){ + for(int64_t idx=0; idx< m_capacity; idx++){ + std::unique_lock p_guard(m_lock[idx]); + flush_line(idx); + } + } + + /** Drain loop + + Flushes queued frames from the cache buffer and invalidates line. + It also locks the frame for the duration of flushing! **/ + void drain_loop(){ + while(true){ + if(!m_drain_queue.empty()){ + uint32_t idx = m_drain_queue.front(); + m_drain_queue.pop_front(); + // Lock and flush the frame + std::unique_lock p_guard(m_lock[idx]); + flush_line(idx); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } + } + } + + /** Variables **/ + const uint64_t m_capacity; + const uint64_t m_modX; + const uint64_t m_modY; + const uint64_t m_mod; + const uint64_t m_blocksize = 1024*512*sizeof(uint16_t); + + /** Flush function **/ + std::function f_send; + + /** Main container and mutex guard **/ + std::vector m_valid; + std::vector m_fill; + std::vector m_lock; + std::vector m_buffer; + + /** Watchdog timer and flush queue **/ + Watchdog *m_watchdog; + std::thread m_drainer; + std::deque m_drain_queue; + +}; + +#endif // SF_DAQ_FRAME_CACHE_HPP diff --git a/jfj-combined/include/FrameStats.hpp b/jfj-combined/include/FrameStats.hpp new file mode 100644 index 0000000..30a4fa2 --- /dev/null +++ b/jfj-combined/include/FrameStats.hpp @@ -0,0 +1,31 @@ +#include +#include + +#include "../../core-buffer/include/formats.hpp" + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class FrameStats { +private: + const std::string detector_name_; + const int module_id_; + size_t stats_time_; + + int frames_counter_; + int n_missed_packets_; + int n_corrupted_frames_; + int n_corrupted_pulse_id_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + FrameStats(const std::string &detector_name, const int module_id, const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jfj-combined/include/FrameWorker.hpp b/jfj-combined/include/FrameWorker.hpp new file mode 100644 index 0000000..48e3c3d --- /dev/null +++ b/jfj-combined/include/FrameWorker.hpp @@ -0,0 +1,48 @@ +#ifndef SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP +#define SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP + +#include +#include +#include + +#include "../../core-buffer/include/formats.hpp" +#include "PacketUdpReceiver.hpp" +#include "PacketBuffer.hpp" +#include "FrameStats.hpp" + +/** JungfrauJoch UDP receiver + + Capture UDP data stream from Jungfrau(Joch) FPGA card. + NOTE: This design will not scale well for higher frame rates... + TODO: Direct copy into FrameCache buffer (saves a memcopy) +**/ +class JfjFrameWorker { + const std::string m_moduleName; + const uint64_t m_moduleID; + + // UDP and statistics interfaces + FrameStats m_moduleStats; + PacketUdpReceiver m_udp_receiver; + + // Buffer and helper structures + bool in_progress = false; + uint64_t m_current_index = 0; + PacketBuffer m_buffer; + + // Buffer processing + inline uint64_t process_packets(BufferBinaryFormat& buffer); + uint64_t get_frame(BufferBinaryFormat& buffer); + + std::function f_push_callback; +public: + JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, + std::function callback); + virtual ~JfjFrameWorker(); + void run(); + + // Copy semantics : OFF + JfjFrameWorker(JfjFrameWorker const &) = delete; + JfjFrameWorker& operator=(JfjFrameWorker const &) = delete; +}; + +#endif //SF_DAQ_BUFFER_JFJ_FRAMEWORKER_HPP diff --git a/jfj-combined/include/PacketBuffer.hpp b/jfj-combined/include/PacketBuffer.hpp new file mode 100644 index 0000000..45feb32 --- /dev/null +++ b/jfj-combined/include/PacketBuffer.hpp @@ -0,0 +1,117 @@ +#ifndef CIRCULAR_BUFFER_TEMPLATE_HPP +#define CIRCULAR_BUFFER_TEMPLATE_HPP + +#include +#include +#include +#include +#include +#if defined(WIN32) || defined(_WIN32) || defined(MINGW32) + #include +#else + #include + #include +#endif // defined + + +/** Linear data buffer (NOT FIFO) + + Simplified data buffer that provides pop and push operations and + bundles the actual container with metadata required by . + It stores the actual data in an accessible C-style array. **/ +template +class PacketBuffer{ +public: + PacketBuffer() { + // Initialize C-structures as expected by + for (int i = 0; i < CAPACITY; i++) { + m_recv_buff_ptr[i].iov_base = (void*) &(m_container[i]); + m_recv_buff_ptr[i].iov_len = sizeof(T); + + m_msgs[i].msg_hdr.msg_iov = &m_recv_buff_ptr[i]; + m_msgs[i].msg_hdr.msg_iovlen = 1; + m_msgs[i].msg_hdr.msg_name = &m_sock_from[i]; + m_msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + }; + // ~PacketBuffer() {}; + + /**Diagnostics**/ + int size() const { return ( idx_write-idx_read ); } + int capacity() const { return m_capacity; } + bool is_full() const { return bool(idx_write >= m_capacity); } + bool is_empty() const { return bool(idx_write <= idx_read); } + + /**Operators**/ + void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer + T& container(){ return m_container; }; // Direct container reference + mmsghdr& msgs(){ return m_msgs; }; + + /**Element access**/ + T& pop_front(); //Destructive read + const T& peek_front(); //Non-destructive read + void push_back(T item); //Write new element to buffer + + /**Fill from UDP receiver (threadsafe)**/ + template + void fill_from(TY& recv){ + std::lock_guard g_guard(m_mutex); + this->idx_write = recv.receive_many(m_msgs, this->capacity()); + // Returns -1 with errno=11 if no data received + if(idx_write==-1){ idx_write = 0; } + this->idx_read = 0; + } + +private: + // Main container + T m_container[CAPACITY]; + const size_t m_capacity = CAPACITY; + /**Guards**/ + std::mutex m_mutex; + /**Read and write index**/ + int idx_write = 0; + int idx_read = 0; + + // C-structures as expected by + mmsghdr m_msgs[CAPACITY]; + iovec m_recv_buff_ptr[CAPACITY]; + sockaddr_in m_sock_from[CAPACITY]; +}; + + +/*********************************************************************/ +/*********************************************************************/ +/*********************************************************************/ + +/** Destructive read + Standard read access to queues (i.e. progress the read pointer). + Throws 'std::length_error' if container is empty. **/ +template +T& PacketBuffer::pop_front(){ + std::lock_guard g_guard(m_mutex); + if(this->is_empty()) { throw std::out_of_range("Attempted to read empty queue!"); } + idx_read++; + return m_container[idx_read-1]; +} + +/** Non-destructive read + Standard, non-destructive read access (does not progress the read pointer). + Throws 'std::length_error' if container is empty. **/ +template +const T& PacketBuffer::peek_front(){ + std::lock_guard g_guard(m_mutex); + if(this->is_empty()) { throw std::out_of_range("Attempted to read empty queue!"); } + return m_container[idx_read]; +} + + +/** Push an element into the end of the buffer**/ +template +void PacketBuffer::push_back(T item){ + std::lock_guard g_guard(m_mutex); + if(this->is_full()) { throw std::out_of_range("Attempted to write a full buffer!"); } + m_container[idx_write] = item; + idx_write++; +} + +#endif // CIRCULAR_BUFFER_TEMPLATE_HPP diff --git a/jfj-combined/include/PacketUdpReceiver.hpp b/jfj-combined/include/PacketUdpReceiver.hpp new file mode 100644 index 0000000..4999ec7 --- /dev/null +++ b/jfj-combined/include/PacketUdpReceiver.hpp @@ -0,0 +1,27 @@ +#ifndef UDPRECEIVER_H +#define UDPRECEIVER_H + +#include +#if defined(WIN32) || defined(_WIN32) || defined(MINGW32) + #include +#else + #include +#endif // defined + +class PacketUdpReceiver { + + int socket_fd_; + +public: + PacketUdpReceiver(); + virtual ~PacketUdpReceiver(); + + bool receive(void* buffer, const size_t buffer_n_bytes); + int receive_many(mmsghdr* msgs, const size_t n_msgs); + + void bind(const uint16_t port); + void disconnect(); +}; + + +#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/jfj-combined/include/Watchdog.hpp b/jfj-combined/include/Watchdog.hpp new file mode 100644 index 0000000..2cbe84b --- /dev/null +++ b/jfj-combined/include/Watchdog.hpp @@ -0,0 +1,74 @@ +#ifndef FRAME_CACHE_HPP +#define FRAME_CACHE_HPP + +#include +#include +#include +#include +#include + + +/** Watchdog timer class + + Unless kicked repeatedly, it periodically calls a user-defined function. + **/ +class Watchdog{ +public: + Watchdog(int64_t timeout, std::function callback): m_timeout(timeout), m_callback(callback) {}; + ~Watchdog() { Stop(); }; + void Start(); + void Stop(); + void Kick(); + +protected: + int64_t m_timeout; + std::atomic m_running = false; + std::function m_callback; + std::chrono::time_point m_lastkick; + + std::thread m_thread; + std::mutex m_mutex; + void Loop(); +}; + + +void Watchdog::Start(){ + std::unique_lock lock(m_mutex); + if(m_running == false){ + m_running = true; + m_lastkick = std::chrono::steady_clock::now(); + m_thread = std::thread(&Watchdog::Loop, this); + } +} + +void Watchdog::Stop(){ + std::unique_lock g_guard(m_mutex); + if(m_running == true){ + m_running = false; + m_thread.join(); + } +} + +void Watchdog::Kick(){ + std::unique_lock g_guard(m_mutex); + m_lastkick = std::chrono::steady_clock::now(); +} + +void Watchdog::Loop(){ + std::cout << "Starting watchdog" << std::endl; + while(m_running){ + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - m_lastkick); + if(elapsed.count() < m_timeout){ + // std::cout << "Elapsed " << (int64_t)elapsed.count() << " of " << m_timeout << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } else { + std::cout << "Expired timer" << std::endl; + m_callback(); + // Infinite re-kick + std::unique_lock g_guard(m_mutex); + m_lastkick = std::chrono::steady_clock::now(); + } + } +} + +#endif // FRAME_CACHE_HPP diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jfj-combined/include/ZmqImagePublisher.hpp new file mode 100644 index 0000000..86f4eeb --- /dev/null +++ b/jfj-combined/include/ZmqImagePublisher.hpp @@ -0,0 +1,140 @@ +#ifndef SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP +#define SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP + +#include +#include +#include +#include +#include + +#include "../../core-buffer/include/formats.hpp" + +typedef enum { + DBF_STRING, + DBF_CHAR, + DBF_UCHAR, + DBF_SHORT, + DBF_USHORT, + DBF_LONG, + DBF_ULONG, + DBF_INT64, + DBF_UINT64, + DBF_FLOAT, + DBF_DOUBLE, + DBF_ENUM, + DBF_MENU, + DBF_DEVICE, + DBF_INLINK, + DBF_OUTLINK, + DBF_FWDLINK, + DBF_NOACCESS +}dbfType; + + +#define ASSERT_FALSE(expr, msg) \ + if(bool(expr)) { \ + std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \ + text = text + "Reason: " + std::to_string(expr) + "\n"; \ + text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \ + throw std::runtime_error(text); \ + } \ + +#define ASSERT_TRUE(expr, msg) \ + if(!bool(expr)) { \ + std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \ + text = text + "Reason: " + std::to_string(expr) + "\n"; \ + text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \ + throw std::runtime_error(text); \ + } + +/** ZMQ Publisher + + Lightweight wrapper base class to initialize a ZMQ Publisher. + Nothing data specific, but everything is only 'protected'. + It also has an internal mutex that can be used for thread-safe + access to the underlying connection; +**/ +class ZmqPublisher { + protected: + const uint16_t m_port; + std::string m_address; + zmq::context_t m_ctx; + zmq::socket_t m_socket; + std::mutex g_zmq_socket; + + public: + ZmqPublisher(std::string ip, uint16_t port, uint32_t n_threads) : + m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(n_threads), m_socket(m_ctx, ZMQ_PUB) { + // Bind the socket + m_socket.bind(m_address.c_str()); + std::cout << "Initialized ZMQ publisher at " << m_address << std::endl; + }; + + ~ZmqPublisher(){}; +}; + + +/** ZMQ Image Publisher + + Specialized publisher to send 'ImageBinaryFormat' data format as + multipart message. It also takes care of thread safety. + + NOTE: This method implements a single publisher! The receiver should take care of load balancing and redistributing. +**/ +class ZmqImagePublisher: public ZmqPublisher { + public: + ZmqImagePublisher(std::string ip, uint16_t port, uint32_t n_threads) : ZmqPublisher(ip, port, n_threads) {}; + const std::string topic = "IMAGEDATA"; + + std::string serializer(const ImageMetadata& meta){ + rapidjson::Document header(rapidjson::kObjectType); + auto& header_alloc = header.GetAllocator(); + + // Fill the RapidJSON header with metadata + header.AddMember("version", meta.version, header_alloc); + header.AddMember("id", meta.id, header_alloc); + header.AddMember("height", meta.height, header_alloc); + header.AddMember("width", meta.width, header_alloc); + header.AddMember("dtype", meta.dtype, header_alloc); + header.AddMember("encoding", meta.encoding, header_alloc); + header.AddMember("array_id", meta.array_id, header_alloc); + header.AddMember("status", meta.status, header_alloc); + header.AddMember("user_1", meta.user_1, header_alloc); + header.AddMember("user_2", meta.user_2, header_alloc); + // Set image shape + auto shape_value = rapidjson::Value(rapidjson::kArrayType); + shape_value.PushBack((uint64_t)meta.height, header_alloc); + shape_value.PushBack((uint64_t)meta.width, header_alloc); + header.AddMember("shape", shape_value, header_alloc); + + // Serialize header + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + std::string text_header = buffer.GetString(); + return text_header; + } + + void sendImage(ImageBinaryFormat& image){ + auto meta_str = serializer(image.meta); + std::cout << "Metadata JSON file:\n" << meta_str << std::endl; + + std::lock_guard guard(g_zmq_socket); + int len; + + len = m_socket.send(topic.c_str(), topic.size(), ZMQ_SNDMORE); + ASSERT_TRUE( len >=0, "Failed to send topic data" ) + len = m_socket.send(meta_str.c_str(), meta_str.length(), ZMQ_SNDMORE); + ASSERT_TRUE( len >=0, "Failed to send meta data" ) + len = m_socket.send(image.data.data(), image.data.size(), 0); + ASSERT_TRUE( len >=0, "Failed to send image data" ) + + if(image.meta.id%100==0){ + std::cout << "Sent ZMQ stream of pulse: " << image.meta.id << std::endl; + } + } +}; + + +#endif //SF_DAQ_BUFFER_ZMQ_IMAGE_PUBLISHER_HPP diff --git a/jfj-combined/src/JfjFrameStats.cpp b/jfj-combined/src/JfjFrameStats.cpp new file mode 100644 index 0000000..4df8b82 --- /dev/null +++ b/jfj-combined/src/JfjFrameStats.cpp @@ -0,0 +1,68 @@ +#include +#include +#include "FrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats( + const std::string &detector_name, + const int module_id, + const size_t stats_time) : + detector_name_(detector_name), + module_id_(module_id), + stats_time_(stats_time) +{ + reset_counters(); +} + +void FrameStats::reset_counters(){ + frames_counter_ = 0; + n_missed_packets_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id){ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { + n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast(steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats(){ + auto interval_ms_duration = duration_cast(steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast(system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + stringstream ss; + ss << "jf_udp_recv"; + ss << ",detector_name=" << detector_name_; + ss << ",module_name=M" << module_id_; + ss << " "; + ss << "n_missed_packets=" << n_missed_packets_ << "i"; + ss << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + ss << ",repetition_rate=" << rep_rate << "i"; + ss << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + ss << " "; + ss << timestamp; + ss << std::endl; + std::cout << ss.str(); +} diff --git a/jfj-combined/src/JfjFrameWorker.cpp b/jfj-combined/src/JfjFrameWorker.cpp new file mode 100644 index 0000000..703d900 --- /dev/null +++ b/jfj-combined/src/JfjFrameWorker.cpp @@ -0,0 +1,107 @@ +#include + +#include "FrameWorker.hpp" + + +JfjFrameWorker::JfjFrameWorker(const uint16_t port, std::string moduleName, const uint32_t moduleID, std::function callback): + m_moduleName(moduleName), m_moduleID(moduleID), m_moduleStats(moduleName, moduleID, 10.0), f_push_callback(callback) { + m_udp_receiver.bind(port); +} + + +JfjFrameWorker::~JfjFrameWorker() { + m_udp_receiver.disconnect(); +} + +/** Process Packets + + Drains the buffer either until it's empty or the current frame is finished. + Has some optimizations and safety checks before segfaulting right away... + TODO: Direct memcopy into FrameCache for more speed! **/ +inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ + + while(!m_buffer.is_empty()){ + // Happens if the last packet from the previous frame gets lost. + if (m_current_index != m_buffer.peek_front().bunchid) { + m_current_index = m_buffer.peek_front().bunchid; + if(this->in_progress){ + this->in_progress = false; + return buffer.meta.pulse_id; + } + } + + // Otherwise pop the queue (and set current frame index) + jfjoch_packet_t& c_packet = m_buffer.pop_front(); + + // Sanity check: rather throw than segfault... + if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME) { + std::stringstream ss; + ss << "Packet index '" << c_packet.packetnum << "' is out of range of " << JF_N_PACKETS_PER_FRAME << std::endl; + throw std::range_error(ss.str()); + } + + // Start new frame + if(!this->in_progress) { + m_current_index = c_packet.bunchid; + this->in_progress = true; + + // Always copy metadata (otherwise problem when 0th packet gets lost) + buffer.meta.pulse_id = c_packet.bunchid; + buffer.meta.frame_index = c_packet.framenum; + buffer.meta.daq_rec = c_packet.debug; + buffer.meta.module_id = m_moduleID; + } + + // Copy data to frame buffer + size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum; + std::memcpy( (void*) (buffer.data + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET); + buffer.meta.n_recv_packets++; + + // Last frame packet received. Frame finished. + if (c_packet.packetnum == JF_N_PACKETS_PER_FRAME - 1) { + this->in_progress = false; + return buffer.meta.pulse_id; + } + } + + // We emptied the buffer. + return 0; +} + +uint64_t JfjFrameWorker::get_frame(BufferBinaryFormat& buffer){ + // Reset the metadata and frame buffer for the next frame + std::memset(&buffer, 0, sizeof(buffer)); + uint64_t pulse_id = 0; + + // Hehehehe... do-while loop! + do { + // First make sure the buffer is drained of leftovers + pulse_id = process_packets(buffer); + if (pulse_id != 0) { return pulse_id; } + + // Then try to refill buffer... + m_buffer.fill_from(m_udp_receiver); + } while (true); +} + + +void JfjFrameWorker::run(){ + std::cout << "Running worker loop" << std::endl; + // Might be better creating a structure for double buffering + BufferBinaryFormat buffer; + + try{ + while (true) { + // NOTE: Needs to be pipelined for really high frame rates + auto pulse_id = get_frame(buffer); + m_moduleStats.record_stats(buffer.meta, true); + + if(pulse_id>10) { + f_push_callback(pulse_id, m_moduleID, buffer); + } + } + } catch (const std::exception& ex) { + std::cout << "Exception in worker loop: " << ex.what() << std::endl; + throw; + }; +} diff --git a/jfj-combined/src/PacketUdpReceiver.cpp b/jfj-combined/src/PacketUdpReceiver.cpp new file mode 100644 index 0000000..6483fab --- /dev/null +++ b/jfj-combined/src/PacketUdpReceiver.cpp @@ -0,0 +1,66 @@ +#include +#include +#include +#include + +#include "PacketUdpReceiver.hpp" +#include "../../core-buffer/include/buffer_config.hpp" + +using namespace std; +using namespace buffer_config; + +PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { } + +PacketUdpReceiver::~PacketUdpReceiver() { + disconnect(); +} + +void PacketUdpReceiver::bind(const uint16_t port){ + if (socket_fd_ > -1) { + throw runtime_error("Socket already bound."); + } + + socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); + if (socket_fd_ < 0) { + throw runtime_error("Cannot open socket."); + } + + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(port); + + timeval udp_socket_timeout; + udp_socket_timeout.tv_sec = 0; + udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) { + throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno))); + } + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { + throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno))); + }; + //TODO: try to set SO_RCVLOWAT + + auto bind_result = ::bind(socket_fd_, reinterpret_cast(&server_address), sizeof(server_address)); + + if (bind_result < 0) { + throw runtime_error("Cannot bind socket."); + } +} + +int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){ + return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); +} + +bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){ + auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); + + return (data_len == buffer_n_bytes) ? true : false; +} + +void PacketUdpReceiver::disconnect(){ + close(socket_fd_); + socket_fd_ = -1; +} diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp new file mode 100644 index 0000000..e258723 --- /dev/null +++ b/jfj-combined/src/main.cpp @@ -0,0 +1,60 @@ +#include +#include +#include +#include + +#include "BufferUtils.hpp" +#include "formats.hpp" +#include "../include/FrameCache.hpp" +#include "../include/FrameWorker.hpp" +#include "../include/ZmqImagePublisher.hpp" + + +int main (int argc, char *argv[]) { + if (argc != 2) { + std::cout << "\nUsage: jf_buffer_writer [detector_json_filename]\n"; + std::cout << "\tdetector_json_filename: detector config file path." << std::endl; + std::cout << "\tZMQ publisher port: 5200 (high data rate)" << std::endl; + exit(-1); + } + const auto config = BufferUtils::read_json_config(std::string(argv[1])); + + + std::cout << "Creating ZMQ sockets..." << std::endl; + ZmqImagePublisher pub("*", 5200, 2); + // ... and extracting sender function + std::function zmq_publish = + std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1); + + + std::cout << "Creating frame cache..." << std::endl; + FrameCache cache(128, 1, 3, zmq_publish); + // ... and extracting push function + std::function push_cb = + std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + + + std::cout << "Creating frame workers..." << std::endl; + std::vector> vWorkers; + for(int mm=0; mm(config.start_udp_port+mm, moduleName, mm, push_cb) ); + } + + + std::cout << "Starting frame worker threads..." << std::endl; + std::vector vThreads; + for(int mm=0; mm +#include +#include "gtest/gtest.h" +#include "JfjFrameUdpReceiver.hpp" +#include "mock/udp.hpp" + +#include +#include +#include + +using namespace std; +#define NUM_TEST_MODULES 3 + + +TEST(BufferUdpReceiver, simple_recv){ + int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(NUM_TEST_MODULES*JFJOCH_DATA_BYTES_PER_MODULE); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + ASSERT_EQ(metadata.n_recv_packets, n_packets); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_middle_packet){ + int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_first_packet){ + auto n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -2 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_last_packet){ + int n_packets = NUM_TEST_MODULES * JFJOCH_N_PACKETS_PER_MODULE; + int n_frames = 4; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + JfjFrameUdpReceiver udp_receiver(udp_port, NUM_TEST_MODULES); + + auto handle = async(launch::async, [&](){ + for (int64_t i_frame=0; i_frame < n_frames+1; i_frame++){ + for (size_t i_packet=0; i_packet(NUM_TEST_MODULES * JFJOCH_DATA_BYTES_PER_MODULE); + + // n_frames -1 because the last frame is not complete. + for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp(metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_recv_packets, n_packets - 1); + } + + ::close(send_socket_fd); +} diff --git a/jfj-combined/test/test_PacketBuffer.cpp b/jfj-combined/test/test_PacketBuffer.cpp new file mode 100644 index 0000000..7b602a8 --- /dev/null +++ b/jfj-combined/test/test_PacketBuffer.cpp @@ -0,0 +1,76 @@ +#include +#include +#include "gtest/gtest.h" +#include "PacketBuffer.hpp" + +#include +#include +#include + +using namespace std; + + + +std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) { + os << "Frame number: " << packet.framenum << std::endl; + os << "Packet number: " << packet.packetnum << std::endl; + os << "Bunch id: " << packet.bunchid << std::endl; + os << std::endl; + return os; +} + + + + +class MockReceiver{ + public: + uint64_t idx_packet = 42000; + uint64_t packet_per_frame = 512; + uint64_t num_bunches = 100; + uint64_t num_packets =50; + jfjoch_packet_t tmp; + + uint64_t receive_many(mmsghdr* msgs, const size_t n_msgs){ + // Receive 'num_packets numner of packets' + for(int ii=0; iiiov_base, &tmp, sizeof(tmp)); + idx_packet++; + } + return num_packets; + }; +}; + + + +TEST(BufferUdpReceiver, packetbuffer_simple){ + + PacketBuffer p_buffer; + MockReceiver mockery; + uint64_t prev_bunch, prev_packet; + jfjoch_packet_t p_pop; + + mockery.idx_packet = 7*512 + 13; + mockery.num_packets = 25; + + p_buffer.fill_from(mockery); + + // First packet + ASSERT_FALSE(p_buffer.is_empty()); + ASSERT_EQ(p_buffer.size(), 25); + + ASSERT_EQ(p_buffer.peek_front().bunchid, 1007); + ASSERT_EQ(p_buffer.peek_front().packetnum, 13); + prev_bunch = p_buffer.peek_front().bunchid; + prev_packet = p_buffer.peek_front().packetnum; + + p_pop = p_buffer.pop_front(); + ASSERT_EQ(p_buffer.size(), 24); + + ASSERT_EQ(p_pop.bunchid, prev_bunch); + ASSERT_EQ(p_pop.packetnum, prev_packet); + ASSERT_EQ(p_buffer.peek_front().bunchid, prev_bunch); + ASSERT_EQ(p_buffer.peek_front().packetnum, prev_packet+1); +}; diff --git a/jfj-combined/test/test_PacketUdpReceiver.cpp b/jfj-combined/test/test_PacketUdpReceiver.cpp new file mode 100644 index 0000000..1be343a --- /dev/null +++ b/jfj-combined/test/test_PacketUdpReceiver.cpp @@ -0,0 +1,170 @@ +#include +#include +#include "gtest/gtest.h" +#include "mock/udp.hpp" +#include "PacketUdpReceiver.hpp" + +#include +#include + +using namespace std; + +TEST(PacketUdpReceiver, simple_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + send_udp_buffer.packetnum = 91; + send_udp_buffer.framenum = 92; + send_udp_buffer.bunchid = 93; + send_udp_buffer.debug = 94; + + auto server_address = get_server_address(udp_port); + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(100)); + + jungfrau_packet recv_udp_buffer; + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum); + EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum); + EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid); + EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(PacketUdpReceiver, false_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + jungfrau_packet recv_udp_buffer; + + auto server_address = get_server_address(udp_port); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(PacketUdpReceiver, receive_many) +{ + auto n_msg_buffer = JF_N_PACKETS_PER_FRAME; + jungfrau_packet recv_buffer[n_msg_buffer]; + iovec recv_buff_ptr[n_msg_buffer]; + struct mmsghdr msgs[n_msg_buffer]; + struct sockaddr_in sockFrom[n_msg_buffer]; + + for (int i = 0; i < n_msg_buffer; i++) { + recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]); + recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); + + msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; + msgs[i].msg_hdr.msg_iovlen = 1; + msgs[i].msg_hdr.msg_name = &sockFrom[i]; + msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + PacketUdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + + auto server_address = get_server_address(udp_port); + + send_udp_buffer.bunchid = 0; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + send_udp_buffer.bunchid = 1; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(10)); + + auto n_msgs = udp_receiver.receive_many(msgs, JF_N_PACKETS_PER_FRAME); + ASSERT_EQ(n_msgs, 2); + + for (size_t i=0;i +#include +#include + +using namespace std; + + +uint64_t tick_counter = 0; +// Dummy callback to increase tick counter +void mock_callback(){ + tick_counter++; +}; + + +TEST(WatchdogTimer, timer_test){ + Watchdog wDog(100, &mock_callback); + + // Free running + wDog.Start(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ASSERT_EQ(tick_counter, 0); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 5); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 10); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 15); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 20); + + // Test Stop() + wDog.Stop(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(tick_counter, 20); + + // Test Kick() + tick_counter = 0; + wDog.Start(); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + ASSERT_EQ(tick_counter, 2); + for(int ii=0; ii<20 ii++){ + wDog.Kick(); + ASSERT_EQ(tick_counter, 2); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + ASSERT_EQ(tick_counter, 4); +}