diff --git a/modules/database/src/ioc/db/dbChannel.h b/modules/database/src/ioc/db/dbChannel.h index 70e422405..d4cb9e9bc 100644 --- a/modules/database/src/ioc/db/dbChannel.h +++ b/modules/database/src/ioc/db/dbChannel.h @@ -49,15 +49,24 @@ typedef struct evSubscrip evSubscrip; struct evSubscrip { ELLNODE node; struct dbChannel * chan; + /* user_sub==NULL used to indicate db_cancel_event() */ EVENTFUNC * user_sub; void * user_arg; + /* associated queue, may be shared with other evSubscrip */ struct event_que * ev_que; + /* NULL if !npend. if npend!=0, pointer to last event added to event_que::valque */ db_field_log ** pLastLog; - unsigned long npend; /**< n times this event is on the queue */ - unsigned long nreplace; /**< n times replacing event on the queue */ + /* n times this event is on the queue */ + unsigned long npend; + /* n times replacing event on the queue */ + unsigned long nreplace; + /* DBE mask */ unsigned char select; + /* if set, subscription will yield dbfl_type_val */ char useValque; + /* event_task is handling this subscription */ char callBackInProgress; + /* this node added to dbCommon::mlis */ char enabled; }; #endif diff --git a/modules/database/src/ioc/db/dbEvent.c b/modules/database/src/ioc/db/dbEvent.c index 3016630fb..38ec9e701 100644 --- a/modules/database/src/ioc/db/dbEvent.c +++ b/modules/database/src/ioc/db/dbEvent.c @@ -76,7 +76,6 @@ struct event_que { unsigned short getix; unsigned short quota; /* the number of assigned entries*/ unsigned short nDuplicates; /* N events duplicated on this q */ - unsigned short nCanceled; /* the number of canceled entries */ unsigned possibleStall; }; @@ -92,7 +91,7 @@ struct event_user { void *extralabor_arg;/* parameter to above */ epicsThreadId taskid; /* event handler task id */ - struct evSubscrip *pSuicideEvent; /* event that is deleting itself */ + epicsUInt32 pflush_seq; /* worker cycle count for synchronization */ unsigned queovr; /* event que overflow count */ unsigned char pendexit; /* exit pend task */ unsigned char extra_labor; /* if set call extra labor func */ @@ -123,10 +122,9 @@ static void *dbevFieldLogFreeList; static char *EVENT_PEND_NAME = "eventTask"; -static struct evSubscrip canceledEvent; - static epicsMutexId stopSync; +/* unused space in queue (EVENTQUESIZE when empty) */ static unsigned short ringSpace ( const struct event_que *pevq ) { if ( pevq->evque[pevq->putix] == EVENTQEMPTY ) { @@ -140,17 +138,11 @@ static unsigned short ringSpace ( const struct event_que *pevq ) return 0; } -/* - * db_event_list () - */ int db_event_list ( const char *pname, unsigned level ) { return dbel ( pname, level ); } -/* - * dbel () - */ int dbel ( const char *pname, unsigned level ) { DBADDR addr; @@ -218,7 +210,6 @@ int dbel ( const char *pname, unsigned level ) if ( level > 2 ) { unsigned nDuplicates; - unsigned nCanceled; if ( pevent->nreplace ) { printf (", discarded by replacement=%ld", pevent->nreplace); } @@ -227,14 +218,10 @@ int dbel ( const char *pname, unsigned level ) } LOCKEVQUE(pevent->ev_que); nDuplicates = pevent->ev_que->nDuplicates; - nCanceled = pevent->ev_que->nCanceled; UNLOCKEVQUE(pevent->ev_que); if ( nDuplicates ) { printf (", duplicate count =%u\n", nDuplicates ); } - if ( nCanceled ) { - printf (", canceled count =%u\n", nCanceled ); - } } if ( level > 3 ) { @@ -331,7 +318,6 @@ dbEventCtx db_init_events (void) evUser->flowCtrlMode = FALSE; evUser->extraLaborBusy = FALSE; - evUser->pSuicideEvent = NULL; return (dbEventCtx) evUser; fail: if(evUser->lock) @@ -462,8 +448,7 @@ dbEventSubscription db_add_event ( while ( TRUE ) { int success = 0; LOCKEVQUE ( ev_que ); - success = ( ev_que->quota + ev_que->nCanceled < - EVENTQUESIZE - EVENTENTRIES ); + success = ( ev_que->quota < EVENTQUESIZE - EVENTENTRIES ); if ( success ) { ev_que->quota += EVENTENTRIES; } @@ -580,62 +565,52 @@ static void event_remove ( struct event_que *ev_que, void db_cancel_event (dbEventSubscription event) { struct evSubscrip * const pevent = (struct evSubscrip *) event; - unsigned short getix; + struct event_que *que = pevent->ev_que; + char sync = 0; db_event_disable ( event ); - /* - * flag the event as canceled by NULLing out the callback handler - * - * make certain that the event isn't being accessed while - * its call back changes - */ - LOCKEVQUE (pevent->ev_que); + LOCKEVQUE (que); - pevent->user_sub = NULL; + pevent->user_sub = NULL; /* callback pointer doubles as canceled flag */ - /* - * purge this event from the queue - * - * Its better to take this approach rather than waiting - * for the event thread to finish removing this event - * from the queue because the event thread will not - * process if we are in flow control mode. Since blocking - * here will block CA's TCP input queue then a dead lock - * would be possible. - */ - for ( getix = pevent->ev_que->getix; - pevent->ev_que->evque[getix] != EVENTQEMPTY; ) { - if ( pevent->ev_que->evque[getix] == pevent ) { - assert ( pevent->ev_que->nCanceled < USHRT_MAX ); - pevent->ev_que->nCanceled++; - event_remove ( pevent->ev_que, getix, &canceledEvent ); - } - getix = RNGINC ( getix ); - if ( getix == pevent->ev_que->getix ) { - break; - } - } - assert ( pevent->npend == 0u ); + if(pevent->callBackInProgress) { + /* this event callback is pending or in-progress in event_task. */ + if(pevent->ev_que->evUser->taskid != epicsThreadGetIdSelf()) + sync = 1; /* concurrent to event_task, so wait */ - if ( pevent->ev_que->evUser->taskid == epicsThreadGetIdSelf() ) { - pevent->ev_que->evUser->pSuicideEvent = pevent; - } - else { - while ( pevent->callBackInProgress ) { - UNLOCKEVQUE (pevent->ev_que); - epicsEventMustWait ( pevent->ev_que->evUser->pflush_sem ); - LOCKEVQUE (pevent->ev_que); - } + } else if(pevent->npend) { + /* some (now defunct) events in the queue, defer free() to event_task */ + + } else { + /* no other references, cleanup now */ + + pevent->ev_que->quota -= EVENTENTRIES; + freeListFree ( dbevEventSubscriptionFreeList, pevent ); } - pevent->ev_que->quota -= EVENTENTRIES; + UNLOCKEVQUE (que); - UNLOCKEVQUE (pevent->ev_que); - - freeListFree ( dbevEventSubscriptionFreeList, pevent ); - - return; + if(sync) { + /* wait for worker to cycle */ + struct event_user *evUser = que->evUser; + epicsUInt32 curSeq; + epicsMutexMustLock ( evUser->lock ); + /* grab current cycle counter, then wait for it to change */ + curSeq = evUser->pflush_seq; + do { + epicsMutexUnlock( evUser->lock ); + epicsEventMustWait(evUser->pflush_sem); + /* The complexity needed to track the # of waiters does not seem + * worth it for the relatively rare situation of concurrent cancel. + * So uncondtionally re-trigger. This will result in one spurious + * wakeup for each cancellation. + */ + epicsEventTrigger(evUser->pflush_sem); + epicsMutexMustLock ( evUser->lock ); + } while(curSeq == evUser->pflush_seq); + epicsMutexUnlock( evUser->lock ); + } } /* @@ -935,10 +910,7 @@ void db_post_single_event (dbEventSubscription event) */ static int event_read ( struct event_que *ev_que ) { - db_field_log *pfl; int notifiedRemaining = 0; - void ( *user_sub ) ( void *user_arg, struct dbChannel *chan, - int eventsRemaining, db_field_log *pfl ); /* * evUser ring buffer must be locked for the multiple @@ -959,19 +931,7 @@ static int event_read ( struct event_que *ev_que ) while ( ev_que->evque[ev_que->getix] != EVENTQEMPTY ) { struct evSubscrip *pevent = ev_que->evque[ev_que->getix]; int eventsRemaining; - - pfl = ev_que->valque[ev_que->getix]; - if ( pevent == &canceledEvent ) { - ev_que->evque[ev_que->getix] = EVENTQEMPTY; - if (ev_que->valque[ev_que->getix]) { - db_delete_field_log(ev_que->valque[ev_que->getix]); - ev_que->valque[ev_que->getix] = NULL; - } - ev_que->getix = RNGINC ( ev_que->getix ); - assert ( ev_que->nCanceled > 0 ); - ev_que->nCanceled--; - continue; - } + db_field_log *pfl = ev_que->valque[ev_que->getix]; /* * Simple type values queued up for reliable interprocess @@ -981,13 +941,7 @@ static int event_read ( struct event_que *ev_que ) event_remove ( ev_que, ev_que->getix, EVENTQEMPTY ); ev_que->getix = RNGINC ( ev_que->getix ); - eventsRemaining = ev_que->evque[ev_que->getix] != EVENTQEMPTY && !ev_que->nCanceled; - - /* - * create a local copy of the call back parameters while - * we still have the lock - */ - user_sub = pevent->user_sub; + eventsRemaining = ev_que->evque[ev_que->getix] != EVENTQEMPTY; /* * Next event pointer can be used by event tasks to determine @@ -999,14 +953,12 @@ static int event_read ( struct event_que *ev_que ) * record lock, and it is calling db_post_events() waiting * for the event queue lock (which this thread now has). */ - if ( user_sub ) { - /* - * This provides a way to test to see if an event is in use - * despite the fact that the event queue does not point to - * it. - */ + if ( pevent->user_sub ) { + EVENTFUNC* user_sub = pevent->user_sub; pevent->callBackInProgress = TRUE; + UNLOCKEVQUE (ev_que); + /* Run post-event-queue filter chain */ if (ellCount(&pevent->chan->post_chain)) { pfl = dbChannelRunPostChain(pevent->chan, pfl); @@ -1017,25 +969,15 @@ static int event_read ( struct event_que *ev_que ) eventsRemaining, pfl ); notifiedRemaining = eventsRemaining; } - LOCKEVQUE (ev_que); - /* concurrent db_cancel_event() may have free()'d pevent */ - /* - * check to see if this event has been canceled each - * time that the callBackInProgress flag is set to false - * while we have the event queue lock, and post the flush - * complete sem if there are no longer any events on the - * queue - */ - if ( ev_que->evUser->pSuicideEvent == pevent ) { - ev_que->evUser->pSuicideEvent = NULL; - } - else { - pevent->callBackInProgress = FALSE; - if ( pevent->user_sub==NULL && pevent->npend==0u ) { - epicsEventSignal ( ev_que->evUser->pflush_sem ); - } - } + LOCKEVQUE (ev_que); + + pevent->callBackInProgress = FALSE; + } + /* callback may have called db_cancel_event(), so must check user_sub again */ + if(!pevent->user_sub && !pevent->npend) { + pevent->ev_que->quota -= EVENTENTRIES; + freeListFree ( dbevEventSubscriptionFreeList, pevent ); } db_delete_field_log(pfl); } @@ -1050,9 +992,6 @@ static int event_read ( struct event_que *ev_que ) return DB_EVENT_OK; } -/* - * EVENT_TASK() - */ static void event_task (void *pParm) { struct event_user * const evUser = (struct event_user *) pParm; @@ -1069,6 +1008,7 @@ static void event_task (void *pParm) do { void (*pExtraLaborSub) (void *); void *pExtraLaborArg; + char wake; epicsEventMustWait(evUser->ppendsem); /* @@ -1093,15 +1033,20 @@ static void event_task (void *pParm) } evUser->extraLaborBusy = FALSE; - for ( ev_que = &evUser->firstque; ev_que; - ev_que = ev_que->nextque ) { + for ( ev_que = &evUser->firstque; ev_que; ev_que = ev_que->nextque ) { + /* unlock during iteration is safe as event_que will not be free'd */ epicsMutexUnlock ( evUser->lock ); event_read (ev_que); epicsMutexMustLock ( evUser->lock ); } pendexit = evUser->pendexit; + + evUser->pflush_seq++; + epicsMutexUnlock ( evUser->lock ); + epicsEventSignal(evUser->pflush_sem); + } while( ! pendexit ); epicsMutexDestroy(evUser->firstque.writelock);