monitor cache

Michael Davidsaver
2015-11-19 12:17:19 -06:00
parent 3e265f02c0
commit ccdc51e417
8 changed files with 588 additions and 62 deletions

p2pApp/Makefile

@@ -12,6 +12,7 @@ p2p_SRCS += main.cpp
p2p_SRCS += server.cpp
p2p_SRCS += client.cpp
p2p_SRCS += chancache.cpp
p2p_SRCS += moncache.cpp
p2p_SRCS += channel.cpp
p2p_LIBS += pvAccess pvData Com

p2pApp/chancache.cpp

@@ -1,9 +1,10 @@
#include <stdio.h>
#include <epicsAtomic.h>
#include <pv/epicsException.h>
#include <pv/serverContext.h>
#define epicsExportSharedSymbols
#include "pva2pva.h"
#include "chancache.h"
@@ -12,9 +13,13 @@
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
size_t ChannelCacheEntry::num_instances;
ChannelCacheEntry::ChannelCacheEntry(ChannelCache* c, const std::string& n)
:channelName(n), cache(c), dropPoke(true)
{}
{
epicsAtomicIncrSizeT(&num_instances);
}
ChannelCacheEntry::~ChannelCacheEntry()
{
@@ -22,6 +27,7 @@ ChannelCacheEntry::~ChannelCacheEntry()
std::cout<<"Destroy client channel for '"<<channelName<<"'\n";
if(channel.get())
channel->destroy(); // calls channelStateChange() w/ DESTROY
epicsAtomicDecrSizeT(&num_instances);
}
std::string
@@ -58,7 +64,7 @@ ChannelCacheEntry::CRequester::channelStateChange(pva::Channel::shared_pointer c
<<pva::Channel::ConnectionStateNames[connectionState]<<"\n";
ChannelCacheEntry::interested_t interested;
ChannelCacheEntry::interested_t::vector_type interested;
// fanout notification
@@ -79,13 +85,14 @@ ChannelCacheEntry::CRequester::channelStateChange(pva::Channel::shared_pointer c
break;
}
interested = chan->interested; // Copy to allow unlock during callback
interested = chan->interested.lock_vector(); // Copy to allow unlock during callback
}
for(ChannelCacheEntry::interested_t::const_iterator it=interested.begin(), end=interested.end();
for(ChannelCacheEntry::interested_t::vector_type::const_iterator
it=interested.begin(), end=interested.end();
it!=end; ++it)
{
(*it)->requester->channelStateChange((*it)->shared_from_this(), connectionState);
(*it)->requester->channelStateChange(*it, connectionState);
}
}
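
The snapshot-then-notify idiom adopted above, isolated as a sketch (Guard, cacheLock, and the weak_set interface are as in this commit; the surrounding context is hypothetical):

// Copy strong refs while locked, then call back unlocked so a
// callback may safely re-enter the cache without deadlock.
ChannelCacheEntry::interested_t::vector_type snap;
{
    Guard G(cache->cacheLock);
    snap = chan->interested.lock_vector(); // weak_ptr -> shared_ptr copies
}
for(size_t i=0; i<snap.size(); i++)
    snap[i]->requester->channelStateChange(snap[i], connectionState);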

p2pApp/chancache.h

@@ -4,23 +4,84 @@
#include <string>
#include <map>
#include <set>
#include <deque>
#include <epicsMutex.h>
#include <epicsTimer.h>
#include <pv/pvAccess.h>
#include "weakmap.h"
#include "weakset.h"
struct ChannelCache;
struct ChannelCacheEntry;
struct MonitorUser;
struct GWChannel;
struct MonitorCacheEntry : public epics::pvData::MonitorRequester
{
POINTER_DEFINITIONS(MonitorCacheEntry);
static size_t num_instances;
typedef std::vector<epicsUInt8> pvrequest_t;
pvrequest_t key;
ChannelCacheEntry * const chan;
epics::pvData::StructureConstPtr typedesc;
epics::pvData::PVStructure::shared_pointer lastval;
epics::pvData::MonitorPtr mon;
epics::pvData::Status startresult;
typedef weak_set<MonitorUser> interested_t;
interested_t interested;
MonitorCacheEntry(ChannelCacheEntry *ent);
virtual ~MonitorCacheEntry();
virtual void monitorConnect(epics::pvData::Status const & status,
epics::pvData::MonitorPtr const & monitor,
epics::pvData::StructureConstPtr const & structure);
virtual void monitorEvent(epics::pvData::MonitorPtr const & monitor);
virtual void unlisten(epics::pvData::MonitorPtr const & monitor);
virtual std::string getRequesterName();
virtual void message(std::string const & message, epics::pvData::MessageType messageType);
};
struct MonitorUser : public epics::pvData::Monitor
{
POINTER_DEFINITIONS(MonitorUser);
static size_t num_instances;
weak_pointer weakref;
MonitorCacheEntry::shared_pointer entry;
epics::pvData::MonitorRequester::weak_pointer req;
bool running;
std::deque<epics::pvData::MonitorElementPtr> filled, empty;
std::set<epics::pvData::MonitorElementPtr> inuse;
MonitorUser(const MonitorCacheEntry::shared_pointer&);
virtual ~MonitorUser();
virtual void destroy();
virtual epics::pvData::Status start();
virtual epics::pvData::Status stop();
virtual epics::pvData::MonitorElementPtr poll();
virtual void release(epics::pvData::MonitorElementPtr const & monitorElement);
virtual std::string getRequesterName();
virtual void message(std::string const & message, epics::pvData::MessageType messageType);
};
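
How these two classes relate, summarized as a comment (a sketch of the fan-out, not part of the header):

// One upstream monitor fanned out to N downstream clients:
//
//   upstream pvd::Monitor (MonitorCacheEntry::mon)
//     -> MonitorCacheEntry : MonitorRequester  (caches typedesc + lastval)
//          -> weak_set<MonitorUser> interested (snapshotted per event)
//               -> MonitorUser : Monitor       (per-client empty/filled/inuse queues)
//                    -> downstream MonitorRequester (MonitorUser::req, held weakly)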
struct ChannelCacheEntry
{
POINTER_DEFINITIONS(ChannelCacheEntry);
struct Update {
virtual ~Update()=0;
virtual void channelStateChange(epics::pvAccess::Channel::ConnectionState connectionState) = 0;
};
static size_t num_instances;
const std::string channelName;
ChannelCache * const cache;
@@ -30,9 +91,13 @@ struct ChannelCacheEntry
bool dropPoke;
typedef std::set<GWChannel*> interested_t;
typedef weak_set<GWChannel> interested_t;
interested_t interested;
typedef MonitorCacheEntry::pvrequest_t pvrequest_t;
typedef weak_value_map<pvrequest_t, MonitorCacheEntry> mon_entries_t;
mon_entries_t mon_entries;
ChannelCacheEntry(ChannelCache*, const std::string& n);
virtual ~ChannelCacheEntry();
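
weak_set and weak_value_map come from the new weakset.h/weakmap.h headers. A simplified sketch of the weak_set idea (hypothetical; the real header differs in detail):

#include <map>
#include <vector>
#include <tr1/memory>

// Holds only weak refs, so membership alone never keeps a
// GWChannel/MonitorUser alive; lock_vector() snapshots the survivors.
template<typename T>
struct weak_set_sketch {
    typedef std::vector<std::tr1::shared_ptr<T> > vector_type;
    std::map<T*, std::tr1::weak_ptr<T> > store; // keyed by raw ptr for erase()

    void insert(const std::tr1::shared_ptr<T>& v) { store[v.get()] = v; }
    void erase (const std::tr1::shared_ptr<T>& v) { store.erase(v.get()); }
    size_t size() const { return store.size(); }

    vector_type lock_vector() const {
        vector_type ret;
        typename std::map<T*, std::tr1::weak_ptr<T> >::const_iterator it;
        for(it=store.begin(); it!=store.end(); ++it) {
            std::tr1::shared_ptr<T> s(it->second.lock());
            if(s) ret.push_back(s); // skip expired entries
        }
        return ret;
    }
};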

p2pApp/channel.cpp

@@ -1,25 +1,27 @@
#include <epicsAtomic.h>
#define epicsExportSharedSymbols
#include "helper.h"
#include "pva2pva.h"
#include "channel.h"
namespace pva = epics::pvAccess;
namespace pvd = epics::pvData;
size_t GWChannel::num_instances;
GWChannel::GWChannel(ChannelCacheEntry::shared_pointer e,
epics::pvAccess::ChannelRequester::shared_pointer r)
pva::ChannelRequester::shared_pointer r)
:entry(e)
,requester(r)
{
Guard G(entry->cache->cacheLock);
entry->interested.insert(this);
epicsAtomicIncrSizeT(&num_instances);
}
GWChannel::~GWChannel()
{
Guard G(entry->cache->cacheLock);
entry->interested.erase(this);
std::cout<<"GWChannel dtor '"<<entry->channelName<<"'\n";
epicsAtomicDecrSizeT(&num_instances);
}
std::string
@@ -34,7 +36,26 @@ GWChannel::message(std::string const & message, pvd::MessageType messageType)
std::cout<<"message to client about '"<<entry->channelName<<"' : "<<message<<"\n";
}
std::tr1::shared_ptr<epics::pvAccess::ChannelProvider>
void
GWChannel::destroy()
{
std::cout<<__PRETTY_FUNCTION__<<"\n";
// Client closes channel. Release our references,
// won't
shared_pointer self(weakref);
{
Guard G(entry->cache->cacheLock);
// remove ourselves before releasing our reference to prevent "stale" pointers.
// Poke the cache so that this channel is held open for a while longer
// in case this client reconnects shortly.
entry->dropPoke = true;
entry->interested.erase(self);
}
requester.reset();
entry.reset();
}
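
The weakref member replaces enable_shared_from_this (see channel.h below); the pattern in isolation (a sketch, with the seeding done as in server.cpp later in this commit):

// The owner seeds the self-reference right after construction:
GWChannel::shared_pointer ret(new GWChannel(entry, requester));
ret->weakref = ret; // must run while a strong ref exists

// destroy() can then recover a strong ref to itself; constructing a
// shared_ptr from an expired weak_ptr throws std::tr1::bad_weak_ptr.
GWChannel::shared_pointer self(ret->weakref);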
std::tr1::shared_ptr<pva::ChannelProvider>
GWChannel::getProvider()
{
return entry->cache->server.lock();
@@ -43,7 +64,8 @@ GWChannel::getProvider()
std::string
GWChannel::getRemoteAddress()
{
return "foobar";
// pass through address of origin server (information leak?)
return entry->channel->getRemoteAddress();
}
pva::Channel::ConnectionState
@@ -58,7 +80,7 @@ GWChannel::getChannelName()
return entry->channelName;
}
std::tr1::shared_ptr<epics::pvAccess::ChannelRequester>
std::tr1::shared_ptr<pva::ChannelRequester>
GWChannel::getChannelRequester()
{
return requester;
@@ -72,73 +94,137 @@ GWChannel::isConnected()
void
GWChannel::getField(epics::pvAccess::GetFieldRequester::shared_pointer const & requester,
GWChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,
std::string const & subField)
{
//TODO: cache for top level field?
std::cout<<"getField for "<<entry->channelName<<" "<<subField<<"\n";
std::cout<<"getField for "<<entry->channelName<<" '"<<subField<<"'\n";
entry->channel->getField(requester, subField);
}
epics::pvAccess::AccessRights
GWChannel::getAccessRights(epics::pvData::PVField::shared_pointer const & pvField)
pva::AccessRights
GWChannel::getAccessRights(pvd::PVField::shared_pointer const & pvField)
{
return entry->channel->getAccessRights(pvField);
}
epics::pvAccess::ChannelProcess::shared_pointer
pva::ChannelProcess::shared_pointer
GWChannel::createChannelProcess(
epics::pvAccess::ChannelProcessRequester::shared_pointer const & channelProcessRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
pva::ChannelProcessRequester::shared_pointer const & channelProcessRequester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelProcess(channelProcessRequester, pvRequest);
}
epics::pvAccess::ChannelGet::shared_pointer
pva::ChannelGet::shared_pointer
GWChannel::createChannelGet(
epics::pvAccess::ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
pva::ChannelGetRequester::shared_pointer const & channelGetRequester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelGet(channelGetRequester, pvRequest);
}
epics::pvAccess::ChannelPut::shared_pointer
pva::ChannelPut::shared_pointer
GWChannel::createChannelPut(
epics::pvAccess::ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
pva::ChannelPutRequester::shared_pointer const & channelPutRequester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelPut(channelPutRequester, pvRequest);
}
epics::pvAccess::ChannelPutGet::shared_pointer
pva::ChannelPutGet::shared_pointer
GWChannel::createChannelPutGet(
epics::pvAccess::ChannelPutGetRequester::shared_pointer const & channelPutGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
pva::ChannelPutGetRequester::shared_pointer const & channelPutGetRequester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelPutGet(channelPutGetRequester, pvRequest);
}
epics::pvAccess::ChannelRPC::shared_pointer
pva::ChannelRPC::shared_pointer
GWChannel::createChannelRPC(
epics::pvAccess::ChannelRPCRequester::shared_pointer const & channelRPCRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
pva::ChannelRPCRequester::shared_pointer const & channelRPCRequester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelRPC(channelRPCRequester, pvRequest);
}
epics::pvData::Monitor::shared_pointer
GWChannel::createMonitor(
epics::pvData::MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
//TODO de-dup monitors
return entry->channel->createMonitor(monitorRequester, pvRequest);
namespace {
struct noclean {
void operator()(MonitorCacheEntry *) {}
};
}
epics::pvAccess::ChannelArray::shared_pointer
pvd::Monitor::shared_pointer
GWChannel::createMonitor(
pvd::MonitorRequester::shared_pointer const & monitorRequester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
std::cout<<__PRETTY_FUNCTION__<<"\n";
ChannelCacheEntry::pvrequest_t ser;
// serialize request struct to a byte vector using host byte order (only used for local comparison)
pvd::serializeToVector(pvRequest.get(), EPICS_BYTE_ORDER, ser);
MonitorCacheEntry::shared_pointer ent;
MonitorUser::shared_pointer mon;
pvd::Status startresult;
pvd::StructureConstPtr typedesc;
try {
Guard G(entry->cache->cacheLock);
ent = entry->mon_entries.find(ser);
if(!ent) {
ent.reset(new MonitorCacheEntry(entry.get()));
entry->mon_entries[ser] = ent;
// Create upstream monitor
// This would create a strong ref. loop between ent and ent->mon.
// Instead we get clever and pass a "fake" strong ref (a no-op deleter),
// relying on ent out-living the upstream monitor's reference to it.
MonitorCacheEntry::shared_pointer fakereal(ent.get(), noclean());
ent->mon = entry->channel->createMonitor(fakereal, pvRequest);
ent->key.swap(ser); // no copy
std::cout<<"Monitor cache "<<entry->channelName<<" Miss\n";
} else {
std::cout<<"Monitor cache "<<entry->channelName<<" Hit\n";
}
mon.reset(new MonitorUser(ent));
ent->interested.insert(mon);
mon->weakref = mon;
mon->req = monitorRequester;
typedesc = ent->typedesc;
startresult = ent->startresult;
} catch(std::exception& e) {
mon.reset();
std::cerr<<"Exception in "<<__PRETTY_FUNCTION__<<"\n"
"is "<<e.what()<<"\n";
pvd::Status oops(pvd::Status::STATUSTYPE_FATAL, "Error during GWChannel setup");
startresult = oops;
monitorRequester->monitorConnect(oops, mon, typedesc);
return mon;
}
// unlock for callback
if(typedesc || !startresult.isSuccess()) {
// upstream monitor already connected, or never will be,
// so connect immediately
monitorRequester->monitorConnect(pvd::Status::Ok, mon, typedesc);
}
return mon;
}
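
Two details of createMonitor() deserve a standalone sketch: the cache key is just the serialized pvRequest bytes, and the no-op deleter breaks what would otherwise be an ent -> mon -> ent strong-reference loop (reqA/reqB are hypothetical):

// 1. Identical pvRequest structures serialize to identical byte
//    vectors, so pvrequest_t (vector<epicsUInt8>) works as a map key.
ChannelCacheEntry::pvrequest_t keyA, keyB;
pvd::serializeToVector(reqA.get(), EPICS_BYTE_ORDER, keyA);
pvd::serializeToVector(reqB.get(), EPICS_BYTE_ORDER, keyB);
bool sameUpstreamMonitor = (keyA==keyB);

// 2. A shared_ptr built with a no-op deleter owns nothing: passing it
//    upstream avoids the ref. loop, but 'ent' must out-live every copy
//    the upstream provider keeps.
MonitorCacheEntry::shared_pointer fakereal(ent.get(), noclean());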
pva::ChannelArray::shared_pointer
GWChannel::createChannelArray(
epics::pvAccess::ChannelArrayRequester::shared_pointer const & channelArrayRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
pva::ChannelArrayRequester::shared_pointer const & channelArrayRequester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
return entry->channel->createChannelArray(channelArrayRequester, pvRequest);
}

p2pApp/channel.h

@@ -5,9 +5,12 @@
#include "chancache.h"
struct GWChannel : public epics::pvAccess::Channel,
std::tr1::enable_shared_from_this<GWChannel>
struct GWChannel : public epics::pvAccess::Channel
{
POINTER_DEFINITIONS(GWChannel);
static size_t num_instances;
weak_pointer weakref;
ChannelCacheEntry::shared_pointer entry;
epics::pvAccess::ChannelRequester::shared_pointer requester;
@@ -21,7 +24,7 @@ struct GWChannel : public epics::pvAccess::Channel,
virtual void message(std::string const & message, epics::pvData::MessageType messageType);
// for Destroyable
virtual void destroy(){}
virtual void destroy();
// for Channel
virtual std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> getProvider();

p2pApp/helper.h (new file, 16 lines)

@@ -0,0 +1,16 @@
#ifndef HELPER_H
#define HELPER_H
#if __cplusplus>=201103L
# define AUTO_VAL(NAME,VAL) auto NAME = VAL
# define AUTO_REF(NAME,VAL) auto& NAME = VAL
# define FOREACH(IT,END,C) for(auto IT=(C).begin(), END=(C).end(); IT!=END; ++IT)
#elif defined(__GNUC__)
# define AUTO_VAL(NAME,VAL) __typeof__(VAL) NAME(VAL)
# define AUTO_REF(NAME,VAL) __typeof__(VAL)& NAME(VAL)
# define FOREACH(IT,END,C) for(__typeof__((C).begin()) IT=(C).begin(), END=(C).end(); IT!=END; ++IT)
#else
# error Require C++11 or G++
#endif
#endif // HELPER_H
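
A hypothetical usage of these helpers:

#include <map>
#include <string>
#include <iostream>
#include "helper.h"

void demo(std::map<std::string,int>& m)
{
    AUTO_VAL(n, m.size()); // size_t n = m.size();
    AUTO_REF(r, m);        // std::map<std::string,int>& r = m;
    FOREACH(it, end, r)    // iterate without spelling the iterator type
        std::cout<<it->first<<" = "<<it->second<<" ("<<n<<" total)\n";
}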

p2pApp/moncache.cpp (new file, 273 lines)

@@ -0,0 +1,273 @@
#include <epicsAtomic.h>
#include "helper.h"
#include "pva2pva.h"
#include "chancache.h"
namespace pva = epics::pvAccess;
namespace pvd = epics::pvData;
size_t MonitorCacheEntry::num_instances;
size_t MonitorUser::num_instances;
MonitorCacheEntry::MonitorCacheEntry(ChannelCacheEntry *ent)
:chan(ent)
{
epicsAtomicIncrSizeT(&num_instances);
}
MonitorCacheEntry::~MonitorCacheEntry()
{
pvd::Monitor::shared_pointer M;
M.swap(mon);
if(M) {
M->destroy();
std::cout<<__PRETTY_FUNCTION__<<" destroy client monitor\n";
}
epicsAtomicDecrSizeT(&num_instances);
}
void
MonitorCacheEntry::monitorConnect(pvd::Status const & status,
pvd::MonitorPtr const & monitor,
pvd::StructureConstPtr const & structure)
{
printf("Called %s %p %p\n", __PRETTY_FUNCTION__, monitor.get(), mon.get());
assert(monitor==mon);
interested_t::vector_type tonotify;
{
Guard G(chan->cache->cacheLock);
typedesc = structure;
if(status.isSuccess()) {
startresult = monitor->start();
} else {
startresult = status;
}
// set typedesc and startresult for future MonitorUsers
// and copy snapshot of already interested MonitorUsers
tonotify = interested.lock_vector();
}
if(!startresult.isSuccess()) {
std::cout<<"upstream monitor start() fails\n";
} else {
std::cout<<"upstream monitor start() begins\n";
}
for(interested_t::vector_type::const_iterator it = tonotify.begin(),
end = tonotify.end(); it!=end; ++it)
{
pvd::MonitorRequester::shared_pointer req((*it)->req);
if(req) {
req->monitorConnect(startresult, *it, structure);
} else {
std::cout<<"Dead requester in monitorConnect()\n";
}
}
}
// notification from upstream client that its monitor queue has
// become not empty (transition from empty to not empty).
// Will not be called again unless we completely empty the queue.
// If we don't, then it is our responsibility to schedule more poll()s.
void
MonitorCacheEntry::monitorEvent(pvd::MonitorPtr const & monitor)
{
printf("Called %s %p %p\n", __PRETTY_FUNCTION__, monitor.get(), mon.get());
/* PVA is being tricky, the Monitor* passed to monitorConnect()
* isn't the same one we see here!
* The original was a ChannelMonitorImpl, we now see a MonitorStrategyQueue
* owned by the original, which delegates deserialization and accumulation
* of deltas into complete events for us.
*/
assert(monitor==mon || !lastval);
if(!lastval)
mon = monitor;
//TODO: dequeue and requeue strategy code goes here
pvd::MonitorElementPtr update;
while((update=mon->poll()))
{
lastval = update->pvStructurePtr;
std::cout<<" Poll event\n";
AUTO_VAL(tonotify, interested.lock_vector()); // TODO: avoid copy, iterate w/ lock
FOREACH(it, end, tonotify)
{
MonitorUser *usr = it->get();
pvd::MonitorRequester::shared_pointer req(usr->req);
{
Guard G(chan->cache->cacheLock); // TODO: more granular lock
if(!usr->running || usr->empty.empty())
continue;
pvd::MonitorElementPtr elem(usr->empty.front());
elem->pvStructurePtr = update->pvStructurePtr;
elem->overrunBitSet = update->overrunBitSet;
elem->changedBitSet = update->changedBitSet;
usr->filled.push_back(elem);
usr->empty.pop_front();
std::cout<<" Dispatch to "<<usr<<" "
<<usr->filled.size()<<"/"<<usr->empty.size()<<" "<<(int)usr->running<<"\n";
}
if(usr->filled.size()==1)
req->monitorEvent(*it); // notify when first item added to empty queue
}
mon->release(update);
}
}
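
For orientation, the downstream element cycle that monitorEvent() feeds, as seen by a client (a sketch; 'user' is hypothetical):

// Elements migrate between MonitorUser's containers:
//   empty --monitorEvent()--> filled --poll()--> inuse --release()--> empty
// req->monitorEvent(*it) above fires only on the 0 -> 1 transition of
// 'filled', matching the edge-triggered contract described earlier.
pvd::MonitorElementPtr e;
while((e = user->poll())) {                     // filled -> inuse
    // consume e->pvStructurePtr / e->changedBitSet here
    user->release(e);                           // inuse  -> empty
}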
// notification from upstream client that no more monitor updates will come, ever
void
MonitorCacheEntry::unlisten(pvd::MonitorPtr const & monitor)
{
printf("Called %s\n", __PRETTY_FUNCTION__);
pvd::Monitor::shared_pointer M;
M.swap(mon);
if(M) {
M->destroy();
std::cout<<__PRETTY_FUNCTION__<<" destroy client monitor\n";
}
}
std::string
MonitorCacheEntry::getRequesterName()
{
return "MonitorCacheEntry";
}
void
MonitorCacheEntry::message(std::string const & message, pvd::MessageType messageType)
{
std::cout<<"message to Monitor cache entry about '"<<chan->channelName<<"' : "<<message<<"\n";
}
MonitorUser::MonitorUser(const MonitorCacheEntry::shared_pointer &e)
:entry(e)
,running(false)
{
epicsAtomicIncrSizeT(&num_instances);
}
MonitorUser::~MonitorUser()
{
epicsAtomicDecrSizeT(&num_instances);
}
// downstream server closes monitor
void
MonitorUser::destroy()
{
printf("Called %s\n", __PRETTY_FUNCTION__);
Guard G(entry->chan->cache->cacheLock);
running = false;
req.reset();
}
pvd::Status
MonitorUser::start()
{
printf("Called %s\n", __PRETTY_FUNCTION__);
pvd::MonitorRequester::shared_pointer req;
bool doEvt = false;
{
Guard G(entry->chan->cache->cacheLock);
req = this->req.lock();
if(!req)
return pvd::Status(pvd::Status::STATUSTYPE_FATAL, "already dead");
if(entry->startresult.isSuccess())
running = true;
//TODO: control queue size
empty.resize(4);
pvd::PVDataCreatePtr fact(pvd::getPVDataCreate());
for(unsigned i=0; i<empty.size(); i++) {
empty[i].reset(new pvd::MonitorElement(fact->createPVStructure(entry->typedesc)));
}
if(entry->lastval) {
//already running, notify of initial element
assert(!empty.empty());
pvd::MonitorElementPtr elem(empty.front());
elem->pvStructurePtr = entry->lastval;
elem->changedBitSet->set(0); // indicate all changed?
filled.push_back(elem);
empty.pop_front();
doEvt = true;
}
}
if(doEvt)
req->monitorEvent(weakref.lock());
return pvd::Status();
}
pvd::Status
MonitorUser::stop()
{
printf("Called %s\n", __PRETTY_FUNCTION__);
Guard G(entry->chan->cache->cacheLock);
running = false;
return pvd::Status::Ok;
}
pvd::MonitorElementPtr
MonitorUser::poll()
{
printf("Called %s\n", __PRETTY_FUNCTION__);
Guard G(entry->chan->cache->cacheLock);
pvd::MonitorElementPtr ret;
std::cout<<" Poll "<<this<<" "<<filled.size()<<"/"<<empty.size()<<"\n";
if(!filled.empty()) {
ret = filled.front();
inuse.insert(ret); // track which ones are out for client use
filled.pop_front();
//TODO: track lost buffers w/ wrapped shared_ptr?
}
std::cout<<" yields "<<this<<" "<<ret<<"\n";
return ret;
}
void
MonitorUser::release(pvd::MonitorElementPtr const & monitorElement)
{
printf("Called %s\n", __PRETTY_FUNCTION__);
std::cout<<" Release "<<this<<" "<<monitorElement<<"\n";
Guard G(entry->chan->cache->cacheLock);
//TODO: ifdef DEBUG? (only track inuse when debugging?)
std::set<epics::pvData::MonitorElementPtr>::iterator it = inuse.find(monitorElement);
if(it!=inuse.end()) {
empty.push_back(monitorElement);
inuse.erase(it);
} else {
// oh no, we've been given an element we weren't expecting
//TODO: check empty and filled lists to see if this is one of ours, or from somewhere else
throw std::invalid_argument("Can't release MonitorElement not in use");
}
}
std::string
MonitorUser::getRequesterName()
{
return "MonitorCacheEntry";
}
void
MonitorUser::message(std::string const & message, pvd::MessageType messageType)
{
std::cout<<"message to Monitor cache client : "<<message<<"\n";
}

p2pApp/server.cpp

@@ -1,10 +1,12 @@
#include <stdio.h>
#include <epicsAtomic.h>
#include <pv/epicsException.h>
#include <pv/serverContext.h>
#define epicsExportSharedSymbols
#include "helper.h"
#include "pva2pva.h"
#include "iocshelper.h"
#include "chancache.h"
@@ -55,9 +57,8 @@ struct GWServerChannelProvider : public
std::string newName;
// rewrite name
newName.reserve(channelName.size());
newName = "y";
newName.append(channelName.begin()+1, channelName.end());
newName = channelName;
newName[0] = 'y';
Guard G(cache.cacheLock);
@@ -112,16 +113,15 @@ struct GWServerChannelProvider : public
pva::ChannelRequester::shared_pointer const & channelRequester,
short priority, std::string const & address)
{
pva::Channel::shared_pointer ret;
GWChannel::shared_pointer ret;
std::string newName;
if(!channelName.empty() && channelName[0]=='x')
{
// rewrite name
newName.reserve(channelName.size());
newName = "y";
newName.append(channelName.begin()+1, channelName.end());
newName = channelName;
newName[0] = 'y';
Guard G(cache.cacheLock);
@@ -129,6 +129,8 @@ struct GWServerChannelProvider : public
if(it!=cache.entries.end() && it->second->channel->isConnected())
{
ret.reset(new GWChannel(it->second, channelRequester));
it->second->interested.insert(ret);
ret->weakref = ret;
}
}
@@ -141,7 +143,7 @@ struct GWServerChannelProvider : public
channelRequester->channelCreated(pvd::Status::Ok, ret);
}
return ret;
return ret; // ignored by caller
}
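
The rewrite above maps a requested 'x…' channel name onto the upstream 'y…' name by copy-and-overwrite instead of reserve-and-append; for example (demo only):

std::string channelName("xfoo"), newName;
if(!channelName.empty() && channelName[0]=='x') {
    newName = channelName; // copy...
    newName[0] = 'y';      // ...then overwrite: "xfoo" -> "yfoo"
}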
virtual void configure(epics::pvData::PVStructure::shared_pointer /*configuration*/) {
@@ -344,6 +346,78 @@ void dropChannel(const char *chan)
}
}
void show_cnt(const char *name, const size_t& live, const size_t& reach) {
ptrdiff_t delta = ptrdiff_t(live)-ptrdiff_t(reach);
std::cout<<name<<" live: "<<live
<<" reachable: "<<reach
<<" delta: "<<delta<<"\n";
}
void refCheck(int lvl)
{
try{
std::cout<<"GW instances reference counts.\n"
"Note: small inconsistencies (positive and negative) are normal due to locking.\n"
" Actual leaks will manifest as a sustained increases.\n";
pva::ServerContextImpl::shared_pointer ctx(gblctx);
if(!ctx) {
std::cout<<"Not running\n";
return;
}
const AUTO_REF(prov, ctx->getChannelProviders());
size_t chan_count = 0, mon_count = 0, mon_user_count = 0;
if(lvl>0) std::cout<<"Server has "<<prov.size()<<" providers\n";
for(size_t i=0; i<prov.size(); i++)
{
pva::ChannelProvider* p = prov[i].get();
if(!p) continue;
GWServerChannelProvider *scp = dynamic_cast<GWServerChannelProvider*>(p);
if(!scp) continue;
{
Guard G(scp->cache.cacheLock);
AUTO_REF(entries, scp->cache.entries);
if(lvl>0) std::cout<<" Cache has "<<scp->cache.entries.size()<<" channels\n";
chan_count += entries.size();
FOREACH(it, end, entries)
{
AUTO_VAL(M, it->second->mon_entries.lock_vector());
if(lvl>0) std::cout<<" Channel "<<it->second->channelName
<<" has "<<M.size()<<" Client Monitors\n";
mon_count += M.size();
FOREACH(it2, end2, M)
{
AUTO_REF(W, it2->second->interested);
if(lvl>0) std::cout<<" Used by "<<W.size()<<" Client Monitors\n";
mon_user_count += W.size();
}
}
}
}
size_t chan_latch = epicsAtomicGetSizeT(&ChannelCacheEntry::num_instances),
mon_latch = epicsAtomicGetSizeT(&MonitorCacheEntry::num_instances),
mon_user_latch = epicsAtomicGetSizeT(&MonitorUser::num_instances);
std::cout<<"GWChannel live: "<<epicsAtomicGetSizeT(&GWChannel::num_instances)<<"\n";
show_cnt("ChannelCacheEntry",chan_latch,chan_count);
show_cnt("MonitorCacheEntry",mon_latch,mon_count);
show_cnt("MonitorUser",mon_user_latch,mon_user_count);
}catch(std::exception& e){
std::cerr<<"Error: "<<e.what()<<"\n";
}
}
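
refCheck() compares 'live' (the atomic per-class instance counters) against 'reachable' (a walk of the cache under its lock). The counter half of that, isolated (a sketch):

#include <epicsAtomic.h>

struct Counted {
    static size_t num_instances;
    Counted()  { epicsAtomicIncrSizeT(&num_instances); }
    ~Counted() { epicsAtomicDecrSizeT(&num_instances); }
};
size_t Counted::num_instances;

// Read lock-free at any time; it may race with ctors/dtors, hence the
// "small inconsistencies are normal" caveat printed above.
size_t live = epicsAtomicGetSizeT(&Counted::num_instances);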
} // namespace
static
@@ -358,6 +432,7 @@ void registerGWServerIocsh()
iocshRegister<&stopServer>("gwstop");
iocshRegister<int, &statusServer>("gwstatus", "level");
iocshRegister<const char*, &dropChannel>("gwdrop", "channel");
iocshRegister<int, &refCheck>("gwref", "level");
}