From 420ad02d89f38e10028ff3c608f3b1f173478517 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 25 Apr 2002 18:21:20 +0000 Subject: [PATCH] many structural improvements --- src/ca/repeater.cpp | 41 +-- src/ca/syncGroup.h | 55 +++- src/ca/syncGroupReadNotify.cpp | 15 +- src/ca/syncGroupWriteNotify.cpp | 16 +- src/ca/tcpiiu.cpp | 275 +++++++++++-------- src/ca/udpiiu.cpp | 468 +++++++++++++++++++++++--------- src/ca/udpiiu.h | 93 +++++-- src/ca/virtualCircuit.h | 63 +++-- 8 files changed, 700 insertions(+), 326 deletions(-) diff --git a/src/ca/repeater.cpp b/src/ca/repeater.cpp index bb72fe2da..516c65056 100644 --- a/src/ca/repeater.cpp +++ b/src/ca/repeater.cpp @@ -61,38 +61,13 @@ #include "tsFreeList.h" #include "osiWireFormat.h" +#define epicsExportSharedSymbols #include "iocinf.h" #include "caProto.h" #include "taskwd.h" - -#define epicsExportSharedSymbols #include "udpiiu.h" -#undef epicsExportSharedSymbols +#include "repeaterClient.h" -/* - * one socket per client so we will get the ECONNREFUSED - * error code (and then delete the client) - */ -class repeaterClient : public tsDLNode < repeaterClient > { -public: - repeaterClient ( const osiSockAddr &from ); - bool connect (); - bool sendConfirm (); - bool sendMessage ( const void *pBuf, unsigned bufSize ); - void destroy (); - bool verify (); - bool identicalAddress ( const osiSockAddr &from ); - bool identicalPort ( const osiSockAddr &from ); - void * operator new ( size_t size ); - void operator delete ( void *pCadaver, size_t size ); -protected: - ~repeaterClient (); -private: - osiSockAddr from; - SOCKET sock; - unsigned short port () const; - static epicsSingleton < tsFreeList < class repeaterClient, 0x20 > > pFreeList; -}; /* * these can be external since there is only one instance @@ -100,18 +75,6 @@ private: */ static tsDLList < repeaterClient > client_list; -#ifdef _MSC_VER -# pragma warning ( push ) -# pragma warning ( disable:4660 ) -#endif - -template class tsFreeList < repeaterClient, 0x20 >; -template class epicsSingleton < tsFreeList < repeaterClient, 0x20 > >; - -#ifdef _MSC_VER -# pragma warning ( pop ) -#endif - epicsSingleton < tsFreeList < repeaterClient, 0x20 > > repeaterClient::pFreeList; static char buf [MAX_UDP_RECV]; diff --git a/src/ca/syncGroup.h b/src/ca/syncGroup.h index a5cb2bf72..6701babad 100644 --- a/src/ca/syncGroup.h +++ b/src/ca/syncGroup.h @@ -39,6 +39,15 @@ #include "cadef.h" #include "cacIO.h" +// does the local compiler support placement delete +#if defined (_MSC_VER) +# if _MSC_VER >= 1200 +# define CASG_PLACEMENT_DELETE +# endif +#else +# define CASG_PLACEMENT_DELETE +#endif + static const unsigned CASG_MAGIC = 0xFAB4CAFE; // used to control access to CASG's recycle routines which @@ -82,8 +91,8 @@ private: syncGroupReadNotify ( struct CASG &sgIn, chid, void *pValueIn ); void * operator new ( size_t, tsFreeList < class syncGroupReadNotify, 128, epicsMutexNOOP > & ); -# if ! defined ( NO_PLACEMENT_DELETE ) - void operator delete ( void *, size_t, +# if defined ( CASG_PLACEMENT_DELETE ) + void operator delete ( void *, tsFreeList < class syncGroupReadNotify, 128, epicsMutexNOOP > & ); # endif void completion ( @@ -92,6 +101,12 @@ private: int status, const char *pContext, unsigned type, arrayElementCount count ); syncGroupReadNotify ( const syncGroupReadNotify & ); syncGroupReadNotify & operator = ( const syncGroupReadNotify & ); +# if defined (_MSC_VER) && _MSC_VER == 1300 + void operator delete ( void * ); // avoid visual c++ 7 bug +# endif +# if __GNUC__==2 && __GNUC_MINOR_<96 + void operator delete ( void *, size_t ); // avoid gnu g++ bug +# endif }; class syncGroupWriteNotify : public syncGroupNotify, public cacWriteNotify { @@ -110,8 +125,8 @@ private: syncGroupWriteNotify ( struct CASG &, chid ); void * operator new ( size_t, tsFreeList < class syncGroupWriteNotify, 128, epicsMutexNOOP > & ); -# if ! defined ( NO_PLACEMENT_DELETE ) - void operator delete ( void *, size_t, +# if defined ( CASG_PLACEMENT_DELETE ) + void operator delete ( void *, tsFreeList < class syncGroupWriteNotify, 128, epicsMutexNOOP > & ); # endif void completion (); @@ -119,10 +134,25 @@ private: unsigned type, arrayElementCount count ); syncGroupWriteNotify ( const syncGroupWriteNotify & ); syncGroupWriteNotify & operator = ( const syncGroupWriteNotify & ); +# if defined (_MSC_VER) && _MSC_VER == 1300 + void operator delete ( void * ); // avoid visual c++ 7 bug +# endif +# if __GNUC__==2 && __GNUC_MINOR_<96 + void operator delete ( void *, size_t ); // avoid gnu g++ bug +# endif }; struct oldCAC; +class casgMutex { +public: + void lock (); + void unlock (); + void show ( unsigned level ) const; +private: + epicsMutex mutex; +}; + struct CASG : public chronIntIdRes < CASG >, private casgRecycle { public: CASG ( oldCAC & cacIn ); @@ -148,7 +178,7 @@ protected: private: tsDLList < syncGroupNotify > ioPendingList; tsDLList < syncGroupNotify > ioCompletedList; - epicsMutex mutable mutex; + casgMutex mutable mutex; epicsEvent sem; oldCAC & client; unsigned magic; @@ -171,4 +201,19 @@ inline bool syncGroupNotify::ioInitiated () const return this->idIsValid; } +inline void casgMutex::lock () +{ + this->mutex.lock (); +} + +inline void casgMutex::unlock () +{ + this->mutex.unlock (); +} + +inline void casgMutex::show ( unsigned level ) const +{ + this->mutex.show ( level ); +} + #endif // ifdef syncGrouph diff --git a/src/ca/syncGroupReadNotify.cpp b/src/ca/syncGroupReadNotify.cpp index 4d8d6e46d..cfbe7a3c3 100644 --- a/src/ca/syncGroupReadNotify.cpp +++ b/src/ca/syncGroupReadNotify.cpp @@ -28,6 +28,8 @@ * */ +#include + #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" #define epicsExportSharedSymbols @@ -108,11 +110,18 @@ void * syncGroupReadNotify::operator new ( size_t size, return freeList.allocate ( size ); } -#if ! defined ( NO_PLACEMENT_DELETE ) -void syncGroupReadNotify::operator delete ( void *pCadaver, size_t size, +#if defined ( CASG_PLACEMENT_DELETE ) +void syncGroupReadNotify::operator delete ( void *pCadaver, tsFreeList < class syncGroupReadNotify, 128, epicsMutexNOOP > &freeList ) { - freeList.release ( pCadaver, size ); + freeList.release ( pCadaver, sizeof ( syncGroupReadNotify ) ); } #endif +# if defined (_MSC_VER) && _MSC_VER == 1300 + void syncGroupReadNotify::operator delete ( void * ) // avoid visual c++ 7 bug + { + throw std::logic_error ( "_MSC_VER == 1300 bogus stub called?" ); + } +# endif + diff --git a/src/ca/syncGroupWriteNotify.cpp b/src/ca/syncGroupWriteNotify.cpp index 92c721d28..e3c7ec695 100644 --- a/src/ca/syncGroupWriteNotify.cpp +++ b/src/ca/syncGroupWriteNotify.cpp @@ -28,6 +28,8 @@ * */ +#include + #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" #define epicsExportSharedSymbols @@ -103,10 +105,18 @@ void * syncGroupWriteNotify::operator new ( size_t size, return freeList.allocate ( size ); } -#if ! defined ( NO_PLACEMENT_DELETE ) -void syncGroupWriteNotify::operator delete ( void *pCadaver, size_t size, +#if defined ( CASG_PLACEMENT_DELETE ) +void syncGroupWriteNotify::operator delete ( void *pCadaver, tsFreeList < class syncGroupWriteNotify, 128, epicsMutexNOOP > &freeList ) { - freeList.release ( pCadaver, size ); + freeList.release ( pCadaver, sizeof ( syncGroupWriteNotify ) ); } #endif + +# if defined (_MSC_VER) && _MSC_VER == 1300 + void syncGroupWriteNotify::operator delete ( void * ) // avoid visual c++ 7 bug + { + throw std::logic_error ( "_MSC_VER == 1300 bogus stub called?" ); + } +# endif + diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 9f45eefb9..8df88eae8 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -10,6 +10,11 @@ * * Author: Jeff Hill */ + +#ifdef _MSC_VER +# pragma warning(disable:4355) +#endif + #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" #define epicsExportSharedSymbols @@ -45,6 +50,11 @@ void tcpSendThread::start () this->thread.start (); } +bool tcpSendThread::exitWait ( double delay ) +{ + return thread.exitWait ( delay ); +} + void tcpSendThread::run () { try { @@ -59,32 +69,32 @@ void tcpSendThread::run () } { - epicsGuard < epicsMutex > autoMutex ( this->iiu.pCAC()->mutexRef() ); + epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() ); flowControlLaborNeeded = this->iiu.busyStateDetected != this->iiu.flowControlActive; echoLaborNeeded = this->iiu.echoRequestPending; this->iiu.echoRequestPending = false; - } + + if ( flowControlLaborNeeded ) { + if ( this->iiu.flowControlActive ) { + this->iiu.disableFlowControlRequest ( guard ); + this->iiu.flowControlActive = false; + debugPrintf ( ( "fc off\n" ) ); + } + else { + this->iiu.enableFlowControlRequest ( guard ); + this->iiu.flowControlActive = true; + debugPrintf ( ( "fc on\n" ) ); + } + } - if ( flowControlLaborNeeded ) { - if ( this->iiu.flowControlActive ) { - this->iiu.disableFlowControlRequest (); - this->iiu.flowControlActive = false; - debugPrintf ( ( "fc off\n" ) ); - } - else { - this->iiu.enableFlowControlRequest (); - this->iiu.flowControlActive = true; - debugPrintf ( ( "fc on\n" ) ); - } - } - - if ( echoLaborNeeded ) { - if ( CA_V43 ( this->iiu.minorProtocolVersion ) ) { - this->iiu.echoRequest (); - } - else { - this->iiu.versionMessage ( this->iiu.priority() ); + if ( echoLaborNeeded ) { + if ( CA_V43 ( this->iiu.minorProtocolVersion ) ) { + this->iiu.echoRequest ( guard ); + } + else { + this->iiu.versionMessage ( guard, this->iiu.priority() ); + } } } @@ -97,8 +107,6 @@ void tcpSendThread::run () this->iiu.printf ("cac: tcp send thread received an unexpected exception - disconnecting\n"); this->iiu.forcedShutdown (); } - - this->iiu.sendThreadExitEvent.signal (); } unsigned tcpiiu::sendBytes ( const void *pBuf, @@ -143,7 +151,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, if ( localError != SOCK_EPIPE && localError != SOCK_ECONNRESET && localError != SOCK_ETIMEDOUT && localError != SOCK_ECONNABORTED ) { - this->printf ( "CAC: unexpected TCP send error: %s\n", SOCKERRSTR (localError) ); + this->cacRef.printf ( "CAC: unexpected TCP send error: %s\n", SOCKERRSTR (localError) ); } this->state = iiu_disconnected; @@ -227,19 +235,16 @@ void tcpRecvThread::start () void tcpRecvThread::run () { try { - this->iiu.pCAC()->attachToClientCtx (); + this->iiu.cacRef.attachToClientCtx (); epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); this->iiu.connect (); - this->iiu.versionMessage ( this->iiu.priority() ); - if ( this->iiu.state == iiu_connected ) { this->iiu.sendThread.start (); } else { - this->iiu.sendThreadExitEvent.signal (); this->iiu.cleanShutdown (); } @@ -257,7 +262,7 @@ void tcpRecvThread::run () // appear to impact performance. // unsigned nBytesIn; - if ( this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) { + if ( this->iiu.cacRef.preemptiveCallbakIsEnabled() ) { nBytesIn = pComBuf->fillFromWire ( this->iiu ); if ( nBytesIn == 0u ) { break; @@ -285,7 +290,7 @@ void tcpRecvThread::run () // this lock get a chance to run epicsGuard < callbackMutex > guard ( this->cbMutex ); - if ( ! this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) { + if ( ! this->iiu.cacRef.preemptiveCallbakIsEnabled() ) { nBytesIn = pComBuf->fillFromWire ( this->iiu ); if ( nBytesIn == 0u ) { // outer loop checks to see if state is connected @@ -368,7 +373,6 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, epicsTimerQueue & timerQueue, const osiSockAddr & addrIn, unsigned minorVersion, ipAddrToAsciiEngine & engineIn, const cacChannel::priLev & priorityIn ) : - netiiu ( & cac ), caServerID ( addrIn.ia, priorityIn ), recvThread ( *this, cbMutex, "CAC-TCP-recv", epicsThreadGetStackSize ( epicsThreadStackBig ), @@ -377,7 +381,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, epicsThreadGetStackSize ( epicsThreadStackMedium ), cac::lowestPriorityLevelAbove ( cac::lowestPriorityLevelAbove ( - this->pCAC()->getInitializingThreadsPriority() ) ) ), + cac.getInitializingThreadsPriority() ) ) ), recvDog ( *this, connectionTimeout, timerQueue ), sendDog ( *this, connectionTimeout, timerQueue ), killTimer ( cac, *this, timerQueue ), @@ -385,6 +389,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, curDataMax ( MAX_TCP ), curDataBytes ( 0ul ), pHostNameCache ( new hostNameCache ( addrIn, engineIn ) ), + cacRef ( cac ), pCurData ( cac.allocateSmallBufferTCP () ), minorProtocolVersion ( minorVersion ), state ( iiu_connecting ), @@ -427,9 +432,13 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, } // load message queue with messages informing server - // of user and host name of client - this->userNameSetRequest (); - this->hostNameSetRequest (); + // of version, user, and host name of client + { + epicsGuard < cacMutex > guard ( this->cacRef.mutexRef() ); + this->versionMessage ( guard, this->priority() ); + this->userNameSetRequest ( guard ); + this->hostNameSetRequest ( guard ); + } # if 0 { @@ -480,7 +489,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, void tcpiiu::start ( epicsGuard < callbackMutex > & cbGuard ) { this->recvThread.start (); - this->pCAC()->notifyNewFD ( cbGuard, this->sock ); + this->cacRef.notifyNewFD ( cbGuard, this->sock ); } /* @@ -501,7 +510,7 @@ void tcpiiu::connect () this->sendDog.cancel (); - epicsGuard < epicsMutex > autoMutex ( this->pCAC()->mutexRef() ); + epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() ); if ( this->state == iiu_connecting ) { // put the iiu into the connected state @@ -539,28 +548,32 @@ void tcpiiu::connect () void tcpiiu::cleanShutdown () { - this->pCAC()->tcpCircuitShutdown ( *this, false ); + this->cacRef.tcpCircuitShutdown ( *this, false ); } void tcpiiu::forcedShutdown () { - this->pCAC()->tcpCircuitShutdown ( *this, true ); + this->cacRef.tcpCircuitShutdown ( *this, true ); } // // tcpiiu::shutdown () // -// caller must hold callback mutex and also primary cac mutex -// when calling this routine -// -void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbLocker, bool discardPendingMessages ) +void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbGuard, + epicsGuard < cacMutex > & guard, + bool discardPendingMessages ) { if ( ! this->sockCloseCompleted ) { - this->pCAC()->notifyDestroyFD ( cbLocker, this->sock ); + + this->sockCloseCompleted = true; + + { + epicsGuardRelease < cacMutex > guardRelease ( guard ); + this->cacRef.notifyDestroyFD ( cbGuard, this->sock ); + } iiu_conn_state oldState = this->state; this->state = iiu_disconnected; - this->sockCloseCompleted = true; if ( discardPendingMessages ) { // force abortive shutdown sequence @@ -607,14 +620,10 @@ void tcpiiu::stopThreads () // wait for send thread to exit static const double shutdownDelay = 15.0; - while ( true ) { - bool signaled = this->sendThreadExitEvent.wait ( shutdownDelay ); - if ( signaled ) { - break; - } + while ( ! this->sendThread.exitWait ( shutdownDelay ) ) { if ( ! this->sockCloseCompleted ) { printf ( "Gave up waiting for \"shutdown()\" to force send thread to exit after %f sec\n", - shutdownDelay); + shutdownDelay ); printf ( "Closing socket\n" ); int status = socket_close ( this->sock ); if ( status ) { @@ -657,46 +666,50 @@ tcpiiu::~tcpiiu () // free message body cache if ( this->pCurData ) { if ( this->curDataMax == MAX_TCP ) { - this->pCAC()->releaseSmallBufferTCP ( this->pCurData ); + this->cacRef.releaseSmallBufferTCP ( this->pCurData ); } else { - this->pCAC()->releaseLargeBufferTCP ( this->pCurData ); + this->cacRef.releaseLargeBufferTCP ( this->pCurData ); } } } void tcpiiu::show ( unsigned level ) const { - epicsGuard < epicsMutex > locker ( this->pCAC()->mutexRef() ); + epicsGuard < cacMutex > locker ( this->cacRef.mutexRef() ); char buf[256]; this->pHostNameCache->hostName ( buf, sizeof ( buf ) ); ::printf ( "Virtual circuit to \"%s\" at version V%u.%u state %u\n", buf, CA_MAJOR_PROTOCOL_REVISION, this->minorProtocolVersion, this->state ); if ( level > 1u ) { - this->netiiu::show ( level - 1u ); - } - if ( level > 2u ) { ::printf ( "\tcurrent data cache pointer = %p current data cache size = %lu\n", static_cast < void * > ( this->pCurData ), this->curDataMax ); ::printf ( "\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n", this->contigRecvMsgCount, this->busyStateDetected, this->flowControlActive ); } - if ( level > 3u ) { + if ( level > 2u ) { ::printf ( "\tvirtual circuit socket identifier %d\n", this->sock ); ::printf ( "\tsend thread flush signal:\n" ); - this->sendThreadFlushEvent.show ( level-3u ); - ::printf ( "\tsend thread exit signal:\n" ); - this->sendThreadExitEvent.show ( level-3u ); + this->sendThreadFlushEvent.show ( level-2u ); + ::printf ( "\tsend thread:\n" ); + this->sendThread.show ( level-2u ); + ::printf ( "\trecv thread:\n" ); + this->recvThread.show ( level-2u ); ::printf ("\techo pending bool = %u\n", this->echoRequestPending ); ::printf ( "IO identifier hash table:\n" ); + tsDLIterConstBD < nciu > pChan = this->channelList.firstIter (); + while ( pChan.valid () ) { + pChan->show ( level - 2u ); + pChan++; + } } } bool tcpiiu::setEchoRequestPending () // X aCC 361 { { - epicsGuard < epicsMutex > locker ( this->pCAC()->mutexRef() ); + epicsGuard < cacMutex > locker ( this->cacRef.mutexRef() ); this->echoRequestPending = true; } this->flushRequest (); @@ -765,12 +778,12 @@ bool tcpiiu::processIncoming ( epicsGuard < callbackMutex > & guard ) // if ( this->curMsg.m_postsize > this->curDataMax ) { if ( this->curDataMax == MAX_TCP && - this->pCAC()->largeBufferSizeTCP() >= this->curMsg.m_postsize ) { - char * pBuf = this->pCAC()->allocateLargeBufferTCP (); + this->cacRef.largeBufferSizeTCP() >= this->curMsg.m_postsize ) { + char * pBuf = this->cacRef.allocateLargeBufferTCP (); if ( pBuf ) { - this->pCAC()->releaseSmallBufferTCP ( this->pCurData ); + this->cacRef.releaseSmallBufferTCP ( this->pCurData ); this->pCurData = pBuf; - this->curDataMax = this->pCAC()->largeBufferSizeTCP (); + this->curDataMax = this->cacRef.largeBufferSizeTCP (); } else { this->printf ("CAC: not enough memory for message body cache (ignoring response message)\n"); @@ -788,7 +801,7 @@ bool tcpiiu::processIncoming ( epicsGuard < callbackMutex > & guard ) return true; } } - bool msgOK = this->pCAC()->executeResponse ( guard, *this, + bool msgOK = this->cacRef.executeResponse ( guard, *this, this->curMsg, this->pCurData ); if ( ! msgOK ) { return false; @@ -849,7 +862,7 @@ inline void insertRequestHeader ( /* * tcpiiu::hostNameSetRequest () */ -void tcpiiu::hostNameSetRequest () +void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & ) { if ( ! CA_V41 ( this->minorProtocolVersion ) ) { return; @@ -864,8 +877,6 @@ void tcpiiu::hostNameSetRequest () this->flushRequest (); } - epicsGuard < epicsMutex > locker ( this->pCAC()->mutexRef() ); - this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_HOST_NAME ); // cmd this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( postSize ) ); // postsize @@ -881,13 +892,13 @@ void tcpiiu::hostNameSetRequest () /* * tcpiiu::userNameSetRequest () */ -void tcpiiu::userNameSetRequest () +void tcpiiu::userNameSetRequest ( epicsGuard < cacMutex > & ) { if ( ! CA_V41 ( this->minorProtocolVersion ) ) { return; } - const char *pName = this->pCAC ()->userNamePointer (); + const char *pName = this->cacRef.userNamePointer (); unsigned size = strlen ( pName ) + 1u; unsigned postSize = CA_MESSAGE_ALIGN ( size ); assert ( postSize < 0xffff ); @@ -896,7 +907,6 @@ void tcpiiu::userNameSetRequest () this->flushRequest (); } - epicsGuard < epicsMutex > locker ( this->pCAC()->mutexRef() ); this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_CLIENT_NAME ); // cmd this->sendQue.pushUInt16 ( static_cast ( postSize ) ); // postsize @@ -909,13 +919,11 @@ void tcpiiu::userNameSetRequest () this->sendQue.commitMsg (); } -void tcpiiu::disableFlowControlRequest () +void tcpiiu::disableFlowControlRequest ( epicsGuard < cacMutex > & ) { if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest (); } - - epicsGuard < epicsMutex > locker ( this->pCAC()->mutexRef() ); this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_ON ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize @@ -926,13 +934,11 @@ void tcpiiu::disableFlowControlRequest () this->sendQue.commitMsg (); } -void tcpiiu::enableFlowControlRequest () +void tcpiiu::enableFlowControlRequest ( epicsGuard < cacMutex > & ) { if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest (); } - - epicsGuard < epicsMutex > locker ( this->pCAC()->mutexRef() ); this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_OFF ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize @@ -943,7 +949,8 @@ void tcpiiu::enableFlowControlRequest () this->sendQue.commitMsg (); } -void tcpiiu::versionMessage ( const cacChannel::priLev & priority ) +void tcpiiu::versionMessage ( epicsGuard < cacMutex > &, + const cacChannel::priLev & priority ) { assert ( priority <= 0xffff ); @@ -951,7 +958,6 @@ void tcpiiu::versionMessage ( const cacChannel::priLev & priority ) this->flushRequest (); } - epicsGuard < epicsMutex > locker ( this->pCAC()->mutexRef() ); this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_VERSION ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize ( old possize field ) @@ -962,13 +968,11 @@ void tcpiiu::versionMessage ( const cacChannel::priLev & priority ) this->sendQue.commitMsg (); } -void tcpiiu::echoRequest () +void tcpiiu::echoRequest ( epicsGuard < cacMutex > & ) { if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest (); } - - epicsGuard < epicsMutex > locker ( this->pCAC()->mutexRef() ); this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_ECHO ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize @@ -1028,7 +1032,8 @@ inline void insertRequestWithPayLoad ( sendQue.commitMsg (); } -void tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue ) +void tcpiiu::writeRequest ( epicsGuard < cacMutex > &, + nciu &chan, unsigned type, unsigned nElem, const void *pValue ) { if ( ! chan.connected () ) { throw cacChannel::notConnected(); @@ -1039,7 +1044,8 @@ void tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const voi } -void tcpiiu::writeNotifyRequest ( nciu &chan, netWriteNotifyIO &io, unsigned type, +void tcpiiu::writeNotifyRequest ( epicsGuard < cacMutex > &, + nciu &chan, netWriteNotifyIO &io, unsigned type, unsigned nElem, const void *pValue ) { if ( ! chan.connected () ) { @@ -1053,7 +1059,8 @@ void tcpiiu::writeNotifyRequest ( nciu &chan, netWriteNotifyIO &io, unsigned typ CA_V49 ( this->minorProtocolVersion ) ); } -void tcpiiu::readNotifyRequest ( nciu &chan, netReadNotifyIO &io, +void tcpiiu::readNotifyRequest ( epicsGuard < cacMutex > &, + nciu &chan, netReadNotifyIO &io, unsigned dataType, unsigned nElem ) { if ( ! chan.connected () ) { @@ -1067,7 +1074,7 @@ void tcpiiu::readNotifyRequest ( nciu &chan, netReadNotifyIO &io, } unsigned maxBytes; if ( CA_V49 ( this->minorProtocolVersion ) ) { - maxBytes = this->pCAC()->largeBufferSizeTCP (); + maxBytes = this->cacRef.largeBufferSizeTCP (); } else { maxBytes = MAX_TCP; @@ -1084,7 +1091,7 @@ void tcpiiu::readNotifyRequest ( nciu &chan, netReadNotifyIO &io, this->sendQue.commitMsg (); } -void tcpiiu::createChannelRequest ( nciu &chan ) +void tcpiiu::createChannelRequest ( nciu & chan ) { const char *pName; unsigned nameLength; @@ -1127,7 +1134,8 @@ void tcpiiu::createChannelRequest ( nciu &chan ) this->sendQue.commitMsg (); } -void tcpiiu::clearChannelRequest ( ca_uint32_t sid, ca_uint32_t cid ) +void tcpiiu::clearChannelRequest ( epicsGuard < cacMutex > &, + ca_uint32_t sid, ca_uint32_t cid ) { this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd @@ -1143,7 +1151,8 @@ void tcpiiu::clearChannelRequest ( ca_uint32_t sid, ca_uint32_t cid ) // this routine return void because if this internally fails the best response // is to try again the next time that we reconnect // -void tcpiiu::subscriptionRequest ( nciu &chan, netSubscription & subscr ) +void tcpiiu::subscriptionRequest ( epicsGuard < cacMutex > &, + nciu &chan, netSubscription & subscr ) { if ( ! chan.connected() ) { return; @@ -1151,12 +1160,12 @@ void tcpiiu::subscriptionRequest ( nciu &chan, netSubscription & subscr ) unsigned mask = subscr.getMask(); if ( mask > 0xffff ) { mask &= 0xffff; - this->pCAC()->printf ( "CAC: subscriptionRequest() truncated unusual event select mask\n" ); + this->cacRef.printf ( "CAC: subscriptionRequest() truncated unusual event select mask\n" ); } arrayElementCount nElem = subscr.getCount (); unsigned maxBytes; if ( CA_V49 ( this->minorProtocolVersion ) ) { - maxBytes = this->pCAC()->largeBufferSizeTCP (); + maxBytes = this->cacRef.largeBufferSizeTCP (); } else { maxBytes = MAX_TCP; @@ -1181,7 +1190,8 @@ void tcpiiu::subscriptionRequest ( nciu &chan, netSubscription & subscr ) this->sendQue.commitMsg (); } -void tcpiiu::subscriptionCancelRequest ( nciu & chan, netSubscription & subscr ) +void tcpiiu::subscriptionCancelRequest ( epicsGuard < cacMutex > &, + nciu & chan, netSubscription & subscr ) { insertRequestHeader ( this->sendQue, CA_PROTO_EVENT_CANCEL, 0u, @@ -1192,22 +1202,13 @@ void tcpiiu::subscriptionCancelRequest ( nciu & chan, netSubscription & subscr ) this->sendQue.commitMsg (); } -// -// caller must hold both the callback mutex and -// also the cac primary mutex -// -void tcpiiu::lastChannelDetachNotify ( epicsGuard < callbackMutex > & cbLocker ) -{ - this->shutdown ( cbLocker, false ); -} - bool tcpiiu::flush () { while ( true ) { comBuf * pBuf; { - epicsGuard < epicsMutex > autoMutex ( this->pCAC()->mutexRef() ); + epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() ); pBuf = this->sendQue.popNextComBufToSend (); if ( pBuf ) { this->unacknowledgedSendBytes += pBuf->occupiedBytes (); @@ -1237,7 +1238,7 @@ bool tcpiiu::flush () pBuf->destroy (); if ( ! success ) { - epicsGuard < epicsMutex > autoMutex ( this->pCAC()->mutexRef() ); + epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() ); while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) { pBuf->destroy (); } @@ -1252,12 +1253,12 @@ bool tcpiiu::flush () // ~tcpiiu() will not return while this->blockingForFlush is greater than zero void tcpiiu::blockUntilSendBacklogIsReasonable ( - epicsGuard < callbackMutex > *pCallbackLocker, epicsGuard < epicsMutex > & primaryLocker ) + epicsGuard < callbackMutex > *pCallbackLocker, epicsGuard < cacMutex > & primaryLocker ) { assert ( this->blockingForFlush < UINT_MAX ); this->blockingForFlush++; while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiu_connected ) { - epicsGuardRelease < epicsMutex > autoRelease ( primaryLocker ); + epicsGuardRelease < cacMutex > autoRelease ( primaryLocker ); if ( pCallbackLocker ) { epicsGuardRelease < callbackMutex > autoReleaseCallback ( *pCallbackLocker ); this->flushBlockEvent.wait (); @@ -1273,7 +1274,7 @@ void tcpiiu::blockUntilSendBacklogIsReasonable ( } } -void tcpiiu::flushRequestIfAboveEarlyThreshold () +void tcpiiu::flushRequestIfAboveEarlyThreshold ( epicsGuard < cacMutex > & ) { if ( ! this->earlyFlush && this->sendQue.flushEarlyThreshold(0u) ) { this->earlyFlush = true; @@ -1281,7 +1282,7 @@ void tcpiiu::flushRequestIfAboveEarlyThreshold () } } -bool tcpiiu::flushBlockThreshold () const +bool tcpiiu::flushBlockThreshold ( epicsGuard < cacMutex > & ) const { return this->sendQue.flushBlockThreshold ( 0u ); } @@ -1315,3 +1316,59 @@ const char * tcpiiu::pHostName () const return nameBuf; // ouch !! } +void tcpiiu::removeAllChannels ( epicsGuard < callbackMutex > & cbGuard, + epicsGuard < cacMutex > & guard, + cacDisconnectChannelPrivate & dcp ) +{ + // we are protected here because channel delete takes the callback mutex + while ( nciu *pChan = this->channelList.first() ) { + // if the claim reply has not returned then we will issue + // the clear channel request to the server when the claim reply + // arrives and there is no matching nciu in the client + if ( pChan->connected() ) { + this->clearChannelRequest ( guard, pChan->getSID(), pChan->getCID() ); + } + dcp.disconnectChannel ( cbGuard, guard, *pChan ); + } +} + +void tcpiiu::installChannel ( epicsGuard < cacMutex > &, nciu & chan, unsigned sidIn, + ca_uint16_t typeIn, arrayElementCount countIn ) +{ + this->channelList.add ( chan ); + chan.searchReplySetUp ( *this, sidIn, typeIn, countIn ); + chan.createChannelRequest ( *this ); + this->flushRequest (); +} + +void tcpiiu::uninstallChannel ( epicsGuard < callbackMutex > & cbGuard, + epicsGuard < cacMutex > & guard, nciu & chan ) +{ + this->channelList.remove ( chan ); + if ( this->channelList.count() == 0u ) { + this->shutdown ( cbGuard, guard, false ); + } +} + +int tcpiiu::printf ( const char *pformat, ... ) +{ + va_list theArgs; + int status; + + va_start ( theArgs, pformat ); + + status = this->cacRef.vPrintf ( pformat, theArgs ); + + va_end ( theArgs ); + + return status; +} + +// this is called virtually +void tcpiiu::flushRequest () +{ + this->sendThreadFlushEvent.signal (); +} + + + diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index 64c4d8c6e..b5ca875f3 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -10,6 +10,10 @@ * Author: Jeff Hill */ +#ifdef _MSC_VER +# pragma warning(disable:4355) +#endif + #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" #include "envDefs.h" @@ -20,11 +24,11 @@ #include "addrList.h" #include "caerr.h" // for ECA_NOSEARCHADDR #include "udpiiu.h" -#undef epicsExportSharedSymbols - #include "iocinf.h" #include "inetAddrID.h" #include "cac.h" +#include "repeaterSubscribeTimer.h" +#include "searchTimer.h" // UDP protocol dispatch table const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] = @@ -52,15 +56,24 @@ const udpiiu::pProtoStubUDP udpiiu::udpJumpTableCAC [] = // // udpiiu::udpiiu () // -udpiiu::udpiiu ( callbackMutex & cbMutex, cac & cac ) : - netiiu ( &cac ), - recvThread ( *this, cbMutex, - "CAC-UDP", - epicsThreadGetStackSize ( epicsThreadStackMedium ), - cac::lowestPriorityLevelAbove - ( this->pCAC()->getInitializingThreadsPriority () ) ), - shutdownCmd ( false ), - sockCloseCompleted ( false ) +udpiiu::udpiiu ( epicsTimerQueueActive &timerQueue, callbackMutex & cbMutex, cac & cac ) : + recvThread ( *this, cbMutex, + "CAC-UDP", + epicsThreadGetStackSize ( epicsThreadStackMedium ), + cac::lowestPriorityLevelAbove + ( cac.getInitializingThreadsPriority () ) ), + cacRef ( cac ), + nBytesInXmitBuf ( 0 ), + sock ( 0 ), + // The udpiiu and the search timer share the same lock because + // this is much more efficent with recursive locks. Also, access + // to the udp's netiiu base list is protected. + pSearchTmr ( new searchTimer ( *this, timerQueue, this->mutex ) ), + pRepeaterSubscribeTmr ( new repeaterSubscribeTimer ( *this, timerQueue ) ), + repeaterPort ( 0 ), + serverPort ( 0 ), + localPort ( 0 ), + shutdownCmd ( false ) { static const unsigned short PORT_ANY = 0u; osiSockAddr addr; @@ -129,25 +142,17 @@ udpiiu::udpiiu ( callbackMutex & cbMutex, cac & cac ) : status = getsockname ( this->sock, &tmpAddr.sa, &saddr_length ); if ( status < 0 ) { socket_close ( this->sock ); - this->pCAC ()->printf ( "CAC: getsockname () error was \"%s\"\n", SOCKERRSTR (SOCKERRNO) ); + this->printf ( "CAC: getsockname () error was \"%s\"\n", SOCKERRSTR (SOCKERRNO) ); throwWithLocation ( noSocket () ); } if ( tmpAddr.sa.sa_family != AF_INET) { socket_close ( this->sock ); - this->pCAC ()->printf ( "CAC: UDP socket was not inet addr family\n" ); + this->printf ( "CAC: UDP socket was not inet addr family\n" ); throwWithLocation ( noSocket () ); } this->localPort = epicsNTOH16 ( tmpAddr.ia.sin_port ); } - this->nBytesInXmitBuf = 0u; - - this->recvThreadExitSignal = epicsEventMustCreate ( epicsEventEmpty ); - if ( ! this->recvThreadExitSignal ) { - socket_close ( this->sock ); - throw std::bad_alloc (); - } - /* * load user and auto configured * broadcast address list @@ -160,16 +165,12 @@ udpiiu::udpiiu ( callbackMutex & cbMutex, cac & cac ) : // 2) no auxiliary threads are running at this point // (taking the callback lock here would break the // lock hierarchy and risk deadlocks) - genLocalExcep ( *this->pCAC (), ECA_NOSEARCHADDR, NULL ); + genLocalExcep ( this->cacRef, ECA_NOSEARCHADDR, NULL ); } caStartRepeaterIfNotInstalled ( this->repeaterPort ); -} -void udpiiu::start ( epicsGuard < callbackMutex > & cbGuard ) -{ this->recvThread.start (); - this->pCAC()->notifyNewFD ( cbGuard, this->sock ); } /* @@ -177,14 +178,27 @@ void udpiiu::start ( epicsGuard < callbackMutex > & cbGuard ) */ udpiiu::~udpiiu () { - // closes the udp socket and waits for its recv thread to exit - this->shutdown (); + bool closeCompleted = false; - epicsEventDestroy ( this->recvThreadExitSignal ); + this->shutdownCmd = true; - ellFree ( &this->dest ); + this->wakeupMsg (); + + // wait for recv threads to exit + static const double shutdownDelay = 15.0; + while ( ! this->recvThread.exitWait ( shutdownDelay ) ) { + // this knocks the UDP input thread out of recv () + // on all os except linux + if ( ! closeCompleted ) { + socket_close ( this->sock ); + closeCompleted = true; + } + fprintf ( stderr, "cac: timing out waiting for UDP thread shutdown\n" ); + } + + ellFree ( & this->dest ); - if ( ! this->sockCloseCompleted ) { + if ( ! closeCompleted ) { socket_close ( this->sock ); } } @@ -197,7 +211,7 @@ void udpiiu::recvMsg ( callbackMutex & cbMutex ) osiSockAddr src; int status; - if ( this->pCAC()->preemptiveCallbakIsEnabled() ) { + if ( this->cacRef.preemptiveCallbakIsEnabled() ) { osiSocklen_t src_size = sizeof ( src ); status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0, &src.sa, &src_size ); @@ -215,7 +229,7 @@ void udpiiu::recvMsg ( callbackMutex & cbMutex ) { epicsGuard < callbackMutex > guard ( cbMutex ); - if ( ! this->pCAC()->preemptiveCallbakIsEnabled() ) { + if ( ! this->cacRef.preemptiveCallbakIsEnabled() ) { osiSocklen_t src_size = sizeof ( src ); status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0, &src.sa, &src_size ); @@ -274,13 +288,28 @@ void udpRecvThread::start () this->thread.start (); } +bool udpRecvThread::exitWait ( double delay ) +{ + return this->thread.exitWait ( delay ); +} + void udpRecvThread::run () { epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); + + { + epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); + this->iiu.cacRef.notifyNewFD ( cbGuard, this->iiu.sock ); + } + do { this->iiu.recvMsg ( this->cbMutex ); } while ( ! this->iiu.shutdownCmd ); - epicsEventSignal ( this->iiu.recvThreadExitSignal ); + + { + epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); + this->iiu.cacRef.notifyDestroyFD ( cbGuard, this->iiu.sock ); + } } /* @@ -290,6 +319,7 @@ void udpRecvThread::run () */ void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber ) { + epicsGuard < udpMutex > cbGuard ( this->mutex ); caRepeaterRegistrationMessage ( this->sock, this->repeaterPort, attemptNumber ); } @@ -484,20 +514,6 @@ void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort ) } } -void udpiiu::shutdown () -{ - if ( this->shutdownCmd ) { - return; - } - - this->shutdownCmd = true; - - this->wakeupMsg (); - - // wait for recv threads to exit - epicsEventMustWait ( this->recvThreadExitSignal ); -} - bool udpiiu::badUDPRespAction ( epicsGuard < callbackMutex > &, const caHdr &msg, const osiSockAddr &netAddr, const epicsTime ¤tTime ) { @@ -569,12 +585,12 @@ bool udpiiu::searchRespAction ( epicsGuard < callbackMutex > & cbLocker, } if ( CA_V42 ( minorVersion ) ) { - return this->pCAC ()->lookupChannelAndTransferToTCP + return this->cacRef.lookupChannelAndTransferToTCP ( cbLocker, msg.m_available, msg.m_cid, 0xffff, 0, minorVersion, serverAddr, currentTime ); } else { - return this->pCAC ()->lookupChannelAndTransferToTCP + return this->cacRef.lookupChannelAndTransferToTCP ( cbLocker, msg.m_available, msg.m_cid, msg.m_dataType, msg.m_count, minorVersion, serverAddr, currentTime ); } @@ -618,7 +634,7 @@ bool udpiiu::beaconAction ( epicsGuard < callbackMutex > &, const caHdr &msg, unsigned protocolRevision = epicsNTOH16 ( msg.m_dataType ); unsigned beaconNumber = epicsNTOH32 ( msg.m_cid ); - this->pCAC ()->beaconNotify ( ina, currentTime, + this->cacRef.beaconNotify ( ina, currentTime, beaconNumber, protocolRevision ); return true; @@ -627,7 +643,7 @@ bool udpiiu::beaconAction ( epicsGuard < callbackMutex > &, const caHdr &msg, bool udpiiu::repeaterAckAction ( epicsGuard < callbackMutex > &, const caHdr &, const osiSockAddr &, const epicsTime &) { - this->pCAC ()->repeaterSubscribeConfirmNotify (); + this->cacRef.repeaterSubscribeConfirmNotify (); return true; } @@ -736,18 +752,12 @@ void udpiiu::postMsg ( epicsGuard < callbackMutex > & guard, } } -/* - * udpiiu::pushDatagramMsg () - */ -bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t extsize ) +bool udpiiu::pushDatagramMsg ( const caHdr & msg, const void *pExt, ca_uint16_t extsize ) { - arrayElementCount msgsize; - ca_uint16_t alignedExtSize; - caHdr *pbufmsg; - - alignedExtSize = static_cast (CA_MESSAGE_ALIGN ( extsize )); - msgsize = sizeof ( caHdr ) + alignedExtSize; + epicsGuard < udpMutex > guard ( this->mutex ); + ca_uint16_t alignedExtSize = static_cast (CA_MESSAGE_ALIGN ( extsize )); + arrayElementCount msgsize = sizeof ( caHdr ) + alignedExtSize; /* fail out if max message size exceeded */ if ( msgsize >= sizeof ( this->xmitBuf ) - 7 ) { @@ -758,7 +768,7 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e return false; } - pbufmsg = (caHdr *) &this->xmitBuf[this->nBytesInXmitBuf]; + caHdr * pbufmsg = ( caHdr * ) &this->xmitBuf[this->nBytesInXmitBuf]; *pbufmsg = msg; memcpy ( pbufmsg + 1, pExt, extsize ); if ( extsize != alignedExtSize ) { @@ -773,89 +783,97 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e void udpiiu::datagramFlush () { - osiSockAddrNode *pNode; + epicsGuard < udpMutex > guard ( this->mutex ); - if ( this->nBytesInXmitBuf == 0u ) { - return; - } + { + osiSockAddrNode *pNode; - pNode = (osiSockAddrNode *) ellFirst ( &this->dest ); // X aCC 749 - while ( pNode ) { - int status; + if ( this->nBytesInXmitBuf == 0u ) { + return; + } - assert ( this->nBytesInXmitBuf <= INT_MAX ); - status = sendto ( this->sock, this->xmitBuf, - (int) this->nBytesInXmitBuf, 0, - &pNode->addr.sa, sizeof ( pNode->addr.sa ) ); - if ( status != (int) this->nBytesInXmitBuf ) { - if ( status >= 0 ) { - this->printf ( "CAC: UDP sendto () call returned strange xmit count?\n" ); - break; - } - else { - int localErrno = SOCKERRNO; + pNode = (osiSockAddrNode *) ellFirst ( &this->dest ); // X aCC 749 + while ( pNode ) { + int status; - if ( localErrno == SOCK_EINTR ) { - if ( this->shutdownCmd ) { - break; - } - else { - continue; - } - } - else if ( localErrno == SOCK_SHUTDOWN ) { - break; - } - else if ( localErrno == SOCK_ENOTSOCK ) { - break; - } - else if ( localErrno == SOCK_EBADF ) { + assert ( this->nBytesInXmitBuf <= INT_MAX ); + status = sendto ( this->sock, this->xmitBuf, + (int) this->nBytesInXmitBuf, 0, + &pNode->addr.sa, sizeof ( pNode->addr.sa ) ); + if ( status != (int) this->nBytesInXmitBuf ) { + if ( status >= 0 ) { + this->printf ( "CAC: UDP sendto () call returned strange xmit count?\n" ); break; } else { - char buf[64]; + int localErrno = SOCKERRNO; - sockAddrToDottedIP ( &pNode->addr.sa, buf, sizeof ( buf ) ); + if ( localErrno == SOCK_EINTR ) { + if ( this->shutdownCmd ) { + break; + } + else { + continue; + } + } + else if ( localErrno == SOCK_SHUTDOWN ) { + break; + } + else if ( localErrno == SOCK_ENOTSOCK ) { + break; + } + else if ( localErrno == SOCK_EBADF ) { + break; + } + else { + char buf[64]; - this->printf ( - "CAC: error = \"%s\" sending UDP msg to %s\n", - SOCKERRSTR ( localErrno ), buf); - break; + sockAddrToDottedIP ( &pNode->addr.sa, buf, sizeof ( buf ) ); + + this->printf ( + "CAC: error = \"%s\" sending UDP msg to %s\n", + SOCKERRSTR ( localErrno ), buf); + break; + } } } + pNode = (osiSockAddrNode *) ellNext ( &pNode->node ); // X aCC 749 } - pNode = (osiSockAddrNode *) ellNext ( &pNode->node ); // X aCC 749 - } - this->nBytesInXmitBuf = 0u; + this->nBytesInXmitBuf = 0u; + } } void udpiiu::show ( unsigned level ) const { + epicsGuard < udpMutex > guard ( this->mutex ); + ::printf ( "Datagram IO circuit (and disconnected channel repository)\n"); if ( level > 1u ) { - this->netiiu::show ( level - 1u ); - } - if ( level > 2u ) { ::printf ("\trepeater port %u\n", this->repeaterPort ); ::printf ("\tdefault server port %u\n", this->serverPort ); - printChannelAccessAddressList ( &this->dest ); + printChannelAccessAddressList ( & this->dest ); } - if ( level > 3u ) { + if ( level > 2u ) { ::printf ("\tsocket identifier %d\n", this->sock ); ::printf ("\tbytes in xmit buffer %u\n", this->nBytesInXmitBuf ); ::printf ("\tshut down command bool %u\n", this->shutdownCmd ); ::printf ( "\trecv thread exit signal:\n" ); - epicsEventShow ( this->recvThreadExitSignal, level-3u ); + this->recvThread.show ( level - 2u ); + ::printf ( "search message timer:\n" ); + this->pSearchTmr->show ( level - 2u ); + ::printf ( "repeater subscribee timer:\n" ); + this->pRepeaterSubscribeTmr->show ( level - 2u ); + tsDLIterConstBD < nciu > pChan = this->channelList.firstIter (); + while ( pChan.valid () ) { + pChan->show ( level - 2u ); + pChan++; + } } } -void udpiiu::wakeupMsg () +bool udpiiu::wakeupMsg () { - if ( this->sockCloseCompleted ) { - return; - } - caHdr msg; msg.m_cmmd = epicsHTON16 ( CA_PROTO_VERSION ); msg.m_available = epicsHTON32 ( 0u ); @@ -869,21 +887,219 @@ void udpiiu::wakeupMsg () addr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK ); addr.ia.sin_port = epicsHTON16 ( this->localPort ); + epicsGuard < udpMutex > guard ( this->mutex ); + // send a wakeup msg so the UDP recv thread will exit int status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ), sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) ); - if ( status < 0 ) { - // this knocks the UDP input thread out of recv () - // on all os except linux - status = socket_close ( this->sock ); - if ( status == 0 ) { - this->sockCloseCompleted = true; - } - else { - errlogPrintf ("CAC UDP socket close error was %s\n", - SOCKERRSTR ( SOCKERRNO ) ); + if ( status == sizeof (msg) ) { + return true; + } + return false; +} + +void udpiiu::repeaterConfirmNotify () +{ + this->pRepeaterSubscribeTmr->confirmNotify (); +} + +void udpiiu::notifySearchResponse ( unsigned short retrySeqNo, const epicsTime & currentTime ) +{ + // deadlock can result if this is called while holding the primary + // mutex (because the primary mutex is used in the search timer callback) + this->pSearchTmr->notifySearchResponse ( retrySeqNo, currentTime ); +} + +void udpiiu::resetSearchTimerPeriod ( double delay ) +{ + this->pSearchTmr->resetPeriod ( delay ); +} + +void udpiiu::beaconAnomalyNotify () +{ + static const double portTicksPerSec = 1000u; + static const unsigned portBasedDelayMask = 0xff; + + /* + * This is needed when many machines + * have channels in a disconnected state that + * dont exist anywhere on the network. This insures + * that we dont have many CA clients synchronously + * flooding the network with broadcasts (and swamping + * out requests for valid channels). + * + * I fetch the local UDP port number and use the low + * order bits as a pseudo random delay to prevent every + * one from replying at once. + */ + double delay = ( this->localPort & portBasedDelayMask ); + delay /= portTicksPerSec; + + this->pSearchTmr->resetPeriod ( delay ); + + { + epicsGuard guard ( this->mutex ); + tsDLIterBD < nciu > chan = this->channelList.firstIter (); + while ( chan.valid () ) { + chan->resetRetryCount (); + chan++; } } } +void udpiiu::removeAllChannels ( epicsGuard < callbackMutex > & ) +{ + epicsGuard < udpMutex > guard ( this->mutex ); + static limboiiu limboIIU; + while ( nciu * pChan = this->channelList.get () ) { + // no need to own CAC lock here because the channel is being decomissioned + pChan->disconnect ( limboIIU ); + } +} + +void udpiiu::pendioTimeoutNotify () +{ + epicsGuard < udpMutex > guard ( this->mutex ); + tsDLIterBD < nciu > chan = this->channelList.firstIter (); + while ( chan.valid () ) { + chan->connectTimeoutNotify (); + chan++; + } +} + +bool udpiiu::searchMsg ( unsigned short retrySeqNumber, unsigned & retryNoForThisChannel ) +{ + bool success; + + if ( nciu *pChan = this->channelList.get () ) { + success = pChan->searchMsg ( *this, retrySeqNumber, retryNoForThisChannel ); + if ( success ) { + this->channelList.add ( *pChan ); + } + else { + this->channelList.push ( *pChan ); + } + } + else { + success = false; + } + + return success; +} + +void udpiiu::installChannel ( nciu & chan ) +{ + epicsGuard < udpMutex> guard ( this->mutex ); + this->channelList.add ( chan ); + this->resetSearchPeriod ( 0.0 ); +} + +int udpiiu::printf ( const char *pformat, ... ) +{ + va_list theArgs; + int status; + + va_start ( theArgs, pformat ); + + status = this->cacRef.vPrintf ( pformat, theArgs ); + + va_end ( theArgs ); + + return status; +} + +void udpiiu::uninstallChannel ( epicsGuard < callbackMutex > &, + epicsGuard < cacMutex > &, nciu & chan ) +{ + epicsGuard < udpMutex > guard ( this->mutex ); + this->channelList.remove ( chan ); +} + +void udpiiu::hostName ( char *pBuf, unsigned bufLength ) const +{ + netiiu::hostName ( pBuf, bufLength ); +} + +const char * udpiiu::pHostName () const +{ + return netiiu::pHostName (); +} + +bool udpiiu::ca_v42_ok () const +{ + return netiiu::ca_v42_ok (); +} + +void udpiiu::writeRequest ( epicsGuard < cacMutex > & guard, nciu & chan, unsigned type, + unsigned nElem, const void * pValue ) +{ + netiiu::writeRequest ( guard, chan, type, nElem, pValue ); +} + +void udpiiu::writeNotifyRequest ( epicsGuard < cacMutex > & guard, nciu & chan, + netWriteNotifyIO & io, unsigned type, unsigned nElem, const void *pValue ) +{ + netiiu::writeNotifyRequest ( guard, chan, io, type, nElem, pValue ); +} + +void udpiiu::readNotifyRequest ( epicsGuard < cacMutex > & guard, nciu & chan, + netReadNotifyIO & io, unsigned type, unsigned nElem ) +{ + netiiu::readNotifyRequest ( guard, chan, io, type, nElem ); +} + +void udpiiu::clearChannelRequest ( epicsGuard < cacMutex > & guard, + ca_uint32_t sid, ca_uint32_t cid ) +{ + netiiu::clearChannelRequest ( guard, sid, cid ); +} + +void udpiiu::subscriptionRequest ( epicsGuard < cacMutex > & guard, nciu & chan, + netSubscription & subscr ) +{ + netiiu::subscriptionRequest ( guard, chan, subscr ); +} + +void udpiiu::subscriptionCancelRequest ( epicsGuard < cacMutex > & guard, + nciu & chan, netSubscription & subscr ) +{ + return netiiu::subscriptionCancelRequest ( guard, chan, subscr ); +} + +void udpiiu::flushRequest () +{ + netiiu::flushRequest (); +} + +bool udpiiu::flushBlockThreshold ( epicsGuard < cacMutex > & guard ) const +{ + return netiiu::flushBlockThreshold ( guard ); +} + +void udpiiu::flushRequestIfAboveEarlyThreshold ( epicsGuard < cacMutex > & guard ) +{ + netiiu::flushRequestIfAboveEarlyThreshold ( guard ); +} + +void udpiiu::blockUntilSendBacklogIsReasonable + ( epicsGuard < callbackMutex > * pCBGuard, epicsGuard < cacMutex > & guard ) +{ + netiiu::blockUntilSendBacklogIsReasonable ( pCBGuard, guard ); +} + +void udpiiu::requestRecvProcessPostponedFlush () +{ + netiiu::requestRecvProcessPostponedFlush (); +} + +osiSockAddr udpiiu::getNetworkAddress () const +{ + return netiiu::getNetworkAddress (); +} + +void udpiiu::resetSearchPeriod ( double delay ) +{ + this->pSearchTmr->resetPeriod ( delay ); +} + diff --git a/src/ca/udpiiu.h b/src/ca/udpiiu.h index 5d244b6ef..719de13a9 100644 --- a/src/ca/udpiiu.h +++ b/src/ca/udpiiu.h @@ -27,6 +27,7 @@ # include "osiSock.h" # include "epicsThread.h" +# include "epicsMemory.h" #ifdef udpiiuh_accessh_epicsExportSharedSymbols # define epicsExportSharedSymbols @@ -53,6 +54,7 @@ public: const char * pName, unsigned stackSize, unsigned priority ); virtual ~udpRecvThread (); void start (); + bool exitWait ( double delay ); private: class udpiiu & iiu; callbackMutex & cbMutex; @@ -60,23 +62,37 @@ private: void run(); }; +class udpMutex { +public: + void lock (); + void unlock (); + void show ( unsigned level ) const; +private: + epicsMutex mutex; +}; + class udpiiu : public netiiu { public: - udpiiu ( callbackMutex &, class cac & ); - void start ( epicsGuard < callbackMutex > & ); + udpiiu ( class epicsTimerQueueActive &, callbackMutex &, class cac & ); virtual ~udpiiu (); - void shutdown (); - void recvMsg ( callbackMutex & ); - void postMsg ( epicsGuard < callbackMutex > &, - const osiSockAddr & net_addr, - char *pInBuf, arrayElementCount blockSize, - const epicsTime ¤Time ); + void installChannel ( nciu & ); void repeaterRegistrationMessage ( unsigned attemptNumber ); + bool searchMsg ( unsigned short retrySeqNumber, unsigned & retryNoForThisChannel ); void datagramFlush (); - unsigned getPort () const; void show ( unsigned level ) const; - bool isCurrentThread () const; - void wakeupMsg (); + bool wakeupMsg (); + void resetSearchPeriod ( double delay ); + void repeaterConfirmNotify (); + void notifySearchResponse ( unsigned short retrySeqNo, const epicsTime & currentTime ); + void resetSearchTimerPeriod ( double delay ); + void beaconAnomalyNotify (); + void removeAllChannels ( epicsGuard < callbackMutex > & cbGuard ); + void pendioTimeoutNotify (); + int printf ( const char *pformat, ... ); + unsigned channelCount (); + void uninstallChannel ( epicsGuard < callbackMutex > &, + epicsGuard < cacMutex > &, nciu & ); + bool pushDatagramMsg ( const caHdr &hdr, const void *pExt, ca_uint16_t extsize); // exceptions class noSocket {}; @@ -84,18 +100,25 @@ public: private: char xmitBuf [MAX_UDP_SEND]; char recvBuf [MAX_UDP_RECV]; + mutable udpMutex mutex; udpRecvThread recvThread; + tsDLList < nciu > channelList; ELLLIST dest; - epicsEventId recvThreadExitSignal; + cac & cacRef; unsigned nBytesInXmitBuf; SOCKET sock; + epics_auto_ptr < class searchTimer > pSearchTmr; + epics_auto_ptr < class repeaterSubscribeTimer > pRepeaterSubscribeTmr; unsigned short repeaterPort; unsigned short serverPort; unsigned short localPort; bool shutdownCmd; - bool sockCloseCompleted; - bool pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t extsize ); + void recvMsg ( callbackMutex & ); + void postMsg ( epicsGuard < callbackMutex > &, + const osiSockAddr & net_addr, + char *pInBuf, arrayElementCount blockSize, + const epicsTime ¤Time ); typedef bool ( udpiiu::*pProtoStubUDP ) ( epicsGuard < callbackMutex > &, const caHdr &, @@ -122,13 +145,51 @@ private: friend void udpRecvThread::run (); + void hostName ( char *pBuf, unsigned bufLength ) const; + const char * pHostName () const; // deprecated - please do not use + bool ca_v42_ok () const; + void writeRequest ( epicsGuard < cacMutex > &, nciu &, unsigned type, + unsigned nElem, const void *pValue ); + void writeNotifyRequest ( epicsGuard < cacMutex > &, nciu &, netWriteNotifyIO &, + unsigned type, unsigned nElem, const void *pValue ); + void readNotifyRequest ( epicsGuard < cacMutex > &, nciu &, netReadNotifyIO &, + unsigned type, unsigned nElem ); + void clearChannelRequest ( epicsGuard < cacMutex > &, + ca_uint32_t sid, ca_uint32_t cid ); + void subscriptionRequest ( epicsGuard < cacMutex > &, nciu &, + netSubscription &subscr ); + void subscriptionCancelRequest ( epicsGuard < cacMutex > &, + nciu & chan, netSubscription & subscr ); + void flushRequest (); + bool flushBlockThreshold ( epicsGuard < cacMutex > & ) const; + void flushRequestIfAboveEarlyThreshold ( epicsGuard < cacMutex > & ); + void blockUntilSendBacklogIsReasonable + ( epicsGuard < callbackMutex > *, epicsGuard < cacMutex > & ); + void requestRecvProcessPostponedFlush (); + osiSockAddr getNetworkAddress () const; + udpiiu ( const udpiiu & ); udpiiu & operator = ( const udpiiu & ); }; -inline unsigned udpiiu::getPort () const +inline void udpMutex::lock () { - return this->localPort; + this->mutex.lock (); +} + +inline void udpMutex::unlock () +{ + this->mutex.unlock (); +} + +inline void udpMutex::show ( unsigned level ) const +{ + this->mutex.show ( level ); +} + +inline unsigned udpiiu::channelCount () +{ + return this->channelList.count (); } #endif // udpiiuh diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index 8671fc441..2720763b7 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -65,6 +65,7 @@ public: const char * pName, unsigned int stackSize, unsigned int priority ); virtual ~tcpSendThread (); void start (); + bool exitWait ( double delay ); private: class tcpiiu & iiu; epicsThread thread; @@ -84,19 +85,23 @@ public: void start ( epicsGuard < callbackMutex > & ); void cleanShutdown (); void forcedShutdown (); - void shutdown ( epicsGuard < callbackMutex > & cbLocker, bool discardPendingMessages ); + void shutdown ( epicsGuard < callbackMutex > & cbLocker, + epicsGuard < cacMutex > & guard, bool discardPendingMessages ); void beaconAnomalyNotify (); void beaconArrivalNotify (); void flushRequest (); - bool flushBlockThreshold () const; - void flushRequestIfAboveEarlyThreshold (); + bool flushBlockThreshold ( epicsGuard < cacMutex > & ) const; + void flushRequestIfAboveEarlyThreshold ( epicsGuard < cacMutex > & ); void blockUntilSendBacklogIsReasonable - ( epicsGuard < callbackMutex > * pCallbackGuard, epicsGuard < epicsMutex > & primaryGuard ); + ( epicsGuard < callbackMutex > * pCallbackGuard, + epicsGuard < cacMutex > & primaryGuard ); virtual void show ( unsigned level ) const; bool setEchoRequestPending (); + void createChannelRequest ( nciu & ); void requestRecvProcessPostponedFlush (); - void clearChannelRequest ( ca_uint32_t sid, ca_uint32_t cid ); + void clearChannelRequest ( epicsGuard < cacMutex > &, + ca_uint32_t sid, ca_uint32_t cid ); bool ca_v41_ok () const; bool ca_v42_ok () const; @@ -104,9 +109,17 @@ public: bool ca_v49_ok () const; void hostName ( char *pBuf, unsigned bufLength ) const; - const char * pHostName () const; // deprecated - please do not use bool alive () const; osiSockAddr getNetworkAddress () const; + int printf ( const char *pformat, ... ); + unsigned channelCount (); + void removeAllChannels ( epicsGuard < callbackMutex > & cbGuard, + epicsGuard < cacMutex > & guard, + class cacDisconnectChannelPrivate & ); + void installChannel ( epicsGuard < cacMutex > &, nciu & chan, + unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn ); + void uninstallChannel ( epicsGuard < callbackMutex > &, + epicsGuard < cacMutex > &, nciu & chan ); private: tcpRecvThread recvThread; @@ -116,15 +129,16 @@ private: tcpKillTimer killTimer; comQueSend sendQue; comQueRecv recvQue; + tsDLList < nciu > channelList; caHdrLargeArray curMsg; arrayElementCount curDataMax; arrayElementCount curDataBytes; epics_auto_ptr < hostNameCache > pHostNameCache; + cac & cacRef; char * pCurData; unsigned minorProtocolVersion; iiu_conn_state state; epicsEvent sendThreadFlushEvent; - epicsEvent sendThreadExitEvent; epicsEvent flushBlockEvent; SOCKET sock; unsigned contigRecvMsgCount; @@ -144,22 +158,21 @@ private: bool processIncoming ( epicsGuard < callbackMutex > & ); unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ); unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf ); - void lastChannelDetachNotify ( epicsGuard < callbackMutex > & cbLocker ); void connect (); + const char * pHostName () const; // send protocol stubs - void echoRequest (); - void versionMessage ( const cacChannel::priLev & priority ); - void disableFlowControlRequest (); - void enableFlowControlRequest (); - void hostNameSetRequest (); - void userNameSetRequest (); - void writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue ); - void writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned type, unsigned nElem, const void *pValue ); - void readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned type, unsigned nElem ); - void createChannelRequest ( nciu & ); - void subscriptionRequest ( nciu &, netSubscription & subscr ); - void subscriptionCancelRequest ( nciu & chan, netSubscription & subscr ); + void echoRequest ( epicsGuard < cacMutex > & ); + void versionMessage ( epicsGuard < cacMutex > &, const cacChannel::priLev & priority ); + void disableFlowControlRequest (epicsGuard < cacMutex > & ); + void enableFlowControlRequest (epicsGuard < cacMutex > & ); + void hostNameSetRequest (epicsGuard < cacMutex > & ); + void userNameSetRequest (epicsGuard < cacMutex > & ); + void writeRequest ( epicsGuard < cacMutex > &, nciu &, unsigned type, unsigned nElem, const void *pValue ); + void writeNotifyRequest ( epicsGuard < cacMutex > &, nciu &, netWriteNotifyIO &, unsigned type, unsigned nElem, const void *pValue ); + void readNotifyRequest ( epicsGuard < cacMutex > &, nciu &, netReadNotifyIO &, unsigned type, unsigned nElem ); + void subscriptionRequest ( epicsGuard < cacMutex > &, nciu &, netSubscription & subscr ); + void subscriptionCancelRequest ( epicsGuard < cacMutex > &, nciu & chan, netSubscription & subscr ); void flushIfRecvProcessRequested (); bool flush (); // only to be called by the send thread @@ -170,11 +183,6 @@ private: tcpiiu & operator = ( const tcpiiu & ); }; -inline void tcpiiu::flushRequest () -{ - this->sendThreadFlushEvent.signal (); -} - inline bool tcpiiu::ca_v41_ok () const { return CA_V41 ( this->minorProtocolVersion ); @@ -219,5 +227,10 @@ inline void tcpiiu::flushIfRecvProcessRequested () } } +inline unsigned tcpiiu::channelCount () +{ + return this->channelList.count (); +} + #endif // ifdef virtualCircuith