From 2914d77dbd85046642e20c4d4579aaba611bdb2a Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Wed, 4 Mar 2026 15:30:07 +0100 Subject: [PATCH] TCP: Improve behavior and documentation --- docs/{ZEROMQ_STREAM.md => IMAGE_STREAM.md} | 90 +++++++++++++++++++--- docs/JFJOCH_BROKER.md | 6 +- docs/index.rst | 2 +- image_pusher/HDF5FilePusher.cpp | 30 +++++++- image_pusher/HDF5FilePusher.h | 6 ++ image_pusher/ImagePusher.h | 10 +++ image_pusher/TCPStreamPusher.cpp | 13 ++++ image_pusher/TCPStreamPusher.h | 2 + image_pusher/TCPStreamPusherSocket.cpp | 39 ++++++++-- image_pusher/TCPStreamPusherSocket.h | 8 ++ writer/StreamWriter.cpp | 6 +- 11 files changed, 186 insertions(+), 26 deletions(-) rename docs/{ZEROMQ_STREAM.md => IMAGE_STREAM.md} (58%) diff --git a/docs/ZEROMQ_STREAM.md b/docs/IMAGE_STREAM.md similarity index 58% rename from docs/ZEROMQ_STREAM.md rename to docs/IMAGE_STREAM.md index be0dda95..5ef3def3 100644 --- a/docs/ZEROMQ_STREAM.md +++ b/docs/IMAGE_STREAM.md @@ -1,9 +1,9 @@ -# ZeroMQ socket +# Data streams -Jungfraujoch process (`jfjoch_broker`) operates three ZeroMQ outputs. +Jungfraujoch process (`jfjoch_broker`) operates three outputs. All three can be operated/enabled independently. These are: -* **Image** - all the images including metadata (PUSH socket) +* **Image** - all the images including metadata (ZeroMQ PUSH socket or custom TCP/IP socket) * **Preview** - images with metadata at a reduced frame rate (PUB socket) * **Metadata** - only metadata for all the images, bundled into packages (PUB socket) @@ -11,25 +11,95 @@ These are: Images (with metadata) are serialized as CBOR [image message](CBOR.md#image-message). The stream will also include CBOR [start message](CBOR.md#start-message), [calibration messages](CBOR.md#calibration-message) and [end message](CBOR.md#end-message) with run metadata. -Image stream can be split into multiple sockets to increase performance, in this case images will be split according to file number to which the image belongs. -All sockets will forward start and end messages. -Only first socket will forward calibration messages and will be marked to write master file. +If `file_prefix` is not provided for a data collection, images won't be sent to image stream (or its HDF5/CBOR replacements). + +### Splitting image stream +Image stream can be split into multiple sockets to increase performance, in this case images will be split according to file number to which the image belongs. +All sockets will forward start and end messages. Only first socket will forward calibration messages and will be marked to write master file. + +### ZeroMQ image stream This is using PUSH ZeroMQ socket(s). It should be strictly avoided to have multiple receivers connected to one PUSH ZeroMQ socket. ZeroMQ will send the images in a round-robin basis to the receivers. In this case start and end messages will end up only with one receiver. Instead, Jungfraujoch feature of multiple sockets should be used. -Image stream can be replaced with direct HDF5 writer and CBOR dump image pushers, it can be disabled by select "None" image pusher for all the measurements. - -If `file_prefix` is not provided for a data collection, images won't be sent to image stream (or its HDF5/CBOR replacements). - Behavior is as following: * Start message is sent with timeout of 5s. If within the time the message cannot be put in the outgoing queue or there is no connected puller exception is thrown - stop data collection with error due to absence of a writer. * Images are sent in non-blocking way and without timeout. * End message is sent with timeout of 5s. No error is reported. +The format is generally interchangeable with DECTRIS Stream2 format. + +### TCP/IP image stream +This is using TCP/IP socket(s) with a fixed binary frame header followed by payload bytes. +This format was introduced to Jungfraujoch as an alternative to ZeroMQ image stream. It allows two-way communication +between the data collection and the writer, and is therefore more robust than ZeroMQ. + +Payloads for `START`, `DATA`, `CALIBRATION` and `END` frames are CBOR messages, equivalent in content to the ZeroMQ image stream messages. +`ACK` and `CANCEL` are control frames. + +Each data collection is treated as a separate TCP streaming session: writers establish fresh connections for that collection (one connection per configured socket), then perform `START` → `DATA/CALIBRATION` → `END` (or `CANCEL` on startup rollback). + +For each frame: +1. Read one `TcpFrameHeader` (fixed size). +2. Validate `magic` and `version`. +3. Read `payload_size` bytes (if non-zero). + +When image stream is split into multiple sockets: +- `START` and `END` are sent on all sockets, +- `CALIBRATION` is sent only on socket 0, +- `DATA` frames are distributed by file grouping (`images_per_file`). + +ACK handling is mandatory for correct operation: +- `START` **must** be acknowledged (`ACK` with `ack_for=START`) on each socket, otherwise collection start fails. +- `END` **must** be acknowledged (`ack_for=END`) on each socket for successful completion. +- `CANCEL` is acknowledged during rollback paths. +- `DATA` **must** be ackonwledged for every frame and should be used to report fatal downstream errors immediately. + +On Linux, large payload transmission can use kernel TCP zero-copy (`SO_ZEROCOPY`/`MSG_ZEROCOPY`) when enabled; below threshold or when unavailable, transfer falls back to normal `send()` behavior. + +#### Frame types + +| Value | Name | Purpose | +|---:|---|---| +| 1 | `START` | Start-of-run metadata | +| 2 | `DATA` | One image payload | +| 3 | `CALIBRATION` | Calibration payload | +| 4 | `END` | End-of-run metadata | +| 5 | `ACK` | Acknowledgement / error reporting | +| 6 | `CANCEL` | Cancel run initialization/stream | + +#### TCP frame header (`TcpFrameHeader`) + +| Field | Type | Description | +|---|---|---| +| `magic` | `uint32_t` | Protocol magic (`0x4A464A54`, `"JFJT"`) | +| `version` | `uint16_t` | Protocol version (`2`) | +| `type` | `uint16_t` | Frame type (`START`/`DATA`/`CALIBRATION`/`END`/`ACK`/`CANCEL`) | +| `image_number` | `uint64_t` | Image index for `DATA` frames | +| `payload_size` | `uint64_t` | Number of payload bytes after header | +| `socket_number` | `uint32_t` | Socket index in split-stream mode | +| `flags` | `uint32_t` | ACK flags (`OK`, `FATAL`, `HAS_ERROR_TEXT`) | +| `run_number` | `uint64_t` | Run identifier | +| `ack_processed_images` | `uint32_t` | In `ACK`: number of images processed by receiver | +| `ack_code` | `uint16_t` | In `ACK`: error/status code | +| `ack_for` | `uint16_t` | In `ACK`: frame type being acknowledged | +| `reserved` | `uint64_t[2]` | Reserved, set to `0` | + +#### ACK semantics + +- `ACK` frames use `ack_for` to indicate which frame type is acknowledged. +- `flags`: + - `OK`: operation accepted/successful, + - `FATAL`: receiver reports unrecoverable error (primarily for `DATA`), + - `HAS_ERROR_TEXT`: ACK payload contains UTF-8 error text. +- `ack_code` can be used to categorize errors (for example I/O, no space left, permission denied, protocol error). + +### Image stream replacement +Image stream can be replaced with direct HDF5 writer and CBOR dump image pushers, it can be disabled by select "None" image pusher for all the measurements. + ## Writer notification socket Normally ZeroMQ is asynchronous. When `jfjoch_broker` is sending messages via ZeroMQ image stream, it doesn't know if these were properly handled downstream, e.g., written to disk. For this reason a writer notification socket is introduced. diff --git a/docs/JFJOCH_BROKER.md b/docs/JFJOCH_BROKER.md index 073c32fb..d7e58f0b 100644 --- a/docs/JFJOCH_BROKER.md +++ b/docs/JFJOCH_BROKER.md @@ -10,11 +10,11 @@ Broker operates four external interfaces. **Image stream** ZeroMQ PULL socket with CBOR serialization is used to send images, metadata and processing results for writing or downstream -processing. See details [here](ZEROMQ_STREAM.md#image-stream). +processing. See details [here](IMAGE_STREAM.md#image-stream). -**Preview stream** ZeroMQ PUB socket, as above but limited to subset of frames (1 image/s by default). See details [here](ZEROMQ_STREAM.md#preview-stream). +**Preview stream** ZeroMQ PUB socket, as above but limited to subset of frames (1 image/s by default). See details [here](IMAGE_STREAM.md#preview-stream). -**Metadata stream** ZeroMQ PUB socket, contains metadata for all the images, with bundling. See details [here](ZEROMQ_STREAM.md#metadata-stream). +**Metadata stream** ZeroMQ PUB socket, contains metadata for all the images, with bundling. See details [here](IMAGE_STREAM.md#metadata-stream). **Configuration, status and results interface** HTTP/REST interface described in the OpenAPI format. Description of the API is presented in the [OpenAPI description](../broker/redoc-static.html). diff --git a/docs/index.rst b/docs/index.rst index 9d4e4a8e..ad05a795 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -53,7 +53,7 @@ Jungfraujoch is distributed under the GPLv3 license. OPENAPI OPENAPI_SPECS CBOR - ZEROMQ_STREAM + IMAGE_STREAM PIXEL_MASK WEB_FRONTEND TESTS diff --git a/image_pusher/HDF5FilePusher.cpp b/image_pusher/HDF5FilePusher.cpp index 7af63e68..e3b68ee7 100644 --- a/image_pusher/HDF5FilePusher.cpp +++ b/image_pusher/HDF5FilePusher.cpp @@ -10,6 +10,9 @@ void HDF5FilePusher::StartDataCollection(StartMessage &message) { throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Image pusher is already writing images"); writer = std::make_unique(message); writer_future = std::async(std::launch::async, &HDF5FilePusher::WriterThread, this); + images_written = 0; + images_err = 0; + last_processed_image = 0; } bool HDF5FilePusher::EndDataCollection(const EndMessage &message) { @@ -33,9 +36,16 @@ bool HDF5FilePusher::SendImage(const uint8_t *image_data, size_t image_size, int throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Image pusher not ready for sending"); auto deserialized = CBORStream2Deserialize(image_data, image_size); - if (deserialized->data_message) - writer->Write(*deserialized->data_message); - else + if (deserialized->data_message) { + try { + writer->Write(*deserialized->data_message); + images_written++; + if (image_number > last_processed_image) + last_processed_image = image_number; + } catch (const JFJochException &e) { + images_err++; + } + } else throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "HDF5FilePusher::SendImage accepts only data image"); return true; @@ -70,3 +80,17 @@ std::string HDF5FilePusher::PrintSetup() const { std::filesystem::path currentPath = std::filesystem::current_path(); return "HDF5FilePusher: Images are written directly to file in base directory " + currentPath.string(); } + +std::optional HDF5FilePusher::GetAckProgress() const { + uint64_t ack_ok = images_written; + uint64_t ack_bad = images_err; + uint64_t ack_total = ack_ok + ack_bad; + uint64_t last = last_processed_image; + + return ImagePusherAckProgress{ + .data_acked_ok = ack_ok, + .data_acked_bad = ack_bad, + .data_acked_total = ack_total, + .last_processed_images = last + }; +} diff --git a/image_pusher/HDF5FilePusher.h b/image_pusher/HDF5FilePusher.h index 796df4ff..c475fcf8 100644 --- a/image_pusher/HDF5FilePusher.h +++ b/image_pusher/HDF5FilePusher.h @@ -17,6 +17,10 @@ class HDF5FilePusher : public ImagePusher { std::future writer_future; ThreadSafeFIFO writer_queue; void WriterThread(); + + std::atomic images_written = 0; + std::atomic images_err = 0; + std::atomic last_processed_image = 0; public: // Thread safety: StartDataCollection, EndDataCollection and SendCalibration must run poorly in serial context // SendImage can be executed in parallel @@ -27,6 +31,8 @@ public: bool SendCalibration(const CompressedImage &message) override; std::string PrintSetup() const override; + + std::optional GetAckProgress() const override; }; diff --git a/image_pusher/ImagePusher.h b/image_pusher/ImagePusher.h index be030171..455dd032 100644 --- a/image_pusher/ImagePusher.h +++ b/image_pusher/ImagePusher.h @@ -20,6 +20,15 @@ struct ImagePusherQueueElement { bool end; }; +struct ImagePusherAckProgress { + uint64_t data_sent = 0; + uint64_t data_acked_ok = 0; + uint64_t data_acked_bad = 0; + uint64_t data_acked_total = 0; + uint64_t data_ack_pending = 0; + uint64_t last_processed_images = 0; +}; + void PrepareCBORImage(DataMessage& message, const DiffractionExperiment &experiment, void *image, size_t image_size); @@ -35,6 +44,7 @@ public: virtual std::string GetWriterNotificationSocketAddress() const; virtual ~ImagePusher() = default; virtual std::string PrintSetup() const = 0; + virtual std::optional GetAckProgress() const { return std::nullopt; } }; diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index 8ac57279..b0714bea 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -167,3 +167,16 @@ bool TCPStreamPusher::SendCalibration(const CompressedImage &message) { serializer.SerializeCalibration(message); return socket[0]->Send(serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::CALIBRATION); } + +std::optional TCPStreamPusher::GetAckProgress() const { + ImagePusherAckProgress out; + for (const auto &s : socket) { + auto p = s->GetDataAckProgress(); + out.data_sent += p.data_sent; + out.data_acked_ok += p.data_acked_ok; + out.data_acked_bad += p.data_acked_bad; + out.data_acked_total += p.data_acked_total; + out.data_ack_pending += p.data_ack_pending; + } + return out; +} diff --git a/image_pusher/TCPStreamPusher.h b/image_pusher/TCPStreamPusher.h index a29c6115..c9c4d631 100644 --- a/image_pusher/TCPStreamPusher.h +++ b/image_pusher/TCPStreamPusher.h @@ -29,4 +29,6 @@ public: std::string Finalize() override; std::string PrintSetup() const override; + + std::optional GetAckProgress() const override; }; \ No newline at end of file diff --git a/image_pusher/TCPStreamPusherSocket.cpp b/image_pusher/TCPStreamPusherSocket.cpp index 7040dfc3..d39ebd93 100644 --- a/image_pusher/TCPStreamPusherSocket.cpp +++ b/image_pusher/TCPStreamPusherSocket.cpp @@ -307,6 +307,10 @@ bool TCPStreamPusherSocket::SendFrame(const uint8_t *data, size_t size, TCPFrame if (z) z->release(); } + + if (ok && type == TCPFrameType::DATA) + data_sent.fetch_add(1, std::memory_order_relaxed); + return ok; } @@ -454,11 +458,19 @@ void TCPStreamPusherSocket::AckThread() { cancel_ack_ok = ok; if (!ok && error_text.empty()) last_ack_error = "CANCEL rejected"; - } else if (ack_for == TCPFrameType::DATA && (!ok || fatal)) { - broken = true; - if (error_text.empty()) - last_ack_error = "DATA fatal ACK"; - logger.Error("Received fatal DATA ACK on " + endpoint + ": " + last_ack_error); + } else if (ack_for == TCPFrameType::DATA) { + data_acked_total.fetch_add(1, std::memory_order_relaxed); + last_processed_images.store(h.ack_processed_images, std::memory_order_relaxed); + + if (ok && !fatal) { + data_acked_ok.fetch_add(1, std::memory_order_relaxed); + } else { + data_acked_bad.fetch_add(1, std::memory_order_relaxed); + broken = true; // mandatory DATA ACK mode: bad DATA ACK breaks stream + if (error_text.empty()) + last_ack_error = "DATA ACK failed"; + logger.Error("Received failing DATA ACK on " + endpoint + ": " + last_ack_error); + } } } ack_cv.notify_all(); @@ -481,6 +493,12 @@ void TCPStreamPusherSocket::StartWriterThread() { last_ack_code = TCPAckCode::None; } + data_sent.store(0, std::memory_order_relaxed); + data_acked_ok.store(0, std::memory_order_relaxed); + data_acked_bad.store(0, std::memory_order_relaxed); + data_acked_total.store(0, std::memory_order_relaxed); + last_processed_images.store(0, std::memory_order_relaxed); + active = true; send_future = std::async(std::launch::async, &TCPStreamPusherSocket::WriterThread, this); completion_future = std::async(std::launch::async, &TCPStreamPusherSocket::CompletionThread, this); @@ -572,3 +590,14 @@ std::string TCPStreamPusherSocket::GetLastAckError() const { std::unique_lock ul(ack_state_mutex); return last_ack_error; } + +ImagePusherAckProgress TCPStreamPusherSocket::GetDataAckProgress() const { + ImagePusherAckProgress p; + p.data_sent = data_sent.load(std::memory_order_relaxed); + p.data_acked_ok = data_acked_ok.load(std::memory_order_relaxed); + p.data_acked_bad = data_acked_bad.load(std::memory_order_relaxed); + p.data_acked_total = data_acked_total.load(std::memory_order_relaxed); + p.data_ack_pending = (p.data_sent >= p.data_acked_total) ? (p.data_sent - p.data_acked_total) : 0; + p.last_processed_images = last_processed_images.load(std::memory_order_relaxed); + return p; +} \ No newline at end of file diff --git a/image_pusher/TCPStreamPusherSocket.h b/image_pusher/TCPStreamPusherSocket.h index 80350ef7..d15a23aa 100644 --- a/image_pusher/TCPStreamPusherSocket.h +++ b/image_pusher/TCPStreamPusherSocket.h @@ -58,6 +58,12 @@ class TCPStreamPusherSocket { Logger logger{"TCPStream2PusherSocket"}; + std::atomic data_sent{0}; + std::atomic data_acked_ok{0}; + std::atomic data_acked_bad{0}; + std::atomic data_acked_total{0}; + std::atomic last_processed_images{0}; + void WriterThread(); void CompletionThread(); void AckThread(); @@ -95,4 +101,6 @@ public: bool IsBroken() const; std::string GetLastAckError() const; + + ImagePusherAckProgress GetDataAckProgress() const; }; diff --git a/writer/StreamWriter.cpp b/writer/StreamWriter.cpp index ff7e7440..4a2cbc0d 100644 --- a/writer/StreamWriter.cpp +++ b/writer/StreamWriter.cpp @@ -128,15 +128,13 @@ void StreamWriter::ProcessDataImage() { processed_image_size += image_puller_output.cbor->data_message->image.GetCompressedSize(); if (verbose) logger.Info("Written"); + NotifyTcpAck(TCPFrameType::DATA, true, false, TCPAckCode::None); } catch (const JFJochException &e) { logger.ErrorException(e); logger.Warning("Error writing image - switching to error state"); state = StreamWriterState::Error; err = e.what(); - if (!tcp_data_fatal_sent) { - tcp_data_fatal_sent = true; - NotifyTcpAck(TCPFrameType::DATA, false, true, TCPAckCode::DataWriteFailed, err); - } + NotifyTcpAck(TCPFrameType::DATA, false, true, TCPAckCode::DataWriteFailed, err); } break; case StreamWriterState::Error: