rewrite pvalink using 3.16 link support API

This commit is contained in:
Michael Davidsaver
2017-11-09 14:42:17 -06:00
parent d5ee917c6d
commit f0be2f7016
7 changed files with 1231 additions and 994 deletions

View File

@ -16,7 +16,11 @@ qsrv_SRCS += qsrv.cpp
qsrv_SRCS += pdb.cpp
qsrv_SRCS += pdbsingle.cpp
qsrv_SRCS += dbf_copy.cpp
#qsrv_SRCS += pvalink.cpp
qsrv_SRCS += pvalink.cpp
qsrv_SRCS += pvalink_lset.cpp
qsrv_SRCS += pvalink_jlif.cpp
qsrv_SRCS += pvalink_link.cpp
qsrv_SRCS += pvalink_channel.cpp
qsrv_SRCS += tpool.cpp
qsrv_SRCS += demo.cpp
qsrv_SRCS += imagedemo.c

View File

@ -14,565 +14,52 @@
#include <alarm.h>
#include <epicsExit.h>
#include <epicsAtomic.h>
#include <epicsThreadPool.h>
#include <link.h>
#include <dbJLink.h>
#include <epicsStdio.h> /* redirects stdout/stderr */
#include <pv/pvAccess.h>
#include <pv/clientFactory.h>
#include <pv/iocshelper.h>
#include <pv/reftrack.h>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "helper.h"
#include "iocshelper.h"
#include "pvif.h"
#include "pvalink.h"
#include <epicsExport.h>
int pvaLinkDebug;
int pvaLinkIsolate;
namespace pvalink {
pvaGlobal_t *pvaGlobal;
std::tr1::shared_ptr<pvaLinkChannel> pvaGlobal_t::connect(const char *name)
{
pvd::Mutex lock;
std::tr1::shared_ptr<pvaLinkChannel> ret;
bool doconn = false;
{
Guard G(lock);
channels_t::iterator it = channels.find(name);
if(it==channels.end()) {
if(pvaLinkDebug>2) std::cerr<<"pvalink open channel for '"<<name<<"'\n";
std::tr1::shared_ptr<pvaLinkChannel> C(new pvaLinkChannel(name));
ret = channels[name] = C;
doconn = true;
} else {
if(pvaLinkDebug>2) std::cerr<<"pvalink reuse channel for '"<<name<<"'\n";
ret = it->second;
}
}
if(doconn) {
ret->doConnect();
}
return ret;
}
pvaGlobal_t::pvaGlobal_t()
:provider(pva::getChannelProviderRegistry()->getProvider(pvaLinkIsolate ? "QSRV" : "pva"))
,reqtype(pvd::getFieldCreate()->createFieldBuilder()
->createStructure())
,create(pvd::getPVDataCreate())
{
if(!provider)
throw std::runtime_error("No pva provider");
epicsThreadPoolConfig conf;
epicsThreadPoolConfigDefaults(&conf);
conf.workerPriority = epicsThreadPriorityLow+10; // similar to once thread
conf.initialThreads = 1;
scanpool = epicsThreadPoolCreate(&conf);
if(!scanpool)
throw std::runtime_error("Failed to create pvaLink scan pool");
}
size_t pvaLinkChannel::refs;
size_t pvaLink::refs;
void pvaLinkChannel::channelStateChange(pva::Channel::shared_pointer const & channel, pva::Channel::ConnectionState connectionState)
{
Guard G(lock);
assert(chan==channel);
if(pvaLinkDebug>2) std::cerr<<"pvaLink channelStateChange "<<name<<pva::Channel::ConnectionStateNames[connectionState]<<"\n";
if(connectionState!=pva::Channel::CONNECTED) {
FOREACH(links_t::const_iterator, it, end, links) {
pvaLink* L = *it;
L->detach();
}
lastval.reset();
isatomic.reset();
if(chanmon) {
chanmon->destroy();
chanmon.reset();
std::cerr<<"pvaLink: monitor destroy "<<name<<"\n";
}
triggerProc(false, true); // force scan to get disconnect
} else if(!chanmon) {
pvd::PVStructurePtr pvreq(pvaGlobal->create->createPVStructure(pvaGlobal->reqtype));
Guard G(lock);
chanmon = channel->createMonitor(shared_from_this(), pvreq);
chan = channel;
if(pvaLinkDebug>4) std::cerr<<"pvaLink channelStateChange start monitor\n";
}
}
void pvaLinkChannel::monitorConnect(pvd::Status const & status,
pva::Monitor::shared_pointer const & monitor,
pvd::StructureConstPtr const & structure)
{
if(pvaLinkDebug>4) std::cerr<<"pvaLink monitorConnect "<<name<<"\n";
if(!status.isSuccess()) {
errlogPrintf("pvaLink connect monitor fails %s: %s\n", name.c_str(), status.getMessage().c_str());
return;
}
Guard G(lock);
lastval = pvaGlobal->create->createPVStructure(structure);
isatomic = lastval->getSubField<pvd::PVScalar>("record._options.atomic");
if(!isatomic)
std::cerr<<"================ not atomic\n"<<lastval<<"\n";
chanmon = monitor;
pvd::Status sstatus = monitor->start();
if(!sstatus.isSuccess()) {
errlogPrintf("pvaLink start monitor fails %s: %s\n", name.c_str(), sstatus.getMessage().c_str());
return;
}
FOREACH(links_t::const_iterator, it, end, links) {
pvaLink* L = *it;
L->attach();
}
}
void pvaLinkChannel::monitorEvent(pva::Monitor::shared_pointer const & monitor)
{
Guard G(lock);
if(pvaLinkDebug>3) std::cerr<<"pvaLink monitorEvent "<<name<<"\n";
if(!lastval) return;
pva::MonitorElementPtr elem;
bool updated = false;
bool atomic = false;
while(!!(elem=monitor->poll())) {
try{
lastval->copyUnchecked(*elem->pvStructurePtr, *elem->changedBitSet);
atomic = isatomic ? isatomic->getAs<pvd::boolean>() : false;
updated = true;
monitor->release(elem);
}catch(...){
monitor->release(elem);
throw;
}
}
if(updated) triggerProc(atomic);
}
// caller must have channel's lock
void pvaLinkChannel::triggerProc(bool atomic, bool force)
{
bool doscan = false;
// check if we actually need to scan anything
FOREACH(links_t::const_iterator, it, end, links) {
pvaLink* L = *it;
if ((L->linkmods & pvlOptCP) ||
((L->linkmods & pvlOptCPP) && L->plink->precord->scan == 0))
{
doscan = true;
}
}
if(force || (doscan && !scanself)) { // need to scan, and not already queued, then queue
int ret = epicsJobQueue(scanjob);
if(ret && ret!=S_pool_paused) {
errlogPrintf("pvaLink: failed to queue scan from %s\n", name.c_str());
} else {
scanself = shared_from_this();
scanatomic = atomic;
if(pvaLinkDebug>1) std::cerr<<"pvaLink trigger proc"<< name<<"\n";
}
}
}
void pvaLinkChannel::scan(void* arg, epicsJobMode mode)
{
pvaLinkChannel *selfraw = (pvaLinkChannel*)arg;
if(mode!=epicsJobModeRun) return; // we will cleanup later
pvaGlobal_t::Scan myscan;
try {
if(pvaLinkDebug>3) std::cerr<<"pvaLink scan "<<selfraw->name<<"\n";
std::tr1::shared_ptr<pvaLinkChannel> self;
Guard G(selfraw->lock);
selfraw->scanself.swap(self); // we take over ref, to keep channel alive, and allow re-queue
assert(self.get()==selfraw); // if scanself wasn't set, then the channel may be free'd
myscan.chan = self; // store a weak ref
links_t links(self->links); // TODO: avoid copy if set not changing
bool usecached = self->scanatomic && !!self->chanmon;
myscan.usecached = usecached;
if(usecached) {
if(pvaLinkDebug>4) std::cerr<<"populate cache\n";
FOREACH(links_t::const_iterator, it, end, links) {
pvaLink *link = *it;
link->get(link->atomcache);
if(pvaLinkDebug>4)
std::cerr<<"== "<<self->name<<"."<<link->field<<" "<<link->valueS<<"\n";
}
}
pvaGlobal->scanmagic.set(usecached ? &myscan : NULL);
{
UnGuard U(G);
// we may scan a record after the originating link is re-targeted
FOREACH(links_t::const_iterator, it, end, links) {
pvaLink *link = *it;
dbCommon *prec=link->plink->precord;
if ((link->linkmods & pvlOptCP) ||
((link->linkmods & pvlOptCPP) && prec->scan == 0))
{
DBScanLocker L(prec);
if(pvaLinkDebug>3) std::cerr<<prec->name<<" PVA link scan\n";
dbProcess(prec);
}
}
}
// another scan may be queued by this point
if(usecached) {
FOREACH(links_t::const_iterator, it, end, links) {
pvaLink *link = *it;
link->atomcache.clear();
}
}
}catch(std::exception& e){
errlogPrintf("%s: pvaLink exception while processing: %s\n", selfraw->name.c_str(), e.what());
// what to do?
}
pvaGlobal->scanmagic.set(NULL);
}
} // namespace pvalink
using namespace pvalink;
namespace {
#define TRY pvaLink *self = static_cast<pvaLink*>(plink->value.json.jlink); assert(self->alive); try
#define CATCH(LOC) catch(std::exception& e) { \
errlogPrintf("pvaLink " #LOC " fails %s: %s\n", plink->precord->name, e.what()); \
}
void pvaOpenLink(DBLINK *plink)
{
try {
pvaLink* self((pvaLink*)plink->value.json.jlink);
self->plink = plink;
std::cerr<<plink->precord->name<<" Open link to '"<<self->name<<"'\n";
if(!self->name.empty()) {
self->open();
}
}CATCH(pvaOpenLink)
}
void pvaRemoveLink(struct dbLocker *locker, DBLINK *plink)
{
try {
p2p::auto_ptr<pvaLink> self((pvaLink*)plink->value.json.jlink);
assert(self->alive);
Guard G(self->lchan->lock);
// TODO: ???
std::cerr<<__FUNCTION__<<" "<<self->plink->precord->name<<" -> "<<self->name<<"\n";
}CATCH(pvaRemoteLink)
}
int pvaIsConnected(const DBLINK *plink)
{
TRY {
if(pvaGlobal->scanmagic.get()) return 1;
Guard G(self->lchan->lock);
return !!self->lchan->chanmon && (self->valueS || self->valueA);
}CATCH(pvaIsConnected)
return 0;
}
int pvaGetDBFtype(const DBLINK *plink)
{
TRY {
if(pvaGlobal->scanmagic.get() && self->atomcache.valid)
return PVD2DBR(self->atomcache.etype);
Guard G(self->lchan->lock);
pvd::ScalarType ftype;
if(self->valueS)
ftype = self->valueS->getScalar()->getScalarType();
else if(self->valueA)
ftype = self->valueA->getScalarArray()->getElementType();
else
return DBF_LONG;
switch(ftype) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case pvd::pv##PVACODE: return DBF_##DBFTYPE;
#define CASE_SQUEEZE_INT64
#include "pv/typemap.h"
#undef CASE_SQUEEZE_INT64
#undef CASE
case pvd::pvString: return DBF_STRING; // TODO: long string?
}
}CATCH(pvaIsConnected)
return DBF_LONG;
}
long pvaGetElements(const DBLINK *plink, long *nelements)
{
TRY {
if(pvaGlobal->scanmagic.get() && self->atomcache.valid) {
if(self->atomcache.scalar) return 1;
else return self->atomcache.valueA.size();
}
Guard G(self->lchan->lock);
if(self->valueA)
return self->valueA->getLength();
else
return 1;
}CATCH(pvaIsConnected)
return 1;
}
long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer,
long *pnRequest)
{
TRY {
if(pvaGlobal->scanmagic.get() && self->atomcache.valid) {
const void *buf;
size_t count = pnRequest ? *pnRequest : 1;
if(self->atomcache.scalar) {
buf = (void*)&self->atomcache.valueS;
count = std::min((size_t)1u, count);
} else {
buf = self->atomcache.valueA.data();
count = std::min(self->atomcache.valueA.size(), count);
}
pvd::castUnsafeV(count, DBR2PVD(dbrType), pbuffer, self->atomcache.etype, buf);
// if(dbrType==DBF_DOUBLE) {
// std::cerr<<"get from cache "<<*(double*)pbuffer<<"\n";
// }
if(pnRequest) *pnRequest = count;
return 0;
}
Guard G(self->lchan->lock);
if(self->valueA) {
pvd::shared_vector<const void> arrval;
self->valueA->getAs<const void>(arrval);
long nelem = std::min(*pnRequest, (long)arrval.size());
pvd::castUnsafeV(nelem, DBR2PVD(dbrType), pbuffer, arrval.original_type(), arrval.data());
if(pnRequest) *pnRequest = nelem;
} else if(self->valueS) {
switch(dbrType) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case DBR_##DBFTYPE: *((epics##BASETYPE*)pbuffer) = self->valueS->getAs<epics##BASETYPE>(); break;
#define CASE_SKIP_BOOL
#define CASE_ENUM
#include "pv/typemap.h"
#undef CASE_SKIP_BOOL
#undef CASE_ENUM
#undef CASE
case DBR_STRING: {
char *cbuf = (char*)pbuffer;
strncpy(cbuf, self->valueS->getAs<std::string>().c_str(), MAX_STRING_SIZE);
cbuf[MAX_STRING_SIZE-1] = '\0';
}
break;
default:
throw std::runtime_error("putValue unsupported DBR code");
}
if(pnRequest) *pnRequest = 1;
if(dbrType==DBF_DOUBLE)
std::cerr<<"get direct "<<*(double*)pbuffer<<"\n";
} else {
return -1;
}
return 0;
}CATCH(pvaIsConnected)
return S_dbLib_badLink;
}
long pvaGetControlLimits(const DBLINK *plink, double *lo, double *hi)
{
TRY {
//Guard G(self->lchan->lock);
*lo = *hi = 0.0;
}CATCH(pvaIsConnected)
return 0;
}
long pvaGetGraphicLimits(const DBLINK *plink, double *lo, double *hi)
{
TRY {
//Guard G(self->lchan->lock);
*lo = *hi = 0.0;
}CATCH(pvaIsConnected)
return 0;
}
long pvaGetAlarmLimits(const DBLINK *plink, double *lolo, double *lo,
double *hi, double *hihi)
{
TRY {
//Guard G(self->lchan->lock);
*lolo = *lo = *hi = *hihi = 0.0;
}CATCH(pvaIsConnected)
return 0;
}
long pvaGetPrecision(const DBLINK *plink, short *precision)
{
TRY {
//Guard G(self->lchan->lock);
*precision = 0;
}CATCH(pvaIsConnected)
return 0;
}
long pvaGetUnits(const DBLINK *plink, char *units, int unitsSize)
{
TRY {
//Guard G(self->lchan->lock);
}CATCH(pvaIsConnected)
return 0;
}
long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status,
epicsEnum16 *severity)
{
TRY {
Guard G(self->lchan->lock);
unsigned sevr = INVALID_ALARM;
if(pvaGlobal->scanmagic.get() && self->atomcache.valid) {
sevr = self->atomcache.sevr;
} else if(self->sevr) {
sevr = self->sevr->getAs<epicsInt32>();
}
if(sevr)
*status = LINK_ALARM;
*severity = std::max(0u, std::min(sevr, 3u));
}CATCH(pvaIsConnected)
return 0;
}
long pvaGetTimeStamp(const DBLINK *plink, epicsTimeStamp *pstamp)
{
TRY {
Guard G(self->lchan->lock);
if(pvaGlobal->scanmagic.get() && self->atomcache.valid) {
*pstamp = self->atomcache.time;
} else if(self->sec && self->nsec) {
pstamp->secPastEpoch = self->sec->getAs<epicsUInt32>()-POSIX_TIME_AT_EPICS_EPOCH;
pstamp->nsec = self->sec->getAs<epicsUInt32>();
} else {
epicsTimeGetCurrent(pstamp);
}
}CATCH(pvaIsConnected)
return 0;
}
long pvaPutValue(DBLINK *plink, short dbrType,
const void *pbuffer, long nRequest)
{
TRY {
(void)self;
//Guard G(self->lchan->lock);
return S_db_putDisabled;
}CATCH(pvaIsConnected)
}
void pvaScanForward(DBLINK *plink)
{
TRY {
(void)self;
//Guard G(self->lchan->lock);
}CATCH(pvaIsConnected)
}
#undef TRY
#undef CATCH
lset pva_lset = {
0, 1, // non-const, volatile
&pvaOpenLink,
&pvaRemoveLink,
NULL, NULL, NULL,
&pvaIsConnected,
&pvaGetDBFtype,
&pvaGetElements,
&pvaGetValue,
&pvaGetControlLimits,
&pvaGetGraphicLimits,
&pvaGetAlarmLimits,
&pvaGetPrecision,
&pvaGetUnits,
&pvaGetAlarm,
&pvaGetTimeStamp,
&pvaPutValue,
NULL,
&pvaScanForward
//&pvaReportLink,
};
static void stopPVAPool(void*)
{
// stop CP scans before closing links
epicsThreadPoolControl(pvaGlobal->scanpool, epicsThreadPoolQueueAdd, 0);
epicsThreadPoolWait(pvaGlobal->scanpool, -1.0);
pvaGlobal->queue.close();
}
static void finalizePVA(void*)
{
try {
std::cout<<"cleanupPVALink\n";
//dbAddLinkHook = nextAddLinkHook;
{
Guard G(pvaGlobal->lock);
if(pvaGlobal->channels.size()) {
std::cerr<<"pvaLink still has "<<pvaGlobal->channels.size()
<<"active channels after doCloseLinks()\n";
fprintf(stderr, "pvaLink leaves %zu channels open\n",
pvaGlobal->channels.size());
}
}
epicsThreadPoolDestroy(pvaGlobal->scanpool);
pvaGlobal->scanpool = NULL; // TODO: locking?
delete pvaGlobal;
pvaGlobal = NULL;
if(epics::atomic::get(pvaLink::refs)) {
std::cerr<<"pvaLink leaking "<<epics::atomic::get(pvaLink::refs)<<" links\n";
}
if(epics::atomic::get(pvaLinkChannel::refs)) {
std::cerr<<"pvaLink leaking "<<epics::atomic::get(pvaLinkChannel::refs)<<" channels\n";
}
}catch(std::exception& e){
errlogPrintf("Error initializing pva link handling : %s\n", e.what());
fprintf(stderr, "Error initializing pva link handling : %s\n", e.what());
}
}
@ -583,18 +70,12 @@ void initPVALink(initHookState state)
// so hook registered here will be run after iocShutdown()
// which closes links
try {
std::cout<<"initPVALink\n";
pva::ClientFactory::start();
pvaGlobal = new pvaGlobal_t;
epicsAtExit(finalizePVA, NULL);
//nextAddLinkHook = dbAddLinkHook;
//dbAddLinkHook = &pvaAddLinkHook;
}catch(std::exception& e){
errlogPrintf("Error initializing pva link handling : %s\n", e.what());
cantProceed("Error initializing pva link handling : %s\n", e.what());
}
} else if(state==initHookAfterIocBuilt) {
@ -604,225 +85,18 @@ void initPVALink(initHookState state)
}
}
jlink* pva_alloc_jlink(short dbr)
{
try {
std::cerr<<"alloc jlink\n";
return new pvaLink;
}catch(std::exception& e){
return NULL;
}
}
#define TRY pvaLink *pvt = static_cast<pvaLink*>(pjlink); (void)pvt; try
#define CATCH(RET) catch(std::exception& e){ \
errlogPrintf("Error in %s link: %s\n", __FUNCTION__, e.what()); \
return RET; }
void pva_free_jlink(jlink *pjlink)
{
//TODO: not called on parse error?
TRY {
std::cerr<<"free jlink\n";
delete pvt;
}catch(std::exception& e){
errlogPrintf("Error freeing pva link: %s\n", e.what());
}
}
void pva_end_child(jlink *pjparent, jlink *pjlink)
{}
jlif_result pva_parse_null(jlink *pjlink)
{
TRY{
std::cerr<<"NULL parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
jlif_result pva_parse_boolean(jlink *pjlink, int val)
{
TRY{
std::cerr<<"bool parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
jlif_result pva_parse_integer(jlink *pjlink, long num)
{
TRY{
std::cerr<<"INT parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
jlif_result pva_parse_double(jlink *pjlink, double num)
{
TRY{
std::cerr<<"DOUBLE parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
jlif_result pva_parse_string(jlink *pjlink, const char *val, size_t len)
{
TRY{
if(pvt->parse_level==0) {
std::string lstr(val, len);
size_t A, B;
A = lstr.find_first_not_of(" \t");
B = lstr.find_first_of(" \t", A);
if(A==lstr.npos || A==B) {
std::cerr<<"Empty PVA target?\n";
return jlif_stop;
}
pvt->name = lstr.substr(A, B==lstr.npos ? lstr.npos : B-A);
for(A = lstr.find_first_not_of(" \t", B),
B = lstr.find_first_of(" \t", A);
A!=lstr.npos;
A = lstr.find_first_not_of(" \t", B),
B = lstr.find_first_of(" \t", A)
){
size_t C = B==lstr.npos ? lstr.npos : B-A;
if(lstr.compare(A, C, "CPP")==0)
pvt->linkmods |= pvlOptCPP;
else if(lstr.compare(A, C, "CP")==0)
pvt->linkmods |= pvlOptCP;
else if(lstr.compare(A, C, "MSI")==0)
pvt->linkmods |= pvlOptMSI;
else if(lstr.compare(A, C, "MSS")==0)
pvt->linkmods |= pvlOptMSS;
else if(lstr.compare(A, C, "MS")==0)
pvt->linkmods |= pvlOptMS;
// else // unknown modifier?
}
std::cerr<<"Link set PVA name '"<<pvt->name<<"'\n";
return jlif_continue;
}
std::cerr<<"STRING parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
jlif_key_result pva_parse_start_map(jlink *pjlink)
{
TRY{
std::cerr<<"{ parse\n";
return jlif_key_stop;
}CATCH(jlif_key_stop)
}
jlif_result pva_parse_map_key(jlink *pjlink, const char *key, size_t len)
{
TRY{
std::cerr<<"KEY parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
jlif_result pva_parse_end_map(jlink *pjlink)
{
TRY{
std::cerr<<"} parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
jlif_result pva_parse_start_array(jlink *pjlink)
{
TRY{
std::cerr<<"[ parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
jlif_result pva_parse_end_array(jlink *pjlink)
{
TRY{
std::cerr<<"] parse\n";
return jlif_stop;
}CATCH(jlif_stop)
}
struct lset* pva_get_lset(const jlink *pjlink)
{
return &pva_lset;
}
void pva_report(const jlink *rpjlink, int lvl, int indent)
{
jlink *pjlink = const_cast<jlink*>(rpjlink);
TRY {
const char * fname = "???", //TODO: how to find out?
* rname = pvt->plink->precord->name;
int connected = pvt->lchan->chan && pvt->lchan->chanmon;
if(connected) {
if(lvl>=1){
printf("%*s%28s.%-4s ==> pva://%s.%s\n",
indent, "", rname, fname,
pvt->name.c_str(), pvt->field.c_str());
}
} else {
if(lvl>=0){
printf("%*s%28s.%-4s --> pva://%s.%s\n",
indent, "", rname, fname,
pvt->name.c_str(), pvt->field.c_str());
}
}
}CATCH()
}
jlif lsetPVA = {
"pva",
&pva_alloc_jlink,
&pva_free_jlink,
&pva_parse_null,
&pva_parse_boolean,
&pva_parse_integer,
&pva_parse_double,
&pva_parse_string,
&pva_parse_start_map,
&pva_parse_map_key,
&pva_parse_end_map,
&pva_parse_start_array,
&pva_parse_end_array,
&pva_end_child,
&pva_get_lset,
&pva_report,
NULL
};
} // namespace
void pvalr(int level)
{
try {
std::cout<<"pvaLink count "<<epics::atomic::get(pvaLink::refs)<<"\n"
"pvaLinkChannel count "<<epics::atomic::get(pvaLinkChannel::refs)<<"\n";
}catch(std::exception& e){
std::cerr<<"Error :"<<e.what()<<"\n";
}
}
static
void installPVAAddLinkHook()
{
initHookRegister(&initPVALink);
iocshRegister<int, &pvalr>("pvalr", "level");
iocshVariable<int, &pvaLinkDebug>("pvaLinkDebug");
epics::registerRefCounter("pvaLinkChannel", &pvaLinkChannel::num_instances);
epics::registerRefCounter("pvaLink", &pvaLink::num_instances);
}
epicsExportRegistrar(installPVAAddLinkHook);
epicsExportAddress(jlif, lsetPVA);
extern "C" {
epicsExportRegistrar(installPVAAddLinkHook);
epicsExportAddress(jlif, lsetPVA);
epicsExportAddress(int, pvaLinkDebug);
}

View File

@ -16,20 +16,28 @@
#include <alarm.h>
#include <epicsExit.h>
#include <epicsAtomic.h>
#include <epicsThreadPool.h>
#include <link.h>
#include <dbJLink.h>
#include <pv/pvAccess.h>
#include <pv/clientFactory.h>
#include <pva/client.h>
#include <pv/anyscalar.h>
#include <pv/thread.h>
#include <pv/lock.h>
#include <pv/iocshelper.h>
#include "helper.h"
#include "iocshelper.h"
#include "pvif.h"
#include "tpool.h"
extern int pvaLinkDebug;
extern int pvaLinkIsolate;
#if 0
# define TRACE(X) std::cerr<<"PVAL "<<__func__<<" " X <<"\n"
#else
# define TRACE(X) do {} while(0)
#endif
namespace pvalink {
namespace pvd = epics::pvData;
@ -41,289 +49,148 @@ typedef epicsGuardRelease<pvd::Mutex> UnGuard;
struct pvaLink;
struct pvaLinkChannel;
struct pvaGlobal_t {
pva::ChannelProvider::shared_pointer provider;
extern lset pva_lset;
extern jlif lsetPVA;
pvd::StructureConstPtr reqtype;
pvd::PVDataCreatePtr create;
struct pvaLinkConfig : public jlink
{
// configuration, output of jlif parsing
//! Channel (aka PV) name string
std::string channelName;
//! sub-field within addressed PVStructure
std::string fieldName;
size_t queueSize;
enum pp_t {
NPP,
Default, // for put() only. For monitor, treated as NPP
PP, // for put() only, For monitor, treated as NPP
CP, // for monitor only, put treats as pp
CPP, // for monitor only, put treats as pp
} pp;
enum ms_t {
NMS,
MS,
MSS,
MSI,
} ms;
bool defer;
int monorder;
// internals used by jlif parsing
std::string jkey;
pvaLinkConfig();
virtual ~pvaLinkConfig();
};
struct pvaGlobal_t {
pvac::ClientProvider provider_local,
provider_remote;
const pvd::PVDataCreatePtr create;
WorkQueue queue;
pvd::Mutex lock;
struct Scan {
// the PVA channel which triggered this scan
std::tr1::weak_ptr<pvaLinkChannel> chan;
bool usecached;
Scan() :usecached(false) {}
};
epicsThreadPrivate<Scan> scanmagic;
epicsThreadPool *scanpool;
typedef std::map<std::string, std::tr1::shared_ptr<pvaLinkChannel> > channels_t;
// a tuple of channel name and printed pvRequest (or Monitor)
typedef std::pair<std::string, std::string> channels_key_t;
// pvaLinkChannel dtor prunes dead entires
typedef std::map<channels_key_t, std::tr1::weak_ptr<pvaLinkChannel> > channels_t;
// Cache of active Channels (really about caching Monitor)
channels_t channels;
std::tr1::shared_ptr<pvaLinkChannel> connect(const char *name);
pvaGlobal_t();
~pvaGlobal_t()
{
provider->destroy();
epicsThreadPoolDestroy(scanpool);
}
~pvaGlobal_t();
};
extern pvaGlobal_t *pvaGlobal;
struct pvaLinkChannel : public pva::ChannelRequester, pva::MonitorRequester,
std::tr1::enable_shared_from_this<pvaLinkChannel>
struct pvaLinkChannel : public pvac::ClientChannel::MonitorCallback,
public pvac::ClientChannel::PutCallback,
public epicsThreadRunable,
public std::tr1::enable_shared_from_this<pvaLinkChannel>
{
const std::string name;
const pvaGlobal_t::channels_key_t key; // tuple of (channelName, pvRequest key)
const pvd::PVStructure::const_shared_pointer pvRequest; // used with monitor
static size_t refs;
typedef std::set<pvaLink*> links_t;
links_t links;
static size_t num_instances;
pvd::Mutex lock;
pva::Channel::shared_pointer chan;
pvac::ClientChannel chan;
pvac::Monitor op_mon;
pvac::Operation op_put;
pva::Monitor::shared_pointer chanmon;
//pva::ChannelPut::shared_pointer chanput;
size_t num_disconnect;
bool connected;
bool connected_latched; // connection status at the run()
bool isatomic;
bool queued; // added to WorkQueue
pvd::PVStructurePtr lastval;
pvd::PVScalarPtr isatomic;
struct LinkSort {
bool operator()(const pvaLink *L, const pvaLink *R) const;
};
epicsJob *scanjob;
std::tr1::shared_ptr<pvaLinkChannel> scanself; // create ref loop while scan is queued
bool scanatomic;
typedef std::set<pvaLink*, LinkSort> links_t;
pvaLinkChannel(const char *name)
:name(name)
,scanjob(epicsJobCreate(pvaGlobal->scanpool, &pvaLinkChannel::scan, this))
,scanatomic(false)
{
if(!scanjob)
throw std::runtime_error("failed to create job for pvaLink");
epics::atomic::increment(refs);
}
virtual ~pvaLinkChannel() {
Guard G(lock);
assert(links.empty());
epicsJobDestroy(scanjob);
scanjob = NULL;
epics::atomic::decrement(refs);
std::cerr<<"pvaLinkChannel: destroy "<<name<<"\n";
}
// list of currently attached links. maintained by pvaLink ctor/dtor
// TODO: sort by PHAS
links_t links;
void doConnect() {
// TODO: local PVA?
Guard G(lock);
chan = pvaGlobal->provider->createChannel(name, shared_from_this());
channelStateChange(chan, chan->getConnectionState());
}
void doClose() {
Guard G(lock);
errlogPrintf("pvaLink closing %s\n", name.c_str());
channelStateChange(chan, pva::Channel::DESTROYED);
chan->destroy();
chan.reset();
std::cerr<<"pvaLink: channel destroy "<<name<<"\n";
}
// set when 'links' is modified to trigger re-compute of record scan list
bool links_changed;
void triggerProc(bool atomic=false, bool force=false);
pvaLinkChannel(const pvaGlobal_t::channels_key_t& key, const epics::pvData::PVStructure::const_shared_pointer &pvRequest);
virtual ~pvaLinkChannel();
static void scan(void* arg, epicsJobMode mode);
void open();
void put(); // begin Put op.
virtual std::string getRequesterName() { return "pvaLink"; }
virtual void message(std::string const & message, pva::MessageType messageType)
{
errlogPrintf("%s pvaLink \"%s\": %s\n",
pvd::getMessageTypeName(messageType).c_str(),
name.c_str(),
message.c_str());
}
// pvac::ClientChanel::MonitorCallback
virtual void monitorEvent(const pvac::MonitorEvent& evt) OVERRIDE FINAL;
virtual void channelCreated(const epics::pvData::Status& status, pva::Channel::shared_pointer const & channel)
{
if(!status.isSuccess()) {
errlogPrintf("pvaLink create fails %s: %s\n", name.c_str(), status.getMessage().c_str());
return;
}
Guard G(lock);
//assert(chan==channel); // may be called before createChannel() returns
chan = channel;
}
// pvac::ClientChanel::PutCallback
virtual void putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args) OVERRIDE FINAL;
virtual void putDone(const pvac::PutEvent& evt) OVERRIDE FINAL;
private:
virtual void run() OVERRIDE FINAL;
virtual void channelStateChange(pva::Channel::shared_pointer const & channel, pva::Channel::ConnectionState connectionState);
// ==== Treat remaining as local to run()
virtual void monitorConnect(pvd::Status const & status,
pva::Monitor::shared_pointer const & monitor,
pvd::StructureConstPtr const & structure);
virtual void monitorEvent(pva::Monitor::shared_pointer const & monitor);
virtual void unlisten(pva::Monitor::shared_pointer const & monitor)
{
// what to do??
}
std::vector<dbCommon*> scan_records;
std::vector<bool> scan_check_passive;
DBManyLock atomic_lock;
};
struct pvaLink : public jlink
struct pvaLink : public pvaLinkConfig
{
static size_t refs;
static size_t num_instances;
bool alive; // attempt to catch some use after free
DBLINK * plink; // may be NULL
unsigned linkmods;
unsigned parse_level;
std::string name, field;
const pva::Channel::shared_pointer chan;
bool alive; // attempt to catch some use after free
std::tr1::shared_ptr<pvaLinkChannel> lchan;
pvd::PVScalarPtr valueS;
pvd::PVScalarArray::shared_pointer valueA;
pvd::PVScalar::shared_pointer sevr, sec, nsec;
pvd::ScalarType etype;
bool used_scratch, used_queue;
pvd::shared_vector<const void> put_scratch, put_queue;
struct Value {
bool valid;
bool scalar;
pvd::ScalarType etype;
pvd::shared_vector<const void> valueA;
dbrbuf valueS;
epicsUInt16 sevr;
epicsTimeStamp time;
Value() :valid(false) {}
void clear() {
valid = false;
valueA.clear();
}
};
pvaLink();
virtual ~pvaLink();
Value atomcache;
// returns pvRequest to be used with monitor
pvd::PVStructurePtr makeRequest();
pvaLink()
:plink(0)
,linkmods(0)
,parse_level(0)
,alive(true)
{
epics::atomic::increment(refs);
//TODO: valgrind tells me these aren't initialized by Base, but probably should be.
parseDepth = 0;
parent = 0;
}
bool valid() const;
void open()
{
if(this->name.empty())
throw std::logic_error("open() w/o target PV name");
this->name = name;
//TODO: how to distinguish "record.FLD" from pva "channel.subfield"?
size_t dot = this->name.find_first_of('.');
if(dot!=this->name.npos) {
field = this->name.substr(dot+1);
this->name = this->name.substr(0, dot);
}
lchan = pvaGlobal->connect(this->name.c_str());
Guard G(lchan->lock);
lchan->links.insert(this);
if(lchan->lastval)
attach();
}
~pvaLink()
{
alive = false;
if(lchan) { // may be NULL if parsing fails
Guard G(lchan->lock);
detach();
lchan->links.erase(this);
if(lchan->links.empty()) {
pvaGlobal->channels.erase(lchan->name);
lchan->doClose();
}
}
epics::atomic::decrement(refs);
}
// fetch a sub-sub-field of the top monitored field.
pvd::PVField::const_shared_pointer getSubField(const char *name);
void detach()
{
valueS.reset();
valueA.reset();
sevr.reset();
sec.reset();
nsec.reset();
}
bool attach()
{
pvd::PVStructurePtr base(lchan->lastval);
if(!field.empty())
base = base->getSubField<pvd::PVStructure>(field);
if(!base) {
errlogPrintf("pvaLink not %s%c%s\n", name.c_str(), field.empty() ? ' ' : '.', field.c_str());
return false;
}
pvd::PVFieldPtr value(base->getSubField("value"));
switch(value->getField()->getType())
{
case pvd::scalar:
valueS = std::tr1::static_pointer_cast<pvd::PVScalar>(value);
etype = valueS->getScalar()->getScalarType();
break;
case pvd::scalarArray:
valueA = std::tr1::static_pointer_cast<pvd::PVScalarArray>(value);
etype = valueA->getScalarArray()->getElementType();
break;
default:
errlogPrintf("pvaLink not .value : %s%c%s\n", name.c_str(), field.empty() ? ' ' : '.', field.c_str());
return false;
}
sevr = base->getSubField<pvd::PVScalar>("alarm.severity");
sec = base->getSubField<pvd::PVScalar>("timeStamp.secondsPastEpoch");
nsec = base->getSubField<pvd::PVScalar>("timeStamp.nanoseconds");
return true;
}
void get(Value& v)
{
if(valueA) {
valueA->getAs<const void>(v.valueA);
v.etype = v.valueA.original_type();
v.scalar = false;
} else if(valueS) {
switch(etype) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case pvd::pv ## PVACODE: v.valueS.dbf_##DBFTYPE = valueS->getAs<PVATYPE>(); break;
#define CASE_SQUEEZE_INT64
#include "pvatypemap.h"
#undef CASE_SQUEEZE_INT64
#undef CASE
case pvd::pvString: {
strncpy(v.valueS.dbf_STRING, valueS->getAs<std::string>().c_str(), sizeof(v.valueS.dbf_STRING));
v.valueS.dbf_STRING[sizeof(v.valueS.dbf_STRING)-1] = '\0';
}
break;
default:
throw std::runtime_error("putValue unsupported DBR code");
}
v.etype = etype;
v.scalar = true;
}
v.sevr = sevr->getAs<epicsUInt16>();
v.time.secPastEpoch = sec->getAs<epicsUInt32>()-POSIX_TIME_AT_EPICS_EPOCH;
v.time.nsec = nsec->getAs<epicsUInt32>();
v.valid = true;
}
void onDisconnect();
};

299
pdbApp/pvalink_channel.cpp Normal file
View File

@ -0,0 +1,299 @@
#include <alarm.h>
#include <pv/reftrack.h>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
namespace pvalink {
pvaGlobal_t *pvaGlobal;
pvaGlobal_t::pvaGlobal_t()
:provider_local("server:QSRV")
,provider_remote("pva")
,create(pvd::getPVDataCreate())
,queue("PVAL")
{
// worker should be above PVA worker priority?
queue.start(1, epicsThreadPriorityMedium);
}
pvaGlobal_t::~pvaGlobal_t()
{
}
size_t pvaLinkChannel::num_instances;
size_t pvaLink::num_instances;
bool pvaLinkChannel::LinkSort::operator()(const pvaLink *L, const pvaLink *R) const {
if(L->monorder==R->monorder)
return L < R;
return L->monorder < R->monorder;
}
// being called with pvaGlobal::lock held
pvaLinkChannel::pvaLinkChannel(const pvaGlobal_t::channels_key_t &key, const pvd::PVStructure::const_shared_pointer& pvRequest)
:key(key)
,pvRequest(pvRequest)
,num_disconnect(0u)
,connected(false)
,connected_latched(false)
,isatomic(false)
,queued(false)
,links_changed(false)
{}
pvaLinkChannel::~pvaLinkChannel() {
{
Guard G(pvaGlobal->lock);
pvaGlobal->channels.erase(key);
}
Guard G(lock);
assert(links.empty());
REFTRACE_DECREMENT(num_instances);
}
void pvaLinkChannel::open()
{
Guard G(lock);
try {
chan = pvaGlobal->provider_local.connect(key.first);
TRACE(<<"Local "<<key.first);
} catch(std::exception& e){
errlogPrintf("failed to find in QSRV; %s\n", key.first.c_str());
}
if(!chan) {
chan = pvaGlobal->provider_remote.connect(key.first);
TRACE(<<"Remote "<<key.first);
}
op_mon = chan.monitor(this, pvRequest);
REFTRACE_INCREMENT(num_instances);
}
// call with channel lock held
void pvaLinkChannel::put()
{
if(!connected) return;
bool doit = false;
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
if(!link->used_scratch) continue;
pvd::shared_vector<const void> temp;
temp.swap(link->put_scratch);
link->used_scratch = false;
temp.swap(link->put_queue);
link->used_queue = true;
doit = true;
}
if(doit) {
TRACE(<<"start");
// start net Put, cancels in-progress put
op_put = chan.put(this); // TODO: pvRequest
}
}
void pvaLinkChannel::putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args)
{
TRACE();
Guard G(lock);
pvd::PVStructurePtr top(pvaGlobal->create->createPVStructure(build));
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
if(!link->used_queue) continue;
link->used_queue = false; // clear early so unexpected exception won't get us in a retry loop
pvd::PVFieldPtr value(top->getSubField("value"));
if(!value) return; // TODO: how to signal error?
pvd::PVStringArray::const_svector choices; // TODO populate from op_mon
TRACE(<<"store "<<value->getFullName());
copyDBF2PVD(link->put_queue, value, args.tosend, choices);
link->put_queue.clear();
}
args.root = top;
}
void pvaLinkChannel::putDone(const pvac::PutEvent& evt)
{
TRACE(<<evt.event<<" "<<evt.message);
if(evt.event==pvac::PutEvent::Fail) {
errlogPrintf("%s PVA link put ERROR: %s\n", key.first.c_str(), evt.message.c_str());
}
Guard G(lock);
op_put = pvac::Operation();
if(evt.event!=pvac::PutEvent::Success) {
TRACE(<<"skip");
} else {
TRACE(<<"repeat");
put();
}
}
void pvaLinkChannel::monitorEvent(const pvac::MonitorEvent& evt)
{
bool queue = false;
{
TRACE(<<evt.event);
Guard G(lock);
switch(evt.event) {
case pvac::MonitorEvent::Disconnect:
case pvac::MonitorEvent::Data:
connected = evt.event == pvac::MonitorEvent::Data;
queue = true;
break;
case pvac::MonitorEvent::Cancel:
break; // no-op
case pvac::MonitorEvent::Fail:
connected = false;
queue = true;
errlogPrintf("%s: PVA link monitor ERROR: %s\n", chan.name().c_str(), evt.message.c_str());
break;
}
if(queued)
return; // already scheduled
queued = queue;
}
if(queue) {
pvaGlobal->queue.add(shared_from_this());
}
}
// Running from global WorkQueue thread
void pvaLinkChannel::run()
{
bool requeue = false;
{
Guard G(lock);
queued = false;
connected_latched = connected;
// pop next update from monitor queue.
// still under lock to safeguard concurrent calls to lset functions
if(connected && !op_mon.poll())
return; // monitor queue is empty, nothing more to do here
if(!connected) {
num_disconnect++;
// cancel pending put operations
op_put = pvac::Operation();
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
link->onDisconnect();
}
}
// at this point we know we will re-queue, but not immediately
// so an expected error won't get us stuck in a tight loop.
requeue = queued = true;
if(links_changed) {
// a link has been added or removed since the last update.
// rebuild our cached list of records to (maybe) process.
scan_records.clear();
scan_check_passive.clear();
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
assert(link && link->alive);
if(!link->plink) continue;
// NPP and none/Default don't scan
// PP, CP, and CPP do scan
// PP and CPP only if SCAN=Passive
if(link->pp != pvaLink::CPP && link->pp != pvaLink::CP)
continue;
scan_records.push_back(link->plink->precord);
scan_check_passive.push_back(link->pp != pvaLink::CP);
}
DBManyLock ML(scan_records);
atomic_lock.swap(ML);
links_changed = false;
}
}
if(scan_records.empty()) {
// Nothing to do, so don't bother locking
} else if(isatomic) {
DBManyLocker L(atomic_lock);
for(size_t i=0, N=scan_records.size(); i<N; i++) {
dbCommon *precord = scan_records[i];
if (precord->pact) {
if (precord->tpro)
printf("%s: Active %s\n",
epicsThreadGetNameSelf(), precord->name);
precord->rpro = TRUE;
} else if(scan_check_passive[i] && precord->scan!=0) {
continue;
}
dbProcess(precord);
}
} else {
for(size_t i=0, N=scan_records.size(); i<N; i++) {
DBScanLocker L(scan_records[i]);
if(scan_check_passive[i] && scan_records[i]->scan!=0) {
continue;
}
dbProcess(scan_records[i]);
}
}
if(requeue) {
// re-queue until monitor queue is empty
pvaGlobal->queue.add(shared_from_this());
}
}
} // namespace pvalink

276
pdbApp/pvalink_jlif.cpp Normal file
View File

@ -0,0 +1,276 @@
#include <sstream>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
namespace pvalink {
pvaLinkConfig::pvaLinkConfig()
:queueSize(4)
,pp(NPP)
,ms(NMS)
,defer(false)
,monorder(0)
{}
pvaLinkConfig::~pvaLinkConfig() {}
}
namespace {
using namespace pvalink;
/* link options.
*
* "pvname" # short-hand, sets PV name only
*
* {
* "pv":"name",
* "field":"blah.foo",
* "Q":5,
* "proc":true, // false, true, none, "CP", "CPP"
* "sevr":true, // false, true, "MSI", "MSS"
* "monorder":#,// order of processing during CP scan
* "defer":true,// whether to immediately start Put, or only queue value to be sent
* }
*/
jlink* pva_alloc_jlink(short dbr)
{
try {
TRACE();
return new pvaLink;
}catch(std::exception& e){
errlogPrintf("Error allocating pva link: %s\n", e.what());
return NULL;
}
}
#define TRY pvaLinkConfig *pvt = static_cast<pvaLinkConfig*>(pjlink); (void)pvt; try
#define CATCH(RET) catch(std::exception& e){ \
errlogPrintf("Error in %s link: %s\n", __FUNCTION__, e.what()); \
return RET; }
void pva_free_jlink(jlink *pjlink)
{
TRY {
TRACE();
delete pvt;
}catch(std::exception& e){
errlogPrintf("Error freeing pva link: %s\n", e.what());
}
}
jlif_result pva_parse_null(jlink *pjlink)
{
TRY {
TRACE(<<pvt->jkey<<" ");
if(pvt->parseDepth!=1) {
// ignore
} else if(pvt->jkey == "proc") {
pvt->pp = pvaLinkConfig::Default;
} else if(pvt->jkey == "sevr") {
pvt->ms = pvaLinkConfig::NMS;
} else if(pvt->debug) {
printf("pva link parsing unknown none depth=%u key=\"%s\"\n",
pvt->parseDepth, pvt->jkey.c_str());
}
pvt->jkey.clear();
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_result pva_parse_bool(jlink *pjlink, int val)
{
TRY {
TRACE(<<pvt->jkey<<" "<<(val?"true":"false"));
if(pvt->parseDepth!=1) {
// ignore
} else if(pvt->jkey == "proc") {
pvt->pp = val ? pvaLinkConfig::PP : pvaLinkConfig::NPP;
} else if(pvt->jkey == "sevr") {
pvt->ms = val ? pvaLinkConfig::MS : pvaLinkConfig::NMS;
} else if(pvt->jkey == "defer") {
pvt->defer = !!val;
} else if(pvt->debug) {
printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%s\n",
pvt->parseDepth, pvt->jkey.c_str(), val ? "true" : "false");
}
pvt->jkey.clear();
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_result pva_parse_integer(jlink *pjlink, long long val)
{
TRY {
TRACE(<<pvt->jkey<<" "<<val);
if(pvt->parseDepth!=1) {
// ignore
} else if(pvt->jkey == "Q") {
pvt->queueSize = val < 1 ? 1 : size_t(val);
} else if(pvt->jkey == "monorder") {
pvt->monorder = std::max(-1024, std::min(int(val), 1024));
} else if(pvt->debug) {
printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%lld\n",
pvt->parseDepth, pvt->jkey.c_str(), val);
}
pvt->jkey.clear();
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_result pva_parse_string(jlink *pjlink, const char *val, size_t len)
{
TRY{
std::string sval(val, len);
TRACE(<<pvt->jkey<<" "<<sval);
if(pvt->parseDepth==0 || (pvt->parseDepth==1 && pvt->jkey=="pv")) {
pvt->channelName = sval;
} else if(pvt->parseDepth > 1) {
// ignore
} else if(pvt->jkey=="field") {
pvt->fieldName = sval;
} else if(pvt->jkey=="proc") {
if(sval=="CP") {
pvt->pp = pvaLinkConfig::CP;
} else if(sval=="CPP") {
pvt->pp = pvaLinkConfig::CPP;
} else if(sval=="PP") {
pvt->pp = pvaLinkConfig::PP;
} else if(sval=="NPP") {
pvt->pp = pvaLinkConfig::NPP;
} else if(pvt->debug) {
printf("pva link parsing unknown proc depth=%u key=\"%s\" value=\"%s\"\n",
pvt->parseDepth, pvt->jkey.c_str(), sval.c_str());
}
} else if(pvt->jkey=="sevr") {
if(sval=="MSS") {
pvt->ms = pvaLinkConfig::MSS;
} else if(sval=="MSI") {
pvt->ms = pvaLinkConfig::MSI;
} else if(pvt->debug) {
printf("pva link parsing unknown sevr depth=%u key=\"%s\" value=\"%s\"\n",
pvt->parseDepth, pvt->jkey.c_str(), sval.c_str());
}
} else if(pvt->debug) {
printf("pva link parsing unknown string depth=%u key=\"%s\" value=\"%s\"\n",
pvt->parseDepth, pvt->jkey.c_str(), sval.c_str());
}
pvt->jkey.clear();
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_key_result pva_parse_start_map(jlink *pjlink)
{
TRY {
TRACE();
return jlif_key_continue;
}CATCH(jlif_key_stop)
}
jlif_result pva_parse_key_map(jlink *pjlink, const char *key, size_t len)
{
TRY {
std::string sval(key, len);
TRACE(<<sval);
pvt->jkey = sval;
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_result pva_parse_end_map(jlink *pjlink)
{
TRY {
TRACE();
return jlif_continue;
}CATCH(jlif_stop)
}
struct lset* pva_get_lset(const jlink *pjlink)
{
TRACE();
return &pva_lset;
}
void pva_report(const jlink *rpjlink, int lvl, int indent)
{
const pvaLink *pval = static_cast<const pvaLink*>(rpjlink);
try {
(void)pval;
printf("%*s'pva': %s", indent, "", pval->channelName.c_str());
if(!pval->fieldName.empty())
printf("|.%s", pval->fieldName.c_str());
if(pval->lchan) {
// after open()
Guard G(pval->lchan->lock);
printf(" %sconnected", pval->lchan->connected ? "" : "dis");
if(lvl>0) {
printf(" #disconn=%zu", pval->lchan->num_disconnect);
switch(pval->pp) {
case pvaLinkConfig::NPP: printf(" NPP"); break;
case pvaLinkConfig::Default: printf(" Def"); break;
case pvaLinkConfig::PP: printf(" PP"); break;
case pvaLinkConfig::CP: printf(" CP"); break;
case pvaLinkConfig::CPP: printf(" CPP"); break;
}
switch(pval->ms) {
case pvaLinkConfig::NMS: printf(" NMS"); break;
case pvaLinkConfig::MS: printf(" MS"); break;
case pvaLinkConfig::MSS: printf(" MSS"); break;
case pvaLinkConfig::MSI: printf(" MSI"); break;
}
}
if(lvl>1) {
printf(" Q=%c",
pval->lchan->queued?'T':'F');
}
if(lvl>5) {
std::ostringstream strm;
pval->lchan->chan.show(strm);
printf("\n%*s CH: %s", indent, "", strm.str().c_str());
}
}
printf("\n");
}CATCH()
}
} //namespace
namespace pvalink {
jlif lsetPVA = {
"pva",
&pva_alloc_jlink,
&pva_free_jlink,
&pva_parse_null,
&pva_parse_bool,
&pva_parse_integer,
NULL,
&pva_parse_string,
&pva_parse_start_map,
&pva_parse_key_map,
&pva_parse_end_map,
NULL,
NULL,
NULL,
&pva_get_lset,
&pva_report,
NULL
};
} //namespace pvalink

127
pdbApp/pvalink_link.cpp Normal file
View File

@ -0,0 +1,127 @@
#include <pv/reftrack.h>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
namespace pvalink {
pvaLink::pvaLink()
:alive(true)
,plink(0)
,used_scratch(false)
,used_queue(false)
{
REFTRACE_INCREMENT(num_instances);
//TODO: valgrind tells me these aren't initialized by Base, but probably should be.
parseDepth = 0;
parent = 0;
}
pvaLink::~pvaLink()
{
alive = false;
if(lchan) { // may be NULL if parsing fails
Guard G(lchan->lock);
lchan->links.erase(this);
lchan->links_changed = true;
}
REFTRACE_DECREMENT(num_instances);
}
static
pvd::StructureConstPtr putRequestType = pvd::getFieldCreate()->createFieldBuilder()
->addNestedStructure("field")
->endNested()
->addNestedStructure("record")
->addNestedStructure("_options")
->add("block", pvd::pvBoolean)
->add("process", pvd::pvString) // "true", "false", or "passive"
->endNested()
->endNested()
->createStructure();
static
pvd::StructureConstPtr monitorRequestType = pvd::getFieldCreate()->createFieldBuilder()
->addNestedStructure("field")
->endNested()
->addNestedStructure("record")
->addNestedStructure("_options")
->add("pipeline", pvd::pvBoolean)
->add("atomic", pvd::pvBoolean)
->add("queueSize", pvd::pvUInt)
->endNested()
->endNested()
->createStructure();
pvd::PVStructurePtr pvaLink::makeRequest()
{
// const char *proc = "passive";
// switch(pp) {
// case NPP: proc = "false"; break;
// case Default: break;
// case PP:
// case CP:
// case CPP:
// proc = "true";
// }
pvd::PVStructurePtr ret;
// ret = pvd::getPVDataCreate()->createPVStructure(putRequestType);
// ret->getSubFieldT<pvd::PVBoolean>("record._options.block")->put(false); // TODO: some way to expose completion...
// ret->getSubFieldT<pvd::PVString>("record._options.process")->put(proc);
ret = pvd::getPVDataCreate()->createPVStructure(monitorRequestType);
ret->getSubFieldT<pvd::PVBoolean>("record._options.pipeline")->put(false);
ret->getSubFieldT<pvd::PVBoolean>("record._options.atomic")->put(true);
ret->getSubFieldT<pvd::PVUInt>("record._options.queueSize")->put(queueSize);
return ret;
}
// caller must lock lchan->lock
bool pvaLink::valid() const
{
return lchan->connected && lchan->op_mon.root;
}
// caller must lock lchan->lock
pvd::PVField::const_shared_pointer pvaLink::getSubField(const char *name)
{
pvd::PVField::const_shared_pointer ret;
if(valid()) {
if(fieldName.empty()) {
// we access the top level struct
ret = lchan->op_mon.root->getSubField(name);
} else {
// we access a sub-struct
ret = lchan->op_mon.root->getSubField(fieldName);
if(ret->getField()->getType()!=pvd::structure) {
// addressed sub-field isn't a sub-structure
if(strcmp(name, "value")!=0) {
// unless we are trying to fetch the "value", we fail here
ret.reset();
}
} else {
ret = static_cast<const pvd::PVStructure*>(ret.get())->getSubField(name);
}
}
}
return ret;
}
// call with channel lock held
void pvaLink::onDisconnect()
{
// TODO: option to remain queue'd while disconnected
used_queue = used_scratch = false;
}
} // namespace pvalink

390
pdbApp/pvalink_lset.cpp Normal file
View File

@ -0,0 +1,390 @@
#include <epicsString.h>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
namespace {
using namespace pvalink;
#define TRY pvaLink *self = static_cast<pvaLink*>(plink->value.json.jlink); assert(self->alive); try
#define CATCH(LOC) catch(std::exception& e) { \
errlogPrintf("pvaLink " #LOC " fails %s: %s\n", plink->precord->name, e.what()); \
}
void pvaOpenLink(DBLINK *plink)
{
try {
pvaLink* self((pvaLink*)plink->value.json.jlink);
TRACE(<<plink->precord->name<<" "<<self->channelName);
// still single threaded at this point.
// also, no pvaLinkChannel::lock yet
self->plink = plink;
if(self->channelName.empty())
return; // nothing to do...
pvd::PVStructure::const_shared_pointer pvRequest(self->makeRequest());
pvaGlobal_t::channels_key_t key;
{
std::ostringstream strm;
strm<<*pvRequest; // print the request as a convient key for our channel cache
key = std::make_pair(self->channelName, strm.str());
}
std::tr1::shared_ptr<pvaLinkChannel> chan;
bool doOpen = false;
{
Guard G(pvaGlobal->lock);
pvaGlobal_t::channels_t::iterator it(pvaGlobal->channels.find(key));
if(it!=pvaGlobal->channels.end()) {
// re-use existing channel
chan = it->second.lock();
}
if(!chan) {
// open new channel
chan.reset(new pvaLinkChannel(key, pvRequest));
pvaGlobal->channels.insert(std::make_pair(key, chan));
doOpen = true;
}
}
if(doOpen) {
chan->open(); // start subscription
}
{
Guard G(chan->lock);
chan->links.insert(self);
chan->links_changed = true;
self->lchan.swap(chan); // we are now attached
}
return;
}CATCH(pvaOpenLink)
// on error, prevent any further calls to our lset functions
plink->lset = NULL;
}
void pvaRemoveLink(struct dbLocker *locker, DBLINK *plink)
{
try {
p2p::auto_ptr<pvaLink> self((pvaLink*)plink->value.json.jlink);
TRACE(<<plink->precord->name<<" "<<self->channelName);
assert(self->alive);
Guard G(self->lchan->lock);
}CATCH(pvaRemoteLink)
}
int pvaIsConnected(const DBLINK *plink)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
Guard G(self->lchan->lock);
if(!self->valid()) return -1;
return self->valid();
}CATCH(pvaIsConnected)
return 0;
}
int pvaGetDBFtype(const DBLINK *plink)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
Guard G(self->lchan->lock);
if(!self->valid()) return -1;
// if fieldName is empty, use top struct value
// if fieldName not empty
// if sub-field is struct, use sub-struct .value
// if sub-field not struct, treat as value
pvd::PVField::const_shared_pointer value(self->getSubField("value"));
pvd::ScalarType ftype = pvd::pvInt; // default for un-mapable types.
if(!value) {
// no-op
} else if(value->getField()->getType()==pvd::scalar)
ftype = static_cast<const pvd::Scalar*>(value->getField().get())->getScalarType();
else if(value->getField()->getType()==pvd::scalarArray)
ftype = static_cast<const pvd::ScalarArray*>(value->getField().get())->getElementType();
switch(ftype) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case pvd::pv##PVACODE: return DBF_##DBFTYPE;
#define CASE_REAL_INT64
#include "pv/typemap.h"
#undef CASE_REAL_INT64
#undef CASE
case pvd::pvString: return DBF_STRING; // TODO: long string?
}
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetElements(const DBLINK *plink, long *nelements)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
Guard G(self->lchan->lock);
if(!self->valid()) return -1;
pvd::PVField::const_shared_pointer value(self->getSubField("value"));
if(value && value->getField()->getType()==pvd::scalarArray)
return static_cast<const pvd::PVScalarArray*>(value.get())->getLength();
else
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer,
long *pnRequest)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
Guard G(self->lchan->lock);
pvd::PVField::const_shared_pointer value(self->getSubField("value"));
// copy from 'value' into 'pbuffer'
long status = copyPVD2DBF(value, pbuffer, dbrType, pnRequest);
if(status) return status;
if(self->ms != pvaLink::NMS) {
// TODO
}
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetControlLimits(const DBLINK *plink, double *lo, double *hi)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
Guard G(self->lchan->lock);
if(!self->valid()) return -1;
if(self->lchan->connected_latched) {
pvd::PVScalar::const_shared_pointer value;
if(lo) {
value = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->getSubField("control.limitLow"));
*lo = value ? value->getAs<double>() : 0.0;
}
if(hi) {
value = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->getSubField("control.limitHigh"));
*hi = value ? value->getAs<double>() : 0.0;
}
} else {
*lo = *hi = 0.0;
}
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetGraphicLimits(const DBLINK *plink, double *lo, double *hi)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
if(!self->valid()) return -1;
//Guard G(self->lchan->lock);
*lo = *hi = 0.0;
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetAlarmLimits(const DBLINK *plink, double *lolo, double *lo,
double *hi, double *hihi)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
if(!self->valid()) return -1;
//Guard G(self->lchan->lock);
*lolo = *lo = *hi = *hihi = 0.0;
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetPrecision(const DBLINK *plink, short *precision)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
if(!self->valid()) return -1;
//Guard G(self->lchan->lock);
*precision = 0;
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetUnits(const DBLINK *plink, char *units, int unitsSize)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
if(!self->valid()) return -1;
//Guard G(self->lchan->lock);
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status,
epicsEnum16 *severity)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
if(!self->valid()) return -1;
Guard G(self->lchan->lock);
epicsEnum16 stat = NO_ALARM;
pvd::PVScalar::const_shared_pointer afld;
if(severity && (afld = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->getSubField("alarm.severity")))) {
*severity = afld->getAs<pvd::uint16>();
// no direct translation for NT alarm status codes
stat = LINK_ALARM;
}
if(status) {
*status = stat;
}
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaGetTimeStamp(const DBLINK *plink, epicsTimeStamp *pstamp)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
if(!self->valid()) return -1;
Guard G(self->lchan->lock);
pvd::PVScalar::const_shared_pointer afld;
if(afld = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->getSubField("timeStamp.secondsPastEpoch"))) {
pstamp->secPastEpoch = afld->getAs<pvd::uint32>()-POSIX_TIME_AT_EPICS_EPOCH;
} else {
return S_time_noProvider;
}
if(afld = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->getSubField("timeStamp.nanoseconds"))) {
pstamp->nsec = afld->getAs<pvd::uint32>();
} else {
pstamp->nsec = 0u;
}
return 0;
}CATCH(pvaIsConnected)
return -1;
}
long pvaPutValue(DBLINK *plink, short dbrType,
const void *pbuffer, long nRequest)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
(void)self;
Guard G(self->lchan->lock);
if(nRequest < 0) return -1;
if(!self->valid()) {
// TODO: option to queue while disconnected
return -1;
}
pvd::ScalarType stype = DBR2PVD(dbrType);
pvd::shared_vector<const void> buf;
if(dbrType == DBF_STRING) {
const char *sbuffer = (const char*)pbuffer;
pvd::shared_vector<std::string> sval(nRequest);
for(long n=0; n<nRequest; n++, sbuffer += MAX_STRING_SIZE) {
sval[n] = std::string(sbuffer, epicsStrnLen(sbuffer, MAX_STRING_SIZE));
}
self->put_scratch = pvd::static_shared_vector_cast<const void>(pvd::freeze(sval));
} else {
pvd::shared_vector<void> val(pvd::ScalarTypeFunc::allocArray(stype, size_t(nRequest)));
assert(size_t(dbValueSize(dbrType)*nRequest) == val.size());
memcpy(val.data(), pbuffer, val.size());
self->put_scratch = pvd::freeze(val);
}
self->used_scratch = true;
if(!self->defer) self->lchan->put();
return 0;
}CATCH(pvaIsConnected)
return -1;
}
void pvaScanForward(DBLINK *plink)
{
TRY {
TRACE(<<plink->precord->name<<" "<<self->channelName);
Guard G(self->lchan->lock);
}CATCH(pvaIsConnected)
}
#undef TRY
#undef CATCH
} //namespace
namespace pvalink {
lset pva_lset = {
0, 1, // non-const, volatile
&pvaOpenLink,
&pvaRemoveLink,
NULL, NULL, NULL,
&pvaIsConnected,
&pvaGetDBFtype,
&pvaGetElements,
&pvaGetValue,
&pvaGetControlLimits,
&pvaGetGraphicLimits,
&pvaGetAlarmLimits,
&pvaGetPrecision,
&pvaGetUnits,
&pvaGetAlarm,
&pvaGetTimeStamp,
&pvaPutValue,
NULL,
&pvaScanForward
//&pvaReportLink,
};
} //namespace pvalink