From d1cbb0abd5cdfad59a5150f90f8654c71fcb3483 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 22 Nov 2017 14:20:26 -0600 Subject: [PATCH] pvac: ensure that listeners are done in ClientChannel ensure no concurrent callback in progress when removing listeners. --- src/client/client.cpp | 56 ++++++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/src/client/client.cpp b/src/client/client.cpp index 598fa6e..2966e3c 100644 --- a/src/client/client.cpp +++ b/src/client/client.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -20,6 +21,7 @@ namespace pvd = epics::pvData; namespace pva = epics::pvAccess; typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; namespace pvac { @@ -35,10 +37,12 @@ struct ClientChannel::Impl : public pva::ChannelRequester, // assume few listeners per channel, store in vector typedef std::vector listeners_t; listeners_t listeners; + bool listeners_inprogress; + epicsEvent listeners_done; static size_t num_instances; - Impl() {REFTRACE_INCREMENT(num_instances);} + Impl() :listeners_inprogress(false) {REFTRACE_INCREMENT(num_instances);} virtual ~Impl() {REFTRACE_DECREMENT(num_instances);} // called automatically via wrapped_shared_from_this @@ -47,6 +51,10 @@ struct ClientChannel::Impl : public pva::ChannelRequester, // ClientChannel destroy implicitly removes all callbacks, // but doesn't destroy the Channel or cancel Operations Guard G(mutex); + while(listeners_inprogress) { + UnGuard U(G); + listeners_done.wait(); + } listeners.clear(); } @@ -60,25 +68,39 @@ struct ClientChannel::Impl : public pva::ChannelRequester, { Guard G(mutex); notify = listeners; // copy vector + listeners_inprogress = true; } - ConnectEvent evt; - evt.connected = connectionState==pva::Channel::CONNECTED; - for(listeners_t::const_iterator it=notify.begin(), end=notify.end(); it!=end; ++it) - { - try { - (*it)->connectEvent(evt); - }catch(std::exception& e){ - LOG(pva::logLevelError, "Unhandled exception in connection state listener: %s\n", e.what()); + try { + ConnectEvent evt; + evt.connected = connectionState==pva::Channel::CONNECTED; + for(listeners_t::const_iterator it=notify.begin(), end=notify.end(); it!=end; ++it) + { + try { + (*it)->connectEvent(evt); + }catch(std::exception& e){ + LOG(pva::logLevelError, "Unhandled exception in connection state listener: %s\n", e.what()); - Guard G(mutex); - for(listeners_t::iterator it2=listeners.begin(), end2=listeners.end(); it2!=end2; ++it2) { - if(*it==*it2) { - listeners.erase(it2); - break; + Guard G(mutex); + for(listeners_t::iterator it2=listeners.begin(), end2=listeners.end(); it2!=end2; ++it2) { + if(*it==*it2) { + listeners.erase(it2); + break; + } } } } + }catch(...){ + { + Guard G(mutex); + listeners_inprogress = false; + } + listeners_done.signal(); } + { + Guard G(mutex); + listeners_inprogress = false; + } + listeners_done.signal(); } }; @@ -159,6 +181,12 @@ void ClientChannel::removeConnectListener(ConnectCallback* cb) if(!impl) throw std::logic_error("Dead Channel"); Guard G(impl->mutex); + // ensure no in-progress callbacks + while(impl->listeners_inprogress) { + UnGuard U(G); + impl->listeners_done.wait(); + } + for(Impl::listeners_t::iterator it=impl->listeners.begin(), end=impl->listeners.end(); it!=end; ++it) { if(cb==*it) {