Apply PVRequestMapper to shared state put/get

This commit is contained in:
Michael Davidsaver
2018-07-31 16:48:38 -07:00
parent 527cb3fe62
commit 643f7e47c8
5 changed files with 89 additions and 36 deletions

View File

@@ -12,6 +12,7 @@
#include <pv/sharedPtr.h>
#include <pv/noDefaultMethods.h>
#include <pv/bitSet.h>
#include <pv/createRequest.h>
#include <pva/server.h>
@@ -78,6 +79,12 @@ class epicsShareClass SharedPV
friend struct SharedRPC;
public:
POINTER_DEFINITIONS(SharedPV);
struct epicsShareClass Config {
bool dropEmptyUpdates; //!< default true. Drop updates which don't include an field values.
epics::pvData::PVRequestMapper::mode_t mapperMode; //!< default Mask. @see epics::pvData::PVRequestMapper::mode_t
Config();
};
/** Callbacks associated with a SharedPV.
*
* @note For the purposes of locking, this class is an Requester (see @ref provider_roles_requester_locking)
@@ -96,15 +103,16 @@ public:
/** Allocate a new PV in the closed state.
* @param handler Our callbacks. May be NULL. Stored internally as a shared_ptr<>
* @param conf Optional. Extra configuration. If !NULL, will be modified to reflect configuration actually used.
* @post In the closed state
*/
static shared_pointer build(const std::tr1::shared_ptr<Handler>& handler);
static shared_pointer build(const std::tr1::shared_ptr<Handler>& handler, Config* conf=0);
//! A SharedPV which fails all Put and RPC operations.
static shared_pointer buildReadOnly();
static shared_pointer buildReadOnly(Config* conf=0);
//! A SharedPV which accepts all Put operations, and fails all RPC operations.
static shared_pointer buildMailbox();
static shared_pointer buildMailbox(Config* conf=0);
private:
explicit SharedPV(const std::tr1::shared_ptr<Handler>& handler);
explicit SharedPV(const std::tr1::shared_ptr<Handler>& handler, Config* conf);
public:
virtual ~SharedPV();
@@ -169,6 +177,8 @@ private:
weak_pointer internal_self; // const after build()
const Config config;
mutable epicsMutex mutex;
std::tr1::shared_ptr<SharedPV::Handler> handler;

View File

@@ -126,12 +126,19 @@ pva::ChannelPut::shared_pointer SharedChannel::createChannelPut(
std::tr1::shared_ptr<SharedPut> ret(new SharedPut(shared_from_this(), requester, pvRequest));
pvd::StructureConstPtr type;
std::string warning;
{
Guard G(owner->mutex);
// ~SharedPut removes
owner->puts.push_back(ret.get());
type = owner->type;
if(owner->current) {
ret->mapper.compute(*owner->current, *pvRequest, owner->config.mapperMode);
type = ret->mapper.requested();
warning = ret->mapper.warnings();
}
}
if(!warning.empty())
requester->message(warning, pvd::warningMessage);
if(type)
requester->channelPutConnect(pvd::Status(), ret, type);
return ret;
@@ -157,7 +164,10 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor(
pva::MonitorRequester::shared_pointer const & requester,
pvd::PVStructure::shared_pointer const & pvRequest)
{
std::tr1::shared_ptr<SharedMonitorFIFO> ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest));
SharedMonitorFIFO::Config mconf;
mconf.dropEmptyUpdates = owner->config.dropEmptyUpdates;
mconf.mapperMode = owner->config.mapperMode;
std::tr1::shared_ptr<SharedMonitorFIFO> ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest, &mconf));
bool notify;
{
Guard G(owner->mutex);
@@ -177,8 +187,9 @@ pva::Monitor::shared_pointer SharedChannel::createMonitor(
SharedMonitorFIFO::SharedMonitorFIFO(const std::tr1::shared_ptr<SharedChannel>& channel,
const requester_type::shared_pointer& requester,
const pvd::PVStructure::const_shared_pointer &pvRequest)
:pva::MonitorFIFO(requester, pvRequest)
const pvd::PVStructure::const_shared_pointer &pvRequest,
Config *conf)
:pva::MonitorFIFO(requester, pvRequest, pva::MonitorFIFO::Source::shared_pointer(), conf)
,channel(channel)
{}

View File

@@ -105,12 +105,26 @@ void SharedPut::put(
pvd::BitSet::shared_pointer const & putBitSet)
{
std::tr1::shared_ptr<SharedPV::Handler> handler;
pvd::PVStructure::shared_pointer realval;
pvd::BitSet changed;
{
Guard G(channel->owner->mutex);
if(pvPutStructure->getStructure()!=mapper.requested()) {
requester_type::shared_pointer req(requester.lock());
if(req)
req->putDone(pvd::Status::error("Type changed"), shared_from_this());
return;
}
handler = channel->owner->handler;
realval = mapper.buildBase();
mapper.copyBaseFromRequested(*realval, changed, *pvPutStructure, *putBitSet);
}
std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, pvPutStructure, *putBitSet),
std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, realval, changed),
Operation::Impl::Cleanup());
if(handler) {
@@ -130,20 +144,13 @@ void SharedPut::get()
Guard G(channel->owner->mutex);
if(channel->owner->current) {
const pvd::StructureConstPtr& currentType = channel->owner->current->getStructure();
assert(!!mapper.requested());
current = pvd::getPVDataCreate()->createPVStructure(currentType);
current = mapper.buildRequested();
changed.reset(new pvd::BitSet);
if(currentType!=lastStruct) {
selectMask = pvd::extractRequestMask(current, pvRequest->getSubField<pvd::PVStructure>("field"));
emptyselect = selectMask.isEmpty();
lastStruct = currentType;
}
changed.reset(new pvd::BitSet(channel->owner->valid));
*changed &= selectMask;
// clone
current->copyUnchecked(*channel->owner->current, *changed);
mapper.copyBaseToRequested(*channel->owner->current, channel->owner->valid,
*current, *changed);
}
}

View File

@@ -41,33 +41,39 @@ struct MailboxHandler : public pvas::SharedPV::Handler {
namespace pvas {
SharedPV::Config::Config()
:dropEmptyUpdates(true)
,mapperMode(pvd::PVRequestMapper::Mask)
{}
size_t SharedPV::num_instances;
SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr<Handler>& handler)
SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr<Handler>& handler, Config *conf)
{
assert(!!handler);
SharedPV::shared_pointer ret(new SharedPV(handler));
SharedPV::shared_pointer ret(new SharedPV(handler, conf));
ret->internal_self = ret;
return ret;
}
SharedPV::shared_pointer SharedPV::buildReadOnly()
SharedPV::shared_pointer SharedPV::buildReadOnly(Config *conf)
{
SharedPV::shared_pointer ret(new SharedPV(std::tr1::shared_ptr<Handler>()));
SharedPV::shared_pointer ret(new SharedPV(std::tr1::shared_ptr<Handler>(), conf));
ret->internal_self = ret;
return ret;
}
SharedPV::shared_pointer SharedPV::buildMailbox()
SharedPV::shared_pointer SharedPV::buildMailbox(pvas::SharedPV::Config *conf)
{
std::tr1::shared_ptr<Handler> handler(new MailboxHandler);
SharedPV::shared_pointer ret(new SharedPV(handler));
SharedPV::shared_pointer ret(new SharedPV(handler, conf));
ret->internal_self = ret;
return ret;
}
SharedPV::SharedPV(const std::tr1::shared_ptr<Handler> &handler)
:handler(handler)
SharedPV::SharedPV(const std::tr1::shared_ptr<Handler> &handler, pvas::SharedPV::Config *conf)
:config(conf ? *conf : Config())
,handler(handler)
,debugLvl(0)
{
REFTRACE_INCREMENT(num_instances);
@@ -96,9 +102,20 @@ bool SharedPV::isOpen() const
return !!type;
}
namespace {
struct PutInfo { // oh to be able to use std::tuple ...
std::tr1::shared_ptr<SharedPut> put;
pvd::StructureConstPtr type;
std::string message;
PutInfo(const std::tr1::shared_ptr<SharedPut>& put, const pvd::StructureConstPtr& type, const std::string& message)
:put(put), type(type), message(message)
{}
};
}
void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& valid)
{
typedef std::vector<std::tr1::shared_ptr<SharedPut> > xputs_t;
typedef std::vector<PutInfo> xputs_t;
typedef std::vector<std::tr1::shared_ptr<SharedRPC> > xrpcs_t;
typedef std::vector<std::tr1::shared_ptr<pva::MonitorFIFO> > xmonitors_t;
typedef std::vector<std::tr1::shared_ptr<pva::GetFieldRequester> > xgetfields_t;
@@ -127,7 +144,8 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
FOR_EACH(puts_t::const_iterator, it, end, puts) {
try {
p_put.push_back((*it)->shared_from_this());
(*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode);
p_put.push_back(PutInfo((*it)->shared_from_this(), (*it)->mapper.requested(), (*it)->mapper.warnings()));
}catch(std::tr1::bad_weak_ptr&) {
//racing destruction
}
@@ -152,8 +170,12 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet&
getfields.clear(); // consume
}
FOR_EACH(xputs_t::iterator, it, end, p_put) {
SharedPut::requester_type::shared_pointer requester((*it)->requester.lock());
if(requester) requester->channelPutConnect(pvd::Status(), *it, newtype);
SharedPut::requester_type::shared_pointer requester(it->put->requester.lock());
if(requester) {
if(!it->message.empty())
requester->message(it->message, pvd::warningMessage);
requester->channelPutConnect(pvd::Status(), it->put, it->type);
}
}
FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) {
SharedRPC::requester_type::shared_pointer requester((*it)->requester.lock());
@@ -202,6 +224,7 @@ void SharedPV::close(bool destroy)
p_channel.reserve(channels.size());
FOR_EACH(puts_t::const_iterator, it, end, puts) {
(*it)->mapper.reset();
p_put.push_back((*it)->requester.lock());
}
FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {

View File

@@ -5,6 +5,8 @@
#ifndef SHAREDSTATEIMPL_H
#define SHAREDSTATEIMPL_H
#include <pv/createRequest.h>
#include "pva/sharedstate.h"
#include <pv/pvAccess.h>
#include <pv/reftrack.h>
@@ -62,7 +64,8 @@ struct SharedMonitorFIFO : public pva::MonitorFIFO
const std::tr1::shared_ptr<SharedChannel> channel;
SharedMonitorFIFO(const std::tr1::shared_ptr<SharedChannel>& channel,
const requester_type::shared_pointer& requester,
const pvd::PVStructure::const_shared_pointer &pvRequest);
const pvd::PVStructure::const_shared_pointer &pvRequest,
Config *conf);
virtual ~SharedMonitorFIFO();
};
@@ -74,8 +77,7 @@ struct SharedPut : public pva::ChannelPut,
const pvd::PVStructure::const_shared_pointer pvRequest;
// guarded by PV mutex
pvd::StructureConstPtr lastStruct;
pvd::BitSet selectMask;
pvd::PVRequestMapper mapper;
static size_t num_instances;