From 6bbbd734c7f0ec9901a4e4dc3929d477bf635ceb Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 Jan 2021 17:20:06 +0100 Subject: [PATCH] Add jf-live-writer First implementation of the image buffer writer for the Jungfrau --- CMakeLists.txt | 2 +- jf-live-writer/include/BinaryReader.hpp | 28 --- jf-live-writer/include/BufferStats.hpp | 32 +++ jf-live-writer/include/JFH5LiveWriter.hpp | 49 ----- jf-live-writer/include/LiveImageAssembler.hpp | 51 ----- jf-live-writer/include/live_writer_config.hpp | 6 +- jf-live-writer/src/BinaryReader.cpp | 102 --------- jf-live-writer/src/BufferStats.cpp | 63 ++++++ jf-live-writer/src/JFH5LiveWriter.cpp | 133 ------------ jf-live-writer/src/LiveImageAssembler.cpp | 159 -------------- jf-live-writer/src/main.cpp | 195 +++--------------- 11 files changed, 121 insertions(+), 699 deletions(-) delete mode 100644 jf-live-writer/include/BinaryReader.hpp create mode 100644 jf-live-writer/include/BufferStats.hpp delete mode 100644 jf-live-writer/include/JFH5LiveWriter.hpp delete mode 100644 jf-live-writer/include/LiveImageAssembler.hpp delete mode 100644 jf-live-writer/src/BinaryReader.cpp create mode 100644 jf-live-writer/src/BufferStats.cpp delete mode 100644 jf-live-writer/src/JFH5LiveWriter.cpp delete mode 100644 jf-live-writer/src/LiveImageAssembler.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c739d65..63c25dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,4 +34,4 @@ add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") add_subdirectory("sf-stream") add_subdirectory("sf-writer") -#add_subdirectory("jf-live-writer") +add_subdirectory("jf-live-writer") diff --git a/jf-live-writer/include/BinaryReader.hpp b/jf-live-writer/include/BinaryReader.hpp deleted file mode 100644 index 85d2a0c..0000000 --- a/jf-live-writer/include/BinaryReader.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BINARYREADER_HPP -#define SF_DAQ_BUFFER_BINARYREADER_HPP - - -#include - -class BinaryReader { - - const std::string detector_folder_; - const std::string module_name_; - - std::string current_input_file_; - int input_file_fd_; - - void open_file(const std::string& filename); - void close_current_file(); - -public: - BinaryReader(const std::string &detector_folder, - const std::string &module_name); - - ~BinaryReader(); - - void get_frame(const uint64_t pulse_id, BufferBinaryFormat *buffer); -}; - - -#endif //SF_DAQ_BUFFER_BINARYREADER_HPP diff --git a/jf-live-writer/include/BufferStats.hpp b/jf-live-writer/include/BufferStats.hpp new file mode 100644 index 0000000..3aff6cb --- /dev/null +++ b/jf-live-writer/include/BufferStats.hpp @@ -0,0 +1,32 @@ +#include +#include +#include + +#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP +#define SF_DAQ_BUFFER_FRAMESTATS_HPP + + +class BufferStats { + const std::string detector_name_; + const int module_id_; + size_t stats_modulo_; + + int frames_counter_; + uint32_t total_buffer_write_us_; + uint32_t max_buffer_write_us_; + std::chrono::time_point stats_interval_start_; + + void reset_counters(); + void print_stats(); + +public: + BufferStats( + const std::string &detector_name, + const int module_id, + const size_t stats_modulo); + void start_frame_write(); + void end_frame_write(); +}; + + +#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP diff --git a/jf-live-writer/include/JFH5LiveWriter.hpp b/jf-live-writer/include/JFH5LiveWriter.hpp deleted file mode 100644 index a417631..0000000 --- a/jf-live-writer/include/JFH5LiveWriter.hpp +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef SFWRITER_HPP -#define SFWRITER_HPP - -#include -#include -#include - -#include "LiveImageAssembler.hpp" - -const auto& H5_UINT64 = H5::PredType::NATIVE_UINT64; -const auto& H5_UINT32 = H5::PredType::NATIVE_UINT32; -const auto& H5_UINT16 = H5::PredType::NATIVE_UINT16; -const auto& H5_UINT8 = H5::PredType::NATIVE_UINT8; - -class JFH5LiveWriter { - - const std::string detector_name_; - const size_t n_modules_; - const size_t n_pulses_; - - size_t write_index_; - - H5::H5File file_; - H5::DataSet image_dataset_; - - uint64_t* b_pulse_id_; - uint64_t* b_frame_index_; - uint32_t* b_daq_rec_; - uint8_t* b_is_good_frame_ ; - - void init_file(const std::string &output_file); - void write_dataset(const std::string name, - const void *buffer, - const H5::PredType &type); - void write_metadata(); - std::string get_detector_name(const std::string& detector_folder); - - void close_file(); - -public: - JFH5LiveWriter(const std::string& output_file, - const std::string& detector_folder, - const size_t n_modules, - const size_t n_pulses); - ~JFH5LiveWriter(); - void write(const ImageMetadata* metadata, const char* data); -}; - -#endif //SFWRITER_HPP diff --git a/jf-live-writer/include/LiveImageAssembler.hpp b/jf-live-writer/include/LiveImageAssembler.hpp deleted file mode 100644 index 5bcb749..0000000 --- a/jf-live-writer/include/LiveImageAssembler.hpp +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP -#define SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP - -#include - -#include "buffer_config.hpp" -#include "formats.hpp" - -const uint64_t IA_EMPTY_SLOT_VALUE = 0; - -struct ImageMetadata -{ - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint8_t is_good_image; -}; - -class LiveImageAssembler { - const size_t n_modules_; - const size_t image_buffer_slot_n_bytes_; - - char* image_buffer_; - ImageMetadata* image_meta_buffer_; - ModuleFrame* frame_meta_buffer_; - std::atomic_int* buffer_status_; - std::atomic_uint64_t* buffer_pulse_id_; - - size_t get_data_offset(const uint64_t slot_id, const int i_module); - size_t get_frame_metadata_offset(const uint64_t slot_id, const int i_module); - -public: - LiveImageAssembler(const size_t n_modules); - - virtual ~LiveImageAssembler(); - - bool is_slot_free(const uint64_t pulse_id); - bool is_slot_full(const uint64_t pulse_id); - - void process(const uint64_t pulse_id, - const int i_module, - const BufferBinaryFormat* block_buffer); - - void free_slot(const uint64_t pulse_id); - - ImageMetadata* get_metadata_buffer(const uint64_t pulse_id); - char* get_data_buffer(const uint64_t pulse_id); -}; - - -#endif //SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP diff --git a/jf-live-writer/include/live_writer_config.hpp b/jf-live-writer/include/live_writer_config.hpp index 0a62457..76d9b05 100644 --- a/jf-live-writer/include/live_writer_config.hpp +++ b/jf-live-writer/include/live_writer_config.hpp @@ -2,8 +2,6 @@ namespace live_writer_config { - // MS to retry reading from the image assembler. - const size_t ASSEMBLER_RETRY_MS = 5; - // Number of slots in the reconstruction buffer. - const size_t WRITER_IA_N_SLOTS = 200; + // N of IO threads to receive data from modules. + const int LIVE_ZMQ_IO_THREADS = 1; } \ No newline at end of file diff --git a/jf-live-writer/src/BinaryReader.cpp b/jf-live-writer/src/BinaryReader.cpp deleted file mode 100644 index 0512ac7..0000000 --- a/jf-live-writer/src/BinaryReader.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "BinaryReader.hpp" - -#include -#include -#include -#include - -#include "BufferUtils.hpp" -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -BinaryReader::BinaryReader( - const std::string &detector_folder, - const std::string &module_name) : - detector_folder_(detector_folder), - module_name_(module_name), - current_input_file_(""), - input_file_fd_(-1) -{} - -BinaryReader::~BinaryReader() -{ - close_current_file(); -} - -void BinaryReader::get_frame( - const uint64_t pulse_id, BufferBinaryFormat* buffer) -{ - - auto current_frame_file = BufferUtils::get_filename( - detector_folder_, module_name_, pulse_id); - - if (current_frame_file != current_input_file_) { - open_file(current_frame_file); - } - - size_t file_index = BufferUtils::get_file_frame_index(pulse_id); - size_t n_bytes_offset = file_index * sizeof(BufferBinaryFormat); - - auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET); - if (lseek_result < 0) { - stringstream err_msg; - - err_msg << "[BinaryReader::get_frame]"; - err_msg << " Error while lseek on file "; - err_msg << current_input_file_ << " for n_bytes_offset "; - err_msg << n_bytes_offset << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes = ::read(input_file_fd_, buffer, sizeof(BufferBinaryFormat)); - - if (n_bytes < sizeof(BufferBinaryFormat)) { - stringstream err_msg; - - err_msg << "[BinaryReader::get_block]"; - err_msg << " Error while reading from file "; - err_msg << current_input_file_ << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } -} - -void BinaryReader::open_file(const std::string& filename) -{ - close_current_file(); - - input_file_fd_ = open(filename.c_str(), O_RDONLY); - - if (input_file_fd_ < 0) { - stringstream err_msg; - - err_msg << "[BinaryReader::open_file]"; - err_msg << " Cannot open file " << filename << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - current_input_file_ = filename; -} - -void BinaryReader::close_current_file() -{ - if (input_file_fd_ != -1) { - if (close(input_file_fd_) < 0) { - stringstream err_msg; - - err_msg << "[BinaryWriter::close_current_file]"; - err_msg << " Error while closing file " << current_input_file_; - err_msg << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - input_file_fd_ = -1; - current_input_file_ = ""; - } -} diff --git a/jf-live-writer/src/BufferStats.cpp b/jf-live-writer/src/BufferStats.cpp new file mode 100644 index 0000000..173a35c --- /dev/null +++ b/jf-live-writer/src/BufferStats.cpp @@ -0,0 +1,63 @@ +#include +#include "BufferStats.hpp" + +using namespace std; +using namespace chrono; + +BufferStats::BufferStats( + const string& detector_name, + const int module_id, + const size_t stats_modulo) : + detector_name_(detector_name), + module_id_(module_id), + stats_modulo_(stats_modulo) +{ + reset_counters(); +} + +void BufferStats::reset_counters() +{ + frames_counter_ = 0; + total_buffer_write_us_ = 0; + max_buffer_write_us_ = 0; +} + +void BufferStats::start_frame_write() +{ + stats_interval_start_ = steady_clock::now(); +} + +void BufferStats::end_frame_write() +{ + frames_counter_++; + + uint32_t write_us_duration = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + total_buffer_write_us_ += write_us_duration; + max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration); + + if (frames_counter_ == stats_modulo_) { + print_stats(); + reset_counters(); + } +} + +void BufferStats::print_stats() +{ + float avg_buffer_write_us = total_buffer_write_us_ / frames_counter_; + + uint64_t timestamp = time_point_cast( + system_clock::now()).time_since_epoch().count(); + + // Output in InfluxDB line protocol + cout << "jf_buffer_writer"; + cout << ",detector_name=" << detector_name_; + cout << ",module_name=M" << module_id_; + cout << " "; + cout << "avg_buffer_write_us=" << avg_buffer_write_us; + cout << ",max_buffer_write_us=" << max_buffer_write_us_ << "i"; + cout << " "; + cout << timestamp; + cout << endl; +} diff --git a/jf-live-writer/src/JFH5LiveWriter.cpp b/jf-live-writer/src/JFH5LiveWriter.cpp deleted file mode 100644 index 5928a6e..0000000 --- a/jf-live-writer/src/JFH5LiveWriter.cpp +++ /dev/null @@ -1,133 +0,0 @@ -#include "JFH5LiveWriter.hpp" - -#include -#include - - -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -JFH5LiveWriter::JFH5LiveWriter(const string& output_file, - const string& detector_folder, - const size_t n_modules, - const size_t n_pulses) : - detector_name_(get_detector_name(detector_folder)), - n_modules_(n_modules), - n_pulses_(n_pulses), - write_index_(0) -{ - b_pulse_id_ = new uint64_t[n_pulses_]; - b_frame_index_= new uint64_t[n_pulses_]; - b_daq_rec_ = new uint32_t[n_pulses_]; - b_is_good_frame_ = new uint8_t[n_pulses_]; - - init_file(output_file); -} - -void JFH5LiveWriter::init_file(const string& output_file) -{ - file_ = H5::H5File(output_file, H5F_ACC_TRUNC); - file_.createGroup("/data"); - file_.createGroup("/data/" + detector_name_); - - H5::DataSpace att_space(H5S_SCALAR); - H5::DataType data_type = H5::StrType(0, H5T_VARIABLE); - - file_.createGroup("/general"); - auto detector_dataset = file_.createDataSet( - "/general/detector_name", data_type ,att_space); - - detector_dataset.write(detector_name_, data_type); - - hsize_t image_dataset_dims[3] = - {n_pulses_, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE}; - - H5::DataSpace image_dataspace(3, image_dataset_dims); - - hsize_t image_dataset_chunking[3] = - {1, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DSetCreatPropList image_dataset_properties; - image_dataset_properties.setChunk(3, image_dataset_chunking); - - image_dataset_ = file_.createDataSet( - "/data/" + detector_name_ + "/data", - H5_UINT16, - image_dataspace, - image_dataset_properties); -} - - -std::string JFH5LiveWriter::get_detector_name(const string& detector_folder) -{ - size_t last_separator; - if ((last_separator = detector_folder.rfind("/")) == string::npos) { - return detector_folder; - } - - return detector_folder.substr(last_separator + 1); -} - -JFH5LiveWriter::~JFH5LiveWriter() -{ - close_file(); - - delete[] b_pulse_id_; - delete[] b_frame_index_; - delete[] b_daq_rec_; - delete[] b_is_good_frame_; -} - -void JFH5LiveWriter::write_dataset( - const string name, const void* buffer, const H5::PredType& type) -{ - hsize_t b_m_dims[] = {n_pulses_}; - H5::DataSpace b_m_space (1, b_m_dims); - - hsize_t f_m_dims[] = {n_pulses_, 1}; - H5::DataSpace f_m_space(2, f_m_dims); - - auto complete_name = "/data/" + detector_name_ + "/" + name; - auto dataset = file_.createDataSet(complete_name, type, f_m_space); - - dataset.write(buffer, type, b_m_space, f_m_space); - - dataset.close(); -} - -void JFH5LiveWriter::write_metadata() -{ - write_dataset("pulse_id", &b_pulse_id_, H5_UINT64); - write_dataset("frame_index", &b_frame_index_, H5_UINT64); - write_dataset("daq_rec", &b_daq_rec_, H5_UINT32); - write_dataset("is_good_frame", &b_is_good_frame_, H5_UINT8); -} - -void JFH5LiveWriter::close_file() -{ - if (file_.getId() == -1) { - return; - } - - image_dataset_.close(); - - write_metadata(); - - file_.close(); -} - -void JFH5LiveWriter::write(const ImageMetadata* metadata, const char* data) -{ - hsize_t offset[] = {write_index_, 0, 0}; - - H5DOwrite_chunk(image_dataset_.getId(), H5P_DEFAULT, 0, - offset, MODULE_N_BYTES * n_modules_, data); - - b_pulse_id_[write_index_] = metadata->pulse_id; - b_frame_index_[write_index_] = metadata->frame_index; - b_daq_rec_[write_index_] = metadata->daq_rec; - b_is_good_frame_[write_index_] = metadata->is_good_image; - - write_index_++; -} diff --git a/jf-live-writer/src/LiveImageAssembler.cpp b/jf-live-writer/src/LiveImageAssembler.cpp deleted file mode 100644 index 57cf48b..0000000 --- a/jf-live-writer/src/LiveImageAssembler.cpp +++ /dev/null @@ -1,159 +0,0 @@ -#include - -#include "LiveImageAssembler.hpp" -#include "buffer_config.hpp" -#include "live_writer_config.hpp" - -using namespace std; -using namespace buffer_config; -using namespace live_writer_config; - -LiveImageAssembler::LiveImageAssembler(const size_t n_modules) : - n_modules_(n_modules), - image_buffer_slot_n_bytes_(MODULE_N_BYTES * n_modules_) -{ - image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_]; - image_meta_buffer_ = new ImageMetadata[WRITER_IA_N_SLOTS]; - frame_meta_buffer_ = new ModuleFrame[WRITER_IA_N_SLOTS * n_modules]; - buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS]; - buffer_pulse_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS]; - - for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) { - free_slot(i); - } -} - -LiveImageAssembler::~LiveImageAssembler() -{ - delete[] image_buffer_; - delete[] image_meta_buffer_; -} - -bool LiveImageAssembler::is_slot_free(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - uint64_t slot_pulse_id = IA_EMPTY_SLOT_VALUE; - if (buffer_pulse_id_[slot_id].compare_exchange_strong( - slot_pulse_id, pulse_id)) { - return true; - } - - auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0; - return is_free && (slot_pulse_id == pulse_id); -} - -bool LiveImageAssembler::is_slot_full(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - return buffer_status_[slot_id].load(memory_order_relaxed) == 0; -} - -size_t LiveImageAssembler::get_data_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_; - size_t module_i_offset = i_module * MODULE_N_BYTES; - - return slot_i_offset + module_i_offset; -} - -size_t LiveImageAssembler::get_frame_metadata_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_m_offset = slot_id * n_modules_; - size_t module_m_offset = i_module; - - return slot_m_offset + module_m_offset; -} - -void LiveImageAssembler::process( - const uint64_t pulse_id, - const int i_module, - const BufferBinaryFormat* file_buffer) -{ - const auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - auto frame_meta_offset = get_frame_metadata_offset(slot_id, i_module); - auto image_offset = get_data_offset(slot_id, i_module); - - memcpy( - &(frame_meta_buffer_[frame_meta_offset]), - &(file_buffer->metadata), - sizeof(file_buffer->metadata)); - - memcpy( - image_buffer_ + image_offset, - &(file_buffer->data[0]), - MODULE_N_BYTES); - - buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed); -} - -void LiveImageAssembler::free_slot(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - buffer_status_[slot_id].store(n_modules_, memory_order_relaxed); - buffer_pulse_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed); -} - -ImageMetadata* LiveImageAssembler::get_metadata_buffer(const uint64_t pulse_id) -{ - const auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - ImageMetadata& image_meta = image_meta_buffer_[slot_id]; - - auto frame_meta_offset = get_frame_metadata_offset(slot_id, 0); - - auto is_pulse_init = false; - image_meta.is_good_image = 1; - image_meta.pulse_id = 0; - - for (size_t i_module=0; i_module < n_modules_; i_module++) { - - auto& frame_meta = frame_meta_buffer_[frame_meta_offset]; - frame_meta_offset += 1; - - auto is_good_frame = - frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME; - - if (!is_good_frame) { - image_meta.pulse_id = 0; - continue; - } - - if (!is_pulse_init) { - image_meta.pulse_id = frame_meta.pulse_id; - image_meta.frame_index = frame_meta.frame_index; - image_meta.daq_rec = frame_meta.daq_rec; - - is_pulse_init = true; - } - - if (image_meta.is_good_image == 1) { - if (frame_meta.pulse_id != image_meta.pulse_id) { - image_meta.is_good_image = 0; - } - - if (frame_meta.frame_index != image_meta.frame_index) { - image_meta.is_good_image = 0; - } - - if (frame_meta.daq_rec != image_meta.daq_rec) { - image_meta.is_good_image = 0; - } - - if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) { - image_meta.is_good_image = 0; - } - } - } - - return &image_meta; -} - -char* LiveImageAssembler::get_data_buffer(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_); -} diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp index 139a34f..1b912f8 100644 --- a/jf-live-writer/src/main.cpp +++ b/jf-live-writer/src/main.cpp @@ -1,195 +1,46 @@ #include #include -#include -#include -#include - -#include "zmq.h" +#include +#include +#include #include "live_writer_config.hpp" -#include "buffer_config.hpp" -#include "bitshuffle/bitshuffle.h" -#include "JFH5LiveWriter.hpp" -#include "LiveImageAssembler.hpp" -#include "BinaryReader.hpp" +#include "../../jf-buffer-writer/include/BufferStats.hpp" + using namespace std; -using namespace chrono; using namespace buffer_config; using namespace live_writer_config; -void read_buffer( - const string detector_folder, - const string module_name, - const int i_module, - const vector& pulse_ids_to_write, - LiveImageAssembler& image_assembler, - void* ctx) -{ - BinaryReader reader(detector_folder, module_name); - auto frame_buffer = new BufferBinaryFormat(); - - void* socket = zmq_socket(ctx, ZMQ_SUB); - if (socket == nullptr) { - throw runtime_error(zmq_strerror(errno)); - } - - int rcvhwm = 100; - if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - int linger = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - // In milliseconds. - int rcvto = 2000; - if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){ - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const uint64_t PULSE_ID_DELAY = 100; - - uint64_t live_pulse_id = pulse_ids_to_write.front(); - for (uint64_t pulse_id:pulse_ids_to_write) { - - while(!image_assembler.is_slot_free(pulse_id)) { - this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS)); - } - - auto start_time = steady_clock::now(); - - // Enforce a delay of 1 second for writing. - while (live_pulse_id - pulse_id < PULSE_ID_DELAY) { - if (zmq_recv(socket, &live_pulse_id, - sizeof(live_pulse_id), 0) == -1) { - if (errno == EAGAIN) { - throw runtime_error("Did not receive pulse_id in time."); - } else { - throw runtime_error(zmq_strerror(errno)); - } - } - } - - reader.get_frame(pulse_id, frame_buffer); - - auto end_time = steady_clock::now(); - uint64_t read_us_duration = duration_cast( - end_time-start_time).count(); - - start_time = steady_clock::now(); - - image_assembler.process(pulse_id, i_module, frame_buffer); - - end_time = steady_clock::now(); - uint64_t compose_us_duration = duration_cast( - end_time-start_time).count(); - - cout << "sf_writer:avg_read_us "; - cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; - cout << "sf_writer:avg_assemble_us "; - cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - delete frame_buffer; -} - int main (int argc, char *argv[]) { - if (argc != 7) { + if (argc != 3) { cout << endl; - cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]"; - cout << " [start_pulse_id] [n_pulses] [pulse_id_step]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tdetector_folder: Absolute path to detector buffer." << endl; - cout << "\tn_modules: number of modules" << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tn_pulses: Number of pulses to write." << endl; - cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl; + cout << "Usage: jf_live_writer [detector_json_filename]" + " [stream_name]" << endl; + cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; exit(-1); } - string output_file = string(argv[1]); - const string detector_folder = string(argv[2]); - size_t n_modules = atoi(argv[3]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); - size_t n_pulses = (size_t) atoll(argv[5]); - int pulse_id_step = atoi(argv[6]); - - std::vector pulse_ids_to_write; - uint64_t i_pulse_id = start_pulse_id; - for (size_t i=0; i reading_threads(n_modules); - for (size_t i_module=0; i_module( - end_time-start_time).count(); - - image_assembler.free_slot(pulse_id); - - cout << "sf_writer:avg_write_us "; - cout << write_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - for (auto& reading_thread : reading_threads) { - if (reading_thread.joinable()) { - reading_thread.join(); - } - } - - return 0; }