fix for mantis 111: force recv thread to run first when both receive watchdog
and receive thread are blocking for the preeemptive callback control lock I also added a guard in a place that appeared to be not adaquately protected.
This commit is contained in:
@@ -497,53 +497,66 @@ void tcpRecvThread::run ()
|
||||
guard, currentTime );
|
||||
}
|
||||
|
||||
// only one recv thread at a time may call callbacks
|
||||
// - pendEvent() blocks until threads waiting for
|
||||
// this lock get a chance to run
|
||||
callbackManager mgr ( this->ctxNotify, this->cbMutex );
|
||||
{
|
||||
// This is used to enforce that the recv thread runs
|
||||
// first when both the receive watchdog and the receive
|
||||
// thread are both waiting for the callback lock.
|
||||
// This is a workaround for a premature disconnect if
|
||||
// they dont call ca_poll often enough.
|
||||
epicsGuard < epicsMutex >
|
||||
recvThreadBusy ( this->iiu.recvThreadIsRunning );
|
||||
|
||||
// force the receive watchdog to be reset every 5 frames
|
||||
unsigned contiguousFrameCount = 0;
|
||||
while ( stat.bytesCopied ) {
|
||||
if ( stat.bytesCopied == pComBuf->capacityBytes () ) {
|
||||
if ( this->iiu.contigRecvMsgCount >=
|
||||
contiguousMsgCountWhichTriggersFlowControl ) {
|
||||
this->iiu.busyStateDetected = true;
|
||||
// only one recv thread at a time may call callbacks
|
||||
// - pendEvent() blocks until threads waiting for
|
||||
// this lock get a chance to run
|
||||
callbackManager mgr ( this->ctxNotify, this->cbMutex );
|
||||
|
||||
// force the receive watchdog to be reset every 5 frames
|
||||
unsigned contiguousFrameCount = 0;
|
||||
while ( stat.bytesCopied ) {
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
|
||||
if ( stat.bytesCopied == pComBuf->capacityBytes () ) {
|
||||
if ( this->iiu.contigRecvMsgCount >=
|
||||
contiguousMsgCountWhichTriggersFlowControl ) {
|
||||
this->iiu.busyStateDetected = true;
|
||||
}
|
||||
else {
|
||||
this->iiu.contigRecvMsgCount++;
|
||||
}
|
||||
}
|
||||
else {
|
||||
this->iiu.contigRecvMsgCount = 0u;
|
||||
this->iiu.busyStateDetected = false;
|
||||
}
|
||||
this->iiu.unacknowledgedSendBytes = 0u;
|
||||
|
||||
this->iiu.recvQue.pushLastComBufReceived ( *pComBuf );
|
||||
}
|
||||
else {
|
||||
this->iiu.contigRecvMsgCount++;
|
||||
}
|
||||
}
|
||||
else {
|
||||
this->iiu.contigRecvMsgCount = 0u;
|
||||
this->iiu.busyStateDetected = false;
|
||||
}
|
||||
this->iiu.unacknowledgedSendBytes = 0u;
|
||||
pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
|
||||
|
||||
this->iiu.recvQue.pushLastComBufReceived ( *pComBuf );
|
||||
pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
|
||||
|
||||
// execute receive labor
|
||||
bool protocolOK = this->iiu.processIncoming ( currentTime, mgr );
|
||||
if ( ! protocolOK ) {
|
||||
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
|
||||
this->iiu.initiateAbortShutdown ( guard );
|
||||
breakOut = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if ( ! this->iiu.bytesArePendingInOS ()
|
||||
|| ++contiguousFrameCount > 5 ) {
|
||||
break;
|
||||
}
|
||||
|
||||
pComBuf->fillFromWire ( this->iiu, stat );
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
|
||||
if ( ! this->validFillStatus ( guard, stat ) ) {
|
||||
// execute receive labor
|
||||
bool protocolOK = this->iiu.processIncoming ( currentTime, mgr );
|
||||
if ( ! protocolOK ) {
|
||||
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
|
||||
this->iiu.initiateAbortShutdown ( guard );
|
||||
breakOut = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if ( ! this->iiu.bytesArePendingInOS ()
|
||||
|| ++contiguousFrameCount > 5 ) {
|
||||
break;
|
||||
}
|
||||
|
||||
pComBuf->fillFromWire ( this->iiu, stat );
|
||||
{
|
||||
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
|
||||
if ( ! this->validFillStatus ( guard, stat ) ) {
|
||||
breakOut = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -857,6 +870,11 @@ void tcpiiu::sendTimeoutNotify (
|
||||
this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard, currentTime );
|
||||
}
|
||||
|
||||
void tcpiiu::deferToRecvBacklog ()
|
||||
{
|
||||
epicsGuard < epicsMutex > waitUntilRecvThreadIsntBusy ( this->recvThreadIsRunning );
|
||||
}
|
||||
|
||||
void tcpiiu::receiveTimeoutNotify (
|
||||
callbackManager & mgr,
|
||||
epicsGuard < epicsMutex > & guard )
|
||||
|
||||
Reference in New Issue
Block a user