improved recv schedualing
This commit is contained in:
+86
-61
@@ -160,7 +160,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
|
||||
assert ( nBytesInBuf <= INT_MAX );
|
||||
|
||||
int status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
|
||||
static_cast <int> ( nBytesInBuf ), this->recvFlag );
|
||||
static_cast <int> ( nBytesInBuf ), 0 );
|
||||
if ( status <= 0 ) {
|
||||
int localErrno = SOCKERRNO;
|
||||
|
||||
@@ -226,6 +226,13 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
|
||||
if ( tbs != epicsThreadBooleanStatusSuccess ) {
|
||||
priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority ();
|
||||
}
|
||||
else {
|
||||
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
|
||||
priorityOfSend, &priorityOfSend );
|
||||
if ( tbs != epicsThreadBooleanStatusSuccess ) {
|
||||
priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority ();
|
||||
}
|
||||
}
|
||||
epicsThreadId tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend,
|
||||
epicsThreadGetStackSize ( epicsThreadStackMedium ), cacSendThreadTCP, piiu );
|
||||
if ( ! tid ) {
|
||||
@@ -238,68 +245,87 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
|
||||
piiu->cleanShutdown ();
|
||||
}
|
||||
|
||||
comBuf * pComBuf = new ( std::nothrow ) comBuf;
|
||||
while ( piiu->state == iiu_connected ) {
|
||||
comBuf * pComBuf = new ( std::nothrow ) comBuf;
|
||||
if ( pComBuf ) {
|
||||
if ( piiu->preemptiveCallbackEnable ) {
|
||||
piiu->recvFlag = 0;
|
||||
}
|
||||
else {
|
||||
piiu->recvFlag = MSG_PEEK;
|
||||
}
|
||||
unsigned nBytesIn = pComBuf->fillFromWire ( *piiu );
|
||||
|
||||
// only one recv thread at a time may call callbacks
|
||||
callbackAutoMutex autoMutex ( *piiu->pCAC() );
|
||||
|
||||
//
|
||||
// We leave the bytes pending and fetch them again after
|
||||
// callbacks are enabled when running in the old preemptive
|
||||
// call back disabled mode so that asynchronous wakeup via
|
||||
// file manager call backs works correctly. Oddly enough,
|
||||
// this does not appear to impact performance.
|
||||
//
|
||||
if ( ! piiu->preemptiveCallbackEnable ) {
|
||||
pComBuf->clear ();
|
||||
piiu->recvFlag = 0;
|
||||
nBytesIn = pComBuf->fillFromWire ( *piiu );
|
||||
}
|
||||
|
||||
if ( nBytesIn ) {
|
||||
piiu->recvQue.pushLastComBufReceived ( *pComBuf );
|
||||
if ( nBytesIn == pComBuf->capacityBytes () ) {
|
||||
if ( piiu->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) {
|
||||
piiu->busyStateDetected = true;
|
||||
}
|
||||
else {
|
||||
piiu->contigRecvMsgCount++;
|
||||
}
|
||||
}
|
||||
else {
|
||||
piiu->contigRecvMsgCount = 0u;
|
||||
piiu->busyStateDetected = false;
|
||||
}
|
||||
piiu->unacknowledgedSendBytes = 0u;
|
||||
|
||||
// reschedule connection activity watchdog
|
||||
// but dont hold the lock for fear of deadlocking
|
||||
// because cancel is blocking for the completion
|
||||
// of the recvDog expire which takes the lock
|
||||
piiu->recvDog.messageArrivalNotify ();
|
||||
|
||||
//
|
||||
// execute receive labor
|
||||
//
|
||||
piiu->processIncoming ();
|
||||
}
|
||||
else {
|
||||
pComBuf->destroy ();
|
||||
}
|
||||
}
|
||||
else {
|
||||
if ( ! pComBuf ) {
|
||||
// no way to be informed when memory is available
|
||||
epicsThreadSleep ( 0.5 );
|
||||
pComBuf = new ( std::nothrow ) comBuf;
|
||||
continue;
|
||||
}
|
||||
|
||||
//
|
||||
// We leave the bytes pending and fetch them after
|
||||
// callbacks are enabled when running in the old preemptive
|
||||
// call back disabled mode so that asynchronous wakeup via
|
||||
// file manager call backs works correctly. This does not
|
||||
// appear to impact performance.
|
||||
//
|
||||
unsigned nBytesIn;
|
||||
if ( piiu->preemptiveCallbackEnable ) {
|
||||
nBytesIn = pComBuf->fillFromWire ( *piiu );
|
||||
}
|
||||
else {
|
||||
char oneByte;
|
||||
int status = ::recv ( piiu->sock, & oneByte,
|
||||
sizeof ( oneByte ), MSG_PEEK );
|
||||
nBytesIn = 0u;
|
||||
}
|
||||
|
||||
// only one recv thread at a time may call callbacks
|
||||
callbackAutoMutex autoMutex ( *piiu->pCAC() );
|
||||
|
||||
osiSockIoctl_t bytesPending;
|
||||
do {
|
||||
if ( nBytesIn == 0u ) {
|
||||
nBytesIn = pComBuf->fillFromWire ( *piiu );
|
||||
if ( nBytesIn == 0u ) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
piiu->recvQue.pushLastComBufReceived ( *pComBuf );
|
||||
|
||||
if ( nBytesIn == pComBuf->capacityBytes () ) {
|
||||
if ( piiu->contigRecvMsgCount >=
|
||||
contiguousMsgCountWhichTriggersFlowControl ) {
|
||||
piiu->busyStateDetected = true;
|
||||
}
|
||||
else {
|
||||
piiu->contigRecvMsgCount++;
|
||||
}
|
||||
}
|
||||
else {
|
||||
piiu->contigRecvMsgCount = 0u;
|
||||
piiu->busyStateDetected = false;
|
||||
}
|
||||
piiu->unacknowledgedSendBytes = 0u;
|
||||
|
||||
// reschedule connection activity watchdog
|
||||
// but dont hold the lock for fear of deadlocking
|
||||
// because cancel is blocking for the completion
|
||||
// of the recvDog expire which takes the lock
|
||||
piiu->recvDog.messageArrivalNotify ();
|
||||
|
||||
// execute receive labor
|
||||
piiu->processIncoming ();
|
||||
|
||||
// allocate a new com buf
|
||||
pComBuf = new ( std::nothrow ) comBuf;
|
||||
nBytesIn = 0u;
|
||||
|
||||
{
|
||||
int status;
|
||||
status = socket_ioctl ( piiu->sock, FIONREAD, & bytesPending );
|
||||
if ( status ) {
|
||||
bytesPending = 0u;
|
||||
}
|
||||
}
|
||||
} while ( bytesPending && pComBuf );
|
||||
}
|
||||
|
||||
if ( pComBuf ) {
|
||||
pComBuf->destroy ();
|
||||
}
|
||||
|
||||
{
|
||||
@@ -336,7 +362,6 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
|
||||
blockingForFlush ( 0u ),
|
||||
socketLibrarySendBufferSize ( 0u ),
|
||||
unacknowledgedSendBytes ( 0u ),
|
||||
recvFlag ( 0 ),
|
||||
busyStateDetected ( false ),
|
||||
flowControlActive ( false ),
|
||||
echoRequestPending ( false ),
|
||||
@@ -430,7 +455,7 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
|
||||
memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) );
|
||||
|
||||
unsigned priorityOfRecv;
|
||||
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
|
||||
epicsThreadBooleanStatus tbs = epicsThreadHighestPriorityLevelBelow (
|
||||
this->pCAC()->getInitializingThreadsPriority (), &priorityOfRecv );
|
||||
if ( tbs != epicsThreadBooleanStatusSuccess ) {
|
||||
priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority ();
|
||||
|
||||
Reference in New Issue
Block a user