Start of major overhaul

Moved duplicated EXCEPTION_GUARD macro to private header file.
Removed all DEBUG_LEVEL and similar prints.
Some reformatting and code simplification.
This commit is contained in:
Andrew Johnson
2020-10-02 17:57:32 -05:00
committed by mdavidsaver
parent 05c2b605aa
commit 44fda51935
8 changed files with 242 additions and 387 deletions

View File

@ -10,12 +10,12 @@
#include <pv/standardField.h>
#include <pv/logger.h>
#include <pv/pvAccess.h>
#define epicsExportSharedSymbols
#include "channelConnectThread.h"
#include "monitorEventThread.h"
#include "getDoneThread.h"
#include "putDoneThread.h"
#define epicsExportSharedSymbols
#include "caChannel.h"
using namespace epics::pvData;
@ -28,19 +28,12 @@ namespace epics {
namespace pvAccess {
namespace ca {
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
CAChannel::shared_pointer CAChannel::create(CAChannelProvider::shared_pointer const & channelProvider,
std::string const & channelName,
short priority,
ChannelRequester::shared_pointer const & channelRequester)
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::create " << channelName << endl;
}
CAChannelPtr caChannel(
new CAChannel(channelName, channelProvider, channelRequester));
caChannel->activate(priority);
@ -51,42 +44,32 @@ static void ca_connection_handler(struct connection_handler_args args)
{
CAChannel *channel = static_cast<CAChannel*>(ca_puser(args.chid));
if (args.op == CA_OP_CONN_UP) {
channel->connect(true);
} else if (args.op == CA_OP_CONN_DOWN) {
channel->connect(false);
}
channel->connect(args.op == CA_OP_CONN_UP);
}
void CAChannel::connect(bool isConnected)
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::connect " << channelName << endl;
}
{
Lock lock(requestsMutex);
channelConnected = isConnected;
Lock lock(requestsMutex);
channelConnected = isConnected;
}
channelConnectThread->channelConnected(notifyChannelRequester);
}
void CAChannel::notifyClient()
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::notifyClient " << channelName << endl;
}
CAChannelProviderPtr provider(channelProvider.lock());
if(!provider) return;
if (!provider) return;
bool isConnected = false;
{
Lock lock(requestsMutex);
isConnected = channelConnected;
Lock lock(requestsMutex);
isConnected = channelConnected;
}
if(!isConnected) {
if (!isConnected) {
ChannelRequester::shared_pointer req(channelRequester.lock());
if(req) {
EXCEPTION_GUARD(req->channelStateChange(
shared_from_this(), Channel::DISCONNECTED));
if (req) {
EXCEPTION_GUARD(req->channelStateChange(shared_from_this(),
Channel::DISCONNECTED));
}
return;
}
@ -109,9 +92,9 @@ void CAChannel::notifyClient()
monitorQueue.pop();
}
ChannelRequester::shared_pointer req(channelRequester.lock());
if(req) {
EXCEPTION_GUARD(req->channelStateChange(
shared_from_this(), Channel::CONNECTED));
if (req) {
EXCEPTION_GUARD(req->channelStateChange(shared_from_this(),
Channel::CONNECTED));
}
}
@ -127,18 +110,12 @@ CAChannel::CAChannel(std::string const & channelName,
channelConnected(false),
channelConnectThread(ChannelConnectThread::get())
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::CAChannel " << channelName << endl;
}
}
void CAChannel::activate(short priority)
{
ChannelRequester::shared_pointer req(channelRequester.lock());
if(!req) return;
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::activate " << channelName << endl;
}
if (!req) return;
notifyChannelRequester = NotifyChannelRequesterPtr(new NotifyChannelRequester());
notifyChannelRequester->setChannel(shared_from_this());
attachContext();
@ -147,13 +124,13 @@ void CAChannel::activate(short priority)
this,
priority, // TODO mapping
&channelID);
if (result == ECA_NORMAL)
{
if (result == ECA_NORMAL) {
channelCreated = true;
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) provider->addChannel(shared_from_this());
EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this()));
} else {
}
else {
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(req->channelCreated(errorStatus, shared_from_this()));
}
@ -161,42 +138,30 @@ void CAChannel::activate(short priority)
CAChannel::~CAChannel()
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::~CAChannel() " << channelName
<< " channelCreated " << (channelCreated ? "true" : "false")
<< endl;
}
{
Lock lock(requestsMutex);
if(!channelCreated) return;
if (!channelCreated) return;
}
disconnectChannel();
}
void CAChannel::disconnectChannel()
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::disconnectChannel() "
<< channelName
<< " channelCreated " << (channelCreated ? "true" : "false")
<< endl;
}
{
Lock lock(requestsMutex);
if(!channelCreated) return;
if (!channelCreated) return;
channelCreated = false;
}
std::vector<CAChannelMonitorWPtr>::iterator it;
for(it = monitorlist.begin(); it!=monitorlist.end(); ++it)
{
CAChannelMonitorPtr mon = (*it).lock();
if(!mon) continue;
mon->stop();
for (it = monitorlist.begin(); it!=monitorlist.end(); ++it) {
CAChannelMonitorPtr mon = (*it).lock();
if (!mon) continue;
mon->stop();
}
monitorlist.resize(0);
/* Clear CA Channel */
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) {
if (provider) {
std::tr1::static_pointer_cast<CAChannelProvider>(provider)->attachContext();
}
int result = ca_clear_channel(channelID);
@ -251,17 +216,14 @@ std::tr1::shared_ptr<ChannelRequester> CAChannel::getChannelRequester()
void CAChannel::getField(GetFieldRequester::shared_pointer const & requester,
std::string const & subField)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::getField " << channelName << endl;
}
CAChannelGetFieldPtr getField(
new CAChannelGetField(shared_from_this(),requester,subField));
new CAChannelGetField(shared_from_this(),requester,subField));
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
getFieldQueue.push(getField);
return;
}
Lock lock(requestsMutex);
if (getConnectionState()!=Channel::CONNECTED) {
getFieldQueue.push(getField);
return;
}
}
getField->callRequester(shared_from_this());
}
@ -282,17 +244,14 @@ ChannelGet::shared_pointer CAChannel::createChannelGet(
ChannelGetRequester::shared_pointer const & channelGetRequester,
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createChannelGet " << channelName << endl;
}
CAChannelGetPtr channelGet =
CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
getQueue.push(channelGet);
return channelGet;
}
Lock lock(requestsMutex);
if (getConnectionState()!=Channel::CONNECTED) {
getQueue.push(channelGet);
return channelGet;
}
}
channelGet->activate();
return channelGet;
@ -303,17 +262,14 @@ ChannelPut::shared_pointer CAChannel::createChannelPut(
ChannelPutRequester::shared_pointer const & channelPutRequester,
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createChannelPut " << channelName << endl;
}
CAChannelPutPtr channelPut =
CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
putQueue.push(channelPut);
return channelPut;
}
Lock lock(requestsMutex);
if (getConnectionState()!=Channel::CONNECTED) {
putQueue.push(channelPut);
return channelPut;
}
}
channelPut->activate();
return channelPut;
@ -324,17 +280,14 @@ Monitor::shared_pointer CAChannel::createMonitor(
MonitorRequester::shared_pointer const & monitorRequester,
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createMonitor " << channelName << endl;
}
CAChannelMonitorPtr channelMonitor =
CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
monitorQueue.push(channelMonitor);
return channelMonitor;
}
Lock lock(requestsMutex);
if (getConnectionState()!=Channel::CONNECTED) {
monitorQueue.push(channelMonitor);
return channelMonitor;
}
}
channelMonitor->activate();
addMonitor(channelMonitor);
@ -344,12 +297,11 @@ Monitor::shared_pointer CAChannel::createMonitor(
void CAChannel::addMonitor(CAChannelMonitorPtr const & monitor)
{
std::vector<CAChannelMonitorWPtr>::iterator it;
for(it = monitorlist.begin(); it!=monitorlist.end(); ++it)
{
CAChannelMonitorWPtr mon = *it;
if(mon.lock()) continue;
mon = monitor;
return;
for (it = monitorlist.begin(); it!=monitorlist.end(); ++it) {
CAChannelMonitorWPtr mon = *it;
if (mon.lock()) continue;
mon = monitor;
return;
}
monitorlist.push_back(monitor);
}
@ -360,8 +312,7 @@ void CAChannel::printInfo(std::ostream& out)
ConnectionState state = getConnectionState();
out << "STATE : " << ConnectionStateNames[state] << std::endl;
if (state == CONNECTED)
{
if (state == CONNECTED) {
out << "ADDRESS : " << getRemoteAddress() << std::endl;
//out << "RIGHTS : " << getAccessRights() << std::endl;
}
@ -370,36 +321,28 @@ void CAChannel::printInfo(std::ostream& out)
CAChannelGetField::CAChannelGetField(
CAChannelPtr const &channel,
GetFieldRequester::shared_pointer const & requester,std::string const & subField)
GetFieldRequester::shared_pointer const & requester,
std::string const & subField)
: channel(channel),
getFieldRequester(requester),
subField(subField)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::CAChannelGetField()\n";
}
}
void CAChannelGetField::activate()
{
CAChannelPtr chan(channel.lock());
if(chan) callRequester(chan);
if (chan) callRequester(chan);
}
CAChannelGetField::~CAChannelGetField()
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::~CAChannelGetField()\n";
}
}
void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::callRequester\n";
}
GetFieldRequester::shared_pointer requester(getFieldRequester.lock());
if(!requester) return;
if (!requester) return;
PVStructurePtr pvRequest(createRequest(""));
DbdToPvPtr dbdToPv = DbdToPv::create(caChannel,pvRequest,getIO);
Structure::const_shared_pointer structure(dbdToPv->getStructure());
@ -408,12 +351,10 @@ void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)
std::tr1::static_pointer_cast<const Field>(structure) :
structure->getField(subField);
if (field)
{
if (field) {
EXCEPTION_GUARD(requester->getDone(Status::Ok, field));
}
else
{
else {
Status errorStatus(Status::STATUSTYPE_ERROR, "field '" + subField + "' not found");
EXCEPTION_GUARD(requester->getDone(errorStatus, FieldConstPtr()));
}
@ -424,13 +365,13 @@ void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)
void CAChannel::attachContext()
{
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) {
if (provider) {
std::tr1::static_pointer_cast<CAChannelProvider>(provider)->attachContext();
return;
}
string mess("CAChannel::attachContext provider does not exist ");
mess += getChannelName();
throw std::runtime_error(mess);
throw std::runtime_error(mess);
}
CAChannelGetPtr CAChannelGet::create(
@ -438,9 +379,6 @@ CAChannelGetPtr CAChannelGet::create(
ChannelGetRequester::shared_pointer const & channelGetRequester,
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGet::create " << channel->getChannelName() << endl;
}
return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest));
}
@ -457,18 +395,12 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
CAChannelGet::~CAChannelGet()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelGet::~CAChannelGet() " << channel->getChannelName() << endl;
}
}
void CAChannelGet::activate()
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelGet::activate " << channel->getChannelName() << endl;
}
if (!getRequester) return;
dbdToPv = DbdToPv::create(channel,pvRequest,getIO);
dbdToPv->getChoices(channel);
pvStructure = dbdToPv->createPVStructure();
@ -481,7 +413,10 @@ void CAChannelGet::activate()
std::string CAChannelGet::getRequesterName() { return "CAChannelGet";}
std::string CAChannelGet::getRequesterName()
{
return "CAChannelGet";
}
namespace {
@ -495,45 +430,31 @@ static void ca_get_handler(struct event_handler_args args)
void CAChannelGet::getDone(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelGet::getDone "
<< channel->getChannelName() << endl;
}
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
if (!getRequester) return;
getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args);
getDoneThread->getDone(notifyGetRequester);
}
void CAChannelGet::notifyClient()
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelGet::notifyClient " << channel->getChannelName() << endl;
}
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
if (!getRequester) return;
EXCEPTION_GUARD(getRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet));
}
void CAChannelGet::get()
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelGet::get " << channel->getChannelName() << endl;
}
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
if (!getRequester) return;
channel->attachContext();
bitSet->clear();
int result = ca_array_get_callback(dbdToPv->getRequestType(),
0,
channel->getChannelID(), ca_get_handler, this);
if (result == ECA_NORMAL)
{
result = ca_flush_io();
}
if (result != ECA_NORMAL)
{
if (result != ECA_NORMAL) {
string mess("CAChannelGet::get ");
mess += channel->getChannelName() + " message " + ca_message(result);
getStatus = Status(Status::STATUSTYPE_ERROR,mess);
@ -559,9 +480,6 @@ CAChannelPutPtr CAChannelPut::create(
ChannelPutRequester::shared_pointer const & channelPutRequester,
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelPut::create " << channel->getChannelName() << endl;
}
return CAChannelPutPtr(new CAChannelPut(channel, channelPutRequester, pvRequest));
}
@ -581,27 +499,22 @@ CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
CAChannelPut::~CAChannelPut()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelPut::~CAChannelPut() " << channel->getChannelName() << endl;
}
}
void CAChannelPut::activate()
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if(DEBUG_LEVEL>0) {
cout << "CAChannelPut::activate " << channel->getChannelName() << endl;
}
if (!putRequester) return;
dbdToPv = DbdToPv::create(channel,pvRequest,putIO);
dbdToPv->getChoices(channel);
pvStructure = dbdToPv->createPVStructure();
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
PVStringPtr pvString = pvRequest->getSubField<PVString>("record._options.block");
if(pvString) {
if (pvString) {
std::string val = pvString->get();
if(val.compare("true")==0) block = true;
if (val.compare("true")==0)
block = true;
}
notifyPutRequester = NotifyPutRequesterPtr(new NotifyPutRequester());
notifyPutRequester->setChannelPut(shared_from_this());
@ -609,7 +522,10 @@ void CAChannelPut::activate()
pvStructure->getStructure()));
}
std::string CAChannelPut::getRequesterName() { return "CAChannelPut";}
std::string CAChannelPut::getRequesterName()
{
return "CAChannelPut";
}
/* --------------- epics::pvAccess::ChannelPut --------------- */
@ -634,17 +550,14 @@ static void ca_put_get_handler(struct event_handler_args args)
void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
BitSet::shared_pointer const & /*putBitSet*/)
{
if(DEBUG_LEVEL>1) {
cout << "CAChannelPut::put " << channel->getChannelName() << endl;
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if (!putRequester) return;
{
Lock lock(mutex);
isPut = true;
}
putStatus = dbdToPv->putToDBD(channel,pvPutStructure,block,&ca_put_handler,this);
if(!block || !putStatus.isOK()) {
if (!block || !putStatus.isOK()) {
EXCEPTION_GUARD(putRequester->putDone(putStatus, shared_from_this()));
}
}
@ -652,15 +565,12 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
void CAChannelPut::putDone(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
cout << "CAChannelPut::putDone " << channel->getChannelName() << endl;
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if(args.status!=ECA_NORMAL)
{
if (!putRequester) return;
if (args.status!=ECA_NORMAL) {
putStatus = Status(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
} else {
}
else {
putStatus = Status::Ok;
}
putDoneThread->putDone(notifyPutRequester);
@ -668,12 +578,8 @@ void CAChannelPut::putDone(struct event_handler_args &args)
void CAChannelPut::getDone(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
cout << "CAChannelPut::getDone " << channel->getChannelName() << endl;
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if (!putRequester) return;
getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args);
putDoneThread->putDone(notifyPutRequester);
}
@ -681,10 +587,11 @@ void CAChannelPut::getDone(struct event_handler_args &args)
void CAChannelPut::notifyClient()
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if(isPut) {
if (!putRequester) return;
if (isPut) {
EXCEPTION_GUARD(putRequester->putDone(putStatus, shared_from_this()));
} else {
}
else {
EXCEPTION_GUARD(putRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet));
}
}
@ -692,11 +599,8 @@ void CAChannelPut::notifyClient()
void CAChannelPut::get()
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelPut::get " << channel->getChannelName() << endl;
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if (!putRequester) return;
{
Lock lock(mutex);
isPut = false;
@ -708,11 +612,8 @@ void CAChannelPut::get()
0,
channel->getChannelID(), ca_put_get_handler, this);
if (result == ECA_NORMAL)
{
result = ca_flush_io();
}
if (result != ECA_NORMAL)
{
if (result != ECA_NORMAL) {
string mess("CAChannelPut::get ");
mess += channel->getChannelName() + " message " +ca_message(result);
Status status(Status::STATUSTYPE_ERROR,mess);
@ -756,60 +657,68 @@ private:
std::queue<MonitorElementPtr> monitorElementQueue;
public:
CACMonitorQueue(
int32 queueSize)
: queueSize(queueSize),
isStarted(false)
{}
~CACMonitorQueue()
{
}
void start()
{
Lock guard(mutex);
while(!monitorElementQueue.empty()) monitorElementQueue.pop();
isStarted = true;
}
void stop()
{
Lock guard(mutex);
while(!monitorElementQueue.empty()) monitorElementQueue.pop();
isStarted = false;
}
CACMonitorQueue(int32 queueSize)
: queueSize(queueSize),
isStarted(false)
{
}
~CACMonitorQueue()
{
}
void start()
{
Lock guard(mutex);
while (!monitorElementQueue.empty())
monitorElementQueue.pop();
isStarted = true;
}
void stop()
{
Lock guard(mutex);
while (!monitorElementQueue.empty())
monitorElementQueue.pop();
isStarted = false;
}
bool event(
const PVStructurePtr &pvStructure,
const MonitorElementPtr & activeElement)
{
Lock guard(mutex);
if(!isStarted) return false;
if(monitorElementQueue.size()==queueSize) return false;
PVStructure::shared_pointer pvs =
getPVDataCreate()->createPVStructure(pvStructure);
MonitorElementPtr monitorElement(new MonitorElement(pvs));
*(monitorElement->changedBitSet) = *(activeElement->changedBitSet);
*(monitorElement->overrunBitSet) = *(activeElement->overrunBitSet);
monitorElementQueue.push(monitorElement);
return true;
}
MonitorElementPtr poll()
{
Lock guard(mutex);
if(!isStarted) return MonitorElementPtr();
if(monitorElementQueue.empty()) return MonitorElementPtr();
MonitorElementPtr retval = monitorElementQueue.front();
return retval;
}
void release(MonitorElementPtr const & monitorElement)
{
Lock guard(mutex);
if(!isStarted) return;
if(monitorElementQueue.empty()) {
string mess("CAChannelMonitor::release client error calling release ");
throw std::runtime_error(mess);
}
monitorElementQueue.pop();
}
bool event(
const PVStructurePtr &pvStructure,
const MonitorElementPtr &activeElement)
{
Lock guard(mutex);
if (!isStarted)
return false;
if (monitorElementQueue.size() == queueSize)
return false;
PVStructure::shared_pointer pvs =
getPVDataCreate()->createPVStructure(pvStructure);
MonitorElementPtr monitorElement(new MonitorElement(pvs));
*(monitorElement->changedBitSet) = *(activeElement->changedBitSet);
*(monitorElement->overrunBitSet) = *(activeElement->overrunBitSet);
monitorElementQueue.push(monitorElement);
return true;
}
MonitorElementPtr poll()
{
Lock guard(mutex);
if (!isStarted)
return MonitorElementPtr();
if (monitorElementQueue.empty())
return MonitorElementPtr();
MonitorElementPtr retval = monitorElementQueue.front();
return retval;
}
void release(MonitorElementPtr const &monitorElement)
{
Lock guard(mutex);
if (!isStarted)
return;
if (monitorElementQueue.empty())
{
string mess("CAChannelMonitor::release client error calling release ");
throw std::runtime_error(mess);
}
monitorElementQueue.pop();
}
};
CAChannelMonitorPtr CAChannelMonitor::create(
@ -817,9 +726,6 @@ CAChannelMonitorPtr CAChannelMonitor::create(
MonitorRequester::shared_pointer const & monitorRequester,
PVStructure::shared_pointer const & pvRequest)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelMonitor::create " << channel->getChannelName() << endl;
}
return CAChannelMonitorPtr(new CAChannelMonitor(channel, monitorRequester, pvRequest));
}
@ -839,22 +745,13 @@ CAChannelMonitor::CAChannelMonitor(
CAChannelMonitor::~CAChannelMonitor()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::~CAChannelMonitor() "
<< channel->getChannelName()
<< " isStarted " << (isStarted ? "true" : "false")
<< endl;
}
stop();
}
void CAChannelMonitor::activate()
{
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::activate " << channel->getChannelName() << endl;
}
if (!requester) return;
dbdToPv = DbdToPv::create(channel,pvRequest,monitorIO);
dbdToPv->getChoices(channel);
pvStructure = dbdToPv->createPVStructure();
@ -871,13 +768,13 @@ void CAChannelMonitor::activate()
if (size > 1) queueSize = size;
}
pvString = pvOptions->getSubField<PVString>("DBE");
if(pvString) {
if (pvString) {
std::string value(pvString->get());
eventMask = 0;
if(value.find("VALUE")!=std::string::npos) eventMask|=DBE_VALUE;
if(value.find("ARCHIVE")!=std::string::npos) eventMask|=DBE_ARCHIVE;
if(value.find("ALARM")!=std::string::npos) eventMask|=DBE_ALARM;
if(value.find("PROPERTY")!=std::string::npos) eventMask|=DBE_PROPERTY;
if (value.find("VALUE")!=std::string::npos) eventMask|=DBE_VALUE;
if (value.find("ARCHIVE")!=std::string::npos) eventMask|=DBE_ARCHIVE;
if (value.find("ALARM")!=std::string::npos) eventMask|=DBE_ALARM;
if (value.find("PROPERTY")!=std::string::npos) eventMask|=DBE_PROPERTY;
}
}
notifyMonitorRequester = NotifyMonitorRequesterPtr(new NotifyMonitorRequester());
@ -887,37 +784,35 @@ void CAChannelMonitor::activate()
pvStructure->getStructure()));
}
std::string CAChannelMonitor::getRequesterName() { return "CAChannelMonitor";}
std::string CAChannelMonitor::getRequesterName()
{
return "CAChannelMonitor";
}
void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelMonitor::subscriptionEvent "
<< channel->getChannelName() << endl;
}
{
Lock lock(mutex);
if(!isStarted) return;
if (!isStarted) return;
}
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
if (!requester) return;
Status status = dbdToPv->getFromDBD(pvStructure,activeElement->changedBitSet,args);
if(status.isOK())
{
if(monitorQueue->event(pvStructure,activeElement)) {
if (status.isOK()) {
if (monitorQueue->event(pvStructure,activeElement)) {
activeElement->changedBitSet->clear();
activeElement->overrunBitSet->clear();
} else {
}
else {
*(activeElement->overrunBitSet) |= *(activeElement->changedBitSet);
}
monitorEventThread->event(notifyMonitorRequester);
}
else
{
else {
string mess("CAChannelMonitor::subscriptionEvent ");
mess += channel->getChannelName();
mess += ca_message(args.status);
throw std::runtime_error(mess);
throw std::runtime_error(mess);
}
}
@ -929,22 +824,16 @@ void CAChannelMonitor::notifyClient()
if(!isStarted) return;
}
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
if (!requester) return;
requester->monitorEvent(shared_from_this());
}
Status CAChannelMonitor::start()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::start " << channel->getChannelName() << endl;
}
Status status = Status::Ok;
{
Lock lock(mutex);
if(isStarted) {
status = Status(Status::STATUSTYPE_WARNING,"already started");
return status;
}
if (isStarted)
return Status(Status::STATUSTYPE_WARNING, "already started");
isStarted = true;
monitorQueue->start();
}
@ -955,61 +844,47 @@ Status CAChannelMonitor::start()
ca_subscription_handler, this,
&pevid);
if (result == ECA_NORMAL)
{
result = ca_flush_io();
if (result == ECA_NORMAL)
return Status::Ok;
{
Lock lock(mutex);
isStarted = false;
}
if (result == ECA_NORMAL) return status;
isStarted = false;
string message(ca_message(result));
return Status(Status::STATUSTYPE_ERROR,message);
return Status(Status::STATUSTYPE_ERROR, string(ca_message(result)));
}
Status CAChannelMonitor::stop()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::stop "
<< channel->getChannelName()
<< " isStarted " << (isStarted ? "true" : "false")
<< endl;
}
{
Lock lock(mutex);
if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped");
isStarted = false;
Lock lock(mutex);
if (!isStarted)
return Status(Status::STATUSTYPE_WARNING, "already stopped");
isStarted = false;
}
monitorQueue->stop();
int result = ca_clear_subscription(pevid);
if(result==ECA_NORMAL) return Status::Ok;
return Status(Status::STATUSTYPE_ERROR,string(ca_message(result)));
if (result==ECA_NORMAL)
return Status::Ok;
return Status(Status::STATUSTYPE_ERROR, string(ca_message(result)));
}
MonitorElementPtr CAChannelMonitor::poll()
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelMonitor::poll " << channel->getChannelName() << endl;
}
{
Lock lock(mutex);
if(!isStarted) return MonitorElementPtr();
Lock lock(mutex);
if (!isStarted) return MonitorElementPtr();
}
return monitorQueue->poll();
}
void CAChannelMonitor::release(MonitorElementPtr const & monitorElement)
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelMonitor::release " << channel->getChannelName() << endl;
}
monitorQueue->release(monitorElement);
}
/* --------------- ChannelRequest --------------- */
void CAChannelMonitor::cancel()
{
// noop
}

View File

@ -28,10 +28,6 @@ namespace ca {
using namespace epics::pvData;
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
CAChannelProvider::CAChannelProvider()
: current_context(0)
{
@ -45,47 +41,32 @@ CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr<Configuration>&)
getDoneThread(GetDoneThread::get()),
putDoneThread(PutDoneThread::get())
{
if(DEBUG_LEVEL>0) {
std::cout<< "CAChannelProvider::CAChannelProvider\n";
}
initialize();
}
CAChannelProvider::~CAChannelProvider()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelProvider::~CAChannelProvider()"
<< " caChannelList.size() " << caChannelList.size()
<< std::endl;
}
std::queue<CAChannelPtr> channelQ;
{
Lock lock(channelListMutex);
for(size_t i=0; i< caChannelList.size(); ++i)
{
CAChannelPtr caChannel(caChannelList[i].lock());
if(caChannel) channelQ.push(caChannel);
}
caChannelList.clear();
Lock lock(channelListMutex);
for (size_t i = 0; i < caChannelList.size(); ++i)
{
CAChannelPtr caChannel(caChannelList[i].lock());
if (caChannel)
channelQ.push(caChannel);
}
caChannelList.clear();
}
while(!channelQ.empty()) {
if(DEBUG_LEVEL>0) {
std::cout << "~CAChannelProvider() calling disconnectChannel "
<< channelQ.front()->getChannelName()
<< std::endl;
}
channelQ.front()->disconnectChannel();
channelQ.pop();
while (!channelQ.empty())
{
channelQ.front()->disconnectChannel();
channelQ.pop();
}
putDoneThread->stop();
getDoneThread->stop();
monitorEventThread->stop();
channelConnectThread->stop();
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelProvider::~CAChannelProvider() calling ca_context_destroy\n";
}
ca_context_destroy();
//std::cout << "CAChannelProvider::~CAChannelProvider() returning\n";
}
std::string CAChannelProvider::getProviderName()
@ -94,8 +75,8 @@ std::string CAChannelProvider::getProviderName()
}
ChannelFind::shared_pointer CAChannelProvider::channelFind(
std::string const & channelName,
ChannelFindRequester::shared_pointer const & channelFindRequester)
std::string const &channelName,
ChannelFindRequester::shared_pointer const &channelFindRequester)
{
if (channelName.empty())
throw std::invalid_argument("CAChannelProvider::channelFind empty channel name");
@ -110,7 +91,7 @@ ChannelFind::shared_pointer CAChannelProvider::channelFind(
}
ChannelFind::shared_pointer CAChannelProvider::channelList(
ChannelListRequester::shared_pointer const & channelListRequester)
ChannelListRequester::shared_pointer const &channelListRequester)
{
if (!channelListRequester.get())
throw std::runtime_error("CAChannelProvider::channelList null requester");
@ -123,8 +104,8 @@ ChannelFind::shared_pointer CAChannelProvider::channelList(
}
Channel::shared_pointer CAChannelProvider::createChannel(
std::string const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
std::string const &channelName,
ChannelRequester::shared_pointer const &channelRequester,
short priority)
{
Channel::shared_pointer channel(
@ -133,10 +114,10 @@ Channel::shared_pointer CAChannelProvider::createChannel(
}
Channel::shared_pointer CAChannelProvider::createChannel(
std::string const & channelName,
ChannelRequester::shared_pointer const & channelRequester,
std::string const &channelName,
ChannelRequester::shared_pointer const &channelRequester,
short priority,
std::string const & address)
std::string const &address)
{
if (!address.empty())
throw std::invalid_argument("CAChannelProvider::createChannel does not support 'address' parameter");
@ -144,19 +125,16 @@ Channel::shared_pointer CAChannelProvider::createChannel(
return CAChannel::create(shared_from_this(), channelName, priority, channelRequester);
}
void CAChannelProvider::addChannel(const CAChannelPtr & channel)
void CAChannelProvider::addChannel(const CAChannelPtr &channel)
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelProvider::addChannel "
<< channel->getChannelName()
<< std::endl;
}
Lock lock(channelListMutex);
for(size_t i=0; i< caChannelList.size(); ++i) {
if(!(caChannelList[i].lock())) {
caChannelList[i] = channel;
return;
}
for (size_t i = 0; i < caChannelList.size(); ++i)
{
if (!(caChannelList[i].lock()))
{
caChannelList[i] = channel;
return;
}
}
caChannelList.push_back(channel);
}
@ -175,12 +153,15 @@ void CAChannelProvider::poll()
void CAChannelProvider::attachContext()
{
ca_client_context* thread_context = ca_current_context();
if (thread_context == current_context) return;
ca_client_context *thread_context = ca_current_context();
if (thread_context == current_context)
return;
int result = ca_attach_context(current_context);
if(result==ECA_ISATTACHED) return;
if (result != ECA_NORMAL) {
std::string mess("CAChannelProvider::attachContext error calling ca_attach_context ");
if (result == ECA_ISATTACHED)
return;
if (result != ECA_NORMAL)
{
std::string mess("CAChannelProvider::attachContext error calling ca_attach_context ");
mess += ca_message(result);
throw std::runtime_error(mess);
}
@ -188,9 +169,9 @@ void CAChannelProvider::attachContext()
void CAChannelProvider::initialize()
{
if(DEBUG_LEVEL>0) std::cout << "CAChannelProvider::initialize()\n";
int result = ca_context_create(ca_enable_preemptive_callback);
if (result != ECA_NORMAL) {
if (result != ECA_NORMAL)
{
std::string mess("CAChannelProvider::initialize error calling ca_context_create ");
mess += ca_message(result);
throw std::runtime_error(mess);
@ -200,21 +181,16 @@ void CAChannelProvider::initialize()
void CAClientFactory::start()
{
if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::start()\n";
if(ChannelProviderRegistry::clients()->getProvider("ca")) {
return;
}
if (ChannelProviderRegistry::clients()->getProvider("ca"))
return;
epicsSignalInstallSigAlarmIgnore();
epicsSignalInstallSigPipeIgnore();
if(!ChannelProviderRegistry::clients()->add<CAChannelProvider>("ca", true))
{
throw std::runtime_error("CAClientFactory::start failed");
}
if (!ChannelProviderRegistry::clients()->add<CAChannelProvider>("ca", true))
throw std::runtime_error("CAClientFactory::start failed");
}
void CAClientFactory::stop()
{
// unregister now done with exit hook
}
}}}

View File

@ -22,8 +22,15 @@ namespace epics {
namespace pvAccess {
namespace ca {
#define DEBUG_LEVEL 0
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { \
LOG(logLevelError, "Unhandled exception from client code at %s:%d: %s", \
__FILE__, __LINE__, e.what()); \
} \
catch (...) { \
LOG(logLevelError, "Unhandled exception from client code at %s:%d.", \
__FILE__, __LINE__); \
}
class ChannelConnectThread;
typedef std::tr1::shared_ptr<ChannelConnectThread> ChannelConnectThreadPtr;

View File

@ -39,13 +39,13 @@ ChannelConnectThread::ChannelConnectThread()
ChannelConnectThread::~ChannelConnectThread()
{
//std::cout << "ChannelConnectThread::~ChannelConnectThread()\n";
}
void ChannelConnectThread::start()
{
thread = std::tr1::shared_ptr<epicsThread>(new epicsThread(
if (thread) return;
thread = std::tr1::shared_ptr<epicsThread>(new epicsThread(
*this,
"channelConnectThread",
epicsThreadGetStackSize(epicsThreadStackSmall),

View File

@ -39,7 +39,6 @@ GetDoneThread::GetDoneThread()
GetDoneThread::~GetDoneThread()
{
//std::cout << "GetDoneThread::~GetDoneThread()\n";
}

View File

@ -39,7 +39,6 @@ MonitorEventThread::MonitorEventThread()
MonitorEventThread::~MonitorEventThread()
{
//std::cout << "MonitorEventThread::~MonitorEventThread()\n";
}
void MonitorEventThread::start()

View File

@ -39,7 +39,6 @@ PutDoneThread::PutDoneThread()
PutDoneThread::~PutDoneThread()
{
//std::cout << "PutDoneThread::~PutDoneThread()\n";
}

View File

@ -47,7 +47,7 @@ public:
static void start();
/** @brief stop provider ca
*
* This does nothing since epicsAtExit is used to destroy the instance.
* This does nothing.
*/
static void stop();
};