server: rework cleanup of connection, channel, and operation

This commit is contained in:
Michael Davidsaver
2023-02-21 11:57:25 -08:00
parent 9c546f0ed4
commit fedbec649b
5 changed files with 104 additions and 95 deletions
+27 -44
View File
@@ -36,6 +36,30 @@ ServerChan::~ServerChan() {
assert(!onClose);
}
/* reached from:
* 1. connection close
* 2. DESTROY_CHANNEL
* 3. local user calls ServerChannelControl::close()
*/
void ServerChan::cleanup()
{
if(state==ServerChan::Destroy)
return;
state = ServerChan::Destroy;
{
auto ops(std::move(opByIOID));
for(auto& op : ops) {
// removes from conn->opByIOID
op.second->cleanup();
}
}
auto fn(std::move(onClose));
if(fn)
fn("");
}
ServerChannelControl::ServerChannelControl(const std::shared_ptr<ServerConn> &conn, const std::shared_ptr<ServerChan>& channel)
:server::ChannelControl(channel->name, conn->cred, None)
,server(conn->iface->server->internal_self)
@@ -104,45 +128,6 @@ void ServerChannelControl::onClose(std::function<void(const std::string&)>&& fn)
});
}
static
void ServerChannel_shutdown(const std::shared_ptr<ServerChan>& chan)
{
if(chan->state==ServerChan::Destroy)
return;
chan->state = ServerChan::Destroy;
if(auto conn = chan->conn.lock()) {
conn->chanBySID.erase(chan->sid);
for(auto& pair : chan->opByIOID) {
auto op = pair.second;
if(op->state==ServerOp::Dead)
continue;
if(op->state==ServerOp::Executing && op->onCancel)
op->onCancel();
op->state = ServerOp::Dead;
if(op->onClose) {
auto fn(std::move(op->onClose));
fn("");
}
conn->opByIOID.erase(op->ioid);
}
}
chan->opByIOID.clear();
if(chan->onClose) {
auto fn(std::move(chan->onClose));
fn("");
}
}
void ServerChannelControl::close()
{
// fail soft if server stopped, or channel/connection already closed
@@ -167,7 +152,8 @@ void ServerChannelControl::close()
conn->statTx += 16u;
ch->statTx += 16u;
}
ServerChannel_shutdown(ch);
ch->cleanup();
});
}
@@ -407,10 +393,7 @@ void ServerConn::handle_DESTROY_CHANNEL()
unsigned(sid), unsigned(chan->cid), unsigned(cid), chan->name.c_str());
}
ServerChannel_shutdown(chan);
assert(chan.use_count()==1); // we only take transient refs on this thread
// ServerChannel is delete'd
chan->cleanup();
{
auto tx = bufferevent_get_output(bev.get());
+57 -13
View File
@@ -280,10 +280,7 @@ void ServerConn::handle_DESTROY_REQUEST()
if(it!=opByIOID.end()) {
auto op = it->second;
opByIOID.erase(it);
op->state = ServerOp::Dead;
if(op->onClose)
op->onClose("");
op->cleanup();
}
}
@@ -326,22 +323,25 @@ std::shared_ptr<ConnBase> ServerConn::self_from_this()
return shared_from_this();
}
// see also ServerChannel_shutdown()
/* reached from:
* 1. connection close
*/
void ServerConn::cleanup()
{
log_debug_printf(connsetup, "Client %s Cleanup TCP Connection\n", peerName.c_str());
iface->server->connections.erase(this);
for(auto& pair : opByIOID) {
if(pair.second->onClose)
pair.second->onClose("");
// grab maps before cleanup()s would modify
auto ops(std::move(opByIOID));
auto chans(std::move(chanBySID));
for(auto& op : ops) {
op.second->cleanup();
}
for(auto& pair : chanBySID) {
pair.second->state = ServerChan::Destroy;
if(pair.second->onClose) {
auto fn(std::move(pair.second->onClose));
fn("");
}
for(auto& pair : chans) {
pair.second->cleanup();
}
}
@@ -449,4 +449,48 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s
ServerOp::~ServerOp() {}
/* reached from:
* 1. connection close
* 2. DESTROY_CHANNEL
* 3. DESTROY_REQUEST
* 4. individual op DESTROY
* 5. local user calls ServerChannelControl::close()
*/
void ServerOp::cleanup()
{
if(state==ServerOp::Dead)
return;
if(state==ServerOp::Executing && onCancel) {
auto fn(std::move(onCancel));
fn();
}
state = ServerOp::Dead;
decltype (onClose) closer;
if(onClose) {
closer = std::move(onClose);
}
bool notify = closer.operator bool();
if(auto ch = chan.lock()) {
ch->opByIOID.erase(ioid);
if(auto conn = ch->conn.lock()) {
conn->opByIOID.erase(ioid);
if(notify) {
conn->iface->server->acceptor_loop.dispatch([closer](){
closer("");
});
notify = false;
}
}
}
if(notify)
closer("");
}
}} // namespace pvxs::impl
+3
View File
@@ -51,6 +51,7 @@ struct ServerOp
ServerOp& operator=(const ServerOp&) = delete;
virtual ~ServerOp() =0;
void cleanup();
virtual void show(std::ostream& strm) const =0;
};
@@ -103,6 +104,8 @@ struct ServerChan
ServerChan(const ServerChan&) = delete;
ServerChan& operator=(const ServerChan&) = delete;
~ServerChan();
void cleanup();
};
struct ServerConn : public ConnBase, public std::enable_shared_from_this<ServerConn>
+1 -15
View File
@@ -119,21 +119,7 @@ struct ServerGPR : public ServerOp
ch->statTx += conn->enqueueTxBody(cmd);
if(state == ServerOp::Dead) {
ch->opByIOID.erase(ioid);
auto it = conn->opByIOID.find(ioid);
if(it!=conn->opByIOID.end()) {
auto self(it->second);
conn->opByIOID.erase(it);
if(self->onClose)
conn->iface->server->acceptor_loop.dispatch([self](){
self->onClose("");
});
} else {
assert(false); // really shouldn't happen
}
conn->opByIOID.erase(ioid);
cleanup();
}
}
+16 -23
View File
@@ -29,7 +29,17 @@ struct MonitorOp : public ServerOp,
{
MonitorOp(const std::shared_ptr<ServerChan>& chan, uint32_t ioid)
:ServerOp(chan, ioid)
{}
{
// ServerOp::onCancel isn't exposed to users for MONITOR
// so we can (ab)use for internal cleanup.
onCancel = [this]() {
if(state == Executing) {
if(onStart)
onStart(false);
state = Idle;
}
};
}
virtual ~MonitorOp() {}
// only access from accepter worker thread
@@ -153,21 +163,7 @@ struct MonitorOp : public ServerOp,
ch->statTx += conn->enqueueTxBody(pva_app_msg_t::CMD_MONITOR);
if(state == ServerOp::Dead) {
ch->opByIOID.erase(ioid);
auto it = conn->opByIOID.find(ioid);
if(it!=conn->opByIOID.end()) {
auto self(it->second);
conn->opByIOID.erase(it);
if(self->onClose)
conn->iface->server->acceptor_loop.dispatch([self](){
self->onClose("");
});
} else {
assert(false); // really shouldn't happen
}
conn->opByIOID.erase(ioid);
cleanup();
return;
}
@@ -581,15 +577,12 @@ void ServerConn::handle_MONITOR()
auto self(it->second);
opByIOID.erase(it);
if(self->onClose) {
iface->server->acceptor_loop.dispatch([self](){
if(self->onClose)
self->onClose("");
});
}
iface->server->acceptor_loop.dispatch([self](){
self->cleanup();
});
} else {
assert(false); // really shouldn't happen
log_exc_printf(connsetup, "Logic error in %s w/ 0x%x\n", __func__, subcmd);
}
opByIOID.erase(ioid);
}