diff --git a/p2pApp/chancache.h b/p2pApp/chancache.h index 8d24b71..58309bc 100644 --- a/p2pApp/chancache.h +++ b/p2pApp/chancache.h @@ -29,6 +29,8 @@ struct MonitorCacheEntry : public epics::pvData::MonitorRequester ChannelCacheEntry * const chan; bool done; + size_t nwakeups; // # of upstream monitorEvent() calls + size_t nevents; // # of upstream events poll()'d epics::pvData::StructureConstPtr typedesc; epics::pvData::PVStructure::shared_pointer lastval; @@ -61,6 +63,9 @@ struct MonitorUser : public epics::pvData::Monitor epics::pvData::MonitorRequester::weak_pointer req; bool running; + size_t nwakeups; // # of monitorEvent() calls to req + size_t nevents; // total # events queued + size_t ndropped; // # of events drop because our queue was full std::deque filled, empty; std::set inuse; diff --git a/p2pApp/moncache.cpp b/p2pApp/moncache.cpp index 2bc1913..0398053 100644 --- a/p2pApp/moncache.cpp +++ b/p2pApp/moncache.cpp @@ -14,6 +14,8 @@ size_t MonitorUser::num_instances; MonitorCacheEntry::MonitorCacheEntry(ChannelCacheEntry *ent) :chan(ent) ,done(false) + ,nwakeups(0) + ,nevents(0) { epicsAtomicIncrSizeT(&num_instances); } @@ -82,14 +84,18 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor) assert(monitor==mon || !lastval); if(!lastval) mon = monitor; + epicsUInt32 cntpoll = 0; //TODO: dequeue and requeue strategy code goes here + epicsAtomicIncrSizeT(&nwakeups); pvd::MonitorElementPtr update; while((update=mon->poll())) { + cntpoll++; lastval = update->pvStructurePtr; + epicsAtomicIncrSizeT(&nevents); AUTO_VAL(tonotify, interested.lock_vector()); // TODO: avoid copy, iterate w/ lock @@ -100,8 +106,10 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor) { Guard G(chan->cache->cacheLock); // TODO: more granular lock - if(!usr->running || usr->empty.empty()) + if(!usr->running || usr->empty.empty()) { + epicsAtomicIncrSizeT(&usr->ndropped); continue; + } pvd::MonitorElementPtr elem(usr->empty.front()); elem->pvStructurePtr = update->pvStructurePtr; @@ -109,11 +117,13 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor) elem->changedBitSet = update->changedBitSet; usr->filled.push_back(elem); usr->empty.pop_front(); - + epicsAtomicIncrSizeT(&usr->nevents); } - if(usr->filled.size()==1) + if(usr->filled.size()==1) { + epicsAtomicIncrSizeT(&usr->nwakeups); req->monitorEvent(*it); // notify when first item added to empty queue + } } mon->release(update); @@ -148,6 +158,8 @@ MonitorCacheEntry::message(std::string const & message, pvd::MessageType message MonitorUser::MonitorUser(const MonitorCacheEntry::shared_pointer &e) :entry(e) ,running(false) + ,nevents(0) + ,ndropped(0) { epicsAtomicIncrSizeT(&num_instances); } diff --git a/p2pApp/server.cpp b/p2pApp/server.cpp index 7b4e44e..9c4278d 100644 --- a/p2pApp/server.cpp +++ b/p2pApp/server.cpp @@ -312,13 +312,13 @@ void statusServer(int lvl) <<"' used by "<second; + MonitorCacheEntry::interested_t::vector_type usrs; size_t nsrvmon; bool hastype, hasdata, isdone; { @@ -328,13 +328,41 @@ void statusServer(int lvl) hastype = !!ME.typedesc; hasdata = !!ME.lastval; isdone = ME.done; + + if(lvl>2) + usrs = ME.interested.lock_vector(); } // TODO: how to describe pvRequest in a compact way... std::cout<<" Client Monitor used by "<cache.cacheLock); + + nempty = MU.empty.size(); + nfilled = MU.filled.size(); + nused = MU.inuse.size(); + } + total = nempty + nfilled + nused; + + std::cout<<" Server monitor buffer "<