From 6f6f1d22cd7a644cdbf239150f185dc9ba544fd5 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Tue, 8 Nov 2005 21:25:23 +0000 Subject: [PATCH] better fix for mantis 111 --- src/ca/cac.h | 2 - src/ca/tcpRecvWatchdog.cpp | 41 ++++++------- src/ca/tcpSendWatchdog.cpp | 22 +++---- src/ca/tcpiiu.cpp | 117 +++++++++++++++++++------------------ src/ca/virtualCircuit.h | 16 +++-- 5 files changed, 101 insertions(+), 97 deletions(-) diff --git a/src/ca/cac.h b/src/ca/cac.h index 0d90e5810..9c4c471a7 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -258,8 +258,6 @@ private: // **** lock hierarchy **** // 1) callback lock must always be acquired before // the primary mutex if both locks are needed - // 2) tcpiiu::recvThreadIsRunning lock must always be - // acquired before callback lock if both locks are needed mutable epicsMutex & mutex; mutable epicsMutex & cbMutex; epicsEvent iiuUninstall; diff --git a/src/ca/tcpRecvWatchdog.cpp b/src/ca/tcpRecvWatchdog.cpp index 14507b06a..9e7f38103 100644 --- a/src/ca/tcpRecvWatchdog.cpp +++ b/src/ca/tcpRecvWatchdog.cpp @@ -47,32 +47,15 @@ tcpRecvWatchdog::~tcpRecvWatchdog () epicsTimerNotify::expireStatus tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361 { - // allow pending receive traffic to run first - this->iiu.deferToRecvBacklog (); - - // callback lock is required because channel disconnect - // state change is initiated from this thread, and - // this can cause their disconnect notify callback - // to be invoked. - callbackManager mgr ( this->ctxNotify, this->cbMutex ); epicsGuard < epicsMutex > guard ( this->mutex ); if ( this->shuttingDown ) { return noRestart; } if ( this->probeResponsePending ) { - if ( this->iiu.bytesArePendingInOS() ) { - this->iiu.printf ( mgr.cbGuard, - "The CA client library's server inactivity timer initiated server disconnect\n" ); - this->iiu.printf ( mgr.cbGuard, - "despite the fact that messages from this server are pending for processing in\n" ); - this->iiu.printf ( mgr.cbGuard, - "the client library. Here are some possible causes of the unnecessary disconnect:\n" ); - this->iiu.printf ( mgr.cbGuard, - "o ca_pend_event() or ca_poll() have not been called for %f seconds\n", - this->period ); - this->iiu.printf ( mgr.cbGuard, - "o application is blocked in a callback from the client library\n" ); + if ( this->iiu.receiveThreadIsBusy ( guard ) ) { + return expireStatus ( restart, CA_ECHO_TIMEOUT ); } + { # ifdef DEBUG char hostName[128]; @@ -81,12 +64,26 @@ tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361 "- disconnecting.\n", hostName, this->period ) ); # endif - this->iiu.receiveTimeoutNotify ( mgr, guard ); - this->probeTimeoutDetected = true; + // to get the callback lock safely we must reorder + // the lock hierarchy + epicsGuardRelease < epicsMutex > unguard ( guard ); + { + // callback lock is required because channel disconnect + // state change is initiated from this thread, and + // this can cause their disconnect notify callback + // to be invoked. + callbackManager mgr ( this->ctxNotify, this->cbMutex ); + epicsGuard < epicsMutex > tmpGuard ( this->mutex ); + this->iiu.receiveTimeoutNotify ( mgr, tmpGuard ); + this->probeTimeoutDetected = true; + } } return noRestart; } else { + if ( this->iiu.receiveThreadIsBusy ( guard ) ) { + return expireStatus ( restart, this->period ); + } this->probeTimeoutDetected = false; this->probeResponsePending = this->iiu.setEchoRequestPending ( guard ); debugPrintf ( ("circuit timed out - sending echo request\n") ); diff --git a/src/ca/tcpSendWatchdog.cpp b/src/ca/tcpSendWatchdog.cpp index 30e4a197b..6834b39e7 100644 --- a/src/ca/tcpSendWatchdog.cpp +++ b/src/ca/tcpSendWatchdog.cpp @@ -43,27 +43,29 @@ tcpSendWatchdog::~tcpSendWatchdog () epicsTimerNotify::expireStatus tcpSendWatchdog::expire ( const epicsTime & /* currentTime */ ) { - callbackManager mgr ( this->ctxNotify, this->cbMutex ); - epicsGuard < epicsMutex > guard ( this->mutex ); - if ( this->iiu.bytesArePendingInOS() ) { - this->iiu.printf ( mgr.cbGuard, - "The CA client library is disconnecting after a flush request " - "timed out, but receive data is pending, probably because of an " - "application schedualing problem\n" ); + { + epicsGuard < epicsMutex > guard ( this->mutex ); + if ( this->iiu.receiveThreadIsBusy ( guard ) ) { + return expireStatus ( restart, this->period ); + } } + { + callbackManager mgr ( this->ctxNotify, this->cbMutex ); + epicsGuard < epicsMutex > guard ( this->mutex ); # ifdef DEBUG char hostName[128]; this->iiu.getHostName ( guard, hostName, sizeof ( hostName ) ); debugPrintf ( ( "Request not accepted by CA server %s for %g sec. Disconnecting.\n", hostName, this->period ) ); # endif - this->iiu.sendTimeoutNotify ( mgr, guard ); + this->iiu.sendTimeoutNotify ( mgr, guard ); + } return noRestart; } -void tcpSendWatchdog::start ( const epicsTime & currentTime ) +void tcpSendWatchdog::start ( const epicsTime & /* currentTime */ ) { - this->timer.start ( *this, currentTime + this->period ); + this->timer.start ( *this, this->period ); } void tcpSendWatchdog::cancel () diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 75cbabd6d..92f4573d0 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -452,26 +452,10 @@ void tcpRecvThread::run () epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); this->iiu.cacRef.attachToClientCtx (); - comBuf * pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; + comBuf * pComBuf = 0; bool breakOut = false; while ( ! breakOut ) { - // - // if this thread has connected channels with subscriptions - // that need to be sent then wakeup the send thread - { - bool wakeupNeeded = false; - { - epicsGuard < epicsMutex > cacGuard ( this->iiu.mutex ); - if ( this->iiu.subscripReqPend.count() ) { - wakeupNeeded = true; - } - } - if ( wakeupNeeded ) { - this->iiu.sendThreadFlushEvent.signal (); - } - } - // // We leave the bytes pending and fetch them after // callbacks are enabled when running in the old preemptive @@ -479,6 +463,9 @@ void tcpRecvThread::run () // file manager call backs works correctly. This does not // appear to impact performance. // + if ( ! pComBuf ) { + pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; + } statusWireIO stat; pComBuf->fillFromWire ( this->iiu, stat ); @@ -492,52 +479,48 @@ void tcpRecvThread::run () if ( stat.bytesCopied == 0u ) { continue; } - // reschedule connection activity watchdog - this->iiu.recvDog.messageArrivalNotify ( guard ); + + this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); + pComBuf = 0; + + this->iiu._receiveThreadIsBusy = true; } + bool sendWakeupNeeded = false; { - // This is used to enforce that the recv thread runs - // first when both the receive watchdog and the receive - // thread are both waiting for the callback lock. - // This is a workaround for a premature disconnect if - // they dont call ca_poll often enough. - epicsGuard < epicsMutex > - recvThreadBusy ( this->iiu.recvThreadIsRunning ); - // only one recv thread at a time may call callbacks // - pendEvent() blocks until threads waiting for // this lock get a chance to run callbackManager mgr ( this->ctxNotify, this->cbMutex ); + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + // force the receive watchdog to be reset every 5 frames unsigned contiguousFrameCount = 0; while ( stat.bytesCopied ) { - { - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - if ( stat.bytesCopied == pComBuf->capacityBytes () ) { - if ( this->iiu.contigRecvMsgCount >= - contiguousMsgCountWhichTriggersFlowControl ) { - this->iiu.busyStateDetected = true; - } - else { - this->iiu.contigRecvMsgCount++; - } + if ( stat.bytesCopied == pComBuf->capacityBytes () ) { + if ( this->iiu.contigRecvMsgCount >= + contiguousMsgCountWhichTriggersFlowControl ) { + this->iiu.busyStateDetected = true; + } + else { + this->iiu.contigRecvMsgCount++; } - else { - this->iiu.contigRecvMsgCount = 0u; - this->iiu.busyStateDetected = false; - } - this->iiu.unacknowledgedSendBytes = 0u; - - this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); } - pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; + else { + this->iiu.contigRecvMsgCount = 0u; + this->iiu.busyStateDetected = false; + } + this->iiu.unacknowledgedSendBytes = 0u; + + bool protocolOK = false; + { + epicsGuardRelease < epicsMutex > unguard ( guard ); + // execute receive labor + protocolOK = this->iiu.processIncoming ( currentTime, mgr ); + } - // execute receive labor - bool protocolOK = this->iiu.processIncoming ( currentTime, mgr ); if ( ! protocolOK ) { - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); this->iiu.initiateAbortShutdown ( guard ); breakOut = true; break; @@ -548,15 +531,35 @@ void tcpRecvThread::run () break; } - pComBuf->fillFromWire ( this->iiu, stat ); { - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - if ( ! this->validFillStatus ( guard, stat ) ) { - breakOut = true; - break; + epicsGuardRelease < epicsMutex > unguard ( guard ); + if ( ! pComBuf ) { + pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; } + pComBuf->fillFromWire ( this->iiu, stat ); + } + + if ( this->validFillStatus ( guard, stat ) ) { + this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); + pComBuf = 0; + } + else { + breakOut = true; + break; } } + this->iiu._receiveThreadIsBusy = false; + // reschedule connection activity watchdog + this->iiu.recvDog.messageArrivalNotify ( guard ); + // + // if this thread has connected channels with subscriptions + // that need to be sent then wakeup the send thread + if ( this->iiu.subscripReqPend.count() ) { + sendWakeupNeeded = true; + } + } + if ( sendWakeupNeeded ) { + this->iiu.sendThreadFlushEvent.signal (); } } @@ -672,6 +675,7 @@ tcpiiu::tcpiiu ( socketLibrarySendBufferSize ( 0x1000 ), unacknowledgedSendBytes ( 0u ), channelCountTot ( 0u ), + _receiveThreadIsBusy ( false ), busyStateDetected ( false ), flowControlActive ( false ), echoRequestPending ( false ), @@ -868,11 +872,6 @@ void tcpiiu::sendTimeoutNotify ( this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard ); } -void tcpiiu::deferToRecvBacklog () -{ - epicsGuard < epicsMutex > waitUntilRecvThreadIsntBusy ( this->recvThreadIsRunning ); -} - void tcpiiu::receiveTimeoutNotify ( callbackManager & mgr, epicsGuard < epicsMutex > & guard ) @@ -1033,6 +1032,8 @@ void tcpiiu::show ( unsigned level ) const static_cast < void * > ( this->pCurData ), this->curDataMax ); ::printf ( "\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n", this->contigRecvMsgCount, this->busyStateDetected, this->flowControlActive ); + ::printf ( "\receive thread is busy=%u\n", + this->_receiveThreadIsBusy ); } if ( level > 2u ) { ::printf ( "\tvirtual circuit socket identifier %d\n", this->sock ); diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index c15f04ce7..e2d9c9516 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -112,8 +112,6 @@ public: void sendTimeoutNotify ( callbackManager & cbMgr, epicsGuard < epicsMutex > & guard ); - // dont call deferToRecvBacklog() while holding the callback lock. - void deferToRecvBacklog (); void receiveTimeoutNotify( callbackManager &, epicsGuard < epicsMutex > & ); @@ -154,6 +152,8 @@ public: epicsGuard < epicsMutex > & ) const; bool connecting ( epicsGuard < epicsMutex > & ) const; + bool receiveThreadIsBusy ( + epicsGuard < epicsMutex > & ); osiSockAddr getNetworkAddress ( epicsGuard < epicsMutex > & ) const; int printf ( @@ -176,8 +176,6 @@ public: epicsGuard < epicsMutex > &, nciu & chan ); void nameResolutionMsgEndNotify (); - bool bytesArePendingInOS () const; - void * operator new ( size_t size, tsFreeList < class tcpiiu, 32, epicsMutexNOOP > & ); epicsPlacementDeleteOperator (( void *, @@ -207,7 +205,6 @@ private: char * pCurData; epicsMutex & mutex; epicsMutex & cbMutex; - epicsMutex recvThreadIsRunning; unsigned minorProtocolVersion; enum iiu_conn_state { iiucs_connecting, // pending circuit connect @@ -224,6 +221,7 @@ private: unsigned socketLibrarySendBufferSize; unsigned unacknowledgedSendBytes; unsigned channelCountTot; + bool _receiveThreadIsBusy; bool busyStateDetected; // only modified by the recv thread bool flowControlActive; // only modified by the send process thread bool echoRequestPending; @@ -254,6 +252,7 @@ private: epicsGuard < epicsMutex > & ); void disconnectNotify ( epicsGuard < epicsMutex > & ); + bool bytesArePendingInOS () const; // send protocol stubs void echoRequest ( @@ -356,6 +355,13 @@ inline bool tcpiiu::connecting ( return ( this->state == iiucs_connecting ); } +inline bool tcpiiu::receiveThreadIsBusy ( + epicsGuard < epicsMutex > & guard ) +{ + guard.assertIdenticalMutex ( this->mutex ); + return this->_receiveThreadIsBusy; +} + inline void tcpiiu::beaconAnomalyNotify ( epicsGuard < epicsMutex > & guard ) {