Wrap entire thread in try-catch for ZmqReceiverModule as well

This commit is contained in:
2020-04-08 08:48:12 +02:00
parent 934cf9e0c4
commit b872bb7510
2 changed files with 88 additions and 64 deletions
+79 -63
View File
@@ -98,86 +98,102 @@ void ZmqRecvModule::stop_saving()
void ZmqRecvModule::receive_thread(const string& connect_address)
{
ZmqReceiver receiver(header_values_);
receiver.connect(connect_address);
try {
bool rb_initialized(false);
ZmqReceiver receiver(header_values_);
receiver.connect(connect_address);
while (is_receiving_.load(memory_order_relaxed)) {
bool rb_initialized(false);
auto frame = receiver.receive();
while (is_receiving_.load(memory_order_relaxed)) {
// .first and .second = nullptr when no message received
// If no message or currently not writing, idle.
if (frame.first == nullptr ||
!is_saving_.load(memory_order_relaxed)) {
continue;
auto frame = receiver.receive();
// .first and .second = nullptr when no message received
// If no message or currently not writing, idle.
if (frame.first == nullptr ||
!is_saving_.load(memory_order_relaxed)) {
continue;
}
auto frame_metadata = frame.first;
auto frame_data = frame.second;
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqRecvModule::receive_thread]";
cout << " Processing FrameMetadata with frame_index ";
cout << frame_metadata->frame_index;
cout << " and frame_shape [" << frame_metadata->frame_shape[0];
cout << ", " << frame_metadata->frame_shape[1] << "]";
cout << " and endianness " << frame_metadata->endianness;
cout << " and type " << frame_metadata->type;
cout << " and frame_bytes_size ";
cout << frame_metadata->frame_bytes_size << "." << endl;
#endif
if (!rb_initialized) {
size_t n_elements =
frame_metadata->frame_shape[0] *
frame_metadata->frame_shape[1];
size_t max_buffer_size =
compression::get_bitshuffle_max_buffer_size(
n_elements,
frame_metadata->frame_bytes_size/n_elements);
ring_buffer_.initialize(max_buffer_size);
rb_initialized = true;
}
char* buffer = ring_buffer_.reserve(frame_metadata);
auto compressed_size = compression::compress_bitshuffle(
static_cast<const char*>(frame_data),
frame_metadata->frame_bytes_size,
1,
buffer);
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqRecvModule::receive_thread]";
cout << " Compressed image from ";
cout << frame_metadata->frame_bytes_size << " bytes to ";
cout << compressed_size << " bytes." << endl;
#endif
frame_metadata->frame_bytes_size = compressed_size;
ring_buffer_.commit(frame_metadata);
}
auto frame_metadata = frame.first;
auto frame_data = frame.second;
receiver.disconnect();
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqRecvModule::receive_thread]";
cout << " Processing FrameMetadata with frame_index ";
cout << frame_metadata->frame_index;
cout << " and frame_shape [" << frame_metadata->frame_shape[0];
cout << ", " << frame_metadata->frame_shape[1] << "]";
cout << " and endianness " << frame_metadata->endianness;
cout << " and type " << frame_metadata->type;
cout << " and frame_bytes_size ";
cout << frame_metadata->frame_bytes_size << "." << endl;
cout << " Receiver thread stopped." << endl;
#endif
if (!rb_initialized) {
} catch (const std::exception& e) {
is_receiving_ = false;
size_t n_elements =
frame_metadata->frame_shape[0] *
frame_metadata->frame_shape[1];
size_t max_buffer_size =
compression::get_bitshuffle_max_buffer_size(
n_elements,
frame_metadata->frame_bytes_size/n_elements);
ring_buffer_.initialize(max_buffer_size);
rb_initialized = true;
}
char* buffer = ring_buffer_.reserve(frame_metadata);
auto compressed_size = compression::compress_bitshuffle(
static_cast<const char*>(frame_data),
frame_metadata->frame_bytes_size,
1,
buffer);
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqRecvModule::receive_thread]";
cout << " Compressed image from ";
cout << frame_metadata->frame_bytes_size << " bytes to ";
cout << compressed_size << " bytes." << endl;
#endif
frame_metadata->frame_bytes_size = compressed_size;
ring_buffer_.commit(frame_metadata);
}
receiver.disconnect();
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[ZmqRecvModule::receive_thread]";
cout << " Receiver thread stopped." << endl;
#endif
cout << " Stopped because of exception: " << endl;
cout << e.what() << endl;
throw;
}
}