add lock for MonitorUser
This commit is contained in:
@ -62,6 +62,8 @@ struct MonitorUser : public epics::pvData::Monitor
|
||||
MonitorCacheEntry::shared_pointer entry;
|
||||
epics::pvData::MonitorRequester::weak_pointer req;
|
||||
|
||||
// guards queues and member variables
|
||||
epicsMutex queueLock;
|
||||
bool running;
|
||||
size_t nwakeups; // # of monitorEvent() calls to req
|
||||
size_t nevents; // total # events queued
|
||||
|
@ -103,23 +103,29 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor)
|
||||
MonitorUser *usr = it->get();
|
||||
pvd::MonitorRequester::shared_pointer req(usr->req);
|
||||
|
||||
bool notify = false;
|
||||
{
|
||||
Guard G(chan->cache->cacheLock); // TODO: more granular lock
|
||||
Guard G(usr->queueLock); // TODO: more granular lock
|
||||
if(!usr->running || usr->empty.empty()) {
|
||||
epicsAtomicIncrSizeT(&usr->ndropped);
|
||||
continue;
|
||||
}
|
||||
notify = usr->filled.empty();
|
||||
|
||||
AUTO_VAL(elem, usr->empty.front());
|
||||
|
||||
pvd::MonitorElementPtr elem(usr->empty.front());
|
||||
elem->pvStructurePtr = update->pvStructurePtr;
|
||||
elem->overrunBitSet = update->overrunBitSet;
|
||||
elem->changedBitSet = update->changedBitSet;
|
||||
|
||||
usr->filled.push_back(elem);
|
||||
usr->empty.pop_front();
|
||||
|
||||
epicsAtomicIncrSizeT(&usr->nevents);
|
||||
}
|
||||
|
||||
if(usr->filled.size()==1) {
|
||||
//TODO: notify from worker since monitroEvent() will call poll()
|
||||
if(notify) {
|
||||
epicsAtomicIncrSizeT(&usr->nwakeups);
|
||||
req->monitorEvent(*it); // notify when first item added to empty queue
|
||||
}
|
||||
@ -181,47 +187,56 @@ MonitorUser::destroy()
|
||||
pvd::Status
|
||||
MonitorUser::start()
|
||||
{
|
||||
pvd::MonitorRequester::shared_pointer req;
|
||||
pvd::MonitorRequester::shared_pointer req(this->req.lock());
|
||||
if(!req)
|
||||
return pvd::Status(pvd::Status::STATUSTYPE_FATAL, "already dead");
|
||||
|
||||
bool doEvt = false;
|
||||
pvd::PVStructurePtr lval;
|
||||
pvd::StructureConstPtr typedesc;
|
||||
{
|
||||
Guard G(entry->chan->cache->cacheLock);
|
||||
|
||||
req = this->req.lock();
|
||||
if(!req)
|
||||
return pvd::Status(pvd::Status::STATUSTYPE_FATAL, "already dead");
|
||||
if(!entry->startresult.isSuccess())
|
||||
return entry->startresult;
|
||||
|
||||
if(entry->startresult.isSuccess())
|
||||
running = true;
|
||||
lval = entry->lastval;
|
||||
typedesc = entry->typedesc;
|
||||
}
|
||||
|
||||
{
|
||||
Guard G(queueLock);
|
||||
|
||||
//TODO: control queue size
|
||||
empty.resize(4);
|
||||
pvd::PVDataCreatePtr fact(pvd::getPVDataCreate());
|
||||
for(unsigned i=0; i<empty.size(); i++) {
|
||||
empty[i].reset(new pvd::MonitorElement(fact->createPVStructure(entry->typedesc)));
|
||||
empty[i].reset(new pvd::MonitorElement(fact->createPVStructure(typedesc)));
|
||||
}
|
||||
|
||||
if(entry->lastval) {
|
||||
if(lval) {
|
||||
//already running, notify of initial element
|
||||
assert(!empty.empty());
|
||||
|
||||
pvd::MonitorElementPtr elem(empty.front());
|
||||
elem->pvStructurePtr = entry->lastval;
|
||||
elem->pvStructurePtr = lval;
|
||||
elem->changedBitSet->set(0); // indicate all changed?
|
||||
filled.push_back(elem);
|
||||
empty.pop_front();
|
||||
|
||||
doEvt = true;
|
||||
}
|
||||
running = true;
|
||||
}
|
||||
if(doEvt)
|
||||
req->monitorEvent(weakref.lock());
|
||||
req->monitorEvent(shared_pointer(weakref));
|
||||
return pvd::Status();
|
||||
}
|
||||
|
||||
pvd::Status
|
||||
MonitorUser::stop()
|
||||
{
|
||||
Guard G(entry->chan->cache->cacheLock);
|
||||
Guard G(queueLock);
|
||||
running = false;
|
||||
return pvd::Status::Ok;
|
||||
}
|
||||
@ -229,7 +244,7 @@ MonitorUser::stop()
|
||||
pvd::MonitorElementPtr
|
||||
MonitorUser::poll()
|
||||
{
|
||||
Guard G(entry->chan->cache->cacheLock);
|
||||
Guard G(queueLock);
|
||||
pvd::MonitorElementPtr ret;
|
||||
if(!filled.empty()) {
|
||||
ret = filled.front();
|
||||
@ -243,7 +258,7 @@ MonitorUser::poll()
|
||||
void
|
||||
MonitorUser::release(pvd::MonitorElementPtr const & monitorElement)
|
||||
{
|
||||
Guard G(entry->chan->cache->cacheLock);
|
||||
Guard G(queueLock);
|
||||
//TODO: ifdef DEBUG? (only track inuse when debugging?)
|
||||
std::set<epics::pvData::MonitorElementPtr>::iterator it = inuse.find(monitorElement);
|
||||
if(it!=inuse.end()) {
|
||||
@ -254,6 +269,7 @@ MonitorUser::release(pvd::MonitorElementPtr const & monitorElement)
|
||||
//TODO: check empty and filled lists to see if this is one of ours, of from somewhere else
|
||||
throw std::invalid_argument("Can't release MonitorElement not in use");
|
||||
}
|
||||
// TODO: pipeline window update
|
||||
}
|
||||
|
||||
std::string
|
||||
|
@ -369,16 +369,19 @@ void statusServer(int lvl, const char *chanexpr)
|
||||
MonitorUser& MU = **it3;
|
||||
|
||||
size_t nempty, nfilled, nused, total;
|
||||
bool isrunning;
|
||||
{
|
||||
Guard G(scp->cache.cacheLock);
|
||||
|
||||
nempty = MU.empty.size();
|
||||
nfilled = MU.filled.size();
|
||||
nused = MU.inuse.size();
|
||||
isrunning = MU.running;
|
||||
}
|
||||
total = nempty + nfilled + nused;
|
||||
|
||||
std::cout<<" Server monitor buffer "<<nfilled<<"/"<<total
|
||||
std::cout<<" Server monitor"<<(isrunning?"":" Paused")
|
||||
<<" buffer "<<nfilled<<"/"<<total
|
||||
<<" out "<<nused<<"/"<<total
|
||||
<<" "<<epicsAtomicGetSizeT(&MU.nwakeups)<<" wakeups "
|
||||
<<epicsAtomicGetSizeT(&MU.nevents)<<" events "
|
||||
|
Reference in New Issue
Block a user