From 78c3ae026275e731a378ed4b5671ba89256cb3b8 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 May 2020 09:28:50 +0200 Subject: [PATCH] Add buffering to replay --- core-buffer/include/ReplayH5Reader.hpp | 9 ++- core-buffer/include/buffer_config.hpp | 2 + core-buffer/src/ReplayH5Reader.cpp | 105 ++++++++++++++++--------- 3 files changed, 78 insertions(+), 38 deletions(-) diff --git a/core-buffer/include/ReplayH5Reader.hpp b/core-buffer/include/ReplayH5Reader.hpp index 5d46952..6bd84ca 100644 --- a/core-buffer/include/ReplayH5Reader.hpp +++ b/core-buffer/include/ReplayH5Reader.hpp @@ -4,6 +4,7 @@ #include #include "jungfrau.hpp" #include +#include class ReplayH5Reader { @@ -16,7 +17,13 @@ class ReplayH5Reader { H5::DataSet dset_metadata_; H5::DataSet dset_frame_; - void prepare_file_for_pulse(const uint64_t pulse_id); + std::unique_ptr frame_buffer = make_unique( + MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE); + std::unique_ptr metadata_buffer = make_unique( + sizeof(ModuleFrame) * FILE_MOD); + uint64_t buffer_start_pulse_id_ = 0; + uint64_t buffer_end_pulse_id_ = 0; + void prepare_buffer_for_pulse(const uint64_t pulse_id); public: ReplayH5Reader(const std::string device, const std::string channel_name); diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index a743b2f..3bd9b42 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -32,6 +32,8 @@ namespace core_buffer { const std::string REPLAY_STREAM_IPC_URL = "ipc:///tmp/sf-replay-"; + const size_t REPLAY_READ_BUFFER_SIZE = 100; + const size_t BUFFER_UDP_N_RECV_MSG = 64; // Size of UDP recv buffer diff --git a/core-buffer/src/ReplayH5Reader.cpp b/core-buffer/src/ReplayH5Reader.cpp index a4d5b0b..049917b 100644 --- a/core-buffer/src/ReplayH5Reader.cpp +++ b/core-buffer/src/ReplayH5Reader.cpp @@ -4,27 +4,78 @@ #include "buffer_config.hpp" #include #include +#include #include "date.h" using namespace std; using namespace core_buffer; -void ReplayH5Reader::prepare_file_for_pulse(const uint64_t pulse_id) +void ReplayH5Reader::prepare_buffer_for_pulse(const uint64_t pulse_id) { auto pulse_filename = BufferUtils::get_filename( device_, channel_name_, pulse_id); - if (pulse_filename == current_filename_) { + if (pulse_filename != current_filename_) { + close_file(); + + current_filename_ = pulse_filename; + current_file_ = H5::H5File(current_filename_, H5F_ACC_RDONLY); + + dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET); + dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET); + + // We always read the metadata for the entire file. + hsize_t b_metadata_dims[2] = + {FILE_MOD, ModuleFrame_N_FIELDS}; + H5::DataSpace b_m_space (2, b_metadata_dims); + hsize_t b_m_count[] = + {FILE_MOD, ModuleFrame_N_FIELDS}; + hsize_t b_m_start[] = {0, 0}; + b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); + + hsize_t f_metadata_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; + H5::DataSpace f_m_space (2, f_metadata_dims); + hsize_t f_m_count[] = + {FILE_MOD, ModuleFrame_N_FIELDS}; + hsize_t f_m_start[] = {0, 0}; + f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); + + dset_metadata_.read(&(metadata_buffer[0]), H5::PredType::NATIVE_UINT64, + b_m_space, f_m_space); + + buffer_start_pulse_id_ = 0; + buffer_end_pulse_id_ = 0; + } + + // End pulse_id is not included in the buffer. + if ((pulse_id >= buffer_start_pulse_id_) && + (pulse_id < buffer_end_pulse_id_)) { return; } - close_file(); + buffer_start_pulse_id_ = pulse_id - (pulse_id % REPLAY_READ_BUFFER_SIZE); + buffer_end_pulse_id_ = buffer_start_pulse_id_ + REPLAY_READ_BUFFER_SIZE; - current_filename_ = pulse_filename; - current_file_ = H5::H5File(current_filename_, H5F_ACC_RDONLY); + auto start_index_in_file = BufferUtils::get_file_frame_index( + buffer_start_pulse_id_); - dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET); - dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET); + hsize_t b_image_dims[3] = + {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; + H5::DataSpace b_f_space (3, b_image_dims); + hsize_t b_i_count[] = + {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; + hsize_t b_i_start[] = {0, 0, 0}; + b_f_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); + + hsize_t f_frame_dims[3] = {FILE_MOD, MODULE_Y_SIZE, MODULE_X_SIZE}; + H5::DataSpace f_f_space (3, f_frame_dims); + hsize_t f_f_count[] = + {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; + hsize_t f_f_start[] = {start_index_in_file, 0, 0}; + f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start); + + dset_frame_.read(frame_buffer, H5::PredType::NATIVE_UINT16, + b_f_space, f_f_space); } ReplayH5Reader::ReplayH5Reader( @@ -52,39 +103,19 @@ void ReplayH5Reader::close_file() bool ReplayH5Reader::get_frame( const uint64_t pulse_id, ModuleFrame* metadata, char* frame_buffer) { - prepare_file_for_pulse(pulse_id); + prepare_buffer_for_pulse(pulse_id); - auto index_in_file = BufferUtils::get_file_frame_index(pulse_id); - hsize_t b_metadata_dims[2] = {1, ModuleFrame_N_FIELDS}; - H5::DataSpace b_m_space (2, b_metadata_dims); - hsize_t b_m_count[] = {1, ModuleFrame_N_FIELDS}; - hsize_t b_m_start[] = {0, 0}; - b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); + auto metadata_buffer_index = BufferUtils::get_file_frame_index(pulse_id); + memcpy(metadata, + &(metadata_buffer[metadata_buffer_index]), + sizeof(ModuleFrame)); - hsize_t f_metadata_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; - H5::DataSpace f_m_space (2, f_metadata_dims); - hsize_t f_m_count[] = {1, ModuleFrame_N_FIELDS}; - hsize_t f_m_start[] = {index_in_file, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); + auto frame_buffer_index = pulse_id - buffer_start_pulse_id_; + memcpy(frame_buffer, + &(frame_buffer[frame_buffer_index]), + MODULE_N_BYTES); - dset_metadata_.read(metadata, H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - hsize_t b_image_dims[3] = {1, MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace b_f_space (3, b_image_dims); - hsize_t b_i_count[] = {1, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t b_i_start[] = {0, 0, 0}; - b_f_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); - - hsize_t f_frame_dims[3] = {FILE_MOD, MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace f_f_space (3, f_frame_dims); - hsize_t f_f_count[] = {1, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t f_f_start[] = {index_in_file, 0, 0}; - f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start); - - dset_frame_.read(frame_buffer, H5::PredType::NATIVE_UINT16, - b_f_space, f_f_space); if (metadata->pulse_id == 0) { // Signal that there is no frame at this pulse_id. @@ -99,7 +130,7 @@ bool ReplayH5Reader::get_frame( err_msg << "[" << system_clock::now() << "]"; err_msg << "[ReplayH5Reader::get_frame]"; err_msg << " Corrupted file " << current_filename_; - err_msg << " index_in_file " << index_in_file; + err_msg << " index_in_file " << metadata_buffer_index; err_msg << " expected pulse_id " << pulse_id; err_msg << " but read " << metadata->pulse_id << endl;