From b33253bb52582977463643efac3a59683b0e90e5 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 24 Apr 2020 11:43:47 +0200 Subject: [PATCH] Refactor sf_replay to make it testable --- sf-buffer/src/sf_replay.cpp | 185 +++++++++++++++++++----------------- 1 file changed, 98 insertions(+), 87 deletions(-) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 816b51a..e1a0708 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -78,6 +78,102 @@ void load_data_from_file ( input_file.close(); } +void sf_replay ( + void* socket, + const string& device, + const string& channel_name, + const uint16_t module_id, + const uint64_t start_pulse_id, + const uint64_t stop_pulse_id) +{ + auto metadata_buffer = make_unique(); + auto image_buffer = make_unique( + REPLAY_BLOCK_SIZE * MODULE_N_PIXELS); + + auto path_suffixes = + BufferUtils::get_path_suffixes(start_pulse_id, stop_pulse_id); + + uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; + base_pulse_id *= core_buffer::FILE_MOD; + + size_t current_pulse_id = base_pulse_id; + string filename_base = device + "/" + channel_name + "/"; + + for (const auto& suffix:path_suffixes) { + + string filename = filename_base + suffix.path; + + #ifdef DEBUG_OUTPUT + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[sf_replay::receive]"; + + cout << " Reading from filename " << filename << endl; + #endif + + for (size_t file_index_offset=0; + file_index_offset < FILE_MOD; + file_index_offset += REPLAY_BLOCK_SIZE) + { + load_data_from_file( + metadata_buffer.get(), + (char*)(image_buffer.get()), + filename, + file_index_offset); + + for (size_t i_frame=0; i_frame < REPLAY_BLOCK_SIZE; i_frame++) { + + ModuleFrame module_frame = { + metadata_buffer->pulse_id[i_frame], + metadata_buffer->frame_index[i_frame], + metadata_buffer->daq_rec[i_frame], + metadata_buffer->n_received_packets[i_frame], + module_id + }; + + if (current_pulse_id < start_pulse_id) { + current_pulse_id++; + continue; + } + + if (current_pulse_id > stop_pulse_id) { + return; + } + + if (current_pulse_id != module_frame.pulse_id) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[sf_replay::receive]"; + err_msg << " Read unexpected pulse_id. "; + err_msg << " Expected " << current_pulse_id; + err_msg << " received " << module_frame.pulse_id; + err_msg << endl; + + throw runtime_error(err_msg.str()); + } + + zmq_send(socket, + &module_frame, + sizeof(ModuleFrame), + ZMQ_SNDMORE); + + auto buff_offset = i_frame * MODULE_N_PIXELS; + zmq_send(socket, + (char*)(image_buffer.get() + buff_offset), + MODULE_N_BYTES, + 0); + + current_pulse_id++; + } + } + } +} + int main (int argc, char *argv[]) { if (argc != 6) { @@ -135,93 +231,8 @@ int main (int argc, char *argv[]) { if (zmq_connect(socket, ipc_address.c_str()) != 0) throw runtime_error(strerror (errno)); - auto metadata_buffer = make_unique(); - auto image_buffer = make_unique( - REPLAY_BLOCK_SIZE * MODULE_N_PIXELS); - - auto path_suffixes = - BufferUtils::get_path_suffixes(start_pulse_id, stop_pulse_id); - - uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; - base_pulse_id *= core_buffer::FILE_MOD; - - size_t current_pulse_id = base_pulse_id; - string filename_base = device + "/" + channel_name + "/"; - - for (const auto& suffix:path_suffixes) { - - string filename = filename_base + suffix.path; - - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::receive]"; - - cout << " Reading from filename " << filename << endl; - #endif - - for (size_t file_index_offset=0; - file_index_offset < FILE_MOD; - file_index_offset += REPLAY_BLOCK_SIZE) - { - load_data_from_file( - metadata_buffer.get(), - (char*)(image_buffer.get()), - filename, - file_index_offset); - - for (size_t i_frame=0; i_frame < REPLAY_BLOCK_SIZE; i_frame++) { - - ModuleFrame module_frame = { - metadata_buffer->pulse_id[i_frame], - metadata_buffer->frame_index[i_frame], - metadata_buffer->daq_rec[i_frame], - metadata_buffer->n_received_packets[i_frame], - module_id - }; - - if (current_pulse_id < start_pulse_id) { - current_pulse_id++; - continue; - } - - if (current_pulse_id > stop_pulse_id) { - // TODO: This will not work in production - linger. - exit(0); - } - - if (current_pulse_id != module_frame.pulse_id) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[sf_replay::receive]"; - err_msg << " Read unexpected pulse_id. "; - err_msg << " Expected " << current_pulse_id; - err_msg << " received " << module_frame.pulse_id; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - - zmq_send(socket, - &module_frame, - sizeof(ModuleFrame), - ZMQ_SNDMORE); - - auto buff_offset = i_frame * MODULE_N_PIXELS; - zmq_send(socket, - (char*)(image_buffer.get() + buff_offset), - MODULE_N_BYTES, - 0); - - current_pulse_id++; - } - } - } + process(socket, device, channel_name, module_id, + start_pulse_id, stop_pulse_id); zmq_close(socket); zmq_ctx_destroy(ctx);