server monitor throttle using send queue size
This commit is contained in:
+19
-2
@@ -448,14 +448,31 @@ void ServerConn::bevRead()
|
||||
// TODO configure
|
||||
(void)bufferevent_disable(bev.get(), EV_READ);
|
||||
bufferevent_setwatermark(bev.get(), EV_WRITE, 0x100000/2, 0);
|
||||
log_printf(connio, Debug, "%s suspend READ\n", peerName.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ServerConn::bevWrite()
|
||||
{
|
||||
(void)bufferevent_enable(bev.get(), EV_READ);
|
||||
bufferevent_setwatermark(bev.get(), EV_WRITE, 0, 0);
|
||||
log_printf(connio, Debug, "%s process backlog\n", peerName.c_str());
|
||||
|
||||
auto tx = bufferevent_get_output(bev.get());
|
||||
// handle pending monitors
|
||||
|
||||
while(!backlog.empty() && evbuffer_get_length(tx)<0x100000) {
|
||||
auto fn = std::move(backlog.front());
|
||||
backlog.pop_front();
|
||||
|
||||
fn();
|
||||
}
|
||||
|
||||
// TODO configure
|
||||
if(evbuffer_get_length(tx)<0x100000) {
|
||||
(void)bufferevent_enable(bev.get(), EV_READ);
|
||||
bufferevent_setwatermark(bev.get(), EV_WRITE, 0, 0);
|
||||
log_printf(connio, Debug, "%s resume READ\n", peerName.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
void ServerConn::bevEventS(struct bufferevent *bev, short events, void *ptr)
|
||||
|
||||
@@ -113,6 +113,8 @@ struct ServerConn : public std::enable_shared_from_this<ServerConn>
|
||||
std::map<uint32_t, std::shared_ptr<ServerChan> > chanByCID;
|
||||
std::map<uint32_t, std::shared_ptr<ServerOp> > opByIOID;
|
||||
|
||||
std::list<std::function<void()>> backlog;
|
||||
|
||||
ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen);
|
||||
ServerConn(const ServerConn&) = delete;
|
||||
ServerConn& operator=(const ServerConn&) = delete;
|
||||
|
||||
+16
-1
@@ -59,11 +59,26 @@ struct MonitorOp : public ServerOp,
|
||||
static
|
||||
void maybeReply(server::Server::Pvt* server, const std::shared_ptr<MonitorOp>& op)
|
||||
{
|
||||
// can we send a reply?
|
||||
if(!op->scheduled && op->state==Executing && !op->queue.empty() && (!op->pipeline || op->window))
|
||||
{
|
||||
// based on operation state, yes
|
||||
server->acceptor_loop.dispatch([op](){
|
||||
op->doReply();
|
||||
auto ch(op->chan.lock());
|
||||
if(!ch)
|
||||
return;
|
||||
auto conn(ch->conn.lock());
|
||||
if(!conn)
|
||||
return;
|
||||
|
||||
if(conn->bev && (bufferevent_get_enabled(conn->bev.get())&EV_READ)) {
|
||||
op->doReply();
|
||||
} else {
|
||||
// connection TX queue is too full
|
||||
conn->backlog.push_back(std::bind(&MonitorOp::doReply, op));
|
||||
}
|
||||
});
|
||||
|
||||
op->scheduled = true;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user