client: consistent Channel disconnect handling

More commonality betwee disconnect and CMD_DESTROY_CHANNEL.

squash! client: consistent Channel disconnect handling

Periodic cache cleaning
This commit is contained in:
Michael Davidsaver
2021-07-02 16:41:32 -07:00
parent 5d3a21f030
commit f7b3821e10
6 changed files with 136 additions and 108 deletions
+88 -49
View File
@@ -79,19 +79,7 @@ Channel::Channel(const std::shared_ptr<ContextImpl>& context, const std::string&
Channel::~Channel()
{
context->chanByCID.erase(cid);
// searchBuckets cleaned in tickSearch()
if((state==Creating || state==Active) && conn && conn->bev) {
{
(void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get()));
EvOutBuf R(hostBE, conn->txBody.get());
to_wire(R, sid);
to_wire(R, cid);
}
statTx += conn->enqueueTxBody(CMD_DESTROY_CHANNEL);
}
disconnect(nullptr);
}
void Channel::createOperations()
@@ -123,36 +111,77 @@ void Channel::createOperations()
}
}
// call on disconnect or CMD_DESTROY_CHANNEL
// detach from Connection and notify Connect and *Op
void Channel::disconnect(const std::shared_ptr<Channel>& self)
{
self->state = Channel::Searching;
self->sid = 0xdeadbeef; // spoil
assert(!self || this==self.get());
auto current(std::move(conn));
switch(state) {
case Channel::Connecting:
current->pending.erase(cid);
break;
case Channel::Creating:
current->creatingByCID.erase(cid);
break;
case Channel::Active:
current->chanBySID.erase(sid);
break;
default:
break;
}
if((state==Creating || state==Active) && current && current->bev) {
{
(void)evbuffer_drain(current->txBody.get(), evbuffer_get_length(current->txBody.get()));
EvOutBuf R(hostBE, current->txBody.get());
to_wire(R, sid);
to_wire(R, cid);
}
statTx += current->enqueueTxBody(CMD_DESTROY_CHANNEL);
}
state = Channel::Searching;
sid = 0xdeadbeef; // spoil
auto conns(connectors); // copy list
for(auto& conn : conns) {
conn->_connected.store(false, std::memory_order_relaxed);
if(conn->_onDis)
conn->_onDis();
for(auto& interested : conns) {
if(interested->_connected.exchange(false, std::memory_order_relaxed) && interested->_onDis)
interested->_onDis();
}
if(forcedServer.family()==AF_UNSPEC) {
auto ops(std::move(opByIOID));
for(auto& pair : ops) {
auto op = pair.second->handle.lock();
current->opByIOID.erase(pair.first);
if(op)
op->disconnected(op);
}
if(!self) { // in ~Channel
// searchBuckets cleaned in tickSearch()
} else if(forcedServer.family()==AF_UNSPEC) { // begin search
context->searchBuckets[context->currentBucket].push_back(self);
log_debug_printf(io, "Server %s detach channel '%s' to re-search\n",
conn ? conn->peerName.c_str() : "<disconnected>",
self->name.c_str());
current ? current->peerName.c_str() : "<disconnected>",
name.c_str());
} else { // reconnect to specific server
// TODO: holdoff to prevent fast reconnect loop
self->conn = Connection::build(context, self->forcedServer);
conn = Connection::build(context, forcedServer);
self->conn->pending.push_back(self);
self->state = Connecting;
conn->pending[cid] = self;
state = Connecting;
self->conn->createChannels();
conn->createChannels();
}
}
@@ -309,7 +338,7 @@ std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& cont
chan->forcedServer = forceServer;
chan->conn = Connection::build(context, forceServer);
chan->conn->pending.push_back(chan);
chan->conn->pending[chan->cid] = chan;
chan->state = Connecting;
chan->conn->createChannels();
@@ -350,15 +379,16 @@ void Context::hurryUp()
});
}
void Context::cacheClear(const std::string& name)
void Context::cacheClear(const std::string& name, cacheAction action)
{
if(!pvt)
throw std::logic_error("NULL Context");
pvt->impl->tcp_loop.call([this, name](){
pvt->impl->tcp_loop.call([this, name, action](){
// run twice to ensure both mark and sweep of all unused channels
pvt->impl->cacheClean(name);
pvt->impl->cacheClean(name);
log_debug_printf(setup, "cacheClear('%s')\n", name.c_str());
pvt->impl->cacheClean(name, action);
pvt->impl->cacheClean(name, action);
});
}
@@ -668,7 +698,7 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist
chan->conn = Connection::build(self.shared_from_this(), serv);
chan->conn->pending.push_back(chan);
chan->conn->pending[chan->cid] = chan;
chan->state = Channel::Connecting;
chan->conn->createChannels();
@@ -1010,37 +1040,46 @@ void ContextImpl::onNSCheckS(evutil_socket_t fd, short evt, void *raw)
}
}
void ContextImpl::cacheClean(const std::string& name)
void ContextImpl::cacheClean(const std::string& name, Context::cacheAction action)
{
std::set<decltype (chanByName)::key_type> trash;
auto next(chanByName.begin()),
end(chanByName.end());
for(auto& pair : chanByName) {
if(!name.empty() && pair.first.first!=name)
continue; // skip
while(next!=end) {
auto cur(next++);
if(pair.second.use_count()<=1) {
if(!pair.second->garbage) {
if(!name.empty() && cur->first.first!=name)
continue;
else if(action!=Context::Clean || cur->second.use_count()<=1) {
cur->second->garbage = true;
if(action==Context::Clean && !cur->second->garbage) {
// mark for next sweep
log_debug_printf(setup, "Chan GC mark '%s':'%s'\n", pair.first.first.c_str(), pair.first.second.c_str());
pair.second->garbage = true;
log_debug_printf(setup, "Chan GC mark '%s':'%s'\n",
cur->first.first.c_str(), cur->first.second.c_str());
} else {
// sweep
trash.insert(pair.first);
log_debug_printf(setup, "Chan GC sweep '%s':'%s'\n",
cur->first.first.c_str(), cur->first.second.c_str());
auto trash(std::move(cur->second));
// explicitly break ref. loop of channel cache
chanByName.erase(cur);
if(action==Context::Disconnect) {
trash->disconnect(trash);
}
}
}
}
// explicitly break ref. loop of channel cache
for(auto& key : trash) {
chanByName.erase(key);
log_debug_printf(setup, "Chan GC sweep '%s':'%s'\n", key.first.c_str(), key.second.c_str());
}
}
void ContextImpl::cacheCleanS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<ContextImpl*>(raw)->cacheClean(std::string(), Context::Clean);
static_cast<ContextImpl*>(raw)->tickBeaconClean();
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what());