From dd3baf0ce4634685b6af90581a4afeb742dc67ff Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Fri, 13 Jul 2001 23:12:41 +0000 Subject: [PATCH] fixed and also simplified locking logic --- src/ca/cac.cpp | 132 ++++++++++++++++++++++--------------------------- src/ca/cac.h | 3 -- 2 files changed, 58 insertions(+), 77 deletions(-) diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 1f0b3f61e..395f02963 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -119,11 +119,9 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : recvProcessEnableRefCount ( 0u ), pndRecvCnt ( 0u ), readSeq ( 0u ), - recvProcessCompletionSignalRequestCount ( 0u ), enablePreemptiveCallback ( enablePreemptiveCallbackIn ), ioInProgress ( false ), - recvProcessThreadExitRequest ( false ), - recvProcessPending ( false ) + recvProcessThreadExitRequest ( false ) { long status; unsigned abovePriority; @@ -223,7 +221,10 @@ cac::~cac () if ( this->pRecvProcessThread ) { this->recvProcessThreadExitRequest = true; this->recvProcessActivityEvent.signal (); - this->enableCallbackPreemption (); + { + epicsAutoMutex autoMutex ( this->mutex ); + this->enableCallbackPreemption (); + } this->recvProcessThreadExit.wait (); delete this->pRecvProcessThread; } @@ -577,8 +578,10 @@ void cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ) this->disableCallbackPreemption (); } -int cac::pendEvent ( const double &timeout ) +int cac::pendEvent ( const double & timeout ) { + epicsTime current = epicsTime::getCurrent (); + // prevent recursion nightmares by disabling calls to // pendIO () from within a CA callback if ( this->recvProcessThreadIsCurrentThread () ) { @@ -590,39 +593,31 @@ int cac::pendEvent ( const double &timeout ) this->flushRequestPrivate (); - bool waitForSignal; - if ( this->recvProcessPending ) { - this->recvProcessCompletionSignalRequestCount++; - waitForSignal = true; + // process at least once if preemptive callback + // isnt enabled (this avoids complications associated + // with forcing the recv processing thread to run. + if ( ! this->enablePreemptiveCallback && + this->recvProcessEnableRefCount == 0 ) { + this->processRecvBacklog (); + } + + double elapsed = epicsTime::getCurrent() - current; + double delay; + + if ( timeout > elapsed ) { + delay = timeout - elapsed; } else { - waitForSignal = false; + delay = 0.0; } - this->enableCallbackPreemption (); - - { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); - if ( timeout == 0.0 ) { - while ( true ) { - epicsThreadSleep ( 60.0 ); - } - } - else if ( timeout >= CAC_SIGNIFICANT_DELAY ) { - epicsThreadSleep ( timeout ); - } - if ( waitForSignal ) { - this->recvProcessCompleted.wait (); - } - } - - this->disableCallbackPreemption (); - - if ( waitForSignal ) { - this->recvProcessCompletionSignalRequestCount--; - if ( this->recvProcessCompletionSignalRequestCount ) { - this->recvProcessCompleted.signal (); + if ( delay >= CAC_SIGNIFICANT_DELAY ) { + this->enableCallbackPreemption (); + { + epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + epicsThreadSleep ( delay ); } + this->disableCallbackPreemption (); } } @@ -674,28 +669,24 @@ void cac::registerService ( cacService &service ) void cac::startRecvProcessThread () { - bool newThread = false; - { - epicsAutoMutex epicsMutex ( this->mutex ); - if ( ! this->pRecvProcessThread ) { - unsigned priorityOfProcess; - epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( - this->getInitializingThreadsPriority (), &priorityOfProcess ); - if ( tbs != epicsThreadBooleanStatusSuccess ) { - priorityOfProcess = this->getInitializingThreadsPriority (); - } - - this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process", - epicsThreadGetStackSize ( epicsThreadStackSmall ), priorityOfProcess ); - if ( ! this->pRecvProcessThread ) { - throw std::bad_alloc (); - } - this->pRecvProcessThread->start (); - newThread = true; + epicsAutoMutex epicsMutex ( this->mutex ); + if ( ! this->pRecvProcessThread ) { + unsigned priorityOfProcess; + epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( + this->getInitializingThreadsPriority (), &priorityOfProcess ); + if ( tbs != epicsThreadBooleanStatusSuccess ) { + priorityOfProcess = this->getInitializingThreadsPriority (); + } + + this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process", + epicsThreadGetStackSize ( epicsThreadStackSmall ), priorityOfProcess ); + if ( ! this->pRecvProcessThread ) { + throw std::bad_alloc (); + } + this->pRecvProcessThread->start (); + if ( this->enablePreemptiveCallback ) { + this->enableCallbackPreemption (); } - } - if ( this->enablePreemptiveCallback && newThread ) { - this->enableCallbackPreemption (); } } @@ -704,17 +695,10 @@ void cac::run () { this->attachToClientCtx (); while ( ! this->recvProcessThreadExitRequest ) { - bool wakeupNeeded; { - this->recvProcessPending = true; epicsAutoMutex autoMutexPCB ( this->preemptiveCallbackLock ); epicsAutoMutex autoMutex ( this->mutex ); this->processRecvBacklog (); - this->recvProcessPending = false; - wakeupNeeded = this->recvProcessCompletionSignalRequestCount > 0u; - } - if ( wakeupNeeded ) { - this->recvProcessCompleted.signal (); } this->recvProcessActivityEvent.wait (); } @@ -802,20 +786,14 @@ void cac::enableCallbackPreemption () // lock must already be applied void cac::disableCallbackPreemption () { - if ( this->recvProcessEnableRefCount == 1u ) { - if ( ! this->preemptiveCallbackLock.tryLock () ) { - { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); - this->preemptiveCallbackLock.lock (); - } - // in case some thread enabled it while this->mutex was unlocked - if ( this->recvProcessEnableRefCount > 1u ) { - this->preemptiveCallbackLock.unlock (); - } - } - } assert ( this->recvProcessEnableRefCount > 0u ); this->recvProcessEnableRefCount--; + if ( this->recvProcessEnableRefCount == 0u ) { + if ( ! this->preemptiveCallbackLock.tryLock () ) { + epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + this->preemptiveCallbackLock.lock (); + } + } } void cac::repeaterSubscribeConfirmNotify () @@ -1045,7 +1023,13 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) if ( pSubscr ) { chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); } - if ( pRecvProcessThread->isCurrentThread() ) { + // + // if preemptive callback is not enabled then + // the code will block until the recv process + // thread is idle before returning control back + // to the user + if ( ! this->enablePreemptiveCallback || + pRecvProcessThread->isCurrentThread() ) { signalNeeded = false; } else { diff --git a/src/ca/cac.h b/src/ca/cac.h index 41e956613..d686fa4c4 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -163,7 +163,6 @@ private: epicsEvent notifyCompletionEvent; epicsEvent recvProcessActivityEvent; epicsEvent recvProcessThreadExit; - epicsEvent recvProcessCompleted; epicsEvent ioDone; epicsTimerQueueActive *pTimerQueue; char *pUserName; @@ -182,11 +181,9 @@ private: unsigned recvProcessEnableRefCount; unsigned pndRecvCnt; unsigned readSeq; - unsigned recvProcessCompletionSignalRequestCount; bool enablePreemptiveCallback; bool ioInProgress; bool recvProcessThreadExitRequest; - bool recvProcessPending; void processRecvBacklog (); void flushRequestPrivate ();