From 56d3b9b4e07fd9b4231edb2e29c7568e6b8d7b70 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Fri, 4 Sep 2015 17:22:36 -0400 Subject: [PATCH] start --- .gitignore | 15 ++ Makefile | 15 ++ README.md | 54 ++++++++ p2pApp/Makefile | 9 +- p2pApp/chancache.cpp | 172 +++++++++++++++++++++++ p2pApp/chancache.h | 85 ++++++++++++ p2pApp/channel.cpp | 154 +++++++++++++++++++++ p2pApp/channel.h | 65 +++++++++ p2pApp/client.cpp | 25 ++++ p2pApp/iocshelper.h | 70 ++++++++++ p2pApp/main.cpp | 17 +++ p2pApp/pva2pva.h | 18 +++ p2pApp/server.cpp | 316 +++++++++++++++++++++++++++++++++++++++++++ 13 files changed, 1012 insertions(+), 3 deletions(-) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 README.md create mode 100644 p2pApp/chancache.cpp create mode 100644 p2pApp/chancache.h create mode 100644 p2pApp/channel.cpp create mode 100644 p2pApp/channel.h create mode 100644 p2pApp/client.cpp create mode 100644 p2pApp/iocshelper.h create mode 100644 p2pApp/main.cpp create mode 100644 p2pApp/pva2pva.h create mode 100644 p2pApp/server.cpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7dde283 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +bin/ +lib/ +doc/ +include/ +db/ +dbd/ +documentation/html +documentation/*.tag +envPaths +configure/*.local +configure/RELEASE.* +configure/CONFIG_SITE.* +!configure/ExampleRELEASE.local +**/O.* +QtC-* diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a3b4363 --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +# Makefile at top of application tree +TOP = . +include $(TOP)/configure/CONFIG + +# Directories to build, any order +DIRS += configure +DIRS += $(wildcard *App) +DIRS += $(wildcard iocBoot) + +# iocBoot depends on all *App dirs +iocBoot_DEPEND_DIRS += $(filter %App,$(DIRS)) + +# Add any additional dependency rules here: + +include $(TOP)/configure/RULES_TOP diff --git a/README.md b/README.md new file mode 100644 index 0000000..13221f9 --- /dev/null +++ b/README.md @@ -0,0 +1,54 @@ +PV Access to PV Access protocol gateway (aka. proxy) + + +Theory of Operation + +The GW maintains a Channel Cache, which is a dictionary of client side channels +(shared_ptr instances) +in the NEVER_CONNECTED or CONNECTED states. + +Each entry also has an activity flag and reference count. + +The activity flag is set each time the server side receives a search request for a PV. + +The reference count is incremented for each active server side channel. + +Periodically the cache is iterated and any client channels with !activity and count==0 are dropped. +In addition the activity flag is unconditionally cleared. + + +Name search handling + +The server side listens for name search requests. +When a request is received the channel cache is searched. +If no entry exists, then one is created and no further action is taken. +If an entry exists, but the client channel is not connected, then it's activiy flag is set and no further action is taken. +If a connected entry exists, then an affirmative response is sent to the requester. + + +When a channel create request is received, the channel cache is checked. +If no connected entry exists, then the request is failed. + + +Structure associations + +ServerChannelProvider 1->1 ChannelCache (composed) + +ChannelCache 1->N ChannelCacheEntry (map >) +ChannelCache :: cacheLock + +ChannelCacheEntry 1->1 ChannelCache (C*) +ChannelCacheEntry 1->1 Channel (PVA Client) (shared_ptr) + +Channel (PVA Client) 1->1 CRequester (shared_ptr) +Channel :: lock + +CRequester 1->1 ChannelCacheEntry (weak_ptr) + +ChannelCacheEntry 1->N GWChannel (std) + +GWChannel 1->1 ChannelCacheEntry (shared_ptr) + + + +ServerChannelRequesterImpl::channelStateChange() - placeholder, needs implementation diff --git a/p2pApp/Makefile b/p2pApp/Makefile index 74af299..3a0cf88 100644 --- a/p2pApp/Makefile +++ b/p2pApp/Makefile @@ -8,10 +8,13 @@ include $(TOP)/configure/CONFIG PROD_HOST = p2p -p2p_SRCS += +p2p_SRCS += main.cpp +p2p_SRCS += server.cpp +p2p_SRCS += client.cpp +p2p_SRCS += chancache.cpp +p2p_SRCS += channel.cpp -#p2p_LIBS += xxx -p2p_LIBS += $(EPICS_BASE_HOST_LIBS) +p2p_LIBS += pvAccess pvData Com #=========================== diff --git a/p2pApp/chancache.cpp b/p2pApp/chancache.cpp new file mode 100644 index 0000000..e67211a --- /dev/null +++ b/p2pApp/chancache.cpp @@ -0,0 +1,172 @@ +#include + +#include +#include + + +#define epicsExportSharedSymbols +#include "pva2pva.h" +#include "chancache.h" +#include "channel.h" + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + +ChannelCacheEntry::ChannelCacheEntry(ChannelCache* c, const std::string& n) + :channelName(n), cache(c), dropPoke(true) +{} + +ChannelCacheEntry::~ChannelCacheEntry() +{ + // Should *not* be holding cache->cacheLock + std::cout<<"Destroy client channel for '"<destroy(); // calls channelStateChange() w/ DESTROY +} + +std::string +ChannelCacheEntry::CRequester::getRequesterName() +{ + return "GWClient"; +} + +ChannelCacheEntry::CRequester::~CRequester() {} + +void +ChannelCacheEntry::CRequester::message(std::string const & message, pvd::MessageType messageType) +{ + ChannelCacheEntry::shared_pointer chan(this->chan); + if(chan) + std::cout<<"message to client about '"<channelName<<"' : "<chan.lock()); + if(!chan) + return; + + std::cout<<"Chan change '"<channelName<<"' is " + <cache->cacheLock); + + assert(chan->channel.get()==channel.get()); + + switch(connectionState) + { + case pva::Channel::DISCONNECTED: + case pva::Channel::DESTROYED: + // Drop from cache + chan->cache->entries.erase(chan->channelName); + // keep 'chan' as a reference is that actual destruction doesn't happen which cacheLock is held + break; + default: + break; + } + + interested = chan->interested; // Copy to allow unlock during callback + } + + for(ChannelCacheEntry::interested_t::const_iterator it=interested.begin(), end=interested.end(); + it!=end; ++it) + { + (*it)->requester->channelStateChange((*it)->shared_from_this(), connectionState); + } +} + + +struct ChannelCache::cacheClean : public epicsTimerNotify +{ + ChannelCache *cache; + cacheClean(ChannelCache *c) : cache(c) {} + epicsTimerNotify::expireStatus expire(const epicsTime ¤tTime) + { + // keep a reference to any cache entrys being removed so they + // aren't destroyed while cacheLock is held + std::set cleaned; + + { + Guard G(cache->cacheLock); + std::cout<<"GWServer cleaning cache w/ "<entries.size()<<" entries\n"; + + ChannelCache::entries_t::iterator cur=cache->entries.begin(), next, end=cache->entries.end(); + while(cur!=end) { + next = cur; + ++next; + + if(!cur->second->dropPoke && cur->second->interested.empty()) { + //ChannelCacheEntry::shared_pointer E(cur->second); + std::cout<<"GWServer cache remove "<second->channelName<<"\n"; + cleaned.insert(cur->second); + cache->entries.erase(cur); + } else { + cur->second->dropPoke = false; + std::cout<<"GWServer cache "<second->channelName + <<" interest "<second->interested.size() + <<"\n"; + } + + cur = next; + } + } + return epicsTimerNotify::expireStatus(epicsTimerNotify::restart, 30.0); + } +}; + +ChannelCache::ChannelCache() + :provider(pva::getChannelProviderRegistry()->getProvider("pva")) + ,timerQueue(&epicsTimerQueueActive::allocate(1, epicsThreadPriorityCAServerLow-2)) + ,cleaner(new cacheClean(this)) +{ + if(!provider) + throw std::logic_error("Missing 'pva' provider"); + assert(timerQueue); + cleanTimer = &timerQueue->createTimer(); + cleanTimer->start(*cleaner, 30.0); +} + +ChannelCache::~ChannelCache() +{ + cleanTimer->destroy(); + timerQueue->release(); + delete cleaner; +} + +// caller must host cacheLock +ChannelCacheEntry::shared_pointer +ChannelCache::get(const std::string& name) +{ + entries_t::const_iterator it=entries.find(name); + if(it!=entries.end()) { + it->second->dropPoke = true; + return it->second; + } + + ChannelCacheEntry::shared_pointer ent(new ChannelCacheEntry(this, name)); + + std::cout<<"Create client channel for '"<channel = provider->createChannel(name, req); + if(!ent->channel) + THROW_EXCEPTION2(std::runtime_error, "Failed to createChannel"); + assert(ent->channel->getChannelRequester().get()==req.get()); + + entries[name] = ent; + assert(entries.size()>0); + return ent; +} diff --git a/p2pApp/chancache.h b/p2pApp/chancache.h new file mode 100644 index 0000000..64bfe67 --- /dev/null +++ b/p2pApp/chancache.h @@ -0,0 +1,85 @@ +#ifndef CHANCACHE_H +#define CHANCACHE_H + +#include +#include +#include + +#include +#include + +#include + +struct ChannelCache; +struct GWChannel; + +struct ChannelCacheEntry +{ + POINTER_DEFINITIONS(ChannelCacheEntry); + + struct Update { + virtual ~Update()=0; + virtual void channelStateChange(epics::pvAccess::Channel::ConnectionState connectionState) = 0; + }; + + const std::string channelName; + ChannelCache * const cache; + + // clientChannel + epics::pvAccess::Channel::shared_pointer channel; + + bool dropPoke; + + typedef std::set interested_t; + interested_t interested; + + ChannelCacheEntry(ChannelCache*, const std::string& n); + virtual ~ChannelCacheEntry(); + + // this exists as a seperate object to prevent a reference loop + // ChannelCacheEntry -> pva::Channel -> CRequester + struct CRequester : public epics::pvAccess::ChannelRequester + { + CRequester(const ChannelCacheEntry::shared_pointer& p) : chan(p) {} + virtual ~CRequester(); + ChannelCacheEntry::weak_pointer chan; + // for Requester + virtual std::string getRequesterName(); + virtual void message(std::string const & message, epics::pvData::MessageType messageType); + // for ChannelRequester + virtual void channelCreated(const epics::pvData::Status& status, + epics::pvAccess::Channel::shared_pointer const & channel); + virtual void channelStateChange(epics::pvAccess::Channel::shared_pointer const & channel, + epics::pvAccess::Channel::ConnectionState connectionState); + }; +}; + +/** Holds the set of channels the GW is searching for, or has found. + */ +struct ChannelCache +{ + typedef std::map entries_t; + + // cacheLock should not be held while calling *Requester methods + epicsMutex cacheLock; + + entries_t entries; + + epics::pvAccess::ChannelProvider::shared_pointer provider; // client Provider + epics::pvAccess::ChannelProvider::shared_pointer server; // GWServerChannelProvider + + epicsTimerQueueActive *timerQueue; + epicsTimer *cleanTimer; + struct cacheClean; + cacheClean *cleaner; + + ChannelCache(); + ~ChannelCache(); + + // caller must host cacheLock + ChannelCacheEntry::shared_pointer get(const std::string& name); + +}; + +#endif // CHANCACHE_H + diff --git a/p2pApp/channel.cpp b/p2pApp/channel.cpp new file mode 100644 index 0000000..80f66ac --- /dev/null +++ b/p2pApp/channel.cpp @@ -0,0 +1,154 @@ + +#define epicsExportSharedSymbols +#include "pva2pva.h" +#include "channel.h" + +namespace pva = epics::pvAccess; +namespace pvd = epics::pvData; + +GWChannel::GWChannel(ChannelCacheEntry::shared_pointer e, + epics::pvAccess::ChannelRequester::shared_pointer r) + :entry(e) + ,requester(r) +{ + Guard G(entry->cache->cacheLock); + entry->interested.insert(this); +} + +GWChannel::~GWChannel() +{ + Guard G(entry->cache->cacheLock); + entry->interested.erase(this); + std::cout<<"GWChannel dtor '"<channelName<<"'\n"; +} + +std::string +GWChannel::getRequesterName() +{ + return "GWChannel"; +} + +void +GWChannel::message(std::string const & message, pvd::MessageType messageType) +{ + std::cout<<"message to client about '"<channelName<<"' : "< +GWChannel::getProvider() +{ + return entry->cache->server; +} + +std::string +GWChannel::getRemoteAddress() +{ + return "foobar"; +} + +pva::Channel::ConnectionState +GWChannel::getConnectionState() +{ + return entry->channel->getConnectionState(); +} + +std::string +GWChannel::getChannelName() +{ + return entry->channelName; +} + +std::tr1::shared_ptr +GWChannel::getChannelRequester() +{ + return requester; +} + +bool +GWChannel::isConnected() +{ + return entry->channel->isConnected(); +} + + +void +GWChannel::getField(epics::pvAccess::GetFieldRequester::shared_pointer const & requester, + std::string const & subField) +{ + //TODO: cache for top level field? + std::cout<<"getField for "<channelName<<" "<channel->getField(requester, subField); +} + +epics::pvAccess::AccessRights +GWChannel::getAccessRights(epics::pvData::PVField::shared_pointer const & pvField) +{ + return entry->channel->getAccessRights(pvField); +} + +epics::pvAccess::ChannelProcess::shared_pointer +GWChannel::createChannelProcess( + epics::pvAccess::ChannelProcessRequester::shared_pointer const & channelProcessRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + return entry->channel->createChannelProcess(channelProcessRequester, pvRequest); +} + +epics::pvAccess::ChannelGet::shared_pointer +GWChannel::createChannelGet( + epics::pvAccess::ChannelGetRequester::shared_pointer const & channelGetRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + return entry->channel->createChannelGet(channelGetRequester, pvRequest); +} + +epics::pvAccess::ChannelPut::shared_pointer +GWChannel::createChannelPut( + epics::pvAccess::ChannelPutRequester::shared_pointer const & channelPutRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + return entry->channel->createChannelPut(channelPutRequester, pvRequest); +} + +epics::pvAccess::ChannelPutGet::shared_pointer +GWChannel::createChannelPutGet( + epics::pvAccess::ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + return entry->channel->createChannelPutGet(channelPutGetRequester, pvRequest); +} + +epics::pvAccess::ChannelRPC::shared_pointer +GWChannel::createChannelRPC( + epics::pvAccess::ChannelRPCRequester::shared_pointer const & channelRPCRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + return entry->channel->createChannelRPC(channelRPCRequester, pvRequest); +} + +epics::pvData::Monitor::shared_pointer +GWChannel::createMonitor( + epics::pvData::MonitorRequester::shared_pointer const & monitorRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + //TODO de-dup monitors + return entry->channel->createMonitor(monitorRequester, pvRequest); +} + +epics::pvAccess::ChannelArray::shared_pointer +GWChannel::createChannelArray( + epics::pvAccess::ChannelArrayRequester::shared_pointer const & channelArrayRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + return entry->channel->createChannelArray(channelArrayRequester, pvRequest); +} + + +void +GWChannel::printInfo() +{ printInfo(std::cout); } +void +GWChannel::printInfo(std::ostream& out) +{ + out<<"GWChannel for "<channelName<<"\n"; +} diff --git a/p2pApp/channel.h b/p2pApp/channel.h new file mode 100644 index 0000000..8693972 --- /dev/null +++ b/p2pApp/channel.h @@ -0,0 +1,65 @@ +#ifndef CHANNEL_H +#define CHANNEL_H + +#include + +#include "chancache.h" + +struct GWChannel : public epics::pvAccess::Channel, + std::tr1::enable_shared_from_this +{ + ChannelCacheEntry::shared_pointer entry; + epics::pvAccess::ChannelRequester::shared_pointer requester; + + GWChannel(ChannelCacheEntry::shared_pointer e, + epics::pvAccess::ChannelRequester::shared_pointer); + virtual ~GWChannel(); + + + // for Requester + virtual std::string getRequesterName(); + virtual void message(std::string const & message, epics::pvData::MessageType messageType); + + // for Destroyable + virtual void destroy(){} + + // for Channel + virtual std::tr1::shared_ptr getProvider(); + virtual std::string getRemoteAddress(); + + virtual ConnectionState getConnectionState(); + virtual std::string getChannelName(); + virtual std::tr1::shared_ptr getChannelRequester(); + virtual bool isConnected(); + + virtual void getField(epics::pvAccess::GetFieldRequester::shared_pointer const & requester, + std::string const & subField); + virtual epics::pvAccess::AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & pvField); + virtual epics::pvAccess::ChannelProcess::shared_pointer createChannelProcess( + epics::pvAccess::ChannelProcessRequester::shared_pointer const & channelProcessRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + virtual epics::pvAccess::ChannelGet::shared_pointer createChannelGet( + epics::pvAccess::ChannelGetRequester::shared_pointer const & channelGetRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + virtual epics::pvAccess::ChannelPut::shared_pointer createChannelPut( + epics::pvAccess::ChannelPutRequester::shared_pointer const & channelPutRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + virtual epics::pvAccess::ChannelPutGet::shared_pointer createChannelPutGet( + epics::pvAccess::ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + virtual epics::pvAccess::ChannelRPC::shared_pointer createChannelRPC( + epics::pvAccess::ChannelRPCRequester::shared_pointer const & channelRPCRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + virtual epics::pvData::Monitor::shared_pointer createMonitor( + epics::pvData::MonitorRequester::shared_pointer const & monitorRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + virtual epics::pvAccess::ChannelArray::shared_pointer createChannelArray( + epics::pvAccess::ChannelArrayRequester::shared_pointer const & channelArrayRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest); + + virtual void printInfo(); + virtual void printInfo(std::ostream& out); + +}; + +#endif // CHANNEL_H diff --git a/p2pApp/client.cpp b/p2pApp/client.cpp new file mode 100644 index 0000000..5bea50f --- /dev/null +++ b/p2pApp/client.cpp @@ -0,0 +1,25 @@ + +#include + +#include +#include +#include + +#define epicsExportSharedSymbols +#include "pva2pva.h" +#include "chancache.h" + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + + +void registerGWClientIocsh() +{ + pva::ClientFactory::start(); + +} + +void gwClientShutdown() +{ + pva::ClientFactory::stop(); +} diff --git a/p2pApp/iocshelper.h b/p2pApp/iocshelper.h new file mode 100644 index 0000000..972696f --- /dev/null +++ b/p2pApp/iocshelper.h @@ -0,0 +1,70 @@ +#ifndef IOCSHELPER_H +#define IOCSHELPER_H + +#include + +#include + +namespace detail { + +template +struct getarg {}; +template<> struct getarg { + static int op(const iocshArgBuf& a) { return a.ival; } +}; +template<> struct getarg { + static double op(const iocshArgBuf& a) { return a.dval; } +}; +template<> struct getarg { + static char* op(const iocshArgBuf& a) { return a.sval; } +}; + + +template +struct iocshFuncInfo{ + iocshFuncDef def; + std::string name; + iocshArg *argarr[N]; + iocshArg args[N]; + std::string argnames[N]; + iocshFuncInfo(const std::string& n) :name(n) { + def.name = name.c_str(); + def.nargs = N; + def.arg = (iocshArg**)&argarr; + for(size_t i=0; i +static void call0(const iocshArgBuf *args) +{ + fn(); +} + + +template +static void call1(const iocshArgBuf *args) +{ + fn(getarg::op(args[0])); +} +} + + +template +void iocshRegister0(const char *name) +{ + detail::iocshFuncInfo<0> *info = new detail::iocshFuncInfo<0>(name); + iocshRegister(&info->def, &detail::call0); +} + +template +void iocshRegister1(const char *name, const char *arg1name) +{ + detail::iocshFuncInfo<1> *info = new detail::iocshFuncInfo<1>(name); + info->argnames[0] = arg1name; + info->args[0].name = info->argnames[0].c_str(); + iocshRegister(&info->def, &detail::call1); +} + +#endif // IOCSHELPER_H diff --git a/p2pApp/main.cpp b/p2pApp/main.cpp new file mode 100644 index 0000000..a696979 --- /dev/null +++ b/p2pApp/main.cpp @@ -0,0 +1,17 @@ + +#include + +#define epicsExportSharedSymbols +#include "pva2pva.h" + +int main(int argc, char *argv[]) +{ + registerGWClientIocsh(); + registerGWServerIocsh(); + if(argc>1) + iocsh(argv[1]); + int ret = iocsh(NULL); + gwServerShutdown(); + gwClientShutdown(); + return ret; +} diff --git a/p2pApp/pva2pva.h b/p2pApp/pva2pva.h new file mode 100644 index 0000000..3dc3612 --- /dev/null +++ b/p2pApp/pva2pva.h @@ -0,0 +1,18 @@ +#ifndef PVA2PVA_H +#define PVA2PVA_H + +#include + +#include + +#include + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +epicsShareExtern void registerGWServerIocsh(); +epicsShareExtern void registerGWClientIocsh(); +epicsShareExtern void gwServerShutdown(); +epicsShareExtern void gwClientShutdown(); + +#endif // PVA2PVA_H diff --git a/p2pApp/server.cpp b/p2pApp/server.cpp new file mode 100644 index 0000000..2bd42d2 --- /dev/null +++ b/p2pApp/server.cpp @@ -0,0 +1,316 @@ +#include + +#include +#include + + +#define epicsExportSharedSymbols +#include "pva2pva.h" +#include "iocshelper.h" +#include "chancache.h" +#include "channel.h" + +namespace pva = epics::pvAccess; +namespace pvd = epics::pvData; + +namespace { + +struct GWServerChannelProvider : public + pva::ChannelProvider, + pva::ChannelFind, + std::tr1::enable_shared_from_this +{ + ChannelCache cache; + + // for ChannelFind + + virtual std::tr1::shared_ptr getChannelProvider() + { + return this->shared_from_this(); + } + + virtual void cancel() {} + + // For ChannelProvider + + virtual std::string getProviderName() { + return "GWServer"; + } + + virtual pva::ChannelFind::shared_pointer channelFind(std::string const & channelName, + pva::ChannelFindRequester::shared_pointer const & channelFindRequester) + { + pva::ChannelFind::shared_pointer ret; + bool found = false; + + // TODO + // until GW can bind client and server to specific (and different) interfaces + // use a naming convension to avoid loops (GW talks to itself). + // Server listens for names beginning with 'x', + // and re-writes these to start with 'y' for client search. + if(!channelName.empty() && channelName[0]=='x') + { + std::string newName; + + // rewrite name + newName.reserve(channelName.size()); + newName = "y"; + newName.append(channelName.begin()+1, channelName.end()); + + Guard G(cache.cacheLock); + + ChannelCache::entries_t::const_iterator it = cache.entries.find(newName); + + if(it==cache.entries.end()) { + // first request, create ChannelCacheEntry + //TODO: async lookup + cache.get(newName); + + assert(cache.entries.size()>0); + + } else if(it->second->channel->isConnected()) { + // another request, and hey we're connected this time + + ret=this->shared_from_this(); + found=true; + std::cerr<<"GWServer accepting "<second->dropPoke = true; + + } else { + // not connected yet, but a client is still interested + it->second->dropPoke = true; + std::cout<<"cache poke "<channelFindResult(pvd::Status::Ok, ret, found); + + return ret; + } + + virtual pva::ChannelFind::shared_pointer channelList(pva::ChannelListRequester::shared_pointer const & channelListRequester) + { + std::cerr<<"GWServer does not advertise a channel list\n"; + return pva::ChannelFind::shared_pointer(); + } + + virtual pva::Channel::shared_pointer createChannel(std::string const & channelName, + pva::ChannelRequester::shared_pointer const & channelRequester, + short priority = PRIORITY_DEFAULT) + { + return createChannel(channelName, channelRequester, priority, "foobar"); + } + + virtual pva::Channel::shared_pointer createChannel(std::string const & channelName, + pva::ChannelRequester::shared_pointer const & channelRequester, + short priority, std::string const & address) + { + pva::Channel::shared_pointer ret; + std::string newName; + + if(!channelName.empty() && channelName[0]=='x') + { + + // rewrite name + newName.reserve(channelName.size()); + newName = "y"; + newName.append(channelName.begin()+1, channelName.end()); + + Guard G(cache.cacheLock); + + ChannelCache::entries_t::const_iterator it = cache.entries.find(newName); + if(it!=cache.entries.end() && it->second->channel->isConnected()) + { + ret.reset(new GWChannel(it->second, channelRequester)); + } + } + + if(!ret) { + std::cerr<<"GWServer refusing channel "<channelCreated(S, ret); + } else { + std::cerr<<"GWServer connecting channel "<channelCreated(pvd::Status::Ok, ret); + } + + return ret; + } + + virtual void configure(epics::pvData::PVStructure::shared_pointer /*configuration*/) { + std::cout<<"GWServer being configured\n"; + } + + virtual void destroy() + { + std::cout<<"GWServer destory request\n"; + } + + GWServerChannelProvider() + { + std::cout<<"GW Server ctor\n"; + } + virtual ~GWServerChannelProvider() + { + std::cout<<"GW Server dtor\n"; + } +}; + +struct GWServerChannelProviderFactory : public pva::ChannelProviderFactory +{ + pva::ChannelProvider::weak_pointer last_provider; + + virtual std::string getFactoryName() + { + return "GWServer"; + } + + virtual pva::ChannelProvider::shared_pointer sharedInstance() + { + pva::ChannelProvider::shared_pointer P(last_provider.lock()); + if(!P) { + P.reset(new GWServerChannelProvider); + ((GWServerChannelProvider*)P.get())->cache.server = P; + last_provider = P; + } + return P; + } + + virtual pva::ChannelProvider::shared_pointer newInstance() + { + pva::ChannelProvider::shared_pointer P(new GWServerChannelProvider); + ((GWServerChannelProvider*)P.get())->cache.server = P; + last_provider = P; + return P; + } +}; + +static +bool p2pServerRunning; + +static +std::tr1::weak_ptr gblctx; + +static +void runGWServer(void *) +{ + printf("Gateway server starting\n"); + try{ + pva::ServerContextImpl::shared_pointer ctx(pva::ServerContextImpl::create()); + + ctx->setChannelProviderName("GWServer"); + + ctx->initialize(pva::getChannelProviderRegistry()); + ctx->printInfo(); + + printf("Gateway running\n"); + gblctx = ctx; + ctx->run(0); // zero means forever ? + gblctx.reset(); + printf("Gateway stopping\n"); + + ctx->destroy(); + }catch(std::exception& e){ + printf("Gateway server error: %s\n", e.what()); + gblctx.reset(); + } + printf("Gateway stopped\n"); + p2pServerRunning = false; +} + +void startServer() +{ + if(p2pServerRunning) { + printf("Already started\n"); + return; + } + + epicsThreadMustCreate("gwserv", + epicsThreadPriorityCAServerLow-2, + epicsThreadGetStackSize(epicsThreadStackSmall), + &runGWServer, NULL); + + p2pServerRunning = true; +} + +void stopServer() +{ + pva::ServerContextImpl::shared_pointer ctx(gblctx.lock()); + + if(ctx.get()) { + printf("Reqesting stop\n"); + ctx->shutdown(); + } else + printf("Not running\n"); +} + +void statusServer(int lvl) +{ + try{ + pva::ServerContextImpl::shared_pointer ctx(gblctx); + + const std::vector& prov(ctx->getChannelProviders()); + + std::cout<<"Server has "<getProviderName() : std::string("NULL"))<<"\n"; + if(!p) continue; + GWServerChannelProvider *scp = dynamic_cast(p); + if(!scp) continue; + + ChannelCache::entries_t entries; + + { + Guard G(scp->cache.cacheLock); + + std::cout<<"Cache has "<cache.entries.size()<<" channels\n"; + + if(lvl>0) + entries = scp->cache.entries; // copy of std::map + } + + if(lvl<=0) + continue; + + for(ChannelCache::entries_t::const_iterator it=entries.begin(), end=entries.end(); + it!=end; ++it) + { + ChannelCacheEntry& E = *it->second; + std::cout<getConnectionState()] + <<" Channel '"<("gwstart"); + iocshRegister0<&stopServer>("gwstop"); + iocshRegister1("gwstatus", "level"); + +} + +void gwServerShutdown() +{ + pva::ServerContextImpl::shared_pointer P(gblctx.lock()); + if(P) + stopServer(); + if(GWServerFactory) + unregisterChannelProviderFactory(GWServerFactory); +}