Finished refactoring sf-writer

This commit is contained in:
2020-05-20 13:00:46 +02:00
parent 87ff1a14ed
commit c7394769ac
13 changed files with 955 additions and 0 deletions
+21
View File
@@ -0,0 +1,21 @@
file(GLOB SOURCES
src/*.cpp)
add_library(sf-writer-lib STATIC ${SOURCES})
target_include_directories(sf-writer-lib PUBLIC include/)
target_link_libraries(sf-writer-lib
external
core-buffer-lib)
add_executable(sf-writer src/main.cpp)
set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer)
target_link_libraries(sf-writer
sf-writer-lib
zmq
hdf5
hdf5_cpp
pthread
)
enable_testing()
add_subdirectory(test/)
+32
View File
@@ -0,0 +1,32 @@
#ifndef SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP
#define SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP
#include "FastQueue.hpp"
#include "WriterH5Writer.hpp"
class BufferedFastQueue {
FastQueue<ImageMetadataBuffer>& queue_;
const size_t buffer_n_pulses_;
const size_t n_modules_;
ImageMetadataBuffer* queue_meta_buffer_ = nullptr;
char* queue_data_buffer_ = nullptr;
int current_slot_id_ = -1;
ImageMetadata image_metadata_;
public:
BufferedFastQueue(FastQueue<ImageMetadataBuffer>& queue,
const size_t buffer_n_pulses,
const size_t n_modules);
ImageMetadata* get_metadata_buffer();
char* get_data_buffer();
void commit();
void finalize();
};
#endif //SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP
+35
View File
@@ -0,0 +1,35 @@
#ifndef SFWRITER_HPP
#define SFWRITER_HPP
#include <memory>
#include <string>
#include <H5Cpp.h>
#include "buffer_config.hpp"
#include "formats.hpp"
class WriterH5Writer {
const size_t n_frames_;
const size_t n_modules_;
size_t current_write_index_;
H5::H5File file_;
H5::DataSet image_dataset_;
H5::DataSet pulse_id_dataset_;
H5::DataSet frame_index_dataset_;
H5::DataSet daq_rec_dataset_;
H5::DataSet is_good_frame_dataset_;
public:
WriterH5Writer(const std::string& output_file,
const size_t n_frames,
const size_t n_modules);
~WriterH5Writer();
void write(const ImageMetadataBuffer* metadata, const char* data);
void close_file();
};
#endif //SFWRITER_HPP
+31
View File
@@ -0,0 +1,31 @@
#ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
#define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
#include <string>
#include "WriterH5Writer.hpp"
#include <vector>
class WriterZmqReceiver {
const size_t n_modules_;
std::vector<void*> sockets_;
StreamModuleFrame frame_metadata;
public:
WriterZmqReceiver(
void *ctx,
const std::string& ipc_prefix,
const size_t n_modules);
virtual ~WriterZmqReceiver();
void get_next_image(
const uint64_t pulse_id,
ImageMetadata* image_metadata,
char* image_buffer);
};
#endif //SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
+68
View File
@@ -0,0 +1,68 @@
#include "BufferedFastQueue.hpp"
#include <thread>
using namespace std;
using namespace core_buffer;
BufferedFastQueue::BufferedFastQueue(
FastQueue<ImageMetadataBuffer>& queue,
const size_t buffer_n_pulses,
const size_t n_modules) :
buffer_n_pulses_(buffer_n_pulses),
queue_(queue),
n_modules_(n_modules)
{
while ((current_slot_id_ = queue_.reserve()) == -1){
this_thread::sleep_for(
chrono::milliseconds(RB_READ_RETRY_INTERVAL_MS));
}
queue_meta_buffer_ = queue_.get_metadata_buffer(current_slot_id_);
queue_meta_buffer_->n_pulses_in_buffer = 0;
queue_data_buffer_ = queue_.get_data_buffer(current_slot_id_);
}
ImageMetadata* BufferedFastQueue::get_metadata_buffer()
{
return &image_metadata_;
}
char* BufferedFastQueue::get_data_buffer()
{
auto index = queue_meta_buffer_->n_pulses_in_buffer;
auto image_size = MODULE_N_BYTES * n_modules_;
return queue_data_buffer_ + (index * image_size);
}
void BufferedFastQueue::commit()
{
auto index = queue_meta_buffer_->n_pulses_in_buffer;
queue_meta_buffer_->pulse_id[index] = image_metadata_.pulse_id;
queue_meta_buffer_->frame_index[index] = image_metadata_.frame_index;
queue_meta_buffer_->daq_rec[index] = image_metadata_.daq_rec;
queue_meta_buffer_->is_good_frame[index] = image_metadata_.is_good_frame;
queue_meta_buffer_->data_n_bytes[index] = image_metadata_.data_n_bytes;
queue_meta_buffer_->n_pulses_in_buffer++;
if (queue_meta_buffer_->n_pulses_in_buffer == buffer_n_pulses_) {
queue_.commit();
while ((current_slot_id_ = queue_.reserve()) == -1){
this_thread::sleep_for(
chrono::milliseconds(RB_READ_RETRY_INTERVAL_MS));
}
queue_meta_buffer_ = queue_.get_metadata_buffer(current_slot_id_);
queue_meta_buffer_->n_pulses_in_buffer = 0;
queue_data_buffer_ = queue_.get_data_buffer(current_slot_id_);
}
}
void BufferedFastQueue::finalize() {
if (queue_meta_buffer_->n_pulses_in_buffer > 0) {
queue_.commit();
}
}
+162
View File
@@ -0,0 +1,162 @@
#include "WriterH5Writer.hpp"
#include <sstream>
//extern "C"
//{
// #include "H5DOpublic.h"
// #include <bitshuffle/bshuf_h5filter.h>
//}
using namespace std;
using namespace core_buffer;
WriterH5Writer::WriterH5Writer(
const string& output_file,
const size_t n_frames,
const size_t n_modules) :
n_frames_(n_frames),
n_modules_(n_modules),
current_write_index_(0)
{
// bshuf_register_h5filter();
file_ = H5::H5File(output_file, H5F_ACC_TRUNC);
hsize_t image_dataset_dims[3] =
{n_frames_, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace image_dataspace(3, image_dataset_dims);
hsize_t image_dataset_chunking[3] =
{1, n_modules * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DSetCreatPropList image_dataset_properties;
image_dataset_properties.setChunk(3, image_dataset_chunking);
// // block_size, compression type
// uint compression_prop[] =
// {MODULE_N_PIXELS, //block size
// BSHUF_H5_COMPRESS_LZ4}; // Compression type
//
// H5Pset_filter(image_dataset_properties.getId(),
// BSHUF_H5FILTER,
// H5Z_FLAG_MANDATORY,
// 2,
// &(compression_prop[0]));
image_dataset_ = file_.createDataSet(
"image",
H5::PredType::NATIVE_UINT16,
image_dataspace,
image_dataset_properties);
hsize_t metadata_dataset_dims[] = {n_frames_, 1};
H5::DataSpace metadata_dataspace(2, metadata_dataset_dims);
// Chunk cannot be larger than n_frames.
auto metadata_chunk_size = WRITER_METADATA_CHUNK_N_IMAGES;
if (n_frames < metadata_chunk_size) {
metadata_chunk_size = n_frames;
}
hsize_t metadata_dataset_chunking[] = {metadata_chunk_size, 1};
H5::DSetCreatPropList metadata_dataset_properties;
metadata_dataset_properties.setChunk(2, metadata_dataset_chunking);
pulse_id_dataset_ = file_.createDataSet(
"pulse_id",
H5::PredType::NATIVE_UINT64,
metadata_dataspace,
metadata_dataset_properties);
frame_index_dataset_ = file_.createDataSet(
"frame_index",
H5::PredType::NATIVE_UINT64,
metadata_dataspace,
metadata_dataset_properties);
daq_rec_dataset_ = file_.createDataSet(
"daq_rec",
H5::PredType::NATIVE_UINT32,
metadata_dataspace,
metadata_dataset_properties);
is_good_frame_dataset_ = file_.createDataSet(
"is_good_frame",
H5::PredType::NATIVE_UINT8,
metadata_dataspace,
metadata_dataset_properties);
}
WriterH5Writer::~WriterH5Writer()
{
close_file();
}
void WriterH5Writer::close_file()
{
image_dataset_.close();
pulse_id_dataset_.close();
frame_index_dataset_.close();
daq_rec_dataset_.close();
is_good_frame_dataset_.close();
file_.close();
}
void WriterH5Writer::write(
const ImageMetadataBuffer* metadata, const char* data)
{
auto n_images_in_buffer = metadata->n_pulses_in_buffer;
hsize_t b_i_dims[3] = {
n_images_in_buffer,
MODULE_Y_SIZE*n_modules_,
MODULE_X_SIZE};
H5::DataSpace b_i_space(3, b_i_dims);
hsize_t f_i_dims[3] = {n_frames_,
MODULE_Y_SIZE * n_modules_,
MODULE_X_SIZE};
H5::DataSpace f_i_space(3, f_i_dims);
hsize_t i_count[] = {n_images_in_buffer,
MODULE_Y_SIZE*n_modules_,
MODULE_X_SIZE};
hsize_t i_start[] = {current_write_index_, 0, 0};
f_i_space.selectHyperslab(H5S_SELECT_SET, i_count, i_start);
image_dataset_.write(
data, H5::PredType::NATIVE_UINT16,
b_i_space, f_i_space);
hsize_t b_m_dims[2] = {n_images_in_buffer, 1};
H5::DataSpace b_m_space (2, b_m_dims);
hsize_t f_m_dims[] = {n_frames_, 1};
H5::DataSpace f_m_space(2, f_m_dims);
hsize_t meta_count[] = {n_images_in_buffer, 1};
hsize_t meta_start[] = {current_write_index_, 0};
f_m_space.selectHyperslab(H5S_SELECT_SET, meta_count, meta_start);
pulse_id_dataset_.write(
&(metadata->pulse_id), H5::PredType::NATIVE_UINT64,
b_m_space, f_m_space);
frame_index_dataset_.write(
&(metadata->frame_index), H5::PredType::NATIVE_UINT64,
b_m_space, f_m_space);
daq_rec_dataset_.write(
&(metadata->daq_rec), H5::PredType::NATIVE_UINT32,
b_m_space, f_m_space);
is_good_frame_dataset_.write(
&(metadata->is_good_frame), H5::PredType::NATIVE_UINT8,
b_m_space, f_m_space);
current_write_index_++;
}
+138
View File
@@ -0,0 +1,138 @@
#include "WriterZmqReceiver.hpp"
#include "zmq.h"
#include "date.h"
#include <chrono>
#include <sstream>
using namespace std;
using namespace core_buffer;
WriterZmqReceiver::WriterZmqReceiver(
void *ctx,
const string &ipc_prefix,
const size_t n_modules) :
n_modules_(n_modules),
sockets_(n_modules)
{
for (size_t i = 0; i < n_modules; i++) {
sockets_[i] = zmq_socket(ctx, ZMQ_PULL);
int rcvhwm = WRITER_RCVHWM;
if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm,
sizeof(rcvhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
int linger = 0;
if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger,
sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
stringstream ipc_addr;
ipc_addr << ipc_prefix << i;
const auto ipc = ipc_addr.str();
if (zmq_connect(sockets_[i], ipc.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
}
}
WriterZmqReceiver::~WriterZmqReceiver()
{
for (size_t i = 0; i < n_modules_; i++) {
zmq_close(sockets_[i]);
}
}
void WriterZmqReceiver::get_next_image(
const uint64_t pulse_id,
ImageMetadata* image_metadata,
char* image_buffer)
{
// Init the image metadata.
image_metadata->pulse_id = pulse_id;
image_metadata->frame_index = 0;
image_metadata->daq_rec = 0;
image_metadata->data_n_bytes = 0;
image_metadata->is_good_frame = 1;
bool image_metadata_init = false;
size_t image_buffer_offset = 0;
for (size_t i_module = 0; i_module < n_modules_; i_module++) {
auto n_bytes_metadata = zmq_recv(
sockets_[i_module],
&frame_metadata,
sizeof(StreamModuleFrame),
0);
if (n_bytes_metadata != sizeof(StreamModuleFrame)) {
throw runtime_error("Wrong number of metadata bytes.");
}
// sf_replay should always send the right pulse_id.
if (frame_metadata.metadata.pulse_id != pulse_id) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[sf_writer::receive_replay]";
err_msg << " Read unexpected pulse_id. ";
err_msg << " Expected " << pulse_id;
err_msg << " received ";
err_msg << frame_metadata.metadata.pulse_id;
err_msg << " from i_module " << i_module << endl;
throw runtime_error(err_msg.str());
}
if (!frame_metadata.is_frame_present) {
image_metadata->is_good_frame = 0;
// Init the image metadata with the first valid frame.
} else if (!image_metadata_init) {
image_metadata_init = true;
image_metadata->frame_index =
frame_metadata.metadata.frame_index;
image_metadata->daq_rec =
frame_metadata.metadata.daq_rec;
}
// Once the image is not good, we don't care to re-flag it.
if (image_metadata->is_good_frame == 1) {
if (frame_metadata.metadata.frame_index !=
image_metadata->frame_index) {
image_metadata->is_good_frame = 0;
}
if (frame_metadata.metadata.daq_rec !=
image_metadata->daq_rec) {
image_metadata->is_good_frame = 0;
}
if (frame_metadata.metadata.n_received_packets !=
JUNGFRAU_N_PACKETS_PER_FRAME) {
image_metadata->is_good_frame = 0;
}
}
auto n_bytes_image = zmq_recv(
sockets_[i_module],
(image_buffer + image_buffer_offset),
frame_metadata.data_n_bytes,
0);
if (n_bytes_image != frame_metadata.data_n_bytes) {
throw runtime_error("Wrong number of data bytes.");
}
image_buffer_offset += n_bytes_image;
}
image_metadata->data_n_bytes = image_buffer_offset;
}
+179
View File
@@ -0,0 +1,179 @@
#include <iostream>
#include <stdexcept>
#include "buffer_config.hpp"
#include "zmq.h"
#include <string>
#include <jungfrau.hpp>
#include <thread>
#include <chrono>
#include "WriterH5Writer.hpp"
#include <FastQueue.hpp>
#include <cstring>
#include <BufferedFastQueue.hpp>
#include "date.h"
#include "bitshuffle/bitshuffle.h"
#include "WriterZmqReceiver.hpp"
using namespace std;
using namespace core_buffer;
void receive_replay(
void* ctx,
const string ipc_prefix,
const size_t n_modules,
FastQueue<ImageMetadataBuffer>& queue,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id)
{
try {
WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules);
BufferedFastQueue buffered_queue(
queue, WRITER_DATA_CACHE_N_IMAGES, n_modules);
uint64_t current_pulse_id=start_pulse_id;
// "<= stop_pulse_id" because we include the last pulse_id.
while(current_pulse_id<=stop_pulse_id) {
auto image_metadata = buffered_queue.get_metadata_buffer();
auto image_buffer = buffered_queue.get_data_buffer();
receiver.get_next_image(
current_pulse_id, image_metadata, image_buffer);
if (image_metadata->pulse_id != current_pulse_id) {
throw runtime_error("Wrong pulse id from zmq receiver.");
}
buffered_queue.commit();
current_pulse_id++;
}
buffered_queue.finalize();
} catch (const std::exception& e) {
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[sf_writer::receive_replay]";
cout << " Stopped because of exception: " << endl;
cout << e.what() << endl;
throw;
}
}
int main (int argc, char *argv[])
{
if (argc != 4) {
cout << endl;
cout << "Usage: sf_writer ";
cout << " [output_file] [start_pulse_id] [stop_pulse_id]";
cout << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << endl;
exit(-1);
}
string output_file = string(argv[1]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[2]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]);
size_t n_modules = 32;
FastQueue<ImageMetadataBuffer> queue(
MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES,
WRITER_FASTQUEUE_N_SLOTS);
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS);
thread replay_receive_thread(receive_replay,
ctx, REPLAY_STREAM_IPC_URL, n_modules,
ref(queue), start_pulse_id, stop_pulse_id);
size_t n_frames = stop_pulse_id - start_pulse_id + 1;
WriterH5Writer writer(output_file, n_frames, n_modules);
// TODO: Remove stats trash.
int stats_counter = 0;
size_t read_total_us = 0;
size_t write_total_us = 0;
size_t read_max_us = 0;
size_t write_max_us = 0;
auto start_time = chrono::steady_clock::now();
auto current_pulse_id = start_pulse_id;
// "<= stop_pulse_id" because we include the last pulse_id.
while (current_pulse_id <= stop_pulse_id) {
int slot_id; ;
while((slot_id = queue.read()) == -1) {
this_thread::sleep_for(chrono::milliseconds(
RB_READ_RETRY_INTERVAL_MS));
}
auto metadata = queue.get_metadata_buffer(slot_id);
auto data = queue.get_data_buffer(slot_id);
auto read_end_time = chrono::steady_clock::now();
auto read_us_duration = chrono::duration_cast<chrono::microseconds>(
read_end_time-start_time).count();
// Verify that all pulse_ids are correct.
for (int i=0; i<metadata->n_pulses_in_buffer; i++) {
if (metadata->pulse_id[i] != current_pulse_id) {
throw runtime_error("Wrong pulse id from receiver thread.");
}
current_pulse_id++;
}
start_time = chrono::steady_clock::now();
writer.write(metadata, data);
auto write_end_time = chrono::steady_clock::now();
auto write_us_duration = chrono::duration_cast<chrono::microseconds>(
write_end_time-start_time).count();
queue.release();
// TODO: Some poor statistics.
stats_counter++;
read_total_us += read_us_duration;
read_max_us = max(read_max_us, (uint64_t)read_us_duration);
write_total_us += write_us_duration;
write_max_us = max(write_max_us, (uint64_t)write_us_duration);
// if (stats_counter == STATS_MODULO) {
cout << "sf_writer:read_us " << read_total_us / STATS_MODULO;
cout << " sf_writer:read_max_us " << read_max_us;
cout << " sf_writer:write_us " << write_total_us / STATS_MODULO;
cout << " sf_writer:write_max_us " << write_max_us;
cout << endl;
stats_counter = 0;
read_total_us = 0;
read_max_us = 0;
write_total_us = 0;
write_max_us = 0;
// }
start_time = chrono::steady_clock::now();
}
writer.close_file();
//wait till receive thread is finished
replay_receive_thread.join();
return 0;
}
+17
View File
@@ -0,0 +1,17 @@
add_executable(sf-writer-tests main.cpp)
target_link_libraries(sf-writer-tests
sf-writer-lib
hdf5
hdf5_cpp
zmq
gtest
)
#add_executable(perf-sf_writer perf/perf_WriterH5Writer.cpp)
#target_link_libraries(perf-sf_writer
# core-buffer
# hdf5
# hdf5_hl
# hdf5_cpp
# gtest)
+10
View File
@@ -0,0 +1,10 @@
#include "gtest/gtest.h"
#include "test_WriterZmqReceiver.cpp"
#include "test_WriterH5Writer.cpp"
using namespace std;
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
@@ -0,0 +1,90 @@
#include <iostream>
#include "buffer_config.hpp"
#include "zmq.h"
#include <string>
#include <RingBuffer.hpp>
#include <thread>
#include <chrono>
#include "WriterH5Writer.hpp"
using namespace std;
using namespace core_buffer;
int main (int argc, char *argv[])
{
if (argc != 4) {
cout << endl;
cout << "Usage: sf_writer ";
cout << " [output_file] [start_pulse_id] [stop_pulse_id]";
cout << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << endl;
exit(-1);
}
string output_file = string(argv[1]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[2]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]);
size_t n_modules = 32;
size_t n_frames = stop_pulse_id - start_pulse_id;
WriterH5Writer writer(output_file, n_frames, n_modules);
// TODO: Remove stats trash.
int i_write = 0;
size_t total_ms = 0;
size_t max_ms = 0;
size_t min_ms = 10000; // 10 seconds should be a safe first value.
auto start_time = chrono::steady_clock::now();
auto metadata = make_shared<ImageMetadata>();
auto data = make_unique<char[]>(MODULE_N_BYTES*n_modules);
auto current_pulse_id = start_pulse_id;
while (current_pulse_id <= stop_pulse_id) {
writer.write(metadata.get(), data.get());
current_pulse_id++;
i_write++;
auto end_time = chrono::steady_clock::now();
// TODO: Some poor statistics.
auto ms_duration = chrono::duration_cast<chrono::milliseconds>(
end_time-start_time).count();
total_ms += ms_duration;
if (ms_duration > max_ms) {
max_ms = ms_duration;
}
if (ms_duration < min_ms) {
min_ms = ms_duration;
}
if (i_write==100) {
cout << "avg_write_ms " << total_ms / 100;
cout << " min_write_ms " << min_ms;
cout << " max_write_ms " << max_ms << endl;
i_write = 0;
total_ms = 0;
max_ms = 0;
min_ms = 0;
}
start_time = chrono::steady_clock::now();
}
writer.close_file();
return 0;
}
+92
View File
@@ -0,0 +1,92 @@
#include "WriterH5Writer.hpp"
#include "gtest/gtest.h"
#include "bitshuffle/bitshuffle.h"
using namespace core_buffer;
TEST(WriterH5Writer, basic_interaction)
{
size_t n_modules = 2;
size_t n_frames = 5;
auto data = make_unique<char[]>(n_modules*MODULE_N_BYTES);
auto metadata = make_shared<ImageMetadataBuffer>();
// Needed by writer.
metadata->data_n_bytes[0] = 500;
metadata->n_pulses_in_buffer = 1;
WriterH5Writer writer("ignore.h5", n_frames, n_modules);
writer.write(metadata.get(), data.get());
writer.close_file();
}
TEST(WriterH5Writer, test_compression)
{
// size_t n_modules = 2;
// size_t n_frames = 2;
//
// auto comp_buffer_size = bshuf_compress_lz4_bound(
// MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS);
//
// auto f_raw_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
// auto f_comp_buffer = make_unique<char[]>(comp_buffer_size);
//
// auto i_comp_buffer = make_unique<char[]>(
// (comp_buffer_size * n_modules) + BSHUF_LZ4_HEADER_BYTES);
// auto i_raw_buffer = make_unique<uint16_t[]>(
// MODULE_N_PIXELS * n_modules * n_frames);
//
// bshuf_write_uint64_BE(&i_comp_buffer[0],
// MODULE_N_BYTES * n_modules);
// bshuf_write_uint32_BE(&i_comp_buffer[8],
// MODULE_N_PIXELS * PIXEL_N_BYTES);
//
// size_t total_compressed_size = BSHUF_LZ4_HEADER_BYTES;
// for (int i_module=0; i_module<n_modules; i_module++) {
//
// for (size_t i=0; i<MODULE_N_PIXELS; i++) {
// f_raw_buffer[i] = (uint16_t)((i % 100) + (i_module*100));
// }
//
// auto compressed_size = bshuf_compress_lz4(
// f_raw_buffer.get(), f_comp_buffer.get(),
// MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS);
//
// memcpy((i_comp_buffer.get() + total_compressed_size),
// f_comp_buffer.get(),
// compressed_size);
//
// total_compressed_size += compressed_size;
// }
//
// auto metadata = make_shared<ImageMetadata>();
// metadata->data_n_bytes = total_compressed_size;
//
// metadata->is_good_frame = 1;
// metadata->frame_index = 3;
// metadata->pulse_id = 3;
// metadata->daq_rec = 3;
//
// auto result = bshuf_decompress_lz4(
// &i_comp_buffer[12], &i_raw_buffer[0],
// MODULE_N_PIXELS*n_modules, PIXEL_N_BYTES, MODULE_N_PIXELS);
//
// WriterH5Writer writer("ignore.h5", n_frames, n_modules);
// writer.write(metadata.get(), &i_comp_buffer[0]);
// writer.close_file();
//
// H5::H5File reader("ignore.h5", H5F_ACC_RDONLY);
// auto image_dataset = reader.openDataSet("image");
// image_dataset.read(&i_raw_buffer[0], H5::PredType::NATIVE_UINT16);
//
// for (int i_module=0; i_module<n_modules; i_module++) {
// for (int i_pixel=0; i_pixel<MODULE_N_PIXELS; i_pixel++) {
// size_t offset = (i_module * MODULE_N_PIXELS) + i_pixel;
// ASSERT_EQ(i_raw_buffer[offset],
// (uint16_t)((i_pixel % 100) + (i_module*100)));
// }
// }
}
+80
View File
@@ -0,0 +1,80 @@
#include <gtest/gtest.h>
#include "WriterZmqReceiver.hpp"
#include "bitshuffle/bitshuffle.h"
#include <thread>
#include <sstream>
#include "buffer_config.hpp"
#include "zmq.h"
using namespace std;
using namespace core_buffer;
TEST(WriterZmqReceiver, basic_test)
{
size_t n_modules = 4;
uint64_t pulse_id = 12345;
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1);
void* sockets[n_modules];
for (size_t i = 0; i < n_modules; i++) {
sockets[i] = zmq_socket(ctx, ZMQ_PUSH);
int linger = 0;
if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger,
sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
stringstream ipc_addr;
ipc_addr << REPLAY_STREAM_IPC_URL << i;
const auto ipc = ipc_addr.str();
if (zmq_bind(sockets[i], ipc.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
}
this_thread::sleep_for(chrono::milliseconds(100));
WriterZmqReceiver receiver(ctx, REPLAY_STREAM_IPC_URL, n_modules);
this_thread::sleep_for(chrono::milliseconds(100));
size_t compressed_frame_size = 5000;
auto frame_buffer = make_unique<char[]>(compressed_frame_size);
ImageMetadata image_metadata;
auto compress_size = bshuf_compress_lz4_bound(
MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS);
auto image_buffer = make_unique<char[]>(compress_size * n_modules);
for (size_t i = 0; i < n_modules; i++) {
StreamModuleFrame frame_metadata;
frame_metadata.metadata.pulse_id = pulse_id;
frame_metadata.metadata.frame_index = pulse_id + 100;
frame_metadata.metadata.n_received_packets = 128;
frame_metadata.metadata.daq_rec = 4;
frame_metadata.is_frame_present = 1;
frame_metadata.data_n_bytes = compressed_frame_size;
zmq_send(sockets[i],
&frame_metadata,
sizeof(StreamModuleFrame),
ZMQ_SNDMORE);
zmq_send(sockets[i],
(char*)(frame_buffer.get()),
compressed_frame_size,
0);
}
receiver.get_next_image(pulse_id, &image_metadata, image_buffer.get());
EXPECT_EQ(pulse_id, image_metadata.pulse_id);
EXPECT_EQ(image_metadata.is_good_frame, 1);
EXPECT_EQ(image_metadata.daq_rec, 4);
EXPECT_EQ(image_metadata.data_n_bytes,
5000*n_modules);
// 5000*n_modules+BSHUF_LZ4_HEADER_BYTES);
}