From e668038250dc2aedaf2fd23cd6d6cd762265fea8 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 1 Mar 2020 19:57:13 -0800 Subject: [PATCH] client track opByIOID per channel needed to handle CMD_DESTROY_CHANNEL --- src/client.cpp | 8 +++++--- src/clientconn.cpp | 7 ++++++- src/clientget.cpp | 2 ++ src/clientimpl.h | 4 ++++ src/clientintrospect.cpp | 6 ++++-- 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index b0f0cec..4b22858 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -83,9 +83,11 @@ void Channel::createOperations() } while(conn->opByIOID.find(ioid)!=conn->opByIOID.end()); //conn->opByIOID.insert(std::make_pair(ioid, RequestInfo(sid, ioid, op))); - conn->opByIOID.emplace(std::piecewise_construct, - std::forward_as_tuple(ioid), - std::forward_as_tuple(sid, ioid, op)); + auto pair = conn->opByIOID.emplace(std::piecewise_construct, + std::forward_as_tuple(ioid), + std::forward_as_tuple(sid, ioid, op)); + opByIOID[ioid] = &pair.first->second; + op->ioid = ioid; op->createOp(); diff --git a/src/clientconn.cpp b/src/clientconn.cpp index 9d7ccce..8ad224a 100644 --- a/src/clientconn.cpp +++ b/src/clientconn.cpp @@ -141,6 +141,7 @@ void Connection::cleanup() auto op = pair.second.handle.lock(); if(!op) continue; + op->chan->opByIOID.erase(op->ioid); op->disconnected(op); } @@ -360,7 +361,11 @@ void Connection::handle_DESTROY_CHANNEL() self = std::move(chan->conn); context->searchBuckets[context->currentBucket].push_back(chan); - // TODO: disconnect Operations + for(auto& pair : chan->opByIOID) { + auto op = pair.second->handle.lock(); + opByIOID.erase(pair.first); // invalidates pair.second + op->disconnected(op); + } log_debug_printf(io, "Server %s destroys channel '%s' %u:%u\n", peerName.c_str(), chan->name.c_str(), unsigned(cid), unsigned(sid)); diff --git a/src/clientget.cpp b/src/clientget.cpp index ed153ff..ec3f089 100644 --- a/src/clientget.cpp +++ b/src/clientget.cpp @@ -67,6 +67,7 @@ struct GPROp : public OperationBase // This opens up a race with an in-flight reply. chan->conn->opByIOID.erase(ioid); + chan->opByIOID.erase(ioid); } state = Done; chan.reset(); @@ -328,6 +329,7 @@ void Connection::handle_GPR(pva_app_msg_t cmd) // so we can ~safely forget about it. // we might get CMD_MESSAGE, but these could be ignored with no ill effects. opByIOID.erase(ioid); + gpr->chan->opByIOID.erase(ioid); gpr->notify(); } diff --git a/src/clientimpl.h b/src/clientimpl.h index 1331ab5..6bb0e1a 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -59,6 +59,7 @@ struct Connection : public ConnBase, public std::enable_shared_from_this> creatingByCID, chanBySID; + // entries always have matching entry in a Channel::opByIOID std::map opByIOID; uint32_t nextIOID = 0u; @@ -122,6 +123,9 @@ struct Channel { std::list> pending; + // points to storage of Connection::opByIOID + std::map opByIOID; + Channel(const std::shared_ptr& context, const std::string& name, uint32_t cid); ~Channel(); diff --git a/src/clientintrospect.cpp b/src/clientintrospect.cpp index 79a2f59..5ad9f37 100644 --- a/src/clientintrospect.cpp +++ b/src/clientintrospect.cpp @@ -46,6 +46,7 @@ struct InfoOp : public OperationBase // This opens up a race with an in-flight reply. chan->conn->opByIOID.erase(ioid); + chan->opByIOID.erase(ioid); } state = Done; chan.reset(); @@ -113,6 +114,7 @@ void Connection::handle_GET_FIELD() } std::shared_ptr op; + InfoOp* info; { auto it = opByIOID.find(ioid); if(it==opByIOID.end() @@ -121,11 +123,11 @@ void Connection::handle_GET_FIELD() log_warn_printf(io, "Server %s sends stale GET_FIELD\n", peerName.c_str()); return; } + info = static_cast(op.get()); opByIOID.erase(it); + info->chan->opByIOID.erase(ioid); } - auto info = static_cast(op.get()); - if(info->state!=InfoOp::Waiting) { log_warn_printf(io, "Server %s ignore second reply to GET_FIELD\n", peerName.c_str()); return;