termination and error handling changes.

epicsAtExit is no longer used to call ca_context_destroy.
It is now called from the destructor for CAChannelProvider.

Several changes were made for handling errors.
This commit is contained in:
mrkraimer
2018-02-06 10:20:29 -05:00
parent 3bb2f25bed
commit fcaeb414a0
4 changed files with 90 additions and 87 deletions

View File

@@ -19,6 +19,7 @@
using namespace epics::pvData;
using std::string;
using std::cout;
using std::cerr;
using std::endl;
namespace epics {
@@ -107,8 +108,9 @@ static void ca_get_labels_handler(struct event_handler_args args)
}
else
{
// TODO better error handling
std::cerr << "failed to get labels for enum : " << ca_message(args.status) << std::endl;
string mess("ca_get_labels_handler ");
mess += ca_message(args.status);
throw std::runtime_error(mess);
}
}
@@ -174,21 +176,20 @@ static PVStructure::shared_pointer createPVStructure(CAChannel::shared_pointer c
PVScalarArrayPtr pvScalarArray = pvStructure->getSubField<PVStringArray>("value.choices");
// TODO avoid getting labels if DBR_GR_ENUM or DBR_CTRL_ENUM is used in subsequent get
int result = ca_array_get_callback(DBR_GR_ENUM, 1, channel->getChannelID(), ca_get_labels_handler, pvScalarArray.get());
int result = ca_array_get_callback(
DBR_GR_ENUM, 1, channel->getChannelID(), ca_get_labels_handler, pvScalarArray.get());
if (result == ECA_NORMAL)
{
ca_flush_io();
result = ca_flush_io();
// NOTE: we do not wait here, since all subsequent request (over TCP) is serialized
// and will guarantee that ca_get_labels_handler is called first
}
else
{
// TODO better error handling
std::cerr << "failed to get labels for enum " << channel->getChannelName() << ": " << ca_message(result) << std::endl;
if (result != ECA_NORMAL){
string mess(channel->getChannelName() + "PVStructure::shared_pointer createPVStructure ");
mess += "failed to get labels for enum ";
throw std::runtime_error(mess);
}
}
return pvStructure;
}
@@ -389,20 +390,21 @@ void CAChannel::activate(short priority)
}
ChannelRequester::shared_pointer req(channelRequester.lock());
if(!req) return;
attachContext();
int result = ca_create_channel(channelName.c_str(),
ca_connection_handler,
this,
priority, // TODO mapping
&channelID);
ca_connection_handler,
this,
priority, // TODO mapping
&channelID);
if (result == ECA_NORMAL)
{
channelCreated = true;
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) provider->addChannel(shared_from_this());
req->channelCreated(Status::Ok, shared_from_this());
EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this()));
} else {
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
req->channelCreated(errorStatus, shared_from_this());
EXCEPTION_GUARD(req->channelCreated(errorStatus, shared_from_this()));
}
}
@@ -428,8 +430,12 @@ void CAChannel::disconnectChannel()
channelCreated = false;
}
/* Clear CA Channel */
threadAttach();
ca_clear_channel(channelID);
attachContext();
int result = ca_clear_channel(channelID);
if (result == ECA_NORMAL) return;
string mess("CAChannel::disconnectChannel() ");
mess += ca_message(result);
cerr << mess << endl;
}
@@ -689,15 +695,14 @@ void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)
/* ---------------------------------------------------------- */
void CAChannel::threadAttach()
void CAChannel::attachContext()
{
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) {
std::tr1::static_pointer_cast<CAChannelProvider>(provider)->threadAttach();
std::tr1::static_pointer_cast<CAChannelProvider>(provider)->attachContext();
}
}
static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype nativeType)
{
// get "field" sub-structure
@@ -1201,16 +1206,15 @@ void CAChannelGet::getDone(struct event_handler_args &args)
copyFunc(args.dbr, args.count, pvStructure);
else
{
// TODO remove
std::cout << "no copy func implemented" << std::endl;
throw std::runtime_error("CAChannelGet::getDone no copy func implemented");
}
EXCEPTION_GUARD(getRequester->getDone(Status::Ok, shared_from_this(), pvStructure, bitSet));
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
EXCEPTION_GUARD(getRequester->getDone(errorStatus, shared_from_this(), PVStructure::shared_pointer(), BitSet::shared_pointer()));
EXCEPTION_GUARD(getRequester->getDone(errorStatus, shared_from_this(),
PVStructure::shared_pointer(), BitSet::shared_pointer()));
}
}
@@ -1222,7 +1226,7 @@ void CAChannelGet::get()
}
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
channel->threadAttach();
channel->attachContext();
/*
From R3.14.12 onwards ca_array_get_callback() replies will give a CA client application the current number
@@ -1237,14 +1241,12 @@ void CAChannelGet::get()
channel->getChannelID(), ca_get_handler, this);
if (result == ECA_NORMAL)
{
ca_flush_io();
result = ca_flush_io();
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(getRequester->getDone(errorStatus, shared_from_this(), PVStructure::shared_pointer(), BitSet::shared_pointer()));
}
if (result == ECA_NORMAL) return;
string mess("CAChannelGet::get ");
mess += ca_message(result);
throw std::runtime_error(mess);
}
@@ -1594,20 +1596,21 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
channel->threadAttach();
doPut putFunc = doPutFuncTable[channel->getNativeType()];
if (putFunc)
{
// TODO now we always put all
if(block) {
channel->attachContext();
int result = putFunc(channel, this, pvPutStructure);
if (result != ECA_NORMAL)
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
string message(ca_message(result));
Status errorStatus(Status::STATUSTYPE_ERROR, message);
EXCEPTION_GUARD(putRequester->putDone(errorStatus, shared_from_this()));
}
} else {
channel->attachContext();
int result = putFunc(channel,NULL, pvPutStructure);
if (result == ECA_NORMAL)
{
@@ -1615,7 +1618,8 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
string message(ca_message(result));
Status errorStatus(Status::STATUSTYPE_ERROR,message);
EXCEPTION_GUARD(putRequester->putDone(errorStatus, shared_from_this()));
}
}
@@ -1666,20 +1670,20 @@ void CAChannelPut::get()
}
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
channel->threadAttach();
channel->attachContext();
int result = ca_array_get_callback(getType, channel->getElementCount(),
channel->getChannelID(), ca_put_get_handler, this);
channel->getChannelID(), ca_put_get_handler, this);
if (result == ECA_NORMAL)
{
ca_flush_io();
result = ca_flush_io();
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(putRequester->getDone(errorStatus, shared_from_this(),
if (result == ECA_NORMAL) return;
string message(ca_message(result));
Status errorStatus(Status::STATUSTYPE_ERROR, message);
EXCEPTION_GUARD(putRequester->getDone(errorStatus, shared_from_this(),
PVStructure::shared_pointer(), BitSet::shared_pointer()));
}
}
@@ -1801,8 +1805,12 @@ CAChannelMonitor::~CAChannelMonitor()
std::cout << "CAChannelMonitor::~CAChannelMonitor() " << channel->getChannelName() << endl;
}
if(!isStarted) return;
channel->threadAttach();
ca_clear_subscription(eventID);
channel->attachContext();
int result = ca_clear_subscription(eventID);
if (result == ECA_NORMAL) return;
string mess("CAChannelMonitor::~CAChannelMonitor() ");
mess += ca_message(result);
cerr << mess << endl;
}
size_t CAChannelMonitor::num_instances;
@@ -1928,8 +1936,9 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
}
else
{
//Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
//EXCEPTION_GUARD(requester->MonitorDone(errorStatus));
string mess("CAChannelMonitor::subscriptionEvent ");
mess += ca_message(args.status);
throw std::runtime_error(mess);
}
}
@@ -1943,7 +1952,7 @@ epics::pvData::Status CAChannelMonitor::start()
status = Status(Status::STATUSTYPE_WARNING,"already started");
return status;
}
channel->threadAttach();
channel->attachContext();
/*
From R3.14.12 onwards when using the IOC server and the C++ client libraries monitor callbacks
@@ -1966,12 +1975,12 @@ epics::pvData::Status CAChannelMonitor::start()
{
isStarted = true;
monitorQueue->start();
ca_flush_io();
return status;
} else {
isStarted = false;
return Status(Status::STATUSTYPE_ERROR, string(ca_message(result)));
result = ca_flush_io();
}
if (result == ECA_NORMAL) return status;
isStarted = false;
string message(ca_message(result));
return Status(Status::STATUSTYPE_ERROR,message);
}
epics::pvData::Status CAChannelMonitor::stop()
@@ -1981,20 +1990,18 @@ epics::pvData::Status CAChannelMonitor::stop()
}
Status status = Status::Ok;
if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped");
channel->threadAttach();
channel->attachContext();
int result = ca_clear_subscription(eventID);
if (result == ECA_NORMAL)
{
isStarted = false;
monitorQueue->stop();
return Status::Ok;
}
else
{
return Status(Status::STATUSTYPE_ERROR, string(ca_message(result)));
result = ca_flush_io();
}
if (result == ECA_NORMAL) return status;
string mess("CAChannelMonitor::stop() ");
mess += ca_message(result);
return Status(Status::STATUSTYPE_ERROR,mess);
}

View File

@@ -106,7 +106,7 @@ public:
/* ---------------------------------------------------------------- */
void threadAttach();
void attachContext();
void addChannelGet(const CAChannelGetPtr & get);
void addChannelPut(const CAChannelPutPtr & get);

View File

@@ -64,6 +64,7 @@ CAChannelProvider::~CAChannelProvider()
CAChannelPtr caChannel(caChannelList[i].lock());
if(caChannel) channelQ.push(caChannel);
}
caChannelList.clear();
}
while(!channelQ.empty()) {
if(DEBUG_LEVEL>0) {
@@ -74,6 +75,9 @@ CAChannelProvider::~CAChannelProvider()
channelQ.front()->disconnectChannel();
channelQ.pop();
}
attachContext();
ca_flush_io();
ca_context_destroy();
}
std::string CAChannelProvider::getProviderName()
@@ -115,10 +119,10 @@ Channel::shared_pointer CAChannelProvider::createChannel(
ChannelRequester::shared_pointer const & channelRequester,
short priority)
{
threadAttach();
static std::string emptyString;
return createChannel(channelName, channelRequester, priority, emptyString);
Channel::shared_pointer channel(
createChannel(channelName, channelRequester, priority, emptyString));
return channel;
}
Channel::shared_pointer CAChannelProvider::createChannel(
@@ -163,9 +167,15 @@ void CAChannelProvider::poll()
}
void CAChannelProvider::threadAttach()
void CAChannelProvider::attachContext()
{
ca_attach_context(current_context);
if(ca_current_context()) return;
int result = ca_attach_context(current_context);
if (result != ECA_NORMAL) {
std::cout <<
"CA error %s occurred while calling ca_attach_context:"
<< ca_message(result) << std::endl;
}
}
void CAChannelProvider::initialize()
@@ -181,19 +191,6 @@ void CAChannelProvider::initialize()
current_context = ca_current_context();
}
static
void ca_factory_cleanup(void*)
{
if(DEBUG_LEVEL>0) std::cout << "ca_factory_cleanup\n";
try {
ChannelProviderRegistry::clients()->remove("ca");
ca_context_destroy();
} catch(std::exception& e) {
LOG(logLevelWarn, "Error on unregistering \"ca\" factory: %s", e.what());
}
}
void CAClientFactory::start()
{
if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::start()\n";
@@ -209,9 +206,9 @@ void CAClientFactory::start()
registerRefCounter("CAChannelPut", &CAChannelPut::num_instances);
registerRefCounter("CAChannelMonitor", &CAChannelMonitor::num_instances);
if(ChannelProviderRegistry::clients()->add<CAChannelProvider>("ca", true))
if(!ChannelProviderRegistry::clients()->add<CAChannelProvider>("ca", true))
{
epicsAtExit(&ca_factory_cleanup, NULL);
throw std::runtime_error("CAClientFactory::start failed");
}
}

View File

@@ -68,12 +68,11 @@ public:
virtual void destroy() EPICS_DEPRECATED {};
void addChannel(const CAChannelPtr & get);
void addChannel(const CAChannelPtr & channel);
/* ---------------------------------------------------------------- */
void threadAttach();
void attachContext();
private:
void initialize();