diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 0de386ffd..8ac188dfb 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -41,8 +41,10 @@ const unsigned mSecPerSec = 1000u; const unsigned uSecPerSec = 1000u * mSecPerSec; tcpSendThread::tcpSendThread ( class tcpiiu & iiuIn, - const char * pName, unsigned stackSize, unsigned priority ) : - iiu ( iiuIn ), thread ( *this, pName, stackSize, priority ) + callbackMutex & cbMutexIn, const char * pName, + unsigned stackSize, unsigned priority ) : + thread ( *this, pName, stackSize, priority ), iiu ( iiuIn ), + cbMutex ( cbMutexIn ) { } @@ -120,7 +122,11 @@ void tcpSendThread::run () this->iiu.sendDog.cancel (); - this->iiu.cacRef.initiateAbortShutdown ( this->iiu ); + { + epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); + epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() ); + this->iiu.shutdown ( cbGuard, guard ); + } // wakeup user threads blocking for send backlog to be reduced // and wait for them to stop using this IIU @@ -144,16 +150,12 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, int status; unsigned nBytes = 0u; - if ( this->state != iiucs_connected && - this->state != iiucs_clean_shutdown ) { - return 0u; - } - assert ( nBytesInBuf <= INT_MAX ); this->sendDog.start (); - while ( true ) { + while ( this->state == iiucs_connected || + this->state == iiucs_clean_shutdown ) { status = ::send ( this->sock, static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 ); if ( status > 0 ) { @@ -166,13 +168,6 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, // winsock indicates disconnect by returniing zero here if ( status == 0 ) { this->cacRef.disconnectNotify ( *this ); - nBytes = 0u; - break; - } - - if ( localError == SOCK_SHUTDOWN ) { - this->cacRef.disconnectNotify ( *this ); - nBytes = 0u; break; } @@ -184,13 +179,13 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, localError != SOCK_EPIPE && localError != SOCK_ECONNRESET && localError != SOCK_ETIMEDOUT && - localError != SOCK_ECONNABORTED ) { + localError != SOCK_ECONNABORTED && + localError != SOCK_SHUTDOWN ) { this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n", SOCKERRSTR ( localError ) ); } this->cacRef.disconnectNotify ( *this ); - nBytes = 0u; break; } } @@ -202,64 +197,68 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned tcpiiu::recvBytes ( void * pBuf, unsigned nBytesInBuf ) epicsThrows (()) { - if ( this->state != iiucs_connected && - this->state != iiucs_clean_shutdown ) { - return 0u; - } + unsigned bytesAvailable = 0u; assert ( nBytesInBuf <= INT_MAX ); - int status = ::recv ( this->sock, static_cast ( pBuf ), - static_cast ( nBytesInBuf ), 0 ); + while ( this->state == iiucs_connected ) { - // if the circuit was aborted then supress warning message about - // bad file descriptor - if ( this->state != iiucs_connected && - this->state != iiucs_clean_shutdown ) { - return 0u; - } + int status = ::recv ( this->sock, static_cast ( pBuf ), + static_cast ( nBytesInBuf ), 0 ); - if ( status <= 0 ) { - int localErrno = SOCKERRNO; - - if ( status == 0 ) { - this->cacRef.disconnectNotify ( *this ); - return 0u; + if ( status > 0 ) { + bytesAvailable = static_cast ( status ); + assert ( bytesAvailable <= nBytesInBuf ); + break; } + else { + int localErrno = SOCKERRNO; - if ( localErrno == SOCK_SHUTDOWN ) { - this->cacRef.disconnectNotify ( *this ); - return 0u; + if ( status == 0 ) { + this->cacRef.disconnectNotify ( *this ); + return 0u; + } + + // if the circuit was aborted then supress warning message about + // bad file descriptor + if ( this->state != iiucs_connected && + this->state != iiucs_clean_shutdown ) { + return 0u; + } + + if ( localErrno == SOCK_SHUTDOWN ) { + this->cacRef.disconnectNotify ( *this ); + return 0u; + } + + if ( localErrno == SOCK_EINTR ) { + continue; + } + + if ( localErrno == SOCK_ECONNABORTED ) { + this->cacRef.disconnectNotify ( *this ); + return 0u; + } + + if ( localErrno == SOCK_ECONNRESET ) { + this->cacRef.disconnectNotify ( *this ); + return 0u; + } + + { + char name[64]; + this->hostName ( name, sizeof ( name ) ); + this->printf ( "Unexpected problem with circuit to CA server \"%s\" was \"%s\" - disconnecting\n", + name, SOCKERRSTR ( localErrno ) ); + } + + this->cacRef.initiateAbortShutdown ( *this ); + + break; } - - if ( localErrno == SOCK_EINTR ) { - return 0u; - } - - if ( localErrno == SOCK_ECONNABORTED ) { - this->cacRef.disconnectNotify ( *this ); - return 0u; - } - - if ( localErrno == SOCK_ECONNRESET ) { - this->cacRef.disconnectNotify ( *this ); - return 0u; - } - - { - char name[64]; - this->hostName ( name, sizeof ( name ) ); - this->printf ( "Unexpected problem with circuit to CA server \"%s\" was \"%s\" - disconnecting\n", - name, SOCKERRSTR ( localErrno ) ); - } - - this->cacRef.initiateAbortShutdown ( *this ); - - return 0u; } - - assert ( static_cast ( status ) <= nBytesInBuf ); - return static_cast ( status ); + + return bytesAvailable; } tcpRecvThread::tcpRecvThread ( class tcpiiu & iiuIn, class callbackMutex & cbMutexIn, @@ -320,11 +319,10 @@ void tcpRecvThread::run () else { this->iiu.blockUntilBytesArePendingInOS(); nBytesIn = 0u; - } - - if ( this->iiu.state != tcpiiu::iiucs_connected && - this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { - break; + if ( this->iiu.state != tcpiiu::iiucs_connected && + this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { + break; + } } // reschedule connection activity watchdog @@ -341,16 +339,15 @@ void tcpRecvThread::run () if ( ! this->iiu.cacRef.preemptiveCallbakIsEnabled() ) { nBytesIn = pComBuf->fillFromWire ( this->iiu ); - } - - if ( this->iiu.state != tcpiiu::iiucs_connected && - this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { - break; + if ( this->iiu.state != tcpiiu::iiucs_connected && + this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { + break; + } } // force the receive watchdog to be reset every 5 frames unsigned contiguousFrameCount = 0; - while ( nBytesIn ) { + while ( nBytesIn && ++contiguousFrameCount <= 5 ) { if ( nBytesIn == pComBuf->capacityBytes () ) { if ( this->iiu.contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) { @@ -380,10 +377,6 @@ void tcpRecvThread::run () break; } - if ( ++contiguousFrameCount >= 5 ) { - break; - } - nBytesIn = pComBuf->fillFromWire ( this->iiu ); } } @@ -401,7 +394,11 @@ void tcpRecvThread::run () // required because the receive thread must hang around // until it receives its blocking socket call interrupt // signal. - this->iiu.cacRef.initiateAbortShutdown ( this->iiu ); + { + epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); + epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() ); + this->iiu.shutdown ( cbGuard, guard ); + } } // @@ -417,7 +414,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, recvThread ( *this, cbMutex, "CAC-TCP-recv", epicsThreadGetStackSize ( epicsThreadStackBig ), cac::highestPriorityLevelBelow ( cac.getInitializingThreadsPriority() ) ), - sendThread ( *this, "CAC-TCP-send", + sendThread ( *this, cbMutex, "CAC-TCP-send", epicsThreadGetStackSize ( epicsThreadStackMedium ), cac::lowestPriorityLevelAbove ( cac::lowestPriorityLevelAbove ( @@ -444,7 +441,8 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, oldMsgHeaderAvailable ( false ), msgHeaderAvailable ( false ), earlyFlush ( false ), - recvProcessPostponedFlush ( false ) + recvProcessPostponedFlush ( false ), + discardingPendingData ( false ) { this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP ); if ( this->sock == INVALID_SOCKET ) { @@ -562,14 +560,12 @@ void tcpiiu::connect () */ this->sendDog.start (); - while ( true ) { + while ( this->state == iiucs_connecting ) { osiSockAddr tmp = this->address (); int status = ::connect ( this->sock, &tmp.sa, sizeof ( tmp.sa ) ); if ( status == 0 ) { - this->sendDog.cancel (); - epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() ); if ( this->state == iiucs_connecting ) { @@ -580,30 +576,26 @@ void tcpiiu::connect () this->recvDog.connectNotify (); } - return; + break; } int errnoCpy = SOCKERRNO; if ( errnoCpy == SOCK_EINTR ) { - if ( this->state != iiucs_connecting ) { - this->sendDog.cancel (); - return; - } continue; } else if ( errnoCpy == SOCK_SHUTDOWN ) { - this->sendDog.cancel (); - return; + break; } else { - this->sendDog.cancel (); this->printf ( "Unable to connect because %d=\"%s\"\n", errnoCpy, SOCKERRSTR ( errnoCpy ) ); this->cacRef.disconnectNotify ( *this ); - return; + break; } } + this->sendDog.cancel (); + return; } void tcpiiu::initiateCleanShutdown ( epicsGuard < cacMutex > & ) @@ -625,14 +617,7 @@ void tcpiiu::disconnectNotify ( epicsGuard < cacMutex > & ) void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, epicsGuard & guard ) { - iiu_conn_state oldState = this->state; - if ( oldState != iiucs_abort_shutdown ) { - this->state = iiucs_abort_shutdown; - { - epicsGuardRelease < cacMutex > guardRelease ( guard ); - this->cacRef.notifyDestroyFD ( cbGuard, this->sock ); - } - + if ( ! this->discardingPendingData ) { // force abortive shutdown sequence // (discard outstanding sends and receives) struct linger tmpLinger; @@ -644,6 +629,21 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, errlogPrintf ( "CAC TCP socket linger set error was %s\n", SOCKERRSTR (SOCKERRNO) ); } + this->discardingPendingData = true; + } + this->shutdown ( cbGuard, guard ); +} + +void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbGuard, + epicsGuard & guard ) +{ + iiu_conn_state oldState = this->state; + if ( oldState != iiucs_abort_shutdown ) { + this->state = iiucs_abort_shutdown; + { + epicsGuardRelease < cacMutex > guardRelease ( guard ); + this->cacRef.notifyDestroyFD ( cbGuard, this->sock ); + } // // on HPUX close() and shutdown() are not enough so we must also @@ -656,7 +656,7 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, // linux threads in recv() dont wakeup unless we also // call shutdown ( close() by itself is not enough ) if ( oldState == iiucs_connected ) { - status = ::shutdown ( this->sock, SD_BOTH ); + int status = ::shutdown ( this->sock, SD_BOTH ); if ( status ) { errlogPrintf ("CAC TCP socket shutdown error was %s\n", SOCKERRSTR (SOCKERRNO) ); @@ -668,7 +668,7 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, // unblock a thread in recv() so we use close() and introduce // some complexity because we must unregister the fd early // - status = socket_close ( this->sock ); + int status = socket_close ( this->sock ); if ( status ) { errlogPrintf ("CAC TCP socket close error was %s\n", SOCKERRSTR (SOCKERRNO) ); diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index 2db3afcad..91cf42ad4 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -69,16 +69,18 @@ private: class tcpSendThread : public epicsThreadRunable { public: - tcpSendThread ( class tcpiiu & iiuIn, - const char * pName, unsigned int stackSize, unsigned int priority ); + tcpSendThread ( class tcpiiu & iiuIn, callbackMutex &, + const char * pName, unsigned int stackSize, + unsigned int priority ); virtual ~tcpSendThread (); void start (); void exitWait (); void exitWaitRelease (); void interruptSocketSend (); private: - class tcpiiu & iiu; epicsThread thread; + class tcpiiu & iiu; + callbackMutex & cbMutex; void run (); }; @@ -93,6 +95,7 @@ public: const cacChannel::priLev & priorityIn ); ~tcpiiu (); void start ( epicsGuard < callbackMutex > & ); + void initiateCleanShutdown ( epicsGuard < cacMutex > & ); void initiateAbortShutdown ( epicsGuard < callbackMutex > &, epicsGuard & ); void disconnectNotify ( epicsGuard & ); @@ -127,7 +130,6 @@ public: void installChannel ( epicsGuard < cacMutex > &, nciu & chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn ); void uninstallChan ( epicsGuard < cacMutex > &, nciu & chan ); - void initiateCleanShutdown ( epicsGuard < cacMutex > & ); bool bytesArePendingInOS () const; @@ -168,6 +170,7 @@ private: bool msgHeaderAvailable; bool earlyFlush; bool recvProcessPostponedFlush; + bool discardingPendingData; bool processIncoming ( epicsGuard < callbackMutex > & ); unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ) epicsThrows (()); @@ -175,6 +178,8 @@ private: void connect (); const char * pHostName () const; void blockUntilBytesArePendingInOS (); + void shutdown ( epicsGuard < callbackMutex > &, + epicsGuard & ); // send protocol stubs void echoRequest ( epicsGuard < cacMutex > & );