From 855335f3b13f4df069a3f705bcd17944514b701c Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 25 Sep 2018 18:05:00 -0700 Subject: [PATCH] client.h guard concurrent callbacks on cancel cancel() blocks until concurrent callbacks have completed. Also serializes any callbacks for each Operation/Monitor. --- src/client/clientGet.cpp | 31 ++++++++----- src/client/clientMonitor.cpp | 31 ++++++++----- src/client/clientPut.cpp | 39 +++++++++------- src/client/clientRPC.cpp | 42 ++++++++---------- src/client/clientpvt.h | 86 ++++++++++++++++++++++++++++++++++++ 5 files changed, 166 insertions(+), 63 deletions(-) diff --git a/src/client/clientGet.cpp b/src/client/clientGet.cpp index 9091b03..6314db0 100644 --- a/src/client/clientGet.cpp +++ b/src/client/clientGet.cpp @@ -18,13 +18,14 @@ #include "pv/pvAccess.h" namespace { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; -struct Getter : public pva::ChannelGetRequester, - public pvac::Operation::Impl, - public pvac::detail::wrapped_shared_from_this +struct Getter : public pvac::detail::CallbackStorage, + public pva::ChannelGetRequester, + public pvac::Operation::Impl, + public pvac::detail::wrapped_shared_from_this { - mutable epicsMutex mutex; - operation_type::shared_pointer op; pvac::ClientChannel::GetCallback *cb; @@ -34,16 +35,21 @@ struct Getter : public pva::ChannelGetRequester, explicit Getter(pvac::ClientChannel::GetCallback* cb) :cb(cb) {REFTRACE_INCREMENT(num_instances);} - virtual ~Getter() {REFTRACE_DECREMENT(num_instances);} + virtual ~Getter() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } - void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) + void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { if(!cb) return; event.event = evt; pvac::ClientChannel::GetCallback *C=cb; cb = 0; - UnGuard U(G); + CallbackUse U(G); C->getDone(event); } @@ -58,9 +64,10 @@ struct Getter : public pva::ChannelGetRequester, { // keepalive for safety in case callback wants to destroy us std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(op) op->cancel(); callEvent(G, pvac::GetEvent::Cancel); + G.wait(); } virtual std::string getRequesterName() OVERRIDE FINAL @@ -75,7 +82,7 @@ struct Getter : public pva::ChannelGetRequester, epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { @@ -94,7 +101,7 @@ struct Getter : public pva::ChannelGetRequester, virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { - Guard G(mutex); + CallbackGuard G(*this); event.message = "Disconnect"; callEvent(G); @@ -107,7 +114,7 @@ struct Getter : public pva::ChannelGetRequester, epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { diff --git a/src/client/clientMonitor.cpp b/src/client/clientMonitor.cpp index f285904..7a8fa14 100644 --- a/src/client/clientMonitor.cpp +++ b/src/client/clientMonitor.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -17,11 +18,13 @@ #include "pv/pvAccess.h" namespace pvac { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; -struct Monitor::Impl : public pva::MonitorRequester, +struct Monitor::Impl : public pvac::detail::CallbackStorage, + public pva::MonitorRequester, public pvac::detail::wrapped_shared_from_this { - mutable epicsMutex mutex; pva::Channel::shared_pointer chan; operation_type::shared_pointer op; bool started, done, seenEmpty; @@ -39,9 +42,14 @@ struct Monitor::Impl : public pva::MonitorRequester, ,seenEmpty(false) ,cb(cb) {REFTRACE_INCREMENT(num_instances);} - virtual ~Impl() {REFTRACE_DECREMENT(num_instances);} + virtual ~Impl() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } - void callEvent(Guard& G, MonitorEvent::event_t evt = MonitorEvent::Fail) + void callEvent(CallbackGuard& G, MonitorEvent::event_t evt = MonitorEvent::Fail) { ClientChannel::MonitorCallback *cb=this->cb; if(!cb) return; @@ -52,7 +60,7 @@ struct Monitor::Impl : public pva::MonitorRequester, this->cb = 0; // last event try { - UnGuard U(G); + CallbackUse U(G); cb->monitorEvent(event); return; }catch(std::exception& e){ @@ -66,7 +74,7 @@ struct Monitor::Impl : public pva::MonitorRequester, } // continues error handling try { - UnGuard U(G); + CallbackUse U(G); cb->monitorEvent(event); return; }catch(std::exception& e){ @@ -82,7 +90,7 @@ struct Monitor::Impl : public pva::MonitorRequester, // keepalive for safety in case callback wants to destroy us std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); last.reset(); @@ -93,6 +101,7 @@ struct Monitor::Impl : public pva::MonitorRequester, temp.swap(op); callEvent(G, MonitorEvent::Cancel); + G.wait(); } if(temp) temp->destroy(); @@ -110,7 +119,7 @@ struct Monitor::Impl : public pva::MonitorRequester, pvd::StructureConstPtr const & structure) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || started || done) return; if(!status.isOK()) { @@ -140,7 +149,7 @@ struct Monitor::Impl : public pva::MonitorRequester, virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || done) return; event.message = "Disconnect"; started = false; @@ -150,7 +159,7 @@ struct Monitor::Impl : public pva::MonitorRequester, virtual void monitorEvent(pva::MonitorPtr const & monitor) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || done) return; event.message.clear(); @@ -160,7 +169,7 @@ struct Monitor::Impl : public pva::MonitorRequester, virtual void unlisten(pva::MonitorPtr const & monitor) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || done) return; done = true; diff --git a/src/client/clientPut.cpp b/src/client/clientPut.cpp index d70973f..bb481e8 100644 --- a/src/client/clientPut.cpp +++ b/src/client/clientPut.cpp @@ -18,13 +18,14 @@ #include "pv/pvAccess.h" namespace { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; -struct Putter : public pva::ChannelPutRequester, - public pvac::Operation::Impl, - public pvac::detail::wrapped_shared_from_this +struct Putter : public pvac::detail::CallbackStorage, + public pva::ChannelPutRequester, + public pvac::Operation::Impl, + public pvac::detail::wrapped_shared_from_this { - mutable epicsMutex mutex; - const bool getcurrent; bool started; // whether the put() has actually been sent. After which point we can't safely re-try. @@ -38,16 +39,21 @@ struct Putter : public pva::ChannelPutRequester, Putter(pvac::ClientChannel::PutCallback* cb, bool getcurrent) :getcurrent(getcurrent), started(false), cb(cb) {REFTRACE_INCREMENT(num_instances);} - virtual ~Putter() {REFTRACE_DECREMENT(num_instances);} + virtual ~Putter() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } - void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) + void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { if(!cb) return; event.event = evt; pvac::ClientChannel::PutCallback *C=cb; cb = 0; - UnGuard U(G); + CallbackUse U(G); C->putDone(event); } @@ -62,9 +68,10 @@ struct Putter : public pva::ChannelPutRequester, { // keepalive for safety in case callback wants to destroy us std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(started && op) op->cancel(); callEvent(G, pvac::GetEvent::Cancel); + G.wait(); } virtual std::string getRequesterName() OVERRIDE FINAL @@ -79,7 +86,7 @@ struct Putter : public pva::ChannelPutRequester, epics::pvData::Structure::const_shared_pointer const & structure) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); op = channelPut; // we may be called before createChannelPut() has returned. puttype = structure; if(started || !cb) return; @@ -107,20 +114,20 @@ struct Putter : public pva::ChannelPutRequester, virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { - Guard G(mutex); + CallbackGuard G(*this); event.message = "Disconnect"; callEvent(G); } - void doPut(Guard& G, + void doPut(CallbackGuard& G, pvac::ClientChannel::PutCallback::Args& args, pva::ChannelPut::shared_pointer const & channelPut, const pvd::BitSet::shared_pointer& tosend) { try { pvac::ClientChannel::PutCallback *C(cb); - UnGuard U(G); + CallbackUse U(G); C->putBuild(puttype, args); if(!args.root) throw std::logic_error("No put value provided"); @@ -136,8 +143,8 @@ struct Putter : public pva::ChannelPutRequester, } // check cb again after UnGuard if(cb) { - channelPut->put(std::tr1::const_pointer_cast(args.root), tosend); started = true; + channelPut->put(std::tr1::const_pointer_cast(args.root), tosend); } } @@ -148,7 +155,7 @@ struct Putter : public pva::ChannelPutRequester, epics::pvData::BitSet::shared_pointer const & bitSet) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { @@ -169,7 +176,7 @@ struct Putter : public pva::ChannelPutRequester, pva::ChannelPut::shared_pointer const & channelPut) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { diff --git a/src/client/clientRPC.cpp b/src/client/clientRPC.cpp index 12b06ef..ff6d54d 100644 --- a/src/client/clientRPC.cpp +++ b/src/client/clientRPC.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -16,13 +17,14 @@ #include "pv/pvAccess.h" namespace { +using pvac::detail::CallbackGuard; +using pvac::detail::CallbackUse; -struct RPCer : public pva::ChannelRPCRequester, +struct RPCer : public pvac::detail::CallbackStorage, + public pva::ChannelRPCRequester, public pvac::Operation::Impl, public pvac::detail::wrapped_shared_from_this { - mutable epicsMutex mutex; - bool started; operation_type::shared_pointer op; @@ -36,9 +38,14 @@ struct RPCer : public pva::ChannelRPCRequester, RPCer(pvac::ClientChannel::GetCallback* cb, const pvd::PVStructure::const_shared_pointer& args) :started(false), cb(cb), args(args) {REFTRACE_INCREMENT(num_instances);} - virtual ~RPCer() {REFTRACE_DECREMENT(num_instances);} + virtual ~RPCer() { + CallbackGuard G(*this); + cb = 0; + G.wait(); // paranoia + REFTRACE_DECREMENT(num_instances); + } - void callEvent(Guard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) + void callEvent(CallbackGuard& G, pvac::GetEvent::event_t evt = pvac::GetEvent::Fail) { pvac::ClientChannel::GetCallback *cb=this->cb; if(!cb) return; @@ -48,24 +55,11 @@ struct RPCer : public pva::ChannelRPCRequester, this->cb = 0; try { - UnGuard U(G); + CallbackUse U(G); cb->getDone(event); return; }catch(std::exception& e){ - if(!this->cb || evt==pvac::GetEvent::Fail) { - LOG(pva::logLevelError, "Unhandled exception in ClientChannel::GetCallback::getDone(): %s", e.what()); - } else { - event.event = pvac::GetEvent::Fail; - event.message = e.what(); - } - } - // continues error handling - try { - UnGuard U(G); - cb->getDone(event); - return; - }catch(std::exception& e){ - LOG(pva::logLevelError, "Unhandled exception following exception in ClientChannel::GetCallback::monitorEvent(): %s", e.what()); + LOG(pva::logLevelError, "Unhandled exception in ClientChannel::RPCCallback::requestDone(): %s", e.what()); } } @@ -79,7 +73,7 @@ struct RPCer : public pva::ChannelRPCRequester, virtual void cancel() { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(started && op) op->cancel(); callEvent(G, pvac::GetEvent::Cancel); } @@ -95,7 +89,7 @@ struct RPCer : public pva::ChannelRPCRequester, pva::ChannelRPC::shared_pointer const & operation) { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb || started) return; if(!status.isOK()) { @@ -115,7 +109,7 @@ struct RPCer : public pva::ChannelRPCRequester, virtual void channelDisconnect(bool destroy) OVERRIDE FINAL { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); event.message = "Disconnect"; callEvent(G); @@ -127,7 +121,7 @@ struct RPCer : public pva::ChannelRPCRequester, epics::pvData::PVStructure::shared_pointer const & pvResponse) { std::tr1::shared_ptr keepalive(internal_shared_from_this()); - Guard G(mutex); + CallbackGuard G(*this); if(!cb) return; if(!status.isOK()) { diff --git a/src/client/clientpvt.h b/src/client/clientpvt.h index 402c655..1beb9eb 100644 --- a/src/client/clientpvt.h +++ b/src/client/clientpvt.h @@ -1,6 +1,9 @@ #ifndef CLIENTPVT_H #define CLIENTPVT_H +#include +#include + #include #include @@ -70,6 +73,89 @@ public: } }; +/** Safe use of raw callback pointer while unlocked. + * clear pointer and then call CallbackGuard::wait() to ensure that concurrent + * callback have completed. + * + * Prototype usage + @code + * struct mycb : public CallbackStorage { + * void (*ptr)(); + * }; + * // make a callback + * void docb(mycb& cb) { + * CallbackGuard G(cb); // lock + * // decide whether to make CB + * if(P){ + * void (*P)() = ptr; // copy for use while unlocked + * CallbackUse U(G); // unlock + * (*P)(); + * // automatic re-lock + * } + * // automatic final unlock + * } + * void cancelop(mycb& cb) { + * CallbackGuard G(cb); + * ptr = 0; // prevent further callbacks from starting + * G.wait(); // wait for inprogress callbacks to complete + * } + @endcode + */ +struct CallbackStorage { + mutable epicsMutex mutex; + epicsEvent wakeup; + size_t nwaitcb; + epicsThreadId incb; + CallbackStorage() :nwaitcb(0u), incb(0) {} +}; + +// analogous to epicsGuard +struct CallbackGuard { + CallbackStorage& store; + epicsThreadId self; + explicit CallbackGuard(CallbackStorage& store) :store(store), self(0) { + store.mutex.lock(); + } + ~CallbackGuard() { + bool notify = store.nwaitcb!=0; + store.mutex.unlock(); + if(notify) + store.wakeup.signal(); + } + void ensureself() { + if(!self) + self = epicsThreadGetIdSelf(); + } + // unlock and block until no in-progress callbacks + void wait() { + if(!store.incb) return; + ensureself(); + store.nwaitcb++; + while(store.incb && store.incb!=self) { + store.mutex.unlock(); + store.wakeup.wait(); + store.mutex.lock(); + } + store.nwaitcb--; + } +}; + +// analogous to epicsGuardRelease +struct CallbackUse { + CallbackGuard& G; + explicit CallbackUse(CallbackGuard& G) :G(G) { + G.wait(); // serialize callbacks + G.ensureself(); + G.store.incb=G.self; + G.store.mutex.unlock(); + } + ~CallbackUse() { + G.store.mutex.lock(); + G.store.incb=0; + } +}; + + void registerRefTrack(); void registerRefTrackGet(); void registerRefTrackPut();