From fd83d42636388bfbe3d9997f8eb7034cc0159fe8 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Thu, 23 Apr 2020 12:22:26 +0200 Subject: [PATCH] Add exception handling to thread --- sf-writer/sf_h5_writer.cpp | 148 ++++++++++++++++++++----------------- 1 file changed, 80 insertions(+), 68 deletions(-) diff --git a/sf-writer/sf_h5_writer.cpp b/sf-writer/sf_h5_writer.cpp index 3a75cf0..0cf4f32 100644 --- a/sf-writer/sf_h5_writer.cpp +++ b/sf-writer/sf_h5_writer.cpp @@ -20,81 +20,93 @@ void receive_replay( RingBuffer& ring_buffer, void* ctx) { + try { + void *sockets[n_modules]; + for (size_t i = 0; i < n_modules; i++) { + sockets[i] = zmq_socket(ctx, ZMQ_PULL); + int rcvhwm = REPLAY_BLOCK_SIZE; + if (zmq_setsockopt(sockets[i], ZMQ_RCVHWM, &rcvhwm, + sizeof(rcvhwm)) != 0) { + throw runtime_error(strerror(errno)); + } + int linger = 0; + if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, + sizeof(linger)) != 0) { + throw runtime_error(strerror(errno)); + } - void* sockets[n_modules]; - for (size_t i=0; i(); + char *image_buffer = nullptr; - if (zmq_bind(sockets[i], ipc.c_str()) != 0) { - throw runtime_error(strerror (errno)); + while (true) { + auto rb_metadata = make_shared(); + image_buffer = ring_buffer.reserve(rb_metadata); + + for (size_t i = 0; i < n_modules; i++) { + auto n_bytes_metadata = zmq_recv( + sockets[i], + metadata_buffer.get(), + sizeof(ModuleFrame), + 0); + + if (n_bytes_metadata != sizeof(ModuleFrame)) { + // TODO: Make nicer expcetion. + throw runtime_error(strerror(errno)); + } + + // Initialize buffers in first iteration for each pulse_id. + if (i == 0) { + rb_metadata->pulse_id = metadata_buffer->pulse_id; + rb_metadata->frame_index = metadata_buffer->frame_index; + rb_metadata->daq_rec = metadata_buffer->daq_rec; + rb_metadata->n_received_packets = + metadata_buffer->n_received_packets; + } + + if (rb_metadata->pulse_id != metadata_buffer->pulse_id) { + throw runtime_error("Unexpected pulse_id received."); + } + + auto n_bytes_image = zmq_recv( + sockets[i], + (image_buffer + (MODULE_N_PIXELS * i)), + MODULE_N_BYTES, + 0); + + if (n_bytes_image != MODULE_N_BYTES) { + // TODO: Make nicer expcetion. + throw runtime_error("Unexpected number of bytes in image."); + } + } + + ring_buffer.commit(rb_metadata); } + + for (size_t i = 0; i < n_modules; i++) { + zmq_close(sockets[i]); + } + + zmq_ctx_destroy(ctx); + } catch (const std::exception& e) { + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << " Stopped because of exception: " << endl; + cout << e.what() << endl; + + throw; } - - auto metadata_buffer = make_unique(); - char* image_buffer = nullptr; - - while (true) { - auto rb_metadata = make_shared(); - image_buffer = ring_buffer.reserve(rb_metadata); - - for (size_t i=0; ipulse_id = metadata_buffer->pulse_id; - rb_metadata->frame_index = metadata_buffer->frame_index; - rb_metadata->daq_rec = metadata_buffer->daq_rec; - rb_metadata->n_received_packets = - metadata_buffer->n_received_packets; - } - - if (rb_metadata->pulse_id != metadata_buffer->pulse_id) { - throw runtime_error("Unexpected pulse_id received."); - } - - auto n_bytes_image = zmq_recv( - sockets[i], - (image_buffer + (MODULE_N_PIXELS*i)), - MODULE_N_BYTES, - 0); - - if (n_bytes_image != MODULE_N_BYTES) { - // TODO: Make nicer expcetion. - throw runtime_error("Unexpected number of bytes in image."); - } - } - - ring_buffer.commit(rb_metadata); - } - - for (size_t i=0; i