Add more checks to sf_replay

This commit is contained in:
2020-04-23 17:47:57 +02:00
parent 537f5995c4
commit 66e47f2b51
+59 -29
View File
@@ -84,29 +84,44 @@ int main (int argc, char *argv[]) {
exit(-1);
}
string device = string(argv[1]);
string channel_name = string(argv[2]);
uint16_t module_id = (uint16_t) atoi(argv[3]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[4]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]);
const string device = string(argv[1]);
const string channel_name = string(argv[2]);
const uint16_t module_id = (uint16_t) atoi(argv[3]);
const uint64_t start_pulse_id = (uint64_t) atoll(argv[4]);
const uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]);
stringstream ipc_stream;
ipc_stream << "ipc://sf-replay-" << (int)module_id;
const auto ipc_address = ipc_stream.str();
#ifdef DEBUG_OUTPUT
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[sf_replay::receive]";
cout << " device " << device;
cout << " channel_name " << channel_name;
cout << " module_id " << module_id;
cout << " start_pulse_id " << start_pulse_id;
cout << " stop_pulse_id " << stop_pulse_id;
cout << " ipc_address " << ipc_address;
#endif
auto ctx = zmq_ctx_new();
auto socket = zmq_socket(ctx, ZMQ_PUSH);
int sndhwm = REPLAY_BLOCK_SIZE;
if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) {
const int sndhwm = REPLAY_BLOCK_SIZE;
if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0)
throw runtime_error(strerror (errno));
};
int linger_ms = 0;
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms))) {
const int linger_ms = 0;
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0)
throw runtime_error(strerror (errno));
}
stringstream ipc_addr;
ipc_addr << "ipc://sf-replay-" << (int)module_id;
auto ipc = ipc_addr.str();
if (zmq_connect(socket, ipc.c_str()) != 0) {
if (zmq_connect(socket, ipc_address.c_str()) != 0)
throw runtime_error(strerror (errno));
}
auto metadata_buffer = make_unique<FileBufferMetadata>();
auto image_buffer = make_unique<uint16_t[]>(
@@ -119,21 +134,17 @@ int main (int argc, char *argv[]) {
for (const auto& suffix:path_suffixes) {
string filename =
device + "/" +
channel_name + "/" +
suffix.path;
string filename = device + "/" + channel_name + "/" + suffix.path;
for (
size_t i_batch=0;
i_batch < FILE_MOD;
i_batch + REPLAY_BLOCK_SIZE)
for (size_t file_index_offset=0;
file_index_offset < FILE_MOD;
file_index_offset += REPLAY_BLOCK_SIZE)
{
load_data_from_file(
metadata_buffer.get(),
(char*)(image_buffer.get()),
filename,
i_batch);
file_index_offset);
for (size_t i_frame=0; i_frame < REPLAY_BLOCK_SIZE; i_frame++) {
@@ -145,10 +156,29 @@ int main (int argc, char *argv[]) {
module_id
};
if (module_frame.pulse_id != current_pulse_id) {
cout << "Unexpected pulse_id for module " << module_id;
cout << " Got " << module_frame.pulse_id;
cout << " expected " << current_pulse_id;
if (current_pulse_id < start_pulse_id) {
current_pulse_id++;
continue;
}
if (current_pulse_id > stop_pulse_id) {
// TODO: This will not work in production - linger.
exit(0);
}
if (current_pulse_id != module_frame.pulse_id) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[sf_replay::receive]";
err_msg << " Read unexpected pulse_id. ";
err_msg << " Expected " << current_pulse_id;
err_msg << " received " << module_frame.pulse_id;
err_msg << endl;
throw runtime_error(err_msg.str());
}
zmq_send(socket,