// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include #include #include #include #include "../common/Logger.h" #include "StreamWriter.h" #include "../common/print_license.h" #include "../image_puller/ZMQImagePuller.h" #include "../image_puller/TCPImagePuller.h" static Logger logger("jfjoch_writer"); static StreamWriter *writer; static std::atomic quitok = false; void print_usage() { logger.Info("Usage ./jfjoch_writer {options}
"); logger.Info(""); logger.Info("Available options:"); logger.Info("-T | --tcp Use raw TCP/IP instead of ZeroMQ"); logger.Info("-j | --nproc= Number of forks (only with -T)"); logger.Info("-d | --root_dir= Root directory for file writing"); logger.Info("-r | --zmq_repub_port= ZeroMQ port for PUSH socket to republish images"); logger.Info("-f | --zmq_file_port= ZeroMQ port for PUB socket to inform about finalized files"); logger.Info("-w | --rcv_watermark= Receiving ZeroMQ socket watermark (default = 100)"); logger.Info("-W | --repub_watermark= Republish ZeroMQ socket watermark (default = 1000)"); logger.Info("-v | --verbose Verbose output"); logger.Info(""); } static void sigHandler (int sig){ switch(sig){ case SIGINT: case SIGQUIT: case SIGTERM: case SIGHUP: default: quitok = true; if (writer != nullptr) writer->Cancel(); break; } } static void setUpUnixSignals(std::vector quitSignals) { sigset_t blocking_mask; sigemptyset(&blocking_mask); for (auto sig : quitSignals) sigaddset(&blocking_mask, sig); struct sigaction sa; sa.sa_handler = sigHandler; sa.sa_mask = blocking_mask; sa.sa_flags = 0; for (auto sig : quitSignals) sigaction(sig, &sa, nullptr); } int main(int argc, char **argv) { RegisterHDF5Filter(); print_license("jfjoch_writer"); int32_t zmq_repub_port = -1; int32_t zmq_file_port = -1; int32_t nforks = 1; std::string root_dir = ""; std::optional rcv_watermark; std::optional repub_watermark; bool verbose = false; static struct option long_options[] = { {"root_dir", required_argument, 0, 'd'}, {"http_port", required_argument, 0, 'H'}, {"zmq_repub_port", required_argument, 0, 'r'}, {"zmq_file_port", required_argument, 0, 'f'}, {"rcv_watermark", required_argument, 0, 'w'}, {"repub_watermark", required_argument, 0, 'W'}, {"verbose", required_argument, 0, 'v'}, {"tcp", no_argument, 0, 'T'}, {"nproc", required_argument, 0, 'j'}, {0, 0, 0, 0} }; int option_index = 0; int opt; bool raw_tcp = false; while ((opt = getopt_long(argc, argv, "?hH:r:f:R:d:W:w:vTj:",long_options, &option_index)) != -1 ) { switch (opt) { case 'j': nforks = atoi(optarg); if (nforks <= 0) { logger.Error("-j must be a positive integer"); exit(EXIT_FAILURE); } break; case 'T': raw_tcp = true; break; case 'v': verbose = true; break; case 'H': // For back compatibility logger.Error("HTTP port option is deprecated"); break; case 'r': zmq_repub_port = atoi(optarg); break; case 'w': rcv_watermark = atoi(optarg); if (rcv_watermark.value() <= 0) { std::cerr << "Watermark must be positive number" << std::endl; exit(EXIT_FAILURE); } break; case 'W': repub_watermark = atoi(optarg); if (repub_watermark.value() <= 0) { std::cerr << "Watermark must be positive number" << std::endl; exit(EXIT_FAILURE); } break; case 'f': zmq_file_port = atoi(optarg); break; case 'R': // back compatibility logger.Warning("-R option is deprecated; use -d"); case 'd': root_dir = std::string(optarg); break; case '?': case 'h': print_usage(); exit(EXIT_SUCCESS); default: print_usage(); exit(EXIT_FAILURE); } } int first_argc = optind; if ((argc - first_argc != 1)) { print_usage(); exit(EXIT_FAILURE); } if (nforks > 1 && !raw_tcp) { logger.Error("-j option is only compatible with -T (raw TCP)"); exit(EXIT_FAILURE); } if (!root_dir.empty()) { try { std::filesystem::current_path(root_dir); } catch (const std::filesystem::filesystem_error &e) { logger.Error("Cannot change path to {}: {}", root_dir, e.what()); exit(EXIT_FAILURE); } } logger.Info("Current path {}", std::filesystem::current_path().string()); std::string repub_zmq_addr, file_done_zmq_addr; if ((zmq_file_port < UINT16_MAX) && (zmq_file_port > 0)) { file_done_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_file_port); logger.Info("Information on closed files is published via ZeroMQ PUB socket {:s}", file_done_zmq_addr); } if ((zmq_repub_port < UINT16_MAX) && (zmq_repub_port > 0)) { if (raw_tcp) { logger.Error("TCP republishing not supported at the moment"); exit(EXIT_FAILURE); } repub_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_repub_port); logger.Info("Images are republished via ZeroMQ PUSH socket {:s}", repub_zmq_addr); } std::vector children; if (nforks > 1) { if (setpgid(0, 0) != 0) { logger.Warning("Failed to create dedicated process group; group shutdown may be less reliable"); } for (int i = 0; i < nforks; i++) { pid_t pid = fork(); if (pid == 0) { // Child: continue to normal writer initialization. children.clear(); break; } else if (pid < 0) { logger.Error("Cannot fork child {}", i); for (pid_t cpid : children) { kill(cpid, SIGTERM); } for (pid_t cpid : children) { waitpid(cpid, nullptr, 0); } exit(EXIT_FAILURE); } else { children.push_back(pid); } } if (!children.empty()) { // Parent manager process only. std::vector sigs{SIGQUIT, SIGINT, SIGTERM, SIGHUP}; setUpUnixSignals(sigs); while (!quitok) pause(); for (pid_t cpid : children) { kill(cpid, SIGTERM); } for (pid_t cpid : children) { waitpid(cpid, nullptr, 0); } exit(EXIT_SUCCESS); } } ZMQContext context; std::unique_ptr puller; if (raw_tcp) puller = std::make_unique(argv[first_argc], std::nullopt, repub_zmq_addr, repub_watermark); else puller = std::make_unique(argv[first_argc], repub_zmq_addr, rcv_watermark, repub_watermark); writer = new StreamWriter(logger,*puller,file_done_zmq_addr, verbose); std::vector sigs{SIGQUIT, SIGINT, SIGTERM, SIGHUP}; setUpUnixSignals(sigs); while (!quitok) writer->Run(); exit(EXIT_SUCCESS); }