diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 4ca37ba77..8b04c5d58 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -15,7 +15,6 @@ #include #include "epicsMemory.h" -#include "epicsMutex.h" #include "epicsGuard.h" #include "osiProcess.h" #include "osiSigPipeIgnore.h" @@ -30,34 +29,11 @@ #include "syncGroup.h" #include "nciu.h" #include "autoPtrRecycle.h" -#include "searchTimer.h" -#include "repeaterSubscribeTimer.h" #include "msgForMultiplyDefinedPV.h" #include "udpiiu.h" #include "bhe.h" #include "net_convert.h" -#ifdef _MSC_VER -# pragma warning ( push ) -# pragma warning ( disable:4660 ) -#endif - -template class resTable < nciu, chronIntId >; -template class chronIntIdResTable < nciu >; -template class resTable < baseNMIU, chronIntId >; -template class chronIntIdResTable < baseNMIU >; -template class resTable < CASG, chronIntId >; -template class chronIntIdResTable < CASG >; -template class resTable < bhe, inetAddrID >; -template class resTable < tcpiiu, caServerID >; -template class tsFreeList < netReadNotifyIO, 1024, epicsMutexNOOP >; -template class tsFreeList < netWriteNotifyIO, 1024, epicsMutexNOOP >; -template class tsFreeList < netSubscription, 1024, epicsMutexNOOP >; - -#ifdef _MSC_VER -# pragma warning ( pop ) -#endif - // TCP response dispatch table const cac::pProtoStubTCP cac::tcpJumpTableCAC [] = { @@ -137,12 +113,8 @@ extern "C" void cacExitHandler () extern "C" void cacOnceFunc ( void * ) { caClientCallbackThreadId = epicsThreadPrivateCreate (); - if ( caClientCallbackThreadId ) { - atexit ( cacExitHandler ); - } - else { - throw std::bad_alloc (); - } + assert ( caClientCallbackThreadId ); + atexit ( cacExitHandler ); } // @@ -157,8 +129,6 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) : lowestPriorityLevelAbove(epicsThreadGetPrioritySelf()) ) ), pUserName ( 0 ), pudpiiu ( 0 ), - pSearchTmr ( 0 ), - pRepeaterSubscribeTmr ( 0 ), tcpSmallRecvBufFreeList ( 0 ), tcpLargeRecvBufFreeList ( 0 ), pCallbackGuard ( 0 ), @@ -265,11 +235,10 @@ cac::~cac () // lock intentionally not held here so that we dont deadlock // waiting for the UDP thread to exit while it is waiting to // get the lock. - if ( this->pudpiiu ) { - // this blocks until the UDP thread exits so that - // it will not sneak in any new clients - this->pudpiiu->shutdown (); - } + + // this blocks until the UDP thread exits so that + // it will not sneak in any new clients + delete this->pudpiiu; // // shutdown all tcp connections @@ -277,7 +246,7 @@ cac::~cac () // { epicsGuard < callbackMutex > autoMutexCB ( this->cbMutex ); - epicsGuard < epicsMutex > autoMutexCAC ( this->mutex ); + epicsGuard < cacMutex > autoMutexCAC ( this->mutex ); this->serverTable.traverse ( & tcpiiu::cleanShutdown ); } @@ -288,26 +257,14 @@ cac::~cac () this->iiuUninstall.wait (); } - // (after this point all threads that know about this object - // have shut down so we no longer need to lock) - delete this->pRepeaterSubscribeTmr; - delete this->pSearchTmr; - freeListCleanup ( this->tcpSmallRecvBufFreeList ); freeListCleanup ( this->tcpLargeRecvBufFreeList ); if ( this->pudpiiu ) { - while ( nciu *pChan = this->pudpiiu->firstChannel() ) { - { - epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); - this->pudpiiu->detachChannel ( cbGuard, *pChan ); - } - pChan->disconnect ( limboIIU ); - limboIIU.attachChannel ( *pChan ); - } + epicsGuard < callbackMutex > autoMutexCB ( this->cbMutex ); + this->pudpiiu->removeAllChannels ( autoMutexCB ); } - delete this->pudpiiu; delete [] this->pUserName; this->beaconTable.traverse ( &bhe::destroy ); @@ -321,21 +278,6 @@ cac::~cac () // its his responsibility to clean them up. } -void cac::removeAllChan ( epicsGuard < callbackMutex > & cbLocker, epicsGuard < epicsMutex > & locker, - netiiu & srcIIU, netiiu & dstIIU ) -{ - // we are protected here because channel delete takes the callback mutex - while ( nciu *pChan = srcIIU.firstChannel() ) { - // if the claim reply has not returned then we will issue - // the clear channel request to the server when the claim reply - // arrives and there is no matching nciu in the client - if ( pChan->connected() ) { - srcIIU.clearChannelRequest ( pChan->getSID(), pChan->getCID() ); - } - this->disconnectChannelPrivate ( cbLocker, locker, *pChan, dstIIU ); - } -} - unsigned cac::lowestPriorityLevelAbove ( unsigned priority ) { unsigned abovePriority; @@ -365,7 +307,7 @@ unsigned cac::highestPriorityLevelBelow ( unsigned priority ) // void cac::flushRequest () { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); this->flushRequestPrivate (); } @@ -377,13 +319,13 @@ void cac::flushRequestPrivate () unsigned cac::connectionCount () const { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); return this->serverTable.numEntriesInstalled (); } void cac::show ( unsigned level ) const { - epicsGuard < epicsMutex > autoMutex2 ( this->mutex ); + epicsGuard < cacMutex > autoMutex2 ( this->mutex ); ::printf ( "Channel Access Client Context at %p for user %s\n", static_cast ( this ), this->pUserName ); @@ -417,14 +359,6 @@ void cac::show ( unsigned level ) const this->beaconTable.show ( level - 3u ); ::printf ( "Timer queue:\n" ); this->timerQueue.show ( level - 3u ); - if ( this->pSearchTmr ) { - ::printf ( "search message timer:\n" ); - this->pSearchTmr->show ( level - 3u ); - } - if ( this->pRepeaterSubscribeTmr ) { - ::printf ( "repeater subscribee timer:\n" ); - this->pRepeaterSubscribeTmr->show ( level - 3u ); - } ::printf ( "IP address to name conversion engine:\n" ); this->ipToAEngine.show ( level - 3u ); ::printf ( "\tthe current read sequence number is %u\n", @@ -447,7 +381,7 @@ void cac::show ( unsigned level ) const void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime, unsigned beaconNumber, unsigned protocolRevision ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); if ( ! this->pudpiiu ) { return; @@ -483,28 +417,7 @@ void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime, return; } - /* - * This part is needed when many machines - * have channels in a disconnected state that - * dont exist anywhere on the network. This insures - * that we dont have many CA clients synchronously - * flooding the network with broadcasts (and swamping - * out requests for valid channels). - * - * I fetch the local UDP port number and use the low - * order bits as a pseudo random delay to prevent every - * one from replying at once. - */ - if ( this->pSearchTmr ) { - static const double portTicksPerSec = 1000u; - static const unsigned portBasedDelayMask = 0xff; - unsigned port = this->pudpiiu->getPort (); - double delay = ( port & portBasedDelayMask ); - delay /= portTicksPerSec; - this->pSearchTmr->resetPeriod ( delay ); - } - - this->pudpiiu->resetChannelRetryCounts (); + this->pudpiiu->beaconAnomalyNotify (); # if DEBUG { @@ -535,7 +448,7 @@ int cac::pendIO ( const double & timeout ) double remaining = timeout; { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); this->flushRequestPrivate (); } @@ -563,12 +476,13 @@ int cac::pendIO ( const double & timeout ) } { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); this->readSeq++; this->pndRecvCnt = 0u; - if ( this->pudpiiu ) { - this->pudpiiu->connectTimeoutNotify (); - } + } + + if ( this->pudpiiu ) { + this->pudpiiu->pendioTimeoutNotify (); } return status; @@ -605,7 +519,7 @@ int cac::pendEvent ( const double & timeout ) epicsTime current = epicsTime::getCurrent (); { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); this->flushRequestPrivate (); } @@ -640,19 +554,19 @@ int cac::pendEvent ( const double & timeout ) void cac::installCASG ( CASG &sg ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); this->sgTable.add ( sg ); } void cac::uninstallCASG ( CASG &sg ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); this->sgTable.remove ( sg ); } CASG * cac::lookupCASG ( unsigned id ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); CASG * psg = this->sgTable.lookup ( id ); if ( psg ) { if ( ! psg->verify () ) { @@ -684,74 +598,27 @@ cacChannel & cac::createChannel ( const char * pName, if ( ! pIO ) { pIO = pGlobalServiceListCAC->createChannel ( pName, chan, pri ); if ( ! pIO ) { - if ( ! this->pudpiiu || ! this->pSearchTmr ) { - if ( ! this->setupUDP () ) { - throw ECA_INTERNAL; + if ( ! this->pudpiiu ) { + epicsGuard < cacMutex > guard ( this->mutex ); + if ( ! this->pudpiiu ) { + this->pudpiiu = new udpiiu ( this->timerQueue, this->cbMutex, *this ); } } - epics_auto_ptr < cacChannel > pNetChan - ( new nciu ( *this, limboIIU, chan, pName, pri ) ); + epicsGuard < cacMutex > guard ( this->mutex ); + epics_auto_ptr < nciu > pNetChan + ( new nciu ( *this, *this->pudpiiu, chan, pName, pri ) ); + pNetChan->notifyStateChangeFirstConnectInCountOfOutstandingIO (); + this->chanTable.add ( *pNetChan ); return *pNetChan.release (); } } return *pIO; } -void cac::installNetworkChannel ( nciu & chan, netiiu * & piiu ) -{ - epicsGuard < epicsMutex > autoMutex ( this->mutex ); - this->chanTable.add ( chan ); - this->pudpiiu->attachChannel ( chan ); - piiu = this->pudpiiu; - this->pSearchTmr->resetPeriod ( 0.0 ); -} - -bool cac::setupUDP () -{ - if ( ! this->pudpiiu ) { - udpiiu *piiu = 0; - epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); - { - epicsGuard < epicsMutex > guard ( this->mutex ); - if ( ! this->pudpiiu ) { - piiu = this->pudpiiu = new udpiiu ( this->cbMutex, *this ); - if ( ! this->pudpiiu ) { - return false; - } - } - } - if ( piiu ) { - piiu->start ( cbGuard ); - } - } - - if ( ! this->pSearchTmr ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); - if ( ! this->pSearchTmr ) { - this->pSearchTmr = new searchTimer ( *this->pudpiiu, this->timerQueue, this->mutex ); - if ( ! this->pSearchTmr ) { - return false; - } - } - } - - if ( ! this->pRepeaterSubscribeTmr ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); - if ( ! this->pRepeaterSubscribeTmr ) { - this->pRepeaterSubscribeTmr = new repeaterSubscribeTimer ( *this->pudpiiu, this->timerQueue ); - if ( ! this->pRepeaterSubscribeTmr ) { - return false; - } - } - } - - return true; -} - void cac::repeaterSubscribeConfirmNotify () { - if ( this->pRepeaterSubscribeTmr ) { - this->pRepeaterSubscribeTmr->confirmNotify (); + if ( this->pudpiiu ) { + this->pudpiiu->repeaterConfirmNotify (); } } @@ -769,30 +636,30 @@ bool cac::lookupChannelAndTransferToTCP ( } bool v41Ok, v42Ok; - nciu *chan; + nciu *pChan; { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); /* * ignore search replies for deleted channels */ - chan = this->chanTable.lookup ( cid ); - if ( ! chan ) { + pChan = this->chanTable.lookup ( cid ); + if ( ! pChan ) { return true; } - retrySeqNumber = chan->getRetrySeqNo (); + retrySeqNumber = pChan->getRetrySeqNo (); /* * Ignore duplicate search replies */ - osiSockAddr chanAddr = chan->getPIIU()->getNetworkAddress (); + osiSockAddr chanAddr = pChan->getPIIU()->getNetworkAddress (); if ( chanAddr.sa.sa_family != AF_UNSPEC ) { if ( ! sockAddrAreIdentical ( &addr, &chanAddr ) ) { char acc[64]; - chan->getPIIU()->hostName ( acc, sizeof ( acc ) ); + pChan->getPIIU()->hostName ( acc, sizeof ( acc ) ); msgForMultiplyDefinedPV *pMsg = new msgForMultiplyDefinedPV ( - this->cbMutex, *this, chan->pName (), acc, addr ); + this->cbMutex, *this, pChan->pName (), acc, addr ); if ( pMsg ) { this->ipAddrToAsciiAsynchronousRequestInstall ( *pMsg ); } @@ -803,7 +670,7 @@ bool cac::lookupChannelAndTransferToTCP ( /* * look for an existing virtual circuit */ - caServerID servID ( addr.ia, chan->getPriority() ); + caServerID servID ( addr.ia, pChan->getPriority() ); tcpiiu * piiu = this->serverTable.lookup ( servID ); if ( piiu ) { if ( ! piiu->alive () ) { @@ -815,7 +682,7 @@ bool cac::lookupChannelAndTransferToTCP ( pnewiiu = piiu = new tcpiiu ( *this, this->cbMutex, this->connTMO, this->timerQueue, addr, minorVersionNumber, this->ipToAEngine, - chan->getPriority() ); + pChan->getPriority() ); if ( ! piiu ) { return true; } @@ -841,21 +708,17 @@ bool cac::lookupChannelAndTransferToTCP ( } } - this->pudpiiu->detachChannel ( cbGuard, *chan ); - chan->searchReplySetUp ( *piiu, sid, typeCode, count ); - piiu->attachChannel ( *chan ); - - chan->createChannelRequest (); - piiu->flushRequest (); + this->pudpiiu->uninstallChannel ( cbGuard, guard, *pChan ); + piiu->installChannel ( guard, *pChan, sid, typeCode, count ); v41Ok = piiu->ca_v41_ok (); v42Ok = piiu->ca_v42_ok (); if ( ! v42Ok ) { // connect to old server with lock applied - chan->connect (); + pChan->connect (); // resubscribe for monitors from this channel - this->connectAllIO ( *chan ); + this->connectAllIO ( guard, *pChan ); } } @@ -868,7 +731,7 @@ bool cac::lookupChannelAndTransferToTCP ( // disconnects to prevent a race condition with the // code below - ie we hold the callback lock here // so a chanel cant be destroyed out from under us. - chan->connectStateNotify (); + pChan->connectStateNotify (); /* * if less than v4.1 then the server will never @@ -877,7 +740,7 @@ bool cac::lookupChannelAndTransferToTCP ( * their call back here */ if ( ! v41Ok ) { - chan->accessRightsNotify (); + pChan->accessRightsNotify (); } } @@ -885,10 +748,8 @@ bool cac::lookupChannelAndTransferToTCP ( pnewiiu->start ( cbGuard ); } - if ( this->pSearchTmr ) { - // deadlock can result if this is called while holding the primary - // mutex (because the primary mutex is used in the search timer callback) - this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime ); + if ( this->pudpiiu ) { + this->pudpiiu->notifySearchResponse ( retrySeqNumber, currentTime ); } return true; @@ -903,7 +764,7 @@ void cac::uninstallChannel ( nciu & chan ) // side effect IO requests w/o holding the callback lock so that // we do not dead lock { - epicsGuard < epicsMutex > guard ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); // if the send backlog is too high send some frames before we get entagled // in the channel shutdown sequence below. There is special protection in @@ -938,7 +799,7 @@ void cac::uninstallChannel ( nciu & chan ) class netSubscription *pSubscr = pIO->isSubscription (); if ( pSubscr && chan.connected() ) { // we will deadlock if we hold the callback lock here - chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); + chan.getPIIU()->subscriptionCancelRequest ( guard, chan, *pSubscr ); } tmpList.add ( *pIO ); } @@ -947,7 +808,7 @@ void cac::uninstallChannel ( nciu & chan ) // the clear channel request to the server when the claim reply // arrives and there is no matching nciu in the client if ( chan.connected() ) { - chan.getPIIU()->clearChannelRequest ( chan.getSID(), chan.getCID() ); + chan.getPIIU()->clearChannelRequest ( guard, chan.getSID(), chan.getCID() ); } } @@ -988,9 +849,11 @@ void cac::uninstallChannel ( nciu & chan ) // o clear channel request // o outstanding callbacks using this channel have completed // o chan destroy exception has been delivered - epicsGuard < epicsMutex > autoMutex ( this->mutex ); - // this destroys the tcpiiu if its the last channel - chan.getPIIU()->detachChannel ( cbGuard, chan ); + { + epicsGuard < cacMutex > guard ( this->mutex ); + // this destroys the tcpiiu if its the last channel + chan.getPIIU()->uninstallChannel ( cbGuard, guard, chan ); + } } } @@ -1009,9 +872,9 @@ int cac::printf ( const char *pformat, ... ) const } // lock must be applied before calling this cac private routine -void cac::flushIfRequired ( epicsGuard < epicsMutex > & guard, netiiu & iiu ) +void cac::flushIfRequired ( epicsGuard < cacMutex > & guard, netiiu & iiu ) { - if ( iiu.flushBlockThreshold() ) { + if ( iiu.flushBlockThreshold ( guard ) ) { iiu.flushRequest (); // the process thread is not permitted to flush as this // can result in a push / pull deadlock on the TCP pipe. @@ -1033,29 +896,29 @@ void cac::flushIfRequired ( epicsGuard < epicsMutex > & guard, netiiu & iiu ) } } else { - iiu.flushRequestIfAboveEarlyThreshold (); + iiu.flushRequestIfAboveEarlyThreshold ( guard ); } } -void cac::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue ) +void cac::writeRequest ( nciu & chan, unsigned type, unsigned nElem, const void * pValue ) { - epicsGuard < epicsMutex > guard ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); this->flushIfRequired ( guard, *chan.getPIIU() ); - chan.getPIIU()->writeRequest ( chan, type, nElem, pValue ); + chan.getPIIU()->writeRequest ( guard, chan, type, nElem, pValue ); } cacChannel::ioid cac::writeNotifyRequest ( nciu &chan, unsigned type, // X aCC 361 unsigned nElem, const void *pValue, cacWriteNotify ¬ifyIn ) { - epicsGuard < epicsMutex > guard ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); autoPtrRecycle < netWriteNotifyIO > pIO ( this->ioTable, chan.cacPrivateListOfIO::eventq, *this, netWriteNotifyIO::factory ( this->freeListWriteNotifyIO, chan, notifyIn ) ); this->ioTable.add ( *pIO ); chan.cacPrivateListOfIO::eventq.add ( *pIO ); this->flushIfRequired ( guard, *chan.getPIIU() ); chan.getPIIU()->writeNotifyRequest ( - chan, *pIO, type, nElem, pValue ); + guard, chan, *pIO, type, nElem, pValue ); return pIO.release()->getId (); } @@ -1063,14 +926,14 @@ cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type, // X aCC 361 unsigned nElem, cacReadNotify ¬ifyIn ) { - epicsGuard < epicsMutex > guard ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); autoPtrRecycle < netReadNotifyIO > pIO ( this->ioTable, chan.cacPrivateListOfIO::eventq, *this, netReadNotifyIO::factory ( this->freeListReadNotifyIO, chan, notifyIn ) ); this->ioTable.add ( *pIO ); chan.cacPrivateListOfIO::eventq.add ( *pIO ); this->flushIfRequired ( guard, *chan.getPIIU() ); - chan.getPIIU()->readNotifyRequest ( chan, *pIO, type, nElem ); + chan.getPIIU()->readNotifyRequest ( guard, chan, *pIO, type, nElem ); return pIO.release()->getId (); } @@ -1082,7 +945,7 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) // but do _not_ hold the callback lock here because this could result // in deadlock { - epicsGuard < epicsMutex > guard ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; @@ -1091,7 +954,7 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) if ( pSubscr ) { this->flushIfRequired ( guard, *chan.getPIIU() ); if ( chan.connected() ) { - chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); + chan.getPIIU()->subscriptionCancelRequest ( guard, chan, *pSubscr ); } } // must be uninstalled and also removed from the table @@ -1115,14 +978,14 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) } // now it is safe to destroy the IO object { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); pmiu->destroy ( *this ); } } void cac::ioShow ( const cacChannel::ioid &id, unsigned level ) const { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.lookup ( id ); if ( pmiu ) { pmiu->show ( level ); @@ -1135,7 +998,7 @@ void cac::ioCompletionNotify ( unsigned id, unsigned type, baseNMIU * pmiu; { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); pmiu = this->ioTable.lookup ( id ); if ( ! pmiu ) { return; @@ -1155,7 +1018,7 @@ void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext ) { baseNMIU * pmiu; { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); pmiu = this->ioTable.lookup ( id ); } @@ -1179,7 +1042,7 @@ void cac::ioExceptionNotify ( unsigned id, int status, baseNMIU * pmiu; { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); pmiu = this->ioTable.lookup ( id ); if ( ! pmiu ) { return; @@ -1197,7 +1060,7 @@ void cac::ioExceptionNotify ( unsigned id, int status, void cac::ioCompletionNotifyAndDestroy ( unsigned id ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; @@ -1212,7 +1075,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id ) // it is in use here. // { - epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex ); + epicsGuardRelease < cacMutex > autoMutexRelease ( autoMutex ); pmiu->completion (); } @@ -1222,7 +1085,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id ) void cac::ioCompletionNotifyAndDestroy ( unsigned id, unsigned type, arrayElementCount count, const void *pData ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; @@ -1236,7 +1099,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id, // it is in use here. // { - epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex ); + epicsGuardRelease < cacMutex > autoMutexRelease ( autoMutex ); pmiu->completion ( type, count, pData ); } @@ -1246,7 +1109,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id, void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; @@ -1260,7 +1123,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, // it is in use here. // { - epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex ); + epicsGuardRelease < cacMutex > autoMutexRelease ( autoMutex ); pmiu->exception ( status, pContext ); } @@ -1270,7 +1133,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext, unsigned type, arrayElementCount count ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); if ( ! pmiu ) { return; @@ -1285,7 +1148,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, // { - epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex ); + epicsGuardRelease < cacMutex > autoMutexRelease ( autoMutex ); pmiu->exception ( status, pContext, type, count ); } @@ -1293,8 +1156,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, } // resubscribe for monitors from this channel -// (lock must be applied) -void cac::connectAllIO ( nciu & chan ) +void cac::connectAllIO ( epicsGuard < cacMutex > & guard, nciu & chan ) { tsDLIterBD < baseNMIU > pNetIO = chan.cacPrivateListOfIO::eventq.firstIter (); @@ -1305,7 +1167,7 @@ void cac::connectAllIO ( nciu & chan ) // disconnected channels should have only subscription IO attached assert ( pSubscr ); try { - chan.getPIIU()->subscriptionRequest ( chan, *pSubscr ); + chan.getPIIU()->subscriptionRequest ( guard, chan, *pSubscr ); } catch ( ... ) { this->printf ( "CAC: failed to send subscription request during channel connect\n" ); @@ -1317,7 +1179,7 @@ void cac::connectAllIO ( nciu & chan ) // cancel IO operations and monitor subscriptions // -- callback lock and cac lock must be applied here -void cac::disconnectAllIO ( epicsGuard < epicsMutex > &locker, nciu & chan, bool enableCallbacks ) +void cac::disconnectAllIO ( epicsGuard < cacMutex > &locker, nciu & chan, bool enableCallbacks ) { tsDLIterBD pNetIO = chan.cacPrivateListOfIO::eventq.firstIter(); while ( pNetIO.valid() ) { @@ -1330,8 +1192,8 @@ void cac::disconnectAllIO ( epicsGuard < epicsMutex > &locker, nciu & chan, bool } if ( enableCallbacks ) { char buf[128]; - sprintf ( buf, "host = %100s", chan.pHostName() ); - epicsGuardRelease < epicsMutex > unlocker ( locker ); + sprintf ( buf, "host = %.100s", chan.pHostName() ); + epicsGuardRelease < cacMutex > unlocker ( locker ); pNetIO->exception ( ECA_DISCONN, buf ); } if ( ! pNetIO->isSubscription() ) { @@ -1361,7 +1223,7 @@ cac::subscriptionRequest ( nciu &chan, unsigned type, // X aCC 361 arrayElementCount nElem, unsigned mask, cacStateNotify ¬ifyIn ) { - epicsGuard < epicsMutex > guard ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); autoPtrRecycle < netSubscription > pIO ( this->ioTable, chan.cacPrivateListOfIO::eventq, *this, netSubscription::factory ( this->freeListSubscription, @@ -1370,7 +1232,7 @@ cac::subscriptionRequest ( nciu &chan, unsigned type, // X aCC 361 chan.cacPrivateListOfIO::eventq.add ( *pIO ); if ( chan.connected () ) { this->flushIfRequired ( guard, *chan.getPIIU() ); - chan.getPIIU()->subscriptionRequest ( chan, *pIO ); + chan.getPIIU()->subscriptionRequest ( guard, chan, *pIO ); } cacChannel::ioid id = pIO->getId (); pIO.release (); @@ -1513,7 +1375,7 @@ bool cac::defaultExcep ( epicsGuard < callbackMutex > &, tcpiiu &iiu, char buf[512]; char hostName[64]; iiu.hostName ( hostName, sizeof ( hostName ) ); - sprintf ( buf, "host=%64s ctx=%400s", hostName, pCtx ); + sprintf ( buf, "host=%s ctx=%.400s", hostName, pCtx ); this->notify.exception ( status, buf, 0, 0u ); return true; } @@ -1611,7 +1473,7 @@ bool cac::accessRightsRespAction ( epicsGuard < callbackMutex > &, tcpiiu &, { nciu * pChan; { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); pChan = this->chanTable.lookup ( hdr.m_cid ); if ( pChan ) { unsigned ar = hdr.m_available; @@ -1640,7 +1502,7 @@ bool cac::claimCIURespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu, nciu * pChan; { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); pChan = this->chanTable.lookup ( hdr.m_cid ); if ( pChan ) { unsigned sidTmp; @@ -1651,12 +1513,12 @@ bool cac::claimCIURespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu, sidTmp = pChan->getSID (); } pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp, iiu.ca_v41_ok() ); - this->connectAllIO ( *pChan ); + this->connectAllIO ( guard, *pChan ); } else if ( iiu.ca_v44_ok() ) { // this indicates a claim response for a resource that does // not exist in the client - so just remove it from the server - iiu.clearChannelRequest ( hdr.m_available, hdr.m_cid ); + iiu.clearChannelRequest ( guard, hdr.m_available, hdr.m_cid ); } } // the callback lock is taken when a channel is unistalled or when @@ -1668,37 +1530,32 @@ bool cac::claimCIURespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu, } bool cac::verifyAndDisconnectChan ( - epicsGuard < callbackMutex > & cbMutexIn, tcpiiu & /* iiu */, + epicsGuard < callbackMutex > & cbGuard, tcpiiu & /* iiu */, const caHdrLargeArray & hdr, void * /* pMsgBdy */ ) { - epicsGuard < epicsMutex > autoMutex ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); nciu * pChan = this->chanTable.lookup ( hdr.m_cid ); if ( ! pChan ) { return true; } assert ( this->pudpiiu ); - this->disconnectChannelPrivate ( cbMutexIn, autoMutex, - *pChan, *this->pudpiiu ); - this->pSearchTmr->resetPeriod ( 0.0 ); + this->disconnectChannel ( cbGuard, guard, *pChan ); + if ( this->pudpiiu ) { + this->pudpiiu->resetSearchTimerPeriod ( 0.0 ); + } return true; } -void cac::disconnectChannelPrivate ( epicsGuard < callbackMutex > & cbLocker, - epicsGuard < epicsMutex > & locker, - nciu & chan, netiiu & dstIIU ) +void cac::disconnectChannel ( epicsGuard < callbackMutex > & cbLocker, + epicsGuard < cacMutex > & locker, + nciu & chan ) { this->disconnectAllIO ( locker, chan, true ); - chan.getPIIU()->detachChannel ( cbLocker, chan ); - chan.disconnect ( limboIIU ); - limboIIU.attachChannel ( chan ); - { - epicsGuardRelease < epicsMutex > autoMutexRelease ( locker ); - chan.connectStateNotify (); - chan.accessRightsNotify (); - } - limboIIU.detachChannel ( cbLocker, chan ); - chan.disconnect ( dstIIU ); - dstIIU.attachChannel ( chan ); + chan.disconnect ( *this->pudpiiu ); + this->pudpiiu->installChannel ( chan ); + epicsGuardRelease < cacMutex > autoMutexRelease ( locker ); + chan.connectStateNotify (); + chan.accessRightsNotify (); } bool cac::badTCPRespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu, @@ -1785,7 +1642,7 @@ void cac::vSignal ( int ca_status, const char *pfilenm, void cac::incrementOutstandingIO () { - epicsGuard < epicsMutex > locker ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); if ( this->pndRecvCnt < UINT_MAX ) { this->pndRecvCnt++; } @@ -1799,7 +1656,7 @@ void cac::decrementOutstandingIO () bool signalNeeded; { - epicsGuard < epicsMutex > locker ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); if ( this->pndRecvCnt > 0u ) { this->pndRecvCnt--; if ( this->pndRecvCnt == 0u ) { @@ -1824,7 +1681,7 @@ void cac::decrementOutstandingIO ( unsigned sequenceNo ) bool signalNeeded; { - epicsGuard < epicsMutex > locker ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); if ( this->readSeq == sequenceNo ) { if ( this->pndRecvCnt > 0u ) { this->pndRecvCnt--; @@ -1873,15 +1730,14 @@ void cac::tcpCircuitShutdown ( tcpiiu & iiu, bool discardPendingMessages ) // will get called in preemptive callback disabled // applications, and therefore the callback lock below // will not block - { - epicsGuard < epicsMutex > autoMutexCAC ( this->mutex ); - if ( this->pudpiiu ) { - this->pudpiiu->wakeupMsg (); - } + if ( this->pudpiiu ) { + this->pudpiiu->wakeupMsg (); + } + { + epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); + epicsGuard < cacMutex > guard ( this->mutex ); + iiu.shutdown ( cbGuard, guard, discardPendingMessages ); } - epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); - epicsGuard < epicsMutex > guard ( this->mutex ); - iiu.shutdown ( cbGuard, discardPendingMessages ); } void cac::uninstallIIU ( tcpiiu & iiu ) @@ -1890,9 +1746,9 @@ void cac::uninstallIIU ( tcpiiu & iiu ) this->privateUninstallIIU ( cbGuard, iiu ); } -void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbMutexIn, tcpiiu & iiu ) +void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & iiu ) { - epicsGuard < epicsMutex > autoMutexCAC ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); if ( iiu.channelCount() ) { char hostNameTmp[64]; iiu.hostName ( hostNameTmp, sizeof ( hostNameTmp ) ); @@ -1912,11 +1768,11 @@ void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbMutexIn, tcpiiu this->serverTable.remove ( iiu ); assert ( this->pudpiiu ); - this->removeAllChan ( cbMutexIn, autoMutexCAC, iiu, *this->pudpiiu ); + iiu.removeAllChannels ( cbGuard, guard, *this ); delete &iiu; - this->pSearchTmr->resetPeriod ( 0.0 ); + this->pudpiiu->resetSearchTimerPeriod ( 0.0 ); // signal iiu uninstal event so that cac can properly shut down this->iiuUninstall.signal(); @@ -1924,7 +1780,7 @@ void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbMutexIn, tcpiiu double cac::beaconPeriod ( const nciu & chan ) const { - epicsGuard < epicsMutex > locker ( this->mutex ); + epicsGuard < cacMutex > guard ( this->mutex ); const netiiu * pIIU = chan.getConstPIIU (); if ( pIIU ) { osiSockAddr addr = pIIU->getNetworkAddress (); @@ -1938,3 +1794,10 @@ double cac::beaconPeriod ( const nciu & chan ) const } return - DBL_MAX; } + +void cac::initiateConnect ( nciu & chan ) +{ + assert ( this->pudpiiu ); + this->pudpiiu->installChannel ( chan ); +} + diff --git a/src/ca/cac.h b/src/ca/cac.h index cda795e90..49cf89683 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -75,11 +75,27 @@ private: epicsEvent noRecvThreadsPending; unsigned recvThreadsPendingCount; bool threadsMayBeBlockingForRecvThreadsToFinish; - //callbackMutex ( callbackMutex & ); - //callbackMutex & operator = ( callbackMutex & ); + callbackMutex ( callbackMutex & ); + callbackMutex & operator = ( callbackMutex & ); }; -class cac : private cacRecycle +class cacMutex { +public: + void lock (); + void unlock (); + void show ( unsigned level ) const; +private: + epicsMutex mutex; +}; + +class cacDisconnectChannelPrivate { +public: + virtual void disconnectChannel ( + epicsGuard < callbackMutex > &, + epicsGuard < cacMutex > &, nciu & chan ) = 0; +}; + +class cac : private cacRecycle, private cacDisconnectChannelPrivate { public: cac ( cacNotify &, bool enablePreemptiveCallback = false ); @@ -107,7 +123,6 @@ public: void ioShow ( const cacChannel::ioid &id, unsigned level ) const; // channel routines - void installNetworkChannel ( nciu &, netiiu *&piiu ); bool lookupChannelAndTransferToTCP ( epicsGuard < callbackMutex > &, unsigned cid, unsigned sid, @@ -118,6 +133,7 @@ public: cacChannel & createChannel ( const char *name_str, cacChannelNotify &chan, cacChannel::priLev pri ); void registerService ( cacService &service ); + void initiateConnect ( nciu & ); // IO request stubs void writeRequest ( nciu &, unsigned type, @@ -162,7 +178,7 @@ public: // misc const char * userNamePointer () const; unsigned getInitializingThreadsPriority () const; - epicsMutex & mutexRef (); + cacMutex & mutexRef (); void attachToClientCtx (); void selfTest () const; void notifyNewFD ( epicsGuard < callbackMutex > &, SOCKET ) const; @@ -202,15 +218,12 @@ private: // callback lock must always be acquired before // the primary mutex if both locks are needed callbackMutex cbMutex; - mutable epicsMutex mutex; + mutable cacMutex mutex; epicsEvent ioDone; epicsEvent iiuUninstall; epicsTimerQueueActive & timerQueue; char * pUserName; class udpiiu * pudpiiu; - class searchTimer * pSearchTmr; - class repeaterSubscribeTimer - * pRepeaterSubscribeTmr; void * tcpSmallRecvBufFreeList; void * tcpLargeRecvBufFreeList; epicsGuard * pCallbackGuard; @@ -224,20 +237,15 @@ private: void privateUninstallIIU ( epicsGuard < callbackMutex > &, tcpiiu &iiu ); void flushRequestPrivate (); void run (); - bool setupUDP (); - void connectAllIO ( nciu &chan ); - void disconnectAllIO ( epicsGuard < epicsMutex > & locker, nciu & chan, bool enableCallbacks ); - void flushIfRequired ( epicsGuard < epicsMutex > &, netiiu & ); + void connectAllIO ( epicsGuard < cacMutex > &, nciu &chan ); + void disconnectAllIO ( epicsGuard < cacMutex > & locker, nciu & chan, bool enableCallbacks ); + void flushIfRequired ( epicsGuard < cacMutex > &, netiiu & ); void recycleReadNotifyIO ( netReadNotifyIO &io ); void recycleWriteNotifyIO ( netWriteNotifyIO &io ); void recycleSubscription ( netSubscription &io ); - void removeAllChan ( - epicsGuard < callbackMutex > & cbLocker, epicsGuard < epicsMutex > &locker, - netiiu & srcIIU, netiiu & dstIIU ); - void disconnectChannelPrivate ( - epicsGuard < callbackMutex > &, epicsGuard < epicsMutex > &, - nciu & chan, netiiu & dstIIU ); + void disconnectChannel ( + epicsGuard < callbackMutex > &, epicsGuard < cacMutex > &, nciu & chan ); void ioCompletionNotify ( unsigned id, unsigned type, arrayElementCount count, const void *pData ); @@ -327,7 +335,7 @@ inline unsigned cac::sequenceNumberOfOutstandingIO () const return this->readSeq; } -inline epicsMutex & cac::mutexRef () +inline cacMutex & cac::mutexRef () { return this->mutex; } @@ -387,5 +395,20 @@ inline bool cac::preemptiveCallbakIsEnabled () const return ! this->pCallbackGuard; } +inline void cacMutex::lock () +{ + this->mutex.lock (); +} + +inline void cacMutex::unlock () +{ + this->mutex.unlock (); +} + +inline void cacMutex::show ( unsigned level ) const +{ + this->mutex.show ( level ); +} + #endif // ifdef cach