diff --git a/p2pApp/chancache.h b/p2pApp/chancache.h index fbaf010..af248c0 100644 --- a/p2pApp/chancache.h +++ b/p2pApp/chancache.h @@ -40,6 +40,9 @@ struct MonitorCacheEntry : public epics::pvData::MonitorRequester size_t nevents; // # of upstream events poll()'d epics::pvData::StructureConstPtr typedesc; + /** value of upstream monitor (accumulation of all deltas) + * changed/overflow bit masks of last delta + */ epics::pvData::MonitorElement::shared_pointer lastelem; epics::pvData::MonitorPtr mon; epics::pvData::Status startresult; diff --git a/p2pApp/moncache.cpp b/p2pApp/moncache.cpp index ca191e6..cd5534f 100644 --- a/p2pApp/moncache.cpp +++ b/p2pApp/moncache.cpp @@ -12,14 +12,6 @@ size_t MonitorCacheEntry::num_instances; size_t MonitorUser::num_instances; namespace { -void assign(pvd::MonitorElementPtr& to, const pvd::MonitorElementPtr& from) -{ - assert(to && from); - // TODO: lot of copying happens here. how expensive? - *to->changedBitSet = *from->changedBitSet; - to->pvStructurePtr->copyUnchecked(*from->pvStructurePtr); -} - // fetch scalar value or default template T getS(const pvd::PVStructurePtr& s, const char* name, T dft) @@ -148,13 +140,22 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor) Guard G(usr->mutex()); if(usr->initial) continue; // no start() yet + // TODO: track overflow when !running (after stop())? if(!usr->running || usr->empty.empty()) { usr->inoverflow = true; + /* overrun |= lastelem->overrun // upstream overflows + * overrun |= changed & lastelem->changed // downstream overflows + * changed |= lastelem->changed // accumulate changes + */ + *usr->overflowElement->overrunBitSet |= *lastelem->overrunBitSet; usr->overflowElement->overrunBitSet->or_and(*usr->overflowElement->changedBitSet, *lastelem->changedBitSet); - assign(usr->overflowElement, lastelem); + *usr->overflowElement->changedBitSet |= *lastelem->changedBitSet; + + usr->overflowElement->pvStructurePtr->copyUnchecked(*lastelem->pvStructurePtr, + *lastelem->changedBitSet); epicsAtomicIncrSizeT(&usr->ndropped); continue; @@ -170,7 +171,10 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor) AUTO_VAL(elem, usr->empty.front()); *elem->overrunBitSet = *lastelem->overrunBitSet; - assign(elem, lastelem); // TODO: nice to avoid copy + *elem->changedBitSet = *lastelem->changedBitSet; + // Note: can't use changed mask to optimize this copy since we don't know + // the state of the free element + elem->pvStructurePtr->copyUnchecked(*lastelem->pvStructurePtr); usr->filled.push_back(elem); usr->empty.pop_front(); @@ -297,6 +301,7 @@ MonitorUser::start() pvd::MonitorElementPtr elem(empty.front()); elem->pvStructurePtr = lval; elem->changedBitSet->set(0); // indicate all changed + elem->overrunBitSet->clear(); filled.push_back(elem); empty.pop_front(); } diff --git a/testApp/testmon.cpp b/testApp/testmon.cpp index 99579b9..f61928e 100644 --- a/testApp/testmon.cpp +++ b/testApp/testmon.cpp @@ -148,8 +148,10 @@ struct TestMonitor { testOk1(!!elem.get()); testOk1(!!elem2.get()); testOk1(elem!=elem2); + if(elem) testDiag("elem changed '%s' overflow '%s'", toString(*elem->changedBitSet).c_str(), toString(*elem->overrunBitSet).c_str()); testOk1(elem && elem->pvStructurePtr->getSubFieldT("x")->get()==42); testOk1(elem && elem->pvStructurePtr->getSubFieldT("y")->get()==2); + if(elem2) testDiag("elem2 changed '%s' overflow '%s'", toString(*elem2->changedBitSet).c_str(), toString(*elem2->overrunBitSet).c_str()); testOk1(elem2 && elem2->pvStructurePtr->getSubFieldT("x")->get()==42); testOk1(elem2 && elem2->pvStructurePtr->getSubFieldT("y")->get()==2); diff --git a/testApp/testtest.cpp b/testApp/testtest.cpp index 602c6cf..39573c1 100644 --- a/testApp/testtest.cpp +++ b/testApp/testtest.cpp @@ -75,10 +75,15 @@ void testmonitor() pvd::MonitorElementPtr elem(mon->poll()); testOk1(!!elem.get()); + if(elem) testDiag("elem changed '%s' overflow '%s'", toString(*elem->changedBitSet).c_str(), toString(*elem->overrunBitSet).c_str()); + if(elem) testDiag("elem x=%d y=%d", + elem->pvStructurePtr->getSubFieldT("x")->get(), + elem->pvStructurePtr->getSubFieldT("y")->get()); testOk1(elem && elem->pvStructurePtr->getSubFieldT("x")->get()==42); testOk1(elem && elem->pvStructurePtr->getSubFieldT("y")->get()==15); testOk1(elem && elem->changedBitSet->nextSetBit(0)==0); // initial update shows all changed testOk1(elem && elem->changedBitSet->nextSetBit(1)==-1); + testOk1(elem && elem->overrunBitSet->nextSetBit(0)==-1); testOk1(elem && elem->overrunBitSet->isEmpty()); if(elem) mon->release(elem); @@ -97,6 +102,10 @@ void testmonitor() elem = mon->poll(); testOk1(!!elem.get()); + if(elem) testDiag("elem changed '%s' overflow '%s'", toString(*elem->changedBitSet).c_str(), toString(*elem->overrunBitSet).c_str()); + if(elem) testDiag("elem x=%d y=%d", + elem->pvStructurePtr->getSubFieldT("x")->get(), + elem->pvStructurePtr->getSubFieldT("y")->get()); testOk1(elem && elem->pvStructurePtr->getSubFieldT("x")->get()==43); testOk1(elem && elem->pvStructurePtr->getSubFieldT("y")->get()==15); testOk1(elem && elem->changedBitSet->nextSetBit(0)==1); diff --git a/testApp/utilities.cpp b/testApp/utilities.cpp index b2c5315..5a7b79d 100644 --- a/testApp/utilities.cpp +++ b/testApp/utilities.cpp @@ -276,6 +276,7 @@ TestPVMonitor::TestPVMonitor(const TestPVChannel::shared_pointer& ch, free.push_back(elem); } overflow.reset(new pvd::MonitorElement(fact->createPVStructure(channel->pv->dtype))); + overflow->changedBitSet->set(0); // initially all changed epicsAtomicIncrSizeT(&countTestPVMonitor); } @@ -419,7 +420,7 @@ void TestPV::post(bool notify) void TestPV::post(const pvd::BitSet& changed, bool notify) { - testDiag("post %s %d", name.c_str(), (int)notify); + testDiag("post %s %d changed '%s'", name.c_str(), (int)notify, toString(changed).c_str()); Guard G(provider->lock); channels_t::vector_type toupdate(channels.lock_vector()); @@ -442,7 +443,9 @@ void TestPV::post(const pvd::BitSet& changed, bool notify) mon->inoverflow = true; mon->overflow->overrunBitSet->or_and(*mon->overflow->changedBitSet, changed); // oflow = prev_changed & new_changed *mon->overflow->changedBitSet |= changed; - testDiag("overflow"); + testDiag("overflow changed '%s' overrun '%s'", + toString(*mon->overflow->changedBitSet).c_str(), + toString(*mon->overflow->overrunBitSet).c_str()); } else { assert(!mon->inoverflow); @@ -451,13 +454,17 @@ void TestPV::post(const pvd::BitSet& changed, bool notify) mon->needWakeup = true; AUTO_REF(elem, mon->free.front()); + // Note: can't use 'changed' to optimize this copy since we don't know + // the state of the free element elem->pvStructurePtr->copyUnchecked(*mon->overflow->pvStructurePtr); *elem->changedBitSet = changed; elem->overrunBitSet->clear(); // redundant/paranoia mon->buffer.push_back(elem); mon->free.pop_front(); - testDiag("push %p", elem.get()); + testDiag("push %p changed '%s' overflow '%s'", elem.get(), + toString(*elem->changedBitSet).c_str(), + toString(*elem->overrunBitSet).c_str()); } if(mon->needWakeup && notify) { diff --git a/testApp/utilities.h b/testApp/utilities.h index e5e43de..eb40af7 100644 --- a/testApp/utilities.h +++ b/testApp/utilities.h @@ -2,6 +2,7 @@ #define UTILITIES_H #include +#include #include #include @@ -23,6 +24,14 @@ struct TestProvider; testDiag("%s : " #NAME "(%p) : %s", epics::pvData::getMessageTypeName(messageType).c_str(), this, message.c_str()); \ } +template +inline std::string toString(const T& tbs) +{ + std::ostringstream oss; + oss << tbs; + return oss.str(); +} + template void test_method(const char *kname, const char *mname) {