Files
Jungfraujoch/image_pusher/TCPStreamPusherSocket.h
Filip Leonarski fbdc0423da
Some checks failed
Build Packages / build:rpm (rocky8_nocuda) (push) Has been cancelled
Build Packages / build:rpm (rocky9_nocuda) (push) Has been cancelled
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Has been cancelled
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Has been cancelled
Build Packages / build:rpm (rocky8_sls9) (push) Has been cancelled
Build Packages / build:rpm (rocky9_sls9) (push) Has been cancelled
Build Packages / build:rpm (rocky8) (push) Has been cancelled
Build Packages / build:rpm (rocky9) (push) Has been cancelled
Build Packages / build:rpm (ubuntu2204) (push) Has been cancelled
Build Packages / build:rpm (ubuntu2404) (push) Has been cancelled
Build Packages / Generate python client (push) Has been cancelled
Build Packages / Build documentation (push) Has been cancelled
Build Packages / Unit tests (push) Has been cancelled
Build Packages / Create release (push) Has been cancelled
Implement raw TCP/IP in jfjoch_broker and jfjoch_writer
2026-03-01 19:45:16 +01:00

85 lines
2.5 KiB
C++

// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#pragma once
#include <atomic>
#include <future>
#include <mutex>
#include <deque>
#include <unordered_map>
#include <optional>
#include <string>
#include "ImagePusher.h"
#include "../common/ThreadSafeFIFO.h"
#include "../common/Logger.h"
#include "../common/JfjochTCP.h"
/// One socket endpoint of the TCP image pusher.
///
/// The object holds two socket descriptors (`listen_fd` and the atomic `fd`),
/// a FIFO of ImagePusherQueueElement items, two background tasks
/// (`send_future`, `completion_future`) and a list of in-flight zero-copy
/// transmissions.
///
/// Only declarations are visible in this header. Notes that cannot be read
/// directly off the declarations are marked as assumptions to be confirmed
/// against the implementation file.
///
/// Instances are neither copyable nor movable.
class TCPStreamPusherSocket {
    /// Entry of the `inflight` deque: a ZeroCopyReturnValue pointer paired
    /// with a transmission id.
    struct InflightZC {
        ZeroCopyReturnValue *z = nullptr; // NOTE(review): looks non-owning from this header — confirm
        uint64_t tx_id = 0;
    };

    std::mutex send_mutex;
    std::atomic<bool> active{false};

    // Handles of the two background tasks; presumably they run WriterThread()
    // and CompletionThread() respectively — confirm in the .cpp.
    std::future<void> send_future;
    std::future<void> completion_future;

    ThreadSafeFIFO<ImagePusherQueueElement> queue;

    // Descriptors use -1 as the "no socket" value. Only `fd` is atomic.
    std::atomic<int> fd{-1};
    int listen_fd = -1;

    std::string endpoint;
    uint32_t socket_number = 0;
    uint64_t run_number = 0;

    // Optional tuning values; both are empty unless supplied.
    std::optional<size_t> zerocopy_threshold;
    std::optional<int32_t> send_buffer_size;

    /// Default timeout used by AcceptConnection().
    static constexpr auto AcceptTimeout = std::chrono::seconds(5);

    std::atomic<bool> ever_connected{false};
    std::atomic<bool> broken{false};

    // Transmission id counter, starting at 1 (InflightZC::tx_id defaults to 0).
    std::atomic<uint64_t> next_tx_id{1};

    // NOTE(review): inflight_mutex presumably guards `inflight` — confirm.
    std::mutex inflight_mutex;
    std::deque<InflightZC> inflight;

    Logger logger{"TCPStream2PusherSocket"};

    void WriterThread();
    void CompletionThread();
    bool EnsureAccepted();
    void CloseDataSocket();
    bool SendAll(const void *buf, size_t len);
    bool SendFrame(const uint8_t *data, size_t size, TCPFrameType type, int64_t image_number, ZeroCopyReturnValue *z);
    bool SendPayloadZC(const uint8_t *data, size_t size, ZeroCopyReturnValue *z);
public:
    /// @param addr                   Address string for this socket.
    /// @param in_socket_number       Number identifying this socket.
    /// @param send_buffer_size       Optional send buffer size; empty means none was requested.
    /// @param in_zerocopy_threshold  Optional zero-copy threshold; empty means none was requested.
    /// @param send_queue_size        Size used for the send queue (default 4096).
    explicit TCPStreamPusherSocket(const std::string& addr,
                                   uint32_t in_socket_number,
                                   std::optional<int32_t> send_buffer_size,
                                   std::optional<size_t> in_zerocopy_threshold,
                                   size_t send_queue_size = 4096);
    ~TCPStreamPusherSocket();

    // The class declares a destructor and holds raw socket descriptors, so
    // its copy/move semantics are stated explicitly. The std::mutex and
    // std::atomic members already made copying impossible; deleting the
    // operations here keeps that guarantee independent of the member list.
    TCPStreamPusherSocket(const TCPStreamPusherSocket &) = delete;
    TCPStreamPusherSocket &operator=(const TCPStreamPusherSocket &) = delete;
    TCPStreamPusherSocket(TCPStreamPusherSocket &&) = delete;
    TCPStreamPusherSocket &operator=(TCPStreamPusherSocket &&) = delete;

    /// @return Endpoint name as a string, returned by value.
    std::string GetEndpointName() const;

    /// Send `size` bytes starting at `data` as a frame of the given type.
    /// @param image_number  Image number for the frame; defaults to -1.
    /// @return Success flag — exact failure conditions are defined in the .cpp.
    bool Send(const uint8_t *data, size_t size, TCPFrameType type, int64_t image_number = -1);

    /// @param timeout  Timeout for the accept; defaults to AcceptTimeout (5 s).
    /// @return Success flag — presumably true once a peer is connected; confirm.
    bool AcceptConnection(std::chrono::milliseconds timeout = std::chrono::duration_cast<std::chrono::milliseconds>(AcceptTimeout));

    bool IsConnectionAlive() const;

    void StartWriterThread();
    void StopWriterThread();

    void SetRunNumber(uint64_t in_run_number);

    /// Takes the image by reference.
    /// NOTE(review): whether `z` must outlive the call is not visible here — confirm.
    void SendImage(ZeroCopyReturnValue &z);

    bool IsBroken() const;
};