pdb: group put w/ process

This commit is contained in:
Michael Davidsaver
2017-09-29 14:22:41 -05:00
parent b839d5dc6d
commit e0bac39667
9 changed files with 207 additions and 49 deletions

View File

@ -7,8 +7,8 @@
#include <pv/pvAccess.h>
template<typename T>
bool getS(const epics::pvData::PVStructurePtr& S, const char *name, T& val)
template<typename T, typename A>
bool getS(const epics::pvData::PVStructurePtr& S, const char *name, A& val)
{
epics::pvData::PVScalarPtr F(S->getSubField<epics::pvData::PVScalar>(name));
if(F)

View File

@ -1,5 +1,6 @@
#!../../bin/linux-x86_64-debug/softIocPVA
dbLoadRecords("image.db","N=TST:image1")
dbLoadRecords("table.db","N=TST:table1")
iocInit()

View File

@ -0,0 +1,45 @@
record(aai, "$(N)Labels_") {
field(FTVL, "STRING")
field(NELM, "2")
field(INP , {const:["Column A", "Column B"]})
info(Q:group, {
"$(N)Tbl":{
+id:"epics:nt/NTTable:1.0",
"labels":{+type:"plain", +channel:"VAL"}
}
})
field(TPRO, "1")
}
record(aao, "$(N)A") {
field(FTVL, "DOUBLE")
field(NELM, "10")
info(Q:group, {
"$(N)Tbl":{
"value.A":{+type:"plain", +channel:"VAL", +putorder:1}
}
})
field(TPRO, "1")
}
record(aao, "$(N)B") {
field(FTVL, "DOUBLE")
field(NELM, "10")
info(Q:group, {
"$(N)Tbl":{
"":{+type:"meta", +channel:"VAL"},
"value.B":{+type:"plain", +channel:"VAL", +putorder:1}
}
})
field(TPRO, "1")
}
record(bo, "$(N)Save") {
field(TPRO, "1")
info(Q:group, {
"$(N)Tbl":{
"_save":{+type:"plain", +channel:"VAL", +putorder:2}
}
})
}

View File

@ -47,16 +47,17 @@ struct Splitter {
struct GroupMemberInfo {
// consumes builder
GroupMemberInfo(const std::string& a, const std::string& b, p2p::auto_ptr<PVIFBuilder>& builder)
:pvname(a), pvfldname(b), builder(PTRMOVE(builder)) {}
:pvname(a), pvfldname(b), builder(PTRMOVE(builder)), putorder(0) {}
std::string pvname, // aka. name passed to dbChannelOpen()
pvfldname; // PVStructure sub-field
std::string structID; // ID to assign to sub-field
std::set<std::string> triggers; // names in GroupInfo::members_names which are post()d on events from pvfldname
std::tr1::shared_ptr<PVIFBuilder> builder; // not actually shared, but allows us to be copyable
int putorder;
bool operator<(const GroupMemberInfo& o) const {
return pvfldname<o.pvfldname;
return putorder<o.putorder;
}
};
@ -216,11 +217,9 @@ struct PDBProcessor
GroupInfo::members_map_t::const_iterator oldgrp(curgroup->members_map.find(fldname));
if(oldgrp!=curgroup->members_map.end()) {
const GroupMemberInfo& other = curgroup->members[oldgrp->second];
fprintf(stderr, "%s.%s ignoring duplicate mapping %s%s and %s\n",
fprintf(stderr, "%s.%s ignoring duplicate mapping %s%s\n",
grpname.c_str(), fldname.c_str(),
recbase.c_str(), fld.channel.c_str(),
other.pvname.c_str());
recbase.c_str(), fld.channel.c_str());
continue;
}
@ -228,6 +227,7 @@ struct PDBProcessor
curgroup->members.push_back(GroupMemberInfo(fld.channel.empty() ? fld.channel : recbase + fld.channel, fldname, builder));
curgroup->members.back().structID = fld.id;
curgroup->members.back().putorder = fld.putorder;
curgroup->members_map[fldname] = (size_t)-1; // placeholder see below
if(PDBProviderDebug>2) {
@ -278,7 +278,7 @@ struct PDBProcessor
}
// re-sort GroupInfo::members to ensure the shorter names appear first
// allows use of 'existing' PVIFBuilder on leaves
// allows use of 'existing' PVIFBuilder on leaves.
for(groups_t::iterator it = groups.begin(), end = groups.end(); it!=end; ++it)
{
GroupInfo& info = it->second;
@ -294,6 +294,8 @@ struct PDBProcessor
}
resolveTriggers();
// must not re-sort members after this point as resolveTriggers()
// has stored array indicies.
}
};
@ -379,6 +381,12 @@ PDBProvider::PDBProvider(const epics::pvAccess::Configuration::shared_pointer &)
DBCH chan;
if(!mem.pvname.empty()) {
DBCH temp(mem.pvname);
unsigned ftype = dbChannelFieldType(temp);
// can't include in multi-locking
if(ftype>=DBF_INLINK && ftype<=DBF_FWDLINK)
throw std::runtime_error("Can't include link fields in group");
chan.swap(temp);
}
@ -396,6 +404,7 @@ PDBProvider::PDBProvider(const epics::pvAccess::Configuration::shared_pointer &)
members_map[mem.pvfldname] = J;
PDBGroupPV::Info& info = members[J];
info.allowProc = mem.putorder != std::numeric_limits<int>::min();
info.builder = PTRMOVE(mem.builder);
assert(info.builder.get());

View File

@ -147,27 +147,42 @@ PDBGroupChannel::createMonitor(
PDBGroupPut::PDBGroupPut(const PDBGroupChannel::shared_pointer& channel,
const requester_type::weak_pointer& requester,
const requester_type::shared_pointer& requester,
const epics::pvData::PVStructure::shared_pointer &pvReq)
:channel(channel)
,requester(requester)
,atomic(channel->pv->pgatomic)
,doWait(false)
,doProc(PVIF::ProcPassive)
,changed(new pvd::BitSet(channel->fielddesc->getNumberFields()))
,pvf(pvd::getPVDataCreate()->createPVStructure(channel->fielddesc))
{
epics::atomic::increment(num_instances);
pvd::PVScalarPtr atomicopt(pvReq->getSubField<pvd::PVScalar>("record._options.atomic"));
if(atomicopt) {
try {
atomic = atomicopt->getAs<pvd::boolean>();
}catch(std::exception& e){
requester_type::shared_pointer req(requester.lock());
if(req)
req->message("Unable to parse 'atomic' request option. Default is false.", pvd::warningMessage);
try {
getS<pvd::boolean>(pvReq, "record._options.atomic", atomic);
getS<pvd::boolean>(pvReq, "record._options.block", doWait);
std::string proccmd;
if(getS<std::string>(pvReq, "record._options.process", proccmd)) {
if(proccmd=="true") {
doProc = PVIF::ProcForce;
} else if(proccmd=="false") {
doProc = PVIF::ProcInhibit;
doWait = false; // no point in waiting
} else if(proccmd=="passive") {
doProc = PVIF::ProcPassive;
} else {
requester->message("process= expects: true|false|passive", pva::warningMessage);
}
}
}catch(std::exception& e){
requester->message(std::string("Error processing request options: ")+e.what());
}
pvf->getSubFieldT<pvd::PVBoolean>("record._options.atomic")->put(atomic);
const size_t npvs = channel->pv->members.size();
pvif.resize(npvs);
for(size_t i=0; i<npvs; i++)
@ -193,27 +208,36 @@ void PDBGroupPut::put(pvd::PVStructure::shared_pointer const & value,
for(size_t i=0; i<npvs; i++)
{
PDBGroupPV::Info& info = channel->pv->members[i];
if(!info.allowProc) continue;
putpvif[i].reset(info.builder->attach(info.chan, value, info.attachment));
}
pvd::Status ret;
if(atomic) {
DBManyLocker L(channel->pv->locker);
for(size_t i=0; i<npvs; i++)
putpvif[i]->get(*changed);
for(size_t i=0; ret && i<npvs; i++) {
if(!putpvif[i].get()) continue;
ret |= putpvif[i]->get(*changed, doProc);
}
} else {
for(size_t i=0; i<npvs; i++)
for(size_t i=0; ret && i<npvs; i++)
{
if(!putpvif[i].get()) continue;
PDBGroupPV::Info& info = channel->pv->members[i];
DBScanLocker L(dbChannelRecord(info.chan));
putpvif[i]->get(*changed);
ret |= putpvif[i]->get(*changed, info.allowProc ? doProc : PVIF::ProcInhibit);
}
}
requester_type::shared_pointer req(requester.lock());
if(req)
req->putDone(pvd::Status(), shared_from_this());
req->putDone(ret, shared_from_this());
}
void PDBGroupPut::get()

View File

@ -91,9 +91,9 @@ struct epicsShareClass PDBGroupPV : public PDBPV
DBManyLock locker; // lock only those channels being triggered
p2p::auto_ptr<PVIF> pvif;
DBEvent evt_VALUE, evt_PROPERTY;
bool had_initial_VALUE, had_initial_PROPERTY;
bool had_initial_VALUE, had_initial_PROPERTY, allowProc;
Info() :had_initial_VALUE(false), had_initial_PROPERTY(false) {}
Info() :had_initial_VALUE(false), had_initial_PROPERTY(false), allowProc(false) {}
};
epics::pvData::shared_vector<Info> members;
@ -150,7 +150,10 @@ struct PDBGroupPut : public epics::pvAccess::ChannelPut,
PDBGroupChannel::shared_pointer channel;
requester_type::weak_pointer requester;
bool atomic;
// effectively const after ctor
bool atomic, doWait;
PVIF::proc_t doProc;
epics::pvData::BitSetPtr changed;
epics::pvData::PVStructurePtr pvf;
std::vector<std::tr1::shared_ptr<PVIF> > pvif;
@ -158,7 +161,7 @@ struct PDBGroupPut : public epics::pvAccess::ChannelPut,
static size_t num_instances;
PDBGroupPut(const PDBGroupChannel::shared_pointer &channel,
const requester_type::weak_pointer &requester,
const epics::pvAccess::ChannelPutRequester::shared_pointer &requester,
const epics::pvData::PVStructure::shared_pointer& pvReq);
virtual ~PDBGroupPut();

View File

@ -220,16 +220,14 @@ PDBSinglePut::PDBSinglePut(const PDBSingleChannel::shared_pointer &channel,
(dbChannelFldDes(chan)->process_passive &&
precord->scan == 0);
pvd::boolean wait = doWait;
try {
getS(pvReq, "record._options.block", wait);
getS<pvd::boolean>(pvReq, "record._options.block", doWait);
} catch(std::runtime_error& e) {
requester->message(std::string("block= not understood : ")+e.what(), pva::warningMessage);
}
doWait = wait;
std::string proccmd;
if(getS(pvReq, "record._options.process", proccmd)) {
if(getS<std::string>(pvReq, "record._options.process", proccmd)) {
if(proccmd=="true") {
doProc = true;
doProcForce = true;

View File

@ -304,16 +304,34 @@ void getValue(dbChannel *chan, pvd::PVScalar* value)
void getValue(dbChannel *chan, pvd::PVScalarArray* value)
{
short dbr = dbChannelFinalFieldType(chan);
pvd::shared_vector<const void> buf;
assert(dbr!=DBR_STRING);
if(dbr!=DBR_STRING) {
pvd::shared_vector<const void> buf;
value->getAs(buf);
long nReq = buf.size()/pvd::ScalarTypeFunc::elementSize(value->getScalarArray()->getElementType());
value->getAs(buf);
long nReq = buf.size()/pvd::ScalarTypeFunc::elementSize(value->getScalarArray()->getElementType());
long status = dbChannelPut(chan, dbr, buf.data(), nReq);
if(status)
throw std::runtime_error("dbChannelPut for meta fails");
long status = dbChannelPut(chan, dbr, buf.data(), nReq);
if(status)
throw std::runtime_error("dbChannelPut fails");
} else {
pvd::shared_vector<const std::string> buf;
value->getAs(buf);
std::vector<char> temp(buf.size()*MAX_STRING_SIZE);
for(size_t i=0, N=buf.size(); i<N; i++)
{
strncpy(&temp[i*MAX_STRING_SIZE], buf[i].c_str(), MAX_STRING_SIZE-1);
temp[i*MAX_STRING_SIZE + MAX_STRING_SIZE-1] = '\0';
}
long status = dbChannelPut(chan, dbr, &temp[0], buf.size());
if(status)
throw std::runtime_error("dbChannelPut fails");
}
}
void putValue(dbChannel *chan, pvd::PVScalarArray* value, db_field_log *pfl)
@ -323,17 +341,33 @@ void putValue(dbChannel *chan, pvd::PVScalarArray* value, db_field_log *pfl)
long nReq = dbChannelFinalElements(chan);
const pvd::ScalarType etype = value->getScalarArray()->getElementType();
assert(dbr!=DBR_STRING);
if(dbr!=DBR_STRING) {
pvd::shared_vector<void> buf(pvd::ScalarTypeFunc::allocArray(etype, nReq)); // TODO: pool?
pvd::shared_vector<void> buf(pvd::ScalarTypeFunc::allocArray(etype, nReq)); // TODO: pool?
long status = dbChannelGet(chan, dbr, buf.data(), NULL, &nReq, pfl);
if(status)
throw std::runtime_error("dbChannelGet for meta fails");
long status = dbChannelGet(chan, dbr, buf.data(), NULL, &nReq, pfl);
if(status)
throw std::runtime_error("dbChannelGet for value fails");
buf.slice(0, nReq*pvd::ScalarTypeFunc::elementSize(etype));
buf.slice(0, nReq*pvd::ScalarTypeFunc::elementSize(etype));
value->putFrom(pvd::freeze(buf));
value->putFrom(pvd::freeze(buf));
} else {
std::vector<char> temp(nReq*MAX_STRING_SIZE);
long status = dbChannelGet(chan, dbr, &temp[0], NULL, &nReq, pfl);
if(status)
throw std::runtime_error("dbChannelGet for value fails");
pvd::shared_vector<std::string> buf(nReq);
for(long i=0; i<nReq; i++) {
temp[i*MAX_STRING_SIZE + MAX_STRING_SIZE-1] = '\0';
buf[i] = std::string(&temp[i*MAX_STRING_SIZE]);
}
value->putFrom(pvd::freeze(buf));
}
}
template<typename META>
@ -475,10 +509,12 @@ struct PVIFScalarNumeric : public PVIF
}
}
virtual void get(const epics::pvData::BitSet& mask) OVERRIDE FINAL
virtual pvd::Status get(const epics::pvData::BitSet& mask, proc_t proc) OVERRIDE FINAL
{
if(mask.logical_and(pvmeta.maskVALUEPut))
getValue(pvmeta.chan, pvmeta.value.get());
return PVIF::get(mask, proc);
}
virtual unsigned dbe(const epics::pvData::BitSet& mask) OVERRIDE FINAL
@ -629,10 +665,11 @@ struct PVIFPlain : public PVIF
}
}
virtual void get(const epics::pvData::BitSet& mask)
virtual pvd::Status get(const epics::pvData::BitSet& mask, proc_t proc)
{
if(mask.get(fieldOffset))
getValue(channel, field.get());
return PVIF::get(mask, proc);
}
virtual unsigned dbe(const epics::pvData::BitSet& mask)
@ -763,9 +800,10 @@ struct PVIFMeta : public PVIF
putTime(meta, dbe, pfl);
}
virtual void get(const epics::pvData::BitSet& mask)
virtual pvd::Status get(const epics::pvData::BitSet& mask, proc_t proc)
{
// can't put time/alarm
return pvd::Status::Ok;
}
virtual unsigned dbe(const epics::pvData::BitSet& mask)
@ -821,6 +859,39 @@ struct MetaBuilder : public PVIFBuilder
}//namespace
pvd::Status PVIF::get(const epics::pvData::BitSet& mask, proc_t proc)
{
dbCommon *precord = dbChannelRecord(chan);
bool tryproc = proc!=ProcPassive ? proc==ProcForce :
dbChannelField(chan) == &precord->proc ||
(dbChannelFldDes(chan)->process_passive &&
precord->scan == 0);
pvd::Status ret;
if (tryproc) {
if (precord->pact) {
if (precord->tpro)
printf("%s: Active %s\n",
epicsThreadGetNameSelf(), precord->name);
precord->rpro = TRUE;
} else {
/* indicate that dbPutField called dbProcess */
precord->putf = TRUE;
long err = dbProcess(precord);
if(err) {
char buf[32];
errSymLookup(err, buf, sizeof(buf));
std::ostringstream msg;
msg<<"process error : "<<buf;
ret = pvd::Status::error(msg.str());
}
}
}
return ret;
}
epics::pvData::FieldBuilderPtr
PVIFBuilder::dtype(epics::pvData::FieldBuilderPtr& builder,

View File

@ -10,6 +10,7 @@
#include <dbEvent.h>
#include <epicsVersion.h>
#include <pv/status.h>
#include <pv/bitSet.h>
#include <pv/pvData.h>
@ -298,12 +299,18 @@ struct epicsShareClass PVIF {
dbChannel * const chan; // borrowed reference from PVIFBuilder
enum proc_t {
ProcPassive,
ProcInhibit,
ProcForce,
};
//! Copy from PDB record to pvalue (call dbChannelGet())
//! caller must lock record
virtual void put(epics::pvData::BitSet& mask, unsigned dbe, db_field_log *pfl) =0;
//! Copy from pvalue to PDB record (call dbChannelPut())
//! caller must lock record
virtual void get(const epics::pvData::BitSet& mask) =0;
virtual epics::pvData::Status get(const epics::pvData::BitSet& mask, proc_t proc=ProcInhibit) =0;
//! Calculate DBE mask from changed bitset
virtual unsigned dbe(const epics::pvData::BitSet& mask) =0;