From b34484136552be9c05c0aca3f6179edcd0d0d13d Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 13 Aug 2009 23:29:02 +0000 Subject: [PATCH] fixed the following issues with detection of flow control mode o the flow control contig frame thresh should be based on max array size o it appears that the wakeup mechanism for the send thread, who sends the flow control on/off messages was broken o looking at full buffers isnt a perfect detection scheme compared to the simpler approach of just checking to see if bytes are pending in the socket before calling recv --- src/ca/cac.cpp | 6 ++++++ src/ca/cac.h | 8 ++++++++ src/ca/tcpiiu.cpp | 49 +++++++++++++++++++++++++++++++++-------------- 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 4205fc1f3..6961d95d3 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -143,6 +143,7 @@ cac::cac ( initializingThreadsId ( epicsThreadGetIdSelf() ), initializingThreadsPriority ( epicsThreadGetPrioritySelf() ), maxRecvBytesTCP ( MAX_TCP ), + maxContigFrames ( contiguousMsgCountWhichTriggersFlowControl ), beaconAnomalyCount ( 0u ), iiuExistenceCount ( 0u ) { @@ -215,6 +216,11 @@ cac::cac ( if ( ! this->tcpLargeRecvBufFreeList ) { throw std::bad_alloc (); } + unsigned bufsPerArray = this->maxRecvBytesTCP / comBuf::capacityBytes (); + if ( bufsPerArray > 1u ) { + maxContigFrames = bufsPerArray * + contiguousMsgCountWhichTriggersFlowControl; + } } catch ( ... ) { osiSockRelease (); diff --git a/src/ca/cac.h b/src/ca/cac.h index 34efd6c17..4c0d23da4 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -198,6 +198,7 @@ public: unsigned largeBufferSizeTCP () const; char * allocateLargeBufferTCP (); void releaseLargeBufferTCP ( char * ); + unsigned maxContiguousFrames ( epicsGuard < epicsMutex > & ) const; // misc const char * userNamePointer () const; @@ -272,6 +273,7 @@ private: epicsThreadId initializingThreadsId; unsigned initializingThreadsPriority; unsigned maxRecvBytesTCP; + unsigned maxContigFrames; unsigned beaconAnomalyCount; unsigned iiuExistenceCount; @@ -442,5 +444,11 @@ inline const char * cac :: pLocalHostName () return _refLocalHostName->pointer (); } +inline unsigned cac :: + maxContiguousFrames ( epicsGuard < epicsMutex > & ) const +{ + return maxContigFrames; +} + #endif // ifdef cach diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index e96dbd38e..13c5368ca 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -468,6 +468,7 @@ void tcpRecvThread::run () if ( ! pComBuf ) { pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; } + statusWireIO stat; pComBuf->fillFromWire ( this->iiu, stat ); @@ -497,7 +498,6 @@ void tcpRecvThread::run () callbackManager mgr ( this->ctxNotify, this->cbMutex ); epicsGuard < epicsMutex > guard ( this->iiu.mutex ); - // route legacy V42 channel connect through the recv thread - // the only thread that should be taking the callback lock @@ -506,19 +506,6 @@ void tcpRecvThread::run () pChan->connect ( mgr.cbGuard, guard ); } - if ( stat.bytesCopied == pComBuf->capacityBytes () ) { - if ( this->iiu.contigRecvMsgCount >= - contiguousMsgCountWhichTriggersFlowControl ) { - this->iiu.busyStateDetected = true; - } - else { - this->iiu.contigRecvMsgCount++; - } - } - else { - this->iiu.contigRecvMsgCount = 0u; - this->iiu.busyStateDetected = false; - } this->iiu.unacknowledgedSendBytes = 0u; bool protocolOK = false; @@ -542,6 +529,40 @@ void tcpRecvThread::run () sendWakeupNeeded = true; } } + + // + // we dont feel comfortable calling this with a lock applied + // (it might block for longer than we like) + // + // we would prefer to improve efficency by trying, first, a + // recv with the new MSG_DONTWAIT flag set, but there isnt + // universal support + // + bool bytesArePending = this->iiu.bytesArePendingInOS (); + { + epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + if ( bytesArePending ) { + if ( ! this->iiu.busyStateDetected ) { + this->iiu.contigRecvMsgCount++; + if ( this->iiu.contigRecvMsgCount >= + this->iiu.cacRef.maxContiguousFrames ( guard ) ) { + this->iiu.busyStateDetected = true; + sendWakeupNeeded = true; + } + } + } + else { + // if no bytes are pending then we must immediately + // switch off flow control w/o waiting for more + // data to arrive + this->iiu.contigRecvMsgCount = 0u; + if ( this->iiu.busyStateDetected ) { + sendWakeupNeeded = true; + this->iiu.busyStateDetected = false; + } + } + } + if ( sendWakeupNeeded ) { this->iiu.sendThreadFlushEvent.signal (); }