diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index d93f5ad53..b280e8202 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -181,15 +181,23 @@ void tcpSendThread::run () } this->iiu.sendDog.cancel (); - - // wakeup user threads blocking for send backlog to be reduced - // and wait for them to stop using this IIU - this->iiu.flushBlockEvent.signal (); - while ( this->iiu.blockingForFlush ) { - epicsThreadSleep ( 0.1 ); - } + this->iiu.recvDog.cancel (); this->iiu.recvThread.exitWait (); + + // user threads blocking for send backlog to be reduced + // will abort their attempt to get space if + // the state of the tcpiiu changes from connected to a + // disconnecting state. Nevertheless, we need to wait + // for them to finish prior to destroying the IIU. + { + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + while ( this->iiu.blockingForFlush ) { + epicsGuardRelease < epicsMutex > unguard ( guard ); + epicsThreadSleep ( 0.1 ); + } + } + this->thread.exitWaitRelease (); this->iiu.cacRef.destroyIIU ( this->iiu ); @@ -288,6 +296,19 @@ void tcpiiu::recvBytes ( // bad file descriptor if ( this->state != iiucs_connected && this->state != iiucs_clean_shutdown ) { + // the replacable printf handler isnt called here + // because it reqires a callback lock which probably + // isnt appropriate here + char name[64]; + this->hostNameCacheInstance.hostName ( + name, sizeof ( name ) ); + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + errlogPrintf ( + "Unexpected problem with CA circuit to" + " server \"%s\" was \"%s\" - disconnecting\n", + name, sockErrBuf ); stat.bytesCopied = 0u; stat.circuitState = swioLocalAbort; return; @@ -396,17 +417,6 @@ void tcpRecvThread::run () break; } else if ( stat.circuitState == swioLocalAbort ) { - callbackManager mgr ( this->ctxNotify, this->cbMutex ); - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - char name[64]; - this->iiu.hostName ( guard, name, sizeof ( name ) ); - char sockErrBuf[64]; - epicsSocketConvertErrnoToString ( - sockErrBuf, sizeof ( sockErrBuf ) ); - this->iiu.printf ( mgr.cbGuard, - "Unexpected problem with CA circuit to", - " server \"%s\" was \"%s\" - disconnecting\n", - name, sockErrBuf ); break; } else { @@ -472,16 +482,6 @@ void tcpRecvThread::run () this->iiu.initiateAbortShutdown ( mgr, guard ); } else if ( stat.circuitState == swioLocalAbort ) { - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - char name[64]; - this->iiu.hostName ( guard, name, sizeof ( name ) ); - char sockErrBuf[64]; - epicsSocketConvertErrnoToString ( - sockErrBuf, sizeof ( sockErrBuf ) ); - this->iiu.printf ( mgr.cbGuard, - "Unexpected problem with CA circuit to", - " server \"%s\" was \"%s\" - disconnecting\n", - name, sockErrBuf ); break; } else { @@ -746,6 +746,7 @@ void tcpiiu::initiateCleanShutdown ( if ( this->state == iiucs_connected ) { this->state = iiucs_clean_shutdown; this->sendThreadFlushEvent.signal (); + this->flushBlockEvent.signal (); } } @@ -756,6 +757,7 @@ void tcpiiu::disconnectNotify () this->state = iiucs_disconnected; } this->sendThreadFlushEvent.signal (); + this->flushBlockEvent.signal (); } void tcpiiu::responsiveCircuitNotify ( @@ -807,6 +809,7 @@ void tcpiiu::unresponsiveCircuitNotify ( this->unresponsiveCircuit = true; this->echoRequestPending = true; this->sendThreadFlushEvent.signal (); + this->flushBlockEvent.signal (); this->recvDog.cancel (); this->sendDog.cancel (); @@ -898,6 +901,7 @@ void tcpiiu::initiateAbortShutdown ( // wake up the send thread if it isnt blocking in send() // this->sendThreadFlushEvent.signal (); + this->flushBlockEvent.signal (); } // Disconnect all channels immediately from the timer thread @@ -931,6 +935,8 @@ tcpiiu::~tcpiiu () { this->sendThread.exitWait (); this->recvThread.exitWait (); + this->sendDog.cancel (); + this->recvDog.cancel (); if ( ! this->socketHasBeenClosed ) { epicsSocketDestroy ( this->sock ); @@ -1290,7 +1296,11 @@ void tcpiiu::writeRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 nciu &chan, unsigned type, arrayElementCount nElem, const void *pValue ) { guard.assertIdenticalMutex ( this->mutex ); - + // there are situations where the circuit is disconnected, but + // the channel does not know this yet + if ( this->state != iiucs_connected ) { + throw cacChannel::notConnected (); + } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE, type, nElem, chan.getSID(guard), chan.getCID(guard), pValue, @@ -1308,6 +1318,11 @@ void tcpiiu::writeNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 43 if ( ! this->ca_v41_ok ( guard ) ) { throw cacChannel::unsupportedByService(); } + // there are situations where the circuit is disconnected, but + // the channel does not know this yet + if ( this->state != iiucs_connected ) { + throw cacChannel::notConnected (); + } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE_NOTIFY, type, nElem, chan.getSID(guard), io.getId(), pValue, @@ -1320,7 +1335,11 @@ void tcpiiu::readNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 unsigned dataType, arrayElementCount nElem ) { guard.assertIdenticalMutex ( this->mutex ); - + // there are situations where the circuit is disconnected, but + // the channel does not know this yet + if ( this->state != iiucs_connected ) { + throw cacChannel::notConnected (); + } if ( INVALID_DB_REQ ( dataType ) ) { throw cacChannel::badType (); } @@ -1351,6 +1370,11 @@ void tcpiiu::createChannelRequest ( { guard.assertIdenticalMutex ( this->mutex ); + if ( this->state != iiucs_connected && + this->state != iiucs_connecting ) { + return; + } + const char *pName; unsigned nameLength; ca_uint32_t identity; @@ -1394,7 +1418,11 @@ void tcpiiu::clearChannelRequest ( epicsGuard < epicsMutex > & guard, // X aCC 4 ca_uint32_t sid, ca_uint32_t cid ) { guard.assertIdenticalMutex ( this->mutex ); - + // there are situations where the circuit is disconnected, but + // the channel does not know this yet + if ( this->state != iiucs_connected ) { + return; + } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_CLEAR_CHANNEL, 0u, @@ -1412,8 +1440,10 @@ void tcpiiu::subscriptionRequest ( nciu & chan, netSubscription & subscr ) { guard.assertIdenticalMutex ( this->mutex ); - - if ( ! chan.isConnected ( guard ) ) { + // there are situations where the circuit is disconnected, but + // the channel does not know this yet + if ( this->state != iiucs_connected && + this->state != iiucs_connecting ) { return; } unsigned mask = subscr.getMask(guard); @@ -1456,12 +1486,14 @@ void tcpiiu::subscriptionRequest ( // this routine return void because if this internally fails the best response // is to try again the next time that we reconnect // -void tcpiiu::subscriptionUpdateRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431 - nciu & chan, netSubscription & subscr ) +void tcpiiu::subscriptionUpdateRequest ( + epicsGuard < epicsMutex > & guard, // X aCC 431 + nciu & chan, netSubscription & subscr ) { guard.assertIdenticalMutex ( this->mutex ); - - if ( ! chan.isConnected ( guard ) ) { + // there are situations where the circuit is disconnected, but + // the channel does not know this yet + if ( this->state != iiucs_connected ) { return; } arrayElementCount nElem = subscr.getCount ( guard ); @@ -1493,7 +1525,11 @@ void tcpiiu::subscriptionCancelRequest ( epicsGuard < epicsMutex > & guard, // X nciu & chan, netSubscription & subscr ) { guard.assertIdenticalMutex ( this->mutex ); - + // there are situations where the circuit is disconnected, but + // the channel does not know this yet + if ( this->state != iiucs_connected ) { + return; + } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( CA_PROTO_EVENT_CANCEL, 0u, @@ -1554,6 +1590,13 @@ void tcpiiu::eliminateExcessiveSendBacklog ( { mutualExclusionGuard.assertIdenticalMutex ( this->mutex ); + // this is used to slow down applications that use too much + // send cache, but this is inappropriate when the circuit hasnt + // connected yet. + if ( this->state == iiucs_connecting ) { + return; + } + if ( this->sendQue.flushBlockThreshold ( 0u ) ) { this->flushRequest ( mutualExclusionGuard ); // the process thread is not permitted to flush as this @@ -1568,8 +1611,15 @@ void tcpiiu::eliminateExcessiveSendBacklog ( // pointer to this cac might become invalid assert ( this->blockingForFlush < UINT_MAX ); this->blockingForFlush++; - while ( this->sendQue.flushBlockThreshold(0u) && - this->state == iiucs_connected ) { + while ( this->sendQue.flushBlockThreshold(0u) ) { + + // fail the users request if we have a disconnected + // or unresponsive circuit + if ( this->state != iiucs_connected || + this->unresponsiveCircuit ) { + throw cacChannel::notConnected (); + } + epicsGuardRelease < epicsMutex > autoRelease ( mutualExclusionGuard ); if ( pCallbackGuard ) {