This commit is contained in:
lhdamiani
2021-07-07 14:04:39 +02:00
16 changed files with 114 additions and 112 deletions
+6 -1
View File
@@ -1,4 +1,9 @@
cmake-build-*/
docker/
docs/
scripts/
scripts/
debug.Dockerfile
# Include files:
!docker/example_detector.json
+1 -1
View File
@@ -36,7 +36,7 @@ namespace buffer_config {
// HWM for live stream from buffer.
const int BUFFER_ZMQ_RCVHWM = 100;
// IPC address of the live stream.
const std::string BUFFER_LIVE_IPC_URL = "ipc:///tmp/sf-live-";
const std::string IPC_URL_BASE = "ipc:///tmp/std-daq-";
// Number of image slots in ram buffer - 10 seconds should be enough
const int RAM_BUFFER_N_SLOTS = 100 * 10;
}
+5 -4
View File
@@ -26,13 +26,14 @@ struct ModuleFrame {
#pragma pack(push)
#pragma pack(1)
struct ImageMetadata {
uint64_t version;
uint64_t id;
uint64_t height;
uint64_t width;
uint64_t dtype;
uint64_t encoding;
uint64_t source_id;
uint64_t status;
uint16_t dtype;
uint16_t encoding;
uint16_t source_id;
uint16_t status;
uint64_t user_1;
uint64_t user_2;
};
+2 -2
View File
@@ -94,7 +94,7 @@ void BufferUtils::create_destination_folder(const string& output_file)
void* BufferUtils::connect_socket(
void* ctx, const string& detector_name, const string& stream_name)
{
string ipc_address = buffer_config::BUFFER_LIVE_IPC_URL +
string ipc_address = buffer_config::IPC_URL_BASE +
detector_name + "-" +
stream_name;
@@ -127,7 +127,7 @@ void* BufferUtils::connect_socket(
void* BufferUtils::bind_socket(
void* ctx, const string& detector_name, const string& stream_name)
{
string ipc_address = BUFFER_LIVE_IPC_URL +
string ipc_address = IPC_URL_BASE +
detector_name + "-" +
stream_name;
@@ -4,7 +4,10 @@ COPY . /sf_daq_buffer/
RUN mkdir /sf_daq_buffer/build && \
cd /sf_daq_buffer/build && \
cmake3 .. && \
make std-det-writer
# Build the project
cmake3 .. -DUSE_EIGER=1&& \
make && \
# Deploy the test config.
cp /sf_daq_buffer/docker/example_detector.json .
WORKDIR /sf_daq_buffer/build
+7
View File
@@ -0,0 +1,7 @@
{
"detector_name": "cSAXS.EG01V01",
"detector_type": "eiger",
"n_modules": 4,
"image_n_pixels": 123456,
"start_udp_port": 50000
}
@@ -8,8 +8,8 @@
#include <string>
#include <fstream>
struct UdpRecvConfig {
static UdpRecvConfig from_json_file(const std::string& filename) {
struct DetWriterConfig {
static DetWriterConfig from_json_file(const std::string& filename) {
std::ifstream ifs(filename);
rapidjson::IStreamWrapper isw(ifs);
rapidjson::Document config_parameters;
@@ -17,16 +17,12 @@ struct UdpRecvConfig {
return {
config_parameters["detector_name"].GetString(),
config_parameters["detector_type"].GetString(),
config_parameters["n_modules"].GetInt(),
config_parameters["start_udp_port"].GetInt(),
config_parameters["image_n_pixels"].GetInt(),
};
}
const std::string detector_name;
const std::string detector_type;
const int n_modules;
const int start_udp_port;
const int image_n_pixels;
};
+12 -13
View File
@@ -13,7 +13,6 @@ extern "C" {
class JFH5Writer {
const std::string detector_name_;
const std::string root_folder_;
static const int64_t NO_RUN_ID = -1;
@@ -35,24 +34,24 @@ class JFH5Writer {
void close_file();
public:
explicit JFH5Writer(
const std::string detector_name, const std::string root_folder);
JFH5Writer(std::string detector_name);
~JFH5Writer();
void open_run(int64_t run_id,
uint32_t n_images,
uint32_t image_y_size,
uint32_t image_x_size,
uint32_t bits_per_pixel);
void open_run(const std::string& output_file,
const int run_id,
const int n_images,
const int image_y_size,
const int image_x_size,
const int bits_per_pixel);
void close_run();
void write_data(int64_t run_id,
uint32_t index,
void write_data(const int64_t run_id,
const uint32_t index,
const char* data);
void write_meta(int64_t run_id,
uint32_t index,
const ImageMetadata& meta);
void write_meta(const int64_t run_id,
const uint32_t index,
const ImageMetadata* meta);
};
#endif //JF_LIVE_WRITER_HPP
+2 -3
View File
@@ -1,7 +1,6 @@
#include <cstddef>
#include <formats.hpp>
#include <chrono>
#include "broker_format.hpp"
#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP
#define SF_DAQ_BUFFER_FRAMESTATS_HPP
@@ -23,8 +22,8 @@ class WriterStats {
void print_stats();
public:
explicit WriterStats(std::string detector_name);
void start_run(const StoreStream& meta);
explicit WriterStats(std::string detector_name,
const uint64_t image_n_bytes);
void end_run();
void start_image_write();
void end_image_write();
-20
View File
@@ -1,20 +0,0 @@
#ifndef SF_DAQ_BUFFER_BROKER_FORMAT_HPP
#define SF_DAQ_BUFFER_BROKER_FORMAT_HPP
#include "formats.hpp"
#pragma pack(push)
#pragma pack(1)
struct StoreStream {
ImageMetadata image_metadata;
int64_t run_id;
uint32_t i_image;
uint32_t n_images;
uint32_t image_y_size;
uint32_t image_x_size;
uint32_t bits_per_pixel;
};
#pragma pack(pop)
#endif //SF_DAQ_BUFFER_BROKER_FORMAT_HPP
+14 -17
View File
@@ -1,5 +1,6 @@
#include <H5version.h>
#include <iostream>
#include <utility>
#include "JFH5Writer.hpp"
#include "live_writer_config.hpp"
@@ -15,10 +16,8 @@ using namespace std;
using namespace buffer_config;
using namespace live_writer_config;
JFH5Writer::JFH5Writer(
const std::string detector_name, const std::string root_folder):
detector_name_(detector_name),
root_folder_(root_folder)
JFH5Writer::JFH5Writer(std::string detector_name):
detector_name_(std::move(detector_name))
{
}
@@ -42,22 +41,20 @@ hid_t JFH5Writer::get_datatype(const int bits_per_pixel)
}
}
void JFH5Writer::open_run(const int64_t run_id,
const uint32_t n_images,
const uint32_t image_y_size,
const uint32_t image_x_size,
const uint32_t bits_per_pixel)
void JFH5Writer::open_run(const string& output_file,
const int run_id,
const int n_images,
const int image_y_size,
const int image_x_size,
const int dtype)
{
close_run();
const string output_folder = root_folder_ + "/" + OUTPUT_FOLDER_SYMLINK;
// TODO: Maybe add leading zeros to filename?
const string output_file = output_folder + to_string(run_id) + ".h5";
current_run_id_ = run_id;
image_y_size_ = image_y_size;
image_x_size_ = image_x_size;
bits_per_pixel_ = bits_per_pixel;
// The last digit in the enum value represents the number of bytes/pixel.
bits_per_pixel_ = (dtype % 10) * 8;
image_n_bytes_ = (image_y_size_ * image_x_size_ * bits_per_pixel_) / 8;
#ifdef DEBUG_OUTPUT
@@ -237,7 +234,7 @@ void JFH5Writer::write_data(
}
void JFH5Writer::write_meta(
const int64_t run_id, const uint32_t index, const ImageMetadata& meta)
const int64_t run_id, const uint32_t index, const ImageMetadata* meta)
{
if (run_id != current_run_id_) {
throw runtime_error("Invalid run_id.");
@@ -264,12 +261,12 @@ void JFH5Writer::write_meta(
}
if (H5Dwrite(image_id_dataset_, H5T_NATIVE_UINT64,
ram_ds, file_ds, H5P_DEFAULT, &(meta.id)) < 0) {
ram_ds, file_ds, H5P_DEFAULT, &(meta->id)) < 0) {
throw runtime_error("Cannot write data to pulse_id dataset.");
}
if (H5Dwrite(status_dataset_, H5T_NATIVE_UINT64,
ram_ds, file_ds, H5P_DEFAULT, &(meta.status)) < 0) {
ram_ds, file_ds, H5P_DEFAULT, &(meta->status)) < 0) {
throw runtime_error("Cannot write data to is_good_image dataset.");
}
+3 -9
View File
@@ -5,8 +5,9 @@
using namespace std;
using namespace chrono;
WriterStats::WriterStats(string detector_name) :
detector_name_(std::move(detector_name))
WriterStats::WriterStats(string detector_name, size_t image_n_bytes) :
detector_name_(std::move(detector_name)),
image_n_bytes_(image_n_bytes)
{
reset_counters();
}
@@ -36,13 +37,6 @@ void WriterStats::end_image_write()
max_buffer_write_us_ = max(max_buffer_write_us_, write_us_duration);
}
void WriterStats::start_run(const StoreStream& meta)
{
image_n_bytes_ = (meta.image_y_size *
meta.image_x_size *
meta.bits_per_pixel) / 8;
}
void WriterStats::end_run()
{
print_stats();
+45 -24
View File
@@ -7,8 +7,10 @@
#include "BufferUtils.hpp"
#include "live_writer_config.hpp"
#include "WriterStats.hpp"
#include "broker_format.hpp"
#include "JFH5Writer.hpp"
#include "DetWriterConfig.hpp"
#include "rapidjson/document.h"
using namespace std;
using namespace buffer_config;
@@ -16,16 +18,19 @@ using namespace live_writer_config;
int main (int argc, char *argv[])
{
if (argc != 2) {
if (argc != 3) {
cout << endl;
cout << "Usage: jf_live_writer [detector_json_filename]" << endl;
cout << "Usage: std-det-writer [detector_json_filename]"
" [bit_depth]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << endl;
exit(-1);
}
auto const config = BufferUtils::read_json_config(string(argv[1]));
auto const config = DetWriterConfig::from_json_file(string(argv[1]));
const int bit_depth = atoi(argv[2]);
MPI_Init(nullptr, nullptr);
@@ -38,50 +43,66 @@ int main (int argc, char *argv[])
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS);
auto receiver = BufferUtils::connect_socket(
ctx, config.detector_name, "writer-agent");
ctx, config.detector_name, "writer_agent");
const size_t IMAGE_N_BYTES = 12;
const size_t IMAGE_N_BYTES = config.image_n_pixels * bit_depth / 8;
RamBuffer image_buffer(config.detector_name + "_assembler",
sizeof(ImageMetadata), IMAGE_N_BYTES, 1, RAM_BUFFER_N_SLOTS);
JFH5Writer writer(config);
WriterStats stats(config.detector_name);
JFH5Writer writer(config.detector_name);
WriterStats stats(config.detector_name, IMAGE_N_BYTES);
StoreStream meta = {};
char recv_buffer[8192];
while (true) {
zmq_recv(receiver, &meta, sizeof(meta), 0);
zmq_recv(receiver, &recv_buffer, sizeof(recv_buffer), 0);
// i_image == 0 -> we have a new run.
if (meta.i_image == 0) {
writer.open_run(meta.run_id,
meta.n_images,
meta.image_y_size,
meta.image_x_size,
meta.bits_per_pixel);
stats.start_run(meta);
rapidjson::Document document;
if (document.Parse(recv_buffer).HasParseError()) {
continue;
}
const string output_file = document["output_file"].GetString();
const uint64_t image_id = document["image_id"].GetUint64();
const int run_id = document["run_id"].GetInt();
const int i_image = document["i_image"].GetInt();
const int n_images = document["n_images"].GetInt();
// i_image == n_images -> end of run.
if (meta.i_image == meta.n_images) {
if (i_image == n_images) {
writer.close_run();
stats.end_run();
continue;
}
// i_image == 0 -> we have a new run.
if (i_image == 0) {
auto image_meta = (ImageMetadata*)
image_buffer.get_slot_meta(image_id);
writer.open_run(output_file,
run_id,
n_images,
image_meta->height,
image_meta->width,
image_meta->dtype);
}
// Fair distribution of images among writers.
if (meta.i_image % n_writers == i_writer) {
char* data = ram_buffer.get_slot_data(meta.image_metadata.id);
if (i_image % n_writers == i_writer) {
char* data = image_buffer.get_slot_data(image_id);
stats.start_image_write();
writer.write_data(meta.run_id, meta.i_image, data);
writer.write_data(run_id, i_image, data);
stats.end_image_write();
}
// Only the first instance writes metadata.
if (i_writer == 0) {
writer.write_meta(meta.run_id, meta.i_image, meta.image_metadata);
auto image_meta = (ImageMetadata*)
image_buffer.get_slot_meta(image_id);
writer.write_meta(run_id, i_image, image_meta);
}
}
+3 -3
View File
@@ -1,7 +1,7 @@
add_executable(jf-live-writer-tests main.cpp)
add_executable(std-det-writer-tests main.cpp)
target_link_libraries(jf-live-writer-tests
jf-live-writer-lib
target_link_libraries(std-det-writer-tests
std-det-writer-lib
zmq
rt
gtest
@@ -19,14 +19,12 @@ struct StreamSendConfig {
config_parameters["detector_name"].GetString(),
config_parameters["n_modules"].GetInt(),
config_parameters["image_n_pixels"].GetInt(),
config_parameters["stream_address"].GetString()
};
}
const std::string detector_name;
const int n_modules;
const int image_n_pixels;
const std::string stream_address;
};
+5 -3
View File
@@ -13,12 +13,13 @@ using namespace buffer_config;
int main (int argc, char *argv[])
{
if (argc != 3) {
if (argc != 4) {
cout << endl;
cout << "Usage: std_stream_send [detector_json_filename]"
" [bit_depth]" << endl;
" [bit_depth] [stream_address]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << "\tstream_address: address to bind the output stream." << endl;
cout << endl;
exit(-1);
@@ -26,6 +27,7 @@ int main (int argc, char *argv[])
const auto config = StreamSendConfig::from_json_file(string(argv[1]));
const int bit_depth = atoi(argv[2]);
const string stream_address = string(argv[3]);
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
@@ -43,7 +45,7 @@ int main (int argc, char *argv[])
throw runtime_error(zmq_strerror(errno));
}
if (zmq_bind(sender, config.stream_address.c_str()) != 0) {
if (zmq_bind(sender, stream_address.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}