more monitor delta handling
This commit is contained in:
@ -40,6 +40,9 @@ struct MonitorCacheEntry : public epics::pvData::MonitorRequester
|
||||
size_t nevents; // # of upstream events poll()'d
|
||||
|
||||
epics::pvData::StructureConstPtr typedesc;
|
||||
/** value of upstream monitor (accumulation of all deltas)
|
||||
* changed/overflow bit masks of last delta
|
||||
*/
|
||||
epics::pvData::MonitorElement::shared_pointer lastelem;
|
||||
epics::pvData::MonitorPtr mon;
|
||||
epics::pvData::Status startresult;
|
||||
|
@ -12,14 +12,6 @@ size_t MonitorCacheEntry::num_instances;
|
||||
size_t MonitorUser::num_instances;
|
||||
|
||||
namespace {
|
||||
void assign(pvd::MonitorElementPtr& to, const pvd::MonitorElementPtr& from)
|
||||
{
|
||||
assert(to && from);
|
||||
// TODO: lot of copying happens here. how expensive?
|
||||
*to->changedBitSet = *from->changedBitSet;
|
||||
to->pvStructurePtr->copyUnchecked(*from->pvStructurePtr);
|
||||
}
|
||||
|
||||
// fetch scalar value or default
|
||||
template<typename T>
|
||||
T getS(const pvd::PVStructurePtr& s, const char* name, T dft)
|
||||
@ -148,13 +140,22 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor)
|
||||
Guard G(usr->mutex());
|
||||
if(usr->initial)
|
||||
continue; // no start() yet
|
||||
// TODO: track overflow when !running (after stop())?
|
||||
if(!usr->running || usr->empty.empty()) {
|
||||
usr->inoverflow = true;
|
||||
|
||||
/* overrun |= lastelem->overrun // upstream overflows
|
||||
* overrun |= changed & lastelem->changed // downstream overflows
|
||||
* changed |= lastelem->changed // accumulate changes
|
||||
*/
|
||||
|
||||
*usr->overflowElement->overrunBitSet |= *lastelem->overrunBitSet;
|
||||
usr->overflowElement->overrunBitSet->or_and(*usr->overflowElement->changedBitSet,
|
||||
*lastelem->changedBitSet);
|
||||
assign(usr->overflowElement, lastelem);
|
||||
*usr->overflowElement->changedBitSet |= *lastelem->changedBitSet;
|
||||
|
||||
usr->overflowElement->pvStructurePtr->copyUnchecked(*lastelem->pvStructurePtr,
|
||||
*lastelem->changedBitSet);
|
||||
|
||||
epicsAtomicIncrSizeT(&usr->ndropped);
|
||||
continue;
|
||||
@ -170,7 +171,10 @@ MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor)
|
||||
AUTO_VAL(elem, usr->empty.front());
|
||||
|
||||
*elem->overrunBitSet = *lastelem->overrunBitSet;
|
||||
assign(elem, lastelem); // TODO: nice to avoid copy
|
||||
*elem->changedBitSet = *lastelem->changedBitSet;
|
||||
// Note: can't use changed mask to optimize this copy since we don't know
|
||||
// the state of the free element
|
||||
elem->pvStructurePtr->copyUnchecked(*lastelem->pvStructurePtr);
|
||||
|
||||
usr->filled.push_back(elem);
|
||||
usr->empty.pop_front();
|
||||
@ -297,6 +301,7 @@ MonitorUser::start()
|
||||
pvd::MonitorElementPtr elem(empty.front());
|
||||
elem->pvStructurePtr = lval;
|
||||
elem->changedBitSet->set(0); // indicate all changed
|
||||
elem->overrunBitSet->clear();
|
||||
filled.push_back(elem);
|
||||
empty.pop_front();
|
||||
}
|
||||
|
@ -148,8 +148,10 @@ struct TestMonitor {
|
||||
testOk1(!!elem.get());
|
||||
testOk1(!!elem2.get());
|
||||
testOk1(elem!=elem2);
|
||||
if(elem) testDiag("elem changed '%s' overflow '%s'", toString(*elem->changedBitSet).c_str(), toString(*elem->overrunBitSet).c_str());
|
||||
testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==42);
|
||||
testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get()==2);
|
||||
if(elem2) testDiag("elem2 changed '%s' overflow '%s'", toString(*elem2->changedBitSet).c_str(), toString(*elem2->overrunBitSet).c_str());
|
||||
testOk1(elem2 && elem2->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==42);
|
||||
testOk1(elem2 && elem2->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get()==2);
|
||||
|
||||
|
@ -75,10 +75,15 @@ void testmonitor()
|
||||
pvd::MonitorElementPtr elem(mon->poll());
|
||||
testOk1(!!elem.get());
|
||||
|
||||
if(elem) testDiag("elem changed '%s' overflow '%s'", toString(*elem->changedBitSet).c_str(), toString(*elem->overrunBitSet).c_str());
|
||||
if(elem) testDiag("elem x=%d y=%d",
|
||||
elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get(),
|
||||
elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get());
|
||||
testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==42);
|
||||
testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get()==15);
|
||||
testOk1(elem && elem->changedBitSet->nextSetBit(0)==0); // initial update shows all changed
|
||||
testOk1(elem && elem->changedBitSet->nextSetBit(1)==-1);
|
||||
testOk1(elem && elem->overrunBitSet->nextSetBit(0)==-1);
|
||||
testOk1(elem && elem->overrunBitSet->isEmpty());
|
||||
if(elem) mon->release(elem);
|
||||
|
||||
@ -97,6 +102,10 @@ void testmonitor()
|
||||
elem = mon->poll();
|
||||
testOk1(!!elem.get());
|
||||
|
||||
if(elem) testDiag("elem changed '%s' overflow '%s'", toString(*elem->changedBitSet).c_str(), toString(*elem->overrunBitSet).c_str());
|
||||
if(elem) testDiag("elem x=%d y=%d",
|
||||
elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get(),
|
||||
elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get());
|
||||
testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("x")->get()==43);
|
||||
testOk1(elem && elem->pvStructurePtr->getSubFieldT<pvd::PVInt>("y")->get()==15);
|
||||
testOk1(elem && elem->changedBitSet->nextSetBit(0)==1);
|
||||
|
@ -276,6 +276,7 @@ TestPVMonitor::TestPVMonitor(const TestPVChannel::shared_pointer& ch,
|
||||
free.push_back(elem);
|
||||
}
|
||||
overflow.reset(new pvd::MonitorElement(fact->createPVStructure(channel->pv->dtype)));
|
||||
overflow->changedBitSet->set(0); // initially all changed
|
||||
epicsAtomicIncrSizeT(&countTestPVMonitor);
|
||||
}
|
||||
|
||||
@ -419,7 +420,7 @@ void TestPV::post(bool notify)
|
||||
|
||||
void TestPV::post(const pvd::BitSet& changed, bool notify)
|
||||
{
|
||||
testDiag("post %s %d", name.c_str(), (int)notify);
|
||||
testDiag("post %s %d changed '%s'", name.c_str(), (int)notify, toString(changed).c_str());
|
||||
Guard G(provider->lock);
|
||||
|
||||
channels_t::vector_type toupdate(channels.lock_vector());
|
||||
@ -442,7 +443,9 @@ void TestPV::post(const pvd::BitSet& changed, bool notify)
|
||||
mon->inoverflow = true;
|
||||
mon->overflow->overrunBitSet->or_and(*mon->overflow->changedBitSet, changed); // oflow = prev_changed & new_changed
|
||||
*mon->overflow->changedBitSet |= changed;
|
||||
testDiag("overflow");
|
||||
testDiag("overflow changed '%s' overrun '%s'",
|
||||
toString(*mon->overflow->changedBitSet).c_str(),
|
||||
toString(*mon->overflow->overrunBitSet).c_str());
|
||||
|
||||
} else {
|
||||
assert(!mon->inoverflow);
|
||||
@ -451,13 +454,17 @@ void TestPV::post(const pvd::BitSet& changed, bool notify)
|
||||
mon->needWakeup = true;
|
||||
|
||||
AUTO_REF(elem, mon->free.front());
|
||||
// Note: can't use 'changed' to optimize this copy since we don't know
|
||||
// the state of the free element
|
||||
elem->pvStructurePtr->copyUnchecked(*mon->overflow->pvStructurePtr);
|
||||
*elem->changedBitSet = changed;
|
||||
elem->overrunBitSet->clear(); // redundant/paranoia
|
||||
|
||||
mon->buffer.push_back(elem);
|
||||
mon->free.pop_front();
|
||||
testDiag("push %p", elem.get());
|
||||
testDiag("push %p changed '%s' overflow '%s'", elem.get(),
|
||||
toString(*elem->changedBitSet).c_str(),
|
||||
toString(*elem->overrunBitSet).c_str());
|
||||
}
|
||||
|
||||
if(mon->needWakeup && notify) {
|
||||
|
@ -2,6 +2,7 @@
|
||||
#define UTILITIES_H
|
||||
|
||||
#include <deque>
|
||||
#include <sstream>
|
||||
|
||||
#include <epicsEvent.h>
|
||||
#include <epicsUnitTest.h>
|
||||
@ -23,6 +24,14 @@ struct TestProvider;
|
||||
testDiag("%s : " #NAME "(%p) : %s", epics::pvData::getMessageTypeName(messageType).c_str(), this, message.c_str()); \
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
inline std::string toString(const T& tbs)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
oss << tbs;
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
template<class C, void (C::*M)()>
|
||||
void test_method(const char *kname, const char *mname)
|
||||
{
|
||||
|
Reference in New Issue
Block a user