enable "Async Soft Channel" for output links

Add asynchronous output link support.
When triggered, re-process record(s)
after put completes
This commit is contained in:
Michael Davidsaver
2021-09-09 12:53:33 -07:00
parent cb1b41c1b2
commit 122e9bdbb4
6 changed files with 165 additions and 11 deletions

View File

@ -154,6 +154,8 @@ struct pvaLinkChannel : public pvac::ClientChannel::MonitorCallback,
bool queued; // added to WorkQueue
bool debug; // set if any jlink::debug is set
std::tr1::shared_ptr<const void> previous_root;
typedef std::set<dbCommon*> after_put_t;
after_put_t after_put;
struct LinkSort {
bool operator()(const pvaLink *L, const pvaLink *R) const;
@ -180,6 +182,12 @@ struct pvaLinkChannel : public pvac::ClientChannel::MonitorCallback,
// pvac::ClientChanel::PutCallback
virtual void putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args) OVERRIDE FINAL;
virtual void putDone(const pvac::PutEvent& evt) OVERRIDE FINAL;
struct AfterPut : public epicsThreadRunable {
std::tr1::weak_ptr<pvaLinkChannel> lc;
virtual ~AfterPut() {}
virtual void run() OVERRIDE FINAL;
};
std::tr1::shared_ptr<AfterPut> AP;
private:
virtual void run() OVERRIDE FINAL;
void run_dbProcess(size_t idx); // idx is index in scan_records
@ -198,6 +206,7 @@ struct pvaLink : public pvaLinkConfig
static size_t num_instances;
bool alive; // attempt to catch some use after free
dbfType type;
DBLINK * plink; // may be NULL

View File

@ -47,6 +47,7 @@ pvaLinkChannel::pvaLinkChannel(const pvaGlobal_t::channels_key_t &key, const pvd
,queued(false)
,debug(false)
,links_changed(false)
,AP(new AfterPut)
{}
pvaLinkChannel::~pvaLinkChannel() {
@ -103,7 +104,7 @@ pvd::StructureConstPtr putRequestType = pvd::getFieldCreate()->createFieldBuilde
void pvaLinkChannel::put(bool force)
{
pvd::PVStructurePtr pvReq(pvd::getPVDataCreate()->createPVStructure(putRequestType));
pvReq->getSubFieldT<pvd::PVBoolean>("record._options.block")->put(false); // TODO: some way to expose completion...
pvReq->getSubFieldT<pvd::PVBoolean>("record._options.block")->put(!after_put.empty());
unsigned reqProcess = 0;
bool doit = force;
@ -191,22 +192,70 @@ void pvaLinkChannel::putBuild(const epics::pvData::StructureConstPtr& build, pva
args.root = top;
}
namespace {
// soo much easier with c++11 std::shared_ptr...
struct AFLinker {
std::tr1::shared_ptr<pvaLinkChannel> chan;
AFLinker(const std::tr1::shared_ptr<pvaLinkChannel>& chan) :chan(chan) {}
void operator()(pvaLinkChannel::AfterPut *) {
chan.reset();
}
};
} // namespace
void pvaLinkChannel::putDone(const pvac::PutEvent& evt)
{
if(evt.event==pvac::PutEvent::Fail) {
errlogPrintf("%s PVA link put ERROR: %s\n", key.first.c_str(), evt.message.c_str());
}
Guard G(lock);
bool needscans;
{
Guard G(lock);
DEBUG(this, <<key.first<<" Put result "<<evt.event);
DEBUG(this, <<key.first<<" Put result "<<evt.event);
op_put = pvac::Operation();
needscans = !after_put.empty();
op_put = pvac::Operation();
if(evt.event==pvac::PutEvent::Success) {
// see if we need start a queue'd put
put();
if(evt.event==pvac::PutEvent::Success) {
// see if we need start a queue'd put
put();
}
}
if(needscans) {
pvaGlobal->queue.add(AP);
}
}
void pvaLinkChannel::AfterPut::run()
{
std::set<dbCommon*> toscan;
std::tr1::shared_ptr<pvaLinkChannel> link(lc.lock());
if(!link)
return;
{
Guard G(link->lock);
toscan.swap(link->after_put);
}
for(after_put_t::iterator it=toscan.begin(), end=toscan.end();
it!=end; ++it)
{
dbCommon *prec = *it;
dbScanLock(prec);
if(prec->pact) { // complete async. processing
(prec)->rset->process(prec);
} else {
// maybe the result of "cancellation" or some record support logic error?
errlogPrintf("%s : not PACT when async PVA link completed. Logic error?\n", prec->name);
}
dbScanUnlock(prec);
}
}
void pvaLinkChannel::monitorEvent(const pvac::MonitorEvent& evt)
@ -335,6 +384,10 @@ void pvaLinkChannel::run()
if(!link->plink) continue;
// only scan on monitor update for input links
if(link->type!=DBF_INLINK)
continue;
// NPP and none/Default don't scan
// PP, CP, and CPP do scan
// PP and CPP only if SCAN=Passive

View File

@ -7,6 +7,7 @@ namespace pvalink {
pvaLink::pvaLink()
:alive(true)
,type((dbfType)-1)
,plink(0)
,used_scratch(false)
,used_queue(false)

View File

@ -20,10 +20,23 @@ using namespace pvalink;
#define CHECK_VALID() if(!self->valid()) { DEBUG(self, <<CURRENT_FUNCTION<<" "<<self->channelName<<" !valid"); return -1;}
dbfType getLinkType(DBLINK *plink)
{
dbCommon *prec = plink->precord;
pdbRecordIterator iter(prec);
for(long status = dbFirstField(&iter.ent, 0); !status; status = dbNextField(&iter.ent, 0)) {
if(iter.ent.pfield==plink)
return iter.ent.pflddes->field_type;
}
throw std::logic_error("DBLINK* corrupt");
}
void pvaOpenLink(DBLINK *plink)
{
try {
pvaLink* self((pvaLink*)plink->value.json.jlink);
self->type = getLinkType(plink);
// workaround for Base not propagating info(base:lsetDebug to us
{
@ -70,6 +83,7 @@ void pvaOpenLink(DBLINK *plink)
// open new channel
chan.reset(new pvaLinkChannel(key, pvRequest));
chan->AP->lc = chan;
pvaGlobal->channels.insert(std::make_pair(key, chan));
doOpen = true;
}
@ -397,8 +411,8 @@ pvd::ScalarType DBR2PVD(short dbr)
throw std::invalid_argument("Unsupported DBR code");
}
long pvaPutValue(DBLINK *plink, short dbrType,
const void *pbuffer, long nRequest)
long pvaPutValueX(DBLINK *plink, short dbrType,
const void *pbuffer, long nRequest, bool wait)
{
TRY {
(void)self;
@ -437,6 +451,11 @@ long pvaPutValue(DBLINK *plink, short dbrType,
self->used_scratch = true;
#ifdef USE_MULTILOCK
if(wait)
self->lchan->after_put.insert(plink->precord);
#endif
if(!self->defer) self->lchan->put();
DEBUG(self, <<plink->precord->name<<" "<<CURRENT_FUNCTION<<" "<<self->channelName<<" "<<self->lchan->op_put.valid());
@ -445,6 +464,18 @@ long pvaPutValue(DBLINK *plink, short dbrType,
return -1;
}
long pvaPutValue(DBLINK *plink, short dbrType,
const void *pbuffer, long nRequest)
{
return pvaPutValueX(plink, dbrType, pbuffer, nRequest, false);
}
long pvaPutValueAsync(DBLINK *plink, short dbrType,
const void *pbuffer, long nRequest)
{
return pvaPutValueX(plink, dbrType, pbuffer, nRequest, true);
}
void pvaScanForward(DBLINK *plink)
{
TRY {
@ -485,7 +516,7 @@ lset pva_lset = {
&pvaGetAlarm,
&pvaGetTimeStamp,
&pvaPutValue,
NULL,
&pvaPutValueAsync,
&pvaScanForward
//&pvaReportLink,
};

View File

@ -61,6 +61,31 @@ void testPut()
testdbGetFieldEqual("src:o2.VAL", DBF_INT64, 14LL);
}
void testPutAsync()
{
#ifdef USE_MULTILOCK
testDiag("==== testPutAsync ====");
int64outRecord *trig = (int64outRecord*)testdbRecordPtr("async:trig");
while(!dbIsLinkConnected(&trig->out))
testqsrvWaitForLinkEvent(&trig->out);
testMonitor* done = testMonitorCreate("async:after", DBE_VALUE, 0);
testdbPutFieldOk("async:trig.PROC", DBF_LONG, 1);
testMonitorWait(done);
testdbGetFieldEqual("async:trig", DBF_LONG, 1);
testdbGetFieldEqual("async:slow", DBF_LONG, 1); // pushed from async:trig
testdbGetFieldEqual("async:slow2", DBF_LONG, 2);
testdbGetFieldEqual("async:after", DBF_LONG, 3);
#else
testSkip(5, "Not USE_MULTILOCK");
#endif
}
} // namespace
extern "C"
@ -68,7 +93,7 @@ void pvaLinkTestIoc_registerRecordDeviceDriver(struct dbBase *);
MAIN(testpvalink)
{
testPlan(15);
testPlan(20);
// Disable PVA client provider, use local/QSRV provider
pvaLinkIsolate = 1;
@ -83,6 +108,7 @@ MAIN(testpvalink)
IOC.init();
testGet();
testPut();
testPutAsync();
testqsrvShutdownOk();
IOC.shutdown();
testqsrvCleanup();

View File

@ -19,3 +19,37 @@ record(int64in, "target:i2") {
record(int64out, "src:o2") {
field(OUT, {"pva":"target:i2"})
}
# used by testPutAsync()
record(calc, "async:seq") {
field(CALC, "VAL+1")
field(VAL , "0")
field(TPRO, "1")
}
record(longout, "async:trig") {
field(OMSL, "closed_loop")
field(DOL , "async:seq PP")
field(DTYP, "Async Soft Channel")
field(OUT , { "pva":{"pv":"async:slow.A", "proc":true} })
field(FLNK, "async:after")
field(TPRO, "1")
}
record(calcout, "async:slow") {
field(ODLY, "1")
field(CALC, "A")
field(FLNK, "async:slow2")
field(TPRO, "1")
}
record(longin, "async:slow2") {
field(INP , "async:seq PP")
field(TPRO, "1")
}
record(longin, "async:after") {
field(INP , "async:seq PP")
field(MDEL, "-1")
field(TPRO, "1")
}