Files
Jungfraujoch/image_pusher/TCPStreamPusher.h

190 lines
7.6 KiB
C++

// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <future>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "ImagePusher.h"
#include "ZMQWriterNotificationPuller.h"
#include "../common/ThreadSafeFIFO.h"
#include "../common/Logger.h"
#include "../common/JfjochTCP.h"
#include "../frame_serialize/CBORStream2Serializer.h"
/// TCP-based image stream pusher with persistent connection pool.
///
/// Threading model:
/// - AcceptorThread: accepts new TCP connections, holds connections_mutex briefly
/// - KeepaliveThread: sends periodic keepalive frames when idle (skipped during data collection)
/// - Per-connection WriterThread: drains the connection's queue, sends DATA frames
/// - Per-connection PersistentAckThread: reads ACKs and keepalive pongs from the peer
/// - Per-connection ZeroCopyCompletionThread: processes kernel MSG_ZEROCOPY completions
///
/// Lock ordering: connections_mutex → send_mutex → ack_mutex → zc_mutex
/// IMPORTANT: Never call blocking queue operations while holding connections_mutex.
///
/// Concurrency contract:
/// - StartDataCollection, EndDataCollection, SendCalibration, and Finalize
///   are called from a single control thread in a serialized manner.
/// - SendImage may be called concurrently from multiple threads between
///   StartDataCollection and EndDataCollection.
/// - SendCalibration is called after StartDataCollection, before SendImage calls begin.
/// - GetConnectedWriters, GetImagesWritten, and PrintSetup are safe to call at any time.
class TCPStreamPusher : public ImagePusher {
    /// State of one persistent TCP connection to a writer.
    ///
    /// Owned through std::shared_ptr by `connections` / `session_connections`;
    /// the per-connection thread functions receive a raw `Connection*`.
    /// Implicitly non-copyable and non-movable (holds mutexes and atomics).
    struct Connection {
        /// @param queue_size size argument forwarded to the ThreadSafeFIFO
        ///        send queue (presumably its capacity).
        explicit Connection(size_t queue_size) : queue(queue_size) {}

        // Socket descriptor; -1 until a socket is assigned. CloseFd() operates on it.
        std::atomic<int> fd{-1};
        // Presumably the socket_number passed to SetupNewConnection().
        uint32_t socket_number = 0;
        std::atomic<bool> active{false};    // data-collection threads running
        // NOTE(review): presumably set after an unrecoverable I/O error; confirm in .cpp.
        std::atomic<bool> broken{false};
        std::atomic<bool> connected{false}; // persistent connection is alive
        // Presumably true when MSG_ZEROCOPY could be enabled on this socket.
        std::atomic<bool> zerocopy_enabled{false};

        // Outgoing work for this connection, drained by WriterThread.
        ThreadSafeFIFO<ImagePusherQueueElement> queue;
        // Futures of the data-collection threads (presumably WriterThread and
        // ZeroCopyCompletionThread, managed by Start/StopDataCollectionThreads).
        std::future<void> writer_future;
        std::future<void> zc_future;
        // Persistent ack/keepalive reader (runs as long as the connection is alive)
        std::future<void> persistent_ack_future;

        // Second in the lock order; presumably serializes writes to `fd`.
        std::mutex send_mutex;
        // Third in the lock order; presumably guards the ACK state below,
        // with ack_cv notified to wake WaitForAck().
        std::mutex ack_mutex;
        std::condition_variable ack_cv;

        // Zero-copy send awaiting kernel completion. [first_id, last_id] are the
        // ids reported by SendAll() (zc_first / zc_last); `z` is presumably
        // released once those ids complete (see ReleaseCompletedZeroCopy).
        struct PendingZC {
            uint32_t first_id = 0;
            uint32_t last_id = 0;
            ZeroCopyReturnValue* z = nullptr;
        };
        // Last in the lock order; presumably guards zc_pending and the zc ids.
        std::mutex zc_mutex;
        std::condition_variable zc_cv;
        std::deque<PendingZC> zc_pending;
        uint32_t zc_next_id = 0;
        // Starts at UINT32_MAX, presumably meaning "nothing completed yet"
        // (the value wraps to 0 on increment).
        uint32_t zc_completed_id = std::numeric_limits<uint32_t>::max();

        // ACK outcome per control phase (start / end / cancel).
        // NOTE(review): plain fields; presumably accessed under ack_mutex.
        bool start_ack_received = false;
        bool start_ack_ok = false;
        bool end_ack_received = false;
        bool end_ack_ok = false;
        bool cancel_ack_received = false;
        bool cancel_ack_ok = false;
        std::string last_ack_error;
        std::atomic<TCPAckCode> last_ack_code{TCPAckCode::None};

        // Soft writer failure reported via DATA ACK (do not break stream on this alone)
        std::atomic<bool> data_ack_error_reported{false};
        // NOTE(review): non-atomic companion of data_ack_error_reported;
        // confirm every access is made under ack_mutex.
        std::string data_ack_error_text;

        // Per-connection DATA frame counters.
        std::atomic<uint64_t> data_sent{0};
        std::atomic<uint64_t> data_acked_ok{0};
        std::atomic<uint64_t> data_acked_bad{0};
        std::atomic<uint64_t> data_acked_total{0};

        // NOTE(review): non-atomic time points; if KeepaliveThread and
        // PersistentAckThread both touch them, confirm a mutex guards access.
        std::chrono::steady_clock::time_point last_keepalive_sent{};
        std::chrono::steady_clock::time_point last_keepalive_recv{};
    };

    // Keep serialization_buffer declared before serializer: members are
    // initialized in declaration order, and the serializer may be constructed
    // over this buffer (confirm in the constructor).
    std::vector<uint8_t> serialization_buffer;
    CBORStream2Serializer serializer;

    // Listen address; returned by GetAddress().
    std::string endpoint;
    // Presumably the in_max_connections constructor argument.
    size_t max_connections;
    // Optional socket send-buffer size; presumably left at the OS default when empty.
    std::optional<int32_t> send_buffer_size;
    // Presumably the queue size given to each new Connection.
    size_t send_queue_size = 128;

    // Persistent connection pool, guarded by connections_mutex.
    // IMPORTANT: never call PutBlocking/GetBlocking on a queue while holding this mutex.
    mutable std::mutex connections_mutex;
    std::vector<std::shared_ptr<Connection>> connections;
    // Presumably the subset of connections taking part in the current data collection.
    std::vector<std::shared_ptr<Connection>> session_connections;

    // Acceptor thread state
    std::atomic<int> listen_fd{-1};
    std::atomic<bool> acceptor_running{false};
    std::future<void> acceptor_future;
    std::future<void> keepalive_future;

    // Send-path timeouts (presumably the per-poll wait and the overall deadline).
    std::chrono::milliseconds send_poll_timeout{250};
    std::chrono::milliseconds send_total_timeout{3000};

    // Per-run metadata (presumably captured in StartDataCollection).
    int64_t images_per_file = 1;
    uint64_t run_number = 0;
    std::string run_name;

    // Brace-initialized like every other atomic member; the `= false`
    // copy-initialization form is ill-formed before C++17 (deleted copy ctor).
    std::atomic<bool> transmission_error{false};
    std::atomic<bool> data_collection_active{false};

    // Aggregate DATA-ACK counters across connections.
    std::atomic<uint64_t> total_data_acked_ok{0};
    std::atomic<uint64_t> total_data_acked_bad{0};
    std::atomic<uint64_t> total_data_acked_total{0};

    Logger logger{"TCPStreamPusher"};

    // --- Socket helpers ---
    static std::pair<std::string, std::optional<uint16_t>> ParseTcpAddress(const std::string& addr);
    static std::pair<int, std::string> OpenListenSocket(const std::string& addr);
    static int AcceptOne(int listen_fd, std::chrono::milliseconds timeout);
    static void CloseFd(std::atomic<int>& fd);
    bool IsConnectionAlive(const Connection& c) const;

    // --- Framed I/O ---
    // When zero-copy is used, SendAll reports it via *zc_used and the
    // completion-id range via *zc_first / *zc_last (all optional outputs).
    bool SendAll(Connection& c, const void* buf, size_t len, bool allow_zerocopy,
                 bool* zc_used = nullptr, uint32_t* zc_first = nullptr, uint32_t* zc_last = nullptr);
    bool ReadExact(Connection& c, void* buf, size_t len);
    bool ReadExactPersistent(Connection& c, void* buf, size_t len);
    bool SendFrame(Connection& c, const uint8_t* data, size_t size, TCPFrameType type, int64_t image_number, ZeroCopyReturnValue* z);

    // --- Thread bodies (see class comment for the threading model) ---
    void WriterThread(Connection* c);
    void PersistentAckThread(Connection* c);
    void ZeroCopyCompletionThread(Connection* c);
    void AcceptorThread();
    void KeepaliveThread();

    // --- Connection lifecycle ---
    void SetupNewConnection(int new_fd, uint32_t socket_number);
    void RemoveDeadConnections();
    void TearDownConnection(Connection& c);
    void StartDataCollectionThreads(Connection& c);
    void StopDataCollectionThreads(Connection& c);

    // --- Zero-copy bookkeeping ---
    void EnqueueZeroCopyPending(Connection& c, ZeroCopyReturnValue* z, uint32_t first_id, uint32_t last_id);
    void ReleaseCompletedZeroCopy(Connection& c);
    void ForceReleasePendingZeroCopy(Connection& c);
    bool WaitForZeroCopyDrain(Connection& c, std::chrono::milliseconds timeout);

    // Waits up to `timeout` for the ACK answering `ack_for`; on failure the
    // reason is presumably written to *error_text when non-null.
    bool WaitForAck(Connection& c, TCPFrameType ack_for, std::chrono::milliseconds timeout, std::string* error_text);

public:
    /// @param addr                TCP address to listen on (parsed by ParseTcpAddress).
    /// @param in_max_connections  maximum number of writer connections in the pool.
    /// @param in_send_buffer_size optional socket send-buffer size.
    explicit TCPStreamPusher(const std::string& addr,
                             size_t in_max_connections,
                             std::optional<int32_t> in_send_buffer_size = {});
    ~TCPStreamPusher() override;

    /// Returns the single listen endpoint of this pusher.
    std::vector<std::string> GetAddress() const override { return {endpoint}; }
    /// Returns the number of currently connected writers (can be called at any time)
    size_t GetConnectedWriters() const override;

    void StartDataCollection(StartMessage& message) override;
    bool EndDataCollection(const EndMessage& message) override;
    bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override;
    void SendImage(ZeroCopyReturnValue &z) override;
    bool SendCalibration(const CompressedImage& message) override;
    std::string Finalize() override;
    std::string PrintSetup() const override;
    std::optional<uint64_t> GetImagesWritten() const override;
    std::optional<uint64_t> GetImagesWriteError() const override;
    ImagePusherType GetType() const override { return ImagePusherType::TCP; }
};