From a3110580dc62e94b31b864ee3fc27dbb728547fd Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 1 Apr 2020 15:41:50 +0200 Subject: [PATCH] Implement ZmqRecvModule start procedure --- core-writer/include/ZmqRecvModule.hpp | 11 ++++---- core-writer/src/ZmqRecvModule.cpp | 37 +++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/core-writer/include/ZmqRecvModule.hpp b/core-writer/include/ZmqRecvModule.hpp index e2345b3..c65005b 100644 --- a/core-writer/include/ZmqRecvModule.hpp +++ b/core-writer/include/ZmqRecvModule.hpp @@ -1,6 +1,7 @@ #ifndef ZMQRECVMODULE_H #define ZMQRECVMODULE_H +#include #include "ZmqReceiver.hpp" #include "RingBuffer.hpp" @@ -12,11 +13,11 @@ class ZmqRecvModule const header_map& header_values_; const std::atomic_bool& is_writing_; std::atomic_bool is_receiving_; + std::vector receiving_threads_; protected: void receive_thread( - const std::string& connect_address, - const uint8_t n_receiving_threads); + const std::string& connect_address); public: ZmqRecvModule( @@ -24,10 +25,8 @@ public: const header_map& header_values, const std::atomic_bool& is_writing); - void start( - const std::string& connect_address, - const uint8_t n_receiving_thread); - + void start(const std::string& connect_address, + const uint8_t n_receiving_threads); void stop(); }; diff --git a/core-writer/src/ZmqRecvModule.cpp b/core-writer/src/ZmqRecvModule.cpp index d2e095a..f481900 100644 --- a/core-writer/src/ZmqRecvModule.cpp +++ b/core-writer/src/ZmqRecvModule.cpp @@ -18,19 +18,46 @@ ZmqRecvModule::ZmqRecvModule( void ZmqRecvModule::start( const string& connect_address, - const uint8_t n_receiving_thread) + const uint8_t n_receiving_threads) { + if (is_receiving_ == true) { + stringstream err_msg; + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[ZmqRecvModule::start]"; + err_msg << " Receivers already running." << endl; + + throw runtime_error(err_msg.str()); + } + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ZmqRecvModule::start]"; + cout << " Starting with parameters:"; + cout << "\tconnect_address: " << connect_address; + cout << "\tn_receiving_thread: " << n_receiving_threads; + cout << endl; + #endif + + is_receiving_ = true; + + for (uint8_t i_rec=0; i_rec < n_receiving_threads; i_rec++) { + receiving_threads_.emplace_back( + &ZmqRecvModule::receive_thread, this, connect_address); + } } void ZmqRecvModule::stop() { + is_receiving_ = false; } -void ZmqRecvModule::receive_thread( - const string& connect_address, - const uint8_t n_receiving_threads) +void ZmqRecvModule::receive_thread(const string& connect_address) { ZmqReceiver receiver( connect_address, @@ -44,7 +71,7 @@ void ZmqRecvModule::receive_thread( auto frame = receiver.receive(); - // If no message, first and second = nullptr + // If no message, .first and .second = nullptr if (frame.first == nullptr || !is_writing_.load(memory_order_relaxed)) { continue;