add support for channelRPC

This commit is contained in:
mrkraimer
2016-06-22 11:33:39 -04:00
parent a7fb12a16f
commit 5398d67e2a
4 changed files with 449 additions and 1 deletions

View File

@@ -29,6 +29,7 @@ LIBSRCS += pvaClientNTMultiPut.cpp
LIBSRCS += pvaClientNTMultiData.cpp
LIBSRCS += pvaClientNTMultiGet.cpp
LIBSRCS += pvaClientNTMultiMonitor.cpp
LIBSRCS += pvaClientRPC.cpp
pvaClient_LIBS += pvAccess pvData nt Com
pvaClient_LIBS += $(EPICS_BASE_IOC_LIBS)

View File

@@ -72,7 +72,11 @@ typedef std::tr1::shared_ptr<PvaClientMonitorRequester> PvaClientMonitorRequeste
typedef std::tr1::weak_ptr<PvaClientMonitorRequester> PvaClientMonitorRequesterWPtr;
class PvaClientArray;
typedef std::tr1::shared_ptr<PvaClientArray> PvaClientArrayPtr;
class PvaClientRPC;
typedef std::tr1::shared_ptr<PvaClientRPC> PvaClientRPCPtr;
class PvaClientRPCRequester;
typedef std::tr1::shared_ptr<PvaClientRPCRequester> PvaClientRPCRequesterPtr;
typedef std::tr1::weak_ptr<PvaClientRPCRequester> PvaClientRPCRequesterWPtr;
// following are private to pvaClient
class PvaClientChannelCache;
@@ -347,6 +351,29 @@ public:
* @throw runtime_error if failure.
*/
PvaClientMonitorPtr createMonitor(epics::pvData::PVStructurePtr const & pvRequest);
/** Issue a channelRPC request
* @param pvRequest The pvRequest that is passed to createRPC.
* @param pvArgument The argument for a request.
* @return The result.
* @throw runtime_error if failure.
*/
epics::pvData::PVStructurePtr rpc(
epics::pvData::PVStructurePtr const & pvRequest,
epics::pvData::PVStructurePtr const & pvArgument);
/** Issue a channelRPC request
* @param pvArgument The argument for the request.
* @return The result.
* @throw runtime_error if failure.
*/
epics::pvData::PVStructurePtr rpc(
epics::pvData::PVStructurePtr const & pvArgument);
/** Create an PvaClientRPC.
* @param pvRequest The pvRequest that must have the same interface
* as a pvArgument that is passed to an rpcq request.
* @return The interface.
* @throw runtime_error if failure.
*/
PvaClientRPCPtr createRPC(epics::pvData::PVStructurePtr const & pvRequest);
/** Show the list of cached gets and puts.
*/
void showCache();
@@ -1402,6 +1429,122 @@ private:
friend class MonitorRequesterImpl;
};
/**
* @brief Optional client callback.
*
*/
class epicsShareClass PvaClientRPCRequester
{
public:
POINTER_DEFINITIONS(PvaClientRPCRequester);
virtual ~PvaClientRPCRequester() {}
/**
* The request is done. This is always called with no locks held.
* @param status Completion status.
* @param pvaClientRPC The pvaClientRPC interface.
* @param pvResponse The response data for the RPC request or <code>null</code> if the request failed.
*/
virtual void requestDone(
const epics::pvData::Status& status,
PvaClientRPCPtr const & pvaClientRPC,
epics::pvData::PVStructure::shared_pointer const & pvResponse) = 0;
};
// NOTE: must use separate class that implements RPCRequester,
// because pvAccess holds a shared_ptr to RPCRequester instead of weak_pointer
class epicsShareClass RPCRequesterImpl;
typedef std::tr1::shared_ptr<RPCRequesterImpl> RPCRequesterImplPtr;
/**
* @brief An easy to use alternative to RPC.
*
*/
class epicsShareClass PvaClientRPC :
public std::tr1::enable_shared_from_this<PvaClientRPC>
{
public:
POINTER_DEFINITIONS(PvaClientRPC);
/** Create a PvaClientRPC.
* @param &pvaClient Interface to PvaClient
* @param channel Interface to Channel
* @param pvRequest The request structure.
* @return The interface to the PvaClientRPC.
*/
static PvaClientRPCPtr create(
PvaClientPtr const &pvaClient,
epics::pvAccess::Channel::shared_pointer const & channel,
epics::pvData::PVStructurePtr const &pvRequest
);
/** Destructor
*/
~PvaClientRPC();
/** Call issueConnect and then waitConnect.
* An exception is thrown if connect fails.
*/
void connect();
/** Issue the channelRPC connection to the channel.
* This can only be called once.
* An exception is thrown if connect fails.
* @throw runtime_error if failure.
*/
void issueConnect();
/** Wait until the channelRPC connection to the channel is complete.
* @return status;
*/
epics::pvData::Status waitConnect();
/** issue a request and wait for response
* @param pvArgument The data to send to the service.
* @return The result
* @throw runtime_error if failure.
*/
epics::pvData::PVStructure::shared_pointer request(
epics::pvData::PVStructure::shared_pointer const & pvArgument);
/** issue a request and return immediately.
* @param pvArgument The data to send to the service.
* @param pvaClientRPCRequester The requester that is called with the result.
* @throw runtime_error if failure.
*/
void request(
epics::pvData::PVStructure::shared_pointer const & pvArgument,
PvaClientRPCRequesterPtr const & pvaClientRPCRequester);
private:
PvaClientRPC(
PvaClientPtr const &pvaClient,
epics::pvAccess::Channel::shared_pointer const & channel,
epics::pvData::PVStructurePtr const &pvRequest);
virtual std::string getRequesterName();
virtual void message(std::string const & message,epics::pvData::MessageType messageType);
virtual void rpcConnect(
const epics::pvData::Status& status,
epics::pvAccess::ChannelRPC::shared_pointer const & channelRPC);
virtual void requestDone(
const epics::pvData::Status& status,
epics::pvAccess::ChannelRPC::shared_pointer const & channelRPC,
epics::pvData::PVStructure::shared_pointer const & pvResponse);
void checkRPCState();
enum RPCConnectState {connectIdle,connectActive,connected};
bool isDestroyed;
epics::pvData::Status connectStatus;
RPCConnectState connectState;
enum RPCState {rpcIdle,rpcActive,rpcComplete};
RPCState rpcState;
PvaClient::weak_pointer pvaClient;
epics::pvAccess::Channel::weak_pointer channel;
epics::pvData::PVStructurePtr pvRequest;
epics::pvData::PVStructurePtr pvResponse;
epics::pvData::Mutex mutex;
epics::pvData::Event waitForConnect;
epics::pvData::Event waitForDone;
PvaClientRPCRequesterWPtr pvaClientRPCRequester;
RPCRequesterImplPtr rpcRequester;
epics::pvAccess::ChannelRPC::shared_pointer channelRPC;
friend class RPCRequesterImpl;
};
}}
#endif /* PVACLIENT_H */

View File

@@ -528,6 +528,30 @@ PvaClientMonitorPtr PvaClientChannel::createMonitor(PVStructurePtr const & pvR
return PvaClientMonitor::create(yyy,channel,pvRequest);
}
PVStructurePtr PvaClientChannel::rpc(
PVStructurePtr const & pvRequest,
PVStructurePtr const & pvArgument)
{
PvaClientRPCPtr rpc = createRPC(pvRequest);
return rpc->request(pvArgument);
}
PVStructurePtr PvaClientChannel::rpc(
PVStructurePtr const & pvArgument)
{
return rpc(pvArgument,pvArgument);
}
PvaClientRPCPtr PvaClientChannel::createRPC(PVStructurePtr const & pvRequest)
{
if(connectState!=connected) connect(5.0);
PvaClientPtr yyy = pvaClient.lock();
if(!yyy) throw std::runtime_error("PvaClient was destroyed");
return PvaClientRPC::create(yyy,channel,pvRequest);
}
void PvaClientChannel::showCache()
{
if(pvaClientGetCache->cacheSize()>=1) {

280
src/pvaClientRPC.cpp Normal file
View File

@@ -0,0 +1,280 @@
/* pvaClientRPC.cpp */
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* EPICS pvData is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
/**
* @author mrk
* @date 2015.03
*/
#include <sstream>
#include <pv/event.h>
#include <pv/bitSetUtil.h>
#define epicsExportSharedSymbols
#include <pv/pvaClient.h>
using std::tr1::static_pointer_cast;
using std::tr1::dynamic_pointer_cast;
using namespace epics::pvData;
using namespace epics::pvAccess;
using namespace std;
namespace epics { namespace pvaClient {
class RPCRequesterImpl : public ChannelRPCRequester
{
PvaClientRPC::weak_pointer pvaClientRPC;
PvaClient::weak_pointer pvaClient;
public:
RPCRequesterImpl(
PvaClientRPCPtr const & pvaClientRPC,
PvaClientPtr const &pvaClient)
: pvaClientRPC(pvaClientRPC),
pvaClient(pvaClient)
{}
virtual ~RPCRequesterImpl() {
if(PvaClient::getDebug()) std::cout << "~RPCRequesterImpl" << std::endl;
}
virtual std::string getRequesterName() {
PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
if(!clientRPC) return string("pvaClientRPC is null");
return clientRPC->getRequesterName();
}
virtual void message(std::string const & message, epics::pvData::MessageType messageType) {
PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
if(!clientRPC) return;
clientRPC->message(message,messageType);
}
virtual void channelRPCConnect(
const epics::pvData::Status& status,
ChannelRPC::shared_pointer const & channelRPC)
{
PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
if(!clientRPC) return;
clientRPC->rpcConnect(status,channelRPC);
}
virtual void requestDone(
const Status& status,
ChannelRPC::shared_pointer const & channelRPC,
PVStructure::shared_pointer const & pvResponse)
{
PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
if(!clientRPC) return;
clientRPC->requestDone(status,channelRPC,pvResponse);
}
};
PvaClientRPCPtr PvaClientRPC::create(
PvaClientPtr const &pvaClient,
Channel::shared_pointer const & channel,
PVStructurePtr const &pvRequest)
{
PvaClientRPCPtr epv(new PvaClientRPC(pvaClient,channel,pvRequest));
epv->rpcRequester = RPCRequesterImplPtr(
new RPCRequesterImpl(epv,pvaClient));
return epv;
}
PvaClientRPC::PvaClientRPC(
PvaClientPtr const &pvaClient,
Channel::shared_pointer const & channel,
PVStructurePtr const &pvRequest)
: isDestroyed(false),
connectState(connectIdle),
rpcState(rpcIdle),
pvaClient(pvaClient),
channel(channel),
pvRequest(pvRequest)
{
if(PvaClient::getDebug()) {
cout<< "PvaClientRPC::PvaClientRPC()"
<< " channelName " << channel->getChannelName()
<< endl;
}
}
PvaClientRPC::~PvaClientRPC()
{
if(PvaClient::getDebug()) cout<< "PvaClientRPC::~PvaClientRPC\n";
{
Lock xx(mutex);
if(isDestroyed) throw std::runtime_error("pvaClientRPC was destroyed");
isDestroyed = true;
}
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout<< "PvaClientRPC::~PvaClientRPC"
<< " channelName " << channelName
<< endl;
}
}
void PvaClientRPC::checkRPCState()
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientRPC::checkRPCState"
<< " channelName " << channelName
<< " connectState " << connectState
<< endl;
}
if(connectState==connectIdle) connect();
}
string PvaClientRPC::getRequesterName()
{
PvaClientPtr yyy = pvaClient.lock();
if(!yyy) return string("PvaClientRPC::getRequesterName() PvaClient isDestroyed");
return yyy->getRequesterName();
}
void PvaClientRPC::message(string const & message,MessageType messageType)
{
PvaClientPtr yyy = pvaClient.lock();
if(!yyy) return;
yyy->message(message, messageType);
}
void PvaClientRPC::rpcConnect(
const Status& status,
ChannelRPC::shared_pointer const & channelRPC)
{
Channel::shared_pointer chan(channel.lock());
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientRPC::rpcConnect"
<< " channelName " << channelName
<< " status.isOK " << (status.isOK() ? "true" : "false")
<< endl;
}
if(!chan) return;
connectStatus = status;
connectState = connected;
if(PvaClient::getDebug()) {
cout << "PvaClientRPC::rpcConnect calling waitForConnect.signal\n";
}
waitForConnect.signal();
}
void PvaClientRPC::requestDone(
const Status& status,
ChannelRPC::shared_pointer const & channelRPC,
PVStructure::shared_pointer const & pvResponse)
{
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientRPC::requesyDone"
<< " channelName " << channelName
<< endl;
}
PvaClientRPCRequesterPtr req = pvaClientRPCRequester.lock();
if(req) {
req->requestDone(status,shared_from_this(),pvResponse);
return;
}
this->pvResponse = pvResponse;
waitForDone.signal();
}
void PvaClientRPC::connect()
{
if(PvaClient::getDebug()) cout << "PvaClientRPC::connect\n";
issueConnect();
Status status = waitConnect();
if(status.isOK()) return;
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ " PvaClientRPC::connect "
+ status.getMessage();
throw std::runtime_error(message);
}
void PvaClientRPC::issueConnect()
{
if(PvaClient::getDebug()) cout << "PvaClientRPC::issueConnect\n";
Channel::shared_pointer chan(channel.lock());
if(connectState!=connectIdle) {
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ " pvaClientRPC already connected ";
throw std::runtime_error(message);
}
if(chan) {
connectState = connectActive;
channelRPC = chan->createChannelRPC(rpcRequester,pvRequest);
return;
}
throw std::runtime_error("PvaClientRPC::issueConnect() but channel disconnected");
}
Status PvaClientRPC::waitConnect()
{
if(PvaClient::getDebug()) cout << "PvaClientRPC::waitConnect\n";
if(connectState==connected) {
if(!connectStatus.isOK()) connectState = connectIdle;
return connectStatus;
}
if(connectState!=connectActive) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = string("channel ")
+ channelName
+ " PvaClientRPC::waitConnect illegal connect state ";
throw std::runtime_error(message);
}
if(PvaClient::getDebug()) {
cout << "PvaClientRPC::waitConnect calling waitForConnect.wait\n";
}
waitForConnect.wait();
connectState = connectStatus.isOK() ? connected : connectIdle;
if(PvaClient::getDebug()) {
cout << "PvaClientRPC::waitConnect"
<< " connectStatus " << (connectStatus.isOK() ? "connected" : "not connected");
}
return connectStatus;
}
PVStructure::shared_pointer PvaClientRPC::request(PVStructure::shared_pointer const & pvArgument)
{
checkRPCState();
channelRPC->request(pvArgument);
waitForDone.wait();
return pvResponse;
}
void PvaClientRPC::request(
PVStructure::shared_pointer const & pvArgument,
PvaClientRPCRequesterPtr const & pvaClientRPCRequester)
{
checkRPCState();
this->pvaClientRPCRequester = pvaClientRPCRequester;
channelRPC->request(pvArgument);
}
}}