Files
Jungfraujoch/broker/jfjoch_broker.cpp
2024-10-07 11:56:40 +02:00

116 lines
3.3 KiB
C++

// Copyright (2019-2023) Paul Scherrer Institute
// Using OpenAPI licensed with Apache License 2.0
#include <vector>
#include <csignal>
#include <fstream>
#include <nlohmann/json.hpp>
#include "../common/Logger.h"
#include "JFJochBrokerHttp.h"
#include "JFJochBrokerParser.h"
static Pistache::Http::Endpoint *httpEndpoint;
static void sigHandler [[noreturn]] (int sig){
switch(sig){
case SIGINT:
case SIGQUIT:
case SIGTERM:
case SIGHUP:
default:
httpEndpoint->shutdown();
break;
}
exit(0);
}
static void setUpUnixSignals(std::vector<int> quitSignals) {
sigset_t blocking_mask;
sigemptyset(&blocking_mask);
for (auto sig : quitSignals)
sigaddset(&blocking_mask, sig);
struct sigaction sa;
sa.sa_handler = sigHandler;
sa.sa_mask = blocking_mask;
sa.sa_flags = 0;
for (auto sig : quitSignals)
sigaction(sig, &sa, nullptr);
}
int main (int argc, char **argv) {
if ((argc == 1) || (argc > 3)) {
std::cout << "Usage ./jfjoch_broker <JSON config> {<TCP http port>}" << std::endl;
exit(EXIT_FAILURE);
}
uint16_t http_port = 5232;
if (argc >= 3) http_port = atoi(argv[2]);
Logger logger("jfjoch_broker");
org::openapitools::server::model::Jfjoch_settings settings;
std::ifstream file(argv[1]);
try {
nlohmann::json input = nlohmann::json::parse(file);
settings = input;
settings.validate();
logger.Info("JSON configuration file read properly");
} catch (const std::exception &e) {
logger.Error("Error reading JSON configuration file: " + std::string(e.what()));
exit(EXIT_FAILURE);
}
std::unique_ptr<JFJochReceiverService> receiver;
std::unique_ptr<ImagePusher> image_pusher = ParseImagePusher(settings);
DiffractionExperiment experiment;
ParseFacilityConfiguration(settings, experiment);
AcquisitionDeviceGroup aq_devices;
ParseAcquisitionDeviceGroup(settings, aq_devices);
experiment.DataStreams(aq_devices.size());
int32_t send_buffer_size_MiB = settings.getImageBufferMiB();
receiver = std::make_unique<JFJochReceiverService>(aq_devices, logger, *image_pusher, send_buffer_size_MiB);
std::string numa_policy = settings.getNumaPolicy();
if (!numa_policy.empty())
receiver->NUMAPolicy(numa_policy);
receiver->NumThreads(settings.getReceiverThreads());
Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(http_port));
httpEndpoint = new Pistache::Http::Endpoint((addr));
auto router = std::make_shared<Pistache::Rest::Router>();
auto opts = Pistache::Http::Endpoint::options().threads(8).maxRequestSize(64*1024*1024);
opts.flags(Pistache::Tcp::Options::ReuseAddr);
httpEndpoint->init(opts);
signal(SIGPIPE, SIG_IGN);
std::vector<int> sigs{SIGQUIT, SIGINT, SIGTERM, SIGHUP};
setUpUnixSignals(sigs);
JFJochBrokerHttp broker(experiment, router);
broker.FrontendDirectory(settings.getFrontendDirectory());
for (const auto &d: settings.getDetector())
broker.AddDetectorSetup(ParseDetectorSetup(d));
if (receiver)
broker.Services().Receiver(receiver.get());
httpEndpoint->setHandler(router->handler());
httpEndpoint->serve();
httpEndpoint->shutdown();
}