Reimplemented writer

This commit is contained in:
2021-02-25 10:21:49 +01:00
parent d8706f65da
commit a485736af4
2 changed files with 180 additions and 254 deletions
+28 -30
View File
@@ -3,48 +3,46 @@
#include <memory>
#include <string>
#include <H5Cpp.h>
#include <BufferUtils.hpp>
#include <formats.hpp>
#include "ImageAssembler.hpp"
extern "C" {
#include <hdf5_hl.h>
}
class JFH5Writer {
const std::string root_folder_;
const std::string detector_name_;
const size_t n_modules_;
const uint64_t start_pulse_id_;
const uint64_t stop_pulse_id_;
const size_t pulse_id_step_;
const size_t n_images_;
const size_t n_total_pulses_;
size_t meta_write_index_;
size_t data_write_index_;
const uint32_t image_y_size_;
const uint32_t image_x_size_;
H5::H5File file_;
H5::DataSet image_dataset_;
static const int64_t NO_RUN_ID;
int64_t current_run_id_ = NO_RUN_ID;
uint64_t* b_pulse_id_;
uint64_t* b_frame_index_;
uint32_t* b_daq_rec_;
uint8_t* b_is_good_frame_ ;
size_t get_n_pulses_in_range(const uint64_t start_pulse_id,
const uint64_t stop_pulse_id,
const int pulse_id_step);
void write_metadata();
std::string get_device_name(const std::string& device);
hid_t file_id_ = -1;
hid_t image_dataset_id_ = -1;
hid_t pulse_dataset_id_= -1;
hid_t frame_dataset_id_ = -1;
hid_t daq_rec_dataset_id_ = -1;
hid_t is_good_dataset_id_ = -1;
void open_file(const std::string& output_file, const uint32_t n_images);
void close_file();
public:
JFH5Writer(const std::string& output_file,
const std::string& device,
const size_t n_modules,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id,
const size_t pulse_id_step);
JFH5Writer(const BufferUtils::DetectorConfig config);
~JFH5Writer();
void write(const ImageMetadataBlock* metadata, const char* data);
void open_run(const int64_t run_id, const uint32_t n_images);
void close_run();
void write_data(const int64_t run_id,
const uint32_t index,
const char* data);
void write_meta(const int64_t run_id,
const uint32_t index,
const ImageMetadata& meta);
};
#endif //SFWRITER_HPP
+152 -224
View File
@@ -2,213 +2,181 @@
#include <sstream>
#include <cstring>
#include <hdf5_hl.h>
#include <H5version.h>
#include "writer_config.hpp"
#include "live_writer_config.hpp"
#include "buffer_config.hpp"
#include "formats.hpp"
//extern "C"
//{
// #include "H5DOpublic.h"
// #include <bitshuffle/bshuf_h5filter.h>
//}
using namespace std;
using namespace writer_config;
using namespace buffer_config;
JFH5Writer::JFH5Writer(const string& output_file,
const string& device,
const size_t n_modules,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id,
const size_t pulse_id_step) :
detector_name_(get_device_name(device)),
n_modules_(n_modules),
start_pulse_id_(start_pulse_id),
stop_pulse_id_(stop_pulse_id),
pulse_id_step_(pulse_id_step),
n_images_(get_n_pulses_in_range(start_pulse_id,
stop_pulse_id,
pulse_id_step)),
n_total_pulses_(stop_pulse_id_ - start_pulse_id_ + 1),
meta_write_index_(0),
data_write_index_(0)
extern "C"
{
// bshuf_register_h5filter();
file_ = H5::H5File(output_file, H5F_ACC_TRUNC);
file_.createGroup("/data");
file_.createGroup("/data/" + detector_name_);
H5::DataSpace att_space(H5S_SCALAR);
H5::DataType data_type = H5::StrType(0, H5T_VARIABLE);
file_.createGroup("/general");
auto detector_dataset = file_.createDataSet(
"/general/detector_name", data_type ,att_space);
detector_dataset.write(detector_name_, data_type);
hsize_t image_dataset_dims[3] =
{n_images_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace image_dataspace(3, image_dataset_dims);
hsize_t image_dataset_chunking[3] =
{1, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DSetCreatPropList image_dataset_properties;
image_dataset_properties.setChunk(3, image_dataset_chunking);
// // block_size, compression type
// uint compression_prop[] =
// {MODULE_N_PIXELS, //block size
// BSHUF_H5_COMPRESS_LZ4}; // Compression type
//
// H5Pset_filter(image_dataset_properties.getId(),
// BSHUF_H5FILTER,
// H5Z_FLAG_MANDATORY,
// 2,
// &(compression_prop[0]));
image_dataset_ = file_.createDataSet(
"/data/" + detector_name_ + "/data",
H5::PredType::NATIVE_UINT16,
image_dataspace,
image_dataset_properties);
b_pulse_id_ = new uint64_t[n_total_pulses_];
b_frame_index_= new uint64_t[n_total_pulses_];
b_daq_rec_ = new uint32_t[n_total_pulses_];
b_is_good_frame_ = new uint8_t[n_total_pulses_];
#include "H5DOpublic.h"
#include <bitshuffle/bshuf_h5filter.h>
}
std::string JFH5Writer::get_device_name(const std::string& device)
{
size_t last_separator;
if ((last_separator = device.rfind("/")) == string::npos) {
return device;
}
using namespace std;
using namespace buffer_config;
using namespace live_writer_config;
return device.substr(last_separator+1);
JFH5Writer::JFH5Writer(const BufferUtils::DetectorConfig config):
root_folder_(config.buffer_folder),
detector_name_(config.detector_name),
image_x_size_(config.image_x_size),
image_y_size_(config.image_y_size)
{
}
JFH5Writer::~JFH5Writer()
{
close_file();
delete[] b_pulse_id_;
delete[] b_frame_index_;
delete[] b_daq_rec_;
delete[] b_is_good_frame_;
}
size_t JFH5Writer::get_n_pulses_in_range(
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id,
const int pulse_id_step)
void JFH5Writer::open_run(const int64_t run_id, const uint32_t n_images)
{
if (stop_pulse_id < start_pulse_id) {
throw runtime_error("stop_pulse_id smaller than start_pulse_id.");
}
close_file();
if (100 % pulse_id_step != 0) {
throw runtime_error("100 is not divisible by the pulse_id_step.");
}
const string output_folder = root_folder_ + "/" + OUTPUT_FOLDER_SYMLINK;
// TODO: Maybe add leading zeros to filename?
const string output_file = output_folder + to_string(run_id) + ".h5";
if (start_pulse_id % pulse_id_step != 0) {
throw runtime_error("start_pulse_id not divisible by pulse_id_step.");
}
open_file(output_file, n_images);
if (stop_pulse_id % pulse_id_step != 0) {
throw runtime_error("stop_pulse_id not divisible by pulse_id_step.");
}
size_t n_pulses = 1;
n_pulses += (stop_pulse_id / pulse_id_step);
n_pulses -= start_pulse_id / pulse_id_step;
if (n_pulses == 0) {
throw runtime_error("Zero pulses to write in given range.");
}
return n_pulses;
current_run_id_ = run_id;
}
void JFH5Writer::write_metadata()
void JFH5Writer::close_run()
{
hsize_t b_m_dims[] = {n_total_pulses_};
hsize_t b_m_count[] = {n_images_};
hsize_t b_m_start[] = {0};
hsize_t b_m_stride[] = {pulse_id_step_};
H5::DataSpace b_m_space (1, b_m_dims);
b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start, b_m_stride);
close_file();
current_run_id_ = NO_RUN_ID;
}
hsize_t f_m_dims[] = {n_images_, 1};
H5::DataSpace f_m_space(2, f_m_dims);
void JFH5Writer::open_file(const string& output_file, const uint32_t n_images)
{
// Create file
auto fcpl_id = H5Pcreate(H5P_FILE_ACCESS);
if (fcpl_id == -1) {
throw runtime_error("Error in file access property list.");
}
auto pulse_id_dataset = file_.createDataSet(
"/data/" + detector_name_ + "/pulse_id",
H5::PredType::NATIVE_UINT64, f_m_space);
pulse_id_dataset.write(
b_pulse_id_, H5::PredType::NATIVE_UINT64, b_m_space, f_m_space);
pulse_id_dataset.close();
if (H5Pset_fapl_mpio(fcpl_id, MPI_COMM_WORLD, MPI_INFO_NULL) < 0) {
throw runtime_error("Cannot set mpio to property list.");
}
auto frame_index_dataset = file_.createDataSet(
"/data/" + detector_name_ + "/frame_index",
H5::PredType::NATIVE_UINT64, f_m_space);
frame_index_dataset.write(
b_frame_index_, H5::PredType::NATIVE_UINT64, b_m_space, f_m_space);
frame_index_dataset.close();
file_id_ = H5Fcreate(
output_file.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, fcpl_id);
if (file_id_ < 0) {
throw runtime_error("Cannot create output file.");
}
auto daq_rec_dataset = file_.createDataSet(
"/data/" + detector_name_ + "/daq_rec",
H5::PredType::NATIVE_UINT32, f_m_space);
daq_rec_dataset.write(
b_daq_rec_, H5::PredType::NATIVE_UINT32, b_m_space, f_m_space);
daq_rec_dataset.close();
H5Pclose(fcpl_id);
auto is_good_frame_dataset = file_.createDataSet(
"/data/" + detector_name_ + "/is_good_frame",
H5::PredType::NATIVE_UINT8, f_m_space);
is_good_frame_dataset.write(
b_is_good_frame_, H5::PredType::NATIVE_UINT8, b_m_space, f_m_space);
is_good_frame_dataset.close();
// Create group
auto data_group_id = H5Gcreate(file_id_, detector_name_.c_str(),
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
if (data_group_id < 0) {
throw runtime_error("Cannot create data group.");
}
// Create image dataset.
auto dcpl_id = H5Pcreate(H5P_DATASET_CREATE);
if (dcpl_id < 0) {
throw runtime_error("Error in creating dataset create property list.");
}
hsize_t image_dataset_chunking[] = {1, image_y_size_, image_x_size_};
if (H5Pset_chunk(dcpl_id, 3, image_dataset_chunking) < 0) {
throw runtime_error("Cannot set image dataset chunking.");
}
if (H5Pset_fill_time(dcpl_id, H5D_FILL_TIME_NEVER) < 0) {
throw runtime_error("Cannot set image dataset fill time.");
}
if (H5Pset_alloc_time(dcpl_id, H5D_ALLOC_TIME_EARLY) < 0) {
throw runtime_error("Cannot set image dataset allocation time.");
}
hsize_t image_dataset_dims[] = {n_images, image_y_size_, image_x_size_};
auto image_space_id = H5Screate_simple(3, image_dataset_dims, NULL);
if (image_space_id < 0) {
throw runtime_error("Cannot create image dataset space.");
}
// TODO: Enable compression.
// bshuf_register_h5filter();
// uint filter_prop[] = {PIXEL_N_BYTES, BSHUF_H5_COMPRESS_LZ4};
// if (H5Pset_filter(dcpl_id, BSHUF_H5FILTER, H5Z_FLAG_MANDATORY,
// 2, filter_prop) < 0) {
// throw runtime_error("Cannot set compression filter on dataset.");
// }
image_dataset_id_ = H5Dcreate(
data_group_id, "data", H5T_NATIVE_INT, image_space_id,
H5P_DEFAULT, dcpl_id, H5P_DEFAULT);
if (image_dataset_id_ < 0) {
throw runtime_error("Cannot create image dataset.");
}
// Create metadata datasets.
hsize_t meta_dataset_dims[] = {n_images};
auto meta_space_id = H5Screate_simple(1, meta_dataset_dims, NULL);
if (meta_space_id < 0) {
throw runtime_error("Cannot create meta dataset space.");
}
auto create_meta_dataset = [&](string name, hid_t data_type) {
auto dataset_id = H5Dcreate(
data_group_id, name.c_str(), data_type, meta_space_id,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
if (dataset_id < 0) {
throw runtime_error("Cannot create " + name + " dataset.");
}
return dataset_id;
};
pulse_dataset_id_ = create_meta_dataset("pulse_id", H5T_NATIVE_UINT64);
frame_dataset_id_ = create_meta_dataset("frame_index", H5T_NATIVE_UINT64);
daq_rec_dataset_id_ = create_meta_dataset("daq_rec", H5T_NATIVE_UINT32);
is_good_dataset_id_ = create_meta_dataset("is_good_frame", H5T_NATIVE_UINT8);
H5Sclose(meta_space_id);
H5Sclose(image_space_id);
H5Pclose(dcpl_id);
H5Gclose(data_group_id);
}
void JFH5Writer::close_file()
{
if (file_.getId() == -1) {
if (file_id_ < 0) {
return;
}
image_dataset_.close();
H5Dclose(image_dataset_id_);
image_dataset_id_ = -1;
write_metadata();
H5Dclose(pulse_dataset_id_);
pulse_dataset_id_ = -1;
file_.close();
H5Dclose(frame_dataset_id_);
frame_dataset_id_ = -1;
H5Dclose(daq_rec_dataset_id_);
daq_rec_dataset_id_ = -1;
H5Dclose(is_good_dataset_id_);
is_good_dataset_id_ = -1;
H5Fclose(file_id_);
file_id_ = -1;
}
void JFH5Writer::write(
const ImageMetadataBlock* metadata, const char* data)
void JFH5Writer::write_data(
const int64_t run_id, const uint32_t index, const char* data)
{
size_t n_images_offset = 0;
if (start_pulse_id_ > metadata->block_start_pulse_id) {
n_images_offset = start_pulse_id_ - metadata->block_start_pulse_id;
}
if (n_images_offset > BUFFER_BLOCK_SIZE) {
throw runtime_error("Received unexpected block for start_pulse_id.");
}
size_t n_images_to_copy = BUFFER_BLOCK_SIZE - n_images_offset;
if (stop_pulse_id_ < metadata->block_stop_pulse_id) {
n_images_to_copy -= metadata->block_stop_pulse_id - stop_pulse_id_;
}
if (n_images_to_copy < 1) {
throw runtime_error("Received unexpected block for stop_pulse_id.");
if (run_id != current_run_id_) {
throw runtime_error("Invalid run_id.");
}
// hsize_t b_i_dims[3] = {BUFFER_BLOCK_SIZE,
@@ -234,60 +202,20 @@ void JFH5Writer::write(
// image_dataset_.write(
// data, H5::PredType::NATIVE_UINT16, b_i_space, f_i_space);
// TODO: Can the i_image++ be made more efficient?
for (size_t i_image=n_images_offset;
i_image < n_images_offset + n_images_to_copy;
i_image++) {
hsize_t offset[] = {data_write_index_, 0, 0};
size_t data_offset = i_image * MODULE_N_BYTES * n_modules_;
H5DOwrite_chunk(
image_dataset_.getId(),
H5P_DEFAULT,
0,
offset,
MODULE_N_BYTES * n_modules_,
data + data_offset);
}
void JFH5Writer::write_meta(
const int64_t run_id, const uint32_t index, const ImageMetadata& meta)
{
if (i_image % pulse_id_step_ != 0) {
continue;
}
hsize_t offset[] = {data_write_index_, 0, 0};
size_t data_offset = i_image * MODULE_N_BYTES * n_modules_;
H5DOwrite_chunk(
image_dataset_.getId(),
H5P_DEFAULT,
0,
offset,
MODULE_N_BYTES * n_modules_,
data + data_offset);
data_write_index_++;
}
// pulse_id
{
auto b_current_ptr = b_pulse_id_ + meta_write_index_;
memcpy(b_current_ptr,
&(metadata->pulse_id[n_images_offset]),
sizeof(uint64_t) * n_images_to_copy);
}
// frame_index
{
auto b_current_ptr = b_frame_index_ + meta_write_index_;
memcpy(b_current_ptr,
&(metadata->frame_index[n_images_offset]),
sizeof(uint64_t) * n_images_to_copy);
}
// daq_rec
{
auto b_current_ptr = b_daq_rec_ + meta_write_index_;
memcpy(b_current_ptr,
&(metadata->daq_rec[n_images_offset]),
sizeof(uint32_t) * n_images_to_copy);
}
// is_good_frame
{
auto b_current_ptr = b_is_good_frame_ + meta_write_index_;
memcpy(b_current_ptr,
&(metadata->is_good_image[n_images_offset]),
sizeof(uint8_t) * n_images_to_copy);
}
meta_write_index_ += n_images_to_copy;
}