mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-08 09:42:05 +02:00
wip std det writer for a gigafrost stream
This commit is contained in:
@@ -56,6 +56,11 @@ namespace BufferUtils
|
||||
const std::string& detector_name,
|
||||
const std::string& stream_name);
|
||||
|
||||
void* connect_socket_gf(
|
||||
void* ctx,
|
||||
const std::string& detector_name,
|
||||
const std::string& stream_name);
|
||||
|
||||
DetectorConfig read_json_config(const std::string& filename);
|
||||
}
|
||||
|
||||
|
||||
@@ -124,6 +124,41 @@ void* BufferUtils::connect_socket(
|
||||
return socket;
|
||||
}
|
||||
|
||||
void* BufferUtils::connect_socket_gf(
|
||||
void* ctx, const string& detector_name, const string& stream_name)
|
||||
{
|
||||
// string ipc_address = buffer_config::IPC_URL_BASE +
|
||||
// detector_name + "-" +
|
||||
// stream_name;
|
||||
|
||||
string ipc_address = stream_name;
|
||||
|
||||
void* socket = zmq_socket(ctx, ZMQ_SUB);
|
||||
if (socket == nullptr) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
int rcvhwm = BUFFER_ZMQ_RCVHWM;
|
||||
if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
int linger = 0;
|
||||
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (zmq_connect(socket, ipc_address.c_str()) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) {
|
||||
throw runtime_error(zmq_strerror(errno));
|
||||
}
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
void* BufferUtils::bind_socket(
|
||||
void* ctx, const string& detector_name, const string& stream_name)
|
||||
{
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"detector_name": "GF1",
|
||||
"detector_type": "gf",
|
||||
"n_modules": 16,
|
||||
"image_height": 1200,
|
||||
"image_width": 2016,
|
||||
"start_udp_port": 50200
|
||||
}
|
||||
@@ -52,6 +52,11 @@ public:
|
||||
void write_meta(const int64_t run_id,
|
||||
const uint32_t index,
|
||||
const ImageMetadata* meta);
|
||||
|
||||
void write_meta_gf(const int64_t run_id,
|
||||
const uint32_t index,
|
||||
const uint16_t id,
|
||||
const uint64_t status);
|
||||
};
|
||||
|
||||
#endif //JF_LIVE_WRITER_HPP
|
||||
|
||||
@@ -236,6 +236,47 @@ void JFH5Writer::write_data(
|
||||
H5Sclose(ram_ds);
|
||||
}
|
||||
|
||||
void JFH5Writer::write_meta_gf(
|
||||
const int64_t run_id, const uint32_t index, const uint16_t id, const uint64_t status)
|
||||
{
|
||||
if (run_id != current_run_id_) {
|
||||
throw runtime_error("Invalid run_id.");
|
||||
}
|
||||
|
||||
const hsize_t ram_dims[3] = {1, 1, 1};
|
||||
auto ram_ds = H5Screate_simple(3, ram_dims, nullptr);
|
||||
if (ram_ds < 0) {
|
||||
throw runtime_error("Cannot create metadata ram dataspace.");
|
||||
}
|
||||
|
||||
auto file_ds = H5Dget_space(image_id_dataset_);
|
||||
if (file_ds < 0) {
|
||||
throw runtime_error("Cannot get metadata dataset file dataspace.");
|
||||
}
|
||||
|
||||
const hsize_t file_ds_start[] = {index, 0, 0};
|
||||
const hsize_t file_ds_stride[] = {1, 1, 1};
|
||||
const hsize_t file_ds_count[] = {1, 1, 1};
|
||||
const hsize_t file_ds_block[] = {1, 1, 1};
|
||||
if (H5Sselect_hyperslab(file_ds, H5S_SELECT_SET,
|
||||
file_ds_start, file_ds_stride, file_ds_count, file_ds_block) < 0) {
|
||||
throw runtime_error("Cannot select metadata dataset file hyperslab.");
|
||||
}
|
||||
|
||||
if (H5Dwrite(image_id_dataset_, H5T_NATIVE_UINT64,
|
||||
ram_ds, file_ds, H5P_DEFAULT, &id) < 0) {
|
||||
throw runtime_error("Cannot write data to pulse_id dataset.");
|
||||
}
|
||||
|
||||
if (H5Dwrite(status_dataset_, H5T_NATIVE_UINT64,
|
||||
ram_ds, file_ds, H5P_DEFAULT, &status) < 0) {
|
||||
throw runtime_error("Cannot write data to is_good_image dataset.");
|
||||
}
|
||||
|
||||
H5Sclose(file_ds);
|
||||
H5Sclose(ram_ds);
|
||||
}
|
||||
|
||||
void JFH5Writer::write_meta(
|
||||
const int64_t run_id, const uint32_t index, const ImageMetadata* meta)
|
||||
{
|
||||
|
||||
+79
-46
@@ -11,6 +11,8 @@
|
||||
#include "DetWriterConfig.hpp"
|
||||
|
||||
#include "rapidjson/document.h"
|
||||
#include "rapidjson/writer.h"
|
||||
|
||||
|
||||
using namespace std;
|
||||
using namespace buffer_config;
|
||||
@@ -42,65 +44,96 @@ int main (int argc, char *argv[])
|
||||
|
||||
auto ctx = zmq_ctx_new();
|
||||
zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS);
|
||||
auto receiver = BufferUtils::connect_socket(
|
||||
ctx, config.detector_name, "writer_agent");
|
||||
auto receiver = BufferUtils::connect_socket_gf(
|
||||
ctx, config.detector_name, "tcp://localhost:9911");
|
||||
|
||||
const size_t IMAGE_N_BYTES = config.image_width * config.image_height * bit_depth / 8;
|
||||
RamBuffer image_buffer(config.detector_name + "_assembler",
|
||||
sizeof(ImageMetadata), IMAGE_N_BYTES, 1, RAM_BUFFER_N_SLOTS);
|
||||
|
||||
|
||||
JFH5Writer writer(config.detector_name);
|
||||
WriterStats stats(config.detector_name, IMAGE_N_BYTES);
|
||||
|
||||
char recv_buffer[8192];
|
||||
char recv_buffer_meta[512];
|
||||
char recv_buffer_data[4838400];
|
||||
bool open_run = false;
|
||||
bool header_in = false;
|
||||
int last_run_id = -1;
|
||||
while (true) {
|
||||
auto nbytes = zmq_recv(receiver, &recv_buffer, sizeof(recv_buffer), 0);
|
||||
rapidjson::Document document;
|
||||
if (document.Parse(recv_buffer, nbytes).HasParseError()) {
|
||||
std::string error_str(recv_buffer, nbytes);
|
||||
throw runtime_error(error_str);
|
||||
}
|
||||
|
||||
const string output_file = document["output_file"].GetString();
|
||||
const uint64_t image_id = document["image_id"].GetUint64();
|
||||
const int run_id = document["run_id"].GetInt();
|
||||
const int i_image = document["i_image"].GetInt();
|
||||
const int n_images = document["n_images"].GetInt();
|
||||
// i_image == n_images -> end of run.
|
||||
if (i_image == n_images) {
|
||||
writer.close_run();
|
||||
stats.end_run();
|
||||
continue;
|
||||
}
|
||||
|
||||
// i_image == 0 -> we have a new run.
|
||||
if (i_image == 0) {
|
||||
auto image_meta = (ImageMetadata*)
|
||||
image_buffer.get_slot_meta(image_id);
|
||||
//meta
|
||||
auto header_nbytes = zmq_recv(receiver, &recv_buffer_meta, sizeof(recv_buffer_meta), 0);
|
||||
if (header_nbytes != -1 && header_in == false){
|
||||
header_in = true;
|
||||
// cout << "Header nbytes " << header_nbytes << endl;
|
||||
rapidjson::Document document;
|
||||
if (document.Parse(recv_buffer_meta, header_nbytes).HasParseError()) {
|
||||
std::string error_str(recv_buffer_meta, header_nbytes);
|
||||
throw runtime_error(error_str);
|
||||
}
|
||||
|
||||
writer.open_run(output_file,
|
||||
run_id,
|
||||
n_images,
|
||||
image_meta->height,
|
||||
image_meta->width,
|
||||
image_meta->dtype);
|
||||
}
|
||||
// #ifdef DEBUG_OUTPUT
|
||||
// // using namespace date;
|
||||
// // cout << " [" << std::chrono::system_clock::now() << "]";
|
||||
// cout << " [std_det_writer::json metadata] :";
|
||||
// rapidjson::StringBuffer strbuf;
|
||||
// strbuf.Clear();
|
||||
// rapidjson::Writer<rapidjson::StringBuffer> writer_json_dsa(strbuf);
|
||||
// document.Accept(writer_json_dsa);
|
||||
// cout << strbuf.GetString() << endl;
|
||||
// cout << endl;
|
||||
// #endif
|
||||
|
||||
const string output_file = document["output_file"].GetString();
|
||||
const int run_id = document["run_id"].GetInt();
|
||||
const int i_image = document["i_image"].GetInt();
|
||||
const int n_images = document["n_images"].GetInt();
|
||||
const int status = document["status"].GetInt();
|
||||
const rapidjson::Value& a = document["shape"];
|
||||
const int width = a[0].GetInt();
|
||||
const int heigth = a[1].GetInt();
|
||||
const int dtype = 2;
|
||||
|
||||
// Fair distribution of images among writers.
|
||||
if (i_image % n_writers == i_writer) {
|
||||
char* data = image_buffer.get_slot_data(image_id);
|
||||
// i_image == n_images -> end of run.
|
||||
if (i_image == n_images && open_run == true) {
|
||||
writer.close_run();
|
||||
cout << "[ Writer" << i_writer << " AFTER CLOSE RUN.... " << endl;
|
||||
stats.end_run();
|
||||
open_run = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
stats.start_image_write();
|
||||
writer.write_data(run_id, i_image, data);
|
||||
stats.end_image_write();
|
||||
}
|
||||
// i_image == 0 -> we have a new run.
|
||||
if (i_image == 0 && open_run == false) {
|
||||
writer.open_run(output_file,
|
||||
run_id,
|
||||
n_images,
|
||||
heigth,
|
||||
width,
|
||||
dtype);
|
||||
|
||||
open_run = true;
|
||||
}
|
||||
// data
|
||||
auto img_nbytes = zmq_recv(receiver, &recv_buffer_data, sizeof(recv_buffer_data), 0);
|
||||
if (img_nbytes != -1 && header_in == true && open_run == true){
|
||||
// Fair distribution of images among writers.
|
||||
if (i_image % n_writers == i_writer) {
|
||||
cout << "[ Writer" << i_writer << ":: Frame" << i_image << "]"<< endl;
|
||||
stats.start_image_write();
|
||||
writer.write_data(run_id, i_image, recv_buffer_data);
|
||||
stats.end_image_write();
|
||||
}
|
||||
header_in = false;
|
||||
}
|
||||
|
||||
// Only the first instance writes metadata.
|
||||
if (i_writer == 0) {
|
||||
auto image_meta = (ImageMetadata*)
|
||||
image_buffer.get_slot_meta(image_id);
|
||||
writer.write_meta(run_id, i_image, image_meta);
|
||||
// Only the first instance writes metadata.
|
||||
if (i_writer == 0 && header_in == true && open_run == true) {
|
||||
cout << "[ Writer" << i_writer << "::META from image" << i_image << "]"<< endl;
|
||||
writer.write_meta_gf(run_id,
|
||||
i_image,
|
||||
(uint16_t)run_id,
|
||||
(uint64_t)status);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user