From f7f4822c086928e4dfc942c9e66b867b820daaa1 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 19 Sep 2018 14:27:05 -0700 Subject: [PATCH 01/12] pva/client.h separate Get and Put Apparently some servers (pvDatabase) have different behavior for ChannelGet::get() and ChannelPut::get() . ClientChannel::get() now uses ChannelGet::get() while ClientChannel::put() will automatically do a ChannelPut::get() and pass the result to the putBuilder callback. --- src/client/Makefile | 1 + src/client/client.cpp | 1 + src/client/clientGet.cpp | 118 ++++---------------- src/client/clientPut.cpp | 233 +++++++++++++++++++++++++++++++++++++++ src/client/clientpvt.h | 1 + src/client/pva/client.h | 20 +++- 6 files changed, 277 insertions(+), 97 deletions(-) create mode 100644 src/client/clientPut.cpp diff --git a/src/client/Makefile b/src/client/Makefile index 891547b..920a2be 100644 --- a/src/client/Makefile +++ b/src/client/Makefile @@ -11,5 +11,6 @@ pvAccess_SRCS += monitor.cpp pvAccess_SRCS += client.cpp pvAccess_SRCS += clientSync.cpp pvAccess_SRCS += clientGet.cpp +pvAccess_SRCS += clientPut.cpp pvAccess_SRCS += clientRPC.cpp pvAccess_SRCS += clientMonitor.cpp diff --git a/src/client/client.cpp b/src/client/client.cpp index cdb374b..4461feb 100644 --- a/src/client/client.cpp +++ b/src/client/client.cpp @@ -219,6 +219,7 @@ void register_reftrack() // done is an optimization, duplicate calls to registerRef* are no-ops pvac::detail::registerRefTrack(); pvac::detail::registerRefTrackGet(); + pvac::detail::registerRefTrackPut(); pvac::detail::registerRefTrackMonitor(); pvac::detail::registerRefTrackRPC(); } diff --git a/src/client/clientGet.cpp b/src/client/clientGet.cpp index 0bbdb63..cb7cbc8 100644 --- a/src/client/clientGet.cpp +++ b/src/client/clientGet.cpp @@ -24,9 +24,9 @@ typedef epicsGuardRelease UnGuard; namespace { -struct GetPutter : public pva::ChannelPutRequester, +struct Getter : public pva::ChannelGetRequester, public pvac::Operation::Impl, - public pvac::detail::wrapped_shared_from_this + public pvac::detail::wrapped_shared_from_this { mutable epicsMutex mutex; @@ -34,34 +34,23 @@ struct GetPutter : public pva::ChannelPutRequester, operation_type::shared_pointer op; pvac::ClientChannel::GetCallback *getcb; - pvac::ClientChannel::PutCallback *putcb; pvac::GetEvent event; static size_t num_instances; - explicit GetPutter(pvac::ClientChannel::GetCallback* cb) :started(false), getcb(cb), putcb(0) + explicit Getter(pvac::ClientChannel::GetCallback* cb) :started(false), getcb(cb) {REFTRACE_INCREMENT(num_instances);} - explicit GetPutter(pvac::ClientChannel::PutCallback* cb) :started(false), getcb(0), putcb(cb) - {REFTRACE_INCREMENT(num_instances);} - virtual ~GetPutter() {REFTRACE_DECREMENT(num_instances);} + virtual ~Getter() {REFTRACE_DECREMENT(num_instances);} void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { - if(!putcb && !getcb) return; + if(!getcb) return; event.event = evt; - if(putcb) { - pvac::ClientChannel::PutCallback *cb=putcb; - putcb = 0; - UnGuard U(G); - cb->putDone(event); - } - if(getcb) { - pvac::ClientChannel::GetCallback *cb=getcb; - getcb = 0; - UnGuard U(G); - cb->getDone(event); - } + pvac::ClientChannel::GetCallback *cb=getcb; + getcb = 0; + UnGuard U(G); + cb->getDone(event); } virtual std::string name() const OVERRIDE FINAL @@ -74,7 +63,7 @@ struct GetPutter : public pva::ChannelPutRequester, virtual void cancel() OVERRIDE FINAL { // keepalive for safety in case callback wants to destroy us - std::tr1::shared_ptr keepalive(internal_shared_from_this()); + std::tr1::shared_ptr keepalive(internal_shared_from_this()); Guard G(mutex); if(started && op) op->cancel(); callEvent(G, pvac::GetEvent::Cancel); @@ -86,15 +75,15 @@ struct GetPutter : public pva::ChannelPutRequester, return op ? op->getChannel()->getRequesterName() : ""; } - virtual void channelPutConnect( + virtual void channelGetConnect( const epics::pvData::Status& status, - pva::ChannelPut::shared_pointer const & channelPut, + pva::ChannelGet::shared_pointer const & channelGet, epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL { - std::tr1::shared_ptr keepalive(internal_shared_from_this()); + std::tr1::shared_ptr keepalive(internal_shared_from_this()); Guard G(mutex); if(started) return; - if(!putcb && !getcb) return; + if(!getcb) return; if(!status.isOK()) { event.message = status.getMessage(); @@ -104,34 +93,10 @@ struct GetPutter : public pva::ChannelPutRequester, if(!status.isSuccess()) { callEvent(G); - } else if(getcb){ - channelPut->get(); + } else { + channelGet->get(); started = true; - } else if(putcb){ - pvac::ClientChannel::PutCallback *cb(putcb); - pvd::BitSet::shared_pointer tosend(new pvd::BitSet); - pvac::ClientChannel::PutCallback::Args args(*tosend); - try { - UnGuard U(G); - cb->putBuild(structure, args); - if(!args.root) - throw std::logic_error("No put value provided"); - else if(*args.root->getStructure()!=*structure) - throw std::logic_error("Provided put value with wrong type"); - }catch(std::exception& e){ - if(putcb) { - event.message = e.what(); - callEvent(G); - } else { - LOG(pva::logLevelInfo, "Lost exception in %s: %s", CURRENT_FUNCTION, e.what()); - } - } - // check putcb again after UnGuard - if(putcb) { - channelPut->put(std::tr1::const_pointer_cast(args.root), tosend); - started = true; - } } } @@ -143,30 +108,13 @@ struct GetPutter : public pva::ChannelPutRequester, callEvent(G); } - virtual void putDone( - const epics::pvData::Status& status, - pva::ChannelPut::shared_pointer const & channelPut) OVERRIDE FINAL - { - std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); - if(!putcb) return; - - if(!status.isOK()) { - event.message = status.getMessage(); - } else { - event.message.clear(); - } - - callEvent(G, status.isSuccess()? pvac::GetEvent::Success : pvac::GetEvent::Fail); - } - virtual void getDone( const epics::pvData::Status& status, - pva::ChannelPut::shared_pointer const & channelPut, + pva::ChannelGet::shared_pointer const & channelGet, epics::pvData::PVStructure::shared_pointer const & pvStructure, epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL { - std::tr1::shared_ptr keepalive(internal_shared_from_this()); + std::tr1::shared_ptr keepalive(internal_shared_from_this()); Guard G(mutex); if(!getcb) return; @@ -183,13 +131,13 @@ struct GetPutter : public pva::ChannelPutRequester, virtual void show(std::ostream &strm) const { - strm << "Operation(Get/Put" + strm << "Operation(Get" "\"" << name() <<"\"" ")"; } }; -size_t GetPutter::num_instances; +size_t Getter::num_instances; } //namespace @@ -203,42 +151,22 @@ ClientChannel::get(ClientChannel::GetCallback* cb, if(!pvRequest) pvRequest = pvd::createRequest("field()"); - std::tr1::shared_ptr ret(GetPutter::build(cb)); + std::tr1::shared_ptr ret(Getter::build(cb)); { Guard G(ret->mutex); - ret->op = getChannel()->createChannelPut(ret->internal_shared_from_this(), + ret->op = getChannel()->createChannelGet(ret->internal_shared_from_this(), std::tr1::const_pointer_cast(pvRequest)); } return Operation(ret); } -Operation -ClientChannel::put(PutCallback* cb, - epics::pvData::PVStructure::const_shared_pointer pvRequest) -{ - if(!impl) throw std::logic_error("Dead Channel"); - if(!pvRequest) - pvRequest = pvd::createRequest("field()"); - - std::tr1::shared_ptr ret(GetPutter::build(cb)); - - { - Guard G(ret->mutex); - ret->op = getChannel()->createChannelPut(ret->internal_shared_from_this(), - std::tr1::const_pointer_cast(pvRequest)); - } - - return Operation(ret); - -} - namespace detail { void registerRefTrackGet() { - epics::registerRefCounter("pvac::GetPutter", &GetPutter::num_instances); + epics::registerRefCounter("pvac::Getter", &Getter::num_instances); } } diff --git a/src/client/clientPut.cpp b/src/client/clientPut.cpp new file mode 100644 index 0000000..c63c771 --- /dev/null +++ b/src/client/clientPut.cpp @@ -0,0 +1,233 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include +#include +#include + +#include +#include +#include +#include + +#define epicsExportSharedSymbols +#include "pv/logger.h" +#include "clientpvt.h" +#include "pv/pvAccess.h" + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +namespace { + +struct Putter : public pva::ChannelPutRequester, + public pvac::Operation::Impl, + public pvac::detail::wrapped_shared_from_this +{ + mutable epicsMutex mutex; + + const bool getcurrent; + + bool started; // whether the put() has actually been sent. After which point we can't safely re-try. + operation_type::shared_pointer op; + pvd::StructureConstPtr puttype; + + pvac::ClientChannel::PutCallback *putcb; + pvac::GetEvent event; + + static size_t num_instances; + + Putter(pvac::ClientChannel::PutCallback* cb, bool getcurrent) :getcurrent(getcurrent), started(false), putcb(cb) + {REFTRACE_INCREMENT(num_instances);} + virtual ~Putter() {REFTRACE_DECREMENT(num_instances);} + + void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) + { + if(!putcb) return; + + event.event = evt; + pvac::ClientChannel::PutCallback *cb=putcb; + putcb = 0; + UnGuard U(G); + cb->putDone(event); + } + + virtual std::string name() const OVERRIDE FINAL + { + Guard G(mutex); + return op ? op->getChannel()->getChannelName() : ""; + } + + // called automatically via wrapped_shared_from_this + virtual void cancel() OVERRIDE FINAL + { + // keepalive for safety in case callback wants to destroy us + std::tr1::shared_ptr keepalive(internal_shared_from_this()); + Guard G(mutex); + if(started && op) op->cancel(); + callEvent(G, pvac::GetEvent::Cancel); + } + + virtual std::string getRequesterName() OVERRIDE FINAL + { + Guard G(mutex); + return op ? op->getChannel()->getRequesterName() : ""; + } + + virtual void channelPutConnect( + const epics::pvData::Status& status, + pva::ChannelPut::shared_pointer const & channelPut, + epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL + { + std::tr1::shared_ptr keepalive(internal_shared_from_this()); + Guard G(mutex); + op = channelPut; // we may be called before createChannelPut() has returned. + puttype = structure; + if(started || !putcb) return; + + if(!status.isOK()) { + event.message = status.getMessage(); + } else { + event.message.clear(); + } + if(!status.isSuccess()) { + callEvent(G); + + } else if(getcurrent) { + // fetch a previous value first + op->get(); + } else { + // build Put value immediately + pvd::BitSet empty; + pvd::BitSet::shared_pointer tosend(new pvd::BitSet); + pvac::ClientChannel::PutCallback::Args args(*tosend, empty); + // args.previous = 0; // implied + doPut(G, args, channelPut, tosend); + } + } + + virtual void channelDisconnect(bool destroy) OVERRIDE FINAL + { + Guard G(mutex); + event.message = "Disconnect"; + + callEvent(G); + } + + void doPut(Guard& G, + pvac::ClientChannel::PutCallback::Args& args, + pva::ChannelPut::shared_pointer const & channelPut, + const pvd::BitSet::shared_pointer& tosend) + { + try { + pvac::ClientChannel::PutCallback *cb(putcb); + UnGuard U(G); + cb->putBuild(puttype, args); + if(!args.root) + throw std::logic_error("No put value provided"); + else if(*args.root->getStructure()!=*puttype) + throw std::logic_error("Provided put value with wrong type"); + }catch(std::exception& e){ + if(putcb) { + event.message = e.what(); + callEvent(G); + } else { + LOG(pva::logLevelInfo, "Lost exception in %s: %s", CURRENT_FUNCTION, e.what()); + } + } + // check putcb again after UnGuard + if(putcb) { + channelPut->put(std::tr1::const_pointer_cast(args.root), tosend); + started = true; + } + } + + virtual void getDone( + const epics::pvData::Status& status, + pva::ChannelPut::shared_pointer const & channelPut, + epics::pvData::PVStructure::shared_pointer const & pvStructure, + epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL + { + std::tr1::shared_ptr keepalive(internal_shared_from_this()); + Guard G(mutex); + if(!putcb) return; + + if(!status.isOK()) { + event.message = status.getMessage(); + + callEvent(G, pvac::GetEvent::Fail); + + } else { + pvd::BitSet::shared_pointer tosend(new pvd::BitSet); + pvac::ClientChannel::PutCallback::Args args(*tosend, *bitSet); + args.previous = pvStructure; + doPut(G, args, channelPut, tosend); + } + } + + virtual void putDone( + const epics::pvData::Status& status, + pva::ChannelPut::shared_pointer const & channelPut) OVERRIDE FINAL + { + std::tr1::shared_ptr keepalive(internal_shared_from_this()); + Guard G(mutex); + if(!putcb) return; + + if(!status.isOK()) { + event.message = status.getMessage(); + } else { + event.message.clear(); + } + + callEvent(G, status.isSuccess()? pvac::GetEvent::Success : pvac::GetEvent::Fail); + } + + virtual void show(std::ostream &strm) const + { + strm << "Operation(Put" + "\"" << name() <<"\"" + ")"; + } +}; + +size_t Putter::num_instances; + +} //namespace + +namespace pvac { + +Operation +ClientChannel::put(PutCallback* cb, + epics::pvData::PVStructure::const_shared_pointer pvRequest, + bool getcurrent) +{ + if(!impl) throw std::logic_error("Dead Channel"); + if(!pvRequest) + pvRequest = pvd::createRequest("field()"); + + std::tr1::shared_ptr ret(Putter::build(cb, getcurrent)); + + { + Guard G(ret->mutex); + ret->op = getChannel()->createChannelPut(ret->internal_shared_from_this(), + std::tr1::const_pointer_cast(pvRequest)); + } + + return Operation(ret); + +} + +namespace detail { + +void registerRefTrackPut() +{ + epics::registerRefCounter("pvac::Putter", &Putter::num_instances); +} + +} + +}//namespace pvac diff --git a/src/client/clientpvt.h b/src/client/clientpvt.h index 61f3c11..c2b057d 100644 --- a/src/client/clientpvt.h +++ b/src/client/clientpvt.h @@ -67,6 +67,7 @@ public: void registerRefTrack(); void registerRefTrackGet(); +void registerRefTrackPut(); void registerRefTrackMonitor(); void registerRefTrackRPC(); diff --git a/src/client/pva/client.h b/src/client/pva/client.h index 5b43878..702377d 100644 --- a/src/client/pva/client.h +++ b/src/client/pva/client.h @@ -344,9 +344,20 @@ public: struct PutCallback { virtual ~PutCallback() {} struct Args { - Args(epics::pvData::BitSet& tosend) :tosend(tosend) {} + Args(epics::pvData::BitSet& tosend, epics::pvData::BitSet& previousmask) :tosend(tosend), previousmask(previousmask) {} + //! Callee must fill this in with an instance of the Structure passed as the 'build' argument. epics::pvData::PVStructure::const_shared_pointer root; + //! Callee must set bits corresponding to the fields of 'root' which will actually be sent. epics::pvData::BitSet& tosend; + //! A previous value of the PV being "put" when put(..., getprevious=true). eg. use to find enumeration value. + //! Otherwise NULL. + //! @note The value of the PV may change between the point where "previous" is fetched, + //! and when this Put operation completes. + //! @since 6.1.0 Added after 6.0.0 + epics::pvData::PVStructure::const_shared_pointer previous; + //! Bit mask indicating those fields of 'previous' which have been set by the server. (others have local defaults) + //! Unused if previous==NULL. + const epics::pvData::BitSet& previousmask; }; /** Server provides expected structure. * @@ -362,8 +373,13 @@ public: //! Initiate request to change PV //! @param cb Completion notification callback. Must outlive Operation (call Operation::cancel() to force release) + //! @param pvRequest if NULL defaults to "field()". + //! @param getprevious If true, fetch a previous value of the PV and make + //! this available as PutCallback::Args::previous and previousmask. + //! If false, then previous=NULL Operation put(PutCallback* cb, - epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer()); + epics::pvData::PVStructure::const_shared_pointer pvRequest = epics::pvData::PVStructure::const_shared_pointer(), + bool getcurrent = false); //! Synchronious put operation inline From d01421bf3002996bd2b5452eca4148dd778ab0ac Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 25 Sep 2018 16:44:10 -0700 Subject: [PATCH 02/12] client.h cleanup --- src/client/clientGet.cpp | 26 +++++++++----------------- src/client/clientMonitor.cpp | 5 ----- src/client/clientPut.cpp | 33 ++++++++++++++------------------- src/client/clientRPC.cpp | 5 ----- src/client/clientpvt.h | 5 +++++ 5 files changed, 28 insertions(+), 46 deletions(-) diff --git a/src/client/clientGet.cpp b/src/client/clientGet.cpp index cb7cbc8..9091b03 100644 --- a/src/client/clientGet.cpp +++ b/src/client/clientGet.cpp @@ -17,11 +17,6 @@ #include "clientpvt.h" #include "pv/pvAccess.h" -namespace pvd = epics::pvData; -namespace pva = epics::pvAccess; -typedef epicsGuard Guard; -typedef epicsGuardRelease UnGuard; - namespace { struct Getter : public pva::ChannelGetRequester, @@ -30,27 +25,26 @@ struct Getter : public pva::ChannelGetRequester, { mutable epicsMutex mutex; - bool started; operation_type::shared_pointer op; - pvac::ClientChannel::GetCallback *getcb; + pvac::ClientChannel::GetCallback *cb; pvac::GetEvent event; static size_t num_instances; - explicit Getter(pvac::ClientChannel::GetCallback* cb) :started(false), getcb(cb) + explicit Getter(pvac::ClientChannel::GetCallback* cb) :cb(cb) {REFTRACE_INCREMENT(num_instances);} virtual ~Getter() {REFTRACE_DECREMENT(num_instances);} void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { - if(!getcb) return; + if(!cb) return; event.event = evt; - pvac::ClientChannel::GetCallback *cb=getcb; - getcb = 0; + pvac::ClientChannel::GetCallback *C=cb; + cb = 0; UnGuard U(G); - cb->getDone(event); + C->getDone(event); } virtual std::string name() const OVERRIDE FINAL @@ -65,7 +59,7 @@ struct Getter : public pva::ChannelGetRequester, // keepalive for safety in case callback wants to destroy us std::tr1::shared_ptr keepalive(internal_shared_from_this()); Guard G(mutex); - if(started && op) op->cancel(); + if(op) op->cancel(); callEvent(G, pvac::GetEvent::Cancel); } @@ -82,8 +76,7 @@ struct Getter : public pva::ChannelGetRequester, { std::tr1::shared_ptr keepalive(internal_shared_from_this()); Guard G(mutex); - if(started) return; - if(!getcb) return; + if(!cb) return; if(!status.isOK()) { event.message = status.getMessage(); @@ -95,7 +88,6 @@ struct Getter : public pva::ChannelGetRequester, } else { channelGet->get(); - started = true; } } @@ -116,7 +108,7 @@ struct Getter : public pva::ChannelGetRequester, { std::tr1::shared_ptr keepalive(internal_shared_from_this()); Guard G(mutex); - if(!getcb) return; + if(!cb) return; if(!status.isOK()) { event.message = status.getMessage(); diff --git a/src/client/clientMonitor.cpp b/src/client/clientMonitor.cpp index 2dd6c3c..f285904 100644 --- a/src/client/clientMonitor.cpp +++ b/src/client/clientMonitor.cpp @@ -16,11 +16,6 @@ #include "clientpvt.h" #include "pv/pvAccess.h" -namespace pvd = epics::pvData; -namespace pva = epics::pvAccess; -typedef epicsGuard Guard; -typedef epicsGuardRelease UnGuard; - namespace pvac { struct Monitor::Impl : public pva::MonitorRequester, diff --git a/src/client/clientPut.cpp b/src/client/clientPut.cpp index c63c771..d70973f 100644 --- a/src/client/clientPut.cpp +++ b/src/client/clientPut.cpp @@ -17,11 +17,6 @@ #include "clientpvt.h" #include "pv/pvAccess.h" -namespace pvd = epics::pvData; -namespace pva = epics::pvAccess; -typedef epicsGuard Guard; -typedef epicsGuardRelease UnGuard; - namespace { struct Putter : public pva::ChannelPutRequester, @@ -36,24 +31,24 @@ struct Putter : public pva::ChannelPutRequester, operation_type::shared_pointer op; pvd::StructureConstPtr puttype; - pvac::ClientChannel::PutCallback *putcb; + pvac::ClientChannel::PutCallback *cb; pvac::GetEvent event; static size_t num_instances; - Putter(pvac::ClientChannel::PutCallback* cb, bool getcurrent) :getcurrent(getcurrent), started(false), putcb(cb) + Putter(pvac::ClientChannel::PutCallback* cb, bool getcurrent) :getcurrent(getcurrent), started(false), cb(cb) {REFTRACE_INCREMENT(num_instances);} virtual ~Putter() {REFTRACE_DECREMENT(num_instances);} void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { - if(!putcb) return; + if(!cb) return; event.event = evt; - pvac::ClientChannel::PutCallback *cb=putcb; - putcb = 0; + pvac::ClientChannel::PutCallback *C=cb; + cb = 0; UnGuard U(G); - cb->putDone(event); + C->putDone(event); } virtual std::string name() const OVERRIDE FINAL @@ -87,7 +82,7 @@ struct Putter : public pva::ChannelPutRequester, Guard G(mutex); op = channelPut; // we may be called before createChannelPut() has returned. puttype = structure; - if(started || !putcb) return; + if(started || !cb) return; if(!status.isOK()) { event.message = status.getMessage(); @@ -124,23 +119,23 @@ struct Putter : public pva::ChannelPutRequester, const pvd::BitSet::shared_pointer& tosend) { try { - pvac::ClientChannel::PutCallback *cb(putcb); + pvac::ClientChannel::PutCallback *C(cb); UnGuard U(G); - cb->putBuild(puttype, args); + C->putBuild(puttype, args); if(!args.root) throw std::logic_error("No put value provided"); else if(*args.root->getStructure()!=*puttype) throw std::logic_error("Provided put value with wrong type"); }catch(std::exception& e){ - if(putcb) { + if(cb) { event.message = e.what(); callEvent(G); } else { LOG(pva::logLevelInfo, "Lost exception in %s: %s", CURRENT_FUNCTION, e.what()); } } - // check putcb again after UnGuard - if(putcb) { + // check cb again after UnGuard + if(cb) { channelPut->put(std::tr1::const_pointer_cast(args.root), tosend); started = true; } @@ -154,7 +149,7 @@ struct Putter : public pva::ChannelPutRequester, { std::tr1::shared_ptr keepalive(internal_shared_from_this()); Guard G(mutex); - if(!putcb) return; + if(!cb) return; if(!status.isOK()) { event.message = status.getMessage(); @@ -175,7 +170,7 @@ struct Putter : public pva::ChannelPutRequester, { std::tr1::shared_ptr keepalive(internal_shared_from_this()); Guard G(mutex); - if(!putcb) return; + if(!cb) return; if(!status.isOK()) { event.message = status.getMessage(); diff --git a/src/client/clientRPC.cpp b/src/client/clientRPC.cpp index ff52a3e..12b06ef 100644 --- a/src/client/clientRPC.cpp +++ b/src/client/clientRPC.cpp @@ -15,11 +15,6 @@ #include "clientpvt.h" #include "pv/pvAccess.h" -namespace pvd = epics::pvData; -namespace pva = epics::pvAccess; -typedef epicsGuard Guard; -typedef epicsGuardRelease UnGuard; - namespace { struct RPCer : public pva::ChannelRPCRequester, diff --git a/src/client/clientpvt.h b/src/client/clientpvt.h index c2b057d..402c655 100644 --- a/src/client/clientpvt.h +++ b/src/client/clientpvt.h @@ -5,6 +5,11 @@ #include +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + namespace pvac{namespace detail{ /* Like std::tr1::enable_shared_from_this * with the notion of internal vs. external references. From 855335f3b13f4df069a3f705bcd17944514b701c Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 25 Sep 2018 18:05:00 -0700 Subject: [PATCH 03/12] client.h guard concurrent callbacks on cancel cancel() blocks until concurrent callbacks have completed. Also serializes any callbacks for each Operation/Monitor. --- src/client/clientGet.cpp | 31 ++++++++----- src/client/clientMonitor.cpp | 31 ++++++++----- src/client/clientPut.cpp | 39 +++++++++------- src/client/clientRPC.cpp | 42 ++++++++---------- src/client/clientpvt.h | 86 ++++++++++++++++++++++++++++++++++++ 5 files changed, 166 insertions(+), 63 deletions(-) diff --git a/src/client/clientGet.cpp b/src/client/clientGet.cpp index 9091b03..6314db0 100644 --- a/src/client/clientGet.cpp +++ b/src/client/clientGet.cpp @@ -18,13 +18,14 @@ #include "pv/pvAccess.h" namespace { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; -struct Getter : public pva::ChannelGetRequester, - public pvac::Operation::Impl, - public pvac::detail::wrapped_shared_from_this +struct Getter : public pvac::detail::CallbackStorage, + public pva::ChannelGetRequester, + public pvac::Operation::Impl, + public pvac::detail::wrapped_shared_from_this { - mutable epicsMutex mutex; - operation_type::shared_pointer op; pvac::ClientChannel::GetCallback *cb; @@ -34,16 +35,21 @@ struct Getter : public pva::ChannelGetRequester, explicit Getter(pvac::ClientChannel::GetCallback* cb) :cb(cb) {REFTRACE_INCREMENT(num_instances);} - virtual ~Getter() {REFTRACE_DECREMENT(num_instances);} + virtual ~Getter() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } - void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) + void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { if(!cb) return; event.event = evt; pvac::ClientChannel::GetCallback *C=cb; cb = 0; - UnGuard U(G); + CallbackUse U(G); C->getDone(event); } @@ -58,9 +64,10 @@ struct Getter : public pva::ChannelGetRequester, { // keepalive for safety in case callback wants to destroy us std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(op) op->cancel(); callEvent(G, pvac::GetEvent::Cancel); + G.wait(); } virtual std::string getRequesterName() OVERRIDE FINAL @@ -75,7 +82,7 @@ struct Getter : public pva::ChannelGetRequester, epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { @@ -94,7 +101,7 @@ struct Getter : public pva::ChannelGetRequester, virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { - Guard G(mutex); + CallbackGuard G(*this); event.message = "Disconnect"; callEvent(G); @@ -107,7 +114,7 @@ struct Getter : public pva::ChannelGetRequester, epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { diff --git a/src/client/clientMonitor.cpp b/src/client/clientMonitor.cpp index f285904..7a8fa14 100644 --- a/src/client/clientMonitor.cpp +++ b/src/client/clientMonitor.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -17,11 +18,13 @@ #include "pv/pvAccess.h" namespace pvac { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; -struct Monitor::Impl : public pva::MonitorRequester, +struct Monitor::Impl : public pvac::detail::CallbackStorage, + public pva::MonitorRequester, public pvac::detail::wrapped_shared_from_this { - mutable epicsMutex mutex; pva::Channel::shared_pointer chan; operation_type::shared_pointer op; bool started, done, seenEmpty; @@ -39,9 +42,14 @@ struct Monitor::Impl : public pva::MonitorRequester, ,seenEmpty(false) ,cb(cb) {REFTRACE_INCREMENT(num_instances);} - virtual ~Impl() {REFTRACE_DECREMENT(num_instances);} + virtual ~Impl() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } - void callEvent(Guard& G, MonitorEvent::event_t evt = MonitorEvent::Fail) + void callEvent(CallbackGuard& G, MonitorEvent::event_t evt = MonitorEvent::Fail) { ClientChannel::MonitorCallback *cb=this->cb; if(!cb) return; @@ -52,7 +60,7 @@ struct Monitor::Impl : public pva::MonitorRequester, this->cb = 0; // last event try { - UnGuard U(G); + CallbackUse U(G); cb->monitorEvent(event); return; }catch(std::exception& e){ @@ -66,7 +74,7 @@ struct Monitor::Impl : public pva::MonitorRequester, } // continues error handling try { - UnGuard U(G); + CallbackUse U(G); cb->monitorEvent(event); return; }catch(std::exception& e){ @@ -82,7 +90,7 @@ struct Monitor::Impl : public pva::MonitorRequester, // keepalive for safety in case callback wants to destroy us std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); last.reset(); @@ -93,6 +101,7 @@ struct Monitor::Impl : public pva::MonitorRequester, temp.swap(op); callEvent(G, MonitorEvent::Cancel); + G.wait(); } if(temp) temp->destroy(); @@ -110,7 +119,7 @@ struct Monitor::Impl : public pva::MonitorRequester, pvd::StructureConstPtr const & structure) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || started || done) return; if(!status.isOK()) { @@ -140,7 +149,7 @@ struct Monitor::Impl : public pva::MonitorRequester, virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || done) return; event.message = "Disconnect"; started = false; @@ -150,7 +159,7 @@ struct Monitor::Impl : public pva::MonitorRequester, virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || done) return; event.message.clear(); @@ -160,7 +169,7 @@ struct Monitor::Impl : public pva::MonitorRequester, virtual void unlisten(pva::MonitorPtr const & monitor) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || done) return; done = true; diff --git a/src/client/clientPut.cpp b/src/client/clientPut.cpp index d70973f..bb481e8 100644 --- a/src/client/clientPut.cpp +++ b/src/client/clientPut.cpp @@ -18,13 +18,14 @@ #include "pv/pvAccess.h" namespace { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; -struct Putter : public pva::ChannelPutRequester, - public pvac::Operation::Impl, - public pvac::detail::wrapped_shared_from_this +struct Putter : public pvac::detail::CallbackStorage, + public pva::ChannelPutRequester, + public pvac::Operation::Impl, + public pvac::detail::wrapped_shared_from_this { - mutable epicsMutex mutex; - const bool getcurrent; bool started; // whether the put() has actually been sent. After which point we can't safely re-try. @@ -38,16 +39,21 @@ struct Putter : public pva::ChannelPutRequester, Putter(pvac::ClientChannel::PutCallback* cb, bool getcurrent) :getcurrent(getcurrent), started(false), cb(cb) {REFTRACE_INCREMENT(num_instances);} - virtual ~Putter() {REFTRACE_DECREMENT(num_instances);} + virtual ~Putter() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } - void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) + void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { if(!cb) return; event.event = evt; pvac::ClientChannel::PutCallback *C=cb; cb = 0; - UnGuard U(G); + CallbackUse U(G); C->putDone(event); } @@ -62,9 +68,10 @@ struct Putter : public pva::ChannelPutRequester, { // keepalive for safety in case callback wants to destroy us std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(started && op) op->cancel(); callEvent(G, pvac::GetEvent::Cancel); + G.wait(); } virtual std::string getRequesterName() OVERRIDE FINAL @@ -79,7 +86,7 @@ struct Putter : public pva::ChannelPutRequester, epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); op = channelPut; // we may be called before createChannelPut() has returned. puttype = structure; if(started || !cb) return; @@ -107,20 +114,20 @@ struct Putter : public pva::ChannelPutRequester, virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { - Guard G(mutex); + CallbackGuard G(*this); event.message = "Disconnect"; callEvent(G); } - void doPut(Guard& G, + void doPut(CallbackGuard& G, pvac::ClientChannel::PutCallback::Args& args, pva::ChannelPut::shared_pointer const & channelPut, const pvd::BitSet::shared_pointer& tosend) { try { pvac::ClientChannel::PutCallback *C(cb); - UnGuard U(G); + CallbackUse U(G); C->putBuild(puttype, args); if(!args.root) throw std::logic_error("No put value provided"); @@ -136,8 +143,8 @@ struct Putter : public pva::ChannelPutRequester, } // check cb again after UnGuard if(cb) { - channelPut->put(std::tr1::const_pointer_cast(args.root), tosend); started = true; + channelPut->put(std::tr1::const_pointer_cast(args.root), tosend); } } @@ -148,7 +155,7 @@ struct Putter : public pva::ChannelPutRequester, epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { @@ -169,7 +176,7 @@ struct Putter : public pva::ChannelPutRequester, pva::ChannelPut::shared_pointer const & channelPut) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { diff --git a/src/client/clientRPC.cpp b/src/client/clientRPC.cpp index 12b06ef..ff6d54d 100644 --- a/src/client/clientRPC.cpp +++ b/src/client/clientRPC.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -16,13 +17,14 @@ #include "pv/pvAccess.h" namespace { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; -struct RPCer : public pva::ChannelRPCRequester, +struct RPCer : public pvac::detail::CallbackStorage, + public pva::ChannelRPCRequester, public pvac::Operation::Impl, public pvac::detail::wrapped_shared_from_this { - mutable epicsMutex mutex; - bool started; operation_type::shared_pointer op; @@ -36,9 +38,14 @@ struct RPCer : public pva::ChannelRPCRequester, RPCer(pvac::ClientChannel::GetCallback* cb, const pvd::PVStructure::const_shared_pointer& args) :started(false), cb(cb), args(args) {REFTRACE_INCREMENT(num_instances);} - virtual ~RPCer() {REFTRACE_DECREMENT(num_instances);} + virtual ~RPCer() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } - void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) + void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { pvac::ClientChannel::GetCallback *cb=this->cb; if(!cb) return; @@ -48,24 +55,11 @@ struct RPCer : public pva::ChannelRPCRequester, this->cb = 0; try { - UnGuard U(G); + CallbackUse U(G); cb->getDone(event); return; }catch(std::exception& e){ - if(!this->cb || evt==pvac::GetEvent::Fail) { - LOG(pva::logLevelError, "Unhandled exception in ClientChannel::GetCallback::getDone(): %s", e.what()); - } else { - event.event = pvac::GetEvent::Fail; - event.message = e.what(); - } - } - // continues error handling - try { - UnGuard U(G); - cb->getDone(event); - return; - }catch(std::exception& e){ - LOG(pva::logLevelError, "Unhandled exception following exception in ClientChannel::GetCallback::monitorEvent(): %s", e.what()); + LOG(pva::logLevelError, "Unhandled exception in ClientChannel::RPCCallback::requestDone(): %s", e.what()); } } @@ -79,7 +73,7 @@ struct RPCer : public pva::ChannelRPCRequester, virtual void cancel() { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(started && op) op->cancel(); callEvent(G, pvac::GetEvent::Cancel); } @@ -95,7 +89,7 @@ struct RPCer : public pva::ChannelRPCRequester, pva::ChannelRPC::shared_pointer const & operation) { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || started) return; if(!status.isOK()) { @@ -115,7 +109,7 @@ struct RPCer : public pva::ChannelRPCRequester, virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); event.message = "Disconnect"; callEvent(G); @@ -127,7 +121,7 @@ struct RPCer : public pva::ChannelRPCRequester, epics::pvData::PVStructure::shared_pointer const & pvResponse) { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { diff --git a/src/client/clientpvt.h b/src/client/clientpvt.h index 402c655..1beb9eb 100644 --- a/src/client/clientpvt.h +++ b/src/client/clientpvt.h @@ -1,6 +1,9 @@ #ifndef CLIENTPVT_H #define CLIENTPVT_H +#include +#include + #include #include @@ -70,6 +73,89 @@ public: } }; +/** Safe use of raw callback pointer while unlocked. + * clear pointer and then call CallbackGuard::wait() to ensure that concurrent + * callback have completed. + * + * Prototype usage + @code + * struct mycb : public CallbackStorage { + * void (*ptr)(); + * }; + * // make a callback + * void docb(mycb& cb) { + * CallbackGuard G(cb); // lock + * // decide whether to make CB + * if(P){ + * void (*P)() = ptr; // copy for use while unlocked + * CallbackUse U(G); // unlock + * (*P)(); + * // automatic re-lock + * } + * // automatic final unlock + * } + * void cancelop(mycb& cb) { + * CallbackGuard G(cb); + * ptr = 0; // prevent further callbacks from starting + * G.wait(); // wait for inprogress callbacks to complete + * } + @endcode + */ +struct CallbackStorage { + mutable epicsMutex mutex; + epicsEvent wakeup; + size_t nwaitcb; + epicsThreadId incb; + CallbackStorage() :nwaitcb(0u), incb(0) {} +}; + +// analogous to epicsGuard +struct CallbackGuard { + CallbackStorage& store; + epicsThreadId self; + explicit CallbackGuard(CallbackStorage& store) :store(store), self(0) { + store.mutex.lock(); + } + ~CallbackGuard() { + bool notify = store.nwaitcb!=0; + store.mutex.unlock(); + if(notify) + store.wakeup.signal(); + } + void ensureself() { + if(!self) + self = epicsThreadGetIdSelf(); + } + // unlock and block until no in-progress callbacks + void wait() { + if(!store.incb) return; + ensureself(); + store.nwaitcb++; + while(store.incb && store.incb!=self) { + store.mutex.unlock(); + store.wakeup.wait(); + store.mutex.lock(); + } + store.nwaitcb--; + } +}; + +// analogous to epicsGuardRelease +struct CallbackUse { + CallbackGuard& G; + explicit CallbackUse(CallbackGuard& G) :G(G) { + G.wait(); // serialize callbacks + G.ensureself(); + G.store.incb=G.self; + G.store.mutex.unlock(); + } + ~CallbackUse() { + G.store.mutex.lock(); + G.store.incb=0; + } +}; + + void registerRefTrack(); void registerRefTrackGet(); void registerRefTrackPut(); From 51bc630755443ac539a566cde92ff877473f45d1 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 20 Sep 2018 17:09:26 -0700 Subject: [PATCH 04/12] add pvac::ClientChannel::info() --- src/client/Makefile | 1 + src/client/client.cpp | 1 + src/client/clientInfo.cpp | 123 ++++++++++++++++++++++++++++++++++++++ src/client/clientSync.cpp | 46 ++++++++++++++ src/client/clientpvt.h | 1 + src/client/pva/client.h | 22 ++++++- 6 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 src/client/clientInfo.cpp diff --git a/src/client/Makefile b/src/client/Makefile index 920a2be..1a923ed 100644 --- a/src/client/Makefile +++ b/src/client/Makefile @@ -14,3 +14,4 @@ pvAccess_SRCS += clientGet.cpp pvAccess_SRCS += clientPut.cpp pvAccess_SRCS += clientRPC.cpp pvAccess_SRCS += clientMonitor.cpp +pvAccess_SRCS += clientInfo.cpp diff --git a/src/client/client.cpp b/src/client/client.cpp index 4461feb..8913902 100644 --- a/src/client/client.cpp +++ b/src/client/client.cpp @@ -222,6 +222,7 @@ void register_reftrack() pvac::detail::registerRefTrackPut(); pvac::detail::registerRefTrackMonitor(); pvac::detail::registerRefTrackRPC(); + pvac::detail::registerRefTrackInfo(); } std::tr1::shared_ptr diff --git a/src/client/clientInfo.cpp b/src/client/clientInfo.cpp new file mode 100644 index 0000000..de71ce1 --- /dev/null +++ b/src/client/clientInfo.cpp @@ -0,0 +1,123 @@ +/* + * Copyright information and license terms for this software can be + * found in the file LICENSE that is included with the distribution + */ + +#include +#include +#include + +#include +#include +#include +#include + +#define epicsExportSharedSymbols +#include "pv/logger.h" +#include "clientpvt.h" +#include "pv/pvAccess.h" + +namespace { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; + +struct Infoer : public pvac::detail::CallbackStorage, + public pva::GetFieldRequester, + public pvac::Operation::Impl, + public pvac::detail::wrapped_shared_from_this +{ + pvac::ClientChannel::InfoCallback *cb; + const pva::Channel::shared_pointer channel; + + static size_t num_instances; + + explicit Infoer(pvac::ClientChannel::InfoCallback *cb, const pva::Channel::shared_pointer& channel) + :cb(cb), channel(channel) + {REFTRACE_INCREMENT(num_instances);} + virtual ~Infoer() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } + + + virtual std::string getRequesterName() OVERRIDE FINAL + { + Guard G(mutex); + return channel->getChannelName(); + } + + virtual void getDone( + const pvd::Status& status, + pvd::FieldConstPtr const & field) OVERRIDE FINAL + { + CallbackGuard G(*this); + pvac::ClientChannel::InfoCallback *C(cb); + cb = 0; + if(C) { + pvac::InfoEvent evt; + evt.event = status.isSuccess() ? pvac::InfoEvent::Success : pvac::InfoEvent::Fail; + evt.message = status.getMessage(); + evt.type = field; + CallbackUse U(G); + C->infoDone(evt); + } + pvac::InfoEvent evt; + } + + virtual std::string name() const OVERRIDE FINAL { return channel->getChannelName(); } + + virtual void cancel() OVERRIDE FINAL { + CallbackGuard G(*this); + // we can't actually cancel a getField + pvac::ClientChannel::InfoCallback *C(cb); + cb = 0; + if(C) { + pvac::InfoEvent evt; + evt.event = pvac::InfoEvent::Cancel; + CallbackUse U(G); + C->infoDone(evt); + } + G.wait(); + } + + virtual void show(std::ostream& strm) const OVERRIDE FINAL { + strm << "Operation(Info" + "\"" << name() <<"\"" + ")"; + } +}; + +size_t Infoer::num_instances; + +} // namespace + +namespace pvac { + +Operation ClientChannel::info(InfoCallback *cb, const std::string& subfld) +{ + if(!impl) throw std::logic_error("Dead Channel"); + + std::tr1::shared_ptr ret(Infoer::build(cb, getChannel())); + + { + Guard G(ret->mutex); + getChannel()->getField(ret, subfld); + // getField is an oddity as it doesn't have an associated Operation class, + // and is thus largely out of our control. (eg. can't cancel) + } + + return Operation(ret); +} + +namespace detail { + +void registerRefTrackInfo() +{ + epics::registerRefCounter("pvac::Infoer", &Infoer::num_instances); +} + +} + +} // namespace pvac diff --git a/src/client/clientSync.cpp b/src/client/clientSync.cpp index 3811ee5..8442115 100644 --- a/src/client/clientSync.cpp +++ b/src/client/clientSync.cpp @@ -376,5 +376,51 @@ ClientChannel::monitor(const epics::pvData::PVStructure::const_shared_pointer &p return MonitorSync(mon, simpl); } +namespace { + + +struct InfoWait : public pvac::ClientChannel::InfoCallback, + public WaitCommon +{ + pvac::InfoEvent result; + + InfoWait() {} + virtual ~InfoWait() {} + virtual void infoDone(const pvac::InfoEvent& evt) OVERRIDE FINAL + { + { + Guard G(mutex); + if(done) { + LOG(pva::logLevelWarn, "oops, double event to InfoCallback"); + } else { + result = evt; + done = true; + } + } + event.signal(); + } +}; + +} // namespace + +epics::pvData::FieldConstPtr +ClientChannel::info(double timeout, const std::string& subfld) +{ + InfoWait waiter; + { + Operation op(info(&waiter, subfld)); + waiter.wait(timeout); + } + switch(waiter.result.event) { + case InfoEvent::Success: + return waiter.result.type; + case InfoEvent::Fail: + throw std::runtime_error(waiter.result.message); + default: + case InfoEvent::Cancel: // cancel implies timeout, which should already be thrown + THROW_EXCEPTION2(std::logic_error, "Cancelled!?!?"); + } +} + }//namespace pvac diff --git a/src/client/clientpvt.h b/src/client/clientpvt.h index 1beb9eb..defb821 100644 --- a/src/client/clientpvt.h +++ b/src/client/clientpvt.h @@ -161,6 +161,7 @@ void registerRefTrackGet(); void registerRefTrackPut(); void registerRefTrackMonitor(); void registerRefTrackRPC(); +void registerRefTrackInfo(); }} // namespace pvac::detail diff --git a/src/client/pva/client.h b/src/client/pva/client.h index 702377d..b5e25e1 100644 --- a/src/client/pva/client.h +++ b/src/client/pva/client.h @@ -85,7 +85,7 @@ protected: }; //! Information on put completion -struct PutEvent +struct epicsShareClass PutEvent { enum event_t { Fail, //!< request ends in failure. Check message @@ -105,6 +105,12 @@ struct epicsShareClass GetEvent : public PutEvent epics::pvData::BitSet::const_shared_pointer valid; }; +struct epicsShareClass InfoEvent : public PutEvent +{ + //! Type description resulting from getField operation. NULL unless event==Success + epics::pvData::FieldConstPtr type; +}; + struct MonitorSync; //! Handle for monitor subscription @@ -416,6 +422,20 @@ public: MonitorSync monitor(const epics::pvData::PVStructure::const_shared_pointer& pvRequest = epics::pvData::PVStructure::const_shared_pointer(), epicsEvent *event =0); + struct InfoCallback { + virtual ~InfoCallback() {} + //! getField operation is complete + virtual void infoDone(const InfoEvent& evt) =0; + }; + + //! Request PV type info. + //! @note This type may not be the same as the types used in the get/put/monitor operations. + Operation info(InfoCallback *cb, const std::string& subfld = std::string()); + + //! Synchronious getField opreation + epics::pvData::FieldConstPtr info(double timeout = 3.0, + const std::string& subfld = std::string()); + //! Connection state change CB struct ConnectCallback { virtual ~ConnectCallback() {} From 7ed6f1315f7f4f0d8f987f52ee7a0bb0f46257e9 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 23 Sep 2018 12:03:45 -0700 Subject: [PATCH 05/12] SharedPV::open() less work under lock --- src/server/sharedstate_pv.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp index 17f45f0..4860b23 100644 --- a/src/server/sharedstate_pv.cpp +++ b/src/server/sharedstate_pv.cpp @@ -129,6 +129,8 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& typedef std::vector > xgetfields_t; const pvd::StructureConstPtr newtype(value.getStructure()); + pvd::PVStructurePtr newvalue(pvd::getPVDataCreate()->createPVStructure(newtype)); + newvalue->copyUnchecked(value, valid); xputs_t p_put; xrpcs_t p_rpc; @@ -145,9 +147,8 @@ void SharedPV::open(const pvd::PVStructure &value, const epics::pvData::BitSet& p_monitor.reserve(monitors.size()); p_getfield.reserve(getfields.size()); - type = value.getStructure(); - current = pvd::getPVDataCreate()->createPVStructure(newtype); - current->copyUnchecked(value); + type = newtype; + current = newvalue; this->valid = valid; FOR_EACH(puts_t::const_iterator, it, end, puts) { From 83b2105241e925b14f1d7dfdb883c2c874fd391d Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 26 Sep 2018 14:56:11 -0700 Subject: [PATCH 06/12] StaticProvider createChannel() avoid possible deadlock connect() may invoke SharedPV onFirstConnect() callback. --- src/server/server.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/server/server.cpp b/src/server/server.cpp index ba4867e..7e64650 100644 --- a/src/server/server.cpp +++ b/src/server/server.cpp @@ -87,12 +87,17 @@ struct StaticProvider::Impl : public pva::ChannelProvider pva::Channel::shared_pointer ret; pvd::Status sts; + builders_t::mapped_type builder; { Guard G(mutex); builders_t::const_iterator it(builders.find(name)); - if(it!=builders.end()) - ret = it->second->connect(Impl::shared_pointer(internal_self), name, requester); + if(it!=builders.end()) { + UnGuard U(G); + builder = it->second; + } } + if(builder) + ret = builder->connect(Impl::shared_pointer(internal_self), name, requester); if(!ret) { sts = pvd::Status::error("No such channel"); From 4296c5e015b473dbb6653b5433765285938e551f Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 23 Sep 2018 12:03:21 -0700 Subject: [PATCH 07/12] more SharedPV::close() --- src/server/pva/sharedstate.h | 4 +++ src/server/sharedstate_channel.cpp | 2 +- src/server/sharedstate_pv.cpp | 51 +++++++++++++++--------------- 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/src/server/pva/sharedstate.h b/src/server/pva/sharedstate.h index 6d6d19f..2a32b8e 100644 --- a/src/server/pva/sharedstate.h +++ b/src/server/pva/sharedstate.h @@ -143,6 +143,10 @@ public: //! If destory=true, the internal client list is cleared. //! @post In the closed state //! @note Provider locking rules apply (@see provider_roles_requester_locking). + //! + //! close() is not final, even with destroy=true new clients may begin connecting, and open() may be called again. + //! A final close() should be performed after the removal from StaticProvider/DynamicProvider + //! which will prevent new clients. virtual void close(bool destroy=false); //! Create a new container which may be used to prepare to call post(). diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index b5912e7..7789b8c 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -51,7 +51,7 @@ SharedChannel::SharedChannel(const std::tr1::shared_ptr &owner, if(owner->channels.empty()) handler = owner->handler; owner->channels.push_back(this); - owner->notifiedConn = !!handler; + owner->notifiedConn = true; } if(handler) { handler->onFirstConnect(owner); diff --git a/src/server/sharedstate_pv.cpp b/src/server/sharedstate_pv.cpp index 4860b23..98851dc 100644 --- a/src/server/sharedstate_pv.cpp +++ b/src/server/sharedstate_pv.cpp @@ -231,35 +231,36 @@ void SharedPV::close(bool destroy) { Guard I(mutex); - if(!type) - return; + if(type) { - p_put.reserve(puts.size()); - p_rpc.reserve(rpcs.size()); - p_monitor.reserve(monitors.size()); - p_channel.reserve(channels.size()); + p_put.reserve(puts.size()); + p_rpc.reserve(rpcs.size()); + p_monitor.reserve(monitors.size()); + p_channel.reserve(channels.size()); - FOR_EACH(puts_t::const_iterator, it, end, puts) { - (*it)->mapper.reset(); - p_put.push_back((*it)->requester.lock()); - } - FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { - p_rpc.push_back((*it)->requester.lock()); - } - FOR_EACH(monitors_t::const_iterator, it, end, monitors) { - (*it)->close(); - try { - p_monitor.push_back((*it)->shared_from_this()); - }catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ } - } - FOR_EACH(channels_t::const_iterator, it, end, channels) { - try { - p_channel.push_back((*it)->shared_from_this()); - }catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ } + FOR_EACH(puts_t::const_iterator, it, end, puts) { + (*it)->mapper.reset(); + p_put.push_back((*it)->requester.lock()); + } + FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) { + p_rpc.push_back((*it)->requester.lock()); + } + FOR_EACH(monitors_t::const_iterator, it, end, monitors) { + (*it)->close(); + try { + p_monitor.push_back((*it)->shared_from_this()); + }catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ } + } + FOR_EACH(channels_t::const_iterator, it, end, channels) { + try { + p_channel.push_back((*it)->shared_from_this()); + }catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ } + } + + type.reset(); + current.reset(); } - type.reset(); - current.reset(); if(destroy) { // forget about all clients, to prevent the possibility of our // sending a second destroy notification. From 4e5aef3e42de9dcfcbf106d86ecd2f56ce2e9904 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 2 Oct 2018 12:47:50 -0700 Subject: [PATCH 08/12] SharedPV allow rpc() while close()'d No reason to prevent this as RPCs need not use the same type (Structure) as get/put/monitor. --- src/server/pva/sharedstate.h | 4 ++-- src/server/sharedstate_channel.cpp | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/server/pva/sharedstate.h b/src/server/pva/sharedstate.h index 2a32b8e..e424dd7 100644 --- a/src/server/pva/sharedstate.h +++ b/src/server/pva/sharedstate.h @@ -47,7 +47,7 @@ struct Operation; /** A Shared State Process Variable (PV) * * "Shared" in the sense that all clients/subscribers interact with the - * same PVStructure. + * same PVStructure (excluding the RPC operation). * * @warning For the purposes of locking, this class is an Operation (see @ref provider_roles_requester_locking). * eg. no locks may be held when calling post(), open(), close(), or connect(). @@ -62,7 +62,7 @@ struct Operation; * Calling close() will close all currently opened client channels. * * Client channels, and operations on them, may be initiated at any time (via connect()). - * However, operations will not be fully created until open() is called. + * However, operations other than RPC will not proceed until open() is called. * * @note A SharedPV does not have a name. Name(s) are associated with a SharedPV * By a Provider (StaticProvider, DynamicProvider, or any epics::pvAccess::ChannelProvider). diff --git a/src/server/sharedstate_channel.cpp b/src/server/sharedstate_channel.cpp index 7789b8c..76b6c64 100644 --- a/src/server/sharedstate_channel.cpp +++ b/src/server/sharedstate_channel.cpp @@ -157,14 +157,11 @@ pva::ChannelRPC::shared_pointer SharedChannel::createChannelRPC( pvd::PVStructure::shared_pointer const & pvRequest) { std::tr1::shared_ptr ret(new SharedRPC(shared_from_this(), requester, pvRequest)); - bool opened; { Guard G(owner->mutex); owner->rpcs.push_back(ret.get()); - opened = !!owner->type; } - if(opened) - requester->channelRPCConnect(pvd::Status(), ret); + requester->channelRPCConnect(pvd::Status(), ret); return ret; } From e6902ee41fbd05a7af188e87dab7e97b39f03a83 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sat, 29 Sep 2018 12:58:51 -0700 Subject: [PATCH 09/12] avoid UB in decodeAsIPv6Address() Order of evaluation within an expression is undefined behavior. --- src/utils/inetAddressUtil.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/utils/inetAddressUtil.cpp b/src/utils/inetAddressUtil.cpp index 378cf27..873c819 100644 --- a/src/utils/inetAddressUtil.cpp +++ b/src/utils/inetAddressUtil.cpp @@ -57,11 +57,13 @@ bool decodeAsIPv6Address(ByteBuffer* buffer, osiSockAddr* address) { // allow all zeros address //if (ffff != (int16)0xFFFF) return false; - uint32_t ipv4Addr = - ((uint32_t)(buffer->getByte()&0xFF))<<24 | - ((uint32_t)(buffer->getByte()&0xFF))<<16 | - ((uint32_t)(buffer->getByte()&0xFF))<<8 | - ((uint32_t)(buffer->getByte()&0xFF)); + uint32 ipv4Addr = uint8(buffer->getByte()); + ipv4Addr <<= 8; + ipv4Addr |= uint8(buffer->getByte()); + ipv4Addr <<= 8; + ipv4Addr |= uint8(buffer->getByte()); + ipv4Addr <<= 8; + ipv4Addr |= uint8(buffer->getByte()); if (ffff != (int16)0xFFFF && ipv4Addr != (uint32_t)0) return false; From 2fec84461da29c67656f6b07a3816bd3ac89778c Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sat, 29 Sep 2018 15:01:11 -0700 Subject: [PATCH 10/12] use of CMD_* instead of magic numbers --- src/remote/codec.cpp | 2 +- src/remote/pv/codec.h | 2 +- src/server/beaconEmitter.cpp | 2 +- src/server/responseHandlers.cpp | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 72d59b3..012b029 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -1398,7 +1398,7 @@ public: } void send(ByteBuffer* buffer, TransportSendControl* control) { - control->startMessage((int8)5, 0); + control->startMessage(CMD_AUTHNZ, 0); SerializationHelper::serializeFull(buffer, control, _data); // send immediately control->flush(true); diff --git a/src/remote/pv/codec.h b/src/remote/pv/codec.h index c1563af..e792bdf 100644 --- a/src/remote/pv/codec.h +++ b/src/remote/pv/codec.h @@ -339,7 +339,7 @@ public: } virtual void processControlMessage() OVERRIDE FINAL { - if (_command == 2) + if (_command == CMD_SET_ENDIANESS) { // check 7-th bit setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE); diff --git a/src/server/beaconEmitter.cpp b/src/server/beaconEmitter.cpp index 5341f3c..4a0f157 100644 --- a/src/server/beaconEmitter.cpp +++ b/src/server/beaconEmitter.cpp @@ -62,7 +62,7 @@ void BeaconEmitter::send(ByteBuffer* buffer, TransportSendControl* control) } // send beacon - control->startMessage((int8)0, 12+2+2+16+2); + control->startMessage((int8)CMD_BEACON, 12+2+2+16+2); buffer->put(_guid.value, 0, sizeof(_guid.value)); diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index dcd41b0..80d1d88 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -443,7 +443,7 @@ void ServerChannelFindRequesterImpl::channelFindResult(const Status& /*status*/, void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) { - control->startMessage((int8)4, 12+4+16+2); + control->startMessage(CMD_SEARCH_RESPONSE, 12+4+16+2); Lock guard(_mutex); buffer->put(_guid.value, 0, sizeof(_guid.value)); @@ -1802,7 +1802,7 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon return; } - control->startMessage((int32)12, sizeof(int32)/sizeof(int8) + 1); + control->startMessage(CMD_PUT_GET, sizeof(int32)/sizeof(int8) + 1); buffer->putInt(_ioid); buffer->putByte((int8)request); { From d4f3abf461547393172e263f7a62b0669352a814 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 30 Sep 2018 14:29:57 -0700 Subject: [PATCH 11/12] more descriptive UDP logging --- src/remote/blockingUDPConnector.cpp | 3 --- src/remote/blockingUDPTransport.cpp | 14 ++++++++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/remote/blockingUDPConnector.cpp b/src/remote/blockingUDPConnector.cpp index 3b83faa..a2bebec 100644 --- a/src/remote/blockingUDPConnector.cpp +++ b/src/remote/blockingUDPConnector.cpp @@ -39,9 +39,6 @@ BlockingUDPTransport::shared_pointer BlockingUDPConnector::connect(std::tr1::sha ResponseHandler::shared_pointer const & responseHandler, osiSockAddr& bindAddress, int8 transportRevision, int16 /*priority*/) { - LOG(logLevelDebug, "Creating datagram socket to: %s.", - inetAddressToString(bindAddress).c_str()); - SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(socket==INVALID_SOCKET) { char errStr[64]; diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index d151a31..a0db1ef 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -81,6 +81,8 @@ BlockingUDPTransport::BlockingUDPTransport(bool serverFlag, char strBuffer[64]; sockAddrToDottedIP(&_remoteAddress.sa, strBuffer, sizeof(strBuffer)); _remoteName = strBuffer; + LOG(logLevelDebug, "Creating datagram socket from: %s.", + _remoteName.c_str()); } REFTRACE_INCREMENT(num_instances); @@ -398,8 +400,8 @@ bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSock { if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "Sending %zu bytes to %s.", - length, inetAddressToString(address).c_str()); + LOG(logLevelDebug, "Sending %zu bytes %s -> %s.", + length, _remoteName.c_str(), inetAddressToString(address).c_str()); } int retval = sendto(_channel, buffer, @@ -422,8 +424,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address) if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "Sending %zu bytes to %s.", - buffer->getRemaining(), inetAddressToString(address).c_str()); + LOG(logLevelDebug, "Sending %zu bytes %s -> %s.", + buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str()); } int retval = sendto(_channel, buffer->getArray(), @@ -459,8 +461,8 @@ bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) { if (IS_LOGGABLE(logLevelDebug)) { - LOG(logLevelDebug, "Sending %zu bytes to %s.", - buffer->getRemaining(), inetAddressToString(_sendAddresses[i]).c_str()); + LOG(logLevelDebug, "Sending %zu bytes %s -> %s.", + buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str()); } int retval = sendto(_channel, buffer->getArray(), From 30f07f2fb6a633391c2fdc1c9902b628dea02a10 Mon Sep 17 00:00:00 2001 From: "Evan J. Smith" Date: Wed, 12 Sep 2018 09:48:41 -0400 Subject: [PATCH 12/12] Changing default request string to 'field()' --- pvtoolsSrc/pvget.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pvtoolsSrc/pvget.cpp b/pvtoolsSrc/pvget.cpp index cdae143..028079c 100644 --- a/pvtoolsSrc/pvget.cpp +++ b/pvtoolsSrc/pvget.cpp @@ -42,7 +42,7 @@ namespace { bool debugFlag = false; -string request("field(value)"); +string request("field()"); string defaultProvider("pva"); enum PrintMode { ValueOnlyMode, StructureMode, TerseMode }; @@ -80,7 +80,7 @@ void printValue(std::string const & channelName, PVStructure::shared_pointer con PVField::shared_pointer value = pv->getSubField("value"); if (value.get() == 0) { - std::cerr << "no 'value' field\n"; + //std::cerr << "no 'value' field\n"; pvutil_ostream myos(std::cout); myos << channelName << "\n" << *(pv.get()) << "\n\n"; }