From eed64822e0159c7a706fec9f49d0f9ebe6847df0 Mon Sep 17 00:00:00 2001 From: mrkraimer Date: Fri, 22 Jul 2016 14:41:21 -0400 Subject: [PATCH] add request timout to PvaClientRPC --- src/Makefile | 2 +- src/pv/pvaClient.h | 26 +++++++++++- src/pvaClientRPC.cpp | 94 +++++++++++++++++++++++++++++++++++++------- 3 files changed, 105 insertions(+), 17 deletions(-) diff --git a/src/Makefile b/src/Makefile index f96ede2..7e945c7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -6,7 +6,7 @@ include $(TOP)/configure/CONFIG LIBRARY += pvaClient # shared library ABI version. -SHRLIB_VERSION ?= 4.2-DEV +SHRLIB_VERSION ?= 4.2.0 INC += pv/pvaClient.h INC += pv/pvaClientMultiChannel.h diff --git a/src/pv/pvaClient.h b/src/pv/pvaClient.h index 25d0a1e..a554e50 100644 --- a/src/pv/pvaClient.h +++ b/src/pv/pvaClient.h @@ -1566,6 +1566,22 @@ public: /** @brief Destructor */ ~PvaClientRPC(); + /** + * @brief Set a timeout for a request. + * @param responseTimeout The time in seconds to wait for a request to complete. + */ + void setResponseTimeout(double responseTimeout) + { + this->responseTimeout = responseTimeout; + } + /** + * @brief Get the responseTimeout. + * @return The value. + */ + double getResponseTimeout() + { + return responseTimeout; + } /** @brief Call issueConnect and then waitConnect. * * An exception is thrown if connect fails. @@ -1583,6 +1599,9 @@ public: */ epics::pvData::Status waitConnect(); /** @brief Issue a request and wait for response + * + * Note that if responseTimeout is ( lt 0.0, ge 0.0) then this (will, will not) block + * until response completes or timeout. * @param pvArgument The data to send to the service. * @return The result * @throw runtime_error if failure. @@ -1621,7 +1640,7 @@ private: PvaClient::weak_pointer pvaClient; epics::pvAccess::Channel::weak_pointer channel; epics::pvData::PVStructurePtr pvRequest; - epics::pvData::PVStructurePtr pvResponse; + epics::pvData::Mutex mutex; epics::pvData::Event waitForConnect; epics::pvData::Event waitForDone; @@ -1629,6 +1648,11 @@ private: PvaClientRPCRequesterWPtr pvaClientRPCRequester; RPCRequesterImplPtr rpcRequester; epics::pvAccess::ChannelRPC::shared_pointer channelRPC; + epics::pvData::PVStructurePtr pvResponse; + + enum RPCState {rpcIdle,rpcActive,rpcComplete}; + RPCState rpcState; + double responseTimeout; friend class RPCRequesterImpl; }; diff --git a/src/pvaClientRPC.cpp b/src/pvaClientRPC.cpp index f58b396..bcc21bc 100644 --- a/src/pvaClientRPC.cpp +++ b/src/pvaClientRPC.cpp @@ -99,7 +99,9 @@ PvaClientRPC::PvaClientRPC( connectState(connectIdle), pvaClient(pvaClient), channel(channel), - pvRequest(pvRequest) + pvRequest(pvRequest), + rpcState(rpcIdle), + responseTimeout(0.0) { if(PvaClient::getDebug()) { cout<< "PvaClientRPC::PvaClientRPC()" @@ -177,21 +179,38 @@ void PvaClientRPC::requestDone( ChannelRPC::shared_pointer const & channelRPC, PVStructure::shared_pointer const & pvResponse) { - if(PvaClient::getDebug()) { - string channelName("disconnected"); - Channel::shared_pointer chan(channel.lock()); - if(chan) channelName = chan->getChannelName(); - cout << "PvaClientRPC::requesyDone" - << " channelName " << channelName - << endl; - } PvaClientRPCRequesterPtr req = pvaClientRPCRequester.lock(); + { + Lock xx(mutex); + if(PvaClient::getDebug()) { + string channelName("disconnected"); + Channel::shared_pointer chan(channel.lock()); + if(chan) channelName = chan->getChannelName(); + cout << "PvaClientRPC::requestDone" + << " channelName " << channelName + << endl; + } + if(rpcState!=rpcActive) { + string channelName("disconnected"); + Channel::shared_pointer chan(channel.lock()); + if(chan) channelName = chan->getChannelName(); + string message = "channel " + + channelName + +" PvaClientRPC::requestDone" + + " but not active"; + throw std::runtime_error(message); + } + if(req && (responseTimeout<=0.0)) { + rpcState = rpcIdle; + } else { + rpcState = rpcComplete; + if(!req) this->pvResponse = pvResponse; + waitForDone.signal(); + } + } if(req) { req->requestDone(status,shared_from_this(),pvResponse); - return; } - this->pvResponse = pvResponse; - waitForDone.signal(); } void PvaClientRPC::connect() @@ -261,8 +280,36 @@ Status PvaClientRPC::waitConnect() PVStructure::shared_pointer PvaClientRPC::request(PVStructure::shared_pointer const & pvArgument) { checkRPCState(); + { + Lock xx(mutex); + if(rpcState!=rpcIdle) { + Channel::shared_pointer chan(channel.lock()); + string channelName("disconnected"); + if(chan) channelName = chan->getChannelName(); + string message = "channel " + + channelName + + " PvaClientRPC::request request aleady active "; + throw std::runtime_error(message); + } + rpcState = rpcActive; + } channelRPC->request(pvArgument); - waitForDone.wait(); + if(responseTimeout>0.0) { + waitForDone.wait(responseTimeout); + } else { + waitForDone.wait(); + } + Lock xx(mutex); + if(rpcState!=rpcComplete) { + Channel::shared_pointer chan(channel.lock()); + string channelName("disconnected"); + if(chan) channelName = chan->getChannelName(); + string message = "channel " + + channelName + + " PvaClientRPC::request request timeout "; + throw std::runtime_error(message); + } + rpcState = rpcIdle; return pvResponse; } @@ -271,9 +318,26 @@ void PvaClientRPC::request( PVStructure::shared_pointer const & pvArgument, PvaClientRPCRequesterPtr const & pvaClientRPCRequester) { - checkRPCState(); + checkRPCState(); this->pvaClientRPCRequester = pvaClientRPCRequester; - channelRPC->request(pvArgument); + if(responseTimeout<=0.0) { + { + Lock xx(mutex); + if(rpcState!=rpcIdle) { + Channel::shared_pointer chan(channel.lock()); + string channelName("disconnected"); + if(chan) channelName = chan->getChannelName(); + string message = "channel " + + channelName + + " PvaClientRPC::request request aleady active "; + throw std::runtime_error(message); + } + rpcState = rpcActive; + } + channelRPC->request(pvArgument); + return; + } + request(pvArgument); }