From 29d3e4c1cb09244c8b6a4b0a41808839eba0bada Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 24 Feb 2021 08:56:29 +0100 Subject: [PATCH] Rewrite the writer main function --- jf-live-writer/src/main.cpp | 50 +++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp index aa47b8c..417c919 100644 --- a/jf-live-writer/src/main.cpp +++ b/jf-live-writer/src/main.cpp @@ -3,9 +3,11 @@ #include #include #include -#include #include "live_writer_config.hpp" #include "WriterStats.hpp" +#include "broker_format.hpp" +#include +#include using namespace std; @@ -17,38 +19,64 @@ int main (int argc, char *argv[]) if (argc != 3) { cout << endl; cout << "Usage: jf_live_writer [detector_json_filename]" - " [writer_id]" << endl; + " [bits_per_pixel]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; - cout << "\twriter_id: Index of this writer instance." << endl; + cout << "\tbits_per_pixel: Number of bits in each pixel." << endl; cout << endl; exit(-1); } - auto config = BufferUtils::read_json_config(string(argv[1])); - const int writer_id = atoi(argv[2]); + auto const config = BufferUtils::read_json_config(string(argv[1])); + auto const bits_per_pixel = atoi(argv[2]); + + MPI_Init(NULL, NULL); + + int n_writers; + MPI_Comm_size(MPI_COMM_WORLD, &n_writers); + + int i_writer; + MPI_Comm_size(MPI_COMM_WORLD, &i_writer); auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS); auto receiver = BufferUtils::connect_socket( - ctx, config.detector_name, "assembler"); + ctx, config.detector_name, "broker-agent"); RamBuffer ram_buffer(config.detector_name, config.n_modules); - const uint64_t image_n_bytes = config.n_modules * MODULE_N_BYTES; - ImageBinaryWriter writer(config.detector_name, image_n_bytes); + const uint64_t image_n_bytes = + config.image_y_size * config.image_x_size * bits_per_pixel; + JFH5Writer writer(config); WriterStats stats(config.detector_name, STATS_MODULO, image_n_bytes); - ImageMetadata meta = {}; + StoreStream meta = {}; while (true) { zmq_recv(receiver, &meta, sizeof(meta), 0); - char* data = ram_buffer.read_image(meta.pulse_id); + + if (meta.op_code == OP_START) { + writer.open_run(meta.run_id, meta.n_images); + continue; + } + + if (meta.op_code == OP_END) { + writer.close_run(meta.run_id); + continue; + } + + if (meta.i_image % n_writers != i_writer) { + continue; + } + + char* data = ram_buffer.read_image(meta.image_metadata.pulse_id); stats.start_image_write(); - writer.write(meta, data); + writer.write(meta.run_id, meta.image_metadata, data); stats.end_image_write(); } + + MPI_Finalize(); }