diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index db81b6a1b..9f45eefb9 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -94,7 +94,7 @@ void tcpSendThread::run () } } catch ( ... ) { - this->iiu.printf ("cac: tcp send thread received an exception - disconnecting\n"); + this->iiu.printf ("cac: tcp send thread received an unexpected exception - disconnecting\n"); this->iiu.forcedShutdown (); } @@ -226,120 +226,66 @@ void tcpRecvThread::start () void tcpRecvThread::run () { - this->iiu.pCAC()->attachToClientCtx (); + try { + this->iiu.pCAC()->attachToClientCtx (); - epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); + epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); - this->iiu.connect (); + this->iiu.connect (); - this->iiu.versionMessage ( this->iiu.priority() ); + this->iiu.versionMessage ( this->iiu.priority() ); - if ( this->iiu.state == iiu_connected ) { - this->iiu.sendThread.start (); - } - else { - this->iiu.sendThreadExitEvent.signal (); - this->iiu.cleanShutdown (); - } - - comBuf * pComBuf = new ( std::nothrow ) comBuf; - while ( this->iiu.state == iiu_connected ) { - if ( ! pComBuf ) { - // no way to be informed when memory is available - epicsThreadSleep ( 0.5 ); - pComBuf = new ( std::nothrow ) comBuf; - continue; - } - - // - // We leave the bytes pending and fetch them after - // callbacks are enabled when running in the old preemptive - // call back disabled mode so that asynchronous wakeup via - // file manager call backs works correctly. This does not - // appear to impact performance. - // - unsigned nBytesIn; - if ( this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) { - nBytesIn = pComBuf->fillFromWire ( this->iiu ); - if ( nBytesIn == 0u ) { - break; - } + if ( this->iiu.state == iiu_connected ) { + this->iiu.sendThread.start (); } else { - char buf; - ::recv ( this->iiu.sock, &buf, 1, MSG_PEEK ); - nBytesIn = 0u; // make gnu hoppy - } - - if ( this->iiu.state != iiu_connected ) { - break; + this->iiu.sendThreadExitEvent.signal (); + this->iiu.cleanShutdown (); } - // reschedule connection activity watchdog - // but dont hold the lock for fear of deadlocking - // because cancel is blocking for the completion - // of the recvDog expire which takes the lock - // - it take also the callback lock - this->iiu.recvDog.messageArrivalNotify (); - - // only one recv thread at a time may call callbacks - // - pendEvent() blocks until threads waiting for - // this lock get a chance to run - epicsGuard < callbackMutex > guard ( this->cbMutex ); - - if ( ! this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) { - nBytesIn = pComBuf->fillFromWire ( this->iiu ); - if ( nBytesIn == 0u ) { - // outer loop checks to see if state is connected - // ( properly set by fillFromWire() ) - break; + comBuf * pComBuf = new comBuf; + while ( this->iiu.state == iiu_connected ) { + if ( ! pComBuf ) { + pComBuf = new comBuf; } - } - // force the receive watchdog to be reset every 50 frames - unsigned contiguousFrameCount = 0; - while ( contiguousFrameCount++ < 50 ) { - - if ( nBytesIn == pComBuf->capacityBytes () ) { - if ( this->iiu.contigRecvMsgCount >= - contiguousMsgCountWhichTriggersFlowControl ) { - this->iiu.busyStateDetected = true; - } - else { - this->iiu.contigRecvMsgCount++; + // + // We leave the bytes pending and fetch them after + // callbacks are enabled when running in the old preemptive + // call back disabled mode so that asynchronous wakeup via + // file manager call backs works correctly. This does not + // appear to impact performance. + // + unsigned nBytesIn; + if ( this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) { + nBytesIn = pComBuf->fillFromWire ( this->iiu ); + if ( nBytesIn == 0u ) { + break; } } else { - this->iiu.contigRecvMsgCount = 0u; - this->iiu.busyStateDetected = false; - } - this->iiu.unacknowledgedSendBytes = 0u; - - this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); - pComBuf = 0; - - // execute receive labor - bool noProtocolViolation = this->iiu.processIncoming ( guard ); - if ( ! noProtocolViolation ) { - this->iiu.state = iiu_disconnected; + char buf; + ::recv ( this->iiu.sock, &buf, 1, MSG_PEEK ); + nBytesIn = 0u; // make gnu hoppy + } + + if ( this->iiu.state != iiu_connected ) { break; } - // allocate a new com buf - pComBuf = new ( std::nothrow ) comBuf; - nBytesIn = 0u; - if ( ! pComBuf ) { - break; - } + // reschedule connection activity watchdog + // but dont hold the lock for fear of deadlocking + // because cancel is blocking for the completion + // of the recvDog expire which takes the lock + // - it take also the callback lock + this->iiu.recvDog.messageArrivalNotify (); - { - int status; - osiSockIoctl_t bytesPending = 0; - status = socket_ioctl ( this->iiu.sock, // X aCC 392 - FIONREAD, & bytesPending ); - if ( status || bytesPending == 0u ) { - break; - } + // only one recv thread at a time may call callbacks + // - pendEvent() blocks until threads waiting for + // this lock get a chance to run + epicsGuard < callbackMutex > guard ( this->cbMutex ); + + if ( ! this->iiu.pCAC()->preemptiveCallbakIsEnabled() ) { nBytesIn = pComBuf->fillFromWire ( this->iiu ); if ( nBytesIn == 0u ) { // outer loop checks to see if state is connected @@ -347,19 +293,74 @@ void tcpRecvThread::run () break; } } + + // force the receive watchdog to be reset every 50 frames + unsigned contiguousFrameCount = 0; + while ( contiguousFrameCount++ < 50 ) { + + if ( nBytesIn == pComBuf->capacityBytes () ) { + if ( this->iiu.contigRecvMsgCount >= + contiguousMsgCountWhichTriggersFlowControl ) { + this->iiu.busyStateDetected = true; + } + else { + this->iiu.contigRecvMsgCount++; + } + } + else { + this->iiu.contigRecvMsgCount = 0u; + this->iiu.busyStateDetected = false; + } + this->iiu.unacknowledgedSendBytes = 0u; + + this->iiu.recvQue.pushLastComBufReceived ( *pComBuf ); + pComBuf = 0; + + // execute receive labor + bool noProtocolViolation = this->iiu.processIncoming ( guard ); + if ( ! noProtocolViolation ) { + this->iiu.state = iiu_disconnected; + break; + } + + // allocate a new com buf + pComBuf = new comBuf; + nBytesIn = 0u; + + { + int status; + osiSockIoctl_t bytesPending = 0; + status = socket_ioctl ( this->iiu.sock, // X aCC 392 + FIONREAD, & bytesPending ); + if ( status || bytesPending == 0u ) { + break; + } + nBytesIn = pComBuf->fillFromWire ( this->iiu ); + if ( nBytesIn == 0u ) { + // outer loop checks to see if state is connected + // ( properly set by fillFromWire() ) + break; + } + } + } } + + if ( pComBuf ) { + pComBuf->destroy (); + } + + this->iiu.stopThreads (); + + // arrange for delete through the timer thread + this->iiu.killTimer.start (); } - - if ( pComBuf ) { - pComBuf->destroy (); + catch ( ... ) { + errlogPrintf ("cac tcp receive thread terminating due to a c++ exception\n" ); + this->iiu.killTimer.start (); } - - this->iiu.stopThreads (); - - // arrange for delete through the timer thread - this->iiu.killTimer.start (); } + // // tcpiiu::tcpiiu () // @@ -401,14 +402,6 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, earlyFlush ( false ), recvProcessPostponedFlush ( false ) { - if ( ! this->pCurData ) { - throw std::bad_alloc (); - } - - if ( ! this->pHostNameCache.get () ) { - throw std::bad_alloc (); - } - this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP ); if ( this->sock == INVALID_SOCKET ) { this->printf ( "CAC: unable to create virtual circuit because \"%s\"\n",