p2p status reporting update
This commit is contained in:
36
loopback.conf
Normal file
36
loopback.conf
Normal file
@ -0,0 +1,36 @@
|
||||
/* Test configuration with client and server bound to localhost
|
||||
* on different ports.
|
||||
*
|
||||
* Server listens on: localhost:5076 and attempts to bind 5075
|
||||
* Client searches to: localhost:5086 and 5085
|
||||
*
|
||||
* Servers which wish to be behind the gateway should set
|
||||
* EPICS_PVAS_INTF_ADDR_LIST=127.0.0.1
|
||||
* EPICS_PVAS_SERVER_PORT=5085
|
||||
* EPICS_PVAS_BROADCAST_PORT=5086
|
||||
*
|
||||
* Clients which wish to connect through the gateway should set
|
||||
* EPICS_PVA_ADDR_LIST=127.0.0.1
|
||||
*/
|
||||
{
|
||||
"version":1,
|
||||
"clients":[
|
||||
{
|
||||
"name":"theclient",
|
||||
"provider":"pva",
|
||||
"addrlist":"127.0.0.1 127.255.255.255",
|
||||
"autoaddrlist":false,
|
||||
"serverport":5085,
|
||||
"bcastport":5086
|
||||
}
|
||||
],
|
||||
"servers":[
|
||||
{
|
||||
"name":"theserver",
|
||||
"clients":["theclient"],
|
||||
"interface":"127.0.0.1",
|
||||
"serverport":5075,
|
||||
"bcastport":5076
|
||||
}
|
||||
]
|
||||
}
|
@ -13,7 +13,6 @@ p2p_LIBS += p2pcore pvAccessIOC pvAccess pvData Com
|
||||
LIBRARY_HOST += p2pcore
|
||||
|
||||
p2pcore_SRCS += server.cpp
|
||||
p2pcore_SRCS += client.cpp
|
||||
p2pcore_SRCS += chancache.cpp
|
||||
p2pcore_SRCS += moncache.cpp
|
||||
p2pcore_SRCS += channel.cpp
|
||||
|
@ -158,25 +158,16 @@ ChannelCache::ChannelCache(const pva::ChannelProvider::shared_pointer& prov)
|
||||
|
||||
ChannelCache::~ChannelCache()
|
||||
{
|
||||
Guard G(cacheLock);
|
||||
|
||||
cleanTimer->destroy();
|
||||
timerQueue->release();
|
||||
delete cleaner;
|
||||
|
||||
entries_t E;
|
||||
E.swap(entries);
|
||||
|
||||
FOREACH(entries_t::iterator, it, end, E)
|
||||
{
|
||||
ChannelCacheEntry *ent = it->second.get();
|
||||
Guard G(cacheLock);
|
||||
|
||||
if(ent->channel) {
|
||||
epics::pvAccess::Channel::shared_pointer chan;
|
||||
chan.swap(ent->channel);
|
||||
UnGuard U(G);
|
||||
chan->destroy();
|
||||
}
|
||||
cleanTimer->destroy();
|
||||
timerQueue->release();
|
||||
delete cleaner;
|
||||
|
||||
entries_t E;
|
||||
E.swap(entries);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,10 @@
|
||||
|
||||
#include <epicsAtomic.h>
|
||||
|
||||
#include <pv/iocshelper.h>
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include "helper.h"
|
||||
#include "iocshelper.h"
|
||||
#include "pva2pva.h"
|
||||
#include "channel.h"
|
||||
|
||||
@ -245,5 +246,5 @@ GWChannel::printInfo(std::ostream& out)
|
||||
|
||||
void registerReadOnly()
|
||||
{
|
||||
iocshVariable<int, &p2pReadOnly>("p2pReadOnly");
|
||||
epics::iocshVariable<int, &p2pReadOnly>("p2pReadOnly");
|
||||
}
|
||||
|
@ -1,25 +0,0 @@
|
||||
|
||||
#include <epicsGuard.h>
|
||||
|
||||
#include <pv/sharedPtr.h>
|
||||
#include <pv/pvAccess.h>
|
||||
#include <pv/clientFactory.h>
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include "pva2pva.h"
|
||||
#include "chancache.h"
|
||||
|
||||
namespace pvd = epics::pvData;
|
||||
namespace pva = epics::pvAccess;
|
||||
|
||||
|
||||
void registerGWClientIocsh()
|
||||
{
|
||||
pva::ClientFactory::start();
|
||||
|
||||
}
|
||||
|
||||
void gwClientShutdown()
|
||||
{
|
||||
pva::ClientFactory::stop();
|
||||
}
|
@ -19,7 +19,9 @@
|
||||
#include <pv/clientFactory.h>
|
||||
#include <pv/configuration.h>
|
||||
#include <pv/serverContext.h>
|
||||
#include <pv/reftrack.h>
|
||||
#include <pv/iocreftrack.h>
|
||||
#include <pv/iocshelper.h>
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include "server.h"
|
||||
@ -41,7 +43,7 @@ pvd::StructureConstPtr schema(pvd::getFieldCreate()->createFieldBuilder()
|
||||
->endNested()
|
||||
->addNestedStructureArray("servers")
|
||||
->add("name", pvd::pvString)
|
||||
->add("client", pvd::pvString)
|
||||
->addArray("clients", pvd::pvString)
|
||||
->add("interface", pvd::pvString)
|
||||
->add("serverport", pvd::pvUShort)
|
||||
->add("bcastport", pvd::pvUShort)
|
||||
@ -144,13 +146,21 @@ pva::ServerContext::shared_pointer configure_server(ServerConfig& arg, const pvd
|
||||
.push_map()
|
||||
.build());
|
||||
|
||||
ServerConfig::clients_t::const_iterator it(arg.clients.find(conf->getSubFieldT<pvd::PVString>("client")->get()));
|
||||
if(it==arg.clients.end())
|
||||
throw std::runtime_error("Server references non-existant client");
|
||||
pvd::PVStringArray::shared_pointer clients(conf->getSubFieldT<pvd::PVStringArray>("clients"));
|
||||
pvd::PVStringArray::const_svector names(clients->view());
|
||||
std::vector<pva::ChannelProvider::shared_pointer> providers;
|
||||
|
||||
for(pvd::PVStringArray::const_svector::const_iterator it(names.begin()), end(names.end()); it!=end; ++it)
|
||||
{
|
||||
ServerConfig::clients_t::const_iterator it2(arg.clients.find(*it));
|
||||
if(it2==arg.clients.end())
|
||||
throw std::runtime_error("Server references non-existant client");
|
||||
providers.push_back(it2->second);
|
||||
}
|
||||
|
||||
pva::ServerContext::shared_pointer ret(pva::ServerContext::create(pva::ServerContext::Config()
|
||||
.config(C)
|
||||
.provider(it->second)));
|
||||
.providers(providers)));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -166,13 +176,60 @@ void sigdone(int num)
|
||||
}
|
||||
#endif
|
||||
|
||||
ServerConfig* volatile theserver;
|
||||
|
||||
void iocsh_drop(const char *client, const char *channel)
|
||||
{
|
||||
if(!theserver)
|
||||
return;
|
||||
try {
|
||||
theserver->drop(client, channel);
|
||||
}catch(std::exception& e){
|
||||
std::cout<<"Error: "<<e.what()<<"\n";
|
||||
}
|
||||
}
|
||||
|
||||
void gwsr(int lvl, const char *server)
|
||||
{
|
||||
if(!theserver)
|
||||
return;
|
||||
try {
|
||||
theserver->status_server(lvl, server);
|
||||
}catch(std::exception& e){
|
||||
std::cout<<"Error: "<<e.what()<<"\n";
|
||||
}
|
||||
}
|
||||
|
||||
void gwcr(int lvl, const char *client, const char *channel)
|
||||
{
|
||||
if(!theserver)
|
||||
return;
|
||||
try {
|
||||
theserver->status_client(lvl, client, channel);
|
||||
}catch(std::exception& e){
|
||||
std::cout<<"Error: "<<e.what()<<"\n";
|
||||
}
|
||||
}
|
||||
|
||||
}// namespace
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
try {
|
||||
pva::refTrackRegistrar();
|
||||
|
||||
epics::iocshRegister<const char*, const char*, &iocsh_drop>("drop", "client", "channel");
|
||||
epics::iocshRegister<int, const char*, &gwsr>("gwsr", "level", "channel");
|
||||
epics::iocshRegister<int, const char*, const char*, &gwcr>("gwcr", "level", "client", "channel");
|
||||
|
||||
epics::registerRefCounter("ChannelCacheEntry", &ChannelCacheEntry::num_instances);
|
||||
epics::registerRefCounter("ChannelCacheEntry::CRequester", &ChannelCacheEntry::CRequester::num_instances);
|
||||
epics::registerRefCounter("GWChannel", &GWChannel::num_instances);
|
||||
epics::registerRefCounter("MonitorCacheEntry", &MonitorCacheEntry::num_instances);
|
||||
epics::registerRefCounter("MonitorUser", &MonitorUser::num_instances);
|
||||
|
||||
ServerConfig arg;
|
||||
theserver = &arg;
|
||||
getargs(arg, argc, argv);
|
||||
|
||||
pva::ClientFactory::start();
|
||||
@ -228,6 +285,8 @@ int main(int argc, char *argv[])
|
||||
}
|
||||
}
|
||||
|
||||
theserver = 0;
|
||||
|
||||
return ret;
|
||||
}catch(std::exception& e){
|
||||
std::cerr<<"Fatal Error : "<<e.what()<<"\n";
|
||||
|
@ -1,132 +0,0 @@
|
||||
#ifndef IOCSHELPER_H
|
||||
#define IOCSHELPER_H
|
||||
|
||||
#include <string>
|
||||
|
||||
#include <iocsh.h>
|
||||
|
||||
namespace detail {
|
||||
|
||||
template<typename T>
|
||||
struct getarg {};
|
||||
template<> struct getarg<int> {
|
||||
static int op(const iocshArgBuf& a) { return a.ival; }
|
||||
enum { argtype = iocshArgInt };
|
||||
};
|
||||
template<> struct getarg<double> {
|
||||
static double op(const iocshArgBuf& a) { return a.dval; }
|
||||
enum { argtype = iocshArgDouble };
|
||||
};
|
||||
template<> struct getarg<char*> {
|
||||
static char* op(const iocshArgBuf& a) { return a.sval; }
|
||||
enum { argtype = iocshArgString };
|
||||
};
|
||||
template<> struct getarg<const char*> {
|
||||
static const char* op(const iocshArgBuf& a) { return a.sval; }
|
||||
enum { argtype = iocshArgString };
|
||||
};
|
||||
|
||||
|
||||
template<int N>
|
||||
struct iocshFuncInfo{
|
||||
iocshFuncDef def;
|
||||
std::string name;
|
||||
iocshArg *argarr[N];
|
||||
iocshArg args[N];
|
||||
std::string argnames[N];
|
||||
iocshFuncInfo(const std::string& n) :name(n) {
|
||||
def.name = name.c_str();
|
||||
def.nargs = N;
|
||||
def.arg = (iocshArg**)&argarr;
|
||||
for(size_t i=0; i<N; i++)
|
||||
argarr[i] = &args[i];
|
||||
}
|
||||
template<int n, typename T>
|
||||
void set(const char *name) {
|
||||
argnames[n] = name;
|
||||
args[n].name = argnames[n].c_str();
|
||||
args[n].type = (iocshArgType)detail::getarg<T>::argtype;
|
||||
}
|
||||
};
|
||||
|
||||
template<void (*fn)()>
|
||||
static void call0(const iocshArgBuf *args)
|
||||
{
|
||||
fn();
|
||||
}
|
||||
|
||||
template<typename A, void (*fn)(A)>
|
||||
static void call1(const iocshArgBuf *args)
|
||||
{
|
||||
fn(getarg<A>::op(args[0]));
|
||||
}
|
||||
|
||||
template<typename A, typename B, void (*fn)(A,B)>
|
||||
static void call2(const iocshArgBuf *args)
|
||||
{
|
||||
fn(getarg<A>::op(args[0]),
|
||||
getarg<B>::op(args[1]));
|
||||
}
|
||||
|
||||
template<typename A, typename B, typename C, void (*fn)(A,B,C)>
|
||||
static void call3(const iocshArgBuf *args)
|
||||
{
|
||||
fn(getarg<A>::op(args[0]),
|
||||
getarg<B>::op(args[1]),
|
||||
getarg<C>::op(args[2]));
|
||||
}
|
||||
|
||||
} // namespace detail
|
||||
|
||||
|
||||
template<void (*fn)()>
|
||||
void iocshRegister(const char *name)
|
||||
{
|
||||
static detail::iocshFuncInfo<0> info(name);
|
||||
iocshRegister(&info.def, &detail::call0<fn>);
|
||||
}
|
||||
|
||||
template<typename A, void (*fn)(A)>
|
||||
void iocshRegister(const char *name, const char *arg1name)
|
||||
{
|
||||
static detail::iocshFuncInfo<1> info(name);
|
||||
info.set<0,A>(arg1name);
|
||||
iocshRegister(&info.def, &detail::call1<A, fn>);
|
||||
}
|
||||
|
||||
template<typename A, typename B, void (*fn)(A,B)>
|
||||
void iocshRegister(const char *name,
|
||||
const char *arg1name,
|
||||
const char *arg2name)
|
||||
{
|
||||
static detail::iocshFuncInfo<2> info(name);
|
||||
info.set<0,A>(arg1name);
|
||||
info.set<1,B>(arg2name);
|
||||
iocshRegister(&info.def, &detail::call2<A, B, fn>);
|
||||
}
|
||||
|
||||
template<typename A, typename B, typename C, void (*fn)(A,B,C)>
|
||||
void iocshRegister(const char *name,
|
||||
const char *arg1name,
|
||||
const char *arg2name,
|
||||
const char *arg3name)
|
||||
{
|
||||
static detail::iocshFuncInfo<3> info(name);
|
||||
info.set<0,A>(arg1name);
|
||||
info.set<1,B>(arg2name);
|
||||
info.set<2,C>(arg3name);
|
||||
iocshRegister(&info.def, &detail::call3<A, B, C, fn>);
|
||||
}
|
||||
|
||||
template<typename V, V* addr>
|
||||
void iocshVariable(const char *name)
|
||||
{
|
||||
static iocshVarDef def[2];
|
||||
def[0].name = name;
|
||||
def[0].pval = (void*)addr;
|
||||
def[0].type = (iocshArgType)detail::getarg<V>::argtype;
|
||||
def[1].name = NULL;
|
||||
iocshRegisterVariable(def);
|
||||
}
|
||||
|
||||
#endif // IOCSHELPER_H
|
@ -10,7 +10,6 @@
|
||||
typedef epicsGuard<epicsMutex> Guard;
|
||||
typedef epicsGuardRelease<epicsMutex> UnGuard;
|
||||
|
||||
epicsShareExtern void registerGWServerIocsh();
|
||||
epicsShareExtern void registerGWClientIocsh();
|
||||
epicsShareExtern void gwServerShutdown();
|
||||
epicsShareExtern void gwClientShutdown();
|
||||
|
@ -7,11 +7,9 @@
|
||||
#include <pv/epicsException.h>
|
||||
#include <pv/serverContext.h>
|
||||
#include <pv/logger.h>
|
||||
#include <pv/reftrack.h>
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include "helper.h"
|
||||
#include "iocshelper.h"
|
||||
#include "pva2pva.h"
|
||||
#include "server.h"
|
||||
|
||||
@ -120,327 +118,218 @@ GWServerChannelProvider::~GWServerChannelProvider()
|
||||
std::cout<<"GW Server dtor\n";
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
static epicsMutex gbllock;
|
||||
static std::tr1::shared_ptr<pva::ServerContext> gblctx;
|
||||
|
||||
void startServer()
|
||||
void ServerConfig::drop(const char *client, const char *channel)
|
||||
{
|
||||
try {
|
||||
Guard G(gbllock);
|
||||
if(gblctx) {
|
||||
printf("Already started\n");
|
||||
return;
|
||||
}
|
||||
if(!client)
|
||||
client= "";
|
||||
if(!channel)
|
||||
channel = "";
|
||||
// TODO: channel glob match
|
||||
|
||||
pva::ChannelProvider::shared_pointer client(pva::ChannelProviderRegistry::clients()->getProvider("pva"));
|
||||
GWServerChannelProvider::shared_pointer server(new GWServerChannelProvider(client));
|
||||
FOREACH(clients_t::const_iterator, it, end, clients)
|
||||
{
|
||||
if(client[0]!='\0' && client[0]!='*' && it->first!=client)
|
||||
continue;
|
||||
|
||||
gblctx = pva::ServerContext::create(pva::ServerContext::Config()
|
||||
.provider(server)
|
||||
.config(pva::ConfigurationBuilder()
|
||||
.push_env()
|
||||
.build()));
|
||||
}catch(std::exception& e){
|
||||
printf("Error: %s\n", e.what());
|
||||
}
|
||||
}
|
||||
const GWServerChannelProvider::shared_pointer& prov(it->second);
|
||||
|
||||
void stopServer()
|
||||
{
|
||||
try {
|
||||
Guard G(gbllock);
|
||||
ChannelCacheEntry::shared_pointer entry;
|
||||
|
||||
if(!gblctx) {
|
||||
printf("Not started\n");
|
||||
return;
|
||||
} else {
|
||||
gblctx->shutdown();
|
||||
gblctx.reset();
|
||||
}
|
||||
}catch(std::exception& e){
|
||||
printf("Error: %s\n", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void infoServer(int lvl)
|
||||
{
|
||||
try {
|
||||
Guard G(gbllock);
|
||||
|
||||
if(gblctx) {
|
||||
gblctx->printInfo();
|
||||
} else {
|
||||
printf("Not running");
|
||||
}
|
||||
}catch(std::exception& e){
|
||||
printf("Error: %s\n", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void statusServer(int lvl, const char *chanexpr)
|
||||
{
|
||||
try{
|
||||
pva::ServerContext::shared_pointer ctx;
|
||||
// find the channel, if it's there
|
||||
{
|
||||
Guard G(gbllock);
|
||||
ctx = gblctx;
|
||||
}
|
||||
if(!ctx) {
|
||||
std::cout<<"Not running\n";
|
||||
return;
|
||||
}
|
||||
Guard G(prov->cache.cacheLock);
|
||||
|
||||
bool iswild = chanexpr ? (strchr(chanexpr, '?') || strchr(chanexpr, '*')) : false;
|
||||
if(chanexpr && lvl<1)
|
||||
lvl=1; // giving a channel implies at least channel level of detail
|
||||
|
||||
const std::vector<pva::ChannelProvider::shared_pointer>& prov(ctx->getChannelProviders());
|
||||
|
||||
std::cout<<"Server has "<<prov.size()<<" providers\n";
|
||||
for(size_t i=0; i<prov.size(); i++)
|
||||
{
|
||||
pva::ChannelProvider* p = prov[i].get();
|
||||
std::cout<<"Provider: "<<(p ? p->getProviderName() : std::string("NULL"))<<"\n";
|
||||
if(!p) continue;
|
||||
GWServerChannelProvider *scp = dynamic_cast<GWServerChannelProvider*>(p);
|
||||
if(!scp) continue;
|
||||
|
||||
ChannelCache::entries_t entries;
|
||||
|
||||
size_t ncache, ncleaned, ndust;
|
||||
{
|
||||
Guard G(scp->cache.cacheLock);
|
||||
|
||||
ncache = scp->cache.entries.size();
|
||||
ncleaned = scp->cache.cleanerRuns;
|
||||
ndust = scp->cache.cleanerDust;
|
||||
|
||||
if(lvl>0) {
|
||||
if(!chanexpr || iswild) { // no string or some glob pattern
|
||||
entries = scp->cache.entries; // copy of std::map
|
||||
} else if(chanexpr) { // just one channel
|
||||
ChannelCache::entries_t::iterator it(scp->cache.entries.find(chanexpr));
|
||||
if(it!=scp->cache.entries.end())
|
||||
entries[it->first] = it->second;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::cout<<"Cache has "<<ncache<<" channels. Cleaned "
|
||||
<<ncleaned<<" times closing "<<ndust<<" channels\n";
|
||||
|
||||
if(lvl<=0)
|
||||
ChannelCache::entries_t::iterator it = prov->cache.entries.find(channel);
|
||||
if(it==prov->cache.entries.end())
|
||||
continue;
|
||||
|
||||
FOREACH(ChannelCache::entries_t::const_iterator, it, end, entries) {
|
||||
const std::string& channame = it->first;
|
||||
if(iswild && !epicsStrGlobMatch(channame.c_str(), chanexpr))
|
||||
continue;
|
||||
std::cout<<"Drop from "<<it->first<<" : "<<it->second->channelName<<"\n";
|
||||
|
||||
ChannelCacheEntry& E = *it->second;
|
||||
ChannelCacheEntry::mon_entries_t::lock_vector_type mons;
|
||||
size_t nsrv, nmon;
|
||||
bool dropflag;
|
||||
const char *chstate;
|
||||
{
|
||||
Guard G(E.mutex());
|
||||
chstate = pva::Channel::ConnectionStateNames[E.channel->getConnectionState()];
|
||||
nsrv = E.interested.size();
|
||||
nmon = E.mon_entries.size();
|
||||
dropflag = E.dropPoke;
|
||||
|
||||
if(lvl>1)
|
||||
mons = E.mon_entries.lock_vector();
|
||||
}
|
||||
|
||||
std::cout<<chstate
|
||||
<<" Client Channel '"<<channame
|
||||
<<"' used by "<<nsrv<<" Server channel(s) with "
|
||||
<<nmon<<" unique subscription(s) "
|
||||
<<(dropflag?'!':'_')<<"\n";
|
||||
|
||||
if(lvl<=1)
|
||||
continue;
|
||||
|
||||
FOREACH(ChannelCacheEntry::mon_entries_t::lock_vector_type::const_iterator, it2, end2, mons) {
|
||||
MonitorCacheEntry& ME = *it2->second;
|
||||
|
||||
MonitorCacheEntry::interested_t::vector_type usrs;
|
||||
size_t nsrvmon;
|
||||
#ifdef USE_MSTATS
|
||||
pvd::Monitor::Stats mstats;
|
||||
#endif
|
||||
bool hastype, hasdata, isdone;
|
||||
{
|
||||
Guard G(ME.mutex());
|
||||
|
||||
nsrvmon = ME.interested.size();
|
||||
hastype = !!ME.typedesc;
|
||||
hasdata = !!ME.lastelem;
|
||||
isdone = ME.done;
|
||||
|
||||
#ifdef USE_MSTATS
|
||||
if(ME.mon)
|
||||
ME.mon->getStats(mstats);
|
||||
#endif
|
||||
|
||||
if(lvl>2)
|
||||
usrs = ME.interested.lock_vector();
|
||||
}
|
||||
|
||||
// TODO: how to describe pvRequest in a compact way...
|
||||
std::cout<<" Client Monitor used by "<<nsrvmon<<" Server monitors, "
|
||||
<<"Has "<<(hastype?"":"not ")
|
||||
<<"opened, Has "<<(hasdata?"":"not ")
|
||||
<<"recv'd some data, Has "<<(isdone?"":"not ")<<"finalized\n"
|
||||
" "<< epicsAtomicGetSizeT(&ME.nwakeups)<<" wakeups "
|
||||
<<epicsAtomicGetSizeT(&ME.nevents)<<" events\n";
|
||||
#ifdef USE_MSTATS
|
||||
if(mstats.nempty || mstats.nfilled || mstats.noutstanding)
|
||||
std::cout<<" US monitor queue "<<mstats.nfilled
|
||||
<<" filled, "<<mstats.noutstanding
|
||||
<<" outstanding, "<<mstats.nempty<<" empty\n";
|
||||
#endif
|
||||
if(lvl<=2)
|
||||
continue;
|
||||
|
||||
FOREACH(MonitorCacheEntry::interested_t::vector_type::const_iterator, it3, end3, usrs) {
|
||||
MonitorUser& MU = **it3;
|
||||
|
||||
size_t nempty, nfilled, nused, total;
|
||||
std::string remote;
|
||||
bool isrunning;
|
||||
{
|
||||
Guard G(MU.mutex());
|
||||
|
||||
nempty = MU.empty.size();
|
||||
nfilled = MU.filled.size();
|
||||
nused = MU.inuse.size();
|
||||
isrunning = MU.running;
|
||||
|
||||
GWChannel::shared_pointer srvchan(MU.srvchan.lock());
|
||||
if(srvchan)
|
||||
remote = srvchan->address;
|
||||
else
|
||||
remote = "<unknown>";
|
||||
}
|
||||
total = nempty + nfilled + nused;
|
||||
|
||||
std::cout<<" Server monitor from "
|
||||
<<remote
|
||||
<<(isrunning?"":" Paused")
|
||||
<<" buffer "<<nfilled<<"/"<<total
|
||||
<<" out "<<nused<<"/"<<total
|
||||
<<" "<<epicsAtomicGetSizeT(&MU.nwakeups)<<" wakeups "
|
||||
<<epicsAtomicGetSizeT(&MU.nevents)<<" events "
|
||||
<<epicsAtomicGetSizeT(&MU.ndropped)<<" drops\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
entry = it->second;
|
||||
prov->cache.entries.erase(it); // drop out of cache (TODO: not required)
|
||||
}
|
||||
|
||||
}catch(std::exception& e){
|
||||
std::cerr<<"Error: "<<e.what()<<"\n";
|
||||
// trigger client side disconnect (recursively calls call CRequester::channelStateChange())
|
||||
// TODO: shouldn't need this
|
||||
entry->channel->destroy();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
void dropChannel(const char *chan)
|
||||
void ServerConfig::status_server(int lvl, const char *server)
|
||||
{
|
||||
if(!chan) return;
|
||||
try {
|
||||
pva::ServerContext::shared_pointer ctx(gblctx);
|
||||
if(!ctx) {
|
||||
std::cout<<"Not running\n";
|
||||
return;
|
||||
}
|
||||
std::cout<<"Find and force drop channel '"<<chan<<"'\n";
|
||||
if(!server)
|
||||
server = "";
|
||||
|
||||
const std::vector<pva::ChannelProvider::shared_pointer>& prov(ctx->getChannelProviders());
|
||||
FOREACH(servers_t::const_iterator, it, end, servers)
|
||||
{
|
||||
if(server[0]!='\0' && server[0]!='*' && it->first!=server)
|
||||
continue;
|
||||
|
||||
for(size_t i=0; i<prov.size(); i++)
|
||||
const pva::ServerContext::shared_pointer& serv(it->second);
|
||||
std::cout<<"==> Server: "<<it->first<<"\n";
|
||||
serv->printInfo(std::cout);
|
||||
std::cout<<"<== Server: "<<it->first<<"\n\n";
|
||||
// TODO: print client list somehow
|
||||
}
|
||||
}
|
||||
|
||||
void ServerConfig::status_client(int lvl, const char *client, const char *channel)
|
||||
{
|
||||
if(!client)
|
||||
client= "";
|
||||
if(!channel)
|
||||
channel = "";
|
||||
|
||||
bool iswild = strchr(channel, '?') || strchr(channel, '*');
|
||||
|
||||
FOREACH(clients_t::const_iterator, it, end, clients)
|
||||
{
|
||||
if(client[0]!='\0' && client[0]!='*' && it->first!=client)
|
||||
continue;
|
||||
|
||||
const GWServerChannelProvider::shared_pointer& prov(it->second);
|
||||
|
||||
std::cout<<"==> Client: "<<it->first<<"\n";
|
||||
|
||||
ChannelCache::entries_t entries;
|
||||
|
||||
size_t ncache, ncleaned, ndust;
|
||||
{
|
||||
pva::ChannelProvider* p = prov[i].get();
|
||||
if(!p) continue;
|
||||
GWServerChannelProvider *scp = dynamic_cast<GWServerChannelProvider*>(p);
|
||||
if(!scp) continue;
|
||||
Guard G(prov->cache.cacheLock);
|
||||
|
||||
ChannelCacheEntry::shared_pointer entry;
|
||||
ncache = prov->cache.entries.size();
|
||||
ncleaned = prov->cache.cleanerRuns;
|
||||
ndust = prov->cache.cleanerDust;
|
||||
|
||||
// find the channel, if it's there
|
||||
{
|
||||
Guard G(scp->cache.cacheLock);
|
||||
|
||||
ChannelCache::entries_t::iterator it = scp->cache.entries.find(chan);
|
||||
if(it==scp->cache.entries.end())
|
||||
continue;
|
||||
|
||||
std::cout<<"Found in provider "<<p->getProviderName()<<"\n";
|
||||
|
||||
entry = it->second;
|
||||
scp->cache.entries.erase(it); // drop out of cache (TODO: not required)
|
||||
if(lvl>0) {
|
||||
if(!iswild) { // no string or some glob pattern
|
||||
entries = prov->cache.entries; // copy of std::map
|
||||
} else { // just one channel
|
||||
ChannelCache::entries_t::iterator it(prov->cache.entries.find(channel));
|
||||
if(it!=prov->cache.entries.end())
|
||||
entries[it->first] = it->second;
|
||||
}
|
||||
}
|
||||
|
||||
// trigger client side disconnect (recursively calls call CRequester::channelStateChange())
|
||||
entry->channel->destroy();
|
||||
}
|
||||
|
||||
std::cout<<"Done\n";
|
||||
}catch(std::exception& e){
|
||||
std::cerr<<"Error: "<<e.what()<<"\n";
|
||||
std::cout<<"Cache has "<<ncache<<" channels. Cleaned "
|
||||
<<ncleaned<<" times closing "<<ndust<<" channels\n";
|
||||
|
||||
if(lvl<=0)
|
||||
continue;
|
||||
|
||||
FOREACH(ChannelCache::entries_t::const_iterator, it2, end2, entries)
|
||||
{
|
||||
const std::string& channame = it2->first;
|
||||
if(iswild && !epicsStrGlobMatch(channame.c_str(), channel))
|
||||
continue;
|
||||
|
||||
ChannelCacheEntry& E = *it2->second;
|
||||
ChannelCacheEntry::mon_entries_t::lock_vector_type mons;
|
||||
size_t nsrv, nmon;
|
||||
bool dropflag;
|
||||
const char *chstate;
|
||||
{
|
||||
Guard G(E.mutex());
|
||||
chstate = pva::Channel::ConnectionStateNames[E.channel->getConnectionState()];
|
||||
nsrv = E.interested.size();
|
||||
nmon = E.mon_entries.size();
|
||||
dropflag = E.dropPoke;
|
||||
|
||||
if(lvl>1)
|
||||
mons = E.mon_entries.lock_vector();
|
||||
}
|
||||
|
||||
std::cout<<chstate
|
||||
<<" Client Channel '"<<channame
|
||||
<<"' used by "<<nsrv<<" Server channel(s) with "
|
||||
<<nmon<<" unique subscription(s) "
|
||||
<<(dropflag?'!':'_')<<"\n";
|
||||
|
||||
if(lvl<=1)
|
||||
continue;
|
||||
|
||||
FOREACH(ChannelCacheEntry::mon_entries_t::lock_vector_type::const_iterator, it2, end2, mons) {
|
||||
MonitorCacheEntry& ME = *it2->second;
|
||||
|
||||
MonitorCacheEntry::interested_t::vector_type usrs;
|
||||
size_t nsrvmon;
|
||||
#ifdef USE_MSTATS
|
||||
pvd::Monitor::Stats mstats;
|
||||
#endif
|
||||
bool hastype, hasdata, isdone;
|
||||
{
|
||||
Guard G(ME.mutex());
|
||||
|
||||
nsrvmon = ME.interested.size();
|
||||
hastype = !!ME.typedesc;
|
||||
hasdata = !!ME.lastelem;
|
||||
isdone = ME.done;
|
||||
|
||||
#ifdef USE_MSTATS
|
||||
if(ME.mon)
|
||||
ME.mon->getStats(mstats);
|
||||
#endif
|
||||
|
||||
if(lvl>2)
|
||||
usrs = ME.interested.lock_vector();
|
||||
}
|
||||
|
||||
// TODO: how to describe pvRequest in a compact way...
|
||||
std::cout<<" Client Monitor used by "<<nsrvmon<<" Server monitors, "
|
||||
<<"Has "<<(hastype?"":"not ")
|
||||
<<"opened, Has "<<(hasdata?"":"not ")
|
||||
<<"recv'd some data, Has "<<(isdone?"":"not ")<<"finalized\n"
|
||||
" "<< epicsAtomicGetSizeT(&ME.nwakeups)<<" wakeups "
|
||||
<<epicsAtomicGetSizeT(&ME.nevents)<<" events\n";
|
||||
#ifdef USE_MSTATS
|
||||
if(mstats.nempty || mstats.nfilled || mstats.noutstanding)
|
||||
std::cout<<" US monitor queue "<<mstats.nfilled
|
||||
<<" filled, "<<mstats.noutstanding
|
||||
<<" outstanding, "<<mstats.nempty<<" empty\n";
|
||||
#endif
|
||||
if(lvl<=2)
|
||||
continue;
|
||||
|
||||
FOREACH(MonitorCacheEntry::interested_t::vector_type::const_iterator, it3, end3, usrs) {
|
||||
MonitorUser& MU = **it3;
|
||||
|
||||
size_t nempty, nfilled, nused, total;
|
||||
std::string remote;
|
||||
bool isrunning;
|
||||
{
|
||||
Guard G(MU.mutex());
|
||||
|
||||
nempty = MU.empty.size();
|
||||
nfilled = MU.filled.size();
|
||||
nused = MU.inuse.size();
|
||||
isrunning = MU.running;
|
||||
|
||||
GWChannel::shared_pointer srvchan(MU.srvchan.lock());
|
||||
if(srvchan)
|
||||
remote = srvchan->address;
|
||||
else
|
||||
remote = "<unknown>";
|
||||
}
|
||||
total = nempty + nfilled + nused;
|
||||
|
||||
std::cout<<" Server monitor from "
|
||||
<<remote
|
||||
<<(isrunning?"":" Paused")
|
||||
<<" buffer "<<nfilled<<"/"<<total
|
||||
<<" out "<<nused<<"/"<<total
|
||||
<<" "<<epicsAtomicGetSizeT(&MU.nwakeups)<<" wakeups "
|
||||
<<epicsAtomicGetSizeT(&MU.nevents)<<" events "
|
||||
<<epicsAtomicGetSizeT(&MU.ndropped)<<" drops\n";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
std::cout<<"<== Client: "<<it->first<<"\n\n";
|
||||
}
|
||||
}
|
||||
|
||||
void pvadebug(const char *lvl)
|
||||
{
|
||||
try {
|
||||
std::string lname(lvl ? lvl : "warn");
|
||||
pva::pvAccessLogLevel elvl;
|
||||
if(lname=="off" || lname=="-2")
|
||||
elvl = pva::logLevelOff;
|
||||
else if(lname=="fatal" || lname=="-1")
|
||||
elvl = pva::logLevelFatal;
|
||||
else if(lname=="error" || lname=="0")
|
||||
elvl = pva::logLevelError;
|
||||
else if(lname=="warn" || lname=="1")
|
||||
elvl = pva::logLevelWarn;
|
||||
else if(lname=="info" || lname=="2")
|
||||
elvl = pva::logLevelInfo;
|
||||
else if(lname=="debug" || lname=="3")
|
||||
elvl = pva::logLevelDebug;
|
||||
else if(lname=="trace" || lname=="4")
|
||||
elvl = pva::logLevelTrace;
|
||||
else if(lname=="all" || lname=="5")
|
||||
elvl = pva::logLevelAll;
|
||||
else
|
||||
throw std::invalid_argument("Unknown level name, must be one of off|fatal|error|warn|info|debug|trace|all");
|
||||
|
||||
pva::pvAccessSetLogLevel(elvl);
|
||||
}catch(std::exception& e){
|
||||
std::cout<<"Error: "<<e.what()<<"\n";
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void registerGWServerIocsh()
|
||||
{
|
||||
iocshRegister<&startServer>("gwstart");
|
||||
iocshRegister<&stopServer>("gwstop");
|
||||
iocshRegister<int, &infoServer>("pvasr", "level");
|
||||
iocshRegister<int, const char*, &statusServer>("gwstatus", "level", "channel name/pattern");
|
||||
iocshRegister<const char*, &dropChannel>("gwdrop", "channel");
|
||||
iocshRegister<const char*, &pvadebug>("pvadebug", "level");
|
||||
|
||||
epics::registerRefCounter("ChannelCacheEntry", &ChannelCacheEntry::num_instances);
|
||||
epics::registerRefCounter("ChannelCacheEntry::CRequester", &ChannelCacheEntry::CRequester::num_instances);
|
||||
epics::registerRefCounter("GWChannel", &GWChannel::num_instances);
|
||||
epics::registerRefCounter("MonitorCacheEntry", &MonitorCacheEntry::num_instances);
|
||||
epics::registerRefCounter("MonitorUser", &MonitorUser::num_instances);
|
||||
}
|
||||
|
||||
void gwServerShutdown()
|
||||
{
|
||||
stopServer();
|
||||
}
|
||||
|
@ -50,7 +50,9 @@ struct epicsShareClass ServerConfig {
|
||||
|
||||
ServerConfig() :debug(0), interactive(true) {}
|
||||
|
||||
static std::tr1::shared_ptr<ServerConfig> instance;
|
||||
void drop(const char *client, const char *channel);
|
||||
void status_server(int lvl, const char *server);
|
||||
void status_client(int lvl, const char *client, const char *channel);
|
||||
};
|
||||
|
||||
#endif // SERVER_H
|
||||
|
@ -7,9 +7,11 @@
|
||||
#include <pv/reftrack.h>
|
||||
#include <pv/pvAccess.h>
|
||||
#include <pv/serverContext.h>
|
||||
#include <pv/iocshelper.h>
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
|
||||
#include "pvahelper.h"
|
||||
#include "iocshelper.h"
|
||||
#include "pvif.h"
|
||||
#include "pdb.h"
|
||||
#include "pdbsingle.h"
|
||||
|
Reference in New Issue
Block a user