Add SharedPV

This commit is contained in:
Michael Davidsaver
2018-05-29 21:35:34 -07:00
parent 146fbbc719
commit 74c2ec1ec3
14 changed files with 2050 additions and 0 deletions

View File

@@ -25,6 +25,7 @@
#include <pv/serverContextImpl.h>
#include <pv/serverChannelImpl.h>
#include <pv/blockingUDP.h>
#include <sharedstateimpl.h>
using namespace epics::pvData;
using std::string;
@@ -225,6 +226,9 @@ void providerRegInit(void*)
registerRefCounter("ResponseHandler (ABC)", &ResponseHandler::num_instances);
registerRefCounter("MonitorFIFO", &MonitorFIFO::num_instances);
pvas::registerRefTrackServer();
registerRefCounter("pvas::SharedChannel", &pvas::SharedChannel::num_instances);
registerRefCounter("pvas::SharedPut", &pvas::SharedPut::num_instances);
registerRefCounter("pvas::SharedRPC", &pvas::SharedRPC::num_instances);
}
ChannelProviderRegistry::shared_pointer ChannelProviderRegistry::clients()

View File

@@ -5,6 +5,7 @@ SRC_DIRS += $(PVACCESS_SRC)/server
INC += pv/serverContext.h
INC += pv/beaconServerStatusProvider.h
INC += pva/server.h
INC += pva/sharedstate.h
pvAccess_SRCS += responseHandlers.cpp
pvAccess_SRCS += serverContext.cpp
@@ -13,3 +14,7 @@ pvAccess_SRCS += baseChannelRequester.cpp
pvAccess_SRCS += beaconEmitter.cpp
pvAccess_SRCS += beaconServerStatusProvider.cpp
pvAccess_SRCS += server.cpp
pvAccess_SRCS += sharedstate_pv.cpp
pvAccess_SRCS += sharedstate_channel.cpp
pvAccess_SRCS += sharedstate_rpc.cpp
pvAccess_SRCS += sharedstate_put.cpp

View File

@@ -0,0 +1,256 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#ifndef PV_SHAREDSTATE_H
#define PV_SHAREDSTATE_H
#include <string>
#include <list>
#include <shareLib.h>
#include <pv/sharedPtr.h>
#include <pv/noDefaultMethods.h>
#include <pv/bitSet.h>
#include <pva/server.h>
namespace epics{namespace pvData{
class Structure;
class PVStructure;
class BitSet;
class Status;
}} // epics::pvData
namespace epics{namespace pvAccess{
class ChannelProvider;
class Channel;
class ChannelRequester;
class ChannelBaseRequester;
class GetFieldRequester;
}} // epics::pvAccess
namespace pvas {
struct SharedChannel;
struct SharedMonitorFIFO;
struct SharedPut;
struct SharedRPC;
struct Operation;
/** @addtogroup pvas
* @{
*/
/** A Shared State Process Variable (PV)
*
* "Shared" in the sense that all clients/subscribers interact with the
* same PVStructure.
*
* @warning For the purposes of locking, this class is an Operation (see @ref provider_roles_requester_locking).
* eg. no locks may be held when calling post(), open(), close(), or connect().
*
* This class contains a cached PVStructure, which is updated by post(),
* also a list of subscribing clients and in-progress network Operations.
*
* On construction a SharedPV is in a "disconnected" state.
* It has no associated PVStructure (or Structure). No type.
* A type is associated via the open() method.
* After it has been open()'d. Calls to post() may be made.
* Calling close() will close all currently opened client channels.
*
* Client channels, and operations on them, may be initiated at any time (via connect()).
* However, operations will not be fully created until open() is called.
*
* @note A SharedPV does not have a name. Name(s) are associated with a SharedPV
* By a Provider (StaticProvider, DynamicProvider, or any epics::pvAccess::ChannelProvider).
* These channel names may be seen via connect()
*
* @see @ref pvas_sharedptr
*/
class epicsShareClass SharedPV
    : public pvas::StaticProvider::ChannelBuilder
{
    // implementation classes need access to the internal lists and value cache
    friend struct SharedChannel;
    friend struct SharedMonitorFIFO;
    friend struct SharedPut;
    friend struct SharedRPC;
public:
    POINTER_DEFINITIONS(SharedPV);

    /** Callbacks associated with a SharedPV.
     *
     * @note For the purposes of locking, this class is a Requester (see @ref provider_roles_requester_locking)
     */
    struct epicsShareClass Handler {
        POINTER_DEFINITIONS(Handler);
        virtual ~Handler() {}
        //! Called when the first client channel connects.
        virtual void onFirstConnect(const SharedPV::shared_pointer& pv) {}
        //! Called when the last client disconnects.  May close()
        virtual void onLastDisconnect(const SharedPV::shared_pointer& pv) {}
        //! Client requests Put
        virtual void onPut(const SharedPV::shared_pointer& pv, Operation& op) {}
        //! Client requests RPC
        virtual void onRPC(const SharedPV::shared_pointer& pv, Operation& op) {}
    };

    /** Allocate a new PV in the closed state.
     * @param handler Our callbacks.  May be NULL.  Stored internally as a shared_ptr<>
     * @post In the closed state
     */
    static shared_pointer build(const std::tr1::shared_ptr<Handler>& handler);
    //! A SharedPV which fails all Put and RPC operations.
    static shared_pointer buildReadOnly();
    //! A SharedPV which accepts all Put operations, and fails all RPC operations.
    static shared_pointer buildMailbox();
private:
    // not constructed directly; use build()/buildReadOnly()/buildMailbox()
    explicit SharedPV(const std::tr1::shared_ptr<Handler>& handler);
public:
    virtual ~SharedPV();

    //! Replace Handler given with ctor
    void setHandler(const std::tr1::shared_ptr<Handler>& handler);
    Handler::shared_pointer getHandler() const;

    //! test open-ness.  cf. open() and close()
    bool isOpen() const;

    //! Shorthand for @code open(value, pvd::BitSet().set(0)) @endcode
    void open(const epics::pvData::PVStructure& value);

    //! Begin allowing clients to connect.
    //! @param value The initial value of this PV.  (any pending Get operation will complete with this)
    //! @param valid Only these marked fields are considered to have non-default values.
    //! @throws std::logic_error if not in the closed state.
    //! @post In the opened state
    //! @note Provider locking rules apply (@see provider_roles_requester_locking).
    void open(const epics::pvData::PVStructure& value, const epics::pvData::BitSet& valid);

    //! Shorthand for @code open(*pvd::getPVDataCreate()->createPVStructure(type), pvd::BitSet().set(0)) @endcode
    void open(const epics::pvData::StructureConstPtr& type);

    //! Force any clients to disconnect, and prevent re-connection
    //! @param destroy Indicate whether this close() is permanent for clients.
    //!        If destroy=false, the internal client list is retained, and these clients will see a subsequent open().
    //!        If destroy=true, the internal client list is cleared.
    //! @post In the closed state
    //! @note Provider locking rules apply (@see provider_roles_requester_locking).
    virtual void close(bool destroy=false);

    //! Create a new container which may be used to prepare to call post().
    //! This container will be owned exclusively by the caller.
    std::tr1::shared_ptr<epics::pvData::PVStructure> build();

    //! Update the cached PVStructure in this SharedPV.
    //! Only those fields marked as changed will be copied in.
    //! Makes a light-weight copy.
    //! @pre isOpen()==true
    //! @throws std::logic_error if !isOpen()
    //! @note Provider locking rules apply (@see provider_roles_requester_locking).
    void post(const epics::pvData::PVStructure& value,
              const epics::pvData::BitSet& changed);

    //! Update arguments with current value, which is the initial value from open() with accumulated post() calls.
    void fetch(epics::pvData::PVStructure& value, epics::pvData::BitSet& valid);

    //! may call Handler::onFirstConnect()
    //! @note Provider locking rules apply (@see provider_roles_requester_locking).
    virtual std::tr1::shared_ptr<epics::pvAccess::Channel> connect(
            const std::tr1::shared_ptr<epics::pvAccess::ChannelProvider>& provider,
            const std::string& channelName,
            const std::tr1::shared_ptr<epics::pvAccess::ChannelRequester>& requester);

    void setDebug(int lvl);
    int isDebug() const;

private:
    static size_t num_instances;  // reftrack instance counter

    weak_pointer internal_self; // const after build()

    mutable epicsMutex mutex;  // guards all mutable state below

    std::tr1::shared_ptr<SharedPV::Handler> handler;

    // raw pointers: each entry de-registers itself from its own dtor
    typedef std::list<SharedPut*> puts_t;
    typedef std::list<SharedRPC*> rpcs_t;
    typedef std::list<SharedMonitorFIFO*> monitors_t;
    typedef std::list<std::tr1::weak_ptr<epics::pvAccess::GetFieldRequester> > getfields_t;
    typedef std::list<SharedChannel*> channels_t;

    // non-NULL exactly while in the open state (cf. isOpen())
    std::tr1::shared_ptr<const epics::pvData::Structure> type;

    puts_t puts;
    rpcs_t rpcs;
    monitors_t monitors;
    getfields_t getfields;  // getField() requests queued while closed; consumed by open()
    channels_t channels;

    // cached value; initialized by open(), updated by post()
    std::tr1::shared_ptr<epics::pvData::PVStructure> current;
    //! mask of fields which are considered to have non-default values.
    //! Used for initial Monitor update and Get operations.
    epics::pvData::BitSet valid;

    int debugLvl;

    EPICS_NOT_COPYABLE(SharedPV)
};
//! An in-progress network operation (Put or RPC).
//! Use value(), changed() to see input data, and
//! call complete() when done handling.
struct epicsShareClass Operation {
    POINTER_DEFINITIONS(Operation);
    struct Impl;
private:
    std::tr1::shared_ptr<Impl> impl;
    friend struct SharedPut;
    friend struct SharedRPC;
    explicit Operation(const std::tr1::shared_ptr<Impl> impl);
public:
    Operation() {} //!< create empty op for later assignment

    //! pvRequest blob, may be used to modify handling.
    const epics::pvData::PVStructure& pvRequest() const;
    const epics::pvData::PVStructure& value() const; //!< Input data
    //! Applies to value().  Which fields of input data are actually valid.  Others should not be used.
    const epics::pvData::BitSet& changed() const;
    //! The name of the channel through which this request was made (eg. for logging purposes).
    std::string channelName() const;

    void complete(); //!< shorthand for successful completion w/o data (Put or RPC with void return)
    //! Complete with success or error w/o data.
    void complete(const epics::pvData::Status& sts);
    //! Successful completion with data (RPC only)
    void complete(const epics::pvData::PVStructure& value,
                  const epics::pvData::BitSet& changed);

    //! Send info message to client.  Does not complete().
    void info(const std::string&);
    //! Send warning message to client.  Does not complete().
    void warn(const std::string&);

    int isDebug() const;

    // escape hatch.  never NULL
    std::tr1::shared_ptr<epics::pvAccess::Channel> getChannel();
    // escape hatch.  may be NULL
    std::tr1::shared_ptr<epics::pvAccess::ChannelBaseRequester> getRequester();

    //! true iff this Operation is bound to an actual request (impl set)
    bool valid() const;

#if __cplusplus>=201103L
    explicit operator bool() const { return valid(); }
#else
private:
    // "safe bool" idiom for C++03: convertible to a member-fn pointer,
    // usable in conditions but not accidentally convertible to int.
    typedef bool (Operation::*bool_type)() const;
public:
    operator bool_type() const { return valid() ? &Operation::valid : 0; }
#endif
};
} // namespace pvas
//! @}
#endif // PV_SHAREDSTATE_H

View File

@@ -0,0 +1,277 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <list>
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <errlog.h>
#include <shareLib.h>
#include <pv/sharedPtr.h>
#include <pv/noDefaultMethods.h>
#define epicsExportSharedSymbols
#include "sharedstateimpl.h"
namespace pvas {
size_t SharedChannel::num_instances;

// Construct a server-side channel bound to a SharedPV.
// Registers itself in owner->channels; the entry is removed by our dtor.
SharedChannel::SharedChannel(const std::tr1::shared_ptr<SharedPV> &owner,
                             const pva::ChannelProvider::shared_pointer provider,
                             const std::string& channelName,
                             const requester_type::shared_pointer& requester)
    :owner(owner)
    ,channelName(channelName)
    ,requester(requester)
    ,provider(provider)
{
    REFTRACE_INCREMENT(num_instances);

    if(owner->debugLvl>5) {
        errlogPrintf("%s : Open channel to %s > %p\n",
                     requester->getRequesterName().c_str(),
                     channelName.c_str(),
                     this);
    }

    SharedPV::Handler::shared_pointer handler;
    {
        Guard G(owner->mutex);
        // snapshot the handler only if we are the first client
        if(owner->channels.empty())
            handler = owner->handler;
        owner->channels.push_back(this);
    }
    // notify without holding the lock (provider locking rules)
    if(handler) {
        handler->onFirstConnect(owner);
    }
}
// Un-register from the owning SharedPV and, when this was the last client,
// notify the handler (outside the lock, per provider locking rules).
//
// Fixes vs. original: removed the redundant nested Guard on owner->mutex
// (it shadowed the already-held guard of the same mutex), and the debug
// message now says "Close channel" -- the destructor reports a close,
// the matching "Open channel" message is printed by the ctor.
SharedChannel::~SharedChannel()
{
    std::tr1::shared_ptr<SharedPV::Handler> handler;
    {
        Guard G(owner->mutex);
        owner->channels.remove(this);
        if(owner->channels.empty()) {
            // we were the last client; arrange to notify the handler
            handler = owner->handler;
        }
    }
    if(handler) {
        handler->onLastDisconnect(owner);
    }

    if(owner->debugLvl>5)
    {
        pva::ChannelRequester::shared_pointer req(requester.lock());
        errlogPrintf("%s : Close channel to %s > %p\n",
                     req ? req->getRequesterName().c_str() : "<Defunct>",
                     channelName.c_str(),
                     this);
    }

    REFTRACE_DECREMENT(num_instances);
}
// destroy() is a no-op; lifetime is governed entirely by shared_ptr.
void SharedChannel::destroy() {}

std::tr1::shared_ptr<pva::ChannelProvider> SharedChannel::getProvider()
{
    pva::ChannelProvider::shared_pointer prov(provider.lock());
    return prov;
}

std::string SharedChannel::getRemoteAddress()
{
    // No transport of our own; the channel name is the best label available.
    return getChannelName();
}

std::string SharedChannel::getChannelName()
{
    return channelName;
}

std::tr1::shared_ptr<pva::ChannelRequester> SharedChannel::getChannelRequester()
{
    requester_type::shared_pointer req(requester.lock());
    return req;
}
// Reply immediately with the cached type when the PV is open;
// otherwise queue the requester until open() supplies a type.
void SharedChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField)
{
    epics::pvData::FieldConstPtr fieldType;
    {
        Guard G(owner->mutex);
        if(owner->type) {
            fieldType = owner->type;
        } else {
            owner->getfields.push_back(requester);
        }
    }
    // callback made without holding the lock
    if(fieldType)
        requester->getDone(pvd::Status(), fieldType);
}
// Create a ChannelPut operation.  The raw pointer is tracked by the owner;
// ~SharedPut removes the entry.  If the PV is already open, complete the
// connect callback immediately (outside the lock).
pva::ChannelPut::shared_pointer SharedChannel::createChannelPut(
        pva::ChannelPutRequester::shared_pointer const & requester,
        pvd::PVStructure::shared_pointer const & pvRequest)
{
    std::tr1::shared_ptr<SharedPut> op(new SharedPut(shared_from_this(), requester, pvRequest));

    pvd::StructureConstPtr curtype;
    {
        Guard G(owner->mutex);
        // entry removed by ~SharedPut
        owner->puts.push_back(op.get());
        curtype = owner->type;
    }
    if(curtype)
        requester->channelPutConnect(pvd::Status(), op, curtype);
    return op;
}
// Create a ChannelRPC operation.  Mirrors createChannelPut(): register the
// raw pointer under lock, then connect immediately if already open.
pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC(
        pva::ChannelRPCRequester::shared_pointer const & requester,
        pvd::PVStructure::shared_pointer const & pvRequest)
{
    std::tr1::shared_ptr<SharedRPC> op(new SharedRPC(shared_from_this(), requester, pvRequest));

    bool isOpen;
    {
        Guard G(owner->mutex);
        // entry removed by ~SharedRPC
        owner->rpcs.push_back(op.get());
        isOpen = !!owner->type;
    }
    if(isOpen)
        requester->channelRPCConnect(pvd::Status(), op);
    return op;
}
// Create a Monitor (subscription).  If the PV is already open, the type
// and an initial update are queued into the FIFO while locked, then
// delivered to the requester after the lock is released.
pva::Monitor::shared_pointer SharedChannel::createMonitor(
        pva::MonitorRequester::shared_pointer const & requester,
        pvd::PVStructure::shared_pointer const & pvRequest)
{
    std::tr1::shared_ptr<SharedMonitorFIFO> ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest));
    bool notify;
    {
        Guard G(owner->mutex);
        // entry removed by ~SharedMonitorFIFO
        owner->monitors.push_back(ret.get());
        notify = !!owner->type;
        if(notify) {
            ret->open(owner->type);
            // post initial update
            ret->post(*owner->current, owner->valid);
        }
    }
    // deliver queued update(s) unlocked
    if(notify)
        ret->notify();
    return ret;
}
SharedMonitorFIFO::SharedMonitorFIFO(const std::tr1::shared_ptr<SharedChannel>& channel,
                                     const requester_type::shared_pointer& requester,
                                     const pvd::PVStructure::const_shared_pointer &pvRequest)
    :pva::MonitorFIFO(requester, pvRequest)
    ,channel(channel)  // strong ref. keeps the channel (and so the SharedPV) alive while subscribed
{}

SharedMonitorFIFO::~SharedMonitorFIFO()
{
    // de-register the raw pointer added by SharedChannel::createMonitor()
    Guard G(channel->owner->mutex);
    channel->owner->monitors.remove(this);
}
// Thin pimpl wrapper: all state lives in Operation::Impl.
Operation::Operation(const std::tr1::shared_ptr<Impl> impl)
    :impl(impl)
{}

const epics::pvData::PVStructure& Operation::pvRequest() const
{
    return *impl->pvRequest;
}

const epics::pvData::PVStructure& Operation::value() const
{
    return *impl->value;
}

const epics::pvData::BitSet& Operation::changed() const
{
    return impl->changed;
}

// Channel name for logging; empty string when the channel is gone.
std::string Operation::channelName() const
{
    std::tr1::shared_ptr<epics::pvAccess::Channel> chan(impl->getChannel());
    return chan ? chan->getChannelName() : std::string();
}
// Successful completion without data (Put, or RPC with void return).
void Operation::complete()
{
    impl->complete(pvd::Status(), 0);
}

// Completion with explicit status, no data.
void Operation::complete(const epics::pvData::Status& sts)
{
    impl->complete(sts, 0);
}

// Successful completion with data (RPC only).
// NOTE(review): the 'changed' parameter is presently unused -- only the
// value is forwarded to Impl::complete().
void Operation::complete(const epics::pvData::PVStructure &value,
                         const epics::pvData::BitSet &changed)
{
    impl->complete(pvd::Status(), &value);
}

// Informational message to the client; does not complete the operation.
void Operation::info(const std::string& msg)
{
    pva::ChannelBaseRequester::shared_pointer req(impl->getRequester());
    if(req)
        req->message(msg, pvd::infoMessage);
}

// Warning message to the client; does not complete the operation.
void Operation::warn(const std::string& msg)
{
    pva::ChannelBaseRequester::shared_pointer req(impl->getRequester());
    if(req)
        req->message(msg, pvd::warningMessage);
}

int Operation::isDebug() const
{
    Guard G(impl->mutex);
    return impl->debugLvl;
}

std::tr1::shared_ptr<epics::pvAccess::Channel> Operation::getChannel()
{
    return impl->getChannel();
}

std::tr1::shared_ptr<pva::ChannelBaseRequester> Operation::getRequester()
{
    return impl->getRequester();
}

// True iff bound to an actual request (non-empty impl).
bool Operation::valid() const
{
    return !!impl;
}
// shared_ptr<Impl> deleter.  Guarantees the client always receives a
// completion: if the Handler dropped the Operation without calling
// complete(), send an "Implicit Cancel" error before deleting.
void Operation::Impl::Cleanup::operator ()(Operation::Impl* impl) {
    bool err;
    {
        Guard G(impl->mutex);
        err = !impl->done;
    }
    if(err)
        impl->complete(pvd::Status::error("Implicit Cancel"), 0);

    delete impl;
}
} // namespace pvas

View File

@@ -0,0 +1,142 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <list>
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <shareLib.h>
#include <pv/sharedPtr.h>
#include <pv/noDefaultMethods.h>
#define epicsExportSharedSymbols
#include "sharedstateimpl.h"
namespace {
// Operation::Impl for a ChannelPut.  Completion notifies the
// ChannelPutRequester with status only (a Put reply carries no data).
struct PutOP : public pvas::Operation::Impl
{
    // strong ref. keeps the SharedPut (and so the channel) alive for the op's lifetime
    const std::tr1::shared_ptr<pvas::SharedPut> op;

    PutOP(const std::tr1::shared_ptr<pvas::SharedPut>& op,
          const pvd::PVStructure::const_shared_pointer& pvRequest,
          const pvd::PVStructure::const_shared_pointer& value,
          const pvd::BitSet& changed)
        :Impl(pvRequest, value, changed)
        ,op(op)
    {}
    virtual ~PutOP() {}

    virtual pva::Channel::shared_pointer getChannel() OVERRIDE FINAL
    {
        return op->channel;
    }

    virtual pva::ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL
    {
        return op->requester.lock();
    }

    virtual void complete(const pvd::Status& sts,
                          const epics::pvData::PVStructure* value) OVERRIDE FINAL
    {
        // a Put completion must not carry a payload
        if(value)
            throw std::logic_error("Put can't complete() with data");

        {
            Guard G(mutex);
            if(done)
                throw std::logic_error("Operation already complete");
            done = true;  // complete() may only happen once
        }

        // notify unlocked
        pva::ChannelPutRequester::shared_pointer req(op->requester.lock());
        if(req)
            req->putDone(sts, op);
    }
};
}
namespace pvas {
size_t SharedPut::num_instances;

SharedPut::SharedPut(const std::tr1::shared_ptr<SharedChannel>& channel,
                     const requester_type::shared_pointer& requester,
                     const pvd::PVStructure::const_shared_pointer &pvRequest)
    :channel(channel)
    ,requester(requester)
    ,pvRequest(pvRequest)
{
    REFTRACE_INCREMENT(num_instances);
}

SharedPut::~SharedPut()
{
    // de-register the raw pointer added by SharedChannel::createChannelPut()
    Guard G(channel->owner->mutex);
    channel->owner->puts.remove(this);
    REFTRACE_DECREMENT(num_instances);
}

// lifetime is governed by shared_ptr; nothing to do
void SharedPut::destroy() {}

std::tr1::shared_ptr<pva::Channel> SharedPut::getChannel()
{
    return channel;
}

// cancel/lastRequest: no per-operation state to act on
void SharedPut::cancel() {}
void SharedPut::lastRequest() {}
// Handle a client Put: wrap the request in an Operation and hand it to
// the Handler.  The Cleanup deleter guarantees the client receives a
// completion (possibly an implicit cancel) even if the Handler never
// calls complete().
void SharedPut::put(
        pvd::PVStructure::shared_pointer const & pvPutStructure,
        pvd::BitSet::shared_pointer const & putBitSet)
{
    // snapshot the handler under lock; invoke it unlocked
    std::tr1::shared_ptr<SharedPV::Handler> handler;
    {
        Guard G(channel->owner->mutex);
        handler = channel->owner->handler;
    }

    std::tr1::shared_ptr<PutOP> impl(new PutOP(shared_from_this(), pvRequest, pvPutStructure, *putBitSet),
                                     Operation::Impl::Cleanup());

    if(handler) {
        Operation op(impl);
        handler->onPut(channel->owner, op);
    }
}
void SharedPut::get()
{
pvd::Status sts;
pvd::PVStructurePtr current;
pvd::BitSetPtr changed;
{
Guard G(channel->owner->mutex);
if(channel->owner->current) {
// clone
current = pvd::getPVDataCreate()->createPVStructure(channel->owner->current->getStructure());
current->copyUnchecked(*channel->owner->current);
changed.reset(new pvd::BitSet(channel->owner->valid));
}
}
requester_type::shared_pointer req(requester.lock());
if(!req) return;
if(!current) {
sts = pvd::Status::error("Get not possible, cache disabled");
}
req->getDone(sts, shared_from_this(), current, changed);
}
} // namespace pvas

View File

@@ -0,0 +1,312 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <list>
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <errlog.h>
#include <shareLib.h>
#include <pv/sharedPtr.h>
#include <pv/noDefaultMethods.h>
#define epicsExportSharedSymbols
#include "sharedstateimpl.h"
namespace {
struct MailboxHandler : public pvas::SharedPV::Handler {
virtual ~MailboxHandler() {}
virtual void onPut(pvas::SharedPV& self, pvas::Operation& op)
{
self.post(op.value(), op.changed());
op.info("Set!");
op.complete();
}
static std::tr1::shared_ptr<pvas::SharedPV::Handler> build() {
std::tr1::shared_ptr<MailboxHandler> ret(new MailboxHandler);
return ret;
}
};
} // namespace
namespace pvas {
size_t SharedPV::num_instances;

// All factories publish a weak self-reference (internal_self) used
// later by connect().
SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr<Handler>& handler)
{
    assert(!!handler);
    SharedPV::shared_pointer pv(new SharedPV(handler));
    pv->internal_self = pv;
    return pv;
}

SharedPV::shared_pointer SharedPV::buildReadOnly()
{
    // no handler: every Put and RPC will fail
    SharedPV::shared_pointer pv(new SharedPV(std::tr1::shared_ptr<Handler>()));
    pv->internal_self = pv;
    return pv;
}

SharedPV::shared_pointer SharedPV::buildMailbox()
{
    // MailboxHandler accepts and stores whatever clients Put
    std::tr1::shared_ptr<Handler> handler(new MailboxHandler);
    SharedPV::shared_pointer pv(new SharedPV(handler));
    pv->internal_self = pv;
    return pv;
}
SharedPV::SharedPV(const std::tr1::shared_ptr<Handler> &handler)
    :handler(handler)
    ,debugLvl(0)
{
    REFTRACE_INCREMENT(num_instances);
}

SharedPV::~SharedPV() {
    REFTRACE_DECREMENT(num_instances);
}

// Replace the Handler given to the ctor/factory.
void SharedPV::setHandler(const std::tr1::shared_ptr<Handler>& handler)
{
    Guard G(mutex);
    this->handler = handler;
}

SharedPV::Handler::shared_pointer SharedPV::getHandler() const
{
    Guard G(mutex);
    return handler;
}

bool SharedPV::isOpen() const
{
    // "open" is defined as having an associated type
    Guard G(mutex);
    return !!type;
}
// Transition closed -> open.  Takes the type, initializes the value cache,
// then completes all operations which were created while closed.  Strong
// references to clients are captured under the lock; the connect/notify
// callbacks are made after the lock is released (provider locking rules).
void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& valid)
{
    typedef std::vector<std::tr1::shared_ptr<SharedPut> > xputs_t;
    typedef std::vector<std::tr1::shared_ptr<SharedRPC> > xrpcs_t;
    typedef std::vector<std::tr1::shared_ptr<pva::MonitorFIFO> > xmonitors_t;
    typedef std::vector<std::tr1::shared_ptr<pva::GetFieldRequester> > xgetfields_t;

    const pvd::StructureConstPtr newtype(value.getStructure());

    xputs_t p_put;
    xrpcs_t p_rpc;
    xmonitors_t p_monitor;
    xgetfields_t p_getfield;
    {
        Guard I(mutex);

        if(type)
            throw std::logic_error("Already open()");

        p_put.reserve(puts.size());
        p_rpc.reserve(rpcs.size());
        p_monitor.reserve(monitors.size());
        p_getfield.reserve(getfields.size());

        // take the type and initialize the value cache
        type = value.getStructure();
        current = pvd::getPVDataCreate()->createPVStructure(newtype);
        current->copyUnchecked(value);
        this->valid = valid;

        // promote raw pointers to strong refs; entries whose owners are
        // concurrently destructing throw bad_weak_ptr and are skipped
        FOR_EACH(puts_t::const_iterator, it, end, puts) {
            try {
                p_put.push_back((*it)->shared_from_this());
            }catch(std::tr1::bad_weak_ptr&) {
                //racing destruction
            }
        }
        FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
            try {
                p_rpc.push_back((*it)->shared_from_this());
            }catch(std::tr1::bad_weak_ptr&) {}
        }
        FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
            try {
                // queue type change and initial value while still locked
                (*it)->open(newtype);
                // post initial update
                (*it)->post(*current, valid);
                p_monitor.push_back((*it)->shared_from_this());
            }catch(std::tr1::bad_weak_ptr&) {}
        }
        // consume getField
        FOR_EACH(getfields_t::iterator, it, end, getfields) {
            p_getfield.push_back(it->lock());
        }
        getfields.clear(); // consume
    }
    // unlocked: deliver the deferred connect/notify/getDone callbacks
    FOR_EACH(xputs_t::iterator, it, end, p_put) {
        SharedPut::requester_type::shared_pointer requester((*it)->requester.lock());
        if(requester) requester->channelPutConnect(pvd::Status(), *it, newtype);
    }
    FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) {
        SharedRPC::requester_type::shared_pointer requester((*it)->requester.lock());
        if(requester) requester->channelRPCConnect(pvd::Status(), *it);
    }
    FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) {
        (*it)->notify();
    }
    FOR_EACH(xgetfields_t::iterator, it, end, p_getfield) {
        if(*it) (*it)->getDone(pvd::Status(), newtype);
    }
}
void SharedPV::open(const epics::pvData::PVStructure& value)
{
    // consider all fields to have non-default values.  For users who don't keep track of this.
    open(value, pvd::BitSet().set(0));
}

void SharedPV::open(const pvd::StructureConstPtr& type)
{
    // open with a default-initialized value of the given type
    pvd::PVStructurePtr value(pvd::getPVDataCreate()->createPVStructure(type));

    open(*value);
}
// Transition open -> closed.  Requester references are captured under the
// lock; disconnect/state-change callbacks are made after the lock is
// released (provider locking rules).  With destroy=true the client lists
// are cleared so no second destroy notification can be sent.
void SharedPV::close(bool destroy)
{
    typedef std::vector<std::tr1::shared_ptr<pva::ChannelPutRequester> > xputs_t;
    typedef std::vector<std::tr1::shared_ptr<pva::ChannelRPCRequester> > xrpcs_t;
    typedef std::vector<std::tr1::shared_ptr<pva::MonitorFIFO> > xmonitors_t;
    typedef std::vector<std::tr1::shared_ptr<SharedChannel> > xchannels_t;

    xputs_t p_put;
    xrpcs_t p_rpc;
    xmonitors_t p_monitor;
    xchannels_t p_channel;
    {
        Guard I(mutex);

        if(!type)
            return;  // already closed: no-op

        p_put.reserve(puts.size());
        p_rpc.reserve(rpcs.size());
        p_monitor.reserve(monitors.size());
        p_channel.reserve(channels.size());

        FOR_EACH(puts_t::const_iterator, it, end, puts) {
            p_put.push_back((*it)->requester.lock());
        }
        FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
            p_rpc.push_back((*it)->requester.lock());
        }
        FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
            (*it)->close();
            p_monitor.push_back((*it)->shared_from_this());
        }
        FOR_EACH(channels_t::const_iterator, it, end, channels) {
            p_channel.push_back((*it)->shared_from_this());
        }

        // drop type and value cache; isOpen() becomes false
        type.reset();
        current.reset();

        if(destroy) {
            // forget about all clients, to prevent the possibility of our
            // sending a second destroy notification.
            puts.clear();
            rpcs.clear();
            monitors.clear();
            channels.clear();
        }
    }
    // unlocked: deliver disconnect/close notifications
    FOR_EACH(xputs_t::iterator, it, end, p_put) {
        if(*it) (*it)->channelDisconnect(destroy);
    }
    FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) {
        if(*it) (*it)->channelDisconnect(destroy);
    }
    FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) {
        (*it)->notify();
    }
    FOR_EACH(xchannels_t::iterator, it, end, p_channel) {
        pva::ChannelRequester::shared_pointer req((*it)->requester.lock());
        if(!req) continue;
        req->channelStateChange(*it, destroy ? pva::Channel::DESTROYED : pva::Channel::DISCONNECTED);
    }
}
pvd::PVStructure::shared_pointer SharedPV::build()
{
    Guard G(mutex);
    if(!type)
        throw std::logic_error("Can't build() before open()");
    // a fresh instance of our Structure, owned exclusively by the caller
    return pvd::getPVDataCreate()->createPVStructure(type);
}
// Merge changed fields into the value cache and queue the update to all
// subscribers.  Updates are queued while locked; requesters are notified
// after the lock is released (provider locking rules).
void SharedPV::post(const pvd::PVStructure& value,
                    const pvd::BitSet& changed)
{
    typedef std::vector<std::tr1::shared_ptr<pva::MonitorFIFO> > xmonitors_t;
    xmonitors_t p_monitor;
    {
        Guard I(mutex);

        if(!type)
            throw std::logic_error("Not open()");
        else if(*type!=*value.getStructure())
            throw std::logic_error("Type mis-match");

        if(current) {
            current->copyUnchecked(value, changed);
            valid |= changed;
        }

        p_monitor.reserve(monitors.size()); // ick, for lack of a list with thread-safe iteration

        FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
            (*it)->post(value, changed);
            p_monitor.push_back((*it)->shared_from_this());
        }
    }
    FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) {
        (*it)->notify();
    }
}
// Copy the cached value and valid-field mask out to the caller.
// @throws std::logic_error when closed, or when 'value' has a different type.
//
// Fix vs. original: the type check compared Structure shared_ptr identity
// (value.getStructure()!=type), which wrongly rejected a caller-built
// container of a structurally identical type.  Compare by value instead,
// consistent with post().
void SharedPV::fetch(epics::pvData::PVStructure& value, epics::pvData::BitSet& valid)
{
    Guard I(mutex);
    if(!type)
        throw std::logic_error("Not open()");
    else if(*value.getStructure()!=*type)
        throw std::logic_error("Types do not match");
    value.copy(*current);
    valid = this->valid;
}
std::tr1::shared_ptr<pva::Channel>
SharedPV::connect(const std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> &provider,
                  const std::string &channelName,
                  const std::tr1::shared_ptr<pva::ChannelRequester>& requester)
{
    // promote the weak self-reference published by build()
    shared_pointer self(internal_self);
    // SharedChannel ctor registers itself with us, and may call Handler::onFirstConnect()
    std::tr1::shared_ptr<SharedChannel> ret(new SharedChannel(self, provider, channelName, requester));
    return ret;
}

void SharedPV::setDebug(int lvl)
{
    Guard G(mutex);
    debugLvl = lvl;
}

int SharedPV::isDebug() const
{
    Guard G(mutex);
    return debugLvl;
}
} // namespace pvas

View File

@@ -0,0 +1,119 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#include <list>
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <shareLib.h>
#include <pv/sharedPtr.h>
#include <pv/noDefaultMethods.h>
#define epicsExportSharedSymbols
#include "sharedstateimpl.h"
namespace {
// Operation::Impl for a ChannelRPC.  Completion forwards a copy of the
// result structure, or an empty structure for a successful void reply.
struct RPCOP : public pvas::Operation::Impl
{
    // strong ref. keeps the SharedRPC (and so the channel) alive for the op's lifetime
    const std::tr1::shared_ptr<pvas::SharedRPC> op;

    RPCOP(const std::tr1::shared_ptr<pvas::SharedRPC>& op,
          const pvd::PVStructure::const_shared_pointer& pvRequest,
          const pvd::PVStructure::const_shared_pointer& value)
        :Impl(pvRequest, value, pvd::BitSet().set(0))  // all argument fields considered valid
        ,op(op)
    {}
    virtual ~RPCOP() {}

    virtual pva::Channel::shared_pointer getChannel() OVERRIDE FINAL
    {
        return op->channel;
    }

    virtual pva::ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL
    {
        return op->requester.lock();
    }

    virtual void complete(const pvd::Status& sts,
                          const epics::pvData::PVStructure* value) OVERRIDE FINAL
    {
        {
            Guard G(mutex);
            if(done)
                throw std::logic_error("Operation already complete");
            done = true;  // complete() may only happen once
        }
        epics::pvData::PVStructurePtr tosend;
        if(!sts.isSuccess()) {
            // no data for error
        } else if(value) {
            // deep copy so the reply is decoupled from the caller's buffer
            tosend = pvd::getPVDataCreate()->createPVStructure(value->getStructure());
            tosend->copyUnchecked(*value);
        } else {
            // RPC with null result.  Make empty structure
            tosend = pvd::getPVDataCreate()->createPVStructure(
                        pvd::getFieldCreate()
                        ->createFieldBuilder()
                        ->createStructure());
        }
        // notify unlocked
        pva::ChannelRPCRequester::shared_pointer req(op->requester.lock());
        if(req)
            req->requestDone(sts, op, tosend);
    }
};
}
namespace pvas {
size_t SharedRPC::num_instances;

SharedRPC::SharedRPC(const std::tr1::shared_ptr<SharedChannel>& channel,
                     const requester_type::shared_pointer& requester,
                     const pvd::PVStructure::const_shared_pointer &pvRequest)
    :channel(channel)
    ,requester(requester)
    ,pvRequest(pvRequest)
{
    REFTRACE_INCREMENT(num_instances);
}

SharedRPC::~SharedRPC() {
    // de-register the raw pointer added by SharedChannel::createChannelRPC()
    Guard G(channel->owner->mutex);
    channel->owner->rpcs.remove(this);
    REFTRACE_DECREMENT(num_instances);
}

// lifetime is governed by shared_ptr; nothing to do
void SharedRPC::destroy() {}

std::tr1::shared_ptr<pva::Channel> SharedRPC::getChannel()
{
    return channel;
}

// cancel/lastRequest: no per-operation state to act on
void SharedRPC::cancel() {}
void SharedRPC::lastRequest() {}
// Handle a client RPC: wrap the argument in an Operation and hand it to
// the Handler.  The Cleanup deleter guarantees the client receives a
// completion even if the Handler never calls complete().
void SharedRPC::request(epics::pvData::PVStructure::shared_pointer const & pvArgument)
{
    // snapshot the handler under lock; invoke it unlocked
    std::tr1::shared_ptr<SharedPV::Handler> handler;
    {
        Guard G(channel->owner->mutex);
        handler = channel->owner->handler;
    }
    std::tr1::shared_ptr<RPCOP> impl(new RPCOP(shared_from_this(), pvRequest, pvArgument),
                                     Operation::Impl::Cleanup());
    if(handler) {
        Operation op(impl);
        handler->onRPC(channel->owner, op);
    }
}
} // namespace pvas

View File

@@ -0,0 +1,149 @@
/*
* Copyright information and license terms for this software can be
* found in the file LICENSE that is included with the distribution
*/
#ifndef SHAREDSTATEIMPL_H
#define SHAREDSTATEIMPL_H
#include "pva/sharedstate.h"
#include <pv/pvAccess.h>
#include <pv/reftrack.h>
#define FOR_EACH(TYPE, IT, END, OBJ) for(TYPE IT((OBJ).begin()), END((OBJ).end()); IT != END; ++IT)
namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace pvas {
/** Server-side Channel implementation bound to a SharedPV.
 * Registers itself in SharedPV::channels on construction and removes
 * itself in the dtor.
 */
struct SharedChannel : public pva::Channel,
        public std::tr1::enable_shared_from_this<SharedChannel>
{
    static size_t num_instances;  // reftrack instance counter

    const std::tr1::shared_ptr<SharedPV> owner;  // strong ref. keeps the PV alive
    const std::string channelName;
    const requester_type::weak_pointer requester;
    const pva::ChannelProvider::weak_pointer provider;

    SharedChannel(const std::tr1::shared_ptr<SharedPV>& owner,
                  const pva::ChannelProvider::shared_pointer provider,
                  const std::string& channelName,
                  const requester_type::shared_pointer& requester);
    virtual ~SharedChannel();

    virtual void destroy() OVERRIDE FINAL;

    virtual std::tr1::shared_ptr<pva::ChannelProvider> getProvider() OVERRIDE FINAL;
    virtual std::string getRemoteAddress() OVERRIDE FINAL;
    virtual std::string getChannelName() OVERRIDE FINAL;
    virtual std::tr1::shared_ptr<pva::ChannelRequester> getChannelRequester() OVERRIDE FINAL;

    virtual void getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField) OVERRIDE FINAL;

    virtual pva::ChannelPut::shared_pointer createChannelPut(
            pva::ChannelPutRequester::shared_pointer const & requester,
            pvd::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL;

    virtual pva::ChannelRPC::shared_pointer createChannelRPC(
            pva::ChannelRPCRequester::shared_pointer const & requester,
            pvd::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL;

    virtual pva::Monitor::shared_pointer createMonitor(
            pva::MonitorRequester::shared_pointer const & requester,
            pvd::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL;
};
//! MonitorFIFO subclass which holds the channel (and so the SharedPV)
//! alive, and de-registers itself from SharedPV::monitors on destruction.
struct SharedMonitorFIFO : public pva::MonitorFIFO
{
    const std::tr1::shared_ptr<SharedChannel> channel;
    SharedMonitorFIFO(const std::tr1::shared_ptr<SharedChannel>& channel,
                      const requester_type::shared_pointer& requester,
                      const pvd::PVStructure::const_shared_pointer &pvRequest);
    virtual ~SharedMonitorFIFO();
};
//! Server-side ChannelPut bound to a SharedPV.
//! Registered (as raw pointer) in SharedPV::puts; removes itself in the dtor.
struct SharedPut : public pva::ChannelPut,
        public std::tr1::enable_shared_from_this<SharedPut>
{
    const std::tr1::shared_ptr<SharedChannel> channel;  // strong ref. keeps the channel alive
    const requester_type::weak_pointer requester;
    const pvd::PVStructure::const_shared_pointer pvRequest;

    static size_t num_instances;  // reftrack instance counter

    SharedPut(const std::tr1::shared_ptr<SharedChannel>& channel,
              const requester_type::shared_pointer& requester,
              const pvd::PVStructure::const_shared_pointer &pvRequest);
    virtual ~SharedPut();

    virtual void destroy() OVERRIDE FINAL;
    virtual std::tr1::shared_ptr<pva::Channel> getChannel() OVERRIDE FINAL;
    virtual void cancel() OVERRIDE FINAL;
    virtual void lastRequest() OVERRIDE FINAL;

    virtual void put(
            epics::pvData::PVStructure::shared_pointer const & pvPutStructure,
            epics::pvData::BitSet::shared_pointer const & putBitSet) OVERRIDE FINAL;

    virtual void get() OVERRIDE FINAL;
};
//! Server-side ChannelRPC bound to a SharedPV.
//! Registered (as raw pointer) in SharedPV::rpcs; removes itself in the dtor.
struct SharedRPC : public pva::ChannelRPC,
        public std::tr1::enable_shared_from_this<SharedRPC>
{
    const std::tr1::shared_ptr<SharedChannel> channel;  // strong ref. keeps the channel alive
    const requester_type::weak_pointer requester;
    const pvd::PVStructure::const_shared_pointer pvRequest;

    static size_t num_instances;  // reftrack instance counter

    SharedRPC(const std::tr1::shared_ptr<SharedChannel>& channel,
              const requester_type::shared_pointer& requester,
              const pvd::PVStructure::const_shared_pointer &pvRequest);
    virtual ~SharedRPC();

    virtual void destroy() OVERRIDE FINAL;
    virtual std::tr1::shared_ptr<pva::Channel> getChannel() OVERRIDE FINAL;
    virtual void cancel() OVERRIDE FINAL;
    virtual void lastRequest() OVERRIDE FINAL;

    virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) OVERRIDE FINAL;
};
/** Shared base of in-progress Put/RPC operations.
 * Held via shared_ptr<Impl> with the Cleanup deleter, which sends an
 * "Implicit Cancel" completion if the handler never calls complete().
 */
struct Operation::Impl
{
    // NOTE(review): no out-of-line definition is visible in this commit;
    // harmless unless referenced -- confirm before using REFTRACE with it.
    static size_t num_instances;

    epicsMutex mutex;  // guards 'done' and 'debugLvl'

    const pvd::PVStructure::const_shared_pointer pvRequest, value;
    const pvd::BitSet changed;

    bool done;     // set exactly once by complete()
    int debugLvl;

    Impl(const pvd::PVStructure::const_shared_pointer& pvRequest,
         const pvd::PVStructure::const_shared_pointer& value,
         const pvd::BitSet& changed,
         int debugLvl = 0)
        :pvRequest(pvRequest), value(value), changed(changed), done(false), debugLvl(debugLvl)
    {}
    virtual ~Impl() {}

    virtual pva::Channel::shared_pointer getChannel() =0;
    virtual pva::ChannelBaseRequester::shared_pointer getRequester() =0;

    //! Deliver final status (and, for RPC, optional data) to the client.
    virtual void complete(const pvd::Status& sts,
                          const epics::pvData::PVStructure* value) =0;

    //! shared_ptr deleter: implicit-cancel if !done, then delete.
    struct Cleanup {
        void operator()(Impl*);
    };
};
} // namespace pvas
#endif // SHAREDSTATEIMPL_H