RPC cleanup: RPCService is a sub-set of RPCServiceAsync
This commit is contained in:
@@ -45,8 +45,6 @@ public:
|
||||
|
||||
virtual ~RPCServer();
|
||||
|
||||
void registerService(std::string const & serviceName, RPCService::shared_pointer const & service);
|
||||
|
||||
void registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service);
|
||||
|
||||
void unregisterService(std::string const & serviceName);
|
||||
@@ -70,7 +68,7 @@ public:
|
||||
epicsShareFunc Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider,
|
||||
std::string const & channelName,
|
||||
ChannelRequester::shared_pointer const & channelRequester,
|
||||
Service::shared_pointer const & rpcService);
|
||||
RPCServiceAsync::shared_pointer const & rpcService);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,33 +44,13 @@ public:
|
||||
return m_status;
|
||||
}
|
||||
|
||||
epics::pvData::Status asStatus() const {
|
||||
return epics::pvData::Status(m_status, what());
|
||||
}
|
||||
private:
|
||||
epics::pvData::Status::StatusType m_status;
|
||||
};
|
||||
|
||||
class epicsShareClass Service
|
||||
{
|
||||
public:
|
||||
POINTER_DEFINITIONS(Service);
|
||||
|
||||
virtual ~Service() {};
|
||||
};
|
||||
|
||||
class epicsShareClass RPCService :
|
||||
public virtual Service
|
||||
{
|
||||
public:
|
||||
POINTER_DEFINITIONS(RPCService);
|
||||
|
||||
virtual ~RPCService() {};
|
||||
|
||||
virtual epics::pvData::PVStructure::shared_pointer request(
|
||||
epics::pvData::PVStructure::shared_pointer const & args
|
||||
) = 0;
|
||||
};
|
||||
|
||||
|
||||
|
||||
class epicsShareClass RPCResponseCallback
|
||||
{
|
||||
public:
|
||||
@@ -84,8 +64,7 @@ public:
|
||||
) = 0;
|
||||
};
|
||||
|
||||
class epicsShareClass RPCServiceAsync :
|
||||
public virtual Service
|
||||
class epicsShareClass RPCServiceAsync
|
||||
{
|
||||
public:
|
||||
POINTER_DEFINITIONS(RPCServiceAsync);
|
||||
@@ -98,6 +77,27 @@ public:
|
||||
) = 0;
|
||||
};
|
||||
|
||||
typedef RPCServiceAsync Service EPICS_DEPRECATED;
|
||||
|
||||
class epicsShareClass RPCService :
|
||||
public RPCServiceAsync
|
||||
{
|
||||
public:
|
||||
POINTER_DEFINITIONS(RPCService);
|
||||
|
||||
virtual ~RPCService() {};
|
||||
|
||||
virtual epics::pvData::PVStructure::shared_pointer request(
|
||||
epics::pvData::PVStructure::shared_pointer const & args
|
||||
) = 0;
|
||||
|
||||
private:
|
||||
virtual void request(
|
||||
epics::pvData::PVStructure::shared_pointer const & args,
|
||||
RPCResponseCallback::shared_pointer const & callback
|
||||
) OVERRIDE FINAL;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -28,14 +28,14 @@ class ChannelRPCServiceImpl :
|
||||
private:
|
||||
Channel::shared_pointer m_channel;
|
||||
ChannelRPCRequester::shared_pointer m_channelRPCRequester;
|
||||
Service::shared_pointer m_rpcService;
|
||||
RPCServiceAsync::shared_pointer m_rpcService;
|
||||
AtomicBoolean m_lastRequest;
|
||||
|
||||
public:
|
||||
ChannelRPCServiceImpl(
|
||||
Channel::shared_pointer const & channel,
|
||||
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
|
||||
Service::shared_pointer const & rpcService) :
|
||||
RPCServiceAsync::shared_pointer const & rpcService) :
|
||||
m_channel(channel),
|
||||
m_channelRPCRequester(channelRPCRequester),
|
||||
m_rpcService(rpcService),
|
||||
@@ -48,46 +48,6 @@ public:
|
||||
destroy();
|
||||
}
|
||||
|
||||
void processRequest(RPCService::shared_pointer const & service,
|
||||
epics::pvData::PVStructure::shared_pointer const & pvArgument)
|
||||
{
|
||||
epics::pvData::PVStructure::shared_pointer result;
|
||||
Status status = Status::Ok;
|
||||
bool ok = true;
|
||||
try
|
||||
{
|
||||
result = service->request(pvArgument);
|
||||
}
|
||||
catch (RPCRequestException& rre)
|
||||
{
|
||||
status = Status(rre.getStatus(), rre.what());
|
||||
ok = false;
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
status = Status(Status::STATUSTYPE_FATAL, ex.what());
|
||||
ok = false;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// handle user unexpected errors
|
||||
status = Status(Status::STATUSTYPE_FATAL, "Unexpected exception caught while calling RPCService.request(PVStructure).");
|
||||
ok = false;
|
||||
}
|
||||
|
||||
// check null result
|
||||
if (ok && result.get() == 0)
|
||||
{
|
||||
status = Status(Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null.");
|
||||
}
|
||||
|
||||
m_channelRPCRequester->requestDone(status, shared_from_this(), result);
|
||||
|
||||
if (m_lastRequest.get())
|
||||
destroy();
|
||||
|
||||
}
|
||||
|
||||
virtual void requestDone(
|
||||
epics::pvData::Status const & status,
|
||||
epics::pvData::PVStructure::shared_pointer const & result
|
||||
@@ -99,12 +59,11 @@ public:
|
||||
destroy();
|
||||
}
|
||||
|
||||
void processRequest(RPCServiceAsync::shared_pointer const & service,
|
||||
epics::pvData::PVStructure::shared_pointer const & pvArgument)
|
||||
virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument)
|
||||
{
|
||||
try
|
||||
{
|
||||
service->request(pvArgument, shared_from_this());
|
||||
m_rpcService->request(pvArgument, shared_from_this());
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
@@ -131,25 +90,6 @@ public:
|
||||
// we wait for callback to be called
|
||||
}
|
||||
|
||||
virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument)
|
||||
{
|
||||
RPCService::shared_pointer rpcService =
|
||||
std::tr1::dynamic_pointer_cast<RPCService>(m_rpcService);
|
||||
if (rpcService)
|
||||
{
|
||||
processRequest(rpcService, pvArgument);
|
||||
return;
|
||||
}
|
||||
|
||||
RPCServiceAsync::shared_pointer rpcServiceAsync =
|
||||
std::tr1::dynamic_pointer_cast<RPCServiceAsync>(m_rpcService);
|
||||
if (rpcServiceAsync)
|
||||
{
|
||||
processRequest(rpcServiceAsync, pvArgument);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void lastRequest()
|
||||
{
|
||||
m_lastRequest.set();
|
||||
@@ -186,7 +126,7 @@ private:
|
||||
string m_channelName;
|
||||
ChannelRequester::shared_pointer m_channelRequester;
|
||||
|
||||
Service::shared_pointer m_rpcService;
|
||||
RPCServiceAsync::shared_pointer m_rpcService;
|
||||
|
||||
public:
|
||||
POINTER_DEFINITIONS(RPCChannel);
|
||||
@@ -195,7 +135,7 @@ public:
|
||||
ChannelProvider::shared_pointer const & provider,
|
||||
string const & channelName,
|
||||
ChannelRequester::shared_pointer const & channelRequester,
|
||||
Service::shared_pointer const & rpcService) :
|
||||
RPCServiceAsync::shared_pointer const & rpcService) :
|
||||
m_provider(provider),
|
||||
m_channelName(channelName),
|
||||
m_channelRequester(channelRequester),
|
||||
@@ -289,7 +229,7 @@ public:
|
||||
Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider,
|
||||
std::string const & channelName,
|
||||
ChannelRequester::shared_pointer const & channelRequester,
|
||||
Service::shared_pointer const & rpcService)
|
||||
RPCServiceAsync::shared_pointer const & rpcService)
|
||||
{
|
||||
// TODO use std::make_shared
|
||||
std::tr1::shared_ptr<RPCChannel> tp(
|
||||
@@ -371,7 +311,7 @@ public:
|
||||
ChannelRequester::shared_pointer const & channelRequester,
|
||||
short /*priority*/)
|
||||
{
|
||||
Service::shared_pointer service;
|
||||
RPCServiceAsync::shared_pointer service;
|
||||
|
||||
RPCServiceMap::const_iterator iter;
|
||||
{
|
||||
@@ -414,7 +354,7 @@ public:
|
||||
throw std::runtime_error("not supported");
|
||||
}
|
||||
|
||||
void registerService(std::string const & serviceName, Service::shared_pointer const & service)
|
||||
void registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service)
|
||||
{
|
||||
Lock guard(m_mutex);
|
||||
m_services[serviceName] = service;
|
||||
@@ -443,7 +383,7 @@ public:
|
||||
|
||||
private:
|
||||
// assumes sync on services
|
||||
Service::shared_pointer findWildService(string const & wildcard)
|
||||
RPCServiceAsync::shared_pointer findWildService(string const & wildcard)
|
||||
{
|
||||
if (!m_wildServices.empty())
|
||||
for (RPCWildServiceList::iterator iter = m_wildServices.begin();
|
||||
@@ -452,7 +392,7 @@ private:
|
||||
if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str()))
|
||||
return iter->second;
|
||||
|
||||
return Service::shared_pointer();
|
||||
return RPCServiceAsync::shared_pointer();
|
||||
}
|
||||
|
||||
// (too) simple check
|
||||
@@ -464,10 +404,10 @@ private:
|
||||
(pattern.find('[') != string::npos && pattern.find(']') != string::npos));
|
||||
}
|
||||
|
||||
typedef std::map<string, Service::shared_pointer> RPCServiceMap;
|
||||
typedef std::map<string, RPCServiceAsync::shared_pointer> RPCServiceMap;
|
||||
RPCServiceMap m_services;
|
||||
|
||||
typedef std::vector<std::pair<string, Service::shared_pointer> > RPCWildServiceList;
|
||||
typedef std::vector<std::pair<string, RPCServiceAsync::shared_pointer> > RPCWildServiceList;
|
||||
RPCWildServiceList m_wildServices;
|
||||
|
||||
epics::pvData::Mutex m_mutex;
|
||||
@@ -539,11 +479,6 @@ void RPCServer::destroy()
|
||||
m_serverContext->shutdown();
|
||||
}
|
||||
|
||||
void RPCServer::registerService(std::string const & serviceName, RPCService::shared_pointer const & service)
|
||||
{
|
||||
std::tr1::dynamic_pointer_cast<RPCChannelProvider>(m_channelProviderImpl)->registerService(serviceName, service);
|
||||
}
|
||||
|
||||
void RPCServer::registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service)
|
||||
{
|
||||
std::tr1::dynamic_pointer_cast<RPCChannelProvider>(m_channelProviderImpl)->registerService(serviceName, service);
|
||||
|
||||
@@ -6,3 +6,30 @@
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include <pv/rpcService.h>
|
||||
|
||||
namespace pvd = epics::pvData;
|
||||
|
||||
namespace epics{namespace pvAccess{
|
||||
|
||||
void RPCService::request(
|
||||
pvd::PVStructure::shared_pointer const & args,
|
||||
RPCResponseCallback::shared_pointer const & callback)
|
||||
{
|
||||
assert(callback && args);
|
||||
pvd::PVStructure::shared_pointer ret;
|
||||
pvd::Status sts;
|
||||
try {
|
||||
ret = request(args);
|
||||
}catch(RPCRequestException& e){
|
||||
sts = e.asStatus();
|
||||
throw;
|
||||
}catch(std::exception& e){
|
||||
sts = pvd::Status::error(e.what());
|
||||
}
|
||||
if(!ret) {
|
||||
sts = pvd::Status(pvd::Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null.");
|
||||
}
|
||||
callback->requestDone(sts, ret);
|
||||
}
|
||||
|
||||
}} // namespace epics::pvAccess
|
||||
|
||||
@@ -41,12 +41,76 @@ struct SumService : public pva::RPCService
|
||||
}
|
||||
};
|
||||
|
||||
void testSum(const pva::ChannelProvider::shared_pointer& cli_prov)
|
||||
{
|
||||
pva::RPCClient client("sum", pvd::createRequest("field()"), cli_prov);
|
||||
|
||||
pvd::ValueBuilder args("epics:nt/NTURI:1.0");
|
||||
args.add<pvd::pvString>("scheme", "pva")
|
||||
.add<pvd::pvString>("path", "sum");
|
||||
|
||||
pvd::PVStructurePtr reply;
|
||||
|
||||
testDiag("Request");
|
||||
reply = client.request(args.addNested("query")
|
||||
.add<pvd::pvDouble>("lhs", 5.0)
|
||||
.add<pvd::pvDouble>("rhs", 3.0)
|
||||
.endNested()
|
||||
.buildPVStructure());
|
||||
|
||||
pvd::int32 value = reply->getSubFieldT<pvd::PVScalar>("value")->getAs<pvd::int32>();
|
||||
testOk(value==8, "Reply value = %d", (unsigned)value);
|
||||
|
||||
testDiag("Wait for connect (already connected)");
|
||||
testOk1(client.waitConnect());
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
struct FailService : public pva::RPCService
|
||||
{
|
||||
virtual epics::pvData::PVStructure::shared_pointer request(
|
||||
epics::pvData::PVStructure::shared_pointer const & args
|
||||
) OVERRIDE FINAL
|
||||
{
|
||||
testDiag("failing()");
|
||||
throw std::runtime_error("oops");
|
||||
}
|
||||
};
|
||||
|
||||
void testRPCFail(const pva::ChannelProvider::shared_pointer& cli_prov)
|
||||
{
|
||||
|
||||
testDiag("Fail");
|
||||
|
||||
pva::RPCClient client("fail", pvd::createRequest("field()"), cli_prov);
|
||||
|
||||
pvd::ValueBuilder args("epics:nt/NTURI:1.0");
|
||||
args.add<pvd::pvString>("scheme", "pva")
|
||||
.add<pvd::pvString>("path", "fail");
|
||||
|
||||
|
||||
testDiag("Request");
|
||||
try{
|
||||
(void)client.request(args.addNested("query")
|
||||
.add<pvd::pvDouble>("lhs", 5.0)
|
||||
.add<pvd::pvDouble>("rhs", 3.0)
|
||||
.endNested()
|
||||
.buildPVStructure());
|
||||
testFail("Missing expected exception");
|
||||
}catch(pva::RPCRequestException& e){
|
||||
testPass("caught expected rpc exception: %s", e.what());
|
||||
}catch(std::exception& e){
|
||||
testFail("caught un-expected exception: %s", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
MAIN(testRPC)
|
||||
{
|
||||
testPlan(2);
|
||||
testPlan(3);
|
||||
try {
|
||||
pva::Configuration::shared_pointer conf(pva::ConfigurationBuilder()
|
||||
//.push_env()
|
||||
@@ -65,8 +129,14 @@ MAIN(testRPC)
|
||||
serv.getServer()->getServerPort(),
|
||||
serv.getServer()->getBroadcastPort());
|
||||
|
||||
std::tr1::shared_ptr<pva::RPCService> service(new SumService);
|
||||
serv.registerService("sum", service);
|
||||
{
|
||||
std::tr1::shared_ptr<pva::RPCService> service(new SumService);
|
||||
serv.registerService("sum", service);
|
||||
}
|
||||
{
|
||||
std::tr1::shared_ptr<pva::RPCService> service(new FailService);
|
||||
serv.registerService("fail", service);
|
||||
}
|
||||
|
||||
testDiag("Client Setup");
|
||||
pva::ClientFactory::start();
|
||||
@@ -75,26 +145,9 @@ MAIN(testRPC)
|
||||
if(!cli_prov)
|
||||
testAbort("No pva provider");
|
||||
testDiag("Client Ready");
|
||||
pva::RPCClient client("sum", pvd::createRequest("field()"), cli_prov);
|
||||
|
||||
pvd::ValueBuilder args("epics:nt/NTURI:1.0");
|
||||
args.add<pvd::pvString>("scheme", "pva")
|
||||
.add<pvd::pvString>("path", "sum");
|
||||
|
||||
pvd::PVStructurePtr reply;
|
||||
|
||||
testDiag("Request");
|
||||
reply = client.request(args.addNested("query")
|
||||
.add<pvd::pvDouble>("lhs", 5.0)
|
||||
.add<pvd::pvDouble>("rhs", 3.0)
|
||||
.endNested()
|
||||
.buildPVStructure());
|
||||
|
||||
pvd::int32 value = reply->getSubFieldT<pvd::PVScalar>("value")->getAs<pvd::int32>();
|
||||
testOk(value==8, "Reply value = %d", (unsigned)value);
|
||||
|
||||
testDiag("Wait for connect (already connected)");
|
||||
testOk1(client.waitConnect());
|
||||
testSum(cli_prov);
|
||||
testRPCFail(cli_prov);
|
||||
|
||||
}catch(std::exception& e){
|
||||
PRINT_EXCEPTION(e);
|
||||
|
||||
Reference in New Issue
Block a user