async RPC service and example

This commit is contained in:
Matej Sekoranja
2015-02-08 22:46:05 +01:00
parent 7eb0087d64
commit 4e474a75fd
6 changed files with 249 additions and 18 deletions

View File

@@ -20,19 +20,20 @@ namespace epics { namespace pvAccess {
class ChannelRPCServiceImpl :
public ChannelRPC,
public RPCResponseCallback,
public std::tr1::enable_shared_from_this<ChannelRPCServiceImpl>
{
private:
Channel::shared_pointer m_channel;
ChannelRPCRequester::shared_pointer m_channelRPCRequester;
RPCService::shared_pointer m_rpcService;
Service::shared_pointer m_rpcService;
AtomicBoolean m_lastRequest;
public:
ChannelRPCServiceImpl(
Channel::shared_pointer const & channel,
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
RPCService::shared_pointer const & rpcService) :
Service::shared_pointer const & rpcService) :
m_channel(channel),
m_channelRPCRequester(channelRPCRequester),
m_rpcService(rpcService),
@@ -45,14 +46,15 @@ class ChannelRPCServiceImpl :
destroy();
}
void processRequest(epics::pvData::PVStructure::shared_pointer const & pvArgument)
void processRequest(RPCService::shared_pointer const & service,
epics::pvData::PVStructure::shared_pointer const & pvArgument)
{
epics::pvData::PVStructure::shared_pointer result;
Status status = Status::Ok;
bool ok = true;
try
{
result = m_rpcService->request(pvArgument);
result = service->request(pvArgument);
}
catch (RPCRequestException& rre)
{
@@ -84,9 +86,66 @@ class ChannelRPCServiceImpl :
}
virtual void requestDone(
epics::pvData::Status const & status,
epics::pvData::PVStructure::shared_pointer const & result
)
{
m_channelRPCRequester->requestDone(status, shared_from_this(), result);
if (m_lastRequest.get())
destroy();
}
void processRequest(RPCServiceAsync::shared_pointer const & service,
epics::pvData::PVStructure::shared_pointer const & pvArgument)
{
try
{
service->request(pvArgument, shared_from_this());
}
catch (std::exception& ex)
{
// handle user unexpected errors
Status errorStatus(Status::STATUSTYPE_FATAL, ex.what());
m_channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructure::shared_pointer());
if (m_lastRequest.get())
destroy();
}
catch (...)
{
// handle user unexpected errors
Status errorStatus(Status::STATUSTYPE_FATAL,
"Unexpected exception caught while calling RPCServiceAsync.request(PVStructure, RPCResponseCallback).");
m_channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructure::shared_pointer());
if (m_lastRequest.get())
destroy();
}
// we wait for callback to be called
}
virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument)
{
processRequest(pvArgument);
RPCService::shared_pointer rpcService =
std::tr1::dynamic_pointer_cast<RPCService>(m_rpcService);
if (rpcService)
{
processRequest(rpcService, pvArgument);
return;
}
RPCServiceAsync::shared_pointer rpcServiceAsync =
std::tr1::dynamic_pointer_cast<RPCServiceAsync>(m_rpcService);
if (rpcServiceAsync)
{
processRequest(rpcServiceAsync, pvArgument);
return;
}
}
void lastRequest()
@@ -138,7 +197,7 @@ private:
string m_channelName;
ChannelRequester::shared_pointer m_channelRequester;
RPCService::shared_pointer m_rpcService;
Service::shared_pointer m_rpcService;
public:
POINTER_DEFINITIONS(RPCChannel);
@@ -147,7 +206,7 @@ public:
ChannelProvider::shared_pointer const & provider,
string const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
RPCService::shared_pointer const & rpcService) :
Service::shared_pointer const & rpcService) :
m_provider(provider),
m_channelName(channelName),
m_channelRequester(channelRequester),
@@ -325,7 +384,7 @@ Status RPCChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed"
Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider,
std::string const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
RPCService::shared_pointer const & rpcService)
Service::shared_pointer const & rpcService)
{
// TODO use std::make_shared
std::tr1::shared_ptr<RPCChannel> tp(
@@ -407,7 +466,7 @@ public:
ChannelRequester::shared_pointer const & channelRequester,
short /*priority*/)
{
RPCService::shared_pointer service;
Service::shared_pointer service;
RPCServiceMap::const_iterator iter;
{
@@ -450,7 +509,7 @@ public:
throw std::runtime_error("not supported");
}
void registerService(std::string const & serviceName, RPCService::shared_pointer const & service)
void registerService(std::string const & serviceName, Service::shared_pointer const & service)
{
Lock guard(m_mutex);
m_services[serviceName] = service;
@@ -458,7 +517,7 @@ public:
if (isWildcardPattern(serviceName))
m_wildServices.push_back(std::make_pair(serviceName, service));
}
void unregisterService(std::string const & serviceName)
{
Lock guard(m_mutex);
@@ -479,7 +538,7 @@ public:
private:
// assumes sync on services
RPCService::shared_pointer findWildService(string const & wildcard)
Service::shared_pointer findWildService(string const & wildcard)
{
if (!m_wildServices.empty())
for (RPCWildServiceList::iterator iter = m_wildServices.begin();
@@ -488,7 +547,7 @@ private:
if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str()))
return iter->second;
return RPCService::shared_pointer();
return Service::shared_pointer();
}
// (too) simple check
@@ -500,10 +559,10 @@ private:
(pattern.find('[') != string::npos && pattern.find(']') != string::npos));
}
typedef std::map<string, RPCService::shared_pointer> RPCServiceMap;
typedef std::map<string, Service::shared_pointer> RPCServiceMap;
RPCServiceMap m_services;
typedef std::vector<std::pair<string, RPCService::shared_pointer> > RPCWildServiceList;
typedef std::vector<std::pair<string, Service::shared_pointer> > RPCWildServiceList;
RPCWildServiceList m_wildServices;
epics::pvData::Mutex m_mutex;
@@ -619,6 +678,11 @@ void RPCServer::registerService(std::string const & serviceName, RPCService::sha
std::tr1::dynamic_pointer_cast<RPCChannelProvider>(m_channelProviderImpl)->registerService(serviceName, service);
}
void RPCServer::registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service)
{
std::tr1::dynamic_pointer_cast<RPCChannelProvider>(m_channelProviderImpl)->registerService(serviceName, service);
}
void RPCServer::unregisterService(std::string const & serviceName)
{
std::tr1::dynamic_pointer_cast<RPCChannelProvider>(m_channelProviderImpl)->unregisterService(serviceName);