From 5be091eee842bd5bb6043a8bbcd8901300028fb1 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Tue, 1 Jun 2021 12:11:15 +0200 Subject: [PATCH] Almost compiles but needs more metadata --- jfj-udp-recv/CMakeLists.txt | 30 +- jfj-udp-recv/README.md | 328 +++++++++---------- jfj-udp-recv/include/JfjFrameStats.hpp | 59 ++-- jfj-udp-recv/include/JfjFrameUdpReceiver.hpp | 7 +- jfj-udp-recv/include/PacketBuffer.hpp | 12 +- jfj-udp-recv/include/PacketUdpReceiver.hpp | 44 +-- jfj-udp-recv/mockmain.cpp | 15 - jfj-udp-recv/src/JfjFrameStats.cpp | 125 +++---- jfj-udp-recv/src/JfjFrameUdpReceiver.cpp | 30 +- jfj-udp-recv/src/main.cpp | 33 +- 10 files changed, 317 insertions(+), 366 deletions(-) delete mode 100644 jfj-udp-recv/mockmain.cpp diff --git a/jfj-udp-recv/CMakeLists.txt b/jfj-udp-recv/CMakeLists.txt index 3c83127..7380471 100644 --- a/jfj-udp-recv/CMakeLists.txt +++ b/jfj-udp-recv/CMakeLists.txt @@ -1,18 +1,12 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(jf-udp-recv-lib STATIC ${SOURCES}) -target_include_directories(jf-udp-recv-lib PUBLIC include/) -target_link_libraries(jf-udp-recv-lib - external - core-buffer-lib) - -add_executable(jf-udp-recv src/main.cpp) -set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) -target_link_libraries(jf-udp-recv - jf-udp-recv-lib - zmq - rt) - -enable_testing() -add_subdirectory(test/) +file(GLOB SOURCES src/*.cpp) + +add_library(jfj-udp-recv-lib STATIC ${SOURCES}) +target_include_directories(jfj-udp-recv-lib PUBLIC include/) +target_link_libraries(jfj-udp-recv-lib external core-buffer-lib) + +add_executable(jfj-udp-recv src/main.cpp) +set_target_properties(jfj-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv) +target_link_libraries(jfj-udp-recv jfj-udp-recv-lib zmq rt) + +enable_testing() +# add_subdirectory(test/) diff --git a/jfj-udp-recv/README.md b/jfj-udp-recv/README.md index 504281f..b51f999 100644 --- a/jfj-udp-recv/README.md +++ b/jfj-udp-recv/README.md @@ -1,164 +1,164 @@ -# sf-buffer -sf-buffer is the component that receives the detector data in form of UDP -packages and writes them down to disk to a binary format. In addition, it -sends a copy of the module frame to sf-stream via ZMQ. - -Each sf-buffer process is taking care of a single detector module. The -processes are all independent and do not rely on any external data input -to maximize isolation and possible interactions in our system. - -The main design principle is simplicity and decoupling: - -- No interprocess dependencies/communication. -- No dependencies on external libraries (as much as possible). -- Using POSIX as much as possible. - -We are optimizing for maintainability and long term stability. Performance is -of concern only if the performance criteria are not met. - -## Overview - -![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg) - -sf-buffer is a single threaded application (without counting the ZMQ IO threads) -that does both receiving, assembling, writing and sending in the same thread. - -### UDP receiving - -Each process listens to one udp port. Packets coming to this udp port are -assembled into frames. Frames (either complete or with missing packets) are -passed forward. The number of received packets is saved so we can later -(at image assembly time) determine if the frame is valid or not. At this point -we do no validation. - -We are currently using **recvmmsg** to minimize the number of switches to -kernel mode. - -We expect all packets to come in order or not come at all. Once we see the -package for the next pulse_id we can assume no more packages are coming for -the previous one, and send the assembled frame down the program. - -### File writing - -Files are written to disk in frames - one write to disk per frame. This gives -us a relaxed 10ms interval of 1 MB writes. - -#### File format - -The binary file on disk is just a serialization of multiple -**BufferBinaryFormat** structs: -```c++ -#pragma pack(push) -#pragma pack(1) -struct ModuleFrame { - uint64_t pulse_id; - uint64_t frame_index; - uint64_t daq_rec; - uint64_t n_recv_packets; - uint64_t module_id; -}; -#pragma pack(pop) - -#pragma pack(push) -#pragma pack(1) -struct BufferBinaryFormat { - const char FORMAT_MARKER = 0xBE; - ModuleFrame meta; - char data[buffer_config::MODULE_N_BYTES]; -}; -#pragma pack(pop) -``` - -![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg) - -Each frame is composed by: - -- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. -- **ModuleFrame** - frame meta used in image assembly phase. -- **Data** - assembled frame from a single module. - -Frames are written one after another to a specific offset in the file. The -offset is calculated based on the pulse_id, so each frame has a specific place -in the file and there is no need to have an index for frame retrieval. - -The offset where a specific pulse_id is written in a file is calculated: - -```c++ -// We save 1000 pulses in each file. -const uint64_t FILE_MOD = 1000 - -// Relative index of pulse_id inside file. -size_t file_base = pulse_id % FILE_MOD; -// Offset in bytes of relative index in file. -size_t file_offset = file_base * sizeof(BufferBinaryFormat); -``` - -We now know where to look for data inside the file, but we still don't know -inside which file to look. For this we need to discuss the folder structure. - -#### Folder structure - -The folder (as well as file) structure is deterministic in the sense that given -a specific pulse_id, we can directly calculate the folder, file, and file -offset where the data is stored. This allows us to have independent writing -and reading from the buffer without building any indexes. - -The binary files written by sf_buffer are saved to: - -[detector_folder]/[module_folder]/[data_folder]/[data_file].bin - -- **detector\_folder** should always be passed as an absolute path. This is the -container that holds all data related to a specific detector. -- **module\_folder** is usually composed like "M00", "M01". It separates data -from different modules of one detector. -- **data\_folder** and **data\_file** are automatically calculated based on the -current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our -index for accessing data. - -![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg) - -```c++ -// FOLDER_MOD = 100000 -int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD; -// FILE_MOD = 1000 -int data_file = (pulse_id % FILE_MOD) * FILE_MOD; -``` - -The data_folder and data_file folders are named as the first pulse_id that -should be stored inside them. - -FOLDER_MOD == 100000 means that each data_folder will contain data for 100000 -pulses, while FILE_MOD == 1000 means that each file inside the data_folder -will contain 1000 pulses. The total number of data_files in each data_folder -will therefore be **FILE\_MOD / FOLDER\_MOD = 100**. - -#### Analyzing the buffer on disk -In **sf-utils** there is a Python module that allows you to read directly the -buffer in order to debug it or to verify the consistency between the HDF5 file -and the received data. - -- VerifyH5DataConsistency.py checks the consistency between the H5 file and -buffer. -- BinaryBufferReader.py reads the buffer and prints meta. The class inside -can also be used in external scripts. - -### ZMQ sending - -A copy of the data written to disk is also send via ZMQ to the sf-stream. This -is used to provide live viewing / processing capabilities. Each module data is -sent separately, and this is later assembled in the sf-stream. - -We use the PUB/SUB mechanism for distributing this data - we cannot control the -rate of the producer, and we would like to avoid distributed image assembly -if possible, so PUSH/PULL does not make sense in this case. - -We provide no guarantees on live data delivery, but in practice the number of -dropped or incomplete frames in currently negligible. - -The protocol is a serialization of the same data structures we use to -write on disk (no need for additional memory operations before sending out -data). It uses a 2 part multipart ZMQ message: - -- The first part is a serialization of the ModuleFrame struct (see above). -- The second part is the data field in the BufferBinaryFormat struct (the frame -data). +# sf-buffer +sf-buffer is the component that receives the detector data in form of UDP +packages and writes them down to disk to a binary format. In addition, it +sends a copy of the module frame to sf-stream via ZMQ. + +Each sf-buffer process is taking care of a single detector module. The +processes are all independent and do not rely on any external data input +to maximize isolation and possible interactions in our system. + +The main design principle is simplicity and decoupling: + +- No interprocess dependencies/communication. +- No dependencies on external libraries (as much as possible). +- Using POSIX as much as possible. + +We are optimizing for maintainability and long term stability. Performance is +of concern only if the performance criteria are not met. + +## Overview + +![image_buffer_overview](../docs/sf_daq_buffer-overview-buffer.jpg) + +sf-buffer is a single threaded application (without counting the ZMQ IO threads) +that does both receiving, assembling, writing and sending in the same thread. + +### UDP receiving + +Each process listens to one udp port. Packets coming to this udp port are +assembled into frames. Frames (either complete or with missing packets) are +passed forward. The number of received packets is saved so we can later +(at image assembly time) determine if the frame is valid or not. At this point +we do no validation. + +We are currently using **recvmmsg** to minimize the number of switches to +kernel mode. + +We expect all packets to come in order or not come at all. Once we see the +package for the next pulse_id we can assume no more packages are coming for +the previous one, and send the assembled frame down the program. + +### File writing + +Files are written to disk in frames - one write to disk per frame. This gives +us a relaxed 10ms interval of 1 MB writes. + +#### File format + +The binary file on disk is just a serialization of multiple +**BufferBinaryFormat** structs: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) + +#pragma pack(push) +#pragma pack(1) +struct BufferBinaryFormat { + const char FORMAT_MARKER = 0xBE; + ModuleFrame meta; + char data[buffer_config::MODULE_N_BYTES]; +}; +#pragma pack(pop) +``` + +![file_layout_image](../docs/sf_daq_buffer-FileLayout.jpg) + +Each frame is composed by: + +- **FORMAT\_MARKER** (0xBE) - a control byte to determine the validity of the frame. +- **ModuleFrame** - frame meta used in image assembly phase. +- **Data** - assembled frame from a single module. + +Frames are written one after another to a specific offset in the file. The +offset is calculated based on the pulse_id, so each frame has a specific place +in the file and there is no need to have an index for frame retrieval. + +The offset where a specific pulse_id is written in a file is calculated: + +```c++ +// We save 1000 pulses in each file. +const uint64_t FILE_MOD = 1000 + +// Relative index of pulse_id inside file. +size_t file_base = pulse_id % FILE_MOD; +// Offset in bytes of relative index in file. +size_t file_offset = file_base * sizeof(BufferBinaryFormat); +``` + +We now know where to look for data inside the file, but we still don't know +inside which file to look. For this we need to discuss the folder structure. + +#### Folder structure + +The folder (as well as file) structure is deterministic in the sense that given +a specific pulse_id, we can directly calculate the folder, file, and file +offset where the data is stored. This allows us to have independent writing +and reading from the buffer without building any indexes. + +The binary files written by sf_buffer are saved to: + +[detector_folder]/[module_folder]/[data_folder]/[data_file].bin + +- **detector\_folder** should always be passed as an absolute path. This is the +container that holds all data related to a specific detector. +- **module\_folder** is usually composed like "M00", "M01". It separates data +from different modules of one detector. +- **data\_folder** and **data\_file** are automatically calculated based on the +current pulse_id, FOLDER_MOD and FILE_MOD attributes. This folders act as our +index for accessing data. + +![folder_layout_image](../docs/sf_daq_buffer-FolderLayout.jpg) + +```c++ +// FOLDER_MOD = 100000 +int data_folder = (pulse_id % FOLDER_MOD) * FOLDER_MOD; +// FILE_MOD = 1000 +int data_file = (pulse_id % FILE_MOD) * FILE_MOD; +``` + +The data_folder and data_file folders are named as the first pulse_id that +should be stored inside them. + +FOLDER_MOD == 100000 means that each data_folder will contain data for 100000 +pulses, while FILE_MOD == 1000 means that each file inside the data_folder +will contain 1000 pulses. The total number of data_files in each data_folder +will therefore be **FILE\_MOD / FOLDER\_MOD = 100**. + +#### Analyzing the buffer on disk +In **sf-utils** there is a Python module that allows you to read directly the +buffer in order to debug it or to verify the consistency between the HDF5 file +and the received data. + +- VerifyH5DataConsistency.py checks the consistency between the H5 file and +buffer. +- BinaryBufferReader.py reads the buffer and prints meta. The class inside +can also be used in external scripts. + +### ZMQ sending + +A copy of the data written to disk is also send via ZMQ to the sf-stream. This +is used to provide live viewing / processing capabilities. Each module data is +sent separately, and this is later assembled in the sf-stream. + +We use the PUB/SUB mechanism for distributing this data - we cannot control the +rate of the producer, and we would like to avoid distributed image assembly +if possible, so PUSH/PULL does not make sense in this case. + +We provide no guarantees on live data delivery, but in practice the number of +dropped or incomplete frames in currently negligible. + +The protocol is a serialization of the same data structures we use to +write on disk (no need for additional memory operations before sending out +data). It uses a 2 part multipart ZMQ message: + +- The first part is a serialization of the ModuleFrame struct (see above). +- The second part is the data field in the BufferBinaryFormat struct (the frame +data). diff --git a/jfj-udp-recv/include/JfjFrameStats.hpp b/jfj-udp-recv/include/JfjFrameStats.hpp index 7839a38..7d53b6c 100644 --- a/jfj-udp-recv/include/JfjFrameStats.hpp +++ b/jfj-udp-recv/include/JfjFrameStats.hpp @@ -1,31 +1,28 @@ -#include -#include -#include - -#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP -#define SF_DAQ_BUFFER_FRAMESTATS_HPP - - -class FrameStats { - const std::string detector_name_; - const int module_id_; - size_t stats_time_; - - int frames_counter_; - int n_missed_packets_; - int n_corrupted_frames_; - int n_corrupted_pulse_id_; - std::chrono::time_point stats_interval_start_; - - void reset_counters(); - void print_stats(); - -public: - FrameStats(const std::string &detector_name, - const int module_id, - const size_t stats_time); - void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); -}; - - -#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP + + +class FrameStats { + const std::string detector_name_; + size_t stats_time_; + + int frames_counter_; + int n_missed_packets_; + int n_corrupted_frames_; + int n_corrupted_pulse_id_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + FrameStats(const std::string &detector_name, const size_t stats_time); + void record_stats(const ImageMetadata &meta, const bool bad_pulse_id); +}; + + +#endif //SF_DAQ_BUFFER_JFJ_FRAMESTATS_HPP diff --git a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp index d5c5ddf..fc9a284 100644 --- a/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp +++ b/jfj-udp-recv/include/JfjFrameUdpReceiver.hpp @@ -6,7 +6,7 @@ #include "formats.hpp" #include "buffer_config.hpp" #include "PacketBuffer.hpp" - +#include "jungfraujoch.hpp" /** JungfrauJoch UDP receiver @@ -14,11 +14,12 @@ NOTE: This design will not scale well for higher frame rates... **/ class JfjFrameUdpReceiver { - PacketUdpReceiver udp_receiver_; + PacketUdpReceiver m_udp_receiver; + uint64_t m_frame_index; PacketBuffer m_buffer; - inline void init_frame(ImageMetadata& frame_metadata, jfjoch_packet_t& c_packet); + inline void init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet); inline uint64_t process_packets(ImageMetadata& metadata, char* frame_buffer); public: diff --git a/jfj-udp-recv/include/PacketBuffer.hpp b/jfj-udp-recv/include/PacketBuffer.hpp index 46958e6..b2be33b 100644 --- a/jfj-udp-recv/include/PacketBuffer.hpp +++ b/jfj-udp-recv/include/PacketBuffer.hpp @@ -34,8 +34,8 @@ public: /**Diagnostics**/ size_t size() const { return ( idx_write-idx_read ); } size_t capacity() const { return m_capacity; } - bool is_full() const { return (idx_write >= m_capacity); } - bool is_empty() const { return (idx_write <= idx_read); } + bool is_full() const { return bool(idx_write >= m_capacity); } + bool is_empty() const { return bool(idx_write <= idx_read); } /**Operators**/ void reset(){ idx_write = 0; idx_read = 0; }; // Reset the buffer @@ -43,15 +43,15 @@ public: mmsghdr& msgs(){ return m_msgs; }; /**Element access**/ - const T& pop_front(); //Destructive read + T& pop_front(); //Destructive read const T& peek_front(); //Non-destructive read void push_back(T item); //Write new element to buffer /**Fill from UDP receiver**/ template - void fill_fom(TY& recv){ + void fill_from(TY& recv){ std::lock_guard g_guard(m_mutex); - this->idx_write = recv.receive_many(this->msgs(), this->capacity()); + this->idx_write = recv.receive_many(m_msgs, this->capacity()); this->idx_read = 0; } @@ -80,7 +80,7 @@ private: Standard read access to queues (i.e. progress the read pointer). Throws 'std::length_error' if container is empty. **/ template -const T& PacketBuffer::pop_front(){ +T& PacketBuffer::pop_front(){ std::lock_guard g_guard(m_mutex); if(this->is_empty()){ throw std::out_of_range("Attempted to read empty queue!"); } idx_read++; diff --git a/jfj-udp-recv/include/PacketUdpReceiver.hpp b/jfj-udp-recv/include/PacketUdpReceiver.hpp index da92d85..cc7e093 100644 --- a/jfj-udp-recv/include/PacketUdpReceiver.hpp +++ b/jfj-udp-recv/include/PacketUdpReceiver.hpp @@ -1,22 +1,22 @@ -#ifndef UDPRECEIVER_H -#define UDPRECEIVER_H - -#include - -class PacketUdpReceiver { - - int socket_fd_; - -public: - PacketUdpReceiver(); - virtual ~PacketUdpReceiver(); - - bool receive(void* buffer, const size_t buffer_n_bytes); - int receive_many(mmsghdr* msgs, const size_t n_msgs); - - void bind(const uint16_t port); - void disconnect(); -}; - - -#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H +#ifndef UDPRECEIVER_H +#define UDPRECEIVER_H + +#include + +class PacketUdpReceiver { + + int socket_fd_; + +public: + PacketUdpReceiver(); + virtual ~PacketUdpReceiver(); + + bool receive(void* buffer, const size_t buffer_n_bytes); + int receive_many(mmsghdr* msgs, const size_t n_msgs); + + void bind(const uint16_t port); + void disconnect(); +}; + + +#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/jfj-udp-recv/mockmain.cpp b/jfj-udp-recv/mockmain.cpp deleted file mode 100644 index 1d33d2b..0000000 --- a/jfj-udp-recv/mockmain.cpp +++ /dev/null @@ -1,15 +0,0 @@ - -#include "include/PacketBuffer.hpp" - - -struct DummyContainer{ - uint64_t index; - uint64_t timestamp; - uint16_t data[32]; -}; - - -int main (int argc, char *argv[]) { - PacketBuffer b; - -} diff --git a/jfj-udp-recv/src/JfjFrameStats.cpp b/jfj-udp-recv/src/JfjFrameStats.cpp index 28161c7..36bb614 100644 --- a/jfj-udp-recv/src/JfjFrameStats.cpp +++ b/jfj-udp-recv/src/JfjFrameStats.cpp @@ -1,71 +1,54 @@ -#include -#include "FrameStats.hpp" - -using namespace std; -using namespace chrono; - -FrameStats::FrameStats( - const std::string &detector_name, - const int module_id, - const size_t stats_time) : - detector_name_(detector_name), - module_id_(module_id), - stats_time_(stats_time) -{ - reset_counters(); -} - -void FrameStats::reset_counters() -{ - frames_counter_ = 0; - n_missed_packets_ = 0; - n_corrupted_frames_ = 0; - n_corrupted_pulse_id_ = 0; - stats_interval_start_ = steady_clock::now(); -} - -void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) -{ - - if (bad_pulse_id) { - n_corrupted_pulse_id_++; - } - - if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { - n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; - n_corrupted_frames_++; - } - - frames_counter_++; - - auto time_passed = duration_cast( - steady_clock::now()-stats_interval_start_).count(); - - if (time_passed >= stats_time_*1000) { - print_stats(); - reset_counters(); - } -} - -void FrameStats::print_stats() -{ - auto interval_ms_duration = duration_cast( - steady_clock::now()-stats_interval_start_).count(); - // * 1000 because milliseconds, + 250 because of truncation. - int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; - uint64_t timestamp = time_point_cast( - system_clock::now()).time_since_epoch().count(); - - // Output in InfluxDB line protocol - cout << "jf_udp_recv"; - cout << ",detector_name=" << detector_name_; - cout << ",module_name=M" << module_id_; - cout << " "; - cout << "n_missed_packets=" << n_missed_packets_ << "i"; - cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; - cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; - cout << " "; - cout << timestamp; - cout << endl; -} +#include +#include "JfjFrameStats.hpp" + +using namespace std; +using namespace chrono; + +FrameStats::FrameStats(const std::string &detector_name, const size_t stats_time) : + detector_name_(detector_name), stats_time_(stats_time) { + reset_counters(); +} + +void FrameStats::reset_counters() +{ + frames_counter_ = 0; + n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void FrameStats::record_stats(const ImageMetadata &meta, const bool bad_pulse_id) +{ + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + n_corrupted_frames_++; + } + + frames_counter_++; + + auto time_passed = duration_cast(steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { + print_stats(); + reset_counters(); + } +} + +void FrameStats::print_stats(){ + auto interval_ms_duration = duration_cast(steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((frames_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast(system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jfj_udp_recv"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp index aa38ec9..88b9ec0 100644 --- a/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp +++ b/jfj-udp-recv/src/JfjFrameUdpReceiver.cpp @@ -1,23 +1,23 @@ #include -#include +#include #include "JfjFrameUdpReceiver.hpp" using namespace std; using namespace buffer_config; JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) { - udp_receiver_.bind(port); + m_udp_receiver.bind(port); } JfjFrameUdpReceiver::~JfjFrameUdpReceiver() { - udp_receiver_.disconnect(); + m_udp_receiver.disconnect(); } -inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& frame_metadata, const jfjoch_packet_t& c_packet) { - frame_metadata.pulse_id = c_packet.timestamp; - frame_metadata.frame_index = c_packet.framenum; - frame_metadata.daq_rec = (uint32_t) c_packet.debug; - frame_metadata.is_good_image = (int32_t) true; +inline void JfjFrameUdpReceiver::init_frame(ImageMetadata& metadata, const jfjoch_packet_t& c_packet) { + metadata.pulse_id = c_packet.timestamp; + metadata.frame_index = c_packet.framenum; + metadata.daq_rec = (uint32_t) c_packet.debug; + metadata.is_good_image = (int32_t) true; } inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, char* frame_buffer){ @@ -26,7 +26,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch // Happens if the last packet from the previous frame gets lost. if (m_frame_index != m_buffer.peek_front().framenum) { m_frame_index = m_buffer.peek_front().framenum; - frame_metadata.is_good_image = (int32_t) false; + metadata.is_good_image = (int32_t) false; return metadata.pulse_id; } @@ -38,12 +38,11 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch this->init_frame(metadata, c_packet); // Copy data to frame buffer - size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum; - memcpy( (void*) (frame_buffer + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET); - metadata.n_recv_packets++; + size_t offset = JFJOCH_DATA_BYTES_PER_PACKET * c_packet.packetnum; + memcpy( (void*) (frame_buffer + offset), c_packet.data, JFJOCH_DATA_BYTES_PER_PACKET); // Last frame packet received. Frame finished. - if (c_packet.packetnum == JFJ_N_PACKETS_PER_FRAME - 1){ + if (c_packet.packetnum == JFJOCH_N_PACKETS_PER_FRAME - 1){ return metadata.pulse_id; } } @@ -54,10 +53,9 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ImageMetadata& metadata, ch } uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ImageMetadata& metadata, char* frame_buffer){ - // Reset the metadata and frame buffer for the next frame. + // Reset the metadata and frame buffer for the next frame. (really needed?) metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET); // Process leftover packages in the buffer if (!m_buffer.is_empty()) { diff --git a/jfj-udp-recv/src/main.cpp b/jfj-udp-recv/src/main.cpp index 4bd2a17..df9d0fa 100644 --- a/jfj-udp-recv/src/main.cpp +++ b/jfj-udp-recv/src/main.cpp @@ -5,9 +5,9 @@ #include "formats.hpp" #include "buffer_config.hpp" -#include "FrameUdpReceiver.hpp" +#include "JfjFrameUdpReceiver.hpp" #include "BufferUtils.hpp" -#include "FrameStats.hpp" +#include "JfjFrameStats.hpp" using namespace std; using namespace chrono; @@ -18,7 +18,7 @@ int main (int argc, char *argv[]) { if (argc != 3) { cout << endl; - cout << "Usage: jf_udp_recv [detector_json_filename] [module_id]"; + cout << "Usage: jfj_udp_recv [detector_json_filename] [module_id]"; cout << endl; cout << "\tdetector_json_filename: detector config file path." << endl; cout << "\tmodule_id: id of the module for this process." << endl; @@ -30,18 +30,18 @@ int main (int argc, char *argv[]) { const auto config = read_json_config(string(argv[1])); const int module_id = atoi(argv[2]); - const auto udp_port = config.start_udp_port + module_id; - ImageUdpReceiver receiver(udp_port, module_id); + const auto udp_port = config.start_udp_port; + JfjFrameUdpReceiver receiver(udp_port); RamBuffer buffer(config.detector_name, config.n_modules); - ImageStats stats(config.detector_name, module_id, STATS_TIME); + FrameStats stats(config.detector_name, STATS_TIME); auto ctx = zmq_ctx_new(); - auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); - + zmq_ctx_set(ctx, ZMQ_IO_THREADS, ZMQ_IO_THREADS); + auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "jungfraujoch"); // Might be better creating a structure for double buffering ImageMetadata metaBufferA; - char* dataBufferA = new char[IMAGE_N_BYTES]; + char* dataBufferA = new char[JFJOCH_DATA_BYTES_PER_FRAME]; uint64_t pulse_id_previous = 0; uint64_t frame_index_previous = 0; @@ -49,22 +49,15 @@ int main (int argc, char *argv[]) { while (true) { // NOTE: Needs to be pipelined for really high frame rates - auto pulse_id = receiver.get_image_from_udp(metaBufferA, dataBufferA); + auto pulse_id = receiver.get_frame_from_udp(metaBufferA, dataBufferA); bool bad_pulse_id = false; - if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || - ( (pulse_id-pulse_id_previous) < 0 ) || - ( (pulse_id-pulse_id_previous) > 1000 ) ) { - + if ( ( metaBufferA.frame_index != (frame_index_previous+1) ) || ( (pulse_id-pulse_id_previous) < 0 ) || ( (pulse_id-pulse_id_previous) > 1000 ) ) { bad_pulse_id = true; - } else { - buffer.write_frame(metaBufferA, dataBufferA); - - zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); - + zmq_send(sender, &metaBufferA, sizeof(metaBufferA), 0); } stats.record_stats(metaBufferA, bad_pulse_id); @@ -74,5 +67,5 @@ int main (int argc, char *argv[]) { } - delete[] data; + delete[] dataBufferA; }