Add support for RPC to pvDatabase

Add a new function to PVRecord which returns a service (from the
pvAccess RPC library) for a supplied pvRequest (defaults to returning
a null pointer).

Add an implementation of ChannelRPC which gets the service from the
record and is used by ChannelLocal.
This commit is contained in:
Dave Hickin
2015-12-08 10:37:37 +00:00
parent c717138c7d
commit a99b08fd02
2 changed files with 215 additions and 4 deletions

View File

@ -23,6 +23,7 @@
#include <pv/pvData.h>
#include <pv/pvCopy.h>
#include <pv/pvTimeStamp.h>
#include <pv/rpcService.h>
#ifdef pvdatabaseEpicsExportSharedSymbols
# define epicsExportSharedSymbols
@ -189,6 +190,17 @@ public:
bool removeListener(
PVListenerPtr const & pvListener,
epics::pvData::PVCopyPtr const & pvCopy);
/**
* Return a service corresponding to the specified request PVStructure.
* @param pvRequest The request PVStructure
* @return The corresponding service
*/
virtual epics::pvAccess::Service::shared_pointer getService(
epics::pvData::PVStructurePtr const & pvRequest)
{
return epics::pvAccess::Service::shared_pointer();
}
/**
* Begins a group of puts.
*/

View File

@ -750,6 +750,202 @@ void ChannelPutGetLocal::getGet()
}
}
class ChannelRPCLocal :
public ChannelRPC,
public RPCResponseCallback,
public std::tr1::enable_shared_from_this<ChannelRPCLocal>
{
private:
Channel::shared_pointer m_channel;
ChannelRPCRequester::shared_pointer m_channelRPCRequester;
Service::shared_pointer m_rpcService;
AtomicBoolean m_lastRequest;
public:
static ChannelRPCLocalPtr create(
ChannelLocalPtr const &channelLocal,
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
PVStructurePtr const & pvRequest,
PVRecordPtr const &pvRecord);
ChannelRPCLocal(
Channel::shared_pointer const & channel,
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
Service::shared_pointer const & rpcService) :
m_channel(channel),
m_channelRPCRequester(channelRPCRequester),
m_rpcService(rpcService),
m_lastRequest()
{
}
virtual ~ChannelRPCLocal()
{
destroy();
}
void processRequest(RPCService::shared_pointer const & service,
PVStructure::shared_pointer const & pvArgument)
{
PVStructure::shared_pointer result;
Status status = Status::Ok;
bool ok = true;
try
{
result = service->request(pvArgument);
}
catch (RPCRequestException& rre)
{
status = Status(rre.getStatus(), rre.what());
ok = false;
}
catch (std::exception& ex)
{
status = Status(Status::STATUSTYPE_FATAL, ex.what());
ok = false;
}
catch (...)
{
// handle user unexpected errors
status = Status(Status::STATUSTYPE_FATAL, "Unexpected exception caught while calling RPCService.request(PVStructure).");
ok = false;
}
// check null result
if (ok && result.get() == 0)
{
status = Status(Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null.");
}
m_channelRPCRequester->requestDone(status, shared_from_this(), result);
if (m_lastRequest.get())
destroy();
}
virtual void requestDone(
Status const & status,
PVStructure::shared_pointer const & result
)
{
m_channelRPCRequester->requestDone(status, shared_from_this(), result);
if (m_lastRequest.get())
destroy();
}
void processRequest(RPCServiceAsync::shared_pointer const & service,
PVStructure::shared_pointer const & pvArgument)
{
try
{
service->request(pvArgument, shared_from_this());
}
catch (std::exception& ex)
{
// handle user unexpected errors
Status errorStatus(Status::STATUSTYPE_FATAL, ex.what());
m_channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructure::shared_pointer());
if (m_lastRequest.get())
destroy();
}
catch (...)
{
// handle user unexpected errors
Status errorStatus(Status::STATUSTYPE_FATAL,
"Unexpected exception caught while calling RPCServiceAsync.request(PVStructure, RPCResponseCallback).");
m_channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructure::shared_pointer());
if (m_lastRequest.get())
destroy();
}
// we wait for callback to be called
}
virtual void request(PVStructure::shared_pointer const & pvArgument)
{
RPCService::shared_pointer rpcService =
std::tr1::dynamic_pointer_cast<RPCService>(m_rpcService);
if (rpcService)
{
processRequest(rpcService, pvArgument);
return;
}
RPCServiceAsync::shared_pointer rpcServiceAsync =
std::tr1::dynamic_pointer_cast<RPCServiceAsync>(m_rpcService);
if (rpcServiceAsync)
{
processRequest(rpcServiceAsync, pvArgument);
return;
}
}
void lastRequest()
{
m_lastRequest.set();
}
virtual Channel::shared_pointer getChannel()
{
return m_channel;
}
virtual void cancel()
{
// noop
}
virtual void destroy()
{
// noop
}
virtual void lock()
{
// noop
}
virtual void unlock()
{
// noop
}
};
ChannelRPCLocalPtr ChannelRPCLocal::create(
ChannelLocalPtr const &channelLocal,
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
PVStructurePtr const & pvRequest,
PVRecordPtr const &pvRecord)
{
Service::shared_pointer service = pvRecord->getService(pvRequest);
if (service.get() == 0)
{
Status status(Status::STATUSTYPE_ERROR,
"ChannelRPC not supported");
channelRPCRequester->channelRPCConnect(status,ChannelRPCLocalPtr());
return ChannelRPCLocalPtr();
}
if (channelRPCRequester.get() == 0)
throw std::invalid_argument("channelRPCRequester == null");
// TODO use std::make_shared
ChannelRPCLocalPtr rpc(
new ChannelRPCLocal(channelLocal, channelRPCRequester, service)
);
channelRPCRequester->channelRPCConnect(Status::Ok, rpc);
return rpc;
}
typedef std::tr1::shared_ptr<PVArray> PVArrayPtr;
class ChannelArrayLocal :
@ -1209,10 +1405,13 @@ ChannelRPC::shared_pointer ChannelLocal::createChannelRPC(
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
PVStructure::shared_pointer const & pvRequest)
{
Status status(Status::STATUSTYPE_ERROR,
"ChannelRPC not supported");
channelRPCRequester->channelRPCConnect(status,ChannelRPC::shared_pointer());
return ChannelRPC::shared_pointer();
ChannelRPCLocalPtr channelRPC =
ChannelRPCLocal::create(
getPtrSelf(),
channelRPCRequester,
pvRequest,
pvRecord);
return channelRPC;
}
Monitor::shared_pointer ChannelLocal::createMonitor(