diff --git a/src/rpcClient/pv/rpcClient.h b/src/rpcClient/pv/rpcClient.h index fd26e4c..3dcde89 100644 --- a/src/rpcClient/pv/rpcClient.h +++ b/src/rpcClient/pv/rpcClient.h @@ -40,14 +40,6 @@ class epicsShareClass RPCClient public: POINTER_DEFINITIONS(RPCClient); - /** - * Create a RPCClient. - * - * @param serviceName the service name - * @return the RPCClient interface - */ - static shared_pointer create(const std::string & serviceName); - /** * Create a RPCClient. * @@ -56,7 +48,16 @@ public: * @return the RPCClient interface */ static shared_pointer create(const std::string & serviceName, - epics::pvData::PVStructure::shared_pointer const & pvRequest); + epics::pvData::PVStructure::shared_pointer const & pvRequest = epics::pvData::PVStructure::shared_pointer()); + + RPCClient(const std::string & serviceName, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + + RPCClient(const ChannelProvider::shared_pointer& provider, + const std::string & serviceName, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + + ~RPCClient() {destroy();} /** * Performs complete blocking RPC call, opening a channel and connecting to the @@ -136,15 +137,21 @@ public: */ epics::pvData::PVStructure::shared_pointer waitResponse(double timeout = RPCCLIENT_DEFAULT_TIMEOUT); - virtual ~RPCClient() {} +private: + void construct(const ChannelProvider::shared_pointer& provider, + const std::string & serviceName, + epics::pvData::PVStructure::shared_pointer const & pvRequest); -protected: - RPCClient(const std::string & serviceName, - epics::pvData::PVStructure::shared_pointer const & pvRequest); - - std::string m_serviceName; + const std::string m_serviceName; Channel::shared_pointer m_channel; - epics::pvData::PVStructure::shared_pointer m_pvRequest; + ChannelRPC::shared_pointer m_rpc; + const epics::pvData::PVStructure::shared_pointer m_pvRequest; + + struct RPCRequester; + std::tr1::shared_ptr m_rpc_requester; + + RPCClient(const RPCClient&); + RPCClient& operator=(const RPCClient&); }; } diff --git a/src/rpcClient/rpcClient.cpp b/src/rpcClient/rpcClient.cpp index 55d5749..38917ca 100644 --- a/src/rpcClient/rpcClient.cpp +++ b/src/rpcClient/rpcClient.cpp @@ -7,8 +7,10 @@ #include #include +#include #include #include +#include #define epicsExportSharedSymbols #include @@ -18,10 +20,39 @@ #include "pv/rpcClient.h" +#if 0 +# define TRACE(msg) std::cerr<<"TRACE: "<getChannelName() << "] channel create: " << status << std::endl; - } - } - else - { - std::cerr << "[" << channel->getChannelName() << "] failed to create a channel: " << status << std::endl; - - { - Lock lock(m_mutex); - m_status = status; - } - m_connectionEvent.signal(); - } - } - - void channelStateChange( - Channel::shared_pointer const & channel, - Channel::ConnectionState connectionState) - { - if (connectionState == Channel::CONNECTED) - { - bool rpcAlreadyConnectedOnce = false; - { - Lock lock(m_mutex); - rpcAlreadyConnectedOnce = (m_channelRPC.get() != 0); - } - - if (!rpcAlreadyConnectedOnce) - { - channel->createChannelRPC(shared_from_this(), m_pvRequest); - } - } - /* - else if (connectionState != Channel::DESTROYED) - { - std::cerr << "[" << channel->getChannelName() << "] channel state change: " << Channel::ConnectionStateNames[connectionState] << std::endl; - } - */ - } + virtual std::string getRequesterName() { return "RPCClient::RPCRequester"; } virtual void channelRPCConnect( - const epics::pvData::Status & status, - ChannelRPC::shared_pointer const & channelRPC) + const pvd::Status& status, + ChannelRPC::shared_pointer const & operation) { - if (status.isSuccess()) + bool lastreq, inprog; + pvd::PVStructure::shared_pointer args; { - if (!status.isOK()) - std::cerr << "[" << channelRPC->getChannel()->getChannelName() << "] channel RPC create: " << status << std::endl; + pvd::Lock L(mutex); + TRACE("status="<getChannel()->getChannelName() << "] failed to create channel RPC: " << status << std::endl; + if(inprog && args) { + TRACE("request deferred: "<lastRequest(); + operation->request(args); } - - { - Lock lock(m_mutex); - m_status = status; - m_channelRPC = channelRPC; - } - - m_connectionEvent.signal(); + event.signal(); } virtual void requestDone( - const epics::pvData::Status & status, - ChannelRPC::shared_pointer const & channelRPC, - epics::pvData::PVStructure::shared_pointer const & pvResponse) + const pvd::Status& status, + ChannelRPC::shared_pointer const & operation, + pvd::PVStructure::shared_pointer const & pvResponse) { - if (status.isSuccess()) + TRACE("status="<getChannel()->getChannelName() << "] channel RPC: " << status << std::endl; + pvd::Lock L(mutex); + if(!inprogress) { + std::cerr<<"pva provider give RPC requestDone() when no request in progress\n"; + } else { + resp_status = status; + last_data = pvResponse; + if(resp_status.isSuccess() && !last_data) { + resp_status = pvd::Status::error("No reply data"); + } + inprogress = false; + } } - else + event.signal(); + } + + virtual void channelDisconnect(bool destroy) + { + TRACE("destroy="<getChannel()->getChannelName() << "] failed to RPC: " << status << std::endl; + pvd::Lock L(mutex); + resp_status = conn_status = pvd::Status::error("Connection lost"); + last_data.reset(); + next_args.reset(); + inprogress = false; } - - { - Lock lock(m_mutex); - m_status = status; - m_response = pvResponse; - } - - m_event.signal(); - } - - bool waitForResponse(double timeOut) - { - return m_event.wait(timeOut); - } - - bool waitUntilRPCConnected(double timeOut) - { - if (isRPCConnected()) - return true; - - return m_connectionEvent.wait(timeOut); - } - - bool isRPCConnected() - { - Lock lock(m_mutex); - return (m_channelRPC.get() != 0); - } - - PVStructure::shared_pointer & getResponse() - { - Lock lock(m_mutex); - return m_response; - } - - Status & getStatus() - { - Lock lock(m_mutex); - return m_status; - } - - void request(PVStructure::shared_pointer const & pvArgument, bool lastRequest) - { - ChannelRPC::shared_pointer rpc; - { - Lock lock(m_mutex); - rpc = m_channelRPC; - } - - if (!rpc) - throw std::runtime_error("channel RPC not connected"); - - if (lastRequest) - rpc->lastRequest(); - - rpc->request(pvArgument); + event.signal(); } }; @@ -213,18 +149,47 @@ public: - - - - - - RPCClient::RPCClient(const std::string & serviceName, - PVStructure::shared_pointer const & pvRequest) - : m_serviceName(serviceName), m_pvRequest(pvRequest) + pvd::PVStructure::shared_pointer const & pvRequest) + : m_serviceName(serviceName), m_pvRequest(pvRequest ? pvRequest : pvd::createRequest("")) { + ClientFactory::start(); + ChannelProvider::shared_pointer provider(getChannelProviderRegistry()->getProvider("pva")); + if(!provider) + throw std::logic_error("Unknown Provider"); + construct(provider, serviceName, pvRequest); } +RPCClient::RPCClient(const ChannelProvider::shared_pointer& provider, + const std::string & serviceName, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + construct(provider, serviceName, pvRequest); +} + +void RPCClient::construct(const ChannelProvider::shared_pointer& provider, + const std::string & serviceName, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + assert(provider); + + DummyChannelRequester::shared_pointer dummy(new DummyChannelRequester); + m_channel = provider->createChannel(serviceName, dummy); + { + pvd::Lock L(dummy->mutex); + if(!dummy->status.isSuccess()) + throw std::runtime_error(dummy->status.getMessage()); + } + if(!m_channel) + throw std::logic_error("provider createChannel() succeeds w/ NULL Channel"); + + m_rpc_requester.reset(new RPCRequester); + m_rpc = m_channel->createChannelRPC(m_rpc_requester, m_pvRequest); + if(!m_rpc) + throw std::logic_error("channel createChannelRPC() NULL"); +} + + void RPCClient::destroy() { if (m_channel) @@ -232,43 +197,43 @@ void RPCClient::destroy() m_channel->destroy(); m_channel.reset(); } + if (m_rpc) + { + m_rpc->destroy(); + m_rpc.reset(); + } } bool RPCClient::connect(double timeout) { - if (m_channel && - TR1::dynamic_pointer_cast(m_channel->getChannelRequester())->isRPCConnected()) - return true; - issueConnect(); return waitConnect(timeout); } void RPCClient::issueConnect() { - ChannelProvider::shared_pointer provider = getChannelProviderRegistry()->getProvider("pva"); - - // TODO try to reuse ChannelRequesterImpl instance (i.e. create only once) - TR1::shared_ptr channelRequesterImpl(new ChannelAndRPCRequesterImpl(m_pvRequest)); - m_channel = provider->createChannel(m_serviceName, channelRequesterImpl); } bool RPCClient::waitConnect(double timeout) { - if (!m_channel) - throw std::runtime_error("issueConnect() must be called before waitConnect()"); - - TR1::shared_ptr channelRequesterImpl = - TR1::dynamic_pointer_cast(m_channel->getChannelRequester()); - - return channelRequesterImpl->waitUntilRPCConnected(timeout) && - channelRequesterImpl->isRPCConnected(); + pvd::Lock L(m_rpc_requester->mutex); + TRACE("timeout="<conn_status.isSuccess()) { + L.unlock(); + if(!m_rpc_requester->event.wait(timeout)) { + TRACE("TIMEOUT"); + return false; + } + L.lock(); + } + TRACE("Connected"); + return true; } -PVStructure::shared_pointer RPCClient::request( - PVStructure::shared_pointer const & pvArgument, +pvd::PVStructure::shared_pointer RPCClient::request( + pvd::PVStructure::shared_pointer const & pvArgument, double timeout, bool lastRequest) { @@ -278,65 +243,88 @@ PVStructure::shared_pointer RPCClient::request( return waitResponse(timeout); // TODO reduce timeout for a time spent on connect } else - throw epics::pvAccess::RPCRequestException(Status::STATUSTYPE_ERROR, "connection timeout"); + throw epics::pvAccess::RPCRequestException(pvd::Status::STATUSTYPE_ERROR, "connection timeout"); } void RPCClient::issueRequest( - PVStructure::shared_pointer const & pvArgument, + pvd::PVStructure::shared_pointer const & pvArgument, bool lastRequest) { - if (!m_channel) - throw std::runtime_error("channel not connected"); - - TR1::shared_ptr channelRequesterImpl = - TR1::dynamic_pointer_cast(m_channel->getChannelRequester()); - - channelRequesterImpl->request(pvArgument, lastRequest); -} - -PVStructure::shared_pointer RPCClient::waitResponse(double timeout) -{ - TR1::shared_ptr channelRequesterImpl = - TR1::dynamic_pointer_cast(m_channel->getChannelRequester()); - - if (channelRequesterImpl->waitForResponse(timeout)) { - Status & status = channelRequesterImpl->getStatus(); - if (status.isSuccess()) - { - // release response structure - PVStructure::shared_pointer & response = channelRequesterImpl->getResponse(); - PVStructure::shared_pointer retVal = response; - response.reset(); - return retVal; + pvd::Lock L(m_rpc_requester->mutex); + TRACE("conn_status="<conn_status + <<" resp_status="<resp_status + <<" args:\n"<inprogress) + throw std::logic_error("Request already in progress"); + m_rpc_requester->inprogress = true; + m_rpc_requester->resp_status = pvd::Status::error("No Data"); + if(!m_rpc_requester->conn_status.isSuccess()) { + TRACE("defer"); + m_rpc_requester->last = lastRequest; + m_rpc_requester->next_args = pvArgument; + return; } - else - throw epics::pvAccess::RPCRequestException(status.getType(), status.getMessage()); + TRACE("request args: "<lastRequest(); + m_rpc->request(pvArgument); } -RPCClient::shared_pointer RPCClient::create(const std::string & serviceName) +pvd::PVStructure::shared_pointer RPCClient::waitResponse(double timeout) { - PVStructure::shared_pointer pvRequest = - CreateRequest::create()->createRequest(""); - return create(serviceName, pvRequest); + pvd::Lock L(m_rpc_requester->mutex); + TRACE("timeout="<inprogress) + throw std::logic_error("No request in progress"); + + while(m_rpc_requester->inprogress) + { + L.unlock(); + if(!m_rpc_requester->event.wait(timeout)) { + TRACE("TIMEOUT"); + throw RPCRequestException(pvd::Status::STATUSTYPE_ERROR, "RPC timeout"); + } + L.lock(); + } + TRACE("Complete: conn_status="<conn_status + <<" resp_status="<resp_status + <<" data:\n"<last_data); + + if(!m_rpc_requester->conn_status.isSuccess()) + throw RPCRequestException(pvd::Status::STATUSTYPE_ERROR, m_rpc_requester->conn_status.getMessage()); + + if(!m_rpc_requester->resp_status.isSuccess()) + throw RPCRequestException(pvd::Status::STATUSTYPE_ERROR, m_rpc_requester->resp_status.getMessage()); + + // consume last_data so that we can't possibly return it twice + pvd::PVStructure::shared_pointer data; + data.swap(m_rpc_requester->last_data); + + if(!data) + throw std::logic_error("No reply data?!?"); + + // copy it so that the caller need not worry about whether it will overwritten + // when the next request is issued + pvd::PVStructure::shared_pointer ret(pvd::getPVDataCreate()->createPVStructure(data->getStructure())); + ret->copyUnchecked(*data); + + return ret; } RPCClient::shared_pointer RPCClient::create(const std::string & serviceName, - PVStructure::shared_pointer const & pvRequest) + pvd::PVStructure::shared_pointer const & pvRequest) { - ClientFactory::start(); return RPCClient::shared_pointer(new RPCClient(serviceName, pvRequest)); } -PVStructure::shared_pointer RPCClient::sendRequest(const std::string & serviceName, - PVStructure::shared_pointer const & queryRequest, +pvd::PVStructure::shared_pointer RPCClient::sendRequest(const std::string & serviceName, + pvd::PVStructure::shared_pointer const & queryRequest, double timeOut) { - RPCClient::shared_pointer client = RPCClient::create(serviceName); - return client->request(queryRequest, timeOut); + RPCClient client(serviceName, queryRequest); + return client.request(queryRequest, timeOut); } diff --git a/testApp/remote/rpcClientExample.cpp b/testApp/remote/rpcClientExample.cpp index 21e1b74..2d6c2f4 100644 --- a/testApp/remote/rpcClientExample.cpp +++ b/testApp/remote/rpcClientExample.cpp @@ -24,48 +24,43 @@ int main() request->getSubField("a")->put("3.14"); request->getSubField("b")->put("2.71"); - // simplest way + std::cout<<"simplest way\n"; try { PVStructure::shared_pointer result = RPCClient::sendRequest("sum", request, TIMEOUT); - std::cout << *result << std::endl; - } catch (RPCRequestException &e) - { - std::cout << e.what() << std::endl; - return 1; - } - - - // simple sync way, allows multiple RPC calls on the clinet instance - try - { - RPCClient::shared_pointer client = RPCClient::create("sum"); - PVStructure::shared_pointer result = client->request(request, TIMEOUT); - std::cout << *result << std::endl; - } catch (RPCRequestException &e) - { - std::cout << e.what() << std::endl; - return 1; - } - - // async way, allows multiple RPC calls on the clinet instance - try - { - RPCClient::shared_pointer client = RPCClient::create("sum"); - client->issueConnect(); - if (client->waitConnect(TIMEOUT)) - { - client->issueRequest(request); - PVStructure::shared_pointer result = client->waitResponse(TIMEOUT); - std::cout << *result << std::endl; - } - else - throw std::runtime_error("connection timeout"); + std::cout << "Error: " << *result << std::endl; } catch (std::exception &e) { std::cout << e.what() << std::endl; return 1; } + + std::cout<<"simple sync way, allows multiple RPC calls on the client instance\n"; + try + { + RPCClient::shared_pointer client = RPCClient::create("sum"); + PVStructure::shared_pointer result = client->request(request, TIMEOUT); + std::cout << *result << std::endl; + } catch (std::exception &e) + { + std::cout << "Error: " << e.what() << std::endl; + return 1; + } + + std::cout<<"async way, allows multiple RPC calls on the client instance\n"; + try + { + RPCClient::shared_pointer client = RPCClient::create("sum"); + client->issueRequest(request); + // go get some coffee + PVStructure::shared_pointer result = client->waitResponse(TIMEOUT); + std::cout << *result << std::endl; + } catch (std::exception &e) + { + std::cout << "Error:" << e.what() << std::endl; + return 1; + } + return 0; }