From 2e34ee4829695206ac7365656ddf3cdb3db92088 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 2 Aug 2001 18:17:20 +0000 Subject: [PATCH] improved recv schedualing --- src/ca/tcpiiu.cpp | 147 +++++++++++++++++++++++++++------------------- 1 file changed, 86 insertions(+), 61 deletions(-) diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index ce30a7438..bc563c69e 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -160,7 +160,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) assert ( nBytesInBuf <= INT_MAX ); int status = ::recv ( this->sock, static_cast ( pBuf ), - static_cast ( nBytesInBuf ), this->recvFlag ); + static_cast ( nBytesInBuf ), 0 ); if ( status <= 0 ) { int localErrno = SOCKERRNO; @@ -226,6 +226,13 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) if ( tbs != epicsThreadBooleanStatusSuccess ) { priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority (); } + else { + epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( + priorityOfSend, &priorityOfSend ); + if ( tbs != epicsThreadBooleanStatusSuccess ) { + priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority (); + } + } epicsThreadId tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend, epicsThreadGetStackSize ( epicsThreadStackMedium ), cacSendThreadTCP, piiu ); if ( ! tid ) { @@ -238,68 +245,87 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) piiu->cleanShutdown (); } + comBuf * pComBuf = new ( std::nothrow ) comBuf; while ( piiu->state == iiu_connected ) { - comBuf * pComBuf = new ( std::nothrow ) comBuf; - if ( pComBuf ) { - if ( piiu->preemptiveCallbackEnable ) { - piiu->recvFlag = 0; - } - else { - piiu->recvFlag = MSG_PEEK; - } - unsigned nBytesIn = pComBuf->fillFromWire ( *piiu ); - - // only one recv thread at a time may call callbacks - callbackAutoMutex autoMutex ( *piiu->pCAC() ); - - // - // We leave the bytes pending and fetch them again after - // callbacks are enabled when running in the old preemptive - // call back disabled mode so that asynchronous wakeup via - // file manager call backs works correctly. Oddly enough, - // this does not appear to impact performance. - // - if ( ! piiu->preemptiveCallbackEnable ) { - pComBuf->clear (); - piiu->recvFlag = 0; - nBytesIn = pComBuf->fillFromWire ( *piiu ); - } - - if ( nBytesIn ) { - piiu->recvQue.pushLastComBufReceived ( *pComBuf ); - if ( nBytesIn == pComBuf->capacityBytes () ) { - if ( piiu->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) { - piiu->busyStateDetected = true; - } - else { - piiu->contigRecvMsgCount++; - } - } - else { - piiu->contigRecvMsgCount = 0u; - piiu->busyStateDetected = false; - } - piiu->unacknowledgedSendBytes = 0u; - - // reschedule connection activity watchdog - // but dont hold the lock for fear of deadlocking - // because cancel is blocking for the completion - // of the recvDog expire which takes the lock - piiu->recvDog.messageArrivalNotify (); - - // - // execute receive labor - // - piiu->processIncoming (); - } - else { - pComBuf->destroy (); - } - } - else { + if ( ! pComBuf ) { // no way to be informed when memory is available epicsThreadSleep ( 0.5 ); + pComBuf = new ( std::nothrow ) comBuf; + continue; } + + // + // We leave the bytes pending and fetch them after + // callbacks are enabled when running in the old preemptive + // call back disabled mode so that asynchronous wakeup via + // file manager call backs works correctly. This does not + // appear to impact performance. + // + unsigned nBytesIn; + if ( piiu->preemptiveCallbackEnable ) { + nBytesIn = pComBuf->fillFromWire ( *piiu ); + } + else { + char oneByte; + int status = ::recv ( piiu->sock, & oneByte, + sizeof ( oneByte ), MSG_PEEK ); + nBytesIn = 0u; + } + + // only one recv thread at a time may call callbacks + callbackAutoMutex autoMutex ( *piiu->pCAC() ); + + osiSockIoctl_t bytesPending; + do { + if ( nBytesIn == 0u ) { + nBytesIn = pComBuf->fillFromWire ( *piiu ); + if ( nBytesIn == 0u ) { + break; + } + } + + piiu->recvQue.pushLastComBufReceived ( *pComBuf ); + + if ( nBytesIn == pComBuf->capacityBytes () ) { + if ( piiu->contigRecvMsgCount >= + contiguousMsgCountWhichTriggersFlowControl ) { + piiu->busyStateDetected = true; + } + else { + piiu->contigRecvMsgCount++; + } + } + else { + piiu->contigRecvMsgCount = 0u; + piiu->busyStateDetected = false; + } + piiu->unacknowledgedSendBytes = 0u; + + // reschedule connection activity watchdog + // but dont hold the lock for fear of deadlocking + // because cancel is blocking for the completion + // of the recvDog expire which takes the lock + piiu->recvDog.messageArrivalNotify (); + + // execute receive labor + piiu->processIncoming (); + + // allocate a new com buf + pComBuf = new ( std::nothrow ) comBuf; + nBytesIn = 0u; + + { + int status; + status = socket_ioctl ( piiu->sock, FIONREAD, & bytesPending ); + if ( status ) { + bytesPending = 0u; + } + } + } while ( bytesPending && pComBuf ); + } + + if ( pComBuf ) { + pComBuf->destroy (); } { @@ -336,7 +362,6 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, blockingForFlush ( 0u ), socketLibrarySendBufferSize ( 0u ), unacknowledgedSendBytes ( 0u ), - recvFlag ( 0 ), busyStateDetected ( false ), flowControlActive ( false ), echoRequestPending ( false ), @@ -430,7 +455,7 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) ); unsigned priorityOfRecv; - epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( + epicsThreadBooleanStatus tbs = epicsThreadHighestPriorityLevelBelow ( this->pCAC()->getInitializingThreadsPriority (), &priorityOfRecv ); if ( tbs != epicsThreadBooleanStatusSuccess ) { priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority ();