Files
Jungfraujoch/image_pusher/TCPStreamPusherSocket.h
leonarski_f f3e0a15d26
All checks were successful
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 10m51s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 8m0s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 9m6s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 10m7s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 9m47s
Build Packages / Generate python client (push) Successful in 29s
Build Packages / Build documentation (push) Successful in 43s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 10m46s
Build Packages / build:rpm (rocky8) (push) Successful in 9m33s
Build Packages / Unit tests (push) Has been skipped
Build Packages / build:rpm (ubuntu2204) (push) Successful in 8m47s
Build Packages / build:rpm (rocky9) (push) Successful in 9m55s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 9m4s
v1.0.0-rc.127 (#34)
This is an UNSTABLE release. The release has significant modifications and bug fixes, if things go wrong, it is better to revert to 1.0.0-rc.124.

* jfjoch_broker: Default EIGER readout time is 20 microseconds
* jfjoch_broker: Multiple improvements regarding performance
* jfjoch_broker: Image buffer allows to track frames in preparation and sending
* jfjoch_broker: Dedicated thread for ZeroMQ transmission to better utilize the image buffer
* jfjoch_broker: Experimental implementation of transmission with raw TCP/IP sockets
* jfjoch_writer: Fixes regarding properly closing files in long data collections
* jfjoch_process: Scale & merge has been significantly improved, but it is not yet integrated into mainstream code

Reviewed-on: #34
2026-03-02 15:57:12 +01:00

83 lines
2.4 KiB
C++

// SPDX-FileCopyrightText: 2025 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
// SPDX-License-Identifier: GPL-3.0-only
#pragma once
#include <atomic>
#include <future>
#include <mutex>
#include <deque>
#include <unordered_map>
#include <optional>
#include <string>
#include "ImagePusher.h"
#include "../common/ThreadSafeFIFO.h"
#include "../common/Logger.h"
#include "../common/JfjochTCP.h"
class TCPStreamPusherSocket {
struct InflightZC {
ZeroCopyReturnValue *z = nullptr;
uint64_t tx_id = 0;
};
std::mutex send_mutex;
std::atomic<bool> active = false;
std::future<void> send_future;
std::future<void> completion_future;
ThreadSafeFIFO<ImagePusherQueueElement> queue;
std::atomic<int> fd{-1};
int listen_fd = -1;
std::string endpoint;
uint32_t socket_number = 0;
uint64_t run_number = 0;
std::optional<size_t> zerocopy_threshold;
std::optional<int32_t> send_buffer_size;
constexpr static auto AcceptTimeout = std::chrono::seconds(5);
std::atomic<bool> broken{false};
std::atomic<uint64_t> next_tx_id{1};
std::mutex inflight_mutex;
std::deque<InflightZC> inflight;
Logger logger{"TCPStream2PusherSocket"};
void WriterThread();
void CompletionThread();
void CloseDataSocket();
bool SendAll(const void *buf, size_t len);
bool SendFrame(const uint8_t *data, size_t size, TCPFrameType type, int64_t image_number, ZeroCopyReturnValue *z);
bool SendPayloadZC(const uint8_t *data, size_t size, ZeroCopyReturnValue *z);
public:
explicit TCPStreamPusherSocket(const std::string& addr,
uint32_t in_socket_number,
std::optional<int32_t> send_buffer_size,
std::optional<size_t> in_zerocopy_threshold,
size_t send_queue_size = 4096);
~TCPStreamPusherSocket();
std::string GetEndpointName() const;
bool Send(const uint8_t *data, size_t size, TCPFrameType type, int64_t image_number = -1);
bool AcceptConnection(std::chrono::milliseconds timeout = std::chrono::duration_cast<std::chrono::milliseconds>(AcceptTimeout));
bool IsConnectionAlive() const;
void StartWriterThread();
void StopWriterThread();
void SetRunNumber(uint64_t in_run_number);
void SendImage(ZeroCopyReturnValue &z);
bool IsBroken() const;
};