Merge branch '2405-writer' into 'main'

jfjoch_writer updates

See merge request jungfraujoch/nextgendcu!55
This commit is contained in:
2024-05-03 15:46:21 +02:00
11 changed files with 321 additions and 57 deletions
+9 -1
View File
@@ -5,10 +5,10 @@
#include "NetworkAddressConvert.h"
#include "JFJochCompressor.h" // For ZSTD_USE_JFJOCH_RLE
#include "GitInfo.h"
#include "DiffractionExperiment.h"
#include "JFJochException.h"
#include "RawToConvertedGeometry.h"
#include "../fpga/include/jfjoch_fpga.h"
using namespace std::literals::chrono_literals;
@@ -1118,7 +1118,15 @@ int64_t DiffractionExperiment::GetInternalPacketGeneratorImages() const {
}
// Replaces the current dataset settings with `input`.
//
// The new settings are applied first so that GetFrameNum() is evaluated against them.
// If the resulting frame count reaches MAX_FRAMES, the previous settings are put back
// and a JFJochException (InputParameterInvalid) is thrown, leaving the object unchanged.
//
// Returns *this to allow call chaining.
DiffractionExperiment &DiffractionExperiment::ImportDatasetSettings(const DatasetSettings &input) {
    const auto previous = dataset;
    dataset = input;

    if (GetFrameNum() < MAX_FRAMES)
        return *this;

    // Roll back before reporting the error, so a rejected import has no side effects.
    dataset = previous;
    throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
                          "Frame number (summation * images_per_trigger * ntrigger) cannot exceed "
                          + std::to_string(MAX_FRAMES));
}
+9
View File
@@ -880,6 +880,15 @@ TEST_CASE("DiffractioExperiment_GetDefaultPlotBinning", "[DiffractionExperiment]
CHECK(x.GetDefaultPlotBinning() == 1);
}
// Importing dataset settings that are expected to exceed the frame limit must throw
// and must leave the previously configured settings in place.
TEST_CASE("DiffractioExperiment_ImportDataset_TooManyFrames", "[DiffractionExperiment]") {
    DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36, true));
    x.ImagesPerTrigger(345).NumTriggers(17);

    DatasetSettings oversized;
    oversized.ImagesPerTrigger(100000).NumTriggers(100000);

    REQUIRE_THROWS(x.ImportDatasetSettings(oversized));

    // Settings from before the rejected import are still active.
    REQUIRE(x.GetImageNum() == 345 * 17);
}
TEST_CASE("DiffractioExperiment_ExportROIMask", "[DiffractionExperiment]") {
DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36, true));
x.Mode(DetectorMode::Conversion);
+88
View File
@@ -9,6 +9,7 @@
#include "../writer/HDF5NXmx.h"
#include "../compression/JFJochCompressor.h"
#include "../image_analysis/AzimuthalIntegrationProfile.h"
#include <nlohmann/json.hpp>
using namespace std::literals::chrono_literals;
@@ -199,9 +200,93 @@ TEST_CASE("HDF5Writer", "[HDF5][Full]") {
REQUIRE_NOTHROW(file_set.Write(message));
}
auto v = file_set.Finalize();
REQUIRE(v.size() == 3); // 3 files
REQUIRE(v[0].filename == "test02_1p10_data_000001.h5");
REQUIRE(v[0].total_images == 2);
REQUIRE(v[1].filename == "test02_1p10_data_000002.h5");
REQUIRE(v[1].total_images == 2);
REQUIRE(v[2].filename == "test02_1p10_data_000003.h5");
REQUIRE(v[2].total_images == 1);
REQUIRE(!file_set.GetZMQAddr());
}
// No leftover HDF5 objects
REQUIRE (H5Fget_obj_count(H5F_OBJ_ALL, H5F_OBJ_ALL) == 0);
remove("test02_1p10_data_000001.h5");
remove("test02_1p10_data_000002.h5");
remove("test02_1p10_data_000003.h5");
}
// Writes 5 images with 2 images per file and checks that, for each of the 3 resulting
// files, a JSON notification (filename, nimages, user_data) arrives on a SUB socket
// connected to the writer's socket set up via SetupSocket().
TEST_CASE("HDF5Writer_Socket", "[HDF5][Full]") {
    {
        ZMQContext c;

        RegisterHDF5Filter();
        DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36));
        std::vector<SpotToSave> spots;

        x.FilePrefix("test05").ImagesPerTrigger(5).ImagesPerFile(2).Compression(CompressionAlgorithm::NO_COMPRESSION)
                .HeaderAppendix("{\"z\":567}");
        StartMessage start_message;
        x.FillMessage(start_message);

        HDF5Writer file_set(start_message);
        file_set.SetupSocket(c, "ipc://#1");
        std::vector<uint16_t> image(x.GetPixelsNum());

        // Subscriber must be connected before any file is closed, otherwise notifications are missed.
        ZMQSocket s(c, ZMQSocketType::Sub);
        s.Connect("ipc://#1");
        s.SubscribeAll();
        s.ReceiveTimeout(std::chrono::seconds(5));

        for (int i = 0; i < x.GetImageNum(); i++) {
            DataMessage message{};
            message.image.pixel_depth_bytes = 2;
            message.image.pixel_is_signed = false;
            message.image.algorithm = CompressionAlgorithm::NO_COMPRESSION;
            message.image.xpixel = x.GetXPixelsNum();
            message.image.ypixel = x.GetYPixelsNum();
            message.image.data = (uint8_t *) image.data();
            message.image.size = x.GetPixelsNum() * x.GetPixelDepth();
            message.spots = spots;
            message.number = i;
            REQUIRE_NOTHROW(file_set.Write(message));
        }

        REQUIRE(file_set.Finalize().size() == 3);

        ZMQMessage msg;
        nlohmann::json j;

        // Receives the next notification and verifies its content.
        auto expect_notification = [&](const char *expected_filename, int expected_nimages) {
            REQUIRE(s.Receive(msg, true));
            j = nlohmann::json::parse(std::string((char *) msg.data(), msg.size()));
            REQUIRE(j["filename"] == expected_filename);
            REQUIRE(j["nimages"] == expected_nimages);
            REQUIRE(j.contains("user_data"));
            REQUIRE(j["user_data"]["z"] == 567);
        };

        expect_notification("test05_data_000001.h5", 2);
        expect_notification("test05_data_000002.h5", 2);
        expect_notification("test05_data_000003.h5", 1);
    }
    // No leftover HDF5 objects
    REQUIRE (H5Fget_obj_count(H5F_OBJ_ALL, H5F_OBJ_ALL) == 0);
    remove("test05_data_000001.h5");
    remove("test05_data_000002.h5");
    remove("test05_data_000003.h5");
}
TEST_CASE("HDF5Writer_Spots", "[HDF5][Full]") {
@@ -239,6 +324,9 @@ TEST_CASE("HDF5Writer_Spots", "[HDF5][Full]") {
}
// No leftover HDF5 objects
REQUIRE (H5Fget_obj_count(H5F_OBJ_ALL, H5F_OBJ_ALL) == 0);
remove("test02_1p10_spots_data_000001.h5");
remove("test02_1p10_spots_data_000002.h5");
}
TEST_CASE("HDF5Writer_Rad_Int_Profile", "[HDF5][Full]") {
+38 -25
View File
@@ -31,25 +31,43 @@ HDF5DataFile::HDF5DataFile(const std::string &in_filename,
plugins.emplace_back(std::make_unique<HDF5DataFilePluginMX>(in_max_spots));
}
std::optional<HDF5DataFileStatistics> HDF5DataFile::Close() {
if (!data_file)
return {};
HDF5Group group_exp(*data_file, "/entry/detector");
group_exp.NXClass("NXcollection");
group_exp.SaveVector("timestamp", timestamp);
group_exp.SaveVector("exptime", exptime);
group_exp.SaveVector("number", number);
for (auto &p: plugins)
p->WriteFinal(*data_file);
if (data_set) {
data_set
->Attr("image_nr_low", (int32_t) (image_low + 1))
.Attr("image_nr_high", (int32_t) (image_low + 1 + max_image_number));
data_set.reset();
}
data_file.reset();
std::rename(tmp_filename.c_str(), filename.c_str());
closed = true;
HDF5DataFileStatistics ret;
ret.max_image_number = max_image_number;
ret.total_images = nimages;
ret.filename = filename;
return ret;
}
// Finalizes a still-open file on destruction.
//
// All finalization logic lives in Close(); the destructor only delegates to it so the
// two code paths cannot diverge. Any exception raised while closing is swallowed on
// purpose: a destructor must not let exceptions escape.
HDF5DataFile::~HDF5DataFile() {
    if (data_file) {
        try {
            Close();
        } catch (...) {}
    }
}
@@ -82,6 +100,9 @@ void HDF5DataFile::CreateFile(const DataMessage& msg) {
}
void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
if (closed)
return;
bool new_file = false;
if (!data_file) {
@@ -108,14 +129,6 @@ void HDF5DataFile::Write(const DataMessage &msg, uint64_t image_number) {
number[image_number] = (msg.original_number) ? msg.original_number.value() : msg.number;
}
// Returns a snapshot of this file's statistics: final file name, number of images
// written and the highest image number seen. Does not modify the object.
HDF5DataFileStatistics HDF5DataFile::GetStatistics() const {
    HDF5DataFileStatistics snapshot;
    snapshot.filename = filename;
    snapshot.total_images = nimages;
    snapshot.max_image_number = max_image_number;
    return snapshot;
}
// Returns the current value of the nimages counter of this file.
size_t HDF5DataFile::GetNumImages() const {
return nimages;
}
+3 -3
View File
@@ -19,7 +19,6 @@ struct HDF5DataFileStatistics {
uint64_t total_images;
};
class HDF5DataFile {
std::string filename;
std::string tmp_filename;
@@ -42,14 +41,15 @@ class HDF5DataFile {
int32_t image_low;
bool closed = false;
void CreateFile(const DataMessage& msg);
public:
HDF5DataFile(const std::string& name, const std::vector<float>& rad_int_bin_to_q,
int32_t image_low, size_t max_spots = 0);
~HDF5DataFile();
std::optional<HDF5DataFileStatistics> Close();
void Write(const DataMessage& msg, uint64_t image_number);
HDF5DataFileStatistics GetStatistics() const;
size_t GetNumImages() const;
};
+46 -14
View File
@@ -2,12 +2,14 @@
#include "HDF5Writer.h"
#include "HDF5NXmx.h"
#include <nlohmann/json.hpp>
// Creates a writer configured from the START message: images per file, file prefix,
// maximum spot count, azimuthal-integration bin-to-q mapping and the user data that is
// attached to every "file finalized" notification (see AddStats).
HDF5Writer::HDF5Writer(const StartMessage &request)
        : images_per_file(request.images_per_file),
          file_prefix(request.file_prefix),
          max_spot_count(request.max_spot_count),
          az_int_bin_to_q(request.az_int_bin_to_q),
          user_data(request.user_data) {}
void HDF5Writer::Write(const DataMessage& message) {
std::lock_guard<std::mutex> lock(hdf5_mutex);
@@ -35,21 +37,51 @@ void HDF5Writer::Write(const DataMessage& message) {
if (message.image.size > 0)
files[file_number]->Write(message, image_number);
if (files[file_number]->GetNumImages() == images_per_file) {
stats.emplace_back(files[file_number]->GetStatistics());
files[file_number].reset();
}
if (files[file_number]->GetNumImages() == images_per_file)
AddStats(files[file_number]->Close());
}
// Closes every data file that is still held by the writer and returns the statistics
// of all files closed so far.
//
// Each file is closed through HDF5DataFile::Close() and the result is routed through
// AddStats(), so that files finalized here are recorded (and, if a socket is set up,
// announced) exactly like files closed from Write(). Close() yields an empty optional
// for a file that is not open, which AddStats() ignores.
std::vector<HDF5DataFileStatistics> HDF5Writer::Finalize() {
    std::lock_guard<std::mutex> lock(hdf5_mutex);

    for (auto &f: files) {
        if (f)
            AddStats(f->Close());
    }
    return stats;
}
// Records the statistics of a closed file and, if a socket was set up, sends a JSON
// notification about it.
//
// s: result of HDF5DataFile::Close(); an empty optional is ignored.
//
// The notification contains "filename" and "nimages"; when user_data is non-empty it
// is added as "user_data", as a JSON value if the text parses as JSON and as a plain
// string otherwise.
void HDF5Writer::AddStats(const std::optional<HDF5DataFileStatistics>& s) {
    if (!s)
        return;

    stats.push_back(*s);

    if (!socket)
        return;

    nlohmann::json j;
    j["filename"] = s->filename;
    j["nimages"] = s->total_images;

    if (!user_data.empty()) {
        // Non-throwing parse: a syntax error yields a "discarded" value instead of an
        // exception, so unrelated failures are not silently swallowed by a catch-all.
        nlohmann::json j_userdata = nlohmann::json::parse(user_data, nullptr, false);
        if (j_userdata.is_discarded())
            j_userdata = user_data; // not valid JSON - embed verbatim as a string
        j["user_data"] = j_userdata;
    }
    socket->Send(j.dump());
}
// Creates the PUB socket used by AddStats() to announce finalized files and binds it
// to `addr`.
//
// If binding fails with an exception, the socket is dropped again before the exception
// is re-thrown, so the writer never keeps (and later sends on) an unbound socket.
void HDF5Writer::SetupSocket(ZMQContext &c, const std::string &addr) {
    socket = std::make_unique<ZMQSocket>(c, ZMQSocketType::Pub);
    try {
        socket->Bind(addr);
    } catch (...) {
        socket.reset();
        throw;
    }
}
// Returns the endpoint name of the notification socket, or an empty optional when no
// socket has been set up.
std::optional<std::string> HDF5Writer::GetZMQAddr() {
    if (!socket)
        return {};
    return socket->GetEndpointName();
}
+8
View File
@@ -7,6 +7,7 @@
#include "HDF5DataFile.h"
#include "../frame_serialize/JFJochMessages.h"
#include "../common/ZMQWrappers.h"
class HDF5Writer {
std::vector<std::unique_ptr<HDF5DataFile> > files;
@@ -15,10 +16,17 @@ class HDF5Writer {
uint64_t max_spot_count;
std::vector<float> az_int_bin_to_q;
std::vector<HDF5DataFileStatistics> stats;
std::string user_data;
std::unique_ptr<ZMQSocket> socket;
void AddStats(const std::optional<HDF5DataFileStatistics>& s);
public:
explicit HDF5Writer(const StartMessage &request);
void Write(const DataMessage& msg);
std::vector<HDF5DataFileStatistics> Finalize();
void SetupSocket(ZMQContext &c, const std::string &addr);
std::optional<std::string> GetZMQAddr();
};
#endif //JUNGFRAUJOCH_HDF5WRITER_H
+30 -3
View File
@@ -1,4 +1,4 @@
# NXmx compliant writer
# NXmx compliant Jungfraujoch writer
## Acknowledgements
* Zdenek Matej (MAX IV)
@@ -12,21 +12,48 @@ Writer detects and protects for basic security issues, like `file_prefix` starti
## Usage
Writer needs to be started as a background service, with the following command:
```
jfjoch_writer_service <address to connect via ZeroMQ to DCU> <port for HTTP> {<address to bind via ZeroMQ to republish>}
jfjoch_writer {options} <address to connect via ZeroMQ to DCU>
Options:
-H<int> | --http_port=<int> HTTP port for statistics
-r<int> | --zmq_repub_port=<int> ZeroMQ port for PUSH socket to republish images
-f<int> | --zmq_file_port=<int> ZeroMQ port for PUB socket to inform about finalized files
```
for example:
```
jfjoch_writer_service tcp://dcu-address:5400 5232 tcp://0.0.0.0:3456
jfjoch_writer -H5234 tcp://dcu-address:5400
```
## HTTP interface
Writer has dedicated status interface via HTTP. It allows for two operations:
* ***check state of the writer*** to check if the writer is properly synchronized with DCU (e.g., that `file_prefix` agrees with what was set on the DCU) and monitor progress.
* ***cancel writing*** this will close all the HDF5 files being written and restart writer - the option should be used only if DCU process was terminated or disconnected, it SHOULD NOT be used as standard cancellation procedure (when DCU received cancel command it should properly finish writing as well)
## Republish
Republish creates a PUSH socket on the writer, where all the messages are republished for further use by a data analysis pipeline.
Republish is non-blocking, so if there is no receiver on other end or the sending queue is full - images won't be republished.
In case of START/END messages republishing will attempt sending for 100 ms, but if send times out it won't be retried.
Republish address is optional, if omitted this functionality is not enabled.
## Finalized files information
Creates PUB socket to inform about finalized data files. For each closed file, the socket will send a JSON message, with the following structure:
```
{
"filename": <string>: HDF5 data file name,
"nimages": <int> number of images in the file,
"user_data": <string> or <json> user_data
}
```
`user_data` is defined as `header_appendix` in the `/start` operation in the `jfjoch_broker`.
If `header_appendix` is a string containing valid JSON, it is embedded in the message as a JSON value; otherwise it is embedded as a plain string.
## NXmx extensions
Jungfraujoch aims to generate files compliant with NXmx format, as well as make them as close as possible to files
written by DECTRIS Filewriter. This ensures the file compatibility of Neggia and Durin XDS plugins, as well as Albula viewer.
If spot finding is enabled, spots are written in the [CXI format](https://raw.githubusercontent.com/cxidb/CXI/master/cxi_file_format.pdf) and are recognized by CrystFEL.
There are custom extensions to the NXmx format. These will be documented in the future.
+13 -2
View File
@@ -6,8 +6,15 @@
#include "HDF5NXmx.h"
#include "MakeDirectory.h"
// Creates a stream writer and connects its image puller to the data source.
//
// in_context:            ZeroMQ context; a reference is kept for sockets created later.
// in_logger:             logger used for status messages.
// zmq_addr:              address of the ZeroMQ data source to connect to.
// repub_address:         address passed to the image puller for republishing.
// in_file_done_address:  address for the "file finalized" notification socket; when
//                        empty, no such socket is set up for the HDF5 writer.
//
// The initializer list names zmq_context and file_done_address first, matching their
// position at the top of the class declaration.
// NOTE(review): relative declaration order of image_puller and logger is not visible
// here; their order is kept as in the original list.
StreamWriter::StreamWriter(ZMQContext &in_context,
                           Logger &in_logger,
                           const std::string &zmq_addr,
                           const std::string &repub_address,
                           const std::string &in_file_done_address)
        : zmq_context(in_context),
          file_done_address(in_file_done_address),
          image_puller(in_context, repub_address),
          logger(in_logger) {
    image_puller.Connect(zmq_addr);
    logger.Info("Connected via ZMQ to {}", zmq_addr);
}
@@ -37,6 +44,10 @@ void StreamWriter::CollectImages(std::vector<HDF5DataFileStatistics> &v) {
CheckPath(image_puller_output.cbor->start_message->file_prefix);
MakeDirectory(image_puller_output.cbor->start_message->file_prefix);
HDF5Writer writer(*image_puller_output.cbor->start_message);
if (!file_done_address.empty())
writer.SetupSocket(zmq_context, file_done_address);
std::unique_ptr<NXmx> master_file;
if (!image_puller_output.cbor->start_message->write_master_file || image_puller_output.cbor->start_message->write_master_file.value())
master_file = std::make_unique<NXmx>(*image_puller_output.cbor->start_message);
+8 -1
View File
@@ -24,6 +24,9 @@ struct StreamWriterOutput {
};
class StreamWriter {
ZMQContext &zmq_context;
std::string file_done_address;
StreamWriterState state = StreamWriterState::Idle;
ZMQImagePullerOutput image_puller_output;
@@ -39,7 +42,11 @@ class StreamWriter {
void CollectImages(std::vector<HDF5DataFileStatistics> &v);
bool WaitForImage();
public:
StreamWriter(ZMQContext& context, Logger &logger, const std::string& zmq_addr, const std::string& repub_address = "");
StreamWriter(ZMQContext& context,
Logger &logger,
const std::string& zmq_addr,
const std::string& repub_address = "",
const std::string& file_done_address = "");
StreamWriterOutput Run();
void Cancel();
StreamWriterStatistics GetStatistics() const;
+69 -8
View File
@@ -4,11 +4,24 @@
#include "../common/Logger.h"
#include "JFJochWriterHttp.h"
#include "StreamWriter.h"
#include <getopt.h>
static Logger logger("jfjoch_writer");
static Pistache::Http::Endpoint *httpEndpoint;
static StreamWriter *writer;
volatile static bool quitok = false;
// Logs the command-line synopsis and the list of supported options
// (-H/--http_port, -r/--zmq_repub_port, -f/--zmq_file_port) through the
// file-level logger at Info level.
void print_usage() {
logger.Info("Usage ./jfjoch_writer {options} <address of the ZeroMQ data source>");
logger.Info("");
logger.Info("Available options:");
logger.Info("-H<int> | --http_port=<int> HTTP port for statistics");
logger.Info("-r<int> | --zmq_repub_port=<int> ZeroMQ port for PUSH socket to republish images");
logger.Info("-f<int> | --zmq_file_port=<int> ZeroMQ port for PUB socket to inform about finalized files");
logger.Info("");
}
static void sigHandler (int sig){
switch(sig){
case SIGINT:
@@ -41,22 +54,70 @@ static void setUpUnixSignals(std::vector<int> quitSignals) {
int main(int argc, char **argv) {
RegisterHDF5Filter();
static Logger logger("jfjoch_writer_http");
int32_t http_port = 5234;
int32_t zmq_repub_port = -1;
int32_t zmq_file_port = -1;
if ((argc != 3) && (argc != 4)) {
logger.Error("Usage ./jfjoch_writer_http <target ZMQ addr> <TCP port> {<repub ZMQ addr to bind>}");
int c;
static struct option long_options[] = {
{"http_port", required_argument, 0, 'H'},
{"zmq_repub_port", required_argument, 0, 'r'},
{"zmq_file_port", required_argument, 0, 'f'},
{0, 0, 0, 0}
};
int option_index = 0;
int opt;
while ((opt = getopt_long(argc, argv, "?hH:r:f:",long_options, &option_index)) != -1 ) {
switch (opt) {
case 'H':
http_port = atoi(optarg);
break;
case 'r':
zmq_repub_port = atoi(optarg);
break;
case 'f':
zmq_file_port = atoi(optarg);
break;
case '?':
case 'h':
print_usage();
exit(EXIT_SUCCESS);
default:
print_usage();
exit(EXIT_FAILURE);
}
}
int first_argc = optind;
if ((argc - first_argc != 1)) {
print_usage();
exit(EXIT_FAILURE);
}
uint16_t http_port = atoi(argv[2]);
std::string repub_address;
if (argc == 4)
repub_address = argv[3];
if ((http_port <= 0) || (http_port >= UINT16_MAX)) {
logger.Error("Http port must be between 1 - 65534");
exit(EXIT_FAILURE);
}
logger.Info("HTTP service listening on port {}", http_port);
std::string repub_zmq_addr, file_done_zmq_addr;
if ((zmq_file_port < UINT16_MAX) && (zmq_file_port > 0)) {
file_done_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_file_port);
logger.Info("Information on closed files is published via ZeroMQ PUB socket {:s}", file_done_zmq_addr);
}
if ((zmq_repub_port < UINT16_MAX) && (zmq_repub_port > 0)) {
repub_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_repub_port);
logger.Info("Images are republished via ZeroMQ PUSH socket {:s}", repub_zmq_addr);
}
ZMQContext context;
Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(http_port));
writer = new StreamWriter(context, logger, argv[1], repub_address);
writer = new StreamWriter(context, logger, argv[first_argc], repub_zmq_addr, file_done_zmq_addr);
httpEndpoint = new Pistache::Http::Endpoint(addr);
auto router = std::make_shared<Pistache::Rest::Router>();