diff --git a/core-writer/src/ZmqReceiver.cpp b/core-writer/src/ZmqReceiver.cpp index 69ee578..bfd7fe4 100644 --- a/core-writer/src/ZmqReceiver.cpp +++ b/core-writer/src/ZmqReceiver.cpp @@ -9,13 +9,21 @@ using namespace std; namespace pt = boost::property_tree; HeaderDataType::HeaderDataType(const std::string& type, size_t shape) : - type(type), value_shape(shape), endianness("little"), is_array(true) { - value_bytes_size = get_type_byte_size(type); + type(type), + value_shape(shape), + endianness("little"), + is_array(true) +{ + value_bytes_size = get_type_byte_size(type); } HeaderDataType::HeaderDataType(const std::string& type) : - type(type), value_shape(1), endianness("little"), is_array(false) { - value_bytes_size = get_type_byte_size(type); + type(type), + value_shape(1), + endianness("little"), + is_array(false) +{ + value_bytes_size = get_type_byte_size(type); } size_t get_type_byte_size(const string& type) @@ -36,17 +44,23 @@ size_t get_type_byte_size(const string& type) stringstream error_message; using namespace date; error_message << "[" << std::chrono::system_clock::now() << "]"; - error_message << "[ZmqReceiver::get_type_byte_size] Unsupported data type " << type << endl; + error_message << "[ZmqReceiver::get_type_byte_size]"; + error_message << " Unsupported data type " << type << endl; throw runtime_error(error_message.str()); } } -ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout, - shared_ptr> header_values_type) : - connect_address(connect_address), n_io_threads(n_io_threads), - receive_timeout(receive_timeout), receiver(NULL), header_values_type(header_values_type) - +ZmqReceiver::ZmqReceiver( + const std::string& connect_address, + const int n_io_threads, + const int receive_timeout, + shared_ptr> header_values_type) : + connect_address(connect_address), + n_io_threads(n_io_threads), + receive_timeout(receive_timeout), + receiver(NULL), + header_values_type(header_values_type) { #ifdef DEBUG_OUTPUT using namespace date; @@ -64,11 +78,14 @@ ZmqReceiver::ZmqReceiver(const std::string& connect_address, const int n_io_thre ZmqReceiver::ZmqReceiver(const ZmqReceiver& other) : - ZmqReceiver(other.connect_address, other.n_io_threads, other.receive_timeout, other.header_values_type) + ZmqReceiver( + other.connect_address, + other.n_io_threads, + other.receive_timeout, + other.header_values_type) { } - void ZmqReceiver::connect() { #ifdef DEBUG_OUTPUT @@ -91,8 +108,9 @@ pair, char*> ZmqReceiver::receive() stringstream error_message; using namespace date; error_message << "[" << std::chrono::system_clock::now() << "]"; - error_message << "[ZmqReceiver::receive] Cannot receive before connecting. "; - error_message << "Connect first." << endl; + error_message << "[ZmqReceiver::receive]"; + error_message << " Cannot receive before connecting."; + error_message << " Connect first." << endl; throw runtime_error(error_message.str()); } @@ -102,14 +120,17 @@ pair, char*> ZmqReceiver::receive() return {NULL, NULL}; } - auto header_string = string(static_cast(message_header.data()), message_header.size()); + auto header_string = string(static_cast(message_header.data()), + message_header.size()); auto frame_metadata = read_json_header(header_string); // Get the message data. if (!receiver->recv(&message_data)) { using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ZmqReceiver::receive] Error while reading from ZMQ. Frame index " << frame_metadata->frame_index << " lost."; + cout << "[ZmqReceiver::receive] "; + cout << " Error while reading from ZMQ."; + cout << " Frame index " << frame_metadata->frame_index << " lost."; cout << " Trying to continue with the next frame." << endl; return {NULL, NULL}; @@ -147,7 +168,8 @@ shared_ptr ZmqReceiver::read_json_header(const string& header) const auto& name = value_mapping.first; const auto& header_data_type = value_mapping.second; - auto value = get_value_from_json(json_header, name, header_data_type); + auto value = get_value_from_json( + json_header, name, header_data_type); header_data->header_values.insert( {name, value} @@ -160,12 +182,15 @@ shared_ptr ZmqReceiver::read_json_header(const string& header) } catch (...) { using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ZmqReceiver::read_json_header] Error while interpreting the JSON header. Header string: " << header << endl; - cout << "Expected JSON header format: " << endl; + cout << "[ZmqReceiver::read_json_header] "; + cout << " Error while interpreting the JSON header. "; + cout << " Header string: " << header << endl; + cout << "Expected JSON header format: " << endl; if (header_values_type) { for (const auto& value_mapping : *header_values_type) { - cout << "\t" << value_mapping.first << ":" << value_mapping.second.type; + cout << "\t" << value_mapping.first << ":"; + cout << value_mapping.second.type; cout << "[" << value_mapping.second.value_shape << "]" << endl; } } else { @@ -175,7 +200,11 @@ shared_ptr ZmqReceiver::read_json_header(const string& header) } } -void copy_value_to_buffer(char* buffer, const size_t offset, const pt::ptree& json_value, const HeaderDataType& header_data_type) +void copy_value_to_buffer( + char* buffer, + const size_t offset, + const pt::ptree& json_value, + const HeaderDataType& header_data_type) { if (header_data_type.type == "uint8") { auto value = json_value.get_value(); @@ -222,13 +251,18 @@ void copy_value_to_buffer(char* buffer, const size_t offset, const pt::ptree& js stringstream error_message; using namespace date; error_message << "[" << std::chrono::system_clock::now() << "]"; - error_message << "[ZmqReceiver::get_value_from_json] Unsupported header data type " << header_data_type.type << endl; + error_message << "[ZmqReceiver::get_value_from_json] "; + error_message << " Unsupported header data type "; + error_message << header_data_type.type << endl; throw runtime_error(error_message.str()); } } -shared_ptr get_value_from_json(const pt::ptree& json_header, const string& name, const HeaderDataType& header_data_type) +shared_ptr get_value_from_json( + const pt::ptree& json_header, + const string& name, + const HeaderDataType& header_data_type) { char* buffer = new char[header_data_type.value_bytes_size * header_data_type.value_shape]; @@ -243,7 +277,8 @@ shared_ptr get_value_from_json(const pt::ptree& json_header, const string& } } else { - copy_value_to_buffer(buffer, 0, json_header.get_child(name), header_data_type); + copy_value_to_buffer( + buffer, 0, json_header.get_child(name), header_data_type); } return shared_ptr(buffer, default_delete());