From 07bf67ef2ff8f3d6fb2379e70fb31b6a58147912 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 9 May 2002 00:36:55 +0000 Subject: [PATCH] moved someof the functionality into the old interface and improved shutdown sequence --- src/ca/cac.cpp | 312 ++++++------------------------------------------- 1 file changed, 33 insertions(+), 279 deletions(-) diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 212d26b30..ea71c17d9 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -131,13 +131,11 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) : pudpiiu ( 0 ), tcpSmallRecvBufFreeList ( 0 ), tcpLargeRecvBufFreeList ( 0 ), - pCallbackGuard ( 0 ), notify ( notifyIn ), initializingThreadsId ( epicsThreadGetIdSelf() ), initializingThreadsPriority ( epicsThreadGetPrioritySelf() ), maxRecvBytesTCP ( MAX_TCP ), - pndRecvCnt ( 0u ), - readSeq ( 0u ) + preemptiveCallbackEnabled ( enablePreemptiveCallbackIn ) { if ( ! osiSockAttach () ) { throwWithLocation ( caErrorCode (ECA_INTERNAL) ); @@ -202,10 +200,6 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) : if ( ! this->tcpLargeRecvBufFreeList ) { throw std::bad_alloc (); } - - if ( ! enablePreemptiveCallbackIn ) { - this->pCallbackGuard = new epicsGuard < callbackMutex > ( this->cbMutex ); - } } catch ( ... ) { osiSockRelease (); @@ -223,14 +217,6 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) : cac::~cac () { - // - // release callback lock - // (disconnect callbacks will occur if they still have connected - // channels at this point) - // - delete this->pCallbackGuard; - - // // lock intentionally not held here so that we dont deadlock // waiting for the UDP thread to exit while it is waiting to @@ -239,16 +225,26 @@ cac::~cac () // this blocks until the UDP thread exits so that // it will not sneak in any new clients delete this->pudpiiu; - this->pudpiiu = 0; // for tcpCircuitShutdown + this->pudpiiu = 0; // for initiateAbortShutdown() // // shutdown all tcp connections // (take both locks here in the proper order to avoid deadlocks) // + tsSLList < tcpiiu > dest; { - epicsGuard < callbackMutex > autoMutexCB ( this->cbMutex ); - epicsGuard < cacMutex > autoMutexCAC ( this->mutex ); - this->serverTable.traverse ( & tcpiiu::cleanShutdown ); + epicsGuard < cacMutex > guard ( this->mutex ); + this->serverTable.removeAll ( dest ); + } + + while ( tcpiiu * pIIU = dest.get () ) { + { + epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); + this->privateUninstallIIU ( cbGuard, *pIIU ); + } + // this will block for oustanding sends to go out so dont + // hold a lock while destroying the tcpiiu + delete pIIU; } // @@ -308,13 +304,7 @@ unsigned cac::highestPriorityLevelBelow ( unsigned priority ) // void cac::flushRequest () { - epicsGuard < cacMutex > autoMutex ( this->mutex ); - this->flushRequestPrivate (); -} - -// lock must be applied -void cac::flushRequestPrivate () -{ + epicsGuard < cacMutex > guard ( this->mutex ); this->serverTable.traverse ( & tcpiiu::flushRequest ); } @@ -333,8 +323,6 @@ void cac::show ( unsigned level ) const if ( level > 0u ) { this->serverTable.show ( level - 1u ); ::printf ( "\tconnection time out watchdog period %f\n", this->connTMO ); - ::printf ( "\tpreemptive calback is %s\n", - this->pCallbackGuard ? "disabled" : "enabled" ); ::printf ( "list of installed services:\n" ); this->services.show ( level - 1u ); } @@ -343,8 +331,6 @@ void cac::show ( unsigned level ) const if ( this->pudpiiu ) { this->pudpiiu->show ( level - 2u ); } - ::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n", - this->pndRecvCnt ); } if ( level > 2u ) { @@ -362,15 +348,11 @@ void cac::show ( unsigned level ) const this->timerQueue.show ( level - 3u ); ::printf ( "IP address to name conversion engine:\n" ); this->ipToAEngine.show ( level - 3u ); - ::printf ( "\tthe current read sequence number is %u\n", - this->readSeq ); } if ( level > 3u ) { ::printf ( "Default mutex:\n"); this->mutex.show ( level - 4u ); - ::printf ( "IO done event:\n"); - this->ioDone.show ( level - 3u ); ::printf ( "mutex:\n" ); this->mutex.show ( level - 3u ); } @@ -429,130 +411,6 @@ void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime, # endif } -// !!!! This routine is only visible in the old interface - or in a new ST interface. -// !!!! In the old interface we restrict thread attach so that calls from threads -// !!!! other than the initializing thread are not allowed if preemptive callback -// !!!! is disabled. This prevents the preemptive callback lock from being released -// !!!! by other threads than the one that locked it. -// -// this routine should probably be moved to the oldCAC? -int cac::pendIO ( const double & timeout ) -{ - // prevent recursion nightmares by disabling calls to - // pendIO () from within a CA callback. - if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { - return ECA_EVDISALLOW; - } - - int status = ECA_NORMAL; - epicsTime beg_time = epicsTime::getCurrent (); - double remaining = timeout; - - { - epicsGuard < cacMutex > guard ( this->mutex ); - this->flushRequestPrivate (); - } - - while ( this->pndRecvCnt > 0 ) { - if ( remaining < CAC_SIGNIFICANT_DELAY ) { - status = ECA_TIMEOUT; - break; - } - - if ( this->pCallbackGuard ) { - epicsGuardRelease < callbackMutex > cbRelease ( *this->pCallbackGuard ); - this->ioDone.wait ( remaining ); - } - else { - this->ioDone.wait ( remaining ); - } - - double delay = epicsTime::getCurrent () - beg_time; - if ( delay < timeout ) { - remaining = timeout - delay; - } - else { - remaining = 0.0; - } - } - - { - epicsGuard < cacMutex > guard ( this->mutex ); - this->readSeq++; - this->pndRecvCnt = 0u; - } - - if ( this->pudpiiu ) { - this->pudpiiu->pendioTimeoutNotify (); - } - - return status; -} - -int cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ) -{ - if ( this->pCallbackGuard ) { - epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); - event.wait ( timeout ); - } - else { - event.wait ( timeout ); - } - - return ECA_NORMAL; -} - -// !!!! This routine is only visible in the old interface - or in a new ST interface. -// !!!! In the old interface we restrict thread attach so that calls from threads -// !!!! other than the initializing thread are not allowed if preemptive callback -// !!!! is disabled. This prevents the preemptive callback lock from being released -// !!!! by other threads than the one that locked it. -// -// this routine should probably be moved to the oldCAC? -int cac::pendEvent ( const double & timeout ) -{ - // prevent recursion nightmares by disabling calls to - // pendIO () from within a CA callback. - if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { - return ECA_EVDISALLOW; - } - - epicsTime current = epicsTime::getCurrent (); - - { - epicsGuard < cacMutex > guard ( this->mutex ); - this->flushRequestPrivate (); - } - - // process at least once if preemptive callback is disabled - if ( this->pCallbackGuard ) { - epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); - this->cbMutex.waitUntilNoRecvThreadsPending (); - } - - double elapsed = epicsTime::getCurrent() - current; - double delay; - - if ( timeout > elapsed ) { - delay = timeout - elapsed; - } - else { - delay = 0.0; - } - - if ( delay >= CAC_SIGNIFICANT_DELAY ) { - if ( this->pCallbackGuard ) { - epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); - epicsThreadSleep ( delay ); - } - else { - epicsThreadSleep ( delay ); - } - } - - return ECA_TIMEOUT; -} - void cac::installCASG ( CASG &sg ) { epicsGuard < cacMutex > guard ( this->mutex ); @@ -608,7 +466,6 @@ cacChannel & cac::createChannel ( const char * pName, epicsGuard < cacMutex > guard ( this->mutex ); epics_auto_ptr < nciu > pNetChan ( new nciu ( *this, *this->pudpiiu, chan, pName, pri ) ); - pNetChan->notifyStateChangeFirstConnectInCountOfOutstandingIO (); this->chanTable.add ( *pNetChan ); return *pNetChan.release (); } @@ -714,7 +571,7 @@ bool cac::lookupChannelAndTransferToTCP ( } } - this->pudpiiu->uninstallChannel ( cbGuard, guard, *pChan ); + this->pudpiiu->uninstallChanAndReturnDestroyPtr ( guard, *pChan ); piiu->installChannel ( guard, *pChan, sid, typeCode, count ); v41Ok = piiu->ca_v41_ok (); @@ -818,23 +675,7 @@ void cac::uninstallChannel ( nciu & chan ) } } - // take callback lock to ensure that any callbacks in - // progress complete prior to destroying channel and any - // associated IO - // - // If this is a callback thread then it already owns the - // CB lock at this point. If this is the main thread then - // it is possible that this->pCallbackGuard already owns - // the callback lock. Otherwise if this is a preemptive - // callback enabled app then this->pCallbackGuard - // isnt set and we must take the call back lock. - // - // We take the callback lock again here and rely on recursive - // mutex capabilities, but perhaps this should be handled - // differently in the future. - // - // bool alreadyLocked = epicsThreadPrivateGet ( caClientCallbackThreadId ) - // || this->pCallbackGuard; + tcpiiu * pDestroyIIU; { // taking this mutex prior to deleting the IO and channel guarantees // that we will not delete a channel out from under a callback @@ -857,10 +698,18 @@ void cac::uninstallChannel ( nciu & chan ) // o chan destroy exception has been delivered { epicsGuard < cacMutex > guard ( this->mutex ); - // this destroys the tcpiiu if its the last channel - chan.getPIIU()->uninstallChannel ( cbGuard, guard, chan ); + pDestroyIIU = chan.getPIIU()->uninstallChanAndReturnDestroyPtr + ( guard, chan ); + if ( pDestroyIIU ) { + this->serverTable.remove ( *pDestroyIIU ); + this->privateUninstallIIU ( cbGuard, *pDestroyIIU ); + } } } + + // this blocks for all outstanding messages to be sent so + // no lock can be held here + delete pDestroyIIU; } int cac::printf ( const char *pformat, ... ) const @@ -892,13 +741,7 @@ void cac::flushIfRequired ( epicsGuard < cacMutex > & guard, netiiu & iiu ) // enable / disable of call back preemption must occur here // because the tcpiiu might disconnect while waiting and its // pointer to this cac might become invalid - if ( this->pCallbackGuard ) { - iiu.blockUntilSendBacklogIsReasonable - ( this->pCallbackGuard, guard ); - } - else { - iiu.blockUntilSendBacklogIsReasonable ( 0, guard ); - } + iiu.blockUntilSendBacklogIsReasonable ( this->notify, guard ); } } else { @@ -970,18 +813,7 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) } // wait for any IO callbacks in progress to complete // prior to destroying the IO object - // - // If this is a callback thread then it already owns the - // CB lock at this point. If this is the main thread then we - // are not in pendEvent, pendIO, SG block, etc and - // this->pCallbackGuard protects. Otherwise if this id - // the users auxillary thread then this->pCallbackGuard - // isnt set and we must take the call back lock. - bool alreadyLocked = ( epicsThreadPrivateGet ( caClientCallbackThreadId ) - || this->pCallbackGuard ); - if ( ! alreadyLocked ) { - epicsGuard < callbackMutex > autoMutex ( this->cbMutex ); - } + epicsGuard < callbackMutex > autoMutex ( this->cbMutex ); // now it is safe to destroy the IO object { epicsGuard < cacMutex > autoMutex ( this->mutex ); @@ -1651,72 +1483,6 @@ void cac::vSignal ( int ca_status, const char *pfilenm, this->printf ( "..................................................................\n" ); } -void cac::incrementOutstandingIO () -{ - epicsGuard < cacMutex > guard ( this->mutex ); - if ( this->pndRecvCnt < UINT_MAX ) { - this->pndRecvCnt++; - } - else { - throwWithLocation ( caErrorCode (ECA_INTERNAL) ); - } -} - -void cac::decrementOutstandingIO () -{ - bool signalNeeded; - - { - epicsGuard < cacMutex > guard ( this->mutex ); - if ( this->pndRecvCnt > 0u ) { - this->pndRecvCnt--; - if ( this->pndRecvCnt == 0u ) { - signalNeeded = true; - } - else { - signalNeeded = false; - } - } - else { - signalNeeded = true; - } - } - - if ( signalNeeded ) { - this->ioDone.signal (); - } -} - -void cac::decrementOutstandingIO ( unsigned sequenceNo ) -{ - bool signalNeeded; - - { - epicsGuard < cacMutex > guard ( this->mutex ); - if ( this->readSeq == sequenceNo ) { - if ( this->pndRecvCnt > 0u ) { - this->pndRecvCnt--; - if ( this->pndRecvCnt == 0u ) { - signalNeeded = true; - } - else { - signalNeeded = false; - } - } - else { - signalNeeded = true; - } - } - else { - signalNeeded = false; - } - } - - if ( signalNeeded ) { - this->ioDone.signal (); - } -} - void cac::selfTest () const { this->chanTable.verify (); @@ -1735,7 +1501,7 @@ void cac::notifyDestroyFD ( epicsGuard < callbackMutex > &, SOCKET sock ) const this->notify.fdWasDestroyed ( sock ); } -void cac::tcpCircuitShutdown ( tcpiiu & iiu, bool discardPendingMessages ) +void cac::initiateAbortShutdown ( tcpiiu & iiu ) { // generate some NOOP UDP traffic so that ca_pend_event() // will get called in preemptive callback disabled @@ -1747,16 +1513,10 @@ void cac::tcpCircuitShutdown ( tcpiiu & iiu, bool discardPendingMessages ) { epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); epicsGuard < cacMutex > guard ( this->mutex ); - iiu.shutdown ( cbGuard, guard, discardPendingMessages ); + iiu.initiateAbortShutdown ( cbGuard, guard ); } } -void cac::uninstallIIU ( tcpiiu & iiu ) -{ - epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); - this->privateUninstallIIU ( cbGuard, iiu ); -} - void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & iiu ) { epicsGuard < cacMutex > guard ( this->mutex ); @@ -1773,16 +1533,10 @@ void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & pBHE->unregisterIIU ( iiu ); } } - - // dont allow any channels to jump back onto - // this iiu while the lock is removed below - this->serverTable.remove ( iiu ); - + assert ( this->pudpiiu ); iiu.removeAllChannels ( cbGuard, guard, *this ); - delete &iiu; - this->pudpiiu->resetSearchTimerPeriod ( 0.0 ); // signal iiu uninstal event so that cac can properly shut down