client track opByIOID per channel
needed to handle CMD_DESTROY_CHANNEL
This commit is contained in:
+5
-3
@@ -83,9 +83,11 @@ void Channel::createOperations()
|
||||
} while(conn->opByIOID.find(ioid)!=conn->opByIOID.end());
|
||||
|
||||
//conn->opByIOID.insert(std::make_pair(ioid, RequestInfo(sid, ioid, op)));
|
||||
conn->opByIOID.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(ioid),
|
||||
std::forward_as_tuple(sid, ioid, op));
|
||||
auto pair = conn->opByIOID.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(ioid),
|
||||
std::forward_as_tuple(sid, ioid, op));
|
||||
opByIOID[ioid] = &pair.first->second;
|
||||
|
||||
op->ioid = ioid;
|
||||
|
||||
op->createOp();
|
||||
|
||||
+6
-1
@@ -141,6 +141,7 @@ void Connection::cleanup()
|
||||
auto op = pair.second.handle.lock();
|
||||
if(!op)
|
||||
continue;
|
||||
op->chan->opByIOID.erase(op->ioid);
|
||||
op->disconnected(op);
|
||||
}
|
||||
|
||||
@@ -360,7 +361,11 @@ void Connection::handle_DESTROY_CHANNEL()
|
||||
self = std::move(chan->conn);
|
||||
context->searchBuckets[context->currentBucket].push_back(chan);
|
||||
|
||||
// TODO: disconnect Operations
|
||||
for(auto& pair : chan->opByIOID) {
|
||||
auto op = pair.second->handle.lock();
|
||||
opByIOID.erase(pair.first); // invalidates pair.second
|
||||
op->disconnected(op);
|
||||
}
|
||||
|
||||
log_debug_printf(io, "Server %s destroys channel '%s' %u:%u\n",
|
||||
peerName.c_str(), chan->name.c_str(), unsigned(cid), unsigned(sid));
|
||||
|
||||
@@ -67,6 +67,7 @@ struct GPROp : public OperationBase
|
||||
|
||||
// This opens up a race with an in-flight reply.
|
||||
chan->conn->opByIOID.erase(ioid);
|
||||
chan->opByIOID.erase(ioid);
|
||||
}
|
||||
state = Done;
|
||||
chan.reset();
|
||||
@@ -328,6 +329,7 @@ void Connection::handle_GPR(pva_app_msg_t cmd)
|
||||
// so we can ~safely forget about it.
|
||||
// we might get CMD_MESSAGE, but these could be ignored with no ill effects.
|
||||
opByIOID.erase(ioid);
|
||||
gpr->chan->opByIOID.erase(ioid);
|
||||
|
||||
gpr->notify();
|
||||
}
|
||||
|
||||
@@ -59,6 +59,7 @@ struct Connection : public ConnBase, public std::enable_shared_from_this<Connect
|
||||
std::map<uint32_t, std::weak_ptr<Channel>> creatingByCID,
|
||||
chanBySID;
|
||||
|
||||
// entries always have matching entry in a Channel::opByIOID
|
||||
std::map<uint32_t, RequestInfo> opByIOID;
|
||||
|
||||
uint32_t nextIOID = 0u;
|
||||
@@ -122,6 +123,9 @@ struct Channel {
|
||||
|
||||
std::list<std::weak_ptr<OperationBase>> pending;
|
||||
|
||||
// points to storage of Connection::opByIOID
|
||||
std::map<uint32_t, RequestInfo*> opByIOID;
|
||||
|
||||
Channel(const std::shared_ptr<Context::Pvt>& context, const std::string& name, uint32_t cid);
|
||||
~Channel();
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@ struct InfoOp : public OperationBase
|
||||
|
||||
// This opens up a race with an in-flight reply.
|
||||
chan->conn->opByIOID.erase(ioid);
|
||||
chan->opByIOID.erase(ioid);
|
||||
}
|
||||
state = Done;
|
||||
chan.reset();
|
||||
@@ -113,6 +114,7 @@ void Connection::handle_GET_FIELD()
|
||||
}
|
||||
|
||||
std::shared_ptr<Operation> op;
|
||||
InfoOp* info;
|
||||
{
|
||||
auto it = opByIOID.find(ioid);
|
||||
if(it==opByIOID.end()
|
||||
@@ -121,11 +123,11 @@ void Connection::handle_GET_FIELD()
|
||||
log_warn_printf(io, "Server %s sends stale GET_FIELD\n", peerName.c_str());
|
||||
return;
|
||||
}
|
||||
info = static_cast<InfoOp*>(op.get());
|
||||
opByIOID.erase(it);
|
||||
info->chan->opByIOID.erase(ioid);
|
||||
}
|
||||
|
||||
auto info = static_cast<InfoOp*>(op.get());
|
||||
|
||||
if(info->state!=InfoOp::Waiting) {
|
||||
log_warn_printf(io, "Server %s ignore second reply to GET_FIELD\n", peerName.c_str());
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user