From 39d714f53829841452a93ac8c9429f9e4c8a7cba Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 14:02:57 +0100 Subject: [PATCH] Add jf-assembler executable This service reconstructs the various modules and sends out a image metadata stream for further consumers. --- CMakeLists.txt | 1 + jf-assembler/CMakeLists.txt | 21 ++ jf-assembler/README.md | 179 ++++++++++++++++++ jf-assembler/include/AssemblerStats.hpp | 28 +++ jf-assembler/include/ZmqPulseSyncReceiver.hpp | 34 ++++ jf-assembler/include/assembler_config.hpp | 14 ++ jf-assembler/src/AssemblerStats.cpp | 62 ++++++ jf-assembler/src/ZmqPulseSyncReceiver.cpp | 115 +++++++++++ jf-assembler/src/main.cpp | 50 +++++ jf-assembler/test/CMakeLists.txt | 7 + jf-assembler/test/main.cpp | 8 + 11 files changed, 519 insertions(+) create mode 100644 jf-assembler/CMakeLists.txt create mode 100644 jf-assembler/README.md create mode 100644 jf-assembler/include/AssemblerStats.hpp create mode 100644 jf-assembler/include/ZmqPulseSyncReceiver.hpp create mode 100644 jf-assembler/include/assembler_config.hpp create mode 100644 jf-assembler/src/AssemblerStats.cpp create mode 100644 jf-assembler/src/ZmqPulseSyncReceiver.cpp create mode 100644 jf-assembler/src/main.cpp create mode 100644 jf-assembler/test/CMakeLists.txt create mode 100644 jf-assembler/test/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1fd9a7e..c739d65 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,7 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("jf-udp-recv") add_subdirectory("jf-buffer-writer") +add_subdirectory("jf-assembler") add_subdirectory("sf-stream") add_subdirectory("sf-writer") #add_subdirectory("jf-live-writer") diff --git a/jf-assembler/CMakeLists.txt b/jf-assembler/CMakeLists.txt new file mode 100644 index 0000000..95e755c --- /dev/null +++ b/jf-assembler/CMakeLists.txt @@ -0,0 +1,21 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-assembler-lib STATIC ${SOURCES}) 
+target_include_directories(jf-assembler-lib PUBLIC include/) +target_link_libraries(jf-assembler-lib + external + core-buffer-lib) + +add_executable(jf-assembler src/main.cpp) +set_target_properties(jf-assembler PROPERTIES OUTPUT_NAME jf_assembler) +target_link_libraries(jf-assembler + external + core-buffer-lib + jf-assembler-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/jf-assembler/README.md b/jf-assembler/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/jf-assembler/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. These images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send fewer images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice it is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen. + +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id.
+ +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If any of these 2 conditions is not +met, the detector is not working properly and we cannot guarantee that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in image +assembly phase - this is needed rarely, but occasionally happens. In this sort +of hiccup we usually lose only a couple of consecutive images. + +### Messages format +Each message is composed of 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +deciding the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream. +It can happen in 3 cases: + +- At least one of the detector modules did not send any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them.
+- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All these 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble these images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We divide the ZMQ sending into 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at a time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for a subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream. + +In the data processing and live viewing stream we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components.
+ +We use the following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This stream also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream. Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images.
+ +### Pulse_id stream + +This stream runs at detector frequency in PUB/SUB mode. The only thing it +does is send out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/jf-assembler/include/AssemblerStats.hpp b/jf-assembler/include/AssemblerStats.hpp new file mode 100644 index 0000000..a8267a6 --- /dev/null +++ b/jf-assembler/include/AssemblerStats.hpp @@ -0,0 +1,28 @@ +#ifndef SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP +#define SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP + +#include +#include +#include + +class AssemblerStats { + const std::string detector_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_corrupted_images_; + int n_sync_lost_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + AssemblerStats(const std::string &detector_name, + const size_t stats_modulo); + + void record_stats(const ImageMetadata &meta, const uint32_t n_lost_pulses); +}; + + +#endif //SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP diff --git a/jf-assembler/include/ZmqPulseSyncReceiver.hpp b/jf-assembler/include/ZmqPulseSyncReceiver.hpp new file mode 100644 index 0000000..624de34 --- /dev/null +++ b/jf-assembler/include/ZmqPulseSyncReceiver.hpp @@ -0,0 +1,34 @@ +#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP +#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP + + +#include +#include +#include + +#include "formats.hpp" + +struct PulseAndSync { + const uint64_t pulse_id; + const uint32_t n_lost_pulses; +}; + +class ZmqPulseSyncReceiver { + + void* ctx_; + const int n_modules_; + + std::vector sockets_; + +public: + ZmqPulseSyncReceiver( + void* ctx, + const std::string& detector_name, + const int n_modules); + ~ZmqPulseSyncReceiver(); + + PulseAndSync get_next_pulse_id() const; +}; + + +#endif
//SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP diff --git a/jf-assembler/include/assembler_config.hpp b/jf-assembler/include/assembler_config.hpp new file mode 100644 index 0000000..b0e277d --- /dev/null +++ b/jf-assembler/include/assembler_config.hpp @@ -0,0 +1,14 @@ +namespace assembler_config +{ + // N of IO threads to send image metadata. + const int ASSEMBLER_ZMQ_IO_THREADS = 1; + + // If the modules are offset by more than PULSE_OFFSET_LIMIT pulses, crash. + const uint64_t PULSE_OFFSET_LIMIT = 100; + + // Number of times we try to re-sync in case of failure. + const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t ASSEMBLER_STATS_MODULO = 1000; +} diff --git a/jf-assembler/src/AssemblerStats.cpp b/jf-assembler/src/AssemblerStats.cpp new file mode 100644 index 0000000..e295da6 --- /dev/null +++ b/jf-assembler/src/AssemblerStats.cpp @@ -0,0 +1,62 @@ +#include "AssemblerStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +AssemblerStats::AssemblerStats( + const std::string &detector_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void AssemblerStats::reset_counters() // Zeroes the three counters and restarts the interval clock. +{ + image_counter_ = 0; + n_sync_lost_images_ = 0; + n_corrupted_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void AssemblerStats::record_stats( + const ImageMetadata &meta, const uint32_t n_lost_pulses) // Counts one image; prints and resets once stats_modulo_ images were counted. +{ + image_counter_++; + n_sync_lost_images_ += n_lost_pulses; + + if (!meta.is_good_image) { + n_corrupted_images_++; + } + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void AssemblerStats::print_stats() // Writes one statistics record for the current interval to stdout. +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation.
+ int rep_rate = interval_ms_duration > 0 ? ((image_counter_ * 1000) + 250) / interval_ms_duration : 0; // Reports 0 for a zero-length interval instead of dividing by zero. + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_assembler"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_corrupted_images=" << n_corrupted_images_ << "i"; + cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/jf-assembler/src/ZmqPulseSyncReceiver.cpp b/jf-assembler/src/ZmqPulseSyncReceiver.cpp new file mode 100644 index 0000000..6dbe2fc --- /dev/null +++ b/jf-assembler/src/ZmqPulseSyncReceiver.cpp @@ -0,0 +1,115 @@ +#include "ZmqPulseSyncReceiver.hpp" +#include "BufferUtils.hpp" + +#include +#include +#include +#include +#include +#include + +#include "assembler_config.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace assembler_config; + + +ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( + void * ctx, + const string& detector_name, + const int n_modules) : + ctx_(ctx), + n_modules_(n_modules) +{ + sockets_.reserve(n_modules_); + + for (int i=0; i::max();; + uint64_t max_pulse_id = 0; + + for (int i = 0; i < n_modules_; i++) { + min_pulse_id = min(min_pulse_id, pulses[i]); + max_pulse_id = max(max_pulse_id, pulses[i]); + } + + auto max_diff = max_pulse_id - min_pulse_id; + if (max_diff > PULSE_OFFSET_LIMIT) { + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " PULSE_OFFSET_LIMIT exceeded."; + err_msg << " max_diff=" << max_diff << " pulses."; + + for (int i = 0; i < n_modules_; i++) { + err_msg << " (module " << i << ", "; + err_msg << pulses[i] << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + modules_in_sync = true; + // Max pulses we lost in this sync attempt.
+ uint32_t i_sync_lost_pulses = 0; + for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; + while (pulses[i] < max_pulse_id) { + zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); // NOTE(review): the zmq_recv return value is discarded here. + i_module_lost_pulses++; + } + + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + + if (pulses[i] != max_pulse_id) { + modules_in_sync = false; + } + } + n_lost_pulses += i_sync_lost_pulses; + + if (modules_in_sync) { + return {pulses[0], n_lost_pulses}; + } + } + + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << endl; + + throw runtime_error(err_msg.str()); +} diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp new file mode 100644 index 0000000..22ede02 --- /dev/null +++ b/jf-assembler/src/main.cpp @@ -0,0 +1,50 @@ +#include +#include +#include +#include +#include +#include + +#include "buffer_config.hpp" +#include "assembler_config.hpp" +#include "ZmqPulseSyncReceiver.hpp" + +using namespace std; +using namespace buffer_config; +using namespace assembler_config; + +int main (int argc, char *argv[]) // Requires exactly one argument: the detector config file path. +{ + if (argc != 2) { + cout << endl; + cout << "Usage: jf_assembler [detector_json_filename]" << endl; + cout << "\tdetector_json_filename: detector config file path."
<< endl; + cout << endl; + + exit(-1); + } + + auto config = BufferUtils::read_json_config(string(argv[1])); + auto const stream_name = "assembler"; + + string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.detector_name + "-"; // NOTE(review): not referenced again in this function. + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS); + + ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); + RamBuffer ram_buffer(config.detector_name, config.n_modules); + AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO); + + auto sender = BufferUtils::bind_socket( + ctx, config.detector_name, stream_name); + + ImageMetadata meta; + while (true) { + auto pulse_and_sync = receiver.get_next_pulse_id(); + ram_buffer.assemble_image(pulse_and_sync.pulse_id, meta); + + zmq_send(sender, &meta, sizeof(meta), 0); + + stats.record_stats(meta, pulse_and_sync.n_lost_pulses); + } +} diff --git a/jf-assembler/test/CMakeLists.txt b/jf-assembler/test/CMakeLists.txt new file mode 100644 index 0000000..bb240e7 --- /dev/null +++ b/jf-assembler/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(jf-assembler-tests main.cpp) + +target_link_libraries(jf-assembler-tests + jf-assembler-lib + gtest + ) + diff --git a/jf-assembler/test/main.cpp b/jf-assembler/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/jf-assembler/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}