From eb480c82d98396dc9dcd68c008a56237d2d0f449 Mon Sep 17 00:00:00 2001 From: Leonardo Hax Date: Thu, 5 May 2022 19:07:33 +0200 Subject: [PATCH] fixed metadata parser with user_id --- std-det-writer/include/DetWriterConfig.hpp | 8 ++- std-det-writer/src/main.cpp | 76 +++++++++++++++------- 2 files changed, 61 insertions(+), 23 deletions(-) diff --git a/std-det-writer/include/DetWriterConfig.hpp b/std-det-writer/include/DetWriterConfig.hpp index f7efa15..0c43843 100644 --- a/std-det-writer/include/DetWriterConfig.hpp +++ b/std-det-writer/include/DetWriterConfig.hpp @@ -17,15 +17,21 @@ struct DetWriterConfig { return { config_parameters["detector_name"].GetString(), + config_parameters["detector_type"].GetString(), + config_parameters["n_modules"].GetInt(), config_parameters["image_height"].GetInt(), config_parameters["image_width"].GetInt(), - config_parameters["bit_depth"].GetInt(), + config_parameters["start_udp_port"].GetInt(), + config_parameters["bit_depth"].GetInt() }; } const std::string detector_name; + const std::string detector_type; + const int n_modules; const int image_height; const int image_width; + const int start_udp_port; const int bit_depth; }; diff --git a/std-det-writer/src/main.cpp b/std-det-writer/src/main.cpp index 5872ffc..c65f2e4 100644 --- a/std-det-writer/src/main.cpp +++ b/std-det-writer/src/main.cpp @@ -15,19 +15,24 @@ #include "rapidjson/document.h" #include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" + #include "date.h" using namespace std; using namespace date; using namespace buffer_config; using namespace live_writer_config; +using namespace rapidjson; + int main (int argc, char *argv[]) { - if (argc != 2) { + if (argc != 3) { cout << endl; cout << "Usage: std_det_writer [detector_json_filename]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tincoming_address: zmq incoming address." << endl; cout << endl; exit(-1); @@ -42,20 +47,28 @@ int main (int argc, char *argv[]) int i_writer; MPI_Comm_rank(MPI_COMM_WORLD, &i_writer); - auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS); + auto address_incoming = argv[2]; auto receiver = BufferUtils::connect_socket_gf( - ctx, config.detector_name, "tcp://localhost:9667"); + ctx, config.detector_name, address_incoming); const size_t IMAGE_N_BYTES = config.image_width * config.image_height * config.bit_depth / 8; + + + #ifdef DEBUG_OUTPUT + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[std_daq_det_writer] " << endl; + cout << " Config.detector_name " << config.detector_name << endl; + cout << " address : " << address_incoming << endl; + cout << " image_n_bytes " << IMAGE_N_BYTES; + #endif JFH5Writer writer(config.detector_name); WriterStats stats(config.detector_name, IMAGE_N_BYTES); char recv_buffer_meta[512]; char recv_buffer_data[4838400]; - bool open_run = false; bool header_in = false; int last_run_id = -1; while (true) { @@ -65,29 +78,34 @@ int main (int argc, char *argv[]) std::string error_str(recv_buffer_meta, nbytes); throw runtime_error(error_str); } - const string output_file = document["output_file"].GetString(); - const uint64_t image_id = document["image_id"].GetUint64(); + // const uint64_t image_id = document["image_id"].GetUint64(); const int run_id = document["run_id"].GetInt(); const int i_image = document["i_image"].GetInt(); const int n_images = document["n_images"].GetInt(); const int user_id = document["user_id"].GetInt(); - - - - const int status = document["status"].GetInt(); const rapidjson::Value& a = document["shape"]; - const int width = a[0].GetInt(); - const int heigth = a[1].GetInt(); + const int img_metadata_width = a[0].GetInt(); + const int img_metadata_heigth = a[1].GetInt(); const int dtype = 2; + #ifdef DEBUG_OUTPUT + if (i_writer == 0){ + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[std_daq_det_writer] Metadata json: " << endl; + StringBuffer doc_buffer; + Writer doc_writer(doc_buffer); + document.Accept(doc_writer); + cout < end of run. - if (i_image == n_images && open_run == true) { + if (i_image == n_images) { writer.close_run(); stats.end_run(); - open_run = false; #ifdef DEBUG_OUTPUT cout << "[" << std::chrono::system_clock::now() << "]"; @@ -106,11 +124,12 @@ int main (int argc, char *argv[]) error_message << "[std_daq_det_writer] Cannot set real user id..." << endl; throw runtime_error(error_message.str()); } + header_in = true; continue; } // i_image == 0 -> we have a new run. - if (i_image == 0 && open_run == false) { + if (i_image == 0) { // TODO Improve changing GID and UID of the writer processes // to be part of the deployment via the ansible deployment. #ifdef DEBUG_OUTPUT @@ -131,22 +150,30 @@ int main (int argc, char *argv[]) error_message << "[std_daq_det_writer] Cannot set user_id to " << user_id << endl; throw runtime_error(error_message.str()); } + #ifdef DEBUG_OUTPUT + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[std_daq_det_writer] Opening run..." << endl; + #endif writer.open_run(output_file, run_id, n_images, - heigth, - width, + img_metadata_heigth, + img_metadata_width, dtype); - open_run = true; - } - + } // data auto img_nbytes = zmq_recv(receiver, &recv_buffer_data, sizeof(recv_buffer_data), 0); - if (img_nbytes != -1 && header_in == true && open_run == true){ + if (img_nbytes != -1 && header_in == true){ // Fair distribution of images among writers. if (i_image % n_writers == i_writer) { + #ifdef DEBUG_OUTPUT + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[std_daq_det_writer] Writing data..." << endl; + cout << "[writer ID] "<< i_writer << endl; + #endif + stats.start_image_write(); writer.write_data(run_id, i_image, recv_buffer_data); stats.end_image_write(); @@ -156,7 +183,12 @@ int main (int argc, char *argv[]) // Only the first instance writes metadata. - if (i_writer == 0 && header_in == true && open_run == true) { + if (i_writer == 0 && header_in == true) { + #ifdef DEBUG_OUTPUT + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[std_daq_det_writer] Writing metadata..." << endl; + cout << "[writer ID] "<< i_writer << endl; + #endif writer.write_meta_gf(run_id, i_image, (uint16_t)run_id, (uint64_t)status);