Merge remote-tracking branch 'md/master'

* md/master: (40 commits)
  pvalink: test FWD_LINK
  QSRV: fix put w/ proc w/o values
  pvalink: add wf test case
  pvalink: defer opening links until IOC running
  update iocpvalink to test output links
  pvalink overhaul debug print controls
  pvalink misc
  pvalink minor
  add iocsh dbpvar()
  pvalink: hide QSRV channel create errors
  attempt to disable pvalink for Base <3.16
  pvalink: input proc if changed
  iocpvalink
  pvalink fix sevr and doc
  pvalink local:
  pvalink add retry: option
  pvalink Put use fieldName
  fix tpool cleanup
  testpvalink avoid false positive races
  pvalink config # of workers
  ...
Michael Davidsaver
2018-05-28 09:08:12 -07:00
34 changed files with 3041 additions and 1031 deletions

@ -1,9 +1,9 @@
# Module (source) version
EPICS_QSRV_MAJOR_VERSION = 1
EPICS_QSRV_MINOR_VERSION = 0
EPICS_QSRV_MINOR_VERSION = 1
EPICS_QSRV_MAINTENANCE_VERSION = 1
EPICS_QSRV_DEVELOPMENT_FLAG = 1
# ABI version
EPICS_QSRV_ABI_MAJOR_VERSION = 1
EPICS_QSRV_ABI_MINOR_VERSION = 0
EPICS_QSRV_ABI_MINOR_VERSION = 1

@ -1,6 +1,7 @@
/**
@mainpage pva2pva Home of QSRV and pvAccess 2 pvAccess gateway
- [Source](https://github.com/epics-base/pva2pva)
- [Download](https://sourceforge.net/projects/epics-pvdata/files/)
@section qsrv QSRV
@ -10,7 +11,8 @@ runs inside an EPICS IOC process and allows clients
to make requests to access the Process Variables (PVs)
within.
Documentation of @ref qsrv_config including @ref qsrv_group_def.
Documentation of @ref qsrv_config including @ref qsrv_group_def
and @ref qsrv_link configuration.
- @ref release_notes

@ -130,7 +130,7 @@ will split the nanoseconds value stored in the associated record.
The least significant # bits are stored in the 'timeStamp.userTag' field,
while the remaining 32-# bits are stored in 'timeStamp.nanoseconds' (without shifting).
For example, in the following situation 16 bits are split off.
For example, in the following situation 20 bits are split off into userTag.
If the nanoseconds part of the record timestamp is 0x12345678,
then the PVD structure would include "timeStamp.nanoseconds=0x12300000"
and "timeStamp.userTag=0x45678".
@ -141,4 +141,172 @@ record(ai, "...") {
}
@endcode
@subsection qsrv_link PVAccess Links
When built against Base >= 3.16.1, support is enabled for PVAccess links,
which are analogous to Channel Access (CA) links. However, the syntax
for PVA links is quite different.
@note The "dbjlr" and "dbpvar" IOC shell command provide information about PVA links in a running IOC.
@warning The PVA Link syntax shown below is provisional and subject to change.
A simple configuration using defaults is
@code
record(longin, "tgt") {}
record(longin, "src") {
field(INP, {pva:"tgt"})
}
@endcode
This is a shorthand for
@code
record(longin, "tgt") {}
record(longin, "src") {
field(INP, {pva:{pv:"tgt"}})
}
@endcode
Some additional keys (beyond "pv") may be used.
Defaults are shown below:
@code
record(longin, "tgt") {}
record(longin, "src") {
field(INP, {pva:{
pv:"tgt",
field:"", # may be a sub-field
local:false,# Require local PV
Q:4, # monitor queue depth
pipeline:false, # require that server uses monitor flow control protocol
proc:none, # Request record processing (side-effects).
sevr:false, # Maximize severity.
time:false, # set record time during getValue
monorder:0, # Order of record processing as a result of CP and CPP
retry:false,# allow Put while disconnected.
always:false,# CP/CPP input link process even when .value field hasn't changed
defer:false # Defer put
}})
}
@endcode
@subsubsection qsrv_link_pv pv: Target PV name
The PV name to search for.
This is the same name which could be used with 'pvget' or other client tools.
@subsubsection qsrv_link_field field: Structure field name
The name of a sub-field of the remotely provided Structure.
By default, an empty string "" uses the top-level Structure.
If the top level structure, or a sub-structure, is selected, then
it is expected to conform to NTScalar, NTScalarArray, or NTEnum
so that the value and meta-data can be extracted.
If the sub-field is a PVScalar or PVScalarArray, then a value
will be taken from it, but no meta-data will be available.
@todo Ability to traverse through unions and into structure arrays (as with group mappings).
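For illustration, a link may address a sub-structure of a larger remote structure. The group PV "grp:name" and its sub-structure "chan1" below are hypothetical names:
@code
record(longin, "src") {
    # read "value" (and meta-data) from the "chan1" sub-structure of "grp:name"
    field(INP, {pva:{pv:"grp:name", field:"chan1"}})
}
@endcode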
@subsubsection qsrv_link_local local: Require local PV
When true, the link will not connect unless the named PV is provided by the local (QSRV) data provider.
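For example, a link which only binds to a PV served by the local QSRV provider (the PV name here is a placeholder):
@code
record(longin, "src") {
    # never connects if "some:pv" is only reachable through a remote server
    field(INP, {pva:{pv:"some:pv", local:true}})
}
@endcode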
@subsubsection qsrv_link_Q Q: Monitor queue depth
Requests a certain monitor queue depth.
The server may, or may not, take this into consideration when selecting
a queue depth.
@subsubsection qsrv_link_pipeline pipeline: Monitor flow control
Expect that the server supports PVA monitor flow control.
If not, then the subscription will stall (ick.)
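A minimal sketch combining the two monitor options above (the PV name is a placeholder); the requested depth and flow control remain subject to what the server actually supports:
@code
record(longin, "src") {
    field(INP, {pva:{pv:"some:pv", Q:10, pipeline:true}})
}
@endcode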
@subsubsection qsrv_link_proc proc: Request record processing (side-effects)
The meaning of this option depends on the direction of the link.
For output links, this option allows a request for remote processing (side-effects).
@li none (default) - Make no special request. Uses a server specific default.
@li false, "NPP" - Request to skip processing.
@li true, "PP" - Request to force processing.
@li "CP", "CPP" - For output links, an alias for "PP".
For input links, this option controls whether the record containing
the PVA link will be processed when subscription events are received.
@li none (default), false, "NPP" - Do not process on subscription updates.
@li true, "CP" - Always process on subscription updates.
@li "PP", "CPP" - Process on subscription updates if SCAN=Passive
@subsubsection qsrv_link_sevr sevr: Alarm propagation
This option controls whether reading a value from an input PVA link
has the additional effect of propagating any alarm via the Maximize
Severity process.
@li false - Do not maximize severity.
@li true - Maximize alarm severity
@li "MSI" - Maximize only if the remote severity is INVALID.
@subsubsection qsrv_link_time time: Time propagation
Somewhat analogous to sevr:, but applied to the timestamp.
When true, the record TIME field is updated when the link value is read.
@warning TSEL must be set to -2 for time:true to have an effect.
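The following sketch mirrors the "track" record in the test database added by this commit (PV names are placeholders), combining time: and sevr: on a CP input link:
@code
record(longin, "track") {
    field(INP, {pva:{pv:"some:counter", proc:"CP", sevr:true, time:true}})
    field(TSE, "-2")  # as in the iocpvalink test database
}
@endcode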
@subsubsection qsrv_link_monorder monorder: Monitor processing order
When multiple records target the same PV and request processing
on subscription updates, this option allows the order of processing
to be specified.
Records are processed in order of increasing monorder value.
monorder=-1 is processed before monorder=0.
Both are processed before monorder=1.
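For example (PV names are placeholders), two records subscribing to the same PV, with "first" processed before "second" on each update:
@code
record(longin, "first") {
    field(INP, {pva:{pv:"some:pv", proc:"CP", monorder:-1}})
}
record(longin, "second") {
    field(INP, {pva:{pv:"some:pv", proc:"CP", monorder:1}})
}
@endcode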
@subsubsection qsrv_link_defer defer: Defer put
By default (defer=false) an output link will immediately
start a PVA Put operation. defer=true will store the
new value in an internal cache, but not start a PVA Put.
This option, in combination with field:, allows a single
Put to contain updates to multiple sub-fields.
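A sketch of such a combined Put, assuming "grp:name" is a group PV with sub-fields "a" and "b" (all names hypothetical). The first link only caches its value; the second, non-deferred, link then sends both in a single Put:
@code
record(ao, "seta") {
    field(OUT, {pva:{pv:"grp:name", field:"a", defer:true}})  # cache only
}
record(ao, "setb") {
    field(OUT, {pva:{pv:"grp:name", field:"b"}})  # cache, then send one Put
}
@endcode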
@subsubsection qsrv_link_retry retry: Put while disconnected
Allow a Put operation to be queued while the link is disconnected.
The Put will be executed when the link becomes connected.
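For example (mirroring record "out3" in the test database added by this commit, with placeholder names):
@code
record(longout, "out") {
    # a value written while disconnected is sent once the link reconnects
    field(OUT, {pva:{pv:"some:pv", retry:true}})
}
@endcode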
@subsubsection qsrv_link_always always: CP/CPP always process
By default (always:false) a subscription update will only cause a CP input link
to scan if the structure field (cf. field: option) is marked as changed.
Set to true to override this, and always process the link.
@subsubsection qsrv_link_sem Link semantics/behavior
This section attempts to answer some questions about how links behave in certain situations.
Links are evaluated in three basic contexts.
@li dbPutLink()/dbScanFwdLink()
@li dbGetLink() of non-CP link
@li dbGetLink() during a scan resulting from a CP link.
An input link can bring in a Value as well as meta-data, alarm, time, and display/control info.
For input links, the PVA link engine attempts to always maintain consistency between Value, alarm, and time.
However, consistency between these, and the display/control info is only ensured during a CP scan.
*/

@ -0,0 +1,5 @@
TOP = ../..
include $(TOP)/configure/CONFIG
ARCH = linux-x86_64-debug
TARGETS = envPaths
include $(TOP)/configure/RULES.ioc

@ -0,0 +1,5 @@
IOC for testing PVA links.
The primary test is st.cmd. st2.cmd is run/stopped concurrently
to test remote connect/disconnect. st2a.cmd is used instead to test a type
change.

@ -0,0 +1,105 @@
record(ai, "$(P)target") {}
# local input link shorthand
record(longin, "$(P)inp1") {
field(INP , {pva:"$(P)target"})
}
# local input link longhand
record(longin, "$(P)inp2") {
field(INP , {pva:{pv:"$(P)target"}})
}
# remote input link
record(longin, "$(P)inp3") {
field(INP , {pva:"invalid:pv:name"})
}
# null link (requires local:true, but the PV is not local)
record(longin, "$(P)inp4") {
field(INP , {pva:{pv:"invalid:pv:name", local:true}})
}
# local input link w/ CP (process on update)
record(calc, "$(P)cnt") {
field(SCAN, "1 second")
field(INPA, "$(P)cnt")
field(INPB, "9")
field(CALC, "A<B?A+1:0")
field(HIGH, "7")
field(HSV , "MAJOR")
}
record(longin, "$(P)track") {
field(INP , {pva:{
pv:"$(P)cnt",
proc:"CP",
sevr:true,
time:true
}})
field(TSE, "-2")
}
# remote input link w/ CP (process on update)
record(longin, "$(P)track2") {
field(INP , {pva:{
pv:"$(P)cnt:rmt",
proc:"CP",
sevr:true,
time:true
}})
field(TSE, "-2")
}
# local output link
record(longout, "$(P)out1") {
field(OUT , {pva:{pv:"$(P)out:tgt"}})
field(TPRO, "1")
}
record(longout, "$(P)out:tgt") {
field(TPRO, "1")
}
# remote output link
record(longout, "$(P)out2") {
field(OUT , {pva:{pv:"$(P)out:rmt"}})
field(TPRO, "1")
}
# remote output link w/ retry (queue put while disconnected)
record(longout, "$(P)out3") {
field(OUT , {pva:{pv:"$(P)out:rmt", retry:true}})
field(TPRO, "1")
#info(base:lsetDebug, "YES")
}
# output link to waveform
record(aao, "$(P)out4:wf") {
field(FTVL, "DOUBLE")
field(NELM, "10")
field(OUT , {pva:{pv:"$(P)target:wf"}})
}
# input link to waveform
record(aao, "$(P)target:wf") {
field(FTVL, "DOUBLE")
field(NELM, "10")
}
record(aai, "$(P)inp5:wf") {
field(FTVL, "DOUBLE")
field(NELM, "10")
field(INP , {pva:{pv:"$(P)target:wf"}})
}
# test forward link
record(calc, "$(P)fcnt1") {
field(CALC, "VAL+1")
field(FLNK, {pva:{pv:"$(P)fcnt2"}})
info(base:lsetDebug, "YES")
}
record(calc, "$(P)fcnt2") {
field(CALC, "VAL+1")
}

@ -0,0 +1,14 @@
# use with pvalink.db, run in a second IOC
record(calc, "$(P)cnt:rmt") {
field(SCAN, "1 second")
field(INPA, "$(P)cnt")
field(INPB, "9")
field(CALC, "A<B?A+0.1:0")
field(HIGH, "7")
field(HSV , "MAJOR")
}
record(longout, "$(P)out:rmt") {
field(TPRO, "1")
}

@ -0,0 +1,18 @@
# use with pvalink.db, run in a second IOC
# use instead of pvalink2.db to cause a type change
record(calcout, "$(P)cnt:rmtx") {
field(SCAN, "1 second")
field(OUT , "$(P)cnt:rmt PP")
field(INPA, "$(P)cnt")
field(INPB, "9")
field(CALC, "A<B?A+1:0")
field(HIGH, "7")
field(HSV , "MAJOR")
}
record(longin, "$(P)cnt:rmt") {}
record(stringout, "$(P)out:rmt") {
field(TPRO, "1")
}

iocBoot/iocpvalink/st.cmd (new executable file, +5 lines)

@ -0,0 +1,5 @@
#!../../bin/linux-x86_64-debug/softIocPVA
dbLoadRecords("pvalink.db","P=TST:")
iocInit()

iocBoot/iocpvalink/st2.cmd (new executable file, +5 lines)

@ -0,0 +1,5 @@
#!../../bin/linux-x86_64-debug/softIocPVA
dbLoadRecords("pvalink2.db","P=TST:")
iocInit()

iocBoot/iocpvalink/st2a.cmd (new executable file, +5 lines)

@ -0,0 +1,5 @@
#!../../bin/linux-x86_64-debug/softIocPVA
dbLoadRecords("pvalink2a.db","P=TST:")
iocInit()

@ -5,23 +5,53 @@ include $(TOP)/configure/CONFIG
# ADD MACRO DEFINITIONS AFTER THIS LINE
#=============================
ifeq ($(OS_CLASS),RTEMS)
# set by Base in configure/os/CONFIG.UnixCommon.Common
# then clobbered by included RTEMS make/host.cfg
# to drop the '-p'
MKDIR = mkdir -p
endif
LIBRARY += qsrv
SHRLIB_VERSION ?= $(EPICS_QSRV_ABI_MAJOR_VERSION).$(EPICS_QSRV_ABI_MINOR_VERSION)
USR_CPPFLAGS += -I$(TOP)/common -I$(TOP)/p2pApp
INC += pv/qsrv.h
INC += pv/qsrvVersionNum.h
EXPANDVARS += EPICS_QSRV_MAJOR_VERSION
EXPANDVARS += EPICS_QSRV_MINOR_VERSION
EXPANDVARS += EPICS_QSRV_MAINTENANCE_VERSION
EXPANDVARS += EPICS_QSRV_DEVELOPMENT_FLAG
EXPANDVARS += EPICS_QSRV_ABI_MAJOR_VERSION
EXPANDVARS += EPICS_QSRV_ABI_MINOR_VERSION
EXPANDFLAGS += $(foreach var,$(EXPANDVARS),-D$(var)="$(strip $($(var)))")
qsrv_SRCS += pvif.cpp
qsrv_SRCS += qsrv.cpp
qsrv_SRCS += pdb.cpp
qsrv_SRCS += pdbsingle.cpp
#qsrv_SRCS += pvalink.cpp
qsrv_SRCS += demo.cpp
qsrv_SRCS += imagedemo.c
ifdef BASE_3_16
qsrv_SRCS += pdbgroup.cpp
qsrv_SRCS += configparse.cpp
qsrv_SRCS += dbf_copy.cpp
qsrv_SRCS += tpool.cpp
qsrv_SRCS += pvalink.cpp
qsrv_SRCS += pvalink_lset.cpp
qsrv_SRCS += pvalink_jlif.cpp
qsrv_SRCS += pvalink_link.cpp
qsrv_SRCS += pvalink_channel.cpp
else
qsrv_SRCS += pvalink_null.cpp
endif
qsrv_LIBS += pvAccess pvData
@ -53,3 +83,21 @@ include $(TOP)/configure/RULES
#----------------------------------------
# ADD RULES AFTER THIS LINE
# Can't use EXPAND as generated headers must appear
# in O.Common, but EXPAND emits rules for O.$(T_A)
../O.Common/pv/qsrvVersionNum.h: ../pv/qsrvVersionNum.h@
$(MKDIR) $(COMMON_DIR)/pv
$(EXPAND_TOOL) $(EXPANDFLAGS) $($@_EXPANDFLAGS) $< $@
qsrv$(DEP): ../O.Common/pv/qsrvVersionNum.h
pvalink$(DEP): ../O.Common/pv/qsrvVersionNum.h
ifdef BASE_3_16
../O.Common/qsrv.dbd: ../qsrv-new.dbd
$(CP) $< $@
else
../O.Common/qsrv.dbd: ../qsrv-old.dbd
$(CP) $< $@
endif
../O.Common/softIocPVA.dbd: ../O.Common/qsrv.dbd

pdbApp/dbf_copy.cpp (new file, +244 lines)

@ -0,0 +1,244 @@
#include <epicsStdio.h>
#include <dbAccess.h>
#include <dbChannel.h>
#include <dbStaticLib.h>
#include <dbLock.h>
#include <dbEvent.h>
#include <epicsString.h>
#include <epicsVersion.h>
#include <pv/status.h>
#include <pv/bitSet.h>
#include <pv/pvData.h>
#include <pv/anyscalar.h>
#define epicsExportSharedSymbols
#include "pvif.h"
namespace pvd = epics::pvData;
// note that we handle DBF_ENUM differently than in pvif.cpp
static
pvd::ScalarType DBR2PVD(short dbr)
{
switch(dbr) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case DBR_##DBFTYPE: return pvd::pv##PVACODE;
#define CASE_SKIP_BOOL
#include "pv/typemap.h"
#undef CASE_SKIP_BOOL
#undef CASE
case DBF_ENUM: return pvd::pvUShort;
case DBF_STRING: return pvd::pvString;
}
throw std::invalid_argument("Unsupported DBR code");
}
long copyPVD2DBF(const pvd::PVField::const_shared_pointer& inraw,
void *outbuf, short outdbf, long *outnReq)
{
long nreq = outnReq ? *outnReq : 1;
if(!inraw || nreq <= 0 || INVALID_DB_REQ(outdbf)) return S_db_errArg;
pvd::ScalarType outpvd = DBR2PVD(outdbf);
pvd::PVField::const_shared_pointer in(inraw);
if(outdbf != DBF_STRING && in->getField()->getType() == pvd::structure) {
// assume NTEnum.
// index to string not requested, so attempt to treat .index as plain integer
in = static_cast<const pvd::PVStructure*>(in.get())->getSubField("index");
if(!in) return S_db_errArg;
}
if(in->getField()->getType() == pvd::structure) {
assert(outdbf == DBF_STRING);
char *outsbuf = (char*)outbuf;
// maybe NTEnum
// try index -> string
const pvd::PVStructure* sin = static_cast<const pvd::PVStructure*>(in.get());
pvd::PVScalar::const_shared_pointer index(sin->getSubField<pvd::PVScalar>("index"));
if(!index) return S_db_badField; // Not NTEnum, don't know how to handle...
// we will have an answer.
if(outnReq)
*outnReq = 1;
pvd::uint16 ival = index->getAs<pvd::uint16>();
pvd::PVStringArray::const_shared_pointer choices(sin->getSubField<pvd::PVStringArray>("choices"));
if(choices) {
pvd::PVStringArray::const_svector strs(choices->view());
if(ival < strs.size()) {
// found it!
const std::string& sval = strs[ival];
size_t slen = std::min(sval.size(), size_t(MAX_STRING_SIZE-1));
memcpy(outbuf, sval.c_str(), slen);
outsbuf[slen] = '\0';
return 0;
}
}
// didn't find it. either no choices or index is out of range
// print numeric index
epicsSnprintf(outsbuf, MAX_STRING_SIZE, "%u", ival);
return 0;
} else if(in->getField()->getType() == pvd::scalarArray) {
const pvd::PVScalarArray* sarr = static_cast<const pvd::PVScalarArray*>(in.get());
pvd::shared_vector<const void> arr;
sarr->getAs(arr);
size_t elemsize = pvd::ScalarTypeFunc::elementSize(arr.original_type());
arr.slice(0, nreq*elemsize);
nreq = arr.size()/elemsize;
if(outdbf == DBF_STRING) {
char *outsbuf = (char*)outbuf;
// allocate a temp buffer of string[], ick...
pvd::shared_vector<std::string> strs(nreq); // alloc
pvd::castUnsafeV(nreq, pvd::pvString, strs.data(), arr.original_type(), arr.data());
for(long i =0; i<nreq; i++, outsbuf += MAX_STRING_SIZE) {
size_t slen = std::min(strs[i].size(), size_t(MAX_STRING_SIZE-1));
memcpy(outsbuf, strs[i].c_str(), slen);
outsbuf[slen] = '\0';
}
} else {
pvd::castUnsafeV(nreq, outpvd, outbuf, arr.original_type(), arr.data());
}
if(outnReq)
*outnReq = nreq;
return 0;
} else if(in->getField()->getType() == pvd::scalar) {
char *outsbuf = (char*)outbuf;
const pvd::PVScalar* sval = static_cast<const pvd::PVScalar*>(in.get());
pvd::AnyScalar val;
sval->getAs(val);
if(outdbf == DBF_STRING && val.type()==pvd::pvString) {
// std::string to char*
size_t len = std::min(val.as<std::string>().size(), size_t(MAX_STRING_SIZE-1));
memcpy(outbuf, val.as<std::string>().c_str(), len);
outsbuf[len] = '\0';
} else if(outdbf == DBF_STRING) {
// non-string to char*
std::string temp;
pvd::castUnsafeV(1, pvd::pvString, &temp, val.type(), val.unsafe());
size_t len = std::min(temp.size(), size_t(MAX_STRING_SIZE-1));
memcpy(outbuf, temp.c_str(), len);
outsbuf[len] = '\0';
} else {
// non-string to any
pvd::castUnsafeV(1, outpvd, outbuf, val.type(), val.unsafe());
}
if(outnReq)
*outnReq = 1;
return 0;
} else {
// struct array or other strangeness which I don't know how to handle
return S_dbLib_badField;
}
}
long copyDBF2PVD(const pvd::shared_vector<const void> &inbuf,
const pvd::PVField::shared_pointer& outraw,
pvd::BitSet& changed,
const pvd::PVStringArray::const_svector &choices)
{
pvd::ScalarType inpvd = inbuf.original_type();
size_t incnt = inbuf.size()/pvd::ScalarTypeFunc::elementSize(inpvd);
if(!outraw) return S_db_errArg;
pvd::PVField::shared_pointer out(outraw);
if(inpvd != pvd::pvString && out->getField()->getType() == pvd::structure) {
// assume NTEnum.
// string to index not requested, so attempt to treat .index as plain integer
out = static_cast<pvd::PVStructure*>(out.get())->getSubField("index");
if(!out) return S_db_errArg;
}
if(out->getField()->getType() == pvd::structure) {
assert(inpvd == pvd::pvString);
if(incnt==0)
return S_db_errArg; // Need at least one string
const pvd::shared_vector<const std::string> insbuf(pvd::static_shared_vector_cast<const std::string>(inbuf));
const std::string& instr(insbuf[0]);
// assume NTEnum
// try string to index, then parse
pvd::PVStructure* sout = static_cast<pvd::PVStructure*>(out.get());
pvd::PVScalar::shared_pointer index(sout->getSubField<pvd::PVScalar>("index"));
if(!index) return S_db_badField; // Not NTEnum, don't know how to handle...
pvd::uint16 result = pvd::uint16(-1);
bool match = false;
for(size_t i=0, N=std::min(size_t(0xffff), choices.size()); i<N; i++) {
if(choices[i] == instr) {
match = true;
result = pvd::uint16(i);
}
}
if(!match) {
// no choice string matched, so try to parse as integer
try{
result = pvd::castUnsafe<pvd::uint16>(instr);
}catch(std::exception&){
return S_db_errArg;
}
}
index->putFrom(result);
out = index;
} else if(out->getField()->getType() == pvd::scalarArray) {
pvd::PVScalarArray* sarr = static_cast<pvd::PVScalarArray*>(out.get());
sarr->putFrom(inbuf);
} else if(out->getField()->getType() == pvd::scalar) {
pvd::PVScalar* sval = static_cast<pvd::PVScalar*>(out.get());
if(incnt==0) return S_db_errArg;
pvd::AnyScalar val(inpvd, inbuf.data());
sval->putFrom(val);
} else {
// struct array or other strangeness which I don't know how to handle
return S_db_badField;
}
changed.set(out->getFieldOffset());
return 0;
}

pdbApp/pv/qsrv.h (new file, +64 lines)

@ -0,0 +1,64 @@
#ifndef PV_QSRV_H
#define PV_QSRV_H
#include <epicsVersion.h>
#include <shareLib.h>
#ifndef VERSION_INT
# define VERSION_INT(V,R,M,P) ( ((V)<<24) | ((R)<<16) | ((M)<<8) | (P))
#endif
/* generated header with EPICS_QSRV_*_VERSION macros */
# include <pv/qsrvVersionNum.h>
#define QSRV_VERSION_INT VERSION_INT(EPICS_QSRV_MAJOR_VERSION, EPICS_QSRV_MINOR_VERSION, EPICS_QSRV_MAINTENANCE_VERSION, !(EPICS_QSRV_DEVELOPMENT_FLAG))
#define QSRV_ABI_VERSION_INT VERSION_INT(EPICS_QSRV_ABI_MAJOR_VERSION, EPICS_QSRV_ABI_MINOR_VERSION, 0, 0)
#ifdef __cplusplus
extern "C" {
#endif
struct link; /* aka. DBLINK from link.h */
/** returns QSRV_VERSION_INT captured at compilation time */
epicsShareExtern unsigned qsrvVersion(void);
/** returns QSRV_ABI_VERSION_INT captured at compilation time */
epicsShareExtern unsigned qsrvABIVersion(void);
epicsShareFunc void testqsrvWaitForLinkEvent(struct link *plink);
/** Call before testIocShutdownOk()
@code
testdbPrepare();
...
testIocInitOk();
...
testqsrvShutdownOk();
testIocShutdownOk();
testqsrvCleanup();
testdbCleanup();
@endcode
*/
epicsShareExtern void testqsrvShutdownOk(void);
/** Call after testIocShutdownOk() and before testdbCleanup()
@code
testdbPrepare();
...
testIocInitOk();
...
testqsrvShutdownOk();
testIocShutdownOk();
testqsrvCleanup();
testdbCleanup();
@endcode
*/
epicsShareExtern void testqsrvCleanup(void);
#ifdef __cplusplus
}
#endif
#endif /* PV_QSRV_H */

@ -0,0 +1,9 @@
#ifndef PV_QSRV_H
# error qsrvVersionNum.h should never be included directly. Include <pv/qsrv.h>
#endif
#define EPICS_QSRV_MAJOR_VERSION @EPICS_QSRV_MAJOR_VERSION@
#define EPICS_QSRV_MINOR_VERSION @EPICS_QSRV_MINOR_VERSION@
#define EPICS_QSRV_MAINTENANCE_VERSION @EPICS_QSRV_MAINTENANCE_VERSION@
#define EPICS_QSRV_DEVELOPMENT_FLAG @EPICS_QSRV_DEVELOPMENT_FLAG@
#define EPICS_QSRV_ABI_MAJOR_VERSION @EPICS_QSRV_ABI_MAJOR_VERSION@
#define EPICS_QSRV_ABI_MINOR_VERSION @EPICS_QSRV_ABI_MINOR_VERSION@

(File diff suppressed because it is too large.)

@ -16,19 +16,33 @@
#include <alarm.h>
#include <epicsExit.h>
#include <epicsAtomic.h>
#include <epicsThreadPool.h>
#include <link.h>
#include <dbJLink.h>
#include <pv/pvAccess.h>
#include <pv/clientFactory.h>
#include <pva/client.h>
#include <pv/anyscalar.h>
#include <pv/thread.h>
#include <pv/lock.h>
#include <pv/iocshelper.h>
#include "helper.h"
#include "iocshelper.h"
#include "pvif.h"
#include "tpool.h"
extern int pvaLinkDebug;
extern int pvaLinkIsolate;
extern "C" {
epicsShareExtern int pvaLinkDebug;
epicsShareExtern int pvaLinkIsolate;
epicsShareExtern int pvaLinkNWorkers;
}
#if 0
# define TRACE(X) std::cerr<<"PVAL "<<__func__<<" " X <<"\n"
#else
# define TRACE(X) do {} while(0)
#endif
// pvaLink and pvaLinkChannel have ->debug
#define DEBUG(OBJ, X) do{ if((OBJ)->debug) std::cout X<<"\n"; }while(0)
namespace pvalink {
@ -41,289 +55,173 @@ typedef epicsGuardRelease<pvd::Mutex> UnGuard;
struct pvaLink;
struct pvaLinkChannel;
struct pvaGlobal_t {
pva::ChannelProvider::shared_pointer provider;
extern lset pva_lset;
extern jlif lsetPVA;
pvd::StructureConstPtr reqtype;
pvd::PVDataCreatePtr create;
struct pvaLinkConfig : public jlink
{
// configuration, output of jlif parsing
//! Channel (aka PV) name string
std::string channelName;
//! sub-field within addressed PVStructure
std::string fieldName;
size_t queueSize;
enum pp_t {
NPP,
Default, // for put() only. For monitor, treated as NPP
PP, // for put() only, For monitor, treated as NPP
CP, // for monitor only, put treats as pp
CPP, // for monitor only, put treats as pp
} pp;
enum ms_t {
NMS,
MS,
MSI,
} ms;
bool defer, pipeline, time, retry, local, always;
int monorder;
// internals used by jlif parsing
std::string jkey;
pvaLinkConfig();
virtual ~pvaLinkConfig();
};
struct pvaGlobal_t {
pvac::ClientProvider provider_local,
provider_remote;
const pvd::PVDataCreatePtr create;
WorkQueue queue;
pvd::Mutex lock;
struct Scan {
// the PVA channel which triggered this scan
std::tr1::weak_ptr<pvaLinkChannel> chan;
bool usecached;
Scan() :usecached(false) {}
};
bool running; // set after dbEvent is initialized and safe to use
epicsThreadPrivate<Scan> scanmagic;
epicsThreadPool *scanpool;
typedef std::map<std::string, std::tr1::shared_ptr<pvaLinkChannel> > channels_t;
// a tuple of channel name and printed pvRequest (or Monitor)
typedef std::pair<std::string, std::string> channels_key_t;
// pvaLinkChannel dtor prunes dead entries
typedef std::map<channels_key_t, std::tr1::weak_ptr<pvaLinkChannel> > channels_t;
// Cache of active Channels (really about caching Monitor)
channels_t channels;
std::tr1::shared_ptr<pvaLinkChannel> connect(const char *name);
pvaGlobal_t();
~pvaGlobal_t()
{
provider->destroy();
epicsThreadPoolDestroy(scanpool);
}
~pvaGlobal_t();
};
extern pvaGlobal_t *pvaGlobal;
struct pvaLinkChannel : public pva::ChannelRequester, pva::MonitorRequester,
std::tr1::enable_shared_from_this<pvaLinkChannel>
struct pvaLinkChannel : public pvac::ClientChannel::MonitorCallback,
public pvac::ClientChannel::PutCallback,
public epicsThreadRunable,
public std::tr1::enable_shared_from_this<pvaLinkChannel>
{
const std::string name;
const pvaGlobal_t::channels_key_t key; // tuple of (channelName, pvRequest key)
const pvd::PVStructure::const_shared_pointer pvRequest; // used with monitor
static size_t refs;
typedef std::set<pvaLink*> links_t;
links_t links;
static size_t num_instances;
pvd::Mutex lock;
epicsEvent run_done; // used by testing code
pva::Channel::shared_pointer chan;
pvac::ClientChannel chan;
pvac::Monitor op_mon;
pvac::Operation op_put;
pva::Monitor::shared_pointer chanmon;
//pva::ChannelPut::shared_pointer chanput;
std::string providerName;
size_t num_disconnect, num_type_change;
bool connected;
bool connected_latched; // connection status at the run()
bool isatomic;
bool queued; // added to WorkQueue
bool debug; // set if any jlink::debug is set
std::tr1::shared_ptr<const void> previous_root;
pvd::PVStructurePtr lastval;
pvd::PVScalarPtr isatomic;
struct LinkSort {
bool operator()(const pvaLink *L, const pvaLink *R) const;
};
epicsJob *scanjob;
std::tr1::shared_ptr<pvaLinkChannel> scanself; // create ref loop while scan is queued
bool scanatomic;
typedef std::set<pvaLink*, LinkSort> links_t;
pvaLinkChannel(const char *name)
:name(name)
,scanjob(epicsJobCreate(pvaGlobal->scanpool, &pvaLinkChannel::scan, this))
,scanatomic(false)
{
if(!scanjob)
throw std::runtime_error("failed to create job for pvaLink");
epics::atomic::increment(refs);
}
virtual ~pvaLinkChannel() {
Guard G(lock);
assert(links.empty());
epicsJobDestroy(scanjob);
scanjob = NULL;
epics::atomic::decrement(refs);
std::cerr<<"pvaLinkChannel: destroy "<<name<<"\n";
}
// list of currently attached links. maintained by pvaLink ctor/dtor
// TODO: sort by PHAS
links_t links;
void doConnect() {
// TODO: local PVA?
Guard G(lock);
chan = pvaGlobal->provider->createChannel(name, shared_from_this());
channelStateChange(chan, chan->getConnectionState());
}
void doClose() {
Guard G(lock);
errlogPrintf("pvaLink closing %s\n", name.c_str());
channelStateChange(chan, pva::Channel::DESTROYED);
chan->destroy();
chan.reset();
std::cerr<<"pvaLink: channel destroy "<<name<<"\n";
}
// set when 'links' is modified to trigger re-compute of record scan list
bool links_changed;
void triggerProc(bool atomic=false, bool force=false);
pvaLinkChannel(const pvaGlobal_t::channels_key_t& key, const epics::pvData::PVStructure::const_shared_pointer &pvRequest);
virtual ~pvaLinkChannel();
static void scan(void* arg, epicsJobMode mode);
void open();
void put(bool force=false); // begin Put op.
virtual std::string getRequesterName() { return "pvaLink"; }
virtual void message(std::string const & message, pva::MessageType messageType)
{
errlogPrintf("%s pvaLink \"%s\": %s\n",
pvd::getMessageTypeName(messageType).c_str(),
name.c_str(),
message.c_str());
}
// pvac::ClientChannel::MonitorCallback
virtual void monitorEvent(const pvac::MonitorEvent& evt) OVERRIDE FINAL;
virtual void channelCreated(const epics::pvData::Status& status, pva::Channel::shared_pointer const & channel)
{
if(!status.isSuccess()) {
errlogPrintf("pvaLink create fails %s: %s\n", name.c_str(), status.getMessage().c_str());
return;
}
Guard G(lock);
//assert(chan==channel); // may be called before createChannel() returns
chan = channel;
}
// pvac::ClientChannel::PutCallback
virtual void putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args) OVERRIDE FINAL;
virtual void putDone(const pvac::PutEvent& evt) OVERRIDE FINAL;
private:
virtual void run() OVERRIDE FINAL;
void run_dbProcess(size_t idx); // idx is index in scan_records
virtual void channelStateChange(pva::Channel::shared_pointer const & channel, pva::Channel::ConnectionState connectionState);
// ==== Treat remaining as local to run()
virtual void monitorConnect(pvd::Status const & status,
pva::Monitor::shared_pointer const & monitor,
pvd::StructureConstPtr const & structure);
virtual void monitorEvent(pva::Monitor::shared_pointer const & monitor);
virtual void unlisten(pva::Monitor::shared_pointer const & monitor)
{
// what to do??
}
std::vector<dbCommon*> scan_records;
std::vector<bool> scan_check_passive;
std::vector<epics::pvData::BitSet> scan_changed;
DBManyLock atomic_lock;
};
struct pvaLink : public jlink
struct pvaLink : public pvaLinkConfig
{
static size_t refs;
static size_t num_instances;
bool alive; // attempt to catch some use after free
DBLINK * plink; // may be NULL
unsigned linkmods;
unsigned parse_level;
std::string name, field;
const pva::Channel::shared_pointer chan;
bool alive; // attempt to catch some use after free
std::tr1::shared_ptr<pvaLinkChannel> lchan;
pvd::PVScalarPtr valueS;
pvd::PVScalarArray::shared_pointer valueA;
pvd::PVScalar::shared_pointer sevr, sec, nsec;
pvd::ScalarType etype;
bool used_scratch, used_queue;
pvd::shared_vector<const void> put_scratch, put_queue;
struct Value {
bool valid;
bool scalar;
pvd::ScalarType etype;
pvd::shared_vector<const void> valueA;
dbrbuf valueS;
epicsUInt16 sevr;
epicsTimeStamp time;
Value() :valid(false) {}
void clear() {
valid = false;
valueA.clear();
}
};
// cached fields from channel op_mon
// updated in onTypeChange()
epics::pvData::PVField::const_shared_pointer fld_value;
epics::pvData::PVScalar::const_shared_pointer fld_severity,
fld_seconds,
fld_nanoseconds;
epics::pvData::PVStructure::const_shared_pointer fld_display,
fld_control,
fld_valueAlarm;
epics::pvData::BitSet proc_changed;
Value atomcache;
// cached snapshot of alarm and timestamp
// captured in pvaGetValue().
// we choose not to ensure consistency with display/control meta-data
epicsTimeStamp snap_time;
short snap_severity;
pvaLink()
:plink(0)
,linkmods(0)
,parse_level(0)
,alive(true)
{
epics::atomic::increment(refs);
//TODO: valgrind tells me these aren't initialized by Base, but probably should be.
parseDepth = 0;
parent = 0;
}
pvaLink();
virtual ~pvaLink();
void open()
{
if(this->name.empty())
throw std::logic_error("open() w/o target PV name");
this->name = name;
//TODO: how to distinguish "record.FLD" from pva "channel.subfield"?
size_t dot = this->name.find_first_of('.');
if(dot!=this->name.npos) {
field = this->name.substr(dot+1);
this->name = this->name.substr(0, dot);
}
lchan = pvaGlobal->connect(this->name.c_str());
Guard G(lchan->lock);
lchan->links.insert(this);
if(lchan->lastval)
attach();
}
~pvaLink()
{
alive = false;
if(lchan) { // may be NULL if parsing fails
Guard G(lchan->lock);
detach();
lchan->links.erase(this);
if(lchan->links.empty()) {
pvaGlobal->channels.erase(lchan->name);
lchan->doClose();
}
}
epics::atomic::decrement(refs);
}
// returns pvRequest to be used with monitor
pvd::PVStructurePtr makeRequest();
void detach()
{
valueS.reset();
valueA.reset();
sevr.reset();
sec.reset();
nsec.reset();
}
bool valid() const;
bool attach()
{
pvd::PVStructurePtr base(lchan->lastval);
// fetch a sub-sub-field of the top monitored field.
pvd::PVField::const_shared_pointer getSubField(const char *name);
if(!field.empty())
base = base->getSubField<pvd::PVStructure>(field);
if(!base) {
errlogPrintf("pvaLink not %s%c%s\n", name.c_str(), field.empty() ? ' ' : '.', field.c_str());
return false;
}
pvd::PVFieldPtr value(base->getSubField("value"));
switch(value->getField()->getType())
{
case pvd::scalar:
valueS = std::tr1::static_pointer_cast<pvd::PVScalar>(value);
etype = valueS->getScalar()->getScalarType();
break;
case pvd::scalarArray:
valueA = std::tr1::static_pointer_cast<pvd::PVScalarArray>(value);
etype = valueA->getScalarArray()->getElementType();
break;
default:
errlogPrintf("pvaLink not .value : %s%c%s\n", name.c_str(), field.empty() ? ' ' : '.', field.c_str());
return false;
}
sevr = base->getSubField<pvd::PVScalar>("alarm.severity");
sec = base->getSubField<pvd::PVScalar>("timeStamp.secondsPastEpoch");
nsec = base->getSubField<pvd::PVScalar>("timeStamp.nanoseconds");
return true;
}
void get(Value& v)
{
if(valueA) {
valueA->getAs<const void>(v.valueA);
v.etype = v.valueA.original_type();
v.scalar = false;
} else if(valueS) {
switch(etype) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case pvd::pv ## PVACODE: v.valueS.dbf_##DBFTYPE = valueS->getAs<PVATYPE>(); break;
#define CASE_SQUEEZE_INT64
#include "pvatypemap.h"
#undef CASE_SQUEEZE_INT64
#undef CASE
case pvd::pvString: {
strncpy(v.valueS.dbf_STRING, valueS->getAs<std::string>().c_str(), sizeof(v.valueS.dbf_STRING));
v.valueS.dbf_STRING[sizeof(v.valueS.dbf_STRING)-1] = '\0';
}
break;
default:
throw std::runtime_error("putValue unsupported DBR code");
}
v.etype = etype;
v.scalar = true;
}
v.sevr = sevr->getAs<epicsUInt16>();
v.time.secPastEpoch = sec->getAs<epicsUInt32>()-POSIX_TIME_AT_EPICS_EPOCH;
v.time.nsec = nsec->getAs<epicsUInt32>();
v.valid = true;
}
void onDisconnect();
void onTypeChange();
};

pdbApp/pvalink_channel.cpp (new file, +387 lines)

@ -0,0 +1,387 @@
#include <alarm.h>
#include <pv/reftrack.h>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
int pvaLinkNWorkers = 1;
namespace pvalink {
pvaGlobal_t *pvaGlobal;
pvaGlobal_t::pvaGlobal_t()
:provider_local("server:QSRV")
,provider_remote("pva")
,create(pvd::getPVDataCreate())
,queue("PVAL")
,running(false)
{
// worker should be above PVA worker priority?
queue.start(std::max(1, pvaLinkNWorkers), epicsThreadPriorityMedium);
}
pvaGlobal_t::~pvaGlobal_t()
{
}
size_t pvaLinkChannel::num_instances;
size_t pvaLink::num_instances;
bool pvaLinkChannel::LinkSort::operator()(const pvaLink *L, const pvaLink *R) const {
if(L->monorder==R->monorder)
return L < R;
return L->monorder < R->monorder;
}
// being called with pvaGlobal::lock held
pvaLinkChannel::pvaLinkChannel(const pvaGlobal_t::channels_key_t &key, const pvd::PVStructure::const_shared_pointer& pvRequest)
:key(key)
,pvRequest(pvRequest)
,num_disconnect(0u)
,num_type_change(0u)
,connected(false)
,connected_latched(false)
,isatomic(false)
,queued(false)
,debug(false)
,links_changed(false)
{}
pvaLinkChannel::~pvaLinkChannel() {
{
Guard G(pvaGlobal->lock);
pvaGlobal->channels.erase(key);
}
Guard G(lock);
assert(links.empty());
REFTRACE_DECREMENT(num_instances);
}
void pvaLinkChannel::open()
{
Guard G(lock);
try {
chan = pvaGlobal->provider_local.connect(key.first);
DEBUG(this, <<key.first<<" OPEN Local");
providerName = pvaGlobal->provider_local.name();
} catch(std::exception& e){
// The PDBProvider doesn't have a way to communicate to us
// whether this is an invalid record or group name,
// or if this is some sort of internal error.
// So we are forced to assume it is an invalid name.
DEBUG(this, <<key.first<<" OPEN Not local "<<e.what());
}
if(!pvaLinkIsolate && !chan) {
chan = pvaGlobal->provider_remote.connect(key.first);
DEBUG(this, <<key.first<<" OPEN Remote ");
providerName = pvaGlobal->provider_remote.name();
}
op_mon = chan.monitor(this, pvRequest);
REFTRACE_INCREMENT(num_instances);
}
static
pvd::StructureConstPtr putRequestType = pvd::getFieldCreate()->createFieldBuilder()
->addNestedStructure("field")
->endNested()
->addNestedStructure("record")
->addNestedStructure("_options")
->add("block", pvd::pvBoolean)
->add("process", pvd::pvString) // "true", "false", or "passive"
->endNested()
->endNested()
->createStructure();
// call with channel lock held
void pvaLinkChannel::put(bool force)
{
pvd::PVStructurePtr pvReq(pvd::getPVDataCreate()->createPVStructure(putRequestType));
pvReq->getSubFieldT<pvd::PVBoolean>("record._options.block")->put(false); // TODO: some way to expose completion...
unsigned reqProcess = 0;
bool doit = force;
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
if(!link->used_scratch) continue;
pvd::shared_vector<const void> temp;
temp.swap(link->put_scratch);
link->used_scratch = false;
temp.swap(link->put_queue);
link->used_queue = true;
doit = true;
switch(link->pp) {
case pvaLink::NPP:
reqProcess |= 1;
break;
case pvaLink::Default:
break;
case pvaLink::PP:
case pvaLink::CP:
case pvaLink::CPP:
reqProcess |= 2;
break;
}
}
/* By default, use remote default (passive).
* Request processing, or not, if any link asks.
* Prefer PP over NPP if both are specified.
*
* TODO: per field granularity?
*/
const char *proc = "passive";
if((reqProcess&2) || force) {
proc = "true";
} else if(reqProcess&1) {
proc = "false";
}
pvReq->getSubFieldT<pvd::PVString>("record._options.process")->put(proc);
DEBUG(this, <<key.first<<"Start put "<<doit);
if(doit) {
// start net Put, cancels in-progress put
op_put = chan.put(this, pvReq);
}
}
void pvaLinkChannel::putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args)
{
Guard G(lock);
pvd::PVStructurePtr top(pvaGlobal->create->createPVStructure(build));
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
if(!link->used_queue) continue;
link->used_queue = false; // clear early so unexpected exception won't get us in a retry loop
pvd::PVFieldPtr value(link->fieldName.empty() ? pvd::PVFieldPtr(top) : top->getSubField(link->fieldName));
if(value && value->getField()->getType()==pvd::structure) {
// maybe drill into NTScalar et al.
pvd::PVFieldPtr sub(static_cast<pvd::PVStructure*>(value.get())->getSubField("value"));
if(sub)
value.swap(sub);
}
if(!value) continue; // TODO: how to signal error?
pvd::PVStringArray::const_svector choices; // TODO populate from op_mon
DEBUG(this, <<key.first<<" <- "<<value->getFullName());
copyDBF2PVD(link->put_queue, value, args.tosend, choices);
link->put_queue.clear();
}
DEBUG(this, <<key.first<<" Put built");
args.root = top;
}
void pvaLinkChannel::putDone(const pvac::PutEvent& evt)
{
if(evt.event==pvac::PutEvent::Fail) {
errlogPrintf("%s PVA link put ERROR: %s\n", key.first.c_str(), evt.message.c_str());
}
Guard G(lock);
DEBUG(this, <<key.first<<" Put result "<<evt.event);
op_put = pvac::Operation();
if(evt.event==pvac::PutEvent::Success) {
// see if we need to start a queue'd put
put();
}
}
void pvaLinkChannel::monitorEvent(const pvac::MonitorEvent& evt)
{
bool queue = false;
{
DEBUG(this, <<key.first<<" EVENT "<<evt.event);
Guard G(lock);
switch(evt.event) {
case pvac::MonitorEvent::Disconnect:
case pvac::MonitorEvent::Data:
connected = evt.event == pvac::MonitorEvent::Data;
queue = true;
break;
case pvac::MonitorEvent::Cancel:
break; // no-op
case pvac::MonitorEvent::Fail:
connected = false;
queue = true;
errlogPrintf("%s: PVA link monitor ERROR: %s\n", chan.name().c_str(), evt.message.c_str());
break;
}
if(queued)
return; // already scheduled
queued = queue;
}
if(queue) {
pvaGlobal->queue.add(shared_from_this());
}
}
// the work in calling dbProcess() which is common to
// both dbScanLock() and dbScanLockMany()
void pvaLinkChannel::run_dbProcess(size_t idx)
{
dbCommon *precord = scan_records[idx];
if(scan_check_passive[idx] && precord->scan!=0) {
return;
} else if(connected_latched && !op_mon.changed.logical_and(scan_changed[idx])) {
return;
} else if (precord->pact) {
if (precord->tpro)
printf("%s: Active %s\n",
epicsThreadGetNameSelf(), precord->name);
precord->rpro = TRUE;
}
dbProcess(precord);
}
// Running from global WorkQueue thread
void pvaLinkChannel::run()
{
bool requeue = false;
{
Guard G(lock);
queued = false;
connected_latched = connected;
// pop next update from monitor queue.
// still under lock to safeguard concurrent calls to lset functions
if(connected && !op_mon.poll()) {
DEBUG(this, <<key.first<<" RUN "<<"empty");
run_done.signal();
return; // monitor queue is empty, nothing more to do here
}
DEBUG(this, <<key.first<<" RUN "<<(connected_latched?"connected":"disconnected"));
assert(!connected || !!op_mon.root);
if(!connected) {
num_disconnect++;
// cancel pending put operations
op_put = pvac::Operation();
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
link->onDisconnect();
}
// Don't clear previous_root on disconnect.
// We will usually re-connect with the same type,
// and may get back the same PVStructure.
} else if(previous_root.get() != (const void*)op_mon.root.get()) {
num_type_change++;
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
link->onTypeChange();
}
previous_root = std::tr1::static_pointer_cast<const void>(op_mon.root);
}
// at this point we know we will re-queue, but not immediately
// so an expected error won't get us stuck in a tight loop.
requeue = queued = connected_latched;
if(links_changed) {
// a link has been added or removed since the last update.
// rebuild our cached list of records to (maybe) process.
scan_records.clear();
scan_check_passive.clear();
scan_changed.clear();
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
assert(link && link->alive);
if(!link->plink) continue;
// NPP and none/Default don't scan
// PP, CP, and CPP do scan
// PP and CPP only if SCAN=Passive
if(link->pp != pvaLink::PP && link->pp != pvaLink::CPP && link->pp != pvaLink::CP)
continue;
scan_records.push_back(link->plink->precord);
scan_check_passive.push_back(link->pp != pvaLink::CP);
scan_changed.push_back(link->proc_changed);
}
DBManyLock ML(scan_records);
atomic_lock.swap(ML);
links_changed = false;
}
}
if(scan_records.empty()) {
// Nothing to do, so don't bother locking
} else if(isatomic && scan_records.size() > 1u) {
DBManyLocker L(atomic_lock);
for(size_t i=0, N=scan_records.size(); i<N; i++) {
run_dbProcess(i);
}
} else {
for(size_t i=0, N=scan_records.size(); i<N; i++) {
DBScanLocker L(scan_records[i]);
run_dbProcess(i);
}
}
if(requeue) {
// re-queue until monitor queue is empty
pvaGlobal->queue.add(shared_from_this());
} else {
run_done.signal();
}
}
} // namespace pvalink

pdbApp/pvalink_jlif.cpp (new file, +324 lines)

@ -0,0 +1,324 @@
#include <sstream>
#include <epicsStdio.h> // redirects stdout/stderr
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
namespace pvalink {
pvaLinkConfig::pvaLinkConfig()
:queueSize(4)
,pp(Default)
,ms(NMS)
,defer(false)
,pipeline(false)
,time(false)
,retry(false)
,local(false)
,always(false)
,monorder(0)
{}
pvaLinkConfig::~pvaLinkConfig() {}
}
namespace {
using namespace pvalink;
/* link options.
*
* "pvname" # short-hand, sets PV name only
*
* {
* "pv":"name",
* "field":"blah.foo",
* "Q":5,
* "pipeline":false,
* "proc":true, // false, true, none, "", "NPP", "PP", "CP", "CPP"
* "sevr":true, // false, true, "NMS", "MS", "MSI", "MSS"
* "time":true, // false, true
* "monorder":#,// order of processing during CP scan
* "defer":true,// whether to immediately start Put, or only queue value to be sent
* "retry":true,// queue Put while disconnected, and retry on connect
* "always":true,// CP/CPP updates always process a like, even if its input field hasn't changed
* "local":false,// Require local channel
* }
*/
jlink* pva_alloc_jlink(short dbr)
{
try {
TRACE();
return new pvaLink;
}catch(std::exception& e){
errlogPrintf("Error allocating pva link: %s\n", e.what());
return NULL;
}
}
#define TRY pvaLinkConfig *pvt = static_cast<pvaLinkConfig*>(pjlink); (void)pvt; try
#define CATCH(RET) catch(std::exception& e){ \
errlogPrintf("Error in %s link: %s\n", __FUNCTION__, e.what()); \
return RET; }
void pva_free_jlink(jlink *pjlink)
{
TRY {
TRACE();
delete pvt;
}catch(std::exception& e){
errlogPrintf("Error freeing pva link: %s\n", e.what());
}
}
jlif_result pva_parse_null(jlink *pjlink)
{
TRY {
TRACE(<<pvt->jkey<<" ");
if(pvt->parseDepth!=1) {
// ignore
} else if(pvt->jkey == "proc") {
pvt->pp = pvaLinkConfig::Default;
} else if(pvt->jkey == "sevr") {
pvt->ms = pvaLinkConfig::NMS;
} else if(pvt->jkey == "local") {
pvt->local = false; // alias for local:false
} else if(pvt->debug) {
printf("pva link parsing unknown none depth=%u key=\"%s\"\n",
pvt->parseDepth, pvt->jkey.c_str());
}
pvt->jkey.clear();
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_result pva_parse_bool(jlink *pjlink, int val)
{
TRY {
TRACE(<<pvt->jkey<<" "<<(val?"true":"false"));
if(pvt->parseDepth!=1) {
// ignore
} else if(pvt->jkey == "proc") {
pvt->pp = val ? pvaLinkConfig::PP : pvaLinkConfig::NPP;
} else if(pvt->jkey == "sevr") {
pvt->ms = val ? pvaLinkConfig::MS : pvaLinkConfig::NMS;
} else if(pvt->jkey == "defer") {
pvt->defer = !!val;
} else if(pvt->jkey == "pipeline") {
pvt->pipeline = !!val;
} else if(pvt->jkey == "time") {
pvt->time = !!val;
} else if(pvt->jkey == "retry") {
pvt->retry = !!val;
} else if(pvt->jkey == "local") {
pvt->local = !!val;
} else if(pvt->jkey == "always") {
pvt->always = !!val;
} else if(pvt->debug) {
printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%s\n",
pvt->parseDepth, pvt->jkey.c_str(), val ? "true" : "false");
}
pvt->jkey.clear();
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_result pva_parse_integer(jlink *pjlink, long long val)
{
TRY {
TRACE(<<pvt->jkey<<" "<<val);
if(pvt->parseDepth!=1) {
// ignore
} else if(pvt->jkey == "Q") {
pvt->queueSize = val < 1 ? 1 : size_t(val);
} else if(pvt->jkey == "monorder") {
pvt->monorder = std::max(-1024, std::min(int(val), 1024));
} else if(pvt->debug) {
printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%lld\n",
pvt->parseDepth, pvt->jkey.c_str(), val);
}
pvt->jkey.clear();
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_result pva_parse_string(jlink *pjlink, const char *val, size_t len)
{
TRY{
std::string sval(val, len);
TRACE(<<pvt->jkey<<" "<<sval);
if(pvt->parseDepth==0 || (pvt->parseDepth==1 && pvt->jkey=="pv")) {
pvt->channelName = sval;
} else if(pvt->parseDepth > 1) {
// ignore
} else if(pvt->jkey=="field") {
pvt->fieldName = sval;
} else if(pvt->jkey=="proc") {
if(sval.empty()) {
pvt->pp = pvaLinkConfig::Default;
} else if(sval=="CP") {
pvt->pp = pvaLinkConfig::CP;
} else if(sval=="CPP") {
pvt->pp = pvaLinkConfig::CPP;
} else if(sval=="PP") {
pvt->pp = pvaLinkConfig::PP;
} else if(sval=="NPP") {
pvt->pp = pvaLinkConfig::NPP;
} else if(pvt->debug) {
printf("pva link parsing unknown proc depth=%u key=\"%s\" value=\"%s\"\n",
pvt->parseDepth, pvt->jkey.c_str(), sval.c_str());
}
} else if(pvt->jkey=="sevr") {
if(sval=="NMS") {
pvt->ms = pvaLinkConfig::NMS;
} else if(sval=="MS") {
pvt->ms = pvaLinkConfig::MS;
} else if(sval=="MSI") {
pvt->ms = pvaLinkConfig::MSI;
} else if(sval=="MSS") {
// not sure how to handle mapping severity for MSS.
// leave room for this to happen compatibly later by
// handling as alias for MS until then.
pvt->ms = pvaLinkConfig::MS;
} else if(pvt->debug) {
printf("pva link parsing unknown sevr depth=%u key=\"%s\" value=\"%s\"\n",
pvt->parseDepth, pvt->jkey.c_str(), sval.c_str());
}
} else if(pvt->debug) {
printf("pva link parsing unknown string depth=%u key=\"%s\" value=\"%s\"\n",
pvt->parseDepth, pvt->jkey.c_str(), sval.c_str());
}
pvt->jkey.clear();
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_key_result pva_parse_start_map(jlink *pjlink)
{
TRY {
TRACE();
return jlif_key_continue;
}CATCH(jlif_key_stop)
}
jlif_result pva_parse_key_map(jlink *pjlink, const char *key, size_t len)
{
TRY {
std::string sval(key, len);
TRACE(<<sval);
pvt->jkey = sval;
return jlif_continue;
}CATCH(jlif_stop)
}
jlif_result pva_parse_end_map(jlink *pjlink)
{
TRY {
TRACE();
return jlif_continue;
}CATCH(jlif_stop)
}
struct lset* pva_get_lset(const jlink *pjlink)
{
TRACE();
return &pva_lset;
}
void pva_report(const jlink *rpjlink, int lvl, int indent)
{
const pvaLink *pval = static_cast<const pvaLink*>(rpjlink);
try {
(void)pval;
printf("%*s'pva': %s", indent, "", pval->channelName.c_str());
if(!pval->fieldName.empty())
printf("|.%s", pval->fieldName.c_str());
switch(pval->pp) {
case pvaLinkConfig::NPP: printf(" NPP"); break;
case pvaLinkConfig::Default: printf(" Def"); break;
case pvaLinkConfig::PP: printf(" PP"); break;
case pvaLinkConfig::CP: printf(" CP"); break;
case pvaLinkConfig::CPP: printf(" CPP"); break;
}
switch(pval->ms) {
case pvaLinkConfig::NMS: printf(" NMS"); break;
case pvaLinkConfig::MS: printf(" MS"); break;
case pvaLinkConfig::MSI: printf(" MSI"); break;
}
if(lvl>0) {
printf(" Q=%u pipe=%c defer=%c time=%c retry=%c morder=%d",
unsigned(pval->queueSize),
pval->pipeline ? 'T' : 'F',
pval->defer ? 'T' : 'F',
pval->time ? 'T' : 'F',
pval->retry ? 'T' : 'F',
pval->monorder);
}
if(pval->lchan) {
// after open()
Guard G(pval->lchan->lock);
printf(" conn=%c", pval->lchan->connected ? 'T' : 'F');
if(pval->lchan->op_put.valid()) {
printf(" Put");
}
if(lvl>0) {
printf(" #disconn=%zu prov=%s", pval->lchan->num_disconnect, pval->lchan->providerName.c_str());
}
if(lvl>1) {
printf(" inprog=%c",
pval->lchan->queued?'T':'F');
}
if(lvl>5) {
std::ostringstream strm;
pval->lchan->chan.show(strm);
printf("\n%*s CH: %s", indent, "", strm.str().c_str());
}
} else {
printf(" No Channel");
}
printf("\n");
}CATCH()
}
} //namespace
namespace pvalink {
jlif lsetPVA = {
"pva",
&pva_alloc_jlink,
&pva_free_jlink,
&pva_parse_null,
&pva_parse_bool,
&pva_parse_integer,
NULL,
&pva_parse_string,
&pva_parse_start_map,
&pva_parse_key_map,
&pva_parse_end_map,
NULL,
NULL,
NULL,
&pva_get_lset,
&pva_report,
NULL
};
} //namespace pvalink

pdbApp/pvalink_link.cpp (new file, +156 lines)

@ -0,0 +1,156 @@
#include <pv/reftrack.h>
#include <alarm.h>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
namespace pvalink {
pvaLink::pvaLink()
:alive(true)
,plink(0)
,used_scratch(false)
,used_queue(false)
{
REFTRACE_INCREMENT(num_instances);
snap_severity = INVALID_ALARM;
snap_time.secPastEpoch = 0;
snap_time.nsec = 0;
//TODO: valgrind tells me these aren't initialized by Base, but probably should be.
parseDepth = 0;
parent = 0;
}
pvaLink::~pvaLink()
{
alive = false;
if(lchan) { // may be NULL if parsing fails
Guard G(lchan->lock);
lchan->links.erase(this);
lchan->links_changed = true;
bool new_debug = false;
for(pvaLinkChannel::links_t::const_iterator it(lchan->links.begin()), end(lchan->links.end())
; it!=end; ++it)
{
const pvaLink *pval = *it;
if(pval->debug) {
new_debug = true;
break;
}
}
lchan->debug = new_debug;
}
REFTRACE_DECREMENT(num_instances);
}
static
pvd::StructureConstPtr monitorRequestType = pvd::getFieldCreate()->createFieldBuilder()
->addNestedStructure("field")
->endNested()
->addNestedStructure("record")
->addNestedStructure("_options")
->add("pipeline", pvd::pvBoolean)
->add("atomic", pvd::pvBoolean)
->add("queueSize", pvd::pvUInt)
->endNested()
->endNested()
->createStructure();
pvd::PVStructurePtr pvaLink::makeRequest()
{
pvd::PVStructurePtr ret(pvd::getPVDataCreate()->createPVStructure(monitorRequestType));
ret->getSubFieldT<pvd::PVBoolean>("record._options.pipeline")->put(pipeline);
ret->getSubFieldT<pvd::PVBoolean>("record._options.atomic")->put(true);
ret->getSubFieldT<pvd::PVUInt>("record._options.queueSize")->put(queueSize);
return ret;
}
// caller must lock lchan->lock
bool pvaLink::valid() const
{
return lchan->connected_latched && lchan->op_mon.root;
}
// caller must lock lchan->lock
pvd::PVField::const_shared_pointer pvaLink::getSubField(const char *name)
{
pvd::PVField::const_shared_pointer ret;
if(valid()) {
if(fieldName.empty()) {
// we access the top level struct
ret = lchan->op_mon.root->getSubField(name);
} else {
// we access a sub-struct
ret = lchan->op_mon.root->getSubField(fieldName);
if(ret->getField()->getType()!=pvd::structure) {
// addressed sub-field isn't a sub-structure
if(strcmp(name, "value")!=0) {
// unless we are trying to fetch the "value", we fail here
ret.reset();
}
} else {
ret = static_cast<const pvd::PVStructure*>(ret.get())->getSubField(name);
}
}
}
return ret;
}
// call with channel lock held
void pvaLink::onDisconnect()
{
DEBUG(this,<<plink->precord->name<<" disconnect");
// TODO: option to remain queue'd while disconnected
used_queue = used_scratch = false;
}
void pvaLink::onTypeChange()
{
DEBUG(this,<<plink->precord->name<<" type change");
assert(lchan->connected_latched && !!lchan->op_mon.root); // we should only be called when connected
fld_value = getSubField("value");
fld_seconds = std::tr1::dynamic_pointer_cast<const pvd::PVScalar>(getSubField("timeStamp.secondsPastEpoch"));
fld_nanoseconds = std::tr1::dynamic_pointer_cast<const pvd::PVScalar>(getSubField("timeStamp.nanoseconds"));
fld_severity = std::tr1::dynamic_pointer_cast<const pvd::PVScalar>(getSubField("alarm.severity"));
fld_display = std::tr1::dynamic_pointer_cast<const pvd::PVStructure>(getSubField("display"));
fld_control = std::tr1::dynamic_pointer_cast<const pvd::PVStructure>(getSubField("control"));
fld_valueAlarm = std::tr1::dynamic_pointer_cast<const pvd::PVStructure>(getSubField("valueAlarm"));
proc_changed.clear();
// build mask of all "changed" bits associated with our .value
// CP/CPP input links will process this link only for updates where
// the changed mask and proc_changed share at least one set bit.
if(fld_value) {
// bit for this field
proc_changed.set(fld_value->getFieldOffset());
// bits of all parent fields
for(const pvd::PVStructure* parent = fld_value->getParent(); parent; parent = parent->getParent()) {
proc_changed.set(parent->getFieldOffset());
}
if(fld_value->getField()->getType()==pvd::structure)
{
// bits of all child fields
const pvd::PVStructure *val = static_cast<const pvd::PVStructure*>(fld_value.get());
for(size_t i=val->getFieldOffset(), N=val->getNextFieldOffset(); i<N; i++)
proc_changed.set(i);
}
}
}
} // namespace pvalink

pdbApp/pvalink_lset.cpp (new file, +494 lines)

@ -0,0 +1,494 @@
#include <epicsString.h>
#include <alarm.h>
#include <recGbl.h>
#include <epicsStdio.h> // redirect stdout/stderr
#include <pv/current_function.h>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
namespace {
using namespace pvalink;
#define TRY pvaLink *self = static_cast<pvaLink*>(plink->value.json.jlink); assert(self->alive); try
#define CATCH() catch(std::exception& e) { \
errlogPrintf("pvaLink %s fails %s: %s\n", CURRENT_FUNCTION, plink->precord->name, e.what()); \
}
#define CHECK_VALID() if(!self->valid()) { DEBUG(self, <<CURRENT_FUNCTION<<" "<<self->channelName<<" !valid"); return -1;}
void pvaOpenLink(DBLINK *plink)
{
try {
pvaLink* self((pvaLink*)plink->value.json.jlink);
// workaround for Base not propagating info(base:lsetDebug) to us
{
pdbRecordIterator rec(plink->precord);
if(epicsStrCaseCmp(rec.info("base:lsetDebug", "NO"), "YES")==0) {
self->debug = 1;
}
}
DEBUG(self, <<plink->precord->name<<" OPEN "<<self->channelName);
// still single threaded at this point.
// also, no pvaLinkChannel::lock yet
self->plink = plink;
if(self->channelName.empty())
return; // nothing to do...
pvd::PVStructure::const_shared_pointer pvRequest(self->makeRequest());
pvaGlobal_t::channels_key_t key;
{
std::ostringstream strm;
strm<<*pvRequest; // print the request as a convenient key for our channel cache
key = std::make_pair(self->channelName, strm.str());
}
std::tr1::shared_ptr<pvaLinkChannel> chan;
bool doOpen = false;
{
Guard G(pvaGlobal->lock);
pvaGlobal_t::channels_t::iterator it(pvaGlobal->channels.find(key));
if(it!=pvaGlobal->channels.end()) {
// re-use existing channel
chan = it->second.lock();
}
if(!chan) {
// open new channel
chan.reset(new pvaLinkChannel(key, pvRequest));
pvaGlobal->channels.insert(std::make_pair(key, chan));
doOpen = true;
}
doOpen &= pvaGlobal->running; // if not running, then open from initHook
}
if(doOpen) {
chan->open(); // start subscription
}
if(!self->local || chan->providerName=="QSRV"){
Guard G(chan->lock);
chan->links.insert(self);
chan->links_changed = true;
self->lchan.swap(chan); // we are now attached
self->lchan->debug |= !!self->debug;
} else {
// TODO: only print during iocInit()?
fprintf(stderr, "%s Error: local:true link to '%s' can't be fulfilled\n",
plink->precord->name, self->channelName.c_str());
plink->lset = NULL;
}
return;
}CATCH()
// on error, prevent any further calls to our lset functions
plink->lset = NULL;
}
void pvaRemoveLink(struct dbLocker *locker, DBLINK *plink)
{
try {
p2p::auto_ptr<pvaLink> self((pvaLink*)plink->value.json.jlink);
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName);
assert(self->alive);
}CATCH()
}
int pvaIsConnected(const DBLINK *plink)
{
TRY {
Guard G(self->lchan->lock);
bool ret = self->valid();
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<ret);
return ret;
}CATCH()
return 0;
}
int pvaGetDBFtype(const DBLINK *plink)
{
TRY {
Guard G(self->lchan->lock);
CHECK_VALID();
// if fieldName is empty, use top struct value
// if fieldName not empty
// if sub-field is struct, use sub-struct .value
// if sub-field not struct, treat as value
pvd::PVField::const_shared_pointer value(self->getSubField("value"));
pvd::ScalarType ftype = pvd::pvInt; // default for un-mappable types.
if(!value) {
// no-op
} else if(value->getField()->getType()==pvd::scalar)
ftype = static_cast<const pvd::Scalar*>(value->getField().get())->getScalarType();
else if(value->getField()->getType()==pvd::scalarArray)
ftype = static_cast<const pvd::ScalarArray*>(value->getField().get())->getElementType();
int ret;
switch(ftype) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case pvd::pv##PVACODE: ret = DBF_##DBFTYPE; break;
#define CASE_REAL_INT64
#include "pv/typemap.h"
#undef CASE_REAL_INT64
#undef CASE
case pvd::pvString: ret = DBF_STRING; break; // TODO: long string?
}
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<dbGetFieldTypeString(ret));
return ret;
}CATCH()
return -1;
}
long pvaGetElements(const DBLINK *plink, long *nelements)
{
TRY {
Guard G(self->lchan->lock);
CHECK_VALID();
long ret = 0;
if(self->fld_value && self->fld_value->getField()->getType()==pvd::scalarArray)
ret = static_cast<const pvd::PVScalarArray*>(self->fld_value.get())->getLength();
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<ret);
return ret;
}CATCH()
return -1;
}
long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer,
long *pnRequest)
{
TRY {
Guard G(self->lchan->lock);
if(!self->valid()) {
// disconnected
if(self->ms != pvaLink::NMS) {
recGblSetSevr(plink->precord, LINK_ALARM, self->snap_severity);
}
// TODO: better capture of disconnect time
epicsTimeGetCurrent(&self->snap_time);
if(self->time) {
plink->precord->time = self->snap_time;
}
DEBUG(self, <<CURRENT_FUNCTION<<" "<<self->channelName<<" !valid");
return -1;
}
if(self->fld_value) {
long status = copyPVD2DBF(self->fld_value, pbuffer, dbrType, pnRequest);
if(status) {
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<status);
return status;
}
}
if(self->fld_seconds) {
self->snap_time.secPastEpoch = self->fld_seconds->getAs<pvd::uint32>() - POSIX_TIME_AT_EPICS_EPOCH;
if(self->fld_nanoseconds) {
self->snap_time.nsec = self->fld_nanoseconds->getAs<pvd::uint32>();
} else {
self->snap_time.nsec = 0u;
}
} else {
self->snap_time.secPastEpoch = 0u;
self->snap_time.nsec = 0u;
}
if(self->fld_severity) {
self->snap_severity = self->fld_severity->getAs<pvd::uint16>();
} else {
self->snap_severity = NO_ALARM;
}
if((self->snap_severity!=NO_ALARM && self->ms == pvaLink::MS) ||
(self->snap_severity==INVALID_ALARM && self->ms == pvaLink::MSI))
{
recGblSetSevr(plink->precord, LINK_ALARM, self->snap_severity);
}
if(self->time) {
plink->precord->time = self->snap_time;
}
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" OK");
return 0;
}CATCH()
return -1;
}
long pvaGetControlLimits(const DBLINK *plink, double *lo, double *hi)
{
TRY {
Guard G(self->lchan->lock);
CHECK_VALID();
if(self->fld_control) {
pvd::PVScalar::const_shared_pointer value;
if(lo) {
value = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->fld_control->getSubField("limitLow"));
*lo = value ? value->getAs<double>() : 0.0;
}
if(hi) {
value = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->fld_control->getSubField("limitHigh"));
*hi = value ? value->getAs<double>() : 0.0;
}
} else {
*lo = *hi = 0.0;
}
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<(lo ? *lo : 0)<<" "<<(hi ? *hi : 0));
return 0;
}CATCH()
return -1;
}
long pvaGetGraphicLimits(const DBLINK *plink, double *lo, double *hi)
{
TRY {
Guard G(self->lchan->lock);
CHECK_VALID();
if(self->fld_display) {
pvd::PVScalar::const_shared_pointer value;
if(lo) {
value = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->fld_display->getSubField("limitLow"));
*lo = value ? value->getAs<double>() : 0.0;
}
if(hi) {
value = std::tr1::static_pointer_cast<const pvd::PVScalar>(self->fld_display->getSubField("limitHigh"));
*hi = value ? value->getAs<double>() : 0.0;
}
} else {
*lo = *hi = 0.0;
}
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<(lo ? *lo : 0)<<" "<<(hi ? *hi : 0));
return 0;
}CATCH()
return -1;
}
long pvaGetAlarmLimits(const DBLINK *plink, double *lolo, double *lo,
double *hi, double *hihi)
{
TRY {
//Guard G(self->lchan->lock);
//CHECK_VALID();
*lolo = *lo = *hi = *hihi = 0.0;
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<(lolo ? *lolo : 0)<<" "<<(lo ? *lo : 0)<<" "<<(hi ? *hi : 0)<<" "<<(hihi ? *hihi : 0));
return 0;
}CATCH()
return -1;
}
long pvaGetPrecision(const DBLINK *plink, short *precision)
{
TRY {
//Guard G(self->lchan->lock);
//CHECK_VALID();
// No sane way to recover precision from display.format string.
*precision = 0;
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<precision);
return 0;
}CATCH()
return -1;
}
long pvaGetUnits(const DBLINK *plink, char *units, int unitsSize)
{
TRY {
Guard G(self->lchan->lock);
CHECK_VALID();
if(unitsSize==0) return 0;
if(units && self->fld_display) {
pvd::PVString::const_shared_pointer value(std::tr1::static_pointer_cast<const pvd::PVString>(self->fld_display->getSubField("units")));
if(value) {
const std::string& egu = value->get();
strncpy(units, egu.c_str(), unitsSize);
}
} else if(units) {
units[0] = '\0';
}
units[unitsSize-1] = '\0';
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<units);
return 0;
}CATCH()
return -1;
}
long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status,
epicsEnum16 *severity)
{
TRY {
Guard G(self->lchan->lock);
CHECK_VALID();
if(severity) {
*severity = self->snap_severity;
}
if(status) {
*status = self->snap_severity ? LINK_ALARM : NO_ALARM;
}
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<(severity ? *severity : 0)<<" "<<(status ? *status : 0));
return 0;
}CATCH()
return -1;
}
long pvaGetTimeStamp(const DBLINK *plink, epicsTimeStamp *pstamp)
{
TRY {
Guard G(self->lchan->lock);
CHECK_VALID();
if(pstamp) {
*pstamp = self->snap_time;
}
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<(pstamp ? pstamp->secPastEpoch : 0)<<":"<<(pstamp ? pstamp->nsec: 0));
return 0;
}CATCH()
return -1;
}
// note that we handle DBF_ENUM differently than in pvif.cpp
pvd::ScalarType DBR2PVD(short dbr)
{
switch(dbr) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case DBR_##DBFTYPE: return pvd::pv##PVACODE;
#define CASE_SKIP_BOOL
#include "pv/typemap.h"
#undef CASE_SKIP_BOOL
#undef CASE
case DBF_ENUM: return pvd::pvUShort;
case DBF_STRING: return pvd::pvString;
}
throw std::invalid_argument("Unsupported DBR code");
}
long pvaPutValue(DBLINK *plink, short dbrType,
const void *pbuffer, long nRequest)
{
TRY {
(void)self;
Guard G(self->lchan->lock);
if(nRequest < 0) return -1;
if(!self->retry && !self->valid()) {
DEBUG(self, <<CURRENT_FUNCTION<<" "<<self->channelName<<" !valid");
return -1;
}
pvd::ScalarType stype = DBR2PVD(dbrType);
pvd::shared_vector<const void> buf;
if(dbrType == DBF_STRING) {
const char *sbuffer = (const char*)pbuffer;
pvd::shared_vector<std::string> sval(nRequest);
for(long n=0; n<nRequest; n++, sbuffer += MAX_STRING_SIZE) {
sval[n] = std::string(sbuffer, epicsStrnLen(sbuffer, MAX_STRING_SIZE));
}
self->put_scratch = pvd::static_shared_vector_cast<const void>(pvd::freeze(sval));
} else {
pvd::shared_vector<void> val(pvd::ScalarTypeFunc::allocArray(stype, size_t(nRequest)));
assert(size_t(dbValueSize(dbrType)*nRequest) == val.size());
memcpy(val.data(), pbuffer, val.size());
self->put_scratch = pvd::freeze(val);
}
self->used_scratch = true;
if(!self->defer) self->lchan->put();
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<self->lchan->op_put.valid());
return 0;
}CATCH()
return -1;
}
void pvaScanForward(DBLINK *plink)
{
TRY {
Guard G(self->lchan->lock);
if(!self->retry && !self->valid()) {
return;
}
// FWD_LINK is never deferred, and always results in a Put
self->lchan->put(true);
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<self->lchan->op_put.valid());
}CATCH()
}
#undef TRY
#undef CATCH
} //namespace
namespace pvalink {
lset pva_lset = {
0, 1, // non-const, volatile
&pvaOpenLink,
&pvaRemoveLink,
NULL, NULL, NULL,
&pvaIsConnected,
&pvaGetDBFtype,
&pvaGetElements,
&pvaGetValue,
&pvaGetControlLimits,
&pvaGetGraphicLimits,
&pvaGetAlarmLimits,
&pvaGetPrecision,
&pvaGetUnits,
&pvaGetAlarm,
&pvaGetTimeStamp,
&pvaPutValue,
NULL,
&pvaScanForward
//&pvaReportLink,
};
} //namespace pvalink
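For reference, pvaPutValue() above builds its put buffer with the pvData shared_vector idiom: allocate a typed array, fill it, then freeze() it into an immutable shared_vector<const void>. A minimal sketch of that idiom follows (the wrapDoubles() helper is illustrative only, not part of this change):

// Sketch: build an immutable value buffer from a plain C array, as pvaPutValue() does.
#include <string.h>
#include <pv/pvIntrospect.h>
#include <pv/sharedVector.h>

namespace pvd = epics::pvData;

static pvd::shared_vector<const void> wrapDoubles(const double *vals, size_t count)
{
    pvd::shared_vector<void> buf(pvd::ScalarTypeFunc::allocArray(pvd::pvDouble, count));
    memcpy(buf.data(), vals, count*sizeof(double));
    return pvd::freeze(buf); // 'buf' is emptied; the result is read-only and cheaply shareable
}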

pdbApp/pvalink_null.cpp (new file, 16 lines)
@@ -0,0 +1,16 @@
#include <epicsExport.h>
static void installPVAAddLinkHook() {}
struct jlif {} lsetPVA;
extern "C" {
int pvaLinkDebug;
int pvaLinkNWorkers;
epicsExportRegistrar(installPVAAddLinkHook);
epicsExportAddress(jlif, lsetPVA);
epicsExportAddress(int, pvaLinkDebug);
epicsExportAddress(int, pvaLinkNWorkers);
}

@@ -606,8 +606,11 @@ struct PVIFScalarNumeric : public PVIF
virtual pvd::Status get(const epics::pvData::BitSet& mask, proc_t proc) OVERRIDE FINAL
{
pvd::Status ret;
if(mask.logical_and(pvmeta.maskVALUEPut)) {
bool newval = mask.logical_and(pvmeta.maskVALUEPut);
if(newval) {
getValue(pvmeta.chan, pvmeta.value.get());
}
if(newval || proc==PVIF::ProcForce) {
ret = PVIF::get(mask, proc);
}
return ret;
@@ -628,6 +631,7 @@ struct PVIFScalarNumeric : public PVIF
} // namespace
static
pvd::ScalarType DBR2PVD(short dbr)
{
switch(dbr) {
@@ -639,22 +643,29 @@ pvd::ScalarType DBR2PVD(short dbr)
#undef CASE_SKIP_BOOL
#undef CASE
case DBF_STRING: return pvd::pvString;
default:
throw std::invalid_argument("Unsupported DBR code");
}
throw std::invalid_argument("Unsupported DBR code");
}
short PVD2DBR(pvd::ScalarType pvt)
{
switch(pvt) {
#define CASE(BASETYPE, PVATYPE, DBFTYPE, PVACODE) case pvd::pv##PVACODE: return DBR_##DBFTYPE;
#define CASE_SQUEEZE_INT64
#ifdef USE_INT64
# define CASE_REAL_INT64
#else
# define CASE_SQUEEZE_INT64
#endif
#include "pv/typemap.h"
#undef CASE_SQUEEZE_INT64
#ifdef USE_INT64
# undef CASE_REAL_INT64
#else
# undef CASE_SQUEEZE_INT64
#endif
#undef CASE
default:
throw std::invalid_argument("Unsupported pvType code");
case pvd::pvString: return DBF_STRING;
}
return -1;
}
epics::pvData::FieldConstPtr
@@ -767,8 +778,11 @@ struct PVIFPlain : public PVIF
virtual pvd::Status get(const epics::pvData::BitSet& mask, proc_t proc)
{
pvd::Status ret;
if(mask.get(fieldOffset)) {
bool newval = mask.get(fieldOffset);
if(newval) {
getValue(channel, field.get());
}
if(newval || proc==PVIF::ProcForce) {
ret = PVIF::get(mask, proc);
}
return ret;

@@ -29,9 +29,19 @@
# define USE_MULTILOCK
#endif
epics::pvData::ScalarType DBR2PVD(short dbr);
short PVD2DBR(epics::pvData::ScalarType pvt);
// copy from PVField (.value sub-field) to DBF buffer
epicsShareExtern
long copyPVD2DBF(const epics::pvData::PVField::const_shared_pointer& in,
void *outbuf, short outdbf, long *outnReq);
// copy from DBF buffer to PVField (.value sub-field)
epicsShareExtern
long copyDBF2PVD(const epics::pvData::shared_vector<const void>& buf,
const epics::pvData::PVField::shared_pointer& out,
epics::pvData::BitSet &changed,
const epics::pvData::PVStringArray::const_svector& choices);
union dbrbuf {
epicsInt8 dbf_CHAR;
epicsUInt8 dbf_UCHAR;
@@ -109,6 +119,13 @@ struct pdbRecordIterator {
#endif
m_done = false;
}
#if EPICS_VERSION_INT>=VERSION_INT(3,16,1,0)
pdbRecordIterator(dbCommon *prec)
{
dbInitEntryFromRecord(prec, &ent);
m_done = false;
}
#endif
~pdbRecordIterator()
{
dbFinishEntry(&ent);
@@ -229,7 +246,7 @@ struct DBManyLock
{
dbLocker *plock;
DBManyLock() :plock(NULL) {}
DBManyLock(const std::vector<dbCommon*>& recs, unsigned flags)
DBManyLock(const std::vector<dbCommon*>& recs, unsigned flags=0)
:plock(dbLockerAlloc((dbCommon**)&recs[0], recs.size(), flags))
{
if(!plock) throw std::invalid_argument("Failed to create locker");
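The copyPVD2DBF()/copyDBF2PVD() helpers declared above are exercised by the new testApp/testdbf_copy.cpp later in this change. A minimal round-trip sketch, assuming a PVStructure 'top' with a scalar "value" field (the roundTrip() wrapper is illustrative only):

// Sketch: convert a PVD "value" field to a DBF buffer and back.
#include <dbFldTypes.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
#include "pvif.h"

namespace pvd = epics::pvData;

void roundTrip(const pvd::PVStructure::shared_pointer& top)
{
    // PVD -> DBF: read the scalar "value" into a single double
    double dval = 0.0;
    copyPVD2DBF(top->getSubFieldT("value"), &dval, DBF_DOUBLE, NULL);

    // DBF -> PVD: write 42.5 back into "value", recording which bits changed
    pvd::shared_vector<double> buf(1);
    buf[0] = 42.5;
    pvd::BitSet changed;
    pvd::PVStringArray::const_svector noChoices; // only consulted for enum targets
    copyDBF2PVD(pvd::static_shared_vector_cast<const void>(pvd::freeze(buf)),
                top->getSubFieldT("value"), changed, noChoices);
}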

pdbApp/qsrv-new.dbd (new file, 20 lines)
@@ -0,0 +1,20 @@
# Changes to this file may require incrementing
# the ABI version in CONFIG_QSRV_VERSION
registrar(QSRVRegistrar)
registrar(installPVAAddLinkHook)
link("pva", "lsetPVA")
# from demo.cpp
device(waveform, CONSTANT, devWfPDBDemo, "QSRV Demo")
# from imagedemo.c
function(QSRV_image_demo)
# from pvif.cpp
# Disable mapping of display.format
variable(qsrvDisableFormat, int)
# from pdb.cpp
# Extra debug info when parsing group definitions
variable(PDBProviderDebug, int)
# Number of worker threads for handling monitor updates.
# Default: 1
variable(pvaLinkNWorkers, int)

@@ -2,8 +2,7 @@
# the ABI version in CONFIG_QSRV_VERSION
registrar(QSRVRegistrar)
#registrar(installPVAAddLinkHook)
#link("pva", "lsetPVA")
registrar(installPVAAddLinkHook)
# from demo.cpp
device(waveform, CONSTANT, devWfPDBDemo, "QSRV Demo")
@@ -15,3 +14,6 @@ variable(qsrvDisableFormat, int)
# from pdb.cpp
# Extra debug info when parsing group definitions
variable(PDBProviderDebug, int)
# Number of worker threads for handling monitor updates.
# Default: 1
variable(pvaLinkNWorkers, int)

@@ -20,6 +20,7 @@
#define epicsExportSharedSymbols
#include "pv/qsrv.h"
#include "pvahelper.h"
#include "pvif.h"
#include "pdb.h"
@@ -93,6 +94,16 @@ void QSRVRegistrar()
} // namespace
unsigned qsrvVersion(void)
{
return QSRV_VERSION_INT;
}
unsigned qsrvABIVersion(void)
{
return QSRV_ABI_VERSION_INT;
}
extern "C" {
epicsExportRegistrar(QSRVRegistrar);
}
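Exposing the version numbers as functions (in addition to the QSRV_VERSION_INT / QSRV_ABI_VERSION_INT macros) lets an application compare the headers it was compiled against with the library it is actually linked against. A sketch of such a check, assuming both the declarations and the macros come from pv/qsrv.h as used above (checkQsrvABI() is illustrative only):

// Sketch: warn if the QSRV headers and library disagree on the ABI version.
#include <stdio.h>
#include <pv/qsrv.h>

void checkQsrvABI(void)
{
    if(qsrvABIVersion() != (unsigned)QSRV_ABI_VERSION_INT)
        fprintf(stderr, "Warning: QSRV headers (ABI %u) don't match library (ABI %u)\n",
                (unsigned)QSRV_ABI_VERSION_INT, qsrvABIVersion());
}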

pdbApp/tpool.cpp (new file, 134 lines)
@@ -0,0 +1,134 @@
#include <typeinfo>
#include <stdexcept>
#include <epicsGuard.h>
#include <errlog.h>
#include <pv/sharedPtr.h>
#define epicsExportSharedSymbols
#include "helper.h"
#include "tpool.h"
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
WorkQueue::WorkQueue(const std::string& name)
:name(name)
,state(Idle)
{}
WorkQueue::~WorkQueue() { close(); }
void WorkQueue::start(unsigned nworkers, unsigned prio)
{
Guard G(mutex);
if(state!=Idle)
throw std::logic_error("Already started");
try {
state = Active;
for(unsigned i=0; i<nworkers; i++) {
p2p::auto_ptr<epicsThread> worker(new epicsThread(*this, name.c_str(),
epicsThreadGetStackSize(epicsThreadStackSmall),
prio));
worker->start();
workers.push_back(worker.get());
worker.release();
}
}catch(...){
UnGuard U(G); // unlock as close() blocks to join any workers which were started
close();
throw;
}
}
void WorkQueue::close()
{
workers_t temp;
{
Guard G(mutex);
if(state!=Active)
return;
temp.swap(workers);
state = Stopping;
}
wakeup.signal();
for(workers_t::iterator it(temp.begin()), end(temp.end()); it!=end; ++it)
{
(*it)->exitWait();
delete *it;
}
{
Guard G(mutex);
state = Idle;
}
}
void WorkQueue::add(const value_type& work)
{
bool empty;
{
Guard G(mutex);
if(state!=Active)
return;
empty = queue.empty();
queue.push_back(work);
}
if(empty) {
wakeup.signal();
}
}
void WorkQueue::run()
{
Guard G(mutex);
std::tr1::shared_ptr<epicsThreadRunable> work;
while(state==Active) {
if(!queue.empty()) {
work = queue.front().lock();
queue.pop_front();
}
bool last = queue.empty();
{
UnGuard U(G);
if(work) {
try {
work->run();
work.reset();
}catch(std::exception& e){
errlogPrintf("%s Unhandled exception from %s: %s\n",
name.c_str(), typeid(work.get()).name(), e.what());
work.reset();
}
}
if(last) {
wakeup.wait();
}
}
}
// pass along the close() signal to next worker
wakeup.signal();
}

pdbApp/tpool.h (new file, 53 lines)
@@ -0,0 +1,53 @@
#ifndef TPOOL_H
#define TPOOL_H
#include <stdexcept>
#include <deque>
#include <vector>
#include <errlog.h>
#include <epicsThread.h>
#include <epicsMutex.h>
#include <epicsEvent.h>
#include <pv/sharedPtr.h>
#include <shareLib.h>
struct WorkQueue : private epicsThreadRunable
{
typedef std::tr1::weak_ptr<epicsThreadRunable> value_type;
private:
const std::string name;
epicsMutex mutex;
enum state_t {
Idle,
Active,
Stopping,
} state;
typedef std::deque<value_type> queue_t;
queue_t queue;
epicsEvent wakeup;
typedef std::vector<epicsThread*> workers_t;
workers_t workers;
public:
WorkQueue(const std::string& name);
virtual ~WorkQueue();
void start(unsigned nworkers=1, unsigned prio = epicsThreadPriorityLow);
void close();
void add(const value_type& work);
private:
virtual void run();
};
#endif // TPOOL_H
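A minimal usage sketch of the WorkQueue declared above (PrintJob and demoQueue() are illustrative only). Note that add() stores only a weak_ptr, so the caller must hold a shared_ptr to keep the job alive until a worker runs it:

// Sketch: start a small pool, queue one job, then shut down.
#include <errlog.h>
#include <pv/sharedPtr.h>
#include "tpool.h"

struct PrintJob : public epicsThreadRunable {
    virtual void run() { errlogPrintf("job ran\n"); }
    virtual ~PrintJob() {}
};

void demoQueue()
{
    WorkQueue Q("demoQ");
    Q.start(2);                                    // two worker threads, low priority
    std::tr1::shared_ptr<PrintJob> job(new PrintJob);
    Q.add(job);                                    // queued as a weak_ptr; 'job' keeps it alive
    // ... eventually a worker invokes job->run() ...
    Q.close();                                     // joins the workers; anything still queued is dropped
}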

@@ -14,6 +14,10 @@ p2pTestIoc_DBD += base.dbd
# Tests explicitly create/destroy PDB provider
#p2pTestIoc_DBD += qsrv.dbd
TARGETS += $(COMMON_DIR)/pvaLinkTestIoc.dbd
pvaLinkTestIoc_DBD += base.dbd
pvaLinkTestIoc_DBD += qsrv.dbd
PROD_SRCS += utilitiesx.cpp
PROD_LIBS += pvAccess pvData
PROD_LIBS += $(EPICS_BASE_IOC_LIBS)
@@ -43,17 +47,24 @@ TESTS += testpdb
PROD_HOST += check_consist
check_consist_SRCS += check_consist.cpp
#TESTPROD_HOST += testpvalink
testpvalink_SRCS += testpvalink.cpp
testpvalink_SRCS += p2pTestIoc_registerRecordDeviceDriver.cpp
testpvalink_LIBS += qsrv
#TESTS += testpvalink
ifdef BASE_3_16
TESTPROD_HOST += testpvalink
testpvalink_SRCS += testpvalink.cpp
testpvalink_SRCS += pvaLinkTestIoc_registerRecordDeviceDriver.cpp
testpvalink_LIBS += qsrv
TESTS += testpvalink
TESTPROD_HOST += testgroupconfig
testgroupconfig_SRCS += testgroupconfig
testgroupconfig_LIBS += qsrv pvAccess pvData
testgroupconfig_LIBS += $(EPICS_BASE_IOC_LIBS)
TESTS += testgroupconfig
TESTPROD_HOST += testdbf_copy
testdbf_copy_SRCS += testdbf_copy
testdbf_copy_LIBS += qsrv pvAccess pvData
testdbf_copy_LIBS += $(EPICS_BASE_IOC_LIBS)
TESTS += testdbf_copy
endif
TESTSCRIPTS_HOST += $(TESTS:%=%.t)

testApp/testdbf_copy.cpp (new file, 269 lines)
@@ -0,0 +1,269 @@
#include <stdexcept>
#include <pv/pvUnitTest.h>
#include <testMain.h>
#include <dbStaticLib.h>
#include <epicsTypes.h>
#include <pv/valueBuilder.h>
#include <pv/pvData.h>
#include "pvif.h"
namespace pvd = epics::pvData;
namespace {
template<pvd::ScalarType ENUM, typename DBF, typename E>
void testPVD2DBR_scalar(unsigned dbf, typename pvd::meta::arg_type<typename pvd::ScalarTypeTraits<ENUM>::type>::type V, E expect)
{
testDiag("testPVD2DBR_scalar(%s, %s)", pvd::ScalarTypeFunc::name(ENUM), dbGetFieldTypeString(dbf));
pvd::PVStructure::shared_pointer top(pvd::ValueBuilder()
.add<ENUM>("value", V)
.buildPVStructure());
DBF buf;
copyPVD2DBF(top->getSubFieldT("value"), &buf, dbf, NULL);
testEqual(buf, expect);
}
void testPVD2DBR_enum()
{
testDiag("testPVD2DBR_enum()");
pvd::shared_vector<std::string> choices(3);
choices[0] = "zero";
choices[1] = "one";
choices[2] = "two";
pvd::PVStructure::shared_pointer top(pvd::ValueBuilder()
.addNested("value")
.add<pvd::pvInt>("index", 1)
.add("choices", pvd::static_shared_vector_cast<const void>(pvd::freeze(choices)))
.endNested()
.buildPVStructure());
{
epicsEnum16 ival;
copyPVD2DBF(top->getSubFieldT("value"), &ival, DBF_ENUM, NULL);
testEqual(ival, 1);
ival = 0;
testOk1(!!top->getSubField("value"));
copyPVD2DBF(top->getSubFieldT("value"), &ival, DBF_USHORT, NULL);
testEqual(ival, 1);
}
{
epicsUInt32 ival;
copyPVD2DBF(top->getSubFieldT("value"), &ival, DBF_LONG, NULL);
testEqual(ival, 1u);
}
char sval[MAX_STRING_SIZE];
copyPVD2DBF(top->getSubFieldT("value"), sval, DBF_STRING, NULL);
testEqual(std::string(sval) , "one");
}
void testPVD2DBR_array()
{
testDiag("testPVD2DBR_array()");
pvd::shared_vector<const pvd::uint32> arr;
{
pvd::shared_vector<pvd::uint32> iarr(3);
iarr[0] = 1; iarr[1] = 2; iarr[2] = 3;
arr = pvd::freeze(iarr);
}
pvd::PVStructure::shared_pointer top(pvd::ValueBuilder()
.add("value", pvd::static_shared_vector_cast<const void>(arr))
.buildPVStructure());
{
epicsUInt16 sarr[5];
{
long nreq = 5;
copyPVD2DBF(top->getSubFieldT("value"), sarr, DBF_SHORT, &nreq);
testEqual(nreq, 3);
}
testEqual(sarr[0], arr[0]);
testEqual(sarr[1], arr[1]);
testEqual(sarr[2], arr[2]);
}
{
char sarr[MAX_STRING_SIZE*5];
{
long nreq = 5;
copyPVD2DBF(top->getSubFieldT("value"), sarr, DBF_STRING, &nreq);
testEqual(nreq, 3);
}
testEqual(sarr[0*MAX_STRING_SIZE+0], '1');
testEqual(int(sarr[0*MAX_STRING_SIZE+1]), int('\0'));
testEqual(sarr[1*MAX_STRING_SIZE+0], '2');
testEqual(int(sarr[1*MAX_STRING_SIZE+1]), int('\0'));
testEqual(sarr[2*MAX_STRING_SIZE+0], '3');
testEqual(int(sarr[2*MAX_STRING_SIZE+1]), int('\0'));
}
}
template<pvd::ScalarType IN, pvd::ScalarType OUT>
void testDBR2PVD_scalar(typename pvd::meta::arg_type<typename pvd::ScalarTypeTraits<IN>::type>::type input,
typename pvd::meta::arg_type<typename pvd::ScalarTypeTraits<OUT>::type>::type expect)
{
typedef typename pvd::ScalarTypeTraits<IN>::type input_t;
typedef typename pvd::ScalarTypeTraits<OUT>::type output_t;
testDiag("testDBR2PVD_scalar(%s, %s)", pvd::ScalarTypeFunc::name(IN), pvd::ScalarTypeFunc::name(OUT));
pvd::PVStructure::shared_pointer top(pvd::getPVDataCreate()->createPVStructure(pvd::getFieldCreate()->createFieldBuilder()
->add("value", OUT) // initially zero or ""
->createStructure()));
pvd::PVStringArray::const_svector choices;
pvd::BitSet changed;
pvd::shared_vector<input_t> buf(1);
buf[0] = input;
copyDBF2PVD(pvd::static_shared_vector_cast<const void>(pvd::freeze(buf)),
top->getSubFieldT("value"), changed, choices);
output_t actual = top->getSubFieldT<pvd::PVScalar>("value")->getAs<output_t>();
testEqual(actual, expect);
testOk1(changed.get(top->getSubFieldT("value")->getFieldOffset()));
}
template<typename input_t>
void testDBR2PVD_enum(const input_t& input, pvd::int32 expect)
{
testDiag("testDBR2PVD_enum()");
pvd::shared_vector<const std::string> choices;
{
pvd::shared_vector<std::string> temp(3);
temp[0] = "zero";
temp[1] = "one";
temp[2] = "two";
choices = pvd::freeze(temp);
}
pvd::PVStructure::shared_pointer top(pvd::ValueBuilder()
.addNested("value")
.add<pvd::pvInt>("index", 0)
.add("choices", pvd::static_shared_vector_cast<const void>(choices))
.endNested()
.buildPVStructure());
pvd::BitSet changed;
pvd::shared_vector<input_t> buf(1);
buf[0] = input;
copyDBF2PVD(pvd::static_shared_vector_cast<const void>(pvd::freeze(buf)),
top->getSubFieldT("value"), changed, choices);
pvd::int32 actual = top->getSubFieldT<pvd::PVScalar>("value.index")->getAs<pvd::int32>();
testShow()<<top;
testShow()<<changed;
testEqual(actual, expect);
testOk1(changed.get(top->getSubFieldT("value.index")->getFieldOffset()));
}
void testDBR2PVD_array()
{
testDiag("testDBR2PVD_array()");
pvd::PVStructure::shared_pointer top(pvd::getPVDataCreate()->createPVStructure(pvd::getFieldCreate()->createFieldBuilder()
->addArray("value", pvd::pvInt) // initially zero or ""
->createStructure()));
pvd::PVStringArray::const_svector choices;
pvd::BitSet changed;
{
pvd::shared_vector<pvd::uint32> buf(3);
buf[0] = 1; buf[1] = 2; buf[2] = 3;
copyDBF2PVD(pvd::static_shared_vector_cast<const void>(pvd::freeze(buf)),
top->getSubFieldT("value"), changed, choices);
pvd::PVIntArray::const_svector arr(top->getSubFieldT<pvd::PVIntArray>("value")->view());
testEqual(arr.size(), 3u);
testEqual(arr[0], 1);
testEqual(arr[1], 2);
testEqual(arr[2], 3);
testOk1(changed.get(top->getSubFieldT("value")->getFieldOffset()));
}
changed.clear();
{
pvd::shared_vector<std::string> buf(4);
buf[0] = "4";
buf[1] = "5";
buf[2] = "6";
buf[3] = "7";
copyDBF2PVD(pvd::static_shared_vector_cast<const void>(pvd::freeze(buf)),
top->getSubFieldT("value"), changed, choices);
pvd::PVIntArray::const_svector arr(top->getSubFieldT<pvd::PVIntArray>("value")->view());
testEqual(arr.size(), 4u);
testEqual(arr[0], 4);
testEqual(arr[1], 5);
testEqual(arr[2], 6);
testEqual(arr[3], 7);
testOk1(changed.get(top->getSubFieldT("value")->getFieldOffset()));
}
}
}
MAIN(testdbf_copy)
{
testPlan(51);
try{
testPVD2DBR_scalar<pvd::pvDouble, double>(DBF_DOUBLE, 42.2, 42.2);
testPVD2DBR_scalar<pvd::pvDouble, pvd::uint16>(DBF_USHORT, 42.2, 42u);
testPVD2DBR_scalar<pvd::pvInt, pvd::int32>(DBF_LONG, 42, 42);
testPVD2DBR_scalar<pvd::pvInt, char[MAX_STRING_SIZE]>(DBF_STRING, 42, std::string("42"));
testPVD2DBR_scalar<pvd::pvUShort, pvd::uint16>(DBF_USHORT, 41u, 41);
testPVD2DBR_scalar<pvd::pvByte, pvd::int8>(DBF_CHAR, 41, 41);
testPVD2DBR_scalar<pvd::pvString, char[MAX_STRING_SIZE]>(DBF_STRING, "hello", std::string("hello"));
testPVD2DBR_scalar<pvd::pvString, pvd::int32>(DBF_LONG, "-100", -100);
//testPVD2DBR_scalar<pvd::pvBoolean, pvd::int8>(DBF_CHAR, true, 1);
testPVD2DBR_enum();
testPVD2DBR_array();
testDBR2PVD_scalar<pvd::pvDouble, pvd::pvDouble>(42.2, 42.2);
testDBR2PVD_scalar<pvd::pvUShort, pvd::pvDouble>(42u, 42.0);
testDBR2PVD_scalar<pvd::pvInt, pvd::pvInt>(-41, -41);
testDBR2PVD_scalar<pvd::pvString, pvd::pvInt>("-41", -41);
testDBR2PVD_scalar<pvd::pvString, pvd::pvString>("hello", "hello");
testDBR2PVD_scalar<pvd::pvInt, pvd::pvString>(-42, "-42");
testDBR2PVD_enum<pvd::uint32>(2, 2);
testDBR2PVD_enum<std::string>("two", 2);
testDBR2PVD_array();
}catch(std::exception& e){
testFail("Unexpected exception: %s", e.what());
}
return testDone();
}

@@ -1,9 +1,13 @@
#include <dbUnitTest.h>
#include <testMain.h>
#include <longinRecord.h>
#include <longoutRecord.h>
#include <pv/qsrv.h>
#include "utilities.h"
#include "pvalink.h"
#include "pv/qsrv.h"
namespace {
@@ -11,66 +15,77 @@ void testGet()
{
testDiag("==== testGet ====");
longinRecord *li1 = (longinRecord*)testdbRecordPtr("src:li1");
while(!dbIsLinkConnected(&li1->inp))
testqsrvWaitForLinkEvent(&li1->inp);
testdbGetFieldEqual("target:li.VAL", DBF_LONG, 42);
testdbGetFieldEqual("src:li1.VAL", DBF_LONG, 0);
testdbGetFieldEqual("src:li1.VAL", DBF_LONG, 0); // value before first process
testdbGetFieldEqual("src:li1.INP", DBF_STRING, "{\"pva\":\"target:li\"}");
testdbPutFieldOk("src:li1.PROC", DBF_LONG, 1);
//TODO: wait for dbEvent queue update
epicsThreadSleep(0.1);
testdbGetFieldEqual("src:li1.VAL", DBF_LONG, 42);
testdbPutFieldOk("src:li1.INP", DBF_STRING, "{\"pva\":\"target:ai\"}");
testdbGetFieldEqual("src:li1.VAL", DBF_LONG, 42);
while(!dbIsLinkConnected(&li1->inp))
testqsrvWaitForLinkEvent(&li1->inp);
testdbGetFieldEqual("src:li1.VAL", DBF_LONG, 42); // changing link doesn't automatically process
//TODO: wait for pvalink worker update
epicsThreadSleep(0.1);
testdbPutFieldOk("src:li1.PROC", DBF_LONG, 1);
//TODO: wait for dbEvent queue update
epicsThreadSleep(0.1);
testdbGetFieldEqual("src:li1.VAL", DBF_LONG, 4);
testdbGetFieldEqual("src:li1.VAL", DBF_LONG, 4); // now it's changed
}
void testPut()
{
testDiag("==== testPut ====");
testdbGetFieldEqual("target:li2.VAL", DBF_LONG, 43);
testdbGetFieldEqual("src:li2.VAL", DBF_LONG, 0);
testdbGetFieldEqual("src:li2.INP", DBF_STRING, "{\"pva\":\"target:l2\"}");
testdbPutFieldOk("src:li2.VAL", DBF_LONG, 14);
longoutRecord *lo2 = (longoutRecord*)testdbRecordPtr("src:lo2");
while(!dbIsLinkConnected(&lo2->out))
testqsrvWaitForLinkEvent(&lo2->out);
testdbGetFieldEqual("target:li2.VAL", DBF_LONG, 43);
testdbGetFieldEqual("src:lo2.VAL", DBF_LONG, 0);
testdbGetFieldEqual("src:lo2.OUT", DBF_STRING, "{\"pva\":\"target:li2\"}");
testdbPutFieldOk("src:lo2.VAL", DBF_LONG, 14);
testdbGetFieldEqual("target:li2.VAL", DBF_LONG, 14);
testdbGetFieldEqual("src:li2.VAL", DBF_LONG, 14);
testdbGetFieldEqual("src:lo2.VAL", DBF_LONG, 14);
}
} // namespace
extern "C"
void p2pTestIoc_registerRecordDeviceDriver(struct dbBase *);
void pvaLinkTestIoc_registerRecordDeviceDriver(struct dbBase *);
MAIN(testpvalink)
{
testPlan(0);
testPlan(15);
// Disable PVA client provider, use local/QSRV provider
pvaLinkIsolate = 1;
pvaLinkDebug = 5;
try {
TestIOC IOC;
testdbReadDatabase("p2pTestIoc.dbd", NULL, NULL);
p2pTestIoc_registerRecordDeviceDriver(pdbbase);
testdbReadDatabase("pvaLinkTestIoc.dbd", NULL, NULL);
pvaLinkTestIoc_registerRecordDeviceDriver(pdbbase);
testdbReadDatabase("testpvalink.db", NULL, NULL);
IOC.init();
testGet();
testPut();
testqsrvShutdownOk();
IOC.shutdown();
testqsrvCleanup();
}catch(std::exception& e){
testFail("Unexpected exception: %s", e.what());

@@ -16,6 +16,6 @@ record(longin, "target:li2") {
field(VAL, "43")
}
record(longout, "src:li2") {
field(OUT, {pva:"target:li"})
record(longout, "src:lo2") {
field(OUT, {pva:"target:li2"})
}

@@ -289,8 +289,6 @@ void testScalar()
pvif_mbbi->get(mask);
testEqual(prec_mbbi->val, 2);
dbScanUnlock((dbCommon*)prec_mbbi);
iocshCmd("stopPVAServer");
}
void testPlain()