diff --git a/src/ca/cac.h b/src/ca/cac.h index 8ef88c381..9dfcf2f96 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -109,6 +109,14 @@ public: epicsGuard < cacMutex > &, nciu & chan ) = 0; }; +class cacMessageProcessingMinder { +public: + cacMessageProcessingMinder ( class cac & ); + ~cacMessageProcessingMinder (); +private: + class cac & cacRef; +}; + class cac : private cacRecycle, private cacDisconnectChannelPrivate, private callbackForMultiplyDefinedPV { @@ -193,8 +201,6 @@ public: void initiateAbortShutdown ( tcpiiu & ); void disconnectNotify ( tcpiiu & ); void uninstallIIU ( tcpiiu & ); - void messageArrivalNotify (); - void messageProcessingCompleteNotify (); private: localHostName hostNameCache; @@ -340,10 +346,26 @@ private: const char *pCtx, unsigned status ); static const pExcepProtoStubTCP tcpExcepJumpTableCAC []; + void messageArrivalNotify (); + void messageProcessingCompleteNotify (); + cac ( const cac & ); cac & operator = ( const cac & ); + + friend class cacMessageProcessingMinder; }; +inline cacMessageProcessingMinder::cacMessageProcessingMinder ( cac & cacIn ) : + cacRef ( cacIn ) +{ + cacIn.messageArrivalNotify (); +} + +inline cacMessageProcessingMinder::~cacMessageProcessingMinder () +{ + cacRef.messageProcessingCompleteNotify (); +} + inline const char * cac::userNamePointer () const { return this->pUserName; diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 2e751ca9d..85fca9ef6 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -323,49 +323,49 @@ void tcpRecvThread::run () // - it take also the callback lock this->iiu.recvDog.messageArrivalNotify ( currentTime ); - this->iiu.cacRef.messageArrivalNotify (); + cacMessageProcessingMinder msgProcMinder ( this->iiu.cacRef ); + { + // only one recv thread at a time may call callbacks + // - pendEvent() blocks until threads waiting for + // this lock get a chance to run + epicsGuard < callbackMutex > guard ( this->cbMutex ); - // only one recv thread at a time may call callbacks - // - pendEvent() blocks until threads waiting for - // this lock get a chance to run - epicsGuard < callbackMutex > guard ( this->cbMutex ); - - // force the receive watchdog to be reset every 5 frames - unsigned contiguousFrameCount = 0; - while ( nBytesIn ) { - if ( nBytesIn == pComBuf->capacityBytes () ) { - if ( this->iiu.contigRecvMsgCount >= - contiguousMsgCountWhichTriggersFlowControl ) { - this->iiu.busyStateDetected = true; + // force the receive watchdog to be reset every 5 frames + unsigned contiguousFrameCount = 0; + while ( nBytesIn ) { + if ( nBytesIn == pComBuf->capacityBytes () ) { + if ( this->iiu.contigRecvMsgCount >= + contiguousMsgCountWhichTriggersFlowControl ) { + this->iiu.busyStateDetected = true; + } + else { + this->iiu.contigRecvMsgCount++; + } } - else { - this->iiu.contigRecvMsgCount++; + else { + this->iiu.contigRecvMsgCount = 0u; + this->iiu.busyStateDetected = false; + } + this->iiu.unacknowledgedSendBytes = 0u; + + this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); + pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; + + // execute receive labor + bool protocolOK = this->iiu.processIncoming ( currentTime, guard ); + if ( ! protocolOK ) { + this->iiu.cacRef.initiateAbortShutdown ( this->iiu ); + break; } + + if ( ! this->iiu.bytesArePendingInOS () + || ++contiguousFrameCount > 5 ) { + break; + } + + nBytesIn = pComBuf->fillFromWire ( this->iiu ); } - else { - this->iiu.contigRecvMsgCount = 0u; - this->iiu.busyStateDetected = false; - } - this->iiu.unacknowledgedSendBytes = 0u; - - this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); - pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; - - // execute receive labor - bool protocolOK = this->iiu.processIncoming ( currentTime, guard ); - if ( ! protocolOK ) { - this->iiu.cacRef.initiateAbortShutdown ( this->iiu ); - break; - } - - if ( ! this->iiu.bytesArePendingInOS () - || ++contiguousFrameCount > 5 ) { - break; - } - - nBytesIn = pComBuf->fillFromWire ( this->iiu ); } - this->iiu.cacRef.messageProcessingCompleteNotify (); } if ( pComBuf ) { @@ -373,8 +373,22 @@ void tcpRecvThread::run () this->iiu.comBufMemMgr.release ( pComBuf ); } } + catch ( std::bad_alloc & ) { + errlogPrintf ( + "CA client library tcp receive thread " + "terminating due to no space in pool " + "C++ exception\n" ); + } + catch ( std::exception & except ) { + errlogPrintf ( + "CA client library tcp receive thread " + "terminating due to C++ exception \"%s\"\n", + except.what () ); + } catch ( ... ) { - errlogPrintf ( "cac tcp receive thread terminating due to a c++ exception\n" ); + errlogPrintf ( + "CA client library tcp receive thread " + "terminating due to a C++ exception\n" ); } // Although this is redundant in certain situations it is diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index 8775ad3f8..5ae3102f3 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -244,8 +244,7 @@ void udpiiu::recvMsg ( callbackMutex & cbMutex ) this->recvBuf, sizeof ( this->recvBuf ), 0, & src.sa, & src_size ); - this->cacRef.messageArrivalNotify (); - + cacMessageProcessingMinder msgProcMinder ( this->cacRef ); { epicsGuard < callbackMutex > guard ( cbMutex ); @@ -286,8 +285,6 @@ void udpiiu::recvMsg ( callbackMutex & cbMutex ) (arrayElementCount) status, epicsTime::getCurrent() ); } } - - this->cacRef.messageProcessingCompleteNotify (); } udpRecvThread::udpRecvThread ( udpiiu & iiuIn, callbackMutex & cbMutexIn,