From 0cf973802da0017d3775b680087b755f4b81786a Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Thu, 14 Jan 2021 15:45:41 +0100 Subject: [PATCH 01/21] Add stream stats modulo to config file --- sf-stream/include/stream_config.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index 9b7e693..4e4eec4 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -14,4 +14,7 @@ namespace stream_config const int PULSE_ZMQ_SNDHWM = 100; // Number of times we try to re-sync in case of failure. const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t STREAM_STATS_MODULO = 10000; } From 2937326dbf97ccf8b2fb69c034e0a4aa878144a2 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 15 Jan 2021 10:59:43 +0100 Subject: [PATCH 02/21] Return n_lost_pulses in sync from receiver In order to have proper statistics we need to return the number of lost pulses from the receiver. We cannot just count the gap in pulse_id as the beam might be operated at different frequencies. --- sf-stream/include/ZmqPulseSyncReceiver.hpp | 7 ++++++- sf-stream/src/ZmqPulseSyncReceiver.cpp | 18 +++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/sf-stream/include/ZmqPulseSyncReceiver.hpp b/sf-stream/include/ZmqPulseSyncReceiver.hpp index aa00035..624de34 100644 --- a/sf-stream/include/ZmqPulseSyncReceiver.hpp +++ b/sf-stream/include/ZmqPulseSyncReceiver.hpp @@ -8,6 +8,11 @@ #include "formats.hpp" +struct PulseAndSync { + const uint64_t pulse_id; + const uint32_t n_lost_pulses; +}; + class ZmqPulseSyncReceiver { void* ctx_; @@ -22,7 +27,7 @@ public: const int n_modules); ~ZmqPulseSyncReceiver(); - uint64_t get_next_pulse_id() const; + PulseAndSync get_next_pulse_id() const; }; diff --git a/sf-stream/src/ZmqPulseSyncReceiver.cpp b/sf-stream/src/ZmqPulseSyncReceiver.cpp index f309389..ead65ee 100644 --- a/sf-stream/src/ZmqPulseSyncReceiver.cpp +++ b/sf-stream/src/ZmqPulseSyncReceiver.cpp @@ -38,7 +38,7 @@ ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver() } } -uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const +PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const { uint64_t pulses[n_modules_]; @@ -52,12 +52,12 @@ uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const } if (modules_in_sync) { - return pulses[0]; + return {pulses[0], 0}; } + // How many pulses we lost in total to get the next pulse_id. + uint32_t n_lost_pulses = 0; for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) { - cout << "Sync attempt " << i_sync << endl; - uint64_t min_pulse_id = numeric_limits::max();; uint64_t max_pulse_id = 0; @@ -83,18 +83,26 @@ uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const } modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; while (pulses[i] < max_pulse_id) { zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; } + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + if (pulses[i] != max_pulse_id) { modules_in_sync = false; } } + n_lost_pulses += i_sync_lost_pulses; if (modules_in_sync) { - return pulses[0]; + return {pulses[0], n_lost_pulses}; } } From f58ec4270c9be471f43bcaae67401648d4405589 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 15 Jan 2021 11:06:47 +0100 Subject: [PATCH 03/21] Implementation fo StreamStats for outputting Grafana metrics --- sf-stream/include/StreamStats.hpp | 30 ++++++++++++++ sf-stream/src/StreamStats.cpp | 65 +++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 sf-stream/include/StreamStats.hpp create mode 100644 sf-stream/src/StreamStats.cpp diff --git a/sf-stream/include/StreamStats.hpp b/sf-stream/include/StreamStats.hpp new file mode 100644 index 0000000..a91db57 --- /dev/null +++ b/sf-stream/include/StreamStats.hpp @@ -0,0 +1,30 @@ +#ifndef SF_DAQ_BUFFER_STREAMSTATS_HPP +#define SF_DAQ_BUFFER_STREAMSTATS_HPP + +#include +#include +#include + +class StreamStats { + const std::string detector_name_; + const std::string stream_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_corrupted_images_; + int n_sync_lost_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + StreamStats(const std::string &detector_name, + const std::string &stream_name, + const size_t stats_modulo); + + void record_stats(const ImageMetadata &meta, const uint32_t n_lost_pulses); +}; + + +#endif //SF_DAQ_BUFFER_STREAMSTATS_HPP diff --git a/sf-stream/src/StreamStats.cpp b/sf-stream/src/StreamStats.cpp new file mode 100644 index 0000000..190f1bc --- /dev/null +++ b/sf-stream/src/StreamStats.cpp @@ -0,0 +1,65 @@ +#include "StreamStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +StreamStats::StreamStats( + const std::string &detector_name, + const std::string &stream_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stream_name_(stream_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void StreamStats::reset_counters() +{ + image_counter_ = 0; + n_sync_lost_images_ = 0; + n_corrupted_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void StreamStats::record_stats( + const ImageMetadata &meta, const uint32_t n_lost_pulses) +{ + image_counter_++; + n_sync_lost_images_ += n_lost_pulses; + + if (!meta.is_good_image) { + n_corrupted_images_++; + } + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void StreamStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "sf-stream"; + cout << ",detector_name=" << detector_name_; + cout << ",stream_name=" << stream_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_corrupted_images=" << n_corrupted_images_ << "i"; + cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + From 3ed5bcb33021573f0c8e820de0376dab3f8cdbc8 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 15 Jan 2021 11:07:31 +0100 Subject: [PATCH 04/21] Statistics for sf-stream --- sf-stream/src/main.cpp | 42 +++++++++++------------------------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index dbd588f..9ebb111 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "buffer_config.hpp" #include "stream_config.hpp" @@ -15,57 +16,36 @@ using namespace stream_config; int main (int argc, char *argv[]) { - if (argc != 2) { + if (argc != 3) { cout << endl; - cout << "Usage: sf_stream [detector_json_filename]" << endl; + cout << "Usage: sf_stream [detector_json_filename]" + " [stream_name]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; exit(-1); } + const auto stream_name = string(argv[2]); + // TODO: Add stream_name to config reading - multiple stream definitions. auto config = BufferUtils::read_json_config(string(argv[1])); - string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.detector_name + "-"; + string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.detector_name + "-"; auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); RamBuffer ram_buffer(config.detector_name, config.n_modules); + StreamStats stats(config.detector_name, stream_name, STREAM_STATS_MODULO); ZmqLiveSender sender(ctx, config); - // TODO: Remove stats trash. - uint64_t last_pulse_id = 0; - uint64_t last_pulse_id_range = 0; - uint16_t n_good_images = 0; - ImageMetadata meta; while (true) { - auto pulse_id = receiver.get_next_pulse_id(); - char* data = ram_buffer.read_image(pulse_id, meta); + auto pulse_and_sync = receiver.get_next_pulse_id(); + char* data = ram_buffer.read_image(pulse_and_sync.pulse_id, meta); sender.send(meta, data); - // TODO: This logic works only at 100Hz. Fix it systematically. - uint64_t sync_lost_pulses = pulse_id - last_pulse_id; - if (last_pulse_id > 0 && sync_lost_pulses > 1) { - cout << "sf_stream:sync_lost_pulses " << sync_lost_pulses << endl; - } - last_pulse_id = pulse_id; - - uint64_t curr_pulse_id_range = pulse_id / 10000; - if (last_pulse_id_range != curr_pulse_id_range) { - if (last_pulse_id_range > 0) { - cout << "sf_stream:n_good_images " << n_good_images; - cout << endl; - } - - last_pulse_id_range = curr_pulse_id_range; - n_good_images = 0; - } - - if (meta.is_good_image) { - n_good_images++; - } + stats.record_stats(meta, pulse_and_sync.n_lost_pulses); } } From ba9532591caf128d222040d277b0c394df609f04 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 15 Jan 2021 11:14:54 +0100 Subject: [PATCH 05/21] Add details about unexpected pulse_id in ram buffer error --- core-buffer/src/RamBuffer.cpp | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp index bcf614b..581ab73 100644 --- a/core-buffer/src/RamBuffer.cpp +++ b/core-buffer/src/RamBuffer.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include "RamBuffer.hpp" #include "buffer_config.hpp" @@ -110,7 +111,21 @@ char* RamBuffer::read_image(const uint64_t pulse_id, if (!is_pulse_init) { if (frame_meta->pulse_id != pulse_id) { - throw runtime_error("Wrong pulse_id in ram buffer slot."); + stringstream err_msg; + err_msg << "[RamBuffer::read_image]"; + err_msg << " Unexpected pulse_id in ram buffer."; + err_msg << " expected=" << pulse_id; + err_msg << " got=" << frame_meta->pulse_id; + + for (int i = 0; i < n_modules_; i++) { + ModuleFrame *meta = src_meta + i_module; + + err_msg << " (module " << i << ", "; + err_msg << meta->pulse_id << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); } image_meta.pulse_id = frame_meta->pulse_id; From fec900ff63b77d565d5444136fa163303cc42ac1 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 15 Jan 2021 11:15:28 +0100 Subject: [PATCH 06/21] Increase the sf_stream output frequency to 10 seconds --- sf-stream/include/stream_config.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sf-stream/include/stream_config.hpp b/sf-stream/include/stream_config.hpp index 4e4eec4..8f8b977 100644 --- a/sf-stream/include/stream_config.hpp +++ b/sf-stream/include/stream_config.hpp @@ -16,5 +16,5 @@ namespace stream_config const int SYNC_RETRY_LIMIT = 3; // Number of pulses between each statistics print out. - const size_t STREAM_STATS_MODULO = 10000; + const size_t STREAM_STATS_MODULO = 1000; } From 0a3db103c5f4bbe44bc358a6989d8101bab28499 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 15 Jan 2021 11:18:07 +0100 Subject: [PATCH 07/21] Change stats names to use _ instead of - Using - is causing problems in Influx -> we move to use the executable name instead of the project name as the main identifier. --- jf-buffer-writer/src/BufferStats.cpp | 2 +- jf-udp-recv/src/FrameStats.cpp | 2 +- sf-stream/src/StreamStats.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/jf-buffer-writer/src/BufferStats.cpp b/jf-buffer-writer/src/BufferStats.cpp index 325e930..173a35c 100644 --- a/jf-buffer-writer/src/BufferStats.cpp +++ b/jf-buffer-writer/src/BufferStats.cpp @@ -51,7 +51,7 @@ void BufferStats::print_stats() system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "jf-buffer-writer"; + cout << "jf_buffer_writer"; cout << ",detector_name=" << detector_name_; cout << ",module_name=M" << module_id_; cout << " "; diff --git a/jf-udp-recv/src/FrameStats.cpp b/jf-udp-recv/src/FrameStats.cpp index 8beb291..103e4fe 100644 --- a/jf-udp-recv/src/FrameStats.cpp +++ b/jf-udp-recv/src/FrameStats.cpp @@ -48,7 +48,7 @@ void FrameStats::print_stats() system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "jf-udp-recv"; + cout << "jf_udp_recv"; cout << ",detector_name=" << detector_name_; cout << ",module_name=M" << module_id_; cout << " "; diff --git a/sf-stream/src/StreamStats.cpp b/sf-stream/src/StreamStats.cpp index 190f1bc..bed448d 100644 --- a/sf-stream/src/StreamStats.cpp +++ b/sf-stream/src/StreamStats.cpp @@ -50,7 +50,7 @@ void StreamStats::print_stats() system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "sf-stream"; + cout << "sf_stream"; cout << ",detector_name=" << detector_name_; cout << ",stream_name=" << stream_name_; cout << " "; From e013c4a6f8fca49e2fa740ece69fdbef284f11e9 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 15 Jan 2021 18:18:54 +0100 Subject: [PATCH 08/21] Increase number of packets read at once to 128 --- core-buffer/include/buffer_config.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 73c97b5..293308b 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -28,7 +28,7 @@ namespace buffer_config { const size_t BUFFER_BLOCK_SIZE = 100; - const size_t BUFFER_UDP_N_RECV_MSG = 64; + const size_t BUFFER_UDP_N_RECV_MSG = 128; // Size of UDP recv buffer const int BUFFER_UDP_RCVBUF_N_SLOTS = 100; // 8246 bytes for each UDP packet. From 20bf00cbbec4c164f2ca50f5da2b29dcfbc7b572 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 10:52:18 +0100 Subject: [PATCH 09/21] Adjust RamBuffer test --- core-buffer/test/test_RamBuffer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core-buffer/test/test_RamBuffer.cpp b/core-buffer/test/test_RamBuffer.cpp index b4b8ddb..55c3a7c 100644 --- a/core-buffer/test/test_RamBuffer.cpp +++ b/core-buffer/test/test_RamBuffer.cpp @@ -25,7 +25,7 @@ TEST(RamBuffer, simple_store) for (int i_module=0; i_module Date: Tue, 19 Jan 2021 10:56:14 +0100 Subject: [PATCH 10/21] Add missing link library to sf_writer --- sf-writer/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/sf-writer/CMakeLists.txt b/sf-writer/CMakeLists.txt index dc87ad2..5b520f5 100644 --- a/sf-writer/CMakeLists.txt +++ b/sf-writer/CMakeLists.txt @@ -11,6 +11,7 @@ add_executable(sf-writer src/main.cpp) set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) target_link_libraries(sf-writer sf-writer-lib + zmq hdf5 hdf5_hl hdf5_cpp From ccca4225d40ed68b7e3ec34265f678744f4d881e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 12:59:50 +0100 Subject: [PATCH 11/21] Change bind and connect socket helper functions to take string --- core-buffer/include/BufferUtils.hpp | 9 +++++++-- core-buffer/src/BufferUtils.cpp | 8 ++++---- jf-buffer-writer/src/main.cpp | 3 ++- jf-udp-recv/src/main.cpp | 2 +- sf-stream/src/ZmqPulseSyncReceiver.cpp | 2 +- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/core-buffer/include/BufferUtils.hpp b/core-buffer/include/BufferUtils.hpp index bb09dea..18a3fe5 100644 --- a/core-buffer/include/BufferUtils.hpp +++ b/core-buffer/include/BufferUtils.hpp @@ -36,9 +36,14 @@ namespace BufferUtils void create_destination_folder(const std::string& output_file); void* bind_socket( - void* ctx, const std::string& detector_name, const int source_id); + void* ctx, + const std::string& detector_name, + const std::string& stream_name); + void* connect_socket( - void* ctx, const std::string& detector_name, const int source_id); + void* ctx, + const std::string& detector_name, + const std::string& stream_name); DetectorConfig read_json_config(const std::string& filename); } diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index 4e861df..147d4e8 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -69,11 +69,11 @@ void BufferUtils::create_destination_folder(const string& output_file) } void* BufferUtils::connect_socket( - void* ctx, const string& detector_name, const int source_id) + void* ctx, const string& detector_name, const string& stream_name) { string ipc_address = BUFFER_LIVE_IPC_URL + detector_name + "-" + - to_string(source_id); + stream_name; void* socket = zmq_socket(ctx, ZMQ_SUB); if (socket == nullptr) { @@ -102,11 +102,11 @@ void* BufferUtils::connect_socket( } void* BufferUtils::bind_socket( - void* ctx, const string& detector_name, const int source_id) + void* ctx, const string& detector_name, const string& stream_name) { string ipc_address = BUFFER_LIVE_IPC_URL + detector_name + "-" + - to_string(source_id); + stream_name; void* socket = zmq_socket(ctx, ZMQ_PUB); diff --git a/jf-buffer-writer/src/main.cpp b/jf-buffer-writer/src/main.cpp index e1eb961..cd65986 100644 --- a/jf-buffer-writer/src/main.cpp +++ b/jf-buffer-writer/src/main.cpp @@ -39,7 +39,8 @@ int main (int argc, char *argv[]) { BufferStats stats(config.detector_name, module_id, STATS_MODULO); auto ctx = zmq_ctx_new(); - auto socket = connect_socket(ctx, config.detector_name, module_id); + auto socket = connect_socket( + ctx, config.detector_name, to_string(module_id)); auto file_buff = new BufferBinaryFormat(); uint64_t pulse_id; diff --git a/jf-udp-recv/src/main.cpp b/jf-udp-recv/src/main.cpp index 49668a1..2194b80 100644 --- a/jf-udp-recv/src/main.cpp +++ b/jf-udp-recv/src/main.cpp @@ -36,7 +36,7 @@ int main (int argc, char *argv[]) { FrameStats stats(config.detector_name, module_id, STATS_MODULO); auto ctx = zmq_ctx_new(); - auto socket = bind_socket(ctx, config.detector_name, module_id); + auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); ModuleFrame meta; char* data = new char[MODULE_N_BYTES]; diff --git a/sf-stream/src/ZmqPulseSyncReceiver.cpp b/sf-stream/src/ZmqPulseSyncReceiver.cpp index ead65ee..96221f3 100644 --- a/sf-stream/src/ZmqPulseSyncReceiver.cpp +++ b/sf-stream/src/ZmqPulseSyncReceiver.cpp @@ -27,7 +27,7 @@ ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( for (int i=0; i Date: Tue, 19 Jan 2021 13:22:20 +0100 Subject: [PATCH 12/21] Separate image assembly and image retrieval in RamBuffer --- core-buffer/include/RamBuffer.hpp | 4 +++- core-buffer/src/RamBuffer.cpp | 14 ++++++++------ core-buffer/test/test_RamBuffer.cpp | 2 +- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/core-buffer/include/RamBuffer.hpp b/core-buffer/include/RamBuffer.hpp index 45b67a1..91872cb 100644 --- a/core-buffer/include/RamBuffer.hpp +++ b/core-buffer/include/RamBuffer.hpp @@ -30,7 +30,9 @@ public: const uint64_t module_id, ModuleFrame &meta, char *data) const; - char* read_image(const uint64_t pulse_id, ImageMetadata &image_meta) const; + char* read_image(const uint64_t pulse_id) const; + void assemble_image( + const uint64_t pulse_id, ImageMetadata &image_meta) const; }; diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp index 581ab73..ff14011 100644 --- a/core-buffer/src/RamBuffer.cpp +++ b/core-buffer/src/RamBuffer.cpp @@ -85,15 +85,12 @@ void RamBuffer::read_frame( memcpy(dst_data, src_data, MODULE_N_BYTES); } -char* RamBuffer::read_image(const uint64_t pulse_id, - ImageMetadata &image_meta) const +void RamBuffer::assemble_image( + const uint64_t pulse_id, ImageMetadata &image_meta) const { const size_t slot_n = pulse_id % n_slots_; - ModuleFrame *src_meta = meta_buffer_ + (n_modules_ * slot_n); - char *src_data = image_buffer_ + (image_bytes_ * slot_n); - auto is_pulse_init = false; auto is_good_image = true; @@ -158,7 +155,12 @@ char* RamBuffer::read_image(const uint64_t pulse_id, image_meta.frame_index = 0; image_meta.daq_rec = 0; } +} + +char* RamBuffer::read_image(const uint64_t pulse_id) const +{ + const size_t slot_n = pulse_id % n_slots_; + char *src_data = image_buffer_ + (image_bytes_ * slot_n); return src_data; } - diff --git a/core-buffer/test/test_RamBuffer.cpp b/core-buffer/test/test_RamBuffer.cpp index 55c3a7c..d28dd81 100644 --- a/core-buffer/test/test_RamBuffer.cpp +++ b/core-buffer/test/test_RamBuffer.cpp @@ -29,7 +29,7 @@ TEST(RamBuffer, simple_store) } ImageMetadata image_meta; - buffer.read_image(frame_meta.pulse_id, image_meta); + buffer.assemble_image(frame_meta.pulse_id, image_meta); ASSERT_EQ(image_meta.pulse_id, frame_meta.pulse_id); ASSERT_EQ(image_meta.daq_rec, frame_meta.daq_rec); ASSERT_EQ(image_meta.frame_index, frame_meta.frame_index); From 65fb6c929f1d87693287c823b4158daaee39691d Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 13:40:16 +0100 Subject: [PATCH 13/21] Remove hdf5 linking to core-buffer --- core-buffer/test/CMakeLists.txt | 2 -- 1 file changed, 2 deletions(-) diff --git a/core-buffer/test/CMakeLists.txt b/core-buffer/test/CMakeLists.txt index bf61f07..aa1173a 100644 --- a/core-buffer/test/CMakeLists.txt +++ b/core-buffer/test/CMakeLists.txt @@ -4,7 +4,5 @@ target_link_libraries(core-buffer-tests core-buffer-lib external rt - hdf5 - hdf5_cpp zmq gtest) From 39d714f53829841452a93ac8c9429f9e4c8a7cba Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 14:02:57 +0100 Subject: [PATCH 14/21] Add jf-assembler executable This service reconstructs the various modules and sends out a image metadata stream for further consumers. --- CMakeLists.txt | 1 + jf-assembler/CMakeLists.txt | 21 ++ jf-assembler/README.md | 179 ++++++++++++++++++ jf-assembler/include/AssemblerStats.hpp | 28 +++ jf-assembler/include/ZmqPulseSyncReceiver.hpp | 34 ++++ jf-assembler/include/assembler_config.hpp | 14 ++ jf-assembler/src/AssemblerStats.cpp | 62 ++++++ jf-assembler/src/ZmqPulseSyncReceiver.cpp | 115 +++++++++++ jf-assembler/src/main.cpp | 50 +++++ jf-assembler/test/CMakeLists.txt | 7 + jf-assembler/test/main.cpp | 8 + 11 files changed, 519 insertions(+) create mode 100644 jf-assembler/CMakeLists.txt create mode 100644 jf-assembler/README.md create mode 100644 jf-assembler/include/AssemblerStats.hpp create mode 100644 jf-assembler/include/ZmqPulseSyncReceiver.hpp create mode 100644 jf-assembler/include/assembler_config.hpp create mode 100644 jf-assembler/src/AssemblerStats.cpp create mode 100644 jf-assembler/src/ZmqPulseSyncReceiver.cpp create mode 100644 jf-assembler/src/main.cpp create mode 100644 jf-assembler/test/CMakeLists.txt create mode 100644 jf-assembler/test/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1fd9a7e..c739d65 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,7 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("jf-udp-recv") add_subdirectory("jf-buffer-writer") +add_subdirectory("jf-assembler") add_subdirectory("sf-stream") add_subdirectory("sf-writer") #add_subdirectory("jf-live-writer") diff --git a/jf-assembler/CMakeLists.txt b/jf-assembler/CMakeLists.txt new file mode 100644 index 0000000..95e755c --- /dev/null +++ b/jf-assembler/CMakeLists.txt @@ -0,0 +1,21 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(jf-assembler-lib STATIC ${SOURCES}) +target_include_directories(jf-assembler-lib PUBLIC include/) +target_link_libraries(jf-assembler-lib + external + core-buffer-lib) + +add_executable(jf-assembler src/main.cpp) +set_target_properties(jf-assembler PROPERTIES OUTPUT_NAME jf_assembler) +target_link_libraries(jf-assembler + external + core-buffer-lib + jf-assembler-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/jf-assembler/README.md b/jf-assembler/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/jf-assembler/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. This images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send less images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen. + +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id. + +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If any of this 2 conditions is not +met, the detector is not working properly and we cannot guaranty that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in image +assembly phase - this is needed rarely, but occasionally happens. In this sort +of hiccups we usually loose only a couple of consecutive images. + +### Messages format +Each message is composed by 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +deciding the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream. +It can happen in 3 cases: + +- At least one of the detector modules did not sent any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them. +- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All this 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note, that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble this images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We devide the ZMQ sending to 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at the time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream. + +In the data processing and live viewing stream we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components. + +We use following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This streams also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream. Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images. + +### Pulse_id stream + +This stream runs ar detector frequency in PUB/SUB mode. The only thing it +does is sends out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/jf-assembler/include/AssemblerStats.hpp b/jf-assembler/include/AssemblerStats.hpp new file mode 100644 index 0000000..a8267a6 --- /dev/null +++ b/jf-assembler/include/AssemblerStats.hpp @@ -0,0 +1,28 @@ +#ifndef SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP +#define SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP + +#include +#include +#include + +class AssemblerStats { + const std::string detector_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_corrupted_images_; + int n_sync_lost_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + AssemblerStats(const std::string &detector_name, + const size_t stats_modulo); + + void record_stats(const ImageMetadata &meta, const uint32_t n_lost_pulses); +}; + + +#endif //SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP diff --git a/jf-assembler/include/ZmqPulseSyncReceiver.hpp b/jf-assembler/include/ZmqPulseSyncReceiver.hpp new file mode 100644 index 0000000..624de34 --- /dev/null +++ b/jf-assembler/include/ZmqPulseSyncReceiver.hpp @@ -0,0 +1,34 @@ +#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP +#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP + + +#include +#include +#include + +#include "formats.hpp" + +struct PulseAndSync { + const uint64_t pulse_id; + const uint32_t n_lost_pulses; +}; + +class ZmqPulseSyncReceiver { + + void* ctx_; + const int n_modules_; + + std::vector sockets_; + +public: + ZmqPulseSyncReceiver( + void* ctx, + const std::string& detector_name, + const int n_modules); + ~ZmqPulseSyncReceiver(); + + PulseAndSync get_next_pulse_id() const; +}; + + +#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP diff --git a/jf-assembler/include/assembler_config.hpp b/jf-assembler/include/assembler_config.hpp new file mode 100644 index 0000000..b0e277d --- /dev/null +++ b/jf-assembler/include/assembler_config.hpp @@ -0,0 +1,14 @@ +namespace assembler_config +{ + // N of IO threads to send image metadata. + const int ASSEMBLER_ZMQ_IO_THREADS = 1; + + // If the modules are offset more than 1000 pulses, crush. + const uint64_t PULSE_OFFSET_LIMIT = 100; + + // Number of times we try to re-sync in case of failure. + const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t ASSEMBLER_STATS_MODULO = 1000; +} diff --git a/jf-assembler/src/AssemblerStats.cpp b/jf-assembler/src/AssemblerStats.cpp new file mode 100644 index 0000000..e295da6 --- /dev/null +++ b/jf-assembler/src/AssemblerStats.cpp @@ -0,0 +1,62 @@ +#include "AssemblerStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +AssemblerStats::AssemblerStats( + const std::string &detector_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void AssemblerStats::reset_counters() +{ + image_counter_ = 0; + n_sync_lost_images_ = 0; + n_corrupted_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void AssemblerStats::record_stats( + const ImageMetadata &meta, const uint32_t n_lost_pulses) +{ + image_counter_++; + n_sync_lost_images_ += n_lost_pulses; + + if (!meta.is_good_image) { + n_corrupted_images_++; + } + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void AssemblerStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_assembler"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_corrupted_images=" << n_corrupted_images_ << "i"; + cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/jf-assembler/src/ZmqPulseSyncReceiver.cpp b/jf-assembler/src/ZmqPulseSyncReceiver.cpp new file mode 100644 index 0000000..6dbe2fc --- /dev/null +++ b/jf-assembler/src/ZmqPulseSyncReceiver.cpp @@ -0,0 +1,115 @@ +#include "ZmqPulseSyncReceiver.hpp" +#include "BufferUtils.hpp" + +#include +#include +#include +#include +#include +#include + +#include "assembler_config.hpp" + +using namespace std; +using namespace chrono; +using namespace buffer_config; +using namespace assembler_config; + + +ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( + void * ctx, + const string& detector_name, + const int n_modules) : + ctx_(ctx), + n_modules_(n_modules) +{ + sockets_.reserve(n_modules_); + + for (int i=0; i::max();; + uint64_t max_pulse_id = 0; + + for (int i = 0; i < n_modules_; i++) { + min_pulse_id = min(min_pulse_id, pulses[i]); + max_pulse_id = max(max_pulse_id, pulses[i]); + } + + auto max_diff = max_pulse_id - min_pulse_id; + if (max_diff > PULSE_OFFSET_LIMIT) { + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " PULSE_OFFSET_LIMIT exceeded."; + err_msg << " max_diff=" << max_diff << " pulses."; + + for (int i = 0; i < n_modules_; i++) { + err_msg << " (module " << i << ", "; + err_msg << pulses[i] << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; + for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; + while (pulses[i] < max_pulse_id) { + zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; + } + + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + + if (pulses[i] != max_pulse_id) { + modules_in_sync = false; + } + } + n_lost_pulses += i_sync_lost_pulses; + + if (modules_in_sync) { + return {pulses[0], n_lost_pulses}; + } + } + + stringstream err_msg; + err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << endl; + + throw runtime_error(err_msg.str()); +} diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp new file mode 100644 index 0000000..22ede02 --- /dev/null +++ b/jf-assembler/src/main.cpp @@ -0,0 +1,50 @@ +#include +#include +#include +#include +#include +#include + +#include "buffer_config.hpp" +#include "assembler_config.hpp" +#include "ZmqPulseSyncReceiver.hpp" + +using namespace std; +using namespace buffer_config; +using namespace assembler_config; + +int main (int argc, char *argv[]) +{ + if (argc != 2) { + cout << endl; + cout << "Usage: jf_assembler [detector_json_filename]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << endl; + + exit(-1); + } + + auto config = BufferUtils::read_json_config(string(argv[1])); + auto const stream_name = "assembler"; + + string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.detector_name + "-"; + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS); + + ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); + RamBuffer ram_buffer(config.detector_name, config.n_modules); + AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO); + + auto sender = BufferUtils::bind_socket( + ctx, config.detector_name, stream_name); + + ImageMetadata meta; + while (true) { + auto pulse_and_sync = receiver.get_next_pulse_id(); + ram_buffer.assemble_image(pulse_and_sync.pulse_id, meta); + + zmq_send(sender, &meta, sizeof(meta), 0); + + stats.record_stats(meta, pulse_and_sync.n_lost_pulses); + } +} diff --git a/jf-assembler/test/CMakeLists.txt b/jf-assembler/test/CMakeLists.txt new file mode 100644 index 0000000..bb240e7 --- /dev/null +++ b/jf-assembler/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(jf-assembler-tests main.cpp) + +target_link_libraries(jf-assembler-tests + jf-assembler-lib + gtest + ) + diff --git a/jf-assembler/test/main.cpp b/jf-assembler/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/jf-assembler/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 53e24c9f1a8316eb4705733aeaac4a83f42361df Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 14:06:57 +0100 Subject: [PATCH 15/21] Reorganize assembler main code --- jf-assembler/src/main.cpp | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp index 22ede02..e1b76ab 100644 --- a/jf-assembler/src/main.cpp +++ b/jf-assembler/src/main.cpp @@ -5,7 +5,6 @@ #include #include -#include "buffer_config.hpp" #include "assembler_config.hpp" #include "ZmqPulseSyncReceiver.hpp" @@ -27,17 +26,15 @@ int main (int argc, char *argv[]) auto config = BufferUtils::read_json_config(string(argv[1])); auto const stream_name = "assembler"; - string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.detector_name + "-"; auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS); + auto sender = BufferUtils::bind_socket( + ctx, config.detector_name, stream_name); ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); RamBuffer ram_buffer(config.detector_name, config.n_modules); AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO); - auto sender = BufferUtils::bind_socket( - ctx, config.detector_name, stream_name); - ImageMetadata meta; while (true) { auto pulse_and_sync = receiver.get_next_pulse_id(); From d3d51d2957298ea76b90b4c864e380ebf289ba37 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 16:57:41 +0100 Subject: [PATCH 16/21] Adjust stats to new stream workings --- sf-stream/include/StreamStats.hpp | 3 +-- sf-stream/src/StreamStats.cpp | 5 +---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/sf-stream/include/StreamStats.hpp b/sf-stream/include/StreamStats.hpp index a91db57..bca5ce0 100644 --- a/sf-stream/include/StreamStats.hpp +++ b/sf-stream/include/StreamStats.hpp @@ -12,7 +12,6 @@ class StreamStats { int image_counter_; int n_corrupted_images_; - int n_sync_lost_images_; std::chrono::time_point stats_interval_start_; void reset_counters(); @@ -23,7 +22,7 @@ public: const std::string &stream_name, const size_t stats_modulo); - void record_stats(const ImageMetadata &meta, const uint32_t n_lost_pulses); + void record_stats(const ImageMetadata &meta); }; diff --git a/sf-stream/src/StreamStats.cpp b/sf-stream/src/StreamStats.cpp index bed448d..7408629 100644 --- a/sf-stream/src/StreamStats.cpp +++ b/sf-stream/src/StreamStats.cpp @@ -19,16 +19,14 @@ StreamStats::StreamStats( void StreamStats::reset_counters() { image_counter_ = 0; - n_sync_lost_images_ = 0; n_corrupted_images_ = 0; stats_interval_start_ = steady_clock::now(); } void StreamStats::record_stats( - const ImageMetadata &meta, const uint32_t n_lost_pulses) + const ImageMetadata &meta) { image_counter_++; - n_sync_lost_images_ += n_lost_pulses; if (!meta.is_good_image) { n_corrupted_images_++; @@ -56,7 +54,6 @@ void StreamStats::print_stats() cout << " "; cout << "n_processed_images=" << image_counter_ << "i"; cout << ",n_corrupted_images=" << n_corrupted_images_ << "i"; - cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; cout << ",repetition_rate=" << rep_rate << "i"; cout << " "; cout << timestamp; From 402a2b978d002bbfa41d7e435e23f7ce77f9f90e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 16:58:07 +0100 Subject: [PATCH 17/21] Rework sf-stream to attach to assembler --- sf-stream/src/main.cpp | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 9ebb111..dbe313a 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -5,10 +5,8 @@ #include #include -#include "buffer_config.hpp" #include "stream_config.hpp" #include "ZmqLiveSender.hpp" -#include "ZmqPulseSyncReceiver.hpp" using namespace std; using namespace buffer_config; @@ -30,22 +28,22 @@ int main (int argc, char *argv[]) // TODO: Add stream_name to config reading - multiple stream definitions. auto config = BufferUtils::read_json_config(string(argv[1])); - string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.detector_name + "-"; auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + auto receiver = BufferUtils::connect_socket( + ctx, config.detector_name, "assembler"); - ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); RamBuffer ram_buffer(config.detector_name, config.n_modules); StreamStats stats(config.detector_name, stream_name, STREAM_STATS_MODULO); ZmqLiveSender sender(ctx, config); ImageMetadata meta; while (true) { - auto pulse_and_sync = receiver.get_next_pulse_id(); - char* data = ram_buffer.read_image(pulse_and_sync.pulse_id, meta); + zmq_recv(receiver, &meta, sizeof(meta), 0); + char* data = ram_buffer.read_image(meta.pulse_id); sender.send(meta, data); - stats.record_stats(meta, pulse_and_sync.n_lost_pulses); + stats.record_stats(meta); } } From 6bbbd734c7f0ec9901a4e4dc3929d477bf635ceb Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 17:20:06 +0100 Subject: [PATCH 18/21] Add jf-live-writer First implementation of the image buffer writer for the Jungfrau --- CMakeLists.txt | 2 +- jf-live-writer/include/BinaryReader.hpp | 28 --- jf-live-writer/include/BufferStats.hpp | 32 +++ jf-live-writer/include/JFH5LiveWriter.hpp | 49 ----- jf-live-writer/include/LiveImageAssembler.hpp | 51 ----- jf-live-writer/include/live_writer_config.hpp | 6 +- jf-live-writer/src/BinaryReader.cpp | 102 --------- jf-live-writer/src/BufferStats.cpp | 63 ++++++ jf-live-writer/src/JFH5LiveWriter.cpp | 133 ------------ jf-live-writer/src/LiveImageAssembler.cpp | 159 -------------- jf-live-writer/src/main.cpp | 195 +++--------------- 11 files changed, 121 insertions(+), 699 deletions(-) delete mode 100644 jf-live-writer/include/BinaryReader.hpp create mode 100644 jf-live-writer/include/BufferStats.hpp delete mode 100644 jf-live-writer/include/JFH5LiveWriter.hpp delete mode 100644 jf-live-writer/include/LiveImageAssembler.hpp delete mode 100644 jf-live-writer/src/BinaryReader.cpp create mode 100644 jf-live-writer/src/BufferStats.cpp delete mode 100644 jf-live-writer/src/JFH5LiveWriter.cpp delete mode 100644 jf-live-writer/src/LiveImageAssembler.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c739d65..63c25dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,4 +34,4 @@ add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") add_subdirectory("sf-stream") add_subdirectory("sf-writer") -#add_subdirectory("jf-live-writer") +add_subdirectory("jf-live-writer") diff --git a/jf-live-writer/include/BinaryReader.hpp b/jf-live-writer/include/BinaryReader.hpp deleted file mode 100644 index 85d2a0c..0000000 --- a/jf-live-writer/include/BinaryReader.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BINARYREADER_HPP -#define SF_DAQ_BUFFER_BINARYREADER_HPP - - -#include - -class BinaryReader { - - const std::string detector_folder_; - const std::string module_name_; - - std::string current_input_file_; - int input_file_fd_; - - void open_file(const std::string& filename); - void close_current_file(); - -public: - BinaryReader(const std::string &detector_folder, - const std::string &module_name); - - ~BinaryReader(); - - void get_frame(const uint64_t pulse_id, BufferBinaryFormat *buffer); -}; - - -#endif //SF_DAQ_BUFFER_BINARYREADER_HPP diff --git a/jf-live-writer/include/BufferStats.hpp b/jf-live-writer/include/BufferStats.hpp new file mode 100644 index 0000000..3aff6cb --- /dev/null +++ b/jf-live-writer/include/BufferStats.hpp @@ -0,0 +1,32 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class BufferStats { + const std::string detector_name_; + const int module_id_; + size_t stats_modulo_; + + int frames_counter_; + uint32_t total_buffer_write_us_; + uint32_t max_buffer_write_us_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + BufferStats( + const std::string &detector_name, + const int module_id, + const size_t stats_modulo); + void start_frame_write(); + void end_frame_write(); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jf-live-writer/include/JFH5LiveWriter.hpp b/jf-live-writer/include/JFH5LiveWriter.hpp deleted file mode 100644 index a417631..0000000 --- a/jf-live-writer/include/JFH5LiveWriter.hpp +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef SFWRITER_HPP -#define SFWRITER_HPP - -#include -#include -#include - -#include "LiveImageAssembler.hpp" - -const auto& H5_UINT64 = H5::PredType::NATIVE_UINT64; -const auto& H5_UINT32 = H5::PredType::NATIVE_UINT32; -const auto& H5_UINT16 = H5::PredType::NATIVE_UINT16; -const auto& H5_UINT8 = H5::PredType::NATIVE_UINT8; - -class JFH5LiveWriter { - - const std::string detector_name_; - const size_t n_modules_; - const size_t n_pulses_; - - size_t write_index_; - - H5::H5File file_; - H5::DataSet image_dataset_; - - uint64_t* b_pulse_id_; - uint64_t* b_frame_index_; - uint32_t* b_daq_rec_; - uint8_t* b_is_good_frame_ ; - - void init_file(const std::string &output_file); - void write_dataset(const std::string name, - const void *buffer, - const H5::PredType &type); - void write_metadata(); - std::string get_detector_name(const std::string& detector_folder); - - void close_file(); - -public: - JFH5LiveWriter(const std::string& output_file, - const std::string& detector_folder, - const size_t n_modules, - const size_t n_pulses); - ~JFH5LiveWriter(); - void write(const ImageMetadata* metadata, const char* data); -}; - -#endif //SFWRITER_HPP diff --git a/jf-live-writer/include/LiveImageAssembler.hpp b/jf-live-writer/include/LiveImageAssembler.hpp deleted file mode 100644 index 5bcb749..0000000 --- a/jf-live-writer/include/LiveImageAssembler.hpp +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP -#define SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP - -#include - -#include "buffer_config.hpp" -#include "formats.hpp" - -const uint64_t IA_EMPTY_SLOT_VALUE = 0; - -struct ImageMetadata -{ - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint8_t is_good_image; -}; - -class LiveImageAssembler { - const size_t n_modules_; - const size_t image_buffer_slot_n_bytes_; - - char* image_buffer_; - ImageMetadata* image_meta_buffer_; - ModuleFrame* frame_meta_buffer_; - std::atomic_int* buffer_status_; - std::atomic_uint64_t* buffer_pulse_id_; - - size_t get_data_offset(const uint64_t slot_id, const int i_module); - size_t get_frame_metadata_offset(const uint64_t slot_id, const int i_module); - -public: - LiveImageAssembler(const size_t n_modules); - - virtual ~LiveImageAssembler(); - - bool is_slot_free(const uint64_t pulse_id); - bool is_slot_full(const uint64_t pulse_id); - - void process(const uint64_t pulse_id, - const int i_module, - const BufferBinaryFormat* block_buffer); - - void free_slot(const uint64_t pulse_id); - - ImageMetadata* get_metadata_buffer(const uint64_t pulse_id); - char* get_data_buffer(const uint64_t pulse_id); -}; - - -#endif //SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP diff --git a/jf-live-writer/include/live_writer_config.hpp b/jf-live-writer/include/live_writer_config.hpp index 0a62457..76d9b05 100644 --- a/jf-live-writer/include/live_writer_config.hpp +++ b/jf-live-writer/include/live_writer_config.hpp @@ -2,8 +2,6 @@ namespace live_writer_config { - // MS to retry reading from the image assembler. - const size_t ASSEMBLER_RETRY_MS = 5; - // Number of slots in the reconstruction buffer. - const size_t WRITER_IA_N_SLOTS = 200; + // N of IO threads to receive data from modules. + const int LIVE_ZMQ_IO_THREADS = 1; } \ No newline at end of file diff --git a/jf-live-writer/src/BinaryReader.cpp b/jf-live-writer/src/BinaryReader.cpp deleted file mode 100644 index 0512ac7..0000000 --- a/jf-live-writer/src/BinaryReader.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "BinaryReader.hpp" - -#include -#include -#include -#include - -#include "BufferUtils.hpp" -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -BinaryReader::BinaryReader( - const std::string &detector_folder, - const std::string &module_name) : - detector_folder_(detector_folder), - module_name_(module_name), - current_input_file_(""), - input_file_fd_(-1) -{} - -BinaryReader::~BinaryReader() -{ - close_current_file(); -} - -void BinaryReader::get_frame( - const uint64_t pulse_id, BufferBinaryFormat* buffer) -{ - - auto current_frame_file = BufferUtils::get_filename( - detector_folder_, module_name_, pulse_id); - - if (current_frame_file != current_input_file_) { - open_file(current_frame_file); - } - - size_t file_index = BufferUtils::get_file_frame_index(pulse_id); - size_t n_bytes_offset = file_index * sizeof(BufferBinaryFormat); - - auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET); - if (lseek_result < 0) { - stringstream err_msg; - - err_msg << "[BinaryReader::get_frame]"; - err_msg << " Error while lseek on file "; - err_msg << current_input_file_ << " for n_bytes_offset "; - err_msg << n_bytes_offset << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes = ::read(input_file_fd_, buffer, sizeof(BufferBinaryFormat)); - - if (n_bytes < sizeof(BufferBinaryFormat)) { - stringstream err_msg; - - err_msg << "[BinaryReader::get_block]"; - err_msg << " Error while reading from file "; - err_msg << current_input_file_ << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } -} - -void BinaryReader::open_file(const std::string& filename) -{ - close_current_file(); - - input_file_fd_ = open(filename.c_str(), O_RDONLY); - - if (input_file_fd_ < 0) { - stringstream err_msg; - - err_msg << "[BinaryReader::open_file]"; - err_msg << " Cannot open file " << filename << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - current_input_file_ = filename; -} - -void BinaryReader::close_current_file() -{ - if (input_file_fd_ != -1) { - if (close(input_file_fd_) < 0) { - stringstream err_msg; - - err_msg << "[BinaryWriter::close_current_file]"; - err_msg << " Error while closing file " << current_input_file_; - err_msg << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - input_file_fd_ = -1; - current_input_file_ = ""; - } -} diff --git a/jf-live-writer/src/BufferStats.cpp b/jf-live-writer/src/BufferStats.cpp new file mode 100644 index 0000000..173a35c --- /dev/null +++ b/jf-live-writer/src/BufferStats.cpp @@ -0,0 +1,63 @@ +#include +#include "BufferStats.hpp" + +using namespace std; +using namespace chrono; + +BufferStats::BufferStats( + const string& detector_name, + const int module_id, + const size_t stats_modulo) : + detector_name_(detector_name), + module_id_(module_id), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void BufferStats::reset_counters() +{ + frames_counter_ = 0; + total_buffer_write_us_ = 0; + max_buffer_write_us_ = 0; +} + +void BufferStats::start_frame_write() +{ + stats_interval_start_ = steady_clock::now(); +} + +void BufferStats::end_frame_write() +{ + frames_counter_++; + + uint32_t write_us_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + total_buffer_write_us_ += write_us_duration; + max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration); + + if (frames_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void BufferStats::print_stats() +{ + float avg_buffer_write_us = total_buffer_write_us_ / frames_counter_; + + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_buffer_writer"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "avg_buffer_write_us=" << avg_buffer_write_us; + cout << ",max_buffer_write_us=" << max_buffer_write_us_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jf-live-writer/src/JFH5LiveWriter.cpp b/jf-live-writer/src/JFH5LiveWriter.cpp deleted file mode 100644 index 5928a6e..0000000 --- a/jf-live-writer/src/JFH5LiveWriter.cpp +++ /dev/null @@ -1,133 +0,0 @@ -#include "JFH5LiveWriter.hpp" - -#include -#include - - -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -JFH5LiveWriter::JFH5LiveWriter(const string& output_file, - const string& detector_folder, - const size_t n_modules, - const size_t n_pulses) : - detector_name_(get_detector_name(detector_folder)), - n_modules_(n_modules), - n_pulses_(n_pulses), - write_index_(0) -{ - b_pulse_id_ = new uint64_t[n_pulses_]; - b_frame_index_= new uint64_t[n_pulses_]; - b_daq_rec_ = new uint32_t[n_pulses_]; - b_is_good_frame_ = new uint8_t[n_pulses_]; - - init_file(output_file); -} - -void JFH5LiveWriter::init_file(const string& output_file) -{ - file_ = H5::H5File(output_file, H5F_ACC_TRUNC); - file_.createGroup("/data"); - file_.createGroup("/data/" + detector_name_); - - H5::DataSpace att_space(H5S_SCALAR); - H5::DataType data_type = H5::StrType(0, H5T_VARIABLE); - - file_.createGroup("/general"); - auto detector_dataset = file_.createDataSet( - "/general/detector_name", data_type ,att_space); - - detector_dataset.write(detector_name_, data_type); - - hsize_t image_dataset_dims[3] = - {n_pulses_, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE}; - - H5::DataSpace image_dataspace(3, image_dataset_dims); - - hsize_t image_dataset_chunking[3] = - {1, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DSetCreatPropList image_dataset_properties; - image_dataset_properties.setChunk(3, image_dataset_chunking); - - image_dataset_ = file_.createDataSet( - "/data/" + detector_name_ + "/data", - H5_UINT16, - image_dataspace, - image_dataset_properties); -} - - -std::string JFH5LiveWriter::get_detector_name(const string& detector_folder) -{ - size_t last_separator; - if ((last_separator = detector_folder.rfind("/")) == string::npos) { - return detector_folder; - } - - return detector_folder.substr(last_separator + 1); -} - -JFH5LiveWriter::~JFH5LiveWriter() -{ - close_file(); - - delete[] b_pulse_id_; - delete[] b_frame_index_; - delete[] b_daq_rec_; - delete[] b_is_good_frame_; -} - -void JFH5LiveWriter::write_dataset( - const string name, const void* buffer, const H5::PredType& type) -{ - hsize_t b_m_dims[] = {n_pulses_}; - H5::DataSpace b_m_space (1, b_m_dims); - - hsize_t f_m_dims[] = {n_pulses_, 1}; - H5::DataSpace f_m_space(2, f_m_dims); - - auto complete_name = "/data/" + detector_name_ + "/" + name; - auto dataset = file_.createDataSet(complete_name, type, f_m_space); - - dataset.write(buffer, type, b_m_space, f_m_space); - - dataset.close(); -} - -void JFH5LiveWriter::write_metadata() -{ - write_dataset("pulse_id", &b_pulse_id_, H5_UINT64); - write_dataset("frame_index", &b_frame_index_, H5_UINT64); - write_dataset("daq_rec", &b_daq_rec_, H5_UINT32); - write_dataset("is_good_frame", &b_is_good_frame_, H5_UINT8); -} - -void JFH5LiveWriter::close_file() -{ - if (file_.getId() == -1) { - return; - } - - image_dataset_.close(); - - write_metadata(); - - file_.close(); -} - -void JFH5LiveWriter::write(const ImageMetadata* metadata, const char* data) -{ - hsize_t offset[] = {write_index_, 0, 0}; - - H5DOwrite_chunk(image_dataset_.getId(), H5P_DEFAULT, 0, - offset, MODULE_N_BYTES * n_modules_, data); - - b_pulse_id_[write_index_] = metadata->pulse_id; - b_frame_index_[write_index_] = metadata->frame_index; - b_daq_rec_[write_index_] = metadata->daq_rec; - b_is_good_frame_[write_index_] = metadata->is_good_image; - - write_index_++; -} diff --git a/jf-live-writer/src/LiveImageAssembler.cpp b/jf-live-writer/src/LiveImageAssembler.cpp deleted file mode 100644 index 57cf48b..0000000 --- a/jf-live-writer/src/LiveImageAssembler.cpp +++ /dev/null @@ -1,159 +0,0 @@ -#include - -#include "LiveImageAssembler.hpp" -#include "buffer_config.hpp" -#include "live_writer_config.hpp" - -using namespace std; -using namespace buffer_config; -using namespace live_writer_config; - -LiveImageAssembler::LiveImageAssembler(const size_t n_modules) : - n_modules_(n_modules), - image_buffer_slot_n_bytes_(MODULE_N_BYTES * n_modules_) -{ - image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_]; - image_meta_buffer_ = new ImageMetadata[WRITER_IA_N_SLOTS]; - frame_meta_buffer_ = new ModuleFrame[WRITER_IA_N_SLOTS * n_modules]; - buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS]; - buffer_pulse_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS]; - - for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) { - free_slot(i); - } -} - -LiveImageAssembler::~LiveImageAssembler() -{ - delete[] image_buffer_; - delete[] image_meta_buffer_; -} - -bool LiveImageAssembler::is_slot_free(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - uint64_t slot_pulse_id = IA_EMPTY_SLOT_VALUE; - if (buffer_pulse_id_[slot_id].compare_exchange_strong( - slot_pulse_id, pulse_id)) { - return true; - } - - auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0; - return is_free && (slot_pulse_id == pulse_id); -} - -bool LiveImageAssembler::is_slot_full(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - return buffer_status_[slot_id].load(memory_order_relaxed) == 0; -} - -size_t LiveImageAssembler::get_data_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_; - size_t module_i_offset = i_module * MODULE_N_BYTES; - - return slot_i_offset + module_i_offset; -} - -size_t LiveImageAssembler::get_frame_metadata_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_m_offset = slot_id * n_modules_; - size_t module_m_offset = i_module; - - return slot_m_offset + module_m_offset; -} - -void LiveImageAssembler::process( - const uint64_t pulse_id, - const int i_module, - const BufferBinaryFormat* file_buffer) -{ - const auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - auto frame_meta_offset = get_frame_metadata_offset(slot_id, i_module); - auto image_offset = get_data_offset(slot_id, i_module); - - memcpy( - &(frame_meta_buffer_[frame_meta_offset]), - &(file_buffer->metadata), - sizeof(file_buffer->metadata)); - - memcpy( - image_buffer_ + image_offset, - &(file_buffer->data[0]), - MODULE_N_BYTES); - - buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed); -} - -void LiveImageAssembler::free_slot(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - buffer_status_[slot_id].store(n_modules_, memory_order_relaxed); - buffer_pulse_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed); -} - -ImageMetadata* LiveImageAssembler::get_metadata_buffer(const uint64_t pulse_id) -{ - const auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - ImageMetadata& image_meta = image_meta_buffer_[slot_id]; - - auto frame_meta_offset = get_frame_metadata_offset(slot_id, 0); - - auto is_pulse_init = false; - image_meta.is_good_image = 1; - image_meta.pulse_id = 0; - - for (size_t i_module=0; i_module < n_modules_; i_module++) { - - auto& frame_meta = frame_meta_buffer_[frame_meta_offset]; - frame_meta_offset += 1; - - auto is_good_frame = - frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME; - - if (!is_good_frame) { - image_meta.pulse_id = 0; - continue; - } - - if (!is_pulse_init) { - image_meta.pulse_id = frame_meta.pulse_id; - image_meta.frame_index = frame_meta.frame_index; - image_meta.daq_rec = frame_meta.daq_rec; - - is_pulse_init = true; - } - - if (image_meta.is_good_image == 1) { - if (frame_meta.pulse_id != image_meta.pulse_id) { - image_meta.is_good_image = 0; - } - - if (frame_meta.frame_index != image_meta.frame_index) { - image_meta.is_good_image = 0; - } - - if (frame_meta.daq_rec != image_meta.daq_rec) { - image_meta.is_good_image = 0; - } - - if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) { - image_meta.is_good_image = 0; - } - } - } - - return &image_meta; -} - -char* LiveImageAssembler::get_data_buffer(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_); -} diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp index 139a34f..1b912f8 100644 --- a/jf-live-writer/src/main.cpp +++ b/jf-live-writer/src/main.cpp @@ -1,195 +1,46 @@ #include #include -#include -#include -#include - -#include "zmq.h" +#include +#include +#include #include "live_writer_config.hpp" -#include "buffer_config.hpp" -#include "bitshuffle/bitshuffle.h" -#include "JFH5LiveWriter.hpp" -#include "LiveImageAssembler.hpp" -#include "BinaryReader.hpp" +#include "../../jf-buffer-writer/include/BufferStats.hpp" + using namespace std; -using namespace chrono; using namespace buffer_config; using namespace live_writer_config; -void read_buffer( - const string detector_folder, - const string module_name, - const int i_module, - const vector& pulse_ids_to_write, - LiveImageAssembler& image_assembler, - void* ctx) -{ - BinaryReader reader(detector_folder, module_name); - auto frame_buffer = new BufferBinaryFormat(); - - void* socket = zmq_socket(ctx, ZMQ_SUB); - if (socket == nullptr) { - throw runtime_error(zmq_strerror(errno)); - } - - int rcvhwm = 100; - if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - int linger = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - // In milliseconds. - int rcvto = 2000; - if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){ - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const uint64_t PULSE_ID_DELAY = 100; - - uint64_t live_pulse_id = pulse_ids_to_write.front(); - for (uint64_t pulse_id:pulse_ids_to_write) { - - while(!image_assembler.is_slot_free(pulse_id)) { - this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS)); - } - - auto start_time = steady_clock::now(); - - // Enforce a delay of 1 second for writing. - while (live_pulse_id - pulse_id < PULSE_ID_DELAY) { - if (zmq_recv(socket, &live_pulse_id, - sizeof(live_pulse_id), 0) == -1) { - if (errno == EAGAIN) { - throw runtime_error("Did not receive pulse_id in time."); - } else { - throw runtime_error(zmq_strerror(errno)); - } - } - } - - reader.get_frame(pulse_id, frame_buffer); - - auto end_time = steady_clock::now(); - uint64_t read_us_duration = duration_cast( - end_time-start_time).count(); - - start_time = steady_clock::now(); - - image_assembler.process(pulse_id, i_module, frame_buffer); - - end_time = steady_clock::now(); - uint64_t compose_us_duration = duration_cast( - end_time-start_time).count(); - - cout << "sf_writer:avg_read_us "; - cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; - cout << "sf_writer:avg_assemble_us "; - cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - delete frame_buffer; -} - int main (int argc, char *argv[]) { - if (argc != 7) { + if (argc != 3) { cout << endl; - cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]"; - cout << " [start_pulse_id] [n_pulses] [pulse_id_step]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tdetector_folder: Absolute path to detector buffer." << endl; - cout << "\tn_modules: number of modules" << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tn_pulses: Number of pulses to write." << endl; - cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl; + cout << "Usage: jf_live_writer [detector_json_filename]" + " [stream_name]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; exit(-1); } - string output_file = string(argv[1]); - const string detector_folder = string(argv[2]); - size_t n_modules = atoi(argv[3]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); - size_t n_pulses = (size_t) atoll(argv[5]); - int pulse_id_step = atoi(argv[6]); - - std::vector pulse_ids_to_write; - uint64_t i_pulse_id = start_pulse_id; - for (size_t i=0; i reading_threads(n_modules); - for (size_t i_module=0; i_module( - end_time-start_time).count(); - - image_assembler.free_slot(pulse_id); - - cout << "sf_writer:avg_write_us "; - cout << write_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - for (auto& reading_thread : reading_threads) { - if (reading_thread.joinable()) { - reading_thread.join(); - } - } - - return 0; } From 2a9e4f2b7c8b450f8a888dbe45864d4f09c60eb3 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 18:35:28 +0100 Subject: [PATCH 19/21] Implementation of WriterStats for the live writer --- .../{BufferStats.hpp => WriterStats.hpp} | 12 ++++----- .../src/{BufferStats.cpp => WriterStats.cpp} | 26 +++++++++---------- 2 files changed, 17 insertions(+), 21 deletions(-) rename jf-live-writer/include/{BufferStats.hpp => WriterStats.hpp} (75%) rename jf-live-writer/src/{BufferStats.cpp => WriterStats.cpp} (66%) diff --git a/jf-live-writer/include/BufferStats.hpp b/jf-live-writer/include/WriterStats.hpp similarity index 75% rename from jf-live-writer/include/BufferStats.hpp rename to jf-live-writer/include/WriterStats.hpp index 3aff6cb..cb023a7 100644 --- a/jf-live-writer/include/BufferStats.hpp +++ b/jf-live-writer/include/WriterStats.hpp @@ -6,12 +6,11 @@ #define SF_DAQ_BUFFER_FRAMESTATS_HPP -class BufferStats { +class WriterStats { const std::string detector_name_; - const int module_id_; size_t stats_modulo_; - int frames_counter_; + int image_counter_; uint32_t total_buffer_write_us_; uint32_t max_buffer_write_us_; std::chrono::time_point stats_interval_start_; @@ -20,12 +19,11 @@ class BufferStats { void print_stats(); public: - BufferStats( + WriterStats( const std::string &detector_name, - const int module_id, const size_t stats_modulo); - void start_frame_write(); - void end_frame_write(); + void start_image_write(); + void end_image_write(); }; diff --git a/jf-live-writer/src/BufferStats.cpp b/jf-live-writer/src/WriterStats.cpp similarity index 66% rename from jf-live-writer/src/BufferStats.cpp rename to jf-live-writer/src/WriterStats.cpp index 173a35c..1d67947 100644 --- a/jf-live-writer/src/BufferStats.cpp +++ b/jf-live-writer/src/WriterStats.cpp @@ -1,35 +1,33 @@ #include -#include "BufferStats.hpp" +#include "WriterStats.hpp" using namespace std; using namespace chrono; -BufferStats::BufferStats( +WriterStats::WriterStats( const string& detector_name, - const int module_id, const size_t stats_modulo) : detector_name_(detector_name), - module_id_(module_id), stats_modulo_(stats_modulo) { reset_counters(); } -void BufferStats::reset_counters() +void WriterStats::reset_counters() { - frames_counter_ = 0; + image_counter_ = 0; total_buffer_write_us_ = 0; max_buffer_write_us_ = 0; } -void BufferStats::start_frame_write() +void WriterStats::start_image_write() { stats_interval_start_ = steady_clock::now(); } -void BufferStats::end_frame_write() +void WriterStats::end_image_write() { - frames_counter_++; + image_counter_++; uint32_t write_us_duration = duration_cast( steady_clock::now()-stats_interval_start_).count(); @@ -37,15 +35,15 @@ void BufferStats::end_frame_write() total_buffer_write_us_ += write_us_duration; max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration); - if (frames_counter_ == stats_modulo_) { + if (image_counter_ == stats_modulo_) { print_stats(); reset_counters(); } } -void BufferStats::print_stats() +void WriterStats::print_stats() { - float avg_buffer_write_us = total_buffer_write_us_ / frames_counter_; + float avg_buffer_write_us = total_buffer_write_us_ / image_counter_; uint64_t timestamp = time_point_cast( system_clock::now()).time_since_epoch().count(); @@ -53,9 +51,9 @@ void BufferStats::print_stats() // Output in InfluxDB line protocol cout << "jf_buffer_writer"; cout << ",detector_name=" << detector_name_; - cout << ",module_name=M" << module_id_; cout << " "; - cout << "avg_buffer_write_us=" << avg_buffer_write_us; + cout << "n_written_images=" << image_counter_ << "i"; + cout << " ,avg_buffer_write_us=" << avg_buffer_write_us; cout << ",max_buffer_write_us=" << max_buffer_write_us_ << "i"; cout << " "; cout << timestamp; From 94749585d892ad343b42b0bd6920686e884def92 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 19:09:19 +0100 Subject: [PATCH 20/21] Add ImageBinaryWriter implementation --- jf-live-writer/include/ImageBinaryWriter.hpp | 33 ++++ jf-live-writer/src/ImageBinaryWriter.cpp | 165 +++++++++++++++++++ 2 files changed, 198 insertions(+) create mode 100644 jf-live-writer/include/ImageBinaryWriter.hpp create mode 100644 jf-live-writer/src/ImageBinaryWriter.cpp diff --git a/jf-live-writer/include/ImageBinaryWriter.hpp b/jf-live-writer/include/ImageBinaryWriter.hpp new file mode 100644 index 0000000..8e6ebfb --- /dev/null +++ b/jf-live-writer/include/ImageBinaryWriter.hpp @@ -0,0 +1,33 @@ +#ifndef BINARYWRITER_HPP +#define BINARYWRITER_HPP + +#include + +#include "formats.hpp" + +class ImageBinaryWriter { + + const size_t MAX_FILE_BYTES = + buffer_config::FILE_MOD * sizeof(BufferBinaryFormat); + + const std::string detector_folder_; + std::string latest_filename_; + + std::string current_output_filename_; + int output_file_fd_; + + void open_file(const std::string& filename); + void close_current_file(); + + +public: + ImageBinaryWriter(const std::string& detector_folder); + + virtual ~ImageBinaryWriter(); + + void write(const uint64_t pulse_id, const BufferBinaryFormat* buffer); + +}; + + +#endif //BINARYWRITER_HPP diff --git a/jf-live-writer/src/ImageBinaryWriter.cpp b/jf-live-writer/src/ImageBinaryWriter.cpp new file mode 100644 index 0000000..c3f70f5 --- /dev/null +++ b/jf-live-writer/src/ImageBinaryWriter.cpp @@ -0,0 +1,165 @@ +#include "ImageBinaryWriter.hpp" + +#include +#include +#include "date.h" +#include +#include +#include +#include + +#include "BufferUtils.hpp" + +using namespace std; + +ImageBinaryWriter::ImageBinaryWriter( + const string& detector_folder): + detector_folder_(detector_folder), + latest_filename_(detector_folder + "/LATEST"), + current_output_filename_(""), + output_file_fd_(-1) +{ +} + +ImageBinaryWriter::~ImageBinaryWriter() +{ + close_current_file(); +} + +void ImageBinaryWriter::write( + const uint64_t pulse_id, + const BufferBinaryFormat* buffer) +{ + auto current_frame_file = + BufferUtils::get_filename(detector_folder_, module_name_, pulse_id); + + if (current_frame_file != current_output_filename_) { + open_file(current_frame_file); + } + + size_t n_bytes_offset = + BufferUtils::get_file_frame_index(pulse_id) * + sizeof(BufferBinaryFormat); + + auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET); + if (lseek_result < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::write]"; + err_msg << " Error while lseek on file "; + err_msg << current_output_filename_; + err_msg << " for n_bytes_offset "; + err_msg << n_bytes_offset << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + auto n_bytes = ::write(output_file_fd_, buffer, sizeof(BufferBinaryFormat)); + if (n_bytes < sizeof(BufferBinaryFormat)) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::write]"; + err_msg << " Error while writing to file "; + err_msg << current_output_filename_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } +} + +void ImageBinaryWriter::open_file(const std::string& filename) +{ + close_current_file(); + + BufferUtils::create_destination_folder(filename); + + output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT, + S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); + if (output_file_fd_ < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BinaryWriter::open_file]"; + err_msg << " Cannot create file "; + err_msg << filename << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + // TODO: Remove context if test successful. + + /** Setting the buffer file size in advance to try to lower the number of + metadata updates on GPFS. */ + { + // TODO: Try instead to use fallocate. + if (lseek(output_file_fd_, MAX_FILE_BYTES, SEEK_SET) < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::open_file]"; + err_msg << " Error while lseek on end of file "; + err_msg << current_output_filename_; + err_msg << " for MAX_FILE_BYTES "; + err_msg << MAX_FILE_BYTES << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + const uint8_t mark = 255; + if(::write(output_file_fd_, &mark, sizeof(mark)) != sizeof(mark)) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::open_file]"; + err_msg << " Error while writing to file "; + err_msg << current_output_filename_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + } + + + current_output_filename_ = filename; +} + +void ImageBinaryWriter::close_current_file() +{ + if (output_file_fd_ != -1) { + if (close(output_file_fd_) < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BufferBinaryWriter::close_current_file]"; + err_msg << " Error while closing file "; + err_msg << current_output_filename_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + output_file_fd_ = -1; + + BufferUtils::update_latest_file( + latest_filename_, current_output_filename_); + + current_output_filename_ = ""; + } +} \ No newline at end of file From aba739ce877f23e70dc33938bdbb3e2b5e387783 Mon Sep 17 00:00:00 2001 From: Dmitry Ozerov Date: Tue, 16 Feb 2021 16:25:28 +0100 Subject: [PATCH 21/21] print statistics from udp writer every N seconds (not N frames) check that frame packets sending is finished (new frame) by new trigger number, not pulse_id check if pulse_id of the frame is more or less correct (in case not - do not store that frame in the ram buffer) --- core-buffer/include/buffer_config.hpp | 6 ++++-- jf-udp-recv/include/FrameStats.hpp | 7 ++++--- jf-udp-recv/src/FrameStats.cpp | 18 +++++++++++++---- jf-udp-recv/src/FrameUdpReceiver.cpp | 6 +++--- jf-udp-recv/src/main.cpp | 28 +++++++++++++++++++++++---- 5 files changed, 49 insertions(+), 16 deletions(-) diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 293308b..b2e68ca 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -20,8 +20,10 @@ namespace buffer_config { const size_t FOLDER_MOD = 100000; // Extension of our file format. const std::string FILE_EXTENSION = ".bin"; - // Number of pulses between each statistics print out. - const size_t STATS_MODULO = 100; + // Number of pulses between each statistics print out (buffer_writer, stream2vis...) + const size_t STATS_MODULO = 1000; + // Number of seconds after which statistics is print out (udp_recv) + const size_t STATS_TIME = 10; // If the RB is empty, how much time to wait before trying to read it again. const size_t RB_READ_RETRY_INTERVAL_MS = 5; // How many frames to read at once from file. diff --git a/jf-udp-recv/include/FrameStats.hpp b/jf-udp-recv/include/FrameStats.hpp index dd4ef95..7839a38 100644 --- a/jf-udp-recv/include/FrameStats.hpp +++ b/jf-udp-recv/include/FrameStats.hpp @@ -9,11 +9,12 @@ class FrameStats { const std::string detector_name_; const int module_id_; - size_t stats_modulo_; + size_t stats_time_; int frames_counter_; int n_missed_packets_; int n_corrupted_frames_; + int n_corrupted_pulse_id_; std::chrono::time_point stats_interval_start_; void reset_counters(); @@ -22,8 +23,8 @@ class FrameStats { public: FrameStats(const std::string &detector_name, const int module_id, - const size_t stats_modulo); - void record_stats(const ModuleFrame &meta); + const size_t stats_time); + void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); }; diff --git a/jf-udp-recv/src/FrameStats.cpp b/jf-udp-recv/src/FrameStats.cpp index 103e4fe..28161c7 100644 --- a/jf-udp-recv/src/FrameStats.cpp +++ b/jf-udp-recv/src/FrameStats.cpp @@ -7,10 +7,10 @@ using namespace chrono; FrameStats::FrameStats( const std::string &detector_name, const int module_id, - const size_t stats_modulo) : + const size_t stats_time) : detector_name_(detector_name), module_id_(module_id), - stats_modulo_(stats_modulo) + stats_time_(stats_time) { reset_counters(); } @@ -20,11 +20,17 @@ void FrameStats::reset_counters() frames_counter_ = 0; n_missed_packets_ = 0; n_corrupted_frames_ = 0; + n_corrupted_pulse_id_ = 0; stats_interval_start_ = steady_clock::now(); } -void FrameStats::record_stats(const ModuleFrame &meta) +void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) { + + if (bad_pulse_id) { + n_corrupted_pulse_id_++; + } + if (meta.n_recv_packets < JF_N_PACKETS_PER_FRAME) { n_missed_packets_ += JF_N_PACKETS_PER_FRAME - meta.n_recv_packets; n_corrupted_frames_++; @@ -32,7 +38,10 @@ void FrameStats::record_stats(const ModuleFrame &meta) frames_counter_++; - if (frames_counter_ == stats_modulo_) { + auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { print_stats(); reset_counters(); } @@ -55,6 +64,7 @@ void FrameStats::print_stats() cout << "n_missed_packets=" << n_missed_packets_ << "i"; cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; cout << ",repetition_rate=" << rep_rate << "i"; + cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; cout << " "; cout << timestamp; cout << endl; diff --git a/jf-udp-recv/src/FrameUdpReceiver.cpp b/jf-udp-recv/src/FrameUdpReceiver.cpp index 1c0833b..cb78a7a 100644 --- a/jf-udp-recv/src/FrameUdpReceiver.cpp +++ b/jf-udp-recv/src/FrameUdpReceiver.cpp @@ -63,8 +63,8 @@ inline uint64_t FrameUdpReceiver::process_packets( init_frame(metadata, i_packet); // Happens if the last packet from the previous frame gets lost. - // In the jungfrau_packet, pulse_id is called bunchid. - } else if (metadata.pulse_id != packet_buffer_[i_packet].bunchid) { + // In the jungfrau_packet, framenum is the trigger number (how many triggers from detector power-on) happened + } else if (metadata.frame_index != packet_buffer_[i_packet].framenum) { packet_buffer_loaded_ = true; // Continue on this packet. packet_buffer_offset_ = i_packet; @@ -134,4 +134,4 @@ uint64_t FrameUdpReceiver::get_frame_from_udp( return pulse_id; } } -} \ No newline at end of file +} diff --git a/jf-udp-recv/src/main.cpp b/jf-udp-recv/src/main.cpp index 2194b80..bca57e2 100644 --- a/jf-udp-recv/src/main.cpp +++ b/jf-udp-recv/src/main.cpp @@ -33,7 +33,7 @@ int main (int argc, char *argv[]) { const auto udp_port = config.start_udp_port + module_id; FrameUdpReceiver receiver(udp_port, module_id); RamBuffer buffer(config.detector_name, config.n_modules); - FrameStats stats(config.detector_name, module_id, STATS_MODULO); + FrameStats stats(config.detector_name, module_id, STATS_TIME); auto ctx = zmq_ctx_new(); auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); @@ -41,14 +41,34 @@ int main (int argc, char *argv[]) { ModuleFrame meta; char* data = new char[MODULE_N_BYTES]; + uint64_t pulse_id_previous = 0; + uint64_t frame_index_previous = 0; + while (true) { + auto pulse_id = receiver.get_frame_from_udp(meta, data); - buffer.write_frame(meta, data); + bool bad_pulse_id = false; - zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + if ( ( meta.frame_index != (frame_index_previous+1) ) || + ( (pulse_id-pulse_id_previous) < 0 ) || + ( (pulse_id-pulse_id_previous) > 1000 ) ) { + + bad_pulse_id = true; + + } else { + + buffer.write_frame(meta, data); + + zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); + + } + + stats.record_stats(meta, bad_pulse_id); + + pulse_id_previous = pulse_id; + frame_index_previous = meta.frame_index; - stats.record_stats(meta); } delete[] data;