diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 2cafcd2ae..1a12235de 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -213,14 +213,12 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() ); if ( piiu->state == iiu_connected ) { unsigned priorityOfSend; - epicsThreadBooleanStatus tbs; - epicsThreadId tid; - - tbs = epicsThreadLowestPriorityLevelAbove ( piiu->pCAC ()->getInitializingThreadsPriority (), &priorityOfSend ); + epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( + piiu->pCAC ()->getInitializingThreadsPriority (), &priorityOfSend ); if ( tbs != epicsThreadBooleanStatusSuccess ) { priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority (); } - tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend, + epicsThreadId tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend, epicsThreadGetStackSize ( epicsThreadStackMedium ), cacSendThreadTCP, piiu ); if ( ! tid ) { piiu->recvThreadExitEvent.signal (); @@ -335,7 +333,8 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, msgHeaderAvailable ( false ), sockCloseCompleted ( false ), f_trueOnceOnly ( true ), - earlyFlush ( false ) + earlyFlush ( false ), + recvProcessPostponedFlush ( false ) { if ( ! this->pCurData ) { throw std::bad_alloc (); @@ -421,7 +420,7 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) ); unsigned priorityOfRecv; - epicsThreadBooleanStatus tbs = epicsThreadHighestPriorityLevelBelow ( + epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv ); if ( tbs != epicsThreadBooleanStatusSuccess ) { priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority (); @@ -438,8 +437,6 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, } } - - /* * tcpiiu::connect () */ @@ -766,6 +763,7 @@ void tcpiiu::processIncoming () if ( ! this->msgHeaderAvailable ) { if ( ! this->oldMsgHeaderAvailable ) { if ( nBytes < sizeof ( caHdr ) ) { + this->flushIfRecvProcessRequested (); return; } this->curMsg.m_cmmd = this->recvQue.popUInt16 (); @@ -780,6 +778,7 @@ void tcpiiu::processIncoming () static const unsigned annexSize = sizeof ( this->curMsg.m_postsize ) + sizeof ( this->curMsg.m_count ); if ( this->recvQue.occupiedBytes () < annexSize ) { + this->flushIfRecvProcessRequested (); return; } this->curMsg.m_postsize = this->recvQue.popUInt32 (); @@ -824,6 +823,7 @@ void tcpiiu::processIncoming () this->curMsg.m_postsize - this->curDataBytes ); if ( this->curDataBytes < this->curMsg.m_postsize ) { this->recvThreadRingBufferSpaceAvailableEvent.signal (); + this->flushIfRecvProcessRequested (); return; } } @@ -840,11 +840,13 @@ void tcpiiu::processIncoming () this->printf ( "CAC: response with payload size=%u > EPICS_CA_MAX_ARRAY_BYTES ignored\n", this->curMsg.m_postsize ); + once = true; } this->curDataBytes += this->recvQue.removeBytes ( this->curMsg.m_postsize - this->curDataBytes ); if ( this->curDataBytes < this->curMsg.m_postsize ) { this->recvThreadRingBufferSpaceAvailableEvent.signal (); + this->flushIfRecvProcessRequested (); return; } } @@ -1310,6 +1312,12 @@ bool tcpiiu::ca_v42_ok () const return CA_V42 ( this->minorProtocolVersion ); } +void tcpiiu::requestRecvProcessPostponedFlush () +{ + this->recvProcessPostponedFlush = true; +} + +