jfjoch_writer: Add auto-fork option

This commit is contained in:
2026-03-05 13:46:39 +01:00
parent a5f3cf62d2
commit e8b25b15b2

View File

@@ -4,6 +4,7 @@
#include <getopt.h>
#include <filesystem>
#include <csignal>
#include <sys/wait.h>
#include "../common/Logger.h"
#include "StreamWriter.h"
#include "../common/print_license.h"
@@ -19,7 +20,8 @@ void print_usage() {
logger.Info("Usage ./jfjoch_writer {options} <address of the ZeroMQ data source>");
logger.Info("");
logger.Info("Available options:");
logger.Info("-T Use raw TCP/IP instead of ZeroMQ");
logger.Info("-T | --tcp Use raw TCP/IP instead of ZeroMQ");
logger.Info("-j<int> | --nproc=<int> Number of forks (only with -T)");
logger.Info("-d<int> | --root_dir=<int> Root directory for file writing");
logger.Info("-r<int> | --zmq_repub_port=<int> ZeroMQ port for PUSH socket to republish images");
logger.Info("-f<int> | --zmq_file_port=<int> ZeroMQ port for PUB socket to inform about finalized files");
@@ -37,7 +39,8 @@ static void sigHandler (int sig){
case SIGHUP:
default:
quitok = true;
writer->Cancel();
if (writer != nullptr)
writer->Cancel();
break;
}
}
@@ -64,6 +67,7 @@ int main(int argc, char **argv) {
int32_t zmq_repub_port = -1;
int32_t zmq_file_port = -1;
int32_t nforks = 1;
std::string root_dir = "";
std::optional<int32_t> rcv_watermark;
std::optional<int32_t> repub_watermark;
@@ -77,14 +81,23 @@ int main(int argc, char **argv) {
{"rcv_watermark", required_argument, 0, 'w'},
{"repub_watermark", required_argument, 0, 'W'},
{"verbose", required_argument, 0, 'v'},
{"tcp", no_argument, 0, 'T'},
{"nproc", required_argument, 0, 'j'},
{0, 0, 0, 0}
};
int option_index = 0;
int opt;
bool raw_tcp = false;
while ((opt = getopt_long(argc, argv, "?hH:r:f:R:d:W:w:vT",long_options, &option_index)) != -1 ) {
while ((opt = getopt_long(argc, argv, "?hH:r:f:R:d:W:w:vTj:",long_options, &option_index)) != -1 ) {
switch (opt) {
case 'j':
nforks = atoi(optarg);
if (nforks <= 0) {
logger.Error("-j must be a positive integer");
exit(EXIT_FAILURE);
}
break;
case 'T':
raw_tcp = true;
break;
@@ -137,6 +150,16 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}
if (nforks > 1 && !raw_tcp) {
logger.Error("-j option is only compatible with -T (raw TCP)");
exit(EXIT_FAILURE);
}
if (raw_tcp && zmq_repub_port > 0) {
logger.Error("Republish option at the moment only possible with ZeroMQ socket (no -T");
exit(EXIT_FAILURE);
}
if (!root_dir.empty()) {
try {
std::filesystem::current_path(root_dir);
@@ -163,6 +186,50 @@ int main(int argc, char **argv) {
logger.Info("Images are republished via ZeroMQ PUSH socket {:s}", repub_zmq_addr);
}
std::vector<pid_t> children;
if (nforks > 1) {
if (setpgid(0, 0) != 0) {
logger.Warning("Failed to create dedicated process group; group shutdown may be less reliable");
}
for (int i = 0; i < nforks; i++) {
pid_t pid = fork();
if (pid == 0) {
// Child: continue to normal writer initialization.
children.clear();
break;
} else if (pid < 0) {
logger.Error("Cannot fork child {}", i);
for (pid_t cpid : children) {
kill(cpid, SIGTERM);
}
for (pid_t cpid : children) {
waitpid(cpid, nullptr, 0);
}
exit(EXIT_FAILURE);
} else {
children.push_back(pid);
}
}
if (!children.empty()) {
// Parent manager process only.
std::vector<int> sigs{SIGQUIT, SIGINT, SIGTERM, SIGHUP};
setUpUnixSignals(sigs);
while (!quitok)
pause();
for (pid_t cpid : children) {
kill(cpid, SIGTERM);
}
for (pid_t cpid : children) {
waitpid(cpid, nullptr, 0);
}
exit(EXIT_SUCCESS);
}
}
ZMQContext context;
std::unique_ptr<ImagePuller> puller;