improved schedualing of recv process thread
This commit is contained in:
@@ -117,12 +117,10 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) :
|
||||
threadsBlockingOnNotifyCompletion ( 0u ),
|
||||
maxRecvBytesTCP ( MAX_TCP ),
|
||||
recvProcessEnableRefCount ( 0u ),
|
||||
recvProcessCompletionNBlockers ( 0u ),
|
||||
pndRecvCnt ( 0u ),
|
||||
readSeq ( 0u ),
|
||||
enablePreemptiveCallback ( enablePreemptiveCallbackIn ),
|
||||
ioInProgress ( false ),
|
||||
recvProcessInProgress ( false ),
|
||||
recvProcessThreadExitRequest ( false )
|
||||
{
|
||||
long status;
|
||||
@@ -211,6 +209,7 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) :
|
||||
freeListCleanup ( this->tcpLargeRecvBufFreeList );
|
||||
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
|
||||
}
|
||||
this->preemptiveCallbackLock.lock ();
|
||||
}
|
||||
|
||||
cac::~cac ()
|
||||
@@ -371,9 +370,8 @@ void cac::show ( unsigned level ) const
|
||||
{
|
||||
epicsAutoMutex autoMutex2 ( this->mutex );
|
||||
|
||||
::printf ( "Channel Access Client Context at %p for user %s %s\n",
|
||||
static_cast <const void *> ( this ), this->pUserName,
|
||||
this->recvProcessInProgress ? "busy" : "idle" );
|
||||
::printf ( "Channel Access Client Context at %p for user %s\n",
|
||||
static_cast <const void *> ( this ), this->pUserName );
|
||||
if ( level > 0u ) {
|
||||
tsDLIterConstBD < tcpiiu > piiu = this->iiuList.firstIter ();
|
||||
while ( piiu.valid() ) {
|
||||
@@ -423,8 +421,6 @@ void cac::show ( unsigned level ) const
|
||||
this->ipToAEngine.show ( level - 3u );
|
||||
::printf ( "recv process enable count %u\n",
|
||||
this->recvProcessEnableRefCount );
|
||||
::printf ( "recv process blocking for completion count %u\n",
|
||||
this->recvProcessCompletionNBlockers );
|
||||
::printf ( "\trecv process shutdown command boolean %u\n",
|
||||
this->recvProcessThreadExitRequest );
|
||||
::printf ( "\tthe current read sequence number is %u\n",
|
||||
@@ -438,8 +434,6 @@ void cac::show ( unsigned level ) const
|
||||
this->recvProcessActivityEvent.show ( level - 3u );
|
||||
::printf ( "Receive process exit event:\n" );
|
||||
this->recvProcessThreadExit.show ( level - 3u );
|
||||
::printf ( "Receive processing done event:\n" );
|
||||
this->recvProcessCompletion.show ( level - 3u );
|
||||
::printf ( "IO done event:\n");
|
||||
this->ioDone.show ( level - 3u );
|
||||
::printf ( "mutex:\n" );
|
||||
@@ -662,9 +656,15 @@ void cac::startRecvProcessThread ()
|
||||
{
|
||||
epicsAutoMutex epicsMutex ( this->mutex );
|
||||
if ( ! this->pRecvProcessThread ) {
|
||||
unsigned priorityOfProcess;
|
||||
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
|
||||
this->getInitializingThreadsPriority (), &priorityOfProcess );
|
||||
if ( tbs != epicsThreadBooleanStatusSuccess ) {
|
||||
priorityOfProcess = this->getInitializingThreadsPriority ();
|
||||
}
|
||||
|
||||
this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process",
|
||||
epicsThreadGetStackSize ( epicsThreadStackSmall ),
|
||||
this->getInitializingThreadsPriority () );
|
||||
epicsThreadGetStackSize ( epicsThreadStackSmall ), priorityOfProcess );
|
||||
if ( ! this->pRecvProcessThread ) {
|
||||
throw std::bad_alloc ();
|
||||
}
|
||||
@@ -680,23 +680,14 @@ void cac::startRecvProcessThread ()
|
||||
// this is the recv process thread entry point
|
||||
void cac::run ()
|
||||
{
|
||||
printf ("recv process thread id is %x\n", epicsThreadGetIdSelf());
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
this->attachToClientCtx ();
|
||||
while ( ! this->recvProcessThreadExitRequest ) {
|
||||
if ( this->recvProcessEnableRefCount ) {
|
||||
this->recvProcessInProgress = true;
|
||||
this->processRecvBacklog ();
|
||||
this->recvProcessInProgress = false;
|
||||
}
|
||||
bool signalNeeded = this->recvProcessCompletionNBlockers > 0u;
|
||||
{
|
||||
epicsAutoMutexRelease autoRelease ( this->mutex );
|
||||
if ( signalNeeded ) {
|
||||
this->recvProcessCompletion.signal ();
|
||||
}
|
||||
this->recvProcessActivityEvent.wait ();
|
||||
epicsAutoMutex autoMutexPCB ( this->preemptiveCallbackLock );
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
this->processRecvBacklog ();
|
||||
}
|
||||
this->recvProcessActivityEvent.wait ();
|
||||
}
|
||||
this->recvProcessThreadExit.signal ();
|
||||
}
|
||||
@@ -775,39 +766,27 @@ void cac::enableCallbackPreemption ()
|
||||
assert ( this->recvProcessEnableRefCount < UINT_MAX );
|
||||
this->recvProcessEnableRefCount++;
|
||||
if ( this->recvProcessEnableRefCount == 1u ) {
|
||||
this->recvProcessActivityEvent.signal ();
|
||||
this->preemptiveCallbackLock.unlock ();
|
||||
}
|
||||
}
|
||||
|
||||
// lock must already be applied
|
||||
void cac::disableCallbackPreemption ()
|
||||
{
|
||||
if ( ! this->recvProcessInProgress ) {
|
||||
assert ( this->recvProcessEnableRefCount != 0u );
|
||||
this->recvProcessEnableRefCount--;
|
||||
return;
|
||||
}
|
||||
else {
|
||||
this->recvProcessCompletionNBlockers++;
|
||||
}
|
||||
|
||||
while ( true ) {
|
||||
{
|
||||
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
|
||||
this->recvProcessCompletion.wait ();
|
||||
}
|
||||
|
||||
if ( ! this->recvProcessInProgress ) {
|
||||
assert ( this->recvProcessEnableRefCount > 0u );
|
||||
this->recvProcessEnableRefCount--;
|
||||
assert ( this->recvProcessCompletionNBlockers > 0u );
|
||||
this->recvProcessCompletionNBlockers--;
|
||||
if ( this->recvProcessCompletionNBlockers > 0u ) {
|
||||
this->recvProcessCompletion.signal ();
|
||||
if ( this->recvProcessEnableRefCount == 1u ) {
|
||||
if ( ! this->preemptiveCallbackLock.tryLock () ) {
|
||||
{
|
||||
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
|
||||
this->preemptiveCallbackLock.lock ();
|
||||
}
|
||||
// in case some thread enabled it while this->mutex was unlocked
|
||||
if ( this->recvProcessEnableRefCount > 1u ) {
|
||||
this->preemptiveCallbackLock.unlock ();
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
assert ( this->recvProcessEnableRefCount > 0u );
|
||||
this->recvProcessEnableRefCount--;
|
||||
}
|
||||
|
||||
void cac::repeaterSubscribeConfirmNotify ()
|
||||
@@ -949,7 +928,6 @@ void cac::flushIfRequired ( nciu &chan )
|
||||
}
|
||||
if ( this->pudpiiu && blockPermit ) {
|
||||
if ( this->pudpiiu->isCurrentThread () ) {
|
||||
printf ("detected no-block cond and id is %x\n", epicsThreadGetIdSelf());
|
||||
blockPermit = false;
|
||||
}
|
||||
}
|
||||
@@ -1250,7 +1228,7 @@ void cac::connectAllIO ( nciu &chan )
|
||||
}
|
||||
pNetIO = next;
|
||||
}
|
||||
chan.getPIIU()->flushRequest ();
|
||||
chan.getPIIU()->requestRecvProcessPostponedFlush ();
|
||||
while ( baseNMIU *pIO = tmpList.get () ) {
|
||||
signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() );
|
||||
pIO->destroy ( *this );
|
||||
@@ -1596,7 +1574,7 @@ bool cac::claimCIURespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pM
|
||||
else {
|
||||
sidTmp = pChan->getSID ();
|
||||
}
|
||||
pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp );
|
||||
pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp, iiu.ca_v41_ok() );
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
|
||||
Reference in New Issue
Block a user