From cdc2e68921ff3ea7710713ab9365cb40f94528b5 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Thu, 23 Apr 2020 12:01:27 +0200 Subject: [PATCH] Multithreaded writer and receiver --- sf-writer/sf_h5_writer.cpp | 149 +++++++++++++++++++++++++------------ 1 file changed, 101 insertions(+), 48 deletions(-) diff --git a/sf-writer/sf_h5_writer.cpp b/sf-writer/sf_h5_writer.cpp index af4303c..9abede1 100644 --- a/sf-writer/sf_h5_writer.cpp +++ b/sf-writer/sf_h5_writer.cpp @@ -8,34 +8,17 @@ #include #include #include +#include using namespace std; using namespace core_buffer; -int main (int argc, char *argv[]) + +void receive_replay( + const string ipc_prefix, + const size_t n_modules, + RingBuffer& ring_buffer) { - if (argc != 4) { - cout << endl; - cout << "Usage: sf_h5_writer "; - cout << " [output_file] [start_pulse_id] [stop_pulse_id]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << endl; - - exit(-1); - } - - string output_file = string(argv[1]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[2]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]); - - size_t n_modules = 32; - - H5Writer writer(output_file); - writer.create_file(); - auto ctx = zmq_ctx_new(); zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); @@ -61,16 +44,11 @@ int main (int argc, char *argv[]) } auto metadata_buffer = make_unique(); - auto image_buffer = make_unique(n_modules * MODULE_N_PIXELS); - - int i_write = 0; - size_t total_ms = 0; - size_t max_ms = 0; + char* image_buffer = nullptr; while (true) { - uint64_t pulse_id = 0; - - auto start_time = chrono::steady_clock::now(); + auto rb_metadata = make_shared(); + image_buffer = ring_buffer.reserve(rb_metadata); for (size_t i=0; ipulse_id; + rb_metadata->pulse_id = metadata_buffer->pulse_id; + rb_metadata->frame_index = metadata_buffer->frame_index; + rb_metadata->daq_rec = metadata_buffer->daq_rec; + rb_metadata->n_received_packets = + metadata_buffer->n_received_packets; } - if (pulse_id != metadata_buffer->pulse_id) { - cout << "Module " << i << " pulse " << metadata_buffer->pulse_id; - cout << " instead of " << pulse_id << endl; + if (rb_metadata->pulse_id != metadata_buffer->pulse_id) { + throw runtime_error("Unexpected pulse_id received."); } auto n_bytes_image = zmq_recv( sockets[i], - (image_buffer.get() + (512*1024*i)), - 512 * 1024 * 2, + (image_buffer + (MODULE_N_PIXELS*i)), + MODULE_N_BYTES, 0); - if (n_bytes_image != 512 * 1024 * 2) { - cout << "n_bytes_image " << n_bytes_image << endl; + if (n_bytes_image != MODULE_N_BYTES) { + // TODO: Make nicer expcetion. throw runtime_error("Unexpected number of bytes in image."); } } -// writer.write_data("image", i_write, (char*) (image_buffer.get()), -// {32*512, 1024}, 32*512*1024*2, "uint16", "little"); + ring_buffer.commit(rb_metadata); + } + + for (size_t i=0; i ring_buffer(5); + + string ipc_prefix = "ipc://sf-replay-"; + size_t n_modules = 32; + + thread replay_receive_thread( + receive_replay, + ipc_prefix, + n_modules, + ref(ring_buffer)); + + H5Writer writer(output_file); + writer.create_file(); + + // TODO: Remove stats trash. + int i_write = 0; + size_t total_ms = 0; + size_t max_ms = 0; + + for ( + size_t current_pulse_id=start_pulse_id; + current_pulse_id <= stop_pulse_id; + current_pulse_id++) + { + auto start_time = chrono::steady_clock::now(); + + auto received_data = ring_buffer.read(); + + // .first is nullptr if ringbuffer is empty. + if(received_data.first == nullptr) { + this_thread::sleep_for(chrono::milliseconds( + config::ring_buffer_read_retry_interval)); + + // TODO: Very ugly hack. Make it nicer. + current_pulse_id--; + continue; + } + + auto metadata = received_data.first; + auto data = received_data.second; + + cout << "Received pulse_id " << metadata->pulse_id << endl; + + if (metadata->pulse_id != current_pulse_id) { + cout << "ERROR expecting " << current_pulse_id << endl; + } + + // TODO: Write to H5 + i_write++; auto end_time = chrono::steady_clock::now(); @@ -126,15 +184,10 @@ int main (int argc, char *argv[]) total_ms = 0; max_ms = 0; } + } writer.close_file(); - for (size_t i=0; i