From 5398d67e2ab0ba267decb4e2ceb32f1ac54eba9c Mon Sep 17 00:00:00 2001 From: mrkraimer Date: Wed, 22 Jun 2016 11:33:39 -0400 Subject: [PATCH] add support for channelRPC --- src/Makefile | 1 + src/pv/pvaClient.h | 145 +++++++++++++++++++- src/pvaClientChannel.cpp | 24 ++++ src/pvaClientRPC.cpp | 280 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 449 insertions(+), 1 deletion(-) create mode 100644 src/pvaClientRPC.cpp diff --git a/src/Makefile b/src/Makefile index 7c05e15..d4d9092 100644 --- a/src/Makefile +++ b/src/Makefile @@ -29,6 +29,7 @@ LIBSRCS += pvaClientNTMultiPut.cpp LIBSRCS += pvaClientNTMultiData.cpp LIBSRCS += pvaClientNTMultiGet.cpp LIBSRCS += pvaClientNTMultiMonitor.cpp +LIBSRCS += pvaClientRPC.cpp pvaClient_LIBS += pvAccess pvData nt Com pvaClient_LIBS += $(EPICS_BASE_IOC_LIBS) diff --git a/src/pv/pvaClient.h b/src/pv/pvaClient.h index 117d83f..1327462 100644 --- a/src/pv/pvaClient.h +++ b/src/pv/pvaClient.h @@ -72,7 +72,11 @@ typedef std::tr1::shared_ptr PvaClientMonitorRequeste typedef std::tr1::weak_ptr PvaClientMonitorRequesterWPtr; class PvaClientArray; typedef std::tr1::shared_ptr PvaClientArrayPtr; - +class PvaClientRPC; +typedef std::tr1::shared_ptr PvaClientRPCPtr; +class PvaClientRPCRequester; +typedef std::tr1::shared_ptr PvaClientRPCRequesterPtr; +typedef std::tr1::weak_ptr PvaClientRPCRequesterWPtr; // following are private to pvaClient class PvaClientChannelCache; @@ -347,6 +351,29 @@ public: * @throw runtime_error if failure. */ PvaClientMonitorPtr createMonitor(epics::pvData::PVStructurePtr const & pvRequest); + /** Issue a channelRPC request + * @param pvRequest The pvRequest that is passed to createRPC. + * @param pvArgument The argument for a request. + * @return The result. + * @throw runtime_error if failure. + */ + epics::pvData::PVStructurePtr rpc( + epics::pvData::PVStructurePtr const & pvRequest, + epics::pvData::PVStructurePtr const & pvArgument); + /** Issue a channelRPC request + * @param pvArgument The argument for the request. + * @return The result. + * @throw runtime_error if failure. + */ + epics::pvData::PVStructurePtr rpc( + epics::pvData::PVStructurePtr const & pvArgument); + /** Create an PvaClientRPC. + * @param pvRequest The pvRequest that must have the same interface + * as a pvArgument that is passed to an rpcq request. + * @return The interface. + * @throw runtime_error if failure. + */ + PvaClientRPCPtr createRPC(epics::pvData::PVStructurePtr const & pvRequest); /** Show the list of cached gets and puts. */ void showCache(); @@ -1402,6 +1429,122 @@ private: friend class MonitorRequesterImpl; }; +/** + * @brief Optional client callback. + * + */ +class epicsShareClass PvaClientRPCRequester +{ +public: + POINTER_DEFINITIONS(PvaClientRPCRequester); + virtual ~PvaClientRPCRequester() {} + /** + * The request is done. This is always called with no locks held. + * @param status Completion status. + * @param pvaClientRPC The pvaClientRPC interface. + * @param pvResponse The response data for the RPC request or null if the request failed. + */ + virtual void requestDone( + const epics::pvData::Status& status, + PvaClientRPCPtr const & pvaClientRPC, + epics::pvData::PVStructure::shared_pointer const & pvResponse) = 0; +}; +// NOTE: must use separate class that implements RPCRequester, +// because pvAccess holds a shared_ptr to RPCRequester instead of weak_pointer +class epicsShareClass RPCRequesterImpl; +typedef std::tr1::shared_ptr RPCRequesterImplPtr; + +/** + * @brief An easy to use alternative to RPC. + * + */ +class epicsShareClass PvaClientRPC : + public std::tr1::enable_shared_from_this +{ +public: + POINTER_DEFINITIONS(PvaClientRPC); + /** Create a PvaClientRPC. + * @param &pvaClient Interface to PvaClient + * @param channel Interface to Channel + * @param pvRequest The request structure. + * @return The interface to the PvaClientRPC. + */ + static PvaClientRPCPtr create( + PvaClientPtr const &pvaClient, + epics::pvAccess::Channel::shared_pointer const & channel, + epics::pvData::PVStructurePtr const &pvRequest + ); + /** Destructor + */ + ~PvaClientRPC(); + /** Call issueConnect and then waitConnect. + * An exception is thrown if connect fails. + */ + void connect(); + /** Issue the channelRPC connection to the channel. + * This can only be called once. + * An exception is thrown if connect fails. + * @throw runtime_error if failure. + */ + void issueConnect(); + /** Wait until the channelRPC connection to the channel is complete. + * @return status; + */ + epics::pvData::Status waitConnect(); + /** issue a request and wait for response + * @param pvArgument The data to send to the service. + * @return The result + * @throw runtime_error if failure. + */ + epics::pvData::PVStructure::shared_pointer request( + epics::pvData::PVStructure::shared_pointer const & pvArgument); + /** issue a request and return immediately. + * @param pvArgument The data to send to the service. + * @param pvaClientRPCRequester The requester that is called with the result. + * @throw runtime_error if failure. + */ + void request( + epics::pvData::PVStructure::shared_pointer const & pvArgument, + PvaClientRPCRequesterPtr const & pvaClientRPCRequester); +private: + PvaClientRPC( + PvaClientPtr const &pvaClient, + epics::pvAccess::Channel::shared_pointer const & channel, + epics::pvData::PVStructurePtr const &pvRequest); + virtual std::string getRequesterName(); + virtual void message(std::string const & message,epics::pvData::MessageType messageType); + virtual void rpcConnect( + const epics::pvData::Status& status, + epics::pvAccess::ChannelRPC::shared_pointer const & channelRPC); + virtual void requestDone( + const epics::pvData::Status& status, + epics::pvAccess::ChannelRPC::shared_pointer const & channelRPC, + epics::pvData::PVStructure::shared_pointer const & pvResponse); + + void checkRPCState(); + + enum RPCConnectState {connectIdle,connectActive,connected}; + bool isDestroyed; + epics::pvData::Status connectStatus; + RPCConnectState connectState; + + enum RPCState {rpcIdle,rpcActive,rpcComplete}; + RPCState rpcState; + + PvaClient::weak_pointer pvaClient; + epics::pvAccess::Channel::weak_pointer channel; + epics::pvData::PVStructurePtr pvRequest; + epics::pvData::PVStructurePtr pvResponse; + epics::pvData::Mutex mutex; + epics::pvData::Event waitForConnect; + epics::pvData::Event waitForDone; + + PvaClientRPCRequesterWPtr pvaClientRPCRequester; + RPCRequesterImplPtr rpcRequester; + epics::pvAccess::ChannelRPC::shared_pointer channelRPC; + friend class RPCRequesterImpl; +}; + }} #endif /* PVACLIENT_H */ diff --git a/src/pvaClientChannel.cpp b/src/pvaClientChannel.cpp index 90ad206..52c032b 100644 --- a/src/pvaClientChannel.cpp +++ b/src/pvaClientChannel.cpp @@ -528,6 +528,30 @@ PvaClientMonitorPtr PvaClientChannel::createMonitor(PVStructurePtr const & pvR return PvaClientMonitor::create(yyy,channel,pvRequest); } +PVStructurePtr PvaClientChannel::rpc( + PVStructurePtr const & pvRequest, + PVStructurePtr const & pvArgument) +{ + + PvaClientRPCPtr rpc = createRPC(pvRequest); + return rpc->request(pvArgument); +} + +PVStructurePtr PvaClientChannel::rpc( + PVStructurePtr const & pvArgument) +{ + return rpc(pvArgument,pvArgument); +} + + +PvaClientRPCPtr PvaClientChannel::createRPC(PVStructurePtr const & pvRequest) +{ + if(connectState!=connected) connect(5.0); + PvaClientPtr yyy = pvaClient.lock(); + if(!yyy) throw std::runtime_error("PvaClient was destroyed"); + return PvaClientRPC::create(yyy,channel,pvRequest); +} + void PvaClientChannel::showCache() { if(pvaClientGetCache->cacheSize()>=1) { diff --git a/src/pvaClientRPC.cpp b/src/pvaClientRPC.cpp new file mode 100644 index 0000000..83da666 --- /dev/null +++ b/src/pvaClientRPC.cpp @@ -0,0 +1,280 @@ +/* pvaClientRPC.cpp */ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * EPICS pvData is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2015.03 + */ + +#include +#include +#include + +#define epicsExportSharedSymbols + +#include + +using std::tr1::static_pointer_cast; +using std::tr1::dynamic_pointer_cast; +using namespace epics::pvData; +using namespace epics::pvAccess; +using namespace std; + +namespace epics { namespace pvaClient { + +class RPCRequesterImpl : public ChannelRPCRequester +{ + PvaClientRPC::weak_pointer pvaClientRPC; + PvaClient::weak_pointer pvaClient; +public: + RPCRequesterImpl( + PvaClientRPCPtr const & pvaClientRPC, + PvaClientPtr const &pvaClient) + : pvaClientRPC(pvaClientRPC), + pvaClient(pvaClient) + {} + virtual ~RPCRequesterImpl() { + if(PvaClient::getDebug()) std::cout << "~RPCRequesterImpl" << std::endl; + } + + virtual std::string getRequesterName() { + PvaClientRPCPtr clientRPC(pvaClientRPC.lock()); + if(!clientRPC) return string("pvaClientRPC is null"); + return clientRPC->getRequesterName(); + } + + virtual void message(std::string const & message, epics::pvData::MessageType messageType) { + PvaClientRPCPtr clientRPC(pvaClientRPC.lock()); + if(!clientRPC) return; + clientRPC->message(message,messageType); + } + + virtual void channelRPCConnect( + const epics::pvData::Status& status, + ChannelRPC::shared_pointer const & channelRPC) + { + PvaClientRPCPtr clientRPC(pvaClientRPC.lock()); + if(!clientRPC) return; + clientRPC->rpcConnect(status,channelRPC); + } + + virtual void requestDone( + const Status& status, + ChannelRPC::shared_pointer const & channelRPC, + PVStructure::shared_pointer const & pvResponse) + { + PvaClientRPCPtr clientRPC(pvaClientRPC.lock()); + if(!clientRPC) return; + clientRPC->requestDone(status,channelRPC,pvResponse); + } +}; + + +PvaClientRPCPtr PvaClientRPC::create( + PvaClientPtr const &pvaClient, + Channel::shared_pointer const & channel, + PVStructurePtr const &pvRequest) +{ + PvaClientRPCPtr epv(new PvaClientRPC(pvaClient,channel,pvRequest)); + epv->rpcRequester = RPCRequesterImplPtr( + new RPCRequesterImpl(epv,pvaClient)); + return epv; +} + +PvaClientRPC::PvaClientRPC( + PvaClientPtr const &pvaClient, + Channel::shared_pointer const & channel, + PVStructurePtr const &pvRequest) +: isDestroyed(false), + connectState(connectIdle), + rpcState(rpcIdle), + pvaClient(pvaClient), + channel(channel), + pvRequest(pvRequest) +{ + if(PvaClient::getDebug()) { + cout<< "PvaClientRPC::PvaClientRPC()" + << " channelName " << channel->getChannelName() + << endl; + } +} + +PvaClientRPC::~PvaClientRPC() +{ + if(PvaClient::getDebug()) cout<< "PvaClientRPC::~PvaClientRPC\n"; + { + Lock xx(mutex); + if(isDestroyed) throw std::runtime_error("pvaClientRPC was destroyed"); + isDestroyed = true; + } + if(PvaClient::getDebug()) { + string channelName("disconnected"); + Channel::shared_pointer chan(channel.lock()); + if(chan) channelName = chan->getChannelName(); + cout<< "PvaClientRPC::~PvaClientRPC" + << " channelName " << channelName + << endl; + } +} + +void PvaClientRPC::checkRPCState() +{ + if(PvaClient::getDebug()) { + string channelName("disconnected"); + Channel::shared_pointer chan(channel.lock()); + if(chan) channelName = chan->getChannelName(); + cout << "PvaClientRPC::checkRPCState" + << " channelName " << channelName + << " connectState " << connectState + << endl; + } + if(connectState==connectIdle) connect(); +} + +string PvaClientRPC::getRequesterName() +{ + PvaClientPtr yyy = pvaClient.lock(); + if(!yyy) return string("PvaClientRPC::getRequesterName() PvaClient isDestroyed"); + return yyy->getRequesterName(); +} + +void PvaClientRPC::message(string const & message,MessageType messageType) +{ + PvaClientPtr yyy = pvaClient.lock(); + if(!yyy) return; + yyy->message(message, messageType); +} + +void PvaClientRPC::rpcConnect( + const Status& status, + ChannelRPC::shared_pointer const & channelRPC) +{ + Channel::shared_pointer chan(channel.lock()); + if(PvaClient::getDebug()) { + string channelName("disconnected"); + Channel::shared_pointer chan(channel.lock()); + if(chan) channelName = chan->getChannelName(); + cout << "PvaClientRPC::rpcConnect" + << " channelName " << channelName + << " status.isOK " << (status.isOK() ? "true" : "false") + << endl; + } + if(!chan) return; + connectStatus = status; + connectState = connected; + if(PvaClient::getDebug()) { + cout << "PvaClientRPC::rpcConnect calling waitForConnect.signal\n"; + } + waitForConnect.signal(); + +} + +void PvaClientRPC::requestDone( + const Status& status, + ChannelRPC::shared_pointer const & channelRPC, + PVStructure::shared_pointer const & pvResponse) +{ + if(PvaClient::getDebug()) { + string channelName("disconnected"); + Channel::shared_pointer chan(channel.lock()); + if(chan) channelName = chan->getChannelName(); + cout << "PvaClientRPC::requesyDone" + << " channelName " << channelName + << endl; + } + PvaClientRPCRequesterPtr req = pvaClientRPCRequester.lock(); + if(req) { + req->requestDone(status,shared_from_this(),pvResponse); + return; + } + this->pvResponse = pvResponse; + waitForDone.signal(); +} + +void PvaClientRPC::connect() +{ + if(PvaClient::getDebug()) cout << "PvaClientRPC::connect\n"; + issueConnect(); + Status status = waitConnect(); + if(status.isOK()) return; + Channel::shared_pointer chan(channel.lock()); + string channelName("disconnected"); + if(chan) channelName = chan->getChannelName(); + string message = string("channel ") + + channelName + + " PvaClientRPC::connect " + + status.getMessage(); + throw std::runtime_error(message); +} + +void PvaClientRPC::issueConnect() +{ + if(PvaClient::getDebug()) cout << "PvaClientRPC::issueConnect\n"; + Channel::shared_pointer chan(channel.lock()); + if(connectState!=connectIdle) { + string channelName("disconnected"); + if(chan) channelName = chan->getChannelName(); + string message = string("channel ") + + channelName + + " pvaClientRPC already connected "; + throw std::runtime_error(message); + } + if(chan) { + connectState = connectActive; + channelRPC = chan->createChannelRPC(rpcRequester,pvRequest); + return; + } + throw std::runtime_error("PvaClientRPC::issueConnect() but channel disconnected"); +} + +Status PvaClientRPC::waitConnect() +{ + if(PvaClient::getDebug()) cout << "PvaClientRPC::waitConnect\n"; + if(connectState==connected) { + if(!connectStatus.isOK()) connectState = connectIdle; + return connectStatus; + } + if(connectState!=connectActive) { + Channel::shared_pointer chan(channel.lock()); + string channelName("disconnected"); + if(chan) channelName = chan->getChannelName(); + string message = string("channel ") + + channelName + + " PvaClientRPC::waitConnect illegal connect state "; + throw std::runtime_error(message); + } + if(PvaClient::getDebug()) { + cout << "PvaClientRPC::waitConnect calling waitForConnect.wait\n"; + } + waitForConnect.wait(); + connectState = connectStatus.isOK() ? connected : connectIdle; + if(PvaClient::getDebug()) { + cout << "PvaClientRPC::waitConnect" + << " connectStatus " << (connectStatus.isOK() ? "connected" : "not connected"); + } + return connectStatus; +} + +PVStructure::shared_pointer PvaClientRPC::request(PVStructure::shared_pointer const & pvArgument) +{ + checkRPCState(); + channelRPC->request(pvArgument); + waitForDone.wait(); + return pvResponse; +} + + +void PvaClientRPC::request( + PVStructure::shared_pointer const & pvArgument, + PvaClientRPCRequesterPtr const & pvaClientRPCRequester) +{ + checkRPCState(); + this->pvaClientRPCRequester = pvaClientRPCRequester; + channelRPC->request(pvArgument); +} + + +}}