changed channel and subscription uninstal procedures

This commit is contained in:
Jeff Hill
2002-02-28 00:20:31 +00:00
parent 18e9d2a299
commit 1a1c884fa9

View File

@@ -152,7 +152,7 @@ extern "C" void cacOnceFunc ( void * )
// cac::cac ()
//
cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
ipToAEngine ( "caIPAddrToAsciiEngine" ),
ipToAEngine ( "dnsQuery" ),
programBeginTime ( epicsTime::getCurrent() ),
connTMO ( CA_CONN_VERIFY_PERIOD ),
timerQueue ( epicsTimerQueueActive::allocate ( false,
@@ -261,9 +261,12 @@ cac::~cac ()
{
//
// release callback lock
// (disconnect callbacks will occur if they still have connected
// channels at this point)
//
delete this->pCallbackLocker;
//
// lock intentionally not held here so that we dont deadlock
// waiting for the UDP thread to exit while it is waiting to
@@ -276,9 +279,11 @@ cac::~cac ()
//
// shutdown all tcp connections
// (take both locks here in the proper order to avoid deadlocks)
//
{
epicsAutoMutex autoMutex ( this->mutex );
epicsAutoMutex autoMutexCB ( this->callbackMutex );
epicsAutoMutex autoMutexCAC ( this->mutex );
this->serverTable.traverse ( & tcpiiu::cleanShutdown );
}
@@ -289,17 +294,22 @@ cac::~cac ()
this->iiuUninstall.wait ();
}
// (after this point all threads that know about this object
// have shut down so we no longer need to lock)
delete this->pRepeaterSubscribeTmr;
delete this->pSearchTmr;
freeListCleanup ( this->tcpSmallRecvBufFreeList );
freeListCleanup ( this->tcpLargeRecvBufFreeList );
{
epicsAutoMutex autoMutexCB ( this->callbackMutex );
epicsAutoMutex autoMutexCAC ( this->mutex );
if ( this->pudpiiu ) {
this->removeAllChan ( *this->pudpiiu );
if ( this->pudpiiu ) {
while ( nciu *pChan = this->pudpiiu->firstChannel() ) {
{
callbackAutoMutex autoMutexCB ( *this );
this->pudpiiu->detachChannel ( autoMutexCB, *pChan );
}
pChan->disconnect ( limboIIU );
limboIIU.attachChannel ( *pChan );
}
}
@@ -308,17 +318,17 @@ cac::~cac ()
this->beaconTable.traverse ( &bhe::destroy );
// its ok for channels and subscriptions to still
// exist at this point. The user created them and
// its his responsibility to clean them up.
osiSockRelease ();
this->timerQueue.release ();
// its ok for channels and subscriptions to still
// exist at this point. The user created them and
// its his responsibility to clean them up.
}
// must have callback lock and also cac lock
void cac::removeAllChan ( netiiu & srcIIU, netiiu *pDstIIU )
void cac::removeAllChan ( callbackAutoMutex & cbLocker, epicsAutoMutex & locker,
netiiu & srcIIU, netiiu & dstIIU )
{
// we are protected here because channel delete takes the callback mutex
while ( nciu *pChan = srcIIU.firstChannel() ) {
@@ -328,7 +338,7 @@ void cac::removeAllChan ( netiiu & srcIIU, netiiu *pDstIIU )
if ( pChan->connected() ) {
srcIIU.clearChannelRequest ( pChan->getSID(), pChan->getCID() );
}
this->disconnectChannelPrivate ( *pChan, pDstIIU );
this->disconnectChannelPrivate ( cbLocker, locker, *pChan, dstIIU );
}
}
@@ -542,7 +552,7 @@ int cac::pendIO ( const double & timeout )
}
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoRelease ( this->callbackMutex );
callbackAutoMutexRelease autoRelease ( *this->pCallbackLocker );
this->ioDone.wait ( remaining );
}
else {
@@ -573,7 +583,7 @@ int cac::pendIO ( const double & timeout )
int cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout )
{
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
callbackAutoMutexRelease autoMutexRelease ( *this->pCallbackLocker );
event.wait ( timeout );
}
else {
@@ -605,11 +615,10 @@ int cac::pendEvent ( const double & timeout )
this->flushRequestPrivate ();
}
// process at least once if preemptive callback
// isnt enabled
// process at least once if preemptive callback is disabled
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
while ( this->recvThreadsPendingCount > 1 ) {
callbackAutoMutexRelease autoMutexRelease ( *this->pCallbackLocker );
while ( this->recvThreadsPendingCount > 0 ) {
this->noRecvThreadsPending.wait ();
}
}
@@ -626,7 +635,7 @@ int cac::pendEvent ( const double & timeout )
if ( delay >= CAC_SIGNIFICANT_DELAY ) {
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
callbackAutoMutexRelease autoMutexRelease ( *this->pCallbackLocker );
epicsThreadSleep ( delay );
}
else {
@@ -715,7 +724,8 @@ bool cac::setupUDP ()
epicsAutoMutex autoMutex ( this->mutex );
if ( ! this->pudpiiu ) {
this->pudpiiu = new udpiiu ( *this );
callbackAutoMutex cbMutex ( *this );
this->pudpiiu = new udpiiu ( cbMutex, *this );
if ( ! this->pudpiiu ) {
return false;
}
@@ -745,12 +755,14 @@ void cac::repeaterSubscribeConfirmNotify ()
}
}
bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
bool cac::lookupChannelAndTransferToTCP (
callbackAutoMutex & cbMutex, unsigned cid, unsigned sid,
ca_uint16_t typeCode, arrayElementCount count,
unsigned minorVersionNumber, const osiSockAddr & addr,
const epicsTime & currentTime )
{
tcpiiu * pnewiiu = 0;
unsigned short retrySeqNumber = 0u;
if ( addr.sa.sa_family != AF_INET ) {
return false;
@@ -769,7 +781,7 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
return true;
}
unsigned short retrySeqNumber = chan->getRetrySeqNo ();
retrySeqNumber = chan->getRetrySeqNo ();
/*
* Ignore duplicate search replies
@@ -819,7 +831,7 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
}
}
this->pudpiiu->detachChannel ( *chan );
this->pudpiiu->detachChannel ( cbMutex, *chan );
chan->searchReplySetUp ( *piiu, sid, typeCode, count );
piiu->attachChannel ( *chan );
@@ -835,10 +847,6 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
// resubscribe for monitors from this channel
this->connectAllIO ( *chan );
}
if ( this->pSearchTmr ) {
this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime );
}
}
if ( ! v42Ok ) {
@@ -864,16 +872,18 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
}
if ( pnewiiu ) {
// this is done here after we release the priamry
// lock so that we will hold the callback lock but
// not the primary lock when the fd is registered
// with the user
bool success = pnewiiu->start ();
bool success = pnewiiu->start ( cbMutex );
if ( ! success ) {
this->privateUninstallIIU ( *pnewiiu );
this->privateUninstallIIU ( cbMutex, *pnewiiu );
}
}
if ( this->pSearchTmr ) {
// deadlock can result if this is called while holding the primary
// mutex (because the primary mutex is used in the search timer callback)
this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime );
}
return true;
}
@@ -887,61 +897,93 @@ void cac::uninstallChannel ( nciu & chan )
// we do not dead lock
{
epicsAutoMutex autoMutex ( this->mutex );
// if the send backlog is too high send some frames before we get entagled
// in the channel shutdown sequence below. There is special protection in
// this routine that releases the callback lock if we are already holding it
// when this is the tcp receive thread or if this is the main thread and
// preemptive callback is disabled.
this->flushIfRequired ( *chan.getPIIU() );
// unregister the channel
if ( this->chanTable.remove ( chan ) != &chan ) {
errlogPrintf (
"CAC: Attemt to uninstall unregisterred channel ID=%u ignored.\n",
chan.getId () );
return;
}
// for each outstanding IO
//
// IO must be removed from the list and also uninstalled while holding the
// lock so that ioCancel() does not break in while postponing the destroy
// waiting for outstanding callbacks to complete
//
while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) {
tmpList.add ( *pIO );
this->ioTable.remove ( *pIO );
// unregister IO class
if ( pIO != this->ioTable.remove ( *pIO ) ) {
errlogPrintf (
"CAC: Unregister IO ID=%u found when uninstalling channel?\n",
pIO->getId () );
continue;
}
// connected subscriptions must be canceled in the server
class netSubscription *pSubscr = pIO->isSubscription ();
if ( pSubscr && chan.connected() ) {
// we will deadlock if we hold the callback lock here
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
}
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
// If they call ioCancel() here it will be ignored
// because the IO has been unregistered above
pIO->exception ( ECA_CHANDESTROY, chan.pName() );
}
tmpList.add ( *pIO );
}
nciu * pChan = this->chanTable.remove ( chan );
assert ( pChan == &chan );
// if the claim reply has not returned yet then we will issue
// the clear channel request to the server when the claim reply
// arrives and there is no matching nciu in the client
if ( pChan->connected() ) {
if ( chan.connected() ) {
chan.getPIIU()->clearChannelRequest ( chan.getSID(), chan.getCID() );
}
chan.getPIIU()->detachChannel ( chan );
}
// take callback lock briefly to ensure that any callbacks in
// progress complete prior to destroying channel and any associated IO
// take callback lock to ensure that any callbacks in
// progress complete prior to destroying channel and any
// associated IO
//
// If this is a callback thread then it already owns the
// CB lock at this point. If this is the main thread then we
// are not in pendEvent, pendIO, SG block, etc and
// this->pCallbackLocker protects. Otherwise if this is
// a preemptive callback enabled app then this->pCallbackLocker
// CB lock at this point. If this is the main thread then
// it is possible that this->pCallbackLocker already owns
// the callback lock. Otherwise if this is a preemptive
// callback enabled app then this->pCallbackLocker
// isnt set and we must take the call back lock.
//
// must _not_ hold callback lock while sending channel and subscription
// cancel requests above unless.
// 1) this is a recv thread
// 2) this is the thread that initialized the library and preemptive
// callback is disabled
bool alreadyLocked = epicsThreadPrivateGet ( caClientCallbackThreadId )
|| this->pCallbackLocker;
if ( ! alreadyLocked ) {
// taking this mutex priior to deleting the IO and channel guarantees
// that we will not delete a channel out from under a callback
epicsAutoMutex autoCallbackMutex ( this->callbackMutex );
}
// destroy subsiderary IO now that it is safe to do so
// We take the callback lock again here and rely on recursive
// mutex capabilities, but perhaps this should be handled
// differently in the future.
//
// bool alreadyLocked = epicsThreadPrivateGet ( caClientCallbackThreadId )
// || this->pCallbackLocker;
{
epicsAutoMutex autoMutex ( this->mutex );
// taking this mutex prior to deleting the IO and channel guarantees
// that we will not delete a channel out from under a callback
callbackAutoMutex cbLocker ( *this );
// destroy subsiderary IO now that it is safe to do so
while ( baseNMIU *pIO = tmpList.get() ) {
// If they call ioCancel() here it will be ignored
// because the IO has been unregistered above.
// This must be done after outstanding callbacks
// for this channel have completed.
pIO->exception ( ECA_CHANDESTROY, chan.pName() );
pIO->destroy ( *this );
}
// this must be done after the following
// o subscription cancel requests
// o clear channel request
// o outstanding callbacks using this channel have completed
// o chan destroy exception has been delivered
epicsAutoMutex autoMutex ( this->mutex );
// this destroys the tcpiiu if its the last channel
chan.getPIIU()->detachChannel ( cbLocker, chan );
}
}
@@ -1037,6 +1079,29 @@ cac::readNotifyRequest ( nciu &chan, unsigned type, // X aCC 361
void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
{
baseNMIU * pmiu;
// unistall the IO object so that a receive thread will not find it,
// but do _not_ hold the callback lock here because this could result
// in deadlock
{
epicsAutoMutex autoMutex ( this->mutex );
pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
}
class netSubscription *pSubscr = pmiu->isSubscription ();
if ( pSubscr ) {
this->flushIfRequired ( *chan.getPIIU() );
if ( chan.connected() ) {
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
}
}
// must be uninstalled and also removed from the table
// while holding the lock to prevent a channel delete
// from destroying this IO object after we release the lock
chan.cacPrivateListOfIO::eventq.remove ( *pmiu );
}
// wait for any IO callbacks in progress to complete
// prior to destroying the IO object
//
@@ -1046,31 +1111,14 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
// this->pCallbackLocker protects. Otherwise if this id
// the users auxillary thread then this->pCallbackLocker
// isnt set and we must take the call back lock.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
this->ioCancelPrivate ( chan, id );
}
else if ( this->pCallbackLocker ) {
this->ioCancelPrivate ( chan, id );
}
else {
bool alreadyLocked = ( epicsThreadPrivateGet ( caClientCallbackThreadId )
|| this->pCallbackLocker );
if ( ! alreadyLocked ) {
epicsAutoMutex autoMutex ( this->callbackMutex );
this->ioCancelPrivate ( chan, id );
}
}
void cac::ioCancelPrivate ( nciu & chan, const cacChannel::ioid & id )
{
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
chan.cacPrivateListOfIO::eventq.remove ( *pmiu );
class netSubscription *pSubscr = pmiu->isSubscription ();
if ( pSubscr ) {
this->flushIfRequired ( *chan.getPIIU() );
if ( chan.connected() ) {
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
}
}
// now it is safe to destroy the IO object
{
epicsAutoMutex autoMutex ( this->mutex );
pmiu->destroy ( *this );
}
}
@@ -1167,7 +1215,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id )
// it is in use here.
//
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
epicsAutoMutexRelease autoMutexRelease ( autoMutex );
pmiu->completion ();
}
@@ -1191,7 +1239,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id,
// it is in use here.
//
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
epicsAutoMutexRelease autoMutexRelease ( autoMutex );
pmiu->completion ( type, count, pData );
}
@@ -1215,7 +1263,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
// it is in use here.
//
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
epicsAutoMutexRelease autoMutexRelease ( autoMutex );
pmiu->exception ( status, pContext );
}
@@ -1240,7 +1288,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
//
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
epicsAutoMutexRelease autoMutexRelease ( autoMutex );
pmiu->exception ( status, pContext, type, count );
}
@@ -1272,7 +1320,7 @@ void cac::connectAllIO ( nciu & chan )
// cancel IO operations and monitor subscriptions
// -- callback lock and cac lock must be applied here
void cac::disconnectAllIO ( nciu & chan, bool enableCallbacks )
void cac::disconnectAllIO ( epicsAutoMutex &locker, nciu & chan, bool enableCallbacks )
{
tsDLIterBD<baseNMIU> pNetIO = chan.cacPrivateListOfIO::eventq.firstIter();
while ( pNetIO.valid() ) {
@@ -1286,7 +1334,7 @@ void cac::disconnectAllIO ( nciu & chan, bool enableCallbacks )
if ( enableCallbacks ) {
char buf[128];
sprintf ( buf, "host = %100s", chan.pHostName() );
epicsAutoMutexRelease unlocker ( this->mutex );
epicsAutoMutexRelease unlocker ( locker );
pNetIO->exception ( ECA_DISCONN, buf );
}
if ( ! pNetIO->isSubscription() ) {
@@ -1337,17 +1385,20 @@ cac::subscriptionRequest ( nciu &chan, unsigned type, // X aCC 361
}
}
bool cac::noopAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
bool cac::noopAction ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &, void * /* pMsgBdy */ )
{
return true;
}
bool cac::echoRespAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
bool cac::echoRespAction ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &, void * /* pMsgBdy */ )
{
return true;
}
bool cac::writeNotifyRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
bool cac::writeNotifyRespAction ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
{
int caStatus = hdr.m_cid;
if ( caStatus == ECA_NORMAL ) {
@@ -1360,7 +1411,8 @@ bool cac::writeNotifyRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /
return true;
}
bool cac::readNotifyRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
bool cac::readNotifyRespAction ( callbackAutoMutex &, tcpiiu &iiu,
const caHdrLargeArray &hdr, void *pMsgBdy )
{
/*
@@ -1400,7 +1452,8 @@ bool cac::readNotifyRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *
return true;
}
bool cac::eventRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
bool cac::eventRespAction (callbackAutoMutex &, tcpiiu &iiu,
const caHdrLargeArray &hdr, void *pMsgBdy )
{
int caStatus;
@@ -1448,19 +1501,22 @@ bool cac::eventRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgB
return true;
}
bool cac::readRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void *pMsgBdy )
bool cac::readRespAction ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &hdr, void *pMsgBdy )
{
this->ioCompletionNotifyAndDestroy ( hdr.m_available,
hdr.m_dataType, hdr.m_count, pMsgBdy );
return true;
}
bool cac::clearChannelRespAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
bool cac::clearChannelRespAction ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &, void * /* pMsgBdy */ )
{
return true; // currently a noop
}
bool cac::defaultExcep ( tcpiiu &iiu, const caHdrLargeArray &,
bool cac::defaultExcep ( callbackAutoMutex &, tcpiiu &iiu,
const caHdrLargeArray &,
const char *pCtx, unsigned status )
{
char buf[512];
@@ -1471,7 +1527,8 @@ bool cac::defaultExcep ( tcpiiu &iiu, const caHdrLargeArray &,
return true;
}
bool cac::eventAddExcep ( tcpiiu & /* iiu */, const caHdrLargeArray &hdr,
bool cac::eventAddExcep ( callbackAutoMutex &, tcpiiu & /* iiu */,
const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
this->ioExceptionNotify ( hdr.m_available, status, pCtx,
@@ -1479,7 +1536,8 @@ bool cac::eventAddExcep ( tcpiiu & /* iiu */, const caHdrLargeArray &hdr,
return true;
}
bool cac::readExcep ( tcpiiu &, const caHdrLargeArray &hdr,
bool cac::readExcep ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
this->ioExceptionNotifyAndDestroy ( hdr.m_available,
@@ -1487,7 +1545,8 @@ bool cac::readExcep ( tcpiiu &, const caHdrLargeArray &hdr,
return true;
}
bool cac::writeExcep ( tcpiiu &, const caHdrLargeArray &hdr,
bool cac::writeExcep ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
nciu * pChan = this->chanTable.lookup ( hdr.m_available );
@@ -1498,7 +1557,8 @@ bool cac::writeExcep ( tcpiiu &, const caHdrLargeArray &hdr,
return true;
}
bool cac::readNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr,
bool cac::readNotifyExcep ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
this->ioExceptionNotifyAndDestroy ( hdr.m_available,
@@ -1506,7 +1566,8 @@ bool cac::readNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr,
return true;
}
bool cac::writeNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr,
bool cac::writeNotifyExcep ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
this->ioExceptionNotifyAndDestroy ( hdr.m_available,
@@ -1514,7 +1575,8 @@ bool cac::writeNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr,
return true;
}
bool cac::exceptionRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
bool cac::exceptionRespAction ( callbackAutoMutex & cbMutex, tcpiiu & iiu,
const caHdrLargeArray & hdr, void * pMsgBdy )
{
const caHdr * pReq = reinterpret_cast < const caHdr * > ( pMsgBdy );
unsigned bytesSoFar = sizeof ( *pReq );
@@ -1550,10 +1612,11 @@ bool cac::exceptionRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *p
pStub = cac::tcpExcepJumpTableCAC [req.m_cmmd];
}
const char *pCtx = reinterpret_cast < const char * > ( pLW );
return ( this->*pStub ) ( iiu, req, pCtx, hdr.m_available );
return ( this->*pStub ) ( cbMutex, iiu, req, pCtx, hdr.m_available );
}
bool cac::accessRightsRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
bool cac::accessRightsRespAction ( callbackAutoMutex &, tcpiiu &,
const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
{
nciu * pChan;
{
@@ -1580,7 +1643,7 @@ bool cac::accessRightsRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void *
return true;
}
bool cac::claimCIURespAction ( tcpiiu & iiu,
bool cac::claimCIURespAction ( callbackAutoMutex &, tcpiiu & iiu,
const caHdrLargeArray & hdr, void * /*pMsgBdy */ )
{
nciu * pChan;
@@ -1613,7 +1676,8 @@ bool cac::claimCIURespAction ( tcpiiu & iiu,
return true;
}
bool cac::verifyAndDisconnectChan ( tcpiiu & /* iiu */,
bool cac::verifyAndDisconnectChan (
callbackAutoMutex & cbMutex, tcpiiu & /* iiu */,
const caHdrLargeArray & hdr, void * /* pMsgBdy */ )
{
epicsAutoMutex autoMutex ( this->mutex );
@@ -1622,31 +1686,31 @@ bool cac::verifyAndDisconnectChan ( tcpiiu & /* iiu */,
return true;
}
assert ( this->pudpiiu );
this->disconnectChannelPrivate ( *pChan, this->pudpiiu );
this->disconnectChannelPrivate ( cbMutex, autoMutex,
*pChan, *this->pudpiiu );
this->pSearchTmr->resetPeriod ( 0.0 );
return true;
}
// callback lock and cac lock must be applied
void cac::disconnectChannelPrivate ( nciu & chan, netiiu *pDstIIU )
void cac::disconnectChannelPrivate ( callbackAutoMutex & cbLocker,
epicsAutoMutex & locker,
nciu & chan, netiiu & dstIIU )
{
this->disconnectAllIO ( chan, true );
chan.getPIIU()->detachChannel ( chan );
this->disconnectAllIO ( locker, chan, true );
chan.getPIIU()->detachChannel ( cbLocker, chan );
chan.disconnect ( limboIIU );
limboIIU.attachChannel ( chan );
if ( pDstIIU ) {
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
{
epicsAutoMutexRelease autoMutexRelease ( locker );
chan.connectStateNotify ();
chan.accessRightsNotify ();
}
if ( pDstIIU ) {
limboIIU.detachChannel ( chan );
chan.disconnect ( *pDstIIU );
pDstIIU->attachChannel ( chan );
}
limboIIU.detachChannel ( cbLocker, chan );
chan.disconnect ( dstIIU );
dstIIU.attachChannel ( chan );
}
bool cac::badTCPRespAction ( tcpiiu & iiu,
bool cac::badTCPRespAction ( callbackAutoMutex &, tcpiiu & iiu,
const caHdrLargeArray & hdr, void * /* pMsgBdy */ )
{
char hostName[64];
@@ -1656,7 +1720,8 @@ bool cac::badTCPRespAction ( tcpiiu & iiu,
return false;
}
bool cac::executeResponse ( tcpiiu &iiu, caHdrLargeArray &hdr, char *pMshBody )
bool cac::executeResponse ( callbackAutoMutex & cbLocker, tcpiiu & iiu,
caHdrLargeArray & hdr, char * pMshBody )
{
// execute the response message
pProtoStubTCP pStub;
@@ -1666,7 +1731,7 @@ bool cac::executeResponse ( tcpiiu &iiu, caHdrLargeArray &hdr, char *pMshBody )
else {
pStub = cac::tcpJumpTableCAC [hdr.m_cmmd];
}
return ( this->*pStub ) ( iiu, hdr, pMshBody );
return ( this->*pStub ) ( cbLocker, iiu, hdr, pMshBody );
}
void cac::signal ( int ca_status, const char *pfilenm,
@@ -1801,27 +1866,40 @@ void cac::selfTest () const
this->beaconTable.verify ();
}
void cac::notifyNewFD ( SOCKET sock ) const
void cac::notifyNewFD ( callbackAutoMutex &, SOCKET sock ) const
{
if ( this->pCallbackLocker ) {
this->notify.fdWasCreated ( sock );
}
this->notify.fdWasCreated ( sock );
}
void cac::notifyDestroyFD ( SOCKET sock ) const
void cac::notifyDestroyFD ( callbackAutoMutex &, SOCKET sock ) const
{
if ( this->pCallbackLocker ) {
this->notify.fdWasDestroyed ( sock );
this->notify.fdWasDestroyed ( sock );
}
void cac::tcpCircuitShutdown ( tcpiiu & iiu, bool discardPendingMessages )
{
// generate some NOOP UDP traffic so that ca_pend_event()
// will get called in preemptive callback disabled
// applications, and therefore the callback lock below
// will not block
{
epicsAutoMutex autoMutexCAC ( this->mutex );
if ( this->pudpiiu ) {
this->pudpiiu->wakeupMsg ();
}
}
callbackAutoMutex autoMutexCB ( *this );
epicsAutoMutex autoMutexCAC ( this->mutex );
iiu.shutdown ( autoMutexCB, discardPendingMessages );
}
void cac::uninstallIIU ( tcpiiu & iiu )
{
epicsAutoMutex autoMutexCB ( this->callbackMutex );
this->privateUninstallIIU ( iiu );
callbackAutoMutex cbLocker ( *this );
this->privateUninstallIIU ( cbLocker, iiu );
}
void cac::privateUninstallIIU ( tcpiiu & iiu )
void cac::privateUninstallIIU ( callbackAutoMutex & cbMutex, tcpiiu & iiu )
{
epicsAutoMutex autoMutexCAC ( this->mutex );
if ( iiu.channelCount() ) {
@@ -1843,7 +1921,7 @@ void cac::privateUninstallIIU ( tcpiiu & iiu )
this->serverTable.remove ( iiu );
assert ( this->pudpiiu );
this->removeAllChan ( iiu, this->pudpiiu );
this->removeAllChan ( cbMutex, autoMutexCAC, iiu, *this->pudpiiu );
delete &iiu;
@@ -1873,7 +1951,7 @@ void cac::preemptiveCallbackUnlock ()
assert ( this->recvThreadsPendingCount > 0 );
this->recvThreadsPendingCount--;
if ( this->pCallbackLocker ) {
if ( this->recvThreadsPendingCount == 1u ) {
if ( this->recvThreadsPendingCount == 0u ) {
signalRequired = true;
}
else {
@@ -1905,12 +1983,3 @@ double cac::beaconPeriod ( const nciu & chan ) const
}
return - DBL_MAX;
}
void cac::udpWakeup ()
{
epicsAutoMutex locker ( this->mutex );
if ( this->pudpiiu ) {
this->pudpiiu->wakeupMsg ();
}
}