rework monitor queuing
@@ -27,6 +27,8 @@ struct MonitorCacheEntry : public epics::pvData::MonitorRequester
 
     ChannelCacheEntry * const chan;
 
+    const size_t bufferSize; // DS requested buffer size
+
+    // to avoid yet another mutex borrow interested.mutex() for our members
+    inline epicsMutex& mutex() const { return interested.mutex(); }
@@ -37,14 +39,14 @@ struct MonitorCacheEntry : public epics::pvData::MonitorRequester
     size_t nevents; // # of upstream events poll()'d
 
     epics::pvData::StructureConstPtr typedesc;
-    epics::pvData::PVStructure::shared_pointer lastval;
+    epics::pvData::MonitorElement::shared_pointer lastelem;
     epics::pvData::MonitorPtr mon;
     epics::pvData::Status startresult;
 
     typedef weak_set<MonitorUser> interested_t;
     interested_t interested;
 
-    MonitorCacheEntry(ChannelCacheEntry *ent);
+    MonitorCacheEntry(ChannelCacheEntry *ent, const epics::pvData::PVStructure::shared_pointer& pvr);
     virtual ~MonitorCacheEntry();
 
     virtual void monitorConnect(epics::pvData::Status const & status,
@@ -63,13 +65,15 @@ struct MonitorUser : public epics::pvData::Monitor
     static size_t num_instances;
     weak_pointer weakref;
 
+    inline epicsMutex& mutex() const { return entry->mutex(); }
+
     MonitorCacheEntry::shared_pointer entry;
     epics::pvData::MonitorRequester::weak_pointer req;
     std::tr1::weak_ptr<GWChannel> srvchan;
 
     // guards queues and member variables
-    epicsMutex queueLock;
     bool running;
+    bool inoverflow;
     size_t nwakeups; // # of monitorEvent() calls to req
     size_t nevents; // total # events queued
     size_t ndropped; // # of events drop because our queue was full
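The two `mutex()` accessors above make the cache entry and its users share a single lock: `MonitorCacheEntry` borrows `interested.mutex()`, and each `MonitorUser` borrows `entry->mutex()` in place of the per-user `queueLock` removed here. A minimal standalone sketch of the delegation pattern (illustrative names, `std::mutex` standing in for `epicsMutex`):

```cpp
#include <mutex>

// Sketch only, not pva2pva code: the cache entry owns the lock and each
// per-subscriber object borrows it instead of declaring its own, so entry
// state and subscriber queues are always updated under one mutex and no
// lock-ordering rules are needed between the two classes.
struct CacheEntry {
    std::mutex& mutex() { return mtx; }
    int nevents = 0;                // guarded by mtx
private:
    std::mutex mtx;
};

struct Subscriber {
    explicit Subscriber(CacheEntry& e) : entry(e) {}
    // borrow the entry's mutex, mirroring MonitorUser::mutex()
    std::mutex& mutex() { return entry.mutex(); }

    void onEvent() {
        std::lock_guard<std::mutex> g(mutex()); // one lock covers both objects
        ++entry.nevents;
        ++nqueued;
    }

    CacheEntry& entry;
    int nqueued = 0;                // guarded by the entry's mutex
};

int main() {
    CacheEntry e;
    Subscriber s(e);
    s.onEvent();
}
```

Sharing one lock trades a little concurrency for immunity to lock-ordering deadlocks when entry and user state are updated together, as the reworked monitorEvent() below does.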
@@ -77,6 +81,8 @@ struct MonitorUser : public epics::pvData::Monitor
     std::deque<epics::pvData::MonitorElementPtr> filled, empty;
     std::set<epics::pvData::MonitorElementPtr> inuse;
 
+    epics::pvData::MonitorElementPtr overflowElement;
+
     MonitorUser(const MonitorCacheEntry::shared_pointer&);
     virtual ~MonitorUser();
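`filled`, `empty`, `inuse` and the new `overflowElement` describe a fixed pool of monitor elements cycling through three states: empty -> filled when the producer copies an update in, filled -> inuse when the consumer poll()s, and inuse -> empty on release(). A standalone model of that life-cycle, with an illustrative Element type and no locking:

```cpp
#include <deque>
#include <memory>
#include <set>
#include <stdexcept>

struct Element { int value = 0; };
typedef std::shared_ptr<Element> ElementPtr;

struct ElementQueue {
    std::deque<ElementPtr> filled, empty;
    std::set<ElementPtr> inuse;

    bool produce(int v) {
        if(empty.empty()) return false;     // queue full: would enter overflow
        ElementPtr e = empty.front();
        empty.pop_front();
        e->value = v;
        filled.push_back(e);
        return true;
    }
    ElementPtr poll() {
        if(filled.empty()) return ElementPtr();
        ElementPtr e = filled.front();
        filled.pop_front();
        inuse.insert(e);                    // downstream now holds it
        return e;
    }
    void release(const ElementPtr& e) {
        size_t n = inuse.erase(e);          // must be one we handed out
        if(n == 0)
            throw std::invalid_argument("release of unknown element");
        empty.push_back(e);
    }
};

int main() {
    ElementQueue q;
    for(int i = 0; i < 4; i++) q.empty.push_back(std::make_shared<Element>());
    q.produce(42);
    ElementPtr e = q.poll();
    q.release(e);
}
```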
@@ -164,6 +164,8 @@ GWChannel::createMonitor(
     {
         Guard G(entry->mutex());
 
+        // TODO: no-cache/no-share flag in pvRequest
+
         ment = entry->mon_entries.find(ser);
         if(!ment) {
-            ment.reset(new MonitorCacheEntry(entry.get()));
+            ment.reset(new MonitorCacheEntry(entry.get(), pvRequest));
@@ -11,8 +11,31 @@ namespace pvd = epics::pvData;
 size_t MonitorCacheEntry::num_instances;
 size_t MonitorUser::num_instances;
 
-MonitorCacheEntry::MonitorCacheEntry(ChannelCacheEntry *ent)
+namespace {
+void assign(pvd::MonitorElementPtr& to, const pvd::MonitorElementPtr& from)
+{
+    assert(to && from);
+    // TODO: lot of copying happens here. how expensive?
+    *to->changedBitSet = *from->changedBitSet;
+    *to->overrunBitSet = *from->overrunBitSet;
+    to->pvStructurePtr->copyUnchecked(*from->pvStructurePtr);
+}
+
+// fetch scalar value or default
+template<typename T>
+T getS(const pvd::PVStructurePtr& s, const char* name, T dft)
+{
+    try{
+        return s->getSubFieldT<pvd::PVScalar>(name)->getAs<T>();
+    }catch(std::runtime_error& e){
+        return dft;
+    }
+}
+}
+
+MonitorCacheEntry::MonitorCacheEntry(ChannelCacheEntry *ent, const pvd::PVStructure::shared_pointer& pvr)
     :chan(ent)
+    ,bufferSize(getS<pvd::uint32>(pvr, "record._options.queueSize", 2)) // should be same default as pvAccess, but not required
     ,done(false)
     ,nwakeups(0)
     ,nevents(0)
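The new constructor reads `record._options.queueSize` from the downstream pvRequest, which is where pvAccess's createRequest conventions place a client option such as `record[queueSize=4]field(value)`; `getS()` falls back to the supplied default when the field is absent or not convertible. A standalone analogue of that lookup-with-default, with a `std::map` standing in for the PVStructure:

```cpp
#include <iostream>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>

// Illustrative analogue of getS<T>(), not the pvData API: look up a dotted
// path in a string-keyed "structure" and convert it to T, returning a
// caller-supplied default on any failure.
typedef std::map<std::string, std::string> Struct;

template<typename T>
T getS(const Struct& s, const std::string& name, T dft)
{
    try {
        Struct::const_iterator it = s.find(name);
        if(it == s.end())
            throw std::runtime_error("no such field");
        std::istringstream strm(it->second);
        T val;
        if(!(strm >> val))
            throw std::runtime_error("conversion failed");
        return val;
    } catch(std::runtime_error&) {
        return dft;   // absent or malformed: fall back, as the ctor does
    }
}

int main() {
    Struct pvRequest;
    pvRequest["record._options.queueSize"] = "4";
    // 4 when the client asked for it, 2 (the pvAccess-style default) otherwise
    std::cout << getS<unsigned>(pvRequest, "record._options.queueSize", 2) << "\n";
    std::cout << getS<unsigned>(Struct(), "record._options.queueSize", 2) << "\n";
}
```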
@@ -28,6 +51,7 @@ MonitorCacheEntry::~MonitorCacheEntry()
         M->destroy();
     }
     epicsAtomicDecrSizeT(&num_instances);
+    const_cast<ChannelCacheEntry*&>(chan) = NULL; // spoil to fault use after free
 }
 
 void
@@ -69,9 +93,10 @@ MonitorCacheEntry::monitorConnect(pvd::Status const & status,
 }
 
 // notificaton from upstream client that its monitor queue has
-// become is not empty (transition from empty to not empty)
-// will not be called again unless we completely empty the queue.
+// become not empty (transition from empty to not empty)
+// will not be called again unless we completely empty the upstream queue.
+// If we don't then it is our responsibly to schedule more poll().
 // Note: this probably means this is a PVA client RX thread.
 void
 MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor)
 {
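The revised comment pins down the contract: the upstream monitorEvent() fires only on the empty to not-empty transition, so the handler must drain the queue completely (or arrange another poll()) or it will never be called again. A standalone sketch of that edge-triggered shape:

```cpp
#include <deque>
#include <iostream>

// Sketch of the contract described above: notify fires only on the
// empty->not-empty edge, so the consumer must loop poll() until it
// reports "nothing left" or it will stall forever.
struct EdgeQueue {
    std::deque<int> q;
    bool signaled = false;

    void push(int v) {
        bool wasEmpty = q.empty();
        q.push_back(v);
        if(wasEmpty) signaled = true;   // upstream notifies only on this edge
    }
    bool poll(int& out) {
        if(q.empty()) return false;
        out = q.front();
        q.pop_front();
        return true;
    }
};

int main() {
    EdgeQueue uq;
    uq.push(1); uq.push(2);             // one notification for two events

    if(uq.signaled) {                   // the monitorEvent() analogue
        uq.signaled = false;
        int v;
        while(uq.poll(v))               // drain completely, or schedule more polls
            std::cout << v << "\n";
    }
}
```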
@@ -84,57 +109,70 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor)
      * destroy() method is a no-op!
      */
 
-    //TODO: dequeue and requeue strategy code goes here
     epicsAtomicIncrSizeT(&nwakeups);
 
+    shared_pointer self(weakref); // keeps us alive in case all MonitorUsers are destroy()ed
+
     pvd::MonitorElementPtr update;
 
-    shared_pointer self(weakref); // keeps us alive all MonitorUsers are destroy()ed
+    typedef std::vector<MonitorUser::shared_pointer> dsnotify_t;
+    dsnotify_t dsnotify;
 
-    while((update=monitor->poll()))
     {
+        Guard G(mutex()); // MCE and MU guarded by the same mutex
+
+        //TODO: flow control, if all MU buffers are full, break before poll()==NULL
+        while((update=monitor->poll()))
         {
-            Guard G(mutex());
-            lastval = update->pvStructurePtr;
-        }
-        epicsAtomicIncrSizeT(&nevents);
+            epicsAtomicIncrSizeT(&nevents);
 
-        AUTO_VAL(tonotify, interested.lock_vector()); // TODO: avoid copy, iterate w/ lock
+            if(lastelem)
+                monitor->release(lastelem);
+            lastelem = update;
 
-        FOREACH(it, end, tonotify)
-        {
-            MonitorUser *usr = it->get();
-            pvd::MonitorRequester::shared_pointer req(usr->req);
-
-            bool notify = false;
+            interested_t::iterator IIT(interested); // recursively locks interested.mutex() (assumes this->mutex() is interestd.mutex())
+            for(interested_t::value_pointer pusr = IIT.next(); pusr; pusr = IIT.next())
             {
-                Guard G(usr->queueLock);
-                if(!usr->running || usr->empty.empty()) {
-                    epicsAtomicIncrSizeT(&usr->ndropped);
-                    continue;
-                }
-                notify = usr->filled.empty();
-
-                AUTO_VAL(elem, usr->empty.front());
-
-                elem->pvStructurePtr = update->pvStructurePtr;
-                elem->overrunBitSet = update->overrunBitSet;
-                elem->changedBitSet = update->changedBitSet;
-
-                usr->filled.push_back(elem);
-                usr->empty.pop_front();
-
-                epicsAtomicIncrSizeT(&usr->nevents);
-            }
+                MonitorUser *usr = pusr.get();
 
-            //TODO: notify from worker since monitroEvent() will call poll()
-            if(notify) {
-                epicsAtomicIncrSizeT(&usr->nwakeups);
-                req->monitorEvent(*it); // notify when first item added to empty queue
-            }
+                {
+                    Guard G(usr->mutex());
+                    if(!usr->running || usr->empty.empty()) {
+                        usr->inoverflow = true;
+                        assign(usr->overflowElement, update);
+                        // when overflow, US change is DS overflow
+                        *usr->overflowElement->overrunBitSet |= *update->changedBitSet;
+                        epicsAtomicIncrSizeT(&usr->ndropped);
+                        continue;
+                    }
+                    // we only come out of overflow when downstream release()s an element to us
+                    assert(!usr->inoverflow);
+
+                    if(usr->filled.empty())
+                        dsnotify.push_back(pusr);
+
+                    AUTO_VAL(elem, usr->empty.front());
+
+                    assign(elem, update); // TODO: nice to avoid copy
+
+                    usr->filled.push_back(elem);
+                    usr->empty.pop_front();
+
+                    epicsAtomicIncrSizeT(&usr->nevents);
+                }
             }
+        }
+    }
 
-        monitor->release(update);
-        // unlock here, race w/ stop(), unlisten()?
-    }
+    //TODO: notify from worker thread
+
+    FOREACH(it,end,dsnotify) {
+        MonitorUser *usr = (*it).get();
+        pvd::MonitorRequester::shared_pointer req(usr->req);
+        epicsAtomicIncrSizeT(&usr->nwakeups);
+        req->monitorEvent(*it); // notify when first item added to empty queue, may call poll(), release(), and others
+    }
 }
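The reworked monitorEvent() drains the upstream queue and fills every interested user's queue under one guard, but records the users whose filled queue made the empty to not-empty transition in dsnotify, and calls their monitorEvent() only after the guard is released, since a downstream requester may re-enter poll() or release(). A standalone sketch of this collect-then-notify pattern:

```cpp
#include <functional>
#include <mutex>
#include <vector>

// Sketch only: decide who to wake while holding the lock, but invoke the
// callbacks after it is released, because a callback may re-enter
// (poll()/release()) and would self-deadlock otherwise.
struct User {
    std::vector<int> filled;
    std::function<void()> wake;
};

struct Entry {
    std::mutex mtx;
    std::vector<User*> users;

    void post(int update) {
        std::vector<User*> dsnotify;
        {
            std::lock_guard<std::mutex> g(mtx);
            for(User* u : users) {
                if(u->filled.empty())        // empty -> not-empty edge
                    dsnotify.push_back(u);
                u->filled.push_back(update);
            }
        }   // mutex released before any callback runs

        for(User* u : dsnotify)
            u->wake();                       // may call back into this Entry
    }
};

int main() {
    Entry e;
    User u;
    u.wake = []{ /* downstream monitorEvent() analogue */ };
    e.users.push_back(&u);
    e.post(1);
}
```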
@@ -143,12 +181,28 @@ void
 MonitorCacheEntry::unlisten(pvd::MonitorPtr const & monitor)
 {
     pvd::Monitor::shared_pointer M;
-    M.swap(mon);
+    interested_t::vector_type tonotify;
+    {
+        Guard G(mutex());
+        M.swap(mon);
+        tonotify = interested.lock_vector();
+        // assume that upstream won't call monitorEvent() again
+        monitor->release(lastelem);
+        lastelem.reset();
+
+        // cause future downstream start() to error
+        startresult = pvd::Status(pvd::Status::STATUSTYPE_ERROR, "upstream unlisten()");
+    }
     if(M) {
         M->destroy();
         std::cout<<__PRETTY_FUNCTION__<<" destroy client monitor\n";
     }
-    // TODO: call all unlisten()
+    FOREACH(it, end, tonotify) {
+        MonitorUser *usr = it->get();
+        pvd::MonitorRequester::shared_pointer req(usr->req);
+        if(usr->inuse.empty()) // TODO: what about stopped?
+            req->unlisten(*it);
+    }
 }
 
 std::string
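unlisten() now follows the same discipline: under the lock it detaches state (swaps out the upstream monitor, snapshots the subscriber list, poisons future start() calls), and only outside the lock does it run destroy() and the downstream unlisten() callbacks. A standalone sketch of that shape, with illustrative types:

```cpp
#include <memory>
#include <mutex>
#include <vector>

struct Upstream { void destroy() {} };
struct Subscriber { void unlisten() {} };

struct Entry {
    std::mutex mtx;
    std::shared_ptr<Upstream> mon;
    std::vector<std::shared_ptr<Subscriber> > interested;
    bool poisoned = false;            // a future start() would check this

    void unlisten() {
        std::shared_ptr<Upstream> M;
        std::vector<std::shared_ptr<Subscriber> > tonotify;
        {
            std::lock_guard<std::mutex> g(mtx);
            M.swap(mon);              // detach while locked
            tonotify = interested;    // snapshot while locked
            poisoned = true;          // future start() must fail
        }
        if(M)
            M->destroy();             // potentially re-entrant: run unlocked
        for(auto& s : tonotify)
            s->unlisten();            // downstream callbacks, also unlocked
    }
};

int main() {
    Entry e;
    e.mon = std::make_shared<Upstream>();
    e.interested.push_back(std::make_shared<Subscriber>());
    e.unlisten();
}
```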
@@ -166,6 +220,7 @@ MonitorCacheEntry::message(std::string const & message, pvd::MessageType message
 MonitorUser::MonitorUser(const MonitorCacheEntry::shared_pointer &e)
     :entry(e)
     ,running(false)
+    ,inoverflow(true)
     ,nevents(0)
     ,ndropped(0)
 {
@@ -182,7 +237,7 @@ void
 MonitorUser::destroy()
 {
     {
-        Guard G(queueLock);
+        Guard G(mutex());
         running = false;
     }
 }
@@ -203,23 +258,27 @@ MonitorUser::start()
         if(!entry->startresult.isSuccess())
             return entry->startresult;
 
-        lval = entry->lastval;
+        if(entry->lastelem)
+            lval = entry->lastelem->pvStructurePtr;
         typedesc = entry->typedesc;
     }
 
     {
-        Guard G(queueLock);
+        Guard G(mutex());
 
-        //TODO: control queue size
-        empty.resize(4);
-        pvd::PVDataCreatePtr fact(pvd::getPVDataCreate());
-        for(unsigned i=0; i<empty.size(); i++) {
-            empty[i].reset(new pvd::MonitorElement(fact->createPVStructure(typedesc)));
+        if(empty.empty()) {
+            empty.resize(entry->bufferSize);
+            pvd::PVDataCreatePtr fact(pvd::getPVDataCreate());
+            for(unsigned i=0; i<empty.size(); i++) {
+                empty[i].reset(new pvd::MonitorElement(fact->createPVStructure(typedesc)));
+            }
+
+            // extra element to accumulate updates during overflow
+            overflowElement.reset(new pvd::MonitorElement(fact->createPVStructure(typedesc)));
         }
 
-        if(lval) {
+        if(lval && !empty.empty()) {
             //already running, notify of initial element
-            assert(!empty.empty());
 
             pvd::MonitorElementPtr elem(empty.front());
             elem->pvStructurePtr = lval;
@@ -232,14 +291,14 @@ MonitorUser::start()
         running = true;
     }
     if(doEvt)
-        req->monitorEvent(shared_pointer(weakref));
+        req->monitorEvent(shared_pointer(weakref)); // TODO: worker thread?
     return pvd::Status();
 }
 
 pvd::Status
 MonitorUser::stop()
 {
-    Guard G(queueLock);
+    Guard G(mutex());
     running = false;
     return pvd::Status::Ok;
 }
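start() now sizes the element buffer from the downstream-requested `entry->bufferSize` instead of the hard-coded 4, allocates it only on first start, adds the spare overflow accumulator, and replays the entry's last element so a late subscriber sees the current value immediately. A simplified standalone sketch (no locking, illustrative types):

```cpp
#include <deque>
#include <iostream>
#include <memory>

struct Element { int value = 0; };
typedef std::shared_ptr<Element> ElementPtr;

struct User {
    std::deque<ElementPtr> filled, empty;
    ElementPtr overflowElement;

    void start(size_t bufferSize, const int* lastValue) {
        if(empty.empty() && filled.empty()) {   // first start(): build buffers
            for(size_t i = 0; i < bufferSize; i++)
                empty.push_back(std::make_shared<Element>());
            // extra element to accumulate updates during overflow
            overflowElement = std::make_shared<Element>();
        }
        if(lastValue && !empty.empty()) {       // replay the current value
            ElementPtr e = empty.front();
            empty.pop_front();
            e->value = *lastValue;
            filled.push_back(e);
        }
    }
};

int main() {
    User u;
    int last = 42;
    u.start(4, &last);
    std::cout << u.filled.size() << " queued, " << u.empty.size() << " free\n";
}
```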
@@ -247,7 +306,7 @@ MonitorUser::stop()
 pvd::MonitorElementPtr
 MonitorUser::poll()
 {
-    Guard G(queueLock);
+    Guard G(mutex());
     pvd::MonitorElementPtr ret;
     if(!filled.empty()) {
         ret = filled.front();
@@ -261,18 +320,37 @@ MonitorUser::poll()
 void
 MonitorUser::release(pvd::MonitorElementPtr const & monitorElement)
 {
-    Guard G(queueLock);
+    Guard G(mutex());
     //TODO: ifdef DEBUG? (only track inuse when debugging?)
     std::set<epics::pvData::MonitorElementPtr>::iterator it = inuse.find(monitorElement);
     if(it!=inuse.end()) {
-        empty.push_back(monitorElement);
         inuse.erase(it);
+
+        if(inoverflow) { // leaving overflow condition
+
+            // to avoid copy, enqueue the current overflowElement
+            // and replace it with the element being release()d
+
+            empty.push_back(overflowElement);
+            try{
+                filled.push_back(overflowElement);
+            }catch(...){
+                empty.pop_back();
+                throw;
+            }
+            overflowElement = monitorElement;
+
+            inoverflow = false;
+        } else {
+            // push_back empty element
+            empty.push_back(monitorElement);
+        }
     } else {
-        // oh no, we've been given an element we weren't expecting
+        // oh no, we've been given an element which we didn't give to downstream
         //TODO: check empty and filled lists to see if this is one of ours, of from somewhere else
        throw std::invalid_argument("Can't release MonitorElement not in use");
     }
-    // TODO: pipeline window update
+    // TODO: pipeline window update?
 }
 
 std::string
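Together, monitorEvent() and release() implement the overflow scheme: when a user's queue is full, updates are squashed into the spare element (OR-ing the upstream changed mask into the downstream overrun mask, so an upstream change during overflow is reported as an overrun), and when the consumer finally release()s an element, the accumulated update is delivered and the released element is recycled as the new spare. A standalone sketch of that enter/leave cycle; it deliberately simplifies the diff's push_back exception handling and accumulates across repeated overflows:

```cpp
#include <bitset>
#include <deque>
#include <iostream>
#include <memory>

struct Element {
    int value = 0;
    std::bitset<32> changed, overrun;
};
typedef std::shared_ptr<Element> ElementPtr;

struct User {
    std::deque<ElementPtr> filled, empty;
    ElementPtr overflow{new Element};
    bool inoverflow = false;

    void post(int v, std::bitset<32> changed) {
        if(empty.empty()) {                    // full: accumulate in the spare
            inoverflow = true;
            overflow->value = v;
            overflow->changed |= changed;
            overflow->overrun |= changed;      // US change is DS overflow
            return;
        }
        ElementPtr e = empty.front();
        empty.pop_front();
        e->value = v;
        e->changed = changed;
        e->overrun.reset();
        filled.push_back(e);
    }

    void release(ElementPtr e) {
        if(inoverflow) {                       // leaving overflow:
            filled.push_back(overflow);        // deliver the squashed update
            overflow = e;                      // recycle e as the new spare
            overflow->changed.reset();
            overflow->overrun.reset();
            inoverflow = false;
        } else {
            empty.push_back(e);
        }
    }
};

int main() {
    User u;
    u.empty.push_back(ElementPtr(new Element));
    u.post(1, std::bitset<32>(0x1));           // fills the only element
    u.post(2, std::bitset<32>(0x2));           // overflow: squashed into spare
    ElementPtr e = u.filled.front(); u.filled.pop_front();
    u.release(e);                              // squashed update now queued
    std::cout << u.filled.size() << "\n";      // 1
}
```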
@@ -353,7 +353,7 @@ void statusServer(int lvl, const char *chanexpr)
 
             nsrvmon = ME.interested.size();
             hastype = !!ME.typedesc;
-            hasdata = !!ME.lastval;
+            hasdata = !!ME.lastelem;
             isdone = ME.done;
 
             if(lvl>2)
@@ -378,7 +378,7 @@ void statusServer(int lvl, const char *chanexpr)
                 std::string remote;
                 bool isrunning;
                 {
-                    Guard G(MU.queueLock);
+                    Guard G(MU.mutex());
 
                     nempty = MU.empty.size();
                     nfilled = MU.filled.size();