Cleanup ZmqReceiver

This commit is contained in:
2020-04-03 13:47:48 +02:00
parent 90a4206fe0
commit 3853e08238
2 changed files with 22 additions and 26 deletions
+11 -11
View File
@@ -13,6 +13,7 @@
#include "date.h"
#include "RingBuffer.hpp"
#include "config.hpp"
struct HeaderDataType
{
@@ -42,24 +43,23 @@ class ZmqReceiver
{
typedef std::unordered_map<std::string, HeaderDataType> header_map;
const std::string connect_address;
const int receive_timeout;
const header_map& header_values_type_;
zmq::context_t context_;
zmq::socket_t socket_;
zmq::message_t message_header;
zmq::message_t message_data;
boost::property_tree::ptree json_header;
zmq::message_t message_header_;
zmq::message_t message_data_;
const header_map& header_values_type_;
boost::property_tree::ptree json_header;
public:
ZmqReceiver(
const std::string& connect_address,
const int n_io_threads,
const int receive_timeout,
const header_map& header_values_type);
const header_map& header_values_type,
const int n_io_threads=config::zmq_n_io_threads
);
void connect();
void connect(
const std::string& connect_address,
const int receive_timeout=config::zmq_receive_timeout);
void disconnect();
+11 -15
View File
@@ -52,15 +52,13 @@ size_t get_type_byte_size(const string& type)
}
ZmqReceiver::ZmqReceiver(
const std::string& connect_address,
const int n_io_threads,
const int receive_timeout,
const header_map& header_values_type) :
connect_address(connect_address),
receive_timeout(receive_timeout),
const header_map& header_values_type,
const int n_io_threads) :
header_values_type_(header_values_type),
context_(n_io_threads),
socket_(context_, ZMQ_PULL),
header_values_type_(header_values_type)
message_header_(config::zmq_buffer_size_header),
message_data_(config::zmq_buffer_size_data)
{
#ifdef DEBUG_OUTPUT
using namespace date;
@@ -68,24 +66,22 @@ ZmqReceiver::ZmqReceiver(
cout << "[" << system_clock::now() << "]";
cout << "[ZmqReceiver::ZmqReceiver]";
cout << " Creating ZMQ receiver with";
cout << " connect_address " << connect_address;
cout << " n_io_threads " << n_io_threads;
cout << " receive_timeout " << receive_timeout;
cout << endl;
#endif
message_header = zmq::message_t(config::zmq_buffer_size_header);
message_data = zmq::message_t(config::zmq_buffer_size_data);
}
void ZmqReceiver::connect()
void ZmqReceiver::connect(
const string& connect_address,
const int receive_timeout)
{
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqReceiver::connect]";
cout << " Connecting to address " << connect_address << endl;
cout << " Connecting to address " << connect_address;
cout << " with receive timeout " << receive_timeout << endl;
#endif
socket_.setsockopt(ZMQ_RCVTIMEO, receive_timeout);
@@ -121,7 +117,7 @@ pair<shared_ptr<FrameMetadata>, char*> ZmqReceiver::receive()
}
// Get the message header.
if (!socket_.recv(&message_header)){
if (!socket_.recv(&message_header_)){
return {nullptr, nullptr};
}