From 8c33c9cc98e30e616fc4cbfe6e2b043d934e15f1 Mon Sep 17 00:00:00 2001 From: Dmitry Ozerov Date: Sat, 9 May 2020 23:25:46 +0200 Subject: [PATCH] wait that receiver thread is finished, is_good_fame check added, poor man current pulse_id script --- core-buffer/src/SFWriter.cpp | 1 + scripts/poor_pulseid_now.py | 6 ++++++ sf-buffer/src/sf_writer.cpp | 24 +++++++++++++++++++++--- 3 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 scripts/poor_pulseid_now.py diff --git a/core-buffer/src/SFWriter.cpp b/core-buffer/src/SFWriter.cpp index 3e1d4a2..5e9f804 100644 --- a/core-buffer/src/SFWriter.cpp +++ b/core-buffer/src/SFWriter.cpp @@ -89,6 +89,7 @@ void SFWriter::write(const DetectorFrame* metadata, const char* data) { auto frame_index_data = (char*)(metadata->frame_index); auto daq_rec_data = (char*)(metadata->daq_rec); auto n_received_packets_data = (char*)(metadata->n_received_packets); + auto is_good_frame_data = (char*)(metadata->is_good_frame); hsize_t image_offset[] = {current_write_index_, 0, 0}; hsize_t metadata_offset [] = {current_write_index_, 0}; diff --git a/scripts/poor_pulseid_now.py b/scripts/poor_pulseid_now.py new file mode 100644 index 0000000..957b1d7 --- /dev/null +++ b/scripts/poor_pulseid_now.py @@ -0,0 +1,6 @@ +import datetime +#2020-05-08 08:29:52.742737 : 11718049010 +reference_date = datetime.datetime(2020, 5, 8, 8, 29, 52) +now = datetime.datetime.utcnow() +delta = (datetime.datetime.utcnow()-reference_date).total_seconds()*1000 +print(int(delta/10)+11718049010) diff --git a/sf-buffer/src/sf_writer.cpp b/sf-buffer/src/sf_writer.cpp index 608ddef..d88c0b9 100644 --- a/sf-buffer/src/sf_writer.cpp +++ b/sf-buffer/src/sf_writer.cpp @@ -19,7 +19,8 @@ void receive_replay( const size_t n_modules, FastQueue& queue, void* ctx, - const uint64_t start_pulse_id) + const uint64_t start_pulse_id, + const uint64_t stop_pulse_id) { try { @@ -67,6 +68,8 @@ void receive_replay( i_buffer++) { + frame_meta_buffer->is_good_frame[i_buffer] = true; + for (size_t i_module = 0; i_module < n_modules; i_module++) { auto n_bytes_metadata = zmq_recv( sockets[i_module], @@ -103,6 +106,17 @@ void receive_replay( module_meta_buffer->daq_rec; frame_meta_buffer->n_received_packets[i_buffer] = module_meta_buffer->n_received_packets; + + if ( module_meta_buffer->n_received_packets != 128 ) frame_meta_buffer->is_good_frame[i_buffer] = false; + + } else { + if (module_meta_buffer->pulse_id != frame_meta_buffer->pulse_id[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false; + + if (module_meta_buffer->frame_index != frame_meta_buffer->frame_index[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false; + + if (module_meta_buffer->daq_rec != frame_meta_buffer->daq_rec[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false; + + if (module_meta_buffer->n_received_packets != 128 ) frame_meta_buffer->is_good_frame[i_buffer] = false; } if (frame_meta_buffer->pulse_id[i_buffer] != @@ -130,8 +144,9 @@ void receive_replay( } queue.commit(); - } + if ( current_pulse_id > stop_pulse_id ) break; + } for (size_t i = 0; i < n_modules; i++) { zmq_close(sockets[i]); } @@ -181,7 +196,7 @@ int main (int argc, char *argv[]) thread replay_receive_thread( receive_replay, ipc_prefix, n_modules, - ref(queue), ctx, start_pulse_id); + ref(queue), ctx, start_pulse_id, stop_pulse_id); size_t n_frames = stop_pulse_id - start_pulse_id; SFWriter writer(output_file, n_frames, n_modules); @@ -217,6 +232,7 @@ int main (int argc, char *argv[]) start_time = chrono::steady_clock::now(); writer.write(metadata, data); + queue.release(); current_pulse_id += WRITER_N_FRAMES_BUFFER; @@ -257,5 +273,7 @@ int main (int argc, char *argv[]) writer.close_file(); + //wait till receive thread is finished + replay_receive_thread.join(); return 0; }