many changes

This commit is contained in:
Jeff Hill
2001-03-07 16:32:18 +00:00
parent 7cac5852ef
commit 35a62778b8
10 changed files with 1162 additions and 1438 deletions
+157 -212
View File
@@ -69,18 +69,12 @@
#include "caProto.h"
#include "net_convert.h"
#ifndef FALSE
# define FALSE 0
#elif FALSE
# error FALSE isnt boolean false
#ifdef DEBUG
# define debugPrintf(argsInParen) printf argsInParen
#else
# define debugPrintf(argsInParen)
#endif
#ifndef TRUE
# define TRUE 1
#elif !TRUE
# error TRUE isnt boolean true
#endif
#ifndef NELEMENTS
# define NELEMENTS(array) (sizeof(array)/sizeof((array)[0]))
#endif
@@ -89,14 +83,6 @@
# define LOCAL static
#endif
#ifndef min
# define min(A,B) ((A)>(B)?(B):(A))
#endif
#ifndef max
# define max(A,B) ((A)<(B)?(B):(A))
#endif
#define MSEC_PER_SEC 1000L
#define USEC_PER_SEC 1000000L
@@ -126,7 +112,7 @@ public:
unsigned unoccupiedBytes () const;
unsigned occupiedBytes () const;
void compress ();
static unsigned maxBytes ();
static unsigned capacityBytes ();
unsigned copyInBytes ( const void *pBuf, unsigned nBytes );
unsigned copyIn ( comBuf & );
unsigned copyIn ( const epicsInt8 *pValue, unsigned nElem );
@@ -154,6 +140,7 @@ private:
unsigned char buf [ comBufSize ]; // optimal for 100 Mb Ethernet LAN MTU
unsigned clipNElem ( unsigned elemSize, unsigned nElem );
static tsFreeList < class comBuf, 0x20 > freeList;
static epicsMutex freeListMutex;
};
struct msgDescriptor {
@@ -209,6 +196,8 @@ private:
static const copyFunc_t dbrCopyVector [39];
};
static const unsigned maxBytesPendingTCP = 0x4000;
class comQueRecv {
public:
comQueRecv ();
@@ -247,9 +236,9 @@ class baseNMIU;
//
class tcpiiuPrivateListOfIO {
private:
tsDLList < class baseNMIU > eventq;
friend tcpiiu;
friend netiiu; // used to install subscriptions when not connected
tsDLList < class baseNMIU > eventq;
};
class nciu : public cacChannelIO, public tsDLNode < nciu >,
@@ -258,37 +247,13 @@ public:
nciu ( class cac &, netiiu &,
cacChannelNotify &, const char *pNameIn );
bool fullyConstructed () const;
void destroy ();
void connect ( unsigned nativeType,
unsigned long nativeCount, unsigned sid );
void connect ();
void disconnect ( netiiu &newiiu );
int createChannelRequest ();
void initiateConnect ();
int read ( unsigned type,
unsigned long count, void *pValue );
int read ( unsigned type,
unsigned long count, cacNotify &notify );
int write ( unsigned type,
unsigned long count, const void *pValue );
int write ( unsigned type,
unsigned long count, const void *pValue, cacNotify & );
int subscribe ( unsigned type, unsigned long nElem,
unsigned mask, cacNotify &notify,
cacNotifyIO *&pNotifyIO );
void hostName ( char *pBuf, unsigned bufLength ) const;
bool ca_v42_ok () const;
short nativeType () const;
unsigned long nativeElementCount () const;
channel_state state () const;
caar accessRights () const;
const char *pName () const;
unsigned nameLen () const;
unsigned searchAttempts () const;
double beaconPeriod () const;
bool connected () const;
bool searchMsg ( unsigned short retrySeqNumber,
unsigned &retryNoForThisChannel );
int createChannelRequest ();
bool isAttachedToVirtaulCircuit ( const osiSockAddr & );
bool identifierEquivelence ( unsigned idToMatch );
void * operator new ( size_t size );
@@ -299,20 +264,20 @@ public:
ca_uint32_t getSID () const;
ca_uint32_t getCID () const;
netiiu * getPIIU ();
cac & getCAC ();
void searchReplySetUp ( netiiu &iiu, unsigned sidIn,
unsigned typeIn, unsigned long countIn );
void show ( unsigned level ) const;
bool verifyIIU ( netiiu & );
bool verifyConnected ( netiiu & );
void decrementOutstandingIO ( unsigned seqNumber );
void incrementOutstandingIO ( unsigned seqNumber );
void connectTimeoutNotify ();
unsigned long nativeElementCount () const;
const char *pName () const;
unsigned nameLen () const;
protected:
~nciu (); // force pool allocation
private:
cac &cacCtx;
caar ar; // access rights
caar accessRightState;
unsigned count;
char *pNameStr;
netiiu *piiu;
@@ -327,71 +292,73 @@ private:
unsigned f_claimSent:1;
unsigned f_firstConnectDecrementsOutstandingIO:1;
unsigned f_connectTimeOutSeen:1;
void lock () const;
void unlock () const;
void initiateConnect ();
int read ( unsigned type,
unsigned long count, cacNotify &notify );
int write ( unsigned type,
unsigned long count, const void *pValue );
int write ( unsigned type,
unsigned long count, const void *pValue, cacNotify & );
int subscribe ( unsigned type, unsigned long nElem,
unsigned mask, cacNotify &notify,
cacNotifyIO *&pNotifyIO );
void hostName ( char *pBuf, unsigned bufLength ) const;
bool ca_v42_ok () const;
short nativeType () const;
channel_state state () const;
caar accessRights () const;
unsigned searchAttempts () const;
double beaconPeriod () const;
bool connected () const;
const char * pHostName () const; // deprecated - please do not use
void notifyStateChangeFirstConnectInCountOfOutstandingIO ();
static tsFreeList < class nciu, 1024 > freeList;
static epicsMutex freeListMutex;
};
class baseNMIU : public tsDLNode < baseNMIU >,
class baseNMIU : public cacNotifyIO, public tsDLNode < baseNMIU >,
public chronIntIdRes < baseNMIU > {
public:
baseNMIU ( nciu &chan );
baseNMIU ( cacNotify &notifyIn, nciu &chan );
virtual ~baseNMIU () = 0;
virtual void completionNotify () = 0;
virtual void completionNotify ( unsigned type,
unsigned long count, const void *pData ) = 0;
virtual void exceptionNotify ( int status,
const char *pContext ) = 0;
virtual void exceptionNotify ( int status,
const char *pContext, unsigned type,
unsigned long count ) = 0;
virtual class netSubscription * isSubscription ();
virtual void show ( unsigned level ) const;
virtual void ioCancelRequest ();
void show ( unsigned level ) const;
ca_uint32_t getID () const;
nciu & channel () const;
void destroy ();
cacChannelIO & channelIO () const;
void cancel ();
protected:
nciu &chan;
};
class netSubscription : public cacNotifyIO, public baseNMIU {
class netSubscription : public baseNMIU {
public:
netSubscription ( nciu &chan, unsigned type, unsigned long count,
unsigned mask, cacNotify &notify );
void destroy ();
void show ( unsigned level ) const;
unsigned long getCount () const;
unsigned getType () const;
unsigned getMask () const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
protected:
~netSubscription ();
private:
unsigned long count;
unsigned type;
unsigned mask;
void completionNotify ();
void completionNotify ( unsigned type,
unsigned long count, const void *pData );
void exceptionNotify ( int status,
const char *pContext );
void exceptionNotify ( int status,
const char *pContext, unsigned type, unsigned long count );
cacChannelIO & channelIO () const;
const unsigned long count;
const unsigned type;
const unsigned mask;
class netSubscription * isSubscription ();
void ioCancelRequest ();
static tsFreeList < class netSubscription, 1024 > freeList;
static epicsMutex freeListMutex;
};
#if 0
class netReadCopyIO : public baseNMIU {
public:
netReadCopyIO ( nciu &chan, unsigned type, unsigned long count,
void *pValue, unsigned seqNumber );
void *pValue, unsigned seqNumber, cacNotify &notifyIn );
void show ( unsigned level ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
@@ -402,54 +369,35 @@ private:
unsigned long count;
void *pValue;
unsigned seqNumber;
void completionNotify ( );
void completionNotify ( unsigned type,
unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status,
const char *pContext, unsigned type, unsigned long count );
cacChannelIO & channelIO () const;
static tsFreeList < class netReadCopyIO, 1024 > freeList;
static epicsMutex freeListMutex;
};
#endif
class netReadNotifyIO : public cacNotifyIO, public baseNMIU {
class netReadNotifyIO : public baseNMIU {
public:
netReadNotifyIO ( nciu &chan, cacNotify &notify );
void destroy ();
void show ( unsigned level ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
protected:
~netReadNotifyIO ();
private:
void completionNotify ();
void completionNotify ( unsigned type,
unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status,
const char *pContext, unsigned type, unsigned long count );
cacChannelIO & channelIO () const;
static tsFreeList < class netReadNotifyIO, 1024 > freeList;
static epicsMutex freeListMutex;
};
class netWriteNotifyIO : public cacNotifyIO, public baseNMIU {
class netWriteNotifyIO : public baseNMIU {
public:
netWriteNotifyIO ( nciu &chan, cacNotify &notify );
void destroy ();
void show ( unsigned level ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
protected:
~netWriteNotifyIO ();
private:
void completionNotify ();
void completionNotify ( unsigned type,
unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status,
const char *pContext, unsigned type, unsigned long count );
cacChannelIO & channelIO () const;
static tsFreeList < class netWriteNotifyIO, 1024 > freeList;
static epicsMutex freeListMutex;
};
/*
@@ -466,10 +414,10 @@ private:
#define CA_RECAST_PERIOD 5.0 /* quiescent search period (sec) */
#if defined (CLOCKS_PER_SEC)
#define CAC_SIGNIFICANT_SELECT_DELAY ( 1.0 / CLOCKS_PER_SEC )
# define CAC_SIGNIFICANT_SELECT_DELAY ( 1.0 / CLOCKS_PER_SEC )
#else
/* on sunos4 GNU does not provide CLOCKS_PER_SEC */
#define CAC_SIGNIFICANT_SELECT_DELAY (1.0 / 1000000u)
# define CAC_SIGNIFICANT_SELECT_DELAY (1.0 / 1000000u)
#endif
/*
@@ -498,8 +446,6 @@ static const unsigned contiguousMsgCountWhichTriggersFlowControl = 10u;
enum iiu_conn_state {iiu_connecting, iiu_connected, iiu_disconnected};
extern epicsThreadPrivateId cacRecursionLock;
class netiiu {
public:
netiiu ( class cac * );
@@ -521,7 +467,6 @@ public:
virtual bool pushDatagramMsg ( const caHdr &hdr, const void *pExt, ca_uint16_t extsize);
virtual int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue);
virtual int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
virtual int readCopyRequest ( nciu &, unsigned type, unsigned nElem, void *pValue );
virtual int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
virtual int createChannelRequest ( nciu & );
virtual void connectAllIO ( nciu &chan );
@@ -531,15 +476,12 @@ public:
virtual void subscriptionCancelRequest ( netSubscription &subscr, bool userThread );
virtual double beaconPeriod () const;
virtual bool destroyAllIO ( nciu &chan );
protected:
cac * pCAC () const;
mutable epicsMutex mutex;
private:
tsDLList < nciu > channelList;
class cac *pClientCtx;
virtual void lastChannelDetachNotify ();
virtual int subscriptionRequest ( netSubscription &subscr, bool userThread );
};
@@ -553,22 +495,14 @@ extern limboiiu limboIIU;
class udpiiu;
class searchTimer : private osiTimer, private epicsMutex {
class searchTimer : private osiTimer {
public:
searchTimer ( udpiiu &iiu, osiTimerQueue &queue );
void notifySearchResponse ( unsigned short retrySeqNo );
void resetPeriod ( double delayToNextTry );
void show ( unsigned level ) const;
private:
void expire ();
void destroy ();
bool again () const;
double delay () const;
const char *name () const;
void setRetryInterval (unsigned retryNo);
epicsMutex mutex;
udpiiu &iiu;
unsigned framesPerTry; /* # of UDP frames per search try */
unsigned framesPerTryCongestThresh; /* one half N tries w congest */
@@ -579,6 +513,12 @@ private:
unsigned short retrySeqNo; /* search retry seq number */
unsigned short retrySeqAtPassBegin; /* search retry seq number at beg of pass through list */
double period; /* period between tries */
void expire ();
void destroy ();
bool again () const;
double delay () const;
const char *name () const;
void setRetryInterval (unsigned retryNo);
};
class repeaterSubscribeTimer : private osiTimer {
@@ -611,7 +551,7 @@ public:
virtual ~udpiiu ();
void shutdown ();
void recvMsg ();
int postMsg ( const osiSockAddr &net_addr,
void postMsg ( const osiSockAddr &net_addr,
char *pInBuf, unsigned long blockSize );
void repeaterRegistrationMessage ( unsigned attemptNumber );
void flush ();
@@ -640,19 +580,19 @@ private:
friend void cacRecvThreadUDP ( void *pParam );
typedef void (udpiiu::*pProtoStubUDP) ( const caHdr &, const osiSockAddr & );
typedef bool (udpiiu::*pProtoStubUDP) ( const caHdr &, const osiSockAddr & );
// UDP protocol dispatch table
static const pProtoStubUDP udpJumpTableCAC[];
// UDP protocol stubs
void noopAction ( const caHdr &, const osiSockAddr & );
void badUDPRespAction ( const caHdr &msg, const osiSockAddr &netAddr );
void searchRespAction ( const caHdr &msg, const osiSockAddr &net_addr );
void exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr );
void beaconAction ( const caHdr &msg, const osiSockAddr &net_addr );
void notHereRespAction ( const caHdr &msg, const osiSockAddr &net_addr );
void repeaterAckAction ( const caHdr &msg, const osiSockAddr &net_addr );
bool noopAction ( const caHdr &, const osiSockAddr & );
bool badUDPRespAction ( const caHdr &msg, const osiSockAddr &netAddr );
bool searchRespAction ( const caHdr &msg, const osiSockAddr &net_addr );
bool exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr );
bool beaconAction ( const caHdr &msg, const osiSockAddr &net_addr );
bool notHereRespAction ( const caHdr &msg, const osiSockAddr &net_addr );
bool repeaterAckAction ( const caHdr &msg, const osiSockAddr &net_addr );
};
class tcpRecvWatchdog : private osiTimer {
@@ -712,6 +652,7 @@ private:
cac &cacRef;
static tsFreeList < class msgForMultiplyDefinedPV, 16 > freeList;
static epicsMutex freeListMutex;
};
class hostNameCache : public ipAddrToAsciiAsynchronous {
@@ -728,6 +669,7 @@ private:
bool ioComplete;
char hostNameBuf [128];
static tsFreeList < class hostNameCache, 16 > freeList;
static epicsMutex freeListMutex;
};
extern "C" void cacSendThreadTCP ( void *pParam );
@@ -759,12 +701,6 @@ public:
bool ca_v41_ok () const;
bool ca_v42_ok () const;
bool ca_v44_ok () const;
int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue );
int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
int readCopyRequest ( nciu &, unsigned type, unsigned nElem, void *pValue );
int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
int createChannelRequest ( nciu & );
int clearChannelRequest ( nciu & );
void hostName ( char *pBuf, unsigned bufLength ) const;
const char * pHostName () const; // deprecated - please do not use
@@ -784,7 +720,7 @@ private:
caHdr curMsg;
unsigned long curDataMax;
class bhe *pBHE;
void *pCurData;
char *pCurData;
unsigned minorProtocolVersion;
iiu_conn_state state;
epicsEventId sendThreadFlushSignal;
@@ -799,6 +735,7 @@ private:
bool echoRequestPending;
bool msgHeaderAvailable;
bool sockCloseCompleted;
bool fdRegCallbackNeeded;
unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf );
unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf );
@@ -809,7 +746,6 @@ private:
friend void cacRecvThreadTCP ( void *pParam );
void lastChannelDetachNotify ();
int requestStubStatus ();
// send protocol stubs
int echoRequest ();
@@ -818,37 +754,39 @@ private:
int enableFlowControlRequest ();
int hostNameSetRequest ();
int userNameSetRequest ();
int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue );
int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
int createChannelRequest ( nciu & );
int clearChannelRequest ( nciu & );
// recv protocol stubs
void noopAction ();
void echoRespAction ();
void writeNotifyRespAction ();
void readNotifyRespAction ();
void eventRespAction ();
void readRespAction ();
void clearChannelRespAction ();
void exceptionRespAction ();
void accessRightsRespAction ();
void claimCIURespAction ();
void verifyAndDisconnectChan ();
void badTCPRespAction ();
bool noopAction ();
bool echoRespAction ();
bool writeNotifyRespAction ();
bool readNotifyRespAction ();
bool eventRespAction ();
bool readRespAction ();
bool clearChannelRespAction ();
bool exceptionRespAction ();
bool accessRightsRespAction ();
bool claimCIURespAction ();
bool verifyAndDisconnectChan ();
bool badTCPRespAction ();
// IO management routines
//void ioInstall ( baseNMIU &io );
//void ioUninstall ( unsigned id );
//void ioDestroy ( unsigned id );
void ioCompletionNotify ( unsigned id, unsigned type,
bool ioCompletionNotify ( unsigned id, unsigned type,
unsigned long count, const void *pData );
void ioExceptionNotify ( unsigned id,
bool ioExceptionNotify ( unsigned id,
int status, const char *pContext );
void ioExceptionNotify ( unsigned id, int status,
bool ioExceptionNotify ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count );
void ioCompletionNotifyAndDestroy ( unsigned id );
void ioCompletionNotifyAndDestroy ( unsigned id,
bool ioCompletionNotifyAndDestroy ( unsigned id );
bool ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, unsigned long count, const void *pData );
void ioExceptionNotifyAndDestroy ( unsigned id,
bool ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext );
void ioExceptionNotifyAndDestroy ( unsigned id,
bool ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext, unsigned type, unsigned long count );
void connectAllIO ( nciu &chan );
void disconnectAllIO ( nciu &chan );
@@ -858,7 +796,7 @@ private:
int subscriptionRequest ( netSubscription &subscr, bool userThread );
void subscriptionCancelRequest ( netSubscription &subscr, bool userThread );
typedef void ( tcpiiu::*pProtoStubTCP ) ();
typedef bool ( tcpiiu::*pProtoStubTCP ) ();
static const pProtoStubTCP tcpJumpTableCAC [];
};
@@ -870,7 +808,7 @@ public:
static unsigned maxIndexBitWidth ();
static unsigned minIndexBitWidth ();
private:
struct sockaddr_in addr;
const struct sockaddr_in addr;
};
class bhe : public tsSLNode < bhe >, public inetAddrID {
@@ -891,6 +829,7 @@ private:
epicsTime timeStamp;
double averagePeriod;
static tsFreeList < class bhe, 1024 > freeList;
static epicsMutex freeListMutex;
};
class caErrorCode {
@@ -969,6 +908,7 @@ private:
void exceptionNotify ( cacChannelIO &,
int status, const char *pContext, unsigned type, unsigned long count );
static tsFreeList < class syncGroupNotify, 1024 > freeList;
static epicsMutex freeListMutex;
};
/*
@@ -985,37 +925,33 @@ public:
void show ( unsigned level ) const;
int get ( chid pChan, unsigned type, unsigned long count, void *pValue );
int put ( chid pChan, unsigned type, unsigned long count, const void *pValue );
void * operator new (size_t size);
void operator delete (void *pCadaver, size_t size);
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
private:
cac &client;
unsigned magic;
epicsMutex mutable mutex;
epicsEvent sem;
tsDLList < syncGroupNotify > ioList;
unsigned long opPendCount;
unsigned long seqNo;
epicsEvent sem;
epicsMutex mutex;
tsDLList <syncGroupNotify> ioList;
static tsFreeList < struct CASG, 128 > freeList;
cac &client;
unsigned magic;
~CASG ();
static tsFreeList < struct CASG, 128 > freeList;
static epicsMutex freeListMutex;
friend class syncGroupNotify;
};
class ioCounter {
class ioCounterNet {
public:
ioCounter ();
void decrementOutstandingIO ();
void decrementOutstandingIO ( unsigned seqNumber );
void incrementOutstandingIO ();
void incrementOutstandingIO ( unsigned seqNumber );
unsigned readSequenceOfOutstandingIO () const;
unsigned currentOutstandingIOCount () const;
void cleanUpOutstandingIO ();
void showOutstandingIO ( unsigned level ) const;
void waitForCompletionOfIO ( double delaySec );
ioCounterNet ();
void increment ();
void decrement ();
void decrement ( unsigned seqNumber );
unsigned sequenceNumber () const;
unsigned currentCount () const;
void cleanUp ();
void show ( unsigned level ) const;
void waitForCompletion ( double delaySec );
private:
unsigned pndrecvcnt;
unsigned readSeq;
@@ -1040,15 +976,11 @@ private:
// w/o taking the defaultMutex in this class first
//
//
class cac : public caClient, public nciuPrivate,
public ioCounter
class cac : public caClient, public nciuPrivate
{
public:
cac ( bool enablePreemptiveCallback = false );
virtual ~cac ();
void flush ();
int pend ( double timeout, int early );
unsigned getInitializingThreadsPriority () const;
// beacon management
void beaconNotify ( const inetAddrID &addr );
@@ -1058,9 +990,19 @@ public:
void signalRecvActivity ();
void processRecvBacklog ();
// IO management routines
// outstanding IO count management routines
void incrementOutstandingIO ();
void decrementOutstandingIO ();
void decrementOutstandingIO ( unsigned sequenceNo );
unsigned sequenceNumberOfOutstandingIO () const;
bool ioComplete () const;
// IO management
void flush ();
bool flushPermit () const;
int pendIO ( const double &timeout );
int pendEvent ( const double &timeout );
// exception routines
void exceptionNotify ( int status, const char *pContext,
const char *pFileName, unsigned lineNo );
@@ -1071,29 +1013,31 @@ public:
void genLocalExcepWFL ( long stat, const char *ctx, const char *pFile, unsigned lineNo );
// channel routines
void connectChannel ( unsigned id );
void connectChannel ( bool v44Ok, unsigned id,
bool connectChannel ( unsigned id );
bool connectChannel ( bool v44Ok, unsigned id,
unsigned nativeType, unsigned long nativeCount, unsigned sid );
void disconnectChannel ( unsigned id );
cacChannelIO * createChannelIO ( const char *name_str, cacChannelNotify &chan );
void installNetworkChannel ( nciu &, netiiu *&piiu );
void lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
bool lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
unsigned typeCode, unsigned long count, unsigned minorVersionNumber,
const osiSockAddr & );
void accessRightsNotify ( unsigned id, const caar & );
void uninstallChannel ( nciu & );
cacChannelIO * createChannelIO ( const char *name_str, cacChannelNotify &chan );
void registerService ( cacServiceIO &service );
// sync group routines
CASG * lookupCASG ( unsigned id );
void installCASG ( CASG & );
void uninstallCASG ( CASG & );
void registerService ( cacServiceIO &service );
// fd call back registration
void registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg );
void getFDRegCallback ( CAFDHANDLER *&fdRegFunc, void *&fdRegArg ) const;
// callback preemption control
void enableCallbackPreemption ();
void disableCallbackPreemption ();
bool flushPermit () const;
const char * userNamePointer () const;
// diagnostics
unsigned connectionCount () const;
@@ -1103,9 +1047,12 @@ public:
void replaceErrLogHandler ( caPrintfFunc *ca_printf_func );
void ipAddrToAsciiAsynchronousRequestInstall ( ipAddrToAsciiAsynchronous & request );
void getFDRegCallback ( CAFDHANDLER *&fdRegFunc, void *&fdRegArg ) const;
// misc
const char * userNamePointer () const;
unsigned getInitializingThreadsPriority () const;
private:
ioCounterNet ioCounter;
ipAddrToAsciiEngine ipToAEngine;
cacServiceList services;
tsDLList <tcpiiu> iiuList;
@@ -1116,7 +1063,7 @@ private:
< CASG > sgTable;
resTable
< bhe, inetAddrID > beaconTable;
epicsTime programBeginTime;
epicsTime programBeginTime;
double connTMO;
// defaultMutex can be applied if iiuListMutex is already applied
mutable epicsMutex defaultMutex;
@@ -1135,23 +1082,21 @@ private:
repeaterSubscribeTimer *pRepeaterSubscribeTmr;
unsigned initializingThreadsPriority;
bool enablePreemptiveCallback;
int pendPrivate ( double timeout, int early );
bool setupUDP ();
};
/*
* CA internal functions
*/
int ca_printf (const char *pformat, ...);
int ca_vPrintf (const char *pformat, va_list args);
epicsShareFunc void epicsShareAPI ca_repeater (void);
int ca_printf ( const char *pformat, ... );
int ca_vPrintf ( const char *pformat, va_list args );
epicsShareFunc void epicsShareAPI ca_repeater ( void );
#define genLocalExcep( CAC, STAT, PCTX ) \
(CAC).genLocalExcepWFL ( STAT, PCTX, __FILE__, __LINE__ )
int fetchClientContext (cac **ppcac);
extern "C" void caRepeaterThread (void *pDummy);
extern "C" void ca_default_exception_handler (struct exception_handler_args args);
int fetchClientContext ( cac **ppcac );
extern "C" void caRepeaterThread ( void *pDummy );
extern "C" void ca_default_exception_handler ( struct exception_handler_args args );
#endif /* this must be the last line in this file */
#endif // ifdef INCiocinfh
+97 -144
View File
@@ -22,14 +22,13 @@
#include "iocinf.h"
#include "nciu_IL.h"
#include "netReadCopyIO_IL.h"
#include "netReadNotifyIO_IL.h"
#include "netWriteNotifyIO_IL.h"
#include "netSubscription_IL.h"
#include "cac_IL.h"
#include "ioCounter_IL.h"
tsFreeList < class nciu, 1024 > nciu::freeList;
epicsMutex nciu::freeListMutex;
static const caar defaultAccessRights = { false, false };
@@ -37,7 +36,7 @@ nciu::nciu ( cac &cacIn, netiiu &iiuIn, cacChannelNotify &chanIn,
const char *pNameIn ) :
cacChannelIO ( chanIn ),
cacCtx ( cacIn ),
ar ( defaultAccessRights ),
accessRightState ( defaultAccessRights ),
count ( 0 ),
piiu ( &iiuIn ),
sid ( UINT_MAX ),
@@ -65,8 +64,12 @@ nciu::nciu ( cac &cacIn, netiiu &iiuIn, cacChannelNotify &chanIn,
strcpy ( this->pNameStr, pNameIn );
}
void nciu::destroy ()
nciu::~nciu ()
{
if ( ! this->fullyConstructed () ) {
return;
}
// care is taken so that a lock is not applied during this phase
unsigned i = 0u;
while ( ! this->piiu->destroyAllIO ( *this ) ) {
@@ -78,14 +81,6 @@ void nciu::destroy ()
}
this->piiu->clearChannelRequest ( *this );
this->cacCtx.uninstallChannel ( *this );
delete this;
}
nciu::~nciu ()
{
if ( ! this->fullyConstructed () ) {
return;
}
if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) {
if ( this->f_firstConnectDecrementsOutstandingIO ) {
@@ -107,7 +102,7 @@ int nciu::read ( unsigned type, unsigned long countIn, cacNotify &notify )
if ( INVALID_DB_REQ (type) ) {
return ECA_BADTYPE;
}
if ( ! this->ar.read_access ) {
if ( ! this->accessRightState.read_access ) {
return ECA_NORDACCESS;
}
if ( countIn > UINT_MAX ) {
@@ -120,31 +115,6 @@ int nciu::read ( unsigned type, unsigned long countIn, cacNotify &notify )
return this->piiu->readNotifyRequest ( *this, notify, type, countIn );
}
int nciu::read ( unsigned type, unsigned long countIn, void *pValue )
{
/*
* fail out if channel isnt connected or arguments are
* otherwise invalid
*/
if ( ! this->f_connected ) {
return ECA_DISCONNCHID;
}
if ( INVALID_DB_REQ ( type ) ) {
return ECA_BADTYPE;
}
if ( ! this->ar.read_access ) {
return ECA_NORDACCESS;
}
if ( countIn > this->count || countIn > 0xffff ) {
return ECA_BADCOUNT;
}
if ( countIn == 0 ) {
countIn = this->count;
}
return this->piiu->readCopyRequest ( *this, type, countIn, pValue );
}
/*
* check_a_dbr_string()
*/
@@ -170,7 +140,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue )
return ECA_DISCONNCHID;
}
if ( ! this->ar.write_access ) {
if ( ! this->accessRightState.write_access ) {
return ECA_NOWTACCESS;
}
@@ -195,7 +165,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue, cacN
return ECA_DISCONNCHID;
}
if ( ! this->ar.write_access ) {
if ( ! this->accessRightState.write_access ) {
return ECA_NOWTACCESS;
}
@@ -222,53 +192,54 @@ void nciu::initiateConnect ()
void nciu::connect ( unsigned nativeType,
unsigned long nativeCount, unsigned sidIn )
{
if ( ! this->f_claimSent ) {
ca_printf (
"CAC: Ignored conn resp to chan lacking virtual circuit CID=%u SID=%u?\n",
this->getId (), sidIn );
return;
}
bool v41Ok;
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
if ( this->f_connected ) {
ca_printf (
"CAC: Ignored conn resp to conn chan CID=%u SID=%u?\n",
this->getId (), sidIn );
return;
}
if ( ! this->f_claimSent ) {
ca_printf (
"CAC: Ignored conn resp to chan lacking virtual circuit CID=%u SID=%u?\n",
this->getId (), sidIn );
return;
}
this->lock ();
if ( this->f_connected ) {
ca_printf (
"CAC: Ignored conn resp to conn chan CID=%u SID=%u?\n",
this->getId (), sidIn );
return;
}
if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) {
if ( this->f_firstConnectDecrementsOutstandingIO ) {
this->cacCtx.decrementOutstandingIO ();
if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) {
if ( this->f_firstConnectDecrementsOutstandingIO ) {
this->cacCtx.decrementOutstandingIO ();
}
}
if ( this->piiu ) {
v41Ok = this->piiu->ca_v41_ok ();
}
else {
v41Ok = false;
}
this->typeCode = nativeType;
this->count = nativeCount;
this->sid = sidIn;
this->f_connected = true;
this->f_previousConn = true;
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access
*/
if ( ! v41Ok ) {
this->accessRightState.read_access = true;
this->accessRightState.write_access = true;
}
}
bool v41Ok;
if ( this->piiu ) {
v41Ok = this->piiu->ca_v41_ok ();
}
else {
v41Ok = false;
}
this->typeCode = nativeType;
this->count = nativeCount;
this->sid = sidIn;
this->f_connected = true;
this->f_previousConn = true;
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access
*/
if ( ! v41Ok ) {
this->ar.read_access = true;
this->ar.write_access = true;
}
this->unlock ();
// resubscribe for monitors from this channel
this->piiu->connectAllIO ( *this );
@@ -281,54 +252,51 @@ void nciu::connect ( unsigned nativeType,
* their call back here
*/
if ( ! v41Ok ) {
this->notify ().accessRightsNotify ( *this, this->ar );
this->notify ().accessRightsNotify ( *this, this->accessRightState );
}
}
void nciu::connectTimeoutNotify ()
{
if ( ! this->f_connectTimeOutSeen ) {
this->lock ();
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->f_connectTimeOutSeen = true;
this->unlock ();
}
}
void nciu::disconnect ( netiiu &newiiu )
{
char hostNameBuf[64];
this->hostName ( hostNameBuf, sizeof ( hostNameBuf ) );
bool wasConnected;
this->piiu->disconnectAllIO ( *this );
this->lock ();
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->piiu = &newiiu;
this->retry = 0u;
this->typeCode = USHRT_MAX;
this->count = 0u;
this->sid = UINT_MAX;
this->ar.read_access = false;
this->ar.write_access = false;
this->f_claimSent = false;
this->piiu = &newiiu;
this->retry = 0u;
this->typeCode = USHRT_MAX;
this->count = 0u;
this->sid = UINT_MAX;
this->accessRightState.read_access = false;
this->accessRightState.write_access = false;
this->f_claimSent = false;
if ( this->f_connected ) {
wasConnected = true;
if ( this->f_connected ) {
wasConnected = true;
}
else {
wasConnected = false;
}
this->f_connected = false;
}
else {
wasConnected = false;
}
this->f_connected = false;
this->unlock ();
if ( wasConnected ) {
/*
* look for events that have an event cancel in progress
*/
this->notify ().disconnectNotify ( *this );
this->notify ().accessRightsNotify ( *this, this->ar );
this->notify ().accessRightsNotify ( *this, this->accessRightState );
}
this->resetRetryCount ();
@@ -340,7 +308,7 @@ void nciu::disconnect ( netiiu &newiiu )
bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel )
{
caHdr msg;
bool status;
bool success;
msg.m_cmmd = htons ( CA_PROTO_SEARCH );
msg.m_available = this->getId ();
@@ -348,26 +316,22 @@ bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisCh
msg.m_count = htons ( CA_MINOR_VERSION );
msg.m_cid = this->getId ();
status = this->piiu->pushDatagramMsg ( msg,
success = this->piiu->pushDatagramMsg ( msg,
this->pNameStr, this->nameLength );
if ( status ) {
if ( success ) {
//
// increment the number of times we have tried
// to find this channel
//
this->lock ();
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
if ( this->retry < MAXCONNTRIES ) {
this->retry++;
}
this->retrySeqNo = retrySeqNumber;
retryNoForThisChannel = this->retry;
}
this->unlock ();
}
return status;
return success;
}
void nciu::hostName ( char *pBuf, unsigned bufLength ) const
@@ -378,39 +342,30 @@ void nciu::hostName ( char *pBuf, unsigned bufLength ) const
// deprecated - please do not use, this is _not_ thread safe
const char * nciu::pHostName () const
{
this->lock ();
const char *pName = this->piiu->pHostName ();
this->unlock ();
return pName; // ouch !
return this->piiu->pHostName (); // ouch !
}
bool nciu::ca_v42_ok () const
{
bool status;
this->lock ();
if ( this->piiu ) {
status = this->piiu->ca_v42_ok ();
}
else {
status = false;
}
this->unlock ();
return status;
return this->piiu->ca_v42_ok ();
}
short nciu::nativeType () const
{
short type;
this->lock ();
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
if ( this->f_connected ) {
type = static_cast <short> ( this->typeCode );
if ( this->typeCode < SHRT_MAX ) {
type = static_cast <short> ( this->typeCode );
}
else {
type = TYPENOTCONN;
}
}
else {
type = TYPENOTCONN;
}
this->unlock ();
return type;
}
@@ -418,25 +373,23 @@ short nciu::nativeType () const
unsigned long nciu::nativeElementCount () const
{
unsigned long countOut;
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->lock ();
if ( this->f_connected ) {
countOut = this->count;
}
else {
countOut = 0ul;
}
this->unlock ();
return countOut;
}
channel_state nciu::state () const
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
channel_state stateOut;
this->lock ();
if ( this->f_connected ) {
stateOut = cs_conn;
}
@@ -447,14 +400,14 @@ channel_state nciu::state () const
stateOut = cs_never_conn;
}
this->unlock ();
return stateOut;
}
caar nciu::accessRights () const
{
return this->ar;
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
caar tmp = this->accessRightState;
return tmp;
}
const char *nciu::pName () const
@@ -481,6 +434,7 @@ int nciu::createChannelRequest ()
{
int status = this->piiu->createChannelRequest ( *this );
if ( status == ECA_NORMAL ) {
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->f_claimSent = true;
}
return status;
@@ -495,7 +449,7 @@ int nciu::subscribe ( unsigned type, unsigned long nElem,
if ( pSubcr ) {
int status = this->piiu->installSubscription ( *pSubcr );
if ( status != ECA_NORMAL ) {
pSubcr->destroy ();
delete static_cast < baseNMIU * > ( pSubcr );
}
else {
pNotifyIO = pSubcr;
@@ -509,7 +463,7 @@ int nciu::subscribe ( unsigned type, unsigned long nElem,
void nciu::notifyStateChangeFirstConnectInCountOfOutstandingIO ()
{
this->lock ();
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
// test is performed via a callback so that locking is correct
if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) {
if ( this->notify ().includeFirstConnectInCountOfOutstandingIO () ) {
@@ -525,7 +479,6 @@ void nciu::notifyStateChangeFirstConnectInCountOfOutstandingIO ()
}
}
}
this->unlock ();
}
void nciu::show ( unsigned level ) const
@@ -539,8 +492,8 @@ void nciu::show ( unsigned level ) const
printf ( ", native type %s, native element count %u",
dbf_type_to_text ( this->typeCode ), this->count );
printf ( ", %sread access, %swrite access",
this->ar.read_access ? "" : "no ",
this->ar.write_access ? "" : "no ");
this->accessRightState.read_access ? "" : "no ",
this->accessRightState.write_access ? "" : "no ");
}
printf ( "\n" );
}
+10 -33
View File
@@ -20,28 +20,18 @@
// nciu inline member functions
//
#include "ioCounter_IL.h"
inline void * nciu::operator new ( size_t size )
{
epicsAutoMutex locker ( nciu::freeListMutex );
return nciu::freeList.allocate ( size );
}
inline void nciu::operator delete ( void *pCadaver, size_t size )
{
epicsAutoMutex locker ( nciu::freeListMutex );
nciu::freeList.release ( pCadaver, size );
}
inline void nciu::lock () const
{
this->cacCtx.nciuPrivate::mutex.lock ();
}
inline void nciu::unlock () const
{
this->cacCtx.nciuPrivate::mutex.unlock ();
}
inline bool nciu::fullyConstructed () const
{
return this->f_fullyConstructed;
@@ -59,8 +49,11 @@ inline void nciu::resetRetryCount ()
inline void nciu::accessRightsStateChange ( const caar &arIn )
{
this->ar = arIn;
this->notify ().accessRightsNotify ( *this, this->ar );
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->accessRightState = arIn;
}
this->notify ().accessRightsNotify ( *this, arIn );
}
inline ca_uint32_t nciu::getSID () const
@@ -104,14 +97,13 @@ inline void nciu::connect ()
inline void nciu::searchReplySetUp ( netiiu &iiu, unsigned sidIn,
unsigned typeIn, unsigned long countIn )
{
this->lock ();
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->piiu = &iiu;
this->typeCode = typeIn;
this->count = countIn;
this->sid = sidIn;
this->ar.read_access = true;
this->ar.write_access = true;
this->unlock ();
this->accessRightState.read_access = true;
this->accessRightState.write_access = true;
}
inline bool nciu::connected () const
@@ -129,19 +121,4 @@ inline netiiu * nciu::getPIIU ()
return this->piiu;
}
inline cac & nciu::getCAC ()
{
return this->cacCtx;
}
inline void nciu::incrementOutstandingIO ( unsigned seqNumber )
{
this->cacCtx.incrementOutstandingIO ( seqNumber );
}
inline void nciu::decrementOutstandingIO ( unsigned seqNumber )
{
this->cacCtx.decrementOutstandingIO ( seqNumber );
}
+5 -34
View File
@@ -16,50 +16,21 @@
#include "baseNMIU_IL.h"
tsFreeList < class netReadNotifyIO, 1024 > netReadNotifyIO::freeList;
epicsMutex netReadNotifyIO::freeListMutex;
netReadNotifyIO::netReadNotifyIO ( nciu &chan, cacNotify &notifyIn ) :
cacNotifyIO ( notifyIn ), baseNMIU ( chan ) {}
baseNMIU ( notifyIn, chan )
{
}
netReadNotifyIO::~netReadNotifyIO ()
{
}
void netReadNotifyIO::destroy ()
{
delete this;
}
void netReadNotifyIO::completionNotify ()
{
this->notify ().exceptionNotify ( this->channelIO (), ECA_INTERNAL, "no data returned ?" );
}
void netReadNotifyIO::completionNotify ( unsigned type,
unsigned long count, const void *pData )
{
this->notify ().completionNotify ( this->channelIO (), type, count, pData );
}
void netReadNotifyIO::exceptionNotify ( int status, const char *pContext )
{
this->notify ().exceptionNotify ( this->channelIO (), status, pContext );
}
void netReadNotifyIO::exceptionNotify ( int status, const char *pContext,
unsigned type, unsigned long count )
{
this->notify ().exceptionNotify ( this->channelIO (), status, pContext, type ,count );
}
cacChannelIO & netReadNotifyIO::channelIO () const
{
return this->channel ();
}
void netReadNotifyIO::show ( unsigned level ) const
{
printf ( "read notify IO at %p\n",
static_cast <const void *> ( this ) );
static_cast < const void * > ( this ) );
if ( level > 0u ) {
this->baseNMIU::show ( level - 1u );
}
+7 -42
View File
@@ -16,31 +16,17 @@
#include "baseNMIU_IL.h"
tsFreeList < class netSubscription, 1024 > netSubscription::freeList;
epicsMutex netSubscription::freeListMutex;
netSubscription::netSubscription ( nciu &chan, unsigned typeIn, unsigned long countIn,
unsigned maskIn, cacNotify &notifyIn ) :
cacNotifyIO ( notifyIn ), baseNMIU ( chan ),
baseNMIU ( notifyIn, chan ),
count ( countIn ), type ( typeIn ), mask ( maskIn )
{
}
netSubscription::~netSubscription ()
{
// netiiu lock must _not_ be applied when calling this
this->chan.getPIIU ()->subscriptionCancelRequest ( *this, true );
}
void netSubscription::destroy ()
{
unsigned i = 0u;
while ( ! this->chan.getPIIU ()->uninstallIO ( *this ) ) {
if ( i++ > 1000u ) {
this->chan.getCAC ().printf (
"CAC: unable to destroy IO\n" );
break;
}
}
delete this;
}
class netSubscription * netSubscription::isSubscription ()
@@ -48,38 +34,17 @@ class netSubscription * netSubscription::isSubscription ()
return this;
}
void netSubscription::completionNotify ()
void netSubscription::ioCancelRequest ()
{
this->notify ().completionNotify ( this->channel () );
}
void netSubscription::completionNotify ( unsigned typeIn,
unsigned long countIn, const void *pDataIn )
{
this->notify ().completionNotify ( this->channel (), typeIn, countIn, pDataIn );
}
void netSubscription::exceptionNotify ( int status, const char *pContext )
{
this->notify ().exceptionNotify ( this->channel (), status, pContext );
}
void netSubscription::exceptionNotify ( int statusIn,
const char *pContextIn, unsigned typeIn, unsigned long countIn )
{
this->notify ().exceptionNotify ( this->channel (), statusIn,
pContextIn, typeIn, countIn );
}
cacChannelIO & netSubscription::channelIO () const
{
return this->channel ();
this->chan.getPIIU ()->subscriptionCancelRequest ( *this, true );
}
void netSubscription::show ( unsigned level ) const
{
printf ( "event subscription IO at %p, type %s, element count %lu, mask %u\n",
this, dbf_type_to_text ( static_cast <int> (this->type) ), this->count, this->mask );
static_cast < const void * > ( this ),
dbf_type_to_text ( static_cast < int > ( this->type ) ),
this->count, this->mask );
if ( level > 0u ) {
this->baseNMIU::show ( level - 1u );
}
+3 -36
View File
@@ -16,9 +16,10 @@
#include "baseNMIU_IL.h"
tsFreeList < class netWriteNotifyIO, 1024 > netWriteNotifyIO::freeList;
epicsMutex netWriteNotifyIO::freeListMutex;
netWriteNotifyIO::netWriteNotifyIO ( nciu &chan, cacNotify &notifyIn ) :
cacNotifyIO ( notifyIn ), baseNMIU ( chan )
baseNMIU ( notifyIn, chan )
{
}
@@ -26,44 +27,10 @@ netWriteNotifyIO::~netWriteNotifyIO ()
{
}
void netWriteNotifyIO::destroy ()
{
delete this;
}
void netWriteNotifyIO::completionNotify ()
{
this->notify ().completionNotify ( this->channelIO () );
}
void netWriteNotifyIO::completionNotify (
unsigned /* type */, unsigned long /* count */,
const void * /* pData */ )
{
this->notify ().completionNotify ( this->channelIO () );
}
void netWriteNotifyIO::exceptionNotify ( int status,
const char *pContext )
{
this->notify ().exceptionNotify ( this->channelIO (), status, pContext );
}
void netWriteNotifyIO::exceptionNotify ( int status,
const char *pContext, unsigned type, unsigned long count )
{
this->notify ().exceptionNotify ( this->channelIO (), status, pContext, type, count );
}
cacChannelIO & netWriteNotifyIO::channelIO () const
{
return this->channel ();
}
void netWriteNotifyIO::show ( unsigned level ) const
{
printf ( "read write notify IO at %p\n",
static_cast <const void *> ( this ) );
static_cast < const void * > ( this ) );
if ( level > 0u ) {
this->baseNMIU::show ( level - 1u );
}
+14 -26
View File
@@ -45,11 +45,6 @@ void netiiu::show ( unsigned level ) const
}
}
unsigned netiiu::channelCount () const
{
return this->channelList.count ();
}
// cac lock must also be applied when
// calling this
void netiiu::attachChannel ( nciu &chan )
@@ -115,7 +110,7 @@ bool netiiu::destroyAllIO ( nciu &chan )
}
}
while ( baseNMIU *pIO = eventQ.get () ) {
pIO->destroy ();
delete pIO;
}
return true;
}
@@ -142,23 +137,24 @@ void netiiu::resetChannelRetryCounts ()
bool netiiu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel )
{
bool status;
bool success;
epicsAutoMutex autoMutex ( this->mutex );
nciu *pChan = this->channelList.first ();
if ( pChan ) {
status = pChan->searchMsg ( retrySeqNumber, retryNoForThisChannel );
if ( status ) {
this->channelList.remove ( *pChan );
if ( nciu *pChan = this->channelList.get () ) {
success = pChan->searchMsg ( retrySeqNumber, retryNoForThisChannel );
if ( success ) {
this->channelList.add ( *pChan );
}
else {
this->channelList.push ( *pChan );
}
}
else {
status = false;
success = false;
}
return status;
return success;
}
bool netiiu::ca_v42_ok () const
@@ -195,11 +191,6 @@ int netiiu::writeNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned, const
return ECA_DISCONNCHID;
}
int netiiu::readCopyRequest ( nciu &, unsigned, unsigned, void * )
{
return ECA_DISCONNCHID;
}
int netiiu::readNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned )
{
return ECA_DISCONNCHID;
@@ -226,9 +217,6 @@ void netiiu::subscriptionCancelRequest ( netSubscription &, bool )
int netiiu::installSubscription ( netSubscription &subscr )
{
// we must install the subscription first on the channel so that
// proper installation is guaranteed to occur if a connect occurs
// beteen these two steps
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
@@ -265,11 +253,11 @@ void netiiu::connectAllIO ( nciu & )
bool netiiu::uninstallIO ( baseNMIU &io )
{
epicsAutoMutex autoMutex ( this->mutex );
if ( ! io.channel ().verifyIIU ( *this ) ) {
return false;
if ( io.channel ().verifyIIU ( *this ) ) {
io.channel ().tcpiiuPrivateListOfIO::eventq.remove ( io );
return true;
}
io.channel ().tcpiiuPrivateListOfIO::eventq.remove ( io );
return true;
return false;
}
double netiiu::beaconPeriod () const
+186 -189
View File
@@ -10,29 +10,26 @@
* Author: Jeff Hill
*/
#include "iocinf.h"
#include "tsMinMax.h"
#ifdef DEBUG
# define debugPrintf(argsInParen) printf argsInParen
#else
# define debugPrintf(argsInParen)
#endif
#include "iocinf.h"
#include "netiiu_IL.h"
//
// searchTimer::searchTimer ()
//
searchTimer::searchTimer (udpiiu &iiuIn, osiTimerQueue &queueIn) :
osiTimer (queueIn),
iiu (iiuIn),
framesPerTry (INITIALTRIESPERFRAME),
framesPerTryCongestThresh (UINT_MAX),
minRetry (UINT_MAX),
retry (0u),
searchTriesWithinThisPass (0u),
searchResponsesWithinThisPass (0u),
retrySeqNo (0u),
retrySeqAtPassBegin (0u),
period (CA_RECAST_DELAY)
searchTimer::searchTimer ( udpiiu &iiuIn, osiTimerQueue &queueIn ) :
osiTimer ( queueIn ),
iiu ( iiuIn ),
framesPerTry ( INITIALTRIESPERFRAME ),
framesPerTryCongestThresh ( UINT_MAX ),
minRetry ( UINT_MAX ),
retry ( 0u ),
searchTriesWithinThisPass ( 0u ),
searchResponsesWithinThisPass ( 0u ),
retrySeqNo ( 0u ),
retrySeqAtPassBegin ( 0u ),
period ( CA_RECAST_DELAY )
{
}
@@ -47,16 +44,17 @@ void searchTimer::resetPeriod ( double delayToNextTry )
delayToNextTry = CA_RECAST_DELAY;
}
this->lock ();
this->retry = 0;
if ( this->period > delayToNextTry ) {
reschedule = true;
{
epicsAutoMutex locker ( this->mutex );
this->retry = 0;
if ( this->period > delayToNextTry ) {
reschedule = true;
}
else {
reschedule = false;
}
this->period = CA_RECAST_DELAY;
}
else {
reschedule = false;
}
this->period = CA_RECAST_DELAY;
this->unlock ();
if ( reschedule ) {
this->reschedule ( delayToNextTry );
@@ -76,24 +74,23 @@ void searchTimer::setRetryInterval (unsigned retryNo)
unsigned idelay;
double delay;
this->lock ();
epicsAutoMutex locker ( this->mutex );
/*
* set the retry number
*/
this->retry = min (retryNo, MAXCONNTRIES+1u);
this->retry = tsMin ( retryNo, MAXCONNTRIES + 1u );
/*
* set the retry interval
*/
idelay = 1u << min (this->retry, CHAR_BIT*sizeof(idelay)-1u);
idelay = 1u << tsMin (this->retry, CHAR_BIT*sizeof(idelay)-1u);
delay = idelay * CA_RECAST_DELAY; /* sec */
/*
* place upper limit on the retry delay
*/
this->period = min (CA_RECAST_PERIOD, delay);
this->unlock ();
this->period = tsMin ( CA_RECAST_PERIOD, delay );
debugPrintf ( ("new CA search period is %f sec\n", this->period) );
}
@@ -109,20 +106,20 @@ void searchTimer::notifySearchResponse ( unsigned short retrySeqNoIn )
{
bool reschedualNeeded;
this->lock ();
{
epicsAutoMutex locker ( this->mutex );
if ( this->retrySeqAtPassBegin <= retrySeqNoIn ) {
if ( this->searchResponsesWithinThisPass < UINT_MAX ) {
this->searchResponsesWithinThisPass++;
}
}
if ( this->retrySeqAtPassBegin <= retrySeqNoIn ) {
if ( this->searchResponsesWithinThisPass < UINT_MAX ) {
this->searchResponsesWithinThisPass++;
}
}
reschedualNeeded = ( retrySeqNoIn == this->retrySeqNo );
this->unlock ();
reschedualNeeded = ( retrySeqNoIn == this->retrySeqNo );
}
if ( reschedualNeeded ) {
this->reschedule (0.0);
this->reschedule ( 0.0 );
}
}
@@ -141,159 +138,161 @@ void searchTimer::expire ()
return;
}
this->lock ();
/*
* increment the retry sequence number
*/
this->retrySeqNo++; /* allowed to roll over */
/*
* dynamically adjust the number of UDP frames per
* try depending how many search requests are not
* replied to
*
* This determines how many search request can be
* sent together (at the same instant in time).
*
* The variable this->framesPerTry
* determines the number of UDP frames to be sent
* each time that expire() is called.
* If this value is too high we will waste some
* network bandwidth. If it is too low we will
* use very little of the incoming UDP message
* buffer associated with the server's port and
* will therefore take longer to connect. We
* initialize this->framesPerTry
* to a prime number so that it is less likely that the
* same channel is in the last UDP frame
* sent every time that this is called (and
* potentially discarded by a CA server with
* a small UDP input queue).
*/
/*
* increase frames per try only if we see better than
* a 93.75% success rate for one pass through the list
*/
if (this->searchResponsesWithinThisPass >
(this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/16u)) ) {
{
epicsAutoMutex locker ( this->mutex );
/*
* increase UDP frames per try if we have a good score
* increment the retry sequence number
*/
if ( this->framesPerTry < MAXTRIESPERFRAME ) {
/*
* a congestion avoidance threshold similar to TCP is now used
*/
if ( this->framesPerTry < this->framesPerTryCongestThresh ) {
this->framesPerTry += this->framesPerTry;
}
else {
this->framesPerTry += (this->framesPerTry/8) + 1;
}
debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n",
this->framesPerTry, this->searchTriesWithinThisPass, this->searchResponsesWithinThisPass) );
}
}
/*
* if we detect congestion because we have less than a 87.5% success
* rate then gradually reduce the frames per try
*/
else if ( this->searchResponsesWithinThisPass <
(this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/8u)) ) {
if (this->framesPerTry>1) {
this->framesPerTry--;
}
this->framesPerTryCongestThresh = this->framesPerTry/2 + 1;
debugPrintf ( ("Congestion detected - set frames per try to %u t=%u r=%u\n",
this->framesPerTry, this->searchTriesWithinThisPass,
this->searchResponsesWithinThisPass) );
}
this->retrySeqNo++; /* allowed to roll over */
while ( 1 ) {
/*
* clear counter when we reach the end of the list
* dynamically adjust the number of UDP frames per
* try depending how many search requests are not
* replied to
*
* if we are making some progress then
* dont increase the delay between search
* requests
* This determines how many search request can be
* sent together (at the same instant in time).
*
* The variable this->framesPerTry
* determines the number of UDP frames to be sent
* each time that expire() is called.
* If this value is too high we will waste some
* network bandwidth. If it is too low we will
* use very little of the incoming UDP message
* buffer associated with the server's port and
* will therefore take longer to connect. We
* initialize this->framesPerTry
* to a prime number so that it is less likely that the
* same channel is in the last UDP frame
* sent every time that this is called (and
* potentially discarded by a CA server with
* a small UDP input queue).
*/
if ( this->searchTriesWithinThisPass >= this->iiu.channelCount () ) {
if ( this->searchResponsesWithinThisPass == 0u ) {
debugPrintf ( ("increasing search try interval\n") );
this->setRetryInterval ( this->minRetry + 1u );
}
this->minRetry = UINT_MAX;
/*
* increment the retry sequence number
* (this prevents the time of the next search
* try from being set to the current time if
* we are handling a response from an old
* search message)
*/
this->retrySeqNo++; /* allowed to roll over */
/*
* so that old search tries will not update the counters
*/
this->retrySeqAtPassBegin = this->retrySeqNo;
this->searchTriesWithinThisPass = 0;
this->searchResponsesWithinThisPass = 0;
debugPrintf ( ("saw end of list\n") );
}
unsigned retryNoForThisChannel;
if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) {
nFrameSent++;
if ( nFrameSent >= this->framesPerTry ) {
break;
}
this->iiu.flush ();
if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) {
break;
}
}
this->minRetry = min ( this->minRetry, retryNoForThisChannel );
if ( this->searchTriesWithinThisPass < UINT_MAX ) {
this->searchTriesWithinThisPass++;
}
if ( nChanSent < UINT_MAX ) {
nChanSent++;
}
/*
* dont send any of the channels twice within one try
* increase frames per try only if we see better than
* a 93.75% success rate for one pass through the list
*/
if ( nChanSent >= this->iiu.channelCount () ) {
if (this->searchResponsesWithinThisPass >
(this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/16u)) ) {
/*
* add one to nFrameSent because there may be
* one more partial frame to be sent
* increase UDP frames per try if we have a good score
*/
nFrameSent++;
/*
* cap this->framesPerTry to
* the number of frames required for all of
* the unresolved channels
*/
if ( this->framesPerTry > nFrameSent ) {
this->framesPerTry = nFrameSent;
if ( this->framesPerTry < MAXTRIESPERFRAME ) {
/*
* a congestion avoidance threshold similar to TCP is now used
*/
if ( this->framesPerTry < this->framesPerTryCongestThresh ) {
this->framesPerTry += this->framesPerTry;
}
else {
this->framesPerTry += (this->framesPerTry/8) + 1;
}
debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n",
this->framesPerTry, this->searchTriesWithinThisPass, this->searchResponsesWithinThisPass) );
}
}
/*
* if we detect congestion because we have less than a 87.5% success
* rate then gradually reduce the frames per try
*/
else if ( this->searchResponsesWithinThisPass <
(this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/8u)) ) {
if (this->framesPerTry>1) {
this->framesPerTry--;
}
this->framesPerTryCongestThresh = this->framesPerTry/2 + 1;
debugPrintf ( ("Congestion detected - set frames per try to %u t=%u r=%u\n",
this->framesPerTry, this->searchTriesWithinThisPass,
this->searchResponsesWithinThisPass) );
}
while ( 1 ) {
/*
* clear counter when we reach the end of the list
*
* if we are making some progress then
* dont increase the delay between search
* requests
*/
if ( this->searchTriesWithinThisPass >= this->iiu.channelCount () ) {
if ( this->searchResponsesWithinThisPass == 0u ) {
debugPrintf ( ("increasing search try interval\n") );
this->setRetryInterval ( this->minRetry + 1u );
}
break;
this->minRetry = UINT_MAX;
/*
* increment the retry sequence number
* (this prevents the time of the next search
* try from being set to the current time if
* we are handling a response from an old
* search message)
*/
this->retrySeqNo++; /* allowed to roll over */
/*
* so that old search tries will not update the counters
*/
this->retrySeqAtPassBegin = this->retrySeqNo;
this->searchTriesWithinThisPass = 0;
this->searchResponsesWithinThisPass = 0;
debugPrintf ( ("saw end of list\n") );
}
unsigned retryNoForThisChannel;
if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) {
nFrameSent++;
if ( nFrameSent >= this->framesPerTry ) {
break;
}
this->iiu.flush ();
if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) {
break;
}
}
if ( this->minRetry > retryNoForThisChannel ) {
this->minRetry = retryNoForThisChannel;
}
if ( this->searchTriesWithinThisPass < UINT_MAX ) {
this->searchTriesWithinThisPass++;
}
if ( nChanSent < UINT_MAX ) {
nChanSent++;
}
/*
* dont send any of the channels twice within one try
*/
if ( nChanSent >= this->iiu.channelCount () ) {
/*
* add one to nFrameSent because there may be
* one more partial frame to be sent
*/
nFrameSent++;
/*
* cap this->framesPerTry to
* the number of frames required for all of
* the unresolved channels
*/
if ( this->framesPerTry > nFrameSent ) {
this->framesPerTry = nFrameSent;
}
break;
}
}
}
this->unlock ();
// flush out the search request buffer
this->iiu.flush ();
@@ -310,13 +309,11 @@ bool searchTimer::again () const
if ( this->iiu.channelCount () == 0 ) {
return false;
}
else if ( this->retry < MAXCONNTRIES ) {
return true;
}
else {
if ( this->retry < MAXCONNTRIES ) {
return true;
}
else {
return false;
}
return false;
}
}
@@ -325,7 +322,7 @@ double searchTimer::delay () const
return this->period;
}
void searchTimer::show (unsigned /* level */) const
void searchTimer::show ( unsigned /* level */ ) const
{
}
+331 -344
View File
File diff suppressed because it is too large Load Diff
+352 -378
View File
@@ -18,8 +18,6 @@
#include "netiiu_IL.h"
#include "cac_IL.h"
typedef void (*pProtoStubUDP) (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_in *pnet_addr);
// UDP protocol dispatch table
const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] =
{
@@ -41,293 +39,8 @@ const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] =
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::repeaterAckAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction,
&udpiiu::badUDPRespAction
};
//
// udpiiu::recvMsg ()
//
void udpiiu::recvMsg ()
{
osiSockAddr src;
osiSocklen_t src_size = sizeof (src);
int status;
status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0,
&src.sa, &src_size );
if ( status <= 0 ) {
if ( status == 0 ) {
return;
}
int errnoCpy = SOCKERRNO;
if ( errnoCpy == SOCK_SHUTDOWN ) {
return;
}
if ( errnoCpy == SOCK_ENOTSOCK ) {
return;
}
if ( errnoCpy == SOCK_EBADF ) {
return;
}
if ( errnoCpy == SOCK_EINTR ) {
return;
}
# ifdef linux
/*
* Avoid spurious ECONNREFUSED bug
* in linux
*/
if ( errnoCpy == SOCK_ECONNREFUSED ) {
return;
}
# endif
ca_printf ( "Unexpected UDP recv error was \"%s\"\n",
SOCKERRSTR (errnoCpy) );
}
else if ( status > 0 ) {
status = this->postMsg ( src,
this->recvBuf, (unsigned long) status );
if ( status != ECA_NORMAL ) {
char buf[64];
sockAddrToDottedIP ( &src.sa, buf, sizeof ( buf ) );
ca_printf (
"%s: bad UDP msg from %s because \"%s\"\n", __FILE__,
buf, ca_message (status) );
return;
}
}
return;
}
/*
* cacRecvThreadUDP ()
*/
extern "C" void cacRecvThreadUDP (void *pParam)
{
udpiiu *piiu = (udpiiu *) pParam;
do {
piiu->recvMsg ();
} while ( ! piiu->shutdownCmd );
epicsEventSignal ( piiu->recvThreadExitSignal );
}
/*
* udpiiu::repeaterRegistrationMessage ()
*
* register with the repeater
*/
void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber )
{
caRepeaterRegistrationMessage ( this->sock, this->repeaterPort, attemptNumber );
}
/*
* caRepeaterRegistrationMessage ()
*
* register with the repeater
*/
epicsShareFunc void epicsShareAPI caRepeaterRegistrationMessage (
SOCKET sock, unsigned repeaterPort, unsigned attemptNumber )
{
caHdr msg;
osiSockAddr saddr;
int status;
int len;
/*
* In 3.13 beta 11 and before the CA repeater calls local_addr()
* to determine a local address and does not allow registration
* messages originating from other addresses. In these
* releases local_addr() returned the address of the first enabled
* interface found, and this address may or may not have been the loop
* back address. Starting with 3.13 beta 12 local_addr() was
* changed to always return the address of the first enabled
* non-loopback interface because a valid non-loopback local
* address is required in the beacon messages. Therefore, to
* guarantee compatibility with past versions of the repeater
* we alternate between the address returned by local_addr()
* and the loopback address here.
*
* CA repeaters in R3.13 beta 12 and higher allow
* either the loopback address or the address returned
* by local address (the first non-loopback address found)
*/
if ( attemptNumber & 1 ) {
saddr = osiLocalAddr ( sock );
if ( saddr.sa.sa_family != AF_INET ) {
/*
* use the loop back address to communicate with the CA repeater
* if this os does not have interface query capabilities
*
* this will only work with 3.13 beta 12 CA repeaters or later
*/
saddr.ia.sin_family = AF_INET;
saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
saddr.ia.sin_port = htons ( repeaterPort );
}
else {
saddr.ia.sin_port = htons ( repeaterPort );
}
}
else {
saddr.ia.sin_family = AF_INET;
saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
saddr.ia.sin_port = htons ( repeaterPort );
}
memset ( (char *) &msg, 0, sizeof (msg) );
msg.m_cmmd = htons ( REPEATER_REGISTER );
msg.m_available = saddr.ia.sin_addr.s_addr;
/*
* Intentionally sending a zero length message here
* until most CA repeater daemons have been restarted
* (and only then will accept the above protocol)
* (repeaters began accepting this protocol
* starting with EPICS 3.12)
*
* SOLARIS will not accept a zero length message
* and we are just porting there for 3.12 so we will use the
* new protocol for 3.12
*
* recent versions of UCX will not accept a zero
* length message and we will assume that folks
* using newer versions of UCX have rebooted (and
* therefore restarted the CA repeater - and therefore
* moved it to an EPICS release that accepts this protocol)
*/
# if defined ( DOES_NOT_ACCEPT_ZERO_LENGTH_UDP )
len = sizeof (msg);
# else
len = 0;
# endif
status = sendto ( sock, (char *) &msg, len,
0, (struct sockaddr *) &saddr, sizeof ( saddr ) );
if ( status < 0 ) {
int errnoCpy = SOCKERRNO;
if ( errnoCpy != SOCK_EINTR &&
/*
* This is returned from Linux when
* the repeater isnt running
*/
errnoCpy != SOCK_ECONNREFUSED ) {
ca_printf ( "CAC: error sending to repeater was \"%s\"\n",
SOCKERRSTR (errnoCpy) );
}
}
}
/*
* caStartRepeaterIfNotInstalled ()
*
* Test for the repeater already installed
*
* NOTE: potential race condition here can result
* in two copies of the repeater being spawned
* however the repeater detects this, prints a message,
* and lets the other task start the repeater.
*
* QUESTION: is there a better way to test for a port in use?
* ANSWER: none that I can find.
*
* Problems with checking for the repeater installed
* by attempting to bind a socket to its address
* and port.
*
* 1) Closed socket may not release the bound port
* before the repeater wakes up and tries to grab it.
* Attempting to bind the open socket to another port
* also does not work.
*
* 072392 - problem solved by using SO_REUSEADDR
*/
epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort )
{
bool installed = false;
int status;
SOCKET tmpSock;
struct sockaddr_in bd;
int flag;
if ( repeaterPort > 0xffff ) {
ca_printf ( "caStartRepeaterIfNotInstalled () : strange repeater port specified\n");
return;
}
tmpSock = socket ( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
if ( tmpSock != INVALID_SOCKET ) {
ca_uint16_t port = static_cast < ca_uint16_t > ( repeaterPort );
memset ( (char *) &bd, 0, sizeof ( bd ) );
bd.sin_family = AF_INET;
bd.sin_addr.s_addr = htonl ( INADDR_ANY );
bd.sin_port = htons ( port );
status = bind ( tmpSock, (struct sockaddr *) &bd, sizeof ( bd ) );
if ( status < 0 ) {
if ( SOCKERRNO == SOCK_EADDRINUSE ) {
installed = true;
}
else {
ca_printf ( "caStartRepeaterIfNotInstalled () : bind failed\n");
}
}
}
/*
* turn on reuse only after the test so that
* this works on kernels that support multicast
*/
flag = TRUE;
status = setsockopt ( tmpSock, SOL_SOCKET, SO_REUSEADDR,
(char *) &flag, sizeof ( flag ) );
if ( status < 0 ) {
ca_printf ( "caStartRepeaterIfNotInstalled () : set socket option reuseaddr set failed\n");
}
socket_close ( tmpSock );
if ( ! installed ) {
osiSpawnDetachedProcessReturn osptr;
/*
* This is not called if the repeater is known to be
* already running. (in the event of a race condition
* the 2nd repeater exits when unable to attach to the
* repeater's port)
*/
osptr = osiSpawnDetachedProcess ( "CA Repeater", "caRepeater" );
if ( osptr == osiSpawnDetachedProcessNoSupport ) {
epicsThreadId tid;
tid = epicsThreadCreate ( "CAC-repeater", epicsThreadPriorityLow,
epicsThreadGetStackSize (epicsThreadStackMedium), caRepeaterThread, 0);
if ( tid == 0 ) {
ca_printf ("caStartRepeaterIfNotInstalled : unable to create CA repeater daemon thread\n");
}
}
else if ( osptr == osiSpawnDetachedProcessFail ) {
ca_printf ( "caStartRepeaterIfNotInstalled (): unable to start CA repeater daemon detached process\n" );
}
}
}
//
// udpiiu::udpiiu ()
//
@@ -336,11 +49,11 @@ udpiiu::udpiiu ( cac &cac ) :
{
static const unsigned short PORT_ANY = 0u;
osiSockAddr addr;
int boolValue = TRUE;
int boolValue = true;
int status;
this->repeaterPort =
envGetInetPortConfigParam (&EPICS_CA_REPEATER_PORT, CA_REPEATER_PORT);
envGetInetPortConfigParam ( &EPICS_CA_REPEATER_PORT, CA_REPEATER_PORT );
this->serverPort =
envGetInetPortConfigParam ( &EPICS_CA_SERVER_PORT, CA_SERVER_PORT );
@@ -353,10 +66,10 @@ udpiiu::udpiiu ( cac &cac ) :
}
status = setsockopt ( this->sock, SOL_SOCKET, SO_BROADCAST,
(char *) &boolValue, sizeof (boolValue) );
if (status<0) {
ca_printf ("CAC: unable to enable IP broadcasting because = \"%s\"\n",
SOCKERRSTR (SOCKERRNO));
(char *) &boolValue, sizeof ( boolValue ) );
if ( status < 0 ) {
ca_printf ("CAC: IP broadcasting enable failed because = \"%s\"\n",
SOCKERRSTR ( SOCKERRNO ) );
}
#if 0
@@ -454,7 +167,7 @@ udpiiu::udpiiu ( cac &cac ) :
void *fdRegArg;
this->pCAC ()->getFDRegCallback ( fdRegFunc, fdRegArg );
if ( fdRegFunc ) {
( *fdRegFunc ) ( fdRegArg, this->sock, TRUE );
( *fdRegFunc ) ( fdRegArg, this->sock, true );
}
}
@@ -474,7 +187,7 @@ udpiiu::~udpiiu ()
void *fdRegArg;
this->pCAC ()->getFDRegCallback ( fdRegFunc, fdRegArg );
if ( fdRegFunc ) {
( *fdRegFunc ) ( fdRegArg, this->sock, FALSE );
( *fdRegFunc ) ( fdRegArg, this->sock, false );
}
if ( ! this->sockCloseCompleted ) {
@@ -482,77 +195,326 @@ udpiiu::~udpiiu ()
}
}
/*
* udpiiu::shutdown ()
*/
void udpiiu::shutdown ()
//
// udpiiu::recvMsg ()
//
void udpiiu::recvMsg ()
{
bool laborNeeded;
osiSockAddr src;
osiSocklen_t src_size = sizeof ( src );
int status;
{
epicsAutoMutex autoMutex ( this->mutex );
laborNeeded = ! this->shutdownCmd;
this->shutdownCmd = true;
}
status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0,
&src.sa, &src_size );
if ( status <= 0 ) {
if ( laborNeeded ) {
int status;
osiSockAddr addr;
caHdr msg;
msg.m_cmmd = htons ( CA_PROTO_NOOP );
msg.m_available = htonl ( 0u );
msg.m_dataType = htons ( 0u );
msg.m_count = htons ( 0u );
msg.m_cid = htonl ( 0u );
msg.m_postsize = htons ( 0u );
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
addr.ia.sin_port = htons ( this->localPort );
// send a wakeup msg so the UDP recv thread will exit
status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ),
sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) );
if ( status < 0 ) {
// this knocks the UDP input thread out of recv ()
// on all os except linux
status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC UDP socket close error was %s\n",
SOCKERRSTR ( SOCKERRNO ) );
}
else {
this->sockCloseCompleted = true;
}
if ( status == 0 ) {
return;
}
// wait for recv threads to exit
epicsEventMustWait ( this->recvThreadExitSignal );
int errnoCpy = SOCKERRNO;
if ( errnoCpy == SOCK_SHUTDOWN ) {
return;
}
if ( errnoCpy == SOCK_ENOTSOCK ) {
return;
}
if ( errnoCpy == SOCK_EBADF ) {
return;
}
if ( errnoCpy == SOCK_EINTR ) {
return;
}
# ifdef linux
/*
* Avoid spurious ECONNREFUSED bug
* in linux
*/
if ( errnoCpy == SOCK_ECONNREFUSED ) {
return;
}
# endif
ca_printf ( "Unexpected UDP recv error was \"%s\"\n",
SOCKERRSTR (errnoCpy) );
}
else if ( status > 0 ) {
this->postMsg ( src,
this->recvBuf, (unsigned long) status );
}
}
void udpiiu::badUDPRespAction ( const caHdr &msg, const osiSockAddr &netAddr )
{
char buf[256];
sockAddrToDottedIP ( &netAddr.sa, buf, sizeof ( buf ) );
ca_printf ( "CAC: Bad response code in UDP message from %s was %u\n",
buf, msg.m_cmmd);
}
void udpiiu::noopAction ( const caHdr &, const osiSockAddr & )
{
return;
}
void udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr )
/*
* cacRecvThreadUDP ()
*/
extern "C" void cacRecvThreadUDP ( void *pParam )
{
udpiiu *piiu = (udpiiu *) pParam;
do {
piiu->recvMsg ();
} while ( ! piiu->shutdownCmd );
epicsEventSignal ( piiu->recvThreadExitSignal );
}
/*
* udpiiu::repeaterRegistrationMessage ()
*
* register with the repeater
*/
void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber )
{
caRepeaterRegistrationMessage ( this->sock, this->repeaterPort, attemptNumber );
}
/*
* caRepeaterRegistrationMessage ()
*
* register with the repeater
*/
epicsShareFunc void epicsShareAPI caRepeaterRegistrationMessage (
SOCKET sock, unsigned repeaterPort, unsigned attemptNumber )
{
caHdr msg;
osiSockAddr saddr;
int status;
int len;
/*
* In 3.13 beta 11 and before the CA repeater calls local_addr()
* to determine a local address and does not allow registration
* messages originating from other addresses. In these
* releases local_addr() returned the address of the first enabled
* interface found, and this address may or may not have been the loop
* back address. Starting with 3.13 beta 12 local_addr() was
* changed to always return the address of the first enabled
* non-loopback interface because a valid non-loopback local
* address is required in the beacon messages. Therefore, to
* guarantee compatibility with past versions of the repeater
* we alternate between the address returned by local_addr()
* and the loopback address here.
*
* CA repeaters in R3.13 beta 12 and higher allow
* either the loopback address or the address returned
* by local address (the first non-loopback address found)
*/
if ( attemptNumber & 1 ) {
saddr = osiLocalAddr ( sock );
if ( saddr.sa.sa_family != AF_INET ) {
/*
* use the loop back address to communicate with the CA repeater
* if this os does not have interface query capabilities
*
* this will only work with 3.13 beta 12 CA repeaters or later
*/
saddr.ia.sin_family = AF_INET;
saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
saddr.ia.sin_port = htons ( repeaterPort );
}
else {
saddr.ia.sin_port = htons ( repeaterPort );
}
}
else {
saddr.ia.sin_family = AF_INET;
saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
saddr.ia.sin_port = htons ( repeaterPort );
}
memset ( (char *) &msg, 0, sizeof (msg) );
msg.m_cmmd = htons ( REPEATER_REGISTER );
msg.m_available = saddr.ia.sin_addr.s_addr;
/*
* Intentionally sending a zero length message here
* until most CA repeater daemons have been restarted
* (and only then will they accept the above protocol)
* (repeaters began accepting this protocol
* starting with EPICS 3.12)
*/
# if defined ( DOES_NOT_ACCEPT_ZERO_LENGTH_UDP )
len = sizeof (msg);
# else
len = 0;
# endif
status = sendto ( sock, (char *) &msg, len,
0, (struct sockaddr *) &saddr, sizeof ( saddr ) );
if ( status < 0 ) {
int errnoCpy = SOCKERRNO;
/*
* Different OS return different codes when the repeater isnt running.
* Its ok to supress these messages because I print another warning message
* if we time out registerring with the repeater.
*
* Linux returns SOCK_ECONNREFUSED
* Windows 2000 returns SOCK_ECONNRESET
*/
if ( errnoCpy != SOCK_EINTR &&
errnoCpy != SOCK_ECONNREFUSED &&
errnoCpy != SOCK_ECONNRESET ) {
ca_printf ( "CAC: error sending registration message to CA repeater daemon was \"%s\"\n",
SOCKERRSTR ( errnoCpy ) );
}
}
}
/*
* caStartRepeaterIfNotInstalled ()
*
* Test for the repeater already installed
*
* NOTE: potential race condition here can result
* in two copies of the repeater being spawned
* however the repeater detects this, prints a message,
* and lets the other task start the repeater.
*
* QUESTION: is there a better way to test for a port in use?
* ANSWER: none that I can find.
*
* Problems with checking for the repeater installed
* by attempting to bind a socket to its address
* and port.
*
* 1) Closed socket may not release the bound port
* before the repeater wakes up and tries to grab it.
* Attempting to bind the open socket to another port
* also does not work.
*
* 072392 - problem solved by using SO_REUSEADDR
*/
epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort )
{
bool installed = false;
int status;
SOCKET tmpSock;
struct sockaddr_in bd;
int flag;
if ( repeaterPort > 0xffff ) {
ca_printf ( "caStartRepeaterIfNotInstalled () : strange repeater port specified\n");
return;
}
tmpSock = socket ( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
if ( tmpSock != INVALID_SOCKET ) {
ca_uint16_t port = static_cast < ca_uint16_t > ( repeaterPort );
memset ( (char *) &bd, 0, sizeof ( bd ) );
bd.sin_family = AF_INET;
bd.sin_addr.s_addr = htonl ( INADDR_ANY );
bd.sin_port = htons ( port );
status = bind ( tmpSock, (struct sockaddr *) &bd, sizeof ( bd ) );
if ( status < 0 ) {
if ( SOCKERRNO == SOCK_EADDRINUSE ) {
installed = true;
}
else {
ca_printf ( "caStartRepeaterIfNotInstalled () : bind failed\n");
}
}
}
/*
* turn on reuse only after the test so that
* this works on kernels that support multicast
*/
flag = true;
status = setsockopt ( tmpSock, SOL_SOCKET, SO_REUSEADDR,
(char *) &flag, sizeof ( flag ) );
if ( status < 0 ) {
ca_printf ( "caStartRepeaterIfNotInstalled () : set socket option reuseaddr set failed\n");
}
socket_close ( tmpSock );
if ( ! installed ) {
/*
* This is not called if the repeater is known to be
* already running. (in the event of a race condition
* the 2nd repeater exits when unable to attach to the
* repeater's port)
*/
osiSpawnDetachedProcessReturn osptr =
osiSpawnDetachedProcess ( "CA Repeater", "caRepeater" );
if ( osptr == osiSpawnDetachedProcessNoSupport ) {
epicsThreadId tid;
tid = epicsThreadCreate ( "CAC-repeater", epicsThreadPriorityLow,
epicsThreadGetStackSize (epicsThreadStackMedium), caRepeaterThread, 0);
if ( tid == 0 ) {
ca_printf ("caStartRepeaterIfNotInstalled : unable to create CA repeater daemon thread\n");
}
}
else if ( osptr == osiSpawnDetachedProcessFail ) {
ca_printf ( "caStartRepeaterIfNotInstalled (): unable to start CA repeater daemon detached process\n" );
}
}
}
void udpiiu::shutdown ()
{
{
epicsAutoMutex autoMutex ( this->mutex );
if ( this->shutdownCmd ) {
return;
}
this->shutdownCmd = true;
}
caHdr msg;
msg.m_cmmd = htons ( CA_PROTO_NOOP );
msg.m_available = htonl ( 0u );
msg.m_dataType = htons ( 0u );
msg.m_count = htons ( 0u );
msg.m_cid = htonl ( 0u );
msg.m_postsize = htons ( 0u );
osiSockAddr addr;
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
addr.ia.sin_port = htons ( this->localPort );
// send a wakeup msg so the UDP recv thread will exit
int status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ),
sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) );
if ( status < 0 ) {
// this knocks the UDP input thread out of recv ()
// on all os except linux
status = socket_close ( this->sock );
if ( status == 0 ) {
this->sockCloseCompleted = true;
}
else {
errlogPrintf ("CAC UDP socket close error was %s\n",
SOCKERRSTR ( SOCKERRNO ) );
}
}
// wait for recv threads to exit
epicsEventMustWait ( this->recvThreadExitSignal );
}
bool udpiiu::badUDPRespAction ( const caHdr &msg, const osiSockAddr &netAddr )
{
ca_printf ( "CAC: undecipherable ( bad msg code %u ) UDP message\n",
msg.m_cmmd );
return false;
}
bool udpiiu::noopAction ( const caHdr &, const osiSockAddr & )
{
return true;
}
bool udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr )
{
osiSockAddr serverAddr;
unsigned minorVersion;
ca_uint16_t *pMinorVersion;
if ( addr.sa.sa_family != AF_INET ) {
return;
return false;
}
/*
@@ -573,7 +535,7 @@ void udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr )
* so that we can have multiple servers on one host
*/
serverAddr.ia.sin_family = AF_INET;
if ( CA_V48 (CA_PROTOCOL_VERSION,minorVersion) ) {
if ( CA_V48 ( CA_PROTOCOL_VERSION,minorVersion ) ) {
if ( msg.m_cid != INADDR_BROADCAST ) {
/*
* Leave address in network byte order (m_cid has not been
@@ -596,23 +558,23 @@ void udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr )
}
if ( CA_V42 ( CA_PROTOCOL_VERSION, minorVersion ) ) {
this->pCAC ()->lookupChannelAndTransferToTCP
return this->pCAC ()->lookupChannelAndTransferToTCP
( msg.m_available, msg.m_cid, USHRT_MAX,
0, minorVersion, serverAddr );
}
else {
this->pCAC ()->lookupChannelAndTransferToTCP
return this->pCAC ()->lookupChannelAndTransferToTCP
( msg.m_available, msg.m_cid, msg.m_dataType,
msg.m_count, minorVersion, serverAddr );
}
}
void udpiiu::beaconAction ( const caHdr &msg, const osiSockAddr &net_addr )
bool udpiiu::beaconAction ( const caHdr &msg, const osiSockAddr &net_addr )
{
struct sockaddr_in ina;
if ( net_addr.sa.sa_family != AF_INET ) {
return;
return false;
}
/*
@@ -644,20 +606,21 @@ void udpiiu::beaconAction ( const caHdr &msg, const osiSockAddr &net_addr )
this->pCAC ()->beaconNotify ( ina );
return;
return true;
}
void udpiiu::repeaterAckAction ( const caHdr &, const osiSockAddr &)
bool udpiiu::repeaterAckAction ( const caHdr &, const osiSockAddr &)
{
this->pCAC ()->repeaterSubscribeConfirmNotify ();
return true;
}
void udpiiu::notHereRespAction ( const caHdr &, const osiSockAddr &)
bool udpiiu::notHereRespAction ( const caHdr &, const osiSockAddr &)
{
return;
return true;
}
void udpiiu::exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr )
bool udpiiu::exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr )
{
const caHdr &reqMsg = * ( &msg + 1 );
char name[64];
@@ -674,14 +637,10 @@ void udpiiu::exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr
ca_message ( htonl ( msg.m_available ) ), name );
}
return;
return true;
}
/*
* post_udp_msg ()
*/
int udpiiu::postMsg ( const osiSockAddr &net_addr,
void udpiiu::postMsg ( const osiSockAddr &net_addr,
char *pInBuf, unsigned long blockSize )
{
caHdr *pCurMsg;
@@ -690,10 +649,15 @@ int udpiiu::postMsg ( const osiSockAddr &net_addr,
unsigned long size;
if ( blockSize < sizeof ( *pCurMsg ) ) {
return ECA_TOLARGE;
char buf[64];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
ca_printf (
"%s: undecipherable (too small) UDP msg from %s ignored\n", __FILE__,
buf );
return;
}
pCurMsg = reinterpret_cast <caHdr *> ( pInBuf );
pCurMsg = reinterpret_cast < caHdr * > ( pInBuf );
/*
* fix endian of bytes
@@ -720,25 +684,35 @@ int udpiiu::postMsg ( const osiSockAddr &net_addr,
* dont allow msg body extending beyond frame boundary
*/
if ( size > blockSize ) {
return ECA_TOLARGE;
char buf[64];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
ca_printf (
"%s: undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__,
buf );
return;
}
/*
* execute the response message
*/
pProtoStubUDP pStub;
if ( pCurMsg->m_cmmd >= NELEMENTS ( udpJumpTableCAC ) ) {
pStub = &udpiiu::badUDPRespAction;
}
else {
pProtoStubUDP pStub;
if ( pCurMsg->m_cmmd < NELEMENTS ( udpJumpTableCAC ) ) {
pStub = udpJumpTableCAC [pCurMsg->m_cmmd];
}
(this->*pStub) ( *pCurMsg, net_addr);
else {
pStub = &udpiiu::badUDPRespAction;
}
bool success = ( this->*pStub ) ( *pCurMsg, net_addr );
if ( ! success ) {
char buf[256];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
ca_printf ( "CAC: undecipherable UDP message from %s\n", buf );
return;
}
blockSize -= size;
pInBuf += size;;
}
return ECA_NORMAL;
}
/*
@@ -767,7 +741,7 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e
pbufmsg = (caHdr *) &this->xmitBuf[this->nBytesInXmitBuf];
*pbufmsg = msg;
memcpy ( pbufmsg+1, pExt, extsize );
memcpy ( pbufmsg + 1, pExt, extsize );
if ( extsize != allignedExtSize ) {
char *pDest = (char *) ( pbufmsg + 1 );
memset ( pDest + extsize, '\0', allignedExtSize - extsize );