diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index 88678d3b..7c2656ba 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -1094,23 +1094,28 @@ bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { return false; } - const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); - while (std::chrono::steady_clock::now() < deadline) { - if (target->broken || !target->active) { - z.release(); - return false; - } - + // A full queue means the writer is applying backpressure (e.g. briefly stalled on a + // freshly-created file at startup). Keep waiting as long as it proves it is alive — + // BUSY heartbeats / ACKs refresh last_peer_activity_ns from a thread independent of + // its stalled write path — so a slow-but-healthy writer is held back, not dropped. + // Only a peer that has gone completely silent for the liveness window is declared dead. + while (!target->broken && target->active) { if (target->queue.PutTimeout(ImagePusherQueueElement{ .image_data = static_cast(z.GetImage()), .z = &z, .end = false - }, std::chrono::milliseconds(50))) { + }, std::chrono::milliseconds(50))) return true; - } + + const int64_t silent_ns = SteadyNowNs() - target->last_peer_activity_ns.load(std::memory_order_relaxed); + if (silent_ns > std::chrono::duration_cast(peer_liveness_timeout).count()) { + logger.Warning("No liveness from writer on socket " + std::to_string(target->socket_number) + + " for " + std::to_string(silent_ns / 1000000) + " ms while enqueuing; marking broken"); + target->broken = true; + break; + } } - target->broken = true; z.release(); return false; }