From d0df2677defea40fdbd37cfbc5875db82e7852b3 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 17 Feb 2021 12:46:58 +0100 Subject: [PATCH] Remove old writer --- CMakeLists.txt | 1 - jf-live-writer/include/ImageBinaryWriter.hpp | 36 --- jf-live-writer/src/ImageBinaryWriter.cpp | 143 ----------- sf-writer/CMakeLists.txt | 22 -- sf-writer/README.md | 42 --- sf-writer/include/BufferBinaryReader.hpp | 28 -- sf-writer/include/ImageAssembler.hpp | 53 ---- sf-writer/include/writer_config.hpp | 9 - sf-writer/src/BufferBinaryReader.cpp | 107 -------- sf-writer/src/ImageAssembler.cpp | 186 -------------- sf-writer/src/main.cpp | 158 ------------ sf-writer/test/CMakeLists.txt | 10 - sf-writer/test/main.cpp | 10 - sf-writer/test/mock/data.hpp | 72 ------ sf-writer/test/perf/perf_WriterH5Writer.cpp | 90 ------- sf-writer/test/test_ImageAssembler.cpp | 90 ------- sf-writer/test/test_JFH5Writer.cpp | 254 ------------------- 17 files changed, 1311 deletions(-) delete mode 100644 jf-live-writer/include/ImageBinaryWriter.hpp delete mode 100644 jf-live-writer/src/ImageBinaryWriter.cpp delete mode 100644 sf-writer/CMakeLists.txt delete mode 100644 sf-writer/README.md delete mode 100644 sf-writer/include/BufferBinaryReader.hpp delete mode 100644 sf-writer/include/ImageAssembler.hpp delete mode 100644 sf-writer/include/writer_config.hpp delete mode 100644 sf-writer/src/BufferBinaryReader.cpp delete mode 100644 sf-writer/src/ImageAssembler.cpp delete mode 100644 sf-writer/src/main.cpp delete mode 100644 sf-writer/test/CMakeLists.txt delete mode 100644 sf-writer/test/main.cpp delete mode 100644 sf-writer/test/mock/data.hpp delete mode 100644 sf-writer/test/perf/perf_WriterH5Writer.cpp delete mode 100644 sf-writer/test/test_ImageAssembler.cpp delete mode 100644 sf-writer/test/test_JFH5Writer.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 63c25dc..ff244cf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,5 +33,4 @@ add_subdirectory("jf-udp-recv") add_subdirectory("jf-buffer-writer") add_subdirectory("jf-assembler") add_subdirectory("sf-stream") -add_subdirectory("sf-writer") add_subdirectory("jf-live-writer") diff --git a/jf-live-writer/include/ImageBinaryWriter.hpp b/jf-live-writer/include/ImageBinaryWriter.hpp deleted file mode 100644 index ce80655..0000000 --- a/jf-live-writer/include/ImageBinaryWriter.hpp +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef IMAGEBINARYWRITER_HPP -#define IMAGEBINARYWRITER_HPP - -#include - -#include "formats.hpp" - - - -class ImageBinaryWriter { - const size_t IMAGE_BYTES; - const size_t IMAGE_SLOT_BYTES; - const size_t MAX_FILE_BYTES; - const std::string detector_folder_; - std::string latest_filename_; - - std::string current_output_filename_; - int output_file_fd_; - - void open_file(const std::string& filename); - void close_current_file(); - - -public: - ImageBinaryWriter( - const std::string& detector_folder, - const uint64_t image_n_bytes); - - virtual ~ImageBinaryWriter(); - - void write(const ImageMetadata meta, const char* data); - -}; - - -#endif //IMAGEBINARYWRITER_HPP diff --git a/jf-live-writer/src/ImageBinaryWriter.cpp b/jf-live-writer/src/ImageBinaryWriter.cpp deleted file mode 100644 index d5f9900..0000000 --- a/jf-live-writer/src/ImageBinaryWriter.cpp +++ /dev/null @@ -1,143 +0,0 @@ -#include "ImageBinaryWriter.hpp" - -#include -#include -#include "date.h" -#include -#include -#include -#include - -#include "BufferUtils.hpp" - -using namespace std; -using namespace buffer_config; - -ImageBinaryWriter::ImageBinaryWriter( - const string& detector_folder, - const size_t image_n_bytes): - IMAGE_BYTES(image_n_bytes), - IMAGE_SLOT_BYTES(IMAGE_BYTES + sizeof(ImageMetadata)), - MAX_FILE_BYTES(IMAGE_SLOT_BYTES * FILE_MOD), - detector_folder_(detector_folder), - latest_filename_(detector_folder + "/LATEST"), - current_output_filename_(""), - output_file_fd_(-1) -{ -} - -ImageBinaryWriter::~ImageBinaryWriter() -{ - close_current_file(); -} - -void ImageBinaryWriter::write(const ImageMetadata meta, const char* data) -{ - auto current_frame_file = - BufferUtils::get_image_filename(detector_folder_, meta.pulse_id); - - if (current_frame_file != current_output_filename_) { - open_file(current_frame_file); - } - - size_t n_bytes_offset = - BufferUtils::get_file_frame_index(meta.pulse_id) * IMAGE_SLOT_BYTES; - - auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET); - if (lseek_result < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[ImageBinaryWriter::write]"; - err_msg << " Error while lseek on file "; - err_msg << current_output_filename_; - err_msg << " for n_bytes_offset "; - err_msg << n_bytes_offset << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes_meta = ::write(output_file_fd_, &meta, sizeof(ImageMetadata)); - if (n_bytes_meta < sizeof(ImageMetadata)) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BufferBinaryWriter::write]"; - err_msg << " Error while writing to file "; - err_msg << current_output_filename_ << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes_data = ::write(output_file_fd_, data, IMAGE_BYTES); - if (n_bytes_data < sizeof(IMAGE_BYTES)) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BufferBinaryWriter::write]"; - err_msg << " Error while writing to file "; - err_msg << current_output_filename_ << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } -} - -void ImageBinaryWriter::open_file(const std::string& filename) -{ - close_current_file(); - - BufferUtils::create_destination_folder(filename); - - output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT, - S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); - if (output_file_fd_ < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[ImageBinaryWriter::open_file]"; - err_msg << " Cannot create file "; - err_msg << filename << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - current_output_filename_ = filename; -} - -void ImageBinaryWriter::close_current_file() -{ - if (output_file_fd_ != -1) { - if (close(output_file_fd_) < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[ImageBinaryWriter::close_current_file]"; - err_msg << " Error while closing file "; - err_msg << current_output_filename_ << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - output_file_fd_ = -1; - - BufferUtils::update_latest_file( - latest_filename_, current_output_filename_); - - current_output_filename_ = ""; - } -} \ No newline at end of file diff --git a/sf-writer/CMakeLists.txt b/sf-writer/CMakeLists.txt deleted file mode 100644 index 5b520f5..0000000 --- a/sf-writer/CMakeLists.txt +++ /dev/null @@ -1,22 +0,0 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(sf-writer-lib STATIC ${SOURCES}) -target_include_directories(sf-writer-lib PUBLIC include/) -target_link_libraries(sf-writer-lib - external - core-buffer-lib) - -add_executable(sf-writer src/main.cpp) -set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) -target_link_libraries(sf-writer - sf-writer-lib - zmq - hdf5 - hdf5_hl - hdf5_cpp - pthread - ) - -enable_testing() -add_subdirectory(test/) \ No newline at end of file diff --git a/sf-writer/README.md b/sf-writer/README.md deleted file mode 100644 index 606129e..0000000 --- a/sf-writer/README.md +++ /dev/null @@ -1,42 +0,0 @@ -# sf-writer - -sf-writer reads the binary buffer from disk, assembled the images and writes -them down in HDF5 format. - -## Data request ranges - -Data request ranges are composed of: - -- start_pulse_id (first pulse_id to be included in the file) -- stop_pulse_id (last pulse_id to be included in the file) -- pulse_id_step (how many pulses to skip between images.) - -pulse_id_step can be used to write data at different frequencies: - -- pulse_id_step == 1 (100Hz, write very pulse_id) -- pulse_id_step == 2 (50hz, write every second pulse) -- pulse_id_step == 10 (10Hz, write every 10th pulse) - -The next pulse_id to be written is calculated internally as: - -```c++ -auto next_pulse_id = currnet_pulse_id + pulse_id_step; -``` - -The loop criteria for writing is: - -```c++ -for ( - auto curr_pulse_id = start_pulse_id; - curr_pulse_id <= stop_pulse_id; - curr_pulse_id += pulse_id_step -) { - // Write curr_pulse_id to output file. -} -``` - -**Warning** - -If your stop_pulse_id cannot be reached by adding step_pulse_id to -start_pulse_id (start_pulse_id + (n * pulse_id_step) != stop_pulse_id for any n) -it will not be included in the final file. \ No newline at end of file diff --git a/sf-writer/include/BufferBinaryReader.hpp b/sf-writer/include/BufferBinaryReader.hpp deleted file mode 100644 index 0350cba..0000000 --- a/sf-writer/include/BufferBinaryReader.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP -#define SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP - - -#include - -class BufferBinaryReader { - - const std::string detector_folder_; - const std::string module_name_; - - std::string current_input_file_; - int input_file_fd_; - - void open_file(const std::string& filename); - void close_current_file(); - -public: - BufferBinaryReader(const std::string &detector_folder, - const std::string &module_name); - - ~BufferBinaryReader(); - - void get_block(const uint64_t block_id, BufferBinaryBlock *buffer); -}; - - -#endif //SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP diff --git a/sf-writer/include/ImageAssembler.hpp b/sf-writer/include/ImageAssembler.hpp deleted file mode 100644 index ce98962..0000000 --- a/sf-writer/include/ImageAssembler.hpp +++ /dev/null @@ -1,53 +0,0 @@ -#ifndef SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP -#define SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP - -#include - -#include "buffer_config.hpp" -#include "formats.hpp" - -const uint64_t IA_EMPTY_SLOT_VALUE = 0; - -struct ImageMetadataBlock -{ - uint64_t pulse_id[buffer_config::BUFFER_BLOCK_SIZE]; - uint64_t frame_index[buffer_config::BUFFER_BLOCK_SIZE]; - uint32_t daq_rec[buffer_config::BUFFER_BLOCK_SIZE]; - uint8_t is_good_image[buffer_config::BUFFER_BLOCK_SIZE]; - uint64_t block_start_pulse_id; - uint64_t block_stop_pulse_id; -}; - -class ImageAssembler { - const size_t n_modules_; - const size_t image_buffer_slot_n_bytes_; - - char* image_buffer_; - ImageMetadataBlock* meta_buffer_; - ModuleFrame* frame_meta_buffer_; - std::atomic_int* buffer_status_; - std::atomic_uint64_t* buffer_bunch_id_; - - size_t get_data_offset(const uint64_t slot_id, const int i_module); - size_t get_metadata_offset(const uint64_t slot_id, const int i_module); - -public: - ImageAssembler(const size_t n_modules); - - virtual ~ImageAssembler(); - - bool is_slot_free(const uint64_t bunch_id); - bool is_slot_full(const uint64_t bunch_id); - - void process(const uint64_t bunch_id, - const int i_module, - const BufferBinaryBlock* block_buffer); - - void free_slot(const uint64_t bunch_id); - - ImageMetadataBlock* get_metadata_buffer(const uint64_t bunch_id); - char* get_data_buffer(const uint64_t bunch_id); -}; - - -#endif //SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP diff --git a/sf-writer/include/writer_config.hpp b/sf-writer/include/writer_config.hpp deleted file mode 100644 index 6a60d77..0000000 --- a/sf-writer/include/writer_config.hpp +++ /dev/null @@ -1,9 +0,0 @@ -#include - -namespace writer_config -{ - // MS to retry reading from the image assembler. - const size_t ASSEMBLER_RETRY_MS = 5; - // Number of slots in the reconstruction buffer. - const size_t WRITER_IA_N_SLOTS = 2; -} \ No newline at end of file diff --git a/sf-writer/src/BufferBinaryReader.cpp b/sf-writer/src/BufferBinaryReader.cpp deleted file mode 100644 index e76c5c5..0000000 --- a/sf-writer/src/BufferBinaryReader.cpp +++ /dev/null @@ -1,107 +0,0 @@ -#include "BufferBinaryReader.hpp" - -#include -#include -#include -#include -#include - -#include "BufferUtils.hpp" -#include "writer_config.hpp" -#include "buffer_config.hpp" - -using namespace std; -using namespace writer_config; -using namespace buffer_config; - -BufferBinaryReader::BufferBinaryReader( - const std::string &detector_folder, - const std::string &module_name) : - detector_folder_(detector_folder), - module_name_(module_name), - current_input_file_(""), - input_file_fd_(-1) -{} - -BufferBinaryReader::~BufferBinaryReader() -{ - close_current_file(); -} - -void BufferBinaryReader::get_block( - const uint64_t block_id, BufferBinaryBlock* buffer) -{ - uint64_t block_start_pulse_id = block_id * BUFFER_BLOCK_SIZE; - auto current_block_file = BufferUtils::get_filename( - detector_folder_, module_name_, block_start_pulse_id); - - if (current_block_file != current_input_file_) { - open_file(current_block_file); - } - - size_t file_start_index = - BufferUtils::get_file_frame_index(block_start_pulse_id); - size_t n_bytes_offset = file_start_index * sizeof(BufferBinaryFormat); - - auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET); - if (lseek_result < 0) { - stringstream err_msg; - - err_msg << "[BufferBinaryReader::get_block]"; - err_msg << " Error while lseek on file "; - err_msg << current_input_file_ << " for n_bytes_offset "; - err_msg << n_bytes_offset << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes = ::read(input_file_fd_, buffer, - sizeof(BufferBinaryFormat) * BUFFER_BLOCK_SIZE); - - if (n_bytes < sizeof(BufferBinaryFormat)) { - stringstream err_msg; - - err_msg << "[BufferBinaryReader::get_block]"; - err_msg << " Error while reading from file "; - err_msg << current_input_file_ << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } -} - -void BufferBinaryReader::open_file(const std::string& filename) -{ - close_current_file(); - - input_file_fd_ = open(filename.c_str(), O_RDONLY); - - if (input_file_fd_ < 0) { - stringstream err_msg; - - err_msg << "[BufferBinaryReader::open_file]"; - err_msg << " Cannot open file " << filename << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - current_input_file_ = filename; -} - -void BufferBinaryReader::close_current_file() -{ - if (input_file_fd_ != -1) { - if (close(input_file_fd_) < 0) { - stringstream err_msg; - - err_msg << "[BinaryWriter::close_current_file]"; - err_msg << " Error while closing file " << current_input_file_; - err_msg << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - input_file_fd_ = -1; - current_input_file_ = ""; - } -} diff --git a/sf-writer/src/ImageAssembler.cpp b/sf-writer/src/ImageAssembler.cpp deleted file mode 100644 index 1a9a5e6..0000000 --- a/sf-writer/src/ImageAssembler.cpp +++ /dev/null @@ -1,186 +0,0 @@ -#include - -#include "ImageAssembler.hpp" -#include "writer_config.hpp" -#include "buffer_config.hpp" - -using namespace std; -using namespace writer_config; -using namespace buffer_config; - -ImageAssembler::ImageAssembler(const size_t n_modules) : - n_modules_(n_modules), - image_buffer_slot_n_bytes_(BUFFER_BLOCK_SIZE * MODULE_N_BYTES * n_modules_) -{ - image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_]; - meta_buffer_ = new ImageMetadataBlock[WRITER_IA_N_SLOTS]; - frame_meta_buffer_ = - new ModuleFrame[WRITER_IA_N_SLOTS * n_modules * BUFFER_BLOCK_SIZE]; - buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS]; - buffer_bunch_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS]; - - for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) { - free_slot(i); - } -} - -ImageAssembler::~ImageAssembler() -{ - delete[] image_buffer_; - delete[] meta_buffer_; -} - -bool ImageAssembler::is_slot_free(const uint64_t bunch_id) -{ - auto slot_id = bunch_id % WRITER_IA_N_SLOTS; - - uint64_t slot_bunch_id = IA_EMPTY_SLOT_VALUE; - if (buffer_bunch_id_[slot_id].compare_exchange_strong( - slot_bunch_id, bunch_id)) { - return true; - } - - auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0; - return is_free && (slot_bunch_id == bunch_id); -} - -bool ImageAssembler::is_slot_full(const uint64_t bunch_id) -{ - auto slot_id = bunch_id % WRITER_IA_N_SLOTS; - return buffer_status_[slot_id].load(memory_order_relaxed) == 0; -} - -size_t ImageAssembler::get_data_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_; - size_t module_i_offset = i_module * MODULE_N_BYTES; - - return slot_i_offset + module_i_offset; -} - -size_t ImageAssembler::get_metadata_offset( - const uint64_t slot_id, const int i_module) -{ - size_t n_metadata_in_slot = n_modules_ * BUFFER_BLOCK_SIZE; - size_t slot_m_offset = slot_id * n_metadata_in_slot; - size_t module_m_offset = i_module; - - return slot_m_offset + module_m_offset; -} - -void ImageAssembler::process( - const uint64_t bunch_id, - const int i_module, - const BufferBinaryBlock* block_buffer) -{ - const auto slot_id = bunch_id % WRITER_IA_N_SLOTS; - - auto meta_offset = get_metadata_offset(slot_id, i_module); - const auto meta_offset_step = n_modules_; - - auto image_offset = get_data_offset(slot_id, i_module); - const auto image_offset_step = MODULE_N_BYTES * n_modules_; - - for (const auto& frame : block_buffer->frame) { - - memcpy( - &(frame_meta_buffer_[meta_offset]), - &(frame.meta), - sizeof(ModuleFrame)); - - meta_offset += meta_offset_step; - - memcpy( - image_buffer_ + image_offset, - &(frame.data[0]), - MODULE_N_BYTES); - - image_offset += image_offset_step; - } - - buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed); -} - -void ImageAssembler::free_slot(const uint64_t bunch_id) -{ - auto slot_id = bunch_id % WRITER_IA_N_SLOTS; - buffer_status_[slot_id].store(n_modules_, memory_order_relaxed); - buffer_bunch_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed); -} - -ImageMetadataBlock* ImageAssembler::get_metadata_buffer(const uint64_t bunch_id) -{ - const auto slot_id = bunch_id % WRITER_IA_N_SLOTS; - - auto& image_pulse_id = meta_buffer_[slot_id].pulse_id; - auto& image_frame_index = meta_buffer_[slot_id].frame_index; - auto& image_daq_rec = meta_buffer_[slot_id].daq_rec; - auto& image_is_good_frame = meta_buffer_[slot_id].is_good_image; - - auto meta_offset = get_metadata_offset(slot_id, 0); - const auto meta_offset_step = 1; - - uint64_t start_pulse_id = bunch_id * BUFFER_BLOCK_SIZE; - meta_buffer_[slot_id].block_start_pulse_id = start_pulse_id; - - uint64_t stop_pulse_id = start_pulse_id + BUFFER_BLOCK_SIZE - 1; - meta_buffer_[slot_id].block_stop_pulse_id = stop_pulse_id; - - for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { - - auto is_pulse_init = false; - image_is_good_frame[i_pulse] = 1; - image_pulse_id[i_pulse] = 0; - - for (size_t i_module=0; i_module < n_modules_; i_module++) { - - auto& frame_meta = frame_meta_buffer_[meta_offset]; - auto is_good_frame = - frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME; - - if (!is_good_frame) { - image_is_good_frame[i_pulse] = 0; - // TODO: Update meta_offset only once in the loop. - meta_offset += meta_offset_step; - continue; - } - - if (!is_pulse_init) { - image_pulse_id[i_pulse] = frame_meta.pulse_id; - image_frame_index[i_pulse] = frame_meta.frame_index; - image_daq_rec[i_pulse] = frame_meta.daq_rec; - - is_pulse_init = true; - } - - if (image_is_good_frame[i_pulse] == 1) { - if (frame_meta.pulse_id != image_pulse_id[i_pulse]) { - image_is_good_frame[i_pulse] = 0; - } - - if (frame_meta.frame_index != image_frame_index[i_pulse]) { - image_is_good_frame[i_pulse] = 0; - } - - if (frame_meta.daq_rec != image_daq_rec[i_pulse]) { - image_is_good_frame[i_pulse] = 0; - } - - if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) { - image_is_good_frame[i_pulse] = 0; - } - } - - meta_offset += meta_offset_step; - } - } - - return &(meta_buffer_[slot_id]); -} - -char* ImageAssembler::get_data_buffer(const uint64_t bunch_id) -{ - auto slot_id = bunch_id % WRITER_IA_N_SLOTS; - return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_); -} diff --git a/sf-writer/src/main.cpp b/sf-writer/src/main.cpp deleted file mode 100644 index 643d47f..0000000 --- a/sf-writer/src/main.cpp +++ /dev/null @@ -1,158 +0,0 @@ -#include -#include -#include -#include -#include - -#include "date.h" -#include "zmq.h" -#include "writer_config.hpp" -#include "buffer_config.hpp" -#include "bitshuffle/bitshuffle.h" -#include "JFH5Writer.hpp" -#include "ImageAssembler.hpp" -#include "BufferBinaryReader.hpp" - -using namespace std; -using namespace chrono; -using namespace writer_config; -using namespace buffer_config; - -void read_buffer( - const string detector_folder, - const string module_name, - const int i_module, - const vector& buffer_blocks, - ImageAssembler& image_assembler) -{ - BufferBinaryReader block_reader(detector_folder, module_name); - auto block_buffer = new BufferBinaryBlock(); - - for (uint64_t block_id:buffer_blocks) { - - while(!image_assembler.is_slot_free(block_id)) { - this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS)); - } - - auto start_time = steady_clock::now(); - - block_reader.get_block(block_id, block_buffer); - - auto end_time = steady_clock::now(); - uint64_t read_us_duration = duration_cast( - end_time-start_time).count(); - - start_time = steady_clock::now(); - - image_assembler.process(block_id, i_module, block_buffer); - - end_time = steady_clock::now(); - uint64_t compose_us_duration = duration_cast( - end_time-start_time).count(); - - cout << "sf_writer:avg_read_us "; - cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; - cout << "sf_writer:avg_assemble_us "; - cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - delete block_buffer; -} - -int main (int argc, char *argv[]) -{ - if (argc != 7) { - cout << endl; - cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]"; - cout << " [start_pulse_id] [stop_pulse_id] [pulse_id_step]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tdetector_folder: Absolute path to detector buffer." << endl; - cout << "\tn_modules: number of modules" << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl; - cout << endl; - - exit(-1); - } - - string output_file = string(argv[1]); - const string detector_folder = string(argv[2]); - size_t n_modules = atoi(argv[3]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]); - int pulse_id_step = atoi(argv[6]); - - // Align start (up) and stop(down) pulse_id with pulse_id_step. - if (start_pulse_id % pulse_id_step != 0) { - start_pulse_id += pulse_id_step - (start_pulse_id % pulse_id_step); - } - if (stop_pulse_id % pulse_id_step != 0) { - stop_pulse_id -= (start_pulse_id % pulse_id_step); - } - - uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE; - uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE; - - // Generate list of buffer blocks that need to be loaded. - std::vector buffer_blocks; - for (uint64_t i_block=start_block; i_block <= stop_block; i_block++) { - buffer_blocks.push_back(i_block); - } - - ImageAssembler image_assembler(n_modules); - - std::vector reading_threads(n_modules); - for (size_t i_module=0; i_module( - end_time-start_time).count(); - - image_assembler.free_slot(block_id); - - cout << "sf_writer:avg_write_us "; - cout << write_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - for (auto& reading_thread : reading_threads) { - if (reading_thread.joinable()) { - reading_thread.join(); - } - } - - return 0; -} diff --git a/sf-writer/test/CMakeLists.txt b/sf-writer/test/CMakeLists.txt deleted file mode 100644 index 2b9f4f0..0000000 --- a/sf-writer/test/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ -add_executable(sf-writer-tests main.cpp) - -target_link_libraries(sf-writer-tests - sf-writer-lib - hdf5 - hdf5_hl - hdf5_cpp - zmq - gtest - ) diff --git a/sf-writer/test/main.cpp b/sf-writer/test/main.cpp deleted file mode 100644 index 65c6feb..0000000 --- a/sf-writer/test/main.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "gtest/gtest.h" -#include "test_JFH5Writer.cpp" -#include "test_ImageAssembler.cpp" - -using namespace std; - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/sf-writer/test/mock/data.hpp b/sf-writer/test/mock/data.hpp deleted file mode 100644 index 0588de5..0000000 --- a/sf-writer/test/mock/data.hpp +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef SF_DAQ_BUFFER_DATA_HPP -#define SF_DAQ_BUFFER_DATA_HPP - -#include -#include - -#include "buffer_config.hpp" - -auto get_test_block_metadata( - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id, - const int pulse_id_step) -{ - using namespace std; - using namespace buffer_config; - - auto metadata = make_shared(); - - uint64_t block_id = start_pulse_id / BUFFER_BLOCK_SIZE; - - metadata->block_start_pulse_id = block_id * BUFFER_BLOCK_SIZE; - metadata->block_stop_pulse_id = - metadata->block_start_pulse_id + BUFFER_BLOCK_SIZE - 1; - - if (metadata->block_stop_pulse_id < stop_pulse_id) { - throw runtime_error("stop_pulse_id in next block"); - } - - auto offset = start_pulse_id - metadata->block_start_pulse_id; - - for (uint64_t pulse_id = start_pulse_id; - pulse_id <= stop_pulse_id; - pulse_id++, offset++) { - - if (pulse_id % pulse_id_step != 0) { - metadata->is_good_image[offset] = 0; - continue; - } - - metadata->pulse_id[offset] = pulse_id; - metadata->frame_index[offset] = pulse_id + 10; - metadata->daq_rec[offset] = pulse_id + 100; - metadata->is_good_image[offset] = 1; - } - - return metadata; -} - -auto get_test_block_data(const size_t n_modules) -{ - using namespace std; - using namespace buffer_config; - - auto image_buffer = make_unique( - MODULE_N_PIXELS * n_modules * BUFFER_BLOCK_SIZE); - - for (int i_block=0; i_block<=BUFFER_BLOCK_SIZE; i_block++) { - for (int i_module=0; i_module -#include "buffer_config.hpp" -#include "zmq.h" -#include -#include -#include -#include -#include "WriterH5Writer.hpp" - -using namespace std; -using namespace core_buffer; - - -int main (int argc, char *argv[]) -{ - if (argc != 4) { - cout << endl; - cout << "Usage: sf_writer "; - cout << " [output_file] [start_pulse_id] [stop_pulse_id]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << endl; - - exit(-1); - } - - string output_file = string(argv[1]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[2]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]); - - size_t n_modules = 32; - - size_t n_frames = stop_pulse_id - start_pulse_id; - WriterH5Writer writer(output_file, n_frames, n_modules); - - // TODO: Remove stats trash. - int i_write = 0; - size_t total_ms = 0; - size_t max_ms = 0; - size_t min_ms = 10000; // 10 seconds should be a safe first value. - - auto start_time = chrono::steady_clock::now(); - - auto metadata = make_shared(); - auto data = make_unique(MODULE_N_BYTES*n_modules); - - auto current_pulse_id = start_pulse_id; - while (current_pulse_id <= stop_pulse_id) { - - writer.write(metadata.get(), data.get()); - current_pulse_id++; - - i_write++; - - auto end_time = chrono::steady_clock::now(); - - // TODO: Some poor statistics. - - auto ms_duration = chrono::duration_cast( - end_time-start_time).count(); - total_ms += ms_duration; - - if (ms_duration > max_ms) { - max_ms = ms_duration; - } - - if (ms_duration < min_ms) { - min_ms = ms_duration; - } - - if (i_write==100) { - cout << "avg_write_ms " << total_ms / 100; - cout << " min_write_ms " << min_ms; - cout << " max_write_ms " << max_ms << endl; - - i_write = 0; - total_ms = 0; - max_ms = 0; - min_ms = 0; - } - - start_time = chrono::steady_clock::now(); - } - - writer.close_file(); - - return 0; -} diff --git a/sf-writer/test/test_ImageAssembler.cpp b/sf-writer/test/test_ImageAssembler.cpp deleted file mode 100644 index 97bdbb3..0000000 --- a/sf-writer/test/test_ImageAssembler.cpp +++ /dev/null @@ -1,90 +0,0 @@ -#include - -#include "ImageAssembler.hpp" -#include "gtest/gtest.h" - -using namespace std; -using namespace buffer_config; - -TEST(ImageAssembler, basic_interaction) -{ - size_t n_modules = 3; - uint64_t bunch_id = 0; - - ImageAssembler assembler(n_modules); - - ASSERT_EQ(assembler.is_slot_free(bunch_id), true); - - auto buffer_block = make_unique(); - auto buffer_ptr = buffer_block.get(); - - for (size_t i_module=0; i_module < n_modules; i_module++) { - assembler.process(bunch_id, i_module, buffer_ptr); - } - - ASSERT_EQ(assembler.is_slot_full(bunch_id), true); - - auto metadata = assembler.get_metadata_buffer(bunch_id); - auto data = assembler.get_data_buffer(bunch_id); - - assembler.free_slot(bunch_id); - ASSERT_EQ(assembler.is_slot_free(bunch_id), true); - - for (size_t i_pulse = 0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { - ASSERT_EQ(metadata->is_good_image[i_pulse], 0); - } -} - -TEST(ImageAssembler, reconstruction) -{ - size_t n_modules = 2; - uint64_t bunch_id = 0; - - ImageAssembler assembler(n_modules); - - ASSERT_EQ(assembler.is_slot_free(bunch_id), true); - - auto buffer_block = make_unique(); - auto buffer_ptr = buffer_block.get(); - - for (size_t i_module=0; i_module < n_modules; i_module++) { - - for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { - auto& frame_meta = buffer_block->frame[i_pulse].meta; - - frame_meta.pulse_id = 100 + i_pulse; - frame_meta.daq_rec = 1000 + i_pulse; - frame_meta.frame_index = 10000 + i_pulse; - frame_meta.n_recv_packets = JF_N_PACKETS_PER_FRAME; - - for (size_t i_pixel=0; i_pixel < MODULE_N_PIXELS; i_pixel++) { - buffer_block->frame[i_pulse].data[i_pixel] = - (i_module * 10) + (i_pixel % 100); - } - } - - assembler.process(bunch_id, i_module, buffer_ptr); - } - - ASSERT_EQ(assembler.is_slot_full(bunch_id), true); - - auto metadata = assembler.get_metadata_buffer(bunch_id); - auto data = assembler.get_data_buffer(bunch_id); - - assembler.free_slot(bunch_id); - ASSERT_EQ(assembler.is_slot_free(bunch_id), true); - - ASSERT_EQ(metadata->block_start_pulse_id, 0); - ASSERT_EQ(metadata->block_stop_pulse_id, BUFFER_BLOCK_SIZE-1); - - for (size_t i_pulse = 0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) { - ASSERT_EQ(metadata->pulse_id[i_pulse], 100 + i_pulse); - ASSERT_EQ(metadata->daq_rec[i_pulse], 1000 + i_pulse); - ASSERT_EQ(metadata->frame_index[i_pulse], 10000 + i_pulse); - ASSERT_EQ(metadata->is_good_image[i_pulse], 1); - - for (size_t i_module=0; i_module < n_modules; i_module++) { - // TODO: Check assembled image. - } - } -} diff --git a/sf-writer/test/test_JFH5Writer.cpp b/sf-writer/test/test_JFH5Writer.cpp deleted file mode 100644 index 2a2b5a9..0000000 --- a/sf-writer/test/test_JFH5Writer.cpp +++ /dev/null @@ -1,254 +0,0 @@ -#include - -#include "JFH5Writer.hpp" -#include "gtest/gtest.h" -#include "bitshuffle/bitshuffle.h" -#include "mock/data.hpp" - -using namespace std; -using namespace buffer_config; - -TEST(JFH5Writer, basic_interaction) -{ - size_t n_modules = 2; - uint64_t start_pulse_id = 1; - uint64_t stop_pulse_id = 5; - - auto data = make_unique(n_modules*MODULE_N_BYTES*BUFFER_BLOCK_SIZE); - auto metadata = make_shared(); - - // Needed by writer. - metadata->block_start_pulse_id = 0; - metadata->block_stop_pulse_id = BUFFER_BLOCK_SIZE - 1; - - JFH5Writer writer("ignore.h5", "detector", - n_modules, start_pulse_id, stop_pulse_id, 1); - writer.write(metadata.get(), data.get()); -} - -TEST(JFH5Writer, test_writing) -{ - size_t n_modules = 2; - uint64_t start_pulse_id = 5; - uint64_t stop_pulse_id = 10; - auto n_images = stop_pulse_id - start_pulse_id + 1; - - auto meta = get_test_block_metadata(start_pulse_id, stop_pulse_id, 1); - auto data = get_test_block_data(n_modules); - - string detector_name = "detector"; - - // The writer closes the file on destruction. - { - JFH5Writer writer( - "ignore.h5", detector_name, - n_modules, start_pulse_id, stop_pulse_id, 1); - writer.write(meta.get(), (char*)(&data[0])); - } - - H5::H5File reader("ignore.h5", H5F_ACC_RDONLY); - auto image_dataset = reader.openDataSet("/data/detector/data"); - image_dataset.read(&data[0], H5::PredType::NATIVE_UINT16); - - for (int i_image=0; i_image < n_images; i_image++) { - for (int i_module=0; i_module(n_images); - auto pulse_id_dataset = reader.openDataSet("/data/detector/pulse_id"); - pulse_id_dataset.read(&pulse_id_data[0], H5::PredType::NATIVE_UINT64); - - auto frame_index_data = make_unique(n_images); - auto frame_index_dataset = reader.openDataSet("/data/detector/frame_index"); - frame_index_dataset.read(&frame_index_data[0], H5::PredType::NATIVE_UINT64); - - auto daq_rec_data = make_unique(n_images); - auto daq_rec_dataset = reader.openDataSet("/data/detector/daq_rec"); - daq_rec_dataset.read(&daq_rec_data[0], H5::PredType::NATIVE_UINT32); - - auto is_good_frame_data = make_unique(n_images); - auto is_good_frame_dataset = - reader.openDataSet("/data/detector/is_good_frame"); - is_good_frame_dataset.read( - &is_good_frame_data[0], H5::PredType::NATIVE_UINT8); - - auto name_dataset = reader.openDataSet("/general/detector_name"); - string read_detector_name; - name_dataset.read(read_detector_name, name_dataset.getDataType()); - - ASSERT_EQ(detector_name, read_detector_name); - - for (uint64_t pulse_id=start_pulse_id; - pulse_id<=stop_pulse_id; - pulse_id++) { - - ASSERT_EQ(pulse_id_data[pulse_id - start_pulse_id], pulse_id); - ASSERT_EQ(frame_index_data[pulse_id - start_pulse_id], pulse_id + 10); - ASSERT_EQ(daq_rec_data[pulse_id - start_pulse_id], pulse_id + 100); - ASSERT_EQ(is_good_frame_data[pulse_id - start_pulse_id], 1); - } -} - -TEST(JFH5Writer, test_step_pulse_id) -{ - // Start pulse id (5) larger than stop pulse id (4). - ASSERT_THROW(JFH5Writer writer("ignore.h5", "d", 1 , 5, 4, 1), - runtime_error); - - // Start pulse id (5) is equal to stop pulse id (5). - ASSERT_NO_THROW(JFH5Writer writer("ignore.h5", "d", 1, 5, 5, 1)); - - // The step is exactly on start nad stop pulse id. - ASSERT_NO_THROW(JFH5Writer writer("ignore.h5", "d", 1, 5, 5, 5)); - - // No pulses in given range with step = 10 - ASSERT_THROW(JFH5Writer writer("ignore.h5", "d", 1, 1, 9, 10), - runtime_error); - - // Stop pulse id is divisible by step, but start is not. - ASSERT_THROW(JFH5Writer writer("ignore.h5", "d", 1, 5, 10, 10), - runtime_error); - - // Start pulse id is divisible by step, but stop is not. - ASSERT_THROW(JFH5Writer writer("ignore.h5", "d", 1, 10, 19, 10), - runtime_error); - - // Should be ok. - ASSERT_NO_THROW(JFH5Writer("ignore.h5", "d", 1, 1234, 1234, 1)); - // Should be ok. - ASSERT_NO_THROW(JFH5Writer("ignore.h5", "d", 1, 1234, 4567, 1)); - // Should be ok. - ASSERT_NO_THROW(JFH5Writer("ignore.h5", "d", 1, 4, 4, 4)); - - // stop smaller than start. - ASSERT_THROW(JFH5Writer("ignore.h5", "d", 1, 1234, 1233, 1), - runtime_error); - // step is not valid for 100Hz. - ASSERT_THROW(JFH5Writer("ignore.h5", "d", 1, 1234, 1234, 3), - runtime_error); - // start not divisible by step. - ASSERT_THROW(JFH5Writer("ignore.h5", "d", 1, 10, 10, 4), - runtime_error); - // stop not divisible by step - ASSERT_THROW(JFH5Writer("ignore.h5", "d", 1, 8, 10, 4), - runtime_error); -} - -void test_writing_with_step( - uint64_t start_pulse_id, uint64_t stop_pulse_id, size_t step) -{ - size_t n_modules = 3; - - size_t n_images = 1; - n_images += (stop_pulse_id / step); - n_images -= start_pulse_id / step; - - auto meta = get_test_block_metadata(start_pulse_id, stop_pulse_id, step); - auto data = get_test_block_data(n_modules); - - // Verify the metadata has the layout we want to test (50Hz). - for (size_t i_pulse=0; i_pulsepulse_id[i_pulse], 500 + i_pulse); - } else { - ASSERT_EQ(meta->pulse_id[i_pulse], 0); - } - } - - string path_root = "/path/to/"; - string expected_detector_name = "detector"; - - // The writer closes the file on destruction. - { - JFH5Writer writer( - "ignore.h5", path_root + expected_detector_name, - n_modules, start_pulse_id, stop_pulse_id, step); - writer.write(meta.get(), (char*)(&data[0])); - } - - H5::H5File reader("ignore.h5", H5F_ACC_RDONLY); - auto image_dataset = reader.openDataSet("/data/detector/data"); - image_dataset.read(&data[0], H5::PredType::NATIVE_UINT16); - - hsize_t dims[3]; - image_dataset.getSpace().getSimpleExtentDims(dims); - ASSERT_EQ(dims[0], n_images); - ASSERT_EQ(dims[1], n_modules * MODULE_Y_SIZE); - ASSERT_EQ(dims[2], MODULE_X_SIZE); - - auto pulse_id_data = make_unique(n_images); - auto pulse_id_dataset = reader.openDataSet("/data/detector/pulse_id"); - pulse_id_dataset.read(&pulse_id_data[0], H5::PredType::NATIVE_UINT64); - pulse_id_dataset.getSpace().getSimpleExtentDims(dims); - ASSERT_EQ(dims[0], n_images); - ASSERT_EQ(dims[1], 1); - - auto frame_index_data = make_unique(n_images); - auto frame_index_dataset = reader.openDataSet("/data/detector/frame_index"); - frame_index_dataset.read(&frame_index_data[0], H5::PredType::NATIVE_UINT64); - frame_index_dataset.getSpace().getSimpleExtentDims(dims); - ASSERT_EQ(dims[0], n_images); - ASSERT_EQ(dims[1], 1); - - auto daq_rec_data = make_unique(n_images); - auto daq_rec_dataset = reader.openDataSet("/data/detector/daq_rec"); - daq_rec_dataset.read(&daq_rec_data[0], H5::PredType::NATIVE_UINT32); - daq_rec_dataset.getSpace().getSimpleExtentDims(dims); - ASSERT_EQ(dims[0], n_images); - ASSERT_EQ(dims[1], 1); - - auto is_good_frame_data = make_unique(n_images); - auto is_good_frame_dataset = - reader.openDataSet("/data/detector/is_good_frame"); - is_good_frame_dataset.read( - &is_good_frame_data[0], H5::PredType::NATIVE_UINT8); - is_good_frame_dataset.getSpace().getSimpleExtentDims(dims); - ASSERT_EQ(dims[0], n_images); - ASSERT_EQ(dims[1], 1); - - auto name_dataset = reader.openDataSet("/general/detector_name"); - string read_detector_name; - name_dataset.read(read_detector_name, name_dataset.getDataType()); - - ASSERT_EQ(expected_detector_name, read_detector_name); - - uint64_t i_pulse = 0; - for (uint64_t pulse_id=start_pulse_id; - pulse_id<=stop_pulse_id; - pulse_id++) { - - if (pulse_id % step != 0) { - continue; - } - - ASSERT_EQ(pulse_id_data[i_pulse], pulse_id); - ASSERT_EQ(frame_index_data[i_pulse], pulse_id + 10); - ASSERT_EQ(daq_rec_data[i_pulse], pulse_id + 100); - ASSERT_EQ(is_good_frame_data[i_pulse], 1); - - i_pulse++; - } -} - -TEST(JFH5Writer, test_writing_with_step) -{ - // TODO: Write with any number of steps. - // 100Hz - test_writing_with_step(500, 599, 1); - // 50Hz - test_writing_with_step(500, 598, 2); - // 25Hz - test_writing_with_step(500, 596, 4); - // 10Hz - test_writing_with_step(500, 590, 10); - // 1Hz - test_writing_with_step(500, 500, 100); -}