TCPStreamPusher: flush queued images before END so writer can't drop tail

Under load on the CI machine, the JFJochIntegrationTest_TCP_* tests
intermittently failed with the writer reporting fewer images than sent
(e.g. 4 of 5) and the DATA-ACK count short by one.

EndDataCollection sent the END frame as soon as it could acquire
send_mutex, without waiting for the per-connection WriterThread to drain
its queue. SendImage only enqueues; the WriterThread actually transmits
DATA frames. On a fast machine the queue is already empty when END is
sent. On a loaded machine the WriterThread falls behind and END
overtakes the still-queued trailing image(s). The remote writer sees END
and finalizes, and the late DATA frame is then silently dropped by
ProcessDataImage's Finalized case (no ACK emitted).
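
The receiver-side symptom can be reproduced with a toy state machine. This is
only a sketch of the behavior described above, not the real writer: ToyWriter,
OnFrame and Replay are hypothetical names, and frames are reduced to plain
strings.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical two-state receiver: it accepts DATA until END arrives.
enum class State { Receiving, Finalized };

struct ToyWriter {
    State state = State::Receiving;
    int images_written = 0;
    int acks_sent = 0;

    void OnFrame(const std::string& frame) {
        if (frame == "END") {
            state = State::Finalized;
            return;
        }
        if (state == State::Finalized)
            return;  // late DATA: dropped silently, no ACK emitted
        ++images_written;
        ++acks_sent;
    }
};

// Feed a sequence of frames to a fresh receiver and return its final state.
ToyWriter Replay(const std::vector<std::string>& frames) {
    ToyWriter w;
    for (const auto& f : frames)
        w.OnFrame(f);
    return w;
}
```

Replaying five DATA frames followed by END counts five images and five ACKs.
If END arrives before the fifth DATA frame, the counts stop at four, which
matches the "4 of 5" failure seen in the tests.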

Fix: flush before END. WriterThread now drains the queue until the end
sentinel instead of bailing the moment c->active is cleared, and
EndDataCollection stops (and joins) the WriterThread before sending END.
Joining the writer is a race-free barrier: every image is on the wire
before END goes out.
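
The flush-then-END ordering can be sketched in isolation as follows. This is a
minimal model under stated assumptions, not the TCPStreamPusher code: Entry,
BlockingQueue and RunSession are hypothetical stand-ins, the "wire" is a
vector, and the writer is slowed artificially to mimic a loaded machine.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical queue entry: a payload, or the end sentinel when end is true.
struct Entry {
    std::string payload;
    bool end;
};

// Minimal blocking FIFO standing in for the per-connection send queue.
class BlockingQueue {
public:
    void Put(Entry e) {
        {
            std::lock_guard<std::mutex> lg(m_);
            q_.push_back(std::move(e));
        }
        cv_.notify_one();
    }
    Entry GetBlocking() {
        std::unique_lock<std::mutex> ul(m_);
        cv_.wait(ul, [this] { return !q_.empty(); });
        Entry e = std::move(q_.front());
        q_.pop_front();
        return e;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<Entry> q_;
};

// Returns the frames in the order they reached the simulated wire.
std::vector<std::string> RunSession(int n_images) {
    BlockingQueue queue;
    std::vector<std::string> wire;
    std::mutex wire_mutex;  // plays the role of send_mutex

    std::thread writer([&] {
        // Drain until the sentinel; no "active" flag is consulted here.
        while (true) {
            Entry e = queue.GetBlocking();
            if (e.end)
                break;
            // Simulate a writer that has fallen behind.
            std::this_thread::sleep_for(std::chrono::milliseconds(2));
            std::lock_guard<std::mutex> lg(wire_mutex);
            wire.push_back(e.payload);
        }
    });

    for (int i = 0; i < n_images; ++i)
        queue.Put(Entry{"DATA " + std::to_string(i), false});

    // Flush: push the sentinel behind all queued images, then join.
    queue.Put(Entry{"", true});
    writer.join();

    // Only now is END sent; every DATA frame is already on the wire.
    std::lock_guard<std::mutex> lg(wire_mutex);
    wire.push_back("END");
    return wire;
}
```

Because the sentinel is queued behind every image and the join returns only
after the writer has consumed it, END is always the last frame, however slow
the writer is.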

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-25 16:49:25 +02:00
parent 26554c86e3
commit cdbd0b7bed
+14 -4
@@ -377,7 +377,12 @@ bool TCPStreamPusher::ReadExactPersistent(Connection& c, void* buf, size_t len)
 }
 void TCPStreamPusher::WriterThread(Connection* c) {
-    while (c->active) {
+    // Drain the queue until the end sentinel. We deliberately do NOT stop on c->active
+    // alone: StopDataCollectionThreads() clears active and then pushes the sentinel, and
+    // every DATA frame enqueued before that sentinel must still be sent. Otherwise END
+    // (sent by EndDataCollection after stopping this thread) could overtake images that
+    // are merely waiting in the queue, and the writer would finalize before receiving them.
+    while (true) {
         auto e = c->queue.GetBlocking();
         if (e.end)
             break;
@@ -944,6 +949,14 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage& message) {
     for (auto& cptr : local_connections) {
         auto& c = *cptr;
+        // Flush all queued DATA before sending END: stopping the writer thread drains the
+        // send queue and joins it, so every image is on the wire before END goes out. This
+        // prevents END from overtaking images still waiting in the queue when the writer
+        // thread has fallen behind (e.g. on a loaded machine), which would make the remote
+        // writer finalize early and silently drop the trailing images.
+        StopDataCollectionThreads(c);
         if (c.broken) {
             ret = false;
             continue;
@@ -962,9 +975,6 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage& message) {
             ret = false;
     }
-    for (auto& c : local_connections)
-        StopDataCollectionThreads(*c);
     {
         std::lock_guard lg(connections_mutex);
         session_connections.clear();