diff --git a/core-writer/src/ZmqRecvModule.cpp b/core-writer/src/ZmqRecvModule.cpp index 9366fcd..1c47072 100644 --- a/core-writer/src/ZmqRecvModule.cpp +++ b/core-writer/src/ZmqRecvModule.cpp @@ -25,7 +25,7 @@ void ZmqRecvModule::start_recv( using namespace date; using namespace chrono; err_msg << "[" << system_clock::now() << "]"; - err_msg << "[ZmqRecvModule::start]"; + err_msg << "[ZmqRecvModule::start_recv]"; err_msg << " Receivers already running." << endl; throw runtime_error(err_msg.str()); @@ -38,7 +38,7 @@ void ZmqRecvModule::start_recv( cout << "[ZmqRecvModule::start]"; cout << " Starting with parameters:"; cout << "\tconnect_address: " << connect_address; - cout << "\tn_receiving_thread: " << n_receiving_threads << endl; + cout << "\tn_receiving_thread: " << (int) n_receiving_threads << endl; #endif is_receiving_ = true; @@ -55,14 +55,17 @@ void ZmqRecvModule::stop_recv() using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[ZmqRecvModule::stop]"; + cout << "[ZmqRecvModule::stop_recv]"; cout << " Stop receiving threads." << endl; #endif is_receiving_ = false; - for (auto& thread_ptr:receiving_threads_) { - thread_ptr.join(); + for (auto& recv_thread:receiving_threads_) { + if (recv_thread.joinable()) { + cout << "joining first thread" << endl; + recv_thread.join(); + } } receiving_threads_.clear(); @@ -167,6 +170,8 @@ void ZmqRecvModule::receive_thread(const string& connect_address) ring_buffer_.commit(frame_metadata); } + receiver.disconnect(); + #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono;