From 0b09d90b122b1d0a471adaea7da28cc458a5f13e Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Mon, 13 Jan 1997 23:13:50 +0000 Subject: [PATCH] fixed problems associated with events lost msg --- src/db/dbEvent.c | 296 ++++++++++++++++++++++++----------------------- src/db/dbEvent.h | 4 +- 2 files changed, 152 insertions(+), 148 deletions(-) diff --git a/src/db/dbEvent.c b/src/db/dbEvent.c index eca45387a..43872dd71 100644 --- a/src/db/dbEvent.c +++ b/src/db/dbEvent.c @@ -60,9 +60,11 @@ * joh 19 080393 fixed sem timeout while waiting for marker bug * joh 20 080393 ANSI C changes * joh 21 080393 added task watch dog + * joh 22 011397 designed and installed fix for "events lost" + * problem */ -#include +#include "epicsAssert.h" #include #include @@ -76,16 +78,16 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include +#include "taskwd.h" +#include "tsDefs.h" +#include "dbDefs.h" +#include "dbCommon.h" +#include "task_params.h" +#include "dbAccess.h" +#include "dbEvent.h" +#include "caeventmask.h" -#include +#include "memDebugLib.h" /* local function declarations */ @@ -111,6 +113,12 @@ LOCAL int event_read(struct event_que *ev_que); #define RNGINC(OLD)\ ((OLD)+1)>=EVENTQUESIZE ? 0 : ((OLD)+1) +#define RNGSPACE(EVQ)\ +( ( ((EVQ)->getix)>((EVQ)->putix) ) ? \ + ( ((EVQ)->getix)-((EVQ)->putix) ) : \ + ( ( EVENTQUESIZE+((EVQ)->getix) )- ((EVQ)->putix) ) \ +) + #define LOCKEVQUE(EV_QUE)\ FASTLOCK(&(EV_QUE)->writelock); @@ -151,15 +159,16 @@ int db_event_list(char *name) pevent; pevent = (struct event_block *) pevent->node.next){ #ifdef DEBUG - printf(" ev %x\n",pevent); - printf(" ev que %x\n",pevent->ev_que); - printf(" ev user %x\n",pevent->ev_que->evuser); + printf(" ev %lx\n", (unsigned long) pevent); + printf(" ev que %lx\n", (unsigned long) pevent->ev_que); + printf(" ev user %lx\n", (unsigned long) pevent->ev_que->evuser); #endif - printf( "task %x select %x pfield %x behind by %ld\n", + printf( "task %x select %x pfield %lx behind by %ld\n", pevent->ev_que->evuser->taskid, pevent->select, - (unsigned int) pevent->paddr->pfield, + (unsigned long) pevent->paddr->pfield, pevent->npend); + printf("ring space %u\n", RNGSPACE(pevent->ev_que)); } UNLOCKREC(precord); @@ -307,11 +316,12 @@ struct event_block *pevent /* ptr to event blk (not required) */ return ERROR; } - pevent->npend = 0; + pevent->npend = 0ul; pevent->user_sub = user_sub; pevent->user_arg = user_arg; pevent->paddr = paddr; pevent->select = select; + pevent->pLastLog = NULL; /* not yet in the queue */ ev_que->quota += EVENTENTRIES; pevent->ev_que = ev_que; @@ -437,7 +447,7 @@ int db_cancel_event(struct event_block *pevent) flush_event = *pevent; flush_event.user_sub = wake_cancel; flush_event.user_arg = &pevu->pflush_sem; - flush_event.npend = 0; + flush_event.npend = 0ul; if(db_post_single_event(&flush_event)==OK){ /* * insure that the event is @@ -557,64 +567,6 @@ int db_post_extra_labor(struct event_user *evuser) return OK; } - -/* - * DB_POST_SINGLE_EVENT() - * - * - */ -int db_post_single_event(struct event_block *pevent) -{ - struct event_que *ev_que = pevent->ev_que; - int success = FALSE; - struct dbCommon *precord; - unsigned int putix; - - precord = (struct dbCommon *) pevent->paddr->precord; - - /* - * evuser ring buffer must be locked for the multiple threads writing - * to it - */ - LOCKEVQUE(ev_que) - putix = ev_que->putix; - - /* add to task local event que */ - if(ev_que->evque[putix] == EVENTQEMPTY){ - short sevr; - - pevent->npend++; - ev_que->evque[putix] = pevent; - if(pevent->valque) { - ev_que->valque[putix].stat = precord->stat; - sevr = precord->sevr; - ev_que->valque[putix].sevr = sevr; - ev_que->valque[putix].time = precord->time; - /* - * use memcpy to avoid a bus error on - * union copy of char in the db at an odd - * address - */ - memcpy( (char *)&ev_que->valque[putix].field, - pevent->paddr->pfield, - pevent->paddr->field_size); - } - /* notify the event handler */ - semGive(ev_que->evuser->ppendsem); - ev_que->putix = RNGINC(putix); - success = TRUE; - } - else - ev_que->evuser->queovr++; - - UNLOCKEVQUE(ev_que) - - if(success) - return OK; - else - return ERROR; -} - /* * DB_POST_EVENTS() @@ -628,10 +580,7 @@ unsigned int select ) { struct dbCommon *precord = (struct dbCommon *)prec; - union native_value *pvalue = (union native_value *)pval; struct event_block *event; - struct event_que *ev_que; - unsigned int putix; if (precord->mlis.count == 0) return OK; /* no monitors set */ @@ -640,58 +589,16 @@ unsigned int select for( event = (struct event_block *) precord->mlis.node.next; event; event = (struct event_block *) event->node.next){ - - ev_que = event->ev_que; - + /* * Only send event msg if they are waiting on the field which - * changed or pvalue==NULL and waiting on alarms and alarms changed + * changed or pval==NULL and waiting on alarms and alarms changed */ - if( ((event->paddr->pfield == (void *)pvalue) && (select & event->select)) - ||(pvalue==NULL && (select==DBE_ALARM && event->select==DBE_ALARM)) ){ + if ( (event->paddr->pfield == (void *)pval || pval==NULL) && + (select & event->select)) { - /* - * evuser ring buffer must be locked for the multiple - * threads writing to it - */ - - LOCKEVQUE(ev_que) - putix = ev_que->putix; - - /* add to task local event que */ - if(ev_que->evque[putix] == EVENTQEMPTY){ - short sevr; - - event->npend++; - ev_que->evque[putix] = event; - if(event->valque) { - ev_que->valque[putix].stat = precord->stat; - sevr = precord->sevr; - ev_que->valque[putix].sevr = sevr; - ev_que->valque[putix].time = precord->time; - - /* - * use memcpy to avoid a bus error on - * union copy of char in the db at an odd - * address - */ - memcpy( (char *)&ev_que->valque[putix].field, - (char *)event->paddr->pfield, - event->paddr->field_size); - - } - /* notify the event handler */ - semGive(ev_que->evuser->ppendsem); - - ev_que->putix = RNGINC(putix); - } - else - ev_que->evuser->queovr++; - - UNLOCKEVQUE(ev_que) - - } - + db_post_single_event(event); + } } UNLOCKREC(precord); @@ -699,6 +606,97 @@ unsigned int select } + +/* + * DB_POST_SINGLE_EVENT() + */ +int db_post_single_event(struct event_block *event) +{ + struct event_que *ev_que; + db_field_log *pLog; + int updateFlag; + + ev_que = event->ev_que; + + /* + * evuser ring buffer must be locked for the multiple + * threads writing/reading it + */ + + LOCKEVQUE(ev_que) + + /* add to task local event que */ + if ( ev_que->evque[ev_que->putix] == EVENTQEMPTY && + (RNGSPACE(ev_que)>EVENTSPERQUE || event->pLastLog==NULL) ) { + + pLog = &ev_que->valque[ev_que->putix]; + ev_que->evque[ev_que->putix] = event; + event->npend++; + ev_que->putix = RNGINC(ev_que->putix); + updateFlag = 1; + } + else if (event->pLastLog) { + /* + * replace last event if no space is left + */ + pLog = event->pLastLog; + /* + * the event task has already been notified about + * this (and isnt allowed to consume it while the + * lock is on) so we dont need to post the semaphore + */ + updateFlag = 0; + } + else { + /* + * this should never occur if this is bug free + */ + ev_que->evuser->queovr++; + pLog = NULL; + updateFlag = 0; + } + + if (pLog && event->valque) { + struct dbCommon *precord = event->paddr->precord; + pLog->stat = precord->stat; + pLog->sevr = precord->sevr; + pLog->time = precord->time; + + /* + * use memcpy to avoid a bus error on + * union copy of char in the db at an odd + * address + */ + memcpy( (char *)&pLog->field, + (char *)event->paddr->pfield, + event->paddr->field_size); + + event->pLastLog = pLog; + } + + UNLOCKEVQUE(ev_que) + + /* + * its more efficent to notify the event handler + * only after the event is ready and the lock + * is off in case it runs at a higher priority + * than the caller here. + */ + if (updateFlag) { + /* + * notify the event handler + */ + semGive(ev_que->evuser->ppendsem); + } + + if (pLog) { + return OK; + } + else { + return ERROR; + } +} + /* * DB_START_EVENTS() @@ -770,10 +768,6 @@ int init_func_arg if(init_func) (*init_func)(init_func_arg); - /* - * No need to lock getix as I only allow one thread to call this - * routine at a time - */ do{ semTake(evuser->ppendsem, WAIT_FOREVER); @@ -883,30 +877,27 @@ int init_func_arg LOCAL int event_read(struct event_que *ev_que) { struct event_block *event; - unsigned int getix; unsigned int nextgetix; db_field_log *pfl; + /* + * evuser ring buffer must be locked for the multiple + * threads writing/reading it + */ + LOCKEVQUE(ev_que) /* * Fetch fast register copy */ - getix = ev_que->getix; - - /* - * No need to lock getix as I only allow one thread to call this - * routine at a time - */ - - for( event=ev_que->evque[getix]; + for( event=ev_que->evque[ev_que->getix]; (event) != EVENTQEMPTY; - getix = nextgetix, event = ev_que->evque[nextgetix]){ + ev_que->getix = nextgetix, event = ev_que->evque[nextgetix]){ /* * So I can tell em if more are comming */ - nextgetix = RNGINC(getix); + nextgetix = RNGINC(ev_que->getix); /* @@ -915,29 +906,40 @@ LOCAL int event_read(struct event_que *ev_que) * to be there upon wakeup) */ if(event->valque) - pfl = &ev_que->valque[getix]; + pfl = &ev_que->valque[ev_que->getix]; else pfl = NULL; /* * Next event pointer can be used by event tasks to determine * if more events are waiting in the queue + * + * Must remove the lock here so that we dont deadlock if + * this calls dbGetField() and blocks on the record lock, + * dbPutField() is in progress in another task, it has the + * record lock, and it is calling db_post_events() waiting + * for the event queue lock (which this thread now has). */ + UNLOCKEVQUE(ev_que) (*event->user_sub)( event->user_arg, event->paddr, ev_que->evque[nextgetix]?TRUE:FALSE, pfl); + LOCKEVQUE(ev_que) - ev_que->evque[getix] = (struct event_block *) NULL; + ev_que->evque[ev_que->getix] = EVENTQEMPTY; + /* + * remove event from the queue + */ + if (event->npend<=1ul) { + event->pLastLog = NULL; + } event->npend--; } - /* - * Store back to RAM copy - */ - ev_que->getix = getix; + UNLOCKEVQUE(ev_que) return OK; } diff --git a/src/db/dbEvent.h b/src/db/dbEvent.h index 12563bf5b..881f3d46e 100644 --- a/src/db/dbEvent.h +++ b/src/db/dbEvent.h @@ -52,6 +52,7 @@ struct event_block{ db_field_log *pfl); void *user_arg; struct event_que *ev_que; + db_field_log *pLastLog; unsigned char select; char valque; unsigned long npend; /* n times this event is on the que */ @@ -64,7 +65,8 @@ typedef void EVENTFUNC( db_field_log *pfl); -#define EVENTQUESIZE EVENTENTRIES *32 +#define EVENTSPERQUE 32 +#define EVENTQUESIZE (EVENTENTRIES * EVENTSPERQUE) #define EVENTENTRIES 16 /* the number of que entries for each event */ #define EVENTQEMPTY ((struct event_block *)NULL)