From da4108832d68a39543840b5dcb0e35b8bc818582 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Thu, 7 May 2020 17:00:23 +0200 Subject: [PATCH] Pass start_pulse_id to recv thread and check pulses --- sf-buffer/src/sf_writer.cpp | 50 ++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/sf-buffer/src/sf_writer.cpp b/sf-buffer/src/sf_writer.cpp index f1cb3ed..3d407db 100644 --- a/sf-buffer/src/sf_writer.cpp +++ b/sf-buffer/src/sf_writer.cpp @@ -18,7 +18,8 @@ void receive_replay( const string ipc_prefix, const size_t n_modules, FastQueue& queue, - void* ctx) + void* ctx, + const uint64_t start_pulse_id) { try { @@ -46,6 +47,7 @@ void receive_replay( } auto module_meta_buffer = make_unique(); + uint64_t current_pulse_id = start_pulse_id; while (true) { @@ -73,8 +75,22 @@ void receive_replay( 0); if (n_bytes_metadata != sizeof(ModuleFrame)) { - // TODO: Make nicer expcetion. - throw runtime_error(strerror(errno)); + throw runtime_error("Wrong number of metadata bytes."); + } + + if (module_meta_buffer->pulse_id != current_pulse_id) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[sf_writer::receive_replay]"; + err_msg << " Read unexpected pulse_id. "; + err_msg << " Expected " << current_pulse_id; + err_msg << " received "; + err_msg << module_meta_buffer->pulse_id << endl; + + throw runtime_error(err_msg.str()); } // Initialize buffers in first iteration for each pulse_id. @@ -106,9 +122,10 @@ void receive_replay( 0); if (n_bytes_image != MODULE_N_BYTES) { - // TODO: Make nicer expcetion. - throw runtime_error("Unexpected number of bytes."); + throw runtime_error("Wrong number of data bytes."); } + + current_pulse_id++; } } @@ -163,11 +180,8 @@ int main (int argc, char *argv[]) zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); thread replay_receive_thread( - receive_replay, - ipc_prefix, - n_modules, - ref(queue), - ctx); + receive_replay, ipc_prefix, n_modules, + ref(queue), ctx, start_pulse_id); size_t n_frames = stop_pulse_id - start_pulse_id; SFWriter writer(output_file, n_frames, n_modules); @@ -196,22 +210,6 @@ int main (int argc, char *argv[]) auto metadata = queue.get_metadata_buffer(slot_id); auto data = queue.get_data_buffer(slot_id); - // TODO: This verification should go in the recv part. -// if (metadata->pulse_id != current_pulse_id) { -// stringstream err_msg; -// -// using namespace date; -// using namespace chrono; -// err_msg << "[" << system_clock::now() << "]"; -// err_msg << "[sf_writer::main]"; -// err_msg << " Read unexpected pulse_id. "; -// err_msg << " Expected " << current_pulse_id; -// err_msg << " received " << metadata->pulse_id; -// err_msg << endl; -// -// throw runtime_error(err_msg.str()); -// } - auto read_end_time = chrono::steady_clock::now(); auto read_us_duration = chrono::duration_cast( read_end_time-start_time).count();