examples: monitorme/spamme support pipeline=true

This commit is contained in:
Michael Davidsaver
2017-06-22 13:34:47 +02:00
parent c8bdcb62de
commit ba76cd0eb3
2 changed files with 75 additions and 25 deletions
+40 -12
View File
@@ -46,9 +46,14 @@ struct SpamMonitor : public pva::Monitor,
{
const std::tr1::shared_ptr<SpamChannel> channel;
const pva::MonitorRequester::shared_pointer requester;
pvd::int32 maxQueue;
bool pipeline;
// Has the client seen poll()==NULL
bool clientEmpty;
epicsMutex mutex;
bool running;
epicsUInt32 remoteQueue;
std::deque<epics::pvData::MonitorElementPtr> filled, empty;
pvd::PVStructure::shared_pointer value;
epicsUInt32 counter;
@@ -58,19 +63,28 @@ struct SpamMonitor : public pva::Monitor,
const pvd::PVStructure::shared_pointer &pvRequest)
:channel(chan)
,requester(requester)
,maxQueue(0)
,pipeline(false)
,clientEmpty(true)
,running(false)
,remoteQueue(0)
,counter(0)
{
pvd::int32 qsize = 0;
pvd::PVScalar::shared_pointer queueSize(pvRequest->getSubField<pvd::PVScalar>("record._options.queueSize"));
if(queueSize)
qsize = queueSize->getAs<pvd::int32>();
if(qsize<=0)
qsize = 3;
pvd::PVScalar::shared_pointer fld;
fld = pvRequest->getSubField<pvd::PVScalar>("record._options.queueSize");
if(fld)
maxQueue = fld->getAs<pvd::int32>();
if(maxQueue<3)
maxQueue = 3;
fld = pvRequest->getSubField<pvd::PVScalar>("record._options.pipeline");
if(fld)
pipeline = fld->getAs<pvd::boolean>();
pvd::PVDataCreatePtr create(pvd::getPVDataCreate());
value = create->createPVStructure(spamtype);
for(pvd::int32 i=0; i<qsize; i++)
for(pvd::int32 i=0; i<maxQueue; i++)
{
pvd::MonitorElementPtr elem(new pvd::MonitorElement(create->createPVStructure(spamtype)));
empty.push_back(elem);
@@ -87,9 +101,9 @@ struct SpamMonitor : public pva::Monitor,
Guard G(mutex);
run = running;
running = true;
clientEmpty = true;
}
if(!run)
pushall();
pushall();
return pvd::Status::Ok;
}
virtual pvd::Status stop() OVERRIDE FINAL
@@ -108,6 +122,7 @@ struct SpamMonitor : public pva::Monitor,
ret = filled.front();
filled.pop_front();
}
clientEmpty = !ret;
return ret;
}
virtual void release(const pva::MonitorElementPtr& elem) OVERRIDE FINAL
@@ -121,13 +136,24 @@ struct SpamMonitor : public pva::Monitor,
pushall();
}
virtual void reportRemoteQueueStatus(pvd::int32 freeElements) OVERRIDE FINAL
{
{
Guard G(mutex);
remoteQueue += freeElements;
}
pushall();
}
void pushall()
{
bool signal = false;
bool signal;
{
Guard G(mutex);
while(!empty.empty()) {
signal = clientEmpty && filled.empty();
while(!empty.empty() && (!pipeline || remoteQueue>0)) {
pva::MonitorElementPtr elem(empty.front());
pvd::PVIntPtr fld(value->getSubFieldT<pvd::PVInt>("value"));
@@ -138,10 +164,12 @@ struct SpamMonitor : public pva::Monitor,
elem->changedBitSet->set(0);
elem->overrunBitSet->clear();
signal |= filled.empty();
filled.push_back(elem);
empty.pop_front();
remoteQueue--;
}
signal &= !filled.empty();
}
if(signal)
requester->monitorEvent(shared_from_this());