SharedPV allow use of Channel::destroy() to trigger onLastDisconnect()

Add dead flag to catch double destroy() or use after destroy()
This commit is contained in:
Michael Davidsaver
2018-10-07 20:00:38 -07:00
parent ed1bd1b962
commit 9ca7487e29
5 changed files with 140 additions and 69 deletions

View File

@@ -35,6 +35,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr<SharedPV> &owner,
,channelName(channelName)
,requester(requester)
,provider(provider)
,dead(false)
{
REFTRACE_INCREMENT(num_instances);
@@ -52,10 +53,20 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr<SharedPV> &owner,
}
SharedChannel::~SharedChannel()
{
destroy();
REFTRACE_DECREMENT(num_instances);
}
void SharedChannel::destroy()
{
std::tr1::shared_ptr<SharedPV::Handler> handler;
{
Guard G(owner->mutex);
if(dead) return;
dead = true;
bool wasempty = owner->channels.empty();
owner->channels.remove(this);
if(!wasempty && owner->channels.empty() && owner->notifiedConn) {
@@ -74,12 +85,8 @@ SharedChannel::~SharedChannel()
channelName.c_str(),
this);
}
REFTRACE_DECREMENT(num_instances);
}
void SharedChannel::destroy() {}
std::tr1::shared_ptr<pva::ChannelProvider> SharedChannel::getProvider()
{
return provider.lock();
@@ -102,22 +109,28 @@ std::tr1::shared_ptr<pva::ChannelRequester> SharedChannel::getChannelRequester()
void SharedChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField)
{
epics::pvData::FieldConstPtr desc;
pvd::FieldConstPtr desc;
pvd::Status sts;
SharedPV::Handler::shared_pointer handler;
{
Guard G(owner->mutex);
if(owner->type) {
desc = owner->type;
}
if(dead) {
sts = pvd::Status::error("Dead Channel");
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
} else {
if(owner->type) {
desc = owner->type;
}
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
}
owner->getfields.push_back(requester);
}
owner->getfields.push_back(requester);
}
if(desc) {
requester->getDone(pvd::Status(), desc);
if(desc || !sts.isOK()) {
requester->getDone(sts, desc);
}
if(handler) {
handler->onFirstConnect(owner);
@@ -131,28 +144,34 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut(
std::tr1::shared_ptr<SharedPut> ret(new SharedPut(shared_from_this(), requester, pvRequest));
pvd::StructureConstPtr type;
pvd::Status sts;
std::string warning;
SharedPV::Handler::shared_pointer handler;
try {
{
Guard G(owner->mutex);
// ~SharedPut removes
owner->puts.push_back(ret.get());
if(owner->current) {
ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode);
type = ret->mapper.requested();
warning = ret->mapper.warnings();
}
if(dead) {
sts = pvd::Status::error("Dead Channel");
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
} else {
// ~SharedPut removes
owner->puts.push_back(ret.get());
if(owner->current) {
ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode);
type = ret->mapper.requested();
warning = ret->mapper.warnings();
}
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
}
}
}
if(!warning.empty())
requester->message(warning, pvd::warningMessage);
if(type)
requester->channelPutConnect(pvd::Status(), ret, type);
if(type || !sts.isOK())
requester->channelPutConnect(sts, ret, type);
}catch(std::runtime_error& e){
ret.reset();
type.reset();
@@ -170,11 +189,18 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC(
{
std::tr1::shared_ptr<SharedRPC> ret(new SharedRPC(shared_from_this(), requester, pvRequest));
ret->connected = true;
requester->channelRPCConnect(pvd::Status(), ret);
pvd::Status sts;
{
Guard G(owner->mutex);
owner->rpcs.push_back(ret.get());
if(dead) {
sts = pvd::Status::error("Dead Channel");
} else {
owner->rpcs.push_back(ret.get());
}
}
requester->channelRPCConnect(sts, ret);
return ret;
}
@@ -186,27 +212,42 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor(
SharedPV::Handler::shared_pointer handler;
mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates;
mconf.mapperMode = owner->config.mapperMode;
std::tr1::shared_ptr<SharedMonitorFIFO> ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf));
bool notify;
pvd::Status sts;
{
Guard G(owner->mutex);
owner->monitors.push_back(ret.get());
notify = !!owner->type;
if(notify) {
ret->open(owner->type);
// post initial update
ret->post(*owner->current, owner->valid);
}
if(dead) {
sts = pvd::Status::error("Dead Channel");
notify = false;
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
} else {
owner->monitors.push_back(ret.get());
notify = !!owner->type;
if(notify) {
ret->open(owner->type);
// post initial update
ret->post(*owner->current, owner->valid);
}
if(!owner->channels.empty() && !owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = true;
}
}
}
if(notify)
if(!sts.isOK()) {
requester->monitorConnect(sts, pvd::MonitorPtr(), pvd::StructureConstPtr());
ret.reset();
} else if(notify) {
ret->notify();
if(handler) {
handler->onFirstConnect(owner);
if(handler) {
handler->onFirstConnect(owner);
}
}
return ret;
}

View File

@@ -107,43 +107,55 @@ void SharedPut::put(
std::tr1::shared_ptr<SharedPV::Handler> handler;
pvd::PVStructure::shared_pointer realval;
pvd::BitSet changed;
pvd::Status sts;
{
Guard G(channel->owner->mutex);
if(pvPutStructure->getStructure()!=mapper.requested()) {
if(channel->dead) {
sts = pvd::Status::error("Dead Channel");
} else if(pvPutStructure->getStructure()!=mapper.requested()) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->putDone(pvd::Status::error("Type changed"), shared_from_this());
return;
sts = pvd::Status::error("Type changed");
} else {
handler = channel->owner->handler;
realval = mapper.buildBase();
mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet);
}
handler = channel->owner->handler;
realval = mapper.buildBase();
mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet);
}
std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, realval, changed),
Operation::Impl::Cleanup());
if(!sts.isOK()) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->putDone(sts, pva::ChannelPut::shared_pointer());
if(handler) {
Operation op(impl);
handler->onPut(channel->owner, op);
} else {
std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, realval, changed),
Operation::Impl::Cleanup());
if(handler) {
Operation op(impl);
handler->onPut(channel->owner, op);
}
}
}
void SharedPut::get()
{
pvd::Status sts;
pvd::PVStructurePtr current;
pvd::BitSetPtr changed;
bool emptyselect = false;
{
Guard G(channel->owner->mutex);
if(channel->owner->current) {
if(channel->dead) {
sts = pvd::Status::error("Dead Channel");
} else if(channel->owner->current) {
assert(!!mapper.requested());
current = mapper.buildRequested();
@@ -157,10 +169,10 @@ void SharedPut::get()
requester_type::shared_pointer req(requester.lock());
if(!req) return;
if(!current) {
if(!sts.isOK()) {
// no-op
} else if(!current) {
sts = pvd::Status::error("Get not possible, cache disabled");
} else if(emptyselect) {
sts = pvd::Status::warn("pvRequest with empty field mask");
}
req->getDone(sts, shared_from_this(), current, changed);

View File

@@ -152,6 +152,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
this->valid = valid;
FOR_EACH(puts_t::const_iterator, it, end, puts) {
if((*it)->channel->dead) continue;
try {
try {
(*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode);
@@ -165,12 +166,13 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
}
}
FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
if((*it)->connected) continue;
if((*it)->connected || (*it)->channel->dead) continue;
try {
p_rpc.push_back((*it)->shared_from_this());
}catch(std::tr1::bad_weak_ptr&) {}
}
FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
if((*it)->channel->dead) continue;
try {
(*it)->open(newtype);
// post initial update
@@ -180,6 +182,7 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
}
// consume getField
FOR_EACH(getfields_t::iterator, it, end, getfields) {
// TODO: this may be on a dead Channel
p_getfield.push_back(it->lock());
}
getfields.clear(); // consume

View File

@@ -109,17 +109,30 @@ void SharedRPC::lastRequest() {}
void SharedRPC::request(epics::pvData::PVStructure::shared_pointer const & pvArgument)
{
std::tr1::shared_ptr<SharedPV::Handler> handler;
pvd::Status sts;
{
Guard G(channel->owner->mutex);
handler = channel->owner->handler;
if(channel->dead) {
sts = pvd::Status::error("Dead Channel");
} else {
handler = channel->owner->handler;
}
}
std::tr1::shared_ptr<RPCOP> impl(new RPCOP(shared_from_this(), pvRequest, pvArgument),
Operation::Impl::Cleanup());
if(!sts.isOK()) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->requestDone(sts, shared_from_this(), pvd::PVStructurePtr());
if(handler) {
Operation op(impl);
handler->onRPC(channel->owner, op);
} else {
std::tr1::shared_ptr<RPCOP> impl(new RPCOP(shared_from_this(), pvRequest, pvArgument),
Operation::Impl::Cleanup());
if(handler) {
Operation op(impl);
handler->onRPC(channel->owner, op);
}
}
}

View File

@@ -31,6 +31,8 @@ struct SharedChannel : public pva::Channel,
const requester_type::weak_pointer requester;
const pva::ChannelProvider::weak_pointer provider;
bool dead; // has destroy() been called?
SharedChannel(const std::tr1::shared_ptr<SharedPV>& owner,
const pva::ChannelProvider::shared_pointer provider,
const std::string& channelName,