Merge pull request #3 from dhickin/rpc

Add support for Channel RPC
This commit is contained in:
Marty Kraimer
2015-12-09 06:19:44 -05:00
35 changed files with 1261 additions and 4 deletions

View File

@@ -23,6 +23,7 @@
#include <pv/pvData.h>
#include <pv/pvCopy.h>
#include <pv/pvTimeStamp.h>
#include <pv/rpcService.h>
#ifdef pvdatabaseEpicsExportSharedSymbols
# define epicsExportSharedSymbols
@@ -189,6 +190,17 @@ public:
bool removeListener(
PVListenerPtr const & pvListener,
epics::pvData::PVCopyPtr const & pvCopy);
/**
* Return a service corresponding to the specified request PVStructure.
* @param pvRequest The request PVStructure
* @return The corresponding service
*/
virtual epics::pvAccess::Service::shared_pointer getService(
epics::pvData::PVStructurePtr const & pvRequest)
{
return epics::pvAccess::Service::shared_pointer();
}
/**
* Begins a group of puts.
*/

View File

@@ -750,6 +750,233 @@ void ChannelPutGetLocal::getGet()
}
}
class ChannelRPCLocal :
public ChannelRPC,
public RPCResponseCallback,
public std::tr1::enable_shared_from_this<ChannelRPCLocal>
{
public:
POINTER_DEFINITIONS(ChannelRPCLocal);
static ChannelRPCLocalPtr create(
ChannelLocalPtr const & channelLocal,
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
PVStructurePtr const & pvRequest,
PVRecordPtr const & pvRecord);
ChannelRPCLocal(
ChannelLocalPtr const & channelLocal,
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
Service::shared_pointer const & service,
PVRecordPtr const & pvRecord) :
isDestroyed(),
channelLocal(channelLocal),
channelRPCRequester(channelRPCRequester),
service(service),
pvRecord(pvRecord),
isLastRequest()
{
}
virtual ~ChannelRPCLocal()
{
if(pvRecord->getTraceLevel()>0)
{
cout << "~ChannelRPCLocal()" << endl;
}
destroy();
}
void processRequest(RPCService::shared_pointer const & service,
PVStructurePtr const & pvArgument);
virtual void requestDone(Status const & status,
PVStructurePtr const & result)
{
channelRPCRequester->requestDone(status, getPtrSelf(), result);
if (isLastRequest.get())
destroy();
}
void processRequest(RPCServiceAsync::shared_pointer const & service,
PVStructurePtr const & pvArgument);
virtual void request(PVStructurePtr const & pvArgument);
void lastRequest()
{
isLastRequest.set();
}
virtual Channel::shared_pointer getChannel()
{
return channelLocal;
}
virtual void cancel() {}
virtual void destroy();
virtual void lock() {}
virtual void unlock() {}
private:
shared_pointer getPtrSelf()
{
return shared_from_this();
}
AtomicBoolean isDestroyed;
ChannelLocalPtr channelLocal;
ChannelRPCRequester::shared_pointer channelRPCRequester;
Service::shared_pointer service;
PVRecordPtr pvRecord;
AtomicBoolean isLastRequest;
};
ChannelRPCLocalPtr ChannelRPCLocal::create(
ChannelLocalPtr const &channelLocal,
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
PVStructurePtr const & pvRequest,
PVRecordPtr const &pvRecord)
{
Service::shared_pointer service = pvRecord->getService(pvRequest);
if (service.get() == 0)
{
Status status(Status::STATUSTYPE_ERROR,
"ChannelRPC not supported");
channelRPCRequester->channelRPCConnect(status,ChannelRPCLocalPtr());
return ChannelRPCLocalPtr();
}
if (channelRPCRequester.get() == 0)
throw std::invalid_argument("channelRPCRequester == null");
// TODO use std::make_shared
ChannelRPCLocalPtr rpc(
new ChannelRPCLocal(channelLocal, channelRPCRequester, service, pvRecord)
);
channelRPCRequester->channelRPCConnect(Status::Ok, rpc);
if(pvRecord->getTraceLevel()>0)
{
cout << "ChannelRPCLocal::create";
cout << " recordName " << pvRecord->getRecordName() << endl;
}
return rpc;
}
void ChannelRPCLocal::processRequest(
RPCService::shared_pointer const & service,
PVStructurePtr const & pvArgument)
{
PVStructurePtr result;
Status status = Status::Ok;
bool ok = true;
try
{
result = service->request(pvArgument);
}
catch (RPCRequestException& rre)
{
status = Status(rre.getStatus(), rre.what());
ok = false;
}
catch (std::exception& ex)
{
status = Status(Status::STATUSTYPE_FATAL, ex.what());
ok = false;
}
catch (...)
{
// handle user unexpected errors
status = Status(Status::STATUSTYPE_FATAL, "Unexpected exception caught while calling RPCService.request(PVStructure).");
ok = false;
}
// check null result
if (ok && result.get() == 0)
{
status = Status(Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null.");
}
channelRPCRequester->requestDone(status, getPtrSelf(), result);
if (isLastRequest.get())
destroy();
}
void ChannelRPCLocal::processRequest(
RPCServiceAsync::shared_pointer const & service,
PVStructurePtr const & pvArgument)
{
try
{
service->request(pvArgument, getPtrSelf());
}
catch (std::exception& ex)
{
// handle user unexpected errors
Status errorStatus(Status::STATUSTYPE_FATAL, ex.what());
channelRPCRequester->requestDone(errorStatus, getPtrSelf(), PVStructurePtr());
if (isLastRequest.get())
destroy();
}
catch (...)
{
// handle user unexpected errors
Status errorStatus(Status::STATUSTYPE_FATAL,
"Unexpected exception caught while calling RPCServiceAsync.request(PVStructure, RPCResponseCallback).");
channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructurePtr());
if (isLastRequest.get())
destroy();
}
// we wait for callback to be called
}
void ChannelRPCLocal::request(PVStructurePtr const & pvArgument)
{
if(pvRecord->getTraceLevel()>1)
{
cout << "ChannelRPCLocal::request" << endl;
}
RPCService::shared_pointer rpcService =
std::tr1::dynamic_pointer_cast<RPCService>(service);
if (rpcService)
{
processRequest(rpcService, pvArgument);
return;
}
RPCServiceAsync::shared_pointer rpcServiceAsync =
std::tr1::dynamic_pointer_cast<RPCServiceAsync>(service);
if (rpcServiceAsync)
{
processRequest(rpcServiceAsync, pvArgument);
return;
}
}
void ChannelRPCLocal::destroy()
{
if(pvRecord->getTraceLevel()>0)
{
cout << "ChannelRPCLocal::destroy";
cout << " destroyed " << isDestroyed.get() << endl;
}
isDestroyed.set();
}
typedef std::tr1::shared_ptr<PVArray> PVArrayPtr;
class ChannelArrayLocal :
@@ -1209,10 +1436,13 @@ ChannelRPC::shared_pointer ChannelLocal::createChannelRPC(
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
PVStructure::shared_pointer const & pvRequest)
{
Status status(Status::STATUSTYPE_ERROR,
"ChannelRPC not supported");
channelRPCRequester->channelRPCConnect(status,ChannelRPC::shared_pointer());
return ChannelRPC::shared_pointer();
ChannelRPCLocalPtr channelRPC =
ChannelRPCLocal::create(
getPtrSelf(),
channelRPCRequester,
pvRequest,
pvRecord);
return channelRPC;
}
Monitor::shared_pointer ChannelLocal::createMonitor(