From 2c6599d4c947a3a2b3d44d76350ec27dfb200dfe Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 5 Sep 2017 18:52:35 -0500 Subject: [PATCH] singlepv dbnotify --- pdbApp/pdbsingle.cpp | 113 ++++++++++++++++++++++++++++++++++++++++--- pdbApp/pdbsingle.h | 11 +++-- pdbApp/pvif.cpp | 25 +++++----- pdbApp/pvif.h | 2 + 4 files changed, 126 insertions(+), 25 deletions(-) diff --git a/pdbApp/pdbsingle.cpp b/pdbApp/pdbsingle.cpp index 210ca89..11dddae 100644 --- a/pdbApp/pdbsingle.cpp +++ b/pdbApp/pdbsingle.cpp @@ -1,3 +1,5 @@ +#include + #include #include #include @@ -141,6 +143,58 @@ PDBSingleChannel::createMonitor( } +static +int single_put_callback(struct processNotify *notify,notifyPutType type) +{ + PDBSinglePut *self = (PDBSinglePut*)notify->usrPvt; + + if(notify->status!=notifyOK || type==putDisabledType) return 0; + + // we've previously ensured that wait_changed&DBE_VALUE is true + + switch(type) { + case putFieldType: + { + DBScanLocker L(notify->chan); + self->wait_pvif->get(*self->wait_changed); + } + break; + case putType: + self->wait_pvif->get(*self->wait_changed); + break; + } + return 1; +} + +static +void single_done_callback(struct processNotify *notify) +{ + PDBSinglePut *self = (PDBSinglePut*)notify->usrPvt; + pvd::Status sts; + + // busy state should be 1 (normal completion) or 2 (if cancel in progress) + if(epics::atomic::compareAndSwap(self->notifyBusy, 1, 0)==0) { + std::cerr<<"PDBSinglePut dbNotify state error?\n"; + } + + switch(notify->status) { + case notifyOK: + break; + case notifyCanceled: + return; // skip notification + case notifyError: + sts = pvd::Status::error("Error in dbNotify"); + break; + case notifyPutDisabled: + sts = pvd::Status::error("Put disabled"); + break; + } + + PDBSinglePut::requester_type::shared_pointer req(self->requester.lock()); + if(req) + req->putDone(sts, self->shared_from_this()); +} + PDBSinglePut::PDBSinglePut(const PDBSingleChannel::shared_pointer &channel, const pva::ChannelPutRequester::shared_pointer &requester, const pvd::PVStructure::shared_pointer &pvReq) @@ -149,6 +203,7 @@ PDBSinglePut::PDBSinglePut(const PDBSingleChannel::shared_pointer &channel, ,changed(new pvd::BitSet(channel->fielddesc->getNumberFields())) ,pvf(pvd::getPVDataCreate()->createPVStructure(channel->fielddesc)) ,pvif(PVIF::attach(channel->pv->chan, pvf)) + ,notifyBusy(0) ,doProc(true) ,doProcForce(false) ,doWait(false) @@ -161,27 +216,39 @@ PDBSinglePut::PDBSinglePut(const PDBSingleChannel::shared_pointer &channel, precord->scan == 0); pvd::boolean wait; - getS(pvReq, "_record.options.block", wait); + try { + getS(pvReq, "record._options.block", wait); + } catch(std::runtime_error& e) { + requester->message(std::string("block= not understood : ")+e.what(), pva::warningMessage); + } + doWait = wait; std::string proccmd; - if(getS(pvReq, "_record.options.process", proccmd)) { - if(proccmd=="yes" || proccmd=="1") { + if(getS(pvReq, "record._options.process", proccmd)) { + if(proccmd=="true") { doProc = true; doProcForce = true; - } else if(proccmd=="no" || proccmd=="0") { + } else if(proccmd=="false") { doProc = false; doProcForce = true; + doWait = false; // no point in waiting } else if(proccmd=="passive") { doProcForce = false; + } else { + requester->message("process= expects: true|false|passive", pva::warningMessage); } } memset((void*)¬ify, 0, sizeof(notify)); notify.usrPvt = (void*)this; + notify.chan = chan; + notify.putCallback = &single_put_callback; + notify.doneCallback = &single_done_callback; } PDBSinglePut::~PDBSinglePut() { + cancel(); epics::atomic::decrement(num_instances); } @@ -190,6 +257,8 @@ void PDBSinglePut::put(pvd::PVStructure::shared_pointer const & value, { dbChannel *chan = channel->pv->chan; dbFldDes *fld = dbChannelFldDes(chan); + dbCommon *precord = dbChannelRecord(chan); + pvd::Status ret; if(fld->field_type>=DBF_INLINK && fld->field_type<=DBF_FWDLINK) { try{ @@ -203,6 +272,30 @@ void PDBSinglePut::put(pvd::PVStructure::shared_pointer const & value, ret = pvd::Status(pvd::Status::error(strm.str())); } + } else if(doWait) { + // TODO: dbNotify doesn't allow us for force processing + + // assume value may be a different struct each time + std::auto_ptr putpvif(PVIF::attach(channel->pv->chan, value)); + unsigned mask = putpvif->dbe(*changed); + + if(mask&~DBE_VALUE) { + requester_type::shared_pointer req(requester.lock()); + if(req) + req->message("wait=true only supports .value", pva::warningMessage); + } + + if(epics::atomic::compareAndSwap(notifyBusy, 0, 1)!=0) + throw std::logic_error("Previous put() not complete"); + + notify.requestType = (mask&DBE_VALUE) ? putProcessRequest : processRequest; + + wait_pvif = putpvif; + wait_changed = changed; + + dbProcessNotify(¬ify); + + return; // skip notification } else { // assume value may be a different struct each time std::auto_ptr putpvif(PVIF::attach(channel->pv->chan, value)); @@ -210,8 +303,6 @@ void PDBSinglePut::put(pvd::PVStructure::shared_pointer const & value, DBScanLocker L(chan); putpvif->get(*changed); - dbCommon *precord = dbChannelRecord(chan); - bool tryproc = doProcForce ? doProc : dbChannelField(chan) == &precord->proc || (dbChannelFldDes(chan)->process_passive && precord->scan == 0); @@ -235,6 +326,16 @@ void PDBSinglePut::put(pvd::PVStructure::shared_pointer const & value, req->putDone(pvd::Status(), shared_from_this()); } +void PDBSinglePut::cancel() +{ + if(epics::atomic::compareAndSwap(notifyBusy, 1, 2)==1) { + dbNotifyCancel(¬ify); + wait_changed.reset(); + wait_pvif.reset(); + epics::atomic::set(notifyBusy, 0); + } +} + void PDBSinglePut::get() { changed->clear(); diff --git a/pdbApp/pdbsingle.h b/pdbApp/pdbsingle.h index 308c5cf..bd097af 100644 --- a/pdbApp/pdbsingle.h +++ b/pdbApp/pdbsingle.h @@ -88,13 +88,14 @@ struct PDBSinglePut : public epics::pvAccess::ChannelPut, PDBSingleChannel::shared_pointer channel; requester_t::weak_pointer requester; - epics::pvData::BitSetPtr changed; + epics::pvData::BitSetPtr changed, wait_changed; epics::pvData::PVStructurePtr pvf; - std::auto_ptr pvif; + std::auto_ptr pvif, wait_pvif; processNotify notify; - bool doProc, doProcForce, doWait; + int notifyBusy; // atomic: 0 - idle, 1 - active, 2 - being cancelled - std::tr1::shared_ptr procself; // make ref. loop while notify is active + // effectively const after ctor + bool doProc, doProcForce, doWait; static size_t num_instances; @@ -107,7 +108,7 @@ struct PDBSinglePut : public epics::pvAccess::ChannelPut, virtual void lock() {} virtual void unlock() {} virtual std::tr1::shared_ptr getChannel() { return channel; } - virtual void cancel() {} + virtual void cancel(); virtual void lastRequest() {} virtual void put( epics::pvData::PVStructure::shared_pointer const & pvPutStructure, diff --git a/pdbApp/pvif.cpp b/pdbApp/pvif.cpp index d569ef6..ec17a5d 100644 --- a/pdbApp/pvif.cpp +++ b/pdbApp/pvif.cpp @@ -16,13 +16,6 @@ #define epicsExportSharedSymbols #include "pvif.h" - -#if defined(PVDATA_VERSION_INT) -#if PVDATA_VERSION_INT > VERSION_INT(7,0,0,0) -# define USE_LOGICAL_AND -#endif -#endif - namespace pvd = epics::pvData; DBCH::DBCH(dbChannel *ch) :chan(ch) @@ -444,17 +437,21 @@ struct PVIFScalarNumeric : public PVIF virtual void get(const epics::pvData::BitSet& mask) OVERRIDE FINAL { -#ifdef USE_LOGICAL_AND if(mask.logical_and(pvmeta.maskVALUE)) getValue(pvmeta); -#else - pvd::BitSet temp(mask); - temp &= pvmeta.maskVALUE; - if(!temp.isEmpty()) - getValue(pvmeta); -#endif } + virtual unsigned dbe(const epics::pvData::BitSet& mask) OVERRIDE FINAL + { + unsigned ret = 0; + if(mask.logical_and(pvmeta.maskVALUE)) + ret |= DBE_VALUE; + if(mask.logical_and(pvmeta.maskALARM)) + ret |= DBE_ALARM; + if(mask.logical_and(pvmeta.maskPROPERTY)) + ret |= DBE_PROPERTY; + return ret; + } }; } // namespace diff --git a/pdbApp/pvif.h b/pdbApp/pvif.h index da4be22..d53648a 100644 --- a/pdbApp/pvif.h +++ b/pdbApp/pvif.h @@ -266,6 +266,8 @@ struct epicsShareClass PVIF { //! Copy from pvalue to PDB record (call dbChannelPut()) //! caller must lock record virtual void get(const epics::pvData::BitSet& mask) =0; + //! Calculate DBE mask from changed bitset + virtual unsigned dbe(const epics::pvData::BitSet& mask) =0; static void Init();