diff --git a/documentation/pvalink-schema-0.json b/documentation/pvalink-schema-0.json new file mode 100644 index 0000000..0f04102 --- /dev/null +++ b/documentation/pvalink-schema-0.json @@ -0,0 +1,36 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://mdavidsaver.github.io/pvxs/pvalink-schema-0.json", + "title": "PVA Link schema", + "type": ["string", "object"], + "properties": { + "pv": { "type": "string" }, + "field": { + "type": "string", + "default": "value" + }, + "Q": { + "type": "integer", + "default": 4 + }, + "proc": { + "type": ["boolean", "string", "null"], + "enum": [true, false, null, "", "NPP", "PP", "CP", "CPP"], + "default": null + }, + "sevr": { + "type": ["boolean", "string"], + "enum": [true, false, "NMS", "MS", "MSI", "MSS"], + "default": "NMS" + }, + "time": { "type": "boolean", "default": false }, + "monorder": { "type": "integer", "default": 0 }, + "defer": { "type": "boolean", "default": false }, + "retry": { "type": "boolean", "default": false }, + "pipeline": { "type": "boolean", "default": false }, + "always": { "type": "boolean", "default": false }, + "atomic": { "type": "boolean", "default": false }, + "local": { "type": "boolean", "default": false } + }, + "additionalProperties": false +} diff --git a/ioc/Makefile b/ioc/Makefile index 5f9f7c8..8366e9c 100644 --- a/ioc/Makefile +++ b/ioc/Makefile @@ -63,6 +63,10 @@ pvxsIoc_SRCS += pvalink_jlif.cpp pvxsIoc_SRCS += pvalink_link.cpp pvxsIoc_SRCS += pvalink_lset.cpp +else + +pvxsIoc_SRCS += dummygroup.cpp + endif # BASE_7_0 pvxsIoc_LIBS += $(EPICS_BASE_IOC_LIBS) @@ -91,4 +95,4 @@ ifdef BASE_7_0 else ../O.Common/pvxsIoc.dbd: ../pvxs3x.dbd $(CP) $< $@ -endif \ No newline at end of file +endif diff --git a/ioc/pvalink.cpp b/ioc/pvalink.cpp index 24eb59f..2578896 100644 --- a/ioc/pvalink.cpp +++ b/ioc/pvalink.cpp @@ -20,13 +20,17 @@ #include #include #include +#include #include #include #include #include +#define PVXS_ENABLE_EXPERT_API + #include +#include "channel.h" #include "pvalink.h" #include "dblocker.h" #include "dbentry.h" @@ -63,10 +67,8 @@ static void shutdownStep2() { Guard G(pvaGlobal->lock); - if(pvaGlobal->channels.size()) { - fprintf(stderr, "pvaLink leaves %zu channels open\n", - pvaGlobal->channels.size()); - } + assert(pvaLink::cnt_pvaLink<=1u); // dbRemoveLink() already called + assert(pvaGlobal->channels.empty()); } delete pvaGlobal; @@ -185,20 +187,85 @@ void testqsrvCleanup(void) } } -void testqsrvWaitForLinkEvent(struct link *plink) +static +std::shared_ptr testGetPVALink(struct link *plink) { - std::shared_ptr lchan; - { - DBLocker lock(plink->precord); + DBLocker lock(plink->precord); - if(plink->type!=JSON_LINK || !plink->value.json.jlink || plink->value.json.jlink->pif!=&lsetPVA) { - testAbort("Not a PVA link"); - } - pvaLink *pval = static_cast(plink->value.json.jlink); - lchan = pval->lchan; + if(plink->type!=JSON_LINK || !plink->value.json.jlink || plink->value.json.jlink->pif!=&lsetPVA) { + testAbort("Not a PVA link"); } - if(lchan) { - lchan->run_done.wait(); + pvaLink *pval = static_cast(plink->value.json.jlink); + if(!pval->lchan) + testAbort("PVA link w/o channel?"); + return pval->lchan; +} + +static +DBLINK* testGetLink(const char *pv) +{ + Channel chan(pv); + switch(dbChannelFieldType(chan)) { + case DBF_INLINK: + case DBF_OUTLINK: + case DBF_FWDLINK: + break; + default: + testAbort("%s : not a link field", pv); + } + return static_cast(dbChannelField(chan)); +} + +void testqsrvWaitForLinkConnected(struct link *plink, bool conn) +{ + if(conn) + pvaGlobal->provider_remote.hurryUp(); + std::shared_ptr lchan(testGetPVALink(plink)); + Guard G(lchan->lock); + while(lchan->connected!=conn) { + testDiag("%s(\"%s\", %c) sleep", __func__, plink->precord->name, conn?'C':'D'); + UnGuard U(G); + if(!lchan->update_evt.wait(10.0)) + testAbort("%s(\"%s\") timeout", __func__, plink->precord->name); + errlogFlush(); + testDiag("%s(\"%s\") wakeup", __func__, plink->precord->name); + } + errlogFlush(); +} + +void testqsrvWaitForLinkConnected(const char* pv, bool conn) +{ + testqsrvWaitForLinkConnected(testGetLink(pv), conn); +} + +QSrvWaitForLinkUpdate::QSrvWaitForLinkUpdate(struct link *plink) + :plink(plink) +{ + std::shared_ptr lchan(testGetPVALink(plink)); + Guard G(lchan->lock); + seq = lchan->update_seq; + testDiag("%s(\"%s\") arm at %u", __func__, plink->precord->name, seq); +} + +QSrvWaitForLinkUpdate::QSrvWaitForLinkUpdate(const char *pv) + :QSrvWaitForLinkUpdate(testGetLink(pv)) +{} + +QSrvWaitForLinkUpdate::~QSrvWaitForLinkUpdate() +{ + std::shared_ptr lchan(testGetPVALink(plink)); + Guard G(lchan->lock); + while(seq == lchan->update_seq) { + testDiag("%s(\"%s\") wait for end of %u", __func__, plink->precord->name, seq); + bool ok; + { + UnGuard U(G); + ok = lchan->update_evt.wait(5.0); + } + if(!ok) + testAbort("%s(\"%s\") timeout at %u", __func__, plink->precord->name, seq); + errlogFlush(); + testDiag("%s(\"%s\") wake at %u", __func__, plink->precord->name, seq); } } @@ -252,7 +319,7 @@ void dbpvar(const char *precordname, int level) } nchans++; - if(chan->state == pvaLinkChannel::Connected) + if(chan->connected) nconn++; if(!precordname) @@ -261,7 +328,7 @@ void dbpvar(const char *precordname, int level) if(level<=0) continue; - if(level>=2 || (chan->state != pvaLinkChannel::Connected && level==1)) { + if(level>=2 || (!chan->connected && level==1)) { if(chan->key.first.size()<=28) { printf("%28s ", chan->key.first.c_str()); } else { @@ -269,16 +336,13 @@ void dbpvar(const char *precordname, int level) } printf("conn=%c %zu disconnects, %zu type changes", - chan->state == pvaLinkChannel::Connected?'T':'F', + chan->connected?'T':'F', chan->num_disconnect, chan->num_type_change); if(chan->op_put) { printf(" Put"); } - if(level>=3) { - printf(", provider '%s'", chan->providerName.c_str()); - } printf("\n"); // level 4 reserved for channel/provider details @@ -345,8 +409,6 @@ void installPVAAddLinkHook() initHookRegister(&initPVALink); IOCShCommand("dbpvar", "dbpvar", "record name", "level") .implementation<&dbpvar>(); -// epics::registerRefCounter("pvaLinkChannel", &pvaLinkChannel::num_instances); -// epics::registerRefCounter("pvaLink", &pvaLink::num_instances); } }} // namespace pvxs::ioc diff --git a/ioc/pvalink.h b/ioc/pvalink.h index d79749e..290fa9d 100644 --- a/ioc/pvalink.h +++ b/ioc/pvalink.h @@ -34,8 +34,18 @@ #include #include +#include "utilpvt.h" #include "dbmanylocker.h" + +#if EPICS_VERSION_INT> queue; @@ -105,6 +119,9 @@ struct pvaGlobal_t : private epicsThreadRunable { // Cache of active Channels (really about caching Monitor) channels_t channels; + // pvRequest used with PUT + const Value putReq; + private: epicsThread worker; bool workerStop = false; @@ -112,37 +129,36 @@ private: public: pvaGlobal_t(); + pvaGlobal_t(const pvaGlobal_t&) = delete; + pvaGlobal_t& operator=(const pvaGlobal_t&) = delete; virtual ~pvaGlobal_t(); void close(); }; extern pvaGlobal_t *pvaGlobal; -struct pvaLinkChannel : public epicsThreadRunable +struct pvaLinkChannel final : public epicsThreadRunable ,public std::enable_shared_from_this { const pvaGlobal_t::channels_key_t key; // tuple of (channelName, pvRequest key) const Value pvRequest; // used with monitor - static size_t num_instances; + INST_COUNTER(pvaLinkChannel); + // locker order: record lock(s) -> channel lock epicsMutex lock; - epicsEvent run_done; // used by testing code + epicsEvent update_evt; // used by testing code -// std::shared_ptr chan; std::shared_ptr op_mon; std::shared_ptr op_put; Value root; - std::string providerName; size_t num_disconnect = 0u, num_type_change = 0u; - enum state_t { - Disconnected, - Connecting, - Connected, - } state = Disconnected; - bool isatomic = false; + bool connected = false; bool debug = false; // set if any jlink::debug is set + + unsigned update_seq = 0u; // used by testing code + typedef std::set after_put_t; after_put_t after_put; @@ -165,27 +181,38 @@ struct pvaLinkChannel : public epicsThreadRunable void open(); void put(bool force=false); // begin Put op. - struct AfterPut : public epicsThreadRunable { + struct AfterPut final : public epicsThreadRunable { std::weak_ptr lc; - virtual ~AfterPut() {} + AfterPut() = default; + AfterPut(const AfterPut&) = delete; + AfterPut& operator=(const AfterPut&) = delete; + virtual ~AfterPut() = default; virtual void run() override final; }; - std::shared_ptr AP; + const std::shared_ptr AP; private: virtual void run() override final; - void run_dbProcess(size_t idx); // idx is index in scan_records // ==== Treat remaining as local to run() - std::vector scan_records; - std::vector scan_check_passive; + struct ScanTrack { + dbCommon *prec = nullptr; + // if true, only scan if prec->scan==0 + bool check_passive = false; + + ScanTrack() = default; + ScanTrack(dbCommon *prec, bool check_passive) :prec(prec), check_passive(check_passive) {} + void scan(); + }; + std::vector nonatomic_records, + atomic_records; ioc::DBManyLock atomic_lock; }; struct pvaLink final : public pvaLinkConfig { - static size_t num_instances; + INST_COUNTER(pvaLink); bool alive = true; // attempt to catch some use after free dbfType type = (dbfType)-1; @@ -200,19 +227,21 @@ struct pvaLink final : public pvaLinkConfig // cached fields from channel op_mon // updated in onTypeChange() - Value fld_value; - Value fld_severity, + Value fld_value, + fld_severity, + fld_message, fld_seconds, - fld_nanoseconds; - Value fld_display, - fld_control, - fld_valueAlarm; + fld_nanoseconds, + fld_usertag, + fld_meta; // cached snapshot of alarm and timestamp // captured in pvaGetValue(). // we choose not to ensure consistency with display/control meta-data epicsTimeStamp snap_time = {}; + epicsUTag snap_tag = 0; short snap_severity = INVALID_ALARM; + std::string snap_message; pvaLink(); virtual ~pvaLink(); @@ -222,11 +251,14 @@ struct pvaLink final : public pvaLinkConfig bool valid() const; - // fetch a sub-sub-field of the top monitored field. - Value getSubField(const char *name); - void onDisconnect(); void onTypeChange(); + enum scanOnUpdate_t { + scanOnUpdateNo = -1, + scanOnUpdatePassive = 0, + scanOnUpdateYes = 1, + }; + scanOnUpdate_t scanOnUpdate() const; }; diff --git a/ioc/pvalink_channel.cpp b/ioc/pvalink_channel.cpp index d86bc8d..00e6941 100644 --- a/ioc/pvalink_channel.cpp +++ b/ioc/pvalink_channel.cpp @@ -9,11 +9,13 @@ #include +#include "utilpvt.h" #include "pvalink.h" #include "dblocker.h" #include "dbmanylocker.h" -DEFINE_LOGGER(_logger, "ioc.pvalink.channel"); +DEFINE_LOGGER(_logger, "pvxs.ioc.link.channel"); +DEFINE_LOGGER(_logupdate, "pvxs.ioc.link.channel.update"); int pvaLinkNWorkers = 1; @@ -26,6 +28,14 @@ pvaGlobal_t *pvaGlobal; pvaGlobal_t::pvaGlobal_t() :queue() ,running(false) + ,putReq(TypeDef(TypeCode::Struct, { + members::Struct("field", {}), + members::Struct("record", { + members::Struct("_options", { + members::Bool("block"), + members::String("process"), + }), + }), }).create()) ,worker(*this, "pvxlink", epicsThreadGetStackSize(epicsThreadStackBig), @@ -66,9 +76,8 @@ void pvaGlobal_t::close() worker.exitWait(); } -size_t pvaLinkChannel::num_instances; -size_t pvaLink::num_instances; - +DEFINE_INST_COUNTER(pvaLinkChannel); +DEFINE_INST_COUNTER(pvaLink); bool pvaLinkChannel::LinkSort::operator()(const pvaLink *L, const pvaLink *R) const { if(L->monorder==R->monorder) @@ -104,11 +113,14 @@ void pvaLinkChannel::open() .rawRequest(pvRequest) .event([this](const client::Subscription&) { - log_debug_printf(_logger, "Received message: %s %s\n", key.first.c_str(), key.second.c_str()); - pvaGlobal->queue.push(shared_from_this()); + log_debug_printf(_logger, "Monitor %s wakeup\n", key.first.c_str()); + try { + pvaGlobal->queue.push(shared_from_this()); + }catch(std::bad_weak_ptr&){ + log_err_printf(_logger, "channel '%s' open during dtor?", key.first.c_str()); + } }) .exec(); - providerName = "remote"; } static @@ -138,7 +150,7 @@ Value linkBuildPut(pvaLinkChannel *self, Value&& prototype) value = tosend; } else { if (tosend.empty()) - continue; // TODO: Signal error + continue; // TODO: can't write empty array to scalar field Signal error if (value.type() == TypeCode::Struct && value.id() == "enum_t") { value = value["index"]; // We want to assign to the index for enum types @@ -179,6 +191,7 @@ void linkPutDone(pvaLinkChannel *self, client::Result&& result) ok = true; }catch(std::exception& e){ errlogPrintf("%s PVA link put ERROR: %s\n", self->key.first.c_str(), e.what()); + // TODO: signal INVALID_ALARM ? } bool needscans; @@ -206,24 +219,13 @@ void linkPutDone(pvaLinkChannel *self, client::Result&& result) // call with channel lock held void pvaLinkChannel::put(bool force) { - // TODO cache TypeDef in global - using namespace pvxs::members; - auto pvReq(TypeDef(TypeCode::Struct, { - Struct("field", {}), - Struct("record", { - Struct("_options", { - Bool("block"), - String("process"), - }), - }), }).create() + auto pvReq(pvaGlobal->putReq.cloneEmpty() .update("record._options.block", !after_put.empty())); unsigned reqProcess = 0; bool doit = force; - for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it) + for(auto& link : links) { - pvaLink *link = *it; - if(!link->used_scratch) continue; link->put_queue = std::move(link->put_scratch); @@ -264,6 +266,7 @@ void pvaLinkChannel::put(bool force) if(doit) { // start net Put, cancels in-progress put op_put = pvaGlobal->provider_remote.put(key.first) + .rawRequest(pvReq) .build([this](Value&& prototype) -> Value { return linkBuildPut(this, std::move(prototype)); // TODO @@ -306,160 +309,131 @@ void pvaLinkChannel::AfterPut::run() } -// the work in calling dbProcess() which is common to -// both dbScanLock() and dbScanLockMany() -void pvaLinkChannel::run_dbProcess(size_t idx) +// caller has locked record +void pvaLinkChannel::ScanTrack::scan() { - dbCommon *precord = scan_records[idx]; + if(check_passive && prec->scan!=0) { - if(scan_check_passive[idx] && precord->scan!=0) { - return; - - // TODO: This relates to caching of the individual links and comparing it to - // the posted monitor. This is, as I understand it, an optimisation and - // we can sort of ignore it for now. - //} else if(state_latched == Connected && !op_mon.changed.logical_and(scan_changed[idx])) { - // return; - - } else if (precord->pact) { - if (precord->tpro) - printf("%s: Active %s\n", - epicsThreadGetNameSelf(), precord->name); - precord->rpro = TRUE; + } else if (prec->pact) { + if (prec->tpro) + printf("%s: Active %s\n", epicsThreadGetNameSelf(), prec->name); + prec->rpro = TRUE; + } else { + (void)dbProcess(prec); } - dbProcess(precord); } // Running from global WorkQueue thread void pvaLinkChannel::run() { - bool requeue = false; { Guard G(lock); - log_debug_printf(_logger,"Running task %s\n", this->key.first.c_str()); + log_debug_printf(_logger,"Monitor %s work\n", this->key.first.c_str()); Value top; try { top = op_mon->pop(); if(!top) { - log_debug_printf(_logger, "Queue empty %s\n", this->key.first.c_str()); - run_done.signal(); + log_debug_printf(_logger, "Monitor %s empty\n", this->key.first.c_str()); return; } - state = Connected; - } catch(client::Disconnect&) { - log_debug_printf(_logger, "PVA link %s received disonnection event\n", this->key.first.c_str()); + if(!connected) { + // (re)connect implies type change + log_debug_printf(_logger, "Monitor %s reconnect\n", this->key.first.c_str()); + + root = top; // re-create cache + connected = true; + num_type_change++; + + for(auto link : links) { + link->onTypeChange(); + } + + } else { // update cache + root.assign(top); + } + log_debug_printf(_logupdate, "Monitor %s value %s\n", this->key.first.c_str(), + std::string(SB()<key.first.c_str()); - state = Disconnected; + connected = false; num_disconnect++; // cancel pending put operations op_put.reset(); - for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it) - { - pvaLink *link = *it; + for(auto link : links) { link->onDisconnect(); + link->snap_time = e.time; } // Don't clear previous_root on disconnect. - // We will usually re-connect with the same type, - // and may get back the same PVStructure. + // while disconnected, we will provide the most recent value w/ LINK_ALARM } catch(std::exception& e) { - errlogPrintf("pvalinkChannel::run: Unexpected exception while reading from monitor queue: %s\n", e.what()); + log_exc_printf(_logger, "pvalinkChannel::run: Unexpected exception: %s\n", e.what()); } - if (state == Connected) { - // Fetch the data from the incoming monitor - if (root.equalType(top)) - { - log_debug_printf(_logger, "pvalinkChannel update value %s\n", this->key.first.c_str()); - - root.assign(top); - } - else - { - log_debug_printf(_logger, "pvalinkChannel %s update type\n", this->key.first.c_str()); - root = top; - num_type_change++; - - for (links_t::iterator it(links.begin()), end(links.end()); it != end; ++it) - { - pvaLink *link = *it; - link->onTypeChange(); - } - } - - requeue = true; - } - if(links_changed) { // a link has been added or removed since the last update. // rebuild our cached list of records to (maybe) process. - scan_records.clear(); - scan_check_passive.clear(); + decltype(atomic_records) atomic, nonatomic; + std::vector atomicrecs; - for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it) - { - pvaLink *link = *it; + for(auto link : links) { assert(link && link->alive); - if(!link->plink) continue; - - // only scan on monitor update for input links - if(link->type!=DBF_INLINK) + auto sou(link->scanOnUpdate()); + if(sou==pvaLink::scanOnUpdateNo) continue; - // NPP and none/Default don't scan - // PP, CP, and CPP do scan - // PP and CPP only if SCAN=Passive - if(link->proc != pvaLink::PP && link->proc != pvaLink::CPP && link->proc != pvaLink::CP) - continue; + bool check_passive = sou==pvaLink::scanOnUpdatePassive; - scan_records.push_back(link->plink->precord); - scan_check_passive.push_back(link->proc != pvaLink::CP); + if(link->atomic) { + atomicrecs.push_back(link->plink->precord); + atomic.emplace_back(link->plink->precord, check_passive); + } else { + nonatomic.emplace_back(link->plink->precord, check_passive); + } } - log_debug_printf(_logger, "Links changed, scan_records size = %lu\n", scan_records.size()); + log_debug_printf(_logger, "Links changed, %zu with %zu atomic, %zu nonatomic\n", + links.size(), atomic.size(), nonatomic.size()); - atomic_lock = ioc::DBManyLock(scan_records); + atomic_lock = ioc::DBManyLock(atomicrecs); + atomic_records = std::move(atomic); + nonatomic_records = std::move(nonatomic); links_changed = false; } + + update_seq++; + update_evt.signal(); + log_debug_printf(_logger, "%s Sequence point %u\n", key.first.c_str(), update_seq); } + // unlock link - if(scan_records.empty()) { - // Nothing to do, so don't bother locking - - } else if(isatomic && scan_records.size() > 1u) { + if(!atomic_records.empty()) { ioc::DBManyLocker L(atomic_lock); - - for(size_t i=0, N=scan_records.size(); iname); - - ioc::DBLocker L(scan_records[i]); - run_dbProcess(i); + for(auto& trac : atomic_records) { + trac.scan(); } } - if(requeue) { - log_debug_printf(_logger, "Requeueing %s\n", key.first.c_str()); - // re-queue until monitor queue is empty - pvaGlobal->queue.push(shared_from_this()); - } else { - log_debug_printf(_logger, "Run done instead of requeue %s\n", key.first.c_str()); - run_done.signal(); + for(auto& trac : nonatomic_records) { + ioc::DBLocker L(trac.prec); + trac.scan(); } + + log_debug_printf(_logger, "Requeueing %s\n", key.first.c_str()); + // re-queue until monitor queue is empty + pvaGlobal->queue.push(shared_from_this()); } } // namespace pvalink diff --git a/ioc/pvalink_jlif.cpp b/ioc/pvalink_jlif.cpp index 9090349..bd07adb 100644 --- a/ioc/pvalink_jlif.cpp +++ b/ioc/pvalink_jlif.cpp @@ -6,10 +6,9 @@ #include -#include // redirects stdout/stderr - #include "pvalink.h" +#include // redirects stdout/stderr #include namespace pvxlink { @@ -105,6 +104,8 @@ jlif_result pva_parse_bool(jlink *pjlink, int val) pvt->local = !!val; } else if(pvt->jkey == "always") { pvt->always = !!val; + } else if(pvt->jkey == "atomic") { + pvt->atomic = !!val; } else if(pvt->debug) { printf("pva link parsing unknown integer depth=%u key=\"%s\" value=%s\n", pvt->parseDepth, pvt->jkey.c_str(), val ? "true" : "false"); @@ -241,12 +242,13 @@ void pva_report(const jlink *rpjlink, int lvl, int indent) case pvaLinkConfig::MSI: printf(" MSI"); break; } if(lvl>0) { - printf(" Q=%u pipe=%c defer=%c time=%c retry=%c morder=%d", + printf(" Q=%u pipe=%c defer=%c time=%c retry=%c atomic=%c morder=%d", unsigned(pval->queueSize), pval->pipeline ? 'T' : 'F', pval->defer ? 'T' : 'F', pval->time ? 'T' : 'F', pval->retry ? 'T' : 'F', + pval->atomic ? 'T' : 'F', pval->monorder); } @@ -254,13 +256,13 @@ void pva_report(const jlink *rpjlink, int lvl, int indent) // after open() Guard G(pval->lchan->lock); - printf(" conn=%c", pval->lchan->state == pvaLinkChannel::Connected ? 'T' : 'F'); + printf(" conn=%c", pval->lchan->connected ? 'T' : 'F'); if(pval->lchan->op_put) { printf(" Put"); } if(lvl>0) { - printf(" #disconn=%zu prov=%s", pval->lchan->num_disconnect, pval->lchan->providerName.c_str()); + printf(" #disconn=%zu", pval->lchan->num_disconnect); } // if(lvl>5) { // std::ostringstream strm; diff --git a/ioc/pvalink_link.cpp b/ioc/pvalink_link.cpp index 8171976..c1d2346 100644 --- a/ioc/pvalink_link.cpp +++ b/ioc/pvalink_link.cpp @@ -12,7 +12,7 @@ #include "pvalink.h" -DEFINE_LOGGER(_logger, "ioc.pvalink.link"); +DEFINE_LOGGER(_logger, "pvxs.ioc.link.link"); namespace pvxlink { @@ -34,10 +34,7 @@ pvaLink::~pvaLink() lchan->links_changed = true; bool new_debug = false; - for(pvaLinkChannel::links_t::const_iterator it(lchan->links.begin()), end(lchan->links.end()) - ; it!=end; ++it) - { - const pvaLink *pval = *it; + for(auto pval : lchan->links) { if(pval->debug) { new_debug = true; break; @@ -70,35 +67,7 @@ Value pvaLink::makeRequest() // caller must lock lchan->lock bool pvaLink::valid() const { - return lchan->state == pvaLinkChannel::Connected && lchan->root; -} - -// caller must lock lchan->lock -Value pvaLink::getSubField(const char *name) -{ - Value ret; - if(valid()) { - if(fieldName.empty()) { - // we access the top level struct - ret = lchan->root[name]; - - } else { - // we access a sub-struct - ret = lchan->root[fieldName]; - if(!ret) { - // noop - } else if(ret.type()!=TypeCode::Struct) { - // addressed sub-field isn't a sub-structure - if(strcmp(name, "value")!=0) { - // unless we are trying to fetch the "value", we fail here - ret = Value(); - } - } else { - ret = ret[name]; - } - } - } - return ret; + return lchan->connected && lchan->root; } // call with channel lock held @@ -112,38 +81,54 @@ void pvaLink::onDisconnect() void pvaLink::onTypeChange() { - log_debug_printf(_logger, "%s type change\n", plink->precord->name); + assert(lchan->connected && lchan->root); // we should only be called when connected - assert(lchan->state == pvaLinkChannel::Connected && lchan->root); // we should only be called when connected + fld_value = fld_severity = fld_nanoseconds = fld_usertag + = fld_message = fld_severity = fld_meta = Value(); - fld_value = getSubField("value"); - fld_seconds = getSubField("timeStamp.secondsPastEpoch"); - fld_nanoseconds = getSubField("timeStamp.nanoseconds"); - fld_severity = getSubField("alarm.severity"); - fld_display = getSubField("display"); - fld_control = getSubField("control"); - fld_valueAlarm = getSubField("valueAlarm"); + Value root; + if(fieldName.empty()) { + root = lchan->root; + } else { + root = lchan->root[fieldName]; + } + if(!root) { + log_warn_printf(_logger, "%s has no %s\n", lchan->key.first.c_str(), fieldName.c_str()); - // build mask of all "changed" bits associated with our .value - // CP/CPP input links will process this link only for updates where - // the changed mask and proc_changed share at least one set bit. -// if(fld_value) { -// // bit for this field -// proc_changed.set(fld_value->getFieldOffset()); + } else if(root.type()!=TypeCode::Struct) { + log_debug_printf(_logger, "%s has no meta\n", lchan->key.first.c_str()); + fld_value = root; -// // bits of all parent fields -// for(const pvd::PVStructure* parent = fld_value->getParent(); parent; parent = parent->getParent()) { -// proc_changed.set(parent->getFieldOffset()); -// } + } else { + fld_value = root["value"]; + fld_seconds = root["timeStamp.secondsPastEpoch"]; + fld_nanoseconds = root["timeStamp.nanoseconds"]; + fld_usertag = root["timeStamp.userTag"]; + fld_severity = root["alarm.severity"]; + fld_message = root["alarm.message"]; + fld_meta = std::move(root); + } -// if(fld_value->getField()->getType()==pvd::structure) -// { -// // bits of all child fields -// const pvd::PVStructure *val = static_cast(fld_value.get()); -// for(size_t i=val->getFieldOffset(), N=val->getNextFieldOffset(); iprecord->name, + fld_value ? 'Y' : 'N', + fld_seconds ? 'Y' : 'N', + fld_nanoseconds ? 'Y' : 'N', + fld_severity ? 'Y' : 'N', + fld_meta ? 'Y' : 'N'); +} + +pvaLink::scanOnUpdate_t pvaLink::scanOnUpdate() const +{ + if(!plink) + return scanOnUpdateNo; + if(type!=DBF_INLINK) + return scanOnUpdateNo; + if(proc == pvaLink::CP) + return scanOnUpdateYes; + if(proc == pvaLink::CPP) + return scanOnUpdatePassive; + return scanOnUpdateNo; } } // namespace pvalink diff --git a/ioc/pvalink_lset.cpp b/ioc/pvalink_lset.cpp index d471375..b8f54a2 100644 --- a/ioc/pvalink_lset.cpp +++ b/ioc/pvalink_lset.cpp @@ -15,7 +15,7 @@ #include // redirect stdout/stderr; include after libevent/util.h -DEFINE_LOGGER(_logger, "pvxs.pvalink.lset"); +DEFINE_LOGGER(_logger, "pvxs.ioc.link.lset"); namespace pvxlink { namespace { @@ -45,6 +45,14 @@ void pvaOpenLink(DBLINK *plink) pvaLink* self((pvaLink*)plink->value.json.jlink); self->type = getLinkType(plink); + if(self->local && dbChannelTest(self->channelName.c_str())!=0) { + // TODO: only print duing iocInit()? + fprintf(stderr, "%s Error: local:true link to '%s' can't be fulfilled\n", + plink->precord->name, self->channelName.c_str()); + plink->lset = NULL; + return; + } + // workaround for Base not propagating info(base:lsetDebug to us { ioc::DBEntry rec(plink->precord); @@ -54,7 +62,9 @@ void pvaOpenLink(DBLINK *plink) } } - log_debug_printf(_logger, "%s OPEN %s\n", plink->precord->name, self->channelName.c_str()); + log_debug_printf(_logger, "%s OPEN %s sevr=%d\n", + plink->precord->name, self->channelName.c_str(), + self->sevr); // still single threaded at this point. // also, no pvaLinkChannel::lock yet @@ -82,10 +92,17 @@ void pvaOpenLink(DBLINK *plink) if(!chan) { // open new channel + log_debug_printf(_logger, "%s CREATE %s\n", + plink->precord->name, self->channelName.c_str()); + chan.reset(new pvaLinkChannel(key, pvRequest)); chan->AP->lc = chan; pvaGlobal->channels.insert(std::make_pair(key, chan)); doOpen = true; + + } else { + log_debug_printf(_logger, "%s REUSE %s\n", + plink->precord->name, self->channelName.c_str()); } doOpen &= pvaGlobal->running; // if not running, then open from initHook @@ -95,20 +112,36 @@ void pvaOpenLink(DBLINK *plink) chan->open(); // start subscription } - if(!self->local || chan->providerName=="QSRV"){ + bool scanInit = false; + { Guard G(chan->lock); chan->links.insert(self); chan->links_changed = true; - self->lchan.swap(chan); // we are now attached + self->lchan = std::move(chan); // we are now attached self->lchan->debug |= !!self->debug; - } else { - // TODO: only print duing iocInit()? - fprintf(stderr, "%s Error: local:true link to '%s' can't be fulfilled\n", - plink->precord->name, self->channelName.c_str()); - plink->lset = NULL; + + if(self->lchan->connected) { + self->onTypeChange(); + auto sou(self->scanOnUpdate()); + switch(sou) { + case pvaLink::scanOnUpdateNo: + break; + case pvaLink::scanOnUpdatePassive: + // record is locked + scanInit = plink->precord->scan==menuScanPassive; + break; + case pvaLink::scanOnUpdateYes: + scanInit = true; + break; + } + } + } + if(scanInit) { + // TODO: initial scan on linkGlobal worker? + scanOnce(plink->precord); } return; @@ -119,6 +152,7 @@ void pvaOpenLink(DBLINK *plink) void pvaRemoveLink(struct dbLocker *locker, DBLINK *plink) { + (void)locker; try { std::unique_ptr self((pvaLink*)plink->value.json.jlink); log_debug_printf(_logger, "%s: %s %s\n", __func__, plink->precord->name, self->channelName.c_str()); @@ -151,8 +185,8 @@ int pvaGetDBFtype(const DBLINK *plink) // if sub-field is struct, use sub-struct .value // if sub-field not struct, treat as value - auto value(self->getSubField("value")); - auto vtype(value.type()); + auto& value(self->fld_value); + auto vtype(self->fld_value.type()); if(vtype.isarray()) vtype = vtype.scalarOf(); @@ -208,15 +242,11 @@ long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, if(!self->valid()) { // disconnected - if(self->sevr != pvaLink::NMS) { - recGblSetSevr(plink->precord, LINK_ALARM, self->snap_severity); - } - // TODO: better capture of disconnect time - epicsTimeGetCurrent(&self->snap_time); + (void)recGblSetSevr(plink->precord, LINK_ALARM, INVALID_ALARM); if(self->time) { plink->precord->time = self->snap_time; } - log_debug_printf(_logger, "%s: %s not valid", __func__, self->channelName.c_str()); + log_debug_printf(_logger, "%s: %s not valid\n", __func__, self->channelName.c_str()); return -1; } @@ -228,7 +258,7 @@ long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, if(nReq <= 0 || !value) { if(!pnRequest) { - // TODO: fill in dummy scalar + memset(pbuffer, 0, dbValueSize(dbrType)); nReq = 1; } @@ -238,19 +268,15 @@ long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, if(size_t(nReq) > arr.size()) nReq = arr.size(); - if(arr.original_type()==ArrayType::String) { - auto sarr(arr.castTo()); + if(dbrType==DBR_STRING) { + auto sarr(arr.castTo()); // may copy+convert - if(dbrType==DBR_STRING) { - auto cbuf(reinterpret_cast(pbuffer)); - for(size_t i : range(size_t(nReq))) { - strncpy(cbuf + i*MAX_STRING_SIZE, - sarr[i].c_str(), - MAX_STRING_SIZE-1u); - cbuf[i*MAX_STRING_SIZE + MAX_STRING_SIZE-1] = '\0'; - } - } else { - return S_db_badDbrtype; // TODO: allow implicit parse? + auto cbuf(reinterpret_cast(pbuffer)); + for(size_t i : range(size_t(nReq))) { + strncpy(cbuf + i*MAX_STRING_SIZE, + sarr[i].c_str(), + MAX_STRING_SIZE-1u); + cbuf[i*MAX_STRING_SIZE + MAX_STRING_SIZE-1] = '\0'; } } else { @@ -267,6 +293,8 @@ long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, case DBR_FLOAT: dtype = ArrayType::Float32; break; case DBR_DOUBLE: dtype = ArrayType::Float64; break; default: + log_debug_printf(_logger, "%s: %s unsupported array conversion\n", + __func__, plink->precord->name); return S_db_badDbrtype; } @@ -305,6 +333,8 @@ long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, break; } default: + log_debug_printf(_logger, "%s: %s unsupported enum conversion\n", + __func__, plink->precord->name); return S_db_badDbrtype; } @@ -328,6 +358,8 @@ long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, break; } default: + log_debug_printf(_logger, "%s: %s unsupported scalar conversion\n", + __func__, plink->precord->name); return S_db_badDbrtype; } } @@ -355,9 +387,17 @@ long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, self->snap_severity = NO_ALARM; } + if(self->fld_message && self->snap_severity!=0) { + self->snap_message = self->fld_message.as(); + } else { + self->snap_message.clear(); + } + if((self->snap_severity!=NO_ALARM && self->sevr == pvaLink::MS) || (self->snap_severity==INVALID_ALARM && self->sevr == pvaLink::MSI)) { + log_debug_printf(_logger, "%s: %s recGblSetSevr %d\n", __func__, plink->precord->name, + self->snap_severity); recGblSetSevr(plink->precord, LINK_ALARM, self->snap_severity); } @@ -365,7 +405,8 @@ long pvaGetValue(DBLINK *plink, short dbrType, void *pbuffer, plink->precord->time = self->snap_time; } - log_debug_printf(_logger, "%s: %s %s OK\n", __func__, plink->precord->name, self->channelName.c_str()); + log_debug_printf(_logger, "%s: %s %s snapsevr=%d OK\n", __func__, plink->precord->name, + self->channelName.c_str(), self->snap_severity); return 0; }CATCH() return -1; @@ -377,19 +418,11 @@ long pvaGetControlLimits(const DBLINK *plink, double *lo, double *hi) Guard G(self->lchan->lock); CHECK_VALID(); - if(self->fld_control) { - Value value; - if(lo) { - if(!self->fld_control["limitLow"].as(*lo)) - *lo = 0.0; - } - if(hi) { - if(!self->fld_control["limitHigh"].as(*hi)) - *hi = 0.0; - } - } else { - *lo = *hi = 0.0; - } + if(lo) + (void)self->fld_meta["control.limitLow"].as(*lo); + if(hi) + (void)self->fld_meta["control.limitHigh"].as(*hi); + log_debug_printf(_logger, "%s: %s %s %f %f\n", __func__, plink->precord->name, self->channelName.c_str(), lo ? *lo : 0, hi ? *hi : 0); return 0; @@ -403,19 +436,11 @@ long pvaGetGraphicLimits(const DBLINK *plink, double *lo, double *hi) Guard G(self->lchan->lock); CHECK_VALID(); - if(self->fld_display) { - Value value; - if(lo) { - if(!self->fld_display["limitLow"].as(*lo)) - *lo = 0.0; - } - if(hi) { - if(!self->fld_display["limitHigh"].as(*hi)) - *hi = 0.0; - } - } else { - *lo = *hi = 0.0; - } + if(lo) + (void)self->fld_meta["display.limitLow"].as(*lo); + if(hi) + (void)self->fld_meta["display.limitHigh"].as(*hi); + log_debug_printf(_logger, "%s: %s %s %f %f\n", __func__, plink->precord->name, self->channelName.c_str(), lo ? *lo : 0, hi ? *hi : 0); return 0; @@ -427,9 +452,19 @@ long pvaGetAlarmLimits(const DBLINK *plink, double *lolo, double *lo, double *hi, double *hihi) { TRY { - //Guard G(self->lchan->lock); - //CHECK_VALID(); - *lolo = *lo = *hi = *hihi = 0.0; + Guard G(self->lchan->lock); + CHECK_VALID(); + + if(lolo) + (void)self->fld_meta["valueAlarm.lowAlarmLimit"].as(*lolo); + if(lo) + (void)self->fld_meta["valueAlarm.lowWarningLimit"].as(*lo); + if(hi) + (void)self->fld_meta["valueAlarm.highWarningLimit"].as(*hi); + if(hihi) + (void)self->fld_meta["valueAlarm.highAlarmLimit"].as(*hihi); + + log_debug_printf(_logger, "%s: %s %s %f %f %f %f\n", __func__, plink->precord->name, self->channelName.c_str(), lo ? *lo : 0, lolo ? *lolo : 0, hi ? *hi : 0, hihi ? *hihi : 0); @@ -441,12 +476,15 @@ long pvaGetAlarmLimits(const DBLINK *plink, double *lolo, double *lo, long pvaGetPrecision(const DBLINK *plink, short *precision) { TRY { - //Guard G(self->lchan->lock); - //CHECK_VALID(); + Guard G(self->lchan->lock); + CHECK_VALID(); - // No sane way to recover precision from display.format string. - *precision = 0; - log_debug_printf(_logger, "%s: %s %s %i\n", __func__, plink->precord->name, self->channelName.c_str(), *precision); + uint16_t prec = 0; + (void)self->fld_meta["display.precision"].as(prec); + if(precision) + *precision = prec; + + log_debug_printf(_logger, "%s: %s %s %i\n", __func__, plink->precord->name, self->channelName.c_str(), prec); return 0; }CATCH() return -1; @@ -458,24 +496,23 @@ long pvaGetUnits(const DBLINK *plink, char *units, int unitsSize) Guard G(self->lchan->lock); CHECK_VALID(); - if(unitsSize==0) return 0; + if(!units || unitsSize==0) return 0; + std::string egu; - if(units && self->fld_display.as(egu)) { - strncpy(units, egu.c_str(), unitsSize-1u); - units[unitsSize-1u] = '\0'; - } else if(units) { - units[0] = '\0'; - } + (void)self->fld_meta["display.units"].as(egu); + strncpy(units, egu.c_str(), unitsSize-1); units[unitsSize-1] = '\0'; + log_debug_printf(_logger, "%s: %s %s %s\n", __func__, plink->precord->name, self->channelName.c_str(), units); return 0; }CATCH() return -1; } -long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status, - epicsEnum16 *severity) +long pvaGetAlarmMsg(const DBLINK *plink, + epicsEnum16 *status, epicsEnum16 *severity, + char *msgbuf, size_t msgbuflen) { TRY { Guard G(self->lchan->lock); @@ -487,6 +524,14 @@ long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status, if(status) { *status = self->snap_severity ? LINK_ALARM : NO_ALARM; } + if(msgbuf && msgbuflen) { + if(self->snap_message.empty()) { + msgbuf[0] = '\0'; + } else { + epicsSnprintf(msgbuf, msgbuflen-1u, "%s", self->snap_message.c_str()); + msgbuf[msgbuflen-1u] = '\0'; + } + } log_debug_printf(_logger, "%s: %s %s %i %i\n", __func__, plink->precord->name, self->channelName.c_str(), severity ? *severity : 0, status ? *status : 0); return 0; @@ -494,7 +539,13 @@ long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status, return -1; } -long pvaGetTimeStamp(const DBLINK *plink, epicsTimeStamp *pstamp) +long pvaGetAlarm(const DBLINK *plink, epicsEnum16 *status, + epicsEnum16 *severity) +{ + return pvaGetAlarmMsg(plink, status, severity, nullptr, 0); +} + +long pvaGetTimeStampTag(const DBLINK *plink, epicsTimeStamp *pstamp, epicsUTag *ptag) { TRY { Guard G(self->lchan->lock); @@ -503,12 +554,20 @@ long pvaGetTimeStamp(const DBLINK *plink, epicsTimeStamp *pstamp) if(pstamp) { *pstamp = self->snap_time; } + if(ptag) { + *ptag = self->snap_tag; + } log_debug_printf(_logger, "%s: %s %s %i %i\n", __func__, plink->precord->name, self->channelName.c_str(), pstamp ? pstamp->secPastEpoch : 0, pstamp ? pstamp->nsec : 0); return 0; }CATCH() return -1; } +long pvaGetTimeStamp(const DBLINK *plink, epicsTimeStamp *pstamp) +{ + return pvaGetTimeStampTag(plink, pstamp, nullptr); +} + long pvaPutValueX(DBLINK *plink, short dbrType, const void *pbuffer, long nRequest, bool wait) { @@ -559,10 +618,8 @@ long pvaPutValueX(DBLINK *plink, short dbrType, self->used_scratch = true; -#ifdef USE_MULTILOCK if(wait) self->lchan->after_put.insert(plink->precord); -#endif if(!self->defer) self->lchan->put(); @@ -591,6 +648,7 @@ void pvaScanForward(DBLINK *plink) Guard G(self->lchan->lock); if(!self->retry && !self->valid()) { + (void)recGblSetSevrMsg(plink->precord, LINK_ALARM, INVALID_ALARM, "Disconn"); return; } @@ -602,6 +660,17 @@ void pvaScanForward(DBLINK *plink) }CATCH() } +#if EPICS_VERSION_INT>=VERSION_INT(3,16,1,0) +long pvaDoLocked(struct link *plink, dbLinkUserCallback rtn, void *priv) +{ + TRY { + Guard G(self->lchan->lock); + return (*rtn)(plink, priv); + }CATCH() + return 1; +} +#endif // >= 3.16.1 + #undef TRY #undef CATCH @@ -625,8 +694,14 @@ lset pva_lset = { &pvaGetTimeStamp, &pvaPutValue, &pvaPutValueAsync, - &pvaScanForward - //&pvaReportLink, + &pvaScanForward, +#if EPICS_VERSION_INT>=VERSION_INT(3,16,1,0) + &pvaDoLocked, +#endif +#if EPICS_VERSION_INT>=VERSION_INT(7,0,6,0) + &pvaGetAlarmMsg, + &pvaGetTimeStampTag, +#endif }; } // namespace pvxlink diff --git a/ioc/pvxs/iochooks.h b/ioc/pvxs/iochooks.h index 6d8e8c3..65b33cc 100644 --- a/ioc/pvxs/iochooks.h +++ b/ioc/pvxs/iochooks.h @@ -100,14 +100,27 @@ void testPrepare(); PVXS_IOC_API void testShutdown(); +#ifdef PVXS_EXPERT_API_ENABLED PVXS_IOC_API -void testqsrvWaitForLinkEvent(struct link *plink); +void testqsrvWaitForLinkConnected(struct link *plink, bool conn=true); +PVXS_IOC_API +void testqsrvWaitForLinkConnected(const char* pv, bool conn=true); + +class PVXS_IOC_API QSrvWaitForLinkUpdate final { + struct link * const plink; + unsigned seq; +public: + QSrvWaitForLinkUpdate(struct link *plink); + QSrvWaitForLinkUpdate(const char* pv); + ~QSrvWaitForLinkUpdate(); +}; PVXS_IOC_API void testqsrvShutdownOk(void); PVXS_IOC_API void testqsrvCleanup(void); +#endif // PVXS_EXPERT_API_ENABLED }} // namespace pvxs::ioc #endif // PVXS_IOCHOOKS_H diff --git a/ioc/singlesource.cpp b/ioc/singlesource.cpp index 66bc0e1..1fd292c 100644 --- a/ioc/singlesource.cpp +++ b/ioc/singlesource.cpp @@ -102,6 +102,33 @@ void onSubscribe(const std::shared_ptr& subscriptio const DBEventContext& eventContext, std::unique_ptr&& subscriptionOperation) { + auto pvReq(subscriptionOperation->pvRequest()); + unsigned dbe = 0; + if(auto fld = pvReq["record._options.DBE"].ifMarked()) { + switch(fld.type().kind()) { + case Kind::String: { + auto mask(fld.as()); + // name and sloppy parsing a la. caProvider... +#define CASE(EVENT) if(mask.find(#EVENT)!=mask.npos) dbe |= DBE_ ## EVENT + CASE(VALUE); + CASE(ARCHIVE); + CASE(ALARM); +// CASE(PROPERTY); // handled as special case +#undef CASE + break; + } + case Kind::Integer: + case Kind::Real: + dbe = fld.as(); + break; + default: + break; + } + } + dbe &= (DBE_VALUE | DBE_ARCHIVE | DBE_ALARM); + if(!dbe) + dbe = DBE_VALUE | DBE_ALARM; + // inform peer of data type and acquire control of the subscription queue subscriptionContext->subscriptionControl = subscriptionOperation->connect(subscriptionContext->currentValue); @@ -115,7 +142,7 @@ void onSubscribe(const std::shared_ptr& subscriptio subscriptionContext->info->chan, subscriptionValueCallback, subscriptionContext.get(), - DBE_VALUE | DBE_ALARM | DBE_ARCHIVE + dbe ); // second subscription is for Property changes subscriptionContext->pPropertiesEventSubscription.subscribe(eventContext.get(), diff --git a/ioc/subscriptionctx.h b/ioc/subscriptionctx.h index db99c5e..2b3f324 100644 --- a/ioc/subscriptionctx.h +++ b/ioc/subscriptionctx.h @@ -39,7 +39,8 @@ public: user_sub, user_arg, select), [chan](dbEventSubscription sub) mutable { - db_cancel_event(sub); + if(sub) + db_cancel_event(sub); chan = Channel(); // dbChannel* must outlive subscription }); if(!sub) diff --git a/setup.py b/setup.py index ec3b9d1..d3a4135 100755 --- a/setup.py +++ b/setup.py @@ -593,6 +593,11 @@ def define_DSOS(self): "ioc/singlesourcehooks.cpp", "ioc/singlesrcsubscriptionctx.cpp", "ioc/typeutils.cpp", + "ioc/pvalink_channel.cpp", + "ioc/pvalink.cpp", + "ioc/pvalink_jlif.cpp", + "ioc/pvalink_link.cpp", + "ioc/pvalink_lset.cpp", ] probe = ProbeToolchain() diff --git a/test/Makefile b/test/Makefile index 4771446..9cd3cb3 100644 --- a/test/Makefile +++ b/test/Makefile @@ -126,6 +126,7 @@ testpvalink_SRCS += testpvalink.cpp testpvalink_SRCS += testioc_registerRecordDeviceDriver.cpp testpvalink_LIBS += pvxsIoc pvxs $(EPICS_BASE_IOC_LIBS) TESTS += testpvalink +TESTFILES += ../testpvalink.db endif diff --git a/test/testpvalink.cpp b/test/testpvalink.cpp index d66bccc..bffeb95 100644 --- a/test/testpvalink.cpp +++ b/test/testpvalink.cpp @@ -1,14 +1,31 @@ +/* + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvxs is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ #include +#include +#include +#include +#include #include +#include +#include +#include #include #include #include +#define PVXS_ENABLE_EXPERT_API + //#include //#include "utilities.h" #include "dblocker.h" +#include #include "pvxs/iochooks.h" +#include +#include #include "pvalink.h" #include "testioc.h" //#include "pv/qsrv.h" @@ -16,16 +33,24 @@ using namespace pvxs::ioc; using namespace pvxs; -namespace -{ +namespace { + struct TestMonitor { + testMonitor * const mon; + TestMonitor(const char* pvname, unsigned dbe_mask, unsigned opt=0) + :mon(testMonitorCreate(pvname, dbe_mask, opt)) + {} + ~TestMonitor() { testMonitorDestroy(mon); } + void wait() { testMonitorWait(mon); } + unsigned count(bool reset=true) { return testMonitorCount(mon, reset); } + }; + void testGet() { testDiag("==== testGet ===="); longinRecord *i1 = (longinRecord *)testdbRecordPtr("src:i1"); - while (!dbIsLinkConnected(&i1->inp)) - testqsrvWaitForLinkEvent(&i1->inp); + testqsrvWaitForLinkConnected(&i1->inp); testdbGetFieldEqual("target:i.VAL", DBF_LONG, 42L); @@ -39,8 +64,7 @@ namespace testdbPutFieldOk("src:i1.INP", DBF_STRING, "{\"pva\":\"target:ai\"}"); - while (!dbIsLinkConnected(&i1->inp)) - testqsrvWaitForLinkEvent(&i1->inp); + testqsrvWaitForLinkConnected(&i1->inp); testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 42L); // changing link doesn't automatically process @@ -58,8 +82,7 @@ namespace std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"field\":\"display.precision\"}}"; testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length()+1, pv_name.c_str()); - while (!dbIsLinkConnected(&i1->inp)) - testqsrvWaitForLinkEvent(&i1->inp); + testqsrvWaitForLinkConnected(&i1->inp); testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 4L); // changing link doesn't automatically process @@ -76,20 +99,24 @@ namespace testDiag("==== Test proc settings ===="); + testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 2L); + // Set it to CPP std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"proc\":\"CPP\"}}"; - testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length()+1, pv_name.c_str()); + { + TestMonitor m("src:i1", DBE_VALUE); + testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length()+1, pv_name.c_str()); + // wait for initial scan + m.wait(); + } - while (!dbIsLinkConnected(&i1->inp)) - testqsrvWaitForLinkEvent(&i1->inp); - - // Link should read old value again + // Link should read current value of target. testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 4L); - testdbPutFieldOk("target:ai", DBF_FLOAT, 5.0); - - // We are already connected at this point, wait for the update. - testqsrvWaitForLinkEvent(&i1->inp); + { + QSrvWaitForLinkUpdate C(&i1->inp); + testdbPutFieldOk("target:ai", DBF_FLOAT, 5.0); + } // now it's changed testdbGetFieldEqual("src:i1.VAL", DBF_LONG, 5L); @@ -104,8 +131,7 @@ namespace std::string pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"sevr\":\"NMS\"}}"; testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str()); - while (!dbIsLinkConnected(&i1->inp)) - testqsrvWaitForLinkEvent(&i1->inp); + testqsrvWaitForLinkConnected(&i1->inp); testdbPutFieldOk("target:ai.LOLO", DBF_FLOAT, 5.0); testdbPutFieldOk("target:ai.LLSV", DBF_STRING, "MAJOR"); @@ -117,8 +143,7 @@ namespace pv_name = "{\"pva\":{\"pv\":\"target:ai\",\"sevr\":\"MS\"}}"; testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str()); - while (!dbIsLinkConnected(&i1->inp)) - testqsrvWaitForLinkEvent(&i1->inp); + testqsrvWaitForLinkConnected(&i1->inp); testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevMajor); @@ -126,14 +151,16 @@ namespace pv_name = "{\"pva\":{\"pv\":\"target:mbbi\",\"sevr\":\"MSI\"}}"; testdbPutArrFieldOk("src:i1.INP$", DBF_CHAR, pv_name.length() + 1, pv_name.c_str()); - while (!dbIsLinkConnected(&i1->inp)) - testqsrvWaitForLinkEvent(&i1->inp); + testqsrvWaitForLinkConnected(&i1->inp); testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevNone); - testdbPutFieldOk("target:ai", DBF_FLOAT, 1.0); - testqsrvWaitForLinkEvent(&i1->inp); + { + QSrvWaitForLinkUpdate C(&i1->inp); + testdbPutFieldOk("target:ai", DBF_FLOAT, 1.0); + } + testdbPutFieldOk("src:i1.PROC", DBF_LONG, 1L); testdbGetFieldEqual("src:i1.SEVR", DBF_SHORT, epicsSevInvalid); } @@ -144,17 +171,16 @@ namespace longoutRecord *o2 = (longoutRecord *)testdbRecordPtr("src:o2"); - while (!dbIsLinkConnected(&o2->out)) - testqsrvWaitForLinkEvent(&o2->out); + testqsrvWaitForLinkConnected(&o2->out); testdbGetFieldEqual("target:i2.VAL", DBF_LONG, 43L); testdbGetFieldEqual("src:o2.VAL", DBF_LONG, 0L); testdbGetFieldEqual("src:o2.OUT", DBF_STRING, "{\"pva\":\"target:i2\"}"); - testdbPutFieldOk("src:o2.VAL", DBF_LONG, 14L); - - testqsrvWaitForLinkEvent(&o2->out); - testqsrvWaitForLinkEvent(&o2->out); + { + QSrvWaitForLinkUpdate C(&o2->out); + testdbPutFieldOk("src:o2.VAL", DBF_LONG, 14L); + } testdbGetFieldEqual("target:i2.VAL", DBF_LONG, 14L); testdbGetFieldEqual("src:o2.VAL", DBF_LONG, 14L); @@ -170,71 +196,294 @@ namespace testdbPutFieldOk("target:str1.PROC", DBF_LONG, 1L); testdbGetFieldEqual("target:str1", DBF_STRING, "bar"); - testdbPutFieldOk("src:str.OUT", DBF_STRING, "{pva : \"target:str2\"}"); + testdbPutFieldOk("src:str.OUT", DBF_STRING, R"({"pva" : "target:str2"})"); - while (!dbIsLinkConnected(&so->out)) - testqsrvWaitForLinkEvent(&so->out); + testqsrvWaitForLinkConnected(&so->out); - testdbPutFieldOk("src:str.PROC", DBF_LONG, 1L); - - testqsrvWaitForLinkEvent(&so->out); + { + QSrvWaitForLinkUpdate C(&so->out); + testdbPutFieldOk("src:str.PROC", DBF_LONG, 1L); + } testdbGetFieldEqual("target:str2", DBF_STRING, "bar"); } + void testToFromString() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("testToFromString:src.OUT"); + testqsrvWaitForLinkConnected("testToFromString:str2.INP"); + testqsrvWaitForLinkConnected("testToFromString:out.INP"); + + { + QSrvWaitForLinkUpdate C("testToFromString:out.INP"); + testdbPutFieldOk("testToFromString:src", DBR_LONG, 43); + } + + testdbGetFieldEqual("testToFromString:str1", DBR_STRING, "43"); + testdbGetFieldEqual("testToFromString:str2", DBR_STRING, "43"); + testdbGetFieldEqual("testToFromString:out", DBR_LONG, 43); + } + void testArrays() { - aaoRecord *aao = (aaoRecord *)testdbRecordPtr("source:aao"); + auto aai_inp = (aaiRecord *)testdbRecordPtr("target:aai_inp"); testDiag("==== testArrays ===="); static const epicsFloat32 input_arr[] = {1, 2, -1, 1.2, 0}; - testdbPutArrFieldOk("source:aao", DBR_FLOAT, 5, input_arr); + { + QSrvWaitForLinkUpdate C(&aai_inp->inp); + testdbPutArrFieldOk("source:aao", DBR_FLOAT, 5, input_arr); + } + // underlying channel cache updated, but record has not be re-processed testdbGetArrFieldEqual("target:aai_inp", DBF_CHAR, 10, 0, NULL); - testqsrvWaitForLinkEvent(&aao->out); - testqsrvWaitForLinkEvent(&aao->out); - static const epicsInt8 expected_char[] = {1, 2, -1, 1, 0}; testdbPutFieldOk("target:aai_inp.PROC", DBF_LONG, 1L); testdbGetArrFieldEqual("target:aai_inp", DBF_CHAR, 10, 5, expected_char); static const epicsUInt32 expected_ulong[] = {1L, 2L, 4294967295L, 1L, 0}; testdbGetArrFieldEqual("target:aai_inp", DBF_ULONG, 10, 5, expected_ulong); + + testqsrvWaitForLinkConnected("target:aai_inp_first.INP"); + testdbPutFieldOk("target:aai_inp_first.PROC", DBF_LONG, 1L); + testdbGetFieldEqual("target:aai_inp_first", DBR_DOUBLE, 1.0); + } + + void testStringArray() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("sarr:inp.INP"); + + const char expect[3][MAX_STRING_SIZE] = {"one", "two", "three"}; + { + QSrvWaitForLinkUpdate U("sarr:inp.INP"); + + testdbPutArrFieldOk("sarr:src", DBR_STRING, 3, expect); + } + + testdbPutFieldOk("sarr:inp.PROC", DBR_LONG, 0); + + testdbGetArrFieldEqual("sarr:inp", DBR_STRING, 4, 3, expect); } void testPutAsync() { -#ifdef USE_MULTILOCK testDiag("==== testPutAsync ===="); - longoutRecord *trig = (longoutRecord *)testdbRecordPtr("async:trig"); + auto trig = (longoutRecord *)testdbRecordPtr("async:trig"); + auto seq = (calcRecord *)testdbRecordPtr("async:seq"); - while (!dbIsLinkConnected(&trig->out)) - testqsrvWaitForLinkEvent(&trig->out); + testqsrvWaitForLinkConnected(&trig->out); - testMonitor *done = testMonitorCreate("async:after", DBE_VALUE, 0); + TestMonitor done("async:seq", DBE_VALUE, 0); testdbPutFieldOk("async:trig.PROC", DBF_LONG, 1); - testMonitorWait(done); + dbScanLock((dbCommon*)seq); + while(seq->val < 2) { + dbScanUnlock((dbCommon*)seq); + done.wait(); + dbScanLock((dbCommon*)seq); + } + dbScanUnlock((dbCommon*)seq); - testdbGetFieldEqual("async:trig", DBF_LONG, 1); - testdbGetFieldEqual("async:slow", DBF_LONG, 1); // pushed from async:trig - testdbGetFieldEqual("async:slow2", DBF_LONG, 2); - testdbGetFieldEqual("async:after", DBF_LONG, 3); - -#else - testSkip(5, "Not USE_MULTILOCK"); -#endif + testdbGetFieldEqual("async:target", DBF_LONG, 1); + testdbGetFieldEqual("async:next", DBF_LONG, 2); + testdbGetFieldEqual("async:seq", DBF_LONG, 2); } + void testDisconnect() + { + testDiag("==== %s ====", __func__); + auto serv(ioc::server()); + + testdbPutFieldFail(-1, "disconnected.PROC", DBF_LONG, 1); + testdbGetFieldEqual("disconnected.SEVR", DBF_SHORT, epicsSevInvalid); + + auto special(server::SharedPV::buildReadonly()); + special.open(nt::NTScalar{TypeCode::Int32}.create() + .update("value", 43)); + serv.addPV("special:pv", special); + + testqsrvWaitForLinkConnected("disconnected.INP"); + + testdbPutFieldOk("disconnected.PROC", DBF_LONG, 1); + testdbGetFieldEqual("disconnected.SEVR", DBF_SHORT, epicsSevNone); + + serv.removePV("special:pv"); + special.close(); + + testqsrvWaitForLinkConnected("disconnected.INP", false); + + testdbPutFieldFail(-1, "disconnected.PROC", DBF_LONG, 1); + testdbGetFieldEqual("disconnected.SEVR", DBF_SHORT, epicsSevInvalid); + + testdbPutFieldOk("disconnected.INP", DBR_STRING, ""); // avoid further log messages + } + + void testMeta() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("meta:inp.INP"); + + { + auto src = (aiRecord*)testdbRecordPtr("meta:src"); + QSrvWaitForLinkUpdate U("meta:inp.INP"); + dbScanLock((dbCommon*)src); + src->tse = epicsTimeEventDeviceTime; + src->time.secPastEpoch = 0x12345678; + src->time.nsec = 0x10203040; + src->val = 7; + dbProcess((dbCommon*)src); + dbScanUnlock((dbCommon*)src); + } + auto inp = (aiRecord*)testdbRecordPtr("meta:inp"); + + long ret, nelem; + epicsEnum16 stat, sevr; + epicsTimeStamp time; + char egu[10] = ""; + short prec; + double val, lolo, low, high, hihi; + + dbScanLock((dbCommon*)inp); + + testTrue(dbIsLinkConnected(&inp->inp)!=0); + + testEq(dbGetLinkDBFtype(&inp->inp), DBF_DOUBLE); + + // alarm and time meta-data will be "latched" by a call to dbGetLink. + // until then, the initial values are used + + testTrue((ret=dbGetAlarm(&inp->inp, &stat, &sevr))==0 + && stat==LINK_ALARM && sevr==INVALID_ALARM) + <<" ret="<inp, &nelem))==0 && nelem==1) + <<" ret="<val==0) { + dbScanUnlock(prec); + mon.wait(); + dbScanLock(prec); + } + dbScanUnlock(prec); + } + + testdbGetFieldEqual("flnk:tgt", DBF_LONG, 1); + } + + void testAtomic() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("atomic:lnk:1.INP"); + testqsrvWaitForLinkConnected("atomic:lnk:2.INP"); + + { + QSrvWaitForLinkUpdate A("atomic:lnk:1.INP"); + QSrvWaitForLinkUpdate B("atomic:lnk:2.INP"); + + testdbPutFieldOk("atomic:src:1.PROC", DBR_LONG, 0); + } + + epicsUInt32 expect; + { + auto src1(testdbRecordPtr("atomic:src:1")); + dbScanLock(src1); + expect = ((calcoutRecord*)src1)->val; + testEq(expect & ~0xff, 0u); + expect |= expect<<8u; + dbScanUnlock(src1); + } + + testdbGetFieldEqual("atomic:lnk:out", DBF_ULONG, expect); + } + + void testEnum() + { + testDiag("==== %s ====", __func__); + + testqsrvWaitForLinkConnected("enum:src:b.OUT"); + testqsrvWaitForLinkConnected("enum:src:s.OUT"); + testqsrvWaitForLinkConnected("enum:tgt:s.INP"); + testqsrvWaitForLinkConnected("enum:tgt:b.INP"); + + { + QSrvWaitForLinkUpdate A("enum:tgt:b.INP"); // last in chain... + + testdbPutFieldOk("enum:src:b", DBR_STRING, "one"); + } + + testdbGetFieldEqual("enum:tgt:s", DBR_STRING, "one"); + // not clear how to handle this case, where a string is + // read as DBR_USHORT, which is actually as DBF_ENUM + testTodoBegin("Not yet implimented"); + testdbGetFieldEqual("enum:tgt:b", DBR_STRING, "one"); + testTodoEnd(); + } } // namespace extern "C" void testioc_registerRecordDeviceDriver(struct dbBase *); MAIN(testpvalink) { - testPlan(49); + testPlan(92); testSetup(); + pvxs::logger_config_env(); try { @@ -251,8 +500,15 @@ MAIN(testpvalink) testSevr(); testPut(); testStrings(); + testToFromString(); testArrays(); - (void)testPutAsync; + testStringArray(); + testPutAsync(); + testDisconnect(); + testMeta(); + testFwd(); + testAtomic(); + testEnum(); testqsrvShutdownOk(); IOC.shutdown(); testqsrvCleanup(); @@ -261,6 +517,8 @@ MAIN(testpvalink) { testFail("Unexpected exception: %s", e.what()); } - // call epics atexits explicitly as workaround for c++ static dtor issues... - epicsExit(testDone()); + // call epics atexits explicitly to handle older base w/o de-init hooks + epicsExitCallAtExits(); + cleanup_for_valgrind(); + return testDone(); } diff --git a/test/testpvalink.db b/test/testpvalink.db index 793b22f..082a885 100644 --- a/test/testpvalink.db +++ b/test/testpvalink.db @@ -11,6 +11,8 @@ record(ai, "target:ai") { record(longin, "src:i1") { field(INP, {"pva":"target:i"}) + field(MDEL, "-1") + field(TPRO, "1") } record(mbbi, "target:mbbi") { @@ -48,31 +50,37 @@ record(stringout, "src:str") { field(VAL, "bar") } +record(longout, "testToFromString:src") { + field(VAL , "42") + field(OUT , {pva:"testToFromString:str1"}) +} +record(stringin, "testToFromString:str1") { +} +record(aai, "testToFromString:str2") { + field(FTVL, "STRING") + field(NELM, "5") + field(INP , {pva:{pv:"testToFromString:str1", "proc":"CPP"}}) +} +record(longin, "testToFromString:out") { + field(INP , {pva:{pv:"testToFromString:str2", "proc":"CPP"}}) +} + +record(calc, "async:seq") { + field(CALC, "VAL+1") +} + record(longout, "async:trig") { - field(OMSL, "closed_loop") - field(DOL , "async:seq PP") field(DTYP, "Async Soft Channel") - field(OUT , { "pva":{"pv":"async:slow.A", "proc":true} }) - field(FLNK, "async:after") - field(TPRO, "1") + field(OUT , {"pva":{"pv":"async:target", "proc":true}}) + field(FLNK, "async:next") } -record(calcout, "async:slow") { - field(ODLY, "1") - field(CALC, "A") - field(FLNK, "async:slow2") - field(TPRO, "1") +record(longin, "async:target") { + field(INP , "async:seq PP MS") } -record(longin, "async:slow2") { - field(INP , "async:seq PP") - field(TPRO, "1") -} - -record(longin, "async:after") { - field(INP , "async:seq PP") - field(MDEL, "-1") - field(TPRO, "1") +record(longin, "async:next") { + field(INP , "async:seq PP MS") } record(aao, "source:aao") { @@ -90,4 +98,109 @@ record(aai, "target:aai_inp") { record(aai, "target:aai_out") { field(NELM, "2") field(FTVL, "ULONG") -} \ No newline at end of file +} + +record(ai, "target:aai_inp_first") { + field(INP, {pva: "source:aao"}) +} + +record(longin, "disconnected") { + field(INP, {pva: "special:pv"}) + field(VAL, "42") +} + +record(ao, "meta:src") { + field(DRVH, "10") + field(HOPR, "9") + field(HIHI, "8") + field(HIGH, "7") + field(LOW , "-7") + field(LOLO, "-8") + field(LOPR, "-9") + field(DRVL, "-10") + field(HHSV, "MAJOR") + field(HSV , "MINOR") + field(LSV , "MINOR") + field(LLSV, "MAJOR") + field(PREC, "2") + field(EGU , "arb") +} + +record(ai, "meta:inp") { + field(INP, {pva:{pv:"meta:src", sevr:"MS"}}) +} + +record(longout, "flnk:src") { + field(FLNK, {pva:"flnk:tgt"}) +} + +record(calc, "flnk:tgt") { + field(CALC, "VAL+1") +} + + +record(calcout, "atomic:src:1") { + field(CALC, "RNDM*255") + field(OUT , "atomic:src:2.A PP") + info(Q:group, { + "atomic:src":{ + "a": {+channel:"VAL"} + } + }) +} +record(calc, "atomic:src:2") { + field(CALC, "A<<8") + info(Q:group, { + "atomic:src":{ + "b": {+channel:"VAL", +trigger:"*"} + } + }) +} + +record(longin, "atomic:lnk:1") { + field(INP , { + pva:{pv:"atomic:src", field:"a", atomic:true, monorder:0, proc:"CP"} + }) +} +record(longin, "atomic:lnk:2") { + field(INP , { + pva:{pv:"atomic:src", field:"b", atomic:true, monorder:1, proc:"CP"} + }) + field(FLNK, "atomic:lnk:out") +} +record(calc, "atomic:lnk:out") { + field(INPA, "atomic:lnk:1 NPP MS") + field(INPB, "atomic:lnk:2 NPP MS") + field(CALC, "A|B") +} + +record(bo, "enum:src:b") { + field(OUT , {pva:{pv:"enum:tgt", proc:"PP"}}) + field(ZNAM, "zero") + field(ONAM, "one") +} +record(stringout, "enum:src:s") { + field(OUT , {pva:{pv:"enum:tgt", proc:"PP"}}) +} +record(bi, "enum:tgt") { + field(ZNAM, "zero") + field(ONAM, "one") +} +record(stringin, "enum:tgt:s") { + field(INP , {pva:{pv:"enum:tgt", proc:"CP"}}) +} +record(bi, "enum:tgt:b") { + field(INP , {pva:{pv:"enum:tgt:s", proc:"CP"}}) + field(ZNAM, "zero") + field(ONAM, "one") +} + +record(waveform, "sarr:src") { + field(FTVL, "STRING") + field(NELM, "16") +} +record(waveform, "sarr:inp") { + field(INP , {pva:"sarr:src"}) + field(FTVL, "STRING") + field(NELM, "16") +} diff --git a/test/testqgroup.cpp b/test/testqgroup.cpp index a04ea6a..b5741f9 100644 --- a/test/testqgroup.cpp +++ b/test/testqgroup.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include "testioc.h" @@ -740,6 +741,8 @@ MAIN(testqgroup) testIQ(); testConst(); } + // call epics atexits explicitly to handle older base w/o de-init hooks + epicsExitCallAtExits(); cleanup_for_valgrind(); return testDone(); } diff --git a/test/testqsingle.cpp b/test/testqsingle.cpp index 12f0ba7..4f65db3 100644 --- a/test/testqsingle.cpp +++ b/test/testqsingle.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -910,6 +911,8 @@ MAIN(testqsingle) timeSim = false; testPutBlock(); } + // call epics atexits explicitly to handle older base w/o de-init hooks + epicsExitCallAtExits(); cleanup_for_valgrind(); return testDone(); }