From de6ce2f6794faf4461fefe9f57f11fb0fc526987 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Sat, 18 Nov 2006 00:56:59 +0000 Subject: [PATCH] dont allow unbounded round trip estimates --- src/ca/udpiiu.cpp | 116 ++++++++++++++++++++++++++-------------------- src/ca/udpiiu.h | 7 ++- 2 files changed, 71 insertions(+), 52 deletions(-) diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index 5a0d3ccdd..53e7896fd 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -67,6 +67,7 @@ const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] = // udpiiu::udpiiu () // udpiiu::udpiiu ( + epicsGuard < epicsMutex > & cacGuard, epicsTimerQueueActive & timerQueue, epicsMutex & cbMutexIn, epicsMutex & cacMutexIn, @@ -82,6 +83,7 @@ udpiiu::udpiiu ( govTmr ( *this, timerQueue, cacMutexIn ), maxPeriod ( maxSearchPeriodDefault ), rtteMean ( minRoundTripEstimate ), + rtteMeanDev ( 0 ), cacRef ( cac ), cbMutex ( cbMutexIn ), cacMutex ( cacMutexIn ), @@ -97,6 +99,8 @@ udpiiu::udpiiu ( shutdownCmd ( false ), lastReceivedSeqNoIsValid ( false ) { + cacGuard.assertIdenticalMutex ( cacMutex ); + if ( envGetConfigParamPtr ( & EPICS_CA_MAX_SEARCH_PERIOD ) ) { long longStatus = envGetDoubleConfigParam ( & EPICS_CA_MAX_SEARCH_PERIOD, & this->maxPeriod ); @@ -196,8 +200,8 @@ udpiiu::udpiiu ( osiSockAddr addr; memset ( (char *)&addr, 0 , sizeof (addr) ); addr.ia.sin_family = AF_INET; - addr.ia.sin_addr.s_addr = epicsHTON32 (INADDR_ANY); - addr.ia.sin_port = epicsHTON16 (PORT_ANY); // X aCC 818 + addr.ia.sin_addr.s_addr = htonl ( INADDR_ANY ); + addr.ia.sin_port = htons ( PORT_ANY ); // X aCC 818 status = bind (this->sock, &addr.sa, sizeof (addr) ); if ( status < 0 ) { char sockErrBuf[64]; @@ -226,7 +230,7 @@ udpiiu::udpiiu ( errlogPrintf ( "CAC: UDP socket was not inet addr family\n" ); throwWithLocation ( noSocket () ); } - this->localPort = epicsNTOH16 ( tmpAddr.ia.sin_port ); + this->localPort = ntohs ( tmpAddr.ia.sin_port ); } /* @@ -242,7 +246,7 @@ udpiiu::udpiiu ( // start timers and receive thread for ( unsigned j =0; j < this->nTimers; j++ ) { - this->ppSearchTmr[j]->start (); + this->ppSearchTmr[j]->start ( cacGuard ); } this->govTmr.start (); this->repeaterSubscribeTmr.start (); @@ -439,21 +443,21 @@ void epicsShareAPI caRepeaterRegistrationMessage ( * this will only work with 3.13 beta 12 CA repeaters or later */ saddr.ia.sin_family = AF_INET; - saddr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK ); - saddr.ia.sin_port = epicsHTON16 ( port ); + saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); + saddr.ia.sin_port = htons ( port ); } else { - saddr.ia.sin_port = epicsHTON16 ( port ); + saddr.ia.sin_port = htons ( port ); } } else { saddr.ia.sin_family = AF_INET; - saddr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK ); - saddr.ia.sin_port = epicsHTON16 ( port ); + saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); + saddr.ia.sin_port = htons ( port ); } memset ( (char *) &msg, 0, sizeof (msg) ); - msg.m_cmmd = epicsHTON16 ( REPEATER_REGISTER ); // X aCC 818 + AlignedWireRef < epicsUInt16 > ( msg.m_cmmd ) = REPEATER_REGISTER; // X aCC 818 msg.m_available = saddr.ia.sin_addr.s_addr; /* @@ -537,8 +541,8 @@ void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort ) ca_uint16_t port = static_cast < ca_uint16_t > ( repeaterPort ); memset ( (char *) &bd, 0, sizeof ( bd ) ); bd.ia.sin_family = AF_INET; - bd.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_ANY ); - bd.ia.sin_port = epicsHTON16 ( port ); + bd.ia.sin_addr.s_addr = htonl ( INADDR_ANY ); + bd.ia.sin_port = htons ( port ); status = bind ( tmpSock, &bd.sa, sizeof ( bd ) ); if ( status < 0 ) { if ( SOCKERRNO == SOCK_EADDRINUSE ) { @@ -650,14 +654,14 @@ bool udpiiu::searchRespAction ( // X aCC 361 else { serverAddr.ia.sin_addr = addr.ia.sin_addr; } - serverAddr.ia.sin_port = epicsHTON16 ( msg.m_dataType ); + serverAddr.ia.sin_port = htons ( msg.m_dataType ); } else if ( CA_V45 (minorVersion) ) { - serverAddr.ia.sin_port = epicsHTON16 ( msg.m_dataType ); + serverAddr.ia.sin_port = htons ( msg.m_dataType ); serverAddr.ia.sin_addr = addr.ia.sin_addr; } else { - serverAddr.ia.sin_port = epicsHTON16 ( this->serverPort ); + serverAddr.ia.sin_port = htons ( this->serverPort ); serverAddr.ia.sin_addr = addr.ia.sin_addr; } @@ -691,25 +695,25 @@ bool udpiiu::beaconAction ( * * old servers: * 1) set this field to one of the ip addresses of the host _or_ - * 2) set this field to epicsHTON32(INADDR_ANY) + * 2) set this field to INADDR_ANY * new servers: - * always set this field to epicsHTON32(INADDR_ANY) + * always set this field to INADDR_ANY * * clients always assume that if this - * field is set to something that isnt epicsHTON32(INADDR_ANY) + * field is set to something that isnt INADDR_ANY * then it is the overriding IP address of the server. */ ina.sin_family = AF_INET; - ina.sin_addr.s_addr = epicsHTON32 ( msg.m_available ); + ina.sin_addr.s_addr = htonl ( msg.m_available ); if ( msg.m_count != 0 ) { - ina.sin_port = epicsHTON16 ( msg.m_count ); + ina.sin_port = htons ( msg.m_count ); } else { /* * old servers dont supply this and the * default port must be assumed */ - ina.sin_port = epicsHTON16 ( this->serverPort ); + ina.sin_port = htons ( this->serverPort ); } unsigned protocolRevision = msg.m_dataType; ca_uint32_t beaconNumber = msg.m_cid; @@ -787,12 +791,12 @@ void udpiiu::postMsg ( /* * fix endian of bytes */ - pCurMsg->m_postsize = epicsNTOH16 ( pCurMsg->m_postsize ); - pCurMsg->m_cmmd = epicsNTOH16 ( pCurMsg->m_cmmd ); - pCurMsg->m_dataType = epicsNTOH16 ( pCurMsg->m_dataType ); - pCurMsg->m_count = epicsNTOH16 ( pCurMsg->m_count ); - pCurMsg->m_available = epicsNTOH32 ( pCurMsg->m_available ); - pCurMsg->m_cid = epicsNTOH32 ( pCurMsg->m_cid ); + pCurMsg->m_postsize = AlignedWireRef < epicsUInt16 > ( pCurMsg->m_postsize ); + pCurMsg->m_cmmd = AlignedWireRef < epicsUInt16 > ( pCurMsg->m_cmmd ); + pCurMsg->m_dataType = AlignedWireRef < epicsUInt16 > ( pCurMsg->m_dataType ); + pCurMsg->m_count = AlignedWireRef < epicsUInt16 > ( pCurMsg->m_count ); + pCurMsg->m_available = AlignedWireRef < epicsUInt32 > ( pCurMsg->m_available ); + pCurMsg->m_cid = AlignedWireRef < epicsUInt32 > ( pCurMsg->m_cid ); #if 0 printf ( "UDP Cmd=%3d Type=%3d Count=%4d Size=%4d", @@ -849,11 +853,11 @@ bool udpiiu::pushVersionMsg () this->sequenceNumber++; caHdr msg; - msg.m_cmmd = epicsHTON16 ( CA_PROTO_VERSION ); - msg.m_available = epicsHTON32 ( 0 ); - msg.m_dataType = epicsHTON16 ( sequenceNoIsValid ); - msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION ); - msg.m_cid = epicsHTON32 ( this->sequenceNumber ); // sequence number + AlignedWireRef < epicsUInt16 > ( msg.m_cmmd ) = CA_PROTO_VERSION; + AlignedWireRef < epicsUInt32 > ( msg.m_available ) = 0; + AlignedWireRef < epicsUInt16 > ( msg.m_dataType ) = sequenceNoIsValid; + AlignedWireRef < epicsUInt16 > ( msg.m_count ) = CA_MINOR_PROTOCOL_REVISION; + AlignedWireRef < epicsUInt32 > ( msg.m_cid ) = this->sequenceNumber; // sequence number return this->pushDatagramMsg ( guard, msg, 0, 0 ); } @@ -882,7 +886,7 @@ bool udpiiu::pushDatagramMsg ( epicsGuard < epicsMutex > & guard, char *pDest = (char *) ( pbufmsg + 1 ); memset ( pDest + extsize, '\0', alignedExtSize - extsize ); } - pbufmsg->m_postsize = epicsHTON16 ( alignedExtSize ); + AlignedWireRef < epicsUInt16 > ( pbufmsg->m_postsize ) = alignedExtSize; this->nBytesInXmitBuf += msgsize; return true; @@ -982,17 +986,17 @@ void udpiiu::show ( unsigned level ) const bool udpiiu::wakeupMsg () { caHdr msg; - msg.m_cmmd = epicsHTON16 ( CA_PROTO_VERSION ); - msg.m_available = epicsHTON32 ( 0u ); - msg.m_dataType = epicsHTON16 ( 0u ); - msg.m_count = epicsHTON16 ( 0u ); - msg.m_cid = epicsHTON32 ( 0u ); - msg.m_postsize = epicsHTON16 ( 0u ); + AlignedWireRef < epicsUInt16 > ( msg.m_cmmd ) = CA_PROTO_VERSION; + AlignedWireRef < epicsUInt32 > ( msg.m_available ) = 0u; + AlignedWireRef < epicsUInt16 > ( msg.m_dataType ) = 0u; + AlignedWireRef < epicsUInt16 > ( msg.m_count ) = 0u; + AlignedWireRef < epicsUInt32 > ( msg.m_cid ) = 0u; + AlignedWireRef < epicsUInt16 > ( msg.m_postsize ) = 0u; osiSockAddr addr; addr.ia.sin_family = AF_INET; - addr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK ); - addr.ia.sin_port = epicsHTON16 ( this->localPort ); + addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); + addr.ia.sin_port = htons ( this->localPort ); // send a wakeup msg so the UDP recv thread will exit int status = sendto ( this->sock, reinterpret_cast < char * > ( &msg ), @@ -1049,11 +1053,11 @@ bool udpiiu::searchMsg ( const char * pName, unsigned nameLength ) { caHdr msg; - msg.m_cmmd = epicsHTON16 ( CA_PROTO_SEARCH ); - msg.m_available = epicsHTON32 ( id ); - msg.m_dataType = epicsHTON16 ( DONTREPLY ); - msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION ); - msg.m_cid = epicsHTON32 ( id ); + AlignedWireRef < epicsUInt16 > ( msg.m_cmmd ) = CA_PROTO_SEARCH; + AlignedWireRef < epicsUInt32 > ( msg.m_available ) = id; + AlignedWireRef < epicsUInt16 > ( msg.m_dataType ) = DONTREPLY; + AlignedWireRef < epicsUInt16 > ( msg.m_count ) = CA_MINOR_PROTOCOL_REVISION; + AlignedWireRef < epicsUInt32 > ( msg.m_cid ) = id; return this->pushDatagramMsg ( guard, msg, pName, (ca_uint16_t) nameLength ); } @@ -1113,15 +1117,27 @@ int udpiiu::printf ( epicsGuard < epicsMutex > & cbGuard, return status; } -void udpiiu::updateRTTE ( double measured ) +void udpiiu::updateRTTE ( epicsGuard < epicsMutex > & guard, double measured ) { + guard.assertIdenticalMutex ( this->cacMutex ); + if ( measured > maxRoundTripEstimate ) { + measured = maxRoundTripEstimate; + } + if ( measured < minRoundTripEstimate ) { + measured = minRoundTripEstimate; + } double error = measured - this->rtteMean; - this->rtteMean += 0.25 * error; + this->rtteMean += 0.125 * error; + if ( error < 0.0 ) { + error = - error; + } + this->rtteMeanDev = this->rtteMeanDev + .25 * ( error - this->rtteMeanDev ); } -double udpiiu::getRTTE () const +double udpiiu::getRTTE ( epicsGuard < epicsMutex > & guard ) const { - return epicsMax ( this->rtteMean, minRoundTripEstimate ); + guard.assertIdenticalMutex ( this->cacMutex ); + return this->rtteMean + 4 * this->rtteMeanDev; } unsigned udpiiu::getHostName ( diff --git a/src/ca/udpiiu.h b/src/ca/udpiiu.h index 639cebcfe..0f9b191d1 100644 --- a/src/ca/udpiiu.h +++ b/src/ca/udpiiu.h @@ -80,6 +80,7 @@ private: }; static const double minRoundTripEstimate = 32e-3; // seconds +static const double maxRoundTripEstimate = 30; // seconds static const double maxSearchPeriodDefault = 5.0 * 60.0; // seconds static const double maxSearchPeriodLowerLimit = 60.0; // seconds static const double beaconAnomalySearchPeriod = 5.0; // seconds @@ -91,6 +92,7 @@ class udpiiu : private repeaterTimerNotify { public: udpiiu ( + epicsGuard < epicsMutex > & cacGuard, class epicsTimerQueueActive &, epicsMutex & callbackControl, epicsMutex & mutualExclusion, @@ -119,6 +121,7 @@ private: ELLLIST dest; double maxPeriod; double rtteMean; + double rtteMeanDev; cac & cacRef; mutable epicsMutex & cbMutex; mutable epicsMutex & cacMutex; @@ -232,8 +235,8 @@ private: const char * pName, unsigned nameLength ); // searchTimerNotify stubs - double getRTTE () const; - void updateRTTE ( double rtte ); + double getRTTE ( epicsGuard < epicsMutex > & ) const; + void updateRTTE ( epicsGuard < epicsMutex > &, double rtte ); bool pushVersionMsg (); void boostChannel ( epicsGuard < epicsMutex > & guard, nciu & chan );