From d5fb4f4dc08d76bc58ff3d265dc52aa9b828d39e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 22 Apr 2020 15:12:19 +0200 Subject: [PATCH] Test partial buffering --- core-writer/include/RingBuffer.hpp | 10 +- sf-buffer/src/sf_replay.cpp | 146 ++++++++++++++++++++--------- 2 files changed, 108 insertions(+), 48 deletions(-) diff --git a/core-writer/include/RingBuffer.hpp b/core-writer/include/RingBuffer.hpp index 0f45edd..e2829f0 100644 --- a/core-writer/include/RingBuffer.hpp +++ b/core-writer/include/RingBuffer.hpp @@ -16,17 +16,17 @@ #pragma pack(1) struct FileBufferMetadata { // Needed by RingBuffer - const uint64_t frame_bytes_size = 2*512*1024*1000; + const uint64_t frame_bytes_size = 2*512*1024*50; uint64_t buffer_slot_index; uint64_t start_pulse_id; uint64_t stop_pulse_id; uint16_t module_id; - uint64_t pulse_id[1000]; - uint64_t frame_index[1000]; - uint32_t daq_rec[1000]; - uint16_t n_received_packets[1000]; + uint64_t pulse_id[50]; + uint64_t frame_index[50]; + uint32_t daq_rec[50]; + uint16_t n_received_packets[50]; }; #pragma pack(pop) diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index a1ee67b..26a7713 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -7,6 +7,7 @@ #include "BufferUtils.hpp" #include "zmq.h" +const int STREAM_BLOCK_SIZE = 50; using namespace std; @@ -40,7 +41,7 @@ int main (int argc, char *argv[]) { metadata_buffer->module_id = module_id; auto image_buffer = make_unique( - BufferUtils::FILE_MOD * 512 * 1024); + STREAM_BLOCK_SIZE * 512 * 1024); auto path_suffixes = BufferUtils::get_path_suffixes( start_pulse_id, stop_pulse_id); @@ -82,58 +83,117 @@ int main (int argc, char *argv[]) { H5::H5File input_file(filename, H5F_ACC_RDONLY); auto image_dataset = input_file.openDataSet("image"); - image_dataset.read( - image_buffer.get(), H5::PredType::NATIVE_UINT16); - auto pulse_id_dataset = input_file.openDataSet("pulse_id"); - pulse_id_dataset.read( - metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64); - auto frame_id_dataset = input_file.openDataSet("frame_id"); - frame_id_dataset.read( - metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64); - auto daq_rec_dataset = input_file.openDataSet("daq_rec"); - daq_rec_dataset.read( - metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32); - auto received_packets_dataset = input_file.openDataSet("received_packets"); + + // Load first + + hsize_t file_dim[3] = {BufferUtils::FILE_MOD, 512, 1024}; + H5::DataSpace file_space (3, file_dim); + hsize_t b_count[] = {STREAM_BLOCK_SIZE, 512, 1024}; + hsize_t b_start[] = {0, 0, 0}; + file_space.selectHyperslab(H5S_SELECT_SET, b_count, b_start); + + image_dataset.read( + image_buffer.get(), H5::PredType::NATIVE_UINT16, + H5::DataSpace::ALL, file_space); + + hsize_t meta_dim[2] = {BufferUtils::FILE_MOD, 1}; + H5::DataSpace meta_space (2, meta_dim); + hsize_t m_count[] = {STREAM_BLOCK_SIZE, 1}; + hsize_t m_start[] = {0, 0, 0}; + meta_space.selectHyperslab(H5S_SELECT_SET, m_count, m_start); + + pulse_id_dataset.read( + metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64, + H5::DataSpace::ALL, meta_space); + + frame_id_dataset.read( + metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64, + H5::DataSpace::ALL, meta_space); + + daq_rec_dataset.read( + metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32, + H5::DataSpace::ALL, meta_space); + received_packets_dataset.read( metadata_buffer->n_received_packets, - H5::PredType::NATIVE_UINT16); + H5::PredType::NATIVE_UINT16, + H5::DataSpace::ALL, meta_space); + + for ( + size_t start_pulse=0; + start_pulsepulse_id[i_frame], + metadata_buffer->frame_index[i_frame], + metadata_buffer->daq_rec[i_frame], + metadata_buffer->n_received_packets[i_frame], + module_id + }; + + zmq_send(socket, + &module_frame, + sizeof(ModuleFrame), + ZMQ_SNDMORE); + + zmq_send(socket, + (char*) (image_buffer.get() + (i_frame * 512 * 1024)), + 512 * 1024 * 2, + 0); + } + + // Load next + if (start_pulse + STREAM_BLOCK_SIZE < BufferUtils::FILE_MOD) { + + hsize_t file_dim[3] = {BufferUtils::FILE_MOD, 512, 1024}; + H5::DataSpace file_space (3, file_dim); + hsize_t b_count[] = {STREAM_BLOCK_SIZE, 512, 1024}; + hsize_t b_start[] = {start_pulse, 0, 0}; + file_space.selectHyperslab(H5S_SELECT_SET, b_count, b_start); + + image_dataset.read( + image_buffer.get(), H5::PredType::NATIVE_UINT16, + H5::DataSpace::ALL, file_space); + + hsize_t meta_dim[2] = {BufferUtils::FILE_MOD, 1}; + H5::DataSpace meta_space (2, meta_dim); + hsize_t m_count[] = {STREAM_BLOCK_SIZE, 1}; + hsize_t m_start[] = {start_pulse, 0, 0}; + meta_space.selectHyperslab(H5S_SELECT_SET, m_count, m_start); + + pulse_id_dataset.read( + metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64, + H5::DataSpace::ALL, meta_space); + + frame_id_dataset.read( + metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64, + H5::DataSpace::ALL, meta_space); + + daq_rec_dataset.read( + metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32, + H5::DataSpace::ALL, meta_space); + + received_packets_dataset.read( + metadata_buffer->n_received_packets, + H5::PredType::NATIVE_UINT16, + H5::DataSpace::ALL, meta_space); + } + + // Wait for sync. + zmq_recv(meta_socket, nullptr, 0, 0); + + + } input_file.close(); - size_t send_counter = 0; - - for (size_t i_frame=0; i_framepulse_id[i_frame], - metadata_buffer->frame_index[i_frame], - metadata_buffer->daq_rec[i_frame], - metadata_buffer->n_received_packets[i_frame], - module_id - }; - - zmq_send(socket, - &module_frame, - sizeof(ModuleFrame), - ZMQ_SNDMORE); - - zmq_send(socket, - (char*) (image_buffer.get() + (i_frame * 512 * 1024)), - 512 * 1024 * 2, - 0); - - send_counter++; - - if (send_counter == 50) { - // Wait for the sync message. - zmq_recv(meta_socket, nullptr, 0, 0); - send_counter = 0; - } - } } zmq_close(socket);