// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "TCPStreamPusher.h" #include #include #include #include #include #include #include #include namespace { int64_t SteadyNowNs() { return std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()).count(); } } std::pair> TCPStreamPusher::ParseTcpAddress(const std::string& addr) { const std::string prefix = "tcp://"; if (addr.rfind(prefix, 0) != 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP address: " + addr); auto hp = addr.substr(prefix.size()); auto p = hp.find_last_of(':'); if (p == std::string::npos) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP address: " + addr); const auto host = hp.substr(0, p); const auto port_str = hp.substr(p + 1); if (port_str == "*") return {host, std::nullopt}; int port_i = 0; try { size_t parsed = 0; port_i = std::stoi(port_str, &parsed); if (parsed != port_str.size()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP port in address: " + addr); } catch (...) { throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP port in address: " + addr); } if (port_i < 1 || port_i > static_cast(std::numeric_limits::max())) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "TCP port out of range in address: " + addr); return {host, static_cast(port_i)}; } std::pair TCPStreamPusher::OpenListenSocket(const std::string& addr) { auto [host, port_opt] = ParseTcpAddress(addr); int listen_fd = ::socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "socket(listen) failed"); int one = 1; setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); sockaddr_in sin{}; sin.sin_family = AF_INET; sin.sin_port = htons(port_opt.has_value() ? port_opt.value() : 0); if (host == "*" || host == "0.0.0.0") sin.sin_addr.s_addr = htonl(INADDR_ANY); else if (inet_pton(AF_INET, host.c_str(), &sin.sin_addr) != 1) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "inet_pton failed for " + host); if (bind(listen_fd, reinterpret_cast(&sin), sizeof(sin)) != 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "bind() failed to " + addr); if (listen(listen_fd, 64) != 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "listen() failed on " + addr); sockaddr_in actual{}; socklen_t actual_len = sizeof(actual); if (getsockname(listen_fd, reinterpret_cast(&actual), &actual_len) != 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "getsockname() failed on " + addr); const uint16_t bound_port = ntohs(actual.sin_port); const std::string normalized_host = (host == "*") ? "0.0.0.0" : host; const std::string bound_endpoint = "tcp://" + normalized_host + ":" + std::to_string(bound_port); return {listen_fd, bound_endpoint}; } int TCPStreamPusher::AcceptOne(int listen_fd, std::chrono::milliseconds timeout) { pollfd pfd{}; pfd.fd = listen_fd; pfd.events = POLLIN; const int prc = poll(&pfd, 1, static_cast(timeout.count())); if (prc <= 0) return -1; if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0) return -1; return accept(listen_fd, nullptr, nullptr); } void TCPStreamPusher::CloseFd(std::atomic& fd) { int old_fd = fd.exchange(-1); if (old_fd >= 0) { shutdown(old_fd, SHUT_RDWR); close(old_fd); } } TCPStreamPusher::TCPStreamPusher(const std::string& addr, size_t in_max_connections, std::optional in_send_buffer_size) : endpoint(addr), max_connections(in_max_connections), send_buffer_size(in_send_buffer_size) { if (endpoint.empty()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No TCP writer address provided"); if (max_connections == 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Max TCP connections cannot be zero"); auto [lfd, bound_endpoint] = OpenListenSocket(endpoint); listen_fd.store(lfd); endpoint = bound_endpoint; acceptor_running = true; acceptor_future = std::async(std::launch::async, &TCPStreamPusher::AcceptorThread, this); keepalive_future = std::async(std::launch::async, &TCPStreamPusher::KeepaliveThread, this); logger.Info("TCPStreamPusher listening on " + endpoint + " (max " + std::to_string(max_connections) + " connections)"); } TCPStreamPusher::~TCPStreamPusher() { // 1. Stop acceptor + keepalive threads (they take connections_mutex briefly) acceptor_running = false; int lfd = listen_fd.exchange(-1); if (lfd >= 0) { shutdown(lfd, SHUT_RDWR); close(lfd); } if (acceptor_future.valid()) acceptor_future.get(); if (keepalive_future.valid()) keepalive_future.get(); // 2. Now no background threads touch connections_mutex. Tear down connections. // We do NOT hold the mutex while joining futures, to avoid deadlock. std::vector> local_connections; { std::lock_guard lg(connections_mutex); local_connections = std::move(connections); connections.clear(); session_connections.clear(); calibration_connection.reset(); } for (auto& c : local_connections) { if (c) TearDownConnection(*c); } } void TCPStreamPusher::TearDownConnection(Connection& c) { StopDataCollectionThreads(c); c.connected = false; c.broken = true; CloseFd(c.fd); if (c.persistent_ack_future.valid()) c.persistent_ack_future.get(); } bool TCPStreamPusher::IsConnectionAlive(const Connection& c) const { if (c.broken) return false; const int local_fd = c.fd.load(); if (local_fd < 0) return false; pollfd pfd{}; pfd.fd = local_fd; pfd.events = POLLOUT; if (poll(&pfd, 1, 0) < 0) return false; if ((pfd.revents & (POLLHUP | POLLNVAL)) != 0) return false; int so_error = 0; socklen_t len = sizeof(so_error); if (getsockopt(local_fd, SOL_SOCKET, SO_ERROR, &so_error, &len) != 0) return false; return so_error == 0; } bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len) { const auto* p = static_cast(buf); size_t sent = 0; int64_t last_progress_ns = SteadyNowNs(); while (sent < len) { const int local_fd = c.fd.load(); if (local_fd < 0 || c.broken) return false; const int64_t now_ns = SteadyNowNs(); // Treat backpressure as fatal only if the peer shows NO sign of life. A busy // writer keeps refreshing last_peer_activity_ns via BUSY heartbeats / ACKs, so // we wait through long stalls; only a silent peer trips this timeout. const int64_t silent_ns = now_ns - c.last_peer_activity_ns.load(std::memory_order_relaxed); if (silent_ns > std::chrono::duration_cast(peer_liveness_timeout).count()) { logger.Warning("No liveness from writer on socket " + std::to_string(c.socket_number) + " for " + std::to_string(silent_ns / 1000000) + " ms while sending; marking broken"); c.broken = true; CloseFd(c.fd); return false; } // Hard backpressure cap: a writer may keep heartbeating yet never drain (e.g. a // permanently wedged filesystem). If no bytes leave the socket for this long, give // up regardless of heartbeats so the run -- and its finalization -- cannot hang. if (now_ns - last_progress_ns > std::chrono::duration_cast(max_backpressure_timeout).count()) { logger.Warning("No send progress to writer on socket " + std::to_string(c.socket_number) + " for " + std::to_string((now_ns - last_progress_ns) / 1000000) + " ms despite heartbeats; marking broken"); c.broken = true; CloseFd(c.fd); return false; } pollfd pfd{}; pfd.fd = local_fd; pfd.events = POLLOUT; const int prc = poll(&pfd, 1, static_cast(send_poll_timeout.count())); if (prc < 0) { if (errno == EINTR) continue; c.broken = true; CloseFd(c.fd); return false; } if (prc == 0) continue; if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLOUT)) { c.broken = true; CloseFd(c.fd); return false; } ssize_t rc = ::send(local_fd, p + sent, len - sent, MSG_NOSIGNAL); if (rc < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; if (errno == EPIPE || errno == ECONNRESET || errno == ENOTCONN) { c.broken = true; CloseFd(c.fd); } return false; } sent += static_cast(rc); last_progress_ns = SteadyNowNs(); } return true; } bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, TCPFrameType type, int64_t image_number) { TcpFrameHeader h{}; h.type = static_cast(type); h.payload_size = size; h.image_number = image_number >= 0 ? static_cast(image_number) : 0; h.socket_number = c.socket_number; h.run_number = run_number; if (!SendAll(c, &h, sizeof(h))) return false; if (size > 0 && !SendAll(c, data, size)) return false; return true; } bool TCPStreamPusher::ReadExact(Connection& c, void* buf, size_t len) { auto* p = static_cast(buf); size_t got = 0; while (got < len) { if (!c.active) return false; const int local_fd = c.fd.load(); if (local_fd < 0) return false; pollfd pfd{}; pfd.fd = local_fd; pfd.events = POLLIN; const int prc = poll(&pfd, 1, 100); if (prc == 0) continue; if (prc < 0) { if (errno == EINTR) continue; return false; } if ((pfd.revents & (POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLIN)) return false; if ((pfd.revents & POLLIN) == 0) continue; ssize_t rc = ::recv(local_fd, p + got, len - got, 0); if (rc == 0) return false; if (rc < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; return false; } got += static_cast(rc); } return true; } bool TCPStreamPusher::ReadExactPersistent(Connection& c, void* buf, size_t len) { auto* p = static_cast(buf); size_t got = 0; while (got < len) { if (!c.connected || c.broken) return false; const int local_fd = c.fd.load(); if (local_fd < 0) return false; pollfd pfd{}; pfd.fd = local_fd; pfd.events = POLLIN; const int prc = poll(&pfd, 1, 500); if (prc == 0) continue; if (prc < 0) { if (errno == EINTR) continue; return false; } // Only bail if the peer hung up / the fd is invalid and there is no readable // data left to drain. if ((pfd.revents & (POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLIN)) return false; if ((pfd.revents & POLLIN) == 0) continue; ssize_t rc = ::recv(local_fd, p + got, len - got, 0); if (rc == 0) return false; // Peer closed connection if (rc < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; return false; } got += static_cast(rc); } return true; } void TCPStreamPusher::WriterThread(Connection* c) { while (c->active) { auto e = c->queue.GetBlocking(); if (e.end) break; if (!e.z) continue; if (c->broken) { e.z->release(); continue; } bool ok; { std::unique_lock ul(c->send_mutex); ok = SendFrame(*c, static_cast(e.z->GetImage()), e.z->GetImageSize(), TCPFrameType::DATA, e.z->GetImageNumber()); } // Synchronous send: once send() returns, the payload has been copied into the // kernel socket buffer, so the image-buffer slot can be returned immediately. e.z->release(); if (!ok) { c->broken = true; logger.Error("TCP send failed on socket " + std::to_string(c->socket_number)); } } } void TCPStreamPusher::PersistentAckThread(Connection* c) { while (c->connected && !c->broken) { TcpFrameHeader h{}; if (!ReadExactPersistent(*c, &h, sizeof(h))) { if (c->connected && !c->broken) { c->broken = true; logger.Info("Persistent connection lost on socket " + std::to_string(c->socket_number)); } break; } if (h.magic != JFJOCH_TCP_MAGIC || h.version != JFJOCH_TCP_VERSION) { c->broken = true; logger.Error("Invalid frame on persistent connection, socket " + std::to_string(c->socket_number)); break; } // Any well-formed frame from the peer is a sign of life. c->last_peer_activity_ns.store(SteadyNowNs(), std::memory_order_relaxed); const auto frame_type = static_cast(h.type); // Keepalive pong from the writer if (frame_type == TCPFrameType::KEEPALIVE) { c->last_keepalive_recv = std::chrono::steady_clock::now(); if (h.payload_size > 0) { std::vector discard(h.payload_size); ReadExactPersistent(*c, discard.data(), discard.size()); } continue; } // Busy/backpressure heartbeat: the writer is alive but stalled. The liveness // timestamp update above is all we need; just record reported FIFO occupancy. if (frame_type == TCPFrameType::BUSY) { c->last_ack_fifo_occupancy.store(h.ack_fifo_occupancy, std::memory_order_relaxed); if (h.payload_size > 0) { std::vector discard(h.payload_size); ReadExactPersistent(*c, discard.data(), discard.size()); } continue; } if (frame_type != TCPFrameType::ACK) { c->broken = true; logger.Error("Unexpected frame type " + std::to_string(h.type) + " on socket " + std::to_string(c->socket_number)); break; } c->last_ack_fifo_occupancy.store(h.ack_fifo_occupancy, std::memory_order_relaxed); // ACK frame — forward to data-collection ack logic std::string error_text; if (h.payload_size > 0) { error_text.resize(h.payload_size); if (!ReadExactPersistent(*c, error_text.data(), error_text.size())) { c->broken = true; break; } } const auto ack_for = static_cast(h.ack_for); const bool ok = (h.flags & TCP_ACK_FLAG_OK) != 0; const bool fatal = (h.flags & TCP_ACK_FLAG_FATAL) != 0; const auto code = static_cast(h.ack_code); // Validate run number: discard stale ACKs from a previous run on a persistent connection if (h.run_number != run_number) { logger.Warning("Discarding ACK with stale run_number " + std::to_string(h.run_number) + " (expected " + std::to_string(run_number) + ") on socket " + std::to_string(c->socket_number)); continue; } { std::unique_lock ul(c->ack_mutex); c->last_ack_code = code; if (!error_text.empty()) c->last_ack_error = error_text; if (ack_for == TCPFrameType::START) { c->start_ack_received = true; c->start_ack_ok = ok; if (!ok && error_text.empty()) c->last_ack_error = "START rejected"; } else if (ack_for == TCPFrameType::END) { c->end_ack_received = true; c->end_ack_ok = ok; if (!ok && error_text.empty()) c->last_ack_error = "END rejected"; } else if (ack_for == TCPFrameType::CANCEL) { c->cancel_ack_received = true; c->cancel_ack_ok = ok; if (!ok && error_text.empty()) c->last_ack_error = "CANCEL rejected"; } else if (ack_for == TCPFrameType::DATA) { c->data_acked_total.fetch_add(1, std::memory_order_relaxed); total_data_acked_total.fetch_add(1, std::memory_order_relaxed); if (ok && !fatal) { c->data_acked_ok.fetch_add(1, std::memory_order_relaxed); total_data_acked_ok.fetch_add(1, std::memory_order_relaxed); } else { c->data_acked_bad.fetch_add(1, std::memory_order_relaxed); total_data_acked_bad.fetch_add(1, std::memory_order_relaxed); c->data_ack_error_reported = true; if (!error_text.empty()) { c->data_ack_error_text = error_text; } else if (c->data_ack_error_text.empty()) { c->data_ack_error_text = "DATA ACK failed"; } } } } c->ack_cv.notify_all(); } // If the connection broke, also wake up anyone waiting for an ACK c->ack_cv.notify_all(); } void TCPStreamPusher::AcceptorThread() { uint32_t next_socket_number = 0; while (acceptor_running) { int lfd = listen_fd.load(); if (lfd < 0) break; int new_fd = AcceptOne(lfd, std::chrono::milliseconds(500)); if (new_fd < 0) continue; std::lock_guard lg(connections_mutex); RemoveDeadConnections(); if (connections.size() >= max_connections) { logger.Warning("Max connections (" + std::to_string(max_connections) + ") reached, rejecting new connection"); shutdown(new_fd, SHUT_RDWR); close(new_fd); continue; } SetupNewConnection(new_fd, next_socket_number++); logger.Info("Accepted writer connection (socket_number=" + std::to_string(next_socket_number - 1) + ", total=" + std::to_string(connections.size()) + ")"); } } void TCPStreamPusher::SetupNewConnection(int new_fd, uint32_t socket_number) { auto c = std::make_shared(send_queue_size); c->socket_number = socket_number; c->fd.store(new_fd); int one = 1; setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); // Enable OS-level TCP keep-alive setsockopt(new_fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)); int idle = 30; // longer than app-level keepalive to avoid interference int intvl = 10; int cnt = 3; setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)); setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, sizeof(intvl)); setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt)); if (send_buffer_size) setsockopt(new_fd, SOL_SOCKET, SO_SNDBUF, &send_buffer_size.value(), sizeof(int32_t)); c->connected = true; c->broken = false; auto now = std::chrono::steady_clock::now(); c->last_keepalive_sent = now; c->last_keepalive_recv = now; c->last_peer_activity_ns.store(SteadyNowNs(), std::memory_order_relaxed); auto* raw = c.get(); c->persistent_ack_future = std::async(std::launch::async, &TCPStreamPusher::PersistentAckThread, this, raw); connections.emplace_back(std::move(c)); } void TCPStreamPusher::RemoveDeadConnections() { // Must be called with connections_mutex held. // We move dead connections out, release the mutex implicitly (caller still holds it), // then join their futures. Actually — we can join right here since PersistentAckThread // doesn't take connections_mutex, so no deadlock. auto it = connections.begin(); while (it != connections.end()) { auto c = *it; if (c->broken || !c->connected || !IsConnectionAlive(*c)) { c->connected = false; c->broken = true; StopDataCollectionThreads(*c); CloseFd(c->fd); if (c->persistent_ack_future.valid()) c->persistent_ack_future.get(); logger.Info("Removed dead connection (socket_number=" + std::to_string(c->socket_number) + ")"); it = connections.erase(it); } else { ++it; } } } void TCPStreamPusher::KeepaliveThread() { while (acceptor_running) { // Sleep in small increments so we can exit promptly for (int i = 0; i < 50 && acceptor_running; ++i) std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (!acceptor_running) break; // During data collection, the data flow itself serves as heartbeat if (data_collection_active) continue; std::lock_guard lg(connections_mutex); for (auto& cptr : connections) { auto& c = *cptr; if (c.broken || !c.connected) continue; std::unique_lock ul(c.send_mutex); if (!SendFrame(c, nullptr, 0, TCPFrameType::KEEPALIVE, -1)) { logger.Warning("Keepalive send failed on socket " + std::to_string(c.socket_number)); c.broken = true; } else { c.last_keepalive_sent = std::chrono::steady_clock::now(); } } RemoveDeadConnections(); } } size_t TCPStreamPusher::GetConnectedWriters() const { std::lock_guard lg(connections_mutex); size_t count = 0; for (const auto& c : connections) { if (c->connected && !c->broken) ++count; } return count; } void TCPStreamPusher::StartDataCollectionThreads(Connection& c) { { std::unique_lock ul(c.ack_mutex); c.start_ack_received = false; c.start_ack_ok = false; c.end_ack_received = false; c.end_ack_ok = false; c.cancel_ack_received = false; c.cancel_ack_ok = false; c.last_ack_error.clear(); c.last_ack_code = TCPAckCode::None; c.data_ack_error_reported = false; c.data_ack_error_text.clear(); } c.data_acked_ok.store(0, std::memory_order_relaxed); c.data_acked_bad.store(0, std::memory_order_relaxed); c.data_acked_total.store(0, std::memory_order_relaxed); c.active = true; c.writer_future = std::async(std::launch::async, &TCPStreamPusher::WriterThread, this, &c); } void TCPStreamPusher::StopDataCollectionThreads(Connection& c) { if (!c.active) return; c.active = false; // Avoid potential shutdown deadlock if queue is full and writer is stalled. if (!c.queue.PutTimeout({.end = true}, std::chrono::milliseconds(200))) { c.broken = true; CloseFd(c.fd); // active==false makes WriterThread exit without draining, so return any // queued-but-unsent slots to the image buffer pool ourselves. ImagePusherQueueElement e{}; while (c.queue.Get(e)) { if (e.z) e.z->release(); } (void)c.queue.Put({.end = true}); } c.ack_cv.notify_all(); if (c.writer_future.valid()) c.writer_future.get(); } bool TCPStreamPusher::WaitForAck(Connection& c, TCPFrameType ack_for, std::chrono::milliseconds timeout, std::string* error_text) { std::unique_lock ul(c.ack_mutex); const bool ok = c.ack_cv.wait_for(ul, timeout, [&] { if (ack_for == TCPFrameType::START) return c.start_ack_received || c.broken.load(); if (ack_for == TCPFrameType::END) return c.end_ack_received || c.broken.load(); if (ack_for == TCPFrameType::CANCEL) return c.cancel_ack_received || c.broken.load(); return false; }); if (!ok) { if (error_text) *error_text = "ACK timeout"; return false; } if (c.broken) { if (error_text) *error_text = c.last_ack_error.empty() ? "Socket broken" : c.last_ack_error; return false; } bool ack_ok = false; if (ack_for == TCPFrameType::START) ack_ok = c.start_ack_ok; if (ack_for == TCPFrameType::END) ack_ok = c.end_ack_ok; if (ack_for == TCPFrameType::CANCEL) ack_ok = c.cancel_ack_ok; if (!ack_ok && error_text) *error_text = c.last_ack_error.empty() ? "ACK rejected" : c.last_ack_error; return ack_ok; } bool TCPStreamPusher::WaitForEndAck(Connection& c, std::chrono::milliseconds liveness_timeout, std::string* error_text) { std::unique_lock ul(c.ack_mutex); // After END is sent the writer may still be flushing a backlog to a slow filesystem. // It keeps proving it is alive via BUSY heartbeats and DATA ACKs (both refresh // last_peer_activity_ns), so we wait through arbitrarily long stalls and only give up // if the peer falls completely silent for the whole liveness window. while (!c.end_ack_received && !c.broken) { c.ack_cv.wait_for(ul, liveness_timeout, [&] { return c.end_ack_received || c.broken.load(); }); if (c.end_ack_received || c.broken) break; const int64_t silent_ns = SteadyNowNs() - c.last_peer_activity_ns.load(std::memory_order_relaxed); if (silent_ns > std::chrono::duration_cast(liveness_timeout).count()) { if (error_text) *error_text = "END ACK timeout (writer silent)"; return false; } } if (c.broken) { if (error_text) *error_text = c.last_ack_error.empty() ? "Socket broken" : c.last_ack_error; return false; } if (!c.end_ack_ok && error_text) *error_text = c.last_ack_error.empty() ? "END ACK rejected" : c.last_ack_error; return c.end_ack_ok; } void TCPStreamPusher::StartDataCollection(StartMessage& message) { if (message.images_per_file < 1) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Images per file cannot be zero or negative"); images_per_file = message.images_per_file; run_number = message.run_number; run_name = message.run_name; transmission_error = false; total_data_acked_ok.store(0, std::memory_order_relaxed); total_data_acked_bad.store(0, std::memory_order_relaxed); total_data_acked_total.store(0, std::memory_order_relaxed); std::vector> local_connections; { std::lock_guard lg(connections_mutex); for (auto& c : connections) StopDataCollectionThreads(*c); RemoveDeadConnections(); if (connections.empty()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writers connected to " + endpoint); session_connections = connections; calibration_connection = session_connections[0]; local_connections = session_connections; } logger.Info("Starting data collection with " + std::to_string(local_connections.size()) + " connected writers"); data_collection_active = true; for (auto& c : local_connections) StartDataCollectionThreads(*c); std::vector started(local_connections.size(), false); auto rollback_cancel = [&]() { for (size_t i = 0; i < local_connections.size(); i++) { auto& c = *local_connections[i]; if (!started[i] || c.broken) continue; std::unique_lock ul(c.send_mutex); (void)SendFrame(c, nullptr, 0, TCPFrameType::CANCEL, -1); std::string cancel_ack_err; (void)WaitForAck(c, TCPFrameType::CANCEL, std::chrono::milliseconds(500), &cancel_ack_err); } for (auto& c : local_connections) StopDataCollectionThreads(*c); { std::lock_guard lg(connections_mutex); session_connections.clear(); calibration_connection.reset(); } data_collection_active = false; }; for (size_t i = 0; i < local_connections.size(); i++) { auto& c = *local_connections[i]; message.socket_number = static_cast(c.socket_number); message.write_master_file = (i == 0); serializer.SerializeSequenceStart(message); { std::unique_lock ul(c.send_mutex); if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::START, -1)) { rollback_cancel(); throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Timeout/failure sending START on socket " + std::to_string(c.socket_number)); } } std::string ack_err; if (!WaitForAck(c, TCPFrameType::START, std::chrono::seconds(5), &ack_err)) { rollback_cancel(); throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "START ACK failed on socket " + std::to_string(c.socket_number) + ": " + ack_err); } started[i] = true; } } bool TCPStreamPusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { std::shared_ptr target; size_t conn_count = 0; { std::lock_guard lg(connections_mutex); const auto& use = (!session_connections.empty() ? session_connections : connections); if (use.empty()) return false; conn_count = use.size(); auto idx = static_cast((image_number / images_per_file) % static_cast(conn_count)); target = use[idx]; } auto& c = *target; if (c.broken || !IsConnectionAlive(c)) return false; std::unique_lock ul(c.send_mutex); return SendFrame(c, image_data, image_size, TCPFrameType::DATA, image_number); } bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { // Look up the target connection while holding the mutex, but do NOT call // PutBlocking while holding it — that can block indefinitely and deadlock // against AcceptorThread/KeepaliveThread. std::shared_ptr target; { std::lock_guard lg(connections_mutex); const auto& use = (!session_connections.empty() ? session_connections : connections); if (use.empty()) { z.release(); return false; } auto idx = static_cast((z.GetImageNumber() / images_per_file) % static_cast(use.size())); target = use[idx]; } if (!target) { z.release(); return false; } // A full queue means the writer is applying backpressure (e.g. briefly stalled on a // freshly-created file at startup). Keep waiting as long as it proves it is alive — // BUSY heartbeats / ACKs refresh last_peer_activity_ns from a thread independent of // its stalled write path — so a slow-but-healthy writer is held back, not dropped. // Only a peer that has gone completely silent for the liveness window is declared dead; // a writer that heartbeats but never drains is caught by SendAll's max_backpressure cap. while (!target->broken && target->active) { if (target->queue.PutTimeout(ImagePusherQueueElement{ .z = &z, .end = false }, std::chrono::milliseconds(50))) return true; const int64_t silent_ns = SteadyNowNs() - target->last_peer_activity_ns.load(std::memory_order_relaxed); if (silent_ns > std::chrono::duration_cast(peer_liveness_timeout).count()) { logger.Warning("No liveness from writer on socket " + std::to_string(target->socket_number) + " for " + std::to_string(silent_ns / 1000000) + " ms while enqueuing; marking broken"); target->broken = true; break; } } z.release(); return false; } bool TCPStreamPusher::EndDataCollection(const EndMessage& message) { serializer.SerializeSequenceEnd(message); bool ret = true; std::vector> local_connections; { std::lock_guard lg(connections_mutex); local_connections = (!session_connections.empty() ? session_connections : connections); } for (auto& cptr : local_connections) { auto& c = *cptr; if (c.broken) { ret = false; continue; } { std::unique_lock ul(c.send_mutex); if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::END, -1)) { ret = false; continue; } } std::string ack_err; if (!WaitForEndAck(c, peer_liveness_timeout, &ack_err)) ret = false; } for (auto& c : local_connections) StopDataCollectionThreads(*c); { std::lock_guard lg(connections_mutex); session_connections.clear(); calibration_connection.reset(); } data_collection_active = false; transmission_error = !ret; return ret; } bool TCPStreamPusher::SendCalibration(const CompressedImage& message) { std::shared_ptr target; { std::lock_guard lg(connections_mutex); if (!data_collection_active) { logger.Error("Refusing to send TCP calibration: no active run"); return false; } if (!calibration_connection) { logger.Error("Refusing to send TCP calibration: calibration target is not set"); return false; } target = calibration_connection; } if (!target) { logger.Error("Refusing to send TCP calibration: calibration target is null"); return false; } if (target->broken) { logger.Error("Refusing to send TCP calibration: target socket {} is marked broken", target->socket_number); return false; } if (!target->connected) { logger.Error("Refusing to send TCP calibration: target socket {} is not connected", target->socket_number); return false; } if (!target->active) { logger.Error("Refusing to send TCP calibration: target socket {} is not active in current session", target->socket_number); return false; } serializer.SerializeCalibration(message); std::unique_lock ul(target->send_mutex); if (target->broken) { logger.Error("Refusing to send TCP calibration: target socket {} became broken before send", target->socket_number); return false; } if (!target->connected) { logger.Error("Refusing to send TCP calibration: target socket {} disconnected before send", target->socket_number); return false; } if (!target->active) { logger.Error("Refusing to send TCP calibration: target socket {} became inactive before send", target->socket_number); return false; } if (!IsConnectionAlive(*target)) { logger.Error("Refusing to send TCP calibration: target socket {} is not alive", target->socket_number); target->broken = true; return false; } const bool ok = SendFrame(*target, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::CALIBRATION, -1); if (!ok) { logger.Error("Failed to send TCP calibration on socket {}", target->socket_number); target->broken = true; return false; } return true; } std::string TCPStreamPusher::Finalize() { std::string ret; if (transmission_error) ret += "Timeout sending images (e.g., writer disabled during data collection);"; std::lock_guard lg(connections_mutex); for (size_t i = 0; i < connections.size(); i++) { auto& c = *connections[i]; { std::unique_lock ul(c.ack_mutex); if (c.data_ack_error_reported && !c.data_ack_error_text.empty()) { ret += "Writer " + std::to_string(c.socket_number) + ": " + c.data_ack_error_text + ";"; } else if (!c.last_ack_error.empty()) { ret += "Writer " + std::to_string(c.socket_number) + ": " + c.last_ack_error + ";"; } } } return ret; } std::string TCPStreamPusher::PrintSetup() const { return "TCPStreamPusher: endpoint=" + endpoint + " max_connections=" + std::to_string(max_connections) + " connected=" + std::to_string(GetConnectedWriters()); } std::optional TCPStreamPusher::GetImagesWritten() const { return total_data_acked_ok.load(std::memory_order_relaxed); } std::optional TCPStreamPusher::GetImagesWriteError() const { return total_data_acked_bad.load(std::memory_order_relaxed); } std::vector TCPStreamPusher::GetWriterFifoUtilization() const { std::vector ret; std::lock_guard lg(connections_mutex); ret.reserve(connections.size()); for (const auto& c : connections) { ret.push_back(c->last_ack_fifo_occupancy.load(std::memory_order_relaxed)); } return ret; }