mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-06-06 11:58:41 +02:00
Merge branch 'eiger' of https://github.com/paulscherrerinstitute/sf_daq_buffer into eiger
This commit is contained in:
+1
-1
@@ -38,5 +38,5 @@ add_subdirectory("std-udp-sync")
|
||||
add_subdirectory("jf-assembler")
|
||||
#add_subdirectory("sf-stream")
|
||||
#add_subdirectory("sf-writer")
|
||||
add_subdirectory("jf-live-writer")
|
||||
add_subdirectory("std-det-writer")
|
||||
|
||||
|
||||
@@ -1,38 +0,0 @@
|
||||
find_package(MPI REQUIRED)
|
||||
# Because of openmpi.
|
||||
add_definitions(-DOMPI_SKIP_MPICXX)
|
||||
|
||||
file(GLOB SOURCES
|
||||
src/*.cpp)
|
||||
|
||||
add_library(jf-live-writer-lib STATIC ${SOURCES})
|
||||
target_include_directories(jf-live-writer-lib
|
||||
PUBLIC include/
|
||||
SYSTEM ${MPI_INCLUDE_PATH})
|
||||
|
||||
target_link_libraries(jf-live-writer-lib
|
||||
external
|
||||
core-buffer-lib
|
||||
${MPI_LIBRARIES}
|
||||
)
|
||||
|
||||
add_executable(jf-live-writer src/main.cpp)
|
||||
|
||||
|
||||
if (USE_EIGER)
|
||||
set (LIB_NAME_UDP_RECV "eiger_live_writer")
|
||||
else()
|
||||
set (LIB_NAME_UDP_RECV "jf_live_writer")
|
||||
endif()
|
||||
|
||||
set_target_properties(jf-live-writer PROPERTIES OUTPUT_NAME ${LIB_NAME_UDP_RECV})
|
||||
|
||||
target_link_libraries(jf-live-writer
|
||||
jf-live-writer-lib
|
||||
zmq
|
||||
hdf5
|
||||
rt
|
||||
)
|
||||
|
||||
enable_testing()
|
||||
add_subdirectory(test/)
|
||||
@@ -0,0 +1,31 @@
|
||||
find_package(MPI REQUIRED)
|
||||
# Because of openmpi.
|
||||
add_definitions(-DOMPI_SKIP_MPICXX)
|
||||
|
||||
file(GLOB SOURCES
|
||||
src/*.cpp)
|
||||
|
||||
add_library(std-det-writer-lib STATIC ${SOURCES})
|
||||
target_include_directories(std-det-writer-lib
|
||||
PUBLIC include/
|
||||
SYSTEM ${MPI_INCLUDE_PATH})
|
||||
|
||||
target_link_libraries(std-det-writer-lib
|
||||
external
|
||||
core-buffer-lib
|
||||
${MPI_LIBRARIES}
|
||||
)
|
||||
|
||||
add_executable(std-det-writer src/main.cpp)
|
||||
|
||||
set_target_properties(std-det-writer PROPERTIES OUTPUT_NAME std_det_writer)
|
||||
|
||||
target_link_libraries(std-det-writer
|
||||
std-det-writer-lib
|
||||
zmq
|
||||
hdf5
|
||||
rt
|
||||
)
|
||||
|
||||
enable_testing()
|
||||
add_subdirectory(test/)
|
||||
@@ -1,4 +1,4 @@
|
||||
# jf-live-writer
|
||||
# std-det-writer
|
||||
|
||||
This component is a PHDF5 based MPI writer for high performance detectors
|
||||
that need more than 2GB/s of write speed. It parallelizes the HDF5 writing to
|
||||
@@ -4,7 +4,7 @@ COPY . /sf_daq_buffer/
|
||||
|
||||
RUN mkdir /sf_daq_buffer/build && \
|
||||
cd /sf_daq_buffer/build && \
|
||||
cmake3 -DBUILD_JF_LIVE_WRITER=ON .. && \
|
||||
make jf-live-writer
|
||||
cmake3 .. && \
|
||||
make std-det-writer
|
||||
|
||||
WORKDIR /sf_daq_buffer/build
|
||||
@@ -12,8 +12,8 @@ extern "C" {
|
||||
|
||||
class JFH5Writer {
|
||||
|
||||
const std::string root_folder_;
|
||||
const std::string detector_name_;
|
||||
const std::string root_folder_;
|
||||
|
||||
static const int64_t NO_RUN_ID = -1;
|
||||
|
||||
@@ -26,18 +26,17 @@ class JFH5Writer {
|
||||
|
||||
// Open file specific variables.
|
||||
hid_t file_id_ = -1;
|
||||
hid_t image_dataset_id_ = -1;
|
||||
hid_t pulse_dataset_id_= -1;
|
||||
hid_t frame_dataset_id_ = -1;
|
||||
hid_t daq_rec_dataset_id_ = -1;
|
||||
hid_t is_good_dataset_id_ = -1;
|
||||
hid_t image_data_dataset_ = -1;
|
||||
hid_t image_id_dataset_ = -1;
|
||||
hid_t status_dataset_ = -1;
|
||||
|
||||
static hid_t get_datatype(int bits_per_pixel);
|
||||
void open_file(const std::string& output_file, uint32_t n_images);
|
||||
void close_file();
|
||||
|
||||
public:
|
||||
explicit JFH5Writer(const BufferUtils::DetectorConfig& config);
|
||||
explicit JFH5Writer(
|
||||
const std::string detector_name, const std::string root_folder);
|
||||
~JFH5Writer();
|
||||
|
||||
void open_run(int64_t run_id,
|
||||
@@ -0,0 +1,33 @@
|
||||
#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
|
||||
#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
|
||||
|
||||
|
||||
#include <rapidjson/istreamwrapper.h>
|
||||
#include <rapidjson/document.h>
|
||||
#include <rapidjson/stringbuffer.h>
|
||||
#include <string>
|
||||
#include <fstream>
|
||||
|
||||
struct UdpRecvConfig {
|
||||
static UdpRecvConfig from_json_file(const std::string& filename) {
|
||||
std::ifstream ifs(filename);
|
||||
rapidjson::IStreamWrapper isw(ifs);
|
||||
rapidjson::Document config_parameters;
|
||||
config_parameters.ParseStream(isw);
|
||||
|
||||
return {
|
||||
config_parameters["detector_name"].GetString(),
|
||||
config_parameters["detector_type"].GetString(),
|
||||
config_parameters["n_modules"].GetInt(),
|
||||
config_parameters["start_udp_port"].GetInt(),
|
||||
};
|
||||
}
|
||||
|
||||
const std::string detector_name;
|
||||
const std::string detector_type;
|
||||
const int n_modules;
|
||||
const int start_udp_port;
|
||||
};
|
||||
|
||||
|
||||
#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
|
||||
@@ -15,9 +15,10 @@ using namespace std;
|
||||
using namespace buffer_config;
|
||||
using namespace live_writer_config;
|
||||
|
||||
JFH5Writer::JFH5Writer(const BufferUtils::DetectorConfig& config):
|
||||
root_folder_(config.buffer_folder),
|
||||
detector_name_(config.detector_name)
|
||||
JFH5Writer::JFH5Writer(
|
||||
const std::string detector_name, const std::string root_folder):
|
||||
detector_name_(detector_name),
|
||||
root_folder_(root_folder)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -149,10 +150,10 @@ void JFH5Writer::open_file(const string& output_file, const uint32_t n_images)
|
||||
// throw runtime_error("Cannot set compression filter on dataset.");
|
||||
// }
|
||||
|
||||
image_dataset_id_ = H5Dcreate(
|
||||
image_data_dataset_ = H5Dcreate(
|
||||
data_group_id, "data", get_datatype(bits_per_pixel_),
|
||||
image_space_id, H5P_DEFAULT, dcpl_id, H5P_DEFAULT);
|
||||
if (image_dataset_id_ < 0) {
|
||||
if (image_data_dataset_ < 0) {
|
||||
throw runtime_error("Cannot create image dataset.");
|
||||
}
|
||||
|
||||
@@ -174,10 +175,8 @@ void JFH5Writer::open_file(const string& output_file, const uint32_t n_images)
|
||||
return dataset_id;
|
||||
};
|
||||
|
||||
pulse_dataset_id_ = create_meta_dataset("pulse_id", H5T_NATIVE_UINT64);
|
||||
frame_dataset_id_ = create_meta_dataset("frame_index", H5T_NATIVE_UINT64);
|
||||
daq_rec_dataset_id_ = create_meta_dataset("daq_rec", H5T_NATIVE_UINT32);
|
||||
is_good_dataset_id_ = create_meta_dataset("is_good_frame", H5T_NATIVE_UINT8);
|
||||
image_id_dataset_ = create_meta_dataset("image_id", H5T_NATIVE_UINT64);
|
||||
status_dataset_ = create_meta_dataset("status", H5T_NATIVE_UINT64);
|
||||
|
||||
H5Sclose(meta_space_id);
|
||||
H5Sclose(image_space_id);
|
||||
@@ -191,20 +190,11 @@ void JFH5Writer::close_file()
|
||||
return;
|
||||
}
|
||||
|
||||
H5Dclose(image_dataset_id_);
|
||||
image_dataset_id_ = -1;
|
||||
H5Dclose(image_id_dataset_);
|
||||
image_id_dataset_ = -1;
|
||||
|
||||
H5Dclose(pulse_dataset_id_);
|
||||
pulse_dataset_id_ = -1;
|
||||
|
||||
H5Dclose(frame_dataset_id_);
|
||||
frame_dataset_id_ = -1;
|
||||
|
||||
H5Dclose(daq_rec_dataset_id_);
|
||||
daq_rec_dataset_id_ = -1;
|
||||
|
||||
H5Dclose(is_good_dataset_id_);
|
||||
is_good_dataset_id_ = -1;
|
||||
H5Dclose(status_dataset_);
|
||||
status_dataset_ = -1;
|
||||
|
||||
H5Fclose(file_id_);
|
||||
file_id_ = -1;
|
||||
@@ -223,7 +213,7 @@ void JFH5Writer::write_data(
|
||||
throw runtime_error("Cannot create image ram dataspace.");
|
||||
}
|
||||
|
||||
auto file_ds = H5Dget_space(image_dataset_id_);
|
||||
auto file_ds = H5Dget_space(image_data_dataset_);
|
||||
if (file_ds < 0) {
|
||||
throw runtime_error("Cannot get image dataset file dataspace.");
|
||||
}
|
||||
@@ -237,8 +227,8 @@ void JFH5Writer::write_data(
|
||||
throw runtime_error("Cannot select image dataset file hyperslab.");
|
||||
}
|
||||
|
||||
if (H5Dwrite(image_dataset_id_, get_datatype(bits_per_pixel_),
|
||||
ram_ds, file_ds, H5P_DEFAULT, data) < 0) {
|
||||
if (H5Dwrite(image_data_dataset_, get_datatype(bits_per_pixel_),
|
||||
ram_ds, file_ds, H5P_DEFAULT, data) < 0) {
|
||||
throw runtime_error("Cannot write data to image dataset.");
|
||||
}
|
||||
|
||||
@@ -259,7 +249,7 @@ void JFH5Writer::write_meta(
|
||||
throw runtime_error("Cannot create metadata ram dataspace.");
|
||||
}
|
||||
|
||||
auto file_ds = H5Dget_space(pulse_dataset_id_);
|
||||
auto file_ds = H5Dget_space(image_id_dataset_);
|
||||
if (file_ds < 0) {
|
||||
throw runtime_error("Cannot get metadata dataset file dataspace.");
|
||||
}
|
||||
@@ -273,23 +263,13 @@ void JFH5Writer::write_meta(
|
||||
throw runtime_error("Cannot select metadata dataset file hyperslab.");
|
||||
}
|
||||
|
||||
if (H5Dwrite(pulse_dataset_id_, H5T_NATIVE_UINT64,
|
||||
ram_ds, file_ds, H5P_DEFAULT, &(meta.pulse_id)) < 0) {
|
||||
if (H5Dwrite(image_id_dataset_, H5T_NATIVE_UINT64,
|
||||
ram_ds, file_ds, H5P_DEFAULT, &(meta.id)) < 0) {
|
||||
throw runtime_error("Cannot write data to pulse_id dataset.");
|
||||
}
|
||||
|
||||
if (H5Dwrite(frame_dataset_id_, H5T_NATIVE_UINT64,
|
||||
ram_ds, file_ds, H5P_DEFAULT, &(meta.frame_index)) < 0) {
|
||||
throw runtime_error("Cannot write data to frame_index dataset.");
|
||||
}
|
||||
|
||||
if (H5Dwrite(daq_rec_dataset_id_, H5T_NATIVE_UINT32,
|
||||
ram_ds, file_ds, H5P_DEFAULT, &(meta.daq_rec)) < 0) {
|
||||
throw runtime_error("Cannot write data to daq_rec dataset.");
|
||||
}
|
||||
|
||||
if (H5Dwrite(is_good_dataset_id_, H5T_NATIVE_UINT32,
|
||||
ram_ds, file_ds, H5P_DEFAULT, &(meta.is_good_image)) < 0) {
|
||||
if (H5Dwrite(status_dataset_, H5T_NATIVE_UINT64,
|
||||
ram_ds, file_ds, H5P_DEFAULT, &(meta.status)) < 0) {
|
||||
throw runtime_error("Cannot write data to is_good_image dataset.");
|
||||
}
|
||||
|
||||
@@ -40,7 +40,9 @@ int main (int argc, char *argv[])
|
||||
auto receiver = BufferUtils::connect_socket(
|
||||
ctx, config.detector_name, "writer-agent");
|
||||
|
||||
RamBuffer ram_buffer(config.detector_name, config.n_modules);
|
||||
const size_t IMAGE_N_BYTES = 12;
|
||||
RamBuffer image_buffer(config.detector_name + "_assembler",
|
||||
sizeof(ImageMetadata), IMAGE_N_BYTES, 1, RAM_BUFFER_N_SLOTS);
|
||||
|
||||
JFH5Writer writer(config);
|
||||
WriterStats stats(config.detector_name);
|
||||
@@ -60,9 +62,17 @@ int main (int argc, char *argv[])
|
||||
stats.start_run(meta);
|
||||
}
|
||||
|
||||
// i_image == n_images -> end of run.
|
||||
if (meta.i_image == meta.n_images) {
|
||||
writer.close_run();
|
||||
|
||||
stats.end_run();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Fair distribution of images among writers.
|
||||
if (meta.i_image % n_writers == i_writer) {
|
||||
char* data = ram_buffer.get_slot_data(meta.image_metadata.pulse_id);
|
||||
char* data = ram_buffer.get_slot_data(meta.image_metadata.id);
|
||||
|
||||
stats.start_image_write();
|
||||
writer.write_data(meta.run_id, meta.i_image, data);
|
||||
@@ -74,11 +84,5 @@ int main (int argc, char *argv[])
|
||||
writer.write_meta(meta.run_id, meta.i_image, meta.image_metadata);
|
||||
}
|
||||
|
||||
// i_image + 1 == meta.n_images -> we received the last image.
|
||||
if (meta.i_image+1 == meta.n_images) {
|
||||
writer.close_run();
|
||||
|
||||
stats.end_run();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user