diff --git a/pdbApp/pvalink.h b/pdbApp/pvalink.h index 6029a0d..77141fd 100644 --- a/pdbApp/pvalink.h +++ b/pdbApp/pvalink.h @@ -154,6 +154,8 @@ struct pvaLinkChannel : public pvac::ClientChannel::MonitorCallback, bool queued; // added to WorkQueue bool debug; // set if any jlink::debug is set std::tr1::shared_ptr previous_root; + typedef std::set after_put_t; + after_put_t after_put; struct LinkSort { bool operator()(const pvaLink *L, const pvaLink *R) const; @@ -180,6 +182,12 @@ struct pvaLinkChannel : public pvac::ClientChannel::MonitorCallback, // pvac::ClientChanel::PutCallback virtual void putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args) OVERRIDE FINAL; virtual void putDone(const pvac::PutEvent& evt) OVERRIDE FINAL; + struct AfterPut : public epicsThreadRunable { + std::tr1::weak_ptr lc; + virtual ~AfterPut() {} + virtual void run() OVERRIDE FINAL; + }; + std::tr1::shared_ptr AP; private: virtual void run() OVERRIDE FINAL; void run_dbProcess(size_t idx); // idx is index in scan_records @@ -198,6 +206,7 @@ struct pvaLink : public pvaLinkConfig static size_t num_instances; bool alive; // attempt to catch some use after free + dbfType type; DBLINK * plink; // may be NULL diff --git a/pdbApp/pvalink_channel.cpp b/pdbApp/pvalink_channel.cpp index 551b581..6effccc 100644 --- a/pdbApp/pvalink_channel.cpp +++ b/pdbApp/pvalink_channel.cpp @@ -47,6 +47,7 @@ pvaLinkChannel::pvaLinkChannel(const pvaGlobal_t::channels_key_t &key, const pvd ,queued(false) ,debug(false) ,links_changed(false) + ,AP(new AfterPut) {} pvaLinkChannel::~pvaLinkChannel() { @@ -103,7 +104,7 @@ pvd::StructureConstPtr putRequestType = pvd::getFieldCreate()->createFieldBuilde void pvaLinkChannel::put(bool force) { pvd::PVStructurePtr pvReq(pvd::getPVDataCreate()->createPVStructure(putRequestType)); - pvReq->getSubFieldT("record._options.block")->put(false); // TODO: some way to expose completion... + pvReq->getSubFieldT("record._options.block")->put(!after_put.empty()); unsigned reqProcess = 0; bool doit = force; @@ -191,22 +192,70 @@ void pvaLinkChannel::putBuild(const epics::pvData::StructureConstPtr& build, pva args.root = top; } +namespace { +// soo much easier with c++11 std::shared_ptr... +struct AFLinker { + std::tr1::shared_ptr chan; + AFLinker(const std::tr1::shared_ptr& chan) :chan(chan) {} + void operator()(pvaLinkChannel::AfterPut *) { + chan.reset(); + } +}; +} // namespace + void pvaLinkChannel::putDone(const pvac::PutEvent& evt) { if(evt.event==pvac::PutEvent::Fail) { errlogPrintf("%s PVA link put ERROR: %s\n", key.first.c_str(), evt.message.c_str()); } - Guard G(lock); + bool needscans; + { + Guard G(lock); - DEBUG(this, <queue.add(AP); + } +} + +void pvaLinkChannel::AfterPut::run() +{ + std::set toscan; + std::tr1::shared_ptr link(lc.lock()); + if(!link) + return; + + { + Guard G(link->lock); + toscan.swap(link->after_put); + } + + for(after_put_t::iterator it=toscan.begin(), end=toscan.end(); + it!=end; ++it) + { + dbCommon *prec = *it; + dbScanLock(prec); + if(prec->pact) { // complete async. processing + (prec)->rset->process(prec); + + } else { + // maybe the result of "cancellation" or some record support logic error? + errlogPrintf("%s : not PACT when async PVA link completed. Logic error?\n", prec->name); + } + dbScanUnlock(prec); + } + } void pvaLinkChannel::monitorEvent(const pvac::MonitorEvent& evt) @@ -335,6 +384,10 @@ void pvaLinkChannel::run() if(!link->plink) continue; + // only scan on monitor update for input links + if(link->type!=DBF_INLINK) + continue; + // NPP and none/Default don't scan // PP, CP, and CPP do scan // PP and CPP only if SCAN=Passive diff --git a/pdbApp/pvalink_link.cpp b/pdbApp/pvalink_link.cpp index 62e6f5b..3bb14f7 100644 --- a/pdbApp/pvalink_link.cpp +++ b/pdbApp/pvalink_link.cpp @@ -7,6 +7,7 @@ namespace pvalink { pvaLink::pvaLink() :alive(true) + ,type((dbfType)-1) ,plink(0) ,used_scratch(false) ,used_queue(false) diff --git a/pdbApp/pvalink_lset.cpp b/pdbApp/pvalink_lset.cpp index 1797589..653b33d 100644 --- a/pdbApp/pvalink_lset.cpp +++ b/pdbApp/pvalink_lset.cpp @@ -20,10 +20,23 @@ using namespace pvalink; #define CHECK_VALID() if(!self->valid()) { DEBUG(self, <channelName<<" !valid"); return -1;} +dbfType getLinkType(DBLINK *plink) +{ + dbCommon *prec = plink->precord; + pdbRecordIterator iter(prec); + + for(long status = dbFirstField(&iter.ent, 0); !status; status = dbNextField(&iter.ent, 0)) { + if(iter.ent.pfield==plink) + return iter.ent.pflddes->field_type; + } + throw std::logic_error("DBLINK* corrupt"); +} + void pvaOpenLink(DBLINK *plink) { try { pvaLink* self((pvaLink*)plink->value.json.jlink); + self->type = getLinkType(plink); // workaround for Base not propagating info(base:lsetDebug to us { @@ -70,6 +83,7 @@ void pvaOpenLink(DBLINK *plink) // open new channel chan.reset(new pvaLinkChannel(key, pvRequest)); + chan->AP->lc = chan; pvaGlobal->channels.insert(std::make_pair(key, chan)); doOpen = true; } @@ -397,8 +411,8 @@ pvd::ScalarType DBR2PVD(short dbr) throw std::invalid_argument("Unsupported DBR code"); } -long pvaPutValue(DBLINK *plink, short dbrType, - const void *pbuffer, long nRequest) +long pvaPutValueX(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest, bool wait) { TRY { (void)self; @@ -437,6 +451,11 @@ long pvaPutValue(DBLINK *plink, short dbrType, self->used_scratch = true; +#ifdef USE_MULTILOCK + if(wait) + self->lchan->after_put.insert(plink->precord); +#endif + if(!self->defer) self->lchan->put(); DEBUG(self, <precord->name<<" "<channelName<<" "<lchan->op_put.valid()); @@ -445,6 +464,18 @@ long pvaPutValue(DBLINK *plink, short dbrType, return -1; } +long pvaPutValue(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest) +{ + return pvaPutValueX(plink, dbrType, pbuffer, nRequest, false); +} + +long pvaPutValueAsync(DBLINK *plink, short dbrType, + const void *pbuffer, long nRequest) +{ + return pvaPutValueX(plink, dbrType, pbuffer, nRequest, true); +} + void pvaScanForward(DBLINK *plink) { TRY { @@ -485,7 +516,7 @@ lset pva_lset = { &pvaGetAlarm, &pvaGetTimeStamp, &pvaPutValue, - NULL, + &pvaPutValueAsync, &pvaScanForward //&pvaReportLink, }; diff --git a/testApp/testpvalink.cpp b/testApp/testpvalink.cpp index 4cc37f5..74d20ef 100644 --- a/testApp/testpvalink.cpp +++ b/testApp/testpvalink.cpp @@ -61,6 +61,31 @@ void testPut() testdbGetFieldEqual("src:o2.VAL", DBF_INT64, 14LL); } +void testPutAsync() +{ +#ifdef USE_MULTILOCK + testDiag("==== testPutAsync ===="); + + int64outRecord *trig = (int64outRecord*)testdbRecordPtr("async:trig"); + + while(!dbIsLinkConnected(&trig->out)) + testqsrvWaitForLinkEvent(&trig->out); + + testMonitor* done = testMonitorCreate("async:after", DBE_VALUE, 0); + + testdbPutFieldOk("async:trig.PROC", DBF_LONG, 1); + testMonitorWait(done); + + testdbGetFieldEqual("async:trig", DBF_LONG, 1); + testdbGetFieldEqual("async:slow", DBF_LONG, 1); // pushed from async:trig + testdbGetFieldEqual("async:slow2", DBF_LONG, 2); + testdbGetFieldEqual("async:after", DBF_LONG, 3); + +#else + testSkip(5, "Not USE_MULTILOCK"); +#endif +} + } // namespace extern "C" @@ -68,7 +93,7 @@ void pvaLinkTestIoc_registerRecordDeviceDriver(struct dbBase *); MAIN(testpvalink) { - testPlan(15); + testPlan(20); // Disable PVA client provider, use local/QSRV provider pvaLinkIsolate = 1; @@ -83,6 +108,7 @@ MAIN(testpvalink) IOC.init(); testGet(); testPut(); + testPutAsync(); testqsrvShutdownOk(); IOC.shutdown(); testqsrvCleanup(); diff --git a/testApp/testpvalink.db b/testApp/testpvalink.db index ddf0593..9a5d797 100644 --- a/testApp/testpvalink.db +++ b/testApp/testpvalink.db @@ -19,3 +19,37 @@ record(int64in, "target:i2") { record(int64out, "src:o2") { field(OUT, {"pva":"target:i2"}) } + +# used by testPutAsync() +record(calc, "async:seq") { + field(CALC, "VAL+1") + field(VAL , "0") + field(TPRO, "1") +} + +record(longout, "async:trig") { + field(OMSL, "closed_loop") + field(DOL , "async:seq PP") + field(DTYP, "Async Soft Channel") + field(OUT , { "pva":{"pv":"async:slow.A", "proc":true} }) + field(FLNK, "async:after") + field(TPRO, "1") +} + +record(calcout, "async:slow") { + field(ODLY, "1") + field(CALC, "A") + field(FLNK, "async:slow2") + field(TPRO, "1") +} + +record(longin, "async:slow2") { + field(INP , "async:seq PP") + field(TPRO, "1") +} + +record(longin, "async:after") { + field(INP , "async:seq PP") + field(MDEL, "-1") + field(TPRO, "1") +}