Files
pva2pva/pdbApp/pvalink_channel.cpp
Michael Davidsaver 2934c8a1e4 minor
2018-04-18 10:46:52 -07:00

300 lines
7.4 KiB
C++

#include <functional>

#include <alarm.h>
#include <pv/reftrack.h>
#define epicsExportSharedSymbols
#include <shareLib.h>
#include "pvalink.h"
namespace pvalink {
// Pointer to the shared pvaGlobal_t that the channel code in this file
// dereferences (lock, channels, providers, create, queue).
// Zero-initialized as a namespace-scope object; where it gets assigned is
// not visible in this file.
pvaGlobal_t *pvaGlobal;

// Set up the two providers ("server:QSRV" for local lookups, "pva" for
// remote ones), fetch the PVData factory, and name the work queue "PVAL".
pvaGlobal_t::pvaGlobal_t()
    : provider_local("server:QSRV")
    , provider_remote("pva")
    , create(pvd::getPVDataCreate())
    , queue("PVAL")
{
    // Open question from the original author: should this worker sit above
    // the PVA worker priority?  For now it is started at medium priority.
    // NOTE(review): the first argument is presumably the worker count - confirm.
    queue.start(1, epicsThreadPriorityMedium);
}
// No explicit teardown here; the body is intentionally empty and the
// members are destroyed by their own destructors.
pvaGlobal_t::~pvaGlobal_t() {}
// Definition of the pvaLinkChannel instance counter; in this file it is
// adjusted through REFTRACE_INCREMENT (open) and REFTRACE_DECREMENT (destructor).
size_t pvaLinkChannel::num_instances;
// Definition of the pvaLink instance counter; it is only defined here and
// is not otherwise touched in this file.
size_t pvaLink::num_instances;
// Ordering predicate for pvaLink pointers.
//
// Primary key is monorder (ascending).  Links with equal monorder are
// ordered by address so that two distinct links never compare equivalent.
//
// The address tie-break goes through std::less rather than the built-in
// '<': relational comparison of pointers to unrelated objects has an
// unspecified result, whereas std::less is guaranteed to give a strict
// total order over pointers, which is what a sorting predicate needs.
//
// Both L and R must be non-NULL; they are dereferenced unconditionally.
bool pvaLinkChannel::LinkSort::operator()(const pvaLink *L, const pvaLink *R) const {
    if(L->monorder==R->monorder)
        return std::less<const pvaLink*>()(L, R);
    return L->monorder < R->monorder;
}
// Caller is expected to hold pvaGlobal::lock while constructing.
//
// Stores the lookup key and the pvRequest, and starts every status flag in
// its "nothing happened yet" state: no disconnects counted, not connected,
// not atomic, nothing queued, and no pending link-list change.
// No channel is created here; that happens in open().
pvaLinkChannel::pvaLinkChannel(const pvaGlobal_t::channels_key_t &key, const pvd::PVStructure::const_shared_pointer& pvRequest)
    : key(key)
    , pvRequest(pvRequest)
    , num_disconnect(0u)
    , connected(false)
    , connected_latched(false)
    , isatomic(false)
    , queued(false)
    , links_changed(false)
{
}
// Unregister this channel from the global table, then verify that no link
// is still attached and account for the instance going away.
pvaLinkChannel::~pvaLinkChannel()
{
    // The global lock is held only for the duration of the erase and is
    // released before this channel's own lock is taken.
    {
        Guard globalGuard(pvaGlobal->lock);
        pvaGlobal->channels.erase(key);
    }

    Guard selfGuard(lock);

    // Every link must have been detached before the channel is destroyed.
    assert(links.empty());

    REFTRACE_DECREMENT(num_instances);
}
// Connect the underlying channel and start the monitor subscription.
//
// The local provider is tried first.  If that lookup throws, the failure is
// logged and, unless pvaLinkIsolate is set, the remote provider is used
// instead.  The monitor is then created with this object as the callback
// and the pvRequest captured at construction.
//
// Exceptions from the remote connect() or from monitor() are not caught
// here and propagate to the caller.
void pvaLinkChannel::open()
{
    Guard G(lock);

    // Count the instance before anything below can throw.  The destructor
    // decrements num_instances unconditionally, so incrementing only after
    // a successful monitor() would leave the (unsigned) counter unbalanced
    // whenever connect()/monitor() fails.
    REFTRACE_INCREMENT(num_instances);

    try {
        chan = pvaGlobal->provider_local.connect(key.first);
        TRACE(<<"Local "<<key.first);
    } catch(const std::exception&){
        // Deliberate best-effort: a failed local lookup is reported and we
        // fall through to the remote provider below.
        errlogPrintf("failed to find in QSRV; %s\n", key.first.c_str());
    }
    if(!pvaLinkIsolate && !chan) {
        chan = pvaGlobal->provider_remote.connect(key.first);
        TRACE(<<"Remote "<<key.first);
    }
    op_mon = chan.monitor(this, pvRequest);
}
// call with channel lock held
void pvaLinkChannel::put()
{
if(!connected) return;
bool doit = false;
for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
{
pvaLink *link = *it;
if(!link->used_scratch) continue;
pvd::shared_vector<const void> temp;
temp.swap(link->put_scratch);
link->used_scratch = false;
temp.swap(link->put_queue);
link->used_queue = true;
doit = true;
}
if(doit) {
TRACE(<<"start");
// start net Put, cancels in-progress put
op_put = chan.put(this); // TODO: pvRequest
}
}
// Put callback: build the structure to be sent.
//
// Creates a fresh PVStructure of type 'build', copies the queued value of
// every link flagged used_queue into its "value" sub-field (marking the
// change in args.tosend via copyDBF2PVD), and hands the structure back
// through args.root.
//
// If the structure has no "value" sub-field the function returns early,
// leaving args.root unassigned.
void pvaLinkChannel::putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args)
{
    TRACE();
    Guard G(lock);

    pvd::PVStructurePtr root(pvaGlobal->create->createPVStructure(build));

    links_t::iterator cur(links.begin()), stop(links.end());
    for(; cur!=stop; ++cur)
    {
        pvaLink *lnk = *cur;

        if(lnk->used_queue) {
            // Clear the flag before doing any work so that an unexpected
            // exception below cannot leave us retrying forever.
            lnk->used_queue = false;

            pvd::PVFieldPtr target(root->getSubField("value"));
            if(!target)
                return; // TODO: how to signal error?

            pvd::PVStringArray::const_svector choices; // TODO populate from op_mon

            TRACE(<<"store "<<target->getFullName());
            copyDBF2PVD(lnk->put_queue, target, args.tosend, choices);

            lnk->put_queue.clear();
        }
    }

    args.root = root;
}
// Put callback: the operation has finished.
//
// A failure is logged.  In every case the finished operation handle is
// released; only after a successful Put is put() called again, so that
// values which arrived in the meantime get sent.
void pvaLinkChannel::putDone(const pvac::PutEvent& evt)
{
    TRACE(<<evt.event<<" "<<evt.message);

    const bool failed = (evt.event==pvac::PutEvent::Fail);
    if(failed) {
        errlogPrintf("%s PVA link put ERROR: %s\n", key.first.c_str(), evt.message.c_str());
    }

    Guard G(lock);

    // Drop the handle of the operation that just completed.
    op_put = pvac::Operation();

    if(evt.event==pvac::PutEvent::Success) {
        TRACE(<<"repeat");
        put();
    } else {
        TRACE(<<"skip");
    }
}
// Monitor callback: record the new connection state and make sure run()
// gets scheduled on the global work queue.
//
//   Data       -> connected, schedule
//   Disconnect -> not connected, schedule
//   Fail       -> not connected, schedule, log the error
//   Cancel     -> ignored
//
// If a run() is already pending (queued is set) nothing further is added.
void pvaLinkChannel::monitorEvent(const pvac::MonitorEvent& evt)
{
    bool schedule = false;
    {
        TRACE(<<evt.event);
        Guard G(lock);

        if(evt.event==pvac::MonitorEvent::Data || evt.event==pvac::MonitorEvent::Disconnect) {
            connected = evt.event == pvac::MonitorEvent::Data;
            schedule = true;

        } else if(evt.event==pvac::MonitorEvent::Fail) {
            connected = false;
            schedule = true;
            errlogPrintf("%s: PVA link monitor ERROR: %s\n", chan.name().c_str(), evt.message.c_str());
        }
        // pvac::MonitorEvent::Cancel: no-op

        if(queued)
            return; // already scheduled

        queued = schedule;
    }

    // The queue is touched only after the channel lock has been released.
    if(schedule) {
        pvaGlobal->queue.add(shared_from_this());
    }
}
// Running from global WorkQueue thread
//
// Handles one scheduled unit of work for this channel:
//   1. Under the channel lock, latch the connection state and, while
//      connected, pop the next update from the monitor queue.  If the
//      monitor queue is empty there is nothing to do.
//   2. On disconnect, count it, drop any pending Put and notify each link.
//   3. If links were added/removed, rebuild the cached list of records to
//      process (and the multi-record lock covering them).
//   4. Outside the channel lock, process the cached records.
//   5. While connected, re-queue so the monitor queue gets drained one
//      update per pass.
void pvaLinkChannel::run()
{
    bool requeue = false;
    {
        Guard G(lock);

        queued = false;
        connected_latched = connected;

        // pop next update from monitor queue.
        // still under lock to safeguard concurrent calls to lset functions
        if(connected && !op_mon.poll())
            return; // monitor queue is empty, nothing more to do here

        if(!connected) {
            num_disconnect++;

            // cancel pending put operations
            op_put = pvac::Operation();

            for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
            {
                pvaLink *link = *it;
                link->onDisconnect();
            }
        }

        // Re-queue only while connected, i.e. when an update was actually
        // popped and more may be waiting.  Marking 'queued' now (rather
        // than right before the add below) keeps monitorEvent() from
        // scheduling a second, redundant run in the meantime.
        //
        // When disconnected there is no monitor queue to drain: re-queuing
        // unconditionally would make this function reschedule itself
        // forever, repeating the disconnect handling on every pass.  The
        // next monitorEvent() schedules us again once something changes.
        requeue = queued = connected_latched;

        if(links_changed) {
            // a link has been added or removed since the last update.
            // rebuild our cached list of records to (maybe) process.

            scan_records.clear();
            scan_check_passive.clear();

            for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
            {
                pvaLink *link = *it;
                assert(link && link->alive);

                if(!link->plink) continue;

                // NPP and none/Default don't scan
                // PP, CP, and CPP do scan
                // PP and CPP only if SCAN=Passive
                if(link->pp != pvaLink::PP && link->pp != pvaLink::CPP && link->pp != pvaLink::CP)
                    continue;

                scan_records.push_back(link->plink->precord);
                // CP processes regardless of SCAN; PP/CPP need the passive check
                scan_check_passive.push_back(link->pp != pvaLink::CP);
            }

            // Build the lock set for the new record list and install it,
            // handing the previous one to ML to be released on scope exit.
            DBManyLock ML(scan_records);
            atomic_lock.swap(ML);

            links_changed = false;
        }
    }

    // Record processing happens without the channel lock held.
    if(scan_records.empty()) {
        // Nothing to do, so don't bother locking

    } else if(isatomic) {
        // Process all records under one multi-record lock.
        DBManyLocker L(atomic_lock);

        for(size_t i=0, N=scan_records.size(); i<N; i++) {
            dbCommon *precord = scan_records[i];

            if (precord->pact) {
                // Record is already active: optionally trace, and flag it
                // for reprocessing.
                if (precord->tpro)
                    printf("%s: Active %s\n",
                           epicsThreadGetNameSelf(), precord->name);
                precord->rpro = TRUE;

            } else if(scan_check_passive[i] && precord->scan!=0) {
                // PP/CPP link, but the record is not SCAN=Passive
                continue;
            }
            dbProcess(precord);
        }

    } else {
        // Process each record under its own lock.
        for(size_t i=0, N=scan_records.size(); i<N; i++) {
            DBScanLocker L(scan_records[i]);

            if(scan_check_passive[i] && scan_records[i]->scan!=0) {
                // PP/CPP link, but the record is not SCAN=Passive
                continue;
            }
            dbProcess(scan_records[i]);
        }
    }

    if(requeue) {
        // re-queue until monitor queue is empty
        pvaGlobal->queue.add(shared_from_this());
    }
}
} // namespace pvalink