diff --git a/src/client.cpp b/src/client.cpp index b062684..4b6f672 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -79,19 +79,7 @@ Channel::Channel(const std::shared_ptr& context, const std::string& Channel::~Channel() { - context->chanByCID.erase(cid); - // searchBuckets cleaned in tickSearch() - if((state==Creating || state==Active) && conn && conn->bev) { - { - (void)evbuffer_drain(conn->txBody.get(), evbuffer_get_length(conn->txBody.get())); - - EvOutBuf R(hostBE, conn->txBody.get()); - - to_wire(R, sid); - to_wire(R, cid); - } - statTx += conn->enqueueTxBody(CMD_DESTROY_CHANNEL); - } + disconnect(nullptr); } void Channel::createOperations() @@ -123,36 +111,77 @@ void Channel::createOperations() } } +// call on disconnect or CMD_DESTROY_CHANNEL +// detach from Connection and notify Connect and *Op void Channel::disconnect(const std::shared_ptr& self) { - self->state = Channel::Searching; - self->sid = 0xdeadbeef; // spoil + assert(!self || this==self.get()); + auto current(std::move(conn)); + + switch(state) { + case Channel::Connecting: + current->pending.erase(cid); + break; + case Channel::Creating: + current->creatingByCID.erase(cid); + break; + case Channel::Active: + current->chanBySID.erase(sid); + break; + default: + break; + } + + if((state==Creating || state==Active) && current && current->bev) { + { + (void)evbuffer_drain(current->txBody.get(), evbuffer_get_length(current->txBody.get())); + + EvOutBuf R(hostBE, current->txBody.get()); + + to_wire(R, sid); + to_wire(R, cid); + } + statTx += current->enqueueTxBody(CMD_DESTROY_CHANNEL); + } + + state = Channel::Searching; + sid = 0xdeadbeef; // spoil auto conns(connectors); // copy list - for(auto& conn : conns) { - conn->_connected.store(false, std::memory_order_relaxed); - if(conn->_onDis) - conn->_onDis(); + for(auto& interested : conns) { + if(interested->_connected.exchange(false, std::memory_order_relaxed) && interested->_onDis) + interested->_onDis(); } - if(forcedServer.family()==AF_UNSPEC) { + auto ops(std::move(opByIOID)); + for(auto& pair : ops) { + auto op = pair.second->handle.lock(); + current->opByIOID.erase(pair.first); + if(op) + op->disconnected(op); + } + + if(!self) { // in ~Channel + // searchBuckets cleaned in tickSearch() + + } else if(forcedServer.family()==AF_UNSPEC) { // begin search context->searchBuckets[context->currentBucket].push_back(self); log_debug_printf(io, "Server %s detach channel '%s' to re-search\n", - conn ? conn->peerName.c_str() : "", - self->name.c_str()); + current ? current->peerName.c_str() : "", + name.c_str()); } else { // reconnect to specific server // TODO: holdoff to prevent fast reconnect loop - self->conn = Connection::build(context, self->forcedServer); + conn = Connection::build(context, forcedServer); - self->conn->pending.push_back(self); - self->state = Connecting; + conn->pending[cid] = self; + state = Connecting; - self->conn->createChannels(); + conn->createChannels(); } } @@ -309,7 +338,7 @@ std::shared_ptr Channel::build(const std::shared_ptr& cont chan->forcedServer = forceServer; chan->conn = Connection::build(context, forceServer); - chan->conn->pending.push_back(chan); + chan->conn->pending[chan->cid] = chan; chan->state = Connecting; chan->conn->createChannels(); @@ -350,15 +379,16 @@ void Context::hurryUp() }); } -void Context::cacheClear(const std::string& name) +void Context::cacheClear(const std::string& name, cacheAction action) { if(!pvt) throw std::logic_error("NULL Context"); - pvt->impl->tcp_loop.call([this, name](){ + pvt->impl->tcp_loop.call([this, name, action](){ // run twice to ensure both mark and sweep of all unused channels - pvt->impl->cacheClean(name); - pvt->impl->cacheClean(name); + log_debug_printf(setup, "cacheClear('%s')\n", name.c_str()); + pvt->impl->cacheClean(name, action); + pvt->impl->cacheClean(name, action); }); } @@ -668,7 +698,7 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist chan->conn = Connection::build(self.shared_from_this(), serv); - chan->conn->pending.push_back(chan); + chan->conn->pending[chan->cid] = chan; chan->state = Channel::Connecting; chan->conn->createChannels(); @@ -1010,37 +1040,46 @@ void ContextImpl::onNSCheckS(evutil_socket_t fd, short evt, void *raw) } } -void ContextImpl::cacheClean(const std::string& name) +void ContextImpl::cacheClean(const std::string& name, Context::cacheAction action) { - std::set trash; + auto next(chanByName.begin()), + end(chanByName.end()); - for(auto& pair : chanByName) { - if(!name.empty() && pair.first.first!=name) - continue; // skip + while(next!=end) { + auto cur(next++); - if(pair.second.use_count()<=1) { - if(!pair.second->garbage) { + if(!name.empty() && cur->first.first!=name) + continue; + + else if(action!=Context::Clean || cur->second.use_count()<=1) { + cur->second->garbage = true; + + if(action==Context::Clean && !cur->second->garbage) { // mark for next sweep - log_debug_printf(setup, "Chan GC mark '%s':'%s'\n", pair.first.first.c_str(), pair.first.second.c_str()); - pair.second->garbage = true; + log_debug_printf(setup, "Chan GC mark '%s':'%s'\n", + cur->first.first.c_str(), cur->first.second.c_str()); } else { - // sweep - trash.insert(pair.first); + log_debug_printf(setup, "Chan GC sweep '%s':'%s'\n", + cur->first.first.c_str(), cur->first.second.c_str()); + + auto trash(std::move(cur->second)); + + // explicitly break ref. loop of channel cache + chanByName.erase(cur); + + if(action==Context::Disconnect) { + trash->disconnect(trash); + } } } } - - // explicitly break ref. loop of channel cache - for(auto& key : trash) { - chanByName.erase(key); - log_debug_printf(setup, "Chan GC sweep '%s':'%s'\n", key.first.c_str(), key.second.c_str()); - } } void ContextImpl::cacheCleanS(evutil_socket_t fd, short evt, void *raw) { try { + static_cast(raw)->cacheClean(std::string(), Context::Clean); static_cast(raw)->tickBeaconClean(); }catch(std::exception& e){ log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what()); diff --git a/src/clientconn.cpp b/src/clientconn.cpp index ed4cec7..33f7b2c 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -59,9 +59,9 @@ void Connection::createChannels() auto todo = std::move(pending); - for(auto& wchan : todo) { - auto chan = wchan.lock(); - if(!chan) + for(auto& pair : todo) { + auto chan = pair.second.lock(); + if(!chan || chan->state!=Channel::Connecting) continue; { @@ -137,36 +137,26 @@ void Connection::cleanup() log_err_printf(io, "Server %s error stopping echoTimer\n", peerName.c_str()); // return Channels to Searching state - for(auto& wchan : pending) { - auto chan = wchan.lock(); - if(!chan) - continue; - - chan->disconnect(chan); + std::set> todo; + for(auto& pair : pending) { + if(auto chan = pair.second.lock()) + todo.insert(chan); } for(auto& pair : chanBySID) { - auto chan = pair.second.lock(); - if(!chan) - continue; - - chan->disconnect(chan); + if(auto chan = pair.second.lock()) + todo.insert(chan); } for(auto& pair : creatingByCID) { - auto chan = pair.second.lock(); - if(!chan) - continue; + if(auto chan = pair.second.lock()) + todo.insert(chan); + } + for(auto& chan : todo) { chan->disconnect(chan); } - auto ops = std::move(opByIOID); - for (auto& pair : ops) { - auto op = pair.second.handle.lock(); - if(!op) - continue; - op->chan->opByIOID.erase(op->ioid); - op->disconnected(op); - } + // Channel::disconnect() should clean + assert(opByIOID.empty()); // paranoia pending.clear(); @@ -361,8 +351,7 @@ void Connection::handle_CREATE_CHANNEL() auto conns(chan->connectors); // copy list for(auto& conn : conns) { - conn->_connected.store(true, std::memory_order_relaxed); - if(conn->_onConn) + if(!conn->_connected.exchange(true, std::memory_order_relaxed) && conn->_onConn) conn->_onConn(); } } @@ -370,20 +359,19 @@ void Connection::handle_CREATE_CHANNEL() void Connection::handle_DESTROY_CHANNEL() { - // (maybe) keep myself alive - std::shared_ptr self; - - EvInBuf M(peerBE, segBuf.get(), 16); - uint32_t cid=0, sid=0; - from_wire(M, sid); - from_wire(M, cid); + { + EvInBuf M(peerBE, segBuf.get(), 16); - if(!M.good()) { - log_crit_printf(io, "%s:%d Server %s sends invalid DESTROY_CHANNEL. Disconnecting...\n", - M.file(), M.line(), peerName.c_str()); - bev.reset(); - return; + from_wire(M, sid); + from_wire(M, cid); + + if(!M.good()) { + log_crit_printf(io, "%s:%d Server %s sends invalid DESTROY_CHANNEL. Disconnecting...\n", + M.file(), M.line(), peerName.c_str()); + bev.reset(); + return; + } } std::shared_ptr chan; @@ -397,17 +385,7 @@ void Connection::handle_DESTROY_CHANNEL() } chanBySID.erase(sid); - - chan->state = Channel::Searching; - chan->sid = 0xdeadbeef; // spoil - self = std::move(chan->conn); - context->searchBuckets[context->currentBucket].push_back(chan); - - for(auto& pair : chan->opByIOID) { - auto op = pair.second->handle.lock(); - opByIOID.erase(pair.first); // invalidates pair.second - op->disconnected(op); - } + chan->disconnect(chan); log_debug_printf(io, "Server %s destroys channel '%s' %u:%u\n", peerName.c_str(), chan->name.c_str(), unsigned(cid), unsigned(sid)); diff --git a/src/clientimpl.h b/src/clientimpl.h index 68c3f0d..ede2bda 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -80,8 +80,8 @@ struct Connection : public ConnBase, public std::enable_shared_from_this> pending; + // channels to be created on this Connection in state==Connecting + std::map> pending; std::map> creatingByCID, // in state==Creating chanBySID; // in state==Active @@ -241,7 +241,7 @@ struct ContextImpl : public std::enable_shared_from_this std::map> chanByCID; // strong ref. loop through Channel::context - // explicitly broken by Context::close(), Context::cacheClear, or ContextImpl::cacheClean() + // explicitly broken by Context::close(), Context::cacheClear(), or ContextImpl::cacheClean() // chanByName key'd by (pv, forceServer) std::map, std::shared_ptr> chanByName; @@ -280,7 +280,7 @@ struct ContextImpl : public std::enable_shared_from_this static void tickSearchS(evutil_socket_t fd, short evt, void *raw); void tickBeaconClean(); static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw); - void cacheClean(const std::string &name); + void cacheClean(const std::string &name, Context::cacheAction force); static void cacheCleanS(evutil_socket_t fd, short evt, void *raw); void onNSCheck(); static void onNSCheckS(evutil_socket_t fd, short evt, void *raw); diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 0448752..6d44a01 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -479,16 +479,23 @@ public: */ void hurryUp(); +#ifdef PVXS_EXPERT_API_ENABLED + //! Actions of cacheClear() + //! @since UNRELEASED + enum cacheAction { + Clean, //!< Remove channel(s) if unused. Optional for user code. + Drop, //!< Remove channel(s) unconditionally. Prevents reuse of open channel(s). + Disconnect, //!< Remove channels(s) unconditionally, and cancel any in-progress operations. + }; + /** Channel cache maintenance. * - * @param name If empty (default), remove all channels from cache. - * If not empty, only remove the named channel. + * @param action cf. cacheAction * - * @since UNRELEASED 'name' argument added. + * @since UNRELEASED 'name' and 'action' arguments. Defaults to previous behavior. */ - void cacheClear(const std::string& name = std::string()); + void cacheClear(const std::string& name = std::string(), cacheAction action = Clean); -#ifdef PVXS_EXPERT_API_ENABLED //! Compile report about peers and channels //! @since UNRELEASED Report report() const; diff --git a/test/testinfo.cpp b/test/testinfo.cpp index 8e5aa53..8c7e2da 100644 --- a/test/testinfo.cpp +++ b/test/testinfo.cpp @@ -4,6 +4,8 @@ * in file LICENSE that is included with this distribution. */ +#define PVXS_ENABLE_EXPERT_API + #include #include diff --git a/test/testrpc.cpp b/test/testrpc.cpp index e3e0386..7ce192c 100644 --- a/test/testrpc.cpp +++ b/test/testrpc.cpp @@ -4,6 +4,8 @@ * in file LICENSE that is included with this distribution. */ +#define PVXS_ENABLE_EXPERT_API + #include #include