From fe8184cf95c39a4579aad32164aaae7baca40eb7 Mon Sep 17 00:00:00 2001 From: mrkraimer Date: Fri, 8 Jun 2018 15:22:08 -0400 Subject: [PATCH] added aux thread to call ca_clear_subscription --- src/ca/Makefile | 1 + src/ca/caChannel.cpp | 195 ++++++----------------------------- src/ca/caChannel.h | 44 ++------ src/ca/caProvider.cpp | 49 ++++----- src/ca/caProviderPvt.h | 17 ++- src/ca/dbdToPv.h | 5 + src/ca/stopMonitorThread.cpp | 140 +++++++++++++++++++++++++ src/ca/stopMonitorThread.h | 56 ++++++++++ 8 files changed, 275 insertions(+), 232 deletions(-) create mode 100644 src/ca/stopMonitorThread.cpp create mode 100644 src/ca/stopMonitorThread.h diff --git a/src/ca/Makefile b/src/ca/Makefile index 3a4cf10..df0ae03 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -11,6 +11,7 @@ LIB_SYS_LIBS_WIN32 += ws2_32 INC += pv/caProvider.h +pvAccessCA_SRCS += stopMonitorThread.cpp pvAccessCA_SRCS += caProvider.cpp pvAccessCA_SRCS += caChannel.cpp pvAccessCA_SRCS += dbdToPv.cpp diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index 6d7d187..7d2dc09 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -11,6 +11,7 @@ #include #include #include +#include "stopMonitorThread.h" #define epicsExportSharedSymbols #include "caChannel.h" @@ -84,18 +85,7 @@ void CAChannel::disconnected() if(DEBUG_LEVEL>0) { cout<< "CAChannel::disconnected " << channelName << endl; } - while(!putQueue.empty()) { - putQueue.front()->channelDisconnect(false); - putQueue.pop(); - } - while(!getQueue.empty()) { - getQueue.front()->channelDisconnect(false); - getQueue.pop(); - } - while(!monitorQueue.empty()) { - monitorQueue.front()->channelDisconnect(false); - monitorQueue.pop(); - } + ChannelRequester::shared_pointer req(channelRequester.lock()); if(req) { EXCEPTION_GUARD(req->channelStateChange( @@ -105,12 +95,12 @@ void CAChannel::disconnected() size_t CAChannel::num_instances; -CAChannel::CAChannel(std::string const & _channelName, - CAChannelProvider::shared_pointer const & _channelProvider, - ChannelRequester::shared_pointer const & _channelRequester) : - channelName(_channelName), - channelProvider(_channelProvider), - channelRequester(_channelRequester), +CAChannel::CAChannel(std::string const & channelName, + CAChannelProvider::shared_pointer const & channelProvider, + ChannelRequester::shared_pointer const & channelRequester) : + channelName(channelName), + channelProvider(channelProvider), + channelRequester(channelRequester), channelID(0), channelCreated(false) { @@ -149,6 +139,10 @@ CAChannel::~CAChannel() if(DEBUG_LEVEL>0) { cout << "CAChannel::~CAChannel() " << channelName << endl; } + { + Lock lock(requestsMutex); + if(!channelCreated) return; + } disconnectChannel(); } @@ -177,54 +171,6 @@ void CAChannel::disconnectChannel() cerr << mess << endl; } - - -void CAChannel::addChannelGet(const CAChannelGetPtr & get) -{ - if(DEBUG_LEVEL>0) { - cout<< "CAChannel::addChannelGet " << channelName << endl; - } - Lock lock(requestsMutex); - for(size_t i=0; i< getList.size(); ++i) { - if(!(getList[i].lock())) { - getList[i] = get; - return; - } - } - getList.push_back(get); -} - -void CAChannel::addChannelPut(const CAChannelPutPtr & put) -{ - if(DEBUG_LEVEL>0) { - cout<< "CAChannel::addChannelPut " << channelName << endl; - } - Lock lock(requestsMutex); - for(size_t i=0; i< putList.size(); ++i) { - if(!(putList[i].lock())) { - putList[i] = put; - return; - } - } - putList.push_back(put); -} - - -void CAChannel::addChannelMonitor(const CAChannelMonitorPtr & monitor) -{ - if(DEBUG_LEVEL>0) { - cout<< "CAChannel::addChannelMonitor " << channelName << endl; - } - Lock lock(requestsMutex); - for(size_t i=0; i< monitorList.size(); ++i) { - if(!(monitorList[i].lock())) { - monitorList[i] = monitor; - return; - } - } - monitorList.push_back(monitor); -} - chid CAChannel::getChannelID() { return channelID; @@ -426,11 +372,12 @@ void CAChannel::attachContext() std::tr1::static_pointer_cast(provider)->attachContext(); return; } - string mess("CAChannel::attachContext "); + string mess("CAChannel::attachContext provider does not exist "); mess += getChannelName(); throw std::runtime_error(mess); } + size_t CAChannelGet::num_instances; CAChannelGetPtr CAChannelGet::create( @@ -460,9 +407,6 @@ CAChannelGet::~CAChannelGet() } } -void CAChannelGet::channelCreated(const epics::pvData::Status& s,Channel::shared_pointer const & c) -{} - void CAChannelGet::activate() { ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); @@ -473,33 +417,12 @@ void CAChannelGet::activate() dbdToPv = DbdToPv::create(channel,pvRequest,getIO); pvStructure = dbdToPv->createPVStructure(); bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields())); - channel->addChannelGet(shared_from_this()); EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } -void CAChannelGet::channelStateChange( - Channel::shared_pointer const & channel, - Channel::ConnectionState connectionState) -{ - string mess("CAChannelGet::channelStateChange was called "); - mess += channel->getChannelName(); - throw std::runtime_error(mess); -} - std::string CAChannelGet::getRequesterName() { return "CAChannelGet";} -void CAChannelGet::channelDisconnect(bool destroy) -{ - if(DEBUG_LEVEL>0) { - std::cout << "CAChannelGet::channelDisconnect " << channel->getChannelName() << endl; - } - ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); - if(!getRequester) return; - EXCEPTION_GUARD(getRequester->channelDisconnect(destroy);) - if(!destroy) channel->addChannelGet(shared_from_this()); -} - namespace { static void ca_get_handler(struct event_handler_args args) @@ -590,8 +513,6 @@ CAChannelPut::~CAChannelPut() } } -void CAChannelPut::channelCreated(const Status& status,Channel::shared_pointer const & c) -{} void CAChannelPut::activate() { @@ -608,32 +529,12 @@ void CAChannelPut::activate() std::string val = pvString->get(); if(val.compare("true")==0) block = true; } - channel->addChannelPut(shared_from_this()); EXCEPTION_GUARD(putRequester->channelPutConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } -void CAChannelPut::channelStateChange( - Channel::shared_pointer const & channel, - Channel::ConnectionState connectionState) -{ - string mess("CAChannelPut::channelStateChange was called "); - mess += channel->getChannelName(); - throw std::runtime_error(mess); -} - std::string CAChannelPut::getRequesterName() { return "CAChannelPut";} -void CAChannelPut::channelDisconnect(bool destroy) -{ - if(DEBUG_LEVEL>0) { - cout << "CAChannelPut::channelDisconnect " << channel->getChannelName() << endl; - } - ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock()); - if(!putRequester) return; - EXCEPTION_GUARD(putRequester->channelDisconnect(destroy);) - if(!destroy) channel->addChannelPut(shared_from_this()); -} /* --------------- epics::pvAccess::ChannelPut --------------- */ @@ -781,6 +682,7 @@ public: void release(MonitorElementPtr const & monitorElement) { Lock guard(mutex); + if(!isStarted) return; if(monitorElementQueue.empty()) { string mess("CAChannelMonitor::release client error calling release "); throw std::runtime_error(mess); @@ -810,26 +712,22 @@ CAChannelMonitor::CAChannelMonitor( channel(channel), monitorRequester(monitorRequester), pvRequest(pvRequest), - isStarted(false) + isStarted(false), + stopMonitorThread(StopMonitorThread::get()) {} CAChannelMonitor::~CAChannelMonitor() { if(DEBUG_LEVEL>0) { - std::cout << "CAChannelMonitor::~CAChannelMonitor() " << channel->getChannelName() << endl; + std::cout << "CAChannelMonitor::~CAChannelMonitor() " + << channel->getChannelName() + << " isStarted " << (isStarted ? "true" : "false") + << endl; } - if(!isStarted) return; - channel->attachContext(); - int result = ca_clear_subscription(eventID); - if (result == ECA_NORMAL) return; - string mess("CAChannelMonitor::~CAChannelMonitor() "); - mess += ca_message(result); - cerr << mess << endl; + if(isStarted) stop(); + stopMonitorThread->waitForNoEvents(); } -void CAChannelMonitor::channelCreated(const Status& status,Channel::shared_pointer const & c) -{} - void CAChannelMonitor::activate() { MonitorRequester::shared_pointer requester(monitorRequester.lock()); @@ -853,44 +751,22 @@ void CAChannelMonitor::activate() } } monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize)); - channel->addChannelMonitor(shared_from_this()); EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } - -void CAChannelMonitor::channelStateChange( - Channel::shared_pointer const & channel, - Channel::ConnectionState connectionState) -{ - string mess("CAChannelMonitor::channelStateChange was called "); - mess += channel->getChannelName(); - throw std::runtime_error(mess); -} - std::string CAChannelMonitor::getRequesterName() { return "CAChannelMonitor";} -void CAChannelMonitor::channelDisconnect(bool destroy) -{ - if(DEBUG_LEVEL>0) { - std::cout << "CAChannelMonitor::channelDisconnect " << channel->getChannelName() << endl; - } - MonitorRequester::shared_pointer requester(monitorRequester.lock()); - if(!requester) return; - EXCEPTION_GUARD(requester->channelDisconnect(destroy);) - if(!destroy) channel->addChannelMonitor(shared_from_this()); -} - void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) { if(DEBUG_LEVEL>1) { std::cout << "CAChannelMonitor::subscriptionEvent " << channel->getChannelName() << endl; } + if(!isStarted) return; MonitorRequester::shared_pointer requester(monitorRequester.lock()); if(!requester) return; Status status = dbdToPv->getFromDBD(pvStructure,activeElement->changedBitSet,args); - if(status.isOK()) { if(monitorQueue->event(pvStructure,activeElement)) { @@ -943,22 +819,17 @@ epics::pvData::Status CAChannelMonitor::start() epics::pvData::Status CAChannelMonitor::stop() { if(DEBUG_LEVEL>0) { - std::cout << "CAChannelMonitor::stop " << channel->getChannelName() << endl; + std::cout << "CAChannelMonitor::stop " + << channel->getChannelName() + << " isStarted " << (isStarted ? "true" : "false") + << endl; } - Status status = Status::Ok; if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped"); - channel->attachContext(); - int result = ca_clear_subscription(eventID); - if (result == ECA_NORMAL) - { - isStarted = false; - monitorQueue->stop(); - result = ca_flush_io(); - } - if (result == ECA_NORMAL) return status; - string mess("CAChannelMonitor::stop() "); - mess += ca_message(result); - return Status(Status::STATUSTYPE_ERROR,mess); + isStarted = false; + monitorQueue->stop(); + stopMonitorThread->callStop(eventID); + eventID = NULL; + return Status::Ok; } diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index a79b65b..529d17d 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -11,6 +11,7 @@ #include #include +#include /* for CA */ #include @@ -22,6 +23,9 @@ namespace epics { namespace pvAccess { namespace ca { +class StopMonitorThread; +typedef std::tr1::shared_ptr StopMonitorThreadPtr; + class CAChannelGetField; typedef std::tr1::shared_ptr CAChannelGetFieldPtr; typedef std::tr1::weak_ptr CAChannelGetFieldWPtr; @@ -85,9 +89,6 @@ public: virtual void printInfo(std::ostream& out); void attachContext(); - void addChannelGet(const CAChannelGetPtr & get); - void addChannelPut(const CAChannelPutPtr & get); - void addChannelMonitor(const CAChannelMonitorPtr & get); void disconnectChannel(); private: virtual void destroy() {} @@ -101,22 +102,17 @@ private: ChannelRequester::weak_pointer channelRequester; chid channelID; bool channelCreated; - epics::pvData::Mutex requestsMutex; + epics::pvData::Mutex requestsMutex; std::queue getFieldQueue; std::queue putQueue; std::queue getQueue; std::queue monitorQueue; - std::vector getList; - std::vector putList; - std::vector monitorList; }; class CAChannelGet : public ChannelGet, - public ChannelRequester, - public ChannelBaseRequester, public std::tr1::enable_shared_from_this { public: @@ -131,15 +127,7 @@ public: virtual Channel::shared_pointer getChannel(); virtual void cancel(); virtual void lastRequest(); - - virtual void channelCreated( - const epics::pvData::Status& status, - Channel::shared_pointer const & channel); - virtual void channelStateChange( - Channel::shared_pointer const & channel, - Channel::ConnectionState cosnectionState); virtual std::string getRequesterName(); - virtual void channelDisconnect(bool destroy); void activate(); @@ -159,8 +147,6 @@ private: class CAChannelPut : public ChannelPut, - public ChannelRequester, - public ChannelBaseRequester, public std::tr1::enable_shared_from_this { @@ -181,15 +167,7 @@ public: virtual void cancel(); virtual void lastRequest(); - virtual void channelCreated( - const epics::pvData::Status& status, - Channel::shared_pointer const & channel); - virtual void channelStateChange( - Channel::shared_pointer const & channel, - Channel::ConnectionState connectionState); virtual std::string getRequesterName(); - virtual void channelDisconnect(bool destroy); - void activate(); private: virtual void destroy() {} @@ -210,8 +188,6 @@ typedef std::tr1::shared_ptr CACMonitorQueuePtr; class CAChannelMonitor : public Monitor, - public ChannelRequester, - public ChannelBaseRequester, public std::tr1::enable_shared_from_this { @@ -229,16 +205,7 @@ public: virtual MonitorElementPtr poll(); virtual void release(MonitorElementPtr const & monitorElement); virtual void cancel(); - - virtual void channelCreated( - const epics::pvData::Status& status, - Channel::shared_pointer const & channel); - virtual void channelStateChange( - Channel::shared_pointer const & channel, - Channel::ConnectionState connectionState); virtual std::string getRequesterName(); - - virtual void channelDisconnect(bool destroy); void activate(); private: virtual void destroy() {} @@ -249,6 +216,7 @@ private: MonitorRequester::weak_pointer monitorRequester; const epics::pvData::PVStructure::shared_pointer pvRequest; bool isStarted; + StopMonitorThreadPtr stopMonitorThread; DbdToPvPtr dbdToPv; epics::pvData::PVStructure::shared_pointer pvStructure; diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index b3b2ae1..db09e38 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -15,6 +15,8 @@ #include #include +#include "stopMonitorThread.h" + #define epicsExportSharedSymbols #include #include "caProviderPvt.h" @@ -40,13 +42,12 @@ CAChannelProvider::CAChannelProvider() } CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr&) - : current_context(0) + : current_context(0), + stopMonitorThread(StopMonitorThread::get()) { if(DEBUG_LEVEL>0) { std::cout<< "CAChannelProvider::CAChannelProvider\n"; } - // Ignoring Configuration as CA only allows config via. environment, - // and we don't want to change this here. initialize(); } @@ -60,23 +61,26 @@ CAChannelProvider::~CAChannelProvider() std::queue channelQ; { Lock lock(channelListMutex); - for(size_t i=0; i< caChannelList.size(); ++i) { + for(size_t i=0; i< caChannelList.size(); ++i) + { CAChannelPtr caChannel(caChannelList[i].lock()); if(caChannel) channelQ.push(caChannel); } caChannelList.clear(); } - attachContext(); while(!channelQ.empty()) { if(DEBUG_LEVEL>0) { - std::cout << "disconnectAllChannels calling disconnectChannel " + std::cout << "~CAChannelProvider() calling disconnectChannel " << channelQ.front()->getChannelName() << std::endl; } channelQ.front()->disconnectChannel(); channelQ.pop(); } - ca_flush_io(); + stopMonitorThread->stop(); + if(DEBUG_LEVEL>0) { + std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n"; + } ca_context_destroy(); } @@ -90,12 +94,12 @@ ChannelFind::shared_pointer CAChannelProvider::channelFind( ChannelFindRequester::shared_pointer const & channelFindRequester) { if (channelName.empty()) - throw std::invalid_argument("empty channel name"); + throw std::invalid_argument("CAChannelProvider::channelFind empty channel name"); if (!channelFindRequester) - throw std::invalid_argument("null requester"); + throw std::invalid_argument("CAChannelProvider::channelFind null requester"); - Status errorStatus(Status::STATUSTYPE_ERROR, "not implemented"); + Status errorStatus(Status::STATUSTYPE_ERROR, "CAChannelProvider::channelFind not implemented"); ChannelFind::shared_pointer nullChannelFind; EXCEPTION_GUARD(channelFindRequester->channelFindResult(errorStatus, nullChannelFind, false)); return nullChannelFind; @@ -105,9 +109,9 @@ ChannelFind::shared_pointer CAChannelProvider::channelList( ChannelListRequester::shared_pointer const & channelListRequester) { if (!channelListRequester.get()) - throw std::runtime_error("null requester"); + throw std::runtime_error("CAChannelProvider::channelList null requester"); - Status errorStatus(Status::STATUSTYPE_ERROR, "not implemented"); + Status errorStatus(Status::STATUSTYPE_ERROR, "CAChannelProvider::channelList not implemented"); ChannelFind::shared_pointer nullChannelFind; PVStringArray::const_svector none; EXCEPTION_GUARD(channelListRequester->channelListResult(errorStatus, nullChannelFind, none, false)); @@ -131,7 +135,7 @@ Channel::shared_pointer CAChannelProvider::createChannel( std::string const & address) { if (!address.empty()) - throw std::invalid_argument("CA does not support 'address' parameter"); + throw std::invalid_argument("CAChannelProvider::createChannel does not support 'address' parameter"); return CAChannel::create(shared_from_this(), channelName, priority, channelRequester); } @@ -165,40 +169,39 @@ void CAChannelProvider::poll() { } - void CAChannelProvider::attachContext() { ca_client_context* thread_context = ca_current_context(); if (thread_context == current_context) return; if (thread_context != NULL) { - throw std::runtime_error("CAChannelProvider: Foreign CA context in use"); + throw std::runtime_error("CAChannelProvider::attachContext Foreign CA context in use"); } int result = ca_attach_context(current_context); if (result != ECA_NORMAL) { - std::cout << - "CA error %s occurred while calling ca_attach_context:" - << ca_message(result) << std::endl; + std::string mess("CAChannelProvider::attachContext error calling ca_attach_context "); + mess += ca_message(result); + throw std::runtime_error(mess); } } void CAChannelProvider::initialize() { if(DEBUG_LEVEL>0) std::cout << "CAChannelProvider::initialize()\n"; - /* Create Channel Access */ + StopMonitorThreadPtr thread(StopMonitorThread::get()); int result = ca_context_create(ca_enable_preemptive_callback); if (result != ECA_NORMAL) { - throw std::runtime_error( - std::string("CA error %s occurred while trying to start channel access:") - + ca_message(result)); + std::string mess("CAChannelProvider::initialize error calling ca_context_create "); + mess += ca_message(result); + throw std::runtime_error(mess); } current_context = ca_current_context(); + thread->attachContext(current_context); } void CAClientFactory::start() { if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::start()\n"; if(ChannelProviderRegistry::clients()->getProvider("ca")) { - // do not start twice return; } epicsSignalInstallSigAlarmIgnore(); diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index 7f656b8..3ba1cab 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -17,7 +17,10 @@ namespace epics { namespace pvAccess { namespace ca { -#define DEBUG_LEVEL 0 +#define DEBUG_LEVEL 1 + +class StopMonitorThread; +typedef std::tr1::shared_ptr StopMonitorThreadPtr; class CAChannel; typedef std::tr1::shared_ptr CAChannelPtr; @@ -66,22 +69,18 @@ public: virtual void flush(); virtual void poll(); - void addChannel(const CAChannelPtr & channel); - - /* ---------------------------------------------------------------- */ - void attachContext(); - + void addChannel(const CAChannelPtr & channel); private: + virtual void destroy() EPICS_DEPRECATED {} void initialize(); ca_client_context* current_context; epics::pvData::Mutex channelListMutex; std::vector caChannelList; + StopMonitorThreadPtr stopMonitorThread; }; -} -} -} +}}} #endif /* CAPROVIDERPVT_H */ diff --git a/src/ca/dbdToPv.h b/src/ca/dbdToPv.h index fcb1d62..dea4a4b 100644 --- a/src/ca/dbdToPv.h +++ b/src/ca/dbdToPv.h @@ -35,12 +35,14 @@ typedef std::tr1::shared_ptr ValueAlarmDbdPtr; struct CaAlarm { + CaAlarm() : status(0), severity(0) {} dbr_short_t status; dbr_short_t severity; }; struct CaDisplay { + CaDisplay() : lower_disp_limit(0),upper_disp_limit(0) {} double lower_disp_limit; double upper_disp_limit; std::string units; @@ -49,12 +51,15 @@ struct CaDisplay struct CaControl { + CaControl() : upper_ctrl_limit(0),lower_ctrl_limit(0) {} double upper_ctrl_limit; double lower_ctrl_limit; }; struct CaValueAlarm { + CaValueAlarm() : upper_alarm_limit(0),upper_warning_limit(0),lower_warning_limit(0),lower_alarm_limit(0) + {} double upper_alarm_limit; double upper_warning_limit; double lower_warning_limit; diff --git a/src/ca/stopMonitorThread.cpp b/src/ca/stopMonitorThread.cpp new file mode 100644 index 0000000..ab1c356 --- /dev/null +++ b/src/ca/stopMonitorThread.cpp @@ -0,0 +1,140 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.04 + */ + +#include "caChannel.h" +#include +#define epicsExportSharedSymbols +#include "stopMonitorThread.h" + +using namespace epics::pvData; + +namespace epics { +namespace pvAccess { +namespace ca { + +StopMonitorThreadPtr StopMonitorThread::get() +{ + static StopMonitorThreadPtr master; + static Mutex mutex; + Lock xx(mutex); + if(!master) { + master = StopMonitorThreadPtr(new StopMonitorThread()); + master->start(); + } + return master; +} + +StopMonitorThread::StopMonitorThread() +: isStop(false), + isAttachContext(false), + isWaitForNoEvents(false), + current_context(NULL) +{ +} + +StopMonitorThread::~StopMonitorThread() +{ +std::cout << "StopMonitorThread::~StopMonitorThread()\n"; +} + +void StopMonitorThread::attachContext(ca_client_context* current_context) +{ + Lock xx(mutex); + isAttachContext = true; + this->current_context = current_context; + waitForCommand.signal(); +} + +void StopMonitorThread::start() +{ + thread = std::tr1::shared_ptr(new epicsThread( + *this, + "stopMonitorThread", + epicsThreadGetStackSize(epicsThreadStackSmall), + epicsThreadPriorityLow)); + thread->start(); +} + +void StopMonitorThread::stop() +{ + { + Lock xx(mutex); + isStop = true; + } + waitForCommand.signal(); + waitForStop.wait(); +} + +void StopMonitorThread::callStop(evid pevid) +{ + { + Lock xx(mutex); + evidQueue.push(&(*pevid)); + } + waitForCommand.signal(); +} + +void StopMonitorThread::waitForNoEvents() +{ + while(true) + { + { + Lock xx(mutex); + if(evidQueue.size()==0) return; + isWaitForNoEvents = true; + } + waitForCommand.signal(); + noMoreEvents.wait(); + } +} + +void StopMonitorThread::run() +{ + while(true) + { + waitForCommand.wait(); + Lock lock(mutex); + if(isAttachContext) + { + int result = ca_attach_context(current_context); + if(result != ECA_NORMAL) { + std::string mess("StopMonitorThread::run() while calling ca_attach_context "); + mess += ca_message(result); + throw std::runtime_error(mess); + } + isAttachContext = false; + } + if(evidQueue.size()>0) + { + while(!evidQueue.empty()) + { + evid pvid = evidQueue.front(); + evidQueue.pop(); + int result = ca_clear_subscription(pvid); + if(result!=ECA_NORMAL) + { + std::cout << "StopMonitorThread::run() ca_clear_subscription error " + << ca_message(result) << "\n"; + } + } + } + if(isWaitForNoEvents) + { + isWaitForNoEvents = false; + noMoreEvents.signal(); + } + if(isStop) { + waitForStop.signal(); + break; + } + } +} + +}}} diff --git a/src/ca/stopMonitorThread.h b/src/ca/stopMonitorThread.h new file mode 100644 index 0000000..4b88627 --- /dev/null +++ b/src/ca/stopMonitorThread.h @@ -0,0 +1,56 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.04 + */ +#ifndef StopMonitorThread_H +#define StopMonitorThread_H +#include +#include +#include +#include +#include +#include + +namespace epics { +namespace pvAccess { +namespace ca { + +class StopMonitorThread; +typedef std::tr1::shared_ptr StopMonitorThreadPtr; + +class StopMonitorThread : + public epicsThreadRunable +{ +public: + ~StopMonitorThread(); + virtual void run(); + void start(); + void stop(); + static StopMonitorThreadPtr get(); + void callStop(evid pevid); + void attachContext(ca_client_context* current_context); + void waitForNoEvents(); +private: + StopMonitorThread(); + + std::tr1::shared_ptr thread; + epics::pvData::Mutex mutex; + epics::pvData::Event waitForCommand; + epics::pvData::Event waitForStop; + epics::pvData::Event noMoreEvents; + std::queue evidQueue; + bool isStop; + bool isAttachContext; + bool isWaitForNoEvents; + ca_client_context* current_context; +}; + + +}}} + +#endif /* StopMonitorThread_H */