From b45b7d17fa8b340095ec740e876eae8dce7e7ae1 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 20 May 2020 11:45:34 +0200 Subject: [PATCH] Major project refactoring WIP --- core-buffer/CMakeLists.txt | 89 +++---- core-buffer/include/BufferBinaryFormat.hpp | 23 -- core-buffer/include/BufferBinaryWriter.hpp | 32 --- core-buffer/include/BufferUdpReceiver.hpp | 36 --- core-buffer/include/BufferedFastQueue.hpp | 32 --- core-buffer/include/FastQueue.hpp | 40 --- core-buffer/include/LiveH5Reader.hpp | 47 ---- core-buffer/include/LiveRecvModule.hpp | 39 --- core-buffer/include/ReplayH5Reader.hpp | 39 --- core-buffer/include/RingBuffer.hpp | 81 ------ core-buffer/include/UdpReceiver.hpp | 22 -- core-buffer/include/UdpRecvModule.hpp | 33 --- core-buffer/include/WriterH5Writer.hpp | 53 ---- core-buffer/include/WriterUtils.hpp | 11 - core-buffer/include/WriterZmqReceiver.hpp | 31 --- core-buffer/src/buffer/BufferBinaryWriter.cpp | 124 --------- core-buffer/src/buffer/BufferH5Writer.cpp | 126 --------- core-buffer/src/buffer/BufferUdpReceiver.cpp | 137 ---------- core-buffer/src/buffer/UdpReceiver.cpp | 89 ------- core-buffer/src/buffer/WriterUtils.cpp | 49 ---- core-buffer/src/buffer/sf_buffer.cpp | 150 ----------- core-buffer/src/replay/ReplayH5Reader.cpp | 138 ---------- core-buffer/src/replay/sf_replay.cpp | 140 ---------- core-buffer/src/stream/LiveRecvModule.cpp | 223 ---------------- core-buffer/src/stream/sf_stream.cpp | 221 ---------------- core-buffer/src/writer/BufferedFastQueue.cpp | 68 ----- core-buffer/src/writer/FastQueue.cpp | 109 -------- core-buffer/src/writer/WriterH5Writer.cpp | 162 ------------ core-buffer/src/writer/WriterZmqReceiver.cpp | 138 ---------- core-buffer/src/writer/sf_writer.cpp | 179 ------------- core-buffer/test/CMakeLists.txt | 20 +- core-buffer/test/main.cpp | 9 - core-buffer/test/perf/perf_WriterH5Writer.cpp | 90 ------- core-buffer/test/test_BufferBinaryWriter.cpp | 86 ------- core-buffer/test/test_BufferH5Writer.cpp | 130 ---------- core-buffer/test/test_BufferUdpReceiver.cpp | 239 ------------------ core-buffer/test/test_FastQueue.cpp | 147 ----------- core-buffer/test/test_LiveRecvModule.cpp | 87 ------- core-buffer/test/test_ReplayH5Reader.cpp | 110 -------- core-buffer/test/test_UdpReceiver.cpp | 170 ------------- core-buffer/test/test_WriterH5Writer.cpp | 92 ------- core-buffer/test/test_WriterZmqReceiver.cpp | 74 ------ core-buffer/test/test_bitshuffle.cpp | 1 + .../include/BufferH5Writer.hpp | 0 {core-buffer => sf-buffer}/test/mock/udp.hpp | 0 45 files changed, 48 insertions(+), 3868 deletions(-) delete mode 100644 core-buffer/include/BufferBinaryFormat.hpp delete mode 100644 core-buffer/include/BufferBinaryWriter.hpp delete mode 100644 core-buffer/include/BufferUdpReceiver.hpp delete mode 100644 core-buffer/include/BufferedFastQueue.hpp delete mode 100644 core-buffer/include/FastQueue.hpp delete mode 100644 core-buffer/include/LiveH5Reader.hpp delete mode 100644 core-buffer/include/LiveRecvModule.hpp delete mode 100644 core-buffer/include/ReplayH5Reader.hpp delete mode 100644 core-buffer/include/RingBuffer.hpp delete mode 100644 core-buffer/include/UdpReceiver.hpp delete mode 100644 core-buffer/include/UdpRecvModule.hpp delete mode 100644 core-buffer/include/WriterH5Writer.hpp delete mode 100644 core-buffer/include/WriterUtils.hpp delete mode 100644 core-buffer/include/WriterZmqReceiver.hpp delete mode 100644 core-buffer/src/buffer/BufferBinaryWriter.cpp delete mode 100644 core-buffer/src/buffer/BufferH5Writer.cpp delete mode 100644 core-buffer/src/buffer/BufferUdpReceiver.cpp delete mode 100644 core-buffer/src/buffer/UdpReceiver.cpp delete mode 100644 core-buffer/src/buffer/WriterUtils.cpp delete mode 100644 core-buffer/src/buffer/sf_buffer.cpp delete mode 100644 core-buffer/src/replay/ReplayH5Reader.cpp delete mode 100644 core-buffer/src/replay/sf_replay.cpp delete mode 100644 core-buffer/src/stream/LiveRecvModule.cpp delete mode 100644 core-buffer/src/stream/sf_stream.cpp delete mode 100644 core-buffer/src/writer/BufferedFastQueue.cpp delete mode 100644 core-buffer/src/writer/FastQueue.cpp delete mode 100644 core-buffer/src/writer/WriterH5Writer.cpp delete mode 100644 core-buffer/src/writer/WriterZmqReceiver.cpp delete mode 100644 core-buffer/src/writer/sf_writer.cpp delete mode 100644 core-buffer/test/perf/perf_WriterH5Writer.cpp delete mode 100644 core-buffer/test/test_BufferBinaryWriter.cpp delete mode 100644 core-buffer/test/test_BufferH5Writer.cpp delete mode 100644 core-buffer/test/test_BufferUdpReceiver.cpp delete mode 100644 core-buffer/test/test_FastQueue.cpp delete mode 100644 core-buffer/test/test_LiveRecvModule.cpp delete mode 100644 core-buffer/test/test_ReplayH5Reader.cpp delete mode 100644 core-buffer/test/test_UdpReceiver.cpp delete mode 100644 core-buffer/test/test_WriterH5Writer.cpp delete mode 100644 core-buffer/test/test_WriterZmqReceiver.cpp rename {core-buffer => sf-buffer}/include/BufferH5Writer.hpp (100%) rename {core-buffer => sf-buffer}/test/mock/udp.hpp (100%) diff --git a/core-buffer/CMakeLists.txt b/core-buffer/CMakeLists.txt index 1b931e5..5db21bf 100644 --- a/core-buffer/CMakeLists.txt +++ b/core-buffer/CMakeLists.txt @@ -1,63 +1,48 @@ add_subdirectory(external/) file(GLOB SOURCES - src/*.cpp - src/buffer/*.cpp - src/replay/*.cpp - src/stream/*.cpp - src/writer/*.cpp) + src/*.cpp) -add_library(core-buffer STATIC ${SOURCES}) -target_include_directories(core-buffer PUBLIC include/) -target_include_directories(core-buffer PUBLIC external/) +add_library(core-buffer-lib STATIC ${SOURCES}) +target_include_directories(core-buffer-lib PUBLIC include/) +target_include_directories(core-buffer-lib PUBLIC external/) if(CMAKE_BUILD_TYPE STREQUAL "Debug") - target_compile_definitions(core-buffer PRIVATE DEBUG_OUTPUT) + target_compile_definitions(core-buffer-lib PRIVATE DEBUG_OUTPUT) endif() -add_executable(sf-buffer src/buffer/sf_buffer.cpp) -set_target_properties(sf-buffer PROPERTIES OUTPUT_NAME sf_buffer) -target_link_libraries(sf-buffer - core-buffer - zmq - hdf5 - hdf5_hl - hdf5_cpp - boost_system - pthread) - -add_executable(sf-replay src/replay/sf_replay.cpp) -set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) -target_link_libraries(sf-replay - core-buffer - external - zmq - hdf5 - hdf5_hl - hdf5_cpp - boost_system - pthread) - -add_executable(sf-writer src/writer/sf_writer.cpp) -set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) -target_link_libraries(sf-writer - core-buffer - external - zmq - hdf5 - hdf5_hl - hdf5_cpp - boost_system - pthread) - -add_executable(sf-stream src/stream/sf_stream.cpp) -set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) -target_link_libraries(sf-stream - core-buffer - zmq - jsoncpp - boost_system - pthread) +#add_executable(sf-replay src/replay/sf_replay.cpp) +#set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) +#target_link_libraries(sf-replay +# core-buffer +# external +# zmq +# hdf5 +# hdf5_hl +# hdf5_cpp +# boost_system +# pthread) +# +#add_executable(sf-writer src/writer/sf_writer.cpp) +#set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) +#target_link_libraries(sf-writer +# core-buffer +# external +# zmq +# hdf5 +# hdf5_hl +# hdf5_cpp +# boost_system +# pthread) +# +#add_executable(sf-stream src/stream/sf_stream.cpp) +#set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) +#target_link_libraries(sf-stream +# core-buffer +# zmq +# jsoncpp +# boost_system +# pthread) enable_testing() add_subdirectory(test/) \ No newline at end of file diff --git a/core-buffer/include/BufferBinaryFormat.hpp b/core-buffer/include/BufferBinaryFormat.hpp deleted file mode 100644 index d9f72cd..0000000 --- a/core-buffer/include/BufferBinaryFormat.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef JFFILEFORMAT_HPP -#define JFFILEFORMAT_HPP - -#include "jungfrau.hpp" - -const char JF_FORMAT_START_BYTE = 0xBE; - -#pragma pack(push) -#pragma pack(1) -struct BufferBinaryFormat { - - BufferBinaryFormat() : FORMAT_MARKER(JF_FORMAT_START_BYTE) {}; - - const char FORMAT_MARKER; - uint64_t pulse_id; - uint64_t frame_id; - uint32_t daq_rec; - uint16_t n_recv_packets; - char data[JUNGFRAU_DATA_BYTES_PER_FRAME]; -}; -#pragma pack(pop) - -#endif // JFFILEFORMAT_HPP \ No newline at end of file diff --git a/core-buffer/include/BufferBinaryWriter.hpp b/core-buffer/include/BufferBinaryWriter.hpp deleted file mode 100644 index ee90afb..0000000 --- a/core-buffer/include/BufferBinaryWriter.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef BINARYWRITER_HPP -#define BINARYWRITER_HPP - -#include -#include "BufferBinaryFormat.hpp" - -class BufferBinaryWriter { - - const std::string device_name_; - const std::string root_folder_; - std::string latest_filename_; - - std::string current_output_filename_; - int output_file_fd_; - - void open_file(const std::string& filename); - void close_current_file(); - - -public: - BufferBinaryWriter( - const std::string& device_name, - const std::string& root_folder); - - virtual ~BufferBinaryWriter(); - - void write(const uint64_t pulse_id, const BufferBinaryFormat* buffer); - -}; - - -#endif //BINARYWRITER_HPP diff --git a/core-buffer/include/BufferUdpReceiver.hpp b/core-buffer/include/BufferUdpReceiver.hpp deleted file mode 100644 index 2d97a47..0000000 --- a/core-buffer/include/BufferUdpReceiver.hpp +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP -#define SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP - -#include -#include "UdpReceiver.hpp" -#include "jungfrau.hpp" -#include "buffer_config.hpp" - -class BufferUdpReceiver { - const int source_id_; - - UdpReceiver udp_receiver_; - - jungfrau_packet packet_buffer_[core_buffer::BUFFER_UDP_N_RECV_MSG]; - iovec recv_buff_ptr_[core_buffer::BUFFER_UDP_N_RECV_MSG]; - mmsghdr msgs_[core_buffer::BUFFER_UDP_N_RECV_MSG]; - sockaddr_in sock_from_[core_buffer::BUFFER_UDP_N_RECV_MSG]; - - bool packet_buffer_loaded_ = false; - int packet_buffer_n_packets_ = 0; - int packet_buffer_offset_ = 0; - - inline void init_frame(ModuleFrame& frame_metadata, const int i_packet); - inline void copy_packet_to_buffers( - ModuleFrame& metadata, char* frame_buffer, const int i_packet); - inline uint64_t process_packets( - const int n_packets, ModuleFrame& metadata, char* frame_buffer); - -public: - BufferUdpReceiver(const uint16_t port, const int source_id); - virtual ~BufferUdpReceiver(); - uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); -}; - - -#endif //SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP diff --git a/core-buffer/include/BufferedFastQueue.hpp b/core-buffer/include/BufferedFastQueue.hpp deleted file mode 100644 index b690f3e..0000000 --- a/core-buffer/include/BufferedFastQueue.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP -#define SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP - -#include "FastQueue.hpp" -#include "WriterH5Writer.hpp" - - -class BufferedFastQueue { - FastQueue& queue_; - const size_t buffer_n_pulses_; - const size_t n_modules_; - - ImageMetadataBuffer* queue_meta_buffer_ = nullptr; - char* queue_data_buffer_ = nullptr; - int current_slot_id_ = -1; - - ImageMetadata image_metadata_; - -public: - BufferedFastQueue(FastQueue& queue, - const size_t buffer_n_pulses, - const size_t n_modules); - - ImageMetadata* get_metadata_buffer(); - char* get_data_buffer(); - - void commit(); - void finalize(); -}; - - -#endif //SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP diff --git a/core-buffer/include/FastQueue.hpp b/core-buffer/include/FastQueue.hpp deleted file mode 100644 index 4103c55..0000000 --- a/core-buffer/include/FastQueue.hpp +++ /dev/null @@ -1,40 +0,0 @@ -#ifndef FASTQUEUE_HPP -#define FASTQUEUE_HPP - -#include -#include -#include - -template -class FastQueue { - const size_t slot_n_bytes_; - const size_t n_slots_; - char* buffer_; - std::atomic_int* buffer_status_; - - uint16_t write_slot_id_; - uint16_t read_slot_id_; - -public: - - enum SLOT_STATUS { - EMPTY=0, - RESERVED=1, - READY=2 - }; - - FastQueue(const size_t slot_data_n_bytes, const uint16_t n_slots); - virtual ~FastQueue(); - - T* get_metadata_buffer(const int slot_id); - char* get_data_buffer(const int slot_id); - - int reserve(); - void commit(); - - int read(); - void release(); -}; - - -#endif //FASTQUEUE_HPP diff --git a/core-buffer/include/LiveH5Reader.hpp b/core-buffer/include/LiveH5Reader.hpp deleted file mode 100644 index ebbe69f..0000000 --- a/core-buffer/include/LiveH5Reader.hpp +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef SF_DAQ_BUFFER_LIVEH5READER_HPP -#define SF_DAQ_BUFFER_LIVEH5READER_HPP - -#include -#include -#include "jungfrau.hpp" -#include "buffer_config.hpp" -#include - -class LiveH5Reader { - - const std::string current_filename_; - const uint16_t source_id_; - - std::unique_ptr pulse_id_buffer_; - std::unique_ptr data_buffer_; - - uint64_t current_file_max_pulse_id_; - H5::H5File file_; - - H5::DataSet image_dataset_; - H5::DataSet pulse_id_dataset_; - H5::DataSet frame_index_dataset_; - H5::DataSet daq_rec_dataset_; - H5::DataSet n_received_packets_dataset_; - - void open_file(); - -public: - LiveH5Reader( - const std::string& device, - const std::string& channel_name, - const uint16_t source_id); - - ~LiveH5Reader(); - - uint64_t get_latest_pulse_id(); - void load_pulse_id(uint64_t pulse_id); - - ModuleFrame get_metadata(); - char* get_data(); - - void close_file(); -}; - - -#endif //SF_DAQ_BUFFER_LIVEH5READER_HPP diff --git a/core-buffer/include/LiveRecvModule.hpp b/core-buffer/include/LiveRecvModule.hpp deleted file mode 100644 index be0f8e5..0000000 --- a/core-buffer/include/LiveRecvModule.hpp +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef SF_DAQ_BUFFER_LIVERECVMODULE_HPP -#define SF_DAQ_BUFFER_LIVERECVMODULE_HPP - -#include "FastQueue.hpp" -#include -#include "jungfrau.hpp" -#include - -class LiveRecvModule { - - FastQueue& queue_; - const size_t n_modules_; - void* ctx_; - const std::string ipc_prefix_; - std::atomic_bool is_receiving_; - std::thread receiving_thread_; - -public: - LiveRecvModule( - FastQueue& queue, - const size_t n_modules, - void* ctx, - const std::string& ipc_prefix); - - virtual ~LiveRecvModule(); - void* connect_socket(size_t module_id); - void recv_single_module(void* socket, ModuleFrame* metadata, char* data); - void receive_thread(); - uint64_t align_modules( - const std::vector& sockets, - ModuleFrameBuffer *metadata, - char *data); - - void stop(); - -}; - - -#endif //SF_DAQ_BUFFER_LIVERECVMODULE_HPP diff --git a/core-buffer/include/ReplayH5Reader.hpp b/core-buffer/include/ReplayH5Reader.hpp deleted file mode 100644 index d34beaa..0000000 --- a/core-buffer/include/ReplayH5Reader.hpp +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef SF_DAQ_BUFFER_REPLAYH5READER_HPP -#define SF_DAQ_BUFFER_REPLAYH5READER_HPP - -#include -#include "jungfrau.hpp" -#include -#include -#include "buffer_config.hpp" - - -class ReplayH5Reader { - - const std::string device_; - const std::string channel_name_; - - H5::H5File current_file_; - std::string current_filename_; - H5::DataSet dset_metadata_; - H5::DataSet dset_frame_; - - std::unique_ptr frame_buffer_ = std::make_unique( - core_buffer::MODULE_N_BYTES * core_buffer::REPLAY_READ_BUFFER_SIZE); - std::unique_ptr metadata_buffer_ = - std::make_unique(core_buffer::FILE_MOD); - - uint64_t buffer_start_pulse_id_ = 0; - uint64_t buffer_end_pulse_id_ = 0; - void prepare_buffer_for_pulse(const uint64_t pulse_id); - -public: - ReplayH5Reader(const std::string device, const std::string channel_name); - virtual ~ReplayH5Reader(); - void close_file(); - bool get_frame( - const uint64_t pulse_id, ModuleFrame* metadata, char* frame_buffer); -}; - - -#endif //SF_DAQ_BUFFER_REPLAYH5READER_HPP diff --git a/core-buffer/include/RingBuffer.hpp b/core-buffer/include/RingBuffer.hpp deleted file mode 100644 index 38c9442..0000000 --- a/core-buffer/include/RingBuffer.hpp +++ /dev/null @@ -1,81 +0,0 @@ -#ifndef RINGBUFFER_H -#define RINGBUFFER_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "date.h" - -struct FrameMetadata -{ - // Ring buffer needed data. - size_t buffer_slot_index; - size_t frame_bytes_size; - - // Image header data. - uint64_t frame_index; - std::string endianness; - std::string type; - std::vector frame_shape; - - // Pass additional header values. - std::map> header_values; -}; - -struct UdpFrameMetadata -{ - // Ring buffer needed data. - size_t buffer_slot_index; - size_t frame_bytes_size; - - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint16_t n_recv_packets; -}; - -template -class RingBuffer -{ - // Initialized in constructor. - size_t n_slots_ = 0; - std::vector ringbuffer_slots_; - - // Set in initialize(). - size_t slot_size_ = 0; - size_t buffer_size_ = 0; - char* frame_data_buffer_ = NULL; - size_t write_index_ = 0; - size_t buffer_used_slots_ = 0; - std::atomic_bool initialized_ = false; - - std::list< std::shared_ptr > frame_metadata_queue_; - std::mutex frame_metadata_queue_mutex_; - std::mutex ringbuffer_slots_mutex_; - - char* get_buffer_slot_address(size_t buffer_slot_index); - - public: - RingBuffer(size_t n_slots); - - virtual ~RingBuffer(); - void initialize(size_t slot_size); - - char* reserve(std::shared_ptr metadata); - void commit(std::shared_ptr metadata); - std::pair, char*> read(); - void release(size_t buffer_slot_index); - - bool is_empty(); - bool is_initialized(); - void clear(); - size_t get_slot_size(); -}; - -#endif diff --git a/core-buffer/include/UdpReceiver.hpp b/core-buffer/include/UdpReceiver.hpp deleted file mode 100644 index 0bea8ef..0000000 --- a/core-buffer/include/UdpReceiver.hpp +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef UDPRECEIVER_H -#define UDPRECEIVER_H - -#include - -class UdpReceiver { - - int socket_fd_; - -public: - UdpReceiver(); - virtual ~UdpReceiver(); - - bool receive(void* buffer, const size_t buffer_n_bytes); - int receive_many(mmsghdr* msgs, const size_t n_msgs); - - void bind(const uint16_t port); - void disconnect(); -}; - - -#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/core-buffer/include/UdpRecvModule.hpp b/core-buffer/include/UdpRecvModule.hpp deleted file mode 100644 index 36e9afc..0000000 --- a/core-buffer/include/UdpRecvModule.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef UDPRECVMODULE_HPP -#define UDPRECVMODULE_HPP - -#include "RingBuffer.hpp" -#include "FastQueue.hpp" -#include "jungfrau.hpp" -#include - -class UdpRecvModule { - - FastQueue& queue_; - std::thread receiving_thread_; - std::atomic_bool is_receiving_; - - inline void init_frame( - ModuleFrame* frame_metadata, - jungfrau_packet& packet_buffer); - - inline void reserve_next_frame_buffers( - ModuleFrame*& frame_metadata, - char*& frame_buffer); - - protected: - void receive_thread(const uint16_t udp_port); - - public: - UdpRecvModule(FastQueue& queue, const uint16_t udp_port); - virtual ~UdpRecvModule(); - -}; - - -#endif // UDPRECVMODULE_HPP diff --git a/core-buffer/include/WriterH5Writer.hpp b/core-buffer/include/WriterH5Writer.hpp deleted file mode 100644 index d564cb9..0000000 --- a/core-buffer/include/WriterH5Writer.hpp +++ /dev/null @@ -1,53 +0,0 @@ -#ifndef SFWRITER_HPP -#define SFWRITER_HPP - -#include -#include -#include -#include "buffer_config.hpp" - -struct ImageMetadataBuffer -{ - uint64_t pulse_id[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint64_t frame_index[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint32_t daq_rec[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint8_t is_good_frame[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint64_t data_n_bytes[core_buffer::WRITER_DATA_CACHE_N_IMAGES]; - uint16_t n_pulses_in_buffer; -}; - -struct ImageMetadata -{ - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint8_t is_good_frame; - uint64_t data_n_bytes; -}; - -class WriterH5Writer { - - const size_t n_frames_; - const size_t n_modules_; - size_t current_write_index_; - - H5::H5File file_; - - H5::DataSet image_dataset_; - H5::DataSet pulse_id_dataset_; - H5::DataSet frame_index_dataset_; - H5::DataSet daq_rec_dataset_; - H5::DataSet is_good_frame_dataset_; - - -public: - WriterH5Writer(const std::string& output_file, - const size_t n_frames, - const size_t n_modules); - ~WriterH5Writer(); - void write(const ImageMetadataBuffer* metadata, const char* data); - void close_file(); -}; - - -#endif //SFWRITER_HPP diff --git a/core-buffer/include/WriterUtils.hpp b/core-buffer/include/WriterUtils.hpp deleted file mode 100644 index 266518f..0000000 --- a/core-buffer/include/WriterUtils.hpp +++ /dev/null @@ -1,11 +0,0 @@ -#ifndef WRITERUTILS_H -#define WRITERUTILS_H - -#include - -namespace WriterUtils { - void set_process_effective_id(int user_id); - void create_destination_folder(const std::string& output_file); -} - -#endif // WRITERUTILS_H diff --git a/core-buffer/include/WriterZmqReceiver.hpp b/core-buffer/include/WriterZmqReceiver.hpp deleted file mode 100644 index cde1c8b..0000000 --- a/core-buffer/include/WriterZmqReceiver.hpp +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP -#define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP - -#include -#include "WriterH5Writer.hpp" -#include - - -class WriterZmqReceiver { - - const size_t n_modules_; - std::vector sockets_; - - StreamModuleFrame frame_metadata; - -public: - WriterZmqReceiver( - void *ctx, - const std::string& ipc_prefix, - const size_t n_modules); - - virtual ~WriterZmqReceiver(); - - void get_next_image( - const uint64_t pulse_id, - ImageMetadata* image_metadata, - char* image_buffer); -}; - - -#endif //SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP diff --git a/core-buffer/src/buffer/BufferBinaryWriter.cpp b/core-buffer/src/buffer/BufferBinaryWriter.cpp deleted file mode 100644 index 750ef76..0000000 --- a/core-buffer/src/buffer/BufferBinaryWriter.cpp +++ /dev/null @@ -1,124 +0,0 @@ -#include "BufferBinaryWriter.hpp" -#include -#include -#include "date.h" -#include -#include -#include -#include -#include -#include - -using namespace std; - -BufferBinaryWriter::BufferBinaryWriter( - const string& device_name, - const string& root_folder) : - device_name_(device_name), - root_folder_(root_folder), - latest_filename_(root_folder + "/" + device_name + "/LATEST"), - current_output_filename_(""), - output_file_fd_(-1) -{ -} - -BufferBinaryWriter::~BufferBinaryWriter() -{ - close_current_file(); -} - -void BufferBinaryWriter::write(uint64_t pulse_id, const BufferBinaryFormat* buffer) -{ - auto current_frame_file = - BufferUtils::get_filename(root_folder_, device_name_, pulse_id); - - if (current_frame_file != current_output_filename_) { - open_file(current_frame_file); - } - - size_t n_bytes_offset = - BufferUtils::get_file_frame_index(pulse_id) * sizeof(BufferBinaryFormat); - - auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET); - if (lseek_result < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BinaryWriter::write]"; - err_msg << " Error while lseek on file "; - err_msg << current_output_filename_; - err_msg << " for n_bytes_offset "; - err_msg << n_bytes_offset << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes = ::write(output_file_fd_, buffer, sizeof(BufferBinaryFormat)); - if (n_bytes < sizeof(BufferBinaryFormat)) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BinaryWriter::write]"; - err_msg << " Error while writing to file "; - err_msg << current_output_filename_ << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } -} - -void BufferBinaryWriter::open_file(const std::string& filename) -{ - close_current_file(); - - WriterUtils::create_destination_folder(filename); - - output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT, - S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); - if (output_file_fd_ < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BinaryWriter::open_file]"; - err_msg << " Cannot create file "; - err_msg << filename << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - current_output_filename_ = filename; -} - -void BufferBinaryWriter::close_current_file() -{ - if (output_file_fd_ != -1) { - if (close(output_file_fd_) < 0) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[BinaryWriter::close_current_file]"; - err_msg << " Error while closing file "; - err_msg << current_output_filename_ << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - output_file_fd_ = -1; - - BufferUtils::update_latest_file( - latest_filename_, current_output_filename_); - } - - current_output_filename_ = ""; -} \ No newline at end of file diff --git a/core-buffer/src/buffer/BufferH5Writer.cpp b/core-buffer/src/buffer/BufferH5Writer.cpp deleted file mode 100644 index ed7ce39..0000000 --- a/core-buffer/src/buffer/BufferH5Writer.cpp +++ /dev/null @@ -1,126 +0,0 @@ -#include -#include "BufferH5Writer.hpp" -#include -#include -#include - -extern "C" -{ - #include "H5DOpublic.h" -} - -using namespace std; -using namespace core_buffer; - -BufferH5Writer::BufferH5Writer( - const string& root_folder, - const string& device_name) : - root_folder_(root_folder), - device_name_(device_name), - LATEST_filename_(root_folder + "/" + device_name + "/LATEST"), - CURRENT_filename_(root_folder + "/" + device_name + "/CURRENT"), - output_filename_(""), - current_pulse_id_(0), - current_file_index_(0) -{ -} - -void BufferH5Writer::create_file(const string& filename) -{ - - h5_file_ = H5::H5File(filename, H5F_ACC_TRUNC); - - output_filename_ = filename; - - H5::DataSpace data_dspace(3, data_disk_dims, data_disk_dims); - H5::DSetCreatPropList data_dset_prop; - hsize_t data_dset_chunking[3] = {1, MODULE_Y_SIZE, MODULE_X_SIZE}; - data_dset_prop.setChunk(3, data_dset_chunking); - - current_image_dataset_ = h5_file_.createDataSet( - BUFFER_H5_FRAME_DATASET, - H5::PredType::NATIVE_UINT16, - data_dspace, - data_dset_prop); - - H5::DataSpace meta_dspace(2, meta_disk_dims, meta_disk_dims); - H5::DSetCreatPropList meta_dset_prop; - hsize_t meta_dset_chunking[2] = {1, ModuleFrame_N_FIELDS}; - meta_dset_prop.setChunk(2, meta_dset_chunking); - - current_metadata_dataset_ = h5_file_.createDataSet( - BUFFER_H5_METADATA_DATASET, - H5::PredType::NATIVE_UINT64, - meta_dspace, - meta_dset_prop); -} - -BufferH5Writer::~BufferH5Writer() -{ - close_file(); -} - -void BufferH5Writer::close_file() { - current_image_dataset_.close(); - current_metadata_dataset_.close(); - - h5_file_.close(); - output_filename_ = ""; - - current_pulse_id_ = 0; - current_file_index_ = 0; -} - -void BufferH5Writer::set_pulse_id(const uint64_t pulse_id) -{ - current_pulse_id_ = pulse_id; - current_file_index_ = BufferUtils::get_file_frame_index(pulse_id); - - auto new_output_filename = BufferUtils::get_filename( - root_folder_, device_name_, pulse_id); - - if (new_output_filename != output_filename_){ - - if (h5_file_.getId() != -1) { - auto latest_filename = output_filename_; - close_file(); - BufferUtils::update_latest_file(LATEST_filename_, latest_filename); - } - - WriterUtils::create_destination_folder(new_output_filename); - create_file(new_output_filename); - - BufferUtils::update_latest_file(CURRENT_filename_, output_filename_); - } -} - -void BufferH5Writer::write(const ModuleFrame* metadata, const char* data) -{ - hsize_t meta_buff_dims[1] = {ModuleFrame_N_FIELDS}; - H5::DataSpace meta_buffer_space (1, meta_buff_dims); - - H5::DataSpace meta_disk_space(2, meta_disk_dims); - hsize_t meta_count[] = {1, ModuleFrame_N_FIELDS}; - hsize_t meta_start[] = {current_file_index_, 0}; - meta_disk_space.selectHyperslab(H5S_SELECT_SET, meta_count, meta_start); - - current_metadata_dataset_.write( - (char*) metadata, - H5::PredType::NATIVE_UINT64, - meta_buffer_space, - meta_disk_space); - - hsize_t data_buff_dims[2] = {MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace data_buffer_space (2, data_buff_dims); - - H5::DataSpace data_disk_space(3, data_disk_dims); - hsize_t data_count[] = {1, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t data_start[] = {current_file_index_, 0, 0}; - data_disk_space.selectHyperslab(H5S_SELECT_SET, data_count, data_start); - - current_image_dataset_.write( - data, - H5::PredType::NATIVE_UINT16, - data_buffer_space, - data_disk_space); -} diff --git a/core-buffer/src/buffer/BufferUdpReceiver.cpp b/core-buffer/src/buffer/BufferUdpReceiver.cpp deleted file mode 100644 index 68b51dc..0000000 --- a/core-buffer/src/buffer/BufferUdpReceiver.cpp +++ /dev/null @@ -1,137 +0,0 @@ -#include -#include -#include "BufferUdpReceiver.hpp" - -using namespace std; -using namespace core_buffer; - -BufferUdpReceiver::BufferUdpReceiver( - const uint16_t port, - const int source_id) : - source_id_(source_id) -{ - udp_receiver_.bind(port); - - for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { - recv_buff_ptr_[i].iov_base = (void*) &(packet_buffer_[i]); - recv_buff_ptr_[i].iov_len = sizeof(jungfrau_packet); - - msgs_[i].msg_hdr.msg_iov = &recv_buff_ptr_[i]; - msgs_[i].msg_hdr.msg_iovlen = 1; - msgs_[i].msg_hdr.msg_name = &sock_from_[i]; - msgs_[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); - } -} - -BufferUdpReceiver::~BufferUdpReceiver() { - udp_receiver_.disconnect(); -} - -inline void BufferUdpReceiver::init_frame( - ModuleFrame& frame_metadata, const int i_packet) -{ - frame_metadata.pulse_id = packet_buffer_[i_packet].bunchid; - frame_metadata.frame_index = packet_buffer_[i_packet].framenum; - frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug; - frame_metadata.module_id = (int64_t) source_id_; -} - -inline void BufferUdpReceiver::copy_packet_to_buffers( - ModuleFrame& metadata, char* frame_buffer, const int i_packet) -{ - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_[i_packet].packetnum; - memcpy( - (void*) (frame_buffer + frame_buffer_offset), - packet_buffer_[i_packet].data, - JUNGFRAU_DATA_BYTES_PER_PACKET); - - metadata.n_received_packets++; -} - -inline uint64_t BufferUdpReceiver::process_packets( - const int start_offset, - ModuleFrame& metadata, - char* frame_buffer) -{ - for ( - int i_packet=start_offset; - i_packet < packet_buffer_n_packets_; - i_packet++) { - - // First packet for this frame. - if (metadata.pulse_id == 0) { - init_frame(metadata, i_packet); - - // Happens if the last packet from the previous frame gets lost. - } else if (metadata.pulse_id != packet_buffer_[i_packet].bunchid) { - packet_buffer_loaded_ = true; - // Continue on this packet. - packet_buffer_offset_ = i_packet; - - return metadata.pulse_id; - } - - copy_packet_to_buffers(metadata, frame_buffer, i_packet); - - // Last frame packet received. Frame finished. - if (packet_buffer_[i_packet].packetnum == - JUNGFRAU_N_PACKETS_PER_FRAME-1) - { - // Buffer is loaded only if this is not the last message. - if (i_packet+1 != packet_buffer_n_packets_) { - packet_buffer_loaded_ = true; - // Continue on next packet. - packet_buffer_offset_ = i_packet + 1; - - // If i_packet is the last packet the buffer is empty. - } else { - packet_buffer_loaded_ = false; - packet_buffer_offset_ = 0; - } - - return metadata.pulse_id; - } - } - // We emptied the buffer. - packet_buffer_loaded_ = false; - packet_buffer_offset_ = 0; - - return 0; -} - -uint64_t BufferUdpReceiver::get_frame_from_udp( - ModuleFrame& metadata, char* frame_buffer) -{ - // Reset the metadata and frame buffer for the next frame. - metadata.pulse_id = 0; - metadata.n_received_packets = 0; - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - // Happens when last packet from previous frame was missed. - if (packet_buffer_loaded_) { - - auto pulse_id = process_packets( - packet_buffer_offset_, metadata, frame_buffer); - - if (pulse_id != 0) { - return pulse_id; - } - } - - while (true) { - - packet_buffer_n_packets_ = udp_receiver_.receive_many( - msgs_, BUFFER_UDP_N_RECV_MSG); - - if (packet_buffer_n_packets_ == 0) { - continue; - } - - auto pulse_id = process_packets(0, metadata, frame_buffer); - - if (pulse_id != 0) { - return pulse_id; - } - } -} \ No newline at end of file diff --git a/core-buffer/src/buffer/UdpReceiver.cpp b/core-buffer/src/buffer/UdpReceiver.cpp deleted file mode 100644 index 1832674..0000000 --- a/core-buffer/src/buffer/UdpReceiver.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include -#include -#include "UdpReceiver.hpp" -#include "jungfrau.hpp" -#include -#include -#include "buffer_config.hpp" - -using namespace std; -using namespace core_buffer; - -UdpReceiver::UdpReceiver() : - socket_fd_(-1) -{ -} - -UdpReceiver::~UdpReceiver() -{ - disconnect(); -} - -void UdpReceiver::bind(const uint16_t port) -{ - if (socket_fd_ > -1) { - throw runtime_error("Socket already bound."); - } - - socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); - if (socket_fd_ < 0) { - throw runtime_error("Cannot open socket."); - } - - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_addr.s_addr = INADDR_ANY; - server_address.sin_port = htons(port); - - timeval udp_socket_timeout; - udp_socket_timeout.tv_sec = 0; - udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; - - if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, - &udp_socket_timeout, sizeof(timeval)) == -1) { - throw runtime_error( - "Cannot set SO_RCVTIMEO. " + string(strerror(errno))); - } - - if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, - &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { - throw runtime_error( - "Cannot set SO_RCVBUF. " + string(strerror(errno))); - }; - //TODO: try to set SO_RCVLOWAT - - auto bind_result = ::bind( - socket_fd_, - reinterpret_cast(&server_address), - sizeof(server_address)); - - if (bind_result < 0) { - throw runtime_error("Cannot bind socket."); - } -} - -int UdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs) -{ - return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); -} - -bool UdpReceiver::receive(void* buffer, const size_t buffer_n_bytes) -{ - auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); - - if (data_len < 0) { - return false; - } - - if (data_len != buffer_n_bytes) { - return false; - } - - return true; -} - -void UdpReceiver::disconnect() -{ - close(socket_fd_); - socket_fd_ = -1; -} diff --git a/core-buffer/src/buffer/WriterUtils.cpp b/core-buffer/src/buffer/WriterUtils.cpp deleted file mode 100644 index 7653bee..0000000 --- a/core-buffer/src/buffer/WriterUtils.cpp +++ /dev/null @@ -1,49 +0,0 @@ -#include -#include - -#include "WriterUtils.hpp" -#include "date.h" - -using namespace std; - -void WriterUtils::set_process_effective_id(int user_id) -{ - - // TODO: use setfsuid and setfsgid - - if (setegid(user_id)) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[WriterUtils::set_process_effective_id]"; - err_msg << " Cannot set group_id to " << user_id << endl; - - throw runtime_error(err_msg.str()); - } - - if (seteuid(user_id)) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[WriterUtils::set_process_effective_id]"; - err_msg << " Cannot set user_id to " << user_id << endl; - - throw runtime_error(err_msg.str()); - } -} - -void WriterUtils::create_destination_folder(const string& output_file) -{ - auto file_separator_index = output_file.rfind('/'); - - if (file_separator_index != string::npos) { - string output_folder(output_file.substr(0, file_separator_index)); - - string create_folder_command("mkdir -p " + output_folder); - system(create_folder_command.c_str()); - } -} diff --git a/core-buffer/src/buffer/sf_buffer.cpp b/core-buffer/src/buffer/sf_buffer.cpp deleted file mode 100644 index 3d804d0..0000000 --- a/core-buffer/src/buffer/sf_buffer.cpp +++ /dev/null @@ -1,150 +0,0 @@ -#include -#include -#include -#include -#include "zmq.h" -#include "buffer_config.hpp" -#include "jungfrau.hpp" -#include "BufferUdpReceiver.hpp" - -#include -#include - - - -using namespace std; -using namespace core_buffer; - -int main (int argc, char *argv[]) { - if (argc != 5) { - cout << endl; - cout << "Usage: sf_buffer [device_name] [udp_port] [root_folder]"; - cout << "[source_id]"; - cout << endl; - cout << "\tdevice_name: Name to write to disk."; - cout << "\tudp_port: UDP port to connect to." << endl; - cout << "\troot_folder: FS root folder." << endl; - cout << "\tsource_id: ID of the source for live stream." << endl; - cout << endl; - - exit(-1); - } - - string device_name = string(argv[1]); - int udp_port = atoi(argv[2]); - string root_folder = string(argv[3]); - int source_id = atoi(argv[4]); - - stringstream ipc_stream; - ipc_stream << BUFFER_LIVE_IPC_URL << source_id; - const auto ipc_address = ipc_stream.str(); - - auto ctx = zmq_ctx_new(); - auto socket = zmq_socket(ctx, ZMQ_PUB); - - const int sndhwm = BUFFER_ZMQ_SNDHWM; - if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) - throw runtime_error(strerror (errno)); - - const int linger_ms = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) - throw runtime_error(strerror (errno)); - - if (zmq_bind(socket, ipc_address.c_str()) != 0) - throw runtime_error(strerror (errno)); - - uint64_t stats_counter(0); - uint64_t n_missed_packets = 0; - uint64_t n_missed_frames = 0; - uint64_t n_corrupted_frames = 0; - uint64_t last_pulse_id = 0; - - BufferH5Writer writer(root_folder, device_name); - BufferUdpReceiver receiver(udp_port, source_id); - - pid_t tid; - tid = syscall(SYS_gettid); - int ret = setpriority(PRIO_PROCESS, tid, 0); - if (ret == -1) throw runtime_error("cannot set nice"); - - ModuleFrame metadata; - auto frame_buffer = new char[MODULE_N_BYTES * JUNGFRAU_N_MODULES]; - - size_t write_total_us = 0; - size_t write_max_us = 0; - size_t send_total_us = 0; - size_t send_max_us = 0; - - while (true) { - - auto pulse_id = receiver.get_frame_from_udp(metadata, frame_buffer); - - auto start_time = chrono::steady_clock::now(); - - writer.set_pulse_id(pulse_id); - writer.write(&metadata, frame_buffer); - - auto write_end_time = chrono::steady_clock::now(); - auto write_us_duration = chrono::duration_cast( - write_end_time-start_time).count(); - - start_time = chrono::steady_clock::now(); - - zmq_send(socket, &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); - zmq_send(socket, frame_buffer, MODULE_N_BYTES, 0); - - auto send_end_time = chrono::steady_clock::now(); - auto send_us_duration = chrono::duration_cast( - send_end_time-start_time).count(); - - // TODO: Make real statistics, please. - stats_counter++; - write_total_us += write_us_duration; - send_total_us += send_us_duration; - - if (write_us_duration > write_max_us) { - write_max_us = write_us_duration; - } - - if (send_us_duration > send_max_us) { - send_max_us = send_us_duration; - } - - if (metadata.n_received_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { - n_missed_packets += - JUNGFRAU_N_PACKETS_PER_FRAME - metadata.n_received_packets; - n_corrupted_frames++; - } - - if (last_pulse_id>0) { - n_missed_frames += (pulse_id - last_pulse_id) - 1; - } - last_pulse_id = pulse_id; - - if (stats_counter == STATS_MODULO) { - cout << "sf_buffer:device_name " << device_name; - cout << " sf_buffer:pulse_id " << pulse_id; - cout << " sf_buffer:n_missed_frames " << n_missed_frames; - cout << " sf_buffer:n_missed_packets " << n_missed_packets; - cout << " sf_buffer:n_corrupted_frames " << n_corrupted_frames; - - cout << " sf_buffer:write_total_us " << write_total_us/STATS_MODULO; - cout << " sf_buffer:write_max_us " << write_max_us; - cout << " sf_buffer:send_total_us " << send_total_us/STATS_MODULO; - cout << " sf_buffer:send_max_us " << send_max_us; - cout << endl; - - stats_counter = 0; - n_missed_packets = 0; - n_corrupted_frames = 0; - n_missed_frames = 0; - - write_total_us = 0; - write_max_us = 0; - send_total_us = 0; - send_max_us = 0; - } - } - - delete[] frame_buffer; -} diff --git a/core-buffer/src/replay/ReplayH5Reader.cpp b/core-buffer/src/replay/ReplayH5Reader.cpp deleted file mode 100644 index 24dc53d..0000000 --- a/core-buffer/src/replay/ReplayH5Reader.cpp +++ /dev/null @@ -1,138 +0,0 @@ -#include "ReplayH5Reader.hpp" - -#include "BufferUtils.hpp" -#include -#include -#include -#include "date.h" - -using namespace std; -using namespace core_buffer; - -void ReplayH5Reader::prepare_buffer_for_pulse(const uint64_t pulse_id) -{ - auto pulse_filename = BufferUtils::get_filename( - device_, channel_name_, pulse_id); - - if (pulse_filename != current_filename_) { - close_file(); - - current_filename_ = pulse_filename; - current_file_ = H5::H5File(current_filename_, H5F_ACC_RDONLY); - - dset_metadata_ = current_file_.openDataSet(BUFFER_H5_METADATA_DATASET); - dset_frame_ = current_file_.openDataSet(BUFFER_H5_FRAME_DATASET); - - // We always read the metadata for the entire file. - hsize_t b_metadata_dims[2] = - {FILE_MOD, ModuleFrame_N_FIELDS}; - H5::DataSpace b_m_space (2, b_metadata_dims); - hsize_t b_m_count[] = - {FILE_MOD, ModuleFrame_N_FIELDS}; - hsize_t b_m_start[] = {0, 0}; - b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); - - hsize_t f_metadata_dims[2] = {FILE_MOD, ModuleFrame_N_FIELDS}; - H5::DataSpace f_m_space (2, f_metadata_dims); - hsize_t f_m_count[] = - {FILE_MOD, ModuleFrame_N_FIELDS}; - hsize_t f_m_start[] = {0, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); - - dset_metadata_.read(&(metadata_buffer_[0]), H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - buffer_start_pulse_id_ = 0; - buffer_end_pulse_id_ = 0; - } - - // End pulse_id is not included in the buffer. - if ((pulse_id >= buffer_start_pulse_id_) && - (pulse_id < buffer_end_pulse_id_)) { - return; - } - - buffer_start_pulse_id_ = pulse_id - (pulse_id % REPLAY_READ_BUFFER_SIZE); - buffer_end_pulse_id_ = buffer_start_pulse_id_ + REPLAY_READ_BUFFER_SIZE; - - auto start_index_in_file = BufferUtils::get_file_frame_index( - buffer_start_pulse_id_); - - hsize_t b_image_dims[3] = - {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace b_f_space (3, b_image_dims); - hsize_t b_i_count[] = - {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t b_i_start[] = {0, 0, 0}; - b_f_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); - - hsize_t f_frame_dims[3] = {FILE_MOD, MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DataSpace f_f_space (3, f_frame_dims); - hsize_t f_f_count[] = - {REPLAY_READ_BUFFER_SIZE, MODULE_Y_SIZE, MODULE_X_SIZE}; - hsize_t f_f_start[] = {start_index_in_file, 0, 0}; - f_f_space.selectHyperslab(H5S_SELECT_SET, f_f_count, f_f_start); - - dset_frame_.read(&(frame_buffer_[0]), H5::PredType::NATIVE_UINT16, - b_f_space, f_f_space); -} - -ReplayH5Reader::ReplayH5Reader( - const string device, - const string channel_name) : - device_(device), - channel_name_(channel_name) -{ -} - -ReplayH5Reader::~ReplayH5Reader() -{ - close_file(); -} - -void ReplayH5Reader::close_file() -{ - if (current_file_.getId() != -1) { - dset_metadata_.close(); - dset_frame_.close(); - current_file_.close(); - } -} - -bool ReplayH5Reader::get_frame( - const uint64_t pulse_id, ModuleFrame* metadata, char* frame_buffer) -{ - prepare_buffer_for_pulse(pulse_id); - - auto metadata_buffer_index = BufferUtils::get_file_frame_index(pulse_id); - memcpy(metadata, - &(metadata_buffer_[metadata_buffer_index]), - sizeof(ModuleFrame)); - - auto frame_buffer_index = pulse_id - buffer_start_pulse_id_; - memcpy(frame_buffer, - &(frame_buffer_[frame_buffer_index * MODULE_N_BYTES]), - MODULE_N_BYTES); - - if (metadata->pulse_id == 0) { - // Signal that there is no frame at this pulse_id. - metadata->pulse_id = pulse_id; - return false; - - }else if (metadata->pulse_id != pulse_id) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[ReplayH5Reader::get_frame]"; - err_msg << " Corrupted file " << current_filename_; - err_msg << " index_in_file " << metadata_buffer_index; - err_msg << " expected pulse_id " << pulse_id; - err_msg << " but read " << metadata->pulse_id << endl; - - throw runtime_error(err_msg.str()); - } - - return true; -} diff --git a/core-buffer/src/replay/sf_replay.cpp b/core-buffer/src/replay/sf_replay.cpp deleted file mode 100644 index 48164c6..0000000 --- a/core-buffer/src/replay/sf_replay.cpp +++ /dev/null @@ -1,140 +0,0 @@ -#include -#include -#include "jungfrau.hpp" - -#include "zmq.h" -#include "buffer_config.hpp" - -#include -#include -#include "date.h" -#include "bitshuffle/bitshuffle.h" - -using namespace std; -using namespace core_buffer; - -void sf_replay ( - void* socket, - const string& device, - const string& channel_name, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id) -{ - StreamModuleFrame metadata_buffer; - auto frame_buffer = make_unique(MODULE_N_PIXELS); - - ReplayH5Reader file_reader(device, channel_name); - - //TODO: Add statstics. - uint64_t stats_counter = 0; - - uint64_t total_read_us = 0; - uint64_t max_read_us = 0; - uint64_t total_send_us = 0; - uint64_t max_send_us = 0; - - // "<= stop_pulse_id" because we include the stop_pulse_id in the file. - for ( - uint64_t curr_pulse_id = start_pulse_id; - curr_pulse_id <= stop_pulse_id; - curr_pulse_id++) { - - auto start_time = chrono::steady_clock::now(); - - metadata_buffer.is_frame_present = file_reader.get_frame( - curr_pulse_id, - &(metadata_buffer.metadata), - (char*)(frame_buffer.get())); - - metadata_buffer.data_n_bytes = MODULE_N_BYTES; - - auto end_time = chrono::steady_clock::now(); - auto read_us_duration = chrono::duration_cast( - end_time-start_time).count(); - - start_time = chrono::steady_clock::now(); - - zmq_send(socket, - &metadata_buffer, - sizeof(StreamModuleFrame), - ZMQ_SNDMORE); - zmq_send(socket, - (char*)(frame_buffer.get()), - metadata_buffer.data_n_bytes, - 0); - - end_time = chrono::steady_clock::now(); - auto send_us_duration = chrono::duration_cast( - end_time-start_time).count(); - - // TODO: Make proper stastistics. - stats_counter++; - total_read_us += read_us_duration; - max_read_us = max(max_read_us, (uint64_t)read_us_duration); - - total_send_us += send_us_duration; - max_send_us = max(max_send_us, (uint64_t)send_us_duration); - - if (stats_counter == STATS_MODULO) { - cout << "sf_replay:avg_read_us " << total_read_us/STATS_MODULO; - cout << " sf_replay:max_read_us " << max_read_us; - cout << " sf_replay:avg_send_us " << total_send_us/STATS_MODULO; - cout << " sf_replay:max_send_us " << max_send_us; - - cout << endl; - - stats_counter = 0; - total_read_us = 0; - max_read_us = 0; - total_send_us = 0; - max_send_us = 0; - } - } -} - -int main (int argc, char *argv[]) { - - if (argc != 6) { - cout << endl; - cout << "Usage: sf_replay [device]"; - cout << " [channel_name] [source_id] [start_pulse_id] [stop_pulse_id]"; - cout << endl; - cout << "\tdevice: Name of detector." << endl; - cout << "\tchannel_name: M00-M31 for JF16M." << endl; - cout << "\tsource_id: Module index" << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << endl; - - exit(-1); - } - - const string device = string(argv[1]); - const string channel_name = string(argv[2]); - const auto source_id = (uint16_t) atoi(argv[3]); - const auto start_pulse_id = (uint64_t) atoll(argv[4]); - const auto stop_pulse_id = (uint64_t) atoll(argv[5]); - - stringstream ipc_stream; - ipc_stream << REPLAY_STREAM_IPC_URL << (int)source_id; - const auto ipc_address = ipc_stream.str(); - - auto ctx = zmq_ctx_new(); - auto socket = zmq_socket(ctx, ZMQ_PUSH); - - const int sndhwm = REPLAY_SNDHWM; - if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) - throw runtime_error(strerror (errno)); - - const int linger_ms = -1; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) - throw runtime_error(strerror (errno)); - - if (zmq_bind(socket, ipc_address.c_str()) != 0) - throw runtime_error(strerror (errno)); - - sf_replay(socket, device, channel_name, start_pulse_id, stop_pulse_id); - - zmq_close(socket); - zmq_ctx_destroy(ctx); -} diff --git a/core-buffer/src/stream/LiveRecvModule.cpp b/core-buffer/src/stream/LiveRecvModule.cpp deleted file mode 100644 index d18054b..0000000 --- a/core-buffer/src/stream/LiveRecvModule.cpp +++ /dev/null @@ -1,223 +0,0 @@ -#include "LiveRecvModule.hpp" -#include "date.h" -#include -#include -#include "zmq.h" -#include "buffer_config.hpp" - -using namespace std; -using namespace core_buffer; - -LiveRecvModule::LiveRecvModule( - FastQueue& queue_, - const size_t n_modules, - void* ctx_, - const string& ipc_prefix) : - queue_(queue_), - n_modules_(n_modules), - ctx_(ctx_), - ipc_prefix_(ipc_prefix), - is_receiving_(true) -{ - receiving_thread_ = thread(&LiveRecvModule::receive_thread, this); -} - -LiveRecvModule::~LiveRecvModule() -{ - stop(); -} - -void LiveRecvModule::stop() -{ - is_receiving_ = false; - receiving_thread_.join(); -} - -void* LiveRecvModule::connect_socket(size_t module_id) -{ - void* sock = zmq_socket(ctx_, ZMQ_SUB); - if (sock == nullptr) { - throw runtime_error(zmq_strerror(errno)); - } - - int rcvhwm = STREAM_RCVHWM; - if (zmq_setsockopt(sock, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - int linger = 0; - if (zmq_setsockopt(sock, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << ipc_prefix_ << module_id; - const auto ipc = ipc_addr.str(); - - if (zmq_connect(sock, ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - return sock; -} - -void LiveRecvModule::recv_single_module( - void* socket, ModuleFrame* metadata, char* data) -{ - auto n_bytes_metadata = zmq_recv( - socket, - metadata, - sizeof(ModuleFrame), - 0); - - if (n_bytes_metadata == -1) { - throw runtime_error(zmq_strerror(errno)); - }else if (n_bytes_metadata != sizeof(ModuleFrame)) { - throw runtime_error("Stream header of wrong size."); - } - - if (metadata->pulse_id == 0) { - throw runtime_error("Received invalid pulse_id=0."); - } - - auto n_bytes_image = zmq_recv( - socket, - data, - MODULE_N_BYTES, - 0); - - if (n_bytes_image == -1) { - throw runtime_error(zmq_strerror(errno)); - } else if (n_bytes_image != MODULE_N_BYTES) { - throw runtime_error("Stream data of wrong size."); - } -} - -uint64_t LiveRecvModule::align_modules( - const vector& sockets, ModuleFrameBuffer *metadata, char *data) -{ - uint64_t max_pulse_id = 0; - - // First pass - determine current max_pulse_id. - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_metadata = metadata->module[i_module]; - max_pulse_id = max(max_pulse_id, module_metadata.pulse_id); - } - - // Second pass - align all receivers to max_pulse_id. - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_metadata = metadata->module[i_module]; - - size_t diff_to_max = max_pulse_id - module_metadata.pulse_id; - for (size_t i = 0; i < diff_to_max; i++) { - recv_single_module( - sockets[i_module], - &module_metadata, - data + (MODULE_N_BYTES * i_module)); - } - - if (module_metadata.pulse_id != max_pulse_id) { - throw runtime_error("Cannot align pulse_ids."); - } - } - - return max_pulse_id; -} - -void LiveRecvModule::receive_thread() -{ - try { - - vector sockets(n_modules_); - - for (size_t i = 0; i < n_modules_; i++) { - sockets[i] = connect_socket(i); - } - - auto slot_id = queue_.reserve(); - if (slot_id == -1) throw runtime_error("This cannot really happen"); - - auto metadata = queue_.get_metadata_buffer(slot_id); - auto data = queue_.get_data_buffer(slot_id); - - // First buffer load for alignment. - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_metadata = metadata->module[i_module]; - - recv_single_module( - sockets[i_module], - &module_metadata, - data + (MODULE_N_BYTES * i_module)); - } - - auto current_pulse_id = align_modules(sockets, metadata, data); - - queue_.commit(); - current_pulse_id++; - - while(is_receiving_.load(memory_order_relaxed)) { - auto slot_id = queue_.reserve(); - - if (slot_id == -1){ - this_thread::sleep_for(chrono::milliseconds(5)); - continue; - } - - metadata = queue_.get_metadata_buffer(slot_id); - data = queue_.get_data_buffer(slot_id); - - bool sync_needed = false; - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - auto& module_metadata = metadata->module[i_module]; - - recv_single_module( - sockets[i_module], - &module_metadata, - data + (MODULE_N_BYTES * i_module)); - - if (module_metadata.pulse_id != current_pulse_id) { - sync_needed = true; - } - } - - if (sync_needed) { - auto start_time = chrono::steady_clock::now(); - - auto new_pulse_id = align_modules(sockets, metadata, data); - auto lost_pulses = new_pulse_id - current_pulse_id; - current_pulse_id = new_pulse_id; - - auto end_time = chrono::steady_clock::now(); - auto us_duration = chrono::duration_cast( - end_time-start_time).count(); - - cout << "sf_stream:sync_lost_pulses " << lost_pulses; - cout << " sf_stream::sync_us " << us_duration; - cout << endl; - } - - queue_.commit(); - current_pulse_id++; - } - - for (size_t i = 0; i < n_modules_; i++) { - zmq_close(sockets[i]); - } - - } catch (const std::exception& e) { - is_receiving_ = false; - - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[LiveRecvModule::receive_thread]"; - cout << " Stopped because of exception: " << endl; - cout << e.what() << endl; - - throw; - } -} \ No newline at end of file diff --git a/core-buffer/src/stream/sf_stream.cpp b/core-buffer/src/stream/sf_stream.cpp deleted file mode 100644 index 18f578f..0000000 --- a/core-buffer/src/stream/sf_stream.cpp +++ /dev/null @@ -1,221 +0,0 @@ -#include -#include -#include "buffer_config.hpp" - -#include -#include -#include -#include -#include "WriterH5Writer.hpp" -#include -#include -#include -#include -#include "date.h" -#include - -using namespace std; -using namespace core_buffer; - -int main (int argc, char *argv[]) -{ - if (argc != 5) { - cout << endl; - cout << "Usage: sf_stream "; - cout << " [streamvis_address] [reduction_factor_streamvis]"; - cout << " [live_analysis_address] [reduction_factor_live_analysis]"; - cout << endl; - cout << "\tstreamvis_address: address to streamvis, example tcp://129.129.241.42:9007" << endl; - cout << "\treduction_factor_streamvis: 1 out of N (example 10) images to send to streamvis. For remaining send metadata." << endl; - cout << "\tlive_analysis_address: address to live_analysis, example tcp://129.129.241.42:9107" << endl; - cout << "\treduction_factor_live_analysis: 1 out of N (example 10) images to send to live analysis. For remaining send metadata. N<=1 - send every image" << endl; - cout << endl; - - exit(-1); - } - - string streamvis_address = string(argv[1]); - int reduction_factor_streamvis = (int) atoll(argv[2]); - string live_analysis_address = string(argv[3]); - int reduction_factor_live_analysis = (uint64_t) atoll(argv[4]); - - size_t n_modules = 32; - FastQueue queue( - n_modules * MODULE_N_BYTES, - STREAM_FASTQUEUE_SLOTS); - - auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - - LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); - - // 0mq sockets to streamvis and live analysis - void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB); - if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) { - throw runtime_error(strerror(errno)); - } - void *socket_live = zmq_socket(ctx, ZMQ_PUB); - if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) { - throw runtime_error(strerror(errno)); - } - - uint16_t data_empty [] = { 0, 0, 0, 0}; - Json::Value header; - Json::StreamWriterBuilder builder; - - // TODO: Remove stats trash. - int stats_counter = 0; - - size_t read_total_us = 0; - size_t read_max_us = 0; - - while (true) { - - auto start_time = chrono::steady_clock::now(); - - auto slot_id = queue.read(); - - if(slot_id == -1) { - this_thread::sleep_for(chrono::milliseconds( - core_buffer::RB_READ_RETRY_INTERVAL_MS)); - continue; - } - - auto metadata = queue.get_metadata_buffer(slot_id); - auto data = queue.get_data_buffer(slot_id); - - auto read_end_time = chrono::steady_clock::now(); - auto read_us_duration = chrono::duration_cast( - read_end_time-start_time).count(); - - uint64_t pulse_id = 0; - uint64_t frame_index = 0; - uint64_t daq_rec = 0; - bool is_good_frame = true; - - for (size_t i_module = 0; i_module < n_modules; i_module++) { - // TODO: Place this tests in the appropriate spot. - auto& module_metadata = metadata->module[i_module]; - if (i_module == 0) { - pulse_id = module_metadata.pulse_id; - frame_index = module_metadata.frame_index; - daq_rec = module_metadata.daq_rec; - - if ( module_metadata.n_received_packets != 128 ) is_good_frame = false; - } else { - if (module_metadata.pulse_id != pulse_id) is_good_frame = false; - - if (module_metadata.frame_index != frame_index) is_good_frame = false; - - if (module_metadata.daq_rec != daq_rec) is_good_frame = false; - - if (module_metadata.n_received_packets != 128 ) is_good_frame = false; - } - } - - //Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) - - header["frame"] = (Json::Value::UInt64)frame_index; - header["is_good_frame"] = is_good_frame; - header["daq_rec"] = (Json::Value::UInt64)daq_rec; - header["pulse_id"] = (Json::Value::UInt64)pulse_id; - - //this needs to be re-read from external source - header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5"; - header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5"; - - header["number_frames_expected"] = 10000; - header["run_name"] = to_string(uint64_t(pulse_id/10000)*10000); - - // detector name should come as parameter to sf_stream - header["detector_name"] = "JF07T32V01"; - - header["htype"] = "array-1.0"; - header["type"] = "uint16"; - - int send_streamvis = 0; - if ( reduction_factor_streamvis > 1 ) { - send_streamvis = rand() % reduction_factor_streamvis; - } - if ( send_streamvis == 0 ) { - header["shape"][0] = 16384; - header["shape"][1] = 1024; - } else{ - header["shape"][0] = 2; - header["shape"][1] = 2; - } - - string text_header = Json::writeString(builder, header); - - zmq_send(socket_streamvis, - text_header.c_str(), - text_header.size(), - ZMQ_SNDMORE); - - if ( send_streamvis == 0 ) { - zmq_send(socket_streamvis, - (char*)data, - core_buffer::MODULE_N_BYTES*n_modules, - 0); - } else { - zmq_send(socket_streamvis, - (char*)data_empty, - 8, - 0); - } - - //same for live analysis - int send_live_analysis = 0; - if ( reduction_factor_live_analysis > 1 ) { - send_live_analysis = rand() % reduction_factor_live_analysis; - } - if ( send_live_analysis == 0 ) { - header["shape"][0] = 16384; - header["shape"][1] = 1024; - } else{ - header["shape"][0] = 2; - header["shape"][1] = 2; - } - - text_header = Json::writeString(builder, header); - - zmq_send(socket_live, - text_header.c_str(), - text_header.size(), - ZMQ_SNDMORE); - - if ( send_live_analysis == 0 ) { - zmq_send(socket_live, - (char*)data, - core_buffer::MODULE_N_BYTES*n_modules, - 0); - } else { - zmq_send(socket_live, - (char*)data_empty, - 8, - 0); - } - - queue.release(); - - // TODO: Some poor statistics. - stats_counter++; - read_total_us += read_us_duration; - - if (read_us_duration > read_max_us) { - read_max_us = read_us_duration; - } - - if (stats_counter == STATS_MODULO) { - cout << "sf_stream:read_us " << read_total_us / STATS_MODULO; - cout << " sf_stream:read_max_us " << read_max_us; - cout << endl; - - stats_counter = 0; - read_total_us = 0; - read_max_us = 0; - } - } - - return 0; -} diff --git a/core-buffer/src/writer/BufferedFastQueue.cpp b/core-buffer/src/writer/BufferedFastQueue.cpp deleted file mode 100644 index 01a758e..0000000 --- a/core-buffer/src/writer/BufferedFastQueue.cpp +++ /dev/null @@ -1,68 +0,0 @@ -#include "BufferedFastQueue.hpp" -#include - -using namespace std; -using namespace core_buffer; - -BufferedFastQueue::BufferedFastQueue( - FastQueue& queue, - const size_t buffer_n_pulses, - const size_t n_modules) : - buffer_n_pulses_(buffer_n_pulses), - queue_(queue), - n_modules_(n_modules) -{ - while ((current_slot_id_ = queue_.reserve()) == -1){ - this_thread::sleep_for( - chrono::milliseconds(RB_READ_RETRY_INTERVAL_MS)); - } - - queue_meta_buffer_ = queue_.get_metadata_buffer(current_slot_id_); - queue_meta_buffer_->n_pulses_in_buffer = 0; - queue_data_buffer_ = queue_.get_data_buffer(current_slot_id_); -} - -ImageMetadata* BufferedFastQueue::get_metadata_buffer() -{ - return &image_metadata_; -} - -char* BufferedFastQueue::get_data_buffer() -{ - auto index = queue_meta_buffer_->n_pulses_in_buffer; - auto image_size = MODULE_N_BYTES * n_modules_; - - return queue_data_buffer_ + (index * image_size); -} - -void BufferedFastQueue::commit() -{ - auto index = queue_meta_buffer_->n_pulses_in_buffer; - - queue_meta_buffer_->pulse_id[index] = image_metadata_.pulse_id; - queue_meta_buffer_->frame_index[index] = image_metadata_.frame_index; - queue_meta_buffer_->daq_rec[index] = image_metadata_.daq_rec; - queue_meta_buffer_->is_good_frame[index] = image_metadata_.is_good_frame; - queue_meta_buffer_->data_n_bytes[index] = image_metadata_.data_n_bytes; - - queue_meta_buffer_->n_pulses_in_buffer++; - - if (queue_meta_buffer_->n_pulses_in_buffer == buffer_n_pulses_) { - queue_.commit(); - - while ((current_slot_id_ = queue_.reserve()) == -1){ - this_thread::sleep_for( - chrono::milliseconds(RB_READ_RETRY_INTERVAL_MS)); - } - - queue_meta_buffer_ = queue_.get_metadata_buffer(current_slot_id_); - queue_meta_buffer_->n_pulses_in_buffer = 0; - queue_data_buffer_ = queue_.get_data_buffer(current_slot_id_); - } -} - -void BufferedFastQueue::finalize() { - if (queue_meta_buffer_->n_pulses_in_buffer > 0) { - queue_.commit(); - } -} \ No newline at end of file diff --git a/core-buffer/src/writer/FastQueue.cpp b/core-buffer/src/writer/FastQueue.cpp deleted file mode 100644 index 25dde8c..0000000 --- a/core-buffer/src/writer/FastQueue.cpp +++ /dev/null @@ -1,109 +0,0 @@ -#include -#include -#include -#include "FastQueue.hpp" - -using namespace std; - -template -FastQueue::FastQueue( - const size_t slot_data_n_bytes, - const uint16_t n_slots) : - slot_n_bytes_(slot_data_n_bytes + sizeof(T)), - n_slots_(n_slots) -{ - buffer_ = new char[slot_n_bytes_ * n_slots_]; - buffer_status_ = new atomic_int[n_slots]; - - // TODO: Are atomic variables initialized? - for (size_t i=0; i < n_slots_; i++) { - buffer_status_[i] = 0; - } - - write_slot_id_ = 0; - read_slot_id_ = 0; -} - -template -FastQueue::~FastQueue() -{ - delete[] buffer_; - delete[] buffer_status_; -} - -template -T* FastQueue::get_metadata_buffer(const int slot_id) -{ - return (T*)(buffer_ + (slot_id * slot_n_bytes_)); -} - -template -char* FastQueue::get_data_buffer(const int slot_id) -{ - return (char*)(buffer_ + (slot_id * slot_n_bytes_) + sizeof(T)); -} - -template -int FastQueue::reserve() -{ - int expected = SLOT_STATUS::EMPTY; - // If (buffer_status==SLOT_EMPTY) buffer_status=SLOT_RESERVED. - bool slot_reserved = - buffer_status_[write_slot_id_].compare_exchange_strong( - expected, SLOT_STATUS::RESERVED); - - if (!slot_reserved) { - return -1; - } - - return write_slot_id_; -} - -template -void FastQueue::commit() -{ - int expected = SLOT_STATUS::RESERVED; - // If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY. - bool slot_ready = - buffer_status_[write_slot_id_].compare_exchange_strong( - expected, SLOT_STATUS::READY); - - if (!slot_ready) { - throw runtime_error("Slot should be reserved first."); - } - - write_slot_id_++; - write_slot_id_ %= n_slots_; -} - -template -int FastQueue::read() -{ - if (buffer_status_[read_slot_id_] != SLOT_STATUS::READY) { - return -1; - } - - return read_slot_id_; -} - -template -void FastQueue::release() -{ - int expected = SLOT_STATUS::READY; - // If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY. - bool slot_empty = - buffer_status_[read_slot_id_].compare_exchange_strong( - expected, SLOT_STATUS::EMPTY); - - if (!slot_empty) { - throw runtime_error("Slot should be ready first."); - } - - read_slot_id_++; - read_slot_id_ %= n_slots_; -} - -template class FastQueue; -template class FastQueue; -template class FastQueue; -template class FastQueue; diff --git a/core-buffer/src/writer/WriterH5Writer.cpp b/core-buffer/src/writer/WriterH5Writer.cpp deleted file mode 100644 index 750d6a6..0000000 --- a/core-buffer/src/writer/WriterH5Writer.cpp +++ /dev/null @@ -1,162 +0,0 @@ -#include "WriterH5Writer.hpp" -#include - - -//extern "C" -//{ -// #include "H5DOpublic.h" -// #include -//} - -using namespace std; -using namespace core_buffer; - -WriterH5Writer::WriterH5Writer( - const string& output_file, - const size_t n_frames, - const size_t n_modules) : - n_frames_(n_frames), - n_modules_(n_modules), - current_write_index_(0) -{ - -// bshuf_register_h5filter(); - - file_ = H5::H5File(output_file, H5F_ACC_TRUNC); - - hsize_t image_dataset_dims[3] = - {n_frames_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; - - H5::DataSpace image_dataspace(3, image_dataset_dims); - - hsize_t image_dataset_chunking[3] = - {1, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DSetCreatPropList image_dataset_properties; - image_dataset_properties.setChunk(3, image_dataset_chunking); - -// // block_size, compression type -// uint compression_prop[] = -// {MODULE_N_PIXELS, //block size -// BSHUF_H5_COMPRESS_LZ4}; // Compression type -// -// H5Pset_filter(image_dataset_properties.getId(), -// BSHUF_H5FILTER, -// H5Z_FLAG_MANDATORY, -// 2, -// &(compression_prop[0])); - - image_dataset_ = file_.createDataSet( - "image", - H5::PredType::NATIVE_UINT16, - image_dataspace, - image_dataset_properties); - - hsize_t metadata_dataset_dims[] = {n_frames_, 1}; - H5::DataSpace metadata_dataspace(2, metadata_dataset_dims); - - // Chunk cannot be larger than n_frames. - auto metadata_chunk_size = WRITER_METADATA_CHUNK_N_IMAGES; - if (n_frames < metadata_chunk_size) { - metadata_chunk_size = n_frames; - } - - hsize_t metadata_dataset_chunking[] = {metadata_chunk_size, 1}; - H5::DSetCreatPropList metadata_dataset_properties; - metadata_dataset_properties.setChunk(2, metadata_dataset_chunking); - - pulse_id_dataset_ = file_.createDataSet( - "pulse_id", - H5::PredType::NATIVE_UINT64, - metadata_dataspace, - metadata_dataset_properties); - - frame_index_dataset_ = file_.createDataSet( - "frame_index", - H5::PredType::NATIVE_UINT64, - metadata_dataspace, - metadata_dataset_properties); - - daq_rec_dataset_ = file_.createDataSet( - "daq_rec", - H5::PredType::NATIVE_UINT32, - metadata_dataspace, - metadata_dataset_properties); - - is_good_frame_dataset_ = file_.createDataSet( - "is_good_frame", - H5::PredType::NATIVE_UINT8, - metadata_dataspace, - metadata_dataset_properties); - -} - -WriterH5Writer::~WriterH5Writer() -{ - close_file(); -} - -void WriterH5Writer::close_file() -{ - image_dataset_.close(); - pulse_id_dataset_.close(); - frame_index_dataset_.close(); - daq_rec_dataset_.close(); - is_good_frame_dataset_.close(); - - file_.close(); -} - -void WriterH5Writer::write( - const ImageMetadataBuffer* metadata, const char* data) -{ - auto n_images_in_buffer = metadata->n_pulses_in_buffer; - - hsize_t b_i_dims[3] = { - n_images_in_buffer, - MODULE_Y_SIZE*n_modules_, - MODULE_X_SIZE}; - H5::DataSpace b_i_space(3, b_i_dims); - - hsize_t f_i_dims[3] = {n_frames_, - MODULE_Y_SIZE * n_modules_, - MODULE_X_SIZE}; - H5::DataSpace f_i_space(3, f_i_dims); - - hsize_t i_count[] = {n_images_in_buffer, - MODULE_Y_SIZE*n_modules_, - MODULE_X_SIZE}; - hsize_t i_start[] = {current_write_index_, 0, 0}; - f_i_space.selectHyperslab(H5S_SELECT_SET, i_count, i_start); - - image_dataset_.write( - data, H5::PredType::NATIVE_UINT16, - b_i_space, f_i_space); - - hsize_t b_m_dims[2] = {n_images_in_buffer, 1}; - H5::DataSpace b_m_space (2, b_m_dims); - - hsize_t f_m_dims[] = {n_frames_, 1}; - H5::DataSpace f_m_space(2, f_m_dims); - - hsize_t meta_count[] = {n_images_in_buffer, 1}; - hsize_t meta_start[] = {current_write_index_, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, meta_count, meta_start); - - pulse_id_dataset_.write( - &(metadata->pulse_id), H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - frame_index_dataset_.write( - &(metadata->frame_index), H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - daq_rec_dataset_.write( - &(metadata->daq_rec), H5::PredType::NATIVE_UINT32, - b_m_space, f_m_space); - - is_good_frame_dataset_.write( - &(metadata->is_good_frame), H5::PredType::NATIVE_UINT8, - b_m_space, f_m_space); - - current_write_index_++; -} diff --git a/core-buffer/src/writer/WriterZmqReceiver.cpp b/core-buffer/src/writer/WriterZmqReceiver.cpp deleted file mode 100644 index 23b388e..0000000 --- a/core-buffer/src/writer/WriterZmqReceiver.cpp +++ /dev/null @@ -1,138 +0,0 @@ -#include "WriterZmqReceiver.hpp" -#include "zmq.h" -#include "date.h" -#include -#include - -using namespace std; -using namespace core_buffer; - -WriterZmqReceiver::WriterZmqReceiver( - void *ctx, - const string &ipc_prefix, - const size_t n_modules) : - n_modules_(n_modules), - sockets_(n_modules) -{ - - for (size_t i = 0; i < n_modules; i++) { - sockets_[i] = zmq_socket(ctx, ZMQ_PULL); - - int rcvhwm = WRITER_RCVHWM; - if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, - sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - int linger = 0; - if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << ipc_prefix << i; - const auto ipc = ipc_addr.str(); - - if (zmq_connect(sockets_[i], ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - } -} - -WriterZmqReceiver::~WriterZmqReceiver() -{ - for (size_t i = 0; i < n_modules_; i++) { - zmq_close(sockets_[i]); - } -} - -void WriterZmqReceiver::get_next_image( - const uint64_t pulse_id, - ImageMetadata* image_metadata, - char* image_buffer) -{ - // Init the image metadata. - image_metadata->pulse_id = pulse_id; - image_metadata->frame_index = 0; - image_metadata->daq_rec = 0; - image_metadata->data_n_bytes = 0; - image_metadata->is_good_frame = 1; - bool image_metadata_init = false; - - size_t image_buffer_offset = 0; - - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - - auto n_bytes_metadata = zmq_recv( - sockets_[i_module], - &frame_metadata, - sizeof(StreamModuleFrame), - 0); - - if (n_bytes_metadata != sizeof(StreamModuleFrame)) { - throw runtime_error("Wrong number of metadata bytes."); - } - - // sf_replay should always send the right pulse_id. - if (frame_metadata.metadata.pulse_id != pulse_id) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[sf_writer::receive_replay]"; - err_msg << " Read unexpected pulse_id. "; - err_msg << " Expected " << pulse_id; - err_msg << " received "; - err_msg << frame_metadata.metadata.pulse_id; - err_msg << " from i_module " << i_module << endl; - - throw runtime_error(err_msg.str()); - } - - if (!frame_metadata.is_frame_present) { - image_metadata->is_good_frame = 0; - - // Init the image metadata with the first valid frame. - } else if (!image_metadata_init) { - image_metadata_init = true; - - image_metadata->frame_index = - frame_metadata.metadata.frame_index; - image_metadata->daq_rec = - frame_metadata.metadata.daq_rec; - } - - // Once the image is not good, we don't care to re-flag it. - if (image_metadata->is_good_frame == 1) { - if (frame_metadata.metadata.frame_index != - image_metadata->frame_index) { - image_metadata->is_good_frame = 0; - } - - if (frame_metadata.metadata.daq_rec != - image_metadata->daq_rec) { - image_metadata->is_good_frame = 0; - } - - if (frame_metadata.metadata.n_received_packets != - JUNGFRAU_N_PACKETS_PER_FRAME) { - image_metadata->is_good_frame = 0; - } - } - - auto n_bytes_image = zmq_recv( - sockets_[i_module], - (image_buffer + image_buffer_offset), - frame_metadata.data_n_bytes, - 0); - - if (n_bytes_image != frame_metadata.data_n_bytes) { - throw runtime_error("Wrong number of data bytes."); - } - - image_buffer_offset += n_bytes_image; - } - - image_metadata->data_n_bytes = image_buffer_offset; -} diff --git a/core-buffer/src/writer/sf_writer.cpp b/core-buffer/src/writer/sf_writer.cpp deleted file mode 100644 index f4ceb8f..0000000 --- a/core-buffer/src/writer/sf_writer.cpp +++ /dev/null @@ -1,179 +0,0 @@ -#include -#include -#include "buffer_config.hpp" -#include "zmq.h" -#include -#include -#include -#include -#include "WriterH5Writer.hpp" -#include -#include -#include -#include "date.h" -#include "bitshuffle/bitshuffle.h" -#include "WriterZmqReceiver.hpp" - -using namespace std; -using namespace core_buffer; - -void receive_replay( - void* ctx, - const string ipc_prefix, - const size_t n_modules, - FastQueue& queue, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id) -{ - try { - WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules); - BufferedFastQueue buffered_queue( - queue, WRITER_DATA_CACHE_N_IMAGES, n_modules); - - uint64_t current_pulse_id=start_pulse_id; - - // "<= stop_pulse_id" because we include the last pulse_id. - while(current_pulse_id<=stop_pulse_id) { - - auto image_metadata = buffered_queue.get_metadata_buffer(); - auto image_buffer = buffered_queue.get_data_buffer(); - - receiver.get_next_image( - current_pulse_id, image_metadata, image_buffer); - - if (image_metadata->pulse_id != current_pulse_id) { - throw runtime_error("Wrong pulse id from zmq receiver."); - } - - buffered_queue.commit(); - current_pulse_id++; - } - - buffered_queue.finalize(); - - } catch (const std::exception& e) { - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_writer::receive_replay]"; - cout << " Stopped because of exception: " << endl; - cout << e.what() << endl; - - throw; - } -} - -int main (int argc, char *argv[]) -{ - if (argc != 4) { - cout << endl; - cout << "Usage: sf_writer "; - cout << " [output_file] [start_pulse_id] [stop_pulse_id]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << endl; - - exit(-1); - } - - string output_file = string(argv[1]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[2]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]); - - size_t n_modules = 32; - - FastQueue queue( - MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES, - WRITER_FASTQUEUE_N_SLOTS); - - auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); - - thread replay_receive_thread(receive_replay, - ctx, REPLAY_STREAM_IPC_URL, n_modules, - ref(queue), start_pulse_id, stop_pulse_id); - - size_t n_frames = stop_pulse_id - start_pulse_id + 1; - WriterH5Writer writer(output_file, n_frames, n_modules); - - // TODO: Remove stats trash. - int stats_counter = 0; - size_t read_total_us = 0; - size_t write_total_us = 0; - size_t read_max_us = 0; - size_t write_max_us = 0; - - auto start_time = chrono::steady_clock::now(); - - auto current_pulse_id = start_pulse_id; - // "<= stop_pulse_id" because we include the last pulse_id. - while (current_pulse_id <= stop_pulse_id) { - - int slot_id; ; - while((slot_id = queue.read()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - - auto metadata = queue.get_metadata_buffer(slot_id); - auto data = queue.get_data_buffer(slot_id); - - auto read_end_time = chrono::steady_clock::now(); - auto read_us_duration = chrono::duration_cast( - read_end_time-start_time).count(); - - // Verify that all pulse_ids are correct. - for (int i=0; in_pulses_in_buffer; i++) { - if (metadata->pulse_id[i] != current_pulse_id) { - throw runtime_error("Wrong pulse id from receiver thread."); - } - - current_pulse_id++; - } - - start_time = chrono::steady_clock::now(); - - writer.write(metadata, data); - - auto write_end_time = chrono::steady_clock::now(); - auto write_us_duration = chrono::duration_cast( - write_end_time-start_time).count(); - - queue.release(); - - // TODO: Some poor statistics. - stats_counter++; - - read_total_us += read_us_duration; - read_max_us = max(read_max_us, (uint64_t)read_us_duration); - - write_total_us += write_us_duration; - write_max_us = max(write_max_us, (uint64_t)write_us_duration); - -// if (stats_counter == STATS_MODULO) { - cout << "sf_writer:read_us " << read_total_us / STATS_MODULO; - cout << " sf_writer:read_max_us " << read_max_us; - cout << " sf_writer:write_us " << write_total_us / STATS_MODULO; - cout << " sf_writer:write_max_us " << write_max_us; - - cout << endl; - - stats_counter = 0; - read_total_us = 0; - read_max_us = 0; - write_total_us = 0; - write_max_us = 0; -// } - - start_time = chrono::steady_clock::now(); - } - - writer.close_file(); - - //wait till receive thread is finished - replay_receive_thread.join(); - return 0; -} diff --git a/core-buffer/test/CMakeLists.txt b/core-buffer/test/CMakeLists.txt index 24e0a58..32c5d56 100644 --- a/core-buffer/test/CMakeLists.txt +++ b/core-buffer/test/CMakeLists.txt @@ -1,7 +1,7 @@ -add_executable(core-buffer_tests main.cpp) +add_executable(core-buffer-tests main.cpp) -target_link_libraries(core-buffer_tests - core-buffer +target_link_libraries(core-buffer-tests + core-buffer-lib external hdf5 hdf5_hl @@ -9,10 +9,10 @@ target_link_libraries(core-buffer_tests zmq gtest) -add_executable(perf-sf_writer perf/perf_WriterH5Writer.cpp) -target_link_libraries(perf-sf_writer - core-buffer - hdf5 - hdf5_hl - hdf5_cpp - gtest) +#add_executable(perf-sf_writer perf/perf_WriterH5Writer.cpp) +#target_link_libraries(perf-sf_writer +# core-buffer +# hdf5 +# hdf5_hl +# hdf5_cpp +# gtest) diff --git a/core-buffer/test/main.cpp b/core-buffer/test/main.cpp index b8e0cae..e379d99 100644 --- a/core-buffer/test/main.cpp +++ b/core-buffer/test/main.cpp @@ -1,15 +1,6 @@ #include "gtest/gtest.h" -#include "test_UdpReceiver.cpp" -#include "test_BufferBinaryWriter.cpp" #include "test_buffer_utils.cpp" -#include "test_BufferH5Writer.cpp" -#include "test_ReplayH5Reader.cpp" -#include "test_WriterH5Writer.cpp" -#include "test_FastQueue.cpp" -#include "test_LiveRecvModule.cpp" -#include "test_BufferUdpReceiver.cpp" #include "test_bitshuffle.cpp" -#include "test_WriterZmqReceiver.cpp" using namespace std; diff --git a/core-buffer/test/perf/perf_WriterH5Writer.cpp b/core-buffer/test/perf/perf_WriterH5Writer.cpp deleted file mode 100644 index 1d9df1f..0000000 --- a/core-buffer/test/perf/perf_WriterH5Writer.cpp +++ /dev/null @@ -1,90 +0,0 @@ -#include -#include "buffer_config.hpp" -#include "zmq.h" -#include -#include -#include -#include -#include "WriterH5Writer.hpp" - -using namespace std; -using namespace core_buffer; - - -int main (int argc, char *argv[]) -{ - if (argc != 4) { - cout << endl; - cout << "Usage: sf_writer "; - cout << " [output_file] [start_pulse_id] [stop_pulse_id]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << endl; - - exit(-1); - } - - string output_file = string(argv[1]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[2]); - uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]); - - size_t n_modules = 32; - - size_t n_frames = stop_pulse_id - start_pulse_id; - WriterH5Writer writer(output_file, n_frames, n_modules); - - // TODO: Remove stats trash. - int i_write = 0; - size_t total_ms = 0; - size_t max_ms = 0; - size_t min_ms = 10000; // 10 seconds should be a safe first value. - - auto start_time = chrono::steady_clock::now(); - - auto metadata = make_shared(); - auto data = make_unique(MODULE_N_BYTES*n_modules); - - auto current_pulse_id = start_pulse_id; - while (current_pulse_id <= stop_pulse_id) { - - writer.write(metadata.get(), data.get()); - current_pulse_id++; - - i_write++; - - auto end_time = chrono::steady_clock::now(); - - // TODO: Some poor statistics. - - auto ms_duration = chrono::duration_cast( - end_time-start_time).count(); - total_ms += ms_duration; - - if (ms_duration > max_ms) { - max_ms = ms_duration; - } - - if (ms_duration < min_ms) { - min_ms = ms_duration; - } - - if (i_write==100) { - cout << "avg_write_ms " << total_ms / 100; - cout << " min_write_ms " << min_ms; - cout << " max_write_ms " << max_ms << endl; - - i_write = 0; - total_ms = 0; - max_ms = 0; - min_ms = 0; - } - - start_time = chrono::steady_clock::now(); - } - - writer.close_file(); - - return 0; -} diff --git a/core-buffer/test/test_BufferBinaryWriter.cpp b/core-buffer/test/test_BufferBinaryWriter.cpp deleted file mode 100644 index 176eb55..0000000 --- a/core-buffer/test/test_BufferBinaryWriter.cpp +++ /dev/null @@ -1,86 +0,0 @@ -#include -#include "BufferUtils.hpp" -#include -#include "gtest/gtest.h" - -TEST(BinaryWriter, basic_interaction) -{ - auto root_folder = "."; - auto device_name = "test_device"; - uint64_t pulse_id = 5; - - BufferBinaryWriter writer(device_name, root_folder); - - BufferBinaryFormat frame_data; - frame_data.pulse_id = 1; - frame_data.frame_id = 2; - frame_data.daq_rec = 3; - frame_data.n_recv_packets = 4; - - ASSERT_EQ(frame_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); - - writer.write(5, &frame_data); - - auto output_filename = - BufferUtils::get_filename(root_folder, device_name, pulse_id); - - auto read_fd = open(output_filename.c_str(), O_RDONLY); - ASSERT_NE(read_fd, -1); - - auto file_frame_index = BufferUtils::get_file_frame_index(pulse_id); - - BufferBinaryFormat read_data; - - ::lseek(read_fd, file_frame_index * sizeof(BufferBinaryFormat), SEEK_SET); - ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); - - ASSERT_EQ(frame_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); - ASSERT_EQ(frame_data.FORMAT_MARKER, read_data.FORMAT_MARKER); - ASSERT_EQ(frame_data.pulse_id, read_data.pulse_id); - ASSERT_EQ(frame_data.frame_id, read_data.frame_id); - ASSERT_EQ(frame_data.daq_rec, read_data.daq_rec); - ASSERT_EQ(frame_data.n_recv_packets, read_data.n_recv_packets); -} - -TEST(BinaryWriter, test_format_marker) -{ - auto root_folder = "."; - auto device_name = "test_device"; - uint64_t pulse_id = 5; - - BufferBinaryWriter writer(device_name, root_folder); - - BufferBinaryFormat frame_data; - frame_data.pulse_id = 1; - frame_data.frame_id = 2; - frame_data.daq_rec = 3; - frame_data.n_recv_packets = 4; - - writer.write(5, &frame_data); - - auto output_filename = - BufferUtils::get_filename(root_folder, device_name, pulse_id); - - auto read_fd = open(output_filename.c_str(), O_RDONLY); - ASSERT_NE(read_fd, -1); - - auto file_frame_index = BufferUtils::get_file_frame_index(pulse_id); - - BufferBinaryFormat read_data; - - // One frame before should be empty. - ::lseek(read_fd, (file_frame_index-1) * sizeof(BufferBinaryFormat), SEEK_SET); - ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); - ASSERT_NE(read_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); - - // One frame after should be empty as well. - ::lseek(read_fd, (file_frame_index+1) * sizeof(BufferBinaryFormat), SEEK_SET); - ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); - ASSERT_NE(read_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); - - // But this frame should be here. - ::lseek(read_fd, (file_frame_index) * sizeof(BufferBinaryFormat), SEEK_SET); - ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); - ASSERT_EQ(read_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); - -} \ No newline at end of file diff --git a/core-buffer/test/test_BufferH5Writer.cpp b/core-buffer/test/test_BufferH5Writer.cpp deleted file mode 100644 index 816b41c..0000000 --- a/core-buffer/test/test_BufferH5Writer.cpp +++ /dev/null @@ -1,130 +0,0 @@ -#include "BufferH5Writer.hpp" -#include "gtest/gtest.h" - -using namespace core_buffer; - -TEST(BufferH5Writer, basic_interaction) -{ - auto root_folder = "."; - auto device_name = "fast_device"; - size_t pulse_id = 1; - - auto buffer = make_unique(JUNGFRAU_DATA_BYTES_PER_FRAME); - - ModuleFrame metadata; - metadata.pulse_id = 1; - metadata.frame_index = 2; - metadata.daq_rec = 3; - metadata.n_received_packets = 128; - - BufferH5Writer writer(root_folder, device_name); - writer.set_pulse_id(pulse_id); - writer.write(&metadata, buffer.get()); - writer.close_file(); - - auto filename = BufferUtils::get_filename( - root_folder, device_name, pulse_id); - - auto file_frame_index = BufferUtils::get_file_frame_index(pulse_id); - - H5::H5File input_file(filename, H5F_ACC_RDONLY); - - auto image_dataset = input_file.openDataSet("image"); - size_t image_buffer_n_bytes = JUNGFRAU_DATA_BYTES_PER_FRAME * FILE_MOD; - auto image_buffer = make_unique(image_buffer_n_bytes); - image_dataset.read(image_buffer.get(), H5::PredType::NATIVE_UINT16); - - auto metadata_dataset = input_file.openDataSet("metadata"); - auto metadata_buffer = make_unique(FILE_MOD); - metadata_dataset.read(metadata_buffer.get(), H5::PredType::NATIVE_UINT64); - - EXPECT_EQ(metadata_buffer[file_frame_index].pulse_id, 1); - EXPECT_EQ(metadata_buffer[file_frame_index].frame_index, 2); - EXPECT_EQ(metadata_buffer[file_frame_index].daq_rec, 3); - EXPECT_EQ(metadata_buffer[file_frame_index].n_received_packets, 128); -} -// -//TEST(BufferH5Writer, SWMR) -//{ -// auto root_folder = "."; -// auto device_name = "fast_device"; -// size_t pulse_id = 0; -// -// auto i_write_buffer = make_unique(JUNGFRAU_DATA_BYTES_PER_FRAME); -// size_t image_buffer_n_bytes = JUNGFRAU_DATA_BYTES_PER_FRAME * FILE_MOD; -// auto i_read_buffer = make_unique(image_buffer_n_bytes); -// -// ModuleFrame m_write_buffer = {1, 2, 3, 4, 5}; -// auto m_read_buffer = make_unique(FILE_MOD); -// -// for (size_t i=0; i -#include -#include "gtest/gtest.h" -#include "BufferUdpReceiver.hpp" -#include "mock/udp.hpp" - -#include -#include -#include - -using namespace std; - -TEST(BufferUdpReceiver, simple_recv) -{ - auto n_packets = JUNGFRAU_N_PACKETS_PER_FRAME; - int source_id = 1234; - int n_frames = 5; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - BufferUdpReceiver udp_receiver(udp_port, source_id); - - auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_received_packets, n_packets); - ASSERT_EQ(metadata.module_id, source_id); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_middle_packet) -{ - auto n_packets = JUNGFRAU_N_PACKETS_PER_FRAME; - int source_id = 1234; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - BufferUdpReceiver udp_receiver(udp_port, source_id); - - auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_received_packets, n_packets-1); - ASSERT_EQ(metadata.module_id, source_id); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_first_packet) -{ - auto n_packets = JUNGFRAU_N_PACKETS_PER_FRAME; - int source_id = 1234; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - BufferUdpReceiver udp_receiver(udp_port, source_id); - - auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); - - for (int i_frame=0; i_frame < n_frames; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_received_packets, n_packets-1); - ASSERT_EQ(metadata.module_id, source_id); - } - - ::close(send_socket_fd); -} - -TEST(BufferUdpReceiver, missing_last_packet) -{ - auto n_packets = JUNGFRAU_N_PACKETS_PER_FRAME; - int source_id = 1234; - int n_frames = 3; - - uint16_t udp_port = MOCK_UDP_PORT; - auto server_address = get_server_address(udp_port); - auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - ASSERT_TRUE(send_socket_fd >= 0); - - BufferUdpReceiver udp_receiver(udp_port, source_id); - - auto handle = async(launch::async, [&](){ - for (int i_frame=0; i_frame < n_frames; i_frame++){ - for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); - - // n_frames -1 because the last frame is not complete. - for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { - auto pulse_id = udp_receiver.get_frame_from_udp( - metadata, frame_buffer.get()); - - ASSERT_EQ(i_frame + 1, pulse_id); - ASSERT_EQ(metadata.frame_index, i_frame + 1000); - ASSERT_EQ(metadata.daq_rec, i_frame + 10000); - // -1 because we skipped a packet. - ASSERT_EQ(metadata.n_received_packets, n_packets-1); - ASSERT_EQ(metadata.module_id, source_id); - } - - ::close(send_socket_fd); -} \ No newline at end of file diff --git a/core-buffer/test/test_FastQueue.cpp b/core-buffer/test/test_FastQueue.cpp deleted file mode 100644 index 14d360f..0000000 --- a/core-buffer/test/test_FastQueue.cpp +++ /dev/null @@ -1,147 +0,0 @@ -#include "FastQueue.hpp" -#include "gtest/gtest.h" - -using namespace core_buffer; - -TEST(FastQueue, basic_interaction) -{ - size_t n_slots = 5; - size_t slot_data_n_bytes = MODULE_N_BYTES * 2; - FastQueue queue(slot_data_n_bytes, n_slots); - int slot_id; - - // The queue at the beginning should be empty. - ASSERT_EQ(queue.read(), -1); - // Cannot commit a slot until you reserve it. - ASSERT_THROW(queue.commit(), runtime_error); - // Cannot release a slot until its ready. - ASSERT_THROW(queue.release(), runtime_error); - - // Reserve a slot. - slot_id = queue.reserve(); - ASSERT_NE(slot_id, -1); - // But you cannot reserve 2 slots at once. - ASSERT_EQ(queue.reserve(), -1); - // And cannot read this slot until its committed. - ASSERT_EQ(queue.read(), -1); - - auto detector_frame = queue.get_metadata_buffer(slot_id); - char* meta_ptr = (char*) detector_frame; - char* data_ptr = (char*) queue.get_data_buffer(slot_id); - - queue.commit(); - - slot_id = queue.read(); - // Once the slot is committed we should be able to read it. - ASSERT_NE(slot_id, -1); - // You can read the same slot multiple times. - ASSERT_NE(queue.read(), -1); - // The 2 buffers should match the committed slot. - ASSERT_EQ(meta_ptr, (char*)(queue.get_metadata_buffer(slot_id))); - ASSERT_EQ(data_ptr, (char*)(queue.get_data_buffer(slot_id))); - - queue.release(); -} - -TEST(FastQueue, queue_full) -{ - size_t n_slots = 5; - size_t slot_data_n_bytes = MODULE_N_BYTES * 2; - FastQueue queue(slot_data_n_bytes, n_slots); - - // There is nothing to be read in the queue. - ASSERT_EQ(queue.read(), -1); - - for (size_t i=0; i queue(slot_data_n_bytes, n_slots); - - int write_slot_id = queue.reserve(); - - auto w_metadata = queue.get_metadata_buffer(write_slot_id); - w_metadata->pulse_id = 1; - w_metadata->frame_index = 2; - w_metadata->daq_rec = 3; - w_metadata->is_good_frame = 4; - - auto w_data = (uint16_t*)(queue.get_data_buffer(write_slot_id)); - for (size_t i=0; ipulse_id, - r_metadata->pulse_id); - EXPECT_EQ(w_metadata->frame_index, - r_metadata->frame_index); - EXPECT_EQ(w_metadata->daq_rec, - r_metadata->daq_rec); - EXPECT_EQ(w_metadata->is_good_frame, - r_metadata->is_good_frame); - - auto r_data = (uint16_t*)(queue.get_data_buffer(read_slot_id)); - for (size_t i=0; i queue( - n_modules * MODULE_N_BYTES, - WRITER_FASTQUEUE_N_SLOTS); - - ModuleFrame frame; - - auto slot_id = queue.reserve(); - auto metadata = queue.get_metadata_buffer(slot_id); - - for (int i_module=0; i_modulemodule[i_module]; - - frame.pulse_id = i_module; - frame.frame_index = i_module; - frame.daq_rec = i_module; - frame.n_received_packets = i_module; - frame.module_id = i_module; - - ModuleFrame* p_metadata = &module_metadata; - - memcpy(p_metadata, &frame, sizeof(ModuleFrame)); - } - - for (int i_module=0; i_modulemodule[i_module]; - - ASSERT_EQ(module_metadata.pulse_id, i_module); - ASSERT_EQ(module_metadata.frame_index, i_module); - ASSERT_EQ(module_metadata.daq_rec, i_module); - ASSERT_EQ(module_metadata.n_received_packets, i_module); - ASSERT_EQ(module_metadata.module_id, i_module); - } -} \ No newline at end of file diff --git a/core-buffer/test/test_LiveRecvModule.cpp b/core-buffer/test/test_LiveRecvModule.cpp deleted file mode 100644 index 31c35d4..0000000 --- a/core-buffer/test/test_LiveRecvModule.cpp +++ /dev/null @@ -1,87 +0,0 @@ -#include -#include "LiveRecvModule.hpp" -#include "gtest/gtest.h" -#include "buffer_config.hpp" -#include - -using namespace std; -using namespace core_buffer; - -TEST(LiveRecvModule, transfer_test) { - // TODO: Make this test work again. -// auto ctx = zmq_ctx_new(); -// -// size_t n_modules = 32; -// size_t n_slots = 5; -// FastQueue queue(MODULE_N_BYTES * n_modules, n_slots); -// -// void *sockets[n_modules]; -// for (size_t i = 0; i < n_modules; i++) { -// sockets[i] = zmq_socket(ctx, ZMQ_PUB); -// -// int linger = 0; -// if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, -// sizeof(linger)) != 0) { -// throw runtime_error(zmq_strerror(errno)); -// } -// -// stringstream ipc_addr; -// ipc_addr << BUFFER_LIVE_IPC_URL << i; -// const auto ipc = ipc_addr.str(); -// -// if (zmq_bind(sockets[i], ipc.c_str()) != 0) { -// throw runtime_error(zmq_strerror(errno)); -// } -// } -// -// LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); -// -// // Nothing should be committed, queue, should be empty. -// ASSERT_EQ(queue.read(), -1); -// -// ModuleFrame metadata; -// auto data = make_unique(MODULE_N_BYTES); -// -// for (size_t i = 0; i < n_modules; i++) { -// metadata.pulse_id = 1; -// metadata.frame_index = 2; -// metadata.daq_rec = 3; -// metadata.n_received_packets = 4; -// metadata.module_id = i; -// -// zmq_send(sockets[i], &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); -// zmq_send(sockets[i], data.get(), MODULE_N_BYTES, 0); -// } -// -// this_thread::sleep_for(chrono::milliseconds(100)); -// -// auto slot_id = queue.read(); -// // We should have the first Detector frame in the buffer. -// //ASSERT_NE(slot_id, -1); -// -// auto recv_stopped = async(launch::async, [&](){ -// recv_module.stop(); -// }); -// -// this_thread::sleep_for(chrono::milliseconds(100)); -// -// for (size_t i = 0; i < n_modules; i++) { -// metadata.pulse_id = 1; -// metadata.frame_index = 2; -// metadata.daq_rec = 3; -// metadata.n_received_packets = 4; -// metadata.module_id = i; -// -// zmq_send(sockets[i], &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); -// zmq_send(sockets[i], data.get(), MODULE_N_BYTES, 0); -// } -// -// recv_stopped.wait(); -// -// for (size_t i = 0; i < n_modules; i++) { -// zmq_close(sockets[i]); -// } -// -// zmq_ctx_destroy(ctx); -// cout << "We are finished" << endl; -} \ No newline at end of file diff --git a/core-buffer/test/test_ReplayH5Reader.cpp b/core-buffer/test/test_ReplayH5Reader.cpp deleted file mode 100644 index 9f86bb2..0000000 --- a/core-buffer/test/test_ReplayH5Reader.cpp +++ /dev/null @@ -1,110 +0,0 @@ -#include "ReplayH5Reader.hpp" -#include "BufferH5Writer.hpp" -#include "gtest/gtest.h" - -using namespace core_buffer; - -TEST(ReplayH5Reader, basic_interaction) -{ - auto root_folder = "."; - auto device_name = "fast_device"; - - // This 2 must be compatible by design. - BufferH5Writer writer(root_folder, device_name); - ReplayH5Reader reader(root_folder, device_name); - - size_t pulse_id = 65; - - ModuleFrame w_metadata; - ModuleFrame r_metadata; - auto w_frame_buffer = make_unique(MODULE_N_PIXELS); - auto r_frame_buffer = make_unique(MODULE_N_PIXELS); - - // Setup test values. - w_metadata.pulse_id = pulse_id; - w_metadata.frame_index = 2; - w_metadata.daq_rec = 3; - w_metadata.n_received_packets = 128; - w_metadata.module_id = 4; - - for (size_t i=0; i(MODULE_N_PIXELS); - auto r_frame_buffer = make_unique(MODULE_N_PIXELS); - - // Setup test values. - w_metadata.pulse_id = pulse_id; - w_metadata.frame_index = 2; - w_metadata.daq_rec = 3; - w_metadata.n_received_packets = 128; - w_metadata.module_id = 4; - - for (size_t i=0; i -#include -#include "gtest/gtest.h" -#include "UdpReceiver.hpp" -#include "mock/udp.hpp" - -#include -#include - -using namespace std; - -TEST(UdpReceiver, simple_recv) -{ - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - UdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - send_udp_buffer.packetnum = 91; - send_udp_buffer.framenum = 92; - send_udp_buffer.bunchid = 93; - send_udp_buffer.debug = 94; - - auto server_address = get_server_address(udp_port); - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - this_thread::sleep_for(chrono::milliseconds(100)); - - jungfrau_packet recv_udp_buffer; - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum); - EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum); - EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid); - EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug); - - ASSERT_FALSE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - udp_receiver.disconnect(); - ::close(send_socket_fd); -} - -TEST(UdpReceiver, false_recv) -{ - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - UdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - jungfrau_packet recv_udp_buffer; - - auto server_address = get_server_address(udp_port); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET-1, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_FALSE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET-1, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - ASSERT_TRUE(udp_receiver.receive( - &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1)); - - udp_receiver.disconnect(); - ::close(send_socket_fd); -} - -TEST(UdpReceiver, receive_many) -{ - auto n_msg_buffer = JUNGFRAU_N_PACKETS_PER_FRAME; - jungfrau_packet recv_buffer[n_msg_buffer]; - iovec recv_buff_ptr[n_msg_buffer]; - struct mmsghdr msgs[n_msg_buffer]; - struct sockaddr_in sockFrom[n_msg_buffer]; - - for (int i = 0; i < n_msg_buffer; i++) { - recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]); - recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); - - msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; - msgs[i].msg_hdr.msg_iovlen = 1; - msgs[i].msg_hdr.msg_name = &sockFrom[i]; - msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); - } - - uint16_t udp_port = MOCK_UDP_PORT; - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - UdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - jungfrau_packet send_udp_buffer; - - auto server_address = get_server_address(udp_port); - - send_udp_buffer.bunchid = 0; - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - send_udp_buffer.bunchid = 1; - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - this_thread::sleep_for(chrono::milliseconds(10)); - - auto n_msgs = udp_receiver.receive_many(msgs, JUNGFRAU_N_PACKETS_PER_FRAME); - ASSERT_EQ(n_msgs, 2); - - for (size_t i=0;i(n_modules*MODULE_N_BYTES); - auto metadata = make_shared(); - - // Needed by writer. - metadata->data_n_bytes[0] = 500; - metadata->n_pulses_in_buffer = 1; - - WriterH5Writer writer("ignore.h5", n_frames, n_modules); - writer.write(metadata.get(), data.get()); - writer.close_file(); -} - -TEST(WriterH5Writer, test_compression) -{ -// size_t n_modules = 2; -// size_t n_frames = 2; -// -// auto comp_buffer_size = bshuf_compress_lz4_bound( -// MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); -// -// auto f_raw_buffer = make_unique(MODULE_N_PIXELS); -// auto f_comp_buffer = make_unique(comp_buffer_size); -// -// auto i_comp_buffer = make_unique( -// (comp_buffer_size * n_modules) + BSHUF_LZ4_HEADER_BYTES); -// auto i_raw_buffer = make_unique( -// MODULE_N_PIXELS * n_modules * n_frames); -// -// bshuf_write_uint64_BE(&i_comp_buffer[0], -// MODULE_N_BYTES * n_modules); -// bshuf_write_uint32_BE(&i_comp_buffer[8], -// MODULE_N_PIXELS * PIXEL_N_BYTES); -// -// size_t total_compressed_size = BSHUF_LZ4_HEADER_BYTES; -// for (int i_module=0; i_module(); -// metadata->data_n_bytes = total_compressed_size; -// -// metadata->is_good_frame = 1; -// metadata->frame_index = 3; -// metadata->pulse_id = 3; -// metadata->daq_rec = 3; -// -// auto result = bshuf_decompress_lz4( -// &i_comp_buffer[12], &i_raw_buffer[0], -// MODULE_N_PIXELS*n_modules, PIXEL_N_BYTES, MODULE_N_PIXELS); -// -// WriterH5Writer writer("ignore.h5", n_frames, n_modules); -// writer.write(metadata.get(), &i_comp_buffer[0]); -// writer.close_file(); -// -// H5::H5File reader("ignore.h5", H5F_ACC_RDONLY); -// auto image_dataset = reader.openDataSet("image"); -// image_dataset.read(&i_raw_buffer[0], H5::PredType::NATIVE_UINT16); -// -// for (int i_module=0; i_module -#include "WriterZmqReceiver.hpp" -#include "bitshuffle/bitshuffle.h" -#include "zmq.h" - -TEST(WriterZmqReceiver, basic_test) -{ - size_t n_modules = 4; - uint64_t pulse_id = 12345; - - auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1); - - void* sockets[n_modules]; - for (size_t i = 0; i < n_modules; i++) { - sockets[i] = zmq_socket(ctx, ZMQ_PUSH); - - int linger = 0; - if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << REPLAY_STREAM_IPC_URL << i; - const auto ipc = ipc_addr.str(); - - if (zmq_bind(sockets[i], ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - } - this_thread::sleep_for(chrono::milliseconds(100)); - - WriterZmqReceiver receiver(ctx, REPLAY_STREAM_IPC_URL, n_modules); - this_thread::sleep_for(chrono::milliseconds(100)); - - size_t compressed_frame_size = 5000; - auto frame_buffer = make_unique(compressed_frame_size); - - ImageMetadata image_metadata; - auto compress_size = bshuf_compress_lz4_bound( - MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); - auto image_buffer = make_unique(compress_size * n_modules); - - for (size_t i = 0; i < n_modules; i++) { - - StreamModuleFrame frame_metadata; - frame_metadata.metadata.pulse_id = pulse_id; - frame_metadata.metadata.frame_index = pulse_id + 100; - frame_metadata.metadata.n_received_packets = 128; - frame_metadata.metadata.daq_rec = 4; - - frame_metadata.is_frame_present = 1; - frame_metadata.data_n_bytes = compressed_frame_size; - - zmq_send(sockets[i], - &frame_metadata, - sizeof(StreamModuleFrame), - ZMQ_SNDMORE); - - zmq_send(sockets[i], - (char*)(frame_buffer.get()), - compressed_frame_size, - 0); - } - - receiver.get_next_image(pulse_id, &image_metadata, image_buffer.get()); - EXPECT_EQ(pulse_id, image_metadata.pulse_id); - EXPECT_EQ(image_metadata.is_good_frame, 1); - EXPECT_EQ(image_metadata.daq_rec, 4); - EXPECT_EQ(image_metadata.data_n_bytes, - 5000*n_modules); -// 5000*n_modules+BSHUF_LZ4_HEADER_BYTES); -} \ No newline at end of file diff --git a/core-buffer/test/test_bitshuffle.cpp b/core-buffer/test/test_bitshuffle.cpp index 0de7823..b8458fa 100644 --- a/core-buffer/test/test_bitshuffle.cpp +++ b/core-buffer/test/test_bitshuffle.cpp @@ -6,6 +6,7 @@ extern "C" { } using namespace std; +using namespace core_buffer; TEST(bitshuffle, simple_compression) { diff --git a/core-buffer/include/BufferH5Writer.hpp b/sf-buffer/include/BufferH5Writer.hpp similarity index 100% rename from core-buffer/include/BufferH5Writer.hpp rename to sf-buffer/include/BufferH5Writer.hpp diff --git a/core-buffer/test/mock/udp.hpp b/sf-buffer/test/mock/udp.hpp similarity index 100% rename from core-buffer/test/mock/udp.hpp rename to sf-buffer/test/mock/udp.hpp