// Gateway ("GWServer") server side: a pvAccess ChannelProvider that answers
// searches and channel-creation requests out of a ChannelCache, plus the
// shell commands used to start, stop and inspect the gateway server.
//
// NOTE(review): this text appears to have been damaged in transit. Line breaks
// are collapsed (so each `//` comment now swallows the code following it on the
// same physical line), the header names after `#include` are gone, and text
// between '<' and '>' is missing in many places (template arguments, parts of
// the status/drop/reference-check code and of the command registration).
// Restore from a pristine copy before building; the summary below describes
// only what is still visible.
//
// GWServerChannelProvider
//   getChannelProvider()  - returns shared_from_this().
//   channelFind()         - for a non-empty name, looks the name up with
//                           cache.lookup(); always reports back through
//                           channelFindRequester->channelFindResult() with
//                           Status::Ok, the result pointer and a found flag.
//   channelList()         - writes a message to std::cerr and returns an empty
//                           pointer; the requester is not invoked in the
//                           visible text.
//   createChannel() (3 args) - forwards to the 4 argument overload with the
//                           address string "foobar".
//   createChannel() (4 args) - takes cache.cacheLock, looks up the name, and on
//                           success creates a GWChannel, adds it to the cache
//                           entry's `interested` set and stores a weak
//                           self-reference. The requester is then told about
//                           the outcome via channelCreated() (and
//                           channelStateChange(CONNECTED) on success).
//                           The `addressx` parameter is not used in the visible
//                           text; the address comes from
//                           channelRequester->getRequesterName().
//                           NOTE(review): confirm this is intentional.
//   configure()/destroy()/ctor/dtor - only print a message; the constructor
//                           also initialises `cache` with the given provider.
//
// GWServerChannelProviderFactory (factory name "GWServer")
//   sharedInstance() reuses the provider remembered in the weak pointer
//   `last_provider` if it is still alive, otherwise creates one wrapping the
//   "pva" provider. newInstance() always creates a new one and remembers it.
//
// Server control
//   runGWServer()  - thread body: creates a ServerContextImpl, selects the
//                    "GWServer" provider, publishes the context in `gblctx`,
//                    calls run(0), then clears `gblctx` and destroys the
//                    context. Clears p2pServerRunning on exit.
//   startServer()  - creates the "gwserv" thread unless p2pServerRunning is set.
//   stopServer()   - calls shutdown() on the context held by `gblctx`, if any.
//   infoServer()   - calls printInfo() on the running context.
//   statusServer() - prints provider, cache, channel and monitor statistics;
//                    higher `lvl` values collect more detail, and a channel
//                    expression raises lvl to at least 1.
//   The remaining code (function names lost) erases a named channel from each
//   provider's cache and calls destroy() on its channel, and walks the caches
//   for a reference report. Commands registered: gwstart, gwstop, pvasr,
//   gwstatus, gwdrop, gwref, pvadebug.
//   gwServerShutdown() - stops the server if a context is alive and
//                    unregisters GWServerFactory if it is set.
//
// NOTE(review): `gblctx` and `p2pServerRunning` are written by the server
//   thread and read/written by the command functions without any lock visible
//   here (the source itself carries a "TODO mutex for this"). startServer()
//   also sets p2pServerRunning = true only after the thread has been created.
//   Presumably both need synchronisation - confirm.
// NOTE(review): statusServer() builds its shared pointer directly from the
//   weak pointer `gblctx` rather than via lock() as the other commands do; if
//   that construction throws for an expired pointer, the following
//   "Not running" branch would never be reached - verify.
// NOTE(review): infoServer() prints "Not running" without a trailing newline,
//   unlike stopServer().
#include #include #include #include /* for pvdVersion.h */ #include #include #define epicsExportSharedSymbols #include "helper.h" #include "iocshelper.h" #include "pva2pva.h" #include "server.h" #if defined(PVDATA_VERSION_INT) #if PVDATA_VERSION_INT > VERSION_INT(7,0,0,0) # define USE_MSTATS #endif #endif namespace pva = epics::pvAccess; namespace pvd = epics::pvData; std::tr1::shared_ptr GWServerChannelProvider::getChannelProvider() { return shared_from_this(); } // Called from UDP search thread with no locks held // Called from TCP threads (for search w/ TCP) pva::ChannelFind::shared_pointer GWServerChannelProvider::channelFind(std::string const & channelName, pva::ChannelFindRequester::shared_pointer const & channelFindRequester) { pva::ChannelFind::shared_pointer ret; bool found = false; if(!channelName.empty()) { std::string newName; // rewrite name newName = channelName; //newName[0] = 'y'; ChannelCacheEntry::shared_pointer ent(cache.lookup(newName)); if(ent) { found = true; ret = shared_from_this(); } } // unlock for callback channelFindRequester->channelFindResult(pvd::Status::Ok, ret, found); return ret; } pva::ChannelFind::shared_pointer GWServerChannelProvider::channelList(pva::ChannelListRequester::shared_pointer const & channelListRequester) { std::cerr<<"GWServer does not advertise a channel list\n"; return pva::ChannelFind::shared_pointer(); } pva::Channel::shared_pointer GWServerChannelProvider::createChannel(std::string const & channelName, pva::ChannelRequester::shared_pointer const & channelRequester, short priority) { return createChannel(channelName, channelRequester, priority, "foobar"); } // The return value of this function is ignored // The newly created channel is given to the ChannelRequester pva::Channel::shared_pointer GWServerChannelProvider::createChannel(std::string const & channelName, pva::ChannelRequester::shared_pointer const & channelRequester, short priority, std::string const & addressx) { GWChannel::shared_pointer ret; 
std::string newName; std::string address = channelRequester->getRequesterName(); if(!channelName.empty()) { // rewrite name newName = channelName; //newName[0] = 'y'; Guard G(cache.cacheLock); ChannelCacheEntry::shared_pointer ent(cache.lookup(newName)); // recursively locks cacheLock if(ent) { ret.reset(new GWChannel(ent, shared_from_this(), channelRequester, address)); ent->interested.insert(ret); ret->weakref = ret; } } if(!ret) { std::cerr<<"GWServer refusing channel "<channelCreated(S, ret); } else { std::cerr<<"GWServer connecting channel "<channelCreated(pvd::Status::Ok, ret); channelRequester->channelStateChange(ret, pva::Channel::CONNECTED); } return ret; // ignored by caller } void GWServerChannelProvider::configure(epics::pvData::PVStructure::shared_pointer /*configuration*/) { std::cout<<"GWServer being configured\n"; } void GWServerChannelProvider::destroy() { std::cout<<"GWServer destory request\n"; } GWServerChannelProvider::GWServerChannelProvider(const pva::ChannelProvider::shared_pointer& prov) :cache(prov) { std::cout<<"GW Server ctor\n"; } GWServerChannelProvider::~GWServerChannelProvider() { std::cout<<"GW Server dtor\n"; } namespace { struct GWServerChannelProviderFactory : public pva::ChannelProviderFactory { pva::ChannelProvider::weak_pointer last_provider; virtual std::string getFactoryName() { return "GWServer"; } virtual pva::ChannelProvider::shared_pointer sharedInstance() { pva::ChannelProvider::shared_pointer P(last_provider.lock()); if(!P) { P.reset(new GWServerChannelProvider(pva::getChannelProviderRegistry()->getProvider("pva"))); last_provider = P; } return P; } virtual pva::ChannelProvider::shared_pointer newInstance() { pva::ChannelProvider::shared_pointer P(new GWServerChannelProvider(pva::getChannelProviderRegistry()->getProvider("pva"))); last_provider = P; return P; } }; static bool p2pServerRunning; static std::tr1::weak_ptr gblctx; //TODO mutex for this static void runGWServer(void *) { printf("Gateway server starting\n"); 
try{ pva::ServerContextImpl::shared_pointer ctx(pva::ServerContextImpl::create()); ctx->setChannelProviderName("GWServer"); ctx->initialize(pva::getChannelProviderRegistry()); printf("Gateway running\n"); gblctx = ctx; ctx->run(0); // zero means forever ? gblctx.reset(); printf("Gateway stopping\n"); ctx->destroy(); }catch(std::exception& e){ printf("Gateway server error: %s\n", e.what()); gblctx.reset(); } printf("Gateway stopped\n"); p2pServerRunning = false; } void startServer() { if(p2pServerRunning) { printf("Already started\n"); return; } epicsThreadMustCreate("gwserv", epicsThreadPriorityCAServerLow-2, epicsThreadGetStackSize(epicsThreadStackSmall), &runGWServer, NULL); p2pServerRunning = true; } void stopServer() { pva::ServerContextImpl::shared_pointer ctx(gblctx.lock()); if(ctx.get()) { printf("Reqesting stop\n"); ctx->shutdown(); } else printf("Not running\n"); } void infoServer(int lvl) { (void)lvl; pva::ServerContextImpl::shared_pointer ctx(gblctx.lock()); if(ctx) { ctx->printInfo(); } else { printf("Not running"); } } void statusServer(int lvl, const char *chanexpr) { try{ pva::ServerContextImpl::shared_pointer ctx(gblctx); if(!ctx) { std::cout<<"Not running\n"; return; } bool iswild = chanexpr ? 
(strchr(chanexpr, '?') || strchr(chanexpr, '*')) : false; if(chanexpr && lvl<1) lvl=1; // giving a channel implies at least channel level of detail const std::vector& prov(ctx->getChannelProviders()); std::cout<<"Server has "<getProviderName() : std::string("NULL"))<<"\n"; if(!p) continue; GWServerChannelProvider *scp = dynamic_cast(p); if(!scp) continue; ChannelCache::entries_t entries; size_t ncache, ncleaned, ndust; { Guard G(scp->cache.cacheLock); ncache = scp->cache.entries.size(); ncleaned = scp->cache.cleanerRuns; ndust = scp->cache.cleanerDust; if(lvl>0) { if(!chanexpr || iswild) { // no string or some glob pattern entries = scp->cache.entries; // copy of std::map } else if(chanexpr) { // just one channel AUTO_VAL(it, scp->cache.entries.find(chanexpr)); if(it!=scp->cache.entries.end()) entries[it->first] = it->second; } } } std::cout<<"Cache has "<first; if(iswild && !epicsStrGlobMatch(channame.c_str(), chanexpr)) continue; ChannelCacheEntry& E = *it->second; ChannelCacheEntry::mon_entries_t::lock_vector_type mons; size_t nsrv, nmon; bool dropflag; const char *chstate; { Guard G(E.mutex()); chstate = pva::Channel::ConnectionStateNames[E.channel->getConnectionState()]; nsrv = E.interested.size(); nmon = E.mon_entries.size(); dropflag = E.dropPoke; if(lvl>1) mons = E.mon_entries.lock_vector(); } std::cout<second; MonitorCacheEntry::interested_t::vector_type usrs; size_t nsrvmon; #ifdef USE_MSTATS pvd::Monitor::Stats mstats; #endif bool hastype, hasdata, isdone; { Guard G(ME.mutex()); nsrvmon = ME.interested.size(); hastype = !!ME.typedesc; hasdata = !!ME.lastelem; isdone = ME.done; #ifdef USE_MSTATS if(ME.mon) ME.mon->getStats(mstats); #endif if(lvl>2) usrs = ME.interested.lock_vector(); } // TODO: how to describe pvRequest in a compact way... 
std::cout<<" Client Monitor used by "<address; else remote = ""; } total = nempty + nfilled + nused; std::cout<<" Server monitor from " <& prov(ctx->getChannelProviders()); for(size_t i=0; i(p); if(!scp) continue; ChannelCacheEntry::shared_pointer entry; // find the channel, if it's there { Guard G(scp->cache.cacheLock); ChannelCache::entries_t::iterator it = scp->cache.entries.find(chan); if(it==scp->cache.entries.end()) continue; std::cout<<"Found in provider "<getProviderName()<<"\n"; entry = it->second; scp->cache.entries.erase(it); // drop out of cache (TODO: not required) } // trigger client side disconnect (recursively calls call CRequester::channelStateChange()) entry->channel->destroy(); } std::cout<<"Done\n"; }catch(std::exception& e){ std::cerr<<"Error: "<getChannelProviders()); if(lvl>0) std::cout<<"Server has "<(p); if(!scp) continue; ChannelCache::entries_t entries; { Guard G(scp->cache.cacheLock); entries = scp->cache.entries; // Copy } if(lvl>0) std::cout<<" Cache has "<second->mon_entries.lock_vector()); if(lvl>0) std::cout<<" Channel "<second->channelName <<" has "<second->interested); if(lvl>0) std::cout<<" Used by "<("gwstart"); iocshRegister<&stopServer>("gwstop"); iocshRegister("pvasr", "level"); iocshRegister("gwstatus", "level", "channel name/pattern"); iocshRegister("gwdrop", "channel"); iocshRegister("gwref", "level"); iocshRegister("pvadebug", "level"); } void gwServerShutdown() { pva::ServerContextImpl::shared_pointer P(gblctx.lock()); if(P) stopServer(); if(GWServerFactory) unregisterChannelProviderFactory(GWServerFactory); }