diff --git a/core-writer/include/ZmqReceiver.hpp b/core-writer/include/ZmqReceiver.hpp index 726aa8d..3d72df1 100644 --- a/core-writer/include/ZmqReceiver.hpp +++ b/core-writer/include/ZmqReceiver.hpp @@ -13,6 +13,7 @@ #include "date.h" #include "RingBuffer.hpp" +#include "config.hpp" struct HeaderDataType { @@ -42,24 +43,23 @@ class ZmqReceiver { typedef std::unordered_map header_map; - const std::string connect_address; - const int receive_timeout; + const header_map& header_values_type_; zmq::context_t context_; zmq::socket_t socket_; - zmq::message_t message_header; - zmq::message_t message_data; - boost::property_tree::ptree json_header; + zmq::message_t message_header_; + zmq::message_t message_data_; - const header_map& header_values_type_; + boost::property_tree::ptree json_header; public: ZmqReceiver( - const std::string& connect_address, - const int n_io_threads, - const int receive_timeout, - const header_map& header_values_type); + const header_map& header_values_type, + const int n_io_threads=config::zmq_n_io_threads + ); - void connect(); + void connect( + const std::string& connect_address, + const int receive_timeout=config::zmq_receive_timeout); void disconnect(); diff --git a/core-writer/src/receiver/ZmqReceiver.cpp b/core-writer/src/receiver/ZmqReceiver.cpp index c023774..356464c 100644 --- a/core-writer/src/receiver/ZmqReceiver.cpp +++ b/core-writer/src/receiver/ZmqReceiver.cpp @@ -52,15 +52,13 @@ size_t get_type_byte_size(const string& type) } ZmqReceiver::ZmqReceiver( - const std::string& connect_address, - const int n_io_threads, - const int receive_timeout, - const header_map& header_values_type) : - connect_address(connect_address), - receive_timeout(receive_timeout), + const header_map& header_values_type, + const int n_io_threads) : + header_values_type_(header_values_type), context_(n_io_threads), socket_(context_, ZMQ_PULL), - header_values_type_(header_values_type) + message_header_(config::zmq_buffer_size_header), + message_data_(config::zmq_buffer_size_data) { #ifdef DEBUG_OUTPUT using namespace date; @@ -68,24 +66,22 @@ ZmqReceiver::ZmqReceiver( cout << "[" << system_clock::now() << "]"; cout << "[ZmqReceiver::ZmqReceiver]"; cout << " Creating ZMQ receiver with"; - cout << " connect_address " << connect_address; cout << " n_io_threads " << n_io_threads; - cout << " receive_timeout " << receive_timeout; cout << endl; #endif - - message_header = zmq::message_t(config::zmq_buffer_size_header); - message_data = zmq::message_t(config::zmq_buffer_size_data); } -void ZmqReceiver::connect() +void ZmqReceiver::connect( + const string& connect_address, + const int receive_timeout) { #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; cout << "[ZmqReceiver::connect]"; - cout << " Connecting to address " << connect_address << endl; + cout << " Connecting to address " << connect_address; + cout << " with receive timeout " << receive_timeout << endl; #endif socket_.setsockopt(ZMQ_RCVTIMEO, receive_timeout); @@ -121,7 +117,7 @@ pair, char*> ZmqReceiver::receive() } // Get the message header. - if (!socket_.recv(&message_header)){ + if (!socket_.recv(&message_header_)){ return {nullptr, nullptr}; }