dbEvent: handle multiple db_cancel_event()

Allow multiple db_cancel_event() calls (concurrent, or a
self-cancel from within a callback) prior to event_task wakeup.

In db_cancel_event(), free() immediately only if the subscription
is idle (neither queued nor in progress).  Otherwise, defer the
free() to the event task.  This avoids any need to immediately
expunge canceled events from the queue: the event task processes
canceled events as normal (except that no user_sub is called)
until npend==0.
Author:    Michael Davidsaver
Date:      2023-09-14 10:11:25 +02:00
Committer: Dirk Zimoch
Parent:    3fd79a21a2
Commit:    42dfca2b54

2 changed files with 73 additions and 119 deletions
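In outline, the new db_cancel_event() picks one of three paths under the queue lock. A minimal sketch of that decision (simplified: locking, the EPICS free-list, and the wakeup handshake are elided; the helper names here are illustrative, not from the patch):

    pevent->user_sub = NULL;              /* mark the subscription canceled */
    if (pevent->callBackInProgress) {
        /* event_task is dispatching this subscription right now; unless
         * this is a self-cancel from inside the callback, wait for the
         * worker to complete a cycle (see the pflush_seq handshake below) */
        if (!called_from_event_task())    /* illustrative helper */
            wait_for_worker_cycle();      /* illustrative helper */
    } else if (pevent->npend) {
        /* entries still queued: event_task free()s it once npend==0 */
    } else {
        /* idle: nothing queued, no callback running, free() immediately */
        free_subscription(pevent);        /* illustrative helper */
    }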

File 1 of 2:

@@ -49,15 +49,24 @@ typedef struct evSubscrip evSubscrip;
 struct evSubscrip {
     ELLNODE node;
     struct dbChannel * chan;
+    /* user_sub==NULL used to indicate db_cancel_event() */
     EVENTFUNC * user_sub;
     void * user_arg;
+    /* associated queue, may be shared with other evSubscrip */
     struct event_que * ev_que;
+    /* NULL if !npend. if npend!=0, pointer to last event added to event_que::valque */
     db_field_log ** pLastLog;
-    unsigned long npend; /**< n times this event is on the queue */
-    unsigned long nreplace; /**< n times replacing event on the queue */
+    /* n times this event is on the queue */
+    unsigned long npend;
+    /* n times replacing event on the queue */
+    unsigned long nreplace;
+    /* DBE mask */
     unsigned char select;
+    /* if set, subscription will yield dbfl_type_val */
     char useValque;
+    /* event_task is handling this subscription */
     char callBackInProgress;
+    /* this node added to dbCommon::mlis */
     char enabled;
 };
 #endif
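Taken together, these flags define when a subscription can be reclaimed. A hypothetical helper (not part of the commit) restating the rule that db_cancel_event() and event_read() apply in the second file below; the caller would need to hold the queue lock:

    static int evSubscripIsIdle(const struct evSubscrip *p)
    {
        return p->user_sub == NULL      /* canceled */
            && !p->callBackInProgress   /* not being dispatched by event_task */
            && p->npend == 0;           /* no entries left on the queue */
    }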

File 2 of 2:

@@ -76,7 +76,6 @@ struct event_que {
     unsigned short getix;
     unsigned short quota;          /* the number of assigned entries*/
     unsigned short nDuplicates;    /* N events duplicated on this q */
-    unsigned short nCanceled;      /* the number of canceled entries */
     unsigned possibleStall;
 };
@@ -92,7 +91,7 @@ struct event_user {
     void *extralabor_arg;    /* parameter to above */
     epicsThreadId taskid;    /* event handler task id */
-    struct evSubscrip *pSuicideEvent; /* event that is deleting itself */
+    epicsUInt32 pflush_seq;  /* worker cycle count for synchronization */
     unsigned queovr;         /* event que overflow count */
     unsigned char pendexit;  /* exit pend task */
     unsigned char extra_labor; /* if set call extra labor func */
@@ -123,10 +122,9 @@ static void *dbevFieldLogFreeList;
 static char *EVENT_PEND_NAME = "eventTask";
-static struct evSubscrip canceledEvent;
 static epicsMutexId stopSync;

 /* unused space in queue (EVENTQUESIZE when empty) */
 static unsigned short ringSpace ( const struct event_que *pevq )
 {
     if ( pevq->evque[pevq->putix] == EVENTQEMPTY ) {
@@ -140,17 +138,11 @@ static unsigned short ringSpace ( const struct event_que *pevq )
     return 0;
 }

-/*
- * db_event_list ()
- */
 int db_event_list ( const char *pname, unsigned level )
 {
     return dbel ( pname, level );
 }

-/*
- * dbel ()
- */
 int dbel ( const char *pname, unsigned level )
 {
     DBADDR addr;
@@ -218,7 +210,6 @@ int dbel ( const char *pname, unsigned level )
         if ( level > 2 ) {
             unsigned nDuplicates;
-            unsigned nCanceled;
             if ( pevent->nreplace ) {
                 printf (", discarded by replacement=%ld", pevent->nreplace);
             }
@@ -227,14 +218,10 @@ int dbel ( const char *pname, unsigned level )
             }
             LOCKEVQUE(pevent->ev_que);
             nDuplicates = pevent->ev_que->nDuplicates;
-            nCanceled = pevent->ev_que->nCanceled;
             UNLOCKEVQUE(pevent->ev_que);
             if ( nDuplicates ) {
                 printf (", duplicate count =%u\n", nDuplicates );
             }
-            if ( nCanceled ) {
-                printf (", canceled count =%u\n", nCanceled );
-            }
         }
         if ( level > 3 ) {
@@ -331,7 +318,6 @@ dbEventCtx db_init_events (void)
     evUser->flowCtrlMode = FALSE;
     evUser->extraLaborBusy = FALSE;
-    evUser->pSuicideEvent = NULL;
     return (dbEventCtx) evUser;
 fail:
     if(evUser->lock)
@@ -462,8 +448,7 @@ dbEventSubscription db_add_event (
     while ( TRUE ) {
         int success = 0;
         LOCKEVQUE ( ev_que );
-        success = ( ev_que->quota + ev_que->nCanceled <
-                    EVENTQUESIZE - EVENTENTRIES );
+        success = ( ev_que->quota < EVENTQUESIZE - EVENTENTRIES );
         if ( success ) {
             ev_que->quota += EVENTENTRIES;
         }
@@ -580,62 +565,52 @@ static void event_remove ( struct event_que *ev_que,
 void db_cancel_event (dbEventSubscription event)
 {
     struct evSubscrip * const pevent = (struct evSubscrip *) event;
-    unsigned short getix;
+    struct event_que *que = pevent->ev_que;
+    char sync = 0;

     db_event_disable ( event );

-    /*
-     * flag the event as canceled by NULLing out the callback handler
-     *
-     * make certain that the event isn't being accessed while
-     * its call back changes
-     */
-    LOCKEVQUE (pevent->ev_que);
+    LOCKEVQUE (que);
-    pevent->user_sub = NULL;
+    pevent->user_sub = NULL; /* callback pointer doubles as canceled flag */

-    /*
-     * purge this event from the queue
-     *
-     * Its better to take this approach rather than waiting
-     * for the event thread to finish removing this event
-     * from the queue because the event thread will not
-     * process if we are in flow control mode. Since blocking
-     * here will block CA's TCP input queue then a dead lock
-     * would be possible.
-     */
-    for ( getix = pevent->ev_que->getix;
-            pevent->ev_que->evque[getix] != EVENTQEMPTY; ) {
-        if ( pevent->ev_que->evque[getix] == pevent ) {
-            assert ( pevent->ev_que->nCanceled < USHRT_MAX );
-            pevent->ev_que->nCanceled++;
-            event_remove ( pevent->ev_que, getix, &canceledEvent );
-        }
-        getix = RNGINC ( getix );
-        if ( getix == pevent->ev_que->getix ) {
-            break;
-        }
-    }
-    assert ( pevent->npend == 0u );

     if(pevent->callBackInProgress) {
+        /* this event callback is pending or in-progress in event_task. */
+        if(pevent->ev_que->evUser->taskid != epicsThreadGetIdSelf())
+            sync = 1; /* concurrent to event_task, so wait */
-        if ( pevent->ev_que->evUser->taskid == epicsThreadGetIdSelf() ) {
-            pevent->ev_que->evUser->pSuicideEvent = pevent;
-        }
-        else {
-            while ( pevent->callBackInProgress ) {
-                UNLOCKEVQUE (pevent->ev_que);
-                epicsEventMustWait ( pevent->ev_que->evUser->pflush_sem );
-                LOCKEVQUE (pevent->ev_que);
-            }
-        }
+    } else if(pevent->npend) {
+        /* some (now defunct) events in the queue, defer free() to event_task */
+    } else {
+        /* no other references, cleanup now */
+        pevent->ev_que->quota -= EVENTENTRIES;
+        freeListFree ( dbevEventSubscriptionFreeList, pevent );
     }
-    pevent->ev_que->quota -= EVENTENTRIES;
+    UNLOCKEVQUE (que);
-    UNLOCKEVQUE (pevent->ev_que);
-    freeListFree ( dbevEventSubscriptionFreeList, pevent );
-    return;

+    if(sync) {
+        /* wait for worker to cycle */
+        struct event_user *evUser = que->evUser;
+        epicsUInt32 curSeq;
+        epicsMutexMustLock ( evUser->lock );
+        /* grab current cycle counter, then wait for it to change */
+        curSeq = evUser->pflush_seq;
+        do {
+            epicsMutexUnlock( evUser->lock );
+            epicsEventMustWait(evUser->pflush_sem);
+            /* The complexity needed to track the # of waiters does not seem
+             * worth it for the relatively rare situation of concurrent cancel.
+             * So unconditionally re-trigger. This will result in one spurious
+             * wakeup for each cancellation.
+             */
+            epicsEventTrigger(evUser->pflush_sem);
+            epicsMutexMustLock ( evUser->lock );
+        } while(curSeq == evUser->pflush_seq);
+        epicsMutexUnlock( evUser->lock );
+    }
 }

 /*
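The sync path above is a sequence-counter handshake: the canceling thread snapshots pflush_seq under evUser->lock, then sleeps on pflush_sem until the worker has finished a full cycle (event_task increments the counter and signals the semaphore once per pass, as shown in the final hunk). The same pattern, sketched standalone with POSIX threads instead of the EPICS primitives (all names here are illustrative, not from the commit):

    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cycled = PTHREAD_COND_INITIALIZER;
    static unsigned long   cycle_seq;   /* completed worker cycles */

    /* worker: one pass over the work queues, then announce the cycle */
    static void worker_cycle(void)
    {
        /* ... drain queues ... */
        pthread_mutex_lock(&lock);
        cycle_seq++;
        pthread_cond_broadcast(&cycled);
        pthread_mutex_unlock(&lock);
    }

    /* canceller: block until at least one full cycle has completed */
    static void wait_for_cycle(void)
    {
        pthread_mutex_lock(&lock);
        unsigned long seen = cycle_seq;
        while (cycle_seq == seen)
            pthread_cond_wait(&cycled, &lock);
        pthread_mutex_unlock(&lock);
    }

A condition variable broadcast wakes every waiter at once; a binary semaphore like pflush_sem offers no equivalent, which is why the patch unconditionally re-triggers the semaphore and accepts one spurious wakeup per cancellation.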
@@ -935,10 +910,7 @@ void db_post_single_event (dbEventSubscription event)
  */
 static int event_read ( struct event_que *ev_que )
 {
-    db_field_log *pfl;
     int notifiedRemaining = 0;
-    void ( *user_sub ) ( void *user_arg, struct dbChannel *chan,
-        int eventsRemaining, db_field_log *pfl );

     /*
      * evUser ring buffer must be locked for the multiple
@@ -959,19 +931,7 @@ static int event_read ( struct event_que *ev_que )
     while ( ev_que->evque[ev_que->getix] != EVENTQEMPTY ) {
         struct evSubscrip *pevent = ev_que->evque[ev_que->getix];
         int eventsRemaining;
-        pfl = ev_que->valque[ev_que->getix];
-        if ( pevent == &canceledEvent ) {
-            ev_que->evque[ev_que->getix] = EVENTQEMPTY;
-            if (ev_que->valque[ev_que->getix]) {
-                db_delete_field_log(ev_que->valque[ev_que->getix]);
-                ev_que->valque[ev_que->getix] = NULL;
-            }
-            ev_que->getix = RNGINC ( ev_que->getix );
-            assert ( ev_que->nCanceled > 0 );
-            ev_que->nCanceled--;
-            continue;
-        }
+        db_field_log *pfl = ev_que->valque[ev_que->getix];

         /*
          * Simple type values queued up for reliable interprocess
@@ -981,13 +941,7 @@ static int event_read ( struct event_que *ev_que )
         event_remove ( ev_que, ev_que->getix, EVENTQEMPTY );
         ev_que->getix = RNGINC ( ev_que->getix );
-        eventsRemaining = ev_que->evque[ev_que->getix] != EVENTQEMPTY && !ev_que->nCanceled;
-        /*
-         * create a local copy of the call back parameters while
-         * we still have the lock
-         */
-        user_sub = pevent->user_sub;
+        eventsRemaining = ev_que->evque[ev_que->getix] != EVENTQEMPTY;

         /*
          * Next event pointer can be used by event tasks to determine
@@ -999,14 +953,12 @@ static int event_read ( struct event_que *ev_que )
          * record lock, and it is calling db_post_events() waiting
          * for the event queue lock (which this thread now has).
          */
-        if ( user_sub ) {
-            /*
-             * This provides a way to test to see if an event is in use
-             * despite the fact that the event queue does not point to
-             * it.
-             */
+        if ( pevent->user_sub ) {
+            EVENTFUNC* user_sub = pevent->user_sub;
             pevent->callBackInProgress = TRUE;
             UNLOCKEVQUE (ev_que);

             /* Run post-event-queue filter chain */
             if (ellCount(&pevent->chan->post_chain)) {
                 pfl = dbChannelRunPostChain(pevent->chan, pfl);
@@ -1017,25 +969,15 @@ static int event_read ( struct event_que *ev_que )
                     eventsRemaining, pfl );
                 notifiedRemaining = eventsRemaining;
             }

-            LOCKEVQUE (ev_que);
+            /* concurrent db_cancel_event() may have free()'d pevent */
-            /*
-             * check to see if this event has been canceled each
-             * time that the callBackInProgress flag is set to false
-             * while we have the event queue lock, and post the flush
-             * complete sem if there are no longer any events on the
-             * queue
-             */
-            if ( ev_que->evUser->pSuicideEvent == pevent ) {
-                ev_que->evUser->pSuicideEvent = NULL;
-            }
-            else {
-                pevent->callBackInProgress = FALSE;
-                if ( pevent->user_sub==NULL && pevent->npend==0u ) {
-                    epicsEventSignal ( ev_que->evUser->pflush_sem );
-                }
-            }
+            LOCKEVQUE (ev_que);
+            pevent->callBackInProgress = FALSE;
         }

+        /* callback may have called db_cancel_event(), so must check user_sub again */
+        if(!pevent->user_sub && !pevent->npend) {
+            pevent->ev_que->quota -= EVENTENTRIES;
+            freeListFree ( dbevEventSubscriptionFreeList, pevent );
+        }

         db_delete_field_log(pfl);
     }
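Note how ownership is transferred: exactly one side performs the free(). db_cancel_event() frees only the fully idle case; in every other case event_read() reaches this re-check with the queue lock held after clearing callBackInProgress, and frees the subscription once user_sub is NULL and the last queued entry has been consumed (npend==0).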
@@ -1050,9 +992,6 @@ static int event_read ( struct event_que *ev_que )
     return DB_EVENT_OK;
 }

-/*
- * EVENT_TASK()
- */
 static void event_task (void *pParm)
 {
     struct event_user * const evUser = (struct event_user *) pParm;
@@ -1069,6 +1008,7 @@ static void event_task (void *pParm)
     do {
         void (*pExtraLaborSub) (void *);
         void *pExtraLaborArg;
+        char wake;

         epicsEventMustWait(evUser->ppendsem);

         /*
@@ -1093,15 +1033,20 @@ static void event_task (void *pParm)
         }
         evUser->extraLaborBusy = FALSE;

-        for ( ev_que = &evUser->firstque; ev_que;
-                ev_que = ev_que->nextque ) {
+        for ( ev_que = &evUser->firstque; ev_que; ev_que = ev_que->nextque ) {
+            /* unlock during iteration is safe as event_que will not be free'd */
+            epicsMutexUnlock ( evUser->lock );
             event_read (ev_que);
+            epicsMutexMustLock ( evUser->lock );
         }

         pendexit = evUser->pendexit;
+        evUser->pflush_seq++;
         epicsMutexUnlock ( evUser->lock );
+        epicsEventSignal(evUser->pflush_sem);
     } while( ! pendexit );

     epicsMutexDestroy(evUser->firstque.writelock);