From fcafeff6c535eaa4d8079e55a37352bbd891dc88 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 6 Jul 2021 17:50:01 +0200 Subject: [PATCH] Change writer to JSON consuming --- std-det-writer/include/WriterStats.hpp | 6 +-- std-det-writer/include/broker_format.hpp | 17 ------- std-det-writer/src/WriterStats.cpp | 12 ++--- std-det-writer/src/main.cpp | 58 ++++++++++++++---------- 4 files changed, 41 insertions(+), 52 deletions(-) delete mode 100644 std-det-writer/include/broker_format.hpp diff --git a/std-det-writer/include/WriterStats.hpp b/std-det-writer/include/WriterStats.hpp index ba48a18..705c682 100644 --- a/std-det-writer/include/WriterStats.hpp +++ b/std-det-writer/include/WriterStats.hpp @@ -1,7 +1,7 @@ #include #include #include -#include "broker_format.hpp" +#include "store_format.hpp" #ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP #define SF_DAQ_BUFFER_FRAMESTATS_HPP @@ -23,8 +23,8 @@ class WriterStats { void print_stats(); public: - explicit WriterStats(std::string detector_name); - void start_run(const StoreStream& meta); + explicit WriterStats(std::string detector_name, + const uint64_t image_n_bytes); void end_run(); void start_image_write(); void end_image_write(); diff --git a/std-det-writer/include/broker_format.hpp b/std-det-writer/include/broker_format.hpp deleted file mode 100644 index 417d299..0000000 --- a/std-det-writer/include/broker_format.hpp +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BROKER_FORMAT_HPP -#define SF_DAQ_BUFFER_BROKER_FORMAT_HPP - -#include "formats.hpp" - - -#pragma pack(push) -#pragma pack(1) -struct StoreStream { - std::string output_file; - int64_t run_id; - uint64_t image_id; - uint32_t i_image; - uint32_t n_images; -}; -#pragma pack(pop) -#endif //SF_DAQ_BUFFER_BROKER_FORMAT_HPP diff --git a/std-det-writer/src/WriterStats.cpp b/std-det-writer/src/WriterStats.cpp index 87f2800..83ce848 100644 --- a/std-det-writer/src/WriterStats.cpp +++ b/std-det-writer/src/WriterStats.cpp @@ -5,8 +5,9 @@ using namespace std; using namespace chrono; -WriterStats::WriterStats(string detector_name) : - detector_name_(std::move(detector_name)) +WriterStats::WriterStats(string detector_name, size_t image_n_bytes) : + detector_name_(std::move(detector_name)), + image_n_bytes_(image_n_bytes) { reset_counters(); } @@ -36,13 +37,6 @@ void WriterStats::end_image_write() max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration); } -void WriterStats::start_run(const StoreStream& meta) -{ - image_n_bytes_ = (meta.image_y_size * - meta.image_x_size * - meta.bits_per_pixel) / 8; -} - void WriterStats::end_run() { print_stats(); diff --git a/std-det-writer/src/main.cpp b/std-det-writer/src/main.cpp index b9a39e9..451d530 100644 --- a/std-det-writer/src/main.cpp +++ b/std-det-writer/src/main.cpp @@ -7,10 +7,12 @@ #include "BufferUtils.hpp" #include "live_writer_config.hpp" #include "WriterStats.hpp" -#include "broker_format.hpp" +#include "store_format.hpp" #include "JFH5Writer.hpp" #include "DetWriterConfig.hpp" +#include "rapidjson/document.h" + using namespace std; using namespace buffer_config; using namespace live_writer_config; @@ -46,49 +48,59 @@ int main (int argc, char *argv[]) sizeof(ImageMetadata), IMAGE_N_BYTES, 1, RAM_BUFFER_N_SLOTS); JFH5Writer writer(config.detector_name); - WriterStats stats(config.detector_name); + WriterStats stats(config.detector_name, IMAGE_N_BYTES); - StoreStream meta = {}; + char recv_buffer[8192]; while (true) { - zmq_recv(receiver, &meta, sizeof(meta), 0); + zmq_recv(receiver, &recv_buffer, sizeof(recv_buffer), 0); - // i_image == 0 -> we have a new run. - if (meta.i_image == 0) { - auto image_meta = (ImageMetadata*) - image_buffer.get_slot_meta(meta.image_id); - - writer.open_run(meta.output_file, - meta.run_id, - meta.n_images, - image_meta->height, - image_meta->width, - image_meta->dtype); - - stats.start_run(meta); + rapidjson::Document document; + if (document.Parse(recv_buffer).HasParseError()) { + continue; } + const string output_file = document["output_file"].GetString(); + const uint64_t image_id = document["image_id"].GetUint64(); + const int run_id = document["run_id"].GetInt(); + const int i_image = document["i_image"].GetInt(); + const int n_images = document["n_images"].GetInt(); + // i_image == n_images -> end of run. - if (meta.i_image == meta.n_images) { + if (i_image == n_images) { writer.close_run(); stats.end_run(); continue; } + // i_image == 0 -> we have a new run. + if (i_image == 0) { + auto image_meta = (ImageMetadata*) + image_buffer.get_slot_meta(image_id); + + writer.open_run(output_file, + run_id, + n_images, + image_meta->height, + image_meta->width, + image_meta->dtype); + } + + // Fair distribution of images among writers. - if (meta.i_image % n_writers == i_writer) { - char* data = image_buffer.get_slot_data(meta.image_id); + if (i_image % n_writers == i_writer) { + char* data = image_buffer.get_slot_data(image_id); stats.start_image_write(); - writer.write_data(meta.run_id, meta.i_image, data); + writer.write_data(run_id, i_image, data); stats.end_image_write(); } // Only the first instance writes metadata. if (i_writer == 0) { auto image_meta = (ImageMetadata*) - image_buffer.get_slot_meta(meta.image_id); - writer.write_meta(meta.run_id, meta.i_image, image_meta); + image_buffer.get_slot_meta(image_id); + writer.write_meta(run_id, i_image, image_meta); } }