Add jf-live-writer

First implementation of the image buffer writer for the
 Jungfrau
This commit is contained in:
2021-01-19 17:20:06 +01:00
parent 402a2b978d
commit 6bbbd734c7
11 changed files with 121 additions and 699 deletions
+1 -1
View File
@@ -34,4 +34,4 @@ add_subdirectory("jf-buffer-writer")
add_subdirectory("jf-assembler")
add_subdirectory("sf-stream")
add_subdirectory("sf-writer")
#add_subdirectory("jf-live-writer")
add_subdirectory("jf-live-writer")
-28
View File
@@ -1,28 +0,0 @@
#ifndef SF_DAQ_BUFFER_BINARYREADER_HPP
#define SF_DAQ_BUFFER_BINARYREADER_HPP
#include <formats.hpp>
class BinaryReader {
const std::string detector_folder_;
const std::string module_name_;
std::string current_input_file_;
int input_file_fd_;
void open_file(const std::string& filename);
void close_current_file();
public:
BinaryReader(const std::string &detector_folder,
const std::string &module_name);
~BinaryReader();
void get_frame(const uint64_t pulse_id, BufferBinaryFormat *buffer);
};
#endif //SF_DAQ_BUFFER_BINARYREADER_HPP
+32
View File
@@ -0,0 +1,32 @@
#include <cstddef>
#include <formats.hpp>
#include <chrono>
#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP
#define SF_DAQ_BUFFER_FRAMESTATS_HPP
class BufferStats {
const std::string detector_name_;
const int module_id_;
size_t stats_modulo_;
int frames_counter_;
uint32_t total_buffer_write_us_;
uint32_t max_buffer_write_us_;
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
void reset_counters();
void print_stats();
public:
BufferStats(
const std::string &detector_name,
const int module_id,
const size_t stats_modulo);
void start_frame_write();
void end_frame_write();
};
#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP
-49
View File
@@ -1,49 +0,0 @@
#ifndef SFWRITER_HPP
#define SFWRITER_HPP
#include <memory>
#include <string>
#include <H5Cpp.h>
#include "LiveImageAssembler.hpp"
const auto& H5_UINT64 = H5::PredType::NATIVE_UINT64;
const auto& H5_UINT32 = H5::PredType::NATIVE_UINT32;
const auto& H5_UINT16 = H5::PredType::NATIVE_UINT16;
const auto& H5_UINT8 = H5::PredType::NATIVE_UINT8;
class JFH5LiveWriter {
const std::string detector_name_;
const size_t n_modules_;
const size_t n_pulses_;
size_t write_index_;
H5::H5File file_;
H5::DataSet image_dataset_;
uint64_t* b_pulse_id_;
uint64_t* b_frame_index_;
uint32_t* b_daq_rec_;
uint8_t* b_is_good_frame_ ;
void init_file(const std::string &output_file);
void write_dataset(const std::string name,
const void *buffer,
const H5::PredType &type);
void write_metadata();
std::string get_detector_name(const std::string& detector_folder);
void close_file();
public:
JFH5LiveWriter(const std::string& output_file,
const std::string& detector_folder,
const size_t n_modules,
const size_t n_pulses);
~JFH5LiveWriter();
void write(const ImageMetadata* metadata, const char* data);
};
#endif //SFWRITER_HPP
@@ -1,51 +0,0 @@
#ifndef SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
#define SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
#include <atomic>
#include "buffer_config.hpp"
#include "formats.hpp"
const uint64_t IA_EMPTY_SLOT_VALUE = 0;
struct ImageMetadata
{
uint64_t pulse_id;
uint64_t frame_index;
uint32_t daq_rec;
uint8_t is_good_image;
};
class LiveImageAssembler {
const size_t n_modules_;
const size_t image_buffer_slot_n_bytes_;
char* image_buffer_;
ImageMetadata* image_meta_buffer_;
ModuleFrame* frame_meta_buffer_;
std::atomic_int* buffer_status_;
std::atomic_uint64_t* buffer_pulse_id_;
size_t get_data_offset(const uint64_t slot_id, const int i_module);
size_t get_frame_metadata_offset(const uint64_t slot_id, const int i_module);
public:
LiveImageAssembler(const size_t n_modules);
virtual ~LiveImageAssembler();
bool is_slot_free(const uint64_t pulse_id);
bool is_slot_full(const uint64_t pulse_id);
void process(const uint64_t pulse_id,
const int i_module,
const BufferBinaryFormat* block_buffer);
void free_slot(const uint64_t pulse_id);
ImageMetadata* get_metadata_buffer(const uint64_t pulse_id);
char* get_data_buffer(const uint64_t pulse_id);
};
#endif //SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
@@ -2,8 +2,6 @@
namespace live_writer_config
{
// MS to retry reading from the image assembler.
const size_t ASSEMBLER_RETRY_MS = 5;
// Number of slots in the reconstruction buffer.
const size_t WRITER_IA_N_SLOTS = 200;
// N of IO threads to receive data from modules.
const int LIVE_ZMQ_IO_THREADS = 1;
}
-102
View File
@@ -1,102 +0,0 @@
#include "BinaryReader.hpp"
#include <unistd.h>
#include <sstream>
#include <cstring>
#include <fcntl.h>
#include "BufferUtils.hpp"
#include "buffer_config.hpp"
using namespace std;
using namespace buffer_config;
BinaryReader::BinaryReader(
const std::string &detector_folder,
const std::string &module_name) :
detector_folder_(detector_folder),
module_name_(module_name),
current_input_file_(""),
input_file_fd_(-1)
{}
BinaryReader::~BinaryReader()
{
close_current_file();
}
void BinaryReader::get_frame(
const uint64_t pulse_id, BufferBinaryFormat* buffer)
{
auto current_frame_file = BufferUtils::get_filename(
detector_folder_, module_name_, pulse_id);
if (current_frame_file != current_input_file_) {
open_file(current_frame_file);
}
size_t file_index = BufferUtils::get_file_frame_index(pulse_id);
size_t n_bytes_offset = file_index * sizeof(BufferBinaryFormat);
auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET);
if (lseek_result < 0) {
stringstream err_msg;
err_msg << "[BinaryReader::get_frame]";
err_msg << " Error while lseek on file ";
err_msg << current_input_file_ << " for n_bytes_offset ";
err_msg << n_bytes_offset << ": " << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
auto n_bytes = ::read(input_file_fd_, buffer, sizeof(BufferBinaryFormat));
if (n_bytes < sizeof(BufferBinaryFormat)) {
stringstream err_msg;
err_msg << "[BinaryReader::get_block]";
err_msg << " Error while reading from file ";
err_msg << current_input_file_ << ": " << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
}
void BinaryReader::open_file(const std::string& filename)
{
close_current_file();
input_file_fd_ = open(filename.c_str(), O_RDONLY);
if (input_file_fd_ < 0) {
stringstream err_msg;
err_msg << "[BinaryReader::open_file]";
err_msg << " Cannot open file " << filename << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
current_input_file_ = filename;
}
void BinaryReader::close_current_file()
{
if (input_file_fd_ != -1) {
if (close(input_file_fd_) < 0) {
stringstream err_msg;
err_msg << "[BinaryWriter::close_current_file]";
err_msg << " Error while closing file " << current_input_file_;
err_msg << ": " << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
input_file_fd_ = -1;
current_input_file_ = "";
}
}
+63
View File
@@ -0,0 +1,63 @@
#include <iostream>
#include "BufferStats.hpp"
using namespace std;
using namespace chrono;
BufferStats::BufferStats(
const string& detector_name,
const int module_id,
const size_t stats_modulo) :
detector_name_(detector_name),
module_id_(module_id),
stats_modulo_(stats_modulo)
{
reset_counters();
}
void BufferStats::reset_counters()
{
frames_counter_ = 0;
total_buffer_write_us_ = 0;
max_buffer_write_us_ = 0;
}
void BufferStats::start_frame_write()
{
stats_interval_start_ = steady_clock::now();
}
void BufferStats::end_frame_write()
{
frames_counter_++;
uint32_t write_us_duration = duration_cast<microseconds>(
steady_clock::now()-stats_interval_start_).count();
total_buffer_write_us_ += write_us_duration;
max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration);
if (frames_counter_ == stats_modulo_) {
print_stats();
reset_counters();
}
}
void BufferStats::print_stats()
{
float avg_buffer_write_us = total_buffer_write_us_ / frames_counter_;
uint64_t timestamp = time_point_cast<nanoseconds>(
system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "jf_buffer_writer";
cout << ",detector_name=" << detector_name_;
cout << ",module_name=M" << module_id_;
cout << " ";
cout << "avg_buffer_write_us=" << avg_buffer_write_us;
cout << ",max_buffer_write_us=" << max_buffer_write_us_ << "i";
cout << " ";
cout << timestamp;
cout << endl;
}
-133
View File
@@ -1,133 +0,0 @@
#include "JFH5LiveWriter.hpp"
#include <cstring>
#include <hdf5_hl.h>
#include "buffer_config.hpp"
using namespace std;
using namespace buffer_config;
JFH5LiveWriter::JFH5LiveWriter(const string& output_file,
const string& detector_folder,
const size_t n_modules,
const size_t n_pulses) :
detector_name_(get_detector_name(detector_folder)),
n_modules_(n_modules),
n_pulses_(n_pulses),
write_index_(0)
{
b_pulse_id_ = new uint64_t[n_pulses_];
b_frame_index_= new uint64_t[n_pulses_];
b_daq_rec_ = new uint32_t[n_pulses_];
b_is_good_frame_ = new uint8_t[n_pulses_];
init_file(output_file);
}
void JFH5LiveWriter::init_file(const string& output_file)
{
file_ = H5::H5File(output_file, H5F_ACC_TRUNC);
file_.createGroup("/data");
file_.createGroup("/data/" + detector_name_);
H5::DataSpace att_space(H5S_SCALAR);
H5::DataType data_type = H5::StrType(0, H5T_VARIABLE);
file_.createGroup("/general");
auto detector_dataset = file_.createDataSet(
"/general/detector_name", data_type ,att_space);
detector_dataset.write(detector_name_, data_type);
hsize_t image_dataset_dims[3] =
{n_pulses_, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace image_dataspace(3, image_dataset_dims);
hsize_t image_dataset_chunking[3] =
{1, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DSetCreatPropList image_dataset_properties;
image_dataset_properties.setChunk(3, image_dataset_chunking);
image_dataset_ = file_.createDataSet(
"/data/" + detector_name_ + "/data",
H5_UINT16,
image_dataspace,
image_dataset_properties);
}
std::string JFH5LiveWriter::get_detector_name(const string& detector_folder)
{
size_t last_separator;
if ((last_separator = detector_folder.rfind("/")) == string::npos) {
return detector_folder;
}
return detector_folder.substr(last_separator + 1);
}
JFH5LiveWriter::~JFH5LiveWriter()
{
close_file();
delete[] b_pulse_id_;
delete[] b_frame_index_;
delete[] b_daq_rec_;
delete[] b_is_good_frame_;
}
void JFH5LiveWriter::write_dataset(
const string name, const void* buffer, const H5::PredType& type)
{
hsize_t b_m_dims[] = {n_pulses_};
H5::DataSpace b_m_space (1, b_m_dims);
hsize_t f_m_dims[] = {n_pulses_, 1};
H5::DataSpace f_m_space(2, f_m_dims);
auto complete_name = "/data/" + detector_name_ + "/" + name;
auto dataset = file_.createDataSet(complete_name, type, f_m_space);
dataset.write(buffer, type, b_m_space, f_m_space);
dataset.close();
}
void JFH5LiveWriter::write_metadata()
{
write_dataset("pulse_id", &b_pulse_id_, H5_UINT64);
write_dataset("frame_index", &b_frame_index_, H5_UINT64);
write_dataset("daq_rec", &b_daq_rec_, H5_UINT32);
write_dataset("is_good_frame", &b_is_good_frame_, H5_UINT8);
}
void JFH5LiveWriter::close_file()
{
if (file_.getId() == -1) {
return;
}
image_dataset_.close();
write_metadata();
file_.close();
}
void JFH5LiveWriter::write(const ImageMetadata* metadata, const char* data)
{
hsize_t offset[] = {write_index_, 0, 0};
H5DOwrite_chunk(image_dataset_.getId(), H5P_DEFAULT, 0,
offset, MODULE_N_BYTES * n_modules_, data);
b_pulse_id_[write_index_] = metadata->pulse_id;
b_frame_index_[write_index_] = metadata->frame_index;
b_daq_rec_[write_index_] = metadata->daq_rec;
b_is_good_frame_[write_index_] = metadata->is_good_image;
write_index_++;
}
-159
View File
@@ -1,159 +0,0 @@
#include <cstring>
#include "LiveImageAssembler.hpp"
#include "buffer_config.hpp"
#include "live_writer_config.hpp"
using namespace std;
using namespace buffer_config;
using namespace live_writer_config;
LiveImageAssembler::LiveImageAssembler(const size_t n_modules) :
n_modules_(n_modules),
image_buffer_slot_n_bytes_(MODULE_N_BYTES * n_modules_)
{
image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_];
image_meta_buffer_ = new ImageMetadata[WRITER_IA_N_SLOTS];
frame_meta_buffer_ = new ModuleFrame[WRITER_IA_N_SLOTS * n_modules];
buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS];
buffer_pulse_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS];
for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) {
free_slot(i);
}
}
LiveImageAssembler::~LiveImageAssembler()
{
delete[] image_buffer_;
delete[] image_meta_buffer_;
}
bool LiveImageAssembler::is_slot_free(const uint64_t pulse_id)
{
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
uint64_t slot_pulse_id = IA_EMPTY_SLOT_VALUE;
if (buffer_pulse_id_[slot_id].compare_exchange_strong(
slot_pulse_id, pulse_id)) {
return true;
}
auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0;
return is_free && (slot_pulse_id == pulse_id);
}
bool LiveImageAssembler::is_slot_full(const uint64_t pulse_id)
{
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
return buffer_status_[slot_id].load(memory_order_relaxed) == 0;
}
size_t LiveImageAssembler::get_data_offset(
const uint64_t slot_id, const int i_module)
{
size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_;
size_t module_i_offset = i_module * MODULE_N_BYTES;
return slot_i_offset + module_i_offset;
}
size_t LiveImageAssembler::get_frame_metadata_offset(
const uint64_t slot_id, const int i_module)
{
size_t slot_m_offset = slot_id * n_modules_;
size_t module_m_offset = i_module;
return slot_m_offset + module_m_offset;
}
void LiveImageAssembler::process(
const uint64_t pulse_id,
const int i_module,
const BufferBinaryFormat* file_buffer)
{
const auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
auto frame_meta_offset = get_frame_metadata_offset(slot_id, i_module);
auto image_offset = get_data_offset(slot_id, i_module);
memcpy(
&(frame_meta_buffer_[frame_meta_offset]),
&(file_buffer->metadata),
sizeof(file_buffer->metadata));
memcpy(
image_buffer_ + image_offset,
&(file_buffer->data[0]),
MODULE_N_BYTES);
buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed);
}
void LiveImageAssembler::free_slot(const uint64_t pulse_id)
{
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
buffer_status_[slot_id].store(n_modules_, memory_order_relaxed);
buffer_pulse_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed);
}
ImageMetadata* LiveImageAssembler::get_metadata_buffer(const uint64_t pulse_id)
{
const auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
ImageMetadata& image_meta = image_meta_buffer_[slot_id];
auto frame_meta_offset = get_frame_metadata_offset(slot_id, 0);
auto is_pulse_init = false;
image_meta.is_good_image = 1;
image_meta.pulse_id = 0;
for (size_t i_module=0; i_module < n_modules_; i_module++) {
auto& frame_meta = frame_meta_buffer_[frame_meta_offset];
frame_meta_offset += 1;
auto is_good_frame =
frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME;
if (!is_good_frame) {
image_meta.pulse_id = 0;
continue;
}
if (!is_pulse_init) {
image_meta.pulse_id = frame_meta.pulse_id;
image_meta.frame_index = frame_meta.frame_index;
image_meta.daq_rec = frame_meta.daq_rec;
is_pulse_init = true;
}
if (image_meta.is_good_image == 1) {
if (frame_meta.pulse_id != image_meta.pulse_id) {
image_meta.is_good_image = 0;
}
if (frame_meta.frame_index != image_meta.frame_index) {
image_meta.is_good_image = 0;
}
if (frame_meta.daq_rec != image_meta.daq_rec) {
image_meta.is_good_image = 0;
}
if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) {
image_meta.is_good_image = 0;
}
}
}
return &image_meta;
}
char* LiveImageAssembler::get_data_buffer(const uint64_t pulse_id)
{
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_);
}
+23 -172
View File
@@ -1,195 +1,46 @@
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <vector>
#include "zmq.h"
#include <zmq.h>
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include "live_writer_config.hpp"
#include "buffer_config.hpp"
#include "bitshuffle/bitshuffle.h"
#include "JFH5LiveWriter.hpp"
#include "LiveImageAssembler.hpp"
#include "BinaryReader.hpp"
#include "../../jf-buffer-writer/include/BufferStats.hpp"
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace live_writer_config;
void read_buffer(
const string detector_folder,
const string module_name,
const int i_module,
const vector<uint64_t>& pulse_ids_to_write,
LiveImageAssembler& image_assembler,
void* ctx)
{
BinaryReader reader(detector_folder, module_name);
auto frame_buffer = new BufferBinaryFormat();
void* socket = zmq_socket(ctx, ZMQ_SUB);
if (socket == nullptr) {
throw runtime_error(zmq_strerror(errno));
}
int rcvhwm = 100;
if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
int linger = 0;
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
// In milliseconds.
int rcvto = 2000;
if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){
throw runtime_error(zmq_strerror(errno));
}
if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) {
throw runtime_error(zmq_strerror(errno));
}
if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) {
throw runtime_error(zmq_strerror(errno));
}
const uint64_t PULSE_ID_DELAY = 100;
uint64_t live_pulse_id = pulse_ids_to_write.front();
for (uint64_t pulse_id:pulse_ids_to_write) {
while(!image_assembler.is_slot_free(pulse_id)) {
this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS));
}
auto start_time = steady_clock::now();
// Enforce a delay of 1 second for writing.
while (live_pulse_id - pulse_id < PULSE_ID_DELAY) {
if (zmq_recv(socket, &live_pulse_id,
sizeof(live_pulse_id), 0) == -1) {
if (errno == EAGAIN) {
throw runtime_error("Did not receive pulse_id in time.");
} else {
throw runtime_error(zmq_strerror(errno));
}
}
}
reader.get_frame(pulse_id, frame_buffer);
auto end_time = steady_clock::now();
uint64_t read_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
start_time = steady_clock::now();
image_assembler.process(pulse_id, i_module, frame_buffer);
end_time = steady_clock::now();
uint64_t compose_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
cout << "sf_writer:avg_read_us ";
cout << read_us_duration / BUFFER_BLOCK_SIZE << endl;
cout << "sf_writer:avg_assemble_us ";
cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl;
}
delete frame_buffer;
}
int main (int argc, char *argv[])
{
if (argc != 7) {
if (argc != 3) {
cout << endl;
cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]";
cout << " [start_pulse_id] [n_pulses] [pulse_id_step]";
cout << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tdetector_folder: Absolute path to detector buffer." << endl;
cout << "\tn_modules: number of modules" << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tn_pulses: Number of pulses to write." << endl;
cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl;
cout << "Usage: jf_live_writer [detector_json_filename]"
" [stream_name]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << endl;
exit(-1);
}
string output_file = string(argv[1]);
const string detector_folder = string(argv[2]);
size_t n_modules = atoi(argv[3]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[4]);
size_t n_pulses = (size_t) atoll(argv[5]);
int pulse_id_step = atoi(argv[6]);
std::vector<uint64_t> pulse_ids_to_write;
uint64_t i_pulse_id = start_pulse_id;
for (size_t i=0; i<n_pulses; i++) {
pulse_ids_to_write.push_back(i_pulse_id);
i_pulse_id += pulse_id_step;
}
LiveImageAssembler image_assembler(n_modules);
const auto stream_name = string(argv[2]);
auto config = BufferUtils::read_json_config(string(argv[1]));
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1);
zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS);
auto receiver = BufferUtils::connect_socket(
ctx, config.detector_name, "assembler");
std::vector<std::thread> reading_threads(n_modules);
for (size_t i_module=0; i_module<n_modules; i_module++) {
RamBuffer ram_buffer(config.detector_name, config.n_modules);
BufferStats stats(config.detector_name, stream_name, STATS_MODULO);
// TODO: Very ugly. Fix.
string module_name = "M";
if (i_module < 10) {
module_name += "0";
}
module_name += to_string(i_module);
ImageMetadata meta;
while (true) {
zmq_recv(receiver, &meta, sizeof(meta), 0);
char* data = ram_buffer.read_image(meta.pulse_id);
reading_threads.emplace_back(
read_buffer,
detector_folder,
module_name,
i_module,
ref(pulse_ids_to_write),
ref(image_assembler),
ctx);
sender.send(meta, data);
stats.record_stats(meta);
}
JFH5LiveWriter writer(output_file, detector_folder, n_modules, n_pulses);
for (uint64_t pulse_id:pulse_ids_to_write) {
while(!image_assembler.is_slot_full(pulse_id)) {
this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS));
}
auto metadata = image_assembler.get_metadata_buffer(pulse_id);
auto data = image_assembler.get_data_buffer(pulse_id);
auto start_time = steady_clock::now();
writer.write(metadata, data);
auto end_time = steady_clock::now();
auto write_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
image_assembler.free_slot(pulse_id);
cout << "sf_writer:avg_write_us ";
cout << write_us_duration / BUFFER_BLOCK_SIZE << endl;
}
for (auto& reading_thread : reading_threads) {
if (reading_thread.joinable()) {
reading_thread.join();
}
}
return 0;
}