From 759803389ecbfbdbce78455cbc288b14254471d8 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 27 May 2020 12:21:20 +0200 Subject: [PATCH] Major writer re-write --- sf-writer/src/main.cpp | 113 +++++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 55 deletions(-) diff --git a/sf-writer/src/main.cpp b/sf-writer/src/main.cpp index d4da930..92f6e8a 100644 --- a/sf-writer/src/main.cpp +++ b/sf-writer/src/main.cpp @@ -1,63 +1,58 @@ #include #include -#include "buffer_config.hpp" -#include "zmq.h" #include -#include #include #include -#include "JFH5Writer.hpp" -#include #include + #include "date.h" +#include "zmq.h" +#include "jungfrau.hpp" +#include "buffer_config.hpp" #include "bitshuffle/bitshuffle.h" #include "WriterZmqReceiver.hpp" +#include "JFH5Writer.hpp" +#include "BufferBinaryReader.hpp" using namespace std; using namespace core_buffer; +using namespace chrono; -void receive_replay( - void* ctx, - const string ipc_prefix, - const size_t n_modules, - FastQueue& queue, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id) +void read_buffer( + const string device, + const string channel_name, + const vector& blocks) { - try { - WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id); + BufferBinaryReader block_reader(device, channel_name); - uint64_t current_pulse_id=start_pulse_id; - // "<= stop_pulse_id" because we include the last pulse_id. - while(current_pulse_id<=stop_pulse_id) { - int slot_id; - while((slot_id = queue.reserve()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } + // "<= stop_block" because we include the stop_block in the transfer. + for (uint64_t curr_block=start_block; + curr_block <= stop_block; + curr_block++) { - auto metadata = queue.get_metadata_buffer(slot_id); - auto buffer = queue.get_data_buffer(slot_id); - - receiver.get_next_buffer( - current_pulse_id, metadata, buffer); - - queue.commit(); - current_pulse_id += metadata->n_images; + int slot_id; + while((slot_id = queue.reserve()) == -1) { + this_thread::sleep_for(chrono::milliseconds( + RB_READ_RETRY_INTERVAL_MS)); } - } catch (const std::exception& e) { - using namespace date; - using namespace chrono; + auto start_time = steady_clock::now(); - cout << "[" << system_clock::now() << "]"; - cout << "[sf_writer::receive_replay]"; - cout << " Stopped because of exception: " << endl; - cout << e.what() << endl; + auto block_buffer = queue.get_metadata_buffer(slot_id); - throw; + block_reader.get_block(curr_block, block_buffer); + + auto end_time = steady_clock::now(); + uint64_t read_us_duration = duration_cast( + end_time-start_time).count(); + + queue.commit(); + + // TODO: Proper statistics + cout << "sf_replay:avg_read_us "; + cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; } } @@ -65,11 +60,11 @@ int main (int argc, char *argv[]) { if (argc != 5) { cout << endl; - cout << "Usage: sf_writer "; - cout << " [ipc_id] [output_file] [start_pulse_id] [stop_pulse_id]"; + cout << "Usage: sf_writer [output_file] [device]"; + cout << " [start_pulse_id] [stop_pulse_id]"; cout << endl; - cout << "\tipc_id: Unique identifier for ipc." << endl; cout << "\toutput_file: Complete path to the output file." << endl; + cout << "\tdevice: Name of detector." << endl; cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; cout << endl; @@ -77,24 +72,32 @@ int main (int argc, char *argv[]) exit(-1); } - const string ipc_id = string(argv[1]); - string output_file = string(argv[2]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[3]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[4]); - + string output_file = string(argv[1]); + const string device = string(argv[2]); + uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); + uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]); size_t n_modules = 32; - FastQueue queue( - MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES, - WRITER_FASTQUEUE_N_SLOTS); + uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE; + uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE; + auto n_blocks = stop_block - start_block + 1; - auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); + // Generate list of buffer blocks that need to be loaded. + std::vector buffer_blocks(n_blocks); + for (uint64_t curr_block=start_block; + curr_block<=stop_block; + curr_block++) { + buffer_blocks.push_back(curr_block); + } - auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-"; - thread replay_receive_thread(receive_replay, - ctx, ipc_base, n_modules, - ref(queue), start_pulse_id, stop_pulse_id); + std::vector reading_threads(n_modules); + for (size_t i_module=0; i_module