improved shutdown behavior
This commit is contained in:
@@ -88,7 +88,7 @@ void tcpiiu::connect ()
|
||||
* attempt to connect to a CA server
|
||||
*/
|
||||
this->armSendWatchdog ();
|
||||
while (1) {
|
||||
while ( ! this->sockCloseCompleted ) {
|
||||
int errnoCpy;
|
||||
|
||||
osiSockAddr addr = this->ipToA.address ();
|
||||
@@ -211,6 +211,10 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned nBytesInBuf )
|
||||
int status;
|
||||
unsigned nBytes;
|
||||
|
||||
if ( this->state != iiu_connected ) {
|
||||
return 0u;
|
||||
}
|
||||
|
||||
assert ( nBytesInBuf <= INT_MAX );
|
||||
this->clientCtx ().enableCallbackPreemption ();
|
||||
this->armSendWatchdog ();
|
||||
@@ -388,7 +392,8 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn,
|
||||
recvMessagePending ( false ),
|
||||
flushPending ( false ),
|
||||
msgHeaderAvailable ( false ),
|
||||
claimsPending ( false )
|
||||
claimsPending ( false ),
|
||||
sockCloseCompleted ( false )
|
||||
{
|
||||
SOCKET newSocket;
|
||||
int status;
|
||||
@@ -532,6 +537,9 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn,
|
||||
*/
|
||||
void tcpiiu::cleanShutdown ()
|
||||
{
|
||||
this->cancelSendWatchdog ();
|
||||
this->cancelRecvWatchdog ();
|
||||
|
||||
this->lock ();
|
||||
if ( this->state != iiu_disconnected ) {
|
||||
this->state = iiu_disconnected;
|
||||
@@ -545,7 +553,7 @@ void tcpiiu::cleanShutdown ()
|
||||
SOCKERRSTR (SOCKERRNO) );
|
||||
}
|
||||
else {
|
||||
this->sock = INVALID_SOCKET;
|
||||
this->sockCloseCompleted = true;
|
||||
}
|
||||
}
|
||||
semBinaryGive ( this->sendThreadFlushSignal );
|
||||
@@ -560,15 +568,18 @@ void tcpiiu::cleanShutdown ()
|
||||
*/
|
||||
void tcpiiu::forcedShutdown ()
|
||||
{
|
||||
this->cancelSendWatchdog ();
|
||||
this->cancelRecvWatchdog ();
|
||||
|
||||
this->lock ();
|
||||
if ( this->state != iiu_disconnected ) {
|
||||
this->state = iiu_disconnected;
|
||||
|
||||
// force abortive shutdown sequence (discard outstanding sends
|
||||
// and receives
|
||||
struct linger tmpLinger;
|
||||
tmpLinger.l_onoff = true;
|
||||
tmpLinger.l_linger = 0u;
|
||||
|
||||
// force abortive clousure
|
||||
int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER,
|
||||
reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
|
||||
if ( status != 0 ) {
|
||||
@@ -582,7 +593,7 @@ void tcpiiu::forcedShutdown ()
|
||||
SOCKERRSTR (SOCKERRNO) );
|
||||
}
|
||||
else {
|
||||
this->sock = INVALID_SOCKET;
|
||||
this->sockCloseCompleted = true;
|
||||
}
|
||||
|
||||
semBinaryGive ( this->sendThreadFlushSignal );
|
||||
@@ -616,9 +627,51 @@ tcpiiu::~tcpiiu ()
|
||||
|
||||
this->cleanShutdown ();
|
||||
|
||||
// wait for send and recv threads to exit
|
||||
semBinaryMustTake ( this->sendThreadExitSignal );
|
||||
semBinaryMustTake ( this->recvThreadExitSignal );
|
||||
// wait for send thread to exit
|
||||
static const double shutdownDelay = 15.0;
|
||||
semTakeStatus semStat;
|
||||
while ( true ) {
|
||||
semStat = semBinaryTakeTimeout ( this->sendThreadExitSignal, shutdownDelay );
|
||||
if ( semStat == semTakeOK ) {
|
||||
break;
|
||||
}
|
||||
assert ( semStat == semTakeTimeout );
|
||||
if ( ! this->sockCloseCompleted ) {
|
||||
printf ( "Gave up waiting for \"shutdown()\" to force send thread to exit after %f sec\n",
|
||||
shutdownDelay);
|
||||
printf ( "Closing socket\n" );
|
||||
int status = socket_close ( this->sock );
|
||||
if ( status ) {
|
||||
errlogPrintf ("CAC TCP socket close error was %s\n",
|
||||
SOCKERRSTR ( SOCKERRNO ) );
|
||||
}
|
||||
else {
|
||||
this->sockCloseCompleted = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wait for recv thread to exit
|
||||
while ( true ) {
|
||||
semStat = semBinaryTakeTimeout ( this->recvThreadExitSignal, shutdownDelay );
|
||||
if ( semStat == semTakeOK ) {
|
||||
break;
|
||||
}
|
||||
assert ( semStat == semTakeTimeout );
|
||||
if ( ! this->sockCloseCompleted ) {
|
||||
printf ( "Gave up waiting for \"shutdown()\" to force receive thread to exit after %f sec\n",
|
||||
shutdownDelay);
|
||||
printf ( "Closing socket\n" );
|
||||
int status = socket_close ( this->sock );
|
||||
if ( status ) {
|
||||
errlogPrintf ("CAC TCP socket close error was %s\n",
|
||||
SOCKERRSTR ( SOCKERRNO ) );
|
||||
}
|
||||
else {
|
||||
this->sockCloseCompleted = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
semBinaryDestroy ( this->sendThreadExitSignal );
|
||||
semBinaryDestroy ( this->recvThreadExitSignal );
|
||||
@@ -632,7 +685,7 @@ tcpiiu::~tcpiiu ()
|
||||
free ( this->pCurData );
|
||||
}
|
||||
|
||||
if ( this->sock != INVALID_SOCKET ) {
|
||||
if ( ! this->sockCloseCompleted ) {
|
||||
int status = socket_close ( this->sock );
|
||||
if ( status ) {
|
||||
errlogPrintf ("CAC TCP socket close error was %s\n",
|
||||
|
||||
Reference in New Issue
Block a user