diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index efa691e..ba0a902 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -84,29 +84,44 @@ int main (int argc, char *argv[]) { exit(-1); } - string device = string(argv[1]); - string channel_name = string(argv[2]); - uint16_t module_id = (uint16_t) atoi(argv[3]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]); + const string device = string(argv[1]); + const string channel_name = string(argv[2]); + const uint16_t module_id = (uint16_t) atoi(argv[3]); + const uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); + const uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]); + + stringstream ipc_stream; + ipc_stream << "ipc://sf-replay-" << (int)module_id; + const auto ipc_address = ipc_stream.str(); + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[sf_replay::receive]"; + + cout << " device " << device; + cout << " channel_name " << channel_name; + cout << " module_id " << module_id; + cout << " start_pulse_id " << start_pulse_id; + cout << " stop_pulse_id " << stop_pulse_id; + cout << " ipc_address " << ipc_address; + #endif auto ctx = zmq_ctx_new(); auto socket = zmq_socket(ctx, ZMQ_PUSH); - int sndhwm = REPLAY_BLOCK_SIZE; - if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { + const int sndhwm = REPLAY_BLOCK_SIZE; + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) throw runtime_error(strerror (errno)); - }; - int linger_ms = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms))) { + + const int linger_ms = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) throw runtime_error(strerror (errno)); - } - stringstream ipc_addr; - ipc_addr << "ipc://sf-replay-" << (int)module_id; - auto ipc = ipc_addr.str(); - if (zmq_connect(socket, ipc.c_str()) != 0) { + + if (zmq_connect(socket, ipc_address.c_str()) != 0) throw runtime_error(strerror (errno)); - } auto metadata_buffer = make_unique(); auto image_buffer = make_unique( @@ -119,21 +134,17 @@ int main (int argc, char *argv[]) { for (const auto& suffix:path_suffixes) { - string filename = - device + "/" + - channel_name + "/" + - suffix.path; + string filename = device + "/" + channel_name + "/" + suffix.path; - for ( - size_t i_batch=0; - i_batch < FILE_MOD; - i_batch + REPLAY_BLOCK_SIZE) + for (size_t file_index_offset=0; + file_index_offset < FILE_MOD; + file_index_offset += REPLAY_BLOCK_SIZE) { load_data_from_file( metadata_buffer.get(), (char*)(image_buffer.get()), filename, - i_batch); + file_index_offset); for (size_t i_frame=0; i_frame < REPLAY_BLOCK_SIZE; i_frame++) { @@ -145,10 +156,29 @@ int main (int argc, char *argv[]) { module_id }; - if (module_frame.pulse_id != current_pulse_id) { - cout << "Unexpected pulse_id for module " << module_id; - cout << " Got " << module_frame.pulse_id; - cout << " expected " << current_pulse_id; + if (current_pulse_id < start_pulse_id) { + current_pulse_id++; + continue; + } + + if (current_pulse_id > stop_pulse_id) { + // TODO: This will not work in production - linger. + exit(0); + } + + if (current_pulse_id != module_frame.pulse_id) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[sf_replay::receive]"; + err_msg << " Read unexpected pulse_id. "; + err_msg << " Expected " << current_pulse_id; + err_msg << " received " << module_frame.pulse_id; + err_msg << endl; + + throw runtime_error(err_msg.str()); } zmq_send(socket,