From c90112a04ca4d167c7b4ca66d2e2e13fbcf4f7bc Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Mon, 27 Aug 2001 17:15:37 +0000 Subject: [PATCH] fixed lockup when deleting events in flow control mode --- src/db/dbEvent.c | 199 +++++++++++++++++++++++++---------------------- 1 file changed, 107 insertions(+), 92 deletions(-) diff --git a/src/db/dbEvent.c b/src/db/dbEvent.c index e18433bbb..0b5063c0b 100644 --- a/src/db/dbEvent.c +++ b/src/db/dbEvent.c @@ -46,8 +46,8 @@ of this distribution. #include "dbEvent.h" #define EVENTSPERQUE 32 -#define EVENTQUESIZE (EVENTENTRIES * EVENTSPERQUE) #define EVENTENTRIES 4 /* the number of que entries for each event */ +#define EVENTQUESIZE (EVENTENTRIES * EVENTSPERQUE) #define EVENTQEMPTY ((struct evSubscrip *)NULL) /* @@ -87,8 +87,8 @@ struct event_que { struct event_user { struct event_que firstque; /* the first event que */ - epicsEventId ppendsem; /* Wait while empty */ - epicsEventId pflush_sem; /* wait for flush */ + epicsEventId ppendsem; /* Wait while empty */ + epicsEventId pflush_sem; /* wait for flush */ OVRFFUNC *overflow_sub; /* called when overflow detect */ void *overflow_arg; /* parameter to above */ @@ -96,31 +96,21 @@ struct event_user { EXTRALABORFUNC *extralabor_sub;/* off load to event task */ void *extralabor_arg;/* parameter to above */ - epicsThreadId taskid; /* event handler task id */ + epicsThreadId taskid; /* event handler task id */ unsigned queovr; /* event que overflow count */ unsigned char pendexit; /* exit pend task */ unsigned char extra_labor; /* if set call extra labor func */ unsigned char flowCtrlMode; /* replace existing monitor */ void (*init_func)(); - epicsThreadId init_func_arg; + epicsThreadId init_func_arg; }; - -/* what to do with unrecoverable errors */ -#define abort(S) cantProceed ("dbEvent abort") - /* * Reliable intertask communication requires copying the current value of the * channel for later queing so 3 stepper motor steps of 10 each do not turn * into only 10 or 20 total steps part of the time. - * - * NOTE: locks on this data structure are optimized so a test and set call is - * made first. If the lock is allready set then the task pends on the lock - * pend sem. Test and set call is much faster than a semaphore. See - * LOCKEVUSER. */ - #define RNGINC(OLD)\ ((OLD)+1)>=EVENTQUESIZE ? 0 : ((OLD)+1) @@ -148,12 +138,16 @@ LOCAL void *dbevEventBlockFreeList; static char *EVENT_PEND_NAME = "eventTask"; +int DBEDEBUG = 0; + +static struct evSubscrip canceledEvent; + /* * db_event_list () */ -int epicsShareAPI db_event_list (const char *pname, unsigned level) +int epicsShareAPI db_event_list ( const char *pname, unsigned level ) { - return dbel (pname, level); + return dbel ( pname, level ); } /* @@ -380,7 +374,6 @@ dbEventSubscription epicsShareAPI db_add_event ( freeListFree (dbevEventQueueFreeList, tmp_que); return NULL; } - tmp_que->nDuplicates = 0u; ev_que->nextque = tmp_que; ev_que = tmp_que; break; @@ -450,6 +443,27 @@ void epicsShareAPI db_event_disable (dbEventSubscription es) UNLOCKREC(precord); } +/* + * event_remove() + * event queue lock _must_ be applied + */ +LOCAL void event_remove ( struct event_que *ev_que, + unsigned short index, struct evSubscrip *placeHolder ) +{ + struct evSubscrip *pEvent = ev_que->evque[index]; + + ev_que->evque[index] = placeHolder; + if ( pEvent->npend == 1u ) { + pEvent->pLastLog = NULL; + } + else { + assert ( pEvent->npend > 1u ); + assert ( ev_que->nDuplicates >= 1u ); + ev_que->nDuplicates--; + } + pEvent->npend--; +} + /* * DB_CANCEL_EVENT() * @@ -462,14 +476,15 @@ void epicsShareAPI db_event_disable (dbEventSubscription es) */ void epicsShareAPI db_cancel_event (dbEventSubscription es) { - struct evSubscrip *pevent = (struct evSubscrip *) es; - struct dbCommon *precord; + struct evSubscrip * pevent = ( struct evSubscrip * ) es; + struct dbCommon * precord; + unsigned short getix; - precord = (struct dbCommon *) pevent->paddr->precord; + precord = ( struct dbCommon * ) pevent->paddr->precord; - LOCKREC(precord); - ellDelete((ELLLIST*)&precord->mlis, &pevent->node); - UNLOCKREC(precord); + LOCKREC ( precord ); + ellDelete ( ( ELLLIST* ) & precord->mlis, &pevent->node ); + UNLOCKREC ( precord ); /* * flag the event as canceled by NULLing out the callback handler @@ -477,23 +492,40 @@ void epicsShareAPI db_cancel_event (dbEventSubscription es) * make certain that the event isnt being accessed while * its call back changes */ - LOCKEVQUE (pevent->ev_que) + LOCKEVQUE ( pevent->ev_que ) pevent->user_sub = NULL; - while (pevent->npend || pevent->callBackInProgress) { - UNLOCKEVQUE(pevent->ev_que) - epicsEventWaitWithTimeout(pevent->ev_que->evUser->pflush_sem, 1.0); - LOCKEVQUE(pevent->ev_que) - } /* - * Decrement event que quota + * purge this event from the queue + * + * Its better to take this approach rather than waiting + * for the event thread to finish removing this event + * from the queue because the event thread will not + * process if we are in flow control mode. Since blocking + * here will block CA's TCP input queue then a dead lock + * would be possible. */ + for ( getix = pevent->ev_que->getix; + pevent->ev_que->evque[getix] != EVENTQEMPTY; + getix = RNGINC ( getix ) ) { + if ( pevent->ev_que->evque[getix] == pevent ) { + event_remove ( pevent->ev_que, getix, &canceledEvent ); + } + } + assert ( pevent->npend == 0u ); + + while ( pevent->callBackInProgress ) { + UNLOCKEVQUE ( pevent->ev_que ) + epicsEventMustWait ( pevent->ev_que->evUser->pflush_sem ); + LOCKEVQUE ( pevent->ev_que ) + } + pevent->ev_que->quota -= EVENTENTRIES; - UNLOCKEVQUE (pevent->ev_que) + UNLOCKEVQUE ( pevent->ev_que ) - freeListFree (dbevEventBlockFreeList, pevent); + freeListFree ( dbevEventBlockFreeList, pevent ); return; } @@ -738,82 +770,64 @@ void epicsShareAPI db_post_single_event (dbEventSubscription es) /* * EVENT_READ() */ -LOCAL int event_read (struct event_que *ev_que) +LOCAL int event_read ( struct event_que *ev_que ) { - struct evSubscrip *event; - unsigned int nextgetix; db_field_log *pfl; - void (*user_sub) (void *user_arg, struct dbAddr *paddr, - int eventsRemaining, db_field_log *pfl); + void ( *user_sub ) ( void *user_arg, struct dbAddr *paddr, + int eventsRemaining, db_field_log *pfl ); + if ( DBEDEBUG > 0 && EVENTQUESIZE - RNGSPACE( ev_que ) ) { + printf ( "processing event queue %p with %u occupied entries\n", + ev_que, EVENTQUESIZE - RNGSPACE( ev_que ) ); + } + /* * evUser ring buffer must be locked for the multiple * threads writing/reading it */ - LOCKEVQUE(ev_que) + LOCKEVQUE ( ev_que ) /* * if in flow control mode drain duplicates and then * suspend processing events until flow control * mode is over */ - if (ev_que->evUser->flowCtrlMode && ev_que->nDuplicates==0u) { + if ( ev_que->evUser->flowCtrlMode && ev_que->nDuplicates == 0u ) { UNLOCKEVQUE(ev_que); - return DB_EVENT_OK; + if ( DBEDEBUG > 0 ) { + printf ( "ignoring queue due to flow control mode being active\n" ); + } + return DB_EVENT_OK; } - /* - * Fetch fast register copy - */ - for( event=ev_que->evque[ev_que->getix]; - (event) != EVENTQEMPTY; - ev_que->getix = nextgetix, event = ev_que->evque[nextgetix]){ - + while ( ev_que->evque[ev_que->getix] != EVENTQEMPTY ) { db_field_log fl = ev_que->valque[ev_que->getix]; + struct evSubscrip *event = ev_que->evque[ev_que->getix]; - /* - * So I can tell em if more are comming - */ - nextgetix = RNGINC(ev_que->getix); - + if ( event == &canceledEvent ) { + ev_que->evque[ev_que->getix] = EVENTQEMPTY; + ev_que->getix = RNGINC ( ev_que->getix ); + continue; + } /* * Simple type values queued up for reliable interprocess * communication. (for other types they get whatever happens * to be there upon wakeup) */ - if (event->valque) { + if ( event->valque ) { pfl = &fl; } else { pfl = NULL; } - ev_que->evque[ev_que->getix] = EVENTQEMPTY; + event_remove ( ev_que, ev_que->getix, EVENTQEMPTY ); + ev_que->getix = RNGINC ( ev_que->getix ); - /* - * remove event from the queue - */ - if (event->npend==1u) { - event->pLastLog = NULL; + if ( DBEDEBUG > 1 ) { + printf ( "processing event %p\n", (void *) event ); } - else { - assert (event->npend>1u); - assert (ev_que->nDuplicates>=1u); - ev_que->nDuplicates--; - } - - /* - * this provides a way to test to see if an event is in use - * despite the fact that the event queue does not point to this event - */ - event->callBackInProgress = TRUE; - - /* - * it is essential that the npend count is not lowered - * before the callBackInProgress flag is set - */ - event->npend--; /* * create a local copy of the call back parameters while @@ -831,19 +845,20 @@ LOCAL int event_read (struct event_que *ev_que) * record lock, and it is calling db_post_events() waiting * for the event queue lock (which this thread now has). */ - if (user_sub != NULL) { - UNLOCKEVQUE(ev_que) - (*user_sub) (event->user_arg, event->paddr, - ev_que->evque[nextgetix]?TRUE:FALSE, pfl); - LOCKEVQUE(ev_que) + if ( user_sub ) { + /* + * This provides a way to test to see if an event is in use + * despite the fact that the event queue does not point to + * it. + */ + event->callBackInProgress = TRUE; + UNLOCKEVQUE ( ev_que ) + ( *user_sub ) ( event->user_arg, event->paddr, + ev_que->evque[ev_que->getix] != EVENTQEMPTY, pfl ); + LOCKEVQUE ( ev_que ) + event->callBackInProgress = FALSE; } - /* - * this provides a way to test to see if an event is in use - * despite the fact that the event queue does not point to this event - */ - event->callBackInProgress = FALSE; - /* * check to see if this event has been canceled each * time that the callBackInProgress flag is set to false @@ -851,12 +866,12 @@ LOCAL int event_read (struct event_que *ev_que) * complete sem if there are no longer any events on the * queue */ - if (event->user_sub==NULL && event->npend==0u) { - epicsEventSignal (ev_que->evUser->pflush_sem); + if ( event->user_sub==NULL && event->npend==0u ) { + epicsEventSignal ( ev_que->evUser->pflush_sem ); } } - UNLOCKEVQUE(ev_que) + UNLOCKEVQUE ( ev_que ) return DB_EVENT_OK; } @@ -1003,6 +1018,6 @@ void epicsShareAPI db_event_flow_ctrl_mode_off (dbEventCtx ctx) #ifdef DEBUG printf("fc off %lu\n", tickGet()); #endif -} +}