added aux thread to call ca_clear_subscription

This commit is contained in:
mrkraimer
2018-06-08 15:22:08 -04:00
parent 673edb27e1
commit fe8184cf95
8 changed files with 275 additions and 232 deletions

View File

@@ -11,6 +11,7 @@
#include <pv/logger.h>
#include <pv/pvAccess.h>
#include <pv/reftrack.h>
#include "stopMonitorThread.h"
#define epicsExportSharedSymbols
#include "caChannel.h"
@@ -84,18 +85,7 @@ void CAChannel::disconnected()
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::disconnected " << channelName << endl;
}
while(!putQueue.empty()) {
putQueue.front()->channelDisconnect(false);
putQueue.pop();
}
while(!getQueue.empty()) {
getQueue.front()->channelDisconnect(false);
getQueue.pop();
}
while(!monitorQueue.empty()) {
monitorQueue.front()->channelDisconnect(false);
monitorQueue.pop();
}
ChannelRequester::shared_pointer req(channelRequester.lock());
if(req) {
EXCEPTION_GUARD(req->channelStateChange(
@@ -105,12 +95,12 @@ void CAChannel::disconnected()
size_t CAChannel::num_instances;
CAChannel::CAChannel(std::string const & _channelName,
CAChannelProvider::shared_pointer const & _channelProvider,
ChannelRequester::shared_pointer const & _channelRequester) :
channelName(_channelName),
channelProvider(_channelProvider),
channelRequester(_channelRequester),
CAChannel::CAChannel(std::string const & channelName,
CAChannelProvider::shared_pointer const & channelProvider,
ChannelRequester::shared_pointer const & channelRequester) :
channelName(channelName),
channelProvider(channelProvider),
channelRequester(channelRequester),
channelID(0),
channelCreated(false)
{
@@ -149,6 +139,10 @@ CAChannel::~CAChannel()
if(DEBUG_LEVEL>0) {
cout << "CAChannel::~CAChannel() " << channelName << endl;
}
{
Lock lock(requestsMutex);
if(!channelCreated) return;
}
disconnectChannel();
}
@@ -177,54 +171,6 @@ void CAChannel::disconnectChannel()
cerr << mess << endl;
}
void CAChannel::addChannelGet(const CAChannelGetPtr & get)
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::addChannelGet " << channelName << endl;
}
Lock lock(requestsMutex);
for(size_t i=0; i< getList.size(); ++i) {
if(!(getList[i].lock())) {
getList[i] = get;
return;
}
}
getList.push_back(get);
}
void CAChannel::addChannelPut(const CAChannelPutPtr & put)
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::addChannelPut " << channelName << endl;
}
Lock lock(requestsMutex);
for(size_t i=0; i< putList.size(); ++i) {
if(!(putList[i].lock())) {
putList[i] = put;
return;
}
}
putList.push_back(put);
}
void CAChannel::addChannelMonitor(const CAChannelMonitorPtr & monitor)
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::addChannelMonitor " << channelName << endl;
}
Lock lock(requestsMutex);
for(size_t i=0; i< monitorList.size(); ++i) {
if(!(monitorList[i].lock())) {
monitorList[i] = monitor;
return;
}
}
monitorList.push_back(monitor);
}
chid CAChannel::getChannelID()
{
return channelID;
@@ -426,11 +372,12 @@ void CAChannel::attachContext()
std::tr1::static_pointer_cast<CAChannelProvider>(provider)->attachContext();
return;
}
string mess("CAChannel::attachContext ");
string mess("CAChannel::attachContext provider does not exist ");
mess += getChannelName();
throw std::runtime_error(mess);
}
size_t CAChannelGet::num_instances;
CAChannelGetPtr CAChannelGet::create(
@@ -460,9 +407,6 @@ CAChannelGet::~CAChannelGet()
}
}
void CAChannelGet::channelCreated(const epics::pvData::Status& s,Channel::shared_pointer const & c)
{}
void CAChannelGet::activate()
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
@@ -473,33 +417,12 @@ void CAChannelGet::activate()
dbdToPv = DbdToPv::create(channel,pvRequest,getIO);
pvStructure = dbdToPv->createPVStructure();
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
channel->addChannelGet(shared_from_this());
EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelGet::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
string mess("CAChannelGet::channelStateChange was called ");
mess += channel->getChannelName();
throw std::runtime_error(mess);
}
std::string CAChannelGet::getRequesterName() { return "CAChannelGet";}
void CAChannelGet::channelDisconnect(bool destroy)
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelGet::channelDisconnect " << channel->getChannelName() << endl;
}
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
EXCEPTION_GUARD(getRequester->channelDisconnect(destroy);)
if(!destroy) channel->addChannelGet(shared_from_this());
}
namespace {
static void ca_get_handler(struct event_handler_args args)
@@ -590,8 +513,6 @@ CAChannelPut::~CAChannelPut()
}
}
void CAChannelPut::channelCreated(const Status& status,Channel::shared_pointer const & c)
{}
void CAChannelPut::activate()
{
@@ -608,32 +529,12 @@ void CAChannelPut::activate()
std::string val = pvString->get();
if(val.compare("true")==0) block = true;
}
channel->addChannelPut(shared_from_this());
EXCEPTION_GUARD(putRequester->channelPutConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelPut::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
string mess("CAChannelPut::channelStateChange was called ");
mess += channel->getChannelName();
throw std::runtime_error(mess);
}
std::string CAChannelPut::getRequesterName() { return "CAChannelPut";}
void CAChannelPut::channelDisconnect(bool destroy)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelPut::channelDisconnect " << channel->getChannelName() << endl;
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
EXCEPTION_GUARD(putRequester->channelDisconnect(destroy);)
if(!destroy) channel->addChannelPut(shared_from_this());
}
/* --------------- epics::pvAccess::ChannelPut --------------- */
@@ -781,6 +682,7 @@ public:
void release(MonitorElementPtr const & monitorElement)
{
Lock guard(mutex);
if(!isStarted) return;
if(monitorElementQueue.empty()) {
string mess("CAChannelMonitor::release client error calling release ");
throw std::runtime_error(mess);
@@ -810,26 +712,22 @@ CAChannelMonitor::CAChannelMonitor(
channel(channel),
monitorRequester(monitorRequester),
pvRequest(pvRequest),
isStarted(false)
isStarted(false),
stopMonitorThread(StopMonitorThread::get())
{}
CAChannelMonitor::~CAChannelMonitor()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::~CAChannelMonitor() " << channel->getChannelName() << endl;
std::cout << "CAChannelMonitor::~CAChannelMonitor() "
<< channel->getChannelName()
<< " isStarted " << (isStarted ? "true" : "false")
<< endl;
}
if(!isStarted) return;
channel->attachContext();
int result = ca_clear_subscription(eventID);
if (result == ECA_NORMAL) return;
string mess("CAChannelMonitor::~CAChannelMonitor() ");
mess += ca_message(result);
cerr << mess << endl;
if(isStarted) stop();
stopMonitorThread->waitForNoEvents();
}
void CAChannelMonitor::channelCreated(const Status& status,Channel::shared_pointer const & c)
{}
void CAChannelMonitor::activate()
{
MonitorRequester::shared_pointer requester(monitorRequester.lock());
@@ -853,44 +751,22 @@ void CAChannelMonitor::activate()
}
}
monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
channel->addChannelMonitor(shared_from_this());
EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelMonitor::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
string mess("CAChannelMonitor::channelStateChange was called ");
mess += channel->getChannelName();
throw std::runtime_error(mess);
}
std::string CAChannelMonitor::getRequesterName() { return "CAChannelMonitor";}
void CAChannelMonitor::channelDisconnect(bool destroy)
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::channelDisconnect " << channel->getChannelName() << endl;
}
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
EXCEPTION_GUARD(requester->channelDisconnect(destroy);)
if(!destroy) channel->addChannelMonitor(shared_from_this());
}
void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelMonitor::subscriptionEvent "
<< channel->getChannelName() << endl;
}
if(!isStarted) return;
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
Status status = dbdToPv->getFromDBD(pvStructure,activeElement->changedBitSet,args);
if(status.isOK())
{
if(monitorQueue->event(pvStructure,activeElement)) {
@@ -943,22 +819,17 @@ epics::pvData::Status CAChannelMonitor::start()
epics::pvData::Status CAChannelMonitor::stop()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::stop " << channel->getChannelName() << endl;
std::cout << "CAChannelMonitor::stop "
<< channel->getChannelName()
<< " isStarted " << (isStarted ? "true" : "false")
<< endl;
}
Status status = Status::Ok;
if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped");
channel->attachContext();
int result = ca_clear_subscription(eventID);
if (result == ECA_NORMAL)
{
isStarted = false;
monitorQueue->stop();
result = ca_flush_io();
}
if (result == ECA_NORMAL) return status;
string mess("CAChannelMonitor::stop() ");
mess += ca_message(result);
return Status(Status::STATUSTYPE_ERROR,mess);
isStarted = false;
monitorQueue->stop();
stopMonitorThread->callStop(eventID);
eventID = NULL;
return Status::Ok;
}