diff --git a/src/ProcessManager.cpp b/src/ProcessManager.cpp index 3815087..a6fc421 100644 --- a/src/ProcessManager.cpp +++ b/src/ProcessManager.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -84,7 +83,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri exit(0); } -void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, +void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const H5Format& format, const string& connect_address, int n_io_threads, int receive_timeout) { zmq::context_t context(n_io_threads); @@ -103,29 +102,8 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer continue; } - // Parse JSON header. - auto frame_metadata = make_shared(); - - frame_metadata->header_string = string(static_cast(message_header.data()), message_header.size()); - - stringstream header_stream; - header_stream << frame_metadata->header_string << endl; - - pt::read_json(header_stream, json_header); - - // Extract data from message header. - frame_metadata->frame_index = json_header.get("frame"); - - uint8_t index = 0; - for (const auto& item : json_header.get_child("shape")) { - frame_metadata->frame_shape[index] = item.second.get_value(); - ++index; - } - - // Array 1.0 specified little endian as the default encoding. - frame_metadata->endianness = json_header.get("endianness", "little"); - - frame_metadata->type = json_header.get("type"); + auto header_string = string(static_cast(message_header.data()), message_header.size()); + auto frame_metadata = read_json_header(json_header, header_string, format.get_header_value_type()); // Get the message data. if (!receiver.recv(&message_data)) { @@ -160,6 +138,73 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer #endif } +boost::any get_value_from_json(const pt::ptree& json_header, const string& value_name, const HEADER_DATA_TYPE data_type) +{ + switch(data_type) { + case UINT8 : + return json_header.get(value_name); + case UINT16 : + return json_header.get(value_name); + case UINT32 : + return json_header.get(value_name); + case UINT64 : + return json_header.get(value_name); + case INT8 : + return json_header.get(value_name); + case INT16 : + return json_header.get(value_name); + case INT32 : + return json_header.get(value_name); + case INT64 : + return json_header.get(value_name); + case FLOAT32 : + return json_header.get(value_name); + case FLOAT64 : + return json_header.get(value_name); + default: + stringstream error_message; + error_message << "[ProcessManager::get_value_from_json] Unknown value type for header value " << value_name << endl; + + throw runtime_error(error_message.str()); + } +} + +shared_ptr ProcessManager::read_json_header(pt::ptree& json_header, + const string& header, const map& header_data_type) +{ + stringstream header_stream; + header_stream << header << endl; + pt::read_json(header_stream, json_header); + + auto header_data = make_shared(); + + header_data->frame_index = json_header.get("frame"); + + uint8_t index = 0; + for (const auto& item : json_header.get_child("shape")) { + header_data->frame_shape[index] = item.second.get_value(); + ++index; + } + + // Array 1.0 specified little endian as the default encoding. + header_data->endianness = json_header.get("endianness", "little"); + + header_data->type = json_header.get("type"); + + for (const auto& value_mapping : header_data_type) { + + const auto& name = value_mapping.first; + const auto& data_type = value_mapping.second; + const boost::any& value = get_value_from_json(json_header, name, data_type); + + header_data->header_values.insert( + {name, value} + ); + } + + return header_data; +} + void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, const string& connect_address, uint16_t rest_port) { @@ -179,7 +224,7 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, cout << endl; #endif - boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), connect_address, n_io_threads, receive_timeout); + boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), boost::ref(format), connect_address, n_io_threads, receive_timeout); boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(format), boost::ref(ring_buffer)); RestApi::start_rest_api(manager, rest_port); diff --git a/src/ProcessManager.hpp b/src/ProcessManager.hpp index 108c651..1d4d678 100644 --- a/src/ProcessManager.hpp +++ b/src/ProcessManager.hpp @@ -1,4 +1,6 @@ #include +#include +#include #include "WriterManager.hpp" #include "H5Format.hpp" @@ -7,7 +9,9 @@ namespace ProcessManager { void run_writer(WriterManager& manager, const H5Format& format, const std::string& connect_address, uint16_t rest_port); - void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const std::string& connect_address, - int n_io_threads=1, int receive_timeout=-1); + void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const H5Format& format, + const std::string& connect_address, int n_io_threads=1, int receive_timeout=-1); void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer); + std::shared_ptr read_json_header(boost::property_tree::ptree& json_header, const std::string& header, + const std::map& header_data_type); }; \ No newline at end of file