Remove old writer

This commit is contained in:
2021-02-17 12:46:58 +01:00
parent 6212869f0a
commit d0df2677de
17 changed files with 0 additions and 1311 deletions
-1
View File
@@ -33,5 +33,4 @@ add_subdirectory("jf-udp-recv")
add_subdirectory("jf-buffer-writer")
add_subdirectory("jf-assembler")
add_subdirectory("sf-stream")
add_subdirectory("sf-writer")
add_subdirectory("jf-live-writer")
@@ -1,36 +0,0 @@
#ifndef IMAGEBINARYWRITER_HPP
#define IMAGEBINARYWRITER_HPP
#include <string>
#include "formats.hpp"
class ImageBinaryWriter {
const size_t IMAGE_BYTES;
const size_t IMAGE_SLOT_BYTES;
const size_t MAX_FILE_BYTES;
const std::string detector_folder_;
std::string latest_filename_;
std::string current_output_filename_;
int output_file_fd_;
void open_file(const std::string& filename);
void close_current_file();
public:
ImageBinaryWriter(
const std::string& detector_folder,
const uint64_t image_n_bytes);
virtual ~ImageBinaryWriter();
void write(const ImageMetadata meta, const char* data);
};
#endif //IMAGEBINARYWRITER_HPP
-143
View File
@@ -1,143 +0,0 @@
#include "ImageBinaryWriter.hpp"
#include <unistd.h>
#include <iostream>
#include "date.h"
#include <cerrno>
#include <chrono>
#include <cstring>
#include <fcntl.h>
#include "BufferUtils.hpp"
using namespace std;
using namespace buffer_config;
ImageBinaryWriter::ImageBinaryWriter(
const string& detector_folder,
const size_t image_n_bytes):
IMAGE_BYTES(image_n_bytes),
IMAGE_SLOT_BYTES(IMAGE_BYTES + sizeof(ImageMetadata)),
MAX_FILE_BYTES(IMAGE_SLOT_BYTES * FILE_MOD),
detector_folder_(detector_folder),
latest_filename_(detector_folder + "/LATEST"),
current_output_filename_(""),
output_file_fd_(-1)
{
}
ImageBinaryWriter::~ImageBinaryWriter()
{
close_current_file();
}
void ImageBinaryWriter::write(const ImageMetadata meta, const char* data)
{
auto current_frame_file =
BufferUtils::get_image_filename(detector_folder_, meta.pulse_id);
if (current_frame_file != current_output_filename_) {
open_file(current_frame_file);
}
size_t n_bytes_offset =
BufferUtils::get_file_frame_index(meta.pulse_id) * IMAGE_SLOT_BYTES;
auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET);
if (lseek_result < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[ImageBinaryWriter::write]";
err_msg << " Error while lseek on file ";
err_msg << current_output_filename_;
err_msg << " for n_bytes_offset ";
err_msg << n_bytes_offset << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
auto n_bytes_meta = ::write(output_file_fd_, &meta, sizeof(ImageMetadata));
if (n_bytes_meta < sizeof(ImageMetadata)) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::write]";
err_msg << " Error while writing to file ";
err_msg << current_output_filename_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
auto n_bytes_data = ::write(output_file_fd_, data, IMAGE_BYTES);
if (n_bytes_data < sizeof(IMAGE_BYTES)) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::write]";
err_msg << " Error while writing to file ";
err_msg << current_output_filename_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
}
void ImageBinaryWriter::open_file(const std::string& filename)
{
close_current_file();
BufferUtils::create_destination_folder(filename);
output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT,
S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
if (output_file_fd_ < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[ImageBinaryWriter::open_file]";
err_msg << " Cannot create file ";
err_msg << filename << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
current_output_filename_ = filename;
}
void ImageBinaryWriter::close_current_file()
{
if (output_file_fd_ != -1) {
if (close(output_file_fd_) < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[ImageBinaryWriter::close_current_file]";
err_msg << " Error while closing file ";
err_msg << current_output_filename_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
output_file_fd_ = -1;
BufferUtils::update_latest_file(
latest_filename_, current_output_filename_);
current_output_filename_ = "";
}
}
-22
View File
@@ -1,22 +0,0 @@
file(GLOB SOURCES
src/*.cpp)
add_library(sf-writer-lib STATIC ${SOURCES})
target_include_directories(sf-writer-lib PUBLIC include/)
target_link_libraries(sf-writer-lib
external
core-buffer-lib)
add_executable(sf-writer src/main.cpp)
set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer)
target_link_libraries(sf-writer
sf-writer-lib
zmq
hdf5
hdf5_hl
hdf5_cpp
pthread
)
enable_testing()
add_subdirectory(test/)
-42
View File
@@ -1,42 +0,0 @@
# sf-writer
sf-writer reads the binary buffer from disk, assembled the images and writes
them down in HDF5 format.
## Data request ranges
Data request ranges are composed of:
- start_pulse_id (first pulse_id to be included in the file)
- stop_pulse_id (last pulse_id to be included in the file)
- pulse_id_step (how many pulses to skip between images.)
pulse_id_step can be used to write data at different frequencies:
- pulse_id_step == 1 (100Hz, write very pulse_id)
- pulse_id_step == 2 (50hz, write every second pulse)
- pulse_id_step == 10 (10Hz, write every 10th pulse)
The next pulse_id to be written is calculated internally as:
```c++
auto next_pulse_id = currnet_pulse_id + pulse_id_step;
```
The loop criteria for writing is:
```c++
for (
auto curr_pulse_id = start_pulse_id;
curr_pulse_id <= stop_pulse_id;
curr_pulse_id += pulse_id_step
) {
// Write curr_pulse_id to output file.
}
```
**Warning**
If your stop_pulse_id cannot be reached by adding step_pulse_id to
start_pulse_id (start_pulse_id + (n * pulse_id_step) != stop_pulse_id for any n)
it will not be included in the final file.
-28
View File
@@ -1,28 +0,0 @@
#ifndef SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP
#define SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP
#include <formats.hpp>
class BufferBinaryReader {
const std::string detector_folder_;
const std::string module_name_;
std::string current_input_file_;
int input_file_fd_;
void open_file(const std::string& filename);
void close_current_file();
public:
BufferBinaryReader(const std::string &detector_folder,
const std::string &module_name);
~BufferBinaryReader();
void get_block(const uint64_t block_id, BufferBinaryBlock *buffer);
};
#endif //SF_DAQ_BUFFER_BUFFERBINARYREADER_HPP
-53
View File
@@ -1,53 +0,0 @@
#ifndef SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP
#define SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP
#include <atomic>
#include "buffer_config.hpp"
#include "formats.hpp"
const uint64_t IA_EMPTY_SLOT_VALUE = 0;
struct ImageMetadataBlock
{
uint64_t pulse_id[buffer_config::BUFFER_BLOCK_SIZE];
uint64_t frame_index[buffer_config::BUFFER_BLOCK_SIZE];
uint32_t daq_rec[buffer_config::BUFFER_BLOCK_SIZE];
uint8_t is_good_image[buffer_config::BUFFER_BLOCK_SIZE];
uint64_t block_start_pulse_id;
uint64_t block_stop_pulse_id;
};
class ImageAssembler {
const size_t n_modules_;
const size_t image_buffer_slot_n_bytes_;
char* image_buffer_;
ImageMetadataBlock* meta_buffer_;
ModuleFrame* frame_meta_buffer_;
std::atomic_int* buffer_status_;
std::atomic_uint64_t* buffer_bunch_id_;
size_t get_data_offset(const uint64_t slot_id, const int i_module);
size_t get_metadata_offset(const uint64_t slot_id, const int i_module);
public:
ImageAssembler(const size_t n_modules);
virtual ~ImageAssembler();
bool is_slot_free(const uint64_t bunch_id);
bool is_slot_full(const uint64_t bunch_id);
void process(const uint64_t bunch_id,
const int i_module,
const BufferBinaryBlock* block_buffer);
void free_slot(const uint64_t bunch_id);
ImageMetadataBlock* get_metadata_buffer(const uint64_t bunch_id);
char* get_data_buffer(const uint64_t bunch_id);
};
#endif //SF_DAQ_BUFFER_IMAGEASSEMBLER_HPP
-9
View File
@@ -1,9 +0,0 @@
#include <cstddef>
namespace writer_config
{
// MS to retry reading from the image assembler.
const size_t ASSEMBLER_RETRY_MS = 5;
// Number of slots in the reconstruction buffer.
const size_t WRITER_IA_N_SLOTS = 2;
}
-107
View File
@@ -1,107 +0,0 @@
#include "BufferBinaryReader.hpp"
#include <unistd.h>
#include <sstream>
#include <cstring>
#include <fcntl.h>
#include <stdexcept>
#include "BufferUtils.hpp"
#include "writer_config.hpp"
#include "buffer_config.hpp"
using namespace std;
using namespace writer_config;
using namespace buffer_config;
BufferBinaryReader::BufferBinaryReader(
const std::string &detector_folder,
const std::string &module_name) :
detector_folder_(detector_folder),
module_name_(module_name),
current_input_file_(""),
input_file_fd_(-1)
{}
BufferBinaryReader::~BufferBinaryReader()
{
close_current_file();
}
void BufferBinaryReader::get_block(
const uint64_t block_id, BufferBinaryBlock* buffer)
{
uint64_t block_start_pulse_id = block_id * BUFFER_BLOCK_SIZE;
auto current_block_file = BufferUtils::get_filename(
detector_folder_, module_name_, block_start_pulse_id);
if (current_block_file != current_input_file_) {
open_file(current_block_file);
}
size_t file_start_index =
BufferUtils::get_file_frame_index(block_start_pulse_id);
size_t n_bytes_offset = file_start_index * sizeof(BufferBinaryFormat);
auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET);
if (lseek_result < 0) {
stringstream err_msg;
err_msg << "[BufferBinaryReader::get_block]";
err_msg << " Error while lseek on file ";
err_msg << current_input_file_ << " for n_bytes_offset ";
err_msg << n_bytes_offset << ": " << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
auto n_bytes = ::read(input_file_fd_, buffer,
sizeof(BufferBinaryFormat) * BUFFER_BLOCK_SIZE);
if (n_bytes < sizeof(BufferBinaryFormat)) {
stringstream err_msg;
err_msg << "[BufferBinaryReader::get_block]";
err_msg << " Error while reading from file ";
err_msg << current_input_file_ << ": " << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
}
void BufferBinaryReader::open_file(const std::string& filename)
{
close_current_file();
input_file_fd_ = open(filename.c_str(), O_RDONLY);
if (input_file_fd_ < 0) {
stringstream err_msg;
err_msg << "[BufferBinaryReader::open_file]";
err_msg << " Cannot open file " << filename << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
current_input_file_ = filename;
}
void BufferBinaryReader::close_current_file()
{
if (input_file_fd_ != -1) {
if (close(input_file_fd_) < 0) {
stringstream err_msg;
err_msg << "[BinaryWriter::close_current_file]";
err_msg << " Error while closing file " << current_input_file_;
err_msg << ": " << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
input_file_fd_ = -1;
current_input_file_ = "";
}
}
-186
View File
@@ -1,186 +0,0 @@
#include <cstring>
#include "ImageAssembler.hpp"
#include "writer_config.hpp"
#include "buffer_config.hpp"
using namespace std;
using namespace writer_config;
using namespace buffer_config;
ImageAssembler::ImageAssembler(const size_t n_modules) :
n_modules_(n_modules),
image_buffer_slot_n_bytes_(BUFFER_BLOCK_SIZE * MODULE_N_BYTES * n_modules_)
{
image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_];
meta_buffer_ = new ImageMetadataBlock[WRITER_IA_N_SLOTS];
frame_meta_buffer_ =
new ModuleFrame[WRITER_IA_N_SLOTS * n_modules * BUFFER_BLOCK_SIZE];
buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS];
buffer_bunch_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS];
for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) {
free_slot(i);
}
}
ImageAssembler::~ImageAssembler()
{
delete[] image_buffer_;
delete[] meta_buffer_;
}
bool ImageAssembler::is_slot_free(const uint64_t bunch_id)
{
auto slot_id = bunch_id % WRITER_IA_N_SLOTS;
uint64_t slot_bunch_id = IA_EMPTY_SLOT_VALUE;
if (buffer_bunch_id_[slot_id].compare_exchange_strong(
slot_bunch_id, bunch_id)) {
return true;
}
auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0;
return is_free && (slot_bunch_id == bunch_id);
}
bool ImageAssembler::is_slot_full(const uint64_t bunch_id)
{
auto slot_id = bunch_id % WRITER_IA_N_SLOTS;
return buffer_status_[slot_id].load(memory_order_relaxed) == 0;
}
size_t ImageAssembler::get_data_offset(
const uint64_t slot_id, const int i_module)
{
size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_;
size_t module_i_offset = i_module * MODULE_N_BYTES;
return slot_i_offset + module_i_offset;
}
size_t ImageAssembler::get_metadata_offset(
const uint64_t slot_id, const int i_module)
{
size_t n_metadata_in_slot = n_modules_ * BUFFER_BLOCK_SIZE;
size_t slot_m_offset = slot_id * n_metadata_in_slot;
size_t module_m_offset = i_module;
return slot_m_offset + module_m_offset;
}
void ImageAssembler::process(
const uint64_t bunch_id,
const int i_module,
const BufferBinaryBlock* block_buffer)
{
const auto slot_id = bunch_id % WRITER_IA_N_SLOTS;
auto meta_offset = get_metadata_offset(slot_id, i_module);
const auto meta_offset_step = n_modules_;
auto image_offset = get_data_offset(slot_id, i_module);
const auto image_offset_step = MODULE_N_BYTES * n_modules_;
for (const auto& frame : block_buffer->frame) {
memcpy(
&(frame_meta_buffer_[meta_offset]),
&(frame.meta),
sizeof(ModuleFrame));
meta_offset += meta_offset_step;
memcpy(
image_buffer_ + image_offset,
&(frame.data[0]),
MODULE_N_BYTES);
image_offset += image_offset_step;
}
buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed);
}
void ImageAssembler::free_slot(const uint64_t bunch_id)
{
auto slot_id = bunch_id % WRITER_IA_N_SLOTS;
buffer_status_[slot_id].store(n_modules_, memory_order_relaxed);
buffer_bunch_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed);
}
ImageMetadataBlock* ImageAssembler::get_metadata_buffer(const uint64_t bunch_id)
{
const auto slot_id = bunch_id % WRITER_IA_N_SLOTS;
auto& image_pulse_id = meta_buffer_[slot_id].pulse_id;
auto& image_frame_index = meta_buffer_[slot_id].frame_index;
auto& image_daq_rec = meta_buffer_[slot_id].daq_rec;
auto& image_is_good_frame = meta_buffer_[slot_id].is_good_image;
auto meta_offset = get_metadata_offset(slot_id, 0);
const auto meta_offset_step = 1;
uint64_t start_pulse_id = bunch_id * BUFFER_BLOCK_SIZE;
meta_buffer_[slot_id].block_start_pulse_id = start_pulse_id;
uint64_t stop_pulse_id = start_pulse_id + BUFFER_BLOCK_SIZE - 1;
meta_buffer_[slot_id].block_stop_pulse_id = stop_pulse_id;
for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
auto is_pulse_init = false;
image_is_good_frame[i_pulse] = 1;
image_pulse_id[i_pulse] = 0;
for (size_t i_module=0; i_module < n_modules_; i_module++) {
auto& frame_meta = frame_meta_buffer_[meta_offset];
auto is_good_frame =
frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME;
if (!is_good_frame) {
image_is_good_frame[i_pulse] = 0;
// TODO: Update meta_offset only once in the loop.
meta_offset += meta_offset_step;
continue;
}
if (!is_pulse_init) {
image_pulse_id[i_pulse] = frame_meta.pulse_id;
image_frame_index[i_pulse] = frame_meta.frame_index;
image_daq_rec[i_pulse] = frame_meta.daq_rec;
is_pulse_init = true;
}
if (image_is_good_frame[i_pulse] == 1) {
if (frame_meta.pulse_id != image_pulse_id[i_pulse]) {
image_is_good_frame[i_pulse] = 0;
}
if (frame_meta.frame_index != image_frame_index[i_pulse]) {
image_is_good_frame[i_pulse] = 0;
}
if (frame_meta.daq_rec != image_daq_rec[i_pulse]) {
image_is_good_frame[i_pulse] = 0;
}
if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) {
image_is_good_frame[i_pulse] = 0;
}
}
meta_offset += meta_offset_step;
}
}
return &(meta_buffer_[slot_id]);
}
char* ImageAssembler::get_data_buffer(const uint64_t bunch_id)
{
auto slot_id = bunch_id % WRITER_IA_N_SLOTS;
return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_);
}
-158
View File
@@ -1,158 +0,0 @@
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <vector>
#include "date.h"
#include "zmq.h"
#include "writer_config.hpp"
#include "buffer_config.hpp"
#include "bitshuffle/bitshuffle.h"
#include "JFH5Writer.hpp"
#include "ImageAssembler.hpp"
#include "BufferBinaryReader.hpp"
using namespace std;
using namespace chrono;
using namespace writer_config;
using namespace buffer_config;
void read_buffer(
const string detector_folder,
const string module_name,
const int i_module,
const vector<uint64_t>& buffer_blocks,
ImageAssembler& image_assembler)
{
BufferBinaryReader block_reader(detector_folder, module_name);
auto block_buffer = new BufferBinaryBlock();
for (uint64_t block_id:buffer_blocks) {
while(!image_assembler.is_slot_free(block_id)) {
this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS));
}
auto start_time = steady_clock::now();
block_reader.get_block(block_id, block_buffer);
auto end_time = steady_clock::now();
uint64_t read_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
start_time = steady_clock::now();
image_assembler.process(block_id, i_module, block_buffer);
end_time = steady_clock::now();
uint64_t compose_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
cout << "sf_writer:avg_read_us ";
cout << read_us_duration / BUFFER_BLOCK_SIZE << endl;
cout << "sf_writer:avg_assemble_us ";
cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl;
}
delete block_buffer;
}
int main (int argc, char *argv[])
{
if (argc != 7) {
cout << endl;
cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]";
cout << " [start_pulse_id] [stop_pulse_id] [pulse_id_step]";
cout << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tdetector_folder: Absolute path to detector buffer." << endl;
cout << "\tn_modules: number of modules" << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl;
cout << endl;
exit(-1);
}
string output_file = string(argv[1]);
const string detector_folder = string(argv[2]);
size_t n_modules = atoi(argv[3]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[4]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]);
int pulse_id_step = atoi(argv[6]);
// Align start (up) and stop(down) pulse_id with pulse_id_step.
if (start_pulse_id % pulse_id_step != 0) {
start_pulse_id += pulse_id_step - (start_pulse_id % pulse_id_step);
}
if (stop_pulse_id % pulse_id_step != 0) {
stop_pulse_id -= (start_pulse_id % pulse_id_step);
}
uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE;
uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE;
// Generate list of buffer blocks that need to be loaded.
std::vector<uint64_t> buffer_blocks;
for (uint64_t i_block=start_block; i_block <= stop_block; i_block++) {
buffer_blocks.push_back(i_block);
}
ImageAssembler image_assembler(n_modules);
std::vector<std::thread> reading_threads(n_modules);
for (size_t i_module=0; i_module<n_modules; i_module++) {
// TODO: Very ugly. Fix.
string module_name = "M";
if (i_module < 10) {
module_name += "0";
}
module_name += to_string(i_module);
reading_threads.emplace_back(
read_buffer,
detector_folder,
module_name,
i_module,
ref(buffer_blocks),
ref(image_assembler));
}
JFH5Writer writer(output_file, detector_folder, n_modules,
start_pulse_id, stop_pulse_id, pulse_id_step);
for (uint64_t block_id:buffer_blocks) {
while(!image_assembler.is_slot_full(block_id)) {
this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS));
}
auto metadata = image_assembler.get_metadata_buffer(block_id);
auto data = image_assembler.get_data_buffer(block_id);
auto start_time = steady_clock::now();
writer.write(metadata, data);
auto end_time = steady_clock::now();
auto write_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
image_assembler.free_slot(block_id);
cout << "sf_writer:avg_write_us ";
cout << write_us_duration / BUFFER_BLOCK_SIZE << endl;
}
for (auto& reading_thread : reading_threads) {
if (reading_thread.joinable()) {
reading_thread.join();
}
}
return 0;
}
-10
View File
@@ -1,10 +0,0 @@
add_executable(sf-writer-tests main.cpp)
target_link_libraries(sf-writer-tests
sf-writer-lib
hdf5
hdf5_hl
hdf5_cpp
zmq
gtest
)
-10
View File
@@ -1,10 +0,0 @@
#include "gtest/gtest.h"
#include "test_JFH5Writer.cpp"
#include "test_ImageAssembler.cpp"
using namespace std;
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
-72
View File
@@ -1,72 +0,0 @@
#ifndef SF_DAQ_BUFFER_DATA_HPP
#define SF_DAQ_BUFFER_DATA_HPP
#include <utility>
#include <memory>
#include "buffer_config.hpp"
auto get_test_block_metadata(
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id,
const int pulse_id_step)
{
using namespace std;
using namespace buffer_config;
auto metadata = make_shared<ImageMetadataBlock>();
uint64_t block_id = start_pulse_id / BUFFER_BLOCK_SIZE;
metadata->block_start_pulse_id = block_id * BUFFER_BLOCK_SIZE;
metadata->block_stop_pulse_id =
metadata->block_start_pulse_id + BUFFER_BLOCK_SIZE - 1;
if (metadata->block_stop_pulse_id < stop_pulse_id) {
throw runtime_error("stop_pulse_id in next block");
}
auto offset = start_pulse_id - metadata->block_start_pulse_id;
for (uint64_t pulse_id = start_pulse_id;
pulse_id <= stop_pulse_id;
pulse_id++, offset++) {
if (pulse_id % pulse_id_step != 0) {
metadata->is_good_image[offset] = 0;
continue;
}
metadata->pulse_id[offset] = pulse_id;
metadata->frame_index[offset] = pulse_id + 10;
metadata->daq_rec[offset] = pulse_id + 100;
metadata->is_good_image[offset] = 1;
}
return metadata;
}
auto get_test_block_data(const size_t n_modules)
{
using namespace std;
using namespace buffer_config;
auto image_buffer = make_unique<uint16_t[]>(
MODULE_N_PIXELS * n_modules * BUFFER_BLOCK_SIZE);
for (int i_block=0; i_block<=BUFFER_BLOCK_SIZE; i_block++) {
for (int i_module=0; i_module<n_modules; i_module++) {
auto offset = i_block * MODULE_N_PIXELS;
offset += i_module * MODULE_N_PIXELS;
for (int i_pixel=0; i_pixel<MODULE_N_PIXELS; i_pixel++) {
image_buffer[offset + i_pixel] = i_pixel % 100;
}
}
}
return image_buffer;
}
#endif //SF_DAQ_BUFFER_DATA_HPP
@@ -1,90 +0,0 @@
#include <iostream>
#include "buffer_config.hpp"
#include "zmq.h"
#include <string>
#include <RingBuffer.hpp>
#include <thread>
#include <chrono>
#include "WriterH5Writer.hpp"
using namespace std;
using namespace core_buffer;
int main (int argc, char *argv[])
{
if (argc != 4) {
cout << endl;
cout << "Usage: sf_writer ";
cout << " [output_file] [start_pulse_id] [stop_pulse_id]";
cout << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << endl;
exit(-1);
}
string output_file = string(argv[1]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[2]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[3]);
size_t n_modules = 32;
size_t n_frames = stop_pulse_id - start_pulse_id;
WriterH5Writer writer(output_file, n_frames, n_modules);
// TODO: Remove stats trash.
int i_write = 0;
size_t total_ms = 0;
size_t max_ms = 0;
size_t min_ms = 10000; // 10 seconds should be a safe first value.
auto start_time = chrono::steady_clock::now();
auto metadata = make_shared<ImageMetadata>();
auto data = make_unique<char[]>(MODULE_N_BYTES*n_modules);
auto current_pulse_id = start_pulse_id;
while (current_pulse_id <= stop_pulse_id) {
writer.write(metadata.get(), data.get());
current_pulse_id++;
i_write++;
auto end_time = chrono::steady_clock::now();
// TODO: Some poor statistics.
auto ms_duration = chrono::duration_cast<chrono::milliseconds>(
end_time-start_time).count();
total_ms += ms_duration;
if (ms_duration > max_ms) {
max_ms = ms_duration;
}
if (ms_duration < min_ms) {
min_ms = ms_duration;
}
if (i_write==100) {
cout << "avg_write_ms " << total_ms / 100;
cout << " min_write_ms " << min_ms;
cout << " max_write_ms " << max_ms << endl;
i_write = 0;
total_ms = 0;
max_ms = 0;
min_ms = 0;
}
start_time = chrono::steady_clock::now();
}
writer.close_file();
return 0;
}
-90
View File
@@ -1,90 +0,0 @@
#include <memory>
#include "ImageAssembler.hpp"
#include "gtest/gtest.h"
using namespace std;
using namespace buffer_config;
TEST(ImageAssembler, basic_interaction)
{
size_t n_modules = 3;
uint64_t bunch_id = 0;
ImageAssembler assembler(n_modules);
ASSERT_EQ(assembler.is_slot_free(bunch_id), true);
auto buffer_block = make_unique<BufferBinaryBlock>();
auto buffer_ptr = buffer_block.get();
for (size_t i_module=0; i_module < n_modules; i_module++) {
assembler.process(bunch_id, i_module, buffer_ptr);
}
ASSERT_EQ(assembler.is_slot_full(bunch_id), true);
auto metadata = assembler.get_metadata_buffer(bunch_id);
auto data = assembler.get_data_buffer(bunch_id);
assembler.free_slot(bunch_id);
ASSERT_EQ(assembler.is_slot_free(bunch_id), true);
for (size_t i_pulse = 0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
ASSERT_EQ(metadata->is_good_image[i_pulse], 0);
}
}
TEST(ImageAssembler, reconstruction)
{
size_t n_modules = 2;
uint64_t bunch_id = 0;
ImageAssembler assembler(n_modules);
ASSERT_EQ(assembler.is_slot_free(bunch_id), true);
auto buffer_block = make_unique<BufferBinaryBlock>();
auto buffer_ptr = buffer_block.get();
for (size_t i_module=0; i_module < n_modules; i_module++) {
for (size_t i_pulse=0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
auto& frame_meta = buffer_block->frame[i_pulse].meta;
frame_meta.pulse_id = 100 + i_pulse;
frame_meta.daq_rec = 1000 + i_pulse;
frame_meta.frame_index = 10000 + i_pulse;
frame_meta.n_recv_packets = JF_N_PACKETS_PER_FRAME;
for (size_t i_pixel=0; i_pixel < MODULE_N_PIXELS; i_pixel++) {
buffer_block->frame[i_pulse].data[i_pixel] =
(i_module * 10) + (i_pixel % 100);
}
}
assembler.process(bunch_id, i_module, buffer_ptr);
}
ASSERT_EQ(assembler.is_slot_full(bunch_id), true);
auto metadata = assembler.get_metadata_buffer(bunch_id);
auto data = assembler.get_data_buffer(bunch_id);
assembler.free_slot(bunch_id);
ASSERT_EQ(assembler.is_slot_free(bunch_id), true);
ASSERT_EQ(metadata->block_start_pulse_id, 0);
ASSERT_EQ(metadata->block_stop_pulse_id, BUFFER_BLOCK_SIZE-1);
for (size_t i_pulse = 0; i_pulse < BUFFER_BLOCK_SIZE; i_pulse++) {
ASSERT_EQ(metadata->pulse_id[i_pulse], 100 + i_pulse);
ASSERT_EQ(metadata->daq_rec[i_pulse], 1000 + i_pulse);
ASSERT_EQ(metadata->frame_index[i_pulse], 10000 + i_pulse);
ASSERT_EQ(metadata->is_good_image[i_pulse], 1);
for (size_t i_module=0; i_module < n_modules; i_module++) {
// TODO: Check assembled image.
}
}
}
-254
View File
@@ -1,254 +0,0 @@
#include <memory>
#include "JFH5Writer.hpp"
#include "gtest/gtest.h"
#include "bitshuffle/bitshuffle.h"
#include "mock/data.hpp"
using namespace std;
using namespace buffer_config;
TEST(JFH5Writer, basic_interaction)
{
size_t n_modules = 2;
uint64_t start_pulse_id = 1;
uint64_t stop_pulse_id = 5;
auto data = make_unique<char[]>(n_modules*MODULE_N_BYTES*BUFFER_BLOCK_SIZE);
auto metadata = make_shared<ImageMetadataBlock>();
// Needed by writer.
metadata->block_start_pulse_id = 0;
metadata->block_stop_pulse_id = BUFFER_BLOCK_SIZE - 1;
JFH5Writer writer("ignore.h5", "detector",
n_modules, start_pulse_id, stop_pulse_id, 1);
writer.write(metadata.get(), data.get());
}
TEST(JFH5Writer, test_writing)
{
size_t n_modules = 2;
uint64_t start_pulse_id = 5;
uint64_t stop_pulse_id = 10;
auto n_images = stop_pulse_id - start_pulse_id + 1;
auto meta = get_test_block_metadata(start_pulse_id, stop_pulse_id, 1);
auto data = get_test_block_data(n_modules);
string detector_name = "detector";
// The writer closes the file on destruction.
{
JFH5Writer writer(
"ignore.h5", detector_name,
n_modules, start_pulse_id, stop_pulse_id, 1);
writer.write(meta.get(), (char*)(&data[0]));
}
H5::H5File reader("ignore.h5", H5F_ACC_RDONLY);
auto image_dataset = reader.openDataSet("/data/detector/data");
image_dataset.read(&data[0], H5::PredType::NATIVE_UINT16);
for (int i_image=0; i_image < n_images; i_image++) {
for (int i_module=0; i_module<n_modules; i_module++) {
auto offset = i_image * MODULE_N_PIXELS;
offset += i_module * MODULE_N_PIXELS;
for (int i_pixel=0; i_pixel<MODULE_N_PIXELS; i_pixel++) {
ASSERT_EQ(data[offset + i_pixel], i_pixel % 100);
}
}
}
auto pulse_id_data = make_unique<uint64_t[]>(n_images);
auto pulse_id_dataset = reader.openDataSet("/data/detector/pulse_id");
pulse_id_dataset.read(&pulse_id_data[0], H5::PredType::NATIVE_UINT64);
auto frame_index_data = make_unique<uint64_t[]>(n_images);
auto frame_index_dataset = reader.openDataSet("/data/detector/frame_index");
frame_index_dataset.read(&frame_index_data[0], H5::PredType::NATIVE_UINT64);
auto daq_rec_data = make_unique<uint32_t[]>(n_images);
auto daq_rec_dataset = reader.openDataSet("/data/detector/daq_rec");
daq_rec_dataset.read(&daq_rec_data[0], H5::PredType::NATIVE_UINT32);
auto is_good_frame_data = make_unique<uint8_t[]>(n_images);
auto is_good_frame_dataset =
reader.openDataSet("/data/detector/is_good_frame");
is_good_frame_dataset.read(
&is_good_frame_data[0], H5::PredType::NATIVE_UINT8);
auto name_dataset = reader.openDataSet("/general/detector_name");
string read_detector_name;
name_dataset.read(read_detector_name, name_dataset.getDataType());
ASSERT_EQ(detector_name, read_detector_name);
for (uint64_t pulse_id=start_pulse_id;
pulse_id<=stop_pulse_id;
pulse_id++) {
ASSERT_EQ(pulse_id_data[pulse_id - start_pulse_id], pulse_id);
ASSERT_EQ(frame_index_data[pulse_id - start_pulse_id], pulse_id + 10);
ASSERT_EQ(daq_rec_data[pulse_id - start_pulse_id], pulse_id + 100);
ASSERT_EQ(is_good_frame_data[pulse_id - start_pulse_id], 1);
}
}
TEST(JFH5Writer, test_step_pulse_id)
{
// Start pulse id (5) larger than stop pulse id (4).
ASSERT_THROW(JFH5Writer writer("ignore.h5", "d", 1 , 5, 4, 1),
runtime_error);
// Start pulse id (5) is equal to stop pulse id (5).
ASSERT_NO_THROW(JFH5Writer writer("ignore.h5", "d", 1, 5, 5, 1));
// The step is exactly on start nad stop pulse id.
ASSERT_NO_THROW(JFH5Writer writer("ignore.h5", "d", 1, 5, 5, 5));
// No pulses in given range with step = 10
ASSERT_THROW(JFH5Writer writer("ignore.h5", "d", 1, 1, 9, 10),
runtime_error);
// Stop pulse id is divisible by step, but start is not.
ASSERT_THROW(JFH5Writer writer("ignore.h5", "d", 1, 5, 10, 10),
runtime_error);
// Start pulse id is divisible by step, but stop is not.
ASSERT_THROW(JFH5Writer writer("ignore.h5", "d", 1, 10, 19, 10),
runtime_error);
// Should be ok.
ASSERT_NO_THROW(JFH5Writer("ignore.h5", "d", 1, 1234, 1234, 1));
// Should be ok.
ASSERT_NO_THROW(JFH5Writer("ignore.h5", "d", 1, 1234, 4567, 1));
// Should be ok.
ASSERT_NO_THROW(JFH5Writer("ignore.h5", "d", 1, 4, 4, 4));
// stop smaller than start.
ASSERT_THROW(JFH5Writer("ignore.h5", "d", 1, 1234, 1233, 1),
runtime_error);
// step is not valid for 100Hz.
ASSERT_THROW(JFH5Writer("ignore.h5", "d", 1, 1234, 1234, 3),
runtime_error);
// start not divisible by step.
ASSERT_THROW(JFH5Writer("ignore.h5", "d", 1, 10, 10, 4),
runtime_error);
// stop not divisible by step
ASSERT_THROW(JFH5Writer("ignore.h5", "d", 1, 8, 10, 4),
runtime_error);
}
void test_writing_with_step(
uint64_t start_pulse_id, uint64_t stop_pulse_id, size_t step)
{
size_t n_modules = 3;
size_t n_images = 1;
n_images += (stop_pulse_id / step);
n_images -= start_pulse_id / step;
auto meta = get_test_block_metadata(start_pulse_id, stop_pulse_id, step);
auto data = get_test_block_data(n_modules);
// Verify the metadata has the layout we want to test (50Hz).
for (size_t i_pulse=0; i_pulse<BUFFER_BLOCK_SIZE; i_pulse++) {
if (i_pulse % step == 0) {
ASSERT_EQ(meta->pulse_id[i_pulse], 500 + i_pulse);
} else {
ASSERT_EQ(meta->pulse_id[i_pulse], 0);
}
}
string path_root = "/path/to/";
string expected_detector_name = "detector";
// The writer closes the file on destruction.
{
JFH5Writer writer(
"ignore.h5", path_root + expected_detector_name,
n_modules, start_pulse_id, stop_pulse_id, step);
writer.write(meta.get(), (char*)(&data[0]));
}
H5::H5File reader("ignore.h5", H5F_ACC_RDONLY);
auto image_dataset = reader.openDataSet("/data/detector/data");
image_dataset.read(&data[0], H5::PredType::NATIVE_UINT16);
hsize_t dims[3];
image_dataset.getSpace().getSimpleExtentDims(dims);
ASSERT_EQ(dims[0], n_images);
ASSERT_EQ(dims[1], n_modules * MODULE_Y_SIZE);
ASSERT_EQ(dims[2], MODULE_X_SIZE);
auto pulse_id_data = make_unique<uint64_t[]>(n_images);
auto pulse_id_dataset = reader.openDataSet("/data/detector/pulse_id");
pulse_id_dataset.read(&pulse_id_data[0], H5::PredType::NATIVE_UINT64);
pulse_id_dataset.getSpace().getSimpleExtentDims(dims);
ASSERT_EQ(dims[0], n_images);
ASSERT_EQ(dims[1], 1);
auto frame_index_data = make_unique<uint64_t[]>(n_images);
auto frame_index_dataset = reader.openDataSet("/data/detector/frame_index");
frame_index_dataset.read(&frame_index_data[0], H5::PredType::NATIVE_UINT64);
frame_index_dataset.getSpace().getSimpleExtentDims(dims);
ASSERT_EQ(dims[0], n_images);
ASSERT_EQ(dims[1], 1);
auto daq_rec_data = make_unique<uint32_t[]>(n_images);
auto daq_rec_dataset = reader.openDataSet("/data/detector/daq_rec");
daq_rec_dataset.read(&daq_rec_data[0], H5::PredType::NATIVE_UINT32);
daq_rec_dataset.getSpace().getSimpleExtentDims(dims);
ASSERT_EQ(dims[0], n_images);
ASSERT_EQ(dims[1], 1);
auto is_good_frame_data = make_unique<uint8_t[]>(n_images);
auto is_good_frame_dataset =
reader.openDataSet("/data/detector/is_good_frame");
is_good_frame_dataset.read(
&is_good_frame_data[0], H5::PredType::NATIVE_UINT8);
is_good_frame_dataset.getSpace().getSimpleExtentDims(dims);
ASSERT_EQ(dims[0], n_images);
ASSERT_EQ(dims[1], 1);
auto name_dataset = reader.openDataSet("/general/detector_name");
string read_detector_name;
name_dataset.read(read_detector_name, name_dataset.getDataType());
ASSERT_EQ(expected_detector_name, read_detector_name);
uint64_t i_pulse = 0;
for (uint64_t pulse_id=start_pulse_id;
pulse_id<=stop_pulse_id;
pulse_id++) {
if (pulse_id % step != 0) {
continue;
}
ASSERT_EQ(pulse_id_data[i_pulse], pulse_id);
ASSERT_EQ(frame_index_data[i_pulse], pulse_id + 10);
ASSERT_EQ(daq_rec_data[i_pulse], pulse_id + 100);
ASSERT_EQ(is_good_frame_data[i_pulse], 1);
i_pulse++;
}
}
TEST(JFH5Writer, test_writing_with_step)
{
// TODO: Write with any number of steps.
// 100Hz
test_writing_with_step(500, 599, 1);
// 50Hz
test_writing_with_step(500, 598, 2);
// 25Hz
test_writing_with_step(500, 596, 4);
// 10Hz
test_writing_with_step(500, 590, 10);
// 1Hz
test_writing_with_step(500, 500, 100);
}