diff --git a/core-writer/src/module/ZmqRecvModule.cpp b/core-writer/src/module/ZmqRecvModule.cpp index fe4f1e8..6e1d961 100644 --- a/core-writer/src/module/ZmqRecvModule.cpp +++ b/core-writer/src/module/ZmqRecvModule.cpp @@ -98,86 +98,102 @@ void ZmqRecvModule::stop_saving() void ZmqRecvModule::receive_thread(const string& connect_address) { - ZmqReceiver receiver(header_values_); - receiver.connect(connect_address); + try { - bool rb_initialized(false); + ZmqReceiver receiver(header_values_); + receiver.connect(connect_address); - while (is_receiving_.load(memory_order_relaxed)) { + bool rb_initialized(false); - auto frame = receiver.receive(); + while (is_receiving_.load(memory_order_relaxed)) { - // .first and .second = nullptr when no message received - // If no message or currently not writing, idle. - if (frame.first == nullptr || - !is_saving_.load(memory_order_relaxed)) { - continue; + auto frame = receiver.receive(); + + // .first and .second = nullptr when no message received + // If no message or currently not writing, idle. + if (frame.first == nullptr || + !is_saving_.load(memory_order_relaxed)) { + continue; + } + + auto frame_metadata = frame.first; + auto frame_data = frame.second; + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ZmqRecvModule::receive_thread]"; + cout << " Processing FrameMetadata with frame_index "; + cout << frame_metadata->frame_index; + cout << " and frame_shape [" << frame_metadata->frame_shape[0]; + cout << ", " << frame_metadata->frame_shape[1] << "]"; + cout << " and endianness " << frame_metadata->endianness; + cout << " and type " << frame_metadata->type; + cout << " and frame_bytes_size "; + cout << frame_metadata->frame_bytes_size << "." << endl; + #endif + + if (!rb_initialized) { + + size_t n_elements = + frame_metadata->frame_shape[0] * + frame_metadata->frame_shape[1]; + + size_t max_buffer_size = + compression::get_bitshuffle_max_buffer_size( + n_elements, + frame_metadata->frame_bytes_size/n_elements); + + ring_buffer_.initialize(max_buffer_size); + + rb_initialized = true; + } + + char* buffer = ring_buffer_.reserve(frame_metadata); + + auto compressed_size = compression::compress_bitshuffle( + static_cast(frame_data), + frame_metadata->frame_bytes_size, + 1, + buffer); + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + cout << "[" << system_clock::now() << "]"; + cout << "[ZmqRecvModule::receive_thread]"; + cout << " Compressed image from "; + cout << frame_metadata->frame_bytes_size << " bytes to "; + cout << compressed_size << " bytes." << endl; + #endif + + frame_metadata->frame_bytes_size = compressed_size; + + ring_buffer_.commit(frame_metadata); } - auto frame_metadata = frame.first; - auto frame_data = frame.second; + receiver.disconnect(); #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; cout << "[ZmqRecvModule::receive_thread]"; - cout << " Processing FrameMetadata with frame_index "; - cout << frame_metadata->frame_index; - cout << " and frame_shape [" << frame_metadata->frame_shape[0]; - cout << ", " << frame_metadata->frame_shape[1] << "]"; - cout << " and endianness " << frame_metadata->endianness; - cout << " and type " << frame_metadata->type; - cout << " and frame_bytes_size "; - cout << frame_metadata->frame_bytes_size << "." << endl; + cout << " Receiver thread stopped." << endl; #endif - if (!rb_initialized) { + } catch (const std::exception& e) { + is_receiving_ = false; - size_t n_elements = - frame_metadata->frame_shape[0] * - frame_metadata->frame_shape[1]; - - size_t max_buffer_size = - compression::get_bitshuffle_max_buffer_size( - n_elements, - frame_metadata->frame_bytes_size/n_elements); - - ring_buffer_.initialize(max_buffer_size); - - rb_initialized = true; - } - - char* buffer = ring_buffer_.reserve(frame_metadata); - - auto compressed_size = compression::compress_bitshuffle( - static_cast(frame_data), - frame_metadata->frame_bytes_size, - 1, - buffer); - - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - cout << "[" << system_clock::now() << "]"; - cout << "[ZmqRecvModule::receive_thread]"; - cout << " Compressed image from "; - cout << frame_metadata->frame_bytes_size << " bytes to "; - cout << compressed_size << " bytes." << endl; - #endif - - frame_metadata->frame_bytes_size = compressed_size; - - ring_buffer_.commit(frame_metadata); - } - - receiver.disconnect(); - - #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; + cout << "[" << system_clock::now() << "]"; cout << "[ZmqRecvModule::receive_thread]"; - cout << " Receiver thread stopped." << endl; - #endif + cout << " Stopped because of exception: " << endl; + cout << e.what() << endl; + + throw; + } } diff --git a/core-writer/test/test_H5WriteModule.cpp b/core-writer/test/test_H5WriteModule.cpp index c3dd4ce..59f930a 100644 --- a/core-writer/test/test_H5WriteModule.cpp +++ b/core-writer/test/test_H5WriteModule.cpp @@ -52,10 +52,18 @@ TEST(H5WriteModule, basic_interaction) H5WriteModule h5_write_module(ring_buffer, {}, format); + ASSERT_FALSE(h5_write_module.is_writing()); h5_write_module.start_writing("ignore_out.h5", 5); - generate_frames(ring_buffer, 5); + ASSERT_TRUE(h5_write_module.is_writing()); + generate_frames(ring_buffer, 3); this_thread::sleep_for(chrono::milliseconds(100)); + ASSERT_TRUE(h5_write_module.is_writing()); + + generate_frames(ring_buffer, 2); + this_thread::sleep_for(chrono::milliseconds(100)); + // Writing should be completed by now. + ASSERT_FALSE(h5_write_module.is_writing()); // Stop should never throw an exception. h5_write_module.stop_writing();