diff --git a/src/ca/Makefile b/src/ca/Makefile index 93e6b27..6bed7c3 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -11,6 +11,7 @@ LIB_SYS_LIBS_WIN32 += ws2_32 INC += pv/caProvider.h +pvAccessCA_SRCS += channelConnectThread.cpp pvAccessCA_SRCS += monitorEventThread.cpp pvAccessCA_SRCS += getDoneThread.cpp pvAccessCA_SRCS += putDoneThread.cpp diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index c1f2dbe..1e33195 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -10,6 +10,7 @@ #include #include #include +#include "channelConnectThread.h" #include "monitorEventThread.h" #include "getDoneThread.h" #include "putDoneThread.h" @@ -76,6 +77,25 @@ void CAChannel::connected() ChannelRequester::shared_pointer req(channelRequester.lock()); if(req) EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this())); } + ChannelRequester::shared_pointer req(channelRequester.lock()); + if(req) { + EXCEPTION_GUARD(req->channelStateChange( + shared_from_this(), Channel::CONNECTED)); + } +} + +void CAChannel::notifyClient() +{ + if(DEBUG_LEVEL>0) { + cout<< "CAChannel::notifyClient " << channelName << endl; + } + CAChannelProviderPtr provider(channelProvider.lock()); + if(!provider) return; + provider->addChannel(shared_from_this()); + while(!getFieldQueue.empty()) { + getFieldQueue.front()->activate(); + getFieldQueue.pop(); + } while(!getFieldQueue.empty()) { getFieldQueue.front()->activate(); getFieldQueue.pop(); @@ -95,10 +115,9 @@ void CAChannel::connected() monitorQueue.pop(); } ChannelRequester::shared_pointer req(channelRequester.lock()); - if(req) { - EXCEPTION_GUARD(req->channelStateChange( - shared_from_this(), Channel::CONNECTED)); - } + if(!req) return; + EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this())); + EXCEPTION_GUARD(req->channelStateChange(shared_from_this(), Channel::CONNECTED)); } void CAChannel::disconnected() @@ -121,7 +140,8 @@ CAChannel::CAChannel(std::string const & channelName, channelProvider(channelProvider), channelRequester(channelRequester), channelID(0), - channelCreated(false) + channelCreated(false), + channelConnectThread(ChannelConnectThread::get()) { if(DEBUG_LEVEL>0) { cout<< "CAChannel::CAChannel " << channelName << endl; @@ -130,11 +150,13 @@ CAChannel::CAChannel(std::string const & channelName, void CAChannel::activate(short priority) { + ChannelRequester::shared_pointer req(channelRequester.lock()); + if(!req) return; if(DEBUG_LEVEL>0) { cout<< "CAChannel::activate " << channelName << endl; } - ChannelRequester::shared_pointer req(channelRequester.lock()); - if(!req) return; + notifyChannelRequester = NotifyChannelRequesterPtr(new NotifyChannelRequester()); + notifyChannelRequester->setChannel(shared_from_this()); attachContext(); int result = ca_create_channel(channelName.c_str(), ca_connection_handler, @@ -497,6 +519,9 @@ void CAChannelGet::getDone(struct event_handler_args &args) void CAChannelGet::notifyClient() { + if(DEBUG_LEVEL>1) { + std::cout << "CAChannelGet::notifyClient " << channel->getChannelName() << endl; + } ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); if(!getRequester) return; EXCEPTION_GUARD(getRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet)); diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index d5835cd..d27652d 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -28,6 +28,15 @@ namespace epics { namespace pvAccess { namespace ca { +class CAChannel; +typedef std::tr1::shared_ptr CAChannelPtr; +typedef std::tr1::weak_ptr CAChannelWPtr; +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + +class NotifyChannelRequester; +typedef std::tr1::shared_ptr NotifyChannelRequesterPtr; + class NotifyMonitorRequester; typedef std::tr1::shared_ptr NotifyMonitorRequesterPtr; class MonitorEventThread; @@ -113,6 +122,7 @@ public: void attachContext(); void disconnectChannel(); + void notifyClient(); private: virtual void destroy() {} CAChannel(std::string const & channelName, @@ -126,6 +136,8 @@ private: ChannelRequester::weak_pointer channelRequester; chid channelID; bool channelCreated; + ChannelConnectThreadPtr channelConnectThread; + NotifyChannelRequesterPtr notifyChannelRequester; epics::pvData::Mutex requestsMutex; std::queue getFieldQueue; diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index 5c5b22c..09bcb8b 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -11,6 +11,7 @@ #include #include +#include "channelConnectThread.h" #include "monitorEventThread.h" #include "getDoneThread.h" #include "putDoneThread.h" @@ -39,6 +40,7 @@ CAChannelProvider::CAChannelProvider() CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr&) : current_context(0), + channelConnectThread(ChannelConnectThread::get()), monitorEventThread(MonitorEventThread::get()), getDoneThread(GetDoneThread::get()), putDoneThread(PutDoneThread::get()) @@ -75,9 +77,10 @@ CAChannelProvider::~CAChannelProvider() channelQ.front()->disconnectChannel(); channelQ.pop(); } - monitorEventThread->stop(); - getDoneThread->stop(); putDoneThread->stop(); + getDoneThread->stop(); + monitorEventThread->stop(); + channelConnectThread->stop(); if(DEBUG_LEVEL>0) { std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n"; } @@ -174,10 +177,8 @@ void CAChannelProvider::attachContext() { ca_client_context* thread_context = ca_current_context(); if (thread_context == current_context) return; - if (thread_context != NULL) { - throw std::runtime_error("CAChannelProvider::attachContext Foreign CA context in use"); - } int result = ca_attach_context(current_context); + if(result==ECA_ISATTACHED) return; if (result != ECA_NORMAL) { std::string mess("CAChannelProvider::attachContext error calling ca_attach_context "); mess += ca_message(result); diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index 0d48271..4a84c9d 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -24,6 +24,9 @@ namespace ca { #define DEBUG_LEVEL 0 +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + class MonitorEventThread; typedef std::tr1::shared_ptr MonitorEventThreadPtr; @@ -86,6 +89,7 @@ private: ca_client_context* current_context; epics::pvData::Mutex channelListMutex; std::vector caChannelList; + ChannelConnectThreadPtr channelConnectThread; MonitorEventThreadPtr monitorEventThread; GetDoneThreadPtr getDoneThread; PutDoneThreadPtr putDoneThread; diff --git a/src/ca/channelConnectThread.cpp b/src/ca/channelConnectThread.cpp new file mode 100644 index 0000000..c378019 --- /dev/null +++ b/src/ca/channelConnectThread.cpp @@ -0,0 +1,115 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ + +#include "caChannel.h" +#include +#define epicsExportSharedSymbols +#include "channelConnectThread.h" + +using namespace epics::pvData; +using namespace std; + +namespace epics { +namespace pvAccess { +namespace ca { + +ChannelConnectThreadPtr ChannelConnectThread::get() +{ + static ChannelConnectThreadPtr master; + static Mutex mutex; + Lock xx(mutex); + if(!master) { + master = ChannelConnectThreadPtr(new ChannelConnectThread()); + master->start(); + } + return master; +} + +ChannelConnectThread::ChannelConnectThread() +: isStop(false) +{ +} + +ChannelConnectThread::~ChannelConnectThread() +{ +//std::cout << "ChannelConnectThread::~ChannelConnectThread()\n"; +} + + +void ChannelConnectThread::start() +{ + thread = std::tr1::shared_ptr(new epicsThread( + *this, + "channelConnectThread", + epicsThreadGetStackSize(epicsThreadStackSmall), + epicsThreadPriorityLow)); + thread->start(); +} + + +void ChannelConnectThread::stop() +{ + { + Lock xx(mutex); + isStop = true; + } + waitForCommand.signal(); + waitForStop.wait(); +} + +void ChannelConnectThread::channelConnected( + NotifyChannelRequesterPtr const ¬ifyChannelRequester) +{ + { + Lock lock(mutex); + if(notifyChannelRequester->isOnQueue) return; + notifyChannelRequester->isOnQueue = true; + notifyChannelQueue.push(notifyChannelRequester); + } + waitForCommand.signal(); +} + +void ChannelConnectThread::run() +{ + while(true) + { + waitForCommand.wait(); + while(true) { + bool more = false; + NotifyChannelRequester* notifyChannelRequester(NULL); + { + Lock lock(mutex); + if(!notifyChannelQueue.empty()) + { + more = true; + NotifyChannelRequesterWPtr req(notifyChannelQueue.front()); + notifyChannelQueue.pop(); + NotifyChannelRequesterPtr reqPtr(req.lock()); + if(reqPtr) { + notifyChannelRequester = reqPtr.get(); + reqPtr->isOnQueue = false; + } + } + } + if(!more) break; + if(notifyChannelRequester!=NULL) + { + CAChannelPtr channel(notifyChannelRequester->channel.lock()); + if(channel) channel->notifyClient(); + } + } + if(isStop) { + waitForStop.signal(); + break; + } + } +} + +}}} diff --git a/src/ca/channelConnectThread.h b/src/ca/channelConnectThread.h new file mode 100644 index 0000000..7dcb568 --- /dev/null +++ b/src/ca/channelConnectThread.h @@ -0,0 +1,71 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ +#ifndef ChannelConnectThread_H +#define ChannelConnectThread_H +#include +#include +#include +#include +#include +#include + +namespace epics { +namespace pvAccess { +namespace ca { + +class NotifyChannelRequester; +typedef std::tr1::shared_ptr NotifyChannelRequesterPtr; +typedef std::tr1::weak_ptr NotifyChannelRequesterWPtr; + + +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + +class CAChannel; +typedef std::tr1::shared_ptr CAChannelPtr; +typedef std::tr1::weak_ptr CAChannelWPtr; + +class NotifyChannelRequester +{ +public: + ChannelRequester::weak_pointer channelRequester; + CAChannelWPtr channel; + bool isOnQueue; + NotifyChannelRequester() : isOnQueue(false) {} + void setChannel(CAChannelPtr const &channel) + { this->channel = channel;} +}; + + +class ChannelConnectThread : + public epicsThreadRunable +{ +public: + static ChannelConnectThreadPtr get(); + ~ChannelConnectThread(); + virtual void run(); + void start(); + void stop(); + void channelConnected(NotifyChannelRequesterPtr const ¬ifyChannelRequester); +private: + ChannelConnectThread(); + + bool isStop; + std::tr1::shared_ptr thread; + epics::pvData::Mutex mutex; + epics::pvData::Event waitForCommand; + epics::pvData::Event waitForStop; + std::queue notifyChannelQueue; +}; + + +}}} + +#endif /* ChannelConnectThread_H */