diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index c4612b612..75d7921c2 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -69,18 +69,12 @@ #include "caProto.h" #include "net_convert.h" -#ifndef FALSE -# define FALSE 0 -#elif FALSE -# error FALSE isnt boolean false +#ifdef DEBUG +# define debugPrintf(argsInParen) printf argsInParen +#else +# define debugPrintf(argsInParen) #endif - -#ifndef TRUE -# define TRUE 1 -#elif !TRUE -# error TRUE isnt boolean true -#endif - + #ifndef NELEMENTS # define NELEMENTS(array) (sizeof(array)/sizeof((array)[0])) #endif @@ -89,14 +83,6 @@ # define LOCAL static #endif -#ifndef min -# define min(A,B) ((A)>(B)?(B):(A)) -#endif - -#ifndef max -# define max(A,B) ((A)<(B)?(B):(A)) -#endif - #define MSEC_PER_SEC 1000L #define USEC_PER_SEC 1000000L @@ -126,7 +112,7 @@ public: unsigned unoccupiedBytes () const; unsigned occupiedBytes () const; void compress (); - static unsigned maxBytes (); + static unsigned capacityBytes (); unsigned copyInBytes ( const void *pBuf, unsigned nBytes ); unsigned copyIn ( comBuf & ); unsigned copyIn ( const epicsInt8 *pValue, unsigned nElem ); @@ -154,6 +140,7 @@ private: unsigned char buf [ comBufSize ]; // optimal for 100 Mb Ethernet LAN MTU unsigned clipNElem ( unsigned elemSize, unsigned nElem ); static tsFreeList < class comBuf, 0x20 > freeList; + static epicsMutex freeListMutex; }; struct msgDescriptor { @@ -209,6 +196,8 @@ private: static const copyFunc_t dbrCopyVector [39]; }; +static const unsigned maxBytesPendingTCP = 0x4000; + class comQueRecv { public: comQueRecv (); @@ -247,9 +236,9 @@ class baseNMIU; // class tcpiiuPrivateListOfIO { private: + tsDLList < class baseNMIU > eventq; friend tcpiiu; friend netiiu; // used to install subscriptions when not connected - tsDLList < class baseNMIU > eventq; }; class nciu : public cacChannelIO, public tsDLNode < nciu >, @@ -258,37 +247,13 @@ public: nciu ( class cac &, netiiu &, cacChannelNotify &, const char *pNameIn ); bool fullyConstructed () const; - void destroy (); void connect ( unsigned nativeType, unsigned long nativeCount, unsigned sid ); void connect (); void disconnect ( netiiu &newiiu ); - int createChannelRequest (); - void initiateConnect (); - int read ( unsigned type, - unsigned long count, void *pValue ); - int read ( unsigned type, - unsigned long count, cacNotify ¬ify ); - int write ( unsigned type, - unsigned long count, const void *pValue ); - int write ( unsigned type, - unsigned long count, const void *pValue, cacNotify & ); - int subscribe ( unsigned type, unsigned long nElem, - unsigned mask, cacNotify ¬ify, - cacNotifyIO *&pNotifyIO ); - void hostName ( char *pBuf, unsigned bufLength ) const; - bool ca_v42_ok () const; - short nativeType () const; - unsigned long nativeElementCount () const; - channel_state state () const; - caar accessRights () const; - const char *pName () const; - unsigned nameLen () const; - unsigned searchAttempts () const; - double beaconPeriod () const; - bool connected () const; bool searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel ); + int createChannelRequest (); bool isAttachedToVirtaulCircuit ( const osiSockAddr & ); bool identifierEquivelence ( unsigned idToMatch ); void * operator new ( size_t size ); @@ -299,20 +264,20 @@ public: ca_uint32_t getSID () const; ca_uint32_t getCID () const; netiiu * getPIIU (); - cac & getCAC (); void searchReplySetUp ( netiiu &iiu, unsigned sidIn, unsigned typeIn, unsigned long countIn ); void show ( unsigned level ) const; bool verifyIIU ( netiiu & ); bool verifyConnected ( netiiu & ); - void decrementOutstandingIO ( unsigned seqNumber ); - void incrementOutstandingIO ( unsigned seqNumber ); void connectTimeoutNotify (); + unsigned long nativeElementCount () const; + const char *pName () const; + unsigned nameLen () const; protected: ~nciu (); // force pool allocation private: cac &cacCtx; - caar ar; // access rights + caar accessRightState; unsigned count; char *pNameStr; netiiu *piiu; @@ -327,71 +292,73 @@ private: unsigned f_claimSent:1; unsigned f_firstConnectDecrementsOutstandingIO:1; unsigned f_connectTimeOutSeen:1; - void lock () const; - void unlock () const; + void initiateConnect (); + int read ( unsigned type, + unsigned long count, cacNotify ¬ify ); + int write ( unsigned type, + unsigned long count, const void *pValue ); + int write ( unsigned type, + unsigned long count, const void *pValue, cacNotify & ); + int subscribe ( unsigned type, unsigned long nElem, + unsigned mask, cacNotify ¬ify, + cacNotifyIO *&pNotifyIO ); + void hostName ( char *pBuf, unsigned bufLength ) const; + bool ca_v42_ok () const; + short nativeType () const; + channel_state state () const; + caar accessRights () const; + unsigned searchAttempts () const; + double beaconPeriod () const; + bool connected () const; const char * pHostName () const; // deprecated - please do not use void notifyStateChangeFirstConnectInCountOfOutstandingIO (); static tsFreeList < class nciu, 1024 > freeList; + static epicsMutex freeListMutex; }; -class baseNMIU : public tsDLNode < baseNMIU >, +class baseNMIU : public cacNotifyIO, public tsDLNode < baseNMIU >, public chronIntIdRes < baseNMIU > { public: - baseNMIU ( nciu &chan ); + baseNMIU ( cacNotify ¬ifyIn, nciu &chan ); virtual ~baseNMIU () = 0; - virtual void completionNotify () = 0; - virtual void completionNotify ( unsigned type, - unsigned long count, const void *pData ) = 0; - virtual void exceptionNotify ( int status, - const char *pContext ) = 0; - virtual void exceptionNotify ( int status, - const char *pContext, unsigned type, - unsigned long count ) = 0; virtual class netSubscription * isSubscription (); - virtual void show ( unsigned level ) const; + virtual void ioCancelRequest (); + void show ( unsigned level ) const; ca_uint32_t getID () const; nciu & channel () const; - void destroy (); + cacChannelIO & channelIO () const; + void cancel (); protected: nciu &chan; }; -class netSubscription : public cacNotifyIO, public baseNMIU { +class netSubscription : public baseNMIU { public: netSubscription ( nciu &chan, unsigned type, unsigned long count, unsigned mask, cacNotify ¬ify ); - void destroy (); void show ( unsigned level ) const; unsigned long getCount () const; unsigned getType () const; unsigned getMask () const; void * operator new ( size_t size ); void operator delete ( void *pCadaver, size_t size ); - protected: ~netSubscription (); - private: - unsigned long count; - unsigned type; - unsigned mask; - - void completionNotify (); - void completionNotify ( unsigned type, - unsigned long count, const void *pData ); - void exceptionNotify ( int status, - const char *pContext ); - void exceptionNotify ( int status, - const char *pContext, unsigned type, unsigned long count ); - cacChannelIO & channelIO () const; + const unsigned long count; + const unsigned type; + const unsigned mask; class netSubscription * isSubscription (); + void ioCancelRequest (); static tsFreeList < class netSubscription, 1024 > freeList; + static epicsMutex freeListMutex; }; +#if 0 class netReadCopyIO : public baseNMIU { public: netReadCopyIO ( nciu &chan, unsigned type, unsigned long count, - void *pValue, unsigned seqNumber ); + void *pValue, unsigned seqNumber, cacNotify ¬ifyIn ); void show ( unsigned level ) const; void * operator new ( size_t size ); void operator delete ( void *pCadaver, size_t size ); @@ -402,54 +369,35 @@ private: unsigned long count; void *pValue; unsigned seqNumber; - void completionNotify ( ); - void completionNotify ( unsigned type, - unsigned long count, const void *pData ); - void exceptionNotify ( int status, const char *pContext ); - void exceptionNotify ( int status, - const char *pContext, unsigned type, unsigned long count ); - cacChannelIO & channelIO () const; static tsFreeList < class netReadCopyIO, 1024 > freeList; + static epicsMutex freeListMutex; }; +#endif -class netReadNotifyIO : public cacNotifyIO, public baseNMIU { +class netReadNotifyIO : public baseNMIU { public: netReadNotifyIO ( nciu &chan, cacNotify ¬ify ); - void destroy (); void show ( unsigned level ) const; void * operator new ( size_t size ); void operator delete ( void *pCadaver, size_t size ); protected: ~netReadNotifyIO (); private: - void completionNotify (); - void completionNotify ( unsigned type, - unsigned long count, const void *pData ); - void exceptionNotify ( int status, const char *pContext ); - void exceptionNotify ( int status, - const char *pContext, unsigned type, unsigned long count ); - cacChannelIO & channelIO () const; static tsFreeList < class netReadNotifyIO, 1024 > freeList; + static epicsMutex freeListMutex; }; -class netWriteNotifyIO : public cacNotifyIO, public baseNMIU { +class netWriteNotifyIO : public baseNMIU { public: netWriteNotifyIO ( nciu &chan, cacNotify ¬ify ); - void destroy (); void show ( unsigned level ) const; void * operator new ( size_t size ); void operator delete ( void *pCadaver, size_t size ); protected: ~netWriteNotifyIO (); private: - void completionNotify (); - void completionNotify ( unsigned type, - unsigned long count, const void *pData ); - void exceptionNotify ( int status, const char *pContext ); - void exceptionNotify ( int status, - const char *pContext, unsigned type, unsigned long count ); - cacChannelIO & channelIO () const; static tsFreeList < class netWriteNotifyIO, 1024 > freeList; + static epicsMutex freeListMutex; }; /* @@ -466,10 +414,10 @@ private: #define CA_RECAST_PERIOD 5.0 /* quiescent search period (sec) */ #if defined (CLOCKS_PER_SEC) -#define CAC_SIGNIFICANT_SELECT_DELAY ( 1.0 / CLOCKS_PER_SEC ) +# define CAC_SIGNIFICANT_SELECT_DELAY ( 1.0 / CLOCKS_PER_SEC ) #else /* on sunos4 GNU does not provide CLOCKS_PER_SEC */ -#define CAC_SIGNIFICANT_SELECT_DELAY (1.0 / 1000000u) +# define CAC_SIGNIFICANT_SELECT_DELAY (1.0 / 1000000u) #endif /* @@ -498,8 +446,6 @@ static const unsigned contiguousMsgCountWhichTriggersFlowControl = 10u; enum iiu_conn_state {iiu_connecting, iiu_connected, iiu_disconnected}; -extern epicsThreadPrivateId cacRecursionLock; - class netiiu { public: netiiu ( class cac * ); @@ -521,7 +467,6 @@ public: virtual bool pushDatagramMsg ( const caHdr &hdr, const void *pExt, ca_uint16_t extsize); virtual int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue); virtual int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue ); - virtual int readCopyRequest ( nciu &, unsigned type, unsigned nElem, void *pValue ); virtual int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem ); virtual int createChannelRequest ( nciu & ); virtual void connectAllIO ( nciu &chan ); @@ -531,15 +476,12 @@ public: virtual void subscriptionCancelRequest ( netSubscription &subscr, bool userThread ); virtual double beaconPeriod () const; virtual bool destroyAllIO ( nciu &chan ); - protected: cac * pCAC () const; mutable epicsMutex mutex; - private: tsDLList < nciu > channelList; class cac *pClientCtx; - virtual void lastChannelDetachNotify (); virtual int subscriptionRequest ( netSubscription &subscr, bool userThread ); }; @@ -553,22 +495,14 @@ extern limboiiu limboIIU; class udpiiu; -class searchTimer : private osiTimer, private epicsMutex { - +class searchTimer : private osiTimer { public: searchTimer ( udpiiu &iiu, osiTimerQueue &queue ); void notifySearchResponse ( unsigned short retrySeqNo ); void resetPeriod ( double delayToNextTry ); void show ( unsigned level ) const; private: - void expire (); - void destroy (); - bool again () const; - double delay () const; - const char *name () const; - - void setRetryInterval (unsigned retryNo); - + epicsMutex mutex; udpiiu &iiu; unsigned framesPerTry; /* # of UDP frames per search try */ unsigned framesPerTryCongestThresh; /* one half N tries w congest */ @@ -579,6 +513,12 @@ private: unsigned short retrySeqNo; /* search retry seq number */ unsigned short retrySeqAtPassBegin; /* search retry seq number at beg of pass through list */ double period; /* period between tries */ + void expire (); + void destroy (); + bool again () const; + double delay () const; + const char *name () const; + void setRetryInterval (unsigned retryNo); }; class repeaterSubscribeTimer : private osiTimer { @@ -611,7 +551,7 @@ public: virtual ~udpiiu (); void shutdown (); void recvMsg (); - int postMsg ( const osiSockAddr &net_addr, + void postMsg ( const osiSockAddr &net_addr, char *pInBuf, unsigned long blockSize ); void repeaterRegistrationMessage ( unsigned attemptNumber ); void flush (); @@ -640,19 +580,19 @@ private: friend void cacRecvThreadUDP ( void *pParam ); - typedef void (udpiiu::*pProtoStubUDP) ( const caHdr &, const osiSockAddr & ); + typedef bool (udpiiu::*pProtoStubUDP) ( const caHdr &, const osiSockAddr & ); // UDP protocol dispatch table static const pProtoStubUDP udpJumpTableCAC[]; // UDP protocol stubs - void noopAction ( const caHdr &, const osiSockAddr & ); - void badUDPRespAction ( const caHdr &msg, const osiSockAddr &netAddr ); - void searchRespAction ( const caHdr &msg, const osiSockAddr &net_addr ); - void exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr ); - void beaconAction ( const caHdr &msg, const osiSockAddr &net_addr ); - void notHereRespAction ( const caHdr &msg, const osiSockAddr &net_addr ); - void repeaterAckAction ( const caHdr &msg, const osiSockAddr &net_addr ); + bool noopAction ( const caHdr &, const osiSockAddr & ); + bool badUDPRespAction ( const caHdr &msg, const osiSockAddr &netAddr ); + bool searchRespAction ( const caHdr &msg, const osiSockAddr &net_addr ); + bool exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr ); + bool beaconAction ( const caHdr &msg, const osiSockAddr &net_addr ); + bool notHereRespAction ( const caHdr &msg, const osiSockAddr &net_addr ); + bool repeaterAckAction ( const caHdr &msg, const osiSockAddr &net_addr ); }; class tcpRecvWatchdog : private osiTimer { @@ -712,6 +652,7 @@ private: cac &cacRef; static tsFreeList < class msgForMultiplyDefinedPV, 16 > freeList; + static epicsMutex freeListMutex; }; class hostNameCache : public ipAddrToAsciiAsynchronous { @@ -728,6 +669,7 @@ private: bool ioComplete; char hostNameBuf [128]; static tsFreeList < class hostNameCache, 16 > freeList; + static epicsMutex freeListMutex; }; extern "C" void cacSendThreadTCP ( void *pParam ); @@ -759,12 +701,6 @@ public: bool ca_v41_ok () const; bool ca_v42_ok () const; bool ca_v44_ok () const; - int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue ); - int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue ); - int readCopyRequest ( nciu &, unsigned type, unsigned nElem, void *pValue ); - int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem ); - int createChannelRequest ( nciu & ); - int clearChannelRequest ( nciu & ); void hostName ( char *pBuf, unsigned bufLength ) const; const char * pHostName () const; // deprecated - please do not use @@ -784,7 +720,7 @@ private: caHdr curMsg; unsigned long curDataMax; class bhe *pBHE; - void *pCurData; + char *pCurData; unsigned minorProtocolVersion; iiu_conn_state state; epicsEventId sendThreadFlushSignal; @@ -799,6 +735,7 @@ private: bool echoRequestPending; bool msgHeaderAvailable; bool sockCloseCompleted; + bool fdRegCallbackNeeded; unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ); unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf ); @@ -809,7 +746,6 @@ private: friend void cacRecvThreadTCP ( void *pParam ); void lastChannelDetachNotify (); - int requestStubStatus (); // send protocol stubs int echoRequest (); @@ -818,37 +754,39 @@ private: int enableFlowControlRequest (); int hostNameSetRequest (); int userNameSetRequest (); + int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue ); + int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue ); + int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem ); + int createChannelRequest ( nciu & ); + int clearChannelRequest ( nciu & ); // recv protocol stubs - void noopAction (); - void echoRespAction (); - void writeNotifyRespAction (); - void readNotifyRespAction (); - void eventRespAction (); - void readRespAction (); - void clearChannelRespAction (); - void exceptionRespAction (); - void accessRightsRespAction (); - void claimCIURespAction (); - void verifyAndDisconnectChan (); - void badTCPRespAction (); + bool noopAction (); + bool echoRespAction (); + bool writeNotifyRespAction (); + bool readNotifyRespAction (); + bool eventRespAction (); + bool readRespAction (); + bool clearChannelRespAction (); + bool exceptionRespAction (); + bool accessRightsRespAction (); + bool claimCIURespAction (); + bool verifyAndDisconnectChan (); + bool badTCPRespAction (); // IO management routines - //void ioInstall ( baseNMIU &io ); - //void ioUninstall ( unsigned id ); - //void ioDestroy ( unsigned id ); - void ioCompletionNotify ( unsigned id, unsigned type, + bool ioCompletionNotify ( unsigned id, unsigned type, unsigned long count, const void *pData ); - void ioExceptionNotify ( unsigned id, + bool ioExceptionNotify ( unsigned id, int status, const char *pContext ); - void ioExceptionNotify ( unsigned id, int status, + bool ioExceptionNotify ( unsigned id, int status, const char *pContext, unsigned type, unsigned long count ); - void ioCompletionNotifyAndDestroy ( unsigned id ); - void ioCompletionNotifyAndDestroy ( unsigned id, + bool ioCompletionNotifyAndDestroy ( unsigned id ); + bool ioCompletionNotifyAndDestroy ( unsigned id, unsigned type, unsigned long count, const void *pData ); - void ioExceptionNotifyAndDestroy ( unsigned id, + bool ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext ); - void ioExceptionNotifyAndDestroy ( unsigned id, + bool ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext, unsigned type, unsigned long count ); void connectAllIO ( nciu &chan ); void disconnectAllIO ( nciu &chan ); @@ -858,7 +796,7 @@ private: int subscriptionRequest ( netSubscription &subscr, bool userThread ); void subscriptionCancelRequest ( netSubscription &subscr, bool userThread ); - typedef void ( tcpiiu::*pProtoStubTCP ) (); + typedef bool ( tcpiiu::*pProtoStubTCP ) (); static const pProtoStubTCP tcpJumpTableCAC []; }; @@ -870,7 +808,7 @@ public: static unsigned maxIndexBitWidth (); static unsigned minIndexBitWidth (); private: - struct sockaddr_in addr; + const struct sockaddr_in addr; }; class bhe : public tsSLNode < bhe >, public inetAddrID { @@ -891,6 +829,7 @@ private: epicsTime timeStamp; double averagePeriod; static tsFreeList < class bhe, 1024 > freeList; + static epicsMutex freeListMutex; }; class caErrorCode { @@ -969,6 +908,7 @@ private: void exceptionNotify ( cacChannelIO &, int status, const char *pContext, unsigned type, unsigned long count ); static tsFreeList < class syncGroupNotify, 1024 > freeList; + static epicsMutex freeListMutex; }; /* @@ -985,37 +925,33 @@ public: void show ( unsigned level ) const; int get ( chid pChan, unsigned type, unsigned long count, void *pValue ); int put ( chid pChan, unsigned type, unsigned long count, const void *pValue ); - - void * operator new (size_t size); - void operator delete (void *pCadaver, size_t size); - + void * operator new ( size_t size ); + void operator delete ( void *pCadaver, size_t size ); private: - cac &client; - unsigned magic; + epicsMutex mutable mutex; + epicsEvent sem; + tsDLList < syncGroupNotify > ioList; unsigned long opPendCount; unsigned long seqNo; - epicsEvent sem; - epicsMutex mutex; - tsDLList ioList; - - static tsFreeList < struct CASG, 128 > freeList; - + cac &client; + unsigned magic; ~CASG (); + static tsFreeList < struct CASG, 128 > freeList; + static epicsMutex freeListMutex; friend class syncGroupNotify; }; -class ioCounter { +class ioCounterNet { public: - ioCounter (); - void decrementOutstandingIO (); - void decrementOutstandingIO ( unsigned seqNumber ); - void incrementOutstandingIO (); - void incrementOutstandingIO ( unsigned seqNumber ); - unsigned readSequenceOfOutstandingIO () const; - unsigned currentOutstandingIOCount () const; - void cleanUpOutstandingIO (); - void showOutstandingIO ( unsigned level ) const; - void waitForCompletionOfIO ( double delaySec ); + ioCounterNet (); + void increment (); + void decrement (); + void decrement ( unsigned seqNumber ); + unsigned sequenceNumber () const; + unsigned currentCount () const; + void cleanUp (); + void show ( unsigned level ) const; + void waitForCompletion ( double delaySec ); private: unsigned pndrecvcnt; unsigned readSeq; @@ -1040,15 +976,11 @@ private: // w/o taking the defaultMutex in this class first // // -class cac : public caClient, public nciuPrivate, - public ioCounter +class cac : public caClient, public nciuPrivate { public: cac ( bool enablePreemptiveCallback = false ); virtual ~cac (); - void flush (); - int pend ( double timeout, int early ); - unsigned getInitializingThreadsPriority () const; // beacon management void beaconNotify ( const inetAddrID &addr ); @@ -1058,9 +990,19 @@ public: void signalRecvActivity (); void processRecvBacklog (); - // IO management routines + // outstanding IO count management routines + void incrementOutstandingIO (); + void decrementOutstandingIO (); + void decrementOutstandingIO ( unsigned sequenceNo ); + unsigned sequenceNumberOfOutstandingIO () const; bool ioComplete () const; + // IO management + void flush (); + bool flushPermit () const; + int pendIO ( const double &timeout ); + int pendEvent ( const double &timeout ); + // exception routines void exceptionNotify ( int status, const char *pContext, const char *pFileName, unsigned lineNo ); @@ -1071,29 +1013,31 @@ public: void genLocalExcepWFL ( long stat, const char *ctx, const char *pFile, unsigned lineNo ); // channel routines - void connectChannel ( unsigned id ); - void connectChannel ( bool v44Ok, unsigned id, + bool connectChannel ( unsigned id ); + bool connectChannel ( bool v44Ok, unsigned id, unsigned nativeType, unsigned long nativeCount, unsigned sid ); void disconnectChannel ( unsigned id ); - cacChannelIO * createChannelIO ( const char *name_str, cacChannelNotify &chan ); void installNetworkChannel ( nciu &, netiiu *&piiu ); - void lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, + bool lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, unsigned typeCode, unsigned long count, unsigned minorVersionNumber, const osiSockAddr & ); void accessRightsNotify ( unsigned id, const caar & ); void uninstallChannel ( nciu & ); + cacChannelIO * createChannelIO ( const char *name_str, cacChannelNotify &chan ); + void registerService ( cacServiceIO &service ); // sync group routines CASG * lookupCASG ( unsigned id ); void installCASG ( CASG & ); void uninstallCASG ( CASG & ); - void registerService ( cacServiceIO &service ); + // fd call back registration void registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg ); + void getFDRegCallback ( CAFDHANDLER *&fdRegFunc, void *&fdRegArg ) const; + + // callback preemption control void enableCallbackPreemption (); void disableCallbackPreemption (); - bool flushPermit () const; - const char * userNamePointer () const; // diagnostics unsigned connectionCount () const; @@ -1103,9 +1047,12 @@ public: void replaceErrLogHandler ( caPrintfFunc *ca_printf_func ); void ipAddrToAsciiAsynchronousRequestInstall ( ipAddrToAsciiAsynchronous & request ); - void getFDRegCallback ( CAFDHANDLER *&fdRegFunc, void *&fdRegArg ) const; + // misc + const char * userNamePointer () const; + unsigned getInitializingThreadsPriority () const; private: + ioCounterNet ioCounter; ipAddrToAsciiEngine ipToAEngine; cacServiceList services; tsDLList iiuList; @@ -1116,7 +1063,7 @@ private: < CASG > sgTable; resTable < bhe, inetAddrID > beaconTable; - epicsTime programBeginTime; + epicsTime programBeginTime; double connTMO; // defaultMutex can be applied if iiuListMutex is already applied mutable epicsMutex defaultMutex; @@ -1135,23 +1082,21 @@ private: repeaterSubscribeTimer *pRepeaterSubscribeTmr; unsigned initializingThreadsPriority; bool enablePreemptiveCallback; - - int pendPrivate ( double timeout, int early ); bool setupUDP (); }; /* * CA internal functions */ -int ca_printf (const char *pformat, ...); -int ca_vPrintf (const char *pformat, va_list args); -epicsShareFunc void epicsShareAPI ca_repeater (void); +int ca_printf ( const char *pformat, ... ); +int ca_vPrintf ( const char *pformat, va_list args ); +epicsShareFunc void epicsShareAPI ca_repeater ( void ); #define genLocalExcep( CAC, STAT, PCTX ) \ (CAC).genLocalExcepWFL ( STAT, PCTX, __FILE__, __LINE__ ) -int fetchClientContext (cac **ppcac); -extern "C" void caRepeaterThread (void *pDummy); -extern "C" void ca_default_exception_handler (struct exception_handler_args args); +int fetchClientContext ( cac **ppcac ); +extern "C" void caRepeaterThread ( void *pDummy ); +extern "C" void ca_default_exception_handler ( struct exception_handler_args args ); -#endif /* this must be the last line in this file */ +#endif // ifdef INCiocinfh diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index 2d7711056..8efed0ce9 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -22,14 +22,13 @@ #include "iocinf.h" #include "nciu_IL.h" -#include "netReadCopyIO_IL.h" #include "netReadNotifyIO_IL.h" #include "netWriteNotifyIO_IL.h" #include "netSubscription_IL.h" #include "cac_IL.h" -#include "ioCounter_IL.h" tsFreeList < class nciu, 1024 > nciu::freeList; +epicsMutex nciu::freeListMutex; static const caar defaultAccessRights = { false, false }; @@ -37,7 +36,7 @@ nciu::nciu ( cac &cacIn, netiiu &iiuIn, cacChannelNotify &chanIn, const char *pNameIn ) : cacChannelIO ( chanIn ), cacCtx ( cacIn ), - ar ( defaultAccessRights ), + accessRightState ( defaultAccessRights ), count ( 0 ), piiu ( &iiuIn ), sid ( UINT_MAX ), @@ -65,8 +64,12 @@ nciu::nciu ( cac &cacIn, netiiu &iiuIn, cacChannelNotify &chanIn, strcpy ( this->pNameStr, pNameIn ); } -void nciu::destroy () +nciu::~nciu () { + if ( ! this->fullyConstructed () ) { + return; + } + // care is taken so that a lock is not applied during this phase unsigned i = 0u; while ( ! this->piiu->destroyAllIO ( *this ) ) { @@ -78,14 +81,6 @@ void nciu::destroy () } this->piiu->clearChannelRequest ( *this ); this->cacCtx.uninstallChannel ( *this ); - delete this; -} - -nciu::~nciu () -{ - if ( ! this->fullyConstructed () ) { - return; - } if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) { if ( this->f_firstConnectDecrementsOutstandingIO ) { @@ -107,7 +102,7 @@ int nciu::read ( unsigned type, unsigned long countIn, cacNotify ¬ify ) if ( INVALID_DB_REQ (type) ) { return ECA_BADTYPE; } - if ( ! this->ar.read_access ) { + if ( ! this->accessRightState.read_access ) { return ECA_NORDACCESS; } if ( countIn > UINT_MAX ) { @@ -120,31 +115,6 @@ int nciu::read ( unsigned type, unsigned long countIn, cacNotify ¬ify ) return this->piiu->readNotifyRequest ( *this, notify, type, countIn ); } -int nciu::read ( unsigned type, unsigned long countIn, void *pValue ) -{ - /* - * fail out if channel isnt connected or arguments are - * otherwise invalid - */ - if ( ! this->f_connected ) { - return ECA_DISCONNCHID; - } - if ( INVALID_DB_REQ ( type ) ) { - return ECA_BADTYPE; - } - if ( ! this->ar.read_access ) { - return ECA_NORDACCESS; - } - if ( countIn > this->count || countIn > 0xffff ) { - return ECA_BADCOUNT; - } - if ( countIn == 0 ) { - countIn = this->count; - } - - return this->piiu->readCopyRequest ( *this, type, countIn, pValue ); -} - /* * check_a_dbr_string() */ @@ -170,7 +140,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue ) return ECA_DISCONNCHID; } - if ( ! this->ar.write_access ) { + if ( ! this->accessRightState.write_access ) { return ECA_NOWTACCESS; } @@ -195,7 +165,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue, cacN return ECA_DISCONNCHID; } - if ( ! this->ar.write_access ) { + if ( ! this->accessRightState.write_access ) { return ECA_NOWTACCESS; } @@ -222,53 +192,54 @@ void nciu::initiateConnect () void nciu::connect ( unsigned nativeType, unsigned long nativeCount, unsigned sidIn ) { - if ( ! this->f_claimSent ) { - ca_printf ( - "CAC: Ignored conn resp to chan lacking virtual circuit CID=%u SID=%u?\n", - this->getId (), sidIn ); - return; - } + bool v41Ok; + { + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - if ( this->f_connected ) { - ca_printf ( - "CAC: Ignored conn resp to conn chan CID=%u SID=%u?\n", - this->getId (), sidIn ); - return; - } + if ( ! this->f_claimSent ) { + ca_printf ( + "CAC: Ignored conn resp to chan lacking virtual circuit CID=%u SID=%u?\n", + this->getId (), sidIn ); + return; + } - this->lock (); + if ( this->f_connected ) { + ca_printf ( + "CAC: Ignored conn resp to conn chan CID=%u SID=%u?\n", + this->getId (), sidIn ); + return; + } - if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) { - if ( this->f_firstConnectDecrementsOutstandingIO ) { - this->cacCtx.decrementOutstandingIO (); + + if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) { + if ( this->f_firstConnectDecrementsOutstandingIO ) { + this->cacCtx.decrementOutstandingIO (); + } + } + + if ( this->piiu ) { + v41Ok = this->piiu->ca_v41_ok (); + } + else { + v41Ok = false; + } + this->typeCode = nativeType; + this->count = nativeCount; + this->sid = sidIn; + this->f_connected = true; + this->f_previousConn = true; + + /* + * if less than v4.1 then the server will never + * send access rights and we know that there + * will always be access + */ + if ( ! v41Ok ) { + this->accessRightState.read_access = true; + this->accessRightState.write_access = true; } } - bool v41Ok; - if ( this->piiu ) { - v41Ok = this->piiu->ca_v41_ok (); - } - else { - v41Ok = false; - } - this->typeCode = nativeType; - this->count = nativeCount; - this->sid = sidIn; - this->f_connected = true; - this->f_previousConn = true; - - /* - * if less than v4.1 then the server will never - * send access rights and we know that there - * will always be access - */ - if ( ! v41Ok ) { - this->ar.read_access = true; - this->ar.write_access = true; - } - - this->unlock (); - // resubscribe for monitors from this channel this->piiu->connectAllIO ( *this ); @@ -281,54 +252,51 @@ void nciu::connect ( unsigned nativeType, * their call back here */ if ( ! v41Ok ) { - this->notify ().accessRightsNotify ( *this, this->ar ); + this->notify ().accessRightsNotify ( *this, this->accessRightState ); } } void nciu::connectTimeoutNotify () { if ( ! this->f_connectTimeOutSeen ) { - this->lock (); + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); this->f_connectTimeOutSeen = true; - this->unlock (); } } void nciu::disconnect ( netiiu &newiiu ) { - char hostNameBuf[64]; - this->hostName ( hostNameBuf, sizeof ( hostNameBuf ) ); bool wasConnected; this->piiu->disconnectAllIO ( *this ); - this->lock (); + { + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - this->piiu = &newiiu; - this->retry = 0u; - this->typeCode = USHRT_MAX; - this->count = 0u; - this->sid = UINT_MAX; - this->ar.read_access = false; - this->ar.write_access = false; - this->f_claimSent = false; + this->piiu = &newiiu; + this->retry = 0u; + this->typeCode = USHRT_MAX; + this->count = 0u; + this->sid = UINT_MAX; + this->accessRightState.read_access = false; + this->accessRightState.write_access = false; + this->f_claimSent = false; - if ( this->f_connected ) { - wasConnected = true; + if ( this->f_connected ) { + wasConnected = true; + } + else { + wasConnected = false; + } + this->f_connected = false; } - else { - wasConnected = false; - } - this->f_connected = false; - - this->unlock (); if ( wasConnected ) { /* * look for events that have an event cancel in progress */ this->notify ().disconnectNotify ( *this ); - this->notify ().accessRightsNotify ( *this, this->ar ); + this->notify ().accessRightsNotify ( *this, this->accessRightState ); } this->resetRetryCount (); @@ -340,7 +308,7 @@ void nciu::disconnect ( netiiu &newiiu ) bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel ) { caHdr msg; - bool status; + bool success; msg.m_cmmd = htons ( CA_PROTO_SEARCH ); msg.m_available = this->getId (); @@ -348,26 +316,22 @@ bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisCh msg.m_count = htons ( CA_MINOR_VERSION ); msg.m_cid = this->getId (); - status = this->piiu->pushDatagramMsg ( msg, + success = this->piiu->pushDatagramMsg ( msg, this->pNameStr, this->nameLength ); - if ( status ) { + if ( success ) { // // increment the number of times we have tried // to find this channel // - this->lock (); - + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); if ( this->retry < MAXCONNTRIES ) { this->retry++; } - this->retrySeqNo = retrySeqNumber; retryNoForThisChannel = this->retry; + } - this->unlock (); - } - - return status; + return success; } void nciu::hostName ( char *pBuf, unsigned bufLength ) const @@ -378,39 +342,30 @@ void nciu::hostName ( char *pBuf, unsigned bufLength ) const // deprecated - please do not use, this is _not_ thread safe const char * nciu::pHostName () const { - this->lock (); - const char *pName = this->piiu->pHostName (); - this->unlock (); - return pName; // ouch ! + return this->piiu->pHostName (); // ouch ! } bool nciu::ca_v42_ok () const { - bool status; - - this->lock (); - if ( this->piiu ) { - status = this->piiu->ca_v42_ok (); - } - else { - status = false; - } - this->unlock (); - return status; + return this->piiu->ca_v42_ok (); } short nciu::nativeType () const { short type; - this->lock (); + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); if ( this->f_connected ) { - type = static_cast ( this->typeCode ); + if ( this->typeCode < SHRT_MAX ) { + type = static_cast ( this->typeCode ); + } + else { + type = TYPENOTCONN; + } } else { type = TYPENOTCONN; } - this->unlock (); return type; } @@ -418,25 +373,23 @@ short nciu::nativeType () const unsigned long nciu::nativeElementCount () const { unsigned long countOut; + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - this->lock (); if ( this->f_connected ) { countOut = this->count; } else { countOut = 0ul; } - this->unlock (); return countOut; } channel_state nciu::state () const { + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); channel_state stateOut; - this->lock (); - if ( this->f_connected ) { stateOut = cs_conn; } @@ -447,14 +400,14 @@ channel_state nciu::state () const stateOut = cs_never_conn; } - this->unlock (); - return stateOut; } caar nciu::accessRights () const { - return this->ar; + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); + caar tmp = this->accessRightState; + return tmp; } const char *nciu::pName () const @@ -481,6 +434,7 @@ int nciu::createChannelRequest () { int status = this->piiu->createChannelRequest ( *this ); if ( status == ECA_NORMAL ) { + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); this->f_claimSent = true; } return status; @@ -495,7 +449,7 @@ int nciu::subscribe ( unsigned type, unsigned long nElem, if ( pSubcr ) { int status = this->piiu->installSubscription ( *pSubcr ); if ( status != ECA_NORMAL ) { - pSubcr->destroy (); + delete static_cast < baseNMIU * > ( pSubcr ); } else { pNotifyIO = pSubcr; @@ -509,7 +463,7 @@ int nciu::subscribe ( unsigned type, unsigned long nElem, void nciu::notifyStateChangeFirstConnectInCountOfOutstandingIO () { - this->lock (); + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); // test is performed via a callback so that locking is correct if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) { if ( this->notify ().includeFirstConnectInCountOfOutstandingIO () ) { @@ -525,7 +479,6 @@ void nciu::notifyStateChangeFirstConnectInCountOfOutstandingIO () } } } - this->unlock (); } void nciu::show ( unsigned level ) const @@ -539,8 +492,8 @@ void nciu::show ( unsigned level ) const printf ( ", native type %s, native element count %u", dbf_type_to_text ( this->typeCode ), this->count ); printf ( ", %sread access, %swrite access", - this->ar.read_access ? "" : "no ", - this->ar.write_access ? "" : "no "); + this->accessRightState.read_access ? "" : "no ", + this->accessRightState.write_access ? "" : "no "); } printf ( "\n" ); } diff --git a/src/ca/nciu_IL.h b/src/ca/nciu_IL.h index c9a2c8ca8..aae46f2db 100644 --- a/src/ca/nciu_IL.h +++ b/src/ca/nciu_IL.h @@ -20,28 +20,18 @@ // nciu inline member functions // -#include "ioCounter_IL.h" - inline void * nciu::operator new ( size_t size ) { + epicsAutoMutex locker ( nciu::freeListMutex ); return nciu::freeList.allocate ( size ); } inline void nciu::operator delete ( void *pCadaver, size_t size ) { + epicsAutoMutex locker ( nciu::freeListMutex ); nciu::freeList.release ( pCadaver, size ); } -inline void nciu::lock () const -{ - this->cacCtx.nciuPrivate::mutex.lock (); -} - -inline void nciu::unlock () const -{ - this->cacCtx.nciuPrivate::mutex.unlock (); -} - inline bool nciu::fullyConstructed () const { return this->f_fullyConstructed; @@ -59,8 +49,11 @@ inline void nciu::resetRetryCount () inline void nciu::accessRightsStateChange ( const caar &arIn ) { - this->ar = arIn; - this->notify ().accessRightsNotify ( *this, this->ar ); + { + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); + this->accessRightState = arIn; + } + this->notify ().accessRightsNotify ( *this, arIn ); } inline ca_uint32_t nciu::getSID () const @@ -104,14 +97,13 @@ inline void nciu::connect () inline void nciu::searchReplySetUp ( netiiu &iiu, unsigned sidIn, unsigned typeIn, unsigned long countIn ) { - this->lock (); + epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); this->piiu = &iiu; this->typeCode = typeIn; this->count = countIn; this->sid = sidIn; - this->ar.read_access = true; - this->ar.write_access = true; - this->unlock (); + this->accessRightState.read_access = true; + this->accessRightState.write_access = true; } inline bool nciu::connected () const @@ -129,19 +121,4 @@ inline netiiu * nciu::getPIIU () return this->piiu; } -inline cac & nciu::getCAC () -{ - return this->cacCtx; -} - - -inline void nciu::incrementOutstandingIO ( unsigned seqNumber ) -{ - this->cacCtx.incrementOutstandingIO ( seqNumber ); -} - -inline void nciu::decrementOutstandingIO ( unsigned seqNumber ) -{ - this->cacCtx.decrementOutstandingIO ( seqNumber ); -} diff --git a/src/ca/netReadNotifyIO.cpp b/src/ca/netReadNotifyIO.cpp index f4d077976..5fe76f19c 100644 --- a/src/ca/netReadNotifyIO.cpp +++ b/src/ca/netReadNotifyIO.cpp @@ -16,50 +16,21 @@ #include "baseNMIU_IL.h" tsFreeList < class netReadNotifyIO, 1024 > netReadNotifyIO::freeList; +epicsMutex netReadNotifyIO::freeListMutex; netReadNotifyIO::netReadNotifyIO ( nciu &chan, cacNotify ¬ifyIn ) : - cacNotifyIO ( notifyIn ), baseNMIU ( chan ) {} + baseNMIU ( notifyIn, chan ) +{ +} netReadNotifyIO::~netReadNotifyIO () { } -void netReadNotifyIO::destroy () -{ - delete this; -} - -void netReadNotifyIO::completionNotify () -{ - this->notify ().exceptionNotify ( this->channelIO (), ECA_INTERNAL, "no data returned ?" ); -} - -void netReadNotifyIO::completionNotify ( unsigned type, - unsigned long count, const void *pData ) -{ - this->notify ().completionNotify ( this->channelIO (), type, count, pData ); -} - -void netReadNotifyIO::exceptionNotify ( int status, const char *pContext ) -{ - this->notify ().exceptionNotify ( this->channelIO (), status, pContext ); -} - -void netReadNotifyIO::exceptionNotify ( int status, const char *pContext, - unsigned type, unsigned long count ) -{ - this->notify ().exceptionNotify ( this->channelIO (), status, pContext, type ,count ); -} - -cacChannelIO & netReadNotifyIO::channelIO () const -{ - return this->channel (); -} - void netReadNotifyIO::show ( unsigned level ) const { printf ( "read notify IO at %p\n", - static_cast ( this ) ); + static_cast < const void * > ( this ) ); if ( level > 0u ) { this->baseNMIU::show ( level - 1u ); } diff --git a/src/ca/netSubscription.cpp b/src/ca/netSubscription.cpp index b26858c71..7139bf454 100644 --- a/src/ca/netSubscription.cpp +++ b/src/ca/netSubscription.cpp @@ -16,31 +16,17 @@ #include "baseNMIU_IL.h" tsFreeList < class netSubscription, 1024 > netSubscription::freeList; +epicsMutex netSubscription::freeListMutex; netSubscription::netSubscription ( nciu &chan, unsigned typeIn, unsigned long countIn, unsigned maskIn, cacNotify ¬ifyIn ) : - cacNotifyIO ( notifyIn ), baseNMIU ( chan ), + baseNMIU ( notifyIn, chan ), count ( countIn ), type ( typeIn ), mask ( maskIn ) { } netSubscription::~netSubscription () { - // netiiu lock must _not_ be applied when calling this - this->chan.getPIIU ()->subscriptionCancelRequest ( *this, true ); -} - -void netSubscription::destroy () -{ - unsigned i = 0u; - while ( ! this->chan.getPIIU ()->uninstallIO ( *this ) ) { - if ( i++ > 1000u ) { - this->chan.getCAC ().printf ( - "CAC: unable to destroy IO\n" ); - break; - } - } - delete this; } class netSubscription * netSubscription::isSubscription () @@ -48,38 +34,17 @@ class netSubscription * netSubscription::isSubscription () return this; } -void netSubscription::completionNotify () +void netSubscription::ioCancelRequest () { - this->notify ().completionNotify ( this->channel () ); -} - -void netSubscription::completionNotify ( unsigned typeIn, - unsigned long countIn, const void *pDataIn ) -{ - this->notify ().completionNotify ( this->channel (), typeIn, countIn, pDataIn ); -} - -void netSubscription::exceptionNotify ( int status, const char *pContext ) -{ - this->notify ().exceptionNotify ( this->channel (), status, pContext ); -} - -void netSubscription::exceptionNotify ( int statusIn, - const char *pContextIn, unsigned typeIn, unsigned long countIn ) -{ - this->notify ().exceptionNotify ( this->channel (), statusIn, - pContextIn, typeIn, countIn ); -} - -cacChannelIO & netSubscription::channelIO () const -{ - return this->channel (); + this->chan.getPIIU ()->subscriptionCancelRequest ( *this, true ); } void netSubscription::show ( unsigned level ) const { printf ( "event subscription IO at %p, type %s, element count %lu, mask %u\n", - this, dbf_type_to_text ( static_cast (this->type) ), this->count, this->mask ); + static_cast < const void * > ( this ), + dbf_type_to_text ( static_cast < int > ( this->type ) ), + this->count, this->mask ); if ( level > 0u ) { this->baseNMIU::show ( level - 1u ); } diff --git a/src/ca/netWriteNotifyIO.cpp b/src/ca/netWriteNotifyIO.cpp index 047e92715..e79c31ab3 100644 --- a/src/ca/netWriteNotifyIO.cpp +++ b/src/ca/netWriteNotifyIO.cpp @@ -16,9 +16,10 @@ #include "baseNMIU_IL.h" tsFreeList < class netWriteNotifyIO, 1024 > netWriteNotifyIO::freeList; +epicsMutex netWriteNotifyIO::freeListMutex; netWriteNotifyIO::netWriteNotifyIO ( nciu &chan, cacNotify ¬ifyIn ) : - cacNotifyIO ( notifyIn ), baseNMIU ( chan ) + baseNMIU ( notifyIn, chan ) { } @@ -26,44 +27,10 @@ netWriteNotifyIO::~netWriteNotifyIO () { } -void netWriteNotifyIO::destroy () -{ - delete this; -} - -void netWriteNotifyIO::completionNotify () -{ - this->notify ().completionNotify ( this->channelIO () ); -} - -void netWriteNotifyIO::completionNotify ( - unsigned /* type */, unsigned long /* count */, - const void * /* pData */ ) -{ - this->notify ().completionNotify ( this->channelIO () ); -} - -void netWriteNotifyIO::exceptionNotify ( int status, - const char *pContext ) -{ - this->notify ().exceptionNotify ( this->channelIO (), status, pContext ); -} - -void netWriteNotifyIO::exceptionNotify ( int status, - const char *pContext, unsigned type, unsigned long count ) -{ - this->notify ().exceptionNotify ( this->channelIO (), status, pContext, type, count ); -} - -cacChannelIO & netWriteNotifyIO::channelIO () const -{ - return this->channel (); -} - void netWriteNotifyIO::show ( unsigned level ) const { printf ( "read write notify IO at %p\n", - static_cast ( this ) ); + static_cast < const void * > ( this ) ); if ( level > 0u ) { this->baseNMIU::show ( level - 1u ); } diff --git a/src/ca/netiiu.cpp b/src/ca/netiiu.cpp index 4eadcb3d6..73653d793 100644 --- a/src/ca/netiiu.cpp +++ b/src/ca/netiiu.cpp @@ -45,11 +45,6 @@ void netiiu::show ( unsigned level ) const } } -unsigned netiiu::channelCount () const -{ - return this->channelList.count (); -} - // cac lock must also be applied when // calling this void netiiu::attachChannel ( nciu &chan ) @@ -115,7 +110,7 @@ bool netiiu::destroyAllIO ( nciu &chan ) } } while ( baseNMIU *pIO = eventQ.get () ) { - pIO->destroy (); + delete pIO; } return true; } @@ -142,23 +137,24 @@ void netiiu::resetChannelRetryCounts () bool netiiu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel ) { - bool status; + bool success; epicsAutoMutex autoMutex ( this->mutex ); - nciu *pChan = this->channelList.first (); - if ( pChan ) { - status = pChan->searchMsg ( retrySeqNumber, retryNoForThisChannel ); - if ( status ) { - this->channelList.remove ( *pChan ); + if ( nciu *pChan = this->channelList.get () ) { + success = pChan->searchMsg ( retrySeqNumber, retryNoForThisChannel ); + if ( success ) { this->channelList.add ( *pChan ); } + else { + this->channelList.push ( *pChan ); + } } else { - status = false; + success = false; } - return status; + return success; } bool netiiu::ca_v42_ok () const @@ -195,11 +191,6 @@ int netiiu::writeNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned, const return ECA_DISCONNCHID; } -int netiiu::readCopyRequest ( nciu &, unsigned, unsigned, void * ) -{ - return ECA_DISCONNCHID; -} - int netiiu::readNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned ) { return ECA_DISCONNCHID; @@ -226,9 +217,6 @@ void netiiu::subscriptionCancelRequest ( netSubscription &, bool ) int netiiu::installSubscription ( netSubscription &subscr ) { - // we must install the subscription first on the channel so that - // proper installation is guaranteed to occur if a connect occurs - // beteen these two steps { epicsAutoMutex autoMutex ( this->mutex ); subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr ); @@ -265,11 +253,11 @@ void netiiu::connectAllIO ( nciu & ) bool netiiu::uninstallIO ( baseNMIU &io ) { epicsAutoMutex autoMutex ( this->mutex ); - if ( ! io.channel ().verifyIIU ( *this ) ) { - return false; + if ( io.channel ().verifyIIU ( *this ) ) { + io.channel ().tcpiiuPrivateListOfIO::eventq.remove ( io ); + return true; } - io.channel ().tcpiiuPrivateListOfIO::eventq.remove ( io ); - return true; + return false; } double netiiu::beaconPeriod () const diff --git a/src/ca/searchTimer.cpp b/src/ca/searchTimer.cpp index eecf9514c..daa657de2 100644 --- a/src/ca/searchTimer.cpp +++ b/src/ca/searchTimer.cpp @@ -10,29 +10,26 @@ * Author: Jeff Hill */ -#include "iocinf.h" +#include "tsMinMax.h" -#ifdef DEBUG -# define debugPrintf(argsInParen) printf argsInParen -#else -# define debugPrintf(argsInParen) -#endif +#include "iocinf.h" +#include "netiiu_IL.h" // // searchTimer::searchTimer () // -searchTimer::searchTimer (udpiiu &iiuIn, osiTimerQueue &queueIn) : - osiTimer (queueIn), - iiu (iiuIn), - framesPerTry (INITIALTRIESPERFRAME), - framesPerTryCongestThresh (UINT_MAX), - minRetry (UINT_MAX), - retry (0u), - searchTriesWithinThisPass (0u), - searchResponsesWithinThisPass (0u), - retrySeqNo (0u), - retrySeqAtPassBegin (0u), - period (CA_RECAST_DELAY) +searchTimer::searchTimer ( udpiiu &iiuIn, osiTimerQueue &queueIn ) : + osiTimer ( queueIn ), + iiu ( iiuIn ), + framesPerTry ( INITIALTRIESPERFRAME ), + framesPerTryCongestThresh ( UINT_MAX ), + minRetry ( UINT_MAX ), + retry ( 0u ), + searchTriesWithinThisPass ( 0u ), + searchResponsesWithinThisPass ( 0u ), + retrySeqNo ( 0u ), + retrySeqAtPassBegin ( 0u ), + period ( CA_RECAST_DELAY ) { } @@ -47,16 +44,17 @@ void searchTimer::resetPeriod ( double delayToNextTry ) delayToNextTry = CA_RECAST_DELAY; } - this->lock (); - this->retry = 0; - if ( this->period > delayToNextTry ) { - reschedule = true; + { + epicsAutoMutex locker ( this->mutex ); + this->retry = 0; + if ( this->period > delayToNextTry ) { + reschedule = true; + } + else { + reschedule = false; + } + this->period = CA_RECAST_DELAY; } - else { - reschedule = false; - } - this->period = CA_RECAST_DELAY; - this->unlock (); if ( reschedule ) { this->reschedule ( delayToNextTry ); @@ -76,24 +74,23 @@ void searchTimer::setRetryInterval (unsigned retryNo) unsigned idelay; double delay; - this->lock (); + epicsAutoMutex locker ( this->mutex ); /* * set the retry number */ - this->retry = min (retryNo, MAXCONNTRIES+1u); + this->retry = tsMin ( retryNo, MAXCONNTRIES + 1u ); + /* * set the retry interval */ - idelay = 1u << min (this->retry, CHAR_BIT*sizeof(idelay)-1u); + idelay = 1u << tsMin (this->retry, CHAR_BIT*sizeof(idelay)-1u); delay = idelay * CA_RECAST_DELAY; /* sec */ /* * place upper limit on the retry delay */ - this->period = min (CA_RECAST_PERIOD, delay); - - this->unlock (); + this->period = tsMin ( CA_RECAST_PERIOD, delay ); debugPrintf ( ("new CA search period is %f sec\n", this->period) ); } @@ -109,20 +106,20 @@ void searchTimer::notifySearchResponse ( unsigned short retrySeqNoIn ) { bool reschedualNeeded; - this->lock (); + { + epicsAutoMutex locker ( this->mutex ); - if ( this->retrySeqAtPassBegin <= retrySeqNoIn ) { - if ( this->searchResponsesWithinThisPass < UINT_MAX ) { - this->searchResponsesWithinThisPass++; - } - } + if ( this->retrySeqAtPassBegin <= retrySeqNoIn ) { + if ( this->searchResponsesWithinThisPass < UINT_MAX ) { + this->searchResponsesWithinThisPass++; + } + } - reschedualNeeded = ( retrySeqNoIn == this->retrySeqNo ); - - this->unlock (); + reschedualNeeded = ( retrySeqNoIn == this->retrySeqNo ); + } if ( reschedualNeeded ) { - this->reschedule (0.0); + this->reschedule ( 0.0 ); } } @@ -141,159 +138,161 @@ void searchTimer::expire () return; } - this->lock (); - - /* - * increment the retry sequence number - */ - this->retrySeqNo++; /* allowed to roll over */ - - /* - * dynamically adjust the number of UDP frames per - * try depending how many search requests are not - * replied to - * - * This determines how many search request can be - * sent together (at the same instant in time). - * - * The variable this->framesPerTry - * determines the number of UDP frames to be sent - * each time that expire() is called. - * If this value is too high we will waste some - * network bandwidth. If it is too low we will - * use very little of the incoming UDP message - * buffer associated with the server's port and - * will therefore take longer to connect. We - * initialize this->framesPerTry - * to a prime number so that it is less likely that the - * same channel is in the last UDP frame - * sent every time that this is called (and - * potentially discarded by a CA server with - * a small UDP input queue). - */ - /* - * increase frames per try only if we see better than - * a 93.75% success rate for one pass through the list - */ - if (this->searchResponsesWithinThisPass > - (this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/16u)) ) { + { + epicsAutoMutex locker ( this->mutex ); + /* - * increase UDP frames per try if we have a good score + * increment the retry sequence number */ - if ( this->framesPerTry < MAXTRIESPERFRAME ) { - /* - * a congestion avoidance threshold similar to TCP is now used - */ - if ( this->framesPerTry < this->framesPerTryCongestThresh ) { - this->framesPerTry += this->framesPerTry; - } - else { - this->framesPerTry += (this->framesPerTry/8) + 1; - } - debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n", - this->framesPerTry, this->searchTriesWithinThisPass, this->searchResponsesWithinThisPass) ); - } - } - /* - * if we detect congestion because we have less than a 87.5% success - * rate then gradually reduce the frames per try - */ - else if ( this->searchResponsesWithinThisPass < - (this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/8u)) ) { - if (this->framesPerTry>1) { - this->framesPerTry--; - } - this->framesPerTryCongestThresh = this->framesPerTry/2 + 1; - debugPrintf ( ("Congestion detected - set frames per try to %u t=%u r=%u\n", - this->framesPerTry, this->searchTriesWithinThisPass, - this->searchResponsesWithinThisPass) ); - } + this->retrySeqNo++; /* allowed to roll over */ - while ( 1 ) { - /* - * clear counter when we reach the end of the list + * dynamically adjust the number of UDP frames per + * try depending how many search requests are not + * replied to * - * if we are making some progress then - * dont increase the delay between search - * requests + * This determines how many search request can be + * sent together (at the same instant in time). + * + * The variable this->framesPerTry + * determines the number of UDP frames to be sent + * each time that expire() is called. + * If this value is too high we will waste some + * network bandwidth. If it is too low we will + * use very little of the incoming UDP message + * buffer associated with the server's port and + * will therefore take longer to connect. We + * initialize this->framesPerTry + * to a prime number so that it is less likely that the + * same channel is in the last UDP frame + * sent every time that this is called (and + * potentially discarded by a CA server with + * a small UDP input queue). */ - if ( this->searchTriesWithinThisPass >= this->iiu.channelCount () ) { - if ( this->searchResponsesWithinThisPass == 0u ) { - debugPrintf ( ("increasing search try interval\n") ); - this->setRetryInterval ( this->minRetry + 1u ); - } - - this->minRetry = UINT_MAX; - - /* - * increment the retry sequence number - * (this prevents the time of the next search - * try from being set to the current time if - * we are handling a response from an old - * search message) - */ - this->retrySeqNo++; /* allowed to roll over */ - - /* - * so that old search tries will not update the counters - */ - this->retrySeqAtPassBegin = this->retrySeqNo; - - this->searchTriesWithinThisPass = 0; - this->searchResponsesWithinThisPass = 0; - - debugPrintf ( ("saw end of list\n") ); - } - - unsigned retryNoForThisChannel; - if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) { - nFrameSent++; - - if ( nFrameSent >= this->framesPerTry ) { - break; - } - - this->iiu.flush (); - - if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) { - break; - } - } - - this->minRetry = min ( this->minRetry, retryNoForThisChannel ); - - if ( this->searchTriesWithinThisPass < UINT_MAX ) { - this->searchTriesWithinThisPass++; - } - if ( nChanSent < UINT_MAX ) { - nChanSent++; - } - /* - * dont send any of the channels twice within one try + * increase frames per try only if we see better than + * a 93.75% success rate for one pass through the list */ - if ( nChanSent >= this->iiu.channelCount () ) { + if (this->searchResponsesWithinThisPass > + (this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/16u)) ) { /* - * add one to nFrameSent because there may be - * one more partial frame to be sent + * increase UDP frames per try if we have a good score */ - nFrameSent++; - - /* - * cap this->framesPerTry to - * the number of frames required for all of - * the unresolved channels - */ - if ( this->framesPerTry > nFrameSent ) { - this->framesPerTry = nFrameSent; + if ( this->framesPerTry < MAXTRIESPERFRAME ) { + /* + * a congestion avoidance threshold similar to TCP is now used + */ + if ( this->framesPerTry < this->framesPerTryCongestThresh ) { + this->framesPerTry += this->framesPerTry; + } + else { + this->framesPerTry += (this->framesPerTry/8) + 1; + } + debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n", + this->framesPerTry, this->searchTriesWithinThisPass, this->searchResponsesWithinThisPass) ); } + } + /* + * if we detect congestion because we have less than a 87.5% success + * rate then gradually reduce the frames per try + */ + else if ( this->searchResponsesWithinThisPass < + (this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/8u)) ) { + if (this->framesPerTry>1) { + this->framesPerTry--; + } + this->framesPerTryCongestThresh = this->framesPerTry/2 + 1; + debugPrintf ( ("Congestion detected - set frames per try to %u t=%u r=%u\n", + this->framesPerTry, this->searchTriesWithinThisPass, + this->searchResponsesWithinThisPass) ); + } + + while ( 1 ) { + + /* + * clear counter when we reach the end of the list + * + * if we are making some progress then + * dont increase the delay between search + * requests + */ + if ( this->searchTriesWithinThisPass >= this->iiu.channelCount () ) { + if ( this->searchResponsesWithinThisPass == 0u ) { + debugPrintf ( ("increasing search try interval\n") ); + this->setRetryInterval ( this->minRetry + 1u ); + } - break; + this->minRetry = UINT_MAX; + + /* + * increment the retry sequence number + * (this prevents the time of the next search + * try from being set to the current time if + * we are handling a response from an old + * search message) + */ + this->retrySeqNo++; /* allowed to roll over */ + + /* + * so that old search tries will not update the counters + */ + this->retrySeqAtPassBegin = this->retrySeqNo; + + this->searchTriesWithinThisPass = 0; + this->searchResponsesWithinThisPass = 0; + + debugPrintf ( ("saw end of list\n") ); + } + + unsigned retryNoForThisChannel; + if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) { + nFrameSent++; + + if ( nFrameSent >= this->framesPerTry ) { + break; + } + + this->iiu.flush (); + + if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) { + break; + } + } + + if ( this->minRetry > retryNoForThisChannel ) { + this->minRetry = retryNoForThisChannel; + } + + if ( this->searchTriesWithinThisPass < UINT_MAX ) { + this->searchTriesWithinThisPass++; + } + if ( nChanSent < UINT_MAX ) { + nChanSent++; + } + + /* + * dont send any of the channels twice within one try + */ + if ( nChanSent >= this->iiu.channelCount () ) { + /* + * add one to nFrameSent because there may be + * one more partial frame to be sent + */ + nFrameSent++; + + /* + * cap this->framesPerTry to + * the number of frames required for all of + * the unresolved channels + */ + if ( this->framesPerTry > nFrameSent ) { + this->framesPerTry = nFrameSent; + } + + break; + } } } - - this->unlock (); // flush out the search request buffer this->iiu.flush (); @@ -310,13 +309,11 @@ bool searchTimer::again () const if ( this->iiu.channelCount () == 0 ) { return false; } + else if ( this->retry < MAXCONNTRIES ) { + return true; + } else { - if ( this->retry < MAXCONNTRIES ) { - return true; - } - else { - return false; - } + return false; } } @@ -325,7 +322,7 @@ double searchTimer::delay () const return this->period; } -void searchTimer::show (unsigned /* level */) const +void searchTimer::show ( unsigned /* level */ ) const { } diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index fda000cc2..a8dd90045 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -23,12 +23,11 @@ #include "nciu_IL.h" #include "baseNMIU_IL.h" #include "netWriteNotifyIO_IL.h" -#include "netReadCopyIO_IL.h" #include "netReadNotifyIO_IL.h" #include "netSubscription_IL.h" -#include "ioCounter_IL.h" +#include "net_convert.h" -// nill message pad bytes +// nill message alignment pad bytes static const char nillBytes [] = { 0, 0, 0, 0, @@ -68,23 +67,6 @@ const tcpiiu::pProtoStubTCP tcpiiu::tcpJumpTableCAC [] = &tcpiiu::verifyAndDisconnectChan }; -#ifdef DEBUG -# define debugPrintf(argsInParen) printf argsInParen -#else -# define debugPrintf(argsInParen) -#endif - -#ifdef CONVERSION_REQUIRED -extern CACVRTFUNC *cac_dbr_cvrt[]; -#endif /*CONVERSION_REQUIRED*/ - -const static char nullBuff[32] = { - 0,0,0,0,0,0,0,0,0,0, - 0,0,0,0,0,0,0,0,0,0, - 0,0,0,0,0,0,0,0,0,0, - 0,0 -}; - // // cacSendThreadTCP () // @@ -117,22 +99,17 @@ extern "C" void cacSendThreadTCP ( void *pParam ) if ( status == ECA_NORMAL ) { piiu->flowControlActive = false; } -# if defined ( DEBUG ) - printf ( "fc off\n" ); -# endif + debugPrintf ( ( "fc off\n" ) ); } else { int status = piiu->enableFlowControlRequest (); if ( status == ECA_NORMAL ) { piiu->flowControlActive = true; } -# if defined ( DEBUG ) - printf ( "fc on\n" ); -# endif + debugPrintf ( ( "fc on\n" ) ); } } - if ( echoLaborNeeded ) { if ( CA_V43 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersion ) ) { piiu->echoRequest (); @@ -207,15 +184,12 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) { - unsigned totalBytes; - int status; - if ( this->state != iiu_connected ) { return 0u; } assert ( nBytesInBuf <= INT_MAX ); - status = ::recv ( this->sock, static_cast ( pBuf ), + int status = ::recv ( this->sock, static_cast ( pBuf ), static_cast ( nBytesInBuf ), 0); if ( status <= 0 ) { int localErrno = SOCKERRNO; @@ -243,9 +217,9 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) { char name[64]; - this->hostName ( name, sizeof (name) ); + this->hostName ( name, sizeof ( name ) ); ca_printf ( "Disconnecting from CA server %s because: %s\n", - name, SOCKERRSTR (localErrno) ); + name, SOCKERRSTR ( localErrno ) ); } this->cleanShutdown (); @@ -254,27 +228,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) } assert ( static_cast ( status ) <= nBytesInBuf ); - totalBytes = static_cast ( status ); - - { - epicsAutoMutex autoMutex ( this->mutex ); - if ( nBytesInBuf == totalBytes ) { - if ( this->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) { - this->busyStateDetected = true; - } - else { - this->contigRecvMsgCount++; - } - } - else { - this->contigRecvMsgCount = 0u; - this->busyStateDetected = false; - } - } - - this->recvDog.messageArrivalNotify (); // reschedule connection activity watchdog - - return totalBytes; + return static_cast ( status ); } /* @@ -314,33 +268,63 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) } } + unsigned nBytes = 0u; while ( piiu->state == iiu_connected ) { - unsigned nBytes; - { + if ( nBytes >= maxBytesPendingTCP ) { + epicsEventMustWait ( piiu->recvThreadRingBufferSpaceAvailableSignal ); epicsAutoMutex autoMutex ( piiu->mutex ); nBytes = piiu->recvQue.occupiedBytes (); } - if ( nBytes >= 0x4000 ) { - epicsEventMustWait ( piiu->recvThreadRingBufferSpaceAvailableSignal ); - } else { comBuf * pComBuf = new comBuf; if ( pComBuf ) { unsigned nBytesIn = pComBuf->fillFromWire ( *piiu ); if ( nBytesIn ) { + bool msgHeaderButNoBody; { epicsAutoMutex autoMutex ( piiu->mutex ); + nBytes = piiu->recvQue.occupiedBytes (); + msgHeaderButNoBody = piiu->msgHeaderAvailable; piiu->recvQue.pushLastComBufReceived ( *pComBuf ); + if ( nBytesIn == pComBuf->capacityBytes () ) { + if ( piiu->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) { + piiu->busyStateDetected = true; + } + else { + piiu->contigRecvMsgCount++; + } + } + else { + piiu->contigRecvMsgCount = 0u; + piiu->busyStateDetected = false; + } + // reschedule connection activity watchdog + piiu->recvDog.messageArrivalNotify (); + } + // wake up recv thread only if + // 1) there are currently no bytes in the queue + // 2) if the recv thread is currently blocking for an incomplete msg + if ( nBytes < sizeof ( caHdr ) || msgHeaderButNoBody ) { + piiu->pCAC ()->signalRecvActivity (); + } + if ( nBytes <= UINT_MAX - nBytesIn ) { + nBytes += nBytesIn; + } + else { + nBytes = UINT_MAX; } - piiu->pCAC ()->signalRecvActivity (); } else { pComBuf->destroy (); + epicsAutoMutex autoMutex ( piiu->mutex ); + nBytes = piiu->recvQue.occupiedBytes (); } } else { // no way to be informed when memory is available epicsThreadSleep ( 0.5 ); + epicsAutoMutex autoMutex ( piiu->mutex ); + nBytes = piiu->recvQue.occupiedBytes (); } } } @@ -369,7 +353,8 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, osiTimerQueue &timerQueue ) flowControlActive ( false ), echoRequestPending ( false ), msgHeaderAvailable ( false ), - sockCloseCompleted ( false ) + sockCloseCompleted ( false ), + fdRegCallbackNeeded ( true ) { this->addr.sa.sa_family = AF_UNSPEC; @@ -420,101 +405,96 @@ bool tcpiiu::initiateConnect ( const osiSockAddr &addrIn, unsigned minorVersion, int status; int flag; - epicsAutoMutex autoMutex ( this->mutex ); - - this->addr = addrIn; - - this->pHostNameCache = new hostNameCache ( addrIn, engineIn ); - if ( ! this->pHostNameCache ) { - return false; - } - - this->pBHE = &bhe; - bhe.bindToIIU ( *this ); - - this->state = iiu_connecting; - this->minorProtocolVersion = minorVersion; - - this->contigRecvMsgCount = 0u; - this->busyStateDetected = false; - this->flowControlActive = false; - this->echoRequestPending = false; - this->msgHeaderAvailable = false; - this->sockCloseCompleted = false; - - // first message informs server of user and host name of client - this->userNameSetRequest (); - this->hostNameSetRequest (); - - this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP ); - if ( this->sock == INVALID_SOCKET ) { - ca_printf ( "CAC: unable to create virtual circuit because \"%s\"\n", - SOCKERRSTR ( SOCKERRNO ) ); - return false; - } - - flag = TRUE; - status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY, - (char *) &flag, sizeof ( flag ) ); - if ( status < 0 ) { - ca_printf ("CAC: problems setting socket option TCP_NODELAY = \"%s\"\n", - SOCKERRSTR (SOCKERRNO)); - } - - flag = TRUE; - status = setsockopt ( this->sock , SOL_SOCKET, SO_KEEPALIVE, - ( char * ) &flag, sizeof ( flag ) ); - if ( status < 0 ) { - ca_printf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n", - SOCKERRSTR ( SOCKERRNO ) ); - } - -#if 0 { - int i; + epicsAutoMutex autoMutex ( this->mutex ); - /* - * some concern that vxWorks will run out of mBuf's - * if this change is made joh 11-10-98 - */ - i = MAX_MSG_SIZE; - status = setsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF, - ( char * ) &i, sizeof ( i ) ); - if (status < 0) { - ca_printf ("CAC: problems setting socket option SO_SNDBUF = \"%s\"\n", - SOCKERRSTR ( SOCKERRNO ) ); + this->addr = addrIn; + + this->pHostNameCache = new hostNameCache ( addrIn, engineIn ); + if ( ! this->pHostNameCache ) { + return false; } - i = MAX_MSG_SIZE; - status = setsockopt ( this->sock, SOL_SOCKET, SO_RCVBUF, - ( char * ) &i, sizeof ( i ) ); + + this->pBHE = &bhe; + bhe.bindToIIU ( *this ); + + this->state = iiu_connecting; + this->minorProtocolVersion = minorVersion; + + this->contigRecvMsgCount = 0u; + this->busyStateDetected = false; + this->flowControlActive = false; + this->echoRequestPending = false; + this->msgHeaderAvailable = false; + this->sockCloseCompleted = false; + + // first message informs server of user and host name of client + this->userNameSetRequest (); + this->hostNameSetRequest (); + + this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP ); + if ( this->sock == INVALID_SOCKET ) { + ca_printf ( "CAC: unable to create virtual circuit because \"%s\"\n", + SOCKERRSTR ( SOCKERRNO ) ); + return false; + } + + flag = true; + status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY, + (char *) &flag, sizeof ( flag ) ); if ( status < 0 ) { - ca_printf ("CAC: problems setting socket option SO_RCVBUF = \"%s\"\n", + ca_printf ("CAC: problems setting socket option TCP_NODELAY = \"%s\"\n", SOCKERRSTR (SOCKERRNO)); } + + flag = true; + status = setsockopt ( this->sock , SOL_SOCKET, SO_KEEPALIVE, + ( char * ) &flag, sizeof ( flag ) ); + if ( status < 0 ) { + ca_printf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n", + SOCKERRSTR ( SOCKERRNO ) ); + } + + #if 0 + { + int i; + + /* + * some concern that vxWorks will run out of mBuf's + * if this change is made joh 11-10-98 + */ + i = MAX_MSG_SIZE; + status = setsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF, + ( char * ) &i, sizeof ( i ) ); + if (status < 0) { + ca_printf ("CAC: problems setting socket option SO_SNDBUF = \"%s\"\n", + SOCKERRSTR ( SOCKERRNO ) ); + } + i = MAX_MSG_SIZE; + status = setsockopt ( this->sock, SOL_SOCKET, SO_RCVBUF, + ( char * ) &i, sizeof ( i ) ); + if ( status < 0 ) { + ca_printf ("CAC: problems setting socket option SO_RCVBUF = \"%s\"\n", + SOCKERRSTR (SOCKERRNO)); + } + } + #endif + + memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) ); + + tbs = epicsThreadHighestPriorityLevelBelow ( this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv ); + if ( tbs != epicsThreadBooleanStatusSuccess ) { + priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority (); + } + + tid = epicsThreadCreate ("CAC-TCP-recv", priorityOfRecv, + epicsThreadGetStackSize (epicsThreadStackMedium), cacRecvThreadTCP, this); + if ( tid == 0 ) { + ca_printf ("CA: unable to create CA client receive thread\n"); + socket_close ( this->sock ); + return false; + } } -#endif - - memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) ); - - tbs = epicsThreadHighestPriorityLevelBelow ( this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv ); - if ( tbs != epicsThreadBooleanStatusSuccess ) { - priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority (); - } - - tid = epicsThreadCreate ("CAC-TCP-recv", priorityOfRecv, - epicsThreadGetStackSize (epicsThreadStackMedium), cacRecvThreadTCP, this); - if ( tid == 0 ) { - ca_printf ("CA: unable to create CA client receive thread\n"); - socket_close ( this->sock ); - return false; - } - - CAFDHANDLER *fdRegFunc; - void *fdRegArg; - this->pCAC ()->getFDRegCallback ( fdRegFunc, fdRegArg ); - if ( fdRegFunc ) { - ( *fdRegFunc ) ( fdRegArg, this->sock, TRUE ); - } return true; } @@ -673,7 +653,7 @@ void tcpiiu::disconnect () void *fdRegArg; this->pCAC ()->getFDRegCallback ( fdRegFunc, fdRegArg ); if ( fdRegFunc ) { - ( *fdRegFunc ) ( fdRegArg, this->sock, FALSE ); + ( *fdRegFunc ) ( fdRegArg, this->sock, false ); } this->cleanShutdown (); @@ -742,7 +722,7 @@ void tcpiiu::disconnect () epicsAutoMutex autoMutex ( this->mutex ); if ( this->pCurData ) { - free ( this->pCurData ); + delete [] this->pCurData; this->pCurData = 0; this->curDataMax = 0u; } @@ -758,6 +738,7 @@ void tcpiiu::disconnect () this->sendQue.clear (); this->recvQue.clear (); } + this->fdRegCallbackNeeded = true; } @@ -814,7 +795,7 @@ bool tcpiiu::isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &add } if ( ! match ) { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); char acc[64]; if ( this->pHostNameCache ) { this->pHostNameCache->hostName ( acc, sizeof ( acc ) ); @@ -832,7 +813,7 @@ bool tcpiiu::isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &add void tcpiiu::show ( unsigned level ) const { - epicsAutoMutex autoMuext ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); char buf[256]; if ( this->pHostNameCache ) { this->pHostNameCache->hostName ( buf, sizeof ( buf ) ); @@ -849,7 +830,7 @@ void tcpiiu::show ( unsigned level ) const } if ( level > 2u ) { printf ( "\tcurrent data cache pointer = %p current data cache size = %lu\n", - this->pCurData, this->curDataMax ); + static_cast < void * > ( this->pCurData ), this->curDataMax ); printf ( "\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n", this->contigRecvMsgCount, this->busyStateDetected, this->flowControlActive ); } @@ -877,7 +858,7 @@ void tcpiiu::show ( unsigned level ) const bool tcpiiu::setEchoRequestPending () { { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); this->echoRequestPending = true; } this->flush (); @@ -909,7 +890,7 @@ int tcpiiu::hostNameSetRequest () this->flush (); } - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); int status = this->sendQue.reserveSpace ( postSize + 16u ); if ( status == ECA_NORMAL ) { @@ -945,7 +926,7 @@ int tcpiiu::userNameSetRequest () this->flush (); } - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); int status = this->sendQue.reserveSpace ( postSize + 16u ); if ( status == ECA_NORMAL ) { @@ -969,7 +950,7 @@ int tcpiiu::disableFlowControlRequest () this->flush (); } - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { @@ -990,7 +971,7 @@ int tcpiiu::enableFlowControlRequest () this->flush (); } - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { @@ -1011,7 +992,7 @@ int tcpiiu::noopRequest () this->flush (); } - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { @@ -1032,7 +1013,7 @@ int tcpiiu::echoRequest () this->flush (); } - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex locker ( this->mutex ); int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { @@ -1047,29 +1028,29 @@ int tcpiiu::echoRequest () return status; } -void tcpiiu::noopAction () +bool tcpiiu::noopAction () { - return; + return true; } -void tcpiiu::echoRespAction () +bool tcpiiu::echoRespAction () { - return; + return true; } -void tcpiiu::writeNotifyRespAction () +bool tcpiiu::writeNotifyRespAction () { int status = this->curMsg.m_cid; if ( status == ECA_NORMAL ) { - this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available ); + return this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available ); } else { - this->ioExceptionNotifyAndDestroy ( this->curMsg.m_available, + return this->ioExceptionNotifyAndDestroy ( this->curMsg.m_available, status, "write notify request rejected" ); } } -void tcpiiu::readNotifyRespAction () +bool tcpiiu::readNotifyRespAction () { int v41; int status; @@ -1081,7 +1062,7 @@ void tcpiiu::readNotifyRespAction () # ifdef CONVERSION_REQUIRED if ( this->curMsg.m_dataType < NELEMENTS ( cac_dbr_cvrt ) ) { ( *cac_dbr_cvrt[ this->curMsg.m_dataType ] ) ( - this->pCurData, this->pCurData, FALSE, this->curMsg.m_count); + this->pCurData, this->pCurData, false, this->curMsg.m_count); } else { this->curMsg.m_cid = htonl ( ECA_BADTYPE ); @@ -1097,42 +1078,38 @@ void tcpiiu::readNotifyRespAction () if (v41) { status = this->curMsg.m_cid; } - else{ + else { status = ECA_NORMAL; } if ( status == ECA_NORMAL ) { - this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available, + return this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available, this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData ); } else { - this->ioExceptionNotifyAndDestroy ( this->curMsg.m_available, + return this->ioExceptionNotifyAndDestroy ( this->curMsg.m_available, status, "read failed", this->curMsg.m_dataType, this->curMsg.m_count ); } } -void tcpiiu::eventRespAction () +bool tcpiiu::eventRespAction () { - int v41; - int status; - /* * m_postsize = 0 used to be a confirmation, but is - * now a noop because the above hash lookup will - * not find a matching IO block + * now a noop because the IO block is immediately + * deleted */ if ( ! this->curMsg.m_postsize ) { - return; + return true; } /* - * convert the data buffer from net - * format to host format + * convert the data buffer from net format to host format */ # ifdef CONVERSION_REQUIRED if ( this->curMsg.m_dataType < NELEMENTS ( cac_dbr_cvrt ) ) { ( *cac_dbr_cvrt [ this->curMsg.m_dataType ] )( - this->pCurData, this->pCurData, FALSE, + this->pCurData, this->pCurData, false, this->curMsg.m_count); } else { @@ -1145,120 +1122,120 @@ void tcpiiu::eventRespAction () * read notify status starting * with CA V4.1 */ - v41 = CA_V41 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion ); - if (v41) { + int status; + int v41 = CA_V41 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion ); + if ( v41 ) { status = this->curMsg.m_cid; } else { status = ECA_NORMAL; } if ( status == ECA_NORMAL ) { - this->ioCompletionNotify ( this->curMsg.m_available, + return this->ioCompletionNotify ( this->curMsg.m_available, this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData ); } else { - this->ioExceptionNotify ( this->curMsg.m_available, + return this->ioExceptionNotify ( this->curMsg.m_available, status, "subscription update failed", this->curMsg.m_dataType, this->curMsg.m_count ); } } -void tcpiiu::readRespAction () +bool tcpiiu::readRespAction () { - this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available, + return this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available, this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData ); } -void tcpiiu::clearChannelRespAction () +bool tcpiiu::clearChannelRespAction () { - // currently a noop + return true; // currently a noop } -void tcpiiu::exceptionRespAction () +bool tcpiiu::exceptionRespAction () { + const caHdr * req = reinterpret_cast < const caHdr * > ( this->pCurData ); char context[255]; char hostName[64]; - caHdr *req = (caHdr *) this->pCurData; { epicsAutoMutex autoMutex ( this->mutex ); + const char *pName = reinterpret_cast < const char * > ( req + 1 ); + if ( this->pHostNameCache ) { this->pHostNameCache->hostName ( hostName, sizeof ( hostName ) ); if ( this->curMsg.m_postsize > sizeof (caHdr) ) { sprintf ( context, "detected by: %s for: %s", - hostName, (char *)(req+1) ); + hostName, pName); } else{ - sprintf ( context, "for: %s", (char *) ( req + 1 ) ); + sprintf ( context, "for: %s", pName ); } } else { - sprintf ( context, "for: %s", (char *) ( req + 1 ) ); + sprintf ( context, "for: %s", pName ); } } switch ( ntohs ( req->m_cmmd ) ) { case CA_PROTO_READ_NOTIFY: - this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), - ntohl (this->curMsg.m_available), context, - ntohs (req->m_dataType), ntohs (req->m_count) ); - break; + return this->ioExceptionNotifyAndDestroy ( ntohl ( req->m_available ), + ntohl ( this->curMsg.m_available ), context, + ntohs ( req->m_dataType ), ntohs ( req->m_count ) ); case CA_PROTO_READ: - this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), - ntohl (this->curMsg.m_available), context, - ntohs (req->m_dataType), ntohs (req->m_count) ); - break; + return this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), + ntohl ( this->curMsg.m_available ), context, + ntohs ( req->m_dataType ), ntohs ( req->m_count ) ); case CA_PROTO_WRITE_NOTIFY: - this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), - ntohl (this->curMsg.m_available), context, - ntohs (req->m_dataType), ntohs (req->m_count) ); - break; + return this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), + ntohl ( this->curMsg.m_available ), context, + ntohs ( req->m_dataType ), ntohs ( req->m_count ) ); case CA_PROTO_WRITE: - this->pCAC ()->exceptionNotify ( ntohl ( this->curMsg.m_available), - context, ntohs (req->m_dataType), ntohs (req->m_count), __FILE__, __LINE__); - break; + this->pCAC ()->exceptionNotify ( ntohl ( this->curMsg.m_available ), + context, ntohs ( req->m_dataType ), ntohs ( req->m_count ), __FILE__, __LINE__); + return true; case CA_PROTO_EVENT_ADD: - this->ioExceptionNotify ( ntohl (req->m_available), - ntohl (this->curMsg.m_available), context, - ntohs (req->m_dataType), ntohs (req->m_count) ); - break; + return this->ioExceptionNotify ( ntohl ( req->m_available ), + ntohl ( this->curMsg.m_available ), context, + ntohs ( req->m_dataType ), ntohs ( req->m_count ) ); case CA_PROTO_EVENT_CANCEL: - this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), - ntohl (this->curMsg.m_available), context ); - break; + return this->ioExceptionNotifyAndDestroy ( ntohl ( req->m_available ), + ntohl ( this->curMsg.m_available ), context ); default: - this->pCAC ()->exceptionNotify (ntohl (this->curMsg.m_available), - context, __FILE__, __LINE__); - break; + this->pCAC ()->exceptionNotify ( ntohl ( this->curMsg.m_available ), + context, __FILE__, __LINE__ ); + return true; } } -void tcpiiu::accessRightsRespAction () +bool tcpiiu::accessRightsRespAction () { static caar init; caar arBitField = init; // shut up bounds checker - unsigned ar; + unsigned ar = this->curMsg.m_available; - ar = this->curMsg.m_available; arBitField.read_access = ( ar & CA_PROTO_ACCESS_RIGHT_READ ) ? 1 : 0; arBitField.write_access = ( ar & CA_PROTO_ACCESS_RIGHT_WRITE ) ? 1 : 0; this->pCAC ()->accessRightsNotify ( this->curMsg.m_cid, arBitField ); + + return true; } -void tcpiiu::claimCIURespAction () +bool tcpiiu::claimCIURespAction () { - this->pCAC ()->connectChannel ( this->ca_v44_ok (), this->curMsg.m_cid, + return this->pCAC ()->connectChannel ( this->ca_v44_ok (), this->curMsg.m_cid, this->curMsg.m_dataType, this->curMsg.m_count, this->curMsg.m_available ); } -void tcpiiu::verifyAndDisconnectChan () +bool tcpiiu::verifyAndDisconnectChan () { this->pCAC ()->disconnectChannel ( this->curMsg.m_cid ); + return true; } -void tcpiiu::badTCPRespAction () +bool tcpiiu::badTCPRespAction () { char hostName[64]; bool hostNameInit; @@ -1274,13 +1251,14 @@ void tcpiiu::badTCPRespAction () } if ( hostNameInit ) { - ca_printf ( "CAC: Bad response code in TCP message from %s was %u\n", + ca_printf ( "CAC: Undecipherable TCP message ( bad response type %u ) from %s\n", hostName, this->curMsg.m_cmmd); } else { - ca_printf ( "CAC: Bad response code in TCP message was %u\n", + ca_printf ( "CAC: Undecipherable TCP message ( bad response type %u )\n", this->curMsg.m_cmmd); } + return false; } /* @@ -1288,7 +1266,21 @@ void tcpiiu::badTCPRespAction () */ void tcpiiu::processIncoming () { + // force fd reg callback to occur through the + // recv process thread + if ( this->fdRegCallbackNeeded ) { + this->fdRegCallbackNeeded = false; + CAFDHANDLER *fdRegFunc; + void *fdRegArg; + this->pCAC ()->getFDRegCallback ( fdRegFunc, fdRegArg ); + if ( fdRegFunc ) { + ( *fdRegFunc ) ( fdRegArg, this->sock, true ); + } + } + while ( 1 ) { + unsigned nBytes; + bool signalNeeded; // // fetch a complete message header @@ -1296,6 +1288,8 @@ void tcpiiu::processIncoming () { epicsAutoMutex autoMutex ( this->mutex ); + nBytes = this->recvQue.occupiedBytes (); + if ( ! this->msgHeaderAvailable ) { this->msgHeaderAvailable = this->recvQue.copyOutBytes ( @@ -1344,8 +1338,6 @@ void tcpiiu::processIncoming () // make sure we have a large enough message body cache // if ( this->curMsg.m_postsize > this->curDataMax ) { - void *pData; - size_t cacheSize; /* * scalar DBR_STRING is sometimes clipped to the @@ -1354,18 +1346,22 @@ void tcpiiu::processIncoming () * not page fault if they read MAX_STRING_SIZE * bytes (instead of the actual string size). */ - cacheSize = max ( this->curMsg.m_postsize * 2u, MAX_STRING_SIZE ); - pData = (void *) calloc (1u, cacheSize); + unsigned cacheSize = this->curMsg.m_postsize * 2u; + if ( cacheSize < MAX_STRING_SIZE ) { + cacheSize = MAX_STRING_SIZE; + } + + char *pData = new char [cacheSize]; if ( ! pData ) { ca_printf ("CAC: not enough memory for message body cache (disconnecting)\n"); this->cleanShutdown (); return; } if ( this->pCurData ) { - free ( this->pCurData ); + delete [] this->pCurData; } this->pCurData = pData; - this->curDataMax = this->curMsg.m_postsize; + this->curDataMax = cacheSize; } if ( this->curMsg.m_postsize > 0u ) { @@ -1376,6 +1372,16 @@ void tcpiiu::processIncoming () } } + if ( nBytes >= maxBytesPendingTCP && + this->recvQue.occupiedBytes () < maxBytesPendingTCP ) { + signalNeeded = true; + } + else { + signalNeeded = false; + } + } + + if ( signalNeeded ) { epicsEventSignal ( this->recvThreadRingBufferSpaceAvailableSignal ); } @@ -1395,22 +1401,8 @@ void tcpiiu::processIncoming () } } -inline int tcpiiu::requestStubStatus () -{ - if ( this->state == iiu_connected ) { - return ECA_NORMAL; - } - else { - return ECA_DISCONNCHID; - } -} - int tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue ) { - bufferReservoir reservoir; - unsigned size, postcnt; - bool stringOptim; - if ( ! this->sendQue.dbr_type_ok ( type ) ) { return ECA_BADTYPE; } @@ -1419,9 +1411,11 @@ int tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void return ECA_BADCOUNT; } + bool stringOptim; + unsigned size; if ( type == DBR_STRING && nElem == 1 ) { - char *pstr = (char *) pValue; - size = strlen ( pstr ) +1; + const char *pstr = static_cast < const char * > ( pValue ); + size = strlen ( pstr ) + 1; stringOptim = true; } else { @@ -1429,12 +1423,12 @@ int tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void stringOptim = false; } - postcnt = CA_MESSAGE_ALIGN ( size ); + unsigned postcnt = CA_MESSAGE_ALIGN ( size ); if ( postcnt > 0xffff ) { return ECA_BADCOUNT; } - if ( this->sendQue.flushThreshold ( postcnt + 16u ) ) { + if ( this->sendQue.flushThreshold ( postcnt + sizeof ( caHdr ) ) ) { this->threadContextSensitiveFlushToWire ( true ); } @@ -1469,9 +1463,6 @@ int tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void int tcpiiu::writeNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type, unsigned nElem, const void *pValue ) { - ca_uint32_t size, postcnt; - bool stringOptim; - if ( ! this->ca_v41_ok () ) { return ECA_NOSUPPORT; } @@ -1484,6 +1475,8 @@ int tcpiiu::writeNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type, return ECA_BADCOUNT; } + ca_uint32_t size; + bool stringOptim; if ( type == DBR_STRING && nElem == 1 ) { char *pstr = (char *) pValue; size = strlen ( pstr ) +1; @@ -1493,7 +1486,7 @@ int tcpiiu::writeNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type, size = dbr_size_n ( type, nElem ); stringOptim = false; } - postcnt = CA_MESSAGE_ALIGN ( size ); + ca_uint32_t postcnt = CA_MESSAGE_ALIGN ( size ); if ( postcnt > 0xffff ) { return ECA_BADCOUNT; } @@ -1527,54 +1520,7 @@ int tcpiiu::writeNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type, this->sendQue.pushString ( nillBytes, postcnt - size ); } else { - pIO->destroy (); - } - } - else { - status = ECA_ALLOCMEM; - } - } - else { - status = ECA_DISCONNCHID; - } - - return status; -} - -int tcpiiu::readCopyRequest ( nciu &chan, unsigned type, unsigned nElem, void *pValue ) -{ - if ( nElem > 0xffff) { - return ECA_BADCOUNT; - } - if ( type > 0xffff) { - return ECA_BADTYPE; - } - - if ( this->sendQue.flushThreshold ( 16u ) ) { - this->threadContextSensitiveFlushToWire ( true ); - } - - epicsAutoMutex autoMutex ( this->mutex ); - - int status; - if ( chan.verifyConnected ( *this ) ) { - unsigned seqNo = this->pCAC ()->readSequenceOfOutstandingIO (); - netReadCopyIO *pIO = new netReadCopyIO ( chan, type, - nElem, pValue, seqNo ); - if ( pIO ) { - status = this->sendQue.reserveSpace ( 16u ); - if ( status == ECA_NORMAL ) { - this->ioTable.add ( *pIO ); - chan.tcpiiuPrivateListOfIO::eventq.add ( *pIO ); - this->sendQue.pushUInt16 ( CA_PROTO_READ ); // cmd - this->sendQue.pushUInt16 ( 0u ); // postsize - this->sendQue.pushUInt16 ( type ); // dataType - this->sendQue.pushUInt16 ( nElem ); // count - this->sendQue.pushUInt32 ( chan.getSID () ); // cid - this->sendQue.pushUInt32 ( pIO->getID () ); // available - } - else { - pIO->destroy (); + delete static_cast < baseNMIU * > ( pIO ); } } else { @@ -1620,7 +1566,7 @@ int tcpiiu::readNotifyRequest ( nciu &chan, cacNotify ¬ify, this->sendQue.pushUInt32 ( pIO->getID () ); // available } else { - pIO->destroy (); + delete static_cast < baseNMIU * > ( pIO ); } } else { @@ -1662,12 +1608,10 @@ int tcpiiu::createChannelRequest ( nciu &chan ) epicsAutoMutex autoMutex ( this->mutex ); - int status = this->sendQue.reserveSpace ( postCnt + 16u ); - if ( status == ECA_NORMAL ) { - if ( ! chan.verifyIIU ( *this ) ) { - status = ECA_DISCONNCHID; - } - else { + int status; + if ( chan.verifyIIU ( *this ) ) { + status = this->sendQue.reserveSpace ( postCnt + 16u ); + if ( status == ECA_NORMAL ) { this->sendQue.pushUInt16 ( CA_PROTO_CLAIM_CIU ); // cmd this->sendQue.pushUInt16 ( postCnt ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType @@ -1687,6 +1631,9 @@ int tcpiiu::createChannelRequest ( nciu &chan ) } } } + else { + status = ECA_DISCONNCHID; + } return status; } @@ -1721,7 +1668,9 @@ int tcpiiu::clearChannelRequest ( nciu &chan ) int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread ) { - if ( subscr.getCount () > 0xffff ) { + unsigned long count = subscr.getCount (); + + if ( count == 0u || count > 0xffff ) { return ECA_BADCOUNT; } @@ -1744,19 +1693,17 @@ int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread ) epicsAutoMutex autoMutex ( this->mutex ); - int status = this->sendQue.reserveSpace ( 32u ); - if ( status == ECA_NORMAL ) { - if ( ! subscr.channel ().verifyConnected ( *this ) ) { - status = ECA_NORMAL; - } - else { + int status; + if ( subscr.channel ().verifyConnected ( *this ) ) { + status = this->sendQue.reserveSpace ( 32u ); + if ( status == ECA_NORMAL ) { this->ioTable.add ( subscr ); // header this->sendQue.pushUInt16 ( CA_PROTO_EVENT_ADD ); // cmd this->sendQue.pushUInt16 ( 16u ); // postsize this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType - this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getCount () ) ); // count + this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( count ) ); // count this->sendQue.pushUInt32 ( subscr.channel ().getSID () ); // cid this->sendQue.pushUInt32 ( subscr.getID () ); // available @@ -1768,6 +1715,9 @@ int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread ) this->sendQue.pushUInt16 ( 0u ); // m_pad } } + else { + status = ECA_NORMAL; + } return status; } @@ -1809,11 +1759,11 @@ bool tcpiiu::threadContextSensitiveFlushToWire ( bool userThread ) // can result in a push / pull deadlock on the TCP pipe, // but in that case we still schedual the flush through // the higher priority send thread - if ( ! pCAC ()->flushPermit () ) { - this->flush (); - return true; + if ( pCAC ()->flushPermit () ) { + return this->flushToWire ( userThread ); } - return this->flushToWire ( userThread ); + this->flush (); + return true; } bool tcpiiu::flushToWire ( bool userThread ) @@ -1862,36 +1812,49 @@ bool tcpiiu::flushToWire ( bool userThread ) return success; } -void tcpiiu::ioCompletionNotify ( unsigned id, unsigned type, +bool tcpiiu::ioCompletionNotify ( unsigned id, unsigned type, unsigned long count, const void *pData ) { epicsAutoMutex autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.lookup ( id ); if ( pmiu ) { - pmiu->completionNotify ( type, count, pData ); + pmiu->notify ().completionNotify ( pmiu->channel (), type, count, pData ); + return true; + } + else { + return false; } } -void tcpiiu::ioExceptionNotify ( unsigned id, int status, const char *pContext ) +bool tcpiiu::ioExceptionNotify ( unsigned id, int status, const char *pContext ) { epicsAutoMutex autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.lookup ( id ); if ( pmiu ) { - pmiu->exceptionNotify ( status, pContext ); + pmiu->notify ().exceptionNotify ( pmiu->channel (), status, pContext ); + return true; + } + else { + return false; } } -void tcpiiu::ioExceptionNotify ( unsigned id, int status, +bool tcpiiu::ioExceptionNotify ( unsigned id, int status, const char *pContext, unsigned type, unsigned long count ) { epicsAutoMutex autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.lookup ( id ); if ( pmiu ) { - pmiu->exceptionNotify ( status, pContext, type, count ); + pmiu->notify ().exceptionNotify ( pmiu->channel (), + status, pContext, type, count ); + return true; + } + else { + return false; } } -void tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id ) +bool tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id ) { baseNMIU * pmiu; @@ -1904,12 +1867,16 @@ void tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id ) } if ( pmiu ) { - pmiu->completionNotify (); + pmiu->notify ().completionNotify ( pmiu->channel () ); delete pmiu; + return true; + } + else { + return false; } } -void tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id, +bool tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id, unsigned type, unsigned long count, const void *pData ) { baseNMIU * pmiu; @@ -1923,12 +1890,16 @@ void tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id, } if ( pmiu ) { - pmiu->completionNotify ( type, count, pData ); + pmiu->notify ().completionNotify ( pmiu->channel (), type, count, pData ); delete pmiu; + return true; + } + else { + return false; } } -void tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext ) +bool tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext ) { baseNMIU * pmiu; @@ -1941,12 +1912,16 @@ void tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char * } if ( pmiu ) { - pmiu->exceptionNotify ( status, pContext ); + pmiu->notify ().exceptionNotify ( pmiu->channel (), status, pContext ); delete pmiu; + return true; + } + else { + return false; } } -void tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, +bool tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext, unsigned type, unsigned long count ) { baseNMIU * pmiu; @@ -1960,8 +1935,13 @@ void tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, } if ( pmiu ) { - pmiu->exceptionNotify ( status, pContext, type, count ); + pmiu->notify ().exceptionNotify ( pmiu->channel (), status, + pContext, type, count ); delete pmiu; + return true; + } + else { + return false; } } @@ -1983,8 +1963,8 @@ void tcpiiu::connectAllIO ( nciu &chan ) // it shouldnt be here at this point - so uninstall it this->ioTable.remove ( *pNetIO ); chan.tcpiiuPrivateListOfIO::eventq.remove ( *pNetIO ); - pNetIO->exceptionNotify ( ECA_DISCONN, this->pHostName () ); - pNetIO->destroy (); + pNetIO->notify ().exceptionNotify ( pNetIO->channel (), ECA_DISCONN, this->pHostName () ); + delete pNetIO.pointer (); } pNetIO = next; } @@ -2010,8 +1990,8 @@ void tcpiiu::disconnectAllIO ( nciu &chan ) else { // no use after disconnected - so uninstall it chan.tcpiiuPrivateListOfIO::eventq.remove ( *pNetIO ); - pNetIO->exceptionNotify ( ECA_DISCONN, this->pHostName () ); - pNetIO->destroy (); + pNetIO->notify ().exceptionNotify ( pNetIO->channel (), ECA_DISCONN, this->pHostName () ); + delete pNetIO.pointer (); } pNetIO = next; } @@ -2039,7 +2019,7 @@ bool tcpiiu::destroyAllIO ( nciu &chan ) } } while ( baseNMIU *pIO = eventQ.get () ) { - pIO->destroy (); + delete pIO; } return true; } @@ -2068,4 +2048,11 @@ double tcpiiu::beaconPeriod () const } } +// not inline because its virtual +bool tcpiiu::ca_v42_ok () const +{ + return CA_V42 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion ); +} + + diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index e96faf23f..4ca268994 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -18,8 +18,6 @@ #include "netiiu_IL.h" #include "cac_IL.h" -typedef void (*pProtoStubUDP) (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_in *pnet_addr); - // UDP protocol dispatch table const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] = { @@ -41,293 +39,8 @@ const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] = &udpiiu::badUDPRespAction, &udpiiu::badUDPRespAction, &udpiiu::repeaterAckAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction, - &udpiiu::badUDPRespAction }; -// -// udpiiu::recvMsg () -// -void udpiiu::recvMsg () -{ - osiSockAddr src; - osiSocklen_t src_size = sizeof (src); - int status; - - status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0, - &src.sa, &src_size ); - if ( status <= 0 ) { - - if ( status == 0 ) { - return; - } - - int errnoCpy = SOCKERRNO; - - if ( errnoCpy == SOCK_SHUTDOWN ) { - return; - } - if ( errnoCpy == SOCK_ENOTSOCK ) { - return; - } - if ( errnoCpy == SOCK_EBADF ) { - return; - } - if ( errnoCpy == SOCK_EINTR ) { - return; - } -# ifdef linux - /* - * Avoid spurious ECONNREFUSED bug - * in linux - */ - if ( errnoCpy == SOCK_ECONNREFUSED ) { - return; - } -# endif - ca_printf ( "Unexpected UDP recv error was \"%s\"\n", - SOCKERRSTR (errnoCpy) ); - } - else if ( status > 0 ) { - status = this->postMsg ( src, - this->recvBuf, (unsigned long) status ); - if ( status != ECA_NORMAL ) { - char buf[64]; - - sockAddrToDottedIP ( &src.sa, buf, sizeof ( buf ) ); - - ca_printf ( - "%s: bad UDP msg from %s because \"%s\"\n", __FILE__, - buf, ca_message (status) ); - return; - } - } - - return; -} - -/* - * cacRecvThreadUDP () - */ -extern "C" void cacRecvThreadUDP (void *pParam) -{ - udpiiu *piiu = (udpiiu *) pParam; - - do { - piiu->recvMsg (); - } while ( ! piiu->shutdownCmd ); - - epicsEventSignal ( piiu->recvThreadExitSignal ); -} - -/* - * udpiiu::repeaterRegistrationMessage () - * - * register with the repeater - */ -void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber ) -{ - caRepeaterRegistrationMessage ( this->sock, this->repeaterPort, attemptNumber ); -} - -/* - * caRepeaterRegistrationMessage () - * - * register with the repeater - */ -epicsShareFunc void epicsShareAPI caRepeaterRegistrationMessage ( - SOCKET sock, unsigned repeaterPort, unsigned attemptNumber ) -{ - caHdr msg; - osiSockAddr saddr; - int status; - int len; - - /* - * In 3.13 beta 11 and before the CA repeater calls local_addr() - * to determine a local address and does not allow registration - * messages originating from other addresses. In these - * releases local_addr() returned the address of the first enabled - * interface found, and this address may or may not have been the loop - * back address. Starting with 3.13 beta 12 local_addr() was - * changed to always return the address of the first enabled - * non-loopback interface because a valid non-loopback local - * address is required in the beacon messages. Therefore, to - * guarantee compatibility with past versions of the repeater - * we alternate between the address returned by local_addr() - * and the loopback address here. - * - * CA repeaters in R3.13 beta 12 and higher allow - * either the loopback address or the address returned - * by local address (the first non-loopback address found) - */ - if ( attemptNumber & 1 ) { - saddr = osiLocalAddr ( sock ); - if ( saddr.sa.sa_family != AF_INET ) { - /* - * use the loop back address to communicate with the CA repeater - * if this os does not have interface query capabilities - * - * this will only work with 3.13 beta 12 CA repeaters or later - */ - saddr.ia.sin_family = AF_INET; - saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); - saddr.ia.sin_port = htons ( repeaterPort ); - } - else { - saddr.ia.sin_port = htons ( repeaterPort ); - } - } - else { - saddr.ia.sin_family = AF_INET; - saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); - saddr.ia.sin_port = htons ( repeaterPort ); - } - - memset ( (char *) &msg, 0, sizeof (msg) ); - msg.m_cmmd = htons ( REPEATER_REGISTER ); - msg.m_available = saddr.ia.sin_addr.s_addr; - - /* - * Intentionally sending a zero length message here - * until most CA repeater daemons have been restarted - * (and only then will accept the above protocol) - * (repeaters began accepting this protocol - * starting with EPICS 3.12) - * - * SOLARIS will not accept a zero length message - * and we are just porting there for 3.12 so we will use the - * new protocol for 3.12 - * - * recent versions of UCX will not accept a zero - * length message and we will assume that folks - * using newer versions of UCX have rebooted (and - * therefore restarted the CA repeater - and therefore - * moved it to an EPICS release that accepts this protocol) - */ -# if defined ( DOES_NOT_ACCEPT_ZERO_LENGTH_UDP ) - len = sizeof (msg); -# else - len = 0; -# endif - - status = sendto ( sock, (char *) &msg, len, - 0, (struct sockaddr *) &saddr, sizeof ( saddr ) ); - if ( status < 0 ) { - int errnoCpy = SOCKERRNO; - if ( errnoCpy != SOCK_EINTR && - /* - * This is returned from Linux when - * the repeater isnt running - */ - errnoCpy != SOCK_ECONNREFUSED ) { - ca_printf ( "CAC: error sending to repeater was \"%s\"\n", - SOCKERRSTR (errnoCpy) ); - } - } -} - -/* - * caStartRepeaterIfNotInstalled () - * - * Test for the repeater already installed - * - * NOTE: potential race condition here can result - * in two copies of the repeater being spawned - * however the repeater detects this, prints a message, - * and lets the other task start the repeater. - * - * QUESTION: is there a better way to test for a port in use? - * ANSWER: none that I can find. - * - * Problems with checking for the repeater installed - * by attempting to bind a socket to its address - * and port. - * - * 1) Closed socket may not release the bound port - * before the repeater wakes up and tries to grab it. - * Attempting to bind the open socket to another port - * also does not work. - * - * 072392 - problem solved by using SO_REUSEADDR - */ -epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort ) -{ - bool installed = false; - int status; - SOCKET tmpSock; - struct sockaddr_in bd; - int flag; - - if ( repeaterPort > 0xffff ) { - ca_printf ( "caStartRepeaterIfNotInstalled () : strange repeater port specified\n"); - return; - } - - tmpSock = socket ( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); - if ( tmpSock != INVALID_SOCKET ) { - ca_uint16_t port = static_cast < ca_uint16_t > ( repeaterPort ); - memset ( (char *) &bd, 0, sizeof ( bd ) ); - bd.sin_family = AF_INET; - bd.sin_addr.s_addr = htonl ( INADDR_ANY ); - bd.sin_port = htons ( port ); - status = bind ( tmpSock, (struct sockaddr *) &bd, sizeof ( bd ) ); - if ( status < 0 ) { - if ( SOCKERRNO == SOCK_EADDRINUSE ) { - installed = true; - } - else { - ca_printf ( "caStartRepeaterIfNotInstalled () : bind failed\n"); - } - } - } - - /* - * turn on reuse only after the test so that - * this works on kernels that support multicast - */ - flag = TRUE; - status = setsockopt ( tmpSock, SOL_SOCKET, SO_REUSEADDR, - (char *) &flag, sizeof ( flag ) ); - if ( status < 0 ) { - ca_printf ( "caStartRepeaterIfNotInstalled () : set socket option reuseaddr set failed\n"); - } - - socket_close ( tmpSock ); - - if ( ! installed ) { - osiSpawnDetachedProcessReturn osptr; - - /* - * This is not called if the repeater is known to be - * already running. (in the event of a race condition - * the 2nd repeater exits when unable to attach to the - * repeater's port) - */ - osptr = osiSpawnDetachedProcess ( "CA Repeater", "caRepeater" ); - if ( osptr == osiSpawnDetachedProcessNoSupport ) { - epicsThreadId tid; - - tid = epicsThreadCreate ( "CAC-repeater", epicsThreadPriorityLow, - epicsThreadGetStackSize (epicsThreadStackMedium), caRepeaterThread, 0); - if ( tid == 0 ) { - ca_printf ("caStartRepeaterIfNotInstalled : unable to create CA repeater daemon thread\n"); - } - } - else if ( osptr == osiSpawnDetachedProcessFail ) { - ca_printf ( "caStartRepeaterIfNotInstalled (): unable to start CA repeater daemon detached process\n" ); - } - } -} - // // udpiiu::udpiiu () // @@ -336,11 +49,11 @@ udpiiu::udpiiu ( cac &cac ) : { static const unsigned short PORT_ANY = 0u; osiSockAddr addr; - int boolValue = TRUE; + int boolValue = true; int status; this->repeaterPort = - envGetInetPortConfigParam (&EPICS_CA_REPEATER_PORT, CA_REPEATER_PORT); + envGetInetPortConfigParam ( &EPICS_CA_REPEATER_PORT, CA_REPEATER_PORT ); this->serverPort = envGetInetPortConfigParam ( &EPICS_CA_SERVER_PORT, CA_SERVER_PORT ); @@ -353,10 +66,10 @@ udpiiu::udpiiu ( cac &cac ) : } status = setsockopt ( this->sock, SOL_SOCKET, SO_BROADCAST, - (char *) &boolValue, sizeof (boolValue) ); - if (status<0) { - ca_printf ("CAC: unable to enable IP broadcasting because = \"%s\"\n", - SOCKERRSTR (SOCKERRNO)); + (char *) &boolValue, sizeof ( boolValue ) ); + if ( status < 0 ) { + ca_printf ("CAC: IP broadcasting enable failed because = \"%s\"\n", + SOCKERRSTR ( SOCKERRNO ) ); } #if 0 @@ -454,7 +167,7 @@ udpiiu::udpiiu ( cac &cac ) : void *fdRegArg; this->pCAC ()->getFDRegCallback ( fdRegFunc, fdRegArg ); if ( fdRegFunc ) { - ( *fdRegFunc ) ( fdRegArg, this->sock, TRUE ); + ( *fdRegFunc ) ( fdRegArg, this->sock, true ); } } @@ -474,7 +187,7 @@ udpiiu::~udpiiu () void *fdRegArg; this->pCAC ()->getFDRegCallback ( fdRegFunc, fdRegArg ); if ( fdRegFunc ) { - ( *fdRegFunc ) ( fdRegArg, this->sock, FALSE ); + ( *fdRegFunc ) ( fdRegArg, this->sock, false ); } if ( ! this->sockCloseCompleted ) { @@ -482,77 +195,326 @@ udpiiu::~udpiiu () } } -/* - * udpiiu::shutdown () - */ -void udpiiu::shutdown () +// +// udpiiu::recvMsg () +// +void udpiiu::recvMsg () { - bool laborNeeded; + osiSockAddr src; + osiSocklen_t src_size = sizeof ( src ); + int status; - { - epicsAutoMutex autoMutex ( this->mutex ); - laborNeeded = ! this->shutdownCmd; - this->shutdownCmd = true; - } + status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0, + &src.sa, &src_size ); + if ( status <= 0 ) { - if ( laborNeeded ) { - int status; - osiSockAddr addr; - - caHdr msg; - msg.m_cmmd = htons ( CA_PROTO_NOOP ); - msg.m_available = htonl ( 0u ); - msg.m_dataType = htons ( 0u ); - msg.m_count = htons ( 0u ); - msg.m_cid = htonl ( 0u ); - msg.m_postsize = htons ( 0u ); - - addr.ia.sin_family = AF_INET; - addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); - addr.ia.sin_port = htons ( this->localPort ); - - // send a wakeup msg so the UDP recv thread will exit - status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ), - sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) ); - if ( status < 0 ) { - // this knocks the UDP input thread out of recv () - // on all os except linux - status = socket_close ( this->sock ); - if ( status ) { - errlogPrintf ("CAC UDP socket close error was %s\n", - SOCKERRSTR ( SOCKERRNO ) ); - } - else { - this->sockCloseCompleted = true; - } + if ( status == 0 ) { + return; } - // wait for recv threads to exit - epicsEventMustWait ( this->recvThreadExitSignal ); + int errnoCpy = SOCKERRNO; + + if ( errnoCpy == SOCK_SHUTDOWN ) { + return; + } + if ( errnoCpy == SOCK_ENOTSOCK ) { + return; + } + if ( errnoCpy == SOCK_EBADF ) { + return; + } + if ( errnoCpy == SOCK_EINTR ) { + return; + } +# ifdef linux + /* + * Avoid spurious ECONNREFUSED bug + * in linux + */ + if ( errnoCpy == SOCK_ECONNREFUSED ) { + return; + } +# endif + ca_printf ( "Unexpected UDP recv error was \"%s\"\n", + SOCKERRSTR (errnoCpy) ); + } + else if ( status > 0 ) { + this->postMsg ( src, + this->recvBuf, (unsigned long) status ); } -} - -void udpiiu::badUDPRespAction ( const caHdr &msg, const osiSockAddr &netAddr ) -{ - char buf[256]; - sockAddrToDottedIP ( &netAddr.sa, buf, sizeof ( buf ) ); - ca_printf ( "CAC: Bad response code in UDP message from %s was %u\n", - buf, msg.m_cmmd); -} - -void udpiiu::noopAction ( const caHdr &, const osiSockAddr & ) -{ return; } -void udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr ) +/* + * cacRecvThreadUDP () + */ +extern "C" void cacRecvThreadUDP ( void *pParam ) +{ + udpiiu *piiu = (udpiiu *) pParam; + do { + piiu->recvMsg (); + } while ( ! piiu->shutdownCmd ); + epicsEventSignal ( piiu->recvThreadExitSignal ); +} + +/* + * udpiiu::repeaterRegistrationMessage () + * + * register with the repeater + */ +void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber ) +{ + caRepeaterRegistrationMessage ( this->sock, this->repeaterPort, attemptNumber ); +} + +/* + * caRepeaterRegistrationMessage () + * + * register with the repeater + */ +epicsShareFunc void epicsShareAPI caRepeaterRegistrationMessage ( + SOCKET sock, unsigned repeaterPort, unsigned attemptNumber ) +{ + caHdr msg; + osiSockAddr saddr; + int status; + int len; + + /* + * In 3.13 beta 11 and before the CA repeater calls local_addr() + * to determine a local address and does not allow registration + * messages originating from other addresses. In these + * releases local_addr() returned the address of the first enabled + * interface found, and this address may or may not have been the loop + * back address. Starting with 3.13 beta 12 local_addr() was + * changed to always return the address of the first enabled + * non-loopback interface because a valid non-loopback local + * address is required in the beacon messages. Therefore, to + * guarantee compatibility with past versions of the repeater + * we alternate between the address returned by local_addr() + * and the loopback address here. + * + * CA repeaters in R3.13 beta 12 and higher allow + * either the loopback address or the address returned + * by local address (the first non-loopback address found) + */ + if ( attemptNumber & 1 ) { + saddr = osiLocalAddr ( sock ); + if ( saddr.sa.sa_family != AF_INET ) { + /* + * use the loop back address to communicate with the CA repeater + * if this os does not have interface query capabilities + * + * this will only work with 3.13 beta 12 CA repeaters or later + */ + saddr.ia.sin_family = AF_INET; + saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); + saddr.ia.sin_port = htons ( repeaterPort ); + } + else { + saddr.ia.sin_port = htons ( repeaterPort ); + } + } + else { + saddr.ia.sin_family = AF_INET; + saddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); + saddr.ia.sin_port = htons ( repeaterPort ); + } + + memset ( (char *) &msg, 0, sizeof (msg) ); + msg.m_cmmd = htons ( REPEATER_REGISTER ); + msg.m_available = saddr.ia.sin_addr.s_addr; + + /* + * Intentionally sending a zero length message here + * until most CA repeater daemons have been restarted + * (and only then will they accept the above protocol) + * (repeaters began accepting this protocol + * starting with EPICS 3.12) + */ +# if defined ( DOES_NOT_ACCEPT_ZERO_LENGTH_UDP ) + len = sizeof (msg); +# else + len = 0; +# endif + + status = sendto ( sock, (char *) &msg, len, + 0, (struct sockaddr *) &saddr, sizeof ( saddr ) ); + if ( status < 0 ) { + int errnoCpy = SOCKERRNO; + /* + * Different OS return different codes when the repeater isnt running. + * Its ok to supress these messages because I print another warning message + * if we time out registerring with the repeater. + * + * Linux returns SOCK_ECONNREFUSED + * Windows 2000 returns SOCK_ECONNRESET + */ + if ( errnoCpy != SOCK_EINTR && + errnoCpy != SOCK_ECONNREFUSED && + errnoCpy != SOCK_ECONNRESET ) { + ca_printf ( "CAC: error sending registration message to CA repeater daemon was \"%s\"\n", + SOCKERRSTR ( errnoCpy ) ); + } + } +} + +/* + * caStartRepeaterIfNotInstalled () + * + * Test for the repeater already installed + * + * NOTE: potential race condition here can result + * in two copies of the repeater being spawned + * however the repeater detects this, prints a message, + * and lets the other task start the repeater. + * + * QUESTION: is there a better way to test for a port in use? + * ANSWER: none that I can find. + * + * Problems with checking for the repeater installed + * by attempting to bind a socket to its address + * and port. + * + * 1) Closed socket may not release the bound port + * before the repeater wakes up and tries to grab it. + * Attempting to bind the open socket to another port + * also does not work. + * + * 072392 - problem solved by using SO_REUSEADDR + */ +epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort ) +{ + bool installed = false; + int status; + SOCKET tmpSock; + struct sockaddr_in bd; + int flag; + + if ( repeaterPort > 0xffff ) { + ca_printf ( "caStartRepeaterIfNotInstalled () : strange repeater port specified\n"); + return; + } + + tmpSock = socket ( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); + if ( tmpSock != INVALID_SOCKET ) { + ca_uint16_t port = static_cast < ca_uint16_t > ( repeaterPort ); + memset ( (char *) &bd, 0, sizeof ( bd ) ); + bd.sin_family = AF_INET; + bd.sin_addr.s_addr = htonl ( INADDR_ANY ); + bd.sin_port = htons ( port ); + status = bind ( tmpSock, (struct sockaddr *) &bd, sizeof ( bd ) ); + if ( status < 0 ) { + if ( SOCKERRNO == SOCK_EADDRINUSE ) { + installed = true; + } + else { + ca_printf ( "caStartRepeaterIfNotInstalled () : bind failed\n"); + } + } + } + + /* + * turn on reuse only after the test so that + * this works on kernels that support multicast + */ + flag = true; + status = setsockopt ( tmpSock, SOL_SOCKET, SO_REUSEADDR, + (char *) &flag, sizeof ( flag ) ); + if ( status < 0 ) { + ca_printf ( "caStartRepeaterIfNotInstalled () : set socket option reuseaddr set failed\n"); + } + + socket_close ( tmpSock ); + + if ( ! installed ) { + + /* + * This is not called if the repeater is known to be + * already running. (in the event of a race condition + * the 2nd repeater exits when unable to attach to the + * repeater's port) + */ + osiSpawnDetachedProcessReturn osptr = + osiSpawnDetachedProcess ( "CA Repeater", "caRepeater" ); + if ( osptr == osiSpawnDetachedProcessNoSupport ) { + epicsThreadId tid; + + tid = epicsThreadCreate ( "CAC-repeater", epicsThreadPriorityLow, + epicsThreadGetStackSize (epicsThreadStackMedium), caRepeaterThread, 0); + if ( tid == 0 ) { + ca_printf ("caStartRepeaterIfNotInstalled : unable to create CA repeater daemon thread\n"); + } + } + else if ( osptr == osiSpawnDetachedProcessFail ) { + ca_printf ( "caStartRepeaterIfNotInstalled (): unable to start CA repeater daemon detached process\n" ); + } + } +} + +void udpiiu::shutdown () +{ + { + epicsAutoMutex autoMutex ( this->mutex ); + if ( this->shutdownCmd ) { + return; + } + this->shutdownCmd = true; + } + + caHdr msg; + msg.m_cmmd = htons ( CA_PROTO_NOOP ); + msg.m_available = htonl ( 0u ); + msg.m_dataType = htons ( 0u ); + msg.m_count = htons ( 0u ); + msg.m_cid = htonl ( 0u ); + msg.m_postsize = htons ( 0u ); + + osiSockAddr addr; + addr.ia.sin_family = AF_INET; + addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); + addr.ia.sin_port = htons ( this->localPort ); + + // send a wakeup msg so the UDP recv thread will exit + int status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ), + sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) ); + if ( status < 0 ) { + // this knocks the UDP input thread out of recv () + // on all os except linux + status = socket_close ( this->sock ); + if ( status == 0 ) { + this->sockCloseCompleted = true; + } + else { + errlogPrintf ("CAC UDP socket close error was %s\n", + SOCKERRSTR ( SOCKERRNO ) ); + } + } + + // wait for recv threads to exit + epicsEventMustWait ( this->recvThreadExitSignal ); +} + +bool udpiiu::badUDPRespAction ( const caHdr &msg, const osiSockAddr &netAddr ) +{ + ca_printf ( "CAC: undecipherable ( bad msg code %u ) UDP message\n", + msg.m_cmmd ); + return false; +} + +bool udpiiu::noopAction ( const caHdr &, const osiSockAddr & ) +{ + return true; +} + +bool udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr ) { osiSockAddr serverAddr; unsigned minorVersion; ca_uint16_t *pMinorVersion; if ( addr.sa.sa_family != AF_INET ) { - return; + return false; } /* @@ -573,7 +535,7 @@ void udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr ) * so that we can have multiple servers on one host */ serverAddr.ia.sin_family = AF_INET; - if ( CA_V48 (CA_PROTOCOL_VERSION,minorVersion) ) { + if ( CA_V48 ( CA_PROTOCOL_VERSION,minorVersion ) ) { if ( msg.m_cid != INADDR_BROADCAST ) { /* * Leave address in network byte order (m_cid has not been @@ -596,23 +558,23 @@ void udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr ) } if ( CA_V42 ( CA_PROTOCOL_VERSION, minorVersion ) ) { - this->pCAC ()->lookupChannelAndTransferToTCP + return this->pCAC ()->lookupChannelAndTransferToTCP ( msg.m_available, msg.m_cid, USHRT_MAX, 0, minorVersion, serverAddr ); } else { - this->pCAC ()->lookupChannelAndTransferToTCP + return this->pCAC ()->lookupChannelAndTransferToTCP ( msg.m_available, msg.m_cid, msg.m_dataType, msg.m_count, minorVersion, serverAddr ); } } -void udpiiu::beaconAction ( const caHdr &msg, const osiSockAddr &net_addr ) +bool udpiiu::beaconAction ( const caHdr &msg, const osiSockAddr &net_addr ) { struct sockaddr_in ina; if ( net_addr.sa.sa_family != AF_INET ) { - return; + return false; } /* @@ -644,20 +606,21 @@ void udpiiu::beaconAction ( const caHdr &msg, const osiSockAddr &net_addr ) this->pCAC ()->beaconNotify ( ina ); - return; + return true; } -void udpiiu::repeaterAckAction ( const caHdr &, const osiSockAddr &) +bool udpiiu::repeaterAckAction ( const caHdr &, const osiSockAddr &) { this->pCAC ()->repeaterSubscribeConfirmNotify (); + return true; } -void udpiiu::notHereRespAction ( const caHdr &, const osiSockAddr &) +bool udpiiu::notHereRespAction ( const caHdr &, const osiSockAddr &) { - return; + return true; } -void udpiiu::exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr ) +bool udpiiu::exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr ) { const caHdr &reqMsg = * ( &msg + 1 ); char name[64]; @@ -674,14 +637,10 @@ void udpiiu::exceptionRespAction ( const caHdr &msg, const osiSockAddr &net_addr ca_message ( htonl ( msg.m_available ) ), name ); } - return; + return true; } - -/* - * post_udp_msg () - */ -int udpiiu::postMsg ( const osiSockAddr &net_addr, +void udpiiu::postMsg ( const osiSockAddr &net_addr, char *pInBuf, unsigned long blockSize ) { caHdr *pCurMsg; @@ -690,10 +649,15 @@ int udpiiu::postMsg ( const osiSockAddr &net_addr, unsigned long size; if ( blockSize < sizeof ( *pCurMsg ) ) { - return ECA_TOLARGE; + char buf[64]; + sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); + ca_printf ( + "%s: undecipherable (too small) UDP msg from %s ignored\n", __FILE__, + buf ); + return; } - pCurMsg = reinterpret_cast ( pInBuf ); + pCurMsg = reinterpret_cast < caHdr * > ( pInBuf ); /* * fix endian of bytes @@ -720,25 +684,35 @@ int udpiiu::postMsg ( const osiSockAddr &net_addr, * dont allow msg body extending beyond frame boundary */ if ( size > blockSize ) { - return ECA_TOLARGE; + char buf[64]; + sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); + ca_printf ( + "%s: undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__, + buf ); + return; } /* * execute the response message */ - pProtoStubUDP pStub; - if ( pCurMsg->m_cmmd >= NELEMENTS ( udpJumpTableCAC ) ) { - pStub = &udpiiu::badUDPRespAction; - } - else { + pProtoStubUDP pStub; + if ( pCurMsg->m_cmmd < NELEMENTS ( udpJumpTableCAC ) ) { pStub = udpJumpTableCAC [pCurMsg->m_cmmd]; } - (this->*pStub) ( *pCurMsg, net_addr); + else { + pStub = &udpiiu::badUDPRespAction; + } + bool success = ( this->*pStub ) ( *pCurMsg, net_addr ); + if ( ! success ) { + char buf[256]; + sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); + ca_printf ( "CAC: undecipherable UDP message from %s\n", buf ); + return; + } blockSize -= size; pInBuf += size;; } - return ECA_NORMAL; } /* @@ -767,7 +741,7 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e pbufmsg = (caHdr *) &this->xmitBuf[this->nBytesInXmitBuf]; *pbufmsg = msg; - memcpy ( pbufmsg+1, pExt, extsize ); + memcpy ( pbufmsg + 1, pExt, extsize ); if ( extsize != allignedExtSize ) { char *pDest = (char *) ( pbufmsg + 1 ); memset ( pDest + extsize, '\0', allignedExtSize - extsize );