TCP: Improve behavior and documentation

This commit is contained in:
2026-03-04 15:30:07 +01:00
parent 5345b41e16
commit 2914d77dbd
11 changed files with 186 additions and 26 deletions
+80 -10
View File
@@ -1,9 +1,9 @@
# ZeroMQ socket
# Data streams
Jungfraujoch process (`jfjoch_broker`) operates three ZeroMQ outputs.
Jungfraujoch process (`jfjoch_broker`) operates three outputs.
All three can be operated/enabled independently.
These are:
* **Image** - all the images including metadata (PUSH socket)
* **Image** - all the images including metadata (ZeroMQ PUSH socket or custom TCP/IP socket)
* **Preview** - images with metadata at a reduced frame rate (PUB socket)
* **Metadata** - only metadata for all the images, bundled into packages (PUB socket)
@@ -11,25 +11,95 @@ These are:
Images (with metadata) are serialized as CBOR [image message](CBOR.md#image-message).
The stream will also include CBOR [start message](CBOR.md#start-message), [calibration messages](CBOR.md#calibration-message) and [end message](CBOR.md#end-message) with run metadata.
Image stream can be split into multiple sockets to increase performance, in this case images will be split according to file number to which the image belongs.
All sockets will forward start and end messages.
Only first socket will forward calibration messages and will be marked to write master file.
If `file_prefix` is not provided for a data collection, images won't be sent to image stream (or its HDF5/CBOR replacements).
### Splitting image stream
Image stream can be split into multiple sockets to increase performance, in this case images will be split according to file number to which the image belongs.
All sockets will forward start and end messages. Only first socket will forward calibration messages and will be marked to write master file.
### ZeroMQ image stream
This is using PUSH ZeroMQ socket(s).
It should be strictly avoided to have multiple receivers connected to one PUSH ZeroMQ socket.
ZeroMQ will send the images in a round-robin basis to the receivers.
In this case start and end messages will end up only with one receiver.
Instead, Jungfraujoch feature of multiple sockets should be used.
Image stream can be replaced with direct HDF5 writer and CBOR dump image pushers, it can be disabled by select "None" image pusher for all the measurements.
If `file_prefix` is not provided for a data collection, images won't be sent to image stream (or its HDF5/CBOR replacements).
Behavior is as following:
* Start message is sent with timeout of 5s. If within the time the message cannot be put in the outgoing queue or there is no connected puller exception is thrown - stop data collection with error due to absence of a writer.
* Images are sent in non-blocking way and without timeout.
* End message is sent with timeout of 5s. No error is reported.
The format is generally interchangeable with DECTRIS Stream2 format.
### TCP/IP image stream
This is using TCP/IP socket(s) with a fixed binary frame header followed by payload bytes.
This format was introduced to Jungfraujoch as an alternative to ZeroMQ image stream. It allows two-way communication
between the data collection and the writer, and is therefore more robust than ZeroMQ.
Payloads for `START`, `DATA`, `CALIBRATION` and `END` frames are CBOR messages, equivalent in content to the ZeroMQ image stream messages.
`ACK` and `CANCEL` are control frames.
Each data collection is treated as a separate TCP streaming session: writers establish fresh connections for that collection (one connection per configured socket), then perform `START``DATA/CALIBRATION``END` (or `CANCEL` on startup rollback).
For each frame:
1. Read one `TcpFrameHeader` (fixed size).
2. Validate `magic` and `version`.
3. Read `payload_size` bytes (if non-zero).
When image stream is split into multiple sockets:
- `START` and `END` are sent on all sockets,
- `CALIBRATION` is sent only on socket 0,
- `DATA` frames are distributed by file grouping (`images_per_file`).
ACK handling is mandatory for correct operation:
- `START` **must** be acknowledged (`ACK` with `ack_for=START`) on each socket, otherwise collection start fails.
- `END` **must** be acknowledged (`ack_for=END`) on each socket for successful completion.
- `CANCEL` is acknowledged during rollback paths.
- `DATA` **must** be ackonwledged for every frame and should be used to report fatal downstream errors immediately.
On Linux, large payload transmission can use kernel TCP zero-copy (`SO_ZEROCOPY`/`MSG_ZEROCOPY`) when enabled; below threshold or when unavailable, transfer falls back to normal `send()` behavior.
#### Frame types
| Value | Name | Purpose |
|---:|---|---|
| 1 | `START` | Start-of-run metadata |
| 2 | `DATA` | One image payload |
| 3 | `CALIBRATION` | Calibration payload |
| 4 | `END` | End-of-run metadata |
| 5 | `ACK` | Acknowledgement / error reporting |
| 6 | `CANCEL` | Cancel run initialization/stream |
#### TCP frame header (`TcpFrameHeader`)
| Field | Type | Description |
|---|---|---|
| `magic` | `uint32_t` | Protocol magic (`0x4A464A54`, `"JFJT"`) |
| `version` | `uint16_t` | Protocol version (`2`) |
| `type` | `uint16_t` | Frame type (`START`/`DATA`/`CALIBRATION`/`END`/`ACK`/`CANCEL`) |
| `image_number` | `uint64_t` | Image index for `DATA` frames |
| `payload_size` | `uint64_t` | Number of payload bytes after header |
| `socket_number` | `uint32_t` | Socket index in split-stream mode |
| `flags` | `uint32_t` | ACK flags (`OK`, `FATAL`, `HAS_ERROR_TEXT`) |
| `run_number` | `uint64_t` | Run identifier |
| `ack_processed_images` | `uint32_t` | In `ACK`: number of images processed by receiver |
| `ack_code` | `uint16_t` | In `ACK`: error/status code |
| `ack_for` | `uint16_t` | In `ACK`: frame type being acknowledged |
| `reserved` | `uint64_t[2]` | Reserved, set to `0` |
#### ACK semantics
- `ACK` frames use `ack_for` to indicate which frame type is acknowledged.
- `flags`:
- `OK`: operation accepted/successful,
- `FATAL`: receiver reports unrecoverable error (primarily for `DATA`),
- `HAS_ERROR_TEXT`: ACK payload contains UTF-8 error text.
- `ack_code` can be used to categorize errors (for example I/O, no space left, permission denied, protocol error).
### Image stream replacement
Image stream can be replaced with direct HDF5 writer and CBOR dump image pushers, it can be disabled by select "None" image pusher for all the measurements.
## Writer notification socket
Normally ZeroMQ is asynchronous. When `jfjoch_broker` is sending messages via ZeroMQ image stream, it doesn't know
if these were properly handled downstream, e.g., written to disk. For this reason a writer notification socket is introduced.
+3 -3
View File
@@ -10,11 +10,11 @@
Broker operates four external interfaces.
**Image stream** ZeroMQ PULL socket with CBOR serialization is used to send images, metadata and processing results for writing or downstream
processing. See details [here](ZEROMQ_STREAM.md#image-stream).
processing. See details [here](IMAGE_STREAM.md#image-stream).
**Preview stream** ZeroMQ PUB socket, as above but limited to subset of frames (1 image/s by default). See details [here](ZEROMQ_STREAM.md#preview-stream).
**Preview stream** ZeroMQ PUB socket, as above but limited to subset of frames (1 image/s by default). See details [here](IMAGE_STREAM.md#preview-stream).
**Metadata stream** ZeroMQ PUB socket, contains metadata for all the images, with bundling. See details [here](ZEROMQ_STREAM.md#metadata-stream).
**Metadata stream** ZeroMQ PUB socket, contains metadata for all the images, with bundling. See details [here](IMAGE_STREAM.md#metadata-stream).
**Configuration, status and results interface** HTTP/REST interface described in the OpenAPI format.
Description of the API is presented in the [OpenAPI description](../broker/redoc-static.html).
+1 -1
View File
@@ -53,7 +53,7 @@ Jungfraujoch is distributed under the GPLv3 license.
OPENAPI
OPENAPI_SPECS
CBOR
ZEROMQ_STREAM
IMAGE_STREAM
PIXEL_MASK
WEB_FRONTEND
TESTS
+27 -3
View File
@@ -10,6 +10,9 @@ void HDF5FilePusher::StartDataCollection(StartMessage &message) {
throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Image pusher is already writing images");
writer = std::make_unique<FileWriter>(message);
writer_future = std::async(std::launch::async, &HDF5FilePusher::WriterThread, this);
images_written = 0;
images_err = 0;
last_processed_image = 0;
}
bool HDF5FilePusher::EndDataCollection(const EndMessage &message) {
@@ -33,9 +36,16 @@ bool HDF5FilePusher::SendImage(const uint8_t *image_data, size_t image_size, int
throw JFJochException(JFJochExceptionCategory::WrongDAQState, "Image pusher not ready for sending");
auto deserialized = CBORStream2Deserialize(image_data, image_size);
if (deserialized->data_message)
writer->Write(*deserialized->data_message);
else
if (deserialized->data_message) {
try {
writer->Write(*deserialized->data_message);
images_written++;
if (image_number > last_processed_image)
last_processed_image = image_number;
} catch (const JFJochException &e) {
images_err++;
}
} else
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"HDF5FilePusher::SendImage accepts only data image");
return true;
@@ -70,3 +80,17 @@ std::string HDF5FilePusher::PrintSetup() const {
std::filesystem::path currentPath = std::filesystem::current_path();
return "HDF5FilePusher: Images are written directly to file in base directory " + currentPath.string();
}
std::optional<ImagePusherAckProgress> HDF5FilePusher::GetAckProgress() const {
uint64_t ack_ok = images_written;
uint64_t ack_bad = images_err;
uint64_t ack_total = ack_ok + ack_bad;
uint64_t last = last_processed_image;
return ImagePusherAckProgress{
.data_acked_ok = ack_ok,
.data_acked_bad = ack_bad,
.data_acked_total = ack_total,
.last_processed_images = last
};
}
+6
View File
@@ -17,6 +17,10 @@ class HDF5FilePusher : public ImagePusher {
std::future<void> writer_future;
ThreadSafeFIFO<ImagePusherQueueElement> writer_queue;
void WriterThread();
std::atomic<uint64_t> images_written = 0;
std::atomic<uint64_t> images_err = 0;
std::atomic<uint64_t> last_processed_image = 0;
public:
// Thread safety: StartDataCollection, EndDataCollection and SendCalibration must run poorly in serial context
// SendImage can be executed in parallel
@@ -27,6 +31,8 @@ public:
bool SendCalibration(const CompressedImage &message) override;
std::string PrintSetup() const override;
std::optional<ImagePusherAckProgress> GetAckProgress() const override;
};
+10
View File
@@ -20,6 +20,15 @@ struct ImagePusherQueueElement {
bool end;
};
struct ImagePusherAckProgress {
uint64_t data_sent = 0;
uint64_t data_acked_ok = 0;
uint64_t data_acked_bad = 0;
uint64_t data_acked_total = 0;
uint64_t data_ack_pending = 0;
uint64_t last_processed_images = 0;
};
void PrepareCBORImage(DataMessage& message,
const DiffractionExperiment &experiment,
void *image, size_t image_size);
@@ -35,6 +44,7 @@ public:
virtual std::string GetWriterNotificationSocketAddress() const;
virtual ~ImagePusher() = default;
virtual std::string PrintSetup() const = 0;
virtual std::optional<ImagePusherAckProgress> GetAckProgress() const { return std::nullopt; }
};
+13
View File
@@ -167,3 +167,16 @@ bool TCPStreamPusher::SendCalibration(const CompressedImage &message) {
serializer.SerializeCalibration(message);
return socket[0]->Send(serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::CALIBRATION);
}
std::optional<ImagePusherAckProgress> TCPStreamPusher::GetAckProgress() const {
ImagePusherAckProgress out;
for (const auto &s : socket) {
auto p = s->GetDataAckProgress();
out.data_sent += p.data_sent;
out.data_acked_ok += p.data_acked_ok;
out.data_acked_bad += p.data_acked_bad;
out.data_acked_total += p.data_acked_total;
out.data_ack_pending += p.data_ack_pending;
}
return out;
}
+2
View File
@@ -29,4 +29,6 @@ public:
std::string Finalize() override;
std::string PrintSetup() const override;
std::optional<ImagePusherAckProgress> GetAckProgress() const override;
};
+34 -5
View File
@@ -307,6 +307,10 @@ bool TCPStreamPusherSocket::SendFrame(const uint8_t *data, size_t size, TCPFrame
if (z)
z->release();
}
if (ok && type == TCPFrameType::DATA)
data_sent.fetch_add(1, std::memory_order_relaxed);
return ok;
}
@@ -454,11 +458,19 @@ void TCPStreamPusherSocket::AckThread() {
cancel_ack_ok = ok;
if (!ok && error_text.empty())
last_ack_error = "CANCEL rejected";
} else if (ack_for == TCPFrameType::DATA && (!ok || fatal)) {
broken = true;
if (error_text.empty())
last_ack_error = "DATA fatal ACK";
logger.Error("Received fatal DATA ACK on " + endpoint + ": " + last_ack_error);
} else if (ack_for == TCPFrameType::DATA) {
data_acked_total.fetch_add(1, std::memory_order_relaxed);
last_processed_images.store(h.ack_processed_images, std::memory_order_relaxed);
if (ok && !fatal) {
data_acked_ok.fetch_add(1, std::memory_order_relaxed);
} else {
data_acked_bad.fetch_add(1, std::memory_order_relaxed);
broken = true; // mandatory DATA ACK mode: bad DATA ACK breaks stream
if (error_text.empty())
last_ack_error = "DATA ACK failed";
logger.Error("Received failing DATA ACK on " + endpoint + ": " + last_ack_error);
}
}
}
ack_cv.notify_all();
@@ -481,6 +493,12 @@ void TCPStreamPusherSocket::StartWriterThread() {
last_ack_code = TCPAckCode::None;
}
data_sent.store(0, std::memory_order_relaxed);
data_acked_ok.store(0, std::memory_order_relaxed);
data_acked_bad.store(0, std::memory_order_relaxed);
data_acked_total.store(0, std::memory_order_relaxed);
last_processed_images.store(0, std::memory_order_relaxed);
active = true;
send_future = std::async(std::launch::async, &TCPStreamPusherSocket::WriterThread, this);
completion_future = std::async(std::launch::async, &TCPStreamPusherSocket::CompletionThread, this);
@@ -572,3 +590,14 @@ std::string TCPStreamPusherSocket::GetLastAckError() const {
std::unique_lock ul(ack_state_mutex);
return last_ack_error;
}
ImagePusherAckProgress TCPStreamPusherSocket::GetDataAckProgress() const {
ImagePusherAckProgress p;
p.data_sent = data_sent.load(std::memory_order_relaxed);
p.data_acked_ok = data_acked_ok.load(std::memory_order_relaxed);
p.data_acked_bad = data_acked_bad.load(std::memory_order_relaxed);
p.data_acked_total = data_acked_total.load(std::memory_order_relaxed);
p.data_ack_pending = (p.data_sent >= p.data_acked_total) ? (p.data_sent - p.data_acked_total) : 0;
p.last_processed_images = last_processed_images.load(std::memory_order_relaxed);
return p;
}
+8
View File
@@ -58,6 +58,12 @@ class TCPStreamPusherSocket {
Logger logger{"TCPStream2PusherSocket"};
std::atomic<uint64_t> data_sent{0};
std::atomic<uint64_t> data_acked_ok{0};
std::atomic<uint64_t> data_acked_bad{0};
std::atomic<uint64_t> data_acked_total{0};
std::atomic<uint64_t> last_processed_images{0};
void WriterThread();
void CompletionThread();
void AckThread();
@@ -95,4 +101,6 @@ public:
bool IsBroken() const;
std::string GetLastAckError() const;
ImagePusherAckProgress GetDataAckProgress() const;
};
+2 -4
View File
@@ -128,15 +128,13 @@ void StreamWriter::ProcessDataImage() {
processed_image_size += image_puller_output.cbor->data_message->image.GetCompressedSize();
if (verbose)
logger.Info("Written");
NotifyTcpAck(TCPFrameType::DATA, true, false, TCPAckCode::None);
} catch (const JFJochException &e) {
logger.ErrorException(e);
logger.Warning("Error writing image - switching to error state");
state = StreamWriterState::Error;
err = e.what();
if (!tcp_data_fatal_sent) {
tcp_data_fatal_sent = true;
NotifyTcpAck(TCPFrameType::DATA, false, true, TCPAckCode::DataWriteFailed, err);
}
NotifyTcpAck(TCPFrameType::DATA, false, true, TCPAckCode::DataWriteFailed, err);
}
break;
case StreamWriterState::Error: