add request timout to PvaClientRPC

This commit is contained in:
mrkraimer
2016-07-22 14:41:21 -04:00
parent 051c924992
commit eed64822e0
3 changed files with 105 additions and 17 deletions

View File

@ -6,7 +6,7 @@ include $(TOP)/configure/CONFIG
LIBRARY += pvaClient LIBRARY += pvaClient
# shared library ABI version. # shared library ABI version.
SHRLIB_VERSION ?= 4.2-DEV SHRLIB_VERSION ?= 4.2.0
INC += pv/pvaClient.h INC += pv/pvaClient.h
INC += pv/pvaClientMultiChannel.h INC += pv/pvaClientMultiChannel.h

View File

@ -1566,6 +1566,22 @@ public:
/** @brief Destructor /** @brief Destructor
*/ */
~PvaClientRPC(); ~PvaClientRPC();
/**
* @brief Set a timeout for a request.
* @param responseTimeout The time in seconds to wait for a request to complete.
*/
void setResponseTimeout(double responseTimeout)
{
this->responseTimeout = responseTimeout;
}
/**
* @brief Get the responseTimeout.
* @return The value.
*/
double getResponseTimeout()
{
return responseTimeout;
}
/** @brief Call issueConnect and then waitConnect. /** @brief Call issueConnect and then waitConnect.
* *
* An exception is thrown if connect fails. * An exception is thrown if connect fails.
@ -1583,6 +1599,9 @@ public:
*/ */
epics::pvData::Status waitConnect(); epics::pvData::Status waitConnect();
/** @brief Issue a request and wait for response /** @brief Issue a request and wait for response
*
* Note that if responseTimeout is ( lt 0.0, ge 0.0) then this (will, will not) block
* until response completes or timeout.
* @param pvArgument The data to send to the service. * @param pvArgument The data to send to the service.
* @return The result * @return The result
* @throw runtime_error if failure. * @throw runtime_error if failure.
@ -1621,7 +1640,7 @@ private:
PvaClient::weak_pointer pvaClient; PvaClient::weak_pointer pvaClient;
epics::pvAccess::Channel::weak_pointer channel; epics::pvAccess::Channel::weak_pointer channel;
epics::pvData::PVStructurePtr pvRequest; epics::pvData::PVStructurePtr pvRequest;
epics::pvData::PVStructurePtr pvResponse;
epics::pvData::Mutex mutex; epics::pvData::Mutex mutex;
epics::pvData::Event waitForConnect; epics::pvData::Event waitForConnect;
epics::pvData::Event waitForDone; epics::pvData::Event waitForDone;
@ -1629,6 +1648,11 @@ private:
PvaClientRPCRequesterWPtr pvaClientRPCRequester; PvaClientRPCRequesterWPtr pvaClientRPCRequester;
RPCRequesterImplPtr rpcRequester; RPCRequesterImplPtr rpcRequester;
epics::pvAccess::ChannelRPC::shared_pointer channelRPC; epics::pvAccess::ChannelRPC::shared_pointer channelRPC;
epics::pvData::PVStructurePtr pvResponse;
enum RPCState {rpcIdle,rpcActive,rpcComplete};
RPCState rpcState;
double responseTimeout;
friend class RPCRequesterImpl; friend class RPCRequesterImpl;
}; };

View File

@ -99,7 +99,9 @@ PvaClientRPC::PvaClientRPC(
connectState(connectIdle), connectState(connectIdle),
pvaClient(pvaClient), pvaClient(pvaClient),
channel(channel), channel(channel),
pvRequest(pvRequest) pvRequest(pvRequest),
rpcState(rpcIdle),
responseTimeout(0.0)
{ {
if(PvaClient::getDebug()) { if(PvaClient::getDebug()) {
cout<< "PvaClientRPC::PvaClientRPC()" cout<< "PvaClientRPC::PvaClientRPC()"
@ -177,21 +179,38 @@ void PvaClientRPC::requestDone(
ChannelRPC::shared_pointer const & channelRPC, ChannelRPC::shared_pointer const & channelRPC,
PVStructure::shared_pointer const & pvResponse) PVStructure::shared_pointer const & pvResponse)
{ {
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientRPC::requesyDone"
<< " channelName " << channelName
<< endl;
}
PvaClientRPCRequesterPtr req = pvaClientRPCRequester.lock(); PvaClientRPCRequesterPtr req = pvaClientRPCRequester.lock();
{
Lock xx(mutex);
if(PvaClient::getDebug()) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
cout << "PvaClientRPC::requestDone"
<< " channelName " << channelName
<< endl;
}
if(rpcState!=rpcActive) {
string channelName("disconnected");
Channel::shared_pointer chan(channel.lock());
if(chan) channelName = chan->getChannelName();
string message = "channel "
+ channelName
+" PvaClientRPC::requestDone"
+ " but not active";
throw std::runtime_error(message);
}
if(req && (responseTimeout<=0.0)) {
rpcState = rpcIdle;
} else {
rpcState = rpcComplete;
if(!req) this->pvResponse = pvResponse;
waitForDone.signal();
}
}
if(req) { if(req) {
req->requestDone(status,shared_from_this(),pvResponse); req->requestDone(status,shared_from_this(),pvResponse);
return;
} }
this->pvResponse = pvResponse;
waitForDone.signal();
} }
void PvaClientRPC::connect() void PvaClientRPC::connect()
@ -261,8 +280,36 @@ Status PvaClientRPC::waitConnect()
PVStructure::shared_pointer PvaClientRPC::request(PVStructure::shared_pointer const & pvArgument) PVStructure::shared_pointer PvaClientRPC::request(PVStructure::shared_pointer const & pvArgument)
{ {
checkRPCState(); checkRPCState();
{
Lock xx(mutex);
if(rpcState!=rpcIdle) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = "channel "
+ channelName
+ " PvaClientRPC::request request aleady active ";
throw std::runtime_error(message);
}
rpcState = rpcActive;
}
channelRPC->request(pvArgument); channelRPC->request(pvArgument);
waitForDone.wait(); if(responseTimeout>0.0) {
waitForDone.wait(responseTimeout);
} else {
waitForDone.wait();
}
Lock xx(mutex);
if(rpcState!=rpcComplete) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = "channel "
+ channelName
+ " PvaClientRPC::request request timeout ";
throw std::runtime_error(message);
}
rpcState = rpcIdle;
return pvResponse; return pvResponse;
} }
@ -271,9 +318,26 @@ void PvaClientRPC::request(
PVStructure::shared_pointer const & pvArgument, PVStructure::shared_pointer const & pvArgument,
PvaClientRPCRequesterPtr const & pvaClientRPCRequester) PvaClientRPCRequesterPtr const & pvaClientRPCRequester)
{ {
checkRPCState(); checkRPCState();
this->pvaClientRPCRequester = pvaClientRPCRequester; this->pvaClientRPCRequester = pvaClientRPCRequester;
channelRPC->request(pvArgument); if(responseTimeout<=0.0) {
{
Lock xx(mutex);
if(rpcState!=rpcIdle) {
Channel::shared_pointer chan(channel.lock());
string channelName("disconnected");
if(chan) channelName = chan->getChannelName();
string message = "channel "
+ channelName
+ " PvaClientRPC::request request aleady active ";
throw std::runtime_error(message);
}
rpcState = rpcActive;
}
channelRPC->request(pvArgument);
return;
}
request(pvArgument);
} }