fixes for bugs 133 and 134 in Mantis

This commit is contained in:
Jeff Hill
2004-09-23 23:15:22 +00:00
parent 107f1fa37f
commit 7b7a07c667
28 changed files with 1386 additions and 940 deletions

View File

@@ -46,6 +46,7 @@ LIBSRCS += nciu.cpp
LIBSRCS += netiiu.cpp
LIBSRCS += udpiiu.cpp
LIBSRCS += tcpiiu.cpp
LIBSRCS += noopiiu.cpp
LIBSRCS += netReadNotifyIO.cpp
LIBSRCS += netWriteNotifyIO.cpp
LIBSRCS += netSubscription.cpp

View File

@@ -23,7 +23,8 @@
#include <new>
#include <float.h>
#include <epicsExit.h>
#include "epicsExit.h"
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
@@ -43,6 +44,7 @@
#include "iocinf.h"
#include "oldAccess.h"
#include "cac.h"
#include "autoPtrFreeList.h"
epicsThreadPrivateId caClientContextId;
@@ -364,10 +366,18 @@ int epicsShareAPI ca_create_channel (
int epicsShareAPI ca_clear_channel ( chid pChan )
{
ca_client_context & cac = pChan->getClientCtx ();
cac.destroyChannel ( *pChan );
epicsGuard < epicsMutex > * pCBGuard = cac.pCallbackGuard.get();
if ( pCBGuard ) {
epicsGuard < epicsMutex > guard ( cac.mutex );
cac.destroyChannel ( *pCBGuard, guard, *pChan );
}
else {
epicsGuard < epicsMutex > cbGuard ( cac.cbMutex );
epicsGuard < epicsMutex > guard ( cac.mutex );
cac.destroyChannel ( cbGuard, guard, *pChan );
}
return ECA_NORMAL;
}
#include "autoPtrFreeList.h"
/*
* ca_array_get ()

View File

@@ -38,7 +38,6 @@
int main()
{
ca_repeater ();
assert ( 0 );
return ( 0 );
}

View File

@@ -28,8 +28,8 @@
#endif
#include <stdexcept>
#include <stdio.h>
#include "epicsExit.h"
#define epicsExportSharedSymbols
@@ -187,11 +187,11 @@ ca_client_context::~ca_client_context ()
}
}
void ca_client_context::destroyChannelPrivate (
oldChannelNotify & chan,
epicsGuard < epicsMutex > & cbGuard )
void ca_client_context::destroyChannel (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard,
oldChannelNotify & chan )
{
epicsGuard < epicsMutex > guard ( this->mutex );
try {
chan.eliminateExcessiveSendBacklog (
&cbGuard, guard );
@@ -203,18 +203,6 @@ void ca_client_context::destroyChannelPrivate (
this->oldChannelNotifyFreeList.release ( & chan );
}
void ca_client_context::destroyChannel ( oldChannelNotify & chan )
{
epicsGuard < epicsMutex > * pCBGuard = this->pCallbackGuard.get();
if ( pCBGuard ) {
destroyChannelPrivate ( chan, *pCBGuard );
}
else {
epicsGuard < epicsMutex > cbGuard ( this->cbMutex );
destroyChannelPrivate ( chan, cbGuard );
}
}
void ca_client_context::destroyGetCopy (
epicsGuard < epicsMutex > & guard, getCopy & gc )
{

View File

@@ -44,6 +44,7 @@
#include "bhe.h"
#include "net_convert.h"
#include "autoPtrFreeList.h"
#include "noopIIU.h"
static const char *pVersionCAC =
"@(#) " EPICS_VERSION_STRING
@@ -232,17 +233,17 @@ cac::~cac ()
// waiting for the UDP thread to exit while it is waiting to
// get the lock.
if ( this->pudpiiu ) {
this->pudpiiu->shutdown ();
epicsGuard < epicsMutex > cbGuard ( this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
this->pudpiiu->shutdown ( cbGuard, guard );
//
// shutdown all tcp circuits
//
epicsGuard < epicsMutex > cbGuard ( this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
tsDLIter < tcpiiu > iter = this->circuitList.firstIter ();
while ( iter.valid() ) {
// this causes a clean shutdown to occur
iter->removeAllChannels ( true, cbGuard, guard, *this->pudpiiu );
iter->unlinkAllChannels ( cbGuard, guard );
iter++;
}
}
@@ -425,7 +426,7 @@ void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime,
this->beaconAnomalyCount++;
this->pudpiiu->beaconAnomalyNotify ( guard, currentTime );
this->pudpiiu->beaconAnomalyNotify ( guard );
# ifdef DEBUG
{
@@ -457,21 +458,19 @@ cacChannel & cac::createChannel (
}
nciu * pNetChan = new ( this->channelFreeList )
nciu ( *this, *this->pudpiiu, chan, pName, pri );
nciu ( *this, noopIIU, chan, pName, pri );
this->chanTable.idAssignAdd ( *pNetChan );
return *pNetChan;
}
bool cac::transferChanToVirtCircuit (
void cac::transferChanToVirtCircuit (
epicsGuard < epicsMutex > & cbGuard, unsigned cid, unsigned sid, // X aCC 431
ca_uint16_t typeCode, arrayElementCount count,
unsigned minorVersionNumber, const osiSockAddr & addr )
unsigned minorVersionNumber, const osiSockAddr & addr,
const epicsTime & currentTime )
{
bool newIIU = false;
tcpiiu * piiu = 0;
if ( addr.sa.sa_family != AF_INET ) {
return false;
return ;
}
epicsGuard < epicsMutex > guard ( this->mutex );
@@ -481,12 +480,12 @@ bool cac::transferChanToVirtCircuit (
*/
nciu * pChan = this->chanTable.lookup ( cid );
if ( ! pChan ) {
return false;
return;
}
/*
* Ignore duplicate search replies
*/
* Ignore duplicate search replies
*/
osiSockAddr chanAddr = pChan->getPIIU(guard)->getNetworkAddress (guard);
if ( chanAddr.sa.sa_family != AF_UNSPEC ) {
if ( ! sockAddrAreIdentical ( &addr, &chanAddr ) ) {
@@ -497,17 +496,18 @@ bool cac::transferChanToVirtCircuit (
*this, pChan->pName ( guard ), acc );
pMsg->ioInitiate ( addr );
}
return false;
return;
}
/*
* look for an existing virtual circuit
*/
bool newIIU = false;
caServerID servID ( addr.ia, pChan->getPriority(guard) );
piiu = this->serverTable.lookup ( servID );
tcpiiu * piiu = this->serverTable.lookup ( servID );
if ( piiu ) {
if ( ! piiu->alive ( guard ) ) {
return false;
return;
}
}
else {
@@ -523,7 +523,7 @@ bool cac::transferChanToVirtCircuit (
pBHE = new ( this->bheFreeList )
bhe ( this->mutex, epicsTime (), 0u, addr.ia );
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
return false;
return;
}
}
this->serverTable.add ( *pnewiiu );
@@ -533,17 +533,20 @@ bool cac::transferChanToVirtCircuit (
newIIU = true;
}
catch ( std::bad_alloc & ) {
return false;
return;
}
catch ( ... ) {
errlogPrintf (
"CAC: Unexpected exception during virtual circuit creation\n" );
return false;
return;
}
}
this->pudpiiu->uninstallChan ( cbGuard, guard, *pChan );
piiu->installChannel ( cbGuard, guard, *pChan, sid, typeCode, count );
// must occur before moving to new iiu
pChan->getPIIU(guard)->uninstallChanDueToSuccessfulSearchResponse (
guard, *pChan, currentTime );
piiu->installChannel (
cbGuard, guard, *pChan, sid, typeCode, count );
if ( ! piiu->ca_v42_ok ( guard ) ) {
// connect to old server with lock applied
@@ -553,8 +556,6 @@ bool cac::transferChanToVirtCircuit (
if ( newIIU ) {
piiu->start ( guard );
}
return true;
}
void cac::destroyChannel (
@@ -1081,21 +1082,19 @@ bool cac::verifyAndDisconnectChan (
if ( ! pChan ) {
return true;
}
this->disconnectChannel ( currentTime, mgr.cbGuard, guard, *pChan );
this->disconnectChannel ( mgr.cbGuard, guard, *pChan );
return true;
}
void cac::disconnectChannel (
const epicsTime & /* currentTime */,
epicsGuard < epicsMutex > & cbGuard, // X aCC 431
epicsGuard < epicsMutex > & guard, nciu & chan )
{
guard.assertIdenticalMutex ( this->mutex );
assert ( this->pudpiiu );
chan.disconnectAllIO ( cbGuard, guard );
chan.getPIIU(guard)->uninstallChan ( cbGuard, guard, chan );
this->pudpiiu->installDisconnectedChannel ( chan );
chan.setServerAddressUnknown ( *this->pudpiiu, guard );
chan.getPIIU(guard)->uninstallChan ( guard, chan );
this->pudpiiu->installDisconnectedChannel ( guard, chan );
chan.unresponsiveCircuitNotify ( cbGuard, guard );
}
@@ -1153,7 +1152,7 @@ void cac::destroyIIU ( tcpiiu & iiu )
}
assert ( this->pudpiiu );
iiu.removeAllChannels ( false, cbGuard, guard, *this->pudpiiu );
iiu.disconnectAllChannels ( cbGuard, guard, *this->pudpiiu );
this->serverTable.remove ( iiu );
this->circuitList.remove ( iiu );
@@ -1191,12 +1190,12 @@ double cac::beaconPeriod (
}
void cac::initiateConnect (
epicsGuard < epicsMutex > & guard, nciu & chan )
epicsGuard < epicsMutex > & guard,
nciu & chan, netiiu * & piiu )
{
guard.assertIdenticalMutex ( this->mutex );
assert ( this->pudpiiu );
this->pudpiiu->installNewChannel (
epicsTime::getCurrent(), chan );
this->pudpiiu->installNewChannel ( guard, chan, piiu );
}
void *cacComBufMemoryManager::allocate ( size_t size )

View File

@@ -85,14 +85,6 @@ private:
cacComBufMemoryManager & operator = ( const cacComBufMemoryManager & );
};
class cacDisconnectChannelPrivate { // X aCC 655
public:
virtual void disconnectChannel (
const epicsTime & currentTime,
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard, nciu & chan ) = 0;
};
class notifyGuard {
public:
notifyGuard ( cacContextNotify & );
@@ -114,7 +106,6 @@ public:
class cac :
public cacContext,
private cacRecycle,
private cacDisconnectChannelPrivate,
private callbackForMultiplyDefinedPV
{
public:
@@ -136,15 +127,12 @@ public:
const epicsTime & currentTime, caHdrLargeArray &, char *pMsgBody );
// channel routines
bool transferChanToVirtCircuit (
void transferChanToVirtCircuit (
epicsGuard < epicsMutex > &,
unsigned cid, unsigned sid,
ca_uint16_t typeCode, arrayElementCount count,
unsigned minorVersionNumber, const osiSockAddr & );
void disconnectAllChannels (
epicsGuard < epicsMutex > & callbackControlGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard,
tcpiiu & );
unsigned minorVersionNumber, const osiSockAddr &,
const epicsTime & currentTime );
cacChannel & createChannel (
epicsGuard < epicsMutex > & guard, const char * pChannelName,
cacChannelNotify &, cacChannel::priLev );
@@ -153,7 +141,7 @@ public:
epicsGuard < epicsMutex > & mutualExclusionGuard,
nciu & );
void initiateConnect (
epicsGuard < epicsMutex > &, nciu & );
epicsGuard < epicsMutex > &, nciu &, netiiu * & );
// IO requests
void writeRequest ( epicsGuard < epicsMutex > &, nciu &, unsigned type,
@@ -293,7 +281,6 @@ private:
epicsGuard < epicsMutex > &, netSubscription &io );
void disconnectChannel (
const epicsTime & currentTime,
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard, nciu & chan );
@@ -423,16 +410,6 @@ inline unsigned cac::beaconAnomaliesSinceProgramStart (
return this->beaconAnomalyCount;
}
inline void cac::disconnectAllChannels (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard,
tcpiiu & iiu )
{
cbGuard.assertIdenticalMutex ( this->cbMutex );
guard.assertIdenticalMutex ( this->mutex );
iiu.removeAllChannels ( false, cbGuard, guard, *this->pudpiiu );
}
inline notifyGuard::notifyGuard ( cacContextNotify & notifyIn ) :
notify ( notifyIn )
{

View File

@@ -137,7 +137,9 @@ public:
virtual ~cacChannelNotify () = 0;
virtual void connectNotify ( epicsGuard < epicsMutex > & ) = 0;
virtual void disconnectNotify ( epicsGuard < epicsMutex > & ) = 0;
virtual void serviceShutdownNotify () = 0;
virtual void serviceShutdownNotify (
epicsGuard < epicsMutex > & callbackControlGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard ) = 0;
virtual void accessRightsNotify (
epicsGuard < epicsMutex > &, const caAccessRights & ) = 0;
virtual void exception (

View File

@@ -24,15 +24,17 @@
#define epicsExportSharedSymbols
#include "disconnectGovernorTimer.h"
#include "udpiiu.h"
#include "nciu.h"
static const double period = 10.0; // sec
static const double disconnectGovernorPeriod = 10.0; // sec
disconnectGovernorTimer::disconnectGovernorTimer (
udpiiu & iiuIn, epicsTimerQueue & queueIn ) :
timer ( queueIn.createTimer () ),
disconnectGovernorNotify & iiuIn,
epicsTimerQueue & queueIn,
epicsMutex & mutexIn ) :
mutex ( mutexIn ), timer ( queueIn.createTimer () ),
iiu ( iiuIn )
{
this->timer.start ( *this, period );
}
disconnectGovernorTimer::~disconnectGovernorTimer ()
@@ -40,18 +42,65 @@ disconnectGovernorTimer::~disconnectGovernorTimer ()
this->timer.destroy ();
}
void disconnectGovernorTimer::shutdown ()
void disconnectGovernorTimer:: start ()
{
this->timer.cancel ();
this->timer.start ( *this, disconnectGovernorPeriod );
}
epicsTimerNotify::expireStatus disconnectGovernorTimer::expire ( const epicsTime & currentTime ) // X aCC 361
void disconnectGovernorTimer::shutdown (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard )
{
this->iiu.govExpireNotify ( currentTime );
return expireStatus ( restart, period );
epicsGuardRelease < epicsMutex > unguard ( guard );
{
epicsGuardRelease < epicsMutex > unguard ( cbGuard );
this->timer.cancel ();
}
while ( nciu * pChan = this->chanList.get () ) {
pChan->channelNode::listMember =
channelNode::cs_none;
pChan->serviceShutdownNotify ( cbGuard, guard );
}
}
void disconnectGovernorTimer::show ( unsigned /* level */ ) const
epicsTimerNotify::expireStatus disconnectGovernorTimer::expire (
const epicsTime & currentTime ) // X aCC 361
{
epicsGuard < epicsMutex > guard ( this->mutex );
while ( nciu * pChan = chanList.get () ) {
pChan->channelNode::listMember =
channelNode::cs_none;
this->iiu.govExpireNotify ( guard, *pChan );
}
return expireStatus ( restart, disconnectGovernorPeriod );
}
void disconnectGovernorTimer::show ( unsigned level ) const
{
epicsGuard < epicsMutex > guard ( this->mutex );
::printf ( "disconnect governor timer:\n" );
tsDLIterConst < nciu > pChan = this->chanList.firstIter ();
while ( pChan.valid () ) {
pChan->show ( level - 1u );
pChan++;
}
}
void disconnectGovernorTimer::installChan (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
guard.assertIdenticalMutex ( this->mutex );
this->chanList.add ( chan );
chan.channelNode::listMember = channelNode::cs_disconnGov;
}
void disconnectGovernorTimer::uninstallChan (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
guard.assertIdenticalMutex ( this->mutex );
this->chanList.remove ( chan );
chan.channelNode::listMember = channelNode::cs_none;
}
disconnectGovernorNotify::~disconnectGovernorNotify () {}

View File

@@ -42,16 +42,34 @@
#endif
#include "caProto.h"
#include "netiiu.h"
class disconnectGovernorNotify {
public:
virtual ~disconnectGovernorNotify () = 0;
virtual void govExpireNotify (
epicsGuard < epicsMutex > &, nciu & ) = 0;
};
class disconnectGovernorTimer : private epicsTimerNotify {
public:
disconnectGovernorTimer ( class udpiiu &, epicsTimerQueue & );
disconnectGovernorTimer (
class disconnectGovernorNotify &, epicsTimerQueue &, epicsMutex & );
virtual ~disconnectGovernorTimer ();
void start ();
void shutdown (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard );
void installChan (
epicsGuard < epicsMutex > &, nciu & );
void uninstallChan (
epicsGuard < epicsMutex > &, nciu & );
void show ( unsigned level ) const;
void shutdown ();
private:
tsDLList < nciu > chanList;
epicsMutex & mutex;
epicsTimer & timer;
class udpiiu & iiu;
class disconnectGovernorNotify & iiu;
epicsTimerNotify::expireStatus expire ( const epicsTime & currentTime );
disconnectGovernorTimer ( const disconnectGovernorTimer & );
disconnectGovernorTimer & operator = ( const disconnectGovernorTimer & );

View File

@@ -18,14 +18,6 @@
*
* Author: Jeff Hill
*
* Notes:
* 1) This class has a pointer to the IIU. This pointer always points at
* a valid IIU. If the client context is deleted then the channel points at a
* static file scope IIU. IIU's that disconnect go into an inactive state
* and are stored on a list for later reuse. When the channel calls a
* member function of the IIU, the IIU verifies that the channel's IIU
* pointer is still pointing at itself only after it has acquired the IIU
* lock.
*/
#include <new>
@@ -34,6 +26,8 @@
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
#include "epicsAlgorithm.h"
#define epicsExportSharedSymbols
#include "iocinf.h"
#include "cac.h"
@@ -42,6 +36,7 @@
#include "virtualCircuit.h"
#include "cadef.h"
#include "db_access.h" // for INVALID_DB_REQ
#include "noopIIU.h"
nciu::nciu ( cac & cacIn, netiiu & iiuIn, cacChannelNotify & chanIn,
const char *pNameIn, cacChannel::priLev pri ) :
@@ -97,8 +92,7 @@ void nciu::destroy (
mutualExclusionGuard, this->sid, this->id );
}
this->piiu->uninstallChan (
callbackControlGuard, mutualExclusionGuard, *this );
this->piiu->uninstallChan ( mutualExclusionGuard, *this );
this->cacCtx.destroyChannel (
callbackControlGuard, mutualExclusionGuard, *this );
@@ -125,7 +119,7 @@ void nciu::operator delete ( void * )
void nciu::initiateConnect (
epicsGuard < epicsMutex > & guard )
{
this->cacCtx.initiateConnect ( guard, *this );
this->cacCtx.initiateConnect ( guard, *this, this->piiu );
}
void nciu::connect ( unsigned nativeType,
@@ -186,7 +180,7 @@ void nciu::unresponsiveCircuitNotify (
this->notify().accessRightsNotify ( guard, noRights );
}
void nciu::setServerAddressUnknown ( udpiiu & newiiu,
void nciu::setServerAddressUnknown ( netiiu & newiiu,
epicsGuard < epicsMutex > & guard )
{
guard.assertIdenticalMutex ( this->cacCtx.mutexRef () );
@@ -217,30 +211,16 @@ void nciu::accessRightsStateChange (
/*
* nciu::searchMsg ()
*/
bool nciu::searchMsg ( udpiiu & iiu )
bool nciu::searchMsg ( epicsGuard < epicsMutex > & guard )
{
caHdr msg;
bool success;
msg.m_cmmd = epicsHTON16 ( CA_PROTO_SEARCH );
msg.m_available = epicsHTON32 ( this->getId () );
msg.m_dataType = epicsHTON16 ( DONTREPLY );
msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION );
msg.m_cid = epicsHTON32 ( this->getId () );
success = iiu.pushDatagramMsg ( msg,
this->pNameStr, this->nameLength );
if ( success ) {
//
// increment the number of times we have tried
// to find this channel
//
bool success = this->piiu->searchMsg (
guard, this->getId (), this->pNameStr, this->nameLength );
if ( success ) {
if ( this->retry < UINT_MAX ) {
this->retry++;
}
}
return success;
}
return success;
}
const char *nciu::pName (
@@ -572,3 +552,62 @@ void nciu::disconnectAllIO (
*this, this->eventq );
}
void nciu::serviceShutdownNotify (
epicsGuard < epicsMutex > & callbackControlGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard )
{
this->setServerAddressUnknown ( noopIIU, mutualExclusionGuard );
this->notify().serviceShutdownNotify ( callbackControlGuard, mutualExclusionGuard );
}
void channelNode::setRespPendingState (
epicsGuard < epicsMutex > &, unsigned index )
{
this->listMember =
static_cast < channelNode::channelState >
( channelNode::cs_searchRespPending0 + index );
if ( this->listMember > cs_searchRespPending17 ) {
throw std::runtime_error (
"resp search timer index out of bounds" );
}
}
void channelNode::setReqPendingState (
epicsGuard < epicsMutex > &, unsigned index )
{
this->listMember =
static_cast < channelNode::channelState >
( channelNode::cs_searchReqPending0 + index );
if ( this->listMember > cs_searchReqPending17 ) {
throw std::runtime_error (
"req search timer index out of bounds" );
}
}
unsigned channelNode::getMaxSearchTimerCount ()
{
return epicsMin (
cs_searchReqPending17 - cs_searchReqPending0,
cs_searchRespPending17 - cs_searchRespPending0 ) + 1u;
}
unsigned channelNode::getSearchTimerIndex (
epicsGuard < epicsMutex > & )
{
channelNode::channelState chanState = this->listMember;
unsigned index = 0u;
if ( chanState >= cs_searchReqPending0 &&
chanState <= cs_searchReqPending17 ) {
index = chanState - cs_searchReqPending0;
}
else if ( chanState >= cs_searchRespPending0 &&
chanState <= cs_searchRespPending17 ) {
index = chanState - cs_searchRespPending0;
}
else {
throw std::runtime_error (
"channel was expected to be in a search timer, but wasnt" );;
}
return index;
}

View File

@@ -59,11 +59,51 @@ protected:
channelNode ();
bool isConnected ( epicsGuard < epicsMutex > & ) const;
bool isInstalledInServer ( epicsGuard < epicsMutex > & ) const;
static unsigned getMaxSearchTimerCount ();
private:
enum channelState {
cs_none,
cs_disconnGov,
cs_serverAddrResPend,
// note: indexing is used here
// so these must be contiguous
cs_searchReqPending0,
cs_searchReqPending1,
cs_searchReqPending2,
cs_searchReqPending3,
cs_searchReqPending4,
cs_searchReqPending5,
cs_searchReqPending6,
cs_searchReqPending7,
cs_searchReqPending8,
cs_searchReqPending9,
cs_searchReqPending10,
cs_searchReqPending11,
cs_searchReqPending12,
cs_searchReqPending13,
cs_searchReqPending14,
cs_searchReqPending15,
cs_searchReqPending16,
cs_searchReqPending17,
// note: indexing is used here
// so these must be contiguous
cs_searchRespPending0,
cs_searchRespPending1,
cs_searchRespPending2,
cs_searchRespPending3,
cs_searchRespPending4,
cs_searchRespPending5,
cs_searchRespPending6,
cs_searchRespPending7,
cs_searchRespPending8,
cs_searchRespPending9,
cs_searchRespPending10,
cs_searchRespPending11,
cs_searchRespPending12,
cs_searchRespPending13,
cs_searchRespPending14,
cs_searchRespPending15,
cs_searchRespPending16,
cs_searchRespPending17,
cs_createReqPend,
cs_createRespPend,
cs_subscripReqPend,
@@ -71,9 +111,14 @@ private:
cs_unrespCircuit,
cs_subscripUpdateReqPend
} listMember;
void setRespPendingState ( epicsGuard < epicsMutex > &, unsigned index );
void setReqPendingState ( epicsGuard < epicsMutex > &, unsigned index );
unsigned getSearchTimerIndex ( epicsGuard < epicsMutex > & );
friend class tcpiiu;
friend class tcpSendThread;
friend class udpiiu;
friend class tcpSendThread;
friend class searchTimer;
friend class disconnectGovernorTimer;
};
class privateInterfaceForIO { // X aCC 655
@@ -107,9 +152,12 @@ public:
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard );
void setServerAddressUnknown (
udpiiu & newiiu, epicsGuard < epicsMutex > & guard );
bool searchMsg ( class udpiiu & iiu );
void serviceShutdownNotify ();
netiiu & newiiu, epicsGuard < epicsMutex > & guard );
bool searchMsg (
epicsGuard < epicsMutex > & );
void serviceShutdownNotify (
epicsGuard < epicsMutex > & callbackControlGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard );
void accessRightsStateChange ( const caAccessRights &,
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard );
@@ -150,7 +198,7 @@ public:
void sendSubscriptionUpdateRequests ( epicsGuard < epicsMutex > & );
void disconnectAllIO (
epicsGuard < epicsMutex > &, epicsGuard < epicsMutex > & );
bool connected ( epicsGuard < epicsMutex > & ) const;
bool connected ( epicsGuard < epicsMutex > & ) const;
private:
tsDLList < class baseNMIU > eventq;
@@ -287,11 +335,6 @@ inline const netiiu * nciu::getConstPIIU (
return this->piiu;
}
inline void nciu::serviceShutdownNotify ()
{
this->notify().serviceShutdownNotify ();
}
inline cac & nciu::getClient ()
{
return this->cacCtx;

View File

@@ -19,6 +19,8 @@
* Author: Jeff Hill
*/
#include <stdexcept>
#include <limits.h>
#include <float.h>
@@ -124,17 +126,33 @@ void netiiu::eliminateExcessiveSendBacklog (
void netiiu::requestRecvProcessPostponedFlush (
epicsGuard < epicsMutex > & )
{
return;
}
void netiiu::uninstallChan (
epicsGuard < epicsMutex > &,
epicsGuard < epicsMutex > &, nciu & )
{
throw cacChannel::notConnected();
}
double netiiu::receiveWatchdogDelay (
epicsGuard < epicsMutex > & guard ) const
{
return - DBL_MAX;
}
void netiiu::uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > &, nciu &,
const epicsTime & currentTime )
{
throw std::runtime_error (
"search response occured when not attached to udpiiu?" );
}
bool netiiu::searchMsg (
epicsGuard < epicsMutex > &, ca_uint32_t id,
const char * pName, unsigned nameLength )
{
return false;
}

View File

@@ -26,20 +26,19 @@
#ifndef netiiuh
#define netiiuh
#include "tsDLList.h"
#include "nciu.h"
#include "cacIO.h"
#include "caProto.h"
class netWriteNotifyIO;
class netReadNotifyIO;
class netSubscription;
union osiSockAddr;
class cac;
class nciu;
class netiiu {
public:
virtual ~netiiu ();
virtual ~netiiu () = 0;
virtual void hostName (
epicsGuard < epicsMutex > &, char * pBuf,
unsigned bufLength ) const = 0;
@@ -84,10 +83,15 @@ public:
virtual osiSockAddr getNetworkAddress (
epicsGuard < epicsMutex > & ) const = 0;
virtual void uninstallChan (
epicsGuard < epicsMutex > & cbMutex,
epicsGuard < epicsMutex > & mutex, nciu & ) = 0;
epicsGuard < epicsMutex > &, nciu & ) = 0;
virtual void uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > &, nciu &,
const class epicsTime & currentTime ) = 0;
virtual double receiveWatchdogDelay (
epicsGuard < epicsMutex > & ) const = 0;
virtual bool searchMsg (
epicsGuard < epicsMutex > &, ca_uint32_t id,
const char * pName, unsigned nameLength ) = 0;
};
#endif // netiiuh

166
src/ca/noopiiu.cpp Normal file
View File

@@ -0,0 +1,166 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#include "osiSock.h"
#define epicsExportSharedSymbols
#include "noopiiu.h"
noopiiu noopIIU;
noopiiu::~noopiiu ()
{
}
void noopiiu::hostName (
epicsGuard < epicsMutex > & cacGuard,
char *pBuf, unsigned bufLength ) const
{
netiiu::hostName ( cacGuard, pBuf, bufLength );
}
const char * noopiiu::pHostName (
epicsGuard < epicsMutex > & cacGuard ) const
{
return netiiu::pHostName ( cacGuard );
}
bool noopiiu::ca_v42_ok (
epicsGuard < epicsMutex > & cacGuard ) const
{
return netiiu::ca_v42_ok ( cacGuard );
}
bool noopiiu::ca_v41_ok (
epicsGuard < epicsMutex > & cacGuard ) const
{
return netiiu::ca_v41_ok ( cacGuard );
}
void noopiiu::writeRequest (
epicsGuard < epicsMutex > & guard,
nciu & chan, unsigned type,
arrayElementCount nElem, const void * pValue )
{
netiiu::writeRequest ( guard, chan, type, nElem, pValue );
}
void noopiiu::writeNotifyRequest (
epicsGuard < epicsMutex > & guard, nciu & chan,
netWriteNotifyIO & io, unsigned type,
arrayElementCount nElem, const void *pValue )
{
netiiu::writeNotifyRequest ( guard, chan, io, type, nElem, pValue );
}
void noopiiu::readNotifyRequest (
epicsGuard < epicsMutex > & guard, nciu & chan,
netReadNotifyIO & io, unsigned type, arrayElementCount nElem )
{
netiiu::readNotifyRequest ( guard, chan, io, type, nElem );
}
void noopiiu::clearChannelRequest (
epicsGuard < epicsMutex > & guard,
ca_uint32_t sid, ca_uint32_t cid )
{
netiiu::clearChannelRequest ( guard, sid, cid );
}
void noopiiu::subscriptionRequest (
epicsGuard < epicsMutex > & guard, nciu & chan,
netSubscription & subscr )
{
netiiu::subscriptionRequest ( guard, chan, subscr );
}
void noopiiu::subscriptionUpdateRequest (
epicsGuard < epicsMutex > & guard, nciu & chan,
netSubscription & subscr )
{
netiiu::subscriptionUpdateRequest (
guard, chan, subscr );
}
void noopiiu::subscriptionCancelRequest (
epicsGuard < epicsMutex > & guard,
nciu & chan, netSubscription & subscr )
{
netiiu::subscriptionCancelRequest ( guard, chan, subscr );
}
void noopiiu::flushRequest (
epicsGuard < epicsMutex > & guard )
{
netiiu::flushRequest ( guard );
}
void noopiiu::eliminateExcessiveSendBacklog (
epicsGuard < epicsMutex > * pCBGuard,
epicsGuard < epicsMutex > & guard )
{
netiiu::eliminateExcessiveSendBacklog ( pCBGuard, guard );
}
void noopiiu::requestRecvProcessPostponedFlush (
epicsGuard < epicsMutex > & guard )
{
netiiu::requestRecvProcessPostponedFlush ( guard );
}
osiSockAddr noopiiu::getNetworkAddress (
epicsGuard < epicsMutex > & guard ) const
{
return netiiu::getNetworkAddress ( guard );
}
double noopiiu::receiveWatchdogDelay (
epicsGuard < epicsMutex > & guard ) const
{
return netiiu::receiveWatchdogDelay ( guard );
}
void noopiiu::uninstallChan (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
// intentionally does not call default in netiiu
}
void noopiiu::uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > & guard, nciu & chan,
const class epicsTime & currentTime )
{
netiiu::uninstallChanDueToSuccessfulSearchResponse (
guard, chan, currentTime );
}
bool noopiiu::searchMsg (
epicsGuard < epicsMutex > & guard, ca_uint32_t id,
const char * pName, unsigned nameLength )
{
return netiiu::searchMsg (
guard, id, pName, nameLength );
}

92
src/ca/noopiiu.h Normal file
View File

@@ -0,0 +1,92 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef noopiiuh
#define noopiiuh
#include "netiiu.h"
class noopiiu : public netiiu {
public:
~noopiiu ();
void hostName (
epicsGuard < epicsMutex > &, char * pBuf,
unsigned bufLength ) const;
const char * pHostName (
epicsGuard < epicsMutex > & ) const;
bool ca_v41_ok (
epicsGuard < epicsMutex > & ) const;
bool ca_v42_ok (
epicsGuard < epicsMutex > & ) const;
void eliminateExcessiveSendBacklog (
epicsGuard < epicsMutex > * pCallbackGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard );
void writeRequest (
epicsGuard < epicsMutex > &, nciu &,
unsigned type, arrayElementCount nElem,
const void *pValue );
void writeNotifyRequest (
epicsGuard < epicsMutex > &,
nciu &, netWriteNotifyIO &,
unsigned type, arrayElementCount nElem,
const void *pValue );
void readNotifyRequest (
epicsGuard < epicsMutex > &, nciu &,
netReadNotifyIO &, unsigned type,
arrayElementCount nElem );
void clearChannelRequest (
epicsGuard < epicsMutex > &,
ca_uint32_t sid, ca_uint32_t cid );
void subscriptionRequest (
epicsGuard < epicsMutex > &,
nciu &, netSubscription & );
void subscriptionUpdateRequest (
epicsGuard < epicsMutex > &,
nciu &, netSubscription & );
void subscriptionCancelRequest (
epicsGuard < epicsMutex > &,
nciu & chan, netSubscription & subscr );
void flushRequest (
epicsGuard < epicsMutex > & );
void requestRecvProcessPostponedFlush (
epicsGuard < epicsMutex > & );
osiSockAddr getNetworkAddress (
epicsGuard < epicsMutex > & ) const;
void uninstallChan (
epicsGuard < epicsMutex > & mutex,
nciu & );
void uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > &, nciu &,
const class epicsTime & currentTime );
double receiveWatchdogDelay (
epicsGuard < epicsMutex > & ) const;
bool searchMsg (
epicsGuard < epicsMutex > &, ca_uint32_t id,
const char * pName, unsigned nameLength );
};
extern noopiiu noopIIU;
#endif // ifndef noopiiuh

View File

@@ -142,7 +142,9 @@ private:
bool prevConnected;
void connectNotify ( epicsGuard < epicsMutex > & );
void disconnectNotify ( epicsGuard < epicsMutex > & );
void serviceShutdownNotify ();
void serviceShutdownNotify (
epicsGuard < epicsMutex > & callbackControlGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard );
void accessRightsNotify (
epicsGuard < epicsMutex > &, const caAccessRights & );
void exception ( epicsGuard < epicsMutex > &,
@@ -324,7 +326,8 @@ public:
void vSignal ( int ca_status, const char * pfilenm,
int lineno, const char *pFormat, va_list args );
bool preemptiveCallbakIsEnabled () const;
void destroyChannel ( oldChannelNotify & chan );
void destroyChannel ( epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard, oldChannelNotify & chan );
void destroyGetCopy ( epicsGuard < epicsMutex > &, getCopy & );
void destroyGetCallback ( epicsGuard < epicsMutex > &, getCallback & );
void destroyPutCallback ( epicsGuard < epicsMutex > &, putCallback & );
@@ -365,8 +368,6 @@ private:
void callbackProcessingCompleteNotify ();
cacContext & createNetworkContext (
epicsMutex & mutualExclusion, epicsMutex & callbackControl );
void destroyChannelPrivate (
oldChannelNotify & chan, epicsGuard < epicsMutex > & cbGuard );
void clearSubscriptionPrivate (
evid pMon, epicsGuard < epicsMutex > & cbGuard );
@@ -379,6 +380,7 @@ private:
friend int epicsShareAPI ca_create_channel (
const char * name_str, caCh * conn_func, void * puser,
capri priority, chid * chanptr );
friend int epicsShareAPI ca_clear_channel ( chid pChan );
friend int epicsShareAPI ca_array_get ( chtype type,
arrayElementCount count, chid pChan, void * pValue );
friend int epicsShareAPI ca_array_get_callback ( chtype type,

View File

@@ -173,9 +173,14 @@ void oldChannelNotify::disconnectNotify (
}
}
void oldChannelNotify::serviceShutdownNotify ()
void oldChannelNotify::serviceShutdownNotify (
epicsGuard < epicsMutex > & callbackControlGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard )
{
this->cacCtx.destroyChannel ( *this );
this->cacCtx.destroyChannel (
callbackControlGuard,
mutualExclusionGuard,
*this );
}
void oldChannelNotify::accessRightsNotify (

View File

@@ -30,14 +30,16 @@
#include "udpiiu.h"
#undef epicsExportSharedSymbols
static const double repeaterSubscribeTimerInitialPeriod = 10.0; // sec
static const double repeaterSubscribeTimerPeriod = 1.0; // sec
repeaterSubscribeTimer::repeaterSubscribeTimer (
udpiiu & iiuIn, epicsTimerQueue & queueIn,
repeaterTimerNotify & iiuIn, epicsTimerQueue & queueIn,
epicsMutex & cbMutexIn, cacContextNotify & ctxNotifyIn ) :
timer ( queueIn.createTimer () ), iiu ( iiuIn ),
cbMutex ( cbMutexIn ),ctxNotify ( ctxNotifyIn ),
attempts ( 0 ), registered ( false ), once ( false )
{
this->timer.start ( *this, 10.0 );
}
repeaterSubscribeTimer::~repeaterSubscribeTimer ()
@@ -45,9 +47,21 @@ repeaterSubscribeTimer::~repeaterSubscribeTimer ()
this->timer.destroy ();
}
void repeaterSubscribeTimer::shutdown ()
void repeaterSubscribeTimer::start ()
{
this->timer.cancel ();
this->timer.start (
*this, repeaterSubscribeTimerInitialPeriod );
}
void repeaterSubscribeTimer::shutdown (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard )
{
epicsGuardRelease < epicsMutex > unguard ( guard );
{
epicsGuardRelease < epicsMutex > unguard ( cbGuard );
this->timer.cancel ();
}
}
epicsTimerNotify::expireStatus repeaterSubscribeTimer::
@@ -58,11 +72,11 @@ epicsTimerNotify::expireStatus repeaterSubscribeTimer::
callbackManager mgr ( this->ctxNotify, this->cbMutex );
this->iiu.printf ( mgr.cbGuard,
"CA client library is unable to contact CA repeater after %u tries.\n",
nTriesToMsg);
nTriesToMsg );
this->iiu.printf ( mgr.cbGuard,
"Silence this message by starting a CA repeater daemon\n");
"Silence this message by starting a CA repeater daemon\n") ;
this->iiu.printf ( mgr.cbGuard,
"or by calling ca_pend_event() and or ca_poll() more often.\n");
"or by calling ca_pend_event() and or ca_poll() more often.\n" );
this->once = true;
}
@@ -73,7 +87,7 @@ epicsTimerNotify::expireStatus repeaterSubscribeTimer::
return noRestart;
}
else {
return expireStatus ( restart, 1.0 );
return expireStatus ( restart, repeaterSubscribeTimerPeriod );
}
}
@@ -87,3 +101,5 @@ void repeaterSubscribeTimer::confirmNotify ()
{
this->registered = true;
}
repeaterTimerNotify::~repeaterTimerNotify () {}

View File

@@ -29,21 +29,34 @@
#include "epicsTimer.h"
class udpiiu;
class epicsMutex;
class cacContextNotify;
class repeaterTimerNotify {
public:
virtual ~repeaterTimerNotify () = 0;
virtual void repeaterRegistrationMessage (
unsigned attemptNumber ) = 0;
virtual int printf (
epicsGuard < epicsMutex > & callbackControl,
const char * pformat, ... ) = 0;
};
class repeaterSubscribeTimer : private epicsTimerNotify {
public:
repeaterSubscribeTimer ( udpiiu &, epicsTimerQueue &,
repeaterSubscribeTimer (
repeaterTimerNotify &, epicsTimerQueue &,
epicsMutex & cbMutex, cacContextNotify & ctxNotify );
virtual ~repeaterSubscribeTimer ();
void shutdown ();
void start ();
void shutdown (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard );
void confirmNotify ();
void show ( unsigned level ) const;
private:
epicsTimer & timer;
udpiiu & iiu;
repeaterTimerNotify & iiu;
epicsMutex & cbMutex;
cacContextNotify & ctxNotify;
unsigned attempts;

View File

@@ -19,6 +19,7 @@
// Author: Jeff Hill
//
#include <stdexcept>
#include <limits.h>
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
@@ -28,188 +29,297 @@
#define epicsExportSharedSymbols
#include "iocinf.h"
#include "searchTimer.h"
#include "udpiiu.h"
#include "nciu.h"
static const unsigned initialTriesPerFrame = 1u; // initial UDP frames per search try
static const unsigned maxTriesPerFrame = 64u; // max UDP frames per search try
static const double minSearchPeriod = 30e-3; // seconds
static const double maxSearchPeriodDefault = 5.0 * 60.0; // seconds
static const double maxSearchPeriodLowerLimit = 60.0; // seconds
// This impacts the exponential backoff delay between search messages.
// This delay is two to the power of the minimum channel retry count
// times the estimated round trip time or the OS's delay quantum
// whichever is greater. So this results in about a one second delay.
static const unsigned successfulSearchRetrySetpoint = 6u;
static const unsigned beaconAnomalyRetrySetpoint = 6u;
static const unsigned disconnectRetrySetpoint = 6u;
//
// searchTimer::searchTimer ()
//
searchTimer::searchTimer ( udpiiu & iiuIn,
epicsTimerQueue & queueIn, udpMutex & mutexIn ) :
period ( 1e9 ),
searchTimer::searchTimer (
searchTimerNotify & iiuIn,
epicsTimerQueue & queueIn,
const unsigned indexIn,
epicsMutex & mutexIn,
bool boostPossibleIn ) :
timer ( queueIn.createTimer () ),
iiu ( iiuIn ),
mutex ( mutexIn ),
framesPerTry ( initialTriesPerFrame ),
framesPerTryCongestThresh ( DBL_MAX ),
maxPeriod ( maxSearchPeriodDefault ),
retry ( 0 ),
searchAttempts ( 0u ),
searchResponses ( 0u ),
searchAttemptsThisPass ( 0u ),
searchResponsesThisPass ( 0u ),
index ( indexIn ),
dgSeqNoAtTimerExpireBegin ( 0u ),
dgSeqNoAtTimerExpireEnd ( 0u ),
boostPossible ( boostPossibleIn ),
stopped ( false )
{
if ( envGetConfigParamPtr ( & EPICS_CA_MAX_SEARCH_PERIOD ) ) {
long longStatus = envGetDoubleConfigParam (
& EPICS_CA_MAX_SEARCH_PERIOD, & this->maxPeriod );
if ( ! longStatus ) {
if ( this->maxPeriod < maxSearchPeriodLowerLimit ) {
epicsPrintf ( "EPICS \"%s\" out of range (low)\n",
EPICS_CA_MAX_SEARCH_PERIOD.name );
this->maxPeriod = maxSearchPeriodLowerLimit;
epicsPrintf ( "Setting \"%s\" = %f seconds\n",
EPICS_CA_MAX_SEARCH_PERIOD.name, this->maxPeriod );
}
}
else {
epicsPrintf ( "EPICS \"%s\" wasnt a real number\n",
EPICS_CA_MAX_SEARCH_PERIOD.name );
epicsPrintf ( "Setting \"%s\" = %f seconds\n",
EPICS_CA_MAX_SEARCH_PERIOD.name, this->maxPeriod );
}
}
}
void searchTimer::start ()
{
this->timer.start ( *this, this->period () );
}
searchTimer::~searchTimer ()
{
assert ( this->chanListReqPending.count() == 0 );
assert ( this->chanListRespPending.count() == 0 );
this->timer.destroy ();
}
void searchTimer::shutdown ()
void searchTimer::shutdown (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard )
{
this->stopped = true;
this->timer.cancel ();
}
void searchTimer::channelCreatedNotify (
epicsGuard < udpMutex > & guard,
const epicsTime & currentTime, bool firstChannel )
{
this->newChannelNotify ( guard, currentTime,
firstChannel, 0 );
}
void searchTimer::channelDisconnectedNotify (
epicsGuard < udpMutex > & guard,
const epicsTime & currentTime, bool firstChannel )
{
this->newChannelNotify ( guard, currentTime,
firstChannel, disconnectRetrySetpoint );
}
void searchTimer::newChannelNotify (
epicsGuard < udpMutex > & guard, const epicsTime & currentTime,
bool firstChannel, const unsigned minRetryNo )
{
if ( ! this->stopped ) {
if ( firstChannel ) {
this->recomputeTimerPeriod ( guard, minRetryNo );
double newPeriod = this->period;
{
// avoid timer cancel block deadlock
epicsGuardRelease < udpMutex > unguard ( guard );
this->timer.start ( *this, currentTime + newPeriod );
}
}
else {
this->recomputeTimerPeriodAndStartTimer ( guard,
currentTime, minRetryNo, 0.0 );
}
}
}
void searchTimer::beaconAnomalyNotify (
epicsGuard < udpMutex > & guard,
const epicsTime & currentTime, const double & delay )
{
this->recomputeTimerPeriodAndStartTimer (
guard, currentTime, beaconAnomalyRetrySetpoint, delay );
}
// lock must be applied
void searchTimer::recomputeTimerPeriod (
epicsGuard < udpMutex > & guard, const unsigned retryNew ) // X aCC 431
{
this->retry = retryNew;
size_t idelay = 1u << tsMin ( (size_t) this->retry,
CHAR_BIT * sizeof ( idelay ) - 1u );
double delayFactor = tsMax (
this->iiu.roundTripDelayEstimate ( guard ) * 2.0, minSearchPeriod );
this->period = idelay * delayFactor; /* sec */
this->period = tsMin ( this->maxPeriod, this->period );
}
void searchTimer::recomputeTimerPeriodAndStartTimer ( epicsGuard < udpMutex > & guard,
const epicsTime & currentTime, const unsigned retryNew, const double & initialDelay )
{
if ( this->iiu.unresolvedChannelCount ( guard ) == 0 || this->stopped ) {
return;
}
bool start = false;
double totalDelay = initialDelay;
{
if ( this->retry <= retryNew ) {
return;
}
double oldPeriod = this->period;
this->recomputeTimerPeriod ( guard, retryNew );
totalDelay += this->period;
if ( totalDelay < oldPeriod ) {
epicsTimer::expireInfo info = this->timer.getExpireInfo ();
if ( info.active ) {
double delay = info.expireTime - currentTime;
if ( delay > totalDelay ) {
start = true;
}
}
else {
start = true;
}
epicsGuardRelease < epicsMutex > unguard ( guard );
{
epicsGuardRelease < epicsMutex > unguard ( cbGuard );
this->timer.cancel ();
}
}
if ( start ) {
// avoid timer cancel block deadlock
epicsGuardRelease < udpMutex > unguard ( guard );
this->timer.start ( *this, currentTime + totalDelay );
while ( nciu * pChan = this->chanListReqPending.get () ) {
pChan->channelNode::listMember =
channelNode::cs_none;
pChan->serviceShutdownNotify ( cbGuard, guard );
}
while ( nciu * pChan = this->chanListRespPending.get () ) {
pChan->channelNode::listMember =
channelNode::cs_none;
pChan->serviceShutdownNotify ( cbGuard, guard );
}
}
void searchTimer::installChannel (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
this->chanListReqPending.add ( chan );
chan.channelNode::setReqPendingState ( guard, this->index );
}
void searchTimer::moveChannels (
epicsGuard < epicsMutex > & guard, searchTimer & dest )
{
while ( nciu * pChan = this->chanListRespPending.get () ) {
if ( this->searchAttempts > 0 ) {
this->searchAttempts--;
}
dest.installChannel ( guard, *pChan );
}
while ( nciu * pChan = this->chanListReqPending.get () ) {
dest.installChannel ( guard, *pChan );
}
debugPrintf ( ( "changed search period to %f sec\n", this->period ) );
}
//
// searchTimer::notifySuccessfulSearchResponse ()
// searchTimer::expire ()
//
epicsTimerNotify::expireStatus searchTimer::expire (
const epicsTime & currentTime ) // X aCC 361
{
epicsGuard < epicsMutex > guard ( this->mutex );
while ( nciu * pChan = this->chanListRespPending.get () ) {
pChan->channelNode::listMember =
channelNode::cs_none;
this->iiu.noSearchRespNotify (
guard, *pChan, this->index );
}
// boost search period for channels not recently
// searched for if there was some success
if ( this->searchResponses && this->boostPossible ) {
while ( nciu * pChan = this->chanListReqPending.get () ) {
pChan->channelNode::listMember =
channelNode::cs_none;
this->iiu.boostChannel ( guard, *pChan );
}
}
if ( this->searchAttempts ) {
#if 0
//
// dynamically adjust the number of UDP frames per
// try depending how many search requests are not
// replied to
//
// The variable this->framesPerTry
// determines the number of UDP frames to be sent
// each time that expire() is called.
// If this value is too high we will waste some
// network bandwidth. If it is too low we will
// use very little of the incoming UDP message
// buffer associated with the server's port and
// will therefore take longer to connect. We
// initialize this->framesPerTry to a prime number
// so that it is less likely that the
// same channel is in the last UDP frame
// sent every time that this is called (and
// potentially discarded by a CA server with
// a small UDP input queue).
//
// increase frames per try only if we see better than
// a 93.75% success rate for one pass through the list
//
if ( this->searchResponses >
( this->searchAttempts - (this->searchAttempts/16u) ) ) {
// increase UDP frames per try if we have a good score
if ( this->framesPerTry < maxTriesPerFrame ) {
// a congestion avoidance threshold similar to TCP is now used
if ( this->framesPerTry < this->framesPerTryCongestThresh ) {
this->framesPerTry += this->framesPerTry;
}
else {
this->framesPerTry += (this->framesPerTry/8) + 1;
}
debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n",
this->framesPerTry, this->searchAttempts, this->searchResponses) );
}
}
// if we detect congestion because we have less than a 87.5% success
// rate then gradually reduce the frames per try
else if ( this->searchResponses <
( this->searchAttempts - (this->searchAttempts/8u) ) ) {
if ( this->framesPerTry > 1 ) {
this->framesPerTry--;
}
this->framesPerTryCongestThresh = this->framesPerTry/2 + 1;
debugPrintf ( ("Congestion detected - set frames per try to %f t=%u r=%u\n",
this->framesPerTry, this->searchAttempts, this->searchResponses) );
}
#else
if ( this->searchResponses == this->searchAttempts ) {
// increase UDP frames per try if we have a good score
if ( this->framesPerTry < maxTriesPerFrame ) {
// a congestion avoidance threshold similar to TCP is now used
if ( this->framesPerTry < this->framesPerTryCongestThresh ) {
double doubled = 2 * this->framesPerTry;
if ( doubled > this->framesPerTryCongestThresh ) {
this->framesPerTry = this->framesPerTryCongestThresh;
}
else {
this->framesPerTry = doubled;
}
}
else {
this->framesPerTry += 1.0 / this->framesPerTry;
}
debugPrintf ( ("Increasing frame count to %g t=%u r=%u\n",
this->framesPerTry, this->searchAttempts, this->searchResponses) );
}
}
else {
this->framesPerTryCongestThresh = this->framesPerTry / 2.0;
this->framesPerTry = 1u;
debugPrintf ( ("Congestion detected - set frames per try to %g t=%u r=%u\n",
this->framesPerTry, this->searchAttempts, this->searchResponses) );
}
#endif
}
this->dgSeqNoAtTimerExpireBegin =
this->iiu.datagramSeqNumber ( guard );
this->searchAttempts = 0;
this->searchResponses = 0;
unsigned nFrameSent = 0u;
while ( true ) {
nciu * pChan = this->chanListReqPending.get ();
if ( ! pChan ) {
break;
}
pChan->channelNode::listMember =
channelNode::cs_none;
bool success = pChan->searchMsg ( guard );
if ( ! success ) {
if ( this->iiu.datagramFlush ( guard, currentTime ) ) {
nFrameSent++;
if ( nFrameSent < this->framesPerTry ) {
success = pChan->searchMsg ( guard );
}
}
if ( ! success ) {
this->chanListReqPending.push ( *pChan );
pChan->channelNode::setReqPendingState (
guard, this->index );
break;
}
}
this->chanListRespPending.add ( *pChan );
pChan->channelNode::setRespPendingState (
guard, this->index );
if ( this->searchAttempts < UINT_MAX ) {
this->searchAttempts++;
}
}
// flush out the search request buffer
if ( this->iiu.datagramFlush ( guard, currentTime ) ) {
nFrameSent++;
}
this->dgSeqNoAtTimerExpireEnd =
this->iiu.datagramSeqNumber ( guard ) - 1u;
this->timeAtLastSend = currentTime;
# ifdef DEBUG
if ( this->searchAttempts ) {
char buf[64];
currentTime.strftime ( buf, sizeof(buf), "%M:%S.%09f");
debugPrintf ( ("sent %u delay sec=%f Rts=%s\n",
nFrameSent, this->period(), buf ) );
}
# endif
return expireStatus ( restart, this->period() );
}
void searchTimer::show ( unsigned level ) const
{
epicsGuard < epicsMutex > guard ( this->mutex );
::printf ( "search timer delay %f\n", this->period() );
::printf ( "%u channels with search request pending\n",
this->chanListReqPending.count () );
tsDLIterConst < nciu > pChan = this->chanListReqPending.firstIter ();
while ( pChan.valid () ) {
pChan->show ( level - 1u );
pChan++;
}
::printf ( "%u channels with search response pending\n",
this->chanListRespPending.count () );
pChan = this->chanListRespPending.firstIter ();
while ( pChan.valid () ) {
pChan->show ( level - 1u );
pChan++;
}
}
//
// Reset the delay to the next search request if we get
// at least one response. However, dont reset this delay if we
// get a delayed response to an old search request.
//
void searchTimer::notifySuccessfulSearchResponse ( epicsGuard < udpMutex > & guard,
ca_uint32_t respDatagramSeqNo, bool seqNumberIsValid, const epicsTime & currentTime )
void searchTimer::uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > & guard, nciu & chan,
ca_uint32_t respDatagramSeqNo, bool seqNumberIsValid,
const epicsTime & currentTime )
{
if ( this->iiu.unresolvedChannelCount ( guard ) == 0 || this->stopped ) {
this->uninstallChan ( guard, chan );
if ( this->stopped ) {
return;
}
@@ -223,235 +333,49 @@ void searchTimer::notifySuccessfulSearchResponse ( epicsGuard < udpMutex > & gua
// if we receive a successful response then reset to a
// reasonable timer period
if ( validResponse ) {
double measured = currentTime - this->timeAtLastSend;
this->iiu.updateRTTE ( measured );
if ( this->searchResponses < UINT_MAX ) {
this->searchResponses++;
if ( this->searchResponses == this->searchAttempts ) {
debugPrintf ( ( "Response set timer delay to zero\n" ) );
if ( this->retry > successfulSearchRetrySetpoint ) {
this->recomputeTimerPeriod (
guard, successfulSearchRetrySetpoint );
}
// avoid timer cancel block deadlock
epicsGuardRelease < udpMutex > unguard ( guard );
//
// when we get 100% success immediately
// send another search request
//
this->timer.start ( *this, currentTime );
}
else {
debugPrintf ( ( "Response set timer delay to beacon anomaly set point\n" ) );
//
// otherwise, if making some progress then dont allow
// retry rate to drop below some reasonable minimum
//
if ( this->retry > successfulSearchRetrySetpoint ) {
this->recomputeTimerPeriodAndStartTimer (
guard, currentTime, successfulSearchRetrySetpoint, 0.0 );
if ( this->chanListReqPending.count () ) {
// avoid timer cancel block deadlock
epicsGuardRelease < epicsMutex > unguard ( guard );
//
// when we get 100% success immediately
// send another search request
//
debugPrintf ( ( "All requests succesful, set timer delay to zero\n" ) );
this->timer.start ( *this, currentTime );
}
}
}
}
}
//
// searchTimer::expire ()
//
epicsTimerNotify::expireStatus searchTimer::expire ( const epicsTime & currentTime ) // X aCC 361
void searchTimer::uninstallChan (
epicsGuard < epicsMutex > & cacGuard, nciu & chan )
{
epicsGuard < udpMutex > guard ( this->mutex );
// check to see if there is nothing to do here
if ( this->iiu.unresolvedChannelCount ( guard ) == 0 ) {
debugPrintf ( ( "all channels located - search timer terminating\n" ) );
this->period = DBL_MAX;
return noRestart;
}
#if 0
//
// dynamically adjust the number of UDP frames per
// try depending how many search requests are not
// replied to
//
// The variable this->framesPerTry
// determines the number of UDP frames to be sent
// each time that expire() is called.
// If this value is too high we will waste some
// network bandwidth. If it is too low we will
// use very little of the incoming UDP message
// buffer associated with the server's port and
// will therefore take longer to connect. We
// initialize this->framesPerTry to a prime number
// so that it is less likely that the
// same channel is in the last UDP frame
// sent every time that this is called (and
// potentially discarded by a CA server with
// a small UDP input queue).
//
// increase frames per try only if we see better than
// a 93.75% success rate for one pass through the list
//
if ( this->searchResponses >
( this->searchAttempts - (this->searchAttempts/16u) ) ) {
// increase UDP frames per try if we have a good score
if ( this->framesPerTry < maxTriesPerFrame ) {
// a congestion avoidance threshold similar to TCP is now used
if ( this->framesPerTry < this->framesPerTryCongestThresh ) {
this->framesPerTry += this->framesPerTry;
}
else {
this->framesPerTry += (this->framesPerTry/8) + 1;
}
debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n",
this->framesPerTry, this->searchAttempts, this->searchResponses) );
}
cacGuard.assertIdenticalMutex ( this->mutex );
if ( chan.channelNode::listMember ==
this->index + channelNode::cs_searchReqPending0 ) {
this->chanListReqPending.remove ( chan );
}
// if we detect congestion because we have less than a 87.5% success
// rate then gradually reduce the frames per try
else if ( this->searchResponses <
( this->searchAttempts - (this->searchAttempts/8u) ) ) {
if ( this->framesPerTry > 1 ) {
this->framesPerTry--;
}
this->framesPerTryCongestThresh = this->framesPerTry/2 + 1;
debugPrintf ( ("Congestion detected - set frames per try to %f t=%u r=%u\n",
this->framesPerTry, this->searchAttempts, this->searchResponses) );
}
#else
if ( this->searchResponses == this->searchAttempts ) {
// increase UDP frames per try if we have a good score
if ( this->framesPerTry < maxTriesPerFrame ) {
// a congestion avoidance threshold similar to TCP is now used
if ( this->framesPerTry < this->framesPerTryCongestThresh ) {
double doubled = 2 * this->framesPerTry;
if ( doubled > this->framesPerTryCongestThresh ) {
this->framesPerTry = this->framesPerTryCongestThresh;
}
else {
this->framesPerTry = doubled;
}
}
else {
this->framesPerTry += 1.0 / this->framesPerTry;
}
debugPrintf ( ("Increasing frame count to %g t=%u r=%u\n",
this->framesPerTry, this->searchAttempts, this->searchResponses) );
}
}
else {
this->framesPerTryCongestThresh = this->framesPerTry / 2.0;
this->framesPerTry = 1u;
debugPrintf ( ("Congestion detected - set frames per try to %g t=%u r=%u\n",
this->framesPerTry, this->searchAttempts, this->searchResponses) );
}
#endif
if ( this->searchAttemptsThisPass <= UINT_MAX - this->searchAttempts ) {
this->searchAttemptsThisPass += this->searchAttempts;
else if ( chan.channelNode::listMember ==
this->index + channelNode::cs_searchRespPending0 ) {
this->chanListRespPending.remove ( chan );
}
else {
this->searchAttemptsThisPass = UINT_MAX;
throw std::runtime_error (
"uninstalling channel search timer, but channel state is wrong" );;
}
if ( this->searchResponsesThisPass <= UINT_MAX - this->searchResponses ) {
this->searchResponsesThisPass += this->searchResponses;
}
else {
this->searchResponsesThisPass = UINT_MAX;
}
this->dgSeqNoAtTimerExpireBegin = this->iiu.datagramSeqNumber ( guard );
this->searchAttempts = 0;
this->searchResponses = 0;
unsigned nChanSent = 0u;
unsigned nFrameSent = 0u;
while ( true ) {
// check to see if we have reached the end of the list
if ( this->searchAttemptsThisPass >= this->iiu.unresolvedChannelCount ( guard ) ) {
// if we are making some progress then dont increase the
// delay between search requests
if ( this->searchResponsesThisPass == 0u ) {
this->recomputeTimerPeriod ( guard, this->retry + 1 );
}
this->searchAttemptsThisPass = 0;
this->searchResponsesThisPass = 0;
debugPrintf ( ("saw end of list\n") );
}
if ( ! this->iiu.searchMsg ( guard ) ) {
nFrameSent++;
if ( nFrameSent >= this->framesPerTry ) {
break;
}
this->dgSeqNoAtTimerExpireEnd = this->iiu.datagramSeqNumber ( guard );
this->iiu.datagramFlush ( guard, currentTime );
if ( ! this->iiu.searchMsg ( guard ) ) {
break;
}
}
if ( this->searchAttempts < UINT_MAX ) {
this->searchAttempts++;
}
if ( nChanSent < UINT_MAX ) {
nChanSent++;
}
//
// dont send any of the channels twice within one try
//
if ( nChanSent >= this->iiu.unresolvedChannelCount ( guard ) ) {
//
// add one to nFrameSent because there may be
// one more partial frame to be sent
//
nFrameSent++;
//
// cap this->framesPerTry to
// the number of frames required for all of
// the unresolved channels
//
if ( this->framesPerTry > nFrameSent ) {
this->framesPerTry = nFrameSent;
}
break;
}
}
// flush out the search request buffer
this->iiu.datagramFlush ( guard, currentTime );
this->dgSeqNoAtTimerExpireEnd = this->iiu.datagramSeqNumber ( guard ) - 1u;
# ifdef DEBUG
char buf[64];
epicsTime ts = currentTime;
ts.strftime ( buf, sizeof(buf), "%M:%S.%09f");
debugPrintf ( ("sent %u delay sec=%f Rts=%s\n",
nFrameSent, this->period, buf ) );
# endif
if ( this->iiu.unresolvedChannelCount ( guard ) == 0 ) {
debugPrintf ( ( "all channels connected\n" ) );
this->period = DBL_MAX;
return noRestart;
}
// the code used to test this->minRetry < maxSearchTries here
// and return no restart if the maximum tries was exceeded
// prior to R3.14.7
return expireStatus ( restart, this->period );
chan.channelNode::listMember = channelNode::cs_none;
}
void searchTimer::show ( unsigned /* level */ ) const
double searchTimer::period () const
{
return (1 << this->index ) * this->iiu.getRTTE ();
}
searchTimerNotify::~searchTimerNotify () {}

View File

@@ -42,50 +42,68 @@
#endif
#include "caProto.h"
#include "netiiu.h"
class udpMutex;
class searchTimerNotify {
public:
virtual ~searchTimerNotify () = 0;
virtual void boostChannel (
epicsGuard < epicsMutex > &, nciu & ) = 0;
virtual void noSearchRespNotify (
epicsGuard < epicsMutex > &, nciu &, unsigned ) = 0;
virtual double getRTTE () const = 0;
virtual void updateRTTE ( double rtte ) = 0;
virtual bool datagramFlush (
epicsGuard < epicsMutex > &,
const epicsTime & currentTime ) = 0;
virtual ca_uint32_t datagramSeqNumber (
epicsGuard < epicsMutex > & ) const = 0;
};
class searchTimer : private epicsTimerNotify {
public:
searchTimer ( class udpiiu &, epicsTimerQueue &, udpMutex & );
searchTimer (
class searchTimerNotify &, epicsTimerQueue &,
const unsigned index, epicsMutex &,
bool boostPossible );
virtual ~searchTimer ();
void notifySuccessfulSearchResponse ( epicsGuard < udpMutex > &,
ca_uint32_t respDatagramSeqNo,
bool seqNumberIsValid, const epicsTime & currentTime );
void beaconAnomalyNotify ( epicsGuard < udpMutex > &,
const epicsTime & currentTime, const double & delay );
void channelCreatedNotify ( epicsGuard < udpMutex > &,
const epicsTime &, bool firstChannel );
void channelDisconnectedNotify ( epicsGuard < udpMutex > &,
const epicsTime &, bool firstChannel );
void shutdown ();
void start ();
void shutdown (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard );
void moveChannels (
epicsGuard < epicsMutex > &, searchTimer & dest );
void installChannel (
epicsGuard < epicsMutex > &, nciu & );
void uninstallChan (
epicsGuard < epicsMutex > &, nciu & );
void uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > &, nciu &,
ca_uint32_t respDatagramSeqNo, bool seqNumberIsValid,
const epicsTime & currentTime );
void show ( unsigned level ) const;
private:
double period; /* period between tries */
tsDLList < nciu > chanListReqPending;
tsDLList < nciu > chanListRespPending;
epicsTime timeAtLastSend;
epicsTimer & timer;
class udpiiu & iiu;
udpMutex & mutex;
searchTimerNotify & iiu;
epicsMutex & mutex;
double framesPerTry; /* # of UDP frames per search try */
double framesPerTryCongestThresh; /* one half N tries w congest */
double maxPeriod;
unsigned retry;
unsigned searchAttempts; /* num search tries after last timer experation */
unsigned searchResponses; /* num search resp after last timer experation */
unsigned searchAttemptsThisPass; /* num search tries within this pass */
unsigned searchResponsesThisPass; /* num search resp within this pass */
const unsigned index;
ca_uint32_t dgSeqNoAtTimerExpireBegin;
ca_uint32_t dgSeqNoAtTimerExpireEnd;
const bool boostPossible;
bool stopped;
expireStatus expire ( const epicsTime & currentTime );
void recomputeTimerPeriod ( epicsGuard < udpMutex > &, const unsigned minRetryNew );
void recomputeTimerPeriodAndStartTimer ( epicsGuard < udpMutex > &,
const epicsTime & currentTime, const unsigned minRetryNew,
const double & initialDelay );
void newChannelNotify ( epicsGuard < udpMutex > &,
const epicsTime &, bool firstChannel,
const unsigned minRetryNo );
searchTimer ( const searchTimer & );
searchTimer & operator = ( const searchTimer & );
double period () const;
searchTimer ( const searchTimer & ); // not implemented
searchTimer & operator = ( const searchTimer & ); // not implemented
};
#endif // ifdef searchTimerh

View File

@@ -36,7 +36,7 @@ tcpRecvWatchdog::tcpRecvWatchdog
cbMutex ( cbMutexIn ), ctxNotify ( ctxNotifyIn ),
mutex ( mutexIn ), iiu ( iiuIn ),
probeResponsePending ( false ), beaconAnomaly ( true ),
probeTimeoutDetected ( false )
probeTimeoutDetected ( false ), shuttingDown ( false )
{
}
@@ -49,6 +49,10 @@ epicsTimerNotify::expireStatus
tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361
{
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->shuttingDown ) {
return noRestart;
}
if ( this->probeResponsePending ) {
if ( this->iiu.bytesArePendingInOS() ) {
this->iiu.printf ( mgr.cbGuard,
@@ -64,7 +68,6 @@ tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361
"o application is blocked in a callback from the client library\n" );
}
{
epicsGuard < epicsMutex > guard ( this->mutex );
# ifdef DEBUG
char hostName[128];
this->iiu.hostName ( guard, hostName, sizeof (hostName) );
@@ -78,11 +81,8 @@ tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361
return noRestart;
}
else {
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->probeTimeoutDetected = false;
this->probeResponsePending = this->iiu.setEchoRequestPending ( guard );
}
this->probeTimeoutDetected = false;
this->probeResponsePending = this->iiu.setEchoRequestPending ( guard );
debugPrintf ( ("circuit timed out - sending echo request\n") );
return expireStatus ( restart, CA_ECHO_TIMEOUT );
}
@@ -92,7 +92,7 @@ void tcpRecvWatchdog::beaconArrivalNotify (
epicsGuard < epicsMutex > & guard, const epicsTime & currentTime )
{
guard.assertIdenticalMutex ( this->mutex );
if ( ! this->beaconAnomaly && ! this->probeResponsePending ) {
if ( ! ( this->shuttingDown || this->beaconAnomaly || this->probeResponsePending ) ) {
epicsGuardRelease < epicsMutex > unguard ( guard );
this->timer.start ( *this, currentTime + this->period );
debugPrintf ( ("saw a normal beacon - reseting circuit receive watchdog\n") );
@@ -120,15 +120,15 @@ void tcpRecvWatchdog::messageArrivalNotify (
bool restartNeeded = false;
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( ! this->probeResponsePending ) {
if ( ! ( this->shuttingDown || this->probeResponsePending ) ) {
this->beaconAnomaly = false;
restartNeeded = true;
}
}
// dont hold the lock for fear of deadlocking
// because cancel is blocking for the completion
// of the recvDog expire which takes the lock
// - it take also the callback lock
// of expire() which takes the lock - it take also
// the callback lock
if ( restartNeeded ) {
this->timer.start ( *this, currentTime + this->period );
debugPrintf ( ("received a message - reseting circuit recv watchdog\n") );
@@ -143,7 +143,7 @@ void tcpRecvWatchdog::probeResponseNotify (
double restartDelay = DBL_MAX;
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->probeResponsePending ) {
if ( this->probeResponsePending && ! this->shuttingDown ) {
restartNeeded = true;
if ( this->probeTimeoutDetected ) {
this->probeTimeoutDetected = false;
@@ -192,7 +192,7 @@ void tcpRecvWatchdog::sendBacklogProgressNotify (
// beacon anomaly (which could be transiently detecting a reboot) we will
// not trust the beacon as an indicator of a healthy server until we
// receive at least one message from the server.
if ( this->probeResponsePending ) {
if ( this->probeResponsePending && ! this->shuttingDown ) {
// we avoid calling this with the lock applied because
// it restarts the recv wd timer, this might block
// until a recv wd timer expire callback completes, and
@@ -205,6 +205,12 @@ void tcpRecvWatchdog::sendBacklogProgressNotify (
void tcpRecvWatchdog::connectNotify ()
{
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->shuttingDown ) {
return;
}
}
this->timer.start ( *this, this->period );
debugPrintf ( ("connected to the server - initiating circuit recv watchdog\n") );
}
@@ -217,7 +223,7 @@ void tcpRecvWatchdog::sendTimeoutNotify (
guard.assertIdenticalMutex ( this->mutex );
bool restartNeeded = false;
if ( ! this->probeResponsePending ) {
if ( ! ( this->probeResponsePending || this->shuttingDown ) ) {
this->probeResponsePending = this->iiu.setEchoRequestPending ( guard );
restartNeeded = true;
}
@@ -237,6 +243,16 @@ void tcpRecvWatchdog::cancel ()
debugPrintf ( ("canceling TCP recv watchdog\n") );
}
void tcpRecvWatchdog::shutdown ()
{
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->shuttingDown = true;
}
this->timer.cancel ();
debugPrintf ( ("canceling TCP recv watchdog\n") );
}
double tcpRecvWatchdog::delay () const
{
return this->timer.getExpireDelay ();

View File

@@ -56,6 +56,7 @@ public:
epicsGuard < epicsMutex > & mutex,
const epicsTime & currentTime );
void cancel ();
void shutdown ();
void show ( unsigned level ) const;
double delay () const;
private:
@@ -68,6 +69,7 @@ private:
bool probeResponsePending;
bool beaconAnomaly;
bool probeTimeoutDetected;
bool shuttingDown;
expireStatus expire ( const epicsTime & currentTime );
tcpRecvWatchdog ( const tcpRecvWatchdog & );
tcpRecvWatchdog & operator = ( const tcpRecvWatchdog & );

View File

@@ -44,6 +44,7 @@ epicsTimerNotify::expireStatus tcpSendWatchdog::expire (
const epicsTime & currentTime )
{
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->iiu.bytesArePendingInOS() ) {
this->iiu.printf ( mgr.cbGuard,
"The CA client library is disconnecting after a flush request "
@@ -51,13 +52,12 @@ epicsTimerNotify::expireStatus tcpSendWatchdog::expire (
"application schedualing problem\n" );
}
# ifdef DEBUG
epicsGuard < epicsMutex > guard ( this->mutex );
char hostName[128];
this->iiu.hostName ( guard, hostName, sizeof ( hostName ) );
debugPrintf ( ( "Request not accepted by CA server %s for %g sec. Disconnecting.\n",
hostName, this->period ) );
# endif
this->iiu.sendTimeoutNotify ( currentTime, mgr );
this->iiu.sendTimeoutNotify ( currentTime, mgr, guard );
return noRestart;
}
@@ -71,3 +71,4 @@ void tcpSendWatchdog::cancel ()
this->timer.cancel ();
}

View File

@@ -185,7 +185,7 @@ void tcpSendThread::run ()
}
this->iiu.sendDog.cancel ();
this->iiu.recvDog.cancel ();
this->iiu.recvDog.shutdown ();
while ( ! this->iiu.recvThread.exitWait ( 30.0 ) ) {
// it is possible to get stuck here if the user calls
@@ -333,15 +333,16 @@ void tcpiiu::recvBytes (
continue;
}
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
// the replacable printf handler isnt called here
// because it reqires a callback lock which probably
// isnt appropriate here
char name[64];
this->hostNameCacheInstance.hostName (
name, sizeof ( name ) );
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf (
"Unexpected problem with CA circuit to"
" server \"%s\" was \"%s\" - disconnecting\n",
@@ -813,10 +814,11 @@ void tcpiiu::responsiveCircuitNotify (
void tcpiiu::sendTimeoutNotify (
const epicsTime & currentTime,
callbackManager & mgr )
callbackManager & mgr,
epicsGuard < epicsMutex > & guard )
{
mgr.cbGuard.assertIdenticalMutex ( this-> cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
guard.assertIdenticalMutex ( this->mutex );
this->unresponsiveCircuitNotify ( mgr.cbGuard, guard );
// setup circuit probe sequence
this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard, currentTime );
@@ -844,8 +846,16 @@ void tcpiiu::unresponsiveCircuitNotify (
this->sendThreadFlushEvent.signal ();
this->flushBlockEvent.signal ();
this->recvDog.cancel ();
this->sendDog.cancel ();
// must not hold lock when canceling timer
{
epicsGuardRelease < epicsMutex > unguard ( guard );
{
epicsGuardRelease < epicsMutex > unguard ( cbGuard );
this->recvDog.cancel ();
this->sendDog.cancel ();
}
}
if ( this->connectedList.count() ) {
char hostNameTmp[128];
this->hostName ( guard, hostNameTmp, sizeof ( hostNameTmp ) );
@@ -944,7 +954,7 @@ tcpiiu::~tcpiiu ()
this->sendThread.exitWait ();
this->recvThread.exitWait ();
this->sendDog.cancel ();
this->recvDog.cancel ();
this->recvDog.shutdown ();
if ( ! this->socketHasBeenClosed ) {
epicsSocketDestroy ( this->sock );
@@ -1691,49 +1701,76 @@ const char * tcpiiu::pHostName (
return nameBuf;
}
void tcpiiu::removeAllChannels (
bool supressApplicationNotify,
void tcpiiu::disconnectAllChannels (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard,
udpiiu & discIIU )
class udpiiu & discIIU )
{
cbGuard.assertIdenticalMutex ( this->cbMutex );
guard.assertIdenticalMutex ( this->mutex );
while ( nciu * pChan = this->createReqPend.get () ) {
discIIU.installDisconnectedChannel ( *pChan );
pChan->setServerAddressUnknown ( discIIU, guard );
discIIU.installDisconnectedChannel ( guard, *pChan );
}
while ( nciu * pChan = this->createRespPend.get () ) {
this->clearChannelRequest ( guard,
pChan->getSID(guard), pChan->getCID(guard) );
discIIU.installDisconnectedChannel ( *pChan );
pChan->setServerAddressUnknown ( discIIU, guard );
discIIU.installDisconnectedChannel ( guard, *pChan );
}
while ( nciu * pChan = this->subscripReqPend.get () ) {
pChan->disconnectAllIO ( cbGuard, guard );
discIIU.installDisconnectedChannel ( *pChan );
pChan->setServerAddressUnknown ( discIIU, guard );
if ( ! supressApplicationNotify ) {
pChan->unresponsiveCircuitNotify ( cbGuard, guard );
}
discIIU.installDisconnectedChannel ( guard, *pChan );
pChan->unresponsiveCircuitNotify ( cbGuard, guard );
}
while ( nciu * pChan = this->connectedList.get () ) {
pChan->disconnectAllIO ( cbGuard, guard );
discIIU.installDisconnectedChannel ( *pChan );
pChan->setServerAddressUnknown ( discIIU, guard );
if ( ! supressApplicationNotify ) {
pChan->unresponsiveCircuitNotify ( cbGuard, guard );
}
discIIU.installDisconnectedChannel ( guard, *pChan );
pChan->unresponsiveCircuitNotify ( cbGuard, guard );
}
while ( nciu * pChan = this->unrespCircuit.get () ) {
pChan->disconnectAllIO ( cbGuard, guard );
discIIU.installDisconnectedChannel ( *pChan );
pChan->setServerAddressUnknown ( discIIU, guard );
discIIU.installDisconnectedChannel ( guard, *pChan );
}
this->channelCountTot = 0u;
this->initiateCleanShutdown ( guard );
}
void tcpiiu::unlinkAllChannels (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard )
{
cbGuard.assertIdenticalMutex ( this->cbMutex );
guard.assertIdenticalMutex ( this->mutex );
while ( nciu * pChan = this->createReqPend.get () ) {
pChan->serviceShutdownNotify ( cbGuard, guard );
}
while ( nciu * pChan = this->createRespPend.get () ) {
this->clearChannelRequest ( guard,
pChan->getSID(guard), pChan->getCID(guard) );
pChan->serviceShutdownNotify ( cbGuard, guard );
}
while ( nciu * pChan = this->subscripReqPend.get () ) {
pChan->disconnectAllIO ( cbGuard, guard );
pChan->serviceShutdownNotify ( cbGuard, guard );
}
while ( nciu * pChan = this->connectedList.get () ) {
pChan->disconnectAllIO ( cbGuard, guard );
pChan->serviceShutdownNotify ( cbGuard, guard );
}
while ( nciu * pChan = this->unrespCircuit.get () ) {
pChan->disconnectAllIO ( cbGuard, guard );
pChan->serviceShutdownNotify ( cbGuard, guard );
}
this->channelCountTot = 0u;
@@ -1785,11 +1822,8 @@ void tcpiiu::connectNotify (
}
void tcpiiu::uninstallChan (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard,
nciu & chan )
epicsGuard < epicsMutex > & guard, nciu & chan )
{
cbGuard.assertIdenticalMutex ( this->cbMutex );
guard.assertIdenticalMutex ( this->mutex );
switch ( chan.channelNode::listMember ) {
@@ -1812,7 +1846,7 @@ void tcpiiu::uninstallChan (
this->subscripUpdateReqPend.remove ( chan );
break;
default:
this->cacRef.printf ( cbGuard,
errlogPrintf (
"cac: attempt to uninstall channel from tcp iiu, but it inst installed there?" );
}
chan.channelNode::listMember = channelNode::cs_none;
@@ -1922,6 +1956,21 @@ unsigned tcpiiu::channelCount ( epicsGuard < epicsMutex > & guard )
return this->channelCountTot;
}
void tcpiiu::uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > & guard, nciu & chan,
const class epicsTime & currentTime )
{
return netiiu::uninstallChanDueToSuccessfulSearchResponse (
guard, chan, currentTime );
}
bool tcpiiu::searchMsg (
epicsGuard < epicsMutex > & guard, ca_uint32_t id,
const char * pName, unsigned nameLength )
{
return netiiu::searchMsg (
guard, id, pName, nameLength );
}

View File

@@ -36,8 +36,6 @@
#include "iocinf.h"
#include "inetAddrID.h"
#include "cac.h"
#include "searchTimer.h"
#include "repeaterSubscribeTimer.h"
#include "disconnectGovernorTimer.h"
// UDP protocol dispatch table
@@ -77,34 +75,69 @@ udpiiu::udpiiu (
cac::lowestPriorityLevelAbove (
cac::lowestPriorityLevelAbove (
cac.getInitializingThreadsPriority () ) ) ),
rtteMean ( 5.0e-3 ), // seconds
repeaterSubscribeTmr (
*this, timerQueue, cbMutexIn, ctxNotifyIn ),
govTmr ( *this, timerQueue, cacMutexIn ),
maxPeriod ( maxSearchPeriodDefault ),
rtteMean ( minRoundTripEstimate ),
cacRef ( cac ),
cbMutex ( cbMutexIn ),
cacMutex ( cacMutexIn ),
nBytesInXmitBuf ( 0 ),
nTimers ( 0 ),
beaconAnomalyTimerIndex ( 0 ),
sequenceNumber ( 0 ),
rtteSequenceNumber ( 0 ),
lastReceivedSeqNo ( 0 ),
sock ( 0 ),
pGovTmr ( new disconnectGovernorTimer ( *this, timerQueue ) ),
// The udpiiu and the search timer share the same lock because
// this is much more efficent with recursive locks. Also, access
// to the udp's netiiu base list is protected.
pSearchTmr ( new searchTimer ( *this, timerQueue, this->mutex ) ),
pRepeaterSubscribeTmr (
new repeaterSubscribeTimer (
*this, timerQueue, cbMutexIn, ctxNotifyIn ) ),
repeaterPort ( 0 ),
serverPort ( 0 ),
localPort ( 0 ),
shutdownCmd ( false ),
rtteActive ( false ),
lastReceivedSeqNoIsValid ( false )
{
static const unsigned short PORT_ANY = 0u;
osiSockAddr addr;
int boolValue = true;
int status;
if ( envGetConfigParamPtr ( & EPICS_CA_MAX_SEARCH_PERIOD ) ) {
long longStatus = envGetDoubleConfigParam (
& EPICS_CA_MAX_SEARCH_PERIOD, & this->maxPeriod );
if ( ! longStatus ) {
if ( this->maxPeriod < maxSearchPeriodLowerLimit ) {
epicsPrintf ( "\"%s\" out of range (low)\n",
EPICS_CA_MAX_SEARCH_PERIOD.name );
this->maxPeriod = maxSearchPeriodLowerLimit;
epicsPrintf ( "Setting \"%s\" = %f seconds\n",
EPICS_CA_MAX_SEARCH_PERIOD.name, this->maxPeriod );
}
}
else {
epicsPrintf ( "EPICS \"%s\" wasnt a real number\n",
EPICS_CA_MAX_SEARCH_PERIOD.name );
epicsPrintf ( "Setting \"%s\" = %f seconds\n",
EPICS_CA_MAX_SEARCH_PERIOD.name, this->maxPeriod );
}
}
double powerOfTwo = log ( this->maxPeriod / minRoundTripEstimate ) / log ( 2.0 );
this->nTimers = static_cast < unsigned > ( powerOfTwo + 1.0 );
if ( this->nTimers > channelNode::getMaxSearchTimerCount () ) {
this->nTimers = channelNode::getMaxSearchTimerCount ();
epicsPrintf ( "\"%s\" out of range (high)\n",
EPICS_CA_MAX_SEARCH_PERIOD.name );
epicsPrintf ( "Setting \"%s\" = %f seconds\n",
EPICS_CA_MAX_SEARCH_PERIOD.name,
(1<<(this->nTimers-1)) * minRoundTripEstimate );
}
powerOfTwo = log ( beaconAnomalySearchPeriod / minRoundTripEstimate ) / log ( 2.0 );
this->beaconAnomalyTimerIndex = static_cast < unsigned > ( powerOfTwo + 1.0 );
if ( this->beaconAnomalyTimerIndex >= this->nTimers ) {
this->beaconAnomalyTimerIndex = this->nTimers - 1;
}
this->ppSearchTmr.reset ( new epics_auto_ptr < class searchTimer > [ this->nTimers ] );
for ( unsigned i = 0; i < this->nTimers; i++ ) {
this->ppSearchTmr[i].reset (
new searchTimer ( *this, timerQueue, i, cacMutexIn,
i > this->beaconAnomalyTimerIndex ) );
}
this->repeaterPort =
envGetInetPortConfigParam ( &EPICS_CA_REPEATER_PORT,
@@ -124,7 +157,8 @@ udpiiu::udpiiu (
throwWithLocation ( noSocket () );
}
status = setsockopt ( this->sock, SOL_SOCKET, SO_BROADCAST,
int boolValue = true;
int status = setsockopt ( this->sock, SOL_SOCKET, SO_BROADCAST,
(char *) &boolValue, sizeof ( boolValue ) );
if ( status < 0 ) {
char sockErrBuf[64];
@@ -156,6 +190,8 @@ udpiiu::udpiiu (
// force a bind to an unconstrained address so we can obtain
// the local port number below
static const unsigned short PORT_ANY = 0u;
osiSockAddr addr;
memset ( (char *)&addr, 0 , sizeof (addr) );
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = epicsHTON32 (INADDR_ANY);
@@ -202,6 +238,12 @@ udpiiu::udpiiu (
this->pushVersionMsg ();
// start timers and receive thread
for ( unsigned i =0; i < this->nTimers; i++ ) {
this->ppSearchTmr[i]->start ();
}
this->govTmr.start ();
this->repeaterSubscribeTmr.start ();
this->recvThread.start ();
}
@@ -210,23 +252,10 @@ udpiiu::udpiiu (
*/
udpiiu::~udpiiu ()
{
this->shutdown ();
// no need to own CAC lock here because the CA context
// is being decomissioned
tsDLIter < nciu > chan = this->disconnGovernor.firstIter ();
while ( chan.valid () ) {
tsDLIter < nciu > next = chan;
next++;
chan->serviceShutdownNotify ();
chan = next;
}
chan = this->serverAddrRes.firstIter ();
while ( chan.valid () ) {
tsDLIter < nciu > next = chan;
next++;
chan->serviceShutdownNotify ();
chan = next;
{
epicsGuard < epicsMutex > cbGuard ( this->cbMutex );
epicsGuard < epicsMutex > guard ( this->cacMutex );
this->shutdown ( cbGuard, guard );
}
// avoid use of ellFree because problems on windows occur if the
@@ -242,29 +271,40 @@ udpiiu::~udpiiu ()
epicsSocketDestroy ( this->sock );
}
void udpiiu::shutdown ()
void udpiiu::shutdown (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard )
{
// stop all of the timers
this->pGovTmr->shutdown ();
this->pSearchTmr->shutdown ();
this->pRepeaterSubscribeTmr->shutdown ();
this->repeaterSubscribeTmr.shutdown ( cbGuard, guard );
this->govTmr.shutdown ( cbGuard, guard );
for ( unsigned i =0; i < this->nTimers; i++ ) {
this->ppSearchTmr[i]->shutdown ( cbGuard, guard );
}
if ( ! this->recvThread.exitWait ( 0.0 ) ) {
unsigned tries = 0u;
{
epicsGuardRelease < epicsMutex > unguard ( guard );
{
epicsGuardRelease < epicsMutex > unguard ( cbGuard );
this->shutdownCmd = true;
if ( ! this->recvThread.exitWait ( 0.0 ) ) {
unsigned tries = 0u;
this->wakeupMsg ();
this->shutdownCmd = true;
// wait for recv threads to exit
double shutdownDelay = 1.0;
while ( ! this->recvThread.exitWait ( shutdownDelay ) ) {
this->wakeupMsg ();
if ( shutdownDelay < 16.0 ) {
shutdownDelay += shutdownDelay;
}
if ( ++tries > 3 ) {
fprintf ( stderr, "cac: timing out waiting for UDP thread shutdown\n" );
this->wakeupMsg ();
// wait for recv threads to exit
double shutdownDelay = 1.0;
while ( ! this->recvThread.exitWait ( shutdownDelay ) ) {
this->wakeupMsg ();
if ( shutdownDelay < 16.0 ) {
shutdownDelay += shutdownDelay;
}
if ( ++tries > 3 ) {
fprintf ( stderr, "cac: timing out waiting for UDP thread shutdown\n" );
}
}
}
}
}
@@ -352,7 +392,7 @@ void udpRecvThread::run ()
*/
void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber )
{
epicsGuard < udpMutex > cbGuard ( this->mutex );
epicsGuard < epicsMutex > cbGuard ( this->cacMutex );
caRepeaterRegistrationMessage ( this->sock, this->repeaterPort, attemptNumber );
}
@@ -560,21 +600,10 @@ bool udpiiu::badUDPRespAction (
bool udpiiu::versionAction ( epicsGuard < epicsMutex > &,
const caHdr & hdr, const osiSockAddr &, const epicsTime & currentTime )
{
epicsGuard < udpMutex > guard ( this->mutex );
epicsGuard < epicsMutex > guard ( this->cacMutex );
// update the round trip time estimate
if ( hdr.m_dataType & sequenceNoIsValid ) {
if ( this->rtteActive ) {
if ( this->rtteSequenceNumber == hdr.m_cid ) {
static const double gain = 0.25;
double measured = currentTime - this->rtteTimeStamp;
double error = measured - this->rtteMean;
this->rtteMean += gain * error;
this->rtteSequenceNumber = 0;
this->rtteTimeStamp = epicsTime ();
this->rtteActive = false;
}
}
this->lastReceivedSeqNo = hdr.m_cid;
this->lastReceivedSeqNoIsValid = true;
}
@@ -634,25 +663,15 @@ bool udpiiu::searchRespAction ( // X aCC 361
serverAddr.ia.sin_addr = addr.ia.sin_addr;
}
bool success;
if ( CA_V42 ( minorVersion ) ) {
success = this->cacRef.transferChanToVirtCircuit
this->cacRef.transferChanToVirtCircuit
( cbGuard, msg.m_available, msg.m_cid, 0xffff,
0, minorVersion, serverAddr );
0, minorVersion, serverAddr, currentTime );
}
else {
success = this->cacRef.transferChanToVirtCircuit
this->cacRef.transferChanToVirtCircuit
( cbGuard, msg.m_available, msg.m_cid, msg.m_dataType,
msg.m_count, minorVersion, serverAddr );
}
if ( success ) {
// deadlock can result if this is called while holding the primary
// mutex (because the primary mutex is used in the search timer callback)
epicsGuard < udpMutex > guard ( this->mutex );
this->pSearchTmr->notifySuccessfulSearchResponse (
guard, this->lastReceivedSeqNo,
this->lastReceivedSeqNoIsValid, currentTime );
msg.m_count, minorVersion, serverAddr, currentTime );
}
return true;
@@ -703,14 +722,16 @@ bool udpiiu::beaconAction (
return true;
}
bool udpiiu::repeaterAckAction ( epicsGuard < epicsMutex > &, const caHdr &,
const osiSockAddr &, const epicsTime &)
bool udpiiu::repeaterAckAction (
epicsGuard < epicsMutex > & cbGuard, const caHdr &,
const osiSockAddr &, const epicsTime &)
{
this->pRepeaterSubscribeTmr->confirmNotify ();
this->repeaterSubscribeTmr.confirmNotify ();
return true;
}
bool udpiiu::notHereRespAction ( epicsGuard < epicsMutex > &, const caHdr &,
bool udpiiu::notHereRespAction (
epicsGuard < epicsMutex > &, const caHdr &,
const osiSockAddr &, const epicsTime & )
{
return true;
@@ -741,7 +762,7 @@ bool udpiiu::exceptionRespAction (
return true;
}
void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard,
void udpiiu::postMsg ( epicsGuard < epicsMutex > & cbGuard,
const osiSockAddr & net_addr,
char * pInBuf, arrayElementCount blockSize,
const epicsTime & currentTime )
@@ -757,7 +778,7 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard,
if ( blockSize < sizeof ( *pCurMsg ) ) {
char buf[64];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
this->printf ( guard,
this->printf ( cbGuard,
"%s: Undecipherable (too small) UDP msg from %s ignored\n",
__FILE__, buf );
return;
@@ -794,9 +815,9 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard,
if ( size > blockSize ) {
char buf[64];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
this->printf ( guard,
"%s: Undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__,
buf );
this->printf ( cbGuard,
"%s: Undecipherable (payload too small) UDP msg from %s ignored\n",
__FILE__, buf );
return;
}
@@ -810,11 +831,11 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard,
else {
pStub = &udpiiu::badUDPRespAction;
}
bool success = ( this->*pStub ) ( guard, *pCurMsg, net_addr, currentTime );
bool success = ( this->*pStub ) ( cbGuard, *pCurMsg, net_addr, currentTime );
if ( ! success ) {
char buf[256];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
this->printf ( guard, "CAC: Undecipherable UDP message from %s\n", buf );
this->printf ( cbGuard, "CAC: Undecipherable UDP message from %s\n", buf );
return;
}
@@ -825,7 +846,7 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & guard,
bool udpiiu::pushVersionMsg ()
{
epicsGuard < udpMutex > guard ( this->mutex );
epicsGuard < epicsMutex > guard ( this->cacMutex );
this->sequenceNumber++;
@@ -836,13 +857,13 @@ bool udpiiu::pushVersionMsg ()
msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION );
msg.m_cid = epicsHTON32 ( this->sequenceNumber ); // sequence number
return pushDatagramMsg ( msg, 0, 0 );
return this->pushDatagramMsg ( guard, msg, 0, 0 );
}
bool udpiiu::pushDatagramMsg ( const caHdr & msg,
const void * pExt, ca_uint16_t extsize )
bool udpiiu::pushDatagramMsg ( epicsGuard < epicsMutex > & guard,
const caHdr & msg, const void * pExt, ca_uint16_t extsize )
{
epicsGuard < udpMutex > guard ( this->mutex );
guard.assertIdenticalMutex ( this->cacMutex );
ca_uint16_t alignedExtSize = static_cast <ca_uint16_t> (CA_MESSAGE_ALIGN ( extsize ));
arrayElementCount msgsize = sizeof ( caHdr ) + alignedExtSize;
@@ -869,25 +890,12 @@ bool udpiiu::pushDatagramMsg ( const caHdr & msg,
return true;
}
void udpiiu::datagramFlush (
epicsGuard < udpMutex > &, const epicsTime & currentTime )
bool udpiiu::datagramFlush (
epicsGuard < epicsMutex > &, const epicsTime & currentTime )
{
// dont send the version header by itself
if ( this->nBytesInXmitBuf <= sizeof ( caHdr ) ) {
return;
}
if ( this->rtteActive ) {
double delay = currentTime - this->rtteTimeStamp;
if ( delay > 8 * this->rtteMean ) {
this->rtteSequenceNumber = this->sequenceNumber;
this->rtteTimeStamp = currentTime;
}
}
else {
this->rtteActive = true;
this->rtteSequenceNumber = this->sequenceNumber;
this->rtteTimeStamp = currentTime;
return false;
}
osiSockAddrNode *pNode = ( osiSockAddrNode * ) // X aCC 749
@@ -925,11 +933,11 @@ void udpiiu::datagramFlush (
break;
}
else {
char buf[64];
sockAddrToDottedIP ( &pNode->addr.sa, buf, sizeof ( buf ) );
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
char buf[64];
sockAddrToDottedIP ( &pNode->addr.sa, buf, sizeof ( buf ) );
errlogPrintf (
"CAC: error = \"%s\" sending UDP msg to %s\n",
sockErrBuf, buf);
@@ -943,11 +951,13 @@ void udpiiu::datagramFlush (
this->nBytesInXmitBuf = 0u;
this->pushVersionMsg ();
return true;
}
void udpiiu::show ( unsigned level ) const
{
epicsGuard < udpMutex > guard ( this->mutex );
epicsGuard < epicsMutex > guard ( this->cacMutex );
::printf ( "Datagram IO circuit (and disconnected channel repository)\n");
if ( level > 1u ) {
@@ -961,21 +971,12 @@ void udpiiu::show ( unsigned level ) const
::printf ("\tshut down command bool %u\n", this->shutdownCmd );
::printf ( "\trecv thread exit signal:\n" );
this->recvThread.show ( level - 2u );
::printf ( "repeater subscribee timer:\n" );
this->pRepeaterSubscribeTmr->show ( level - 2u );
::printf ( "disconnect governor subscribee timer:\n" );
this->pGovTmr->show ( level - 2u );
tsDLIterConst < nciu > pChan = this->disconnGovernor.firstIter ();
while ( pChan.valid () ) {
pChan->show ( level - 2u );
pChan++;
}
::printf ( "search message timer:\n" );
this->pSearchTmr->show ( level - 2u );
pChan = this->serverAddrRes.firstIter ();
while ( pChan.valid () ) {
pChan->show ( level - 2u );
pChan++;
this->repeaterSubscribeTmr.show ( level - 2u );
this->govTmr.show ( level - 2u );
}
if ( level > 3u ) {
for ( unsigned i =0; i < this->nTimers; i++ ) {
this->ppSearchTmr[i]->show ( level - 3u );
}
}
}
@@ -995,8 +996,6 @@ bool udpiiu::wakeupMsg ()
addr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK );
addr.ia.sin_port = epicsHTON16 ( this->localPort );
epicsGuard < udpMutex > guard ( this->mutex );
// send a wakeup msg so the UDP recv thread will exit
int status = sendto ( this->sock, reinterpret_cast < char * > ( &msg ),
sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) );
@@ -1007,97 +1006,98 @@ bool udpiiu::wakeupMsg ()
}
void udpiiu::beaconAnomalyNotify (
epicsGuard < epicsMutex > & cacGuard, const epicsTime & currentTime )
epicsGuard < epicsMutex > & cacGuard )
{
epicsGuard <udpMutex> guard ( this->mutex );
static const double portTicksPerSec = 1000u;
static const unsigned portBasedDelayMask = 0xff;
/*
* This is needed when many machines
* have channels in a disconnected state that
* dont exist anywhere on the network. This insures
* that we dont have many CA clients synchronously
* flooding the network with broadcasts (and swamping
* out requests for valid channels).
*
* I fetch the local UDP port number and use the low
* order bits as a pseudo random delay to prevent every
* one from replying at once.
*/
double delay = ( this->localPort & portBasedDelayMask );
delay /= portTicksPerSec;
this->pSearchTmr->beaconAnomalyNotify ( guard, currentTime, delay );
for ( unsigned i = this->beaconAnomalyTimerIndex+1u;
i < this->nTimers; i++ ) {
this->ppSearchTmr[i]->moveChannels ( cacGuard,
*this->ppSearchTmr[this->beaconAnomalyTimerIndex] );
}
}
bool udpiiu::searchMsg ( epicsGuard < udpMutex > & /* guard */ )
void udpiiu::uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > & guard, nciu & chan,
const epicsTime & currentTime )
{
bool success;
if ( nciu *pChan = this->serverAddrRes.get () ) {
success = pChan->searchMsg ( *this );
if ( success ) {
this->serverAddrRes.add ( *pChan );
}
else {
this->serverAddrRes.push ( *pChan );
}
channelNode::channelState chanState =
chan.channelNode::listMember;
if ( chanState == channelNode::cs_disconnGov ) {
this->govTmr.uninstallChan ( guard, chan );
}
else {
success = false;
this->ppSearchTmr[ chan.getSearchTimerIndex ( guard ) ]->
uninstallChanDueToSuccessfulSearchResponse (
guard, chan, this->lastReceivedSeqNo,
this->lastReceivedSeqNoIsValid, currentTime );
}
return success;
}
void udpiiu::installNewChannel ( const epicsTime & currentTime, nciu & chan )
void udpiiu::uninstallChan (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
bool firstChannel = false;
epicsGuard < udpMutex > guard ( this->mutex );
if ( this->serverAddrRes.count() == 0 ) {
firstChannel = true;
channelNode::channelState chanState =
chan.channelNode::listMember;
if ( chanState == channelNode::cs_disconnGov ) {
this->govTmr.uninstallChan ( guard, chan );
}
else {
this->ppSearchTmr[ chan.getSearchTimerIndex ( guard ) ]->
uninstallChan ( guard, chan );
}
// push them to the front of the list so that
// a search request is sent immediately, and
// so that the new channel's retry count is
// seen when calculating the minimum retry
// which is used to compute the search interval
this->serverAddrRes.push ( chan );
chan.channelNode::listMember =
channelNode::cs_serverAddrResPend;
this->pSearchTmr->channelCreatedNotify (
guard, currentTime, firstChannel );
}
void udpiiu::installDisconnectedChannel ( nciu & chan )
bool udpiiu::searchMsg (
epicsGuard < epicsMutex > & guard, ca_uint32_t id,
const char * pName, unsigned nameLength )
{
epicsGuard < udpMutex > guard ( this->mutex );
this->disconnGovernor.add ( chan );
chan.channelNode::listMember =
channelNode::cs_disconnGov;
caHdr msg;
msg.m_cmmd = epicsHTON16 ( CA_PROTO_SEARCH );
msg.m_available = epicsHTON32 ( id );
msg.m_dataType = epicsHTON16 ( DONTREPLY );
msg.m_count = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION );
msg.m_cid = epicsHTON32 ( id );
return this->pushDatagramMsg (
guard, msg, pName, nameLength );
}
void udpiiu::govExpireNotify ( const epicsTime & currentTime )
void udpiiu::installNewChannel (
epicsGuard < epicsMutex > & guard, nciu & chan, netiiu * & piiu )
{
epicsGuard < udpMutex > guard ( this->mutex );
if ( this->disconnGovernor.count () ) {
bool firstChannel = this->serverAddrRes.count() == 0;
// push them to the front of the list so that
// a search request is sent immediately, and
// so that the new channel's retry count is
// seen when calculating the minimum retry
// which is used to compute the search interval
while ( nciu * pChan = this->disconnGovernor.get () ) {
this->serverAddrRes.push ( *pChan );
pChan->channelNode::listMember =
channelNode::cs_serverAddrResPend;
}
this->pSearchTmr->channelDisconnectedNotify (
guard, currentTime, firstChannel );
piiu = this;
this->ppSearchTmr[0]->installChannel ( guard, chan );
}
void udpiiu::installDisconnectedChannel (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
chan.setServerAddressUnknown ( *this, guard );
this->govTmr.installChan ( guard, chan );
}
void udpiiu::noSearchRespNotify (
epicsGuard < epicsMutex > & guard, nciu & chan, unsigned index )
{
const unsigned nTimersMinusOne = this->nTimers - 1;
if ( index < nTimersMinusOne ) {
index++;
}
else {
index = nTimersMinusOne;
}
this->ppSearchTmr[index]->installChannel ( guard, chan );
}
void udpiiu::boostChannel (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
this->ppSearchTmr[this->beaconAnomalyTimerIndex]->
installChannel ( guard, chan );
}
void udpiiu::govExpireNotify (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
this->ppSearchTmr[0]->installChannel ( guard, chan );
}
int udpiiu::printf ( epicsGuard < epicsMutex > & cbGuard,
@@ -1115,26 +1115,15 @@ int udpiiu::printf ( epicsGuard < epicsMutex > & cbGuard,
return status;
}
void udpiiu::uninstallChan (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & cacGuard,
nciu & chan )
void udpiiu::updateRTTE ( double measured )
{
cbGuard.assertIdenticalMutex ( this->cbMutex );
cacGuard.assertIdenticalMutex ( this->cacMutex );
double error = measured - this->rtteMean;
this->rtteMean += 0.25 * error;
}
epicsGuard < udpMutex > guard ( this->mutex );
if ( chan.channelNode::listMember == channelNode::cs_disconnGov ) {
this->disconnGovernor.remove ( chan );
}
else if ( chan.channelNode::listMember == channelNode::cs_serverAddrResPend ) {
this->serverAddrRes.remove ( chan );
}
else {
this->cacRef.printf ( cbGuard,
"cac: attempt to uninstall channel from udp iiu, but it inst installed there?" );
}
chan.channelNode::listMember = channelNode::cs_none;
double udpiiu::getRTTE () const
{
return max ( this->rtteMean, minRoundTripEstimate );
}
void udpiiu::hostName (
@@ -1167,7 +1156,6 @@ void udpiiu::writeRequest (
nciu & chan, unsigned type,
arrayElementCount nElem, const void * pValue )
{
guard.assertIdenticalMutex ( this->cacMutex );
netiiu::writeRequest ( guard, chan, type, nElem, pValue );
}
@@ -1176,7 +1164,6 @@ void udpiiu::writeNotifyRequest (
netWriteNotifyIO & io, unsigned type,
arrayElementCount nElem, const void *pValue )
{
guard.assertIdenticalMutex ( this->cacMutex );
netiiu::writeNotifyRequest ( guard, chan, io, type, nElem, pValue );
}
@@ -1184,7 +1171,6 @@ void udpiiu::readNotifyRequest (
epicsGuard < epicsMutex > & guard, nciu & chan,
netReadNotifyIO & io, unsigned type, arrayElementCount nElem )
{
guard.assertIdenticalMutex ( this->cacMutex );
netiiu::readNotifyRequest ( guard, chan, io, type, nElem );
}
@@ -1192,7 +1178,6 @@ void udpiiu::clearChannelRequest (
epicsGuard < epicsMutex > & guard,
ca_uint32_t sid, ca_uint32_t cid )
{
guard.assertIdenticalMutex ( this->cacMutex );
netiiu::clearChannelRequest ( guard, sid, cid );
}
@@ -1200,21 +1185,21 @@ void udpiiu::subscriptionRequest (
epicsGuard < epicsMutex > & guard, nciu & chan,
netSubscription & subscr )
{
guard.assertIdenticalMutex ( this->cacMutex );
netiiu::subscriptionRequest ( guard, chan, subscr );
}
void udpiiu::subscriptionUpdateRequest (
epicsGuard < epicsMutex > &, nciu &,
netSubscription & )
epicsGuard < epicsMutex > & guard, nciu & chan,
netSubscription & subscr )
{
netiiu::subscriptionUpdateRequest (
guard, chan, subscr );
}
void udpiiu::subscriptionCancelRequest (
epicsGuard < epicsMutex > & guard,
nciu & chan, netSubscription & subscr )
{
guard.assertIdenticalMutex ( this->cacMutex );
netiiu::subscriptionCancelRequest ( guard, chan, subscr );
}
@@ -1225,32 +1210,33 @@ void udpiiu::flushRequest (
}
void udpiiu::eliminateExcessiveSendBacklog (
epicsGuard < epicsMutex > *,
epicsGuard < epicsMutex > & )
epicsGuard < epicsMutex > * pCBGuard,
epicsGuard < epicsMutex > & guard )
{
netiiu::eliminateExcessiveSendBacklog ( pCBGuard, guard );
}
void udpiiu::requestRecvProcessPostponedFlush (
epicsGuard < epicsMutex > & guard )
{
guard.assertIdenticalMutex ( this->cacMutex );
netiiu::requestRecvProcessPostponedFlush ( guard );
}
osiSockAddr udpiiu::getNetworkAddress (
epicsGuard < epicsMutex > & guard ) const
{
guard.assertIdenticalMutex ( this->cacMutex );
return netiiu::getNetworkAddress ( guard );
}
double udpiiu::receiveWatchdogDelay (
epicsGuard < epicsMutex > & guard ) const
{
guard.assertIdenticalMutex ( this->cacMutex );
return - DBL_MAX;
return netiiu::receiveWatchdogDelay ( guard );
}
ca_uint32_t udpiiu::datagramSeqNumber (
epicsGuard < epicsMutex > & ) const
{
return this->sequenceNumber;
}

View File

@@ -37,20 +37,26 @@
#include "epicsMemory.h"
#include "epicsTime.h"
#include "tsMinMax.h"
#include "tsDLList.h"
#ifdef udpiiuh_accessh_epicsExportSharedSymbols
# define epicsExportSharedSymbols
# include "shareLib.h"
#endif
#include "netiiu.h"
#include "searchTimer.h"
#include "disconnectGovernorTimer.h"
#include "repeaterSubscribeTimer.h"
extern "C" void cacRecvThreadUDP ( void *pParam );
epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort );
epicsShareFunc void epicsShareAPI caRepeaterRegistrationMessage ( SOCKET sock, unsigned repeaterPort, unsigned attemptNumber );
extern "C" epicsShareFunc void caRepeaterThread ( void *pDummy );
epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled (
unsigned repeaterPort );
epicsShareFunc void epicsShareAPI caRepeaterRegistrationMessage (
SOCKET sock, unsigned repeaterPort, unsigned attemptNumber );
extern "C" epicsShareFunc void caRepeaterThread (
void * pDummy );
epicsShareFunc void ca_repeater ( void );
class cac;
@@ -74,16 +80,16 @@ private:
void run();
};
class udpMutex {
public:
void lock ();
void unlock ();
void show ( unsigned level ) const;
private:
epicsMutex mutex;
};
static const double minRoundTripEstimate = 32e-3; // seconds
static const double maxSearchPeriodDefault = 5.0 * 60.0; // seconds
static const double maxSearchPeriodLowerLimit = 60.0; // seconds
static const double beaconAnomalySearchPeriod = 5.0; // seconds
class udpiiu : public netiiu {
class udpiiu :
private netiiu,
private searchTimerNotify,
private disconnectGovernorNotify,
private repeaterTimerNotify {
public:
udpiiu (
class epicsTimerQueueActive &,
@@ -92,27 +98,15 @@ public:
cacContextNotify &,
class cac & );
virtual ~udpiiu ();
void installNewChannel ( const epicsTime & currentTime, nciu & );
void installDisconnectedChannel ( nciu & );
void repeaterRegistrationMessage ( unsigned attemptNumber );
bool searchMsg ( epicsGuard < udpMutex > & );
void datagramFlush ( epicsGuard < udpMutex > &, const epicsTime & currentTime );
ca_uint32_t datagramSeqNumber ( epicsGuard < udpMutex > & ) const;
void show ( unsigned level ) const;
bool wakeupMsg ();
void installNewChannel (
epicsGuard < epicsMutex > &, nciu &, netiiu * & );
void installDisconnectedChannel (
epicsGuard < epicsMutex > &, nciu & );
void beaconAnomalyNotify (
epicsGuard < epicsMutex > & guard, const epicsTime & currentTime );
void govExpireNotify ( const epicsTime & currentTime );
unsigned unresolvedChannelCount ( epicsGuard < udpMutex > & ) const;
void uninstallChan (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard,
nciu & );
bool pushDatagramMsg ( const caHdr & hdr,
const void * pExt, ca_uint16_t extsize);
void shutdown ();
double roundTripDelayEstimate ( epicsGuard < udpMutex > & ) const;
int printf ( epicsGuard < epicsMutex > & callbackControl, const char *pformat, ... );
epicsGuard < epicsMutex > & guard );
void shutdown ( epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard );
void show ( unsigned level ) const;
// exceptions
class noSocket {};
@@ -120,37 +114,39 @@ public:
private:
char xmitBuf [MAX_UDP_SEND];
char recvBuf [MAX_UDP_RECV];
mutable udpMutex mutex;
udpRecvThread recvThread;
// nciu state field tells us which list
tsDLList < nciu > disconnGovernor;
tsDLList < nciu > serverAddrRes;
repeaterSubscribeTimer repeaterSubscribeTmr;
disconnectGovernorTimer govTmr;
ELLLIST dest;
epicsTime rtteTimeStamp;
double maxPeriod;
double rtteMean;
cac & cacRef;
mutable epicsMutex & cbMutex;
mutable epicsMutex & cacMutex;
epics_auto_ptr < epics_auto_ptr < class searchTimer >, eapt_array > ppSearchTmr;
unsigned nBytesInXmitBuf;
unsigned nTimers;
unsigned beaconAnomalyTimerIndex;
ca_uint32_t sequenceNumber;
ca_uint32_t rtteSequenceNumber;
ca_uint32_t lastReceivedSeqNo;
SOCKET sock;
epics_auto_ptr < class disconnectGovernorTimer > pGovTmr;
epics_auto_ptr < class searchTimer > pSearchTmr;
epics_auto_ptr < class repeaterSubscribeTimer > pRepeaterSubscribeTmr;
ca_uint16_t repeaterPort;
ca_uint16_t serverPort;
ca_uint16_t localPort;
bool shutdownCmd;
bool rtteActive;
bool lastReceivedSeqNoIsValid;
void postMsg ( epicsGuard < epicsMutex > &,
bool wakeupMsg ();
void postMsg ( epicsGuard < epicsMutex > & cbGuard,
const osiSockAddr & net_addr,
char *pInBuf, arrayElementCount blockSize,
const epicsTime &currenTime );
bool pushDatagramMsg ( epicsGuard < epicsMutex > &,
const caHdr & hdr, const void * pExt,
ca_uint16_t extsize);
typedef bool ( udpiiu::*pProtoStubUDP ) (
epicsGuard < epicsMutex > &, const caHdr &,
const osiSockAddr &, const epicsTime & );
@@ -181,87 +177,90 @@ private:
epicsGuard < epicsMutex > &, const caHdr &msg,
const osiSockAddr &net_addr, const epicsTime & );
friend class udpRecvThread;
// netiiu stubs
void hostName (
epicsGuard < epicsMutex > &,
char *pBuf, unsigned bufLength ) const;
epicsGuard < epicsMutex > &, char * pBuf,
unsigned bufLength ) const;
const char * pHostName (
epicsGuard < epicsMutex > & ) const;
bool ca_v41_ok (
epicsGuard < epicsMutex > & ) const;
bool ca_v42_ok (
epicsGuard < epicsMutex > & ) const;
bool ca_v41_ok (
epicsGuard < epicsMutex > & ) const;
void writeRequest (
epicsGuard < epicsMutex > &, nciu &, unsigned type,
arrayElementCount nElem, const void *pValue );
void eliminateExcessiveSendBacklog (
epicsGuard < epicsMutex > * pCallbackGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard );
void writeRequest (
epicsGuard < epicsMutex > &, nciu &,
unsigned type, arrayElementCount nElem,
const void *pValue );
void writeNotifyRequest (
epicsGuard < epicsMutex > &, nciu &, netWriteNotifyIO &,
unsigned type, arrayElementCount nElem, const void *pValue );
epicsGuard < epicsMutex > &,
nciu &, netWriteNotifyIO &,
unsigned type, arrayElementCount nElem,
const void *pValue );
void readNotifyRequest (
epicsGuard < epicsMutex > &, nciu &, netReadNotifyIO &,
unsigned type, arrayElementCount nElem );
epicsGuard < epicsMutex > &, nciu &,
netReadNotifyIO &, unsigned type,
arrayElementCount nElem );
void clearChannelRequest (
epicsGuard < epicsMutex > &,
ca_uint32_t sid, ca_uint32_t cid );
void subscriptionRequest (
epicsGuard < epicsMutex > &, nciu &,
netSubscription & );
void subscriptionRequest (
epicsGuard < epicsMutex > &,
nciu &, netSubscription & );
void subscriptionUpdateRequest (
epicsGuard < epicsMutex > &, nciu &,
netSubscription & );
bool pushVersionMsg ();
epicsGuard < epicsMutex > &,
nciu &, netSubscription & );
void subscriptionCancelRequest (
epicsGuard < epicsMutex > &,
nciu & chan, netSubscription & subscr );
void flushRequest (
epicsGuard < epicsMutex > & );
void eliminateExcessiveSendBacklog (
epicsGuard < epicsMutex > * pCallbackGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard );
void requestRecvProcessPostponedFlush (
epicsGuard < epicsMutex > & );
osiSockAddr getNetworkAddress (
osiSockAddr getNetworkAddress (
epicsGuard < epicsMutex > & ) const;
double receiveWatchdogDelay (
void uninstallChan (
epicsGuard < epicsMutex > &, nciu & );
void uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > &, nciu &,
const class epicsTime & currentTime );
double receiveWatchdogDelay (
epicsGuard < epicsMutex > & ) const;
bool searchMsg (
epicsGuard < epicsMutex > &, ca_uint32_t id,
const char * pName, unsigned nameLength );
// searchTimerNotify stubs
double getRTTE () const;
void updateRTTE ( double rtte );
bool pushVersionMsg ();
void boostChannel (
epicsGuard < epicsMutex > & guard, nciu & chan );
void noSearchRespNotify (
epicsGuard < epicsMutex > &, nciu & chan, unsigned index );
bool datagramFlush (
epicsGuard < epicsMutex > &, const epicsTime & currentTime );
ca_uint32_t datagramSeqNumber (
epicsGuard < epicsMutex > & ) const;
// disconnectGovernorNotify
void govExpireNotify (
epicsGuard < epicsMutex > &, nciu & );
// repeaterTimerNotify
void repeaterRegistrationMessage (
unsigned attemptNumber );
int printf (
epicsGuard < epicsMutex > & callbackControl,
const char * pformat, ... );
udpiiu ( const udpiiu & );
udpiiu & operator = ( const udpiiu & );
friend class udpRecvThread;
};
inline void udpMutex::lock ()
{
this->mutex.lock ();
}
inline void udpMutex::unlock ()
{
this->mutex.unlock ();
}
inline void udpMutex::show ( unsigned level ) const
{
this->mutex.show ( level );
}
inline unsigned udpiiu::unresolvedChannelCount (
epicsGuard < udpMutex > & ) const
{
return this->serverAddrRes.count ();
}
inline ca_uint32_t udpiiu::datagramSeqNumber (
epicsGuard < udpMutex > & ) const
{
return this->sequenceNumber;
}
inline double udpiiu::roundTripDelayEstimate (
epicsGuard < udpMutex > & ) const
{
return this->rtteMean;
}
#endif // udpiiuh

View File

@@ -109,7 +109,8 @@ public:
epicsGuard < epicsMutex > & guard );
void sendTimeoutNotify (
const epicsTime & currentTime,
callbackManager & );
callbackManager & cbMgr,
epicsGuard < epicsMutex > & guard );
void receiveTimeoutNotify(
callbackManager &,
epicsGuard < epicsMutex > & );
@@ -159,14 +160,16 @@ public:
const char *pformat, ... );
unsigned channelCount (
epicsGuard < epicsMutex > & );
void removeAllChannels (
bool supressApplicationNotify,
void disconnectAllChannels (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard, udpiiu & );
epicsGuard < epicsMutex > & guard, class udpiiu & );
void unlinkAllChannels (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard );
void installChannel ( epicsGuard < epicsMutex > &,
epicsGuard < epicsMutex > &, nciu & chan,
unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn );
void uninstallChan ( epicsGuard < epicsMutex > & cbGuard,
void uninstallChan (
epicsGuard < epicsMutex > & guard, nciu & chan );
void connectNotify (
epicsGuard < epicsMutex > &, nciu & chan );
@@ -289,6 +292,13 @@ private:
bool flush (
epicsGuard < epicsMutex > & ); // only to be called by the send thread
// netiiu stubs
void uninstallChanDueToSuccessfulSearchResponse (
epicsGuard < epicsMutex > &, nciu &, const class epicsTime & );
bool searchMsg (
epicsGuard < epicsMutex > &, ca_uint32_t id,
const char * pName, unsigned nameLength );
friend class tcpRecvThread;
friend class tcpSendThread;