fixed event clear while flow control is running deadlock
This commit is contained in:
115
src/db/dbEvent.c
115
src/db/dbEvent.c
@@ -91,6 +91,7 @@
|
||||
|
||||
#include "memDebugLib.h"
|
||||
|
||||
static struct event_block canceledEvent;
|
||||
|
||||
/* local function declarations */
|
||||
|
||||
@@ -420,6 +421,27 @@ int db_event_disable(struct event_block *pevent)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* event_remove()
|
||||
* event queue lock _must_ be applied
|
||||
*/
|
||||
LOCAL void event_remove ( struct event_que *ev_que,
|
||||
unsigned short index, struct event_block *placeHolder )
|
||||
{
|
||||
struct event_block *pEvent = ev_que->evque[index];
|
||||
|
||||
ev_que->evque[index] = placeHolder;
|
||||
if ( pEvent->npend == 1u ) {
|
||||
pEvent->pLastLog = NULL;
|
||||
}
|
||||
else {
|
||||
assert ( pEvent->npend > 1u );
|
||||
assert ( ev_que->nDuplicates >= 1u );
|
||||
ev_que->nDuplicates--;
|
||||
}
|
||||
pEvent->npend--;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DB_CANCEL_EVENT()
|
||||
@@ -433,6 +455,7 @@ int db_event_disable(struct event_block *pevent)
|
||||
*/
|
||||
int db_cancel_event(struct event_block *pevent)
|
||||
{
|
||||
unsigned getix;
|
||||
struct dbCommon *precord;
|
||||
int status;
|
||||
|
||||
@@ -452,18 +475,40 @@ int db_cancel_event(struct event_block *pevent)
|
||||
* its call back changes
|
||||
*/
|
||||
LOCKEVQUE(pevent->ev_que)
|
||||
|
||||
pevent->user_sub = NULL;
|
||||
while (pevent->npend || pevent->callBackInProgress) {
|
||||
UNLOCKEVQUE(pevent->ev_que)
|
||||
|
||||
/*
|
||||
* purge this event from the queue
|
||||
*
|
||||
* Its better to take this approach rather than waiting
|
||||
* for the event thread to finish removing this event
|
||||
* from the queue because the event thread will not
|
||||
* process if we are in flow control mode. Since blocking
|
||||
* here will block CA's TCP input queue then a dead lock
|
||||
* would be possible.
|
||||
*/
|
||||
for ( getix = pevent->ev_que->getix;
|
||||
pevent->ev_que->evque[getix] != EVENTQEMPTY;
|
||||
getix = RNGINC ( getix ) ) {
|
||||
if ( pevent->ev_que->evque[getix] == pevent ) {
|
||||
event_remove ( pevent->ev_que, getix, &canceledEvent );
|
||||
}
|
||||
}
|
||||
assert ( pevent->npend == 0u );
|
||||
|
||||
while ( pevent->callBackInProgress ) {
|
||||
UNLOCKEVQUE ( pevent->ev_que )
|
||||
semTake(pevent->ev_que->evUser->pflush_sem, sysClkRateGet());
|
||||
LOCKEVQUE(pevent->ev_que)
|
||||
}
|
||||
UNLOCKEVQUE(pevent->ev_que)
|
||||
LOCKEVQUE ( pevent->ev_que )
|
||||
}
|
||||
|
||||
/*
|
||||
* Decrement event que quota
|
||||
*/
|
||||
pevent->ev_que->quota -= EVENTENTRIES;
|
||||
pevent->ev_que->quota -= EVENTENTRIES;
|
||||
|
||||
UNLOCKEVQUE(pevent->ev_que)
|
||||
|
||||
return OK;
|
||||
}
|
||||
@@ -916,20 +961,16 @@ int init_func_arg
|
||||
*/
|
||||
LOCAL int event_read (struct event_que *ev_que)
|
||||
{
|
||||
struct event_block *event;
|
||||
unsigned int nextgetix;
|
||||
db_field_log *pfl;
|
||||
void (*user_sub) (void *user_arg, struct dbAddr *paddr,
|
||||
int eventsRemaining, db_field_log *pfl);
|
||||
int status;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* evUser ring buffer must be locked for the multiple
|
||||
* threads writing/reading it
|
||||
*/
|
||||
LOCKEVQUE(ev_que)
|
||||
LOCKEVQUE(ev_que)
|
||||
|
||||
/*
|
||||
* if in flow control mode drain duplicates and then
|
||||
@@ -944,17 +985,16 @@ LOCAL int event_read (struct event_que *ev_que)
|
||||
/*
|
||||
* Fetch fast register copy
|
||||
*/
|
||||
for( event=ev_que->evque[ev_que->getix];
|
||||
(event) != EVENTQEMPTY;
|
||||
ev_que->getix = nextgetix, event = ev_que->evque[nextgetix]){
|
||||
while ( ev_que->evque[ev_que->getix] != EVENTQEMPTY ) {
|
||||
|
||||
db_field_log fl = ev_que->valque[ev_que->getix];
|
||||
struct event_block *event = ev_que->evque[ev_que->getix];
|
||||
|
||||
/*
|
||||
* So I can tell em if more are comming
|
||||
*/
|
||||
nextgetix = RNGINC(ev_que->getix);
|
||||
|
||||
if ( event == &canceledEvent ) {
|
||||
ev_que->evque[ev_que->getix] = EVENTQEMPTY;
|
||||
ev_que->getix = RNGINC ( ev_que->getix );
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Simple type values queued up for reliable interprocess
|
||||
@@ -968,31 +1008,8 @@ LOCAL int event_read (struct event_que *ev_que)
|
||||
pfl = NULL;
|
||||
}
|
||||
|
||||
ev_que->evque[ev_que->getix] = EVENTQEMPTY;
|
||||
|
||||
/*
|
||||
* remove event from the queue
|
||||
*/
|
||||
if (event->npend==1u) {
|
||||
event->pLastLog = NULL;
|
||||
}
|
||||
else {
|
||||
assert (event->npend>1u);
|
||||
assert (ev_que->nDuplicates>=1u);
|
||||
ev_que->nDuplicates--;
|
||||
}
|
||||
|
||||
/*
|
||||
* this provides a way to test to see if an event is in use
|
||||
* despite the fact that the event queue does not point to this event
|
||||
*/
|
||||
event->callBackInProgress = TRUE;
|
||||
|
||||
/*
|
||||
* it is essential that the npend count is not lowered
|
||||
* before the callBackInProgress flag is set
|
||||
*/
|
||||
event->npend--;
|
||||
event_remove ( ev_que, ev_que->getix, EVENTQEMPTY );
|
||||
ev_que->getix = RNGINC ( ev_que->getix );
|
||||
|
||||
/*
|
||||
* create a local copy of the call back parameters while
|
||||
@@ -1011,18 +1028,14 @@ LOCAL int event_read (struct event_que *ev_que)
|
||||
* for the event queue lock (which this thread now has).
|
||||
*/
|
||||
if (user_sub != NULL) {
|
||||
event->callBackInProgress = TRUE;
|
||||
UNLOCKEVQUE(ev_que)
|
||||
(*user_sub) (event->user_arg, event->paddr,
|
||||
ev_que->evque[nextgetix]?TRUE:FALSE, pfl);
|
||||
ev_que->evque[ev_que->getix]?TRUE:FALSE, pfl);
|
||||
LOCKEVQUE(ev_que)
|
||||
event->callBackInProgress = FALSE;
|
||||
}
|
||||
|
||||
/*
|
||||
* this provides a way to test to see if an event is in use
|
||||
* despite the fact that the event queue does not point to this event
|
||||
*/
|
||||
event->callBackInProgress = FALSE;
|
||||
|
||||
/*
|
||||
* check to see if this event has been canceled each
|
||||
* time that the callBackInProgress flag is set to false
|
||||
|
||||
Reference in New Issue
Block a user