Merge branch 'master' into eiger

Hax Damiani Leonardo
2021-04-26 16:10:43 +02:00
11 changed files with 198 additions and 82 deletions
+1
@@ -17,6 +17,7 @@ Documentation of individual components:
- [sf-stream](sf-stream) (Live streaming of detector data)
- [sf-writer](sf-writer) (Read from buffer and write H5)
- [sf-utils](sf-utils) (Small utilities for debugging and testing)
- [jf-live-writer](jf-live-writer) (Live writer for high performance detectors)
## Design goals
(Two binary image files added, 39 KiB and 104 KiB; not shown.)

File diff suppressed because one or more lines are too long
+133 -7
@@ -1,11 +1,124 @@
# jf-live-writer
The jf-live-writer is packaged as a Docker container for development and
testing.
This component is a PHDF5-based MPI writer for high performance detectors
that need more than 2 GB/s of write speed. It parallelizes the HDF5 writing
across multiple processes in order to overcome the approx. 3 GB/s single-stream
write limit on GPFS.
# Using the docker container
It expects an input ZMQ stream that contains metadata about what and where to
write, plus access to the RamBuffer holding the images one wants to write.
The writer is stateless but tied to a specific detector. There is no
configuration state; all the information for writing files is expected to
come from the input metadata stream. The image data is taken directly from the
existing buffer without additional memory copies.
## Overview
![image_livewriter_overview](../docs/sf_daq_buffer-overview-LiveWriter.jpg)
The ZMQ store stream is a PUB/SUB stream that gets distributed to all
ranks. This stream carries both the ImageMetadata and the file-writing
metadata, and is generated and sent by the writer agent.
For image data, each process decides based on its rank whether a
particular store stream message is meant for it, and writes the requested image.
Image metadata is always written only by the process with **RANK = 0**.
When to start writing a new file or when to close an existing one is also
decided based on the metadata in the store stream. There is no state machine
in the writer; which action to take is based solely on the received
metadata. This removes the need for inter-rank communication and makes
for a simpler writer.
### ZMQ Store Stream format
This stream is composed of two parts: the first is the already known
**ImageMetadata** provided by the **jf-assembler**, and the second is
provided by the **writer-agent**.
Each message in the stream has this format:
```c++
#pragma pack(push)
#pragma pack(1)
struct StoreStream {
ImageMetadata image_metadata;
int64_t run_id;
uint32_t i_image;
uint32_t n_images;
uint32_t image_y_size;
uint32_t image_x_size;
uint32_t op_code;
uint32_t bits_per_pixel;
};
#pragma pack(pop)
#pragma pack(push)
#pragma pack(1)
struct ImageMetadata {
uint64_t pulse_id;
uint64_t frame_index;
uint32_t daq_rec;
uint32_t is_good_image;
};
#pragma pack(pop)
```
#### StoreStream
| Name | Type | Comment |
| --- | --- | --- |
| run_id | int64 | Run id used to construct the output file name. |
| i_image | uint32_t | Current image index inside this run. |
| n_images | uint32_t | Total number of images in this run. |
| image_y_size | uint32_t | Y image size in pixels. |
| image_x_size | uint32_t | X image size in pixels. |
| op_code | uint32_t | State transition information for the writer. |
| bits_per_pixel | uint32_t | How many bits does 1 pixel have. 8, 16 or 32. |
Some details regarding how these fields are used:
- **run\_id**: Currently the output file name is simply **[run\_id].h5**.
- **i\_image**: Based on this, each rank decides whether the received message is
meant for it and whether it needs to write the corresponding image data to file.
- **op_code**: This is used to steer the file writing and to avoid the need
for a state machine:
- op_code = 0 (Continue - just write to the same file as you already are)
- op_code = 1 (Start - create a new file for this run_id)
- op_code = 2 (Stop - close the current file)
Since the writer relies on the correct sequence of messages in the input
stream instead of having an internal state machine,
the input stream must always follow a valid pattern of messages:
![image_store_stream](../docs/sf_daq_buffer-StoreStream.jpg)
The sequence must always follow:
- op_code = 1 (also a new run_id if you do not want to overwrite the previous file)
- op_code = 0 (same run_id as the last message)
- op_code = 2 (same run_id as the last message)
- etc.
In case the sequence is broken (wrong send order from the writer agent, lost
messages, etc.) the writer will ignore the received message. An operational
state can be restored by sending an **op\_code = 2** message.
Images are written only when op_code = 0, meaning that the start and stop
messages are not used for writing data. This allows sending the stop message
in case we need to reset the writer status at any point.
#### ImageMetadata
This comes from the **jf-assembler** without modifications for a particular
image.
## Build
### Build inside docker
The easiest way to build and test the jf-live-writer is to use the
provided docker container. You need to start building it
from the project **root**:
```bash
docker build -f jf-live-writer/debug.Dockerfile -t jf-live-writer .
@@ -13,9 +126,22 @@ docker build -f jf-live-writer/debug.Dockerfile -t jf-live-writer .
(Running this command from the project root is mandatory as the entire project
folder needs to be part of the build context.)
This will copy your current working directory into the image and build the
jf-live-writer.
## Building
### Build on your machine
In addition to the libraries needed for sf_daq, you need **mpich** installed:
```bash
yum install mpich-devel
ln -v -s /usr/include/mpich-x86_64/* /usr/include/
```
Creating the soft links for the mpich headers in /usr/include is
necessary because of how HDF5 includes them.
#### Building with cmake
In order to build this executable you need to specify the cmake variable:
```
cmake3 -DBUILD_JF_LIVE_WRITER=ON
@@ -23,7 +149,7 @@ cmake3 -DBUILD_JF_LIVE_WRITER=ON
The project will not build if the PHDF5 library is not installed.
Please follow the instructions below on how to do that manually.
#### Install PHDF5
```
wget https://support.hdfgroup.org/ftp/HDF5/releases/hdf5-1.12/hdf5-1.12.0/src/hdf5-1.12.0.tar.gz
tar -xzf hdf5-1.12.0.tar.gz
+15 -15
@@ -1,5 +1,5 @@
#ifndef SFWRITER_HPP
#define SFWRITER_HPP
#ifndef JF_LIVE_WRITER_HPP
#define JF_LIVE_WRITER_HPP
#include <memory>
#include <string>
@@ -32,28 +32,28 @@ class JFH5Writer {
hid_t daq_rec_dataset_id_ = -1;
hid_t is_good_dataset_id_ = -1;
hid_t get_datatype(const int bits_per_pixel);
void open_file(const std::string& output_file, const uint32_t n_images);
static hid_t get_datatype(int bits_per_pixel);
void open_file(const std::string& output_file, uint32_t n_images);
void close_file();
public:
JFH5Writer(const BufferUtils::DetectorConfig config);
explicit JFH5Writer(const BufferUtils::DetectorConfig& config);
~JFH5Writer();
void open_run(const int64_t run_id,
const uint32_t n_images,
const uint32_t image_y_size,
const uint32_t image_x_size,
const uint32_t bits_per_pixel);
void open_run(int64_t run_id,
uint32_t n_images,
uint32_t image_y_size,
uint32_t image_x_size,
uint32_t bits_per_pixel);
void close_run();
void write_data(const int64_t run_id,
const uint32_t index,
void write_data(int64_t run_id,
uint32_t index,
const char* data);
void write_meta(const int64_t run_id,
const uint32_t index,
void write_meta(int64_t run_id,
uint32_t index,
const ImageMetadata& meta);
};
#endif //SFWRITER_HPP
#endif //JF_LIVE_WRITER_HPP
+8 -10
@@ -9,25 +9,23 @@
class WriterStats {
const std::string detector_name_;
const size_t stats_modulo_;
uint32_t image_n_bytes_;
uint32_t image_n_bytes_{};
int image_counter_;
uint64_t total_bytes_;
int image_counter_{};
uint64_t total_bytes_{};
uint32_t total_buffer_write_us_;
uint32_t max_buffer_write_us_;
uint32_t total_buffer_write_us_{};
uint32_t max_buffer_write_us_{};
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
void reset_counters();
void print_stats();
public:
WriterStats(
const std::string &detector_name,
const size_t stats_modulo);
void setup_run(const StoreStream& meta);
explicit WriterStats(std::string detector_name);
void start_run(const StoreStream& meta);
void end_run();
void start_image_write();
void end_image_write();
};
+2 -5
@@ -3,21 +3,18 @@
#include "formats.hpp"
const static uint8_t OP_START = 1;
const static uint8_t OP_END = 2;
#pragma pack(push)
#pragma pack(1)
struct StoreStream {
ImageMetadata image_metadata;
int64_t run_id;
uint32_t i_image;
uint32_t n_images;
uint32_t image_y_size;
uint32_t image_x_size;
uint32_t op_code;
uint32_t bits_per_pixel;
ImageMetadata image_metadata;
};
#pragma pack(pop)
#endif //SF_DAQ_BUFFER_BROKER_FORMAT_HPP
+6 -10
@@ -1,11 +1,7 @@
#include "JFH5Writer.hpp"
#include <sstream>
#include <cstring>
#include <H5version.h>
#include <iostream>
#include "JFH5Writer.hpp"
#include "live_writer_config.hpp"
#include "buffer_config.hpp"
#include "formats.hpp"
@@ -19,7 +15,7 @@ using namespace std;
using namespace buffer_config;
using namespace live_writer_config;
JFH5Writer::JFH5Writer(const BufferUtils::DetectorConfig config):
JFH5Writer::JFH5Writer(const BufferUtils::DetectorConfig& config):
root_folder_(config.buffer_folder),
detector_name_(config.detector_name)
{
@@ -140,14 +136,14 @@ void JFH5Writer::open_file(const string& output_file, const uint32_t n_images)
}
hsize_t image_dataset_dims[] = {n_images, image_y_size_, image_x_size_};
auto image_space_id = H5Screate_simple(3, image_dataset_dims, NULL);
auto image_space_id = H5Screate_simple(3, image_dataset_dims, nullptr);
if (image_space_id < 0) {
throw runtime_error("Cannot create image dataset space.");
}
// TODO: Enable compression.
// bshuf_register_h5filter();
// uint filter_prop[] = {PIXEL_N_BYTES, BSHUF_H5_COMPRESS_LZ4};
// uint filter_prop[] = {0, BSHUF_H5_COMPRESS_LZ4};
// if (H5Pset_filter(dcpl_id, BSHUF_H5FILTER, H5Z_FLAG_MANDATORY,
// 2, filter_prop) < 0) {
// throw runtime_error("Cannot set compression filter on dataset.");
@@ -162,12 +158,12 @@ void JFH5Writer::open_file(const string& output_file, const uint32_t n_images)
// Create metadata datasets.
hsize_t meta_dataset_dims[] = {n_images};
auto meta_space_id = H5Screate_simple(1, meta_dataset_dims, NULL);
auto meta_space_id = H5Screate_simple(1, meta_dataset_dims, nullptr);
if (meta_space_id < 0) {
throw runtime_error("Cannot create meta dataset space.");
}
auto create_meta_dataset = [&](string name, hid_t data_type) {
auto create_meta_dataset = [&](const string& name, hid_t data_type) {
auto dataset_id = H5Dcreate(
data_group_id, name.c_str(), data_type, meta_space_id,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
+15 -16
@@ -1,14 +1,12 @@
#include <iostream>
#include <utility>
#include "WriterStats.hpp"
using namespace std;
using namespace chrono;
WriterStats::WriterStats(
const string& detector_name,
const size_t stats_modulo) :
detector_name_(detector_name),
stats_modulo_(stats_modulo)
WriterStats::WriterStats(string detector_name) :
detector_name_(std::move(detector_name))
{
reset_counters();
}
@@ -26,13 +24,6 @@ void WriterStats::start_image_write()
stats_interval_start_ = steady_clock::now();
}
void WriterStats::setup_run(const StoreStream& meta)
{
image_n_bytes_ = (meta.image_y_size *
meta.image_x_size *
meta.bits_per_pixel) / 8;
}
void WriterStats::end_image_write()
{
image_counter_++;
@@ -43,11 +34,19 @@ void WriterStats::end_image_write()
total_buffer_write_us_ += write_us_duration;
max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration);
}
if (image_counter_ == stats_modulo_) {
print_stats();
reset_counters();
}
void WriterStats::start_run(const StoreStream& meta)
{
image_n_bytes_ = (meta.image_y_size *
meta.image_x_size *
meta.bits_per_pixel) / 8;
}
void WriterStats::end_run()
{
print_stats();
reset_counters();
}
void WriterStats::print_stats()
+17 -18
@@ -1,14 +1,14 @@
#include <iostream>
#include <string>
#include <zmq.h>
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include <mpi.h>
#include "RamBuffer.hpp"
#include "BufferUtils.hpp"
#include "live_writer_config.hpp"
#include "WriterStats.hpp"
#include "broker_format.hpp"
#include <mpi.h>
#include <JFH5Writer.hpp>
#include "JFH5Writer.hpp"
using namespace std;
using namespace buffer_config;
@@ -27,7 +27,7 @@ int main (int argc, char *argv[])
auto const config = BufferUtils::read_json_config(string(argv[1]));
MPI_Init(NULL, NULL);
MPI_Init(nullptr, nullptr);
int n_writers;
MPI_Comm_size(MPI_COMM_WORLD, &n_writers);
@@ -43,27 +43,21 @@ int main (int argc, char *argv[])
RamBuffer ram_buffer(config.detector_name, config.n_modules);
JFH5Writer writer(config);
WriterStats stats(config.detector_name, STATS_MODULO);
WriterStats stats(config.detector_name);
StoreStream meta = {};
while (true) {
zmq_recv(receiver, &meta, sizeof(meta), 0);
if (meta.op_code == OP_START) {
// i_image == 0 -> we have a new run.
if (meta.i_image == 0) {
writer.open_run(meta.run_id,
meta.n_images,
meta.image_y_size,
meta.image_x_size,
meta.bits_per_pixel);
stats.setup_run(meta);
continue;
}
if (meta.op_code == OP_END) {
writer.close_run();
continue;
stats.start_run(meta);
}
// Fair distribution of images among writers.
@@ -79,7 +73,12 @@ int main (int argc, char *argv[])
if (i_writer == 0) {
writer.write_meta(meta.run_id, meta.i_image, meta.image_metadata);
}
}
MPI_Finalize();
// i_image + 1 == meta.n_images -> we received the last image.
if (meta.i_image+1 == meta.n_images) {
writer.close_run();
stats.end_run();
}
}
}