dont allow unbounded round trip estimates

This commit is contained in:
Jeff Hill
2006-11-18 00:56:59 +00:00
parent ffb5e239f2
commit de6ce2f679
2 changed files with 71 additions and 52 deletions
+66 -50
View File
@@ -67,6 +67,7 @@ const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] =
// udpiiu::udpiiu ()
//
udpiiu::udpiiu (
epicsGuard < epicsMutex > & cacGuard,
epicsTimerQueueActive & timerQueue,
epicsMutex & cbMutexIn,
epicsMutex & cacMutexIn,
@@ -82,6 +83,7 @@ udpiiu::udpiiu (
govTmr ( *this, timerQueue, cacMutexIn ),
maxPeriod ( maxSearchPeriodDefault ),
rtteMean ( minRoundTripEstimate ),
rtteMeanDev ( 0 ),
cacRef ( cac ),
cbMutex ( cbMutexIn ),
cacMutex ( cacMutexIn ),
@@ -97,6 +99,8 @@ udpiiu::udpiiu (
shutdownCmd ( false ),
lastReceivedSeqNoIsValid ( false )
{
cacGuard.assertIdenticalMutex ( cacMutex );
if ( envGetConfigParamPtr ( & EPICS_CA_MAX_SEARCH_PERIOD ) ) {
long longStatus = envGetDoubleConfigParam (
& EPICS_CA_MAX_SEARCH_PERIOD, & this->maxPeriod );
@@ -196,8 +200,8 @@ udpiiu::udpiiu (
osiSockAddr addr;
memset ( (char *)&addr, 0 , sizeof (addr) );
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = epicsHTON32 (INADDR_ANY);
addr.ia.sin_port = epicsHTON16 (PORT_ANY); // X aCC 818
addr.ia.sin_addr.s_addr = htonl ( INADDR_ANY );
addr.ia.sin_port = htons ( PORT_ANY ); // X aCC 818
status = bind (this->sock, &addr.sa, sizeof (addr) );
if ( status < 0 ) {
char sockErrBuf[64];
@@ -226,7 +230,7 @@ udpiiu::udpiiu (
errlogPrintf ( "CAC: UDP socket was not inet addr family\n" );
throwWithLocation ( noSocket () );
}
this->localPort = epicsNTOH16 ( tmpAddr.ia.sin_port );
this->localPort = ntohs ( tmpAddr.ia.sin_port );
}
/*
@@ -242,7 +246,7 @@ udpiiu::udpiiu (
// start timers and receive thread
for ( unsigned j =0; j < this->nTimers; j++ ) {
this->ppSearchTmr[j]->start ();
this->ppSearchTmr[j]->start ( cacGuard );
}
this->govTmr.start ();
this->repeaterSubscribeTmr.start ();
@@ -439,21 +443,21 @@ void epicsShareAPI caRepeaterRegistrationMessage (
* this will only work with 3.13 beta 12 CA repeaters or later
*/
saddr.ia.sin_family = AF_INET;
saddr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK );
saddr.ia.sin_port = epicsHTON16 ( port );
saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
saddr.ia.sin_port = htons ( port );
}
else {
saddr.ia.sin_port = epicsHTON16 ( port );
saddr.ia.sin_port = htons ( port );
}
}
else {
saddr.ia.sin_family = AF_INET;
saddr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK );
saddr.ia.sin_port = epicsHTON16 ( port );
saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
saddr.ia.sin_port = htons ( port );
}
memset ( (char *) &msg, 0, sizeof (msg) );
msg.m_cmmd = epicsHTON16 ( REPEATER_REGISTER ); // X aCC 818
AlignedWireRef < epicsUInt16 > ( msg.m_cmmd ) = REPEATER_REGISTER; // X aCC 818
msg.m_available = saddr.ia.sin_addr.s_addr;
/*
@@ -537,8 +541,8 @@ void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort )
ca_uint16_t port = static_cast < ca_uint16_t > ( repeaterPort );
memset ( (char *) &bd, 0, sizeof ( bd ) );
bd.ia.sin_family = AF_INET;
bd.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_ANY );
bd.ia.sin_port = epicsHTON16 ( port );
bd.ia.sin_addr.s_addr = htonl ( INADDR_ANY );
bd.ia.sin_port = htons ( port );
status = bind ( tmpSock, &bd.sa, sizeof ( bd ) );
if ( status < 0 ) {
if ( SOCKERRNO == SOCK_EADDRINUSE ) {
@@ -650,14 +654,14 @@ bool udpiiu::searchRespAction ( // X aCC 361
else {
serverAddr.ia.sin_addr = addr.ia.sin_addr;
}
serverAddr.ia.sin_port = epicsHTON16 ( msg.m_dataType );
serverAddr.ia.sin_port = htons ( msg.m_dataType );
}
else if ( CA_V45 (minorVersion) ) {
serverAddr.ia.sin_port = epicsHTON16 ( msg.m_dataType );
serverAddr.ia.sin_port = htons ( msg.m_dataType );
serverAddr.ia.sin_addr = addr.ia.sin_addr;
}
else {
serverAddr.ia.sin_port = epicsHTON16 ( this->serverPort );
serverAddr.ia.sin_port = htons ( this->serverPort );
serverAddr.ia.sin_addr = addr.ia.sin_addr;
}
@@ -691,25 +695,25 @@ bool udpiiu::beaconAction (
*
* old servers:
* 1) set this field to one of the ip addresses of the host _or_
* 2) set this field to epicsHTON32(INADDR_ANY)
* 2) set this field to INADDR_ANY
* new servers:
* always set this field to epicsHTON32(INADDR_ANY)
* always set this field to INADDR_ANY
*
* clients always assume that if this
* field is set to something that isnt epicsHTON32(INADDR_ANY)
* field is set to something that isnt INADDR_ANY
* then it is the overriding IP address of the server.
*/
ina.sin_family = AF_INET;
ina.sin_addr.s_addr = epicsHTON32 ( msg.m_available );
ina.sin_addr.s_addr = htonl ( msg.m_available );
if ( msg.m_count != 0 ) {
ina.sin_port = epicsHTON16 ( msg.m_count );
ina.sin_port = htons ( msg.m_count );
}
else {
/*
* old servers dont supply this and the
* default port must be assumed
*/
ina.sin_port = epicsHTON16 ( this->serverPort );
ina.sin_port = htons ( this->serverPort );
}
unsigned protocolRevision = msg.m_dataType;
ca_uint32_t beaconNumber = msg.m_cid;
@@ -787,12 +791,12 @@ void udpiiu::postMsg (
/*
* fix endian of bytes
*/
pCurMsg->m_postsize = epicsNTOH16 ( pCurMsg->m_postsize );
pCurMsg->m_cmmd = epicsNTOH16 ( pCurMsg->m_cmmd );
pCurMsg->m_dataType = epicsNTOH16 ( pCurMsg->m_dataType );
pCurMsg->m_count = epicsNTOH16 ( pCurMsg->m_count );
pCurMsg->m_available = epicsNTOH32 ( pCurMsg->m_available );
pCurMsg->m_cid = epicsNTOH32 ( pCurMsg->m_cid );
pCurMsg->m_postsize = AlignedWireRef < epicsUInt16 > ( pCurMsg->m_postsize );
pCurMsg->m_cmmd = AlignedWireRef < epicsUInt16 > ( pCurMsg->m_cmmd );
pCurMsg->m_dataType = AlignedWireRef < epicsUInt16 > ( pCurMsg->m_dataType );
pCurMsg->m_count = AlignedWireRef < epicsUInt16 > ( pCurMsg->m_count );
pCurMsg->m_available = AlignedWireRef < epicsUInt32 > ( pCurMsg->m_available );
pCurMsg->m_cid = AlignedWireRef < epicsUInt32 > ( pCurMsg->m_cid );
#if 0
printf ( "UDP Cmd=%3d Type=%3d Count=%4d Size=%4d",
@@ -849,11 +853,11 @@ bool udpiiu::pushVersionMsg ()
this->sequenceNumber++;
caHdr msg;
msg.m_cmmd = epicsHTON16 ( CA_PROTO_VERSION );
msg.m_available = epicsHTON32 ( 0 );
msg.m_dataType = epicsHTON16 ( sequenceNoIsValid );
msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION );
msg.m_cid = epicsHTON32 ( this->sequenceNumber ); // sequence number
AlignedWireRef < epicsUInt16 > ( msg.m_cmmd ) = CA_PROTO_VERSION;
AlignedWireRef < epicsUInt32 > ( msg.m_available ) = 0;
AlignedWireRef < epicsUInt16 > ( msg.m_dataType ) = sequenceNoIsValid;
AlignedWireRef < epicsUInt16 > ( msg.m_count ) = CA_MINOR_PROTOCOL_REVISION;
AlignedWireRef < epicsUInt32 > ( msg.m_cid ) = this->sequenceNumber; // sequence number
return this->pushDatagramMsg ( guard, msg, 0, 0 );
}
@@ -882,7 +886,7 @@ bool udpiiu::pushDatagramMsg ( epicsGuard < epicsMutex > & guard,
char *pDest = (char *) ( pbufmsg + 1 );
memset ( pDest + extsize, '\0', alignedExtSize - extsize );
}
pbufmsg->m_postsize = epicsHTON16 ( alignedExtSize );
AlignedWireRef < epicsUInt16 > ( pbufmsg->m_postsize ) = alignedExtSize;
this->nBytesInXmitBuf += msgsize;
return true;
@@ -982,17 +986,17 @@ void udpiiu::show ( unsigned level ) const
bool udpiiu::wakeupMsg ()
{
caHdr msg;
msg.m_cmmd = epicsHTON16 ( CA_PROTO_VERSION );
msg.m_available = epicsHTON32 ( 0u );
msg.m_dataType = epicsHTON16 ( 0u );
msg.m_count = epicsHTON16 ( 0u );
msg.m_cid = epicsHTON32 ( 0u );
msg.m_postsize = epicsHTON16 ( 0u );
AlignedWireRef < epicsUInt16 > ( msg.m_cmmd ) = CA_PROTO_VERSION;
AlignedWireRef < epicsUInt32 > ( msg.m_available ) = 0u;
AlignedWireRef < epicsUInt16 > ( msg.m_dataType ) = 0u;
AlignedWireRef < epicsUInt16 > ( msg.m_count ) = 0u;
AlignedWireRef < epicsUInt32 > ( msg.m_cid ) = 0u;
AlignedWireRef < epicsUInt16 > ( msg.m_postsize ) = 0u;
osiSockAddr addr;
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK );
addr.ia.sin_port = epicsHTON16 ( this->localPort );
addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
addr.ia.sin_port = htons ( this->localPort );
// send a wakeup msg so the UDP recv thread will exit
int status = sendto ( this->sock, reinterpret_cast < char * > ( &msg ),
@@ -1049,11 +1053,11 @@ bool udpiiu::searchMsg (
const char * pName, unsigned nameLength )
{
caHdr msg;
msg.m_cmmd = epicsHTON16 ( CA_PROTO_SEARCH );
msg.m_available = epicsHTON32 ( id );
msg.m_dataType = epicsHTON16 ( DONTREPLY );
msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION );
msg.m_cid = epicsHTON32 ( id );
AlignedWireRef < epicsUInt16 > ( msg.m_cmmd ) = CA_PROTO_SEARCH;
AlignedWireRef < epicsUInt32 > ( msg.m_available ) = id;
AlignedWireRef < epicsUInt16 > ( msg.m_dataType ) = DONTREPLY;
AlignedWireRef < epicsUInt16 > ( msg.m_count ) = CA_MINOR_PROTOCOL_REVISION;
AlignedWireRef < epicsUInt32 > ( msg.m_cid ) = id;
return this->pushDatagramMsg (
guard, msg, pName, (ca_uint16_t) nameLength );
}
@@ -1113,15 +1117,27 @@ int udpiiu::printf ( epicsGuard < epicsMutex > & cbGuard,
return status;
}
void udpiiu::updateRTTE ( double measured )
void udpiiu::updateRTTE ( epicsGuard < epicsMutex > & guard, double measured )
{
guard.assertIdenticalMutex ( this->cacMutex );
if ( measured > maxRoundTripEstimate ) {
measured = maxRoundTripEstimate;
}
if ( measured < minRoundTripEstimate ) {
measured = minRoundTripEstimate;
}
double error = measured - this->rtteMean;
this->rtteMean += 0.25 * error;
this->rtteMean += 0.125 * error;
if ( error < 0.0 ) {
error = - error;
}
this->rtteMeanDev = this->rtteMeanDev + .25 * ( error - this->rtteMeanDev );
}
double udpiiu::getRTTE () const
double udpiiu::getRTTE ( epicsGuard < epicsMutex > & guard ) const
{
return epicsMax ( this->rtteMean, minRoundTripEstimate );
guard.assertIdenticalMutex ( this->cacMutex );
return this->rtteMean + 4 * this->rtteMeanDev;
}
unsigned udpiiu::getHostName (
+5 -2
View File
@@ -80,6 +80,7 @@ private:
};
static const double minRoundTripEstimate = 32e-3; // seconds
static const double maxRoundTripEstimate = 30; // seconds
static const double maxSearchPeriodDefault = 5.0 * 60.0; // seconds
static const double maxSearchPeriodLowerLimit = 60.0; // seconds
static const double beaconAnomalySearchPeriod = 5.0; // seconds
@@ -91,6 +92,7 @@ class udpiiu :
private repeaterTimerNotify {
public:
udpiiu (
epicsGuard < epicsMutex > & cacGuard,
class epicsTimerQueueActive &,
epicsMutex & callbackControl,
epicsMutex & mutualExclusion,
@@ -119,6 +121,7 @@ private:
ELLLIST dest;
double maxPeriod;
double rtteMean;
double rtteMeanDev;
cac & cacRef;
mutable epicsMutex & cbMutex;
mutable epicsMutex & cacMutex;
@@ -232,8 +235,8 @@ private:
const char * pName, unsigned nameLength );
// searchTimerNotify stubs
double getRTTE () const;
void updateRTTE ( double rtte );
double getRTTE ( epicsGuard < epicsMutex > & ) const;
void updateRTTE ( epicsGuard < epicsMutex > &, double rtte );
bool pushVersionMsg ();
void boostChannel (
epicsGuard < epicsMutex > & guard, nciu & chan );