diff --git a/src/ca/Makefile b/src/ca/Makefile index 4f947630e..e05b3b3f1 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -22,7 +22,6 @@ LIBSRCS += cacWriteNotify.cpp LIBSRCS += cacStateNotify.cpp LIBSRCS += cacServiceList.cpp LIBSRCS += access.cpp -LIBSRCS += recvProcessThread.cpp LIBSRCS += iocinf.cpp LIBSRCS += convert.cpp LIBSRCS += test_event.cpp @@ -57,7 +56,6 @@ LIBSRCS += comQueRecv.cpp LIBSRCS += comQueSend.cpp LIBSRCS += comBuf.cpp LIBSRCS += hostNameCache.cpp -LIBSRCS += ioCounterNet.cpp LIBSRCS += msgForMultiplyDefinedPV.cpp LIBSRCS += limboiiu.cpp diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index e4b468bf0..da120306a 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -110,13 +110,20 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : pRepeaterSubscribeTmr ( 0 ), tcpSmallRecvBufFreeList ( 0 ), tcpLargeRecvBufFreeList ( 0 ), + pRecvProcessThread ( 0 ), notify ( notifyIn ), ioNotifyInProgressId ( 0 ), initializingThreadsPriority ( epicsThreadGetPrioritySelf () ), threadsBlockingOnNotifyCompletion ( 0u ), maxRecvBytesTCP ( MAX_TCP ), + recvProcessEnableRefCount ( 0u ), + recvProcessCompletionNBlockers ( 0u ), + pndRecvCnt ( 0u ), + readSeq ( 0u ), enablePreemptiveCallback ( enablePreemptiveCallbackIn ), - ioInProgress ( false ) + ioInProgress ( false ), + recvProcessInProgress ( false ), + recvProcessThreadExitRequest ( false ) { long status; unsigned abovePriority; @@ -204,37 +211,19 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : freeListCleanup ( this->tcpLargeRecvBufFreeList ); throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) ); } - - // - // unfortunately, this must be created here in the - // constructor, and not on demand (only when it is needed) - // because the enable reference count must be - // maintained whenever this object exists. - // - this->pRecvProcThread = new recvProcessThread ( this ); - if ( ! this->pRecvProcThread ) { - this->pTimerQueue->release (); - free ( this->pUserName ); - freeListCleanup ( this->tcpSmallRecvBufFreeList ); - freeListCleanup ( this->tcpLargeRecvBufFreeList ); - throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) ); - } - else if ( this->enablePreemptiveCallback ) { - // only after this->pRecvProcThread is valid - this->enableCallbackPreemption (); - } } cac::~cac () { - this->enableCallbackPreemption (); - // // make certain that process thread isnt deleting // tcpiiu objects at the same that this thread is // if ( this->pRecvProcThread ) { - this->pRecvProcThread->disable (); + this->recvProcessThreadExitRequest = true; + this->recvProcessActivityEvent.signal (); + this->recvProcessThreadExit.wait (); + delete this->pRecvProcThread; } { @@ -263,7 +252,6 @@ cac::~cac () piiu->destroy (); } - delete this->pRecvProcThread; delete this->pRepeaterSubscribeTmr; delete this->pSearchTmr; @@ -302,12 +290,11 @@ cac::~cac () this->pTimerQueue->release (); } +// lock must be applied void cac::processRecvBacklog () { tsDLList < tcpiiu > deadIIU; { - epicsAutoMutex autoMutex ( this->mutex ); - tsDLIterBD < tcpiiu > piiu = this->iiuList.firstIter (); while ( piiu.valid () ) { tsDLIterBD < tcpiiu > pNext = piiu; @@ -351,13 +338,13 @@ void cac::processRecvBacklog () } } if ( deadIIU.count() ) { - while ( tcpiiu *piiu = deadIIU.get() ) { - piiu->destroy (); - } { - epicsAutoMutex autoMutex ( this->mutex ); - this->pSearchTmr->resetPeriod ( 0.0 ); + epicsAutoMutex autoRelease ( this->mutex ); + while ( tcpiiu *piiu = deadIIU.get() ) { + piiu->destroy (); + } } + this->pSearchTmr->resetPeriod ( 0.0 ); } } @@ -384,11 +371,12 @@ void cac::show ( unsigned level ) const { epicsAutoMutex autoMutex2 ( this->mutex ); - ::printf ( "Channel Access Client Context at %p for user %s\n", - static_cast ( this ), this->pUserName ); + ::printf ( "Channel Access Client Context at %p for user %s %s\n", + static_cast ( this ), this->pUserName, + this->recvProcessInProgress ? "busy" : "idle" ); if ( level > 0u ) { tsDLIterConstBD < tcpiiu > piiu = this->iiuList.firstIter (); - while ( piiu.valid () ) { + while ( piiu.valid() ) { piiu->show ( level - 1u ); piiu++; } @@ -404,7 +392,8 @@ void cac::show ( unsigned level ) const if ( this->pudpiiu ) { this->pudpiiu->show ( level - 2u ); } - this->ioCounter.show ( level - 2u ); + ::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n", + this->pndRecvCnt ); } if ( level > 2u ) { @@ -422,10 +411,6 @@ void cac::show ( unsigned level ) const ::printf ( "Timer queue:\n" ); this->pTimerQueue->show ( level - 3u ); } - if ( this->pRecvProcThread ) { - ::printf ( "incoming messages processing thread:\n" ); - this->pRecvProcThread->show ( level - 3u ); - } if ( this->pSearchTmr ) { ::printf ( "search message timer:\n" ); this->pSearchTmr->show ( level - 3u ); @@ -436,18 +421,29 @@ void cac::show ( unsigned level ) const } ::printf ( "IP address to name conversion engine:\n" ); this->ipToAEngine.show ( level - 3u ); + ::printf ( "recv process enable count %u\n", + this->recvProcessEnableRefCount ); + ::printf ( "recv process blocking for completion count %u\n", + this->recvProcessCompletionNBlockers ); + ::printf ( "\trecv process shutdown command boolean %u\n", + this->recvProcessThreadExitRequest ); + ::printf ( "\tthe current read sequence number is %u\n", + this->readSeq ); } if ( level > 3u ) { ::printf ( "Default mutex:\n"); this->mutex.show ( level - 4u ); - } -} - -void cac::signalRecvActivity () -{ - if ( this->pRecvProcThread ) { - this->pRecvProcThread->signalActivity (); + ::printf ( "Receive process activity event:\n" ); + this->recvProcessActivityEvent.show ( level - 3u ); + ::printf ( "Receive process exit event:\n" ); + this->recvProcessThreadExit.show ( level - 3u ); + ::printf ( "Receive processing done event:\n" ); + this->recvProcessCompletion.show ( level - 3u ); + ::printf ( "IO done event:\n"); + this->ioDone.show ( level - 3u ); + ::printf ( "mutex:\n" ); + this->mutex.show ( level - 3u ); } } @@ -527,7 +523,7 @@ int cac::pendIO ( const double &timeout ) { // prevent recursion nightmares by disabling calls to // pendIO () from within a CA callback - if ( this->pRecvProcThread->isCurrentThread () ) { + if ( this->recvProcessThreadIsCurrentThread () ) { return ECA_EVDISALLOW; } @@ -544,21 +540,22 @@ int cac::pendIO ( const double &timeout ) else{ remaining = timeout; } - while ( this->ioCounter.currentCount () > 0 ) { + while ( this->pndRecvCnt > 0 ) { if ( remaining < CAC_SIGNIFICANT_SELECT_DELAY ) { status = ECA_TIMEOUT; break; } - this->ioCounter.waitForCompletion ( remaining ); + this->ioDone.wait ( remaining ); if ( timeout != 0.0 ) { double delay = epicsTime::getCurrent () - beg_time; remaining = timeout - delay; } } - this->ioCounter.cleanUp (); { epicsAutoMutex autoMutex ( this->mutex ); + this->readSeq++; + this->pndRecvCnt = 0u; if ( this->pudpiiu ) { this->pudpiiu->connectTimeoutNotify (); } @@ -573,7 +570,7 @@ int cac::pendEvent ( const double &timeout ) { // prevent recursion nightmares by disabling calls to // pendIO () from within a CA callback - if ( this->pRecvProcThread->isCurrentThread () ) { + if ( this->recvProcessThreadIsCurrentThread () ) { return ECA_EVDISALLOW; } @@ -595,16 +592,6 @@ int cac::pendEvent ( const double &timeout ) return ECA_TIMEOUT; } -bool cac::ioComplete () const -{ - if ( this->ioCounter.currentCount () == 0u ) { - return true; - } - else{ - return false; - } -} - // this is to only be used by early protocol revisions bool cac::connectChannel ( unsigned id ) { @@ -648,6 +635,63 @@ void cac::registerService ( cacService &service ) this->services.registerService ( service ); } +void cac::startRecvProcessThread () +{ + bool newThread = false; + { + epicsAutoMutex epicsMutex ( this->mutex ); + if ( ! this->pRecvProcessThread ) { + this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process", + epicsThreadGetStackSize ( epicsThreadStackSmall ), + this->getInitializingThreadsPriority () ); + if ( ! this->pRecvProcessThread ) { + throw std::bad_alloc (); + } + this->pRecvProcessThread->start (); + newThread = true; + } + } + if ( this->enablePreemptiveCallback && newThread ) { + this->enableCallbackPreemption (); + } +} + +// this is the recv process thread entry point +void cac::run () +{ + epicsAutoMutex autoMutex ( this->mutex ); + + this->attachToClientCtx (); + + while ( ! this->recvProcessThreadExitRequest ) { + + { + if ( this->recvProcessEnableRefCount ) { + this->recvProcessInProgress = true; + } + } + + if ( this->recvProcessInProgress ) { + this->processRecvBacklog (); + } + + bool signalNeeded; + { + this->recvProcessInProgress = false; + signalNeeded = this->recvProcessCompletionNBlockers > 0u; + } + + { + epicsAutoMutexRelease autoRelease ( this->mutex ); + if ( signalNeeded ) { + this->recvProcessCompletion.signal (); + } + this->recvProcessActivityEvent.wait (); + } + } + this->recvProcessThreadExit.signal (); +} + cacChannel & cac::createChannel ( const char *pName, cacChannelNotify &chan ) { cacChannel *pIO; @@ -664,6 +708,7 @@ cacChannel & cac::createChannel ( const char *pName, cacChannelNotify &chan ) epics_auto_ptr < cacChannel > pNetChan ( new nciu ( *this, limboIIU, chan, pName ) ); if ( pNetChan.get() ) { + this->startRecvProcessThread (); return *pNetChan.release (); } else { @@ -717,15 +762,54 @@ bool cac::setupUDP () void cac::enableCallbackPreemption () { - if ( this->pRecvProcThread ) { - this->pRecvProcThread->enable (); + unsigned copy; + { + epicsAutoMutex autoMutex ( this->mutex ); + assert ( this->recvProcessEnableRefCount < UINT_MAX ); + copy = this->recvProcessEnableRefCount; + this->recvProcessEnableRefCount++; + } + if ( copy == 0u ) { + this->recvProcessActivityEvent.signal (); } } void cac::disableCallbackPreemption () { - if ( this->pRecvProcThread ) { - this->pRecvProcThread->disable (); + bool wakeupNeeded; + + { + epicsAutoMutex autoMutex ( this->mutex ); + + if ( ! this->recvProcessInProgress ) { + assert ( this->recvProcessEnableRefCount != 0u ); + this->recvProcessEnableRefCount--; + return; + } + else { + this->recvProcessCompletionNBlockers++; + } + } + + while ( true ) { + this->recvProcessCompletion.wait (); + + { + epicsAutoMutex autoMutex ( this->mutex ); + + if ( ! this->recvProcessInProgress ) { + assert ( this->recvProcessEnableRefCount > 0u ); + this->recvProcessEnableRefCount--; + assert ( this->recvProcessCompletionNBlockers > 0u ); + this->recvProcessCompletionNBlockers--; + wakeupNeeded = this->recvProcessCompletionNBlockers > 0u; + break; + } + } + } + + if ( wakeupNeeded ) { + this->recvProcessCompletion.signal (); } } @@ -864,7 +948,7 @@ void cac::flushIfRequired ( nciu &chan ) // locking hierarchy reasons. bool blockPermit = true; if ( this->pRecvProcThread ) { - if ( this->pRecvProcThread->isCurrentThread () ) { + if ( this->recvProcessThreadIsCurrentThread () ) { blockPermit = false; } } @@ -1600,3 +1684,68 @@ void cac::vSignal ( int ca_status, const char *pfilenm, this->printf ( "..................................................................\n" ); } +void cac::incrementOutstandingIO () +{ + epicsAutoMutex locker ( this->mutex ); + if ( this->pndRecvCnt < UINT_MAX ) { + this->pndRecvCnt++; + } + else { + throwWithLocation ( caErrorCode (ECA_INTERNAL) ); + } +} + +void cac::decrementOutstandingIO () +{ + bool signalNeeded; + + { + epicsAutoMutex locker ( this->mutex ); + if ( this->pndRecvCnt > 0u ) { + this->pndRecvCnt--; + if ( this->pndRecvCnt == 0u ) { + signalNeeded = true; + } + else { + signalNeeded = false; + } + } + else { + signalNeeded = true; + } + } + + if ( signalNeeded ) { + this->ioDone.signal (); + } +} + +void cac::decrementOutstandingIO ( unsigned sequenceNo ) +{ + bool signalNeeded; + + { + epicsAutoMutex locker ( this->mutex ); + if ( this->readSeq == sequenceNo ) { + if ( this->pndRecvCnt > 0u ) { + this->pndRecvCnt--; + if ( this->pndRecvCnt == 0u ) { + signalNeeded = true; + } + else { + signalNeeded = false; + } + } + else { + signalNeeded = true; + } + } + else { + signalNeeded = false; + } + } + + if ( signalNeeded ) { + this->ioDone.signal (); + } +} diff --git a/src/ca/cac.h b/src/ca/cac.h index 90bb8eac2..57cae26fc 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -30,54 +30,6 @@ #include "cacIO.h" #undef epicsExportSharedSymbols -class ioCounterNet { -public: - ioCounterNet (); - void increment (); - void decrement (); - void decrement ( unsigned seqNumber ); - unsigned sequenceNumber () const; - unsigned currentCount () const; - void cleanUp (); - void show ( unsigned level ) const; - void waitForCompletion ( double delaySec ); -private: - unsigned pndrecvcnt; - unsigned readSeq; - epicsMutex mutex; - epicsEvent ioDone; -}; - -class recvProcessThread : public epicsThreadRunable { -public: - recvProcessThread ( class cac *pcacIn ); - virtual ~recvProcessThread (); - void run (); - void enable (); - void disable (); - void signalActivity (); - bool isCurrentThread () const; - void show ( unsigned level ) const; -private: - // - // The additional complexity associated with - // "processingDone" event and the "processing" flag - // avoid complex locking hierarchy constraints - // and therefore reduces the chance of creating - // a deadlock window during code maintenance. - // - epicsThread thread; - epicsEvent recvActivity; - class cac *pcac; - epicsEvent exit; - epicsEvent processingDone; - mutable epicsMutex mutex; - unsigned enableRefCount; - unsigned blockingForCompletion; - bool processing; - bool shutDown; -}; - class netWriteNotifyIO; class netReadNotifyIO; class netSubscription; @@ -96,7 +48,7 @@ struct CASG; class inetAddrID; struct caHdrLargeArray; -class cac : private cacRecycle +class cac : private cacRecycle, private epicsThreadRunable { public: cac ( cacNotify &, bool enablePreemptiveCallback = false ); @@ -185,7 +137,6 @@ public: void releaseLargeBufferTCP ( char * ); private: - ioCounterNet ioCounter; ipAddrToAsciiEngine ipToAEngine; cacServiceList services; tsDLList < tcpiiu > iiuList; @@ -210,28 +161,42 @@ private: double connTMO; mutable epicsMutex mutex; epicsEvent notifyCompletionEvent; + epicsEvent recvProcessActivityEvent; + epicsEvent recvProcessCompletion; + epicsEvent recvProcessThreadExit; + epicsEvent ioDone; epicsTimerQueueActive *pTimerQueue; char *pUserName; - recvProcessThread *pRecvProcThread; + epicsThread *pRecvProcThread; class udpiiu *pudpiiu; class searchTimer *pSearchTmr; class repeaterSubscribeTimer *pRepeaterSubscribeTmr; void *tcpSmallRecvBufFreeList; void *tcpLargeRecvBufFreeList; + epicsThread *pRecvProcessThread; cacNotify ¬ify; unsigned ioNotifyInProgressId; unsigned initializingThreadsPriority; unsigned threadsBlockingOnNotifyCompletion; unsigned maxRecvBytesTCP; + unsigned recvProcessEnableRefCount; + unsigned recvProcessCompletionNBlockers; + unsigned pndRecvCnt; + unsigned readSeq; bool enablePreemptiveCallback; bool ioInProgress; + bool recvProcessInProgress; + bool recvProcessThreadExitRequest; + + void run (); bool setupUDP (); void flushIfRequired ( nciu & ); // lock must be applied void recycleReadNotifyIO ( netReadNotifyIO &io ); void recycleWriteNotifyIO ( netWriteNotifyIO &io ); void recycleSubscription ( netSubscription &io ); - + bool recvProcessThreadIsCurrentThread () const; + void startRecvProcessThread (); void ioCompletionNotify ( unsigned id, unsigned type, arrayElementCount count, const void *pData ); void ioExceptionNotify ( unsigned id, @@ -300,24 +265,9 @@ inline unsigned cac::getInitializingThreadsPriority () const return this->initializingThreadsPriority; } -inline void cac::incrementOutstandingIO () -{ - this->ioCounter.increment (); -} - -inline void cac::decrementOutstandingIO () -{ - this->ioCounter.decrement (); -} - -inline void cac::decrementOutstandingIO ( unsigned sequenceNo ) -{ - this->ioCounter.decrement ( sequenceNo ); -} - inline unsigned cac::sequenceNumberOfOutstandingIO () const { - return this->ioCounter.sequenceNumber (); + return this->readSeq; } inline epicsMutex & cac::mutexRef () @@ -366,9 +316,24 @@ inline void cac::releaseLargeBufferTCP ( char *pBuf ) freeListFree ( this->tcpLargeRecvBufFreeList, pBuf ); } -inline bool recvProcessThread::isCurrentThread () const +inline bool cac::recvProcessThreadIsCurrentThread () const { - return this->thread.isCurrentThread (); + if ( this->pRecvProcessThread ) { + return this->pRecvProcessThread->isCurrentThread(); + } + else { + return false; + } +} + +inline bool cac::ioComplete () const +{ + return ( this->pndRecvCnt == 0u ); +} + +inline void cac::signalRecvActivity () +{ + this->recvProcessActivityEvent.signal (); } #endif // ifdef cach diff --git a/src/ca/ioCounterNet.cpp b/src/ca/ioCounterNet.cpp deleted file mode 100644 index d722ea7e8..000000000 --- a/src/ca/ioCounterNet.cpp +++ /dev/null @@ -1,135 +0,0 @@ - -/* - * $Id$ - * - * - * L O S A L A M O S - * Los Alamos National Laboratory - * Los Alamos, New Mexico 87545 - * - * Copyright, 1986, The Regents of the University of California. - * - * - * Author Jeffrey O. Hill - * johill@lanl.gov - * 505 665 1831 - */ - -#include - -#define epicsAssertAuthor "Jeff Hill johill@lanl.gov" - -#include "iocinf.h" -#include "cac.h" - -#define epicsExportSharedSymbols -#include "caerr.h" // for CA_INTERNAL -#undef epicsExportSharedSymbols - -ioCounterNet::ioCounterNet () : pndrecvcnt ( 0u ), readSeq ( 0u ) -{ -} - -void ioCounterNet::increment () -{ - epicsAutoMutex locker ( this->mutex ); - if ( this->pndrecvcnt < UINT_MAX ) { - this->pndrecvcnt++; - } - else { - throwWithLocation ( caErrorCode (ECA_INTERNAL) ); - } -} - -unsigned ioCounterNet::sequenceNumber () const -{ - return this->readSeq; -} - -unsigned ioCounterNet::currentCount () const -{ - return this->pndrecvcnt; -} - -void ioCounterNet::waitForCompletion ( double delaySec ) -{ - this->ioDone.wait ( delaySec ); -} - -void ioCounterNet::cleanUp () -{ - epicsAutoMutex locker ( this->mutex ); - this->readSeq++; - this->pndrecvcnt = 0u; -} - -void ioCounterNet::decrement () -{ - bool signalNeeded; - - { - epicsAutoMutex locker ( this->mutex ); - if ( this->pndrecvcnt > 0u ) { - this->pndrecvcnt--; - if ( this->pndrecvcnt == 0u ) { - signalNeeded = true; - } - else { - signalNeeded = false; - } - } - else { - signalNeeded = true; - } - } - - if ( signalNeeded ) { - this->ioDone.signal (); - } -} - -void ioCounterNet::decrement ( unsigned seqNumber ) -{ - bool signalNeeded; - - { - epicsAutoMutex locker ( this->mutex ); - if ( this->readSeq == seqNumber ) { - if ( this->pndrecvcnt > 0u ) { - this->pndrecvcnt--; - if ( this->pndrecvcnt == 0u ) { - signalNeeded = true; - } - else { - signalNeeded = false; - } - } - else { - signalNeeded = true; - } - } - else { - signalNeeded = false; - } - } - - if ( signalNeeded ) { - this->ioDone.signal (); - } -} - -void ioCounterNet::show ( unsigned level ) const -{ - ::printf ( "ioCounterNet at %p\n", - static_cast ( this ) ); - ::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n", - this->pndrecvcnt ); - if ( level > 0u ) { - ::printf ( "\tthe current read sequence number is %u\n", - this->readSeq ); - } - if ( level > 1u ) { - ::printf ( "IO done event:\n"); - this->ioDone.show ( level - 2u ); - } -} diff --git a/src/ca/recvProcessThread.cpp b/src/ca/recvProcessThread.cpp deleted file mode 100644 index fadf3c397..000000000 --- a/src/ca/recvProcessThread.cpp +++ /dev/null @@ -1,164 +0,0 @@ - -/* - * $Id$ - * - * - * L O S A L A M O S - * Los Alamos National Laboratory - * Los Alamos, New Mexico 87545 - * - * Copyright, 1986, The Regents of the University of California. - * - * - * Author Jeffrey O. Hill - * johill@lanl.gov - * 505 665 1831 - * - */ - -#define epicsAssertAuthor "Jeff Hill johill@lanl.gov" - -#include "iocinf.h" -#include "cac.h" - -#define epicsExportSharedSymbols -#include "cadef.h" -#undef epicsExportSharedSymbols - -recvProcessThread::recvProcessThread (cac *pcacIn) : - thread ( *this, "CAC-recv-process", - epicsThreadGetStackSize ( epicsThreadStackSmall ), - pcacIn->getInitializingThreadsPriority () ), - pcac ( pcacIn ), - enableRefCount ( 0u ), - blockingForCompletion ( 0u ), - processing ( false ), - shutDown ( false ) -{ - this->thread.start (); -} - -recvProcessThread::~recvProcessThread () -{ - this->shutDown = true; - this->recvActivity.signal (); - this->exit.wait (); -} - -void recvProcessThread::run () -{ - this->pcac->attachToClientCtx (); - - while ( ! this->shutDown ) { - - { - epicsAutoMutex autoMutex ( this->mutex ); - if ( this->enableRefCount ) { - this->processing = true; - } - } - - if ( this->processing ) { - pcac->processRecvBacklog (); - } - - bool signalNeeded; - { - epicsAutoMutex autoMutex ( this->mutex ); - this->processing = false; - signalNeeded = this->blockingForCompletion > 0u; - } - - if ( signalNeeded ) { - this->processingDone.signal (); - } - - this->recvActivity.wait (); - } - this->exit.signal (); -} - -void recvProcessThread::enable () -{ - unsigned copy; - - { - epicsAutoMutex autoMutex ( this->mutex ); - assert ( this->enableRefCount < UINT_MAX ); - copy = this->enableRefCount; - this->enableRefCount++; - } - - if ( copy == 0u ) { - this->recvActivity.signal (); - } -} - -void recvProcessThread::disable () -{ - bool wakeupNeeded; - - { - epicsAutoMutex autoMutex ( this->mutex ); - - if ( ! this->processing ) { - assert ( this->enableRefCount != 0u ); - this->enableRefCount--; - return; - } - else { - this->blockingForCompletion++; - } - } - - while ( true ) { - this->processingDone.wait (); - - { - epicsAutoMutex autoMutex ( this->mutex ); - - if ( ! this->processing ) { - assert ( this->enableRefCount > 0u ); - this->enableRefCount--; - assert ( this->blockingForCompletion > 0u ); - this->blockingForCompletion--; - wakeupNeeded = this->blockingForCompletion > 0u; - break; - } - } - } - - if ( wakeupNeeded ) { - this->processingDone.signal (); - } -} - -void recvProcessThread::signalActivity () -{ - this->recvActivity.signal (); -} - -void recvProcessThread::show ( unsigned level ) const -{ - epicsAutoMutex autoMutex ( this->mutex ); - ::printf ( "CA receive processing thread at %p state=%s\n", - static_cast ( this ), this->processing ? "busy" : "idle"); - if ( level > 0u ) { - ::printf ( "enable count %u\n", this->enableRefCount ); - ::printf ( "blocking for completion count %u\n", this->blockingForCompletion ); - } - if ( level > 1u ) { - ::printf ( "\tCA client at %p\n", static_cast < void * > ( this->pcac ) ); - ::printf ( "\tshutdown command boolean %u\n", this->shutDown ); - } - if ( level > 2u ) { - ::printf ( "Receive activity event:\n" ); - this->recvActivity.show ( level - 3u ); - ::printf ( "exit event:\n" ); - this->exit.show ( level - 3u ); - ::printf ( "processing done event:\n" ); - this->processingDone.show ( level - 3u ); - ::printf ( "mutex:\n" ); - this->mutex.show ( level - 3u ); - } -}