Jungfraujoch/image_puller/TCPImagePuller.h
leonarski_f 97c3008c39
TCP stream: tolerate writer backpressure via BUSY heartbeats
A slow filesystem could stall the writer's consume pipeline, propagating
TCP backpressure to the pusher. The pusher then treated that backpressure
as a dead peer and force-closed the connection mid-run; the writer
reconnected as a brand-new connection outside the active session, so the
rest of the run was silently dropped and the half-written HDF5 file was
later finalized with holes.

Replace the throughput/progress-based send timeout with a peer-liveness
timeout:

- Add TCPFrameType::BUSY (wire version 2 -> 3).
- TCPImagePuller runs a dedicated heartbeat thread that sends BUSY every
  250 ms, independent of the (possibly stalled) write path, so liveness
  keeps flowing during deep stalls. A send_mutex serializes
  ACK/pong/heartbeat writes.
- TCPStreamPusher refreshes last_peer_activity_ns on every inbound frame
  and only declares a connection dead after peer_liveness_timeout (15s) of
  complete silence, tolerating arbitrarily long backpressure while still
  catching a genuinely dead peer (and immediate EPIPE/ECONNRESET).
- Re-key both backpressure waits (the SendAll data path and the post-END
  WaitForEndAck) onto this liveness signal instead of byte-progress /
  DATA-ACK-progress, so a slow final flush at END is tolerated too.
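
As an illustration of the liveness rule described above, here is a minimal sketch, not the actual TCPStreamPusher code. The class name PeerLiveness and its methods Refresh and IsDead are hypothetical, and timestamps are passed in explicitly (an assumption made here so the logic can be checked without a real clock). Only the 15 s timeout and the idea of refreshing last_peer_activity_ns on every inbound frame come from the description above.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>

// Hypothetical helper illustrating a peer-liveness timeout.
// Not part of the Jungfraujoch sources.
class PeerLiveness {
    std::atomic<int64_t> last_peer_activity_ns;
    const int64_t timeout_ns;
public:
    PeerLiveness(int64_t now_ns, std::chrono::nanoseconds timeout)
        : last_peer_activity_ns(now_ns), timeout_ns(timeout.count()) {}

    // Call on every inbound frame (ACK, pong, BUSY, ...).
    void Refresh(int64_t now_ns) {
        last_peer_activity_ns.store(now_ns, std::memory_order_relaxed);
    }

    // True only after more than `timeout` of complete silence from the peer.
    // Send progress is deliberately not consulted, so backpressure alone
    // never marks the peer as dead.
    bool IsDead(int64_t now_ns) const {
        return now_ns - last_peer_activity_ns.load(std::memory_order_relaxed)
               > timeout_ns;
    }
};
```

With this shape, a blocked send loop would poll IsDead() instead of measuring byte progress, and a BUSY frame arriving every 250 ms keeps pushing the deadline forward.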

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 16:55:59 +02:00


// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#pragma once
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include "ImagePuller.h"
#include "../common/Logger.h"
#include "../common/ThreadSafeFIFO.h"
class TCPImagePuller : public ImagePuller {
    int fd = -1;
    std::mutex fd_mutex;
    std::string addr;
    std::string host;
    uint16_t port = 0;
    std::optional<int32_t> receive_buffer_size;
    std::atomic<bool> disconnect{false};
    ThreadSafeFIFO<ImagePullerOutput> cbor_fifo{200};
    ThreadSafeFIFO<ImagePullerOutput> repub_fifo{200};
    std::unique_ptr<ZMQSocket> repub_socket;
    std::thread receiver_thread;
    std::thread cbor_thread;
    std::thread repub_thread;
    std::thread heartbeat_thread;
    // Serializes all writer->pusher sends (ACKs, keepalive pongs, busy heartbeats)
    // so that concurrently-sent frames cannot interleave on the socket.
    std::mutex send_mutex;
    Logger logger{"TCPImagePuller"};
    static constexpr uint32_t default_repub_watermark = 220;
    static constexpr auto RepubTimeout = std::chrono::milliseconds(100);
    static constexpr auto HeartbeatInterval = std::chrono::milliseconds(250);
    bool ReadExact(void *buf, size_t size);
    bool SendAll(const void *buf, size_t len);
    bool EnsureConnected();
    void CloseSocket();
    void ReceiverThread();
    void CBORThread();
    void RepubThread();
    void HeartbeatThread();
public:
    explicit TCPImagePuller(const std::string &tcp_addr,
                            std::optional<int32_t> rcv_buffer_size = {},
                            const std::string &repub_address = "",
                            const std::optional<int32_t> &repub_watermark = {});
    ~TCPImagePuller() override;
    bool SupportsAck() const override { return true; }
    bool SendAck(const PullerAckMessage &ack) override;
    void Disconnect() override;
};