Merge pull request #67 from mrkraimer/master

caProvider: use weak_pointer where required
This commit is contained in:
Marty Kraimer
2017-10-18 06:14:52 -04:00
committed by GitHub
5 changed files with 171 additions and 215 deletions

View File

@@ -18,6 +18,8 @@
using namespace epics::pvData;
using std::string;
using std::cout;
using std::endl;
namespace epics {
namespace pvAccess {
@@ -27,18 +29,14 @@ namespace ca {
catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
#define PVACCESS_REFCOUNT_MONITOR_DEFINE(name)
#define PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(name)
#define PVACCESS_REFCOUNT_MONITOR_DESTRUCT(name)
PVACCESS_REFCOUNT_MONITOR_DEFINE(caChannel);
CAChannel::shared_pointer CAChannel::create(CAChannelProvider::shared_pointer const & channelProvider,
std::string const & channelName,
short priority,
ChannelRequester::shared_pointer const & channelRequester)
{
CAChannel::shared_pointer thisPtr(new CAChannel(channelName, channelProvider, channelRequester));
CAChannelPtr thisPtr(
new CAChannel(channelName, channelProvider, channelRequester));
thisPtr->activate(priority);
return thisPtr;
}
@@ -249,7 +247,8 @@ void CAChannel::connected()
"value,timeStamp,alarm,display" :
"value,timeStamp,alarm,display,valueAlarm,control" :
"value,timeStamp,alarm";
Structure::const_shared_pointer structure = createStructure(shared_from_this(), allProperties);
Structure::const_shared_pointer structure = createStructure(
shared_from_this(), allProperties);
// TODO we need only Structure here
this->structure = structure;
@@ -302,7 +301,11 @@ void CAChannel::connected()
monitorQ.front()->channelCreated(Status::Ok,shared_from_this());
monitorQ.pop();
}
EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::CONNECTED));
ChannelRequester::shared_pointer req(channelRequester.lock());
if(req) {
EXCEPTION_GUARD(req->channelStateChange(
shared_from_this(), Channel::CONNECTED));
}
}
void CAChannel::disconnected()
@@ -343,7 +346,11 @@ void CAChannel::disconnected()
monitorQ.front()->channelDisconnect(false);
monitorQ.pop();
}
EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::DISCONNECTED));
ChannelRequester::shared_pointer req(channelRequester.lock());
if(req) {
EXCEPTION_GUARD(req->channelStateChange(
shared_from_this(), Channel::DISCONNECTED));
}
}
size_t CAChannel::num_instances;
@@ -356,15 +363,17 @@ CAChannel::CAChannel(std::string const & _channelName,
channelRequester(_channelRequester),
channelID(0),
channelType(0),
elementCount(0),
destroyed(false)
elementCount(0)
{
REFTRACE_INCREMENT(num_instances);
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(caChannel);
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::CAChannel " << channelName << endl;
}
}
void CAChannel::activate(short priority)
{
ChannelRequester::shared_pointer req(channelRequester.lock());
if(!req) return;
int result = ca_create_channel(channelName.c_str(),
ca_connection_handler,
this,
@@ -372,15 +381,10 @@ void CAChannel::activate(short priority)
&channelID);
if (result == ECA_NORMAL)
{
channelProvider->registerChannel(shared_from_this());
// TODO be sure that ca_connection_handler is not called before this call
channelRequester->channelCreated(Status::Ok, shared_from_this());
}
else
{
req->channelCreated(Status::Ok, shared_from_this());
} else {
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
channelRequester->channelCreated(errorStatus, shared_from_this());
req->channelCreated(errorStatus, shared_from_this());
}
}
@@ -423,20 +427,12 @@ void CAChannel::addChannelMonitor(const CAChannelMonitorPtr & monitor)
CAChannel::~CAChannel()
{
PVACCESS_REFCOUNT_MONITOR_DESTRUCT(caChannel);
{
Lock lock(requestsMutex);
if (destroyed)
return;
destroyed = true;
if(DEBUG_LEVEL>0) {
cout << "CAChannel::~CAChannel() " << channelName << endl;
}
channelProvider->unregisterChannel(this);
/* Clear CA Channel */
threadAttach();
ca_clear_channel(channelID);
REFTRACE_DECREMENT(num_instances);
}
@@ -465,7 +461,7 @@ Structure::const_shared_pointer CAChannel::getStructure()
std::tr1::shared_ptr<ChannelProvider> CAChannel::getProvider()
{
return channelProvider;
return channelProvider.lock();
}
@@ -497,7 +493,7 @@ std::string CAChannel::getChannelName()
std::tr1::shared_ptr<ChannelRequester> CAChannel::getChannelRequester()
{
return channelRequester;
return channelRequester.lock();
}
void CAChannel::getField(GetFieldRequester::shared_pointer const & requester,
@@ -587,29 +583,14 @@ void CAChannel::printInfo(std::ostream& out)
}
/* --------------- Destroyable --------------- */
void CAChannel::destroy()
{
Lock lock(requestsMutex);
{
if (destroyed) return;
destroyed = true;
}
channelProvider->unregisterChannel(shared_from_this());
/* Clear CA Channel */
threadAttach();
ca_clear_channel(channelID);
}
/* ---------------------------------------------------------- */
void CAChannel::threadAttach()
{
std::tr1::static_pointer_cast<CAChannelProvider>(channelProvider)->threadAttach();
CAChannelProviderPtr provider(channelProvider.lock());
if(provider) {
std::tr1::static_pointer_cast<CAChannelProvider>(provider)->threadAttach();
}
}
@@ -661,31 +642,38 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
:
channel(channel),
channelGetRequester(channelGetRequester),
pvRequest(pvRequest),
lastRequestFlag(false)
pvRequest(pvRequest)
{
REFTRACE_INCREMENT(num_instances);
if(DEBUG_LEVEL>0) {
cout << "CAChannelGet::CAChannelGet() " << channel->getChannelName() << endl;
}
}
CAChannelGet::~CAChannelGet()
{
REFTRACE_DECREMENT(num_instances);
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelGet::~CAChannelGet() " << channel->getChannelName() << endl;
}
}
void CAChannelGet::activate()
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
if(pvStructure) throw std::runtime_error("CAChannelGet::activate() was called twice");
getType = getDBRType(pvRequest, channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
bitSet->set(0);
channel->addChannelGet(shared_from_this());
EXCEPTION_GUARD(channelGetRequester->channelGetConnect(Status::Ok, shared_from_this(),
EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelGet::channelCreated(const Status& status,Channel::shared_pointer const & cl)
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
chtype newType = getDBRType(pvRequest, channel->getNativeType());
if(newType!=getType) {
getType = getDBRType(pvRequest, channel->getNativeType());
@@ -693,7 +681,7 @@ void CAChannelGet::channelCreated(const Status& status,Channel::shared_pointer c
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
bitSet->set(0);
}
EXCEPTION_GUARD(channelGetRequester->channelGetConnect(Status::Ok, shared_from_this(),
EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
@@ -701,14 +689,18 @@ void CAChannelGet::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
EXCEPTION_GUARD(channelGetRequester->channelDisconnect(connectionState==Channel::DESTROYED);)
EXCEPTION_GUARD(getRequester->channelDisconnect(connectionState==Channel::DESTROYED);)
}
}
void CAChannelGet::channelDisconnect(bool destroy)
{
EXCEPTION_GUARD(channelGetRequester->channelDisconnect(destroy);)
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
EXCEPTION_GUARD(getRequester->channelDisconnect(destroy);)
}
/* --------------- epics::pvAccess::ChannelGet --------------- */
@@ -1075,6 +1067,8 @@ static copyDBRtoPVStructure copyFuncTable[] =
void CAChannelGet::getDone(struct event_handler_args &args)
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
if (args.status == ECA_NORMAL)
{
copyDBRtoPVStructure copyFunc = copyFuncTable[getType];
@@ -1086,18 +1080,21 @@ void CAChannelGet::getDone(struct event_handler_args &args)
std::cout << "no copy func implemented" << std::endl;
}
EXCEPTION_GUARD(channelGetRequester->getDone(Status::Ok, shared_from_this(), pvStructure, bitSet));
EXCEPTION_GUARD(getRequester->getDone(Status::Ok, shared_from_this(), pvStructure, bitSet));
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
EXCEPTION_GUARD(channelGetRequester->getDone(errorStatus, shared_from_this(), PVStructure::shared_pointer(), BitSet::shared_pointer()));
EXCEPTION_GUARD(getRequester->getDone(errorStatus, shared_from_this(), PVStructure::shared_pointer(), BitSet::shared_pointer()));
}
}
void CAChannelGet::get()
{
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
channel->threadAttach();
/*
@@ -1118,11 +1115,9 @@ void CAChannelGet::get()
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(channelGetRequester->getDone(errorStatus, shared_from_this(), PVStructure::shared_pointer(), BitSet::shared_pointer()));
EXCEPTION_GUARD(getRequester->getDone(errorStatus, shared_from_this(), PVStructure::shared_pointer(), BitSet::shared_pointer()));
}
if (lastRequestFlag)
destroy();
}
@@ -1140,17 +1135,12 @@ void CAChannelGet::cancel()
void CAChannelGet::lastRequest()
{
// TODO sync !!!
lastRequestFlag = true;
std::cout << "CAChannelGet::lastRequest() "
<< channel->getChannelName()
<< " does not do anything"
<< endl;
}
/* --------------- Destroyable --------------- */
void CAChannelGet::destroy()
{
// TODO
}
CAChannelPutPtr CAChannelPut::create(
@@ -1164,7 +1154,9 @@ CAChannelPutPtr CAChannelPut::create(
CAChannelPut::~CAChannelPut()
{
REFTRACE_DECREMENT(num_instances);
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelPut::~CAChannelPut() " << channel->getChannelName() << endl;
}
}
size_t CAChannelPut::num_instances;
@@ -1176,14 +1168,17 @@ CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
channel(channel),
channelPutRequester(channelPutRequester),
pvRequest(pvRequest),
block(false),
lastRequestFlag(false)
block(false)
{
REFTRACE_INCREMENT(num_instances);
if(DEBUG_LEVEL>0) {
cout << "CAChannelPut::CAChannePut() " << channel->getChannelName() << endl;
}
}
void CAChannelPut::activate()
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if(pvStructure) throw std::runtime_error("CAChannelPut::activate() was called twice");
getType = getDBRType(pvRequest,channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
@@ -1195,13 +1190,15 @@ void CAChannelPut::activate()
}
bitSet->set(pvStructure->getSubFieldT("value")->getFieldOffset());
channel->addChannelPut(shared_from_this());
EXCEPTION_GUARD(channelPutRequester->channelPutConnect(Status::Ok, shared_from_this(),
EXCEPTION_GUARD(putRequester->channelPutConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelPut::channelCreated(const Status& status,Channel::shared_pointer const & c)
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
chtype newType = getDBRType(pvRequest, channel->getNativeType());
if(newType!=getType) {
getType = getDBRType(pvRequest, channel->getNativeType());
@@ -1214,7 +1211,7 @@ void CAChannelPut::channelCreated(const Status& status,Channel::shared_pointer c
}
bitSet->set(0);
}
EXCEPTION_GUARD(channelPutRequester->channelPutConnect(Status::Ok, shared_from_this(),
EXCEPTION_GUARD(putRequester->channelPutConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
@@ -1222,14 +1219,18 @@ void CAChannelPut::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
EXCEPTION_GUARD(channelPutRequester->channelDisconnect(connectionState==Channel::DESTROYED);)
EXCEPTION_GUARD(putRequester->channelDisconnect(connectionState==Channel::DESTROYED);)
}
}
void CAChannelPut::channelDisconnect(bool destroy)
{
EXCEPTION_GUARD(channelPutRequester->channelDisconnect(destroy);)
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
EXCEPTION_GUARD(putRequester->channelDisconnect(destroy);)
}
/* --------------- epics::pvAccess::ChannelPut --------------- */
@@ -1433,20 +1434,24 @@ static doPut doPutFuncTable[] =
void CAChannelPut::putDone(struct event_handler_args &args)
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if (args.status == ECA_NORMAL)
{
EXCEPTION_GUARD(channelPutRequester->putDone(Status::Ok, shared_from_this()));
EXCEPTION_GUARD(putRequester->putDone(Status::Ok, shared_from_this()));
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this()));
EXCEPTION_GUARD(putRequester->putDone(errorStatus, shared_from_this()));
}
}
void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
BitSet::shared_pointer const & /*putBitSet*/)
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
channel->threadAttach();
doPut putFunc = doPutFuncTable[channel->getNativeType()];
if (putFunc)
@@ -1458,18 +1463,18 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
if (result != ECA_NORMAL)
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this()));
EXCEPTION_GUARD(putRequester->putDone(errorStatus, shared_from_this()));
}
} else {
int result = putFunc(channel,NULL, pvPutStructure);
if (result == ECA_NORMAL)
{
EXCEPTION_GUARD(channelPutRequester->putDone(Status::Ok, shared_from_this()));
EXCEPTION_GUARD(putRequester->putDone(Status::Ok, shared_from_this()));
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(channelPutRequester->putDone(errorStatus, shared_from_this()));
EXCEPTION_GUARD(putRequester->putDone(errorStatus, shared_from_this()));
}
}
}
@@ -1479,14 +1484,13 @@ void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
std::cout << "no put func implemented" << std::endl;
}
// TODO here???!!!
if (lastRequestFlag)
destroy();
}
void CAChannelPut::getDone(struct event_handler_args &args)
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
if (args.status == ECA_NORMAL)
{
copyDBRtoPVStructure copyFunc = copyFuncTable[getType];
@@ -1498,23 +1502,22 @@ void CAChannelPut::getDone(struct event_handler_args &args)
std::cout << "no copy func implemented" << std::endl;
}
EXCEPTION_GUARD(channelPutRequester->getDone(Status::Ok, shared_from_this(), pvStructure, bitSet));
EXCEPTION_GUARD(putRequester->getDone(Status::Ok, shared_from_this(), pvStructure, bitSet));
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
EXCEPTION_GUARD(channelPutRequester->getDone(errorStatus, shared_from_this(),
EXCEPTION_GUARD(putRequester->getDone(errorStatus, shared_from_this(),
PVStructure::shared_pointer(), BitSet::shared_pointer()));
}
// TODO here???!!!
if (lastRequestFlag)
destroy();
}
void CAChannelPut::get()
{
ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
if(!putRequester) return;
channel->threadAttach();
int result = ca_array_get_callback(getType, channel->getElementCount(),
@@ -1526,7 +1529,7 @@ void CAChannelPut::get()
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
EXCEPTION_GUARD(channelPutRequester->getDone(errorStatus, shared_from_this(),
EXCEPTION_GUARD(putRequester->getDone(errorStatus, shared_from_this(),
PVStructure::shared_pointer(), BitSet::shared_pointer()));
}
}
@@ -1547,17 +1550,12 @@ void CAChannelPut::cancel()
void CAChannelPut::lastRequest()
{
// TODO sync !!!
lastRequestFlag = true;
std::cout << "CAChannelPut::lastRequest() "
<< channel->getChannelName()
<< " does not do anything"
<< endl;
}
/* --------------- Destroyable --------------- */
void CAChannelPut::destroy()
{
// TODO
}
/* --------------- Monitor --------------- */
@@ -1656,10 +1654,12 @@ CAChannelMonitorPtr CAChannelMonitor::create(
CAChannelMonitor::~CAChannelMonitor()
{
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::~CAChannelMonitor() " << channel->getChannelName() << endl;
}
if(!isStarted) return;
channel->threadAttach();
ca_clear_subscription(eventID);
REFTRACE_DECREMENT(num_instances);
}
size_t CAChannelMonitor::num_instances;
@@ -1674,11 +1674,15 @@ CAChannelMonitor::CAChannelMonitor(
pvRequest(pvRequest),
isStarted(false)
{
REFTRACE_INCREMENT(num_instances);
if(DEBUG_LEVEL>0) {
cout << "CAChannelMonitor::CAChannelMonitor() " << channel->getChannelName() << endl;
}
}
void CAChannelMonitor::activate()
{
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
if(pvStructure) throw std::runtime_error("CAChannelMonitor::activate() was called twice");
getType = getDBRType(pvRequest, channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
@@ -1696,12 +1700,14 @@ void CAChannelMonitor::activate()
}
monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
channel->addChannelMonitor(shared_from_this());
EXCEPTION_GUARD(monitorRequester->monitorConnect(Status::Ok, shared_from_this(),
EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelMonitor::channelCreated(const Status& status,Channel::shared_pointer const & c)
{
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
chtype newType = getDBRType(pvRequest, channel->getNativeType());
if(newType!=getType) {
getType = getDBRType(pvRequest, channel->getNativeType());
@@ -1720,7 +1726,7 @@ void CAChannelMonitor::channelCreated(const Status& status,Channel::shared_point
}
monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
}
EXCEPTION_GUARD(monitorRequester->monitorConnect(Status::Ok, shared_from_this(),
EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
@@ -1728,19 +1734,25 @@ void CAChannelMonitor::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
EXCEPTION_GUARD(monitorRequester->channelDisconnect(connectionState==Channel::DESTROYED);)
EXCEPTION_GUARD(requester->channelDisconnect(connectionState==Channel::DESTROYED);)
}
}
void CAChannelMonitor::channelDisconnect(bool destroy)
{
EXCEPTION_GUARD(monitorRequester->channelDisconnect(destroy);)
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
EXCEPTION_GUARD(requester->channelDisconnect(destroy);)
}
void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
{
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
if (args.status == ECA_NORMAL)
{
copyDBRtoPVStructure copyFunc = copyFuncTable[getType];
@@ -1748,7 +1760,7 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
copyFunc(args.dbr, args.count, pvStructure);
monitorQueue->event(pvStructure);
// call monitorRequester even if queue is full
monitorRequester->monitorEvent(shared_from_this());
requester->monitorEvent(shared_from_this());
} else {
std::cout << "no copy func implemented" << std::endl;
@@ -1757,7 +1769,7 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
else
{
//Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
//EXCEPTION_GUARD(channelMonitorRequester->MonitorDone(errorStatus));
//EXCEPTION_GUARD(requester->MonitorDone(errorStatus));
}
}
@@ -1801,6 +1813,7 @@ epics::pvData::Status CAChannelMonitor::start()
epics::pvData::Status CAChannelMonitor::stop()
{
Status status = Status::Ok;
if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped");
channel->threadAttach();
@@ -1837,15 +1850,5 @@ void CAChannelMonitor::cancel()
// noop
}
/* --------------- Destroyable --------------- */
void CAChannelMonitor::destroy()
{
if(!isStarted) return;
channel->threadAttach();
ca_clear_subscription(eventID);
isStarted = false;
}
}}}

View File

@@ -24,7 +24,6 @@ namespace ca {
class CAChannel;
typedef std::tr1::shared_ptr<CAChannel> CAChannelPtr;
typedef std::tr1::weak_ptr<CAChannel> CAChannelWPtr;
class CAChannelPut;
typedef std::tr1::shared_ptr<CAChannelPut> CAChannelPutPtr;
typedef std::tr1::weak_ptr<CAChannelPut> CAChannelPutWPtr;
@@ -91,7 +90,7 @@ public:
/* --------------- Destroyable --------------- */
virtual void destroy();
virtual void destroy() EPICS_DEPRECATED {};
/* ---------------------------------------------------------------- */
@@ -111,8 +110,8 @@ private:
std::string channelName;
CAChannelProvider::shared_pointer channelProvider;
ChannelRequester::shared_pointer channelRequester;
CAChannelProviderWPtr channelProvider;
ChannelRequester::weak_pointer channelRequester;
chid channelID;
chtype channelType;
@@ -121,7 +120,6 @@ private:
epics::pvData::Mutex requestsMutex;
bool destroyed;
std::queue<CAChannelPutPtr> putQueue;
std::queue<CAChannelGetPtr> getQueue;
std::queue<CAChannelMonitorPtr> monitorQueue;
@@ -170,7 +168,7 @@ public:
/* --------------- ChannelBaseRequester --------------- */
virtual void channelDisconnect(bool destroy);
/* --------------- Destroyable --------------- */
virtual void destroy();
virtual void destroy() EPICS_DEPRECATED {};
void activate();
@@ -180,10 +178,9 @@ private:
ChannelGetRequester::shared_pointer const & _channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
CAChannel::shared_pointer channel;
ChannelGetRequester::shared_pointer channelGetRequester;
CAChannelPtr channel;
ChannelGetRequester::weak_pointer channelGetRequester;
epics::pvData::PVStructure::shared_pointer pvRequest;
bool lastRequestFlag;
chtype getType;
epics::pvData::PVStructure::shared_pointer pvStructure;
@@ -239,7 +236,7 @@ public:
virtual void channelDisconnect(bool destroy);
/* --------------- Destroyable --------------- */
virtual void destroy();
virtual void destroy() EPICS_DEPRECATED {};
void activate();
@@ -249,11 +246,10 @@ private:
ChannelPutRequester::shared_pointer const & _channelPutRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest);
CAChannel::shared_pointer channel;
ChannelPutRequester::shared_pointer channelPutRequester;
CAChannelPtr channel;
ChannelPutRequester::weak_pointer channelPutRequester;
epics::pvData::PVStructure::shared_pointer pvRequest;
bool block;
bool lastRequestFlag;
chtype getType;
epics::pvData::PVStructure::shared_pointer pvStructure;
@@ -303,7 +299,7 @@ public:
/* --------------- ChannelBaseRequester --------------- */
virtual void channelDisconnect(bool destroy);
/* --------------- Destroyable --------------- */
virtual void destroy();
virtual void destroy() EPICS_DEPRECATED {};
void activate();
private:
@@ -313,7 +309,7 @@ private:
CAChannelPtr channel;
MonitorRequester::shared_pointer monitorRequester;
MonitorRequester::weak_pointer monitorRequester;
epics::pvData::PVStructure::shared_pointer pvRequest;
bool isStarted;
chtype getType;

View File

@@ -33,17 +33,18 @@ using namespace epics::pvData;
size_t CAChannelProvider::num_instances;
CAChannelProvider::CAChannelProvider() : current_context(0), destroyed(false)
CAChannelProvider::CAChannelProvider()
: current_context(0)
{
REFTRACE_INCREMENT(num_instances);
initialize();
}
CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr<Configuration>&)
: current_context(0)
, destroyed(false)
: current_context(0)
{
REFTRACE_INCREMENT(num_instances);
if(DEBUG_LEVEL>0) {
std::cout<< "CAChannelProvider::CAChannelProvider\n";
}
// Ignoring Configuration as CA only allows config via. environment,
// and we don't want to change this here.
initialize();
@@ -51,9 +52,7 @@ CAChannelProvider::CAChannelProvider(const std::tr1::shared_ptr<Configuration>&)
CAChannelProvider::~CAChannelProvider()
{
// call destroy() to destroy CA context
destroy();
REFTRACE_DECREMENT(num_instances);
if(DEBUG_LEVEL>0) std::cout << "CAChannelProvider::~CAChannelProvider()\n";
}
std::string CAChannelProvider::getProviderName()
@@ -125,53 +124,15 @@ void CAChannelProvider::poll()
{
}
void CAChannelProvider::destroy()
{
Lock lock(channelsMutex);
{
if (destroyed)
return;
destroyed = true;
while (!channels.empty())
{
Channel::shared_pointer channel = channels.begin()->second.lock();
if (channel)
channel->destroy();
else
channels.erase(channels.begin());
}
}
/* Destroy CA Context */
ca_context_destroy();
}
void CAChannelProvider::threadAttach()
{
ca_attach_context(current_context);
}
void CAChannelProvider::registerChannel(Channel::shared_pointer const & channel)
{
Lock lock(channelsMutex);
channels[channel.get()] = Channel::weak_pointer(channel);
}
void CAChannelProvider::unregisterChannel(Channel::shared_pointer const & channel)
{
Lock lock(channelsMutex);
channels.erase(channel.get());
}
void CAChannelProvider::unregisterChannel(Channel* pchannel)
{
Lock lock(channelsMutex);
channels.erase(pchannel);
}
void CAChannelProvider::initialize()
{
if(DEBUG_LEVEL>0) std::cout << "CAChannelProvider::initialize()\n";
/* Create Channel Access */
int result = ca_context_create(ca_enable_preemptive_callback);
if (result != ECA_NORMAL) {
@@ -188,6 +149,7 @@ void CAChannelProvider::initialize()
static
void ca_factory_cleanup(void*)
{
if(DEBUG_LEVEL>0) std::cout << "ca_factory_cleanup\n";
try {
ChannelProviderRegistry::clients()->remove("ca");
ca_context_destroy();
@@ -198,6 +160,11 @@ void ca_factory_cleanup(void*)
void CAClientFactory::start()
{
if(DEBUG_LEVEL>0) std::cout << "CAClientFactory::start()\n";
if(ChannelProviderRegistry::clients()->getProvider("ca")) {
// do not start twice
return;
}
epicsSignalInstallSigAlarmIgnore();
epicsSignalInstallSigPipeIgnore();
registerRefCounter("CAChannelProvider", &CAChannelProvider::num_instances);
@@ -207,7 +174,9 @@ void CAClientFactory::start()
registerRefCounter("CAChannelMonitor", &CAChannelMonitor::num_instances);
if(ChannelProviderRegistry::clients()->add<CAChannelProvider>("ca", false))
{
epicsAtExit(&ca_factory_cleanup, NULL);
}
}
void CAClientFactory::stop()
@@ -215,17 +184,6 @@ void CAClientFactory::stop()
// unregister now done with exit hook
}
// perhaps useful during dynamic loading?
extern "C" {
void registerClientProvider_ca()
{
try {
CAClientFactory::start();
} catch(std::exception& e){
std::cerr<<"Error loading ca: "<<e.what()<<"\n";
}
}
} // extern "C"
}
}

View File

@@ -11,14 +11,18 @@
#include <pv/caProvider.h>
#include <pv/pvAccess.h>
#include <map>
namespace epics {
namespace pvAccess {
class Configuration;
namespace ca {
#define DEBUG_LEVEL 0
class CAChannelProvider;
typedef std::tr1::shared_ptr<CAChannelProvider> CAChannelProviderPtr;
typedef std::tr1::weak_ptr<CAChannelProvider> CAChannelProviderWPtr;
class CAChannelProvider :
public ChannelProvider,
public std::tr1::enable_shared_from_this<CAChannelProvider>
@@ -58,30 +62,15 @@ public:
virtual void flush();
virtual void poll();
virtual void destroy();
virtual void destroy() EPICS_DEPRECATED {};
/* ---------------------------------------------------------------- */
void threadAttach();
void registerChannel(Channel::shared_pointer const & channel);
void unregisterChannel(Channel::shared_pointer const & channel);
void unregisterChannel(Channel* pchannel);
private:
void initialize();
ca_client_context* current_context;
epics::pvData::Mutex channelsMutex;
// TODO std::unordered_map
// void* is not the nicest thing, but there is no fast weak_ptr::operator==
typedef std::map<void*, Channel::weak_pointer> ChannelList;
ChannelList channels;
// synced on channelsMutex
bool destroyed;
};
}

View File

@@ -12,13 +12,23 @@
namespace epics {
namespace pvAccess {
class Configuration;
namespace ca {
/**
* @brief CAClientFactory is a channel provider for the ca network provider.
*
*
*/
class epicsShareClass CAClientFactory
{
public:
/** @brief start the provider
*
*/
static void start();
/** @brief stop the provider
*
*/
static void stop();
};