From e73c6b1548b490e624fb1c5a7fe129168dea1598 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Mon, 3 Sep 2012 12:28:38 +0200 Subject: [PATCH] rpcService --- pvAccessApp/Makefile | 5 + pvAccessApp/rpcService/rpcServer.cpp | 442 +++++++++++++++++++++++++++ pvAccessApp/rpcService/rpcServer.h | 51 ++++ pvAccessApp/rpcService/rpcService.h | 45 +++ testApp/remote/Makefile | 4 + testApp/remote/rpcServiceExample.cpp | 55 ++++ 6 files changed, 602 insertions(+) create mode 100644 pvAccessApp/rpcService/rpcServer.cpp create mode 100644 pvAccessApp/rpcService/rpcServer.h create mode 100644 pvAccessApp/rpcService/rpcService.h create mode 100644 testApp/remote/rpcServiceExample.cpp diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index df43651..06e4475 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -76,6 +76,11 @@ LIBSRCS += baseChannelRequester.cpp LIBSRCS += beaconEmitter.cpp LIBSRCS += beaconServerStatusProvider.cpp +SRC_DIRS += $(PVACCESS)/rpcService +INC += rpcService.h +INC += rpcServer.h +LIBSRCS += rpcServer.cpp + LIBRARY = pvAccess pvAccess_LIBS += Com pvAccess_LIBS += pvData diff --git a/pvAccessApp/rpcService/rpcServer.cpp b/pvAccessApp/rpcService/rpcServer.cpp new file mode 100644 index 0000000..e5a4e90 --- /dev/null +++ b/pvAccessApp/rpcService/rpcServer.cpp @@ -0,0 +1,442 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +using namespace epics::pvData; + + +namespace epics { namespace pvAccess { + + +class ChannelRPCServiceImpl : public ChannelRPC +{ + private: + ChannelRPCRequester::shared_pointer m_channelRPCRequester; + RPCService::shared_pointer m_rpcService; + + public: + ChannelRPCServiceImpl( + ChannelRPCRequester::shared_pointer const & channelRPCRequester, + RPCService::shared_pointer const & rpcService) : + m_channelRPCRequester(channelRPCRequester), + m_rpcService(rpcService) + { + } + + virtual ~ChannelRPCServiceImpl() + { + destroy(); + } + + void processRequest(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest) + { + epics::pvData::PVStructure::shared_pointer result; + Status status = Status::OK; + bool ok = true; + try + { + result = m_rpcService->request(pvArgument); + } + catch (RPCRequestException& rre) + { + status = Status(rre.getStatus(), rre.what()); + ok = false; + } + catch (std::exception& ex) + { + status = Status(Status::STATUSTYPE_FATAL, ex.what()); + ok = false; + } + catch (...) + { + // handle user unexpected errors + status = Status(Status::STATUSTYPE_FATAL, "Unexpected exception caught while calling RPCService.request(PVStructure)."); + ok = false; + } + + // check null result + if (ok && result.get() == 0) + { + status = Status(Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null."); + } + + m_channelRPCRequester->requestDone(status, result); + + if (lastRequest) + destroy(); + + } + + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest) + { + processRequest(pvArgument, lastRequest); + } + + virtual void destroy() + { + // noop + } + + virtual void lock() + { + // noop + } + + virtual void unlock() + { + // noop + } +}; + + + + +class RPCChannel : + public virtual Channel +{ +private: + + static Status notSupportedStatus; + static Status destroyedStatus; + + AtomicBoolean m_destroyed; + + ChannelProvider::shared_pointer m_provider; + String m_channelName; + ChannelRequester::shared_pointer m_channelRequester; + + RPCService::shared_pointer m_rpcService; + +public: + POINTER_DEFINITIONS(RPCChannel); + + RPCChannel( + ChannelProvider::shared_pointer const & provider, + String const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + RPCService::shared_pointer const & rpcService) : + m_provider(provider), + m_channelName(channelName), + m_channelRequester(channelRequester), + m_rpcService(rpcService) + { + } + + virtual ~RPCChannel() + { + destroy(); + } + + virtual std::tr1::shared_ptr const & getProvider() + { + return m_provider; + } + + virtual epics::pvData::String getRemoteAddress() + { + // local + return getChannelName(); + } + + virtual ConnectionState getConnectionState() + { + return isConnected() ? + Channel::CONNECTED : + Channel::DESTROYED; + } + + virtual epics::pvData::String getChannelName() + { + return m_channelName; + } + + virtual std::tr1::shared_ptr const & getChannelRequester() + { + return m_channelRequester; + } + + virtual bool isConnected() + { + return !m_destroyed.get(); + } + + + virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & pvField) + { + return none; + } + + virtual void getField(GetFieldRequester::shared_pointer const & requester,epics::pvData::String const & subField) + { + requester->getDone(notSupportedStatus, epics::pvData::Field::shared_pointer()); + } + + virtual ChannelProcess::shared_pointer createChannelProcess( + ChannelProcessRequester::shared_pointer const & channelProcessRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + ChannelProcess::shared_pointer nullPtr; + channelProcessRequester->channelProcessConnect(notSupportedStatus, nullPtr); + return nullPtr; + } + + virtual ChannelGet::shared_pointer createChannelGet( + ChannelGetRequester::shared_pointer const & channelGetRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + ChannelGet::shared_pointer nullPtr; + channelGetRequester->channelGetConnect(notSupportedStatus, nullPtr, + epics::pvData::PVStructure::shared_pointer(), epics::pvData::BitSet::shared_pointer()); + return nullPtr; + } + + virtual ChannelPut::shared_pointer createChannelPut( + ChannelPutRequester::shared_pointer const & channelPutRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + ChannelPut::shared_pointer nullPtr; + channelPutRequester->channelPutConnect(notSupportedStatus, nullPtr, + epics::pvData::PVStructure::shared_pointer(), epics::pvData::BitSet::shared_pointer()); + return nullPtr; + } + + + virtual ChannelPutGet::shared_pointer createChannelPutGet( + ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + ChannelPutGet::shared_pointer nullPtr; + epics::pvData::PVStructure::shared_pointer nullStructure; + channelPutGetRequester->channelPutGetConnect(notSupportedStatus, nullPtr, nullStructure, nullStructure); + return nullPtr; + } + + virtual ChannelRPC::shared_pointer createChannelRPC( + ChannelRPCRequester::shared_pointer const & channelRPCRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + // nothing expected to be in pvRequest + + if (channelRPCRequester.get() == 0) + throw std::invalid_argument("channelRPCRequester == null"); + + if (m_destroyed.get()) + { + ChannelRPC::shared_pointer nullPtr; + channelRPCRequester->channelRPCConnect(destroyedStatus, nullPtr); + return nullPtr; + } + + ChannelRPC::shared_pointer channelRPCImpl(new ChannelRPCServiceImpl(channelRPCRequester, m_rpcService)); + channelRPCRequester->channelRPCConnect(Status::OK, channelRPCImpl); + return channelRPCImpl; + } + + virtual epics::pvData::Monitor::shared_pointer createMonitor( + epics::pvData::MonitorRequester::shared_pointer const & monitorRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + epics::pvData::Monitor::shared_pointer nullPtr; + monitorRequester->monitorConnect(notSupportedStatus, nullPtr, epics::pvData::Structure::shared_pointer()); + return nullPtr; + } + + virtual ChannelArray::shared_pointer createChannelArray( + ChannelArrayRequester::shared_pointer const & channelArrayRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + ChannelArray::shared_pointer nullPtr; + channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::PVArray::shared_pointer()); + return nullPtr; + } + + + virtual void printInfo() + { + std::cout << "RPCChannel: " << getChannelName() << " [" << Channel::ConnectionStateNames[getConnectionState()] << "]" << std::endl; + } + + virtual void printInfo(epics::pvData::StringBuilder out) + { + *out += "RPCChannel: "; + *out += getChannelName(); + *out += " ["; + *out += Channel::ConnectionStateNames[getConnectionState()]; + *out += "]"; + } + + virtual String getRequesterName() + { + return getChannelName(); + } + + virtual void message(String const & message,MessageType messageType) + { + // just delegate + m_channelRequester->message(message, messageType); + } + + virtual void destroy() + { + m_destroyed.set(); + } +}; + +Status RPCChannel::notSupportedStatus(Status::STATUSTYPE_ERROR, "only channelRPC requests are supported by this channel"); +Status RPCChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed"); + + + +class RPCChannelProvider : + public virtual ChannelProvider, + public virtual ChannelFind, + public std::tr1::enable_shared_from_this { + +public: + POINTER_DEFINITIONS(RPCChannelProvider); + + static String PROVIDER_NAME; + + static Status noSuchChannelStatus; + + // TODO thread pool support + + RPCChannelProvider() { + } + + virtual String getProviderName() { + return PROVIDER_NAME; + } + + virtual std::tr1::shared_ptr getChannelProvider() + { + return shared_from_this(); + } + + virtual void cancelChannelFind() {} + + virtual void destroy() {} + + virtual ChannelFind::shared_pointer channelFind(epics::pvData::String const & channelName, + ChannelFindRequester::shared_pointer const & channelFindRequester) + { + bool found; + { + Lock guard(m_mutex); + found = (m_services.find(channelName) != m_services.end()); + } + ChannelFind::shared_pointer thisPtr(shared_from_this()); + channelFindRequester->channelFindResult(Status::OK, thisPtr, found); + return thisPtr; + } + + + virtual Channel::shared_pointer createChannel( + epics::pvData::String const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + short priority) + { + RPCServiceMap::const_iterator iter; + { + Lock guard(m_mutex); + iter = m_services.find(channelName); + } + + if (iter == m_services.end()) + { + Channel::shared_pointer nullChannel; + channelRequester->channelCreated(noSuchChannelStatus, nullChannel); + return nullChannel; + } + + Channel::shared_pointer rpcChannel(new RPCChannel( + shared_from_this(), + channelName, + channelRequester, + iter->second)); + channelRequester->channelCreated(Status::OK, rpcChannel); + return rpcChannel; + } + + virtual Channel::shared_pointer createChannel( + epics::pvData::String const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + short priority, + epics::pvData::String const & address) + { + // this will never get called by the pvAccess server + throw std::runtime_error("not supported"); + } + + void registerService(String const & serviceName, RPCService::shared_pointer const & service) + { + Lock guard(m_mutex); + m_services[serviceName] = service; + } + + void unregisterService(String const & serviceName) + { + Lock guard(m_mutex); + m_services.erase(serviceName); + } + +private: + typedef std::map RPCServiceMap; + RPCServiceMap m_services; + epics::pvData::Mutex m_mutex; +}; + +String RPCChannelProvider::PROVIDER_NAME("rpcService"); +Status RPCChannelProvider::noSuchChannelStatus(Status::STATUSTYPE_ERROR, "no such channel"); + + +RPCServer::RPCServer() +{ + m_channelProviderImpl.reset(new RPCChannelProvider()); + registerChannelProvider(m_channelProviderImpl); + + setenv("EPICS4_CAS_PROVIDER_NAMES", m_channelProviderImpl->getProviderName().c_str(), 1); + + m_serverContext = ServerContextImpl::create(); + + m_serverContext->initialize(getChannelAccess()); +} + +RPCServer::~RPCServer() +{ + // multiple destroy call is OK + destroy(); +} + +void RPCServer::printInfo() +{ + std::cout << m_serverContext->getVersion().getVersionString() << std::endl; + m_serverContext->printInfo(); +} + +void RPCServer::run(int seconds) +{ + m_serverContext->run(seconds); +} + +void RPCServer::destroy() +{ + m_serverContext->destroy(); +} + +void RPCServer::registerService(String const & serviceName, RPCService::shared_pointer const & service) +{ + std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->registerService(serviceName, service); +} + +void RPCServer::unregisterService(String const & serviceName) +{ + std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->unregisterService(serviceName); +} + +}} diff --git a/pvAccessApp/rpcService/rpcServer.h b/pvAccessApp/rpcService/rpcServer.h new file mode 100644 index 0000000..e3918e7 --- /dev/null +++ b/pvAccessApp/rpcService/rpcServer.h @@ -0,0 +1,51 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef RPCSERVER_H +#define RPCSERVER_H + +#include +#include + +#include +#include + +namespace epics { namespace pvAccess { + +class RPCServer { + private: + + ServerContextImpl::shared_pointer m_serverContext; + ChannelProvider::shared_pointer m_channelProviderImpl; + + // TODO no thread poll implementation + + public: + POINTER_DEFINITIONS(RPCServer); + + RPCServer(); + + virtual ~RPCServer(); + + void registerService(epics::pvData::String const & serviceName, RPCService::shared_pointer const & service); + + void unregisterService(epics::pvData::String const & serviceName); + + void run(int seconds = 0); + + void destroy(); + + /** + * Display basic information about the context. + */ + void printInfo(); + +}; + + +}} + +#endif /* RPCSERVER_H */ diff --git a/pvAccessApp/rpcService/rpcService.h b/pvAccessApp/rpcService/rpcService.h new file mode 100644 index 0000000..88a38ee --- /dev/null +++ b/pvAccessApp/rpcService/rpcService.h @@ -0,0 +1,45 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef RPCSERVICE_H +#define RPCSERVICE_H + +#include +#include +#include +#include + +namespace epics { namespace pvAccess { + +class RPCRequestException : public std::runtime_error { +public: + + RPCRequestException(epics::pvData::Status::StatusType status, epics::pvData::String const & message) : + std::runtime_error(message), m_status(status) + { + } + + epics::pvData::Status::StatusType getStatus() const { + return m_status; + } + +private: + epics::pvData::Status::StatusType m_status; +}; + + +class RPCService { + public: + POINTER_DEFINITIONS(RPCService); + + virtual epics::pvData::PVStructure::shared_pointer request( + epics::pvData::PVStructure::shared_pointer const & args + ) throw (RPCRequestException) = 0; +}; + +}} + +#endif /* RPCSERVICE_H */ diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index bec4c44..b537885 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -66,6 +66,10 @@ PROD_HOST += eget eget_SRCS += eget.cpp eget_LIBS += pvData pvAccess Com +PROD_HOST += rpcServiceExample +rpcServiceExample_SRCS += rpcServiceExample.cpp +rpcServiceExample_LIBS += pvData pvAccess Com + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE diff --git a/testApp/remote/rpcServiceExample.cpp b/testApp/remote/rpcServiceExample.cpp new file mode 100644 index 0000000..cab6dbe --- /dev/null +++ b/testApp/remote/rpcServiceExample.cpp @@ -0,0 +1,55 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +using namespace epics::pvData; +using namespace epics::pvAccess; + + + +static StructureConstPtr createResultField() +{ + FieldCreatePtr fieldCreate = getFieldCreate(); + + StringArray fieldNames; + fieldNames.push_back("c"); + FieldConstPtrArray fields; + fields.push_back(fieldCreate->createScalar(pvDouble)); + return fieldCreate->createStructure(fieldNames, fields); +} + +static StructureConstPtr resultStructure = createResultField(); + +class SumServiceImpl : + public RPCService +{ + PVStructure::shared_pointer request(PVStructure::shared_pointer const & args) + throw (RPCRequestException) + { + // TODO error handling + double a = atof(args->getStringField("a")->get().c_str()); + double b = atof(args->getStringField("b")->get().c_str()); + + PVStructure::shared_pointer result = getPVDataCreate()->createPVStructure(resultStructure); + result->getDoubleField("c")->put(a+b); + return result; + } +}; + +int main() +{ + RPCServer server; + + server.registerService("sum", RPCService::shared_pointer(new SumServiceImpl())); + // you can register as many services as you want here ... + + server.printInfo(); + server.run(); + + return 0; +} \ No newline at end of file