diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index 0c308d7..1d06cdc 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -26,8 +26,10 @@ namespace core_buffer { // Extension of our file format. const std::string FILE_EXTENSION = ".h5"; - // How many frames do we read at once during replay. - const size_t REPLAY_READ_BLOCK_SIZE = 100; + // How many frames do we buffer in send. + const size_t REPLAY_SNDHWM = 100; + + const std::string REPLAY_STREAM_IPC_URL = "ipc:///tmp/sf-replay-"; // Size of sf_buffer RB in elements. const size_t BUFFER_INTERNAL_QUEUE_SIZE = 1000; diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index 6b26788..2f627ef 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -1,64 +1,18 @@ #include #include #include "jungfrau.hpp" -#include "BufferUtils.hpp" + #include "zmq.h" #include "buffer_config.hpp" -#include + #include +#include #include "date.h" +#include "bitshuffle/bitshuffle.h" using namespace std; using namespace core_buffer; -void load_data_from_file ( - ModuleFrame* metadata_buffer, - char* image_buffer, - const string &filename, - const size_t start_index) -{ - H5::H5File input_file(filename, H5F_ACC_RDONLY); - - hsize_t b_image_dims[3] = - {REPLAY_READ_BLOCK_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace b_i_space (3, b_image_dims); - hsize_t b_i_count[] = - {REPLAY_READ_BLOCK_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t b_i_start[] = {0, 0, 0}; - b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); - - hsize_t f_image_dims[3] = {FILE_MOD, MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace f_i_space (3, f_image_dims); - hsize_t f_i_count[] = - {REPLAY_READ_BLOCK_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t f_i_start[] = {start_index, 0, 0}; - f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); - - auto image_dataset = input_file.openDataSet("image"); - image_dataset.read( - image_buffer, H5::PredType::NATIVE_UINT16, - b_i_space, f_i_space); - - hsize_t b_metadata_dims[2] = {REPLAY_READ_BLOCK_SIZE, ModuleFrame_N_FIELDS}; - H5::DataSpace b_m_space (2, b_metadata_dims); - hsize_t b_m_count[] = {REPLAY_READ_BLOCK_SIZE, ModuleFrame_N_FIELDS}; - hsize_t b_m_start[] = {0, 0}; - b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); - - hsize_t f_metadata_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; - H5::DataSpace f_m_space (2, f_metadata_dims); - hsize_t f_m_count[] = {REPLAY_READ_BLOCK_SIZE, ModuleFrame_N_FIELDS}; - hsize_t f_m_start[] = {start_index, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); - - auto metadata_dataset = input_file.openDataSet("metadata"); - metadata_dataset.read( - (char*) metadata_buffer, H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - input_file.close(); -} - void sf_replay ( void* socket, const string& device, @@ -66,93 +20,115 @@ void sf_replay ( const uint64_t start_pulse_id, const uint64_t stop_pulse_id) { - auto metadata_buffer = make_unique(REPLAY_READ_BLOCK_SIZE); - auto image_buffer = make_unique( - REPLAY_READ_BLOCK_SIZE * MODULE_N_PIXELS); + CompressedModuleFrame metadata_buffer; + auto frame_buffer = make_unique(MODULE_N_PIXELS); - auto path_suffixes = - BufferUtils::get_path_suffixes(start_pulse_id, stop_pulse_id); + auto compressed_buffer_size = bshuf_compress_lz4_bound( + MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); + auto compressed_buffer = make_unique(compressed_buffer_size); - uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; - base_pulse_id *= core_buffer::FILE_MOD; + ReplayH5Reader file_reader( + device, channel_name, start_pulse_id, stop_pulse_id); - size_t current_pulse_id = base_pulse_id; - string filename_base = core_buffer::BUFFER_BASE_DIR + "/" + device + "/" + channel_name + "/"; + //TODO: Add statstics. + uint64_t stats_counter = 0; - for (const auto& filename_suffix:path_suffixes) { + uint64_t total_read_us = 0; + uint64_t max_read_us = 0; + uint64_t total_compress_us = 0; + uint64_t max_compress_us = 0; + uint64_t total_send_us = 0; + uint64_t max_send_us = 0; - string filename = filename_base + filename_suffix.path; + uint64_t total_original_size = 0; + uint64_t total_compressed_size = 0; - for (size_t file_index_offset=0; - file_index_offset < FILE_MOD; - file_index_offset += REPLAY_READ_BLOCK_SIZE) - { - auto start_time = chrono::steady_clock::now(); + // "<= stop_pulse_id" because we include the stop_pulse_id in the file. + for ( + uint64_t curr_pulse_id = start_pulse_id; + curr_pulse_id <= stop_pulse_id; + curr_pulse_id++) { - load_data_from_file( - metadata_buffer.get(), - (char*)(image_buffer.get()), - filename, - file_index_offset); + auto start_time = chrono::steady_clock::now(); - auto end_time = chrono::steady_clock::now(); - auto ms_duration = chrono::duration_cast( - end_time-start_time).count(); + file_reader.get_frame( + curr_pulse_id, + &(metadata_buffer.module_frame), + (char*)(frame_buffer.get())); - cout << "sf_replay:batch_read_ms " << ms_duration << endl; + auto end_time = chrono::steady_clock::now(); + auto read_us_duration = chrono::duration_cast( + end_time-start_time).count(); - for ( - size_t i_frame=0; - i_frame < REPLAY_READ_BLOCK_SIZE; - i_frame++) { + start_time = chrono::steady_clock::now(); - auto current_frame = (metadata_buffer.get())[i_frame]; + auto compressed_size = bshuf_compress_lz4( + frame_buffer.get(), compressed_buffer.get(), + MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); - if (current_pulse_id < start_pulse_id) { - current_pulse_id++; - continue; - } + if (compressed_size < 0) { + throw runtime_error("Error while compressing buffer."); + } - if (current_pulse_id > stop_pulse_id) { - cout << "Done. Streamed images from "; - cout << start_pulse_id << " to " << stop_pulse_id; - cout << endl; - return; - } + metadata_buffer.compressed_size = compressed_size; - // The buffer did not write this pulse id. - if (current_frame.pulse_id == 0) { - cout << "pulse_id " << current_pulse_id; - cout << " missing in buffer file." << endl; - // Wrong frame in the buffer file. - } else if (current_pulse_id != current_frame.pulse_id) { - stringstream err_msg; + end_time = chrono::steady_clock::now(); + auto compress_us_duration = chrono::duration_cast( + end_time-start_time).count(); - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[sf_replay::receive]"; - err_msg << " Read unexpected pulse_id. "; - err_msg << " Expected " << current_pulse_id; - err_msg << " received " << current_frame.pulse_id; - err_msg << endl; + start_time = chrono::steady_clock::now(); - throw runtime_error(err_msg.str()); - } + zmq_send(socket, + &metadata_buffer, + sizeof(CompressedModuleFrame), + ZMQ_SNDMORE); + zmq_send(socket, + (char*)(frame_buffer.get()), + compressed_size, + 0); - zmq_send(socket, - ¤t_frame, - sizeof(ModuleFrame), - ZMQ_SNDMORE); + end_time = chrono::steady_clock::now(); + auto send_us_duration = chrono::duration_cast( + end_time-start_time).count(); - auto buff_offset = i_frame * MODULE_N_PIXELS; - zmq_send(socket, - (char*)(image_buffer.get() + buff_offset), - MODULE_N_BYTES, - 0); + // TODO: Make proper stastistics. + stats_counter++; + total_read_us += read_us_duration; + max_read_us = max(max_read_us, (uint64_t)read_us_duration); - current_pulse_id++; - } + total_compress_us += compress_us_duration; + max_compress_us = max(max_compress_us, (uint64_t)compress_us_duration); + + total_send_us += send_us_duration; + max_send_us = max(max_send_us, (uint64_t)send_us_duration); + + total_compressed_size += compressed_size; + total_original_size += MODULE_N_BYTES + sizeof(CompressedModuleFrame); + + if (stats_counter == STATS_MODULO) { + cout << "sf_replay:avg_read_us " << total_read_us/STATS_MODULO; + cout << " sf_replay:max_read_us " << max_read_us; + + cout << " sf_replay:avg_compress_us "; + cout << total_compress_us/STATS_MODULO; + cout << " sf_replay:max_compress_us " << max_compress_us; + + cout << " sf_replay:avg_send_us " << total_send_us/STATS_MODULO; + cout << " sf_replay:max_send_us " << max_send_us; + + cout << " sf_replay:compress_ratio "; + cout << total_compressed_size/total_original_size; + cout << endl; + + stats_counter = 0; + total_read_us = 0; + max_read_us = 0; + total_compress_us = 0; + max_compress_us = 0; + total_send_us = 0; + max_send_us = 0; + total_original_size = 0; + total_compressed_size = 0; } } } @@ -162,11 +138,11 @@ int main (int argc, char *argv[]) { if (argc != 6) { cout << endl; cout << "Usage: sf_replay [device]"; - cout << " [channel_name] [module_id] [start_pulse_id] [stop_pulse_id]"; + cout << " [channel_name] [source_id] [start_pulse_id] [stop_pulse_id]"; cout << endl; cout << "\tdevice: Name of detector." << endl; cout << "\tchannel_name: M00-M31 for JF16M." << endl; - cout << "\tmodule_id: Module index" << endl; + cout << "\tsource_id: Module index" << endl; cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; cout << endl; @@ -176,18 +152,18 @@ int main (int argc, char *argv[]) { const string device = string(argv[1]); const string channel_name = string(argv[2]); - const auto module_id = (uint16_t) atoi(argv[3]); + const auto source_id = (uint16_t) atoi(argv[3]); const auto start_pulse_id = (uint64_t) atoll(argv[4]); const auto stop_pulse_id = (uint64_t) atoll(argv[5]); stringstream ipc_stream; - ipc_stream << "ipc:///tmp/sf-replay-" << (int)module_id; + ipc_stream << REPLAY_STREAM_IPC_URL << (int)source_id; const auto ipc_address = ipc_stream.str(); auto ctx = zmq_ctx_new(); auto socket = zmq_socket(ctx, ZMQ_PUSH); - const int sndhwm = REPLAY_READ_BLOCK_SIZE; + const int sndhwm = REPLAY_SNDHWM; if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) throw runtime_error(strerror (errno));