pdb fix monitor lock order
Don't hold PDB*PV::lock while iterating interested list, and post()ing.
This commit is contained in:
@ -1,4 +1,6 @@
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include <epicsAtomic.h>
|
||||
#include <dbAccess.h>
|
||||
|
||||
@ -27,20 +29,35 @@ void pdb_group_event(void *user_arg, struct dbChannel *chan,
|
||||
PDBGroupPV::Info& info = self->members[idx];
|
||||
|
||||
{
|
||||
Guard G(self->lock); // TODO: lock order?
|
||||
bool doPost;
|
||||
{
|
||||
Guard G(self->lock);
|
||||
|
||||
if(!(evt->dbe_mask&DBE_PROPERTY)) {
|
||||
if(!info.had_initial_VALUE) {
|
||||
info.had_initial_VALUE = true;
|
||||
self->initial_waits--;
|
||||
}
|
||||
} else {
|
||||
if(!info.had_initial_PROPERTY) {
|
||||
info.had_initial_PROPERTY = true;
|
||||
self->initial_waits--;
|
||||
if(!(evt->dbe_mask&DBE_PROPERTY)) {
|
||||
if(!info.had_initial_VALUE) {
|
||||
info.had_initial_VALUE = true;
|
||||
assert(self->initial_waits>0);
|
||||
self->initial_waits--;
|
||||
}
|
||||
} else {
|
||||
if(!info.had_initial_PROPERTY) {
|
||||
info.had_initial_PROPERTY = true;
|
||||
assert(self->initial_waits>0);
|
||||
self->initial_waits--;
|
||||
}
|
||||
}
|
||||
|
||||
doPost = self->initial_waits==0;
|
||||
|
||||
if(doPost)
|
||||
self->interested_iterating = true;
|
||||
|
||||
printf("# pdb_group_event group=%s pv=%s mask=%x waits=%u\n",
|
||||
self->name.c_str(), dbChannelName(evt->chan), evt->dbe_mask,
|
||||
(unsigned)self->initial_waits);
|
||||
}
|
||||
|
||||
self->scratch.clear();
|
||||
if(evt->dbe_mask&DBE_PROPERTY || !self->monatomic)
|
||||
{
|
||||
DBScanLocker L(dbChannelRecord(info.chan));
|
||||
@ -59,13 +76,34 @@ void pdb_group_event(void *user_arg, struct dbChannel *chan,
|
||||
}
|
||||
}
|
||||
|
||||
if(self->initial_waits>0) return; // don't post() until all subscriptions get initial updates
|
||||
if(!doPost) return; // don't post() until all subscriptions get initial updates
|
||||
|
||||
FOREACH(PDBGroupPV::interested_t::const_iterator, it, end, self->interested) {
|
||||
PDBGroupMonitor& mon = *it->get();
|
||||
PDBGroupMonitor& mon = **it;
|
||||
mon.post(self->scratch);
|
||||
}
|
||||
self->scratch.clear();
|
||||
|
||||
{
|
||||
Guard G(self->lock);
|
||||
|
||||
assert(self->interested_iterating);
|
||||
|
||||
while(!self->interested_add.empty()) {
|
||||
PDBGroupPV::interested_t::iterator first(self->interested_add.begin());
|
||||
self->interested.insert(*first);
|
||||
self->interested_add.erase(first);
|
||||
}
|
||||
|
||||
while(!self->interested_remove.empty()) {
|
||||
PDBGroupPV::interested_t::iterator first(self->interested_remove.begin());
|
||||
self->interested.erase(*first);
|
||||
self->interested_remove.erase(first);
|
||||
}
|
||||
|
||||
self->interested_iterating = false;
|
||||
|
||||
self->finalizeMonitor();
|
||||
}
|
||||
}
|
||||
|
||||
}catch(std::tr1::bad_weak_ptr&){
|
||||
@ -83,6 +121,7 @@ void pdb_group_event(void *user_arg, struct dbChannel *chan,
|
||||
PDBGroupPV::PDBGroupPV()
|
||||
:pgatomic(false)
|
||||
,monatomic(false)
|
||||
,interested_iterating(false)
|
||||
,initial_waits(0)
|
||||
{
|
||||
epics::atomic::increment(num_instances);
|
||||
@ -101,6 +140,83 @@ PDBGroupPV::connect(const std::tr1::shared_ptr<PDBProvider>& prov,
|
||||
return ret;
|
||||
}
|
||||
|
||||
// caller must not hold lock
|
||||
void PDBGroupPV::addMonitor(PDBGroupMonitor *mon)
|
||||
{
|
||||
bool needpost = false;
|
||||
{
|
||||
Guard G(lock);
|
||||
if(interested.empty() && interested_add.empty()) {
|
||||
// first monitor
|
||||
// start subscriptions
|
||||
|
||||
size_t ievts = 0;
|
||||
for(size_t i=0; i<members.size(); i++) {
|
||||
PDBGroupPV::Info& info = members[i];
|
||||
|
||||
if(!!info.evt_VALUE) {
|
||||
db_event_enable(info.evt_VALUE.subscript);
|
||||
db_post_single_event(info.evt_VALUE.subscript);
|
||||
ievts++;
|
||||
info.had_initial_VALUE = false;
|
||||
} else {
|
||||
info.had_initial_VALUE = true;
|
||||
}
|
||||
assert(info.evt_PROPERTY.subscript);
|
||||
db_event_enable(info.evt_PROPERTY.subscript);
|
||||
db_post_single_event(info.evt_PROPERTY.subscript);
|
||||
ievts++;
|
||||
info.had_initial_PROPERTY = false;
|
||||
}
|
||||
initial_waits = ievts;
|
||||
|
||||
} else if(initial_waits==0) {
|
||||
// new subscriber and already had initial update
|
||||
needpost = true;
|
||||
} // else new subscriber, but no initial update. so just wait
|
||||
|
||||
if(interested_iterating)
|
||||
interested_add.insert(mon);
|
||||
else
|
||||
interested.insert(mon);
|
||||
}
|
||||
if(needpost)
|
||||
mon->post();
|
||||
}
|
||||
|
||||
// caller must not hold lock
|
||||
void PDBGroupPV::removeMonitor(PDBGroupMonitor *mon)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
if(interested_iterating) {
|
||||
interested_remove.insert(mon);
|
||||
} else {
|
||||
interested.erase(mon);
|
||||
finalizeMonitor();
|
||||
}
|
||||
}
|
||||
|
||||
// must hold lock
|
||||
void PDBGroupPV::finalizeMonitor()
|
||||
{
|
||||
assert(!interested_iterating);
|
||||
|
||||
if(!interested.empty())
|
||||
return;
|
||||
|
||||
// last subscriber
|
||||
for(size_t i=0; i<members.size(); i++) {
|
||||
PDBGroupPV::Info& info = members[i];
|
||||
|
||||
if(!!info.evt_VALUE) {
|
||||
db_event_disable(info.evt_VALUE.subscript);
|
||||
}
|
||||
db_event_disable(info.evt_PROPERTY.subscript);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
PDBGroupChannel::PDBGroupChannel(const PDBGroupPV::shared_pointer& pv,
|
||||
const std::tr1::shared_ptr<pva::ChannelProvider>& prov,
|
||||
const pva::ChannelRequester::shared_pointer& req)
|
||||
@ -293,61 +409,12 @@ void PDBGroupMonitor::destroy()
|
||||
|
||||
void PDBGroupMonitor::onStart()
|
||||
{
|
||||
guard_t G(pv->lock);
|
||||
|
||||
pv->scratch.clear();
|
||||
pv->scratch.set(0);
|
||||
|
||||
if(pv->interested.empty()) {
|
||||
// first subscriber
|
||||
size_t ievts = 0;
|
||||
for(size_t i=0; i<pv->members.size(); i++) {
|
||||
PDBGroupPV::Info& info = pv->members[i];
|
||||
|
||||
if(!!info.evt_VALUE) {
|
||||
db_event_enable(info.evt_VALUE.subscript);
|
||||
db_post_single_event(info.evt_VALUE.subscript);
|
||||
ievts++;
|
||||
info.had_initial_VALUE = false;
|
||||
} else {
|
||||
info.had_initial_VALUE = true;
|
||||
}
|
||||
assert(info.evt_PROPERTY.subscript);
|
||||
db_event_enable(info.evt_PROPERTY.subscript);
|
||||
db_post_single_event(info.evt_PROPERTY.subscript);
|
||||
ievts++;
|
||||
info.had_initial_PROPERTY = false;
|
||||
}
|
||||
pv->initial_waits = ievts;
|
||||
} else if(pv->initial_waits==0) {
|
||||
// new subscriber and already had initial update
|
||||
post();
|
||||
} // else new subscriber, but no initial update. so just wait
|
||||
|
||||
shared_pointer self(std::tr1::static_pointer_cast<PDBGroupMonitor>(shared_from_this()));
|
||||
pv->interested.insert(self);
|
||||
pv->addMonitor(this);
|
||||
}
|
||||
|
||||
void PDBGroupMonitor::onStop()
|
||||
{
|
||||
guard_t G(pv->lock);
|
||||
shared_pointer self(std::tr1::static_pointer_cast<PDBGroupMonitor>(shared_from_this()));
|
||||
|
||||
if(pv->interested.erase(self)==0) {
|
||||
fprintf(stderr, "%s: oops\n", __FUNCTION__);
|
||||
}
|
||||
|
||||
if(pv->interested.empty()) {
|
||||
// last subscriber
|
||||
for(size_t i=0; i<pv->members.size(); i++) {
|
||||
PDBGroupPV::Info& info = pv->members[i];
|
||||
|
||||
if(!!info.evt_VALUE) {
|
||||
db_event_disable(info.evt_VALUE.subscript);
|
||||
}
|
||||
db_event_disable(info.evt_PROPERTY.subscript);
|
||||
}
|
||||
}
|
||||
pv->addMonitor(this);
|
||||
}
|
||||
|
||||
void PDBGroupMonitor::requestUpdate()
|
||||
|
@ -76,6 +76,10 @@ struct epicsShareClass PDBGroupPV : public PDBPV
|
||||
weak_pointer weakself;
|
||||
inline shared_pointer shared_from_this() { return shared_pointer(weakself); }
|
||||
|
||||
// only for use in pdb_single_event()
|
||||
// which is not concurrent for all VALUE/PROPERTY.
|
||||
epics::pvData::BitSet scratch;
|
||||
|
||||
epicsMutex lock;
|
||||
|
||||
bool pgatomic, monatomic;
|
||||
@ -101,13 +105,12 @@ struct epicsShareClass PDBGroupPV : public PDBPV
|
||||
|
||||
DBManyLock locker; // all member channels
|
||||
|
||||
// monitor only
|
||||
epics::pvData::BitSet scratch;
|
||||
|
||||
epics::pvData::PVStructurePtr complete; // complete copy from subscription
|
||||
|
||||
typedef std::set<std::tr1::shared_ptr<PDBGroupMonitor> > interested_t;
|
||||
interested_t interested;
|
||||
typedef std::set<PDBGroupMonitor*> interested_t;
|
||||
bool interested_iterating;
|
||||
interested_t interested, interested_add, interested_remove;
|
||||
|
||||
size_t initial_waits;
|
||||
|
||||
static size_t num_instances;
|
||||
@ -119,6 +122,10 @@ struct epicsShareClass PDBGroupPV : public PDBPV
|
||||
epics::pvAccess::Channel::shared_pointer
|
||||
connect(const std::tr1::shared_ptr<PDBProvider>& prov,
|
||||
const epics::pvAccess::ChannelRequester::shared_pointer& req);
|
||||
|
||||
void addMonitor(PDBGroupMonitor*);
|
||||
void removeMonitor(PDBGroupMonitor*);
|
||||
void finalizeMonitor();
|
||||
};
|
||||
|
||||
struct epicsShareClass PDBGroupChannel : public BaseChannel,
|
||||
|
@ -31,26 +31,57 @@ void pdb_single_event(void *user_arg, struct dbChannel *chan,
|
||||
try{
|
||||
PDBSinglePV::shared_pointer self(std::tr1::static_pointer_cast<PDBSinglePV>(((PDBSinglePV*)evt->self)->shared_from_this()));
|
||||
{
|
||||
Guard G(self->lock); // TODO: lock order?
|
||||
bool doPost;
|
||||
{
|
||||
Guard G(self->lock);
|
||||
|
||||
if(evt->dbe_mask&DBE_PROPERTY)
|
||||
self->hadevent_PROPERTY = true;
|
||||
else
|
||||
self->hadevent_VALUE = true;
|
||||
|
||||
doPost = self->hadevent_VALUE && self->hadevent_PROPERTY;
|
||||
|
||||
if(doPost)
|
||||
self->interested_iterating = true;
|
||||
}
|
||||
|
||||
// we have exclusive use of self->scratch
|
||||
self->scratch.clear();
|
||||
{
|
||||
DBScanLocker L(dbChannelRecord(self->chan));
|
||||
self->pvif->put(self->scratch, evt->dbe_mask, pfl);
|
||||
}
|
||||
|
||||
if(evt->dbe_mask&DBE_PROPERTY)
|
||||
self->hadevent_PROPERTY = true;
|
||||
else
|
||||
self->hadevent_VALUE = true;
|
||||
|
||||
if(!self->hadevent_VALUE || !self->hadevent_PROPERTY)
|
||||
if(!doPost)
|
||||
return;
|
||||
|
||||
FOREACH(PDBSinglePV::interested_t::const_iterator, it, end, self->interested) {
|
||||
PDBSingleMonitor& mon = *it->get();
|
||||
PDBSingleMonitor& mon = **it;
|
||||
mon.post(self->scratch);
|
||||
}
|
||||
self->scratch.clear();
|
||||
|
||||
{
|
||||
Guard G(self->lock);
|
||||
|
||||
assert(self->interested_iterating);
|
||||
|
||||
while(!self->interested_add.empty()) {
|
||||
PDBSinglePV::interested_t::iterator first(self->interested_add.begin());
|
||||
self->interested.insert(*first);
|
||||
self->interested_add.erase(first);
|
||||
}
|
||||
|
||||
while(!self->interested_remove.empty()) {
|
||||
PDBSinglePV::interested_t::iterator first(self->interested_remove.begin());
|
||||
self->interested.erase(*first);
|
||||
self->interested_remove.erase(first);
|
||||
}
|
||||
|
||||
self->interested_iterating = false;
|
||||
|
||||
self->finalizeMonitor();
|
||||
}
|
||||
}
|
||||
|
||||
}catch(std::tr1::bad_weak_ptr&){
|
||||
@ -69,6 +100,7 @@ PDBSinglePV::PDBSinglePV(DBCH& chan,
|
||||
const PDBProvider::shared_pointer& prov)
|
||||
:provider(prov)
|
||||
,builder(new ScalarBuilder)
|
||||
,interested_iterating(false)
|
||||
,evt_VALUE(this)
|
||||
,evt_PROPERTY(this)
|
||||
,hadevent_VALUE(false)
|
||||
@ -103,6 +135,58 @@ PDBSinglePV::connect(const std::tr1::shared_ptr<PDBProvider>& prov,
|
||||
return ret;
|
||||
}
|
||||
|
||||
void PDBSinglePV::addMonitor(PDBSingleMonitor* mon)
|
||||
{
|
||||
bool needpost = false;
|
||||
{
|
||||
Guard G(lock);
|
||||
if(interested.empty() && interested_add.empty()) {
|
||||
// first monitor
|
||||
// start subscription
|
||||
|
||||
hadevent_VALUE = false;
|
||||
hadevent_PROPERTY = false;
|
||||
db_event_enable(evt_VALUE.subscript);
|
||||
db_event_enable(evt_PROPERTY.subscript);
|
||||
db_post_single_event(evt_VALUE.subscript);
|
||||
db_post_single_event(evt_PROPERTY.subscript);
|
||||
|
||||
} if(hadevent_VALUE && hadevent_PROPERTY) {
|
||||
// new subscriber and already had initial update
|
||||
needpost = true;
|
||||
} // else new subscriber, but no initial update. so just wait
|
||||
|
||||
if(interested_iterating)
|
||||
interested_add.insert(mon);
|
||||
else
|
||||
interested.insert(mon);
|
||||
}
|
||||
if(needpost)
|
||||
mon->post();
|
||||
}
|
||||
|
||||
void PDBSinglePV::removeMonitor(PDBSingleMonitor* mon)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
if(interested_iterating) {
|
||||
interested_remove.insert(mon);
|
||||
} else {
|
||||
interested.erase(mon);
|
||||
finalizeMonitor();
|
||||
}
|
||||
}
|
||||
|
||||
void PDBSinglePV::finalizeMonitor()
|
||||
{
|
||||
assert(!interested_iterating);
|
||||
|
||||
if(interested.empty()) {
|
||||
db_event_disable(evt_VALUE.subscript);
|
||||
db_event_disable(evt_PROPERTY.subscript);
|
||||
}
|
||||
}
|
||||
|
||||
PDBSingleChannel::PDBSingleChannel(const PDBSinglePV::shared_pointer& pv,
|
||||
const pva::ChannelRequester::shared_pointer& req)
|
||||
:BaseChannel(dbChannelName(pv->chan), pv->provider, req, pv->fielddesc)
|
||||
@ -357,25 +441,8 @@ void PDBSingleMonitor::onStart()
|
||||
{
|
||||
PDBSinglePV::shared_pointer PV(pv.lock());
|
||||
if(!PV) return;
|
||||
guard_t G(PV->lock);
|
||||
|
||||
PV->scratch.clear();
|
||||
PV->scratch.set(0);
|
||||
if(PV->interested.empty()) {
|
||||
// first subscriber
|
||||
PV->hadevent_VALUE = false;
|
||||
PV->hadevent_PROPERTY = false;
|
||||
db_event_enable(PV->evt_VALUE.subscript);
|
||||
db_event_enable(PV->evt_PROPERTY.subscript);
|
||||
db_post_single_event(PV->evt_VALUE.subscript);
|
||||
db_post_single_event(PV->evt_PROPERTY.subscript);
|
||||
} else if(PV->hadevent_VALUE && PV->hadevent_PROPERTY) {
|
||||
// new subscriber and already had initial update
|
||||
post();
|
||||
} // else new subscriber, but no initial update. so just wait
|
||||
|
||||
shared_pointer self(std::tr1::static_pointer_cast<PDBSingleMonitor>(shared_from_this()));
|
||||
PV->interested.insert(self);
|
||||
PV->addMonitor(this);
|
||||
}
|
||||
|
||||
void PDBSingleMonitor::onStop()
|
||||
@ -383,17 +450,8 @@ void PDBSingleMonitor::onStop()
|
||||
PDBSinglePV::shared_pointer PV(pv.lock());
|
||||
if(PV) return;
|
||||
guard_t G(PV->lock);
|
||||
shared_pointer self(std::tr1::static_pointer_cast<PDBSingleMonitor>(shared_from_this()));
|
||||
|
||||
if(PV->interested.erase(self)==0) {
|
||||
fprintf(stderr, "%s: oops\n", __FUNCTION__);
|
||||
}
|
||||
|
||||
if(PV->interested.empty()) {
|
||||
// last subscriber
|
||||
db_event_disable(PV->evt_VALUE.subscript);
|
||||
db_event_disable(PV->evt_PROPERTY.subscript);
|
||||
}
|
||||
PV->removeMonitor(this);
|
||||
}
|
||||
|
||||
void PDBSingleMonitor::requestUpdate()
|
||||
|
@ -33,16 +33,20 @@ struct epicsShareClass PDBSinglePV : public PDBPV
|
||||
DBCH chan;
|
||||
PDBProvider::shared_pointer provider;
|
||||
|
||||
// only for use in pdb_single_event()
|
||||
// which is not concurrent for VALUE/PROPERTY.
|
||||
epics::pvData::BitSet scratch;
|
||||
|
||||
epicsMutex lock;
|
||||
|
||||
epics::pvData::BitSet scratch;
|
||||
p2p::auto_ptr<ScalarBuilder> builder;
|
||||
p2p::auto_ptr<PVIF> pvif;
|
||||
|
||||
epics::pvData::PVStructurePtr complete; // complete copy from subscription
|
||||
|
||||
typedef std::set<std::tr1::shared_ptr<PDBSingleMonitor> > interested_t;
|
||||
interested_t interested;
|
||||
typedef std::set<PDBSingleMonitor*> interested_t;
|
||||
bool interested_iterating;
|
||||
interested_t interested, interested_add, interested_remove;
|
||||
|
||||
DBEvent evt_VALUE, evt_PROPERTY;
|
||||
bool hadevent_VALUE, hadevent_PROPERTY;
|
||||
@ -58,6 +62,10 @@ struct epicsShareClass PDBSinglePV : public PDBPV
|
||||
epics::pvAccess::Channel::shared_pointer
|
||||
connect(const std::tr1::shared_ptr<PDBProvider>& prov,
|
||||
const epics::pvAccess::ChannelRequester::shared_pointer& req);
|
||||
|
||||
void addMonitor(PDBSingleMonitor*);
|
||||
void removeMonitor(PDBSingleMonitor*);
|
||||
void finalizeMonitor();
|
||||
};
|
||||
|
||||
struct PDBSingleChannel : public BaseChannel,
|
||||
|
Reference in New Issue
Block a user