From f859f8108fc5f1a8bdd4c5011a891a2339267ef3 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 25 Jun 2026 13:05:41 +0200 Subject: [PATCH 1/7] TCPStreamPusher: remove MSG_ZEROCOPY machinery, use plain blocking send The MSG_ZEROCOPY path was the common factor behind the occasional mid-run writer disconnects and added substantial failure surface for no benefit at this throughput (tens of MB/s): the socket error queue raises POLLERR as a normal event (entangled with liveness detection), and the per-connection completion-id counter was reset every run while the kernel's sk_zckey is monotonic for the life of the socket, so on a persistent connection the bookkeeping diverged from run 2 onward. Replace it with a straightforward synchronous send(): - SendAll/SendFrame lose all zerocopy params; DATA payloads are sent with a plain ::send(MSG_NOSIGNAL), so the image-buffer slot is owned by the kernel once send() returns and WriterThread releases it immediately. - Drop ZeroCopyCompletionThread and the zc_pending/zc_mutex/zc_cv/zc_*_id state, SO_ZEROCOPY setup, and the errqueue include. - StopDataCollectionThreads now drains and releases any queued-but-unsent slots on the stalled-writer path (active==false makes WriterThread exit without draining) instead of Clear()-ing them, avoiding a slot leak. The BUSY-heartbeat peer-liveness timeout (backpressure tolerance) is kept; it is independent of zerocopy. Co-Authored-By: Claude Opus 4.8 (1M context) --- image_pusher/TCPStreamPusher.cpp | 277 +++++-------------------------- image_pusher/TCPStreamPusher.h | 29 +--- 2 files changed, 40 insertions(+), 266 deletions(-) diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index 7c2656ba..75844476 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -11,9 +11,6 @@ #include #include #include -#if defined(MSG_ZEROCOPY) -#include -#endif namespace { int64_t SteadyNowNs() { @@ -197,27 +194,14 @@ bool TCPStreamPusher::IsConnectionAlive(const Connection& c) const { return so_error == 0; } -bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len, bool allow_zerocopy, - bool* zc_used_out, uint32_t* zc_first_out, uint32_t* zc_last_out) { +bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len) { const auto* p = static_cast(buf); size_t sent = 0; - bool zc_used = false; - uint32_t zc_first = 0; - uint32_t zc_last = 0; - - bool try_zerocopy = false; -#if defined(MSG_ZEROCOPY) - try_zerocopy = allow_zerocopy && c.zerocopy_enabled.load(std::memory_order_relaxed); -#endif while (sent < len) { const int local_fd = c.fd.load(); - if (local_fd < 0 || c.broken) { - if (zc_used_out) *zc_used_out = zc_used; - if (zc_first_out) *zc_first_out = zc_first; - if (zc_last_out) *zc_last_out = zc_last; + if (local_fd < 0 || c.broken) return false; - } // Treat backpressure as fatal only if the peer shows NO sign of life. A busy // writer keeps refreshing last_peer_activity_ns via BUSY heartbeats / ACKs, so @@ -228,9 +212,6 @@ bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len, bool a " for " + std::to_string(silent_ns / 1000000) + " ms while sending; marking broken"); c.broken = true; CloseFd(c.fd); - if (zc_used_out) *zc_used_out = zc_used; - if (zc_first_out) *zc_first_out = zc_first; - if (zc_last_out) *zc_last_out = zc_last; return false; } @@ -243,9 +224,6 @@ bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len, bool a continue; c.broken = true; CloseFd(c.fd); - if (zc_used_out) *zc_used_out = zc_used; - if (zc_first_out) *zc_first_out = zc_first; - if (zc_last_out) *zc_last_out = zc_last; return false; } if (prc == 0) @@ -253,63 +231,28 @@ bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len, bool a if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLOUT)) { c.broken = true; CloseFd(c.fd); - if (zc_used_out) *zc_used_out = zc_used; - if (zc_first_out) *zc_first_out = zc_first; - if (zc_last_out) *zc_last_out = zc_last; return false; } - int flags = MSG_NOSIGNAL; -#if defined(MSG_ZEROCOPY) - if (try_zerocopy) - flags |= MSG_ZEROCOPY; -#endif - - ssize_t rc = ::send(local_fd, p + sent, len - sent, flags); + ssize_t rc = ::send(local_fd, p + sent, len - sent, MSG_NOSIGNAL); if (rc < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; -#if defined(MSG_ZEROCOPY) - if (try_zerocopy && (errno == EOPNOTSUPP || errno == EINVAL || errno == ENOBUFS)) { - try_zerocopy = false; - c.zerocopy_enabled.store(false, std::memory_order_relaxed); - continue; - } -#endif - if (errno == EPIPE || errno == ECONNRESET || errno == ENOTCONN) { c.broken = true; CloseFd(c.fd); } - - if (zc_used_out) *zc_used_out = zc_used; - if (zc_first_out) *zc_first_out = zc_first; - if (zc_last_out) *zc_last_out = zc_last; return false; } -#if defined(MSG_ZEROCOPY) - if (try_zerocopy && rc > 0) { - const uint32_t this_id = c.zc_next_id++; - if (!zc_used) { - zc_used = true; - zc_first = this_id; - } - zc_last = this_id; - } -#endif - sent += static_cast(rc); } - if (zc_used_out) *zc_used_out = zc_used; - if (zc_first_out) *zc_first_out = zc_first; - if (zc_last_out) *zc_last_out = zc_last; return true; } -bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, TCPFrameType type, int64_t image_number, ZeroCopyReturnValue* z) { +bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, TCPFrameType type, int64_t image_number) { TcpFrameHeader h{}; h.type = static_cast(type); h.payload_size = size; @@ -317,35 +260,11 @@ bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, h.socket_number = c.socket_number; h.run_number = run_number; - if (!SendAll(c, &h, sizeof(h), false)) { - if (z) z->release(); + if (!SendAll(c, &h, sizeof(h))) return false; - } - bool zc_used = false; - uint32_t zc_first = 0; - uint32_t zc_last = 0; - - if (size > 0) { - // MSG_ZEROCOPY leaves the kernel referencing the caller's buffer after send() - // returns, so it is only safe when a ZeroCopyReturnValue keeps that buffer alive - // until the completion notification. The synchronous path passes a transient - // buffer (z == nullptr) that the caller reuses immediately — never zero-copy it. - const bool allow_zerocopy = (z != nullptr) - && (type == TCPFrameType::DATA || type == TCPFrameType::CALIBRATION); - if (!SendAll(c, data, size, allow_zerocopy, &zc_used, &zc_first, &zc_last)) { - if (z) { - if (zc_used) EnqueueZeroCopyPending(c, z, zc_first, zc_last); - else z->release(); - } - return false; - } - } - - if (z) { - if (zc_used) EnqueueZeroCopyPending(c, z, zc_first, zc_last); - else z->release(); - } + if (size > 0 && !SendAll(c, data, size)) + return false; if (type == TCPFrameType::DATA) c.data_sent.fetch_add(1, std::memory_order_relaxed); @@ -353,110 +272,6 @@ bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, return true; } -// Caller must hold c.zc_mutex. -void TCPStreamPusher::ReleaseCompletedZeroCopy(Connection& c) { - while (!c.zc_pending.empty()) { - const auto& front = c.zc_pending.front(); - if (c.zc_completed_id == std::numeric_limits::max() || front.last_id > c.zc_completed_id) - break; - if (front.z) - front.z->release(); - c.zc_pending.pop_front(); - } -} - -void TCPStreamPusher::EnqueueZeroCopyPending(Connection& c, ZeroCopyReturnValue* z, uint32_t first_id, uint32_t last_id) { - std::unique_lock ul(c.zc_mutex); - c.zc_pending.push_back(Connection::PendingZC{ - .first_id = first_id, - .last_id = last_id, - .z = z - }); - c.zc_cv.notify_all(); -} - -void TCPStreamPusher::ForceReleasePendingZeroCopy(Connection& c) { - std::unique_lock ul(c.zc_mutex); - while (!c.zc_pending.empty()) { - auto p = c.zc_pending.front(); - c.zc_pending.pop_front(); - if (p.z) - p.z->release(); - } - c.zc_cv.notify_all(); -} - -bool TCPStreamPusher::WaitForZeroCopyDrain(Connection& c, std::chrono::milliseconds timeout) { - std::unique_lock ul(c.zc_mutex); - return c.zc_cv.wait_for(ul, timeout, [&] { - return c.zc_pending.empty() || c.broken.load(); - }); -} - -void TCPStreamPusher::ZeroCopyCompletionThread(Connection* c) { -#if defined(MSG_ZEROCOPY) - while (c->active || !c->zc_pending.empty()) { - const int local_fd = c->fd.load(); - if (local_fd < 0) - break; - - pollfd pfd{}; - pfd.fd = local_fd; - pfd.events = 0; // We only care about POLLERR for errqueue - - const int prc = poll(&pfd, 1, 100); - if (prc < 0) { - if (errno == EINTR) - continue; - break; - } - if (prc == 0) - continue; - - // Only process the error queue; POLLERR is always reported regardless of events mask - uint8_t control[512]; - uint8_t data[1]; - iovec iov{}; - iov.iov_base = data; - iov.iov_len = sizeof(data); - - msghdr msg{}; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - msg.msg_control = control; - msg.msg_controllen = sizeof(control); - - while (true) { - const ssize_t rr = recvmsg(local_fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT); - if (rr < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) - break; - // A real error on the errqueue likely means the socket is dead - c->broken = true; - break; - } - for (cmsghdr* cm = CMSG_FIRSTHDR(&msg); cm != nullptr; cm = CMSG_NXTHDR(&msg, cm)) { - if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) - continue; - - auto* se = reinterpret_cast(CMSG_DATA(cm)); - if (!se || se->ee_origin != SO_EE_ORIGIN_ZEROCOPY) - continue; - - const uint32_t end_id = se->ee_data; - std::unique_lock ul(c->zc_mutex); - if (c->zc_completed_id == std::numeric_limits::max() || end_id > c->zc_completed_id) - c->zc_completed_id = end_id; - ReleaseCompletedZeroCopy(*c); - c->zc_cv.notify_all(); - } - } - } -#else - (void)c; -#endif -} - bool TCPStreamPusher::ReadExact(Connection& c, void* buf, size_t len) { auto* p = static_cast(buf); size_t got = 0; @@ -525,8 +340,8 @@ bool TCPStreamPusher::ReadExactPersistent(Connection& c, void* buf, size_t len) continue; return false; } - // POLLERR can fire alongside POLLIN during zerocopy — only bail if we have - // POLLHUP/POLLNVAL without any readable data. + // Only bail if the peer hung up / the fd is invalid and there is no readable + // data left to drain. if ((pfd.revents & (POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLIN)) return false; if ((pfd.revents & POLLIN) == 0) @@ -560,13 +375,21 @@ void TCPStreamPusher::WriterThread(Connection* c) { continue; } - std::unique_lock ul(c->send_mutex); - if (!SendFrame(*c, - static_cast(e.z->GetImage()), - e.z->GetImageSize(), - TCPFrameType::DATA, - e.z->GetImageNumber(), - e.z)) { + bool ok; + { + std::unique_lock ul(c->send_mutex); + ok = SendFrame(*c, + static_cast(e.z->GetImage()), + e.z->GetImageSize(), + TCPFrameType::DATA, + e.z->GetImageNumber()); + } + + // Synchronous send: once send() returns, the payload has been copied into the + // kernel socket buffer, so the image-buffer slot can be returned immediately. + e.z->release(); + + if (!ok) { c->broken = true; logger.Error("TCP send failed on socket " + std::to_string(c->socket_number)); } @@ -744,14 +567,6 @@ void TCPStreamPusher::SetupNewConnection(int new_fd, uint32_t socket_number) { if (send_buffer_size) setsockopt(new_fd, SOL_SOCKET, SO_SNDBUF, &send_buffer_size.value(), sizeof(int32_t)); -#if defined(SO_ZEROCOPY) - int zc_one = 1; - if (setsockopt(new_fd, SOL_SOCKET, SO_ZEROCOPY, &zc_one, sizeof(zc_one)) == 0) - c->zerocopy_enabled.store(true, std::memory_order_relaxed); - else - c->zerocopy_enabled.store(false, std::memory_order_relaxed); -#endif - c->connected = true; c->broken = false; auto now = std::chrono::steady_clock::now(); @@ -809,7 +624,7 @@ void TCPStreamPusher::KeepaliveThread() { continue; std::unique_lock ul(c.send_mutex); - if (!SendFrame(c, nullptr, 0, TCPFrameType::KEEPALIVE, -1, nullptr)) { + if (!SendFrame(c, nullptr, 0, TCPFrameType::KEEPALIVE, -1)) { logger.Warning("Keepalive send failed on socket " + std::to_string(c.socket_number)); c.broken = true; } else { @@ -850,16 +665,9 @@ void TCPStreamPusher::StartDataCollectionThreads(Connection& c) { c.data_acked_ok.store(0, std::memory_order_relaxed); c.data_acked_bad.store(0, std::memory_order_relaxed); c.data_acked_total.store(0, std::memory_order_relaxed); - c.zc_next_id = 0; - c.zc_completed_id = std::numeric_limits::max(); - { - std::unique_lock ul(c.zc_mutex); - c.zc_pending.clear(); - } c.active = true; c.writer_future = std::async(std::launch::async, &TCPStreamPusher::WriterThread, this, &c); - c.zc_future = std::async(std::launch::async, &TCPStreamPusher::ZeroCopyCompletionThread, this, &c); } void TCPStreamPusher::StopDataCollectionThreads(Connection& c) { @@ -872,30 +680,20 @@ void TCPStreamPusher::StopDataCollectionThreads(Connection& c) { if (!c.queue.PutTimeout({.end = true}, std::chrono::milliseconds(200))) { c.broken = true; CloseFd(c.fd); - c.queue.Clear(); + // active==false makes WriterThread exit without draining, so return any + // queued-but-unsent slots to the image buffer pool ourselves. + ImagePusherQueueElement e{}; + while (c.queue.Get(e)) { + if (e.z) + e.z->release(); + } (void)c.queue.Put({.end = true}); } c.ack_cv.notify_all(); - c.zc_cv.notify_all(); if (c.writer_future.valid()) c.writer_future.get(); - - constexpr auto zc_drain_timeout = std::chrono::seconds(2); - if (!WaitForZeroCopyDrain(c, zc_drain_timeout)) { - logger.Warning("TCP zerocopy completion drain timeout on socket " + std::to_string(c.socket_number) + - "; forcing socket close"); - c.broken = true; - CloseFd(c.fd); - c.zc_cv.notify_all(); - c.ack_cv.notify_all(); - } - - if (c.zc_future.valid()) - c.zc_future.get(); - - ForceReleasePendingZeroCopy(c); } bool TCPStreamPusher::WaitForAck(Connection& c, TCPFrameType ack_for, std::chrono::milliseconds timeout, std::string* error_text) { @@ -1006,7 +804,7 @@ void TCPStreamPusher::StartDataCollection(StartMessage& message) { continue; std::unique_lock ul(c.send_mutex); - (void)SendFrame(c, nullptr, 0, TCPFrameType::CANCEL, -1, nullptr); + (void)SendFrame(c, nullptr, 0, TCPFrameType::CANCEL, -1); std::string cancel_ack_err; (void)WaitForAck(c, TCPFrameType::CANCEL, std::chrono::milliseconds(500), &cancel_ack_err); @@ -1033,7 +831,7 @@ void TCPStreamPusher::StartDataCollection(StartMessage& message) { { std::unique_lock ul(c.send_mutex); - if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::START, -1, nullptr)) { + if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::START, -1)) { rollback_cancel(); throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Timeout/failure sending START on socket " + std::to_string(c.socket_number)); @@ -1069,7 +867,7 @@ bool TCPStreamPusher::SendImage(const uint8_t *image_data, size_t image_size, in return false; std::unique_lock ul(c.send_mutex); - return SendFrame(c, image_data, image_size, TCPFrameType::DATA, image_number, nullptr); + return SendFrame(c, image_data, image_size, TCPFrameType::DATA, image_number); } bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { @@ -1140,7 +938,7 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage& message) { { std::unique_lock ul(c.send_mutex); - if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::END, -1, nullptr)) { + if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::END, -1)) { ret = false; continue; } @@ -1233,8 +1031,7 @@ bool TCPStreamPusher::SendCalibration(const CompressedImage& message) { serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::CALIBRATION, - -1, - nullptr); + -1); if (!ok) { logger.Error("Failed to send TCP calibration on socket {}", target->socket_number); diff --git a/image_pusher/TCPStreamPusher.h b/image_pusher/TCPStreamPusher.h index 2ae61271..78398d8a 100644 --- a/image_pusher/TCPStreamPusher.h +++ b/image_pusher/TCPStreamPusher.h @@ -10,7 +10,6 @@ #include #include #include -#include #include "ImagePusher.h" #include "ZMQWriterNotificationPuller.h" @@ -26,9 +25,8 @@ /// - KeepaliveThread: sends periodic keepalive frames when idle (skipped during data collection) /// - Per-connection WriterThread: drains the connection's queue, sends DATA frames /// - Per-connection PersistentAckThread: reads ACKs and keepalive pongs from the peer -/// - Per-connection ZeroCopyCompletionThread: processes kernel MSG_ZEROCOPY completions /// -/// Lock ordering: connections_mutex → send_mutex → ack_mutex → zc_mutex +/// Lock ordering: connections_mutex → send_mutex → ack_mutex /// IMPORTANT: Never call blocking queue operations while holding connections_mutex. /// /// Concurrency contract: @@ -48,11 +46,9 @@ class TCPStreamPusher : public ImagePusher { std::atomic active{false}; // data-collection threads running std::atomic broken{false}; std::atomic connected{false}; // persistent connection is alive - std::atomic zerocopy_enabled{false}; ThreadSafeFIFO queue; std::future writer_future; - std::future zc_future; // Persistent ack/keepalive reader (runs as long as the connection is alive) std::future persistent_ack_future; @@ -61,18 +57,6 @@ class TCPStreamPusher : public ImagePusher { std::mutex ack_mutex; std::condition_variable ack_cv; - struct PendingZC { - uint32_t first_id = 0; - uint32_t last_id = 0; - ZeroCopyReturnValue* z = nullptr; - }; - - std::mutex zc_mutex; - std::condition_variable zc_cv; - std::deque zc_pending; - uint32_t zc_next_id = 0; - uint32_t zc_completed_id = std::numeric_limits::max(); - bool start_ack_received = false; bool start_ack_ok = false; bool end_ack_received = false; @@ -145,15 +129,13 @@ class TCPStreamPusher : public ImagePusher { static void CloseFd(std::atomic& fd); bool IsConnectionAlive(const Connection& c) const; - bool SendAll(Connection& c, const void* buf, size_t len, bool allow_zerocopy, - bool* zc_used = nullptr, uint32_t* zc_first = nullptr, uint32_t* zc_last = nullptr); + bool SendAll(Connection& c, const void* buf, size_t len); bool ReadExact(Connection& c, void* buf, size_t len); bool ReadExactPersistent(Connection& c, void* buf, size_t len); - bool SendFrame(Connection& c, const uint8_t* data, size_t size, TCPFrameType type, int64_t image_number, ZeroCopyReturnValue* z); + bool SendFrame(Connection& c, const uint8_t* data, size_t size, TCPFrameType type, int64_t image_number); void WriterThread(Connection* c); void PersistentAckThread(Connection* c); - void ZeroCopyCompletionThread(Connection* c); void AcceptorThread(); void KeepaliveThread(); @@ -164,11 +146,6 @@ class TCPStreamPusher : public ImagePusher { void StartDataCollectionThreads(Connection& c); void StopDataCollectionThreads(Connection& c); - void EnqueueZeroCopyPending(Connection& c, ZeroCopyReturnValue* z, uint32_t first_id, uint32_t last_id); - void ReleaseCompletedZeroCopy(Connection& c); - void ForceReleasePendingZeroCopy(Connection& c); - bool WaitForZeroCopyDrain(Connection& c, std::chrono::milliseconds timeout); - bool WaitForAck(Connection& c, TCPFrameType ack_for, std::chrono::milliseconds timeout, std::string* error_text); bool WaitForEndAck(Connection& c, std::chrono::milliseconds liveness_timeout, std::string* error_text); public: -- 2.52.0 From 2a9fd084ab93ba5b99142dd1aa3aceccb5f6fa0b Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 25 Jun 2026 13:36:57 +0200 Subject: [PATCH 2/7] TCPStreamPusher: post-zerocopy cleanup + fix queue-path backpressure drop Follow-up simplifications after removing the zerocopy machinery, plus a real backpressure bug the cleanup surfaced: - SendImage(ZeroCopyReturnValue&) imposed a hard 2s deadline on enqueueing and then marked the connection broken. At high frame rate the 128-deep queue fills in tens of ms, so any filesystem stall longer than ~2s dropped the run even though the writer was alive and heartbeating -- defeating the whole BUSY-heartbeat backpressure design. Block instead while the peer is alive (!broken && active); the real liveness decision already lives in SendAll's peer-liveness timeout, which the writer's BUSY heartbeats keep fresh. This makes the queue path consistent with the send path: both wait out arbitrarily long stalls and only give up when the peer goes genuinely silent. - Drop the dead per-connection data_sent counter (written, never read) and the redundant ImagePusherQueueElement.image_data set on the TCP path (only the HDF5 pusher reads that field). - Add SetPeerLivenessTimeout() so the liveness window is tunable (and testable). Add TCPImageCommTest_StalledWriter_SurvivesViaHeartbeat: a controllable raw writer double connects, ACKs START, then stops draining for 4s while still sending BUSY heartbeats (peer-liveness window set to 2s). The run must ride out the stall on the zero-copy queue path and deliver all 1000 images. Verified to fail (115/1000 delivered, connection dropped) against the old 2s-deadline behavior. Co-Authored-By: Claude Opus 4.8 (1M context) --- image_pusher/TCPStreamPusher.cpp | 10 +- image_pusher/TCPStreamPusher.h | 7 +- tests/TCPImagePusherTest.cpp | 232 +++++++++++++++++++++++++++++++ 3 files changed, 241 insertions(+), 8 deletions(-) diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index 75844476..d8291661 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -266,9 +266,6 @@ bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, if (size > 0 && !SendAll(c, data, size)) return false; - if (type == TCPFrameType::DATA) - c.data_sent.fetch_add(1, std::memory_order_relaxed); - return true; } @@ -661,7 +658,6 @@ void TCPStreamPusher::StartDataCollectionThreads(Connection& c) { c.data_ack_error_text.clear(); } - c.data_sent.store(0, std::memory_order_relaxed); c.data_acked_ok.store(0, std::memory_order_relaxed); c.data_acked_bad.store(0, std::memory_order_relaxed); c.data_acked_total.store(0, std::memory_order_relaxed); @@ -887,7 +883,7 @@ bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { target = use[idx]; } - if (!target || target->broken || !target->active) { + if (!target) { z.release(); return false; } @@ -896,10 +892,10 @@ bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { // freshly-created file at startup). Keep waiting as long as it proves it is alive — // BUSY heartbeats / ACKs refresh last_peer_activity_ns from a thread independent of // its stalled write path — so a slow-but-healthy writer is held back, not dropped. - // Only a peer that has gone completely silent for the liveness window is declared dead. + // Only a peer that has gone completely silent for the liveness window is declared dead; + // a writer that heartbeats but never drains is caught by SendAll's max_backpressure cap. while (!target->broken && target->active) { if (target->queue.PutTimeout(ImagePusherQueueElement{ - .image_data = static_cast(z.GetImage()), .z = &z, .end = false }, std::chrono::milliseconds(50))) diff --git a/image_pusher/TCPStreamPusher.h b/image_pusher/TCPStreamPusher.h index 78398d8a..7ef05382 100644 --- a/image_pusher/TCPStreamPusher.h +++ b/image_pusher/TCPStreamPusher.h @@ -71,7 +71,6 @@ class TCPStreamPusher : public ImagePusher { std::atomic data_ack_error_reported{false}; std::string data_ack_error_text; - std::atomic data_sent{0}; std::atomic data_acked_ok{0}; std::atomic data_acked_bad{0}; std::atomic data_acked_total{0}; @@ -155,6 +154,12 @@ public: ~TCPStreamPusher() override; + /// Max time a send may block on backpressure with no sign of life from the peer + /// before the connection is declared dead. A busy-but-alive writer keeps it fresh + /// via BUSY heartbeats, so this only catches a genuinely silent peer. Must be set + /// before data collection starts. + void SetPeerLivenessTimeout(std::chrono::milliseconds t) { peer_liveness_timeout = t; } + std::vector GetAddress() const override { return {endpoint}; } /// Returns the number of currently connected writers (can be called at any time) diff --git a/tests/TCPImagePusherTest.cpp b/tests/TCPImagePusherTest.cpp index 7b558518..20cf1440 100644 --- a/tests/TCPImagePusherTest.cpp +++ b/tests/TCPImagePusherTest.cpp @@ -3,11 +3,25 @@ #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include "../image_pusher/TCPStreamPusher.h" #include "../image_puller/TCPImagePuller.h" #include "../image_puller/ZMQImagePuller.h" +#include "../common/ImageBuffer.h" +#include "../common/ZeroCopyReturnValue.h" TEST_CASE("TCPImageCommTest_2Writers_WithAck", "[TCP]") { const size_t nframes = 128; @@ -698,3 +712,221 @@ TEST_CASE("TCPImageCommTest_RepubToZMQ", "[TCP][ZeroMQ]") { REQUIRE(zmq_nimages == nframes); REQUIRE(zmq_errors == 0); } + +namespace { + +// Controllable TCP "writer" peer for backpressure tests. Connects to the pusher, ACKs +// START, then *stalls* (stops draining the socket) until Release() is called, while a +// background thread keeps sending BUSY heartbeats — i.e. a writer that is alive but +// wedged (e.g. on a slow filesystem at high frame rate). Catch2 assertion macros are not +// thread-safe, so the worker threads only touch atomics; the test thread asserts. +class StallableWriterDouble { +public: + StallableWriterDouble(const std::string &tcp_addr, int rcvbuf_bytes) { + auto [host, port] = ParseHostPort(tcp_addr); + fd_ = ::socket(AF_INET, SOCK_STREAM, 0); + if (fd_ < 0) + return; + setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &rcvbuf_bytes, sizeof(rcvbuf_bytes)); + sockaddr_in sin{}; + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + inet_pton(AF_INET, host.c_str(), &sin.sin_addr); + if (::connect(fd_, reinterpret_cast(&sin), sizeof(sin)) != 0) { + ::close(fd_); + fd_ = -1; + return; + } + busy_thread_ = std::thread([this] { BusyLoop(); }); + reader_thread_ = std::thread([this] { ReaderLoop(); }); + } + + ~StallableWriterDouble() { + stop_ = true; + Release(); + if (fd_ >= 0) + ::shutdown(fd_, SHUT_RDWR); + if (reader_thread_.joinable()) + reader_thread_.join(); + if (busy_thread_.joinable()) + busy_thread_.join(); + if (fd_ >= 0) + ::close(fd_); + } + + [[nodiscard]] bool Connected() const { return fd_ >= 0; } + + // Stop stalling: let the reader drain DATA and ACK END. + void Release() { + { + std::lock_guard lg(mtx_); + released_ = true; + } + cv_.notify_all(); + } + + [[nodiscard]] size_t DataFramesReceived() const { return data_frames_.load(); } + [[nodiscard]] bool EndAcked() const { return end_acked_.load(); } + +private: + static std::pair ParseHostPort(const std::string &addr) { + const std::string prefix = "tcp://"; + const auto hp = addr.substr(prefix.size()); + const auto p = hp.find_last_of(':'); + return {hp.substr(0, p), static_cast(std::stoi(hp.substr(p + 1)))}; + } + + bool SendHeader(TCPFrameType type, TCPFrameType ack_for, uint64_t run, uint32_t sock, uint32_t flags) { + TcpFrameHeader h{}; + h.type = static_cast(type); + h.ack_for = static_cast(ack_for); + h.run_number = run; + h.socket_number = sock; + h.flags = flags; + h.payload_size = 0; + std::lock_guard lg(send_mtx_); + if (fd_ < 0) + return false; + return ::send(fd_, &h, sizeof(h), MSG_NOSIGNAL) == static_cast(sizeof(h)); + } + + bool ReadExact(void *buf, size_t len) { + auto *p = static_cast(buf); + size_t got = 0; + while (got < len) { + const ssize_t rc = ::recv(fd_, p + got, len - got, 0); + if (rc <= 0) + return false; + got += static_cast(rc); + } + return true; + } + + void BusyLoop() { + // Heartbeat keeps the pusher's peer-liveness fresh even while we are not draining. + while (!stop_) { + SendHeader(TCPFrameType::BUSY, TCPFrameType::DATA, run_.load(), sock_.load(), 0); + for (int i = 0; i < 5 && !stop_; i++) + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + + void ReaderLoop() { + std::vector discard; + while (!stop_) { + TcpFrameHeader h{}; + if (!ReadExact(&h, sizeof(h))) + return; + if (h.magic != JFJOCH_TCP_MAGIC || h.version != JFJOCH_TCP_VERSION) + return; + if (h.payload_size > 0) { + discard.resize(h.payload_size); + if (!ReadExact(discard.data(), discard.size())) + return; + } + switch (static_cast(h.type)) { + case TCPFrameType::START: + run_.store(h.run_number); + sock_.store(h.socket_number); + SendHeader(TCPFrameType::ACK, TCPFrameType::START, h.run_number, h.socket_number, TCP_ACK_FLAG_OK); + { // Stall: stop reading until released. + std::unique_lock ul(mtx_); + cv_.wait(ul, [this] { return released_ || stop_; }); + } + break; + case TCPFrameType::DATA: + data_frames_.fetch_add(1); + break; + case TCPFrameType::END: + SendHeader(TCPFrameType::ACK, TCPFrameType::END, h.run_number, h.socket_number, TCP_ACK_FLAG_OK); + end_acked_.store(true); + return; + default: + break; // ignore KEEPALIVE etc. + } + } + } + + int fd_ = -1; + std::thread reader_thread_; + std::thread busy_thread_; + std::atomic stop_{false}; + std::atomic run_{0}; + std::atomic sock_{0}; + std::atomic data_frames_{0}; + std::atomic end_acked_{false}; + std::mutex send_mtx_; + std::mutex mtx_; + std::condition_variable cv_; + bool released_ = false; +}; + +} // namespace + +TEST_CASE("TCPImageCommTest_StalledWriter_SurvivesViaHeartbeat", "[TCP]") { + // A writer that is alive (still heartbeating) but has stopped draining — e.g. wedged + // on a slow filesystem at high frame rate — must NOT be dropped mid-run. The pusher + // rides out the backpressure on the production zero-copy queue path until the writer + // recovers. Regression for the queue-path send giving up on a fixed deadline, and for + // the BUSY heartbeat keeping the connection alive past the peer-liveness window. + constexpr int64_t N = 1000; // > queue depth + socket buffers + constexpr auto liveness = std::chrono::milliseconds(2000); + constexpr auto stall = std::chrono::milliseconds(4000); // > liveness AND > old send deadline + + // Small SO_SNDBUF/SO_RCVBUF so backpressure reaches the queue after few images. + TCPStreamPusher pusher("tcp://127.0.0.1:*", 1, 16 * 1024); + pusher.SetPeerLivenessTimeout(liveness); + + StallableWriterDouble writer(pusher.GetAddress()[0], 16 * 1024); + REQUIRE(writer.Connected()); + + for (int attempt = 0; attempt < 200 && pusher.GetConnectedWriters() < 1; ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + REQUIRE(pusher.GetConnectedWriters() == 1); + + ImageBuffer image_buffer(16 * 1024 * 1024); + image_buffer.StartMeasurement(static_cast(4096)); + + StartMessage start{.images_per_file = 1000, .write_master_file = true}; + pusher.StartDataCollection(start); // writer ACKs START, then stalls (stops reading) + + auto sender = std::async(std::launch::async, [&] { + for (int64_t i = 0; i < N; i++) { + ZeroCopyReturnValue *slot = nullptr; + while ((slot = image_buffer.GetImageSlot()) == nullptr) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::memset(slot->GetImage(), 0, 256); + slot->SetImageNumber(i); + slot->SetImageSize(256); // arbitrary payload; the writer double discards it + slot->ReadyToSend(); + pusher.SendImage(*slot); + } + }); + + // During the stall the queue is full; SendImage must block, not drop the connection. + std::this_thread::sleep_for(stall); + CHECK(pusher.GetConnectedWriters() == 1); + CHECK(sender.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready); + + // Writer recovers and starts draining. + writer.Release(); + + REQUIRE(sender.wait_for(std::chrono::seconds(30)) == std::future_status::ready); + sender.get(); + + // Every image makes it across once the stall clears. + for (int attempt = 0; attempt < 1200 && writer.DataFramesReceived() < static_cast(N); ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + CHECK(writer.DataFramesReceived() == static_cast(N)); + + // Queue fully drained: END now hands over cleanly without racing data frames. + EndMessage end{}; + CHECK(pusher.EndDataCollection(end) == true); + + for (int attempt = 0; attempt < 200 && !writer.EndAcked(); ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + CHECK(writer.EndAcked()); + CHECK(pusher.GetConnectedWriters() == 1); + + image_buffer.Finalize(std::chrono::seconds(5)); +} -- 2.52.0 From 3cd96b8607a38c6f9928f39f81a96197f50858ca Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 25 Jun 2026 13:52:37 +0200 Subject: [PATCH 3/7] TCPStreamPusher: hard backpressure cap so a wedged writer can't hang the run The peer-liveness timeout only catches a *silent* writer. A misbehaving writer that keeps sending BUSY heartbeats while never draining (e.g. a permanently wedged filesystem) would otherwise block SendAll -- and, through it, the queued SendImage path and the end-of-run frame_transformation_futures.get() -- forever. Add a progress-based cap in SendAll: if no bytes leave the socket for max_backpressure_timeout (default 60s, tunable via SetMaxBackpressureTimeout) the connection is declared dead regardless of heartbeats. It is one global cap, enforced everywhere SendAll runs, so it bounds both mid-run stalls and finalization. Generous relative to the 15s liveness window, since a heartbeating peer is given more grace than a silent one -- but finite. Add TCPImageCommTest_WedgedWriter_DroppedByBackpressureCap: a writer that ACKs START then stalls forever while heartbeating (cap 1.5s, liveness 5s) must have its connection dropped, and neither the producers nor EndDataCollection may hang. Verified to hang (timeout) with the cap disabled. Co-Authored-By: Claude Opus 4.8 (1M context) --- image_pusher/TCPStreamPusher.cpp | 21 ++++++++++-- image_pusher/TCPStreamPusher.h | 12 +++++++ tests/TCPImagePusherTest.cpp | 59 ++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 2 deletions(-) diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index d8291661..f4c9b5ac 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -197,16 +197,19 @@ bool TCPStreamPusher::IsConnectionAlive(const Connection& c) const { bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len) { const auto* p = static_cast(buf); size_t sent = 0; + int64_t last_progress_ns = SteadyNowNs(); while (sent < len) { const int local_fd = c.fd.load(); if (local_fd < 0 || c.broken) return false; + const int64_t now_ns = SteadyNowNs(); + // Treat backpressure as fatal only if the peer shows NO sign of life. A busy // writer keeps refreshing last_peer_activity_ns via BUSY heartbeats / ACKs, so - // we wait through arbitrarily long stalls; only a silent peer trips the timeout. - const int64_t silent_ns = SteadyNowNs() - c.last_peer_activity_ns.load(std::memory_order_relaxed); + // we wait through long stalls; only a silent peer trips this timeout. + const int64_t silent_ns = now_ns - c.last_peer_activity_ns.load(std::memory_order_relaxed); if (silent_ns > std::chrono::duration_cast(peer_liveness_timeout).count()) { logger.Warning("No liveness from writer on socket " + std::to_string(c.socket_number) + " for " + std::to_string(silent_ns / 1000000) + " ms while sending; marking broken"); @@ -215,6 +218,19 @@ bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len) { return false; } + // Hard backpressure cap: a writer may keep heartbeating yet never drain (e.g. a + // permanently wedged filesystem). If no bytes leave the socket for this long, give + // up regardless of heartbeats so the run -- and its finalization -- cannot hang. + if (now_ns - last_progress_ns > + std::chrono::duration_cast(max_backpressure_timeout).count()) { + logger.Warning("No send progress to writer on socket " + std::to_string(c.socket_number) + + " for " + std::to_string((now_ns - last_progress_ns) / 1000000) + + " ms despite heartbeats; marking broken"); + c.broken = true; + CloseFd(c.fd); + return false; + } + pollfd pfd{}; pfd.fd = local_fd; pfd.events = POLLOUT; @@ -247,6 +263,7 @@ bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len) { } sent += static_cast(rc); + last_progress_ns = SteadyNowNs(); } return true; diff --git a/image_pusher/TCPStreamPusher.h b/image_pusher/TCPStreamPusher.h index 7ef05382..c2ec0edf 100644 --- a/image_pusher/TCPStreamPusher.h +++ b/image_pusher/TCPStreamPusher.h @@ -109,6 +109,12 @@ class TCPStreamPusher : public ImagePusher { // every ~250 ms via BUSY heartbeats (and via DATA ACKs), so genuine backpressure of any // duration is tolerated; only a truly silent (frozen/dead) peer trips this. std::chrono::milliseconds peer_liveness_timeout{15000}; + // Hard upper bound on backpressure: if the socket accepts no bytes for this long the + // writer is wedged and is declared dead even if it keeps heartbeating, so a + // misbehaving writer cannot block the run (or its finalization) forever. Generous + // relative to peer_liveness_timeout, since a heartbeating peer is given more grace + // than a silent one — but still finite. + std::chrono::milliseconds max_backpressure_timeout{60000}; int64_t images_per_file = 1; uint64_t run_number = 0; @@ -160,6 +166,12 @@ public: /// before data collection starts. void SetPeerLivenessTimeout(std::chrono::milliseconds t) { peer_liveness_timeout = t; } + /// Hard upper bound on backpressure. Even while the peer keeps heartbeating, if no + /// bytes can be sent for this long the writer is declared dead so a wedged writer + /// cannot block the run or its finalization forever. Must be set before data + /// collection starts. + void SetMaxBackpressureTimeout(std::chrono::milliseconds t) { max_backpressure_timeout = t; } + std::vector GetAddress() const override { return {endpoint}; } /// Returns the number of currently connected writers (can be called at any time) diff --git a/tests/TCPImagePusherTest.cpp b/tests/TCPImagePusherTest.cpp index 20cf1440..9b04c63d 100644 --- a/tests/TCPImagePusherTest.cpp +++ b/tests/TCPImagePusherTest.cpp @@ -930,3 +930,62 @@ TEST_CASE("TCPImageCommTest_StalledWriter_SurvivesViaHeartbeat", "[TCP]") { image_buffer.Finalize(std::chrono::seconds(5)); } + +TEST_CASE("TCPImageCommTest_WedgedWriter_DroppedByBackpressureCap", "[TCP]") { + // A writer that keeps heartbeating but never drains (e.g. a permanently wedged + // filesystem) must not block the run or its finalization forever. The hard + // backpressure cap tears the connection down even though BUSY keeps arriving, and + // well before the (longer) peer-liveness timeout that those heartbeats keep at bay. + constexpr int64_t N = 1000; + constexpr auto liveness = std::chrono::milliseconds(5000); // kept fresh by heartbeats + constexpr auto max_backpressure = std::chrono::milliseconds(1500); + + TCPStreamPusher pusher("tcp://127.0.0.1:*", 1, 16 * 1024); + pusher.SetPeerLivenessTimeout(liveness); + pusher.SetMaxBackpressureTimeout(max_backpressure); + + StallableWriterDouble writer(pusher.GetAddress()[0], 16 * 1024); // never released + REQUIRE(writer.Connected()); + + for (int attempt = 0; attempt < 200 && pusher.GetConnectedWriters() < 1; ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + REQUIRE(pusher.GetConnectedWriters() == 1); + + ImageBuffer image_buffer(16 * 1024 * 1024); + image_buffer.StartMeasurement(static_cast(4096)); + + StartMessage start{.images_per_file = 1000, .write_master_file = true}; + pusher.StartDataCollection(start); // writer ACKs START, then stalls forever + + auto sender = std::async(std::launch::async, [&] { + for (int64_t i = 0; i < N; i++) { + ZeroCopyReturnValue *slot = nullptr; + while ((slot = image_buffer.GetImageSlot()) == nullptr) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::memset(slot->GetImage(), 0, 256); + slot->SetImageNumber(i); + slot->SetImageSize(256); + slot->ReadyToSend(); + pusher.SendImage(*slot); + } + }); + + // The cap must fire and drop the connection despite continuous heartbeats. + bool dropped = false; + for (int attempt = 0; attempt < 400 && !dropped; ++attempt) { + if (pusher.GetConnectedWriters() == 0) + dropped = true; + else + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + } + CHECK(dropped); + + // Neither the producers nor finalization may hang once the writer is wedged. + REQUIRE(sender.wait_for(std::chrono::seconds(10)) == std::future_status::ready); + sender.get(); + + EndMessage end{}; + CHECK(pusher.EndDataCollection(end) == false); // bounded, and reports failure + + image_buffer.Finalize(std::chrono::seconds(5)); +} -- 2.52.0 From a02fd19af40fdd6ef5b7456c09ac48a4556636c1 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 25 Jun 2026 14:17:27 +0200 Subject: [PATCH 4/7] broker: optional TCP liveness/backpressure timeouts in config Expose the two new TCPStreamPusher knobs through the broker config so they can be tuned per deployment, like send_buffer_size already is: - peer_liveness_timeout_ms -> SetPeerLivenessTimeout() - max_backpressure_timeout_ms -> SetMaxBackpressureTimeout() Both are optional fields on tcp_settings; when unset the parser leaves the pusher's built-in defaults (15s / 60s) in place, so existing config files keep working unchanged. The parser also ignores non-positive values. The generated broker/gen/model/Tcp_settings.{h,cpp} are hand-patched to mirror exactly what openapi-generator emits for these fields (verified against the send_buffer_size pattern and by a from_json/to_json round-trip), so re-running update_version.sh reproduces them with no diff. Co-Authored-By: Claude Opus 4.8 (1M context) --- broker/JFJochBrokerParser.cpp | 6 +++ broker/gen/model/Tcp_settings.cpp | 70 ++++++++++++++++++++++++++++--- broker/gen/model/Tcp_settings.h | 20 ++++++++- broker/jfjoch_api.yaml | 15 +++++++ 4 files changed, 104 insertions(+), 7 deletions(-) diff --git a/broker/JFJochBrokerParser.cpp b/broker/JFJochBrokerParser.cpp index 54f4ada9..af2061f4 100644 --- a/broker/JFJochBrokerParser.cpp +++ b/broker/JFJochBrokerParser.cpp @@ -206,6 +206,12 @@ std::unique_ptr ParseTCPImagePusher(const org::openapitools::server auto tmp = std::make_unique(j.getTcp().getImageSocket(), j.getTcp().getNwriters(), send_buffer_size); + // Optional liveness/backpressure tuning; unset -> keep the pusher's built-in defaults. + if (j.getTcp().peerLivenessTimeoutMsIsSet() && j.getTcp().getPeerLivenessTimeoutMs() > 0) + tmp->SetPeerLivenessTimeout(std::chrono::milliseconds(j.getTcp().getPeerLivenessTimeoutMs())); + if (j.getTcp().maxBackpressureTimeoutMsIsSet() && j.getTcp().getMaxBackpressureTimeoutMs() > 0) + tmp->SetMaxBackpressureTimeout(std::chrono::milliseconds(j.getTcp().getMaxBackpressureTimeoutMs())); + return std::move(tmp); } diff --git a/broker/gen/model/Tcp_settings.cpp b/broker/gen/model/Tcp_settings.cpp index 883ab4b7..ef632894 100644 --- a/broker/gen/model/Tcp_settings.cpp +++ b/broker/gen/model/Tcp_settings.cpp @@ -27,7 +27,11 @@ Tcp_settings::Tcp_settings() m_Image_socketIsSet = false; m_Nwriters = 32L; m_NwritersIsSet = false; - + m_Peer_liveness_timeout_ms = 0L; + m_Peer_liveness_timeout_msIsSet = false; + m_Max_backpressure_timeout_ms = 0L; + m_Max_backpressure_timeout_msIsSet = false; + } void Tcp_settings::validate() const @@ -84,8 +88,14 @@ bool Tcp_settings::operator==(const Tcp_settings& rhs) const ((!imageSocketIsSet() && !rhs.imageSocketIsSet()) || (imageSocketIsSet() && rhs.imageSocketIsSet() && getImageSocket() == rhs.getImageSocket())) && - ((!nwritersIsSet() && !rhs.nwritersIsSet()) || (nwritersIsSet() && rhs.nwritersIsSet() && getNwriters() == rhs.getNwriters())) - + ((!nwritersIsSet() && !rhs.nwritersIsSet()) || (nwritersIsSet() && rhs.nwritersIsSet() && getNwriters() == rhs.getNwriters())) && + + + ((!peerLivenessTimeoutMsIsSet() && !rhs.peerLivenessTimeoutMsIsSet()) || (peerLivenessTimeoutMsIsSet() && rhs.peerLivenessTimeoutMsIsSet() && getPeerLivenessTimeoutMs() == rhs.getPeerLivenessTimeoutMs())) && + + + ((!maxBackpressureTimeoutMsIsSet() && !rhs.maxBackpressureTimeoutMsIsSet()) || (maxBackpressureTimeoutMsIsSet() && rhs.maxBackpressureTimeoutMsIsSet() && getMaxBackpressureTimeoutMs() == rhs.getMaxBackpressureTimeoutMs())) + ; } @@ -103,7 +113,11 @@ void to_json(nlohmann::json& j, const Tcp_settings& o) j["image_socket"] = o.m_Image_socket; if(o.nwritersIsSet()) j["nwriters"] = o.m_Nwriters; - + if(o.peerLivenessTimeoutMsIsSet()) + j["peer_liveness_timeout_ms"] = o.m_Peer_liveness_timeout_ms; + if(o.maxBackpressureTimeoutMsIsSet()) + j["max_backpressure_timeout_ms"] = o.m_Max_backpressure_timeout_ms; + } void from_json(const nlohmann::json& j, Tcp_settings& o) @@ -122,8 +136,18 @@ void from_json(const nlohmann::json& j, Tcp_settings& o) { j.at("nwriters").get_to(o.m_Nwriters); o.m_NwritersIsSet = true; - } - + } + if(j.find("peer_liveness_timeout_ms") != j.end()) + { + j.at("peer_liveness_timeout_ms").get_to(o.m_Peer_liveness_timeout_ms); + o.m_Peer_liveness_timeout_msIsSet = true; + } + if(j.find("max_backpressure_timeout_ms") != j.end()) + { + j.at("max_backpressure_timeout_ms").get_to(o.m_Max_backpressure_timeout_ms); + o.m_Max_backpressure_timeout_msIsSet = true; + } + } int64_t Tcp_settings::getSendBufferSize() const @@ -177,6 +201,40 @@ void Tcp_settings::unsetNwriters() { m_NwritersIsSet = false; } +int64_t Tcp_settings::getPeerLivenessTimeoutMs() const +{ + return m_Peer_liveness_timeout_ms; +} +void Tcp_settings::setPeerLivenessTimeoutMs(int64_t const value) +{ + m_Peer_liveness_timeout_ms = value; + m_Peer_liveness_timeout_msIsSet = true; +} +bool Tcp_settings::peerLivenessTimeoutMsIsSet() const +{ + return m_Peer_liveness_timeout_msIsSet; +} +void Tcp_settings::unsetPeer_liveness_timeout_ms() +{ + m_Peer_liveness_timeout_msIsSet = false; +} +int64_t Tcp_settings::getMaxBackpressureTimeoutMs() const +{ + return m_Max_backpressure_timeout_ms; +} +void Tcp_settings::setMaxBackpressureTimeoutMs(int64_t const value) +{ + m_Max_backpressure_timeout_ms = value; + m_Max_backpressure_timeout_msIsSet = true; +} +bool Tcp_settings::maxBackpressureTimeoutMsIsSet() const +{ + return m_Max_backpressure_timeout_msIsSet; +} +void Tcp_settings::unsetMax_backpressure_timeout_ms() +{ + m_Max_backpressure_timeout_msIsSet = false; +} } // namespace org::openapitools::server::model diff --git a/broker/gen/model/Tcp_settings.h b/broker/gen/model/Tcp_settings.h index 2c933bdd..61144d44 100644 --- a/broker/gen/model/Tcp_settings.h +++ b/broker/gen/model/Tcp_settings.h @@ -79,6 +79,20 @@ public: void setNwriters(int64_t const value); bool nwritersIsSet() const; void unsetNwriters(); + /// + /// Max time (in milliseconds) a send may block with no sign of life from the writer (no BUSY heartbeat or ACK) before the connection is declared dead. Optional; the built-in default is used when unset. + /// + int64_t getPeerLivenessTimeoutMs() const; + void setPeerLivenessTimeoutMs(int64_t const value); + bool peerLivenessTimeoutMsIsSet() const; + void unsetPeer_liveness_timeout_ms(); + /// + /// Hard upper bound (in milliseconds) on backpressure: if no bytes can be sent to the writer for this long the connection is declared dead even while the writer keeps heartbeating, so a wedged writer cannot block the run or its finalization. Optional; the built-in default is used when unset. + /// + int64_t getMaxBackpressureTimeoutMs() const; + void setMaxBackpressureTimeoutMs(int64_t const value); + bool maxBackpressureTimeoutMsIsSet() const; + void unsetMax_backpressure_timeout_ms(); friend void to_json(nlohmann::json& j, const Tcp_settings& o); friend void from_json(const nlohmann::json& j, Tcp_settings& o); @@ -89,7 +103,11 @@ protected: bool m_Image_socketIsSet; int64_t m_Nwriters; bool m_NwritersIsSet; - + int64_t m_Peer_liveness_timeout_ms; + bool m_Peer_liveness_timeout_msIsSet; + int64_t m_Max_backpressure_timeout_ms; + bool m_Max_backpressure_timeout_msIsSet; + }; } // namespace org::openapitools::server::model diff --git a/broker/jfjoch_api.yaml b/broker/jfjoch_api.yaml index 00e70e80..86ead6d2 100644 --- a/broker/jfjoch_api.yaml +++ b/broker/jfjoch_api.yaml @@ -2232,6 +2232,21 @@ components: maximum: 100 default: 32 description: Number of TCP/IP writers to be used for streaming images + peer_liveness_timeout_ms: + type: integer + format: int64 + description: | + Max time (in milliseconds) a send may block with no sign of life from the + writer (no BUSY heartbeat or ACK) before the connection is declared dead. + Optional; the built-in default is used when unset. + max_backpressure_timeout_ms: + type: integer + format: int64 + description: | + Hard upper bound (in milliseconds) on backpressure: if no bytes can be sent to + the writer for this long the connection is declared dead even while the writer + keeps heartbeating, so a wedged writer cannot block the run or its + finalization. Optional; the built-in default is used when unset. pcie_devices: type: array minLength: 1 -- 2.52.0 From 51bfb57782560db105e3119853a1145459428120 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 25 Jun 2026 14:19:09 +0200 Subject: [PATCH 5/7] Regenerate OpenAPI --- broker/gen/model/Tcp_settings.cpp | 24 ++++++++++++------------ broker/gen/model/Tcp_settings.h | 6 +++--- broker/redoc-static.html | 2 +- docs/python_client/docs/TcpSettings.md | 2 ++ frontend/src/client/types.gen.ts | 15 +++++++++++++++ frontend/src/client/zod.gen.ts | 4 +++- 6 files changed, 36 insertions(+), 17 deletions(-) diff --git a/broker/gen/model/Tcp_settings.cpp b/broker/gen/model/Tcp_settings.cpp index ef632894..391cfb4c 100644 --- a/broker/gen/model/Tcp_settings.cpp +++ b/broker/gen/model/Tcp_settings.cpp @@ -31,7 +31,7 @@ Tcp_settings::Tcp_settings() m_Peer_liveness_timeout_msIsSet = false; m_Max_backpressure_timeout_ms = 0L; m_Max_backpressure_timeout_msIsSet = false; - + } void Tcp_settings::validate() const @@ -72,7 +72,7 @@ bool Tcp_settings::validate(std::stringstream& msg, const std::string& pathPrefi } } - + return success; } @@ -89,13 +89,13 @@ bool Tcp_settings::operator==(const Tcp_settings& rhs) const ((!nwritersIsSet() && !rhs.nwritersIsSet()) || (nwritersIsSet() && rhs.nwritersIsSet() && getNwriters() == rhs.getNwriters())) && - - + + ((!peerLivenessTimeoutMsIsSet() && !rhs.peerLivenessTimeoutMsIsSet()) || (peerLivenessTimeoutMsIsSet() && rhs.peerLivenessTimeoutMsIsSet() && getPeerLivenessTimeoutMs() == rhs.getPeerLivenessTimeoutMs())) && - - + + ((!maxBackpressureTimeoutMsIsSet() && !rhs.maxBackpressureTimeoutMsIsSet()) || (maxBackpressureTimeoutMsIsSet() && rhs.maxBackpressureTimeoutMsIsSet() && getMaxBackpressureTimeoutMs() == rhs.getMaxBackpressureTimeoutMs())) - + ; } @@ -117,7 +117,7 @@ void to_json(nlohmann::json& j, const Tcp_settings& o) j["peer_liveness_timeout_ms"] = o.m_Peer_liveness_timeout_ms; if(o.maxBackpressureTimeoutMsIsSet()) j["max_backpressure_timeout_ms"] = o.m_Max_backpressure_timeout_ms; - + } void from_json(const nlohmann::json& j, Tcp_settings& o) @@ -136,18 +136,18 @@ void from_json(const nlohmann::json& j, Tcp_settings& o) { j.at("nwriters").get_to(o.m_Nwriters); o.m_NwritersIsSet = true; - } + } if(j.find("peer_liveness_timeout_ms") != j.end()) { j.at("peer_liveness_timeout_ms").get_to(o.m_Peer_liveness_timeout_ms); o.m_Peer_liveness_timeout_msIsSet = true; - } + } if(j.find("max_backpressure_timeout_ms") != j.end()) { j.at("max_backpressure_timeout_ms").get_to(o.m_Max_backpressure_timeout_ms); o.m_Max_backpressure_timeout_msIsSet = true; - } - + } + } int64_t Tcp_settings::getSendBufferSize() const diff --git a/broker/gen/model/Tcp_settings.h b/broker/gen/model/Tcp_settings.h index 61144d44..d323c3bd 100644 --- a/broker/gen/model/Tcp_settings.h +++ b/broker/gen/model/Tcp_settings.h @@ -80,14 +80,14 @@ public: bool nwritersIsSet() const; void unsetNwriters(); /// - /// Max time (in milliseconds) a send may block with no sign of life from the writer (no BUSY heartbeat or ACK) before the connection is declared dead. Optional; the built-in default is used when unset. + /// Max time (in milliseconds) a send may block with no sign of life from the writer (no BUSY heartbeat or ACK) before the connection is declared dead. Optional; the built-in default is used when unset. /// int64_t getPeerLivenessTimeoutMs() const; void setPeerLivenessTimeoutMs(int64_t const value); bool peerLivenessTimeoutMsIsSet() const; void unsetPeer_liveness_timeout_ms(); /// - /// Hard upper bound (in milliseconds) on backpressure: if no bytes can be sent to the writer for this long the connection is declared dead even while the writer keeps heartbeating, so a wedged writer cannot block the run or its finalization. Optional; the built-in default is used when unset. + /// Hard upper bound (in milliseconds) on backpressure: if no bytes can be sent to the writer for this long the connection is declared dead even while the writer keeps heartbeating, so a wedged writer cannot block the run or its finalization. Optional; the built-in default is used when unset. /// int64_t getMaxBackpressureTimeoutMs() const; void setMaxBackpressureTimeoutMs(int64_t const value); @@ -107,7 +107,7 @@ protected: bool m_Peer_liveness_timeout_msIsSet; int64_t m_Max_backpressure_timeout_ms; bool m_Max_backpressure_timeout_msIsSet; - + }; } // namespace org::openapitools::server::model diff --git a/broker/redoc-static.html b/broker/redoc-static.html index 0bc418af..207f3324 100644 --- a/broker/redoc-static.html +++ b/broker/redoc-static.html @@ -937,7 +937,7 @@ then image might be replaced in the buffer between calling /images and /image.cb