From 03bee71b490af07edfb4864343254685eb2f73df Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 19 May 2020 16:12:55 +0200 Subject: [PATCH] Adjust sf_writer for receiving buffers --- sf-buffer/src/sf_writer.cpp | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/sf-buffer/src/sf_writer.cpp b/sf-buffer/src/sf_writer.cpp index cc342fc..a3b5e7c 100644 --- a/sf-buffer/src/sf_writer.cpp +++ b/sf-buffer/src/sf_writer.cpp @@ -9,6 +9,7 @@ #include "WriterH5Writer.hpp" #include #include +#include #include "date.h" #include "bitshuffle/bitshuffle.h" #include "WriterZmqReceiver.hpp" @@ -20,27 +21,22 @@ void receive_replay( void* ctx, const string ipc_prefix, const size_t n_modules, - FastQueue& queue, + FastQueue& queue, const uint64_t start_pulse_id, const uint64_t stop_pulse_id) { try { WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules); + BufferedFastQueue buffered_queue( + queue, WRITER_DATA_CACHE_N_IMAGES, n_modules); uint64_t current_pulse_id=start_pulse_id; // "<= stop_pulse_id" because we include the last pulse_id. while(current_pulse_id<=stop_pulse_id) { - auto slot_id = queue.reserve(); - - if (slot_id == -1){ - this_thread::sleep_for(chrono::milliseconds(5)); - continue; - } - - auto image_metadata = queue.get_metadata_buffer(slot_id); - auto image_buffer = queue.get_data_buffer(slot_id); + auto image_metadata = buffered_queue.get_metadata_buffer(); + auto image_buffer = buffered_queue.get_data_buffer(); receiver.get_next_image( current_pulse_id, image_metadata, image_buffer); @@ -49,10 +45,12 @@ void receive_replay( throw runtime_error("Wrong pulse id from zmq receiver."); } - queue.commit(); + buffered_queue.commit(); current_pulse_id++; } + buffered_queue.finalize(); + } catch (const std::exception& e) { using namespace date; using namespace chrono; @@ -87,11 +85,8 @@ int main (int argc, char *argv[]) size_t n_modules = 32; -// auto compress_frame_size = bshuf_compress_lz4_bound( -// MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); - - FastQueue queue( - MODULE_N_BYTES * n_modules, + FastQueue queue( + MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES, WRITER_FASTQUEUE_N_SLOTS); auto ctx = zmq_ctx_new();