From e2d02baed9507740f8fdaaebd44495b469f18bc9 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 25 May 2020 16:31:40 +0200 Subject: [PATCH] Cut down threads --- sf-writer/test/manual/test_sf_writer_recv.cpp | 68 ++++++++++--------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/sf-writer/test/manual/test_sf_writer_recv.cpp b/sf-writer/test/manual/test_sf_writer_recv.cpp index bdb5ffe..ba28f12 100644 --- a/sf-writer/test/manual/test_sf_writer_recv.cpp +++ b/sf-writer/test/manual/test_sf_writer_recv.cpp @@ -25,26 +25,26 @@ void receive_replay( try { WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id); + int slot_id; + while((slot_id = queue.reserve()) == -1) { + this_thread::sleep_for(chrono::milliseconds( + RB_READ_RETRY_INTERVAL_MS)); + } + uint64_t pulse_id = start_pulse_id; // "<= stop_pulse_id" because we include the last pulse_id. while(pulse_id <= stop_pulse_id) { - int slot_id; - while((slot_id = queue.reserve()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - auto image_metadata = queue.get_metadata_buffer(slot_id); auto image_buffer = queue.get_data_buffer(slot_id); receiver.get_next_buffer(pulse_id, image_metadata, image_buffer); - queue.commit(); - pulse_id += image_metadata->n_images; } + queue.commit(); + } catch (const std::exception& e) { using namespace date; using namespace chrono; @@ -89,32 +89,34 @@ int main (int argc, char *argv[]) zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-"; - thread replay_receive_thread(receive_replay, - ctx, ipc_base, n_modules, - ref(queue), start_pulse_id, stop_pulse_id); - auto current_pulse_id = start_pulse_id; - // "<= stop_pulse_id" because we include the last pulse_id. - while (current_pulse_id <= stop_pulse_id) { + receive_replay( + ctx, ipc_base, n_modules,queue, start_pulse_id, stop_pulse_id); - int slot_id; - while((slot_id = queue.read()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - - auto metadata = queue.get_metadata_buffer(slot_id); - - for (int i_pulse=0; i_pulse < metadata->n_images; i_pulse++) { - cout << "Written image " << metadata->pulse_id[i_pulse] << endl; - - } - - queue.release(); - current_pulse_id += metadata->n_images; - } - - //wait till receive thread is finished - replay_receive_thread.join(); +// +// +// auto current_pulse_id = start_pulse_id; +// // "<= stop_pulse_id" because we include the last pulse_id. +// while (current_pulse_id <= stop_pulse_id) { +// +// int slot_id; +// while((slot_id = queue.read()) == -1) { +// this_thread::sleep_for(chrono::milliseconds( +// RB_READ_RETRY_INTERVAL_MS)); +// } +// +// auto metadata = queue.get_metadata_buffer(slot_id); +// +// for (int i_pulse=0; i_pulse < metadata->n_images; i_pulse++) { +// cout << "Written image " << metadata->pulse_id[i_pulse] << endl; +// +// } +// +// queue.release(); +// current_pulse_id += metadata->n_images; +// } +// +// //wait till receive thread is finished +// replay_receive_thread.join(); return 0; }