diff --git a/core-writer/include/ZmqReceiver.hpp b/core-writer/include/ZmqReceiver.hpp index 664dcfe..726aa8d 100644 --- a/core-writer/include/ZmqReceiver.hpp +++ b/core-writer/include/ZmqReceiver.hpp @@ -43,10 +43,9 @@ class ZmqReceiver typedef std::unordered_map header_map; const std::string connect_address; - const int n_io_threads; const int receive_timeout; - std::shared_ptr receiver = NULL; - std::shared_ptr context = NULL; + zmq::context_t context_; + zmq::socket_t socket_; zmq::message_t message_header; zmq::message_t message_data; boost::property_tree::ptree json_header; diff --git a/core-writer/src/receiver/ZmqReceiver.cpp b/core-writer/src/receiver/ZmqReceiver.cpp index 8e98b9a..c023774 100644 --- a/core-writer/src/receiver/ZmqReceiver.cpp +++ b/core-writer/src/receiver/ZmqReceiver.cpp @@ -57,9 +57,9 @@ ZmqReceiver::ZmqReceiver( const int receive_timeout, const header_map& header_values_type) : connect_address(connect_address), - n_io_threads(n_io_threads), receive_timeout(receive_timeout), - receiver(NULL), + context_(n_io_threads), + socket_(context_, ZMQ_PULL), header_values_type_(header_values_type) { #ifdef DEBUG_OUTPUT @@ -85,15 +85,11 @@ void ZmqReceiver::connect() using namespace chrono; cout << "[" << system_clock::now() << "]"; cout << "[ZmqReceiver::connect]"; - cout << " Connecting to address " << connect_address; - cout << " with n_io_threads " << n_io_threads << endl; + cout << " Connecting to address " << connect_address << endl; #endif - context = make_shared(n_io_threads); - receiver = make_shared(*context, ZMQ_PULL); - - receiver->setsockopt(ZMQ_RCVTIMEO, receive_timeout); - receiver->connect(connect_address); + socket_.setsockopt(ZMQ_RCVTIMEO, receive_timeout); + socket_.connect(connect_address); } void ZmqReceiver::disconnect() @@ -106,13 +102,13 @@ void ZmqReceiver::disconnect() cout << " Disconnect." << endl; #endif - receiver->close(); - context->close(); + socket_.close(); + context_.close(); } pair, char*> ZmqReceiver::receive() { - if (!receiver) { + if (!socket_.connected()) { stringstream error_message; using namespace date; using namespace chrono; @@ -125,7 +121,7 @@ pair, char*> ZmqReceiver::receive() } // Get the message header. - if (!receiver->recv(&message_header)){ + if (!socket_.recv(&message_header)){ return {nullptr, nullptr}; } @@ -135,7 +131,7 @@ pair, char*> ZmqReceiver::receive() auto frame_metadata = read_json_header(header_string); // Get the message data. - if (!receiver->recv(&message_data)) { + if (!socket_.recv(&message_data)) { using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]";