add channelConnectThread; should fix connecting to 50000 channels

This commit is contained in:
mrkraimer
2018-07-20 13:18:59 -04:00
parent 5a271ecf71
commit 16268cc9d6
7 changed files with 223 additions and 3 deletions
+1
View File
@@ -11,6 +11,7 @@ LIB_SYS_LIBS_WIN32 += ws2_32
INC += pv/caProvider.h
pvAccessCA_SRCS += channelConnectThread.cpp
pvAccessCA_SRCS += monitorEventThread.cpp
pvAccessCA_SRCS += getDoneThread.cpp
pvAccessCA_SRCS += putDoneThread.cpp
+13 -1
View File
@@ -10,6 +10,7 @@
#include <pv/standardField.h>
#include <pv/logger.h>
#include <pv/pvAccess.h>
#include "channelConnectThread.h"
#include "monitorEventThread.h"
#include "getDoneThread.h"
#include "putDoneThread.h"
@@ -62,6 +63,14 @@ void CAChannel::connected()
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::connected " << channelName << endl;
}
ChannelConnectThread->channelConnected(notifyChannelRequester);
}
void CAChannel::notifyClient()
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::notifyClient " << channelName << endl;
}
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
@@ -103,7 +112,8 @@ CAChannel::CAChannel(std::string const & channelName,
channelProvider(channelProvider),
channelRequester(channelRequester),
channelID(0),
channelCreated(false)
channelCreated(false),
ChannelConnectThread(ChannelConnectThread::get())
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::CAChannel " << channelName << endl;
@@ -117,6 +127,8 @@ void CAChannel::activate(short priority)
}
ChannelRequester::shared_pointer req(channelRequester.lock());
if(!req) return;
notifyChannelRequester = NotifyChannelRequesterPtr(new NotifyChannelRequester());
notifyChannelRequester->setChannel(shared_from_this());
attachContext();
int result = ca_create_channel(channelName.c_str(),
ca_connection_handler,
+14
View File
@@ -28,6 +28,17 @@ namespace epics {
namespace pvAccess {
namespace ca {
class CAChannel;
typedef std::tr1::shared_ptr<CAChannel> CAChannelPtr;
typedef std::tr1::weak_ptr<CAChannel> CAChannelWPtr;
class ChannelConnectThread;
typedef std::tr1::shared_ptr<ChannelConnectThread> ChannelConnectThreadPtr;
class NotifyChannelRequester;
typedef std::tr1::shared_ptr<NotifyChannelRequester> NotifyChannelRequesterPtr;
class ChannelConnectThread;
typedef std::tr1::shared_ptr<ChannelConnectThread> ChannelConnectThreadPtr;
class NotifyMonitorRequester;
typedef std::tr1::shared_ptr<NotifyMonitorRequester> NotifyMonitorRequesterPtr;
class MonitorEventThread;
@@ -108,6 +119,7 @@ public:
void attachContext();
void disconnectChannel();
void notifyClient();
private:
virtual void destroy() {}
CAChannel(std::string const & channelName,
@@ -121,6 +133,8 @@ private:
ChannelRequester::weak_pointer channelRequester;
chid channelID;
bool channelCreated;
ChannelConnectThreadPtr ChannelConnectThread;
NotifyChannelRequesterPtr notifyChannelRequester;
epics::pvData::Mutex requestsMutex;
std::queue<CAChannelGetFieldPtr> getFieldQueue;
+5 -2
View File
@@ -11,6 +11,7 @@
#include <pv/logger.h>
#include <pv/pvAccess.h>
#include "channelConnectThread.h"
#include "monitorEventThread.h"
#include "getDoneThread.h"
#include "putDoneThread.h"
@@ -39,6 +40,7 @@ CAChannelProvider::CAChannelProvider()
CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr<Configuration>&)
: current_context(0),
channelConnectThread(ChannelConnectThread::get()),
monitorEventThread(MonitorEventThread::get()),
getDoneThread(GetDoneThread::get()),
putDoneThread(PutDoneThread::get())
@@ -75,9 +77,10 @@ CAChannelProvider::~CAChannelProvider()
channelQ.front()->disconnectChannel();
channelQ.pop();
}
monitorEventThread->stop();
getDoneThread->stop();
putDoneThread->stop();
getDoneThread->stop();
monitorEventThread->stop();
channelConnectThread->stop();
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n";
}
+4
View File
@@ -24,6 +24,9 @@ namespace ca {
#define DEBUG_LEVEL 0
class ChannelConnectThread;
typedef std::tr1::shared_ptr<ChannelConnectThread> ChannelConnectThreadPtr;
class MonitorEventThread;
typedef std::tr1::shared_ptr<MonitorEventThread> MonitorEventThreadPtr;
@@ -86,6 +89,7 @@ private:
ca_client_context* current_context;
epics::pvData::Mutex channelListMutex;
std::vector<CAChannelWPtr> caChannelList;
ChannelConnectThreadPtr channelConnectThread;
MonitorEventThreadPtr monitorEventThread;
GetDoneThreadPtr getDoneThread;
PutDoneThreadPtr putDoneThread;
+115
View File
@@ -0,0 +1,115 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
/**
* @author mrk
* @date 2018.07
*/
#include "caChannel.h"
#include <epicsExit.h>
#define epicsExportSharedSymbols
#include "channelConnectThread.h"
using namespace epics::pvData;
using namespace std;
namespace epics {
namespace pvAccess {
namespace ca {
ChannelConnectThreadPtr ChannelConnectThread::get()
{
static ChannelConnectThreadPtr master;
static Mutex mutex;
Lock xx(mutex);
if(!master) {
master = ChannelConnectThreadPtr(new ChannelConnectThread());
master->start();
}
return master;
}
ChannelConnectThread::ChannelConnectThread()
: isStop(false)
{
}
ChannelConnectThread::~ChannelConnectThread()
{
//std::cout << "ChannelConnectThread::~ChannelConnectThread()\n";
}
void ChannelConnectThread::start()
{
thread = std::tr1::shared_ptr<epicsThread>(new epicsThread(
*this,
"channelConnectThread",
epicsThreadGetStackSize(epicsThreadStackSmall),
epicsThreadPriorityLow));
thread->start();
}
void ChannelConnectThread::stop()
{
{
Lock xx(mutex);
isStop = true;
}
waitForCommand.signal();
waitForStop.wait();
}
void ChannelConnectThread::channelConnected(
NotifyChannelRequesterPtr const &notifyChannelRequester)
{
{
Lock lock(mutex);
if(notifyChannelRequester->isOnQueue) return;
notifyChannelRequester->isOnQueue = true;
notifyChannelQueue.push(notifyChannelRequester);
}
waitForCommand.signal();
}
void ChannelConnectThread::run()
{
while(true)
{
waitForCommand.wait();
while(true) {
bool more = false;
NotifyChannelRequester* notifyChannelRequester(NULL);
{
Lock lock(mutex);
if(!notifyChannelQueue.empty())
{
more = true;
NotifyChannelRequesterWPtr req(notifyChannelQueue.front());
notifyChannelQueue.pop();
NotifyChannelRequesterPtr reqPtr(req.lock());
if(reqPtr) {
notifyChannelRequester = reqPtr.get();
reqPtr->isOnQueue = false;
}
}
}
if(!more) break;
if(notifyChannelRequester!=NULL)
{
CAChannelPtr channel(notifyChannelRequester->channel.lock());
if(channel) channel->notifyClient();
}
}
if(isStop) {
waitForStop.signal();
break;
}
}
}
}}}
+71
View File
@@ -0,0 +1,71 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvAccessCPP is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
/**
* @author mrk
* @date 2018.07
*/
#ifndef ChannelConnectThread_H
#define ChannelConnectThread_H
#include <queue>
#include <cadef.h>
#include <shareLib.h>
#include <epicsThread.h>
#include <pv/event.h>
#include <pv/lock.h>
namespace epics {
namespace pvAccess {
namespace ca {
class NotifyChannelRequester;
typedef std::tr1::shared_ptr<NotifyChannelRequester> NotifyChannelRequesterPtr;
typedef std::tr1::weak_ptr<NotifyChannelRequester> NotifyChannelRequesterWPtr;
class ChannelConnectThread;
typedef std::tr1::shared_ptr<ChannelConnectThread> ChannelConnectThreadPtr;
class CAChannel;
typedef std::tr1::shared_ptr<CAChannel> CAChannelPtr;
typedef std::tr1::weak_ptr<CAChannel> CAChannelWPtr;
class NotifyChannelRequester
{
public:
ChannelRequester::weak_pointer channelRequester;
CAChannelWPtr channel;
bool isOnQueue;
NotifyChannelRequester() : isOnQueue(false) {}
void setChannel(CAChannelPtr const &channel)
{ this->channel = channel;}
};
class ChannelConnectThread :
public epicsThreadRunable
{
public:
static ChannelConnectThreadPtr get();
~ChannelConnectThread();
virtual void run();
void start();
void stop();
void channelConnected(NotifyChannelRequesterPtr const &notifyChannelRequester);
private:
ChannelConnectThread();
bool isStop;
std::tr1::shared_ptr<epicsThread> thread;
epics::pvData::Mutex mutex;
epics::pvData::Event waitForCommand;
epics::pvData::Event waitForStop;
std::queue<NotifyChannelRequesterWPtr> notifyChannelQueue;
};
}}}
#endif /* ChannelConnectThread_H */