From 5ba868a7aeb6ef0cd8da9f8d515edb50ce530571 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 26 May 2020 12:55:56 +0200 Subject: [PATCH] Create buffered send --- sf-replay/include/ReplayH5Reader.hpp | 11 +-- sf-replay/include/ReplayZmqSender.hpp | 3 +- sf-replay/src/ReplayH5Reader.cpp | 114 +++++++++++--------------- sf-replay/src/ReplayZmqSender.cpp | 6 +- sf-replay/src/main.cpp | 33 ++------ 5 files changed, 60 insertions(+), 107 deletions(-) diff --git a/sf-replay/include/ReplayH5Reader.hpp b/sf-replay/include/ReplayH5Reader.hpp index 2a2c5d8..009a028 100644 --- a/sf-replay/include/ReplayH5Reader.hpp +++ b/sf-replay/include/ReplayH5Reader.hpp @@ -17,13 +17,6 @@ class ReplayH5Reader { H5::DataSet dset_metadata_; H5::DataSet dset_frame_; - ModuleFrame* m_buffer_ = nullptr; - char* f_buffer_ = nullptr; - uint64_t buffer_start_pulse_id_ = 0; - uint64_t buffer_end_pulse_id_ = 0; - - void load_buffers(const uint64_t pulse_id); - public: ReplayH5Reader( const std::string device, @@ -33,8 +26,8 @@ public: void close_file(); void get_buffer( const uint64_t pulse_id, - ModuleFrame*& metadata, - char*& frame_buffer); + ReplayBuffer* metadata, + char* frame_buffer); }; diff --git a/sf-replay/include/ReplayZmqSender.hpp b/sf-replay/include/ReplayZmqSender.hpp index 52507a7..845c9f0 100644 --- a/sf-replay/include/ReplayZmqSender.hpp +++ b/sf-replay/include/ReplayZmqSender.hpp @@ -3,6 +3,7 @@ #include #include +#include class ReplayZmqSender { @@ -15,7 +16,7 @@ public: void close(); - void send(const ModuleFrame* metadata, const char* data); + void send(const ReplayBuffer* metadata, const char* data); }; diff --git a/sf-replay/src/ReplayH5Reader.cpp b/sf-replay/src/ReplayH5Reader.cpp index c38bfc2..9a126ab 100644 --- a/sf-replay/src/ReplayH5Reader.cpp +++ b/sf-replay/src/ReplayH5Reader.cpp @@ -5,7 +5,32 @@ using namespace std; using namespace core_buffer; -void ReplayH5Reader::load_buffers(const uint64_t pulse_id) +ReplayH5Reader::ReplayH5Reader( + const string device, + const string channel_name) : + device_(device), + channel_name_(channel_name) +{ +} + +ReplayH5Reader::~ReplayH5Reader() +{ + close_file(); +} + +void ReplayH5Reader::close_file() +{ + if (current_file_.getId() != -1) { + dset_metadata_.close(); + dset_frame_.close(); + current_file_.close(); + } +} + +void ReplayH5Reader::get_buffer( + const uint64_t pulse_id, + ReplayBuffer* metadata, + char* data) { auto pulse_filename = BufferUtils::get_filename( device_, channel_name_, pulse_id); @@ -18,29 +43,31 @@ void ReplayH5Reader::load_buffers(const uint64_t pulse_id) dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET); dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET); - - hsize_t b_m_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; - H5::DataSpace b_m_space (2, b_m_dims); - hsize_t b_m_count[] = {FILE_MOD, ModuleFrame_N_FIELDS}; - hsize_t b_m_start[] = {0, 0}; - b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); - - hsize_t f_m_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; - H5::DataSpace f_m_space (2, f_m_dims); - hsize_t f_m_count[] = {FILE_MOD, ModuleFrame_N_FIELDS}; - hsize_t pulse_id_start[] = {0, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, pulse_id_start); - - dset_metadata_.read( - m_buffer_, H5::PredType::NATIVE_UINT64, b_m_space, f_m_space); } auto file_index = BufferUtils::get_file_frame_index(pulse_id); auto cache_start_index = file_index / REPLAY_READ_BUFFER_SIZE; cache_start_index *= REPLAY_READ_BUFFER_SIZE; - buffer_start_pulse_id_ = pulse_id - (file_index - cache_start_index); - buffer_end_pulse_id_ = buffer_start_pulse_id_ + REPLAY_READ_BUFFER_SIZE - 1; + uint64_t b_start_pulse_id = pulse_id - (file_index - cache_start_index); + metadata->start_pulse_id = b_start_pulse_id; + metadata->n_frames = REPLAY_READ_BUFFER_SIZE; + + hsize_t b_m_dims[2] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS}; + H5::DataSpace b_m_space (2, b_m_dims); + hsize_t b_m_count[] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS}; + hsize_t b_m_start[] = {cache_start_index, 0}; + b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); + + hsize_t f_m_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; + H5::DataSpace f_m_space (2, f_m_dims); + hsize_t f_m_count[] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS}; + hsize_t pulse_id_start[] = {cache_start_index, 0}; + f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, pulse_id_start); + + dset_metadata_.read( + &metadata->metadata[0], + H5::PredType::NATIVE_UINT64, b_m_space, f_m_space); hsize_t b_f_dims[3] = {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; @@ -58,54 +85,5 @@ void ReplayH5Reader::load_buffers(const uint64_t pulse_id) f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start); dset_frame_.read( - f_buffer_, H5::PredType::NATIVE_UINT16, b_f_space, f_f_space); -} - -ReplayH5Reader::ReplayH5Reader( - const string device, - const string channel_name) : - device_(device), - channel_name_(channel_name) -{ - m_buffer_ = new ModuleFrame[FILE_MOD]; - f_buffer_ = new char[MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE]; -} - -ReplayH5Reader::~ReplayH5Reader() -{ - close_file(); - - delete[] m_buffer_; - delete[] f_buffer_; -} - -void ReplayH5Reader::close_file() -{ - if (current_file_.getId() != -1) { - dset_metadata_.close(); - dset_frame_.close(); - current_file_.close(); - } -} - -void ReplayH5Reader::get_buffer( - const uint64_t pulse_id, - ModuleFrame*& metadata, - char*& data) -{ - // Buffer start and end pulse_ids are inclusive. - if ((pulse_id < buffer_start_pulse_id_) || - (pulse_id > buffer_end_pulse_id_)) { - load_buffers(pulse_id); - } - - auto file_index = BufferUtils::get_file_frame_index(pulse_id); - auto buffer_index = pulse_id - buffer_start_pulse_id_; - - metadata = m_buffer_ + file_index; - data = f_buffer_ + (buffer_index * MODULE_N_BYTES); - - if (metadata->pulse_id != 0 && metadata->pulse_id != pulse_id) { - throw runtime_error("Corrupted buffer file."); - } + data, H5::PredType::NATIVE_UINT16, b_f_space, f_f_space); } diff --git a/sf-replay/src/ReplayZmqSender.cpp b/sf-replay/src/ReplayZmqSender.cpp index 5ffd9d6..3d982ae 100644 --- a/sf-replay/src/ReplayZmqSender.cpp +++ b/sf-replay/src/ReplayZmqSender.cpp @@ -41,8 +41,8 @@ void ReplayZmqSender::close() { zmq_ctx_destroy(ctx_); } -void ReplayZmqSender::send(const ModuleFrame* metadata, const char* data) +void ReplayZmqSender::send(const ReplayBuffer* metadata, const char* data) { - zmq_send(socket_, metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); - zmq_send(socket_, data, MODULE_N_BYTES, 0); + zmq_send(socket_, metadata, sizeof(ReplayBuffer), ZMQ_SNDMORE); + zmq_send(socket_, data, MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE, 0); } \ No newline at end of file diff --git a/sf-replay/src/main.cpp b/sf-replay/src/main.cpp index e64ef37..e2cddc8 100644 --- a/sf-replay/src/main.cpp +++ b/sf-replay/src/main.cpp @@ -14,15 +14,11 @@ using namespace chrono; void sf_replay ( const string device, const string channel_name, - FastQueue& queue, + FastQueue& queue, const uint64_t start_pulse_id, const uint64_t stop_pulse_id ) { - uint64_t read_us = 0; - uint64_t max_read_us = 0; - uint64_t n_stats = 0; - ReplayH5Reader file_reader(device, channel_name); // "<= stop_pulse_id" because we include the stop_pulse_id in the file. @@ -41,12 +37,7 @@ void sf_replay ( auto metadata = queue.get_metadata_buffer(slot_id); auto buffer = queue.get_data_buffer(slot_id); - ModuleFrame* m_buffer; - char* f_buffer; - file_reader.get_buffer(curr_pulse_id, m_buffer, f_buffer); - - memcpy(metadata, m_buffer, sizeof(ModuleFrame)); - memcpy(buffer, f_buffer, MODULE_N_BYTES); + file_reader.get_buffer(curr_pulse_id, metadata, buffer); auto end_time = steady_clock::now(); uint64_t read_us_duration = @@ -55,20 +46,8 @@ void sf_replay ( queue.commit(); // TODO: Proper statistics - n_stats++; - - read_us += read_us_duration; - max_read_us = max(max_read_us, read_us_duration); - - if (n_stats == STATS_MODULO) { - cout << "sf_replay:avg_read_us " << read_us / STATS_MODULO; - cout << " sf_replay:max_read_us " << max_read_us; - cout << endl; - - n_stats = 0; - read_us = 0; - max_read_us = 0; - } + cout << "sf_replay:avg_read_us "; + cout << read_us_duration / REPLAY_READ_BUFFER_SIZE << endl; } } @@ -96,7 +75,9 @@ int main (int argc, char *argv[]) { const auto start_pulse_id = (uint64_t) atoll(argv[5]); const auto stop_pulse_id = (uint64_t) atoll(argv[6]); - FastQueue queue(MODULE_N_BYTES, REPLAY_FASTQUEUE_N_SLOTS); + FastQueue queue( + MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE, + REPLAY_FASTQUEUE_N_SLOTS); thread file_read_thread(sf_replay, device, channel_name, ref(queue),