diff --git a/src/clientconn.cpp b/src/clientconn.cpp index 35c6547..05567c9 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -56,19 +56,21 @@ std::shared_ptr Connection::build(const std::shared_ptr void Connection::startConnecting() { - assert(!bev); + assert(!this->bev); - connect(bufferevent_socket_new(context->tcp_loop.base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS)); + auto bev(bufferevent_socket_new(context->tcp_loop.base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS)); - bufferevent_setcb(bev.get(), &bevReadS, nullptr, &bevEventS, this); + bufferevent_setcb(bev, &bevReadS, nullptr, &bevEventS, this); timeval tmo(totv(context->effective.tcpTimeout)); - bufferevent_set_timeouts(bev.get(), &tmo, &tmo); + bufferevent_set_timeouts(bev, &tmo, &tmo); - if(bufferevent_socket_connect(bev.get(), const_cast(&peerAddr->sa), peerAddr.size())) + if(bufferevent_socket_connect(bev, const_cast(&peerAddr->sa), peerAddr.size())) throw std::runtime_error("Unable to begin connecting"); - log_debug_printf(io, "Connecting to %s\n", peerName.c_str()); + connect(bev); + + log_debug_printf(io, "Connecting to %s, RX readahead %zu\n", peerName.c_str(), readahead); } void Connection::createChannels() diff --git a/src/conn.cpp b/src/conn.cpp index 63e709e..c76cdba 100644 --- a/src/conn.cpp +++ b/src/conn.cpp @@ -17,6 +17,15 @@ DEFINE_LOGGER(connio, "pvxs.tcp.io"); namespace pvxs { namespace impl { +// Amount of following messages which we allow to be read while +// processing the current message. Avoids some extra recv() calls, +// at the price of maybe extra copying. +// Also bounds the loop in ConnBase::bevRead() +// +// Defined as a multiple of the OS RX socket buffer size. +static +constexpr size_t tcp_readahead_mult = 2u; + ConnBase::ConnBase(bool isClient, bool sendBE, bufferevent* bev, const SockAddr& peerAddr) :peerAddr(peerAddr) ,peerName(peerAddr.tostring()) @@ -29,7 +38,7 @@ ConnBase::ConnBase(bool isClient, bool sendBE, bufferevent* bev, const SockAddr& ,txBody(evbuffer_new()) ,state(Holdoff) { - if(bev) + if(bev) // true for server connection. client will call connect() shortly connect(bev); } @@ -48,10 +57,24 @@ void ConnBase::connect(bufferevent* bev) this->bev.reset(bev); + readahead = evsocket::get_buffer_size(bufferevent_getfd(bev), false); + +#if LIBEVENT_VERSION_NUMBER >= 0x02010000 + // allow to drain OS socket buffer in a single read + (void)bufferevent_set_max_single_read(bev, readahead); +#endif + + readahead *= tcp_readahead_mult; + +#if LIBEVENT_VERSION_NUMBER >= 0x02010000 + // allow attempt to write as much as is available + (void)bufferevent_set_max_single_write(bev, EV_SSIZE_MAX); +#endif + state = isClient ? Connecting : Connected; // initially wait for at least a header - bufferevent_setwatermark(this->bev.get(), EV_READ, 8, tcp_readahead); + bufferevent_setwatermark(this->bev.get(), EV_READ, 8, readahead); } void ConnBase::disconnect() @@ -181,10 +204,10 @@ void ConnBase::bevRead() if(evbuffer_get_length(rx)-8 < len) { // wait for complete payload // and some additional if available - size_t readahead = len; - if(readahead < std::numeric_limits::max()-tcp_readahead) - readahead += tcp_readahead; - bufferevent_setwatermark(bev.get(), EV_READ, len, readahead); + size_t newmax = len; + if(newmax < std::numeric_limits::max()-readahead) + newmax += readahead; + bufferevent_setwatermark(bev.get(), EV_READ, len, newmax); bufferevent_enable(bev.get(), EV_READ); return; } @@ -270,7 +293,7 @@ void ConnBase::bevRead() // incomplete body took earlier return assert(evbuffer_get_length(rx)<8); // wait for next header - bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead); + bufferevent_setwatermark(bev.get(), EV_READ, 8, readahead); bufferevent_enable(bev.get(), EV_READ); } else { diff --git a/src/conn.h b/src/conn.h index a7b1988..b4ba715 100644 --- a/src/conn.h +++ b/src/conn.h @@ -13,12 +13,6 @@ namespace pvxs { namespace impl { -// Amount of following messages which we allow to be read while -// processing the current message. Avoids some extra recv() calls, -// at the price of maybe extra copying. -// Also bounds the loop in ConnBase::bevRead() -constexpr size_t tcp_readahead = 0x1000u; - struct ConnBase { const SockAddr peerAddr; @@ -47,6 +41,7 @@ public: evbuf segBuf, txBody; size_t statTx{}, statRx{}; + size_t readahead{}; enum { Holdoff, diff --git a/src/evhelper.cpp b/src/evhelper.cpp index eca14de..54b55ee 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -635,6 +635,18 @@ std::vector evsocket::broadcasts(const SockAddr* match) const return ret; } +size_t evsocket::get_buffer_size(evutil_socket_t sock, bool tx) +{ + unsigned ret; + socklen_t len(sizeof(ret)); + auto err = getsockopt(sock, SOL_SOCKET, tx ? SO_SNDBUF : SO_RCVBUF, (char*)&ret, &len); + if(err<0 || len!=sizeof(ret)) { + int err = evutil_socket_geterror(sock); + throw std::system_error(err, std::system_category()); + } + return ret; +} + #if defined(_WIN32) && !defined(EAFNOSUPPORT) # define EAFNOSUPPORT WSAESOCKTNOSUPPORT #endif diff --git a/src/evhelper.h b/src/evhelper.h index b1f8370..1ef7dc2 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -247,6 +247,9 @@ struct PVXS_API evsocket //! wraps osiSockDiscoverBroadcastAddresses() std::vector broadcasts(const SockAddr* match=nullptr) const; + static + size_t get_buffer_size(evutil_socket_t sock, bool tx); + static bool canIPv6; diff --git a/src/serverconn.cpp b/src/serverconn.cpp index fc4de32..978ebcd 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -15,8 +15,9 @@ #include #include "serverconn.h" -// limit on size of TX buffer above which we suspend RX -static constexpr size_t tcp_tx_limit = 0x100000; +// limit on size of TX buffer above which we suspend RX. +// defined as multiple of OS socket TX buffer size +static constexpr size_t tcp_tx_limit_mult = 2u; namespace pvxs { namespace server { @@ -48,8 +49,10 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr * bufferevent_socket_new(iface->server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS), SockAddr(peer)) ,iface(iface) + ,tcp_tx_limit(evsocket::get_buffer_size(sock, true) * tcp_tx_limit_mult) { - log_debug_printf(connio, "Client %s connects\n", peerName.c_str()); + log_debug_printf(connio, "Client %s connects, RX readahead %zu TX limit %zu\n", + peerName.c_str(), readahead, tcp_tx_limit); { auto cred(std::make_shared()); diff --git a/src/serverconn.h b/src/serverconn.h index 6f668ae..0af6f0f 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -108,6 +108,7 @@ struct ServerChan struct ServerConn : public ConnBase, public std::enable_shared_from_this { ServIface* const iface; + const size_t tcp_tx_limit; std::shared_ptr cred; diff --git a/test/testsock.cpp b/test/testsock.cpp index 26d712f..2abaa74 100644 --- a/test/testsock.cpp +++ b/test/testsock.cpp @@ -98,6 +98,12 @@ void test_udp(int af) SockAddr bind_addr(SockAddr::loopback(af)); + { + auto rxbuf = evsocket::get_buffer_size(A.sock, false); + auto txbuf = evsocket::get_buffer_size(A.sock, true); + testOk(rxbuf>0 && txbuf>0, "non-zero OS socket buffer sizes %zu, %zu\n", rxbuf, txbuf); + } + A.enable_IP_PKTINFO(); try{ A.bind(bind_addr); @@ -382,7 +388,7 @@ MAIN(testsock) { SockAttach attach; logger_config_env(); - testPlan(66); + testPlan(68); testSetup(); // check for behavior when binding ipv4 and ipv6 to the same socket // as a function of socket type and order.