diff --git a/src/ProcessManager.cpp b/src/ProcessManager.cpp index fe1e8df..7e94df1 100644 --- a/src/ProcessManager.cpp +++ b/src/ProcessManager.cpp @@ -29,7 +29,7 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), boost::ref(receiver), boost::ref(format)); boost::thread writer_thread(write_h5, boost::ref(manager), - boost::ref(format), boost::ref(ring_buffer), boost::ref(*receiver.get_header_values_type())); + boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type()); RestApi::start_rest_api(manager, rest_port); @@ -87,7 +87,7 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer } void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer, - const unordered_map& header_values_type) + const shared_ptr> header_values_type) { H5Writer writer(manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step); auto raw_frames_dataset_name = config::raw_image_dataset_name; @@ -133,27 +133,31 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri ring_buffer.release(received_data.first->buffer_slot_index); - // Write image metadata. - for (const auto& header_type : header_values_type) { - auto& name = header_type.first; - auto& type = header_type.second; + // Write image metadata if mapping specified. + if (header_values_type) { - auto value = received_data.first->header_values.at(name); + for (const auto& header_type : *header_values_type) { - // Header data are fixed to scalars in little endian. - vector value_shape = {1}; - auto endianness = "little"; - auto value_bytes_size = type_to_size_mapping.at(type); + auto& name = header_type.first; + auto& type = header_type.second; - writer.write_data(name, - received_data.first->frame_index, - value.get(), - value_shape, - value_bytes_size, - type, - endianness); + auto value = received_data.first->header_values.at(name); + + // Header data are fixed to scalars in little endian. + vector value_shape = {1}; + auto endianness = "little"; + auto value_bytes_size = type_to_size_mapping.at(type); + + writer.write_data(name, + received_data.first->frame_index, + value.get(), + value_shape, + value_bytes_size, + type, + endianness); + } } - + manager.written_frame(received_data.first->frame_index); } diff --git a/src/ProcessManager.hpp b/src/ProcessManager.hpp index 9ae83d9..20f1f59 100644 --- a/src/ProcessManager.hpp +++ b/src/ProcessManager.hpp @@ -13,7 +13,7 @@ namespace ProcessManager void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, ZmqReceiver& receiver, const H5Format& format); void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer, - const std::unordered_map& header_values_type); + const std::shared_ptr> header_values_type); }; #endif \ No newline at end of file diff --git a/src/ZmqReceiver.cpp b/src/ZmqReceiver.cpp index 4c902a0..8d2056a 100644 --- a/src/ZmqReceiver.cpp +++ b/src/ZmqReceiver.cpp @@ -8,8 +8,11 @@ using namespace std; namespace pt = boost::property_tree; -ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout) : - connect_address(connect_address), n_io_threads(n_io_threads), receive_timeout(receive_timeout), receiver(NULL) +ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout, + shared_ptr> header_values_type) : + connect_address(connect_address), n_io_threads(n_io_threads), + receive_timeout(receive_timeout), receiver(NULL), header_values_type(header_values_type) + { #ifdef DEBUG_OUTPUT cout << "[ZmqReceiver::ZmqReceiver] Creating ZMQ receiver with"; @@ -21,11 +24,6 @@ ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_thre message_header = zmq::message_t(config::zmq_buffer_size_header); message_data = zmq::message_t(config::zmq_buffer_size_data); - - header_values_type.reset( - new unordered_map({ - {"pulse_id", "uint64"}, - })); } void ZmqReceiver::connect() @@ -133,18 +131,20 @@ shared_ptr ZmqReceiver::read_json_header(const string& header) header_data->type = json_header.get("type"); - for (const auto& value_mapping : *header_values_type) { - - const auto& name = value_mapping.first; - const auto& type = value_mapping.second; + if (header_values_type) { + for (const auto& value_mapping : *header_values_type) { + + const auto& name = value_mapping.first; + const auto& type = value_mapping.second; - auto value = get_value_from_json(json_header, name, type); + auto value = get_value_from_json(json_header, name, type); - header_data->header_values.insert( - {name, value} - ); + header_data->header_values.insert( + {name, value} + ); + } } - + return header_data; } diff --git a/src/ZmqReceiver.hpp b/src/ZmqReceiver.hpp index 7f37a95..e3008fa 100644 --- a/src/ZmqReceiver.hpp +++ b/src/ZmqReceiver.hpp @@ -31,7 +31,8 @@ class ZmqReceiver const std::string& name, const std::string& type); public: - ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout); + ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout, + std::shared_ptr> header_values_type=NULL); virtual ~ZmqReceiver(){};