From 9ca7487e29bf912fc18a3c41618d9014a830bcb0 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 7 Oct 2018 20:00:38 -0700 Subject: [PATCH] SharedPV allow use of Channel::destroy() to trigger onLastDisconnect() Add dead flag to catch double destroy() or use after destroy() --- src/server/sharedstate_channel.cpp | 123 +++++++++++++++++++---------- src/server/sharedstate_put.cpp | 54 ++++++++----- src/server/sharedstate_pv.cpp | 5 +- src/server/sharedstate_rpc.cpp | 25 ++++-- src/server/sharedstateimpl.h | 2 + 5 files changed, 140 insertions(+), 69 deletions(-) diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index 59e1809..e837331 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -35,6 +35,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, ,channelName(channelName) ,requester(requester) ,provider(provider) + ,dead(false) { REFTRACE_INCREMENT(num_instances); @@ -52,10 +53,20 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, } SharedChannel::~SharedChannel() +{ + destroy(); + REFTRACE_DECREMENT(num_instances); +} + +void SharedChannel::destroy() { std::tr1::shared_ptr handler; { Guard G(owner->mutex); + + if(dead) return; + dead = true; + bool wasempty = owner->channels.empty(); owner->channels.remove(this); if(!wasempty && owner->channels.empty() && owner->notifiedConn) { @@ -74,12 +85,8 @@ SharedChannel::~SharedChannel() channelName.c_str(), this); } - - REFTRACE_DECREMENT(num_instances); } -void SharedChannel::destroy() {} - std::tr1::shared_ptr SharedChannel::getProvider() { return provider.lock(); @@ -102,22 +109,28 @@ std::tr1::shared_ptr SharedChannel::getChannelRequester() void SharedChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField) { - epics::pvData::FieldConstPtr desc; + pvd::FieldConstPtr desc; + pvd::Status sts; SharedPV::Handler::shared_pointer handler; { Guard G(owner->mutex); - if(owner->type) { - desc = owner->type; - } + if(dead) { + sts = pvd::Status::error("Dead Channel"); - if(!owner->channels.empty() && !owner->notifiedConn) { - handler = owner->handler; - owner->notifiedConn = true; + } else { + if(owner->type) { + desc = owner->type; + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } + owner->getfields.push_back(requester); } - owner->getfields.push_back(requester); } - if(desc) { - requester->getDone(pvd::Status(), desc); + if(desc || !sts.isOK()) { + requester->getDone(sts, desc); } if(handler) { handler->onFirstConnect(owner); @@ -131,28 +144,34 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( std::tr1::shared_ptr ret(new SharedPut(shared_from_this(), requester, pvRequest)); pvd::StructureConstPtr type; + pvd::Status sts; std::string warning; SharedPV::Handler::shared_pointer handler; try { { Guard G(owner->mutex); - // ~SharedPut removes - owner->puts.push_back(ret.get()); - if(owner->current) { - ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode); - type = ret->mapper.requested(); - warning = ret->mapper.warnings(); - } + if(dead) { + sts = pvd::Status::error("Dead Channel"); - if(!owner->channels.empty() && !owner->notifiedConn) { - handler = owner->handler; - owner->notifiedConn = true; + } else { + // ~SharedPut removes + owner->puts.push_back(ret.get()); + if(owner->current) { + ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode); + type = ret->mapper.requested(); + warning = ret->mapper.warnings(); + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } } } if(!warning.empty()) requester->message(warning, pvd::warningMessage); - if(type) - requester->channelPutConnect(pvd::Status(), ret, type); + if(type || !sts.isOK()) + requester->channelPutConnect(sts, ret, type); }catch(std::runtime_error& e){ ret.reset(); type.reset(); @@ -170,11 +189,18 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC( { std::tr1::shared_ptr ret(new SharedRPC(shared_from_this(), requester, pvRequest)); ret->connected = true; - requester->channelRPCConnect(pvd::Status(), ret); + + pvd::Status sts; { Guard G(owner->mutex); - owner->rpcs.push_back(ret.get()); + if(dead) { + sts = pvd::Status::error("Dead Channel"); + + } else { + owner->rpcs.push_back(ret.get()); + } } + requester->channelRPCConnect(sts, ret); return ret; } @@ -186,27 +212,42 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor( SharedPV::Handler::shared_pointer handler; mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates; mconf.mapperMode = owner->config.mapperMode; + std::tr1::shared_ptr ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf)); + bool notify; + pvd::Status sts; { Guard G(owner->mutex); - owner->monitors.push_back(ret.get()); - notify = !!owner->type; - if(notify) { - ret->open(owner->type); - // post initial update - ret->post(*owner->current, owner->valid); - } + if(dead) { + sts = pvd::Status::error("Dead Channel"); + notify = false; - if(!owner->channels.empty() && !owner->notifiedConn) { - handler = owner->handler; - owner->notifiedConn = true; + } else { + owner->monitors.push_back(ret.get()); + notify = !!owner->type; + if(notify) { + ret->open(owner->type); + // post initial update + ret->post(*owner->current, owner->valid); + } + + if(!owner->channels.empty() && !owner->notifiedConn) { + handler = owner->handler; + owner->notifiedConn = true; + } } } - if(notify) + if(!sts.isOK()) { + requester->monitorConnect(sts, pvd::MonitorPtr(), pvd::StructureConstPtr()); + ret.reset(); + + } else if(notify) { ret->notify(); - if(handler) { - handler->onFirstConnect(owner); + + if(handler) { + handler->onFirstConnect(owner); + } } return ret; } diff --git a/src/server/sharedstate_put.cpp b/src/server/sharedstate_put.cpp index f61f3b3..3afa2c6 100644 --- a/src/server/sharedstate_put.cpp +++ b/src/server/sharedstate_put.cpp @@ -107,43 +107,55 @@ void SharedPut::put( std::tr1::shared_ptr handler; pvd::PVStructure::shared_pointer realval; pvd::BitSet changed; + pvd::Status sts; { Guard G(channel->owner->mutex); - if(pvPutStructure->getStructure()!=mapper.requested()) { + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else if(pvPutStructure->getStructure()!=mapper.requested()) { requester_type::shared_pointer req(requester.lock()); - if(req) - req->putDone(pvd::Status::error("Type changed"), shared_from_this()); - return; + sts = pvd::Status::error("Type changed"); + + } else { + + handler = channel->owner->handler; + + realval = mapper.buildBase(); + + mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet); } - - handler = channel->owner->handler; - - realval = mapper.buildBase(); - - mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet); } - std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, realval, changed), - Operation::Impl::Cleanup()); + if(!sts.isOK()) { + requester_type::shared_pointer req(requester.lock()); + if(req) + req->putDone(sts, pva::ChannelPut::shared_pointer()); - if(handler) { - Operation op(impl); - handler->onPut(channel->owner, op); + } else { + std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, realval, changed), + Operation::Impl::Cleanup()); + + if(handler) { + Operation op(impl); + handler->onPut(channel->owner, op); + } } } void SharedPut::get() { - pvd::Status sts; pvd::PVStructurePtr current; pvd::BitSetPtr changed; - bool emptyselect = false; { Guard G(channel->owner->mutex); - if(channel->owner->current) { + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else if(channel->owner->current) { assert(!!mapper.requested()); current = mapper.buildRequested(); @@ -157,10 +169,10 @@ void SharedPut::get() requester_type::shared_pointer req(requester.lock()); if(!req) return; - if(!current) { + if(!sts.isOK()) { + // no-op + } else if(!current) { sts = pvd::Status::error("Get not possible, cache disabled"); - } else if(emptyselect) { - sts = pvd::Status::warn("pvRequest with empty field mask"); } req->getDone(sts, shared_from_this(), current, changed); diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp index 6ccebc0..b10edb1 100644 --- a/src/server/sharedstate_pv.cpp +++ b/src/server/sharedstate_pv.cpp @@ -152,6 +152,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& this->valid = valid; FOR_EACH(puts_t::const_iterator, it, end, puts) { + if((*it)->channel->dead) continue; try { try { (*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode); @@ -165,12 +166,13 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& } } FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { - if((*it)->connected) continue; + if((*it)->connected || (*it)->channel->dead) continue; try { p_rpc.push_back((*it)->shared_from_this()); }catch(std::tr1::bad_weak_ptr&) {} } FOR_EACH(monitors_t::const_iterator, it, end, monitors) { + if((*it)->channel->dead) continue; try { (*it)->open(newtype); // post initial update @@ -180,6 +182,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& } // consume getField FOR_EACH(getfields_t::iterator, it, end, getfields) { + // TODO: this may be on a dead Channel p_getfield.push_back(it->lock()); } getfields.clear(); // consume diff --git a/src/server/sharedstate_rpc.cpp b/src/server/sharedstate_rpc.cpp index 336d083..e0dd540 100644 --- a/src/server/sharedstate_rpc.cpp +++ b/src/server/sharedstate_rpc.cpp @@ -109,17 +109,30 @@ void SharedRPC::lastRequest() {} void SharedRPC::request(epics::pvData::PVStructure::shared_pointer const & pvArgument) { std::tr1::shared_ptr handler; + pvd::Status sts; { Guard G(channel->owner->mutex); - handler = channel->owner->handler; + if(channel->dead) { + sts = pvd::Status::error("Dead Channel"); + + } else { + handler = channel->owner->handler; + } } - std::tr1::shared_ptr impl(new RPCOP(shared_from_this(), pvRequest, pvArgument), - Operation::Impl::Cleanup()); + if(!sts.isOK()) { + requester_type::shared_pointer req(requester.lock()); + if(req) + req->requestDone(sts, shared_from_this(), pvd::PVStructurePtr()); - if(handler) { - Operation op(impl); - handler->onRPC(channel->owner, op); + } else { + std::tr1::shared_ptr impl(new RPCOP(shared_from_this(), pvRequest, pvArgument), + Operation::Impl::Cleanup()); + + if(handler) { + Operation op(impl); + handler->onRPC(channel->owner, op); + } } } diff --git a/src/server/sharedstateimpl.h b/src/server/sharedstateimpl.h index 8fbf362..c1a72ef 100644 --- a/src/server/sharedstateimpl.h +++ b/src/server/sharedstateimpl.h @@ -31,6 +31,8 @@ struct SharedChannel : public pva::Channel, const requester_type::weak_pointer requester; const pva::ChannelProvider::weak_pointer provider; + bool dead; // has destroy() been called? + SharedChannel(const std::tr1::shared_ptr& owner, const pva::ChannelProvider::shared_pointer provider, const std::string& channelName,