From eb9c87cf2f8f115c1c97cf1b79294063544ad334 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 16:12:53 +0200 Subject: [PATCH] Add std-udp-sync project --- CMakeLists.txt | 6 +- std-udp-sync/CMakeLists.txt | 22 +++ std-udp-sync/README.md | 179 ++++++++++++++++++ std-udp-sync/include/SyncStats.hpp | 27 +++ std-udp-sync/include/UdpSyncConfig.hpp | 29 +++ std-udp-sync/include/ZmqPulseSyncReceiver.hpp | 34 ++++ std-udp-sync/include/sync_config.hpp | 11 ++ std-udp-sync/src/SyncStats.cpp | 55 ++++++ std-udp-sync/src/ZmqPulseSyncReceiver.cpp | 127 +++++++++++++ std-udp-sync/src/main.cpp | 69 +++++++ std-udp-sync/test/CMakeLists.txt | 7 + std-udp-sync/test/main.cpp | 8 + 12 files changed, 570 insertions(+), 4 deletions(-) create mode 100644 std-udp-sync/CMakeLists.txt create mode 100644 std-udp-sync/README.md create mode 100644 std-udp-sync/include/SyncStats.hpp create mode 100644 std-udp-sync/include/UdpSyncConfig.hpp create mode 100644 std-udp-sync/include/ZmqPulseSyncReceiver.hpp create mode 100644 std-udp-sync/include/sync_config.hpp create mode 100644 std-udp-sync/src/SyncStats.cpp create mode 100644 std-udp-sync/src/ZmqPulseSyncReceiver.cpp create mode 100644 std-udp-sync/src/main.cpp create mode 100644 std-udp-sync/test/CMakeLists.txt create mode 100644 std-udp-sync/test/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4365202..1bbfa25 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,12 +32,10 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("std-udp-recv") +add_subdirectory("std-udp-sync") #add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") add_subdirectory("sf-stream") #add_subdirectory("sf-writer") - -if(BUILD_JF_LIVE_WRITER) - add_subdirectory("jf-live-writer") -endif() +add_subdirectory("jf-live-writer") diff --git a/std-udp-sync/CMakeLists.txt b/std-udp-sync/CMakeLists.txt new file mode 100644 index 0000000..7466d1e --- /dev/null +++ b/std-udp-sync/CMakeLists.txt @@ -0,0 +1,22 @@ 
+file(GLOB SOURCES + src/*.cpp) + +add_library(std-udp-sync-lib STATIC ${SOURCES}) +target_include_directories(std-udp-sync-lib PUBLIC include/) +target_link_libraries(std-udp-sync-lib + external + core-buffer-lib) + +add_executable(std-udp-sync src/main.cpp) + +set_target_properties(std-udp-sync PROPERTIES OUTPUT_NAME std_udp_sync) +target_link_libraries(std-udp-sync + external + core-buffer-lib + std-udp-sync-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/std-udp-sync/README.md b/std-udp-sync/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/std-udp-sync/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. These images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send fewer images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice it is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen.
+ +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id. + +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If either of these 2 conditions is not +met, the detector is not working properly and we cannot guarantee that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in the image +assembly phase - this is needed rarely, but occasionally happens. In these sorts +of hiccups we usually lose only a couple of consecutive images. + +### Messages format +Each message is composed of 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +determining the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream.
+It can happen in 3 cases: + +- At least one of the detector modules did not send any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them. +- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All these 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules' data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble these images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We divide the ZMQ sending into 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as a full data full +meta stream. Only 1 client at a time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for a subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream.
+ +In the data processing and live viewing streams we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components. + +We use the following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This stream also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream.
Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images. + +### Pulse_id stream + +This stream runs at detector frequency in PUB/SUB mode. The only thing it +does is send out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/std-udp-sync/include/SyncStats.hpp b/std-udp-sync/include/SyncStats.hpp new file mode 100644 index 0000000..18b9d1d --- /dev/null +++ b/std-udp-sync/include/SyncStats.hpp @@ -0,0 +1,27 @@ +#ifndef SF_DAQ_BUFFER_SYNCSTATS_HPP +#define SF_DAQ_BUFFER_SYNCSTATS_HPP + +#include +#include +#include + +class SyncStats { + const std::string detector_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_sync_lost_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + SyncStats(const std::string &detector_name, + const size_t stats_modulo); + + void record_stats(const uint32_t n_lost_pulses); +}; + + +#endif //SF_DAQ_BUFFER_SYNCSTATS_HPP diff --git a/std-udp-sync/include/UdpSyncConfig.hpp b/std-udp-sync/include/UdpSyncConfig.hpp new file mode 100644 index 0000000..89d735a --- /dev/null +++ b/std-udp-sync/include/UdpSyncConfig.hpp @@ -0,0 +1,29 @@ +#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP +#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP + + +#include +#include +#include +#include +#include + +struct UdpSyncConfig { + static UdpSyncConfig from_json_file(const std::string& filename) { + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return {
config_parameters["detector_name"].GetString(), + config_parameters["n_modules"].GetInt() + }; + } + + const std::string detector_name; + const int n_modules; +}; + + +#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP diff --git a/std-udp-sync/include/ZmqPulseSyncReceiver.hpp b/std-udp-sync/include/ZmqPulseSyncReceiver.hpp new file mode 100644 index 0000000..26a5d5b --- /dev/null +++ b/std-udp-sync/include/ZmqPulseSyncReceiver.hpp @@ -0,0 +1,34 @@ +#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP +#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP + + +#include +#include +#include + +#include "formats.hpp" + +struct PulseAndSync { + const uint64_t image_id; + const uint32_t n_lost_pulses; +}; + +class ZmqPulseSyncReceiver { + + void* ctx_; + const int n_modules_; + + std::vector sockets_; + +public: + ZmqPulseSyncReceiver( + void* ctx, + const std::string& detector_name, + const int n_modules); + ~ZmqPulseSyncReceiver(); + + PulseAndSync get_next_pulse_id() const; +}; + + +#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP diff --git a/std-udp-sync/include/sync_config.hpp b/std-udp-sync/include/sync_config.hpp new file mode 100644 index 0000000..ff8f500 --- /dev/null +++ b/std-udp-sync/include/sync_config.hpp @@ -0,0 +1,11 @@ +namespace sync_config +{ + // If the modules are offset by more than PULSE_OFFSET_LIMIT pulses, crash. + const uint64_t PULSE_OFFSET_LIMIT = 100; + + // Number of times we try to re-sync in case of failure. + const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out.
+ const size_t SYNC_STATS_MODULO = 1000; +} diff --git a/std-udp-sync/src/SyncStats.cpp b/std-udp-sync/src/SyncStats.cpp new file mode 100644 index 0000000..e9bb76d --- /dev/null +++ b/std-udp-sync/src/SyncStats.cpp @@ -0,0 +1,55 @@ +#include "SyncStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +SyncStats::SyncStats( + const std::string &detector_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void SyncStats::reset_counters() +{ + image_counter_ = 0; + n_sync_lost_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void SyncStats::record_stats(const uint32_t n_lost_pulses) +{ + image_counter_++; + n_sync_lost_images_ += n_lost_pulses; + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void SyncStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. 
+ int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "std_udp_sync"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp new file mode 100644 index 0000000..c8a87cb --- /dev/null +++ b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp @@ -0,0 +1,127 @@ +#include "ZmqPulseSyncReceiver.hpp" +#include "BufferUtils.hpp" + +#include +#include +#include +#include +#include +#include "date.h" + +#include "sync_config.hpp" + +using namespace std; +using namespace chrono; +using namespace sync_config; + + +ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( + void * ctx, + const string& detector_name, + const int n_modules) : + ctx_(ctx), + n_modules_(n_modules) +{ + sockets_.reserve(n_modules_); + + for (int i=0; i::max();; + uint64_t max_pulse_id = 0; + + for (int i = 0; i < n_modules_; i++) { + min_pulse_id = min(min_pulse_id, pulses[i]); + max_pulse_id = max(max_pulse_id, pulses[i]); + } + + auto max_diff = max_pulse_id - min_pulse_id; + if (max_diff > PULSE_OFFSET_LIMIT) { + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " PULSE_OFFSET_LIMIT exceeded."; + err_msg << " max_diff=" << max_diff << " pulses."; + + for (int i = 0; i < n_modules_; i++) { + err_msg << " (module " << i << ", "; + err_msg << pulses[i] << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + modules_in_sync = true; + // Max pulses we lost in this sync attempt. 
+ uint32_t i_sync_lost_pulses = 0; + for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; + while (pulses[i] < max_pulse_id) { + zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; + } + + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + + if (pulses[i] != max_pulse_id) { + modules_in_sync = false; + } + } + n_lost_pulses += i_sync_lost_pulses; + + if (modules_in_sync) { + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]"; + cout << " modules_in_sync false"; + cout << endl; + #endif + return {pulses[0], n_lost_pulses}; + } + } + + stringstream err_msg; + err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << endl; + + throw runtime_error(err_msg.str()); +} diff --git a/std-udp-sync/src/main.cpp b/std-udp-sync/src/main.cpp new file mode 100644 index 0000000..1068091 --- /dev/null +++ b/std-udp-sync/src/main.cpp @@ -0,0 +1,69 @@ +#include +#include +#include +#include +#include +#include + +#include "date.h" +#include +#include "sync_config.hpp" +#include "ZmqPulseSyncReceiver.hpp" +#include "UdpSyncConfig.hpp" + + +using namespace std; +using namespace sync_config; + +#ifdef USE_EIGER + #include "eiger.hpp" +#else + #include "jungfrau.hpp" +#endif + +int main (int argc, char *argv[]) +{ + if (argc != 3) { + cout << endl; + cout << "Usage: std_udp_sync [detector_json_filename] [bit_depth]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tbit_depth: bit depth of the incoming udp packets." 
<< endl; + cout << endl; + + exit(-1); + } + + const auto config = UdpSyncConfig::from_json_file(string(argv[1])); + const int bit_depth = atoi(argv[2]); + + const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; + + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, 1); + + auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync"); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [Assembler] :"; + cout << " Details of Assembler:"; + cout << " detector_name: " << config.detector_name; + cout << " || n_modules: " << config.n_modules; + cout << endl; + #endif + + RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), + FRAME_N_BYTES, config.n_modules); + + ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); + SyncStats stats(config.detector_name, SYNC_STATS_MODULO); + + while (true) { + auto meta = receiver.get_next_pulse_id(); + + zmq_send(sender, &meta.image_id, sizeof(meta.image_id), 0); + + stats.record_stats(meta.n_lost_pulses); + } +} diff --git a/std-udp-sync/test/CMakeLists.txt b/std-udp-sync/test/CMakeLists.txt new file mode 100644 index 0000000..01fc383 --- /dev/null +++ b/std-udp-sync/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(std-udp-sync-tests main.cpp) + +target_link_libraries(std-udp-sync-tests + std-udp-sync-lib + gtest + ) + diff --git a/std-udp-sync/test/main.cpp b/std-udp-sync/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/std-udp-sync/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}