diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index f4c9b5ac..371ac472 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -377,7 +377,12 @@ bool TCPStreamPusher::ReadExactPersistent(Connection& c, void* buf, size_t len) } void TCPStreamPusher::WriterThread(Connection* c) { - while (c->active) { + // Drain the queue until the end sentinel. We deliberately do NOT stop on c->active + // alone: StopDataCollectionThreads() clears active and then pushes the sentinel, and + // every DATA frame enqueued before that sentinel must still be sent. Otherwise END + // (sent by EndDataCollection after stopping this thread) could overtake images that + // are merely waiting in the queue, and the writer would finalize before receiving them. + while (true) { auto e = c->queue.GetBlocking(); if (e.end) break; @@ -944,6 +949,14 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage& message) { for (auto& cptr : local_connections) { auto& c = *cptr; + + // Flush all queued DATA before sending END: stopping the writer thread drains the + // send queue and joins it, so every image is on the wire before END goes out. This + // prevents END from overtaking images still waiting in the queue when the writer + // thread has fallen behind (e.g. on a loaded machine), which would make the remote + // writer finalize early and silently drop the trailing images. + StopDataCollectionThreads(c); + if (c.broken) { ret = false; continue; @@ -962,9 +975,6 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage& message) { ret = false; } - for (auto& c : local_connections) - StopDataCollectionThreads(*c); - { std::lock_guard lg(connections_mutex); session_connections.clear();