From 3480b9847145b1466db730adaa773f7619b75397 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 31 Mar 2020 18:08:42 +0200 Subject: [PATCH] Improve ZmqReceiver code --- core-writer/include/ZmqReceiver.hpp | 18 +++-- core-writer/src/receiver/ZmqReceiver.cpp | 86 +++++++++++------------- 2 files changed, 46 insertions(+), 58 deletions(-) diff --git a/core-writer/include/ZmqReceiver.hpp b/core-writer/include/ZmqReceiver.hpp index ce863ba..0d66d0d 100644 --- a/core-writer/include/ZmqReceiver.hpp +++ b/core-writer/include/ZmqReceiver.hpp @@ -40,6 +40,8 @@ std::shared_ptr get_value_from_json( class ZmqReceiver { + typedef std::unordered_map header_map; + const std::string connect_address; const int n_io_threads; const int receive_timeout; @@ -49,24 +51,20 @@ class ZmqReceiver zmq::message_t message_data; boost::property_tree::ptree json_header; - std::shared_ptr> header_values_type = NULL; + const header_map& header_values_type_; public: - ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout, - std::shared_ptr> header_values_type=NULL); - - ZmqReceiver(const ZmqReceiver& other); - - virtual ~ZmqReceiver(){}; + ZmqReceiver( + const std::string& connect_address, + const int n_io_threads, + const int receive_timeout, + const header_map& header_values_type); void connect(); std::shared_ptr read_json_header(const std::string& header); std::pair, char*> receive(); - - const std::shared_ptr> get_header_values_type() const; - }; #endif diff --git a/core-writer/src/receiver/ZmqReceiver.cpp b/core-writer/src/receiver/ZmqReceiver.cpp index 458bf00..6484259 100644 --- a/core-writer/src/receiver/ZmqReceiver.cpp +++ b/core-writer/src/receiver/ZmqReceiver.cpp @@ -55,17 +55,19 @@ ZmqReceiver::ZmqReceiver( const std::string& connect_address, const int n_io_threads, const int receive_timeout, - shared_ptr> header_values_type) : + const header_map& header_values_type) : connect_address(connect_address), n_io_threads(n_io_threads), receive_timeout(receive_timeout), receiver(NULL), - header_values_type(header_values_type) + header_values_type_(header_values_type) { #ifdef DEBUG_OUTPUT using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ZmqReceiver::ZmqReceiver] Creating ZMQ receiver with"; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ZmqReceiver::ZmqReceiver]"; + cout << " Creating ZMQ receiver with"; cout << " connect_address " << connect_address; cout << " n_io_threads " << n_io_threads; cout << " receive_timeout " << receive_timeout; @@ -76,22 +78,14 @@ ZmqReceiver::ZmqReceiver( message_data = zmq::message_t(config::zmq_buffer_size_data); } - -ZmqReceiver::ZmqReceiver(const ZmqReceiver& other) : - ZmqReceiver( - other.connect_address, - other.n_io_threads, - other.receive_timeout, - other.header_values_type) -{ -} - void ZmqReceiver::connect() { #ifdef DEBUG_OUTPUT using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ZmqReceiver::connect] Connecting to address " << connect_address; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ZmqReceiver::connect]"; + cout << " Connecting to address " << connect_address; cout << " with n_io_threads " << n_io_threads << endl; #endif @@ -107,7 +101,8 @@ pair, char*> ZmqReceiver::receive() if (!receiver) { stringstream error_message; using namespace date; - error_message << "[" << std::chrono::system_clock::now() << "]"; + using namespace chrono; + error_message << "[" << system_clock::now() << "]"; error_message << "[ZmqReceiver::receive]"; error_message << " Cannot receive before connecting."; error_message << " Connect first." << endl; @@ -120,15 +115,17 @@ pair, char*> ZmqReceiver::receive() return {nullptr, nullptr}; } - auto header_string = string(static_cast(message_header.data()), - message_header.size()); + auto header_string = string( + static_cast(message_header.data()), + message_header.size()); auto frame_metadata = read_json_header(header_string); // Get the message data. if (!receiver->recv(&message_data)) { using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ZmqReceiver::receive] "; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ZmqReceiver::receive]"; cout << " Error while reading from ZMQ."; cout << " Frame index " << frame_metadata->frame_index << " lost."; cout << " Trying to continue with the next frame." << endl; @@ -162,40 +159,37 @@ shared_ptr ZmqReceiver::read_json_header(const string& header) header_data->type = json_header.get("type"); - if (header_values_type) { - for (const auto& value_mapping : *header_values_type) { - - const auto& name = value_mapping.first; - const auto& header_data_type = value_mapping.second; - auto value = get_value_from_json( - json_header, name, header_data_type); + for (const auto& value_mapping : header_values_type_) { - header_data->header_values.insert( - {name, value} - ); - } + const auto& name = value_mapping.first; + const auto& header_data_type = value_mapping.second; + + auto value = get_value_from_json( + json_header, name, header_data_type); + + header_data->header_values.insert( + {name, value} + ); } return header_data; } catch (...) { using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ZmqReceiver::read_json_header] "; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ZmqReceiver::read_json_header]"; cout << " Error while interpreting the JSON header. "; cout << " Header string: " << header << endl; cout << "Expected JSON header format: " << endl; - if (header_values_type) { - for (const auto& value_mapping : *header_values_type) { - cout << "\t" << value_mapping.first << ":"; - cout << value_mapping.second.type; - cout << "[" << value_mapping.second.value_shape << "]" << endl; - } - } else { - cout << "\tExpected header value types is a null pointer." << endl; + for (const auto& value_mapping : header_values_type_) { + cout << "\t" << value_mapping.first << ":"; + cout << value_mapping.second.type; + cout << "[" << value_mapping.second.value_shape << "]" << endl; } + throw; } } @@ -274,7 +268,8 @@ shared_ptr get_value_from_json( const string& name, const HeaderDataType& header_data_type) { - char* buffer = new char[header_data_type.value_bytes_size * header_data_type.value_shape]; + char* buffer = new char[ + header_data_type.value_bytes_size * header_data_type.value_shape]; if (header_data_type.is_array) { size_t index = 0; @@ -293,8 +288,3 @@ shared_ptr get_value_from_json( return shared_ptr(buffer, default_delete()); } - -const shared_ptr> ZmqReceiver::get_header_values_type() const -{ - return header_values_type; -}