mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-03 05:34:12 +02:00
Improve style of ZMQ receiver
This commit is contained in:
@@ -9,13 +9,21 @@ using namespace std;
|
||||
namespace pt = boost::property_tree;
|
||||
|
||||
HeaderDataType::HeaderDataType(const std::string& type, size_t shape) :
|
||||
type(type), value_shape(shape), endianness("little"), is_array(true) {
|
||||
value_bytes_size = get_type_byte_size(type);
|
||||
type(type),
|
||||
value_shape(shape),
|
||||
endianness("little"),
|
||||
is_array(true)
|
||||
{
|
||||
value_bytes_size = get_type_byte_size(type);
|
||||
}
|
||||
|
||||
HeaderDataType::HeaderDataType(const std::string& type) :
|
||||
type(type), value_shape(1), endianness("little"), is_array(false) {
|
||||
value_bytes_size = get_type_byte_size(type);
|
||||
type(type),
|
||||
value_shape(1),
|
||||
endianness("little"),
|
||||
is_array(false)
|
||||
{
|
||||
value_bytes_size = get_type_byte_size(type);
|
||||
}
|
||||
|
||||
size_t get_type_byte_size(const string& type)
|
||||
@@ -36,17 +44,23 @@ size_t get_type_byte_size(const string& type)
|
||||
stringstream error_message;
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[ZmqReceiver::get_type_byte_size] Unsupported data type " << type << endl;
|
||||
error_message << "[ZmqReceiver::get_type_byte_size]";
|
||||
error_message << " Unsupported data type " << type << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
}
|
||||
|
||||
ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout,
|
||||
shared_ptr<unordered_map<string, HeaderDataType>> header_values_type) :
|
||||
connect_address(connect_address), n_io_threads(n_io_threads),
|
||||
receive_timeout(receive_timeout), receiver(NULL), header_values_type(header_values_type)
|
||||
|
||||
ZmqReceiver::ZmqReceiver(
|
||||
const std::string& connect_address,
|
||||
const int n_io_threads,
|
||||
const int receive_timeout,
|
||||
shared_ptr<unordered_map<string, HeaderDataType>> header_values_type) :
|
||||
connect_address(connect_address),
|
||||
n_io_threads(n_io_threads),
|
||||
receive_timeout(receive_timeout),
|
||||
receiver(NULL),
|
||||
header_values_type(header_values_type)
|
||||
{
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
@@ -64,11 +78,14 @@ ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_thre
|
||||
|
||||
|
||||
ZmqReceiver::ZmqReceiver(const ZmqReceiver& other) :
|
||||
ZmqReceiver(other.connect_address, other.n_io_threads, other.receive_timeout, other.header_values_type)
|
||||
ZmqReceiver(
|
||||
other.connect_address,
|
||||
other.n_io_threads,
|
||||
other.receive_timeout,
|
||||
other.header_values_type)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
void ZmqReceiver::connect()
|
||||
{
|
||||
#ifdef DEBUG_OUTPUT
|
||||
@@ -91,8 +108,9 @@ pair<shared_ptr<FrameMetadata>, char*> ZmqReceiver::receive()
|
||||
stringstream error_message;
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[ZmqReceiver::receive] Cannot receive before connecting. ";
|
||||
error_message << "Connect first." << endl;
|
||||
error_message << "[ZmqReceiver::receive]";
|
||||
error_message << " Cannot receive before connecting.";
|
||||
error_message << " Connect first." << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
@@ -102,14 +120,17 @@ pair<shared_ptr<FrameMetadata>, char*> ZmqReceiver::receive()
|
||||
return {NULL, NULL};
|
||||
}
|
||||
|
||||
auto header_string = string(static_cast<char*>(message_header.data()), message_header.size());
|
||||
auto header_string = string(static_cast<char*>(message_header.data()),
|
||||
message_header.size());
|
||||
auto frame_metadata = read_json_header(header_string);
|
||||
|
||||
// Get the message data.
|
||||
if (!receiver->recv(&message_data)) {
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[ZmqReceiver::receive] Error while reading from ZMQ. Frame index " << frame_metadata->frame_index << " lost.";
|
||||
cout << "[ZmqReceiver::receive] ";
|
||||
cout << " Error while reading from ZMQ.";
|
||||
cout << " Frame index " << frame_metadata->frame_index << " lost.";
|
||||
cout << " Trying to continue with the next frame." << endl;
|
||||
|
||||
return {NULL, NULL};
|
||||
@@ -147,7 +168,8 @@ shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
|
||||
const auto& name = value_mapping.first;
|
||||
const auto& header_data_type = value_mapping.second;
|
||||
|
||||
auto value = get_value_from_json(json_header, name, header_data_type);
|
||||
auto value = get_value_from_json(
|
||||
json_header, name, header_data_type);
|
||||
|
||||
header_data->header_values.insert(
|
||||
{name, value}
|
||||
@@ -160,12 +182,15 @@ shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
|
||||
} catch (...) {
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[ZmqReceiver::read_json_header] Error while interpreting the JSON header. Header string: " << header << endl;
|
||||
cout << "Expected JSON header format: " << endl;
|
||||
cout << "[ZmqReceiver::read_json_header] ";
|
||||
cout << " Error while interpreting the JSON header. ";
|
||||
cout << " Header string: " << header << endl;
|
||||
|
||||
cout << "Expected JSON header format: " << endl;
|
||||
if (header_values_type) {
|
||||
for (const auto& value_mapping : *header_values_type) {
|
||||
cout << "\t" << value_mapping.first << ":" << value_mapping.second.type;
|
||||
cout << "\t" << value_mapping.first << ":";
|
||||
cout << value_mapping.second.type;
|
||||
cout << "[" << value_mapping.second.value_shape << "]" << endl;
|
||||
}
|
||||
} else {
|
||||
@@ -175,7 +200,11 @@ shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
|
||||
}
|
||||
}
|
||||
|
||||
void copy_value_to_buffer(char* buffer, const size_t offset, const pt::ptree& json_value, const HeaderDataType& header_data_type)
|
||||
void copy_value_to_buffer(
|
||||
char* buffer,
|
||||
const size_t offset,
|
||||
const pt::ptree& json_value,
|
||||
const HeaderDataType& header_data_type)
|
||||
{
|
||||
if (header_data_type.type == "uint8") {
|
||||
auto value = json_value.get_value<uint8_t>();
|
||||
@@ -222,13 +251,18 @@ void copy_value_to_buffer(char* buffer, const size_t offset, const pt::ptree& js
|
||||
stringstream error_message;
|
||||
using namespace date;
|
||||
error_message << "[" << std::chrono::system_clock::now() << "]";
|
||||
error_message << "[ZmqReceiver::get_value_from_json] Unsupported header data type " << header_data_type.type << endl;
|
||||
error_message << "[ZmqReceiver::get_value_from_json] ";
|
||||
error_message << " Unsupported header data type ";
|
||||
error_message << header_data_type.type << endl;
|
||||
|
||||
throw runtime_error(error_message.str());
|
||||
}
|
||||
}
|
||||
|
||||
shared_ptr<char> get_value_from_json(const pt::ptree& json_header, const string& name, const HeaderDataType& header_data_type)
|
||||
shared_ptr<char> get_value_from_json(
|
||||
const pt::ptree& json_header,
|
||||
const string& name,
|
||||
const HeaderDataType& header_data_type)
|
||||
{
|
||||
char* buffer = new char[header_data_type.value_bytes_size * header_data_type.value_shape];
|
||||
|
||||
@@ -243,7 +277,8 @@ shared_ptr<char> get_value_from_json(const pt::ptree& json_header, const string&
|
||||
}
|
||||
|
||||
} else {
|
||||
copy_value_to_buffer(buffer, 0, json_header.get_child(name), header_data_type);
|
||||
copy_value_to_buffer(
|
||||
buffer, 0, json_header.get_child(name), header_data_type);
|
||||
}
|
||||
|
||||
return shared_ptr<char>(buffer, default_delete<char[]>());
|
||||
|
||||
Reference in New Issue
Block a user