Implement ZmqRecvModule start procedure

This commit is contained in:
2020-04-01 15:41:50 +02:00
parent e43c9c0ccd
commit a3110580dc
2 changed files with 37 additions and 11 deletions
+5 -6
View File
@@ -1,6 +1,7 @@
#ifndef ZMQRECVMODULE_H
#define ZMQRECVMODULE_H
#include <thread>
#include "ZmqReceiver.hpp"
#include "RingBuffer.hpp"
@@ -12,11 +13,11 @@ class ZmqRecvModule
const header_map& header_values_;
const std::atomic_bool& is_writing_;
std::atomic_bool is_receiving_;
std::vector<std::thread> receiving_threads_;
protected:
void receive_thread(
const std::string& connect_address,
const uint8_t n_receiving_threads);
const std::string& connect_address);
public:
ZmqRecvModule(
@@ -24,10 +25,8 @@ public:
const header_map& header_values,
const std::atomic_bool& is_writing);
void start(
const std::string& connect_address,
const uint8_t n_receiving_thread);
void start(const std::string& connect_address,
const uint8_t n_receiving_threads);
void stop();
};
+32 -5
View File
@@ -18,19 +18,46 @@ ZmqRecvModule::ZmqRecvModule(
void ZmqRecvModule::start(
const string& connect_address,
const uint8_t n_receiving_thread)
const uint8_t n_receiving_threads)
{
if (is_receiving_ == true) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[ZmqRecvModule::start]";
err_msg << " Receivers already running." << endl;
throw runtime_error(err_msg.str());
}
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqRecvModule::start]";
cout << " Starting with parameters:";
cout << "\tconnect_address: " << connect_address;
cout << "\tn_receiving_thread: " << n_receiving_threads;
cout << endl;
#endif
is_receiving_ = true;
for (uint8_t i_rec=0; i_rec < n_receiving_threads; i_rec++) {
receiving_threads_.emplace_back(
&ZmqRecvModule::receive_thread, this, connect_address);
}
}
void ZmqRecvModule::stop()
{
is_receiving_ = false;
}
void ZmqRecvModule::receive_thread(
const string& connect_address,
const uint8_t n_receiving_threads)
void ZmqRecvModule::receive_thread(const string& connect_address)
{
ZmqReceiver receiver(
connect_address,
@@ -44,7 +71,7 @@ void ZmqRecvModule::receive_thread(
auto frame = receiver.receive();
// If no message, first and second = nullptr
// If no message, .first and .second = nullptr
if (frame.first == nullptr ||
!is_writing_.load(memory_order_relaxed)) {
continue;