fixed potential race condition and turned TCPNODELAY on

This commit is contained in:
Jeff Hill
2000-09-07 01:30:43 +00:00
parent b7b1a82472
commit ef77d41cc1

View File

@@ -126,13 +126,16 @@ void tcpiiu::connect ()
}
}
/*
* cacSendThreadTCP ()
*/
//
// cacSendThreadTCP ()
//
// care is taken to not hold the lock while sending a message
//
extern "C" void cacSendThreadTCP ( void *pParam )
{
tcpiiu *piiu = ( tcpiiu * ) pParam;
claimMsgCache cache ( CA_V44 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersionNumber ) );
bool laborNeeded;
while ( true ) {
@@ -142,32 +145,61 @@ extern "C" void cacSendThreadTCP ( void *pParam )
break;
}
if ( piiu->pClaimsPendingIIU->channelCount () > 0u ) {
piiu->pClaimsPendingIIU->sendPendingClaims ( *piiu,
piiu->lock ();
laborNeeded = piiu->claimsPending;
piiu->claimsPending = false;
piiu->unlock ();
if ( laborNeeded ) {
piiu->sendPendingClaims (
CA_V42 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersionNumber ), cache );
piiu->flushPending = true;
}
if ( piiu->busyStateDetected != piiu->flowControlActive ) {
piiu->lock ();
laborNeeded = piiu->busyStateDetected != piiu->flowControlActive;
piiu->unlock ();
if ( laborNeeded ) {
if ( piiu->flowControlActive ) {
piiu->enableFlowControlMsg ();
piiu->comQueSend::disableFlowControlRequest ();
piiu->flowControlActive = false;
# if defined ( DEBUG ) || 1
printf ( "fc off\n" );
# endif
}
else {
piiu->disableFlowControlMsg ();
piiu->comQueSend::enableFlowControlRequest ();
piiu->flowControlActive = true;
# if defined ( DEBUG ) || 1
printf ( "fc on\n" );
# endif
}
piiu->flushPending = true;
}
if ( piiu->echoRequestPending ) {
piiu->echoRequestMsg ();
piiu->lock ();
laborNeeded = piiu->echoRequestPending;
piiu->echoRequestPending = false;
piiu->unlock ();
if ( laborNeeded ) {
if ( CA_V43 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersionNumber ) ) {
piiu->comQueSend::echoRequest ();
}
else {
piiu->comQueSend::noopRequest ();
}
piiu->flushPending = true;
}
if ( piiu->flushPending ) {
piiu->lock ();
laborNeeded = piiu->flushPending;
piiu->flushPending = false;
piiu->unlock ();
if ( laborNeeded ) {
if ( ! piiu->flushToWire () ) {
break;
}
piiu->flushPending = false;
}
}
@@ -269,6 +301,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
assert ( static_cast <unsigned> ( status ) <= nBytesInBuf );
totalBytes = static_cast <unsigned> ( status );
this->lock ();
if ( nBytesInBuf == totalBytes ) {
if ( this->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) {
this->busyStateDetected = true;
@@ -281,6 +314,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
this->contigRecvMsgCount = 0u;
this->busyStateDetected = false;
}
this->unlock ();
this->messageArrivalNotify (); // reschedule connection activity watchdog
@@ -349,7 +383,8 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn,
flowControlActive ( false ),
recvMessagePending ( false ),
flushPending ( false ),
msgHeaderAvailable ( false )
msgHeaderAvailable ( false ),
claimsPending ( false )
{
SOCKET newSocket;
int status;
@@ -363,10 +398,6 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn,
return;
}
//
// apparently this is nolonger necessary with modern IP kernels
//
#if 0
flag = TRUE;
status = setsockopt ( newSocket, IPPROTO_TCP, TCP_NODELAY,
(char *)&flag, sizeof(flag) );
@@ -374,7 +405,7 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn,
ca_printf ("CAC: problems setting socket option TCP_NODELAY = \"%s\"\n",
SOCKERRSTR (SOCKERRNO));
}
#endif
flag = TRUE;
status = setsockopt ( newSocket , SOL_SOCKET, SO_KEEPALIVE,
(char *)&flag, sizeof (flag) );
@@ -438,21 +469,11 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn,
return;
}
this->pClaimsPendingIIU = new claimsPendingIIU ( *this );
if ( ! this->pClaimsPendingIIU ) {
ca_printf ("CA: unable to create claimsPendingIIU object\n");
semBinaryDestroy (this->sendThreadExitSignal);
socket_close ( newSocket );
this->fullyConstructedFlag = false;
return;
}
this->sendThreadFlushSignal = semBinaryCreate ( semEmpty );
if ( ! this->sendThreadFlushSignal ) {
ca_printf ("CA: unable to create sendThreadFlushSignal object\n");
semBinaryDestroy (this->sendThreadExitSignal);
socket_close ( newSocket );
delete this->pClaimsPendingIIU;
this->fullyConstructedFlag = false;
return;
}
@@ -463,7 +484,6 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn,
semBinaryDestroy (this->sendThreadExitSignal);
semBinaryDestroy (this->sendThreadFlushSignal);
socket_close ( newSocket );
delete this->pClaimsPendingIIU;
this->fullyConstructedFlag = false;
return;
}
@@ -490,7 +510,6 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn,
semBinaryDestroy (this->recvThreadExitSignal);
semBinaryDestroy (this->sendThreadExitSignal);
socket_close ( newSocket );
delete this->pClaimsPendingIIU;
semBinaryDestroy (this->sendThreadFlushSignal);
semBinaryDestroy (this->recvThreadRingBufferSpaceAvailableSignal);
this->fullyConstructedFlag = false;
@@ -556,6 +575,8 @@ tcpiiu::~tcpiiu ()
this->fullyConstructedFlag = false;
this->clientCtx ().removeIIU ( *this );
if ( this->channelCount () ) {
char hostNameTmp[64];
this->ipToA.hostName ( hostNameTmp, sizeof ( hostNameTmp ) );
@@ -564,11 +585,6 @@ tcpiiu::~tcpiiu ()
this->disconnectAllChan ();
if ( this->pClaimsPendingIIU ) {
this->pClaimsPendingIIU->disconnectAllChan ();
delete this->pClaimsPendingIIU;
}
this->cleanShutdown ();
// wait for send and recv threads to exit
@@ -580,8 +596,6 @@ tcpiiu::~tcpiiu ()
semBinaryDestroy ( this->sendThreadFlushSignal );
semBinaryDestroy ( this->recvThreadRingBufferSpaceAvailableSignal );
this->clientCtx ().removeIIU ( *this );
/*
* free message body cache
*/
@@ -643,9 +657,6 @@ void tcpiiu::show ( unsigned level ) const
printf ("\treceive message pending bool = %u\n", this->recvMessagePending );
printf ("\tflush pending bool = %u\n", this->flushPending );
printf ("\treceive message header available bool = %u\n", this->msgHeaderAvailable );
if ( this->pClaimsPendingIIU ) {
this->pClaimsPendingIIU->show ( level - 3u );
}
this->bhe.show ( level - 3u );
}
this->unlock ();
@@ -653,49 +664,12 @@ void tcpiiu::show ( unsigned level ) const
void tcpiiu::echoRequest ()
{
this->lock ();
this->echoRequestPending = true;
this->unlock ();
this->flush ();
}
/*
* tcpiiu::echoRequestMsg ()
*/
void tcpiiu::echoRequestMsg ()
{
if ( CA_V43 ( CA_PROTOCOL_VERSION, this->minorProtocolVersionNumber ) ) {
this->comQueSend::echoRequest ();
}
else {
this->comQueSend::noopRequest ();
}
this->echoRequestPending = false;
}
/*
* tcpiiu::enableFlowControlMsg ()
*/
void tcpiiu::enableFlowControlMsg ()
{
this->comQueSend::enableFlowControlRequest ();
this->flowControlActive = true;
# if defined ( DEBUG ) || 1
printf( "fc on\n" );
# endif
}
/*
* tcpiiu::disableFlowControlMsg ()
*/
void tcpiiu::disableFlowControlMsg ()
{
this->comQueSend::disableFlowControlRequest ();
this->flowControlActive = false;
# if defined ( DEBUG ) || 1
printf ( "fc off\n" );
# endif
}
/*
* tcpiiu::hostNameSetMsg ()
*/
@@ -914,7 +888,7 @@ void tcpiiu::accessRightsRespAction ()
void tcpiiu::claimCIURespAction ()
{
this->clientCtx ().connectChannel ( this->curMsg.m_cid, *this,
this->clientCtx ().connectChannel ( this->ca_v44_ok (), this->curMsg.m_cid,
this->curMsg.m_dataType, this->curMsg.m_count, this->curMsg.m_available );
}
@@ -1108,8 +1082,11 @@ void tcpiiu::lastChannelDetachNotify ()
void tcpiiu::installChannelPendingClaim ( nciu &chan )
{
chan.attachChanToIIU ( *this->pClaimsPendingIIU );
// wake up send thread which sends claim message
chan.attachChanToIIU ( *this );
this->lock ();
this->claimsPending = true;
this->unlock ();
// wake up send thread which ultimately sends the claim message
semBinaryGive ( this->sendThreadFlushSignal );
}
@@ -1121,7 +1098,9 @@ void tcpiiu::installChannelPendingClaim ( nciu &chan )
bool tcpiiu::flushToWirePermit ()
{
if ( this->clientCtx ().currentThreadIsRecvProcessThread () ) {
this->lock ();
this->flushPending = true;
this->unlock ();
semBinaryGive ( this->sendThreadFlushSignal );
return false;
}
@@ -1130,4 +1109,3 @@ bool tcpiiu::flushToWirePermit ()
}
}