mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-06-07 20:28:42 +02:00
Create src folder
This commit is contained in:
@@ -0,0 +1,224 @@
|
||||
#include "H5ChunkedWriter.hpp"
|
||||
|
||||
extern "C"
|
||||
{
|
||||
#include "H5DOpublic.h"
|
||||
}
|
||||
|
||||
hsize_t expand_dataset(const H5::DataSet& dataset, hsize_t frame_index, hsize_t dataset_increase_step)
|
||||
{
|
||||
hsize_t dataset_rank = 3;
|
||||
hsize_t dataset_dimension[dataset_rank];
|
||||
|
||||
dataset.getSpace().getSimpleExtentDims(dataset_dimension);
|
||||
dataset_dimension[0] = frame_index + dataset_increase_step;
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Expanding dataspace to size (";
|
||||
for (hsize_t i=0; i<dataset_rank; ++i) {
|
||||
std::cout << dataset_dimension[i] << ",";
|
||||
}
|
||||
std::cout << ")" << std::endl;
|
||||
#endif
|
||||
|
||||
dataset.extend(dataset_dimension);
|
||||
|
||||
return dataset_dimension[0];
|
||||
}
|
||||
|
||||
void compact_dataset(const H5::DataSet& dataset, hsize_t max_frame_index)
|
||||
{
|
||||
hsize_t dataset_rank = 3;
|
||||
hsize_t dataset_dimension[dataset_rank];
|
||||
|
||||
dataset.getSpace().getSimpleExtentDims(dataset_dimension);
|
||||
dataset_dimension[0] = max_frame_index + 1;
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Compacting dataspace to size (";
|
||||
for (hsize_t i=0; i<dataset_rank; ++i) {
|
||||
std::cout << dataset_dimension[i] << ",";
|
||||
}
|
||||
std::cout << ")" << std::endl;
|
||||
#endif
|
||||
|
||||
dataset.extend(dataset_dimension);
|
||||
}
|
||||
|
||||
HDF5ChunkedWriter::HDF5ChunkedWriter(const std::string filename, const std::string dataset_name, hsize_t frames_per_file, hsize_t initial_dataset_size)
|
||||
{
|
||||
this->filename = filename;
|
||||
this->dataset_name = dataset_name;
|
||||
this->frames_per_file = frames_per_file;
|
||||
this->initial_dataset_size = initial_dataset_size;
|
||||
}
|
||||
|
||||
HDF5ChunkedWriter::~HDF5ChunkedWriter()
|
||||
{
|
||||
close_file();
|
||||
}
|
||||
|
||||
void HDF5ChunkedWriter::close_file()
|
||||
{
|
||||
if (file.getId() == -1) {
|
||||
#ifdef DEBUG
|
||||
std::cout << "Trying to close an already closed file." << std::endl;
|
||||
#endif
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Closing file." << std::endl;
|
||||
#endif
|
||||
|
||||
compact_dataset(dataset, max_frame_index);
|
||||
|
||||
hsize_t min_frame_in_dataset = 0;
|
||||
if (frames_per_file) {
|
||||
min_frame_in_dataset = (current_frame_chunk - 1) * frames_per_file;
|
||||
}
|
||||
|
||||
// max_frame_index is relative to the current file.
|
||||
hsize_t max_frame_in_dataset = max_frame_index + min_frame_in_dataset;
|
||||
|
||||
// Frame indexing starts at 1 (for some reason).
|
||||
auto image_nr_low = min_frame_in_dataset + 1;
|
||||
auto image_nr_high = max_frame_in_dataset + 1;
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Setting dataset attribute image_nr_low=" << image_nr_low << " and image_nr_high=" << image_nr_high << std::endl;
|
||||
#endif
|
||||
|
||||
// H5::IntType int_type(H5::PredType::NATIVE_UINT32);
|
||||
// H5::DataSpace att_space(H5S_SCALAR);
|
||||
// auto low_index_attribute = dataset.createAttribute("image_nr_low", int_type, att_space);
|
||||
|
||||
// H5::IntType int_type(H5::PredType::NATIVE_UINT32);
|
||||
// H5::DataSpace att_space(H5S_SCALAR);
|
||||
// auto high_index_attribute = dataset.createAttribute("image_nr_high", int_type, att_space);
|
||||
|
||||
// TODO: Populate additional h5 attributes.
|
||||
|
||||
// Cleanup.
|
||||
file.close();
|
||||
current_frame_chunk = 0;
|
||||
current_dataset_size = 0;
|
||||
max_frame_index = 0;
|
||||
}
|
||||
|
||||
void HDF5ChunkedWriter::write_data(size_t frame_index, size_t* frame_shape, size_t data_bytes_size, char* data)
|
||||
{
|
||||
// Define the ofset of the currently received image in the file.
|
||||
hsize_t relative_frame_index = prepare_storage_for_frame(frame_index, frame_shape);
|
||||
|
||||
// Define where to write values in the dataset.
|
||||
const hsize_t offset[] = {relative_frame_index, 0, 0};
|
||||
uint32_t filters = 0;
|
||||
|
||||
if( H5DOwrite_chunk(dataset.getId(), H5P_DEFAULT, filters, offset, data_bytes_size, data) )
|
||||
{
|
||||
std::stringstream error_message;
|
||||
error_message << "Error while writing chunk to file at offset " << relative_frame_index << "." << std::endl;
|
||||
|
||||
throw std::invalid_argument( error_message.str() );
|
||||
}
|
||||
}
|
||||
|
||||
void HDF5ChunkedWriter::create_file(size_t* frame_shape, hsize_t frame_chunk) {
|
||||
|
||||
if (file.getId() != -1) {
|
||||
close_file();
|
||||
}
|
||||
|
||||
auto target_filename = filename;
|
||||
|
||||
// In case frames_per_file is > 0, the filename variable is a template for the filename.
|
||||
if (frames_per_file) {
|
||||
#ifdef DEBUG
|
||||
std::cout << "Frames per file is defined. Format " << filename << " with frame_chunk " << frame_chunk << std::endl;
|
||||
#endif
|
||||
|
||||
// Space for 10 digits should be enough.
|
||||
char buffer[filename.length() + 10];
|
||||
|
||||
sprintf(buffer, filename.c_str(), frame_chunk);
|
||||
target_filename = std::string(buffer);
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Creating filename " << target_filename << std::endl;
|
||||
#endif
|
||||
|
||||
// TODO: Create folder if it does not exist.
|
||||
|
||||
file = H5::H5File( target_filename.c_str(), H5F_ACC_TRUNC );
|
||||
|
||||
H5::IntType data_type( config::dataset_type );
|
||||
data_type.setOrder( config::dataset_byte_order );
|
||||
|
||||
hsize_t dataset_rank = 3;
|
||||
const hsize_t dataset_dimension[] = {initial_dataset_size, frame_shape[0], frame_shape[1]};
|
||||
const hsize_t max_dataset_dimension[] = {H5S_UNLIMITED, frame_shape[0], frame_shape[1]};
|
||||
H5::DataSpace dataspace(dataset_rank, dataset_dimension, max_dataset_dimension);
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Creating dataspace of size (";
|
||||
for (hsize_t i=0; i<dataset_rank; ++i) {
|
||||
std::cout << dataset_dimension[i] << ",";
|
||||
}
|
||||
std::cout << ")" << std::endl;
|
||||
#endif
|
||||
|
||||
// Set chunking to single image.
|
||||
H5::DSetCreatPropList dataset_properties;
|
||||
const hsize_t dataset_chunking[] = {1, frame_shape[0], frame_shape[1]};
|
||||
dataset_properties.setChunk(dataset_rank, dataset_chunking);
|
||||
|
||||
// Take into account initial size, set chunking.
|
||||
dataset = file.createDataSet(dataset_name.c_str(), data_type, dataspace, dataset_properties);
|
||||
|
||||
// New file created - update global values.
|
||||
current_frame_chunk = frame_chunk;
|
||||
current_dataset_size = initial_dataset_size;
|
||||
|
||||
}
|
||||
|
||||
hsize_t HDF5ChunkedWriter::prepare_storage_for_frame(size_t frame_index, size_t* frame_shape) {
|
||||
|
||||
hsize_t relative_frame_index = frame_index;
|
||||
|
||||
// Check if we have to create a new file.
|
||||
if (frames_per_file) {
|
||||
hsize_t frame_chunk = (frame_index / frames_per_file) + 1;
|
||||
|
||||
// This frames does not go into this file.
|
||||
if (frame_chunk != current_frame_chunk) {
|
||||
create_file(frame_shape, frame_chunk);
|
||||
}
|
||||
|
||||
// Make the frame_index relative to this chunk (file).
|
||||
relative_frame_index = frame_index - ((frame_chunk - 1) * frames_per_file);
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Received frame index " << frame_index << " and processed as relative frame index " << relative_frame_index << std::endl;
|
||||
#endif
|
||||
|
||||
// Open the file if needed.
|
||||
if (file.getId() == -1) {
|
||||
create_file(frame_shape);
|
||||
}
|
||||
|
||||
// Expand the dataset if needed.
|
||||
if (relative_frame_index > current_dataset_size) {
|
||||
current_dataset_size = expand_dataset(dataset, relative_frame_index, config::dataset_increase_step);
|
||||
}
|
||||
|
||||
// Keep track of the max index in this file - needed for shrinking the dataset at the end.
|
||||
if (relative_frame_index > max_frame_index) {
|
||||
max_frame_index = relative_frame_index;
|
||||
}
|
||||
|
||||
return relative_frame_index;
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
#ifndef H5CHUNKEDWRITER_H
|
||||
#define H5CHUNKEDWRITER_H
|
||||
|
||||
#include <H5Cpp.h>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <iostream>
|
||||
#include "config.hpp"
|
||||
|
||||
hsize_t expand_dataset(const H5::DataSet& dataset, hsize_t frame_index, hsize_t dataset_increase_step);
|
||||
|
||||
void compact_dataset(const H5::DataSet& dataset, hsize_t max_frame_index);
|
||||
|
||||
class HDF5ChunkedWriter
|
||||
{
|
||||
// Initialized in constructor.
|
||||
std::string filename;
|
||||
std::string dataset_name;
|
||||
hsize_t frames_per_file;
|
||||
hsize_t initial_dataset_size;
|
||||
|
||||
// Configuration parameters.
|
||||
hsize_t dataset_increase_step = config::dataset_increase_step;
|
||||
|
||||
// State variables.
|
||||
hsize_t max_frame_index = 0;
|
||||
hsize_t current_dataset_size = 0;
|
||||
hsize_t current_frame_chunk = 0;
|
||||
|
||||
H5::H5File file;
|
||||
H5::DataSet dataset;
|
||||
|
||||
hsize_t prepare_storage_for_frame(size_t frame_index, size_t* frame_shape);
|
||||
void create_file(size_t* frame_shape, hsize_t frame_chunk=0);
|
||||
|
||||
public:
|
||||
HDF5ChunkedWriter(const std::string filename, const std::string dataset_name, hsize_t frames_per_file=0, hsize_t initial_dataset_size=config::initial_dataset_size);
|
||||
~HDF5ChunkedWriter();
|
||||
void close_file();
|
||||
void write_data(size_t frame_index, size_t* frame_shape, size_t data_bytes_size, char* data);
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,176 @@
|
||||
#include "RingBuffer.hpp"
|
||||
|
||||
RingBuffer::RingBuffer(size_t n_slots) : n_slots(n_slots), ringbuffer_slots(n_slots, 0){
|
||||
#ifdef DEBUG
|
||||
std::cout << "Creating ring buffer with n_slots " << n_slots << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
RingBuffer::~RingBuffer() {
|
||||
// If the frame buffer is allocated, free it.
|
||||
if (frame_data_buffer != NULL) {
|
||||
free(frame_data_buffer);
|
||||
frame_data_buffer = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void RingBuffer::initialize(size_t slot_size)
|
||||
{
|
||||
// Check if the ring buffer is already initialized.
|
||||
if (frame_data_buffer != NULL) {
|
||||
std::stringstream error_message;
|
||||
error_message << "Ring buffer already initialized." << std::endl;
|
||||
|
||||
throw std::runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Initializing ring buffer with slot_size " << slot_size << std::endl;
|
||||
#endif
|
||||
|
||||
this->write_index = 0;
|
||||
this->slot_size = slot_size;
|
||||
this->buffer_size = slot_size * n_slots;
|
||||
this->frame_data_buffer = new char[buffer_size];
|
||||
}
|
||||
|
||||
void RingBuffer::write(FrameMetadata &metadata, char* data)
|
||||
{
|
||||
// All images must fit in the ring buffer.
|
||||
if (metadata.frame_bytes_size > slot_size) {
|
||||
std::stringstream error_message;
|
||||
error_message << "Received frame index "<< metadata.frame_index <<" that is too large for ring buffer slot." << std::endl;
|
||||
error_message << "RingBuffer slot size " << slot_size << ", but frame bytes size " << metadata.frame_bytes_size << std::endl;
|
||||
|
||||
throw std::runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
// Check and reserve slot in the buffer.
|
||||
ringbuffer_slots_mutex.lock();
|
||||
|
||||
if (!ringbuffer_slots[write_index]) {
|
||||
ringbuffer_slots[write_index] = 1;
|
||||
|
||||
// Set the write index in the FrameMetadata object.
|
||||
metadata.buffer_slot_index = write_index;
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Ring buffer slot " << metadata.buffer_slot_index << " reserved for frame_index " << metadata.frame_index << std::endl;
|
||||
#endif
|
||||
|
||||
// Increase and wrap the write index around if needed.
|
||||
write_index = (write_index + 1) % n_slots;
|
||||
|
||||
} else {
|
||||
std::stringstream error_message;
|
||||
error_message << "Ring buffer is full. Collision at write_index = " << write_index << std::endl;
|
||||
|
||||
throw std::runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
ringbuffer_slots_mutex.unlock();
|
||||
|
||||
// Write to the buffer. The slot is already reserved.
|
||||
char* slot_memory_address = get_buffer_slot_address(metadata.buffer_slot_index);
|
||||
std::memcpy(slot_memory_address, data, metadata.frame_bytes_size);
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Copied " << metadata.frame_bytes_size << " frame bytes to buffer_slot_index " << metadata.buffer_slot_index << std::endl;
|
||||
#endif
|
||||
|
||||
frame_metadata_queue_mutex.lock();
|
||||
|
||||
// Send the metadata header to writing process.
|
||||
frame_metadata_queue.push_back(metadata);
|
||||
|
||||
frame_metadata_queue_mutex.unlock();
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Metadata for frame_index " << metadata.frame_index << " added to metadata queue." << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
char* RingBuffer::get_buffer_slot_address(size_t buffer_slot_index) {
|
||||
char* slot_memory_address = frame_data_buffer + (buffer_slot_index * slot_size);
|
||||
|
||||
// Check if the memory address is valid.
|
||||
if (slot_memory_address > frame_data_buffer + buffer_size) {
|
||||
std::stringstream error_message;
|
||||
error_message << "Calculated ring buffer address is out of bound for buffer_slot_index " << buffer_slot_index << std::endl;
|
||||
|
||||
throw std::runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "For buffer_slot_index " << buffer_slot_index << " the calculated memory address is " << long(slot_memory_address) << std::endl;
|
||||
#endif
|
||||
|
||||
return slot_memory_address;
|
||||
}
|
||||
|
||||
std::pair<FrameMetadata, char*> RingBuffer::read()
|
||||
{
|
||||
FrameMetadata frame_metadata;
|
||||
|
||||
while (1) {
|
||||
|
||||
frame_metadata_queue_mutex.lock();
|
||||
|
||||
if (frame_metadata_queue.empty()) {
|
||||
frame_metadata_queue_mutex.unlock();
|
||||
continue;
|
||||
|
||||
} else {
|
||||
frame_metadata = frame_metadata_queue.front();
|
||||
frame_metadata_queue.pop_front();
|
||||
|
||||
frame_metadata_queue_mutex.unlock();
|
||||
|
||||
#ifdef DEBUG
|
||||
std::cout << "Received metadata for frame_index " << frame_metadata.frame_index << std::endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
// Check if the references ring buffer slot is valid.
|
||||
ringbuffer_slots_mutex.lock();
|
||||
|
||||
if (!ringbuffer_slots[frame_metadata.buffer_slot_index]) {
|
||||
std::stringstream error_message;
|
||||
error_message << "Ring buffer slot referenced in message header " << frame_metadata.buffer_slot_index << " is empty." << std::endl;
|
||||
|
||||
throw std::runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
ringbuffer_slots_mutex.unlock();
|
||||
|
||||
// Memory address of frame in buffer.
|
||||
char* slot_memory_address = get_buffer_slot_address(frame_metadata.buffer_slot_index);
|
||||
|
||||
return {frame_metadata, slot_memory_address};
|
||||
};
|
||||
}
|
||||
|
||||
void RingBuffer::release(size_t buffer_slot_index) {
|
||||
// Cannot release a slot index that is out of range.
|
||||
if (buffer_slot_index >= n_slots) {
|
||||
std::stringstream error_message;
|
||||
error_message << "Slot index to release " << buffer_slot_index << " is out of range. Ring buffer n_slots = " << n_slots << std::endl;
|
||||
|
||||
throw std::runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
// Release the buffer slot.
|
||||
ringbuffer_slots_mutex.lock();
|
||||
|
||||
if (ringbuffer_slots[buffer_slot_index]) {
|
||||
ringbuffer_slots[buffer_slot_index] = 0;
|
||||
|
||||
} else {
|
||||
std::stringstream error_message;
|
||||
error_message << "Cannot release empty ring buffer slot " << buffer_slot_index << std::endl;
|
||||
|
||||
throw std::runtime_error(error_message.str());
|
||||
}
|
||||
|
||||
ringbuffer_slots_mutex.unlock();
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
#ifndef RINGBUFFER_H
|
||||
#define RINGBUFFER_H
|
||||
|
||||
#include <cstddef>
|
||||
#include <list>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include <map>
|
||||
#include <stdexcept>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
|
||||
struct FrameMetadata
|
||||
{
|
||||
size_t buffer_slot_index = 0;
|
||||
|
||||
size_t frame_bytes_size = 0;
|
||||
size_t frame_index = 0;
|
||||
size_t frame_shape[2];
|
||||
|
||||
std::string frame_header;
|
||||
};
|
||||
|
||||
class RingBuffer
|
||||
{
|
||||
// Initialized in constructor.
|
||||
size_t n_slots;
|
||||
std::vector<bool> ringbuffer_slots;
|
||||
|
||||
// Set in initialize().
|
||||
size_t slot_size = 0;
|
||||
size_t buffer_size = 0;
|
||||
char* frame_data_buffer = NULL;
|
||||
size_t write_index = 0;
|
||||
|
||||
std::list<FrameMetadata> frame_metadata_queue;
|
||||
std::mutex frame_metadata_queue_mutex;
|
||||
std::mutex ringbuffer_slots_mutex;
|
||||
|
||||
char* get_buffer_slot_address(size_t buffer_slot_index);
|
||||
|
||||
public:
|
||||
RingBuffer(size_t n_slots);
|
||||
~RingBuffer();
|
||||
void initialize(size_t slot_size);
|
||||
void write(FrameMetadata &metadata, char* data);
|
||||
std::pair<FrameMetadata, char*> read();
|
||||
void release(size_t buffer_slot_index);
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,8 @@
|
||||
#include "config.hpp"
|
||||
|
||||
namespace config {
|
||||
H5::PredType dataset_type = H5::PredType::NATIVE_UINT8;
|
||||
H5T_order_t dataset_byte_order = H5T_ORDER_LE;
|
||||
hsize_t dataset_increase_step = 1000;
|
||||
hsize_t initial_dataset_size = 1000;
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
#include <H5Cpp.h>
|
||||
|
||||
#ifndef CONFIG
|
||||
#define CONFIG
|
||||
|
||||
namespace config
|
||||
{
|
||||
extern H5::PredType dataset_type;
|
||||
extern H5T_order_t dataset_byte_order;
|
||||
extern hsize_t dataset_increase_step;
|
||||
extern hsize_t initial_dataset_size;
|
||||
}
|
||||
|
||||
#endif
|
||||
Reference in New Issue
Block a user