// Copyright (2019-2023) Paul Scherrer Institute #include "nlohmann/json.hpp" #include #include "../grpc/gRPCServer_Template.h" #include "../common/ZMQImagePusher.h" #include "PCIExpressDevice.h" #include "MlxRawEthDevice.h" #include "LinuxSocketDevice.h" #include "JFJochReceiverService.h" #include "HLSSimulatedDevice.h" #include "../common/NetworkAddressConvert.h" AcquisitionDevice *SetupAcquisitionDevice(const nlohmann::json &input, uint16_t data_stream) { AcquisitionDevice *ret; int16_t numa_node = -1; if (input.contains("numa_node")) numa_node = input["numa_node"]; int64_t frame_buffer_size = UINT16_MAX; if (input.contains("frame_buffer_size")) frame_buffer_size = input["frame_buffer_size"]; uint16_t pci_slot = data_stream; if (input.contains("id")) pci_slot = input["id"]; uint32_t ipv4_addr = IPv4AddressFromStr(input["ipv4_addr"].get()); if (input.contains("type") && (input["type"] == "software")) ret = new HLSSimulatedDevice(data_stream, frame_buffer_size, numa_node); else if (input.contains("type") && (input["type"] == "software")) { auto linux_dev = new LinuxSocketDevice(ipv4_addr, input["udp_port"].get(), data_stream, frame_buffer_size, numa_node); ret = linux_dev; } else if (input.contains("type") && (input["type"] == "pcie")) { auto pci_dev = new PCIExpressDevice(data_stream, pci_slot); pci_dev->SetIPv4Address(ipv4_addr); if (input.contains("mac_addr")) pci_dev->SetMACAddress(MacAddressFromStr(input["mac_addr"].get())); else pci_dev->SetDefaultMAC(); if (input.contains("custom_test_frame")) { std::vector tmp(RAW_MODULE_SIZE); auto filename = input["custom_test_frame"].get(); std::fstream file(filename.c_str(), std::fstream::in | std::fstream::binary); file.read((char *) tmp.data(), RAW_MODULE_SIZE * sizeof(uint16_t)); pci_dev->SetInternalGeneratorFrame(tmp); } ret = pci_dev; } #ifdef JFJOCH_USE_IBVERBS else if (input.contains("type") && (input["type"] == "mlx_raw_eth")) { auto mlx_dev = new MlxRawEthDevice(pci_slot, data_stream, frame_buffer_size, numa_node); mlx_dev->SetIPv4Address(ipv4_addr); mlx_dev->SetMACAddress(MacAddressFromStr(input["mac_addr"].get())); ret = mlx_dev; } #endif else throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Unsupported device type"); return ret; } int main(int argc, char **argv) { if ((argc < 2) || (argc > 3)) { std::cout << "Usage ./jfjoch_receiver {}" << std::endl; exit(EXIT_FAILURE); } std::string log_file; if (argc >= 3) log_file = argv[2]; Logger logger("jfjoch_receiver", log_file); nlohmann::json input; std::ifstream file(argv[1]); try { input = nlohmann::json::parse(file); } catch (const nlohmann::json::exception &e) { logger.Error("JSON Parsing exception: " + std::string(e.what())); exit(EXIT_FAILURE); } std::string grpc_addr; try { grpc_addr = input["grpc_addr"]; } catch (...) { throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "grpc_addr is compulsory parameter"); } bool verbose = false; if (input.contains("verbose") && (input["verbose"])) verbose = true; int32_t zmq_threads = 4; if (input.contains("zmq_threads")) zmq_threads = input["zmq_threads"]; ZMQContext context; context.NumThreads(zmq_threads); logger.Verbose(verbose); std::vector> aq_devices; for (int i = 0; i < input["device"].size(); i++) { auto ptr = SetupAcquisitionDevice(input["device"][i], i); ptr->EnableLogging(&logger); aq_devices.emplace_back(ptr); } std::vector aq_devices_ptr; for (const auto &i: aq_devices) { if (verbose) i->EnableLogging(&logger); aq_devices_ptr.push_back(i.get()); } logger.Info("Enabled acquisition device count: " + std::to_string(aq_devices.size())); int32_t send_buffer_size = -1; if (input.contains("tcp_send_buffer_size")) send_buffer_size = input["tcp_send_buffer_size"]; int32_t send_buffer_high_watermark = -1; if (input.contains("zmq_send_high_watermark")) send_buffer_high_watermark = input["zmq_send_high_watermark"]; std::vector zmq_addr; if (input.contains("image_zmq_addr") && input["image_zmq_addr"].is_array()) { for (const auto &s: input["image_zmq_addr"]) zmq_addr.push_back(s); } ZMQImagePusher pusher(zmq_addr, send_buffer_high_watermark, send_buffer_size); JFJochReceiverService service(aq_devices_ptr, logger, pusher); std::unique_ptr preview, preview_indexed; if (input.contains("preview_zmq_addr")) { preview = std::make_unique(context, input["preview_zmq_addr"]); service.PreviewPublisher(preview.get()); logger.Info("Preview available on ZMQ addr " + input["preview_zmq_addr"].get()); } if (input.contains("compression_threads")) { service.NumThreads(input["compression_threads"].get()); logger.Info("Compression threads {}", input["compression_threads"].get()); } if (input.contains("numa_policy")) { service.NUMAPolicy(input["numa_policy"].get()); logger.Info("NUMA policy {}", input["numa_policy"].get()); } if (input.contains("send_buffer_count")) { service.SendBufferCount(input["send_buffer_count"].get()); logger.Info("Send buffer count {}", input["send_buffer_count"].get()); } auto server = gRPCServer(grpc_addr, service); logger.Info("gRPC configuration listening on address " + grpc_addr); logger.Info("Started"); server->Wait(); }