o interrupt socket system calls on unix with a signal'

o use placement new
This commit is contained in:
Jeff Hill
2002-10-23 22:52:06 +00:00
parent d859a43492
commit 39f180ffd1
+136 -47
View File
@@ -37,6 +37,9 @@
#include "net_convert.h"
#include "bhe.h"
const unsigned mSecPerSec = 1000u;
const unsigned uSecPerSec = 1000u * mSecPerSec;
tcpSendThread::tcpSendThread ( class tcpiiu & iiuIn,
const char * pName, unsigned stackSize, unsigned priority ) :
iiu ( iiuIn ), thread ( *this, pName, stackSize, priority )
@@ -60,6 +63,8 @@ void tcpSendThread::exitWait ()
void tcpSendThread::run ()
{
try {
epicsSocketEnableInterruptedSystemCall ();
while ( true ) {
bool flowControlLaborNeeded;
bool echoLaborNeeded;
@@ -175,6 +180,8 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
continue;
}
if (
localError != SOCK_EPIPE &&
localError != SOCK_ECONNRESET &&
@@ -279,6 +286,8 @@ void tcpRecvThread::exitWait ()
void tcpRecvThread::run ()
{
try {
epicsSocketEnableInterruptedSystemCall ();
this->iiu.cacRef.attachToClientCtx ();
epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu );
@@ -292,11 +301,11 @@ void tcpRecvThread::run ()
return;
}
comBuf * pComBuf = new comBuf;
comBuf * pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
while ( this->iiu.state == tcpiiu::iiucs_connected ||
this->iiu.state == tcpiiu::iiucs_clean_shutdown ) {
if ( ! pComBuf ) {
pComBuf = new comBuf;
pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
}
//
@@ -310,13 +319,13 @@ void tcpRecvThread::run ()
if ( this->iiu.cacRef.preemptiveCallbakIsEnabled() ) {
nBytesIn = pComBuf->fillFromWire ( this->iiu );
if ( nBytesIn == 0u ) {
break;
continue;
}
}
else {
char buf;
::recv ( this->iiu.sock, &buf, 1, MSG_PEEK );
nBytesIn = 0u; // make gnu hoppy
nBytesIn = 0u;
}
if ( this->iiu.state != tcpiiu::iiucs_connected &&
@@ -341,10 +350,15 @@ void tcpRecvThread::run ()
if ( nBytesIn == 0u ) {
// outer loop checks to see if state is connected
// ( properly set by fillFromWire() )
break;
continue;
}
}
if ( this->iiu.state != tcpiiu::iiucs_connected &&
this->iiu.state != tcpiiu::iiucs_clean_shutdown ) {
break;
}
// force the receive watchdog to be reset every 50 frames
unsigned contiguousFrameCount = 0;
while ( contiguousFrameCount++ < 50 ) {
@@ -375,15 +389,11 @@ void tcpRecvThread::run ()
}
// allocate a new com buf
pComBuf = new comBuf;
pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
nBytesIn = 0u;
{
int status;
osiSockIoctl_t bytesPending = 0;
status = socket_ioctl ( this->iiu.sock, // X aCC 392
FIONREAD, & bytesPending );
if ( status || bytesPending == 0u ) {
if ( ! this->iiu.bytesArePendingInOS () ) {
break;
}
nBytesIn = pComBuf->fillFromWire ( this->iiu );
@@ -397,24 +407,30 @@ void tcpRecvThread::run ()
}
if ( pComBuf ) {
pComBuf->destroy ();
pComBuf->~comBuf ();
# if defined ( CXX_PLACEMENT_DELETE ) && 0
comBuf::operator delete ( pComBuf, this->iiu.comBufMemMgr );
# else
this->iiu.comBufMemMgr.release ( pComBuf );
# endif
}
}
catch ( ... ) {
errlogPrintf ( "cac tcp receive thread terminating due to a c++ exception\n" );
this->iiu.cacRef.initiateAbortShutdown ( this->iiu );
}
}
}
//
// tcpiiu::tcpiiu ()
//
tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
epicsTimerQueue & timerQueue, const osiSockAddr & addrIn,
comBufMemoryManager & comBufMemMgrIn,
unsigned minorVersion, ipAddrToAsciiEngine & engineIn,
const cacChannel::priLev & priorityIn ) :
caServerID ( addrIn.ia, priorityIn ),
hostNameCacheInstance ( addrIn, engineIn ),
recvThread ( *this, cbMutex, "CAC-TCP-recv",
epicsThreadGetStackSize ( epicsThreadStackBig ),
cac::highestPriorityLevelBelow ( cac.getInitializingThreadsPriority() ) ),
@@ -425,10 +441,11 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
cac.getInitializingThreadsPriority() ) ) ),
recvDog ( cac, *this, connectionTimeout, timerQueue ),
sendDog ( cac, *this, connectionTimeout, timerQueue ),
sendQue ( *this ),
sendQue ( *this, comBufMemMgrIn ),
recvQue ( comBufMemMgrIn ),
curDataMax ( MAX_TCP ),
curDataBytes ( 0ul ),
pHostNameCache ( new hostNameCache ( addrIn, engineIn ) ),
comBufMemMgr ( comBufMemMgrIn ),
cacRef ( cac ),
pCurData ( cac.allocateSmallBufferTCP () ),
minorProtocolVersion ( minorVersion ),
@@ -436,7 +453,7 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
sock ( INVALID_SOCKET ),
contigRecvMsgCount ( 0u ),
blockingForFlush ( 0u ),
socketLibrarySendBufferSize ( 0u ),
socketLibrarySendBufferSize ( 0x1000 ),
unacknowledgedSendBytes ( 0u ),
busyStateDetected ( false ),
flowControlActive ( false ),
@@ -513,13 +530,34 @@ tcpiiu::tcpiiu ( cac & cac, callbackMutex & cbMutex, double connectionTimeout,
sizeOfParameter != static_cast < int > ( sizeof ( nBytes ) ) ) {
this->printf ("CAC: problems getting socket option SO_SNDBUF = \"%s\"\n",
SOCKERRSTR ( SOCKERRNO ) );
this->socketLibrarySendBufferSize = 0u;
}
else {
this->socketLibrarySendBufferSize = static_cast < unsigned > ( nBytes );
}
}
# if 0
//
// windows has a really strange implementation of thess options
// and we can avoid the need for this by using pthread_kill on unix
//
{
struct timeval timeout;
double pollInterval = connectionTimeout / 8.0;
timeout.tv_sec = static_cast < long > ( pollInterval );
timeout.tv_usec = static_cast < long >
( ( pollInterval - timeout.tv_sec ) * uSecPerSec );
// intentionally ignore status as we dont expect that all systems
// will accept this request
setsockopt ( this->sock, SOL_SOCKET, SO_SNDTIMEO,
( char * ) & timeout, sizeof ( timeout ) );
// intentionally ignore status as we dont expect that all systems
// will accept this request
setsockopt ( this->sock, SOL_SOCKET, SO_RCVTIMEO,
( char * ) & timeout, sizeof ( timeout ) );
}
# endif
memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) );
}
@@ -644,6 +682,18 @@ void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard,
errlogPrintf ("CAC TCP socket close error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
//
// on HPUX close() and shutdown() are not enough so we must also
// throw signals to interrupt the threads that may be in the
// send() and recv() system calls.
//
this->recvThread.interruptSocketRecv ();
this->sendThread.interruptSocketSend ();
//
// wake up the send thread if it isnt blocking in send()
//
this->sendThreadFlushEvent.signal ();
}
}
@@ -679,7 +729,7 @@ void tcpiiu::show ( unsigned level ) const
{
epicsGuard < cacMutex > locker ( this->cacRef.mutexRef() );
char buf[256];
this->pHostNameCache->hostName ( buf, sizeof ( buf ) );
this->hostNameCacheInstance.hostName ( buf, sizeof ( buf ) );
::printf ( "Virtual circuit to \"%s\" at version V%u.%u state %u\n",
buf, CA_MAJOR_PROTOCOL_REVISION,
this->minorProtocolVersion, this->state );
@@ -842,7 +892,9 @@ void tcpiiu::hostNameSetRequest ( epicsGuard < cacMutex > & )
return;
}
const char * pName = pLocalHostNameAtLoadTime->pointer ();
epicsSingleton < localHostName >::reference
ref ( localHostNameAtLoadTime );
const char * pName = ref->pointer ();
unsigned size = strlen ( pName ) + 1u;
unsigned postSize = CA_MESSAGE_ALIGN ( size );
assert ( postSize < 0xffff );
@@ -1133,22 +1185,44 @@ bool tcpiiu::flush ()
return true;
}
bool success = true;
unsigned bytesToBeSent = 0u;
while ( true ) {
comBuf * pBuf;
{
epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() );
// set it here with this odd order because we must have
// the lock and we miust have already sent the bytes
if ( bytesToBeSent ) {
this->unacknowledgedSendBytes += bytesToBeSent;
}
pBuf = this->sendQue.popNextComBufToSend ();
if ( pBuf ) {
this->unacknowledgedSendBytes += pBuf->occupiedBytes ();
if ( ! pBuf ) {
break;
}
else {
if ( this->blockingForFlush ) {
this->flushBlockEvent.signal ();
}
this->earlyFlush = false;
return true;
bytesToBeSent = pBuf->occupiedBytes ();
}
success = pBuf->flushToWire ( *this );
pBuf->~comBuf ();
# if defined ( CXX_PLACEMENT_DELETE ) && 0
comBuf::operator delete ( pBuf, this->comBufMemMgr );
# else
this->comBufMemMgr.release ( pBuf );
# endif
if ( ! success ) {
epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() );
while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) {
pBuf->~comBuf ();
# if defined ( CXX_PLACEMENT_DELETE ) && 0
comBuf::operator delete ( pBuf, this->comBufMemMgr );
# else
this->comBufMemMgr.release ( pBuf );
# endif
}
break;
}
//
@@ -1161,25 +1235,12 @@ bool tcpiiu::flush ()
this->socketLibrarySendBufferSize ) {
this->recvDog.sendBacklogProgressNotify ();
}
bool success = pBuf->flushToWire ( *this );
pBuf->destroy ();
if ( ! success ) {
epicsGuard < cacMutex > autoMutex ( this->cacRef.mutexRef() );
while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) {
pBuf->destroy ();
}
if ( this->blockingForFlush ) {
this->flushBlockEvent.signal ();
}
return false;
}
}
# if defined ( __HP_aCC ) && _HP_aCC <= 033300
return false; // to make hpux compiler happy...
# endif
if ( this->blockingForFlush ) {
this->flushBlockEvent.signal ();
}
this->earlyFlush = false;
return success;
}
// ~tcpiiu() will not return while this->blockingForFlush is greater than zero
@@ -1230,7 +1291,7 @@ void tcpiiu::requestRecvProcessPostponedFlush ()
void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const
{
this->pHostNameCache->hostName ( pBuf, bufLength );
this->hostNameCacheInstance.hostName ( pBuf, bufLength );
}
// deprecated - please dont use - this is _not_ thread safe
@@ -1294,5 +1355,33 @@ void tcpiiu::flushRequest ()
this->sendThreadFlushEvent.signal ();
}
bool tcpiiu::bytesArePendingInOS () const
{
#if 0
FD_SET readBits;
FD_ZERO ( & readBits );
FD_SET ( this->sock, & readBits );
int status = select ( ???, & readBits, NULL, NULL, zero tmo );
if ( status ) {
if ( FD_ISSET ( this->sock, & readBits ) ) {
return true;
}
}
#endif
osiSockIoctl_t bytesPending;
int status = socket_ioctl ( this->sock, // X aCC 392
FIONREAD, & bytesPending );
if ( status ) {
return false;
}
if ( bytesPending > 0 ) {
return true;
}
return false;
}