IO deletes must wait for callback completion

This commit is contained in:
Jeff Hill
2001-06-19 20:05:44 +00:00
parent 843595ed42
commit ab52e91d66
2 changed files with 96 additions and 68 deletions
+95 -68
View File
@@ -927,9 +927,23 @@ cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type,
}
}
// lock must be applied
bool cac::blockForIOCallbackCompletion ( const cacChannel::ioid & id )
{
while ( this->ioInProgress && this->ioNotifyInProgressId == id ) {
assert ( this->threadsBlockingOnNotifyCompletion < UINT_MAX );
this->threadsBlockingOnNotifyCompletion++;
epicsAutoMutexRelease autoRelease ( this->mutex );
this->notifyCompletionEvent.wait ( 0.5 );
assert ( this->threadsBlockingOnNotifyCompletion > 0u );
this->threadsBlockingOnNotifyCompletion--;
}
return this->threadsBlockingOnNotifyCompletion > 0u;
}
void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
{
bool signalNeeded;
bool signalNeeded = false;
{
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
@@ -939,17 +953,9 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
if ( pSubscr ) {
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
}
signalNeeded = this->blockForIOCallbackCompletion ( id );
pmiu->destroy ( *this );
}
assert ( this->threadsBlockingOnNotifyCompletion < UINT_MAX );
this->threadsBlockingOnNotifyCompletion++;
while ( this->ioInProgress && this->ioNotifyInProgressId == id ) {
epicsAutoMutexRelease autoRelease ( this->mutex );
this->notifyCompletionEvent.wait ( 0.5 );
}
assert ( this->threadsBlockingOnNotifyCompletion > 0u );
this->threadsBlockingOnNotifyCompletion--;
signalNeeded = this->threadsBlockingOnNotifyCompletion > 0u;
}
if ( signalNeeded ) {
this->notifyCompletionEvent.signal ();
@@ -980,7 +986,7 @@ void cac::ioCompletionNotify ( unsigned id, unsigned type,
pmiu->completion ( type, count, pData );
}
// threads blocked canceling this IO will wait
// until we stop processing this IO
// until we stop processing it
this->ioInProgress = false;
if ( this->threadsBlockingOnNotifyCompletion ) {
this->notifyCompletionEvent.signal ();
@@ -1001,7 +1007,7 @@ void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext )
pmiu->exception ( status, pContext );
}
// threads blocked canceling this IO will wait
// until we stop processing this IO
// until we stop processing it
this->ioInProgress = false;
if ( this->threadsBlockingOnNotifyCompletion ) {
this->notifyCompletionEvent.signal ();
@@ -1023,7 +1029,7 @@ void cac::ioExceptionNotify ( unsigned id, int status,
pmiu->exception ( status, pContext, type, count );
}
// threads blocked canceling this IO will wait
// until we stop processing this IO
// until we stop processing it
this->ioInProgress = false;
if ( this->threadsBlockingOnNotifyCompletion ) {
this->notifyCompletionEvent.signal ();
@@ -1046,7 +1052,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id )
}
pmiu->destroy ( *this );
// threads blocked canceling this IO will wait
// until we stop processing this IO
// until we stop processing it
this->ioInProgress = false;
if ( this->threadsBlockingOnNotifyCompletion ) {
this->notifyCompletionEvent.signal ();
@@ -1070,7 +1076,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id,
}
pmiu->destroy ( *this );
// threads blocked canceling this IO will wait
// until we stop processing this IO
// until we stop processing it
this->ioInProgress = false;
if ( this->threadsBlockingOnNotifyCompletion ) {
this->notifyCompletionEvent.signal ();
@@ -1094,7 +1100,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
}
pmiu->destroy ( *this );
// threads blocked canceling this IO will wait
// until we stop processing this IO
// until we stop processing it
this->ioInProgress = false;
if ( this->threadsBlockingOnNotifyCompletion ) {
this->notifyCompletionEvent.signal ();
@@ -1118,7 +1124,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
}
pmiu->destroy ( *this );
// threads blocked canceling this IO will wait
// until we stop processing this IO
// until we stop processing it
this->ioInProgress = false;
if ( this->threadsBlockingOnNotifyCompletion ) {
this->notifyCompletionEvent.signal ();
@@ -1128,76 +1134,97 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
// resubscribe for monitors from this channel
void cac::connectAllIO ( nciu &chan )
{
tsDLList < baseNMIU > tmpList;
epicsAutoMutex autoMutex ( this->mutex );
tsDLIterBD < baseNMIU > pNetIO =
chan.cacPrivateListOfIO::eventq.firstIter ();
while ( pNetIO.valid () ) {
tsDLIterBD < baseNMIU > next = pNetIO;
next++;
class netSubscription *pSubscr = pNetIO->isSubscription ();
if ( pSubscr ) {
try {
chan.getPIIU()->subscriptionRequest ( chan, *pSubscr );
bool signalNeeded;
{
tsDLList < baseNMIU > tmpList;
epicsAutoMutex autoMutex ( this->mutex );
tsDLIterBD < baseNMIU > pNetIO =
chan.cacPrivateListOfIO::eventq.firstIter ();
while ( pNetIO.valid () ) {
tsDLIterBD < baseNMIU > next = pNetIO;
next++;
class netSubscription *pSubscr = pNetIO->isSubscription ();
if ( pSubscr ) {
try {
chan.getPIIU()->subscriptionRequest ( chan, *pSubscr );
}
catch ( ... ) {
this->printf ( "cac: invalid subscription request ignored\n" );
}
}
catch ( ... ) {
this->printf ( "cac: invalid subscription request ignored\n" );
else {
// it shouldnt be here at this point - so uninstall it
this->ioTable.remove ( *pNetIO );
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
tmpList.add ( *pNetIO );
}
pNetIO = next;
}
else {
// it shouldnt be here at this point - so uninstall it
this->ioTable.remove ( *pNetIO );
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
tmpList.add ( *pNetIO );
chan.getPIIU()->flushRequest ();
while ( baseNMIU *pIO = tmpList.get () ) {
signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() );
pIO->destroy ( *this );
}
pNetIO = next;
}
chan.getPIIU()->flushRequest ();
while ( baseNMIU *pIO = tmpList.get () ) {
pIO->destroy ( *this );
if ( signalNeeded ) {
this->notifyCompletionEvent.signal ();
}
}
// cancel IO operations and monitor subscriptions
void cac::disconnectAllIO ( nciu &chan )
{
epicsAutoMutex autoMutex ( this->mutex );
tsDLList < baseNMIU > tmpList;
tsDLIterBD < baseNMIU > pNetIO =
chan.cacPrivateListOfIO::eventq.firstIter ();
while ( pNetIO.valid () ) {
tsDLIterBD < baseNMIU > next = pNetIO;
next++;
if ( ! pNetIO->isSubscription () ) {
// no use after disconnected - so uninstall it
this->ioTable.remove ( *pNetIO );
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
tmpList.add ( *pNetIO );
bool signalNeeded;
{
epicsAutoMutex autoMutex ( this->mutex );
tsDLList < baseNMIU > tmpList;
tsDLIterBD < baseNMIU > pNetIO =
chan.cacPrivateListOfIO::eventq.firstIter ();
while ( pNetIO.valid () ) {
tsDLIterBD < baseNMIU > next = pNetIO;
next++;
if ( ! pNetIO->isSubscription () ) {
// no use after disconnected - so uninstall it
this->ioTable.remove ( *pNetIO );
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
tmpList.add ( *pNetIO );
}
pNetIO = next;
}
while ( baseNMIU *pIO = tmpList.get () ) {
char buf[128];
sprintf ( buf, "host = %100s", chan.pHostName() );
{
epicsAutoMutexRelease unlocker ( this->mutex );
pIO->exception ( ECA_DISCONN, buf );
}
signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() );
pIO->destroy ( *this );
}
pNetIO = next;
}
while ( baseNMIU *pIO = tmpList.get () ) {
char buf[128];
sprintf ( buf, "host = %100s", chan.pHostName() );
{
epicsAutoMutexRelease unlocker ( this->mutex );
pIO->exception ( ECA_DISCONN, buf );
}
pIO->destroy ( *this );
if ( signalNeeded ) {
this->notifyCompletionEvent.signal ();
}
}
void cac::destroyAllIO ( nciu &chan )
{
epicsAutoMutex autoMutex ( this->mutex );
while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) {
this->ioTable.remove ( *pIO );
this->flushIfRequired ( chan );
class netSubscription *pSubscr = pIO->isSubscription ();
if ( pSubscr ) {
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
bool signalNeeded;
{
epicsAutoMutex autoMutex ( this->mutex );
while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) {
this->ioTable.remove ( *pIO );
this->flushIfRequired ( chan );
class netSubscription *pSubscr = pIO->isSubscription ();
if ( pSubscr ) {
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
}
signalNeeded = this->blockForIOCallbackCompletion ( pIO->getID() );
pIO->destroy ( *this );
}
pIO->destroy ( *this );
}
if ( signalNeeded ) {
this->notifyCompletionEvent.signal ();
}
}
+1
View File
@@ -246,6 +246,7 @@ private:
int status, const char *pContext );
void ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext, unsigned type, arrayElementCount count );
bool blockForIOCallbackCompletion ( const cacChannel::ioid & id );
// recv protocol stubs
bool noopAction ( tcpiiu &, const caHdrLargeArray &, void *pMsgBdy );