From cdbd0b7bed5eae1d548d440fdbedb6db57c37ff4 Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Thu, 25 Jun 2026 16:49:25 +0200 Subject: [PATCH] TCPStreamPusher: flush queued images before END so writer can't drop tail Under load on the CI machine, the JFJochIntegrationTest_TCP_* tests intermittently failed with the writer reporting fewer images than sent (e.g. 4 of 5) and the DATA-ACK count short by one. EndDataCollection sent the END frame as soon as it could acquire send_mutex, without waiting for the per-connection WriterThread to drain its queue. SendImage only enqueues; the WriterThread actually transmits DATA frames. On a fast machine the queue is already empty when END is sent, but on a loaded machine the WriterThread falls behind, END overtakes the still-queued trailing image(s), the remote writer sees END and finalizes, and the late DATA frame is silently dropped by ProcessDataImage's Finalized case (no ACK emitted). Fix: flush before END. WriterThread now drains the queue until the end sentinel instead of bailing the moment c->active is cleared, and EndDataCollection stops (and joins) the WriterThread before sending END. Joining the writer is a race-free barrier: every image is on the wire before END goes out. Co-Authored-By: Claude Opus 4.8 (1M context) --- image_pusher/TCPStreamPusher.cpp | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index f4c9b5ac..371ac472 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -377,7 +377,12 @@ bool TCPStreamPusher::ReadExactPersistent(Connection& c, void* buf, size_t len) } void TCPStreamPusher::WriterThread(Connection* c) { - while (c->active) { + // Drain the queue until the end sentinel. We deliberately do NOT stop on c->active + // alone: StopDataCollectionThreads() clears active and then pushes the sentinel, and + // every DATA frame enqueued before that sentinel must still be sent. Otherwise END + // (sent by EndDataCollection after stopping this thread) could overtake images that + // are merely waiting in the queue, and the writer would finalize before receiving them. + while (true) { auto e = c->queue.GetBlocking(); if (e.end) break; @@ -944,6 +949,14 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage& message) { for (auto& cptr : local_connections) { auto& c = *cptr; + + // Flush all queued DATA before sending END: stopping the writer thread drains the + // send queue and joins it, so every image is on the wire before END goes out. This + // prevents END from overtaking images still waiting in the queue when the writer + // thread has fallen behind (e.g. on a loaded machine), which would make the remote + // writer finalize early and silently drop the trailing images. + StopDataCollectionThreads(c); + if (c.broken) { ret = false; continue; @@ -962,9 +975,6 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage& message) { ret = false; } - for (auto& c : local_connections) - StopDataCollectionThreads(*c); - { std::lock_guard lg(connections_mutex); session_connections.clear();