From d3ce104c336f8dd9f84734d3159ae86e699251fb Mon Sep 17 00:00:00 2001 From: mrkraimer Date: Sat, 23 Dec 2017 06:35:23 -0500 Subject: [PATCH] fix issue #77 --- src/ca/caChannel.cpp | 47 ++++++++++++++++++++++++++---------- src/ca/caChannel.h | 7 +++--- src/ca/caProvider.cpp | 54 +++++++++++++++++++++++++++++++++++------- src/ca/caProviderPvt.h | 9 +++++++ 4 files changed, 94 insertions(+), 23 deletions(-) diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index a1e3c90..1c1d0e9 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -374,7 +374,8 @@ CAChannel::CAChannel(std::string const & _channelName, channelRequester(_channelRequester), channelID(0), channelType(0), - elementCount(0) + elementCount(0), + channelCreated(false) { if(DEBUG_LEVEL>0) { cout<< "CAChannel::CAChannel " << channelName << endl; @@ -395,6 +396,9 @@ void CAChannel::activate(short priority) &channelID); if (result == ECA_NORMAL) { + channelCreated = true; + CAChannelProviderPtr provider(channelProvider.lock()); + if(provider) provider->addChannel(shared_from_this()); req->channelCreated(Status::Ok, shared_from_this()); } else { Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result))); @@ -402,6 +406,36 @@ void CAChannel::activate(short priority) } } +CAChannel::~CAChannel() +{ + if(DEBUG_LEVEL>0) { + cout << "CAChannel::~CAChannel() " << channelName << endl; + } + disconnectChannel(); +} + +void CAChannel::disconnectChannel() +{ + if(DEBUG_LEVEL>0) { + cout << "CAChannel::disconnectChannel() " + << channelName + << " channelCreated " << (channelCreated ? "true" : "false") + << endl; + } + bool disconnect = true; + { + Lock lock(requestsMutex); + if(!channelCreated) disconnect = false; + channelCreated = false; + } + if(!disconnect) return; + /* Clear CA Channel */ + threadAttach(); + ca_clear_channel(channelID); +} + + + void CAChannel::addChannelGet(const CAChannelGetPtr & get) { if(DEBUG_LEVEL>0) { @@ -448,17 +482,6 @@ void CAChannel::addChannelMonitor(const CAChannelMonitorPtr & monitor) monitorList.push_back(monitor); } -CAChannel::~CAChannel() -{ - if(DEBUG_LEVEL>0) { - cout << "CAChannel::~CAChannel() " << channelName << endl; - } - /* Clear CA Channel */ - threadAttach(); - ca_clear_channel(channelID); -} - - chid CAChannel::getChannelID() { return channelID; diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index cdb716b..72d201c 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -22,8 +22,7 @@ namespace epics { namespace pvAccess { namespace ca { -class CAChannel; -typedef std::tr1::shared_ptr CAChannelPtr; + class CAChannelGetField; typedef std::tr1::shared_ptr CAChannelGetFieldPtr; typedef std::tr1::weak_ptr CAChannelGetFieldWPtr; @@ -60,7 +59,7 @@ public: static size_t num_instances; - static shared_pointer create(CAChannelProvider::shared_pointer const & channelProvider, + static CAChannelPtr create(CAChannelProvider::shared_pointer const & channelProvider, std::string const & channelName, short priority, ChannelRequester::shared_pointer const & channelRequester); @@ -112,6 +111,7 @@ public: void addChannelGet(const CAChannelGetPtr & get); void addChannelPut(const CAChannelPutPtr & get); void addChannelMonitor(const CAChannelMonitorPtr & get); + void disconnectChannel(); private: @@ -129,6 +129,7 @@ private: chid channelID; chtype channelType; unsigned elementCount; + bool channelCreated; epics::pvData::Structure::const_shared_pointer structure; epics::pvData::Mutex requestsMutex; diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index 3da9c69..9267285 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -52,7 +52,28 @@ CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr&) CAChannelProvider::~CAChannelProvider() { - if(DEBUG_LEVEL>0) std::cout << "CAChannelProvider::~CAChannelProvider()\n"; + if(DEBUG_LEVEL>0) { + std::cout << "CAChannelProvider::~CAChannelProvider()" + << " caChannelList.size() " << caChannelList.size() + << std::endl; + } + std::queue channelQ; + { + Lock lock(channelListMutex); + for(size_t i=0; i< caChannelList.size(); ++i) { + CAChannelPtr caChannel(caChannelList[i].lock()); + if(caChannel) channelQ.push(caChannel); + } + } + while(!channelQ.empty()) { + if(DEBUG_LEVEL>0) { + std::cout << "disconnectAllChannels calling disconnectChannel " + << channelQ.front()->getChannelName() + << std::endl; + } + channelQ.front()->disconnectChannel(); + channelQ.pop(); + } } std::string CAChannelProvider::getProviderName() @@ -112,6 +133,23 @@ Channel::shared_pointer CAChannelProvider::createChannel( return CAChannel::create(shared_from_this(), channelName, priority, channelRequester); } +void CAChannelProvider::addChannel(const CAChannelPtr & channel) +{ + if(DEBUG_LEVEL>0) { + std::cout << "CAChannelProvider::addChannel " + << channel->getChannelName() + << std::endl; + } + Lock lock(channelListMutex); + for(size_t i=0; i< caChannelList.size(); ++i) { + if(!(caChannelList[i].lock())) { + caChannelList[i] = channel; + return; + } + } + caChannelList.push_back(channel); +} + void CAChannelProvider::configure(epics::pvData::PVStructure::shared_pointer /*configuration*/) { } @@ -136,13 +174,11 @@ void CAChannelProvider::initialize() /* Create Channel Access */ int result = ca_context_create(ca_enable_preemptive_callback); if (result != ECA_NORMAL) { - throw std::runtime_error(std::string("CA error %s occurred while trying " - "to start channel access:") + ca_message(result)); + throw std::runtime_error( + std::string("CA error %s occurred while trying to start channel access:") + + ca_message(result)); } - current_context = ca_current_context(); - - // TODO create a ca_poll thread, if ca_disable_preemptive_callback } @@ -154,7 +190,9 @@ void ca_factory_cleanup(void*) ChannelProviderRegistry::clients()->remove("ca"); ca_context_destroy(); } catch(std::exception& e) { - LOG(logLevelWarn, "Error when unregister \"ca\" factory"); + std::string message("Error when unregister \"ca\" factory"); + message += e.what(); + LOG(logLevelWarn,message.c_str()); } } @@ -173,7 +211,7 @@ void CAClientFactory::start() registerRefCounter("CAChannelPut", &CAChannelPut::num_instances); registerRefCounter("CAChannelMonitor", &CAChannelMonitor::num_instances); - if(ChannelProviderRegistry::clients()->add("ca", false)) + if(ChannelProviderRegistry::clients()->add("ca", true)) { epicsAtExit(&ca_factory_cleanup, NULL); } diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index 3e4b3a5..6d01361 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -19,6 +19,10 @@ namespace ca { #define DEBUG_LEVEL 0 +class CAChannel; +typedef std::tr1::shared_ptr CAChannelPtr; +typedef std::tr1::weak_ptr CAChannelWPtr; + class CAChannelProvider; typedef std::tr1::shared_ptr CAChannelProviderPtr; typedef std::tr1::weak_ptr CAChannelProviderWPtr; @@ -64,13 +68,18 @@ public: virtual void destroy() EPICS_DEPRECATED {}; + void addChannel(const CAChannelPtr & get); + /* ---------------------------------------------------------------- */ void threadAttach(); + private: void initialize(); ca_client_context* current_context; + epics::pvData::Mutex channelListMutex; + std::vector caChannelList; }; }