diff --git a/pvAccessApp/ca/caProvider.cpp b/pvAccessApp/ca/caProvider.cpp index 3b4e9f1..87d0617 100644 --- a/pvAccessApp/ca/caProvider.cpp +++ b/pvAccessApp/ca/caProvider.cpp @@ -57,6 +57,19 @@ ChannelFind::shared_pointer CAChannelProvider::channelFind( return nullChannelFind; } +ChannelFind::shared_pointer CAChannelProvider::channelList( + ChannelListRequester::shared_pointer const & channelListRequester) +{ + if (!channelListRequester.get()) + throw std::runtime_error("null requester"); + + Status errorStatus(Status::STATUSTYPE_ERROR, "not implemented"); + ChannelFind::shared_pointer nullChannelFind; + std::set none; + EXCEPTION_GUARD(channelListRequester->channelListResult(errorStatus, nullChannelFind, none, false)); + return nullChannelFind; +} + Channel::shared_pointer CAChannelProvider::createChannel( epics::pvData::String const & channelName, ChannelRequester::shared_pointer const & channelRequester, diff --git a/pvAccessApp/ca/caProvider.h b/pvAccessApp/ca/caProvider.h index 5eb904c..a40afd7 100644 --- a/pvAccessApp/ca/caProvider.h +++ b/pvAccessApp/ca/caProvider.h @@ -35,6 +35,8 @@ public: epics::pvData::String const & channelName, ChannelFindRequester::shared_pointer const & channelFindRequester); + virtual ChannelFind::shared_pointer channelList( + ChannelListRequester::shared_pointer const & channelListRequester); virtual Channel::shared_pointer createChannel( epics::pvData::String const & channelName, diff --git a/pvAccessApp/client/pvAccess.h b/pvAccessApp/client/pvAccess.h index bb4295b..c2927d2 100644 --- a/pvAccessApp/client/pvAccess.h +++ b/pvAccessApp/client/pvAccess.h @@ -8,6 +8,7 @@ #define PVACCESS_H #include +#include #ifdef epicsExportSharedSymbols # define pvAccessEpicsExportSharedSymbols @@ -268,6 +269,24 @@ namespace pvAccess { bool wasFound) = 0; }; + /** + * + */ + class epicsShareClass ChannelListRequester { + public: + POINTER_DEFINITIONS(ChannelListRequester); + + virtual ~ChannelListRequester() {}; + + /** + * @param status Completion status. + */ + virtual void channelListResult( + const epics::pvData::Status& status, + ChannelFind::shared_pointer const & channelFind, + std::set const & channelNames, + bool hasDynamic) = 0; + }; /** * Request to get data from a channel. @@ -826,6 +845,13 @@ namespace pvAccess { virtual ChannelFind::shared_pointer channelFind(epics::pvData::String const & channelName, ChannelFindRequester::shared_pointer const & channelFindRequester) = 0; + /** + * Find channels. + * @param channelFindRequester The epics::pvData::Requester. + * @return An interface for the find. + */ + virtual ChannelFind::shared_pointer channelList(ChannelListRequester::shared_pointer const & channelListRequester) = 0; + /** * Create a channel. * @param channelName The name of the channel. diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 1866185..40eedcf 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -3015,6 +3015,19 @@ namespace epics { return nullChannelFind; } + virtual ChannelFind::shared_pointer channelList( + ChannelListRequester::shared_pointer const & channelListRequester) + { + if (!channelListRequester.get()) + throw std::runtime_error("null requester"); + + Status errorStatus(Status::STATUSTYPE_ERROR, "not implemented"); + ChannelFind::shared_pointer nullChannelFind; + std::set none; + EXCEPTION_GUARD(channelListRequester->channelListResult(errorStatus, nullChannelFind, none, false)); + return nullChannelFind; + } + virtual Channel::shared_pointer createChannel( epics::pvData::String const & channelName, ChannelRequester::shared_pointer const & channelRequester, diff --git a/pvAccessApp/rpcService/rpcServer.cpp b/pvAccessApp/rpcService/rpcServer.cpp index ebc5487..0e362f2 100644 --- a/pvAccessApp/rpcService/rpcServer.cpp +++ b/pvAccessApp/rpcService/rpcServer.cpp @@ -316,6 +316,13 @@ public: Status RPCChannel::notSupportedStatus(Status::STATUSTYPE_ERROR, "only channelRPC requests are supported by this channel"); Status RPCChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed"); +Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider, + epics::pvData::String const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + RPCService::shared_pointer const & rpcService) +{ + return Channel::shared_pointer(new RPCChannel(provider, channelName, channelRequester, rpcService)); +} class RPCChannelProvider : @@ -362,7 +369,27 @@ public: } - virtual Channel::shared_pointer createChannel( + virtual ChannelFind::shared_pointer channelList( + ChannelListRequester::shared_pointer const & channelListRequester) + { + if (!channelListRequester.get()) + throw std::runtime_error("null requester"); + + std::set channelNames; + { + Lock guard(m_mutex); + for (RPCServiceMap::const_iterator iter = m_services.begin(); + iter != m_services.end(); + iter++) + channelNames.insert(iter->first); + } + + ChannelFind::shared_pointer thisPtr(shared_from_this()); + channelListRequester->channelListResult(Status::Ok, thisPtr, channelNames, false); + return thisPtr; + } + + virtual Channel::shared_pointer createChannel( epics::pvData::String const & channelName, ChannelRequester::shared_pointer const & channelRequester, short /*priority*/) diff --git a/pvAccessApp/rpcService/rpcServer.h b/pvAccessApp/rpcService/rpcServer.h index fc102e2..8ff7995 100644 --- a/pvAccessApp/rpcService/rpcServer.h +++ b/pvAccessApp/rpcService/rpcServer.h @@ -64,6 +64,10 @@ class epicsShareClass RPCServer : }; +epicsShareExtern Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider, + epics::pvData::String const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + RPCService::shared_pointer const & rpcService); }} diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 8595637..261dbd4 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -18,6 +18,7 @@ #include #include +#include using std::ostringstream; using std::hex; @@ -379,6 +380,145 @@ void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendContr } /****************************************************************************************/ + +class ChannelListRequesterImpl : + public ChannelListRequester +{ +public: + POINTER_DEFINITIONS(ChannelListRequesterImpl); + + std::set channelNames; + Status status; + + virtual void channelListResult( + const epics::pvData::Status& status, + ChannelFind::shared_pointer const & channelFind, + std::set const & channelNames, + bool hasDynamic) + { + epics::pvData::Lock lock(_waitMutex); + + this->status = status; + this->channelNames = channelNames; + + _waitEvent.signal(); + } + + bool waitForCompletion(int32 timeoutSec) { + return _waitEvent.wait(timeoutSec); + } + +private: + epics::pvData::Mutex _waitMutex; + epics::pvData::Event _waitEvent; + +}; + +// TODO move out to a separate class +class ServerRPCService : public RPCService { + +private: + static int32 TIMEOUT_SEC; + + static Structure::const_shared_pointer helpStructure; + static Structure::const_shared_pointer channelListStructure; + + static std::string helpString; + + ServerContextImpl::shared_pointer m_serverContext; + +public: + + ServerRPCService(ServerContextImpl::shared_pointer const & context) : + m_serverContext(context) + { + } + + virtual epics::pvData::PVStructure::shared_pointer request( + epics::pvData::PVStructure::shared_pointer const & arguments + ) throw (RPCRequestException) + { + // NTURI support + PVStructure::shared_pointer args( + (arguments->getStructure()->getID() == "uri:ev4:nt/2012/pwd:NTURI") ? + arguments->getStructureField("query") : + arguments + ); + + // help support + if (args->getSubField("help")) + { + PVStructure::shared_pointer help = getPVDataCreate()->createPVStructure(helpStructure); + help->getSubField("value")->put(helpString); + return help; + } + + PVString::shared_pointer opField = args->getSubField("op"); + if (!opField) + throw RPCRequestException(Status::STATUSTYPE_ERROR, "unspecified 'string op' field"); + + String op = opField->get(); + if (op == "channels") + { + ChannelListRequesterImpl::shared_pointer listListener(new ChannelListRequesterImpl()); + m_serverContext->getChannelProviders()[0]->channelList(listListener); // TODO multiple channel providers !!!! + if (!listListener->waitForCompletion(TIMEOUT_SEC)) + throw RPCRequestException(Status::STATUSTYPE_ERROR, "failed to fetch channel list due to timeout"); + + Status& status = listListener->status; + if (!status.isSuccess()) + { + String errorMessage = "failed to fetch channel list: " + status.getMessage(); + if (!status.getStackDump().empty()) + errorMessage += "\n" + status.getStackDump(); + throw RPCRequestException(Status::STATUSTYPE_ERROR, errorMessage); + } + + std::set& channelNames = listListener->channelNames; + + PVStructure::shared_pointer result = + getPVDataCreate()->createPVStructure(channelListStructure); + PVStringArray::shared_pointer pvArray = result->getSubField("value"); + PVStringArray::svector newdata(channelNames.size()); + size_t i = 0; + for (std::set::const_iterator iter = channelNames.begin(); + iter != channelNames.end(); + iter++) + newdata[i++] = *iter; + pvArray->replace(freeze(newdata)); + + return result; + } + else + throw RPCRequestException(Status::STATUSTYPE_ERROR, "unsupported operation '" + op + "'."); + } +}; + +int32 ServerRPCService::TIMEOUT_SEC = 3; +Structure::const_shared_pointer ServerRPCService::helpStructure = + getFieldCreate()->createFieldBuilder()-> + setId("uri:ev4:nt/2012/pwd:NTScalar")-> + add("value", pvString)-> + createStructure(); + +Structure::const_shared_pointer ServerRPCService::channelListStructure = + getFieldCreate()->createFieldBuilder()-> + setId("uri:ev4:nt/2012/pwd:NTScalarArray")-> + addArray("value", pvString)-> + createStructure(); + +std::string ServerRPCService::helpString = + "pvAccess server RPC service.\n" + "arguments:\n" + "\tstring op\toperation to execute\n" + "\n" + "\toperations:\n" + "\t\tchannels\treturns a list of 'static' channels the server can provide\n" + "\t\t\t (no arguments)\n" + "\n"; + +epics::pvData::String ServerCreateChannelHandler::SERVER_CHANNEL_NAME = "server"; + void ServerCreateChannelHandler::handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, size_t payloadSize, ByteBuffer* payloadBuffer) @@ -414,14 +554,22 @@ void ServerCreateChannelHandler::handleResponse(osiSockAddr* responseFrom, return; } - // TODO !!! - //ServerChannelRequesterImpl::create(_providers[0], transport, channelName, cid); - - - if (_providers.size() == 1) - ServerChannelRequesterImpl::create(_providers[0], transport, channelName, cid); - else - ServerChannelRequesterImpl::create(ServerSearchHandler::s_channelNameToProvider[channelName].lock(), transport, channelName, cid); // TODO !!!! + if (channelName == SERVER_CHANNEL_NAME) + { + // TODO singleton!!! + ServerRPCService::shared_pointer serverRPCService(new ServerRPCService(_context)); + + ChannelRequester::shared_pointer cr(new ServerChannelRequesterImpl(transport, channelName, cid)); + Channel::shared_pointer serverChannel = createRPCChannel(ChannelProvider::shared_pointer(), channelName, cr, serverRPCService); + cr->channelCreated(Status::Ok, serverChannel); + } + else + { + if (_providers.size() == 1) + ServerChannelRequesterImpl::create(_providers[0], transport, channelName, cid); + else + ServerChannelRequesterImpl::create(ServerSearchHandler::s_channelNameToProvider[channelName].lock(), transport, channelName, cid); // TODO !!!! + } } void ServerCreateChannelHandler::disconnect(Transport::shared_pointer const & transport) diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index 552c0c4..f708102 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -206,6 +206,8 @@ namespace pvAccess { std::size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer); private: + static epics::pvData::String SERVER_CHANNEL_NAME; + void disconnect(Transport::shared_pointer const & transport); std::vector _providers; }; @@ -215,13 +217,14 @@ namespace pvAccess { public TransportSender, public std::tr1::enable_shared_from_this { + friend class ServerCreateChannelHandler; public: typedef std::tr1::shared_ptr shared_pointer; typedef std::tr1::shared_ptr const_shared_pointer; protected: ServerChannelRequesterImpl(Transport::shared_pointer const & transport, const epics::pvData::String channelName, const pvAccessID cid); public: - virtual ~ServerChannelRequesterImpl() {} + virtual ~ServerChannelRequesterImpl() {} static ChannelRequester::shared_pointer create(ChannelProvider::shared_pointer const & provider, Transport::shared_pointer const & transport, const epics::pvData::String channelName, const pvAccessID cid); void channelCreated(const epics::pvData::Status& status, Channel::shared_pointer const & channel); void channelStateChange(Channel::shared_pointer const & c, const Channel::ConnectionState isConnected); diff --git a/testApp/remote/testServer.cpp b/testApp/remote/testServer.cpp index e4970ac..4669bab 100644 --- a/testApp/remote/testServer.cpp +++ b/testApp/remote/testServer.cpp @@ -2626,6 +2626,25 @@ public: return m_mockChannelFind; } + virtual ChannelFind::shared_pointer channelList( + ChannelListRequester::shared_pointer const & channelListRequester) + { + if (!channelListRequester.get()) + throw std::runtime_error("null requester"); + + // NOTE: this adds only active channels, not all (especially RPC ones) + std::set channelNames; + { + Lock guard(structureStoreMutex); + for (map::const_iterator iter = structureStore.begin(); + iter != structureStore.end(); + iter++) + channelNames.insert(iter->first); + } + channelListRequester->channelListResult(Status::Ok, m_mockChannelFind, channelNames, true); + return m_mockChannelFind; + } + virtual Channel::shared_pointer createChannel( epics::pvData::String const & channelName, ChannelRequester::shared_pointer const & channelRequester, diff --git a/testApp/remote/testServerContext.cpp b/testApp/remote/testServerContext.cpp index 2c41e68..a672e28 100644 --- a/testApp/remote/testServerContext.cpp +++ b/testApp/remote/testServerContext.cpp @@ -24,6 +24,14 @@ public: return nullCF; } + ChannelFind::shared_pointer channelList(ChannelListRequester::shared_pointer const & channelListRequester) + { + ChannelFind::shared_pointer nullCF; + std::set none; + channelListRequester->channelListResult(Status::Ok, nullCF, none, false); + return nullCF; + } + Channel::shared_pointer createChannel( epics::pvData::String const & channelName, ChannelRequester::shared_pointer const & channelRequester,