From 85660a92602f36b8b12172aa9fefb302f8198929 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 1 May 2020 12:41:39 +0200 Subject: [PATCH] Added new sf_live stub --- sf-buffer/src/sf_live.cpp | 125 ++++++++------------------------------ 1 file changed, 27 insertions(+), 98 deletions(-) diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index d2c4592..a319390 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -7,17 +7,11 @@ #include #include #include "date.h" +#include "LiveH5Reader.hpp" using namespace std; using namespace core_buffer; -struct FileBufferMetadata { - uint64_t pulse_id[REPLAY_READ_BLOCK_SIZE]; - uint64_t frame_index[REPLAY_READ_BLOCK_SIZE]; - uint32_t daq_rec[REPLAY_READ_BLOCK_SIZE]; - uint16_t n_received_packets[REPLAY_READ_BLOCK_SIZE]; -}; - void load_data_from_file ( FileBufferMetadata* metadata_buffer, char* image_buffer, @@ -84,107 +78,44 @@ void sf_live ( void* socket, const string& device, const string& channel_name, - const uint16_t source_id, - const uint64_t start_pulse_id) + const uint16_t source_id) { - auto metadata_buffer = make_unique(); - auto image_buffer = make_unique( - LIVE_READ_BLOCK_SIZE * MODULE_N_PIXELS); + auto metadata_buffer = make_unique(); + auto image_buffer = make_unique(MODULE_N_PIXELS); - auto latest_filename = ""; + const auto current_filename = device + "/" + channel_name + "/CURRENT"; - uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; - base_pulse_id *= core_buffer::FILE_MOD; + LiveH5Reader reader(current_filename, source_id); - size_t current_pulse_id = base_pulse_id; - string filename_base = device + "/" + channel_name + "/"; + auto current_pulse_id = reader.get_latest_pulse_id(); - for (const auto& filename_suffix:path_suffixes) { + while (true) { - string filename = filename_base + filename_suffix.path; + reader.get_frame_metadata(current_pulse_id, metadata_buffer.get()); + + zmq_send(socket, + (char*)(metadata_buffer.get()), + sizeof(ModuleFrame), + ZMQ_SNDMORE); + + reader.get_frame_data(current_pulse_id, image_buffer.get()); + + zmq_send(socket, + (char*)(image_buffer.get()), + MODULE_N_BYTES, + 0); #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::sf_replay]"; - - cout << " Reading from filename " << filename << endl; + cout << "[sf_live::sf_live]"; + cout << " Sent pulse_id "; + cout << current_pulse_id << endl; #endif - for (size_t file_index_offset=0; - file_index_offset < FILE_MOD; - file_index_offset += REPLAY_READ_BLOCK_SIZE) - { - auto start_time = chrono::steady_clock::now(); - - load_data_from_file( - metadata_buffer.get(), - (char*)(image_buffer.get()), - filename, - file_index_offset); - - auto end_time = chrono::steady_clock::now(); - auto ms_duration = chrono::duration_cast( - end_time-start_time).count(); - - cout << "sf_replay:batch_read_ms " << ms_duration << endl; - - for (size_t i_frame=0; i_frame < REPLAY_READ_BLOCK_SIZE; i_frame++) { - - ModuleFrame module_frame = { - metadata_buffer->pulse_id[i_frame], - metadata_buffer->frame_index[i_frame], - metadata_buffer->daq_rec[i_frame], - metadata_buffer->n_received_packets[i_frame], - source_id - }; - - if (current_pulse_id < start_pulse_id) { - current_pulse_id++; - continue; - } - - if (current_pulse_id != module_frame.pulse_id) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[sf_live::sf_live]"; - err_msg << " Read unexpected pulse_id. "; - err_msg << " Expected " << current_pulse_id; - err_msg << " received " << module_frame.pulse_id; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - - zmq_send(socket, - &module_frame, - sizeof(ModuleFrame), - ZMQ_SNDMORE); - - auto buff_offset = i_frame * MODULE_N_PIXELS; - zmq_send(socket, - (char*)(image_buffer.get() + buff_offset), - MODULE_N_BYTES, - 0); - - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::sf_replay]"; - cout << " Sent pulse_id "; - cout << current_pulse_id << endl; - #endif - - current_pulse_id++; - } - } + current_pulse_id++; } } @@ -192,8 +123,7 @@ int main (int argc, char *argv[]) { if (argc != 6) { cout << endl; - cout << "Usage: sf_live [device]"; - cout << " [channel_name] [source_id] [start_pulse_id]"; + cout << "Usage: sf_live [device] [channel_name] [source_id]"; cout << endl; cout << "\tdevice: Name of detector." << endl; cout << "\tchannel_name: M00-M31 for JF16M." << endl; @@ -206,7 +136,6 @@ int main (int argc, char *argv[]) { const string device = string(argv[1]); const string channel_name = string(argv[2]); const uint16_t source_id = (uint16_t) atoi(argv[3]); - const uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); stringstream ipc_stream; ipc_stream << "ipc://sf-live-" << (int)source_id; @@ -241,7 +170,7 @@ int main (int argc, char *argv[]) { if (zmq_connect(socket, ipc_address.c_str()) != 0) throw runtime_error(strerror (errno)); - sf_live(socket, device, channel_name, source_id, start_pulse_id); + sf_live(socket, device, channel_name, source_id); zmq_close(socket); zmq_ctx_destroy(ctx);