diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index 75844476..d8291661 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -266,9 +266,6 @@ bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, if (size > 0 && !SendAll(c, data, size)) return false; - if (type == TCPFrameType::DATA) - c.data_sent.fetch_add(1, std::memory_order_relaxed); - return true; } @@ -661,7 +658,6 @@ void TCPStreamPusher::StartDataCollectionThreads(Connection& c) { c.data_ack_error_text.clear(); } - c.data_sent.store(0, std::memory_order_relaxed); c.data_acked_ok.store(0, std::memory_order_relaxed); c.data_acked_bad.store(0, std::memory_order_relaxed); c.data_acked_total.store(0, std::memory_order_relaxed); @@ -887,7 +883,7 @@ bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { target = use[idx]; } - if (!target || target->broken || !target->active) { + if (!target) { z.release(); return false; } @@ -896,10 +892,10 @@ bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { // freshly-created file at startup). Keep waiting as long as it proves it is alive — // BUSY heartbeats / ACKs refresh last_peer_activity_ns from a thread independent of // its stalled write path — so a slow-but-healthy writer is held back, not dropped. - // Only a peer that has gone completely silent for the liveness window is declared dead. + // Only a peer that has gone completely silent for the liveness window is declared dead; + // a writer that heartbeats but never drains is caught by SendAll's max_backpressure cap. while (!target->broken && target->active) { if (target->queue.PutTimeout(ImagePusherQueueElement{ - .image_data = static_cast(z.GetImage()), .z = &z, .end = false }, std::chrono::milliseconds(50))) diff --git a/image_pusher/TCPStreamPusher.h b/image_pusher/TCPStreamPusher.h index 78398d8a..7ef05382 100644 --- a/image_pusher/TCPStreamPusher.h +++ b/image_pusher/TCPStreamPusher.h @@ -71,7 +71,6 @@ class TCPStreamPusher : public ImagePusher { std::atomic data_ack_error_reported{false}; std::string data_ack_error_text; - std::atomic data_sent{0}; std::atomic data_acked_ok{0}; std::atomic data_acked_bad{0}; std::atomic data_acked_total{0}; @@ -155,6 +154,12 @@ public: ~TCPStreamPusher() override; + /// Max time a send may block on backpressure with no sign of life from the peer + /// before the connection is declared dead. A busy-but-alive writer keeps it fresh + /// via BUSY heartbeats, so this only catches a genuinely silent peer. Must be set + /// before data collection starts. + void SetPeerLivenessTimeout(std::chrono::milliseconds t) { peer_liveness_timeout = t; } + std::vector GetAddress() const override { return {endpoint}; } /// Returns the number of currently connected writers (can be called at any time) diff --git a/tests/TCPImagePusherTest.cpp b/tests/TCPImagePusherTest.cpp index 7b558518..20cf1440 100644 --- a/tests/TCPImagePusherTest.cpp +++ b/tests/TCPImagePusherTest.cpp @@ -3,11 +3,25 @@ #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include "../image_pusher/TCPStreamPusher.h" #include "../image_puller/TCPImagePuller.h" #include "../image_puller/ZMQImagePuller.h" +#include "../common/ImageBuffer.h" +#include "../common/ZeroCopyReturnValue.h" TEST_CASE("TCPImageCommTest_2Writers_WithAck", "[TCP]") { const size_t nframes = 128; @@ -698,3 +712,221 @@ TEST_CASE("TCPImageCommTest_RepubToZMQ", "[TCP][ZeroMQ]") { REQUIRE(zmq_nimages == nframes); REQUIRE(zmq_errors == 0); } + +namespace { + +// Controllable TCP "writer" peer for backpressure tests. Connects to the pusher, ACKs +// START, then *stalls* (stops draining the socket) until Release() is called, while a +// background thread keeps sending BUSY heartbeats — i.e. a writer that is alive but +// wedged (e.g. on a slow filesystem at high frame rate). Catch2 assertion macros are not +// thread-safe, so the worker threads only touch atomics; the test thread asserts. +class StallableWriterDouble { +public: + StallableWriterDouble(const std::string &tcp_addr, int rcvbuf_bytes) { + auto [host, port] = ParseHostPort(tcp_addr); + fd_ = ::socket(AF_INET, SOCK_STREAM, 0); + if (fd_ < 0) + return; + setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &rcvbuf_bytes, sizeof(rcvbuf_bytes)); + sockaddr_in sin{}; + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + inet_pton(AF_INET, host.c_str(), &sin.sin_addr); + if (::connect(fd_, reinterpret_cast(&sin), sizeof(sin)) != 0) { + ::close(fd_); + fd_ = -1; + return; + } + busy_thread_ = std::thread([this] { BusyLoop(); }); + reader_thread_ = std::thread([this] { ReaderLoop(); }); + } + + ~StallableWriterDouble() { + stop_ = true; + Release(); + if (fd_ >= 0) + ::shutdown(fd_, SHUT_RDWR); + if (reader_thread_.joinable()) + reader_thread_.join(); + if (busy_thread_.joinable()) + busy_thread_.join(); + if (fd_ >= 0) + ::close(fd_); + } + + [[nodiscard]] bool Connected() const { return fd_ >= 0; } + + // Stop stalling: let the reader drain DATA and ACK END. + void Release() { + { + std::lock_guard lg(mtx_); + released_ = true; + } + cv_.notify_all(); + } + + [[nodiscard]] size_t DataFramesReceived() const { return data_frames_.load(); } + [[nodiscard]] bool EndAcked() const { return end_acked_.load(); } + +private: + static std::pair ParseHostPort(const std::string &addr) { + const std::string prefix = "tcp://"; + const auto hp = addr.substr(prefix.size()); + const auto p = hp.find_last_of(':'); + return {hp.substr(0, p), static_cast(std::stoi(hp.substr(p + 1)))}; + } + + bool SendHeader(TCPFrameType type, TCPFrameType ack_for, uint64_t run, uint32_t sock, uint32_t flags) { + TcpFrameHeader h{}; + h.type = static_cast(type); + h.ack_for = static_cast(ack_for); + h.run_number = run; + h.socket_number = sock; + h.flags = flags; + h.payload_size = 0; + std::lock_guard lg(send_mtx_); + if (fd_ < 0) + return false; + return ::send(fd_, &h, sizeof(h), MSG_NOSIGNAL) == static_cast(sizeof(h)); + } + + bool ReadExact(void *buf, size_t len) { + auto *p = static_cast(buf); + size_t got = 0; + while (got < len) { + const ssize_t rc = ::recv(fd_, p + got, len - got, 0); + if (rc <= 0) + return false; + got += static_cast(rc); + } + return true; + } + + void BusyLoop() { + // Heartbeat keeps the pusher's peer-liveness fresh even while we are not draining. + while (!stop_) { + SendHeader(TCPFrameType::BUSY, TCPFrameType::DATA, run_.load(), sock_.load(), 0); + for (int i = 0; i < 5 && !stop_; i++) + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + + void ReaderLoop() { + std::vector discard; + while (!stop_) { + TcpFrameHeader h{}; + if (!ReadExact(&h, sizeof(h))) + return; + if (h.magic != JFJOCH_TCP_MAGIC || h.version != JFJOCH_TCP_VERSION) + return; + if (h.payload_size > 0) { + discard.resize(h.payload_size); + if (!ReadExact(discard.data(), discard.size())) + return; + } + switch (static_cast(h.type)) { + case TCPFrameType::START: + run_.store(h.run_number); + sock_.store(h.socket_number); + SendHeader(TCPFrameType::ACK, TCPFrameType::START, h.run_number, h.socket_number, TCP_ACK_FLAG_OK); + { // Stall: stop reading until released. + std::unique_lock ul(mtx_); + cv_.wait(ul, [this] { return released_ || stop_; }); + } + break; + case TCPFrameType::DATA: + data_frames_.fetch_add(1); + break; + case TCPFrameType::END: + SendHeader(TCPFrameType::ACK, TCPFrameType::END, h.run_number, h.socket_number, TCP_ACK_FLAG_OK); + end_acked_.store(true); + return; + default: + break; // ignore KEEPALIVE etc. + } + } + } + + int fd_ = -1; + std::thread reader_thread_; + std::thread busy_thread_; + std::atomic stop_{false}; + std::atomic run_{0}; + std::atomic sock_{0}; + std::atomic data_frames_{0}; + std::atomic end_acked_{false}; + std::mutex send_mtx_; + std::mutex mtx_; + std::condition_variable cv_; + bool released_ = false; +}; + +} // namespace + +TEST_CASE("TCPImageCommTest_StalledWriter_SurvivesViaHeartbeat", "[TCP]") { + // A writer that is alive (still heartbeating) but has stopped draining — e.g. wedged + // on a slow filesystem at high frame rate — must NOT be dropped mid-run. The pusher + // rides out the backpressure on the production zero-copy queue path until the writer + // recovers. Regression for the queue-path send giving up on a fixed deadline, and for + // the BUSY heartbeat keeping the connection alive past the peer-liveness window. + constexpr int64_t N = 1000; // > queue depth + socket buffers + constexpr auto liveness = std::chrono::milliseconds(2000); + constexpr auto stall = std::chrono::milliseconds(4000); // > liveness AND > old send deadline + + // Small SO_SNDBUF/SO_RCVBUF so backpressure reaches the queue after few images. + TCPStreamPusher pusher("tcp://127.0.0.1:*", 1, 16 * 1024); + pusher.SetPeerLivenessTimeout(liveness); + + StallableWriterDouble writer(pusher.GetAddress()[0], 16 * 1024); + REQUIRE(writer.Connected()); + + for (int attempt = 0; attempt < 200 && pusher.GetConnectedWriters() < 1; ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + REQUIRE(pusher.GetConnectedWriters() == 1); + + ImageBuffer image_buffer(16 * 1024 * 1024); + image_buffer.StartMeasurement(static_cast(4096)); + + StartMessage start{.images_per_file = 1000, .write_master_file = true}; + pusher.StartDataCollection(start); // writer ACKs START, then stalls (stops reading) + + auto sender = std::async(std::launch::async, [&] { + for (int64_t i = 0; i < N; i++) { + ZeroCopyReturnValue *slot = nullptr; + while ((slot = image_buffer.GetImageSlot()) == nullptr) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::memset(slot->GetImage(), 0, 256); + slot->SetImageNumber(i); + slot->SetImageSize(256); // arbitrary payload; the writer double discards it + slot->ReadyToSend(); + pusher.SendImage(*slot); + } + }); + + // During the stall the queue is full; SendImage must block, not drop the connection. + std::this_thread::sleep_for(stall); + CHECK(pusher.GetConnectedWriters() == 1); + CHECK(sender.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready); + + // Writer recovers and starts draining. + writer.Release(); + + REQUIRE(sender.wait_for(std::chrono::seconds(30)) == std::future_status::ready); + sender.get(); + + // Every image makes it across once the stall clears. + for (int attempt = 0; attempt < 1200 && writer.DataFramesReceived() < static_cast(N); ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + CHECK(writer.DataFramesReceived() == static_cast(N)); + + // Queue fully drained: END now hands over cleanly without racing data frames. + EndMessage end{}; + CHECK(pusher.EndDataCollection(end) == true); + + for (int attempt = 0; attempt < 200 && !writer.EndAcked(); ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + CHECK(writer.EndAcked()); + CHECK(pusher.GetConnectedWriters() == 1); + + image_buffer.Finalize(std::chrono::seconds(5)); +}