diff --git a/image_pusher/TCPStreamPusherSocket.cpp b/image_pusher/TCPStreamPusherSocket.cpp index 6094827c..489c0498 100644 --- a/image_pusher/TCPStreamPusherSocket.cpp +++ b/image_pusher/TCPStreamPusherSocket.cpp @@ -99,22 +99,15 @@ void TCPStreamPusherSocket::CloseDataSocket() { bool TCPStreamPusherSocket::AcceptConnection(std::chrono::milliseconds timeout) { std::unique_lock ul(send_mutex); - if (broken) - return false; + CloseDataSocket(); - if (fd.load() >= 0) - return true; - - if (ever_connected) { - broken = true; // session policy: no reconnect - return false; - } + broken = false; pollfd pfd{}; pfd.fd = listen_fd; pfd.events = POLLIN; - const int prc = ::poll(&pfd, 1, static_cast(timeout.count())); + const int prc = poll(&pfd, 1, static_cast(timeout.count())); if (prc == 0) { logger.Error("TCP accept timeout (" + std::to_string(timeout.count()) + " ms) on " + endpoint); return false; @@ -130,7 +123,7 @@ bool TCPStreamPusherSocket::AcceptConnection(std::chrono::milliseconds timeout) return false; } - int new_fd = ::accept(listen_fd, nullptr, nullptr); + int new_fd = accept(listen_fd, nullptr, nullptr); if (new_fd < 0) return false; @@ -143,7 +136,7 @@ bool TCPStreamPusherSocket::AcceptConnection(std::chrono::milliseconds timeout) #endif fd.store(new_fd); - ever_connected = true; + logger.Info("TCP peer connected on " + endpoint); return true; } @@ -159,25 +152,19 @@ bool TCPStreamPusherSocket::IsConnectionAlive() const { pollfd pfd{}; pfd.fd = local_fd; pfd.events = POLLOUT; - if (::poll(&pfd, 1, 0) < 0) + if (poll(&pfd, 1, 0) < 0) return false; if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0) return false; int so_error = 0; socklen_t len = sizeof(so_error); - if (::getsockopt(local_fd, SOL_SOCKET, SO_ERROR, &so_error, &len) != 0) + if (getsockopt(local_fd, SOL_SOCKET, SO_ERROR, &so_error, &len) != 0) return false; return so_error == 0; } -bool TCPStreamPusherSocket::EnsureAccepted() { - if (fd.load() >= 0) - return true; - return AcceptConnection(std::chrono::duration_cast(AcceptTimeout)); -} - bool TCPStreamPusherSocket::SendAll(const void *buf, size_t len) { const uint8_t *p = static_cast(buf); size_t sent = 0; @@ -282,9 +269,6 @@ void TCPStreamPusherSocket::WriterThread() { if (!e.z) continue; - if (!EnsureAccepted()) - broken = true; - if (broken) { e.z->release(); continue; diff --git a/image_pusher/TCPStreamPusherSocket.h b/image_pusher/TCPStreamPusherSocket.h index e2338c19..c735be59 100644 --- a/image_pusher/TCPStreamPusherSocket.h +++ b/image_pusher/TCPStreamPusherSocket.h @@ -39,7 +39,6 @@ class TCPStreamPusherSocket { constexpr static auto AcceptTimeout = std::chrono::seconds(5); - std::atomic ever_connected{false}; std::atomic broken{false}; std::atomic next_tx_id{1}; @@ -51,7 +50,6 @@ class TCPStreamPusherSocket { void WriterThread(); void CompletionThread(); - bool EnsureAccepted(); void CloseDataSocket(); bool SendAll(const void *buf, size_t len);