wait that receiver thread is finished, is_good_fame check added, poor man current pulse_id script

This commit is contained in:
Dmitry Ozerov
2020-05-09 23:25:46 +02:00
committed by Data Backend account
parent ed2bcab1ba
commit 8c33c9cc98
3 changed files with 28 additions and 3 deletions
+21 -3
View File
@@ -19,7 +19,8 @@ void receive_replay(
const size_t n_modules,
FastQueue<DetectorFrame>& queue,
void* ctx,
const uint64_t start_pulse_id)
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id)
{
try {
@@ -67,6 +68,8 @@ void receive_replay(
i_buffer++)
{
frame_meta_buffer->is_good_frame[i_buffer] = true;
for (size_t i_module = 0; i_module < n_modules; i_module++) {
auto n_bytes_metadata = zmq_recv(
sockets[i_module],
@@ -103,6 +106,17 @@ void receive_replay(
module_meta_buffer->daq_rec;
frame_meta_buffer->n_received_packets[i_buffer] =
module_meta_buffer->n_received_packets;
if ( module_meta_buffer->n_received_packets != 128 ) frame_meta_buffer->is_good_frame[i_buffer] = false;
} else {
if (module_meta_buffer->pulse_id != frame_meta_buffer->pulse_id[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false;
if (module_meta_buffer->frame_index != frame_meta_buffer->frame_index[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false;
if (module_meta_buffer->daq_rec != frame_meta_buffer->daq_rec[i_buffer]) frame_meta_buffer->is_good_frame[i_buffer] = false;
if (module_meta_buffer->n_received_packets != 128 ) frame_meta_buffer->is_good_frame[i_buffer] = false;
}
if (frame_meta_buffer->pulse_id[i_buffer] !=
@@ -130,8 +144,9 @@ void receive_replay(
}
queue.commit();
}
if ( current_pulse_id > stop_pulse_id ) break;
}
for (size_t i = 0; i < n_modules; i++) {
zmq_close(sockets[i]);
}
@@ -181,7 +196,7 @@ int main (int argc, char *argv[])
thread replay_receive_thread(
receive_replay, ipc_prefix, n_modules,
ref(queue), ctx, start_pulse_id);
ref(queue), ctx, start_pulse_id, stop_pulse_id);
size_t n_frames = stop_pulse_id - start_pulse_id;
SFWriter writer(output_file, n_frames, n_modules);
@@ -217,6 +232,7 @@ int main (int argc, char *argv[])
start_time = chrono::steady_clock::now();
writer.write(metadata, data);
queue.release();
current_pulse_id += WRITER_N_FRAMES_BUFFER;
@@ -257,5 +273,7 @@ int main (int argc, char *argv[])
writer.close_file();
//wait till receive thread is finished
replay_receive_thread.join();
return 0;
}