pvac: add sync. monitor

This commit is contained in:
Michael Davidsaver
2017-07-18 17:23:21 +02:00
parent 10b4ba8ae5
commit 97e1cd4c69
7 changed files with 305 additions and 39 deletions

View File

@ -18,6 +18,15 @@ The shortest possible PVA put() example.
*/
/**
@page examples_minimonitor Simple Client Monitor Example
The shortest possible PVA monitor() example.
@include minimonitor.cpp
*/
/**
@page examples_getme Client Get Example
This example demonstrates a client which issues a Get operation on startup,

View File

@ -13,10 +13,11 @@
@section main_examples API usage Examples
- Simplest (shortest) possible
- Simple synchronous (blocking) examples
- @ref examples_miniget
- @ref examples_miniput
- More complete
- @ref examples_minimonitor
- More complete callback based examples
- @ref examples_getme
- @ref examples_putme
- @ref examples_monitorme

View File

@ -11,6 +11,7 @@ TESTPROD_HOST += spamme
TESTPROD_HOST += miniget
TESTPROD_HOST += miniput
TESTPROD_HOST += minimonitor
include $(TOP)/configure/RULES
#----------------------------------------

102
examples/minimonitor.cpp Normal file
View File

@ -0,0 +1,102 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
// The simplest possible PVA monitor
#include <iostream>
#if !defined(_WIN32)
#include <signal.h>
#define USE_SIGNAL
#endif
#include <epicsEvent.h>
#include <pva/client.h>
static volatile bool done;
#ifdef USE_SIGNAL
static pvac::MonitorSync * volatile subscription;
static
void handler(int num)
{
(void)num;
done = true;
if(subscription)
subscription->wake();
}
#endif
int main(int argc, char *argv[])
{
try {
if(argc<=1) {
std::cerr<<"Usage: "<<argv[0]<<" <pvname>\n";
return 1;
}
pvac::ClientProvider provider("pva");
pvac::ClientChannel channel(provider.connect(argv[1]));
pvac::MonitorSync mon(channel.monitor());
#ifdef USE_SIGNAL
subscription = &mon;
signal(SIGINT, handler);
signal(SIGTERM, handler);
signal(SIGQUIT, handler);
#endif
int ret = 0;
while(!done) {
if(!mon.wait()) // updates mon.event
continue;
switch(mon.event.event) {
// Subscription network/internal error
case pvac::MonitorEvent::Fail:
std::cerr<<mon.name()<<" : Error : "<<mon.event.message<<"\n";
ret = 1;
done = true;
break;
// explicit call of 'mon.cancel' or subscription dropped
case pvac::MonitorEvent::Cancel:
std::cout<<mon.name()<<" <Cancel>\n";
done = true;
break;
// Underlying channel becomes disconnected
case pvac::MonitorEvent::Disconnect:
std::cout<<mon.name()<<" <Disconnect>\n";
break;
// Data queue becomes not-empty
case pvac::MonitorEvent::Data:
// We drain event FIFO completely
while(mon.poll()) {
std::cout<<mon.name()<<" : "<<mon.root;
}
// check to see if more events might be sent
if(mon.complete()) {
done = true;
std::cout<<mon.name()<<" : <Complete>\n";
}
break;
}
}
#ifdef USE_SIGNAL
signal(SIGINT, SIG_DFL);
signal(SIGTERM, SIG_DFL);
signal(SIGQUIT, SIG_DFL);
subscription = 0;
#endif
return ret;
}catch(std::exception& e){
std::cerr<<"Error: "<<e.what()<<"\n";
return 1;
}
}

View File

@ -40,7 +40,7 @@ struct Monitor::Impl : public pva::MonitorRequester
,seenEmpty(false)
,cb(cb)
{}
virtual ~Impl() {}
virtual ~Impl() {cancel();}
void callEvent(Guard& G, MonitorEvent::event_t evt = MonitorEvent::Fail)
{
@ -74,6 +74,19 @@ struct Monitor::Impl : public pva::MonitorRequester
}
}
void cancel()
{
Guard G(mutex);
last.reset();
if(started) {
op->stop();
started = false;
}
op->destroy();
callEvent(G, MonitorEvent::Cancel);
}
virtual std::string getRequesterName() OVERRIDE FINAL
{ return "RPCer"; }
@ -151,20 +164,10 @@ std::string Monitor::name() const
void Monitor::cancel()
{
if(!impl) return;
Guard G(impl->mutex);
root.reset();
changed.clear();
overrun.clear();
impl->last.reset();
if(impl->started) {
impl->op->stop();
impl->started = false;
}
impl->op->destroy();
impl->callEvent(G, MonitorEvent::Cancel);
root.reset();
if(impl) impl->cancel();
}
bool Monitor::poll()
@ -187,7 +190,7 @@ bool Monitor::poll()
bool Monitor::complete() const
{
if(impl) return true;
if(!impl) return true;
Guard G(impl->mutex);
return impl->done && impl->seenEmpty;
}

View File

@ -48,7 +48,7 @@ struct GetWait : public pvac::ClientChannel::GetCallback,
GetWait() {}
virtual ~GetWait() {}
virtual void getDone(const pvac::GetEvent& evt)
virtual void getDone(const pvac::GetEvent& evt) OVERRIDE FINAL
{
{
Guard G(mutex);
@ -111,7 +111,7 @@ struct PutValCommon : public pvac::ClientChannel::PutCallback,
PutValCommon() {}
virtual ~PutValCommon() {}
virtual void putDone(const PutEvent& evt)
virtual void putDone(const PutEvent& evt) OVERRIDE FINAL
{
{
Guard G(mutex);
@ -134,7 +134,7 @@ struct PutValScalar : public PutValCommon
PutValScalar(const void* value, pvd::ScalarType vtype) :value(value), vtype(vtype) {}
virtual ~PutValScalar() {}
virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args)
virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args) OVERRIDE FINAL
{
pvd::PVStructurePtr root(pvd::getPVDataCreate()->createPVStructure(build));
pvd::PVScalarPtr value(root->getSubField<pvd::PVScalar>("value"));
@ -157,7 +157,7 @@ struct PutValArray : public PutValCommon
PutValArray(const pvd::shared_vector<const void>& arr) :arr(arr) {}
virtual ~PutValArray() {}
virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args)
virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args) OVERRIDE FINAL
{
pvd::PVStructurePtr root(pvd::getPVDataCreate()->createPVStructure(build));
pvd::PVScalarArrayPtr value(root->getSubField<pvd::PVScalarArray>("value"));
@ -177,7 +177,7 @@ void
ClientChannel::putValue(const void* value,
pvd::ScalarType vtype,
double timeout,
pvd::PVStructure::const_shared_pointer pvRequest)
const epics::pvData::PVStructure::const_shared_pointer &pvRequest)
{
PutValScalar waiter(value, vtype);
Operation op(put(&waiter, pvRequest));
@ -191,7 +191,7 @@ ClientChannel::putValue(const void* value,
void
ClientChannel::putValue(const epics::pvData::shared_vector<const void>& value,
double timeout,
epics::pvData::PVStructure::const_shared_pointer pvRequest)
const epics::pvData::PVStructure::const_shared_pointer &pvRequest)
{
PutValArray waiter(value);
Operation op(put(&waiter, pvRequest));
@ -202,5 +202,102 @@ ClientChannel::putValue(const epics::pvData::shared_vector<const void>& value,
throw std::runtime_error(waiter.result.message);
}
struct MonitorSync::SImpl : public ClientChannel::MonitorCallback
{
const bool ourevent;
epicsEvent * const event;
epicsMutex mutex;
bool hadevent;
MonitorEvent last;
// maintained to ensure we (MonitorCallback) outlive the subscription
Monitor sub;
SImpl(epicsEvent *event)
:ourevent(!event)
,event(ourevent ? new epicsEvent : event)
{}
virtual ~SImpl()
{
sub.cancel();
if(ourevent)
delete event;
}
virtual void monitorEvent(const MonitorEvent& evt) OVERRIDE FINAL
{
{
Guard G(mutex);
last = evt;
hadevent = true;
}
event->signal();
}
};
MonitorSync::MonitorSync(const Monitor& mon, const std::tr1::shared_ptr<SImpl>& simpl)
:Monitor(mon.impl)
,simpl(simpl)
{
simpl->sub = mon;
event.event = MonitorEvent::Fail;
}
MonitorSync::~MonitorSync() {
std::cout<<"SYNC use_count="<<simpl.use_count()<<"\n";
}
bool MonitorSync::poll()
{
if(!simpl) throw std::logic_error("No subscription");
Guard G(simpl->mutex);
event = simpl->last;
simpl->last.event = MonitorEvent::Fail;
bool ret = simpl->hadevent;
simpl->hadevent = false;
return ret;
}
bool MonitorSync::wait()
{
if(!simpl) throw std::logic_error("No subscription");
simpl->event->wait();
Guard G(simpl->mutex);
event = simpl->last;
simpl->last.event = MonitorEvent::Fail;
bool ret = simpl->hadevent;
simpl->hadevent = false;
return ret;
}
bool MonitorSync::wait(double timeout)
{
if(!simpl) throw std::logic_error("No subscription");
bool ret = simpl->event->wait(timeout);
if(ret) {
Guard G(simpl->mutex);
event = simpl->last;
simpl->last.event = MonitorEvent::Fail;
ret = simpl->hadevent;
simpl->hadevent = false;
}
return ret;
}
void MonitorSync::wake() {
if(simpl) simpl->event->signal();
}
MonitorSync
ClientChannel::monitor(const epics::pvData::PVStructure::const_shared_pointer &pvRequest,
epicsEvent *event)
{
std::tr1::shared_ptr<MonitorSync::SImpl> simpl(new MonitorSync::SImpl(event));
Monitor mon(monitor(simpl.get(), pvRequest));
return MonitorSync(mon, simpl);
}
}//namespace pvac

View File

@ -12,6 +12,8 @@
#include <pv/pvData.h>
#include <pv/bitSet.h>
class epicsEvent;
namespace epics {namespace pvAccess {
class ChannelProvider;
class Channel;
@ -66,7 +68,7 @@ protected:
};
//! Information on put completion
struct epicsShareClass PutEvent
struct PutEvent
{
enum event_t {
Fail, //!< request ends in failure. Check message
@ -84,6 +86,8 @@ struct epicsShareClass GetEvent : public PutEvent
epics::pvData::PVStructure::const_shared_pointer value;
};
struct MonitorSync;
//! Handle for monitor subscription
struct epicsShareClass Monitor
{
@ -97,21 +101,26 @@ struct epicsShareClass Monitor
//! Immediate cancellation.
//! Does not wait for remote confirmation.
void cancel();
//! updates root, changed, overrun
//! return true if root!=NULL
/** updates root, changed, overrun
*
* @return true if root!=NULL
* @note MonitorEvent::Data will not be repeated until poll()==false.
*/
bool poll();
//! true if all events received.
//! Check after poll()==false
bool complete() const;
epics::pvData::PVStructure::const_shared_pointer root;
epics::pvData::BitSet changed,
overrun;
protected:
private:
std::tr1::shared_ptr<Impl> impl;
friend struct MonitorSync;
};
//! Information on monitor subscription/queue change
struct epicsShareClass MonitorEvent
struct MonitorEvent
{
enum event_t {
Fail=1, //!< subscription ends in an error
@ -120,17 +129,49 @@ struct epicsShareClass MonitorEvent
Data=8, //!< Data queue not empty. Call Monitor::poll()
} event;
std::string message; // set for event=Fail
void *priv;
};
/** Subscription usable w/o callbacks
*
* Basic usage is to call wait().
* If true is returned, then the 'event', 'root', 'changed', and 'overrun'
* members have been updated with a new event.
* Test 'event.event' first to find out which kind of event has occured.
*/
struct epicsShareClass MonitorSync : public Monitor
{
struct SImpl;
MonitorSync() {}
MonitorSync(const Monitor&, const std::tr1::shared_ptr<SImpl>&);
~MonitorSync();
//! wait for new event
bool wait();
//! wait for new event
//! @return false on timeout
bool wait(double timeout);
//! check if new event is available
bool poll();
//! Abort one call to wait()
//! wait() will return with MonitorEvent::Fail
void wake();
//! most recent event
//! updated only during wait() or poll()
MonitorEvent event;
private:
std::tr1::shared_ptr<SImpl> simpl;
};
//! information on connect/disconnect
struct epicsShareClass ConnectEvent
struct ConnectEvent
{
bool connected;
};
//! Thrown by blocking methods of ClientChannel on operation timeout
struct epicsShareClass Timeout : public std::runtime_error
struct Timeout : public std::runtime_error
{
Timeout();
};
@ -177,7 +218,7 @@ public:
std::string name() const;
//! callback for get() and rpc()
struct epicsShareClass GetCallback {
struct GetCallback {
virtual ~GetCallback() {}
//! get or rpc operation is complete
virtual void getDone(const GetEvent& evt)=0;
@ -216,7 +257,7 @@ public:
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
//! callbacks for put()
struct epicsShareClass PutCallback {
struct PutCallback {
virtual ~PutCallback() {}
struct Args {
Args(epics::pvData::BitSet& tosend) :tosend(tosend) {}
@ -248,23 +289,23 @@ public:
double timeout = 3.0,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer())
{
putValue(&value, ID, timeout, pvRequest);
putValue(static_cast<const void*>(&value), ID, timeout, pvRequest);
}
//! Put to the 'value' field and block until complete.
//! Accepts untyped scalar value
void putValue(const void* value, epics::pvData::ScalarType vtype,
double timeout,
epics::pvData::PVStructure::const_shared_pointer pvRequest);
double timeout = 3.0,
const epics::pvData::PVStructure::const_shared_pointer& pvRequest = epics::pvData::PVStructure::const_shared_pointer());
//! Put to the 'value' field and block until complete.
//! Accepts scalar array
void putValue(const epics::pvData::shared_vector<const void>& value,
double timeout,
epics::pvData::PVStructure::const_shared_pointer pvRequest);
double timeout = 3.0,
const epics::pvData::PVStructure::const_shared_pointer& pvRequest = epics::pvData::PVStructure::const_shared_pointer());
//! Monitor event notification
struct epicsShareClass MonitorCallback {
struct MonitorCallback {
virtual ~MonitorCallback() {}
/** New monitor event
*
@ -281,8 +322,20 @@ public:
Monitor monitor(MonitorCallback *cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
/** Begin subscription w/o callbacks
*
* @param event If not NULL, then subscription events are signaled to this epicsEvent. Test with poll().
* Otherwise an internal epicsEvent is allocated for use with wait()
*
* @note For simple usage with a single MonitorSync, pass event=NULL and call wait().
* If more than one MonitorSync is being created, then pass a custom epicsEvent and use poll() to test
* which subscriptions have events pending.
*/
MonitorSync monitor(const epics::pvData::PVStructure::const_shared_pointer& pvRequest = epics::pvData::PVStructure::const_shared_pointer(),
epicsEvent *event =0);
//! Connection state change CB
struct epicsShareClass ConnectCallback {
struct ConnectCallback {
virtual ~ConnectCallback() {}
virtual void connectEvent(const ConnectEvent& evt)=0;
};