diff --git a/src/ca/Makefile b/src/ca/Makefile index 93e6b27..6bed7c3 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -11,6 +11,7 @@ LIB_SYS_LIBS_WIN32 += ws2_32 INC += pv/caProvider.h +pvAccessCA_SRCS += channelConnectThread.cpp pvAccessCA_SRCS += monitorEventThread.cpp pvAccessCA_SRCS += getDoneThread.cpp pvAccessCA_SRCS += putDoneThread.cpp diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index b19349b..40ba9fa 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -10,6 +10,7 @@ #include #include #include +#include "channelConnectThread.h" #include "monitorEventThread.h" #include "getDoneThread.h" #include "putDoneThread.h" @@ -62,6 +63,14 @@ void CAChannel::connected() if(DEBUG_LEVEL>0) { cout<< "CAChannel::connected " << channelName << endl; } + ChannelConnectThread->channelConnected(notifyChannelRequester); +} + +void CAChannel::notifyClient() +{ + if(DEBUG_LEVEL>0) { + cout<< "CAChannel::notifyClient " << channelName << endl; + } while(!putQueue.empty()) { putQueue.front()->activate(); putQueue.pop(); @@ -103,7 +112,8 @@ CAChannel::CAChannel(std::string const & channelName, channelProvider(channelProvider), channelRequester(channelRequester), channelID(0), - channelCreated(false) + channelCreated(false), + ChannelConnectThread(ChannelConnectThread::get()) { if(DEBUG_LEVEL>0) { cout<< "CAChannel::CAChannel " << channelName << endl; @@ -117,6 +127,8 @@ void CAChannel::activate(short priority) } ChannelRequester::shared_pointer req(channelRequester.lock()); if(!req) return; + notifyChannelRequester = NotifyChannelRequesterPtr(new NotifyChannelRequester()); + notifyChannelRequester->setChannel(shared_from_this()); attachContext(); int result = ca_create_channel(channelName.c_str(), ca_connection_handler, diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index 8228627..f1203bc 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -28,6 +28,17 @@ namespace epics { namespace pvAccess { namespace ca { +class CAChannel; +typedef std::tr1::shared_ptr CAChannelPtr; +typedef std::tr1::weak_ptr CAChannelWPtr; +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + +class NotifyChannelRequester; +typedef std::tr1::shared_ptr NotifyChannelRequesterPtr; +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + class NotifyMonitorRequester; typedef std::tr1::shared_ptr NotifyMonitorRequesterPtr; class MonitorEventThread; @@ -108,6 +119,7 @@ public: void attachContext(); void disconnectChannel(); + void notifyClient(); private: virtual void destroy() {} CAChannel(std::string const & channelName, @@ -121,6 +133,8 @@ private: ChannelRequester::weak_pointer channelRequester; chid channelID; bool channelCreated; + ChannelConnectThreadPtr ChannelConnectThread; + NotifyChannelRequesterPtr notifyChannelRequester; epics::pvData::Mutex requestsMutex; std::queue getFieldQueue; diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index 5c5b22c..f670add 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -11,6 +11,7 @@ #include #include +#include "channelConnectThread.h" #include "monitorEventThread.h" #include "getDoneThread.h" #include "putDoneThread.h" @@ -39,6 +40,7 @@ CAChannelProvider::CAChannelProvider() CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr&) : current_context(0), + channelConnectThread(ChannelConnectThread::get()), monitorEventThread(MonitorEventThread::get()), getDoneThread(GetDoneThread::get()), putDoneThread(PutDoneThread::get()) @@ -75,9 +77,10 @@ CAChannelProvider::~CAChannelProvider() channelQ.front()->disconnectChannel(); channelQ.pop(); } - monitorEventThread->stop(); - getDoneThread->stop(); putDoneThread->stop(); + getDoneThread->stop(); + monitorEventThread->stop(); + channelConnectThread->stop(); if(DEBUG_LEVEL>0) { std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n"; } diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index 0d48271..4a84c9d 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -24,6 +24,9 @@ namespace ca { #define DEBUG_LEVEL 0 +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + class MonitorEventThread; typedef std::tr1::shared_ptr MonitorEventThreadPtr; @@ -86,6 +89,7 @@ private: ca_client_context* current_context; epics::pvData::Mutex channelListMutex; std::vector caChannelList; + ChannelConnectThreadPtr channelConnectThread; MonitorEventThreadPtr monitorEventThread; GetDoneThreadPtr getDoneThread; PutDoneThreadPtr putDoneThread; diff --git a/src/ca/channelConnectThread.cpp b/src/ca/channelConnectThread.cpp new file mode 100644 index 0000000..c378019 --- /dev/null +++ b/src/ca/channelConnectThread.cpp @@ -0,0 +1,115 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ + +#include "caChannel.h" +#include +#define epicsExportSharedSymbols +#include "channelConnectThread.h" + +using namespace epics::pvData; +using namespace std; + +namespace epics { +namespace pvAccess { +namespace ca { + +ChannelConnectThreadPtr ChannelConnectThread::get() +{ + static ChannelConnectThreadPtr master; + static Mutex mutex; + Lock xx(mutex); + if(!master) { + master = ChannelConnectThreadPtr(new ChannelConnectThread()); + master->start(); + } + return master; +} + +ChannelConnectThread::ChannelConnectThread() +: isStop(false) +{ +} + +ChannelConnectThread::~ChannelConnectThread() +{ +//std::cout << "ChannelConnectThread::~ChannelConnectThread()\n"; +} + + +void ChannelConnectThread::start() +{ + thread = std::tr1::shared_ptr(new epicsThread( + *this, + "channelConnectThread", + epicsThreadGetStackSize(epicsThreadStackSmall), + epicsThreadPriorityLow)); + thread->start(); +} + + +void ChannelConnectThread::stop() +{ + { + Lock xx(mutex); + isStop = true; + } + waitForCommand.signal(); + waitForStop.wait(); +} + +void ChannelConnectThread::channelConnected( + NotifyChannelRequesterPtr const ¬ifyChannelRequester) +{ + { + Lock lock(mutex); + if(notifyChannelRequester->isOnQueue) return; + notifyChannelRequester->isOnQueue = true; + notifyChannelQueue.push(notifyChannelRequester); + } + waitForCommand.signal(); +} + +void ChannelConnectThread::run() +{ + while(true) + { + waitForCommand.wait(); + while(true) { + bool more = false; + NotifyChannelRequester* notifyChannelRequester(NULL); + { + Lock lock(mutex); + if(!notifyChannelQueue.empty()) + { + more = true; + NotifyChannelRequesterWPtr req(notifyChannelQueue.front()); + notifyChannelQueue.pop(); + NotifyChannelRequesterPtr reqPtr(req.lock()); + if(reqPtr) { + notifyChannelRequester = reqPtr.get(); + reqPtr->isOnQueue = false; + } + } + } + if(!more) break; + if(notifyChannelRequester!=NULL) + { + CAChannelPtr channel(notifyChannelRequester->channel.lock()); + if(channel) channel->notifyClient(); + } + } + if(isStop) { + waitForStop.signal(); + break; + } + } +} + +}}} diff --git a/src/ca/channelConnectThread.h b/src/ca/channelConnectThread.h new file mode 100644 index 0000000..7dcb568 --- /dev/null +++ b/src/ca/channelConnectThread.h @@ -0,0 +1,71 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ +#ifndef ChannelConnectThread_H +#define ChannelConnectThread_H +#include +#include +#include +#include +#include +#include + +namespace epics { +namespace pvAccess { +namespace ca { + +class NotifyChannelRequester; +typedef std::tr1::shared_ptr NotifyChannelRequesterPtr; +typedef std::tr1::weak_ptr NotifyChannelRequesterWPtr; + + +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + +class CAChannel; +typedef std::tr1::shared_ptr CAChannelPtr; +typedef std::tr1::weak_ptr CAChannelWPtr; + +class NotifyChannelRequester +{ +public: + ChannelRequester::weak_pointer channelRequester; + CAChannelWPtr channel; + bool isOnQueue; + NotifyChannelRequester() : isOnQueue(false) {} + void setChannel(CAChannelPtr const &channel) + { this->channel = channel;} +}; + + +class ChannelConnectThread : + public epicsThreadRunable +{ +public: + static ChannelConnectThreadPtr get(); + ~ChannelConnectThread(); + virtual void run(); + void start(); + void stop(); + void channelConnected(NotifyChannelRequesterPtr const ¬ifyChannelRequester); +private: + ChannelConnectThread(); + + bool isStop; + std::tr1::shared_ptr thread; + epics::pvData::Mutex mutex; + epics::pvData::Event waitForCommand; + epics::pvData::Event waitForStop; + std::queue notifyChannelQueue; +}; + + +}}} + +#endif /* ChannelConnectThread_H */