All checks were successful
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 10m51s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 8m0s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 9m6s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 10m7s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 9m47s
Build Packages / Generate python client (push) Successful in 29s
Build Packages / Build documentation (push) Successful in 43s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 10m46s
Build Packages / build:rpm (rocky8) (push) Successful in 9m33s
Build Packages / Unit tests (push) Has been skipped
Build Packages / build:rpm (ubuntu2204) (push) Successful in 8m47s
Build Packages / build:rpm (rocky9) (push) Successful in 9m55s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 9m4s
This is an UNSTABLE release. It contains significant modifications and bug fixes; if things go wrong, it is better to revert to 1.0.0-rc.124.

* jfjoch_broker: Default EIGER readout time is 20 microseconds
* jfjoch_broker: Multiple improvements regarding performance
* jfjoch_broker: Image buffer allows tracking of frames in preparation and sending
* jfjoch_broker: Dedicated thread for ZeroMQ transmission to better utilize the image buffer
* jfjoch_broker: Experimental implementation of transmission with raw TCP/IP sockets
* jfjoch_writer: Fixes for properly closing files in long data collections
* jfjoch_process: Scale & merge has been significantly improved, but it is not yet integrated into mainstream code

Reviewed-on: #34
83 lines
2.4 KiB
C++
83 lines
2.4 KiB
C++
// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
|
|
// SPDX-License-Identifier: GPL-3.0-only
|
|
|
|
#pragma once
|
|
|
|
#include <atomic>
|
|
#include <future>
|
|
#include <mutex>
|
|
#include <deque>
|
|
#include <unordered_map>
|
|
#include <optional>
|
|
#include <string>
|
|
|
|
#include "ImagePusher.h"
|
|
#include "../common/ThreadSafeFIFO.h"
|
|
#include "../common/Logger.h"
|
|
#include "../common/JfjochTCP.h"
|
|
|
|
class TCPStreamPusherSocket {
|
|
struct InflightZC {
|
|
ZeroCopyReturnValue *z = nullptr;
|
|
uint64_t tx_id = 0;
|
|
};
|
|
|
|
std::mutex send_mutex;
|
|
std::atomic<bool> active = false;
|
|
std::future<void> send_future;
|
|
std::future<void> completion_future;
|
|
|
|
ThreadSafeFIFO<ImagePusherQueueElement> queue;
|
|
|
|
std::atomic<int> fd{-1};
|
|
int listen_fd = -1;
|
|
std::string endpoint;
|
|
uint32_t socket_number = 0;
|
|
uint64_t run_number = 0;
|
|
std::optional<size_t> zerocopy_threshold;
|
|
std::optional<int32_t> send_buffer_size;
|
|
|
|
constexpr static auto AcceptTimeout = std::chrono::seconds(5);
|
|
|
|
std::atomic<bool> broken{false};
|
|
|
|
std::atomic<uint64_t> next_tx_id{1};
|
|
std::mutex inflight_mutex;
|
|
std::deque<InflightZC> inflight;
|
|
|
|
Logger logger{"TCPStream2PusherSocket"};
|
|
|
|
void WriterThread();
|
|
void CompletionThread();
|
|
|
|
void CloseDataSocket();
|
|
|
|
bool SendAll(const void *buf, size_t len);
|
|
bool SendFrame(const uint8_t *data, size_t size, TCPFrameType type, int64_t image_number, ZeroCopyReturnValue *z);
|
|
bool SendPayloadZC(const uint8_t *data, size_t size, ZeroCopyReturnValue *z);
|
|
public:
|
|
explicit TCPStreamPusherSocket(const std::string& addr,
|
|
uint32_t in_socket_number,
|
|
std::optional<int32_t> send_buffer_size,
|
|
std::optional<size_t> in_zerocopy_threshold,
|
|
size_t send_queue_size = 4096);
|
|
|
|
~TCPStreamPusherSocket();
|
|
|
|
std::string GetEndpointName() const;
|
|
|
|
bool Send(const uint8_t *data, size_t size, TCPFrameType type, int64_t image_number = -1);
|
|
|
|
bool AcceptConnection(std::chrono::milliseconds timeout = std::chrono::duration_cast<std::chrono::milliseconds>(AcceptTimeout));
|
|
bool IsConnectionAlive() const;
|
|
|
|
void StartWriterThread();
|
|
void StopWriterThread();
|
|
|
|
void SetRunNumber(uint64_t in_run_number);
|
|
|
|
void SendImage(ZeroCopyReturnValue &z);
|
|
|
|
bool IsBroken() const;
|
|
};
|