diff --git a/core-writer/include/ZmqRecvModule.hpp b/core-writer/include/ZmqRecvModule.hpp index fa22573..e2345b3 100644 --- a/core-writer/include/ZmqRecvModule.hpp +++ b/core-writer/include/ZmqRecvModule.hpp @@ -11,6 +11,7 @@ class ZmqRecvModule RingBuffer& ring_buffer_; const header_map& header_values_; const std::atomic_bool& is_writing_; + std::atomic_bool is_receiving_; protected: void receive_thread( diff --git a/core-writer/src/ZmqRecvModule.cpp b/core-writer/src/ZmqRecvModule.cpp index 747d362..d2e095a 100644 --- a/core-writer/src/ZmqRecvModule.cpp +++ b/core-writer/src/ZmqRecvModule.cpp @@ -1,6 +1,7 @@ #include #include +#include #include "ZmqRecvModule.hpp" using namespace std; @@ -11,10 +12,9 @@ ZmqRecvModule::ZmqRecvModule( const std::atomic_bool& is_writing) : ring_buffer_(ring_buffer_), header_values_(header_values), - is_writing_(is_writing) -{ - -} + is_writing_(is_writing), + is_receiving_(false) +{} void ZmqRecvModule::start( const string& connect_address, @@ -40,13 +40,13 @@ void ZmqRecvModule::receive_thread( receiver.connect(); - while (true) { + while (is_receiving_.load(memory_order_relaxed)) { auto frame = receiver.receive(); // If no message, first and second = nullptr if (frame.first == nullptr || - !is_writing_.load(memory_order::memory_order_relaxed)) { + !is_writing_.load(memory_order_relaxed)) { continue; } @@ -70,11 +70,16 @@ void ZmqRecvModule::receive_thread( char* buffer = ring_buffer_.reserve(frame_metadata); - size_t max_buffer_size = compression::get_bitshuffle_max_buffer_size( - frame_metadata->frame_bytes_size, 1); - - if (max_buffer_size > ring_buffer.get_slot_size()) { + // TODO: Add flag to disable compression. + { + // TODO: Cache results no to calculate this every time. + size_t max_buffer_size = + compression::get_bitshuffle_max_buffer_size( + frame_metadata->frame_bytes_size, 1); + if (max_buffer_size > ring_buffer_.get_slot_size()) { + //TODO: Throw error if not large enough. + } } auto compressed_size = compression::compress_bitshuffle( @@ -85,8 +90,10 @@ void ZmqRecvModule::receive_thread( #ifdef DEBUG_OUTPUT using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::receive_zmq] Compressed image from "; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ProcessManager::receive_zmq]"; + cout << " Compressed image from "; cout << frame_metadata->frame_bytes_size << " bytes to "; cout << compressed_size << " bytes." << endl; #endif @@ -98,7 +105,9 @@ void ZmqRecvModule::receive_thread( #ifdef DEBUG_OUTPUT using namespace date; - cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::receive_zmq] Receiver thread stopped." << endl; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ProcessManager::receive_zmq]"; + cout << " Receiver thread stopped." << endl; #endif -} \ No newline at end of file +}