diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 79cdf83dd..0455c6e4f 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -117,12 +117,10 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : threadsBlockingOnNotifyCompletion ( 0u ), maxRecvBytesTCP ( MAX_TCP ), recvProcessEnableRefCount ( 0u ), - recvProcessCompletionNBlockers ( 0u ), pndRecvCnt ( 0u ), readSeq ( 0u ), enablePreemptiveCallback ( enablePreemptiveCallbackIn ), ioInProgress ( false ), - recvProcessInProgress ( false ), recvProcessThreadExitRequest ( false ) { long status; @@ -211,6 +209,7 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : freeListCleanup ( this->tcpLargeRecvBufFreeList ); throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) ); } + this->preemptiveCallbackLock.lock (); } cac::~cac () @@ -371,9 +370,8 @@ void cac::show ( unsigned level ) const { epicsAutoMutex autoMutex2 ( this->mutex ); - ::printf ( "Channel Access Client Context at %p for user %s %s\n", - static_cast ( this ), this->pUserName, - this->recvProcessInProgress ? "busy" : "idle" ); + ::printf ( "Channel Access Client Context at %p for user %s\n", + static_cast ( this ), this->pUserName ); if ( level > 0u ) { tsDLIterConstBD < tcpiiu > piiu = this->iiuList.firstIter (); while ( piiu.valid() ) { @@ -423,8 +421,6 @@ void cac::show ( unsigned level ) const this->ipToAEngine.show ( level - 3u ); ::printf ( "recv process enable count %u\n", this->recvProcessEnableRefCount ); - ::printf ( "recv process blocking for completion count %u\n", - this->recvProcessCompletionNBlockers ); ::printf ( "\trecv process shutdown command boolean %u\n", this->recvProcessThreadExitRequest ); ::printf ( "\tthe current read sequence number is %u\n", @@ -438,8 +434,6 @@ void cac::show ( unsigned level ) const this->recvProcessActivityEvent.show ( level - 3u ); ::printf ( "Receive process exit event:\n" ); this->recvProcessThreadExit.show ( level - 3u ); - ::printf ( "Receive processing done event:\n" ); - this->recvProcessCompletion.show ( level - 3u ); ::printf ( "IO done event:\n"); this->ioDone.show ( level - 3u ); ::printf ( "mutex:\n" ); @@ -662,9 +656,15 @@ void cac::startRecvProcessThread () { epicsAutoMutex epicsMutex ( this->mutex ); if ( ! this->pRecvProcessThread ) { + unsigned priorityOfProcess; + epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( + this->getInitializingThreadsPriority (), &priorityOfProcess ); + if ( tbs != epicsThreadBooleanStatusSuccess ) { + priorityOfProcess = this->getInitializingThreadsPriority (); + } + this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process", - epicsThreadGetStackSize ( epicsThreadStackSmall ), - this->getInitializingThreadsPriority () ); + epicsThreadGetStackSize ( epicsThreadStackSmall ), priorityOfProcess ); if ( ! this->pRecvProcessThread ) { throw std::bad_alloc (); } @@ -680,23 +680,14 @@ void cac::startRecvProcessThread () // this is the recv process thread entry point void cac::run () { -printf ("recv process thread id is %x\n", epicsThreadGetIdSelf()); - epicsAutoMutex autoMutex ( this->mutex ); this->attachToClientCtx (); while ( ! this->recvProcessThreadExitRequest ) { - if ( this->recvProcessEnableRefCount ) { - this->recvProcessInProgress = true; - this->processRecvBacklog (); - this->recvProcessInProgress = false; - } - bool signalNeeded = this->recvProcessCompletionNBlockers > 0u; { - epicsAutoMutexRelease autoRelease ( this->mutex ); - if ( signalNeeded ) { - this->recvProcessCompletion.signal (); - } - this->recvProcessActivityEvent.wait (); + epicsAutoMutex autoMutexPCB ( this->preemptiveCallbackLock ); + epicsAutoMutex autoMutex ( this->mutex ); + this->processRecvBacklog (); } + this->recvProcessActivityEvent.wait (); } this->recvProcessThreadExit.signal (); } @@ -775,39 +766,27 @@ void cac::enableCallbackPreemption () assert ( this->recvProcessEnableRefCount < UINT_MAX ); this->recvProcessEnableRefCount++; if ( this->recvProcessEnableRefCount == 1u ) { - this->recvProcessActivityEvent.signal (); + this->preemptiveCallbackLock.unlock (); } } // lock must already be applied void cac::disableCallbackPreemption () { - if ( ! this->recvProcessInProgress ) { - assert ( this->recvProcessEnableRefCount != 0u ); - this->recvProcessEnableRefCount--; - return; - } - else { - this->recvProcessCompletionNBlockers++; - } - - while ( true ) { - { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); - this->recvProcessCompletion.wait (); - } - - if ( ! this->recvProcessInProgress ) { - assert ( this->recvProcessEnableRefCount > 0u ); - this->recvProcessEnableRefCount--; - assert ( this->recvProcessCompletionNBlockers > 0u ); - this->recvProcessCompletionNBlockers--; - if ( this->recvProcessCompletionNBlockers > 0u ) { - this->recvProcessCompletion.signal (); + if ( this->recvProcessEnableRefCount == 1u ) { + if ( ! this->preemptiveCallbackLock.tryLock () ) { + { + epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + this->preemptiveCallbackLock.lock (); + } + // in case some thread enabled it while this->mutex was unlocked + if ( this->recvProcessEnableRefCount > 1u ) { + this->preemptiveCallbackLock.unlock (); } - return; } } + assert ( this->recvProcessEnableRefCount > 0u ); + this->recvProcessEnableRefCount--; } void cac::repeaterSubscribeConfirmNotify () @@ -949,7 +928,6 @@ void cac::flushIfRequired ( nciu &chan ) } if ( this->pudpiiu && blockPermit ) { if ( this->pudpiiu->isCurrentThread () ) { -printf ("detected no-block cond and id is %x\n", epicsThreadGetIdSelf()); blockPermit = false; } } @@ -1250,7 +1228,7 @@ void cac::connectAllIO ( nciu &chan ) } pNetIO = next; } - chan.getPIIU()->flushRequest (); + chan.getPIIU()->requestRecvProcessPostponedFlush (); while ( baseNMIU *pIO = tmpList.get () ) { signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() ); pIO->destroy ( *this ); @@ -1596,7 +1574,7 @@ bool cac::claimCIURespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pM else { sidTmp = pChan->getSID (); } - pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp ); + pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp, iiu.ca_v41_ok() ); return true; } else { diff --git a/src/ca/cac.h b/src/ca/cac.h index 3d3b3d4bf..93bc5d150 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -160,9 +160,9 @@ private: epicsTime programBeginTime; double connTMO; mutable epicsMutex mutex; + epicsMutex preemptiveCallbackLock; epicsEvent notifyCompletionEvent; epicsEvent recvProcessActivityEvent; - epicsEvent recvProcessCompletion; epicsEvent recvProcessThreadExit; epicsEvent ioDone; epicsTimerQueueActive *pTimerQueue; @@ -180,12 +180,10 @@ private: unsigned threadsBlockingOnNotifyCompletion; unsigned maxRecvBytesTCP; unsigned recvProcessEnableRefCount; - unsigned recvProcessCompletionNBlockers; unsigned pndRecvCnt; unsigned readSeq; bool enablePreemptiveCallback; bool ioInProgress; - bool recvProcessInProgress; bool recvProcessThreadExitRequest; void flushRequestPrivate ();