diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 51ac538bd..b6617acd0 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -126,13 +126,16 @@ void tcpiiu::connect () } } -/* - * cacSendThreadTCP () - */ +// +// cacSendThreadTCP () +// +// care is taken to not hold the lock while sending a message +// extern "C" void cacSendThreadTCP ( void *pParam ) { tcpiiu *piiu = ( tcpiiu * ) pParam; claimMsgCache cache ( CA_V44 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersionNumber ) ); + bool laborNeeded; while ( true ) { @@ -142,32 +145,61 @@ extern "C" void cacSendThreadTCP ( void *pParam ) break; } - if ( piiu->pClaimsPendingIIU->channelCount () > 0u ) { - piiu->pClaimsPendingIIU->sendPendingClaims ( *piiu, + piiu->lock (); + laborNeeded = piiu->claimsPending; + piiu->claimsPending = false; + piiu->unlock (); + if ( laborNeeded ) { + piiu->sendPendingClaims ( CA_V42 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersionNumber ), cache ); piiu->flushPending = true; } - if ( piiu->busyStateDetected != piiu->flowControlActive ) { + piiu->lock (); + laborNeeded = piiu->busyStateDetected != piiu->flowControlActive; + piiu->unlock (); + if ( laborNeeded ) { if ( piiu->flowControlActive ) { - piiu->enableFlowControlMsg (); + piiu->comQueSend::disableFlowControlRequest (); + piiu->flowControlActive = false; +# if defined ( DEBUG ) || 1 + printf ( "fc off\n" ); +# endif } else { - piiu->disableFlowControlMsg (); + piiu->comQueSend::enableFlowControlRequest (); + piiu->flowControlActive = true; +# if defined ( DEBUG ) || 1 + printf ( "fc on\n" ); +# endif } piiu->flushPending = true; } - if ( piiu->echoRequestPending ) { - piiu->echoRequestMsg (); + piiu->lock (); + laborNeeded = piiu->echoRequestPending; + piiu->echoRequestPending = false; + piiu->unlock (); + + if ( laborNeeded ) { + if ( CA_V43 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersionNumber ) ) { + piiu->comQueSend::echoRequest (); + } + else { + piiu->comQueSend::noopRequest (); + } piiu->flushPending = true; } - if ( piiu->flushPending ) { + piiu->lock (); + laborNeeded = piiu->flushPending; + piiu->flushPending = false; + piiu->unlock (); + + if ( laborNeeded ) { if ( ! piiu->flushToWire () ) { break; } - piiu->flushPending = false; } } @@ -269,6 +301,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) assert ( static_cast ( status ) <= nBytesInBuf ); totalBytes = static_cast ( status ); + this->lock (); if ( nBytesInBuf == totalBytes ) { if ( this->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) { this->busyStateDetected = true; @@ -281,6 +314,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf ) this->contigRecvMsgCount = 0u; this->busyStateDetected = false; } + this->unlock (); this->messageArrivalNotify (); // reschedule connection activity watchdog @@ -349,7 +383,8 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn, flowControlActive ( false ), recvMessagePending ( false ), flushPending ( false ), - msgHeaderAvailable ( false ) + msgHeaderAvailable ( false ), + claimsPending ( false ) { SOCKET newSocket; int status; @@ -363,10 +398,6 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn, return; } - // - // apparently this is nolonger necessary with modern IP kernels - // -#if 0 flag = TRUE; status = setsockopt ( newSocket, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag) ); @@ -374,7 +405,7 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn, ca_printf ("CAC: problems setting socket option TCP_NODELAY = \"%s\"\n", SOCKERRSTR (SOCKERRNO)); } -#endif + flag = TRUE; status = setsockopt ( newSocket , SOL_SOCKET, SO_KEEPALIVE, (char *)&flag, sizeof (flag) ); @@ -438,21 +469,11 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn, return; } - this->pClaimsPendingIIU = new claimsPendingIIU ( *this ); - if ( ! this->pClaimsPendingIIU ) { - ca_printf ("CA: unable to create claimsPendingIIU object\n"); - semBinaryDestroy (this->sendThreadExitSignal); - socket_close ( newSocket ); - this->fullyConstructedFlag = false; - return; - } - this->sendThreadFlushSignal = semBinaryCreate ( semEmpty ); if ( ! this->sendThreadFlushSignal ) { ca_printf ("CA: unable to create sendThreadFlushSignal object\n"); semBinaryDestroy (this->sendThreadExitSignal); socket_close ( newSocket ); - delete this->pClaimsPendingIIU; this->fullyConstructedFlag = false; return; } @@ -463,7 +484,6 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn, semBinaryDestroy (this->sendThreadExitSignal); semBinaryDestroy (this->sendThreadFlushSignal); socket_close ( newSocket ); - delete this->pClaimsPendingIIU; this->fullyConstructedFlag = false; return; } @@ -490,7 +510,6 @@ tcpiiu::tcpiiu ( cac &cac, const osiSockAddr &addrIn, semBinaryDestroy (this->recvThreadExitSignal); semBinaryDestroy (this->sendThreadExitSignal); socket_close ( newSocket ); - delete this->pClaimsPendingIIU; semBinaryDestroy (this->sendThreadFlushSignal); semBinaryDestroy (this->recvThreadRingBufferSpaceAvailableSignal); this->fullyConstructedFlag = false; @@ -556,6 +575,8 @@ tcpiiu::~tcpiiu () this->fullyConstructedFlag = false; + this->clientCtx ().removeIIU ( *this ); + if ( this->channelCount () ) { char hostNameTmp[64]; this->ipToA.hostName ( hostNameTmp, sizeof ( hostNameTmp ) ); @@ -564,11 +585,6 @@ tcpiiu::~tcpiiu () this->disconnectAllChan (); - if ( this->pClaimsPendingIIU ) { - this->pClaimsPendingIIU->disconnectAllChan (); - delete this->pClaimsPendingIIU; - } - this->cleanShutdown (); // wait for send and recv threads to exit @@ -580,8 +596,6 @@ tcpiiu::~tcpiiu () semBinaryDestroy ( this->sendThreadFlushSignal ); semBinaryDestroy ( this->recvThreadRingBufferSpaceAvailableSignal ); - this->clientCtx ().removeIIU ( *this ); - /* * free message body cache */ @@ -643,9 +657,6 @@ void tcpiiu::show ( unsigned level ) const printf ("\treceive message pending bool = %u\n", this->recvMessagePending ); printf ("\tflush pending bool = %u\n", this->flushPending ); printf ("\treceive message header available bool = %u\n", this->msgHeaderAvailable ); - if ( this->pClaimsPendingIIU ) { - this->pClaimsPendingIIU->show ( level - 3u ); - } this->bhe.show ( level - 3u ); } this->unlock (); @@ -653,49 +664,12 @@ void tcpiiu::show ( unsigned level ) const void tcpiiu::echoRequest () { + this->lock (); this->echoRequestPending = true; + this->unlock (); this->flush (); } -/* - * tcpiiu::echoRequestMsg () - */ -void tcpiiu::echoRequestMsg () -{ - if ( CA_V43 ( CA_PROTOCOL_VERSION, this->minorProtocolVersionNumber ) ) { - this->comQueSend::echoRequest (); - } - else { - this->comQueSend::noopRequest (); - } - - this->echoRequestPending = false; -} - -/* - * tcpiiu::enableFlowControlMsg () - */ -void tcpiiu::enableFlowControlMsg () -{ - this->comQueSend::enableFlowControlRequest (); - this->flowControlActive = true; -# if defined ( DEBUG ) || 1 - printf( "fc on\n" ); -# endif -} - -/* - * tcpiiu::disableFlowControlMsg () - */ -void tcpiiu::disableFlowControlMsg () -{ - this->comQueSend::disableFlowControlRequest (); - this->flowControlActive = false; -# if defined ( DEBUG ) || 1 - printf ( "fc off\n" ); -# endif -} - /* * tcpiiu::hostNameSetMsg () */ @@ -914,7 +888,7 @@ void tcpiiu::accessRightsRespAction () void tcpiiu::claimCIURespAction () { - this->clientCtx ().connectChannel ( this->curMsg.m_cid, *this, + this->clientCtx ().connectChannel ( this->ca_v44_ok (), this->curMsg.m_cid, this->curMsg.m_dataType, this->curMsg.m_count, this->curMsg.m_available ); } @@ -1108,8 +1082,11 @@ void tcpiiu::lastChannelDetachNotify () void tcpiiu::installChannelPendingClaim ( nciu &chan ) { - chan.attachChanToIIU ( *this->pClaimsPendingIIU ); - // wake up send thread which sends claim message + chan.attachChanToIIU ( *this ); + this->lock (); + this->claimsPending = true; + this->unlock (); + // wake up send thread which ultimately sends the claim message semBinaryGive ( this->sendThreadFlushSignal ); } @@ -1121,7 +1098,9 @@ void tcpiiu::installChannelPendingClaim ( nciu &chan ) bool tcpiiu::flushToWirePermit () { if ( this->clientCtx ().currentThreadIsRecvProcessThread () ) { + this->lock (); this->flushPending = true; + this->unlock (); semBinaryGive ( this->sendThreadFlushSignal ); return false; } @@ -1130,4 +1109,3 @@ bool tcpiiu::flushToWirePermit () } } -