TCPStreamPusherSockets: Fixes to the logic

This commit is contained in:
2026-03-01 20:22:21 +01:00
parent d04e590194
commit 38d8491c2b
2 changed files with 7 additions and 25 deletions
+7 -23
View File
@@ -99,22 +99,15 @@ void TCPStreamPusherSocket::CloseDataSocket() {
bool TCPStreamPusherSocket::AcceptConnection(std::chrono::milliseconds timeout) {
std::unique_lock ul(send_mutex);
if (broken)
return false;
CloseDataSocket();
if (fd.load() >= 0)
return true;
if (ever_connected) {
broken = true; // session policy: no reconnect
return false;
}
broken = false;
pollfd pfd{};
pfd.fd = listen_fd;
pfd.events = POLLIN;
const int prc = ::poll(&pfd, 1, static_cast<int>(timeout.count()));
const int prc = poll(&pfd, 1, static_cast<int>(timeout.count()));
if (prc == 0) {
logger.Error("TCP accept timeout (" + std::to_string(timeout.count()) + " ms) on " + endpoint);
return false;
@@ -130,7 +123,7 @@ bool TCPStreamPusherSocket::AcceptConnection(std::chrono::milliseconds timeout)
return false;
}
int new_fd = ::accept(listen_fd, nullptr, nullptr);
int new_fd = accept(listen_fd, nullptr, nullptr);
if (new_fd < 0)
return false;
@@ -143,7 +136,7 @@ bool TCPStreamPusherSocket::AcceptConnection(std::chrono::milliseconds timeout)
#endif
fd.store(new_fd);
ever_connected = true;
logger.Info("TCP peer connected on " + endpoint);
return true;
}
@@ -159,25 +152,19 @@ bool TCPStreamPusherSocket::IsConnectionAlive() const {
pollfd pfd{};
pfd.fd = local_fd;
pfd.events = POLLOUT;
if (::poll(&pfd, 1, 0) < 0)
if (poll(&pfd, 1, 0) < 0)
return false;
if ((pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0)
return false;
int so_error = 0;
socklen_t len = sizeof(so_error);
if (::getsockopt(local_fd, SOL_SOCKET, SO_ERROR, &so_error, &len) != 0)
if (getsockopt(local_fd, SOL_SOCKET, SO_ERROR, &so_error, &len) != 0)
return false;
return so_error == 0;
}
bool TCPStreamPusherSocket::EnsureAccepted() {
if (fd.load() >= 0)
return true;
return AcceptConnection(std::chrono::duration_cast<std::chrono::milliseconds>(AcceptTimeout));
}
bool TCPStreamPusherSocket::SendAll(const void *buf, size_t len) {
const uint8_t *p = static_cast<const uint8_t *>(buf);
size_t sent = 0;
@@ -282,9 +269,6 @@ void TCPStreamPusherSocket::WriterThread() {
if (!e.z)
continue;
if (!EnsureAccepted())
broken = true;
if (broken) {
e.z->release();
continue;
-2
View File
@@ -39,7 +39,6 @@ class TCPStreamPusherSocket {
constexpr static auto AcceptTimeout = std::chrono::seconds(5);
std::atomic<bool> ever_connected{false};
std::atomic<bool> broken{false};
std::atomic<uint64_t> next_tx_id{1};
@@ -51,7 +50,6 @@ class TCPStreamPusherSocket {
void WriterThread();
void CompletionThread();
bool EnsureAccepted();
void CloseDataSocket();
bool SendAll(const void *buf, size_t len);