From 7de48f0bdacda8fdcad2ce8672dba82feda9fec6 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 8 Dec 2015 17:03:03 -0500 Subject: [PATCH] add lock for MonitorUser --- p2pApp/chancache.h | 2 ++ p2pApp/moncache.cpp | 48 ++++++++++++++++++++++++++++++--------------- p2pApp/server.cpp | 5 ++++- 3 files changed, 38 insertions(+), 17 deletions(-) diff --git a/p2pApp/chancache.h b/p2pApp/chancache.h index 389a697..983dfe2 100644 --- a/p2pApp/chancache.h +++ b/p2pApp/chancache.h @@ -62,6 +62,8 @@ struct MonitorUser : public epics::pvData::Monitor MonitorCacheEntry::shared_pointer entry; epics::pvData::MonitorRequester::weak_pointer req; + // guards queues and member variables + epicsMutex queueLock; bool running; size_t nwakeups; // # of monitorEvent() calls to req size_t nevents; // total # events queued diff --git a/p2pApp/moncache.cpp b/p2pApp/moncache.cpp index 2db631b..2def5a9 100644 --- a/p2pApp/moncache.cpp +++ b/p2pApp/moncache.cpp @@ -103,23 +103,29 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor) MonitorUser *usr = it->get(); pvd::MonitorRequester::shared_pointer req(usr->req); + bool notify = false; { - Guard G(chan->cache->cacheLock); // TODO: more granular lock + Guard G(usr->queueLock); // TODO: more granular lock if(!usr->running || usr->empty.empty()) { epicsAtomicIncrSizeT(&usr->ndropped); continue; } + notify = usr->filled.empty(); + + AUTO_VAL(elem, usr->empty.front()); - pvd::MonitorElementPtr elem(usr->empty.front()); elem->pvStructurePtr = update->pvStructurePtr; elem->overrunBitSet = update->overrunBitSet; elem->changedBitSet = update->changedBitSet; + usr->filled.push_back(elem); usr->empty.pop_front(); + epicsAtomicIncrSizeT(&usr->nevents); } - if(usr->filled.size()==1) { + //TODO: notify from worker since monitroEvent() will call poll() + if(notify) { epicsAtomicIncrSizeT(&usr->nwakeups); req->monitorEvent(*it); // notify when first item added to empty queue } @@ -181,47 +187,56 @@ MonitorUser::destroy() pvd::Status MonitorUser::start() { - pvd::MonitorRequester::shared_pointer req; + pvd::MonitorRequester::shared_pointer req(this->req.lock()); + if(!req) + return pvd::Status(pvd::Status::STATUSTYPE_FATAL, "already dead"); + bool doEvt = false; + pvd::PVStructurePtr lval; + pvd::StructureConstPtr typedesc; { Guard G(entry->chan->cache->cacheLock); - req = this->req.lock(); - if(!req) - return pvd::Status(pvd::Status::STATUSTYPE_FATAL, "already dead"); + if(!entry->startresult.isSuccess()) + return entry->startresult; - if(entry->startresult.isSuccess()) - running = true; + lval = entry->lastval; + typedesc = entry->typedesc; + } + + { + Guard G(queueLock); //TODO: control queue size empty.resize(4); pvd::PVDataCreatePtr fact(pvd::getPVDataCreate()); for(unsigned i=0; icreatePVStructure(entry->typedesc))); + empty[i].reset(new pvd::MonitorElement(fact->createPVStructure(typedesc))); } - if(entry->lastval) { + if(lval) { //already running, notify of initial element assert(!empty.empty()); pvd::MonitorElementPtr elem(empty.front()); - elem->pvStructurePtr = entry->lastval; + elem->pvStructurePtr = lval; elem->changedBitSet->set(0); // indicate all changed? filled.push_back(elem); empty.pop_front(); doEvt = true; } + running = true; } if(doEvt) - req->monitorEvent(weakref.lock()); + req->monitorEvent(shared_pointer(weakref)); return pvd::Status(); } pvd::Status MonitorUser::stop() { - Guard G(entry->chan->cache->cacheLock); + Guard G(queueLock); running = false; return pvd::Status::Ok; } @@ -229,7 +244,7 @@ MonitorUser::stop() pvd::MonitorElementPtr MonitorUser::poll() { - Guard G(entry->chan->cache->cacheLock); + Guard G(queueLock); pvd::MonitorElementPtr ret; if(!filled.empty()) { ret = filled.front(); @@ -243,7 +258,7 @@ MonitorUser::poll() void MonitorUser::release(pvd::MonitorElementPtr const & monitorElement) { - Guard G(entry->chan->cache->cacheLock); + Guard G(queueLock); //TODO: ifdef DEBUG? (only track inuse when debugging?) std::set::iterator it = inuse.find(monitorElement); if(it!=inuse.end()) { @@ -254,6 +269,7 @@ MonitorUser::release(pvd::MonitorElementPtr const & monitorElement) //TODO: check empty and filled lists to see if this is one of ours, of from somewhere else throw std::invalid_argument("Can't release MonitorElement not in use"); } + // TODO: pipeline window update } std::string diff --git a/p2pApp/server.cpp b/p2pApp/server.cpp index e6a0916..606f80a 100644 --- a/p2pApp/server.cpp +++ b/p2pApp/server.cpp @@ -369,16 +369,19 @@ void statusServer(int lvl, const char *chanexpr) MonitorUser& MU = **it3; size_t nempty, nfilled, nused, total; + bool isrunning; { Guard G(scp->cache.cacheLock); nempty = MU.empty.size(); nfilled = MU.filled.size(); nused = MU.inuse.size(); + isrunning = MU.running; } total = nempty + nfilled + nused; - std::cout<<" Server monitor buffer "<