diff --git a/src/ca/oldAccess.h b/src/ca/oldAccess.h index 27b18f6a7..b4d694fc0 100644 --- a/src/ca/oldAccess.h +++ b/src/ca/oldAccess.h @@ -84,10 +84,13 @@ protected: ~oldChannelNotify (); // must allocate from pool private: oldCAC & cacCtx; + cacChannel & io; caCh * pConnCallBack; void * pPrivate; caArh * pAccessRightsFunc; - cacChannel & io; + unsigned ioSeqNo; + bool prevConnected; + bool connCallbackInProress; void connectNotify (); void disconnectNotify (); void accessRightsNotify ( const caAccessRights & ); @@ -96,7 +99,6 @@ private: unsigned type, arrayElementCount count, void *pValue ); void writeException ( int status, const char *pContext, unsigned type, arrayElementCount count ); - bool includeFirstConnectInCountOfOutstandingIO () const; static epicsSingleton < tsFreeList < struct oldChannelNotify, 1024 > > pFreeList; oldChannelNotify ( const oldChannelNotify & ); oldChannelNotify & operator = ( const oldChannelNotify & ); @@ -118,7 +120,7 @@ private: oldCAC &cacCtx; oldChannelNotify &chan; void *pValue; - unsigned readSeq; + unsigned ioSeqNo; unsigned type; void completion ( unsigned type, arrayElementCount count, const void *pData); @@ -212,21 +214,21 @@ struct oldCAC : public cacNotify public: oldCAC ( bool enablePreemptiveCallback = false ); virtual ~oldCAC (); - void changeExceptionEvent ( caExceptionHandler *pfunc, void *arg ); - void registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg ); - void replaceErrLogHandler ( caPrintfFunc *ca_printf_func ); - void registerService ( cacService &service ); + void changeExceptionEvent ( caExceptionHandler * pfunc, void * arg ); + void registerForFileDescriptorCallBack ( CAFDHANDLER * pFunc, void * pArg ); + void replaceErrLogHandler ( caPrintfFunc * ca_printf_func ); + void registerService ( cacService & service ); cacChannel & createChannel ( const char * name_str, oldChannelNotify & chan, cacChannel::priLev pri ); void flushRequest (); - int pendIO ( const double &timeout ); - int pendEvent ( const double &timeout ); + int pendIO ( const double & timeout ); + int pendEvent ( const double & timeout ); bool ioComplete () const; void show ( unsigned level ) const; unsigned connectionCount () const; unsigned sequenceNumberOfOutstandingIO () const; - void incrementOutstandingIO (); - void decrementOutstandingIO ( unsigned sequenceNo ); + void incrementOutstandingIO ( unsigned ioSeqNo ); + void decrementOutstandingIO ( unsigned ioSeqNo ); void exception ( int status, const char *pContext, const char *pFileName, unsigned lineNo ); void exception ( int status, const char *pContext, @@ -235,7 +237,7 @@ public: CASG * lookupCASG ( unsigned id ); void installCASG ( CASG & ); void uninstallCASG ( CASG & ); - int blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ); + void blockForEventAndEnableCallbacks ( epicsEvent & event, double timeout ); void selfTest (); // perhaps these should be eliminated in deference to the exception mechanism int printf ( const char *pformat, ... ) const; @@ -243,14 +245,19 @@ public: void vSignal ( int ca_status, const char *pfilenm, int lineno, const char *pFormat, va_list args ); bool preemptiveCallbakIsEnabled () const; + epicsGuard < callbackMutex > callbackGuardFactory (); private: mutable oldCACMutex mutex; + epicsEvent ioDone; cac & clientCtx; - caExceptionHandler *ca_exception_func; - void *ca_exception_arg; - caPrintfFunc *pVPrintfFunc; - CAFDHANDLER *fdRegFunc; - void *fdRegArg; + epicsGuard < callbackMutex > * pCallbackGuard; + caExceptionHandler * ca_exception_func; + void * ca_exception_arg; + caPrintfFunc * pVPrintfFunc; + CAFDHANDLER * fdRegFunc; + void * fdRegArg; + unsigned pndRecvCnt; + unsigned ioSeqNo; // this should probably be phased out (its not OS independent) void fdWasCreated ( int fd ); void fdWasDestroyed ( int fd ); @@ -353,7 +360,7 @@ inline bool oldChannelNotify::connected () const inline bool oldChannelNotify::previouslyConnected () const { - return this->io.previouslyConnected (); + return this->prevConnected; } inline void oldChannelNotify::hostName ( char *pBuf, unsigned bufLength ) const @@ -457,46 +464,16 @@ inline cacChannel & oldCAC::createChannel ( const char * name_str, return this->clientCtx.createChannel ( name_str, chan, pri ); } -inline int oldCAC::pendIO ( const double &timeout ) -{ - return this->clientCtx.pendIO ( timeout ); -} - -inline int oldCAC::pendEvent ( const double &timeout ) -{ - return this->clientCtx.pendEvent ( timeout ); -} - inline void oldCAC::flushRequest () { this->clientCtx.flushRequest (); } -inline bool oldCAC::ioComplete () const -{ - return this->clientCtx.ioComplete (); -} - inline unsigned oldCAC::connectionCount () const { return this->clientCtx.connectionCount (); } -inline unsigned oldCAC::sequenceNumberOfOutstandingIO () const -{ - return this->clientCtx.sequenceNumberOfOutstandingIO (); -} - -inline void oldCAC::incrementOutstandingIO () -{ - this->clientCtx.incrementOutstandingIO (); -} - -inline void oldCAC::decrementOutstandingIO ( unsigned sequenceNo ) -{ - this->clientCtx.decrementOutstandingIO ( sequenceNo ); -} - inline CASG * oldCAC::lookupCASG ( unsigned id ) { return this->clientCtx.lookupCASG ( id ); @@ -512,12 +489,6 @@ inline void oldCAC::uninstallCASG ( CASG &sg ) this->clientCtx.uninstallCASG ( sg ); } -inline int oldCAC::blockForEventAndEnableCallbacks ( - epicsEvent &event, double timeout ) -{ - return this->clientCtx.blockForEventAndEnableCallbacks ( event, timeout ); -} - inline void oldCAC::vSignal ( int ca_status, const char *pfilenm, int lineno, const char *pFormat, va_list args ) { @@ -535,6 +506,21 @@ inline bool oldCAC::preemptiveCallbakIsEnabled () const return this->clientCtx.preemptiveCallbakIsEnabled (); } +inline bool oldCAC::ioComplete () const +{ + return ( this->pndRecvCnt == 0u ); +} + +inline unsigned oldCAC::sequenceNumberOfOutstandingIO () const +{ + return this->ioSeqNo; +} + +inline epicsGuard < callbackMutex > oldCAC::callbackGuardFactory () +{ + return this->clientCtx.callbackGuardFactory (); +} + inline void oldCACMutex::lock () { this->mutex.lock (); diff --git a/src/ca/oldCAC.cpp b/src/ca/oldCAC.cpp index 76c5dd224..a213870ef 100644 --- a/src/ca/oldCAC.cpp +++ b/src/ca/oldCAC.cpp @@ -18,6 +18,8 @@ # pragma warning(disable:4355) #endif +#include + #include #define epicsExportSharedSymbols @@ -29,18 +31,24 @@ extern epicsThreadPrivateId caClientContextId; oldCAC::oldCAC ( bool enablePreemptiveCallback ) : clientCtx ( * new cac ( *this, enablePreemptiveCallback ) ), ca_exception_func ( 0 ), ca_exception_arg ( 0 ), - pVPrintfFunc ( errlogVprintf ), fdRegFunc ( 0 ), fdRegArg ( 0 ) + pVPrintfFunc ( errlogVprintf ), fdRegFunc ( 0 ), fdRegArg ( 0 ), + pCallbackGuard ( 0 ), pndRecvCnt ( 0u ), ioSeqNo ( 0u ) { + if ( enablePreemptiveCallback ) { + this->pCallbackGuard = new epicsGuard < callbackMutex > + ( this->clientCtx.callbackGuardFactory () ); + } } oldCAC::~oldCAC () { + delete this->pCallbackGuard; delete & this->clientCtx; } void oldCAC::changeExceptionEvent ( caExceptionHandler *pfunc, void *arg ) { - epicsGuard < oldCACMutex > autoMutex ( this->mutex ); + epicsGuard < oldCACMutex > guard ( this->mutex ); this->ca_exception_func = pfunc; this->ca_exception_arg = arg; // should block here until releated callback in progress completes @@ -197,6 +205,15 @@ void oldCAC::show ( unsigned level ) const if ( level > 0u ) { this->mutex.show ( level - 1u ); this->clientCtx.show ( level - 1u ); + ::printf ( "\tpreemptive calback is %s\n", + this->pCallbackGuard ? "disabled" : "enabled" ); + ::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n", + this->pndRecvCnt ); + ::printf ( "\tthe current io sequence number is %u\n", + this->ioSeqNo ); + ::printf ( "IO done event:\n"); + this->ioDone.show ( level - 1u ); + } } @@ -206,5 +223,156 @@ void oldCAC::attachToClientCtx () epicsThreadPrivateSet ( caClientContextId, this ); } +void oldCAC::incrementOutstandingIO ( unsigned ioSeqNo ) +{ + if ( this->ioSeqNo == ioSeqNo ) { + epicsGuard < oldCACMutex > guard ( this->mutex ); + if ( this->ioSeqNo == ioSeqNo ) { + if ( this->pndRecvCnt < UINT_MAX ) { + this->pndRecvCnt++; + } + else { + throw std::logic_error ( + "oldCAC::incrementOutstandingIO() IO counter overflow" ); + } + } + } +} +void oldCAC::decrementOutstandingIO ( unsigned ioSeqNo ) +{ + if ( this->ioSeqNo != ioSeqNo ) { + return; + } + bool signalNeeded; + { + epicsGuard < oldCACMutex > guard ( this->mutex ); + if ( this->ioSeqNo == ioSeqNo ) { + if ( this->pndRecvCnt > 0u ) { + this->pndRecvCnt--; + if ( this->pndRecvCnt == 0u ) { + signalNeeded = true; + } + else { + signalNeeded = false; + } + } + else { + signalNeeded = true; + } + } + else { + signalNeeded = false; + } + } + + if ( signalNeeded ) { + this->ioDone.signal (); + } +} + +// !!!! This routine is only visible in the old interface - or in a new ST interface. +// !!!! In the old interface we restrict thread attach so that calls from threads +// !!!! other than the initializing thread are not allowed if preemptive callback +// !!!! is disabled. This prevents the preemptive callback lock from being released +// !!!! by other threads than the one that locked it. +// +int oldCAC::pendIO ( const double & timeout ) +{ + // prevent recursion nightmares by disabling calls to + // pendIO () from within a CA callback. + if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { + return ECA_EVDISALLOW; + } + + int status = ECA_NORMAL; + epicsTime beg_time = epicsTime::getCurrent (); + double remaining = timeout; + + this->flushRequest (); + + while ( this->pndRecvCnt > 0 ) { + if ( remaining < CAC_SIGNIFICANT_DELAY ) { + status = ECA_TIMEOUT; + break; + } + + this->blockForEventAndEnableCallbacks ( this->ioDone, remaining ); + + double delay = epicsTime::getCurrent () - beg_time; + if ( delay < timeout ) { + remaining = timeout - delay; + } + else { + remaining = 0.0; + } + } + + { + epicsGuard < oldCACMutex > guard ( this->mutex ); + this->ioSeqNo++; + this->pndRecvCnt = 0u; + } + + return status; +} + +// !!!! This routine is only visible in the old interface - or in a new ST interface. +// !!!! In the old interface we restrict thread attach so that calls from threads +// !!!! other than the initializing thread are not allowed if preemptive callback +// !!!! is disabled. This prevents the preemptive callback lock from being released +// !!!! by other threads than the one that locked it. +// +// this routine should probably be moved to the oldCAC? +int oldCAC::pendEvent ( const double & timeout ) +{ + // prevent recursion nightmares by disabling calls to + // pendIO () from within a CA callback. + if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) { + return ECA_EVDISALLOW; + } + + epicsTime current = epicsTime::getCurrent (); + + this->flushRequest (); + + // process at least once if preemptive callback is disabled + if ( this->pCallbackGuard ) { + epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); + this->clientCtx.waitUntilNoRecvThreadsPending (); + } + + double elapsed = epicsTime::getCurrent() - current; + double delay; + + if ( timeout > elapsed ) { + delay = timeout - elapsed; + } + else { + delay = 0.0; + } + + if ( delay >= CAC_SIGNIFICANT_DELAY ) { + if ( this->pCallbackGuard ) { + epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); + epicsThreadSleep ( delay ); + } + else { + epicsThreadSleep ( delay ); + } + } + + return ECA_TIMEOUT; +} + +void oldCAC::blockForEventAndEnableCallbacks ( epicsEvent & event, double timeout ) +{ + if ( this->pCallbackGuard ) { + epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard ); + event.wait ( timeout ); + } + else { + event.wait ( timeout ); + } +} diff --git a/src/ca/oldChannelNotify.cpp b/src/ca/oldChannelNotify.cpp index 8edaf25b8..587696557 100644 --- a/src/ca/oldChannelNotify.cpp +++ b/src/ca/oldChannelNotify.cpp @@ -27,10 +27,6 @@ epicsSingleton < tsFreeList < struct oldChannelNotify, 1024 > > oldChannelNotify::pFreeList; -extern "C" void cacNoopConnHandler ( struct connection_handler_args ) -{ -} - extern "C" void cacNoopAccesRightsHandler ( struct access_rights_handler_args ) { } @@ -38,15 +34,28 @@ extern "C" void cacNoopAccesRightsHandler ( struct access_rights_handler_args ) oldChannelNotify::oldChannelNotify ( oldCAC & cacIn, const char *pName, caCh * pConnCallBackIn, void * pPrivateIn, capri priority ) : cacCtx ( cacIn ), - pConnCallBack ( pConnCallBackIn ? pConnCallBackIn : cacNoopConnHandler ), + pConnCallBack ( pConnCallBackIn ), pPrivate ( pPrivateIn ), pAccessRightsFunc ( cacNoopAccesRightsHandler ), - io ( cacIn.createChannel ( pName, *this, priority ) ) + io ( cacIn.createChannel ( pName, *this, priority ) ), + ioSeqNo ( cacIn.sequenceNumberOfOutstandingIO () ), + prevConnected ( false ) { + // no need to worry about a connect preempting here because + // the connect sequence will not start untill initiateConnect() + // is called + if ( pConnCallBackIn == 0 ) { + this->cacCtx.incrementOutstandingIO ( cacIn.sequenceNumberOfOutstandingIO () ); + } } oldChannelNotify::~oldChannelNotify () { + delete & this->io; + + // no need to worry about a connect preempting here because + // the nciu as been deleted + this->cacCtx.decrementOutstandingIO ( this->ioSeqNo ); } void oldChannelNotify::setPrivatePointer ( void *pPrivateIn ) @@ -59,15 +68,6 @@ void * oldChannelNotify::privatePointer () const return this->pPrivate; } -int oldChannelNotify::changeConnCallBack ( caCh *pfunc ) -{ - this->pConnCallBack = pfunc ? pfunc : cacNoopConnHandler; - // test for NOOP connection handler does _not_ occur here because the - // lock is not applied - this->io.notifyStateChangeFirstConnectInCountOfOutstandingIO (); - return ECA_NORMAL; -} - int oldChannelNotify::replaceAccessRightsEvent ( caArh *pfunc ) { // The order of the following is significant to guarantee that the @@ -87,20 +87,55 @@ int oldChannelNotify::replaceAccessRightsEvent ( caArh *pfunc ) return ECA_NORMAL; } +int oldChannelNotify::changeConnCallBack ( caCh * pfunc ) +{ + epicsGuard < callbackMutex > callbackGuard = + this->cacCtx.callbackGuardFactory (); + + if ( ! this->prevConnected ) { + if ( pfunc ) { + if ( ! this->pConnCallBack ) { + this->cacCtx.decrementOutstandingIO ( this->ioSeqNo ); + } + } + else { + if ( this->pConnCallBack ) { + this->cacCtx.incrementOutstandingIO ( this->ioSeqNo ); + } + } + } + this->pConnCallBack = pfunc; + + return ECA_NORMAL; +} + void oldChannelNotify::connectNotify () { - struct connection_handler_args args; - args.chid = this; - args.op = CA_OP_CONN_UP; - ( *this->pConnCallBack ) ( args ); + this->prevConnected = true; + if ( this->pConnCallBack ) { + struct connection_handler_args args; + args.chid = this; + args.op = CA_OP_CONN_UP; + ( *this->pConnCallBack ) ( args ); + + } + else { + this->cacCtx.decrementOutstandingIO ( this->ioSeqNo ); + } + } void oldChannelNotify::disconnectNotify () { - struct connection_handler_args args; - args.chid = this; - args.op = CA_OP_CONN_DOWN; - ( *this->pConnCallBack ) ( args ); + if ( this->pConnCallBack ) { + struct connection_handler_args args; + args.chid = this; + args.op = CA_OP_CONN_DOWN; + ( *this->pConnCallBack ) ( args ); + } + else { + this->cacCtx.incrementOutstandingIO ( this->ioSeqNo ); + } } void oldChannelNotify::accessRightsNotify ( const caAccessRights &ar ) @@ -131,11 +166,6 @@ void oldChannelNotify::writeException ( int status, const char *pContext, __FILE__, __LINE__, *this, type, count, CA_OP_PUT ); } -bool oldChannelNotify::includeFirstConnectInCountOfOutstandingIO () const -{ - return ( this->pConnCallBack == cacNoopConnHandler ); -} - void * oldChannelNotify::operator new ( size_t size ) { return oldChannelNotify::pFreeList->allocate ( size );