From 3341d9a2baf63c96e997cd61d1f7c59c17bf8194 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 23 May 2018 13:57:43 -0700 Subject: [PATCH] spamme example use MonitorFIFO --- examples/spamme.cpp | 155 +++++++++----------------------------------- 1 file changed, 29 insertions(+), 126 deletions(-) diff --git a/examples/spamme.cpp b/examples/spamme.cpp index d66942b..3977715 100644 --- a/examples/spamme.cpp +++ b/examples/spamme.cpp @@ -32,6 +32,8 @@ typedef epicsGuard Guard; epicsEvent done; +bool debug; + #ifdef USE_SIGNAL void alldone(int num) { @@ -48,145 +50,39 @@ pvd::Structure::const_shared_pointer spamtype(pvd::getFieldCreate()->createField struct SpamProvider; struct SpamChannel; -struct SpamMonitor : public pva::Monitor, - public std::tr1::enable_shared_from_this +struct SpamSource : public pva::MonitorFIFO::Source { - const std::tr1::shared_ptr channel; - const requester_type::weak_pointer requester; - pvd::int32 maxQueue; - bool pipeline; - // Has the client seen poll()==NULL - bool clientEmpty; epicsMutex mutex; - bool running; - epicsUInt32 remoteQueue; - std::deque filled, empty; - pvd::PVStructure::shared_pointer value; epicsUInt32 counter; - SpamMonitor(const std::tr1::shared_ptr& chan, - const pva::MonitorRequester::shared_pointer &requester, - const pvd::PVStructure::shared_pointer &pvRequest) - :channel(chan) - ,requester(requester) - ,maxQueue(0) - ,pipeline(false) - ,clientEmpty(true) - ,running(false) - ,remoteQueue(0) - ,counter(0) + const pvd::PVStructurePtr cur; + const pvd::PVIntPtr val; + pvd::BitSet changed; + + SpamSource() + :counter(0) + ,cur(pvd::getPVDataCreate()->createPVStructure(spamtype)) + ,val(cur->getSubFieldT("value")) { - pvd::PVScalar::shared_pointer fld; - - fld = pvRequest->getSubField("record._options.queueSize"); - if(fld) - maxQueue = fld->getAs(); - if(maxQueue<3) - maxQueue = 3; - - fld = pvRequest->getSubField("record._options.pipeline"); - if(fld) - pipeline = fld->getAs(); - - pvd::PVDataCreatePtr create(pvd::getPVDataCreate()); - value = create->createPVStructure(spamtype); - for(pvd::int32 i=0; icreatePVStructure(spamtype))); - empty.push_back(elem); - } + changed.set(val->getFieldOffset()); // our value always changes } - virtual ~SpamMonitor() {} - - virtual void destroy() OVERRIDE FINAL {(void)stop();} - - virtual pvd::Status start() OVERRIDE FINAL - { - { - Guard G(mutex); - running = true; - clientEmpty = true; - } - pushall(); - return pvd::Status::Ok; - } - virtual pvd::Status stop() OVERRIDE FINAL - { - { - Guard G(mutex); - running = false; - } - return pvd::Status::Ok; - } - virtual pva::MonitorElementPtr poll() OVERRIDE FINAL + virtual ~SpamSource() {} + virtual void freeHighMark(pva::MonitorFIFO *mon, size_t numEmpty) OVERRIDE FINAL { Guard G(mutex); - pva::MonitorElementPtr ret; - if(!filled.empty()) { - ret = filled.front(); - filled.pop_front(); - } - clientEmpty = !ret; - return ret; - } - virtual void release(const pva::MonitorElementPtr& elem) OVERRIDE FINAL - { - if(elem->pvStructurePtr->getField().get()!=spamtype.get()) - return; + for(;numEmpty; numEmpty--) { - Guard G(mutex); - empty.push_back(elem); - } - pushall(); - } - - virtual void reportRemoteQueueStatus(pvd::int32 freeElements) OVERRIDE FINAL - { - { - Guard G(mutex); - remoteQueue += freeElements; - } - pushall(); - } - - void pushall() - { - bool signal; - { - Guard G(mutex); - - signal = clientEmpty && filled.empty(); - - while(!empty.empty() && (!pipeline || remoteQueue>0)) { - pva::MonitorElementPtr elem(empty.front()); - - pvd::PVIntPtr fld(value->getSubFieldT("value")); - fld->put(counter++); - - elem->pvStructurePtr->copyUnchecked(*value); - elem->changedBitSet->clear(); - elem->changedBitSet->set(0); - elem->overrunBitSet->clear(); - - filled.push_back(elem); - empty.pop_front(); - remoteQueue--; + val->put(counter); + if(!mon->tryPost(*cur, changed) && numEmpty!=1) { + std::cerr<<"spam tryPost() inconsistent "<monitorEvent(shared_from_this()); + counter++; } } }; - struct SpamChannel : public pva::Channel, public std::tr1::enable_shared_from_this { @@ -226,8 +122,11 @@ struct SpamChannel : public pva::Channel, virtual pva::Monitor::shared_pointer createMonitor(const pva::MonitorRequester::shared_pointer &requester, const pvd::PVStructure::shared_pointer &pvRequest) OVERRIDE FINAL { - std::tr1::shared_ptr ret(new SpamMonitor(shared_from_this(), requester, pvRequest)); - requester->monitorConnect(pvd::Status::Ok, ret, spamtype); + std::tr1::shared_ptr us(new SpamSource); + std::tr1::shared_ptr ret(new pva::MonitorFIFO(requester, pvRequest, us)); + // ret holds strong ref. to us + ret->open(spamtype); + ret->notify(); return ret; } }; @@ -277,6 +176,10 @@ struct SpamProvider : public pva::ChannelProvider, int main(int argc, char *argv[]) { try { + for(int i=1; i provider(new SpamProvider("spam")); pva::ServerContext::shared_pointer server(pva::ServerContext::create(pva::ServerContext::Config() .provider(provider)