#include #include #include #include #define epicsExportSharedSymbols #include "pva2pva.h" #include "helper.h" #include "chancache.h" #include "channel.h" namespace pvd = epics::pvData; namespace pva = epics::pvAccess; size_t ChannelCacheEntry::num_instances; ChannelCacheEntry::ChannelCacheEntry(ChannelCache* c, const std::string& n) :channelName(n), cache(c), dropPoke(true) { epicsAtomicIncrSizeT(&num_instances); } ChannelCacheEntry::~ChannelCacheEntry() { // Should *not* be holding cache->cacheLock std::cout<<"Destroy client channel for '"<destroy(); // calls channelStateChange() w/ DESTROY epicsAtomicDecrSizeT(&num_instances); } std::string ChannelCacheEntry::CRequester::getRequesterName() { return "GWClient"; } size_t ChannelCacheEntry::CRequester::num_instances; ChannelCacheEntry::CRequester::CRequester(const ChannelCacheEntry::shared_pointer& p) :chan(p) { epicsAtomicIncrSizeT(&num_instances); } ChannelCacheEntry::CRequester::~CRequester() { epicsAtomicDecrSizeT(&num_instances); } void ChannelCacheEntry::CRequester::message(std::string const & message, pvd::MessageType messageType) { ChannelCacheEntry::shared_pointer chan(this->chan); if(chan) std::cout<<"message to client about '"<channelName<<"' : "<chan.lock()); if(!chan) return; std::cout<<"Chan change '"<channelName<<"' is " <cache->cacheLock); assert(chan->channel.get()==channel.get()); switch(connectionState) { case pva::Channel::DISCONNECTED: case pva::Channel::DESTROYED: // Drop from cache chan->cache->entries.erase(chan->channelName); // keep 'chan' as a reference so that actual destruction doesn't happen which cacheLock is held break; default: break; } } // fanout notification AUTO_VAL(interested, chan->interested.lock_vector()); // Copy FOREACH(it, end, interested) { GWChannel *chan = it->get(); pva::ChannelRequester::shared_pointer req(chan->requester.lock()); if(req) req->channelStateChange(*it, connectionState); else std::cout<<"GWChannel w/ dead requester\n"; } } struct ChannelCache::cacheClean : public epicsTimerNotify { ChannelCache *cache; cacheClean(ChannelCache *c) : cache(c) {} epicsTimerNotify::expireStatus expire(const epicsTime ¤tTime) { // keep a reference to any cache entrys being removed so they // aren't destroyed while cacheLock is held std::set cleaned; { Guard G(cache->cacheLock); cache->cleanerRuns++; ChannelCache::entries_t::iterator cur=cache->entries.begin(), next, end=cache->entries.end(); while(cur!=end) { next = cur; ++next; if(!cur->second->dropPoke && cur->second->interested.empty()) { cleaned.insert(cur->second); cache->entries.erase(cur); cache->cleanerDust++; } else { cur->second->dropPoke = false; } cur = next; } } return epicsTimerNotify::expireStatus(epicsTimerNotify::restart, 30.0); } }; ChannelCache::ChannelCache(const pva::ChannelProvider::shared_pointer& prov) :provider(prov) ,timerQueue(&epicsTimerQueueActive::allocate(1, epicsThreadPriorityCAServerLow-2)) ,cleaner(new cacheClean(this)) ,cleanerRuns(0) ,cleanerDust(0) { if(!provider) throw std::logic_error("Missing 'pva' provider"); assert(timerQueue); cleanTimer = &timerQueue->createTimer(); cleanTimer->start(*cleaner, 30.0); } ChannelCache::~ChannelCache() { Guard G(cacheLock); cleanTimer->destroy(); timerQueue->release(); delete cleaner; entries_t E; E.swap(entries); FOREACH(it, end, E) { ChannelCacheEntry *ent = it->second.get(); if(ent->channel) { epics::pvAccess::Channel::shared_pointer chan; chan.swap(ent->channel); UnGuard U(G); chan->destroy(); } } } ChannelCacheEntry::shared_pointer ChannelCache::lookup(const std::string& newName) { ChannelCacheEntry::shared_pointer ret; Guard G(cacheLock); entries_t::const_iterator it = entries.find(newName); if(it==entries.end()) { // first request, create ChannelCacheEntry //TODO: async lookup ChannelCacheEntry::shared_pointer ent(new ChannelCacheEntry(this, newName)); ent->requester.reset(new ChannelCacheEntry::CRequester(ent)); entries[newName] = ent; pva::Channel::shared_pointer M; { // unlock to call createChannel() epicsGuardRelease U(G); M = provider->createChannel(newName, ent->requester); if(!M) THROW_EXCEPTION2(std::runtime_error, "Failed to createChannel"); } ent->channel = M; if(M->isConnected()) ret = ent; // immediate connect, mostly for unit-tests (thus delayed connect not covered) } else if(it->second->channel && it->second->channel->isConnected()) { // another request, and hey we're connected this time ret = it->second; it->second->dropPoke = true; } else { // not connected yet, but a client is still interested it->second->dropPoke = true; } return ret; }