diff --git a/examples/Makefile b/examples/Makefile index 1d0b871..213d459 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -16,6 +16,15 @@ monitorme_SRCS = monitorme.cpp TESTPROD_HOST += spamme spamme_SRCS = spamme.cpp +TESTPROD_HOST += mailbox +mailbox_SRCS += mailbox.cpp + +TESTPROD_HOST += epicschat +epicschat_SRCS += epicschat.cpp + +TESTPROD_HOST += lazycounter +lazycounter_SRCS += lazycounter.cpp + TESTPROD_HOST += miniget miniget_SRCS = miniget.cpp diff --git a/examples/epicschat.cpp b/examples/epicschat.cpp new file mode 100644 index 0000000..950799b --- /dev/null +++ b/examples/epicschat.cpp @@ -0,0 +1,200 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include + +#include +#include + +#if !defined(_WIN32) +#include +#define USE_SIGNAL +#endif + +#include +#include +#include + +#include +#include +#include +#include + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +namespace { + +epicsEvent done; + +#ifdef USE_SIGNAL +void alldone(int num) +{ + (void)num; + done.signal(); +} +#endif + +static pvd::StructureConstPtr string_type(pvd::getFieldCreate()->createFieldBuilder() + ->add("value", pvd::pvString) + ->createStructure()); + +struct ChatHandler : public pvas::SharedPV::Handler +{ + POINTER_DEFINITIONS(ChatHandler); + virtual ~ChatHandler() { + printf("Cleanup Room\n"); + } + virtual void onLastDisconnect(pvas::SharedPV& self) { + printf("Close Room %p\n", &self); + } + virtual void onPut(pvas::SharedPV& self, pvas::Operation& op) { + pva::ChannelRequester::shared_pointer req(op.getChannel()->getChannelRequester()); + std::ostringstream strm; + + if(req) { + strm<getRequesterName()<<" says "; + } else { + op.complete(pvd::Status::error("Defuct Put")); + return; + } + + strm<("value")->get(); + + pvd::PVStructurePtr replacement(pvd::getPVDataCreate()->createPVStructure(string_type)); + + replacement->getSubFieldT("value")->put(strm.str()); + + self.post(*replacement, op.changed()); + op.complete(); + } +}; + +struct RoomHandler : public pvas::DynamicProvider::Handler, + public std::tr1::enable_shared_from_this +{ + POINTER_DEFINITIONS(RoomHandler); + + const std::string prefix; + + mutable epicsMutex mutex; + + typedef std::map rooms_t; + rooms_t rooms; + + RoomHandler(const std::string& prefix) :prefix(prefix) {} + virtual ~RoomHandler() {} + + virtual void hasChannels(pvas::DynamicProvider::search_type& names) OVERRIDE FINAL { + for(pvas::DynamicProvider::search_type::iterator it(names.begin()), end(names.end()); + it != end; ++it) + { + if(it->name().find(prefix)==0) + it->claim(); + } + } + + virtual std::tr1::shared_ptr createChannel(const std::tr1::shared_ptr& provider, + const std::string& name, + const std::tr1::shared_ptr& requester) OVERRIDE FINAL + { + pva::Channel::shared_pointer ret; + + pvas::SharedPV::shared_pointer pv; + bool created = false; + if(name.find(prefix)==0) + { + Guard G(mutex); + + rooms_t::iterator it(rooms.find(name)); + if(it!=rooms.end()) { + // re-use existing? + pv = it->second.lock(); + } + + // rather than deal with wrapped shared_ptr to remove PVs + // as they are destroyed, just sweep each time a new channel is created + for(rooms_t::iterator next(rooms.begin()), end(rooms.end()); next!=end;) { + rooms_t::iterator cur(next++); + if(cur->second.expired()) + rooms.erase(cur); + } + + if(!pv) { + // nope + ChatHandler::shared_pointer handler(new ChatHandler); + pv = pvas::SharedPV::build(handler); + + rooms[name] = pv; + created = true; + } + + } + // unlock + + if(pv) { + if(created) { + pv->open(string_type); + + // set a non-default initial value so that if we are connecting for + // a get, then there will be something to be got. + pvd::PVStructurePtr initial(pvd::getPVDataCreate()->createPVStructure(string_type)); + pvd::PVStringPtr value(initial->getSubFieldT("value")); + value->put("Created!"); + + pv->post(*initial, pvd::BitSet().set(value->getFieldOffset())); + printf("New Room: '%s' for %s as %p\n", name.c_str(), requester->getRequesterName().c_str(), pv.get()); + } else { + printf("Attach Room: '%s' for %s as %p\n", name.c_str(), requester->getRequesterName().c_str(), pv.get()); + } + + ret = pv->connect(provider, name, requester); + } else { + // mis-matched prefix + } + + return ret; + } +}; + +}//namespace + +int main(int argc, char *argv[]) +{ + try { + if(argc<=1) { + fprintf(stderr, "Usage: %s ", argv[0]); + return 1; + } + + RoomHandler::shared_pointer handler(new RoomHandler(argv[1])); + + pvas::DynamicProvider provider("chat", handler); + + pva::ServerContext::shared_pointer server(pva::ServerContext::create( + pva::ServerContext::Config() + // use default config from environment + .provider(provider.provider()) + )); + +#ifdef USE_SIGNAL + signal(SIGINT, alldone); + signal(SIGTERM, alldone); + signal(SIGQUIT, alldone); +#endif + server->printInfo(); + + done.wait(); + + } catch(std::exception& e){ + std::cerr<<"Error: "< + +#include +#include +#include + +#if !defined(_WIN32) +#include +#define USE_SIGNAL +#endif + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +namespace { + +epicsEvent done; + +#ifdef USE_SIGNAL +void alldone(int num) +{ + (void)num; + done.signal(); +} +#endif + +// for demonstration purposes, we will switch between two different types. + +static pvd::StructureConstPtr int_type(pvd::getFieldCreate()->createFieldBuilder() + ->add("value", pvd::pvULong) + ->createStructure()); + +static pvd::StructureConstPtr flt_type(pvd::getFieldCreate()->createFieldBuilder() + ->add("value", pvd::pvDouble) + ->createStructure()); + +struct Counter : public pvas::SharedPV::Handler, + public pvd::TimerCallback, + public pva::Destroyable +{ + POINTER_DEFINITIONS(Counter); + + // our name, internally only for logging. + // The searchable channel name is given to the StaticProvider + const std::string name; + const pvas::SharedPV::weak_pointer pv; + pvd::Timer& timer_queue; + + // const after build() + weak_pointer internal_self; + + mutable epicsMutex mutex; + + bool queued; // are we in the Timer queue? + + pvd::uint64 count; + bool typesel; + pvd::PVStructurePtr scratch; + pvd::PVScalarPtr scratch_value; + + static Counter::shared_pointer build(const pvas::SharedPV::shared_pointer& pv, + const std::string& name, + pvd::Timer& timer_queue) { + Counter::shared_pointer internal(new Counter(pv, name, timer_queue)), + external(internal.get(), pva::Destroyable::cleaner(internal)); + // we give out internal ref (to Timer) + internal->internal_self = internal; + // SharedPV keeps us alive. + // destroy() is called when SharedPV is destroyed (or Handler is replace) + pv->setHandler(external); + return external; + } + + Counter(const pvas::SharedPV::shared_pointer& pv, const std::string& name, pvd::Timer& timer_queue) + :name(name) + ,pv(pv) + ,timer_queue(timer_queue) + ,queued(false) + ,count(0u) + ,typesel(false) + {} + virtual ~Counter() { + printf("%s: destroy\n", name.c_str()); + } + + virtual void destroy() OVERRIDE FINAL { + + { + Guard G(mutex); + if(!queued) return; + queued = false; + } + printf("%s: shutdown\n", name.c_str()); + timer_queue.cancel(shared_pointer(internal_self)); + } + + // when we go from zero clients connected to more than one client connected. + virtual void onFirstConnect(const pvas::SharedPV::shared_pointer& pv) OVERRIDE FINAL { + { + Guard G(mutex); + assert(!queued); + queued = true; + } + printf("%s: starting\n", name.c_str()); + // timer first expires after 1 second, then again every second. + // so any operation (including pvinfo) will take 1 second. + timer_queue.schedulePeriodic(shared_pointer(internal_self), 1.0, 1.0); + } + + // timer expires + virtual void callback() OVERRIDE FINAL { + bool open; + pvd::uint64 next; + pvd::PVStructurePtr top; + pvd::BitSet vmask; + { + Guard G(mutex); + if(!queued) return; + + open = !scratch; + if(open) { + // first expiration after onFirstConnect() + // select type. + pvd::StructureConstPtr type = typesel ? int_type : flt_type; + typesel = !typesel; + + scratch = pvd::getPVDataCreate()->createPVStructure(type); + scratch_value = scratch->getSubFieldT("value"); + } + + // store counter value + next = count++; + scratch_value->putFrom(next); + vmask.set(scratch_value->getFieldOffset()); + + // We will use the PVStructure when the lock is not held. + // This is safe as it is only modified from this (Timer) + // thread. + top = scratch; + } + + pvas::SharedPV::shared_pointer pv(this->pv); + + if(open) { + // go from closed -> open. + // provide initial value (and new type) + printf("%s: open %llu\n", name.c_str(), (unsigned long long)next); + pv->open(*top, vmask); + } else { + // post update + printf("%s: tick %llu\n", name.c_str(), (unsigned long long)next); + pv->post(*top, vmask); + } + } + + virtual void timerStopped() OVERRIDE FINAL {} + + // when we go from 1 client connected to zero clients connected. + virtual void onLastDisconnect(const pvas::SharedPV::shared_pointer& pv) OVERRIDE FINAL { + bool close; + bool cancel; + { + Guard G(mutex); + cancel = queued; + queued = false; + close = !!scratch; + + scratch.reset(); + scratch_value.reset(); + } + // !close implies only all clients disconnect before timer expires the first time + if(close) { + printf("%s: close\n", name.c_str()); + pv->close(); + } + if(cancel) { + timer_queue.cancel(shared_pointer(internal_self)); + } + printf("%s: stopping\n", name.c_str()); + } +}; + +} //namespace + +int main(int argc, char *argv[]) +{ + try { + if(argc<=1) { + fprintf(stderr, "Usage: %s ...\n", argv[0]); + return 1; + } + + pvd::Timer timer_queue("counters", (pvd::ThreadPriority)epicsThreadPriorityMedium); + + pvas::StaticProvider provider("counters"); // provider name "counters" is arbitrary + + for(int i=1; iprintInfo(); + + printf("Running with counters\n"); + + done.wait(); + + timer_queue.close(); // joins timer worker + + } catch(std::exception& e){ + std::cerr<<"Error: "< + +#if !defined(_WIN32) +#include +#define USE_SIGNAL +#endif + +#include + +#include +#include +#include +#include + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + +namespace { + +epicsEvent done; + +#ifdef USE_SIGNAL +void alldone(int num) +{ + (void)num; + done.signal(); +} +#endif + +static pvd::StructureConstPtr string_type(pvd::getFieldCreate()->createFieldBuilder() + ->add("value", pvd::pvString) + ->createStructure()); + +static pvd::StructureConstPtr int_type(pvd::getFieldCreate()->createFieldBuilder() + ->add("value", pvd::pvInt) + ->createStructure()); + +static pvd::StructureConstPtr real_type(pvd::getFieldCreate()->createFieldBuilder() + ->add("value", pvd::pvDouble) + ->createStructure()); + +}//namespace + +int main(int argc, char *argv[]) +{ + try { + if(argc<=1) { + fprintf(stderr, "Usage: %s ...\n type: string, int, real", argv[0]); + return 1; + } + + // container for PVs + pvas::StaticProvider provider("mailbox"); // provider name "mailbox" is arbitrary + + for(int i=1; iopen(string_type); + } else if(type=="int") { + pv->open(int_type); + } else if(type=="real") { + pv->open(real_type); + } else { + fprintf(stderr, "Unknown type '%s'\n", type.c_str()); + return 1; + } + + // add to container + provider.add(argv[1], pv); + } + + // create and run network server + pva::ServerContext::shared_pointer server(pva::ServerContext::create( + pva::ServerContext::Config() + // use default config from environment + .provider(provider.provider()) + )); + +#ifdef USE_SIGNAL + signal(SIGINT, alldone); + signal(SIGTERM, alldone); + signal(SIGQUIT, alldone); +#endif + server->printInfo(); + + printf("Running with mailbox '%s'\n", argv[1]); + + done.wait(); + + } catch(std::exception& e){ + std::cerr<<"Error: "< #include #include +#include using namespace epics::pvData; using std::string; @@ -225,6 +226,9 @@ void providerRegInit(void*) registerRefCounter("ResponseHandler (ABC)", &ResponseHandler::num_instances); registerRefCounter("MonitorFIFO", &MonitorFIFO::num_instances); pvas::registerRefTrackServer(); + registerRefCounter("pvas::SharedChannel", &pvas::SharedChannel::num_instances); + registerRefCounter("pvas::SharedPut", &pvas::SharedPut::num_instances); + registerRefCounter("pvas::SharedRPC", &pvas::SharedRPC::num_instances); } ChannelProviderRegistry::shared_pointer ChannelProviderRegistry::clients() diff --git a/src/server/Makefile b/src/server/Makefile index 7b0195e..4a5fd93 100644 --- a/src/server/Makefile +++ b/src/server/Makefile @@ -5,6 +5,7 @@ SRC_DIRS += $(PVACCESS_SRC)/server INC += pv/serverContext.h INC += pv/beaconServerStatusProvider.h INC += pva/server.h +INC += pva/sharedstate.h pvAccess_SRCS += responseHandlers.cpp pvAccess_SRCS += serverContext.cpp @@ -13,3 +14,7 @@ pvAccess_SRCS += baseChannelRequester.cpp pvAccess_SRCS += beaconEmitter.cpp pvAccess_SRCS += beaconServerStatusProvider.cpp pvAccess_SRCS += server.cpp +pvAccess_SRCS += sharedstate_pv.cpp +pvAccess_SRCS += sharedstate_channel.cpp +pvAccess_SRCS += sharedstate_rpc.cpp +pvAccess_SRCS += sharedstate_put.cpp diff --git a/src/server/pva/sharedstate.h b/src/server/pva/sharedstate.h new file mode 100644 index 0000000..4b9b5f7 --- /dev/null +++ b/src/server/pva/sharedstate.h @@ -0,0 +1,256 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ +#ifndef PV_SHAREDSTATE_H +#define PV_SHAREDSTATE_H + +#include +#include + +#include +#include +#include +#include + +#include + +namespace epics{namespace pvData{ +class Structure; +class PVStructure; +class BitSet; +class Status; +}} // epics::pvData +namespace epics{namespace pvAccess{ +class ChannelProvider; +class Channel; +class ChannelRequester; +class ChannelBaseRequester; +class GetFieldRequester; +}} // epics::pvAccess + +namespace pvas { + +struct SharedChannel; +struct SharedMonitorFIFO; +struct SharedPut; +struct SharedRPC; + +struct Operation; + +/** @addtogroup pvas + * @{ + */ + +/** A Shared State Process Variable (PV) + * + * "Shared" in the sense that all clients/subscribers interact with the + * same PVStructure. + * + * @warning For the purposes of locking, this class is an Operation (see @ref provider_roles_requester_locking). + * eg. no locks may be held when calling post(), open(), close(), or connect(). + * + * This class contains a cached PVStructure, which is updated by post(), + * also a list of subscribing clients and in-progress network Operations. + * + * On construction a SharedPV is in a "disconnected" state. + * It has no associated PVStructure (or Structure). No type. + * A type is associated via the open() method. + * After it has been open()'d. Calls to post() may be made. + * Calling close() will close all currently opened client channels. + * + * Client channels, and operations on them, may be initiated at any time (via connect()). + * However, operations will not be fully created until open() is called. + * + * @note A SharedPV does not have a name. Name(s) are associated with a SharedPV + * By a Provider (StaticProvider, DynamicProvider, or any epics::pvAccess::ChannelProvider). + * These channel names may be seen via connect() + * + * @see @ref pvas_sharedptr + */ +class epicsShareClass SharedPV + : public pvas::StaticProvider::ChannelBuilder +{ + friend struct SharedChannel; + friend struct SharedMonitorFIFO; + friend struct SharedPut; + friend struct SharedRPC; +public: + POINTER_DEFINITIONS(SharedPV); + /** Callbacks associated with a SharedPV. + * + * @note For the purposes of locking, this class is an Requester (see @ref provider_roles_requester_locking) + */ + struct epicsShareClass Handler { + POINTER_DEFINITIONS(Handler); + virtual ~Handler() {} + virtual void onFirstConnect(const SharedPV::shared_pointer& pv) {} + //! Called when the last client disconnects. May close() + virtual void onLastDisconnect(const SharedPV::shared_pointer& pv) {} + //! Client requests Put + virtual void onPut(const SharedPV::shared_pointer& pv, Operation& op) {} + //! Client requests RPC + virtual void onRPC(const SharedPV::shared_pointer& pv, Operation& op) {} + }; + + /** Allocate a new PV in the closed state. + * @param handler Our callbacks. May be NULL. Stored internally as a shared_ptr<> + * @post In the closed state + */ + static shared_pointer build(const std::tr1::shared_ptr& handler); + //! A SharedPV which fails all Put and RPC operations. + static shared_pointer buildReadOnly(); + //! A SharedPV which accepts all Put operations, and fails all RPC operations. + static shared_pointer buildMailbox(); +private: + explicit SharedPV(const std::tr1::shared_ptr& handler); +public: + virtual ~SharedPV(); + + //! Replace Handler given with ctor + void setHandler(const std::tr1::shared_ptr& handler); + Handler::shared_pointer getHandler() const; + + //! test open-ness. cf. open() and close() + bool isOpen() const; + + //! Shorthand for @code open(value, pvd::BitSet().set(0)) @endcode + void open(const epics::pvData::PVStructure& value); + + //! Begin allowing clients to connect. + //! @param value The initial value of this PV. (any pending Get operation will complete this this) + //! @param valid Only these marked fields are considered to have non-default values. + //! @throws std::logic_error if not in the closed state. + //! @post In the opened state + //! @note Provider locking rules apply (@see provider_roles_requester_locking). + void open(const epics::pvData::PVStructure& value, const epics::pvData::BitSet& valid); + + //! Shorthand for @code open(*pvd::getPVDataCreate()->createPVStructure(type), pvd::BitSet().set(0)) @endcode + void open(const epics::pvData::StructureConstPtr& type); + + //! Force any clients to disconnect, and prevent re-connection + //! @param destroy Indicate whether this close() is permanent for clients. + //! If destroy=false, the internal client list is retained, and these clients will see a subsequent open(). + //! If destory=true, the internal client list is cleared. + //! @post In the closed state + //! @note Provider locking rules apply (@see provider_roles_requester_locking). + virtual void close(bool destroy=false); + + //! Create a new container which may be used to prepare to call post(). + //! This container will be owned exclusively by the caller. + std::tr1::shared_ptr build(); + + //! Update the cached PVStructure in this SharedPV. + //! Only those fields marked as changed will be copied in. + //! Makes a light-weight copy. + //! @pre isOpen()==true + //! @throws std::logic_error if !isOpen() + //! @note Provider locking rules apply (@see provider_roles_requester_locking). + void post(const epics::pvData::PVStructure& value, + const epics::pvData::BitSet& changed); + + //! Update arguments with current value, which is the initial value from open() with accumulated post() calls. + void fetch(epics::pvData::PVStructure& value, epics::pvData::BitSet& valid); + + //! may call Handler::onFirstConnect() + //! @note Provider locking rules apply (@see provider_roles_requester_locking). + virtual std::tr1::shared_ptr connect( + const std::tr1::shared_ptr& provider, + const std::string& channelName, + const std::tr1::shared_ptr& requester); + + void setDebug(int lvl); + int isDebug() const; + +private: + static size_t num_instances; + + weak_pointer internal_self; // const after build() + + mutable epicsMutex mutex; + + std::tr1::shared_ptr handler; + + typedef std::list puts_t; + typedef std::list rpcs_t; + typedef std::list monitors_t; + typedef std::list > getfields_t; + typedef std::list channels_t; + + std::tr1::shared_ptr type; + + puts_t puts; + rpcs_t rpcs; + monitors_t monitors; + getfields_t getfields; + channels_t channels; + + std::tr1::shared_ptr current; + //! mask of fields which are considered to have non-default values. + //! Used for initial Monitor update and Get operations. + epics::pvData::BitSet valid; + + int debugLvl; + + EPICS_NOT_COPYABLE(SharedPV) +}; + +//! An in-progress network operation (Put or RPC). +//! Use value(), changed() to see input data, and +//! call complete() when done handling. +struct epicsShareClass Operation { + POINTER_DEFINITIONS(Operation); + struct Impl; +private: + std::tr1::shared_ptr impl; + + friend struct SharedPut; + friend struct SharedRPC; + explicit Operation(const std::tr1::shared_ptr impl); +public: + Operation() {} //!< create empty op for later assignment + + //! pvRequest blob, may be used to modify handling. + const epics::pvData::PVStructure& pvRequest() const; + const epics::pvData::PVStructure& value() const; //!< Input data + //! Applies to value(). Which fields of input data are actual valid. Others should not be used. + const epics::pvData::BitSet& changed() const; + //! The name of the channel through which this request was made (eg. for logging purposes). + std::string channelName() const; + + void complete(); //!< shorthand for successful completion w/o data (Put or RPC with void return) + //! Complete with success or error w/o data. + void complete(const epics::pvData::Status& sts); + //! Sucessful completion with data (RPC only) + void complete(const epics::pvData::PVStructure& value, + const epics::pvData::BitSet& changed); + + //! Send info message to client. Does not complete(). + void info(const std::string&); + //! Send warning message to client. Does not complete(). + void warn(const std::string&); + + int isDebug() const; + + // escape hatch. never NULL + std::tr1::shared_ptr getChannel(); + // escape hatch. may be NULL + std::tr1::shared_ptr getRequester(); + + bool valid() const; + +#if __cplusplus>=201103L + explicit operator bool() const { return valid(); } +#else +private: + typedef bool (Operation::*bool_type)() const; +public: + operator bool_type() const { return valid() ? &Operation::valid : 0; } +#endif +}; + +} // namespace pvas + +//! @} + +#endif // PV_SHAREDSTATE_H diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp new file mode 100644 index 0000000..cf12656 --- /dev/null +++ b/src/server/sharedstate_channel.cpp @@ -0,0 +1,277 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include + +#include +#include +#include + +#include +#include +#include + +#define epicsExportSharedSymbols +#include "sharedstateimpl.h" + +namespace pvas { + +size_t SharedChannel::num_instances; + + +SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, + const pva::ChannelProvider::shared_pointer provider, + const std::string& channelName, + const requester_type::shared_pointer& requester) + :owner(owner) + ,channelName(channelName) + ,requester(requester) + ,provider(provider) +{ + REFTRACE_INCREMENT(num_instances); + + if(owner->debugLvl>5) { + errlogPrintf("%s : Open channel to %s > %p\n", + requester->getRequesterName().c_str(), + channelName.c_str(), + this); + } + + SharedPV::Handler::shared_pointer handler; + { + Guard G(owner->mutex); + if(owner->channels.empty()) + handler = owner->handler; + owner->channels.push_back(this); + } + if(handler) { + handler->onFirstConnect(owner); + } +} + +SharedChannel::~SharedChannel() +{ + std::tr1::shared_ptr handler; + { + Guard G(owner->mutex); + owner->channels.remove(this); + if(owner->channels.empty()) { + Guard G(owner->mutex); + handler = owner->handler; + } + } + if(handler) { + handler->onLastDisconnect(owner); + } + if(owner->debugLvl>5) + { + pva::ChannelRequester::shared_pointer req(requester.lock()); + errlogPrintf("%s : Open channel to %s > %p\n", + req ? req->getRequesterName().c_str() : "", + channelName.c_str(), + this); + } + + REFTRACE_DECREMENT(num_instances); +} + +void SharedChannel::destroy() {} + +std::tr1::shared_ptr SharedChannel::getProvider() +{ + return provider.lock(); +} + +std::string SharedChannel::getRemoteAddress() +{ + return getChannelName(); // for lack of anything better to do... +} + +std::string SharedChannel::getChannelName() +{ + return channelName; +} + +std::tr1::shared_ptr SharedChannel::getChannelRequester() +{ + return requester.lock(); +} + +void SharedChannel::getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField) +{ + epics::pvData::FieldConstPtr desc; + { + Guard G(owner->mutex); + if(owner->type) + desc = owner->type; + else + owner->getfields.push_back(requester); + } + if(desc) + requester->getDone(pvd::Status(), desc); +} + +pva::ChannelPut::shared_pointer SharedChannel::createChannelPut( + pva::ChannelPutRequester::shared_pointer const & requester, + pvd::PVStructure::shared_pointer const & pvRequest) +{ + std::tr1::shared_ptr ret(new SharedPut(shared_from_this(), requester, pvRequest)); + + pvd::StructureConstPtr type; + { + Guard G(owner->mutex); + // ~SharedPut removes + owner->puts.push_back(ret.get()); + type = owner->type; + } + if(type) + requester->channelPutConnect(pvd::Status(), ret, type); + return ret; +} + +pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC( + pva::ChannelRPCRequester::shared_pointer const & requester, + pvd::PVStructure::shared_pointer const & pvRequest) +{ + std::tr1::shared_ptr ret(new SharedRPC(shared_from_this(), requester, pvRequest)); + bool opened; + { + Guard G(owner->mutex); + owner->rpcs.push_back(ret.get()); + opened = !!owner->type; + } + if(opened) + requester->channelRPCConnect(pvd::Status(), ret); + return ret; +} + +pva::Monitor::shared_pointer SharedChannel::createMonitor( + pva::MonitorRequester::shared_pointer const & requester, + pvd::PVStructure::shared_pointer const & pvRequest) +{ + std::tr1::shared_ptr ret(new SharedMonitorFIFO(shared_from_this(), requester, pvRequest)); + bool notify; + { + Guard G(owner->mutex); + owner->monitors.push_back(ret.get()); + notify = !!owner->type; + if(notify) { + ret->open(owner->type); + // post initial update + ret->post(*owner->current, owner->valid); + } + } + if(notify) + ret->notify(); + return ret; +} + + +SharedMonitorFIFO::SharedMonitorFIFO(const std::tr1::shared_ptr& channel, + const requester_type::shared_pointer& requester, + const pvd::PVStructure::const_shared_pointer &pvRequest) + :pva::MonitorFIFO(requester, pvRequest) + ,channel(channel) +{} + +SharedMonitorFIFO::~SharedMonitorFIFO() +{ + Guard G(channel->owner->mutex); + channel->owner->monitors.remove(this); +} + +Operation::Operation(const std::tr1::shared_ptr impl) + :impl(impl) +{} + +const epics::pvData::PVStructure& Operation::pvRequest() const +{ + return *impl->pvRequest; +} + +const epics::pvData::PVStructure& Operation::value() const +{ + return *impl->value; +} + +const epics::pvData::BitSet& Operation::changed() const +{ + return impl->changed; +} + +std::string Operation::channelName() const +{ + std::string ret; + std::tr1::shared_ptr chan(impl->getChannel()); + if(chan) { + ret = chan->getChannelName(); + } + return ret; +} + +void Operation::complete() +{ + impl->complete(pvd::Status(), 0); +} + +void Operation::complete(const epics::pvData::Status& sts) +{ + impl->complete(sts, 0); +} + +void Operation::complete(const epics::pvData::PVStructure &value, + const epics::pvData::BitSet &changed) +{ + impl->complete(pvd::Status(), &value); +} + +void Operation::info(const std::string& msg) +{ + pva::ChannelBaseRequester::shared_pointer req(impl->getRequester()); + if(req) + req->message(msg, pvd::infoMessage); +} + +void Operation::warn(const std::string& msg) +{ + pva::ChannelBaseRequester::shared_pointer req(impl->getRequester()); + if(req) + req->message(msg, pvd::warningMessage); +} + +int Operation::isDebug() const +{ + Guard G(impl->mutex); + return impl->debugLvl; +} + +std::tr1::shared_ptr Operation::getChannel() +{ + return impl->getChannel(); +} + +std::tr1::shared_ptr Operation::getRequester() +{ + return impl->getRequester(); +} + +bool Operation::valid() const +{ + return !!impl; +} + +void Operation::Impl::Cleanup::operator ()(Operation::Impl* impl) { + bool err; + { + Guard G(impl->mutex); + err = !impl->done; + } + if(err) + impl->complete(pvd::Status::error("Implicit Cancel"), 0); + + delete impl; +} + +} // namespace pvas diff --git a/src/server/sharedstate_put.cpp b/src/server/sharedstate_put.cpp new file mode 100644 index 0000000..c9b9efc --- /dev/null +++ b/src/server/sharedstate_put.cpp @@ -0,0 +1,142 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include + +#include +#include + +#include +#include +#include + +#define epicsExportSharedSymbols +#include "sharedstateimpl.h" + +namespace { +struct PutOP : public pvas::Operation::Impl +{ + const std::tr1::shared_ptr op; + + PutOP(const std::tr1::shared_ptr& op, + const pvd::PVStructure::const_shared_pointer& pvRequest, + const pvd::PVStructure::const_shared_pointer& value, + const pvd::BitSet& changed) + :Impl(pvRequest, value, changed) + ,op(op) + {} + virtual ~PutOP() {} + + virtual pva::Channel::shared_pointer getChannel() OVERRIDE FINAL + { + return op->channel; + } + + virtual pva::ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL + { + return op->requester.lock(); + } + + virtual void complete(const pvd::Status& sts, + const epics::pvData::PVStructure* value) OVERRIDE FINAL + { + if(value) + throw std::logic_error("Put can't complete() with data"); + + { + Guard G(mutex); + if(done) + throw std::logic_error("Operation already complete"); + done = true; + } + + pva::ChannelPutRequester::shared_pointer req(op->requester.lock()); + if(req) + req->putDone(sts, op); + } +}; +} + + +namespace pvas { + +size_t SharedPut::num_instances; + +SharedPut::SharedPut(const std::tr1::shared_ptr& channel, + const requester_type::shared_pointer& requester, + const pvd::PVStructure::const_shared_pointer &pvRequest) + :channel(channel) + ,requester(requester) + ,pvRequest(pvRequest) +{ + REFTRACE_INCREMENT(num_instances); +} + +SharedPut::~SharedPut() +{ + Guard G(channel->owner->mutex); + channel->owner->puts.remove(this); + REFTRACE_DECREMENT(num_instances); +} + +void SharedPut::destroy() {} + +std::tr1::shared_ptr SharedPut::getChannel() +{ + return channel; +} + +void SharedPut::cancel() {} + +void SharedPut::lastRequest() {} + +void SharedPut::put( + pvd::PVStructure::shared_pointer const & pvPutStructure, + pvd::BitSet::shared_pointer const & putBitSet) +{ + std::tr1::shared_ptr handler; + { + Guard G(channel->owner->mutex); + handler = channel->owner->handler; + } + + std::tr1::shared_ptr impl(new PutOP(shared_from_this(), pvRequest, pvPutStructure, *putBitSet), + Operation::Impl::Cleanup()); + + if(handler) { + Operation op(impl); + handler->onPut(channel->owner, op); + } +} + +void SharedPut::get() +{ + + pvd::Status sts; + pvd::PVStructurePtr current; + pvd::BitSetPtr changed; + { + Guard G(channel->owner->mutex); + + if(channel->owner->current) { + // clone + current = pvd::getPVDataCreate()->createPVStructure(channel->owner->current->getStructure()); + current->copyUnchecked(*channel->owner->current); + + changed.reset(new pvd::BitSet(channel->owner->valid)); + } + } + + requester_type::shared_pointer req(requester.lock()); + if(!req) return; + + if(!current) { + sts = pvd::Status::error("Get not possible, cache disabled"); + } + + req->getDone(sts, shared_from_this(), current, changed); +} + +} // namespace pvas diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp new file mode 100644 index 0000000..3d45c83 --- /dev/null +++ b/src/server/sharedstate_pv.cpp @@ -0,0 +1,312 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include + +#include +#include +#include + +#include +#include +#include + +#define epicsExportSharedSymbols +#include "sharedstateimpl.h" + + +namespace { +struct MailboxHandler : public pvas::SharedPV::Handler { + virtual ~MailboxHandler() {} + virtual void onPut(pvas::SharedPV& self, pvas::Operation& op) + { + self.post(op.value(), op.changed()); + op.info("Set!"); + op.complete(); + } + static std::tr1::shared_ptr build() { + std::tr1::shared_ptr ret(new MailboxHandler); + return ret; + } +}; +} // namespace + +namespace pvas { + +size_t SharedPV::num_instances; + +SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr& handler) +{ + assert(!!handler); + SharedPV::shared_pointer ret(new SharedPV(handler)); + ret->internal_self = ret; + return ret; +} + +SharedPV::shared_pointer SharedPV::buildReadOnly() +{ + SharedPV::shared_pointer ret(new SharedPV(std::tr1::shared_ptr())); + ret->internal_self = ret; + return ret; +} + +SharedPV::shared_pointer SharedPV::buildMailbox() +{ + std::tr1::shared_ptr handler(new MailboxHandler); + SharedPV::shared_pointer ret(new SharedPV(handler)); + ret->internal_self = ret; + return ret; +} + +SharedPV::SharedPV(const std::tr1::shared_ptr &handler) + :handler(handler) + ,debugLvl(0) +{ + REFTRACE_INCREMENT(num_instances); +} + +SharedPV::~SharedPV() { + REFTRACE_DECREMENT(num_instances); +} + +void SharedPV::setHandler(const std::tr1::shared_ptr& handler) +{ + Guard G(mutex); + this->handler = handler; +} + +SharedPV::Handler::shared_pointer SharedPV::getHandler() const +{ + Guard G(mutex); + return handler; +} + + +bool SharedPV::isOpen() const +{ + Guard G(mutex); + return !!type; +} + +void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& valid) +{ + typedef std::vector > xputs_t; + typedef std::vector > xrpcs_t; + typedef std::vector > xmonitors_t; + typedef std::vector > xgetfields_t; + + const pvd::StructureConstPtr newtype(value.getStructure()); + + xputs_t p_put; + xrpcs_t p_rpc; + xmonitors_t p_monitor; + xgetfields_t p_getfield; + { + Guard I(mutex); + + if(type) + throw std::logic_error("Already open()"); + + p_put.reserve(puts.size()); + p_rpc.reserve(rpcs.size()); + p_monitor.reserve(monitors.size()); + p_getfield.reserve(getfields.size()); + + type = value.getStructure(); + current = pvd::getPVDataCreate()->createPVStructure(newtype); + current->copyUnchecked(value); + this->valid = valid; + + FOR_EACH(puts_t::const_iterator, it, end, puts) { + try { + p_put.push_back((*it)->shared_from_this()); + }catch(std::tr1::bad_weak_ptr&) { + //racing destruction + } + } + FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { + try { + p_rpc.push_back((*it)->shared_from_this()); + }catch(std::tr1::bad_weak_ptr&) {} + } + FOR_EACH(monitors_t::const_iterator, it, end, monitors) { + try { + (*it)->open(newtype); + // post initial update + (*it)->post(*current, valid); + p_monitor.push_back((*it)->shared_from_this()); + }catch(std::tr1::bad_weak_ptr&) {} + } + // consume getField + FOR_EACH(getfields_t::iterator, it, end, getfields) { + p_getfield.push_back(it->lock()); + } + getfields.clear(); // consume + } + FOR_EACH(xputs_t::iterator, it, end, p_put) { + SharedPut::requester_type::shared_pointer requester((*it)->requester.lock()); + if(requester) requester->channelPutConnect(pvd::Status(), *it, newtype); + } + FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) { + SharedRPC::requester_type::shared_pointer requester((*it)->requester.lock()); + if(requester) requester->channelRPCConnect(pvd::Status(), *it); + } + FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) { + (*it)->notify(); + } + FOR_EACH(xgetfields_t::iterator, it, end, p_getfield) { + if(*it) (*it)->getDone(pvd::Status(), newtype); + } +} + +void SharedPV::open(const epics::pvData::PVStructure& value) +{ + // consider all fields to have non-default values. For users how don't keep track of this. + open(value, pvd::BitSet().set(0)); +} + +void SharedPV::open(const pvd::StructureConstPtr& type) +{ + pvd::PVStructurePtr value(pvd::getPVDataCreate()->createPVStructure(type)); + open(*value); +} + +void SharedPV::close(bool destroy) +{ + typedef std::vector > xputs_t; + typedef std::vector > xrpcs_t; + typedef std::vector > xmonitors_t; + typedef std::vector > xchannels_t; + + xputs_t p_put; + xrpcs_t p_rpc; + xmonitors_t p_monitor; + xchannels_t p_channel; + { + Guard I(mutex); + + if(!type) + return; + + p_put.reserve(puts.size()); + p_rpc.reserve(rpcs.size()); + p_monitor.reserve(monitors.size()); + p_channel.reserve(channels.size()); + + FOR_EACH(puts_t::const_iterator, it, end, puts) { + p_put.push_back((*it)->requester.lock()); + } + FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { + p_rpc.push_back((*it)->requester.lock()); + } + FOR_EACH(monitors_t::const_iterator, it, end, monitors) { + (*it)->close(); + p_monitor.push_back((*it)->shared_from_this()); + } + FOR_EACH(channels_t::const_iterator, it, end, channels) { + p_channel.push_back((*it)->shared_from_this()); + } + + type.reset(); + current.reset(); + if(destroy) { + // forget about all clients, to prevent the possibility of our + // sending a second destroy notification. + puts.clear(); + rpcs.clear(); + monitors.clear(); + channels.clear(); + } + } + FOR_EACH(xputs_t::iterator, it, end, p_put) { + if(*it) (*it)->channelDisconnect(destroy); + } + FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) { + if(*it) (*it)->channelDisconnect(destroy); + } + FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) { + (*it)->notify(); + } + FOR_EACH(xchannels_t::iterator, it, end, p_channel) { + pva::ChannelRequester::shared_pointer req((*it)->requester.lock()); + if(!req) continue; + req->channelStateChange(*it, destroy ? pva::Channel::DESTROYED : pva::Channel::DISCONNECTED); + } +} + +pvd::PVStructure::shared_pointer SharedPV::build() +{ + Guard G(mutex); + if(!type) + throw std::logic_error("Can't build() before open()"); + return pvd::getPVDataCreate()->createPVStructure(type); +} + +void SharedPV::post(const pvd::PVStructure& value, + const pvd::BitSet& changed) +{ + typedef std::vector > xmonitors_t; + xmonitors_t p_monitor; + { + Guard I(mutex); + + if(!type) + throw std::logic_error("Not open()"); + else if(*type!=*value.getStructure()) + throw std::logic_error("Type mis-match"); + + if(current) { + current->copyUnchecked(value, changed); + valid |= changed; + } + + p_monitor.reserve(monitors.size()); // ick, for lack of a list with thread-safe iteration + + FOR_EACH(monitors_t::const_iterator, it, end, monitors) { + (*it)->post(value, changed); + p_monitor.push_back((*it)->shared_from_this()); + } + } + FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) { + (*it)->notify(); + } +} + +void SharedPV::fetch(epics::pvData::PVStructure& value, epics::pvData::BitSet& valid) +{ + Guard I(mutex); + if(!type) + throw std::logic_error("Not open()"); + else if(value.getStructure()!=type) + throw std::logic_error("Types do not match"); + + value.copy(*current); + valid = this->valid; +} + + +std::tr1::shared_ptr +SharedPV::connect(const std::tr1::shared_ptr &provider, + const std::string &channelName, + const std::tr1::shared_ptr& requester) +{ + shared_pointer self(internal_self); + std::tr1::shared_ptr ret(new SharedChannel(self, provider, channelName, requester)); + return ret; +} + +void SharedPV::setDebug(int lvl) +{ + Guard G(mutex); + debugLvl = lvl; +} + +int SharedPV::isDebug() const +{ + Guard G(mutex); + return debugLvl; +} + +} // namespace pvas diff --git a/src/server/sharedstate_rpc.cpp b/src/server/sharedstate_rpc.cpp new file mode 100644 index 0000000..48a5233 --- /dev/null +++ b/src/server/sharedstate_rpc.cpp @@ -0,0 +1,119 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include + +#include +#include + +#include +#include +#include + +#define epicsExportSharedSymbols +#include "sharedstateimpl.h" + +namespace { +struct RPCOP : public pvas::Operation::Impl +{ + const std::tr1::shared_ptr op; + + RPCOP(const std::tr1::shared_ptr& op, + const pvd::PVStructure::const_shared_pointer& pvRequest, + const pvd::PVStructure::const_shared_pointer& value) + :Impl(pvRequest, value, pvd::BitSet().set(0)) + ,op(op) + {} + virtual ~RPCOP() {} + + virtual pva::Channel::shared_pointer getChannel() OVERRIDE FINAL + { + return op->channel; + } + + virtual pva::ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL + { + return op->requester.lock(); + } + + virtual void complete(const pvd::Status& sts, + const epics::pvData::PVStructure* value) OVERRIDE FINAL + { + { + Guard G(mutex); + if(done) + throw std::logic_error("Operation already complete"); + done = true; + } + epics::pvData::PVStructurePtr tosend; + if(!sts.isSuccess()) { + // no data for error + } else if(value) { + tosend = pvd::getPVDataCreate()->createPVStructure(value->getStructure()); + tosend->copyUnchecked(*value); + } else { + // RPC with null result. Make empty structure + tosend = pvd::getPVDataCreate()->createPVStructure( + pvd::getFieldCreate() + ->createFieldBuilder() + ->createStructure()); + } + pva::ChannelRPCRequester::shared_pointer req(op->requester.lock()); + if(req) + req->requestDone(sts, op, tosend); + } +}; +} + +namespace pvas { + +size_t SharedRPC::num_instances; + +SharedRPC::SharedRPC(const std::tr1::shared_ptr& channel, + const requester_type::shared_pointer& requester, + const pvd::PVStructure::const_shared_pointer &pvRequest) + :channel(channel) + ,requester(requester) + ,pvRequest(pvRequest) +{ + REFTRACE_INCREMENT(num_instances); +} + +SharedRPC::~SharedRPC() { + Guard G(channel->owner->mutex); + channel->owner->rpcs.remove(this); + REFTRACE_DECREMENT(num_instances); +} + +void SharedRPC::destroy() {} + +std::tr1::shared_ptr SharedRPC::getChannel() +{ + return channel; +} + +void SharedRPC::cancel() {} + +void SharedRPC::lastRequest() {} + +void SharedRPC::request(epics::pvData::PVStructure::shared_pointer const & pvArgument) +{ + std::tr1::shared_ptr handler; + { + Guard G(channel->owner->mutex); + handler = channel->owner->handler; + } + + std::tr1::shared_ptr impl(new RPCOP(shared_from_this(), pvRequest, pvArgument), + Operation::Impl::Cleanup()); + + if(handler) { + Operation op(impl); + handler->onRPC(channel->owner, op); + } +} + + +} // namespace pvas diff --git a/src/server/sharedstateimpl.h b/src/server/sharedstateimpl.h new file mode 100644 index 0000000..cf648eb --- /dev/null +++ b/src/server/sharedstateimpl.h @@ -0,0 +1,149 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ +#ifndef SHAREDSTATEIMPL_H +#define SHAREDSTATEIMPL_H + +#include "pva/sharedstate.h" +#include +#include + +#define FOR_EACH(TYPE, IT, END, OBJ) for(TYPE IT((OBJ).begin()), END((OBJ).end()); IT != END; ++IT) + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +namespace pvas { + +struct SharedChannel : public pva::Channel, + public std::tr1::enable_shared_from_this +{ + static size_t num_instances; + + const std::tr1::shared_ptr owner; + const std::string channelName; + const requester_type::weak_pointer requester; + const pva::ChannelProvider::weak_pointer provider; + + SharedChannel(const std::tr1::shared_ptr& owner, + const pva::ChannelProvider::shared_pointer provider, + const std::string& channelName, + const requester_type::shared_pointer& requester); + virtual ~SharedChannel(); + + virtual void destroy() OVERRIDE FINAL; + + virtual std::tr1::shared_ptr getProvider() OVERRIDE FINAL; + virtual std::string getRemoteAddress() OVERRIDE FINAL; + virtual std::string getChannelName() OVERRIDE FINAL; + virtual std::tr1::shared_ptr getChannelRequester() OVERRIDE FINAL; + + virtual void getField(pva::GetFieldRequester::shared_pointer const & requester,std::string const & subField) OVERRIDE FINAL; + + virtual pva::ChannelPut::shared_pointer createChannelPut( + pva::ChannelPutRequester::shared_pointer const & requester, + pvd::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL; + + virtual pva::ChannelRPC::shared_pointer createChannelRPC( + pva::ChannelRPCRequester::shared_pointer const & requester, + pvd::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL; + + virtual pva::Monitor::shared_pointer createMonitor( + pva::MonitorRequester::shared_pointer const & requester, + pvd::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL; +}; + +struct SharedMonitorFIFO : public pva::MonitorFIFO +{ + const std::tr1::shared_ptr channel; + SharedMonitorFIFO(const std::tr1::shared_ptr& channel, + const requester_type::shared_pointer& requester, + const pvd::PVStructure::const_shared_pointer &pvRequest); + virtual ~SharedMonitorFIFO(); +}; + +struct SharedPut : public pva::ChannelPut, + public std::tr1::enable_shared_from_this +{ + const std::tr1::shared_ptr channel; + const requester_type::weak_pointer requester; + const pvd::PVStructure::const_shared_pointer pvRequest; + + static size_t num_instances; + + SharedPut(const std::tr1::shared_ptr& channel, + const requester_type::shared_pointer& requester, + const pvd::PVStructure::const_shared_pointer &pvRequest); + virtual ~SharedPut(); + + virtual void destroy() OVERRIDE FINAL; + virtual std::tr1::shared_ptr getChannel() OVERRIDE FINAL; + virtual void cancel() OVERRIDE FINAL; + virtual void lastRequest() OVERRIDE FINAL; + + virtual void put( + epics::pvData::PVStructure::shared_pointer const & pvPutStructure, + epics::pvData::BitSet::shared_pointer const & putBitSet) OVERRIDE FINAL; + + virtual void get() OVERRIDE FINAL; +}; + +struct SharedRPC : public pva::ChannelRPC, + public std::tr1::enable_shared_from_this +{ + const std::tr1::shared_ptr channel; + const requester_type::weak_pointer requester; + const pvd::PVStructure::const_shared_pointer pvRequest; + + static size_t num_instances; + + SharedRPC(const std::tr1::shared_ptr& channel, + const requester_type::shared_pointer& requester, + const pvd::PVStructure::const_shared_pointer &pvRequest); + virtual ~SharedRPC(); + + virtual void destroy() OVERRIDE FINAL; + virtual std::tr1::shared_ptr getChannel() OVERRIDE FINAL; + virtual void cancel() OVERRIDE FINAL; + virtual void lastRequest() OVERRIDE FINAL; + + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) OVERRIDE FINAL; +}; + +struct Operation::Impl +{ + static size_t num_instances; + + epicsMutex mutex; + + const pvd::PVStructure::const_shared_pointer pvRequest, value; + const pvd::BitSet changed; + + bool done; + int debugLvl; + + Impl(const pvd::PVStructure::const_shared_pointer& pvRequest, + const pvd::PVStructure::const_shared_pointer& value, + const pvd::BitSet& changed, + int debugLvl = 0) + :pvRequest(pvRequest), value(value), changed(changed), done(false), debugLvl(debugLvl) + {} + virtual ~Impl() {} + + virtual pva::Channel::shared_pointer getChannel() =0; + virtual pva::ChannelBaseRequester::shared_pointer getRequester() =0; + virtual void complete(const pvd::Status& sts, + const epics::pvData::PVStructure* value) =0; + + struct Cleanup { + void operator()(Impl*); + }; +}; + +} // namespace pvas + +#endif // SHAREDSTATEIMPL_H diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index e3d6059..4f15079 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -31,6 +31,10 @@ TESTPROD_HOST += testmonitorfifo testmonitorfifo_SRCS += testmonitorfifo.cpp TESTS += testmonitorfifo +TESTPROD_HOST += testsharedstate +testsharedstate_SRCS += testsharedstate.cpp +TESTS += testsharedstate + PROD_HOST += testServer testServer_SRCS += testServer.cpp diff --git a/testApp/remote/testsharedstate.cpp b/testApp/remote/testsharedstate.cpp new file mode 100644 index 0000000..85581e2 --- /dev/null +++ b/testApp/remote/testsharedstate.cpp @@ -0,0 +1,203 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include +#include + +#include +#include +#include +//#include + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + +namespace { + + +const pvd::StructureConstPtr type(pvd::getFieldCreate()->createFieldBuilder() + ->add("value", pvd::pvInt) + ->createStructure()); + +void testNoClient() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + + pvas::SharedPV::shared_pointer pv(pvas::SharedPV::buildReadOnly()); + + pvd::PVStructurePtr inst(pvd::getPVDataCreate()->createPVStructure(type)); + pvd::BitSet changed; + + testThrows(std::logic_error, pv->post(*inst, changed)); // not open()'d + + pv->close(); // close while closed is a no-op + + testThrows(std::logic_error, pv->build()); // not open()'d +} + +void testGetMon() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + + std::tr1::shared_ptr prov(new pvas::StaticProvider("test")); + std::tr1::shared_ptr pv(pvas::SharedPV::buildReadOnly()); + + prov->add("pv:name", pv); + + pv->open(type); + + pvd::PVStructurePtr inst(pvd::getPVDataCreate()->createPVStructure(type)); + pvd::BitSet changed; + pvd::PVScalarPtr value(inst->getSubFieldT("value")); + value->putFrom(42); + changed.set(value->getFieldOffset()); + + pv->post(*inst, changed); + + pvac::ClientProvider cli(prov->provider()); + + pvac::ClientChannel chan(cli.connect("pv:name")); + + pvac::MonitorSync mon(chan.monitor()); + + { + pvd::PVStructure::const_shared_pointer R(chan.get()); + + testEqual(R->getSubFieldT("value")->getAs(), 42u); + } + + testOk1(mon.test()); + testEqual(mon.event.event, pvac::MonitorEvent::Data); + + { + bool poll = mon.poll(); + testOk1(poll); + if(poll) { + testEqual(mon.root->getSubFieldT("value")->getAs(), 42u); + } else { + testSkip(1, "No data"); + } + + } + + testOk1(!mon.test()); + testOk1(!mon.poll()); + + value->putFrom(43); + pv->post(*inst, changed); + + testOk1(mon.test()); + + { + bool poll = mon.poll(); + testOk1(poll); + if(poll) { + testEqual(mon.root->getSubFieldT("value")->getAs(), 43u); + } else { + testSkip(1, "No data"); + } + + } + + testOk1(!mon.test()); + testOk1(!mon.poll()); +} + +void testPutRPCCancel() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + + std::tr1::shared_ptr prov(new pvas::StaticProvider("test")); + std::tr1::shared_ptr pv(pvas::SharedPV::buildReadOnly()); + + prov->add("pv:name", pv); + + pv->open(type); + + pvac::ClientProvider cli(prov->provider()); + + pvac::ClientChannel chan(cli.connect("pv:name")); + + testThrows(std::runtime_error, chan.put() + .set("value", 44u) + .exec()); + + { + pvd::PVStructurePtr inst(pvd::getPVDataCreate()->createPVStructure(type)); + testThrows(std::runtime_error, chan.rpc(1.0, inst)) + } +} + +struct TestPutRPCHandler : public pvas::SharedPV::Handler +{ + virtual ~TestPutRPCHandler() {} + virtual void onPut(const pvas::SharedPV::shared_pointer& pv, pvas::Operation& op) OVERRIDE FINAL + { + pv->post(op.value(), op.changed()); + op.complete(); + } + virtual void onRPC(const pvas::SharedPV::shared_pointer& pv, pvas::Operation& op) OVERRIDE FINAL + { + pvd::PVStructurePtr reply(pvd::getPVDataCreate()->createPVStructure(type)); + reply->getSubFieldT("value")->putFrom(100); + op.complete(*reply, pvd::BitSet()); + } +}; + +void testPutRPC() +{ + testDiag("==== %s ====", CURRENT_FUNCTION); + + std::tr1::shared_ptr prov(new pvas::StaticProvider("test")); + std::tr1::shared_ptr handler(new TestPutRPCHandler); + std::tr1::shared_ptr pv(pvas::SharedPV::build(handler)); + + prov->add("pv:name", pv); + + pv->open(type); + + pvac::ClientProvider cli(prov->provider()); + + pvac::ClientChannel chan(cli.connect("pv:name")); + + { + pvd::PVStructure::const_shared_pointer R(chan.get()); + + testEqual(R->getSubFieldT("value")->getAs(), 0u); + } + + chan.put() + .set("value", 44u) + .exec(); + + { + pvd::PVStructure::const_shared_pointer R(chan.get()); + + testEqual(R->getSubFieldT("value")->getAs(), 44u); + } + + pvd::PVStructurePtr arg(pvd::getPVDataCreate()->createPVStructure(type)); + arg->getSubFieldT("value")->putFrom(50); + + pvd::PVStructure::const_shared_pointer reply(chan.rpc(1.0, arg)); + + testEqual(reply->getSubFieldT("value")->getAs(), 100u); +} + +} // namespace + +MAIN(testsharedstate) +{ + testPlan(19); + try { + testNoClient(); + testGetMon(); + testPutRPCCancel(); + testPutRPC(); + }catch(std::exception& e){ + testAbort("Unexpected exception: %s", e.what()); + } + return testDone(); +}