fix pdb monitor locking

Guard PV*Monitor::complete to prevent
concurrent access from dbEvent callback,
and PVA server calling release().
This commit is contained in:
Michael Davidsaver
2018-02-06 18:05:11 -08:00
parent 1635e75c86
commit b6b6ac3305
3 changed files with 185 additions and 195 deletions

View File

@ -76,6 +76,7 @@ struct BaseMonitor : public epics::pvAccess::Monitor
epicsMutex& lock; // not held during any callback
typedef epicsGuard<epicsMutex> guard_t;
typedef epicsGuardRelease<epicsMutex> unguard_t;
private:
const requester_t::weak_pointer requester;
@ -106,121 +107,136 @@ public:
//! Must call before first post(). Sets .complete and calls monitorConnect()
//! @note that value will never by accessed except by post() and requestUpdate()
void connect(const epics::pvData::PVStructurePtr& value)
void connect(guard_t& guard, const epics::pvData::PVStructurePtr& value)
{
guard.assertIdenticalMutex(lock);
epics::pvData::StructureConstPtr dtype(value->getStructure());
epics::pvData::PVDataCreatePtr create(epics::pvData::getPVDataCreate());
BaseMonitor::shared_pointer self(shared_from_this());
requester_t::shared_pointer req(requester.lock());
{
guard_t G(lock);
assert(!complete); // can't call twice
complete = value;
empty.resize(nbuffers);
for(size_t i=0; i<empty.size(); i++) {
empty[i].reset(new epics::pvAccess::MonitorElement(create->createPVStructure(dtype)));
}
assert(!complete); // can't call twice
complete = value;
empty.resize(nbuffers);
for(size_t i=0; i<empty.size(); i++) {
empty[i].reset(new epics::pvAccess::MonitorElement(create->createPVStructure(dtype)));
}
epics::pvData::Status sts;
if(req)
if(req) {
unguard_t U(guard);
epics::pvData::Status sts;
req->monitorConnect(sts, self, dtype);
}
}
struct no_overflow {};
//! post update if queue not full, if full return false w/o overflow
bool post(const epics::pvData::BitSet& updated, no_overflow)
bool post(guard_t& guard, const epics::pvData::BitSet& updated, no_overflow)
{
guard.assertIdenticalMutex(lock);
requester_t::shared_pointer req;
{
guard_t G(lock);
if(!complete || !running) return false;
changed |= updated;
if(!complete || !running) return false;
if(empty.empty()) return false;
changed |= updated;
if(p_postone())
req = requester.lock();
inoverflow = false;
if(empty.empty()) return false;
if(p_postone())
req = requester.lock();
inoverflow = false;
if(req) {
unguard_t U(guard);
req->monitorEvent(shared_from_this());
}
if(req) req->monitorEvent(shared_from_this());
return true;
}
//! post update of pending changes. eg. call from requestUpdate()
bool post()
bool post(guard_t& guard)
{
guard.assertIdenticalMutex(lock);
bool oflow;
requester_t::shared_pointer req;
{
guard_t G(lock);
if(!complete || !running) return false;
if(empty.empty()) {
oflow = inoverflow = true;
if(!complete || !running) return false;
} else {
if(empty.empty()) {
oflow = inoverflow = true;
if(p_postone())
req = requester.lock();
oflow = inoverflow = false;
}
} else {
if(p_postone())
req = requester.lock();
oflow = inoverflow = false;
}
if(req) {
unguard_t U(guard);
req->monitorEvent(shared_from_this());
}
if(req) req->monitorEvent(shared_from_this());
return !oflow;
}
//! post update with changed and overflowed masks (eg. when updates were lost in some upstream queue)
bool post(const epics::pvData::BitSet& updated, const epics::pvData::BitSet& overflowed)
bool post(guard_t& guard,
const epics::pvData::BitSet& updated,
const epics::pvData::BitSet& overflowed)
{
guard.assertIdenticalMutex(lock);
bool oflow;
requester_t::shared_pointer req;
{
guard_t G(lock);
if(!complete || !running) return false;
if(empty.empty()) {
oflow = inoverflow = true;
overflow |= overflowed;
overflow.or_and(updated, changed);
changed |= updated;
if(!complete || !running) return false;
} else {
if(empty.empty()) {
oflow = inoverflow = true;
overflow |= overflowed;
overflow.or_and(updated, changed);
changed |= updated;
changed |= updated;
if(p_postone())
req = requester.lock();
oflow = inoverflow = false;
}
} else {
changed |= updated;
if(p_postone())
req = requester.lock();
oflow = inoverflow = false;
}
if(req) {
unguard_t U(guard);
req->monitorEvent(shared_from_this());
}
if(req) req->monitorEvent(shared_from_this());
return !oflow;
}
//! post update with changed
bool post(const epics::pvData::BitSet& updated) {
bool post(guard_t& guard, const epics::pvData::BitSet& updated) {
bool oflow;
requester_t::shared_pointer req;
{
guard_t G(lock);
if(!complete || !running) return false;
if(empty.empty()) {
oflow = inoverflow = true;
overflow.or_and(updated, changed);
changed |= updated;
if(!complete || !running) return false;
} else {
if(empty.empty()) {
oflow = inoverflow = true;
overflow.or_and(updated, changed);
changed |= updated;
changed |= updated;
if(p_postone())
req = requester.lock();
oflow = inoverflow = false;
}
} else {
changed |= updated;
if(p_postone())
req = requester.lock();
oflow = inoverflow = false;
}
if(req) {
unguard_t U(guard);
req->monitorEvent(shared_from_this());
}
if(req) req->monitorEvent(shared_from_this());
return !oflow;
}
@ -252,7 +268,7 @@ public:
virtual void onStop() {}
//! called when within release() when the opportunity exists to end the overflow condition
//! May do nothing, or lock and call post()
virtual void requestUpdate() {guard_t G(lock); post();}
virtual void requestUpdate() {guard_t G(lock); post(G);}
virtual void destroy()
{

View File

@ -36,30 +36,10 @@ void pdb_group_event(void *user_arg, struct dbChannel *chan,
PDBGroupPV::shared_pointer self(std::tr1::static_pointer_cast<PDBGroupPV>(((PDBGroupPV*)evt->self)->shared_from_this()));
PDBGroupPV::Info& info = self->members[idx];
PDBGroupPV::interested_remove_t temp;
{
bool doPost;
{
Guard G(self->lock);
if(!(evt->dbe_mask&DBE_PROPERTY)) {
if(!info.had_initial_VALUE) {
info.had_initial_VALUE = true;
assert(self->initial_waits>0);
self->initial_waits--;
}
} else {
if(!info.had_initial_PROPERTY) {
info.had_initial_PROPERTY = true;
assert(self->initial_waits>0);
self->initial_waits--;
}
}
doPost = self->initial_waits==0;
if(doPost)
self->interested_iterating = true;
}
Guard G(self->lock);
self->scratch.clear();
if(evt->dbe_mask&DBE_PROPERTY || !self->monatomic)
@ -80,35 +60,50 @@ void pdb_group_event(void *user_arg, struct dbChannel *chan,
}
}
if(!doPost) return; // don't post() until all subscriptions get initial updates
FOREACH(PDBGroupPV::interested_t::const_iterator, it, end, self->interested) {
PDBGroupMonitor& mon = **it;
mon.post(self->scratch);
if(!(evt->dbe_mask&DBE_PROPERTY)) {
if(!info.had_initial_VALUE) {
info.had_initial_VALUE = true;
assert(self->initial_waits>0);
self->initial_waits--;
}
} else {
if(!info.had_initial_PROPERTY) {
info.had_initial_PROPERTY = true;
assert(self->initial_waits>0);
self->initial_waits--;
}
}
PDBGroupPV::interested_remove_t temp;
{
Guard G(self->lock);
if(self->initial_waits==0) {
self->interested_iterating = true;
assert(self->interested_iterating);
while(!self->interested_add.empty()) {
PDBGroupPV::interested_t::iterator first(self->interested_add.begin());
self->interested.insert(*first);
self->interested_add.erase(first);
FOREACH(PDBGroupPV::interested_t::const_iterator, it, end, self->interested) {
PDBGroupMonitor& mon = **it;
mon.post(G, self->scratch); // G unlocked
}
temp.swap(self->interested_remove);
for(PDBGroupPV::interested_remove_t::iterator it(temp.begin()),
end(temp.end()); it != end; ++it)
{
self->interested.erase(static_cast<PDBGroupMonitor*>(it->get()));
Guard G(self->lock);
assert(self->interested_iterating);
while(!self->interested_add.empty()) {
PDBGroupPV::interested_t::iterator first(self->interested_add.begin());
self->interested.insert(*first);
self->interested_add.erase(first);
}
temp.swap(self->interested_remove);
for(PDBGroupPV::interested_remove_t::iterator it(temp.begin()),
end(temp.end()); it != end; ++it)
{
self->interested.erase(static_cast<PDBGroupMonitor*>(it->get()));
}
self->interested_iterating = false;
self->finalizeMonitor();
}
self->interested_iterating = false;
self->finalizeMonitor();
}
}
@ -149,45 +144,40 @@ PDBGroupPV::connect(const std::tr1::shared_ptr<PDBProvider>& prov,
// caller must not hold lock
void PDBGroupPV::addMonitor(PDBGroupMonitor *mon)
{
bool needpost = false;
{
Guard G(lock);
if(interested.empty() && interested_add.empty()) {
// first monitor
// start subscriptions
Guard G(lock);
if(interested.empty() && interested_add.empty()) {
// first monitor
// start subscriptions
size_t ievts = 0;
for(size_t i=0; i<members.size(); i++) {
PDBGroupPV::Info& info = members[i];
size_t ievts = 0;
for(size_t i=0; i<members.size(); i++) {
PDBGroupPV::Info& info = members[i];
if(!!info.evt_VALUE) {
db_event_enable(info.evt_VALUE.subscript);
db_post_single_event(info.evt_VALUE.subscript);
ievts++;
info.had_initial_VALUE = false;
} else {
info.had_initial_VALUE = true;
}
assert(info.evt_PROPERTY.subscript);
db_event_enable(info.evt_PROPERTY.subscript);
db_post_single_event(info.evt_PROPERTY.subscript);
if(!!info.evt_VALUE) {
db_event_enable(info.evt_VALUE.subscript);
db_post_single_event(info.evt_VALUE.subscript);
ievts++;
info.had_initial_PROPERTY = false;
info.had_initial_VALUE = false;
} else {
info.had_initial_VALUE = true;
}
initial_waits = ievts;
assert(info.evt_PROPERTY.subscript);
db_event_enable(info.evt_PROPERTY.subscript);
db_post_single_event(info.evt_PROPERTY.subscript);
ievts++;
info.had_initial_PROPERTY = false;
}
initial_waits = ievts;
} else if(initial_waits==0) {
// new subscriber and already had initial update
needpost = true;
} // else new subscriber, but no initial update. so just wait
} else if(initial_waits==0) {
// new subscriber and already had initial update
mon->post(G);
} // else new subscriber, but no initial update. so just wait
if(interested_iterating)
interested_add.insert(mon);
else
interested.insert(mon);
}
if(needpost)
mon->post();
if(interested_iterating)
interested_add.insert(mon);
else
interested.insert(mon);
}
// caller must not hold lock
@ -285,7 +275,8 @@ PDBGroupChannel::createMonitor(
PDBGroupMonitor::shared_pointer ret(new PDBGroupMonitor(pv->shared_from_this(), requester, pvRequest));
ret->weakself = ret;
assert(!!pv->complete);
ret->connect(pv->complete);
guard_t G(pv->lock);
ret->connect(G, pv->complete);
return ret;
}
@ -451,5 +442,5 @@ void PDBGroupMonitor::onStop()
void PDBGroupMonitor::requestUpdate()
{
Guard G(pv->lock);
post();
post(G);
}

View File

@ -35,21 +35,9 @@ void pdb_single_event(void *user_arg, struct dbChannel *chan,
DBEvent *evt=(DBEvent*)user_arg;
try{
PDBSinglePV::shared_pointer self(std::tr1::static_pointer_cast<PDBSinglePV>(((PDBSinglePV*)evt->self)->shared_from_this()));
PDBSinglePV::interested_remove_t temp;
{
bool doPost;
{
Guard G(self->lock);
if(evt->dbe_mask&DBE_PROPERTY)
self->hadevent_PROPERTY = true;
else
self->hadevent_VALUE = true;
doPost = self->hadevent_VALUE && self->hadevent_PROPERTY;
if(doPost)
self->interested_iterating = true;
}
Guard G(self->lock);
// we have exclusive use of self->scratch
self->scratch.clear();
@ -59,20 +47,19 @@ void pdb_single_event(void *user_arg, struct dbChannel *chan,
self->pvif->put(self->scratch, evt->dbe_mask, pfl);
}
if(!doPost)
return;
if(evt->dbe_mask&DBE_PROPERTY)
self->hadevent_PROPERTY = true;
else
self->hadevent_VALUE = true;
FOREACH(PDBSinglePV::interested_t::const_iterator, it, end, self->interested) {
PDBSingleMonitor& mon = **it;
// from self->complete into monitor queue element
mon.post(self->scratch);
}
if(self->hadevent_VALUE && self->hadevent_PROPERTY) {
self->interested_iterating = true;
PDBSinglePV::interested_remove_t temp;
{
Guard G(self->lock);
assert(self->interested_iterating);
FOREACH(PDBSinglePV::interested_t::const_iterator, it, end, self->interested) {
PDBSingleMonitor& mon = **it;
// from self->complete into monitor queue element
mon.post(G, self->scratch); // G unlocked during call
}
while(!self->interested_add.empty()) {
PDBSinglePV::interested_t::iterator first(self->interested_add.begin());
@ -146,33 +133,28 @@ PDBSinglePV::connect(const std::tr1::shared_ptr<PDBProvider>& prov,
void PDBSinglePV::addMonitor(PDBSingleMonitor* mon)
{
bool needpost = false;
{
Guard G(lock);
if(interested.empty() && interested_add.empty()) {
// first monitor
// start subscription
Guard G(lock);
if(interested.empty() && interested_add.empty()) {
// first monitor
// start subscription
hadevent_VALUE = false;
hadevent_PROPERTY = false;
db_event_enable(evt_VALUE.subscript);
db_event_enable(evt_PROPERTY.subscript);
db_post_single_event(evt_VALUE.subscript);
db_post_single_event(evt_PROPERTY.subscript);
hadevent_VALUE = false;
hadevent_PROPERTY = false;
db_event_enable(evt_VALUE.subscript);
db_event_enable(evt_PROPERTY.subscript);
db_post_single_event(evt_VALUE.subscript);
db_post_single_event(evt_PROPERTY.subscript);
} if(hadevent_VALUE && hadevent_PROPERTY) {
// new subscriber and already had initial update
needpost = true;
} // else new subscriber, but no initial update. so just wait
} if(hadevent_VALUE && hadevent_PROPERTY) {
// new subscriber and already had initial update
mon->post(G);
} // else new subscriber, but no initial update. so just wait
if(interested_iterating) {
interested_add.insert(mon);
} else {
interested.insert(mon);
}
if(interested_iterating) {
interested_add.insert(mon);
} else {
interested.insert(mon);
}
if(needpost)
mon->post();
}
void PDBSinglePV::removeMonitor(PDBSingleMonitor* mon)
@ -240,7 +222,8 @@ PDBSingleChannel::createMonitor(
PDBSingleMonitor::shared_pointer ret(new PDBSingleMonitor(pv->shared_from_this(), requester, pvRequest));
ret->weakself = ret;
assert(!!pv->complete);
ret->connect(pv->complete);
guard_t G(pv->lock);
ret->connect(G, pv->complete);
return ret;
}
@ -467,5 +450,5 @@ void PDBSingleMonitor::onStop()
void PDBSingleMonitor::requestUpdate()
{
guard_t G(pv->lock);
post();
post(G);
}