Reorder the repository

This commit is contained in:
2018-07-12 10:51:47 +02:00
parent 7115749da3
commit a481e67428
33 changed files with 2 additions and 2 deletions
+378
View File
@@ -0,0 +1,378 @@
#include <string>
#include <sstream>
#include <stdexcept>
#include <iostream>
#include "config.hpp"
#include "H5Format.hpp"
using namespace std;
hsize_t H5FormatUtils::expand_dataset(H5::DataSet& dataset, hsize_t frame_index, hsize_t dataset_increase_step)
{
const auto& data_space = dataset.getSpace();
int dataset_rank = data_space.getSimpleExtentNdims();
hsize_t dataset_dimension[dataset_rank];
data_space.getSimpleExtentDims(dataset_dimension);
dataset_dimension[0] = frame_index + dataset_increase_step;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5FormatUtils::expand_dataset] Expanding dataspace to size (";
for (int i=0; i<dataset_rank; ++i) {
cout << dataset_dimension[i] << ",";
}
cout << ")" << endl;
#endif
dataset.extend(dataset_dimension);
return dataset_dimension[0];
}
void H5FormatUtils::compact_dataset(H5::DataSet& dataset, hsize_t max_frame_index)
{
const auto& data_space = dataset.getSpace();
int dataset_rank = data_space.getSimpleExtentNdims();
hsize_t dataset_dimension[dataset_rank];
data_space.getSimpleExtentDims(dataset_dimension);
dataset_dimension[0] = max_frame_index + 1;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5FormatUtils::compact_dataset] Compacting dataspace to size (";
for (int i=0; i<dataset_rank; ++i) {
cout << dataset_dimension[i] << ",";
}
cout << ")" << endl;
#endif
dataset.extend(dataset_dimension);
}
H5::Group H5FormatUtils::create_group(H5::Group& target, const string& name)
{
return target.createGroup(name.c_str());
}
const boost::any& H5FormatUtils::get_value_from_reference(const string& dataset_name,
const boost::any& value_reference, const unordered_map<string, boost::any>& values)
{
try {
auto reference_string = boost::any_cast<string>(value_reference);
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5FormatUtils::get_value_from_reference] Getting dataset '"<< dataset_name;
cout << "' reference value '" << reference_string << "'." << endl;
#endif
return values.at(reference_string);
} catch (const boost::bad_any_cast& exception) {
stringstream error_message;
using namespace date;
error_message << "[" << chrono::system_clock::now() << "]";
error_message << "Cannot convert dataset " << dataset_name << " value reference to string." << endl;
throw runtime_error(error_message.str());
} catch (const out_of_range& exception){
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Dataset " << dataset_name << " value reference " << boost::any_cast<string>(value_reference);
error_message << " not present in values map." << endl;
throw runtime_error(error_message.str());
}
}
const H5::PredType& H5FormatUtils::get_dataset_data_type(const string& type)
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5FormatUtils::get_dataset_data_type] Getting dataset type for received frame type " << type << endl;
#endif
if (type == "uint8") {
return H5::PredType::NATIVE_UINT8;
} else if (type == "uint16") {
return H5::PredType::NATIVE_UINT16;
} else if (type == "uint32") {
return H5::PredType::NATIVE_UINT32;
} else if (type == "uint64") {
return H5::PredType::NATIVE_UINT64;
} else if (type == "int8") {
return H5::PredType::NATIVE_INT8;
} else if (type == "int16") {
return H5::PredType::NATIVE_INT16;
} else if (type == "int32") {
return H5::PredType::NATIVE_INT32;
} else if (type == "int64") {
return H5::PredType::NATIVE_INT64;
} else {
// We cannot really convert this attribute.
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[H5FormatUtils::get_dataset_data_type] Unsupported dataset data_type " << type << endl;
throw runtime_error(error_message.str());
}
}
H5::DataSet H5FormatUtils::write_dataset(H5::Group& target, const h5_dataset& dataset,
const unordered_map<string, boost::any>& values)
{
const string& name = dataset.name;
boost::any value;
// Value is stored directly in the struct.
if (dataset.data_location == IMMEDIATE){
value = dataset.value;
// Value in struct is just a string reference to into the values map.
} else {
value = H5FormatUtils::get_value_from_reference(name, dataset.value, values);
}
if (dataset.data_type == NX_CHAR || dataset.data_type == NX_DATE_TIME || dataset.data_type == NXnote) {
// Attempt to convert to const char * (string "literals" cause that).
try {
return H5FormatUtils::write_dataset(target, name, string(boost::any_cast<const char*>(value)));
} catch (const boost::bad_any_cast& exception) {}
// Atempt to convert to string.
try {
return H5FormatUtils::write_dataset(target, name, boost::any_cast<string>(value));
} catch (const boost::bad_any_cast& exception) {}
// We cannot really convert this attribute.
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Cannot convert dataset " << name << " to string or const char*." << endl;
throw runtime_error(error_message.str());
} else if (dataset.data_type == NX_INT) {
try {
return H5FormatUtils::write_dataset(target, name, boost::any_cast<int>(value));
} catch (const boost::bad_any_cast& exception) {}
// We cannot really convert this attribute.
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Cannot convert dataset " << name << " to NX_INT." << endl;
throw runtime_error(error_message.str());
} else if (dataset.data_type == NX_FLOAT || dataset.data_type == NX_NUMBER) {
try {
return H5FormatUtils::write_dataset(target, name, boost::any_cast<double>(value));
} catch (const boost::bad_any_cast& exception) {}
// We cannot really convert this attribute.
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Cannot convert dataset " << name << " to NX_FLOAT." << endl;
throw runtime_error(error_message.str());
} else {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Unsupported dataset type for dataset " << name << "." << endl;
throw runtime_error(error_message.str());
}
}
H5::DataSet H5FormatUtils::write_dataset(H5::Group& target, const string& name, double value)
{
H5::DataSpace att_space(H5S_SCALAR);
auto data_type = H5::PredType::NATIVE_DOUBLE;
H5::DataSet dataset = target.createDataSet(name.c_str(), data_type , att_space);
dataset.write(&value, data_type);
return dataset;
}
H5::DataSet H5FormatUtils::write_dataset(H5::Group& target, const string& name, int value)
{
H5::DataSpace att_space(H5S_SCALAR);
auto data_type = H5::PredType::NATIVE_INT;
H5::DataSet dataset = target.createDataSet(name.c_str(), data_type, att_space);
dataset.write(&value, data_type);
return dataset;
}
H5::DataSet H5FormatUtils::write_dataset(H5::Group& target, const string& name, const string& value)
{
H5::DataSpace att_space(H5S_SCALAR);
H5::DataType data_type = H5::StrType(0, H5T_VARIABLE);
H5::DataSet dataset = target.createDataSet(name.c_str(), data_type ,att_space);
dataset.write(&value, data_type);
return dataset;
}
void H5FormatUtils::write_attribute(H5::H5Object& target, const string& name, const string& value)
{
H5::DataSpace att_space(H5S_SCALAR);
H5::DataType data_type = H5::StrType(H5::PredType::C_S1, H5T_VARIABLE);
auto h5_attribute = target.createAttribute(name.c_str(), data_type, att_space);
h5_attribute.write(data_type, &value);
}
void H5FormatUtils::write_attribute(H5::H5Object& target, const string& name, int value)
{
H5::DataSpace att_space(H5S_SCALAR);
auto data_type = H5::PredType::NATIVE_INT;
auto h5_attribute = target.createAttribute(name.c_str(), data_type, att_space);
h5_attribute.write(data_type, &value);
}
void H5FormatUtils::write_attribute(H5::H5Object& target, const h5_attr& attribute,
const unordered_map<string, boost::any>& values)
{
string name = attribute.name;
boost::any value;
// Value is stored directly in the struct.
if (attribute.data_location == IMMEDIATE){
value = attribute.value;
// Value in struct is just a string reference to into the values map.
} else {
value = H5FormatUtils::get_value_from_reference(name, attribute.value, values);
}
if (attribute.data_type == NX_CHAR) {
// Attempt to convert to const char * (string "literals" cause that).
try {
H5FormatUtils::write_attribute(target, name, string(boost::any_cast<const char*>(value)));
return;
} catch (const boost::bad_any_cast& exception) {}
// Atempt to convert to string.
try {
H5FormatUtils::write_attribute(target, name, boost::any_cast<string>(value));
return;
} catch (const boost::bad_any_cast& exception) {}
// We cannot really convert this attribute.
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Cannot convert attribute " << name << " to string or const char*." << endl;
throw runtime_error(error_message.str());
} else if (attribute.data_type == NX_INT) {
try {
H5FormatUtils::write_attribute(target, name, boost::any_cast<int>(value));
return;
} catch (const boost::bad_any_cast& exception) {}
// We cannot really convert this attribute.
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Cannot convert attribute " << name << " to INT." << endl;
throw runtime_error(error_message.str());
}
}
void H5FormatUtils::write_format_data(H5::Group& file_node, const h5_parent& format_node,
const std::unordered_map<std::string, h5_value>& values)
{
auto process_items = [&format_node, &values](H5::Group& node_group){
for (const auto item_ptr : format_node.items) {
const h5_base& item = *item_ptr;
if (item.node_type == GROUP) {
auto sub_group = dynamic_cast<const h5_group&>(item);
write_format_data(node_group, sub_group, values);
} else if (item.node_type == ATTRIBUTE) {
auto sub_attribute = dynamic_cast<const h5_attr&>(item);
H5FormatUtils::write_attribute(node_group, sub_attribute, values);
} else if (item.node_type == DATASET) {
auto sub_dataset = dynamic_cast<const h5_dataset&>(item);
auto current_dataset = H5FormatUtils::write_dataset(node_group, sub_dataset, values);
for (const auto dataset_attr_ptr : sub_dataset.items) {
const h5_base& dataset_attr = *dataset_attr_ptr;
// You can specify only attributes inside a dataset.
if (dataset_attr.node_type != ATTRIBUTE) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Invalid element " << dataset_attr.name << " on dataset " << sub_dataset.name << ". Only attributes allowd.";
throw invalid_argument( error_message.str() );
}
auto sub_attribute = dynamic_cast<const h5_attr&>(dataset_attr);
H5FormatUtils::write_attribute(current_dataset, sub_attribute, values);
}
}
}
};
if (format_node.node_type == GROUP) {
auto x = H5FormatUtils::create_group(file_node, format_node.name);
process_items(x);
}else {
process_items(file_node);
}
}
void H5FormatUtils::write_format(H5::H5File& file, const H5Format& format,
const std::unordered_map<std::string, h5_value>& input_values)
{
auto format_definition = format.get_format_definition();
auto default_values = format.get_default_values();
auto format_values(default_values);
format.add_input_values(format_values, input_values);
format.add_calculated_values(format_values);
write_format_data(file, format_definition, format_values);
for (const auto& mapping : format.get_dataset_move_mapping()) {
file.move(mapping.first.c_str(), mapping.second.c_str());
}
}
+143
View File
@@ -0,0 +1,143 @@
#ifndef H5FORMAT_H
#define H5FORMAT_H
#include <string>
#include <list>
#include <unordered_map>
#include <H5Cpp.h>
#include <memory>
#include <tuple>
#include <boost/any.hpp>
#include <chrono>
#include "date.h"
typedef boost::any h5_value;
enum NODE_TYPE
{
EMPTY_ROOT,
ATTRIBUTE,
DATASET,
GROUP
};
enum DATA_TYPE
{
NX_FLOAT,
NX_CHAR,
NX_INT,
NX_DATE_TIME,
NX_NUMBER,
NXnote
};
enum DATA_LOCATION
{
IMMEDIATE,
REFERENCE
};
struct h5_base
{
h5_base(const std::string& name, NODE_TYPE node_type) : name(name), node_type(node_type){};
virtual ~h5_base(){}
std::string name;
NODE_TYPE node_type;
};
struct h5_data_base
{
h5_data_base(DATA_TYPE data_type, DATA_LOCATION data_location) : data_type(data_type), data_location(data_location) {};
virtual ~h5_data_base(){}
DATA_TYPE data_type;
DATA_LOCATION data_location;
};
struct h5_parent: public h5_base
{
h5_parent(const std::string& name, NODE_TYPE node_type, const std::list<std::shared_ptr<h5_base>>& items) :
h5_base(name, node_type), items(items) {};
std::list<std::shared_ptr<h5_base>> items;
};
struct h5_group : public h5_parent
{
h5_group(const std::string& name, const std::list<std::shared_ptr<h5_base>>& items={}) :
h5_parent(name, GROUP, items) {};
};
struct h5_dataset : public h5_parent, public h5_data_base
{
h5_dataset(const std::string& name, const std::string& value, DATA_TYPE data_type, const std::list<std::shared_ptr<h5_base>>& items={})
: h5_parent(name, DATASET, items), h5_data_base(data_type, REFERENCE), value(value) {};
std::string value;
};
struct h5_attr : public h5_base, public h5_data_base
{
h5_attr(const std::string& name, const h5_value& value, DATA_TYPE data_types, DATA_LOCATION data_location=IMMEDIATE)
: h5_base(name, ATTRIBUTE), h5_data_base(data_types, data_location), value(value){};
h5_value value;
};
class H5Format
{
public:
virtual ~H5Format(){};
virtual const std::unordered_map<std::string, DATA_TYPE>& get_input_value_type() const = 0;
virtual const std::unordered_map<std::string, boost::any>& get_default_values() const = 0;
virtual const h5_parent& get_format_definition() const = 0;
virtual void add_calculated_values(std::unordered_map<std::string, boost::any>& values) const = 0;
virtual void add_input_values(std::unordered_map<std::string, boost::any>& values,
const std::unordered_map<std::string, boost::any>& input_values) const = 0;
virtual const std::unordered_map<std::string, std::string>& get_dataset_move_mapping() const = 0;
};
namespace H5FormatUtils
{
hsize_t expand_dataset(H5::DataSet& dataset, hsize_t frame_index, hsize_t dataset_increase_step);
void compact_dataset(H5::DataSet& dataset, hsize_t max_frame_index);
H5::Group create_group(H5::Group& target, const std::string& name);
const H5::PredType& get_dataset_data_type(const std::string& type);
H5::DataSet write_dataset(H5::Group& target, const h5_dataset& dataset,
const std::unordered_map<std::string, boost::any>& values);
H5::DataSet write_dataset(H5::Group& target, const std::string& name, double value);
H5::DataSet write_dataset(H5::Group& target, const std::string& name, int value);
H5::DataSet write_dataset(H5::Group& target, const std::string& name, const std::string& value);
void write_attribute(H5::H5Object& target, const h5_attr& attribute,
const std::unordered_map<std::string, boost::any>& values);
void write_attribute(H5::H5Object& target, const std::string& name, const std::string& value);
void write_attribute(H5::H5Object& target, const std::string& name, int value);
const boost::any& get_value_from_reference(const std::string& dataset_name,
const boost::any& value_reference, const std::unordered_map<std::string, boost::any>& values);
void write_format_data(H5::Group& file_node, const h5_parent& format_node,
const std::unordered_map<std::string, h5_value>& values);
void write_format(H5::H5File& file, const H5Format& format,
const std::unordered_map<std::string, h5_value>& input_values);
};
#endif
+275
View File
@@ -0,0 +1,275 @@
#include <sstream>
#include <stdexcept>
#include <iostream>
#include "H5Writer.hpp"
#include "H5Format.hpp"
extern "C"
{
#include "H5DOpublic.h"
}
using namespace std;
H5Writer::H5Writer(const std::string& filename, hsize_t frames_per_file, hsize_t initial_dataset_size, hsize_t dataset_increase_step) :
filename(filename), frames_per_file(frames_per_file),
initial_dataset_size(initial_dataset_size), dataset_increase_step(dataset_increase_step)
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::H5Writer] Creating chunked writer";
cout << " with filename " << filename;
cout << " and frames_per_file " << frames_per_file;
cout << " and initial_dataset_size " << initial_dataset_size;
cout << endl;
#endif
}
H5Writer::~H5Writer()
{
close_file();
}
void H5Writer::close_file()
{
if (is_file_open()) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::close_file] Closing file." << endl;
#endif
hsize_t min_frame_in_dataset = 0;
if (frames_per_file) {
min_frame_in_dataset = (current_frame_chunk - 1) * frames_per_file;
}
// max_data_index is relative to the current file.
hsize_t max_frame_in_dataset = max_data_index + min_frame_in_dataset;
// Frame indexing starts at 1 (for some reason).
auto image_nr_low = min_frame_in_dataset + 1;
auto image_nr_high = max_frame_in_dataset + 1;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::close_file] Setting datasets attribute image_nr_low=" << image_nr_low;
cout << " and image_nr_high=" << image_nr_high << endl;
#endif
for (const auto& dataset_map : datasets) {
auto dataset = dataset_map.second;
H5FormatUtils::compact_dataset(dataset, max_data_index);
H5FormatUtils::write_attribute(dataset, "image_nr_low", image_nr_low);
H5FormatUtils::write_attribute(dataset, "image_nr_high", image_nr_high);
}
file.close();
} else {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::close_file] Trying to close an already closed file." << endl;
#endif
}
// Cleanup.
datasets.clear();
datasets_current_size.clear();
current_frame_chunk = 0;
max_data_index = 0;
}
void H5Writer::write_data(const string& dataset_name, const size_t data_index, const char* data,
const std::vector<size_t>& data_shape, const size_t data_bytes_size, const string& data_type, const string& endianness)
{
try {
// Define the ofset of the currently received image in the file.
hsize_t relative_data_index = prepare_storage_for_data(dataset_name, data_index, data_shape, data_type, endianness);
// Define the offset where to write the data.
size_t data_rank = data_shape.size();
hsize_t offset[data_rank+1];
offset[0] = relative_data_index;
for (uint index=0; index<data_rank; ++index) {
offset[index+1] = 0;
}
// No compression for now.
uint32_t filters = 0;
const auto& dataset = datasets.at(dataset_name);
if( H5DOwrite_chunk(dataset.getId(), H5P_DEFAULT, filters, offset, data_bytes_size, data) )
{
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "Error while writing dataset " << dataset_name << " chunk to file at offset ";
error_message << relative_data_index << "." << endl;
throw invalid_argument( error_message.str() );
}
} catch (...) {
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::write_data] Error while trying to write data to dataset " << dataset_name << endl;
throw;
}
}
void H5Writer::create_dataset(const string& dataset_name, const vector<size_t>& data_shape,
const string& data_type, const string& endianness)
{
// Number of dimensions in each data point.
const size_t data_rank = data_shape.size();
// The +1 dimension is to account for the sequence of data points (time).
const hsize_t dataset_rank = data_rank + 1;
hsize_t dataset_dimension[dataset_rank];
hsize_t max_dataset_dimension[dataset_rank];
hsize_t dataset_chunking[dataset_rank];
dataset_dimension[0] = initial_dataset_size;
// This dataset can be resized without limits.
max_dataset_dimension[0] = H5S_UNLIMITED;
// Chunking is always set to a single data point.
dataset_chunking[0] = 1;
for (size_t index=0; index<data_rank; ++index) {
dataset_dimension[index+1] = data_shape[index];
max_dataset_dimension[index+1] = data_shape[index];
dataset_chunking[index+1] = data_shape[index];
}
H5::DataSpace dataspace(dataset_rank, dataset_dimension, max_dataset_dimension);
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::create_dataset] Creating dataspace of size (";
for (hsize_t i=0; i<dataset_rank; ++i) {
cout << dataset_dimension[i] << ",";
}
cout << ")" << endl;
#endif
H5::DSetCreatPropList dataset_properties;
dataset_properties.setChunk(dataset_rank, dataset_chunking);
H5::AtomType dataset_data_type(H5FormatUtils::get_dataset_data_type(data_type));
if (endianness == "big") {
dataset_data_type.setOrder(H5T_ORDER_BE);
} else {
dataset_data_type.setOrder(H5T_ORDER_LE);
}
auto dataset = file.createDataSet(dataset_name.c_str(), dataset_data_type, dataspace, dataset_properties);
datasets.insert({dataset_name, dataset});
datasets_current_size.insert({dataset_name, initial_dataset_size});
}
void H5Writer::create_file(hsize_t frame_chunk)
{
if (file.getId() != -1) {
close_file();
}
auto target_filename = filename;
// In case frames_per_file is > 0, the filename variable is a template for the filename.
if (frames_per_file) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::create_file] Frames per file is defined. Format " << filename << " with frame_chunk " << frame_chunk << endl;
#endif
// Space for 10 digits should be enough.
char buffer[filename.length() + 10];
sprintf(buffer, filename.c_str(), frame_chunk);
target_filename = string(buffer);
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[H5Writer::create_file] Creating filename " << target_filename << endl;
#endif
file = H5::H5File(target_filename.c_str(), H5F_ACC_TRUNC);
// New file created - set this files chunk number.
current_frame_chunk = frame_chunk;
}
bool H5Writer::is_file_open() const
{
return (file.getId() != -1);
}
hsize_t H5Writer::prepare_storage_for_data(const string& dataset_name, const size_t data_index, const std::vector<size_t>& data_shape,
const string& data_type, const string& endianness)
{
hsize_t relative_data_index = data_index;
// Check if we have to create a new file.
if (frames_per_file) {
hsize_t frame_chunk = (data_index / frames_per_file) + 1;
// This frames does not go into this file.
if (frame_chunk != current_frame_chunk) {
create_file(frame_chunk);
}
// Make the data_index relative to this chunk (file).
relative_data_index = data_index - ((frame_chunk - 1) * frames_per_file);
}
// Open the file if needed.
if (!is_file_open()) {
create_file();
}
// Create the dataset if we don't have it yet.
if (datasets.find(dataset_name) == datasets.end()) {
create_dataset(dataset_name, data_shape, data_type, endianness);
}
hsize_t current_dataset_size = datasets_current_size.at(dataset_name);
// Expand the dataset if needed.
if (relative_data_index > current_dataset_size) {
auto dataset = datasets.at(dataset_name);
hsize_t new_dataset_size = H5FormatUtils::expand_dataset(dataset, relative_data_index, dataset_increase_step);
datasets_current_size[dataset_name] = new_dataset_size;
}
// Keep track of the max index in this file - needed for shrinking the dataset at the end.
if (relative_data_index > max_data_index) {
max_data_index = relative_data_index;
}
return relative_data_index;
}
H5::H5File& H5Writer::get_h5_file()
{
return file;
}
+45
View File
@@ -0,0 +1,45 @@
#ifndef H5WRITER_H
#define H5WRITER_H
#include <unordered_map>
#include <memory>
#include <vector>
#include <H5Cpp.h>
#include <chrono>
#include "date.h"
class H5Writer
{
// Initialized in constructor.
const std::string filename;
hsize_t frames_per_file;
hsize_t initial_dataset_size;
hsize_t dataset_increase_step = 0;
// State variables.
hsize_t max_data_index = 0;
hsize_t current_frame_chunk = 0;
H5::H5File file;
std::unordered_map<std::string, H5::DataSet> datasets;
std::unordered_map<std::string, hsize_t> datasets_current_size;
hsize_t prepare_storage_for_data(const std::string& dataset_name, const size_t data_index, const std::vector<size_t>& data_shape,
const std::string& data_type, const std::string& endianness);
void create_file(const hsize_t frame_chunk=0);
void create_dataset(const std::string& dataset_name, const std::vector<size_t>& data_shape,
const std::string& data_type, const std::string& endianness);
public:
H5Writer(const std::string& filename, hsize_t frames_per_file=0, hsize_t initial_dataset_size=1000, hsize_t dataset_increase_step=1000);
virtual ~H5Writer();
bool is_file_open() const;
void close_file();
void write_data(const std::string& dataset_name, const size_t data_index, const char* data, const std::vector<size_t>& data_shape,
const size_t data_bytes_size, const std::string& data_type, const std::string& endianness);
H5::H5File& get_h5_file();
};
#endif
+234
View File
@@ -0,0 +1,234 @@
#include <cstdlib>
#include <chrono>
#include <unistd.h>
#include <stdexcept>
#include <iostream>
#include <memory>
#include <boost/thread.hpp>
#include "RestApi.hpp"
#include "ProcessManager.hpp"
#include "config.hpp"
#include "H5Writer.hpp"
using namespace std;
void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
ZmqReceiver& receiver, uint16_t rest_port)
{
size_t n_slots = config::ring_buffer_n_slots;
RingBuffer ring_buffer(n_slots);
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::run_writer] Running writer";
cout << " and output_file " << manager.get_output_file();
cout << " and n_slots " << n_slots;
cout << endl;
#endif
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer),
boost::ref(receiver), boost::ref(format));
boost::thread writer_thread(write_h5, boost::ref(manager),
boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type());
RestApi::start_rest_api(manager, rest_port);
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::run_writer] Rest API stopped." << endl;
#endif
// In case SIGINT stopped the rest_api.
manager.stop();
receiver_thread.join();
writer_thread.join();
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::run_writer] Writer properly stopped." << endl;
#endif
}
void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer,
ZmqReceiver& receiver, const H5Format& format)
{
receiver.connect();
while (manager.is_running()) {
auto frame = receiver.receive();
// In case no message is available before the timeout, both pointers are NULL.
if (!frame.first){
continue;
}
auto frame_metadata = frame.first;
auto frame_data = frame.second;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::receive_zmq] Processing FrameMetadata";
cout << " with frame_index " << frame_metadata->frame_index;
cout << " and frame_shape [" << frame_metadata->frame_shape[0] << ", " << frame_metadata->frame_shape[1] << "]";
cout << " and endianness " << frame_metadata->endianness;
cout << " and type " << frame_metadata->type;
cout << " and frame_bytes_size " << frame_metadata->frame_bytes_size;
cout << "." << endl;
#endif
// Commit the frame to the buffer.
ring_buffer.write(frame_metadata, frame_data);
manager.received_frame(frame_metadata->frame_index);
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::receive_zmq] Receiver thread stopped." << endl;
#endif
}
void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer,
const shared_ptr<unordered_map<string, HeaderDataType>> header_values_type)
{
H5Writer writer(manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step);
auto raw_frames_dataset_name = config::raw_image_dataset_name;
// Run until the running flag is set or the ring_buffer is empty.
while(manager.is_running() || !ring_buffer.is_empty()) {
if (ring_buffer.is_empty()) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(config::ring_buffer_read_retry_interval));
continue;
}
const pair< shared_ptr<FrameMetadata>, char* > received_data = ring_buffer.read();
// NULL pointer means that the ringbuffer->read() timeouted. Faster than rising an exception.
if(!received_data.first) {
continue;
}
#ifdef PERF_OUTPUT
using namespace date;
auto start_time_frame = std::chrono::system_clock::now();
#endif
// Write image data.
writer.write_data(raw_frames_dataset_name,
received_data.first->frame_index,
received_data.second,
received_data.first->frame_shape,
received_data.first->frame_bytes_size,
received_data.first->type,
received_data.first->endianness);
#ifdef PERF_OUTPUT
using namespace date;
using namespace std::chrono;
auto frame_time_difference = std::chrono::system_clock::now() - start_time_frame;
auto frame_diff_ms = duration<float, milli>(frame_time_difference).count();
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::write_h5] Frame index ";
cout << received_data.first->frame_index << " written in " << frame_diff_ms << " ms." << endl;
#endif
ring_buffer.release(received_data.first->buffer_slot_index);
#ifdef PERF_OUTPUT
using namespace date;
auto start_time_metadata = std::chrono::system_clock::now();
#endif
// Write image metadata if mapping specified.
if (header_values_type) {
for (const auto& header_type : *header_values_type) {
auto& name = header_type.first;
auto& header_data_type = header_type.second;
auto value = received_data.first->header_values.at(name);
// Header data are fixed to scalars in little endian.
vector<size_t> value_shape = {header_data_type.value_shape};
writer.write_data(name,
received_data.first->frame_index,
value.get(),
value_shape,
header_data_type.value_bytes_size,
header_data_type.type,
header_data_type.endianness);
}
}
#ifdef PERF_OUTPUT
using namespace date;
using namespace std::chrono;
auto metadata_time_difference = std::chrono::system_clock::now() - start_time_metadata;
auto metadata_diff_ms = duration<float, milli>(metadata_time_difference).count();
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::write_h5] Frame metadata index ";
cout << received_data.first->frame_index << " written in " << metadata_diff_ms << " ms." << endl;
#endif
manager.written_frame(received_data.first->frame_index);
}
if (writer.is_file_open()) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::write] Writing file format." << endl;
#endif
// Wait until all parameters are set or writer is killed.
while (!manager.are_all_parameters_set() && !manager.is_killed()) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(config::parameters_read_retry_interval));
}
// Need to check again if we have all parameters to write down the format.
if (manager.are_all_parameters_set()) {
const auto parameters = manager.get_parameters();
// Even if we can't write the format, lets try to preserve the data.
try {
H5FormatUtils::write_format(writer.get_h5_file(), format, parameters);
} catch (const runtime_error& ex) {
using namespace date;
std::cout << "[" << std::chrono::system_clock::now() << "]";
std::cout << "[ProcessManager::write] Error while trying to write file format: "<< ex.what() << endl;
}
}
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::write] Closing file " << manager.get_output_file() << endl;
#endif
writer.close_file();
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::write] Writer thread stopped." << endl;
#endif
// Exit when writer thread has closed the file.
exit(0);
}
+21
View File
@@ -0,0 +1,21 @@
#ifndef PROCESSMANAGER_H
#define PROCESSMANAGER_H
#include "WriterManager.hpp"
#include "H5Format.hpp"
#include "RingBuffer.hpp"
#include "ZmqReceiver.hpp"
#include <chrono>
#include "date.h"
namespace ProcessManager
{
void run_writer(WriterManager& manager, const H5Format& format, ZmqReceiver& receiver, uint16_t rest_port);
void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, ZmqReceiver& receiver, const H5Format& format);
void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer,
const std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> header_values_type);
};
#endif
+157
View File
@@ -0,0 +1,157 @@
#include <sstream>
#include <string>
#include "crow_all.h"
#include "RestApi.hpp"
using namespace std;
void RestApi::start_rest_api(WriterManager& writer_manager, uint16_t port)
{
#ifdef DEBUG_OUTPUT
cout << "[rest_interface::start_rest_api] Starting rest interface on port " << port << endl;
#endif
crow::SimpleApp app;
CROW_ROUTE(app, "/kill")([&](){
writer_manager.kill();
crow::json::wvalue result;
result["status"] = "killed";
app.stop();
return result;
});
CROW_ROUTE(app, "/stop")([&](){
writer_manager.stop();
crow::json::wvalue result;
result["status"] = writer_manager.get_status();
return result;
});
CROW_ROUTE (app, "/status") ([&](){
crow::json::wvalue result;
result["status"] = writer_manager.get_status();
return result;
});
CROW_ROUTE (app, "/statistics") ([&](){
crow::json::wvalue result;
for (const auto& item : writer_manager.get_statistics()) {
result[item.first] = item.second;
}
result["status"] = writer_manager.get_status();
return result;
});
CROW_ROUTE (app, "/parameters").methods("GET"_method, "POST"_method) ([&](const crow::request& req){
crow::json::wvalue result;
auto parameters_type = writer_manager.get_parameters_type();
if (req.method == "GET"_method) {
for (const auto& item : writer_manager.get_parameters()) {
auto parameter_name = item.first;
auto parameter_value = item.second;
try {
auto parameter_type = parameters_type.at(parameter_name);
if (parameter_type == NX_FLOAT || parameter_type == NX_NUMBER) {
result[parameter_name] = boost::any_cast<double>(parameter_value);
} else if (parameter_type == NX_CHAR || parameter_type == NXnote || parameter_type == NX_DATE_TIME) {
result[parameter_name] = boost::any_cast<string>(parameter_value);
} else if (parameter_type == NX_INT) {
result[parameter_name] = boost::any_cast<int>(parameter_value);
}
} catch (const boost::bad_any_cast& exception) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RestApi::parameters(get)] Cannot cast parameter " << parameter_name << " into specified type." << endl;
throw runtime_error(error_message.str());
} catch (const out_of_range& exception){
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RestApi::parameters(get)] No type mapping for parameter " << parameter_name << " in file format."<< endl;
throw runtime_error(error_message.str());
}
}
return result;
} else {
auto request_parameters = crow::json::load(req.body);
std::unordered_map<std::string, boost::any> new_parameters;
for (const auto& item : request_parameters) {
string parameter_name = item.key();
try{
auto parameter_type = parameters_type.at(parameter_name);
if (parameter_type == NX_FLOAT || parameter_type == NX_NUMBER) {
new_parameters[parameter_name] = double(item.d());
} else if (parameter_type == NX_INT) {
new_parameters[parameter_name] = int(item.i());
} else if (parameter_type == NX_CHAR) {
new_parameters[parameter_name] = string(item.s());
} else if (parameter_type == NX_DATE_TIME) {
new_parameters[parameter_name] = string(item.s());
} else {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RestApi::parameters(post)] No NX type mapping for parameter " << parameter_name << endl;
throw runtime_error(error_message.str());
}
} catch (const out_of_range& exception){
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RestApi::parameters(post)] No type mapping for received parameter " << parameter_name << " in file format."<< endl;
throw runtime_error(error_message.str());
} catch (const boost::bad_any_cast& exception) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RestApi::parameters(post)] Cannot cast parameter " << parameter_name << " into specified type." << endl;
throw runtime_error(error_message.str());
}
}
writer_manager.set_parameters(new_parameters);
result["message"] = "Parameters set.";
return result;
}
});
app.loglevel(crow::LogLevel::ERROR);
app.port(port).run();
}
+13
View File
@@ -0,0 +1,13 @@
#ifndef RESTAPI_H
#define RESTAPI_H
#include "WriterManager.hpp"
#include <chrono>
#include "date.h"
namespace RestApi
{
void start_rest_api(WriterManager& writer_manager, uint16_t port);
}
#endif
+238
View File
@@ -0,0 +1,238 @@
#include <stdexcept>
#include <sstream>
#include <cstring>
#include <iostream>
#include <cstddef>
#include "RingBuffer.hpp"
using namespace std;
RingBuffer::RingBuffer(size_t n_slots) : n_slots(n_slots), ringbuffer_slots(n_slots, 0)
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[RingBuffer::RingBuffer] Creating ring buffer with n_slots " << n_slots << endl;
#endif
}
RingBuffer::~RingBuffer()
{
// If the frame buffer is allocated, free it.
if (frame_data_buffer != NULL) {
free(frame_data_buffer);
frame_data_buffer = NULL;
}
}
void RingBuffer::initialize(size_t slot_size)
{
// Check if the ring buffer is already initialized.
if (frame_data_buffer) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RingBuffer::initialize] Ring buffer already initialized." << endl;
throw runtime_error(error_message.str());
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[RingBuffer::initialize] Initializing ring buffer with slot_size " << slot_size << endl;
#endif
this->write_index = 0;
this->slot_size = slot_size;
this->buffer_size = slot_size * n_slots;
this->frame_data_buffer = new char[buffer_size];
this->buffer_used_slots = 0;
this->ring_buffer_initialized = true;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[RingBuffer::initialize] Total buffer_size " << buffer_size << endl;
#endif
}
void RingBuffer::write(shared_ptr<FrameMetadata> frame_metadata, const char* data)
{
// Initialize the buffer on the first write.
if (!ring_buffer_initialized) {
initialize(frame_metadata->frame_bytes_size);
}
// All images must fit in the ring buffer.
if (frame_metadata->frame_bytes_size > slot_size) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RingBuffer::write] Received frame index "<< frame_metadata->frame_index;
error_message << " that is too large for ring buffer slot. ";
error_message << "Slot size " << slot_size << ", but frame bytes size " << frame_metadata->frame_bytes_size << endl;
throw runtime_error(error_message.str());
}
// Check and reserve slot in the buffer.
{
lock_guard<mutex> lock(ringbuffer_slots_mutex);
if (!ringbuffer_slots[write_index]) {
ringbuffer_slots[write_index] = 1;
// Set the write index in the FrameMetadata object.
frame_metadata->buffer_slot_index = write_index;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[RingBuffer::write] Ring buffer slot " << frame_metadata->buffer_slot_index << " reserved for frame_index ";
cout << frame_metadata->frame_index << endl;
#endif
// Increase and wrap the write index around if needed.
write_index = (write_index + 1) % n_slots;
// Keep track of the number of used slots.
buffer_used_slots++;
} else {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RingBuffer::write] Ring buffer is full. Collision at write_index = " << write_index << endl;
throw runtime_error(error_message.str());
}
}
// The slot is already reserved, no need for synchronization.
char* slot_memory_address = get_buffer_slot_address(frame_metadata->buffer_slot_index);
memcpy(slot_memory_address, data, frame_metadata->frame_bytes_size);
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[RingBuffer::write] Copied " << frame_metadata->frame_bytes_size << " frame bytes to buffer_slot_index ";
cout << frame_metadata->buffer_slot_index << endl;
#endif
// Add metadata header to the inter-thread communication queue.
{
lock_guard<mutex> lock(frame_metadata_queue_mutex);
frame_metadata_queue.push_back(frame_metadata);
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[RingBuffer::write] Metadata for frame_index " << frame_metadata->frame_index << " added to metadata queue." << endl;
#endif
}
char* RingBuffer::get_buffer_slot_address(size_t buffer_slot_index)
{
char* slot_memory_address = frame_data_buffer + (buffer_slot_index * slot_size);
// Check if the memory address is valid.
if (slot_memory_address > frame_data_buffer + buffer_size) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RingBuffer::get_buffer_slot_address] Calculated ring buffer address is out of bound for buffer_slot_index ";
error_message << buffer_slot_index << endl;
throw runtime_error(error_message.str());
}
return slot_memory_address;
}
pair<shared_ptr<FrameMetadata>, char*> RingBuffer::read()
{
shared_ptr<FrameMetadata> frame_metadata;
// Read data from the metadata queue.
{
lock_guard<mutex> lock(frame_metadata_queue_mutex);
// A NULL char* indicates that there are no available data in the ring buffer.
if (frame_metadata_queue.empty()) {
return {NULL, NULL};
}
frame_metadata = frame_metadata_queue.front();
frame_metadata_queue.pop_front();
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[RingBuffer::read] Received metadata for frame_index " << frame_metadata->frame_index << endl;
#endif
// Check if the references ring buffer slot is valid.
{
lock_guard<mutex> lock(ringbuffer_slots_mutex);
if (!ringbuffer_slots[frame_metadata->buffer_slot_index]) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RingBuffer::read] Ring buffer slot referenced in message header ";
error_message << frame_metadata->buffer_slot_index << " is empty." << endl;
throw runtime_error(error_message.str());
}
}
char* slot_memory_address = get_buffer_slot_address(frame_metadata->buffer_slot_index);
return {frame_metadata, slot_memory_address};
}
void RingBuffer::release(size_t buffer_slot_index)
{
// Cannot release a slot index that is out of range.
if (buffer_slot_index >= n_slots) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RingBuffer::release] Slot index to release " << buffer_slot_index;
error_message << " is out of range. Ring buffer n_slots = " << n_slots << endl;
throw runtime_error(error_message.str());
}
// Release the buffer slot.
ringbuffer_slots_mutex.lock();
if (ringbuffer_slots[buffer_slot_index]) {
ringbuffer_slots[buffer_slot_index] = 0;
// Keep track of the number of used slots.
buffer_used_slots--;
} else {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[RingBuffer::release] Cannot release empty ring buffer slot " << buffer_slot_index << endl;
throw runtime_error(error_message.str());
}
ringbuffer_slots_mutex.unlock();
}
bool RingBuffer::is_empty()
{
lock_guard<mutex> lock(ringbuffer_slots_mutex);
return buffer_used_slots == 0;
}
+61
View File
@@ -0,0 +1,61 @@
#ifndef RINGBUFFER_H
#define RINGBUFFER_H
#include <list>
#include <vector>
#include <map>
#include <mutex>
#include <memory>
#include <string>
#include <boost/any.hpp>
#include <chrono>
#include "date.h"
struct FrameMetadata
{
// Ring buffer needed data.
size_t buffer_slot_index;
size_t frame_bytes_size;
// Image header data.
uint64_t frame_index;
std::string endianness;
std::string type;
std::vector<size_t> frame_shape;
// Pass additional header values.
std::map<std::string, std::shared_ptr<char>> header_values;
};
class RingBuffer
{
// Initialized in constructor.
size_t n_slots = 0;
std::vector<bool> ringbuffer_slots;
// Set in initialize().
size_t slot_size = 0;
size_t buffer_size = 0;
char* frame_data_buffer = NULL;
size_t write_index = 0;
size_t buffer_used_slots = 0;
bool ring_buffer_initialized = false;
std::list< std::shared_ptr<FrameMetadata> > frame_metadata_queue;
std::mutex frame_metadata_queue_mutex;
std::mutex ringbuffer_slots_mutex;
char* get_buffer_slot_address(size_t buffer_slot_index);
public:
RingBuffer(size_t n_slots);
virtual ~RingBuffer();
void initialize(size_t slot_size);
void write(const std::shared_ptr<FrameMetadata> metadata, const char* data);
std::pair<std::shared_ptr<FrameMetadata>, char*> read();
void release(size_t buffer_slot_index);
bool is_empty();
};
#endif
+205
View File
@@ -0,0 +1,205 @@
#include <iostream>
#include <sstream>
#include "WriterManager.hpp"
using namespace std;
void writer_utils::set_process_id(int user_id)
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[writer_utils::set_process_id] Setting process uid to " << user_id << endl;
#endif
if (setgid(user_id)) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[writer_utils::set_process_id] Cannot set group_id to " << user_id << endl;
throw runtime_error(error_message.str());
}
if (setuid(user_id)) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[writer_utils::set_process_id] Cannot set user_id to " << user_id << endl;
throw runtime_error(error_message.str());
}
}
void writer_utils::create_destination_folder(const string& output_file)
{
auto file_separator_index = output_file.rfind('/');
if (file_separator_index != string::npos) {
string output_folder(output_file.substr(0, file_separator_index));
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[writer_utils::create_destination_folder] Creating folder " << output_folder << endl;
string create_folder_command("mkdir -p " + output_folder);
system(create_folder_command.c_str());
}
}
WriterManager::WriterManager(const unordered_map<string, DATA_TYPE>& parameters_type,
const string& output_file, uint64_t n_frames):
parameters_type(parameters_type), output_file(output_file), n_frames(n_frames),
running_flag(true), killed_flag(false), n_received_frames(0), n_written_frames(0), n_lost_frames(0)
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[WriterManager::WriterManager] Writer manager for n_frames " << n_frames << endl;
#endif
}
WriterManager::~WriterManager(){}
void WriterManager::stop()
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[WriterManager::stop] Stopping the writer manager." << endl;
#endif
running_flag = false;
}
void WriterManager::kill()
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[WriterManager::kills] Killing writer manager." << endl;
#endif
killed_flag = true;
stop();
}
string WriterManager::get_status()
{
if (running_flag) {
return "receiving";
} else if (n_received_frames.load() > n_written_frames) {
return "writing";
} else if (!are_all_parameters_set()) {
return "waiting for parameters";
} else {
return "finished";
}
}
string WriterManager::get_output_file() const
{
return output_file;
}
unordered_map<string, uint64_t> WriterManager::get_statistics() const
{
unordered_map<string, uint64_t> result = {{"n_received_frames", n_received_frames.load()},
{"n_written_frames", n_written_frames.load()},
{"n_lost_frames", n_lost_frames.load()},
{"total_expected_frames", n_frames}};
return result;
}
unordered_map<string, boost::any> WriterManager::get_parameters()
{
lock_guard<mutex> lock(parameters_mutex);
return parameters;
}
void WriterManager::set_parameters(const unordered_map<string, boost::any>& new_parameters)
{
lock_guard<mutex> lock(parameters_mutex);
#ifdef DEBUG_OUTPUT
stringstream output_message;
using namespace date;
output_message << "[" << std::chrono::system_clock::now() << "]";
output_message << "[WriterManager::set_parameters] Setting parameters: ";
#endif
for (const auto& parameter : new_parameters) {
auto& parameter_name = parameter.first;
auto& parameter_value = parameter.second;
parameters[parameter_name] = parameter_value;
#ifdef DEBUG_OUTPUT
output_message << parameter_name << ", ";
#endif
}
#ifdef DEBUG_OUTPUT
cout << output_message.str() << endl;
#endif
}
const unordered_map<string, DATA_TYPE>& WriterManager::get_parameters_type() const
{
return parameters_type;
}
bool WriterManager::is_running()
{
// Take into account n_frames only if it is <> 0.
if (n_frames && n_received_frames.load() >= n_frames) {
running_flag = false;
}
return running_flag.load();
}
bool WriterManager::is_killed() const
{
return killed_flag.load();
}
void WriterManager::received_frame(size_t frame_index)
{
n_received_frames++;
}
void WriterManager::written_frame(size_t frame_index)
{
n_written_frames++;
}
void WriterManager::lost_frame(size_t frame_index)
{
n_lost_frames++;
}
bool WriterManager::are_all_parameters_set()
{
lock_guard<mutex> lock(parameters_mutex);
for (const auto& parameter : parameters_type) {
const auto& parameter_name = parameter.first;
if (parameters.count(parameter_name) == 0) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[WriterManager::are_all_parameters_set] Parameter " << parameter_name << " not set." << endl;
#endif
return false;
}
}
return true;
}
+58
View File
@@ -0,0 +1,58 @@
#ifndef WRITERMANAGER_H
#define WRITERMANAGER_H
#include <unordered_map>
#include <string>
#include <atomic>
#include <mutex>
#include <boost/any.hpp>
#include <chrono>
#include "date.h"
#include "H5Format.hpp"
namespace writer_utils {
void set_process_id(int user_id);
void create_destination_folder(const std::string& output_file);
}
class WriterManager
{
std::unordered_map<std::string, boost::any> parameters = {};
std::mutex parameters_mutex;
// Initialize in constructor.
const std::unordered_map<std::string, DATA_TYPE>& parameters_type;
std::string output_file;
size_t n_frames;
std::atomic_bool running_flag;
std::atomic_bool killed_flag;
std::atomic<uint64_t> n_received_frames;
std::atomic<uint64_t> n_written_frames;
std::atomic<uint64_t> n_lost_frames;
public:
WriterManager(const std::unordered_map<std::string, DATA_TYPE>& parameters_type, const std::string& output_file, uint64_t n_frames=0);
virtual ~WriterManager();
void stop();
void kill();
bool is_running();
bool is_killed() const;
std::string get_status();
bool are_all_parameters_set();
std::string get_output_file() const;
const std::unordered_map<std::string, DATA_TYPE>& get_parameters_type() const;
std::unordered_map<std::string, boost::any> get_parameters();
void set_parameters(const std::unordered_map<std::string, boost::any>& new_parameters);
std::unordered_map<std::string, uint64_t> get_statistics() const;
void received_frame(size_t frame_index);
void written_frame(size_t frame_index);
void lost_frame(size_t frame_index);
};
#endif
+244
View File
@@ -0,0 +1,244 @@
#include <iostream>
#include <stdexcept>
#include "config.hpp"
#include "ZmqReceiver.hpp"
#include "H5Format.hpp"
using namespace std;
namespace pt = boost::property_tree;
HeaderDataType::HeaderDataType(const std::string& type, size_t shape) :
type(type), value_shape(shape), endianness("little") {
value_bytes_size = get_type_byte_size(type);
}
size_t get_type_byte_size(const string& type)
{
if (type == "uint8" || type== "int8") {
return 1;
} else if (type == "uint16" || type == "int16") {
return 2;
} else if (type == "uint32" || type == "int32" || type == "float32") {
return 4;
} else if (type == "uint64" || type == "int64" || type == "float64") {
return 8;
} else {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[ZmqReceiver::get_type_byte_size] Unsupported data type " << type << endl;
throw runtime_error(error_message.str());
}
}
ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout,
shared_ptr<unordered_map<string, HeaderDataType>> header_values_type) :
connect_address(connect_address), n_io_threads(n_io_threads),
receive_timeout(receive_timeout), receiver(NULL), header_values_type(header_values_type)
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ZmqReceiver::ZmqReceiver] Creating ZMQ receiver with";
cout << " connect_address " << connect_address;
cout << " n_io_threads " << n_io_threads;
cout << " receive_timeout " << receive_timeout;
cout << endl;
#endif
message_header = zmq::message_t(config::zmq_buffer_size_header);
message_data = zmq::message_t(config::zmq_buffer_size_data);
}
void ZmqReceiver::connect()
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ZmqReceiver::connect] Connecting to address " << connect_address;
cout << " with n_io_threads " << n_io_threads << endl;
#endif
context = make_shared<zmq::context_t>(n_io_threads);
receiver = make_shared<zmq::socket_t>(*context, ZMQ_PULL);
receiver->setsockopt(ZMQ_RCVTIMEO, receive_timeout);
receiver->connect(connect_address);
}
pair<shared_ptr<FrameMetadata>, char*> ZmqReceiver::receive()
{
if (!receiver) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[ZmqReceiver::receive] Cannot receive before connecting. ";
error_message << "Connect first." << endl;
throw runtime_error(error_message.str());
}
// Get the message header.
if (!receiver->recv(&message_header)){
return {NULL, NULL};
}
auto header_string = string(static_cast<char*>(message_header.data()), message_header.size());
auto frame_metadata = read_json_header(header_string);
// Get the message data.
if (!receiver->recv(&message_data)) {
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ZmqReceiver::receive] Error while reading from ZMQ. Frame index " << frame_metadata->frame_index << " lost.";
cout << " Trying to continue with the next frame." << endl;
return {NULL, NULL};
}
frame_metadata->frame_bytes_size = message_data.size();
return {frame_metadata, static_cast<char*>(message_data.data())};
}
shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
{
try {
stringstream header_stream;
header_stream << header << endl;
pt::read_json(header_stream, json_header);
auto header_data = make_shared<FrameMetadata>();
header_data->frame_index = json_header.get<uint64_t>("frame");
for (const auto& item : json_header.get_child("shape")) {
header_data->frame_shape.push_back(item.second.get_value<size_t>());
}
// Array 1.0 specified little endian as the default encoding.
header_data->endianness = json_header.get("endianness", "little");
header_data->type = json_header.get<string>("type");
if (header_values_type) {
for (const auto& value_mapping : *header_values_type) {
const auto& name = value_mapping.first;
const auto& header_data_type = value_mapping.second;
auto value = get_value_from_json(json_header, name, header_data_type);
header_data->header_values.insert(
{name, value}
);
}
}
return header_data;
} catch (...) {
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ZmqReceiver::read_json_header] Error while interpreting the JSON header. Header string: " << header << endl;
cout << "Expected JSON header format: " << endl;
if (header_values_type) {
for (const auto& value_mapping : *header_values_type) {
cout << "\t" << value_mapping.first << ":" << value_mapping.second.type;
cout << "[" << value_mapping.second.value_shape << "]" << endl;
}
} else {
cout << "\tExpected header value types is a null pointer." << endl;
}
throw;
}
}
void copy_value_to_buffer(char* buffer, const size_t offset, const pt::ptree& json_value, const HeaderDataType& header_data_type)
{
if (header_data_type.type == "uint8") {
auto value = json_value.get_value<uint8_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "uint16") {
auto value = json_value.get_value<uint16_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "uint32") {
auto value = json_value.get_value<uint32_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "uint64") {
auto value = json_value.get_value<uint64_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "int8") {
auto value = json_value.get_value<int8_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "int16") {
auto value = json_value.get_value<int16_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "int32") {
auto value = json_value.get_value<int32_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "int64") {
auto value = json_value.get_value<int64_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "float32") {
auto value = json_value.get_value<float>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "float64") {
auto value = json_value.get_value<double>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else {
// We cannot really convert this attribute.
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[ZmqReceiver::get_value_from_json] Unsupported header data type " << header_data_type.type << endl;
throw runtime_error(error_message.str());
}
}
shared_ptr<char> get_value_from_json(const pt::ptree& json_header, const string& name, const HeaderDataType& header_data_type)
{
char* buffer = new char[header_data_type.value_bytes_size * header_data_type.value_shape];
if (header_data_type.value_shape == 1) {
copy_value_to_buffer(buffer, 0, json_header.get_child(name), header_data_type);
} else {
size_t index = 0;
for (const auto& item : json_header.get_child(name)) {
auto offset = index * header_data_type.value_bytes_size;
copy_value_to_buffer(buffer, offset, item.second, header_data_type);
++index;
}
}
return shared_ptr<char>(buffer, default_delete<char[]>());
}
const shared_ptr<unordered_map<string, HeaderDataType>> ZmqReceiver::get_header_values_type() const
{
return header_values_type;
}
+66
View File
@@ -0,0 +1,66 @@
#ifndef ZMQRECEIVER_H
#define ZMQRECEIVER_H
#include <string>
#include <memory>
#include <tuple>
#include <zmq.hpp>
#include <vector>
#include <memory>
#include <unordered_map>
#include <boost/property_tree/json_parser.hpp>
#include <chrono>
#include "date.h"
#include "RingBuffer.hpp"
struct HeaderDataType
{
std::string type;
size_t value_shape;
std::string endianness;
size_t value_bytes_size;
HeaderDataType(const std::string& type, size_t shape=1);
};
size_t get_type_byte_size(const std::string& type);
void copy_value_to_buffer(const char* buffer, size_t offset, const boost::property_tree::ptree& json_value,
const HeaderDataType& header_data_type);
std::shared_ptr<char> get_value_from_json(const boost::property_tree::ptree& json_header,
const std::string& name, const HeaderDataType& header_data_type);
class ZmqReceiver
{
const std::string connect_address;
const int n_io_threads;
const int receive_timeout;
std::shared_ptr<zmq::socket_t> receiver = NULL;
std::shared_ptr<zmq::context_t> context = NULL;
zmq::message_t message_header;
zmq::message_t message_data;
boost::property_tree::ptree json_header;
std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> header_values_type = NULL;
std::shared_ptr<FrameMetadata> read_json_header(const std::string& header);
public:
ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout,
std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> header_values_type=NULL);
virtual ~ZmqReceiver(){};
void connect();
std::pair<std::shared_ptr<FrameMetadata>, char*> receive();
const std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> get_header_values_type() const;
};
#endif
+28
View File
@@ -0,0 +1,28 @@
#include "config.hpp"
namespace config {
// Number of receiving threads. Roughly 1 thread / (GB/s)
int zmq_n_io_threads = 1;
int zmq_receive_timeout = 100;
// JSON header buffer size - 1MB.
int zmq_buffer_size_header = 1024 * 1024 * 1;
// Data message buffer size - 10MB.
int zmq_buffer_size_data = 1024 * 1024 * 10;
// Ring buffer config.
// Allow for a couple of seconds (file creation might be slow).
size_t ring_buffer_n_slots = 1000;
// Delay before trying again to get data from the ring buffer.
uint32_t ring_buffer_read_retry_interval = 5;
std::string raw_image_dataset_name = "raw_data";
// By how much to enlarge a dataset when a resizing is needed.
hsize_t dataset_increase_step = 1000;
// To which value to initialize a dataset size.
hsize_t initial_dataset_size = 1000;
// Delay in between attempts to see if the requred parameters were passed over the REST api.
uint32_t parameters_read_retry_interval = 300;
}
+24
View File
@@ -0,0 +1,24 @@
#include <H5Cpp.h>
#include <string>
#ifndef CONFIG_H
#define CONFIG_H
namespace config
{
extern int zmq_n_io_threads;
extern int zmq_receive_timeout;
extern int zmq_buffer_size_header;
extern int zmq_buffer_size_data;
extern size_t ring_buffer_n_slots;
extern uint32_t ring_buffer_read_retry_interval;
extern hsize_t dataset_increase_step;
extern hsize_t initial_dataset_size;
extern std::string raw_image_dataset_name;
extern uint32_t parameters_read_retry_interval;
}
#endif