Mirror of https://github.com/paulscherrerinstitute/sf_daq_buffer.git, synced 2026-04-23 21:40:46 +02:00
H5Writer supports multiple datasets
+138 -90
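Summary of the change: the dataset name moves from the H5Writer constructor to each write call, open datasets are kept per name in an unordered_map, and close_file() compacts and tags every dataset. A minimal usage sketch of the new interface follows (not part of the commit; the dataset names, shapes, "uint16"/"uint64" type strings and the "H5Writer.hpp" include path are assumptions):

// Sketch only: writes two datasets into one file via the new per-call dataset_name argument.
#include <cstddef>
#include <cstdint>
#include <vector>
#include "H5Writer.hpp"   // assumed header name for the class in this commit

int main()
{
    H5Writer writer("output.h5");   // dataset_name is no longer a constructor argument

    std::vector<uint16_t> image(1024 * 512, 0);
    uint64_t pulse_id = 42;

    for (size_t frame_index = 0; frame_index < 10; ++frame_index) {
        // Image data: the data shape adds 2 dimensions per data point.
        writer.write_data("image", frame_index,
                          reinterpret_cast<const char*>(image.data()),
                          {1024, 512}, image.size() * sizeof(uint16_t),
                          "uint16", "little");

        // Scalar metadata goes into its own dataset in the same file.
        writer.write_data("pulse_id", frame_index,
                          reinterpret_cast<const char*>(&pulse_id),
                          {1}, sizeof(pulse_id),
                          "uint64", "little");
    }

    // Compacts each dataset and writes the image_nr_low/image_nr_high attributes.
    writer.close_file();
}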
@@ -12,15 +12,13 @@ extern "C"
using namespace std;

H5Writer::H5Writer(const std::string& filename, const std::string& dataset_name,
hsize_t frames_per_file, hsize_t initial_dataset_size, hsize_t dataset_increase_step) :
filename(filename), dataset_name(dataset_name), frames_per_file(frames_per_file),
H5Writer::H5Writer(const std::string& filename, hsize_t frames_per_file, hsize_t initial_dataset_size, hsize_t dataset_increase_step) :
filename(filename), frames_per_file(frames_per_file),
initial_dataset_size(initial_dataset_size), dataset_increase_step(dataset_increase_step)
{
#ifdef DEBUG_OUTPUT
cout << "[H5Writer::H5Writer] Creating chunked writer";
cout << " with filename " << filename;
cout << " and dataset_name " << dataset_name;
cout << " and frames_per_file " << frames_per_file;
cout << " and initial_dataset_size " << initial_dataset_size;
cout << endl;
@@ -34,65 +32,138 @@ H5Writer::~H5Writer()
void H5Writer::close_file()
{
if (!is_file_open()) {
if (is_file_open()) {

#ifdef DEBUG_OUTPUT
cout << "[H5Writer::close_file] Closing file." << endl;
#endif

hsize_t min_frame_in_dataset = 0;
if (frames_per_file) {
min_frame_in_dataset = (current_frame_chunk - 1) * frames_per_file;
}

// max_data_index is relative to the current file.
hsize_t max_frame_in_dataset = max_data_index + min_frame_in_dataset;

// Frame indexing starts at 1 (for some reason).
auto image_nr_low = min_frame_in_dataset + 1;
auto image_nr_high = max_frame_in_dataset + 1;

#ifdef DEBUG_OUTPUT
cout << "[H5Writer::close_file] Setting datasets attribute image_nr_low=" << image_nr_low;
cout << " and image_nr_high=" << image_nr_high << endl;
#endif

for (const auto& dataset_map : datasets) {
auto dataset = dataset_map.second;

H5FormatUtils::compact_dataset(dataset, max_data_index);

H5FormatUtils::write_attribute(dataset, "image_nr_low", image_nr_low);
H5FormatUtils::write_attribute(dataset, "image_nr_high", image_nr_high);
}

file.close();

} else {
#ifdef DEBUG_OUTPUT
cout << "[H5Writer::close_file] Trying to close an already closed file." << endl;
#endif

return;
}

#ifdef DEBUG_OUTPUT
cout << "[H5Writer::close_file] Closing file." << endl;
#endif

H5FormatUtils::compact_dataset(dataset, max_frame_index);

hsize_t min_frame_in_dataset = 0;
if (frames_per_file) {
min_frame_in_dataset = (current_frame_chunk - 1) * frames_per_file;
}

// max_frame_index is relative to the current file.
hsize_t max_frame_in_dataset = max_frame_index + min_frame_in_dataset;

// Frame indexing starts at 1 (for some reason).
auto image_nr_low = min_frame_in_dataset + 1;
auto image_nr_high = max_frame_in_dataset + 1;

#ifdef DEBUG_OUTPUT
cout << "[H5Writer::close_file] Setting dataset attribute image_nr_low=" << image_nr_low << " and image_nr_high=" << image_nr_high << endl;
#endif

H5FormatUtils::write_attribute(dataset, "image_nr_low", image_nr_low);
H5FormatUtils::write_attribute(dataset, "image_nr_high", image_nr_high);

// Cleanup.
file.close();
datasets.clear();
datasets_current_size.clear();

current_frame_chunk = 0;
current_dataset_size = 0;
max_frame_index = 0;
max_data_index = 0;
}

void H5Writer::write_frame_data(size_t frame_index, const size_t* frame_shape, size_t data_bytes_size, const char* data, const string& data_type, const string& endianness)
void H5Writer::write_data(const string& dataset_name, const size_t data_index, const char* data,
const std::vector<size_t>& data_shape, const size_t data_bytes_size, const string& data_type, const string& endianness)
{
// Define the offset of the currently received image in the file.
hsize_t relative_frame_index = prepare_storage_for_frame(frame_index, frame_shape, data_type, endianness);
hsize_t relative_data_index = prepare_storage_for_data(dataset_name, data_index, data_shape, data_type, endianness);

// Define where to write values in the dataset.
const hsize_t offset[] = {relative_frame_index, 0, 0};
// Define the offset where to write the data.
size_t data_rank = data_shape.size();
hsize_t offset[data_rank+1];

offset[0] = relative_data_index;
for (uint index=0; index<data_rank; ++index) {
offset[index+1] = 0;
}

// No compression for now.
uint32_t filters = 0;

const auto& dataset = datasets.at(dataset_name);

if( H5DOwrite_chunk(dataset.getId(), H5P_DEFAULT, filters, offset, data_bytes_size, data) )
{
stringstream error_message;
error_message << "Error while writing chunk to file at offset " << relative_frame_index << "." << endl;
error_message << "Error while writing dataset " << dataset_name << " chunk to file at offset ";
error_message << relative_data_index << "." << endl;

throw invalid_argument( error_message.str() );
}
}

void H5Writer::create_file(const size_t* frame_shape, hsize_t frame_chunk, const string& type, const string& endianness)
void H5Writer::create_dataset(const string& dataset_name, const vector<size_t>& data_shape,
const string& data_type, const string& endianness)
{
// TODO: Create folder if it does not exist.

// Number of dimensions in each data point.
const size_t data_rank = data_shape.size();
// The +1 dimension is to account for the sequence of data points (time).
const hsize_t dataset_rank = data_rank + 1;

hsize_t dataset_dimension[dataset_rank];
hsize_t max_dataset_dimension[dataset_rank];
hsize_t dataset_chunking[dataset_rank];

dataset_dimension[0] = initial_dataset_size;
// This dataset can be resized without limits.
max_dataset_dimension[0] = H5S_UNLIMITED;
// Chunking is always set to a single data point.
dataset_chunking[0] = 1;

for (size_t index=0; index<data_rank; ++index) {
dataset_dimension[index+1] = data_shape[index];
max_dataset_dimension[index+1] = data_shape[index];
dataset_chunking[index+1] = data_shape[index];
}

H5::DataSpace dataspace(dataset_rank, dataset_dimension, max_dataset_dimension);

#ifdef DEBUG_OUTPUT
cout << "[H5Writer::create_dataset] Creating dataspace of size (";
for (hsize_t i=0; i<dataset_rank; ++i) {
cout << dataset_dimension[i] << ",";
}
cout << ")" << endl;
#endif

H5::DSetCreatPropList dataset_properties;
dataset_properties.setChunk(dataset_rank, dataset_chunking);

H5::AtomType dataset_data_type(H5FormatUtils::get_dataset_data_type(data_type));

if (endianness == "big") {
dataset_data_type.setOrder(H5T_ORDER_BE);
} else {
dataset_data_type.setOrder(H5T_ORDER_LE);
}

auto dataset = file.createDataSet(dataset_name.c_str(), dataset_data_type, dataspace, dataset_properties);

datasets.insert({dataset_name, dataset});
datasets_current_size.insert({dataset_name, initial_dataset_size});
}

void H5Writer::create_file(hsize_t frame_chunk)
{

if (file.getId() != -1) {
@@ -118,43 +189,10 @@ void H5Writer::create_file(const size_t* frame_shape, hsize_t frame_chunk, const
cout << "[H5Writer::create_file] Creating filename " << target_filename << endl;
#endif

// TODO: Create folder if it does not exist.

file = H5::H5File( target_filename.c_str(), H5F_ACC_TRUNC );
file = H5::H5File(target_filename.c_str(), H5F_ACC_TRUNC);

hsize_t dataset_rank = 3;
const hsize_t dataset_dimension[] = {initial_dataset_size, frame_shape[0], frame_shape[1]};
const hsize_t max_dataset_dimension[] = {H5S_UNLIMITED, frame_shape[0], frame_shape[1]};
H5::DataSpace dataspace(dataset_rank, dataset_dimension, max_dataset_dimension);

#ifdef DEBUG_OUTPUT
cout << "[H5Writer::create_file] Creating dataspace of size (";
for (hsize_t i=0; i<dataset_rank; ++i) {
cout << dataset_dimension[i] << ",";
}
cout << ")" << endl;
#endif

// Set chunking to single image.
H5::DSetCreatPropList dataset_properties;
const hsize_t dataset_chunking[] = {1, frame_shape[0], frame_shape[1]};
dataset_properties.setChunk(dataset_rank, dataset_chunking);

H5::AtomType data_type(H5FormatUtils::get_dataset_data_type(type));

if (endianness == "big") {
data_type.setOrder(H5T_ORDER_BE);
} else {
data_type.setOrder(H5T_ORDER_LE);
}

// Take into account initial size, set chunking.
dataset = file.createDataSet(dataset_name.c_str(), data_type, dataspace, dataset_properties);

// New file created - update global values.
// New file created - set this file's chunk number.
current_frame_chunk = frame_chunk;
current_dataset_size = initial_dataset_size;

}

bool H5Writer::is_file_open()
@@ -162,44 +200,54 @@ bool H5Writer::is_file_open()
return (file.getId() != -1);
}

hsize_t H5Writer::prepare_storage_for_frame(size_t frame_index, const size_t* frame_shape, const string& data_type, const string& endianness)
hsize_t H5Writer::prepare_storage_for_data(const string& dataset_name, const size_t data_index, const std::vector<size_t>& data_shape,
const string& data_type, const string& endianness)
{

hsize_t relative_frame_index = frame_index;
hsize_t relative_data_index = data_index;

// Check if we have to create a new file.
if (frames_per_file) {
hsize_t frame_chunk = (frame_index / frames_per_file) + 1;
hsize_t frame_chunk = (data_index / frames_per_file) + 1;

// This frame does not go into this file.
if (frame_chunk != current_frame_chunk) {
create_file(frame_shape, frame_chunk, data_type, endianness);
create_file(frame_chunk);
}

// Make the frame_index relative to this chunk (file).
relative_frame_index = frame_index - ((frame_chunk - 1) * frames_per_file);
// Make the data_index relative to this chunk (file).
relative_data_index = data_index - ((frame_chunk - 1) * frames_per_file);
}

#ifdef DEBUG_OUTPUT
cout << "[H5Writer::prepare_storage_for_frame] Received frame index " << frame_index << " and processed as relative frame index " << relative_frame_index << endl;
cout << "[H5Writer::prepare_storage_for_data] Received frame index " << data_index << " and processed as relative frame index " << relative_data_index << endl;
#endif

// Open the file if needed.
if (!is_file_open()) {
create_file(frame_shape, 0, data_type, endianness);
create_file();
}

// Create the dataset if we don't have it yet.
if (datasets.find(dataset_name) == datasets.end()) {
create_dataset(dataset_name, data_shape, data_type, endianness);
}

hsize_t current_dataset_size = datasets_current_size.at(dataset_name);

// Expand the dataset if needed.
if (relative_frame_index > current_dataset_size) {
current_dataset_size = H5FormatUtils::expand_dataset(dataset, relative_frame_index, dataset_increase_step);
if (relative_data_index > current_dataset_size) {
auto dataset = datasets.at(dataset_name);

hsize_t new_dataset_size = H5FormatUtils::expand_dataset(dataset, relative_data_index, dataset_increase_step);
datasets_current_size[dataset_name] = new_dataset_size;
}

// Keep track of the max index in this file - needed for shrinking the dataset at the end.
if (relative_frame_index > max_frame_index) {
max_frame_index = relative_frame_index;
if (relative_data_index > max_data_index) {
max_data_index = relative_data_index;
}

return relative_frame_index;
return relative_data_index;
}

H5::H5File& H5Writer::get_h5_file()
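For reference, the file-rollover arithmetic used by prepare_storage_for_data() and close_file() can be checked in isolation; a small sketch with invented numbers (frames_per_file = 100, data_index = 250), not code from this commit:

#include <cstddef>
#include <iostream>

int main()
{
    const size_t frames_per_file = 100;
    const size_t data_index = 250;

    // Which file (chunk) this data point belongs to, and its index inside that file.
    const size_t frame_chunk = (data_index / frames_per_file) + 1;                          // 3
    const size_t relative_data_index = data_index - ((frame_chunk - 1) * frames_per_file);  // 50

    // close_file() later converts back to absolute, 1-based frame numbers.
    const size_t min_frame_in_dataset = (frame_chunk - 1) * frames_per_file;                // 200
    const size_t image_nr_low = min_frame_in_dataset + 1;                                   // 201

    std::cout << frame_chunk << " " << relative_data_index << " " << image_nr_low << std::endl;
}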
+17
-12
@@ -1,37 +1,42 @@
#ifndef H5WRITER_H
#define H5WRITER_H

#include <map>
#include <unordered_map>
#include <memory>
#include <vector>
#include <H5Cpp.h>

class H5Writer
{
// Initialized in constructor.
const std::string filename;
const std::string dataset_name;
hsize_t frames_per_file;
hsize_t initial_dataset_size;
hsize_t dataset_increase_step = 0;

// State variables.
hsize_t max_frame_index = 0;
hsize_t current_dataset_size = 0;
hsize_t max_data_index = 0;
hsize_t current_frame_chunk = 0;

H5::H5File file;
H5::DataSet dataset;
std::unordered_map<std::string, H5::DataSet> datasets;
std::unordered_map<std::string, hsize_t> datasets_current_size;

hsize_t prepare_storage_for_frame(size_t frame_index, const size_t* frame_shape, const std::string& data_type, const std::string& endianness);
void create_file(const size_t* frame_shape, hsize_t frame_chunk, const std::string& data_type, const std::string& endianness);
hsize_t prepare_storage_for_data(const std::string& dataset_name, const size_t data_index, const std::vector<size_t>& data_shape,
const std::string& data_type, const std::string& endianness);

void create_file(const hsize_t frame_chunk=0);

void create_dataset(const std::string& dataset_name, const std::vector<size_t>& data_shape,
const std::string& data_type, const std::string& endianness);

public:
H5Writer(const std::string& filename, const std::string& dataset_name,
hsize_t frames_per_file=0, hsize_t initial_dataset_size=1000, hsize_t dataset_increase_step=1000);
~H5Writer();
H5Writer(const std::string& filename, hsize_t frames_per_file=0, hsize_t initial_dataset_size=1000, hsize_t dataset_increase_step=1000);
virtual ~H5Writer();
bool is_file_open();
void close_file();
void write_frame_data(size_t frame_index, const size_t* frame_shape, size_t data_bytes_size,
const char* data, const std::string& data_type, const std::string& endianness);
void write_data(const std::string& dataset_name, const size_t data_index, const char* data, const std::vector<size_t>& data_shape,
const size_t data_bytes_size, const std::string& data_type, const std::string& endianness);
H5::H5File& get_h5_file();
};
+10
-11
@@ -17,7 +17,8 @@ namespace pt = boost::property_tree;
void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer)
{
H5Writer writer(manager.get_output_file(), format.get_raw_frames_dataset_name());
H5Writer writer(manager.get_output_file());
auto raw_frames_dataset_name = format.get_raw_frames_dataset_name();

// Run until the running flag is set or the ring_buffer is empty.
while(manager.is_running() || !ring_buffer.is_empty()) {

@@ -35,18 +36,16 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
}

// Write image data.
writer.write_frame_data(received_data.first->frame_index,
received_data.first->frame_shape,
received_data.first->frame_bytes_size,
received_data.second,
received_data.first->type,
received_data.first->endianness);
writer.write_data(raw_frames_dataset_name,
received_data.first->frame_index,
received_data.second,
received_data.first->frame_shape,
received_data.first->frame_bytes_size,
received_data.first->type,
received_data.first->endianness);

ring_buffer.release(received_data.first->buffer_slot_index);

// Write header data.
writer.write_data();

manager.written_frame(received_data.first->frame_index);
}

@@ -186,7 +185,7 @@ shared_ptr<FrameMetadata> ProcessManager::read_json_header(pt::ptree& json_heade
header_data->frame_index = json_header.get<uint64_t>("frame");

for (const auto& item : json_header.get_child("shape")) {
header_data->frame_shape.insert(item.second.get_value<size_t>());
header_data->frame_shape.push_back(item.second.get_value<size_t>());
}

// Array 1.0 specified little endian as the default encoding.

@@ -19,5 +19,5 @@ namespace ProcessManager
const std::string& header, const std::map<std::string, HEADER_DATA_TYPE>& header_data_type);

boost::any get_value_from_json(const boost::property_tree::ptree& json_header,
const string& value_name, const HEADER_DATA_TYPE data_type);
const std::string& value_name, const HEADER_DATA_TYPE data_type);
};
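A file written this way can be inspected with the same H5Cpp API that the writer uses; below is a hedged read-back sketch, where the file name "output.h5" and dataset name "image" are placeholders rather than names used by this commit:

#include <cstdint>
#include <iostream>
#include <vector>
#include <H5Cpp.h>

int main()
{
    H5::H5File file("output.h5", H5F_ACC_RDONLY);
    H5::DataSet dataset = file.openDataSet("image");

    // First dimension is the (compacted) number of data points; the rest is the per-point shape.
    H5::DataSpace space = dataset.getSpace();
    std::vector<hsize_t> dims(space.getSimpleExtentNdims());
    space.getSimpleExtentDims(dims.data());

    // Attributes written by H5Writer::close_file() on every dataset.
    uint64_t image_nr_low = 0;
    uint64_t image_nr_high = 0;
    dataset.openAttribute("image_nr_low").read(H5::PredType::NATIVE_UINT64, &image_nr_low);
    dataset.openAttribute("image_nr_high").read(H5::PredType::NATIVE_UINT64, &image_nr_high);

    std::cout << "frames in dataset: " << dims[0]
              << ", image_nr_low=" << image_nr_low
              << ", image_nr_high=" << image_nr_high << std::endl;
}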