From a485736af4a690d57e8d162d0ac740ba993ba71e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Thu, 25 Feb 2021 10:21:49 +0100 Subject: [PATCH] Reimplemented writer --- jf-live-writer/include/JFH5Writer.hpp | 58 ++-- jf-live-writer/src/JFH5Writer.cpp | 376 +++++++++++--------------- 2 files changed, 180 insertions(+), 254 deletions(-) diff --git a/jf-live-writer/include/JFH5Writer.hpp b/jf-live-writer/include/JFH5Writer.hpp index afd4a76..fb4a96a 100644 --- a/jf-live-writer/include/JFH5Writer.hpp +++ b/jf-live-writer/include/JFH5Writer.hpp @@ -3,48 +3,46 @@ #include #include -#include +#include +#include -#include "ImageAssembler.hpp" +extern "C" { + #include +} class JFH5Writer { + const std::string root_folder_; const std::string detector_name_; - const size_t n_modules_; - const uint64_t start_pulse_id_; - const uint64_t stop_pulse_id_; - const size_t pulse_id_step_; - const size_t n_images_; - const size_t n_total_pulses_; - size_t meta_write_index_; - size_t data_write_index_; + const uint32_t image_y_size_; + const uint32_t image_x_size_; - H5::H5File file_; - H5::DataSet image_dataset_; + static const int64_t NO_RUN_ID; + int64_t current_run_id_ = NO_RUN_ID; - uint64_t* b_pulse_id_; - uint64_t* b_frame_index_; - uint32_t* b_daq_rec_; - uint8_t* b_is_good_frame_ ; - - size_t get_n_pulses_in_range(const uint64_t start_pulse_id, - const uint64_t stop_pulse_id, - const int pulse_id_step); - - void write_metadata(); - std::string get_device_name(const std::string& device); + hid_t file_id_ = -1; + hid_t image_dataset_id_ = -1; + hid_t pulse_dataset_id_= -1; + hid_t frame_dataset_id_ = -1; + hid_t daq_rec_dataset_id_ = -1; + hid_t is_good_dataset_id_ = -1; + void open_file(const std::string& output_file, const uint32_t n_images); void close_file(); public: - JFH5Writer(const std::string& output_file, - const std::string& device, - const size_t n_modules, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id, - const size_t pulse_id_step); + JFH5Writer(const BufferUtils::DetectorConfig config); ~JFH5Writer(); - void write(const ImageMetadataBlock* metadata, const char* data); + void open_run(const int64_t run_id, const uint32_t n_images); + void close_run(); + + void write_data(const int64_t run_id, + const uint32_t index, + const char* data); + + void write_meta(const int64_t run_id, + const uint32_t index, + const ImageMetadata& meta); }; #endif //SFWRITER_HPP diff --git a/jf-live-writer/src/JFH5Writer.cpp b/jf-live-writer/src/JFH5Writer.cpp index 784597d..535d233 100644 --- a/jf-live-writer/src/JFH5Writer.cpp +++ b/jf-live-writer/src/JFH5Writer.cpp @@ -2,213 +2,181 @@ #include #include -#include +#include -#include "writer_config.hpp" + +#include "live_writer_config.hpp" #include "buffer_config.hpp" +#include "formats.hpp" -//extern "C" -//{ -// #include "H5DOpublic.h" -// #include -//} - -using namespace std; -using namespace writer_config; -using namespace buffer_config; - -JFH5Writer::JFH5Writer(const string& output_file, - const string& device, - const size_t n_modules, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id, - const size_t pulse_id_step) : - detector_name_(get_device_name(device)), - n_modules_(n_modules), - start_pulse_id_(start_pulse_id), - stop_pulse_id_(stop_pulse_id), - pulse_id_step_(pulse_id_step), - n_images_(get_n_pulses_in_range(start_pulse_id, - stop_pulse_id, - pulse_id_step)), - n_total_pulses_(stop_pulse_id_ - start_pulse_id_ + 1), - meta_write_index_(0), - data_write_index_(0) +extern "C" { - -// bshuf_register_h5filter(); - - file_ = H5::H5File(output_file, H5F_ACC_TRUNC); - file_.createGroup("/data"); - file_.createGroup("/data/" + detector_name_); - - H5::DataSpace att_space(H5S_SCALAR); - H5::DataType data_type = H5::StrType(0, H5T_VARIABLE); - - file_.createGroup("/general"); - auto detector_dataset = file_.createDataSet( - "/general/detector_name", data_type ,att_space); - - detector_dataset.write(detector_name_, data_type); - - hsize_t image_dataset_dims[3] = - {n_images_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; - - H5::DataSpace image_dataspace(3, image_dataset_dims); - - hsize_t image_dataset_chunking[3] = - {1, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DSetCreatPropList image_dataset_properties; - image_dataset_properties.setChunk(3, image_dataset_chunking); - -// // block_size, compression type -// uint compression_prop[] = -// {MODULE_N_PIXELS, //block size -// BSHUF_H5_COMPRESS_LZ4}; // Compression type -// -// H5Pset_filter(image_dataset_properties.getId(), -// BSHUF_H5FILTER, -// H5Z_FLAG_MANDATORY, -// 2, -// &(compression_prop[0])); - - image_dataset_ = file_.createDataSet( - "/data/" + detector_name_ + "/data", - H5::PredType::NATIVE_UINT16, - image_dataspace, - image_dataset_properties); - - b_pulse_id_ = new uint64_t[n_total_pulses_]; - b_frame_index_= new uint64_t[n_total_pulses_]; - b_daq_rec_ = new uint32_t[n_total_pulses_]; - b_is_good_frame_ = new uint8_t[n_total_pulses_]; + #include "H5DOpublic.h" + #include } -std::string JFH5Writer::get_device_name(const std::string& device) -{ - size_t last_separator; - if ((last_separator = device.rfind("/")) == string::npos) { - return device; - } +using namespace std; +using namespace buffer_config; +using namespace live_writer_config; - return device.substr(last_separator+1); +JFH5Writer::JFH5Writer(const BufferUtils::DetectorConfig config): + root_folder_(config.buffer_folder), + detector_name_(config.detector_name), + image_x_size_(config.image_x_size), + image_y_size_(config.image_y_size) +{ } JFH5Writer::~JFH5Writer() { close_file(); - - delete[] b_pulse_id_; - delete[] b_frame_index_; - delete[] b_daq_rec_; - delete[] b_is_good_frame_; } -size_t JFH5Writer::get_n_pulses_in_range( - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id, - const int pulse_id_step) +void JFH5Writer::open_run(const int64_t run_id, const uint32_t n_images) { - if (stop_pulse_id < start_pulse_id) { - throw runtime_error("stop_pulse_id smaller than start_pulse_id."); - } + close_file(); - if (100 % pulse_id_step != 0) { - throw runtime_error("100 is not divisible by the pulse_id_step."); - } + const string output_folder = root_folder_ + "/" + OUTPUT_FOLDER_SYMLINK; + // TODO: Maybe add leading zeros to filename? + const string output_file = output_folder + to_string(run_id) + ".h5"; - if (start_pulse_id % pulse_id_step != 0) { - throw runtime_error("start_pulse_id not divisible by pulse_id_step."); - } + open_file(output_file, n_images); - if (stop_pulse_id % pulse_id_step != 0) { - throw runtime_error("stop_pulse_id not divisible by pulse_id_step."); - } - - size_t n_pulses = 1; - n_pulses += (stop_pulse_id / pulse_id_step); - n_pulses -= start_pulse_id / pulse_id_step; - - if (n_pulses == 0) { - throw runtime_error("Zero pulses to write in given range."); - } - - return n_pulses; + current_run_id_ = run_id; } -void JFH5Writer::write_metadata() +void JFH5Writer::close_run() { - hsize_t b_m_dims[] = {n_total_pulses_}; - hsize_t b_m_count[] = {n_images_}; - hsize_t b_m_start[] = {0}; - hsize_t b_m_stride[] = {pulse_id_step_}; - H5::DataSpace b_m_space (1, b_m_dims); - b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start, b_m_stride); + close_file(); + current_run_id_ = NO_RUN_ID; +} - hsize_t f_m_dims[] = {n_images_, 1}; - H5::DataSpace f_m_space(2, f_m_dims); +void JFH5Writer::open_file(const string& output_file, const uint32_t n_images) +{ + // Create file + auto fcpl_id = H5Pcreate(H5P_FILE_ACCESS); + if (fcpl_id == -1) { + throw runtime_error("Error in file access property list."); + } - auto pulse_id_dataset = file_.createDataSet( - "/data/" + detector_name_ + "/pulse_id", - H5::PredType::NATIVE_UINT64, f_m_space); - pulse_id_dataset.write( - b_pulse_id_, H5::PredType::NATIVE_UINT64, b_m_space, f_m_space); - pulse_id_dataset.close(); + if (H5Pset_fapl_mpio(fcpl_id, MPI_COMM_WORLD, MPI_INFO_NULL) < 0) { + throw runtime_error("Cannot set mpio to property list."); + } - auto frame_index_dataset = file_.createDataSet( - "/data/" + detector_name_ + "/frame_index", - H5::PredType::NATIVE_UINT64, f_m_space); - frame_index_dataset.write( - b_frame_index_, H5::PredType::NATIVE_UINT64, b_m_space, f_m_space); - frame_index_dataset.close(); + file_id_ = H5Fcreate( + output_file.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, fcpl_id); + if (file_id_ < 0) { + throw runtime_error("Cannot create output file."); + } - auto daq_rec_dataset = file_.createDataSet( - "/data/" + detector_name_ + "/daq_rec", - H5::PredType::NATIVE_UINT32, f_m_space); - daq_rec_dataset.write( - b_daq_rec_, H5::PredType::NATIVE_UINT32, b_m_space, f_m_space); - daq_rec_dataset.close(); + H5Pclose(fcpl_id); - auto is_good_frame_dataset = file_.createDataSet( - "/data/" + detector_name_ + "/is_good_frame", - H5::PredType::NATIVE_UINT8, f_m_space); - is_good_frame_dataset.write( - b_is_good_frame_, H5::PredType::NATIVE_UINT8, b_m_space, f_m_space); - is_good_frame_dataset.close(); + // Create group + auto data_group_id = H5Gcreate(file_id_, detector_name_.c_str(), + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + if (data_group_id < 0) { + throw runtime_error("Cannot create data group."); + } + + // Create image dataset. + auto dcpl_id = H5Pcreate(H5P_DATASET_CREATE); + if (dcpl_id < 0) { + throw runtime_error("Error in creating dataset create property list."); + } + + hsize_t image_dataset_chunking[] = {1, image_y_size_, image_x_size_}; + if (H5Pset_chunk(dcpl_id, 3, image_dataset_chunking) < 0) { + throw runtime_error("Cannot set image dataset chunking."); + } + + if (H5Pset_fill_time(dcpl_id, H5D_FILL_TIME_NEVER) < 0) { + throw runtime_error("Cannot set image dataset fill time."); + } + + if (H5Pset_alloc_time(dcpl_id, H5D_ALLOC_TIME_EARLY) < 0) { + throw runtime_error("Cannot set image dataset allocation time."); + } + + hsize_t image_dataset_dims[] = {n_images, image_y_size_, image_x_size_}; + auto image_space_id = H5Screate_simple(3, image_dataset_dims, NULL); + if (image_space_id < 0) { + throw runtime_error("Cannot create image dataset space."); + } + + // TODO: Enable compression. +// bshuf_register_h5filter(); +// uint filter_prop[] = {PIXEL_N_BYTES, BSHUF_H5_COMPRESS_LZ4}; +// if (H5Pset_filter(dcpl_id, BSHUF_H5FILTER, H5Z_FLAG_MANDATORY, +// 2, filter_prop) < 0) { +// throw runtime_error("Cannot set compression filter on dataset."); +// } + + image_dataset_id_ = H5Dcreate( + data_group_id, "data", H5T_NATIVE_INT, image_space_id, + H5P_DEFAULT, dcpl_id, H5P_DEFAULT); + if (image_dataset_id_ < 0) { + throw runtime_error("Cannot create image dataset."); + } + + // Create metadata datasets. + hsize_t meta_dataset_dims[] = {n_images}; + auto meta_space_id = H5Screate_simple(1, meta_dataset_dims, NULL); + if (meta_space_id < 0) { + throw runtime_error("Cannot create meta dataset space."); + } + + auto create_meta_dataset = [&](string name, hid_t data_type) { + auto dataset_id = H5Dcreate( + data_group_id, name.c_str(), data_type, meta_space_id, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + if (dataset_id < 0) { + throw runtime_error("Cannot create " + name + " dataset."); + } + + return dataset_id; + }; + + pulse_dataset_id_ = create_meta_dataset("pulse_id", H5T_NATIVE_UINT64); + frame_dataset_id_ = create_meta_dataset("frame_index", H5T_NATIVE_UINT64); + daq_rec_dataset_id_ = create_meta_dataset("daq_rec", H5T_NATIVE_UINT32); + is_good_dataset_id_ = create_meta_dataset("is_good_frame", H5T_NATIVE_UINT8); + + H5Sclose(meta_space_id); + H5Sclose(image_space_id); + H5Pclose(dcpl_id); + H5Gclose(data_group_id); } void JFH5Writer::close_file() { - if (file_.getId() == -1) { + if (file_id_ < 0) { return; } - image_dataset_.close(); + H5Dclose(image_dataset_id_); + image_dataset_id_ = -1; - write_metadata(); + H5Dclose(pulse_dataset_id_); + pulse_dataset_id_ = -1; - file_.close(); + H5Dclose(frame_dataset_id_); + frame_dataset_id_ = -1; + + H5Dclose(daq_rec_dataset_id_); + daq_rec_dataset_id_ = -1; + + H5Dclose(is_good_dataset_id_); + is_good_dataset_id_ = -1; + + H5Fclose(file_id_); + file_id_ = -1; } -void JFH5Writer::write( - const ImageMetadataBlock* metadata, const char* data) +void JFH5Writer::write_data( + const int64_t run_id, const uint32_t index, const char* data) { - size_t n_images_offset = 0; - if (start_pulse_id_ > metadata->block_start_pulse_id) { - n_images_offset = start_pulse_id_ - metadata->block_start_pulse_id; - } - - if (n_images_offset > BUFFER_BLOCK_SIZE) { - throw runtime_error("Received unexpected block for start_pulse_id."); - } - - size_t n_images_to_copy = BUFFER_BLOCK_SIZE - n_images_offset; - if (stop_pulse_id_ < metadata->block_stop_pulse_id) { - n_images_to_copy -= metadata->block_stop_pulse_id - stop_pulse_id_; - } - - if (n_images_to_copy < 1) { - throw runtime_error("Received unexpected block for stop_pulse_id."); + if (run_id != current_run_id_) { + throw runtime_error("Invalid run_id."); } // hsize_t b_i_dims[3] = {BUFFER_BLOCK_SIZE, @@ -234,60 +202,20 @@ void JFH5Writer::write( // image_dataset_.write( // data, H5::PredType::NATIVE_UINT16, b_i_space, f_i_space); - // TODO: Can the i_image++ be made more efficient? - for (size_t i_image=n_images_offset; - i_image < n_images_offset + n_images_to_copy; - i_image++) { + hsize_t offset[] = {data_write_index_, 0, 0}; + size_t data_offset = i_image * MODULE_N_BYTES * n_modules_; + + H5DOwrite_chunk( + image_dataset_.getId(), + H5P_DEFAULT, + 0, + offset, + MODULE_N_BYTES * n_modules_, + data + data_offset); +} + +void JFH5Writer::write_meta( + const int64_t run_id, const uint32_t index, const ImageMetadata& meta) +{ - if (i_image % pulse_id_step_ != 0) { - continue; - } - - hsize_t offset[] = {data_write_index_, 0, 0}; - size_t data_offset = i_image * MODULE_N_BYTES * n_modules_; - - H5DOwrite_chunk( - image_dataset_.getId(), - H5P_DEFAULT, - 0, - offset, - MODULE_N_BYTES * n_modules_, - data + data_offset); - - data_write_index_++; - } - - // pulse_id - { - auto b_current_ptr = b_pulse_id_ + meta_write_index_; - memcpy(b_current_ptr, - &(metadata->pulse_id[n_images_offset]), - sizeof(uint64_t) * n_images_to_copy); - } - - // frame_index - { - auto b_current_ptr = b_frame_index_ + meta_write_index_; - memcpy(b_current_ptr, - &(metadata->frame_index[n_images_offset]), - sizeof(uint64_t) * n_images_to_copy); - } - - // daq_rec - { - auto b_current_ptr = b_daq_rec_ + meta_write_index_; - memcpy(b_current_ptr, - &(metadata->daq_rec[n_images_offset]), - sizeof(uint32_t) * n_images_to_copy); - } - - // is_good_frame - { - auto b_current_ptr = b_is_good_frame_ + meta_write_index_; - memcpy(b_current_ptr, - &(metadata->is_good_image[n_images_offset]), - sizeof(uint8_t) * n_images_to_copy); - } - - meta_write_index_ += n_images_to_copy; }