fix incorrect logic

This commit is contained in:
mrkraimer
2017-11-18 07:05:34 -05:00
parent 32d0ece858
commit e4182d0b00
2 changed files with 81 additions and 74 deletions

View File

@@ -238,6 +238,9 @@ void CAChannel::connected()
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::connected " << channelName << endl;
}
std::queue<CAChannelPutPtr> putQ;
std::queue<CAChannelGetPtr> getQ;
std::queue<CAChannelMonitorPtr> monitorQ;
{
Lock lock(requestsMutex);
// we assume array if element count > 1
@@ -258,62 +261,53 @@ void CAChannel::connected()
// TODO we need only Structure here
this->structure = structure;
std::vector<CAChannelGetWPtr>::const_iterator getiter;
for (getiter = getList.begin(); getiter != getList.end(); ++getiter) {
CAChannelGetPtr temp = (*getiter).lock();
if(!temp) continue;
getQ.push(temp);
}
std::vector<CAChannelPutWPtr>::const_iterator putiter;
for (putiter = putList.begin(); putiter != putList.end(); ++putiter) {
CAChannelPutPtr temp = (*putiter).lock();
if(!temp) continue;
putQ.push(temp);
}
std::vector<CAChannelMonitorWPtr>::const_iterator monitoriter;
for (monitoriter = monitorList.begin(); monitoriter != monitorList.end(); ++monitoriter) {
CAChannelMonitorPtr temp = (*monitoriter).lock();
if(!temp) continue;
monitorQ.push(temp);
}
}
while(!putQ.empty()) {
putQ.front()->channelCreated(Status::Ok,shared_from_this());
putQ.pop();
}
while(!getQ.empty()) {
getQ.front()->channelCreated(Status::Ok,shared_from_this());
getQ.pop();
}
while(!monitorQ.empty()) {
monitorQ.front()->channelCreated(Status::Ok,shared_from_this());
monitorQ.pop();
}
while(!getFieldQueue.empty()) {
getFieldQueue.front()->callRequester(shared_from_this());
getFieldQueue.pop();
}
if(firstConnect) {
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
}
while(!getQueue.empty()) {
getQueue.front()->activate();
getQueue.pop();
}
while(!monitorQueue.empty()) {
monitorQueue.front()->activate();
monitorQueue.pop();
}
} else {
firstConnect = false;
std::queue<CAChannelPutPtr> putQ;
std::queue<CAChannelGetPtr> getQ;
std::queue<CAChannelMonitorPtr> monitorQ;
{
Lock lock(requestsMutex);
std::vector<CAChannelGetWPtr>::const_iterator getiter;
for (getiter = getList.begin(); getiter != getList.end(); ++getiter) {
CAChannelGetPtr temp = (*getiter).lock();
if(!temp) continue;
getQ.push(temp);
}
std::vector<CAChannelPutWPtr>::const_iterator putiter;
for (putiter = putList.begin(); putiter != putList.end(); ++putiter) {
CAChannelPutPtr temp = (*putiter).lock();
if(!temp) continue;
putQ.push(temp);
}
std::vector<CAChannelMonitorWPtr>::const_iterator monitoriter;
for (monitoriter = monitorList.begin(); monitoriter != monitorList.end(); ++monitoriter) {
CAChannelMonitorPtr temp = (*monitoriter).lock();
if(!temp) continue;
monitorQ.push(temp);
}
}
while(!putQ.empty()) {
putQ.front()->channelCreated(Status::Ok,shared_from_this());
putQ.pop();
}
while(!getQ.empty()) {
getQ.front()->channelCreated(Status::Ok,shared_from_this());
getQ.pop();
}
while(!monitorQ.empty()) {
monitorQ.front()->channelCreated(Status::Ok,shared_from_this());
monitorQ.pop();
}
while(!putQueue.empty()) {
putQueue.front()->activate();
putQueue.pop();
}
while(!getQueue.empty()) {
getQueue.front()->activate();
getQueue.pop();
}
while(!monitorQueue.empty()) {
monitorQueue.front()->activate();
monitorQueue.pop();
}
ChannelRequester::shared_pointer req(channelRequester.lock());
if(req) {
@@ -380,8 +374,7 @@ CAChannel::CAChannel(std::string const & _channelName,
channelRequester(_channelRequester),
channelID(0),
channelType(0),
elementCount(0),
firstConnect(true)
elementCount(0)
{
if(DEBUG_LEVEL>0) {
cout<< "CAChannel::CAChannel " << channelName << endl;
@@ -533,11 +526,14 @@ void CAChannel::getField(GetFieldRequester::shared_pointer const & requester,
cout << "CAChannel::getField " << channelName << endl;
}
CAChannelGetFieldPtr getField(new CAChannelGetField(requester,subField));
if(getConnectionState()==Channel::CONNECTED) {
getField->callRequester(shared_from_this());
} else {
getFieldQueue.push(getField);
}
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
getFieldQueue.push(getField);
return;
}
}
getField->callRequester(shared_from_this());
}
@@ -559,12 +555,16 @@ ChannelGet::shared_pointer CAChannel::createChannelGet(
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createChannelGet " << channelName << endl;
}
CAChannelGetPtr channelGet = CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelGet->activate();
} else {
getQueue.push(channelGet);
CAChannelGetPtr channelGet =
CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
getQueue.push(channelGet);
return channelGet;
}
}
channelGet->activate();
return channelGet;
}
@@ -576,12 +576,16 @@ ChannelPut::shared_pointer CAChannel::createChannelPut(
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createChannelPut " << channelName << endl;
}
CAChannelPutPtr channelPut = CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelPut->activate();
} else {
putQueue.push(channelPut);
CAChannelPutPtr channelPut =
CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
putQueue.push(channelPut);
return channelPut;
}
}
channelPut->activate();
return channelPut;
}
@@ -593,12 +597,16 @@ Monitor::shared_pointer CAChannel::createMonitor(
if(DEBUG_LEVEL>0) {
cout << "CAChannel::createMonitor " << channelName << endl;
}
CAChannelMonitorPtr channelMonitor = CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);\
if(getConnectionState()==Channel::CONNECTED) {
channelMonitor->activate();
} else {
monitorQueue.push(channelMonitor);
CAChannelMonitorPtr channelMonitor =
CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);
{
Lock lock(requestsMutex);
if(getConnectionState()!=Channel::CONNECTED) {
monitorQueue.push(channelMonitor);
return channelMonitor;
}
}
channelMonitor->activate();
return channelMonitor;
}