mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-02 04:32:24 +02:00
Refactor JSON parsing
This commit is contained in:
+71
-26
@@ -3,7 +3,6 @@
|
||||
#include <chrono>
|
||||
#include <unistd.h>
|
||||
#include <stdexcept>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include <boost/thread.hpp>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
@@ -84,7 +83,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
|
||||
exit(0);
|
||||
}
|
||||
|
||||
void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer,
|
||||
void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, const H5Format& format,
|
||||
const string& connect_address, int n_io_threads, int receive_timeout)
|
||||
{
|
||||
zmq::context_t context(n_io_threads);
|
||||
@@ -103,29 +102,8 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer
|
||||
continue;
|
||||
}
|
||||
|
||||
// Parse JSON header.
|
||||
auto frame_metadata = make_shared<FrameMetadata>();
|
||||
|
||||
frame_metadata->header_string = string(static_cast<char*>(message_header.data()), message_header.size());
|
||||
|
||||
stringstream header_stream;
|
||||
header_stream << frame_metadata->header_string << endl;
|
||||
|
||||
pt::read_json(header_stream, json_header);
|
||||
|
||||
// Extract data from message header.
|
||||
frame_metadata->frame_index = json_header.get<uint64_t>("frame");
|
||||
|
||||
uint8_t index = 0;
|
||||
for (const auto& item : json_header.get_child("shape")) {
|
||||
frame_metadata->frame_shape[index] = item.second.get_value<size_t>();
|
||||
++index;
|
||||
}
|
||||
|
||||
// Array 1.0 specified little endian as the default encoding.
|
||||
frame_metadata->endianness = json_header.get("endianness", "little");
|
||||
|
||||
frame_metadata->type = json_header.get<string>("type");
|
||||
auto header_string = string(static_cast<char*>(message_header.data()), message_header.size());
|
||||
auto frame_metadata = read_json_header(json_header, header_string, format.get_header_value_type());
|
||||
|
||||
// Get the message data.
|
||||
if (!receiver.recv(&message_data)) {
|
||||
@@ -160,6 +138,73 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer
|
||||
#endif
|
||||
}
|
||||
|
||||
boost::any get_value_from_json(const pt::ptree& json_header, const string& value_name, const HEADER_DATA_TYPE data_type)
|
||||
{
|
||||
switch(data_type) {
|
||||
case UINT8 :
|
||||
return json_header.get<uint8_t>(value_name);
|
||||
case UINT16 :
|
||||
return json_header.get<uint16_t>(value_name);
|
||||
case UINT32 :
|
||||
return json_header.get<uint32_t>(value_name);
|
||||
case UINT64 :
|
||||
return json_header.get<uint64_t>(value_name);
|
||||
case INT8 :
|
||||
return json_header.get<int8_t>(value_name);
|
||||
case INT16 :
|
||||
return json_header.get<int16_t>(value_name);
|
||||
case INT32 :
|
||||
return json_header.get<int32_t>(value_name);
|
||||
case INT64 :
|
||||
return json_header.get<int64_t>(value_name);
|
||||
case FLOAT32 :
|
||||
return json_header.get<float>(value_name);
|
||||
case FLOAT64 :
|
||||
return json_header.get<double>(value_name);
|
||||
default:
|
||||
stringstream error_message;
|
||||
error_message << "[ProcessManager::get_value_from_json] Unknown value type for header value " << value_name << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
}
|
||||
|
||||
shared_ptr<FrameMetadata> ProcessManager::read_json_header(pt::ptree& json_header,
|
||||
const string& header, const map<string, HEADER_DATA_TYPE>& header_data_type)
|
||||
{
|
||||
stringstream header_stream;
|
||||
header_stream << header << endl;
|
||||
pt::read_json(header_stream, json_header);
|
||||
|
||||
auto header_data = make_shared<FrameMetadata>();
|
||||
|
||||
header_data->frame_index = json_header.get<uint64_t>("frame");
|
||||
|
||||
uint8_t index = 0;
|
||||
for (const auto& item : json_header.get_child("shape")) {
|
||||
header_data->frame_shape[index] = item.second.get_value<size_t>();
|
||||
++index;
|
||||
}
|
||||
|
||||
// Array 1.0 specified little endian as the default encoding.
|
||||
header_data->endianness = json_header.get("endianness", "little");
|
||||
|
||||
header_data->type = json_header.get<string>("type");
|
||||
|
||||
for (const auto& value_mapping : header_data_type) {
|
||||
|
||||
const auto& name = value_mapping.first;
|
||||
const auto& data_type = value_mapping.second;
|
||||
const boost::any& value = get_value_from_json(json_header, name, data_type);
|
||||
|
||||
header_data->header_values.insert(
|
||||
{name, value}
|
||||
);
|
||||
}
|
||||
|
||||
return header_data;
|
||||
}
|
||||
|
||||
void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
|
||||
const string& connect_address, uint16_t rest_port)
|
||||
{
|
||||
@@ -179,7 +224,7 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
|
||||
cout << endl;
|
||||
#endif
|
||||
|
||||
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), connect_address, n_io_threads, receive_timeout);
|
||||
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), boost::ref(format), connect_address, n_io_threads, receive_timeout);
|
||||
boost::thread writer_thread(write_h5, boost::ref(manager), boost::ref(format), boost::ref(ring_buffer));
|
||||
|
||||
RestApi::start_rest_api(manager, rest_port);
|
||||
|
||||
Reference in New Issue
Block a user