From fedbec649b8d81eec7955a1602adebbe5a89ca9b Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 21 Feb 2023 11:57:25 -0800 Subject: [PATCH] server: rework cleanup of connection, channel, and operation --- src/serverchan.cpp | 71 ++++++++++++++++++---------------------------- src/serverconn.cpp | 70 ++++++++++++++++++++++++++++++++++++--------- src/serverconn.h | 3 ++ src/serverget.cpp | 16 +---------- src/servermon.cpp | 39 +++++++++++-------------- 5 files changed, 104 insertions(+), 95 deletions(-) diff --git a/src/serverchan.cpp b/src/serverchan.cpp index d1d867e..91b322f 100644 --- a/src/serverchan.cpp +++ b/src/serverchan.cpp @@ -36,6 +36,30 @@ ServerChan::~ServerChan() { assert(!onClose); } +/* reached from: + * 1. connection close + * 2. DESTROY_CHANNEL + * 3. local user calls ServerChannelControl::close() + */ +void ServerChan::cleanup() +{ + if(state==ServerChan::Destroy) + return; + state = ServerChan::Destroy; + + { + auto ops(std::move(opByIOID)); + for(auto& op : ops) { + // removes from conn->opByIOID + op.second->cleanup(); + } + } + + auto fn(std::move(onClose)); + if(fn) + fn(""); +} + ServerChannelControl::ServerChannelControl(const std::shared_ptr &conn, const std::shared_ptr& channel) :server::ChannelControl(channel->name, conn->cred, None) ,server(conn->iface->server->internal_self) @@ -104,45 +128,6 @@ void ServerChannelControl::onClose(std::function&& fn) }); } -static -void ServerChannel_shutdown(const std::shared_ptr& chan) -{ - if(chan->state==ServerChan::Destroy) - return; - - chan->state = ServerChan::Destroy; - - if(auto conn = chan->conn.lock()) { - - conn->chanBySID.erase(chan->sid); - - for(auto& pair : chan->opByIOID) { - auto op = pair.second; - if(op->state==ServerOp::Dead) - continue; - - if(op->state==ServerOp::Executing && op->onCancel) - op->onCancel(); - - op->state = ServerOp::Dead; - - if(op->onClose) { - auto fn(std::move(op->onClose)); - fn(""); - } - - conn->opByIOID.erase(op->ioid); - } - } - - chan->opByIOID.clear(); - - if(chan->onClose) { - auto fn(std::move(chan->onClose)); - fn(""); - } -} - void ServerChannelControl::close() { // fail soft if server stopped, or channel/connection already closed @@ -167,7 +152,8 @@ void ServerChannelControl::close() conn->statTx += 16u; ch->statTx += 16u; } - ServerChannel_shutdown(ch); + + ch->cleanup(); }); } @@ -407,10 +393,7 @@ void ServerConn::handle_DESTROY_CHANNEL() unsigned(sid), unsigned(chan->cid), unsigned(cid), chan->name.c_str()); } - ServerChannel_shutdown(chan); - - assert(chan.use_count()==1); // we only take transient refs on this thread - // ServerChannel is delete'd + chan->cleanup(); { auto tx = bufferevent_get_output(bev.get()); diff --git a/src/serverconn.cpp b/src/serverconn.cpp index 795f07d..8a1bf4d 100644 --- a/src/serverconn.cpp +++ b/src/serverconn.cpp @@ -280,10 +280,7 @@ void ServerConn::handle_DESTROY_REQUEST() if(it!=opByIOID.end()) { auto op = it->second; opByIOID.erase(it); - op->state = ServerOp::Dead; - - if(op->onClose) - op->onClose(""); + op->cleanup(); } } @@ -326,22 +323,25 @@ std::shared_ptr ServerConn::self_from_this() return shared_from_this(); } +// see also ServerChannel_shutdown() +/* reached from: + * 1. connection close + */ void ServerConn::cleanup() { log_debug_printf(connsetup, "Client %s Cleanup TCP Connection\n", peerName.c_str()); iface->server->connections.erase(this); - for(auto& pair : opByIOID) { - if(pair.second->onClose) - pair.second->onClose(""); + // grab maps before cleanup()s would modify + auto ops(std::move(opByIOID)); + auto chans(std::move(chanBySID)); + + for(auto& op : ops) { + op.second->cleanup(); } - for(auto& pair : chanBySID) { - pair.second->state = ServerChan::Destroy; - if(pair.second->onClose) { - auto fn(std::move(pair.second->onClose)); - fn(""); - } + for(auto& pair : chans) { + pair.second->cleanup(); } } @@ -449,4 +449,48 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s ServerOp::~ServerOp() {} +/* reached from: + * 1. connection close + * 2. DESTROY_CHANNEL + * 3. DESTROY_REQUEST + * 4. individual op DESTROY + * 5. local user calls ServerChannelControl::close() + */ +void ServerOp::cleanup() +{ + if(state==ServerOp::Dead) + return; + + if(state==ServerOp::Executing && onCancel) { + auto fn(std::move(onCancel)); + fn(); + } + + state = ServerOp::Dead; + + decltype (onClose) closer; + if(onClose) { + closer = std::move(onClose); + } + bool notify = closer.operator bool(); + + if(auto ch = chan.lock()) { + ch->opByIOID.erase(ioid); + + if(auto conn = ch->conn.lock()) { + conn->opByIOID.erase(ioid); + + if(notify) { + conn->iface->server->acceptor_loop.dispatch([closer](){ + closer(""); + }); + notify = false; + } + } + } + + if(notify) + closer(""); +} + }} // namespace pvxs::impl diff --git a/src/serverconn.h b/src/serverconn.h index 0af6f0f..747be17 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -51,6 +51,7 @@ struct ServerOp ServerOp& operator=(const ServerOp&) = delete; virtual ~ServerOp() =0; + void cleanup(); virtual void show(std::ostream& strm) const =0; }; @@ -103,6 +104,8 @@ struct ServerChan ServerChan(const ServerChan&) = delete; ServerChan& operator=(const ServerChan&) = delete; ~ServerChan(); + + void cleanup(); }; struct ServerConn : public ConnBase, public std::enable_shared_from_this diff --git a/src/serverget.cpp b/src/serverget.cpp index f8f1e9e..ab63bdf 100644 --- a/src/serverget.cpp +++ b/src/serverget.cpp @@ -119,21 +119,7 @@ struct ServerGPR : public ServerOp ch->statTx += conn->enqueueTxBody(cmd); if(state == ServerOp::Dead) { - ch->opByIOID.erase(ioid); - auto it = conn->opByIOID.find(ioid); - if(it!=conn->opByIOID.end()) { - auto self(it->second); - conn->opByIOID.erase(it); - - if(self->onClose) - conn->iface->server->acceptor_loop.dispatch([self](){ - self->onClose(""); - }); - - } else { - assert(false); // really shouldn't happen - } - conn->opByIOID.erase(ioid); + cleanup(); } } diff --git a/src/servermon.cpp b/src/servermon.cpp index 3609a15..32af592 100644 --- a/src/servermon.cpp +++ b/src/servermon.cpp @@ -29,7 +29,17 @@ struct MonitorOp : public ServerOp, { MonitorOp(const std::shared_ptr& chan, uint32_t ioid) :ServerOp(chan, ioid) - {} + { + // ServerOp::onCancel isn't exposed to users for MONITOR + // so we can (ab)use for internal cleanup. + onCancel = [this]() { + if(state == Executing) { + if(onStart) + onStart(false); + state = Idle; + } + }; + } virtual ~MonitorOp() {} // only access from accepter worker thread @@ -153,21 +163,7 @@ struct MonitorOp : public ServerOp, ch->statTx += conn->enqueueTxBody(pva_app_msg_t::CMD_MONITOR); if(state == ServerOp::Dead) { - ch->opByIOID.erase(ioid); - auto it = conn->opByIOID.find(ioid); - if(it!=conn->opByIOID.end()) { - auto self(it->second); - conn->opByIOID.erase(it); - - if(self->onClose) - conn->iface->server->acceptor_loop.dispatch([self](){ - self->onClose(""); - }); - - } else { - assert(false); // really shouldn't happen - } - conn->opByIOID.erase(ioid); + cleanup(); return; } @@ -581,15 +577,12 @@ void ServerConn::handle_MONITOR() auto self(it->second); opByIOID.erase(it); - if(self->onClose) { - iface->server->acceptor_loop.dispatch([self](){ - if(self->onClose) - self->onClose(""); - }); - } + iface->server->acceptor_loop.dispatch([self](){ + self->cleanup(); + }); } else { - assert(false); // really shouldn't happen + log_exc_printf(connsetup, "Logic error in %s w/ 0x%x\n", __func__, subcmd); } opByIOID.erase(ioid); }