pvac: ensure that listeners are done in ClientChannel

ensure no concurrent callback in progress
when removing listeners.
This commit is contained in:
Michael Davidsaver
2017-11-22 14:20:26 -06:00
parent 6055dbb983
commit d1cbb0abd5

View File

@@ -5,6 +5,7 @@
#include <epicsMutex.h>
#include <epicsGuard.h>
#include <epicsEvent.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
@@ -20,6 +21,7 @@ namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;
typedef epicsGuard<epicsMutex> Guard;
typedef epicsGuardRelease<epicsMutex> UnGuard;
namespace pvac {
@@ -35,10 +37,12 @@ struct ClientChannel::Impl : public pva::ChannelRequester,
// assume few listeners per channel, store in vector
typedef std::vector<ClientChannel::ConnectCallback*> listeners_t;
listeners_t listeners;
bool listeners_inprogress;
epicsEvent listeners_done;
static size_t num_instances;
Impl() {REFTRACE_INCREMENT(num_instances);}
Impl() :listeners_inprogress(false) {REFTRACE_INCREMENT(num_instances);}
virtual ~Impl() {REFTRACE_DECREMENT(num_instances);}
// called automatically via wrapped_shared_from_this
@@ -47,6 +51,10 @@ struct ClientChannel::Impl : public pva::ChannelRequester,
// ClientChannel destroy implicitly removes all callbacks,
// but doesn't destroy the Channel or cancel Operations
Guard G(mutex);
while(listeners_inprogress) {
UnGuard U(G);
listeners_done.wait();
}
listeners.clear();
}
@@ -60,25 +68,39 @@ struct ClientChannel::Impl : public pva::ChannelRequester,
{
Guard G(mutex);
notify = listeners; // copy vector
listeners_inprogress = true;
}
ConnectEvent evt;
evt.connected = connectionState==pva::Channel::CONNECTED;
for(listeners_t::const_iterator it=notify.begin(), end=notify.end(); it!=end; ++it)
{
try {
(*it)->connectEvent(evt);
}catch(std::exception& e){
LOG(pva::logLevelError, "Unhandled exception in connection state listener: %s\n", e.what());
try {
ConnectEvent evt;
evt.connected = connectionState==pva::Channel::CONNECTED;
for(listeners_t::const_iterator it=notify.begin(), end=notify.end(); it!=end; ++it)
{
try {
(*it)->connectEvent(evt);
}catch(std::exception& e){
LOG(pva::logLevelError, "Unhandled exception in connection state listener: %s\n", e.what());
Guard G(mutex);
for(listeners_t::iterator it2=listeners.begin(), end2=listeners.end(); it2!=end2; ++it2) {
if(*it==*it2) {
listeners.erase(it2);
break;
Guard G(mutex);
for(listeners_t::iterator it2=listeners.begin(), end2=listeners.end(); it2!=end2; ++it2) {
if(*it==*it2) {
listeners.erase(it2);
break;
}
}
}
}
}catch(...){
{
Guard G(mutex);
listeners_inprogress = false;
}
listeners_done.signal();
}
{
Guard G(mutex);
listeners_inprogress = false;
}
listeners_done.signal();
}
};
@@ -159,6 +181,12 @@ void ClientChannel::removeConnectListener(ConnectCallback* cb)
if(!impl) throw std::logic_error("Dead Channel");
Guard G(impl->mutex);
// ensure no in-progress callbacks
while(impl->listeners_inprogress) {
UnGuard U(G);
impl->listeners_done.wait();
}
for(Impl::listeners_t::iterator it=impl->listeners.begin(), end=impl->listeners.end(); it!=end; ++it)
{
if(cb==*it) {