From 97cfa407d3afc365f2e18e894877df54b7dc8916 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 25 May 2020 15:29:47 +0200 Subject: [PATCH] Finished implementation of ZmqRecv for Writer --- sf-writer/src/WriterZmqReceiver.cpp | 152 ++++++++++++++-------------- 1 file changed, 74 insertions(+), 78 deletions(-) diff --git a/sf-writer/src/WriterZmqReceiver.cpp b/sf-writer/src/WriterZmqReceiver.cpp index e562310..f916d39 100644 --- a/sf-writer/src/WriterZmqReceiver.cpp +++ b/sf-writer/src/WriterZmqReceiver.cpp @@ -10,9 +10,11 @@ using namespace core_buffer; WriterZmqReceiver::WriterZmqReceiver( void *ctx, const string &ipc_prefix, - const size_t n_modules) : - n_modules_(n_modules), - sockets_(n_modules) + const size_t n_modules, + const uint64_t stop_pulse_id_) : + n_modules_(n_modules), + sockets_(n_modules), + end_pulse_id_(stop_pulse_id_) { for (size_t i = 0; i < n_modules; i++) { @@ -48,95 +50,89 @@ WriterZmqReceiver::~WriterZmqReceiver() void WriterZmqReceiver::get_next_buffer( const uint64_t start_pulse_id, - ImageMetadataBuffer* image_metadata, + ImageMetadataBuffer* i_meta, char* image_buffer) { - // Init the image metadata. - image_metadata->pulse_id = pulse_id; - image_metadata->frame_index = 0; - image_metadata->daq_rec = 0; - image_metadata->data_n_bytes = 0; - image_metadata->is_good_frame = 1; - bool image_metadata_init = false; + auto n_images_in_buffer = WRITER_DATA_CACHE_N_IMAGES; + auto images_left = end_pulse_id_ - start_pulse_id + 1; + if (images_left < n_images_in_buffer) { + n_images_in_buffer = images_left; + } - size_t image_buffer_offset = 0; + i_meta->n_images = (uint16_t)n_images_in_buffer; - for (size_t pulse_id=start_pulse_id) + for (uint64_t i_pulse=0; i_pulsepulse_id[i_pulse] = pulse_id; + i_meta->is_good_image[i_pulse] = 1; + i_meta->frame_index[i_pulse] = 0; + i_meta->daq_rec[i_pulse] = 0; - if (n_bytes_metadata != sizeof(ReplayModuleFrameBuffer)) { - throw runtime_error("Wrong number of metadata bytes."); - } + for (size_t i_module = 0; i_module < n_modules_; i_module++) { - for (size_t i=0; iis_good_frame = 0; - - // Init the image metadata with the first valid frame. - } else if (!image_metadata_init) { - image_metadata_init = true; - - image_metadata->frame_index = - frame_metadata.metadata.frame_index; - image_metadata->daq_rec = - frame_metadata.metadata.daq_rec; - } - - // Once the image is not good, we don't care to re-flag it. - if (image_metadata->is_good_frame == 1) { - if (frame_metadata.metadata.frame_index != - image_metadata->frame_index) { - image_metadata->is_good_frame = 0; + if (n_bytes_metadata != sizeof(f_meta_)) { + throw runtime_error("Wrong number of metadata bytes."); } - if (frame_metadata.metadata.daq_rec != - image_metadata->daq_rec) { - image_metadata->is_good_frame = 0; + if (f_meta_.pulse_id == 0) { + i_meta->is_good_image[i_pulse] = 0; + + } else { + if (!pulse_id_initialized) { + // Init the image metadata with the first valid frame. + pulse_id_initialized = true; + + i_meta->frame_index[i_pulse] = f_meta_.frame_index; + i_meta->daq_rec[i_pulse] = f_meta_.daq_rec; + } + + if (f_meta_.pulse_id != i_meta->pulse_id[i_pulse]) { + stringstream err_msg; + + err_msg << "[WriterZmqReceiver::get_next_buffer]"; + err_msg << " Read unexpected pulse_id. "; + err_msg << " Expected " << pulse_id; + err_msg << " received "; + err_msg << f_meta_.pulse_id; + err_msg << " from i_module " << i_module << endl; + + throw runtime_error(err_msg.str()); + } } - if (frame_metadata.metadata.n_received_packets != - JUNGFRAU_N_PACKETS_PER_FRAME) { - image_metadata->is_good_frame = 0; + // Once the image is not good, we don't care to re-flag it. + if (i_meta->is_good_image[i_pulse] == 1) { + + if (f_meta_.frame_index != i_meta->frame_index[i_pulse]) { + i_meta->is_good_image[i_pulse] = 0; + } + + if (f_meta_.daq_rec != i_meta->daq_rec[i_pulse]) { + i_meta->is_good_image[i_pulse] = 0; + } + + if (f_meta_.n_received_packets != JF_N_PACKETS_PER_FRAME) { + i_meta->is_good_image[i_pulse] = 0; + } + } + + auto pulse_offset = i_pulse * n_modules_ * MODULE_N_BYTES ; + auto module_offset = i_module * MODULE_N_BYTES; + + auto n_bytes_image = zmq_recv( + sockets_[i_module], + (image_buffer + pulse_offset + module_offset), + MODULE_N_BYTES, 0); + + if (n_bytes_image != MODULE_N_BYTES) { + throw runtime_error("Wrong number of data bytes."); } } - - auto n_bytes_image = zmq_recv( - sockets_[i_module], - (image_buffer + image_buffer_offset), - frame_metadata.data_n_bytes, - 0); - - if (n_bytes_image != frame_metadata.data_n_bytes) { - throw runtime_error("Wrong number of data bytes."); - } - - image_buffer_offset += n_bytes_image; } }