From 4794bf11bc203f4e4277ced3cb72aaef98db7160 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 24 Apr 2003 16:39:31 +0000 Subject: [PATCH] cleaned up shutdown procedure --- src/ca/cac.cpp | 11 ++- src/ca/tcpiiu.cpp | 178 ++++++++++++++++++++++++++-------------------- 2 files changed, 111 insertions(+), 78 deletions(-) diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 113a43aa0..72be0704e 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -161,6 +161,13 @@ cac::cac ( cacNotify & notifyIn ) : try { long status; + /* + * Certain os, such as HPUX, do not unblock a socket system call + * when another thread asynchronously calls both shutdown() and + * close(). To solve this problem we need to employ OS specific + * mechanisms. + */ + epicsSignalInstallSigUrgIgnore (); epicsSignalInstallSigPipeIgnore (); { @@ -1507,6 +1514,8 @@ void cac::initiateAbortShutdown ( tcpiiu & iiu ) epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); epicsGuard < cacMutex > guard ( this->mutex ); + iiu.initiateAbortShutdown ( cbGuard, guard ); + // Disconnect all channels immediately from the timer thread // because on certain OS such as HPUX it's difficult to // unblock a blocking send() call, and we need immediate @@ -1517,8 +1526,6 @@ void cac::initiateAbortShutdown ( tcpiiu & iiu ) genLocalExcep ( cbGuard, *this, ECA_DISCONN, hostNameTmp ); } iiu.removeAllChannels ( cbGuard, guard, *this ); - - iiu.initiateAbortShutdown ( cbGuard, guard ); } void cac::uninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & iiu ) diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index f78872b5e..698093751 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -36,6 +36,7 @@ #include "hostNameCache.h" #include "net_convert.h" #include "bhe.h" +#include "epicsSignal.h" const unsigned mSecPerSec = 1000u; const unsigned uSecPerSec = 1000u * mSecPerSec; @@ -65,7 +66,6 @@ void tcpSendThread::exitWait () void tcpSendThread::run () { try { - epicsEnableInterruptedSystemCall (); while ( true ) { bool flowControlLaborNeeded; @@ -113,21 +113,36 @@ void tcpSendThread::run () } if ( this->iiu.state == tcpiiu::iiucs_clean_shutdown ) { this->iiu.flush (); + // this should cause the server to disconnect from + // the client + int status = ::shutdown ( this->iiu.sock, SHUT_WR ); + if ( status ) { + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + errlogPrintf ("CAC TCP clean socket shutdown error was %s\n", + sockErrBuf ); + } } } catch ( ... ) { this->iiu.printf ( "cac: tcp send thread received an unexpected exception ", "- disconnecting\n"); + // this should cause the server to disconnect from + // the client + int status = ::shutdown ( this->iiu.sock, SHUT_WR ); + if ( status ) { + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + errlogPrintf ("CAC TCP clean socket shutdown error was %s\n", + sockErrBuf ); + } } this->iiu.sendDog.cancel (); - { - epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() ); - this->iiu.shutdown ( guard ); - } - // wakeup user threads blocking for send backlog to be reduced // and wait for them to stop using this IIU this->iiu.flushBlockEvent.signal (); @@ -185,7 +200,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, localError != SOCK_ECONNABORTED && localError != SOCK_SHUTDOWN ) { char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n", sockErrBuf ); @@ -256,7 +271,7 @@ unsigned tcpiiu::recvBytes ( void * pBuf, unsigned nBytesInBuf ) char name[64]; this->hostName ( name, sizeof ( name ) ); char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ( "Unexpected problem with circuit to CA ", "server \"%s\" was \"%s\" - disconnecting\n", @@ -294,8 +309,6 @@ void tcpRecvThread::exitWait () void tcpRecvThread::run () { try { - epicsEnableInterruptedSystemCall (); - this->iiu.cacRef.attachToClientCtx (); epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); @@ -398,15 +411,6 @@ void tcpRecvThread::run () "CA client library tcp receive thread " "terminating due to a C++ exception\n" ); } - - // Although this is redundant in certain situations it is - // required because the receive thread must hang around - // until it receives its blocking socket call interrupt - // signal. - { - epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() ); - this->iiu.shutdown ( guard ); - } } // @@ -450,12 +454,13 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, msgHeaderAvailable ( false ), earlyFlush ( false ), recvProcessPostponedFlush ( false ), - discardingPendingData ( false ) + discardingPendingData ( false ), + socketHasBeenClosed ( false ) { this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP ); if ( this->sock == INVALID_SOCKET ) { char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ( "CAC: unable to create virtual circuit because \"%s\"\n", sockErrBuf ); @@ -468,7 +473,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, (char *) &flag, sizeof ( flag ) ); if ( status < 0 ) { char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ( "CAC: problems setting socket option TCP_NODELAY = \"%s\"\n", sockErrBuf ); @@ -479,7 +484,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, ( char * ) &flag, sizeof ( flag ) ); if ( status < 0 ) { char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n", sockErrBuf ); @@ -507,7 +512,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, ( char * ) &i, sizeof ( i ) ); if (status < 0) { char sockErrBuf[64]; - convertSocketErrorToString ( sockErrBuf, sizeof ( sockErrBuf ) ); + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ("CAC: problems setting socket option SO_SNDBUF = \"%s\"\n", sockErrBuf ); } @@ -516,7 +521,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, ( char * ) &i, sizeof ( i ) ); if ( status < 0 ) { char sockErrBuf[64]; - convertSocketErrorToString ( sockErrBuf, sizeof ( sockErrBuf ) ); + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ( "CAC: problems setting socket option SO_RCVBUF = \"%s\"\n", sockErrBuf ); } @@ -531,7 +536,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, if ( status < 0 || nBytes < 0 || sizeOfParameter != static_cast < int > ( sizeof ( nBytes ) ) ) { char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ("CAC: problems getting socket option SO_SNDBUF = \"%s\"\n", sockErrBuf ); @@ -612,7 +617,7 @@ void tcpiiu::connect () } else { char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ( "Unable to connect because \"%s\"\n", sockErrBuf ); @@ -626,17 +631,15 @@ void tcpiiu::connect () void tcpiiu::initiateCleanShutdown ( epicsGuard < cacMutex > & ) { - if ( this->state == iiucs_connected || this->state == iiucs_connecting ) { + if ( this->state == iiucs_connected ) { this->state = iiucs_clean_shutdown; + this->sendThreadFlushEvent.signal (); } - this->sendThreadFlushEvent.signal (); } void tcpiiu::disconnectNotify ( epicsGuard < cacMutex > & ) { - if ( this->state == iiucs_connected || this->state == iiucs_connecting ) { - this->state = iiucs_disconnected; - } + this->state = iiucs_disconnected; this->sendThreadFlushEvent.signal (); } @@ -653,56 +656,60 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, // reinterpret_cast ( &tmpLinger ), sizeof (tmpLinger) ); if ( status != 0 ) { char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "CAC TCP socket linger set error was %s\n", sockErrBuf ); } this->discardingPendingData = true; } - this->shutdown ( guard ); -} - -void tcpiiu::shutdown ( epicsGuard & guard ) // X aCC 431 -{ iiu_conn_state oldState = this->state; - if ( oldState != iiucs_abort_shutdown ) { + if ( oldState != iiucs_abort_shutdown && oldState != iiucs_disconnected ) { this->state = iiucs_abort_shutdown; - // - // on HPUX close() and shutdown() are not enough so we must also - // throw signals to interrupt the threads that may be in the - // send() and recv() system calls. - // - this->recvThread.interruptSocketRecv (); - this->sendThread.interruptSocketSend (); - - // linux threads in recv() dont wakeup unless we also - // call shutdown ( close() by itself is not enough ) - if ( oldState == iiucs_connected ) { - int status = ::shutdown ( this->sock, SHUT_RDWR ); - if ( status ) { - char sockErrBuf[64]; - convertSocketErrorToString ( - sockErrBuf, sizeof ( sockErrBuf ) ); - errlogPrintf ("CAC TCP socket shutdown error was %s\n", - sockErrBuf ); + epicsSocketSystemCallInterruptMechanismQueryInfo info = + epicsSocketSystemCallInterruptMechanismQuery (); + switch ( info ) { + case esscimqi_socketCloseRequired: + // + // on winsock and probably vxWorks shutdown() does not + // unblock a thread in recv() so we use close() and introduce + // some complexity because we must unregister the fd early + // + if ( ! this->socketHasBeenClosed ) { + int status = socket_close ( this->sock ); + if ( status ) { + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + errlogPrintf ("CAC TCP socket close error was %s\n", + sockErrBuf ); + } + else { + this->socketHasBeenClosed = true; + } } - } - - // - // on winsock and probably vxWorks shutdown() does not - // unblock a thread in recv() so we use close() and introduce - // some complexity because we must unregister the fd early - // - int status = socket_close ( this->sock ); - if ( status ) { - char sockErrBuf[64]; - convertSocketErrorToString ( - sockErrBuf, sizeof ( sockErrBuf ) ); - errlogPrintf ("CAC TCP socket close error was %s\n", - sockErrBuf ); - } + break; + case esscimqi_socketBothShutdownRequired: + { + int status = ::shutdown ( this->sock, SHUT_RDWR ); + if ( status ) { + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + errlogPrintf ("CAC TCP socket shutdown error was %s\n", + sockErrBuf ); + } + } + break; + case esscimqi_socketSigurgRequired: + this->recvThread.interruptSocketRecv (); + this->sendThread.interruptSocketSend (); + break; + case esscimqi_shuechanismImplemenedHerein: + default: + break; + }; // // wake up the send thread if it isnt blocking in send() @@ -719,11 +726,11 @@ tcpiiu::~tcpiiu () this->sendThread.exitWait (); this->recvThread.exitWait (); - if ( this->state != this->iiucs_abort_shutdown ) { + if ( ! this->socketHasBeenClosed ) { int status = socket_close ( this->sock ); if ( status ) { char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ("CAC TCP socket close error was %s\n", sockErrBuf ); @@ -1429,7 +1436,7 @@ void tcpiiu::blockUntilBytesArePendingInOS () char name[64]; this->hostName ( name, sizeof ( name ) ); char sockErrBuf[64]; - convertSocketErrorToString ( + epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); this->printf ( "Unexpected problem with circuit to CA server \"%s\" was \"%s\" - disconnecting\n", name, sockErrBuf ); @@ -1476,6 +1483,25 @@ double tcpiiu::receiveWatchdogDelay () const return this->recvDog.delay (); } - +/* + * Certain OS, such as HPUX, do not unblock a socket system call + * when another thread asynchronously calls both shutdown() and + * close(). To solve this problem we need to employ OS specific + * mechanisms. + */ +void tcpRecvThread::interruptSocketRecv () +{ + epicsThreadId threadId = this->thread.getId (); + if ( threadId ) { + epicsSignalRaiseSigUrg ( threadId ); + } +} +void tcpSendThread::interruptSocketSend () +{ + epicsThreadId threadId = this->thread.getId (); + if ( threadId ) { + epicsSignalRaiseSigUrg ( threadId ); + } +}