start on pvaTestClient

This commit is contained in:
Michael Davidsaver
2017-07-07 13:42:45 +02:00
parent 525b950f62
commit bc2dccf95c
7 changed files with 903 additions and 0 deletions

View File

@@ -23,6 +23,7 @@ include $(PVACCESS_SRC)/rpcService/Makefile
include $(PVACCESS_SRC)/rpcClient/Makefile
include $(PVACCESS_SRC)/pipelineService/Makefile
include $(PVACCESS_SRC)/ca/Makefile
include $(PVACCESS_SRC)/testing/Makefile
include $(PVACCESS_SRC)/mb/Makefile
LIBRARY = pvAccess

10
src/testing/Makefile Normal file
View File

@@ -0,0 +1,10 @@
# This is a Makefile fragment, see ../Makefile
SRC_DIRS += $(PVACCESS_SRC)/testing
INC += pv/pvaTestClient.h
LIBSRCS += pvaTestClient.cpp
LIBSRCS += pvaTestClientGet.cpp
LIBSRCS += pvaTestClientRPC.cpp
LIBSRCS += pvaTestClientMonitor.cpp

View File

@@ -0,0 +1,164 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#ifndef PVATESTCLIENT_H
#define PVATESTCLIENT_H
#include <epicsMutex.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
namespace epics {namespace pvAccess {
class ChannelProvider;
class Channel;
class Monitor;
class Configuration;
}}//namespace epics::pvAccess
struct epicsShareClass TestOperation
{
struct Impl
{
virtual ~Impl() {}
virtual std::string name() const =0;
virtual void cancel() =0;
};
TestOperation() {}
TestOperation(const std::tr1::shared_ptr<Impl>&);
~TestOperation();
std::string name() const;
void cancel();
protected:
std::tr1::shared_ptr<Impl> impl;
};
struct epicsShareClass TestPutEvent
{
enum event_t {
Fail,
Cancel,
Success,
} event;
std::string message;
void *priv;
};
struct epicsShareClass TestGetEvent : public TestPutEvent
{
epics::pvData::PVStructure::const_shared_pointer value;
};
struct epicsShareClass TestMonitor
{
struct Impl;
TestMonitor() {}
TestMonitor(const std::tr1::shared_ptr<Impl>&);
~TestMonitor();
std::string name() const;
void cancel();
//! updates root, changed, overrun
//! return true if root!=NULL
bool poll();
//! true if all events received.
bool complete() const;
epics::pvData::PVStructure::const_shared_pointer root;
epics::pvData::BitSet changed,
overrun;
protected:
std::tr1::shared_ptr<Impl> impl;
};
struct epicsShareClass TestMonitorEvent
{
enum event_t {
Fail=1, // subscription ends in an error
Cancel=2, // subscription ends in cancellation
Disconnect=4,// subscription interrupted to do lose of communication
Data=8, // Data queue not empty
} event;
std::string message; // set for event=Fail
void *priv;
};
struct epicsShareClass TestConnectEvent
{
bool connected;
};
class epicsShareClass TestClientChannel
{
struct Impl;
std::tr1::shared_ptr<Impl> impl;
public:
struct Options {
short priority;
std::string address;
Options();
};
TestClientChannel() {}
TestClientChannel(const std::tr1::shared_ptr<epics::pvAccess::ChannelProvider>& provider,
const std::string& name,
const Options& opt = Options());
~TestClientChannel();
struct epicsShareClass GetCallback {
virtual ~GetCallback() {}
virtual void getDone(const TestGetEvent& evt)=0;
};
TestOperation get(GetCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
TestOperation rpc(GetCallback* cb,
const epics::pvData::PVStructure::const_shared_pointer& arguments,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
struct epicsShareClass PutCallback {
virtual ~PutCallback() {}
virtual epics::pvData::PVStructure::const_shared_pointer putBuild(const epics::pvData::StructureConstPtr& build) =0;
virtual void putDone(const TestPutEvent& evt)=0;
};
TestOperation put(PutCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
struct epicsShareClass MonitorCallback {
virtual ~MonitorCallback() {}
virtual void monitorEvent(const TestMonitorEvent& evt)=0;
};
TestMonitor monitor(MonitorCallback *cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer());
struct epicsShareClass ConnectCallback {
virtual ~ConnectCallback() {}
virtual void connectEvent(const TestConnectEvent& evt)=0;
};
void addConnectListener(ConnectCallback*);
void removeConnectListener(ConnectCallback*);
private:
std::tr1::shared_ptr<epics::pvAccess::Channel> getChannel();
};
class epicsShareClass TestClientProvider
{
std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> provider;
public:
TestClientProvider(const std::string& providerName,
const std::tr1::shared_ptr<epics::pvAccess::Configuration>& conf = std::tr1::shared_ptr<epics::pvAccess::Configuration>());
~TestClientProvider();
TestClientChannel connect(const std::string& name,
const TestClientChannel::Options& conf = TestClientChannel::Options());
};
#endif // PVATESTCLIENT_H

View File

@@ -0,0 +1,159 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
#define epicsExportSharedSymbols
#include "pv/logger.h"
#include "pv/pvaTestClient.h"
#include "pv/pvAccess.h"
#include "pv/configuration.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
struct TestClientChannel::Impl : public pva::ChannelRequester
{
epicsMutex mutex;
pva::Channel::shared_pointer channel;
// assume few listeners per channel, store in vector
typedef std::vector<TestClientChannel::ConnectCallback*> listeners_t;
listeners_t listeners;
virtual ~Impl() {}
virtual std::string getRequesterName() OVERRIDE FINAL { return "TestClientChannel::Impl"; }
virtual void channelCreated(const pvd::Status& status, pva::Channel::shared_pointer const & channel) OVERRIDE FINAL {}
virtual void channelStateChange(pva::Channel::shared_pointer const & channel, pva::Channel::ConnectionState connectionState) OVERRIDE FINAL
{
listeners_t notify;
{
Guard G(mutex);
notify = listeners;
}
TestConnectEvent evt;
evt.connected = connectionState==pva::Channel::CONNECTED;
for(listeners_t::const_iterator it=notify.begin(), end=notify.end(); it!=end; ++it)
{
try {
(*it)->connectEvent(evt);
}catch(std::exception& e){
LOG(pva::logLevelError, "Unhandled exception in connection state listener: %s\n", e.what());
Guard G(mutex);
for(listeners_t::iterator it2=listeners.begin(), end2=listeners.end(); it2!=end2; ++it2) {
if(*it==*it2) {
listeners.erase(it2);
break;
}
}
}
}
}
};
TestClientChannel::Options::Options()
:priority(0)
,address()
{}
TestOperation::TestOperation(const std::tr1::shared_ptr<Impl>& i)
:impl(i)
{}
TestOperation::~TestOperation() {}
std::string TestOperation::name() const
{
return impl ? impl->name() : "<NULL>";
}
void TestOperation::cancel()
{
if(impl) impl->cancel();
}
TestClientChannel::TestClientChannel(const std::tr1::shared_ptr<pva::ChannelProvider>& provider,
const std::string& name,
const Options& opt)
:impl(new Impl)
{
if(!provider)
throw std::logic_error("NULL ChannelProvider");
impl->channel = provider->createChannel(name, impl, opt.priority, opt.address);
if(!impl->channel)
throw std::logic_error("ChannelProvider failed to create Channel");
}
TestClientChannel::~TestClientChannel() {}
void TestClientChannel::addConnectListener(ConnectCallback* cb)
{
if(!impl) throw std::logic_error("Dead Channel");
TestConnectEvent evt;
{
Guard G(impl->mutex);
for(Impl::listeners_t::const_iterator it=impl->listeners.begin(), end=impl->listeners.end(); it!=end; ++it)
{
if(cb==*it) return; // no duplicates
}
impl->listeners.push_back(cb);
evt.connected = impl->channel->isConnected();
}
try{
cb->connectEvent(evt);
}catch(...){
removeConnectListener(cb);
throw;
}
}
void TestClientChannel::removeConnectListener(ConnectCallback* cb)
{
if(!impl) throw std::logic_error("Dead Channel");
Guard G(impl->mutex);
for(Impl::listeners_t::iterator it=impl->listeners.begin(), end=impl->listeners.end(); it!=end; ++it)
{
if(cb==*it) {
impl->listeners.erase(it);
return;
}
}
}
std::tr1::shared_ptr<epics::pvAccess::Channel>
TestClientChannel::getChannel()
{ return impl->channel; }
TestClientProvider::TestClientProvider(const std::string& providerName,
const std::tr1::shared_ptr<epics::pvAccess::Configuration>& conf)
:provider(pva::ChannelProviderRegistry::clients()->createProvider(providerName,
conf ? conf : pva::ConfigurationBuilder()
.push_env()
.build()))
{
if(!provider)
THROW_EXCEPTION2(std::invalid_argument, providerName);
}
TestClientProvider::~TestClientProvider() {}
TestClientChannel
TestClientProvider::connect(const std::string& name,
const TestClientChannel::Options& conf)
{
return TestClientChannel(provider, name, conf);
}

View File

@@ -0,0 +1,205 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <pv/current_function.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
#define epicsExportSharedSymbols
#include "pv/logger.h"
#include "pv/pvaTestClient.h"
#include "pv/pvAccess.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace {
struct GetPutter : public pva::ChannelPutRequester,
public TestOperation::Impl
{
mutable epicsMutex mutex;
bool started;
operation_type::shared_pointer op;
TestClientChannel::GetCallback *getcb;
TestClientChannel::PutCallback *putcb;
TestGetEvent event;
GetPutter(TestClientChannel::GetCallback* cb) :started(false), getcb(cb), putcb(0) {}
GetPutter(TestClientChannel::PutCallback* cb) :started(false), getcb(0), putcb(cb) {}
virtual ~GetPutter() {}
void callEvent(Guard& G, TestGetEvent::event_t evt = TestGetEvent::Fail)
{
if(!putcb && !getcb) return;
event.event = evt;
if(putcb) {
TestClientChannel::PutCallback *cb=putcb;
putcb = 0;
UnGuard U(G);
cb->putDone(event);
}
if(getcb) {
TestClientChannel::GetCallback *cb=getcb;
getcb = 0;
UnGuard U(G);
cb->getDone(event);
}
}
virtual std::string name() const OVERRIDE FINAL
{
Guard G(mutex);
return op ? op->getChannel()->getChannelName() : "<dead>";
}
virtual void cancel() OVERRIDE FINAL
{
Guard G(mutex);
if(started && op) op->cancel();
callEvent(G, TestGetEvent::Cancel);
}
virtual std::string getRequesterName() OVERRIDE FINAL
{ return "GetPutter"; }
virtual void channelPutConnect(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut,
epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL
{
Guard G(mutex);
if(started) return;
if(!putcb && !getcb) return;
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
if(!status.isSuccess()) {
callEvent(G);
} else if(getcb){
channelPut->lastRequest();
channelPut->get();
started = true;
} else if(putcb){
TestClientChannel::PutCallback *cb(putcb);
pvd::PVStructure::const_shared_pointer val;
try {
UnGuard U(G);
val = cb->putBuild(structure);
}catch(std::exception& e){
if(putcb) {
event.message = e.what();
callEvent(G);
} else {
LOG(pva::logLevelInfo, "Lost exception in %s: %s", CURRENT_FUNCTION, e.what());
}
}
if(putcb) {
pvd::BitSet::shared_pointer all(new pvd::BitSet);
all->set(0);
channelPut->lastRequest();
channelPut->put(std::tr1::const_pointer_cast<pvd::PVStructure>(val), all);
started = true;
}
}
}
virtual void channelDisconnect(bool destroy) OVERRIDE FINAL
{
Guard G(mutex);
event.message = "Disconnect";
callEvent(G);
}
virtual void putDone(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut) OVERRIDE FINAL
{
Guard G(mutex);
if(!getcb) return;
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
callEvent(G, status.isSuccess()? TestGetEvent::Success : TestGetEvent::Fail);
}
virtual void getDone(
const epics::pvData::Status& status,
pva::ChannelPut::shared_pointer const & channelPut,
epics::pvData::PVStructure::shared_pointer const & pvStructure,
epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL
{
Guard G(mutex);
if(!getcb) return;
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
event.value = pvStructure;
// assume bitSet->get(0)==true as we only make one request
callEvent(G, status.isSuccess()? TestGetEvent::Success : TestGetEvent::Fail);
}
};
} //namespace
TestOperation
TestClientChannel::get(TestClientChannel::GetCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest)
{
if(!impl) throw std::logic_error("Dead Channel");
if(!pvRequest)
pvRequest = pvd::createRequest("field()");
std::tr1::shared_ptr<GetPutter> ret(new GetPutter(cb));
{
Guard G(ret->mutex);
ret->op = getChannel()->createChannelPut(ret, std::tr1::const_pointer_cast<pvd::PVStructure>(pvRequest));
}
return TestOperation(ret);
}
TestOperation
TestClientChannel::put(PutCallback* cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest)
{
if(!impl) throw std::logic_error("Dead Channel");
if(!pvRequest)
pvRequest = pvd::createRequest("field()");
std::tr1::shared_ptr<GetPutter> ret(new GetPutter(cb));
{
Guard G(ret->mutex);
ret->op = getChannel()->createChannelPut(ret, std::tr1::const_pointer_cast<pvd::PVStructure>(pvRequest));
}
return TestOperation(ret);
}

View File

@@ -0,0 +1,209 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <pv/current_function.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
#define epicsExportSharedSymbols
#include "pv/logger.h"
#include "pv/pvaTestClient.h"
#include "pv/pvAccess.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
struct TestMonitor::Impl : public pva::MonitorRequester
{
mutable epicsMutex mutex;
pva::Channel::shared_pointer chan;
operation_type::shared_pointer op;
bool started, done, seenEmpty;
TestClientChannel::MonitorCallback *cb;
TestMonitorEvent event;
pva::MonitorElement::Ref last;
Impl(TestClientChannel::MonitorCallback* cb)
:started(false)
,done(false)
,seenEmpty(false)
,cb(cb)
{}
virtual ~Impl() {}
void callEvent(Guard& G, TestMonitorEvent::event_t evt = TestMonitorEvent::Fail)
{
TestClientChannel::MonitorCallback *cb=this->cb;
if(!cb) return;
event.event = evt;
if(evt==TestMonitorEvent::Fail || evt==TestMonitorEvent::Cancel)
this->cb = 0; // last event
try {
UnGuard U(G);
cb->monitorEvent(event);
return;
}catch(std::exception& e){
if(!this->cb || evt==TestMonitorEvent::Fail) {
LOG(pva::logLevelError, "Unhandled exception in TestClientChannel::MonitorCallback::monitorEvent(): %s", e.what());
} else {
event.event = TestMonitorEvent::Fail;
event.message = e.what();
}
}
// continues error handling
try {
UnGuard U(G);
cb->monitorEvent(event);
return;
}catch(std::exception& e){
LOG(pva::logLevelError, "Unhandled exception following exception in TestClientChannel::MonitorCallback::monitorEvent(): %s", e.what());
}
}
virtual std::string getRequesterName() OVERRIDE FINAL
{ return "RPCer"; }
virtual void monitorConnect(pvd::Status const & status,
pva::MonitorPtr const & operation,
pvd::StructureConstPtr const & structure) OVERRIDE FINAL
{
Guard G(mutex);
if(!cb || started || done) return;
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
if(!status.isSuccess()) {
callEvent(G);
} else {
pvd::Status sts(operation->start());
if(sts.isSuccess()) {
started = true;
last.attach(operation);
} else {
event.message = sts.getMessage();
callEvent(G);
}
}
}
virtual void channelDisconnect(bool destroy) OVERRIDE FINAL
{
Guard G(mutex);
if(!cb || done) return;
event.message = "Disconnect";
started = false;
callEvent(G, TestMonitorEvent::Disconnect);
}
virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL
{
Guard G(mutex);
if(!cb || done) return;
event.message.clear();
callEvent(G, TestMonitorEvent::Data);
}
virtual void unlisten(pva::MonitorPtr const & monitor) OVERRIDE FINAL
{
Guard G(mutex);
if(!cb || done) return;
done = true;
if(seenEmpty)
callEvent(G, TestMonitorEvent::Data);
// else // wait until final poll()
}
};
TestMonitor::TestMonitor(const std::tr1::shared_ptr<Impl>& impl)
:impl(impl)
{}
TestMonitor::~TestMonitor() {}
std::string TestMonitor::name() const
{
return impl ? impl->chan->getChannelName() : "<NULL>";
}
void TestMonitor::cancel()
{
if(!impl) return;
Guard G(impl->mutex);
root.reset();
changed.clear();
overrun.clear();
impl->last.reset();
if(impl->started) {
impl->op->stop();
impl->started = false;
}
impl->op->destroy();
impl->callEvent(G, TestMonitorEvent::Cancel);
}
bool TestMonitor::poll()
{
if(!impl) return false;
Guard G(impl->mutex);
if(!impl->done && impl->last.next()) {
root = impl->last->pvStructurePtr;
changed = *impl->last->changedBitSet;
overrun = *impl->last->overrunBitSet;
} else {
root.reset();
changed.clear();
overrun.clear();
}
return impl->seenEmpty = !!root;
}
bool TestMonitor::complete() const
{
if(impl) return true;
Guard G(impl->mutex);
return impl->done && impl->seenEmpty;
}
TestMonitor
TestClientChannel::monitor(MonitorCallback *cb,
epics::pvData::PVStructure::const_shared_pointer pvRequest)
{
if(!impl) throw std::logic_error("Dead Channel");
if(!pvRequest)
pvRequest = pvd::createRequest("field()");
std::tr1::shared_ptr<TestMonitor::Impl> ret(new TestMonitor::Impl(cb));
ret->chan = getChannel();
{
Guard G(ret->mutex);
ret->op = ret->chan->createMonitor(ret, std::tr1::const_pointer_cast<pvd::PVStructure>(pvRequest));
}
return TestMonitor(ret);
}

View File

@@ -0,0 +1,155 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
#define epicsExportSharedSymbols
#include "pv/logger.h"
#include "pv/pvaTestClient.h"
#include "pv/pvAccess.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace {
struct RPCer : public pva::ChannelRPCRequester,
public TestOperation::Impl
{
mutable epicsMutex mutex;
bool started;
operation_type::shared_pointer op;
TestClientChannel::GetCallback *cb;
TestGetEvent event;
pvd::PVStructure::const_shared_pointer args;
RPCer(TestClientChannel::GetCallback* cb,
const pvd::PVStructure::const_shared_pointer& args) :started(false), cb(cb), args(args) {}
virtual ~RPCer() {}
void callEvent(Guard& G, TestGetEvent::event_t evt = TestGetEvent::Fail)
{
TestClientChannel::GetCallback *cb=this->cb;
if(!cb) return;
event.event = evt;
this->cb = 0;
try {
UnGuard U(G);
cb->getDone(event);
return;
}catch(std::exception& e){
if(!this->cb || evt==TestGetEvent::Fail) {
LOG(pva::logLevelError, "Unhandled exception in TestClientChannel::GetCallback::getDone(): %s", e.what());
} else {
event.event = TestGetEvent::Fail;
event.message = e.what();
}
}
// continues error handling
try {
UnGuard U(G);
cb->getDone(event);
return;
}catch(std::exception& e){
LOG(pva::logLevelError, "Unhandled exception following exception in TestClientChannel::GetCallback::monitorEvent(): %s", e.what());
}
}
virtual std::string name() const OVERRIDE FINAL
{
Guard G(mutex);
return op ? op->getChannel()->getChannelName() : "<dead>";
}
virtual void cancel()
{
Guard G(mutex);
if(started && op) op->cancel();
callEvent(G, TestGetEvent::Cancel);
}
virtual std::string getRequesterName() OVERRIDE FINAL
{ return "RPCer"; }
virtual void channelRPCConnect(
const epics::pvData::Status& status,
pva::ChannelRPC::shared_pointer const & operation)
{
Guard G(mutex);
if(!cb || started) return;
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
if(!status.isSuccess()) {
callEvent(G);
} else {
operation->request(std::tr1::const_pointer_cast<pvd::PVStructure>(args));
started = true;
}
}
virtual void channelDisconnect(bool destroy) OVERRIDE FINAL
{
Guard G(mutex);
event.message = "Disconnect";
callEvent(G);
}
virtual void requestDone(
const epics::pvData::Status& status,
pva::ChannelRPC::shared_pointer const & operation,
epics::pvData::PVStructure::shared_pointer const & pvResponse)
{
Guard G(mutex);
if(!cb) return;
if(!status.isOK()) {
event.message = status.getMessage();
} else {
event.message.clear();
}
event.value = pvResponse;
callEvent(G, status.isSuccess()? TestGetEvent::Success : TestGetEvent::Fail);
}
};
}//namespace
TestOperation
TestClientChannel::rpc(GetCallback* cb,
const epics::pvData::PVStructure::const_shared_pointer& arguments,
epics::pvData::PVStructure::const_shared_pointer pvRequest)
{
if(!impl) throw std::logic_error("Dead Channel");
if(!pvRequest)
pvRequest = pvd::createRequest("field()");
std::tr1::shared_ptr<RPCer> ret(new RPCer(cb, arguments));
{
Guard G(ret->mutex);
ret->op = getChannel()->createChannelRPC(ret, std::tr1::const_pointer_cast<pvd::PVStructure>(pvRequest));
}
return TestOperation(ret);
}