This commit is contained in:
Michael Davidsaver
2015-09-04 17:22:36 -04:00
parent 2f27b07120
commit 56d3b9b4e0
13 changed files with 1012 additions and 3 deletions

15
.gitignore vendored Normal file
View File

@ -0,0 +1,15 @@
bin/
lib/
doc/
include/
db/
dbd/
documentation/html
documentation/*.tag
envPaths
configure/*.local
configure/RELEASE.*
configure/CONFIG_SITE.*
!configure/ExampleRELEASE.local
**/O.*
QtC-*

15
Makefile Normal file
View File

@ -0,0 +1,15 @@
# Makefile at top of application tree
TOP = .
include $(TOP)/configure/CONFIG
# Directories to build, any order
DIRS += configure
DIRS += $(wildcard *App)
DIRS += $(wildcard iocBoot)
# iocBoot depends on all *App dirs
iocBoot_DEPEND_DIRS += $(filter %App,$(DIRS))
# Add any additional dependency rules here:
include $(TOP)/configure/RULES_TOP

54
README.md Normal file
View File

@ -0,0 +1,54 @@
PV Access to PV Access protocol gateway (aka. proxy)
Theory of Operation
The GW maintains a Channel Cache, which is a dictionary of client side channels
(shared_ptr<epics::pvAccess::Channel> instances)
in the NEVER_CONNECTED or CONNECTED states.
Each entry also has an activity flag and reference count.
The activity flag is set each time the server side receives a search request for a PV.
The reference count is incremented for each active server side channel.
Periodically the cache is iterated and any client channels with !activity and count==0 are dropped.
In addition the activity flag is unconditionally cleared.
Name search handling
The server side listens for name search requests.
When a request is received the channel cache is searched.
If no entry exists, then one is created and no further action is taken.
If an entry exists, but the client channel is not connected, then it's activiy flag is set and no further action is taken.
If a connected entry exists, then an affirmative response is sent to the requester.
When a channel create request is received, the channel cache is checked.
If no connected entry exists, then the request is failed.
Structure associations
ServerChannelProvider 1->1 ChannelCache (composed)
ChannelCache 1->N ChannelCacheEntry (map<shared_ptr<E> >)
ChannelCache :: cacheLock
ChannelCacheEntry 1->1 ChannelCache (C*)
ChannelCacheEntry 1->1 Channel (PVA Client) (shared_ptr<C>)
Channel (PVA Client) 1->1 CRequester (shared_ptr<R>)
Channel :: lock
CRequester 1->1 ChannelCacheEntry (weak_ptr<E>)
ChannelCacheEntry 1->N GWChannel (std<C*>)
GWChannel 1->1 ChannelCacheEntry (shared_ptr<E>)
ServerChannelRequesterImpl::channelStateChange() - placeholder, needs implementation

View File

@ -8,10 +8,13 @@ include $(TOP)/configure/CONFIG
PROD_HOST = p2p
p2p_SRCS +=
p2p_SRCS += main.cpp
p2p_SRCS += server.cpp
p2p_SRCS += client.cpp
p2p_SRCS += chancache.cpp
p2p_SRCS += channel.cpp
#p2p_LIBS += xxx
p2p_LIBS += $(EPICS_BASE_HOST_LIBS)
p2p_LIBS += pvAccess pvData Com
#===========================

172
p2pApp/chancache.cpp Normal file
View File

@ -0,0 +1,172 @@
#include <stdio.h>
#include <pv/epicsException.h>
#include <pv/serverContext.h>
#define epicsExportSharedSymbols
#include "pva2pva.h"
#include "chancache.h"
#include "channel.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
ChannelCacheEntry::ChannelCacheEntry(ChannelCache* c, const std::string& n)
:channelName(n), cache(c), dropPoke(true)
{}
ChannelCacheEntry::~ChannelCacheEntry()
{
// Should *not* be holding cache->cacheLock
std::cout<<"Destroy client channel for '"<<channelName<<"'\n";
if(channel.get())
channel->destroy(); // calls channelStateChange() w/ DESTROY
}
std::string
ChannelCacheEntry::CRequester::getRequesterName()
{
return "GWClient";
}
ChannelCacheEntry::CRequester::~CRequester() {}
void
ChannelCacheEntry::CRequester::message(std::string const & message, pvd::MessageType messageType)
{
ChannelCacheEntry::shared_pointer chan(this->chan);
if(chan)
std::cout<<"message to client about '"<<chan->channelName<<"' : "<<message<<"\n";
}
// for ChannelRequester
void
ChannelCacheEntry::CRequester::channelCreated(const pvd::Status& status,
pva::Channel::shared_pointer const & channel)
{}
void
ChannelCacheEntry::CRequester::channelStateChange(pva::Channel::shared_pointer const & channel,
pva::Channel::ConnectionState connectionState)
{
ChannelCacheEntry::shared_pointer chan(this->chan.lock());
if(!chan)
return;
std::cout<<"Chan change '"<<chan->channelName<<"' is "
<<pva::Channel::ConnectionStateNames[connectionState]<<"\n";
ChannelCacheEntry::interested_t interested;
// fanout notification
{
Guard G(chan->cache->cacheLock);
assert(chan->channel.get()==channel.get());
switch(connectionState)
{
case pva::Channel::DISCONNECTED:
case pva::Channel::DESTROYED:
// Drop from cache
chan->cache->entries.erase(chan->channelName);
// keep 'chan' as a reference is that actual destruction doesn't happen which cacheLock is held
break;
default:
break;
}
interested = chan->interested; // Copy to allow unlock during callback
}
for(ChannelCacheEntry::interested_t::const_iterator it=interested.begin(), end=interested.end();
it!=end; ++it)
{
(*it)->requester->channelStateChange((*it)->shared_from_this(), connectionState);
}
}
struct ChannelCache::cacheClean : public epicsTimerNotify
{
ChannelCache *cache;
cacheClean(ChannelCache *c) : cache(c) {}
epicsTimerNotify::expireStatus expire(const epicsTime &currentTime)
{
// keep a reference to any cache entrys being removed so they
// aren't destroyed while cacheLock is held
std::set<ChannelCacheEntry::shared_pointer> cleaned;
{
Guard G(cache->cacheLock);
std::cout<<"GWServer cleaning cache w/ "<<cache->entries.size()<<" entries\n";
ChannelCache::entries_t::iterator cur=cache->entries.begin(), next, end=cache->entries.end();
while(cur!=end) {
next = cur;
++next;
if(!cur->second->dropPoke && cur->second->interested.empty()) {
//ChannelCacheEntry::shared_pointer E(cur->second);
std::cout<<"GWServer cache remove "<<cur->second->channelName<<"\n";
cleaned.insert(cur->second);
cache->entries.erase(cur);
} else {
cur->second->dropPoke = false;
std::cout<<"GWServer cache "<<cur->second->channelName
<<" interest "<<cur->second->interested.size()
<<"\n";
}
cur = next;
}
}
return epicsTimerNotify::expireStatus(epicsTimerNotify::restart, 30.0);
}
};
ChannelCache::ChannelCache()
:provider(pva::getChannelProviderRegistry()->getProvider("pva"))
,timerQueue(&epicsTimerQueueActive::allocate(1, epicsThreadPriorityCAServerLow-2))
,cleaner(new cacheClean(this))
{
if(!provider)
throw std::logic_error("Missing 'pva' provider");
assert(timerQueue);
cleanTimer = &timerQueue->createTimer();
cleanTimer->start(*cleaner, 30.0);
}
ChannelCache::~ChannelCache()
{
cleanTimer->destroy();
timerQueue->release();
delete cleaner;
}
// caller must host cacheLock
ChannelCacheEntry::shared_pointer
ChannelCache::get(const std::string& name)
{
entries_t::const_iterator it=entries.find(name);
if(it!=entries.end()) {
it->second->dropPoke = true;
return it->second;
}
ChannelCacheEntry::shared_pointer ent(new ChannelCacheEntry(this, name));
std::cout<<"Create client channel for '"<<name<<"'\n";
pva::ChannelRequester::shared_pointer req(new ChannelCacheEntry::CRequester(ent));
ent->channel = provider->createChannel(name, req);
if(!ent->channel)
THROW_EXCEPTION2(std::runtime_error, "Failed to createChannel");
assert(ent->channel->getChannelRequester().get()==req.get());
entries[name] = ent;
assert(entries.size()>0);
return ent;
}

85
p2pApp/chancache.h Normal file
View File

@ -0,0 +1,85 @@
#ifndef CHANCACHE_H
#define CHANCACHE_H
#include <string>
#include <map>
#include <set>
#include <epicsMutex.h>
#include <epicsTimer.h>
#include <pv/pvAccess.h>
struct ChannelCache;
struct GWChannel;
struct ChannelCacheEntry
{
POINTER_DEFINITIONS(ChannelCacheEntry);
struct Update {
virtual ~Update()=0;
virtual void channelStateChange(epics::pvAccess::Channel::ConnectionState connectionState) = 0;
};
const std::string channelName;
ChannelCache * const cache;
// clientChannel
epics::pvAccess::Channel::shared_pointer channel;
bool dropPoke;
typedef std::set<GWChannel*> interested_t;
interested_t interested;
ChannelCacheEntry(ChannelCache*, const std::string& n);
virtual ~ChannelCacheEntry();
// this exists as a seperate object to prevent a reference loop
// ChannelCacheEntry -> pva::Channel -> CRequester
struct CRequester : public epics::pvAccess::ChannelRequester
{
CRequester(const ChannelCacheEntry::shared_pointer& p) : chan(p) {}
virtual ~CRequester();
ChannelCacheEntry::weak_pointer chan;
// for Requester
virtual std::string getRequesterName();
virtual void message(std::string const & message, epics::pvData::MessageType messageType);
// for ChannelRequester
virtual void channelCreated(const epics::pvData::Status& status,
epics::pvAccess::Channel::shared_pointer const & channel);
virtual void channelStateChange(epics::pvAccess::Channel::shared_pointer const & channel,
epics::pvAccess::Channel::ConnectionState connectionState);
};
};
/** Holds the set of channels the GW is searching for, or has found.
*/
struct ChannelCache
{
typedef std::map<std::string, ChannelCacheEntry::shared_pointer > entries_t;
// cacheLock should not be held while calling *Requester methods
epicsMutex cacheLock;
entries_t entries;
epics::pvAccess::ChannelProvider::shared_pointer provider; // client Provider
epics::pvAccess::ChannelProvider::shared_pointer server; // GWServerChannelProvider
epicsTimerQueueActive *timerQueue;
epicsTimer *cleanTimer;
struct cacheClean;
cacheClean *cleaner;
ChannelCache();
~ChannelCache();
// caller must host cacheLock
ChannelCacheEntry::shared_pointer get(const std::string& name);
};
#endif // CHANCACHE_H

154
p2pApp/channel.cpp Normal file
View File

@ -0,0 +1,154 @@
#define epicsExportSharedSymbols
#include "pva2pva.h"
#include "channel.h"
namespace pva = epics::pvAccess;
namespace pvd = epics::pvData;
GWChannel::GWChannel(ChannelCacheEntry::shared_pointer e,
epics::pvAccess::ChannelRequester::shared_pointer r)
:entry(e)
,requester(r)
{
Guard G(entry->cache->cacheLock);
entry->interested.insert(this);
}
GWChannel::~GWChannel()
{
Guard G(entry->cache->cacheLock);
entry->interested.erase(this);
std::cout<<"GWChannel dtor '"<<entry->channelName<<"'\n";
}
std::string
GWChannel::getRequesterName()
{
return "GWChannel";
}
void
GWChannel::message(std::string const & message, pvd::MessageType messageType)
{
std::cout<<"message to client about '"<<entry->channelName<<"' : "<<message<<"\n";
}
std::tr1::shared_ptr<epics::pvAccess::ChannelProvider>
GWChannel::getProvider()
{
return entry->cache->server;
}
std::string
GWChannel::getRemoteAddress()
{
return "foobar";
}
pva::Channel::ConnectionState
GWChannel::getConnectionState()
{
return entry->channel->getConnectionState();
}
std::string
GWChannel::getChannelName()
{
return entry->channelName;
}
std::tr1::shared_ptr<epics::pvAccess::ChannelRequester>
GWChannel::getChannelRequester()
{
return requester;
}
bool
GWChannel::isConnected()
{
return entry->channel->isConnected();
}
void
GWChannel::getField(epics::pvAccess::GetFieldRequester::shared_pointer const & requester,
std::string const & subField)
{
//TODO: cache for top level field?
std::cout<<"getField for "<<entry->channelName<<" "<<subField<<"\n";
entry->channel->getField(requester, subField);
}
epics::pvAccess::AccessRights
GWChannel::getAccessRights(epics::pvData::PVField::shared_pointer const & pvField)
{
return entry->channel->getAccessRights(pvField);
}
epics::pvAccess::ChannelProcess::shared_pointer
GWChannel::createChannelProcess(
epics::pvAccess::ChannelProcessRequester::shared_pointer const & channelProcessRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelProcess(channelProcessRequester, pvRequest);
}
epics::pvAccess::ChannelGet::shared_pointer
GWChannel::createChannelGet(
epics::pvAccess::ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelGet(channelGetRequester, pvRequest);
}
epics::pvAccess::ChannelPut::shared_pointer
GWChannel::createChannelPut(
epics::pvAccess::ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelPut(channelPutRequester, pvRequest);
}
epics::pvAccess::ChannelPutGet::shared_pointer
GWChannel::createChannelPutGet(
epics::pvAccess::ChannelPutGetRequester::shared_pointer const & channelPutGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelPutGet(channelPutGetRequester, pvRequest);
}
epics::pvAccess::ChannelRPC::shared_pointer
GWChannel::createChannelRPC(
epics::pvAccess::ChannelRPCRequester::shared_pointer const & channelRPCRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelRPC(channelRPCRequester, pvRequest);
}
epics::pvData::Monitor::shared_pointer
GWChannel::createMonitor(
epics::pvData::MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
//TODO de-dup monitors
return entry->channel->createMonitor(monitorRequester, pvRequest);
}
epics::pvAccess::ChannelArray::shared_pointer
GWChannel::createChannelArray(
epics::pvAccess::ChannelArrayRequester::shared_pointer const & channelArrayRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelArray(channelArrayRequester, pvRequest);
}
void
GWChannel::printInfo()
{ printInfo(std::cout); }
void
GWChannel::printInfo(std::ostream& out)
{
out<<"GWChannel for "<<entry->channelName<<"\n";
}

65
p2pApp/channel.h Normal file
View File

@ -0,0 +1,65 @@
#ifndef CHANNEL_H
#define CHANNEL_H
#include <pv/pvAccess.h>
#include "chancache.h"
struct GWChannel : public epics::pvAccess::Channel,
std::tr1::enable_shared_from_this<GWChannel>
{
ChannelCacheEntry::shared_pointer entry;
epics::pvAccess::ChannelRequester::shared_pointer requester;
GWChannel(ChannelCacheEntry::shared_pointer e,
epics::pvAccess::ChannelRequester::shared_pointer);
virtual ~GWChannel();
// for Requester
virtual std::string getRequesterName();
virtual void message(std::string const & message, epics::pvData::MessageType messageType);
// for Destroyable
virtual void destroy(){}
// for Channel
virtual std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> getProvider();
virtual std::string getRemoteAddress();
virtual ConnectionState getConnectionState();
virtual std::string getChannelName();
virtual std::tr1::shared_ptr<epics::pvAccess::ChannelRequester> getChannelRequester();
virtual bool isConnected();
virtual void getField(epics::pvAccess::GetFieldRequester::shared_pointer const & requester,
std::string const & subField);
virtual epics::pvAccess::AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & pvField);
virtual epics::pvAccess::ChannelProcess::shared_pointer createChannelProcess(
epics::pvAccess::ChannelProcessRequester::shared_pointer const & channelProcessRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelGet::shared_pointer createChannelGet(
epics::pvAccess::ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelPut::shared_pointer createChannelPut(
epics::pvAccess::ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelPutGet::shared_pointer createChannelPutGet(
epics::pvAccess::ChannelPutGetRequester::shared_pointer const & channelPutGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelRPC::shared_pointer createChannelRPC(
epics::pvAccess::ChannelRPCRequester::shared_pointer const & channelRPCRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvData::Monitor::shared_pointer createMonitor(
epics::pvData::MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelArray::shared_pointer createChannelArray(
epics::pvAccess::ChannelArrayRequester::shared_pointer const & channelArrayRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual void printInfo();
virtual void printInfo(std::ostream& out);
};
#endif // CHANNEL_H

25
p2pApp/client.cpp Normal file
View File

@ -0,0 +1,25 @@
#include <epicsGuard.h>
#include <pv/sharedPtr.h>
#include <pv/pvAccess.h>
#include <pv/clientFactory.h>
#define epicsExportSharedSymbols
#include "pva2pva.h"
#include "chancache.h"
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
void registerGWClientIocsh()
{
pva::ClientFactory::start();
}
void gwClientShutdown()
{
pva::ClientFactory::stop();
}

70
p2pApp/iocshelper.h Normal file
View File

@ -0,0 +1,70 @@
#ifndef IOCSHELPER_H
#define IOCSHELPER_H
#include <string>
#include <iocsh.h>
namespace detail {
template<typename T>
struct getarg {};
template<> struct getarg<int> {
static int op(const iocshArgBuf& a) { return a.ival; }
};
template<> struct getarg<double> {
static double op(const iocshArgBuf& a) { return a.dval; }
};
template<> struct getarg<char*> {
static char* op(const iocshArgBuf& a) { return a.sval; }
};
template<int N>
struct iocshFuncInfo{
iocshFuncDef def;
std::string name;
iocshArg *argarr[N];
iocshArg args[N];
std::string argnames[N];
iocshFuncInfo(const std::string& n) :name(n) {
def.name = name.c_str();
def.nargs = N;
def.arg = (iocshArg**)&argarr;
for(size_t i=0; i<N; i++)
argarr[i] = &args[i];
}
};
template<void (*fn)()>
static void call0(const iocshArgBuf *args)
{
fn();
}
template<typename T, void (*fn)(T)>
static void call1(const iocshArgBuf *args)
{
fn(getarg<T>::op(args[0]));
}
}
template<void (*fn)()>
void iocshRegister0(const char *name)
{
detail::iocshFuncInfo<0> *info = new detail::iocshFuncInfo<0>(name);
iocshRegister(&info->def, &detail::call0<fn>);
}
template<typename T, void (*fn)(T)>
void iocshRegister1(const char *name, const char *arg1name)
{
detail::iocshFuncInfo<1> *info = new detail::iocshFuncInfo<1>(name);
info->argnames[0] = arg1name;
info->args[0].name = info->argnames[0].c_str();
iocshRegister(&info->def, &detail::call1<T, fn>);
}
#endif // IOCSHELPER_H

17
p2pApp/main.cpp Normal file
View File

@ -0,0 +1,17 @@
#include <iocsh.h>
#define epicsExportSharedSymbols
#include "pva2pva.h"
int main(int argc, char *argv[])
{
registerGWClientIocsh();
registerGWServerIocsh();
if(argc>1)
iocsh(argv[1]);
int ret = iocsh(NULL);
gwServerShutdown();
gwClientShutdown();
return ret;
}

18
p2pApp/pva2pva.h Normal file
View File

@ -0,0 +1,18 @@
#ifndef PVA2PVA_H
#define PVA2PVA_H
#include <epicsGuard.h>
#include <pv/pvAccess.h>
#include <shareLib.h>
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
epicsShareExtern void registerGWServerIocsh();
epicsShareExtern void registerGWClientIocsh();
epicsShareExtern void gwServerShutdown();
epicsShareExtern void gwClientShutdown();
#endif // PVA2PVA_H

316
p2pApp/server.cpp Normal file
View File

@ -0,0 +1,316 @@
#include <stdio.h>
#include <pv/epicsException.h>
#include <pv/serverContext.h>
#define epicsExportSharedSymbols
#include "pva2pva.h"
#include "iocshelper.h"
#include "chancache.h"
#include "channel.h"
namespace pva = epics::pvAccess;
namespace pvd = epics::pvData;
namespace {
struct GWServerChannelProvider : public
pva::ChannelProvider,
pva::ChannelFind,
std::tr1::enable_shared_from_this<GWServerChannelProvider>
{
ChannelCache cache;
// for ChannelFind
virtual std::tr1::shared_ptr<ChannelProvider> getChannelProvider()
{
return this->shared_from_this();
}
virtual void cancel() {}
// For ChannelProvider
virtual std::string getProviderName() {
return "GWServer";
}
virtual pva::ChannelFind::shared_pointer channelFind(std::string const & channelName,
pva::ChannelFindRequester::shared_pointer const & channelFindRequester)
{
pva::ChannelFind::shared_pointer ret;
bool found = false;
// TODO
// until GW can bind client and server to specific (and different) interfaces
// use a naming convension to avoid loops (GW talks to itself).
// Server listens for names beginning with 'x',
// and re-writes these to start with 'y' for client search.
if(!channelName.empty() && channelName[0]=='x')
{
std::string newName;
// rewrite name
newName.reserve(channelName.size());
newName = "y";
newName.append(channelName.begin()+1, channelName.end());
Guard G(cache.cacheLock);
ChannelCache::entries_t::const_iterator it = cache.entries.find(newName);
if(it==cache.entries.end()) {
// first request, create ChannelCacheEntry
//TODO: async lookup
cache.get(newName);
assert(cache.entries.size()>0);
} else if(it->second->channel->isConnected()) {
// another request, and hey we're connected this time
ret=this->shared_from_this();
found=true;
std::cerr<<"GWServer accepting "<<channelName<<" as "<<newName<<"\n";
it->second->dropPoke = true;
} else {
// not connected yet, but a client is still interested
it->second->dropPoke = true;
std::cout<<"cache poke "<<newName<<"\n";
}
}
// unlock for callback
channelFindRequester->channelFindResult(pvd::Status::Ok, ret, found);
return ret;
}
virtual pva::ChannelFind::shared_pointer channelList(pva::ChannelListRequester::shared_pointer const & channelListRequester)
{
std::cerr<<"GWServer does not advertise a channel list\n";
return pva::ChannelFind::shared_pointer();
}
virtual pva::Channel::shared_pointer createChannel(std::string const & channelName,
pva::ChannelRequester::shared_pointer const & channelRequester,
short priority = PRIORITY_DEFAULT)
{
return createChannel(channelName, channelRequester, priority, "foobar");
}
virtual pva::Channel::shared_pointer createChannel(std::string const & channelName,
pva::ChannelRequester::shared_pointer const & channelRequester,
short priority, std::string const & address)
{
pva::Channel::shared_pointer ret;
std::string newName;
if(!channelName.empty() && channelName[0]=='x')
{
// rewrite name
newName.reserve(channelName.size());
newName = "y";
newName.append(channelName.begin()+1, channelName.end());
Guard G(cache.cacheLock);
ChannelCache::entries_t::const_iterator it = cache.entries.find(newName);
if(it!=cache.entries.end() && it->second->channel->isConnected())
{
ret.reset(new GWChannel(it->second, channelRequester));
}
}
if(!ret) {
std::cerr<<"GWServer refusing channel "<<channelName<<"\n";
pvd::Status S(pvd::Status::STATUSTYPE_ERROR, "Not found");
channelRequester->channelCreated(S, ret);
} else {
std::cerr<<"GWServer connecting channel "<<channelName<<" as "<<newName<<"\n";
channelRequester->channelCreated(pvd::Status::Ok, ret);
}
return ret;
}
virtual void configure(epics::pvData::PVStructure::shared_pointer /*configuration*/) {
std::cout<<"GWServer being configured\n";
}
virtual void destroy()
{
std::cout<<"GWServer destory request\n";
}
GWServerChannelProvider()
{
std::cout<<"GW Server ctor\n";
}
virtual ~GWServerChannelProvider()
{
std::cout<<"GW Server dtor\n";
}
};
struct GWServerChannelProviderFactory : public pva::ChannelProviderFactory
{
pva::ChannelProvider::weak_pointer last_provider;
virtual std::string getFactoryName()
{
return "GWServer";
}
virtual pva::ChannelProvider::shared_pointer sharedInstance()
{
pva::ChannelProvider::shared_pointer P(last_provider.lock());
if(!P) {
P.reset(new GWServerChannelProvider);
((GWServerChannelProvider*)P.get())->cache.server = P;
last_provider = P;
}
return P;
}
virtual pva::ChannelProvider::shared_pointer newInstance()
{
pva::ChannelProvider::shared_pointer P(new GWServerChannelProvider);
((GWServerChannelProvider*)P.get())->cache.server = P;
last_provider = P;
return P;
}
};
static
bool p2pServerRunning;
static
std::tr1::weak_ptr<pva::ServerContextImpl> gblctx;
static
void runGWServer(void *)
{
printf("Gateway server starting\n");
try{
pva::ServerContextImpl::shared_pointer ctx(pva::ServerContextImpl::create());
ctx->setChannelProviderName("GWServer");
ctx->initialize(pva::getChannelProviderRegistry());
ctx->printInfo();
printf("Gateway running\n");
gblctx = ctx;
ctx->run(0); // zero means forever ?
gblctx.reset();
printf("Gateway stopping\n");
ctx->destroy();
}catch(std::exception& e){
printf("Gateway server error: %s\n", e.what());
gblctx.reset();
}
printf("Gateway stopped\n");
p2pServerRunning = false;
}
void startServer()
{
if(p2pServerRunning) {
printf("Already started\n");
return;
}
epicsThreadMustCreate("gwserv",
epicsThreadPriorityCAServerLow-2,
epicsThreadGetStackSize(epicsThreadStackSmall),
&runGWServer, NULL);
p2pServerRunning = true;
}
void stopServer()
{
pva::ServerContextImpl::shared_pointer ctx(gblctx.lock());
if(ctx.get()) {
printf("Reqesting stop\n");
ctx->shutdown();
} else
printf("Not running\n");
}
void statusServer(int lvl)
{
try{
pva::ServerContextImpl::shared_pointer ctx(gblctx);
const std::vector<pva::ChannelProvider::shared_pointer>& prov(ctx->getChannelProviders());
std::cout<<"Server has "<<prov.size()<<" providers\n";
for(size_t i=0; i<prov.size(); i++)
{
pva::ChannelProvider* p = prov[i].get();
std::cout<<"Provider: "<<(p ? p->getProviderName() : std::string("NULL"))<<"\n";
if(!p) continue;
GWServerChannelProvider *scp = dynamic_cast<GWServerChannelProvider*>(p);
if(!scp) continue;
ChannelCache::entries_t entries;
{
Guard G(scp->cache.cacheLock);
std::cout<<"Cache has "<<scp->cache.entries.size()<<" channels\n";
if(lvl>0)
entries = scp->cache.entries; // copy of std::map
}
if(lvl<=0)
continue;
for(ChannelCache::entries_t::const_iterator it=entries.begin(), end=entries.end();
it!=end; ++it)
{
ChannelCacheEntry& E = *it->second;
std::cout<<pva::Channel::ConnectionStateNames[E.channel->getConnectionState()]
<<" Channel '"<<E.channelName<<"' with "<<E.interested.size()<<" clients\n";
}
}
}catch(std::exception& e){
std::cerr<<"Error: "<<e.what()<<"\n";
}
}
} // namespace
static
pva::ChannelProviderFactory::shared_pointer GWServerFactory;
void registerGWServerIocsh()
{
GWServerFactory.reset(new GWServerChannelProviderFactory);
pva::registerChannelProviderFactory(GWServerFactory);
iocshRegister0<&startServer>("gwstart");
iocshRegister0<&stopServer>("gwstop");
iocshRegister1<int, &statusServer>("gwstatus", "level");
}
void gwServerShutdown()
{
pva::ServerContextImpl::shared_pointer P(gblctx.lock());
if(P)
stopServer();
if(GWServerFactory)
unregisterChannelProviderFactory(GWServerFactory);
}