From 26d12b6837fb2b088472469beeb0617844647c79 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 29 Apr 2020 14:41:05 +0200 Subject: [PATCH] Make sf_writer buffer aware --- sf-buffer/src/sf_writer.cpp | 82 +++++++++++++++++++++---------------- 1 file changed, 47 insertions(+), 35 deletions(-) diff --git a/sf-buffer/src/sf_writer.cpp b/sf-buffer/src/sf_writer.cpp index ff108aa..8a4d5bc 100644 --- a/sf-buffer/src/sf_writer.cpp +++ b/sf-buffer/src/sf_writer.cpp @@ -58,44 +58,56 @@ void receive_replay( auto frame_meta_buffer = queue.get_metadata_buffer(slot_id); auto frame_buffer = queue.get_data_buffer(slot_id); - for (size_t i = 0; i < n_modules; i++) { - auto n_bytes_metadata = zmq_recv( - sockets[i], - module_meta_buffer.get(), - sizeof(ModuleFrame), - 0); + for ( + size_t i_buffer=0; + i_bufferpulse_id = - module_meta_buffer->pulse_id; - frame_meta_buffer->frame_index = - module_meta_buffer->frame_index; - frame_meta_buffer->daq_rec = - module_meta_buffer->daq_rec; - frame_meta_buffer->n_received_packets = - module_meta_buffer->n_received_packets; - } + if (n_bytes_metadata != sizeof(ModuleFrame)) { + // TODO: Make nicer expcetion. + throw runtime_error(strerror(errno)); + } - if (frame_meta_buffer->pulse_id != + // Initialize buffers in first iteration for each pulse_id. + if (i_module == 0) { + frame_meta_buffer->pulse_id = + module_meta_buffer->pulse_id; + frame_meta_buffer->frame_index = + module_meta_buffer->frame_index; + frame_meta_buffer->daq_rec = + module_meta_buffer->daq_rec; + frame_meta_buffer->n_received_packets = + module_meta_buffer->n_received_packets; + } + + if (frame_meta_buffer->pulse_id != module_meta_buffer->pulse_id) { - throw runtime_error("Unexpected pulse_id received."); - } + throw runtime_error("Unexpected pulse_id received."); + } - auto n_bytes_image = zmq_recv( - sockets[i], - (frame_buffer + (MODULE_N_PIXELS * i)), - MODULE_N_BYTES, - 0); + // Offset due to frame in buffer. + size_t offset = MODULE_N_BYTES * n_modules * i_buffer; + // offset due to module in frame. + offset += MODULE_N_BYTES * i_module; - if (n_bytes_image != MODULE_N_BYTES) { - // TODO: Make nicer expcetion. - throw runtime_error("Unexpected number of bytes in image."); + auto n_bytes_image = zmq_recv( + sockets[i_module], + (frame_buffer + offset), + MODULE_N_BYTES, + 0); + + if (n_bytes_image != MODULE_N_BYTES) { + // TODO: Make nicer expcetion. + throw runtime_error("Unexpected number of bytes."); + } } } @@ -142,7 +154,7 @@ int main (int argc, char *argv[]) size_t n_modules = 32; FastQueue queue( - n_modules * MODULE_N_BYTES, + n_modules * MODULE_N_BYTES * WRITER_N_FRAMES_BUFFER, WRITER_RB_BUFFER_SLOTS); string ipc_prefix = "ipc://sf-replay-"; @@ -206,10 +218,10 @@ int main (int argc, char *argv[]) writer.write(metadata, data); queue.release(); - current_pulse_id++; + current_pulse_id += WRITER_N_FRAMES_BUFFER; // TODO: Some poor statistics. - stats_counter++; + stats_counter += WRITER_N_FRAMES_BUFFER; auto write_end_time = chrono::steady_clock::now(); auto write_us_duration = chrono::duration_cast( write_end_time-start_time).count();