From 16268cc9d696595015b6c99bd9c68dd103fd9489 Mon Sep 17 00:00:00 2001 From: mrkraimer Date: Fri, 20 Jul 2018 13:18:59 -0400 Subject: [PATCH 1/2] add channelConnectThread; should fix connecting to 50000 channels --- src/ca/Makefile | 1 + src/ca/caChannel.cpp | 14 +++- src/ca/caChannel.h | 14 ++++ src/ca/caProvider.cpp | 7 +- src/ca/caProviderPvt.h | 4 ++ src/ca/channelConnectThread.cpp | 115 ++++++++++++++++++++++++++++++++ src/ca/channelConnectThread.h | 71 ++++++++++++++++++++ 7 files changed, 223 insertions(+), 3 deletions(-) create mode 100644 src/ca/channelConnectThread.cpp create mode 100644 src/ca/channelConnectThread.h diff --git a/src/ca/Makefile b/src/ca/Makefile index 93e6b27..6bed7c3 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -11,6 +11,7 @@ LIB_SYS_LIBS_WIN32 += ws2_32 INC += pv/caProvider.h +pvAccessCA_SRCS += channelConnectThread.cpp pvAccessCA_SRCS += monitorEventThread.cpp pvAccessCA_SRCS += getDoneThread.cpp pvAccessCA_SRCS += putDoneThread.cpp diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index b19349b..40ba9fa 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -10,6 +10,7 @@ #include #include #include +#include "channelConnectThread.h" #include "monitorEventThread.h" #include "getDoneThread.h" #include "putDoneThread.h" @@ -62,6 +63,14 @@ void CAChannel::connected() if(DEBUG_LEVEL>0) { cout<< "CAChannel::connected " << channelName << endl; } + ChannelConnectThread->channelConnected(notifyChannelRequester); +} + +void CAChannel::notifyClient() +{ + if(DEBUG_LEVEL>0) { + cout<< "CAChannel::notifyClient " << channelName << endl; + } while(!putQueue.empty()) { putQueue.front()->activate(); putQueue.pop(); @@ -103,7 +112,8 @@ CAChannel::CAChannel(std::string const & channelName, channelProvider(channelProvider), channelRequester(channelRequester), channelID(0), - channelCreated(false) + channelCreated(false), + ChannelConnectThread(ChannelConnectThread::get()) { if(DEBUG_LEVEL>0) { cout<< "CAChannel::CAChannel " << channelName << endl; @@ -117,6 +127,8 @@ void CAChannel::activate(short priority) } ChannelRequester::shared_pointer req(channelRequester.lock()); if(!req) return; + notifyChannelRequester = NotifyChannelRequesterPtr(new NotifyChannelRequester()); + notifyChannelRequester->setChannel(shared_from_this()); attachContext(); int result = ca_create_channel(channelName.c_str(), ca_connection_handler, diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index 8228627..f1203bc 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -28,6 +28,17 @@ namespace epics { namespace pvAccess { namespace ca { +class CAChannel; +typedef std::tr1::shared_ptr CAChannelPtr; +typedef std::tr1::weak_ptr CAChannelWPtr; +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + +class NotifyChannelRequester; +typedef std::tr1::shared_ptr NotifyChannelRequesterPtr; +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + class NotifyMonitorRequester; typedef std::tr1::shared_ptr NotifyMonitorRequesterPtr; class MonitorEventThread; @@ -108,6 +119,7 @@ public: void attachContext(); void disconnectChannel(); + void notifyClient(); private: virtual void destroy() {} CAChannel(std::string const & channelName, @@ -121,6 +133,8 @@ private: ChannelRequester::weak_pointer channelRequester; chid channelID; bool channelCreated; + ChannelConnectThreadPtr ChannelConnectThread; + NotifyChannelRequesterPtr notifyChannelRequester; epics::pvData::Mutex requestsMutex; std::queue getFieldQueue; diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index 5c5b22c..f670add 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -11,6 +11,7 @@ #include #include +#include "channelConnectThread.h" #include "monitorEventThread.h" #include "getDoneThread.h" #include "putDoneThread.h" @@ -39,6 +40,7 @@ CAChannelProvider::CAChannelProvider() CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr&) : current_context(0), + channelConnectThread(ChannelConnectThread::get()), monitorEventThread(MonitorEventThread::get()), getDoneThread(GetDoneThread::get()), putDoneThread(PutDoneThread::get()) @@ -75,9 +77,10 @@ CAChannelProvider::~CAChannelProvider() channelQ.front()->disconnectChannel(); channelQ.pop(); } - monitorEventThread->stop(); - getDoneThread->stop(); putDoneThread->stop(); + getDoneThread->stop(); + monitorEventThread->stop(); + channelConnectThread->stop(); if(DEBUG_LEVEL>0) { std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n"; } diff --git a/src/ca/caProviderPvt.h b/src/ca/caProviderPvt.h index 0d48271..4a84c9d 100644 --- a/src/ca/caProviderPvt.h +++ b/src/ca/caProviderPvt.h @@ -24,6 +24,9 @@ namespace ca { #define DEBUG_LEVEL 0 +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + class MonitorEventThread; typedef std::tr1::shared_ptr MonitorEventThreadPtr; @@ -86,6 +89,7 @@ private: ca_client_context* current_context; epics::pvData::Mutex channelListMutex; std::vector caChannelList; + ChannelConnectThreadPtr channelConnectThread; MonitorEventThreadPtr monitorEventThread; GetDoneThreadPtr getDoneThread; PutDoneThreadPtr putDoneThread; diff --git a/src/ca/channelConnectThread.cpp b/src/ca/channelConnectThread.cpp new file mode 100644 index 0000000..c378019 --- /dev/null +++ b/src/ca/channelConnectThread.cpp @@ -0,0 +1,115 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ + +#include "caChannel.h" +#include +#define epicsExportSharedSymbols +#include "channelConnectThread.h" + +using namespace epics::pvData; +using namespace std; + +namespace epics { +namespace pvAccess { +namespace ca { + +ChannelConnectThreadPtr ChannelConnectThread::get() +{ + static ChannelConnectThreadPtr master; + static Mutex mutex; + Lock xx(mutex); + if(!master) { + master = ChannelConnectThreadPtr(new ChannelConnectThread()); + master->start(); + } + return master; +} + +ChannelConnectThread::ChannelConnectThread() +: isStop(false) +{ +} + +ChannelConnectThread::~ChannelConnectThread() +{ +//std::cout << "ChannelConnectThread::~ChannelConnectThread()\n"; +} + + +void ChannelConnectThread::start() +{ + thread = std::tr1::shared_ptr(new epicsThread( + *this, + "channelConnectThread", + epicsThreadGetStackSize(epicsThreadStackSmall), + epicsThreadPriorityLow)); + thread->start(); +} + + +void ChannelConnectThread::stop() +{ + { + Lock xx(mutex); + isStop = true; + } + waitForCommand.signal(); + waitForStop.wait(); +} + +void ChannelConnectThread::channelConnected( + NotifyChannelRequesterPtr const ¬ifyChannelRequester) +{ + { + Lock lock(mutex); + if(notifyChannelRequester->isOnQueue) return; + notifyChannelRequester->isOnQueue = true; + notifyChannelQueue.push(notifyChannelRequester); + } + waitForCommand.signal(); +} + +void ChannelConnectThread::run() +{ + while(true) + { + waitForCommand.wait(); + while(true) { + bool more = false; + NotifyChannelRequester* notifyChannelRequester(NULL); + { + Lock lock(mutex); + if(!notifyChannelQueue.empty()) + { + more = true; + NotifyChannelRequesterWPtr req(notifyChannelQueue.front()); + notifyChannelQueue.pop(); + NotifyChannelRequesterPtr reqPtr(req.lock()); + if(reqPtr) { + notifyChannelRequester = reqPtr.get(); + reqPtr->isOnQueue = false; + } + } + } + if(!more) break; + if(notifyChannelRequester!=NULL) + { + CAChannelPtr channel(notifyChannelRequester->channel.lock()); + if(channel) channel->notifyClient(); + } + } + if(isStop) { + waitForStop.signal(); + break; + } + } +} + +}}} diff --git a/src/ca/channelConnectThread.h b/src/ca/channelConnectThread.h new file mode 100644 index 0000000..7dcb568 --- /dev/null +++ b/src/ca/channelConnectThread.h @@ -0,0 +1,71 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ +/** + * @author mrk + * @date 2018.07 + */ +#ifndef ChannelConnectThread_H +#define ChannelConnectThread_H +#include +#include +#include +#include +#include +#include + +namespace epics { +namespace pvAccess { +namespace ca { + +class NotifyChannelRequester; +typedef std::tr1::shared_ptr NotifyChannelRequesterPtr; +typedef std::tr1::weak_ptr NotifyChannelRequesterWPtr; + + +class ChannelConnectThread; +typedef std::tr1::shared_ptr ChannelConnectThreadPtr; + +class CAChannel; +typedef std::tr1::shared_ptr CAChannelPtr; +typedef std::tr1::weak_ptr CAChannelWPtr; + +class NotifyChannelRequester +{ +public: + ChannelRequester::weak_pointer channelRequester; + CAChannelWPtr channel; + bool isOnQueue; + NotifyChannelRequester() : isOnQueue(false) {} + void setChannel(CAChannelPtr const &channel) + { this->channel = channel;} +}; + + +class ChannelConnectThread : + public epicsThreadRunable +{ +public: + static ChannelConnectThreadPtr get(); + ~ChannelConnectThread(); + virtual void run(); + void start(); + void stop(); + void channelConnected(NotifyChannelRequesterPtr const ¬ifyChannelRequester); +private: + ChannelConnectThread(); + + bool isStop; + std::tr1::shared_ptr thread; + epics::pvData::Mutex mutex; + epics::pvData::Event waitForCommand; + epics::pvData::Event waitForStop; + std::queue notifyChannelQueue; +}; + + +}}} + +#endif /* ChannelConnectThread_H */ From 39ede57fe2b79044587845a152b802a8e8cd791c Mon Sep 17 00:00:00 2001 From: mrkraimer Date: Fri, 10 Aug 2018 15:11:02 -0400 Subject: [PATCH 2/2] change ChannelConnectThread to channelConnectThread --- src/ca/caChannel.cpp | 7 +++++-- src/ca/caChannel.h | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index 40ba9fa..c60ab53 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -63,7 +63,7 @@ void CAChannel::connected() if(DEBUG_LEVEL>0) { cout<< "CAChannel::connected " << channelName << endl; } - ChannelConnectThread->channelConnected(notifyChannelRequester); + channelConnectThread->channelConnected(notifyChannelRequester); } void CAChannel::notifyClient() @@ -113,7 +113,7 @@ CAChannel::CAChannel(std::string const & channelName, channelRequester(channelRequester), channelID(0), channelCreated(false), - ChannelConnectThread(ChannelConnectThread::get()) + channelConnectThread(ChannelConnectThread::get()) { if(DEBUG_LEVEL>0) { cout<< "CAChannel::CAChannel " << channelName << endl; @@ -487,6 +487,9 @@ void CAChannelGet::getDone(struct event_handler_args &args) void CAChannelGet::notifyClient() { + if(DEBUG_LEVEL>1) { + std::cout << "CAChannelGet::notifyClient " << channel->getChannelName() << endl; + } ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock()); if(!getRequester) return; EXCEPTION_GUARD(getRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet)); diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index f1203bc..625ad81 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -133,7 +133,7 @@ private: ChannelRequester::weak_pointer channelRequester; chid channelID; bool channelCreated; - ChannelConnectThreadPtr ChannelConnectThread; + ChannelConnectThreadPtr channelConnectThread; NotifyChannelRequesterPtr notifyChannelRequester; epics::pvData::Mutex requestsMutex;