From df16e0238ae2ad07cc9cdcc435da8be49ebdb1c0 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Mon, 31 Oct 2005 21:57:14 +0000 Subject: [PATCH] fix for mantis 111: force recv thread to run first when both receive watchdog and receive thread are blocking for the preeemptive callback control lock I also added a guard in a place that appeared to be not adaquately protected. --- src/ca/tcpiiu.cpp | 100 +++++++++++++++++++++++++++------------------- 1 file changed, 59 insertions(+), 41 deletions(-) diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index b3109cfe9..73c8c2acb 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -497,53 +497,66 @@ void tcpRecvThread::run () guard, currentTime ); } - // only one recv thread at a time may call callbacks - // - pendEvent() blocks until threads waiting for - // this lock get a chance to run - callbackManager mgr ( this->ctxNotify, this->cbMutex ); + { + // This is used to enforce that the recv thread runs + // first when both the receive watchdog and the receive + // thread are both waiting for the callback lock. + // This is a workaround for a premature disconnect if + // they dont call ca_poll often enough. + epicsGuard < epicsMutex > + recvThreadBusy ( this->iiu.recvThreadIsRunning ); - // force the receive watchdog to be reset every 5 frames - unsigned contiguousFrameCount = 0; - while ( stat.bytesCopied ) { - if ( stat.bytesCopied == pComBuf->capacityBytes () ) { - if ( this->iiu.contigRecvMsgCount >= - contiguousMsgCountWhichTriggersFlowControl ) { - this->iiu.busyStateDetected = true; + // only one recv thread at a time may call callbacks + // - pendEvent() blocks until threads waiting for + // this lock get a chance to run + callbackManager mgr ( this->ctxNotify, this->cbMutex ); + + // force the receive watchdog to be reset every 5 frames + unsigned contiguousFrameCount = 0; + while ( stat.bytesCopied ) { + { + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + if ( stat.bytesCopied == pComBuf->capacityBytes () ) { + if ( this->iiu.contigRecvMsgCount >= + contiguousMsgCountWhichTriggersFlowControl ) { + this->iiu.busyStateDetected = true; + } + else { + this->iiu.contigRecvMsgCount++; + } + } + else { + this->iiu.contigRecvMsgCount = 0u; + this->iiu.busyStateDetected = false; + } + this->iiu.unacknowledgedSendBytes = 0u; + + this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); } - else { - this->iiu.contigRecvMsgCount++; - } - } - else { - this->iiu.contigRecvMsgCount = 0u; - this->iiu.busyStateDetected = false; - } - this->iiu.unacknowledgedSendBytes = 0u; + pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; - this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); - pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; - - // execute receive labor - bool protocolOK = this->iiu.processIncoming ( currentTime, mgr ); - if ( ! protocolOK ) { - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - this->iiu.initiateAbortShutdown ( guard ); - breakOut = true; - break; - } - - if ( ! this->iiu.bytesArePendingInOS () - || ++contiguousFrameCount > 5 ) { - break; - } - - pComBuf->fillFromWire ( this->iiu, stat ); - { - epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - if ( ! this->validFillStatus ( guard, stat ) ) { + // execute receive labor + bool protocolOK = this->iiu.processIncoming ( currentTime, mgr ); + if ( ! protocolOK ) { + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + this->iiu.initiateAbortShutdown ( guard ); breakOut = true; break; } + + if ( ! this->iiu.bytesArePendingInOS () + || ++contiguousFrameCount > 5 ) { + break; + } + + pComBuf->fillFromWire ( this->iiu, stat ); + { + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + if ( ! this->validFillStatus ( guard, stat ) ) { + breakOut = true; + break; + } + } } } } @@ -857,6 +870,11 @@ void tcpiiu::sendTimeoutNotify ( this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard, currentTime ); } +void tcpiiu::deferToRecvBacklog () +{ + epicsGuard < epicsMutex > waitUntilRecvThreadIsntBusy ( this->recvThreadIsRunning ); +} + void tcpiiu::receiveTimeoutNotify ( callbackManager & mgr, epicsGuard < epicsMutex > & guard )