diff --git a/writer/jfjoch_writer.cpp b/writer/jfjoch_writer.cpp index 9067ba29..57f2ef8d 100644 --- a/writer/jfjoch_writer.cpp +++ b/writer/jfjoch_writer.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include "../common/Logger.h" #include "StreamWriter.h" #include "../common/print_license.h" @@ -19,7 +20,8 @@ void print_usage() { logger.Info("Usage ./jfjoch_writer {options}
"); logger.Info(""); logger.Info("Available options:"); - logger.Info("-T Use raw TCP/IP instead of ZeroMQ"); + logger.Info("-T | --tcp Use raw TCP/IP instead of ZeroMQ"); + logger.Info("-j | --nproc= Number of forks (only with -T)"); logger.Info("-d | --root_dir= Root directory for file writing"); logger.Info("-r | --zmq_repub_port= ZeroMQ port for PUSH socket to republish images"); logger.Info("-f | --zmq_file_port= ZeroMQ port for PUB socket to inform about finalized files"); @@ -37,7 +39,8 @@ static void sigHandler (int sig){ case SIGHUP: default: quitok = true; - writer->Cancel(); + if (writer != nullptr) + writer->Cancel(); break; } } @@ -64,6 +67,7 @@ int main(int argc, char **argv) { int32_t zmq_repub_port = -1; int32_t zmq_file_port = -1; + int32_t nforks = 1; std::string root_dir = ""; std::optional rcv_watermark; std::optional repub_watermark; @@ -77,14 +81,23 @@ int main(int argc, char **argv) { {"rcv_watermark", required_argument, 0, 'w'}, {"repub_watermark", required_argument, 0, 'W'}, {"verbose", required_argument, 0, 'v'}, + {"tcp", no_argument, 0, 'T'}, + {"nproc", required_argument, 0, 'j'}, {0, 0, 0, 0} }; int option_index = 0; int opt; bool raw_tcp = false; - while ((opt = getopt_long(argc, argv, "?hH:r:f:R:d:W:w:vT",long_options, &option_index)) != -1 ) { + while ((opt = getopt_long(argc, argv, "?hH:r:f:R:d:W:w:vTj:",long_options, &option_index)) != -1 ) { switch (opt) { + case 'j': + nforks = atoi(optarg); + if (nforks <= 0) { + logger.Error("-j must be a positive integer"); + exit(EXIT_FAILURE); + } + break; case 'T': raw_tcp = true; break; @@ -137,6 +150,16 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } + if (nforks > 1 && !raw_tcp) { + logger.Error("-j option is only compatible with -T (raw TCP)"); + exit(EXIT_FAILURE); + } + + if (raw_tcp && zmq_repub_port > 0) { + logger.Error("Republish option at the moment only possible with ZeroMQ socket (no -T"); + exit(EXIT_FAILURE); + } + if (!root_dir.empty()) { try { std::filesystem::current_path(root_dir); @@ -163,6 +186,50 @@ int main(int argc, char **argv) { logger.Info("Images are republished via ZeroMQ PUSH socket {:s}", repub_zmq_addr); } + std::vector children; + if (nforks > 1) { + if (setpgid(0, 0) != 0) { + logger.Warning("Failed to create dedicated process group; group shutdown may be less reliable"); + } + + for (int i = 0; i < nforks; i++) { + pid_t pid = fork(); + if (pid == 0) { + // Child: continue to normal writer initialization. + children.clear(); + break; + } else if (pid < 0) { + logger.Error("Cannot fork child {}", i); + for (pid_t cpid : children) { + kill(cpid, SIGTERM); + } + for (pid_t cpid : children) { + waitpid(cpid, nullptr, 0); + } + exit(EXIT_FAILURE); + } else { + children.push_back(pid); + } + } + + if (!children.empty()) { + // Parent manager process only. + std::vector sigs{SIGQUIT, SIGINT, SIGTERM, SIGHUP}; + setUpUnixSignals(sigs); + + while (!quitok) + pause(); + + for (pid_t cpid : children) { + kill(cpid, SIGTERM); + } + for (pid_t cpid : children) { + waitpid(cpid, nullptr, 0); + } + exit(EXIT_SUCCESS); + } + } + ZMQContext context; std::unique_ptr puller;