From 95be6b25e205bd48ad2f928d49f0216476b8f241 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 2 Jul 2021 14:13:56 +0200 Subject: [PATCH 01/23] Remove Eiger module n_bytes and calculate correctly --- core-buffer/include/eiger.hpp | 1 - jf-udp-recv/src/main.cpp | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core-buffer/include/eiger.hpp b/core-buffer/include/eiger.hpp index 2038d00..6eee1ea 100644 --- a/core-buffer/include/eiger.hpp +++ b/core-buffer/include/eiger.hpp @@ -19,7 +19,6 @@ #define MODULE_Y_SIZE 512 #define MODULE_N_PIXELS 131072 #define PIXEL_N_BYTES 2 -#define MODULE_N_BYTES 262144 #define GAP_X_MODULE_PIXELS 2 #define GAP_Y_MODULE_PIXELS 2 #define GAP_X_EIGERMOD_PIXELS 8 diff --git a/jf-udp-recv/src/main.cpp b/jf-udp-recv/src/main.cpp index 027fce4..b2e84fc 100644 --- a/jf-udp-recv/src/main.cpp +++ b/jf-udp-recv/src/main.cpp @@ -47,8 +47,7 @@ int main (int argc, char *argv[]) { auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); ModuleFrame meta; - // TODO: This will not work. Only if Eiger sends in 16 bit. Use MODULE_N_PIXELS * bit_depth / 8 - char* data = new char[MODULE_N_BYTES]; + char* data = new char[MODULE_N_PIXELS * bit_depth / 8]; uint64_t pulse_id_previous = 0; uint64_t frame_index_previous = 0; From ce93dce7cf7379bcf22982ca9b05c1ebbd5f6c13 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 2 Jul 2021 14:17:38 +0200 Subject: [PATCH 02/23] Removed MODULE_N_BYTES requirement --- CMakeLists.txt | 4 ++-- core-buffer/include/formats.hpp | 32 ++++++++++++++++---------------- jf-assembler/src/main.cpp | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 74c7972..07b6e21 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,10 +32,10 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("jf-udp-recv") -add_subdirectory("jf-buffer-writer") +#add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") add_subdirectory("sf-stream") -add_subdirectory("sf-writer") +#add_subdirectory("sf-writer") if(BUILD_JF_LIVE_WRITER) add_subdirectory("jf-live-writer") diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index ffc065e..d8de5aa 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -40,22 +40,22 @@ struct ModuleFrameBuffer { ModuleFrame module[N_MODULES]; }; -#pragma pack(push) -#pragma pack(1) -struct BufferBinaryFormat { - const char FORMAT_MARKER = 0xBE; - ModuleFrame meta; - char data[MODULE_N_BYTES]; -}; -#pragma pack(pop) +//#pragma pack(push) +//#pragma pack(1) +//struct BufferBinaryFormat { +// const char FORMAT_MARKER = 0xBE; +// ModuleFrame meta; +// char data[MODULE_N_BYTES]; +//}; +//#pragma pack(pop) -#pragma pack(push) -#pragma pack(1) -struct BufferBinaryBlock -{ - BufferBinaryFormat frame[buffer_config::BUFFER_BLOCK_SIZE]; - uint64_t start_pulse_id; -}; -#pragma pack(pop) +//#pragma pack(push) +//#pragma pack(1) +//struct BufferBinaryBlock +//{ +// BufferBinaryFormat frame[buffer_config::BUFFER_BLOCK_SIZE]; +// uint64_t start_pulse_id; +//}; +//#pragma pack(pop) #endif //SF_DAQ_BUFFER_FORMATS_HPP diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp index 870f951..9ccf79d 100644 --- a/jf-assembler/src/main.cpp +++ b/jf-assembler/src/main.cpp @@ -57,7 +57,7 @@ int main (int argc, char *argv[]) EigerAssembler assembler(config.n_submodules, bit_depth); RamBuffer frame_buffer(config.detector_name, - sizeof(ModuleFrame), MODULE_N_BYTES, config.n_modules); + sizeof(ModuleFrame), MODULE_N_PIXELS * bit_depth, config.n_modules); RamBuffer image_buffer(config.detector_name + "_" + stream_name, sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1); From 7879c3c3ea8402580b6f1c3e122f7bca3d3b08e1 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 2 Jul 2021 14:41:48 +0200 Subject: [PATCH 03/23] Move JF specific receiver to generic --- CMakeLists.txt | 2 +- jf-udp-recv/CMakeLists.txt | 26 ------------------- std-udp-recv/CMakeLists.txt | 26 +++++++++++++++++++ {jf-udp-recv => std-udp-recv}/README.md | 0 .../include/FrameStats.hpp | 0 .../include/FrameUdpReceiver.hpp | 0 .../include/PacketUdpReceiver.hpp | 0 .../src/FrameStats.cpp | 0 .../src/FrameUdpReceiver.cpp | 0 .../src/PacketUdpReceiver.cpp | 0 {jf-udp-recv => std-udp-recv}/src/main.cpp | 15 +++++------ .../test/CMakeLists.txt | 0 {jf-udp-recv => std-udp-recv}/test/main.cpp | 0 .../test/mock/udp.hpp | 0 .../test/test_FrameUdpReceiver.cpp | 0 .../test/test_PacketUdpReceiver.cpp | 0 16 files changed, 33 insertions(+), 36 deletions(-) delete mode 100644 jf-udp-recv/CMakeLists.txt create mode 100644 std-udp-recv/CMakeLists.txt rename {jf-udp-recv => std-udp-recv}/README.md (100%) rename {jf-udp-recv => std-udp-recv}/include/FrameStats.hpp (100%) rename {jf-udp-recv => std-udp-recv}/include/FrameUdpReceiver.hpp (100%) rename {jf-udp-recv => std-udp-recv}/include/PacketUdpReceiver.hpp (100%) rename {jf-udp-recv => std-udp-recv}/src/FrameStats.cpp (100%) rename {jf-udp-recv => std-udp-recv}/src/FrameUdpReceiver.cpp (100%) rename {jf-udp-recv => std-udp-recv}/src/PacketUdpReceiver.cpp (100%) rename {jf-udp-recv => std-udp-recv}/src/main.cpp (79%) rename {jf-udp-recv => std-udp-recv}/test/CMakeLists.txt (100%) rename {jf-udp-recv => std-udp-recv}/test/main.cpp (100%) rename {jf-udp-recv => std-udp-recv}/test/mock/udp.hpp (100%) rename {jf-udp-recv => std-udp-recv}/test/test_FrameUdpReceiver.cpp (100%) rename {jf-udp-recv => std-udp-recv}/test/test_PacketUdpReceiver.cpp (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 07b6e21..4365202 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,7 +31,7 @@ add_subdirectory( EXCLUDE_FROM_ALL) add_subdirectory("core-buffer") -add_subdirectory("jf-udp-recv") +add_subdirectory("std-udp-recv") #add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") add_subdirectory("sf-stream") diff --git a/jf-udp-recv/CMakeLists.txt b/jf-udp-recv/CMakeLists.txt deleted file mode 100644 index 3667afb..0000000 --- a/jf-udp-recv/CMakeLists.txt +++ /dev/null @@ -1,26 +0,0 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(jf-udp-recv-lib STATIC ${SOURCES}) -target_include_directories(jf-udp-recv-lib PUBLIC include/) -target_link_libraries(jf-udp-recv-lib - external - core-buffer-lib) - -add_executable(jf-udp-recv src/main.cpp) - -if (USE_EIGER) - set (LIB_NAME_UDP_RECV "eiger_udp_recv") -else() - set (LIB_NAME_UDP_RECV "jf_udp_recv") -endif() - -set_target_properties(jf-udp-recv PROPERTIES OUTPUT_NAME ${LIB_NAME_UDP_RECV}) - -target_link_libraries(jf-udp-recv - jf-udp-recv-lib - zmq - rt) - -enable_testing() -add_subdirectory(test/) diff --git a/std-udp-recv/CMakeLists.txt b/std-udp-recv/CMakeLists.txt new file mode 100644 index 0000000..324e603 --- /dev/null +++ b/std-udp-recv/CMakeLists.txt @@ -0,0 +1,26 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(std-udp-recv-lib STATIC ${SOURCES}) +target_include_directories(std-udp-recv-lib PUBLIC include/) +target_link_libraries(std-udp-recv-lib + external + core-buffer-lib) + +add_executable(std-udp-recv src/main.cpp) + +if (USE_EIGER) + set (LIB_NAME_UDP_RECV "eiger_udp_recv") +else() + set (LIB_NAME_UDP_RECV "jf_udp_recv") +endif() + +set_target_properties(std-udp-recv PROPERTIES OUTPUT_NAME ${LIB_NAME_UDP_RECV}) + +target_link_libraries(std-udp-recv + std-udp-recv-lib + zmq + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/jf-udp-recv/README.md b/std-udp-recv/README.md similarity index 100% rename from jf-udp-recv/README.md rename to std-udp-recv/README.md diff --git a/jf-udp-recv/include/FrameStats.hpp b/std-udp-recv/include/FrameStats.hpp similarity index 100% rename from jf-udp-recv/include/FrameStats.hpp rename to std-udp-recv/include/FrameStats.hpp diff --git a/jf-udp-recv/include/FrameUdpReceiver.hpp b/std-udp-recv/include/FrameUdpReceiver.hpp similarity index 100% rename from jf-udp-recv/include/FrameUdpReceiver.hpp rename to std-udp-recv/include/FrameUdpReceiver.hpp diff --git a/jf-udp-recv/include/PacketUdpReceiver.hpp b/std-udp-recv/include/PacketUdpReceiver.hpp similarity index 100% rename from jf-udp-recv/include/PacketUdpReceiver.hpp rename to std-udp-recv/include/PacketUdpReceiver.hpp diff --git a/jf-udp-recv/src/FrameStats.cpp b/std-udp-recv/src/FrameStats.cpp similarity index 100% rename from jf-udp-recv/src/FrameStats.cpp rename to std-udp-recv/src/FrameStats.cpp diff --git a/jf-udp-recv/src/FrameUdpReceiver.cpp b/std-udp-recv/src/FrameUdpReceiver.cpp similarity index 100% rename from jf-udp-recv/src/FrameUdpReceiver.cpp rename to std-udp-recv/src/FrameUdpReceiver.cpp diff --git a/jf-udp-recv/src/PacketUdpReceiver.cpp b/std-udp-recv/src/PacketUdpReceiver.cpp similarity index 100% rename from jf-udp-recv/src/PacketUdpReceiver.cpp rename to std-udp-recv/src/PacketUdpReceiver.cpp diff --git a/jf-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp similarity index 79% rename from jf-udp-recv/src/main.cpp rename to std-udp-recv/src/main.cpp index b2e84fc..4db9263 100644 --- a/jf-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -20,27 +20,24 @@ int main (int argc, char *argv[]) { if (argc != 4) { cout << endl; - #ifndef USE_EIGER - cout << "Usage: jf_udp_recv [detector_json_filename] [module_id] [bit_depth]"; - #else - cout << "Usage: eiger_udp_recv [detector_json_filename] [module_id] [bit_depth]"; - #endif + + cout << "Usage: std_udp_recv [udp_recv_config_filename] [module_id] [bit_depth]"; cout << endl; - cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tudp_recv_config_filename: detector config file path." << endl; cout << "\tmodule_id: id of the module for this process." << endl; cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; cout << endl; exit(-1); } - const auto config = read_json_config(string(argv[1])); + const auto config = UdpRecvConfig.from_json_file(string(argv[1])); const int module_id = atoi(argv[2]); const int bit_depth = atoi(argv[3]); const int n_receivers = config.n_modules * config.n_submodules; const auto udp_port = config.start_udp_port + module_id; FrameUdpReceiver receiver(module_id, udp_port, n_receivers, config.n_submodules, bit_depth); - RamBuffer buffer(config.detector_name, n_receivers, config.n_submodules, bit_depth); + RamBuffer frame_buffer(config.detector_name, n_receivers, config.n_submodules, bit_depth); FrameStats stats(config.detector_name, n_receivers, module_id, bit_depth, STATS_TIME); auto ctx = zmq_ctx_new(); @@ -62,7 +59,7 @@ int main (int argc, char *argv[]) { ( (meta.frame_index-frame_index_previous) > 1000 ) ){ bad_pulse_id = true; } else { - buffer.write_frame(meta, data); + frame_buffer.write_frame(meta, data); zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); diff --git a/jf-udp-recv/test/CMakeLists.txt b/std-udp-recv/test/CMakeLists.txt similarity index 100% rename from jf-udp-recv/test/CMakeLists.txt rename to std-udp-recv/test/CMakeLists.txt diff --git a/jf-udp-recv/test/main.cpp b/std-udp-recv/test/main.cpp similarity index 100% rename from jf-udp-recv/test/main.cpp rename to std-udp-recv/test/main.cpp diff --git a/jf-udp-recv/test/mock/udp.hpp b/std-udp-recv/test/mock/udp.hpp similarity index 100% rename from jf-udp-recv/test/mock/udp.hpp rename to std-udp-recv/test/mock/udp.hpp diff --git a/jf-udp-recv/test/test_FrameUdpReceiver.cpp b/std-udp-recv/test/test_FrameUdpReceiver.cpp similarity index 100% rename from jf-udp-recv/test/test_FrameUdpReceiver.cpp rename to std-udp-recv/test/test_FrameUdpReceiver.cpp diff --git a/jf-udp-recv/test/test_PacketUdpReceiver.cpp b/std-udp-recv/test/test_PacketUdpReceiver.cpp similarity index 100% rename from jf-udp-recv/test/test_PacketUdpReceiver.cpp rename to std-udp-recv/test/test_PacketUdpReceiver.cpp From 293573971775a05fbd1a3796f5e460bf0c48ec37 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 2 Jul 2021 15:14:25 +0200 Subject: [PATCH 04/23] Move detector specific files to std receiver --- {core-buffer => std-udp-recv}/include/eiger.hpp | 0 {core-buffer => std-udp-recv}/include/jungfrau.hpp | 1 - 2 files changed, 1 deletion(-) rename {core-buffer => std-udp-recv}/include/eiger.hpp (100%) rename {core-buffer => std-udp-recv}/include/jungfrau.hpp (99%) diff --git a/core-buffer/include/eiger.hpp b/std-udp-recv/include/eiger.hpp similarity index 100% rename from core-buffer/include/eiger.hpp rename to std-udp-recv/include/eiger.hpp diff --git a/core-buffer/include/jungfrau.hpp b/std-udp-recv/include/jungfrau.hpp similarity index 99% rename from core-buffer/include/jungfrau.hpp rename to std-udp-recv/include/jungfrau.hpp index eb4f174..455f21e 100644 --- a/core-buffer/include/jungfrau.hpp +++ b/std-udp-recv/include/jungfrau.hpp @@ -40,5 +40,4 @@ struct det_packet { }; #pragma pack(pop) - #endif From bdd065cd428ba3e9dea556d76c20d21d555ef18e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 2 Jul 2021 15:15:42 +0200 Subject: [PATCH 05/23] Comment out structs not used --- core-buffer/include/formats.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index d8de5aa..bbfb60b 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -36,9 +36,9 @@ struct ImageMetadata { }; #pragma pack(pop) -struct ModuleFrameBuffer { - ModuleFrame module[N_MODULES]; -}; +//struct ModuleFrameBuffer { +// ModuleFrame module[N_MODULES]; +//}; //#pragma pack(push) //#pragma pack(1) From a5e54b08904bebfc6cf3383a632eb4235554de28 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:24:36 +0200 Subject: [PATCH 06/23] Add std_udp_recv config structure --- std-udp-recv/include/UdpRecvConfig.hpp | 33 ++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 std-udp-recv/include/UdpRecvConfig.hpp diff --git a/std-udp-recv/include/UdpRecvConfig.hpp b/std-udp-recv/include/UdpRecvConfig.hpp new file mode 100644 index 0000000..847439b --- /dev/null +++ b/std-udp-recv/include/UdpRecvConfig.hpp @@ -0,0 +1,33 @@ +#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP +#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP + + +#include +#include +#include +#include +#include + +struct UdpRecvConfig { + static UdpRecvConfig from_json_file(const std::string& filename) { + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["detector_name"].GetString(), + config_parameters["detector_type"].GetString(), + config_parameters["n_modules"].GetInt(), + config_parameters["start_udp_port"].GetInt(), + }; + } + + const std::string detector_name; + const std::string detector_type; + const int n_modules; + const int start_udp_port; +}; + + +#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP From 8453c275f669bcb6de718dc37caa4643085c3b6e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:29:42 +0200 Subject: [PATCH 07/23] Adjust the UDP receivers to work also for Eiger --- std-udp-recv/src/main.cpp | 70 +++++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index 4db9263..3d4010f 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -8,14 +8,20 @@ #include "FrameUdpReceiver.hpp" #include "BufferUtils.hpp" #include "FrameStats.hpp" - +#include "UdpRecvConfig.hpp" + +#ifdef USE_EIGER + #include "eiger.hpp" +#else + # include "jungfrau.hpp" +#endif + using namespace std; using namespace chrono; using namespace buffer_config; using namespace BufferUtils; - int main (int argc, char *argv[]) { if (argc != 4) { @@ -30,46 +36,54 @@ int main (int argc, char *argv[]) { exit(-1); } - const auto config = UdpRecvConfig.from_json_file(string(argv[1])); + const auto config = UdpRecvConfig::from_json_file(string(argv[1])); const int module_id = atoi(argv[2]); const int bit_depth = atoi(argv[3]); - const int n_receivers = config.n_modules * config.n_submodules; + + if (DETECTOR_TYPE != config.detector_type) { + throw runtime_error("UDP recv version for " + DETECTOR_TYPE + + " but config for " + config.detector_type); + } + const auto udp_port = config.start_udp_port + module_id; - - FrameUdpReceiver receiver(module_id, udp_port, n_receivers, config.n_submodules, bit_depth); - RamBuffer frame_buffer(config.detector_name, n_receivers, config.n_submodules, bit_depth); - FrameStats stats(config.detector_name, n_receivers, module_id, bit_depth, STATS_TIME); + const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; + const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET; + + FrameUdpReceiver receiver(udp_port); + RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), + FRAME_N_BYTES, config.n_modules); + FrameStats stats(config.detector_name, config.n_modules, + module_id, bit_depth, STATS_TIME); auto ctx = zmq_ctx_new(); auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); ModuleFrame meta; - char* data = new char[MODULE_N_PIXELS * bit_depth / 8]; + meta.module_id = module_id; + meta.bit_depth = bit_depth; + + char* data = new char[FRAME_N_BYTES]; - uint64_t pulse_id_previous = 0; - uint64_t frame_index_previous = 0; - while (true) { + // Reset the metadata and frame buffer for the next frame. + meta.frame_index = 0; + memset(data, 0, FRAME_N_BYTES); - auto pulse_id = receiver.get_frame_from_udp(meta, data); + receiver.get_frame_from_udp(meta, data); - bool bad_pulse_id = false; - if ( ( meta.frame_index != (frame_index_previous+1) ) || - ( (meta.frame_index-frame_index_previous) <= 0 ) || - ( (meta.frame_index-frame_index_previous) > 1000 ) ){ - bad_pulse_id = true; - } else { - frame_buffer.write_frame(meta, data); + // Assign the image_id based on the detector type. +#ifdef USE_EIGER + const uint64_t image_id = meta.frame_index; +#else + const uint64_t image_id = meta.pulse_id; +#endif + meta.id = image_id; - zmq_send(socket, &pulse_id, sizeof(pulse_id), 0); - - } - - stats.record_stats(meta, bad_pulse_id); - - pulse_id_previous = pulse_id; - frame_index_previous = meta.frame_index; + frame_buffer.write_frame(meta, data); + zmq_send(socket, &image_id, sizeof(image_id), 0); + const bool is_good_frame = meta.n_recv_packets == N_PACKETS_PER_FRAME; + stats.record_stats(meta, is_good_frame); } delete[] data; From 38b4af3297ad7fbdc799ca99266c507a3e38be47 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:44:05 +0200 Subject: [PATCH 08/23] Adapt FrameStat for common udp receivers --- std-udp-recv/include/FrameStats.hpp | 14 +++++------- std-udp-recv/src/FrameStats.cpp | 34 +++++++++++------------------ 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/std-udp-recv/include/FrameStats.hpp b/std-udp-recv/include/FrameStats.hpp index da2d000..c144845 100644 --- a/std-udp-recv/include/FrameStats.hpp +++ b/std-udp-recv/include/FrameStats.hpp @@ -8,28 +8,24 @@ class FrameStats { const std::string detector_name_; - const int n_modules_; const int module_id_; - const int bit_depth_; const int n_packets_per_frame_; - size_t stats_time_; + const size_t stats_time_; int frames_counter_; int n_missed_packets_; int n_corrupted_frames_; - int n_corrupted_pulse_id_; std::chrono::time_point stats_interval_start_; void reset_counters(); void print_stats(); -public:////config.detector_name, n_receivers, module_id, bit_depth, STATS_TIME - FrameStats(const std::string &detector_name, - const int n_modules, +public: + FrameStats(std::string detector_name, const int module_id, - const int bit_depth, + const int n_packets_per_frame, const size_t stats_time); - void record_stats(const ModuleFrame &meta, const bool bad_pulse_id); + void record_stats(const ModuleFrame &meta); }; diff --git a/std-udp-recv/src/FrameStats.cpp b/std-udp-recv/src/FrameStats.cpp index 46bc8a7..c78f052 100644 --- a/std-udp-recv/src/FrameStats.cpp +++ b/std-udp-recv/src/FrameStats.cpp @@ -1,20 +1,18 @@ #include +#include #include "FrameStats.hpp" #include "date.h" using namespace std; using namespace chrono; FrameStats::FrameStats( - const std::string &detector_name, - const int n_modules, + string detector_name, const int module_id, - const int bit_depth, + const int n_packets_per_frame, const size_t stats_time) : - detector_name_(detector_name), - n_modules_(n_modules), + detector_name_(move(detector_name)), module_id_(module_id), - bit_depth_(bit_depth), - n_packets_per_frame_(bit_depth_ * MODULE_N_PIXELS / 8 / DATA_BYTES_PER_PACKET / n_modules), + n_packets_per_frame_(n_packets_per_frame), stats_time_(stats_time) { reset_counters(); @@ -25,36 +23,31 @@ void FrameStats::reset_counters() frames_counter_ = 0; n_missed_packets_ = 0; n_corrupted_frames_ = 0; - n_corrupted_pulse_id_ = 0; stats_interval_start_ = steady_clock::now(); } -void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id) +void FrameStats::record_stats(const ModuleFrame &meta) { - if (bad_pulse_id) { - n_corrupted_pulse_id_++; - } - if (meta.n_recv_packets < n_packets_per_frame_) { n_missed_packets_ += n_packets_per_frame_ - meta.n_recv_packets; n_corrupted_frames_++; + #ifdef DEBUG_OUTPUT using namespace date; - cout << " [" << std::chrono::system_clock::now(); - cout << "] [FrameStats::record_stats] :"; - cout << " meta.frame "<< meta.frame_index; + cout << " [" << std::chrono::system_clock::now() << "]"; + cout << " [FrameStats::record_stats] :"; + cout << " meta.pulse_id "<< meta.pulse_id; + cout << " meta.frame_index "<< meta.frame_index; cout << " || meta.n_recv_packets " << meta.n_recv_packets; cout << " || n_missed_packets_ " << n_missed_packets_; cout << endl; #endif - - } frames_counter_++; - auto time_passed = duration_cast( + const auto time_passed = duration_cast( steady_clock::now()-stats_interval_start_).count(); if (time_passed >= stats_time_*1000) { @@ -75,12 +68,11 @@ void FrameStats::print_stats() // Output in InfluxDB line protocol cout << "jf_udp_recv"; cout << ",detector_name=" << detector_name_; - cout << ",module_name=M" << module_id_; + cout << ",module_id=" << module_id_; cout << " "; cout << "n_missed_packets=" << n_missed_packets_ << "i"; cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i"; cout << ",repetition_rate=" << rep_rate << "i"; - cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i"; cout << " "; cout << timestamp; cout << endl; From 82a334212f355d274205c74dd36c141952c22e89 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:44:50 +0200 Subject: [PATCH 09/23] Add detector type to definitions --- std-udp-recv/include/eiger.hpp | 4 ++++ std-udp-recv/include/jungfrau.hpp | 2 ++ 2 files changed, 6 insertions(+) diff --git a/std-udp-recv/include/eiger.hpp b/std-udp-recv/include/eiger.hpp index 6eee1ea..d8a7b33 100644 --- a/std-udp-recv/include/eiger.hpp +++ b/std-udp-recv/include/eiger.hpp @@ -3,6 +3,10 @@ #include +#define IS_BOTTOM(n) ((n%2 != 0) ? -1 : 1) + +const std::string DETECTOR_TYPE = "eiger"; + #define N_MODULES 1 #define BYTES_PER_PACKET 4144 #define DATA_BYTES_PER_PACKET 4096 diff --git a/std-udp-recv/include/jungfrau.hpp b/std-udp-recv/include/jungfrau.hpp index 455f21e..b7ad43d 100644 --- a/std-udp-recv/include/jungfrau.hpp +++ b/std-udp-recv/include/jungfrau.hpp @@ -3,6 +3,8 @@ #include +const std::string DETECTOR_TYPE = "jungfrau"; + #define N_MODULES 32 #define BYTES_PER_PACKET 8240 #define DATA_BYTES_PER_PACKET 8192 From 7b4b750914dca8e29944cef68c4499767ad4491d Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:45:09 +0200 Subject: [PATCH 10/23] Adjust formats to latest standard --- core-buffer/include/formats.hpp | 33 +++------------------------------ 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index bbfb60b..17504a2 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -3,25 +3,19 @@ #include "buffer_config.hpp" -#ifndef USE_EIGER -#include "jungfrau.hpp" -#else -#include "eiger.hpp" -#endif - -#define IS_BOTTOM(n) ((n%2 != 0) ? -1 : 1) #pragma pack(push) #pragma pack(1) struct ModuleFrame { + uint64_t id; uint64_t pulse_id; uint64_t frame_index; uint64_t daq_rec; uint64_t n_recv_packets; uint64_t module_id; uint16_t bit_depth; - uint16_t row; - uint16_t column; + uint16_t pos_y; + uint16_t pos_x; }; #pragma pack(pop) @@ -36,26 +30,5 @@ struct ImageMetadata { }; #pragma pack(pop) -//struct ModuleFrameBuffer { -// ModuleFrame module[N_MODULES]; -//}; - -//#pragma pack(push) -//#pragma pack(1) -//struct BufferBinaryFormat { -// const char FORMAT_MARKER = 0xBE; -// ModuleFrame meta; -// char data[MODULE_N_BYTES]; -//}; -//#pragma pack(pop) - -//#pragma pack(push) -//#pragma pack(1) -//struct BufferBinaryBlock -//{ -// BufferBinaryFormat frame[buffer_config::BUFFER_BLOCK_SIZE]; -// uint64_t start_pulse_id; -//}; -//#pragma pack(pop) #endif //SF_DAQ_BUFFER_FORMATS_HPP From 0082c30a668cc01295cd8e66223b1363173eb136 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:47:58 +0200 Subject: [PATCH 11/23] Adjust the ImageMetadata format --- core-buffer/include/formats.hpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index 17504a2..5cd8286 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -23,10 +23,15 @@ struct ModuleFrame { #pragma pack(push) #pragma pack(1) struct ImageMetadata { - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint32_t is_good_image; + uint64_t id; + uint64_t height; + uint64_t width; + uint64_t dtype; + uint64_t encoding; + uint64_t source_id; + uint64_t status; + uint64_t user_1; + uint64_t user_2; }; #pragma pack(pop) From 7b8a065d8c4e1fc64a2767bc5a4322630402afab Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:48:41 +0200 Subject: [PATCH 12/23] Remove unused include --- core-buffer/include/formats.hpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index 5cd8286..7ae731f 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -1,9 +1,6 @@ #ifndef SF_DAQ_BUFFER_FORMATS_HPP #define SF_DAQ_BUFFER_FORMATS_HPP -#include "buffer_config.hpp" - - #pragma pack(push) #pragma pack(1) struct ModuleFrame { From 566820ed0c05197de1b05797dcc8fe6bcc92ca16 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:49:31 +0200 Subject: [PATCH 13/23] Remove static buffer config This needs to be calculated based on the bit depth --- core-buffer/include/buffer_config.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index f33e601..c6f64be 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -24,7 +24,6 @@ namespace buffer_config { const size_t BUFFER_BLOCK_SIZE = 100; - const size_t BUFFER_UDP_N_RECV_MSG = 128; // Size of UDP recv buffer const int BUFFER_UDP_RCVBUF_N_SLOTS = 100; // 8246 bytes for each UDP packet. From 78899d319f15e7d0b3b865c580f81cfdcf0d8389 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 14:50:39 +0200 Subject: [PATCH 14/23] Adjust FrameStats invocation --- std-udp-recv/src/main.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index 3d4010f..45d522a 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -52,8 +52,8 @@ int main (int argc, char *argv[]) { FrameUdpReceiver receiver(udp_port); RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), FRAME_N_BYTES, config.n_modules); - FrameStats stats(config.detector_name, config.n_modules, - module_id, bit_depth, STATS_TIME); + FrameStats stats(config.detector_name, module_id, + N_PACKETS_PER_FRAME, STATS_TIME); auto ctx = zmq_ctx_new(); auto socket = bind_socket(ctx, config.detector_name, to_string(module_id)); From f9e8b2750a79162c59221dafd217701b0f1b0253 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 15:02:09 +0200 Subject: [PATCH 15/23] Fix call to FrameUdpReceiver --- jf-live-writer/src/main.cpp | 2 +- std-udp-recv/src/main.cpp | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp index 179e3aa..e2940e7 100644 --- a/jf-live-writer/src/main.cpp +++ b/jf-live-writer/src/main.cpp @@ -62,7 +62,7 @@ int main (int argc, char *argv[]) // Fair distribution of images among writers. if (meta.i_image % n_writers == i_writer) { - char* data = ram_buffer.read_image(meta.image_metadata.pulse_id); + char* data = ram_buffer.get_slot_data(meta.image_metadata.pulse_id); stats.start_image_write(); writer.write_data(meta.run_id, meta.i_image, data); diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index 45d522a..e162800 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -49,7 +49,7 @@ int main (int argc, char *argv[]) { const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET; - FrameUdpReceiver receiver(udp_port); + FrameUdpReceiver receiver(udp_port, N_PACKETS_PER_FRAME); RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), FRAME_N_BYTES, config.n_modules); FrameStats stats(config.detector_name, module_id, @@ -82,8 +82,7 @@ int main (int argc, char *argv[]) { frame_buffer.write_frame(meta, data); zmq_send(socket, &image_id, sizeof(image_id), 0); - const bool is_good_frame = meta.n_recv_packets == N_PACKETS_PER_FRAME; - stats.record_stats(meta, is_good_frame); + stats.record_stats(meta); } delete[] data; From a64ea62c88bba5d55d8c04e82f4956d47ef4bb67 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 15:13:46 +0200 Subject: [PATCH 16/23] Isolate detector differences to FrameUdpRecv --- std-udp-recv/include/FrameUdpReceiver.hpp | 24 ++++--- std-udp-recv/src/FrameUdpReceiver.cpp | 86 ++++++++++------------- std-udp-recv/src/main.cpp | 9 +-- 3 files changed, 53 insertions(+), 66 deletions(-) diff --git a/std-udp-recv/include/FrameUdpReceiver.hpp b/std-udp-recv/include/FrameUdpReceiver.hpp index 8487e0f..6038bf7 100644 --- a/std-udp-recv/include/FrameUdpReceiver.hpp +++ b/std-udp-recv/include/FrameUdpReceiver.hpp @@ -6,18 +6,20 @@ #include "formats.hpp" #include "buffer_config.hpp" +#ifdef USE_EIGER + #include "eiger.hpp" +#else + #include "jungfrau.hpp" +#endif + class FrameUdpReceiver { - const int module_id_; - const int bit_depth_; - const size_t n_packets_per_frame_; - const size_t data_bytes_per_frame_; - PacketUdpReceiver udp_receiver_; + const int n_packets_per_frame_; - det_packet packet_buffer_[buffer_config::BUFFER_UDP_N_RECV_MSG]; - iovec recv_buff_ptr_[buffer_config::BUFFER_UDP_N_RECV_MSG]; - mmsghdr msgs_[buffer_config::BUFFER_UDP_N_RECV_MSG]; - sockaddr_in sock_from_[buffer_config::BUFFER_UDP_N_RECV_MSG]; + det_packet* const packet_buffer_; + iovec* const recv_buff_ptr_; + mmsghdr* const msgs_; + sockaddr_in* const sock_from_; bool packet_buffer_loaded_ = false; int packet_buffer_n_packets_ = 0; @@ -30,9 +32,9 @@ class FrameUdpReceiver { const int n_packets, ModuleFrame& metadata, char* frame_buffer); public: - FrameUdpReceiver(const int module_id, const uint16_t port, const int n_modules, const int n_submodules, const int bit_depth); + FrameUdpReceiver(const uint16_t port, const int n_packets_per_frame); virtual ~FrameUdpReceiver(); - uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); + uint64_t get_frame_from_udp(ModuleFrame& meta, char* frame_buffer); }; diff --git a/std-udp-recv/src/FrameUdpReceiver.cpp b/std-udp-recv/src/FrameUdpReceiver.cpp index 72f7704..b6b7901 100644 --- a/std-udp-recv/src/FrameUdpReceiver.cpp +++ b/std-udp-recv/src/FrameUdpReceiver.cpp @@ -3,37 +3,32 @@ #include #include #include -#include #include "date.h" using namespace std; using namespace buffer_config; FrameUdpReceiver::FrameUdpReceiver( - const int module_id, - const uint16_t port, - const int n_modules, - const int n_submodules, - const int bit_depth): - module_id_(module_id), - bit_depth_(bit_depth), - n_packets_per_frame_(bit_depth_ * MODULE_N_PIXELS / 8 / DATA_BYTES_PER_PACKET / n_modules * n_submodules), - data_bytes_per_frame_(n_packets_per_frame_ * DATA_BYTES_PER_PACKET) + const uint16_t port, const int n_packets_per_frame) : + n_packets_per_frame_(n_packets_per_frame), + packet_buffer_(new det_packet[n_packets_per_frame_]), + recv_buff_ptr_(new iovec[n_packets_per_frame_]), + msgs_(new mmsghdr[n_packets_per_frame_]), + sock_from_(new sockaddr_in[n_packets_per_frame_]) { #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); cout << "] [FrameUdpReceiver::FrameUdpReceiver] :"; cout << " Details of FrameUdpReceiver:"; - cout << "module_id: " << module_id_; cout << " || port: " << port; - cout << " || bit_depth : " << bit_depth_; - cout << " || n_packets_per_frame_ : " << n_packets_per_frame_; - cout << " || data_bytes_per_frame_: " << data_bytes_per_frame_ << " !!"; + cout << " || n_packets_per_frame: " << n_packets_per_frame_; cout << endl; #endif + udp_receiver_.bind(port); - for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { + + for (int i = 0; i < n_packets_per_frame_; i++) { recv_buff_ptr_[i].iov_base = (void*) &(packet_buffer_[i]); recv_buff_ptr_[i].iov_len = sizeof(det_packet); @@ -46,28 +41,28 @@ FrameUdpReceiver::FrameUdpReceiver( FrameUdpReceiver::~FrameUdpReceiver() { udp_receiver_.disconnect(); + + delete[] packet_buffer_; + delete[] recv_buff_ptr_; + delete[] msgs_; + delete[] sock_from_; } inline void FrameUdpReceiver::init_frame( ModuleFrame& frame_metadata, const int i_packet) { - // Eiger has no pulse_id, frame number instead - frame_metadata.pulse_id = packet_buffer_[i_packet].framenum; + frame_metadata.pulse_id = packet_buffer_[i_packet].bunchid; frame_metadata.frame_index = packet_buffer_[i_packet].framenum; frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug; - frame_metadata.module_id = (int64_t) module_id_; - - frame_metadata.bit_depth = (int16_t) bit_depth_; - frame_metadata.row = (int16_t) packet_buffer_[i_packet].row; - frame_metadata.column = (int16_t) packet_buffer_[i_packet].column; + frame_metadata.pos_y = (int16_t) packet_buffer_[i_packet].row; + frame_metadata.pos_x = (int16_t) packet_buffer_[i_packet].column; #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); cout << "] [FrameUdpReceiver::init_frame] :"; - cout << "module_id: " << module_id_; - cout << " || row: " << frame_metadata.row; - cout << " || column: " << frame_metadata.column; + cout << " || pos_y: " << frame_metadata.pos_x; + cout << " || pos_x: " << frame_metadata.pos_x; cout << " || pulse_id: " << frame_metadata.pulse_id; cout << " || frame_index: " << frame_metadata.frame_index; cout << endl; @@ -86,7 +81,6 @@ inline void FrameUdpReceiver::copy_packet_to_buffers( DATA_BYTES_PER_PACKET); metadata.n_recv_packets++; - // cout << "[ frame" << metadata.frame_index << "] NUMBER OF RECV PACKETS : " << metadata.n_recv_packets ; } inline uint64_t FrameUdpReceiver::process_packets( @@ -99,11 +93,12 @@ inline uint64_t FrameUdpReceiver::process_packets( i_packet++) { // First packet for this frame. - if (metadata.pulse_id == 0) { + if (metadata.frame_index == 0) { init_frame(metadata, i_packet); // Happens if the last packet from the previous frame gets lost. - // In the jungfrau_packet, framenum is the trigger number (how many triggers from detector power-on) happened + // In the jungfrau_packet, framenum is the trigger number + // (how many triggers from detector power-on) happened } else if (metadata.frame_index != packet_buffer_[i_packet].framenum) { packet_buffer_loaded_ = true; // Continue on this packet. @@ -121,19 +116,19 @@ inline uint64_t FrameUdpReceiver::process_packets( #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); - cout << "] [FrameUdpReceiver::process_packets] :"; - cout << " Frame " << metadata.frame_index << " || "; + cout << "] [frameudpreceiver::process_packets] :"; + cout << " frame " << metadata.frame_index << " || "; cout << packet_buffer_[i_packet].packetnum << " packets received."; - cout << " PULSE ID "<< metadata.pulse_id; + cout << " pulse id "<< metadata.pulse_id; cout << endl; #endif - // Buffer is loaded only if this is not the last message. + // buffer is loaded only if this is not the last message. if (i_packet+1 != packet_buffer_n_packets_) { packet_buffer_loaded_ = true; - // Continue on next packet. + // continue on next packet. packet_buffer_offset_ = i_packet + 1; - // If i_packet is the last packet the buffer is empty. + // if i_packet is the last packet the buffer is empty. } else { packet_buffer_loaded_ = false; packet_buffer_offset_ = 0; @@ -150,36 +145,31 @@ inline uint64_t FrameUdpReceiver::process_packets( } uint64_t FrameUdpReceiver::get_frame_from_udp( - ModuleFrame& metadata, char* frame_buffer) + ModuleFrame& meta, char* frame_buffer) { - // Reset the metadata and frame buffer for the next frame. - metadata.pulse_id = 0; - metadata.n_recv_packets = 0; - memset(frame_buffer, 0, data_bytes_per_frame_); - // Happens when last packet from previous frame was missed. if (packet_buffer_loaded_) { - auto pulse_id = process_packets( - packet_buffer_offset_, metadata, frame_buffer); - if (pulse_id != 0) { - return pulse_id; + auto frame_index = process_packets( + packet_buffer_offset_, meta, frame_buffer); + if (frame_index != 0) { + return frame_index; } } while (true) { packet_buffer_n_packets_ = udp_receiver_.receive_many( - msgs_, BUFFER_UDP_N_RECV_MSG); + msgs_, n_packets_per_frame_); if (packet_buffer_n_packets_ == 0) { continue; } - auto pulse_id = process_packets(0, metadata, frame_buffer); + auto frame_index = process_packets(0, meta, frame_buffer); - if (pulse_id != 0) { - return pulse_id; + if (frame_index != 0) { + return frame_index; } } } diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index e162800..b05a03f 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -10,12 +10,6 @@ #include "FrameStats.hpp" #include "UdpRecvConfig.hpp" -#ifdef USE_EIGER - #include "eiger.hpp" -#else - # include "jungfrau.hpp" -#endif - using namespace std; using namespace chrono; using namespace buffer_config; @@ -27,7 +21,8 @@ int main (int argc, char *argv[]) { if (argc != 4) { cout << endl; - cout << "Usage: std_udp_recv [udp_recv_config_filename] [module_id] [bit_depth]"; + cout << "Usage: std_udp_recv [udp_recv_config_filename] [module_id] " + "[bit_depth]"; cout << endl; cout << "\tudp_recv_config_filename: detector config file path." << endl; cout << "\tmodule_id: id of the module for this process." << endl; From c40ffb8b125f06980dafbdf318172dc662950065 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 15:30:28 +0200 Subject: [PATCH 17/23] Fix name of udp recv processes --- std-udp-recv/CMakeLists.txt | 8 +------- std-udp-recv/src/main.cpp | 1 - 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/std-udp-recv/CMakeLists.txt b/std-udp-recv/CMakeLists.txt index 324e603..001dbf0 100644 --- a/std-udp-recv/CMakeLists.txt +++ b/std-udp-recv/CMakeLists.txt @@ -9,13 +9,7 @@ target_link_libraries(std-udp-recv-lib add_executable(std-udp-recv src/main.cpp) -if (USE_EIGER) - set (LIB_NAME_UDP_RECV "eiger_udp_recv") -else() - set (LIB_NAME_UDP_RECV "jf_udp_recv") -endif() - -set_target_properties(std-udp-recv PROPERTIES OUTPUT_NAME ${LIB_NAME_UDP_RECV}) +set_target_properties(std-udp-recv PROPERTIES OUTPUT_NAME std_udp_recv) target_link_libraries(std-udp-recv std-udp-recv-lib diff --git a/std-udp-recv/src/main.cpp b/std-udp-recv/src/main.cpp index b05a03f..c9c1a06 100644 --- a/std-udp-recv/src/main.cpp +++ b/std-udp-recv/src/main.cpp @@ -20,7 +20,6 @@ int main (int argc, char *argv[]) { if (argc != 4) { cout << endl; - cout << "Usage: std_udp_recv [udp_recv_config_filename] [module_id] " "[bit_depth]"; cout << endl; From 853effc00692b588c570b760a26b40601d6c16ee Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 15:46:38 +0200 Subject: [PATCH 18/23] Fix stats name output --- std-udp-recv/src/FrameStats.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std-udp-recv/src/FrameStats.cpp b/std-udp-recv/src/FrameStats.cpp index c78f052..22948d1 100644 --- a/std-udp-recv/src/FrameStats.cpp +++ b/std-udp-recv/src/FrameStats.cpp @@ -66,7 +66,7 @@ void FrameStats::print_stats() system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "jf_udp_recv"; + cout << "std_udp_recv,"; cout << ",detector_name=" << detector_name_; cout << ",module_id=" << module_id_; cout << " "; From 64375de624bc2e193d1c538b975556fbdd411a08 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 16:07:27 +0200 Subject: [PATCH 19/23] Move detector configs to common --- {std-udp-recv => core-buffer}/include/eiger.hpp | 0 {std-udp-recv => core-buffer}/include/jungfrau.hpp | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {std-udp-recv => core-buffer}/include/eiger.hpp (100%) rename {std-udp-recv => core-buffer}/include/jungfrau.hpp (100%) diff --git a/std-udp-recv/include/eiger.hpp b/core-buffer/include/eiger.hpp similarity index 100% rename from std-udp-recv/include/eiger.hpp rename to core-buffer/include/eiger.hpp diff --git a/std-udp-recv/include/jungfrau.hpp b/core-buffer/include/jungfrau.hpp similarity index 100% rename from std-udp-recv/include/jungfrau.hpp rename to core-buffer/include/jungfrau.hpp From 9ff380e78af179a650e86420bbff7f2de54263b2 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 16:07:47 +0200 Subject: [PATCH 20/23] Remove unused import --- core-buffer/include/RamBuffer.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/core-buffer/include/RamBuffer.hpp b/core-buffer/include/RamBuffer.hpp index 7c5c55b..c778d19 100644 --- a/core-buffer/include/RamBuffer.hpp +++ b/core-buffer/include/RamBuffer.hpp @@ -3,6 +3,7 @@ #include #include "formats.hpp" +#include "buffer_config.hpp" class RamBuffer { const std::string buffer_name_; From eb9c87cf2f8f115c1c97cf1b79294063544ad334 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 16:12:53 +0200 Subject: [PATCH 21/23] Add std-udp-sync project --- CMakeLists.txt | 6 +- std-udp-sync/CMakeLists.txt | 22 +++ std-udp-sync/README.md | 179 ++++++++++++++++++ std-udp-sync/include/SyncStats.hpp | 27 +++ std-udp-sync/include/UdpSyncConfig.hpp | 29 +++ std-udp-sync/include/ZmqPulseSyncReceiver.hpp | 34 ++++ std-udp-sync/include/sync_config.hpp | 11 ++ std-udp-sync/src/SyncStats.cpp | 55 ++++++ std-udp-sync/src/ZmqPulseSyncReceiver.cpp | 127 +++++++++++++ std-udp-sync/src/main.cpp | 69 +++++++ std-udp-sync/test/CMakeLists.txt | 7 + std-udp-sync/test/main.cpp | 8 + 12 files changed, 570 insertions(+), 4 deletions(-) create mode 100644 std-udp-sync/CMakeLists.txt create mode 100644 std-udp-sync/README.md create mode 100644 std-udp-sync/include/SyncStats.hpp create mode 100644 std-udp-sync/include/UdpSyncConfig.hpp create mode 100644 std-udp-sync/include/ZmqPulseSyncReceiver.hpp create mode 100644 std-udp-sync/include/sync_config.hpp create mode 100644 std-udp-sync/src/SyncStats.cpp create mode 100644 std-udp-sync/src/ZmqPulseSyncReceiver.cpp create mode 100644 std-udp-sync/src/main.cpp create mode 100644 std-udp-sync/test/CMakeLists.txt create mode 100644 std-udp-sync/test/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4365202..1bbfa25 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,12 +32,10 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("std-udp-recv") +add_subdirectory("std-udp-sync") #add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") add_subdirectory("sf-stream") #add_subdirectory("sf-writer") - -if(BUILD_JF_LIVE_WRITER) - add_subdirectory("jf-live-writer") -endif() +add_subdirectory("jf-live-writer") diff --git a/std-udp-sync/CMakeLists.txt b/std-udp-sync/CMakeLists.txt new file mode 100644 index 0000000..7466d1e --- /dev/null +++ b/std-udp-sync/CMakeLists.txt @@ -0,0 +1,22 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(std-udp-sync-lib STATIC ${SOURCES}) +target_include_directories(std-udp-sync-lib PUBLIC include/) +target_link_libraries(std-udp-sync-lib + external + core-buffer-lib) + +add_executable(std-udp-sync src/main.cpp) + +set_target_properties(std-udp-sync PROPERTIES OUTPUT_NAME std_udp_sync) +target_link_libraries(std-udp-sync + external + core-buffer-lib + std-udp-sync-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/std-udp-sync/README.md b/std-udp-sync/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/std-udp-sync/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. This images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send less images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen. + +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id. + +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If any of this 2 conditions is not +met, the detector is not working properly and we cannot guaranty that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in image +assembly phase - this is needed rarely, but occasionally happens. In this sort +of hiccups we usually loose only a couple of consecutive images. + +### Messages format +Each message is composed by 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +deciding the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream. +It can happen in 3 cases: + +- At least one of the detector modules did not sent any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them. +- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All this 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note, that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble this images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We devide the ZMQ sending to 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at the time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream. + +In the data processing and live viewing stream we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components. + +We use following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This streams also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream. Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images. + +### Pulse_id stream + +This stream runs ar detector frequency in PUB/SUB mode. The only thing it +does is sends out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/std-udp-sync/include/SyncStats.hpp b/std-udp-sync/include/SyncStats.hpp new file mode 100644 index 0000000..18b9d1d --- /dev/null +++ b/std-udp-sync/include/SyncStats.hpp @@ -0,0 +1,27 @@ +#ifndef SF_DAQ_BUFFER_SYNCSTATS_HPP +#define SF_DAQ_BUFFER_SYNCSTATS_HPP + +#include +#include +#include + +class SyncStats { + const std::string detector_name_; + const size_t stats_modulo_; + + int image_counter_; + int n_sync_lost_images_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + SyncStats(const std::string &detector_name, + const size_t stats_modulo); + + void record_stats(const uint32_t n_lost_pulses); +}; + + +#endif //SF_DAQ_BUFFER_SYNCSTATS_HPP diff --git a/std-udp-sync/include/UdpSyncConfig.hpp b/std-udp-sync/include/UdpSyncConfig.hpp new file mode 100644 index 0000000..89d735a --- /dev/null +++ b/std-udp-sync/include/UdpSyncConfig.hpp @@ -0,0 +1,29 @@ +#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP +#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP + + +#include +#include +#include +#include +#include + +struct UdpSyncConfig { + static UdpSyncConfig from_json_file(const std::string& filename) { + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["detector_name"].GetString(), + config_parameters["n_modules"].GetInt() + }; + } + + const std::string detector_name; + const int n_modules; +}; + + +#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP diff --git a/std-udp-sync/include/ZmqPulseSyncReceiver.hpp b/std-udp-sync/include/ZmqPulseSyncReceiver.hpp new file mode 100644 index 0000000..26a5d5b --- /dev/null +++ b/std-udp-sync/include/ZmqPulseSyncReceiver.hpp @@ -0,0 +1,34 @@ +#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP +#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP + + +#include +#include +#include + +#include "formats.hpp" + +struct PulseAndSync { + const uint64_t image_id; + const uint32_t n_lost_pulses; +}; + +class ZmqPulseSyncReceiver { + + void* ctx_; + const int n_modules_; + + std::vector sockets_; + +public: + ZmqPulseSyncReceiver( + void* ctx, + const std::string& detector_name, + const int n_modules); + ~ZmqPulseSyncReceiver(); + + PulseAndSync get_next_pulse_id() const; +}; + + +#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP diff --git a/std-udp-sync/include/sync_config.hpp b/std-udp-sync/include/sync_config.hpp new file mode 100644 index 0000000..ff8f500 --- /dev/null +++ b/std-udp-sync/include/sync_config.hpp @@ -0,0 +1,11 @@ +namespace sync_config +{ + // If the modules are offset more than 1000 pulses, crush. + const uint64_t PULSE_OFFSET_LIMIT = 100; + + // Number of times we try to re-sync in case of failure. + const int SYNC_RETRY_LIMIT = 3; + + // Number of pulses between each statistics print out. + const size_t SYNC_STATS_MODULO = 1000; +} diff --git a/std-udp-sync/src/SyncStats.cpp b/std-udp-sync/src/SyncStats.cpp new file mode 100644 index 0000000..e9bb76d --- /dev/null +++ b/std-udp-sync/src/SyncStats.cpp @@ -0,0 +1,55 @@ +#include "SyncStats.hpp" + +#include + +using namespace std; +using namespace chrono; + +SyncStats::SyncStats( + const std::string &detector_name, + const size_t stats_modulo) : + detector_name_(detector_name), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void SyncStats::reset_counters() +{ + image_counter_ = 0; + n_sync_lost_images_ = 0; + stats_interval_start_ = steady_clock::now(); +} + +void SyncStats::record_stats(const uint32_t n_lost_pulses) +{ + image_counter_++; + n_sync_lost_images_ += n_lost_pulses; + + if (image_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void SyncStats::print_stats() +{ + auto interval_ms_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + // * 1000 because milliseconds, + 250 because of truncation. + int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration; + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "std_udp_sync"; + cout << ",detector_name=" << detector_name_; + cout << " "; + cout << "n_processed_images=" << image_counter_ << "i"; + cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i"; + cout << ",repetition_rate=" << rep_rate << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} + diff --git a/std-udp-sync/src/ZmqPulseSyncReceiver.cpp b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp new file mode 100644 index 0000000..c8a87cb --- /dev/null +++ b/std-udp-sync/src/ZmqPulseSyncReceiver.cpp @@ -0,0 +1,127 @@ +#include "ZmqPulseSyncReceiver.hpp" +#include "BufferUtils.hpp" + +#include +#include +#include +#include +#include +#include "date.h" + +#include "sync_config.hpp" + +using namespace std; +using namespace chrono; +using namespace sync_config; + + +ZmqPulseSyncReceiver::ZmqPulseSyncReceiver( + void * ctx, + const string& detector_name, + const int n_modules) : + ctx_(ctx), + n_modules_(n_modules) +{ + sockets_.reserve(n_modules_); + + for (int i=0; i::max();; + uint64_t max_pulse_id = 0; + + for (int i = 0; i < n_modules_; i++) { + min_pulse_id = min(min_pulse_id, pulses[i]); + max_pulse_id = max(max_pulse_id, pulses[i]); + } + + auto max_diff = max_pulse_id - min_pulse_id; + if (max_diff > PULSE_OFFSET_LIMIT) { + stringstream err_msg; + err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]"; + err_msg << " PULSE_OFFSET_LIMIT exceeded."; + err_msg << " max_diff=" << max_diff << " pulses."; + + for (int i = 0; i < n_modules_; i++) { + err_msg << " (module " << i << ", "; + err_msg << pulses[i] << "),"; + } + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + modules_in_sync = true; + // Max pulses we lost in this sync attempt. + uint32_t i_sync_lost_pulses = 0; + for (int i = 0; i < n_modules_; i++) { + // How many pulses we lost for this specific module. + uint32_t i_module_lost_pulses = 0; + while (pulses[i] < max_pulse_id) { + zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); + i_module_lost_pulses++; + } + + i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses); + + if (pulses[i] != max_pulse_id) { + modules_in_sync = false; + } + } + n_lost_pulses += i_sync_lost_pulses; + + if (modules_in_sync) { + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]"; + cout << " modules_in_sync false"; + cout << endl; + #endif + return {pulses[0], n_lost_pulses}; + } + } + + stringstream err_msg; + err_msg << "[ZmqLiveReceiver::get_next_pulse_id]"; + err_msg << " SYNC_RETRY_LIMIT exceeded."; + err_msg << endl; + + throw runtime_error(err_msg.str()); +} diff --git a/std-udp-sync/src/main.cpp b/std-udp-sync/src/main.cpp new file mode 100644 index 0000000..1068091 --- /dev/null +++ b/std-udp-sync/src/main.cpp @@ -0,0 +1,69 @@ +#include +#include +#include +#include +#include +#include + +#include "date.h" +#include +#include "sync_config.hpp" +#include "ZmqPulseSyncReceiver.hpp" +#include "UdpSyncConfig.hpp" + + +using namespace std; +using namespace sync_config; + +#ifdef USE_EIGER + #include "eiger.hpp" +#else + #include "jungfrau.hpp" +#endif + +int main (int argc, char *argv[]) +{ + if (argc != 3) { + cout << endl; + cout << "Usage: std_udp_sync [detector_json_filename] [bit_depth]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; + cout << endl; + + exit(-1); + } + + const auto config = UdpSyncConfig::from_json_file(string(argv[1])); + const int bit_depth = atoi(argv[2]); + + const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; + + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, 1); + + auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync"); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [Assembler] :"; + cout << " Details of Assembler:"; + cout << " detector_name: " << config.detector_name; + cout << " || n_modules: " << config.n_modules; + cout << endl; + #endif + + RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), + FRAME_N_BYTES, config.n_modules); + + ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); + SyncStats stats(config.detector_name, SYNC_STATS_MODULO); + + while (true) { + auto meta = receiver.get_next_pulse_id(); + + zmq_send(sender, &meta.image_id, sizeof(meta.image_id), 0); + + stats.record_stats(meta.n_lost_pulses); + } +} diff --git a/std-udp-sync/test/CMakeLists.txt b/std-udp-sync/test/CMakeLists.txt new file mode 100644 index 0000000..01fc383 --- /dev/null +++ b/std-udp-sync/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(std-udp-sync-tests main.cpp) + +target_link_libraries(std-udp-sync-tests + std-udp-sync-lib + gtest + ) + diff --git a/std-udp-sync/test/main.cpp b/std-udp-sync/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/std-udp-sync/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 13547a5fef25cd1188304ff1b514abd3463211db Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 17:58:44 +0200 Subject: [PATCH 22/23] Remove stream send component --- CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1bbfa25..19194fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,11 +31,12 @@ add_subdirectory( EXCLUDE_FROM_ALL) add_subdirectory("core-buffer") +#add_subdirectory("std-stream-send") add_subdirectory("std-udp-recv") add_subdirectory("std-udp-sync") #add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") -add_subdirectory("sf-stream") +#add_subdirectory("sf-stream") #add_subdirectory("sf-writer") add_subdirectory("jf-live-writer") From 9d56da96f91b5af61ac2ceaa05ef3b2bfb43a6ae Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 5 Jul 2021 17:59:09 +0200 Subject: [PATCH 23/23] Add stub of std-stream-send --- std-stream-send/CMakeLists.txt | 18 ++ std-stream-send/README.md | 179 +++++++++++++++++++ std-stream-send/build_stdstream.sh | 10 ++ std-stream-send/debug.Dockerfile | 13 ++ std-stream-send/include/StreamSendConfig.hpp | 33 ++++ std-stream-send/include/stream_config.hpp | 14 ++ std-stream-send/src/main.cpp | 52 ++++++ std-stream-send/test/CMakeLists.txt | 7 + std-stream-send/test/main.cpp | 8 + 9 files changed, 334 insertions(+) create mode 100644 std-stream-send/CMakeLists.txt create mode 100644 std-stream-send/README.md create mode 100644 std-stream-send/build_stdstream.sh create mode 100644 std-stream-send/debug.Dockerfile create mode 100644 std-stream-send/include/StreamSendConfig.hpp create mode 100644 std-stream-send/include/stream_config.hpp create mode 100644 std-stream-send/src/main.cpp create mode 100644 std-stream-send/test/CMakeLists.txt create mode 100644 std-stream-send/test/main.cpp diff --git a/std-stream-send/CMakeLists.txt b/std-stream-send/CMakeLists.txt new file mode 100644 index 0000000..3d8b419 --- /dev/null +++ b/std-stream-send/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(std-stream-send-lib STATIC ${SOURCES}) +target_include_directories(std-stream-send-lib PUBLIC include/) +target_link_libraries(std-stream-send-lib + core-buffer-lib) + +add_executable(std-stream-send src/main.cpp) +set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send) +target_link_libraries(std-stream-send + std-stream-send-lib + zmq + pthread + rt) + +enable_testing() +add_subdirectory(test/) diff --git a/std-stream-send/README.md b/std-stream-send/README.md new file mode 100644 index 0000000..ce21a4d --- /dev/null +++ b/std-stream-send/README.md @@ -0,0 +1,179 @@ +# sf-stream +sf-stream is the component that receives a live stream of frame data from +sf-buffers over ZMQ and assembles them into images. This images are then +sent again over ZMQ to external components. There is always only 1 sf-stream +per detector. + +It currently has 3 output streams: + +- **Full data full meta** rate stream (send all images and meta) +- **Reduced data full meta** rate stream (send less images, but +all meta) +- **Pulse_id** stream (send only the current pulse_id) + +In addition to receiving and assembling images, sf-stream also calculates +additional meta and constructs the structures needed to send data in +Array 1.0 protocol. + +This component does not guarantee that the streams will always contain all +the data - it can happen that frame resynchronization is needed, and in this +case 1 or more frames could potentially be lost. This happens so rarely that in +practice is not a problem. + +## Overview + +![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg) + +sf-stream is a single threaded application (without counting the ZMQ IO threads) +that is used for providing live assembled images to anyone willing to listen. + +In addition, it also provides a pulse_id stream, which is the most immediate +pulse_id feedback we currently have in case we need to synchronize external +components to the current machine pulse_id. + +## ZMQ receiving +Each ZMQ stream is coming from a separate sf-buffer. This means that we have as +many connections as we have modules in a detector. + +Messages are multipart (2 parts) and are received in PUB/SUB mode. + +There is no need for special synchronization between modules as we expect that +frames will always be in the correct order and all modules will provide the +same frame more or less at the same time. If any of this 2 conditions is not +met, the detector is not working properly and we cannot guaranty that sf-stream +will work correctly. + +Nonetheless we provide the capability to synchronize the streams in image +assembly phase - this is needed rarely, but occasionally happens. In this sort +of hiccups we usually loose only a couple of consecutive images. + +### Messages format +Each message is composed by 2 parts: + +- Serialization of ModuleFrame in the first part. +- Frame data in the second part. + +Module frame is defined as: +```c++ +#pragma pack(push) +#pragma pack(1) +struct ModuleFrame { + uint64_t pulse_id; + uint64_t frame_index; + uint64_t daq_rec; + uint64_t n_recv_packets; + uint64_t module_id; +}; +#pragma pack(pop) +``` + +The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in +**uint16** representing the detector image. + +## Image assembly +We first synchronize the modules. We do this by reading all sockets and +deciding the largest frame pulse_id among them (max_pulse_id). We then calculate +the diff between a specific socket pulse_id and the max_pulse_id. +This difference tells us how many messages we need to discard from a specific socket. + +This discarding is the source of possible missing images in the output stream. +It can happen in 3 cases: + +- At least one of the detector modules did not sent any packets for the specific +pulse_id. +- All the packets from a specific module for a pulse_id were lost before UDP +receiving them. +- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was +dropped. + +All this 3 cases are highly unlikely, so synchronization is mostly needed when +first starting sf-stream. Different sockets connect to sf-buffers at different +times. Apart from the initial synchronization there should be no need to +re-synchronize modules in a healthy running environment. + +If an image is missing any ZMQ messages from sf-buffers (not all modules data +arrived), the image will be dropped. We do not do partial reconstruction in +sf-stream. However, it is important to note, that this does not cover the case +where frames are incomplete (missing UDP packets on sf-buffer) - we still +assemble this images as long as at least 1 packet/frame for a specific pulse_id +arrived. + +## ZMQ sending + +We devide the ZMQ sending to 3 types of stream: + +- Data processing stream. This is basically the complete stream from +the detector with all meta and data. It can be described as full data full +meta stream. Only 1 client at the time can be connected to this stream +(PUSH/PULL for load balancing). + +- Live viewing stream. This is a reduced data full meta stream. We send +meta for all frames, but data only for subset of them (10Hz, for example). +Any number of clients can connect to the 10Hz stream, because we use PUB/SUB +for this socket. + +- Pulse_id stream. This is a stream that sends out only the current pulse_id. +It can be used to synchronize any external system with the current pulse_id +being recorded. Multiple clients can connect to this stream. + +In the data processing and live viewing stream we use +[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md) +as our protocol to be compatible with currently available external components. + +We use following fields in the JSON header: + +| Name | Type | Comment | +| --- | --- | --- | +| pulse_id | uint64 |bunchid from detector header| +|frame|uint64|frame_index from detector header| +|is_good_frame|bool|true if all packets for this frame are present| +|daq_rec|uint32|daqrec from detector header| +|pedestal_file|string|Path to pedestal file| +|gain_file|string|Path to gain file| +|number_frames_expected|int|Number of expected frames| +|run_name|string|Name of the run| +|detector_name|string|Name of the detector| +|htype|string|Value: "array-1.0"| +|type|string|Value: "uint16"| +|shape|Array[uint64]|Shape of the image in stream| + +### Full data full meta stream + +This stream runs at detector frequency and uses PUSH/PULL to distribute data +to max 1 client (this client can have many processes, but it needs to be a +single logical entity, since the images are evenly distributed to all +connected sockets). + +![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg) + +The goal here is to provide a complete copy of the detector image stream +for purposes of online analysis. Given the large amount of data on this +stream only "pre-approved" applications that can handle the load should be +attached here. + +### Reduced data full meta stream + +This streams also runs at detector frequency for JSON headers (meta), but +it sends only part of the images in the stream. The rest of the images are +sent as empty buffers (the receiver needs to be aware of this behaviour, as +Array 1.0 alone does not define it). + +![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg) + +This is the lightweight version of the image stream. Any number of clients +can connect to this stream (PUB/SUB) but no client can do load +balancing automatically (it would require PUSH/PULL). + +This is a "public interface" for anyone who wants to get detector data live, +and can do with only a subset of images. + +### Pulse_id stream + +This stream runs ar detector frequency in PUB/SUB mode. The only thing it +does is sends out the pulse_id (of the just received image) in uint64_t +format. + +![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg) + +This is also a "public interface" for anyone who wants to get the current +system pulse_id. \ No newline at end of file diff --git a/std-stream-send/build_stdstream.sh b/std-stream-send/build_stdstream.sh new file mode 100644 index 0000000..d954ab2 --- /dev/null +++ b/std-stream-send/build_stdstream.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +VERSION=1.0.0 + +docker build --no-cache=true -f debug.Dockerfile -t paulscherrerinstitute/std-stream-send-sim . +docker tag paulscherrerinstitute/std-stream-send-sim paulscherrerinstitute/std-stream-send-sim:$VERSION + +docker login +docker push paulscherrerinstitute/std-stream-send-sim:$VERSION +docker push paulscherrerinstitute/std-stream-send-sim \ No newline at end of file diff --git a/std-stream-send/debug.Dockerfile b/std-stream-send/debug.Dockerfile new file mode 100644 index 0000000..97a772a --- /dev/null +++ b/std-stream-send/debug.Dockerfile @@ -0,0 +1,13 @@ +FROM paulscherrerinstitute/sf-daq_phdf5:1.0.0 + +COPY . /sf_daq_buffer/ + +RUN mkdir /sf_daq_buffer/build && \ + cd /sf_daq_buffer/build && \ + cmake3 .. && \ + make std-stream-send + +WORKDIR /sf_daq_buffer/build + +ENTRYPOINT ["/sf_daq_buffer/build/std_stream"] + diff --git a/std-stream-send/include/StreamSendConfig.hpp b/std-stream-send/include/StreamSendConfig.hpp new file mode 100644 index 0000000..36ecd2c --- /dev/null +++ b/std-stream-send/include/StreamSendConfig.hpp @@ -0,0 +1,33 @@ +#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP +#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP + + +#include +#include +#include +#include +#include + +struct StreamSendConfig { + static StreamSendConfig from_json_file(const std::string& filename) { + std::ifstream ifs(filename); + rapidjson::IStreamWrapper isw(ifs); + rapidjson::Document config_parameters; + config_parameters.ParseStream(isw); + + return { + config_parameters["detector_name"].GetString(), + config_parameters["detector_type"].GetString(), + config_parameters["n_modules"].GetInt(), + config_parameters["start_udp_port"].GetInt(), + }; + } + + const std::string detector_name; + const std::string detector_type; + const int n_modules; + const int start_udp_port; +}; + + +#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP diff --git a/std-stream-send/include/stream_config.hpp b/std-stream-send/include/stream_config.hpp new file mode 100644 index 0000000..a0f1d72 --- /dev/null +++ b/std-stream-send/include/stream_config.hpp @@ -0,0 +1,14 @@ +namespace stream_config +{ + // N of IO threads to receive data from modules. + const int STREAM_ZMQ_IO_THREADS = 1; + // How long should the RECV queue be. + const size_t STREAM_RCVHWM = 100; + + const int PROCESSING_ZMQ_SNDHWM = 10; + // Keep the last second of pulses in the buffer. + const int PULSE_ZMQ_SNDHWM = 100; + + // Number of pulses between each statistics print out. + const size_t STREAM_STATS_MODULO = 1000; +} diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp new file mode 100644 index 0000000..c832578 --- /dev/null +++ b/std-stream-send/src/main.cpp @@ -0,0 +1,52 @@ +#include +#include +#include "stream_config.hpp" +#include +#include +#include "RamBuffer.hpp" + + +using namespace std; +using namespace stream_config; + +int main (int argc, char *argv[]) +{ + auto ctx = zmq_ctx_new(); + zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + + auto sender = zmq_socket(ctx, ZMQ_PUSH); + const int sndhwm = PROCESSING_ZMQ_SNDHWM; + if (zmq_setsockopt( + sender, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const int linger = 0; + if (zmq_setsockopt( + sender, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_bind(sender, "tcp://127.0.0.1:10000") != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + RamBuffer image_buffer(config.detector_name + "_assembler", + sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1); + + while (true) { + + image_id = 123; + + zmq_send(sender, + ram_buffer.get_slot_meta(image_id), + sizeof(ImageMetadata), ZMQ_SNDMORE); + + zmq_send(sender, + ram_buffer.get_slot_data(image_id), + buffer_config::MODULE_N_BYTES * 4, 0); + + pulse_id++; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} diff --git a/std-stream-send/test/CMakeLists.txt b/std-stream-send/test/CMakeLists.txt new file mode 100644 index 0000000..2778a7a --- /dev/null +++ b/std-stream-send/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(std-stream-send-tests main.cpp) + +target_link_libraries(std-stream-send-tests + std-stream-send-lib + gtest + ) + diff --git a/std-stream-send/test/main.cpp b/std-stream-send/test/main.cpp new file mode 100644 index 0000000..e819294 --- /dev/null +++ b/std-stream-send/test/main.cpp @@ -0,0 +1,8 @@ +#include "gtest/gtest.h" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}