diff --git a/pvAccessCPP.files b/pvAccessCPP.files index 9e5a3ae..5617612 100644 --- a/pvAccessCPP.files +++ b/pvAccessCPP.files @@ -923,3 +923,40 @@ testApp/rtemsNetworking.h testApp/rtemsTestHarness.c src/ca/caStatus.cpp src/ca/caStatus.h +testApp/client/O.darwin-x86/Makefile +testApp/O.darwin-x86/Makefile +testApp/remote/O.darwin-x86/Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/Makefile +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceAsyncExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/O.darwin-x86/Makefile +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/Makefile +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/Makefile +testApp/pvAccessAllTests.c +testApp/rtemsConfig.c +testApp/rtemsNetworking.h +testApp/rtemsTestHarness.c diff --git a/src/rpcService/rpcServer.cpp b/src/rpcService/rpcServer.cpp index d356ab7..b868eaf 100644 --- a/src/rpcService/rpcServer.cpp +++ b/src/rpcService/rpcServer.cpp @@ -20,19 +20,20 @@ namespace epics { namespace pvAccess { class ChannelRPCServiceImpl : public ChannelRPC, + public RPCResponseCallback, public std::tr1::enable_shared_from_this { private: Channel::shared_pointer m_channel; ChannelRPCRequester::shared_pointer m_channelRPCRequester; - RPCService::shared_pointer m_rpcService; + Service::shared_pointer m_rpcService; AtomicBoolean m_lastRequest; public: ChannelRPCServiceImpl( Channel::shared_pointer const & channel, ChannelRPCRequester::shared_pointer const & channelRPCRequester, - RPCService::shared_pointer const & rpcService) : + Service::shared_pointer const & rpcService) : m_channel(channel), m_channelRPCRequester(channelRPCRequester), m_rpcService(rpcService), @@ -45,14 +46,15 @@ class ChannelRPCServiceImpl : destroy(); } - void processRequest(epics::pvData::PVStructure::shared_pointer const & pvArgument) + void processRequest(RPCService::shared_pointer const & service, + epics::pvData::PVStructure::shared_pointer const & pvArgument) { epics::pvData::PVStructure::shared_pointer result; Status status = Status::Ok; bool ok = true; try { - result = m_rpcService->request(pvArgument); + result = service->request(pvArgument); } catch (RPCRequestException& rre) { @@ -84,9 +86,66 @@ class ChannelRPCServiceImpl : } + virtual void requestDone( + epics::pvData::Status const & status, + epics::pvData::PVStructure::shared_pointer const & result + ) + { + m_channelRPCRequester->requestDone(status, shared_from_this(), result); + + if (m_lastRequest.get()) + destroy(); + } + + void processRequest(RPCServiceAsync::shared_pointer const & service, + epics::pvData::PVStructure::shared_pointer const & pvArgument) + { + try + { + service->request(pvArgument, shared_from_this()); + } + catch (std::exception& ex) + { + // handle user unexpected errors + Status errorStatus(Status::STATUSTYPE_FATAL, ex.what()); + + m_channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructure::shared_pointer()); + + if (m_lastRequest.get()) + destroy(); + } + catch (...) + { + // handle user unexpected errors + Status errorStatus(Status::STATUSTYPE_FATAL, + "Unexpected exception caught while calling RPCServiceAsync.request(PVStructure, RPCResponseCallback)."); + + m_channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructure::shared_pointer()); + + if (m_lastRequest.get()) + destroy(); + } + + // we wait for callback to be called + } + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) { - processRequest(pvArgument); + RPCService::shared_pointer rpcService = + std::tr1::dynamic_pointer_cast(m_rpcService); + if (rpcService) + { + processRequest(rpcService, pvArgument); + return; + } + + RPCServiceAsync::shared_pointer rpcServiceAsync = + std::tr1::dynamic_pointer_cast(m_rpcService); + if (rpcServiceAsync) + { + processRequest(rpcServiceAsync, pvArgument); + return; + } } void lastRequest() @@ -138,7 +197,7 @@ private: string m_channelName; ChannelRequester::shared_pointer m_channelRequester; - RPCService::shared_pointer m_rpcService; + Service::shared_pointer m_rpcService; public: POINTER_DEFINITIONS(RPCChannel); @@ -147,7 +206,7 @@ public: ChannelProvider::shared_pointer const & provider, string const & channelName, ChannelRequester::shared_pointer const & channelRequester, - RPCService::shared_pointer const & rpcService) : + Service::shared_pointer const & rpcService) : m_provider(provider), m_channelName(channelName), m_channelRequester(channelRequester), @@ -325,7 +384,7 @@ Status RPCChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed" Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider, std::string const & channelName, ChannelRequester::shared_pointer const & channelRequester, - RPCService::shared_pointer const & rpcService) + Service::shared_pointer const & rpcService) { // TODO use std::make_shared std::tr1::shared_ptr tp( @@ -407,7 +466,7 @@ public: ChannelRequester::shared_pointer const & channelRequester, short /*priority*/) { - RPCService::shared_pointer service; + Service::shared_pointer service; RPCServiceMap::const_iterator iter; { @@ -450,7 +509,7 @@ public: throw std::runtime_error("not supported"); } - void registerService(std::string const & serviceName, RPCService::shared_pointer const & service) + void registerService(std::string const & serviceName, Service::shared_pointer const & service) { Lock guard(m_mutex); m_services[serviceName] = service; @@ -458,7 +517,7 @@ public: if (isWildcardPattern(serviceName)) m_wildServices.push_back(std::make_pair(serviceName, service)); } - + void unregisterService(std::string const & serviceName) { Lock guard(m_mutex); @@ -479,7 +538,7 @@ public: private: // assumes sync on services - RPCService::shared_pointer findWildService(string const & wildcard) + Service::shared_pointer findWildService(string const & wildcard) { if (!m_wildServices.empty()) for (RPCWildServiceList::iterator iter = m_wildServices.begin(); @@ -488,7 +547,7 @@ private: if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str())) return iter->second; - return RPCService::shared_pointer(); + return Service::shared_pointer(); } // (too) simple check @@ -500,10 +559,10 @@ private: (pattern.find('[') != string::npos && pattern.find(']') != string::npos)); } - typedef std::map RPCServiceMap; + typedef std::map RPCServiceMap; RPCServiceMap m_services; - typedef std::vector > RPCWildServiceList; + typedef std::vector > RPCWildServiceList; RPCWildServiceList m_wildServices; epics::pvData::Mutex m_mutex; @@ -619,6 +678,11 @@ void RPCServer::registerService(std::string const & serviceName, RPCService::sha std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->registerService(serviceName, service); } +void RPCServer::registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service) +{ + std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->registerService(serviceName, service); +} + void RPCServer::unregisterService(std::string const & serviceName) { std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->unregisterService(serviceName); diff --git a/src/rpcService/rpcServer.h b/src/rpcService/rpcServer.h index 9009deb..795be36 100644 --- a/src/rpcService/rpcServer.h +++ b/src/rpcService/rpcServer.h @@ -47,6 +47,8 @@ class epicsShareClass RPCServer : void registerService(std::string const & serviceName, RPCService::shared_pointer const & service); + void registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service); + void unregisterService(std::string const & serviceName); void run(int seconds = 0); @@ -67,7 +69,7 @@ class epicsShareClass RPCServer : epicsShareExtern Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider, std::string const & channelName, ChannelRequester::shared_pointer const & channelRequester, - RPCService::shared_pointer const & rpcService); + Service::shared_pointer const & rpcService); }} diff --git a/src/rpcService/rpcService.h b/src/rpcService/rpcService.h index 8b6180d..c5e890e 100644 --- a/src/rpcService/rpcService.h +++ b/src/rpcService/rpcService.h @@ -44,9 +44,18 @@ private: epics::pvData::Status::StatusType m_status; }; +class epicsShareClass Service +{ +public: + POINTER_DEFINITIONS(Service); -class epicsShareClass RPCService { - public: + virtual ~Service() {}; +}; + +class epicsShareClass RPCService : + public virtual Service +{ +public: POINTER_DEFINITIONS(RPCService); virtual ~RPCService() {}; @@ -56,6 +65,35 @@ class epicsShareClass RPCService { ) throw (RPCRequestException) = 0; }; + + +class epicsShareClass RPCResponseCallback +{ +public: + POINTER_DEFINITIONS(RPCResponseCallback); + + virtual ~RPCResponseCallback() {}; + + virtual void requestDone( + epics::pvData::Status const & status, + epics::pvData::PVStructure::shared_pointer const & result + ) = 0; +}; + +class epicsShareClass RPCServiceAsync : + public virtual Service +{ +public: + POINTER_DEFINITIONS(RPCServiceAsync); + + virtual ~RPCServiceAsync() {}; + + virtual void request( + epics::pvData::PVStructure::shared_pointer const & args, + RPCResponseCallback::shared_pointer const & callback + ) = 0; +}; + }} #endif /* RPCSERVICE_H */ diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 017b00b..9ef227e 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -36,6 +36,9 @@ testMonitorPerformance_SRCS += testMonitorPerformance.cpp PROD_HOST += rpcServiceExample rpcServiceExample_SRCS += rpcServiceExample.cpp +PROD_HOST += rpcServiceAsyncExample +rpcServiceAsyncExample_SRCS += rpcServiceAsyncExample.cpp + PROD_HOST += rpcWildServiceExample rpcWildServiceExample_SRCS += rpcWildServiceExample.cpp diff --git a/testApp/remote/rpcServiceAsyncExample.cpp b/testApp/remote/rpcServiceAsyncExample.cpp new file mode 100644 index 0000000..1484178 --- /dev/null +++ b/testApp/remote/rpcServiceAsyncExample.cpp @@ -0,0 +1,87 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +using namespace epics::pvData; +using namespace epics::pvAccess; + +static Structure::const_shared_pointer resultStructure = + getFieldCreate()->createFieldBuilder()-> + add("c", pvDouble)-> + createStructure(); + +// s1 starts with s2 check +static bool starts_with(const std::string& s1, const std::string& s2) { + return s2.size() <= s1.size() && s1.compare(0, s2.size(), s2) == 0; +} + +class SumServiceImpl : + public RPCServiceAsync +{ + void request(PVStructure::shared_pointer const & pvArguments, + RPCResponseCallback::shared_pointer const & callback) + { + // NTURI support + PVStructure::shared_pointer args( + (starts_with(pvArguments->getStructure()->getID(), "epics:nt/NTURI:1.")) ? + pvArguments->getStructureField("query") : + pvArguments + ); + + // get fields and check their existence + PVScalar::shared_pointer af = args->getSubField("a"); + PVScalar::shared_pointer bf = args->getSubField("b"); + if (!af || !bf) + { + callback->requestDone( + Status( + Status::STATUSTYPE_ERROR, + "scalar 'a' and 'b' fields are required" + ), + PVStructure::shared_pointer()); + return; + } + + // get the numbers (and convert if neccessary) + double a, b; + try + { + a = af->getAs(); + b = bf->getAs(); + } + catch (std::exception &e) + { + callback->requestDone( + Status( + Status::STATUSTYPE_ERROR, + std::string("failed to convert arguments to double: ") + e.what() + ), + PVStructure::shared_pointer()); + return; + } + + // create return structure and set data + PVStructure::shared_pointer result = getPVDataCreate()->createPVStructure(resultStructure); + result->getSubField("c")->put(a+b); + + callback->requestDone(Status::Ok, result); + } +}; + +int main() +{ + RPCServer server; + + server.registerService("sum", RPCServiceAsync::shared_pointer(new SumServiceImpl())); + // you can register as many services as you want here ... + + server.printInfo(); + server.run(); + + return 0; +}