Files
Jungfraujoch/writer/jfjoch_writer.cpp
Filip Leonarski d1a4c19ef3
Some checks failed
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 8m23s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 7m2s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 8m3s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 7m49s
Build Packages / build:rpm (rocky8) (push) Successful in 7m43s
Build Packages / Generate python client (push) Successful in 14s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 7m51s
Build Packages / Create release (push) Has been skipped
Build Packages / Build documentation (push) Successful in 31s
Build Packages / build:rpm (rocky9) (push) Successful in 8m32s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 7m7s
Build Packages / Unit tests (push) Successful in 1h12m37s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Failing after 2m34s
v1.0.0-rc.117 (#24)
This is an UNSTABLE release and not recommended for production use (please use rc.111 instead).

* jfjoch_viewer: Add ROI results to the dataset info plots
* jfjoch_writer: Remove HTTP interface, as it is not needed/used at the moment

Reviewed-on: #24
Co-authored-by: Filip Leonarski <filip.leonarski@psi.ch>
Co-committed-by: Filip Leonarski <filip.leonarski@psi.ch>
2025-12-05 22:03:07 +01:00

169 lines
5.5 KiB
C++

// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include <getopt.h>
#include <filesystem>
#include <csignal>
#include "../common/Logger.h"
#include "StreamWriter.h"
#include "../common/print_license.h"
#include "../image_puller/ZMQImagePuller.h"
// Logger shared by every function in this file.
static Logger logger("jfjoch_writer");
// Writer the signal handler calls Cancel() on; main() assigns it before installing the handlers.
static StreamWriter *writer = nullptr;
// Set by the signal handler and polled by the main loop. volatile std::sig_atomic_t is the
// type the language guarantees can be written from a signal handler; a plain volatile bool
// carries no such guarantee.
static volatile std::sig_atomic_t quitok = 0;
// Print the command-line synopsis and the list of supported options through the file-level logger.
void print_usage() {
    logger.Info("Usage ./jfjoch_writer {options} <address of the ZeroMQ data source>");
    logger.Info("");
    logger.Info("Available options:");
    // The root directory is a filesystem path (kept as std::string in main), not an integer.
    logger.Info("-d<string> | --root_dir=<string> Root directory for file writing");
    logger.Info("-r<int> | --zmq_repub_port=<int> ZeroMQ port for PUSH socket to republish images");
    logger.Info("-f<int> | --zmq_file_port=<int> ZeroMQ port for PUB socket to inform about finalized files");
    logger.Info("-w<int> | --rcv_watermark=<int> Receiving ZeroMQ socket watermark (default = 100)");
    logger.Info("-W<int> | --repub_watermark=<int> Republish ZeroMQ socket watermark (default = 1000)");
    logger.Info("-v | --verbose Verbose output");
    logger.Info("");
}
// Handler shared by all quit signals: raises the quit flag polled by the main loop
// and calls Cancel() on the writer. The signal number is deliberately not inspected,
// because every signal delivered here is treated the same way.
static void sigHandler (int sig){
    (void) sig;
    quitok = true;
    writer->Cancel();
}
// Install sigHandler for every signal in quitSignals.
//
// While the handler runs, all signals in quitSignals are blocked (via sa_mask),
// so the handler is not interrupted by another quit signal.
//
// quitSignals: signal numbers that should trigger sigHandler.
static void setUpUnixSignals(const std::vector<int> &quitSignals) {
    // Value-initialise so that no field of the structure is handed to the
    // kernel with an indeterminate value.
    struct sigaction sa{};
    sa.sa_handler = sigHandler;
    sa.sa_flags = 0;

    // Build the blocking mask directly in the structure.
    sigemptyset(&sa.sa_mask);
    for (const int sig : quitSignals)
        sigaddset(&sa.sa_mask, sig);

    for (const int sig : quitSignals)
        sigaction(sig, &sa, nullptr);
}
// Entry point of jfjoch_writer.
//
// Parses the command line, optionally changes the working directory to the
// requested root directory, builds the ZeroMQ image puller and the stream
// writer, installs the quit-signal handlers and then calls writer->Run()
// repeatedly until a quit signal sets the quit flag.
//
// Exactly one positional argument is required: the address of the ZeroMQ data source.
// Exits with EXIT_FAILURE on invalid arguments or if the root directory cannot be entered.
int main(int argc, char **argv) {
    RegisterHDF5Filter();

    print_license("jfjoch_writer");

    int32_t zmq_repub_port = -1;
    int32_t zmq_file_port = -1;
    std::string root_dir = "";
    std::optional<int32_t> rcv_watermark;
    std::optional<int32_t> repub_watermark;
    bool verbose = false;

    static struct option long_options[] = {
            {"root_dir",        required_argument, 0, 'd'},
            {"http_port",       required_argument, 0, 'H'},
            {"zmq_repub_port",  required_argument, 0, 'r'},
            {"zmq_file_port",   required_argument, 0, 'f'},
            {"rcv_watermark",   required_argument, 0, 'w'},
            {"repub_watermark", required_argument, 0, 'W'},
            // --verbose is a plain flag, matching the short option "v" (no ':' in the
            // option string) and the usage text.
            {"verbose",         no_argument,       0, 'v'},
            {0, 0, 0, 0}
    };

    int option_index = 0;
    int opt;
    while ((opt = getopt_long(argc, argv, "?hH:r:f:R:d:W:w:v",long_options, &option_index)) != -1 ) {
        switch (opt) {
            case 'v':
                verbose = true;
                break;
            case 'H':
                // For back compatibility
                logger.Error("HTTP port option is deprecated");
                break;
            case 'r':
                zmq_repub_port = atoi(optarg);
                break;
            case 'w':
                rcv_watermark = atoi(optarg);
                if (rcv_watermark.value() <= 0) {
                    std::cerr << "Watermark must be positive number" << std::endl;
                    exit(EXIT_FAILURE);
                }
                break;
            case 'W':
                repub_watermark = atoi(optarg);
                if (repub_watermark.value() <= 0) {
                    std::cerr << "Watermark must be positive number" << std::endl;
                    exit(EXIT_FAILURE);
                }
                break;
            case 'f':
                zmq_file_port = atoi(optarg);
                break;
            case 'R': // back compatibility
                logger.Warning("-R option is deprecated; use -d");
                // -R is an alias of -d: continue into the -d handling on purpose.
                [[fallthrough]];
            case 'd':
                root_dir = std::string(optarg);
                break;
            case '?':
            case 'h':
                print_usage();
                exit(EXIT_SUCCESS);
            default:
                print_usage();
                exit(EXIT_FAILURE);
        }
    }

    // Exactly one positional argument (the ZeroMQ source address) must remain.
    int first_argc = optind;
    if ((argc - first_argc != 1)) {
        print_usage();
        exit(EXIT_FAILURE);
    }

    if (!root_dir.empty()) {
        try {
            std::filesystem::current_path(root_dir);
        } catch (const std::filesystem::filesystem_error &e) {
            logger.Error("Cannot change path to {}: {}", root_dir, e.what());
            exit(EXIT_FAILURE);
        }
    }

    logger.Info("Current path {}", std::filesystem::current_path().string());

    // An address stays empty when the corresponding port was not given or is
    // outside 1..65535 (65535 itself is a valid port, hence "<=").
    std::string repub_zmq_addr, file_done_zmq_addr;

    if ((zmq_file_port <= UINT16_MAX) && (zmq_file_port > 0)) {
        file_done_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_file_port);
        logger.Info("Information on closed files is published via ZeroMQ PUB socket {:s}", file_done_zmq_addr);
    }

    if ((zmq_repub_port <= UINT16_MAX) && (zmq_repub_port > 0)) {
        repub_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_repub_port);
        logger.Info("Images are republished via ZeroMQ PUSH socket {:s}", repub_zmq_addr);
    }

    ZMQContext context;
    ZMQImagePuller puller(argv[first_argc], repub_zmq_addr, rcv_watermark, repub_watermark);

    // The writer must exist before the handlers are installed, because the
    // signal handler dereferences it.
    writer = new StreamWriter(logger,puller,file_done_zmq_addr, verbose);

    std::vector<int> sigs{SIGQUIT, SIGINT, SIGTERM, SIGHUP};
    setUpUnixSignals(sigs);

    while (!quitok)
        writer->Run();

    exit(EXIT_SUCCESS);
}