diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index 7a0038415..6609295ac 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -252,7 +252,7 @@ cacChannel::ioStatus nciu::read ( { guard.assertIdenticalMutex ( this->cacCtx.mutexRef () ); - if ( ! this->isConnected ( guard ) ) { + if ( ! this->connected ( guard ) ) { throw cacChannel::notConnected (); } if ( ! this->accessRightState.readPermit () ) { @@ -398,17 +398,11 @@ bool nciu::ca_v42_ok ( short nciu::nativeType ( epicsGuard < epicsMutex > & guard ) const { - short type; - if ( this->channelNode::isConnected ( guard ) ) { + short type = TYPENOTCONN; + if ( this->connected ( guard ) ) { if ( this->typeCode < SHRT_MAX ) { type = static_cast ( this->typeCode ); } - else { - type = TYPENOTCONN; - } - } - else { - type = TYPENOTCONN; } return type; } @@ -416,13 +410,10 @@ short nciu::nativeType ( arrayElementCount nciu::nativeElementCount ( epicsGuard < epicsMutex > & guard ) const { - arrayElementCount countOut; - if ( this->channelNode::isConnected ( guard ) ) { + arrayElementCount countOut = 0ul; + if ( this->connected ( guard ) ) { countOut = this->count; } - else { - countOut = 0ul; - } return countOut; } @@ -455,7 +446,12 @@ double nciu::receiveWatchdogDelay ( bool nciu::connected ( epicsGuard < epicsMutex > & guard ) const { guard.assertIdenticalMutex ( this->cacCtx.mutexRef () ); - return this->channelNode::isConnected ( guard ); + if ( this->piiu->ca_v42_ok ( guard ) ) { + return this->channelNode::isConnectedAtOrAfterV42 ( guard ); + } + else { + return this->channelNode::isConnectedBeforeV42 ( guard ); + } } void nciu::show ( unsigned level ) const @@ -467,7 +463,7 @@ void nciu::show ( unsigned level ) const void nciu::show ( epicsGuard < epicsMutex > & guard, unsigned level ) const { - if ( this->channelNode::isConnected ( guard ) ) { + if ( this->connected ( guard ) ) { char hostNameTmp [256]; this->hostName ( guard, hostNameTmp, sizeof ( hostNameTmp ) ); ::printf ( "Channel \"%s\", connected to server %s", diff --git a/src/ca/nciu.h b/src/ca/nciu.h index a4d09414a..498da80c2 100644 --- a/src/ca/nciu.h +++ b/src/ca/nciu.h @@ -57,7 +57,8 @@ class channelNode : public tsDLNode < class nciu > { protected: channelNode (); - bool isConnected ( epicsGuard < epicsMutex > & ) const; + bool isConnectedAtOrAfterV42 ( epicsGuard < epicsMutex > & ) const; + bool isConnectedBeforeV42 ( epicsGuard < epicsMutex > & ) const; bool isInstalledInServer ( epicsGuard < epicsMutex > & ) const; static unsigned getMaxSearchTimerCount (); private: @@ -351,7 +352,7 @@ inline channelNode::channelNode () : { } -inline bool channelNode::isConnected ( epicsGuard < epicsMutex > & ) const +inline bool channelNode::isConnectedAtOrAfterV42 ( epicsGuard < epicsMutex > & ) const { return this->listMember == cs_connected || @@ -359,6 +360,15 @@ inline bool channelNode::isConnected ( epicsGuard < epicsMutex > & ) const this->listMember == cs_subscripUpdateReqPend; } +inline bool channelNode::isConnectedBeforeV42 ( epicsGuard < epicsMutex > & ) const +{ + return + this->listMember == cs_connected || + this->listMember == cs_subscripReqPend || + this->listMember == cs_subscripUpdateReqPend || + this->listMember == cs_createReqPend; +} + inline bool channelNode::isInstalledInServer ( epicsGuard < epicsMutex > & ) const { return diff --git a/src/ca/tcpRecvWatchdog.cpp b/src/ca/tcpRecvWatchdog.cpp index e81a5677a..3c86be97c 100644 --- a/src/ca/tcpRecvWatchdog.cpp +++ b/src/ca/tcpRecvWatchdog.cpp @@ -115,21 +115,18 @@ void tcpRecvWatchdog::beaconAnomalyNotify ( } void tcpRecvWatchdog::messageArrivalNotify ( + epicsGuard < epicsMutex > & guard, const epicsTime & currentTime ) { - bool restartNeeded = false; - { - epicsGuard < epicsMutex > guard ( this->mutex ); - if ( ! ( this->shuttingDown || this->probeResponsePending ) ) { - this->beaconAnomaly = false; - restartNeeded = true; - } - } - // dont hold the lock for fear of deadlocking - // because cancel is blocking for the completion - // of expire() which takes the lock - it take also - // the callback lock - if ( restartNeeded ) { + guard.assertIdenticalMutex ( this->mutex ); + + if ( ! ( this->shuttingDown || this->probeResponsePending ) ) { + this->beaconAnomaly = false; + // dont hold the lock for fear of deadlocking + // because cancel is blocking for the completion + // of expire() which takes the lock - it take also + // the callback lock + epicsGuardRelease < epicsMutex > unguard ( guard ); this->timer.start ( *this, currentTime + this->period ); debugPrintf ( ("received a message - reseting circuit recv watchdog\n") ); } @@ -203,15 +200,17 @@ void tcpRecvWatchdog::sendBacklogProgressNotify ( } } -void tcpRecvWatchdog::connectNotify () +void tcpRecvWatchdog::connectNotify ( + epicsGuard < epicsMutex > & guard ) { - { - epicsGuard < epicsMutex > guard ( this->mutex ); - if ( this->shuttingDown ) { - return; - } + guard.assertIdenticalMutex ( this->mutex ); + if ( this->shuttingDown ) { + return; + } + { + epicsGuardRelease < epicsMutex > guardRelease ( guard ); + this->timer.start ( *this, this->period ); } - this->timer.start ( *this, this->period ); debugPrintf ( ("connected to the server - initiating circuit recv watchdog\n") ); } diff --git a/src/ca/tcpRecvWatchdog.h b/src/ca/tcpRecvWatchdog.h index bed11fcec..030608f26 100644 --- a/src/ca/tcpRecvWatchdog.h +++ b/src/ca/tcpRecvWatchdog.h @@ -42,6 +42,7 @@ public: epicsGuard < epicsMutex > &, const epicsTime & currentTime ); void messageArrivalNotify ( + epicsGuard < epicsMutex > & guard, const epicsTime & currentTime ); void probeResponseNotify ( epicsGuard < epicsMutex > &, @@ -50,10 +51,11 @@ public: epicsGuard < epicsMutex > &, const epicsTime & currentTime ); void beaconAnomalyNotify ( epicsGuard < epicsMutex > & ); - void connectNotify (); + void connectNotify ( + epicsGuard < epicsMutex > & ); void sendTimeoutNotify ( - epicsGuard < epicsMutex > & cbMutex, - epicsGuard < epicsMutex > & mutex, + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard, const epicsTime & currentTime ); void cancel (); void shutdown (); diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index a4d40f7bf..e980051ac 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -117,9 +117,17 @@ void tcpSendThread::run () while ( nciu * pChan = this->iiu.createReqPend.get () ) { this->iiu.createChannelRequest ( *pChan, guard ); - this->iiu.createRespPend.add ( *pChan ); - pChan->channelNode::listMember = - channelNode::cs_createRespPend; + if ( CA_V42 ( this->iiu.minorProtocolVersion ) ) { + this->iiu.createRespPend.add ( *pChan ); + pChan->channelNode::listMember = + channelNode::cs_createRespPend; + } + else { + this->iiu.subscripReqPend.add ( *pChan ); + pChan->channelNode::listMember = + channelNode::cs_subscripReqPend; + } + if ( this->iiu.sendQue.flushBlockThreshold ( 0u ) ) { laborPending = true; break; @@ -226,8 +234,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, this->sendDog.start ( currentTime ); - while ( this->state == iiucs_connected || - this->state == iiucs_clean_shutdown ) { + while ( true ) { int status = ::send ( this->sock, static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 ); if ( status > 0 ) { @@ -236,13 +243,18 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, break; } else { - int localError = SOCKERRNO; - - // winsock indicates disconnect by returning zero here - if ( status == 0 ) { - this->disconnectNotify (); + epicsGuard < epicsMutex > guard ( this->mutex ); + if ( this->state != iiucs_connected && + this->state != iiucs_clean_shutdown ) { break; } + // winsock indicates disconnect by returning zero here + if ( status == 0 ) { + this->disconnectNotify ( guard ); + break; + } + + int localError = SOCKERRNO; if ( localError == SOCK_EINTR ) { continue; @@ -252,7 +264,10 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, errlogPrintf ( "CAC: system low on network buffers " "- send retry in 15 seconds\n" ); - epicsThreadSleep ( 15.0 ); + { + epicsGuardRelease < epicsMutex > unguard ( guard ); + epicsThreadSleep ( 15.0 ); + } continue; } @@ -269,7 +284,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, sockErrBuf ); } - this->disconnectNotify (); + this->disconnectNotify ( guard ); break; } } @@ -284,9 +299,7 @@ void tcpiiu::recvBytes ( { assert ( nBytesInBuf <= INT_MAX ); - while ( this->state == iiucs_connected || - this->state == iiucs_clean_shutdown ) { - + while ( true ) { int status = ::recv ( this->sock, static_cast ( pBuf ), static_cast ( nBytesInBuf ), 0 ); @@ -297,10 +310,10 @@ void tcpiiu::recvBytes ( return; } else { - int localErrno = SOCKERRNO; + epicsGuard < epicsMutex > guard ( this->mutex ); if ( status == 0 ) { - this->disconnectNotify (); + this->disconnectNotify ( guard ); stat.bytesCopied = 0u; stat.circuitState = swioPeerHangup; return; @@ -315,6 +328,8 @@ void tcpiiu::recvBytes ( return; } + int localErrno = SOCKERRNO; + if ( localErrno == SOCK_SHUTDOWN ) { stat.bytesCopied = 0u; stat.circuitState = swioPeerHangup; @@ -329,7 +344,10 @@ void tcpiiu::recvBytes ( errlogPrintf ( "CAC: system low on network buffers " "- receive retry in 15 seconds\n" ); - epicsThreadSleep ( 15.0 ); + { + epicsGuardRelease < epicsMutex > unguard ( guard ); + epicsThreadSleep ( 15.0 ); + } continue; } @@ -386,25 +404,59 @@ void tcpRecvThread::exitWait () this->thread.exitWait (); } +bool tcpRecvThread::validFillStatus ( + epicsGuard < epicsMutex > & guard, const statusWireIO & stat ) +{ + if ( this->iiu.state != tcpiiu::iiucs_connected && + this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { + return false; + } + if ( stat.circuitState == swioConnected ) { + return true; + } + if ( stat.circuitState == swioPeerHangup || + stat.circuitState == swioPeerAbort ) { + this->iiu.disconnectNotify ( guard ); + } + else if ( stat.circuitState == swioLinkFailure ) { + this->iiu.initiateAbortShutdown ( guard ); + } + else if ( stat.circuitState == swioLocalAbort ) { + // state change already occurred + } + else { + errlogMessage ( "cac: invalid fill status - disconnecting" ); + this->iiu.disconnectNotify ( guard ); + } + return false; +} + void tcpRecvThread::run () { try { - this->iiu.cacRef.attachToClientCtx (); - - epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); - - this->connect (); - - this->iiu.sendThread.start (); - - if ( this->iiu.state != tcpiiu::iiucs_connected ) { - this->iiu.disconnectNotify (); - return; + { + bool connectSuccess = false; + { + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + this->connect ( guard ); + connectSuccess = this->iiu.state == tcpiiu::iiucs_connected; + } + if ( ! connectSuccess ) { + this->iiu.recvDog.shutdown (); + this->thread.exitWaitRelease (); + this->iiu.cacRef.destroyIIU ( this->iiu ); + return; + } } + this->iiu.sendThread.start (); + epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); + this->iiu.cacRef.attachToClientCtx (); + comBuf * pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; - while ( this->iiu.state == tcpiiu::iiucs_connected || - this->iiu.state == tcpiiu::iiucs_clean_shutdown ) { + bool breakOut = false; + while ( ! breakOut ) { + // // if this thread has connected channels with subscriptions // that need to be sent then wakeup the send thread @@ -430,32 +482,21 @@ void tcpRecvThread::run () // statusWireIO stat; pComBuf->fillFromWire ( this->iiu, stat ); - if ( stat.circuitState != swioConnected ) { - if ( stat.circuitState == swioPeerHangup || - stat.circuitState == swioPeerAbort ) { - this->iiu.disconnectNotify (); - } - else if ( stat.circuitState == swioLinkFailure ) { - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - this->iiu.initiateAbortShutdown ( guard ); - break; - } - else if ( stat.circuitState == swioLocalAbort ) { - break; - } - else { - assert ( 0 ); - } - } - - if ( stat.bytesCopied == 0u ) { - continue; - } epicsTime currentTime = epicsTime::getCurrent (); - // reschedule connection activity watchdog - this->iiu.recvDog.messageArrivalNotify ( currentTime ); + { + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + if ( ! this->validFillStatus ( guard, stat ) ) { + break; + } + if ( stat.bytesCopied == 0u ) { + continue; + } + // reschedule connection activity watchdog + this->iiu.recvDog.messageArrivalNotify ( + guard, currentTime ); + } // only one recv thread at a time may call callbacks // - pendEvent() blocks until threads waiting for @@ -488,6 +529,7 @@ void tcpRecvThread::run () if ( ! protocolOK ) { epicsGuard < epicsMutex > guard ( this->iiu.mutex ); this->iiu.initiateAbortShutdown ( guard ); + breakOut = true; break; } @@ -497,20 +539,12 @@ void tcpRecvThread::run () } pComBuf->fillFromWire ( this->iiu, stat ); - if ( stat.circuitState != swioConnected ) { - if ( stat.circuitState == swioPeerHangup ) { - this->iiu.disconnectNotify (); - } - else if ( stat.circuitState == swioLinkFailure ) { - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - this->iiu.initiateAbortShutdown ( guard ); - } - else if ( stat.circuitState == swioLocalAbort ) { + { + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + if ( ! this->validFillStatus ( guard, stat ) ) { + breakOut = true; break; } - else { - assert ( 0 ); - } } } } @@ -542,48 +576,46 @@ void tcpRecvThread::run () /* * tcpRecvThread::connect () */ -void tcpRecvThread::connect () +void tcpRecvThread::connect ( + epicsGuard < epicsMutex > & guard ) { // attempt to connect to a CA server - while ( this->iiu.state == tcpiiu::iiucs_connecting ) { - osiSockAddr tmp = this->iiu.address (); - int status = ::connect ( this->iiu.sock, - & tmp.sa, sizeof ( tmp.sa ) ); - if ( status == 0 ) { - bool enteredConnectedState = false; - { - epicsGuard < epicsMutex > autoMutex ( this->iiu.mutex ); + while ( true ) { + int status; + { + epicsGuardRelease < epicsMutex > unguard ( guard ); + osiSockAddr tmp = this->iiu.address (); + status = ::connect ( this->iiu.sock, + & tmp.sa, sizeof ( tmp.sa ) ); + } - if ( this->iiu.state == tcpiiu::iiucs_connecting ) { - // put the iiu into the connected state - this->iiu.state = tcpiiu::iiucs_connected; - enteredConnectedState = true; - } + if ( this->iiu.state != tcpiiu::iiucs_connecting ) { + break; + } + if ( status >= 0 ) { + // put the iiu into the connected state + this->iiu.state = tcpiiu::iiucs_connected; + this->iiu.recvDog.connectNotify ( guard ); + break; + } + else { + int errnoCpy = SOCKERRNO; + + if ( errnoCpy == SOCK_EINTR ) { + continue; } - if ( enteredConnectedState ) { - // start connection activity watchdog - this->iiu.recvDog.connectNotify (); + else if ( errnoCpy == SOCK_SHUTDOWN ) { + break; + } + else { + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + errlogPrintf ( "CAC: Unable to connect because \"%s\"\n", + sockErrBuf ); + this->iiu.disconnectNotify ( guard ); + break; } - break; - } - - int errnoCpy = SOCKERRNO; - - if ( errnoCpy == SOCK_EINTR ) { - continue; - } - else if ( errnoCpy == SOCK_SHUTDOWN ) { - break; - } - else { - char sockErrBuf[64]; - epicsSocketConvertErrnoToString ( - sockErrBuf, sizeof ( sockErrBuf ) ); - callbackManager mgr ( this->ctxNotify, this->cbMutex ); - this->iiu.printf ( mgr.cbGuard, "Unable to connect because \"%s\"\n", - sockErrBuf ); - this->iiu.disconnectNotify (); - break; } } return; @@ -768,28 +800,30 @@ void tcpiiu::initiateCleanShutdown ( { guard.assertIdenticalMutex ( this->mutex ); if ( this->state == iiucs_connected ) { - if ( ! this->unresponsiveCircuit ) { + if ( this->unresponsiveCircuit ) { + this->initiateAbortShutdown ( guard ); + } + else { this->state = iiucs_clean_shutdown; this->sendThreadFlushEvent.signal (); this->flushBlockEvent.signal (); } - else { - this->initiateAbortShutdown ( guard ); - } } else if ( this->state == iiucs_clean_shutdown ) { if ( this->unresponsiveCircuit ) { this->initiateAbortShutdown ( guard ); } } + else if ( this->state == iiucs_connecting ) { + this->initiateAbortShutdown ( guard ); + } } -void tcpiiu::disconnectNotify () +void tcpiiu::disconnectNotify ( + epicsGuard < epicsMutex > & guard ) { - { - epicsGuard < epicsMutex > guard ( this->mutex ); - this->state = iiucs_disconnected; - } + guard.assertIdenticalMutex ( this->mutex ); + this->state = iiucs_disconnected; this->sendThreadFlushEvent.signal (); this->flushBlockEvent.signal (); } @@ -1314,11 +1348,6 @@ void tcpiiu::writeRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 nciu &chan, unsigned type, arrayElementCount nElem, const void *pValue ) { guard.assertIdenticalMutex ( this->mutex ); - // there are situations where the circuit is disconnected, but - // the channel does not know this yet - if ( this->state != iiucs_connected ) { - throw cacChannel::notConnected (); - } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE, type, nElem, chan.getSID(guard), chan.getCID(guard), pValue, @@ -1336,11 +1365,6 @@ void tcpiiu::writeNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 43 if ( ! this->ca_v41_ok ( guard ) ) { throw cacChannel::unsupportedByService(); } - // there are situations where the circuit is disconnected, but - // the channel does not know this yet - if ( this->state != iiucs_connected ) { - throw cacChannel::notConnected (); - } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE_NOTIFY, type, nElem, chan.getSID(guard), io.getId(), pValue, @@ -1353,11 +1377,6 @@ void tcpiiu::readNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 unsigned dataType, arrayElementCount nElem ) { guard.assertIdenticalMutex ( this->mutex ); - // there are situations where the circuit is disconnected, but - // the channel does not know this yet - if ( this->state != iiucs_connected ) { - throw cacChannel::notConnected (); - } if ( INVALID_DB_REQ ( dataType ) ) { throw cacChannel::badType (); } @@ -1608,13 +1627,6 @@ void tcpiiu::eliminateExcessiveSendBacklog ( { mutualExclusionGuard.assertIdenticalMutex ( this->mutex ); - // this is used to slow down applications that use too much - // send cache, but this is inappropriate when the circuit hasnt - // connected yet. - if ( this->state == iiucs_connecting ) { - return; - } - if ( this->sendQue.flushBlockThreshold ( 0u ) ) { this->flushRequest ( mutualExclusionGuard ); // the process thread is not permitted to flush as this @@ -1631,9 +1643,13 @@ void tcpiiu::eliminateExcessiveSendBacklog ( this->blockingForFlush++; while ( this->sendQue.flushBlockThreshold(0u) ) { + bool userRequestsCanBeAccepted = + this->state == iiucs_connected || + ( ! this->ca_v42_ok ( mutualExclusionGuard ) && + this->state == iiucs_connecting ); // fail the users request if we have a disconnected // or unresponsive circuit - if ( this->state != iiucs_connected || + if ( ! userRequestsCanBeAccepted || this->unresponsiveCircuit ) { throw cacChannel::notConnected (); } @@ -1749,10 +1765,13 @@ void tcpiiu::unlinkAllChannels ( guard.assertIdenticalMutex ( this->mutex ); while ( nciu * pChan = this->createReqPend.get () ) { + // with server prior to V42 IO could exit here + pChan->disconnectAllIO ( cbGuard, guard ); pChan->serviceShutdownNotify ( cbGuard, guard ); } while ( nciu * pChan = this->createRespPend.get () ) { + pChan->disconnectAllIO ( cbGuard, guard ); this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); pChan->serviceShutdownNotify ( cbGuard, guard ); @@ -1813,10 +1832,13 @@ void tcpiiu::connectNotify ( epicsGuard < epicsMutex > & guard, nciu & chan ) { guard.assertIdenticalMutex ( this->mutex ); - - this->createRespPend.remove ( chan ); - this->subscripReqPend.add ( chan ); - chan.channelNode::listMember = channelNode::cs_subscripReqPend; + // this improves robustness in the face of a server sending + // protocol that does not match its declared protocol revision + if ( chan.channelNode::listMember == channelNode::cs_createRespPend ) { + this->createRespPend.remove ( chan ); + this->subscripReqPend.add ( chan ); + chan.channelNode::listMember = channelNode::cs_subscripReqPend; + } // the TCP send thread is awakened by its receive thread whenever the receive thread // is about to block if this->subscripReqPend has items in it } diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index 16aab7f02..dfdd81dd5 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -72,7 +72,11 @@ private: epicsMutex & cbMutex; cacContextNotify & ctxNotify; void run (); - void connect (); + void connect ( + epicsGuard < epicsMutex > & guard ); + bool validFillStatus ( + epicsGuard < epicsMutex > & guard, + const statusWireIO & stat ); }; class tcpSendThread : private epicsThreadRunable { @@ -250,7 +254,8 @@ private: epicsGuard < epicsMutex > & ); void initiateAbortShutdown ( epicsGuard < epicsMutex > & ); - void disconnectNotify (); + void disconnectNotify ( + epicsGuard < epicsMutex > & ); // send protocol stubs void echoRequest (