diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 2cab0c40a..80d8d48ea 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -88,7 +88,7 @@ void tcpiiu::connect () * attempt to connect to a CA server */ this->armSendWatchdog (); - while (1) { + while ( ! this->sockCloseCompleted ) { int errnoCpy; osiSockAddr addr = this->ipToA.address (); @@ -211,6 +211,10 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned nBytesInBuf ) int status; unsigned nBytes; + if ( this->state != iiu_connected ) { + return 0u; + } + assert ( nBytesInBuf <= INT_MAX ); this->clientCtx ().enableCallbackPreemption (); this->armSendWatchdog (); @@ -388,7 +392,8 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn, recvMessagePending ( false ), flushPending ( false ), msgHeaderAvailable ( false ), - claimsPending ( false ) + claimsPending ( false ), + sockCloseCompleted ( false ) { SOCKET newSocket; int status; @@ -532,6 +537,9 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn, */ void tcpiiu::cleanShutdown () { + this->cancelSendWatchdog (); + this->cancelRecvWatchdog (); + this->lock (); if ( this->state != iiu_disconnected ) { this->state = iiu_disconnected; @@ -545,7 +553,7 @@ void tcpiiu::cleanShutdown () SOCKERRSTR (SOCKERRNO) ); } else { - this->sock = INVALID_SOCKET; + this->sockCloseCompleted = true; } } semBinaryGive ( this->sendThreadFlushSignal ); @@ -560,15 +568,18 @@ void tcpiiu::cleanShutdown () */ void tcpiiu::forcedShutdown () { + this->cancelSendWatchdog (); + this->cancelRecvWatchdog (); + this->lock (); if ( this->state != iiu_disconnected ) { this->state = iiu_disconnected; + // force abortive shutdown sequence (discard outstanding sends + // and receives struct linger tmpLinger; tmpLinger.l_onoff = true; tmpLinger.l_linger = 0u; - - // force abortive clousure int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER, reinterpret_cast ( &tmpLinger ), sizeof (tmpLinger) ); if ( status != 0 ) { @@ -582,7 +593,7 @@ void tcpiiu::forcedShutdown () SOCKERRSTR (SOCKERRNO) ); } else { - this->sock = INVALID_SOCKET; + this->sockCloseCompleted = true; } semBinaryGive ( this->sendThreadFlushSignal ); @@ -616,9 +627,51 @@ tcpiiu::~tcpiiu () this->cleanShutdown (); - // wait for send and recv threads to exit - semBinaryMustTake ( this->sendThreadExitSignal ); - semBinaryMustTake ( this->recvThreadExitSignal ); + // wait for send thread to exit + static const double shutdownDelay = 15.0; + semTakeStatus semStat; + while ( true ) { + semStat = semBinaryTakeTimeout ( this->sendThreadExitSignal, shutdownDelay ); + if ( semStat == semTakeOK ) { + break; + } + assert ( semStat == semTakeTimeout ); + if ( ! this->sockCloseCompleted ) { + printf ( "Gave up waiting for \"shutdown()\" to force send thread to exit after %f sec\n", + shutdownDelay); + printf ( "Closing socket\n" ); + int status = socket_close ( this->sock ); + if ( status ) { + errlogPrintf ("CAC TCP socket close error was %s\n", + SOCKERRSTR ( SOCKERRNO ) ); + } + else { + this->sockCloseCompleted = true; + } + } + } + + // wait for recv thread to exit + while ( true ) { + semStat = semBinaryTakeTimeout ( this->recvThreadExitSignal, shutdownDelay ); + if ( semStat == semTakeOK ) { + break; + } + assert ( semStat == semTakeTimeout ); + if ( ! this->sockCloseCompleted ) { + printf ( "Gave up waiting for \"shutdown()\" to force receive thread to exit after %f sec\n", + shutdownDelay); + printf ( "Closing socket\n" ); + int status = socket_close ( this->sock ); + if ( status ) { + errlogPrintf ("CAC TCP socket close error was %s\n", + SOCKERRSTR ( SOCKERRNO ) ); + } + else { + this->sockCloseCompleted = true; + } + } + } semBinaryDestroy ( this->sendThreadExitSignal ); semBinaryDestroy ( this->recvThreadExitSignal ); @@ -632,7 +685,7 @@ tcpiiu::~tcpiiu () free ( this->pCurData ); } - if ( this->sock != INVALID_SOCKET ) { + if ( ! this->sockCloseCompleted ) { int status = socket_close ( this->sock ); if ( status ) { errlogPrintf ("CAC TCP socket close error was %s\n",