diff --git a/sf-writer/BufferMultiReader.cpp b/sf-writer/BufferMultiReader.cpp index 2e4c89b..4ba6019 100644 --- a/sf-writer/BufferMultiReader.cpp +++ b/sf-writer/BufferMultiReader.cpp @@ -1,139 +1,82 @@ -#include -#include "BufferMultiReader.hpp" -#include "BufferUtils.hpp" -#include - -using namespace std; - -BufferMultiReader::BufferMultiReader( - const std::string& root_folder) : - root_folder_(root_folder), - is_running_(true), - pulse_id_(0) -{ - frame_buffer_ = new uint16_t[32*512*1024]; - frame_metadata_buffer_ = new UdpFrameMetadata[32]; - - for (uint8_t i_reader=0; i_reader < 32; i_reader++) { - receiving_threads_.emplace_back( - &BufferMultiReader::read_thread, this, i_reader); - } -} - -BufferMultiReader::~BufferMultiReader() -{ - is_running_ = false; - - this_thread::sleep_for(chrono::milliseconds(100)); - - for (auto& recv_thread:receiving_threads_) { - if (recv_thread.joinable()) { - recv_thread.join(); - } - } - - delete[] frame_buffer_; - delete[] frame_metadata_buffer_; -} - -char* BufferMultiReader::get_buffer() -{ - return (char*) frame_buffer_; -} - - -UdpFrameMetadata BufferMultiReader::load_frame_to_buffer( - const uint64_t pulse_id) -{ - memset(frame_buffer_, 0, 2*32*512*1024); - memset(frame_metadata_buffer_, 0, 32*sizeof(UdpFrameMetadata)); - - pulse_id_ = pulse_id; - n_modules_left_ = 32; - - while (n_modules_left_ > 0) { - this_thread::sleep_for(chrono::milliseconds(5)); - } - - UdpFrameMetadata metadata = frame_metadata_buffer_[0]; - return metadata; -} - -void BufferMultiReader::read_thread(uint8_t module_number) -{ - stringstream name; - name << "M"; - if (module_number < 10) name << "0"; - name << (int) module_number; - - string device_name = name.str(); - size_t buffer_offset = 512*1024*module_number; - - string current_filename = ""; - uint64_t last_pulse_id = 0; - - auto image_buffer = new uint16_t[1000*512*1024]; - auto pulse_id_buffer = new uint64_t[1000]; - auto frame_id_buffer = new uint64_t[1000]; - auto daq_rec_buffer = new uint32_t[1000]; - auto received_packets_buffer = new uint16_t[1000]; - - while (is_running_) { - if (last_pulse_id == pulse_id_) { - this_thread::sleep_for(chrono::milliseconds(1)); - continue; - } - last_pulse_id = pulse_id_; - - auto pulse_filename = BufferUtils::get_filename( - root_folder_, device_name, last_pulse_id); - - if (pulse_filename != current_filename) { - - H5::H5File input_file(pulse_filename, - H5F_ACC_RDONLY | H5F_ACC_SWMR_READ); - - - auto image_dataset = input_file.openDataSet("image"); - auto pulse_id_dataset = input_file.openDataSet("pulse_id"); - auto frame_id_dataset = input_file.openDataSet("frame_id"); - auto daq_rec_dataset = input_file.openDataSet("daq_rec"); - auto received_packets_dataset = - input_file.openDataSet("received_packets"); - - image_dataset.read( - image_buffer, H5::PredType::NATIVE_UINT16); - pulse_id_dataset.read( - pulse_id_buffer, H5::PredType::NATIVE_UINT64); - frame_id_dataset.read( - frame_id_buffer, H5::PredType::NATIVE_UINT64); - daq_rec_dataset.read( - daq_rec_buffer, H5::PredType::NATIVE_UINT32); - received_packets_dataset.read( - received_packets_buffer, H5::PredType::NATIVE_UINT16); - - current_filename = pulse_filename; - } - - auto file_frame_index = - BufferUtils::get_file_frame_index(last_pulse_id); - - memcpy( - (char*) (frame_buffer_ + buffer_offset), - (char*)(image_buffer + (file_frame_index*512*1024)), - 512*1024*2); - - UdpFrameMetadata metadata; - metadata.pulse_id = pulse_id_buffer[file_frame_index]; - metadata.frame_index = frame_id_buffer[file_frame_index]; - metadata.daq_rec = daq_rec_buffer[file_frame_index]; - metadata.n_recv_packets = received_packets_buffer[file_frame_index]; - - memcpy( - (char*) (frame_metadata_buffer_ + module_number), - &metadata, - sizeof(UdpFrameMetadata)); - - n_modules_left_--; - } -} +//#include +//#include "BufferMultiReader.hpp" +//#include "BufferUtils.hpp" +//#include +// +//using namespace std; +// +//BufferMultiReader::BufferMultiReader( +// const std::string& root_folder) : +// root_folder_(root_folder), +// is_running_(true), +// pulse_id_(0) +//{ +// frame_buffer_ = new uint16_t[1000*32*512*1024]; +// frame_metadata_buffer_ = new UdpFrameMetadata[32]; +//} +// +//BufferMultiReader::~BufferMultiReader() +//{ +// delete[] frame_buffer_; +// delete[] frame_metadata_buffer_; +//} +// +//char* BufferMultiReader::get_buffer() +//{ +// return (char*) frame_buffer_; +//} +// +// +//UdpFrameMetadata BufferMultiReader::load_frame_to_buffer( +// const uint64_t pulse_id) +//{ +// if (pulse_id >= min_cached_pulse_id_ && pulse_id <= max_cached_pulse_id_) { +// auto file_frame_index = BufferUtils::get_file_frame_index(pulse_id); +// return frame_metadata_buffer_[file_frame_index]; +// } +// +// for (size_t i_module; i_module<32; i_module++) { +// stringstream name; +// name << "M"; +// if (i_module < 10) name << "0"; +// name << (int) i_module; +// string device_name = name.str(); +// +// auto pulse_filename = BufferUtils::get_filename( +// root_folder_, device_name, pulse_id); +// +// H5::H5File input_file(pulse_filename, +// H5F_ACC_RDONLY | H5F_ACC_SWMR_READ); +// +// auto image_dataset = input_file.openDataSet("image"); +// auto pulse_id_dataset = input_file.openDataSet("pulse_id"); +// auto frame_id_dataset = input_file.openDataSet("frame_id"); +// auto daq_rec_dataset = input_file.openDataSet("daq_rec"); +// auto received_packets_dataset = +// input_file.openDataSet("received_packets"); +// +// hsize_t buff_dim[3] = {1000, 32*512, 1024}; +// H5::DataSpace buffer_space (3, buff_dim); +// hsize_t b_offset[] = {0, i_module*512, 0}; +// hsize_t b_stride[] = {n_new_pulses, 512, 1024}; +// hsize_t b_block[] = {1, 512, 1024}; +// hsize_t b_count[] = {1000, 1, 1}; +// buffer_space.selectHyperslab( +// H5S_SELECT_SET, b_count, b_offset, b_stride, b_block); +// +// image_dataset.read( +// image_buffer_, H5::PredType::NATIVE_UINT16, +// buffer_space); +// pulse_id_dataset.read( +// pulse_id_buffer, H5::PredType::NATIVE_UINT64); +// frame_id_dataset.read( +// frame_id_buffer, H5::PredType::NATIVE_UINT64); +// daq_rec_dataset.read( +// daq_rec_buffer, H5::PredType::NATIVE_UINT32); +// received_packets_dataset.read( +// received_packets_buffer, H5::PredType::NATIVE_UINT16); +// } +// +// UdpFrameMetadata metadata = frame_metadata_buffer_[0]; +// return metadata; +//} diff --git a/sf-writer/BufferMultiReader.hpp b/sf-writer/BufferMultiReader.hpp index ada9db7..bbc995a 100644 --- a/sf-writer/BufferMultiReader.hpp +++ b/sf-writer/BufferMultiReader.hpp @@ -1,31 +1,43 @@ -#ifndef BUFFERMULTIREADER_H -#define BUFFERMULTIREADER_H - -#include -#include "RingBuffer.hpp" - -class BufferMultiReader -{ - const std::string root_folder_; - std::atomic_bool is_running_; - uint16_t* frame_buffer_; - UdpFrameMetadata* frame_metadata_buffer_; - - std::atomic_uint n_modules_left_; - std::atomic_uint64_t pulse_id_; - std::vector receiving_threads_; - -protected: - void read_thread(uint8_t module_number); - -public: - BufferMultiReader(const std::string& root_folder); - - virtual ~BufferMultiReader(); - - char* get_buffer(); - - UdpFrameMetadata load_frame_to_buffer(const uint64_t pulse_id); -}; - -#endif \ No newline at end of file +//#ifndef BUFFERMULTIREADER_H +//#define BUFFERMULTIREADER_H +// +//#include +//#include "RingBuffer.hpp" +// +// +//#pragma pack(push) +//#pragma pack(1) +//struct BufferMetadata { +// uint64_t pulse_id; +// uint64_t frame_id; +// uint32_t daq_rac; +// uint16_t n_recv_frames; +//}; +//#pragma pack(pop) +// +//class BufferMultiReader +//{ +// const std::string root_folder_; +// std::atomic_bool is_running_; +// uint16_t* frame_buffer_; +// UdpFrameMetadata* frame_metadata_buffer_; +// +// uint64_t min_cached_pulse_id_; +// uint64_t max_cached_pulse_id_; +// +// +// +//protected: +// void read_thread(uint8_t module_number); +// +//public: +// BufferMultiReader(const std::string& root_folder); +// +// virtual ~BufferMultiReader(); +// +// char* get_buffer(); +// +// UdpFrameMetadata load_frame_to_buffer(const uint64_t pulse_id); +//}; +// +//#endif \ No newline at end of file