Finished refactoring sf-replay

This commit is contained in:
2020-05-20 12:33:53 +02:00
parent 618dd7110e
commit 399027340a
7 changed files with 468 additions and 0 deletions
+20
View File
@@ -0,0 +1,20 @@
file(GLOB SOURCES
src/*.cpp)
add_library(sf-replay-lib STATIC ${SOURCES})
target_include_directories(sf-replay-lib PUBLIC include/)
target_link_libraries(sf-replay-lib
external
core-buffer-lib)
add_executable(sf-replay src/main.cpp)
set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay)
target_link_libraries(sf-replay
core-buffer-lib
sf-replay-lib
zmq
hdf5
hdf5_cpp)
enable_testing()
add_subdirectory(test/)
+39
View File
@@ -0,0 +1,39 @@
#ifndef SF_DAQ_BUFFER_REPLAYH5READER_HPP
#define SF_DAQ_BUFFER_REPLAYH5READER_HPP
#include <string>
#include "jungfrau.hpp"
#include <H5Cpp.h>
#include <memory>
#include "buffer_config.hpp"
class ReplayH5Reader {
const std::string device_;
const std::string channel_name_;
H5::H5File current_file_;
std::string current_filename_;
H5::DataSet dset_metadata_;
H5::DataSet dset_frame_;
std::unique_ptr<char[]> frame_buffer_ = std::make_unique<char[]>(
core_buffer::MODULE_N_BYTES * core_buffer::REPLAY_READ_BUFFER_SIZE);
std::unique_ptr<ModuleFrame[]> metadata_buffer_ =
std::make_unique<ModuleFrame[]>(core_buffer::FILE_MOD);
uint64_t buffer_start_pulse_id_ = 0;
uint64_t buffer_end_pulse_id_ = 0;
void prepare_buffer_for_pulse(const uint64_t pulse_id);
public:
ReplayH5Reader(const std::string device, const std::string channel_name);
virtual ~ReplayH5Reader();
void close_file();
bool get_frame(
const uint64_t pulse_id, ModuleFrame* metadata, char* frame_buffer);
};
#endif //SF_DAQ_BUFFER_REPLAYH5READER_HPP
+138
View File
@@ -0,0 +1,138 @@
#include "ReplayH5Reader.hpp"
#include "BufferUtils.hpp"
#include <iostream>
#include <chrono>
#include <cstring>
#include "date.h"
using namespace std;
using namespace core_buffer;
void ReplayH5Reader::prepare_buffer_for_pulse(const uint64_t pulse_id)
{
auto pulse_filename = BufferUtils::get_filename(
device_, channel_name_, pulse_id);
if (pulse_filename != current_filename_) {
close_file();
current_filename_ = pulse_filename;
current_file_ = H5::H5File(current_filename_, H5F_ACC_RDONLY);
dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET);
dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET);
// We always read the metadata for the entire file.
hsize_t b_metadata_dims[2] =
{FILE_MOD, ModuleFrame_N_FIELDS};
H5::DataSpace b_m_space (2, b_metadata_dims);
hsize_t b_m_count[] =
{FILE_MOD, ModuleFrame_N_FIELDS};
hsize_t b_m_start[] = {0, 0};
b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start);
hsize_t f_metadata_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS};
H5::DataSpace f_m_space (2, f_metadata_dims);
hsize_t f_m_count[] =
{FILE_MOD, ModuleFrame_N_FIELDS};
hsize_t f_m_start[] = {0, 0};
f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start);
dset_metadata_.read(&(metadata_buffer_[0]), H5::PredType::NATIVE_UINT64,
b_m_space, f_m_space);
buffer_start_pulse_id_ = 0;
buffer_end_pulse_id_ = 0;
}
// End pulse_id is not included in the buffer.
if ((pulse_id >= buffer_start_pulse_id_) &&
(pulse_id < buffer_end_pulse_id_)) {
return;
}
buffer_start_pulse_id_ = pulse_id - (pulse_id % REPLAY_READ_BUFFER_SIZE);
buffer_end_pulse_id_ = buffer_start_pulse_id_ + REPLAY_READ_BUFFER_SIZE;
auto start_index_in_file = BufferUtils::get_file_frame_index(
buffer_start_pulse_id_);
hsize_t b_image_dims[3] =
{REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace b_f_space (3, b_image_dims);
hsize_t b_i_count[] =
{REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE};
hsize_t b_i_start[] = {0, 0, 0};
b_f_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start);
hsize_t f_frame_dims[3] = {FILE_MOD, MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace f_f_space (3, f_frame_dims);
hsize_t f_f_count[] =
{REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE};
hsize_t f_f_start[] = {start_index_in_file, 0, 0};
f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start);
dset_frame_.read(&(frame_buffer_[0]), H5::PredType::NATIVE_UINT16,
b_f_space, f_f_space);
}
ReplayH5Reader::ReplayH5Reader(
const string device,
const string channel_name) :
device_(device),
channel_name_(channel_name)
{
}
ReplayH5Reader::~ReplayH5Reader()
{
close_file();
}
void ReplayH5Reader::close_file()
{
if (current_file_.getId() != -1) {
dset_metadata_.close();
dset_frame_.close();
current_file_.close();
}
}
bool ReplayH5Reader::get_frame(
const uint64_t pulse_id, ModuleFrame* metadata, char* frame_buffer)
{
prepare_buffer_for_pulse(pulse_id);
auto metadata_buffer_index = BufferUtils::get_file_frame_index(pulse_id);
memcpy(metadata,
&(metadata_buffer_[metadata_buffer_index]),
sizeof(ModuleFrame));
auto frame_buffer_index = pulse_id - buffer_start_pulse_id_;
memcpy(frame_buffer,
&(frame_buffer_[frame_buffer_index * MODULE_N_BYTES]),
MODULE_N_BYTES);
if (metadata->pulse_id == 0) {
// Signal that there is no frame at this pulse_id.
metadata->pulse_id = pulse_id;
return false;
}else if (metadata->pulse_id != pulse_id) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[ReplayH5Reader::get_frame]";
err_msg << " Corrupted file " << current_filename_;
err_msg << " index_in_file " << metadata_buffer_index;
err_msg << " expected pulse_id " << pulse_id;
err_msg << " but read " << metadata->pulse_id << endl;
throw runtime_error(err_msg.str());
}
return true;
}
+140
View File
@@ -0,0 +1,140 @@
#include <iostream>
#include <thread>
#include "jungfrau.hpp"
#include "zmq.h"
#include "buffer_config.hpp"
#include <cstring>
#include "ReplayH5Reader.hpp"
#include "date.h"
#include "bitshuffle/bitshuffle.h"
using namespace std;
using namespace core_buffer;
void sf_replay (
void* socket,
const string& device,
const string& channel_name,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id)
{
StreamModuleFrame metadata_buffer;
auto frame_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
ReplayH5Reader file_reader(device, channel_name);
//TODO: Add statstics.
uint64_t stats_counter = 0;
uint64_t total_read_us = 0;
uint64_t max_read_us = 0;
uint64_t total_send_us = 0;
uint64_t max_send_us = 0;
// "<= stop_pulse_id" because we include the stop_pulse_id in the file.
for (
uint64_t curr_pulse_id = start_pulse_id;
curr_pulse_id <= stop_pulse_id;
curr_pulse_id++) {
auto start_time = chrono::steady_clock::now();
metadata_buffer.is_frame_present = file_reader.get_frame(
curr_pulse_id,
&(metadata_buffer.metadata),
(char*)(frame_buffer.get()));
metadata_buffer.data_n_bytes = MODULE_N_BYTES;
auto end_time = chrono::steady_clock::now();
auto read_us_duration = chrono::duration_cast<chrono::microseconds>(
end_time-start_time).count();
start_time = chrono::steady_clock::now();
zmq_send(socket,
&metadata_buffer,
sizeof(StreamModuleFrame),
ZMQ_SNDMORE);
zmq_send(socket,
(char*)(frame_buffer.get()),
metadata_buffer.data_n_bytes,
0);
end_time = chrono::steady_clock::now();
auto send_us_duration = chrono::duration_cast<chrono::microseconds>(
end_time-start_time).count();
// TODO: Make proper stastistics.
stats_counter++;
total_read_us += read_us_duration;
max_read_us = max(max_read_us, (uint64_t)read_us_duration);
total_send_us += send_us_duration;
max_send_us = max(max_send_us, (uint64_t)send_us_duration);
if (stats_counter == STATS_MODULO) {
cout << "sf_replay:avg_read_us " << total_read_us/STATS_MODULO;
cout << " sf_replay:max_read_us " << max_read_us;
cout << " sf_replay:avg_send_us " << total_send_us/STATS_MODULO;
cout << " sf_replay:max_send_us " << max_send_us;
cout << endl;
stats_counter = 0;
total_read_us = 0;
max_read_us = 0;
total_send_us = 0;
max_send_us = 0;
}
}
}
int main (int argc, char *argv[]) {
if (argc != 6) {
cout << endl;
cout << "Usage: sf_replay [device]";
cout << " [channel_name] [source_id] [start_pulse_id] [stop_pulse_id]";
cout << endl;
cout << "\tdevice: Name of detector." << endl;
cout << "\tchannel_name: M00-M31 for JF16M." << endl;
cout << "\tsource_id: Module index" << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << endl;
exit(-1);
}
const string device = string(argv[1]);
const string channel_name = string(argv[2]);
const auto source_id = (uint16_t) atoi(argv[3]);
const auto start_pulse_id = (uint64_t) atoll(argv[4]);
const auto stop_pulse_id = (uint64_t) atoll(argv[5]);
stringstream ipc_stream;
ipc_stream << REPLAY_STREAM_IPC_URL << (int)source_id;
const auto ipc_address = ipc_stream.str();
auto ctx = zmq_ctx_new();
auto socket = zmq_socket(ctx, ZMQ_PUSH);
const int sndhwm = REPLAY_SNDHWM;
if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0)
throw runtime_error(strerror (errno));
const int linger_ms = -1;
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0)
throw runtime_error(strerror (errno));
if (zmq_bind(socket, ipc_address.c_str()) != 0)
throw runtime_error(strerror (errno));
sf_replay(socket, device, channel_name, start_pulse_id, stop_pulse_id);
zmq_close(socket);
zmq_ctx_destroy(ctx);
}
+11
View File
@@ -0,0 +1,11 @@
add_executable(sf-replay-tests main.cpp)
target_link_libraries(sf-replay-tests
core-buffer-lib
sf-buffer-lib
sf-replay-lib
hdf5
hdf5_cpp
gtest
)
+9
View File
@@ -0,0 +1,9 @@
#include "gtest/gtest.h"
#include "test_ReplayH5Reader.cpp"
using namespace std;
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
+111
View File
@@ -0,0 +1,111 @@
#include "ReplayH5Reader.hpp"
#include "BufferH5Writer.hpp"
#include "gtest/gtest.h"
using namespace std;
using namespace core_buffer;
TEST(ReplayH5Reader, basic_interaction)
{
auto root_folder = ".";
auto device_name = "fast_device";
// This 2 must be compatible by design.
BufferH5Writer writer(root_folder, device_name);
ReplayH5Reader reader(root_folder, device_name);
size_t pulse_id = 65;
ModuleFrame w_metadata;
ModuleFrame r_metadata;
auto w_frame_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
auto r_frame_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
// Setup test values.
w_metadata.pulse_id = pulse_id;
w_metadata.frame_index = 2;
w_metadata.daq_rec = 3;
w_metadata.n_received_packets = 128;
w_metadata.module_id = 4;
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
w_frame_buffer[i] = i % 100;
}
// Write to file.
writer.set_pulse_id(pulse_id);
writer.write(&w_metadata, (char*)&(w_frame_buffer[0]));
writer.close_file();
auto frame_present = reader.get_frame(
pulse_id, &r_metadata, (char*)&(r_frame_buffer[0]));
ASSERT_EQ(frame_present, true);
// Metadata has to match.
EXPECT_EQ(r_metadata.pulse_id, w_metadata.pulse_id);
EXPECT_EQ(r_metadata.frame_index, w_metadata.frame_index);
EXPECT_EQ(r_metadata.daq_rec, w_metadata.daq_rec);
EXPECT_EQ(r_metadata.n_received_packets, w_metadata.n_received_packets);
EXPECT_EQ(r_metadata.module_id, w_metadata.module_id);
// Data as well.
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
w_frame_buffer[i] = r_frame_buffer[i];
}
reader.close_file();
}
TEST(ReplayH5Reader, missing_frame)
{
auto root_folder = ".";
auto device_name = "fast_device";
// This 2 must be compatible by design.
BufferH5Writer writer(root_folder, device_name);
ReplayH5Reader reader(root_folder, device_name);
size_t pulse_id = 65;
ModuleFrame w_metadata;
ModuleFrame r_metadata;
auto w_frame_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
auto r_frame_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
// Setup test values.
w_metadata.pulse_id = pulse_id;
w_metadata.frame_index = 2;
w_metadata.daq_rec = 3;
w_metadata.n_received_packets = 128;
w_metadata.module_id = 4;
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
w_frame_buffer[i] = i % 100;
}
// Write to file.
writer.set_pulse_id(pulse_id);
writer.write(&w_metadata, (char*)&(w_frame_buffer[0]));
writer.close_file();
// But read another pulse_id, that should be empty.
auto frame_present = reader.get_frame(
pulse_id-1, &r_metadata, (char*)&(r_frame_buffer[0]));
ASSERT_EQ(frame_present, false);
// All metadata has to be 0, expect pulse_id.
EXPECT_EQ(r_metadata.pulse_id, pulse_id-1);
EXPECT_EQ(r_metadata.frame_index, 0);
EXPECT_EQ(r_metadata.daq_rec, 0);
EXPECT_EQ(r_metadata.n_received_packets, 0);
EXPECT_EQ(r_metadata.module_id, 0);
// Data as well.
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
r_frame_buffer[i] = 0;
}
reader.close_file();
}