Files
Jungfraujoch/image_pusher/TCPStreamPusher.cpp
T
leonarski_f 2a9fd084ab TCPStreamPusher: post-zerocopy cleanup + fix queue-path backpressure drop
Follow-up simplifications after removing the zerocopy machinery, plus a real
backpressure bug the cleanup surfaced:

- SendImage(ZeroCopyReturnValue&) imposed a hard 2s deadline on enqueueing and
  then marked the connection broken. At high frame rate the 128-deep queue
  fills in tens of ms, so any filesystem stall longer than ~2s dropped the run
  even though the writer was alive and heartbeating -- defeating the whole
  BUSY-heartbeat backpressure design. Block instead while the peer is alive
  (!broken && active); the real liveness decision already lives in SendAll's
  peer-liveness timeout, which the writer's BUSY heartbeats keep fresh. This
  makes the queue path consistent with the send path: both wait out arbitrarily
  long stalls and only give up when the peer goes genuinely silent.
- Drop the dead per-connection data_sent counter (written, never read) and the
  redundant ImagePusherQueueElement.image_data set on the TCP path (only the
  HDF5 pusher reads that field).
- Add SetPeerLivenessTimeout() so the liveness window is tunable (and testable).

Add TCPImageCommTest_StalledWriter_SurvivesViaHeartbeat: a controllable raw
writer double connects, ACKs START, then stops draining for 4s while still
sending BUSY heartbeats (peer-liveness window set to 2s). The run must ride out
the stall on the zero-copy queue path and deliver all 1000 images. Verified to
fail (115/1000 delivered, connection dropped) against the old 2s-deadline
behavior.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-25 15:55:49 +02:00

1086 lines
37 KiB
C++

// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include "TCPStreamPusher.h"
#include <limits>
#include <poll.h>
#include <cerrno>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <chrono>
namespace {
    // Reads the monotonic (steady) clock and reports it as a nanosecond count
    // relative to that clock's epoch. Only differences between two readings
    // are meaningful; the absolute value is not wall-clock time.
    int64_t SteadyNowNs() {
        using namespace std::chrono;
        const auto since_epoch = steady_clock::now().time_since_epoch();
        return duration_cast<nanoseconds>(since_epoch).count();
    }
}
// Splits an address of the form "tcp://<host>:<port>" into its host and port.
//
// The port is either "*" (returned as std::nullopt, i.e. no explicit port) or a plain
// decimal number in the range 1..65535. The host part is returned verbatim; it is not
// validated here.
//
// Throws JFJochException (InputParameterInvalid) when the "tcp://" prefix or the ':'
// separator is missing, when the port is not a plain decimal number, or when it is
// outside the valid range.
std::pair<std::string, std::optional<uint16_t>> TCPStreamPusher::ParseTcpAddress(const std::string& addr) {
    const std::string prefix = "tcp://";
    if (addr.rfind(prefix, 0) != 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP address: " + addr);

    const auto host_and_port = addr.substr(prefix.size());
    const auto colon_pos = host_and_port.find_last_of(':');
    if (colon_pos == std::string::npos)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP address: " + addr);

    const auto host = host_and_port.substr(0, colon_pos);
    const auto port_str = host_and_port.substr(colon_pos + 1);
    if (port_str == "*")
        return {host, std::nullopt};

    // std::stoi on its own silently accepts leading whitespace and a sign (" 80", "+80"),
    // so insist on digits only before converting.
    bool digits_only = !port_str.empty();
    for (const char ch : port_str) {
        if (ch < '0' || ch > '9') {
            digits_only = false;
            break;
        }
    }
    if (!digits_only)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP port in address: " + addr);

    int port_i = 0;
    try {
        port_i = std::stoi(port_str);
    } catch (...) {
        // With digits-only input the remaining failure is a value that does not fit in int.
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP port in address: " + addr);
    }

    if (port_i < 1 || port_i > static_cast<int>(std::numeric_limits<uint16_t>::max()))
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "TCP port out of range in address: " + addr);
    return {host, static_cast<uint16_t>(port_i)};
}
// Creates an IPv4 TCP listening socket for an address given as "tcp://<host>:<port>".
//
// A host of "*" or "0.0.0.0" binds to all interfaces; a port of "*" lets the kernel pick
// a free port. Returns the listening descriptor together with the endpoint that was
// actually bound ("tcp://<host>:<port>", with "*" reported as "0.0.0.0" and the real port).
//
// Throws JFJochException (InputParameterInvalid) if the address cannot be parsed or any
// socket call fails. The descriptor is closed on every failure path, so nothing leaks
// when this throws.
std::pair<int, std::string> TCPStreamPusher::OpenListenSocket(const std::string& addr) {
    auto [host, port_opt] = ParseTcpAddress(addr);

    // Closes the socket on any early exit below; disarmed right before the successful return.
    struct FdGuard {
        int fd = -1;
        ~FdGuard() {
            if (fd >= 0)
                ::close(fd);
        }
    };
    FdGuard guard;
    guard.fd = ::socket(AF_INET, SOCK_STREAM, 0);
    const int sock = guard.fd;
    if (sock < 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "socket(listen) failed");

    // Best effort: allows quick rebinding after a restart; a failure here is not fatal.
    int one = 1;
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    sockaddr_in sin{};
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port_opt.value_or(0));   // port 0 = let the kernel choose
    if (host == "*" || host == "0.0.0.0")
        sin.sin_addr.s_addr = htonl(INADDR_ANY);
    else if (inet_pton(AF_INET, host.c_str(), &sin.sin_addr) != 1)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "inet_pton failed for " + host);

    if (bind(sock, reinterpret_cast<sockaddr*>(&sin), sizeof(sin)) != 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "bind() failed to " + addr);
    if (listen(sock, 64) != 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "listen() failed on " + addr);

    // Ask the kernel which port was really bound (matters when the port was "*").
    sockaddr_in actual{};
    socklen_t actual_len = sizeof(actual);
    if (getsockname(sock, reinterpret_cast<sockaddr*>(&actual), &actual_len) != 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "getsockname() failed on " + addr);

    const uint16_t bound_port = ntohs(actual.sin_port);
    const std::string normalized_host = (host == "*") ? "0.0.0.0" : host;
    const std::string bound_endpoint = "tcp://" + normalized_host + ":" + std::to_string(bound_port);

    guard.fd = -1;   // success: ownership of the descriptor passes to the caller
    return {sock, bound_endpoint};
}
// Waits up to `timeout` for an incoming connection on `listen_fd` and accepts it.
// Returns the accepted descriptor, or -1 on timeout, poll() failure, an error/hang-up
// condition on the listening socket, or accept() failure.
int TCPStreamPusher::AcceptOne(int listen_fd, std::chrono::milliseconds timeout) {
    pollfd waiter{};
    waiter.fd = listen_fd;
    waiter.events = POLLIN;

    const int timeout_ms = static_cast<int>(timeout.count());
    const int ready = poll(&waiter, 1, timeout_ms);
    if (ready <= 0)
        return -1;   // nothing pending within the timeout, or poll() itself failed

    const int failure_bits = POLLERR | POLLHUP | POLLNVAL;
    if ((waiter.revents & failure_bits) != 0)
        return -1;

    return accept(listen_fd, nullptr, nullptr);
}
// Shuts down and closes the descriptor stored in `fd`, leaving -1 behind.
// The descriptor is taken with an atomic exchange, so when several callers race
// only the one that obtains a valid value performs the shutdown/close.
void TCPStreamPusher::CloseFd(std::atomic<int>& fd) {
    const int taken = fd.exchange(-1);
    if (taken < 0)
        return;   // already closed (or never opened)
    shutdown(taken, SHUT_RDWR);
    close(taken);
}
// Opens the listening socket for `addr` and launches the acceptor and keepalive
// background tasks.
//
// Throws JFJochException (InputParameterInvalid) when `addr` is empty, when
// `in_max_connections` is zero, or when the listening socket cannot be opened.
// After construction `endpoint` holds the endpoint reported by OpenListenSocket.
TCPStreamPusher::TCPStreamPusher(const std::string& addr,
                                 size_t in_max_connections,
                                 std::optional<int32_t> in_send_buffer_size)
    : endpoint(addr),
      max_connections(in_max_connections),
      send_buffer_size(in_send_buffer_size) {
    if (endpoint.empty())
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No TCP writer address provided");
    if (max_connections == 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Max TCP connections cannot be zero");

    const auto opened = OpenListenSocket(endpoint);
    listen_fd.store(opened.first);
    endpoint = opened.second;

    // The flag must be raised before the tasks start: both loops run only while it is set.
    acceptor_running = true;
    acceptor_future = std::async(std::launch::async, &TCPStreamPusher::AcceptorThread, this);
    keepalive_future = std::async(std::launch::async, &TCPStreamPusher::KeepaliveThread, this);

    logger.Info("TCPStreamPusher listening on " + endpoint + " (max " + std::to_string(max_connections) + " connections)");
}
// Stops the background tasks first, then tears down every connection.
TCPStreamPusher::~TCPStreamPusher() {
    // Phase 1: stop the acceptor and keepalive tasks. Clearing the flag and closing the
    // listening socket makes both loops terminate; wait for them before touching connections.
    acceptor_running = false;
    CloseFd(listen_fd);
    if (acceptor_future.valid())
        acceptor_future.get();
    if (keepalive_future.valid())
        keepalive_future.get();

    // Phase 2: the background tasks are gone, so nothing else takes connections_mutex.
    // Detach the connection list under the lock, then tear the connections down with the
    // lock released, so that joining their futures cannot deadlock on it.
    std::vector<std::shared_ptr<Connection>> doomed;
    {
        std::lock_guard lg(connections_mutex);
        doomed.swap(connections);
        session_connections.clear();
        calibration_connection.reset();
    }

    for (const auto& conn : doomed) {
        if (conn)
            TearDownConnection(*conn);
    }
}
// Fully shuts down one connection: stops its data-collection writer, flags it as
// disconnected and broken, closes the socket and waits for its ACK reader task.
void TCPStreamPusher::TearDownConnection(Connection& c) {
    StopDataCollectionThreads(c);

    c.connected = false;
    c.broken = true;
    CloseFd(c.fd);

    // With the flags set and the socket closed the ACK reader loop ends; join it.
    auto& ack_reader = c.persistent_ack_future;
    if (ack_reader.valid())
        ack_reader.get();
}
// Non-blocking health probe for a connection.
// Returns false when the connection is flagged broken, has no descriptor, poll() fails
// or reports hang-up / invalid descriptor, or the socket has a pending error (SO_ERROR).
bool TCPStreamPusher::IsConnectionAlive(const Connection& c) const {
    if (c.broken)
        return false;

    const int sock = c.fd.load();
    if (sock < 0)
        return false;

    pollfd probe{};
    probe.fd = sock;
    probe.events = POLLOUT;
    const bool poll_failed = poll(&probe, 1, 0) < 0;   // zero timeout: never blocks
    const bool hung_up = (probe.revents & (POLLHUP | POLLNVAL)) != 0;
    if (poll_failed || hung_up)
        return false;

    int pending_error = 0;
    socklen_t pending_len = sizeof(pending_error);
    const bool queried = getsockopt(sock, SOL_SOCKET, SO_ERROR, &pending_error, &pending_len) == 0;
    return queried && pending_error == 0;
}
// Sends exactly `len` bytes from `buf` on connection `c`, waiting for the socket to
// become writable as needed.
//
// Returns true once every byte has been handed to the kernel. Returns false if the
// connection is (or becomes) unusable. Apart from the case where the connection was
// already broken/closed on entry, every failure path marks the connection broken and
// closes its descriptor: part of a frame may already be on the wire, so the byte stream
// can no longer be trusted and the connection must not be reused.
//
// Backpressure alone is never fatal: the loop keeps waiting as long as the peer has shown
// activity (last_peer_activity_ns) within peer_liveness_timeout.
bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len) {
    const auto* p = static_cast<const uint8_t*>(buf);

    // Common failure exit: flag the connection, close the socket, report failure.
    auto fail_broken = [&]() {
        c.broken = true;
        CloseFd(c.fd);
        return false;
    };

    size_t sent = 0;
    while (sent < len) {
        const int local_fd = c.fd.load();
        if (local_fd < 0 || c.broken)
            return false;

        // Treat backpressure as fatal only if the peer shows NO sign of life. A busy
        // writer keeps refreshing last_peer_activity_ns via BUSY heartbeats / ACKs, so
        // we wait through arbitrarily long stalls; only a silent peer trips the timeout.
        const int64_t silent_ns = SteadyNowNs() - c.last_peer_activity_ns.load(std::memory_order_relaxed);
        if (silent_ns > std::chrono::duration_cast<std::chrono::nanoseconds>(peer_liveness_timeout).count()) {
            logger.Warning("No liveness from writer on socket " + std::to_string(c.socket_number) +
                           " for " + std::to_string(silent_ns / 1000000) + " ms while sending; marking broken");
            return fail_broken();
        }

        pollfd pfd{};
        pfd.fd = local_fd;
        pfd.events = POLLOUT;
        const int prc = poll(&pfd, 1, static_cast<int>(send_poll_timeout.count()));
        if (prc < 0) {
            if (errno == EINTR)
                continue;
            return fail_broken();
        }
        if (prc == 0)
            continue;   // not writable yet: loop back to re-check liveness

        // Error/hang-up with no chance to write is terminal; if POLLOUT is also set,
        // let send() report the precise error.
        if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLOUT))
            return fail_broken();

        const ssize_t rc = ::send(local_fd, p + sent, len - sent, MSG_NOSIGNAL);
        if (rc < 0) {
            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
                continue;
            // Any other error ends this transfer for good. Earlier bytes of the frame may
            // already have been sent, so mark the connection broken regardless of which
            // errno it was; otherwise a later frame would be appended to a truncated one.
            return fail_broken();
        }
        sent += static_cast<size_t>(rc);
    }
    return true;
}
// Sends one protocol frame: a TcpFrameHeader followed by `size` payload bytes (if any).
// A negative `image_number` is transmitted as 0. Returns false as soon as either the
// header or the payload cannot be sent.
bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, TCPFrameType type, int64_t image_number) {
    TcpFrameHeader header{};
    header.type = static_cast<uint16_t>(type);
    header.payload_size = size;
    header.image_number = image_number >= 0 ? static_cast<uint64_t>(image_number) : 0;
    header.socket_number = c.socket_number;
    header.run_number = run_number;

    const bool header_sent = SendAll(c, &header, sizeof(header));
    if (!header_sent)
        return false;

    if (size == 0)
        return true;   // header-only frame
    return SendAll(c, data, size);
}
// Reads exactly `len` bytes into `buf`, polling in 100 ms slices so the loop notices
// promptly when the connection stops being `active`.
// Returns false if the connection becomes inactive, has no descriptor, the peer closes
// or hangs up with nothing left to read, or poll()/recv() fails with a non-retryable error.
bool TCPStreamPusher::ReadExact(Connection& c, void* buf, size_t len) {
    auto* dst = static_cast<uint8_t*>(buf);
    size_t received = 0;

    while (received < len) {
        if (!c.active)
            return false;

        const int sock = c.fd.load();
        if (sock < 0)
            return false;

        pollfd waiter{};
        waiter.fd = sock;
        waiter.events = POLLIN;
        const int ready = poll(&waiter, 1, 100);
        if (ready == 0)
            continue;   // timeout: loop back and re-check `active`
        if (ready < 0) {
            if (errno == EINTR)
                continue;
            return false;
        }

        const bool readable = (waiter.revents & POLLIN) != 0;
        if (!readable) {
            // Hang-up / invalid descriptor with no data to drain is terminal;
            // anything else just means "try again".
            if ((waiter.revents & (POLLHUP | POLLNVAL)) != 0)
                return false;
            continue;
        }

        const ssize_t n = ::recv(sock, dst + received, len - received, 0);
        if (n > 0) {
            received += static_cast<size_t>(n);
            continue;
        }
        if (n == 0)
            return false;   // orderly shutdown by the peer
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
            continue;
        return false;
    }
    return true;
}
// Reads exactly `len` bytes into `buf` for the lifetime of the connection (not just a
// data-collection run): the loop keeps going while the connection is `connected` and not
// `broken`, polling in 500 ms slices.
// Returns false if the connection is disconnected/broken, has no descriptor, the peer
// closes or hangs up with nothing left to read, or poll()/recv() fails with a
// non-retryable error.
bool TCPStreamPusher::ReadExactPersistent(Connection& c, void* buf, size_t len) {
    auto* dst = static_cast<uint8_t*>(buf);
    size_t received = 0;

    while (received < len) {
        if (!c.connected || c.broken)
            return false;

        const int sock = c.fd.load();
        if (sock < 0)
            return false;

        pollfd waiter{};
        waiter.fd = sock;
        waiter.events = POLLIN;
        const int ready = poll(&waiter, 1, 500);
        if (ready == 0)
            continue;   // timeout: loop back and re-check the connection flags
        if (ready < 0) {
            if (errno == EINTR)
                continue;
            return false;
        }

        const bool readable = (waiter.revents & POLLIN) != 0;
        if (!readable) {
            // Give up only when the peer hung up / the descriptor is invalid AND there
            // is no readable data left to drain; otherwise poll again.
            if ((waiter.revents & (POLLHUP | POLLNVAL)) != 0)
                return false;
            continue;
        }

        const ssize_t n = ::recv(sock, dst + received, len - received, 0);
        if (n > 0) {
            received += static_cast<size_t>(n);
            continue;
        }
        if (n == 0)
            return false;   // peer closed the connection
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
            continue;
        return false;
    }
    return true;
}
// Per-connection sender loop for a data-collection run. Takes elements from the
// connection's queue and sends each image as a DATA frame until an END marker arrives or
// the connection stops being `active`. Every dequeued image slot is released exactly
// once, whether or not it could be sent.
void TCPStreamPusher::WriterThread(Connection* c) {
    while (c->active) {
        auto item = c->queue.GetBlocking();
        if (item.end)
            break;
        if (!item.z)
            continue;   // element without an image attached: nothing to send

        if (c->broken) {
            // Connection is unusable: hand the slot back without sending.
            item.z->release();
            continue;
        }

        bool sent_ok = false;
        {
            std::unique_lock send_guard(c->send_mutex);
            sent_ok = SendFrame(*c,
                                static_cast<const uint8_t*>(item.z->GetImage()),
                                item.z->GetImageSize(),
                                TCPFrameType::DATA,
                                item.z->GetImageNumber());
        }

        // The send is synchronous: once it returns, the payload has been copied into the
        // kernel socket buffer, so the image-buffer slot can go back to the pool right away.
        item.z->release();

        if (sent_ok)
            continue;
        c->broken = true;
        logger.Error("TCP send failed on socket " + std::to_string(c->socket_number));
    }
}
// Reader loop that runs for the whole lifetime of a connection. It consumes every frame
// the writer sends back (KEEPALIVE, BUSY, ACK), refreshes the connection's liveness
// timestamp, records the reported FIFO occupancy, and publishes ACK results to threads
// waiting on ack_cv. The loop ends, with the connection flagged broken, when the peer
// disconnects or sends something that is not a valid frame.
//
// Payload sizes come straight from the peer, so they are never used to size an
// allocation: payloads that are not needed are drained through a small fixed buffer, and
// the error text attached to an ACK is stored only up to max_error_text_bytes (the
// remainder is read and dropped so the stream stays in sync).
void TCPStreamPusher::PersistentAckThread(Connection* c) {
    constexpr size_t max_error_text_bytes = 64 * 1024;

    // Reads and discards `remaining` payload bytes in bounded chunks.
    // Returns false if the connection failed while doing so.
    auto drain_payload = [&](uint64_t remaining) {
        uint8_t scratch[4096];
        while (remaining > 0) {
            const size_t chunk = remaining < sizeof(scratch) ? static_cast<size_t>(remaining) : sizeof(scratch);
            if (!ReadExactPersistent(*c, scratch, chunk))
                return false;
            remaining -= chunk;
        }
        return true;
    };

    // Flags the connection as lost, unless it was already shut down or flagged elsewhere.
    auto mark_lost = [&]() {
        if (c->connected && !c->broken) {
            c->broken = true;
            logger.Info("Persistent connection lost on socket " + std::to_string(c->socket_number));
        }
    };

    while (c->connected && !c->broken) {
        TcpFrameHeader h{};
        if (!ReadExactPersistent(*c, &h, sizeof(h))) {
            mark_lost();
            break;
        }
        if (h.magic != JFJOCH_TCP_MAGIC || h.version != JFJOCH_TCP_VERSION) {
            c->broken = true;
            logger.Error("Invalid frame on persistent connection, socket " + std::to_string(c->socket_number));
            break;
        }

        // Any well-formed frame from the peer is a sign of life.
        c->last_peer_activity_ns.store(SteadyNowNs(), std::memory_order_relaxed);
        const auto frame_type = static_cast<TCPFrameType>(h.type);

        // Keepalive pong from the writer: remember when it arrived, ignore any payload.
        if (frame_type == TCPFrameType::KEEPALIVE) {
            c->last_keepalive_recv = std::chrono::steady_clock::now();
            if (!drain_payload(h.payload_size)) {
                mark_lost();
                break;
            }
            continue;
        }

        // Busy/backpressure heartbeat: the writer is alive but stalled. The liveness
        // timestamp update above is all we need; just record reported FIFO occupancy.
        if (frame_type == TCPFrameType::BUSY) {
            c->last_ack_fifo_occupancy.store(h.ack_fifo_occupancy, std::memory_order_relaxed);
            if (!drain_payload(h.payload_size)) {
                mark_lost();
                break;
            }
            continue;
        }

        if (frame_type != TCPFrameType::ACK) {
            c->broken = true;
            logger.Error("Unexpected frame type " + std::to_string(h.type) + " on socket " + std::to_string(c->socket_number));
            break;
        }

        c->last_ack_fifo_occupancy.store(h.ack_fifo_occupancy, std::memory_order_relaxed);

        // ACK frame: its payload, if any, is a human-readable error text. Keep at most
        // max_error_text_bytes of it and drop the rest.
        std::string error_text;
        if (h.payload_size > 0) {
            const uint64_t payload = h.payload_size;
            const size_t kept = payload < max_error_text_bytes ? static_cast<size_t>(payload) : max_error_text_bytes;
            error_text.resize(kept);
            if (!ReadExactPersistent(*c, error_text.data(), kept) || !drain_payload(payload - kept)) {
                c->broken = true;
                break;
            }
        }

        const auto ack_for = static_cast<TCPFrameType>(h.ack_for);
        const bool ok = (h.flags & TCP_ACK_FLAG_OK) != 0;
        const bool fatal = (h.flags & TCP_ACK_FLAG_FATAL) != 0;
        const auto code = static_cast<TCPAckCode>(h.ack_code);

        // Validate run number: discard stale ACKs from a previous run on a persistent
        // connection. This happens only after the payload was consumed, so the stream
        // stays aligned on the next header.
        if (h.run_number != run_number) {
            logger.Warning("Discarding ACK with stale run_number " + std::to_string(h.run_number)
                           + " (expected " + std::to_string(run_number) + ") on socket "
                           + std::to_string(c->socket_number));
            continue;
        }

        {
            std::unique_lock ul(c->ack_mutex);
            c->last_ack_code = code;
            if (!error_text.empty())
                c->last_ack_error = error_text;

            if (ack_for == TCPFrameType::START) {
                c->start_ack_received = true;
                c->start_ack_ok = ok;
                if (!ok && error_text.empty())
                    c->last_ack_error = "START rejected";
            } else if (ack_for == TCPFrameType::END) {
                c->end_ack_received = true;
                c->end_ack_ok = ok;
                if (!ok && error_text.empty())
                    c->last_ack_error = "END rejected";
            } else if (ack_for == TCPFrameType::CANCEL) {
                c->cancel_ack_received = true;
                c->cancel_ack_ok = ok;
                if (!ok && error_text.empty())
                    c->last_ack_error = "CANCEL rejected";
            } else if (ack_for == TCPFrameType::DATA) {
                // Per-image acknowledgement: update the per-connection and global counters.
                c->data_acked_total.fetch_add(1, std::memory_order_relaxed);
                total_data_acked_total.fetch_add(1, std::memory_order_relaxed);
                if (ok && !fatal) {
                    c->data_acked_ok.fetch_add(1, std::memory_order_relaxed);
                    total_data_acked_ok.fetch_add(1, std::memory_order_relaxed);
                } else {
                    c->data_acked_bad.fetch_add(1, std::memory_order_relaxed);
                    total_data_acked_bad.fetch_add(1, std::memory_order_relaxed);
                    c->data_ack_error_reported = true;
                    if (!error_text.empty()) {
                        c->data_ack_error_text = error_text;
                    } else if (c->data_ack_error_text.empty()) {
                        c->data_ack_error_text = "DATA ACK failed";
                    }
                }
            }
        }
        c->ack_cv.notify_all();
    }

    // If the connection broke, also wake up anyone waiting for an ACK
    c->ack_cv.notify_all();
}
// Background loop that accepts incoming writer connections while `acceptor_running`
// is set and the listening socket is open. Each accepted socket gets the next sequential
// socket number; sockets arriving while `max_connections` live connections exist are
// closed immediately.
void TCPStreamPusher::AcceptorThread() {
    uint32_t next_socket_number = 0;

    while (acceptor_running) {
        const int lfd = listen_fd.load();
        if (lfd < 0)
            break;   // listening socket was closed: shut down

        // Bounded wait, so the running flag is re-checked at least every 500 ms.
        const int new_fd = AcceptOne(lfd, std::chrono::milliseconds(500));
        if (new_fd < 0)
            continue;

        std::lock_guard lg(connections_mutex);
        // Prune first, so dead connections do not count against the limit.
        RemoveDeadConnections();

        if (connections.size() < max_connections) {
            const uint32_t assigned = next_socket_number++;
            SetupNewConnection(new_fd, assigned);
            logger.Info("Accepted writer connection (socket_number=" + std::to_string(assigned) +
                        ", total=" + std::to_string(connections.size()) + ")");
        } else {
            logger.Warning("Max connections (" + std::to_string(max_connections) +
                           ") reached, rejecting new connection");
            shutdown(new_fd, SHUT_RDWR);
            close(new_fd);
        }
    }
}
void TCPStreamPusher::SetupNewConnection(int new_fd, uint32_t socket_number) {
auto c = std::make_shared<Connection>(send_queue_size);
c->socket_number = socket_number;
c->fd.store(new_fd);
int one = 1;
setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
// Enable OS-level TCP keep-alive
setsockopt(new_fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
int idle = 30; // longer than app-level keepalive to avoid interference
int intvl = 10;
int cnt = 3;
setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle));
setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, sizeof(intvl));
setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt));
if (send_buffer_size)
setsockopt(new_fd, SOL_SOCKET, SO_SNDBUF, &send_buffer_size.value(), sizeof(int32_t));
c->connected = true;
c->broken = false;
auto now = std::chrono::steady_clock::now();
c->last_keepalive_sent = now;
c->last_keepalive_recv = now;
c->last_peer_activity_ns.store(SteadyNowNs(), std::memory_order_relaxed);
auto* raw = c.get();
c->persistent_ack_future = std::async(std::launch::async, &TCPStreamPusher::PersistentAckThread, this, raw);
connections.emplace_back(std::move(c));
}
// Removes every connection that is flagged broken, flagged disconnected, or fails the
// IsConnectionAlive probe. Each removed connection is shut down completely (writer
// stopped, socket closed, ACK reader joined) before it is erased.
//
// Must be called with connections_mutex held. Joining the ACK reader here is safe
// because PersistentAckThread never takes connections_mutex.
void TCPStreamPusher::RemoveDeadConnections() {
    for (auto it = connections.begin(); it != connections.end();) {
        // Hold our own reference so the object stays valid until we are done with it.
        const auto conn = *it;

        const bool dead = conn->broken || !conn->connected || !IsConnectionAlive(*conn);
        if (!dead) {
            ++it;
            continue;
        }

        conn->connected = false;
        conn->broken = true;
        StopDataCollectionThreads(*conn);
        CloseFd(conn->fd);
        if (conn->persistent_ack_future.valid())
            conn->persistent_ack_future.get();

        logger.Info("Removed dead connection (socket_number=" + std::to_string(conn->socket_number) + ")");
        it = connections.erase(it);
    }
}
void TCPStreamPusher::KeepaliveThread() {
while (acceptor_running) {
// Sleep in small increments so we can exit promptly
for (int i = 0; i < 50 && acceptor_running; ++i)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (!acceptor_running)
break;
// During data collection, the data flow itself serves as heartbeat
if (data_collection_active)
continue;
std::lock_guard lg(connections_mutex);
for (auto& cptr : connections) {
auto& c = *cptr;
if (c.broken || !c.connected)
continue;
std::unique_lock ul(c.send_mutex);
if (!SendFrame(c, nullptr, 0, TCPFrameType::KEEPALIVE, -1)) {
logger.Warning("Keepalive send failed on socket " + std::to_string(c.socket_number));
c.broken = true;
} else {
c.last_keepalive_sent = std::chrono::steady_clock::now();
}
}
RemoveDeadConnections();
}
}
// Number of connections currently flagged connected and not broken.
// Reads the flags only; it does not probe the sockets.
size_t TCPStreamPusher::GetConnectedWriters() const {
    std::lock_guard lg(connections_mutex);

    size_t healthy = 0;
    for (const auto& conn : connections) {
        const bool usable = conn->connected && !conn->broken;
        healthy += usable ? 1 : 0;
    }
    return healthy;
}
// Prepares a connection for a new run: clears all ACK bookkeeping left over from a
// previous run, zeroes the per-connection DATA ACK counters, marks the connection
// active and launches its WriterThread.
void TCPStreamPusher::StartDataCollectionThreads(Connection& c) {
    {
        // The ACK state is shared with the ACK reader, so reset it under ack_mutex.
        std::unique_lock ack_guard(c.ack_mutex);

        c.start_ack_received = false;
        c.end_ack_received = false;
        c.cancel_ack_received = false;

        c.start_ack_ok = false;
        c.end_ack_ok = false;
        c.cancel_ack_ok = false;

        c.last_ack_code = TCPAckCode::None;
        c.last_ack_error.clear();

        c.data_ack_error_reported = false;
        c.data_ack_error_text.clear();
    }

    c.data_acked_ok.store(0, std::memory_order_relaxed);
    c.data_acked_bad.store(0, std::memory_order_relaxed);
    c.data_acked_total.store(0, std::memory_order_relaxed);

    // `active` must be set before the writer starts: its loop runs only while it is true.
    c.active = true;
    c.writer_future = std::async(std::launch::async, &TCPStreamPusher::WriterThread, this, &c);
}
// Stops the per-run WriterThread of a connection and leaves its queue empty.
//
// No-op if the connection is not active. Otherwise the connection is marked inactive and
// an END marker is queued to wake the writer. If the marker cannot be queued within
// 200 ms (queue full, writer stalled) the connection is declared broken and its socket
// closed so that a blocked send fails. After the writer has been joined, whatever is
// still in the queue is drained: image slots are returned to the buffer pool and stale
// END markers are discarded, so nothing leaks and nothing carries over into the next run.
void TCPStreamPusher::StopDataCollectionThreads(Connection& c) {
    if (!c.active)
        return;
    c.active = false;

    // Empties the queue, releasing every image slot found in it.
    auto drain_queue = [&c]() {
        ImagePusherQueueElement e{};
        while (c.queue.Get(e)) {
            if (e.z)
                e.z->release();
        }
    };

    // Avoid potential shutdown deadlock if queue is full and writer is stalled.
    if (!c.queue.PutTimeout({.end = true}, std::chrono::milliseconds(200))) {
        c.broken = true;
        CloseFd(c.fd);
        // Make room, then queue the END marker so a writer blocked on the queue wakes up.
        drain_queue();
        (void)c.queue.Put({.end = true});
    }

    c.ack_cv.notify_all();
    if (c.writer_future.valid())
        c.writer_future.get();

    // The writer leaves its loop as soon as it observes active == false, without
    // draining. Elements that were still queued at that point (unsent images, or the END
    // marker itself) would otherwise leak their buffer slots and be picked up by the next
    // run's writer. The writer has been joined, so draining here cannot race with it.
    drain_queue();
}
// Waits up to `timeout` for the ACK of a START, END or CANCEL frame on connection `c`.
//
// Returns true only if the ACK arrived and reported success. On failure `*error_text`
// (when provided) is set to "ACK timeout", to the last error recorded for the connection,
// or to a generic "Socket broken" / "ACK rejected" text. For any other frame type the
// wait condition is never satisfied, so the call ends in "ACK timeout".
bool TCPStreamPusher::WaitForAck(Connection& c, TCPFrameType ack_for, std::chrono::milliseconds timeout, std::string* error_text) {
    std::unique_lock ack_guard(c.ack_mutex);

    // Picks the "received" flag that belongs to the awaited frame type.
    const auto ack_received = [&]() -> bool {
        switch (ack_for) {
            case TCPFrameType::START:
                return c.start_ack_received;
            case TCPFrameType::END:
                return c.end_ack_received;
            case TCPFrameType::CANCEL:
                return c.cancel_ack_received;
            default:
                return false;
        }
    };
    const bool supported = ack_for == TCPFrameType::START
                           || ack_for == TCPFrameType::END
                           || ack_for == TCPFrameType::CANCEL;

    const bool signalled = c.ack_cv.wait_for(ack_guard, timeout, [&] {
        return supported && (ack_received() || c.broken.load());
    });

    if (!signalled) {
        if (error_text)
            *error_text = "ACK timeout";
        return false;
    }
    if (c.broken) {
        if (error_text)
            *error_text = c.last_ack_error.empty() ? "Socket broken" : c.last_ack_error;
        return false;
    }

    bool ack_ok = false;
    switch (ack_for) {
        case TCPFrameType::START:
            ack_ok = c.start_ack_ok;
            break;
        case TCPFrameType::END:
            ack_ok = c.end_ack_ok;
            break;
        case TCPFrameType::CANCEL:
            ack_ok = c.cancel_ack_ok;
            break;
        default:
            break;
    }

    if (!ack_ok && error_text)
        *error_text = c.last_ack_error.empty() ? "ACK rejected" : c.last_ack_error;
    return ack_ok;
}
// Waits for the END ACK on connection `c` without a fixed deadline.
//
// After END is sent the writer may still be flushing a backlog to a slow filesystem.
// As long as it keeps showing activity (last_peer_activity_ns is refreshed by the ACK
// reader on every valid frame) the wait continues; it is abandoned only when the peer has
// been silent for longer than `liveness_timeout`, or when the connection breaks.
//
// Returns true only if the END ACK arrived and reported success; otherwise
// `*error_text` (when provided) describes the failure.
bool TCPStreamPusher::WaitForEndAck(Connection& c, std::chrono::milliseconds liveness_timeout, std::string* error_text) {
    std::unique_lock ack_guard(c.ack_mutex);

    const auto finished = [&] {
        return c.end_ack_received || c.broken.load();
    };

    for (;;) {
        if (finished())
            break;

        // Sleep for at most one liveness window, waking early on an ACK or a break.
        c.ack_cv.wait_for(ack_guard, liveness_timeout, finished);
        if (finished())
            break;

        // Woke up with nothing decided: keep waiting only if the peer spoke recently.
        const int64_t silent_ns = SteadyNowNs() - c.last_peer_activity_ns.load(std::memory_order_relaxed);
        const int64_t limit_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(liveness_timeout).count();
        if (silent_ns > limit_ns) {
            if (error_text)
                *error_text = "END ACK timeout (writer silent)";
            return false;
        }
    }

    if (c.broken) {
        if (error_text)
            *error_text = c.last_ack_error.empty() ? "Socket broken" : c.last_ack_error;
        return false;
    }

    const bool accepted = c.end_ack_ok;
    if (!accepted && error_text)
        *error_text = c.last_ack_error.empty() ? "END ACK rejected" : c.last_ack_error;
    return accepted;
}
// Begins a data-collection run on all currently connected writers.
//
// Steps: validate images_per_file, reset the per-run state and global ACK counters, stop
// any writer threads left from an earlier run, prune dead connections, snapshot the
// surviving connections as this run's session, start one writer thread per connection,
// then send START to each connection in turn and wait (up to 5 s each) for its ACK.
//
// `message` is modified: socket_number and write_master_file are set per connection
// before the message is serialized for that connection.
//
// Throws JFJochException (InputParameterInvalid) if images_per_file < 1, if no writer is
// connected, or if sending START / receiving its ACK fails on any connection. In the
// last two cases the run is rolled back first (see rollback_cancel below).
void TCPStreamPusher::StartDataCollection(StartMessage& message) {
if (message.images_per_file < 1)
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Images per file cannot be zero or negative");
images_per_file = message.images_per_file;
run_number = message.run_number;
run_name = message.run_name;
transmission_error = false;
total_data_acked_ok.store(0, std::memory_order_relaxed);
total_data_acked_bad.store(0, std::memory_order_relaxed);
total_data_acked_total.store(0, std::memory_order_relaxed);
std::vector<std::shared_ptr<Connection>> local_connections;
{
std::lock_guard lg(connections_mutex);
// Make sure no writer thread from a previous run is still attached, then drop
// connections that are no longer usable before choosing the session set.
for (auto& c : connections)
StopDataCollectionThreads(*c);
RemoveDeadConnections();
if (connections.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writers connected to " + endpoint);
// The session set is fixed for the whole run; the first connection also serves as
// the calibration target.
session_connections = connections;
calibration_connection = session_connections[0];
local_connections = session_connections;
}
logger.Info("Starting data collection with " + std::to_string(local_connections.size()) + " connected writers");
data_collection_active = true;
for (auto& c : local_connections)
StartDataCollectionThreads(*c);
// started[i] becomes true once connection i has acknowledged START successfully.
std::vector<bool> started(local_connections.size(), false);
// Undoes a partially started run: sends CANCEL to every connection that already
// acknowledged START and is not broken (waiting up to 500 ms for each CANCEL ACK), stops
// all writer threads, clears the session and marks data collection inactive.
auto rollback_cancel = [&]() {
for (size_t i = 0; i < local_connections.size(); i++) {
auto& c = *local_connections[i];
if (!started[i] || c.broken)
continue;
std::unique_lock ul(c.send_mutex);
(void)SendFrame(c, nullptr, 0, TCPFrameType::CANCEL, -1);
std::string cancel_ack_err;
(void)WaitForAck(c, TCPFrameType::CANCEL, std::chrono::milliseconds(500), &cancel_ack_err);
}
for (auto& c : local_connections)
StopDataCollectionThreads(*c);
{
std::lock_guard lg(connections_mutex);
session_connections.clear();
calibration_connection.reset();
}
data_collection_active = false;
};
for (size_t i = 0; i < local_connections.size(); i++) {
auto& c = *local_connections[i];
message.socket_number = static_cast<int64_t>(c.socket_number);
// Only the first connection is asked to write the master file.
message.write_master_file = (i == 0);
serializer.SerializeSequenceStart(message);
{
std::unique_lock ul(c.send_mutex);
if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::START, -1)) {
// rollback_cancel runs while this connection's send_mutex is still held. It does not
// re-lock that mutex, because started[i] is still false and the connection is skipped.
rollback_cancel();
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"Timeout/failure sending START on socket " + std::to_string(c.socket_number));
}
}
std::string ack_err;
if (!WaitForAck(c, TCPFrameType::START, std::chrono::seconds(5), &ack_err)) {
rollback_cancel();
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
"START ACK failed on socket " + std::to_string(c.socket_number) + ": " + ack_err);
}
started[i] = true;
}
}
// Sends one image synchronously as a DATA frame, bypassing the per-connection queue.
//
// The target connection is chosen by file index: images are grouped in blocks of
// images_per_file and the blocks are distributed round-robin over the session
// connections (or over all connections when no session is active).
//
// Returns false when no connection is available, images_per_file is not positive, the
// chosen connection is broken or not alive, or the send fails. A failed send marks the
// connection broken, because part of the frame may already have been transmitted.
bool TCPStreamPusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
    std::shared_ptr<Connection> target;
    {
        std::lock_guard lg(connections_mutex);
        const auto& use = (!session_connections.empty() ? session_connections : connections);
        if (use.empty())
            return false;
        if (images_per_file < 1)
            return false;   // would divide by zero below

        const auto conn_count = static_cast<int64_t>(use.size());
        int64_t slot = (image_number / images_per_file) % conn_count;
        // For a negative image number the remainder may be negative; fold it back into
        // [0, conn_count) so it can never index out of bounds.
        if (slot < 0)
            slot += conn_count;
        target = use[static_cast<size_t>(slot)];
    }
    if (!target)
        return false;

    auto& c = *target;
    if (c.broken || !IsConnectionAlive(c))
        return false;

    std::unique_lock ul(c.send_mutex);
    if (!SendFrame(c, image_data, image_size, TCPFrameType::DATA, image_number)) {
        // Same policy as the other senders in this class: after a failed send the byte
        // stream may contain a truncated frame, so the connection must not be reused.
        c.broken = true;
        return false;
    }
    return true;
}
// Queues one image-buffer slot for asynchronous sending by the target connection's
// WriterThread.
//
// The target is chosen by file index: images are grouped in blocks of images_per_file
// and the blocks are distributed round-robin over the session connections (or over all
// connections when no session is active).
//
// Returns true once the slot has been queued; from then on the writer side releases it.
// Returns false, after releasing the slot itself, when no usable connection exists,
// images_per_file is not positive, the connection is broken or inactive, or the writer
// has been silent for longer than peer_liveness_timeout while the queue stayed full.
bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) {
    // Look up the target connection while holding the mutex, but do NOT block on the
    // queue while holding it: that can block indefinitely and deadlock against
    // AcceptorThread/KeepaliveThread.
    std::shared_ptr<Connection> target;
    {
        std::lock_guard lg(connections_mutex);
        const auto& use = (!session_connections.empty() ? session_connections : connections);
        if (use.empty() || images_per_file < 1) {   // second test guards the division below
            z.release();
            return false;
        }

        const auto conn_count = static_cast<int64_t>(use.size());
        int64_t slot = (z.GetImageNumber() / images_per_file) % conn_count;
        // For a negative image number the remainder may be negative; fold it back into
        // [0, conn_count) so it can never index out of bounds.
        if (slot < 0)
            slot += conn_count;
        target = use[static_cast<size_t>(slot)];
    }
    if (!target) {
        z.release();
        return false;
    }

    // A full queue means the writer is applying backpressure (e.g. briefly stalled on a
    // freshly-created file at startup). Keep waiting as long as it proves it is alive:
    // BUSY heartbeats / ACKs refresh last_peer_activity_ns from a thread independent of
    // its stalled write path, so a slow-but-healthy writer is held back, not dropped.
    // Only a peer that has gone completely silent for the liveness window is declared
    // dead. There is no upper bound on the total wait here: a writer that keeps
    // heartbeating but never drains holds the caller in this loop until the connection
    // is stopped or marked broken elsewhere.
    while (!target->broken && target->active) {
        if (target->queue.PutTimeout(ImagePusherQueueElement{
                .z = &z,
                .end = false
            }, std::chrono::milliseconds(50)))
            return true;

        const int64_t silent_ns = SteadyNowNs() - target->last_peer_activity_ns.load(std::memory_order_relaxed);
        if (silent_ns > std::chrono::duration_cast<std::chrono::nanoseconds>(peer_liveness_timeout).count()) {
            logger.Warning("No liveness from writer on socket " + std::to_string(target->socket_number) +
                           " for " + std::to_string(silent_ns / 1000000) + " ms while enqueuing; marking broken");
            target->broken = true;
            break;
        }
    }

    // Not queued: the slot is still ours, so return it to the pool.
    z.release();
    return false;
}
bool TCPStreamPusher::EndDataCollection(const EndMessage& message) {
serializer.SerializeSequenceEnd(message);
bool ret = true;
std::vector<std::shared_ptr<Connection>> local_connections;
{
std::lock_guard lg(connections_mutex);
local_connections = (!session_connections.empty() ? session_connections : connections);
}
for (auto& cptr : local_connections) {
auto& c = *cptr;
if (c.broken) {
ret = false;
continue;
}
{
std::unique_lock ul(c.send_mutex);
if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::END, -1)) {
ret = false;
continue;
}
}
std::string ack_err;
if (!WaitForEndAck(c, peer_liveness_timeout, &ack_err))
ret = false;
}
for (auto& c : local_connections)
StopDataCollectionThreads(*c);
{
std::lock_guard lg(connections_mutex);
session_connections.clear();
calibration_connection.reset();
}
data_collection_active = false;
transmission_error = !ret;
return ret;
}
bool TCPStreamPusher::SendCalibration(const CompressedImage& message) {
std::shared_ptr<Connection> target;
{
std::lock_guard lg(connections_mutex);
if (!data_collection_active) {
logger.Error("Refusing to send TCP calibration: no active run");
return false;
}
if (!calibration_connection) {
logger.Error("Refusing to send TCP calibration: calibration target is not set");
return false;
}
target = calibration_connection;
}
if (!target) {
logger.Error("Refusing to send TCP calibration: calibration target is null");
return false;
}
if (target->broken) {
logger.Error("Refusing to send TCP calibration: target socket {} is marked broken", target->socket_number);
return false;
}
if (!target->connected) {
logger.Error("Refusing to send TCP calibration: target socket {} is not connected", target->socket_number);
return false;
}
if (!target->active) {
logger.Error("Refusing to send TCP calibration: target socket {} is not active in current session", target->socket_number);
return false;
}
serializer.SerializeCalibration(message);
std::unique_lock ul(target->send_mutex);
if (target->broken) {
logger.Error("Refusing to send TCP calibration: target socket {} became broken before send", target->socket_number);
return false;
}
if (!target->connected) {
logger.Error("Refusing to send TCP calibration: target socket {} disconnected before send", target->socket_number);
return false;
}
if (!target->active) {
logger.Error("Refusing to send TCP calibration: target socket {} became inactive before send", target->socket_number);
return false;
}
if (!IsConnectionAlive(*target)) {
logger.Error("Refusing to send TCP calibration: target socket {} is not alive", target->socket_number);
target->broken = true;
return false;
}
const bool ok = SendFrame(*target,
serialization_buffer.data(),
serializer.GetBufferSize(),
TCPFrameType::CALIBRATION,
-1);
if (!ok) {
logger.Error("Failed to send TCP calibration on socket {}", target->socket_number);
target->broken = true;
return false;
}
return true;
}
// Builds a ';'-terminated summary of the errors seen during the run: a generic message
// if the transmission failed, followed by one entry per connection that recorded an
// error. For each connection a reported DATA ACK error takes precedence over the last
// ACK error. Returns an empty string when nothing went wrong.
std::string TCPStreamPusher::Finalize() {
    std::string summary;
    if (transmission_error)
        summary += "Timeout sending images (e.g., writer disabled during data collection);";

    std::lock_guard lg(connections_mutex);
    for (const auto& entry : connections) {
        auto& conn = *entry;
        // The error fields are written by the ACK reader under ack_mutex; read them the same way.
        std::unique_lock ack_guard(conn.ack_mutex);

        const bool has_data_error = conn.data_ack_error_reported && !conn.data_ack_error_text.empty();
        if (has_data_error) {
            summary += "Writer " + std::to_string(conn.socket_number) + ": " + conn.data_ack_error_text + ";";
            continue;
        }
        if (!conn.last_ack_error.empty())
            summary += "Writer " + std::to_string(conn.socket_number) + ": " + conn.last_ack_error + ";";
    }
    return summary;
}
// One-line description of the pusher: bound endpoint, connection limit and the number
// of writers currently counted as connected.
std::string TCPStreamPusher::PrintSetup() const {
    std::string description = "TCPStreamPusher: endpoint=" + endpoint;
    description += " max_connections=" + std::to_string(max_connections);
    description += " connected=" + std::to_string(GetConnectedWriters());
    return description;
}
// Number of DATA ACKs counted as successful, summed over all connections, since the
// counter was last reset (StartDataCollection resets it).
std::optional<uint64_t> TCPStreamPusher::GetImagesWritten() const {
    const auto acked_ok = total_data_acked_ok.load(std::memory_order_relaxed);
    return acked_ok;
}
// Number of DATA ACKs counted as failed, summed over all connections, since the counter
// was last reset (StartDataCollection resets it).
std::optional<uint64_t> TCPStreamPusher::GetImagesWriteError() const {
    const auto acked_bad = total_data_acked_bad.load(std::memory_order_relaxed);
    return acked_bad;
}
// Returns, for every connection in list order, the FIFO occupancy value most recently
// reported by that writer in an ACK or BUSY frame.
std::vector<int64_t> TCPStreamPusher::GetWriterFifoUtilization() const {
    std::vector<int64_t> occupancy;

    std::lock_guard lg(connections_mutex);
    const size_t total = connections.size();
    occupancy.reserve(total);
    for (size_t i = 0; i < total; ++i) {
        const auto& conn = connections[i];
        occupancy.push_back(conn->last_ack_fifo_occupancy.load(std::memory_order_relaxed));
    }
    return occupancy;
}