// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include #include "../common/Logger.h" #include "JFJochWriterHttp.h" #include "StreamWriter.h" #include "../common/print_license.h" #include "../image_puller/ZMQImagePuller.h" #include #include static Logger logger("jfjoch_writer"); static Pistache::Http::Endpoint *httpEndpoint; static StreamWriter *writer; volatile static bool quitok = false; void print_usage() { logger.Info("Usage ./jfjoch_writer {options}
"); logger.Info(""); logger.Info("Available options:"); logger.Info("-R | --root_dir= Root directory for file writing"); logger.Info("-H | --http_port= HTTP port for statistics"); logger.Info("-r | --zmq_repub_port= ZeroMQ port for PUSH socket to republish images"); logger.Info("-f | --zmq_file_port= ZeroMQ port for PUB socket to inform about finalized files"); logger.Info("-w | --rcv_watermark= Receiving ZeroMQ socket watermark (default = 100)"); logger.Info("-W | --repub_watermark= Republish ZeroMQ socket watermark (default = 1000)"); logger.Info("-v | --verbose Verbose output"); logger.Info(""); } static void sigHandler (int sig){ switch(sig){ case SIGINT: case SIGQUIT: case SIGTERM: case SIGHUP: default: httpEndpoint->shutdown(); quitok = true; writer->Cancel(); break; } } static void setUpUnixSignals(std::vector quitSignals) { sigset_t blocking_mask; sigemptyset(&blocking_mask); for (auto sig : quitSignals) sigaddset(&blocking_mask, sig); struct sigaction sa; sa.sa_handler = sigHandler; sa.sa_mask = blocking_mask; sa.sa_flags = 0; for (auto sig : quitSignals) sigaction(sig, &sa, nullptr); } int main(int argc, char **argv) { RegisterHDF5Filter(); print_license("jfjoch_writer"); int32_t http_port = 5234; int32_t zmq_repub_port = -1; int32_t zmq_file_port = -1; std::string root_dir = ""; std::optional rcv_watermark; std::optional repub_watermark; bool verbose = false; static struct option long_options[] = { {"root_dir", required_argument, 0, 'R'}, {"http_port", required_argument, 0, 'H'}, {"zmq_repub_port", required_argument, 0, 'r'}, {"zmq_file_port", required_argument, 0, 'f'}, {"rcv_watermark", required_argument, 0, 'w'}, {"repub_watermark", required_argument, 0, 'W'}, {"verbose", required_argument, 0, 'v'}, {0, 0, 0, 0} }; int option_index = 0; int opt; while ((opt = getopt_long(argc, argv, "?hH:r:f:R:W:w:v",long_options, &option_index)) != -1 ) { switch (opt) { case 'v': verbose = true; break; case 'H': http_port = atoi(optarg); break; case 'r': zmq_repub_port = atoi(optarg); break; case 'w': rcv_watermark = atoi(optarg); if (rcv_watermark.value() <= 0) { std::cerr << "Watermark must be positive number" << std::endl; exit(EXIT_FAILURE); } break; case 'W': repub_watermark = atoi(optarg); if (repub_watermark.value() <= 0) { std::cerr << "Watermark must be positive number" << std::endl; exit(EXIT_FAILURE); } break; case 'f': zmq_file_port = atoi(optarg); break; case 'R': root_dir = std::string(optarg); break; case '?': case 'h': print_usage(); exit(EXIT_SUCCESS); default: print_usage(); exit(EXIT_FAILURE); } } int first_argc = optind; if ((argc - first_argc != 1)) { print_usage(); exit(EXIT_FAILURE); } if ((http_port <= 0) || (http_port >= UINT16_MAX)) { logger.Error("Http port must be between 1 - 65534"); exit(EXIT_FAILURE); } logger.Info("HTTP service listening on port {}", http_port); if (!root_dir.empty()) { try { std::filesystem::current_path(root_dir); } catch (const std::filesystem::filesystem_error &e) { logger.Error("Cannot change path to {}: {}", root_dir, e.what()); exit(EXIT_FAILURE); } } logger.Info("Current path {}", std::filesystem::current_path().string()); std::string repub_zmq_addr, file_done_zmq_addr; if ((zmq_file_port < UINT16_MAX) && (zmq_file_port > 0)) { file_done_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_file_port); logger.Info("Information on closed files is published via ZeroMQ PUB socket {:s}", file_done_zmq_addr); } if ((zmq_repub_port < UINT16_MAX) && (zmq_repub_port > 0)) { repub_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_repub_port); logger.Info("Images are republished via ZeroMQ PUSH socket {:s}", repub_zmq_addr); } ZMQContext context; Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(http_port)); ZMQImagePuller puller(argv[first_argc], repub_zmq_addr, rcv_watermark, repub_watermark); writer = new StreamWriter(logger,puller,file_done_zmq_addr, verbose); httpEndpoint = new Pistache::Http::Endpoint(addr); auto router = std::make_shared(); auto opts = Pistache::Http::Endpoint::options().threads(4); opts.flags(Pistache::Tcp::Options::ReuseAddr); httpEndpoint->init(opts); std::vector sigs{SIGQUIT, SIGINT, SIGTERM, SIGHUP}; setUpUnixSignals(sigs); std::thread writer_thread([] { while (!quitok) writer->Run(); }); JFJochWriterHttp writer_http(*writer, router); httpEndpoint->setHandler(router->handler()); httpEndpoint->serve(); writer_thread.join(); logger.Info("Clean stop"); exit(EXIT_SUCCESS); }