From 3f680362bb6adcc46ac102062e1f19f5007aa6b2 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 20 Feb 2003 17:27:04 +0000 Subject: [PATCH] improved client scedualing fro fdManager based apps --- src/ca/access.cpp | 24 ++++- src/ca/acctst.c | 131 ++++++++++++++++++++++--- src/ca/ca_client_context.cpp | 184 ++++++++++++++++++++++++++--------- src/ca/cac.cpp | 49 +++------- src/ca/cac.h | 5 +- src/ca/cacIO.h | 11 +-- src/ca/oldAccess.h | 52 +++++----- src/ca/tcpiiu.cpp | 47 +++------ src/ca/udpiiu.cpp | 41 ++------ src/ca/udpiiu.h | 13 +-- src/ca/virtualCircuit.h | 13 +-- 11 files changed, 353 insertions(+), 217 deletions(-) diff --git a/src/ca/access.cpp b/src/ca/access.cpp index 7f53a2ccf..722e4d7d1 100644 --- a/src/ca/access.cpp +++ b/src/ca/access.cpp @@ -315,6 +315,23 @@ int epicsShareAPI ca_create_channel ( return caStatus; } + { + CAFDHANDLER * pFunc = 0; + void * pArg = 0; + { + epicsGuard < ca_client_context_mutex > + autoMutex ( pcac->mutex ); + if ( pcac->fdRegFuncNeedsToBeCalled ) { + pFunc = pcac->fdRegFunc; + pArg = pcac->fdRegArg; + pcac->fdRegFuncNeedsToBeCalled = false; + } + } + if ( pFunc ) { + ( *pFunc ) ( pArg, pcac->sock, true ); + } + } + try { oldChannelNotify * pChanNotify = new ( pcac->oldChannelNotifyFreeList ) @@ -432,8 +449,9 @@ int epicsShareAPI ca_array_get_callback ( chtype type, } unsigned tmpType = static_cast < unsigned > ( type ); - autoPtrDestroy < getCallback > pNotify - ( new ( pChan->getClientCtx().getCallbackFreeList ) + autoPtrFreeList < getCallback > pNotify + ( pChan->getClientCtx().getCallbackFreeList, + new ( pChan->getClientCtx().getCallbackFreeList ) getCallback ( *pChan, pfunc, arg ) ); pChan->read ( tmpType, count, *pNotify ); pNotify.release (); @@ -911,7 +929,7 @@ void epicsShareAPI ca_signal_formated ( long ca_status, const char *pfilenm, * */ // extern "C" -int epicsShareAPI ca_add_fd_registration (CAFDHANDLER *func, void *arg) +int epicsShareAPI ca_add_fd_registration ( CAFDHANDLER * func, void * arg ) { ca_client_context *pcac; int caStatus = fetchClientContext ( &pcac ); diff --git a/src/ca/acctst.c b/src/ca/acctst.c index 36fc1832e..6a0ca8700 100644 --- a/src/ca/acctst.c +++ b/src/ca/acctst.c @@ -30,6 +30,7 @@ #include "envDefs.h" #include "caDiagnostics.h" #include "cadef.h" +#include "fdmgr.h" #ifndef min #define min(A,B) ((A)>(B)?(B):(A)) @@ -2214,23 +2215,28 @@ void monitorUpdateTest ( chid chan, unsigned interestLevel ) showProgressEnd ( interestLevel ); } -void verifyReasonableBeaconPeriod ( chid chan ) +void verifyReasonableBeaconPeriod ( chid chan, unsigned interestLevel ) { - double beaconPeriod; + if ( ca_get_ioc_connection_count () > 0 ) { + double beaconPeriod; - assert ( ca_get_ioc_connection_count () > 0 ); + showProgressBegin ( "verifyReasonableBeaconPeriod", interestLevel ); - printf ( "Beacon anomalies detected since program start %u\n", - ca_beacon_anomaly_count () ); - beaconPeriod = ca_beacon_period ( chan ); - assert ( beaconPeriod >= 0.0 ); + printf ( "Beacon anomalies detected since program start %u\n", + ca_beacon_anomaly_count () ); - printf ( "Estimated beacon period for channel %s = %f sec.\n", - ca_name ( chan ), beaconPeriod ); + beaconPeriod = ca_beacon_period ( chan ); + assert ( beaconPeriod >= 0.0 ); + + printf ( "Estimated beacon period for channel %s = %f sec.\n", + ca_name ( chan ), beaconPeriod ); + + showProgressEnd ( interestLevel ); + } } -void verifyOldPend ( unsigned interestLevel) +void verifyOldPend ( unsigned interestLevel ) { int status; @@ -2371,6 +2377,106 @@ void eventClearAndMultipleMonitorTest ( chid chan, unsigned interestLevel ) monitorUpdateTest ( chan, interestLevel ); } +void fdcb ( void * parg ) +{ + ca_poll (); +} + +void fdRegCB ( void * parg, int fd, int opened ) +{ + int status; + + fdctx * mgrCtx = ( fdctx * ) parg; + if ( opened ) { + status = fdmgr_add_callback ( + mgrCtx, fd, fdi_read, fdcb, 0 ); + assert ( status >= 0 ); + } + else { + status = fdmgr_clear_callback ( + mgrCtx, fd, fdi_read ); + assert ( status >= 0 ); + } +} + +void fdManagerVerify ( const char * pName, unsigned interestLevel ) +{ + int status; + fdctx * mgrCtx; + struct timeval tmo; + chid newChan; + evid subscription; + unsigned repCount = 0u; + unsigned eventCount = 0u; + epicsTimeStamp begin, end; + + mgrCtx = fdmgr_init (); + assert ( mgrCtx ); + + showProgressBegin ( "fdManagerVerify", interestLevel ); + + status = ca_add_fd_registration ( fdRegCB, mgrCtx ); + assert ( status == ECA_NORMAL ); + + status = ca_create_channel ( pName, 0, 0, 0, & newChan ); + assert ( status == ECA_NORMAL ); + + while ( ca_state ( newChan ) != cs_conn ) { + tmo.tv_sec = 6000; + tmo.tv_usec = 0; + status = fdmgr_pend_event ( mgrCtx, & tmo ); + assert ( status >= 0 ); + } + + status = ca_add_event ( DBR_FLOAT, newChan, + nUpdatesTester, & eventCount, & subscription ); + assert ( status == ECA_NORMAL ); + + status = ca_flush_io (); + assert ( status == ECA_NORMAL ); + + while ( eventCount < 1 ) { + tmo.tv_sec = 6000; + tmo.tv_usec = 0; + status = fdmgr_pend_event ( mgrCtx, & tmo ); + assert ( status >= 0 ); + } + + status = ca_clear_event ( subscription ); + assert ( status == ECA_NORMAL ); + + status = ca_flush_io (); + assert ( status == ECA_NORMAL ); + + // look for infinite loop in fd manager schedualing + epicsTimeGetCurrent ( & begin ); + eventCount = 0u; + while ( 1 ) { + double delay; + tmo.tv_sec = 1; + tmo.tv_usec = 0; + status = fdmgr_pend_event ( mgrCtx, & tmo ); + assert ( status >= 0 ); + epicsTimeGetCurrent ( & end ); + delay = epicsTimeDiffInSeconds ( & end, & begin ); + if ( delay >= 1.0 ) { + break; + } + assert ( eventCount++ < 100 ); + } + + status = ca_clear_channel ( newChan ); + assert ( status == ECA_NORMAL ); + + status = ca_add_fd_registration ( 0, 0 ); + assert ( status == ECA_NORMAL ); + + status = fdmgr_delete ( mgrCtx ); + assert ( status >= 0 ); + + showProgressEnd ( interestLevel ); +} + int acctst ( char *pName, unsigned interestLevel, unsigned channelCount, unsigned repetitionCount, enum ca_preemptive_callback_select select ) { @@ -2459,6 +2565,9 @@ int acctst ( char *pName, unsigned interestLevel, unsigned channelCount, verifyHighThroughputReadCallback ( chan, interestLevel ); verifyHighThroughputWriteCallback ( chan, interestLevel ); verifyBadString ( chan, interestLevel ); + if ( select != ca_enable_preemptive_callback ) { + fdManagerVerify ( pName, interestLevel ); + } /* * CA pend event delay accuracy test @@ -2485,7 +2594,7 @@ int acctst ( char *pName, unsigned interestLevel, unsigned channelCount, verifyBlockingConnect ( pChans, channelCount, repetitionCount, interestLevel ); verifyClear ( pChans, interestLevel ); - verifyReasonableBeaconPeriod ( chan ); + verifyReasonableBeaconPeriod ( chan, interestLevel ); /* * Verify that we can do IO with the new types for ALH diff --git a/src/ca/ca_client_context.cpp b/src/ca/ca_client_context.cpp index 52d47e4a5..50a2957ac 100644 --- a/src/ca/ca_client_context.cpp +++ b/src/ca/ca_client_context.cpp @@ -29,21 +29,92 @@ extern epicsThreadPrivateId caClientContextId; ca_client_context::ca_client_context ( bool enablePreemptiveCallback ) : - clientCtx ( * new cac ( *this, enablePreemptiveCallback ) ), - pCallbackGuard ( 0 ), ca_exception_func ( 0 ), ca_exception_arg ( 0 ), + ca_exception_func ( 0 ), ca_exception_arg ( 0 ), pVPrintfFunc ( errlogVprintf ), fdRegFunc ( 0 ), fdRegArg ( 0 ), - pndRecvCnt ( 0u ), ioSeqNo ( 0u ) + pndRecvCnt ( 0u ), ioSeqNo ( 0u ), localPort ( 0 ), + fdRegFuncNeedsToBeCalled ( false ), noWakeupSincePend ( true ) { - if ( ! enablePreemptiveCallback ) { - this->pCallbackGuard = new epicsGuard < callbackMutex > - ( this->clientCtx.callbackGuardFactory () ); + static const unsigned short PORT_ANY = 0u; + + this->sock = socket ( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); + if ( this->sock == INVALID_SOCKET ) { + this->printf ( + "ca_client_context: unable to create " + "datagram socket because = \"%s\"\n", + SOCKERRSTR (SOCKERRNO)); + throwWithLocation ( noSocket () ); } + + { + osiSockIoctl_t yes = true; + int status = socket_ioctl ( this->sock, + FIONBIO, & yes); // X aCC 392 + if ( status < 0 ) { + socket_close ( this->sock ); + this->printf ( + "%s: non blocking IO set fail because \"%s\"\n", + __FILE__, SOCKERRSTR ( SOCKERRNO ) ); + throwWithLocation ( noSocket () ); + } + } + + // force a bind to an unconstrained address so we can obtain + // the local port number below + { + osiSockAddr addr; + memset ( (char *)&addr, 0 , sizeof ( addr ) ); + addr.ia.sin_family = AF_INET; + addr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_ANY ); + addr.ia.sin_port = epicsHTON16 ( PORT_ANY ); // X aCC 818 + int status = bind (this->sock, &addr.sa, sizeof (addr) ); + if ( status < 0 ) { + socket_close (this->sock); + this->printf ( + "CAC: unable to bind to an unconstrained " + "address because = \"%s\"\n", + SOCKERRSTR (SOCKERRNO)); + throwWithLocation ( noSocket () ); + } + } + + { + osiSockAddr tmpAddr; + osiSocklen_t saddr_length = sizeof ( tmpAddr ); + int status = getsockname ( this->sock, & tmpAddr.sa, & saddr_length ); + if ( status < 0 ) { + socket_close ( this->sock ); + this->printf ( "CAC: getsockname () error was \"%s\"\n", SOCKERRSTR (SOCKERRNO) ); + throwWithLocation ( noSocket () ); + } + if ( tmpAddr.sa.sa_family != AF_INET) { + socket_close ( this->sock ); + this->printf ( "CAC: UDP socket was not inet addr family\n" ); + throwWithLocation ( noSocket () ); + } + this->localPort = epicsNTOH16 ( tmpAddr.ia.sin_port ); + } + + epics_auto_ptr < cac > pCAC ( + new cac ( *this, enablePreemptiveCallback ) ); + + epics_auto_ptr < epicsGuard < callbackMutex > > pCBGuard; + if ( ! enablePreemptiveCallback ) { + pCBGuard.reset ( new epicsGuard < callbackMutex > + ( pCAC->callbackGuardFactory () ) ); + } + + // multiple steps ensure exception safety + this->pCallbackGuard = pCBGuard; + this->pClientCtx = pCAC; } ca_client_context::~ca_client_context () { - delete this->pCallbackGuard; - delete & this->clientCtx; + if ( this->fdRegFunc ) { + ( *this->fdRegFunc ) + ( this->fdRegArg, this->sock, false ); + } + socket_close ( this->sock ); } void ca_client_context::destroyChannel ( oldChannelNotify & chan ) @@ -101,6 +172,7 @@ void ca_client_context::registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, epicsGuard < ca_client_context_mutex > autoMutex ( this->mutex ); this->fdRegFunc = pFunc; this->fdRegArg = pArg; + this->fdRegFuncNeedsToBeCalled = true; // should block here until releated callback in progress completes } @@ -160,7 +232,7 @@ void ca_client_context::exception ( int stat, const char *pCtx, ( *pFunc ) ( args ); } else { - this->clientCtx.signal ( stat, pFile, lineNo, pCtx ); + this->pClientCtx->signal ( stat, pFile, lineNo, pCtx ); } } @@ -192,7 +264,7 @@ void ca_client_context::exception ( int status, const char *pContext, ( *pFunc ) ( args ); } else { - this->clientCtx.signal ( status, pFileName, lineNo, + this->pClientCtx->signal ( status, pFileName, lineNo, "op=%u, channel=%s, type=%s, count=%lu, ctx=\"%s\"", op, ca_name ( &chan ), dbr_type_to_text ( static_cast ( type ) ), @@ -200,34 +272,6 @@ void ca_client_context::exception ( int status, const char *pContext, } } -void ca_client_context::fdWasCreated ( int fd ) -{ - CAFDHANDLER *pFunc; - void *pArg; - { - epicsGuard < ca_client_context_mutex > autoMutex ( this->mutex ); - pFunc = this->fdRegFunc; - pArg = this->fdRegArg; - } - if ( pFunc ) { - ( *pFunc ) ( pArg, fd, true ); - } -} - -void ca_client_context::fdWasDestroyed ( int fd ) -{ - CAFDHANDLER *pFunc; - void *pArg; - { - epicsGuard < ca_client_context_mutex > autoMutex ( this->mutex ); - pFunc = this->fdRegFunc; - pArg = this->fdRegArg; - } - if ( pFunc ) { - ( *pFunc ) ( pArg, fd, false ); - } -} - void ca_client_context::show ( unsigned level ) const { ::printf ( "ca_client_context at %p pndRecvCnt=%u ioSeqNo=%u\n", @@ -235,9 +279,9 @@ void ca_client_context::show ( unsigned level ) const this->pndRecvCnt, this->ioSeqNo ); if ( level > 0u ) { this->mutex.show ( level - 1u ); - this->clientCtx.show ( level - 1u ); + this->pClientCtx->show ( level - 1u ); ::printf ( "\tpreemptive callback is %s\n", - this->pCallbackGuard ? "disabled" : "enabled" ); + this->pCallbackGuard.get() ? "disabled" : "enabled" ); ::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n", this->pndRecvCnt ); ::printf ( "\tthe current io sequence number is %u\n", @@ -351,10 +395,36 @@ int ca_client_context::pendEvent ( const double & timeout ) this->flushRequest (); + { + bool cleanupNeeded = false; + { + epicsGuard < ca_client_context_mutex > guard ( this->mutex ); + if ( this->fdRegFunc ) { + cleanupNeeded = true; + } + } + if ( cleanupNeeded ) { + // send short udp message to wake up a file descriptor manager + // when a message arrives + osiSockAddr tmpAddr; + osiSocklen_t addrSize = sizeof ( tmpAddr.sa ); + char buf = 0; + int status = 0; + do { + status = recvfrom ( this->sock, & buf, sizeof ( buf ), + 0, & tmpAddr.sa, & addrSize ); + } while ( status > 0 ); + { + epicsGuard < ca_client_context_mutex > guard ( this->mutex ); + this->noWakeupSincePend = true; + } + } + } + // process at least once if preemptive callback is disabled - if ( this->pCallbackGuard ) { + if ( this->pCallbackGuard.get() ) { epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); - this->clientCtx.waitUntilNoRecvThreadsPending (); + this->pClientCtx->waitUntilNoRecvThreadsPending (); } double elapsed = epicsTime::getCurrent() - current; @@ -368,7 +438,7 @@ int ca_client_context::pendEvent ( const double & timeout ) } if ( delay >= CAC_SIGNIFICANT_DELAY ) { - if ( this->pCallbackGuard ) { + if ( this->pCallbackGuard.get() ) { epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); epicsThreadSleep ( delay ); } @@ -380,9 +450,10 @@ int ca_client_context::pendEvent ( const double & timeout ) return ECA_TIMEOUT; } -void ca_client_context::blockForEventAndEnableCallbacks ( epicsEvent & event, double timeout ) +void ca_client_context::blockForEventAndEnableCallbacks ( + epicsEvent & event, const double & timeout ) { - if ( this->pCallbackGuard ) { + if ( this->pCallbackGuard.get() ) { epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); event.wait ( timeout ); } @@ -390,3 +461,26 @@ void ca_client_context::blockForEventAndEnableCallbacks ( epicsEvent & event, do event.wait ( timeout ); } } + +void ca_client_context::messageArrivalNotify () +{ + bool sendNeeded = false; + { + epicsGuard < ca_client_context_mutex > guard ( this->mutex ); + if ( this->fdRegFunc && this->noWakeupSincePend ) { + this->noWakeupSincePend = false; + sendNeeded = true; + } + } + if ( sendNeeded ) { + // send short udp message to wake up a file descriptor manager + // when a message arrives + osiSockAddr tmpAddr; + tmpAddr.ia.sin_family = AF_INET; + tmpAddr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK ); + tmpAddr.ia.sin_port = epicsHTON16 ( this->localPort ); + char buf = 0; + sendto ( this->sock, & buf, sizeof ( buf ), + 0, & tmpAddr.sa, sizeof ( tmpAddr.sa ) ); + } +} diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 21b9f0f24..97d5636cb 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -625,7 +625,7 @@ bool cac::lookupChannelAndTransferToTCP ( } if ( newIIU ) { - piiu->start ( cbGuard ); + piiu->start (); } return true; @@ -1497,16 +1497,6 @@ void cac::selfTest () const this->beaconTable.verify (); } -void cac::notifyNewFD ( epicsGuard < callbackMutex > &, SOCKET sock ) const -{ - this->notify.fdWasCreated ( sock ); -} - -void cac::notifyDestroyFD ( epicsGuard < callbackMutex > &, SOCKET sock ) const -{ - this->notify.fdWasDestroyed ( sock ); -} - void cac::disconnectNotify ( tcpiiu & iiu ) { epicsGuard < cacMutex > guard ( this->mutex ); @@ -1620,34 +1610,15 @@ void cac::pvMultiplyDefinedNotify ( msgForMultiplyDefinedPV & mfmdpv, void cac::waitUntilNoRecvThreadsPending () { if ( ! this->preemptiveCallbackEnabled ) { - { - fd_set mask; - FD_ZERO ( & mask ); - SOCKET maxFD = 0; - epicsGuard < cacMutex > guard ( this->mutex ); - tsDLIter < tcpiiu > iter = this->serverList.firstIter (); - if ( this->pudpiiu ) { - this->pudpiiu->fdMaskSet ( mask, maxFD ); - } - while ( iter.valid() ) { - iter->fdMaskSet ( mask, maxFD ); - iter++; - } - - struct timeval delay = { 0, 0 }; - int status = select ( maxFD+1, & mask, 0, 0, & delay ); - if ( status <= 0 ) { - return; - } - this->nRecvThreadsPending = - static_cast < unsigned > ( status ); + epicsGuard < cacMutex > guard ( this->mutex ); + while ( this->nRecvThreadsPending > 0 ) { + epicsGuardRelease < cacMutex > unguard ( guard ); + this->recvThreadActivityComplete.wait ( 30.0 ); } - - this->recvThreadActivityComplete.wait ( 0.1 ); } } -void cac::signalRecvThreadActivity () +void cac::messageProcessingCompleteNotify () { if ( ! this->preemptiveCallbackEnabled ) { bool signalNeeded = false; @@ -1669,5 +1640,13 @@ void cac::signalRecvThreadActivity () } } +void cac::messageArrivalNotify () +{ + if ( ! this->preemptiveCallbackEnabled ) { + epicsGuard < cacMutex > guard ( this->mutex ); + this->nRecvThreadsPending++; + } + this->notify.messageArrivalNotify (); +} diff --git a/src/ca/cac.h b/src/ca/cac.h index 1a4085108..22581f35f 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -185,8 +185,6 @@ public: cacMutex & mutexRef (); void attachToClientCtx (); void selfTest () const; - void notifyNewFD ( epicsGuard < callbackMutex > &, SOCKET ) const; - void notifyDestroyFD ( epicsGuard < callbackMutex > &, SOCKET ) const; bool preemptiveCallbakIsEnabled () const; double beaconPeriod ( const nciu & chan ) const; static unsigned lowestPriorityLevelAbove ( unsigned priority ); @@ -194,7 +192,8 @@ public: void initiateAbortShutdown ( tcpiiu & ); void disconnectNotify ( tcpiiu & ); void uninstallIIU ( tcpiiu & ); - void signalRecvThreadActivity (); + void messageArrivalNotify (); + void messageProcessingCompleteNotify (); private: localHostName hostNameCache; diff --git a/src/ca/cacIO.h b/src/ca/cacIO.h index 45a8f76a1..0f70cebcc 100644 --- a/src/ca/cacIO.h +++ b/src/ca/cacIO.h @@ -213,16 +213,15 @@ class cacNotify { // X aCC 655 public: virtual ~cacNotify () = 0; // we should probably have a different vf for each type of exception ???? - virtual void exception ( int status, const char *pContext, - const char *pFileName, unsigned lineNo ) = 0; + virtual void exception ( int status, const char * pContext, + const char * pFileName, unsigned lineNo ) = 0; // perhaps this should be phased out in deference to the exception mechanism virtual int vPrintf ( const char *pformat, va_list args ) const = 0; -// this should probably be phased out (its not OS independent) - virtual void fdWasCreated ( int fd ) = 0; - virtual void fdWasDestroyed ( int fd ) = 0; // backwards compatibility virtual void attachToClientCtx () = 0; - virtual void blockForEventAndEnableCallbacks ( class epicsEvent & event, double timeout ) = 0; + virtual void blockForEventAndEnableCallbacks ( + class epicsEvent & event, const double & timeout ) = 0; + virtual void messageArrivalNotify () = 0; }; class cacService : public tsDLNode < cacService > { // X aCC 655 diff --git a/src/ca/oldAccess.h b/src/ca/oldAccess.h index 2a69512c6..873003ba1 100644 --- a/src/ca/oldAccess.h +++ b/src/ca/oldAccess.h @@ -33,6 +33,7 @@ #endif #include "tsFreeList.h" +#include "epicsMemory.h" #include "cxxCompilerDependencies.h" #ifdef oldAccessh_restore_epicsExportSharedSymbols @@ -146,7 +147,6 @@ public: getCallback ( oldChannelNotify &chanIn, caEventCallBackFunc *pFunc, void *pPrivate ); ~getCallback (); - void destroy (); void * operator new ( size_t size, tsFreeList < class getCallback, 1024 > & ); epicsPlacementDeleteOperator (( void *, @@ -250,10 +250,11 @@ public: void exception ( int status, const char *pContext, const char *pFileName, unsigned lineNo, oldChannelNotify &chan, unsigned type, arrayElementCount count, unsigned op ); + void blockForEventAndEnableCallbacks ( + epicsEvent & event, const double & timeout ); CASG * lookupCASG ( unsigned id ); void installCASG ( CASG & ); void uninstallCASG ( CASG & ); - void blockForEventAndEnableCallbacks ( epicsEvent & event, double timeout ); void selfTest (); // perhaps these should be eliminated in deference to the exception mechanism int printf ( const char *pformat, ... ) const; @@ -267,6 +268,9 @@ public: void destroyGetCallback ( getCallback & ); void destroyPutCallback ( putCallback & ); void destroySubscription ( oldSubscription & ); + + // exceptions + class noSocket {}; private: tsFreeList < struct oldChannelNotify, 1024 > oldChannelNotifyFreeList; tsFreeList < class getCopy, 1024 > getCopyFreeList; @@ -276,18 +280,21 @@ private: tsFreeList < struct CASG, 128 > casgFreeList; mutable ca_client_context_mutex mutex; epicsEvent ioDone; - cac & clientCtx; - epicsGuard < callbackMutex > * pCallbackGuard; + epics_auto_ptr < cac > pClientCtx; + epics_auto_ptr < epicsGuard < callbackMutex > > pCallbackGuard; caExceptionHandler * ca_exception_func; void * ca_exception_arg; caPrintfFunc * pVPrintfFunc; CAFDHANDLER * fdRegFunc; - void * fdRegArg; + void * fdRegArg; + SOCKET sock; unsigned pndRecvCnt; unsigned ioSeqNo; -// this should probably be phased out (its not OS independent) - void fdWasCreated ( int fd ); - void fdWasDestroyed ( int fd ); + ca_uint16_t localPort; + bool fdRegFuncNeedsToBeCalled; + bool noWakeupSincePend; + + void messageArrivalNotify (); void attachToClientCtx (); ca_client_context ( const ca_client_context & ); ca_client_context & operator = ( const ca_client_context & ); @@ -497,11 +504,6 @@ inline void putCallback::operator delete ( void * pCadaver, } #endif -inline void getCallback::destroy () -{ - delete this; -} - inline void * getCallback::operator new ( size_t size, tsFreeList < class getCallback, 1024 > & freeList ) { @@ -518,60 +520,60 @@ inline void getCallback::operator delete ( void *pCadaver, inline void ca_client_context::registerService ( cacService &service ) { - this->clientCtx.registerService ( service ); + this->pClientCtx->registerService ( service ); } inline cacChannel & ca_client_context::createChannel ( const char * name_str, oldChannelNotify & chan, cacChannel::priLev pri ) { - return this->clientCtx.createChannel ( name_str, chan, pri ); + return this->pClientCtx->createChannel ( name_str, chan, pri ); } inline void ca_client_context::flushRequest () { - this->clientCtx.flushRequest (); + this->pClientCtx->flushRequest (); } inline unsigned ca_client_context::connectionCount () const { - return this->clientCtx.connectionCount (); + return this->pClientCtx->connectionCount (); } inline unsigned ca_client_context::beaconAnomaliesSinceProgramStart () const { - return this->clientCtx.beaconAnomaliesSinceProgramStart (); + return this->pClientCtx->beaconAnomaliesSinceProgramStart (); } inline CASG * ca_client_context::lookupCASG ( unsigned id ) { - return this->clientCtx.lookupCASG ( id ); + return this->pClientCtx->lookupCASG ( id ); } inline void ca_client_context::installCASG ( CASG &sg ) { - this->clientCtx.installCASG ( sg ); + this->pClientCtx->installCASG ( sg ); } inline void ca_client_context::uninstallCASG ( CASG &sg ) { - this->clientCtx.uninstallCASG ( sg ); + this->pClientCtx->uninstallCASG ( sg ); } inline void ca_client_context::vSignal ( int ca_status, const char *pfilenm, int lineno, const char *pFormat, va_list args ) { - this->clientCtx.vSignal ( ca_status, pfilenm, + this->pClientCtx->vSignal ( ca_status, pfilenm, lineno, pFormat, args ); } inline void ca_client_context::selfTest () { - this->clientCtx.selfTest (); + this->pClientCtx->selfTest (); } inline bool ca_client_context::preemptiveCallbakIsEnabled () const { - return this->clientCtx.preemptiveCallbakIsEnabled (); + return this->pClientCtx->preemptiveCallbakIsEnabled (); } inline bool ca_client_context::ioComplete () const @@ -587,7 +589,7 @@ inline unsigned ca_client_context::sequenceNumberOfOutstandingIO () const inline epicsGuard < callbackMutex > ca_client_context::callbackGuardFactory () { - return this->clientCtx.callbackGuardFactory (); + return this->pClientCtx->callbackGuardFactory (); } inline void ca_client_context_mutex::lock () diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 9521fee93..2492cbcc3 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -123,9 +123,8 @@ void tcpSendThread::run () this->iiu.sendDog.cancel (); { - epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() ); - this->iiu.shutdown ( cbGuard, guard ); + this->iiu.shutdown ( guard ); } // wakeup user threads blocking for send backlog to be reduced @@ -310,20 +309,9 @@ void tcpRecvThread::run () // file manager call backs works correctly. This does not // appear to impact performance. // - unsigned nBytesIn; - if ( this->iiu.cacRef.preemptiveCallbakIsEnabled() ) { - nBytesIn = pComBuf->fillFromWire ( this->iiu ); - if ( nBytesIn == 0u ) { - continue; - } - } - else { - this->iiu.blockUntilBytesArePendingInOS (); - nBytesIn = 0u; - if ( this->iiu.state != tcpiiu::iiucs_connected && - this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { - break; - } + unsigned nBytesIn = pComBuf->fillFromWire ( this->iiu ); + if ( nBytesIn == 0u ) { + continue; } // reschedule connection activity watchdog @@ -333,19 +321,13 @@ void tcpRecvThread::run () // - it take also the callback lock this->iiu.recvDog.messageArrivalNotify (); + this->iiu.cacRef.messageArrivalNotify (); + // only one recv thread at a time may call callbacks // - pendEvent() blocks until threads waiting for // this lock get a chance to run epicsGuard < callbackMutex > guard ( this->cbMutex ); - if ( ! this->iiu.cacRef.preemptiveCallbakIsEnabled() ) { - nBytesIn = pComBuf->fillFromWire ( this->iiu ); - if ( this->iiu.state != tcpiiu::iiucs_connected && - this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { - break; - } - } - // force the receive watchdog to be reset every 5 frames unsigned contiguousFrameCount = 0; while ( nBytesIn ) { @@ -381,7 +363,7 @@ void tcpRecvThread::run () nBytesIn = pComBuf->fillFromWire ( this->iiu ); } - this->iiu.cacRef.signalRecvThreadActivity (); + this->iiu.cacRef.messageProcessingCompleteNotify (); } if ( pComBuf ) { @@ -398,9 +380,8 @@ void tcpRecvThread::run () // until it receives its blocking socket call interrupt // signal. { - epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); epicsGuard < cacMutex > guard ( this->iiu.cacRef.mutexRef() ); - this->iiu.shutdown ( cbGuard, guard ); + this->iiu.shutdown ( guard ); } } @@ -547,10 +528,9 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, // this must always be called by the udp thread when it holds // the callback lock. -void tcpiiu::start ( epicsGuard < callbackMutex > & cbGuard ) +void tcpiiu::start () { this->recvThread.start (); - this->cacRef.notifyNewFD ( cbGuard, this->sock ); } /* @@ -634,19 +614,14 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, } this->discardingPendingData = true; } - this->shutdown ( cbGuard, guard ); + this->shutdown ( guard ); } -void tcpiiu::shutdown ( epicsGuard < callbackMutex > & cbGuard, - epicsGuard & guard ) +void tcpiiu::shutdown ( epicsGuard & guard ) { iiu_conn_state oldState = this->state; if ( oldState != iiucs_abort_shutdown ) { this->state = iiucs_abort_shutdown; - { - epicsGuardRelease < cacMutex > guardRelease ( guard ); - this->cacRef.notifyDestroyFD ( cbGuard, this->sock ); - } // // on HPUX close() and shutdown() are not enough so we must also diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index c6841bb77..c7bd5c9e0 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -135,10 +135,8 @@ udpiiu::udpiiu ( epicsTimerQueueActive & timerQueue, callbackMutex & cbMutex, ca } #endif - /* - * force a bind to an unconstrained address because we may end - * up receiving first - */ + // force a bind to an unconstrained address so we can obtain + // the local port number below memset ( (char *)&addr, 0 , sizeof (addr) ); addr.ia.sin_family = AF_INET; addr.ia.sin_addr.s_addr = epicsHTON32 (INADDR_ANY); @@ -241,31 +239,16 @@ void udpiiu::shutdown () void udpiiu::recvMsg ( callbackMutex & cbMutex ) { osiSockAddr src; - int status; + osiSocklen_t src_size = sizeof ( src ); + int status = recvfrom ( this->sock, + this->recvBuf, sizeof ( this->recvBuf ), 0, + & src.sa, & src_size ); - if ( this->cacRef.preemptiveCallbakIsEnabled() ) { - osiSocklen_t src_size = sizeof ( src ); - status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0, - &src.sa, &src_size ); - } - else { - // peek first at the message so that file descriptor managers will wake up - // in single threaded applications - osiSocklen_t src_size = sizeof ( src ); - char peek; - recvfrom ( this->sock, & peek, sizeof ( peek ), MSG_PEEK, - &src.sa, &src_size ); - status = 0; - } + this->cacRef.messageArrivalNotify (); { epicsGuard < callbackMutex > guard ( cbMutex ); - if ( ! this->cacRef.preemptiveCallbakIsEnabled() ) { - osiSocklen_t src_size = sizeof ( src ); - status = recvfrom ( this->sock, this->recvBuf, sizeof ( this->recvBuf ), 0, - &src.sa, &src_size ); - } if ( status <= 0 ) { if ( status == 0 ) { @@ -303,7 +286,8 @@ void udpiiu::recvMsg ( callbackMutex & cbMutex ) (arrayElementCount) status, epicsTime::getCurrent() ); } } - return; + + this->cacRef.messageProcessingCompleteNotify (); } udpRecvThread::udpRecvThread ( udpiiu & iiuIn, callbackMutex & cbMutexIn, @@ -331,7 +315,6 @@ void udpRecvThread::run () { epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); - this->iiu.cacRef.notifyNewFD ( cbGuard, this->iiu.sock ); if ( ellCount ( & this->iiu.dest ) == 0 ) { // X aCC 392 genLocalExcep ( cbGuard, this->iiu.cacRef, ECA_NOSEARCHADDR, NULL ); } @@ -340,13 +323,7 @@ void udpRecvThread::run () do { this->iiu.recvMsg ( this->cbMutex ); - this->iiu.cacRef.signalRecvThreadActivity (); } while ( ! this->iiu.shutdownCmd ); - - { - epicsGuard < callbackMutex > cbGuard ( this->cbMutex ); - this->iiu.cacRef.notifyDestroyFD ( cbGuard, this->iiu.sock ); - } } /* diff --git a/src/ca/udpiiu.h b/src/ca/udpiiu.h index e40b01b63..023feee2c 100644 --- a/src/ca/udpiiu.h +++ b/src/ca/udpiiu.h @@ -99,7 +99,6 @@ public: bool pushDatagramMsg ( const caHdr &hdr, const void *pExt, ca_uint16_t extsize); void shutdown (); double roundTripDelayEstimate () const; - void fdMaskSet ( fd_set & mask, SOCKET & maxFD ) const; // exceptions class noSocket {}; @@ -121,9 +120,9 @@ private: SOCKET sock; epics_auto_ptr < class searchTimer > pSearchTmr; epics_auto_ptr < class repeaterSubscribeTimer > pRepeaterSubscribeTmr; - unsigned short repeaterPort; - unsigned short serverPort; - unsigned short localPort; + ca_uint16_t repeaterPort; + ca_uint16_t serverPort; + ca_uint16_t localPort; bool shutdownCmd; bool rtteActive; bool lastReceivedSeqNoIsValid; @@ -223,11 +222,5 @@ inline double udpiiu::roundTripDelayEstimate () const return this->rtteMean; } -inline void udpiiu::fdMaskSet ( fd_set & mask, SOCKET & maxFD ) const -{ - maxFD = tsMax ( this->sock, maxFD ); - FD_SET ( this->sock, & mask ); -} - #endif // udpiiuh diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index 0e7b67b0e..ff20e5aff 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -95,7 +95,7 @@ public: comBufMemoryManager &, unsigned minorVersion, ipAddrToAsciiEngine & engineIn, const cacChannel::priLev & priorityIn ); ~tcpiiu (); - void start ( epicsGuard < callbackMutex > & ); + void start (); void initiateCleanShutdown ( epicsGuard < cacMutex > & ); void initiateAbortShutdown ( epicsGuard < callbackMutex > &, epicsGuard & ); @@ -134,8 +134,6 @@ public: bool bytesArePendingInOS () const; - void fdMaskSet ( fd_set &, SOCKET & maxFd ) const; - private: hostNameCache hostNameCacheInstance; tcpRecvThread recvThread; @@ -181,8 +179,7 @@ private: void connect (); const char * pHostName () const; void blockUntilBytesArePendingInOS (); - void shutdown ( epicsGuard < callbackMutex > &, - epicsGuard & ); + void shutdown ( epicsGuard & ); // send protocol stubs void echoRequest ( epicsGuard < cacMutex > & ); @@ -250,12 +247,6 @@ inline unsigned tcpiiu::channelCount () return this->channelList.count (); } -inline void tcpiiu::fdMaskSet ( fd_set & mask, SOCKET & maxFD ) const -{ - maxFD = tsMax ( this->sock, maxFD ); - FD_SET ( this->sock, & mask ); -} - inline void tcpRecvThread::interruptSocketRecv () { epicsThreadId threadId = this->thread.getId ();