#include #include #include #include /* for pvdVersion.h */ #include #include #include #define epicsExportSharedSymbols #include "helper.h" #include "iocshelper.h" #include "pva2pva.h" #include "server.h" #if defined(PVDATA_VERSION_INT) #if PVDATA_VERSION_INT > VERSION_INT(7,0,0,0) # define USE_MSTATS #endif #endif namespace pva = epics::pvAccess; namespace pvd = epics::pvData; std::tr1::shared_ptr GWServerChannelProvider::getChannelProvider() { return shared_from_this(); } // Called from UDP search thread with no locks held // Called from TCP threads (for search w/ TCP) pva::ChannelFind::shared_pointer GWServerChannelProvider::channelFind(std::string const & channelName, pva::ChannelFindRequester::shared_pointer const & channelFindRequester) { pva::ChannelFind::shared_pointer ret; bool found = false; if(!channelName.empty()) { std::string newName; // rewrite name newName = channelName; //newName[0] = 'y'; ChannelCacheEntry::shared_pointer ent(cache.lookup(newName)); if(ent) { found = true; ret = shared_from_this(); } } // unlock for callback channelFindRequester->channelFindResult(pvd::Status::Ok, ret, found); return ret; } // The return value of this function is ignored // The newly created channel is given to the ChannelRequester pva::Channel::shared_pointer GWServerChannelProvider::createChannel(std::string const & channelName, pva::ChannelRequester::shared_pointer const & channelRequester, short priority, std::string const & addressx) { GWChannel::shared_pointer ret; std::string newName; std::string address = channelRequester->getRequesterName(); if(!channelName.empty()) { // rewrite name newName = channelName; //newName[0] = 'y'; Guard G(cache.cacheLock); ChannelCacheEntry::shared_pointer ent(cache.lookup(newName)); // recursively locks cacheLock if(ent) { ret.reset(new GWChannel(ent, shared_from_this(), channelRequester, address)); ent->interested.insert(ret); ret->weakref = ret; } } if(!ret) { std::cerr<<"GWServer refusing channel "<channelCreated(S, ret); } else { std::cerr<<"GWServer connecting channel "<channelCreated(pvd::Status::Ok, ret); channelRequester->channelStateChange(ret, pva::Channel::CONNECTED); } return ret; // ignored by caller } void GWServerChannelProvider::destroy() { std::cout<<"GWServer destory request\n"; } GWServerChannelProvider::GWServerChannelProvider(const pva::ChannelProvider::shared_pointer& prov) :cache(prov) { std::cout<<"GW Server ctor\n"; } GWServerChannelProvider::~GWServerChannelProvider() { std::cout<<"GW Server dtor\n"; } namespace { static epicsMutex gbllock; static std::tr1::shared_ptr gblctx; void startServer() { try { Guard G(gbllock); if(gblctx) { printf("Already started\n"); return; } pva::ChannelProvider::shared_pointer client(pva::ChannelProviderRegistry::clients()->getProvider("pva")), server(new GWServerChannelProvider(client)); gblctx = pva::ServerContext::create(pva::ServerContext::Config() .provider(server) .config(pva::ConfigurationBuilder() .push_env() .build())); }catch(std::exception& e){ printf("Error: %s\n", e.what()); } } void stopServer() { try { Guard G(gbllock); if(!gblctx) { printf("Not started\n"); return; } else { gblctx->shutdown(); gblctx.reset(); } }catch(std::exception& e){ printf("Error: %s\n", e.what()); } } void infoServer(int lvl) { try { Guard G(gbllock); if(gblctx) { gblctx->printInfo(); } else { printf("Not running"); } }catch(std::exception& e){ printf("Error: %s\n", e.what()); } } void statusServer(int lvl, const char *chanexpr) { try{ pva::ServerContext::shared_pointer ctx; { Guard G(gbllock); ctx = gblctx; } if(!ctx) { std::cout<<"Not running\n"; return; } bool iswild = chanexpr ? (strchr(chanexpr, '?') || strchr(chanexpr, '*')) : false; if(chanexpr && lvl<1) lvl=1; // giving a channel implies at least channel level of detail const std::vector& prov(ctx->getChannelProviders()); std::cout<<"Server has "<getProviderName() : std::string("NULL"))<<"\n"; if(!p) continue; GWServerChannelProvider *scp = dynamic_cast(p); if(!scp) continue; ChannelCache::entries_t entries; size_t ncache, ncleaned, ndust; { Guard G(scp->cache.cacheLock); ncache = scp->cache.entries.size(); ncleaned = scp->cache.cleanerRuns; ndust = scp->cache.cleanerDust; if(lvl>0) { if(!chanexpr || iswild) { // no string or some glob pattern entries = scp->cache.entries; // copy of std::map } else if(chanexpr) { // just one channel AUTO_VAL(it, scp->cache.entries.find(chanexpr)); if(it!=scp->cache.entries.end()) entries[it->first] = it->second; } } } std::cout<<"Cache has "<first; if(iswild && !epicsStrGlobMatch(channame.c_str(), chanexpr)) continue; ChannelCacheEntry& E = *it->second; ChannelCacheEntry::mon_entries_t::lock_vector_type mons; size_t nsrv, nmon; bool dropflag; const char *chstate; { Guard G(E.mutex()); chstate = pva::Channel::ConnectionStateNames[E.channel->getConnectionState()]; nsrv = E.interested.size(); nmon = E.mon_entries.size(); dropflag = E.dropPoke; if(lvl>1) mons = E.mon_entries.lock_vector(); } std::cout<second; MonitorCacheEntry::interested_t::vector_type usrs; size_t nsrvmon; #ifdef USE_MSTATS pvd::Monitor::Stats mstats; #endif bool hastype, hasdata, isdone; { Guard G(ME.mutex()); nsrvmon = ME.interested.size(); hastype = !!ME.typedesc; hasdata = !!ME.lastelem; isdone = ME.done; #ifdef USE_MSTATS if(ME.mon) ME.mon->getStats(mstats); #endif if(lvl>2) usrs = ME.interested.lock_vector(); } // TODO: how to describe pvRequest in a compact way... std::cout<<" Client Monitor used by "<address; else remote = ""; } total = nempty + nfilled + nused; std::cout<<" Server monitor from " <& prov(ctx->getChannelProviders()); for(size_t i=0; i(p); if(!scp) continue; ChannelCacheEntry::shared_pointer entry; // find the channel, if it's there { Guard G(scp->cache.cacheLock); ChannelCache::entries_t::iterator it = scp->cache.entries.find(chan); if(it==scp->cache.entries.end()) continue; std::cout<<"Found in provider "<getProviderName()<<"\n"; entry = it->second; scp->cache.entries.erase(it); // drop out of cache (TODO: not required) } // trigger client side disconnect (recursively calls call CRequester::channelStateChange()) entry->channel->destroy(); } std::cout<<"Done\n"; }catch(std::exception& e){ std::cerr<<"Error: "<getChannelProviders()); if(lvl>0) std::cout<<"Server has "<(p); if(!scp) continue; ChannelCache::entries_t entries; { Guard G(scp->cache.cacheLock); entries = scp->cache.entries; // Copy } if(lvl>0) std::cout<<" Cache has "<second->mon_entries.lock_vector()); if(lvl>0) std::cout<<" Channel "<second->channelName <<" has "<second->interested); if(lvl>0) std::cout<<" Used by "<("gwstart"); iocshRegister<&stopServer>("gwstop"); iocshRegister("pvasr", "level"); iocshRegister("gwstatus", "level", "channel name/pattern"); iocshRegister("gwdrop", "channel"); iocshRegister("gwref", "level"); iocshRegister("pvadebug", "level"); } void gwServerShutdown() { stopServer(); }