From 399027340a1c63647b6a73a6474f90a6b6098580 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 20 May 2020 12:33:53 +0200 Subject: [PATCH] Finished refactoring sf-replay --- sf-replay/CMakeLists.txt | 20 ++++ sf-replay/include/ReplayH5Reader.hpp | 39 +++++++ sf-replay/src/ReplayH5Reader.cpp | 138 ++++++++++++++++++++++++ sf-replay/src/main.cpp | 140 +++++++++++++++++++++++++ sf-replay/test/CMakeLists.txt | 11 ++ sf-replay/test/main.cpp | 9 ++ sf-replay/test/test_ReplayH5Reader.cpp | 111 ++++++++++++++++++++ 7 files changed, 468 insertions(+) create mode 100644 sf-replay/CMakeLists.txt create mode 100644 sf-replay/include/ReplayH5Reader.hpp create mode 100644 sf-replay/src/ReplayH5Reader.cpp create mode 100644 sf-replay/src/main.cpp create mode 100644 sf-replay/test/CMakeLists.txt create mode 100644 sf-replay/test/main.cpp create mode 100644 sf-replay/test/test_ReplayH5Reader.cpp diff --git a/sf-replay/CMakeLists.txt b/sf-replay/CMakeLists.txt new file mode 100644 index 0000000..d1430f3 --- /dev/null +++ b/sf-replay/CMakeLists.txt @@ -0,0 +1,20 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(sf-replay-lib STATIC ${SOURCES}) +target_include_directories(sf-replay-lib PUBLIC include/) +target_link_libraries(sf-replay-lib + external + core-buffer-lib) + +add_executable(sf-replay src/main.cpp) +set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) +target_link_libraries(sf-replay + core-buffer-lib + sf-replay-lib + zmq + hdf5 + hdf5_cpp) + +enable_testing() +add_subdirectory(test/) \ No newline at end of file diff --git a/sf-replay/include/ReplayH5Reader.hpp b/sf-replay/include/ReplayH5Reader.hpp new file mode 100644 index 0000000..d34beaa --- /dev/null +++ b/sf-replay/include/ReplayH5Reader.hpp @@ -0,0 +1,39 @@ +#ifndef SF_DAQ_BUFFER_REPLAYH5READER_HPP +#define SF_DAQ_BUFFER_REPLAYH5READER_HPP + +#include +#include "jungfrau.hpp" +#include +#include +#include "buffer_config.hpp" + + +class ReplayH5Reader { + + const std::string device_; + const std::string channel_name_; + + H5::H5File current_file_; + std::string current_filename_; + H5::DataSet dset_metadata_; + H5::DataSet dset_frame_; + + std::unique_ptr frame_buffer_ = std::make_unique( + core_buffer::MODULE_N_BYTES * core_buffer::REPLAY_READ_BUFFER_SIZE); + std::unique_ptr metadata_buffer_ = + std::make_unique(core_buffer::FILE_MOD); + + uint64_t buffer_start_pulse_id_ = 0; + uint64_t buffer_end_pulse_id_ = 0; + void prepare_buffer_for_pulse(const uint64_t pulse_id); + +public: + ReplayH5Reader(const std::string device, const std::string channel_name); + virtual ~ReplayH5Reader(); + void close_file(); + bool get_frame( + const uint64_t pulse_id, ModuleFrame* metadata, char* frame_buffer); +}; + + +#endif //SF_DAQ_BUFFER_REPLAYH5READER_HPP diff --git a/sf-replay/src/ReplayH5Reader.cpp b/sf-replay/src/ReplayH5Reader.cpp new file mode 100644 index 0000000..24dc53d --- /dev/null +++ b/sf-replay/src/ReplayH5Reader.cpp @@ -0,0 +1,138 @@ +#include "ReplayH5Reader.hpp" + +#include "BufferUtils.hpp" +#include +#include +#include +#include "date.h" + +using namespace std; +using namespace core_buffer; + +void ReplayH5Reader::prepare_buffer_for_pulse(const uint64_t pulse_id) +{ + auto pulse_filename = BufferUtils::get_filename( + device_, channel_name_, pulse_id); + + if (pulse_filename != current_filename_) { + close_file(); + + current_filename_ = pulse_filename; + current_file_ = H5::H5File(current_filename_, H5F_ACC_RDONLY); + + dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET); + dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET); + + // We always read the metadata for the entire file. + hsize_t b_metadata_dims[2] = + {FILE_MOD, ModuleFrame_N_FIELDS}; + H5::DataSpace b_m_space (2, b_metadata_dims); + hsize_t b_m_count[] = + {FILE_MOD, ModuleFrame_N_FIELDS}; + hsize_t b_m_start[] = {0, 0}; + b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); + + hsize_t f_metadata_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; + H5::DataSpace f_m_space (2, f_metadata_dims); + hsize_t f_m_count[] = + {FILE_MOD, ModuleFrame_N_FIELDS}; + hsize_t f_m_start[] = {0, 0}; + f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); + + dset_metadata_.read(&(metadata_buffer_[0]), H5::PredType::NATIVE_UINT64, + b_m_space, f_m_space); + + buffer_start_pulse_id_ = 0; + buffer_end_pulse_id_ = 0; + } + + // End pulse_id is not included in the buffer. + if ((pulse_id >= buffer_start_pulse_id_) && + (pulse_id < buffer_end_pulse_id_)) { + return; + } + + buffer_start_pulse_id_ = pulse_id - (pulse_id % REPLAY_READ_BUFFER_SIZE); + buffer_end_pulse_id_ = buffer_start_pulse_id_ + REPLAY_READ_BUFFER_SIZE; + + auto start_index_in_file = BufferUtils::get_file_frame_index( + buffer_start_pulse_id_); + + hsize_t b_image_dims[3] = + {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; + H5::DataSpace b_f_space (3, b_image_dims); + hsize_t b_i_count[] = + {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; + hsize_t b_i_start[] = {0, 0, 0}; + b_f_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); + + hsize_t f_frame_dims[3] = {FILE_MOD, MODULE_Y_SIZE, MODULE_X_SIZE}; + H5::DataSpace f_f_space (3, f_frame_dims); + hsize_t f_f_count[] = + {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; + hsize_t f_f_start[] = {start_index_in_file, 0, 0}; + f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start); + + dset_frame_.read(&(frame_buffer_[0]), H5::PredType::NATIVE_UINT16, + b_f_space, f_f_space); +} + +ReplayH5Reader::ReplayH5Reader( + const string device, + const string channel_name) : + device_(device), + channel_name_(channel_name) +{ +} + +ReplayH5Reader::~ReplayH5Reader() +{ + close_file(); +} + +void ReplayH5Reader::close_file() +{ + if (current_file_.getId() != -1) { + dset_metadata_.close(); + dset_frame_.close(); + current_file_.close(); + } +} + +bool ReplayH5Reader::get_frame( + const uint64_t pulse_id, ModuleFrame* metadata, char* frame_buffer) +{ + prepare_buffer_for_pulse(pulse_id); + + auto metadata_buffer_index = BufferUtils::get_file_frame_index(pulse_id); + memcpy(metadata, + &(metadata_buffer_[metadata_buffer_index]), + sizeof(ModuleFrame)); + + auto frame_buffer_index = pulse_id - buffer_start_pulse_id_; + memcpy(frame_buffer, + &(frame_buffer_[frame_buffer_index * MODULE_N_BYTES]), + MODULE_N_BYTES); + + if (metadata->pulse_id == 0) { + // Signal that there is no frame at this pulse_id. + metadata->pulse_id = pulse_id; + return false; + + }else if (metadata->pulse_id != pulse_id) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[ReplayH5Reader::get_frame]"; + err_msg << " Corrupted file " << current_filename_; + err_msg << " index_in_file " << metadata_buffer_index; + err_msg << " expected pulse_id " << pulse_id; + err_msg << " but read " << metadata->pulse_id << endl; + + throw runtime_error(err_msg.str()); + } + + return true; +} diff --git a/sf-replay/src/main.cpp b/sf-replay/src/main.cpp new file mode 100644 index 0000000..b7b8ae9 --- /dev/null +++ b/sf-replay/src/main.cpp @@ -0,0 +1,140 @@ +#include +#include +#include "jungfrau.hpp" + +#include "zmq.h" +#include "buffer_config.hpp" + +#include +#include "ReplayH5Reader.hpp" +#include "date.h" +#include "bitshuffle/bitshuffle.h" + +using namespace std; +using namespace core_buffer; + +void sf_replay ( + void* socket, + const string& device, + const string& channel_name, + const uint64_t start_pulse_id, + const uint64_t stop_pulse_id) +{ + StreamModuleFrame metadata_buffer; + auto frame_buffer = make_unique(MODULE_N_PIXELS); + + ReplayH5Reader file_reader(device, channel_name); + + //TODO: Add statstics. + uint64_t stats_counter = 0; + + uint64_t total_read_us = 0; + uint64_t max_read_us = 0; + uint64_t total_send_us = 0; + uint64_t max_send_us = 0; + + // "<= stop_pulse_id" because we include the stop_pulse_id in the file. + for ( + uint64_t curr_pulse_id = start_pulse_id; + curr_pulse_id <= stop_pulse_id; + curr_pulse_id++) { + + auto start_time = chrono::steady_clock::now(); + + metadata_buffer.is_frame_present = file_reader.get_frame( + curr_pulse_id, + &(metadata_buffer.metadata), + (char*)(frame_buffer.get())); + + metadata_buffer.data_n_bytes = MODULE_N_BYTES; + + auto end_time = chrono::steady_clock::now(); + auto read_us_duration = chrono::duration_cast( + end_time-start_time).count(); + + start_time = chrono::steady_clock::now(); + + zmq_send(socket, + &metadata_buffer, + sizeof(StreamModuleFrame), + ZMQ_SNDMORE); + zmq_send(socket, + (char*)(frame_buffer.get()), + metadata_buffer.data_n_bytes, + 0); + + end_time = chrono::steady_clock::now(); + auto send_us_duration = chrono::duration_cast( + end_time-start_time).count(); + + // TODO: Make proper stastistics. + stats_counter++; + total_read_us += read_us_duration; + max_read_us = max(max_read_us, (uint64_t)read_us_duration); + + total_send_us += send_us_duration; + max_send_us = max(max_send_us, (uint64_t)send_us_duration); + + if (stats_counter == STATS_MODULO) { + cout << "sf_replay:avg_read_us " << total_read_us/STATS_MODULO; + cout << " sf_replay:max_read_us " << max_read_us; + cout << " sf_replay:avg_send_us " << total_send_us/STATS_MODULO; + cout << " sf_replay:max_send_us " << max_send_us; + + cout << endl; + + stats_counter = 0; + total_read_us = 0; + max_read_us = 0; + total_send_us = 0; + max_send_us = 0; + } + } +} + +int main (int argc, char *argv[]) { + + if (argc != 6) { + cout << endl; + cout << "Usage: sf_replay [device]"; + cout << " [channel_name] [source_id] [start_pulse_id] [stop_pulse_id]"; + cout << endl; + cout << "\tdevice: Name of detector." << endl; + cout << "\tchannel_name: M00-M31 for JF16M." << endl; + cout << "\tsource_id: Module index" << endl; + cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; + cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; + cout << endl; + + exit(-1); + } + + const string device = string(argv[1]); + const string channel_name = string(argv[2]); + const auto source_id = (uint16_t) atoi(argv[3]); + const auto start_pulse_id = (uint64_t) atoll(argv[4]); + const auto stop_pulse_id = (uint64_t) atoll(argv[5]); + + stringstream ipc_stream; + ipc_stream << REPLAY_STREAM_IPC_URL << (int)source_id; + const auto ipc_address = ipc_stream.str(); + + auto ctx = zmq_ctx_new(); + auto socket = zmq_socket(ctx, ZMQ_PUSH); + + const int sndhwm = REPLAY_SNDHWM; + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) + throw runtime_error(strerror (errno)); + + const int linger_ms = -1; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) + throw runtime_error(strerror (errno)); + + if (zmq_bind(socket, ipc_address.c_str()) != 0) + throw runtime_error(strerror (errno)); + + sf_replay(socket, device, channel_name, start_pulse_id, stop_pulse_id); + + zmq_close(socket); + zmq_ctx_destroy(ctx); +} diff --git a/sf-replay/test/CMakeLists.txt b/sf-replay/test/CMakeLists.txt new file mode 100644 index 0000000..912d369 --- /dev/null +++ b/sf-replay/test/CMakeLists.txt @@ -0,0 +1,11 @@ +add_executable(sf-replay-tests main.cpp) + +target_link_libraries(sf-replay-tests + core-buffer-lib + sf-buffer-lib + sf-replay-lib + hdf5 + hdf5_cpp + gtest + ) + diff --git a/sf-replay/test/main.cpp b/sf-replay/test/main.cpp new file mode 100644 index 0000000..d40a5df --- /dev/null +++ b/sf-replay/test/main.cpp @@ -0,0 +1,9 @@ +#include "gtest/gtest.h" +#include "test_ReplayH5Reader.cpp" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/sf-replay/test/test_ReplayH5Reader.cpp b/sf-replay/test/test_ReplayH5Reader.cpp new file mode 100644 index 0000000..7559c0c --- /dev/null +++ b/sf-replay/test/test_ReplayH5Reader.cpp @@ -0,0 +1,111 @@ +#include "ReplayH5Reader.hpp" +#include "BufferH5Writer.hpp" +#include "gtest/gtest.h" + +using namespace std; +using namespace core_buffer; + +TEST(ReplayH5Reader, basic_interaction) +{ + auto root_folder = "."; + auto device_name = "fast_device"; + + // This 2 must be compatible by design. + BufferH5Writer writer(root_folder, device_name); + ReplayH5Reader reader(root_folder, device_name); + + size_t pulse_id = 65; + + ModuleFrame w_metadata; + ModuleFrame r_metadata; + auto w_frame_buffer = make_unique(MODULE_N_PIXELS); + auto r_frame_buffer = make_unique(MODULE_N_PIXELS); + + // Setup test values. + w_metadata.pulse_id = pulse_id; + w_metadata.frame_index = 2; + w_metadata.daq_rec = 3; + w_metadata.n_received_packets = 128; + w_metadata.module_id = 4; + + for (size_t i=0; i(MODULE_N_PIXELS); + auto r_frame_buffer = make_unique(MODULE_N_PIXELS); + + // Setup test values. + w_metadata.pulse_id = pulse_id; + w_metadata.frame_index = 2; + w_metadata.daq_rec = 3; + w_metadata.n_received_packets = 128; + w_metadata.module_id = 4; + + for (size_t i=0; i