diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index d69d0f4..02b3f53 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -238,6 +238,9 @@ void CAChannel::connected() if(DEBUG_LEVEL>0) { cout<< "CAChannel::connected " << channelName << endl; } + std::queue putQ; + std::queue getQ; + std::queue monitorQ; { Lock lock(requestsMutex); // we assume array if element count > 1 @@ -258,24 +261,7 @@ void CAChannel::connected() // TODO we need only Structure here this->structure = structure; - } - while(!putQueue.empty()) { - putQueue.front()->activate(); - putQueue.pop(); - } - while(!getQueue.empty()) { - getQueue.front()->activate(); - getQueue.pop(); - } - while(!monitorQueue.empty()) { - monitorQueue.front()->activate(); - monitorQueue.pop(); - } - std::queue putQ; - std::queue getQ; - std::queue monitorQ; - { - Lock lock(requestsMutex); + std::vector::const_iterator getiter; for (getiter = getList.begin(); getiter != getList.end(); ++getiter) { CAChannelGetPtr temp = (*getiter).lock(); @@ -296,8 +282,8 @@ void CAChannel::connected() } } while(!putQ.empty()) { - putQ.front()->channelCreated(Status::Ok,shared_from_this()); - putQ.pop(); + putQ.front()->channelCreated(Status::Ok,shared_from_this()); + putQ.pop(); } while(!getQ.empty()) { getQ.front()->channelCreated(Status::Ok,shared_from_this()); @@ -307,6 +293,22 @@ void CAChannel::connected() monitorQ.front()->channelCreated(Status::Ok,shared_from_this()); monitorQ.pop(); } + while(!getFieldQueue.empty()) { + getFieldQueue.front()->callRequester(shared_from_this()); + getFieldQueue.pop(); + } + while(!putQueue.empty()) { + putQueue.front()->activate(); + putQueue.pop(); + } + while(!getQueue.empty()) { + getQueue.front()->activate(); + getQueue.pop(); + } + while(!monitorQueue.empty()) { + monitorQueue.front()->activate(); + monitorQueue.pop(); + } ChannelRequester::shared_pointer req(channelRequester.lock()); if(req) { EXCEPTION_GUARD(req->channelStateChange( @@ -520,20 +522,18 @@ std::tr1::shared_ptr CAChannel::getChannelRequester() void CAChannel::getField(GetFieldRequester::shared_pointer const & requester, std::string const & subField) { - Field::const_shared_pointer field = - subField.empty() ? - std::tr1::static_pointer_cast(structure) : - structure->getField(subField); - - if (field) - { - EXCEPTION_GUARD(requester->getDone(Status::Ok, field)); + if(DEBUG_LEVEL>0) { + cout << "CAChannel::getField " << channelName << endl; } - else + CAChannelGetFieldPtr getField(new CAChannelGetField(requester,subField)); { - Status errorStatus(Status::STATUSTYPE_ERROR, "field '" + subField + "' not found"); - EXCEPTION_GUARD(requester->getDone(errorStatus, FieldConstPtr())); - } + Lock lock(requestsMutex); + if(getConnectionState()!=Channel::CONNECTED) { + getFieldQueue.push(getField); + return; + } + } + getField->callRequester(shared_from_this()); } @@ -555,12 +555,16 @@ ChannelGet::shared_pointer CAChannel::createChannelGet( if(DEBUG_LEVEL>0) { cout << "CAChannel::createChannelGet " << channelName << endl; } - CAChannelGetPtr channelGet = CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);\ - if(getConnectionState()==Channel::CONNECTED) { - channelGet->activate(); - } else { - getQueue.push(channelGet); + CAChannelGetPtr channelGet = + CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest); + { + Lock lock(requestsMutex); + if(getConnectionState()!=Channel::CONNECTED) { + getQueue.push(channelGet); + return channelGet; + } } + channelGet->activate(); return channelGet; } @@ -572,12 +576,16 @@ ChannelPut::shared_pointer CAChannel::createChannelPut( if(DEBUG_LEVEL>0) { cout << "CAChannel::createChannelPut " << channelName << endl; } - CAChannelPutPtr channelPut = CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);\ - if(getConnectionState()==Channel::CONNECTED) { - channelPut->activate(); - } else { - putQueue.push(channelPut); + CAChannelPutPtr channelPut = + CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest); + { + Lock lock(requestsMutex); + if(getConnectionState()!=Channel::CONNECTED) { + putQueue.push(channelPut); + return channelPut; + } } + channelPut->activate(); return channelPut; } @@ -589,12 +597,16 @@ Monitor::shared_pointer CAChannel::createMonitor( if(DEBUG_LEVEL>0) { cout << "CAChannel::createMonitor " << channelName << endl; } - CAChannelMonitorPtr channelMonitor = CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);\ - if(getConnectionState()==Channel::CONNECTED) { - channelMonitor->activate(); - } else { - monitorQueue.push(channelMonitor); + CAChannelMonitorPtr channelMonitor = + CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest); + { + Lock lock(requestsMutex); + if(getConnectionState()!=Channel::CONNECTED) { + monitorQueue.push(channelMonitor); + return channelMonitor; + } } + channelMonitor->activate(); return channelMonitor; } @@ -613,6 +625,47 @@ void CAChannel::printInfo(std::ostream& out) } +CAChannelGetField::CAChannelGetField( + GetFieldRequester::shared_pointer const & requester,std::string const & subField) + : getFieldRequester(requester), + subField(subField) +{ + if(DEBUG_LEVEL>0) { + cout << "CAChannelGetField::CAChannelGetField()\n"; + } +} + +CAChannelGetField::~CAChannelGetField() +{ + if(DEBUG_LEVEL>0) { + cout << "CAChannelGetField::~CAChannelGetField()\n"; + } +} + +void CAChannelGetField::callRequester(CAChannelPtr const & caChannel) +{ + if(DEBUG_LEVEL>0) { + cout << "CAChannelGetField::callRequester\n"; + } + GetFieldRequester::shared_pointer requester(getFieldRequester.lock()); + if(!requester) return; + epics::pvData::Structure::const_shared_pointer structure(caChannel->getStructure()); + Field::const_shared_pointer field = + subField.empty() ? + std::tr1::static_pointer_cast(structure) : + structure->getField(subField); + + if (field) + { + EXCEPTION_GUARD(requester->getDone(Status::Ok, field)); + } + else + { + Status errorStatus(Status::STATUSTYPE_ERROR, "field '" + subField + "' not found"); + EXCEPTION_GUARD(requester->getDone(errorStatus, FieldConstPtr())); + } +} + /* ---------------------------------------------------------- */ void CAChannel::threadAttach() @@ -624,15 +677,6 @@ void CAChannel::threadAttach() } -CAChannelGetPtr CAChannelGet::create( - CAChannel::shared_pointer const & channel, - ChannelGetRequester::shared_pointer const & channelGetRequester, - epics::pvData::PVStructure::shared_pointer const & pvRequest) -{ - return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest)); -} - - static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype nativeType) { // get "field" sub-structure @@ -642,9 +686,13 @@ static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype n fieldSubField = pvRequest; Structure::const_shared_pointer fieldStructure = fieldSubField->getStructure(); - // no fields or control -> DBR_CTRL_ - if (fieldStructure->getNumberFields() == 0 || - fieldStructure->getField("control")) + // no fields + if (fieldStructure->getNumberFields() == 0) + { + return static_cast(static_cast(nativeType) + DBR_TIME_STRING); + } + // control -> DBR_CTRL_ + if (fieldStructure->getField("control")) return static_cast(static_cast(nativeType) + DBR_CTRL_STRING); // display/valueAlarm -> DBR_GR_ @@ -666,6 +714,15 @@ static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype n size_t CAChannelGet::num_instances; +CAChannelGetPtr CAChannelGet::create( + CAChannel::shared_pointer const & channel, + ChannelGetRequester::shared_pointer const & channelGetRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) +{ + return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest)); +} + + CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel, ChannelGetRequester::shared_pointer const & channelGetRequester, epics::pvData::PVStructure::shared_pointer const & pvRequest) diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index 80c2604..d79a52f 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -24,6 +24,9 @@ namespace ca { class CAChannel; typedef std::tr1::shared_ptr CAChannelPtr; +class CAChannelGetField; +typedef std::tr1::shared_ptr CAChannelGetFieldPtr; +typedef std::tr1::weak_ptr CAChannelGetFieldWPtr; class CAChannelPut; typedef std::tr1::shared_ptr CAChannelPutPtr; typedef std::tr1::weak_ptr CAChannelPutWPtr; @@ -34,8 +37,18 @@ class CAChannelMonitor; typedef std::tr1::shared_ptr CAChannelMonitorPtr; typedef std::tr1::weak_ptr CAChannelMonitorWPtr; -typedef std::tr1::shared_ptr ChannelBaseRequesterPtr; -typedef std::tr1::weak_ptr ChannelBaseRequesterWPtr; +class CAChannelGetField : + public std::tr1::enable_shared_from_this +{ +public: + POINTER_DEFINITIONS(CAChannelGetField); + CAChannelGetField(GetFieldRequester::shared_pointer const & requester,std::string const & subField); + ~CAChannelGetField(); + void callRequester(CAChannelPtr const & caChannel); +private: + GetFieldRequester::weak_pointer getFieldRequester; + std::string subField; +}; class CAChannel : public Channel, @@ -120,6 +133,7 @@ private: epics::pvData::Mutex requestsMutex; + std::queue getFieldQueue; std::queue putQueue; std::queue getQueue; std::queue monitorQueue;