fixed lockup when deleting events in flow control mode

This commit is contained in:
Jeff Hill
2001-08-27 17:15:37 +00:00
parent 9c892d128f
commit c90112a04c

View File

@@ -46,8 +46,8 @@ of this distribution.
#include "dbEvent.h"
#define EVENTSPERQUE 32
#define EVENTQUESIZE (EVENTENTRIES * EVENTSPERQUE)
#define EVENTENTRIES 4 /* the number of que entries for each event */
#define EVENTQUESIZE (EVENTENTRIES * EVENTSPERQUE)
#define EVENTQEMPTY ((struct evSubscrip *)NULL)
/*
@@ -87,8 +87,8 @@ struct event_que {
struct event_user {
struct event_que firstque; /* the first event que */
epicsEventId ppendsem; /* Wait while empty */
epicsEventId pflush_sem; /* wait for flush */
epicsEventId ppendsem; /* Wait while empty */
epicsEventId pflush_sem; /* wait for flush */
OVRFFUNC *overflow_sub; /* called when overflow detect */
void *overflow_arg; /* parameter to above */
@@ -96,31 +96,21 @@ struct event_user {
EXTRALABORFUNC *extralabor_sub;/* off load to event task */
void *extralabor_arg;/* parameter to above */
epicsThreadId taskid; /* event handler task id */
epicsThreadId taskid; /* event handler task id */
unsigned queovr; /* event que overflow count */
unsigned char pendexit; /* exit pend task */
unsigned char extra_labor; /* if set call extra labor func */
unsigned char flowCtrlMode; /* replace existing monitor */
void (*init_func)();
epicsThreadId init_func_arg;
epicsThreadId init_func_arg;
};
/* what to do with unrecoverable errors */
#define abort(S) cantProceed ("dbEvent abort")
/*
* Reliable intertask communication requires copying the current value of the
* channel for later queing so 3 stepper motor steps of 10 each do not turn
* into only 10 or 20 total steps part of the time.
*
* NOTE: locks on this data structure are optimized so a test and set call is
* made first. If the lock is allready set then the task pends on the lock
* pend sem. Test and set call is much faster than a semaphore. See
* LOCKEVUSER.
*/
#define RNGINC(OLD)\
((OLD)+1)>=EVENTQUESIZE ? 0 : ((OLD)+1)
@@ -148,12 +138,16 @@ LOCAL void *dbevEventBlockFreeList;
static char *EVENT_PEND_NAME = "eventTask";
int DBEDEBUG = 0;
static struct evSubscrip canceledEvent;
/*
* db_event_list ()
*/
int epicsShareAPI db_event_list (const char *pname, unsigned level)
int epicsShareAPI db_event_list ( const char *pname, unsigned level )
{
return dbel (pname, level);
return dbel ( pname, level );
}
/*
@@ -380,7 +374,6 @@ dbEventSubscription epicsShareAPI db_add_event (
freeListFree (dbevEventQueueFreeList, tmp_que);
return NULL;
}
tmp_que->nDuplicates = 0u;
ev_que->nextque = tmp_que;
ev_que = tmp_que;
break;
@@ -450,6 +443,27 @@ void epicsShareAPI db_event_disable (dbEventSubscription es)
UNLOCKREC(precord);
}
/*
* event_remove()
* event queue lock _must_ be applied
*/
LOCAL void event_remove ( struct event_que *ev_que,
unsigned short index, struct evSubscrip *placeHolder )
{
struct evSubscrip *pEvent = ev_que->evque[index];
ev_que->evque[index] = placeHolder;
if ( pEvent->npend == 1u ) {
pEvent->pLastLog = NULL;
}
else {
assert ( pEvent->npend > 1u );
assert ( ev_que->nDuplicates >= 1u );
ev_que->nDuplicates--;
}
pEvent->npend--;
}
/*
* DB_CANCEL_EVENT()
*
@@ -462,14 +476,15 @@ void epicsShareAPI db_event_disable (dbEventSubscription es)
*/
void epicsShareAPI db_cancel_event (dbEventSubscription es)
{
struct evSubscrip *pevent = (struct evSubscrip *) es;
struct dbCommon *precord;
struct evSubscrip * pevent = ( struct evSubscrip * ) es;
struct dbCommon * precord;
unsigned short getix;
precord = (struct dbCommon *) pevent->paddr->precord;
precord = ( struct dbCommon * ) pevent->paddr->precord;
LOCKREC(precord);
ellDelete((ELLLIST*)&precord->mlis, &pevent->node);
UNLOCKREC(precord);
LOCKREC ( precord );
ellDelete ( ( ELLLIST* ) & precord->mlis, &pevent->node );
UNLOCKREC ( precord );
/*
* flag the event as canceled by NULLing out the callback handler
@@ -477,23 +492,40 @@ void epicsShareAPI db_cancel_event (dbEventSubscription es)
* make certain that the event isnt being accessed while
* its call back changes
*/
LOCKEVQUE (pevent->ev_que)
LOCKEVQUE ( pevent->ev_que )
pevent->user_sub = NULL;
while (pevent->npend || pevent->callBackInProgress) {
UNLOCKEVQUE(pevent->ev_que)
epicsEventWaitWithTimeout(pevent->ev_que->evUser->pflush_sem, 1.0);
LOCKEVQUE(pevent->ev_que)
}
/*
* Decrement event que quota
* purge this event from the queue
*
* Its better to take this approach rather than waiting
* for the event thread to finish removing this event
* from the queue because the event thread will not
* process if we are in flow control mode. Since blocking
* here will block CA's TCP input queue then a dead lock
* would be possible.
*/
for ( getix = pevent->ev_que->getix;
pevent->ev_que->evque[getix] != EVENTQEMPTY;
getix = RNGINC ( getix ) ) {
if ( pevent->ev_que->evque[getix] == pevent ) {
event_remove ( pevent->ev_que, getix, &canceledEvent );
}
}
assert ( pevent->npend == 0u );
while ( pevent->callBackInProgress ) {
UNLOCKEVQUE ( pevent->ev_que )
epicsEventMustWait ( pevent->ev_que->evUser->pflush_sem );
LOCKEVQUE ( pevent->ev_que )
}
pevent->ev_que->quota -= EVENTENTRIES;
UNLOCKEVQUE (pevent->ev_que)
UNLOCKEVQUE ( pevent->ev_que )
freeListFree (dbevEventBlockFreeList, pevent);
freeListFree ( dbevEventBlockFreeList, pevent );
return;
}
@@ -738,82 +770,64 @@ void epicsShareAPI db_post_single_event (dbEventSubscription es)
/*
* EVENT_READ()
*/
LOCAL int event_read (struct event_que *ev_que)
LOCAL int event_read ( struct event_que *ev_que )
{
struct evSubscrip *event;
unsigned int nextgetix;
db_field_log *pfl;
void (*user_sub) (void *user_arg, struct dbAddr *paddr,
int eventsRemaining, db_field_log *pfl);
void ( *user_sub ) ( void *user_arg, struct dbAddr *paddr,
int eventsRemaining, db_field_log *pfl );
if ( DBEDEBUG > 0 && EVENTQUESIZE - RNGSPACE( ev_que ) ) {
printf ( "processing event queue %p with %u occupied entries\n",
ev_que, EVENTQUESIZE - RNGSPACE( ev_que ) );
}
/*
* evUser ring buffer must be locked for the multiple
* threads writing/reading it
*/
LOCKEVQUE(ev_que)
LOCKEVQUE ( ev_que )
/*
* if in flow control mode drain duplicates and then
* suspend processing events until flow control
* mode is over
*/
if (ev_que->evUser->flowCtrlMode && ev_que->nDuplicates==0u) {
if ( ev_que->evUser->flowCtrlMode && ev_que->nDuplicates == 0u ) {
UNLOCKEVQUE(ev_que);
return DB_EVENT_OK;
if ( DBEDEBUG > 0 ) {
printf ( "ignoring queue due to flow control mode being active\n" );
}
return DB_EVENT_OK;
}
/*
* Fetch fast register copy
*/
for( event=ev_que->evque[ev_que->getix];
(event) != EVENTQEMPTY;
ev_que->getix = nextgetix, event = ev_que->evque[nextgetix]){
while ( ev_que->evque[ev_que->getix] != EVENTQEMPTY ) {
db_field_log fl = ev_que->valque[ev_que->getix];
struct evSubscrip *event = ev_que->evque[ev_que->getix];
/*
* So I can tell em if more are comming
*/
nextgetix = RNGINC(ev_que->getix);
if ( event == &canceledEvent ) {
ev_que->evque[ev_que->getix] = EVENTQEMPTY;
ev_que->getix = RNGINC ( ev_que->getix );
continue;
}
/*
* Simple type values queued up for reliable interprocess
* communication. (for other types they get whatever happens
* to be there upon wakeup)
*/
if (event->valque) {
if ( event->valque ) {
pfl = &fl;
}
else {
pfl = NULL;
}
ev_que->evque[ev_que->getix] = EVENTQEMPTY;
event_remove ( ev_que, ev_que->getix, EVENTQEMPTY );
ev_que->getix = RNGINC ( ev_que->getix );
/*
* remove event from the queue
*/
if (event->npend==1u) {
event->pLastLog = NULL;
if ( DBEDEBUG > 1 ) {
printf ( "processing event %p\n", (void *) event );
}
else {
assert (event->npend>1u);
assert (ev_que->nDuplicates>=1u);
ev_que->nDuplicates--;
}
/*
* this provides a way to test to see if an event is in use
* despite the fact that the event queue does not point to this event
*/
event->callBackInProgress = TRUE;
/*
* it is essential that the npend count is not lowered
* before the callBackInProgress flag is set
*/
event->npend--;
/*
* create a local copy of the call back parameters while
@@ -831,19 +845,20 @@ LOCAL int event_read (struct event_que *ev_que)
* record lock, and it is calling db_post_events() waiting
* for the event queue lock (which this thread now has).
*/
if (user_sub != NULL) {
UNLOCKEVQUE(ev_que)
(*user_sub) (event->user_arg, event->paddr,
ev_que->evque[nextgetix]?TRUE:FALSE, pfl);
LOCKEVQUE(ev_que)
if ( user_sub ) {
/*
* This provides a way to test to see if an event is in use
* despite the fact that the event queue does not point to
* it.
*/
event->callBackInProgress = TRUE;
UNLOCKEVQUE ( ev_que )
( *user_sub ) ( event->user_arg, event->paddr,
ev_que->evque[ev_que->getix] != EVENTQEMPTY, pfl );
LOCKEVQUE ( ev_que )
event->callBackInProgress = FALSE;
}
/*
* this provides a way to test to see if an event is in use
* despite the fact that the event queue does not point to this event
*/
event->callBackInProgress = FALSE;
/*
* check to see if this event has been canceled each
* time that the callBackInProgress flag is set to false
@@ -851,12 +866,12 @@ LOCAL int event_read (struct event_que *ev_que)
* complete sem if there are no longer any events on the
* queue
*/
if (event->user_sub==NULL && event->npend==0u) {
epicsEventSignal (ev_que->evUser->pflush_sem);
if ( event->user_sub==NULL && event->npend==0u ) {
epicsEventSignal ( ev_que->evUser->pflush_sem );
}
}
UNLOCKEVQUE(ev_que)
UNLOCKEVQUE ( ev_que )
return DB_EVENT_OK;
}
@@ -1003,6 +1018,6 @@ void epicsShareAPI db_event_flow_ctrl_mode_off (dbEventCtx ctx)
#ifdef DEBUG
printf("fc off %lu\n", tickGet());
#endif
}
}