cleaned up shutdown procedure
This commit is contained in:
@@ -161,6 +161,13 @@ cac::cac ( cacNotify & notifyIn ) :
|
||||
try {
|
||||
long status;
|
||||
|
||||
/*
|
||||
* Certain os, such as HPUX, do not unblock a socket system call
|
||||
* when another thread asynchronously calls both shutdown() and
|
||||
* close(). To solve this problem we need to employ OS specific
|
||||
* mechanisms.
|
||||
*/
|
||||
epicsSignalInstallSigUrgIgnore ();
|
||||
epicsSignalInstallSigPipeIgnore ();
|
||||
|
||||
{
|
||||
@@ -1507,6 +1514,8 @@ void cac::initiateAbortShutdown ( tcpiiu & iiu )
|
||||
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
|
||||
epicsGuard < cacMutex > guard ( this->mutex );
|
||||
|
||||
iiu.initiateAbortShutdown ( cbGuard, guard );
|
||||
|
||||
// Disconnect all channels immediately from the timer thread
|
||||
// because on certain OS such as HPUX it's difficult to
|
||||
// unblock a blocking send() call, and we need immediate
|
||||
@@ -1517,8 +1526,6 @@ void cac::initiateAbortShutdown ( tcpiiu & iiu )
|
||||
genLocalExcep ( cbGuard, *this, ECA_DISCONN, hostNameTmp );
|
||||
}
|
||||
iiu.removeAllChannels ( cbGuard, guard, *this );
|
||||
|
||||
iiu.initiateAbortShutdown ( cbGuard, guard );
|
||||
}
|
||||
|
||||
void cac::uninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & iiu )
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include "hostNameCache.h"
|
||||
#include "net_convert.h"
|
||||
#include "bhe.h"
|
||||
#include "epicsSignal.h"
|
||||
|
||||
const unsigned mSecPerSec = 1000u;
|
||||
const unsigned uSecPerSec = 1000u * mSecPerSec;
|
||||
@@ -65,7 +66,6 @@ void tcpSendThread::exitWait ()
|
||||
void tcpSendThread::run ()
|
||||
{
|
||||
try {
|
||||
epicsEnableInterruptedSystemCall ();
|
||||
|
||||
while ( true ) {
|
||||
bool flowControlLaborNeeded;
|
||||
@@ -113,21 +113,36 @@ void tcpSendThread::run ()
|
||||
}
|
||||
if ( this->iiu.state == tcpiiu::iiucs_clean_shutdown ) {
|
||||
this->iiu.flush ();
|
||||
// this should cause the server to disconnect from
|
||||
// the client
|
||||
int status = ::shutdown ( this->iiu.sock, SHUT_WR );
|
||||
if ( status ) {
|
||||
char sockErrBuf[64];
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
errlogPrintf ("CAC TCP clean socket shutdown error was %s\n",
|
||||
sockErrBuf );
|
||||
}
|
||||
}
|
||||
}
|
||||
catch ( ... ) {
|
||||
this->iiu.printf (
|
||||
"cac: tcp send thread received an unexpected exception ",
|
||||
"- disconnecting\n");
|
||||
// this should cause the server to disconnect from
|
||||
// the client
|
||||
int status = ::shutdown ( this->iiu.sock, SHUT_WR );
|
||||
if ( status ) {
|
||||
char sockErrBuf[64];
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
errlogPrintf ("CAC TCP clean socket shutdown error was %s\n",
|
||||
sockErrBuf );
|
||||
}
|
||||
}
|
||||
|
||||
this->iiu.sendDog.cancel ();
|
||||
|
||||
{
|
||||
epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() );
|
||||
this->iiu.shutdown ( guard );
|
||||
}
|
||||
|
||||
// wakeup user threads blocking for send backlog to be reduced
|
||||
// and wait for them to stop using this IIU
|
||||
this->iiu.flushBlockEvent.signal ();
|
||||
@@ -185,7 +200,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
|
||||
localError != SOCK_ECONNABORTED &&
|
||||
localError != SOCK_SHUTDOWN ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n",
|
||||
sockErrBuf );
|
||||
@@ -256,7 +271,7 @@ unsigned tcpiiu::recvBytes ( void * pBuf, unsigned nBytesInBuf )
|
||||
char name[64];
|
||||
this->hostName ( name, sizeof ( name ) );
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ( "Unexpected problem with circuit to CA ",
|
||||
"server \"%s\" was \"%s\" - disconnecting\n",
|
||||
@@ -294,8 +309,6 @@ void tcpRecvThread::exitWait ()
|
||||
void tcpRecvThread::run ()
|
||||
{
|
||||
try {
|
||||
epicsEnableInterruptedSystemCall ();
|
||||
|
||||
this->iiu.cacRef.attachToClientCtx ();
|
||||
|
||||
epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu );
|
||||
@@ -398,15 +411,6 @@ void tcpRecvThread::run ()
|
||||
"CA client library tcp receive thread "
|
||||
"terminating due to a C++ exception\n" );
|
||||
}
|
||||
|
||||
// Although this is redundant in certain situations it is
|
||||
// required because the receive thread must hang around
|
||||
// until it receives its blocking socket call interrupt
|
||||
// signal.
|
||||
{
|
||||
epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() );
|
||||
this->iiu.shutdown ( guard );
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
@@ -450,12 +454,13 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
|
||||
msgHeaderAvailable ( false ),
|
||||
earlyFlush ( false ),
|
||||
recvProcessPostponedFlush ( false ),
|
||||
discardingPendingData ( false )
|
||||
discardingPendingData ( false ),
|
||||
socketHasBeenClosed ( false )
|
||||
{
|
||||
this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP );
|
||||
if ( this->sock == INVALID_SOCKET ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ( "CAC: unable to create virtual circuit because \"%s\"\n",
|
||||
sockErrBuf );
|
||||
@@ -468,7 +473,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
|
||||
(char *) &flag, sizeof ( flag ) );
|
||||
if ( status < 0 ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ( "CAC: problems setting socket option TCP_NODELAY = \"%s\"\n",
|
||||
sockErrBuf );
|
||||
@@ -479,7 +484,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
|
||||
( char * ) &flag, sizeof ( flag ) );
|
||||
if ( status < 0 ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n",
|
||||
sockErrBuf );
|
||||
@@ -507,7 +512,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
|
||||
( char * ) &i, sizeof ( i ) );
|
||||
if (status < 0) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString ( sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ("CAC: problems setting socket option SO_SNDBUF = \"%s\"\n",
|
||||
sockErrBuf );
|
||||
}
|
||||
@@ -516,7 +521,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
|
||||
( char * ) &i, sizeof ( i ) );
|
||||
if ( status < 0 ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString ( sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ( "CAC: problems setting socket option SO_RCVBUF = \"%s\"\n",
|
||||
sockErrBuf );
|
||||
}
|
||||
@@ -531,7 +536,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
|
||||
if ( status < 0 || nBytes < 0 ||
|
||||
sizeOfParameter != static_cast < int > ( sizeof ( nBytes ) ) ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ("CAC: problems getting socket option SO_SNDBUF = \"%s\"\n",
|
||||
sockErrBuf );
|
||||
@@ -612,7 +617,7 @@ void tcpiiu::connect ()
|
||||
}
|
||||
else {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ( "Unable to connect because \"%s\"\n",
|
||||
sockErrBuf );
|
||||
@@ -626,17 +631,15 @@ void tcpiiu::connect ()
|
||||
|
||||
void tcpiiu::initiateCleanShutdown ( epicsGuard < cacMutex > & )
|
||||
{
|
||||
if ( this->state == iiucs_connected || this->state == iiucs_connecting ) {
|
||||
if ( this->state == iiucs_connected ) {
|
||||
this->state = iiucs_clean_shutdown;
|
||||
this->sendThreadFlushEvent.signal ();
|
||||
}
|
||||
this->sendThreadFlushEvent.signal ();
|
||||
}
|
||||
|
||||
void tcpiiu::disconnectNotify ( epicsGuard < cacMutex > & )
|
||||
{
|
||||
if ( this->state == iiucs_connected || this->state == iiucs_connecting ) {
|
||||
this->state = iiucs_disconnected;
|
||||
}
|
||||
this->state = iiucs_disconnected;
|
||||
this->sendThreadFlushEvent.signal ();
|
||||
}
|
||||
|
||||
@@ -653,56 +656,60 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, //
|
||||
reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
|
||||
if ( status != 0 ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
errlogPrintf ( "CAC TCP socket linger set error was %s\n",
|
||||
sockErrBuf );
|
||||
}
|
||||
this->discardingPendingData = true;
|
||||
}
|
||||
this->shutdown ( guard );
|
||||
}
|
||||
|
||||
void tcpiiu::shutdown ( epicsGuard <cacMutex > & guard ) // X aCC 431
|
||||
{
|
||||
iiu_conn_state oldState = this->state;
|
||||
if ( oldState != iiucs_abort_shutdown ) {
|
||||
if ( oldState != iiucs_abort_shutdown && oldState != iiucs_disconnected ) {
|
||||
this->state = iiucs_abort_shutdown;
|
||||
|
||||
//
|
||||
// on HPUX close() and shutdown() are not enough so we must also
|
||||
// throw signals to interrupt the threads that may be in the
|
||||
// send() and recv() system calls.
|
||||
//
|
||||
this->recvThread.interruptSocketRecv ();
|
||||
this->sendThread.interruptSocketSend ();
|
||||
|
||||
// linux threads in recv() dont wakeup unless we also
|
||||
// call shutdown ( close() by itself is not enough )
|
||||
if ( oldState == iiucs_connected ) {
|
||||
int status = ::shutdown ( this->sock, SHUT_RDWR );
|
||||
if ( status ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
errlogPrintf ("CAC TCP socket shutdown error was %s\n",
|
||||
sockErrBuf );
|
||||
epicsSocketSystemCallInterruptMechanismQueryInfo info =
|
||||
epicsSocketSystemCallInterruptMechanismQuery ();
|
||||
switch ( info ) {
|
||||
case esscimqi_socketCloseRequired:
|
||||
//
|
||||
// on winsock and probably vxWorks shutdown() does not
|
||||
// unblock a thread in recv() so we use close() and introduce
|
||||
// some complexity because we must unregister the fd early
|
||||
//
|
||||
if ( ! this->socketHasBeenClosed ) {
|
||||
int status = socket_close ( this->sock );
|
||||
if ( status ) {
|
||||
char sockErrBuf[64];
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
errlogPrintf ("CAC TCP socket close error was %s\n",
|
||||
sockErrBuf );
|
||||
}
|
||||
else {
|
||||
this->socketHasBeenClosed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// on winsock and probably vxWorks shutdown() does not
|
||||
// unblock a thread in recv() so we use close() and introduce
|
||||
// some complexity because we must unregister the fd early
|
||||
//
|
||||
int status = socket_close ( this->sock );
|
||||
if ( status ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
errlogPrintf ("CAC TCP socket close error was %s\n",
|
||||
sockErrBuf );
|
||||
}
|
||||
break;
|
||||
case esscimqi_socketBothShutdownRequired:
|
||||
{
|
||||
int status = ::shutdown ( this->sock, SHUT_RDWR );
|
||||
if ( status ) {
|
||||
char sockErrBuf[64];
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
errlogPrintf ("CAC TCP socket shutdown error was %s\n",
|
||||
sockErrBuf );
|
||||
}
|
||||
}
|
||||
break;
|
||||
case esscimqi_socketSigurgRequired:
|
||||
this->recvThread.interruptSocketRecv ();
|
||||
this->sendThread.interruptSocketSend ();
|
||||
break;
|
||||
case esscimqi_shuechanismImplemenedHerein:
|
||||
default:
|
||||
break;
|
||||
};
|
||||
|
||||
//
|
||||
// wake up the send thread if it isnt blocking in send()
|
||||
@@ -719,11 +726,11 @@ tcpiiu::~tcpiiu ()
|
||||
this->sendThread.exitWait ();
|
||||
this->recvThread.exitWait ();
|
||||
|
||||
if ( this->state != this->iiucs_abort_shutdown ) {
|
||||
if ( ! this->socketHasBeenClosed ) {
|
||||
int status = socket_close ( this->sock );
|
||||
if ( status ) {
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
errlogPrintf ("CAC TCP socket close error was %s\n",
|
||||
sockErrBuf );
|
||||
@@ -1429,7 +1436,7 @@ void tcpiiu::blockUntilBytesArePendingInOS ()
|
||||
char name[64];
|
||||
this->hostName ( name, sizeof ( name ) );
|
||||
char sockErrBuf[64];
|
||||
convertSocketErrorToString (
|
||||
epicsSocketConvertErrnoToString (
|
||||
sockErrBuf, sizeof ( sockErrBuf ) );
|
||||
this->printf ( "Unexpected problem with circuit to CA server \"%s\" was \"%s\" - disconnecting\n",
|
||||
name, sockErrBuf );
|
||||
@@ -1476,6 +1483,25 @@ double tcpiiu::receiveWatchdogDelay () const
|
||||
return this->recvDog.delay ();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Certain OS, such as HPUX, do not unblock a socket system call
|
||||
* when another thread asynchronously calls both shutdown() and
|
||||
* close(). To solve this problem we need to employ OS specific
|
||||
* mechanisms.
|
||||
*/
|
||||
void tcpRecvThread::interruptSocketRecv ()
|
||||
{
|
||||
epicsThreadId threadId = this->thread.getId ();
|
||||
if ( threadId ) {
|
||||
epicsSignalRaiseSigUrg ( threadId );
|
||||
}
|
||||
}
|
||||
void tcpSendThread::interruptSocketSend ()
|
||||
{
|
||||
epicsThreadId threadId = this->thread.getId ();
|
||||
if ( threadId ) {
|
||||
epicsSignalRaiseSigUrg ( threadId );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user