improved shutdown

This commit is contained in:
Jeff Hill
2002-12-13 00:54:34 +00:00
parent 256836fe59
commit e53adb99cf
2 changed files with 116 additions and 111 deletions

View File

@@ -41,8 +41,10 @@ const unsigned mSecPerSec = 1000u;
const unsigned uSecPerSec = 1000u * mSecPerSec;
tcpSendThread::tcpSendThread ( class tcpiiu & iiuIn,
const char * pName, unsigned stackSize, unsigned priority ) :
iiu ( iiuIn ), thread ( *this, pName, stackSize, priority )
callbackMutex & cbMutexIn, const char * pName,
unsigned stackSize, unsigned priority ) :
thread ( *this, pName, stackSize, priority ), iiu ( iiuIn ),
cbMutex ( cbMutexIn )
{
}
@@ -120,7 +122,11 @@ void tcpSendThread::run ()
this->iiu.sendDog.cancel ();
this->iiu.cacRef.initiateAbortShutdown ( this->iiu );
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() );
this->iiu.shutdown ( cbGuard, guard );
}
// wakeup user threads blocking for send backlog to be reduced
// and wait for them to stop using this IIU
@@ -144,16 +150,12 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
int status;
unsigned nBytes = 0u;
if ( this->state != iiucs_connected &&
this->state != iiucs_clean_shutdown ) {
return 0u;
}
assert ( nBytesInBuf <= INT_MAX );
this->sendDog.start ();
while ( true ) {
while ( this->state == iiucs_connected ||
this->state == iiucs_clean_shutdown ) {
status = ::send ( this->sock,
static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 );
if ( status > 0 ) {
@@ -166,13 +168,6 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
// winsock indicates disconnect by returniing zero here
if ( status == 0 ) {
this->cacRef.disconnectNotify ( *this );
nBytes = 0u;
break;
}
if ( localError == SOCK_SHUTDOWN ) {
this->cacRef.disconnectNotify ( *this );
nBytes = 0u;
break;
}
@@ -184,13 +179,13 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
localError != SOCK_EPIPE &&
localError != SOCK_ECONNRESET &&
localError != SOCK_ETIMEDOUT &&
localError != SOCK_ECONNABORTED ) {
localError != SOCK_ECONNABORTED &&
localError != SOCK_SHUTDOWN ) {
this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n",
SOCKERRSTR ( localError ) );
}
this->cacRef.disconnectNotify ( *this );
nBytes = 0u;
break;
}
}
@@ -202,64 +197,68 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
unsigned tcpiiu::recvBytes ( void * pBuf, unsigned nBytesInBuf ) epicsThrows (())
{
if ( this->state != iiucs_connected &&
this->state != iiucs_clean_shutdown ) {
return 0u;
}
unsigned bytesAvailable = 0u;
assert ( nBytesInBuf <= INT_MAX );
int status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
static_cast <int> ( nBytesInBuf ), 0 );
while ( this->state == iiucs_connected ) {
// if the circuit was aborted then supress warning message about
// bad file descriptor
if ( this->state != iiucs_connected &&
this->state != iiucs_clean_shutdown ) {
return 0u;
}
int status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
static_cast <int> ( nBytesInBuf ), 0 );
if ( status <= 0 ) {
int localErrno = SOCKERRNO;
if ( status == 0 ) {
this->cacRef.disconnectNotify ( *this );
return 0u;
if ( status > 0 ) {
bytesAvailable = static_cast <unsigned> ( status );
assert ( bytesAvailable <= nBytesInBuf );
break;
}
else {
int localErrno = SOCKERRNO;
if ( localErrno == SOCK_SHUTDOWN ) {
this->cacRef.disconnectNotify ( *this );
return 0u;
if ( status == 0 ) {
this->cacRef.disconnectNotify ( *this );
return 0u;
}
// if the circuit was aborted then supress warning message about
// bad file descriptor
if ( this->state != iiucs_connected &&
this->state != iiucs_clean_shutdown ) {
return 0u;
}
if ( localErrno == SOCK_SHUTDOWN ) {
this->cacRef.disconnectNotify ( *this );
return 0u;
}
if ( localErrno == SOCK_EINTR ) {
continue;
}
if ( localErrno == SOCK_ECONNABORTED ) {
this->cacRef.disconnectNotify ( *this );
return 0u;
}
if ( localErrno == SOCK_ECONNRESET ) {
this->cacRef.disconnectNotify ( *this );
return 0u;
}
{
char name[64];
this->hostName ( name, sizeof ( name ) );
this->printf ( "Unexpected problem with circuit to CA server \"%s\" was \"%s\" - disconnecting\n",
name, SOCKERRSTR ( localErrno ) );
}
this->cacRef.initiateAbortShutdown ( *this );
break;
}
if ( localErrno == SOCK_EINTR ) {
return 0u;
}
if ( localErrno == SOCK_ECONNABORTED ) {
this->cacRef.disconnectNotify ( *this );
return 0u;
}
if ( localErrno == SOCK_ECONNRESET ) {
this->cacRef.disconnectNotify ( *this );
return 0u;
}
{
char name[64];
this->hostName ( name, sizeof ( name ) );
this->printf ( "Unexpected problem with circuit to CA server \"%s\" was \"%s\" - disconnecting\n",
name, SOCKERRSTR ( localErrno ) );
}
this->cacRef.initiateAbortShutdown ( *this );
return 0u;
}
assert ( static_cast <unsigned> ( status ) <= nBytesInBuf );
return static_cast <unsigned> ( status );
return bytesAvailable;
}
tcpRecvThread::tcpRecvThread ( class tcpiiu & iiuIn, class callbackMutex & cbMutexIn,
@@ -320,11 +319,10 @@ void tcpRecvThread::run ()
else {
this->iiu.blockUntilBytesArePendingInOS();
nBytesIn = 0u;
}
if ( this->iiu.state != tcpiiu::iiucs_connected &&
this->iiu.state != tcpiiu::iiucs_clean_shutdown ) {
break;
if ( this->iiu.state != tcpiiu::iiucs_connected &&
this->iiu.state != tcpiiu::iiucs_clean_shutdown ) {
break;
}
}
// reschedule connection activity watchdog
@@ -341,16 +339,15 @@ void tcpRecvThread::run ()
if ( ! this->iiu.cacRef.preemptiveCallbakIsEnabled() ) {
nBytesIn = pComBuf->fillFromWire ( this->iiu );
}
if ( this->iiu.state != tcpiiu::iiucs_connected &&
this->iiu.state != tcpiiu::iiucs_clean_shutdown ) {
break;
if ( this->iiu.state != tcpiiu::iiucs_connected &&
this->iiu.state != tcpiiu::iiucs_clean_shutdown ) {
break;
}
}
// force the receive watchdog to be reset every 5 frames
unsigned contiguousFrameCount = 0;
while ( nBytesIn ) {
while ( nBytesIn && ++contiguousFrameCount <= 5 ) {
if ( nBytesIn == pComBuf->capacityBytes () ) {
if ( this->iiu.contigRecvMsgCount >=
contiguousMsgCountWhichTriggersFlowControl ) {
@@ -380,10 +377,6 @@ void tcpRecvThread::run ()
break;
}
if ( ++contiguousFrameCount >= 5 ) {
break;
}
nBytesIn = pComBuf->fillFromWire ( this->iiu );
}
}
@@ -401,7 +394,11 @@ void tcpRecvThread::run ()
// required because the receive thread must hang around
// until it receives its blocking socket call interrupt
// signal.
this->iiu.cacRef.initiateAbortShutdown ( this->iiu );
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() );
this->iiu.shutdown ( cbGuard, guard );
}
}
//
@@ -417,7 +414,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
recvThread ( *this, cbMutex, "CAC-TCP-recv",
epicsThreadGetStackSize ( epicsThreadStackBig ),
cac::highestPriorityLevelBelow ( cac.getInitializingThreadsPriority() ) ),
sendThread ( *this, "CAC-TCP-send",
sendThread ( *this, cbMutex, "CAC-TCP-send",
epicsThreadGetStackSize ( epicsThreadStackMedium ),
cac::lowestPriorityLevelAbove (
cac::lowestPriorityLevelAbove (
@@ -444,7 +441,8 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
oldMsgHeaderAvailable ( false ),
msgHeaderAvailable ( false ),
earlyFlush ( false ),
recvProcessPostponedFlush ( false )
recvProcessPostponedFlush ( false ),
discardingPendingData ( false )
{
this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP );
if ( this->sock == INVALID_SOCKET ) {
@@ -562,14 +560,12 @@ void tcpiiu::connect ()
*/
this->sendDog.start ();
while ( true ) {
while ( this->state == iiucs_connecting ) {
osiSockAddr tmp = this->address ();
int status = ::connect ( this->sock,
&tmp.sa, sizeof ( tmp.sa ) );
if ( status == 0 ) {
this->sendDog.cancel ();
epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() );
if ( this->state == iiucs_connecting ) {
@@ -580,30 +576,26 @@ void tcpiiu::connect ()
this->recvDog.connectNotify ();
}
return;
break;
}
int errnoCpy = SOCKERRNO;
if ( errnoCpy == SOCK_EINTR ) {
if ( this->state != iiucs_connecting ) {
this->sendDog.cancel ();
return;
}
continue;
}
else if ( errnoCpy == SOCK_SHUTDOWN ) {
this->sendDog.cancel ();
return;
break;
}
else {
this->sendDog.cancel ();
this->printf ( "Unable to connect because %d=\"%s\"\n",
errnoCpy, SOCKERRSTR ( errnoCpy ) );
this->cacRef.disconnectNotify ( *this );
return;
break;
}
}
this->sendDog.cancel ();
return;
}
void tcpiiu::initiateCleanShutdown ( epicsGuard < cacMutex > & )
@@ -625,14 +617,7 @@ void tcpiiu::disconnectNotify ( epicsGuard < cacMutex > & )
void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard,
epicsGuard <cacMutex > & guard )
{
iiu_conn_state oldState = this->state;
if ( oldState != iiucs_abort_shutdown ) {
this->state = iiucs_abort_shutdown;
{
epicsGuardRelease < cacMutex > guardRelease ( guard );
this->cacRef.notifyDestroyFD ( cbGuard, this->sock );
}
if ( ! this->discardingPendingData ) {
// force abortive shutdown sequence
// (discard outstanding sends and receives)
struct linger tmpLinger;
@@ -644,6 +629,21 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard,
errlogPrintf ( "CAC TCP socket linger set error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
this->discardingPendingData = true;
}
this->shutdown ( cbGuard, guard );
}
void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbGuard,
epicsGuard <cacMutex > & guard )
{
iiu_conn_state oldState = this->state;
if ( oldState != iiucs_abort_shutdown ) {
this->state = iiucs_abort_shutdown;
{
epicsGuardRelease < cacMutex > guardRelease ( guard );
this->cacRef.notifyDestroyFD ( cbGuard, this->sock );
}
//
// on HPUX close() and shutdown() are not enough so we must also
@@ -656,7 +656,7 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard,
// linux threads in recv() dont wakeup unless we also
// call shutdown ( close() by itself is not enough )
if ( oldState == iiucs_connected ) {
status = ::shutdown ( this->sock, SD_BOTH );
int status = ::shutdown ( this->sock, SD_BOTH );
if ( status ) {
errlogPrintf ("CAC TCP socket shutdown error was %s\n",
SOCKERRSTR (SOCKERRNO) );
@@ -668,7 +668,7 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard,
// unblock a thread in recv() so we use close() and introduce
// some complexity because we must unregister the fd early
//
status = socket_close ( this->sock );
int status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC TCP socket close error was %s\n",
SOCKERRSTR (SOCKERRNO) );

View File

@@ -69,16 +69,18 @@ private:
class tcpSendThread : public epicsThreadRunable {
public:
tcpSendThread ( class tcpiiu & iiuIn,
const char * pName, unsigned int stackSize, unsigned int priority );
tcpSendThread ( class tcpiiu & iiuIn, callbackMutex &,
const char * pName, unsigned int stackSize,
unsigned int priority );
virtual ~tcpSendThread ();
void start ();
void exitWait ();
void exitWaitRelease ();
void interruptSocketSend ();
private:
class tcpiiu & iiu;
epicsThread thread;
class tcpiiu & iiu;
callbackMutex & cbMutex;
void run ();
};
@@ -93,6 +95,7 @@ public:
const cacChannel::priLev & priorityIn );
~tcpiiu ();
void start ( epicsGuard < callbackMutex > & );
void initiateCleanShutdown ( epicsGuard < cacMutex > & );
void initiateAbortShutdown ( epicsGuard < callbackMutex > &,
epicsGuard <cacMutex > & );
void disconnectNotify ( epicsGuard <cacMutex > & );
@@ -127,7 +130,6 @@ public:
void installChannel ( epicsGuard < cacMutex > &, nciu & chan,
unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn );
void uninstallChan ( epicsGuard < cacMutex > &, nciu & chan );
void initiateCleanShutdown ( epicsGuard < cacMutex > & );
bool bytesArePendingInOS () const;
@@ -168,6 +170,7 @@ private:
bool msgHeaderAvailable;
bool earlyFlush;
bool recvProcessPostponedFlush;
bool discardingPendingData;
bool processIncoming ( epicsGuard < callbackMutex > & );
unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ) epicsThrows (());
@@ -175,6 +178,8 @@ private:
void connect ();
const char * pHostName () const;
void blockUntilBytesArePendingInOS ();
void shutdown ( epicsGuard < callbackMutex > &,
epicsGuard <cacMutex > & );
// send protocol stubs
void echoRequest ( epicsGuard < cacMutex > & );