add BaseMonitor

This commit is contained in:
Michael Davidsaver
2016-03-16 16:37:29 -04:00
parent eaf441d9f7
commit 79da95efd5
5 changed files with 316 additions and 263 deletions

View File

@ -1,6 +1,8 @@
#ifndef PVAHELPER_H
#define PVAHELPER_H
#include <deque>
#include <epicsGuard.h>
#include <pv/pvAccess.h>
@ -20,13 +22,22 @@ struct BaseChannel : public epics::pvAccess::Channel
typedef epicsGuard<epicsMutex> guard_t;
const std::string pvname;
std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> provider;
epics::pvAccess::ChannelRequester::shared_pointer requester;
typedef epics::pvAccess::ChannelRequester::shared_pointer requester_t;
requester_t requester;
const epics::pvData::StructureConstPtr fielddesc;
// assume Requester methods not called after destory()
virtual std::string getRequesterName() { guard_t G(lock); return requester->getRequesterName(); }
virtual void destroy() { guard_t G(lock); provider.reset(); requester.reset(); }
virtual void destroy() {
std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> prov;
requester_t req;
{
guard_t G(lock);
provider.swap(prov);
requester.swap(req);
}
}
virtual std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> getProvider() { guard_t G(lock); return provider; }
virtual std::string getRemoteAddress() { guard_t G(lock); return requester->getRequesterName(); }
@ -115,4 +126,273 @@ struct BaseChannel : public epics::pvAccess::Channel
}
};
struct BaseMonitor : public epics::pvAccess::Monitor,
public std::tr1::enable_shared_from_this<BaseMonitor>
{
POINTER_DEFINITIONS(BaseMonitor);
typedef epics::pvAccess::MonitorRequester requester_t;
mutable epicsMutex lock; // not held during any callback
typedef epicsGuard<epicsMutex> guard_t;
private:
requester_t::shared_pointer requester;
epics::pvData::PVStructurePtr complete;
epics::pvData::BitSet changed, overflow;
typedef std::deque<epics::pvAccess::MonitorElementPtr> buffer_t;
bool inoverflow;
bool running;
size_t nbuffers;
buffer_t inuse, empty;
public:
BaseMonitor(const requester_t::shared_pointer& requester,
const epics::pvData::PVStructure::shared_pointer& pvReq)
:requester(requester)
,inoverflow(false)
,running(false)
,nbuffers(2)
{}
virtual ~BaseMonitor() {destroy();}
inline const epics::pvData::PVStructurePtr& getValue() { return complete; }
//! Must call before first post(). Sets .complete and calls monitorConnect()
//! @note that value will never by accessed except by post() and requestUpdate()
void connect(const epics::pvData::PVStructurePtr& value)
{
epics::pvData::StructureConstPtr dtype(value->getStructure());
epics::pvData::PVDataCreatePtr create(epics::pvData::getPVDataCreate());
BaseMonitor::shared_pointer self;
requester_t::shared_pointer req;
{
guard_t G(lock);
assert(!complete); // can't call twice
self = shared_from_this();
req = requester;
complete = value;
empty.resize(nbuffers);
for(size_t i=0; i<empty.size(); i++) {
empty[i].reset(new epics::pvAccess::MonitorElement(create->createPVStructure(dtype)));
}
}
epics::pvData::Status sts;
req->monitorConnect(sts, self, dtype);
}
struct no_overflow {};
//! post update if queue not full, if full return false w/o overflow
bool post(const epics::pvData::BitSet& updated, no_overflow)
{
requester_t::shared_pointer req;
{
guard_t G(lock);
if(!complete || !running) return false;
changed |= updated;
if(empty.empty()) return false;
if(p_postone())
req = requester;
inoverflow = false;
}
if(req) req->monitorEvent(shared_from_this());
return true;
}
//! post update of pending changes. eg. call from requestUpdate()
bool post()
{
bool oflow;
requester_t::shared_pointer req;
{
guard_t G(lock);
if(!complete || !running) return false;
if(empty.empty()) {
oflow = inoverflow = true;
} else {
if(p_postone())
req = requester;
oflow = inoverflow = false;
}
}
if(req) req->monitorEvent(shared_from_this());
return !oflow;
}
//! post update with changed and overflowed masks (eg. when updates were lost in some upstream queue)
bool post(const epics::pvData::BitSet& updated, const epics::pvData::BitSet& overflowed)
{
bool oflow;
requester_t::shared_pointer req;
{
guard_t G(lock);
if(!complete || !running) return false;
if(empty.empty()) {
oflow = inoverflow = true;
overflow |= overflowed;
overflow.or_and(updated, changed);
changed |= updated;
} else {
if(p_postone())
req = requester;
oflow = inoverflow = false;
}
}
if(req) req->monitorEvent(shared_from_this());
return !oflow;
}
//! post update with changed
bool post(const epics::pvData::BitSet& updated) {
bool oflow;
requester_t::shared_pointer req;
{
guard_t G(lock);
if(!complete || !running) return false;
if(empty.empty()) {
oflow = inoverflow = true;
overflow.or_and(updated, changed);
changed |= updated;
} else {
if(p_postone())
req = requester;
oflow = inoverflow = false;
}
}
if(req) req->monitorEvent(shared_from_this());
return !oflow;
}
private:
bool p_postone()
{
bool ret;
// assume lock is held
assert(!empty.empty());
epics::pvAccess::MonitorElementPtr& elem = empty.front();
elem->pvStructurePtr->copyUnchecked(*complete);
*elem->changedBitSet = changed;
*elem->overrunBitSet = overflow;
overflow.clear();
changed.clear();
ret = inuse.empty();
inuse.push_back(elem);
empty.pop_front();
return ret;
}
public:
// for special handling when MonitorRequester start()s or stop()s
virtual void onStart() {}
virtual void onStop() {}
//! called when a MonitorRequester callback would result in a subscription update
//! sub-class may apply necessary locking, update .complete, .changed, and .overflow, then call post()
virtual void requestUpdate() {guard_t G(lock); post();}
virtual void destroy()
{
requester_t::shared_pointer req;
{
guard_t G(lock);
if(running) {
running = false;
this->onStop();
}
requester.swap(req);
}
}
private:
virtual epics::pvData::Status start()
{
epics::pvData::Status ret;
bool notify = false;
BaseMonitor::shared_pointer self;
{
guard_t G(lock);
if(running) return ret;
running = true;
if(!complete) return ret; // haveType() not called (error?)
inoverflow = empty.empty();
if(!inoverflow) {
// post complete event
overflow.clear();
changed.clear();
changed.set(0);
notify = true;
}
}
if(notify) onStart(); // may result in post()
return ret;
}
virtual epics::pvData::Status stop()
{
BaseMonitor::shared_pointer self;
bool notify;
epics::pvData::Status ret;
{
guard_t G(lock);
notify = running;
running = false;
}
if(notify) onStop();
return ret;
}
virtual epics::pvAccess::MonitorElementPtr poll()
{
epics::pvAccess::MonitorElementPtr ret;
guard_t G(lock);
if(running && complete && !inuse.empty()) {
ret = inuse.front();
inuse.pop_front();
}
return ret;
}
virtual void release(epics::pvAccess::MonitorElementPtr const & elem)
{
BaseMonitor::shared_pointer self;
requester_t::shared_pointer req;
{
guard_t G(lock);
empty.push_back(elem);
if(inoverflow)
this->requestUpdate(); // may result in post()
}
}
public:
virtual void getStats(Stats& s) const
{
guard_t G(lock);
s.nempty = empty.size();
s.nfilled = inuse.size();
s.noutstanding = nbuffers - s.nempty - s.nfilled;
}
};
#endif // PVAHELPER_H

View File

@ -74,7 +74,7 @@ PDBSingleChannel::createMonitor(
pvd::PVStructure::shared_pointer const & pvRequest)
{
pva::Monitor::shared_pointer ret(new PDBSingleMonitor(shared_from_this(), requester, pvRequest));
requester->monitorConnect(pvd::Status(), ret, pv->fielddesc);
((PDBSingleMonitor*)ret.get())->activate();
return ret;
}
@ -147,44 +147,18 @@ void pdb_single_event(void *user_arg, struct dbChannel *chan,
{
PDBSingleMonitor::Event *evt=(PDBSingleMonitor::Event*)user_arg;
try{
PDBSingleMonitor::shared_pointer self(evt->self->shared_from_this());
PDBSingleMonitor::shared_pointer self(std::tr1::static_pointer_cast<PDBSingleMonitor>(evt->self->shared_from_this()));
PDBSingleMonitor::requester_t::shared_pointer req;
{
Guard G(self->lock); // TODO: lock order?
if(!self->running) return; // ignore
self->scratch.clear();
{
DBScanLocker L(dbChannelRecord(self->channel->pv->chan));
self->pvif->put(self->scratch, evt->dbe_mask, pfl);
}
if(self->empty.empty()){
self->changed |= self->scratch;
self->overflow |= self->scratch;
self->inoverflow = true;
} else {
assert(!self->inoverflow);
pvd::MonitorElementPtr elem(self->empty.front());
elem->pvStructurePtr->copyUnchecked(*self->complete);
*elem->changedBitSet = self->scratch;
elem->overrunBitSet->clear();
if(self->inuse.empty())
req = self->requester;
self->inuse.push_back(elem);
self->empty.pop_front();
}
}
if(req) {
//TODO: in worker/pool?
req->monitorEvent(self);
}
self->post(self->scratch);
}catch(std::tr1::bad_weak_ptr&){
/* We are racing destruction of the PDBSingleMonitor, but things are ok.
@ -220,109 +194,40 @@ PDBSingleMonitor::Event::~Event() {
PDBSingleMonitor::PDBSingleMonitor(const PDBSingleChannel::shared_pointer& channel,
const requester_t::shared_pointer& requester,
const pvd::PVStructure::shared_pointer& pvReq)
:channel(channel)
,requester(requester)
:BaseMonitor(requester, pvReq)
,channel(channel)
,evt_VALUE(this, DBE_VALUE|DBE_ALARM)
,evt_PROPERTY(this, DBE_PROPERTY)
,inoverflow(false)
,running(false)
,nbuffers(2)
{}
void PDBSingleMonitor::activate()
{
pvd::PVDataCreatePtr create(pvd::getPVDataCreate());
pvd::StructureConstPtr fielddesc(channel->pv->fielddesc);
complete = create->createPVStructure(fielddesc);
pvif.reset(PVIF::attach(channel->pv->chan, complete));
empty.resize(nbuffers);
for(size_t i=0; i<empty.size(); i++) {
empty[i].reset(new pva::MonitorElement(create->createPVStructure(fielddesc)));
}
connect(pvd::getPVDataCreate()->createPVStructure(channel->pv->fielddesc));
pvif.reset(PVIF::attach(channel->pv->chan, getValue()));
}
void PDBSingleMonitor::destroy()
{
PDBSingleChannel::shared_pointer ch;
requester_t::shared_pointer req;
{
Guard G(lock);
req.swap(requester);
channel.swap(ch);
}
BaseMonitor::destroy();
}
pvd::Status PDBSingleMonitor::start()
void PDBSingleMonitor::onStart()
{
Guard G(lock);
if(!running) {
running = true;
changed.clear();
overflow.clear();
db_event_enable(evt_VALUE.subscript);
db_event_enable(evt_PROPERTY.subscript);
db_post_single_event(evt_VALUE.subscript);
db_post_single_event(evt_PROPERTY.subscript);
inoverflow = empty.empty();
}
return pvd::Status();
guard_t G(lock);
db_event_enable(evt_VALUE.subscript);
db_event_enable(evt_PROPERTY.subscript);
db_post_single_event(evt_VALUE.subscript);
db_post_single_event(evt_PROPERTY.subscript);
}
pvd::Status PDBSingleMonitor::stop()
void PDBSingleMonitor::onStop()
{
Guard G(lock);
if(running) {
running = false;
db_event_disable(evt_VALUE.subscript);
db_event_disable(evt_PROPERTY.subscript);
}
return pvd::Status();
}
pva::MonitorElementPtr PDBSingleMonitor::poll()
{
Guard G(lock);
pva::MonitorElementPtr ret;
if(!inuse.empty()) {
ret = inuse.front();
inuse.pop_front();
}
return ret;
}
void PDBSingleMonitor::release(pva::MonitorElementPtr const & elem)
{
PDBSingleMonitor::shared_pointer self(shared_from_this());
PDBSingleMonitor::requester_t::shared_pointer req;
{
Guard G(lock);
if(inoverflow) {
elem->pvStructurePtr->copyUnchecked(*complete);
*elem->changedBitSet = changed;
*elem->overrunBitSet = overflow;
changed.clear();
overflow.clear();
if(self->inuse.empty())
req = self->requester;
inuse.push_back(elem);
} else {
empty.push_back(elem);
}
}
if(req) {
//TODO: in worker/pool?
req->monitorEvent(self);
}
}
void PDBSingleMonitor::getStats(Stats& s) const
{
s.nempty = empty.size();
s.nfilled = inuse.size();
s.noutstanding = nbuffers - s.nempty - s.nfilled;
guard_t G(lock);
db_event_disable(evt_VALUE.subscript);
db_event_disable(evt_PROPERTY.subscript);
}

View File

@ -113,20 +113,13 @@ struct PDBSinglePut : public epics::pvAccess::ChannelPut,
virtual void get();
};
struct PDBSingleMonitor : public epics::pvAccess::Monitor,
public std::tr1::enable_shared_from_this<PDBSingleMonitor>
struct PDBSingleMonitor : public BaseMonitor
{
POINTER_DEFINITIONS(PDBSingleMonitor);
typedef epics::pvAccess::MonitorRequester requester_t;
PDBSingleChannel::shared_pointer channel;
epicsMutex lock;
requester_t::shared_pointer requester;
epics::pvData::PVStructurePtr complete;
epics::pvData::BitSet changed, overflow, scratch;
epics::pvData::BitSet scratch;
std::auto_ptr<PVIF> pvif;
struct Event {
@ -138,24 +131,16 @@ struct PDBSingleMonitor : public epics::pvAccess::Monitor,
};
Event evt_VALUE, evt_PROPERTY;
typedef std::deque<epics::pvAccess::MonitorElementPtr> buffer_t;
bool inoverflow;
bool running;
size_t nbuffers;
buffer_t inuse, empty;
PDBSingleMonitor(const PDBSingleChannel::shared_pointer& channel,
const requester_t::shared_pointer& requester,
const epics::pvData::PVStructure::shared_pointer& pvReq);
virtual ~PDBSingleMonitor() {destroy();}
void activate();
virtual void onStart();
virtual void onStop();
virtual void destroy();
virtual epics::pvData::Status start();
virtual epics::pvData::Status stop();
virtual epics::pvAccess::MonitorElementPtr poll();
virtual void release(epics::pvAccess::MonitorElementPtr const & elem);
virtual void getStats(Stats& s) const;
};
#endif // PDBSINGLE_H

View File

@ -201,8 +201,8 @@ static size_t countTestPVChannel;
TestPVChannel::TestPVChannel(const std::tr1::shared_ptr<TestPV> &pv,
const std::tr1::shared_ptr<pva::ChannelRequester> &req)
:pv(pv)
,requester(req)
:BaseChannel(pv->name, pv->provider, req, pv->dtype)
,pv(pv)
,state(CONNECTED)
{
epicsAtomicIncrSizeT(&countTestPVChannel);
@ -216,32 +216,12 @@ TestPVChannel::~TestPVChannel()
epicsAtomicDecrSizeT(&countTestPVChannel);
}
void TestPVChannel::destroy()
{
std::tr1::shared_ptr<TestProvider> P(pv->provider);
Guard G(P->lock);
requester.reset();
state = DESTROYED;
}
std::tr1::shared_ptr<pva::ChannelProvider>
TestPVChannel::getProvider()
{ return pv->provider; }
TestPVChannel::ConnectionState TestPVChannel::getConnectionState()
{
Guard G(pv->provider->lock);
return state;
}
std::string TestPVChannel::getChannelName()
{ return pv->name; }
bool TestPVChannel::isConnected()
{
return getConnectionState()==CONNECTED;
}
void TestPVChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField)
{
Guard G(pv->provider->lock);
@ -250,64 +230,6 @@ void TestPVChannel::getField(pva::GetFieldRequester::shared_pointer const & requ
requester->getDone(pvd::Status(), pv->dtype);
}
pva::ChannelProcess::shared_pointer
TestPVChannel::createChannelProcess(
pva::ChannelProcessRequester::shared_pointer const & requester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
pva::ChannelProcess::shared_pointer ret;
requester->channelProcessConnect(pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Not implemented"), ret);
return ret;
}
pva::ChannelGet::shared_pointer
TestPVChannel::createChannelGet(
pva::ChannelGetRequester::shared_pointer const & requester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
pva::ChannelGet::shared_pointer ret;
requester->channelGetConnect(pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Not implemented"),
ret,
pvd::StructureConstPtr());
return ret;
}
pva::ChannelPut::shared_pointer
TestPVChannel::createChannelPut(
pva::ChannelPutRequester::shared_pointer const & requester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
pva::ChannelPut::shared_pointer ret;
requester->channelPutConnect(pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Not implemented"),
ret,
pvd::StructureConstPtr());
return ret;
}
pva::ChannelPutGet::shared_pointer
TestPVChannel::createChannelPutGet(
pva::ChannelPutGetRequester::shared_pointer const & requester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
pva::ChannelPutGet::shared_pointer ret;
requester->channelPutGetConnect(pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Not implemented"),
ret,
pvd::StructureConstPtr(),
pvd::StructureConstPtr());
return ret;
}
pva::ChannelRPC::shared_pointer
TestPVChannel::createChannelRPC(
pva::ChannelRPCRequester::shared_pointer const & requester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
pva::ChannelRPC::shared_pointer ret;
requester->channelRPCConnect(pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Not implemented"),
ret);
return ret;
}
pvd::Monitor::shared_pointer
TestPVChannel::createMonitor(
pvd::MonitorRequester::shared_pointer const & requester,
@ -325,18 +247,6 @@ TestPVChannel::createMonitor(
return ret;
}
pva::ChannelArray::shared_pointer
TestPVChannel::createChannelArray(
pva::ChannelArrayRequester::shared_pointer const & requester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
pva::ChannelArray::shared_pointer ret;
requester->channelArrayConnect(pvd::Status(pvd::Status::STATUSTYPE_FATAL, "Not implemented"),
ret,
pvd::Array::const_shared_pointer());
return ret;
}
static size_t countTestPVMonitor;
TestPVMonitor::TestPVMonitor(const TestPVChannel::shared_pointer& ch,

View File

@ -11,6 +11,7 @@
#include <pv/pvAccess.h>
#include "pvahelper.h"
#include "weakmap.h"
#include "weakset.h"
@ -204,14 +205,13 @@ struct TestChannelMonitorRequester : public epics::pvData::MonitorRequester
bool waitForEvent();
};
struct TestPVChannel : public epics::pvAccess::Channel
struct TestPVChannel : public BaseChannel
{
POINTER_DEFINITIONS(TestPVChannel);
DUMBREQUESTER(TestPVChannel)
std::tr1::weak_ptr<TestPVChannel> weakself;
const std::tr1::shared_ptr<TestPV> pv;
std::tr1::shared_ptr<epics::pvAccess::ChannelRequester> requester;
ConnectionState state;
typedef weak_set<TestPVMonitor> monitors_t;
@ -221,42 +221,15 @@ struct TestPVChannel : public epics::pvAccess::Channel
const std::tr1::shared_ptr<epics::pvAccess::ChannelRequester>& req);
virtual ~TestPVChannel();
virtual void destroy();
virtual std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> getProvider();
virtual std::string getRemoteAddress() { return "localhost:1234"; }
virtual ConnectionState getConnectionState();
virtual std::string getChannelName();
virtual std::tr1::shared_ptr<epics::pvAccess::ChannelRequester> getChannelRequester()
{ return std::tr1::shared_ptr<epics::pvAccess::ChannelRequester>(requester); }
virtual bool isConnected();
virtual void getField(epics::pvAccess::GetFieldRequester::shared_pointer const & requester,std::string const & subField);
virtual epics::pvAccess::AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & pvField)
{ return epics::pvAccess::readWrite; }
virtual epics::pvAccess::ChannelProcess::shared_pointer createChannelProcess(
epics::pvAccess::ChannelProcessRequester::shared_pointer const & channelProcessRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelGet::shared_pointer createChannelGet(
epics::pvAccess::ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelPut::shared_pointer createChannelPut(
epics::pvAccess::ChannelPutRequester::shared_pointer const & channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelPutGet::shared_pointer createChannelPutGet(
epics::pvAccess::ChannelPutGetRequester::shared_pointer const & channelPutGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelRPC::shared_pointer createChannelRPC(
epics::pvAccess::ChannelRPCRequester::shared_pointer const & channelRPCRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual void getField(epics::pvAccess::GetFieldRequester::shared_pointer const & requester,std::string const & subField);
virtual epics::pvData::Monitor::shared_pointer createMonitor(
epics::pvData::MonitorRequester::shared_pointer const & monitorRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual epics::pvAccess::ChannelArray::shared_pointer createChannelArray(
epics::pvAccess::ChannelArrayRequester::shared_pointer const & channelArrayRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual void printInfo() {}
virtual void printInfo(std::ostream& out) {}
};