Implement raw TCP/IP in jfjoch_broker and jfjoch_writer
Some checks failed
Build Packages / build:rpm (rocky8_nocuda) (push) Has been cancelled
Build Packages / build:rpm (rocky9_nocuda) (push) Has been cancelled
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Has been cancelled
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Has been cancelled
Build Packages / build:rpm (rocky8_sls9) (push) Has been cancelled
Build Packages / build:rpm (rocky9_sls9) (push) Has been cancelled
Build Packages / build:rpm (rocky8) (push) Has been cancelled
Build Packages / build:rpm (rocky9) (push) Has been cancelled
Build Packages / build:rpm (ubuntu2204) (push) Has been cancelled
Build Packages / build:rpm (ubuntu2404) (push) Has been cancelled
Build Packages / Generate python client (push) Has been cancelled
Build Packages / Build documentation (push) Has been cancelled
Build Packages / Unit tests (push) Has been cancelled
Build Packages / Create release (push) Has been cancelled

This commit is contained in:
2026-03-01 19:19:33 +01:00
parent 0239854dfe
commit 84432efff4
12 changed files with 88 additions and 8 deletions

View File

@@ -9,6 +9,7 @@
#include "OpenAPIConvert.h"
#include "Detector_type.h"
#include "../image_pusher/NonePusher.h"
#include "../image_pusher/TcpStreamPusher.h"
DetectorGeometryModular ParseStandardDetectorGeometry(const org::openapitools::server::model::Detector &j) {
auto s = j.getStandardGeometry();
@@ -190,10 +191,27 @@ std::unique_ptr<ImagePusher> ParseZMQImagePusher(const org::openapitools::server
return std::move(tmp);
}
std::unique_ptr<ImagePusher> ParseTCPImagePusher(const org::openapitools::server::model::Jfjoch_settings &j) {
if (!j.zeromqIsSet())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Socket settings must be provided");
std::optional<int32_t> send_buffer_size;
if (j.getZeromq().sendBufferSizeIsSet())
send_buffer_size = j.getZeromq().getSendBufferSize();
auto tmp = std::make_unique<TCPStreamPusher>(j.getZeromq().getImageSocket(), send_buffer_size);
if (j.getZeromq().writerNotificationSocketIsSet())
tmp->WriterNotificationSocket(j.getZeromq().getWriterNotificationSocket());
return std::move(tmp);
}
std::unique_ptr<ImagePusher> ParseImagePusher(const org::openapitools::server::model::Jfjoch_settings &j) {
switch (j.getImagePusher().getValue()) {
case org::openapitools::server::model::Image_pusher_type::eImage_pusher_type::ZEROMQ:
return ParseZMQImagePusher(j);
case org::openapitools::server::model::Image_pusher_type::eImage_pusher_type::TCP:
return ParseTCPImagePusher(j);
case org::openapitools::server::model::Image_pusher_type::eImage_pusher_type::HDF5:
return std::make_unique<HDF5FilePusher>();
case org::openapitools::server::model::Image_pusher_type::eImage_pusher_type::NONE:

View File

@@ -84,6 +84,9 @@ void to_json(nlohmann::json& j, const Image_pusher_type& o)
case Image_pusher_type::eImage_pusher_type::CBOR:
j = "CBOR";
break;
case Image_pusher_type::eImage_pusher_type::TCP:
j = "TCP";
break;
case Image_pusher_type::eImage_pusher_type::NONE:
j = "None";
break;
@@ -103,6 +106,9 @@ void from_json(const nlohmann::json& j, Image_pusher_type& o)
else if (s == "CBOR") {
o.setValue(Image_pusher_type::eImage_pusher_type::CBOR);
}
else if (s == "TCP") {
o.setValue(Image_pusher_type::eImage_pusher_type::TCP);
}
else if (s == "None") {
o.setValue(Image_pusher_type::eImage_pusher_type::NONE);
} else {

View File

@@ -41,6 +41,7 @@ public:
ZEROMQ,
HDF5,
CBOR,
TCP,
NONE
};

View File

@@ -1908,7 +1908,7 @@ components:
image_pusher_type:
type: string
default: None
enum: [ZeroMQ, HDF5, CBOR, None]
enum: [ZeroMQ, HDF5, CBOR, TCP, None]
standard_detector_geometry:
type: object
description: Regular rectangular geometry, first module is in the bottom left corner of the detector

File diff suppressed because one or more lines are too long

View File

@@ -9,6 +9,8 @@
* `CBOR` (value: `'CBOR'`)
* `TCP` (value: `'TCP'`)
* `NONE` (value: `'None'`)
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

View File

@@ -1,12 +1,12 @@
{
"name": "jungfraujoch-frontend",
"version": "1.0.0-rc.126",
"version": "1.0.0-rc.127",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "jungfraujoch-frontend",
"version": "1.0.0-rc.126",
"version": "1.0.0-rc.127",
"license": "GPL-3.0",
"dependencies": {
"@emotion/react": "^11.10.4",

View File

@@ -7,5 +7,6 @@ export enum image_pusher_type {
ZERO_MQ = 'ZeroMQ',
HDF5 = 'HDF5',
CBOR = 'CBOR',
TCP = 'TCP',
NONE = 'None',
}

View File

@@ -90,6 +90,34 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage &message) {
return ret;
}
std::string TCPStreamPusher::Finalize() {
std::string ret;
if (writer_notification_socket) {
for (size_t i = 0; i < socket.size(); i++) {
auto n = writer_notification_socket->Receive(run_number, run_name);
if (!n)
ret += "Writer " + std::to_string(i) + ": no end notification received within 1 minute from collection end";
else if (static_cast<size_t>(n->socket_number) >= socket.size())
ret += "Writer " + std::to_string(i) + ": mismatch in socket number";
else if (!n->ok)
ret += "Writer " + std::to_string(i) + ": " + n->error;
}
}
return ret;
}
std::string TCPStreamPusher::GetWriterNotificationSocketAddress() const {
if (writer_notification_socket)
return writer_notification_socket->GetEndpointName();
else
return "";
}
TCPStreamPusher &TCPStreamPusher::WriterNotificationSocket(const std::string &addr) {
writer_notification_socket = std::make_unique<ZMQWriterNotificationPuller>(addr, std::chrono::minutes(1));
return *this;
}
std::string TCPStreamPusher::PrintSetup() const {
std::string output = "TCPStream2Pusher: Sending images to sockets: ";
for (const auto &s : socket)

View File

@@ -4,12 +4,15 @@
#pragma once
#include "TcpStreamPusherSocket.h"
#include "ZMQWriterNotificationPuller.h"
class TCPStreamPusher : public ImagePusher {
std::vector<uint8_t> serialization_buffer;
CBORStream2Serializer serializer;
std::vector<std::unique_ptr<TCPStreamPusherSocket>> socket;
std::unique_ptr<ZMQWriterNotificationPuller> writer_notification_socket;
int64_t images_per_file = 1;
uint64_t run_number = 0;
std::string run_name;
@@ -20,11 +23,15 @@ public:
std::optional<size_t> zerocopy_threshold = {},
size_t send_queue_size = 4096);
TCPStreamPusher& WriterNotificationSocket(const std::string& addr);
std::string GetWriterNotificationSocketAddress() const override;
void StartDataCollection(StartMessage& message) override;
bool EndDataCollection(const EndMessage& message) override;
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
void SendImage(ZeroCopyReturnValue &z) override;
bool SendCalibration(const CompressedImage& message) override;
std::string Finalize() override;
std::string PrintSetup() const override;
};

View File

@@ -27,6 +27,7 @@ public:
std::optional<int32_t> send_buffer_size = {});
ZMQStream2Pusher& WriterNotificationSocket(const std::string& addr);
std::string GetWriterNotificationSocketAddress() const override;
std::vector<std::string> GetAddress();
@@ -40,7 +41,7 @@ public:
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
std::string Finalize() override;
std::string GetWriterNotificationSocketAddress() const override;
std::string PrintSetup() const override;
};

View File

@@ -8,6 +8,7 @@
#include "StreamWriter.h"
#include "../common/print_license.h"
#include "../image_puller/ZMQImagePuller.h"
#include "../image_puller/TcpImagePuller.h"
static Logger logger("jfjoch_writer");
@@ -18,6 +19,7 @@ void print_usage() {
logger.Info("Usage ./jfjoch_writer {options} <address of the ZeroMQ data source>");
logger.Info("");
logger.Info("Available options:");
logger.Info("-T Use raw TCP/IP instead of ZeroMQ");
logger.Info("-d<int> | --root_dir=<int> Root directory for file writing");
logger.Info("-r<int> | --zmq_repub_port=<int> ZeroMQ port for PUSH socket to republish images");
logger.Info("-f<int> | --zmq_file_port=<int> ZeroMQ port for PUB socket to inform about finalized files");
@@ -80,8 +82,12 @@ int main(int argc, char **argv) {
int option_index = 0;
int opt;
while ((opt = getopt_long(argc, argv, "?hH:r:f:R:d:W:w:v",long_options, &option_index)) != -1 ) {
bool raw_tcp = false;
while ((opt = getopt_long(argc, argv, "?hH:r:f:R:d:W:w:vT",long_options, &option_index)) != -1 ) {
switch (opt) {
case 'T':
raw_tcp = true;
break;
case 'v':
verbose = true;
break;
@@ -149,14 +155,24 @@ int main(int argc, char **argv) {
}
if ((zmq_repub_port < UINT16_MAX) && (zmq_repub_port > 0)) {
if (raw_tcp) {
logger.Error("TCP republishing not supported at the moment");
exit(EXIT_FAILURE);
}
repub_zmq_addr = fmt::format("tcp://0.0.0.0:{:d}", zmq_repub_port);
logger.Info("Images are republished via ZeroMQ PUSH socket {:s}", repub_zmq_addr);
}
ZMQContext context;
ZMQImagePuller puller(argv[first_argc], repub_zmq_addr, rcv_watermark, repub_watermark);
writer = new StreamWriter(logger,puller,file_done_zmq_addr, verbose);
std::unique_ptr<ImagePuller> puller;
if (raw_tcp)
puller = std::make_unique<TCPImagePuller>(argv[first_argc]);
else
puller = std::make_unique<ZMQImagePuller>(argv[first_argc], repub_zmq_addr, rcv_watermark, repub_watermark);
writer = new StreamWriter(logger,*puller,file_done_zmq_addr, verbose);
std::vector<int> sigs{SIGQUIT, SIGINT, SIGTERM, SIGHUP};
setUpUnixSignals(sigs);