call channelDisconnect

This commit is contained in:
mrkraimer
2017-08-04 14:15:42 -04:00
parent b519422df5
commit f18fe3dca4
3 changed files with 271 additions and 46 deletions

View File

@@ -46,10 +46,11 @@ static void ca_connection_handler(struct connection_handler_args args)
{
CAChannel *channel = static_cast<CAChannel*>(ca_puser(args.chid));
if (args.op == CA_OP_CONN_UP)
if (args.op == CA_OP_CONN_UP) {
channel->connected();
else if (args.op == CA_OP_CONN_DOWN)
} else if (args.op == CA_OP_CONN_DOWN) {
channel->disconnected();
}
}
@@ -232,28 +233,26 @@ static PVStructure::shared_pointer createPVStructure(CAChannel::shared_pointer c
void CAChannel::connected()
{
// TODO sync
// we assume array if element count > 1
elementCount = ca_element_count(channelID);
channelType = ca_field_type(channelID);
bool isArray = elementCount > 1;
{
Lock lock(requestsMutex);
// we assume array if element count > 1
elementCount = ca_element_count(channelID);
channelType = ca_field_type(channelID);
bool isArray = elementCount > 1;
// no valueAlarm and control,display for non-numeric type
// no control,display for numeric arrays
string allProperties =
(channelType != DBR_STRING && channelType != DBR_ENUM) ?
isArray ?
"value,timeStamp,alarm,display" :
"value,timeStamp,alarm,display,valueAlarm,control" :
"value,timeStamp,alarm";
Structure::const_shared_pointer structure = createStructure(shared_from_this(), allProperties);
// no valueAlarm and control,display for non-numeric type
// no control,display for numeric arrays
string allProperties =
(channelType != DBR_STRING && channelType != DBR_ENUM) ?
isArray ?
"value,timeStamp,alarm,display" :
"value,timeStamp,alarm,display,valueAlarm,control" :
"value,timeStamp,alarm";
Structure::const_shared_pointer structure = createStructure(shared_from_this(), allProperties);
// TODO thread sync
// TODO we need only Structure here
this->structure = structure;
// TODO call channelCreated if structure has changed
EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::CONNECTED));
// TODO we need only Structure here
this->structure = structure;
}
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
@@ -266,10 +265,83 @@ void CAChannel::connected()
monitorQueue.front()->activate();
monitorQueue.pop();
}
std::queue<CAChannelPutPtr> putQ;
std::queue<CAChannelGetPtr> getQ;
std::queue<CAChannelMonitorPtr> monitorQ;
{
Lock lock(requestsMutex);
std::vector<CAChannelGetWPtr>::const_iterator getiter;
for (getiter = getList.begin(); getiter != getList.end(); ++getiter) {
CAChannelGetPtr temp = (*getiter).lock();
if(!temp) continue;
getQ.push(temp);
}
std::vector<CAChannelPutWPtr>::const_iterator putiter;
for (putiter = putList.begin(); putiter != putList.end(); ++putiter) {
CAChannelPutPtr temp = (*putiter).lock();
if(!temp) continue;
putQ.push(temp);
}
std::vector<CAChannelMonitorWPtr>::const_iterator monitoriter;
for (monitoriter = monitorList.begin(); monitoriter != monitorList.end(); ++monitoriter) {
CAChannelMonitorPtr temp = (*monitoriter).lock();
if(!temp) continue;
monitorQ.push(temp);
}
}
while(!putQ.empty()) {
putQ.front()->channelCreated(Status::Ok,shared_from_this());
putQ.pop();
}
while(!getQ.empty()) {
getQ.front()->channelCreated(Status::Ok,shared_from_this());
getQ.pop();
}
while(!monitorQ.empty()) {
monitorQ.front()->channelCreated(Status::Ok,shared_from_this());
monitorQ.pop();
}
EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::CONNECTED));
}
void CAChannel::disconnected()
{
std::queue<CAChannelPutPtr> putQ;
std::queue<CAChannelGetPtr> getQ;
std::queue<CAChannelMonitorPtr> monitorQ;
{
Lock lock(requestsMutex);
std::vector<CAChannelGetWPtr>::const_iterator getiter;
for (getiter = getList.begin(); getiter != getList.end(); ++getiter) {
CAChannelGetPtr temp = (*getiter).lock();
if(!temp) continue;
getQ.push(temp);
}
std::vector<CAChannelPutWPtr>::const_iterator putiter;
for (putiter = putList.begin(); putiter != putList.end(); ++putiter) {
CAChannelPutPtr temp = (*putiter).lock();
if(!temp) continue;
putQ.push(temp);
}
std::vector<CAChannelMonitorWPtr>::const_iterator monitoriter;
for (monitoriter = monitorList.begin(); monitoriter != monitorList.end(); ++monitoriter) {
CAChannelMonitorPtr temp = (*monitoriter).lock();
if(!temp) continue;
monitorQ.push(temp);
}
}
while(!putQ.empty()) {
putQ.front()->channelDisconnect(false);
putQ.pop();
}
while(!getQ.empty()) {
getQ.front()->channelDisconnect(false);
getQ.pop();
}
while(!monitorQ.empty()) {
monitorQ.front()->channelDisconnect(false);
monitorQ.pop();
}
EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::DISCONNECTED));
}
@@ -308,12 +380,48 @@ void CAChannel::activate(short priority)
}
}
void CAChannel::addChannelGet(const CAChannelGetPtr & get)
{
Lock lock(requestsMutex);
for(size_t i=0; i< getList.size(); ++i) {
if(!(getList[i].lock())) {
getList[i] = get;
return;
}
}
getList.push_back(get);
}
void CAChannel::addChannelPut(const CAChannelPutPtr & put)
{
Lock lock(requestsMutex);
for(size_t i=0; i< putList.size(); ++i) {
if(!(putList[i].lock())) {
putList[i] = put;
return;
}
}
putList.push_back(put);
}
void CAChannel::addChannelMonitor(const CAChannelMonitorPtr & monitor)
{
Lock lock(requestsMutex);
for(size_t i=0; i< monitorList.size(); ++i) {
if(!(monitorList[i].lock())) {
monitorList[i] = monitor;
return;
}
}
monitorList.push_back(monitor);
}
CAChannel::~CAChannel()
{
PVACCESS_REFCOUNT_MONITOR_DESTRUCT(caChannel);
Lock lock(requestsMutex);
{
Lock lock(requestsMutex);
if (destroyed)
return;
destroyed = true;
@@ -344,6 +452,11 @@ unsigned CAChannel::getElementCount()
return elementCount;
}
Structure::const_shared_pointer CAChannel::getStructure()
{
return structure;
}
std::tr1::shared_ptr<ChannelProvider> CAChannel::getProvider()
{
@@ -504,11 +617,6 @@ CAChannelGetPtr CAChannelGet::create(
}
CAChannelGet::~CAChannelGet()
{
}
static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype nativeType)
{
// get "field" sub-structure
@@ -553,6 +661,10 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
}
CAChannelGet::~CAChannelGet()
{
}
void CAChannelGet::activate()
{
if(pvStructure) throw std::runtime_error("CAChannelGet::activate() was called twice");
@@ -560,10 +672,33 @@ void CAChannelGet::activate()
pvStructure = createPVStructure(channel, getType, pvRequest);
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
bitSet->set(0);
channel->addChannelGet(shared_from_this());
EXCEPTION_GUARD(channelGetRequester->channelGetConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelGet::channelCreated(const Status& status,Channel::shared_pointer const & cl)
{
chtype newType = getDBRType(pvRequest, channel->getNativeType());
if(newType!=getType) {
pvStructure.reset();
activate();
}
}
void CAChannelGet::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
EXCEPTION_GUARD(channelGetRequester->channelDisconnect(connectionState==Channel::DESTROYED);)
}
}
void CAChannelGet::channelDisconnect(bool destroy)
{
EXCEPTION_GUARD(channelGetRequester->channelDisconnect(destroy);)
}
/* --------------- epics::pvAccess::ChannelGet --------------- */
@@ -717,13 +852,6 @@ void copy_format(const void * /*dbr*/, PVStructure::shared_pointer const & pvDis
if (format.get()) format->put("%d");
}
template <>
void copy_format<dbr_time_string>(const void * /*dbr*/, PVStructure::shared_pointer const & pvDisplayStructure)
{
PVStringPtr format = pvDisplayStructure->getSubField<PVString>("format");
if (format.get()) format->put("%s");
}
#define COPY_FORMAT_FOR(T) \
template <> \
void copy_format<T>(const void * dbr, PVStructure::shared_pointer const & pvDisplayStructure) \
@@ -1025,7 +1153,6 @@ CAChannelPutPtr CAChannelPut::create(
CAChannelPut::~CAChannelPut()
{
// TODO
}
@@ -1055,11 +1182,35 @@ void CAChannelPut::activate()
if(val.compare("true")==0) block = true;
}
bitSet->set(pvStructure->getSubFieldT("value")->getFieldOffset());
channel->addChannelPut(shared_from_this());
EXCEPTION_GUARD(channelPutRequester->channelPutConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelPut::channelCreated(const Status& status,Channel::shared_pointer const & c)
{
chtype newType = getDBRType(pvRequest, channel->getNativeType());
if(newType!=getType) {
pvStructure.reset();
activate();
}
}
void CAChannelPut::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
EXCEPTION_GUARD(channelPutRequester->channelDisconnect(connectionState==Channel::DESTROYED);)
}
}
void CAChannelPut::channelDisconnect(bool destroy)
{
EXCEPTION_GUARD(channelPutRequester->channelDisconnect(destroy);)
}
/* --------------- epics::pvAccess::ChannelPut --------------- */
namespace {
@@ -1516,10 +1667,35 @@ void CAChannelMonitor::activate()
}
}
monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
channel->addChannelMonitor(shared_from_this());
EXCEPTION_GUARD(monitorRequester->monitorConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
}
void CAChannelMonitor::channelCreated(const Status& status,Channel::shared_pointer const & c)
{
chtype newType = getDBRType(pvRequest, channel->getNativeType());
if(newType!=getType) {
pvStructure.reset();
activate();
}
}
void CAChannelMonitor::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
EXCEPTION_GUARD(monitorRequester->channelDisconnect(connectionState==Channel::DESTROYED);)
}
}
void CAChannelMonitor::channelDisconnect(bool destroy)
{
EXCEPTION_GUARD(monitorRequester->channelDisconnect(destroy);)
}
void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
{
if (args.status == ECA_NORMAL)

View File

@@ -180,6 +180,7 @@ void ca_factory_cleanup(void*)
{
try {
ChannelProviderRegistry::clients()->remove("ca");
ca_context_destroy();
} catch(std::exception& e) {
LOG(logLevelWarn, "Error when unregister \"ca\" factory");
}

View File

@@ -8,6 +8,7 @@
#define CACHANNEL_H
#include <queue>
#include <vector>
#include <pv/pvAccess.h>
@@ -21,12 +22,21 @@ namespace epics {
namespace pvAccess {
namespace ca {
class CAChannel;
typedef std::tr1::shared_ptr<CAChannel> CAChannelPtr;
typedef std::tr1::weak_ptr<CAChannel> CAChannelWPtr;
class CAChannelPut;
typedef std::tr1::shared_ptr<CAChannelPut> CAChannelPutPtr;
typedef std::tr1::weak_ptr<CAChannelPut> CAChannelPutWPtr;
class CAChannelGet;
typedef std::tr1::shared_ptr<CAChannelGet> CAChannelGetPtr;
typedef std::tr1::weak_ptr<CAChannelGet> CAChannelGetWPtr;
class CAChannelMonitor;
typedef std::tr1::shared_ptr<CAChannelMonitor> CAChannelMonitorPtr;
typedef std::tr1::weak_ptr<CAChannelMonitor> CAChannelMonitorWPtr;
typedef std::tr1::shared_ptr<ChannelBaseRequester> ChannelBaseRequesterPtr;
typedef std::tr1::weak_ptr<ChannelBaseRequester> ChannelBaseRequesterWPtr;
class CAChannel :
public Channel,
@@ -49,6 +59,7 @@ public:
chid getChannelID();
chtype getNativeType();
unsigned getElementCount();
epics::pvData::Structure::const_shared_pointer getStructure();
/* --------------- epics::pvAccess::Channel --------------- */
@@ -84,6 +95,11 @@ public:
void threadAttach();
void addChannelGet(const CAChannelGetPtr & get);
void addChannelPut(const CAChannelPutPtr & get);
void addChannelMonitor(const CAChannelMonitorPtr & get);
private:
CAChannel(std::string const & channelName,
@@ -103,16 +119,20 @@ private:
epics::pvData::Mutex requestsMutex;
// synced on requestsMutex
bool destroyed;
std::queue<CAChannelPutPtr> putQueue;
std::queue<CAChannelGetPtr> getQueue;
std::queue<CAChannelMonitorPtr> monitorQueue;
std::vector<CAChannelGetWPtr> getList;
std::vector<CAChannelPutWPtr> putList;
std::vector<CAChannelMonitorWPtr> monitorList;
};
class CAChannelGet :
public ChannelGet,
public ChannelRequester,
public ChannelBaseRequester,
public std::tr1::enable_shared_from_this<CAChannelGet>
{
@@ -128,17 +148,24 @@ public:
void getDone(struct event_handler_args &args);
/* --------------- epics::pvAccess::ChannelGet --------------- */
virtual void get();
/* --------------- epics::pvData::ChannelRequest --------------- */
virtual Channel::shared_pointer getChannel();
virtual void cancel();
virtual void lastRequest();
/* --------------- Destroyable --------------- */
/* --------------- ChannelRequester --------------- */
virtual void channelCreated(
const epics::pvData::Status& status,
Channel::shared_pointer const & channel);
virtual void channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState);
virtual std::string getRequesterName() { return "CAChannelGet";}
/* --------------- ChannelBaseRequester --------------- */
virtual void channelDisconnect(bool destroy);
/* --------------- Destroyable --------------- */
virtual void destroy();
void activate();
@@ -165,6 +192,8 @@ private:
class CAChannelPut :
public ChannelPut,
public ChannelRequester,
public ChannelBaseRequester,
public std::tr1::enable_shared_from_this<CAChannelPut>
{
@@ -194,6 +223,16 @@ public:
virtual void cancel();
virtual void lastRequest();
/* --------------- ChannelRequester --------------- */
virtual void channelCreated(
const epics::pvData::Status& status,
Channel::shared_pointer const & channel);
virtual void channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState);
virtual std::string getRequesterName() { return "CAChannelPut";}
/* --------------- ChannelBaseRequester --------------- */
virtual void channelDisconnect(bool destroy);
/* --------------- Destroyable --------------- */
virtual void destroy();
@@ -224,6 +263,8 @@ typedef std::tr1::shared_ptr<CACMonitorQueue> CACMonitorQueuePtr;
class CAChannelMonitor :
public Monitor,
public ChannelRequester,
public ChannelBaseRequester,
public std::tr1::enable_shared_from_this<CAChannelMonitor>
{
@@ -246,11 +287,18 @@ public:
virtual void release(MonitorElementPtr const & monitorElement);
/* --------------- epics::pvData::ChannelRequest --------------- */
virtual void cancel();
/* --------------- ChannelRequester --------------- */
virtual void channelCreated(
const epics::pvData::Status& status,
Channel::shared_pointer const & channel);
virtual void channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState);
virtual std::string getRequesterName() { return "CAChannelMonitor";}
/* --------------- ChannelBaseRequester --------------- */
virtual void channelDisconnect(bool destroy);
/* --------------- Destroyable --------------- */
virtual void destroy();
void activate();
private:
@@ -260,7 +308,7 @@ private:
epics::pvData::PVStructure::shared_pointer const & pvRequest);
CAChannel::shared_pointer channel;
CAChannelPtr channel;
MonitorRequester::shared_pointer monitorRequester;
epics::pvData::PVStructure::shared_pointer pvRequest;
chtype getType;