fixup client operation object lifetime

This commit is contained in:
Michael Davidsaver
2020-04-10 11:10:39 -07:00
parent bcea4f032a
commit b0eecb949f
5 changed files with 127 additions and 72 deletions
+41 -29
View File
@@ -72,6 +72,7 @@ struct SubscriptionImpl : public OperationBase, public Subscription
,ackTick(event_new(chan->context->tcp_loop.base, -1, EV_TIMEOUT, &tickAckS, this))
{}
virtual ~SubscriptionImpl() {
chan->context->tcp_loop.assertInLoop();
_cancel(true);
}
@@ -92,8 +93,6 @@ struct SubscriptionImpl : public OperationBase, public Subscription
virtual void pause(bool p) override final
{
if(!chan)
return;
chan->context->tcp_loop.call([this, p](){
log_info_printf(io, "Server %s channel %s monitor %s\n",
chan->conn ? chan->conn->peerName.c_str() : "<disconnected>",
@@ -165,39 +164,38 @@ struct SubscriptionImpl : public OperationBase, public Subscription
}
virtual void cancel() override final {
_cancel(false);
}
void _cancel(bool implicit) {
auto context = chan->context;
decltype (event) junk;
context->tcp_loop.call([this, &junk, implicit](){
if(implicit && state!=Done) {
log_info_printf(io, "Server %s channel %s monitor implied cancel\n",
chan->conn ? chan->conn->peerName.c_str() : "<disconnected>",
chan->name.c_str());
}
log_info_printf(io, "Server %s channel %s monitor cancel\n",
chan->conn ? chan->conn->peerName.c_str() : "<disconnected>",
chan->name.c_str());
if(state==Idle || state==Running) {
chan->conn->sendDestroyRequest(chan->sid, ioid);
// This opens up a race with an in-flight reply.
chan->conn->opByIOID.erase(ioid);
chan->opByIOID.erase(ioid);
if(pipeline)
(void)event_del(ackTick.get());
}
state = Done;
chan.reset();
context->tcp_loop.call([this, &junk](){
_cancel(false);
junk = std::move(event);
// leave opByIOID for GC
});
}
void _cancel(bool implicit) {
if(implicit && state!=Done) {
log_info_printf(io, "Server %s channel %s monitor implied cancel\n",
chan->conn ? chan->conn->peerName.c_str() : "<disconnected>",
chan->name.c_str());
}
log_info_printf(io, "Server %s channel %s monitor cancel\n",
chan->conn ? chan->conn->peerName.c_str() : "<disconnected>",
chan->name.c_str());
if(state==Idle || state==Running) {
chan->conn->sendDestroyRequest(chan->sid, ioid);
// This opens up a race with an in-flight reply.
chan->conn->opByIOID.erase(ioid);
chan->opByIOID.erase(ioid);
if(pipeline)
(void)event_del(ackTick.get());
}
state = Done;
}
virtual void createOp() override final
{
if(state!=Connecting) {
@@ -559,7 +557,21 @@ std::shared_ptr<Subscription> MonitorBuilder::exec()
chan->pending.push_back(op);
chan->createOperations();
ret = std::move(op);
ret.reset(op.get(), [op](Subscription*) mutable {
// on user thread
auto temp(std::move(op));
temp->chan->context->tcp_loop.call([&temp]() {
// on worker
try {
temp->_cancel(true);
}catch(std::exception& e){
log_exc_printf(monevt, "Channel %s error in monitor cancel(): %s",
temp->channelName.c_str(), e.what());
}
// ensure dtor on worker
temp.reset();
});
});
});
return ret;