Files
Jungfraujoch/broker/jfjoch_broker.cpp

82 lines
2.6 KiB
C++

// Copyright (2019-2023) Paul Scherrer Institute
#include <fstream>
#include <nlohmann/json.hpp>
#include "../common/Logger.h"
#include "../common/NetworkAddressConvert.h"
#include "JFJochBroker.h"
#include "../grpc/gRPCServer_Template.h"
#include "JFJochBrokerParser.h"
#include "../common/ZMQImagePusher.h"
int main (int argc, char **argv) {
if (argc > 3) {
std::cout << "Usage ./jfjoch_broker {<JSON config> {<TCP gRPC port>}}" << std::endl;
exit(EXIT_FAILURE);
}
uint16_t grpc_port = 5232;
if (argc >= 3) grpc_port = atoi(argv[2]);
Logger logger("jfjoch_broker");
nlohmann::json input;
if (argc > 1) {
std::ifstream file(argv[1]);
try {
input = nlohmann::json::parse(file);
} catch (const nlohmann::json::exception &e) {
logger.Error("JSON Parsing exception: " + std::string(e.what()));
exit(EXIT_FAILURE);
}
}
std::unique_ptr<JFJochReceiverService> receiver;
std::unique_ptr<ZMQImagePusher> image_pusher;
std::unique_ptr<ZMQPreviewPublisher> preview_publisher;
ZMQContext context;
DiffractionExperiment experiment;
experiment.MaskChipEdges(true).MaskModuleEdges(true);
AcquisitionDeviceGroup aq_devices;
ParseAcquisitionDeviceGroup(input, "devices", aq_devices);
if (aq_devices.size() > 0) {
experiment.DataStreams(aq_devices.size());
image_pusher = std::make_unique<ZMQImagePusher>(context, ParseStringArray(input, "zmq_image_addr"));
receiver = std::make_unique<JFJochReceiverService>(aq_devices, logger, *image_pusher);
std::string zmq_preview_addr = ParseString(input, "zmq_preview_addr");
if (!zmq_preview_addr.empty()) {
preview_publisher = std::make_unique<ZMQPreviewPublisher>(context, zmq_preview_addr);
receiver->PreviewPublisher(preview_publisher.get());
}
std::string numa_policy = ParseString(input, "numa_policy");
if (!numa_policy.empty())
receiver->NUMAPolicy(numa_policy);
receiver->NumThreads(ParseInt64(input, "receiver_threads", 64));
receiver->SendBufferCount(ParseInt64(input, "send_buffer_count", 128));
}
ParseFacilityConfiguration(input, "cfg", experiment);
JFJochBroker broker(experiment);
ParseDetectorSetup(input, "detectors", broker);
if (receiver)
broker.Services().Receiver(receiver.get());
broker.Services().Detector();
std::string grpc_addr = "0.0.0.0:" + std::to_string(grpc_port);
auto server = gRPCServer(grpc_addr, broker);
logger.Info("gRPC configuration listening on address " + grpc_addr);
server->Wait();
}