jfjoch_broker: Allow to query connected writers

This commit is contained in:
2026-03-05 12:05:46 +01:00
parent ca0409bd5f
commit 4ede10aa6a
23 changed files with 209 additions and 59 deletions
+7 -1
View File
@@ -269,4 +269,10 @@ void JFJochServices::LoadDetectorPixelMask(PixelMask &mask) {
void JFJochServices::SetupIndexing(const IndexingSettings &input) {
if (receiver)
receiver->Indexing(input);
}
}
uint64_t JFJochServices::GetConnectedWriters() const {
if (receiver)
return receiver->GetConnectedWriters();
return 0;
}
+2
View File
@@ -67,6 +67,8 @@ public:
void LoadDetectorPixelMask(PixelMask &mask);
void SetupIndexing(const IndexingSettings& input);
uint64_t GetConnectedWriters() const;
};
+1
View File
@@ -554,6 +554,7 @@ BrokerStatus JFJochStateMachine::GetStatus() const {
ret.progress = services.GetReceiverProgress();
ret.gpu_count = gpu_count;
ret.broker_version = jfjoch_version();
ret.connected_writers = services.GetConnectedWriters();
return ret;
}
+1
View File
@@ -242,6 +242,7 @@ org::openapitools::server::model::Broker_status Convert(const BrokerStatus& inpu
ret.setGpuCount(input.gpu_count);
ret.setBrokerVersion(input.broker_version);
ret.setConnectedWriters(input.connected_writers);
return ret;
}
+31 -2
View File
@@ -32,6 +32,8 @@ Broker_status::Broker_status()
m_Gpu_countIsSet = false;
m_Broker_version = "";
m_Broker_versionIsSet = false;
m_Connected_writers = 0L;
m_Connected_writersIsSet = false;
}
@@ -73,7 +75,7 @@ bool Broker_status::validate(std::stringstream& msg, const std::string& pathPref
}
}
return success;
}
@@ -98,7 +100,10 @@ bool Broker_status::operator==(const Broker_status& rhs) const
((!gpuCountIsSet() && !rhs.gpuCountIsSet()) || (gpuCountIsSet() && rhs.gpuCountIsSet() && getGpuCount() == rhs.getGpuCount())) &&
((!brokerVersionIsSet() && !rhs.brokerVersionIsSet()) || (brokerVersionIsSet() && rhs.brokerVersionIsSet() && getBrokerVersion() == rhs.getBrokerVersion()))
((!brokerVersionIsSet() && !rhs.brokerVersionIsSet()) || (brokerVersionIsSet() && rhs.brokerVersionIsSet() && getBrokerVersion() == rhs.getBrokerVersion())) &&
((!connectedWritersIsSet() && !rhs.connectedWritersIsSet()) || (connectedWritersIsSet() && rhs.connectedWritersIsSet() && getConnectedWriters() == rhs.getConnectedWriters()))
;
}
@@ -122,6 +127,8 @@ void to_json(nlohmann::json& j, const Broker_status& o)
j["gpu_count"] = o.m_Gpu_count;
if(o.brokerVersionIsSet())
j["broker_version"] = o.m_Broker_version;
if(o.connectedWritersIsSet())
j["connected_writers"] = o.m_Connected_writers;
}
@@ -153,6 +160,11 @@ void from_json(const nlohmann::json& j, Broker_status& o)
j.at("broker_version").get_to(o.m_Broker_version);
o.m_Broker_versionIsSet = true;
}
if(j.find("connected_writers") != j.end())
{
j.at("connected_writers").get_to(o.m_Connected_writers);
o.m_Connected_writersIsSet = true;
}
}
@@ -249,6 +261,23 @@ void Broker_status::unsetBroker_version()
{
m_Broker_versionIsSet = false;
}
int64_t Broker_status::getConnectedWriters() const
{
return m_Connected_writers;
}
void Broker_status::setConnectedWriters(int64_t const value)
{
m_Connected_writers = value;
m_Connected_writersIsSet = true;
}
bool Broker_status::connectedWritersIsSet() const
{
return m_Connected_writersIsSet;
}
void Broker_status::unsetConnected_writers()
{
m_Connected_writersIsSet = false;
}
} // namespace org::openapitools::server::model
+9
View File
@@ -98,6 +98,13 @@ public:
void setBrokerVersion(std::string const& value);
bool brokerVersionIsSet() const;
void unsetBroker_version();
/// <summary>
/// Number of connected writers For ZeroMQ image socket: number is constant For TCP/IP image socket: number is updated live during operation
/// </summary>
int64_t getConnectedWriters() const;
void setConnectedWriters(int64_t const value);
bool connectedWritersIsSet() const;
void unsetConnected_writers();
friend void to_json(nlohmann::json& j, const Broker_status& o);
friend void from_json(const nlohmann::json& j, Broker_status& o);
@@ -114,6 +121,8 @@ protected:
bool m_Gpu_countIsSet;
std::string m_Broker_version;
bool m_Broker_versionIsSet;
int64_t m_Connected_writers;
bool m_Connected_writersIsSet;
};
+7
View File
@@ -1344,6 +1344,13 @@ components:
type: string
description: Version of the jfjoch_broker
example: "1.0.0-rc.128"
connected_writers:
type: integer
format: int64
description: |
Number of connected writers
For ZeroMQ image socket: number is constant
For TCP/IP image socket: number is updated live during operation
plot:
type: object
required:
File diff suppressed because one or more lines are too long
+1
View File
@@ -15,4 +15,5 @@ struct BrokerStatus {
enum class MessageSeverity {Error, Info, Warning, Success} message_severity = MessageSeverity::Error;
int64_t gpu_count;
std::string broker_version;
int64_t connected_writers;
};
+110 -48
View File
@@ -1,6 +1,7 @@
# Data streams
Jungfraujoch process (`jfjoch_broker`) operates three outputs.
Jungfraujoch process (`jfjoch_broker`) operates three outputs.
All three can be operated/enabled independently.
These are:
* **Image** - all the images including metadata (ZeroMQ PUSH socket or custom TCP/IP socket)
@@ -8,10 +9,9 @@ These are:
* **Metadata** - only metadata for all the images, bundled into packages (PUB socket)
## Image stream
Images (with metadata) are serialized as CBOR [image message](CBOR.md#image-message).
Images (with metadata) are serialized as CBOR [image message](CBOR.md#image-message).
The stream will also include CBOR [start message](CBOR.md#start-message), [calibration messages](CBOR.md#calibration-message) and [end message](CBOR.md#end-message) with run metadata.
If `file_prefix` is not provided for a data collection, images won't be sent to image stream (or its HDF5/CBOR replacements).
### Splitting image stream
@@ -19,50 +19,97 @@ Image stream can be split into multiple sockets to increase performance, in this
All sockets will forward start and end messages. Only first socket will forward calibration messages and will be marked to write master file.
### ZeroMQ image stream
This is using PUSH ZeroMQ socket(s).
It should be strictly avoided to have multiple receivers connected to one PUSH ZeroMQ socket.
ZeroMQ will send the images in a round-robin basis to the receivers.
This is using PUSH ZeroMQ socket(s).
It should be strictly avoided to have multiple receivers connected to one PUSH ZeroMQ socket.
ZeroMQ will send the images in a round-robin basis to the receivers.
In this case start and end messages will end up only with one receiver.
Instead, Jungfraujoch feature of multiple sockets should be used.
For ZeroMQ image stream, each writer connects to a different port.
Behavior is as following:
* Start message is sent with timeout of 5s. If within the time the message cannot be put in the outgoing queue or there is no connected puller exception is thrown - stop data collection with error due to absence of a writer.
* Images are sent in non-blocking way and without timeout.
* End message is sent with timeout of 5s. No error is reported.
* Start message is sent with timeout of 1s per socket. If within the time the message cannot be put in the outgoing queue or there is no connected puller, an exception is thrown data collection is stopped with an error due to absence of a writer.
* Calibration message is sent to the first socket only, with timeout of 1s.
* Images are sent via a per-socket writer thread. If a send times out, the pusher switches to non-blocking mode for the remainder of the collection (images may be dropped).
* End message is sent with timeout of 1s per socket. No exception is thrown on timeout, but a transmission error is recorded.
The format is generally interchangeable with DECTRIS Stream2 format.
#### ZeroMQ configuration
ZeroMQ image stream is configured in the broker JSON configuration file under the `zeromq_settings` section:
```json
{
"image_socket": ["tcp://192.168.0.1:9000", "tcp://192.168.0.1:9001"],
"send_watermark": 100,
"send_buffer_size": 67108864,
"writer_notification_socket": "tcp://192.168.0.1:*"
}
```
- `image_socket`: one or more PUSH socket addresses. Multiple entries split the image stream across sockets. Addresses follow ZeroMQ conventions (`tcp://`, `ipc://`). `0.0.0.0` binds on all network interfaces.
- `send_watermark` (optional): ZeroMQ send high-water mark (number of outstanding messages per socket).
- `send_buffer_size` (optional): OS-level send buffer size for the ZeroMQ socket.
- `writer_notification_socket` (optional): see [Writer notification socket](#writer-notification-socket) below.
### TCP/IP image stream
This is using TCP/IP socket(s) with a fixed binary frame header followed by payload bytes.
This format was introduced to Jungfraujoch as an alternative to ZeroMQ image stream. It allows two-way communication
This is using TCP/IP socket(s) with a fixed binary frame header followed by payload bytes.
This format was introduced to Jungfraujoch as an alternative to ZeroMQ image stream. It allows two-way communication
between the data collection and the writer, and is therefore more robust than ZeroMQ.
For TCP/IP image stream, all writers connect to a single TCP port.
For TCP/IP image stream, Jungfraujoch **listens** on a single TCP port and all writers **connect** to it. Connections are persistent — writers connect once and stay connected across multiple data collections. Jungfraujoch sends periodic `KEEPALIVE` frames when no data collection is active to detect dead connections; writers are expected to respond with a `KEEPALIVE` pong.
Using `*` as port number (e.g. `tcp://127.0.0.1:*`) is supported — the OS assigns a free port and the actual bound address can be queried via `GetAddress()`.
Payloads for `START`, `DATA`, `CALIBRATION` and `END` frames are CBOR messages, equivalent in content to the ZeroMQ image stream messages.
`ACK` and `CANCEL` are control frames.
`ACK`, `CANCEL`, and `KEEPALIVE` are control frames (no CBOR payload).
Each data collection is treated as a separate TCP streaming session: writers establish fresh connections for that collection (one connection per configured socket), then perform `START``DATA/CALIBRATION``END` (or `CANCEL` on startup rollback).
The data collection lifecycle on each connection follows:
`START``CALIBRATION` (socket 0 only) → `DATA` (repeated) → `END`
If a `START` ACK fails on any connection, Jungfraujoch sends `CANCEL` to all already-started connections and rolls back.
For each frame:
1. Read one `TcpFrameHeader` (fixed size).
2. Validate `magic` and `version`.
1. Read one `TcpFrameHeader` (fixed size, 64-byte aligned).
2. Validate `magic` (`0x4A464A54` / `"JFJT"`) and `version` (`2`).
3. Read `payload_size` bytes (if non-zero).
When image stream is split into multiple sockets:
- `START` and `END` are sent on all sockets,
- `CALIBRATION` is sent only on socket 0,
- `DATA` frames are distributed by file grouping (`images_per_file`).
When image stream is split into multiple connections:
- `START` and `END` are sent on all connections,
- `CALIBRATION` is sent only on connection 0,
- `DATA` frames are distributed by file grouping: connection index = `(image_number / images_per_file) % num_connections`.
#### TCP/IP configuration
TCP/IP image stream is configured in the broker JSON configuration file under the `tcp_settings` section:
```json
{
"addr": "tcp://192.168.0.1:9100",
"nwriters": 2,
"send_buffer_size": 67108864
}
```
- `addr`: listen address in `tcp://<IP>:<port>` format. `0.0.0.0` binds on all interfaces. `*` as port selects a random free port.
- `nwriters` (optional): maximum number of simultaneous writer connections accepted.
- `send_buffer_size` (optional): OS-level `SO_SNDBUF` size for accepted connections.
#### ACK handling
ACK handling is mandatory for correct operation:
- `START` **must** be acknowledged (`ACK` with `ack_for=START`) on each socket, otherwise collection start fails.
- `END` **must** be acknowledged (`ack_for=END`) on each socket for successful completion.
- `CANCEL` is acknowledged during rollback paths.
- `DATA` **must** be ackonwledged for every frame and should be used to report fatal downstream errors immediately.
- `CALIBRATION` not acknowledged at this moment.
- `START` **must** be acknowledged (`ACK` with `ack_for=START`) on each connection within 5 seconds, otherwise collection start fails and a rollback is triggered.
- `END` **must** be acknowledged (`ack_for=END`) on each connection within 10 seconds for successful completion.
- `CANCEL` should be acknowledged during rollback paths (500ms timeout).
- `DATA` should be acknowledged for every frame. A `DATA` ACK with `FATAL` flag set reports a downstream error (e.g. disk full) which is propagated to `jfjoch_broker` via `Finalize()`. A failed `DATA` ACK does **not** break the TCP connection on its own — data continues to flow.
- `CALIBRATION` is not acknowledged at this time.
- `KEEPALIVE` frames are not acknowledged via ACK; the writer responds with a `KEEPALIVE` pong frame instead.
On Linux, large payload transmission can use kernel TCP zero-copy (`SO_ZEROCOPY`/`MSG_ZEROCOPY`) when enabled; when unavailable, transfer falls back to normal `send()` behavior.
#### Keepalive
When no data collection is active, Jungfraujoch sends `KEEPALIVE` frames approximately every 5 seconds on each persistent connection. Writers should respond with a `KEEPALIVE` frame (pong). OS-level TCP keepalive is also enabled (`TCP_KEEPIDLE=30s`, `TCP_KEEPINTVL=10s`, `TCP_KEEPCNT=3`) as a secondary safety net. Dead connections are automatically removed from the pool.
#### Zero-copy transmission
On Linux, large payload transmission (`DATA` and `CALIBRATION` frames) can use kernel TCP zero-copy (`SO_ZEROCOPY`/`MSG_ZEROCOPY`) when available. If the kernel does not support it or the socket option fails, transmission transparently falls back to normal `send()` behavior. Zero-copy completion notifications are processed by a dedicated per-connection thread.
#### Frame types
@@ -74,6 +121,7 @@ On Linux, large payload transmission can use kernel TCP zero-copy (`SO_ZEROCOPY`
| 4 | `END` | End-of-run metadata |
| 5 | `ACK` | Acknowledgement / error reporting |
| 6 | `CANCEL` | Cancel run initialization/stream |
| 7 | `KEEPALIVE` | Connection liveness probe/pong |
#### TCP frame header (`TcpFrameHeader`)
@@ -81,10 +129,10 @@ On Linux, large payload transmission can use kernel TCP zero-copy (`SO_ZEROCOPY`
|---|---|---|
| `magic` | `uint32_t` | Protocol magic (`0x4A464A54`, `"JFJT"`) |
| `version` | `uint16_t` | Protocol version (`2`) |
| `type` | `uint16_t` | Frame type (`START`/`DATA`/`CALIBRATION`/`END`/`ACK`/`CANCEL`) |
| `type` | `uint16_t` | Frame type (see table above) |
| `image_number` | `uint64_t` | Image index for `DATA` frames |
| `payload_size` | `uint64_t` | Number of payload bytes after header |
| `socket_number` | `uint32_t` | Socket index in split-stream mode |
| `socket_number` | `uint32_t` | Connection index in split-stream mode |
| `flags` | `uint32_t` | ACK flags (`OK`, `FATAL`, `HAS_ERROR_TEXT`) |
| `run_number` | `uint64_t` | Run identifier |
| `ack_processed_images` | `uint32_t` | In `ACK`: number of images processed by receiver |
@@ -92,22 +140,36 @@ On Linux, large payload transmission can use kernel TCP zero-copy (`SO_ZEROCOPY`
| `ack_for` | `uint16_t` | In `ACK`: frame type being acknowledged |
| `reserved` | `uint64_t[2]` | Reserved, set to `0` |
The header is 64-byte aligned (`alignas(64)`).
#### ACK semantics
- `ACK` frames use `ack_for` to indicate which frame type is acknowledged.
- `flags`:
- `OK`: operation accepted/successful,
- `FATAL`: receiver reports unrecoverable error (primarily for `DATA`),
- `HAS_ERROR_TEXT`: ACK payload contains UTF-8 error text.
- `ack_code` can be used to categorize errors (for example I/O, no space left, permission denied, protocol error).
- `OK` (bit 0): operation accepted/successful,
- `FATAL` (bit 1): receiver reports unrecoverable error (primarily for `DATA`),
- `HAS_ERROR_TEXT` (bit 2): ACK payload contains UTF-8 error text.
- `ack_code` can be used to categorize errors:
| Code | Name | Meaning |
|---:|---|---|
| 0 | `None` | No error |
| 1 | `StartFailed` | START processing failed |
| 2 | `DataWriteFailed` | Image write failed |
| 3 | `EndFailed` | END processing failed |
| 4 | `DiskQuotaExceeded` | Disk quota exceeded |
| 5 | `NoSpaceLeft` | No space left on device |
| 6 | `PermissionDenied` | Permission denied |
| 7 | `IoError` | General I/O error |
| 8 | `ProtocolError` | Protocol-level error |
### Image stream replacement
Image stream can be replaced with direct HDF5 writer and CBOR dump image pushers, it can be disabled by select "None" image pusher for all the measurements.
Image stream can be replaced with direct HDF5 writer and CBOR dump image pushers, or it can be disabled by selecting "None" image pusher for all the measurements.
## Writer notification socket
Normally ZeroMQ is asynchronous. When `jfjoch_broker` is sending messages via ZeroMQ image stream, it doesn't know
if these were properly handled downstream, e.g., written to disk. For this reason a writer notification socket is introduced.
It allows to downstream processing code to notify 'jfjoch_broker' that all images were handled properly.
The writer notification socket is used **only with ZeroMQ image stream**. Since ZeroMQ is asynchronous, `jfjoch_broker` does not know whether messages were properly handled downstream (e.g. written to disk). The writer notification socket allows downstream code to report back.
For TCP/IP image stream, this mechanism is not needed — ACK frames provide synchronous feedback for each control and data frame.
To use writer notification socket, it has to be first enabled in the JSON configuration file of broker with `writer_notification_socket` entry:
```json
@@ -115,13 +177,13 @@ To use writer notification socket, it has to be first enabled in the JSON config
"writer_notification_socket":"tcp://192.168.0.1:*"
}
```
Such entry will create PULL socket on `192.168.0.1` network interface listening on one, random TCP port. When data processing is started, the
Such entry will create PULL socket on `192.168.0.1` network interface listening on one, random TCP port. When data processing is started, the
image stream will send CBOR [start message](CBOR.md#start-message). This message will include information on `writer_notification_zmq_addr`,
which needs to be used by downstream code. Since the start message must reference the address of `jfjoch_broker` host, notification
socket should always listen on a particular network interface, and should not be configured with placeholder address `0.0.0.0`. It is, however, OK
to use placeholder `:*` for network port, as it will be substituted for the one chosen by ZeroMQ.
For every image stream, downstream code must send the following message to the PULL socket:
For every image stream socket, downstream code must send the following message to the PULL socket:
```json
{
"run_number":135,
@@ -131,7 +193,7 @@ For every image stream, downstream code must send the following message to the P
"ok": true
}
```
Here `run_number`, `run_name` and `socket_number` must match information from the start message.
Here `run_number`, `run_name` and `socket_number` must match information from the start message.
`ok` is boolean confirming if the writing process was OK.
`processed_images` is number of images that were written/processed, this is to track how many images were ignored by non-blocking ZeroMQ procedures.
If not, it is possible to include error message:
@@ -150,23 +212,23 @@ This way errors from the downstream code are propagated to `jfjoch_broker`.
If writer notification socket is configured, but downstream code doesn't send proper notification, `jfjoch_broker` will time out after 60 seconds producing an error message.
## Preview stream
Jungfraujoch can also send images (with metadata) at a reduced frame rate for preview purpose.
Jungfraujoch can also send images (with metadata) at a reduced frame rate for preview purpose.
Images are serialized as CBOR [image message](CBOR.md#image-message).
The stream will also include CBOR [start message](CBOR.md#start-message) and [end message](CBOR.md#end-message) with run metadata.
Only start and image messages are sent.
This is using PUB socket with conflate option. I.e., only the last message is kept by ZeroMQ, so if receiver cannot cope
with the messages, it will always receive the last generated message (no backlog).
For this reason it is also recommended to use the same option on receiver side.
This is using PUB socket with conflate option. I.e., only the last message is kept by ZeroMQ, so if receiver cannot cope
with the messages, it will always receive the last generated message (no backlog).
For this reason it is also recommended to use the same option on receiver side.
Given PUB socket properties, it is possible to connect multiple viewers to a single socket --- all the viewers should receive all the images sent.
## Metadata stream
Jungfrajoch can also send pure metadata for the purpose of archiving such information.
Metadata are serialized as CBOR [metadata message](CBOR.md#metadata-message).
Jungfraujoch can also send pure metadata for the purpose of archiving such information.
Metadata are serialized as CBOR [metadata message](CBOR.md#metadata-message).
This is very similar as image message, but excludes the actual image array and spot positions.
As metadata are relatively small, to avoid large number of messages, Jungfraujoch bundles metadata of many images in one message.
Order of images within bundle, as well a size of the bundle, are not guaranteed
As metadata are relatively small, to avoid large number of messages, Jungfraujoch bundles metadata of many images in one message.
Order of images within bundle, as well a size of the bundle, are not guaranteed.
The stream will also include CBOR [start message](CBOR.md#start-message) and [end message](CBOR.md#end-message) with run metadata.
This is using PUB socket with watermark, so there is some queuing of messages with ZeroMQ. Multiple receivers can be connected.
This is using PUB socket with watermark, so there is some queuing of messages with ZeroMQ. Multiple receivers can be connected.
+1
View File
@@ -11,6 +11,7 @@ Name | Type | Description | Notes
**message_severity** | **str** | Level of the message to display | [optional] [default to 'error']
**gpu_count** | **int** | Number of installed GPUs | [optional]
**broker_version** | **str** | Version of the jfjoch_broker | [optional]
**connected_writers** | **int** | Number of connected writers For ZeroMQ image socket: number is constant For TCP/IP image socket: number is updated live during operation | [optional]
## Example
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "jungfraujoch-frontend",
"version": "1.0.0-rc.128",
"version": "1.0.0-rc.129",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "jungfraujoch-frontend",
"version": "1.0.0-rc.128",
"version": "1.0.0-rc.129",
"license": "GPL-3.0",
"dependencies": {
"@emotion/react": "^11.10.4",
@@ -28,6 +28,13 @@ export type broker_status = {
* Version of the jfjoch_broker
*/
broker_version?: string;
/**
* Number of connected writers
* For ZeroMQ image socket: number is constant
* For TCP/IP image socket: number is updated live during operation
*
*/
connected_writers?: number;
};
export namespace broker_status {
+5 -1
View File
@@ -57,4 +57,8 @@ std::string CBORFilePusher::PrintSetup() const {
std::filesystem::path currentPath = std::filesystem::current_path();
return "CBORFilePusher: CBOR messages for debugging are written directly to file in base directory " + currentPath.string();
}
}
size_t CBORFilePusher::GetConnectedWriters() const {
return 1;
}
+2
View File
@@ -18,6 +18,8 @@ public:
bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
bool SendCalibration(const CompressedImage &message) override;
std::string PrintSetup() const override;
size_t GetConnectedWriters() const override;
};
+4
View File
@@ -76,3 +76,7 @@ std::string HDF5FilePusher::PrintSetup() const {
std::optional<uint64_t> HDF5FilePusher::GetImagesWritten() const {
return images_written;
}
size_t HDF5FilePusher::GetConnectedWriters() const {
return 1;
}
+1
View File
@@ -31,6 +31,7 @@ public:
std::string PrintSetup() const override;
std::optional<uint64_t> GetImagesWritten() const override;
size_t GetConnectedWriters() const override;
};
+1
View File
@@ -36,6 +36,7 @@ public:
virtual ~ImagePusher() = default;
virtual std::string PrintSetup() const = 0;
virtual std::optional<uint64_t> GetImagesWritten() const { return std::nullopt; }
virtual size_t GetConnectedWriters() const { return 0; };
};
+1 -1
View File
@@ -171,7 +171,7 @@ public:
std::string GetAddress() const { return endpoint; }
/// Returns the number of currently connected writers (can be called at any time)
size_t GetConnectedWriters() const;
size_t GetConnectedWriters() const override;
void StartDataCollection(StartMessage& message) override;
bool EndDataCollection(const EndMessage& message) override;
+5 -1
View File
@@ -142,4 +142,8 @@ std::string ZMQStream2Pusher::PrintSetup() const {
std::optional<uint64_t> ZMQStream2Pusher::GetImagesWritten() const {
std::unique_lock ul(images_written_mutex);
return images_written;
}
}
size_t ZMQStream2Pusher::GetConnectedWriters() const {
return socket.size();
}
+2
View File
@@ -48,6 +48,8 @@ public:
std::string PrintSetup() const override;
std::optional<uint64_t> GetImagesWritten() const override;
size_t GetConnectedWriters() const override;
};
#endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H
+4
View File
@@ -337,3 +337,7 @@ JFJochReceiverService &JFJochReceiverService::Indexing(const IndexingSettings &i
throw JFJochException(JFJochExceptionCategory::WrongDAQState,
"Cannot change indexing settings during data collection");
}
uint64_t JFJochReceiverService::GetConnectedWriters() const {
return image_pusher.GetConnectedWriters();
}
+2
View File
@@ -88,6 +88,8 @@ public:
std::string GetJPEGFromBuffer(const PreviewImageSettings &settings, int64_t image_number = ImageBuffer::MaxImage);
ImageBufferStatus GetImageBufferStatus() const;
void ClearImageBuffer();
uint64_t GetConnectedWriters() const;
};