re-define user bufferevent limits in terms of OS buffer size

This commit is contained in:
Michael Davidsaver
2022-09-25 11:40:31 -07:00
parent 0d5a3f62e1
commit 8333ce30ec
8 changed files with 68 additions and 23 deletions
+8 -6
View File
@@ -56,19 +56,21 @@ std::shared_ptr<Connection> Connection::build(const std::shared_ptr<ContextImpl>
void Connection::startConnecting()
{
assert(!bev);
assert(!this->bev);
connect(bufferevent_socket_new(context->tcp_loop.base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS));
auto bev(bufferevent_socket_new(context->tcp_loop.base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS));
bufferevent_setcb(bev.get(), &bevReadS, nullptr, &bevEventS, this);
bufferevent_setcb(bev, &bevReadS, nullptr, &bevEventS, this);
timeval tmo(totv(context->effective.tcpTimeout));
bufferevent_set_timeouts(bev.get(), &tmo, &tmo);
bufferevent_set_timeouts(bev, &tmo, &tmo);
if(bufferevent_socket_connect(bev.get(), const_cast<sockaddr*>(&peerAddr->sa), peerAddr.size()))
if(bufferevent_socket_connect(bev, const_cast<sockaddr*>(&peerAddr->sa), peerAddr.size()))
throw std::runtime_error("Unable to begin connecting");
log_debug_printf(io, "Connecting to %s\n", peerName.c_str());
connect(bev);
log_debug_printf(io, "Connecting to %s, RX readahead %zu\n", peerName.c_str(), readahead);
}
void Connection::createChannels()
+30 -7
View File
@@ -17,6 +17,15 @@ DEFINE_LOGGER(connio, "pvxs.tcp.io");
namespace pvxs {
namespace impl {
// Amount of following messages which we allow to be read while
// processing the current message. Avoids some extra recv() calls,
// at the price of maybe extra copying.
// Also bounds the loop in ConnBase::bevRead()
//
// Defined as a multiple of the OS RX socket buffer size.
static
constexpr size_t tcp_readahead_mult = 2u;
ConnBase::ConnBase(bool isClient, bool sendBE, bufferevent* bev, const SockAddr& peerAddr)
:peerAddr(peerAddr)
,peerName(peerAddr.tostring())
@@ -29,7 +38,7 @@ ConnBase::ConnBase(bool isClient, bool sendBE, bufferevent* bev, const SockAddr&
,txBody(evbuffer_new())
,state(Holdoff)
{
if(bev)
if(bev) // true for server connection. client will call connect() shortly
connect(bev);
}
@@ -48,10 +57,24 @@ void ConnBase::connect(bufferevent* bev)
this->bev.reset(bev);
readahead = evsocket::get_buffer_size(bufferevent_getfd(bev), false);
#if LIBEVENT_VERSION_NUMBER >= 0x02010000
// allow to drain OS socket buffer in a single read
(void)bufferevent_set_max_single_read(bev, readahead);
#endif
readahead *= tcp_readahead_mult;
#if LIBEVENT_VERSION_NUMBER >= 0x02010000
// allow attempt to write as much as is available
(void)bufferevent_set_max_single_write(bev, EV_SSIZE_MAX);
#endif
state = isClient ? Connecting : Connected;
// initially wait for at least a header
bufferevent_setwatermark(this->bev.get(), EV_READ, 8, tcp_readahead);
bufferevent_setwatermark(this->bev.get(), EV_READ, 8, readahead);
}
void ConnBase::disconnect()
@@ -181,10 +204,10 @@ void ConnBase::bevRead()
if(evbuffer_get_length(rx)-8 < len) {
// wait for complete payload
// and some additional if available
size_t readahead = len;
if(readahead < std::numeric_limits<size_t>::max()-tcp_readahead)
readahead += tcp_readahead;
bufferevent_setwatermark(bev.get(), EV_READ, len, readahead);
size_t newmax = len;
if(newmax < std::numeric_limits<size_t>::max()-readahead)
newmax += readahead;
bufferevent_setwatermark(bev.get(), EV_READ, len, newmax);
bufferevent_enable(bev.get(), EV_READ);
return;
}
@@ -270,7 +293,7 @@ void ConnBase::bevRead()
// incomplete body took earlier return
assert(evbuffer_get_length(rx)<8);
// wait for next header
bufferevent_setwatermark(bev.get(), EV_READ, 8, tcp_readahead);
bufferevent_setwatermark(bev.get(), EV_READ, 8, readahead);
bufferevent_enable(bev.get(), EV_READ);
} else {
+1 -6
View File
@@ -13,12 +13,6 @@
namespace pvxs {
namespace impl {
// Amount of following messages which we allow to be read while
// processing the current message. Avoids some extra recv() calls,
// at the price of maybe extra copying.
// Also bounds the loop in ConnBase::bevRead()
constexpr size_t tcp_readahead = 0x1000u;
struct ConnBase
{
const SockAddr peerAddr;
@@ -47,6 +41,7 @@ public:
evbuf segBuf, txBody;
size_t statTx{}, statRx{};
size_t readahead{};
enum {
Holdoff,
+12
View File
@@ -635,6 +635,18 @@ std::vector<SockAddr> evsocket::broadcasts(const SockAddr* match) const
return ret;
}
size_t evsocket::get_buffer_size(evutil_socket_t sock, bool tx)
{
unsigned ret;
socklen_t len(sizeof(ret));
auto err = getsockopt(sock, SOL_SOCKET, tx ? SO_SNDBUF : SO_RCVBUF, (char*)&ret, &len);
if(err<0 || len!=sizeof(ret)) {
int err = evutil_socket_geterror(sock);
throw std::system_error(err, std::system_category());
}
return ret;
}
#if defined(_WIN32) && !defined(EAFNOSUPPORT)
# define EAFNOSUPPORT WSAESOCKTNOSUPPORT
#endif
+3
View File
@@ -247,6 +247,9 @@ struct PVXS_API evsocket
//! wraps osiSockDiscoverBroadcastAddresses()
std::vector<SockAddr> broadcasts(const SockAddr* match=nullptr) const;
static
size_t get_buffer_size(evutil_socket_t sock, bool tx);
static
bool canIPv6;
+6 -3
View File
@@ -15,8 +15,9 @@
#include <pvxs/log.h>
#include "serverconn.h"
// limit on size of TX buffer above which we suspend RX
static constexpr size_t tcp_tx_limit = 0x100000;
// limit on size of TX buffer above which we suspend RX.
// defined as multiple of OS socket TX buffer size
static constexpr size_t tcp_tx_limit_mult = 2u;
namespace pvxs {
namespace server {
@@ -48,8 +49,10 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *
bufferevent_socket_new(iface->server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS),
SockAddr(peer))
,iface(iface)
,tcp_tx_limit(evsocket::get_buffer_size(sock, true) * tcp_tx_limit_mult)
{
log_debug_printf(connio, "Client %s connects\n", peerName.c_str());
log_debug_printf(connio, "Client %s connects, RX readahead %zu TX limit %zu\n",
peerName.c_str(), readahead, tcp_tx_limit);
{
auto cred(std::make_shared<server::ClientCredentials>());
+1
View File
@@ -108,6 +108,7 @@ struct ServerChan
struct ServerConn : public ConnBase, public std::enable_shared_from_this<ServerConn>
{
ServIface* const iface;
const size_t tcp_tx_limit;
std::shared_ptr<const server::ClientCredentials> cred;
+7 -1
View File
@@ -98,6 +98,12 @@ void test_udp(int af)
SockAddr bind_addr(SockAddr::loopback(af));
{
auto rxbuf = evsocket::get_buffer_size(A.sock, false);
auto txbuf = evsocket::get_buffer_size(A.sock, true);
testOk(rxbuf>0 && txbuf>0, "non-zero OS socket buffer sizes %zu, %zu\n", rxbuf, txbuf);
}
A.enable_IP_PKTINFO();
try{
A.bind(bind_addr);
@@ -382,7 +388,7 @@ MAIN(testsock)
{
SockAttach attach;
logger_config_env();
testPlan(66);
testPlan(68);
testSetup();
// check for behavior when binding ipv4 and ipv6 to the same socket
// as a function of socket type and order.