diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 3794ea69f..09ed11f3a 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -43,9 +43,9 @@ void tcpSendThread::start () this->thread.start (); } -bool tcpSendThread::exitWait ( double delay ) +void tcpSendThread::exitWait () { - return thread.exitWait ( delay ); + this->thread.exitWait (); } void tcpSendThread::run () @@ -57,7 +57,7 @@ void tcpSendThread::run () this->iiu.sendThreadFlushEvent.wait (); - if ( this->iiu.state != iiu_connected ) { + if ( this->iiu.state != tcpiiu::iiucs_connected ) { break; } @@ -95,10 +95,24 @@ void tcpSendThread::run () break; } } + if ( this->iiu.state == tcpiiu::iiucs_clean_shutdown ) { + this->iiu.flush (); + } } catch ( ... ) { - this->iiu.printf ("cac: tcp send thread received an unexpected exception - disconnecting\n"); - this->iiu.forcedShutdown (); + this->iiu.printf ( + "cac: tcp send thread received an unexpected exception - disconnecting\n"); + } + + this->iiu.sendDog.cancel (); + + this->iiu.cacRef.initiateAbortShutdown ( this->iiu ); + + // wakeup user threads blocking for send backlog to be reduced + // and wait for them to stop using this IIU + this->iiu.flushBlockEvent.signal (); + while ( this->iiu.blockingForFlush ) { + epicsThreadSleep ( 0.1 ); } } @@ -108,7 +122,8 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, int status; unsigned nBytes = 0u; - if ( this->state != iiu_connected ) { + if ( this->state != iiucs_connected && + this->state != iiucs_clean_shutdown ) { return 0u; } @@ -128,12 +143,13 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, // winsock indicates disconnect by returniing zero here if ( status == 0 ) { - this->state = iiu_disconnected; + this->cacRef.initiateAbortShutdown ( *this ); nBytes = 0u; break; } if ( localError == SOCK_SHUTDOWN ) { + this->cacRef.initiateAbortShutdown ( *this ); nBytes = 0u; break; } @@ -144,10 +160,11 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, if ( localError != SOCK_EPIPE && localError != SOCK_ECONNRESET && localError != SOCK_ETIMEDOUT && localError != SOCK_ECONNABORTED ) { - this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n", SOCKERRSTR (localError) ); + this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n", + SOCKERRSTR ( localError ) ); } - this->state = iiu_disconnected; + this->cacRef.initiateAbortShutdown ( *this ); nBytes = 0u; break; } @@ -160,7 +177,8 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) { - if ( this->state != iiu_connected ) { + if ( this->state != iiucs_connected && + this->state != iiucs_clean_shutdown ) { return 0u; } @@ -172,12 +190,12 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) int localErrno = SOCKERRNO; if ( status == 0 ) { - this->state = iiu_disconnected; + this->cacRef.initiateAbortShutdown ( *this ); return 0u; } if ( localErrno == SOCK_SHUTDOWN ) { - this->state = iiu_disconnected; + this->cacRef.initiateAbortShutdown ( *this ); return 0u; } @@ -186,12 +204,12 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) } if ( localErrno == SOCK_ECONNABORTED ) { - this->state = iiu_disconnected; + this->cacRef.initiateAbortShutdown ( *this ); return 0u; } if ( localErrno == SOCK_ECONNRESET ) { - this->state = iiu_disconnected; + this->cacRef.initiateAbortShutdown ( *this ); return 0u; } @@ -202,7 +220,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) name, SOCKERRSTR ( localErrno ) ); } - this->cleanShutdown (); + this->cacRef.initiateAbortShutdown ( *this ); return 0u; } @@ -225,6 +243,11 @@ void tcpRecvThread::start () this->thread.start (); } +void tcpRecvThread::exitWait () +{ + this->thread.exitWait (); +} + void tcpRecvThread::run () { try { @@ -234,15 +257,16 @@ void tcpRecvThread::run () this->iiu.connect (); - if ( this->iiu.state == iiu_connected ) { + if ( this->iiu.state == tcpiiu::iiucs_connected ) { this->iiu.sendThread.start (); } else { - this->iiu.cleanShutdown (); + return; } comBuf * pComBuf = new comBuf; - while ( this->iiu.state == iiu_connected ) { + while ( this->iiu.state == tcpiiu::iiucs_connected || + this->iiu.state == tcpiiu::iiucs_clean_shutdown ) { if ( ! pComBuf ) { pComBuf = new comBuf; } @@ -267,7 +291,8 @@ void tcpRecvThread::run () nBytesIn = 0u; // make gnu hoppy } - if ( this->iiu.state != iiu_connected ) { + if ( this->iiu.state != tcpiiu::iiucs_connected && + this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { break; } @@ -317,7 +342,7 @@ void tcpRecvThread::run () // execute receive labor bool noProtocolViolation = this->iiu.processIncoming ( guard ); if ( ! noProtocolViolation ) { - this->iiu.state = iiu_disconnected; + this->iiu.cacRef.initiateAbortShutdown ( this->iiu ); break; } @@ -346,15 +371,9 @@ void tcpRecvThread::run () if ( pComBuf ) { pComBuf->destroy (); } - - this->iiu.stopThreads (); - - // arrange for delete through the timer thread - this->iiu.killTimer.start (); } catch ( ... ) { errlogPrintf ("cac tcp receive thread terminating due to a c++ exception\n" ); - this->iiu.killTimer.start (); } } @@ -375,9 +394,8 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, cac::lowestPriorityLevelAbove ( cac::lowestPriorityLevelAbove ( cac.getInitializingThreadsPriority() ) ) ), - recvDog ( *this, connectionTimeout, timerQueue ), - sendDog ( *this, connectionTimeout, timerQueue ), - killTimer ( cac, *this, timerQueue ), + recvDog ( cac, *this, connectionTimeout, timerQueue ), + sendDog ( cac, *this, connectionTimeout, timerQueue ), sendQue ( *this ), curDataMax ( MAX_TCP ), curDataBytes ( 0ul ), @@ -385,7 +403,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, cacRef ( cac ), pCurData ( cac.allocateSmallBufferTCP () ), minorProtocolVersion ( minorVersion ), - state ( iiu_connecting ), + state ( iiucs_connecting ), sock ( INVALID_SOCKET ), contigRecvMsgCount ( 0u ), blockingForFlush ( 0u ), @@ -396,7 +414,6 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, echoRequestPending ( false ), oldMsgHeaderAvailable ( false ), msgHeaderAvailable ( false ), - sockCloseCompleted ( false ), earlyFlush ( false ), recvProcessPostponedFlush ( false ) { @@ -505,9 +522,9 @@ void tcpiiu::connect () epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() ); - if ( this->state == iiu_connecting ) { + if ( this->state == iiucs_connecting ) { // put the iiu into the connected state - this->state = iiu_connected; + this->state = iiucs_connected; // start connection activity watchdog this->recvDog.connectNotify (); @@ -519,7 +536,7 @@ void tcpiiu::connect () int errnoCpy = SOCKERRNO; if ( errnoCpy == SOCK_EINTR ) { - if ( this->state != iiu_connecting ) { + if ( this->state != iiucs_connecting ) { this->sendDog.cancel (); return; } @@ -533,69 +550,48 @@ void tcpiiu::connect () this->sendDog.cancel (); this->printf ( "Unable to connect because %d=\"%s\"\n", errnoCpy, SOCKERRSTR ( errnoCpy ) ); - this->cleanShutdown (); + this->cacRef.initiateAbortShutdown ( *this ); return; } } } -void tcpiiu::cleanShutdown () +void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, + epicsGuard < cacMutex > & guard ) { - this->cacRef.tcpCircuitShutdown ( *this, false ); -} - -void tcpiiu::forcedShutdown () -{ - this->cacRef.tcpCircuitShutdown ( *this, true ); -} - -// -// tcpiiu::shutdown () -// -void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbGuard, - epicsGuard < cacMutex > & guard, - bool discardPendingMessages ) -{ - if ( ! this->sockCloseCompleted ) { - - this->sockCloseCompleted = true; - + if ( this->state != iiucs_abort_shutdown ) { + this->state = iiucs_abort_shutdown; { epicsGuardRelease < cacMutex > guardRelease ( guard ); this->cacRef.notifyDestroyFD ( cbGuard, this->sock ); } - iiu_conn_state oldState = this->state; - this->state = iiu_disconnected; - - if ( discardPendingMessages ) { - // force abortive shutdown sequence - // (discard outstanding sends and receives) - struct linger tmpLinger; - tmpLinger.l_onoff = true; - tmpLinger.l_linger = 0u; - int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER, - reinterpret_cast ( &tmpLinger ), sizeof (tmpLinger) ); - if ( status != 0 ) { - errlogPrintf ( "CAC TCP socket linger set error was %s\n", - SOCKERRSTR (SOCKERRNO) ); - } + // force abortive shutdown sequence + // (discard outstanding sends and receives) + struct linger tmpLinger; + tmpLinger.l_onoff = true; + tmpLinger.l_linger = 0u; + int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER, + reinterpret_cast ( &tmpLinger ), sizeof (tmpLinger) ); + if ( status != 0 ) { + errlogPrintf ( "CAC TCP socket linger set error was %s\n", + SOCKERRSTR (SOCKERRNO) ); } + // linux threads in recv() dont wakeup unless we also // call shutdown ( close() by itself is not enough ) - if ( oldState == iiu_connected ) { - int status = ::shutdown ( this->sock, SD_BOTH ); - if ( status ) { - errlogPrintf ("CAC TCP socket shutdown error was %s\n", - SOCKERRSTR (SOCKERRNO) ); - } + status = ::shutdown ( this->sock, SD_BOTH ); + if ( status ) { + errlogPrintf ("CAC TCP socket shutdown error was %s\n", + SOCKERRSTR (SOCKERRNO) ); } + // // on winsock and probably vxWorks shutdown() does not // unblock a thread in recv() so we use close() and introduce // some complexity because we must unregister the fd early // - int status = socket_close ( this->sock ); + status = socket_close ( this->sock ); if ( status ) { errlogPrintf ("CAC TCP socket close error was %s\n", SOCKERRSTR (SOCKERRNO) ); @@ -604,56 +600,28 @@ void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbGuard, } } -void tcpiiu::stopThreads () -{ - this->cleanShutdown (); - - this->sendDog.cancel (); - this->recvDog.cancel (); - - // wait for send thread to exit - static const double shutdownDelay = 15.0; - while ( ! this->sendThread.exitWait ( shutdownDelay ) ) { - if ( ! this->sockCloseCompleted ) { - printf ( "Gave up waiting for \"shutdown()\" to force send thread to exit after %f sec\n", - shutdownDelay ); - printf ( "Closing socket\n" ); - int status = socket_close ( this->sock ); - if ( status ) { - errlogPrintf ("CAC TCP socket close error was %s\n", - SOCKERRSTR ( SOCKERRNO ) ); - } - else { - this->sockCloseCompleted = true; - } - } - } - - // wakeup user threads blocking for send backlog to be reduced - // and wait for them to stop using this IIU - this->flushBlockEvent.signal (); - while ( this->blockingForFlush ) { - epicsThreadSleep ( 0.1 ); - } - - this->sendDog.cancel (); - this->recvDog.cancel (); -} - // // tcpiiu::~tcpiiu () // tcpiiu::~tcpiiu () { - if ( ! this->sockCloseCompleted ) { + { + epicsGuard < cacMutex > guard ( this->cacRef.mutexRef() ); + if ( this->state == iiucs_connected || this->state == iiucs_connecting ) { + this->state = iiucs_clean_shutdown; + } + } + + this->sendThreadFlushEvent.signal (); + this->sendThread.exitWait (); + this->recvThread.exitWait (); + + if ( this->state != this->iiucs_abort_shutdown ) { int status = socket_close ( this->sock ); if ( status ) { errlogPrintf ("CAC TCP socket close error was %s\n", SOCKERRSTR ( SOCKERRNO ) ); } - else { - this->sockCloseCompleted = true; - } } // free message body cache @@ -1168,19 +1136,13 @@ bool tcpiiu::flush () // ~tcpiiu() will not return while this->blockingForFlush is greater than zero void tcpiiu::blockUntilSendBacklogIsReasonable ( - epicsGuard < callbackMutex > *pCallbackLocker, epicsGuard < cacMutex > & primaryLocker ) + cacNotify & notify, epicsGuard < cacMutex > & primaryLocker ) { assert ( this->blockingForFlush < UINT_MAX ); this->blockingForFlush++; - while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiu_connected ) { + while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiucs_connected ) { epicsGuardRelease < cacMutex > autoRelease ( primaryLocker ); - if ( pCallbackLocker ) { - epicsGuardRelease < callbackMutex > autoReleaseCallback ( *pCallbackLocker ); - this->flushBlockEvent.wait (); - } - else { - this->flushBlockEvent.wait (); - } + notify.blockForEventAndEnableCallbacks ( this->flushBlockEvent, 30.0 ); } assert ( this->blockingForFlush > 0u ); this->blockingForFlush--; @@ -1256,12 +1218,15 @@ void tcpiiu::installChannel ( epicsGuard < cacMutex > &, nciu & chan, unsigned s this->flushRequest (); } -void tcpiiu::uninstallChannel ( epicsGuard < callbackMutex > & cbGuard, - epicsGuard < cacMutex > & guard, nciu & chan ) +tcpiiu * tcpiiu::uninstallChanAndReturnDestroyPtr + ( epicsGuard < cacMutex > & guard, nciu & chan ) { this->channelList.remove ( chan ); - if ( this->channelList.count() == 0u ) { - this->shutdown ( cbGuard, guard, false ); + if ( channelList.count() == 0 ) { + return this; + } + else { + return 0; } }