Create buffered send

This commit is contained in:
2020-05-26 12:55:56 +02:00
parent 78b2524f20
commit 5ba868a7ae
5 changed files with 60 additions and 107 deletions
+2 -9
View File
@@ -17,13 +17,6 @@ class ReplayH5Reader {
H5::DataSet dset_metadata_;
H5::DataSet dset_frame_;
ModuleFrame* m_buffer_ = nullptr;
char* f_buffer_ = nullptr;
uint64_t buffer_start_pulse_id_ = 0;
uint64_t buffer_end_pulse_id_ = 0;
void load_buffers(const uint64_t pulse_id);
public:
ReplayH5Reader(
const std::string device,
@@ -33,8 +26,8 @@ public:
void close_file();
void get_buffer(
const uint64_t pulse_id,
ModuleFrame*& metadata,
char*& frame_buffer);
ReplayBuffer* metadata,
char* frame_buffer);
};
+2 -1
View File
@@ -3,6 +3,7 @@
#include <string>
#include <jungfrau.hpp>
#include <formats.hpp>
class ReplayZmqSender {
@@ -15,7 +16,7 @@ public:
void close();
void send(const ModuleFrame* metadata, const char* data);
void send(const ReplayBuffer* metadata, const char* data);
};
+46 -68
View File
@@ -5,7 +5,32 @@
using namespace std;
using namespace core_buffer;
void ReplayH5Reader::load_buffers(const uint64_t pulse_id)
ReplayH5Reader::ReplayH5Reader(
const string device,
const string channel_name) :
device_(device),
channel_name_(channel_name)
{
}
ReplayH5Reader::~ReplayH5Reader()
{
close_file();
}
void ReplayH5Reader::close_file()
{
if (current_file_.getId() != -1) {
dset_metadata_.close();
dset_frame_.close();
current_file_.close();
}
}
void ReplayH5Reader::get_buffer(
const uint64_t pulse_id,
ReplayBuffer* metadata,
char* data)
{
auto pulse_filename = BufferUtils::get_filename(
device_, channel_name_, pulse_id);
@@ -18,29 +43,31 @@ void ReplayH5Reader::load_buffers(const uint64_t pulse_id)
dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET);
dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET);
hsize_t b_m_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS};
H5::DataSpace b_m_space (2, b_m_dims);
hsize_t b_m_count[] = {FILE_MOD, ModuleFrame_N_FIELDS};
hsize_t b_m_start[] = {0, 0};
b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start);
hsize_t f_m_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS};
H5::DataSpace f_m_space (2, f_m_dims);
hsize_t f_m_count[] = {FILE_MOD, ModuleFrame_N_FIELDS};
hsize_t pulse_id_start[] = {0, 0};
f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, pulse_id_start);
dset_metadata_.read(
m_buffer_, H5::PredType::NATIVE_UINT64, b_m_space, f_m_space);
}
auto file_index = BufferUtils::get_file_frame_index(pulse_id);
auto cache_start_index = file_index / REPLAY_READ_BUFFER_SIZE;
cache_start_index *= REPLAY_READ_BUFFER_SIZE;
buffer_start_pulse_id_ = pulse_id - (file_index - cache_start_index);
buffer_end_pulse_id_ = buffer_start_pulse_id_ + REPLAY_READ_BUFFER_SIZE - 1;
uint64_t b_start_pulse_id = pulse_id - (file_index - cache_start_index);
metadata->start_pulse_id = b_start_pulse_id;
metadata->n_frames = REPLAY_READ_BUFFER_SIZE;
hsize_t b_m_dims[2] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS};
H5::DataSpace b_m_space (2, b_m_dims);
hsize_t b_m_count[] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS};
hsize_t b_m_start[] = {cache_start_index, 0};
b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start);
hsize_t f_m_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS};
H5::DataSpace f_m_space (2, f_m_dims);
hsize_t f_m_count[] = {REPLAY_READ_BUFFER_SIZE, ModuleFrame_N_FIELDS};
hsize_t pulse_id_start[] = {cache_start_index, 0};
f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, pulse_id_start);
dset_metadata_.read(
&metadata->metadata[0],
H5::PredType::NATIVE_UINT64, b_m_space, f_m_space);
hsize_t b_f_dims[3] =
{REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE};
@@ -58,54 +85,5 @@ void ReplayH5Reader::load_buffers(const uint64_t pulse_id)
f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start);
dset_frame_.read(
f_buffer_, H5::PredType::NATIVE_UINT16, b_f_space, f_f_space);
}
ReplayH5Reader::ReplayH5Reader(
const string device,
const string channel_name) :
device_(device),
channel_name_(channel_name)
{
m_buffer_ = new ModuleFrame[FILE_MOD];
f_buffer_ = new char[MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE];
}
ReplayH5Reader::~ReplayH5Reader()
{
close_file();
delete[] m_buffer_;
delete[] f_buffer_;
}
void ReplayH5Reader::close_file()
{
if (current_file_.getId() != -1) {
dset_metadata_.close();
dset_frame_.close();
current_file_.close();
}
}
void ReplayH5Reader::get_buffer(
const uint64_t pulse_id,
ModuleFrame*& metadata,
char*& data)
{
// Buffer start and end pulse_ids are inclusive.
if ((pulse_id < buffer_start_pulse_id_) ||
(pulse_id > buffer_end_pulse_id_)) {
load_buffers(pulse_id);
}
auto file_index = BufferUtils::get_file_frame_index(pulse_id);
auto buffer_index = pulse_id - buffer_start_pulse_id_;
metadata = m_buffer_ + file_index;
data = f_buffer_ + (buffer_index * MODULE_N_BYTES);
if (metadata->pulse_id != 0 && metadata->pulse_id != pulse_id) {
throw runtime_error("Corrupted buffer file.");
}
data, H5::PredType::NATIVE_UINT16, b_f_space, f_f_space);
}
+3 -3
View File
@@ -41,8 +41,8 @@ void ReplayZmqSender::close() {
zmq_ctx_destroy(ctx_);
}
void ReplayZmqSender::send(const ModuleFrame* metadata, const char* data)
void ReplayZmqSender::send(const ReplayBuffer* metadata, const char* data)
{
zmq_send(socket_, metadata, sizeof(ModuleFrame), ZMQ_SNDMORE);
zmq_send(socket_, data, MODULE_N_BYTES, 0);
zmq_send(socket_, metadata, sizeof(ReplayBuffer), ZMQ_SNDMORE);
zmq_send(socket_, data, MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE, 0);
}
+7 -26
View File
@@ -14,15 +14,11 @@ using namespace chrono;
void sf_replay (
const string device,
const string channel_name,
FastQueue<ModuleFrame>& queue,
FastQueue<ReplayBuffer>& queue,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id
)
{
uint64_t read_us = 0;
uint64_t max_read_us = 0;
uint64_t n_stats = 0;
ReplayH5Reader file_reader(device, channel_name);
// "<= stop_pulse_id" because we include the stop_pulse_id in the file.
@@ -41,12 +37,7 @@ void sf_replay (
auto metadata = queue.get_metadata_buffer(slot_id);
auto buffer = queue.get_data_buffer(slot_id);
ModuleFrame* m_buffer;
char* f_buffer;
file_reader.get_buffer(curr_pulse_id, m_buffer, f_buffer);
memcpy(metadata, m_buffer, sizeof(ModuleFrame));
memcpy(buffer, f_buffer, MODULE_N_BYTES);
file_reader.get_buffer(curr_pulse_id, metadata, buffer);
auto end_time = steady_clock::now();
uint64_t read_us_duration =
@@ -55,20 +46,8 @@ void sf_replay (
queue.commit();
// TODO: Proper statistics
n_stats++;
read_us += read_us_duration;
max_read_us = max(max_read_us, read_us_duration);
if (n_stats == STATS_MODULO) {
cout << "sf_replay:avg_read_us " << read_us / STATS_MODULO;
cout << " sf_replay:max_read_us " << max_read_us;
cout << endl;
n_stats = 0;
read_us = 0;
max_read_us = 0;
}
cout << "sf_replay:avg_read_us ";
cout << read_us_duration / REPLAY_READ_BUFFER_SIZE << endl;
}
}
@@ -96,7 +75,9 @@ int main (int argc, char *argv[]) {
const auto start_pulse_id = (uint64_t) atoll(argv[5]);
const auto stop_pulse_id = (uint64_t) atoll(argv[6]);
FastQueue<ModuleFrame> queue(MODULE_N_BYTES, REPLAY_FASTQUEUE_N_SLOTS);
FastQueue<ReplayBuffer> queue(
MODULE_N_BYTES * REPLAY_READ_BUFFER_SIZE,
REPLAY_FASTQUEUE_N_SLOTS);
thread file_read_thread(sf_replay,
device, channel_name, ref(queue),