diff --git a/.dockerignore b/.dockerignore index e98d44a..197f5fd 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,4 +1,9 @@ cmake-build-*/ docker/ docs/ -scripts/ \ No newline at end of file +scripts/ + +debug.Dockerfile + +# Include files: +!docker/example_detector.json diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index c6f64be..ad5b12b 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -36,7 +36,7 @@ namespace buffer_config { // HWM for live stream from buffer. const int BUFFER_ZMQ_RCVHWM = 100; // IPC address of the live stream. - const std::string BUFFER_LIVE_IPC_URL = "ipc:///tmp/sf-live-"; + const std::string IPC_URL_BASE = "ipc:///tmp/std-daq-"; // Number of image slots in ram buffer - 10 seconds should be enough const int RAM_BUFFER_N_SLOTS = 100 * 10; } diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index 3fc1c6b..1d0f0f8 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -26,13 +26,14 @@ struct ModuleFrame { #pragma pack(push) #pragma pack(1) struct ImageMetadata { + uint64_t version; uint64_t id; uint64_t height; uint64_t width; - uint64_t dtype; - uint64_t encoding; - uint64_t source_id; - uint64_t status; + uint16_t dtype; + uint16_t encoding; + uint16_t source_id; + uint16_t status; uint64_t user_1; uint64_t user_2; }; diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index abe0a3d..2809341 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -94,7 +94,7 @@ void BufferUtils::create_destination_folder(const string& output_file) void* BufferUtils::connect_socket( void* ctx, const string& detector_name, const string& stream_name) { - string ipc_address = buffer_config::BUFFER_LIVE_IPC_URL + + string ipc_address = buffer_config::IPC_URL_BASE + detector_name + "-" + stream_name; @@ -127,7 +127,7 @@ void* BufferUtils::connect_socket( void* BufferUtils::bind_socket( void* ctx, const string& detector_name, const string& stream_name) { - string ipc_address = BUFFER_LIVE_IPC_URL + + string ipc_address = IPC_URL_BASE + detector_name + "-" + stream_name; diff --git a/std-det-writer/debug.Dockerfile b/debug.Dockerfile similarity index 52% rename from std-det-writer/debug.Dockerfile rename to debug.Dockerfile index eb0f094..7225593 100644 --- a/std-det-writer/debug.Dockerfile +++ b/debug.Dockerfile @@ -4,7 +4,10 @@ COPY . /sf_daq_buffer/ RUN mkdir /sf_daq_buffer/build && \ cd /sf_daq_buffer/build && \ - cmake3 .. && \ - make std-det-writer + # Build the project + cmake3 .. -DUSE_EIGER=1&& \ + make && \ + # Deploy the test config. + cp /sf_daq_buffer/docker/example_detector.json . WORKDIR /sf_daq_buffer/build diff --git a/docker/example_detector.json b/docker/example_detector.json new file mode 100644 index 0000000..02b0012 --- /dev/null +++ b/docker/example_detector.json @@ -0,0 +1,7 @@ +{ + "detector_name": "cSAXS.EG01V01", + "detector_type": "eiger", + "n_modules": 4, + "image_n_pixels": 123456, + "start_udp_port": 50000 +} \ No newline at end of file diff --git a/std-det-writer/include/StdDetWriterConfig.hpp b/std-det-writer/include/DetWriterConfig.hpp similarity index 61% rename from std-det-writer/include/StdDetWriterConfig.hpp rename to std-det-writer/include/DetWriterConfig.hpp index 847439b..31e983a 100644 --- a/std-det-writer/include/StdDetWriterConfig.hpp +++ b/std-det-writer/include/DetWriterConfig.hpp @@ -8,8 +8,8 @@ #include #include -struct UdpRecvConfig { - static UdpRecvConfig from_json_file(const std::string& filename) { +struct DetWriterConfig { + static DetWriterConfig from_json_file(const std::string& filename) { std::ifstream ifs(filename); rapidjson::IStreamWrapper isw(ifs); rapidjson::Document config_parameters; @@ -17,16 +17,12 @@ struct UdpRecvConfig { return { config_parameters["detector_name"].GetString(), - config_parameters["detector_type"].GetString(), - config_parameters["n_modules"].GetInt(), - config_parameters["start_udp_port"].GetInt(), + config_parameters["image_n_pixels"].GetInt(), }; } const std::string detector_name; - const std::string detector_type; - const int n_modules; - const int start_udp_port; + const int image_n_pixels; }; diff --git a/std-det-writer/include/JFH5Writer.hpp b/std-det-writer/include/JFH5Writer.hpp index b979285..2702f4e 100644 --- a/std-det-writer/include/JFH5Writer.hpp +++ b/std-det-writer/include/JFH5Writer.hpp @@ -13,7 +13,6 @@ extern "C" { class JFH5Writer { const std::string detector_name_; - const std::string root_folder_; static const int64_t NO_RUN_ID = -1; @@ -35,24 +34,24 @@ class JFH5Writer { void close_file(); public: - explicit JFH5Writer( - const std::string detector_name, const std::string root_folder); + JFH5Writer(std::string detector_name); ~JFH5Writer(); - void open_run(int64_t run_id, - uint32_t n_images, - uint32_t image_y_size, - uint32_t image_x_size, - uint32_t bits_per_pixel); + void open_run(const std::string& output_file, + const int run_id, + const int n_images, + const int image_y_size, + const int image_x_size, + const int bits_per_pixel); void close_run(); - void write_data(int64_t run_id, - uint32_t index, + void write_data(const int64_t run_id, + const uint32_t index, const char* data); - void write_meta(int64_t run_id, - uint32_t index, - const ImageMetadata& meta); + void write_meta(const int64_t run_id, + const uint32_t index, + const ImageMetadata* meta); }; #endif //JF_LIVE_WRITER_HPP diff --git a/std-det-writer/include/WriterStats.hpp b/std-det-writer/include/WriterStats.hpp index ba48a18..cacf89d 100644 --- a/std-det-writer/include/WriterStats.hpp +++ b/std-det-writer/include/WriterStats.hpp @@ -1,7 +1,6 @@ #include #include #include -#include "broker_format.hpp" #ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP #define SF_DAQ_BUFFER_FRAMESTATS_HPP @@ -23,8 +22,8 @@ class WriterStats { void print_stats(); public: - explicit WriterStats(std::string detector_name); - void start_run(const StoreStream& meta); + explicit WriterStats(std::string detector_name, + const uint64_t image_n_bytes); void end_run(); void start_image_write(); void end_image_write(); diff --git a/std-det-writer/include/broker_format.hpp b/std-det-writer/include/broker_format.hpp deleted file mode 100644 index dd45175..0000000 --- a/std-det-writer/include/broker_format.hpp +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BROKER_FORMAT_HPP -#define SF_DAQ_BUFFER_BROKER_FORMAT_HPP - -#include "formats.hpp" - - -#pragma pack(push) -#pragma pack(1) -struct StoreStream { - ImageMetadata image_metadata; - - int64_t run_id; - uint32_t i_image; - uint32_t n_images; - uint32_t image_y_size; - uint32_t image_x_size; - uint32_t bits_per_pixel; -}; -#pragma pack(pop) -#endif //SF_DAQ_BUFFER_BROKER_FORMAT_HPP diff --git a/std-det-writer/src/JFH5Writer.cpp b/std-det-writer/src/JFH5Writer.cpp index 3e58a1d..cdb5deb 100644 --- a/std-det-writer/src/JFH5Writer.cpp +++ b/std-det-writer/src/JFH5Writer.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "JFH5Writer.hpp" #include "live_writer_config.hpp" @@ -15,10 +16,8 @@ using namespace std; using namespace buffer_config; using namespace live_writer_config; -JFH5Writer::JFH5Writer( - const std::string detector_name, const std::string root_folder): - detector_name_(detector_name), - root_folder_(root_folder) +JFH5Writer::JFH5Writer(std::string detector_name): + detector_name_(std::move(detector_name)) { } @@ -42,22 +41,20 @@ hid_t JFH5Writer::get_datatype(const int bits_per_pixel) } } -void JFH5Writer::open_run(const int64_t run_id, - const uint32_t n_images, - const uint32_t image_y_size, - const uint32_t image_x_size, - const uint32_t bits_per_pixel) +void JFH5Writer::open_run(const string& output_file, + const int run_id, + const int n_images, + const int image_y_size, + const int image_x_size, + const int dtype) { close_run(); - const string output_folder = root_folder_ + "/" + OUTPUT_FOLDER_SYMLINK; - // TODO: Maybe add leading zeros to filename? - const string output_file = output_folder + to_string(run_id) + ".h5"; - current_run_id_ = run_id; image_y_size_ = image_y_size; image_x_size_ = image_x_size; - bits_per_pixel_ = bits_per_pixel; + // The last digit in the enum value represents the number of bytes/pixel. + bits_per_pixel_ = (dtype % 10) * 8; image_n_bytes_ = (image_y_size_ * image_x_size_ * bits_per_pixel_) / 8; #ifdef DEBUG_OUTPUT @@ -237,7 +234,7 @@ void JFH5Writer::write_data( } void JFH5Writer::write_meta( - const int64_t run_id, const uint32_t index, const ImageMetadata& meta) + const int64_t run_id, const uint32_t index, const ImageMetadata* meta) { if (run_id != current_run_id_) { throw runtime_error("Invalid run_id."); @@ -264,12 +261,12 @@ void JFH5Writer::write_meta( } if (H5Dwrite(image_id_dataset_, H5T_NATIVE_UINT64, - ram_ds, file_ds, H5P_DEFAULT, &(meta.id)) < 0) { + ram_ds, file_ds, H5P_DEFAULT, &(meta->id)) < 0) { throw runtime_error("Cannot write data to pulse_id dataset."); } if (H5Dwrite(status_dataset_, H5T_NATIVE_UINT64, - ram_ds, file_ds, H5P_DEFAULT, &(meta.status)) < 0) { + ram_ds, file_ds, H5P_DEFAULT, &(meta->status)) < 0) { throw runtime_error("Cannot write data to is_good_image dataset."); } diff --git a/std-det-writer/src/WriterStats.cpp b/std-det-writer/src/WriterStats.cpp index 87f2800..83ce848 100644 --- a/std-det-writer/src/WriterStats.cpp +++ b/std-det-writer/src/WriterStats.cpp @@ -5,8 +5,9 @@ using namespace std; using namespace chrono; -WriterStats::WriterStats(string detector_name) : - detector_name_(std::move(detector_name)) +WriterStats::WriterStats(string detector_name, size_t image_n_bytes) : + detector_name_(std::move(detector_name)), + image_n_bytes_(image_n_bytes) { reset_counters(); } @@ -36,13 +37,6 @@ void WriterStats::end_image_write() max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration); } -void WriterStats::start_run(const StoreStream& meta) -{ - image_n_bytes_ = (meta.image_y_size * - meta.image_x_size * - meta.bits_per_pixel) / 8; -} - void WriterStats::end_run() { print_stats(); diff --git a/std-det-writer/src/main.cpp b/std-det-writer/src/main.cpp index 4af58a7..f6cacc9 100644 --- a/std-det-writer/src/main.cpp +++ b/std-det-writer/src/main.cpp @@ -7,8 +7,10 @@ #include "BufferUtils.hpp" #include "live_writer_config.hpp" #include "WriterStats.hpp" -#include "broker_format.hpp" #include "JFH5Writer.hpp" +#include "DetWriterConfig.hpp" + +#include "rapidjson/document.h" using namespace std; using namespace buffer_config; @@ -16,16 +18,19 @@ using namespace live_writer_config; int main (int argc, char *argv[]) { - if (argc != 2) { + if (argc != 3) { cout << endl; - cout << "Usage: jf_live_writer [detector_json_filename]" << endl; + cout << "Usage: std-det-writer [detector_json_filename]" + " [bit_depth]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; cout << endl; exit(-1); } - auto const config = BufferUtils::read_json_config(string(argv[1])); + auto const config = DetWriterConfig::from_json_file(string(argv[1])); + const int bit_depth = atoi(argv[2]); MPI_Init(nullptr, nullptr); @@ -38,50 +43,66 @@ int main (int argc, char *argv[]) auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS); auto receiver = BufferUtils::connect_socket( - ctx, config.detector_name, "writer-agent"); + ctx, config.detector_name, "writer_agent"); - const size_t IMAGE_N_BYTES = 12; + const size_t IMAGE_N_BYTES = config.image_n_pixels * bit_depth / 8; RamBuffer image_buffer(config.detector_name + "_assembler", sizeof(ImageMetadata), IMAGE_N_BYTES, 1, RAM_BUFFER_N_SLOTS); - JFH5Writer writer(config); - WriterStats stats(config.detector_name); + JFH5Writer writer(config.detector_name); + WriterStats stats(config.detector_name, IMAGE_N_BYTES); - StoreStream meta = {}; + char recv_buffer[8192]; while (true) { - zmq_recv(receiver, &meta, sizeof(meta), 0); + zmq_recv(receiver, &recv_buffer, sizeof(recv_buffer), 0); - // i_image == 0 -> we have a new run. - if (meta.i_image == 0) { - writer.open_run(meta.run_id, - meta.n_images, - meta.image_y_size, - meta.image_x_size, - meta.bits_per_pixel); - - stats.start_run(meta); + rapidjson::Document document; + if (document.Parse(recv_buffer).HasParseError()) { + continue; } + const string output_file = document["output_file"].GetString(); + const uint64_t image_id = document["image_id"].GetUint64(); + const int run_id = document["run_id"].GetInt(); + const int i_image = document["i_image"].GetInt(); + const int n_images = document["n_images"].GetInt(); + // i_image == n_images -> end of run. - if (meta.i_image == meta.n_images) { + if (i_image == n_images) { writer.close_run(); stats.end_run(); continue; } + // i_image == 0 -> we have a new run. + if (i_image == 0) { + auto image_meta = (ImageMetadata*) + image_buffer.get_slot_meta(image_id); + + writer.open_run(output_file, + run_id, + n_images, + image_meta->height, + image_meta->width, + image_meta->dtype); + } + + // Fair distribution of images among writers. - if (meta.i_image % n_writers == i_writer) { - char* data = ram_buffer.get_slot_data(meta.image_metadata.id); + if (i_image % n_writers == i_writer) { + char* data = image_buffer.get_slot_data(image_id); stats.start_image_write(); - writer.write_data(meta.run_id, meta.i_image, data); + writer.write_data(run_id, i_image, data); stats.end_image_write(); } // Only the first instance writes metadata. if (i_writer == 0) { - writer.write_meta(meta.run_id, meta.i_image, meta.image_metadata); + auto image_meta = (ImageMetadata*) + image_buffer.get_slot_meta(image_id); + writer.write_meta(run_id, i_image, image_meta); } } diff --git a/std-det-writer/test/CMakeLists.txt b/std-det-writer/test/CMakeLists.txt index 8f806b0..8afe069 100644 --- a/std-det-writer/test/CMakeLists.txt +++ b/std-det-writer/test/CMakeLists.txt @@ -1,7 +1,7 @@ -add_executable(jf-live-writer-tests main.cpp) +add_executable(std-det-writer-tests main.cpp) -target_link_libraries(jf-live-writer-tests - jf-live-writer-lib +target_link_libraries(std-det-writer-tests + std-det-writer-lib zmq rt gtest diff --git a/std-stream-send/include/StreamSendConfig.hpp b/std-stream-send/include/StreamSendConfig.hpp index 5bf5531..9709aa8 100644 --- a/std-stream-send/include/StreamSendConfig.hpp +++ b/std-stream-send/include/StreamSendConfig.hpp @@ -19,14 +19,12 @@ struct StreamSendConfig { config_parameters["detector_name"].GetString(), config_parameters["n_modules"].GetInt(), config_parameters["image_n_pixels"].GetInt(), - config_parameters["stream_address"].GetString() }; } const std::string detector_name; const int n_modules; const int image_n_pixels; - const std::string stream_address; }; diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index 2418c87..e1894f7 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -13,12 +13,13 @@ using namespace buffer_config; int main (int argc, char *argv[]) { - if (argc != 3) { + if (argc != 4) { cout << endl; cout << "Usage: std_stream_send [detector_json_filename]" - " [bit_depth]" << endl; + " [bit_depth] [stream_address]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; + cout << "\tstream_address: address to bind the output stream." << endl; cout << endl; exit(-1); @@ -26,6 +27,7 @@ int main (int argc, char *argv[]) const auto config = StreamSendConfig::from_json_file(string(argv[1])); const int bit_depth = atoi(argv[2]); + const string stream_address = string(argv[3]); auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); @@ -43,7 +45,7 @@ int main (int argc, char *argv[]) throw runtime_error(zmq_strerror(errno)); } - if (zmq_bind(sender, config.stream_address.c_str()) != 0) { + if (zmq_bind(sender, stream_address.c_str()) != 0) { throw runtime_error(zmq_strerror(errno)); }