From 39f180ffd18c15b7fbc8292a4acb995a78758c38 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Wed, 23 Oct 2002 22:52:06 +0000 Subject: [PATCH] o interrupt socket system calls on unix with a signal' o use placement new --- src/ca/tcpiiu.cpp | 183 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 136 insertions(+), 47 deletions(-) diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 0e56ea086..a2650ca1b 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -37,6 +37,9 @@ #include "net_convert.h" #include "bhe.h" +const unsigned mSecPerSec = 1000u; +const unsigned uSecPerSec = 1000u * mSecPerSec; + tcpSendThread::tcpSendThread ( class tcpiiu & iiuIn, const char * pName, unsigned stackSize, unsigned priority ) : iiu ( iiuIn ), thread ( *this, pName, stackSize, priority ) @@ -60,6 +63,8 @@ void tcpSendThread::exitWait () void tcpSendThread::run () { try { + epicsSocketEnableInterruptedSystemCall (); + while ( true ) { bool flowControlLaborNeeded; bool echoLaborNeeded; @@ -175,6 +180,8 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, continue; } + + if ( localError != SOCK_EPIPE && localError != SOCK_ECONNRESET && @@ -279,6 +286,8 @@ void tcpRecvThread::exitWait () void tcpRecvThread::run () { try { + epicsSocketEnableInterruptedSystemCall (); + this->iiu.cacRef.attachToClientCtx (); epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); @@ -292,11 +301,11 @@ void tcpRecvThread::run () return; } - comBuf * pComBuf = new comBuf; + comBuf * pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; while ( this->iiu.state == tcpiiu::iiucs_connected || this->iiu.state == tcpiiu::iiucs_clean_shutdown ) { if ( ! pComBuf ) { - pComBuf = new comBuf; + pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; } // @@ -310,13 +319,13 @@ void tcpRecvThread::run () if ( this->iiu.cacRef.preemptiveCallbakIsEnabled() ) { nBytesIn = pComBuf->fillFromWire ( this->iiu ); if ( nBytesIn == 0u ) { - break; + continue; } } else { char buf; ::recv ( this->iiu.sock, &buf, 1, MSG_PEEK ); - nBytesIn = 0u; // make gnu hoppy + nBytesIn = 0u; } if ( this->iiu.state != tcpiiu::iiucs_connected && @@ -341,10 +350,15 @@ void tcpRecvThread::run () if ( nBytesIn == 0u ) { // outer loop checks to see if state is connected // ( properly set by fillFromWire() ) - break; + continue; } } + if ( this->iiu.state != tcpiiu::iiucs_connected && + this->iiu.state != tcpiiu::iiucs_clean_shutdown ) { + break; + } + // force the receive watchdog to be reset every 50 frames unsigned contiguousFrameCount = 0; while ( contiguousFrameCount++ < 50 ) { @@ -375,15 +389,11 @@ void tcpRecvThread::run () } // allocate a new com buf - pComBuf = new comBuf; + pComBuf = new ( this->iiu.comBufMemMgr ) comBuf; nBytesIn = 0u; { - int status; - osiSockIoctl_t bytesPending = 0; - status = socket_ioctl ( this->iiu.sock, // X aCC 392 - FIONREAD, & bytesPending ); - if ( status || bytesPending == 0u ) { + if ( ! this->iiu.bytesArePendingInOS () ) { break; } nBytesIn = pComBuf->fillFromWire ( this->iiu ); @@ -397,24 +407,30 @@ void tcpRecvThread::run () } if ( pComBuf ) { - pComBuf->destroy (); + pComBuf->~comBuf (); +# if defined ( CXX_PLACEMENT_DELETE ) && 0 + comBuf::operator delete ( pComBuf, this->iiu.comBufMemMgr ); +# else + this->iiu.comBufMemMgr.release ( pComBuf ); +# endif } } catch ( ... ) { errlogPrintf ( "cac tcp receive thread terminating due to a c++ exception\n" ); this->iiu.cacRef.initiateAbortShutdown ( this->iiu ); - } + } } - // // tcpiiu::tcpiiu () // tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, epicsTimerQueue & timerQueue, const osiSockAddr & addrIn, + comBufMemoryManager & comBufMemMgrIn, unsigned minorVersion, ipAddrToAsciiEngine & engineIn, const cacChannel::priLev & priorityIn ) : caServerID ( addrIn.ia, priorityIn ), + hostNameCacheInstance ( addrIn, engineIn ), recvThread ( *this, cbMutex, "CAC-TCP-recv", epicsThreadGetStackSize ( epicsThreadStackBig ), cac::highestPriorityLevelBelow ( cac.getInitializingThreadsPriority() ) ), @@ -425,10 +441,11 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, cac.getInitializingThreadsPriority() ) ) ), recvDog ( cac, *this, connectionTimeout, timerQueue ), sendDog ( cac, *this, connectionTimeout, timerQueue ), - sendQue ( *this ), + sendQue ( *this, comBufMemMgrIn ), + recvQue ( comBufMemMgrIn ), curDataMax ( MAX_TCP ), curDataBytes ( 0ul ), - pHostNameCache ( new hostNameCache ( addrIn, engineIn ) ), + comBufMemMgr ( comBufMemMgrIn ), cacRef ( cac ), pCurData ( cac.allocateSmallBufferTCP () ), minorProtocolVersion ( minorVersion ), @@ -436,7 +453,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, sock ( INVALID_SOCKET ), contigRecvMsgCount ( 0u ), blockingForFlush ( 0u ), - socketLibrarySendBufferSize ( 0u ), + socketLibrarySendBufferSize ( 0x1000 ), unacknowledgedSendBytes ( 0u ), busyStateDetected ( false ), flowControlActive ( false ), @@ -513,13 +530,34 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout, sizeOfParameter != static_cast < int > ( sizeof ( nBytes ) ) ) { this->printf ("CAC: problems getting socket option SO_SNDBUF = \"%s\"\n", SOCKERRSTR ( SOCKERRNO ) ); - this->socketLibrarySendBufferSize = 0u; } else { this->socketLibrarySendBufferSize = static_cast < unsigned > ( nBytes ); } } +# if 0 + // + // windows has a really strange implementation of thess options + // and we can avoid the need for this by using pthread_kill on unix + // + { + struct timeval timeout; + double pollInterval = connectionTimeout / 8.0; + timeout.tv_sec = static_cast < long > ( pollInterval ); + timeout.tv_usec = static_cast < long > + ( ( pollInterval - timeout.tv_sec ) * uSecPerSec ); + // intentionally ignore status as we dont expect that all systems + // will accept this request + setsockopt ( this->sock, SOL_SOCKET, SO_SNDTIMEO, + ( char * ) & timeout, sizeof ( timeout ) ); + // intentionally ignore status as we dont expect that all systems + // will accept this request + setsockopt ( this->sock, SOL_SOCKET, SO_RCVTIMEO, + ( char * ) & timeout, sizeof ( timeout ) ); + } +# endif + memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) ); } @@ -644,6 +682,18 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard, errlogPrintf ("CAC TCP socket close error was %s\n", SOCKERRSTR (SOCKERRNO) ); } + + // + // on HPUX close() and shutdown() are not enough so we must also + // throw signals to interrupt the threads that may be in the + // send() and recv() system calls. + // + this->recvThread.interruptSocketRecv (); + this->sendThread.interruptSocketSend (); + + // + // wake up the send thread if it isnt blocking in send() + // this->sendThreadFlushEvent.signal (); } } @@ -679,7 +729,7 @@ void tcpiiu::show ( unsigned level ) const { epicsGuard < cacMutex > locker ( this->cacRef.mutexRef() ); char buf[256]; - this->pHostNameCache->hostName ( buf, sizeof ( buf ) ); + this->hostNameCacheInstance.hostName ( buf, sizeof ( buf ) ); ::printf ( "Virtual circuit to \"%s\" at version V%u.%u state %u\n", buf, CA_MAJOR_PROTOCOL_REVISION, this->minorProtocolVersion, this->state ); @@ -842,7 +892,9 @@ void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & ) return; } - const char * pName = pLocalHostNameAtLoadTime->pointer (); + epicsSingleton < localHostName >::reference + ref ( localHostNameAtLoadTime ); + const char * pName = ref->pointer (); unsigned size = strlen ( pName ) + 1u; unsigned postSize = CA_MESSAGE_ALIGN ( size ); assert ( postSize < 0xffff ); @@ -1133,22 +1185,44 @@ bool tcpiiu::flush () return true; } + bool success = true; + unsigned bytesToBeSent = 0u; while ( true ) { comBuf * pBuf; { epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() ); + // set it here with this odd order because we must have + // the lock and we miust have already sent the bytes + if ( bytesToBeSent ) { + this->unacknowledgedSendBytes += bytesToBeSent; + } pBuf = this->sendQue.popNextComBufToSend (); - if ( pBuf ) { - this->unacknowledgedSendBytes += pBuf->occupiedBytes (); + if ( ! pBuf ) { + break; } - else { - if ( this->blockingForFlush ) { - this->flushBlockEvent.signal (); - } - this->earlyFlush = false; - return true; + bytesToBeSent = pBuf->occupiedBytes (); + } + + success = pBuf->flushToWire ( *this ); + pBuf->~comBuf (); +# if defined ( CXX_PLACEMENT_DELETE ) && 0 + comBuf::operator delete ( pBuf, this->comBufMemMgr ); +# else + this->comBufMemMgr.release ( pBuf ); +# endif + + if ( ! success ) { + epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() ); + while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) { + pBuf->~comBuf (); +# if defined ( CXX_PLACEMENT_DELETE ) && 0 + comBuf::operator delete ( pBuf, this->comBufMemMgr ); +# else + this->comBufMemMgr.release ( pBuf ); +# endif } + break; } // @@ -1161,25 +1235,12 @@ bool tcpiiu::flush () this->socketLibrarySendBufferSize ) { this->recvDog.sendBacklogProgressNotify (); } - - bool success = pBuf->flushToWire ( *this ); - - pBuf->destroy (); - - if ( ! success ) { - epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() ); - while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) { - pBuf->destroy (); - } - if ( this->blockingForFlush ) { - this->flushBlockEvent.signal (); - } - return false; - } } -# if defined ( __HP_aCC ) && _HP_aCC <= 033300 - return false; // to make hpux compiler happy... -# endif + if ( this->blockingForFlush ) { + this->flushBlockEvent.signal (); + } + this->earlyFlush = false; + return success; } // ~tcpiiu() will not return while this->blockingForFlush is greater than zero @@ -1230,7 +1291,7 @@ void tcpiiu::requestRecvProcessPostponedFlush () void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const { - this->pHostNameCache->hostName ( pBuf, bufLength ); + this->hostNameCacheInstance.hostName ( pBuf, bufLength ); } // deprecated - please dont use - this is _not_ thread safe @@ -1294,5 +1355,33 @@ void tcpiiu::flushRequest () this->sendThreadFlushEvent.signal (); } +bool tcpiiu::bytesArePendingInOS () const +{ +#if 0 + FD_SET readBits; + + FD_ZERO ( & readBits ); + FD_SET ( this->sock, & readBits ); + + int status = select ( ???, & readBits, NULL, NULL, zero tmo ); + if ( status ) { + if ( FD_ISSET ( this->sock, & readBits ) ) { + return true; + } + } +#endif + + osiSockIoctl_t bytesPending; + int status = socket_ioctl ( this->sock, // X aCC 392 + FIONREAD, & bytesPending ); + if ( status ) { + return false; + } + if ( bytesPending > 0 ) { + return true; + } + return false; +} +