This commit is contained in:
mrkraimer
2018-09-27 10:02:46 -04:00
14 changed files with 268 additions and 130 deletions

View File

@@ -51,17 +51,17 @@ struct ChatHandler : public pvas::SharedPV::Handler
virtual ~ChatHandler() {
printf("Cleanup Room\n");
}
virtual void onLastDisconnect(pvas::SharedPV& self) {
virtual void onLastDisconnect(const pvas::SharedPV::shared_pointer& self) OVERRIDE FINAL {
printf("Close Room %p\n", &self);
}
virtual void onPut(pvas::SharedPV& self, pvas::Operation& op) {
virtual void onPut(const pvas::SharedPV::shared_pointer& self, pvas::Operation& op) OVERRIDE FINAL {
pva::ChannelRequester::shared_pointer req(op.getChannel()->getChannelRequester());
std::ostringstream strm;
if(req) {
strm<<req->getRequesterName()<<" says ";
} else {
op.complete(pvd::Status::error("Defuct Put"));
op.complete(pvd::Status::error("Defunct Put"));
return;
}
@@ -71,7 +71,7 @@ struct ChatHandler : public pvas::SharedPV::Handler
replacement->getSubFieldT<pvd::PVString>("value")->put(strm.str());
self.post(*replacement, op.changed());
self->post(*replacement, op.changed());
op.complete();
}
};

View File

@@ -60,7 +60,7 @@ struct Getter : public pvac::ClientChannel::GetCallback,
op.cancel();
}
virtual void getDone(const pvac::GetEvent& event)
virtual void getDone(const pvac::GetEvent& event) OVERRIDE FINAL
{
switch(event.event) {
case pvac::GetEvent::Fail:
@@ -78,7 +78,7 @@ struct Getter : public pvac::ClientChannel::GetCallback,
}
}
virtual void connectEvent(const pvac::ConnectEvent& evt)
virtual void connectEvent(const pvac::ConnectEvent& evt) OVERRIDE FINAL
{
if(evt.connected) {
op = channel.get(this);

View File

@@ -216,6 +216,7 @@ struct ChannelGetRequesterImpl : public ChannelGetRequester, public Tracker
{
std::cerr << "[" << m_channelName << "] failed to get: " << status << '\n';
}
std::cout.flush();
done();
}
@@ -325,6 +326,7 @@ struct MonitorRequesterImpl : public MonitorRequester, public Tracker
myos << *(element->pvStructurePtr.get()) << "\n\n";
}
std::cout.flush();
}
}

View File

@@ -175,6 +175,7 @@ void printValue(std::string const & channelName, PVStructure::const_shared_point
terseStructure(std::cout, pv) << std::endl;
else
std::cout << std::endl << *(pv.get()) << std::endl << std::endl;
std::cout.flush();
}
void early(const char *inp, unsigned pos)

View File

@@ -26,7 +26,8 @@ MonitorFIFO::Config::Config()
:maxCount(4)
,defCount(4)
,actualCount(0) // readback
,ignoreRequestMask(false)
,dropEmptyUpdates(true)
,mapperMode(pvd::PVRequestMapper::Mask)
{}
size_t MonitorFIFO::num_instances;
@@ -40,8 +41,8 @@ MonitorFIFO::MonitorFIFO(const std::tr1::shared_ptr<MonitorRequester> &requester
,requester(requester)
,pvRequest(pvRequest)
,upstream(source)
,state(Closed)
,pipeline(false)
,opened(false)
,running(false)
,finished(false)
,needConnected(false)
@@ -111,7 +112,13 @@ void MonitorFIFO::show(std::ostream& strm) const
Guard G(mutex);
strm<<" open="<<opened<<" running="<<running<<" finished="<<finished<<"\n";
switch(state) {
case Closed: strm<<" Closed"; break;
case Opened: strm<<" Opened"; break;
case Error: strm<<" Error:"<<error; break;
}
strm<<" running="<<running<<" finished="<<finished<<"\n";
strm<<" #empty="<<empty.size()<<" #returned="<<returned.size()<<" #inuse="<<inuse.size()<<" flowCount="<<flowCount<<"\n";
strm<<" events "<<(needConnected?'C':'_')<<(needEvent?'E':'_')<<(needUnlisten?'U':'_')<<(needClosed?'X':'_')
<<"\n";
@@ -129,11 +136,11 @@ void MonitorFIFO::setFreeHighMark(double level)
void MonitorFIFO::open(const pvd::StructureConstPtr& type)
{
bool emptyselect;
std::string message;
{
Guard G(mutex);
if(opened)
if(state!=Closed)
throw std::logic_error("Monitor already open. Must close() before re-openning");
else if(needClosed)
throw std::logic_error("Monitor needs notify() between close() and open().");
@@ -148,59 +155,55 @@ void MonitorFIFO::open(const pvd::StructureConstPtr& type)
// fill up empty.
pvd::PVDataCreatePtr create(pvd::getPVDataCreate());
while(empty.size() < conf.actualCount+1) {
MonitorElementPtr elem(new MonitorElement(create->createPVStructure(type)));
empty.push_back(elem);
}
opened = true;
try {
mapper.compute(*create->createPVStructure(type), *pvRequest, conf.mapperMode);
message = mapper.warnings();
while(empty.size() < conf.actualCount+1) {
MonitorElementPtr elem(new MonitorElement(mapper.buildRequested()));
empty.push_back(elem);
}
state = Opened;
error = pvd::Status(); // ok
assert(inuse.empty());
assert(empty.size()>=2);
assert(returned.empty());
assert(conf.actualCount>=1);
}catch(std::runtime_error& e){
// error from compute()
error = pvd::Status::error(e.what());
state = Error;
}
needConnected = true;
this->type = type;
if(conf.ignoreRequestMask) {
selectMask.clear();
for(size_t i=0, N=empty.back()->pvStructurePtr->getNextFieldOffset(); i<N; i++)
selectMask.set(i);
} else {
selectMask = pvData::extractRequestMask(empty.back()->pvStructurePtr,
pvRequest->getSubField<pvData::PVStructure>("field"));
}
emptyselect = selectMask.isEmpty();
assert(inuse.empty());
assert(empty.size()>=2);
assert(returned.empty());
assert(conf.actualCount>=1);
}
if(!emptyselect) return;
if(message.empty()) return;
requester_type::shared_pointer req(requester.lock());
if(req) {
req->message("pvRequest with empty field mask", warningMessage);
req->message(message, warningMessage);
}
}
void MonitorFIFO::close()
{
Guard G(mutex);
if(!opened)
return; // no-op
opened = false;
needClosed = true;
selectMask.clear();
type.reset();
needClosed = state==Opened;
state = Closed;
}
void MonitorFIFO::finish()
{
Guard G(mutex);
if(!opened)
throw std::logic_error("Can not finish() a closed Monitor");
if(state==Closed)
throw std::logic_error("Can not finish() a closed Monitor");
else if(finished)
return; // no-op
finished = true;
if(inuse.empty() && running)
if(inuse.empty() && running && state==Opened)
needUnlisten = true;
}
@@ -211,17 +214,14 @@ bool MonitorFIFO::tryPost(const pvData::PVStructure& value,
{
Guard G(mutex);
assert(opened && !finished);
assert(state!=Closed && !finished);
if(state!=Opened) return false; // when Error, act as always "full"
assert(!empty.empty() || !inuse.empty());
// compute effective changed mask for this subscription
scratch = changed;
scratch &= selectMask;
const bool havefree = _freeCount()>0u;
MonitorElementPtr elem;
if(!conf.ignoreRequestMask && scratch.isEmpty()) {
if(conf.dropEmptyUpdates && !changed.logical_and(mapper.requestedMask())) {
// drop empty update
} else if(havefree) {
// take an empty element
@@ -229,16 +229,16 @@ bool MonitorFIFO::tryPost(const pvData::PVStructure& value,
empty.pop_front();
} else if(force) {
// allocate an extra element
elem.reset(new MonitorElement(pvd::getPVDataCreate()->createPVStructure(type)));
elem.reset(new MonitorElement(mapper.buildRequested()));
}
if(elem) {
try {
assert(value.getStructure() == elem->pvStructurePtr->getStructure());
elem->pvStructurePtr->copyUnchecked(value, scratch);
*elem->changedBitSet = scratch;
*elem->overrunBitSet = overrun;
*elem->overrunBitSet &= selectMask;
elem->changedBitSet->clear();
mapper.copyBaseToRequested(value, changed,
*elem->pvStructurePtr, *elem->changedBitSet);
elem->overrunBitSet->clear();
mapper.maskBaseToRequested(overrun, *elem->overrunBitSet);
if(inuse.empty() && running)
needEvent = true;
@@ -263,7 +263,8 @@ void MonitorFIFO::post(const pvData::PVStructure& value,
{
Guard G(mutex);
assert(opened && !finished);
assert(state!=Closed && !finished);
if(state!=Opened) return;
assert(!empty.empty() || !inuse.empty());
const bool use_empty = !empty.empty();
@@ -284,19 +285,16 @@ void MonitorFIFO::post(const pvData::PVStructure& value,
elem = inuse.back();
}
scratch = changed;
scratch &= selectMask;
if(!conf.ignoreRequestMask && scratch.isEmpty())
if(conf.dropEmptyUpdates && !changed.logical_and(mapper.requestedMask()))
return; // drop empty update
assert(value.getStructure() == elem->pvStructurePtr->getStructure());
elem->pvStructurePtr->copyUnchecked(value, scratch);
scratch.clear();
mapper.copyBaseToRequested(value, changed, *elem->pvStructurePtr, scratch);
if(use_empty) {
*elem->changedBitSet = scratch;
*elem->overrunBitSet = overrun;
*elem->overrunBitSet &= selectMask;
elem->overrunBitSet->clear();
mapper.maskBaseToRequested(overrun, *elem->overrunBitSet);
if(inuse.empty() && running)
needEvent = true;
@@ -311,7 +309,9 @@ void MonitorFIFO::post(const pvData::PVStructure& value,
// squash
elem->overrunBitSet->or_and(*elem->changedBitSet, scratch);
*elem->changedBitSet |= scratch;
elem->overrunBitSet->or_and(overrun, selectMask);
oscratch.clear();
mapper.maskBaseToRequested(overrun, oscratch);
elem->overrunBitSet->or_and(oscratch, scratch);
// leave as inuse.back()
}
@@ -326,6 +326,7 @@ void MonitorFIFO::notify()
evt = false,
unl = false,
clo = false;
pvd::Status err;
{
Guard G(mutex);
@@ -334,19 +335,22 @@ void MonitorFIFO::notify()
std::swap(evt, needEvent);
std::swap(unl, needUnlisten);
std::swap(clo, needClosed);
std::swap(err, error);
if(conn | evt | unl | clo) {
req = requester.lock();
self = shared_from_this();
}
if(conn)
type = (!inuse.empty() ? inuse.front() : empty.back())->pvStructurePtr->getStructure();
if(conn && err.isSuccess())
type = mapper.requested();
}
if(!req)
return;
if(conn)
if(conn && err.isSuccess())
req->monitorConnect(pvd::Status(), self, type);
else if(conn)
req->monitorConnect(err, self, type);
if(evt)
req->monitorEvent(self);
if(unl)
@@ -363,10 +367,10 @@ pvd::Status MonitorFIFO::start()
{
Guard G(mutex);
if(!opened)
if(state==Closed)
throw std::logic_error("Monitor can't start() before open()");
if(running)
if(running || state!=Opened)
return pvd::Status();
if(!inuse.empty()) {

View File

@@ -22,6 +22,7 @@
#include <pv/pvData.h>
#include <pv/sharedPtr.h>
#include <pv/bitSet.h>
#include <pv/createRequest.h>
#ifdef monitorEpicsExportSharedSymbols
# define epicsExportSharedSymbols
@@ -274,7 +275,8 @@ public:
size_t maxCount, //!< upper limit on requested FIFO size
defCount, //!< FIFO size when client makes no request
actualCount; //!< filled in with actual FIFO size
bool ignoreRequestMask;
bool dropEmptyUpdates; //!< default true. Drop updates which don't include any field values.
epics::pvData::PVRequestMapper::mode_t mapperMode; //!< default Mask. @see epics::pvData::PVRequestMapper::mode_t
Config();
};
@@ -351,6 +353,7 @@ private:
// -> MonitorRequester::monitorEvent()
// -> MonitorRequester::unlisten()
// -> ChannelBaseRequester::channelDisconnect()
// start() -> MonitorRequester::monitorEvent()
// release() -> Source::freeHighMark()
// -> notify() -> ...
// reportRemoteQueueStatus() -> Source::freeHighMark()
@@ -373,22 +376,27 @@ private:
// and expect that upstream will have only a weak ref to us.
const Source::shared_pointer upstream;
enum state_t {
Closed, // not open()'d
Opened, // successful open()
Error, // unsuccessful open()
} state;
bool pipeline; // const after ctor
bool opened; // open() vs. close()
bool running; // start() vs. stop()
bool finished; // finish() called
epics::pvData::BitSet selectMask, // set during open()
scratch; // using during post to avoid re-alloc
epics::pvData::BitSet scratch, oscratch; // used during post() to avoid re-alloc
bool needConnected;
bool needEvent;
bool needUnlisten;
bool needClosed;
epics::pvData::Status error; // Set when entering Error state
size_t freeHighLevel;
epicsInt32 flowCount;
epics::pvData::StructureConstPtr type; // NULL if not opened
epics::pvData::PVRequestMapper mapper;
typedef std::list<MonitorElementPtr> buffer_t;
// we allocate one extra buffer element to hold data when post()

View File

@@ -310,6 +310,8 @@ public:
{
startRequest(PURE_CANCEL_REQUEST);
m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
} catch (std::runtime_error& e) {
// assume from checkAndGetTransport() due to wrong channel state
} catch (std::exception& e) {
// noop (do not complain if fails)
LOG(logLevelWarn, "Ignore exception during ChanneGet::cancel: %s", e.what());

View File

@@ -12,6 +12,7 @@
#include <pv/sharedPtr.h>
#include <pv/noDefaultMethods.h>
#include <pv/bitSet.h>
#include <pv/createRequest.h>
#include <pva/server.h>
@@ -78,6 +79,12 @@ class epicsShareClass SharedPV
friend struct SharedRPC;
public:
POINTER_DEFINITIONS(SharedPV);
struct epicsShareClass Config {
bool dropEmptyUpdates; //!< default true. Drop updates which don't include any field values.
epics::pvData::PVRequestMapper::mode_t mapperMode; //!< default Mask. @see epics::pvData::PVRequestMapper::mode_t
Config();
};
/** Callbacks associated with a SharedPV.
*
* @note For the purposes of locking, this class is an Requester (see @ref provider_roles_requester_locking)
@@ -96,15 +103,16 @@ public:
/** Allocate a new PV in the closed state.
* @param handler Our callbacks. May be NULL. Stored internally as a shared_ptr<>
* @param conf Optional. Extra configuration. If !NULL, will be modified to reflect configuration actually used.
* @post In the closed state
*/
static shared_pointer build(const std::tr1::shared_ptr<Handler>& handler);
//! A SharedPV which fails all Put and RPC operations.
static shared_pointer buildReadOnly();
//! A SharedPV which accepts all Put operations, and fails all RPC operations.
static shared_pointer buildMailbox();
static shared_pointer build(const std::tr1::shared_ptr<Handler>& handler, Config* conf=0);
//! A SharedPV which fails all Put and RPC operations. In closed state.
static shared_pointer buildReadOnly(Config* conf=0);
//! A SharedPV which accepts all Put operations, and fails all RPC operations. In closed state.
static shared_pointer buildMailbox(Config* conf=0);
private:
explicit SharedPV(const std::tr1::shared_ptr<Handler>& handler);
explicit SharedPV(const std::tr1::shared_ptr<Handler>& handler, Config* conf);
public:
virtual ~SharedPV();
@@ -119,7 +127,7 @@ public:
void open(const epics::pvData::PVStructure& value);
//! Begin allowing clients to connect.
//! @param value The initial value of this PV. (any pending Get operation will complete this this)
//! @param value The initial value of this PV. (any pending Get/Monitor operation will complete with this)
//! @param valid Only these marked fields are considered to have non-default values.
//! @throws std::logic_error if not in the closed state.
//! @post In the opened state
@@ -169,6 +177,8 @@ private:
weak_pointer internal_self; // const after build()
const Config config;
mutable epicsMutex mutex;
std::tr1::shared_ptr<SharedPV::Handler> handler;
@@ -192,6 +202,8 @@ private:
//! Used for initial Monitor update and Get operations.
epics::pvData::BitSet valid;
bool notifiedConn; // whether onFirstConnect() has been, or is being, called
int debugLvl;
EPICS_NOT_COPYABLE(SharedPV)

View File

@@ -118,7 +118,7 @@ StaticProvider::StaticProvider(const std::string &name)
impl->external_self = impl;
}
StaticProvider::~StaticProvider() { close(); }
StaticProvider::~StaticProvider() { close(true); }
void StaticProvider::close(bool destroy)
{

View File

@@ -51,6 +51,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr<SharedPV> &owner,
if(owner->channels.empty())
handler = owner->handler;
owner->channels.push_back(this);
owner->notifiedConn = !!handler;
}
if(handler) {
handler->onFirstConnect(owner);
@@ -62,10 +63,11 @@ SharedChannel::~SharedChannel()
std::tr1::shared_ptr<SharedPV::Handler> handler;
{
Guard G(owner->mutex);
bool wasempty = owner->channels.empty();
owner->channels.remove(this);
if(owner->channels.empty()) {
Guard G(owner->mutex);
if(!wasempty && owner->channels.empty() && owner->notifiedConn) {
handler = owner->handler;
owner->notifiedConn = false;
}
}
if(handler) {
@@ -74,7 +76,7 @@ SharedChannel::~SharedChannel()
if(owner->debugLvl>5)
{
pva::ChannelRequester::shared_pointer req(requester.lock());
errlogPrintf("%s : Open channel to %s > %p\n",
errlogPrintf("%s : Close channel to %s > %p\n",
req ? req->getRequesterName().c_str() : "<Defunct>",
channelName.c_str(),
this);
@@ -126,14 +128,27 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut(
std::tr1::shared_ptr<SharedPut> ret(new SharedPut(shared_from_this(), requester, pvRequest));
pvd::StructureConstPtr type;
{
Guard G(owner->mutex);
// ~SharedPut removes
owner->puts.push_back(ret.get());
type = owner->type;
std::string warning;
try {
{
Guard G(owner->mutex);
// ~SharedPut removes
owner->puts.push_back(ret.get());
if(owner->current) {
ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode);
type = ret->mapper.requested();
warning = ret->mapper.warnings();
}
}
if(!warning.empty())
requester->message(warning, pvd::warningMessage);
if(type)
requester->channelPutConnect(pvd::Status(), ret, type);
}catch(std::runtime_error& e){
ret.reset();
type.reset();
requester->channelPutConnect(pvd::Status::error(e.what()), ret, type);
}
if(type)
requester->channelPutConnect(pvd::Status(), ret, type);
return ret;
}
@@ -157,7 +172,10 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor(
pva::MonitorRequester::shared_pointer const & requester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
std::tr1::shared_ptr<SharedMonitorFIFO> ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest));
SharedMonitorFIFO::Config mconf;
mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates;
mconf.mapperMode = owner->config.mapperMode;
std::tr1::shared_ptr<SharedMonitorFIFO> ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf));
bool notify;
{
Guard G(owner->mutex);
@@ -177,8 +195,9 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor(
SharedMonitorFIFO::SharedMonitorFIFO(const std::tr1::shared_ptr<SharedChannel>& channel,
const requester_type::shared_pointer& requester,
const pvd::PVStructure::const_shared_pointer &pvRequest)
:pva::MonitorFIFO(requester, pvRequest)
const pvd::PVStructure::const_shared_pointer &pvRequest,
Config *conf)
:pva::MonitorFIFO(requester, pvRequest, pva::MonitorFIFO::Source::shared_pointer(), conf)
,channel(channel)
{}

View File

@@ -105,12 +105,26 @@ void SharedPut::put(
pvd::BitSet::shared_pointer const & putBitSet)
{
std::tr1::shared_ptr<SharedPV::Handler> handler;
pvd::PVStructure::shared_pointer realval;
pvd::BitSet changed;
{
Guard G(channel->owner->mutex);
if(pvPutStructure->getStructure()!=mapper.requested()) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->putDone(pvd::Status::error("Type changed"), shared_from_this());
return;
}
handler = channel->owner->handler;
realval = mapper.buildBase();
mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet);
}
std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, pvPutStructure, *putBitSet),
std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, realval, changed),
Operation::Impl::Cleanup());
if(handler) {
@@ -130,20 +144,13 @@ void SharedPut::get()
Guard G(channel->owner->mutex);
if(channel->owner->current) {
const pvd::StructureConstPtr& currentType = channel->owner->current->getStructure();
assert(!!mapper.requested());
current = pvd::getPVDataCreate()->createPVStructure(currentType);
current = mapper.buildRequested();
changed.reset(new pvd::BitSet);
if(currentType!=lastStruct) {
selectMask = pvd::extractRequestMask(current, pvRequest->getSubField<pvd::PVStructure>("field"));
emptyselect = selectMask.isEmpty();
lastStruct = currentType;
}
changed.reset(new pvd::BitSet(channel->owner->valid));
*changed &= selectMask;
// clone
current->copyUnchecked(*channel->owner->current, *changed);
mapper.copyBaseToRequested(*channel->owner->current, channel->owner->valid,
*current, *changed);
}
}

View File

@@ -41,39 +41,47 @@ struct MailboxHandler : public pvas::SharedPV::Handler {
namespace pvas {
SharedPV::Config::Config()
:dropEmptyUpdates(true)
,mapperMode(pvd::PVRequestMapper::Mask)
{}
size_t SharedPV::num_instances;
SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr<Handler>& handler)
SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr<Handler>& handler, Config *conf)
{
assert(!!handler);
SharedPV::shared_pointer ret(new SharedPV(handler));
SharedPV::shared_pointer ret(new SharedPV(handler, conf));
ret->internal_self = ret;
return ret;
}
SharedPV::shared_pointer SharedPV::buildReadOnly()
SharedPV::shared_pointer SharedPV::buildReadOnly(Config *conf)
{
SharedPV::shared_pointer ret(new SharedPV(std::tr1::shared_ptr<Handler>()));
SharedPV::shared_pointer ret(new SharedPV(std::tr1::shared_ptr<Handler>(), conf));
ret->internal_self = ret;
return ret;
}
SharedPV::shared_pointer SharedPV::buildMailbox()
SharedPV::shared_pointer SharedPV::buildMailbox(pvas::SharedPV::Config *conf)
{
std::tr1::shared_ptr<Handler> handler(new MailboxHandler);
SharedPV::shared_pointer ret(new SharedPV(handler));
SharedPV::shared_pointer ret(new SharedPV(handler, conf));
ret->internal_self = ret;
return ret;
}
SharedPV::SharedPV(const std::tr1::shared_ptr<Handler> &handler)
:handler(handler)
SharedPV::SharedPV(const std::tr1::shared_ptr<Handler> &handler, pvas::SharedPV::Config *conf)
:config(conf ? *conf : Config())
,handler(handler)
,notifiedConn(false)
,debugLvl(0)
{
REFTRACE_INCREMENT(num_instances);
}
SharedPV::~SharedPV() {
close();
REFTRACE_DECREMENT(num_instances);
}
@@ -96,9 +104,26 @@ bool SharedPV::isOpen() const
return !!type;
}
namespace {
struct PutInfo { // oh to be able to use std::tuple ...
std::tr1::shared_ptr<SharedPut> put;
pvd::StructureConstPtr type;
pvd::Status status;
PutInfo(const std::tr1::shared_ptr<SharedPut>& put, const pvd::StructureConstPtr& type, const pvd::Status& status)
:put(put), type(type), status(status)
{}
PutInfo(const std::tr1::shared_ptr<SharedPut>& put, const pvd::StructureConstPtr& type, const std::string& message)
:put(put), type(type)
{
if(!message.empty())
status = pvd::Status::warn(message);
}
};
}
void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& valid)
{
typedef std::vector<std::tr1::shared_ptr<SharedPut> > xputs_t;
typedef std::vector<PutInfo> xputs_t;
typedef std::vector<std::tr1::shared_ptr<SharedRPC> > xrpcs_t;
typedef std::vector<std::tr1::shared_ptr<pva::MonitorFIFO> > xmonitors_t;
typedef std::vector<std::tr1::shared_ptr<pva::GetFieldRequester> > xgetfields_t;
@@ -127,7 +152,13 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
FOR_EACH(puts_t::const_iterator, it, end, puts) {
try {
p_put.push_back((*it)->shared_from_this());
try {
(*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode);
p_put.push_back(PutInfo((*it)->shared_from_this(), (*it)->mapper.requested(), (*it)->mapper.warnings()));
}catch(std::runtime_error& e) {
// compute() error
p_put.push_back(PutInfo((*it)->shared_from_this(), pvd::StructureConstPtr(), pvd::Status::error(e.what())));
}
}catch(std::tr1::bad_weak_ptr&) {
//racing destruction
}
@@ -151,9 +182,14 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
}
getfields.clear(); // consume
}
// unlock for callbacks
FOR_EACH(xputs_t::iterator, it, end, p_put) {
SharedPut::requester_type::shared_pointer requester((*it)->requester.lock());
if(requester) requester->channelPutConnect(pvd::Status(), *it, newtype);
SharedPut::requester_type::shared_pointer requester(it->put->requester.lock());
if(requester) {
if(it->status.getType()==pvd::Status::STATUSTYPE_WARNING)
requester->message(it->status.getMessage(), pvd::warningMessage);
requester->channelPutConnect(it->status, it->put, it->type);
}
}
FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) {
SharedRPC::requester_type::shared_pointer requester((*it)->requester.lock());
@@ -190,6 +226,7 @@ void SharedPV::close(bool destroy)
xrpcs_t p_rpc;
xmonitors_t p_monitor;
xchannels_t p_channel;
Handler::shared_pointer p_handler;
{
Guard I(mutex);
@@ -202,6 +239,7 @@ void SharedPV::close(bool destroy)
p_channel.reserve(channels.size());
FOR_EACH(puts_t::const_iterator, it, end, puts) {
(*it)->mapper.reset();
p_put.push_back((*it)->requester.lock());
}
FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
@@ -209,10 +247,14 @@ void SharedPV::close(bool destroy)
}
FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
(*it)->close();
p_monitor.push_back((*it)->shared_from_this());
try {
p_monitor.push_back((*it)->shared_from_this());
}catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ }
}
FOR_EACH(channels_t::const_iterator, it, end, channels) {
p_channel.push_back((*it)->shared_from_this());
try {
p_channel.push_back((*it)->shared_from_this());
}catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ }
}
type.reset();
@@ -223,6 +265,10 @@ void SharedPV::close(bool destroy)
puts.clear();
rpcs.clear();
monitors.clear();
if(!channels.empty() && notifiedConn) {
p_handler = handler;
notifiedConn = false;
}
channels.clear();
}
}
@@ -240,6 +286,10 @@ void SharedPV::close(bool destroy)
if(!req) continue;
req->channelStateChange(*it, destroy ? pva::Channel::DESTROYED : pva::Channel::DISCONNECTED);
}
if(p_handler) {
shared_pointer self(internal_self);
p_handler->onLastDisconnect(self);
}
}
pvd::PVStructure::shared_pointer SharedPV::build()

View File

@@ -5,6 +5,8 @@
#ifndef SHAREDSTATEIMPL_H
#define SHAREDSTATEIMPL_H
#include <pv/createRequest.h>
#include "pva/sharedstate.h"
#include <pv/pvAccess.h>
#include <pv/reftrack.h>
@@ -62,7 +64,8 @@ struct SharedMonitorFIFO : public pva::MonitorFIFO
const std::tr1::shared_ptr<SharedChannel> channel;
SharedMonitorFIFO(const std::tr1::shared_ptr<SharedChannel>& channel,
const requester_type::shared_pointer& requester,
const pvd::PVStructure::const_shared_pointer &pvRequest);
const pvd::PVStructure::const_shared_pointer &pvRequest,
Config *conf);
virtual ~SharedMonitorFIFO();
};
@@ -74,8 +77,7 @@ struct SharedPut : public pva::ChannelPut,
const pvd::PVStructure::const_shared_pointer pvRequest;
// guarded by PV mutex
pvd::StructureConstPtr lastStruct;
pvd::BitSet selectMask;
pvd::PVRequestMapper mapper;
static size_t num_instances;

View File

@@ -30,6 +30,7 @@ struct Tester {
// we only have one thread, so no need for sync.
enum cb_t {
Connect,
ConnectError,
Event,
Unlisten,
Close,
@@ -39,6 +40,7 @@ struct Tester {
switch(cb) {
#define CASE(NAME) case NAME: return #NAME
CASE(Connect);
CASE(ConnectError);
CASE(Event);
CASE(Unlisten);
CASE(Close);
@@ -64,9 +66,12 @@ struct Tester {
}
virtual void monitorConnect(epics::pvData::Status const & status,
pva::MonitorPtr const & monitor, epics::pvData::StructureConstPtr const & structure) OVERRIDE FINAL {
testDiag("In %s", CURRENT_FUNCTION);
testDiag("In %s : %s", CURRENT_FUNCTION, status.isSuccess() ? "OK" : status.getMessage().c_str());
Guard G(mutex);
Tester::timeline.push_back(Connect);
if(status.isSuccess())
Tester::timeline.push_back(Connect);
else
Tester::timeline.push_back(ConnectError);
}
virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL {
testDiag("In %s", CURRENT_FUNCTION);
@@ -763,11 +768,36 @@ void checkCountdown()
tester.testTimeline({Tester::Close});
}
void checkBadRequest()
{
testDiag("==== %s ====", CURRENT_FUNCTION);
pva::MonitorFIFO::Config conf;
conf.maxCount=4;
conf.defCount=3;
Tester tester(pvd::createRequest("field(invalid)"), &conf);
tester.connect(pvd::pvInt);
tester.mon->notify();
tester.testTimeline({Tester::ConnectError});
// when in Error, all are no-op
tester.post(15);
tester.tryPost(4, false);
tester.tryPost(5, false, true);
tester.mon->finish();
tester.mon->notify();
tester.testTimeline({}); // nothing happens
tester.close();
tester.testTimeline({});
}
} // namespace
MAIN(testmonitorfifo)
{
testPlan(184);
testPlan(189);
checkPlain();
checkAfterClose();
checkReOpenLost();
@@ -777,6 +807,7 @@ MAIN(testmonitorfifo)
checkPipeline();
checkSpam();
checkCountdown();
checkBadRequest();
return testDone();
}