improved schedualing of recv process thread

This commit is contained in:
Jeff Hill
2001-07-11 23:28:57 +00:00
parent 72cbc5edf0
commit ca2c93bda4

View File

@@ -213,14 +213,12 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() );
if ( piiu->state == iiu_connected ) {
unsigned priorityOfSend;
epicsThreadBooleanStatus tbs;
epicsThreadId tid;
tbs = epicsThreadLowestPriorityLevelAbove ( piiu->pCAC ()->getInitializingThreadsPriority (), &priorityOfSend );
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
piiu->pCAC ()->getInitializingThreadsPriority (), &priorityOfSend );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority ();
}
tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend,
epicsThreadId tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend,
epicsThreadGetStackSize ( epicsThreadStackMedium ), cacSendThreadTCP, piiu );
if ( ! tid ) {
piiu->recvThreadExitEvent.signal ();
@@ -335,7 +333,8 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
msgHeaderAvailable ( false ),
sockCloseCompleted ( false ),
f_trueOnceOnly ( true ),
earlyFlush ( false )
earlyFlush ( false ),
recvProcessPostponedFlush ( false )
{
if ( ! this->pCurData ) {
throw std::bad_alloc ();
@@ -421,7 +420,7 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) );
unsigned priorityOfRecv;
epicsThreadBooleanStatus tbs = epicsThreadHighestPriorityLevelBelow (
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority ();
@@ -438,8 +437,6 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout,
}
}
/*
* tcpiiu::connect ()
*/
@@ -766,6 +763,7 @@ void tcpiiu::processIncoming ()
if ( ! this->msgHeaderAvailable ) {
if ( ! this->oldMsgHeaderAvailable ) {
if ( nBytes < sizeof ( caHdr ) ) {
this->flushIfRecvProcessRequested ();
return;
}
this->curMsg.m_cmmd = this->recvQue.popUInt16 ();
@@ -780,6 +778,7 @@ void tcpiiu::processIncoming ()
static const unsigned annexSize =
sizeof ( this->curMsg.m_postsize ) + sizeof ( this->curMsg.m_count );
if ( this->recvQue.occupiedBytes () < annexSize ) {
this->flushIfRecvProcessRequested ();
return;
}
this->curMsg.m_postsize = this->recvQue.popUInt32 ();
@@ -824,6 +823,7 @@ void tcpiiu::processIncoming ()
this->curMsg.m_postsize - this->curDataBytes );
if ( this->curDataBytes < this->curMsg.m_postsize ) {
this->recvThreadRingBufferSpaceAvailableEvent.signal ();
this->flushIfRecvProcessRequested ();
return;
}
}
@@ -840,11 +840,13 @@ void tcpiiu::processIncoming ()
this->printf (
"CAC: response with payload size=%u > EPICS_CA_MAX_ARRAY_BYTES ignored\n",
this->curMsg.m_postsize );
once = true;
}
this->curDataBytes += this->recvQue.removeBytes (
this->curMsg.m_postsize - this->curDataBytes );
if ( this->curDataBytes < this->curMsg.m_postsize ) {
this->recvThreadRingBufferSpaceAvailableEvent.signal ();
this->flushIfRecvProcessRequested ();
return;
}
}
@@ -1310,6 +1312,12 @@ bool tcpiiu::ca_v42_ok () const
return CA_V42 ( this->minorProtocolVersion );
}
void tcpiiu::requestRecvProcessPostponedFlush ()
{
this->recvProcessPostponedFlush = true;
}