From 97e1cd4c69c32b564e9d80185dc9d773cc5bce23 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 18 Jul 2017 17:23:21 +0200 Subject: [PATCH] pvac: add sync. monitor --- documentation/examples.h | 9 +++ documentation/mainpage.h | 5 +- examples/Makefile | 1 + examples/minimonitor.cpp | 102 ++++++++++++++++++++++++++++++++ src/client/clientMonitor.cpp | 31 +++++----- src/client/clientSync.cpp | 109 +++++++++++++++++++++++++++++++++-- src/client/pva/client.h | 87 ++++++++++++++++++++++------ 7 files changed, 305 insertions(+), 39 deletions(-) create mode 100644 examples/minimonitor.cpp diff --git a/documentation/examples.h b/documentation/examples.h index 83997ca..8d8e094 100644 --- a/documentation/examples.h +++ b/documentation/examples.h @@ -18,6 +18,15 @@ The shortest possible PVA put() example. */ /** +@page examples_minimonitor Simple Client Monitor Example + +The shortest possible PVA monitor() example. + +@include minimonitor.cpp + +*/ +/** + @page examples_getme Client Get Example This example demonstrates a client which issues a Get operation on startup, diff --git a/documentation/mainpage.h b/documentation/mainpage.h index afe1571..ab899d2 100644 --- a/documentation/mainpage.h +++ b/documentation/mainpage.h @@ -13,10 +13,11 @@ @section main_examples API usage Examples -- Simplest (shortest) possible +- Simple synchronous (blocking) examples - @ref examples_miniget - @ref examples_miniput -- More complete + - @ref examples_minimonitor +- More complete callback based examples - @ref examples_getme - @ref examples_putme - @ref examples_monitorme diff --git a/examples/Makefile b/examples/Makefile index 117db1f..ad03187 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -11,6 +11,7 @@ TESTPROD_HOST += spamme TESTPROD_HOST += miniget TESTPROD_HOST += miniput +TESTPROD_HOST += minimonitor include $(TOP)/configure/RULES #---------------------------------------- diff --git a/examples/minimonitor.cpp b/examples/minimonitor.cpp new file mode 100644 index 0000000..58cce0d --- /dev/null +++ b/examples/minimonitor.cpp @@ -0,0 +1,102 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ +// The simplest possible PVA monitor + +#include + +#if !defined(_WIN32) +#include +#define USE_SIGNAL +#endif + +#include + +#include + +static volatile bool done; + +#ifdef USE_SIGNAL +static pvac::MonitorSync * volatile subscription; +static +void handler(int num) +{ + (void)num; + done = true; + if(subscription) + subscription->wake(); +} +#endif + +int main(int argc, char *argv[]) +{ + try { + if(argc<=1) { + std::cerr<<"Usage: "<\n"; + return 1; + } + + pvac::ClientProvider provider("pva"); + + pvac::ClientChannel channel(provider.connect(argv[1])); + + pvac::MonitorSync mon(channel.monitor()); + +#ifdef USE_SIGNAL + subscription = &mon; + signal(SIGINT, handler); + signal(SIGTERM, handler); + signal(SIGQUIT, handler); +#endif + + int ret = 0; + + while(!done) { + if(!mon.wait()) // updates mon.event + continue; + + switch(mon.event.event) { + // Subscription network/internal error + case pvac::MonitorEvent::Fail: + std::cerr<\n"; + done = true; + break; + // Underlying channel becomes disconnected + case pvac::MonitorEvent::Disconnect: + std::cout<\n"; + break; + // Data queue becomes not-empty + case pvac::MonitorEvent::Data: + // We drain event FIFO completely + while(mon.poll()) { + std::cout<\n"; + } + break; + } + } + +#ifdef USE_SIGNAL + signal(SIGINT, SIG_DFL); + signal(SIGTERM, SIG_DFL); + signal(SIGQUIT, SIG_DFL); + subscription = 0; +#endif + + return ret; + }catch(std::exception& e){ + std::cerr<<"Error: "<stop(); + started = false; + } + op->destroy(); + callEvent(G, MonitorEvent::Cancel); + } virtual std::string getRequesterName() OVERRIDE FINAL { return "RPCer"; } @@ -151,20 +164,10 @@ std::string Monitor::name() const void Monitor::cancel() { - if(!impl) return; - Guard G(impl->mutex); - - root.reset(); changed.clear(); overrun.clear(); - impl->last.reset(); - - if(impl->started) { - impl->op->stop(); - impl->started = false; - } - impl->op->destroy(); - impl->callEvent(G, MonitorEvent::Cancel); + root.reset(); + if(impl) impl->cancel(); } bool Monitor::poll() @@ -187,7 +190,7 @@ bool Monitor::poll() bool Monitor::complete() const { - if(impl) return true; + if(!impl) return true; Guard G(impl->mutex); return impl->done && impl->seenEmpty; } diff --git a/src/client/clientSync.cpp b/src/client/clientSync.cpp index ff7093e..1c30088 100644 --- a/src/client/clientSync.cpp +++ b/src/client/clientSync.cpp @@ -48,7 +48,7 @@ struct GetWait : public pvac::ClientChannel::GetCallback, GetWait() {} virtual ~GetWait() {} - virtual void getDone(const pvac::GetEvent& evt) + virtual void getDone(const pvac::GetEvent& evt) OVERRIDE FINAL { { Guard G(mutex); @@ -111,7 +111,7 @@ struct PutValCommon : public pvac::ClientChannel::PutCallback, PutValCommon() {} virtual ~PutValCommon() {} - virtual void putDone(const PutEvent& evt) + virtual void putDone(const PutEvent& evt) OVERRIDE FINAL { { Guard G(mutex); @@ -134,7 +134,7 @@ struct PutValScalar : public PutValCommon PutValScalar(const void* value, pvd::ScalarType vtype) :value(value), vtype(vtype) {} virtual ~PutValScalar() {} - virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args) + virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args) OVERRIDE FINAL { pvd::PVStructurePtr root(pvd::getPVDataCreate()->createPVStructure(build)); pvd::PVScalarPtr value(root->getSubField("value")); @@ -157,7 +157,7 @@ struct PutValArray : public PutValCommon PutValArray(const pvd::shared_vector& arr) :arr(arr) {} virtual ~PutValArray() {} - virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args) + virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args) OVERRIDE FINAL { pvd::PVStructurePtr root(pvd::getPVDataCreate()->createPVStructure(build)); pvd::PVScalarArrayPtr value(root->getSubField("value")); @@ -177,7 +177,7 @@ void ClientChannel::putValue(const void* value, pvd::ScalarType vtype, double timeout, - pvd::PVStructure::const_shared_pointer pvRequest) + const epics::pvData::PVStructure::const_shared_pointer &pvRequest) { PutValScalar waiter(value, vtype); Operation op(put(&waiter, pvRequest)); @@ -191,7 +191,7 @@ ClientChannel::putValue(const void* value, void ClientChannel::putValue(const epics::pvData::shared_vector& value, double timeout, - epics::pvData::PVStructure::const_shared_pointer pvRequest) + const epics::pvData::PVStructure::const_shared_pointer &pvRequest) { PutValArray waiter(value); Operation op(put(&waiter, pvRequest)); @@ -202,5 +202,102 @@ ClientChannel::putValue(const epics::pvData::shared_vector& value, throw std::runtime_error(waiter.result.message); } +struct MonitorSync::SImpl : public ClientChannel::MonitorCallback +{ + const bool ourevent; + epicsEvent * const event; + + epicsMutex mutex; + bool hadevent; + + MonitorEvent last; + + // maintained to ensure we (MonitorCallback) outlive the subscription + Monitor sub; + + SImpl(epicsEvent *event) + :ourevent(!event) + ,event(ourevent ? new epicsEvent : event) + {} + virtual ~SImpl() + { + sub.cancel(); + if(ourevent) + delete event; + } + + virtual void monitorEvent(const MonitorEvent& evt) OVERRIDE FINAL + { + { + Guard G(mutex); + last = evt; + hadevent = true; + } + event->signal(); + } +}; + +MonitorSync::MonitorSync(const Monitor& mon, const std::tr1::shared_ptr& simpl) + :Monitor(mon.impl) + ,simpl(simpl) +{ + simpl->sub = mon; + event.event = MonitorEvent::Fail; +} + +MonitorSync::~MonitorSync() { + std::cout<<"SYNC use_count="<mutex); + event = simpl->last; + simpl->last.event = MonitorEvent::Fail; + bool ret = simpl->hadevent; + simpl->hadevent = false; + return ret; +} + +bool MonitorSync::wait() +{ + if(!simpl) throw std::logic_error("No subscription"); + simpl->event->wait(); + Guard G(simpl->mutex); + event = simpl->last; + simpl->last.event = MonitorEvent::Fail; + bool ret = simpl->hadevent; + simpl->hadevent = false; + return ret; +} + +bool MonitorSync::wait(double timeout) +{ + if(!simpl) throw std::logic_error("No subscription"); + bool ret = simpl->event->wait(timeout); + if(ret) { + Guard G(simpl->mutex); + event = simpl->last; + simpl->last.event = MonitorEvent::Fail; + ret = simpl->hadevent; + simpl->hadevent = false; + } + return ret; +} + +void MonitorSync::wake() { + if(simpl) simpl->event->signal(); +} + +MonitorSync +ClientChannel::monitor(const epics::pvData::PVStructure::const_shared_pointer &pvRequest, + epicsEvent *event) +{ + std::tr1::shared_ptr simpl(new MonitorSync::SImpl(event)); + Monitor mon(monitor(simpl.get(), pvRequest)); + return MonitorSync(mon, simpl); +} + }//namespace pvac diff --git a/src/client/pva/client.h b/src/client/pva/client.h index c0a8c11..32f84b8 100644 --- a/src/client/pva/client.h +++ b/src/client/pva/client.h @@ -12,6 +12,8 @@ #include #include +class epicsEvent; + namespace epics {namespace pvAccess { class ChannelProvider; class Channel; @@ -66,7 +68,7 @@ protected: }; //! Information on put completion -struct epicsShareClass PutEvent +struct PutEvent { enum event_t { Fail, //!< request ends in failure. Check message @@ -84,6 +86,8 @@ struct epicsShareClass GetEvent : public PutEvent epics::pvData::PVStructure::const_shared_pointer value; }; +struct MonitorSync; + //! Handle for monitor subscription struct epicsShareClass Monitor { @@ -97,21 +101,26 @@ struct epicsShareClass Monitor //! Immediate cancellation. //! Does not wait for remote confirmation. void cancel(); - //! updates root, changed, overrun - //! return true if root!=NULL + /** updates root, changed, overrun + * + * @return true if root!=NULL + * @note MonitorEvent::Data will not be repeated until poll()==false. + */ bool poll(); //! true if all events received. + //! Check after poll()==false bool complete() const; epics::pvData::PVStructure::const_shared_pointer root; epics::pvData::BitSet changed, overrun; -protected: +private: std::tr1::shared_ptr impl; + friend struct MonitorSync; }; //! Information on monitor subscription/queue change -struct epicsShareClass MonitorEvent +struct MonitorEvent { enum event_t { Fail=1, //!< subscription ends in an error @@ -120,17 +129,49 @@ struct epicsShareClass MonitorEvent Data=8, //!< Data queue not empty. Call Monitor::poll() } event; std::string message; // set for event=Fail - void *priv; +}; + +/** Subscription usable w/o callbacks + * + * Basic usage is to call wait(). + * If true is returned, then the 'event', 'root', 'changed', and 'overrun' + * members have been updated with a new event. + * Test 'event.event' first to find out which kind of event has occured. + */ +struct epicsShareClass MonitorSync : public Monitor +{ + struct SImpl; + MonitorSync() {} + MonitorSync(const Monitor&, const std::tr1::shared_ptr&); + ~MonitorSync(); + + //! wait for new event + bool wait(); + //! wait for new event + //! @return false on timeout + bool wait(double timeout); + //! check if new event is available + bool poll(); + + //! Abort one call to wait() + //! wait() will return with MonitorEvent::Fail + void wake(); + + //! most recent event + //! updated only during wait() or poll() + MonitorEvent event; +private: + std::tr1::shared_ptr simpl; }; //! information on connect/disconnect -struct epicsShareClass ConnectEvent +struct ConnectEvent { bool connected; }; //! Thrown by blocking methods of ClientChannel on operation timeout -struct epicsShareClass Timeout : public std::runtime_error +struct Timeout : public std::runtime_error { Timeout(); }; @@ -177,7 +218,7 @@ public: std::string name() const; //! callback for get() and rpc() - struct epicsShareClass GetCallback { + struct GetCallback { virtual ~GetCallback() {} //! get or rpc operation is complete virtual void getDone(const GetEvent& evt)=0; @@ -216,7 +257,7 @@ public: epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer()); //! callbacks for put() - struct epicsShareClass PutCallback { + struct PutCallback { virtual ~PutCallback() {} struct Args { Args(epics::pvData::BitSet& tosend) :tosend(tosend) {} @@ -248,23 +289,23 @@ public: double timeout = 3.0, epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer()) { - putValue(&value, ID, timeout, pvRequest); + putValue(static_cast(&value), ID, timeout, pvRequest); } //! Put to the 'value' field and block until complete. //! Accepts untyped scalar value void putValue(const void* value, epics::pvData::ScalarType vtype, - double timeout, - epics::pvData::PVStructure::const_shared_pointer pvRequest); + double timeout = 3.0, + const epics::pvData::PVStructure::const_shared_pointer& pvRequest = epics::pvData::PVStructure::const_shared_pointer()); //! Put to the 'value' field and block until complete. //! Accepts scalar array void putValue(const epics::pvData::shared_vector& value, - double timeout, - epics::pvData::PVStructure::const_shared_pointer pvRequest); + double timeout = 3.0, + const epics::pvData::PVStructure::const_shared_pointer& pvRequest = epics::pvData::PVStructure::const_shared_pointer()); //! Monitor event notification - struct epicsShareClass MonitorCallback { + struct MonitorCallback { virtual ~MonitorCallback() {} /** New monitor event * @@ -281,8 +322,20 @@ public: Monitor monitor(MonitorCallback *cb, epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer()); + /** Begin subscription w/o callbacks + * + * @param event If not NULL, then subscription events are signaled to this epicsEvent. Test with poll(). + * Otherwise an internal epicsEvent is allocated for use with wait() + * + * @note For simple usage with a single MonitorSync, pass event=NULL and call wait(). + * If more than one MonitorSync is being created, then pass a custom epicsEvent and use poll() to test + * which subscriptions have events pending. + */ + MonitorSync monitor(const epics::pvData::PVStructure::const_shared_pointer& pvRequest = epics::pvData::PVStructure::const_shared_pointer(), + epicsEvent *event =0); + //! Connection state change CB - struct epicsShareClass ConnectCallback { + struct ConnectCallback { virtual ~ConnectCallback() {} virtual void connectEvent(const ConnectEvent& evt)=0; };