diff --git a/src/ca/CASG.cpp b/src/ca/CASG.cpp index 3b5035498..b4b1fa7fb 100644 --- a/src/ca/CASG.cpp +++ b/src/ca/CASG.cpp @@ -73,9 +73,15 @@ int CASG::block ( double timeout ) { epicsTime cur_time; epicsTime beg_time; - double delay; - double remaining; - int status; + double delay; + double remaining; + int status; + + // prevent recursion nightmares by disabling blocking + // for IO from within a CA callback. + if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { + return ECA_EVDISALLOW; + } if ( timeout < 0.0 ) { return ECA_TIMEOUT; @@ -105,7 +111,15 @@ int CASG::block ( double timeout ) break; } - this->client.blockForEventAndEnableCallbacks ( this->sem, remaining ); + { + // serialize access the blocking mechanism below + epicsAutoMutex autoMutex ( this->serializeBlock ); + + status = this->client.blockForEventAndEnableCallbacks ( this->sem, remaining ); + if ( status != ECA_NORMAL ) { + return status; + } + } /* * force a time update diff --git a/src/ca/access.cpp b/src/ca/access.cpp index a9bca73df..e4b359ca9 100644 --- a/src/ca/access.cpp +++ b/src/ca/access.cpp @@ -32,7 +32,7 @@ #include "oldAccess.h" #include "autoPtrDestroy.h" -epicsThreadPrivateId caClientContextId; +static epicsThreadPrivateId caClientContextId; static epicsThreadOnceId caClientContextIdOnce = EPICS_THREAD_ONCE_INIT; diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index c7e12d732..46d34bac2 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -100,6 +100,27 @@ const cac::pExcepProtoStubTCP cac::tcpExcepJumpTableCAC [] = &cac::defaultExcep // CA_PROTO_SERVER_DISCONN }; +epicsThreadPrivateId caClientCallbackThreadId; + +static epicsThreadOnceId cacOnce = EPICS_THREAD_ONCE_INIT; + +extern "C" void cacExitHandler () +{ + epicsThreadPrivateDelete ( caClientCallbackThreadId ); +} + +// runs once only for each process +extern "C" void cacOnceFunc ( void * ) +{ + caClientCallbackThreadId = epicsThreadPrivateCreate (); + if ( caClientCallbackThreadId ) { + atexit ( cacExitHandler ); + } + else { + throw std::bad_alloc (); + } +} + // // cac::cac () // @@ -110,36 +131,27 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : pRepeaterSubscribeTmr ( 0 ), tcpSmallRecvBufFreeList ( 0 ), tcpLargeRecvBufFreeList ( 0 ), - pRecvProcessThread ( 0 ), - isRecvProcessId ( epicsThreadPrivateCreate() ), notify ( notifyIn ), - ioNotifyInProgressId ( 0 ), initializingThreadsPriority ( epicsThreadGetPrioritySelf () ), - threadsBlockingOnNotifyCompletion ( 0u ), maxRecvBytesTCP ( MAX_TCP ), - recvProcessEnableRefCount ( 0u ), pndRecvCnt ( 0u ), readSeq ( 0u ), - enablePreemptiveCallback ( enablePreemptiveCallbackIn ), - ioInProgress ( false ), - recvProcessThreadExitRequest ( false ) + recvThreadsPendingCount ( 0u ), + enablePreemptiveCallback ( enablePreemptiveCallbackIn ) { long status; unsigned abovePriority; - if ( ! this->isRecvProcessId ) { - throw std::bad_alloc (); - } + epicsThreadOnce ( &cacOnce, cacOnceFunc, 0 ); if ( ! osiSockAttach () ) { - epicsThreadPrivateDelete ( this->isRecvProcessId ); throwWithLocation ( caErrorCode (ECA_INTERNAL) ); } { epicsThreadBooleanStatus tbs; - tbs = epicsThreadLowestPriorityLevelAbove ( this->initializingThreadsPriority, &abovePriority); + tbs = epicsThreadLowestPriorityLevelAbove ( this->initializingThreadsPriority, &abovePriority ); if ( tbs != epicsThreadBooleanStatusSuccess ) { abovePriority = this->initializingThreadsPriority; } @@ -159,7 +171,6 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : len = strlen ( tmp ) + 1; this->pUserName = new char [len]; if ( ! this->pUserName ) { - epicsThreadPrivateDelete ( this->isRecvProcessId ); throwWithLocation ( caErrorCode (ECA_ALLOCMEM) ); } strncpy ( this->pUserName, tmp, len ); @@ -199,7 +210,6 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : freeListInitPvt ( &this->tcpSmallRecvBufFreeList, MAX_TCP, 1 ); if ( ! this->tcpSmallRecvBufFreeList ) { osiSockRelease (); - epicsThreadPrivateDelete ( this->isRecvProcessId ); free ( this->pUserName ); throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) ); } @@ -207,7 +217,6 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : freeListInitPvt ( &this->tcpLargeRecvBufFreeList, this->maxRecvBytesTCP, 1 ); if ( ! this->tcpLargeRecvBufFreeList ) { osiSockRelease (); - epicsThreadPrivateDelete ( this->isRecvProcessId ); free ( this->pUserName ); freeListCleanup ( this->tcpSmallRecvBufFreeList ); throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) ); @@ -216,13 +225,14 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : this->pTimerQueue = & epicsTimerQueueActive::allocate ( false, abovePriority ); if ( ! this->pTimerQueue ) { osiSockRelease (); - epicsThreadPrivateDelete ( this->isRecvProcessId ); free ( this->pUserName ); freeListCleanup ( this->tcpSmallRecvBufFreeList ); freeListCleanup ( this->tcpLargeRecvBufFreeList ); throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) ); } - this->preemptiveCallbackLock.lock (); + if ( ! this->enablePreemptiveCallback ) { + this->callbackMutex.lock (); + } } cac::~cac () @@ -231,41 +241,36 @@ cac::~cac () // make certain that process thread isnt deleting // tcpiiu objects at the same that this thread is // - if ( this->pRecvProcessThread ) { - this->recvProcessThreadExitRequest = true; - this->recvProcessActivityEvent.signal (); - { - epicsAutoMutex autoMutex ( this->mutex ); - this->enableCallbackPreemption (); - } - this->recvProcessThreadExit.wait (); - delete this->pRecvProcessThread; + if ( ! this->enablePreemptiveCallback ) { + this->callbackMutex.unlock (); } + // + // lock intentionally not held here so that we dont deadlock + // waiting for the UDP thread to exit while it is waiting to + // get the lock. + if ( this->pudpiiu ) { + // this blocks until the UDP thread exits so that + // it will not sneak in any new clients + this->pudpiiu->shutdown (); + } + + // + // shutdown all tcp connections + // { epicsAutoMutex autoMutex ( this->mutex ); - if ( this->pudpiiu ) { - // this blocks until the UDP thread exits so that - // it will not sneak in any new clients - this->pudpiiu->shutdown (); + for ( tsDLIterBD < tcpiiu > piiu = this->iiuList.firstIter(); + piiu.valid(); piiu++ ) { + piiu->cleanShutdown (); } } // - // shutdown all tcp connections and wait for threads to exit + // wait for tcp threads to exit // - while ( true ) { - tcpiiu * piiu; - { - epicsAutoMutex autoMutex ( this->mutex ); - if ( ( piiu = this->iiuList.get() ) ) { - piiu->disconnectAllChan ( limboIIU ); - } - } - if ( ! piiu ) { - break; - } - piiu->destroy (); + while ( this->iiuList.count() ) { + this->iiuUninstal.wait (); } delete this->pRepeaterSubscribeTmr; @@ -277,18 +282,18 @@ cac::~cac () { epicsAutoMutex autoMutex ( this->mutex ); if ( this->pudpiiu ) { - - // - // make certain that the UDP thread isnt starting - // up new clients. - // - this->pudpiiu->disconnectAllChan ( limboIIU ); + tsDLList < nciu > tmpList; + this->pudpiiu->uninstallAllChan ( tmpList ); + while ( nciu *pChan = tmpList.get () ) { + this->disconnectAllIO ( *pChan, false ); + pChan->disconnect ( limboIIU ); + // no need to call disconnect notify or + // access rights notify here + limboIIU.attachChannel ( *pChan ); + } } } - if ( ! this->enablePreemptiveCallback && this->pudpiiu ) { - this->notify.fdWasDestroyed ( this->pudpiiu->getSock() ); - } delete this->pudpiiu; delete this->pUserName; @@ -304,64 +309,6 @@ cac::~cac () osiSockRelease (); this->pTimerQueue->release (); - - epicsThreadPrivateDelete ( this->isRecvProcessId ); -} - -// lock must be applied -void cac::processRecvBacklog () -{ - epicsThreadPrivateSet ( this->isRecvProcessId, this ); - - tsDLList < tcpiiu > deadIIU; - { - tsDLIterBD < tcpiiu > piiu = this->iiuList.firstIter (); - while ( piiu.valid () ) { - tsDLIterBD < tcpiiu > pNext = piiu; - pNext++; - if ( ! piiu->alive () ) { - if ( piiu->channelCount () ) { - char hostNameTmp[64]; - piiu->hostName ( hostNameTmp, sizeof ( hostNameTmp ) ); - genLocalExcep ( *this, ECA_DISCONN, hostNameTmp ); - } - piiu->getBHE().unbindFromIIU (); - assert ( this->pudpiiu ); - piiu->disconnectAllChan ( *this->pudpiiu ); - this->iiuList.remove ( *piiu ); - deadIIU.add ( *piiu ); // postpone destroy and avoid deadlock - } - else { - // make certain that: - // 1) this is called from the appropriate thread - // 2) lock is not held while in call back - if ( piiu->trueOnceOnly() && ! this->enablePreemptiveCallback ) { - epicsAutoMutexRelease autoRelease ( this->mutex ); - this->notify.fdWasCreated ( piiu->getSock() ); - } - piiu->processIncoming (); - } - piiu = pNext; - } - } - if ( deadIIU.count() ) { - { - epicsAutoMutexRelease autoRelease ( this->mutex ); - while ( tcpiiu *piiu = deadIIU.get() ) { - // make certain that: - // 1) this is called from the appropriate thread - // 2) lock is not held while in call back - if ( ! this->enablePreemptiveCallback ) { - this->notify.fdWasDestroyed ( piiu->getSock() ); - } - piiu->destroy (); - } - } - assert ( this->pSearchTmr ); - this->pSearchTmr->resetPeriod ( 0.0 ); - } - - epicsThreadPrivateSet ( this->isRecvProcessId, 0 ); } // @@ -442,10 +389,6 @@ void cac::show ( unsigned level ) const } ::printf ( "IP address to name conversion engine:\n" ); this->ipToAEngine.show ( level - 3u ); - ::printf ( "recv process enable count %u\n", - this->recvProcessEnableRefCount ); - ::printf ( "\trecv process shutdown command boolean %u\n", - this->recvProcessThreadExitRequest ); ::printf ( "\tthe current read sequence number is %u\n", this->readSeq ); } @@ -453,10 +396,6 @@ void cac::show ( unsigned level ) const if ( level > 3u ) { ::printf ( "Default mutex:\n"); this->mutex.show ( level - 4u ); - ::printf ( "Receive process activity event:\n" ); - this->recvProcessActivityEvent.show ( level - 3u ); - ::printf ( "Receive process exit event:\n" ); - this->recvProcessThreadExit.show ( level - 3u ); ::printf ( "IO done event:\n"); this->ioDone.show ( level - 3u ); ::printf ( "mutex:\n" ); @@ -539,8 +478,8 @@ void cac::beaconNotify ( const inetAddrID &addr, const epicsTime ¤tTime ) int cac::pendIO ( const double & timeout ) { // prevent recursion nightmares by disabling calls to - // pendIO () from within a CA callback - if ( epicsThreadPrivateGet ( this->isRecvProcessId ) ) { + // pendIO () from within a CA callback. + if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { return ECA_EVDISALLOW; } @@ -550,20 +489,25 @@ int cac::pendIO ( const double & timeout ) { epicsAutoMutex autoMutex ( this->mutex ); - this->flushRequestPrivate (); + } + { + // serialize access the blocking mechanism below + epicsAutoMutex autoMutex ( this->serializePendIO ); + while ( this->pndRecvCnt > 0 ) { if ( remaining < CAC_SIGNIFICANT_DELAY ) { status = ECA_TIMEOUT; break; } - this->enableCallbackPreemption (); - { - epicsAutoMutexRelease autoRelease ( this->mutex ); + if ( this->enablePreemptiveCallback ) { + this->ioDone.wait ( remaining ); + } + else { + epicsAutoMutexRelease autoRelease ( this->callbackMutex ); this->ioDone.wait ( remaining ); } - this->disableCallbackPreemption (); double delay = epicsTime::getCurrent () - beg_time; if ( delay < timeout ) { remaining = timeout - delay; @@ -572,7 +516,10 @@ int cac::pendIO ( const double & timeout ) remaining = 0.0; } } + } + { + epicsAutoMutex autoMutex ( this->mutex ); this->readSeq++; this->pndRecvCnt = 0u; if ( this->pudpiiu ) { @@ -583,77 +530,71 @@ int cac::pendIO ( const double & timeout ) return status; } -void cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ) +int cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ) { - epicsAutoMutex autoMutex ( this->mutex ); - this->enableCallbackPreemption (); - { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + if ( this->enablePreemptiveCallback ) { event.wait ( timeout ); } - this->disableCallbackPreemption (); + else { + epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); + event.wait ( timeout ); + } + + return ECA_NORMAL; } int cac::pendEvent ( const double & timeout ) { - epicsTime current = epicsTime::getCurrent (); - // prevent recursion nightmares by disabling calls to - // pendIO () from within a CA callback - if ( epicsThreadPrivateGet ( this->isRecvProcessId ) ) { + // pendIO () from within a CA callback. + if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { return ECA_EVDISALLOW; } + epicsTime current = epicsTime::getCurrent (); + { epicsAutoMutex autoMutex ( this->mutex ); - this->flushRequestPrivate (); + } + + { + // serialize access the blocking mechanism below + epicsAutoMutex autoMutex ( this->serializePendEvent ); // process at least once if preemptive callback - // isnt enabled (this avoids complications associated - // with forcing the recv processing thread to run. - if ( ! this->enablePreemptiveCallback && - this->recvProcessEnableRefCount == 0 ) { - this->processRecvBacklog (); + // isnt enabled + if ( ! this->enablePreemptiveCallback ) { + epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); + while ( this->recvThreadsPendingCount ) { + this->noRecvThreadsPending.wait (); + } } + } - double elapsed = epicsTime::getCurrent() - current; - double delay; + double elapsed = epicsTime::getCurrent() - current; + double delay; - if ( timeout > elapsed ) { - delay = timeout - elapsed; + if ( timeout > elapsed ) { + delay = timeout - elapsed; + } + else { + delay = 0.0; + } + + if ( delay >= CAC_SIGNIFICANT_DELAY ) { + if ( this->enablePreemptiveCallback ) { + epicsThreadSleep ( delay ); } else { - delay = 0.0; - } - - if ( delay >= CAC_SIGNIFICANT_DELAY ) { - this->enableCallbackPreemption (); - { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); - epicsThreadSleep ( delay ); - } - this->disableCallbackPreemption (); + epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); + epicsThreadSleep ( delay ); } } return ECA_TIMEOUT; } -// this is to only be used by early protocol revisions -bool cac::connectChannel ( unsigned id ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - nciu * pChan = this->chanTable.lookup ( id ); - if ( pChan ) { - pChan->connect (); - return true; - } - else { - return false; - } -} - void cac::installCASG ( CASG &sg ) { epicsAutoMutex autoMutex ( this->mutex ); @@ -683,44 +624,6 @@ void cac::registerService ( cacService &service ) this->services.registerService ( service ); } -void cac::startRecvProcessThread () -{ - epicsAutoMutex epicsMutex ( this->mutex ); - if ( ! this->pRecvProcessThread ) { - unsigned priorityOfProcess; - epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( - this->getInitializingThreadsPriority (), &priorityOfProcess ); - if ( tbs != epicsThreadBooleanStatusSuccess ) { - priorityOfProcess = this->getInitializingThreadsPriority (); - } - - this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process", - epicsThreadGetStackSize ( epicsThreadStackSmall ), priorityOfProcess ); - if ( ! this->pRecvProcessThread ) { - throw std::bad_alloc (); - } - this->pRecvProcessThread->start (); - if ( this->enablePreemptiveCallback ) { - this->enableCallbackPreemption (); - } - } -} - -// this is the recv process thread entry point -void cac::run () -{ - this->attachToClientCtx (); - while ( ! this->recvProcessThreadExitRequest ) { - { - epicsAutoMutex autoMutexPCB ( this->preemptiveCallbackLock ); - epicsAutoMutex autoMutex ( this->mutex ); - this->processRecvBacklog (); - } - this->recvProcessActivityEvent.wait (); - } - this->recvProcessThreadExit.signal (); -} - cacChannel & cac::createChannel ( const char *pName, cacChannelNotify &chan ) { cacChannel *pIO; @@ -737,7 +640,6 @@ cacChannel & cac::createChannel ( const char *pName, cacChannelNotify &chan ) epics_auto_ptr < cacChannel > pNetChan ( new nciu ( *this, limboIIU, chan, pName ) ); if ( pNetChan.get() ) { - this->startRecvProcessThread (); return *pNetChan.release (); } else { @@ -762,14 +664,10 @@ bool cac::setupUDP () epicsAutoMutex autoMutex ( this->mutex ); if ( ! this->pudpiiu ) { - this->pudpiiu = new udpiiu ( *this, this->isRecvProcessId ); + this->pudpiiu = new udpiiu ( *this ); if ( ! this->pudpiiu ) { return false; } - if ( ! this->enablePreemptiveCallback ) { - epicsAutoMutexRelease autoRelease ( this->mutex ); - this->notify.fdWasCreated ( this->pudpiiu->getSock() ); - } } if ( ! this->pSearchTmr ) { @@ -789,29 +687,6 @@ bool cac::setupUDP () return true; } -// lock must already be applied -void cac::enableCallbackPreemption () -{ - assert ( this->recvProcessEnableRefCount < UINT_MAX ); - this->recvProcessEnableRefCount++; - if ( this->recvProcessEnableRefCount == 1u ) { - this->preemptiveCallbackLock.unlock (); - } -} - -// lock must already be applied -void cac::disableCallbackPreemption () -{ - assert ( this->recvProcessEnableRefCount > 0u ); - this->recvProcessEnableRefCount--; - if ( this->recvProcessEnableRefCount == 0u ) { - if ( ! this->preemptiveCallbackLock.tryLock () ) { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); - this->preemptiveCallbackLock.lock (); - } - } -} - void cac::repeaterSubscribeConfirmNotify () { if ( this->pRepeaterSubscribeTmr ) { @@ -830,82 +705,113 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, return false; } - epicsAutoMutex autoMutex ( this->mutex ); + bool v41Ok, v42Ok; nciu *chan; + { + epicsAutoMutex autoMutex ( this->mutex ); - /* - * ignore search replies for deleted channels - */ - chan = this->chanTable.lookup ( cid ); - if ( ! chan ) { - return true; - } - - retrySeqNumber = chan->getRetrySeqNo (); - - /* - * Ignore duplicate search replies - */ - if ( chan->getPIIU()->isVirtaulCircuit( chan->pName(), addr ) ) { - return true; - } - - /* - * look for an existing virtual circuit - */ - tcpiiu *piiu; - bhe *pBHE = this->beaconTable.lookup ( addr.ia ); - if ( pBHE ) { - piiu = pBHE->getIIU (); - if ( piiu ) { - if ( ! piiu->alive () ) { - return true; - } + /* + * ignore search replies for deleted channels + */ + chan = this->chanTable.lookup ( cid ); + if ( ! chan ) { + return true; } - } - else { - pBHE = new bhe ( epicsTime (), addr.ia ); + + retrySeqNumber = chan->getRetrySeqNo (); + + /* + * Ignore duplicate search replies + */ + if ( chan->getPIIU()->isVirtaulCircuit( chan->pName(), addr ) ) { + return true; + } + + /* + * look for an existing virtual circuit + */ + tcpiiu *piiu; + bhe *pBHE = this->beaconTable.lookup ( addr.ia ); if ( pBHE ) { - if ( this->beaconTable.add ( *pBHE ) < 0 ) { - pBHE->destroy (); - return true; + piiu = pBHE->getIIU (); + if ( piiu ) { + if ( ! piiu->alive () ) { + return true; + } } } else { - return true; - } - piiu = 0; - } - - if ( ! piiu ) { - try { - piiu = new tcpiiu ( *this, this->connTMO, *this->pTimerQueue, - addr, minorVersionNumber, *pBHE, this->ipToAEngine ); - if ( ! piiu ) { + pBHE = new bhe ( epicsTime (), addr.ia ); + if ( pBHE ) { + if ( this->beaconTable.add ( *pBHE ) < 0 ) { + pBHE->destroy (); + return true; + } + } + else { return true; } - this->iiuList.add ( *piiu ); - + piiu = 0; } - catch ( ... ) { - this->printf ( "CAC: Exception during virtual circuit creation\n" ); - return true; + + if ( ! piiu ) { + try { + piiu = new tcpiiu ( *this, this->connTMO, *this->pTimerQueue, + addr, minorVersionNumber, *pBHE, this->ipToAEngine, + this->enablePreemptiveCallback ); + if ( ! piiu ) { + return true; + } + this->iiuList.add ( *piiu ); + + } + catch ( ... ) { + this->printf ( "CAC: Exception during virtual circuit creation\n" ); + return true; + } + } + + this->pudpiiu->detachChannel ( *chan ); + chan->searchReplySetUp ( *piiu, sid, typeCode, count ); + piiu->attachChannel ( *chan ); + + chan->createChannelRequest (); + piiu->flushRequest (); + + v41Ok = piiu->ca_v41_ok (); + v42Ok = piiu->ca_v42_ok (); + + if ( ! v42Ok ) { + // connect to old server with lock applied + chan->connect (); + // resubscribe for monitors from this channel + this->connectAllIO ( *chan ); + } + + if ( this->pSearchTmr ) { + this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime ); } } - this->pudpiiu->detachChannel ( *chan ); - chan->searchReplySetUp ( *piiu, sid, typeCode, count ); - piiu->attachChannel ( *chan ); + if ( ! v42Ok ) { + // channel uninstal routine grabs the callback lock so + // a channel will not be deleted while a call back is + // in progress + // + // the callback lock is also taken when a channel + // disconnects to prevent a race condition with the + // code below + chan->connectStateNotify (); - chan->createChannelRequest (); - piiu->flushRequest (); - - if ( ! piiu->ca_v42_ok () ) { - chan->connect (); - } - - if ( this->pSearchTmr ) { - this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime ); + /* + * if less than v4.1 then the server will never + * send access rights and we know that there + * will always be access and also need to call + * their call back here + */ + if ( ! v41Ok ) { + chan->accessRightsNotify (); + } } return true; @@ -913,12 +819,19 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, void cac::uninstallChannel ( nciu & chan ) { - epicsAutoMutex autoMutex ( this->mutex ); - nciu *pChan = this->chanTable.remove ( chan ); - assert ( pChan = &chan ); - this->flushIfRequired ( chan ); - chan.getPIIU()->clearChannelRequest ( chan ); - chan.getPIIU()->detachChannel ( chan ); + { + epicsAutoMutex autoMutex ( this->mutex ); + nciu *pChan = this->chanTable.remove ( chan ); + assert ( pChan = &chan ); + // flush prior to taking the callback lock + this->flushIfRequired ( *chan.getPIIU() ); + chan.getPIIU()->clearChannelRequest ( chan ); + chan.getPIIU()->detachChannel ( chan ); + } + + // taking this mutex guarantees that we will not delete + // a channel out from under a callback + epicsAutoMutex autoCallbackMutex ( this->callbackMutex ); } int cac::printf ( const char *pformat, ... ) const @@ -936,34 +849,38 @@ int cac::printf ( const char *pformat, ... ) const } // lock must be applied before calling this cac private routine -void cac::flushIfRequired ( nciu &chan ) +void cac::flushIfRequired ( netiiu & iiu ) { - if ( chan.getPIIU()->flushBlockThreshold() ) { - this->flushRequestPrivate (); + if ( iiu.flushBlockThreshold() ) { + iiu.flushRequest (); // the process thread is not permitted to flush as this // can result in a push / pull deadlock on the TCP pipe. // Instead, the process thread scheduals the flush with the // send thread which runs at a higher priority than the // send thread. The same applies to the UDP thread for // locking hierarchy reasons. - if ( ! epicsThreadPrivateGet ( this->isRecvProcessId ) ) { + if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { // enable / disable of call back preemption must occur here // because the tcpiiu might disconnect while waiting and its // pointer to this cac might become invalid - this->enableCallbackPreemption (); - chan.getPIIU()->blockUntilSendBacklogIsReasonable ( this->mutex ); - this->disableCallbackPreemption (); + if ( this->enablePreemptiveCallback ) { + iiu.blockUntilSendBacklogIsReasonable ( 0, this->mutex ); + } + else { + iiu.blockUntilSendBacklogIsReasonable + ( &this->callbackMutex, this->mutex ); + } } } else { - chan.getPIIU()->flushRequestIfAboveEarlyThreshold (); + iiu.flushRequestIfAboveEarlyThreshold (); } } void cac::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue ) { epicsAutoMutex autoMutex ( this->mutex ); - this->flushIfRequired ( chan ); + this->flushIfRequired ( *chan.getPIIU() ); chan.getPIIU()->writeRequest ( chan, type, nElem, pValue ); } @@ -976,7 +893,7 @@ cacChannel::ioid cac::writeNotifyRequest ( nciu &chan, unsigned type, unsigned n if ( pIO.get() ) { this->ioTable.add ( *pIO ); chan.cacPrivateListOfIO::eventq.add ( *pIO ); - this->flushIfRequired ( chan ); + this->flushIfRequired ( *chan.getPIIU() ); chan.getPIIU()->writeNotifyRequest ( chan, *pIO, type, nElem, pValue ); return pIO.release()->getId (); @@ -995,7 +912,7 @@ cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type, if ( pIO.get() ) { this->ioTable.add ( *pIO ); chan.cacPrivateListOfIO::eventq.add ( *pIO ); - this->flushIfRequired ( chan ); + this->flushIfRequired ( *chan.getPIIU() ); chan.getPIIU()->readNotifyRequest ( chan, *pIO, type, nElem ); return pIO.release()->getId (); } @@ -1004,43 +921,32 @@ cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type, } } -// lock must be applied -bool cac::blockForIOCallbackCompletion ( const cacChannel::ioid & id ) -{ - while ( this->ioInProgress && this->ioNotifyInProgressId == id ) { - assert ( this->threadsBlockingOnNotifyCompletion < UINT_MAX ); - this->threadsBlockingOnNotifyCompletion++; - epicsAutoMutexRelease autoRelease ( this->mutex ); - this->notifyCompletionEvent.wait ( 0.5 ); - assert ( this->threadsBlockingOnNotifyCompletion > 0u ); - this->threadsBlockingOnNotifyCompletion--; - } - return this->threadsBlockingOnNotifyCompletion > 0u; -} - void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) { - bool signalNeeded = false; - { - epicsAutoMutex autoMutex ( this->mutex ); - baseNMIU * pmiu = this->ioTable.remove ( id ); - if ( pmiu ) { - chan.cacPrivateListOfIO::eventq.remove ( *pmiu ); - class netSubscription *pSubscr = pmiu->isSubscription (); - if ( pSubscr ) { - chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); - } - if ( epicsThreadPrivateGet ( this->isRecvProcessId ) ) { - signalNeeded = false; - } - else { - signalNeeded = this->blockForIOCallbackCompletion ( id ); - } - pmiu->destroy ( *this ); - } + if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) && + this->enablePreemptiveCallback ) { + // wait for any IO callbacks in progress to complete + // prior to destroying the IO object + epicsAutoMutex autoMutex ( this->callbackMutex ); + this->ioCancelPrivate ( chan, id ); } - if ( signalNeeded ) { - this->notifyCompletionEvent.signal (); + else { + this->ioCancelPrivate ( chan, id ); + } +} + +void cac::ioCancelPrivate ( nciu &chan, const cacChannel::ioid &id ) +{ + epicsAutoMutex autoMutex ( this->mutex ); + this->flushIfRequired ( *chan.getPIIU() ); + baseNMIU * pmiu = this->ioTable.remove ( id ); + if ( pmiu ) { + chan.cacPrivateListOfIO::eventq.remove ( *pmiu ); + class netSubscription *pSubscr = pmiu->isSubscription (); + if ( pSubscr ) { + chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); + } + pmiu->destroy ( *this ); } } @@ -1056,258 +962,237 @@ void cac::ioShow ( const cacChannel::ioid &id, unsigned level ) const void cac::ioCompletionNotify ( unsigned id, unsigned type, arrayElementCount count, const void *pData ) { - baseNMIU * pmiu = this->ioTable.lookup ( id ); - if ( ! pmiu ) { - return; - } - assert ( ! this->ioInProgress ); - this->ioInProgress = true; - this->ioNotifyInProgressId = id; + baseNMIU * pmiu; + { - epicsAutoMutexRelease autoRelease ( this->mutex ); - pmiu->completion ( type, count, pData ); - } - // threads blocked canceling this IO will wait - // until we stop processing it - this->ioInProgress = false; - if ( this->threadsBlockingOnNotifyCompletion ) { - this->notifyCompletionEvent.signal (); + epicsAutoMutex autoMutex ( this->mutex ); + pmiu = this->ioTable.lookup ( id ); + if ( ! pmiu ) { + return; + } } + + // + // The IO destroy routines take the call back mutex + // when uninstalling and deleting the baseNMIU so there is + // no need to worry here about the baseNMIU being deleted while + // it is in use here. + // + pmiu->completion ( type, count, pData ); } void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext ) { - baseNMIU * pmiu = this->ioTable.lookup ( id ); + baseNMIU * pmiu; + { + epicsAutoMutex autoMutex ( this->mutex ); + pmiu = this->ioTable.lookup ( id ); + } + if ( ! pmiu ) { return; } - assert ( ! this->ioInProgress ); - this->ioInProgress = true; - this->ioNotifyInProgressId = id; - { - epicsAutoMutexRelease autoRelease ( this->mutex ); - pmiu->exception ( status, pContext ); - } - // threads blocked canceling this IO will wait - // until we stop processing it - this->ioInProgress = false; - if ( this->threadsBlockingOnNotifyCompletion ) { - this->notifyCompletionEvent.signal (); - } + + // + // The IO destroy routines take the call back mutex + // when uninstalling and deleting the baseNMIU so there is + // no need to worry here about the baseNMIU being deleted while + // it is in use here. + // + + pmiu->exception ( status, pContext ); } void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext, unsigned type, arrayElementCount count ) { - baseNMIU * pmiu = this->ioTable.lookup ( id ); - if ( ! pmiu ) { - return; - } - assert ( ! this->ioInProgress ); - this->ioInProgress = true; - this->ioNotifyInProgressId = id; + baseNMIU * pmiu; + { - epicsAutoMutexRelease autoRelease ( this->mutex ); - pmiu->exception ( status, pContext, type, count ); - } - // threads blocked canceling this IO will wait - // until we stop processing it - this->ioInProgress = false; - if ( this->threadsBlockingOnNotifyCompletion ) { - this->notifyCompletionEvent.signal (); + epicsAutoMutex autoMutex ( this->mutex ); + pmiu = this->ioTable.lookup ( id ); + if ( ! pmiu ) { + return; + } } + + // + // The IO destroy routines take the call back mutex + // when uninstalling and deleting the baseNMIU so there is + // no need to worry here about the baseNMIU being deleted while + // it is in use here. + // + pmiu->exception ( status, pContext, type, count ); } void cac::ioCompletionNotifyAndDestroy ( unsigned id ) { + epicsAutoMutex autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; } + pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu ); - assert ( ! this->ioInProgress ); - this->ioInProgress = true; - this->ioNotifyInProgressId = id; + + // + // The IO destroy routines take the call back mutex + // when uninstalling and deleting the baseNMIU so there is + // no need to worry here about the baseNMIU being deleted while + // it is in use here. + // { - epicsAutoMutexRelease autoRelease ( this->mutex ); + epicsAutoMutexRelease autoMutexRelease ( this->mutex ); pmiu->completion (); } + pmiu->destroy ( *this ); - // threads blocked canceling this IO will wait - // until we stop processing it - this->ioInProgress = false; - if ( this->threadsBlockingOnNotifyCompletion ) { - this->notifyCompletionEvent.signal (); - } } void cac::ioCompletionNotifyAndDestroy ( unsigned id, unsigned type, arrayElementCount count, const void *pData ) { + epicsAutoMutex autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; } - assert ( ! this->ioInProgress ); - this->ioInProgress = true; - this->ioNotifyInProgressId = id; pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu ); + + // + // The IO destroy routines take the call back mutex + // when uninstalling and deleting the baseNMIU so there is + // no need to worry here about the baseNMIU being deleted while + // it is in use here. + // { - epicsAutoMutexRelease autoRelease ( this->mutex ); + epicsAutoMutexRelease autoMutexRelease ( this->mutex ); pmiu->completion ( type, count, pData ); } + pmiu->destroy ( *this ); - // threads blocked canceling this IO will wait - // until we stop processing it - this->ioInProgress = false; - if ( this->threadsBlockingOnNotifyCompletion ) { - this->notifyCompletionEvent.signal (); - } } void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext ) { + epicsAutoMutex autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; } - assert ( ! this->ioInProgress ); - this->ioInProgress = true; - this->ioNotifyInProgressId = id; pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu ); + + // + // The IO destroy routines take the call back mutex + // when uninstalling and deleting the baseNMIU so there is + // no need to worry here about the baseNMIU being deleted while + // it is in use here. + // { - epicsAutoMutexRelease autoRelease ( this->mutex ); + epicsAutoMutexRelease autoMutexRelease ( this->mutex ); pmiu->exception ( status, pContext ); } + pmiu->destroy ( *this ); - // threads blocked canceling this IO will wait - // until we stop processing it - this->ioInProgress = false; - if ( this->threadsBlockingOnNotifyCompletion ) { - this->notifyCompletionEvent.signal (); - } } void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext, unsigned type, arrayElementCount count ) { + epicsAutoMutex autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; } - assert ( ! this->ioInProgress ); - this->ioInProgress = true; - this->ioNotifyInProgressId = id; pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu ); + + // + // The IO destroy routines take the call back mutex + // when uninstalling and deleting the baseNMIU so there is + // no need to worry here about the baseNMIU being deleted while + // it is in use here. + // + { - epicsAutoMutexRelease autoRelease ( this->mutex ); + epicsAutoMutexRelease autoMutexRelease ( this->mutex ); pmiu->exception ( status, pContext, type, count ); } + pmiu->destroy ( *this ); - // threads blocked canceling this IO will wait - // until we stop processing it - this->ioInProgress = false; - if ( this->threadsBlockingOnNotifyCompletion ) { - this->notifyCompletionEvent.signal (); - } } // resubscribe for monitors from this channel -void cac::connectAllIO ( nciu &chan ) +// (lock must be applied) +void cac::connectAllIO ( nciu & chan ) { - bool signalNeeded = false; - { - tsDLList < baseNMIU > tmpList; - epicsAutoMutex autoMutex ( this->mutex ); - tsDLIterBD < baseNMIU > pNetIO = - chan.cacPrivateListOfIO::eventq.firstIter (); - while ( pNetIO.valid () ) { - tsDLIterBD < baseNMIU > next = pNetIO; - next++; - class netSubscription *pSubscr = pNetIO->isSubscription (); - if ( pSubscr ) { - try { - chan.getPIIU()->subscriptionRequest ( chan, *pSubscr ); - } - catch ( ... ) { - this->printf ( "cac: invalid subscription request ignored\n" ); - } - } - else { - // it shouldnt be here at this point - so uninstall it - this->ioTable.remove ( *pNetIO ); - chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); - tmpList.add ( *pNetIO ); - } - pNetIO = next; + tsDLIterBD < baseNMIU > pNetIO = + chan.cacPrivateListOfIO::eventq.firstIter (); + while ( pNetIO.valid () ) { + tsDLIterBD < baseNMIU > next = pNetIO; + next++; + class netSubscription *pSubscr = pNetIO->isSubscription (); + // disconnected channels should have only subscription IO attached + assert ( pSubscr ); + try { + chan.getPIIU()->subscriptionRequest ( chan, *pSubscr ); } - chan.getPIIU()->requestRecvProcessPostponedFlush (); - while ( baseNMIU *pIO = tmpList.get () ) { - signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() ); - pIO->destroy ( *this ); + catch ( ... ) { + this->printf ( "cac: invalid subscription request ignored\n" ); } + pNetIO = next; } - if ( signalNeeded ) { - this->notifyCompletionEvent.signal (); - } + chan.getPIIU()->requestRecvProcessPostponedFlush (); } // cancel IO operations and monitor subscriptions -void cac::disconnectAllIO ( nciu &chan ) +// (lock must be applied here) +void cac::disconnectAllIO ( nciu & chan, bool enableCallbacks ) { - bool signalNeeded = false; - { - epicsAutoMutex autoMutex ( this->mutex ); - tsDLList < baseNMIU > tmpList; - tsDLIterBD < baseNMIU > pNetIO = - chan.cacPrivateListOfIO::eventq.firstIter (); - while ( pNetIO.valid () ) { - tsDLIterBD < baseNMIU > next = pNetIO; - next++; - if ( ! pNetIO->isSubscription () ) { - // no use after disconnected - so uninstall it - this->ioTable.remove ( *pNetIO ); - chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); - tmpList.add ( *pNetIO ); - } - pNetIO = next; - } - while ( baseNMIU *pIO = tmpList.get () ) { - char buf[128]; - sprintf ( buf, "host = %100s", chan.pHostName() ); - { + while ( baseNMIU *pNetIO = chan.cacPrivateListOfIO::eventq.get() ) { + if ( ! pNetIO->isSubscription () ) { + // no use after disconnected - so uninstall it + this->ioTable.remove ( *pNetIO ); + chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); + if ( enableCallbacks ) { epicsAutoMutexRelease unlocker ( this->mutex ); - pIO->exception ( ECA_DISCONN, buf ); + char buf[128]; + sprintf ( buf, "host = %100s", chan.pHostName() ); + // callbacks are locked at a higher level + pNetIO->exception ( ECA_DISCONN, buf ); } - signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() ); - pIO->destroy ( *this ); + pNetIO->destroy ( *this ); } } - if ( signalNeeded ) { - this->notifyCompletionEvent.signal (); - } } -void cac::destroyAllIO ( nciu &chan ) +// this gets called when the user destroys a channel +void cac::destroyAllIO ( nciu & chan ) { - bool signalNeeded = false; - { - epicsAutoMutex autoMutex ( this->mutex ); - while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) { - this->ioTable.remove ( *pIO ); - this->flushIfRequired ( chan ); - class netSubscription *pSubscr = pIO->isSubscription (); - if ( pSubscr ) { - chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); - } - signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() ); - pIO->destroy ( *this ); + if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) && + this->enablePreemptiveCallback ) { + // force any callbacks in progress to complete + // before deleteing the IO + epicsAutoMutex autoMutex ( this->callbackMutex ); + this->privateDestroyAllIO ( chan ); + } + else { + this->privateDestroyAllIO ( chan ); + } +} + +void cac::privateDestroyAllIO ( nciu & chan ) +{ + epicsAutoMutex autoMutex ( this->mutex ); + this->flushIfRequired ( *chan.getPIIU() ); + while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) { + this->ioTable.remove ( *pIO ); + class netSubscription *pSubscr = pIO->isSubscription (); + if ( pSubscr ) { + chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); } - } - if ( signalNeeded ) { - this->notifyCompletionEvent.signal (); - } + pIO->destroy ( *this ); + } } void cac::recycleReadNotifyIO ( netReadNotifyIO &io ) @@ -1335,7 +1220,7 @@ cacChannel::ioid cac::subscriptionRequest ( nciu &chan, unsigned type, this->ioTable.add ( *pIO ); chan.cacPrivateListOfIO::eventq.add ( *pIO ); if ( chan.connected () ) { - this->flushIfRequired ( chan ); + this->flushIfRequired ( *chan.getPIIU() ); chan.getPIIU()->subscriptionRequest ( chan, *pIO ); } cacChannel::ioid id = pIO->getId (); @@ -1566,45 +1451,79 @@ bool cac::exceptionRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *p bool cac::accessRightsRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ ) { - nciu * pChan = this->chanTable.lookup ( hdr.m_cid ); - if ( pChan ) { - unsigned ar = hdr.m_available; - caAccessRights accessRights ( - ( ar & CA_PROTO_ACCESS_RIGHT_READ ) ? true : false, - ( ar & CA_PROTO_ACCESS_RIGHT_WRITE ) ? true : false); - pChan->accessRightsStateChange ( accessRights ); + nciu * pChan; + { + epicsAutoMutex autoMutex ( this->mutex ); + pChan = this->chanTable.lookup ( hdr.m_cid ); + if ( pChan ) { + unsigned ar = hdr.m_available; + caAccessRights accessRights ( + ( ar & CA_PROTO_ACCESS_RIGHT_READ ) ? true : false, + ( ar & CA_PROTO_ACCESS_RIGHT_WRITE ) ? true : false); + pChan->accessRightsStateChange ( accessRights ); + } } + + // + // the channel delete routine takes the call back lock so + // that this will not be called when the channel is being + // deleted. + // + if ( pChan ) { + pChan->accessRightsNotify (); + } + return true; } bool cac::claimCIURespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void * /*pMsgBdy */ ) { - nciu * pChan = this->chanTable.lookup ( hdr.m_cid ); + nciu * pChan; + + { + epicsAutoMutex autoMutex ( this->mutex ); + pChan = this->chanTable.lookup ( hdr.m_cid ); + if ( pChan ) { + unsigned sidTmp; + if ( iiu.ca_v44_ok() ) { + sidTmp = hdr.m_available; + } + else { + sidTmp = pChan->getSID (); + } + pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp, iiu.ca_v41_ok() ); + this->connectAllIO ( *pChan ); + } + } + // the callback lock is taken when a channel is unistalled or when + // is disconnected to prevent race conditions here if ( pChan ) { - unsigned sidTmp; - if ( iiu.ca_v44_ok() ) { - sidTmp = hdr.m_available; - } - else { - sidTmp = pChan->getSID (); - } - pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp, iiu.ca_v41_ok() ); - return true; - } - else { - return true; // ignore claim response to deleted channel + pChan->connectStateNotify (); } + return true; } -bool cac::verifyAndDisconnectChan ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ ) +bool cac::verifyAndDisconnectChan ( tcpiiu & iiu, + const caHdrLargeArray & hdr, void * /* pMsgBdy */ ) { - nciu * pChan = this->chanTable.lookup ( hdr.m_cid ); - if ( pChan ) { - assert ( this->pudpiiu && this->pSearchTmr ); - pChan->disconnect ( *this->pudpiiu ); + nciu * pChan; + + { + epicsAutoMutex autoMutex ( this->mutex ); + pChan = this->chanTable.lookup ( hdr.m_cid ); + if ( ! pChan ) { + return true; + } + this->disconnectAllIO ( *pChan, true ); this->pSearchTmr->resetPeriod ( 0.0 ); + pChan->disconnect ( *this->pudpiiu ); + this->pudpiiu->attachChannel ( *pChan ); } + + pChan->connectStateNotify (); + pChan->accessRightsNotify (); + return true; } @@ -1749,7 +1668,7 @@ void cac::decrementOutstandingIO ( unsigned sequenceNo ) } } -void cac::selfTest () +void cac::selfTest () const { this->chanTable.verify (); this->ioTable.verify (); @@ -1757,3 +1676,77 @@ void cac::selfTest () this->beaconTable.verify (); } +void cac::notifyNewFD ( SOCKET sock ) const +{ + if ( ! this->enablePreemptiveCallback ) { + this->notify.fdWasCreated ( sock ); + } +} + +void cac::notifyDestroyFD ( SOCKET sock ) const +{ + if ( ! this->enablePreemptiveCallback ) { + this->notify.fdWasDestroyed ( sock ); + } +} + +void cac::uninstallIIU ( tcpiiu & iiu ) +{ + epicsAutoMutex autoMutex ( this->mutex ); + if ( iiu.channelCount () ) { + char hostNameTmp[64]; + iiu.hostName ( hostNameTmp, sizeof ( hostNameTmp ) ); + genLocalExcep ( *this, ECA_DISCONN, hostNameTmp ); + } + iiu.getBHE().unbindFromIIU (); + assert ( this->pudpiiu ); + tsDLList < nciu > tmpList; + iiu.uninstallAllChan ( tmpList ); + while ( nciu *pChan = tmpList.get () ) { + this->disconnectAllIO ( *pChan, true ); + pChan->disconnect ( *this->pudpiiu ); + this->pudpiiu->attachChannel ( *pChan ); + { + epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + pChan->connectStateNotify (); + pChan->accessRightsNotify (); + } + } + this->iiuList.remove ( iiu ); + this->pSearchTmr->resetPeriod ( 0.0 ); + // signal iiu uninstal event so that cac can properly shut down + this->iiuUninstal.signal(); +} + +void cac::preemptiveCallbackLock() +{ + if ( ! this->enablePreemptiveCallback ) { + epicsAutoMutex autoMutex ( this->mutex ); + assert ( this->recvThreadsPendingCount < UINT_MAX ); + this->recvThreadsPendingCount++; + } + this->callbackMutex.lock (); +} + +void cac::preemptiveCallbackUnlock() +{ + this->callbackMutex.unlock (); + if ( ! this->enablePreemptiveCallback ) { + bool signalRequired; + { + epicsAutoMutex autoMutex ( this->mutex ); + assert ( this->recvThreadsPendingCount > 0 ); + this->recvThreadsPendingCount--; + if ( this->recvThreadsPendingCount == 0u ) { + signalRequired = true; + } + else { + signalRequired = false; + } + } + if ( signalRequired ) { + this->noRecvThreadsPending.signal (); + } + } +} + diff --git a/src/ca/cac.h b/src/ca/cac.h index 4b7893218..4157c4296 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -48,7 +48,9 @@ struct CASG; class inetAddrID; struct caHdrLargeArray; -class cac : private cacRecycle, private epicsThreadRunable +extern epicsThreadPrivateId caClientCallbackThreadId; + +class cac : private cacRecycle { public: cac ( cacNotify &, bool enablePreemptiveCallback = false ); @@ -59,9 +61,6 @@ public: const epicsTime ¤tTime ); void repeaterSubscribeConfirmNotify (); - // IIU routines - void signalRecvActivity (); - // outstanding IO count management routines void incrementOutstandingIO (); void decrementOutstandingIO (); @@ -73,15 +72,12 @@ public: void flushRequest (); int pendIO ( const double &timeout ); int pendEvent ( const double &timeout ); - void connectAllIO ( nciu &chan ); - void disconnectAllIO ( nciu &chan ); void destroyAllIO ( nciu &chan ); bool executeResponse ( tcpiiu &, caHdrLargeArray &, char *pMsgBody ); void ioCancel ( nciu &chan, const cacChannel::ioid &id ); void ioShow ( const cacChannel::ioid &id, unsigned level ) const; // channel routines - bool connectChannel ( unsigned id ); void installNetworkChannel ( nciu &, netiiu *&piiu ); bool lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, ca_uint16_t typeCode, arrayElementCount count, unsigned minorVersionNumber, @@ -110,7 +106,7 @@ public: const char *pFileName, unsigned lineNo ); // callback preemption control - void blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ); + int blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ); // diagnostics unsigned connectionCount () const; @@ -123,17 +119,22 @@ public: void vSignal ( int ca_status, const char *pfilenm, int lineno, const char *pFormat, va_list args ); - // misc - const char * userNamePointer () const; - unsigned getInitializingThreadsPriority () const; - epicsMutex & mutexRef (); - void attachToClientCtx (); + // buffer management char * allocateSmallBufferTCP (); void releaseSmallBufferTCP ( char * ); unsigned largeBufferSizeTCP () const; char * allocateLargeBufferTCP (); void releaseLargeBufferTCP ( char * ); - void selfTest (); + + // misc + const char * userNamePointer () const; + unsigned getInitializingThreadsPriority () const; + epicsMutex & mutexRef (); + void attachToClientCtx (); + void selfTest () const; + void notifyNewFD ( SOCKET ) const; + void notifyDestroyFD ( SOCKET ) const; + void uninstallIIU ( tcpiiu &iiu ); private: ipAddrToAsciiEngine ipToAEngine; @@ -159,11 +160,12 @@ private: epicsTime programBeginTime; double connTMO; mutable epicsMutex mutex; - epicsMutex preemptiveCallbackLock; - epicsEvent notifyCompletionEvent; - epicsEvent recvProcessActivityEvent; - epicsEvent recvProcessThreadExit; + epicsMutex callbackMutex; + epicsMutex serializePendIO; + epicsMutex serializePendEvent; epicsEvent ioDone; + epicsEvent noRecvThreadsPending; + epicsEvent iiuUninstal; epicsTimerQueueActive *pTimerQueue; char *pUserName; class udpiiu *pudpiiu; @@ -172,32 +174,28 @@ private: *pRepeaterSubscribeTmr; void *tcpSmallRecvBufFreeList; void *tcpLargeRecvBufFreeList; - epicsThread *pRecvProcessThread; - epicsThreadPrivateId isRecvProcessId; cacNotify ¬ify; - unsigned ioNotifyInProgressId; unsigned initializingThreadsPriority; - unsigned threadsBlockingOnNotifyCompletion; unsigned maxRecvBytesTCP; - unsigned recvProcessEnableRefCount; unsigned pndRecvCnt; unsigned readSeq; + unsigned recvThreadsPendingCount; bool enablePreemptiveCallback; - bool ioInProgress; - bool recvProcessThreadExitRequest; - void processRecvBacklog (); void flushRequestPrivate (); void run (); bool setupUDP (); - void enableCallbackPreemption (); - void disableCallbackPreemption (); - void flushIfRequired ( nciu & ); // lock must be applied + void connectAllIO ( nciu &chan ); + void disconnectAllIO ( nciu &chan, bool enableCallbacks ); + void privateDestroyAllIO ( nciu & chan ); + void ioCancelPrivate ( nciu &chan, const cacChannel::ioid &id ); + void flushIfRequired ( netiiu & ); void recycleReadNotifyIO ( netReadNotifyIO &io ); void recycleWriteNotifyIO ( netWriteNotifyIO &io ); void recycleSubscription ( netSubscription &io ); - bool recvProcessThreadIsCurrentThread () const; - void startRecvProcessThread (); + void preemptiveCallbackLock (); + void preemptiveCallbackUnlock (); + void ioCompletionNotify ( unsigned id, unsigned type, arrayElementCount count, const void *pData ); void ioExceptionNotify ( unsigned id, @@ -212,7 +210,6 @@ private: int status, const char *pContext ); void ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext, unsigned type, arrayElementCount count ); - bool blockForIOCallbackCompletion ( const cacChannel::ioid & id ); // recv protocol stubs bool noopAction ( tcpiiu &, const caHdrLargeArray &, void *pMsgBdy ); @@ -249,6 +246,22 @@ private: tcpiiu &iiu, const caHdrLargeArray &hdr, const char *pCtx, unsigned status ); static const pExcepProtoStubTCP tcpExcepJumpTableCAC []; + + friend class callbackAutoMutex; +}; + +class callbackAutoMutex { +public: + callbackAutoMutex ( cac & ctxIn ) : ctx ( ctxIn ) + { + this->ctx.preemptiveCallbackLock (); + } + ~callbackAutoMutex () + { + this->ctx.preemptiveCallbackUnlock (); + } +private: + cac & ctx; }; inline const char * cac::userNamePointer () const @@ -321,25 +334,10 @@ inline void cac::releaseLargeBufferTCP ( char *pBuf ) freeListFree ( this->tcpLargeRecvBufFreeList, pBuf ); } -inline bool cac::recvProcessThreadIsCurrentThread () const -{ - if ( this->pRecvProcessThread ) { - return this->pRecvProcessThread->isCurrentThread(); - } - else { - return false; - } -} - inline bool cac::ioComplete () const { return ( this->pndRecvCnt == 0u ); } -inline void cac::signalRecvActivity () -{ - this->recvProcessActivityEvent.signal (); -} - #endif // ifdef cach diff --git a/src/ca/comBuf.h b/src/ca/comBuf.h index f816de65a..15866c3e2 100644 --- a/src/ca/comBuf.h +++ b/src/ca/comBuf.h @@ -49,6 +49,7 @@ public: unsigned unoccupiedBytes () const; unsigned occupiedBytes () const; static unsigned capacityBytes (); + void clear (); unsigned copyInBytes ( const void *pBuf, unsigned nBytes ); unsigned copyIn ( comBuf & ); unsigned copyIn ( const epicsInt8 *pValue, unsigned nElem ); @@ -105,6 +106,12 @@ inline void comBuf::destroy () delete this; } +inline void comBuf::clear () +{ + this->nextWriteIndex = 0u; + this->nextReadIndex = 0u; +} + inline void * comBuf::operator new ( size_t size, const std::nothrow_t & ) { epicsAutoMutex locker ( comBuf::freeListMutex ); diff --git a/src/ca/msgForMultiplyDefinedPV.cpp b/src/ca/msgForMultiplyDefinedPV.cpp index 844e0b10f..d7938f0d3 100644 --- a/src/ca/msgForMultiplyDefinedPV.cpp +++ b/src/ca/msgForMultiplyDefinedPV.cpp @@ -44,5 +44,6 @@ void msgForMultiplyDefinedPV::ioCompletionNotify ( const char *pHostNameRej ) char buf[256]; sprintf ( buf, "Channel: \"%.64s\", Connecting to: %.64s, Ignored: %.64s", this->channel, this->acc, pHostNameRej ); + callbackAutoMutex autoMutex ( this->cacRef ); genLocalExcep ( this->cacRef, ECA_DBLCHNL, buf ); } diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index 9c532f149..b2f9c15a3 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -118,37 +118,17 @@ void nciu::connect ( unsigned nativeType, /* * if less than v4.1 then the server will never - * send access rights and we know that there - * will always be access + * send access rights and there will always be access */ if ( ! v41Ok ) { this->accessRightState.setReadPermit(); this->accessRightState.setWritePermit(); } - - // resubscribe for monitors from this channel - this->cacCtx.connectAllIO ( *this ); - - this->notify().connectNotify (); - - /* - * if less than v4.1 then the server will never - * send access rights and we know that there - * will always be access and also need to call - * their call back here - */ - if ( ! v41Ok ) { - this->notify().accessRightsNotify ( this->accessRightState ); - } } -void nciu::disconnect ( netiiu &newiiu ) +void nciu::disconnect ( netiiu & newiiu ) { - bool wasConnected; - - this->piiu->disconnectAllIO ( *this ); - - this->piiu = &newiiu; + this->piiu = & newiiu; this->retry = 0u; this->typeCode = USHRT_MAX; this->count = 0u; @@ -156,24 +136,7 @@ void nciu::disconnect ( netiiu &newiiu ) this->accessRightState.clrReadPermit(); this->accessRightState.clrWritePermit(); this->f_claimSent = false; - - if ( this->f_connected ) { - wasConnected = true; - } - else { - wasConnected = false; - } this->f_connected = false; - - if ( wasConnected ) { - /* - * look for events that have an event cancel in progress - */ - this->notify().disconnectNotify (); - this->notify().accessRightsNotify ( this->accessRightState ); - } - - this->resetRetryCount (); } /* diff --git a/src/ca/nciu.h b/src/ca/nciu.h index dd6b80d80..2ef98a056 100644 --- a/src/ca/nciu.h +++ b/src/ca/nciu.h @@ -47,6 +47,8 @@ public: void connect ( unsigned nativeType, unsigned nativeCount, unsigned sid, bool v41Ok ); void connect (); + void connectStateNotify () const; + void accessRightsNotify () const; void disconnect ( netiiu &newiiu ); bool searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel ); @@ -77,9 +79,9 @@ protected: ~nciu (); // force pool allocation private: caAccessRights accessRightState; - cac &cacCtx; - char *pNameStr; - netiiu *piiu; + cac & cacCtx; + char * pNameStr; + netiiu * piiu; ca_uint32_t sid; // server id unsigned count; unsigned retry; // search retry number @@ -139,7 +141,6 @@ inline void nciu::resetRetryCount () inline void nciu::accessRightsStateChange ( const caAccessRights &arIn ) { this->accessRightState = arIn; - this->notify().accessRightsNotify ( arIn ); } inline ca_uint32_t nciu::getSID () const @@ -203,4 +204,19 @@ inline void nciu::writeException ( int status, this->notify().writeException ( status, pContext, typeIn, countIn ); } +inline void nciu::connectStateNotify () const +{ + if ( this->f_connected ) { + this->notify().connectNotify (); + } + else { + this->notify().disconnectNotify (); + } +} + +inline void nciu::accessRightsNotify () const +{ + this->notify().accessRightsNotify ( this->accessRightState ); +} + #endif // ifdef nciuh diff --git a/src/ca/netIO.h b/src/ca/netIO.h index 5dc7e404d..d5fdd0582 100644 --- a/src/ca/netIO.h +++ b/src/ca/netIO.h @@ -43,7 +43,7 @@ protected: // perhaps we should not store the channel here and instead fetch it out of the // notify // - nciu &chan; + nciu & chan; }; class netSubscription : public baseNMIU { diff --git a/src/ca/netiiu.cpp b/src/ca/netiiu.cpp index a7fa42e82..0bbd945ae 100644 --- a/src/ca/netiiu.cpp +++ b/src/ca/netiiu.cpp @@ -45,17 +45,11 @@ void netiiu::show ( unsigned level ) const } // cac lock must also be applied when calling this -void netiiu::disconnectAllChan ( netiiu & newiiu ) +void netiiu::uninstallAllChan ( tsDLList < nciu > & dstList ) { - tsDLIterBD < nciu > chan = this->channelList.firstIter (); - while ( chan.valid () ) { - tsDLIterBD < nciu > next = chan; - next++; - this->clearChannelRequest ( *chan ); - this->channelList.remove ( *chan ); - chan->disconnect ( newiiu ); - newiiu.channelList.add ( *chan ); - chan = next; + while ( nciu *pChan = this->channelList.get () ) { + this->clearChannelRequest ( *pChan ); + dstList.add ( *pChan ); } } @@ -160,14 +154,6 @@ const char * netiiu::pHostName () const return ""; } -void netiiu::disconnectAllIO ( nciu & ) -{ -} - -void netiiu::connectAllIO ( nciu & ) -{ -} - double netiiu::beaconPeriod () const { return ( - DBL_MAX ); @@ -186,7 +172,7 @@ void netiiu::flushRequestIfAboveEarlyThreshold () { } -void netiiu::blockUntilSendBacklogIsReasonable ( epicsMutex & ) +void netiiu::blockUntilSendBacklogIsReasonable ( epicsMutex *, epicsMutex & ) { } diff --git a/src/ca/netiiu.h b/src/ca/netiiu.h index f81411931..1c7afc9ee 100644 --- a/src/ca/netiiu.h +++ b/src/ca/netiiu.h @@ -35,7 +35,7 @@ public: virtual ~netiiu (); void show ( unsigned level ) const; unsigned channelCount () const; - void disconnectAllChan ( netiiu & newiiu ); + void uninstallAllChan ( tsDLList < nciu > & dstList ); void connectTimeoutNotify (); bool searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel ); void resetChannelRetryCounts (); @@ -51,8 +51,6 @@ public: virtual void writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned type, unsigned nElem, const void *pValue ); virtual void readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned type, unsigned nElem ); virtual void createChannelRequest ( nciu & ); - virtual void connectAllIO ( nciu &chan ); - virtual void disconnectAllIO ( nciu &chan ); virtual void clearChannelRequest ( nciu & ); virtual void subscriptionRequest ( nciu &, netSubscription &subscr ); virtual void subscriptionCancelRequest ( nciu &, netSubscription &subscr ); @@ -60,7 +58,7 @@ public: virtual void flushRequest (); virtual bool flushBlockThreshold () const; virtual void flushRequestIfAboveEarlyThreshold (); - virtual void blockUntilSendBacklogIsReasonable ( epicsMutex & ); + virtual void blockUntilSendBacklogIsReasonable ( epicsMutex *, epicsMutex & ); virtual void requestRecvProcessPostponedFlush (); protected: cac * pCAC () const; diff --git a/src/ca/oldAccess.h b/src/ca/oldAccess.h index 2c0ff46c3..b0b48075f 100644 --- a/src/ca/oldAccess.h +++ b/src/ca/oldAccess.h @@ -212,7 +212,7 @@ public: CASG * lookupCASG ( unsigned id ); void installCASG ( CASG & ); void uninstallCASG ( CASG & ); - void blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ); + int blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ); void selfTest (); // perhaps these should be eliminated in deference to the exception mechanism int printf ( const char *pformat, ... ) const; @@ -492,10 +492,10 @@ inline void oldCAC::uninstallCASG ( CASG &sg ) this->clientCtx.uninstallCASG ( sg ); } -inline void oldCAC::blockForEventAndEnableCallbacks ( +inline int oldCAC::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ) { - this->clientCtx.blockForEventAndEnableCallbacks ( event, timeout ); + return this->clientCtx.blockForEventAndEnableCallbacks ( event, timeout ); } inline void oldCAC::vSignal ( int ca_status, const char *pfilenm, diff --git a/src/ca/oldCAC.cpp b/src/ca/oldCAC.cpp index 35ab04b45..b936271f7 100644 --- a/src/ca/oldCAC.cpp +++ b/src/ca/oldCAC.cpp @@ -191,16 +191,6 @@ void oldCAC::show ( unsigned level ) const { ::printf ( "oldCAC at %p\n", static_cast ( this ) ); -#if 0 // gnu compiler does not like casting func ptr to void ptr - ::printf ( "exception func at %p arg at %p\n", - static_cast ( this->ca_exception_func ), - static_cast ( this->ca_exception_arg ) ); - ::printf ( "printf func at %p\n", - static_cast ( this->pVPrintfFunc ) ); - ::printf ( "fd registration func at %p arg at %p\n", - static_cast ( this->fdRegFunc ), - static_cast ( this->fdRegArg ) ); -#endif if ( level > 0u ) { this->mutex.show ( level - 1u ); this->clientCtx.show ( level - 1u ); diff --git a/src/ca/syncGroup.h b/src/ca/syncGroup.h index 7ab756687..70cb73df1 100644 --- a/src/ca/syncGroup.h +++ b/src/ca/syncGroup.h @@ -130,6 +130,7 @@ protected: private: tsDLList < syncGroupNotify > ioList; epicsMutex mutable mutex; + epicsMutex serializeBlock; epicsEvent sem; oldCAC & client; unsigned magic; diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index f95bf01d3..ce30a7438 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -89,7 +89,7 @@ extern "C" void cacSendThreadTCP ( void *pParam ) } } catch ( ... ) { - piiu->printf ("cac: tcp send thread received an exception - diconnecting\n"); + piiu->printf ("cac: tcp send thread received an exception - disconnecting\n"); piiu->forcedShutdown (); } @@ -158,8 +158,9 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) } assert ( nBytesInBuf <= INT_MAX ); + int status = ::recv ( this->sock, static_cast ( pBuf ), - static_cast ( nBytesInBuf ), 0); + static_cast ( nBytesInBuf ), this->recvFlag ); if ( status <= 0 ) { int localErrno = SOCKERRNO; @@ -207,102 +208,107 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) { tcpiiu *piiu = ( tcpiiu * ) pParam; + piiu->pCAC()->attachToClientCtx (); + + epicsThreadPrivateSet ( caClientCallbackThreadId, piiu ); + piiu->connect (); { - epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() ); - if ( piiu->state == iiu_connected ) { - unsigned priorityOfSend; - epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( - piiu->pCAC ()->getInitializingThreadsPriority (), &priorityOfSend ); - if ( tbs != epicsThreadBooleanStatusSuccess ) { - priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority (); - } - epicsThreadId tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend, - epicsThreadGetStackSize ( epicsThreadStackMedium ), cacSendThreadTCP, piiu ); - if ( ! tid ) { - piiu->recvThreadExitEvent.signal (); - piiu->sendThreadExitEvent.signal (); - piiu->cleanShutdown (); - return; - } - } - else { - piiu->recvThreadExitEvent.signal (); - piiu->sendThreadExitEvent.signal (); - piiu->cleanShutdown (); - return; - } + callbackAutoMutex autoMutex ( *piiu->pCAC() ); + piiu->pCAC()->notifyNewFD ( piiu->sock ); } - unsigned nBytes = 0u; - while ( piiu->state == iiu_connected ) { - if ( nBytes >= maxBytesPendingTCP ) { - piiu->recvThreadRingBufferSpaceAvailableEvent.wait (); - epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() ); - nBytes = piiu->recvQue.occupiedBytes (); + if ( piiu->state == iiu_connected ) { + unsigned priorityOfSend; + epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( + piiu->pCAC()->getInitializingThreadsPriority (), &priorityOfSend ); + if ( tbs != epicsThreadBooleanStatusSuccess ) { + priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority (); } - else { - comBuf * pComBuf = new ( std::nothrow ) comBuf; - if ( pComBuf ) { - unsigned nBytesIn = pComBuf->fillFromWire ( *piiu ); - if ( nBytesIn ) { - bool msgHeaderButNoBody; - { - epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() ); - nBytes = piiu->recvQue.occupiedBytes (); - msgHeaderButNoBody = piiu->oldMsgHeaderAvailable; - piiu->recvQue.pushLastComBufReceived ( *pComBuf ); - if ( nBytesIn == pComBuf->capacityBytes () ) { - if ( piiu->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) { - piiu->busyStateDetected = true; - } - else { - piiu->contigRecvMsgCount++; - } - } - else { - piiu->contigRecvMsgCount = 0u; - piiu->busyStateDetected = false; - } - piiu->unacknowledgedSendBytes = 0u; - } - // reschedule connection activity watchdog - // but dont hold the lock for fear of deadlocking - // because cancel is blocking for the completion - // of the recvDog expire which takes the lock - piiu->recvDog.messageArrivalNotify (); + epicsThreadId tid = epicsThreadCreate ( "CAC-TCP-send", priorityOfSend, + epicsThreadGetStackSize ( epicsThreadStackMedium ), cacSendThreadTCP, piiu ); + if ( ! tid ) { + piiu->sendThreadExitEvent.signal (); + piiu->cleanShutdown (); + } + } + else { + piiu->sendThreadExitEvent.signal (); + piiu->cleanShutdown (); + } - // wake up recv thread only if - // 1) there are currently no bytes in the queue - // 2) if the recv thread is currently blocking for an incomplete msg - if ( nBytes < sizeof ( caHdr ) || msgHeaderButNoBody ) { - piiu->pCAC()->signalRecvActivity (); - } + while ( piiu->state == iiu_connected ) { + comBuf * pComBuf = new ( std::nothrow ) comBuf; + if ( pComBuf ) { + if ( piiu->preemptiveCallbackEnable ) { + piiu->recvFlag = 0; + } + else { + piiu->recvFlag = MSG_PEEK; + } + unsigned nBytesIn = pComBuf->fillFromWire ( *piiu ); - if ( nBytes <= UINT_MAX - nBytesIn ) { - nBytes += nBytesIn; + // only one recv thread at a time may call callbacks + callbackAutoMutex autoMutex ( *piiu->pCAC() ); + + // + // We leave the bytes pending and fetch them again after + // callbacks are enabled when running in the old preemptive + // call back disabled mode so that asynchronous wakeup via + // file manager call backs works correctly. Oddly enough, + // this does not appear to impact performance. + // + if ( ! piiu->preemptiveCallbackEnable ) { + pComBuf->clear (); + piiu->recvFlag = 0; + nBytesIn = pComBuf->fillFromWire ( *piiu ); + } + + if ( nBytesIn ) { + piiu->recvQue.pushLastComBufReceived ( *pComBuf ); + if ( nBytesIn == pComBuf->capacityBytes () ) { + if ( piiu->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) { + piiu->busyStateDetected = true; } - else { - nBytes = UINT_MAX; + else { + piiu->contigRecvMsgCount++; } } else { - pComBuf->destroy (); - epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() ); - nBytes = piiu->recvQue.occupiedBytes (); - } + piiu->contigRecvMsgCount = 0u; + piiu->busyStateDetected = false; + } + piiu->unacknowledgedSendBytes = 0u; + + // reschedule connection activity watchdog + // but dont hold the lock for fear of deadlocking + // because cancel is blocking for the completion + // of the recvDog expire which takes the lock + piiu->recvDog.messageArrivalNotify (); + + // + // execute receive labor + // + piiu->processIncoming (); } else { - // no way to be informed when memory is available - epicsThreadSleep ( 0.5 ); - epicsAutoMutex autoMutex ( piiu->pCAC()->mutexRef() ); - nBytes = piiu->recvQue.occupiedBytes (); + pComBuf->destroy (); } } + else { + // no way to be informed when memory is available + epicsThreadSleep ( 0.5 ); + } } - piiu->recvThreadExitEvent.signal (); + { + callbackAutoMutex autoMutex ( *piiu->pCAC() ); + piiu->pCAC()->uninstallIIU ( *piiu ); + piiu->pCAC()->notifyDestroyFD ( piiu->sock ); + } + + piiu->destroy (); } // @@ -311,7 +317,8 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, epicsTimerQueue &timerQueue, const osiSockAddr &addrIn, unsigned minorVersion, class bhe &bheIn, - ipAddrToAsciiEngine &engineIn ) : + ipAddrToAsciiEngine &engineIn, + bool preemptiveCallbackEnableIn ) : netiiu ( &cac ), recvDog ( *this, connectionTimeout, timerQueue ), sendDog ( *this, connectionTimeout, timerQueue ), @@ -329,15 +336,16 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, blockingForFlush ( 0u ), socketLibrarySendBufferSize ( 0u ), unacknowledgedSendBytes ( 0u ), + recvFlag ( 0 ), busyStateDetected ( false ), flowControlActive ( false ), echoRequestPending ( false ), oldMsgHeaderAvailable ( false ), msgHeaderAvailable ( false ), sockCloseCompleted ( false ), - f_trueOnceOnly ( true ), earlyFlush ( false ), - recvProcessPostponedFlush ( false ) + recvProcessPostponedFlush ( false ), + preemptiveCallbackEnable ( preemptiveCallbackEnableIn ) { if ( ! this->pCurData ) { throw std::bad_alloc (); @@ -378,7 +386,6 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, this->userNameSetRequest (); this->hostNameSetRequest (); - # if 0 { int i; @@ -424,7 +431,7 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, unsigned priorityOfRecv; epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove ( - this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv ); + this->pCAC()->getInitializingThreadsPriority (), &priorityOfRecv ); if ( tbs != epicsThreadBooleanStatusSuccess ) { priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority (); } @@ -524,7 +531,7 @@ void tcpiiu::cleanShutdown () else if ( this->state == iiu_connecting ) { int status = socket_close ( this->sock ); if ( status ) { - errlogPrintf ("CAC TCP socket close error was %s\n", + errlogPrintf ( "CAC TCP socket close error was %s\n", SOCKERRSTR (SOCKERRNO) ); } else { @@ -533,8 +540,6 @@ void tcpiiu::cleanShutdown () } } this->sendThreadFlushEvent.signal (); - this->recvThreadRingBufferSpaceAvailableEvent.signal (); - this->pCAC ()->signalRecvActivity (); } /* @@ -569,8 +574,6 @@ void tcpiiu::forcedShutdown () } this->sendThreadFlushEvent.signal (); - this->recvThreadRingBufferSpaceAvailableEvent.signal (); - this->pCAC()->signalRecvActivity (); } // @@ -605,27 +608,6 @@ tcpiiu::~tcpiiu () } } - // wait for recv thread to exit - while ( true ) { - bool signaled = this->recvThreadExitEvent.wait ( shutdownDelay ); - if ( signaled ) { - break; - } - if ( ! this->sockCloseCompleted ) { - printf ( "Gave up waiting for \"shutdown()\" to force receive thread to exit after %f sec\n", - shutdownDelay); - printf ( "Closing socket\n" ); - int status = socket_close ( this->sock ); - if ( status ) { - errlogPrintf ("CAC TCP socket close error was %s\n", - SOCKERRSTR ( SOCKERRNO ) ); - } - else { - this->sockCloseCompleted = true; - } - } - } - if ( ! this->sockCloseCompleted ) { int status = socket_close ( this->sock ); if ( status ) { @@ -722,12 +704,8 @@ void tcpiiu::show ( unsigned level ) const ::printf ( "\tvirtual circuit socket identifier %d\n", this->sock ); ::printf ( "\tsend thread flush signal:\n" ); this->sendThreadFlushEvent.show ( level-3u ); - ::printf ( "\trecv thread buffer space available signal:\n" ); - this->recvThreadRingBufferSpaceAvailableEvent.show ( level-3u ); ::printf ( "\tsend thread exit signal:\n" ); this->sendThreadExitEvent.show ( level-3u ); - ::printf ( "\trecv thread exit signal:\n" ); - this->recvThreadExitEvent.show ( level-3u ); ::printf ("\techo pending bool = %u\n", this->echoRequestPending ); ::printf ( "IO identifier hash table:\n" ); this->BHE.show ( level - 3u ); @@ -756,7 +734,7 @@ bool tcpiiu::setEchoRequestPending () // void tcpiiu::processIncoming () { - while ( 1 ) { + while ( true ) { // // fetch a complete message header @@ -825,7 +803,6 @@ void tcpiiu::processIncoming () &this->pCurData[this->curDataBytes], this->curMsg.m_postsize - this->curDataBytes ); if ( this->curDataBytes < this->curMsg.m_postsize ) { - this->recvThreadRingBufferSpaceAvailableEvent.signal (); this->flushIfRecvProcessRequested (); return; } @@ -848,16 +825,10 @@ void tcpiiu::processIncoming () this->curDataBytes += this->recvQue.removeBytes ( this->curMsg.m_postsize - this->curDataBytes ); if ( this->curDataBytes < this->curMsg.m_postsize ) { - this->recvThreadRingBufferSpaceAvailableEvent.signal (); this->flushIfRecvProcessRequested (); return; } } - - if ( nBytes >= maxBytesPendingTCP && - this->recvQue.occupiedBytes () < maxBytesPendingTCP ) { - this->recvThreadRingBufferSpaceAvailableEvent.signal (); - } this->oldMsgHeaderAvailable = false; this->msgHeaderAvailable = false; @@ -1276,19 +1247,26 @@ bool tcpiiu::flush () } // ~tcpiiu() will not return while this->blockingForFlush is greater than zero -void tcpiiu::blockUntilSendBacklogIsReasonable ( epicsMutex &mutex ) +void tcpiiu::blockUntilSendBacklogIsReasonable ( + epicsMutex *pCallbackMutex, epicsMutex & primaryMutex ) { assert ( this->blockingForFlush < UINT_MAX ); this->blockingForFlush++; while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiu_connected ) { - epicsAutoMutexRelease autoRelease ( mutex ); - this->flushBlockEvent.wait ( 5.0 ); - } - if ( this->blockingForFlush == 1 ) { - this->flushBlockEvent.signal (); + epicsAutoMutexRelease autoRelease ( primaryMutex ); + if ( pCallbackMutex ) { + epicsAutoMutexRelease autoRelease ( *pCallbackMutex ); + this->flushBlockEvent.wait (); + } + else { + this->flushBlockEvent.wait (); + } } assert ( this->blockingForFlush > 0u ); this->blockingForFlush--; + if ( this->blockingForFlush == 0 ) { + this->flushBlockEvent.signal (); + } } void tcpiiu::flushRequestIfAboveEarlyThreshold () diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index 56ff2c98d..b5e905877 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -51,9 +51,9 @@ const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] = // // udpiiu::udpiiu () // -udpiiu::udpiiu ( cac &cac, epicsThreadPrivateId isRecvProcessIdIn ) : - netiiu ( &cac ), isRecvProcessId ( isRecvProcessIdIn ), - shutdownCmd ( false ), sockCloseCompleted ( false ) +udpiiu::udpiiu ( cac &cac ) : + netiiu ( &cac ), shutdownCmd ( false ), + sockCloseCompleted ( false ) { static const unsigned short PORT_ANY = 0u; osiSockAddr addr; @@ -146,6 +146,11 @@ udpiiu::udpiiu ( cac &cac, epicsThreadPrivateId isRecvProcessIdIn ) : ellInit ( &this->dest ); configureChannelAccessAddressList ( &this->dest, this->sock, this->serverPort ); if ( ellCount ( &this->dest ) == 0 ) { + // no need to lock callbacks here because + // 1) this is called while in a CA client function + // 2) no auxiliary threads are running at this point + // (taking the callback lock here would break the + // lock hierarchy and risk deadlocks) genLocalExcep ( *this->pCAC (), ECA_NOSEARCHADDR, NULL ); } @@ -245,7 +250,7 @@ void udpiiu::recvMsg () extern "C" void cacRecvThreadUDP ( void *pParam ) { udpiiu *piiu = (udpiiu *) pParam; - epicsThreadPrivateSet ( piiu->isRecvProcessId, pParam ); + epicsThreadPrivateSet ( caClientCallbackThreadId, pParam ); do { piiu->recvMsg (); } while ( ! piiu->shutdownCmd ); @@ -643,12 +648,14 @@ bool udpiiu::exceptionRespAction ( const caHdr &msg, return true; } -void udpiiu::postMsg ( const osiSockAddr &net_addr, - char *pInBuf, arrayElementCount blockSize, - const epicsTime ¤tTime) +void udpiiu::postMsg ( const osiSockAddr & net_addr, + char * pInBuf, arrayElementCount blockSize, + const epicsTime & currentTime ) { caHdr *pCurMsg; + callbackAutoMutex autoMutex ( *this->pCAC() ); + while ( blockSize ) { arrayElementCount size; @@ -656,8 +663,8 @@ void udpiiu::postMsg ( const osiSockAddr &net_addr, char buf[64]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); this->printf ( - "%s: undecipherable (too small) UDP msg from %s ignored\n", __FILE__, - buf ); + "%s: undecipherable (too small) UDP msg from %s ignored\n", + __FILE__, buf ); return; } diff --git a/src/ca/udpiiu.h b/src/ca/udpiiu.h index 35b102c98..dad8f9e5b 100644 --- a/src/ca/udpiiu.h +++ b/src/ca/udpiiu.h @@ -43,7 +43,7 @@ class epicsTime; class udpiiu : public netiiu { public: - udpiiu ( class cac &, epicsThreadPrivateId ); + udpiiu ( class cac & ); virtual ~udpiiu (); void shutdown (); void recvMsg (); @@ -59,15 +59,12 @@ public: // exceptions class noSocket {}; - SOCKET getSock () const; - private: char xmitBuf [MAX_UDP_SEND]; char recvBuf [MAX_UDP_RECV]; ELLLIST dest; epicsThreadId recvThreadId; epicsEventId recvThreadExitSignal; - epicsThreadPrivateId isRecvProcessId; unsigned nBytesInXmitBuf; SOCKET sock; unsigned short repeaterPort; @@ -113,10 +110,5 @@ inline unsigned udpiiu::getPort () const return this->localPort; } -inline SOCKET udpiiu::getSock () const -{ - return this->sock; -} - #endif // udpiiuh diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index d4fde2931..c018ac226 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -215,20 +215,21 @@ public: tcpiiu ( cac &cac, double connectionTimeout, epicsTimerQueue &timerQueue, const osiSockAddr &addrIn, unsigned minorVersion, class bhe &bhe, - ipAddrToAsciiEngine &engineIn ); + ipAddrToAsciiEngine & engineIn, + bool preemptiveCallbackEnable ); ~tcpiiu (); void connect (); - void processIncoming (); void destroy (); - void cleanShutdown (); void forcedShutdown (); + void cleanShutdown (); void beaconAnomalyNotify (); void beaconArrivalNotify (); void flushRequest (); bool flushBlockThreshold () const; void flushRequestIfAboveEarlyThreshold (); - void blockUntilSendBacklogIsReasonable ( epicsMutex & ); + void blockUntilSendBacklogIsReasonable + ( epicsMutex * pCallBack, epicsMutex & primary ); virtual void show ( unsigned level ) const; bool setEchoRequestPending (); void requestRecvProcessPostponedFlush (); @@ -245,9 +246,6 @@ public: double beaconPeriod () const; bhe & getBHE () const; - SOCKET getSock() const; - bool trueOnceOnly (); - private: tcpRecvWatchdog recvDog; tcpSendWatchdog sendDog; @@ -263,24 +261,25 @@ private: unsigned minorProtocolVersion; iiu_conn_state state; epicsEvent sendThreadFlushEvent; - epicsEvent recvThreadRingBufferSpaceAvailableEvent; epicsEvent sendThreadExitEvent; - epicsEvent recvThreadExitEvent; epicsEvent flushBlockEvent; SOCKET sock; unsigned contigRecvMsgCount; unsigned blockingForFlush; unsigned socketLibrarySendBufferSize; unsigned unacknowledgedSendBytes; + int recvFlag; bool busyStateDetected; // only modified by the recv thread bool flowControlActive; // only modified by the send process thread bool echoRequestPending; bool oldMsgHeaderAvailable; bool msgHeaderAvailable; bool sockCloseCompleted; - bool f_trueOnceOnly; bool earlyFlush; bool recvProcessPostponedFlush; + bool preemptiveCallbackEnable; + + void processIncoming (); unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ); unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf ); @@ -491,22 +490,6 @@ inline void tcpiiu::beaconArrivalNotify () this->recvDog.beaconArrivalNotify (); } -inline bool tcpiiu::trueOnceOnly () -{ - if ( this->f_trueOnceOnly ) { - this->f_trueOnceOnly = false; - return true; - } - else { - return false; - } -} - -inline SOCKET tcpiiu::getSock () const -{ - return this->sock; -} - inline void tcpiiu::flushIfRecvProcessRequested () { if ( this->recvProcessPostponedFlush ) {