From ab52e91d66ac5f64678eed81cc2b32774322c38e Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Tue, 19 Jun 2001 20:05:44 +0000 Subject: [PATCH] IO deletes must wait for callback completion --- src/ca/cac.cpp | 163 ++++++++++++++++++++++++++++--------------------- src/ca/cac.h | 1 + 2 files changed, 96 insertions(+), 68 deletions(-) diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 142bc80b0..e4b468bf0 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -927,9 +927,23 @@ cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type, } } +// lock must be applied +bool cac::blockForIOCallbackCompletion ( const cacChannel::ioid & id ) +{ + while ( this->ioInProgress && this->ioNotifyInProgressId == id ) { + assert ( this->threadsBlockingOnNotifyCompletion < UINT_MAX ); + this->threadsBlockingOnNotifyCompletion++; + epicsAutoMutexRelease autoRelease ( this->mutex ); + this->notifyCompletionEvent.wait ( 0.5 ); + assert ( this->threadsBlockingOnNotifyCompletion > 0u ); + this->threadsBlockingOnNotifyCompletion--; + } + return this->threadsBlockingOnNotifyCompletion > 0u; +} + void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) { - bool signalNeeded; + bool signalNeeded = false; { epicsAutoMutex autoMutex ( this->mutex ); baseNMIU * pmiu = this->ioTable.remove ( id ); @@ -939,17 +953,9 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) if ( pSubscr ) { chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); } + signalNeeded = this->blockForIOCallbackCompletion ( id ); pmiu->destroy ( *this ); } - assert ( this->threadsBlockingOnNotifyCompletion < UINT_MAX ); - this->threadsBlockingOnNotifyCompletion++; - while ( this->ioInProgress && this->ioNotifyInProgressId == id ) { - epicsAutoMutexRelease autoRelease ( this->mutex ); - this->notifyCompletionEvent.wait ( 0.5 ); - } - assert ( this->threadsBlockingOnNotifyCompletion > 0u ); - this->threadsBlockingOnNotifyCompletion--; - signalNeeded = this->threadsBlockingOnNotifyCompletion > 0u; } if ( signalNeeded ) { this->notifyCompletionEvent.signal (); @@ -980,7 +986,7 @@ void cac::ioCompletionNotify ( unsigned id, unsigned type, pmiu->completion ( type, count, pData ); } // threads blocked canceling this IO will wait - // until we stop processing this IO + // until we stop processing it this->ioInProgress = false; if ( this->threadsBlockingOnNotifyCompletion ) { this->notifyCompletionEvent.signal (); @@ -1001,7 +1007,7 @@ void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext ) pmiu->exception ( status, pContext ); } // threads blocked canceling this IO will wait - // until we stop processing this IO + // until we stop processing it this->ioInProgress = false; if ( this->threadsBlockingOnNotifyCompletion ) { this->notifyCompletionEvent.signal (); @@ -1023,7 +1029,7 @@ void cac::ioExceptionNotify ( unsigned id, int status, pmiu->exception ( status, pContext, type, count ); } // threads blocked canceling this IO will wait - // until we stop processing this IO + // until we stop processing it this->ioInProgress = false; if ( this->threadsBlockingOnNotifyCompletion ) { this->notifyCompletionEvent.signal (); @@ -1046,7 +1052,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id ) } pmiu->destroy ( *this ); // threads blocked canceling this IO will wait - // until we stop processing this IO + // until we stop processing it this->ioInProgress = false; if ( this->threadsBlockingOnNotifyCompletion ) { this->notifyCompletionEvent.signal (); @@ -1070,7 +1076,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id, } pmiu->destroy ( *this ); // threads blocked canceling this IO will wait - // until we stop processing this IO + // until we stop processing it this->ioInProgress = false; if ( this->threadsBlockingOnNotifyCompletion ) { this->notifyCompletionEvent.signal (); @@ -1094,7 +1100,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, } pmiu->destroy ( *this ); // threads blocked canceling this IO will wait - // until we stop processing this IO + // until we stop processing it this->ioInProgress = false; if ( this->threadsBlockingOnNotifyCompletion ) { this->notifyCompletionEvent.signal (); @@ -1118,7 +1124,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, } pmiu->destroy ( *this ); // threads blocked canceling this IO will wait - // until we stop processing this IO + // until we stop processing it this->ioInProgress = false; if ( this->threadsBlockingOnNotifyCompletion ) { this->notifyCompletionEvent.signal (); @@ -1128,76 +1134,97 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, // resubscribe for monitors from this channel void cac::connectAllIO ( nciu &chan ) { - tsDLList < baseNMIU > tmpList; - epicsAutoMutex autoMutex ( this->mutex ); - tsDLIterBD < baseNMIU > pNetIO = - chan.cacPrivateListOfIO::eventq.firstIter (); - while ( pNetIO.valid () ) { - tsDLIterBD < baseNMIU > next = pNetIO; - next++; - class netSubscription *pSubscr = pNetIO->isSubscription (); - if ( pSubscr ) { - try { - chan.getPIIU()->subscriptionRequest ( chan, *pSubscr ); + bool signalNeeded; + { + tsDLList < baseNMIU > tmpList; + epicsAutoMutex autoMutex ( this->mutex ); + tsDLIterBD < baseNMIU > pNetIO = + chan.cacPrivateListOfIO::eventq.firstIter (); + while ( pNetIO.valid () ) { + tsDLIterBD < baseNMIU > next = pNetIO; + next++; + class netSubscription *pSubscr = pNetIO->isSubscription (); + if ( pSubscr ) { + try { + chan.getPIIU()->subscriptionRequest ( chan, *pSubscr ); + } + catch ( ... ) { + this->printf ( "cac: invalid subscription request ignored\n" ); + } } - catch ( ... ) { - this->printf ( "cac: invalid subscription request ignored\n" ); + else { + // it shouldnt be here at this point - so uninstall it + this->ioTable.remove ( *pNetIO ); + chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); + tmpList.add ( *pNetIO ); } + pNetIO = next; } - else { - // it shouldnt be here at this point - so uninstall it - this->ioTable.remove ( *pNetIO ); - chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); - tmpList.add ( *pNetIO ); + chan.getPIIU()->flushRequest (); + while ( baseNMIU *pIO = tmpList.get () ) { + signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() ); + pIO->destroy ( *this ); } - pNetIO = next; } - chan.getPIIU()->flushRequest (); - while ( baseNMIU *pIO = tmpList.get () ) { - pIO->destroy ( *this ); + if ( signalNeeded ) { + this->notifyCompletionEvent.signal (); } } // cancel IO operations and monitor subscriptions void cac::disconnectAllIO ( nciu &chan ) { - epicsAutoMutex autoMutex ( this->mutex ); - tsDLList < baseNMIU > tmpList; - tsDLIterBD < baseNMIU > pNetIO = - chan.cacPrivateListOfIO::eventq.firstIter (); - while ( pNetIO.valid () ) { - tsDLIterBD < baseNMIU > next = pNetIO; - next++; - if ( ! pNetIO->isSubscription () ) { - // no use after disconnected - so uninstall it - this->ioTable.remove ( *pNetIO ); - chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); - tmpList.add ( *pNetIO ); + bool signalNeeded; + { + epicsAutoMutex autoMutex ( this->mutex ); + tsDLList < baseNMIU > tmpList; + tsDLIterBD < baseNMIU > pNetIO = + chan.cacPrivateListOfIO::eventq.firstIter (); + while ( pNetIO.valid () ) { + tsDLIterBD < baseNMIU > next = pNetIO; + next++; + if ( ! pNetIO->isSubscription () ) { + // no use after disconnected - so uninstall it + this->ioTable.remove ( *pNetIO ); + chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); + tmpList.add ( *pNetIO ); + } + pNetIO = next; + } + while ( baseNMIU *pIO = tmpList.get () ) { + char buf[128]; + sprintf ( buf, "host = %100s", chan.pHostName() ); + { + epicsAutoMutexRelease unlocker ( this->mutex ); + pIO->exception ( ECA_DISCONN, buf ); + } + signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() ); + pIO->destroy ( *this ); } - pNetIO = next; } - while ( baseNMIU *pIO = tmpList.get () ) { - char buf[128]; - sprintf ( buf, "host = %100s", chan.pHostName() ); - { - epicsAutoMutexRelease unlocker ( this->mutex ); - pIO->exception ( ECA_DISCONN, buf ); - } - pIO->destroy ( *this ); + if ( signalNeeded ) { + this->notifyCompletionEvent.signal (); } } void cac::destroyAllIO ( nciu &chan ) { - epicsAutoMutex autoMutex ( this->mutex ); - while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) { - this->ioTable.remove ( *pIO ); - this->flushIfRequired ( chan ); - class netSubscription *pSubscr = pIO->isSubscription (); - if ( pSubscr ) { - chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); + bool signalNeeded; + { + epicsAutoMutex autoMutex ( this->mutex ); + while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) { + this->ioTable.remove ( *pIO ); + this->flushIfRequired ( chan ); + class netSubscription *pSubscr = pIO->isSubscription (); + if ( pSubscr ) { + chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr ); + } + signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() ); + pIO->destroy ( *this ); } - pIO->destroy ( *this ); + } + if ( signalNeeded ) { + this->notifyCompletionEvent.signal (); } } diff --git a/src/ca/cac.h b/src/ca/cac.h index 4adc9f8af..90bb8eac2 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -246,6 +246,7 @@ private: int status, const char *pContext ); void ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext, unsigned type, arrayElementCount count ); + bool blockForIOCallbackCompletion ( const cacChannel::ioid & id ); // recv protocol stubs bool noopAction ( tcpiiu &, const caHdrLargeArray &, void *pMsgBdy );