From 1a1c884fa98e27b30db60f2ef693ec0b784e97e8 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 28 Feb 2002 00:20:31 +0000 Subject: [PATCH] changed channel and subscription uninstal procedures --- src/ca/cac.cpp | 371 +++++++++++++++++++++++++++++-------------------- 1 file changed, 220 insertions(+), 151 deletions(-) diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 9315e7c65..5865f60ac 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -152,7 +152,7 @@ extern "C" void cacOnceFunc ( void * ) // cac::cac () // cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) : - ipToAEngine ( "caIPAddrToAsciiEngine" ), + ipToAEngine ( "dnsQuery" ), programBeginTime ( epicsTime::getCurrent() ), connTMO ( CA_CONN_VERIFY_PERIOD ), timerQueue ( epicsTimerQueueActive::allocate ( false, @@ -261,9 +261,12 @@ cac::~cac () { // // release callback lock + // (disconnect callbacks will occur if they still have connected + // channels at this point) // delete this->pCallbackLocker; + // // lock intentionally not held here so that we dont deadlock // waiting for the UDP thread to exit while it is waiting to @@ -276,9 +279,11 @@ cac::~cac () // // shutdown all tcp connections + // (take both locks here in the proper order to avoid deadlocks) // { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutexCB ( this->callbackMutex ); + epicsAutoMutex autoMutexCAC ( this->mutex ); this->serverTable.traverse ( & tcpiiu::cleanShutdown ); } @@ -289,17 +294,22 @@ cac::~cac () this->iiuUninstall.wait (); } + // (after this point all threads that know about this object + // have shut down so we no longer need to lock) delete this->pRepeaterSubscribeTmr; delete this->pSearchTmr; freeListCleanup ( this->tcpSmallRecvBufFreeList ); freeListCleanup ( this->tcpLargeRecvBufFreeList ); - { - epicsAutoMutex autoMutexCB ( this->callbackMutex ); - epicsAutoMutex autoMutexCAC ( this->mutex ); - if ( this->pudpiiu ) { - this->removeAllChan ( *this->pudpiiu ); + if ( this->pudpiiu ) { + while ( nciu *pChan = this->pudpiiu->firstChannel() ) { + { + callbackAutoMutex autoMutexCB ( *this ); + this->pudpiiu->detachChannel ( autoMutexCB, *pChan ); + } + pChan->disconnect ( limboIIU ); + limboIIU.attachChannel ( *pChan ); } } @@ -308,17 +318,17 @@ cac::~cac () this->beaconTable.traverse ( &bhe::destroy ); - // its ok for channels and subscriptions to still - // exist at this point. The user created them and - // its his responsibility to clean them up. - osiSockRelease (); this->timerQueue.release (); + + // its ok for channels and subscriptions to still + // exist at this point. The user created them and + // its his responsibility to clean them up. } -// must have callback lock and also cac lock -void cac::removeAllChan ( netiiu & srcIIU, netiiu *pDstIIU ) +void cac::removeAllChan ( callbackAutoMutex & cbLocker, epicsAutoMutex & locker, + netiiu & srcIIU, netiiu & dstIIU ) { // we are protected here because channel delete takes the callback mutex while ( nciu *pChan = srcIIU.firstChannel() ) { @@ -328,7 +338,7 @@ void cac::removeAllChan ( netiiu & srcIIU, netiiu *pDstIIU ) if ( pChan->connected() ) { srcIIU.clearChannelRequest ( pChan->getSID(), pChan->getCID() ); } - this->disconnectChannelPrivate ( *pChan, pDstIIU ); + this->disconnectChannelPrivate ( cbLocker, locker, *pChan, dstIIU ); } } @@ -542,7 +552,7 @@ int cac::pendIO ( const double & timeout ) } if ( this->pCallbackLocker ) { - epicsAutoMutexRelease autoRelease ( this->callbackMutex ); + callbackAutoMutexRelease autoRelease ( *this->pCallbackLocker ); this->ioDone.wait ( remaining ); } else { @@ -573,7 +583,7 @@ int cac::pendIO ( const double & timeout ) int cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ) { if ( this->pCallbackLocker ) { - epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); + callbackAutoMutexRelease autoMutexRelease ( *this->pCallbackLocker ); event.wait ( timeout ); } else { @@ -605,11 +615,10 @@ int cac::pendEvent ( const double & timeout ) this->flushRequestPrivate (); } - // process at least once if preemptive callback - // isnt enabled + // process at least once if preemptive callback is disabled if ( this->pCallbackLocker ) { - epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); - while ( this->recvThreadsPendingCount > 1 ) { + callbackAutoMutexRelease autoMutexRelease ( *this->pCallbackLocker ); + while ( this->recvThreadsPendingCount > 0 ) { this->noRecvThreadsPending.wait (); } } @@ -626,7 +635,7 @@ int cac::pendEvent ( const double & timeout ) if ( delay >= CAC_SIGNIFICANT_DELAY ) { if ( this->pCallbackLocker ) { - epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); + callbackAutoMutexRelease autoMutexRelease ( *this->pCallbackLocker ); epicsThreadSleep ( delay ); } else { @@ -715,7 +724,8 @@ bool cac::setupUDP () epicsAutoMutex autoMutex ( this->mutex ); if ( ! this->pudpiiu ) { - this->pudpiiu = new udpiiu ( *this ); + callbackAutoMutex cbMutex ( *this ); + this->pudpiiu = new udpiiu ( cbMutex, *this ); if ( ! this->pudpiiu ) { return false; } @@ -745,12 +755,14 @@ void cac::repeaterSubscribeConfirmNotify () } } -bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, +bool cac::lookupChannelAndTransferToTCP ( + callbackAutoMutex & cbMutex, unsigned cid, unsigned sid, ca_uint16_t typeCode, arrayElementCount count, unsigned minorVersionNumber, const osiSockAddr & addr, const epicsTime & currentTime ) { tcpiiu * pnewiiu = 0; + unsigned short retrySeqNumber = 0u; if ( addr.sa.sa_family != AF_INET ) { return false; @@ -769,7 +781,7 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, return true; } - unsigned short retrySeqNumber = chan->getRetrySeqNo (); + retrySeqNumber = chan->getRetrySeqNo (); /* * Ignore duplicate search replies @@ -819,7 +831,7 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, } } - this->pudpiiu->detachChannel ( *chan ); + this->pudpiiu->detachChannel ( cbMutex, *chan ); chan->searchReplySetUp ( *piiu, sid, typeCode, count ); piiu->attachChannel ( *chan ); @@ -835,10 +847,6 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, // resubscribe for monitors from this channel this->connectAllIO ( *chan ); } - - if ( this->pSearchTmr ) { - this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime ); - } } if ( ! v42Ok ) { @@ -864,16 +872,18 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, } if ( pnewiiu ) { - // this is done here after we release the priamry - // lock so that we will hold the callback lock but - // not the primary lock when the fd is registered - // with the user - bool success = pnewiiu->start (); + bool success = pnewiiu->start ( cbMutex ); if ( ! success ) { - this->privateUninstallIIU ( *pnewiiu ); + this->privateUninstallIIU ( cbMutex, *pnewiiu ); } } + if ( this->pSearchTmr ) { + // deadlock can result if this is called while holding the primary + // mutex (because the primary mutex is used in the search timer callback) + this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime ); + } + return true; } @@ -887,61 +897,93 @@ void cac::uninstallChannel ( nciu & chan ) // we do not dead lock { epicsAutoMutex autoMutex ( this->mutex ); + + // if the send backlog is too high send some frames before we get entagled + // in the channel shutdown sequence below. There is special protection in + // this routine that releases the callback lock if we are already holding it + // when this is the tcp receive thread or if this is the main thread and + // preemptive callback is disabled. this->flushIfRequired ( *chan.getPIIU() ); + + // unregister the channel + if ( this->chanTable.remove ( chan ) != &chan ) { + errlogPrintf ( + "CAC: Attemt to uninstall unregisterred channel ID=%u ignored.\n", + chan.getId () ); + return; + } + + // for each outstanding IO + // + // IO must be removed from the list and also uninstalled while holding the + // lock so that ioCancel() does not break in while postponing the destroy + // waiting for outstanding callbacks to complete + // while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) { - tmpList.add ( *pIO ); - this->ioTable.remove ( *pIO ); + // unregister IO class + if ( pIO != this->ioTable.remove ( *pIO ) ) { + errlogPrintf ( + "CAC: Unregister IO ID=%u found when uninstalling channel?\n", + pIO->getId () ); + continue; + } + // connected subscriptions must be canceled in the server class netSubscription *pSubscr = pIO->isSubscription (); if ( pSubscr && chan.connected() ) { + // we will deadlock if we hold the callback lock here chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); } - { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); - // If they call ioCancel() here it will be ignored - // because the IO has been unregistered above - pIO->exception ( ECA_CHANDESTROY, chan.pName() ); - } + tmpList.add ( *pIO ); } - nciu * pChan = this->chanTable.remove ( chan ); - assert ( pChan == &chan ); + // if the claim reply has not returned yet then we will issue // the clear channel request to the server when the claim reply // arrives and there is no matching nciu in the client - if ( pChan->connected() ) { + if ( chan.connected() ) { chan.getPIIU()->clearChannelRequest ( chan.getSID(), chan.getCID() ); } - chan.getPIIU()->detachChannel ( chan ); } - // take callback lock briefly to ensure that any callbacks in - // progress complete prior to destroying channel and any associated IO + // take callback lock to ensure that any callbacks in + // progress complete prior to destroying channel and any + // associated IO // // If this is a callback thread then it already owns the - // CB lock at this point. If this is the main thread then we - // are not in pendEvent, pendIO, SG block, etc and - // this->pCallbackLocker protects. Otherwise if this is - // a preemptive callback enabled app then this->pCallbackLocker + // CB lock at this point. If this is the main thread then + // it is possible that this->pCallbackLocker already owns + // the callback lock. Otherwise if this is a preemptive + // callback enabled app then this->pCallbackLocker // isnt set and we must take the call back lock. // - // must _not_ hold callback lock while sending channel and subscription - // cancel requests above unless. - // 1) this is a recv thread - // 2) this is the thread that initialized the library and preemptive - // callback is disabled - bool alreadyLocked = epicsThreadPrivateGet ( caClientCallbackThreadId ) - || this->pCallbackLocker; - if ( ! alreadyLocked ) { - // taking this mutex priior to deleting the IO and channel guarantees - // that we will not delete a channel out from under a callback - epicsAutoMutex autoCallbackMutex ( this->callbackMutex ); - } - - // destroy subsiderary IO now that it is safe to do so + // We take the callback lock again here and rely on recursive + // mutex capabilities, but perhaps this should be handled + // differently in the future. + // + // bool alreadyLocked = epicsThreadPrivateGet ( caClientCallbackThreadId ) + // || this->pCallbackLocker; { - epicsAutoMutex autoMutex ( this->mutex ); + // taking this mutex prior to deleting the IO and channel guarantees + // that we will not delete a channel out from under a callback + callbackAutoMutex cbLocker ( *this ); + + // destroy subsiderary IO now that it is safe to do so while ( baseNMIU *pIO = tmpList.get() ) { + // If they call ioCancel() here it will be ignored + // because the IO has been unregistered above. + // This must be done after outstanding callbacks + // for this channel have completed. + pIO->exception ( ECA_CHANDESTROY, chan.pName() ); pIO->destroy ( *this ); } + + // this must be done after the following + // o subscription cancel requests + // o clear channel request + // o outstanding callbacks using this channel have completed + // o chan destroy exception has been delivered + epicsAutoMutex autoMutex ( this->mutex ); + // this destroys the tcpiiu if its the last channel + chan.getPIIU()->detachChannel ( cbLocker, chan ); } } @@ -1037,6 +1079,29 @@ cac::readNotifyRequest ( nciu &chan, unsigned type, // X aCC 361 void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) { + baseNMIU * pmiu; + + // unistall the IO object so that a receive thread will not find it, + // but do _not_ hold the callback lock here because this could result + // in deadlock + { + epicsAutoMutex autoMutex ( this->mutex ); + pmiu = this->ioTable.remove ( id ); + if ( ! pmiu ) { + return; + } + class netSubscription *pSubscr = pmiu->isSubscription (); + if ( pSubscr ) { + this->flushIfRequired ( *chan.getPIIU() ); + if ( chan.connected() ) { + chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); + } + } + // must be uninstalled and also removed from the table + // while holding the lock to prevent a channel delete + // from destroying this IO object after we release the lock + chan.cacPrivateListOfIO::eventq.remove ( *pmiu ); + } // wait for any IO callbacks in progress to complete // prior to destroying the IO object // @@ -1046,31 +1111,14 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) // this->pCallbackLocker protects. Otherwise if this id // the users auxillary thread then this->pCallbackLocker // isnt set and we must take the call back lock. - if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { - this->ioCancelPrivate ( chan, id ); - } - else if ( this->pCallbackLocker ) { - this->ioCancelPrivate ( chan, id ); - } - else { + bool alreadyLocked = ( epicsThreadPrivateGet ( caClientCallbackThreadId ) + || this->pCallbackLocker ); + if ( ! alreadyLocked ) { epicsAutoMutex autoMutex ( this->callbackMutex ); - this->ioCancelPrivate ( chan, id ); } -} - -void cac::ioCancelPrivate ( nciu & chan, const cacChannel::ioid & id ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - baseNMIU * pmiu = this->ioTable.remove ( id ); - if ( pmiu ) { - chan.cacPrivateListOfIO::eventq.remove ( *pmiu ); - class netSubscription *pSubscr = pmiu->isSubscription (); - if ( pSubscr ) { - this->flushIfRequired ( *chan.getPIIU() ); - if ( chan.connected() ) { - chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); - } - } + // now it is safe to destroy the IO object + { + epicsAutoMutex autoMutex ( this->mutex ); pmiu->destroy ( *this ); } } @@ -1167,7 +1215,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id ) // it is in use here. // { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + epicsAutoMutexRelease autoMutexRelease ( autoMutex ); pmiu->completion (); } @@ -1191,7 +1239,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id, // it is in use here. // { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + epicsAutoMutexRelease autoMutexRelease ( autoMutex ); pmiu->completion ( type, count, pData ); } @@ -1215,7 +1263,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, // it is in use here. // { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + epicsAutoMutexRelease autoMutexRelease ( autoMutex ); pmiu->exception ( status, pContext ); } @@ -1240,7 +1288,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, // { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + epicsAutoMutexRelease autoMutexRelease ( autoMutex ); pmiu->exception ( status, pContext, type, count ); } @@ -1272,7 +1320,7 @@ void cac::connectAllIO ( nciu & chan ) // cancel IO operations and monitor subscriptions // -- callback lock and cac lock must be applied here -void cac::disconnectAllIO ( nciu & chan, bool enableCallbacks ) +void cac::disconnectAllIO ( epicsAutoMutex &locker, nciu & chan, bool enableCallbacks ) { tsDLIterBD pNetIO = chan.cacPrivateListOfIO::eventq.firstIter(); while ( pNetIO.valid() ) { @@ -1286,7 +1334,7 @@ void cac::disconnectAllIO ( nciu & chan, bool enableCallbacks ) if ( enableCallbacks ) { char buf[128]; sprintf ( buf, "host = %100s", chan.pHostName() ); - epicsAutoMutexRelease unlocker ( this->mutex ); + epicsAutoMutexRelease unlocker ( locker ); pNetIO->exception ( ECA_DISCONN, buf ); } if ( ! pNetIO->isSubscription() ) { @@ -1337,17 +1385,20 @@ cac::subscriptionRequest ( nciu &chan, unsigned type, // X aCC 361 } } -bool cac::noopAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ ) +bool cac::noopAction ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &, void * /* pMsgBdy */ ) { return true; } -bool cac::echoRespAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ ) +bool cac::echoRespAction ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &, void * /* pMsgBdy */ ) { return true; } -bool cac::writeNotifyRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ ) +bool cac::writeNotifyRespAction ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &hdr, void * /* pMsgBdy */ ) { int caStatus = hdr.m_cid; if ( caStatus == ECA_NORMAL ) { @@ -1360,7 +1411,8 @@ bool cac::writeNotifyRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * / return true; } -bool cac::readNotifyRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy ) +bool cac::readNotifyRespAction ( callbackAutoMutex &, tcpiiu &iiu, + const caHdrLargeArray &hdr, void *pMsgBdy ) { /* @@ -1400,7 +1452,8 @@ bool cac::readNotifyRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void * return true; } -bool cac::eventRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy ) +bool cac::eventRespAction (callbackAutoMutex &, tcpiiu &iiu, + const caHdrLargeArray &hdr, void *pMsgBdy ) { int caStatus; @@ -1448,19 +1501,22 @@ bool cac::eventRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgB return true; } -bool cac::readRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void *pMsgBdy ) +bool cac::readRespAction ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &hdr, void *pMsgBdy ) { this->ioCompletionNotifyAndDestroy ( hdr.m_available, hdr.m_dataType, hdr.m_count, pMsgBdy ); return true; } -bool cac::clearChannelRespAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ ) +bool cac::clearChannelRespAction ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &, void * /* pMsgBdy */ ) { return true; // currently a noop } -bool cac::defaultExcep ( tcpiiu &iiu, const caHdrLargeArray &, +bool cac::defaultExcep ( callbackAutoMutex &, tcpiiu &iiu, + const caHdrLargeArray &, const char *pCtx, unsigned status ) { char buf[512]; @@ -1471,7 +1527,8 @@ bool cac::defaultExcep ( tcpiiu &iiu, const caHdrLargeArray &, return true; } -bool cac::eventAddExcep ( tcpiiu & /* iiu */, const caHdrLargeArray &hdr, +bool cac::eventAddExcep ( callbackAutoMutex &, tcpiiu & /* iiu */, + const caHdrLargeArray &hdr, const char *pCtx, unsigned status ) { this->ioExceptionNotify ( hdr.m_available, status, pCtx, @@ -1479,7 +1536,8 @@ bool cac::eventAddExcep ( tcpiiu & /* iiu */, const caHdrLargeArray &hdr, return true; } -bool cac::readExcep ( tcpiiu &, const caHdrLargeArray &hdr, +bool cac::readExcep ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &hdr, const char *pCtx, unsigned status ) { this->ioExceptionNotifyAndDestroy ( hdr.m_available, @@ -1487,7 +1545,8 @@ bool cac::readExcep ( tcpiiu &, const caHdrLargeArray &hdr, return true; } -bool cac::writeExcep ( tcpiiu &, const caHdrLargeArray &hdr, +bool cac::writeExcep ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &hdr, const char *pCtx, unsigned status ) { nciu * pChan = this->chanTable.lookup ( hdr.m_available ); @@ -1498,7 +1557,8 @@ bool cac::writeExcep ( tcpiiu &, const caHdrLargeArray &hdr, return true; } -bool cac::readNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr, +bool cac::readNotifyExcep ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &hdr, const char *pCtx, unsigned status ) { this->ioExceptionNotifyAndDestroy ( hdr.m_available, @@ -1506,7 +1566,8 @@ bool cac::readNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr, return true; } -bool cac::writeNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr, +bool cac::writeNotifyExcep ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &hdr, const char *pCtx, unsigned status ) { this->ioExceptionNotifyAndDestroy ( hdr.m_available, @@ -1514,7 +1575,8 @@ bool cac::writeNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr, return true; } -bool cac::exceptionRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy ) +bool cac::exceptionRespAction ( callbackAutoMutex & cbMutex, tcpiiu & iiu, + const caHdrLargeArray & hdr, void * pMsgBdy ) { const caHdr * pReq = reinterpret_cast < const caHdr * > ( pMsgBdy ); unsigned bytesSoFar = sizeof ( *pReq ); @@ -1550,10 +1612,11 @@ bool cac::exceptionRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *p pStub = cac::tcpExcepJumpTableCAC [req.m_cmmd]; } const char *pCtx = reinterpret_cast < const char * > ( pLW ); - return ( this->*pStub ) ( iiu, req, pCtx, hdr.m_available ); + return ( this->*pStub ) ( cbMutex, iiu, req, pCtx, hdr.m_available ); } -bool cac::accessRightsRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ ) +bool cac::accessRightsRespAction ( callbackAutoMutex &, tcpiiu &, + const caHdrLargeArray &hdr, void * /* pMsgBdy */ ) { nciu * pChan; { @@ -1580,7 +1643,7 @@ bool cac::accessRightsRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * return true; } -bool cac::claimCIURespAction ( tcpiiu & iiu, +bool cac::claimCIURespAction ( callbackAutoMutex &, tcpiiu & iiu, const caHdrLargeArray & hdr, void * /*pMsgBdy */ ) { nciu * pChan; @@ -1613,7 +1676,8 @@ bool cac::claimCIURespAction ( tcpiiu & iiu, return true; } -bool cac::verifyAndDisconnectChan ( tcpiiu & /* iiu */, +bool cac::verifyAndDisconnectChan ( + callbackAutoMutex & cbMutex, tcpiiu & /* iiu */, const caHdrLargeArray & hdr, void * /* pMsgBdy */ ) { epicsAutoMutex autoMutex ( this->mutex ); @@ -1622,31 +1686,31 @@ bool cac::verifyAndDisconnectChan ( tcpiiu & /* iiu */, return true; } assert ( this->pudpiiu ); - this->disconnectChannelPrivate ( *pChan, this->pudpiiu ); + this->disconnectChannelPrivate ( cbMutex, autoMutex, + *pChan, *this->pudpiiu ); this->pSearchTmr->resetPeriod ( 0.0 ); return true; } -// callback lock and cac lock must be applied -void cac::disconnectChannelPrivate ( nciu & chan, netiiu *pDstIIU ) +void cac::disconnectChannelPrivate ( callbackAutoMutex & cbLocker, + epicsAutoMutex & locker, + nciu & chan, netiiu & dstIIU ) { - this->disconnectAllIO ( chan, true ); - chan.getPIIU()->detachChannel ( chan ); + this->disconnectAllIO ( locker, chan, true ); + chan.getPIIU()->detachChannel ( cbLocker, chan ); chan.disconnect ( limboIIU ); limboIIU.attachChannel ( chan ); - if ( pDstIIU ) { - epicsAutoMutexRelease autoMutexRelease ( this->mutex ); + { + epicsAutoMutexRelease autoMutexRelease ( locker ); chan.connectStateNotify (); chan.accessRightsNotify (); } - if ( pDstIIU ) { - limboIIU.detachChannel ( chan ); - chan.disconnect ( *pDstIIU ); - pDstIIU->attachChannel ( chan ); - } + limboIIU.detachChannel ( cbLocker, chan ); + chan.disconnect ( dstIIU ); + dstIIU.attachChannel ( chan ); } -bool cac::badTCPRespAction ( tcpiiu & iiu, +bool cac::badTCPRespAction ( callbackAutoMutex &, tcpiiu & iiu, const caHdrLargeArray & hdr, void * /* pMsgBdy */ ) { char hostName[64]; @@ -1656,7 +1720,8 @@ bool cac::badTCPRespAction ( tcpiiu & iiu, return false; } -bool cac::executeResponse ( tcpiiu &iiu, caHdrLargeArray &hdr, char *pMshBody ) +bool cac::executeResponse ( callbackAutoMutex & cbLocker, tcpiiu & iiu, + caHdrLargeArray & hdr, char * pMshBody ) { // execute the response message pProtoStubTCP pStub; @@ -1666,7 +1731,7 @@ bool cac::executeResponse ( tcpiiu &iiu, caHdrLargeArray &hdr, char *pMshBody ) else { pStub = cac::tcpJumpTableCAC [hdr.m_cmmd]; } - return ( this->*pStub ) ( iiu, hdr, pMshBody ); + return ( this->*pStub ) ( cbLocker, iiu, hdr, pMshBody ); } void cac::signal ( int ca_status, const char *pfilenm, @@ -1801,27 +1866,40 @@ void cac::selfTest () const this->beaconTable.verify (); } -void cac::notifyNewFD ( SOCKET sock ) const +void cac::notifyNewFD ( callbackAutoMutex &, SOCKET sock ) const { - if ( this->pCallbackLocker ) { - this->notify.fdWasCreated ( sock ); - } + this->notify.fdWasCreated ( sock ); } -void cac::notifyDestroyFD ( SOCKET sock ) const +void cac::notifyDestroyFD ( callbackAutoMutex &, SOCKET sock ) const { - if ( this->pCallbackLocker ) { - this->notify.fdWasDestroyed ( sock ); + this->notify.fdWasDestroyed ( sock ); +} + +void cac::tcpCircuitShutdown ( tcpiiu & iiu, bool discardPendingMessages ) +{ + // generate some NOOP UDP traffic so that ca_pend_event() + // will get called in preemptive callback disabled + // applications, and therefore the callback lock below + // will not block + { + epicsAutoMutex autoMutexCAC ( this->mutex ); + if ( this->pudpiiu ) { + this->pudpiiu->wakeupMsg (); + } } + callbackAutoMutex autoMutexCB ( *this ); + epicsAutoMutex autoMutexCAC ( this->mutex ); + iiu.shutdown ( autoMutexCB, discardPendingMessages ); } void cac::uninstallIIU ( tcpiiu & iiu ) { - epicsAutoMutex autoMutexCB ( this->callbackMutex ); - this->privateUninstallIIU ( iiu ); + callbackAutoMutex cbLocker ( *this ); + this->privateUninstallIIU ( cbLocker, iiu ); } -void cac::privateUninstallIIU ( tcpiiu & iiu ) +void cac::privateUninstallIIU ( callbackAutoMutex & cbMutex, tcpiiu & iiu ) { epicsAutoMutex autoMutexCAC ( this->mutex ); if ( iiu.channelCount() ) { @@ -1843,7 +1921,7 @@ void cac::privateUninstallIIU ( tcpiiu & iiu ) this->serverTable.remove ( iiu ); assert ( this->pudpiiu ); - this->removeAllChan ( iiu, this->pudpiiu ); + this->removeAllChan ( cbMutex, autoMutexCAC, iiu, *this->pudpiiu ); delete &iiu; @@ -1873,7 +1951,7 @@ void cac::preemptiveCallbackUnlock () assert ( this->recvThreadsPendingCount > 0 ); this->recvThreadsPendingCount--; if ( this->pCallbackLocker ) { - if ( this->recvThreadsPendingCount == 1u ) { + if ( this->recvThreadsPendingCount == 0u ) { signalRequired = true; } else { @@ -1905,12 +1983,3 @@ double cac::beaconPeriod ( const nciu & chan ) const } return - DBL_MAX; } - -void cac::udpWakeup () -{ - epicsAutoMutex locker ( this->mutex ); - if ( this->pudpiiu ) { - this->pudpiiu->wakeupMsg (); - } -} -