channelList

This commit is contained in:
Matej Sekoranja
2014-06-09 21:32:20 +02:00
parent b101fa1e7a
commit aea156ebbb
10 changed files with 273 additions and 10 deletions

View File

@@ -57,6 +57,19 @@ ChannelFind::shared_pointer CAChannelProvider::channelFind(
return nullChannelFind;
}
ChannelFind::shared_pointer CAChannelProvider::channelList(
ChannelListRequester::shared_pointer const & channelListRequester)
{
if (!channelListRequester.get())
throw std::runtime_error("null requester");
Status errorStatus(Status::STATUSTYPE_ERROR, "not implemented");
ChannelFind::shared_pointer nullChannelFind;
std::set<epics::pvData::String> none;
EXCEPTION_GUARD(channelListRequester->channelListResult(errorStatus, nullChannelFind, none, false));
return nullChannelFind;
}
Channel::shared_pointer CAChannelProvider::createChannel(
epics::pvData::String const & channelName,
ChannelRequester::shared_pointer const & channelRequester,

View File

@@ -35,6 +35,8 @@ public:
epics::pvData::String const & channelName,
ChannelFindRequester::shared_pointer const & channelFindRequester);
virtual ChannelFind::shared_pointer channelList(
ChannelListRequester::shared_pointer const & channelListRequester);
virtual Channel::shared_pointer createChannel(
epics::pvData::String const & channelName,

View File

@@ -8,6 +8,7 @@
#define PVACCESS_H
#include <vector>
#include <set>
#ifdef epicsExportSharedSymbols
# define pvAccessEpicsExportSharedSymbols
@@ -268,6 +269,24 @@ namespace pvAccess {
bool wasFound) = 0;
};
/**
*
*/
class epicsShareClass ChannelListRequester {
public:
POINTER_DEFINITIONS(ChannelListRequester);
virtual ~ChannelListRequester() {};
/**
* @param status Completion status.
*/
virtual void channelListResult(
const epics::pvData::Status& status,
ChannelFind::shared_pointer const & channelFind,
std::set<epics::pvData::String> const & channelNames,
bool hasDynamic) = 0;
};
/**
* Request to get data from a channel.
@@ -826,6 +845,13 @@ namespace pvAccess {
virtual ChannelFind::shared_pointer channelFind(epics::pvData::String const & channelName,
ChannelFindRequester::shared_pointer const & channelFindRequester) = 0;
/**
* Find channels.
* @param channelFindRequester The epics::pvData::Requester.
* @return An interface for the find.
*/
virtual ChannelFind::shared_pointer channelList(ChannelListRequester::shared_pointer const & channelListRequester) = 0;
/**
* Create a channel.
* @param channelName The name of the channel.

View File

@@ -3015,6 +3015,19 @@ namespace epics {
return nullChannelFind;
}
virtual ChannelFind::shared_pointer channelList(
ChannelListRequester::shared_pointer const & channelListRequester)
{
if (!channelListRequester.get())
throw std::runtime_error("null requester");
Status errorStatus(Status::STATUSTYPE_ERROR, "not implemented");
ChannelFind::shared_pointer nullChannelFind;
std::set<epics::pvData::String> none;
EXCEPTION_GUARD(channelListRequester->channelListResult(errorStatus, nullChannelFind, none, false));
return nullChannelFind;
}
virtual Channel::shared_pointer createChannel(
epics::pvData::String const & channelName,
ChannelRequester::shared_pointer const & channelRequester,

View File

@@ -316,6 +316,13 @@ public:
Status RPCChannel::notSupportedStatus(Status::STATUSTYPE_ERROR, "only channelRPC requests are supported by this channel");
Status RPCChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed");
Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider,
epics::pvData::String const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
RPCService::shared_pointer const & rpcService)
{
return Channel::shared_pointer(new RPCChannel(provider, channelName, channelRequester, rpcService));
}
class RPCChannelProvider :
@@ -362,7 +369,27 @@ public:
}
virtual Channel::shared_pointer createChannel(
virtual ChannelFind::shared_pointer channelList(
ChannelListRequester::shared_pointer const & channelListRequester)
{
if (!channelListRequester.get())
throw std::runtime_error("null requester");
std::set<epics::pvData::String> channelNames;
{
Lock guard(m_mutex);
for (RPCServiceMap::const_iterator iter = m_services.begin();
iter != m_services.end();
iter++)
channelNames.insert(iter->first);
}
ChannelFind::shared_pointer thisPtr(shared_from_this());
channelListRequester->channelListResult(Status::Ok, thisPtr, channelNames, false);
return thisPtr;
}
virtual Channel::shared_pointer createChannel(
epics::pvData::String const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
short /*priority*/)

View File

@@ -64,6 +64,10 @@ class epicsShareClass RPCServer :
};
epicsShareExtern Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider,
epics::pvData::String const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
RPCService::shared_pointer const & rpcService);
}}

View File

@@ -18,6 +18,7 @@
#include <sstream>
#include <pv/pvAccessMB.h>
#include <pv/rpcServer.h>
using std::ostringstream;
using std::hex;
@@ -379,6 +380,145 @@ void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendContr
}
/****************************************************************************************/
class ChannelListRequesterImpl :
public ChannelListRequester
{
public:
POINTER_DEFINITIONS(ChannelListRequesterImpl);
std::set<String> channelNames;
Status status;
virtual void channelListResult(
const epics::pvData::Status& status,
ChannelFind::shared_pointer const & channelFind,
std::set<epics::pvData::String> const & channelNames,
bool hasDynamic)
{
epics::pvData::Lock lock(_waitMutex);
this->status = status;
this->channelNames = channelNames;
_waitEvent.signal();
}
bool waitForCompletion(int32 timeoutSec) {
return _waitEvent.wait(timeoutSec);
}
private:
epics::pvData::Mutex _waitMutex;
epics::pvData::Event _waitEvent;
};
// TODO move out to a separate class
class ServerRPCService : public RPCService {
private:
static int32 TIMEOUT_SEC;
static Structure::const_shared_pointer helpStructure;
static Structure::const_shared_pointer channelListStructure;
static std::string helpString;
ServerContextImpl::shared_pointer m_serverContext;
public:
ServerRPCService(ServerContextImpl::shared_pointer const & context) :
m_serverContext(context)
{
}
virtual epics::pvData::PVStructure::shared_pointer request(
epics::pvData::PVStructure::shared_pointer const & arguments
) throw (RPCRequestException)
{
// NTURI support
PVStructure::shared_pointer args(
(arguments->getStructure()->getID() == "uri:ev4:nt/2012/pwd:NTURI") ?
arguments->getStructureField("query") :
arguments
);
// help support
if (args->getSubField("help"))
{
PVStructure::shared_pointer help = getPVDataCreate()->createPVStructure(helpStructure);
help->getSubField<PVString>("value")->put(helpString);
return help;
}
PVString::shared_pointer opField = args->getSubField<PVString>("op");
if (!opField)
throw RPCRequestException(Status::STATUSTYPE_ERROR, "unspecified 'string op' field");
String op = opField->get();
if (op == "channels")
{
ChannelListRequesterImpl::shared_pointer listListener(new ChannelListRequesterImpl());
m_serverContext->getChannelProviders()[0]->channelList(listListener); // TODO multiple channel providers !!!!
if (!listListener->waitForCompletion(TIMEOUT_SEC))
throw RPCRequestException(Status::STATUSTYPE_ERROR, "failed to fetch channel list due to timeout");
Status& status = listListener->status;
if (!status.isSuccess())
{
String errorMessage = "failed to fetch channel list: " + status.getMessage();
if (!status.getStackDump().empty())
errorMessage += "\n" + status.getStackDump();
throw RPCRequestException(Status::STATUSTYPE_ERROR, errorMessage);
}
std::set<String>& channelNames = listListener->channelNames;
PVStructure::shared_pointer result =
getPVDataCreate()->createPVStructure(channelListStructure);
PVStringArray::shared_pointer pvArray = result->getSubField<PVStringArray>("value");
PVStringArray::svector newdata(channelNames.size());
size_t i = 0;
for (std::set<String>::const_iterator iter = channelNames.begin();
iter != channelNames.end();
iter++)
newdata[i++] = *iter;
pvArray->replace(freeze(newdata));
return result;
}
else
throw RPCRequestException(Status::STATUSTYPE_ERROR, "unsupported operation '" + op + "'.");
}
};
int32 ServerRPCService::TIMEOUT_SEC = 3;
Structure::const_shared_pointer ServerRPCService::helpStructure =
getFieldCreate()->createFieldBuilder()->
setId("uri:ev4:nt/2012/pwd:NTScalar")->
add("value", pvString)->
createStructure();
Structure::const_shared_pointer ServerRPCService::channelListStructure =
getFieldCreate()->createFieldBuilder()->
setId("uri:ev4:nt/2012/pwd:NTScalarArray")->
addArray("value", pvString)->
createStructure();
std::string ServerRPCService::helpString =
"pvAccess server RPC service.\n"
"arguments:\n"
"\tstring op\toperation to execute\n"
"\n"
"\toperations:\n"
"\t\tchannels\treturns a list of 'static' channels the server can provide\n"
"\t\t\t (no arguments)\n"
"\n";
epics::pvData::String ServerCreateChannelHandler::SERVER_CHANNEL_NAME = "server";
void ServerCreateChannelHandler::handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
size_t payloadSize, ByteBuffer* payloadBuffer)
@@ -414,14 +554,22 @@ void ServerCreateChannelHandler::handleResponse(osiSockAddr* responseFrom,
return;
}
// TODO !!!
//ServerChannelRequesterImpl::create(_providers[0], transport, channelName, cid);
if (_providers.size() == 1)
ServerChannelRequesterImpl::create(_providers[0], transport, channelName, cid);
else
ServerChannelRequesterImpl::create(ServerSearchHandler::s_channelNameToProvider[channelName].lock(), transport, channelName, cid); // TODO !!!!
if (channelName == SERVER_CHANNEL_NAME)
{
// TODO singleton!!!
ServerRPCService::shared_pointer serverRPCService(new ServerRPCService(_context));
ChannelRequester::shared_pointer cr(new ServerChannelRequesterImpl(transport, channelName, cid));
Channel::shared_pointer serverChannel = createRPCChannel(ChannelProvider::shared_pointer(), channelName, cr, serverRPCService);
cr->channelCreated(Status::Ok, serverChannel);
}
else
{
if (_providers.size() == 1)
ServerChannelRequesterImpl::create(_providers[0], transport, channelName, cid);
else
ServerChannelRequesterImpl::create(ServerSearchHandler::s_channelNameToProvider[channelName].lock(), transport, channelName, cid); // TODO !!!!
}
}
void ServerCreateChannelHandler::disconnect(Transport::shared_pointer const & transport)

View File

@@ -206,6 +206,8 @@ namespace pvAccess {
std::size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer);
private:
static epics::pvData::String SERVER_CHANNEL_NAME;
void disconnect(Transport::shared_pointer const & transport);
std::vector<ChannelProvider::shared_pointer> _providers;
};
@@ -215,13 +217,14 @@ namespace pvAccess {
public TransportSender,
public std::tr1::enable_shared_from_this<ServerChannelRequesterImpl>
{
friend class ServerCreateChannelHandler;
public:
typedef std::tr1::shared_ptr<ServerChannelRequesterImpl> shared_pointer;
typedef std::tr1::shared_ptr<const ServerChannelRequesterImpl> const_shared_pointer;
protected:
ServerChannelRequesterImpl(Transport::shared_pointer const & transport, const epics::pvData::String channelName, const pvAccessID cid);
public:
virtual ~ServerChannelRequesterImpl() {}
virtual ~ServerChannelRequesterImpl() {}
static ChannelRequester::shared_pointer create(ChannelProvider::shared_pointer const & provider, Transport::shared_pointer const & transport, const epics::pvData::String channelName, const pvAccessID cid);
void channelCreated(const epics::pvData::Status& status, Channel::shared_pointer const & channel);
void channelStateChange(Channel::shared_pointer const & c, const Channel::ConnectionState isConnected);

View File

@@ -2626,6 +2626,25 @@ public:
return m_mockChannelFind;
}
virtual ChannelFind::shared_pointer channelList(
ChannelListRequester::shared_pointer const & channelListRequester)
{
if (!channelListRequester.get())
throw std::runtime_error("null requester");
// NOTE: this adds only active channels, not all (especially RPC ones)
std::set<epics::pvData::String> channelNames;
{
Lock guard(structureStoreMutex);
for (map<String, PVStructure::shared_pointer>::const_iterator iter = structureStore.begin();
iter != structureStore.end();
iter++)
channelNames.insert(iter->first);
}
channelListRequester->channelListResult(Status::Ok, m_mockChannelFind, channelNames, true);
return m_mockChannelFind;
}
virtual Channel::shared_pointer createChannel(
epics::pvData::String const & channelName,
ChannelRequester::shared_pointer const & channelRequester,

View File

@@ -24,6 +24,14 @@ public:
return nullCF;
}
ChannelFind::shared_pointer channelList(ChannelListRequester::shared_pointer const & channelListRequester)
{
ChannelFind::shared_pointer nullCF;
std::set<String> none;
channelListRequester->channelListResult(Status::Ok, nullCF, none, false);
return nullCF;
}
Channel::shared_pointer createChannel(
epics::pvData::String const & channelName,
ChannelRequester::shared_pointer const & channelRequester,