Reimplement ZmqReceiver

This commit is contained in:
2018-07-05 13:54:27 +02:00
parent e7ac6ac702
commit 3c8ab28154
2 changed files with 125 additions and 137 deletions
+107 -132
View File
@@ -8,8 +8,35 @@
using namespace std;
namespace pt = boost::property_tree;
HeaderDataType::HeaderDataType(const std::string& type, size_t value_shape) :
type(type), value_shape(value_shape), endianness("little") {
value_bytes_size = get_type_byte_size(type);
}
size_t get_type_byte_size(const string& type)
{
if (type == "uint8" || type== "int8") {
return 1;
} else if (type == "uint16" || type == "int16") {
return 2;
} else if (type == "uint32" || type == "int32" || type == "float32") {
return 4;
} else if (type == "uint64" || type == "int64" || type == "float64") {
return 8;
} else {
stringstream error_message;
error_message << "[ZmqReceiver::get_type_byte_size] Unsupported data type " << type << endl;
throw runtime_error(error_message.str());
}
}
ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout,
shared_ptr<unordered_map<string, string>> header_values_type) :
shared_ptr<unordered_map<string, HeaderDataType>> header_values_type) :
connect_address(connect_address), n_io_threads(n_io_threads),
receive_timeout(receive_timeout), receiver(NULL), header_values_type(header_values_type)
@@ -71,132 +98,6 @@ pair<shared_ptr<FrameMetadata>, char*> ZmqReceiver::receive()
return {frame_metadata, static_cast<char*>(message_data.data())};
}
shared_ptr<char> ZmqReceiver::get_value_from_json(const pt::ptree& json_header, const string& name, const string& type)
{
if (type == "uint8") {
return shared_ptr<char>(reinterpret_cast<char*>(new uint8_t(json_header.get<uint8_t>(name))), default_delete<char[]>());
} else if (type == "uint16") {
return shared_ptr<char>(reinterpret_cast<char*>(new uint16_t(json_header.get<uint16_t>(name))), default_delete<char[]>());
} else if (type == "uint32") {
return shared_ptr<char>(reinterpret_cast<char*>(new uint32_t(json_header.get<uint32_t>(name))), default_delete<char[]>());
} else if (type == "uint64") {
return shared_ptr<char>(reinterpret_cast<char*>(new uint64_t(json_header.get<uint64_t>(name))), default_delete<char[]>());
} else if (type == "int8") {
return shared_ptr<char>(reinterpret_cast<char*>(new int8_t(json_header.get<int8_t>(name))), default_delete<char[]>());
} else if (type == "int16") {
return shared_ptr<char>(reinterpret_cast<char*>(new int16_t(json_header.get<int16_t>(name))), default_delete<char[]>());
} else if (type == "int32") {
return shared_ptr<char>(reinterpret_cast<char*>(new int32_t(json_header.get<int32_t>(name))), default_delete<char[]>());
} else if (type == "int64") {
return shared_ptr<char>(reinterpret_cast<char*>(new int64_t(json_header.get<int64_t>(name))), default_delete<char[]>());
} else if (type == "float32") {
return shared_ptr<char>(reinterpret_cast<char*>(new float(json_header.get<float>(name))), default_delete<char[]>());
} else if (type == "float64") {
return shared_ptr<char>(reinterpret_cast<char*>(new double(json_header.get<double>(name))), default_delete<char[]>());
// TODO: This is so ugly I cannot even talk about it. Remove after production panic is over.
} else if (type == "JF4.5M_header") {
// 8 bytes (int64) * 9 values
char* buffer = new char[72];
size_t index = 0;
for (const auto& item : json_header.get_child(name)) {
auto value = item.second.get_value<int64_t>();
char* value_buffer = reinterpret_cast<char*>(&value);
// 8 bytes per value.
memcpy(buffer + (index * 8), value_buffer, 8);
++index;
}
return shared_ptr<char>(buffer, default_delete<char[]>());
// TODO: This is so ugly I cannot even talk about it. Remove after production panic is over.
} else if (type == "JF2.0M_header") {
// 8 bytes (int64) * 4 values
char* buffer = new char[32];
size_t index = 0;
for (const auto& item : json_header.get_child(name)) {
auto value = item.second.get_value<int64_t>();
char* value_buffer = reinterpret_cast<char*>(&value);
// 8 bytes per value.
memcpy(buffer + (index * 8), value_buffer, 8);
++index;
}
return shared_ptr<char>(buffer, default_delete<char[]>());
// TODO: This is so ugly I cannot even talk about it. Remove after production panic is over.
} else if (type == "uJF4.5M_header") {
// 8 bytes (int64) * 9 values
char* buffer = new char[72];
size_t index = 0;
for (const auto& item : json_header.get_child(name)) {
auto value = item.second.get_value<uint64_t>();
char* value_buffer = reinterpret_cast<char*>(&value);
// 8 bytes per value.
memcpy(buffer + (index * 8), value_buffer, 8);
++index;
}
return shared_ptr<char>(buffer, default_delete<char[]>());
// TODO: This is so ugly I cannot even talk about it. Remove after production panic is over.
} else if (type == "uJF2.0M_header") {
// 8 bytes (int64) * 4 values
char* buffer = new char[32];
size_t index = 0;
for (const auto& item : json_header.get_child(name)) {
auto value = item.second.get_value<uint64_t>();
char* value_buffer = reinterpret_cast<char*>(&value);
// 8 bytes per value.
memcpy(buffer + (index * 8), value_buffer, 8);
++index;
}
return shared_ptr<char>(buffer, default_delete<char[]>());
} else {
// We cannot really convert this attribute.
stringstream error_message;
error_message << "[ZmqReceiver::get_value_from_json] Unsupported header data type " << type << endl;
throw runtime_error(error_message.str());
}
}
shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
{
try {
@@ -222,9 +123,9 @@ shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
for (const auto& value_mapping : *header_values_type) {
const auto& name = value_mapping.first;
const auto& type = value_mapping.second;
const auto& header_data_type = value_mapping.second;
auto value = get_value_from_json(json_header, name, type);
auto value = get_value_from_json(json_header, name, header_data_type);
header_data->header_values.insert(
{name, value}
@@ -240,16 +141,90 @@ shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
if (header_values_type) {
for (const auto& value_mapping : *header_values_type) {
cout << "\t" << value_mapping.first << ":" << value_mapping.second << endl;
cout << "\t" << value_mapping.first << ":" << value_mapping.second.type;
cout << "[" << value_mapping.second.value_shape << "]" << endl;
}
} else {
cout << "\tExpected header value types is a null pointer." << endl;
}
throw;
}
}
const shared_ptr<unordered_map<string, string>> ZmqReceiver::get_header_values_type() const{
void copy_value_to_buffer(char* buffer, const size_t offset, const pt::ptree& json_value, const HeaderDataType& header_data_type)
{
if (header_data_type.type == "uint8") {
auto value = json_value.get_value<uint8_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "uint16") {
auto value = json_value.get_value<uint16_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "uint32") {
auto value = json_value.get_value<uint32_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "uint64") {
auto value = json_value.get_value<uint64_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "int8") {
auto value = json_value.get_value<int8_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "int16") {
auto value = json_value.get_value<int16_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "int32") {
auto value = json_value.get_value<int32_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "int64") {
auto value = json_value.get_value<int64_t>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "float32") {
auto value = json_value.get_value<float>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else if (header_data_type.type == "float64") {
auto value = json_value.get_value<double>();
memcpy(buffer + offset, reinterpret_cast<char*>(&value), header_data_type.value_bytes_size);
} else {
// We cannot really convert this attribute.
stringstream error_message;
error_message << "[ZmqReceiver::get_value_from_json] Unsupported header data type " << header_data_type.type << endl;
throw runtime_error(error_message.str());
}
}
shared_ptr<char> ZmqReceiver::get_value_from_json(const pt::ptree& json_header, const string& name, const HeaderDataType& header_data_type) const
{
char* buffer = new char[header_data_type.value_bytes_size * header_data_type.value_shape];
if (header_data_type.value_shape == 1) {
copy_value_to_buffer(buffer, 0, json_header.get_child(name), header_data_type);
} else {
size_t index = 0;
for (const auto& item : json_header.get_child(name)) {
auto offset = index * header_data_type.value_bytes_size;
copy_value_to_buffer(buffer, offset, json_header.get_child(name), header_data_type);
++index;
}
}
return shared_ptr<char>(buffer, default_delete<char[]>());
}
const shared_ptr<unordered_map<string, HeaderDataType>> ZmqReceiver::get_header_values_type() const
{
return header_values_type;
}
+18 -5
View File
@@ -12,6 +12,19 @@
#include "RingBuffer.hpp"
struct HeaderDataType
{
std::string type;
size_t value_shape;
std::string endianness;
size_t value_bytes_size;
HeaderDataType(const std::string& type, size_t n_values);
};
size_t get_type_byte_size(const std::string& type);
void copy_value_to_buffer(const char* buffer, size_t offset, const boost::property_tree::ptree& json_value, const HeaderDataType& header_data_type);
class ZmqReceiver
{
const std::string connect_address;
@@ -23,16 +36,16 @@ class ZmqReceiver
zmq::message_t message_data;
boost::property_tree::ptree json_header;
std::shared_ptr<std::unordered_map<std::string, std::string>> header_values_type = NULL;
std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> header_values_type = NULL;
std::shared_ptr<FrameMetadata> read_json_header(const std::string& header);
std::shared_ptr<char> get_value_from_json(const boost::property_tree::ptree& json_header,
const std::string& name, const std::string& type);
const std::string& name, const HeaderDataType& header_data_type) const;
public:
ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout,
std::shared_ptr<std::unordered_map<std::string, std::string>> header_values_type=NULL);
std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> header_values_type=NULL);
virtual ~ZmqReceiver(){};
@@ -40,7 +53,7 @@ class ZmqReceiver
std::pair<std::shared_ptr<FrameMetadata>, char*> receive();
const std::shared_ptr<std::unordered_map<std::string, std::string>> get_header_values_type() const;
const std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> get_header_values_type() const;
};