diff --git a/src/ZmqReceiver.cpp b/src/ZmqReceiver.cpp index 3b7488e..1aa2e8e 100644 --- a/src/ZmqReceiver.cpp +++ b/src/ZmqReceiver.cpp @@ -8,8 +8,35 @@ using namespace std; namespace pt = boost::property_tree; +HeaderDataType::HeaderDataType(const std::string& type, size_t value_shape) : + type(type), value_shape(value_shape), endianness("little") { + value_bytes_size = get_type_byte_size(type); +} + +size_t get_type_byte_size(const string& type) +{ + if (type == "uint8" || type== "int8") { + return 1; + + } else if (type == "uint16" || type == "int16") { + return 2; + + } else if (type == "uint32" || type == "int32" || type == "float32") { + return 4; + + } else if (type == "uint64" || type == "int64" || type == "float64") { + return 8; + + } else { + stringstream error_message; + error_message << "[ZmqReceiver::get_type_byte_size] Unsupported data type " << type << endl; + + throw runtime_error(error_message.str()); + } +} + ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout, - shared_ptr> header_values_type) : + shared_ptr> header_values_type) : connect_address(connect_address), n_io_threads(n_io_threads), receive_timeout(receive_timeout), receiver(NULL), header_values_type(header_values_type) @@ -71,132 +98,6 @@ pair, char*> ZmqReceiver::receive() return {frame_metadata, static_cast(message_data.data())}; } -shared_ptr ZmqReceiver::get_value_from_json(const pt::ptree& json_header, const string& name, const string& type) -{ - if (type == "uint8") { - return shared_ptr(reinterpret_cast(new uint8_t(json_header.get(name))), default_delete()); - - } else if (type == "uint16") { - return shared_ptr(reinterpret_cast(new uint16_t(json_header.get(name))), default_delete()); - - } else if (type == "uint32") { - return shared_ptr(reinterpret_cast(new uint32_t(json_header.get(name))), default_delete()); - - } else if (type == "uint64") { - return shared_ptr(reinterpret_cast(new uint64_t(json_header.get(name))), default_delete()); - - } else if (type == "int8") { - return shared_ptr(reinterpret_cast(new int8_t(json_header.get(name))), default_delete()); - - } else if (type == "int16") { - return shared_ptr(reinterpret_cast(new int16_t(json_header.get(name))), default_delete()); - - } else if (type == "int32") { - return shared_ptr(reinterpret_cast(new int32_t(json_header.get(name))), default_delete()); - - } else if (type == "int64") { - return shared_ptr(reinterpret_cast(new int64_t(json_header.get(name))), default_delete()); - - } else if (type == "float32") { - return shared_ptr(reinterpret_cast(new float(json_header.get(name))), default_delete()); - - } else if (type == "float64") { - return shared_ptr(reinterpret_cast(new double(json_header.get(name))), default_delete()); - - // TODO: This is so ugly I cannot even talk about it. Remove after production panic is over. - } else if (type == "JF4.5M_header") { - - // 8 bytes (int64) * 9 values - char* buffer = new char[72]; - - size_t index = 0; - - for (const auto& item : json_header.get_child(name)) { - - auto value = item.second.get_value(); - char* value_buffer = reinterpret_cast(&value); - - // 8 bytes per value. - memcpy(buffer + (index * 8), value_buffer, 8); - - ++index; - } - - return shared_ptr(buffer, default_delete()); - - // TODO: This is so ugly I cannot even talk about it. Remove after production panic is over. - } else if (type == "JF2.0M_header") { - - // 8 bytes (int64) * 4 values - char* buffer = new char[32]; - - size_t index = 0; - - for (const auto& item : json_header.get_child(name)) { - - auto value = item.second.get_value(); - char* value_buffer = reinterpret_cast(&value); - - // 8 bytes per value. - memcpy(buffer + (index * 8), value_buffer, 8); - - ++index; - } - - return shared_ptr(buffer, default_delete()); - - // TODO: This is so ugly I cannot even talk about it. Remove after production panic is over. - } else if (type == "uJF4.5M_header") { - - // 8 bytes (int64) * 9 values - char* buffer = new char[72]; - - size_t index = 0; - - for (const auto& item : json_header.get_child(name)) { - - auto value = item.second.get_value(); - char* value_buffer = reinterpret_cast(&value); - - // 8 bytes per value. - memcpy(buffer + (index * 8), value_buffer, 8); - - ++index; - } - - return shared_ptr(buffer, default_delete()); - - // TODO: This is so ugly I cannot even talk about it. Remove after production panic is over. - } else if (type == "uJF2.0M_header") { - - // 8 bytes (int64) * 4 values - char* buffer = new char[32]; - - size_t index = 0; - - for (const auto& item : json_header.get_child(name)) { - - auto value = item.second.get_value(); - char* value_buffer = reinterpret_cast(&value); - - // 8 bytes per value. - memcpy(buffer + (index * 8), value_buffer, 8); - - ++index; - } - - return shared_ptr(buffer, default_delete()); - - - } else { - // We cannot really convert this attribute. - stringstream error_message; - error_message << "[ZmqReceiver::get_value_from_json] Unsupported header data type " << type << endl; - - throw runtime_error(error_message.str()); - } -} - shared_ptr ZmqReceiver::read_json_header(const string& header) { try { @@ -222,9 +123,9 @@ shared_ptr ZmqReceiver::read_json_header(const string& header) for (const auto& value_mapping : *header_values_type) { const auto& name = value_mapping.first; - const auto& type = value_mapping.second; + const auto& header_data_type = value_mapping.second; - auto value = get_value_from_json(json_header, name, type); + auto value = get_value_from_json(json_header, name, header_data_type); header_data->header_values.insert( {name, value} @@ -240,16 +141,90 @@ shared_ptr ZmqReceiver::read_json_header(const string& header) if (header_values_type) { for (const auto& value_mapping : *header_values_type) { - cout << "\t" << value_mapping.first << ":" << value_mapping.second << endl; + cout << "\t" << value_mapping.first << ":" << value_mapping.second.type; + cout << "[" << value_mapping.second.value_shape << "]" << endl; } } else { cout << "\tExpected header value types is a null pointer." << endl; } throw; } - } -const shared_ptr> ZmqReceiver::get_header_values_type() const{ +void copy_value_to_buffer(char* buffer, const size_t offset, const pt::ptree& json_value, const HeaderDataType& header_data_type) +{ + if (header_data_type.type == "uint8") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "uint16") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "uint32") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "uint64") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "int8") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "int16") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "int32") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "int64") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "float32") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else if (header_data_type.type == "float64") { + auto value = json_value.get_value(); + memcpy(buffer + offset, reinterpret_cast(&value), header_data_type.value_bytes_size); + + } else { + // We cannot really convert this attribute. + stringstream error_message; + error_message << "[ZmqReceiver::get_value_from_json] Unsupported header data type " << header_data_type.type << endl; + + throw runtime_error(error_message.str()); + } +} + +shared_ptr ZmqReceiver::get_value_from_json(const pt::ptree& json_header, const string& name, const HeaderDataType& header_data_type) const +{ + char* buffer = new char[header_data_type.value_bytes_size * header_data_type.value_shape]; + + if (header_data_type.value_shape == 1) { + copy_value_to_buffer(buffer, 0, json_header.get_child(name), header_data_type); + + } else { + size_t index = 0; + + for (const auto& item : json_header.get_child(name)) { + auto offset = index * header_data_type.value_bytes_size; + copy_value_to_buffer(buffer, offset, json_header.get_child(name), header_data_type); + + ++index; + } + + } + + return shared_ptr(buffer, default_delete()); +} + +const shared_ptr> ZmqReceiver::get_header_values_type() const +{ return header_values_type; } diff --git a/src/ZmqReceiver.hpp b/src/ZmqReceiver.hpp index e3008fa..1c71644 100644 --- a/src/ZmqReceiver.hpp +++ b/src/ZmqReceiver.hpp @@ -12,6 +12,19 @@ #include "RingBuffer.hpp" +struct HeaderDataType +{ + std::string type; + size_t value_shape; + std::string endianness; + size_t value_bytes_size; + + HeaderDataType(const std::string& type, size_t n_values); +}; + +size_t get_type_byte_size(const std::string& type); +void copy_value_to_buffer(const char* buffer, size_t offset, const boost::property_tree::ptree& json_value, const HeaderDataType& header_data_type); + class ZmqReceiver { const std::string connect_address; @@ -23,16 +36,16 @@ class ZmqReceiver zmq::message_t message_data; boost::property_tree::ptree json_header; - std::shared_ptr> header_values_type = NULL; + std::shared_ptr> header_values_type = NULL; std::shared_ptr read_json_header(const std::string& header); - + std::shared_ptr get_value_from_json(const boost::property_tree::ptree& json_header, - const std::string& name, const std::string& type); + const std::string& name, const HeaderDataType& header_data_type) const; public: ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout, - std::shared_ptr> header_values_type=NULL); + std::shared_ptr> header_values_type=NULL); virtual ~ZmqReceiver(){}; @@ -40,7 +53,7 @@ class ZmqReceiver std::pair, char*> receive(); - const std::shared_ptr> get_header_values_type() const; + const std::shared_ptr> get_header_values_type() const; };