jfjoch_broker: Export writer FIFO utilization for TCP socket

This commit is contained in:
2026-03-05 19:59:23 +01:00
parent 512023f5f6
commit 1758417702
13 changed files with 157 additions and 9 deletions

View File

@@ -1118,5 +1118,6 @@ org::openapitools::server::model::Image_pusher_status Convert(const ImagePusherS
ret.setImagesWritten(input.images_written.value());
if (input.images_write_error)
ret.setImagesWriteError(input.images_write_error.value());
ret.setWriterFifoUtilization(input.writer_fifo_utilization);
return ret;
}

View File

@@ -26,6 +26,7 @@ Image_pusher_status::Image_pusher_status()
m_Images_writtenIsSet = false;
m_Images_write_error = 0L;
m_Images_write_errorIsSet = false;
m_Writer_fifo_utilizationIsSet = false;
}
@@ -69,7 +70,75 @@ bool Image_pusher_status::validate(std::stringstream& msg, const std::string& pa
}
}
/* Connected_writers */ {
const int64_t& value = m_Connected_writers;
const std::string currentValuePath = _pathPrefix + ".connectedWriters";
if (value < 0ll)
{
success = false;
msg << currentValuePath << ": must be greater than or equal to 0;";
}
}
if (imagesWrittenIsSet())
{
const int64_t& value = m_Images_written;
const std::string currentValuePath = _pathPrefix + ".imagesWritten";
if (value < 0ll)
{
success = false;
msg << currentValuePath << ": must be greater than or equal to 0;";
}
}
if (imagesWriteErrorIsSet())
{
const int64_t& value = m_Images_write_error;
const std::string currentValuePath = _pathPrefix + ".imagesWriteError";
if (value < 0ll)
{
success = false;
msg << currentValuePath << ": must be greater than or equal to 0;";
}
}
if (writerFifoUtilizationIsSet())
{
const std::vector<int64_t>& value = m_Writer_fifo_utilization;
const std::string currentValuePath = _pathPrefix + ".writerFifoUtilization";
{ // Recursive validation of array elements
const std::string oldValuePath = currentValuePath;
int i = 0;
for (const int64_t& value : value)
{
const std::string currentValuePath = oldValuePath + "[" + std::to_string(i) + "]";
if (value < 0ll)
{
success = false;
msg << currentValuePath << ": must be greater than or equal to 0;";
}
i++;
}
}
}
return success;
}
@@ -91,7 +160,10 @@ bool Image_pusher_status::operator==(const Image_pusher_status& rhs) const
((!imagesWrittenIsSet() && !rhs.imagesWrittenIsSet()) || (imagesWrittenIsSet() && rhs.imagesWrittenIsSet() && getImagesWritten() == rhs.getImagesWritten())) &&
((!imagesWriteErrorIsSet() && !rhs.imagesWriteErrorIsSet()) || (imagesWriteErrorIsSet() && rhs.imagesWriteErrorIsSet() && getImagesWriteError() == rhs.getImagesWriteError()))
((!imagesWriteErrorIsSet() && !rhs.imagesWriteErrorIsSet()) || (imagesWriteErrorIsSet() && rhs.imagesWriteErrorIsSet() && getImagesWriteError() == rhs.getImagesWriteError())) &&
((!writerFifoUtilizationIsSet() && !rhs.writerFifoUtilizationIsSet()) || (writerFifoUtilizationIsSet() && rhs.writerFifoUtilizationIsSet() && getWriterFifoUtilization() == rhs.getWriterFifoUtilization()))
;
}
@@ -111,6 +183,8 @@ void to_json(nlohmann::json& j, const Image_pusher_status& o)
j["images_written"] = o.m_Images_written;
if(o.imagesWriteErrorIsSet())
j["images_write_error"] = o.m_Images_write_error;
if(o.writerFifoUtilizationIsSet() || !o.m_Writer_fifo_utilization.empty())
j["writer_fifo_utilization"] = o.m_Writer_fifo_utilization;
}
@@ -129,6 +203,11 @@ void from_json(const nlohmann::json& j, Image_pusher_status& o)
j.at("images_write_error").get_to(o.m_Images_write_error);
o.m_Images_write_errorIsSet = true;
}
if(j.find("writer_fifo_utilization") != j.end())
{
j.at("writer_fifo_utilization").get_to(o.m_Writer_fifo_utilization);
o.m_Writer_fifo_utilizationIsSet = true;
}
}
@@ -190,6 +269,23 @@ void Image_pusher_status::unsetImages_write_error()
{
m_Images_write_errorIsSet = false;
}
std::vector<int64_t> Image_pusher_status::getWriterFifoUtilization() const
{
return m_Writer_fifo_utilization;
}
void Image_pusher_status::setWriterFifoUtilization(std::vector<int64_t> const value)
{
m_Writer_fifo_utilization = value;
m_Writer_fifo_utilizationIsSet = true;
}
bool Image_pusher_status::writerFifoUtilizationIsSet() const
{
return m_Writer_fifo_utilizationIsSet;
}
void Image_pusher_status::unsetWriter_fifo_utilization()
{
m_Writer_fifo_utilizationIsSet = false;
}
} // namespace org::openapitools::server::model

View File

@@ -89,6 +89,13 @@ public:
void setImagesWriteError(int64_t const value);
bool imagesWriteErrorIsSet() const;
void unsetImages_write_error();
/// <summary>
/// Utilization of internal writer FIFO. This number is updated live during operation for TCP/IP image socket. No other socket use it.
/// </summary>
std::vector<int64_t> getWriterFifoUtilization() const;
void setWriterFifoUtilization(std::vector<int64_t> const value);
bool writerFifoUtilizationIsSet() const;
void unsetWriter_fifo_utilization();
friend void to_json(nlohmann::json& j, const Image_pusher_status& o);
friend void from_json(const nlohmann::json& j, Image_pusher_status& o);
@@ -103,6 +110,8 @@ protected:
bool m_Images_writtenIsSet;
int64_t m_Images_write_error;
bool m_Images_write_errorIsSet;
std::vector<int64_t> m_Writer_fifo_utilization;
bool m_Writer_fifo_utilizationIsSet;
};

View File

@@ -24,7 +24,7 @@ Tcp_settings::Tcp_settings()
m_Send_buffer_size = 0L;
m_Send_buffer_sizeIsSet = false;
m_Addr = "";
m_Nwriters = 1L;
m_Nwriters = 32L;
m_NwritersIsSet = false;
}

View File

@@ -1368,6 +1368,7 @@ components:
connected_writers:
type: integer
format: int64
minimum: 0
description: |
Number of connected writers
For ZeroMQ image socket: number is constant
@@ -1375,6 +1376,7 @@ components:
images_written:
type: integer
format: int64
minimum: 0
description: |
Number of images written to the image socket.
This number is updated live during operation for TCP/IP image socket and direct HDF5 writer.
@@ -1382,10 +1384,21 @@ components:
images_write_error:
type: integer
format: int64
minimum: 0
description: |
Number of images that could not be written to the image socket.
This number is updated live during operation for TCP/IP image socket.
No other socket use it.
writer_fifo_utilization:
type: array
items:
type: integer
format: int64
minimum: 0
description: |
Utilization of internal writer FIFO.
This number is updated live during operation for TCP/IP image socket.
No other socket use it.
plot:
type: object
required:
@@ -2164,7 +2177,7 @@ components:
format: int64
minimum: 1
maximum: 100
default: 1
default: 32
description: Number of TCP/IP writers to be used for streaming images
pcie_devices:
type: array

File diff suppressed because one or more lines are too long

View File

@@ -11,6 +11,7 @@ Name | Type | Description | Notes
**connected_writers** | **int** | Number of connected writers For ZeroMQ image socket: number is constant For TCP/IP image socket: number is updated live during operation |
**images_written** | **int** | Number of images written to the image socket. This number is updated live during operation for TCP/IP image socket and direct HDF5 writer. It is updated at the end of experiment for ZeroMQ image socket. | [optional]
**images_write_error** | **int** | Number of images that could not be written to the image socket. This number is updated live during operation for TCP/IP image socket. No other socket use it. | [optional]
**writer_fifo_utilization** | **List[int]** | Utilization of internal writer FIFO. This number is updated live during operation for TCP/IP image socket. No other socket use it. | [optional]
## Example

View File

@@ -8,7 +8,7 @@ Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**send_buffer_size** | **int** | Send buffer size for TCP/IP socket | [optional]
**addr** | **str** | tcp://&lt;IP address&gt;:&lt;port&gt; 0.0.0.0 instead of IP address is accepted and means listening on all network interfaces |
**nwriters** | **int** | Number of TCP/IP writers to be used for streaming images | [optional] [default to 1]
**nwriters** | **int** | Number of TCP/IP writers to be used for streaming images | [optional] [default to 32]
## Example

View File

@@ -100,5 +100,9 @@
"detector_trigger_delay_ns": 0
},
"frontend_directory": "../../frontend/dist/",
"image_pusher": "HDF5"
"image_pusher": "TCP",
"tcp": {
"addr": "tcp://0.0.0.0:*",
"nwriters": 32
}
}

View File

@@ -40,5 +40,12 @@ export type image_pusher_status = {
*
*/
images_write_error?: number;
/**
* Utilization of internal writer FIFO.
* This number is updated live during operation for TCP/IP image socket.
* No other socket use it.
*
*/
writer_fifo_utilization?: Array<number>;
};

View File

@@ -19,6 +19,7 @@ struct ImagePusherStatus {
size_t connected_writers = 0;
std::optional<size_t> images_written = 0;
std::optional<size_t> images_write_error = 0;
std::vector<int64_t> writer_fifo_utilization;
};
struct ImagePusherQueueElement {
@@ -48,6 +49,7 @@ public:
virtual std::optional<uint64_t> GetImagesWriteError() const { return std::nullopt; }
virtual size_t GetConnectedWriters() const { return 0; };
virtual ImagePusherType GetType() const = 0;
virtual std::vector<int64_t> GetWriterFifoUtilization() const { return {}; }
ImagePusherStatus GetStatus() const {
return ImagePusherStatus{
@@ -55,7 +57,8 @@ public:
GetAddress(),
GetConnectedWriters(),
GetImagesWritten(),
GetImagesWriteError()
GetImagesWriteError(),
GetWriterFifoUtilization()
};
}
};

View File

@@ -592,6 +592,8 @@ void TCPStreamPusher::PersistentAckThread(Connection* c) {
break;
}
c->last_ack_fifo_occupancy.store(h.ack_fifo_occupancy, std::memory_order_relaxed);
// ACK frame — forward to data-collection ack logic
std::string error_text;
if (h.payload_size > 0) {
@@ -1143,3 +1145,13 @@ std::optional<uint64_t> TCPStreamPusher::GetImagesWritten() const {
std::optional<uint64_t> TCPStreamPusher::GetImagesWriteError() const {
return total_data_acked_bad.load(std::memory_order_relaxed);
}
std::vector<int64_t> TCPStreamPusher::GetWriterFifoUtilization() const {
std::vector<int64_t> ret;
std::lock_guard lg(connections_mutex);
ret.reserve(connections.size());
for (const auto& c : connections) {
ret.push_back(c->last_ack_fifo_occupancy.load(std::memory_order_relaxed));
}
return ret;
}

View File

@@ -91,6 +91,7 @@ class TCPStreamPusher : public ImagePusher {
std::atomic<uint64_t> data_acked_ok{0};
std::atomic<uint64_t> data_acked_bad{0};
std::atomic<uint64_t> data_acked_total{0};
std::atomic<uint64_t> last_ack_fifo_occupancy{0};
std::chrono::steady_clock::time_point last_keepalive_sent{};
std::chrono::steady_clock::time_point last_keepalive_recv{};
@@ -185,5 +186,6 @@ public:
std::optional<uint64_t> GetImagesWritten() const override;
std::optional<uint64_t> GetImagesWriteError() const override;
std::vector<int64_t> GetWriterFifoUtilization() const override;
ImagePusherType GetType() const override { return ImagePusherType::TCP; }
};