From d156dfa2e8d3f501ffccfa10874fff7b1bdc05c6 Mon Sep 17 00:00:00 2001 From: leonarski_f Date: Tue, 23 Jun 2026 18:09:37 +0200 Subject: [PATCH] TCP stream: gate SendImage enqueue on writer liveness too The broker logs for the dropped runs show the connection torn down ~2s into a collection (not 3s), via "TCP send failed -> Removed dead connection -> Accepted (new socket)". That is too early for the SendAll send deadline: the real gate was the fixed 2-second enqueue deadline in the zerocopy SendImage path. At the start of a large dataset the writer briefly stalls draining the socket while it creates the master file and writes the large START metadata + calibration frames to GPFS; the per-connection queue fills, and after 2s SendImage marked the connection broken. The writer then reconnected outside the active session, so the rest of the run was dropped and the half-written file was finalized at the next START. Replace the fixed 2s enqueue deadline with the same peer-liveness condition used on the send path: keep applying backpressure while the writer proves it is alive (BUSY heartbeats / ACKs refresh last_peer_activity_ns from a thread independent of the stalled write path), and only declare it dead after the liveness window of complete silence. A transient startup stall is now ridden out instead of dropping the run. Co-Authored-By: Claude Opus 4.8 --- image_pusher/TCPStreamPusher.cpp | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index 88678d3b..7c2656ba 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -1094,23 +1094,28 @@ bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { return false; } - const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); - while (std::chrono::steady_clock::now() < deadline) { - if (target->broken || !target->active) { - z.release(); - return false; - } - + // A full queue means the writer is applying backpressure (e.g. briefly stalled on a + // freshly-created file at startup). Keep waiting as long as it proves it is alive — + // BUSY heartbeats / ACKs refresh last_peer_activity_ns from a thread independent of + // its stalled write path — so a slow-but-healthy writer is held back, not dropped. + // Only a peer that has gone completely silent for the liveness window is declared dead. + while (!target->broken && target->active) { if (target->queue.PutTimeout(ImagePusherQueueElement{ .image_data = static_cast(z.GetImage()), .z = &z, .end = false - }, std::chrono::milliseconds(50))) { + }, std::chrono::milliseconds(50))) return true; - } + + const int64_t silent_ns = SteadyNowNs() - target->last_peer_activity_ns.load(std::memory_order_relaxed); + if (silent_ns > std::chrono::duration_cast(peer_liveness_timeout).count()) { + logger.Warning("No liveness from writer on socket " + std::to_string(target->socket_number) + + " for " + std::to_string(silent_ns / 1000000) + " ms while enqueuing; marking broken"); + target->broken = true; + break; + } } - target->broken = true; z.release(); return false; }