Files
Jungfraujoch/image_pusher/TCPStreamPusher.cpp
leonarski_f 6136f858af
Build Packages / Unit tests (push) Successful in 1h26m51s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 13m23s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 13m56s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 13m43s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 12m53s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 13m44s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 14m22s
Build Packages / build:rpm (rocky8) (push) Successful in 13m1s
Build Packages / build:rpm (rocky9) (push) Successful in 14m6s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 13m0s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 11m51s
Build Packages / DIALS test (push) Successful in 13m52s
Build Packages / XDS test (durin plugin) (push) Successful in 9m24s
Build Packages / XDS test (JFJoch plugin) (push) Successful in 9m35s
Build Packages / XDS test (neggia plugin) (push) Successful in 6m57s
Build Packages / Generate python client (push) Successful in 35s
Build Packages / Build documentation (push) Successful in 47s
Build Packages / Create release (push) Skipped
v1.0.0-rc.154 (#64)
This is an UNSTABLE release. It includes many experimental features, as well as many AI generated fixes. We recommend using rc.152 for production use.

* jfjoch_broker: Fix to TCP file pusher (remove kernel zero copy to improve reliability)

Reviewed-on: #64
Co-authored-by: Filip Leonarski <filip.leonarski@psi.ch>
Co-committed-by: Filip Leonarski <filip.leonarski@psi.ch>
2026-06-25 18:12:00 +02:00

1113 lines
39 KiB
C++

// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#include "TCPStreamPusher.h"

#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>
#include <charconv>
#include <chrono>
#include <cstring>
#include <limits>
#include <system_error>
namespace {
// Current reading of the monotonic steady clock, in nanoseconds. Used for liveness and
// backpressure bookkeeping, where only differences between readings matter.
int64_t SteadyNowNs() {
    using std::chrono::nanoseconds;
    using std::chrono::steady_clock;
    const auto since_epoch = steady_clock::now().time_since_epoch();
    return std::chrono::duration_cast<nanoseconds>(since_epoch).count();
}
}
std::pair<std::string, std::optional<uint16_t>> TCPStreamPusher::ParseTcpAddress(const std::string& addr) {
    // Splits "tcp://<host>:<port>" into host and port. The last ':' separates the two.
    // A port of "*" yields std::nullopt (the caller lets the OS choose a port).
    // Throws JFJochException(InputParameterInvalid) when the prefix or ':' is missing, or when
    // the port is not a plain decimal number in [1, 65535].
    const std::string prefix = "tcp://";
    if (addr.rfind(prefix, 0) != 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP address: " + addr);
    const auto hp = addr.substr(prefix.size());
    const auto p = hp.find_last_of(':');
    if (p == std::string::npos)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP address: " + addr);
    auto host = hp.substr(0, p);
    const auto port_str = hp.substr(p + 1);
    if (port_str == "*")
        return {host, std::nullopt};

    // std::from_chars is strict: no leading whitespace, no '+', no exceptions. The whole
    // string must be consumed, so "80x", " 80", "+80" and "" are all rejected.
    int port_i = 0;
    const char* const first = port_str.data();
    const char* const last = first + port_str.size();
    const auto [end_ptr, ec] = std::from_chars(first, last, port_i);
    if (ec != std::errc{} || end_ptr != last)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP port in address: " + addr);

    if (port_i < 1 || port_i > static_cast<int>(std::numeric_limits<uint16_t>::max()))
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "TCP port out of range in address: " + addr);
    return {std::move(host), static_cast<uint16_t>(port_i)};
}
std::pair<int, std::string> TCPStreamPusher::OpenListenSocket(const std::string& addr) {
    // Creates an IPv4 TCP socket for `addr`, binds it, and starts listening (backlog 64).
    // `addr` has the form "tcp://host:port", and port "*" means an OS-chosen port.
    // Returns the listening fd and the endpoint actually bound: the real port, with host "*"
    // rendered as 0.0.0.0.
    // Throws JFJochException(InputParameterInvalid) on any failure. The socket is closed on
    // every failure path, so nothing leaks when this throws.
    auto [host, port_opt] = ParseTcpAddress(addr);
    const int sock = ::socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
                              std::string("socket(listen) failed: ") + std::strerror(errno));

    // Closes the socket and builds the exception to throw. Callers build the message
    // (including strerror(errno)) before this runs, so close() cannot clobber errno first.
    auto fail = [sock](const std::string& msg) {
        close(sock);
        return JFJochException(JFJochExceptionCategory::InputParameterInvalid, msg);
    };

    const int one = 1;
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    sockaddr_in sin{};
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port_opt.value_or(0));
    if (host == "*" || host == "0.0.0.0")
        sin.sin_addr.s_addr = htonl(INADDR_ANY);
    else if (inet_pton(AF_INET, host.c_str(), &sin.sin_addr) != 1)
        throw fail("inet_pton failed for " + host);

    if (bind(sock, reinterpret_cast<sockaddr*>(&sin), sizeof(sin)) != 0)
        throw fail("bind() failed to " + addr + ": " + std::strerror(errno));
    if (listen(sock, 64) != 0)
        throw fail("listen() failed on " + addr + ": " + std::strerror(errno));

    // Read back the bound address so a requested port of "*" resolves to the real port.
    sockaddr_in actual{};
    socklen_t actual_len = sizeof(actual);
    if (getsockname(sock, reinterpret_cast<sockaddr*>(&actual), &actual_len) != 0)
        throw fail("getsockname() failed on " + addr + ": " + std::strerror(errno));

    const uint16_t bound_port = ntohs(actual.sin_port);
    const std::string normalized_host = (host == "*") ? "0.0.0.0" : host;
    const std::string bound_endpoint = "tcp://" + normalized_host + ":" + std::to_string(bound_port);
    return {sock, bound_endpoint};
}
int TCPStreamPusher::AcceptOne(int listen_fd, std::chrono::milliseconds timeout) {
    // Waits up to `timeout` for a pending connection on `listen_fd`.
    // Returns -1 on timeout, on a poll() error, or if the listening socket reports
    // error/hang-up/invalid. Otherwise returns the result of accept(), which is -1 on failure.
    pollfd waiter{};
    waiter.fd = listen_fd;
    waiter.events = POLLIN;
    const int ready = poll(&waiter, 1, static_cast<int>(timeout.count()));
    const bool listener_failed = (waiter.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
    if (ready > 0 && !listener_failed)
        return accept(listen_fd, nullptr, nullptr);
    return -1;
}
void TCPStreamPusher::CloseFd(std::atomic<int>& fd) {
    // Atomically detaches the descriptor (the slot becomes -1), so concurrent callers close
    // it at most once. Then shuts down both directions and closes it.
    if (const int victim = fd.exchange(-1); victim >= 0) {
        shutdown(victim, SHUT_RDWR);
        close(victim);
    }
}
TCPStreamPusher::TCPStreamPusher(const std::string& addr,
                                 size_t in_max_connections,
                                 std::optional<int32_t> in_send_buffer_size)
        : endpoint(addr),
          max_connections(in_max_connections),
          send_buffer_size(in_send_buffer_size) {
    // Validate configuration, open the listening socket, then start the background
    // acceptor and keepalive threads.
    if (endpoint.empty())
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No TCP writer address provided");
    if (max_connections == 0)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Max TCP connections cannot be zero");

    // Replace the requested endpoint with the one actually bound, so a "*" port shows
    // the real port number.
    const auto listening = OpenListenSocket(endpoint);
    listen_fd.store(listening.first);
    endpoint = listening.second;

    acceptor_running = true;
    acceptor_future = std::async(std::launch::async, &TCPStreamPusher::AcceptorThread, this);
    keepalive_future = std::async(std::launch::async, &TCPStreamPusher::KeepaliveThread, this);
    logger.Info("TCPStreamPusher listening on " + endpoint + " (max " + std::to_string(max_connections) + " connections)");
}
TCPStreamPusher::~TCPStreamPusher() {
    // Shutdown runs in two phases, so no background thread contends for connections_mutex
    // while connections are torn down.

    // Phase 1: stop accepting. Clear the run flag and close the listening socket so the
    // acceptor and keepalive loops exit, then join both threads.
    acceptor_running = false;
    if (const int lfd = listen_fd.exchange(-1); lfd >= 0) {
        shutdown(lfd, SHUT_RDWR);
        close(lfd);
    }
    if (acceptor_future.valid())
        acceptor_future.get();
    if (keepalive_future.valid())
        keepalive_future.get();

    // Phase 2: detach every connection under the lock, then tear each one down with the
    // lock released. Teardown joins per-connection threads, so holding the mutex here
    // could deadlock.
    std::vector<std::shared_ptr<Connection>> doomed;
    {
        std::lock_guard lg(connections_mutex);
        doomed.swap(connections);
        session_connections.clear();
        calibration_connection.reset();
    }
    for (const auto& conn : doomed) {
        if (conn != nullptr)
            TearDownConnection(*conn);
    }
}
void TCPStreamPusher::TearDownConnection(Connection& c) {
    // Step 1: stop and join the writer thread (it drains its queue first).
    // Step 2: flag the connection dead and close its socket. This makes the persistent
    //         ACK reader's loop conditions fail.
    // Step 3: join the ACK reader.
    StopDataCollectionThreads(c);
    c.connected = false;
    c.broken = true;
    CloseFd(c.fd);
    if (auto& reader = c.persistent_ack_future; reader.valid())
        reader.get();
}
bool TCPStreamPusher::IsConnectionAlive(const Connection& c) const {
    // Non-blocking health probe. The connection counts as alive only if all of these hold:
    //  - it is not flagged broken;
    //  - it still owns an fd;
    //  - a zero-timeout poll() reports neither hang-up nor an invalid fd;
    //  - it has no pending SO_ERROR.
    if (c.broken)
        return false;
    const int sock = c.fd.load();
    if (sock < 0)
        return false;

    pollfd probe{};
    probe.fd = sock;
    probe.events = POLLOUT;
    if (poll(&probe, 1, 0) < 0 || (probe.revents & (POLLHUP | POLLNVAL)) != 0)
        return false;

    int pending_error = 0;
    socklen_t optlen = sizeof(pending_error);
    const bool queried = getsockopt(sock, SOL_SOCKET, SO_ERROR, &pending_error, &optlen) == 0;
    return queried && pending_error == 0;
}
bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len) {
    // Writes all `len` bytes of `buf` to the connection's socket. It waits through
    // backpressure for as long as the writer shows signs of life.
    // Returns true once every byte is sent. Returns false if the connection is already
    // closed/broken, or after a fatal condition. In the fatal case the connection is also
    // marked broken and its socket closed.
    const auto* p = static_cast<const uint8_t*>(buf);
    size_t sent = 0;
    int64_t last_progress_ns = SteadyNowNs();

    // Once part of a frame may be on the wire, the byte stream cannot be resynchronised.
    // Every fatal condition below therefore poisons the connection, so no later frame is
    // appended to a truncated one.
    auto give_up = [this, &c]() {
        c.broken = true;
        CloseFd(c.fd);
        return false;
    };

    while (sent < len) {
        const int local_fd = c.fd.load();
        if (local_fd < 0 || c.broken)
            return false;
        const int64_t now_ns = SteadyNowNs();
        // Treat backpressure as fatal only if the peer shows NO sign of life. A busy
        // writer keeps refreshing last_peer_activity_ns via BUSY heartbeats / ACKs, so
        // we wait through long stalls; only a silent peer trips this timeout.
        const int64_t silent_ns = now_ns - c.last_peer_activity_ns.load(std::memory_order_relaxed);
        if (silent_ns > std::chrono::duration_cast<std::chrono::nanoseconds>(peer_liveness_timeout).count()) {
            logger.Warning("No liveness from writer on socket " + std::to_string(c.socket_number) +
                           " for " + std::to_string(silent_ns / 1000000) + " ms while sending; marking broken");
            return give_up();
        }
        // Hard backpressure cap: a writer may keep heartbeating yet never drain (e.g. a
        // permanently wedged filesystem). If no bytes leave the socket for this long, give
        // up regardless of heartbeats so the run -- and its finalization -- cannot hang.
        if (now_ns - last_progress_ns >
            std::chrono::duration_cast<std::chrono::nanoseconds>(max_backpressure_timeout).count()) {
            logger.Warning("No send progress to writer on socket " + std::to_string(c.socket_number) +
                           " for " + std::to_string((now_ns - last_progress_ns) / 1000000) +
                           " ms despite heartbeats; marking broken");
            return give_up();
        }

        pollfd pfd{};
        pfd.fd = local_fd;
        pfd.events = POLLOUT;
        const int prc = poll(&pfd, 1, static_cast<int>(send_poll_timeout.count()));
        if (prc < 0) {
            if (errno == EINTR)
                continue;
            return give_up();
        }
        if (prc == 0)
            continue;  // Socket buffer still full; re-check liveness and try again.
        if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLOUT))
            return give_up();

        const ssize_t rc = ::send(local_fd, p + sent, len - sent, MSG_NOSIGNAL);
        if (rc < 0) {
            const int err = errno;
            if (err == EINTR || err == EAGAIN || err == EWOULDBLOCK)
                continue;
            // Every other error is fatal. This covers EPIPE, ECONNRESET and ENOTCONN, and
            // also ETIMEDOUT from TCP keep-alive expiry, EHOSTUNREACH, and so on. Leaving
            // the socket open after any of these would desynchronise the frame stream.
            logger.Warning("send() to writer on socket " + std::to_string(c.socket_number) +
                           " failed: " + std::strerror(err) + "; marking broken");
            return give_up();
        }
        sent += static_cast<size_t>(rc);
        last_progress_ns = SteadyNowNs();
    }
    return true;
}
bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, TCPFrameType type, int64_t image_number) {
    // Sends one protocol frame: a fixed TcpFrameHeader followed by `size` payload bytes.
    // A negative image_number (callers pass -1 for control frames) is encoded as 0.
    // The caller is expected to hold c.send_mutex so frames do not interleave.
    TcpFrameHeader header{};
    header.type = static_cast<uint16_t>(type);
    header.payload_size = size;
    header.image_number = (image_number < 0) ? 0 : static_cast<uint64_t>(image_number);
    header.socket_number = c.socket_number;
    header.run_number = run_number;

    if (!SendAll(c, &header, sizeof(header)))
        return false;
    return size == 0 || SendAll(c, data, size);
}
bool TCPStreamPusher::ReadExact(Connection& c, void* buf, size_t len) {
    // Reads exactly `len` bytes into `buf`. It polls in 100 ms slices so it notices when
    // the connection's data-collection session ends (c.active cleared) or the fd is closed.
    // Returns false on deactivation, EOF, or a socket error.
    auto* p = static_cast<uint8_t*>(buf);
    size_t got = 0;
    while (got < len) {
        if (!c.active)
            return false;
        const int local_fd = c.fd.load();
        if (local_fd < 0)
            return false;
        pollfd pfd{};
        pfd.fd = local_fd;
        pfd.events = POLLIN;
        const int prc = poll(&pfd, 1, 100);
        if (prc == 0)
            continue;
        if (prc < 0) {
            if (errno == EINTR)
                continue;
            return false;
        }
        // Stop on error / hang-up / invalid fd unless data is still readable. POLLERR is
        // level-triggered: if it were ignored here, poll() would report it immediately on
        // every iteration and this loop would spin at full CPU.
        if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLIN))
            return false;
        if ((pfd.revents & POLLIN) == 0)
            continue;
        const ssize_t rc = ::recv(local_fd, p + got, len - got, 0);
        if (rc == 0)
            return false;  // Peer closed the connection.
        if (rc < 0) {
            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
                continue;
            return false;
        }
        got += static_cast<size_t>(rc);
    }
    return true;
}
bool TCPStreamPusher::ReadExactPersistent(Connection& c, void* buf, size_t len) {
    // Reads exactly `len` bytes into `buf` for the long-lived ACK reader. It polls in
    // 500 ms slices so it notices when the connection is torn down (connected cleared,
    // broken set, or fd closed).
    // Returns false on teardown, EOF, or a socket error.
    auto* p = static_cast<uint8_t*>(buf);
    size_t got = 0;
    while (got < len) {
        if (!c.connected || c.broken)
            return false;
        const int local_fd = c.fd.load();
        if (local_fd < 0)
            return false;
        pollfd pfd{};
        pfd.fd = local_fd;
        pfd.events = POLLIN;
        const int prc = poll(&pfd, 1, 500);
        if (prc == 0)
            continue;
        if (prc < 0) {
            if (errno == EINTR)
                continue;
            return false;
        }
        // Bail on error / hang-up / invalid fd unless there is still readable data to
        // drain. POLLERR must be included: it is level-triggered, and ignoring it would
        // make poll() return immediately forever and spin this loop.
        if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLIN))
            return false;
        if ((pfd.revents & POLLIN) == 0)
            continue;
        const ssize_t rc = ::recv(local_fd, p + got, len - got, 0);
        if (rc == 0)
            return false;  // Peer closed connection
        if (rc < 0) {
            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
                continue;
            return false;
        }
        got += static_cast<size_t>(rc);
    }
    return true;
}
void TCPStreamPusher::WriterThread(Connection* c) {
    // Per-connection sender thread, started by StartDataCollectionThreads().
    //
    // Runs until it dequeues the end sentinel, NOT merely until c->active is cleared.
    // StopDataCollectionThreads() clears `active` first and only then enqueues the
    // sentinel, so every DATA frame queued ahead of the sentinel is still transmitted.
    // This keeps END (sent by EndDataCollection() after this thread is joined) from
    // overtaking images that were still waiting in the queue.
    for (;;) {
        auto item = c->queue.GetBlocking();
        if (item.end)
            return;
        auto* image = item.z;
        if (image == nullptr)
            continue;

        // On a broken connection, queued slots are handed back to the pool unsent.
        if (c->broken) {
            image->release();
            continue;
        }

        bool delivered = false;
        {
            std::unique_lock ul(c->send_mutex);
            delivered = SendFrame(*c,
                                  static_cast<const uint8_t*>(image->GetImage()),
                                  image->GetImageSize(),
                                  TCPFrameType::DATA,
                                  image->GetImageNumber());
        }
        // SendFrame() is synchronous. When it returns, send() has already copied the
        // payload into the kernel socket buffer (or failed), so the slot can be recycled.
        image->release();

        if (!delivered) {
            c->broken = true;
            logger.Error("TCP send failed on socket " + std::to_string(c->socket_number));
        }
    }
}
void TCPStreamPusher::PersistentAckThread(Connection* c) {
    // Reader thread for one writer connection. It lives as long as the connection, not
    // just for one run, and consumes every frame the writer sends back:
    //  - KEEPALIVE: keepalive pong; records last_keepalive_recv.
    //  - BUSY: backpressure heartbeat; records the reported FIFO occupancy.
    //  - ACK: acknowledgement of START/END/CANCEL/DATA. Recorded under ack_mutex and
    //    signalled through ack_cv.
    // Every well-formed frame refreshes last_peer_activity_ns. A protocol violation or I/O
    // failure marks the connection broken and ends the thread.

    // Control frames carry at most a short error text. A larger declared payload means a
    // corrupt or foreign stream. Refusing it avoids a peer-controlled allocation whose
    // std::bad_alloc would escape this thread and be rethrown at persistent_ack_future.get().
    constexpr uint64_t max_control_payload = 1024 * 1024;

    // Reads and drops a frame payload so the stream stays aligned on frame boundaries.
    auto skip_payload = [this, c](uint64_t size) {
        if (size == 0)
            return true;
        std::vector<uint8_t> discard(static_cast<size_t>(size));
        return ReadExactPersistent(*c, discard.data(), discard.size());
    };

    while (c->connected && !c->broken) {
        TcpFrameHeader h{};
        if (!ReadExactPersistent(*c, &h, sizeof(h))) {
            if (c->connected && !c->broken) {
                c->broken = true;
                logger.Info("Persistent connection lost on socket " + std::to_string(c->socket_number));
            }
            break;
        }
        if (h.magic != JFJOCH_TCP_MAGIC || h.version != JFJOCH_TCP_VERSION) {
            c->broken = true;
            logger.Error("Invalid frame on persistent connection, socket " + std::to_string(c->socket_number));
            break;
        }
        if (h.payload_size > max_control_payload) {
            c->broken = true;
            logger.Error("Oversized payload (" + std::to_string(h.payload_size) +
                         " bytes) on persistent connection, socket " + std::to_string(c->socket_number));
            break;
        }
        // Any well-formed frame from the peer is a sign of life.
        c->last_peer_activity_ns.store(SteadyNowNs(), std::memory_order_relaxed);
        const auto frame_type = static_cast<TCPFrameType>(h.type);

        // Keepalive pong from the writer
        if (frame_type == TCPFrameType::KEEPALIVE) {
            c->last_keepalive_recv = std::chrono::steady_clock::now();
            if (!skip_payload(h.payload_size)) {
                c->broken = true;
                break;
            }
            continue;
        }
        // Busy/backpressure heartbeat: the writer is alive but stalled. The liveness
        // timestamp update above is all we need; just record reported FIFO occupancy.
        if (frame_type == TCPFrameType::BUSY) {
            c->last_ack_fifo_occupancy.store(h.ack_fifo_occupancy, std::memory_order_relaxed);
            if (!skip_payload(h.payload_size)) {
                c->broken = true;
                break;
            }
            continue;
        }
        if (frame_type != TCPFrameType::ACK) {
            c->broken = true;
            logger.Error("Unexpected frame type " + std::to_string(h.type) + " on socket " + std::to_string(c->socket_number));
            break;
        }
        c->last_ack_fifo_occupancy.store(h.ack_fifo_occupancy, std::memory_order_relaxed);

        // ACK frame: the payload, if any, is the writer's error text.
        std::string error_text;
        if (h.payload_size > 0) {
            error_text.resize(static_cast<size_t>(h.payload_size));
            if (!ReadExactPersistent(*c, error_text.data(), error_text.size())) {
                c->broken = true;
                break;
            }
        }
        const auto ack_for = static_cast<TCPFrameType>(h.ack_for);
        const bool ok = (h.flags & TCP_ACK_FLAG_OK) != 0;
        const bool fatal = (h.flags & TCP_ACK_FLAG_FATAL) != 0;
        const auto code = static_cast<TCPAckCode>(h.ack_code);
        // Validate run number: discard stale ACKs from a previous run on a persistent connection
        if (h.run_number != run_number) {
            logger.Warning("Discarding ACK with stale run_number " + std::to_string(h.run_number)
                           + " (expected " + std::to_string(run_number) + ") on socket "
                           + std::to_string(c->socket_number));
            continue;
        }
        {
            std::unique_lock ul(c->ack_mutex);
            c->last_ack_code = code;
            if (!error_text.empty())
                c->last_ack_error = error_text;
            if (ack_for == TCPFrameType::START) {
                c->start_ack_received = true;
                c->start_ack_ok = ok;
                if (!ok && error_text.empty())
                    c->last_ack_error = "START rejected";
            } else if (ack_for == TCPFrameType::END) {
                c->end_ack_received = true;
                c->end_ack_ok = ok;
                if (!ok && error_text.empty())
                    c->last_ack_error = "END rejected";
            } else if (ack_for == TCPFrameType::CANCEL) {
                c->cancel_ack_received = true;
                c->cancel_ack_ok = ok;
                if (!ok && error_text.empty())
                    c->last_ack_error = "CANCEL rejected";
            } else if (ack_for == TCPFrameType::DATA) {
                c->data_acked_total.fetch_add(1, std::memory_order_relaxed);
                total_data_acked_total.fetch_add(1, std::memory_order_relaxed);
                if (ok && !fatal) {
                    c->data_acked_ok.fetch_add(1, std::memory_order_relaxed);
                    total_data_acked_ok.fetch_add(1, std::memory_order_relaxed);
                } else {
                    c->data_acked_bad.fetch_add(1, std::memory_order_relaxed);
                    total_data_acked_bad.fetch_add(1, std::memory_order_relaxed);
                    c->data_ack_error_reported = true;
                    if (!error_text.empty()) {
                        c->data_ack_error_text = error_text;
                    } else if (c->data_ack_error_text.empty()) {
                        c->data_ack_error_text = "DATA ACK failed";
                    }
                }
            }
        }
        c->ack_cv.notify_all();
    }
    // If the connection broke, also wake up anyone waiting for an ACK
    c->ack_cv.notify_all();
}
void TCPStreamPusher::AcceptorThread() {
    // Accepts writer connections until shutdown (acceptor_running cleared or the listening
    // fd closed). Dead connections are pruned before each admission check, so slots freed
    // by disconnected writers do not count against max_connections. Socket numbers are
    // assigned sequentially from 0.
    uint32_t socket_counter = 0;
    while (acceptor_running) {
        const int lfd = listen_fd.load();
        if (lfd < 0)
            break;
        const int incoming = AcceptOne(lfd, std::chrono::milliseconds(500));
        if (incoming < 0)
            continue;

        std::lock_guard lg(connections_mutex);
        RemoveDeadConnections();
        if (connections.size() < max_connections) {
            const uint32_t assigned = socket_counter++;
            SetupNewConnection(incoming, assigned);
            logger.Info("Accepted writer connection (socket_number=" + std::to_string(assigned) +
                        ", total=" + std::to_string(connections.size()) + ")");
        } else {
            logger.Warning("Max connections (" + std::to_string(max_connections) +
                           ") reached, rejecting new connection");
            shutdown(incoming, SHUT_RDWR);
            close(incoming);
        }
    }
}
void TCPStreamPusher::SetupNewConnection(int new_fd, uint32_t socket_number) {
    // Wraps a freshly accepted socket in a Connection, tunes the socket options, starts
    // the connection's persistent ACK reader thread, and appends it to `connections`.
    // Called from AcceptorThread with connections_mutex held.
    auto conn = std::make_shared<Connection>(send_queue_size);
    conn->socket_number = socket_number;
    conn->fd.store(new_fd);

    const int enable = 1;
    setsockopt(new_fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable));
    // OS-level TCP keep-alive. The idle time is deliberately longer than the
    // application-level keepalive period so the two do not interfere.
    setsockopt(new_fd, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable));
    const int keep_idle_s = 30;
    const int keep_interval_s = 10;
    const int keep_probes = 3;
    setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPIDLE, &keep_idle_s, sizeof(keep_idle_s));
    setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPINTVL, &keep_interval_s, sizeof(keep_interval_s));
    setsockopt(new_fd, IPPROTO_TCP, TCP_KEEPCNT, &keep_probes, sizeof(keep_probes));
    if (send_buffer_size) {
        const int32_t sndbuf = *send_buffer_size;
        setsockopt(new_fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(int32_t));
    }

    conn->connected = true;
    conn->broken = false;
    const auto now = std::chrono::steady_clock::now();
    conn->last_keepalive_sent = now;
    conn->last_keepalive_recv = now;
    conn->last_peer_activity_ns.store(SteadyNowNs(), std::memory_order_relaxed);

    conn->persistent_ack_future = std::async(std::launch::async, &TCPStreamPusher::PersistentAckThread, this, conn.get());
    connections.emplace_back(std::move(conn));
}
void TCPStreamPusher::RemoveDeadConnections() {
// Must be called with connections_mutex held.
// We move dead connections out, release the mutex implicitly (caller still holds it),
// then join their futures. Actually — we can join right here since PersistentAckThread
// doesn't take connections_mutex, so no deadlock.
auto it = connections.begin();
while (it != connections.end()) {
auto c = *it;
if (c->broken || !c->connected || !IsConnectionAlive(*c)) {
c->connected = false;
c->broken = true;
StopDataCollectionThreads(*c);
CloseFd(c->fd);
if (c->persistent_ack_future.valid())
c->persistent_ack_future.get();
logger.Info("Removed dead connection (socket_number=" + std::to_string(c->socket_number) + ")");
it = connections.erase(it);
} else {
++it;
}
}
}
void TCPStreamPusher::KeepaliveThread() {
    // Idle-time heartbeat. About every 5 s (50 x 100 ms sleeps, cut short on shutdown) it
    // sends a KEEPALIVE frame to each healthy writer, then prunes dead connections.
    // Skipped entirely while a data collection is running, because the data stream itself
    // serves as the heartbeat.
    while (acceptor_running) {
        int ticks = 0;
        while (ticks < 50 && acceptor_running) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            ++ticks;
        }
        if (!acceptor_running)
            return;
        if (data_collection_active)
            continue;

        std::lock_guard lg(connections_mutex);
        for (const auto& conn : connections) {
            if (conn->broken || !conn->connected)
                continue;
            std::unique_lock ul(conn->send_mutex);
            if (SendFrame(*conn, nullptr, 0, TCPFrameType::KEEPALIVE, -1)) {
                conn->last_keepalive_sent = std::chrono::steady_clock::now();
            } else {
                logger.Warning("Keepalive send failed on socket " + std::to_string(conn->socket_number));
                conn->broken = true;
            }
        }
        RemoveDeadConnections();
    }
}
size_t TCPStreamPusher::GetConnectedWriters() const {
    // Number of connections that are currently usable: connected and not flagged broken.
    std::lock_guard lg(connections_mutex);
    size_t healthy = 0;
    for (const auto& conn : connections)
        healthy += (conn->connected && !conn->broken) ? 1 : 0;
    return healthy;
}
void TCPStreamPusher::StartDataCollectionThreads(Connection& c) {
    // Prepares the connection for a new run. It clears all per-run ACK state and DATA ACK
    // counters, marks the connection active, and launches its WriterThread.
    {
        std::unique_lock ul(c.ack_mutex);
        c.start_ack_received = false;
        c.end_ack_received = false;
        c.cancel_ack_received = false;
        c.start_ack_ok = false;
        c.end_ack_ok = false;
        c.cancel_ack_ok = false;
        c.last_ack_code = TCPAckCode::None;
        c.last_ack_error.clear();
        c.data_ack_error_reported = false;
        c.data_ack_error_text.clear();
    }
    c.data_acked_ok.store(0, std::memory_order_relaxed);
    c.data_acked_bad.store(0, std::memory_order_relaxed);
    c.data_acked_total.store(0, std::memory_order_relaxed);

    c.active = true;
    c.writer_future = std::async(std::launch::async, &TCPStreamPusher::WriterThread, this, &c);
}
void TCPStreamPusher::StopDataCollectionThreads(Connection& c) {
    // Ends this connection's writer thread for the current run and joins it. Does nothing
    // if the connection is not active.
    // Clearing `active` first makes SendImage(ZeroCopyReturnValue&) stop enqueuing. The end
    // sentinel is then queued behind any pending DATA, and WriterThread sends all of that
    // DATA before it dequeues the sentinel and exits.
    if (!c.active)
        return;
    c.active = false;
    // Avoid potential shutdown deadlock if queue is full and writer is stalled.
    // If the sentinel cannot be queued within 200 ms, the writer is stuck (e.g. waiting in
    // SendAll on a peer that does not drain). Give up on the connection. Marking it broken
    // and closing the socket makes the pending SendAll return false. WriterThread then hands
    // any further dequeued slots back to the pool without sending them.
    if (!c.queue.PutTimeout({.end = true}, std::chrono::milliseconds(200))) {
        c.broken = true;
        CloseFd(c.fd);
        // Return queued-but-unsent slots to the image buffer pool here, rather than waiting
        // for WriterThread to dequeue them one by one. Presumably the queue hands each
        // element to exactly one consumer, so each slot is released once. Draining also
        // makes room for the end sentinel below.
        ImagePusherQueueElement e{};
        while (c.queue.Get(e)) {
            if (e.z)
                e.z->release();
        }
        // NOTE(review): assumes Put() succeeds on the drained queue. If a concurrent producer
        // refilled it and Put() could fail, writer_future.get() below would never return.
        (void)c.queue.Put({.end = true});
    }
    // Wake any WaitForAck/WaitForEndAck waiter so it re-evaluates its predicate.
    c.ack_cv.notify_all();
    if (c.writer_future.valid())
        c.writer_future.get();
}
bool TCPStreamPusher::WaitForAck(Connection& c, TCPFrameType ack_for, std::chrono::milliseconds timeout, std::string* error_text) {
    // Blocks until the ACK for `ack_for` (START, END or CANCEL) arrives, the connection
    // breaks, or `timeout` expires. Any other frame type can only time out.
    // Returns the ACK's OK flag. On failure, fills *error_text (if non-null) with
    // "ACK timeout", the writer's error text, or a generic reason.
    auto set_error = [error_text](const std::string& msg) {
        if (error_text)
            *error_text = msg;
    };
    std::unique_lock ul(c.ack_mutex);

    auto settled = [&c, ack_for]() -> bool {
        switch (ack_for) {
            case TCPFrameType::START:
                return c.start_ack_received || c.broken.load();
            case TCPFrameType::END:
                return c.end_ack_received || c.broken.load();
            case TCPFrameType::CANCEL:
                return c.cancel_ack_received || c.broken.load();
            default:
                return false;
        }
    };
    if (!c.ack_cv.wait_for(ul, timeout, settled)) {
        set_error("ACK timeout");
        return false;
    }
    if (c.broken) {
        set_error(c.last_ack_error.empty() ? "Socket broken" : c.last_ack_error);
        return false;
    }

    bool accepted = false;
    switch (ack_for) {
        case TCPFrameType::START:
            accepted = c.start_ack_ok;
            break;
        case TCPFrameType::END:
            accepted = c.end_ack_ok;
            break;
        case TCPFrameType::CANCEL:
            accepted = c.cancel_ack_ok;
            break;
        default:
            break;
    }
    if (!accepted)
        set_error(c.last_ack_error.empty() ? "ACK rejected" : c.last_ack_error);
    return accepted;
}
bool TCPStreamPusher::WaitForEndAck(Connection& c, std::chrono::milliseconds liveness_timeout, std::string* error_text) {
    // Waits for the END ACK with no fixed deadline. After END the writer may still be
    // flushing a backlog to a slow filesystem. Its BUSY heartbeats and DATA ACKs refresh
    // last_peer_activity_ns, and we keep waiting as long as they do. We give up only when
    // the peer stays silent for a full liveness_timeout window.
    // Returns the END ACK's OK flag. On failure, fills *error_text (if non-null).
    const int64_t liveness_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(liveness_timeout).count();
    auto settled = [&c] { return c.end_ack_received || c.broken.load(); };

    std::unique_lock ul(c.ack_mutex);
    while (!settled()) {
        c.ack_cv.wait_for(ul, liveness_timeout, settled);
        if (settled())
            break;
        const int64_t silent_ns = SteadyNowNs() - c.last_peer_activity_ns.load(std::memory_order_relaxed);
        if (silent_ns > liveness_ns) {
            if (error_text)
                *error_text = "END ACK timeout (writer silent)";
            return false;
        }
    }

    if (c.broken) {
        if (error_text)
            *error_text = c.last_ack_error.empty() ? "Socket broken" : c.last_ack_error;
        return false;
    }
    const bool accepted = c.end_ack_ok;
    if (!accepted && error_text)
        *error_text = c.last_ack_error.empty() ? "END ACK rejected" : c.last_ack_error;
    return accepted;
}
void TCPStreamPusher::StartDataCollection(StartMessage& message) {
    // Begins a run:
    //  1. Snapshot the live connections as the session (the first one also becomes the
    //     calibration target and writes the master file).
    //  2. Start a writer thread on every session connection.
    //  3. Send START to each connection in turn and wait up to 5 s for its START ACK.
    // On any failure -- including an exception from serialization or thread start-up --
    // writers that already accepted START get a CANCEL, all writer threads are stopped,
    // the session is cleared, and the exception propagates.
    // Throws JFJochException(InputParameterInvalid) for invalid input, when no writers are
    // connected, or when START/START ACK fails.
    if (message.images_per_file < 1)
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Images per file cannot be zero or negative");
    images_per_file = message.images_per_file;
    run_number = message.run_number;
    run_name = message.run_name;
    transmission_error = false;
    total_data_acked_ok.store(0, std::memory_order_relaxed);
    total_data_acked_bad.store(0, std::memory_order_relaxed);
    total_data_acked_total.store(0, std::memory_order_relaxed);

    std::vector<std::shared_ptr<Connection>> local_connections;
    {
        std::lock_guard lg(connections_mutex);
        // Stop writer threads left over from a previous run before pruning and reuse.
        for (auto& c : connections)
            StopDataCollectionThreads(*c);
        RemoveDeadConnections();
        if (connections.empty())
            throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writers connected to " + endpoint);
        session_connections = connections;
        calibration_connection = session_connections[0];
        local_connections = session_connections;
    }
    logger.Info("Starting data collection with " + std::to_string(local_connections.size()) + " connected writers");
    data_collection_active = true;

    std::vector<bool> started(local_connections.size(), false);
    // Undoes a partially started run. It takes each connection's send_mutex itself, so it
    // must be called with no send_mutex held. It runs exactly once, from the catch below.
    auto rollback_cancel = [&]() {
        for (size_t i = 0; i < local_connections.size(); i++) {
            auto& c = *local_connections[i];
            if (!started[i] || c.broken)
                continue;
            std::unique_lock ul(c.send_mutex);
            (void)SendFrame(c, nullptr, 0, TCPFrameType::CANCEL, -1);
            std::string cancel_ack_err;
            (void)WaitForAck(c, TCPFrameType::CANCEL, std::chrono::milliseconds(500), &cancel_ack_err);
        }
        for (auto& c : local_connections)
            StopDataCollectionThreads(*c);
        {
            std::lock_guard lg(connections_mutex);
            session_connections.clear();
            calibration_connection.reset();
        }
        data_collection_active = false;
    };

    try {
        for (auto& c : local_connections)
            StartDataCollectionThreads(*c);

        for (size_t i = 0; i < local_connections.size(); i++) {
            auto& c = *local_connections[i];
            message.socket_number = static_cast<int64_t>(c.socket_number);
            message.write_master_file = (i == 0);
            serializer.SerializeSequenceStart(message);

            bool sent = false;
            {
                std::unique_lock ul(c.send_mutex);
                sent = SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::START, -1);
            }
            if (!sent) {
                // A partially written START leaves the stream misaligned; never reuse it.
                c.broken = true;
                throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
                                      "Timeout/failure sending START on socket " + std::to_string(c.socket_number));
            }

            std::string ack_err;
            if (!WaitForAck(c, TCPFrameType::START, std::chrono::seconds(5), &ack_err))
                throw JFJochException(JFJochExceptionCategory::InputParameterInvalid,
                                      "START ACK failed on socket " + std::to_string(c.socket_number) + ": " + ack_err);
            started[i] = true;
        }
    } catch (...) {
        rollback_cancel();
        throw;
    }
}
bool TCPStreamPusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) {
    // Sends one image synchronously, bypassing the per-connection queue, to the writer that
    // owns its file. Writer index = (image_number / images_per_file) % connection count.
    // Connections are taken from session_connections when non-empty, otherwise from all
    // connections.
    // Returns false if there is no usable writer or the send fails.

    // A negative image number has no file index. The signed modulo below would yield a
    // negative value, which wraps to a huge size_t and indexes out of bounds.
    if (image_number < 0)
        return false;

    std::shared_ptr<Connection> target;
    {
        std::lock_guard lg(connections_mutex);
        const auto& use = (!session_connections.empty() ? session_connections : connections);
        if (use.empty())
            return false;
        const auto idx = static_cast<size_t>((image_number / images_per_file) % static_cast<int64_t>(use.size()));
        target = use[idx];
    }
    auto& c = *target;
    if (c.broken || !IsConnectionAlive(c))
        return false;

    std::unique_lock ul(c.send_mutex);
    const bool ok = SendFrame(c, image_data, image_size, TCPFrameType::DATA, image_number);
    // Like the queued path (WriterThread), a failed send poisons the connection: part of the
    // frame may already be on the wire.
    if (!ok)
        c.broken = true;
    return ok;
}
bool TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) {
    // Queues a zero-copy image for the writer that owns its file. Writer index =
    // (image_number / images_per_file) % connection count, over session_connections when
    // non-empty, otherwise over all connections.
    // Ownership: `z` is always consumed. Either it is enqueued and the writer thread
    // releases it after sending (returns true), or it is released here (returns false).

    // A negative image number has no file index. The signed modulo below would yield a
    // negative value, which wraps to a huge size_t and indexes out of bounds.
    if (z.GetImageNumber() < 0) {
        z.release();
        return false;
    }

    // Look up the target connection while holding the mutex, but do NOT call
    // PutBlocking while holding it — that can block indefinitely and deadlock
    // against AcceptorThread/KeepaliveThread.
    std::shared_ptr<Connection> target;
    {
        std::lock_guard lg(connections_mutex);
        const auto& use = (!session_connections.empty() ? session_connections : connections);
        if (use.empty()) {
            z.release();
            return false;
        }
        auto idx = static_cast<size_t>((z.GetImageNumber() / images_per_file) % static_cast<int64_t>(use.size()));
        target = use[idx];
    }
    if (!target) {
        z.release();
        return false;
    }
    // A full queue means the writer is applying backpressure (e.g. briefly stalled on a
    // freshly-created file at startup). Keep waiting as long as it proves it is alive —
    // BUSY heartbeats / ACKs refresh last_peer_activity_ns from a thread independent of
    // its stalled write path — so a slow-but-healthy writer is held back, not dropped.
    // Only a peer that has gone completely silent for the liveness window is declared dead;
    // a writer that heartbeats but never drains is caught by SendAll's max_backpressure cap.
    while (!target->broken && target->active) {
        if (target->queue.PutTimeout(ImagePusherQueueElement{
                .z = &z,
                .end = false
            }, std::chrono::milliseconds(50)))
            return true;
        const int64_t silent_ns = SteadyNowNs() - target->last_peer_activity_ns.load(std::memory_order_relaxed);
        if (silent_ns > std::chrono::duration_cast<std::chrono::nanoseconds>(peer_liveness_timeout).count()) {
            logger.Warning("No liveness from writer on socket " + std::to_string(target->socket_number) +
                           " for " + std::to_string(silent_ns / 1000000) + " ms while enqueuing; marking broken");
            target->broken = true;
            break;
        }
    }
    z.release();
    return false;
}
bool TCPStreamPusher::EndDataCollection(const EndMessage& message) {
serializer.SerializeSequenceEnd(message);
bool ret = true;
std::vector<std::shared_ptr<Connection>> local_connections;
{
std::lock_guard lg(connections_mutex);
local_connections = (!session_connections.empty() ? session_connections : connections);
}
for (auto& cptr : local_connections) {
auto& c = *cptr;
// Flush all queued DATA before sending END: stopping the writer thread drains the
// send queue and joins it, so every image is on the wire before END goes out. This
// prevents END from overtaking images still waiting in the queue when the writer
// thread has fallen behind (e.g. on a loaded machine), which would make the remote
// writer finalize early and silently drop the trailing images.
StopDataCollectionThreads(c);
if (c.broken) {
ret = false;
continue;
}
{
std::unique_lock ul(c.send_mutex);
if (!SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::END, -1)) {
ret = false;
continue;
}
}
std::string ack_err;
if (!WaitForEndAck(c, peer_liveness_timeout, &ack_err))
ret = false;
}
{
std::lock_guard lg(connections_mutex);
session_connections.clear();
calibration_connection.reset();
}
data_collection_active = false;
transmission_error = !ret;
return ret;
}
bool TCPStreamPusher::SendCalibration(const CompressedImage& message) {
    // Sends `message` as a CALIBRATION frame directly on the run's calibration connection
    // (calibration_connection, which StartDataCollection sets to the first session
    // connection). Returns true on success. Logs and returns false when there is no active
    // run, no target, or the target is unusable.
    std::shared_ptr<Connection> target;
    {
        // Pick the target under connections_mutex; it may be reset concurrently by
        // Start/EndDataCollection.
        std::lock_guard lg(connections_mutex);
        if (!data_collection_active) {
            logger.Error("Refusing to send TCP calibration: no active run");
            return false;
        }
        if (!calibration_connection) {
            logger.Error("Refusing to send TCP calibration: calibration target is not set");
            return false;
        }
        target = calibration_connection;
    }
    if (!target) {
        logger.Error("Refusing to send TCP calibration: calibration target is null");
        return false;
    }
    // First (unlocked) state check, before spending time on serialization.
    if (target->broken) {
        logger.Error("Refusing to send TCP calibration: target socket {} is marked broken", target->socket_number);
        return false;
    }
    if (!target->connected) {
        logger.Error("Refusing to send TCP calibration: target socket {} is not connected", target->socket_number);
        return false;
    }
    if (!target->active) {
        logger.Error("Refusing to send TCP calibration: target socket {} is not active in current session", target->socket_number);
        return false;
    }
    // Serializes into the shared serialization_buffer that Start/EndDataCollection also use.
    // NOTE(review): assumes calibration is never sent concurrently with those calls.
    serializer.SerializeCalibration(message);
    std::unique_lock ul(target->send_mutex);
    // Re-check under send_mutex: other threads (writer, ACK reader, teardown) may have
    // changed the connection state since the first check.
    if (target->broken) {
        logger.Error("Refusing to send TCP calibration: target socket {} became broken before send", target->socket_number);
        return false;
    }
    if (!target->connected) {
        logger.Error("Refusing to send TCP calibration: target socket {} disconnected before send", target->socket_number);
        return false;
    }
    if (!target->active) {
        logger.Error("Refusing to send TCP calibration: target socket {} became inactive before send", target->socket_number);
        return false;
    }
    if (!IsConnectionAlive(*target)) {
        logger.Error("Refusing to send TCP calibration: target socket {} is not alive", target->socket_number);
        target->broken = true;
        return false;
    }
    const bool ok = SendFrame(*target,
                              serialization_buffer.data(),
                              serializer.GetBufferSize(),
                              TCPFrameType::CALIBRATION,
                              -1);
    if (!ok) {
        // A failed send may leave a partial frame on the wire, so the connection is poisoned.
        logger.Error("Failed to send TCP calibration on socket {}", target->socket_number);
        target->broken = true;
        return false;
    }
    return true;
}
std::string TCPStreamPusher::Finalize() {
    // Builds a ';'-terminated summary of problems from the last run. It starts with a
    // transmission-error note if one was recorded. Then, for each connection, it adds the
    // DATA ACK error if one was reported, otherwise the last ACK error, if any.
    std::string summary = transmission_error
        ? "Timeout sending images (e.g., writer disabled during data collection);"
        : "";
    std::lock_guard lg(connections_mutex);
    for (const auto& conn : connections) {
        std::unique_lock ul(conn->ack_mutex);
        const std::string* detail = nullptr;
        if (conn->data_ack_error_reported && !conn->data_ack_error_text.empty())
            detail = &conn->data_ack_error_text;
        else if (!conn->last_ack_error.empty())
            detail = &conn->last_ack_error;
        if (detail != nullptr)
            summary += "Writer " + std::to_string(conn->socket_number) + ": " + *detail + ";";
    }
    return summary;
}
std::string TCPStreamPusher::PrintSetup() const {
    // One-line description: bound endpoint, connection limit, and live writer count.
    std::string text = "TCPStreamPusher: endpoint=" + endpoint;
    text += " max_connections=" + std::to_string(max_connections);
    text += " connected=" + std::to_string(GetConnectedWriters());
    return text;
}
std::optional<uint64_t> TCPStreamPusher::GetImagesWritten() const {
    // Count of DATA ACKs reported OK (and not fatal) since the run started;
    // StartDataCollection resets it.
    const uint64_t acked_ok = total_data_acked_ok.load(std::memory_order_relaxed);
    return acked_ok;
}
std::optional<uint64_t> TCPStreamPusher::GetImagesWriteError() const {
    // Count of DATA ACKs reported failed (not OK, or fatal) since the run started;
    // StartDataCollection resets it.
    const uint64_t acked_bad = total_data_acked_bad.load(std::memory_order_relaxed);
    return acked_bad;
}
std::vector<int64_t> TCPStreamPusher::GetWriterFifoUtilization() const {
    // Last FIFO occupancy each writer reported in an ACK or BUSY frame, listed in
    // connection order.
    std::lock_guard lg(connections_mutex);
    std::vector<int64_t> occupancy;
    occupancy.reserve(connections.size());
    for (const auto& conn : connections)
        occupancy.push_back(conn->last_ack_fifo_occupancy.load(std::memory_order_relaxed));
    return occupancy;
}