From f1472a5fe1462636815761f1709f1d82feed8634 Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Fri, 3 May 2024 15:46:21 +0200 Subject: [PATCH] jfjoch_writer updates * Writer can send a PUB message with information on closed file * Writer uses getopt to format input parameters * DiffractionExperiment: Add frame number limit --- common/DiffractionExperiment.cpp | 10 +++- tests/DiffractionExperimentTest.cpp | 9 +++ tests/HDF5WritingTest.cpp | 88 +++++++++++++++++++++++++++++ writer/HDF5DataFile.cpp | 63 +++++++++++++-------- writer/HDF5DataFile.h | 6 +- writer/HDF5Writer.cpp | 60 +++++++++++++++----- writer/HDF5Writer.h | 8 +++ writer/REAMDE.md | 33 ++++++++++- writer/StreamWriter.cpp | 15 ++++- writer/StreamWriter.h | 9 ++- writer/jfjoch_writer.cpp | 77 ++++++++++++++++++++++--- 11 files changed, 321 insertions(+), 57 deletions(-) diff --git a/common/DiffractionExperiment.cpp b/common/DiffractionExperiment.cpp index e8684cf5..4fa7128f 100644 --- a/common/DiffractionExperiment.cpp +++ b/common/DiffractionExperiment.cpp @@ -5,10 +5,10 @@ #include "NetworkAddressConvert.h" #include "JFJochCompressor.h" // For ZSTD_USE_JFJOCH_RLE -#include "GitInfo.h" #include "DiffractionExperiment.h" #include "JFJochException.h" #include "RawToConvertedGeometry.h" +#include "../fpga/include/jfjoch_fpga.h" using namespace std::literals::chrono_literals; @@ -1118,7 +1118,15 @@ int64_t DiffractionExperiment::GetInternalPacketGeneratorImages() const { } DiffractionExperiment &DiffractionExperiment::ImportDatasetSettings(const DatasetSettings &input) { + auto tmp = dataset; dataset = input; + if (GetFrameNum() >= MAX_FRAMES) { + dataset = tmp; + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Frame number (summation * images_per_trigger * ntrigger) cannot exceed " + + std::to_string(MAX_FRAMES)); + } + return *this; } diff --git a/tests/DiffractionExperimentTest.cpp b/tests/DiffractionExperimentTest.cpp index ad823442..14d94cb9 100644 --- a/tests/DiffractionExperimentTest.cpp +++ b/tests/DiffractionExperimentTest.cpp @@ -880,6 +880,15 @@ TEST_CASE("DiffractioExperiment_GetDefaultPlotBinning", "[DiffractionExperiment] CHECK(x.GetDefaultPlotBinning() == 1); } +TEST_CASE("DiffractioExperiment_ImportDataset_TooManyFrames", "[DiffractionExperiment]") { + DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36, true)); + x.ImagesPerTrigger(345).NumTriggers(17); + DatasetSettings dataset; + dataset.ImagesPerTrigger(100000).NumTriggers(100000); + REQUIRE_THROWS(x.ImportDatasetSettings(dataset)); + REQUIRE(x.GetImageNum() == 345 * 17); +} + TEST_CASE("DiffractioExperiment_ExportROIMask", "[DiffractionExperiment]") { DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36, true)); x.Mode(DetectorMode::Conversion); diff --git a/tests/HDF5WritingTest.cpp b/tests/HDF5WritingTest.cpp index 29e80bff..5e442c72 100644 --- a/tests/HDF5WritingTest.cpp +++ b/tests/HDF5WritingTest.cpp @@ -9,6 +9,7 @@ #include "../writer/HDF5NXmx.h" #include "../compression/JFJochCompressor.h" #include "../image_analysis/AzimuthalIntegrationProfile.h" +#include using namespace std::literals::chrono_literals; @@ -199,9 +200,93 @@ TEST_CASE("HDF5Writer", "[HDF5][Full]") { REQUIRE_NOTHROW(file_set.Write(message)); } + + auto v = file_set.Finalize(); + REQUIRE(v.size() == 3); // 3 files + REQUIRE(v[0].filename == "test02_1p10_data_000001.h5"); + REQUIRE(v[0].total_images == 2); + REQUIRE(v[1].filename == "test02_1p10_data_000002.h5"); + REQUIRE(v[1].total_images == 2); + REQUIRE(v[2].filename == "test02_1p10_data_000003.h5"); + REQUIRE(v[2].total_images == 1); + + REQUIRE(!file_set.GetZMQAddr()); } // No leftover HDF5 objects REQUIRE (H5Fget_obj_count(H5F_OBJ_ALL, H5F_OBJ_ALL) == 0); + remove("test02_1p10_data_000001.h5"); + remove("test02_1p10_data_000002.h5"); + remove("test02_1p10_data_000003.h5"); +} + +TEST_CASE("HDF5Writer_Socket", "[HDF5][Full]") { + { + ZMQContext c; + + RegisterHDF5Filter(); + DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36)); + std::vector spots; + + x.FilePrefix("test05").ImagesPerTrigger(5).ImagesPerFile(2).Compression(CompressionAlgorithm::NO_COMPRESSION) + .HeaderAppendix("{\"z\":567}"); + StartMessage start_message; + x.FillMessage(start_message); + + HDF5Writer file_set(start_message); + file_set.SetupSocket(c, "ipc://#1"); + std::vector image(x.GetPixelsNum()); + + ZMQSocket s(c, ZMQSocketType::Sub); + s.Connect("ipc://#1"); + s.SubscribeAll(); + s.ReceiveTimeout(std::chrono::seconds(5)); + + for (int i = 0; i < x.GetImageNum(); i++) { + DataMessage message{}; + message.image.pixel_depth_bytes = 2; + message.image.pixel_is_signed = false; + message.image.algorithm = CompressionAlgorithm::NO_COMPRESSION; + message.image.xpixel = x.GetXPixelsNum(); + message.image.ypixel = x.GetYPixelsNum(); + message.image.data = (uint8_t *) image.data(); + message.image.size = x.GetPixelsNum() * x.GetPixelDepth(); + message.spots = spots; + message.number = i; + + REQUIRE_NOTHROW(file_set.Write(message)); + } + REQUIRE(file_set.Finalize().size() == 3); + + ZMQMessage msg; + nlohmann::json j; + + REQUIRE(s.Receive(msg, true)); + j = nlohmann::json::parse(std::string((char *) msg.data(), msg.size())); + REQUIRE(j["filename"] == "test05_data_000001.h5"); + REQUIRE(j["nimages"] == 2); + REQUIRE(j.contains("user_data")); + REQUIRE(j["user_data"]["z"] == 567); + + REQUIRE(s.Receive(msg, true)); + j = nlohmann::json::parse(std::string((char *) msg.data(), msg.size())); + REQUIRE(j["filename"] == "test05_data_000002.h5"); + REQUIRE(j["nimages"] == 2); + REQUIRE(j.contains("user_data")); + REQUIRE(j["user_data"]["z"] == 567); + + REQUIRE(s.Receive(msg, true)); + j = nlohmann::json::parse(std::string((char *) msg.data(), msg.size())); + REQUIRE(j["filename"] == "test05_data_000003.h5"); + REQUIRE(j["nimages"] == 1); + REQUIRE(j.contains("user_data")); + REQUIRE(j["user_data"]["z"] == 567); + } + // No leftover HDF5 objects + REQUIRE (H5Fget_obj_count(H5F_OBJ_ALL, H5F_OBJ_ALL) == 0); + + remove("test05_data_000001.h5"); + remove("test05_data_000002.h5"); + remove("test05_data_000003.h5"); } TEST_CASE("HDF5Writer_Spots", "[HDF5][Full]") { @@ -239,6 +324,9 @@ TEST_CASE("HDF5Writer_Spots", "[HDF5][Full]") { } // No leftover HDF5 objects REQUIRE (H5Fget_obj_count(H5F_OBJ_ALL, H5F_OBJ_ALL) == 0); + + remove("test02_1p10_spots_data_000001.h5"); + remove("test02_1p10_spots_data_000002.h5"); } TEST_CASE("HDF5Writer_Rad_Int_Profile", "[HDF5][Full]") { diff --git a/writer/HDF5DataFile.cpp b/writer/HDF5DataFile.cpp index ae9bfdc7..f3c90a35 100644 --- a/writer/HDF5DataFile.cpp +++ b/writer/HDF5DataFile.cpp @@ -31,25 +31,43 @@ HDF5DataFile::HDF5DataFile(const std::string &in_filename, plugins.emplace_back(std::make_unique(in_max_spots)); } +std::optional HDF5DataFile::Close() { + if (!data_file) + return {}; + + HDF5Group group_exp(*data_file, "/entry/detector"); + group_exp.NXClass("NXcollection"); + + group_exp.SaveVector("timestamp", timestamp); + group_exp.SaveVector("exptime", exptime); + group_exp.SaveVector("number", number); + + for (auto &p: plugins) + p->WriteFinal(*data_file); + + if (data_set) { + data_set + ->Attr("image_nr_low", (int32_t) (image_low + 1)) + .Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number)); + data_set.reset(); + } + data_file.reset(); + std::rename(tmp_filename.c_str(), filename.c_str()); + + closed = true; + + HDF5DataFileStatistics ret; + ret.max_image_number = max_image_number; + ret.total_images = nimages; + ret.filename = filename; + return ret; +} + HDF5DataFile::~HDF5DataFile() { if (data_file) { - HDF5Group group_exp(*data_file, "/entry/detector"); - group_exp.NXClass("NXcollection"); - - group_exp.SaveVector("timestamp", timestamp); - group_exp.SaveVector("exptime", exptime); - group_exp.SaveVector("number", number); - - for (auto &p: plugins) - p->WriteFinal(*data_file); - if (data_set) { - data_set - ->Attr("image_nr_low", (int32_t) (image_low + 1)) - .Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number)); - data_set.reset(); - } - data_file.reset(); - std::rename(tmp_filename.c_str(), filename.c_str()); + try { + Close(); + } catch (...) {} } } @@ -82,6 +100,9 @@ void HDF5DataFile::CreateFile(const DataMessage& msg) { } void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) { + if (closed) + return; + bool new_file = false; if (!data_file) { @@ -108,14 +129,6 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) { number[image_number] = (msg.original_number) ? msg.original_number.value() : msg.number; } -HDF5DataFileStatistics HDF5DataFile::GetStatistics() const { - HDF5DataFileStatistics ret; - ret.max_image_number = max_image_number; - ret.total_images = nimages; - ret.filename = filename; - return ret; -} - size_t HDF5DataFile::GetNumImages() const { return nimages; } diff --git a/writer/HDF5DataFile.h b/writer/HDF5DataFile.h index 11ca52b9..aecd5ab1 100644 --- a/writer/HDF5DataFile.h +++ b/writer/HDF5DataFile.h @@ -19,7 +19,6 @@ struct HDF5DataFileStatistics { uint64_t total_images; }; - class HDF5DataFile { std::string filename; std::string tmp_filename; @@ -42,14 +41,15 @@ class HDF5DataFile { int32_t image_low; + bool closed = false; + void CreateFile(const DataMessage& msg); public: HDF5DataFile(const std::string& name, const std::vector& rad_int_bin_to_q, int32_t image_low, size_t max_spots = 0); ~HDF5DataFile(); - + std::optional Close(); void Write(const DataMessage& msg, uint64_t image_number); - HDF5DataFileStatistics GetStatistics() const; size_t GetNumImages() const; }; diff --git a/writer/HDF5Writer.cpp b/writer/HDF5Writer.cpp index ef1b678e..aa95dbad 100644 --- a/writer/HDF5Writer.cpp +++ b/writer/HDF5Writer.cpp @@ -2,12 +2,14 @@ #include "HDF5Writer.h" #include "HDF5NXmx.h" +#include HDF5Writer::HDF5Writer(const StartMessage &request) -: images_per_file(request.images_per_file), -file_prefix(request.file_prefix), -max_spot_count(request.max_spot_count), -az_int_bin_to_q(request.az_int_bin_to_q) {} + : images_per_file(request.images_per_file), + file_prefix(request.file_prefix), + max_spot_count(request.max_spot_count), + az_int_bin_to_q(request.az_int_bin_to_q), + user_data(request.user_data) {} void HDF5Writer::Write(const DataMessage& message) { std::lock_guard lock(hdf5_mutex); @@ -35,21 +37,51 @@ void HDF5Writer::Write(const DataMessage& message) { if (message.image.size > 0) files[file_number]->Write(message, image_number); - if (files[file_number]->GetNumImages() == images_per_file) { - stats.emplace_back(files[file_number]->GetStatistics()); - files[file_number].reset(); - } + if (files[file_number]->GetNumImages() == images_per_file) + AddStats(files[file_number]->Close()); } std::vector HDF5Writer::Finalize() { std::lock_guard lock(hdf5_mutex); for (auto &f: files) { - if (f) { - auto tmp = f->GetStatistics(); - if (tmp.total_images > 0) - stats.push_back(tmp); - f.reset(); - } + if (f) + AddStats(f->Close()); } return stats; } + +void HDF5Writer::AddStats(const std::optional& s) { + if (!s) + return; + + stats.push_back(*s); + if (socket) { + nlohmann::json j; + j["filename"] = s->filename; + j["nimages"] = s->total_images; + if (!user_data.empty()) { + nlohmann::json j_userdata; + + // if user_data is valid json, interpret it as such, otherwise embed as string + try { + j_userdata = nlohmann::json::parse(user_data); + } catch (...) { + j_userdata = user_data; + } + j["user_data"] = j_userdata; + } + socket->Send(j.dump()); + } +} + +void HDF5Writer::SetupSocket(ZMQContext &c, const std::string &addr) { + socket = std::make_unique(c, ZMQSocketType::Pub); + socket->Bind(addr); +} + +std::optional HDF5Writer::GetZMQAddr() { + if (socket) { + return socket->GetEndpointName(); + } else + return {}; +} diff --git a/writer/HDF5Writer.h b/writer/HDF5Writer.h index eb95e08a..84e4798f 100644 --- a/writer/HDF5Writer.h +++ b/writer/HDF5Writer.h @@ -7,6 +7,7 @@ #include "HDF5DataFile.h" #include "../frame_serialize/JFJochMessages.h" +#include "../common/ZMQWrappers.h" class HDF5Writer { std::vector > files; @@ -15,10 +16,17 @@ class HDF5Writer { uint64_t max_spot_count; std::vector az_int_bin_to_q; std::vector stats; + std::string user_data; + + std::unique_ptr socket; + + void AddStats(const std::optional& s); public: explicit HDF5Writer(const StartMessage &request); void Write(const DataMessage& msg); std::vector Finalize(); + void SetupSocket(ZMQContext &c, const std::string &addr); + std::optional GetZMQAddr(); }; #endif //JUNGFRAUJOCH_HDF5WRITER_H diff --git a/writer/REAMDE.md b/writer/REAMDE.md index 92612ebf..98d37bd1 100644 --- a/writer/REAMDE.md +++ b/writer/REAMDE.md @@ -1,4 +1,4 @@ -# NXmx compliant writer +# NXmx compliant Jungfraujoch writer ## Acknowledgements * Zdenek Matej (MAX IV) @@ -12,21 +12,48 @@ Writer detects and protects for basic security issues, like `file_prefix` starti ## Usage Writer needs to be started as a background service, with the following command: ``` -jfjoch_writer_service
{
} +jfjoch_writer {options}
+ +Options: +-H | --http_port= HTTP port for statistics +-r | --zmq_repub_port= ZeroMQ port for PUSH socket to republish images +-f | --zmq_file_port= ZeroMQ port for PUB socket to inform about finalized files + ``` for example: ``` -jfjoch_writer_service tcp://dcu-address:5400 5232 tcp://0.0.0.0:3456 +jfjoch_writer -H5234 tcp://dcu-address:5400 ``` + ## HTTP interface Writer has dedicated status interface via HTTP. It allows for two operations: * ***check state of the writer*** to check if the writer is properly synchronized with DCU (e.g., that `file_prefix` agrees with what was set on the DCU) and monitor progress. * ***cancel writing*** this will close all the HDF5 files being written and restart writer - the option should be used only if DCU process was terminated or disconnected, it SHOULD NOT be used as standard cancellation procedure (when DCU received cancel command it should properly finish writing as well) + ## Republish Republish creates a PULL socket on the writer, where all the messages are republished for further use by data analysis pipeline. Republish is non-blocking, so if there is no receiver on other end or the sending queue is full - images won't be republished. In case of START/END messages republishing will attempt sending for 100 ms, but if send times out it won't be retried. Republish address is optional, if omitted this functionality is not enabled. + +## Finalized files information +Creates PUB socket to inform about finalized data files. For each closed file, the socket will send a JSON message, with the following structure: + +``` +{ + "filename": : HDF5 data file name, + "nimages": number of images in the file, + "user_data": or user_data +} +``` +`user_data` is defined as `header_appendix` in the `/start` operation in the `jfjoch_broker`. +If the `header_appendix` is a string with valid JSON meaning, it will be transferred as JSON. + ## NXmx extensions +Jungfraujoch aims to generate files compliant with NXmx format, as well as make them as close as possible to files +written by DECTRIS Filewriter. This ensures the file compatibility of Neggia and Durin XDS plugins, as well as Albula viewer. + +If spot finding is enabled, spots are written in the [CXI format](https://raw.githubusercontent.com/cxidb/CXI/master/cxi_file_format.pdf) and is recoginzed by CrystFEL. + There are custom extension to NXmx format. These will be documented in the future. \ No newline at end of file diff --git a/writer/StreamWriter.cpp b/writer/StreamWriter.cpp index a8e6634d..09fd26d4 100644 --- a/writer/StreamWriter.cpp +++ b/writer/StreamWriter.cpp @@ -6,8 +6,15 @@ #include "HDF5NXmx.h" #include "MakeDirectory.h" -StreamWriter::StreamWriter(ZMQContext &context, Logger &in_logger, const std::string &zmq_addr, const std::string& repub_address) -: image_puller(context, repub_address), logger(in_logger) { +StreamWriter::StreamWriter(ZMQContext &in_context, + Logger &in_logger, + const std::string &zmq_addr, + const std::string &repub_address, + const std::string &in_file_done_address) + : zmq_context(in_context), + image_puller(in_context, repub_address), + logger(in_logger), + file_done_address(in_file_done_address) { image_puller.Connect(zmq_addr); logger.Info("Connected via ZMQ to {}", zmq_addr); } @@ -37,6 +44,10 @@ void StreamWriter::CollectImages(std::vector &v) { CheckPath(image_puller_output.cbor->start_message->file_prefix); MakeDirectory(image_puller_output.cbor->start_message->file_prefix); HDF5Writer writer(*image_puller_output.cbor->start_message); + + if (!file_done_address.empty()) + writer.SetupSocket(zmq_context, file_done_address); + std::unique_ptr master_file; if (!image_puller_output.cbor->start_message->write_master_file || image_puller_output.cbor->start_message->write_master_file.value()) master_file = std::make_unique(*image_puller_output.cbor->start_message); diff --git a/writer/StreamWriter.h b/writer/StreamWriter.h index 9be9f147..02e7085d 100644 --- a/writer/StreamWriter.h +++ b/writer/StreamWriter.h @@ -24,6 +24,9 @@ struct StreamWriterOutput { }; class StreamWriter { + ZMQContext &zmq_context; + std::string file_done_address; + StreamWriterState state = StreamWriterState::Idle; ZMQImagePullerOutput image_puller_output; @@ -39,7 +42,11 @@ class StreamWriter { void CollectImages(std::vector &v); bool WaitForImage(); public: - StreamWriter(ZMQContext& context, Logger &logger, const std::string& zmq_addr, const std::string& repub_address = ""); + StreamWriter(ZMQContext& context, + Logger &logger, + const std::string& zmq_addr, + const std::string& repub_address = "", + const std::string& file_done_address = ""); StreamWriterOutput Run(); void Cancel(); StreamWriterStatistics GetStatistics() const; diff --git a/writer/jfjoch_writer.cpp b/writer/jfjoch_writer.cpp index 6b543199..28655f6e 100644 --- a/writer/jfjoch_writer.cpp +++ b/writer/jfjoch_writer.cpp @@ -4,11 +4,24 @@ #include "../common/Logger.h" #include "JFJochWriterHttp.h" #include "StreamWriter.h" +#include + +static Logger logger("jfjoch_writer"); static Pistache::Http::Endpoint *httpEndpoint; static StreamWriter *writer; volatile static bool quitok = false; +void print_usage() { + logger.Info("Usage ./jfjoch_writer {options}
"); + logger.Info(""); + logger.Info("Available options:"); + logger.Info("-H | --http_port= HTTP port for statistics"); + logger.Info("-r | --zmq_repub_port= ZeroMQ port for PUSH socket to republish images"); + logger.Info("-f | --zmq_file_port= ZeroMQ port for PUB socket to inform about finalized files"); + logger.Info(""); +} + static void sigHandler (int sig){ switch(sig){ case SIGINT: @@ -41,22 +54,70 @@ static void setUpUnixSignals(std::vector quitSignals) { int main(int argc, char **argv) { RegisterHDF5Filter(); - static Logger logger("jfjoch_writer_http"); + int32_t http_port = 5234; + int32_t zmq_repub_port = -1; + int32_t zmq_file_port = -1; - if ((argc != 3) && (argc != 4)) { - logger.Error("Usage ./jfjoch_writer_http {}"); + int c; + static struct option long_options[] = { + {"http_port", required_argument, 0, 'H'}, + {"zmq_repub_port", required_argument, 0, 'r'}, + {"zmq_file_port", required_argument, 0, 'f'}, + {0, 0, 0, 0} + }; + + int option_index = 0; + int opt; + while ((opt = getopt_long(argc, argv, "?hH:r:f:",long_options, &option_index)) != -1 ) { + switch (opt) { + case 'H': + http_port = atoi(optarg); + break; + case 'r': + zmq_repub_port = atoi(optarg); + break; + case 'f': + zmq_file_port = atoi(optarg); + break; + case '?': + case 'h': + print_usage(); + exit(EXIT_SUCCESS); + default: + print_usage(); + exit(EXIT_FAILURE); + } + } + + int first_argc = optind; + + if ((argc - first_argc != 1)) { + print_usage(); exit(EXIT_FAILURE); } - uint16_t http_port = atoi(argv[2]); - std::string repub_address; - if (argc == 4) - repub_address = argv[3]; + if ((http_port <= 0) || (http_port >= UINT16_MAX)) { + logger.Error("Http port must be between 1 - 65534"); + exit(EXIT_FAILURE); + } + logger.Info("HTTP service listening on port {}", http_port); + + std::string repub_zmq_addr, file_done_zmq_addr; + + if ((zmq_file_port < UINT16_MAX) && (zmq_file_port > 0)) { + file_done_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_file_port); + logger.Info("Information on closed files is published via ZeroMQ PUB socket {:s}", file_done_zmq_addr); + } + + if ((zmq_repub_port < UINT16_MAX) && (zmq_repub_port > 0)) { + repub_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_repub_port); + logger.Info("Images are republished via ZeroMQ PUSH socket {:s}", repub_zmq_addr); + } ZMQContext context; Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(http_port)); - writer = new StreamWriter(context, logger, argv[1], repub_address); + writer = new StreamWriter(context, logger, argv[first_argc], repub_zmq_addr, file_done_zmq_addr); httpEndpoint = new Pistache::Http::Endpoint(addr); auto router = std::make_shared();