From 5d843d35877fed034fdaefd0b2106404bbefd66d Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Mon, 10 Jul 2017 17:17:07 +0200 Subject: [PATCH] RPC cleanup: RPCService is a sub-set of RPCServiceAsync --- src/rpcService/pv/rpcServer.h | 4 +- src/rpcService/pv/rpcService.h | 50 +++++++++--------- src/rpcService/rpcServer.cpp | 91 +++++-------------------------- src/rpcService/rpcService.cpp | 27 ++++++++++ testApp/remote/testRPC.cpp | 97 ++++++++++++++++++++++++++-------- 5 files changed, 141 insertions(+), 128 deletions(-) diff --git a/src/rpcService/pv/rpcServer.h b/src/rpcService/pv/rpcServer.h index 735e940..2f08d4b 100644 --- a/src/rpcService/pv/rpcServer.h +++ b/src/rpcService/pv/rpcServer.h @@ -45,8 +45,6 @@ public: virtual ~RPCServer(); - void registerService(std::string const & serviceName, RPCService::shared_pointer const & service); - void registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service); void unregisterService(std::string const & serviceName); @@ -70,7 +68,7 @@ public: epicsShareFunc Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider, std::string const & channelName, ChannelRequester::shared_pointer const & channelRequester, - Service::shared_pointer const & rpcService); + RPCServiceAsync::shared_pointer const & rpcService); } } diff --git a/src/rpcService/pv/rpcService.h b/src/rpcService/pv/rpcService.h index 32eed92..deac4f8 100644 --- a/src/rpcService/pv/rpcService.h +++ b/src/rpcService/pv/rpcService.h @@ -44,33 +44,13 @@ public: return m_status; } + epics::pvData::Status asStatus() const { + return epics::pvData::Status(m_status, what()); + } private: epics::pvData::Status::StatusType m_status; }; -class epicsShareClass Service -{ -public: - POINTER_DEFINITIONS(Service); - - virtual ~Service() {}; -}; - -class epicsShareClass RPCService : - public virtual Service -{ -public: - POINTER_DEFINITIONS(RPCService); - - virtual ~RPCService() {}; - - virtual epics::pvData::PVStructure::shared_pointer request( - epics::pvData::PVStructure::shared_pointer const & args - ) = 0; -}; - - - class epicsShareClass RPCResponseCallback { public: @@ -84,8 +64,7 @@ public: ) = 0; }; -class epicsShareClass RPCServiceAsync : - public virtual Service +class epicsShareClass RPCServiceAsync { public: POINTER_DEFINITIONS(RPCServiceAsync); @@ -98,6 +77,27 @@ public: ) = 0; }; +typedef RPCServiceAsync Service EPICS_DEPRECATED; + +class epicsShareClass RPCService : + public RPCServiceAsync +{ +public: + POINTER_DEFINITIONS(RPCService); + + virtual ~RPCService() {}; + + virtual epics::pvData::PVStructure::shared_pointer request( + epics::pvData::PVStructure::shared_pointer const & args + ) = 0; + +private: + virtual void request( + epics::pvData::PVStructure::shared_pointer const & args, + RPCResponseCallback::shared_pointer const & callback + ) OVERRIDE FINAL; +}; + } } diff --git a/src/rpcService/rpcServer.cpp b/src/rpcService/rpcServer.cpp index 12d7366..c219624 100644 --- a/src/rpcService/rpcServer.cpp +++ b/src/rpcService/rpcServer.cpp @@ -28,14 +28,14 @@ class ChannelRPCServiceImpl : private: Channel::shared_pointer m_channel; ChannelRPCRequester::shared_pointer m_channelRPCRequester; - Service::shared_pointer m_rpcService; + RPCServiceAsync::shared_pointer m_rpcService; AtomicBoolean m_lastRequest; public: ChannelRPCServiceImpl( Channel::shared_pointer const & channel, ChannelRPCRequester::shared_pointer const & channelRPCRequester, - Service::shared_pointer const & rpcService) : + RPCServiceAsync::shared_pointer const & rpcService) : m_channel(channel), m_channelRPCRequester(channelRPCRequester), m_rpcService(rpcService), @@ -48,46 +48,6 @@ public: destroy(); } - void processRequest(RPCService::shared_pointer const & service, - epics::pvData::PVStructure::shared_pointer const & pvArgument) - { - epics::pvData::PVStructure::shared_pointer result; - Status status = Status::Ok; - bool ok = true; - try - { - result = service->request(pvArgument); - } - catch (RPCRequestException& rre) - { - status = Status(rre.getStatus(), rre.what()); - ok = false; - } - catch (std::exception& ex) - { - status = Status(Status::STATUSTYPE_FATAL, ex.what()); - ok = false; - } - catch (...) - { - // handle user unexpected errors - status = Status(Status::STATUSTYPE_FATAL, "Unexpected exception caught while calling RPCService.request(PVStructure)."); - ok = false; - } - - // check null result - if (ok && result.get() == 0) - { - status = Status(Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null."); - } - - m_channelRPCRequester->requestDone(status, shared_from_this(), result); - - if (m_lastRequest.get()) - destroy(); - - } - virtual void requestDone( epics::pvData::Status const & status, epics::pvData::PVStructure::shared_pointer const & result @@ -99,12 +59,11 @@ public: destroy(); } - void processRequest(RPCServiceAsync::shared_pointer const & service, - epics::pvData::PVStructure::shared_pointer const & pvArgument) + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) { try { - service->request(pvArgument, shared_from_this()); + m_rpcService->request(pvArgument, shared_from_this()); } catch (std::exception& ex) { @@ -131,25 +90,6 @@ public: // we wait for callback to be called } - virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) - { - RPCService::shared_pointer rpcService = - std::tr1::dynamic_pointer_cast(m_rpcService); - if (rpcService) - { - processRequest(rpcService, pvArgument); - return; - } - - RPCServiceAsync::shared_pointer rpcServiceAsync = - std::tr1::dynamic_pointer_cast(m_rpcService); - if (rpcServiceAsync) - { - processRequest(rpcServiceAsync, pvArgument); - return; - } - } - void lastRequest() { m_lastRequest.set(); @@ -186,7 +126,7 @@ private: string m_channelName; ChannelRequester::shared_pointer m_channelRequester; - Service::shared_pointer m_rpcService; + RPCServiceAsync::shared_pointer m_rpcService; public: POINTER_DEFINITIONS(RPCChannel); @@ -195,7 +135,7 @@ public: ChannelProvider::shared_pointer const & provider, string const & channelName, ChannelRequester::shared_pointer const & channelRequester, - Service::shared_pointer const & rpcService) : + RPCServiceAsync::shared_pointer const & rpcService) : m_provider(provider), m_channelName(channelName), m_channelRequester(channelRequester), @@ -289,7 +229,7 @@ public: Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider, std::string const & channelName, ChannelRequester::shared_pointer const & channelRequester, - Service::shared_pointer const & rpcService) + RPCServiceAsync::shared_pointer const & rpcService) { // TODO use std::make_shared std::tr1::shared_ptr tp( @@ -371,7 +311,7 @@ public: ChannelRequester::shared_pointer const & channelRequester, short /*priority*/) { - Service::shared_pointer service; + RPCServiceAsync::shared_pointer service; RPCServiceMap::const_iterator iter; { @@ -414,7 +354,7 @@ public: throw std::runtime_error("not supported"); } - void registerService(std::string const & serviceName, Service::shared_pointer const & service) + void registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service) { Lock guard(m_mutex); m_services[serviceName] = service; @@ -443,7 +383,7 @@ public: private: // assumes sync on services - Service::shared_pointer findWildService(string const & wildcard) + RPCServiceAsync::shared_pointer findWildService(string const & wildcard) { if (!m_wildServices.empty()) for (RPCWildServiceList::iterator iter = m_wildServices.begin(); @@ -452,7 +392,7 @@ private: if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str())) return iter->second; - return Service::shared_pointer(); + return RPCServiceAsync::shared_pointer(); } // (too) simple check @@ -464,10 +404,10 @@ private: (pattern.find('[') != string::npos && pattern.find(']') != string::npos)); } - typedef std::map RPCServiceMap; + typedef std::map RPCServiceMap; RPCServiceMap m_services; - typedef std::vector > RPCWildServiceList; + typedef std::vector > RPCWildServiceList; RPCWildServiceList m_wildServices; epics::pvData::Mutex m_mutex; @@ -539,11 +479,6 @@ void RPCServer::destroy() m_serverContext->shutdown(); } -void RPCServer::registerService(std::string const & serviceName, RPCService::shared_pointer const & service) -{ - std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->registerService(serviceName, service); -} - void RPCServer::registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service) { std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->registerService(serviceName, service); diff --git a/src/rpcService/rpcService.cpp b/src/rpcService/rpcService.cpp index db61486..c396e9c 100644 --- a/src/rpcService/rpcService.cpp +++ b/src/rpcService/rpcService.cpp @@ -6,3 +6,30 @@ #define epicsExportSharedSymbols #include + +namespace pvd = epics::pvData; + +namespace epics{namespace pvAccess{ + +void RPCService::request( + pvd::PVStructure::shared_pointer const & args, + RPCResponseCallback::shared_pointer const & callback) +{ + assert(callback && args); + pvd::PVStructure::shared_pointer ret; + pvd::Status sts; + try { + ret = request(args); + }catch(RPCRequestException& e){ + sts = e.asStatus(); + throw; + }catch(std::exception& e){ + sts = pvd::Status::error(e.what()); + } + if(!ret) { + sts = pvd::Status(pvd::Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null."); + } + callback->requestDone(sts, ret); +} + +}} // namespace epics::pvAccess diff --git a/testApp/remote/testRPC.cpp b/testApp/remote/testRPC.cpp index ac2dd53..cd3cd47 100644 --- a/testApp/remote/testRPC.cpp +++ b/testApp/remote/testRPC.cpp @@ -41,12 +41,76 @@ struct SumService : public pva::RPCService } }; +void testSum(const pva::ChannelProvider::shared_pointer& cli_prov) +{ + pva::RPCClient client("sum", pvd::createRequest("field()"), cli_prov); + + pvd::ValueBuilder args("epics:nt/NTURI:1.0"); + args.add("scheme", "pva") + .add("path", "sum"); + + pvd::PVStructurePtr reply; + + testDiag("Request"); + reply = client.request(args.addNested("query") + .add("lhs", 5.0) + .add("rhs", 3.0) + .endNested() + .buildPVStructure()); + + pvd::int32 value = reply->getSubFieldT("value")->getAs(); + testOk(value==8, "Reply value = %d", (unsigned)value); + + testDiag("Wait for connect (already connected)"); + testOk1(client.waitConnect()); + +} + + + +struct FailService : public pva::RPCService +{ + virtual epics::pvData::PVStructure::shared_pointer request( + epics::pvData::PVStructure::shared_pointer const & args + ) OVERRIDE FINAL + { + testDiag("failing()"); + throw std::runtime_error("oops"); + } +}; + +void testRPCFail(const pva::ChannelProvider::shared_pointer& cli_prov) +{ + + testDiag("Fail"); + + pva::RPCClient client("fail", pvd::createRequest("field()"), cli_prov); + + pvd::ValueBuilder args("epics:nt/NTURI:1.0"); + args.add("scheme", "pva") + .add("path", "fail"); + + + testDiag("Request"); + try{ + (void)client.request(args.addNested("query") + .add("lhs", 5.0) + .add("rhs", 3.0) + .endNested() + .buildPVStructure()); + testFail("Missing expected exception"); + }catch(pva::RPCRequestException& e){ + testPass("caught expected rpc exception: %s", e.what()); + }catch(std::exception& e){ + testFail("caught un-expected exception: %s", e.what()); + } +} } // namespace MAIN(testRPC) { - testPlan(2); + testPlan(3); try { pva::Configuration::shared_pointer conf(pva::ConfigurationBuilder() //.push_env() @@ -65,8 +129,14 @@ MAIN(testRPC) serv.getServer()->getServerPort(), serv.getServer()->getBroadcastPort()); - std::tr1::shared_ptr service(new SumService); - serv.registerService("sum", service); + { + std::tr1::shared_ptr service(new SumService); + serv.registerService("sum", service); + } + { + std::tr1::shared_ptr service(new FailService); + serv.registerService("fail", service); + } testDiag("Client Setup"); pva::ClientFactory::start(); @@ -75,26 +145,9 @@ MAIN(testRPC) if(!cli_prov) testAbort("No pva provider"); testDiag("Client Ready"); - pva::RPCClient client("sum", pvd::createRequest("field()"), cli_prov); - pvd::ValueBuilder args("epics:nt/NTURI:1.0"); - args.add("scheme", "pva") - .add("path", "sum"); - - pvd::PVStructurePtr reply; - - testDiag("Request"); - reply = client.request(args.addNested("query") - .add("lhs", 5.0) - .add("rhs", 3.0) - .endNested() - .buildPVStructure()); - - pvd::int32 value = reply->getSubFieldT("value")->getAs(); - testOk(value==8, "Reply value = %d", (unsigned)value); - - testDiag("Wait for connect (already connected)"); - testOk1(client.waitConnect()); + testSum(cli_prov); + testRPCFail(cli_prov); }catch(std::exception& e){ PRINT_EXCEPTION(e);