diff --git a/sf-writer/CMakeLists.txt b/sf-writer/CMakeLists.txt new file mode 100644 index 0000000..a66a5b1 --- /dev/null +++ b/sf-writer/CMakeLists.txt @@ -0,0 +1,21 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(sf-writer-lib STATIC ${SOURCES}) +target_include_directories(sf-writer-lib PUBLIC include/) +target_link_libraries(sf-writer-lib + external + core-buffer-lib) + +add_executable(sf-writer src/main.cpp) +set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) +target_link_libraries(sf-writer + sf-writer-lib + zmq + hdf5 + hdf5_cpp + pthread + ) + +enable_testing() +add_subdirectory(test/) \ No newline at end of file diff --git a/sf-writer/include/BufferedFastQueue.hpp b/sf-writer/include/BufferedFastQueue.hpp new file mode 100644 index 0000000..b690f3e --- /dev/null +++ b/sf-writer/include/BufferedFastQueue.hpp @@ -0,0 +1,32 @@ +#ifndef SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP +#define SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP + +#include "FastQueue.hpp" +#include "WriterH5Writer.hpp" + + +class BufferedFastQueue { + FastQueue& queue_; + const size_t buffer_n_pulses_; + const size_t n_modules_; + + ImageMetadataBuffer* queue_meta_buffer_ = nullptr; + char* queue_data_buffer_ = nullptr; + int current_slot_id_ = -1; + + ImageMetadata image_metadata_; + +public: + BufferedFastQueue(FastQueue& queue, + const size_t buffer_n_pulses, + const size_t n_modules); + + ImageMetadata* get_metadata_buffer(); + char* get_data_buffer(); + + void commit(); + void finalize(); +}; + + +#endif //SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP diff --git a/sf-writer/include/WriterH5Writer.hpp b/sf-writer/include/WriterH5Writer.hpp new file mode 100644 index 0000000..d1983d0 --- /dev/null +++ b/sf-writer/include/WriterH5Writer.hpp @@ -0,0 +1,35 @@ +#ifndef SFWRITER_HPP +#define SFWRITER_HPP + +#include +#include +#include +#include "buffer_config.hpp" +#include "formats.hpp" + +class WriterH5Writer { + + const size_t n_frames_; + const size_t n_modules_; + size_t current_write_index_; + + H5::H5File file_; + + H5::DataSet image_dataset_; + H5::DataSet pulse_id_dataset_; + H5::DataSet frame_index_dataset_; + H5::DataSet daq_rec_dataset_; + H5::DataSet is_good_frame_dataset_; + + +public: + WriterH5Writer(const std::string& output_file, + const size_t n_frames, + const size_t n_modules); + ~WriterH5Writer(); + void write(const ImageMetadataBuffer* metadata, const char* data); + void close_file(); +}; + + +#endif //SFWRITER_HPP diff --git a/sf-writer/include/WriterZmqReceiver.hpp b/sf-writer/include/WriterZmqReceiver.hpp new file mode 100644 index 0000000..cde1c8b --- /dev/null +++ b/sf-writer/include/WriterZmqReceiver.hpp @@ -0,0 +1,31 @@ +#ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP +#define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP + +#include +#include "WriterH5Writer.hpp" +#include + + +class WriterZmqReceiver { + + const size_t n_modules_; + std::vector sockets_; + + StreamModuleFrame frame_metadata; + +public: + WriterZmqReceiver( + void *ctx, + const std::string& ipc_prefix, + const size_t n_modules); + + virtual ~WriterZmqReceiver(); + + void get_next_image( + const uint64_t pulse_id, + ImageMetadata* image_metadata, + char* image_buffer); +}; + + +#endif //SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP diff --git a/sf-writer/src/BufferedFastQueue.cpp b/sf-writer/src/BufferedFastQueue.cpp new file mode 100644 index 0000000..01a758e --- /dev/null +++ b/sf-writer/src/BufferedFastQueue.cpp @@ -0,0 +1,68 @@ +#include "BufferedFastQueue.hpp" +#include + +using namespace std; +using namespace core_buffer; + +BufferedFastQueue::BufferedFastQueue( + FastQueue& queue, + const size_t buffer_n_pulses, + const size_t n_modules) : + buffer_n_pulses_(buffer_n_pulses), + queue_(queue), + n_modules_(n_modules) +{ + while ((current_slot_id_ = queue_.reserve()) == -1){ + this_thread::sleep_for( + chrono::milliseconds(RB_READ_RETRY_INTERVAL_MS)); + } + + queue_meta_buffer_ = queue_.get_metadata_buffer(current_slot_id_); + queue_meta_buffer_->n_pulses_in_buffer = 0; + queue_data_buffer_ = queue_.get_data_buffer(current_slot_id_); +} + +ImageMetadata* BufferedFastQueue::get_metadata_buffer() +{ + return &image_metadata_; +} + +char* BufferedFastQueue::get_data_buffer() +{ + auto index = queue_meta_buffer_->n_pulses_in_buffer; + auto image_size = MODULE_N_BYTES * n_modules_; + + return queue_data_buffer_ + (index * image_size); +} + +void BufferedFastQueue::commit() +{ + auto index = queue_meta_buffer_->n_pulses_in_buffer; + + queue_meta_buffer_->pulse_id[index] = image_metadata_.pulse_id; + queue_meta_buffer_->frame_index[index] = image_metadata_.frame_index; + queue_meta_buffer_->daq_rec[index] = image_metadata_.daq_rec; + queue_meta_buffer_->is_good_frame[index] = image_metadata_.is_good_frame; + queue_meta_buffer_->data_n_bytes[index] = image_metadata_.data_n_bytes; + + queue_meta_buffer_->n_pulses_in_buffer++; + + if (queue_meta_buffer_->n_pulses_in_buffer == buffer_n_pulses_) { + queue_.commit(); + + while ((current_slot_id_ = queue_.reserve()) == -1){ + this_thread::sleep_for( + chrono::milliseconds(RB_READ_RETRY_INTERVAL_MS)); + } + + queue_meta_buffer_ = queue_.get_metadata_buffer(current_slot_id_); + queue_meta_buffer_->n_pulses_in_buffer = 0; + queue_data_buffer_ = queue_.get_data_buffer(current_slot_id_); + } +} + +void BufferedFastQueue::finalize() { + if (queue_meta_buffer_->n_pulses_in_buffer > 0) { + queue_.commit(); + } +} \ No newline at end of file diff --git a/sf-writer/src/WriterH5Writer.cpp b/sf-writer/src/WriterH5Writer.cpp new file mode 100644 index 0000000..750d6a6 --- /dev/null +++ b/sf-writer/src/WriterH5Writer.cpp @@ -0,0 +1,162 @@ +#include "WriterH5Writer.hpp" +#include + + +//extern "C" +//{ +// #include "H5DOpublic.h" +// #include +//} + +using namespace std; +using namespace core_buffer; + +WriterH5Writer::WriterH5Writer( + const string& output_file, + const size_t n_frames, + const size_t n_modules) : + n_frames_(n_frames), + n_modules_(n_modules), + current_write_index_(0) +{ + +// bshuf_register_h5filter(); + + file_ = H5::H5File(output_file, H5F_ACC_TRUNC); + + hsize_t image_dataset_dims[3] = + {n_frames_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; + + H5::DataSpace image_dataspace(3, image_dataset_dims); + + hsize_t image_dataset_chunking[3] = + {1, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE}; + H5::DSetCreatPropList image_dataset_properties; + image_dataset_properties.setChunk(3, image_dataset_chunking); + +// // block_size, compression type +// uint compression_prop[] = +// {MODULE_N_PIXELS, //block size +// BSHUF_H5_COMPRESS_LZ4}; // Compression type +// +// H5Pset_filter(image_dataset_properties.getId(), +// BSHUF_H5FILTER, +// H5Z_FLAG_MANDATORY, +// 2, +// &(compression_prop[0])); + + image_dataset_ = file_.createDataSet( + "image", + H5::PredType::NATIVE_UINT16, + image_dataspace, + image_dataset_properties); + + hsize_t metadata_dataset_dims[] = {n_frames_, 1}; + H5::DataSpace metadata_dataspace(2, metadata_dataset_dims); + + // Chunk cannot be larger than n_frames. + auto metadata_chunk_size = WRITER_METADATA_CHUNK_N_IMAGES; + if (n_frames < metadata_chunk_size) { + metadata_chunk_size = n_frames; + } + + hsize_t metadata_dataset_chunking[] = {metadata_chunk_size, 1}; + H5::DSetCreatPropList metadata_dataset_properties; + metadata_dataset_properties.setChunk(2, metadata_dataset_chunking); + + pulse_id_dataset_ = file_.createDataSet( + "pulse_id", + H5::PredType::NATIVE_UINT64, + metadata_dataspace, + metadata_dataset_properties); + + frame_index_dataset_ = file_.createDataSet( + "frame_index", + H5::PredType::NATIVE_UINT64, + metadata_dataspace, + metadata_dataset_properties); + + daq_rec_dataset_ = file_.createDataSet( + "daq_rec", + H5::PredType::NATIVE_UINT32, + metadata_dataspace, + metadata_dataset_properties); + + is_good_frame_dataset_ = file_.createDataSet( + "is_good_frame", + H5::PredType::NATIVE_UINT8, + metadata_dataspace, + metadata_dataset_properties); + +} + +WriterH5Writer::~WriterH5Writer() +{ + close_file(); +} + +void WriterH5Writer::close_file() +{ + image_dataset_.close(); + pulse_id_dataset_.close(); + frame_index_dataset_.close(); + daq_rec_dataset_.close(); + is_good_frame_dataset_.close(); + + file_.close(); +} + +void WriterH5Writer::write( + const ImageMetadataBuffer* metadata, const char* data) +{ + auto n_images_in_buffer = metadata->n_pulses_in_buffer; + + hsize_t b_i_dims[3] = { + n_images_in_buffer, + MODULE_Y_SIZE*n_modules_, + MODULE_X_SIZE}; + H5::DataSpace b_i_space(3, b_i_dims); + + hsize_t f_i_dims[3] = {n_frames_, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + H5::DataSpace f_i_space(3, f_i_dims); + + hsize_t i_count[] = {n_images_in_buffer, + MODULE_Y_SIZE*n_modules_, + MODULE_X_SIZE}; + hsize_t i_start[] = {current_write_index_, 0, 0}; + f_i_space.selectHyperslab(H5S_SELECT_SET, i_count, i_start); + + image_dataset_.write( + data, H5::PredType::NATIVE_UINT16, + b_i_space, f_i_space); + + hsize_t b_m_dims[2] = {n_images_in_buffer, 1}; + H5::DataSpace b_m_space (2, b_m_dims); + + hsize_t f_m_dims[] = {n_frames_, 1}; + H5::DataSpace f_m_space(2, f_m_dims); + + hsize_t meta_count[] = {n_images_in_buffer, 1}; + hsize_t meta_start[] = {current_write_index_, 0}; + f_m_space.selectHyperslab(H5S_SELECT_SET, meta_count, meta_start); + + pulse_id_dataset_.write( + &(metadata->pulse_id), H5::PredType::NATIVE_UINT64, + b_m_space, f_m_space); + + frame_index_dataset_.write( + &(metadata->frame_index), H5::PredType::NATIVE_UINT64, + b_m_space, f_m_space); + + daq_rec_dataset_.write( + &(metadata->daq_rec), H5::PredType::NATIVE_UINT32, + b_m_space, f_m_space); + + is_good_frame_dataset_.write( + &(metadata->is_good_frame), H5::PredType::NATIVE_UINT8, + b_m_space, f_m_space); + + current_write_index_++; +} diff --git a/sf-writer/src/WriterZmqReceiver.cpp b/sf-writer/src/WriterZmqReceiver.cpp new file mode 100644 index 0000000..23b388e --- /dev/null +++ b/sf-writer/src/WriterZmqReceiver.cpp @@ -0,0 +1,138 @@ +#include "WriterZmqReceiver.hpp" +#include "zmq.h" +#include "date.h" +#include +#include + +using namespace std; +using namespace core_buffer; + +WriterZmqReceiver::WriterZmqReceiver( + void *ctx, + const string &ipc_prefix, + const size_t n_modules) : + n_modules_(n_modules), + sockets_(n_modules) +{ + + for (size_t i = 0; i < n_modules; i++) { + sockets_[i] = zmq_socket(ctx, ZMQ_PULL); + + int rcvhwm = WRITER_RCVHWM; + if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, + sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + int linger = 0; + if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, + sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + stringstream ipc_addr; + ipc_addr << ipc_prefix << i; + const auto ipc = ipc_addr.str(); + + if (zmq_connect(sockets_[i], ipc.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + } +} + +WriterZmqReceiver::~WriterZmqReceiver() +{ + for (size_t i = 0; i < n_modules_; i++) { + zmq_close(sockets_[i]); + } +} + +void WriterZmqReceiver::get_next_image( + const uint64_t pulse_id, + ImageMetadata* image_metadata, + char* image_buffer) +{ + // Init the image metadata. + image_metadata->pulse_id = pulse_id; + image_metadata->frame_index = 0; + image_metadata->daq_rec = 0; + image_metadata->data_n_bytes = 0; + image_metadata->is_good_frame = 1; + bool image_metadata_init = false; + + size_t image_buffer_offset = 0; + + for (size_t i_module = 0; i_module < n_modules_; i_module++) { + + auto n_bytes_metadata = zmq_recv( + sockets_[i_module], + &frame_metadata, + sizeof(StreamModuleFrame), + 0); + + if (n_bytes_metadata != sizeof(StreamModuleFrame)) { + throw runtime_error("Wrong number of metadata bytes."); + } + + // sf_replay should always send the right pulse_id. + if (frame_metadata.metadata.pulse_id != pulse_id) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[sf_writer::receive_replay]"; + err_msg << " Read unexpected pulse_id. "; + err_msg << " Expected " << pulse_id; + err_msg << " received "; + err_msg << frame_metadata.metadata.pulse_id; + err_msg << " from i_module " << i_module << endl; + + throw runtime_error(err_msg.str()); + } + + if (!frame_metadata.is_frame_present) { + image_metadata->is_good_frame = 0; + + // Init the image metadata with the first valid frame. + } else if (!image_metadata_init) { + image_metadata_init = true; + + image_metadata->frame_index = + frame_metadata.metadata.frame_index; + image_metadata->daq_rec = + frame_metadata.metadata.daq_rec; + } + + // Once the image is not good, we don't care to re-flag it. + if (image_metadata->is_good_frame == 1) { + if (frame_metadata.metadata.frame_index != + image_metadata->frame_index) { + image_metadata->is_good_frame = 0; + } + + if (frame_metadata.metadata.daq_rec != + image_metadata->daq_rec) { + image_metadata->is_good_frame = 0; + } + + if (frame_metadata.metadata.n_received_packets != + JUNGFRAU_N_PACKETS_PER_FRAME) { + image_metadata->is_good_frame = 0; + } + } + + auto n_bytes_image = zmq_recv( + sockets_[i_module], + (image_buffer + image_buffer_offset), + frame_metadata.data_n_bytes, + 0); + + if (n_bytes_image != frame_metadata.data_n_bytes) { + throw runtime_error("Wrong number of data bytes."); + } + + image_buffer_offset += n_bytes_image; + } + + image_metadata->data_n_bytes = image_buffer_offset; +} diff --git a/sf-writer/src/main.cpp b/sf-writer/src/main.cpp new file mode 100644 index 0000000..f4ceb8f --- /dev/null +++ b/sf-writer/src/main.cpp @@ -0,0 +1,179 @@ +#include +#include +#include "buffer_config.hpp" +#include "zmq.h" +#include +#include +#include +#include +#include "WriterH5Writer.hpp" +#include +#include +#include +#include "date.h" +#include "bitshuffle/bitshuffle.h" +#include "WriterZmqReceiver.hpp" + +using namespace std; +using namespace core_buffer; + +void receive_replay( + void* ctx, + const string ipc_prefix, + const size_t n_modules, + FastQueue& queue, + const uint64_t start_pulse_id, + const uint64_t stop_pulse_id) +{ + try { + WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules); + BufferedFastQueue buffered_queue( + queue, WRITER_DATA_CACHE_N_IMAGES, n_modules); + + uint64_t current_pulse_id=start_pulse_id; + + // "<= stop_pulse_id" because we include the last pulse_id. + while(current_pulse_id<=stop_pulse_id) { + + auto image_metadata = buffered_queue.get_metadata_buffer(); + auto image_buffer = buffered_queue.get_data_buffer(); + + receiver.get_next_image( + current_pulse_id, image_metadata, image_buffer); + + if (image_metadata->pulse_id != current_pulse_id) { + throw runtime_error("Wrong pulse id from zmq receiver."); + } + + buffered_queue.commit(); + current_pulse_id++; + } + + buffered_queue.finalize(); + + } catch (const std::exception& e) { + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[sf_writer::receive_replay]"; + cout << " Stopped because of exception: " << endl; + cout << e.what() << endl; + + throw; + } +} + +int main (int argc, char *argv[]) +{ + if (argc != 4) { + cout << endl; + cout << "Usage: sf_writer "; + cout << " [output_file] [start_pulse_id] [stop_pulse_id]"; + cout << endl; + cout << "\toutput_file: Complete path to the output file." << endl; + cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; + cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; + cout << endl; + + exit(-1); + } + + string output_file = string(argv[1]); + uint64_t start_pulse_id = (uint64_t) atoll(argv[2]); + uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]); + + size_t n_modules = 32; + + FastQueue queue( + MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES, + WRITER_FASTQUEUE_N_SLOTS); + + auto ctx = zmq_ctx_new(); + zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS); + + thread replay_receive_thread(receive_replay, + ctx, REPLAY_STREAM_IPC_URL, n_modules, + ref(queue), start_pulse_id, stop_pulse_id); + + size_t n_frames = stop_pulse_id - start_pulse_id + 1; + WriterH5Writer writer(output_file, n_frames, n_modules); + + // TODO: Remove stats trash. + int stats_counter = 0; + size_t read_total_us = 0; + size_t write_total_us = 0; + size_t read_max_us = 0; + size_t write_max_us = 0; + + auto start_time = chrono::steady_clock::now(); + + auto current_pulse_id = start_pulse_id; + // "<= stop_pulse_id" because we include the last pulse_id. + while (current_pulse_id <= stop_pulse_id) { + + int slot_id; ; + while((slot_id = queue.read()) == -1) { + this_thread::sleep_for(chrono::milliseconds( + RB_READ_RETRY_INTERVAL_MS)); + } + + auto metadata = queue.get_metadata_buffer(slot_id); + auto data = queue.get_data_buffer(slot_id); + + auto read_end_time = chrono::steady_clock::now(); + auto read_us_duration = chrono::duration_cast( + read_end_time-start_time).count(); + + // Verify that all pulse_ids are correct. + for (int i=0; in_pulses_in_buffer; i++) { + if (metadata->pulse_id[i] != current_pulse_id) { + throw runtime_error("Wrong pulse id from receiver thread."); + } + + current_pulse_id++; + } + + start_time = chrono::steady_clock::now(); + + writer.write(metadata, data); + + auto write_end_time = chrono::steady_clock::now(); + auto write_us_duration = chrono::duration_cast( + write_end_time-start_time).count(); + + queue.release(); + + // TODO: Some poor statistics. + stats_counter++; + + read_total_us += read_us_duration; + read_max_us = max(read_max_us, (uint64_t)read_us_duration); + + write_total_us += write_us_duration; + write_max_us = max(write_max_us, (uint64_t)write_us_duration); + +// if (stats_counter == STATS_MODULO) { + cout << "sf_writer:read_us " << read_total_us / STATS_MODULO; + cout << " sf_writer:read_max_us " << read_max_us; + cout << " sf_writer:write_us " << write_total_us / STATS_MODULO; + cout << " sf_writer:write_max_us " << write_max_us; + + cout << endl; + + stats_counter = 0; + read_total_us = 0; + read_max_us = 0; + write_total_us = 0; + write_max_us = 0; +// } + + start_time = chrono::steady_clock::now(); + } + + writer.close_file(); + + //wait till receive thread is finished + replay_receive_thread.join(); + return 0; +} diff --git a/sf-writer/test/CMakeLists.txt b/sf-writer/test/CMakeLists.txt new file mode 100644 index 0000000..713fdd3 --- /dev/null +++ b/sf-writer/test/CMakeLists.txt @@ -0,0 +1,17 @@ +add_executable(sf-writer-tests main.cpp) + +target_link_libraries(sf-writer-tests + sf-writer-lib + hdf5 + hdf5_cpp + zmq + gtest + ) + +#add_executable(perf-sf_writer perf/perf_WriterH5Writer.cpp) +#target_link_libraries(perf-sf_writer +# core-buffer +# hdf5 +# hdf5_hl +# hdf5_cpp +# gtest) \ No newline at end of file diff --git a/sf-writer/test/main.cpp b/sf-writer/test/main.cpp new file mode 100644 index 0000000..7078be5 --- /dev/null +++ b/sf-writer/test/main.cpp @@ -0,0 +1,10 @@ +#include "gtest/gtest.h" +#include "test_WriterZmqReceiver.cpp" +#include "test_WriterH5Writer.cpp" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/sf-writer/test/perf/perf_WriterH5Writer.cpp b/sf-writer/test/perf/perf_WriterH5Writer.cpp new file mode 100644 index 0000000..1d9df1f --- /dev/null +++ b/sf-writer/test/perf/perf_WriterH5Writer.cpp @@ -0,0 +1,90 @@ +#include +#include "buffer_config.hpp" +#include "zmq.h" +#include +#include +#include +#include +#include "WriterH5Writer.hpp" + +using namespace std; +using namespace core_buffer; + + +int main (int argc, char *argv[]) +{ + if (argc != 4) { + cout << endl; + cout << "Usage: sf_writer "; + cout << " [output_file] [start_pulse_id] [stop_pulse_id]"; + cout << endl; + cout << "\toutput_file: Complete path to the output file." << endl; + cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; + cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; + cout << endl; + + exit(-1); + } + + string output_file = string(argv[1]); + uint64_t start_pulse_id = (uint64_t) atoll(argv[2]); + uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]); + + size_t n_modules = 32; + + size_t n_frames = stop_pulse_id - start_pulse_id; + WriterH5Writer writer(output_file, n_frames, n_modules); + + // TODO: Remove stats trash. + int i_write = 0; + size_t total_ms = 0; + size_t max_ms = 0; + size_t min_ms = 10000; // 10 seconds should be a safe first value. + + auto start_time = chrono::steady_clock::now(); + + auto metadata = make_shared(); + auto data = make_unique(MODULE_N_BYTES*n_modules); + + auto current_pulse_id = start_pulse_id; + while (current_pulse_id <= stop_pulse_id) { + + writer.write(metadata.get(), data.get()); + current_pulse_id++; + + i_write++; + + auto end_time = chrono::steady_clock::now(); + + // TODO: Some poor statistics. + + auto ms_duration = chrono::duration_cast( + end_time-start_time).count(); + total_ms += ms_duration; + + if (ms_duration > max_ms) { + max_ms = ms_duration; + } + + if (ms_duration < min_ms) { + min_ms = ms_duration; + } + + if (i_write==100) { + cout << "avg_write_ms " << total_ms / 100; + cout << " min_write_ms " << min_ms; + cout << " max_write_ms " << max_ms << endl; + + i_write = 0; + total_ms = 0; + max_ms = 0; + min_ms = 0; + } + + start_time = chrono::steady_clock::now(); + } + + writer.close_file(); + + return 0; +} diff --git a/sf-writer/test/test_WriterH5Writer.cpp b/sf-writer/test/test_WriterH5Writer.cpp new file mode 100644 index 0000000..fd3f91c --- /dev/null +++ b/sf-writer/test/test_WriterH5Writer.cpp @@ -0,0 +1,92 @@ + +#include "WriterH5Writer.hpp" +#include "gtest/gtest.h" +#include "bitshuffle/bitshuffle.h" + + +using namespace core_buffer; + +TEST(WriterH5Writer, basic_interaction) +{ + size_t n_modules = 2; + size_t n_frames = 5; + + auto data = make_unique(n_modules*MODULE_N_BYTES); + auto metadata = make_shared(); + + // Needed by writer. + metadata->data_n_bytes[0] = 500; + metadata->n_pulses_in_buffer = 1; + + WriterH5Writer writer("ignore.h5", n_frames, n_modules); + writer.write(metadata.get(), data.get()); + writer.close_file(); +} + +TEST(WriterH5Writer, test_compression) +{ +// size_t n_modules = 2; +// size_t n_frames = 2; +// +// auto comp_buffer_size = bshuf_compress_lz4_bound( +// MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); +// +// auto f_raw_buffer = make_unique(MODULE_N_PIXELS); +// auto f_comp_buffer = make_unique(comp_buffer_size); +// +// auto i_comp_buffer = make_unique( +// (comp_buffer_size * n_modules) + BSHUF_LZ4_HEADER_BYTES); +// auto i_raw_buffer = make_unique( +// MODULE_N_PIXELS * n_modules * n_frames); +// +// bshuf_write_uint64_BE(&i_comp_buffer[0], +// MODULE_N_BYTES * n_modules); +// bshuf_write_uint32_BE(&i_comp_buffer[8], +// MODULE_N_PIXELS * PIXEL_N_BYTES); +// +// size_t total_compressed_size = BSHUF_LZ4_HEADER_BYTES; +// for (int i_module=0; i_module(); +// metadata->data_n_bytes = total_compressed_size; +// +// metadata->is_good_frame = 1; +// metadata->frame_index = 3; +// metadata->pulse_id = 3; +// metadata->daq_rec = 3; +// +// auto result = bshuf_decompress_lz4( +// &i_comp_buffer[12], &i_raw_buffer[0], +// MODULE_N_PIXELS*n_modules, PIXEL_N_BYTES, MODULE_N_PIXELS); +// +// WriterH5Writer writer("ignore.h5", n_frames, n_modules); +// writer.write(metadata.get(), &i_comp_buffer[0]); +// writer.close_file(); +// +// H5::H5File reader("ignore.h5", H5F_ACC_RDONLY); +// auto image_dataset = reader.openDataSet("image"); +// image_dataset.read(&i_raw_buffer[0], H5::PredType::NATIVE_UINT16); +// +// for (int i_module=0; i_module +#include "WriterZmqReceiver.hpp" +#include "bitshuffle/bitshuffle.h" +#include +#include +#include "buffer_config.hpp" +#include "zmq.h" + +using namespace std; +using namespace core_buffer; + +TEST(WriterZmqReceiver, basic_test) +{ + size_t n_modules = 4; + uint64_t pulse_id = 12345; + + auto ctx = zmq_ctx_new(); + zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1); + + void* sockets[n_modules]; + for (size_t i = 0; i < n_modules; i++) { + sockets[i] = zmq_socket(ctx, ZMQ_PUSH); + + int linger = 0; + if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, + sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + stringstream ipc_addr; + ipc_addr << REPLAY_STREAM_IPC_URL << i; + const auto ipc = ipc_addr.str(); + + if (zmq_bind(sockets[i], ipc.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + } + this_thread::sleep_for(chrono::milliseconds(100)); + + WriterZmqReceiver receiver(ctx, REPLAY_STREAM_IPC_URL, n_modules); + this_thread::sleep_for(chrono::milliseconds(100)); + + size_t compressed_frame_size = 5000; + auto frame_buffer = make_unique(compressed_frame_size); + + ImageMetadata image_metadata; + auto compress_size = bshuf_compress_lz4_bound( + MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); + auto image_buffer = make_unique(compress_size * n_modules); + + for (size_t i = 0; i < n_modules; i++) { + + StreamModuleFrame frame_metadata; + frame_metadata.metadata.pulse_id = pulse_id; + frame_metadata.metadata.frame_index = pulse_id + 100; + frame_metadata.metadata.n_received_packets = 128; + frame_metadata.metadata.daq_rec = 4; + + frame_metadata.is_frame_present = 1; + frame_metadata.data_n_bytes = compressed_frame_size; + + zmq_send(sockets[i], + &frame_metadata, + sizeof(StreamModuleFrame), + ZMQ_SNDMORE); + + zmq_send(sockets[i], + (char*)(frame_buffer.get()), + compressed_frame_size, + 0); + } + + receiver.get_next_image(pulse_id, &image_metadata, image_buffer.get()); + EXPECT_EQ(pulse_id, image_metadata.pulse_id); + EXPECT_EQ(image_metadata.is_good_frame, 1); + EXPECT_EQ(image_metadata.daq_rec, 4); + EXPECT_EQ(image_metadata.data_n_bytes, + 5000*n_modules); +// 5000*n_modules+BSHUF_LZ4_HEADER_BYTES); +} \ No newline at end of file