diff --git a/core-writer/src/ZmqRecvModule.cpp b/core-writer/src/ZmqRecvModule.cpp index 8402929..a67cd8f 100644 --- a/core-writer/src/ZmqRecvModule.cpp +++ b/core-writer/src/ZmqRecvModule.cpp @@ -101,11 +101,14 @@ void ZmqRecvModule::receive_thread(const string& connect_address) ZmqReceiver receiver(header_values_); receiver.connect(connect_address); + bool rb_initialized(false); + while (is_receiving_.load(memory_order_relaxed)) { auto frame = receiver.receive(); - // If no message, .first and .second = nullptr + // .first and .second = nullptr when no message received + // If no message or currently not writing, idle. if (frame.first == nullptr || !is_writing_.load(memory_order_relaxed)) { continue; @@ -129,11 +132,8 @@ void ZmqRecvModule::receive_thread(const string& connect_address) cout << frame_metadata->frame_bytes_size << "." << endl; #endif - char* buffer = ring_buffer_.reserve(frame_metadata); + if (!rb_initialized) { - // TODO: Add flag to disable compression. - // TODO: Cache results no to calculate this every time. - { size_t n_elements = frame_metadata->frame_shape[0] * frame_metadata->frame_shape[1]; @@ -143,22 +143,13 @@ void ZmqRecvModule::receive_thread(const string& connect_address) n_elements, frame_metadata->frame_bytes_size/n_elements); - if (max_buffer_size > ring_buffer_.get_slot_size()) { - stringstream err_msg; + ring_buffer_.initialize(max_buffer_size); - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[ZmqRecvModule::receive_thread]"; - err_msg << " RingBuffer slot size "; - err_msg << ring_buffer_.get_slot_size(); - err_msg << " smaller than max_buffer_size "; - err_msg << max_buffer_size << endl; - - throw runtime_error(err_msg.str()); - } + rb_initialized = true; } + char* buffer = ring_buffer_.reserve(frame_metadata); + auto compressed_size = compression::compress_bitshuffle( static_cast(frame_data), frame_metadata->frame_bytes_size,