diff --git a/common/ThreadSafeFIFO.h b/common/ThreadSafeFIFO.h index 580a8de3..89ca3ea2 100644 --- a/common/ThreadSafeFIFO.h +++ b/common/ThreadSafeFIFO.h @@ -49,6 +49,18 @@ public: c_empty.notify_one(); }; + bool PutTimeout(T val, std::chrono::milliseconds timeout) { + std::unique_lock ul(m); + if (!c_full.wait_for(ul, timeout, [&]{ return queue.size() < max_size; })) + return false; + queue.push(val); + utilization++; + if (utilization > max_utilization) + max_utilization = utilization; + c_empty.notify_one(); + return true; + } + int Get(T &val) { std::unique_lock ul(m); diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index bf3ca388..18ae815a 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -10,11 +10,12 @@ #include #include #include +#include #if defined(MSG_ZEROCOPY) #include #endif -std::pair TCPStreamPusher::ParseTcpAddress(const std::string& addr) { +std::pair> TCPStreamPusher::ParseTcpAddress(const std::string& addr) { const std::string prefix = "tcp://"; if (addr.rfind(prefix, 0) != 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Invalid TCP address: " + addr); @@ -27,6 +28,9 @@ std::pair TCPStreamPusher::ParseTcpAddress(const std::str const auto host = hp.substr(0, p); const auto port_str = hp.substr(p + 1); + if (port_str == "*") + return {host, std::nullopt}; + int port_i = 0; try { size_t parsed = 0; @@ -43,8 +47,8 @@ std::pair TCPStreamPusher::ParseTcpAddress(const std::str return {host, static_cast(port_i)}; } -int TCPStreamPusher::OpenListenSocket(const std::string& addr) { - auto [host, port] = ParseTcpAddress(addr); +std::pair TCPStreamPusher::OpenListenSocket(const std::string& addr) { + auto [host, port_opt] = ParseTcpAddress(addr); int listen_fd = ::socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) @@ -55,7 +59,7 @@ int TCPStreamPusher::OpenListenSocket(const std::string& addr) { sockaddr_in sin{}; sin.sin_family = AF_INET; - sin.sin_port = htons(port); + sin.sin_port = htons(port_opt.has_value() ? port_opt.value() : 0); if (host == "*" || host == "0.0.0.0") sin.sin_addr.s_addr = htonl(INADDR_ANY); else if (inet_pton(AF_INET, host.c_str(), &sin.sin_addr) != 1) @@ -67,7 +71,16 @@ int TCPStreamPusher::OpenListenSocket(const std::string& addr) { if (listen(listen_fd, 64) != 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "listen() failed on " + addr); - return listen_fd; + sockaddr_in actual{}; + socklen_t actual_len = sizeof(actual); + if (getsockname(listen_fd, reinterpret_cast(&actual), &actual_len) != 0) + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "getsockname() failed on " + addr); + + const uint16_t bound_port = ntohs(actual.sin_port); + const std::string normalized_host = (host == "*") ? "0.0.0.0" : host; + const std::string bound_endpoint = "tcp://" + normalized_host + ":" + std::to_string(bound_port); + + return {listen_fd, bound_endpoint}; } int TCPStreamPusher::AcceptOne(int listen_fd, std::chrono::milliseconds timeout) { @@ -105,7 +118,10 @@ TCPStreamPusher::TCPStreamPusher(const std::string& addr, if (max_connections == 0) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Max TCP connections cannot be zero"); - listen_fd.store(OpenListenSocket(endpoint)); + auto [lfd, bound_endpoint] = OpenListenSocket(endpoint); + listen_fd.store(lfd); + endpoint = bound_endpoint; + acceptor_running = true; acceptor_future = std::async(std::launch::async, &TCPStreamPusher::AcceptorThread, this); keepalive_future = std::async(std::launch::async, &TCPStreamPusher::KeepaliveThread, this); @@ -128,15 +144,18 @@ TCPStreamPusher::~TCPStreamPusher() { // 2. Now no background threads touch connections_mutex. Tear down connections. // We do NOT hold the mutex while joining futures, to avoid deadlock. - std::vector> local_connections; + std::vector> local_connections; { std::lock_guard lg(connections_mutex); local_connections = std::move(connections); connections.clear(); + session_connections.clear(); } - for (auto& c : local_connections) - TearDownConnection(*c); + for (auto& c : local_connections) { + if (c) + TearDownConnection(*c); + } } void TCPStreamPusher::TearDownConnection(Connection& c) { @@ -179,6 +198,7 @@ bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len, bool a bool zc_used = false; uint32_t zc_first = 0; uint32_t zc_last = 0; + const auto deadline = std::chrono::steady_clock::now() + send_total_timeout; bool try_zerocopy = false; #if defined(MSG_ZEROCOPY) @@ -194,6 +214,40 @@ bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len, bool a return false; } + if (std::chrono::steady_clock::now() >= deadline) { + c.broken = true; + CloseFd(c.fd); + if (zc_used_out) *zc_used_out = zc_used; + if (zc_first_out) *zc_first_out = zc_first; + if (zc_last_out) *zc_last_out = zc_last; + return false; + } + + pollfd pfd{}; + pfd.fd = local_fd; + pfd.events = POLLOUT; + const int prc = poll(&pfd, 1, static_cast(send_poll_timeout.count())); + if (prc < 0) { + if (errno == EINTR) + continue; + c.broken = true; + CloseFd(c.fd); + if (zc_used_out) *zc_used_out = zc_used; + if (zc_first_out) *zc_first_out = zc_first; + if (zc_last_out) *zc_last_out = zc_last; + return false; + } + if (prc == 0) + continue; + if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 && !(pfd.revents & POLLOUT)) { + c.broken = true; + CloseFd(c.fd); + if (zc_used_out) *zc_used_out = zc_used; + if (zc_first_out) *zc_first_out = zc_first; + if (zc_last_out) *zc_last_out = zc_last; + return false; + } + int flags = MSG_NOSIGNAL; #if defined(MSG_ZEROCOPY) if (try_zerocopy) @@ -202,7 +256,7 @@ bool TCPStreamPusher::SendAll(Connection& c, const void* buf, size_t len, bool a ssize_t rc = ::send(local_fd, p + sent, len - sent, flags); if (rc < 0) { - if (errno == EINTR) + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; #if defined(MSG_ZEROCOPY) @@ -283,6 +337,7 @@ bool TCPStreamPusher::SendFrame(Connection& c, const uint8_t* data, size_t size, return true; } +// Caller must hold c.zc_mutex. void TCPStreamPusher::ReleaseCompletedZeroCopy(Connection& c) { while (!c.zc_pending.empty()) { const auto& front = c.zc_pending.front(); @@ -630,7 +685,7 @@ void TCPStreamPusher::AcceptorThread() { } void TCPStreamPusher::SetupNewConnection(int new_fd, uint32_t socket_number) { - auto c = std::make_unique(send_queue_size); + auto c = std::make_shared(send_queue_size); c->socket_number = socket_number; c->fd.store(new_fd); @@ -676,23 +731,17 @@ void TCPStreamPusher::RemoveDeadConnections() { // doesn't take connections_mutex, so no deadlock. auto it = connections.begin(); while (it != connections.end()) { - auto& c = **it; - if (c.broken || !c.connected || !IsConnectionAlive(c)) { - // Mark dead first - c.connected = false; - c.broken = true; + auto c = *it; + if (c->broken || !c->connected || !IsConnectionAlive(*c)) { + c->connected = false; + c->broken = true; + StopDataCollectionThreads(*c); + CloseFd(c->fd); - // Stop data collection threads (they don't take connections_mutex) - StopDataCollectionThreads(c); + if (c->persistent_ack_future.valid()) + c->persistent_ack_future.get(); - // Close fd to unblock PersistentAckThread - CloseFd(c.fd); - - // Join persistent ack thread — safe because it doesn't take connections_mutex - if (c.persistent_ack_future.valid()) - c.persistent_ack_future.get(); - - logger.Info("Removed dead connection (socket_number=" + std::to_string(c.socket_number) + ")"); + logger.Info("Removed dead connection (socket_number=" + std::to_string(c->socket_number) + ")"); it = connections.erase(it); } else { ++it; @@ -777,7 +826,15 @@ void TCPStreamPusher::StopDataCollectionThreads(Connection& c) { return; c.active = false; - c.queue.PutBlocking({.end = true}); + + // Avoid potential shutdown deadlock if queue is full and writer is stalled. + if (!c.queue.PutTimeout({.end = true}, std::chrono::milliseconds(200))) { + c.broken = true; + CloseFd(c.fd); + c.queue.Clear(); + (void)c.queue.Put({.end = true}); + } + c.ack_cv.notify_all(); c.zc_cv.notify_all(); @@ -846,33 +903,30 @@ void TCPStreamPusher::StartDataCollection(StartMessage& message) { total_data_acked_bad.store(0, std::memory_order_relaxed); total_data_acked_total.store(0, std::memory_order_relaxed); - // Stop any leftover data-collection threads and clean up dead connections + std::vector> local_connections; { std::lock_guard lg(connections_mutex); for (auto& c : connections) StopDataCollectionThreads(*c); RemoveDeadConnections(); + if (connections.empty()) + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writers connected to " + endpoint); + + session_connections = connections; + local_connections = session_connections; } - std::lock_guard lg(connections_mutex); - - if (connections.empty()) - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, - "No writers connected to " + endpoint); - - logger.Info("Starting data collection with " + std::to_string(connections.size()) + " connected writers"); - + logger.Info("Starting data collection with " + std::to_string(local_connections.size()) + " connected writers"); data_collection_active = true; - // Start writer + zerocopy threads for each connection - for (auto& c : connections) + for (auto& c : local_connections) StartDataCollectionThreads(*c); - std::vector started(connections.size(), false); + std::vector started(local_connections.size(), false); auto rollback_cancel = [&]() { - for (size_t i = 0; i < connections.size(); i++) { - auto& c = *connections[i]; + for (size_t i = 0; i < local_connections.size(); i++) { + auto& c = *local_connections[i]; if (!started[i] || c.broken) continue; @@ -883,16 +937,20 @@ void TCPStreamPusher::StartDataCollection(StartMessage& message) { (void)WaitForAck(c, TCPFrameType::CANCEL, std::chrono::milliseconds(500), &cancel_ack_err); } - for (auto& c : connections) + for (auto& c : local_connections) StopDataCollectionThreads(*c); + { + std::lock_guard lg(connections_mutex); + session_connections.clear(); + } data_collection_active = false; }; - for (size_t i = 0; i < connections.size(); i++) { - auto& c = *connections[i]; + for (size_t i = 0; i < local_connections.size(); i++) { + auto& c = *local_connections[i]; - message.socket_number = static_cast(i); + message.socket_number = static_cast(c.socket_number); message.write_master_file = (i == 0); serializer.SerializeSequenceStart(message); @@ -918,13 +976,19 @@ void TCPStreamPusher::StartDataCollection(StartMessage& message) { } bool TCPStreamPusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { - std::lock_guard lg(connections_mutex); - if (connections.empty()) - return false; - - auto idx = static_cast((image_number / images_per_file) % static_cast(connections.size())); - auto& c = *connections[idx]; + std::shared_ptr target; + size_t conn_count = 0; + { + std::lock_guard lg(connections_mutex); + const auto& use = (!session_connections.empty() ? session_connections : connections); + if (use.empty()) + return false; + conn_count = use.size(); + auto idx = static_cast((image_number / images_per_file) % static_cast(conn_count)); + target = use[idx]; + } + auto& c = *target; if (c.broken || !IsConnectionAlive(c)) return false; @@ -936,39 +1000,56 @@ void TCPStreamPusher::SendImage(ZeroCopyReturnValue &z) { // Look up the target connection while holding the mutex, but do NOT call // PutBlocking while holding it — that can block indefinitely and deadlock // against AcceptorThread/KeepaliveThread. - Connection* target = nullptr; + std::shared_ptr target; { std::lock_guard lg(connections_mutex); - if (connections.empty()) { + const auto& use = (!session_connections.empty() ? session_connections : connections); + if (use.empty()) { z.release(); return; } - auto idx = static_cast((z.GetImageNumber() / images_per_file) % static_cast(connections.size())); - auto& c = *connections[idx]; - - if (c.broken) { - z.release(); - return; - } - target = &c; + auto idx = static_cast((z.GetImageNumber() / images_per_file) % static_cast(use.size())); + target = use[idx]; } - target->queue.PutBlocking(ImagePusherQueueElement{ - .image_data = static_cast(z.GetImage()), - .z = &z, - .end = false - }); + if (!target || target->broken || !target->active) { + z.release(); + return; + } + + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); + while (std::chrono::steady_clock::now() < deadline) { + if (target->broken || !target->active) { + z.release(); + return; + } + + if (target->queue.PutTimeout(ImagePusherQueueElement{ + .image_data = static_cast(z.GetImage()), + .z = &z, + .end = false + }, std::chrono::milliseconds(50))) { + return; + } + } + + target->broken = true; + z.release(); } bool TCPStreamPusher::EndDataCollection(const EndMessage& message) { serializer.SerializeSequenceEnd(message); bool ret = true; + std::vector> local_connections; - std::lock_guard lg(connections_mutex); + { + std::lock_guard lg(connections_mutex); + local_connections = (!session_connections.empty() ? session_connections : connections); + } - for (auto& cptr : connections) { + for (auto& cptr : local_connections) { auto& c = *cptr; if (c.broken) { ret = false; @@ -988,28 +1069,35 @@ bool TCPStreamPusher::EndDataCollection(const EndMessage& message) { ret = false; } - // Stop only data-collection threads, keep connections alive - for (auto& c : connections) + for (auto& c : local_connections) StopDataCollectionThreads(*c); + { + std::lock_guard lg(connections_mutex); + session_connections.clear(); + } + data_collection_active = false; transmission_error = !ret; return ret; } bool TCPStreamPusher::SendCalibration(const CompressedImage& message) { - std::lock_guard lg(connections_mutex); - if (connections.empty()) + std::shared_ptr target; + { + std::lock_guard lg(connections_mutex); + if (connections.empty()) + return false; + target = connections[0]; + } + + if (!target || target->broken) return false; serializer.SerializeCalibration(message); - auto& c = *connections[0]; - if (c.broken) - return false; - - std::unique_lock ul(c.send_mutex); - return SendFrame(c, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::CALIBRATION, -1, nullptr); + std::unique_lock ul(target->send_mutex); + return SendFrame(*target, serialization_buffer.data(), serializer.GetBufferSize(), TCPFrameType::CALIBRATION, -1, nullptr); } std::string TCPStreamPusher::Finalize() { diff --git a/image_pusher/TCPStreamPusher.h b/image_pusher/TCPStreamPusher.h index d6b3fb79..0243e445 100644 --- a/image_pusher/TCPStreamPusher.h +++ b/image_pusher/TCPStreamPusher.h @@ -18,6 +18,26 @@ #include "../common/Logger.h" #include "../common/JfjochTCP.h" +/// TCP-based image stream pusher with persistent connection pool. +/// +/// Threading model: +/// - AcceptorThread: accepts new TCP connections, holds connections_mutex briefly +/// - KeepaliveThread: sends periodic keepalive frames when idle (skipped during data collection) +/// - Per-connection WriterThread: drains the connection's queue, sends DATA frames +/// - Per-connection PersistentAckThread: reads ACKs and keepalive pongs from the peer +/// - Per-connection ZeroCopyCompletionThread: processes kernel MSG_ZEROCOPY completions +/// +/// Lock ordering: connections_mutex → send_mutex → ack_mutex → zc_mutex +/// IMPORTANT: Never call blocking queue operations while holding connections_mutex. +/// +/// Concurrency contract: +/// - StartDataCollection, EndDataCollection, SendCalibration, and Finalize +/// are called from a single control thread in a serialized manner. +/// - SendImage may be called concurrently from multiple threads between +/// StartDataCollection and EndDataCollection. +/// - SendCalibration is called between StartDataCollection and SendImage calls. +/// - GetConnectedWriters, GetImagesWritten, and PrintSetup are safe to call at any time. + class TCPStreamPusher : public ImagePusher { struct Connection { explicit Connection(size_t queue_size) : queue(queue_size) {} @@ -86,7 +106,8 @@ class TCPStreamPusher : public ImagePusher { // Persistent connection pool, guarded by connections_mutex. // IMPORTANT: never call PutBlocking/GetBlocking on a queue while holding this mutex. mutable std::mutex connections_mutex; - std::vector> connections; + std::vector> connections; + std::vector> session_connections; // Acceptor thread state std::atomic listen_fd{-1}; @@ -94,6 +115,9 @@ class TCPStreamPusher : public ImagePusher { std::future acceptor_future; std::future keepalive_future; + std::chrono::milliseconds send_poll_timeout{250}; + std::chrono::milliseconds send_total_timeout{3000}; + int64_t images_per_file = 1; uint64_t run_number = 0; std::string run_name; @@ -106,8 +130,8 @@ class TCPStreamPusher : public ImagePusher { Logger logger{"TCPStreamPusher"}; - static std::pair ParseTcpAddress(const std::string& addr); - static int OpenListenSocket(const std::string& addr); + static std::pair> ParseTcpAddress(const std::string& addr); + static std::pair OpenListenSocket(const std::string& addr); static int AcceptOne(int listen_fd, std::chrono::milliseconds timeout); static void CloseFd(std::atomic& fd); @@ -144,6 +168,8 @@ public: ~TCPStreamPusher() override; + std::string GetAddress() const { return endpoint; } + /// Returns the number of currently connected writers (can be called at any time) size_t GetConnectedWriters() const; diff --git a/tests/TCPImagePusherTest.cpp b/tests/TCPImagePusherTest.cpp index dce1b93f..1217f98e 100644 --- a/tests/TCPImagePusherTest.cpp +++ b/tests/TCPImagePusherTest.cpp @@ -31,6 +31,11 @@ TEST_CASE("TCPImageCommTest_2Writers_WithAck", "[TCP]") { TCPStreamPusher pusher(addr,npullers); + // Wait for all pullers to connect before starting data collection + for (int attempt = 0; attempt < 100 && pusher.GetConnectedWriters() < static_cast(npullers); ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + REQUIRE(pusher.GetConnectedWriters() == static_cast(npullers)); + std::vector received(npullers, 0); std::vector processed(npullers, 0); @@ -152,6 +157,11 @@ TEST_CASE("TCPImageCommTest_DataFatalAck_PropagatesToPusher", "[TCP]") { TCPStreamPusher pusher(addr,npullers); + // Wait for all pullers to connect before starting data collection + for (int attempt = 0; attempt < 100 && pusher.GetConnectedWriters() < static_cast(npullers); ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + REQUIRE(pusher.GetConnectedWriters() == static_cast(npullers)); + std::atomic sent_fatal{false}; std::thread sender([&] { @@ -278,6 +288,11 @@ TEST_CASE("TCPImageCommTest_GetAckProgress_Correct", "[TCP]") { TCPStreamPusher pusher(addr,npullers); + // Wait for all pullers to connect before starting data collection + for (int attempt = 0; attempt < 100 && pusher.GetConnectedWriters() < static_cast(npullers); ++attempt) + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + REQUIRE(pusher.GetConnectedWriters() == static_cast(npullers)); + std::thread sender([&] { std::vector serialization_buffer(16 * 1024 * 1024); CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); @@ -382,3 +397,129 @@ TEST_CASE("TCPImageCommTest_GetAckProgress_Correct", "[TCP]") { for (auto &p : puller) p->Disconnect(); } + +TEST_CASE("TCPImageCommTest_AutoPort_StarBind", "[TCP]") { + const size_t nframes = 8; + const int64_t images_per_file = 4; + + DiffractionExperiment x(DetJF(1)); + x.Raw(); + x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4) + .ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION); + + std::vector image1(x.GetPixelsNum() * nframes, 7u); + + TCPStreamPusher pusher("tcp://127.0.0.1:*", 1); + TCPImagePuller puller(pusher.GetAddress(), 64 * 1024 * 1024); + + std::thread receiver([&] { + bool seen_end = false; + uint64_t processed = 0; + + while (!seen_end) { + auto out = puller.PollImage(std::chrono::seconds(10)); + REQUIRE(out.has_value()); + REQUIRE(out->cbor != nullptr); + REQUIRE(out->tcp_msg != nullptr); + + const auto &h = out->tcp_msg->header; + if (out->cbor->start_message) { + PullerAckMessage ack{.ack_for = TCPFrameType::START, .ok = true, .run_number = h.run_number, + .socket_number = h.socket_number, .error_code = TCPAckCode::None}; + REQUIRE(puller.SendAck(ack)); + } else if (out->cbor->data_message) { + processed++; + } else if (out->cbor->end_message) { + PullerAckMessage ack{.ack_for = TCPFrameType::END, .ok = true, .run_number = h.run_number, + .socket_number = h.socket_number, .processed_images = processed, .error_code = TCPAckCode::None}; + REQUIRE(puller.SendAck(ack)); + seen_end = true; + } + } + }); + + std::vector serialization_buffer(16 * 1024 * 1024); + CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); + + StartMessage start{.images_per_file = images_per_file, .write_master_file = true}; + EndMessage end{}; + pusher.StartDataCollection(start); + + for (int64_t i = 0; i < static_cast(nframes); i++) { + DataMessage data_message; + data_message.number = i; + data_message.image = CompressedImage(image1.data() + i * x.GetPixelsNum(), + x.GetPixelsNum() * sizeof(uint16_t), + x.GetXPixelsNum(), x.GetYPixelsNum(), + x.GetImageMode(), x.GetCompressionAlgorithm()); + serializer.SerializeImage(data_message); + REQUIRE(pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i)); + } + + REQUIRE(pusher.EndDataCollection(end)); + receiver.join(); + puller.Disconnect(); +} + +TEST_CASE("TCPImageCommTest_DisconnectMidWrite_NoHang", "[TCP]") { + const size_t nframes = 256; + const int64_t images_per_file = 16; + + DiffractionExperiment x(DetJF(1)); + x.Raw(); + x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4) + .ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION); + + std::vector image1(x.GetPixelsNum() * nframes, 11u); + + TCPStreamPusher pusher("tcp://127.0.0.1:*", 1); + TCPImagePuller puller(pusher.GetAddress(), 64 * 1024 * 1024); + + std::thread receiver([&] { + bool disconnected = false; + while (!disconnected) { + auto out = puller.PollImage(std::chrono::seconds(10)); + REQUIRE(out.has_value()); + REQUIRE(out->cbor != nullptr); + REQUIRE(out->tcp_msg != nullptr); + + const auto &h = out->tcp_msg->header; + if (out->cbor->start_message) { + PullerAckMessage ack{.ack_for = TCPFrameType::START, .ok = true, .run_number = h.run_number, + .socket_number = h.socket_number, .error_code = TCPAckCode::None}; + REQUIRE(puller.SendAck(ack)); + } else if (out->cbor->data_message) { + puller.Disconnect(); // simulate puller disappearing mid-stream + disconnected = true; + } + } + }); + + auto sender = std::async(std::launch::async, [&] { + std::vector serialization_buffer(16 * 1024 * 1024); + CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); + + StartMessage start{.images_per_file = images_per_file, .write_master_file = true}; + EndMessage end{}; + + pusher.StartDataCollection(start); + + for (int64_t i = 0; i < static_cast(nframes); i++) { + DataMessage data_message; + data_message.number = i; + data_message.image = CompressedImage(image1.data() + i * x.GetPixelsNum(), + x.GetPixelsNum() * sizeof(uint16_t), + x.GetXPixelsNum(), x.GetYPixelsNum(), + x.GetImageMode(), x.GetCompressionAlgorithm()); + serializer.SerializeImage(data_message); + (void)pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i); + } + + return pusher.EndDataCollection(end); + }); + + REQUIRE(sender.wait_for(std::chrono::seconds(20)) == std::future_status::ready); + CHECK(sender.get() == false); + + receiver.join(); +} \ No newline at end of file