mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-06-07 16:48:41 +02:00
Additional header data not mandatory
This commit is contained in:
+23
-19
@@ -29,7 +29,7 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
|
||||
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer),
|
||||
boost::ref(receiver), boost::ref(format));
|
||||
boost::thread writer_thread(write_h5, boost::ref(manager),
|
||||
boost::ref(format), boost::ref(ring_buffer), boost::ref(*receiver.get_header_values_type()));
|
||||
boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type());
|
||||
|
||||
RestApi::start_rest_api(manager, rest_port);
|
||||
|
||||
@@ -87,7 +87,7 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer
|
||||
}
|
||||
|
||||
void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer,
|
||||
const unordered_map<string, string>& header_values_type)
|
||||
const shared_ptr<unordered_map<string, string>> header_values_type)
|
||||
{
|
||||
H5Writer writer(manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step);
|
||||
auto raw_frames_dataset_name = config::raw_image_dataset_name;
|
||||
@@ -133,27 +133,31 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
|
||||
|
||||
ring_buffer.release(received_data.first->buffer_slot_index);
|
||||
|
||||
// Write image metadata.
|
||||
for (const auto& header_type : header_values_type) {
|
||||
auto& name = header_type.first;
|
||||
auto& type = header_type.second;
|
||||
// Write image metadata if mapping specified.
|
||||
if (header_values_type) {
|
||||
|
||||
auto value = received_data.first->header_values.at(name);
|
||||
for (const auto& header_type : *header_values_type) {
|
||||
|
||||
// Header data are fixed to scalars in little endian.
|
||||
vector<size_t> value_shape = {1};
|
||||
auto endianness = "little";
|
||||
auto value_bytes_size = type_to_size_mapping.at(type);
|
||||
auto& name = header_type.first;
|
||||
auto& type = header_type.second;
|
||||
|
||||
writer.write_data(name,
|
||||
received_data.first->frame_index,
|
||||
value.get(),
|
||||
value_shape,
|
||||
value_bytes_size,
|
||||
type,
|
||||
endianness);
|
||||
auto value = received_data.first->header_values.at(name);
|
||||
|
||||
// Header data are fixed to scalars in little endian.
|
||||
vector<size_t> value_shape = {1};
|
||||
auto endianness = "little";
|
||||
auto value_bytes_size = type_to_size_mapping.at(type);
|
||||
|
||||
writer.write_data(name,
|
||||
received_data.first->frame_index,
|
||||
value.get(),
|
||||
value_shape,
|
||||
value_bytes_size,
|
||||
type,
|
||||
endianness);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
manager.written_frame(received_data.first->frame_index);
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ namespace ProcessManager
|
||||
void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, ZmqReceiver& receiver, const H5Format& format);
|
||||
|
||||
void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer,
|
||||
const std::unordered_map<std::string, std::string>& header_values_type);
|
||||
const std::shared_ptr<std::unordered_map<std::string, std::string>> header_values_type);
|
||||
};
|
||||
|
||||
#endif
|
||||
+16
-16
@@ -8,8 +8,11 @@
|
||||
using namespace std;
|
||||
namespace pt = boost::property_tree;
|
||||
|
||||
ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout) :
|
||||
connect_address(connect_address), n_io_threads(n_io_threads), receive_timeout(receive_timeout), receiver(NULL)
|
||||
ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout,
|
||||
shared_ptr<unordered_map<string, string>> header_values_type) :
|
||||
connect_address(connect_address), n_io_threads(n_io_threads),
|
||||
receive_timeout(receive_timeout), receiver(NULL), header_values_type(header_values_type)
|
||||
|
||||
{
|
||||
#ifdef DEBUG_OUTPUT
|
||||
cout << "[ZmqReceiver::ZmqReceiver] Creating ZMQ receiver with";
|
||||
@@ -21,11 +24,6 @@ ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_thre
|
||||
|
||||
message_header = zmq::message_t(config::zmq_buffer_size_header);
|
||||
message_data = zmq::message_t(config::zmq_buffer_size_data);
|
||||
|
||||
header_values_type.reset(
|
||||
new unordered_map<string, string>({
|
||||
{"pulse_id", "uint64"},
|
||||
}));
|
||||
}
|
||||
|
||||
void ZmqReceiver::connect()
|
||||
@@ -133,18 +131,20 @@ shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
|
||||
|
||||
header_data->type = json_header.get<string>("type");
|
||||
|
||||
for (const auto& value_mapping : *header_values_type) {
|
||||
|
||||
const auto& name = value_mapping.first;
|
||||
const auto& type = value_mapping.second;
|
||||
if (header_values_type) {
|
||||
for (const auto& value_mapping : *header_values_type) {
|
||||
|
||||
const auto& name = value_mapping.first;
|
||||
const auto& type = value_mapping.second;
|
||||
|
||||
auto value = get_value_from_json(json_header, name, type);
|
||||
auto value = get_value_from_json(json_header, name, type);
|
||||
|
||||
header_data->header_values.insert(
|
||||
{name, value}
|
||||
);
|
||||
header_data->header_values.insert(
|
||||
{name, value}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return header_data;
|
||||
}
|
||||
|
||||
|
||||
+2
-1
@@ -31,7 +31,8 @@ class ZmqReceiver
|
||||
const std::string& name, const std::string& type);
|
||||
|
||||
public:
|
||||
ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout);
|
||||
ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout,
|
||||
std::shared_ptr<std::unordered_map<std::string, std::string>> header_values_type=NULL);
|
||||
|
||||
virtual ~ZmqReceiver(){};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user