diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index edbcf27..9e3c248 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -46,10 +46,11 @@ static void ca_connection_handler(struct connection_handler_args args) { CAChannel *channel = static_cast(ca_puser(args.chid)); - if (args.op == CA_OP_CONN_UP) + if (args.op == CA_OP_CONN_UP) { channel->connected(); - else if (args.op == CA_OP_CONN_DOWN) + } else if (args.op == CA_OP_CONN_DOWN) { channel->disconnected(); + } } @@ -232,28 +233,26 @@ static PVStructure::shared_pointer createPVStructure(CAChannel::shared_pointer c void CAChannel::connected() { - // TODO sync - // we assume array if element count > 1 - elementCount = ca_element_count(channelID); - channelType = ca_field_type(channelID); - bool isArray = elementCount > 1; + { + Lock lock(requestsMutex); + // we assume array if element count > 1 + elementCount = ca_element_count(channelID); + channelType = ca_field_type(channelID); + bool isArray = elementCount > 1; - // no valueAlarm and control,display for non-numeric type - // no control,display for numeric arrays - string allProperties = - (channelType != DBR_STRING && channelType != DBR_ENUM) ? - isArray ? - "value,timeStamp,alarm,display" : - "value,timeStamp,alarm,display,valueAlarm,control" : - "value,timeStamp,alarm"; - Structure::const_shared_pointer structure = createStructure(shared_from_this(), allProperties); + // no valueAlarm and control,display for non-numeric type + // no control,display for numeric arrays + string allProperties = + (channelType != DBR_STRING && channelType != DBR_ENUM) ? + isArray ? + "value,timeStamp,alarm,display" : + "value,timeStamp,alarm,display,valueAlarm,control" : + "value,timeStamp,alarm"; + Structure::const_shared_pointer structure = createStructure(shared_from_this(), allProperties); - // TODO thread sync - // TODO we need only Structure here - this->structure = structure; - - // TODO call channelCreated if structure has changed - EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::CONNECTED)); + // TODO we need only Structure here + this->structure = structure; + } while(!putQueue.empty()) { putQueue.front()->activate(); putQueue.pop(); @@ -266,10 +265,83 @@ void CAChannel::connected() monitorQueue.front()->activate(); monitorQueue.pop(); } + std::queue putQ; + std::queue getQ; + std::queue monitorQ; + { + Lock lock(requestsMutex); + std::vector::const_iterator getiter; + for (getiter = getList.begin(); getiter != getList.end(); ++getiter) { + CAChannelGetPtr temp = (*getiter).lock(); + if(!temp) continue; + getQ.push(temp); + } + std::vector::const_iterator putiter; + for (putiter = putList.begin(); putiter != putList.end(); ++putiter) { + CAChannelPutPtr temp = (*putiter).lock(); + if(!temp) continue; + putQ.push(temp); + } + std::vector::const_iterator monitoriter; + for (monitoriter = monitorList.begin(); monitoriter != monitorList.end(); ++monitoriter) { + CAChannelMonitorPtr temp = (*monitoriter).lock(); + if(!temp) continue; + monitorQ.push(temp); + } + } + while(!putQ.empty()) { + putQ.front()->channelCreated(Status::Ok,shared_from_this()); + putQ.pop(); + } + while(!getQ.empty()) { + getQ.front()->channelCreated(Status::Ok,shared_from_this()); + getQ.pop(); + } + while(!monitorQ.empty()) { + monitorQ.front()->channelCreated(Status::Ok,shared_from_this()); + monitorQ.pop(); + } + EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::CONNECTED)); } void CAChannel::disconnected() { + std::queue putQ; + std::queue getQ; + std::queue monitorQ; + { + Lock lock(requestsMutex); + std::vector::const_iterator getiter; + for (getiter = getList.begin(); getiter != getList.end(); ++getiter) { + CAChannelGetPtr temp = (*getiter).lock(); + if(!temp) continue; + getQ.push(temp); + } + std::vector::const_iterator putiter; + for (putiter = putList.begin(); putiter != putList.end(); ++putiter) { + CAChannelPutPtr temp = (*putiter).lock(); + if(!temp) continue; + putQ.push(temp); + } + std::vector::const_iterator monitoriter; + for (monitoriter = monitorList.begin(); monitoriter != monitorList.end(); ++monitoriter) { + CAChannelMonitorPtr temp = (*monitoriter).lock(); + if(!temp) continue; + monitorQ.push(temp); + } + } + while(!putQ.empty()) { + putQ.front()->channelDisconnect(false); + putQ.pop(); + } + while(!getQ.empty()) { + getQ.front()->channelDisconnect(false); + getQ.pop(); + } + while(!monitorQ.empty()) { + monitorQ.front()->channelDisconnect(false); + monitorQ.pop(); + } EXCEPTION_GUARD(channelRequester->channelStateChange(shared_from_this(), Channel::DISCONNECTED)); } @@ -308,12 +380,48 @@ void CAChannel::activate(short priority) } } +void CAChannel::addChannelGet(const CAChannelGetPtr & get) +{ + Lock lock(requestsMutex); + for(size_t i=0; i< getList.size(); ++i) { + if(!(getList[i].lock())) { + getList[i] = get; + return; + } + } + getList.push_back(get); +} + +void CAChannel::addChannelPut(const CAChannelPutPtr & put) +{ + Lock lock(requestsMutex); + for(size_t i=0; i< putList.size(); ++i) { + if(!(putList[i].lock())) { + putList[i] = put; + return; + } + } + putList.push_back(put); +} + + +void CAChannel::addChannelMonitor(const CAChannelMonitorPtr & monitor) +{ + Lock lock(requestsMutex); + for(size_t i=0; i< monitorList.size(); ++i) { + if(!(monitorList[i].lock())) { + monitorList[i] = monitor; + return; + } + } + monitorList.push_back(monitor); +} + CAChannel::~CAChannel() { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(caChannel); - - Lock lock(requestsMutex); { + Lock lock(requestsMutex); if (destroyed) return; destroyed = true; @@ -344,6 +452,11 @@ unsigned CAChannel::getElementCount() return elementCount; } +Structure::const_shared_pointer CAChannel::getStructure() +{ + return structure; +} + std::tr1::shared_ptr CAChannel::getProvider() { @@ -504,11 +617,6 @@ CAChannelGetPtr CAChannelGet::create( } -CAChannelGet::~CAChannelGet() -{ -} - - static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype nativeType) { // get "field" sub-structure @@ -553,6 +661,10 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel, } +CAChannelGet::~CAChannelGet() +{ +} + void CAChannelGet::activate() { if(pvStructure) throw std::runtime_error("CAChannelGet::activate() was called twice"); @@ -560,10 +672,33 @@ void CAChannelGet::activate() pvStructure = createPVStructure(channel, getType, pvRequest); bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields())); bitSet->set(0); + channel->addChannelGet(shared_from_this()); EXCEPTION_GUARD(channelGetRequester->channelGetConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } +void CAChannelGet::channelCreated(const Status& status,Channel::shared_pointer const & cl) +{ + chtype newType = getDBRType(pvRequest, channel->getNativeType()); + if(newType!=getType) { + pvStructure.reset(); + activate(); + } +} + +void CAChannelGet::channelStateChange( + Channel::shared_pointer const & channel, + Channel::ConnectionState connectionState) +{ + if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) { + EXCEPTION_GUARD(channelGetRequester->channelDisconnect(connectionState==Channel::DESTROYED);) + } +} + +void CAChannelGet::channelDisconnect(bool destroy) +{ + EXCEPTION_GUARD(channelGetRequester->channelDisconnect(destroy);) +} /* --------------- epics::pvAccess::ChannelGet --------------- */ @@ -717,13 +852,6 @@ void copy_format(const void * /*dbr*/, PVStructure::shared_pointer const & pvDis if (format.get()) format->put("%d"); } -template <> -void copy_format(const void * /*dbr*/, PVStructure::shared_pointer const & pvDisplayStructure) -{ - PVStringPtr format = pvDisplayStructure->getSubField("format"); - if (format.get()) format->put("%s"); -} - #define COPY_FORMAT_FOR(T) \ template <> \ void copy_format(const void * dbr, PVStructure::shared_pointer const & pvDisplayStructure) \ @@ -1025,7 +1153,6 @@ CAChannelPutPtr CAChannelPut::create( CAChannelPut::~CAChannelPut() { - // TODO } @@ -1055,11 +1182,35 @@ void CAChannelPut::activate() if(val.compare("true")==0) block = true; } bitSet->set(pvStructure->getSubFieldT("value")->getFieldOffset()); + channel->addChannelPut(shared_from_this()); EXCEPTION_GUARD(channelPutRequester->channelPutConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } +void CAChannelPut::channelCreated(const Status& status,Channel::shared_pointer const & c) +{ + chtype newType = getDBRType(pvRequest, channel->getNativeType()); + if(newType!=getType) { + pvStructure.reset(); + activate(); + } +} + +void CAChannelPut::channelStateChange( + Channel::shared_pointer const & channel, + Channel::ConnectionState connectionState) +{ + if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) { + EXCEPTION_GUARD(channelPutRequester->channelDisconnect(connectionState==Channel::DESTROYED);) + } +} + +void CAChannelPut::channelDisconnect(bool destroy) +{ + EXCEPTION_GUARD(channelPutRequester->channelDisconnect(destroy);) +} + /* --------------- epics::pvAccess::ChannelPut --------------- */ namespace { @@ -1516,10 +1667,35 @@ void CAChannelMonitor::activate() } } monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize)); + channel->addChannelMonitor(shared_from_this()); EXCEPTION_GUARD(monitorRequester->monitorConnect(Status::Ok, shared_from_this(), pvStructure->getStructure())); } +void CAChannelMonitor::channelCreated(const Status& status,Channel::shared_pointer const & c) +{ + chtype newType = getDBRType(pvRequest, channel->getNativeType()); + if(newType!=getType) { + pvStructure.reset(); + activate(); + } +} + +void CAChannelMonitor::channelStateChange( + Channel::shared_pointer const & channel, + Channel::ConnectionState connectionState) +{ + if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) { + EXCEPTION_GUARD(monitorRequester->channelDisconnect(connectionState==Channel::DESTROYED);) + } +} + + +void CAChannelMonitor::channelDisconnect(bool destroy) +{ + EXCEPTION_GUARD(monitorRequester->channelDisconnect(destroy);) +} + void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args) { if (args.status == ECA_NORMAL) diff --git a/src/ca/caProvider.cpp b/src/ca/caProvider.cpp index ad40e30..e4ea6b1 100644 --- a/src/ca/caProvider.cpp +++ b/src/ca/caProvider.cpp @@ -180,6 +180,7 @@ void ca_factory_cleanup(void*) { try { ChannelProviderRegistry::clients()->remove("ca"); + ca_context_destroy(); } catch(std::exception& e) { LOG(logLevelWarn, "Error when unregister \"ca\" factory"); } diff --git a/src/ca/pv/caChannel.h b/src/ca/pv/caChannel.h index 21edf0c..2987ba5 100644 --- a/src/ca/pv/caChannel.h +++ b/src/ca/pv/caChannel.h @@ -8,6 +8,7 @@ #define CACHANNEL_H #include +#include #include @@ -21,12 +22,21 @@ namespace epics { namespace pvAccess { namespace ca { +class CAChannel; +typedef std::tr1::shared_ptr CAChannelPtr; +typedef std::tr1::weak_ptr CAChannelWPtr; class CAChannelPut; typedef std::tr1::shared_ptr CAChannelPutPtr; +typedef std::tr1::weak_ptr CAChannelPutWPtr; class CAChannelGet; typedef std::tr1::shared_ptr CAChannelGetPtr; +typedef std::tr1::weak_ptr CAChannelGetWPtr; class CAChannelMonitor; typedef std::tr1::shared_ptr CAChannelMonitorPtr; +typedef std::tr1::weak_ptr CAChannelMonitorWPtr; + +typedef std::tr1::shared_ptr ChannelBaseRequesterPtr; +typedef std::tr1::weak_ptr ChannelBaseRequesterWPtr; class CAChannel : public Channel, @@ -49,6 +59,7 @@ public: chid getChannelID(); chtype getNativeType(); unsigned getElementCount(); + epics::pvData::Structure::const_shared_pointer getStructure(); /* --------------- epics::pvAccess::Channel --------------- */ @@ -84,6 +95,11 @@ public: void threadAttach(); + void addChannelGet(const CAChannelGetPtr & get); + void addChannelPut(const CAChannelPutPtr & get); + void addChannelMonitor(const CAChannelMonitorPtr & get); + + private: CAChannel(std::string const & channelName, @@ -103,16 +119,20 @@ private: epics::pvData::Mutex requestsMutex; - // synced on requestsMutex bool destroyed; std::queue putQueue; std::queue getQueue; std::queue monitorQueue; + std::vector getList; + std::vector putList; + std::vector monitorList; }; class CAChannelGet : public ChannelGet, + public ChannelRequester, + public ChannelBaseRequester, public std::tr1::enable_shared_from_this { @@ -128,17 +148,24 @@ public: void getDone(struct event_handler_args &args); /* --------------- epics::pvAccess::ChannelGet --------------- */ - virtual void get(); /* --------------- epics::pvData::ChannelRequest --------------- */ - virtual Channel::shared_pointer getChannel(); virtual void cancel(); virtual void lastRequest(); - /* --------------- Destroyable --------------- */ - + /* --------------- ChannelRequester --------------- */ + virtual void channelCreated( + const epics::pvData::Status& status, + Channel::shared_pointer const & channel); + virtual void channelStateChange( + Channel::shared_pointer const & channel, + Channel::ConnectionState connectionState); + virtual std::string getRequesterName() { return "CAChannelGet";} + /* --------------- ChannelBaseRequester --------------- */ + virtual void channelDisconnect(bool destroy); + /* --------------- Destroyable --------------- */ virtual void destroy(); void activate(); @@ -165,6 +192,8 @@ private: class CAChannelPut : public ChannelPut, + public ChannelRequester, + public ChannelBaseRequester, public std::tr1::enable_shared_from_this { @@ -194,6 +223,16 @@ public: virtual void cancel(); virtual void lastRequest(); + /* --------------- ChannelRequester --------------- */ + virtual void channelCreated( + const epics::pvData::Status& status, + Channel::shared_pointer const & channel); + virtual void channelStateChange( + Channel::shared_pointer const & channel, + Channel::ConnectionState connectionState); + virtual std::string getRequesterName() { return "CAChannelPut";} + /* --------------- ChannelBaseRequester --------------- */ + virtual void channelDisconnect(bool destroy); /* --------------- Destroyable --------------- */ virtual void destroy(); @@ -224,6 +263,8 @@ typedef std::tr1::shared_ptr CACMonitorQueuePtr; class CAChannelMonitor : public Monitor, + public ChannelRequester, + public ChannelBaseRequester, public std::tr1::enable_shared_from_this { @@ -246,11 +287,18 @@ public: virtual void release(MonitorElementPtr const & monitorElement); /* --------------- epics::pvData::ChannelRequest --------------- */ - virtual void cancel(); - + /* --------------- ChannelRequester --------------- */ + virtual void channelCreated( + const epics::pvData::Status& status, + Channel::shared_pointer const & channel); + virtual void channelStateChange( + Channel::shared_pointer const & channel, + Channel::ConnectionState connectionState); + virtual std::string getRequesterName() { return "CAChannelMonitor";} + /* --------------- ChannelBaseRequester --------------- */ + virtual void channelDisconnect(bool destroy); /* --------------- Destroyable --------------- */ - virtual void destroy(); void activate(); private: @@ -260,7 +308,7 @@ private: epics::pvData::PVStructure::shared_pointer const & pvRequest); - CAChannel::shared_pointer channel; + CAChannelPtr channel; MonitorRequester::shared_pointer monitorRequester; epics::pvData::PVStructure::shared_pointer pvRequest; chtype getType;