fix all problems raised in issue #71

This commit is contained in:
mrkraimer
2017-11-17 15:18:21 -05:00
parent 5bcbc0d717
commit 32d0ece858
2 changed files with 134 additions and 70 deletions

View File

@@ -259,53 +259,61 @@ void CAChannel::connected()
// TODO we need only Structure here
this->structure = structure;
}
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
while(!getFieldQueue.empty()) {
getFieldQueue.front()->callRequester(shared_from_this());
getFieldQueue.pop();
}
while(!getQueue.empty()) {
getQueue.front()->activate();
getQueue.pop();
}
while(!monitorQueue.empty()) {
monitorQueue.front()->activate();
monitorQueue.pop();
}
std::queue<CAChannelPutPtr> putQ;
std::queue<CAChannelGetPtr> getQ;
std::queue<CAChannelMonitorPtr> monitorQ;
{
Lock lock(requestsMutex);
std::vector<CAChannelGetWPtr>::const_iterator getiter;
for (getiter = getList.begin(); getiter != getList.end(); ++getiter) {
CAChannelGetPtr temp = (*getiter).lock();
if(!temp) continue;
getQ.push(temp);
if(firstConnect) {
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
}
std::vector<CAChannelPutWPtr>::const_iterator putiter;
for (putiter = putList.begin(); putiter != putList.end(); ++putiter) {
CAChannelPutPtr temp = (*putiter).lock();
if(!temp) continue;
putQ.push(temp);
while(!getQueue.empty()) {
getQueue.front()->activate();
getQueue.pop();
}
std::vector<CAChannelMonitorWPtr>::const_iterator monitoriter;
for (monitoriter = monitorList.begin(); monitoriter != monitorList.end(); ++monitoriter) {
CAChannelMonitorPtr temp = (*monitoriter).lock();
if(!temp) continue;
monitorQ.push(temp);
while(!monitorQueue.empty()) {
monitorQueue.front()->activate();
monitorQueue.pop();
}
} else {
firstConnect = false;
std::queue<CAChannelPutPtr> putQ;
std::queue<CAChannelGetPtr> getQ;
std::queue<CAChannelMonitorPtr> monitorQ;
{
Lock lock(requestsMutex);
std::vector<CAChannelGetWPtr>::const_iterator getiter;
for (getiter = getList.begin(); getiter != getList.end(); ++getiter) {
CAChannelGetPtr temp = (*getiter).lock();
if(!temp) continue;
getQ.push(temp);
}
std::vector<CAChannelPutWPtr>::const_iterator putiter;
for (putiter = putList.begin(); putiter != putList.end(); ++putiter) {
CAChannelPutPtr temp = (*putiter).lock();
if(!temp) continue;
putQ.push(temp);
}
std::vector<CAChannelMonitorWPtr>::const_iterator monitoriter;
for (monitoriter = monitorList.begin(); monitoriter != monitorList.end(); ++monitoriter) {
CAChannelMonitorPtr temp = (*monitoriter).lock();
if(!temp) continue;
monitorQ.push(temp);
}
}
while(!putQ.empty()) {
putQ.front()->channelCreated(Status::Ok,shared_from_this());
putQ.pop();
}
while(!getQ.empty()) {
getQ.front()->channelCreated(Status::Ok,shared_from_this());
getQ.pop();
}
while(!monitorQ.empty()) {
monitorQ.front()->channelCreated(Status::Ok,shared_from_this());
monitorQ.pop();
}
}
while(!putQ.empty()) {
putQ.front()->channelCreated(Status::Ok,shared_from_this());
putQ.pop();
}
while(!getQ.empty()) {
getQ.front()->channelCreated(Status::Ok,shared_from_this());
getQ.pop();
}
while(!monitorQ.empty()) {
monitorQ.front()->channelCreated(Status::Ok,shared_from_this());
monitorQ.pop();
}
ChannelRequester::shared_pointer req(channelRequester.lock());
if(req) {
@@ -372,7 +380,8 @@ CAChannel::CAChannel(std::string const & _channelName,
channelRequester(_channelRequester),
channelID(0),
channelType(0),
elementCount(0)
elementCount(0),
firstConnect(true)
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::CAChannel " << channelName << endl;
@@ -520,19 +529,14 @@ std::tr1::shared_ptr<ChannelRequester> CAChannel::getChannelRequester()
void CAChannel::getField(GetFieldRequester::shared_pointer const & requester,
std::string const & subField)
{
Field::const_shared_pointer field =
subField.empty() ?
std::tr1::static_pointer_cast<const Field>(structure) :
structure->getField(subField);
if (field)
{
EXCEPTION_GUARD(requester->getDone(Status::Ok, field));
if(DEBUG_LEVEL>0) {
cout << "CAChannel::getField " << channelName << endl;
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, "field '" + subField + "' not found");
EXCEPTION_GUARD(requester->getDone(errorStatus, FieldConstPtr()));
CAChannelGetFieldPtr getField(new CAChannelGetField(requester,subField));
if(getConnectionState()==Channel::CONNECTED) {
getField->callRequester(shared_from_this());
} else {
getFieldQueue.push(getField);
}
}
@@ -613,6 +617,47 @@ void CAChannel::printInfo(std::ostream& out)
}
CAChannelGetField::CAChannelGetField(
GetFieldRequester::shared_pointer const & requester,std::string const & subField)
: getFieldRequester(requester),
subField(subField)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::CAChannelGetField()\n";
}
}
CAChannelGetField::~CAChannelGetField()
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::~CAChannelGetField()\n";
}
}
void CAChannelGetField::callRequester(CAChannelPtr const & caChannel)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGetField::callRequester\n";
}
GetFieldRequester::shared_pointer requester(getFieldRequester.lock());
if(!requester) return;
epics::pvData::Structure::const_shared_pointer structure(caChannel->getStructure());
Field::const_shared_pointer field =
subField.empty() ?
std::tr1::static_pointer_cast<const Field>(structure) :
structure->getField(subField);
if (field)
{
EXCEPTION_GUARD(requester->getDone(Status::Ok, field));
}
else
{
Status errorStatus(Status::STATUSTYPE_ERROR, "field '" + subField + "' not found");
EXCEPTION_GUARD(requester->getDone(errorStatus, FieldConstPtr()));
}
}
/* ---------------------------------------------------------- */
void CAChannel::threadAttach()
@@ -624,15 +669,6 @@ void CAChannel::threadAttach()
}
CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest));
}
static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype nativeType)
{
// get "field" sub-structure
@@ -642,9 +678,13 @@ static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype n
fieldSubField = pvRequest;
Structure::const_shared_pointer fieldStructure = fieldSubField->getStructure();
// no fields or control -> DBR_CTRL_<type>
if (fieldStructure->getNumberFields() == 0 ||
fieldStructure->getField("control"))
// no fields
if (fieldStructure->getNumberFields() == 0)
{
return static_cast<chtype>(static_cast<int>(nativeType) + DBR_TIME_STRING);
}
// control -> DBR_CTRL_<type>
if (fieldStructure->getField("control"))
return static_cast<chtype>(static_cast<int>(nativeType) + DBR_CTRL_STRING);
// display/valueAlarm -> DBR_GR_<type>
@@ -666,6 +706,15 @@ static chtype getDBRType(PVStructure::shared_pointer const & pvRequest, chtype n
size_t CAChannelGet::num_instances;
CAChannelGetPtr CAChannelGet::create(
CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)
{
return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest));
}
CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
ChannelGetRequester::shared_pointer const & channelGetRequester,
epics::pvData::PVStructure::shared_pointer const & pvRequest)

View File

@@ -24,6 +24,9 @@ namespace ca {
class CAChannel;
typedef std::tr1::shared_ptr<CAChannel> CAChannelPtr;
class CAChannelGetField;
typedef std::tr1::shared_ptr<CAChannelGetField> CAChannelGetFieldPtr;
typedef std::tr1::weak_ptr<CAChannelGetField> CAChannelGetFieldWPtr;
class CAChannelPut;
typedef std::tr1::shared_ptr<CAChannelPut> CAChannelPutPtr;
typedef std::tr1::weak_ptr<CAChannelPut> CAChannelPutWPtr;
@@ -34,8 +37,18 @@ class CAChannelMonitor;
typedef std::tr1::shared_ptr<CAChannelMonitor> CAChannelMonitorPtr;
typedef std::tr1::weak_ptr<CAChannelMonitor> CAChannelMonitorWPtr;
typedef std::tr1::shared_ptr<ChannelBaseRequester> ChannelBaseRequesterPtr;
typedef std::tr1::weak_ptr<ChannelBaseRequester> ChannelBaseRequesterWPtr;
class CAChannelGetField :
public std::tr1::enable_shared_from_this<CAChannelGetField>
{
public:
POINTER_DEFINITIONS(CAChannelGetField);
CAChannelGetField(GetFieldRequester::shared_pointer const & requester,std::string const & subField);
~CAChannelGetField();
void callRequester(CAChannelPtr const & caChannel);
private:
GetFieldRequester::weak_pointer getFieldRequester;
std::string subField;
};
class CAChannel :
public Channel,
@@ -116,10 +129,12 @@ private:
chid channelID;
chtype channelType;
unsigned elementCount;
bool firstConnect;
epics::pvData::Structure::const_shared_pointer structure;
epics::pvData::Mutex requestsMutex;
std::queue<CAChannelGetFieldPtr> getFieldQueue;
std::queue<CAChannelPutPtr> putQueue;
std::queue<CAChannelGetPtr> getQueue;
std::queue<CAChannelMonitorPtr> monitorQueue;