improved shutdown sequence

This commit is contained in:
Jeff Hill
2002-05-08 23:29:57 +00:00
parent 6729deed11
commit 8289f8550b
+95 -130
View File
@@ -43,9 +43,9 @@ void tcpSendThread::start ()
this->thread.start ();
}
bool tcpSendThread::exitWait ( double delay )
void tcpSendThread::exitWait ()
{
return thread.exitWait ( delay );
this->thread.exitWait ();
}
void tcpSendThread::run ()
@@ -57,7 +57,7 @@ void tcpSendThread::run ()
this->iiu.sendThreadFlushEvent.wait ();
if ( this->iiu.state != iiu_connected ) {
if ( this->iiu.state != tcpiiu::iiucs_connected ) {
break;
}
@@ -95,10 +95,24 @@ void tcpSendThread::run ()
break;
}
}
if ( this->iiu.state == tcpiiu::iiucs_clean_shutdown ) {
this->iiu.flush ();
}
}
catch ( ... ) {
this->iiu.printf ("cac: tcp send thread received an unexpected exception - disconnecting\n");
this->iiu.forcedShutdown ();
this->iiu.printf (
"cac: tcp send thread received an unexpected exception - disconnecting\n");
}
this->iiu.sendDog.cancel ();
this->iiu.cacRef.initiateAbortShutdown ( this->iiu );
// wakeup user threads blocking for send backlog to be reduced
// and wait for them to stop using this IIU
this->iiu.flushBlockEvent.signal ();
while ( this->iiu.blockingForFlush ) {
epicsThreadSleep ( 0.1 );
}
}
@@ -108,7 +122,8 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
int status;
unsigned nBytes = 0u;
if ( this->state != iiu_connected ) {
if ( this->state != iiucs_connected &&
this->state != iiucs_clean_shutdown ) {
return 0u;
}
@@ -128,12 +143,13 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
// winsock indicates disconnect by returniing zero here
if ( status == 0 ) {
this->state = iiu_disconnected;
this->cacRef.initiateAbortShutdown ( *this );
nBytes = 0u;
break;
}
if ( localError == SOCK_SHUTDOWN ) {
this->cacRef.initiateAbortShutdown ( *this );
nBytes = 0u;
break;
}
@@ -144,10 +160,11 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
if ( localError != SOCK_EPIPE && localError != SOCK_ECONNRESET &&
localError != SOCK_ETIMEDOUT && localError != SOCK_ECONNABORTED ) {
this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n", SOCKERRSTR (localError) );
this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n",
SOCKERRSTR ( localError ) );
}
this->state = iiu_disconnected;
this->cacRef.initiateAbortShutdown ( *this );
nBytes = 0u;
break;
}
@@ -160,7 +177,8 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
{
if ( this->state != iiu_connected ) {
if ( this->state != iiucs_connected &&
this->state != iiucs_clean_shutdown ) {
return 0u;
}
@@ -172,12 +190,12 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
int localErrno = SOCKERRNO;
if ( status == 0 ) {
this->state = iiu_disconnected;
this->cacRef.initiateAbortShutdown ( *this );
return 0u;
}
if ( localErrno == SOCK_SHUTDOWN ) {
this->state = iiu_disconnected;
this->cacRef.initiateAbortShutdown ( *this );
return 0u;
}
@@ -186,12 +204,12 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
}
if ( localErrno == SOCK_ECONNABORTED ) {
this->state = iiu_disconnected;
this->cacRef.initiateAbortShutdown ( *this );
return 0u;
}
if ( localErrno == SOCK_ECONNRESET ) {
this->state = iiu_disconnected;
this->cacRef.initiateAbortShutdown ( *this );
return 0u;
}
@@ -202,7 +220,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
name, SOCKERRSTR ( localErrno ) );
}
this->cleanShutdown ();
this->cacRef.initiateAbortShutdown ( *this );
return 0u;
}
@@ -225,6 +243,11 @@ void tcpRecvThread::start ()
this->thread.start ();
}
void tcpRecvThread::exitWait ()
{
this->thread.exitWait ();
}
void tcpRecvThread::run ()
{
try {
@@ -234,15 +257,16 @@ void tcpRecvThread::run ()
this->iiu.connect ();
if ( this->iiu.state == iiu_connected ) {
if ( this->iiu.state == tcpiiu::iiucs_connected ) {
this->iiu.sendThread.start ();
}
else {
this->iiu.cleanShutdown ();
return;
}
comBuf * pComBuf = new comBuf;
while ( this->iiu.state == iiu_connected ) {
while ( this->iiu.state == tcpiiu::iiucs_connected ||
this->iiu.state == tcpiiu::iiucs_clean_shutdown ) {
if ( ! pComBuf ) {
pComBuf = new comBuf;
}
@@ -267,7 +291,8 @@ void tcpRecvThread::run ()
nBytesIn = 0u; // make gnu hoppy
}
if ( this->iiu.state != iiu_connected ) {
if ( this->iiu.state != tcpiiu::iiucs_connected &&
this->iiu.state != tcpiiu::iiucs_clean_shutdown ) {
break;
}
@@ -317,7 +342,7 @@ void tcpRecvThread::run ()
// execute receive labor
bool noProtocolViolation = this->iiu.processIncoming ( guard );
if ( ! noProtocolViolation ) {
this->iiu.state = iiu_disconnected;
this->iiu.cacRef.initiateAbortShutdown ( this->iiu );
break;
}
@@ -346,15 +371,9 @@ void tcpRecvThread::run ()
if ( pComBuf ) {
pComBuf->destroy ();
}
this->iiu.stopThreads ();
// arrange for delete through the timer thread
this->iiu.killTimer.start ();
}
catch ( ... ) {
errlogPrintf ("cac tcp receive thread terminating due to a c++ exception\n" );
this->iiu.killTimer.start ();
}
}
@@ -375,9 +394,8 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
cac::lowestPriorityLevelAbove (
cac::lowestPriorityLevelAbove (
cac.getInitializingThreadsPriority() ) ) ),
recvDog ( *this, connectionTimeout, timerQueue ),
sendDog ( *this, connectionTimeout, timerQueue ),
killTimer ( cac, *this, timerQueue ),
recvDog ( cac, *this, connectionTimeout, timerQueue ),
sendDog ( cac, *this, connectionTimeout, timerQueue ),
sendQue ( *this ),
curDataMax ( MAX_TCP ),
curDataBytes ( 0ul ),
@@ -385,7 +403,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
cacRef ( cac ),
pCurData ( cac.allocateSmallBufferTCP () ),
minorProtocolVersion ( minorVersion ),
state ( iiu_connecting ),
state ( iiucs_connecting ),
sock ( INVALID_SOCKET ),
contigRecvMsgCount ( 0u ),
blockingForFlush ( 0u ),
@@ -396,7 +414,6 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
echoRequestPending ( false ),
oldMsgHeaderAvailable ( false ),
msgHeaderAvailable ( false ),
sockCloseCompleted ( false ),
earlyFlush ( false ),
recvProcessPostponedFlush ( false )
{
@@ -505,9 +522,9 @@ void tcpiiu::connect ()
epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() );
if ( this->state == iiu_connecting ) {
if ( this->state == iiucs_connecting ) {
// put the iiu into the connected state
this->state = iiu_connected;
this->state = iiucs_connected;
// start connection activity watchdog
this->recvDog.connectNotify ();
@@ -519,7 +536,7 @@ void tcpiiu::connect ()
int errnoCpy = SOCKERRNO;
if ( errnoCpy == SOCK_EINTR ) {
if ( this->state != iiu_connecting ) {
if ( this->state != iiucs_connecting ) {
this->sendDog.cancel ();
return;
}
@@ -533,69 +550,48 @@ void tcpiiu::connect ()
this->sendDog.cancel ();
this->printf ( "Unable to connect because %d=\"%s\"\n",
errnoCpy, SOCKERRSTR ( errnoCpy ) );
this->cleanShutdown ();
this->cacRef.initiateAbortShutdown ( *this );
return;
}
}
}
void tcpiiu::cleanShutdown ()
void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard )
{
this->cacRef.tcpCircuitShutdown ( *this, false );
}
void tcpiiu::forcedShutdown ()
{
this->cacRef.tcpCircuitShutdown ( *this, true );
}
//
// tcpiiu::shutdown ()
//
void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard,
bool discardPendingMessages )
{
if ( ! this->sockCloseCompleted ) {
this->sockCloseCompleted = true;
if ( this->state != iiucs_abort_shutdown ) {
this->state = iiucs_abort_shutdown;
{
epicsGuardRelease < cacMutex > guardRelease ( guard );
this->cacRef.notifyDestroyFD ( cbGuard, this->sock );
}
iiu_conn_state oldState = this->state;
this->state = iiu_disconnected;
if ( discardPendingMessages ) {
// force abortive shutdown sequence
// (discard outstanding sends and receives)
struct linger tmpLinger;
tmpLinger.l_onoff = true;
tmpLinger.l_linger = 0u;
int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER,
reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
if ( status != 0 ) {
errlogPrintf ( "CAC TCP socket linger set error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
// force abortive shutdown sequence
// (discard outstanding sends and receives)
struct linger tmpLinger;
tmpLinger.l_onoff = true;
tmpLinger.l_linger = 0u;
int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER,
reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
if ( status != 0 ) {
errlogPrintf ( "CAC TCP socket linger set error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
// linux threads in recv() dont wakeup unless we also
// call shutdown ( close() by itself is not enough )
if ( oldState == iiu_connected ) {
int status = ::shutdown ( this->sock, SD_BOTH );
if ( status ) {
errlogPrintf ("CAC TCP socket shutdown error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
status = ::shutdown ( this->sock, SD_BOTH );
if ( status ) {
errlogPrintf ("CAC TCP socket shutdown error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
//
// on winsock and probably vxWorks shutdown() does not
// unblock a thread in recv() so we use close() and introduce
// some complexity because we must unregister the fd early
//
int status = socket_close ( this->sock );
status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC TCP socket close error was %s\n",
SOCKERRSTR (SOCKERRNO) );
@@ -604,56 +600,28 @@ void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbGuard,
}
}
void tcpiiu::stopThreads ()
{
this->cleanShutdown ();
this->sendDog.cancel ();
this->recvDog.cancel ();
// wait for send thread to exit
static const double shutdownDelay = 15.0;
while ( ! this->sendThread.exitWait ( shutdownDelay ) ) {
if ( ! this->sockCloseCompleted ) {
printf ( "Gave up waiting for \"shutdown()\" to force send thread to exit after %f sec\n",
shutdownDelay );
printf ( "Closing socket\n" );
int status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC TCP socket close error was %s\n",
SOCKERRSTR ( SOCKERRNO ) );
}
else {
this->sockCloseCompleted = true;
}
}
}
// wakeup user threads blocking for send backlog to be reduced
// and wait for them to stop using this IIU
this->flushBlockEvent.signal ();
while ( this->blockingForFlush ) {
epicsThreadSleep ( 0.1 );
}
this->sendDog.cancel ();
this->recvDog.cancel ();
}
//
// tcpiiu::~tcpiiu ()
//
tcpiiu::~tcpiiu ()
{
if ( ! this->sockCloseCompleted ) {
{
epicsGuard < cacMutex > guard ( this->cacRef.mutexRef() );
if ( this->state == iiucs_connected || this->state == iiucs_connecting ) {
this->state = iiucs_clean_shutdown;
}
}
this->sendThreadFlushEvent.signal ();
this->sendThread.exitWait ();
this->recvThread.exitWait ();
if ( this->state != this->iiucs_abort_shutdown ) {
int status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC TCP socket close error was %s\n",
SOCKERRSTR ( SOCKERRNO ) );
}
else {
this->sockCloseCompleted = true;
}
}
// free message body cache
@@ -1168,19 +1136,13 @@ bool tcpiiu::flush ()
// ~tcpiiu() will not return while this->blockingForFlush is greater than zero
void tcpiiu::blockUntilSendBacklogIsReasonable (
epicsGuard < callbackMutex > *pCallbackLocker, epicsGuard < cacMutex > & primaryLocker )
cacNotify & notify, epicsGuard < cacMutex > & primaryLocker )
{
assert ( this->blockingForFlush < UINT_MAX );
this->blockingForFlush++;
while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiu_connected ) {
while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiucs_connected ) {
epicsGuardRelease < cacMutex > autoRelease ( primaryLocker );
if ( pCallbackLocker ) {
epicsGuardRelease < callbackMutex > autoReleaseCallback ( *pCallbackLocker );
this->flushBlockEvent.wait ();
}
else {
this->flushBlockEvent.wait ();
}
notify.blockForEventAndEnableCallbacks ( this->flushBlockEvent, 30.0 );
}
assert ( this->blockingForFlush > 0u );
this->blockingForFlush--;
@@ -1256,12 +1218,15 @@ void tcpiiu::installChannel ( epicsGuard < cacMutex > &, nciu & chan, unsigned s
this->flushRequest ();
}
void tcpiiu::uninstallChannel ( epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard, nciu & chan )
tcpiiu * tcpiiu::uninstallChanAndReturnDestroyPtr
( epicsGuard < cacMutex > & guard, nciu & chan )
{
this->channelList.remove ( chan );
if ( this->channelList.count() == 0u ) {
this->shutdown ( cbGuard, guard, false );
if ( channelList.count() == 0 ) {
return this;
}
else {
return 0;
}
}