From c15b74a5696b42e58f51226d421f325b9889c430 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 28 Apr 2020 17:44:55 +0200 Subject: [PATCH] Simplify interface --- sf-buffer/src/sf_writer.cpp | 62 +++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/sf-buffer/src/sf_writer.cpp b/sf-buffer/src/sf_writer.cpp index cdddb09..414a0fb 100644 --- a/sf-buffer/src/sf_writer.cpp +++ b/sf-buffer/src/sf_writer.cpp @@ -3,21 +3,28 @@ #include "buffer_config.hpp" #include "zmq.h" #include -#include #include #include #include #include "SFWriter.hpp" #include +#include using namespace std; using namespace core_buffer; +struct DetectorFrame +{ + uint64_t pulse_id; + uint64_t frame_index; + uint32_t daq_rec; + uint16_t n_received_packets; +}; void receive_replay( const string ipc_prefix, const size_t n_modules, - RingBuffer& ring_buffer, + FastQueue& queue, void* ctx) { try { @@ -45,22 +52,24 @@ void receive_replay( } } - auto metadata_buffer = make_unique(); - char *image_buffer = nullptr; + auto module_meta_buffer = make_unique(); while (true) { - auto rb_metadata = make_shared(); - image_buffer = ring_buffer.reserve(rb_metadata); - if (image_buffer == nullptr){ + auto slot_id = queue.reserve(); + + if (slot_id == -1){ this_thread::sleep_for(chrono::milliseconds(5)); continue; } + auto frame_meta_buffer = queue.get_metadata_buffer(slot_id); + auto frame_buffer = queue.get_data_buffer(slot_id); + for (size_t i = 0; i < n_modules; i++) { auto n_bytes_metadata = zmq_recv( sockets[i], - metadata_buffer.get(), + (char*) frame_meta_buffer, sizeof(ModuleFrame), 0); @@ -71,20 +80,24 @@ void receive_replay( // Initialize buffers in first iteration for each pulse_id. if (i == 0) { - rb_metadata->pulse_id = metadata_buffer->pulse_id; - rb_metadata->frame_index = metadata_buffer->frame_index; - rb_metadata->daq_rec = metadata_buffer->daq_rec; - rb_metadata->n_received_packets = - metadata_buffer->n_received_packets; + frame_meta_buffer->pulse_id = + module_meta_buffer->pulse_id; + frame_meta_buffer->frame_index = + module_meta_buffer->frame_index; + frame_meta_buffer->daq_rec = + module_meta_buffer->daq_rec; + frame_meta_buffer->n_received_packets = + module_meta_buffer->n_received_packets; } - if (rb_metadata->pulse_id != metadata_buffer->pulse_id) { + if (frame_meta_buffer->pulse_id != + module_meta_buffer->pulse_id) { throw runtime_error("Unexpected pulse_id received."); } auto n_bytes_image = zmq_recv( sockets[i], - (image_buffer + (MODULE_N_PIXELS * i)), + (frame_buffer + (MODULE_N_PIXELS * i)), MODULE_N_BYTES, 0); @@ -94,7 +107,7 @@ void receive_replay( } } - ring_buffer.commit(rb_metadata); + queue.commit(); } for (size_t i = 0; i < n_modules; i++) { @@ -136,8 +149,9 @@ int main (int argc, char *argv[]) size_t n_modules = 32; - RingBuffer ring_buffer(WRITER_RB_BUFFER_SLOTS); - ring_buffer.initialize(MODULE_N_BYTES*n_modules); + FastQueue queue( + n_modules * MODULE_N_BYTES, + WRITER_RB_BUFFER_SLOTS); string ipc_prefix = "ipc://sf-replay-"; auto ctx = zmq_ctx_new(); @@ -147,7 +161,7 @@ int main (int argc, char *argv[]) receive_replay, ipc_prefix, n_modules, - ref(ring_buffer), + ref(queue), ctx); size_t n_frames = stop_pulse_id - start_pulse_id; @@ -166,16 +180,16 @@ int main (int argc, char *argv[]) auto current_pulse_id = start_pulse_id; while (current_pulse_id <= stop_pulse_id) { - auto received_data = ring_buffer.read(); + auto slot_id = queue.read(); - if(received_data.first == nullptr) { + if(slot_id == -1) { this_thread::sleep_for(chrono::milliseconds( config::ring_buffer_read_retry_interval)); continue; } - auto metadata = received_data.first; - auto data = received_data.second; + auto metadata = queue.get_metadata_buffer(slot_id); + auto data = queue.get_data_buffer(slot_id); if (metadata->pulse_id != current_pulse_id) { stringstream err_msg; @@ -199,7 +213,7 @@ int main (int argc, char *argv[]) start_time = chrono::steady_clock::now(); writer.write(metadata, data); - ring_buffer.release(metadata->buffer_slot_index); + queue.release(); current_pulse_id++; // TODO: Some poor statistics.