many changes

This commit is contained in:
Jeff Hill
2004-06-17 23:49:21 +00:00
parent e5ba349551
commit 288e8abe90

View File

@@ -181,15 +181,23 @@ void tcpSendThread::run ()
}
this->iiu.sendDog.cancel ();
// wakeup user threads blocking for send backlog to be reduced
// and wait for them to stop using this IIU
this->iiu.flushBlockEvent.signal ();
while ( this->iiu.blockingForFlush ) {
epicsThreadSleep ( 0.1 );
}
this->iiu.recvDog.cancel ();
this->iiu.recvThread.exitWait ();
// user threads blocking for send backlog to be reduced
// will abort their attempt to get space if
// the state of the tcpiiu changes from connected to a
// disconnecting state. Nevertheless, we need to wait
// for them to finish prior to destroying the IIU.
{
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
while ( this->iiu.blockingForFlush ) {
epicsGuardRelease < epicsMutex > unguard ( guard );
epicsThreadSleep ( 0.1 );
}
}
this->thread.exitWaitRelease ();
this->iiu.cacRef.destroyIIU ( this->iiu );
@@ -288,6 +296,19 @@ void tcpiiu::recvBytes (
// bad file descriptor
if ( this->state != iiucs_connected &&
this->state != iiucs_clean_shutdown ) {
// the replacable printf handler isnt called here
// because it reqires a callback lock which probably
// isnt appropriate here
char name[64];
this->hostNameCacheInstance.hostName (
name, sizeof ( name ) );
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf (
"Unexpected problem with CA circuit to"
" server \"%s\" was \"%s\" - disconnecting\n",
name, sockErrBuf );
stat.bytesCopied = 0u;
stat.circuitState = swioLocalAbort;
return;
@@ -396,17 +417,6 @@ void tcpRecvThread::run ()
break;
}
else if ( stat.circuitState == swioLocalAbort ) {
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
char name[64];
this->iiu.hostName ( guard, name, sizeof ( name ) );
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
this->iiu.printf ( mgr.cbGuard,
"Unexpected problem with CA circuit to",
" server \"%s\" was \"%s\" - disconnecting\n",
name, sockErrBuf );
break;
}
else {
@@ -472,16 +482,6 @@ void tcpRecvThread::run ()
this->iiu.initiateAbortShutdown ( mgr, guard );
}
else if ( stat.circuitState == swioLocalAbort ) {
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
char name[64];
this->iiu.hostName ( guard, name, sizeof ( name ) );
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
this->iiu.printf ( mgr.cbGuard,
"Unexpected problem with CA circuit to",
" server \"%s\" was \"%s\" - disconnecting\n",
name, sockErrBuf );
break;
}
else {
@@ -746,6 +746,7 @@ void tcpiiu::initiateCleanShutdown (
if ( this->state == iiucs_connected ) {
this->state = iiucs_clean_shutdown;
this->sendThreadFlushEvent.signal ();
this->flushBlockEvent.signal ();
}
}
@@ -756,6 +757,7 @@ void tcpiiu::disconnectNotify ()
this->state = iiucs_disconnected;
}
this->sendThreadFlushEvent.signal ();
this->flushBlockEvent.signal ();
}
void tcpiiu::responsiveCircuitNotify (
@@ -807,6 +809,7 @@ void tcpiiu::unresponsiveCircuitNotify (
this->unresponsiveCircuit = true;
this->echoRequestPending = true;
this->sendThreadFlushEvent.signal ();
this->flushBlockEvent.signal ();
this->recvDog.cancel ();
this->sendDog.cancel ();
@@ -898,6 +901,7 @@ void tcpiiu::initiateAbortShutdown (
// wake up the send thread if it isnt blocking in send()
//
this->sendThreadFlushEvent.signal ();
this->flushBlockEvent.signal ();
}
// Disconnect all channels immediately from the timer thread
@@ -931,6 +935,8 @@ tcpiiu::~tcpiiu ()
{
this->sendThread.exitWait ();
this->recvThread.exitWait ();
this->sendDog.cancel ();
this->recvDog.cancel ();
if ( ! this->socketHasBeenClosed ) {
epicsSocketDestroy ( this->sock );
@@ -1290,7 +1296,11 @@ void tcpiiu::writeRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431
nciu &chan, unsigned type, arrayElementCount nElem, const void *pValue )
{
guard.assertIdenticalMutex ( this->mutex );
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
throw cacChannel::notConnected ();
}
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE,
type, nElem, chan.getSID(guard), chan.getCID(guard), pValue,
@@ -1308,6 +1318,11 @@ void tcpiiu::writeNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 43
if ( ! this->ca_v41_ok ( guard ) ) {
throw cacChannel::unsupportedByService();
}
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
throw cacChannel::notConnected ();
}
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE_NOTIFY,
type, nElem, chan.getSID(guard), io.getId(), pValue,
@@ -1320,7 +1335,11 @@ void tcpiiu::readNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431
unsigned dataType, arrayElementCount nElem )
{
guard.assertIdenticalMutex ( this->mutex );
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
throw cacChannel::notConnected ();
}
if ( INVALID_DB_REQ ( dataType ) ) {
throw cacChannel::badType ();
}
@@ -1351,6 +1370,11 @@ void tcpiiu::createChannelRequest (
{
guard.assertIdenticalMutex ( this->mutex );
if ( this->state != iiucs_connected &&
this->state != iiucs_connecting ) {
return;
}
const char *pName;
unsigned nameLength;
ca_uint32_t identity;
@@ -1394,7 +1418,11 @@ void tcpiiu::clearChannelRequest ( epicsGuard < epicsMutex > & guard, // X aCC 4
ca_uint32_t sid, ca_uint32_t cid )
{
guard.assertIdenticalMutex ( this->mutex );
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
return;
}
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.insertRequestHeader (
CA_PROTO_CLEAR_CHANNEL, 0u,
@@ -1412,8 +1440,10 @@ void tcpiiu::subscriptionRequest (
nciu & chan, netSubscription & subscr )
{
guard.assertIdenticalMutex ( this->mutex );
if ( ! chan.isConnected ( guard ) ) {
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected &&
this->state != iiucs_connecting ) {
return;
}
unsigned mask = subscr.getMask(guard);
@@ -1456,12 +1486,14 @@ void tcpiiu::subscriptionRequest (
// this routine return void because if this internally fails the best response
// is to try again the next time that we reconnect
//
void tcpiiu::subscriptionUpdateRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431
nciu & chan, netSubscription & subscr )
void tcpiiu::subscriptionUpdateRequest (
epicsGuard < epicsMutex > & guard, // X aCC 431
nciu & chan, netSubscription & subscr )
{
guard.assertIdenticalMutex ( this->mutex );
if ( ! chan.isConnected ( guard ) ) {
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
return;
}
arrayElementCount nElem = subscr.getCount ( guard );
@@ -1493,7 +1525,11 @@ void tcpiiu::subscriptionCancelRequest ( epicsGuard < epicsMutex > & guard, // X
nciu & chan, netSubscription & subscr )
{
guard.assertIdenticalMutex ( this->mutex );
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
return;
}
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.insertRequestHeader (
CA_PROTO_EVENT_CANCEL, 0u,
@@ -1554,6 +1590,13 @@ void tcpiiu::eliminateExcessiveSendBacklog (
{
mutualExclusionGuard.assertIdenticalMutex ( this->mutex );
// this is used to slow down applications that use too much
// send cache, but this is inappropriate when the circuit hasnt
// connected yet.
if ( this->state == iiucs_connecting ) {
return;
}
if ( this->sendQue.flushBlockThreshold ( 0u ) ) {
this->flushRequest ( mutualExclusionGuard );
// the process thread is not permitted to flush as this
@@ -1568,8 +1611,15 @@ void tcpiiu::eliminateExcessiveSendBacklog (
// pointer to this cac might become invalid
assert ( this->blockingForFlush < UINT_MAX );
this->blockingForFlush++;
while ( this->sendQue.flushBlockThreshold(0u) &&
this->state == iiucs_connected ) {
while ( this->sendQue.flushBlockThreshold(0u) ) {
// fail the users request if we have a disconnected
// or unresponsive circuit
if ( this->state != iiucs_connected ||
this->unresponsiveCircuit ) {
throw cacChannel::notConnected ();
}
epicsGuardRelease < epicsMutex > autoRelease (
mutualExclusionGuard );
if ( pCallbackGuard ) {