diff --git a/src/client/Makefile b/src/client/Makefile index a8636cb..891547b 100644 --- a/src/client/Makefile +++ b/src/client/Makefile @@ -7,6 +7,7 @@ INC += pv/pvAccess.h INC += pva/client.h pvAccess_SRCS += pvAccess.cpp +pvAccess_SRCS += monitor.cpp pvAccess_SRCS += client.cpp pvAccess_SRCS += clientSync.cpp pvAccess_SRCS += clientGet.cpp diff --git a/src/client/monitor.cpp b/src/client/monitor.cpp new file mode 100644 index 0000000..b65d823 --- /dev/null +++ b/src/client/monitor.cpp @@ -0,0 +1,473 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include +#include + +#include +#include + +#define epicsExportSharedSymbols +#include +#include +#include + +namespace pvd = epics::pvData; + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +namespace epics {namespace pvAccess { + +static const MonitorFIFO::Config default_conf = {4, 4, 0}; + +size_t MonitorFIFO::num_instances; + +MonitorFIFO::Source::~Source() {} + +MonitorFIFO::MonitorFIFO(const std::tr1::shared_ptr &requester, + const pvData::PVStructure::const_shared_pointer &pvRequest, + const Source::shared_pointer &source, Config *inconf) + :conf(inconf ? *inconf : default_conf) + ,requester(requester) + ,upstream(source) + ,pipeline(false) + ,opened(false) + ,running(false) + ,finished(false) + ,needConnected(false) + ,needEvent(false) + ,needUnlisten(false) + ,needClosed(false) + ,freeHighLevel(0u) + ,flowCount(0) +{ + REFTRACE_INCREMENT(num_instances); + + if(conf.maxCount==0) + conf.maxCount = 1; + + if(conf.defCount==0) + conf.defCount = 1; + + pvd::PVScalar::const_shared_pointer O(pvRequest->getSubField("record._options.queueSize")); + if(O && conf.actualCount==0) { + try { + conf.actualCount = O->getAs(); + } catch(std::exception& e) { + std::ostringstream strm; + strm<<"invalid queueSize : "<message(strm.str()); + } + } + + if(conf.actualCount==0) + conf.actualCount = conf.defCount; + + if(conf.actualCount > conf.maxCount) + conf.actualCount = conf.maxCount; + + O = pvRequest->getSubField("record._options.pipeline"); + if(O) { + try { + pipeline = O->getAs(); + } catch(std::exception& e) { + std::ostringstream strm; + strm<<"invalid pipeline : "<message(strm.str()); + } + } + + setFreeHighMark(0.00); + + if(inconf) + *inconf = conf; +} + +MonitorFIFO::~MonitorFIFO() { + REFTRACE_DECREMENT(num_instances); +} + +void MonitorFIFO::destroy() +{} + +void MonitorFIFO::show(std::ostream& strm) const +{ + // const (after ctor) bits + strm<<"MonitorFIFO" + " pipeline="<createPVStructure(type))); + empty.push_back(elem); + } + + opened = true; + needConnected = true; + + assert(inuse.empty()); + assert(empty.size()>=2); + assert(returned.empty()); + assert(conf.actualCount>=1); +} + +void MonitorFIFO::close() +{ + Guard G(mutex); + if(!opened) + return; // no-op + + opened = false; + needClosed = true; +} + +void MonitorFIFO::finish() +{ + Guard G(mutex); + if(!opened) + throw std::logic_error("Can not finish() a closed Monitor"); + else if(finished) + return; // no-op + + finished = true; + if(inuse.empty() && running) + needUnlisten = true; +} + +bool MonitorFIFO::tryPost(const pvData::PVStructure& value, + const pvd::BitSet& changed, + const pvd::BitSet& overrun, + bool force) +{ + Guard G(mutex); + + assert(opened && !finished); + assert(!empty.empty() || !inuse.empty()); + + const bool havefree = _freeCount()>0u; + + MonitorElementPtr elem; + if(havefree) { + // take an empty element + elem = empty.front(); + empty.pop_front(); + } else if(force) { + // allocate an extra element + elem.reset(new MonitorElement(pvd::getPVDataCreate()->createPVStructure(inuse.back()->pvStructurePtr->getStructure()))); + } + + if(elem) { + try { + assert(value.getStructure() == elem->pvStructurePtr->getStructure()); + elem->pvStructurePtr->copyUnchecked(value, changed); + *elem->changedBitSet = changed; + *elem->overrunBitSet = overrun; + + if(inuse.empty() && running) + needEvent = true; + inuse.push_back(elem); + }catch(...){ + if(havefree) { + empty.push_front(elem); + } + throw; + } + if(pipeline) + flowCount--; + } + + return _freeCount()>0u; +} + + +void MonitorFIFO::post(const pvData::PVStructure& value, + const pvd::BitSet& changed, + const pvd::BitSet& overrun) +{ + Guard G(mutex); + + assert(opened && !finished); + assert(!empty.empty() || !inuse.empty()); + + const bool use_empty = !empty.empty(); + + MonitorElementPtr elem; + + if(use_empty) { + // space in window, or entering overflow, fill an empty element + + assert(!empty.empty()); + + elem = empty.front(); + + } else { + // window full and already in overflow + // squash with last element + assert(!inuse.empty()); + elem = inuse.back(); + } + + assert(value.getStructure() == elem->pvStructurePtr->getStructure()); + elem->pvStructurePtr->copyUnchecked(value, changed); + + if(use_empty) { + *elem->changedBitSet = changed; + *elem->overrunBitSet = overrun; + + if(inuse.empty() && running) + needEvent = true; + + inuse.push_back(elem); + empty.pop_front(); + if(pipeline) + flowCount--; + + } else { + // in overflow + // squash + elem->overrunBitSet->or_and(*elem->changedBitSet, changed); + *elem->overrunBitSet |= overrun; + *elem->changedBitSet |= changed; + + // leave as inuse.back() + } +} + +void MonitorFIFO::notify() +{ + Monitor::shared_pointer self; + MonitorRequester::shared_pointer req; + pvd::StructureConstPtr type; + bool conn = false, + evt = false, + unl = false, + clo = false; + + { + Guard G(mutex); + + std::swap(conn, needConnected); + std::swap(evt, needEvent); + std::swap(unl, needUnlisten); + std::swap(clo, needClosed); + + if(conn | evt | unl | clo) { + req = requester.lock(); + self = shared_from_this(); + } + if(conn) + type = (!inuse.empty() ? inuse.front() : empty.back())->pvStructurePtr->getStructure(); + } + + if(!req) + return; + if(conn) + req->monitorConnect(pvd::Status(), self, type); + if(evt) + req->monitorEvent(self); + if(unl) + req->unlisten(self); + if(clo) + req->channelDisconnect(false); +} + +pvd::Status MonitorFIFO::start() +{ + Monitor::shared_pointer self; + MonitorRequester::shared_pointer req; + + { + Guard G(mutex); + + if(!opened) + throw std::logic_error("Monitor can't start() before open()"); + + if(running) + return pvd::Status(); + + if(!inuse.empty()) { + self = shared_from_this(); + req = requester.lock(); + } + + running = true; + } + + if(req) + req->monitorEvent(self); + + return pvd::Status(); +} + +pvd::Status MonitorFIFO::stop() +{ + Guard G(mutex); + + running = false; + + return pvd::Status(); +} + +MonitorElementPtr MonitorFIFO::poll() +{ + MonitorElementPtr ret; + Monitor::shared_pointer self; + MonitorRequester::shared_pointer req; + + { + Guard G(mutex); + + if(!inuse.empty() && inuse.size() + empty.size() > 1) { + ret = inuse.front(); + inuse.pop_front(); + if(inuse.empty() && finished) { + self = shared_from_this(); + req = requester.lock(); + } + } + + assert(!inuse.empty() || !empty.empty()); + } + + if(req) { + req->unlisten(self); + } + + return ret; +} + +void MonitorFIFO::release(MonitorElementPtr const & elem) +{ + size_t nempty; + { + Guard G(mutex); + + assert(!inuse.empty() || !empty.empty()); + + const pvd::StructureConstPtr& type((!inuse.empty() ? inuse.front() : empty.back())->pvStructurePtr->getStructure()); + + if(elem->pvStructurePtr->getStructure() != type // return of old type + || empty.size()+returned.size()>=conf.actualCount+1) // return of force'd + return; // ignore it + + if(pipeline) { + // work done during reportRemoteQueueStatus() + returned.push_back(elem); + return; + } + + bool below = _freeCount() <= freeHighLevel; + + empty.push_front(elem); + + bool above = _freeCount() > freeHighLevel; + + if(!below || !above || !upstream) + return; + + nempty = _freeCount(); + } + + upstream->freeHighMark(this, nempty); + notify(); +} + +void MonitorFIFO::getStats(Stats& s) const +{ + Guard G(mutex); + s.nempty = empty.size() + returned.size(); + s.nfilled = inuse.size(); + s.noutstanding = conf.actualCount - s.nempty - s.nfilled; +} + +void MonitorFIFO::reportRemoteQueueStatus(pvd::int32 nfree) +{ + if(nfree<=0 || !pipeline) + return; // paranoia + + size_t nempty; + { + Guard G(mutex); + + bool below = _freeCount() <= freeHighLevel; + + size_t nack = std::min(size_t(nfree), returned.size()); + flowCount += nfree; + + buffer_t::iterator end(returned.begin()); + std::advance(end, nack); + + // remove[0, nack) from returned and append to empty + empty.splice(empty.end(), returned, returned.begin(), end); + + bool above = _freeCount() > freeHighLevel; + + if(!below || !above || empty.size()<=1 || !upstream) + return; + + nempty = _freeCount(); + } + + upstream->freeHighMark(this, nempty); + notify(); +} + +size_t MonitorFIFO::freeCount() const +{ + Guard G(mutex); + return _freeCount(); +} + +// caller must hold lock +size_t MonitorFIFO::_freeCount() const +{ + if(pipeline) { + return std::max(0, std::min(flowCount, epicsInt32(empty.size()))); + } else { + return empty.empty() ? 0 : empty.size()-1; + } +} + +}} // namespace epics::pvAccess diff --git a/src/client/pv/monitor.h b/src/client/pv/monitor.h index 0572a10..310af64 100644 --- a/src/client/pv/monitor.h +++ b/src/client/pv/monitor.h @@ -9,11 +9,15 @@ #ifndef MONITOR_H #define MONITOR_H +#include +#include + #ifdef epicsExportSharedSymbols # define monitorEpicsExportSharedSymbols # undef epicsExportSharedSymbols #endif +#include #include #include #include @@ -209,6 +213,196 @@ inline MonitorElement::Ref begin(Monitor& mon) { return MonitorElement::Ref(mon) inline MonitorElement::Ref end(Monitor& mon) { return MonitorElement::Ref(); } #endif // __cplusplus<201103L +/** Utility implementation of Monitor. + * + * The Monitor interface defines the downstream (consumer facing) side + * of a FIFO. This class is a concrete implementation of this FIFO, + * including the upstream (producer facing) side. + * + * In addition to MonitorRequester, which provides callbacks to the downstream side, + * The MonitorFIFO::Source class provides callbacks to the upstream side. + * + * The simplest usage is to create (as shown below), then put update into the FIFO + * using post() and tryPost(). These methods behave the same when the queue is + * not full, but differ when it is. Additionally, tryPost() has an argument 'force'. + * Together there are three actions + * + * # post(value, changed) - combines the new update with the last (most recent) in the FIFO. + * # tryPost(value, changed, ..., false) - Makes no change to the FIFO and returns false. + * # tryPost(value, changed, ..., true) - Over-fills the FIFO with the new element, then returns false. + * + * @note Calls to post() or tryPost() __must__ be followed with a call to notify(). + * Callers of notify() __must__ not hold any locks, or a deadlock is possible. + * + * The intent of tryPost() with force=true is to aid code which is transferring values from + * some upstream buffer and this FIFO. Such code can be complicated if an item is removed + * from the upstream buffer, but can't be put into this downstream FIFO. Rather than + * being forced to effectivly maintain a third FIFO, code can use force=true. + * + * In either case, tryPost()==false indicates the the FIFO is full. + * + * eg. simple usage in a sub-class for Channel named MyChannel. + @code + pva::Monitor::shared_pointer + MyChannel::createMonitor(const pva::MonitorRequester::shared_pointer &requester, + const pvd::PVStructure::shared_pointer &pvRequest) + { + std::tr1::shared_ptr ret(new pva::MonitorFIFO(requester, pvRequest)); + ret->open(spamtype); + ret->notify(); + // ret->post(...); // maybe initial update + } + @endcode + */ +class epicsShareClass MonitorFIFO : public Monitor, + public std::tr1::enable_shared_from_this +{ +public: + POINTER_DEFINITIONS(MonitorFIFO); + //! Source methods may be called with downstream mutex locked. + //! Do not call notify(). This is done automatically after return in a way + //! which avoids locking and recursion problems. + struct epicsShareClass Source { + POINTER_DEFINITIONS(Source); + virtual ~Source(); + //! Called when MonitorFIFO::freeCount() rises above the level computed + //! from MonitorFIFO::setFreeHighMark(). + //! @param numEmpty The number of empty slots in the FIFO. + virtual void freeHighMark(MonitorFIFO *mon, size_t numEmpty) {} + }; + struct Config { + size_t maxCount, //!< upper limit on requested FIFO size + defCount, //!< FIFO size when client makes no request + actualCount; //!< filled in with actual FIFO size + }; + + /** + * @param requester Downstream/consumer callbacks + * @param pvRequest Downstream provided options + * @param source Upstream/producer callbacks + * @param conf Upstream provided options. Updated with actual values used. May be NULL to use defaults. + */ + MonitorFIFO(const std::tr1::shared_ptr &requester, + const pvData::PVStructure::const_shared_pointer &pvRequest, + const Source::shared_pointer& source = Source::shared_pointer(), + Config *conf=0); + virtual ~MonitorFIFO(); + + void show(std::ostream& strm) const; + + virtual void destroy() OVERRIDE FINAL; + + // configuration + + //! Level, as a percentage of empty buffer slots, at which to call Source::freeHighMark(). + //! Trigger condition is when number of free buffer slots goes above this level. + //! In range [0.0, 1.0) + void setFreeHighMark(double level); + + // up-stream interface (putting data into FIFO) + //! Mark subscription as "open" with the associated structure type. + void open(const epics::pvData::StructureConstPtr& type); + //! Abnormal closure (eg. due to upstream dis-connection) + void close(); + //! Successful closure (eg. RDB query done) + void finish(); + //! Consume a free slot if available. otherwise ... + //! if !force take no action and return false. + //! if force then attempt to allocate and fill a new slot, then return false. + //! The extra slot will be free'd after it is consumed. + bool tryPost(const pvData::PVStructure& value, + const epics::pvData::BitSet& changed, + const epics::pvData::BitSet& overrun = epics::pvData::BitSet(), + bool force =false); + //! Consume a free slot if available, otherwise squash with most recent + void post(const pvData::PVStructure& value, + const epics::pvData::BitSet& changed, + const epics::pvData::BitSet& overrun = epics::pvData::BitSet()); + //! Call after calling any other upstream interface methods (open()/close()/finish()/post()/...) + //! when no upstream mutexes are locked. + //! Do not call from Source::freeHighMark(). This is done automatically. + //! Call any MonitorRequester methods. + void notify(); + + // down-stream interface (taking data from FIFO) + virtual epics::pvData::Status start() OVERRIDE FINAL; + virtual epics::pvData::Status stop() OVERRIDE FINAL; + virtual MonitorElementPtr poll() OVERRIDE FINAL; + virtual void release(MonitorElementPtr const & monitorElement) OVERRIDE FINAL; // may call Source::freeHighMark() + virtual void getStats(Stats& s) const OVERRIDE FINAL; + virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements) OVERRIDE FINAL; + + //! Number of unused FIFO slots at this moment, which may changed in the next. + size_t freeCount() const; +private: + size_t _freeCount() const; + + friend void providerRegInit(void*); + static size_t num_instances; + + // const after ctor + Config conf; + + // locking here is complicated... + // our entry points which make callbacks are: + // notify() -> MonitorRequester::monitorConnect() + // -> MonitorRequester::monitorEvent() + // -> MonitorRequester::unlisten() + // -> ChannelBaseRequester::channelDisconnect() + // release() -> Source::freeHighMark() + // -> notify() -> ... + // reportRemoteQueueStatus() -> Source::freeHighMark() + // -> notify() -> ... + mutable epicsMutex mutex; + + // ownership is archored at the downstream (consumer) end. + // strong refs are: + // downstream -> MonitorFIFO -> Source + // weak refs are: + // MonitorRequester <- MonitorFIFO <- upstream + + // so we expect that downstream will hold a strong ref to us, + // and we keep a weak ref to downstream's MonitorRequester + const std::tr1::weak_ptr requester; + + // then we expect to keep a strong ref to upstream (Source) + // and expect that upstream will have only a weak ref to us. + const Source::shared_pointer upstream; + + bool pipeline; // const after ctor + bool opened; // open() vs. close() + bool running; // start() vs. stop() + bool finished; // finish() called + + bool needConnected; + bool needEvent; + bool needUnlisten; + bool needClosed; + + size_t freeHighLevel; + epicsInt32 flowCount; + + typedef std::list buffer_t; + // we allocate one extra buffer element to hold data when post() + // while all elements poll()'d. So there will always be one + // element on either the empty or inuse lists + buffer_t inuse, empty, returned; + /* our elements are in one of 4 states + * Empty - on empty list + * In Use - on inuse list + * Polled - Returnedd from poll(). Not tracked + * Returned - only if pipeline==true, release()'d but not ack'd + */ + + EPICS_NOT_COPYABLE(MonitorFIFO) +}; + +static inline +std::ostream& operator<<(std::ostream& strm, const MonitorFIFO& fifo) { + fifo.show(strm); + return strm; +} + }} namespace epics { namespace pvData { diff --git a/src/factory/ChannelAccessFactory.cpp b/src/factory/ChannelAccessFactory.cpp index 95252b8..a5a1976 100644 --- a/src/factory/ChannelAccessFactory.cpp +++ b/src/factory/ChannelAccessFactory.cpp @@ -198,6 +198,8 @@ struct providerRegGbl_t { epicsThreadOnceId providerRegOnce = EPICS_THREAD_ONCE_INIT; +} // namespace + void providerRegInit(void*) { epicsSignalInstallSigAlarmIgnore(); @@ -217,10 +219,9 @@ void providerRegInit(void*) registerRefCounter("ChannelBaseRequester (ABC)", &ChannelBaseRequester::num_instances); registerRefCounter("ChannelRequest (ABC)", &ChannelRequest::num_instances); registerRefCounter("ResponseHandler (ABC)", &ResponseHandler::num_instances); + registerRefCounter("MonitorFIFO", &MonitorFIFO::num_instances); } -} // namespace - ChannelProviderRegistry::shared_pointer ChannelProviderRegistry::clients() { epicsThreadOnce(&providerRegOnce, &providerRegInit, 0); diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 26a53fb..e3d6059 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -27,6 +27,9 @@ TESTPROD_HOST += testServerContext testServerContext_SRCS += testServerContext.cpp TESTS += testServerContext +TESTPROD_HOST += testmonitorfifo +testmonitorfifo_SRCS += testmonitorfifo.cpp +TESTS += testmonitorfifo PROD_HOST += testServer testServer_SRCS += testServer.cpp diff --git a/testApp/remote/testmonitorfifo.cpp b/testApp/remote/testmonitorfifo.cpp new file mode 100644 index 0000000..eb0bac4 --- /dev/null +++ b/testApp/remote/testmonitorfifo.cpp @@ -0,0 +1,781 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include + +#include +#include +#include + +#include +#include + +#if __cplusplus>=201103L + +#include +#include + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +namespace { + +struct Tester { + // we only have one thread, so no need for sync. + enum cb_t { + Connect, + Event, + Unlisten, + Close, + LowWater, + }; + static const char* name(cb_t cb) { + switch(cb) { +#define CASE(NAME) case NAME: return #NAME + CASE(Connect); + CASE(Event); + CASE(Unlisten); + CASE(Close); + CASE(LowWater); + default: return "???"; + } + } + + typedef std::vector timeline_t; + static timeline_t timeline; + + struct Requester : public pva::MonitorRequester { + POINTER_DEFINITIONS(Requester); + + epicsMutex mutex; // not strictly needed, but a good simulation of usage + + virtual ~Requester() {} + virtual std::string getRequesterName() OVERRIDE FINAL {return "Tester::Requester";} + virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { + testDiag("In %s", CURRENT_FUNCTION); + Guard G(mutex); + Tester::timeline.push_back(Close); + } + virtual void monitorConnect(epics::pvData::Status const & status, + pva::MonitorPtr const & monitor, epics::pvData::StructureConstPtr const & structure) OVERRIDE FINAL { + testDiag("In %s", CURRENT_FUNCTION); + Guard G(mutex); + Tester::timeline.push_back(Connect); + } + virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL { + testDiag("In %s", CURRENT_FUNCTION); + Guard G(mutex); + Tester::timeline.push_back(Event); + } + virtual void unlisten(pva::MonitorPtr const & monitor) OVERRIDE FINAL { + testDiag("In %s", CURRENT_FUNCTION); + Guard G(mutex); + Tester::timeline.push_back(Unlisten); + } + }; + Requester::shared_pointer requester; + + struct Handler : public pva::MonitorFIFO::Source { + POINTER_DEFINITIONS(Handler); + + epicsMutex mutex; // not strictly needed, but a good simulation of usage + + std::function action; + + virtual ~Handler() {} + virtual void freeHighMark(pva::MonitorFIFO *mon, size_t numEmpty) OVERRIDE FINAL { + testDiag("In %s", CURRENT_FUNCTION); + Guard G(mutex); + Tester::timeline.push_back(LowWater); + if(action) + action(mon, numEmpty); + } + }; + Handler::weak_pointer handler; + + void setAction(const std::function& action) { + Handler::shared_pointer H(handler); + + H->action = action; + } + + pva::MonitorFIFO::shared_pointer mon; + + pvd::StructureConstPtr type; + + Tester(const pvd::PVStructure::const_shared_pointer& pvReq, + pva::MonitorFIFO::Config *conf) + { + Handler::shared_pointer H(new Handler); + handler = H; + requester.reset(new Requester); + mon.reset(new pva::MonitorFIFO(requester, pvReq, H, conf)); + } + + void reset() { + timeline.clear(); + } + + void testTimeline(std::initializer_list l) { + size_t i=0; + for(auto event : l) { + if(i >= timeline.size()) { + testFail("timeline ends early. Expected %s", name(event)); + continue; + } + testOk(event==timeline[i], "%s == %s", name(event), name(timeline[i])); + + i++; + } + for(; icreateFieldBuilder() + ->add("value", t) + ->createStructure(); + + mon->open(type); + } + + void close() { + testDiag("close()"); + type.reset(); + mon->close(); + } + + template + void post(T val) + { + testShow()<<"post("<createPVStructure(type)); + pvd::PVScalarPtr fld(V->getSubFieldT("value")); + fld->putFrom(val); + pvd::BitSet changed; + changed.set(fld->getFieldOffset()); + mon->post(*V, changed); + } + + template + void tryPost(T val, bool expect, bool force = false) + { + assert(!!type); + pvd::PVStructurePtr V(pvd::getPVDataCreate()->createPVStructure(type)); + pvd::PVScalarPtr fld(V->getSubFieldT("value")); + fld->putFrom(val); + pvd::BitSet changed; + changed.set(fld->getFieldOffset()); + testEqual(mon->tryPost(*V, changed, pvd::BitSet(), force), expect)<<" value="< +void testPop(pva::Monitor& mon, T expected, bool overrun = false) +{ + pva::MonitorElement::Ref elem(mon); + if(!elem) { + testFail("Queue unexpected empty"); + return; + } + pvd::PVScalarPtr fld(elem->pvStructurePtr->getSubFieldT("value")); + T actual(fld->getAs()); + bool changed = elem->changedBitSet->get(fld->getFieldOffset()); + bool overran = elem->overrunBitSet->get(fld->getFieldOffset()); + + bool pop = actual==expected && changed && overran==overrun; + testTrue(pop) + <<" "<createPVStructure( + pvd::getFieldCreate()->createFieldBuilder() + ->createStructure())); +pvd::PVStructure::const_shared_pointer +pvReqPipelineBuild() { + pvd::PVStructure::shared_pointer ret(pvd::getPVDataCreate()->createPVStructure( + pvd::getFieldCreate()->createFieldBuilder() + ->addNestedStructure("record") + ->addNestedStructure("_options") + ->add("pipeline", pvd::pvBoolean) + ->add("queueSize", pvd::pvUInt) + ->endNested() + ->endNested() + ->createStructure())); + ret->getSubFieldT("record._options.pipeline")->putFrom(true); + ret->getSubFieldT("record._options.queueSize")->putFrom(2); + return ret; +} + +static const pvd::PVStructure::const_shared_pointer pvReqPipeline(pvReqPipelineBuild()); + +// Test the basic life cycle of a FIFO. +// post and pop single elements +void checkPlain() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + Tester tester(pvReqEmpty, 0); + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + tester.mon->start(); + tester.testTimeline({}); + + testEmpty(*tester.mon); + + tester.post(5); + tester.mon->notify(); + + tester.testTimeline({Tester::Event}); + + testPop(*tester.mon, 5); + testEmpty(*tester.mon); + + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + tester.testTimeline({Tester::Close}); +} + +// close() while FIFO not empty. +void checkAfterClose() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + Tester tester(pvReqEmpty, 0); + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + tester.mon->start(); + tester.testTimeline({}); + + testEmpty(*tester.mon); + + tester.post(5); + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + + tester.testTimeline({Tester::Event, Tester::Close}); + + // FIFO not cleared by close + testPop(*tester.mon, 5); + testEmpty(*tester.mon); + + tester.mon->notify(); + tester.testTimeline({}); +} + +// close() while FIFO not empty, then re-open +void checkReOpenLost() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + Tester tester(pvReqEmpty, 0); + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + tester.mon->start(); + tester.testTimeline({}); + + testEmpty(*tester.mon); + + tester.post(5); + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + // FIFO cleared by connect() + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.mon->start(); + + tester.testTimeline({Tester::Event, Tester::Close, Tester::Connect}); + + // update 5 was lost! + testEmpty(*tester.mon); + + tester.mon->notify(); + tester.testTimeline({}); +} + +// type change while FIFO is empty +void checkTypeChange() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + Tester tester(pvReqEmpty, 0); + + tester.connect(pvd::pvInt); + tester.mon->start(); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + tester.post(5); + tester.mon->notify(); + tester.testTimeline({Tester::Event}); + + testPop(*tester.mon, 5); + testEmpty(*tester.mon); + + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + tester.testTimeline({Tester::Close}); + + testDiag("Type change"); + + tester.connect(pvd::pvString); + tester.mon->start(); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + tester.post(std::string("hello")); + tester.mon->notify(); + tester.testTimeline({Tester::Event}); + + testPop(*tester.mon, std::string("hello")); + testEmpty(*tester.mon); + + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + tester.testTimeline({Tester::Close}); +} + +// post() until the FIFO is full, and keep going. Check overrun behavour +void checkFill() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + pva::MonitorFIFO::Config conf = {4, 2, 0}; + Tester tester(pvReqEmpty, &conf); + + testEqual(conf.actualCount, 2u); + + tester.connect(pvd::pvInt); + tester.mon->notify(); + + tester.testTimeline({Tester::Connect}); + + tester.mon->start(); + + testEqual(conf.actualCount, tester.mon->freeCount()); + + testShow()<<"Empty before "<<*tester.mon; + + // fill up and overflow + tester.post(5); + testShow()<<"A5 "<<*tester.mon; + tester.post(6); + testShow()<<"Full "<<*tester.mon; + tester.post(7); + + tester.mon->notify(); + tester.testTimeline({Tester::Event}); + + testShow()<<"Overrun1 "<<*tester.mon; + + tester.post(8); + tester.mon->notify(); + tester.testTimeline({}); + + testShow()<<"Overrun2 "<<*tester.mon; + + testPop(*tester.mon, 5); + tester.testTimeline({}); + testPop(*tester.mon, 6); + tester.testTimeline({Tester::LowWater}); + testPop(*tester.mon, 8, true); + tester.testTimeline({}); + testEmpty(*tester.mon); + + testShow()<<"Empty again "<<*tester.mon; + + // repeat + tester.post(15); + tester.post(16); + tester.post(17); + tester.post(18); + tester.mon->notify(); + tester.testTimeline({Tester::Event}); + + testPop(*tester.mon, 15); + tester.testTimeline({}); + testPop(*tester.mon, 16); + tester.testTimeline({Tester::LowWater}); + testPop(*tester.mon, 18, true); + tester.testTimeline({}); + testEmpty(*tester.mon); + + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + tester.testTimeline({Tester::Close}); +} + +// post() until past full, then pop() and post() on a partially full queue +void checkSaturate() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + pva::MonitorFIFO::Config conf = {4, 2, 0}; + Tester tester(pvReqEmpty, &conf); + + testEqual(conf.actualCount, 2u); + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + tester.mon->start(); + + testShow()<<"Empty before "<<*tester.mon; + + // fill up and overflow + tester.post(5); + tester.post(6); + tester.post(7); + tester.post(8); + tester.mon->notify(); + tester.testTimeline({Tester::Event}); + + testPop(*tester.mon, 5); + tester.testTimeline({}); + tester.post(9); + tester.mon->notify(); + tester.testTimeline({}); + + testPop(*tester.mon, 6); + tester.testTimeline({}); + tester.post(10); + tester.mon->notify(); + tester.testTimeline({}); + + testPop(*tester.mon, 8, true); + tester.testTimeline({}); + tester.post(11); + tester.mon->notify(); + tester.testTimeline({}); + + testShow()<<"Overrun2 "<<*tester.mon; + + testPop(*tester.mon, 9); + tester.testTimeline({}); + testPop(*tester.mon, 10); + tester.testTimeline({Tester::LowWater}); + testPop(*tester.mon, 11); + tester.testTimeline({}); + testEmpty(*tester.mon); + tester.testTimeline({}); + + testShow()<<"Empty again "<<*tester.mon; + + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + tester.testTimeline({Tester::Close}); +} + +// check handling of pipeline=true +void checkPipeline() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + pva::MonitorFIFO::Config conf = {4, 3, 0}; + Tester tester(pvReqPipeline, &conf); + + testEqual(conf.actualCount, 2u); + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + tester.mon->start(); + + testEqual(tester.mon->freeCount(), 0u); + tester.mon->reportRemoteQueueStatus(2); + testEqual(tester.mon->freeCount(), 2u); + tester.testTimeline({Tester::LowWater}); + + // fill up and overflow + tester.post(5); + tester.post(6); + tester.post(7); + tester.post(8); + tester.mon->notify(); + tester.testTimeline({Tester::Event}); + + testEqual(tester.mon->freeCount(), 0u); + testPop(*tester.mon, 5); + testPop(*tester.mon, 6); + testEmpty(*tester.mon); + + testDiag("ACK 2"); + testShow()<<"Before ACK "<<*tester.mon; + tester.mon->reportRemoteQueueStatus(2); + testShow()<<"After ACK "<<*tester.mon; + tester.testTimeline({Tester::LowWater}); + testEqual(tester.mon->freeCount(), 1u); + + // still have the overflow'd element on the queue + + testPop(*tester.mon, 8, true); + testEmpty(*tester.mon); + + tester.mon->reportRemoteQueueStatus(1); + testEqual(tester.mon->freeCount(), 2u); + tester.testTimeline({}); + + testShow()<<"Empty before re-fill "<<*tester.mon; + + // fill up and overflow + tester.post(15); + tester.post(16); + tester.post(17); + tester.post(18); + tester.mon->notify(); + tester.testTimeline({Tester::Event}); + + testPop(*tester.mon, 15); + testPop(*tester.mon, 16); + testEmpty(*tester.mon); + tester.testTimeline({}); + + testDiag("ACK 1"); + testShow()<<"Before ACK "<<*tester.mon; + tester.mon->reportRemoteQueueStatus(1); + testShow()<<"After ACK "<<*tester.mon; + tester.testTimeline({}); + + // 1 inuse, 1 returned, 1 empty + + tester.post(19); + tester.post(20); + + testPop(*tester.mon, 18, true); + testEmpty(*tester.mon); + + tester.post(21); + + testEmpty(*tester.mon); + + testShow()<<"Before final ACK "<<*tester.mon; + + tester.mon->reportRemoteQueueStatus(2); + tester.testTimeline({Tester::LowWater}); + + testPop(*tester.mon, 21, true); + testEmpty(*tester.mon); + + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + tester.testTimeline({Tester::Close}); +} + +// test the spam counter (keeps FIFO full) with pipeline=true +void checkSpam() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + pva::MonitorFIFO::Config conf = {4, 3, 0}; + Tester tester(pvReqPipeline, &conf); + + pvd::uint32 cnt = 0; + + tester.setAction([&tester, &cnt] (pva::MonitorFIFO *mon, size_t nfree) { + testDiag("Spamming %zu", nfree); + while(nfree--) { + testDiag("nfree=%zu %c", nfree, (nfree>0)?'T':'F'); + tester.tryPost(cnt++, nfree>0); + } + }); + tester.mon->setFreeHighMark(0); // run action when all buffers free + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + testEqual(conf.actualCount, 2u); + + testDiag("prime the pump"); + tester.mon->reportRemoteQueueStatus(conf.actualCount); + + tester.mon->notify(); + tester.testTimeline({Tester::LowWater}); + + testShow()<<"Before start() "<<*tester.mon<<"\n"; + tester.mon->start(); + testShow()<<"After start() "<<*tester.mon<<"\n"; + tester.testTimeline({Tester::Event}); + + testPop(*tester.mon, 0); + testPop(*tester.mon, 1); + testEmpty(*tester.mon); + + tester.mon->reportRemoteQueueStatus(2); + tester.testTimeline({Tester::LowWater, Tester::Event}); + + testPop(*tester.mon, 2); + testPop(*tester.mon, 3); + testEmpty(*tester.mon); + + tester.mon->reportRemoteQueueStatus(1); + tester.testTimeline({Tester::LowWater, Tester::Event}); + + testPop(*tester.mon, 4); + testEmpty(*tester.mon); + + tester.mon->reportRemoteQueueStatus(2); + tester.testTimeline({Tester::LowWater, Tester::Event}); + + testPop(*tester.mon, 5); + testPop(*tester.mon, 6); + testEmpty(*tester.mon); + + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + tester.testTimeline({Tester::Close}); +} + +// check pipeline=true, watermark, and unlisten. +// a sequence with a definite end +void checkCountdown() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + pva::MonitorFIFO::Config conf = {4, 3, 0}; + Tester tester(pvReqPipeline, &conf); + + pvd::int32 cnt = 10; + + tester.setAction([&tester, &cnt] (pva::MonitorFIFO *mon, size_t nfree) { + testDiag("Spamming %zu", nfree); + while(cnt >= 0 && nfree--) { + pvd::int32 c = cnt--; + testDiag("Count %d nfree=%zu %c", c, nfree, (nfree>0)?'T':'F'); + tester.tryPost(c, nfree>0); + } + if(cnt<0) { + testDiag("Finished!"); + tester.mon->finish(); + } + }); + tester.mon->setFreeHighMark(0.5); // run action() when one of two buffer elements is free + + tester.connect(pvd::pvInt); + tester.mon->notify(); + tester.testTimeline({Tester::Connect}); + + testEqual(conf.actualCount, 2u); + + testDiag("prime the pump"); + tester.mon->reportRemoteQueueStatus(conf.actualCount); + + tester.mon->notify(); + tester.testTimeline({Tester::LowWater}); + + tester.mon->start(); + tester.testTimeline({Tester::Event}); + + + testPop(*tester.mon, 10); + testPop(*tester.mon, 9); + testEmpty(*tester.mon); + tester.testTimeline({}); + + tester.mon->reportRemoteQueueStatus(2); + tester.testTimeline({Tester::LowWater, Tester::Event}); + + testPop(*tester.mon, 8); + testPop(*tester.mon, 7); + testEmpty(*tester.mon); + tester.testTimeline({}); + + tester.mon->reportRemoteQueueStatus(1); + tester.testTimeline({}); // nothing happens, watermark not reached + + tester.mon->reportRemoteQueueStatus(1); + tester.testTimeline({Tester::LowWater, Tester::Event}); + + testPop(*tester.mon, 6); + testPop(*tester.mon, 5); + testEmpty(*tester.mon); + tester.testTimeline({}); + + tester.mon->reportRemoteQueueStatus(1); + tester.testTimeline({}); + tester.mon->reportRemoteQueueStatus(1); + tester.testTimeline({Tester::LowWater, Tester::Event}); + + testPop(*tester.mon, 4); + testPop(*tester.mon, 3); + testEmpty(*tester.mon); + tester.testTimeline({}); + + tester.mon->reportRemoteQueueStatus(2); + tester.testTimeline({Tester::LowWater, Tester::Event}); + + testPop(*tester.mon, 2); + testPop(*tester.mon, 1); + testEmpty(*tester.mon); + tester.testTimeline({}); + + tester.mon->reportRemoteQueueStatus(2); + tester.testTimeline({Tester::LowWater, Tester::Event}); + + testPop(*tester.mon, 0); + testEmpty(*tester.mon); + tester.testTimeline({Tester::Unlisten}); + + tester.mon->stop(); + tester.close(); + tester.mon->notify(); + tester.testTimeline({Tester::Close}); +} + +} // namespace + +MAIN(testmonitorfifo) +{ + testPlan(184); + checkPlain(); + checkAfterClose(); + checkReOpenLost(); + checkTypeChange(); + checkFill(); + checkSaturate(); + checkPipeline(); + checkSpam(); + checkCountdown(); + return testDone(); +} + +#else /* c++11 */ + +MAIN(testmonitorfifo) +{ + testPlan(1); + testSkip(1, "no c++11"); + return testDone(); +} + +#endif /* c++11 */