diff --git a/src/ca/Makefile b/src/ca/Makefile index df0ae03..7aa4247 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -11,7 +11,7 @@ LIB_SYS_LIBS_WIN32 += ws2_32 INC += pv/caProvider.h -pvAccessCA_SRCS += stopMonitorThread.cpp +pvAccessCA_SRCS += monitorEventThread.cpp pvAccessCA_SRCS += caProvider.cpp pvAccessCA_SRCS += caChannel.cpp pvAccessCA_SRCS += dbdToPv.cpp diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index 47b4fe3..a4294a3 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -11,7 +11,7 @@ #include #include #include -#include "stopMonitorThread.h" +#include "monitorEventThread.h" #define epicsExportSharedSymbols #include "caChannel.h" @@ -70,7 +70,9 @@ void CAChannel::connected() getQueue.pop(); } while(!monitorQueue.empty()) { - monitorQueue.front()->activate(); + CAChannelMonitorPtr monitor(monitorQueue.front()); + monitor->activate(); + addMonitor(monitor); monitorQueue.pop(); } ChannelRequester::shared_pointer req(channelRequester.lock()); @@ -161,6 +163,14 @@ void CAChannel::disconnectChannel() if(!channelCreated) return; channelCreated = false; } + std::vector::iterator it; + for(it = monitorlist.begin(); it!=monitorlist.end(); ++it) + { + CAChannelMonitorPtr mon = (*it).lock(); + if(!mon) continue; + mon->stop(); + } + monitorlist.resize(0); /* Clear CA Channel */ CAChannelProviderPtr provider(channelProvider.lock()); if(provider) { @@ -233,7 +243,7 @@ void CAChannel::getField(GetFieldRequester::shared_pointer const & requester, } -AccessRights CAChannel::getAccessRights(epics::pvData::PVField::shared_pointer const & /*pvField*/) +AccessRights CAChannel::getAccessRights(PVField::shared_pointer const & /*pvField*/) { if (ca_write_access(channelID)) return readWrite; @@ -246,7 +256,7 @@ AccessRights CAChannel::getAccessRights(epics::pvData::PVField::shared_pointer c ChannelGet::shared_pointer CAChannel::createChannelGet( ChannelGetRequester::shared_pointer const & channelGetRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) + PVStructure::shared_pointer const & pvRequest) { if(DEBUG_LEVEL>0) { cout << "CAChannel::createChannelGet " << channelName << endl; @@ -267,7 +277,7 @@ ChannelGet::shared_pointer CAChannel::createChannelGet( ChannelPut::shared_pointer CAChannel::createChannelPut( ChannelPutRequester::shared_pointer const & channelPutRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) + PVStructure::shared_pointer const & pvRequest) { if(DEBUG_LEVEL>0) { cout << "CAChannel::createChannelPut " << channelName << endl; @@ -288,7 +298,7 @@ ChannelPut::shared_pointer CAChannel::createChannelPut( Monitor::shared_pointer CAChannel::createMonitor( MonitorRequester::shared_pointer const & monitorRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) + PVStructure::shared_pointer const & pvRequest) { if(DEBUG_LEVEL>0) { cout << "CAChannel::createMonitor " << channelName << endl; @@ -303,9 +313,22 @@ Monitor::shared_pointer CAChannel::createMonitor( } } channelMonitor->activate(); + addMonitor(channelMonitor); return channelMonitor; } +void CAChannel::addMonitor(CAChannelMonitorPtr const & monitor) +{ + std::vector::iterator it; + for(it = monitorlist.begin(); it!=monitorlist.end(); ++it) + { + CAChannelMonitorWPtr mon = *it; + if(mon.lock()) continue; + mon = monitor; + return; + } + monitorlist.push_back(monitor); +} void CAChannel::printInfo(std::ostream& out) { @@ -348,7 +371,7 @@ void CAChannelGetField::callRequester(CAChannelPtr const & caChannel) PVStructurePtr pvRequest(createRequest("")); DbdToPvPtr dbdToPv = DbdToPv::create(caChannel,pvRequest,getIO); PVStructurePtr pvStructure = dbdToPv->createPVStructure(); - epics::pvData::Structure::const_shared_pointer structure(pvStructure->getStructure()); + Structure::const_shared_pointer structure(pvStructure->getStructure()); Field::const_shared_pointer field = subField.empty() ? std::tr1::static_pointer_cast(structure) : @@ -385,7 +408,7 @@ size_t CAChannelGet::num_instances; CAChannelGetPtr CAChannelGet::create( CAChannel::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) + PVStructure::shared_pointer const & pvRequest) { if(DEBUG_LEVEL>0) { cout << "CAChannelGet::create " << channel->getChannelName() << endl; @@ -395,7 +418,7 @@ CAChannelGetPtr CAChannelGet::create( CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) + PVStructure::shared_pointer const & pvRequest) : channel(channel), channelGetRequester(channelGetRequester), @@ -490,7 +513,7 @@ size_t CAChannelPut::num_instances; CAChannelPutPtr CAChannelPut::create( CAChannel::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) + PVStructure::shared_pointer const & pvRequest) { if(DEBUG_LEVEL>0) { cout << "CAChannelPut::create " << channel->getChannelName() << endl; @@ -500,7 +523,7 @@ CAChannelPutPtr CAChannelPut::create( CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) + PVStructure::shared_pointer const & pvRequest) : channel(channel), channelPutRequester(channelPutRequester), @@ -698,7 +721,7 @@ size_t CAChannelMonitor::num_instances; CAChannelMonitorPtr CAChannelMonitor::create( CAChannel::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) + PVStructure::shared_pointer const & pvRequest) { if(DEBUG_LEVEL>0) { cout << "CAChannelMonitor::create " << channel->getChannelName() << endl; @@ -715,7 +738,8 @@ CAChannelMonitor::CAChannelMonitor( monitorRequester(monitorRequester), pvRequest(pvRequest), isStarted(false), - stopMonitorThread(StopMonitorThread::get()) + monitorEventThread(MonitorEventThread::get()), + pevid(NULL) {} CAChannelMonitor::~CAChannelMonitor() @@ -726,9 +750,7 @@ CAChannelMonitor::~CAChannelMonitor() << " isStarted " << (isStarted ? "true" : "false") << endl; } - if(isStarted) stop(); - stopMonitorThread->addNoEventsCallback(&waitForNoEvents); - waitForNoEvents.wait(); + stop(); } void CAChannelMonitor::activate() @@ -753,6 +775,9 @@ void CAChannelMonitor::activate() if (size > 1) queueSize = size; } } + notifyRequester = NotifyRequesterPtr(new NotifyRequester()); + + notifyRequester->setChannelMonitor(shared_from_this()); monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize)); EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); @@ -766,7 +791,10 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) std::cout << "CAChannelMonitor::subscriptionEvent " << channel->getChannelName() << endl; } - if(!isStarted) return; + { + Lock lock(mutex); + if(!isStarted) return; + } MonitorRequester::shared_pointer requester(monitorRequester.lock()); if(!requester) return; Status status = dbdToPv->getFromDBD(pvStructure,activeElement->changedBitSet,args); @@ -778,9 +806,7 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) } else { *(activeElement->overrunBitSet) |= *(activeElement->changedBitSet); } - - // call monitorRequester even if queue is full - requester->monitorEvent(shared_from_this()); + monitorEventThread->event(notifyRequester); } else { @@ -791,24 +817,27 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) } } -epics::pvData::Status CAChannelMonitor::start() +Status CAChannelMonitor::start() { if(DEBUG_LEVEL>0) { std::cout << "CAChannelMonitor::start " << channel->getChannelName() << endl; } Status status = Status::Ok; - if(isStarted) { - status = Status(Status::STATUSTYPE_WARNING,"already started"); - return status; + { + Lock lock(mutex); + if(isStarted) { + status = Status(Status::STATUSTYPE_WARNING,"already started"); + return status; + } + isStarted = true; + monitorQueue->start(); } channel->attachContext(); - monitorQueue->start(); - isStarted = true; int result = ca_create_subscription(dbdToPv->getRequestType(), 0, channel->getChannelID(), DBE_VALUE, ca_subscription_handler, this, - &eventID); + &pevid); if (result == ECA_NORMAL) { result = ca_flush_io(); @@ -819,7 +848,7 @@ epics::pvData::Status CAChannelMonitor::start() return Status(Status::STATUSTYPE_ERROR,message); } -epics::pvData::Status CAChannelMonitor::stop() +Status CAChannelMonitor::stop() { if(DEBUG_LEVEL>0) { std::cout << "CAChannelMonitor::stop " @@ -827,12 +856,26 @@ epics::pvData::Status CAChannelMonitor::stop() << " isStarted " << (isStarted ? "true" : "false") << endl; } - if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped"); - isStarted = false; + { + Lock lock(mutex); + if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped"); + isStarted = false; + } monitorQueue->stop(); - stopMonitorThread->callStop(eventID); - eventID = NULL; - return Status::Ok; + int result = ca_clear_subscription(pevid); + if(result==ECA_NORMAL) return Status::Ok; + return Status(Status::STATUSTYPE_ERROR,string(ca_message(result))); +} + +void CAChannelMonitor::notifyClient() +{ + { + Lock lock(mutex); + if(!isStarted) return; + } + MonitorRequester::shared_pointer requester(monitorRequester.lock()); + if(!requester) return; + requester->monitorEvent(shared_from_this()); } @@ -841,6 +884,10 @@ MonitorElementPtr CAChannelMonitor::poll() if(DEBUG_LEVEL>1) { std::cout << "CAChannelMonitor::poll " << channel->getChannelName() << endl; } + { + Lock lock(mutex); + if(!isStarted) return MonitorElementPtr(); + } return monitorQueue->poll(); } @@ -853,7 +900,7 @@ void CAChannelMonitor::release(MonitorElementPtr const & monitorElement) monitorQueue->release(monitorElement); } -/* --------------- epics::pvData::ChannelRequest --------------- */ +/* --------------- ChannelRequest --------------- */ void CAChannelMonitor::cancel() { diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index 24b0dfd..ae641c0 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -23,8 +23,12 @@ namespace epics { namespace pvAccess { namespace ca { -class StopMonitorThread; -typedef std::tr1::shared_ptr StopMonitorThreadPtr; +class NotifyRequester; +typedef std::tr1::shared_ptr NotifyRequesterPtr; + +class MonitorEventThread; +typedef std::tr1::shared_ptr MonitorEventThreadPtr; + class CAChannelGetField; typedef std::tr1::shared_ptr CAChannelGetFieldPtr; @@ -96,6 +100,7 @@ private: CAChannelProvider::shared_pointer const & channelProvider, ChannelRequester::shared_pointer const & channelRequester); void activate(short priority); + void addMonitor(CAChannelMonitorPtr const & monitor); std::string channelName; CAChannelProviderWPtr channelProvider; @@ -108,6 +113,7 @@ private: std::queue putQueue; std::queue getQueue; std::queue monitorQueue; + std::vector monitorlist; }; @@ -207,6 +213,7 @@ public: virtual void cancel(); virtual std::string getRequesterName(); void activate(); + void notifyClient(); private: virtual void destroy() {} CAChannelMonitor(CAChannel::shared_pointer const & _channel, @@ -216,13 +223,15 @@ private: MonitorRequester::weak_pointer monitorRequester; const epics::pvData::PVStructure::shared_pointer pvRequest; bool isStarted; - StopMonitorThreadPtr stopMonitorThread; + NotifyRequesterPtr notifyRequester; + MonitorEventThreadPtr monitorEventThread; + evid pevid; DbdToPvPtr dbdToPv; - epics::pvData::Event waitForNoEvents; + epics::pvData::Mutex mutex; epics::pvData::PVStructure::shared_pointer pvStructure; epics::pvData::MonitorElementPtr activeElement; - evid eventID; + CACMonitorQueuePtr monitorQueue; }; diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index db09e38..27386e2 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -15,7 +15,7 @@ #include #include -#include "stopMonitorThread.h" +#include "monitorEventThread.h" #define epicsExportSharedSymbols #include @@ -43,7 +43,7 @@ CAChannelProvider::CAChannelProvider() CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr&) : current_context(0), - stopMonitorThread(StopMonitorThread::get()) + monitorEventThread(MonitorEventThread::get()) { if(DEBUG_LEVEL>0) { std::cout<< "CAChannelProvider::CAChannelProvider\n"; @@ -77,11 +77,12 @@ CAChannelProvider::~CAChannelProvider() channelQ.front()->disconnectChannel(); channelQ.pop(); } - stopMonitorThread->stop(); + monitorEventThread->stop(); if(DEBUG_LEVEL>0) { std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n"; } ca_context_destroy(); +std::cout << "CAChannelProvider::~CAChannelProvider() returning\n"; } std::string CAChannelProvider::getProviderName() @@ -187,7 +188,6 @@ void CAChannelProvider::attachContext() void CAChannelProvider::initialize() { if(DEBUG_LEVEL>0) std::cout << "CAChannelProvider::initialize()\n"; - StopMonitorThreadPtr thread(StopMonitorThread::get()); int result = ca_context_create(ca_enable_preemptive_callback); if (result != ECA_NORMAL) { std::string mess("CAChannelProvider::initialize error calling ca_context_create "); @@ -195,7 +195,11 @@ void CAChannelProvider::initialize() throw std::runtime_error(mess); } current_context = ca_current_context(); - thread->attachContext(current_context); +} + +ca_client_context * CAChannelProvider::get_ca_client_context() +{ + return current_context; } void CAClientFactory::start() @@ -218,13 +222,21 @@ void CAClientFactory::start() } } +ca_client_context * CAClientFactory::get_ca_client_context() +{ + if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::get_ca_client_context\n"; + ChannelProvider::shared_pointer channelProvider( + ChannelProviderRegistry::clients()->getProvider("ca")); + if(!channelProvider) throw std::runtime_error("CAClientFactory::start() was not called"); + CAChannelProviderPtr cacChannelProvider + = std::tr1::static_pointer_cast(channelProvider); + return cacChannelProvider->get_ca_client_context(); +} + void CAClientFactory::stop() { // unregister now done with exit hook } - -} -} -} +}}} diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index c04361e..af0c4d9 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -19,8 +19,9 @@ namespace ca { #define DEBUG_LEVEL 0 -class StopMonitorThread; -typedef std::tr1::shared_ptr StopMonitorThreadPtr; +class MonitorEventThread; +typedef std::tr1::shared_ptr MonitorEventThreadPtr; + class CAChannel; typedef std::tr1::shared_ptr CAChannelPtr; @@ -71,6 +72,7 @@ public: void attachContext(); void addChannel(const CAChannelPtr & channel); + ca_client_context* get_ca_client_context(); private: virtual void destroy() EPICS_DEPRECATED {} @@ -78,7 +80,7 @@ private: ca_client_context* current_context; epics::pvData::Mutex channelListMutex; std::vector caChannelList; - StopMonitorThreadPtr stopMonitorThread; + MonitorEventThreadPtr monitorEventThread; }; }}} diff --git a/src/ca/dbdToPv.cpp b/src/ca/dbdToPv.cpp index fd9c361..f8fe8ca 100644 --- a/src/ca/dbdToPv.cpp +++ b/src/ca/dbdToPv.cpp @@ -77,7 +77,10 @@ DbdToPv::DbdToPv(IOType ioType) caValueType(-1), caRequestType(-1), maxElements(0) -{} +{ + caTimeStamp.secPastEpoch = 0; + caTimeStamp.nsec = 0; +} static ScalarType dbr2ST[] = { diff --git a/src/ca/monitorEventThread.cpp b/src/ca/monitorEventThread.cpp new file mode 100644 index 0000000..e95d101 --- /dev/null +++ b/src/ca/monitorEventThread.cpp @@ -0,0 +1,113 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.06 + */ + +#include "caChannel.h" +#include +#define epicsExportSharedSymbols +#include "monitorEventThread.h" + +using namespace epics::pvData; +using namespace std; + +namespace epics { +namespace pvAccess { +namespace ca { + +MonitorEventThreadPtr MonitorEventThread::get() +{ + static MonitorEventThreadPtr master; + static Mutex mutex; + Lock xx(mutex); + if(!master) { + master = MonitorEventThreadPtr(new MonitorEventThread()); + master->start(); + } + return master; +} + +MonitorEventThread::MonitorEventThread() +: isStop(false) +{ +} + +MonitorEventThread::~MonitorEventThread() +{ +std::cout << "MonitorEventThread::~MonitorEventThread()\n"; +} + +void MonitorEventThread::start() +{ + thread = std::tr1::shared_ptr(new epicsThread( + *this, + "monitorEventThread", + epicsThreadGetStackSize(epicsThreadStackSmall), + epicsThreadPriorityLow)); + thread->start(); +} + +void MonitorEventThread::stop() +{ + { + Lock xx(mutex); + isStop = true; + } + waitForCommand.signal(); + waitForStop.wait(); +} + + +void MonitorEventThread::event(NotifyRequesterPtr const &stopMonitor) +{ + { + Lock lock(mutex); + if(stopMonitor->isOnQueue) return; + stopMonitor->isOnQueue = true; + notifyMonitorQueue.push(stopMonitor); + } + waitForCommand.signal(); +} + +void MonitorEventThread::run() +{ + while(true) + { + waitForCommand.wait(); + while(true) { + bool more = false; + NotifyRequester* notifyRequester(NULL); + { + Lock lock(mutex); + if(!notifyMonitorQueue.empty()) + { + more = true; + NotifyRequesterWPtr req(notifyMonitorQueue.front()); + notifyMonitorQueue.pop(); + NotifyRequesterPtr reqPtr(req.lock()); + if(reqPtr) { + notifyRequester = reqPtr.get(); + reqPtr->isOnQueue = false; + } + } + } + if(!more) break; + if(notifyRequester!=NULL) + { + CAChannelMonitorPtr channelMonitor(notifyRequester->channelMonitor.lock()); + if(channelMonitor) channelMonitor->notifyClient(); + } + } + if(isStop) { + waitForStop.signal(); + break; + } + } +} + +}}} diff --git a/src/ca/monitorEventThread.h b/src/ca/monitorEventThread.h new file mode 100644 index 0000000..0784d07 --- /dev/null +++ b/src/ca/monitorEventThread.h @@ -0,0 +1,71 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.06 + */ +#ifndef MonitorEventThread_H +#define MonitorEventThread_H +#include +#include +#include +#include +#include +#include + +namespace epics { +namespace pvAccess { +namespace ca { + +class NotifyRequester; +typedef std::tr1::shared_ptr NotifyRequesterPtr; +typedef std::tr1::weak_ptr NotifyRequesterWPtr; + + +class MonitorEventThread; +typedef std::tr1::shared_ptr MonitorEventThreadPtr; + +class CAChannelMonitor; +typedef std::tr1::shared_ptr CAChannelMonitorPtr; +typedef std::tr1::weak_ptr CAChannelMonitorWPtr; + +class NotifyRequester +{ +public: + MonitorRequester::weak_pointer monitorRequester; + CAChannelMonitorWPtr channelMonitor; + bool isOnQueue; + NotifyRequester() : isOnQueue(false) {} + void setChannelMonitor(CAChannelMonitorPtr const &channelMonitor) + { this->channelMonitor = channelMonitor;} +}; + + +class MonitorEventThread : + public epicsThreadRunable +{ +public: + static MonitorEventThreadPtr get(); + ~MonitorEventThread(); + virtual void run(); + void start(); + void stop(); + void event(NotifyRequesterPtr const ¬ifyRequester); +private: + MonitorEventThread(); + + bool isStop; + std::tr1::shared_ptr thread; + epics::pvData::Mutex mutex; + epics::pvData::Event waitForCommand; + epics::pvData::Event waitForStop; + std::queue notifyMonitorQueue; +}; + + +}}} + +#endif /* MonitorEventThread_H */ diff --git a/src/ca/pv/caProvider.h b/src/ca/pv/caProvider.h index 3f75c4a..a147d26 100644 --- a/src/ca/pv/caProvider.h +++ b/src/ca/pv/caProvider.h @@ -8,6 +8,7 @@ #define CAPROVIDER_H #include +#include #include namespace epics { @@ -17,23 +18,35 @@ namespace ca { /** * @brief CAClientFactory is a channel provider for the ca network provider. * + * A single instance is created the first time CAClientFactory::start is called. + * epicsAtExit is used to destroy the instance. + * + * The thread that calls start, or a ca auxillary thread, are the only threads + * that can call the ca_* functions. + * + * Note the monitor callbacks are made from a separate thread that must NOT call any ca_* function. * */ class epicsShareClass CAClientFactory { public: - /** @brief start the provider + /** @brief start provider ca * */ static void start(); - /** @brief stop the provider + /** @brief get the ca_client_context * + * This can be called by an application specific auxiliary thread. + * See ca documentation. Not for casual use. + */ + static ca_client_context * get_ca_client_context(); + /** @brief stop provider ca + * + * This does nothing since epicsAtExit is used to destroy the instance. */ static void stop(); }; -} -} -} +}}} #endif /* CAPROVIDER_H */ diff --git a/src/ca/stopMonitorThread.cpp b/src/ca/stopMonitorThread.cpp deleted file mode 100644 index 794bcd6..0000000 --- a/src/ca/stopMonitorThread.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Copyright - See the COPYRIGHT that is included with this distribution. - * pvAccessCPP is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. - */ -/** - * @author mrk - * @date 2018.04 - */ - -#include "caChannel.h" -#include -#define epicsExportSharedSymbols -#include "stopMonitorThread.h" - -using namespace epics::pvData; - -namespace epics { -namespace pvAccess { -namespace ca { - -StopMonitorThreadPtr StopMonitorThread::get() -{ - static StopMonitorThreadPtr master; - static Mutex mutex; - Lock xx(mutex); - if(!master) { - master = StopMonitorThreadPtr(new StopMonitorThread()); - master->start(); - } - return master; -} - -StopMonitorThread::StopMonitorThread() -: isStop(false), - isAttachContext(false), - current_context(NULL) -{ -} - -StopMonitorThread::~StopMonitorThread() -{ -std::cout << "StopMonitorThread::~StopMonitorThread()\n"; -} - -void StopMonitorThread::attachContext(ca_client_context* current_context) -{ - Lock xx(mutex); - isAttachContext = true; - this->current_context = current_context; - waitForCommand.signal(); -} - -void StopMonitorThread::start() -{ - thread = std::tr1::shared_ptr(new epicsThread( - *this, - "stopMonitorThread", - epicsThreadGetStackSize(epicsThreadStackSmall), - epicsThreadPriorityLow)); - thread->start(); -} - -void StopMonitorThread::stop() -{ - { - Lock xx(mutex); - isStop = true; - } - waitForCommand.signal(); - waitForStop.wait(); -} - -void StopMonitorThread::callStop(evid pevid) -{ - Lock xx(mutex); - evidQueue.push(&(*pevid)); - waitForCommand.signal(); -} - -void StopMonitorThread::addNoEventsCallback(Event * event) -{ - Lock xx(mutex); - noEventsCallbackQueue.push(event); - waitForCommand.signal(); -} - - -void StopMonitorThread::run() -{ - while(true) - { - waitForCommand.wait(); - Lock lock(mutex); - if(isAttachContext) - { - int result = ca_attach_context(current_context); - if(result != ECA_NORMAL) { - std::string mess("StopMonitorThread::run() while calling ca_attach_context "); - mess += ca_message(result); - throw std::runtime_error(mess); - } - isAttachContext = false; - } - if(evidQueue.size()>0) - { - while(!evidQueue.empty()) - { - evid pvid = evidQueue.front(); - evidQueue.pop(); - int result = ca_clear_subscription(pvid); - if(result!=ECA_NORMAL) - { - std::cout << "StopMonitorThread::run() ca_clear_subscription error " - << ca_message(result) << "\n"; - } - } - } - if(noEventsCallbackQueue.size()>0) - { - while(!noEventsCallbackQueue.empty()) - { - Event * event = noEventsCallbackQueue.front(); - noEventsCallbackQueue.pop(); - event->signal(); - } - } - if(isStop) { - waitForStop.signal(); - break; - } - } -} - -}}} diff --git a/src/ca/stopMonitorThread.h b/src/ca/stopMonitorThread.h deleted file mode 100644 index d2e7bd2..0000000 --- a/src/ca/stopMonitorThread.h +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Copyright - See the COPYRIGHT that is included with this distribution. - * pvAccessCPP is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. - */ -/** - * @author mrk - * @date 2018.04 - */ -#ifndef StopMonitorThread_H -#define StopMonitorThread_H -#include -#include -#include -#include -#include -#include - -namespace epics { -namespace pvAccess { -namespace ca { - - -class StopMonitorThread; -typedef std::tr1::shared_ptr StopMonitorThreadPtr; - - -class StopMonitorThread : - public epicsThreadRunable -{ -public: - ~StopMonitorThread(); - virtual void run(); - void start(); - void stop(); - static StopMonitorThreadPtr get(); - void callStop(evid pevid); - void attachContext(ca_client_context* current_context); - void addNoEventsCallback(epics::pvData::Event * event); -private: - StopMonitorThread(); - - std::tr1::shared_ptr thread; - epics::pvData::Mutex mutex; - epics::pvData::Event waitForCommand; - epics::pvData::Event waitForStop; - std::queue evidQueue; - std::queue noEventsCallbackQueue; - bool isStop; - bool isAttachContext; - ca_client_context* current_context; -}; - - -}}} - -#endif /* StopMonitorThread_H */