From 2f64098e15272d33774f2dec586dc6900b4b8483 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 16 Mar 2016 23:49:46 -0400 Subject: [PATCH] start group monitor --- pdbApp/pdb.cpp | 40 ++++++++++++++- pdbApp/pdbgroup.cpp | 117 +++++++++++++++++++++++++++++++++++++++++++ pdbApp/pdbgroup.h | 43 +++++++++++++++- pdbApp/pdbsingle.cpp | 2 +- pdbApp/pvif.h | 7 ++- testApp/testpdb.cpp | 15 ++++++ 6 files changed, 220 insertions(+), 4 deletions(-) diff --git a/pdbApp/pdb.cpp b/pdbApp/pdb.cpp index 680caaf..44debfd 100644 --- a/pdbApp/pdb.cpp +++ b/pdbApp/pdb.cpp @@ -89,6 +89,7 @@ PDBProvider::PDBProvider() } pvd::FieldBuilderPtr builder(pvd::getFieldCreate()->createFieldBuilder()); + pvd::PVDataCreatePtr pvbuilder(pvd::getPVDataCreate()); FOREACH(it, end, groups) { @@ -103,7 +104,6 @@ PDBProvider::PDBProvider() pv->weakself = pv; pv->name = info.name; pv->attachments.resize(nchans); - //pv->chan.resize(nchans); pvd::shared_vector chans(nchans); std::vector records(nchans); @@ -121,6 +121,7 @@ PDBProvider::PDBProvider() } pv->fielddesc = builder->createStructure(); + pv->complete = pvbuilder->createPVStructure(pv->fielddesc); pv->chan.swap(chans); DBManyLock L(&records[0], records.size(), 0); @@ -139,6 +140,40 @@ PDBProvider::PDBProvider() int ret = db_start_events(event_context, "PDB-event", NULL, NULL, epicsThreadPriorityCAServerLow-1); if(ret!=DB_EVENT_OK) throw std::runtime_error("Failed to stsart dbEvent context"); + + try { + FOREACH(it, end, persist_pv_map) + { + const PDBPV::shared_pointer& ppv = it->second; + PDBGroupPV *pv = dynamic_cast(ppv.get()); + if(!pv) + continue; + const size_t nchans = pv->chan.size(); + + // prepare for monitor + + pv->pvif.resize(nchans); + epics::pvData::shared_vector values(nchans), props(nchans); + + for(size_t i=0; ipvif[i].reset(PVIF::attach(pv->chan[i], + pv->complete->getSubFieldT(pv->attachments[i]))); + + values[i].create(event_context, pv->chan[i], &pdb_group_event, DBE_VALUE|DBE_ALARM); + values[i].self = pv; + props[i].create(event_context, pv->chan[i], &pdb_group_event, DBE_PROPERTY); + props[i].self = pv; + values[i].index = props[i].index = i; + } + + pv->evts_VALUE.swap(values); + pv->evts_PROPERTY.swap(props); + } + }catch(...){ + db_close_events(event_context); + throw; + } } PDBProvider::~PDBProvider() @@ -158,10 +193,13 @@ PDBProvider::~PDBProvider() void PDBProvider::destroy() { dbEventCtx ctxt = NULL; + persist_pv_map_t ppv; { epicsGuard G(transient_pv_map.mutex()); + persist_pv_map.swap(ppv); std::swap(ctxt, event_context); } + ppv.clear(); // indirectly calls all db_cancel_events() if(ctxt) db_close_events(ctxt); } diff --git a/pdbApp/pdbgroup.cpp b/pdbApp/pdbgroup.cpp index 2db0f65..b2dbba7 100644 --- a/pdbApp/pdbgroup.cpp +++ b/pdbApp/pdbgroup.cpp @@ -2,6 +2,8 @@ #include #include +#include "errlogstream.h" +#include "helper.h" #include "pdbgroup.h" #include "pdb.h" @@ -10,6 +12,46 @@ namespace pva = epics::pvAccess; size_t PDBGroupPV::ninstances; +typedef epicsGuard Guard; + +void pdb_group_event(void *user_arg, struct dbChannel *chan, + int eventsRemaining, struct db_field_log *pfl) +{ + DBEvent *evt=(DBEvent*)user_arg; + unsigned idx = evt->index; + try{ + PDBGroupPV::shared_pointer self(std::tr1::static_pointer_cast(((PDBGroupPV*)evt->self)->shared_from_this())); + + { + Guard G(self->lock); // TODO: lock order? + + self->scratch.clear(); + { + DBScanLocker L(dbChannelRecord(self->chan[idx])); + self->pvif[idx]->put(self->scratch, evt->dbe_mask, pfl); + } + + self->hadevent = true; + + FOREACH(it, end, self->interested) { + PDBGroupMonitor& mon = *it->get(); + mon.post(self->scratch); + } + } + + }catch(std::tr1::bad_weak_ptr&){ + /* We are racing destruction of the PDBGroupPV, but things are ok. + * The destructor is running, but has not completed db_cancel_event() + * so storage is still valid. + * Just do nothing + */ + }catch(std::exception& e){ + errlog_ostream strm; + strm<<"Unhandled exception in pdb_group_event(): "<shared_from_this(), requester, pvRequest)); + ret->weakself = ret; + assert(!!pv->complete); + ret->connect(pv->complete); + return ret; +} + PDBGroupGet::PDBGroupGet(const PDBGroupChannel::shared_pointer &channel, const pva::ChannelGetRequester::shared_pointer &requester, @@ -192,3 +246,66 @@ void PDBGroupPut::get() changed->set(0); requester->getDone(pvd::Status(), shared_from_this(), pvf, changed); } + +PDBGroupMonitor::PDBGroupMonitor(const PDBGroupPV::shared_pointer& pv, + const requester_t::shared_pointer& requester, + const pvd::PVStructure::shared_pointer& pvReq) + :BaseMonitor(requester, pvReq) + ,pv(pv) +{} + +void PDBGroupMonitor::destroy() +{ + BaseMonitor::destroy(); + PDBGroupPV::shared_pointer pv; + { + Guard G(lock); + this->pv.swap(pv); + } +} + +void PDBGroupMonitor::onStart() +{ + guard_t G(pv->lock); + + if(pv->interested.empty()) { + // first subscriber + pv->hadevent = false; + for(size_t i=0; ievts_VALUE.size(); i++) { + db_event_enable(pv->evts_VALUE[i].subscript); + db_event_enable(pv->evts_PROPERTY[i].subscript); + db_post_single_event(pv->evts_VALUE[i].subscript); + db_post_single_event(pv->evts_PROPERTY[i].subscript); + } + } else if(pv->hadevent) { + // new subscriber and already had initial update + post(); + } // else new subscriber, but no initial update. so just wait + + shared_pointer self(std::tr1::static_pointer_cast(shared_from_this())); + pv->interested.insert(self); +} + +void PDBGroupMonitor::onStop() +{ + guard_t G(pv->lock); + shared_pointer self(std::tr1::static_pointer_cast(shared_from_this())); + + if(pv->interested.erase(self)==0) { + fprintf(stderr, "%s: oops\n", __FUNCTION__); + } + + if(pv->interested.empty()) { + // last subscriber + for(size_t i=0; ievts_VALUE.size(); i++) { + db_event_disable(pv->evts_VALUE[i].subscript); + db_event_disable(pv->evts_PROPERTY[i].subscript); + } + } +} + +void PDBGroupMonitor::requestUpdate() +{ + Guard G(pv->lock); + post(); +} diff --git a/pdbApp/pdbgroup.h b/pdbApp/pdbgroup.h index fb1a4fd..afd37e8 100644 --- a/pdbApp/pdbgroup.h +++ b/pdbApp/pdbgroup.h @@ -12,17 +12,36 @@ #include "pvif.h" #include "pdb.h" +struct PDBGroupMonitor; + +void pdb_group_event(void *user_arg, struct dbChannel *chan, + int eventsRemaining, struct db_field_log *pfl); + struct PDBGroupPV : public PDBPV { POINTER_DEFINITIONS(PDBGroupPV); weak_pointer weakself; inline shared_pointer shared_from_this() { return shared_pointer(weakself); } + epicsMutex lock; + + // get/put/monitor std::string name; epics::pvData::shared_vector chan; std::vector attachments; DBManyLock locker; + // monitor only + epics::pvData::BitSet scratch; + std::vector > pvif; + epics::pvData::shared_vector evts_VALUE, evts_PROPERTY; + + epics::pvData::PVStructurePtr complete; // complete copy from subscription + + typedef std::set > interested_t; + interested_t interested; + bool hadevent; + static size_t ninstances; PDBGroupPV(); @@ -52,6 +71,9 @@ struct PDBGroupChannel : public BaseChannel, virtual epics::pvAccess::ChannelPut::shared_pointer createChannelPut( epics::pvAccess::ChannelPutRequester::shared_pointer const & requester, epics::pvData::PVStructure::shared_pointer const & pvRequest); + virtual epics::pvData::Monitor::shared_pointer createMonitor( + epics::pvData::MonitorRequester::shared_pointer const & requester, + epics::pvData::PVStructure::shared_pointer const & pvRequest); virtual void printInfo(std::ostream& out); }; @@ -110,6 +132,25 @@ struct PDBGroupPut : public epics::pvAccess::ChannelPut, virtual void get(); }; -//struct PDBGroupMonitor : public epics::pvData::Monitor {}; +struct PDBGroupMonitor : public BaseMonitor +{ + POINTER_DEFINITIONS(PDBGroupMonitor); + + PDBGroupPV::shared_pointer pv; + + bool atomic; + + PDBGroupMonitor(const PDBGroupPV::shared_pointer& pv, + const requester_t::shared_pointer& requester, + const epics::pvData::PVStructure::shared_pointer& pvReq); + virtual ~PDBGroupMonitor() {destroy();} + + virtual void onStart(); + virtual void onStop(); + virtual void requestUpdate(); + + virtual void destroy(); + +}; #endif // PDBGROUP_H diff --git a/pdbApp/pdbsingle.cpp b/pdbApp/pdbsingle.cpp index 3bbf4e3..76587c4 100644 --- a/pdbApp/pdbsingle.cpp +++ b/pdbApp/pdbsingle.cpp @@ -41,7 +41,7 @@ void pdb_single_event(void *user_arg, struct dbChannel *chan, } }catch(std::tr1::bad_weak_ptr&){ - /* We are racing destruction of the PDBSingleMonitor, but things are ok. + /* We are racing destruction of the PDBSinglePV, but things are ok. * The destructor is running, but has not completed db_cancel_event() * so storage is still valid. * Just do nothing diff --git a/pdbApp/pvif.h b/pdbApp/pvif.h index 1293a87..a9ac507 100644 --- a/pdbApp/pvif.h +++ b/pdbApp/pvif.h @@ -101,7 +101,9 @@ struct DBEvent dbEventSubscription subscript; unsigned dbe_mask; void *self; - DBEvent(void* s) :subscript(NULL), self(s) {} + unsigned index; + DBEvent() :subscript(NULL), self(NULL), index(0) {} + DBEvent(void* s) :subscript(NULL), self(s), index(0) {} ~DBEvent() {destroy();} void create(dbEventCtx ctx, dbChannel *ch, EVENTFUNC *fn, unsigned mask) { @@ -113,6 +115,9 @@ struct DBEvent void destroy() { if(subscript) db_cancel_event(subscript); } +private: + DBEvent(const DBEvent&); + DBEvent& operator=(const DBEvent&); }; struct LocalFL diff --git a/testApp/testpdb.cpp b/testApp/testpdb.cpp index d3fdd0d..63df38e 100644 --- a/testApp/testpdb.cpp +++ b/testApp/testpdb.cpp @@ -340,6 +340,20 @@ void testSingleMonitor(const PDBProvider::shared_pointer& prov) testOk1(!e); } +void testGroupMonitor(const PDBProvider::shared_pointer& prov) +{ + testDiag("test group monitor"); + + testdbPutFieldOk("rec3", DBR_DOUBLE, 3.0); + testdbPutFieldOk("rec4", DBR_DOUBLE, 4.0); + testdbPutFieldOk("rec3.RVAL", DBR_LONG, 30); + testdbPutFieldOk("rec4.RVAL", DBR_LONG, 40); + + testDiag("subscribe to grp1"); + PVMonitor mon(prov, "grp1"); + mon.mon->start(); +} + } // namespace extern "C" @@ -366,6 +380,7 @@ MAIN(testpdb) testGroupPut(prov); testSingleMonitor(prov); + testGroupMonitor(prov); }catch(...){ prov->destroy(); throw;