Improve ZmqReceiver code

This commit is contained in:
2020-03-31 18:08:42 +02:00
parent 8f6f2ef32c
commit 3480b98471
2 changed files with 46 additions and 58 deletions
+8 -10
View File
@@ -40,6 +40,8 @@ std::shared_ptr<char> get_value_from_json(
class ZmqReceiver
{
typedef std::unordered_map<std::string, HeaderDataType> header_map;
const std::string connect_address;
const int n_io_threads;
const int receive_timeout;
@@ -49,24 +51,20 @@ class ZmqReceiver
zmq::message_t message_data;
boost::property_tree::ptree json_header;
std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> header_values_type = NULL;
const header_map& header_values_type_;
public:
ZmqReceiver(const std::string& connect_address, const int n_io_threads, const int receive_timeout,
std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> header_values_type=NULL);
ZmqReceiver(const ZmqReceiver& other);
virtual ~ZmqReceiver(){};
ZmqReceiver(
const std::string& connect_address,
const int n_io_threads,
const int receive_timeout,
const header_map& header_values_type);
void connect();
std::shared_ptr<FrameMetadata> read_json_header(const std::string& header);
std::pair<std::shared_ptr<FrameMetadata>, char*> receive();
const std::shared_ptr<std::unordered_map<std::string, HeaderDataType>> get_header_values_type() const;
};
#endif
+38 -48
View File
@@ -55,17 +55,19 @@ ZmqReceiver::ZmqReceiver(
const std::string& connect_address,
const int n_io_threads,
const int receive_timeout,
shared_ptr<unordered_map<string, HeaderDataType>> header_values_type) :
const header_map& header_values_type) :
connect_address(connect_address),
n_io_threads(n_io_threads),
receive_timeout(receive_timeout),
receiver(NULL),
header_values_type(header_values_type)
header_values_type_(header_values_type)
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ZmqReceiver::ZmqReceiver] Creating ZMQ receiver with";
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqReceiver::ZmqReceiver]";
cout << " Creating ZMQ receiver with";
cout << " connect_address " << connect_address;
cout << " n_io_threads " << n_io_threads;
cout << " receive_timeout " << receive_timeout;
@@ -76,22 +78,14 @@ ZmqReceiver::ZmqReceiver(
message_data = zmq::message_t(config::zmq_buffer_size_data);
}
ZmqReceiver::ZmqReceiver(const ZmqReceiver& other) :
ZmqReceiver(
other.connect_address,
other.n_io_threads,
other.receive_timeout,
other.header_values_type)
{
}
void ZmqReceiver::connect()
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ZmqReceiver::connect] Connecting to address " << connect_address;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqReceiver::connect]";
cout << " Connecting to address " << connect_address;
cout << " with n_io_threads " << n_io_threads << endl;
#endif
@@ -107,7 +101,8 @@ pair<shared_ptr<FrameMetadata>, char*> ZmqReceiver::receive()
if (!receiver) {
stringstream error_message;
using namespace date;
error_message << "[" << std::chrono::system_clock::now() << "]";
using namespace chrono;
error_message << "[" << system_clock::now() << "]";
error_message << "[ZmqReceiver::receive]";
error_message << " Cannot receive before connecting.";
error_message << " Connect first." << endl;
@@ -120,15 +115,17 @@ pair<shared_ptr<FrameMetadata>, char*> ZmqReceiver::receive()
return {nullptr, nullptr};
}
auto header_string = string(static_cast<char*>(message_header.data()),
message_header.size());
auto header_string = string(
static_cast<char*>(message_header.data()),
message_header.size());
auto frame_metadata = read_json_header(header_string);
// Get the message data.
if (!receiver->recv(&message_data)) {
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ZmqReceiver::receive] ";
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqReceiver::receive]";
cout << " Error while reading from ZMQ.";
cout << " Frame index " << frame_metadata->frame_index << " lost.";
cout << " Trying to continue with the next frame." << endl;
@@ -162,40 +159,37 @@ shared_ptr<FrameMetadata> ZmqReceiver::read_json_header(const string& header)
header_data->type = json_header.get<string>("type");
if (header_values_type) {
for (const auto& value_mapping : *header_values_type) {
const auto& name = value_mapping.first;
const auto& header_data_type = value_mapping.second;
auto value = get_value_from_json(
json_header, name, header_data_type);
for (const auto& value_mapping : header_values_type_) {
header_data->header_values.insert(
{name, value}
);
}
const auto& name = value_mapping.first;
const auto& header_data_type = value_mapping.second;
auto value = get_value_from_json(
json_header, name, header_data_type);
header_data->header_values.insert(
{name, value}
);
}
return header_data;
} catch (...) {
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ZmqReceiver::read_json_header] ";
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqReceiver::read_json_header]";
cout << " Error while interpreting the JSON header. ";
cout << " Header string: " << header << endl;
cout << "Expected JSON header format: " << endl;
if (header_values_type) {
for (const auto& value_mapping : *header_values_type) {
cout << "\t" << value_mapping.first << ":";
cout << value_mapping.second.type;
cout << "[" << value_mapping.second.value_shape << "]" << endl;
}
} else {
cout << "\tExpected header value types is a null pointer." << endl;
for (const auto& value_mapping : header_values_type_) {
cout << "\t" << value_mapping.first << ":";
cout << value_mapping.second.type;
cout << "[" << value_mapping.second.value_shape << "]" << endl;
}
throw;
}
}
@@ -274,7 +268,8 @@ shared_ptr<char> get_value_from_json(
const string& name,
const HeaderDataType& header_data_type)
{
char* buffer = new char[header_data_type.value_bytes_size * header_data_type.value_shape];
char* buffer = new char[
header_data_type.value_bytes_size * header_data_type.value_shape];
if (header_data_type.is_array) {
size_t index = 0;
@@ -293,8 +288,3 @@ shared_ptr<char> get_value_from_json(
return shared_ptr<char>(buffer, default_delete<char[]>());
}
const shared_ptr<unordered_map<string, HeaderDataType>> ZmqReceiver::get_header_values_type() const
{
return header_values_type;
}