From 7b7a07c667b1620303873edb5c5fba0c1926254d Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 23 Sep 2004 23:15:22 +0000 Subject: [PATCH] fixes for bugs 133 and 134 in Mantis --- src/ca/Makefile | 1 + src/ca/access.cpp | 16 +- src/ca/caRepeater.cpp | 1 - src/ca/ca_client_context.cpp | 22 +- src/ca/cac.cpp | 67 ++- src/ca/cac.h | 31 +- src/ca/cacIO.h | 4 +- src/ca/disconnectGovernorTimer.cpp | 69 +++- src/ca/disconnectGovernorTimer.h | 24 +- src/ca/nciu.cpp | 103 +++-- src/ca/nciu.h | 65 ++- src/ca/netiiu.cpp | 26 +- src/ca/netiiu.h | 18 +- src/ca/noopiiu.cpp | 166 ++++++++ src/ca/noopiiu.h | 92 +++++ src/ca/oldAccess.h | 10 +- src/ca/oldChannelNotify.cpp | 9 +- src/ca/repeaterSubscribeTimer.cpp | 32 +- src/ca/repeaterSubscribeTimer.h | 21 +- src/ca/searchTimer.cpp | 628 +++++++++++++---------------- src/ca/searchTimer.h | 72 ++-- src/ca/tcpRecvWatchdog.cpp | 44 +- src/ca/tcpRecvWatchdog.h | 2 + src/ca/tcpSendWatchdog.cpp | 5 +- src/ca/tcpiiu.cpp | 115 ++++-- src/ca/udpiiu.cpp | 462 ++++++++++----------- src/ca/udpiiu.h | 201 +++++---- src/ca/virtualCircuit.h | 20 +- 28 files changed, 1386 insertions(+), 940 deletions(-) create mode 100644 src/ca/noopiiu.cpp create mode 100644 src/ca/noopiiu.h diff --git a/src/ca/Makefile b/src/ca/Makefile index 75e9d1787..2ec5847a1 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -46,6 +46,7 @@ LIBSRCS += nciu.cpp LIBSRCS += netiiu.cpp LIBSRCS += udpiiu.cpp LIBSRCS += tcpiiu.cpp +LIBSRCS += noopiiu.cpp LIBSRCS += netReadNotifyIO.cpp LIBSRCS += netWriteNotifyIO.cpp LIBSRCS += netSubscription.cpp diff --git a/src/ca/access.cpp b/src/ca/access.cpp index 3826e2a46..ffccc87a9 100644 --- a/src/ca/access.cpp +++ b/src/ca/access.cpp @@ -23,7 +23,8 @@ #include #include -#include + +#include "epicsExit.h" #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" @@ -43,6 +44,7 @@ #include "iocinf.h" #include "oldAccess.h" #include "cac.h" +#include "autoPtrFreeList.h" epicsThreadPrivateId caClientContextId; @@ -364,10 +366,18 @@ int epicsShareAPI ca_create_channel ( int epicsShareAPI ca_clear_channel ( chid pChan ) { ca_client_context & cac = pChan->getClientCtx (); - cac.destroyChannel ( *pChan ); + epicsGuard < epicsMutex > * pCBGuard = cac.pCallbackGuard.get(); + if ( pCBGuard ) { + epicsGuard < epicsMutex > guard ( cac.mutex ); + cac.destroyChannel ( *pCBGuard, guard, *pChan ); + } + else { + epicsGuard < epicsMutex > cbGuard ( cac.cbMutex ); + epicsGuard < epicsMutex > guard ( cac.mutex ); + cac.destroyChannel ( cbGuard, guard, *pChan ); + } return ECA_NORMAL; } -#include "autoPtrFreeList.h" /* * ca_array_get () diff --git a/src/ca/caRepeater.cpp b/src/ca/caRepeater.cpp index 2044b9a92..7d9806658 100644 --- a/src/ca/caRepeater.cpp +++ b/src/ca/caRepeater.cpp @@ -38,7 +38,6 @@ int main() { ca_repeater (); - assert ( 0 ); return ( 0 ); } diff --git a/src/ca/ca_client_context.cpp b/src/ca/ca_client_context.cpp index a7a89ef97..17a7de87e 100644 --- a/src/ca/ca_client_context.cpp +++ b/src/ca/ca_client_context.cpp @@ -28,8 +28,8 @@ #endif #include - #include + #include "epicsExit.h" #define epicsExportSharedSymbols @@ -187,11 +187,11 @@ ca_client_context::~ca_client_context () } } -void ca_client_context::destroyChannelPrivate ( - oldChannelNotify & chan, - epicsGuard < epicsMutex > & cbGuard ) +void ca_client_context::destroyChannel ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard, + oldChannelNotify & chan ) { - epicsGuard < epicsMutex > guard ( this->mutex ); try { chan.eliminateExcessiveSendBacklog ( &cbGuard, guard ); @@ -203,18 +203,6 @@ void ca_client_context::destroyChannelPrivate ( this->oldChannelNotifyFreeList.release ( & chan ); } -void ca_client_context::destroyChannel ( oldChannelNotify & chan ) -{ - epicsGuard < epicsMutex > * pCBGuard = this->pCallbackGuard.get(); - if ( pCBGuard ) { - destroyChannelPrivate ( chan, *pCBGuard ); - } - else { - epicsGuard < epicsMutex > cbGuard ( this->cbMutex ); - destroyChannelPrivate ( chan, cbGuard ); - } -} - void ca_client_context::destroyGetCopy ( epicsGuard < epicsMutex > & guard, getCopy & gc ) { diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 26b0a6728..fb78cf169 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -44,6 +44,7 @@ #include "bhe.h" #include "net_convert.h" #include "autoPtrFreeList.h" +#include "noopIIU.h" static const char *pVersionCAC = "@(#) " EPICS_VERSION_STRING @@ -232,17 +233,17 @@ cac::~cac () // waiting for the UDP thread to exit while it is waiting to // get the lock. if ( this->pudpiiu ) { - this->pudpiiu->shutdown (); + epicsGuard < epicsMutex > cbGuard ( this->cbMutex ); + epicsGuard < epicsMutex > guard ( this->mutex ); + this->pudpiiu->shutdown ( cbGuard, guard ); // // shutdown all tcp circuits // - epicsGuard < epicsMutex > cbGuard ( this->cbMutex ); - epicsGuard < epicsMutex > guard ( this->mutex ); tsDLIter < tcpiiu > iter = this->circuitList.firstIter (); while ( iter.valid() ) { // this causes a clean shutdown to occur - iter->removeAllChannels ( true, cbGuard, guard, *this->pudpiiu ); + iter->unlinkAllChannels ( cbGuard, guard ); iter++; } } @@ -425,7 +426,7 @@ void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime, this->beaconAnomalyCount++; - this->pudpiiu->beaconAnomalyNotify ( guard, currentTime ); + this->pudpiiu->beaconAnomalyNotify ( guard ); # ifdef DEBUG { @@ -457,21 +458,19 @@ cacChannel & cac::createChannel ( } nciu * pNetChan = new ( this->channelFreeList ) - nciu ( *this, *this->pudpiiu, chan, pName, pri ); + nciu ( *this, noopIIU, chan, pName, pri ); this->chanTable.idAssignAdd ( *pNetChan ); return *pNetChan; } -bool cac::transferChanToVirtCircuit ( +void cac::transferChanToVirtCircuit ( epicsGuard < epicsMutex > & cbGuard, unsigned cid, unsigned sid, // X aCC 431 ca_uint16_t typeCode, arrayElementCount count, - unsigned minorVersionNumber, const osiSockAddr & addr ) + unsigned minorVersionNumber, const osiSockAddr & addr, + const epicsTime & currentTime ) { - bool newIIU = false; - tcpiiu * piiu = 0; - if ( addr.sa.sa_family != AF_INET ) { - return false; + return ; } epicsGuard < epicsMutex > guard ( this->mutex ); @@ -481,12 +480,12 @@ bool cac::transferChanToVirtCircuit ( */ nciu * pChan = this->chanTable.lookup ( cid ); if ( ! pChan ) { - return false; + return; } /* - * Ignore duplicate search replies - */ + * Ignore duplicate search replies + */ osiSockAddr chanAddr = pChan->getPIIU(guard)->getNetworkAddress (guard); if ( chanAddr.sa.sa_family != AF_UNSPEC ) { if ( ! sockAddrAreIdentical ( &addr, &chanAddr ) ) { @@ -497,17 +496,18 @@ bool cac::transferChanToVirtCircuit ( *this, pChan->pName ( guard ), acc ); pMsg->ioInitiate ( addr ); } - return false; + return; } /* * look for an existing virtual circuit */ + bool newIIU = false; caServerID servID ( addr.ia, pChan->getPriority(guard) ); - piiu = this->serverTable.lookup ( servID ); + tcpiiu * piiu = this->serverTable.lookup ( servID ); if ( piiu ) { if ( ! piiu->alive ( guard ) ) { - return false; + return; } } else { @@ -523,7 +523,7 @@ bool cac::transferChanToVirtCircuit ( pBHE = new ( this->bheFreeList ) bhe ( this->mutex, epicsTime (), 0u, addr.ia ); if ( this->beaconTable.add ( *pBHE ) < 0 ) { - return false; + return; } } this->serverTable.add ( *pnewiiu ); @@ -533,17 +533,20 @@ bool cac::transferChanToVirtCircuit ( newIIU = true; } catch ( std::bad_alloc & ) { - return false; + return; } catch ( ... ) { errlogPrintf ( "CAC: Unexpected exception during virtual circuit creation\n" ); - return false; + return; } } - this->pudpiiu->uninstallChan ( cbGuard, guard, *pChan ); - piiu->installChannel ( cbGuard, guard, *pChan, sid, typeCode, count ); + // must occur before moving to new iiu + pChan->getPIIU(guard)->uninstallChanDueToSuccessfulSearchResponse ( + guard, *pChan, currentTime ); + piiu->installChannel ( + cbGuard, guard, *pChan, sid, typeCode, count ); if ( ! piiu->ca_v42_ok ( guard ) ) { // connect to old server with lock applied @@ -553,8 +556,6 @@ bool cac::transferChanToVirtCircuit ( if ( newIIU ) { piiu->start ( guard ); } - - return true; } void cac::destroyChannel ( @@ -1081,21 +1082,19 @@ bool cac::verifyAndDisconnectChan ( if ( ! pChan ) { return true; } - this->disconnectChannel ( currentTime, mgr.cbGuard, guard, *pChan ); + this->disconnectChannel ( mgr.cbGuard, guard, *pChan ); return true; } void cac::disconnectChannel ( - const epicsTime & /* currentTime */, epicsGuard < epicsMutex > & cbGuard, // X aCC 431 epicsGuard < epicsMutex > & guard, nciu & chan ) { guard.assertIdenticalMutex ( this->mutex ); assert ( this->pudpiiu ); chan.disconnectAllIO ( cbGuard, guard ); - chan.getPIIU(guard)->uninstallChan ( cbGuard, guard, chan ); - this->pudpiiu->installDisconnectedChannel ( chan ); - chan.setServerAddressUnknown ( *this->pudpiiu, guard ); + chan.getPIIU(guard)->uninstallChan ( guard, chan ); + this->pudpiiu->installDisconnectedChannel ( guard, chan ); chan.unresponsiveCircuitNotify ( cbGuard, guard ); } @@ -1153,7 +1152,7 @@ void cac::destroyIIU ( tcpiiu & iiu ) } assert ( this->pudpiiu ); - iiu.removeAllChannels ( false, cbGuard, guard, *this->pudpiiu ); + iiu.disconnectAllChannels ( cbGuard, guard, *this->pudpiiu ); this->serverTable.remove ( iiu ); this->circuitList.remove ( iiu ); @@ -1191,12 +1190,12 @@ double cac::beaconPeriod ( } void cac::initiateConnect ( - epicsGuard < epicsMutex > & guard, nciu & chan ) + epicsGuard < epicsMutex > & guard, + nciu & chan, netiiu * & piiu ) { guard.assertIdenticalMutex ( this->mutex ); assert ( this->pudpiiu ); - this->pudpiiu->installNewChannel ( - epicsTime::getCurrent(), chan ); + this->pudpiiu->installNewChannel ( guard, chan, piiu ); } void *cacComBufMemoryManager::allocate ( size_t size ) diff --git a/src/ca/cac.h b/src/ca/cac.h index fbcde6344..a5bfe44cf 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -85,14 +85,6 @@ private: cacComBufMemoryManager & operator = ( const cacComBufMemoryManager & ); }; -class cacDisconnectChannelPrivate { // X aCC 655 -public: - virtual void disconnectChannel ( - const epicsTime & currentTime, - epicsGuard < epicsMutex > & cbGuard, - epicsGuard < epicsMutex > & guard, nciu & chan ) = 0; -}; - class notifyGuard { public: notifyGuard ( cacContextNotify & ); @@ -114,7 +106,6 @@ public: class cac : public cacContext, private cacRecycle, - private cacDisconnectChannelPrivate, private callbackForMultiplyDefinedPV { public: @@ -136,15 +127,12 @@ public: const epicsTime & currentTime, caHdrLargeArray &, char *pMsgBody ); // channel routines - bool transferChanToVirtCircuit ( + void transferChanToVirtCircuit ( epicsGuard < epicsMutex > &, unsigned cid, unsigned sid, ca_uint16_t typeCode, arrayElementCount count, - unsigned minorVersionNumber, const osiSockAddr & ); - void disconnectAllChannels ( - epicsGuard < epicsMutex > & callbackControlGuard, - epicsGuard < epicsMutex > & mutualExclusionGuard, - tcpiiu & ); + unsigned minorVersionNumber, const osiSockAddr &, + const epicsTime & currentTime ); cacChannel & createChannel ( epicsGuard < epicsMutex > & guard, const char * pChannelName, cacChannelNotify &, cacChannel::priLev ); @@ -153,7 +141,7 @@ public: epicsGuard < epicsMutex > & mutualExclusionGuard, nciu & ); void initiateConnect ( - epicsGuard < epicsMutex > &, nciu & ); + epicsGuard < epicsMutex > &, nciu &, netiiu * & ); // IO requests void writeRequest ( epicsGuard < epicsMutex > &, nciu &, unsigned type, @@ -293,7 +281,6 @@ private: epicsGuard < epicsMutex > &, netSubscription &io ); void disconnectChannel ( - const epicsTime & currentTime, epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard, nciu & chan ); @@ -423,16 +410,6 @@ inline unsigned cac::beaconAnomaliesSinceProgramStart ( return this->beaconAnomalyCount; } -inline void cac::disconnectAllChannels ( - epicsGuard < epicsMutex > & cbGuard, - epicsGuard < epicsMutex > & guard, - tcpiiu & iiu ) -{ - cbGuard.assertIdenticalMutex ( this->cbMutex ); - guard.assertIdenticalMutex ( this->mutex ); - iiu.removeAllChannels ( false, cbGuard, guard, *this->pudpiiu ); -} - inline notifyGuard::notifyGuard ( cacContextNotify & notifyIn ) : notify ( notifyIn ) { diff --git a/src/ca/cacIO.h b/src/ca/cacIO.h index 7cd4a8ddd..eecd1e44a 100644 --- a/src/ca/cacIO.h +++ b/src/ca/cacIO.h @@ -137,7 +137,9 @@ public: virtual ~cacChannelNotify () = 0; virtual void connectNotify ( epicsGuard < epicsMutex > & ) = 0; virtual void disconnectNotify ( epicsGuard < epicsMutex > & ) = 0; - virtual void serviceShutdownNotify () = 0; + virtual void serviceShutdownNotify ( + epicsGuard < epicsMutex > & callbackControlGuard, + epicsGuard < epicsMutex > & mutualExclusionGuard ) = 0; virtual void accessRightsNotify ( epicsGuard < epicsMutex > &, const caAccessRights & ) = 0; virtual void exception ( diff --git a/src/ca/disconnectGovernorTimer.cpp b/src/ca/disconnectGovernorTimer.cpp index 54bf37d57..23f3c9f09 100644 --- a/src/ca/disconnectGovernorTimer.cpp +++ b/src/ca/disconnectGovernorTimer.cpp @@ -24,15 +24,17 @@ #define epicsExportSharedSymbols #include "disconnectGovernorTimer.h" #include "udpiiu.h" +#include "nciu.h" -static const double period = 10.0; // sec +static const double disconnectGovernorPeriod = 10.0; // sec disconnectGovernorTimer::disconnectGovernorTimer ( - udpiiu & iiuIn, epicsTimerQueue & queueIn ) : - timer ( queueIn.createTimer () ), + disconnectGovernorNotify & iiuIn, + epicsTimerQueue & queueIn, + epicsMutex & mutexIn ) : + mutex ( mutexIn ), timer ( queueIn.createTimer () ), iiu ( iiuIn ) { - this->timer.start ( *this, period ); } disconnectGovernorTimer::~disconnectGovernorTimer () @@ -40,18 +42,65 @@ disconnectGovernorTimer::~disconnectGovernorTimer () this->timer.destroy (); } -void disconnectGovernorTimer::shutdown () +void disconnectGovernorTimer:: start () { - this->timer.cancel (); + this->timer.start ( *this, disconnectGovernorPeriod ); } -epicsTimerNotify::expireStatus disconnectGovernorTimer::expire ( const epicsTime & currentTime ) // X aCC 361 +void disconnectGovernorTimer::shutdown ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ) { - this->iiu.govExpireNotify ( currentTime ); - return expireStatus ( restart, period ); + epicsGuardRelease < epicsMutex > unguard ( guard ); + { + epicsGuardRelease < epicsMutex > unguard ( cbGuard ); + this->timer.cancel (); + } + while ( nciu * pChan = this->chanList.get () ) { + pChan->channelNode::listMember = + channelNode::cs_none; + pChan->serviceShutdownNotify ( cbGuard, guard ); + } } -void disconnectGovernorTimer::show ( unsigned /* level */ ) const +epicsTimerNotify::expireStatus disconnectGovernorTimer::expire ( + const epicsTime & currentTime ) // X aCC 361 { + epicsGuard < epicsMutex > guard ( this->mutex ); + while ( nciu * pChan = chanList.get () ) { + pChan->channelNode::listMember = + channelNode::cs_none; + this->iiu.govExpireNotify ( guard, *pChan ); + } + return expireStatus ( restart, disconnectGovernorPeriod ); } +void disconnectGovernorTimer::show ( unsigned level ) const +{ + epicsGuard < epicsMutex > guard ( this->mutex ); + ::printf ( "disconnect governor timer:\n" ); + tsDLIterConst < nciu > pChan = this->chanList.firstIter (); + while ( pChan.valid () ) { + pChan->show ( level - 1u ); + pChan++; + } +} + +void disconnectGovernorTimer::installChan ( + epicsGuard < epicsMutex > & guard, nciu & chan ) +{ + guard.assertIdenticalMutex ( this->mutex ); + this->chanList.add ( chan ); + chan.channelNode::listMember = channelNode::cs_disconnGov; +} + +void disconnectGovernorTimer::uninstallChan ( + epicsGuard < epicsMutex > & guard, nciu & chan ) +{ + guard.assertIdenticalMutex ( this->mutex ); + this->chanList.remove ( chan ); + chan.channelNode::listMember = channelNode::cs_none; +} + +disconnectGovernorNotify::~disconnectGovernorNotify () {} + diff --git a/src/ca/disconnectGovernorTimer.h b/src/ca/disconnectGovernorTimer.h index b4608d59a..6b0de98d4 100644 --- a/src/ca/disconnectGovernorTimer.h +++ b/src/ca/disconnectGovernorTimer.h @@ -42,16 +42,34 @@ #endif #include "caProto.h" +#include "netiiu.h" + +class disconnectGovernorNotify { +public: + virtual ~disconnectGovernorNotify () = 0; + virtual void govExpireNotify ( + epicsGuard < epicsMutex > &, nciu & ) = 0; +}; class disconnectGovernorTimer : private epicsTimerNotify { public: - disconnectGovernorTimer ( class udpiiu &, epicsTimerQueue & ); + disconnectGovernorTimer ( + class disconnectGovernorNotify &, epicsTimerQueue &, epicsMutex & ); virtual ~disconnectGovernorTimer (); + void start (); + void shutdown ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ); + void installChan ( + epicsGuard < epicsMutex > &, nciu & ); + void uninstallChan ( + epicsGuard < epicsMutex > &, nciu & ); void show ( unsigned level ) const; - void shutdown (); private: + tsDLList < nciu > chanList; + epicsMutex & mutex; epicsTimer & timer; - class udpiiu & iiu; + class disconnectGovernorNotify & iiu; epicsTimerNotify::expireStatus expire ( const epicsTime & currentTime ); disconnectGovernorTimer ( const disconnectGovernorTimer & ); disconnectGovernorTimer & operator = ( const disconnectGovernorTimer & ); diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index 5f7c0dbcf..82d22df8c 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -18,14 +18,6 @@ * * Author: Jeff Hill * - * Notes: - * 1) This class has a pointer to the IIU. This pointer always points at - * a valid IIU. If the client context is deleted then the channel points at a - * static file scope IIU. IIU's that disconnect go into an inactive state - * and are stored on a list for later reuse. When the channel calls a - * member function of the IIU, the IIU verifies that the channel's IIU - * pointer is still pointing at itself only after it has acquired the IIU - * lock. */ #include @@ -34,6 +26,8 @@ #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" +#include "epicsAlgorithm.h" + #define epicsExportSharedSymbols #include "iocinf.h" #include "cac.h" @@ -42,6 +36,7 @@ #include "virtualCircuit.h" #include "cadef.h" #include "db_access.h" // for INVALID_DB_REQ +#include "noopIIU.h" nciu::nciu ( cac & cacIn, netiiu & iiuIn, cacChannelNotify & chanIn, const char *pNameIn, cacChannel::priLev pri ) : @@ -97,8 +92,7 @@ void nciu::destroy ( mutualExclusionGuard, this->sid, this->id ); } - this->piiu->uninstallChan ( - callbackControlGuard, mutualExclusionGuard, *this ); + this->piiu->uninstallChan ( mutualExclusionGuard, *this ); this->cacCtx.destroyChannel ( callbackControlGuard, mutualExclusionGuard, *this ); @@ -125,7 +119,7 @@ void nciu::operator delete ( void * ) void nciu::initiateConnect ( epicsGuard < epicsMutex > & guard ) { - this->cacCtx.initiateConnect ( guard, *this ); + this->cacCtx.initiateConnect ( guard, *this, this->piiu ); } void nciu::connect ( unsigned nativeType, @@ -186,7 +180,7 @@ void nciu::unresponsiveCircuitNotify ( this->notify().accessRightsNotify ( guard, noRights ); } -void nciu::setServerAddressUnknown ( udpiiu & newiiu, +void nciu::setServerAddressUnknown ( netiiu & newiiu, epicsGuard < epicsMutex > & guard ) { guard.assertIdenticalMutex ( this->cacCtx.mutexRef () ); @@ -217,30 +211,16 @@ void nciu::accessRightsStateChange ( /* * nciu::searchMsg () */ -bool nciu::searchMsg ( udpiiu & iiu ) +bool nciu::searchMsg ( epicsGuard < epicsMutex > & guard ) { - caHdr msg; - bool success; - - msg.m_cmmd = epicsHTON16 ( CA_PROTO_SEARCH ); - msg.m_available = epicsHTON32 ( this->getId () ); - msg.m_dataType = epicsHTON16 ( DONTREPLY ); - msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION ); - msg.m_cid = epicsHTON32 ( this->getId () ); - - success = iiu.pushDatagramMsg ( msg, - this->pNameStr, this->nameLength ); - if ( success ) { - // - // increment the number of times we have tried - // to find this channel - // + bool success = this->piiu->searchMsg ( + guard, this->getId (), this->pNameStr, this->nameLength ); + if ( success ) { if ( this->retry < UINT_MAX ) { this->retry++; } - } - - return success; + } + return success; } const char *nciu::pName ( @@ -572,3 +552,62 @@ void nciu::disconnectAllIO ( *this, this->eventq ); } +void nciu::serviceShutdownNotify ( + epicsGuard < epicsMutex > & callbackControlGuard, + epicsGuard < epicsMutex > & mutualExclusionGuard ) +{ + this->setServerAddressUnknown ( noopIIU, mutualExclusionGuard ); + this->notify().serviceShutdownNotify ( callbackControlGuard, mutualExclusionGuard ); +} + +void channelNode::setRespPendingState ( + epicsGuard < epicsMutex > &, unsigned index ) +{ + this->listMember = + static_cast < channelNode::channelState > + ( channelNode::cs_searchRespPending0 + index ); + if ( this->listMember > cs_searchRespPending17 ) { + throw std::runtime_error ( + "resp search timer index out of bounds" ); + } +} + +void channelNode::setReqPendingState ( + epicsGuard < epicsMutex > &, unsigned index ) +{ + this->listMember = + static_cast < channelNode::channelState > + ( channelNode::cs_searchReqPending0 + index ); + if ( this->listMember > cs_searchReqPending17 ) { + throw std::runtime_error ( + "req search timer index out of bounds" ); + } +} + +unsigned channelNode::getMaxSearchTimerCount () +{ + return epicsMin ( + cs_searchReqPending17 - cs_searchReqPending0, + cs_searchRespPending17 - cs_searchRespPending0 ) + 1u; +} + +unsigned channelNode::getSearchTimerIndex ( + epicsGuard < epicsMutex > & ) +{ + channelNode::channelState chanState = this->listMember; + unsigned index = 0u; + if ( chanState >= cs_searchReqPending0 && + chanState <= cs_searchReqPending17 ) { + index = chanState - cs_searchReqPending0; + } + else if ( chanState >= cs_searchRespPending0 && + chanState <= cs_searchRespPending17 ) { + index = chanState - cs_searchRespPending0; + } + else { + throw std::runtime_error ( + "channel was expected to be in a search timer, but wasnt" );; + } + return index; +} + diff --git a/src/ca/nciu.h b/src/ca/nciu.h index e67ff889d..a4d09414a 100644 --- a/src/ca/nciu.h +++ b/src/ca/nciu.h @@ -59,11 +59,51 @@ protected: channelNode (); bool isConnected ( epicsGuard < epicsMutex > & ) const; bool isInstalledInServer ( epicsGuard < epicsMutex > & ) const; + static unsigned getMaxSearchTimerCount (); private: enum channelState { cs_none, cs_disconnGov, - cs_serverAddrResPend, + // note: indexing is used here + // so these must be contiguous + cs_searchReqPending0, + cs_searchReqPending1, + cs_searchReqPending2, + cs_searchReqPending3, + cs_searchReqPending4, + cs_searchReqPending5, + cs_searchReqPending6, + cs_searchReqPending7, + cs_searchReqPending8, + cs_searchReqPending9, + cs_searchReqPending10, + cs_searchReqPending11, + cs_searchReqPending12, + cs_searchReqPending13, + cs_searchReqPending14, + cs_searchReqPending15, + cs_searchReqPending16, + cs_searchReqPending17, + // note: indexing is used here + // so these must be contiguous + cs_searchRespPending0, + cs_searchRespPending1, + cs_searchRespPending2, + cs_searchRespPending3, + cs_searchRespPending4, + cs_searchRespPending5, + cs_searchRespPending6, + cs_searchRespPending7, + cs_searchRespPending8, + cs_searchRespPending9, + cs_searchRespPending10, + cs_searchRespPending11, + cs_searchRespPending12, + cs_searchRespPending13, + cs_searchRespPending14, + cs_searchRespPending15, + cs_searchRespPending16, + cs_searchRespPending17, cs_createReqPend, cs_createRespPend, cs_subscripReqPend, @@ -71,9 +111,14 @@ private: cs_unrespCircuit, cs_subscripUpdateReqPend } listMember; + void setRespPendingState ( epicsGuard < epicsMutex > &, unsigned index ); + void setReqPendingState ( epicsGuard < epicsMutex > &, unsigned index ); + unsigned getSearchTimerIndex ( epicsGuard < epicsMutex > & ); friend class tcpiiu; - friend class tcpSendThread; friend class udpiiu; + friend class tcpSendThread; + friend class searchTimer; + friend class disconnectGovernorTimer; }; class privateInterfaceForIO { // X aCC 655 @@ -107,9 +152,12 @@ public: epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard ); void setServerAddressUnknown ( - udpiiu & newiiu, epicsGuard < epicsMutex > & guard ); - bool searchMsg ( class udpiiu & iiu ); - void serviceShutdownNotify (); + netiiu & newiiu, epicsGuard < epicsMutex > & guard ); + bool searchMsg ( + epicsGuard < epicsMutex > & ); + void serviceShutdownNotify ( + epicsGuard < epicsMutex > & callbackControlGuard, + epicsGuard < epicsMutex > & mutualExclusionGuard ); void accessRightsStateChange ( const caAccessRights &, epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard ); @@ -150,7 +198,7 @@ public: void sendSubscriptionUpdateRequests ( epicsGuard < epicsMutex > & ); void disconnectAllIO ( epicsGuard < epicsMutex > &, epicsGuard < epicsMutex > & ); - bool connected ( epicsGuard < epicsMutex > & ) const; + bool connected ( epicsGuard < epicsMutex > & ) const; private: tsDLList < class baseNMIU > eventq; @@ -287,11 +335,6 @@ inline const netiiu * nciu::getConstPIIU ( return this->piiu; } -inline void nciu::serviceShutdownNotify () -{ - this->notify().serviceShutdownNotify (); -} - inline cac & nciu::getClient () { return this->cacCtx; diff --git a/src/ca/netiiu.cpp b/src/ca/netiiu.cpp index 41cf4767b..88c643328 100644 --- a/src/ca/netiiu.cpp +++ b/src/ca/netiiu.cpp @@ -19,6 +19,8 @@ * Author: Jeff Hill */ +#include + #include #include @@ -124,17 +126,33 @@ void netiiu::eliminateExcessiveSendBacklog ( void netiiu::requestRecvProcessPostponedFlush ( epicsGuard < epicsMutex > & ) { - return; } void netiiu::uninstallChan ( - epicsGuard < epicsMutex > &, epicsGuard < epicsMutex > &, nciu & ) { throw cacChannel::notConnected(); } - - +double netiiu::receiveWatchdogDelay ( + epicsGuard < epicsMutex > & guard ) const +{ + return - DBL_MAX; +} + +void netiiu::uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > &, nciu &, + const epicsTime & currentTime ) +{ + throw std::runtime_error ( + "search response occured when not attached to udpiiu?" ); +} + +bool netiiu::searchMsg ( + epicsGuard < epicsMutex > &, ca_uint32_t id, + const char * pName, unsigned nameLength ) +{ + return false; +} diff --git a/src/ca/netiiu.h b/src/ca/netiiu.h index db2d16616..ffa2d9643 100644 --- a/src/ca/netiiu.h +++ b/src/ca/netiiu.h @@ -26,20 +26,19 @@ #ifndef netiiuh #define netiiuh -#include "tsDLList.h" - -#include "nciu.h" +#include "cacIO.h" +#include "caProto.h" class netWriteNotifyIO; class netReadNotifyIO; class netSubscription; union osiSockAddr; - class cac; +class nciu; class netiiu { public: - virtual ~netiiu (); + virtual ~netiiu () = 0; virtual void hostName ( epicsGuard < epicsMutex > &, char * pBuf, unsigned bufLength ) const = 0; @@ -84,10 +83,15 @@ public: virtual osiSockAddr getNetworkAddress ( epicsGuard < epicsMutex > & ) const = 0; virtual void uninstallChan ( - epicsGuard < epicsMutex > & cbMutex, - epicsGuard < epicsMutex > & mutex, nciu & ) = 0; + epicsGuard < epicsMutex > &, nciu & ) = 0; + virtual void uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > &, nciu &, + const class epicsTime & currentTime ) = 0; virtual double receiveWatchdogDelay ( epicsGuard < epicsMutex > & ) const = 0; + virtual bool searchMsg ( + epicsGuard < epicsMutex > &, ca_uint32_t id, + const char * pName, unsigned nameLength ) = 0; }; #endif // netiiuh diff --git a/src/ca/noopiiu.cpp b/src/ca/noopiiu.cpp new file mode 100644 index 000000000..c5e958d68 --- /dev/null +++ b/src/ca/noopiiu.cpp @@ -0,0 +1,166 @@ +/*************************************************************************\ +* Copyright (c) 2002 The University of Chicago, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* EPICS BASE Versions 3.13.7 +* and higher are distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* + * $Id$ + * + * + * L O S A L A M O S + * Los Alamos National Laboratory + * Los Alamos, New Mexico 87545 + * + * Copyright, 1986, The Regents of the University of California. + * + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + */ + +#include "osiSock.h" + +#define epicsExportSharedSymbols +#include "noopiiu.h" + +noopiiu noopIIU; + +noopiiu::~noopiiu () +{ +} + +void noopiiu::hostName ( + epicsGuard < epicsMutex > & cacGuard, + char *pBuf, unsigned bufLength ) const +{ + netiiu::hostName ( cacGuard, pBuf, bufLength ); +} + +const char * noopiiu::pHostName ( + epicsGuard < epicsMutex > & cacGuard ) const +{ + return netiiu::pHostName ( cacGuard ); +} + +bool noopiiu::ca_v42_ok ( + epicsGuard < epicsMutex > & cacGuard ) const +{ + return netiiu::ca_v42_ok ( cacGuard ); +} + +bool noopiiu::ca_v41_ok ( + epicsGuard < epicsMutex > & cacGuard ) const +{ + return netiiu::ca_v41_ok ( cacGuard ); +} + +void noopiiu::writeRequest ( + epicsGuard < epicsMutex > & guard, + nciu & chan, unsigned type, + arrayElementCount nElem, const void * pValue ) +{ + netiiu::writeRequest ( guard, chan, type, nElem, pValue ); +} + +void noopiiu::writeNotifyRequest ( + epicsGuard < epicsMutex > & guard, nciu & chan, + netWriteNotifyIO & io, unsigned type, + arrayElementCount nElem, const void *pValue ) +{ + netiiu::writeNotifyRequest ( guard, chan, io, type, nElem, pValue ); +} + +void noopiiu::readNotifyRequest ( + epicsGuard < epicsMutex > & guard, nciu & chan, + netReadNotifyIO & io, unsigned type, arrayElementCount nElem ) +{ + netiiu::readNotifyRequest ( guard, chan, io, type, nElem ); +} + +void noopiiu::clearChannelRequest ( + epicsGuard < epicsMutex > & guard, + ca_uint32_t sid, ca_uint32_t cid ) +{ + netiiu::clearChannelRequest ( guard, sid, cid ); +} + +void noopiiu::subscriptionRequest ( + epicsGuard < epicsMutex > & guard, nciu & chan, + netSubscription & subscr ) +{ + netiiu::subscriptionRequest ( guard, chan, subscr ); +} + +void noopiiu::subscriptionUpdateRequest ( + epicsGuard < epicsMutex > & guard, nciu & chan, + netSubscription & subscr ) +{ + netiiu::subscriptionUpdateRequest ( + guard, chan, subscr ); +} + +void noopiiu::subscriptionCancelRequest ( + epicsGuard < epicsMutex > & guard, + nciu & chan, netSubscription & subscr ) +{ + netiiu::subscriptionCancelRequest ( guard, chan, subscr ); +} + +void noopiiu::flushRequest ( + epicsGuard < epicsMutex > & guard ) +{ + netiiu::flushRequest ( guard ); +} + +void noopiiu::eliminateExcessiveSendBacklog ( + epicsGuard < epicsMutex > * pCBGuard, + epicsGuard < epicsMutex > & guard ) +{ + netiiu::eliminateExcessiveSendBacklog ( pCBGuard, guard ); +} + +void noopiiu::requestRecvProcessPostponedFlush ( + epicsGuard < epicsMutex > & guard ) +{ + netiiu::requestRecvProcessPostponedFlush ( guard ); +} + +osiSockAddr noopiiu::getNetworkAddress ( + epicsGuard < epicsMutex > & guard ) const +{ + return netiiu::getNetworkAddress ( guard ); +} + +double noopiiu::receiveWatchdogDelay ( + epicsGuard < epicsMutex > & guard ) const +{ + return netiiu::receiveWatchdogDelay ( guard ); +} + +void noopiiu::uninstallChan ( + epicsGuard < epicsMutex > & guard, nciu & chan ) +{ + // intentionally does not call default in netiiu +} + +void noopiiu::uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > & guard, nciu & chan, + const class epicsTime & currentTime ) +{ + netiiu::uninstallChanDueToSuccessfulSearchResponse ( + guard, chan, currentTime ); +} + +bool noopiiu::searchMsg ( + epicsGuard < epicsMutex > & guard, ca_uint32_t id, + const char * pName, unsigned nameLength ) +{ + return netiiu::searchMsg ( + guard, id, pName, nameLength ); +} + diff --git a/src/ca/noopiiu.h b/src/ca/noopiiu.h new file mode 100644 index 000000000..35e14c3a4 --- /dev/null +++ b/src/ca/noopiiu.h @@ -0,0 +1,92 @@ +/*************************************************************************\ +* Copyright (c) 2002 The University of Chicago, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* EPICS BASE Versions 3.13.7 +* and higher are distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* + * $Id$ + * + * + * L O S A L A M O S + * Los Alamos National Laboratory + * Los Alamos, New Mexico 87545 + * + * Copyright, 1986, The Regents of the University of California. + * + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + */ + +#ifndef noopiiuh +#define noopiiuh + +#include "netiiu.h" + +class noopiiu : public netiiu { +public: + ~noopiiu (); + void hostName ( + epicsGuard < epicsMutex > &, char * pBuf, + unsigned bufLength ) const; + const char * pHostName ( + epicsGuard < epicsMutex > & ) const; + bool ca_v41_ok ( + epicsGuard < epicsMutex > & ) const; + bool ca_v42_ok ( + epicsGuard < epicsMutex > & ) const; + void eliminateExcessiveSendBacklog ( + epicsGuard < epicsMutex > * pCallbackGuard, + epicsGuard < epicsMutex > & mutualExclusionGuard ); + void writeRequest ( + epicsGuard < epicsMutex > &, nciu &, + unsigned type, arrayElementCount nElem, + const void *pValue ); + void writeNotifyRequest ( + epicsGuard < epicsMutex > &, + nciu &, netWriteNotifyIO &, + unsigned type, arrayElementCount nElem, + const void *pValue ); + void readNotifyRequest ( + epicsGuard < epicsMutex > &, nciu &, + netReadNotifyIO &, unsigned type, + arrayElementCount nElem ); + void clearChannelRequest ( + epicsGuard < epicsMutex > &, + ca_uint32_t sid, ca_uint32_t cid ); + void subscriptionRequest ( + epicsGuard < epicsMutex > &, + nciu &, netSubscription & ); + void subscriptionUpdateRequest ( + epicsGuard < epicsMutex > &, + nciu &, netSubscription & ); + void subscriptionCancelRequest ( + epicsGuard < epicsMutex > &, + nciu & chan, netSubscription & subscr ); + void flushRequest ( + epicsGuard < epicsMutex > & ); + void requestRecvProcessPostponedFlush ( + epicsGuard < epicsMutex > & ); + osiSockAddr getNetworkAddress ( + epicsGuard < epicsMutex > & ) const; + void uninstallChan ( + epicsGuard < epicsMutex > & mutex, + nciu & ); + void uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > &, nciu &, + const class epicsTime & currentTime ); + double receiveWatchdogDelay ( + epicsGuard < epicsMutex > & ) const; + bool searchMsg ( + epicsGuard < epicsMutex > &, ca_uint32_t id, + const char * pName, unsigned nameLength ); +}; + +extern noopiiu noopIIU; + +#endif // ifndef noopiiuh diff --git a/src/ca/oldAccess.h b/src/ca/oldAccess.h index e10557a47..47e8671ff 100644 --- a/src/ca/oldAccess.h +++ b/src/ca/oldAccess.h @@ -142,7 +142,9 @@ private: bool prevConnected; void connectNotify ( epicsGuard < epicsMutex > & ); void disconnectNotify ( epicsGuard < epicsMutex > & ); - void serviceShutdownNotify (); + void serviceShutdownNotify ( + epicsGuard < epicsMutex > & callbackControlGuard, + epicsGuard < epicsMutex > & mutualExclusionGuard ); void accessRightsNotify ( epicsGuard < epicsMutex > &, const caAccessRights & ); void exception ( epicsGuard < epicsMutex > &, @@ -324,7 +326,8 @@ public: void vSignal ( int ca_status, const char * pfilenm, int lineno, const char *pFormat, va_list args ); bool preemptiveCallbakIsEnabled () const; - void destroyChannel ( oldChannelNotify & chan ); + void destroyChannel ( epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard, oldChannelNotify & chan ); void destroyGetCopy ( epicsGuard < epicsMutex > &, getCopy & ); void destroyGetCallback ( epicsGuard < epicsMutex > &, getCallback & ); void destroyPutCallback ( epicsGuard < epicsMutex > &, putCallback & ); @@ -365,8 +368,6 @@ private: void callbackProcessingCompleteNotify (); cacContext & createNetworkContext ( epicsMutex & mutualExclusion, epicsMutex & callbackControl ); - void destroyChannelPrivate ( - oldChannelNotify & chan, epicsGuard < epicsMutex > & cbGuard ); void clearSubscriptionPrivate ( evid pMon, epicsGuard < epicsMutex > & cbGuard ); @@ -379,6 +380,7 @@ private: friend int epicsShareAPI ca_create_channel ( const char * name_str, caCh * conn_func, void * puser, capri priority, chid * chanptr ); + friend int epicsShareAPI ca_clear_channel ( chid pChan ); friend int epicsShareAPI ca_array_get ( chtype type, arrayElementCount count, chid pChan, void * pValue ); friend int epicsShareAPI ca_array_get_callback ( chtype type, diff --git a/src/ca/oldChannelNotify.cpp b/src/ca/oldChannelNotify.cpp index dcf9e5a7c..09024003b 100644 --- a/src/ca/oldChannelNotify.cpp +++ b/src/ca/oldChannelNotify.cpp @@ -173,9 +173,14 @@ void oldChannelNotify::disconnectNotify ( } } -void oldChannelNotify::serviceShutdownNotify () +void oldChannelNotify::serviceShutdownNotify ( + epicsGuard < epicsMutex > & callbackControlGuard, + epicsGuard < epicsMutex > & mutualExclusionGuard ) { - this->cacCtx.destroyChannel ( *this ); + this->cacCtx.destroyChannel ( + callbackControlGuard, + mutualExclusionGuard, + *this ); } void oldChannelNotify::accessRightsNotify ( diff --git a/src/ca/repeaterSubscribeTimer.cpp b/src/ca/repeaterSubscribeTimer.cpp index 67c77227d..636d2b71c 100644 --- a/src/ca/repeaterSubscribeTimer.cpp +++ b/src/ca/repeaterSubscribeTimer.cpp @@ -30,14 +30,16 @@ #include "udpiiu.h" #undef epicsExportSharedSymbols +static const double repeaterSubscribeTimerInitialPeriod = 10.0; // sec +static const double repeaterSubscribeTimerPeriod = 1.0; // sec + repeaterSubscribeTimer::repeaterSubscribeTimer ( - udpiiu & iiuIn, epicsTimerQueue & queueIn, + repeaterTimerNotify & iiuIn, epicsTimerQueue & queueIn, epicsMutex & cbMutexIn, cacContextNotify & ctxNotifyIn ) : timer ( queueIn.createTimer () ), iiu ( iiuIn ), cbMutex ( cbMutexIn ),ctxNotify ( ctxNotifyIn ), attempts ( 0 ), registered ( false ), once ( false ) { - this->timer.start ( *this, 10.0 ); } repeaterSubscribeTimer::~repeaterSubscribeTimer () @@ -45,9 +47,21 @@ repeaterSubscribeTimer::~repeaterSubscribeTimer () this->timer.destroy (); } -void repeaterSubscribeTimer::shutdown () +void repeaterSubscribeTimer::start () { - this->timer.cancel (); + this->timer.start ( + *this, repeaterSubscribeTimerInitialPeriod ); +} + +void repeaterSubscribeTimer::shutdown ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ) +{ + epicsGuardRelease < epicsMutex > unguard ( guard ); + { + epicsGuardRelease < epicsMutex > unguard ( cbGuard ); + this->timer.cancel (); + } } epicsTimerNotify::expireStatus repeaterSubscribeTimer:: @@ -58,11 +72,11 @@ epicsTimerNotify::expireStatus repeaterSubscribeTimer:: callbackManager mgr ( this->ctxNotify, this->cbMutex ); this->iiu.printf ( mgr.cbGuard, "CA client library is unable to contact CA repeater after %u tries.\n", - nTriesToMsg); + nTriesToMsg ); this->iiu.printf ( mgr.cbGuard, - "Silence this message by starting a CA repeater daemon\n"); + "Silence this message by starting a CA repeater daemon\n") ; this->iiu.printf ( mgr.cbGuard, - "or by calling ca_pend_event() and or ca_poll() more often.\n"); + "or by calling ca_pend_event() and or ca_poll() more often.\n" ); this->once = true; } @@ -73,7 +87,7 @@ epicsTimerNotify::expireStatus repeaterSubscribeTimer:: return noRestart; } else { - return expireStatus ( restart, 1.0 ); + return expireStatus ( restart, repeaterSubscribeTimerPeriod ); } } @@ -87,3 +101,5 @@ void repeaterSubscribeTimer::confirmNotify () { this->registered = true; } + +repeaterTimerNotify::~repeaterTimerNotify () {} diff --git a/src/ca/repeaterSubscribeTimer.h b/src/ca/repeaterSubscribeTimer.h index 620f24384..d6429c236 100644 --- a/src/ca/repeaterSubscribeTimer.h +++ b/src/ca/repeaterSubscribeTimer.h @@ -29,21 +29,34 @@ #include "epicsTimer.h" -class udpiiu; class epicsMutex; class cacContextNotify; +class repeaterTimerNotify { +public: + virtual ~repeaterTimerNotify () = 0; + virtual void repeaterRegistrationMessage ( + unsigned attemptNumber ) = 0; + virtual int printf ( + epicsGuard < epicsMutex > & callbackControl, + const char * pformat, ... ) = 0; +}; + class repeaterSubscribeTimer : private epicsTimerNotify { public: - repeaterSubscribeTimer ( udpiiu &, epicsTimerQueue &, + repeaterSubscribeTimer ( + repeaterTimerNotify &, epicsTimerQueue &, epicsMutex & cbMutex, cacContextNotify & ctxNotify ); virtual ~repeaterSubscribeTimer (); - void shutdown (); + void start (); + void shutdown ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ); void confirmNotify (); void show ( unsigned level ) const; private: epicsTimer & timer; - udpiiu & iiu; + repeaterTimerNotify & iiu; epicsMutex & cbMutex; cacContextNotify & ctxNotify; unsigned attempts; diff --git a/src/ca/searchTimer.cpp b/src/ca/searchTimer.cpp index 86b5dc924..a38e5b2c1 100644 --- a/src/ca/searchTimer.cpp +++ b/src/ca/searchTimer.cpp @@ -19,6 +19,7 @@ // Author: Jeff Hill // +#include #include #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" @@ -28,188 +29,297 @@ #define epicsExportSharedSymbols #include "iocinf.h" -#include "searchTimer.h" #include "udpiiu.h" +#include "nciu.h" static const unsigned initialTriesPerFrame = 1u; // initial UDP frames per search try static const unsigned maxTriesPerFrame = 64u; // max UDP frames per search try -static const double minSearchPeriod = 30e-3; // seconds -static const double maxSearchPeriodDefault = 5.0 * 60.0; // seconds -static const double maxSearchPeriodLowerLimit = 60.0; // seconds - -// This impacts the exponential backoff delay between search messages. -// This delay is two to the power of the minimum channel retry count -// times the estimated round trip time or the OS's delay quantum -// whichever is greater. So this results in about a one second delay. -static const unsigned successfulSearchRetrySetpoint = 6u; -static const unsigned beaconAnomalyRetrySetpoint = 6u; -static const unsigned disconnectRetrySetpoint = 6u; - // // searchTimer::searchTimer () // -searchTimer::searchTimer ( udpiiu & iiuIn, - epicsTimerQueue & queueIn, udpMutex & mutexIn ) : - period ( 1e9 ), +searchTimer::searchTimer ( + searchTimerNotify & iiuIn, + epicsTimerQueue & queueIn, + const unsigned indexIn, + epicsMutex & mutexIn, + bool boostPossibleIn ) : timer ( queueIn.createTimer () ), iiu ( iiuIn ), mutex ( mutexIn ), framesPerTry ( initialTriesPerFrame ), framesPerTryCongestThresh ( DBL_MAX ), - maxPeriod ( maxSearchPeriodDefault ), retry ( 0 ), searchAttempts ( 0u ), searchResponses ( 0u ), - searchAttemptsThisPass ( 0u ), - searchResponsesThisPass ( 0u ), + index ( indexIn ), dgSeqNoAtTimerExpireBegin ( 0u ), dgSeqNoAtTimerExpireEnd ( 0u ), + boostPossible ( boostPossibleIn ), stopped ( false ) { - if ( envGetConfigParamPtr ( & EPICS_CA_MAX_SEARCH_PERIOD ) ) { - long longStatus = envGetDoubleConfigParam ( - & EPICS_CA_MAX_SEARCH_PERIOD, & this->maxPeriod ); - if ( ! longStatus ) { - if ( this->maxPeriod < maxSearchPeriodLowerLimit ) { - epicsPrintf ( "EPICS \"%s\" out of range (low)\n", - EPICS_CA_MAX_SEARCH_PERIOD.name ); - this->maxPeriod = maxSearchPeriodLowerLimit; - epicsPrintf ( "Setting \"%s\" = %f seconds\n", - EPICS_CA_MAX_SEARCH_PERIOD.name, this->maxPeriod ); - } - } - else { - epicsPrintf ( "EPICS \"%s\" wasnt a real number\n", - EPICS_CA_MAX_SEARCH_PERIOD.name ); - epicsPrintf ( "Setting \"%s\" = %f seconds\n", - EPICS_CA_MAX_SEARCH_PERIOD.name, this->maxPeriod ); - } - } +} + +void searchTimer::start () +{ + this->timer.start ( *this, this->period () ); } searchTimer::~searchTimer () { + assert ( this->chanListReqPending.count() == 0 ); + assert ( this->chanListRespPending.count() == 0 ); this->timer.destroy (); } -void searchTimer::shutdown () +void searchTimer::shutdown ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ) { this->stopped = true; - this->timer.cancel (); -} - -void searchTimer::channelCreatedNotify ( - epicsGuard < udpMutex > & guard, - const epicsTime & currentTime, bool firstChannel ) -{ - this->newChannelNotify ( guard, currentTime, - firstChannel, 0 ); -} - -void searchTimer::channelDisconnectedNotify ( - epicsGuard < udpMutex > & guard, - const epicsTime & currentTime, bool firstChannel ) -{ - this->newChannelNotify ( guard, currentTime, - firstChannel, disconnectRetrySetpoint ); -} - -void searchTimer::newChannelNotify ( - epicsGuard < udpMutex > & guard, const epicsTime & currentTime, - bool firstChannel, const unsigned minRetryNo ) -{ - if ( ! this->stopped ) { - if ( firstChannel ) { - this->recomputeTimerPeriod ( guard, minRetryNo ); - double newPeriod = this->period; - { - // avoid timer cancel block deadlock - epicsGuardRelease < udpMutex > unguard ( guard ); - this->timer.start ( *this, currentTime + newPeriod ); - } - } - else { - this->recomputeTimerPeriodAndStartTimer ( guard, - currentTime, minRetryNo, 0.0 ); - } - } -} - -void searchTimer::beaconAnomalyNotify ( - epicsGuard < udpMutex > & guard, - const epicsTime & currentTime, const double & delay ) -{ - this->recomputeTimerPeriodAndStartTimer ( - guard, currentTime, beaconAnomalyRetrySetpoint, delay ); -} - -// lock must be applied -void searchTimer::recomputeTimerPeriod ( - epicsGuard < udpMutex > & guard, const unsigned retryNew ) // X aCC 431 -{ - this->retry = retryNew; - size_t idelay = 1u << tsMin ( (size_t) this->retry, - CHAR_BIT * sizeof ( idelay ) - 1u ); - double delayFactor = tsMax ( - this->iiu.roundTripDelayEstimate ( guard ) * 2.0, minSearchPeriod ); - this->period = idelay * delayFactor; /* sec */ - this->period = tsMin ( this->maxPeriod, this->period ); -} - -void searchTimer::recomputeTimerPeriodAndStartTimer ( epicsGuard < udpMutex > & guard, - const epicsTime & currentTime, const unsigned retryNew, const double & initialDelay ) -{ - if ( this->iiu.unresolvedChannelCount ( guard ) == 0 || this->stopped ) { - return; - } - - bool start = false; - double totalDelay = initialDelay; { - if ( this->retry <= retryNew ) { - return; - } - - double oldPeriod = this->period; - - this->recomputeTimerPeriod ( guard, retryNew ); - - totalDelay += this->period; - - if ( totalDelay < oldPeriod ) { - epicsTimer::expireInfo info = this->timer.getExpireInfo (); - if ( info.active ) { - double delay = info.expireTime - currentTime; - if ( delay > totalDelay ) { - start = true; - } - } - else { - start = true; - } + epicsGuardRelease < epicsMutex > unguard ( guard ); + { + epicsGuardRelease < epicsMutex > unguard ( cbGuard ); + this->timer.cancel (); } } - if ( start ) { - // avoid timer cancel block deadlock - epicsGuardRelease < udpMutex > unguard ( guard ); - this->timer.start ( *this, currentTime + totalDelay ); + while ( nciu * pChan = this->chanListReqPending.get () ) { + pChan->channelNode::listMember = + channelNode::cs_none; + pChan->serviceShutdownNotify ( cbGuard, guard ); + } + while ( nciu * pChan = this->chanListRespPending.get () ) { + pChan->channelNode::listMember = + channelNode::cs_none; + pChan->serviceShutdownNotify ( cbGuard, guard ); + } +} + +void searchTimer::installChannel ( + epicsGuard < epicsMutex > & guard, nciu & chan ) +{ + this->chanListReqPending.add ( chan ); + chan.channelNode::setReqPendingState ( guard, this->index ); +} + +void searchTimer::moveChannels ( + epicsGuard < epicsMutex > & guard, searchTimer & dest ) +{ + while ( nciu * pChan = this->chanListRespPending.get () ) { + if ( this->searchAttempts > 0 ) { + this->searchAttempts--; + } + dest.installChannel ( guard, *pChan ); + } + while ( nciu * pChan = this->chanListReqPending.get () ) { + dest.installChannel ( guard, *pChan ); } - debugPrintf ( ( "changed search period to %f sec\n", this->period ) ); } // -// searchTimer::notifySuccessfulSearchResponse () +// searchTimer::expire () +// +epicsTimerNotify::expireStatus searchTimer::expire ( + const epicsTime & currentTime ) // X aCC 361 +{ + epicsGuard < epicsMutex > guard ( this->mutex ); + + while ( nciu * pChan = this->chanListRespPending.get () ) { + pChan->channelNode::listMember = + channelNode::cs_none; + this->iiu.noSearchRespNotify ( + guard, *pChan, this->index ); + } + + // boost search period for channels not recently + // searched for if there was some success + if ( this->searchResponses && this->boostPossible ) { + while ( nciu * pChan = this->chanListReqPending.get () ) { + pChan->channelNode::listMember = + channelNode::cs_none; + this->iiu.boostChannel ( guard, *pChan ); + } + } + + if ( this->searchAttempts ) { +#if 0 + // + // dynamically adjust the number of UDP frames per + // try depending how many search requests are not + // replied to + // + // The variable this->framesPerTry + // determines the number of UDP frames to be sent + // each time that expire() is called. + // If this value is too high we will waste some + // network bandwidth. If it is too low we will + // use very little of the incoming UDP message + // buffer associated with the server's port and + // will therefore take longer to connect. We + // initialize this->framesPerTry to a prime number + // so that it is less likely that the + // same channel is in the last UDP frame + // sent every time that this is called (and + // potentially discarded by a CA server with + // a small UDP input queue). + // + // increase frames per try only if we see better than + // a 93.75% success rate for one pass through the list + // + if ( this->searchResponses > + ( this->searchAttempts - (this->searchAttempts/16u) ) ) { + // increase UDP frames per try if we have a good score + if ( this->framesPerTry < maxTriesPerFrame ) { + // a congestion avoidance threshold similar to TCP is now used + if ( this->framesPerTry < this->framesPerTryCongestThresh ) { + this->framesPerTry += this->framesPerTry; + } + else { + this->framesPerTry += (this->framesPerTry/8) + 1; + } + debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n", + this->framesPerTry, this->searchAttempts, this->searchResponses) ); + } + } + // if we detect congestion because we have less than a 87.5% success + // rate then gradually reduce the frames per try + else if ( this->searchResponses < + ( this->searchAttempts - (this->searchAttempts/8u) ) ) { + if ( this->framesPerTry > 1 ) { + this->framesPerTry--; + } + this->framesPerTryCongestThresh = this->framesPerTry/2 + 1; + debugPrintf ( ("Congestion detected - set frames per try to %f t=%u r=%u\n", + this->framesPerTry, this->searchAttempts, this->searchResponses) ); + } +#else + if ( this->searchResponses == this->searchAttempts ) { + // increase UDP frames per try if we have a good score + if ( this->framesPerTry < maxTriesPerFrame ) { + // a congestion avoidance threshold similar to TCP is now used + if ( this->framesPerTry < this->framesPerTryCongestThresh ) { + double doubled = 2 * this->framesPerTry; + if ( doubled > this->framesPerTryCongestThresh ) { + this->framesPerTry = this->framesPerTryCongestThresh; + } + else { + this->framesPerTry = doubled; + } + } + else { + this->framesPerTry += 1.0 / this->framesPerTry; + } + debugPrintf ( ("Increasing frame count to %g t=%u r=%u\n", + this->framesPerTry, this->searchAttempts, this->searchResponses) ); + } + } + else { + this->framesPerTryCongestThresh = this->framesPerTry / 2.0; + this->framesPerTry = 1u; + debugPrintf ( ("Congestion detected - set frames per try to %g t=%u r=%u\n", + this->framesPerTry, this->searchAttempts, this->searchResponses) ); + } +#endif + } + + this->dgSeqNoAtTimerExpireBegin = + this->iiu.datagramSeqNumber ( guard ); + + this->searchAttempts = 0; + this->searchResponses = 0; + + unsigned nFrameSent = 0u; + while ( true ) { + nciu * pChan = this->chanListReqPending.get (); + if ( ! pChan ) { + break; + } + + pChan->channelNode::listMember = + channelNode::cs_none; + + bool success = pChan->searchMsg ( guard ); + if ( ! success ) { + if ( this->iiu.datagramFlush ( guard, currentTime ) ) { + nFrameSent++; + if ( nFrameSent < this->framesPerTry ) { + success = pChan->searchMsg ( guard ); + } + } + if ( ! success ) { + this->chanListReqPending.push ( *pChan ); + pChan->channelNode::setReqPendingState ( + guard, this->index ); + break; + } + } + + this->chanListRespPending.add ( *pChan ); + pChan->channelNode::setRespPendingState ( + guard, this->index ); + + if ( this->searchAttempts < UINT_MAX ) { + this->searchAttempts++; + } + } + + // flush out the search request buffer + if ( this->iiu.datagramFlush ( guard, currentTime ) ) { + nFrameSent++; + } + + this->dgSeqNoAtTimerExpireEnd = + this->iiu.datagramSeqNumber ( guard ) - 1u; + + this->timeAtLastSend = currentTime; + +# ifdef DEBUG + if ( this->searchAttempts ) { + char buf[64]; + currentTime.strftime ( buf, sizeof(buf), "%M:%S.%09f"); + debugPrintf ( ("sent %u delay sec=%f Rts=%s\n", + nFrameSent, this->period(), buf ) ); + } +# endif + + return expireStatus ( restart, this->period() ); +} + +void searchTimer::show ( unsigned level ) const +{ + epicsGuard < epicsMutex > guard ( this->mutex ); + ::printf ( "search timer delay %f\n", this->period() ); + ::printf ( "%u channels with search request pending\n", + this->chanListReqPending.count () ); + tsDLIterConst < nciu > pChan = this->chanListReqPending.firstIter (); + while ( pChan.valid () ) { + pChan->show ( level - 1u ); + pChan++; + } + ::printf ( "%u channels with search response pending\n", + this->chanListRespPending.count () ); + pChan = this->chanListRespPending.firstIter (); + while ( pChan.valid () ) { + pChan->show ( level - 1u ); + pChan++; + } +} + // // Reset the delay to the next search request if we get // at least one response. However, dont reset this delay if we // get a delayed response to an old search request. // -void searchTimer::notifySuccessfulSearchResponse ( epicsGuard < udpMutex > & guard, - ca_uint32_t respDatagramSeqNo, bool seqNumberIsValid, const epicsTime & currentTime ) +void searchTimer::uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > & guard, nciu & chan, + ca_uint32_t respDatagramSeqNo, bool seqNumberIsValid, + const epicsTime & currentTime ) { - if ( this->iiu.unresolvedChannelCount ( guard ) == 0 || this->stopped ) { + this->uninstallChan ( guard, chan ); + + if ( this->stopped ) { return; } @@ -223,235 +333,49 @@ void searchTimer::notifySuccessfulSearchResponse ( epicsGuard < udpMutex > & gua // if we receive a successful response then reset to a // reasonable timer period if ( validResponse ) { + double measured = currentTime - this->timeAtLastSend; + this->iiu.updateRTTE ( measured ); + if ( this->searchResponses < UINT_MAX ) { this->searchResponses++; if ( this->searchResponses == this->searchAttempts ) { - debugPrintf ( ( "Response set timer delay to zero\n" ) ); - if ( this->retry > successfulSearchRetrySetpoint ) { - this->recomputeTimerPeriod ( - guard, successfulSearchRetrySetpoint ); - } - // avoid timer cancel block deadlock - epicsGuardRelease < udpMutex > unguard ( guard ); - // - // when we get 100% success immediately - // send another search request - // - this->timer.start ( *this, currentTime ); - } - else { - debugPrintf ( ( "Response set timer delay to beacon anomaly set point\n" ) ); - // - // otherwise, if making some progress then dont allow - // retry rate to drop below some reasonable minimum - // - if ( this->retry > successfulSearchRetrySetpoint ) { - this->recomputeTimerPeriodAndStartTimer ( - guard, currentTime, successfulSearchRetrySetpoint, 0.0 ); + if ( this->chanListReqPending.count () ) { + // avoid timer cancel block deadlock + epicsGuardRelease < epicsMutex > unguard ( guard ); + // + // when we get 100% success immediately + // send another search request + // + debugPrintf ( ( "All requests succesful, set timer delay to zero\n" ) ); + this->timer.start ( *this, currentTime ); } } } } } -// -// searchTimer::expire () -// -epicsTimerNotify::expireStatus searchTimer::expire ( const epicsTime & currentTime ) // X aCC 361 +void searchTimer::uninstallChan ( + epicsGuard < epicsMutex > & cacGuard, nciu & chan ) { - epicsGuard < udpMutex > guard ( this->mutex ); - - // check to see if there is nothing to do here - if ( this->iiu.unresolvedChannelCount ( guard ) == 0 ) { - debugPrintf ( ( "all channels located - search timer terminating\n" ) ); - this->period = DBL_MAX; - return noRestart; - } - -#if 0 - // - // dynamically adjust the number of UDP frames per - // try depending how many search requests are not - // replied to - // - // The variable this->framesPerTry - // determines the number of UDP frames to be sent - // each time that expire() is called. - // If this value is too high we will waste some - // network bandwidth. If it is too low we will - // use very little of the incoming UDP message - // buffer associated with the server's port and - // will therefore take longer to connect. We - // initialize this->framesPerTry to a prime number - // so that it is less likely that the - // same channel is in the last UDP frame - // sent every time that this is called (and - // potentially discarded by a CA server with - // a small UDP input queue). - // - // increase frames per try only if we see better than - // a 93.75% success rate for one pass through the list - // - if ( this->searchResponses > - ( this->searchAttempts - (this->searchAttempts/16u) ) ) { - // increase UDP frames per try if we have a good score - if ( this->framesPerTry < maxTriesPerFrame ) { - // a congestion avoidance threshold similar to TCP is now used - if ( this->framesPerTry < this->framesPerTryCongestThresh ) { - this->framesPerTry += this->framesPerTry; - } - else { - this->framesPerTry += (this->framesPerTry/8) + 1; - } - debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n", - this->framesPerTry, this->searchAttempts, this->searchResponses) ); - } + cacGuard.assertIdenticalMutex ( this->mutex ); + if ( chan.channelNode::listMember == + this->index + channelNode::cs_searchReqPending0 ) { + this->chanListReqPending.remove ( chan ); } - // if we detect congestion because we have less than a 87.5% success - // rate then gradually reduce the frames per try - else if ( this->searchResponses < - ( this->searchAttempts - (this->searchAttempts/8u) ) ) { - if ( this->framesPerTry > 1 ) { - this->framesPerTry--; - } - this->framesPerTryCongestThresh = this->framesPerTry/2 + 1; - debugPrintf ( ("Congestion detected - set frames per try to %f t=%u r=%u\n", - this->framesPerTry, this->searchAttempts, this->searchResponses) ); - } -#else - if ( this->searchResponses == this->searchAttempts ) { - // increase UDP frames per try if we have a good score - if ( this->framesPerTry < maxTriesPerFrame ) { - // a congestion avoidance threshold similar to TCP is now used - if ( this->framesPerTry < this->framesPerTryCongestThresh ) { - double doubled = 2 * this->framesPerTry; - if ( doubled > this->framesPerTryCongestThresh ) { - this->framesPerTry = this->framesPerTryCongestThresh; - } - else { - this->framesPerTry = doubled; - } - } - else { - this->framesPerTry += 1.0 / this->framesPerTry; - } - debugPrintf ( ("Increasing frame count to %g t=%u r=%u\n", - this->framesPerTry, this->searchAttempts, this->searchResponses) ); - } - } - else { - this->framesPerTryCongestThresh = this->framesPerTry / 2.0; - this->framesPerTry = 1u; - debugPrintf ( ("Congestion detected - set frames per try to %g t=%u r=%u\n", - this->framesPerTry, this->searchAttempts, this->searchResponses) ); - } -#endif - - if ( this->searchAttemptsThisPass <= UINT_MAX - this->searchAttempts ) { - this->searchAttemptsThisPass += this->searchAttempts; + else if ( chan.channelNode::listMember == + this->index + channelNode::cs_searchRespPending0 ) { + this->chanListRespPending.remove ( chan ); } else { - this->searchAttemptsThisPass = UINT_MAX; + throw std::runtime_error ( + "uninstalling channel search timer, but channel state is wrong" );; } - if ( this->searchResponsesThisPass <= UINT_MAX - this->searchResponses ) { - this->searchResponsesThisPass += this->searchResponses; - } - else { - this->searchResponsesThisPass = UINT_MAX; - } - - this->dgSeqNoAtTimerExpireBegin = this->iiu.datagramSeqNumber ( guard ); - - this->searchAttempts = 0; - this->searchResponses = 0; - - unsigned nChanSent = 0u; - unsigned nFrameSent = 0u; - while ( true ) { - - // check to see if we have reached the end of the list - if ( this->searchAttemptsThisPass >= this->iiu.unresolvedChannelCount ( guard ) ) { - // if we are making some progress then dont increase the - // delay between search requests - if ( this->searchResponsesThisPass == 0u ) { - this->recomputeTimerPeriod ( guard, this->retry + 1 ); - } - - this->searchAttemptsThisPass = 0; - this->searchResponsesThisPass = 0; - - debugPrintf ( ("saw end of list\n") ); - } - - if ( ! this->iiu.searchMsg ( guard ) ) { - nFrameSent++; - - if ( nFrameSent >= this->framesPerTry ) { - break; - } - this->dgSeqNoAtTimerExpireEnd = this->iiu.datagramSeqNumber ( guard ); - this->iiu.datagramFlush ( guard, currentTime ); - if ( ! this->iiu.searchMsg ( guard ) ) { - break; - } - } - - if ( this->searchAttempts < UINT_MAX ) { - this->searchAttempts++; - } - if ( nChanSent < UINT_MAX ) { - nChanSent++; - } - - // - // dont send any of the channels twice within one try - // - if ( nChanSent >= this->iiu.unresolvedChannelCount ( guard ) ) { - // - // add one to nFrameSent because there may be - // one more partial frame to be sent - // - nFrameSent++; - - // - // cap this->framesPerTry to - // the number of frames required for all of - // the unresolved channels - // - if ( this->framesPerTry > nFrameSent ) { - this->framesPerTry = nFrameSent; - } - - break; - } - } - - // flush out the search request buffer - this->iiu.datagramFlush ( guard, currentTime ); - - this->dgSeqNoAtTimerExpireEnd = this->iiu.datagramSeqNumber ( guard ) - 1u; - -# ifdef DEBUG - char buf[64]; - epicsTime ts = currentTime; - ts.strftime ( buf, sizeof(buf), "%M:%S.%09f"); - debugPrintf ( ("sent %u delay sec=%f Rts=%s\n", - nFrameSent, this->period, buf ) ); -# endif - - if ( this->iiu.unresolvedChannelCount ( guard ) == 0 ) { - debugPrintf ( ( "all channels connected\n" ) ); - this->period = DBL_MAX; - return noRestart; - } - - // the code used to test this->minRetry < maxSearchTries here - // and return no restart if the maximum tries was exceeded - // prior to R3.14.7 - return expireStatus ( restart, this->period ); + chan.channelNode::listMember = channelNode::cs_none; } -void searchTimer::show ( unsigned /* level */ ) const +double searchTimer::period () const { + return (1 << this->index ) * this->iiu.getRTTE (); } +searchTimerNotify::~searchTimerNotify () {} diff --git a/src/ca/searchTimer.h b/src/ca/searchTimer.h index 1a0f96d6f..3c16b80f9 100644 --- a/src/ca/searchTimer.h +++ b/src/ca/searchTimer.h @@ -42,50 +42,68 @@ #endif #include "caProto.h" +#include "netiiu.h" -class udpMutex; +class searchTimerNotify { +public: + virtual ~searchTimerNotify () = 0; + virtual void boostChannel ( + epicsGuard < epicsMutex > &, nciu & ) = 0; + virtual void noSearchRespNotify ( + epicsGuard < epicsMutex > &, nciu &, unsigned ) = 0; + virtual double getRTTE () const = 0; + virtual void updateRTTE ( double rtte ) = 0; + virtual bool datagramFlush ( + epicsGuard < epicsMutex > &, + const epicsTime & currentTime ) = 0; + virtual ca_uint32_t datagramSeqNumber ( + epicsGuard < epicsMutex > & ) const = 0; +}; class searchTimer : private epicsTimerNotify { public: - searchTimer ( class udpiiu &, epicsTimerQueue &, udpMutex & ); + searchTimer ( + class searchTimerNotify &, epicsTimerQueue &, + const unsigned index, epicsMutex &, + bool boostPossible ); virtual ~searchTimer (); - void notifySuccessfulSearchResponse ( epicsGuard < udpMutex > &, - ca_uint32_t respDatagramSeqNo, - bool seqNumberIsValid, const epicsTime & currentTime ); - void beaconAnomalyNotify ( epicsGuard < udpMutex > &, - const epicsTime & currentTime, const double & delay ); - void channelCreatedNotify ( epicsGuard < udpMutex > &, - const epicsTime &, bool firstChannel ); - void channelDisconnectedNotify ( epicsGuard < udpMutex > &, - const epicsTime &, bool firstChannel ); - void shutdown (); + void start (); + void shutdown ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ); + void moveChannels ( + epicsGuard < epicsMutex > &, searchTimer & dest ); + void installChannel ( + epicsGuard < epicsMutex > &, nciu & ); + void uninstallChan ( + epicsGuard < epicsMutex > &, nciu & ); + void uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > &, nciu &, + ca_uint32_t respDatagramSeqNo, bool seqNumberIsValid, + const epicsTime & currentTime ); void show ( unsigned level ) const; private: - double period; /* period between tries */ + tsDLList < nciu > chanListReqPending; + tsDLList < nciu > chanListRespPending; + epicsTime timeAtLastSend; epicsTimer & timer; - class udpiiu & iiu; - udpMutex & mutex; + searchTimerNotify & iiu; + epicsMutex & mutex; double framesPerTry; /* # of UDP frames per search try */ double framesPerTryCongestThresh; /* one half N tries w congest */ - double maxPeriod; unsigned retry; unsigned searchAttempts; /* num search tries after last timer experation */ unsigned searchResponses; /* num search resp after last timer experation */ - unsigned searchAttemptsThisPass; /* num search tries within this pass */ - unsigned searchResponsesThisPass; /* num search resp within this pass */ + const unsigned index; ca_uint32_t dgSeqNoAtTimerExpireBegin; ca_uint32_t dgSeqNoAtTimerExpireEnd; + const bool boostPossible; bool stopped; + expireStatus expire ( const epicsTime & currentTime ); - void recomputeTimerPeriod ( epicsGuard < udpMutex > &, const unsigned minRetryNew ); - void recomputeTimerPeriodAndStartTimer ( epicsGuard < udpMutex > &, - const epicsTime & currentTime, const unsigned minRetryNew, - const double & initialDelay ); - void newChannelNotify ( epicsGuard < udpMutex > &, - const epicsTime &, bool firstChannel, - const unsigned minRetryNo ); - searchTimer ( const searchTimer & ); - searchTimer & operator = ( const searchTimer & ); + double period () const; + searchTimer ( const searchTimer & ); // not implemented + searchTimer & operator = ( const searchTimer & ); // not implemented }; #endif // ifdef searchTimerh diff --git a/src/ca/tcpRecvWatchdog.cpp b/src/ca/tcpRecvWatchdog.cpp index 8229bb0e3..e81a5677a 100644 --- a/src/ca/tcpRecvWatchdog.cpp +++ b/src/ca/tcpRecvWatchdog.cpp @@ -36,7 +36,7 @@ tcpRecvWatchdog::tcpRecvWatchdog cbMutex ( cbMutexIn ), ctxNotify ( ctxNotifyIn ), mutex ( mutexIn ), iiu ( iiuIn ), probeResponsePending ( false ), beaconAnomaly ( true ), - probeTimeoutDetected ( false ) + probeTimeoutDetected ( false ), shuttingDown ( false ) { } @@ -49,6 +49,10 @@ epicsTimerNotify::expireStatus tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361 { callbackManager mgr ( this->ctxNotify, this->cbMutex ); + epicsGuard < epicsMutex > guard ( this->mutex ); + if ( this->shuttingDown ) { + return noRestart; + } if ( this->probeResponsePending ) { if ( this->iiu.bytesArePendingInOS() ) { this->iiu.printf ( mgr.cbGuard, @@ -64,7 +68,6 @@ tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361 "o application is blocked in a callback from the client library\n" ); } { - epicsGuard < epicsMutex > guard ( this->mutex ); # ifdef DEBUG char hostName[128]; this->iiu.hostName ( guard, hostName, sizeof (hostName) ); @@ -78,11 +81,8 @@ tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361 return noRestart; } else { - { - epicsGuard < epicsMutex > guard ( this->mutex ); - this->probeTimeoutDetected = false; - this->probeResponsePending = this->iiu.setEchoRequestPending ( guard ); - } + this->probeTimeoutDetected = false; + this->probeResponsePending = this->iiu.setEchoRequestPending ( guard ); debugPrintf ( ("circuit timed out - sending echo request\n") ); return expireStatus ( restart, CA_ECHO_TIMEOUT ); } @@ -92,7 +92,7 @@ void tcpRecvWatchdog::beaconArrivalNotify ( epicsGuard < epicsMutex > & guard, const epicsTime & currentTime ) { guard.assertIdenticalMutex ( this->mutex ); - if ( ! this->beaconAnomaly && ! this->probeResponsePending ) { + if ( ! ( this->shuttingDown || this->beaconAnomaly || this->probeResponsePending ) ) { epicsGuardRelease < epicsMutex > unguard ( guard ); this->timer.start ( *this, currentTime + this->period ); debugPrintf ( ("saw a normal beacon - reseting circuit receive watchdog\n") ); @@ -120,15 +120,15 @@ void tcpRecvWatchdog::messageArrivalNotify ( bool restartNeeded = false; { epicsGuard < epicsMutex > guard ( this->mutex ); - if ( ! this->probeResponsePending ) { + if ( ! ( this->shuttingDown || this->probeResponsePending ) ) { this->beaconAnomaly = false; restartNeeded = true; } } // dont hold the lock for fear of deadlocking // because cancel is blocking for the completion - // of the recvDog expire which takes the lock - // - it take also the callback lock + // of expire() which takes the lock - it take also + // the callback lock if ( restartNeeded ) { this->timer.start ( *this, currentTime + this->period ); debugPrintf ( ("received a message - reseting circuit recv watchdog\n") ); @@ -143,7 +143,7 @@ void tcpRecvWatchdog::probeResponseNotify ( double restartDelay = DBL_MAX; { epicsGuard < epicsMutex > guard ( this->mutex ); - if ( this->probeResponsePending ) { + if ( this->probeResponsePending && ! this->shuttingDown ) { restartNeeded = true; if ( this->probeTimeoutDetected ) { this->probeTimeoutDetected = false; @@ -192,7 +192,7 @@ void tcpRecvWatchdog::sendBacklogProgressNotify ( // beacon anomaly (which could be transiently detecting a reboot) we will // not trust the beacon as an indicator of a healthy server until we // receive at least one message from the server. - if ( this->probeResponsePending ) { + if ( this->probeResponsePending && ! this->shuttingDown ) { // we avoid calling this with the lock applied because // it restarts the recv wd timer, this might block // until a recv wd timer expire callback completes, and @@ -205,6 +205,12 @@ void tcpRecvWatchdog::sendBacklogProgressNotify ( void tcpRecvWatchdog::connectNotify () { + { + epicsGuard < epicsMutex > guard ( this->mutex ); + if ( this->shuttingDown ) { + return; + } + } this->timer.start ( *this, this->period ); debugPrintf ( ("connected to the server - initiating circuit recv watchdog\n") ); } @@ -217,7 +223,7 @@ void tcpRecvWatchdog::sendTimeoutNotify ( guard.assertIdenticalMutex ( this->mutex ); bool restartNeeded = false; - if ( ! this->probeResponsePending ) { + if ( ! ( this->probeResponsePending || this->shuttingDown ) ) { this->probeResponsePending = this->iiu.setEchoRequestPending ( guard ); restartNeeded = true; } @@ -237,6 +243,16 @@ void tcpRecvWatchdog::cancel () debugPrintf ( ("canceling TCP recv watchdog\n") ); } +void tcpRecvWatchdog::shutdown () +{ + { + epicsGuard < epicsMutex > guard ( this->mutex ); + this->shuttingDown = true; + } + this->timer.cancel (); + debugPrintf ( ("canceling TCP recv watchdog\n") ); +} + double tcpRecvWatchdog::delay () const { return this->timer.getExpireDelay (); diff --git a/src/ca/tcpRecvWatchdog.h b/src/ca/tcpRecvWatchdog.h index 7903eba13..bed11fcec 100644 --- a/src/ca/tcpRecvWatchdog.h +++ b/src/ca/tcpRecvWatchdog.h @@ -56,6 +56,7 @@ public: epicsGuard < epicsMutex > & mutex, const epicsTime & currentTime ); void cancel (); + void shutdown (); void show ( unsigned level ) const; double delay () const; private: @@ -68,6 +69,7 @@ private: bool probeResponsePending; bool beaconAnomaly; bool probeTimeoutDetected; + bool shuttingDown; expireStatus expire ( const epicsTime & currentTime ); tcpRecvWatchdog ( const tcpRecvWatchdog & ); tcpRecvWatchdog & operator = ( const tcpRecvWatchdog & ); diff --git a/src/ca/tcpSendWatchdog.cpp b/src/ca/tcpSendWatchdog.cpp index 1ee16fb1e..ca5186328 100644 --- a/src/ca/tcpSendWatchdog.cpp +++ b/src/ca/tcpSendWatchdog.cpp @@ -44,6 +44,7 @@ epicsTimerNotify::expireStatus tcpSendWatchdog::expire ( const epicsTime & currentTime ) { callbackManager mgr ( this->ctxNotify, this->cbMutex ); + epicsGuard < epicsMutex > guard ( this->mutex ); if ( this->iiu.bytesArePendingInOS() ) { this->iiu.printf ( mgr.cbGuard, "The CA client library is disconnecting after a flush request " @@ -51,13 +52,12 @@ epicsTimerNotify::expireStatus tcpSendWatchdog::expire ( "application schedualing problem\n" ); } # ifdef DEBUG - epicsGuard < epicsMutex > guard ( this->mutex ); char hostName[128]; this->iiu.hostName ( guard, hostName, sizeof ( hostName ) ); debugPrintf ( ( "Request not accepted by CA server %s for %g sec. Disconnecting.\n", hostName, this->period ) ); # endif - this->iiu.sendTimeoutNotify ( currentTime, mgr ); + this->iiu.sendTimeoutNotify ( currentTime, mgr, guard ); return noRestart; } @@ -71,3 +71,4 @@ void tcpSendWatchdog::cancel () this->timer.cancel (); } + diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index bb2d3a8c1..063bcab4c 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -185,7 +185,7 @@ void tcpSendThread::run () } this->iiu.sendDog.cancel (); - this->iiu.recvDog.cancel (); + this->iiu.recvDog.shutdown (); while ( ! this->iiu.recvThread.exitWait ( 30.0 ) ) { // it is possible to get stuck here if the user calls @@ -333,15 +333,16 @@ void tcpiiu::recvBytes ( continue; } + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + // the replacable printf handler isnt called here // because it reqires a callback lock which probably // isnt appropriate here char name[64]; this->hostNameCacheInstance.hostName ( name, sizeof ( name ) ); - char sockErrBuf[64]; - epicsSocketConvertErrnoToString ( - sockErrBuf, sizeof ( sockErrBuf ) ); errlogPrintf ( "Unexpected problem with CA circuit to" " server \"%s\" was \"%s\" - disconnecting\n", @@ -813,10 +814,11 @@ void tcpiiu::responsiveCircuitNotify ( void tcpiiu::sendTimeoutNotify ( const epicsTime & currentTime, - callbackManager & mgr ) + callbackManager & mgr, + epicsGuard < epicsMutex > & guard ) { mgr.cbGuard.assertIdenticalMutex ( this-> cbMutex ); - epicsGuard < epicsMutex > guard ( this->mutex ); + guard.assertIdenticalMutex ( this->mutex ); this->unresponsiveCircuitNotify ( mgr.cbGuard, guard ); // setup circuit probe sequence this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard, currentTime ); @@ -844,8 +846,16 @@ void tcpiiu::unresponsiveCircuitNotify ( this->sendThreadFlushEvent.signal (); this->flushBlockEvent.signal (); - this->recvDog.cancel (); - this->sendDog.cancel (); + // must not hold lock when canceling timer + { + epicsGuardRelease < epicsMutex > unguard ( guard ); + { + epicsGuardRelease < epicsMutex > unguard ( cbGuard ); + this->recvDog.cancel (); + this->sendDog.cancel (); + } + } + if ( this->connectedList.count() ) { char hostNameTmp[128]; this->hostName ( guard, hostNameTmp, sizeof ( hostNameTmp ) ); @@ -944,7 +954,7 @@ tcpiiu::~tcpiiu () this->sendThread.exitWait (); this->recvThread.exitWait (); this->sendDog.cancel (); - this->recvDog.cancel (); + this->recvDog.shutdown (); if ( ! this->socketHasBeenClosed ) { epicsSocketDestroy ( this->sock ); @@ -1691,49 +1701,76 @@ const char * tcpiiu::pHostName ( return nameBuf; } -void tcpiiu::removeAllChannels ( - bool supressApplicationNotify, +void tcpiiu::disconnectAllChannels ( epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard, - udpiiu & discIIU ) + class udpiiu & discIIU ) { cbGuard.assertIdenticalMutex ( this->cbMutex ); guard.assertIdenticalMutex ( this->mutex ); while ( nciu * pChan = this->createReqPend.get () ) { - discIIU.installDisconnectedChannel ( *pChan ); - pChan->setServerAddressUnknown ( discIIU, guard ); + discIIU.installDisconnectedChannel ( guard, *pChan ); } while ( nciu * pChan = this->createRespPend.get () ) { this->clearChannelRequest ( guard, pChan->getSID(guard), pChan->getCID(guard) ); - discIIU.installDisconnectedChannel ( *pChan ); - pChan->setServerAddressUnknown ( discIIU, guard ); + discIIU.installDisconnectedChannel ( guard, *pChan ); } while ( nciu * pChan = this->subscripReqPend.get () ) { pChan->disconnectAllIO ( cbGuard, guard ); - discIIU.installDisconnectedChannel ( *pChan ); - pChan->setServerAddressUnknown ( discIIU, guard ); - if ( ! supressApplicationNotify ) { - pChan->unresponsiveCircuitNotify ( cbGuard, guard ); - } + discIIU.installDisconnectedChannel ( guard, *pChan ); + pChan->unresponsiveCircuitNotify ( cbGuard, guard ); } while ( nciu * pChan = this->connectedList.get () ) { pChan->disconnectAllIO ( cbGuard, guard ); - discIIU.installDisconnectedChannel ( *pChan ); - pChan->setServerAddressUnknown ( discIIU, guard ); - if ( ! supressApplicationNotify ) { - pChan->unresponsiveCircuitNotify ( cbGuard, guard ); - } + discIIU.installDisconnectedChannel ( guard, *pChan ); + pChan->unresponsiveCircuitNotify ( cbGuard, guard ); } while ( nciu * pChan = this->unrespCircuit.get () ) { pChan->disconnectAllIO ( cbGuard, guard ); - discIIU.installDisconnectedChannel ( *pChan ); - pChan->setServerAddressUnknown ( discIIU, guard ); + discIIU.installDisconnectedChannel ( guard, *pChan ); + } + + this->channelCountTot = 0u; + + this->initiateCleanShutdown ( guard ); +} + +void tcpiiu::unlinkAllChannels ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ) +{ + cbGuard.assertIdenticalMutex ( this->cbMutex ); + guard.assertIdenticalMutex ( this->mutex ); + + while ( nciu * pChan = this->createReqPend.get () ) { + pChan->serviceShutdownNotify ( cbGuard, guard ); + } + + while ( nciu * pChan = this->createRespPend.get () ) { + this->clearChannelRequest ( guard, + pChan->getSID(guard), pChan->getCID(guard) ); + pChan->serviceShutdownNotify ( cbGuard, guard ); + } + + while ( nciu * pChan = this->subscripReqPend.get () ) { + pChan->disconnectAllIO ( cbGuard, guard ); + pChan->serviceShutdownNotify ( cbGuard, guard ); + } + + while ( nciu * pChan = this->connectedList.get () ) { + pChan->disconnectAllIO ( cbGuard, guard ); + pChan->serviceShutdownNotify ( cbGuard, guard ); + } + + while ( nciu * pChan = this->unrespCircuit.get () ) { + pChan->disconnectAllIO ( cbGuard, guard ); + pChan->serviceShutdownNotify ( cbGuard, guard ); } this->channelCountTot = 0u; @@ -1785,11 +1822,8 @@ void tcpiiu::connectNotify ( } void tcpiiu::uninstallChan ( - epicsGuard < epicsMutex > & cbGuard, - epicsGuard < epicsMutex > & guard, - nciu & chan ) + epicsGuard < epicsMutex > & guard, nciu & chan ) { - cbGuard.assertIdenticalMutex ( this->cbMutex ); guard.assertIdenticalMutex ( this->mutex ); switch ( chan.channelNode::listMember ) { @@ -1812,7 +1846,7 @@ void tcpiiu::uninstallChan ( this->subscripUpdateReqPend.remove ( chan ); break; default: - this->cacRef.printf ( cbGuard, + errlogPrintf ( "cac: attempt to uninstall channel from tcp iiu, but it inst installed there?" ); } chan.channelNode::listMember = channelNode::cs_none; @@ -1922,6 +1956,21 @@ unsigned tcpiiu::channelCount ( epicsGuard < epicsMutex > & guard ) return this->channelCountTot; } +void tcpiiu::uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > & guard, nciu & chan, + const class epicsTime & currentTime ) +{ + return netiiu::uninstallChanDueToSuccessfulSearchResponse ( + guard, chan, currentTime ); +} + +bool tcpiiu::searchMsg ( + epicsGuard < epicsMutex > & guard, ca_uint32_t id, + const char * pName, unsigned nameLength ) +{ + return netiiu::searchMsg ( + guard, id, pName, nameLength ); +} diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index 32086d323..55e5307a3 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -36,8 +36,6 @@ #include "iocinf.h" #include "inetAddrID.h" #include "cac.h" -#include "searchTimer.h" -#include "repeaterSubscribeTimer.h" #include "disconnectGovernorTimer.h" // UDP protocol dispatch table @@ -77,34 +75,69 @@ udpiiu::udpiiu ( cac::lowestPriorityLevelAbove ( cac::lowestPriorityLevelAbove ( cac.getInitializingThreadsPriority () ) ) ), - rtteMean ( 5.0e-3 ), // seconds + repeaterSubscribeTmr ( + *this, timerQueue, cbMutexIn, ctxNotifyIn ), + govTmr ( *this, timerQueue, cacMutexIn ), + maxPeriod ( maxSearchPeriodDefault ), + rtteMean ( minRoundTripEstimate ), cacRef ( cac ), cbMutex ( cbMutexIn ), cacMutex ( cacMutexIn ), nBytesInXmitBuf ( 0 ), + nTimers ( 0 ), + beaconAnomalyTimerIndex ( 0 ), sequenceNumber ( 0 ), - rtteSequenceNumber ( 0 ), lastReceivedSeqNo ( 0 ), sock ( 0 ), - pGovTmr ( new disconnectGovernorTimer ( *this, timerQueue ) ), - // The udpiiu and the search timer share the same lock because - // this is much more efficent with recursive locks. Also, access - // to the udp's netiiu base list is protected. - pSearchTmr ( new searchTimer ( *this, timerQueue, this->mutex ) ), - pRepeaterSubscribeTmr ( - new repeaterSubscribeTimer ( - *this, timerQueue, cbMutexIn, ctxNotifyIn ) ), repeaterPort ( 0 ), serverPort ( 0 ), localPort ( 0 ), shutdownCmd ( false ), - rtteActive ( false ), lastReceivedSeqNoIsValid ( false ) { - static const unsigned short PORT_ANY = 0u; - osiSockAddr addr; - int boolValue = true; - int status; + if ( envGetConfigParamPtr ( & EPICS_CA_MAX_SEARCH_PERIOD ) ) { + long longStatus = envGetDoubleConfigParam ( + & EPICS_CA_MAX_SEARCH_PERIOD, & this->maxPeriod ); + if ( ! longStatus ) { + if ( this->maxPeriod < maxSearchPeriodLowerLimit ) { + epicsPrintf ( "\"%s\" out of range (low)\n", + EPICS_CA_MAX_SEARCH_PERIOD.name ); + this->maxPeriod = maxSearchPeriodLowerLimit; + epicsPrintf ( "Setting \"%s\" = %f seconds\n", + EPICS_CA_MAX_SEARCH_PERIOD.name, this->maxPeriod ); + } + } + else { + epicsPrintf ( "EPICS \"%s\" wasnt a real number\n", + EPICS_CA_MAX_SEARCH_PERIOD.name ); + epicsPrintf ( "Setting \"%s\" = %f seconds\n", + EPICS_CA_MAX_SEARCH_PERIOD.name, this->maxPeriod ); + } + } + + double powerOfTwo = log ( this->maxPeriod / minRoundTripEstimate ) / log ( 2.0 ); + this->nTimers = static_cast < unsigned > ( powerOfTwo + 1.0 ); + if ( this->nTimers > channelNode::getMaxSearchTimerCount () ) { + this->nTimers = channelNode::getMaxSearchTimerCount (); + epicsPrintf ( "\"%s\" out of range (high)\n", + EPICS_CA_MAX_SEARCH_PERIOD.name ); + epicsPrintf ( "Setting \"%s\" = %f seconds\n", + EPICS_CA_MAX_SEARCH_PERIOD.name, + (1<<(this->nTimers-1)) * minRoundTripEstimate ); + } + + powerOfTwo = log ( beaconAnomalySearchPeriod / minRoundTripEstimate ) / log ( 2.0 ); + this->beaconAnomalyTimerIndex = static_cast < unsigned > ( powerOfTwo + 1.0 ); + if ( this->beaconAnomalyTimerIndex >= this->nTimers ) { + this->beaconAnomalyTimerIndex = this->nTimers - 1; + } + + this->ppSearchTmr.reset ( new epics_auto_ptr < class searchTimer > [ this->nTimers ] ); + for ( unsigned i = 0; i < this->nTimers; i++ ) { + this->ppSearchTmr[i].reset ( + new searchTimer ( *this, timerQueue, i, cacMutexIn, + i > this->beaconAnomalyTimerIndex ) ); + } this->repeaterPort = envGetInetPortConfigParam ( &EPICS_CA_REPEATER_PORT, @@ -124,7 +157,8 @@ udpiiu::udpiiu ( throwWithLocation ( noSocket () ); } - status = setsockopt ( this->sock, SOL_SOCKET, SO_BROADCAST, + int boolValue = true; + int status = setsockopt ( this->sock, SOL_SOCKET, SO_BROADCAST, (char *) &boolValue, sizeof ( boolValue ) ); if ( status < 0 ) { char sockErrBuf[64]; @@ -156,6 +190,8 @@ udpiiu::udpiiu ( // force a bind to an unconstrained address so we can obtain // the local port number below + static const unsigned short PORT_ANY = 0u; + osiSockAddr addr; memset ( (char *)&addr, 0 , sizeof (addr) ); addr.ia.sin_family = AF_INET; addr.ia.sin_addr.s_addr = epicsHTON32 (INADDR_ANY); @@ -202,6 +238,12 @@ udpiiu::udpiiu ( this->pushVersionMsg (); + // start timers and receive thread + for ( unsigned i =0; i < this->nTimers; i++ ) { + this->ppSearchTmr[i]->start (); + } + this->govTmr.start (); + this->repeaterSubscribeTmr.start (); this->recvThread.start (); } @@ -210,23 +252,10 @@ udpiiu::udpiiu ( */ udpiiu::~udpiiu () { - this->shutdown (); - - // no need to own CAC lock here because the CA context - // is being decomissioned - tsDLIter < nciu > chan = this->disconnGovernor.firstIter (); - while ( chan.valid () ) { - tsDLIter < nciu > next = chan; - next++; - chan->serviceShutdownNotify (); - chan = next; - } - chan = this->serverAddrRes.firstIter (); - while ( chan.valid () ) { - tsDLIter < nciu > next = chan; - next++; - chan->serviceShutdownNotify (); - chan = next; + { + epicsGuard < epicsMutex > cbGuard ( this->cbMutex ); + epicsGuard < epicsMutex > guard ( this->cacMutex ); + this->shutdown ( cbGuard, guard ); } // avoid use of ellFree because problems on windows occur if the @@ -242,29 +271,40 @@ udpiiu::~udpiiu () epicsSocketDestroy ( this->sock ); } -void udpiiu::shutdown () +void udpiiu::shutdown ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ) { // stop all of the timers - this->pGovTmr->shutdown (); - this->pSearchTmr->shutdown (); - this->pRepeaterSubscribeTmr->shutdown (); + this->repeaterSubscribeTmr.shutdown ( cbGuard, guard ); + this->govTmr.shutdown ( cbGuard, guard ); + for ( unsigned i =0; i < this->nTimers; i++ ) { + this->ppSearchTmr[i]->shutdown ( cbGuard, guard ); + } - if ( ! this->recvThread.exitWait ( 0.0 ) ) { - unsigned tries = 0u; + { + epicsGuardRelease < epicsMutex > unguard ( guard ); + { + epicsGuardRelease < epicsMutex > unguard ( cbGuard ); - this->shutdownCmd = true; + if ( ! this->recvThread.exitWait ( 0.0 ) ) { + unsigned tries = 0u; - this->wakeupMsg (); + this->shutdownCmd = true; - // wait for recv threads to exit - double shutdownDelay = 1.0; - while ( ! this->recvThread.exitWait ( shutdownDelay ) ) { - this->wakeupMsg (); - if ( shutdownDelay < 16.0 ) { - shutdownDelay += shutdownDelay; - } - if ( ++tries > 3 ) { - fprintf ( stderr, "cac: timing out waiting for UDP thread shutdown\n" ); + this->wakeupMsg (); + + // wait for recv threads to exit + double shutdownDelay = 1.0; + while ( ! this->recvThread.exitWait ( shutdownDelay ) ) { + this->wakeupMsg (); + if ( shutdownDelay < 16.0 ) { + shutdownDelay += shutdownDelay; + } + if ( ++tries > 3 ) { + fprintf ( stderr, "cac: timing out waiting for UDP thread shutdown\n" ); + } + } } } } @@ -352,7 +392,7 @@ void udpRecvThread::run () */ void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber ) { - epicsGuard < udpMutex > cbGuard ( this->mutex ); + epicsGuard < epicsMutex > cbGuard ( this->cacMutex ); caRepeaterRegistrationMessage ( this->sock, this->repeaterPort, attemptNumber ); } @@ -560,21 +600,10 @@ bool udpiiu::badUDPRespAction ( bool udpiiu::versionAction ( epicsGuard < epicsMutex > &, const caHdr & hdr, const osiSockAddr &, const epicsTime & currentTime ) { - epicsGuard < udpMutex > guard ( this->mutex ); + epicsGuard < epicsMutex > guard ( this->cacMutex ); // update the round trip time estimate if ( hdr.m_dataType & sequenceNoIsValid ) { - if ( this->rtteActive ) { - if ( this->rtteSequenceNumber == hdr.m_cid ) { - static const double gain = 0.25; - double measured = currentTime - this->rtteTimeStamp; - double error = measured - this->rtteMean; - this->rtteMean += gain * error; - this->rtteSequenceNumber = 0; - this->rtteTimeStamp = epicsTime (); - this->rtteActive = false; - } - } this->lastReceivedSeqNo = hdr.m_cid; this->lastReceivedSeqNoIsValid = true; } @@ -634,25 +663,15 @@ bool udpiiu::searchRespAction ( // X aCC 361 serverAddr.ia.sin_addr = addr.ia.sin_addr; } - bool success; if ( CA_V42 ( minorVersion ) ) { - success = this->cacRef.transferChanToVirtCircuit + this->cacRef.transferChanToVirtCircuit ( cbGuard, msg.m_available, msg.m_cid, 0xffff, - 0, minorVersion, serverAddr ); + 0, minorVersion, serverAddr, currentTime ); } else { - success = this->cacRef.transferChanToVirtCircuit + this->cacRef.transferChanToVirtCircuit ( cbGuard, msg.m_available, msg.m_cid, msg.m_dataType, - msg.m_count, minorVersion, serverAddr ); - } - - if ( success ) { - // deadlock can result if this is called while holding the primary - // mutex (because the primary mutex is used in the search timer callback) - epicsGuard < udpMutex > guard ( this->mutex ); - this->pSearchTmr->notifySuccessfulSearchResponse ( - guard, this->lastReceivedSeqNo, - this->lastReceivedSeqNoIsValid, currentTime ); + msg.m_count, minorVersion, serverAddr, currentTime ); } return true; @@ -703,14 +722,16 @@ bool udpiiu::beaconAction ( return true; } -bool udpiiu::repeaterAckAction ( epicsGuard < epicsMutex > &, const caHdr &, - const osiSockAddr &, const epicsTime &) +bool udpiiu::repeaterAckAction ( + epicsGuard < epicsMutex > & cbGuard, const caHdr &, + const osiSockAddr &, const epicsTime &) { - this->pRepeaterSubscribeTmr->confirmNotify (); + this->repeaterSubscribeTmr.confirmNotify (); return true; } -bool udpiiu::notHereRespAction ( epicsGuard < epicsMutex > &, const caHdr &, +bool udpiiu::notHereRespAction ( + epicsGuard < epicsMutex > &, const caHdr &, const osiSockAddr &, const epicsTime & ) { return true; @@ -741,7 +762,7 @@ bool udpiiu::exceptionRespAction ( return true; } -void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard, +void udpiiu::postMsg ( epicsGuard < epicsMutex > & cbGuard, const osiSockAddr & net_addr, char * pInBuf, arrayElementCount blockSize, const epicsTime & currentTime ) @@ -757,7 +778,7 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard, if ( blockSize < sizeof ( *pCurMsg ) ) { char buf[64]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); - this->printf ( guard, + this->printf ( cbGuard, "%s: Undecipherable (too small) UDP msg from %s ignored\n", __FILE__, buf ); return; @@ -794,9 +815,9 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard, if ( size > blockSize ) { char buf[64]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); - this->printf ( guard, - "%s: Undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__, - buf ); + this->printf ( cbGuard, + "%s: Undecipherable (payload too small) UDP msg from %s ignored\n", + __FILE__, buf ); return; } @@ -810,11 +831,11 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard, else { pStub = &udpiiu::badUDPRespAction; } - bool success = ( this->*pStub ) ( guard, *pCurMsg, net_addr, currentTime ); + bool success = ( this->*pStub ) ( cbGuard, *pCurMsg, net_addr, currentTime ); if ( ! success ) { char buf[256]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); - this->printf ( guard, "CAC: Undecipherable UDP message from %s\n", buf ); + this->printf ( cbGuard, "CAC: Undecipherable UDP message from %s\n", buf ); return; } @@ -825,7 +846,7 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard, bool udpiiu::pushVersionMsg () { - epicsGuard < udpMutex > guard ( this->mutex ); + epicsGuard < epicsMutex > guard ( this->cacMutex ); this->sequenceNumber++; @@ -836,13 +857,13 @@ bool udpiiu::pushVersionMsg () msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION ); msg.m_cid = epicsHTON32 ( this->sequenceNumber ); // sequence number - return pushDatagramMsg ( msg, 0, 0 ); + return this->pushDatagramMsg ( guard, msg, 0, 0 ); } -bool udpiiu::pushDatagramMsg ( const caHdr & msg, - const void * pExt, ca_uint16_t extsize ) +bool udpiiu::pushDatagramMsg ( epicsGuard < epicsMutex > & guard, + const caHdr & msg, const void * pExt, ca_uint16_t extsize ) { - epicsGuard < udpMutex > guard ( this->mutex ); + guard.assertIdenticalMutex ( this->cacMutex ); ca_uint16_t alignedExtSize = static_cast (CA_MESSAGE_ALIGN ( extsize )); arrayElementCount msgsize = sizeof ( caHdr ) + alignedExtSize; @@ -869,25 +890,12 @@ bool udpiiu::pushDatagramMsg ( const caHdr & msg, return true; } -void udpiiu::datagramFlush ( - epicsGuard < udpMutex > &, const epicsTime & currentTime ) +bool udpiiu::datagramFlush ( + epicsGuard < epicsMutex > &, const epicsTime & currentTime ) { // dont send the version header by itself if ( this->nBytesInXmitBuf <= sizeof ( caHdr ) ) { - return; - } - - if ( this->rtteActive ) { - double delay = currentTime - this->rtteTimeStamp; - if ( delay > 8 * this->rtteMean ) { - this->rtteSequenceNumber = this->sequenceNumber; - this->rtteTimeStamp = currentTime; - } - } - else { - this->rtteActive = true; - this->rtteSequenceNumber = this->sequenceNumber; - this->rtteTimeStamp = currentTime; + return false; } osiSockAddrNode *pNode = ( osiSockAddrNode * ) // X aCC 749 @@ -925,11 +933,11 @@ void udpiiu::datagramFlush ( break; } else { - char buf[64]; - sockAddrToDottedIP ( &pNode->addr.sa, buf, sizeof ( buf ) ); char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); + char buf[64]; + sockAddrToDottedIP ( &pNode->addr.sa, buf, sizeof ( buf ) ); errlogPrintf ( "CAC: error = \"%s\" sending UDP msg to %s\n", sockErrBuf, buf); @@ -943,11 +951,13 @@ void udpiiu::datagramFlush ( this->nBytesInXmitBuf = 0u; this->pushVersionMsg (); + + return true; } void udpiiu::show ( unsigned level ) const { - epicsGuard < udpMutex > guard ( this->mutex ); + epicsGuard < epicsMutex > guard ( this->cacMutex ); ::printf ( "Datagram IO circuit (and disconnected channel repository)\n"); if ( level > 1u ) { @@ -961,21 +971,12 @@ void udpiiu::show ( unsigned level ) const ::printf ("\tshut down command bool %u\n", this->shutdownCmd ); ::printf ( "\trecv thread exit signal:\n" ); this->recvThread.show ( level - 2u ); - ::printf ( "repeater subscribee timer:\n" ); - this->pRepeaterSubscribeTmr->show ( level - 2u ); - ::printf ( "disconnect governor subscribee timer:\n" ); - this->pGovTmr->show ( level - 2u ); - tsDLIterConst < nciu > pChan = this->disconnGovernor.firstIter (); - while ( pChan.valid () ) { - pChan->show ( level - 2u ); - pChan++; - } - ::printf ( "search message timer:\n" ); - this->pSearchTmr->show ( level - 2u ); - pChan = this->serverAddrRes.firstIter (); - while ( pChan.valid () ) { - pChan->show ( level - 2u ); - pChan++; + this->repeaterSubscribeTmr.show ( level - 2u ); + this->govTmr.show ( level - 2u ); + } + if ( level > 3u ) { + for ( unsigned i =0; i < this->nTimers; i++ ) { + this->ppSearchTmr[i]->show ( level - 3u ); } } } @@ -995,8 +996,6 @@ bool udpiiu::wakeupMsg () addr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK ); addr.ia.sin_port = epicsHTON16 ( this->localPort ); - epicsGuard < udpMutex > guard ( this->mutex ); - // send a wakeup msg so the UDP recv thread will exit int status = sendto ( this->sock, reinterpret_cast < char * > ( &msg ), sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) ); @@ -1007,97 +1006,98 @@ bool udpiiu::wakeupMsg () } void udpiiu::beaconAnomalyNotify ( - epicsGuard < epicsMutex > & cacGuard, const epicsTime & currentTime ) + epicsGuard < epicsMutex > & cacGuard ) { - epicsGuard guard ( this->mutex ); - - static const double portTicksPerSec = 1000u; - static const unsigned portBasedDelayMask = 0xff; - - /* - * This is needed when many machines - * have channels in a disconnected state that - * dont exist anywhere on the network. This insures - * that we dont have many CA clients synchronously - * flooding the network with broadcasts (and swamping - * out requests for valid channels). - * - * I fetch the local UDP port number and use the low - * order bits as a pseudo random delay to prevent every - * one from replying at once. - */ - double delay = ( this->localPort & portBasedDelayMask ); - delay /= portTicksPerSec; - - this->pSearchTmr->beaconAnomalyNotify ( guard, currentTime, delay ); + for ( unsigned i = this->beaconAnomalyTimerIndex+1u; + i < this->nTimers; i++ ) { + this->ppSearchTmr[i]->moveChannels ( cacGuard, + *this->ppSearchTmr[this->beaconAnomalyTimerIndex] ); + } } -bool udpiiu::searchMsg ( epicsGuard < udpMutex > & /* guard */ ) +void udpiiu::uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > & guard, nciu & chan, + const epicsTime & currentTime ) { - bool success; - - if ( nciu *pChan = this->serverAddrRes.get () ) { - success = pChan->searchMsg ( *this ); - if ( success ) { - this->serverAddrRes.add ( *pChan ); - } - else { - this->serverAddrRes.push ( *pChan ); - } + channelNode::channelState chanState = + chan.channelNode::listMember; + if ( chanState == channelNode::cs_disconnGov ) { + this->govTmr.uninstallChan ( guard, chan ); } else { - success = false; + this->ppSearchTmr[ chan.getSearchTimerIndex ( guard ) ]-> + uninstallChanDueToSuccessfulSearchResponse ( + guard, chan, this->lastReceivedSeqNo, + this->lastReceivedSeqNoIsValid, currentTime ); } - - return success; } -void udpiiu::installNewChannel ( const epicsTime & currentTime, nciu & chan ) +void udpiiu::uninstallChan ( + epicsGuard < epicsMutex > & guard, nciu & chan ) { - bool firstChannel = false; - epicsGuard < udpMutex > guard ( this->mutex ); - if ( this->serverAddrRes.count() == 0 ) { - firstChannel = true; + channelNode::channelState chanState = + chan.channelNode::listMember; + if ( chanState == channelNode::cs_disconnGov ) { + this->govTmr.uninstallChan ( guard, chan ); + } + else { + this->ppSearchTmr[ chan.getSearchTimerIndex ( guard ) ]-> + uninstallChan ( guard, chan ); } - // push them to the front of the list so that - // a search request is sent immediately, and - // so that the new channel's retry count is - // seen when calculating the minimum retry - // which is used to compute the search interval - this->serverAddrRes.push ( chan ); - chan.channelNode::listMember = - channelNode::cs_serverAddrResPend; - - this->pSearchTmr->channelCreatedNotify ( - guard, currentTime, firstChannel ); } -void udpiiu::installDisconnectedChannel ( nciu & chan ) +bool udpiiu::searchMsg ( + epicsGuard < epicsMutex > & guard, ca_uint32_t id, + const char * pName, unsigned nameLength ) { - epicsGuard < udpMutex > guard ( this->mutex ); - this->disconnGovernor.add ( chan ); - chan.channelNode::listMember = - channelNode::cs_disconnGov; + caHdr msg; + msg.m_cmmd = epicsHTON16 ( CA_PROTO_SEARCH ); + msg.m_available = epicsHTON32 ( id ); + msg.m_dataType = epicsHTON16 ( DONTREPLY ); + msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION ); + msg.m_cid = epicsHTON32 ( id ); + return this->pushDatagramMsg ( + guard, msg, pName, nameLength ); } -void udpiiu::govExpireNotify ( const epicsTime & currentTime ) +void udpiiu::installNewChannel ( + epicsGuard < epicsMutex > & guard, nciu & chan, netiiu * & piiu ) { - epicsGuard < udpMutex > guard ( this->mutex ); - if ( this->disconnGovernor.count () ) { - bool firstChannel = this->serverAddrRes.count() == 0; - // push them to the front of the list so that - // a search request is sent immediately, and - // so that the new channel's retry count is - // seen when calculating the minimum retry - // which is used to compute the search interval - while ( nciu * pChan = this->disconnGovernor.get () ) { - this->serverAddrRes.push ( *pChan ); - pChan->channelNode::listMember = - channelNode::cs_serverAddrResPend; - } - this->pSearchTmr->channelDisconnectedNotify ( - guard, currentTime, firstChannel ); + piiu = this; + this->ppSearchTmr[0]->installChannel ( guard, chan ); +} + +void udpiiu::installDisconnectedChannel ( + epicsGuard < epicsMutex > & guard, nciu & chan ) +{ + chan.setServerAddressUnknown ( *this, guard ); + this->govTmr.installChan ( guard, chan ); +} + +void udpiiu::noSearchRespNotify ( + epicsGuard < epicsMutex > & guard, nciu & chan, unsigned index ) +{ + const unsigned nTimersMinusOne = this->nTimers - 1; + if ( index < nTimersMinusOne ) { + index++; } + else { + index = nTimersMinusOne; + } + this->ppSearchTmr[index]->installChannel ( guard, chan ); +} + +void udpiiu::boostChannel ( + epicsGuard < epicsMutex > & guard, nciu & chan ) +{ + this->ppSearchTmr[this->beaconAnomalyTimerIndex]-> + installChannel ( guard, chan ); +} + +void udpiiu::govExpireNotify ( + epicsGuard < epicsMutex > & guard, nciu & chan ) +{ + this->ppSearchTmr[0]->installChannel ( guard, chan ); } int udpiiu::printf ( epicsGuard < epicsMutex > & cbGuard, @@ -1115,26 +1115,15 @@ int udpiiu::printf ( epicsGuard < epicsMutex > & cbGuard, return status; } -void udpiiu::uninstallChan ( - epicsGuard < epicsMutex > & cbGuard, - epicsGuard < epicsMutex > & cacGuard, - nciu & chan ) +void udpiiu::updateRTTE ( double measured ) { - cbGuard.assertIdenticalMutex ( this->cbMutex ); - cacGuard.assertIdenticalMutex ( this->cacMutex ); + double error = measured - this->rtteMean; + this->rtteMean += 0.25 * error; +} - epicsGuard < udpMutex > guard ( this->mutex ); - if ( chan.channelNode::listMember == channelNode::cs_disconnGov ) { - this->disconnGovernor.remove ( chan ); - } - else if ( chan.channelNode::listMember == channelNode::cs_serverAddrResPend ) { - this->serverAddrRes.remove ( chan ); - } - else { - this->cacRef.printf ( cbGuard, - "cac: attempt to uninstall channel from udp iiu, but it inst installed there?" ); - } - chan.channelNode::listMember = channelNode::cs_none; +double udpiiu::getRTTE () const +{ + return max ( this->rtteMean, minRoundTripEstimate ); } void udpiiu::hostName ( @@ -1167,7 +1156,6 @@ void udpiiu::writeRequest ( nciu & chan, unsigned type, arrayElementCount nElem, const void * pValue ) { - guard.assertIdenticalMutex ( this->cacMutex ); netiiu::writeRequest ( guard, chan, type, nElem, pValue ); } @@ -1176,7 +1164,6 @@ void udpiiu::writeNotifyRequest ( netWriteNotifyIO & io, unsigned type, arrayElementCount nElem, const void *pValue ) { - guard.assertIdenticalMutex ( this->cacMutex ); netiiu::writeNotifyRequest ( guard, chan, io, type, nElem, pValue ); } @@ -1184,7 +1171,6 @@ void udpiiu::readNotifyRequest ( epicsGuard < epicsMutex > & guard, nciu & chan, netReadNotifyIO & io, unsigned type, arrayElementCount nElem ) { - guard.assertIdenticalMutex ( this->cacMutex ); netiiu::readNotifyRequest ( guard, chan, io, type, nElem ); } @@ -1192,7 +1178,6 @@ void udpiiu::clearChannelRequest ( epicsGuard < epicsMutex > & guard, ca_uint32_t sid, ca_uint32_t cid ) { - guard.assertIdenticalMutex ( this->cacMutex ); netiiu::clearChannelRequest ( guard, sid, cid ); } @@ -1200,21 +1185,21 @@ void udpiiu::subscriptionRequest ( epicsGuard < epicsMutex > & guard, nciu & chan, netSubscription & subscr ) { - guard.assertIdenticalMutex ( this->cacMutex ); netiiu::subscriptionRequest ( guard, chan, subscr ); } void udpiiu::subscriptionUpdateRequest ( - epicsGuard < epicsMutex > &, nciu &, - netSubscription & ) + epicsGuard < epicsMutex > & guard, nciu & chan, + netSubscription & subscr ) { + netiiu::subscriptionUpdateRequest ( + guard, chan, subscr ); } void udpiiu::subscriptionCancelRequest ( epicsGuard < epicsMutex > & guard, nciu & chan, netSubscription & subscr ) { - guard.assertIdenticalMutex ( this->cacMutex ); netiiu::subscriptionCancelRequest ( guard, chan, subscr ); } @@ -1225,32 +1210,33 @@ void udpiiu::flushRequest ( } void udpiiu::eliminateExcessiveSendBacklog ( - epicsGuard < epicsMutex > *, - epicsGuard < epicsMutex > & ) + epicsGuard < epicsMutex > * pCBGuard, + epicsGuard < epicsMutex > & guard ) { + netiiu::eliminateExcessiveSendBacklog ( pCBGuard, guard ); } void udpiiu::requestRecvProcessPostponedFlush ( epicsGuard < epicsMutex > & guard ) { - guard.assertIdenticalMutex ( this->cacMutex ); netiiu::requestRecvProcessPostponedFlush ( guard ); } osiSockAddr udpiiu::getNetworkAddress ( epicsGuard < epicsMutex > & guard ) const { - guard.assertIdenticalMutex ( this->cacMutex ); return netiiu::getNetworkAddress ( guard ); } double udpiiu::receiveWatchdogDelay ( epicsGuard < epicsMutex > & guard ) const { - guard.assertIdenticalMutex ( this->cacMutex ); - return - DBL_MAX; + return netiiu::receiveWatchdogDelay ( guard ); } - - +ca_uint32_t udpiiu::datagramSeqNumber ( + epicsGuard < epicsMutex > & ) const +{ + return this->sequenceNumber; +} diff --git a/src/ca/udpiiu.h b/src/ca/udpiiu.h index 0dfd56fcd..bd39e59fe 100644 --- a/src/ca/udpiiu.h +++ b/src/ca/udpiiu.h @@ -37,20 +37,26 @@ #include "epicsMemory.h" #include "epicsTime.h" #include "tsMinMax.h" +#include "tsDLList.h" #ifdef udpiiuh_accessh_epicsExportSharedSymbols # define epicsExportSharedSymbols # include "shareLib.h" #endif - #include "netiiu.h" +#include "searchTimer.h" +#include "disconnectGovernorTimer.h" +#include "repeaterSubscribeTimer.h" extern "C" void cacRecvThreadUDP ( void *pParam ); -epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort ); -epicsShareFunc void epicsShareAPI caRepeaterRegistrationMessage ( SOCKET sock, unsigned repeaterPort, unsigned attemptNumber ); -extern "C" epicsShareFunc void caRepeaterThread ( void *pDummy ); +epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( + unsigned repeaterPort ); +epicsShareFunc void epicsShareAPI caRepeaterRegistrationMessage ( + SOCKET sock, unsigned repeaterPort, unsigned attemptNumber ); +extern "C" epicsShareFunc void caRepeaterThread ( + void * pDummy ); epicsShareFunc void ca_repeater ( void ); class cac; @@ -74,16 +80,16 @@ private: void run(); }; -class udpMutex { -public: - void lock (); - void unlock (); - void show ( unsigned level ) const; -private: - epicsMutex mutex; -}; +static const double minRoundTripEstimate = 32e-3; // seconds +static const double maxSearchPeriodDefault = 5.0 * 60.0; // seconds +static const double maxSearchPeriodLowerLimit = 60.0; // seconds +static const double beaconAnomalySearchPeriod = 5.0; // seconds -class udpiiu : public netiiu { +class udpiiu : + private netiiu, + private searchTimerNotify, + private disconnectGovernorNotify, + private repeaterTimerNotify { public: udpiiu ( class epicsTimerQueueActive &, @@ -92,27 +98,15 @@ public: cacContextNotify &, class cac & ); virtual ~udpiiu (); - void installNewChannel ( const epicsTime & currentTime, nciu & ); - void installDisconnectedChannel ( nciu & ); - void repeaterRegistrationMessage ( unsigned attemptNumber ); - bool searchMsg ( epicsGuard < udpMutex > & ); - void datagramFlush ( epicsGuard < udpMutex > &, const epicsTime & currentTime ); - ca_uint32_t datagramSeqNumber ( epicsGuard < udpMutex > & ) const; - void show ( unsigned level ) const; - bool wakeupMsg (); + void installNewChannel ( + epicsGuard < epicsMutex > &, nciu &, netiiu * & ); + void installDisconnectedChannel ( + epicsGuard < epicsMutex > &, nciu & ); void beaconAnomalyNotify ( - epicsGuard < epicsMutex > & guard, const epicsTime & currentTime ); - void govExpireNotify ( const epicsTime & currentTime ); - unsigned unresolvedChannelCount ( epicsGuard < udpMutex > & ) const; - void uninstallChan ( - epicsGuard < epicsMutex > & cbGuard, - epicsGuard < epicsMutex > & guard, - nciu & ); - bool pushDatagramMsg ( const caHdr & hdr, - const void * pExt, ca_uint16_t extsize); - void shutdown (); - double roundTripDelayEstimate ( epicsGuard < udpMutex > & ) const; - int printf ( epicsGuard < epicsMutex > & callbackControl, const char *pformat, ... ); + epicsGuard < epicsMutex > & guard ); + void shutdown ( epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ); + void show ( unsigned level ) const; // exceptions class noSocket {}; @@ -120,37 +114,39 @@ public: private: char xmitBuf [MAX_UDP_SEND]; char recvBuf [MAX_UDP_RECV]; - mutable udpMutex mutex; udpRecvThread recvThread; - // nciu state field tells us which list - tsDLList < nciu > disconnGovernor; - tsDLList < nciu > serverAddrRes; + repeaterSubscribeTimer repeaterSubscribeTmr; + disconnectGovernorTimer govTmr; ELLLIST dest; - epicsTime rtteTimeStamp; + double maxPeriod; double rtteMean; cac & cacRef; mutable epicsMutex & cbMutex; mutable epicsMutex & cacMutex; + epics_auto_ptr < epics_auto_ptr < class searchTimer >, eapt_array > ppSearchTmr; unsigned nBytesInXmitBuf; + unsigned nTimers; + unsigned beaconAnomalyTimerIndex; ca_uint32_t sequenceNumber; - ca_uint32_t rtteSequenceNumber; ca_uint32_t lastReceivedSeqNo; SOCKET sock; - epics_auto_ptr < class disconnectGovernorTimer > pGovTmr; - epics_auto_ptr < class searchTimer > pSearchTmr; - epics_auto_ptr < class repeaterSubscribeTimer > pRepeaterSubscribeTmr; ca_uint16_t repeaterPort; ca_uint16_t serverPort; ca_uint16_t localPort; bool shutdownCmd; - bool rtteActive; bool lastReceivedSeqNoIsValid; - void postMsg ( epicsGuard < epicsMutex > &, + bool wakeupMsg (); + + void postMsg ( epicsGuard < epicsMutex > & cbGuard, const osiSockAddr & net_addr, char *pInBuf, arrayElementCount blockSize, const epicsTime ¤Time ); + bool pushDatagramMsg ( epicsGuard < epicsMutex > &, + const caHdr & hdr, const void * pExt, + ca_uint16_t extsize); + typedef bool ( udpiiu::*pProtoStubUDP ) ( epicsGuard < epicsMutex > &, const caHdr &, const osiSockAddr &, const epicsTime & ); @@ -181,87 +177,90 @@ private: epicsGuard < epicsMutex > &, const caHdr &msg, const osiSockAddr &net_addr, const epicsTime & ); - friend class udpRecvThread; - + // netiiu stubs void hostName ( - epicsGuard < epicsMutex > &, - char *pBuf, unsigned bufLength ) const; + epicsGuard < epicsMutex > &, char * pBuf, + unsigned bufLength ) const; const char * pHostName ( epicsGuard < epicsMutex > & ) const; + bool ca_v41_ok ( + epicsGuard < epicsMutex > & ) const; bool ca_v42_ok ( epicsGuard < epicsMutex > & ) const; - bool ca_v41_ok ( - epicsGuard < epicsMutex > & ) const; - void writeRequest ( - epicsGuard < epicsMutex > &, nciu &, unsigned type, - arrayElementCount nElem, const void *pValue ); + void eliminateExcessiveSendBacklog ( + epicsGuard < epicsMutex > * pCallbackGuard, + epicsGuard < epicsMutex > & mutualExclusionGuard ); + void writeRequest ( + epicsGuard < epicsMutex > &, nciu &, + unsigned type, arrayElementCount nElem, + const void *pValue ); void writeNotifyRequest ( - epicsGuard < epicsMutex > &, nciu &, netWriteNotifyIO &, - unsigned type, arrayElementCount nElem, const void *pValue ); + epicsGuard < epicsMutex > &, + nciu &, netWriteNotifyIO &, + unsigned type, arrayElementCount nElem, + const void *pValue ); void readNotifyRequest ( - epicsGuard < epicsMutex > &, nciu &, netReadNotifyIO &, - unsigned type, arrayElementCount nElem ); + epicsGuard < epicsMutex > &, nciu &, + netReadNotifyIO &, unsigned type, + arrayElementCount nElem ); void clearChannelRequest ( epicsGuard < epicsMutex > &, ca_uint32_t sid, ca_uint32_t cid ); - void subscriptionRequest ( - epicsGuard < epicsMutex > &, nciu &, - netSubscription & ); + void subscriptionRequest ( + epicsGuard < epicsMutex > &, + nciu &, netSubscription & ); void subscriptionUpdateRequest ( - epicsGuard < epicsMutex > &, nciu &, - netSubscription & ); - bool pushVersionMsg (); + epicsGuard < epicsMutex > &, + nciu &, netSubscription & ); void subscriptionCancelRequest ( epicsGuard < epicsMutex > &, nciu & chan, netSubscription & subscr ); void flushRequest ( epicsGuard < epicsMutex > & ); - void eliminateExcessiveSendBacklog ( - epicsGuard < epicsMutex > * pCallbackGuard, - epicsGuard < epicsMutex > & mutualExclusionGuard ); void requestRecvProcessPostponedFlush ( epicsGuard < epicsMutex > & ); - osiSockAddr getNetworkAddress ( + osiSockAddr getNetworkAddress ( epicsGuard < epicsMutex > & ) const; - double receiveWatchdogDelay ( + void uninstallChan ( + epicsGuard < epicsMutex > &, nciu & ); + void uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > &, nciu &, + const class epicsTime & currentTime ); + double receiveWatchdogDelay ( epicsGuard < epicsMutex > & ) const; + bool searchMsg ( + epicsGuard < epicsMutex > &, ca_uint32_t id, + const char * pName, unsigned nameLength ); + + // searchTimerNotify stubs + double getRTTE () const; + void updateRTTE ( double rtte ); + bool pushVersionMsg (); + void boostChannel ( + epicsGuard < epicsMutex > & guard, nciu & chan ); + void noSearchRespNotify ( + epicsGuard < epicsMutex > &, nciu & chan, unsigned index ); + bool datagramFlush ( + epicsGuard < epicsMutex > &, const epicsTime & currentTime ); + ca_uint32_t datagramSeqNumber ( + epicsGuard < epicsMutex > & ) const; + + // disconnectGovernorNotify + void govExpireNotify ( + epicsGuard < epicsMutex > &, nciu & ); + + // repeaterTimerNotify + void repeaterRegistrationMessage ( + unsigned attemptNumber ); + int printf ( + epicsGuard < epicsMutex > & callbackControl, + const char * pformat, ... ); udpiiu ( const udpiiu & ); udpiiu & operator = ( const udpiiu & ); + + friend class udpRecvThread; }; -inline void udpMutex::lock () -{ - this->mutex.lock (); -} - -inline void udpMutex::unlock () -{ - this->mutex.unlock (); -} - -inline void udpMutex::show ( unsigned level ) const -{ - this->mutex.show ( level ); -} - -inline unsigned udpiiu::unresolvedChannelCount ( - epicsGuard < udpMutex > & ) const -{ - return this->serverAddrRes.count (); -} - -inline ca_uint32_t udpiiu::datagramSeqNumber ( - epicsGuard < udpMutex > & ) const -{ - return this->sequenceNumber; -} - -inline double udpiiu::roundTripDelayEstimate ( - epicsGuard < udpMutex > & ) const -{ - return this->rtteMean; -} - #endif // udpiiuh diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index 31142c63d..16aab7f02 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -109,7 +109,8 @@ public: epicsGuard < epicsMutex > & guard ); void sendTimeoutNotify ( const epicsTime & currentTime, - callbackManager & ); + callbackManager & cbMgr, + epicsGuard < epicsMutex > & guard ); void receiveTimeoutNotify( callbackManager &, epicsGuard < epicsMutex > & ); @@ -159,14 +160,16 @@ public: const char *pformat, ... ); unsigned channelCount ( epicsGuard < epicsMutex > & ); - void removeAllChannels ( - bool supressApplicationNotify, + void disconnectAllChannels ( epicsGuard < epicsMutex > & cbGuard, - epicsGuard < epicsMutex > & guard, udpiiu & ); + epicsGuard < epicsMutex > & guard, class udpiiu & ); + void unlinkAllChannels ( + epicsGuard < epicsMutex > & cbGuard, + epicsGuard < epicsMutex > & guard ); void installChannel ( epicsGuard < epicsMutex > &, epicsGuard < epicsMutex > &, nciu & chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn ); - void uninstallChan ( epicsGuard < epicsMutex > & cbGuard, + void uninstallChan ( epicsGuard < epicsMutex > & guard, nciu & chan ); void connectNotify ( epicsGuard < epicsMutex > &, nciu & chan ); @@ -289,6 +292,13 @@ private: bool flush ( epicsGuard < epicsMutex > & ); // only to be called by the send thread + // netiiu stubs + void uninstallChanDueToSuccessfulSearchResponse ( + epicsGuard < epicsMutex > &, nciu &, const class epicsTime & ); + bool searchMsg ( + epicsGuard < epicsMutex > &, ca_uint32_t id, + const char * pName, unsigned nameLength ); + friend class tcpRecvThread; friend class tcpSendThread;