Pass start_pulse_id to recv thread and check pulses

This commit is contained in:
2020-05-07 17:00:23 +02:00
parent 36e73e108a
commit da4108832d
+24 -26
View File
@@ -18,7 +18,8 @@ void receive_replay(
const string ipc_prefix,
const size_t n_modules,
FastQueue<DetectorFrame>& queue,
void* ctx)
void* ctx,
const uint64_t start_pulse_id)
{
try {
@@ -46,6 +47,7 @@ void receive_replay(
}
auto module_meta_buffer = make_unique<ModuleFrame>();
uint64_t current_pulse_id = start_pulse_id;
while (true) {
@@ -73,8 +75,22 @@ void receive_replay(
0);
if (n_bytes_metadata != sizeof(ModuleFrame)) {
// TODO: Make nicer expcetion.
throw runtime_error(strerror(errno));
throw runtime_error("Wrong number of metadata bytes.");
}
if (module_meta_buffer->pulse_id != current_pulse_id) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[sf_writer::receive_replay]";
err_msg << " Read unexpected pulse_id. ";
err_msg << " Expected " << current_pulse_id;
err_msg << " received ";
err_msg << module_meta_buffer->pulse_id << endl;
throw runtime_error(err_msg.str());
}
// Initialize buffers in first iteration for each pulse_id.
@@ -106,9 +122,10 @@ void receive_replay(
0);
if (n_bytes_image != MODULE_N_BYTES) {
// TODO: Make nicer expcetion.
throw runtime_error("Unexpected number of bytes.");
throw runtime_error("Wrong number of data bytes.");
}
current_pulse_id++;
}
}
@@ -163,11 +180,8 @@ int main (int argc, char *argv[])
zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS);
thread replay_receive_thread(
receive_replay,
ipc_prefix,
n_modules,
ref(queue),
ctx);
receive_replay, ipc_prefix, n_modules,
ref(queue), ctx, start_pulse_id);
size_t n_frames = stop_pulse_id - start_pulse_id;
SFWriter writer(output_file, n_frames, n_modules);
@@ -196,22 +210,6 @@ int main (int argc, char *argv[])
auto metadata = queue.get_metadata_buffer(slot_id);
auto data = queue.get_data_buffer(slot_id);
// TODO: This verification should go in the recv part.
// if (metadata->pulse_id != current_pulse_id) {
// stringstream err_msg;
//
// using namespace date;
// using namespace chrono;
// err_msg << "[" << system_clock::now() << "]";
// err_msg << "[sf_writer::main]";
// err_msg << " Read unexpected pulse_id. ";
// err_msg << " Expected " << current_pulse_id;
// err_msg << " received " << metadata->pulse_id;
// err_msg << endl;
//
// throw runtime_error(err_msg.str());
// }
auto read_end_time = chrono::steady_clock::now();
auto read_us_duration = chrono::duration_cast<chrono::microseconds>(
read_end_time-start_time).count();