diff --git a/src/ioc/db/chfPlugin.c b/src/ioc/db/chfPlugin.c index f32eb846e..d0ef74ab4 100644 --- a/src/ioc/db/chfPlugin.c +++ b/src/ioc/db/chfPlugin.c @@ -367,7 +367,6 @@ static parse_result parse_end(chFilter *filter) { chfPlugin *p = (chfPlugin*) filter->plug->puser; chfFilter *f = (chfFilter*) filter->puser; - const chfPluginArgDef *cur; int i; /* Check if all required arguments were supplied */ @@ -481,6 +480,16 @@ static long channel_open(chFilter *filter) else return 0; } +static long channel_register_pre_eventq_cb(chFilter *filter, chPostEventFunc* cb_in, void *arg_in, + chPostEventFunc **cb_out, void **arg_out) +{ + chfPlugin *p = (chfPlugin*) filter->plug->puser; + chfFilter *f = (chfFilter*) filter->puser; + + if (p->pif->channelRegisterPreEventQueCB) return p->pif->channelRegisterPreEventQueCB(filter->chan, f->puser, cb_in, arg_in, cb_out, arg_out); + else return -1; +} + static void channel_report(chFilter *filter, const char *intro, int level) { chfPlugin *p = (chfPlugin*) filter->plug->puser; @@ -523,6 +532,7 @@ static chFilterIf wrapper_fif = { NULL, /* parse_end_array, */ channel_open, + channel_register_pre_eventq_cb, channel_report, channel_close }; diff --git a/src/ioc/db/chfPlugin.h b/src/ioc/db/chfPlugin.h index 43669c4a5..07c9ff069 100644 --- a/src/ioc/db/chfPlugin.h +++ b/src/ioc/db/chfPlugin.h @@ -14,6 +14,8 @@ #include #include +struct db_field_log; + /* Based on the linkoptions utility by Michael Davidsaver (BNL) */ /** @file chfPlugin.h @@ -121,15 +123,40 @@ typedef struct chfPluginIf { /** @brief Open channel. * * Called as part of the channel connection setup. + * * @param chan dbChannel for which the connection is being made. * @param pvt Pointer to private structure. * @return 0 for success, -1 if operation is to be aborted. */ long (* channel_open) (dbChannel *chan, void *pvt); + /** @brief Register callback for pre-event-queue post_event operation. + * + * Called as part of the channel connection setup. + * + * This function is called to establish the stack of plugins that an event + * is passed through between the database and the event queue. + * + * The plugin must call the supplied 'cb_in' function (usually within + * its own event callback) with 'arg_in' as first argument to forward the + * data to the next plugin towards the event queue. + * + * @param chan dbChannel for which the connection is being made. + * @param pvt Pointer to private structure. + * @param cb_in Pointer to the next plugin's event callback + * @param arg_in Argument that must be supplied when this plugin calls + * next plugin's callback + * @param cb_out Pointer to this plugin's event callback + * @param arg_out Argument that must be supplied when calling this plugin's callback + * @return 0 for success (cb_out and arg_out written), -1 if callback not required + */ + long (* channelRegisterPreEventQueCB) (dbChannel *chan, void *pvt, chPostEventFunc* cb_in, void *arg_in, + chPostEventFunc **cb_out, void **arg_out ); + /** @brief Channel report request. * * Called as part of show... routines. + * * @param chan dbChannel for which the report is requested. * @param pvt Pointer to private structure. * @param intro Line header string to be printed at the beginning of each line. diff --git a/src/ioc/db/dbChannel.c b/src/ioc/db/dbChannel.c index ee8ac34f6..2af5f3895 100644 --- a/src/ioc/db/dbChannel.c +++ b/src/ioc/db/dbChannel.c @@ -12,6 +12,7 @@ #include "dbChannel.h" #include "dbCommon.h" #include "dbBase.h" +#include "dbEvent.h" #include "link.h" #include "dbAccessDefs.h" #include "dbLock.h" @@ -413,14 +414,32 @@ long dbChannelOpen(dbChannel *chan) { chFilter *filter; long status = 0; + chPostEventFunc *nextcb = db_post_single_event_final; + void *nextarg = NULL; + chPostEventFunc *thiscb = NULL; + void *thisarg = NULL; filter = (chFilter *) ellFirst(&chan->filters); while (filter) { status = filter->plug->fif->channel_open(filter); - if (status) - break; + if (status) goto finish; filter = (chFilter *) ellNext(&filter->node); } + + /* Build up the pre-event-queue chain */ + filter = (chFilter *) ellLast(&chan->filters); + while (filter) { + long status = filter->plug->fif->channel_register_pre_eventq_cb(filter, nextcb, nextarg, &thiscb, &thisarg); + if (status == 0) { + nextcb = thiscb; + nextarg = thisarg; + } + filter = (chFilter *) ellPrevious(&filter->node); + } + chan->post_event_cb = nextcb; + chan->post_event_arg = nextarg; + +finish: return status; } diff --git a/src/ioc/db/dbChannel.h b/src/ioc/db/dbChannel.h index 1539ee038..ce3814e0c 100644 --- a/src/ioc/db/dbChannel.h +++ b/src/ioc/db/dbChannel.h @@ -14,20 +14,45 @@ #include "epicsTypes.h" #include "errMdef.h" #include "shareLib.h" +#include "db_field_log.h" +#include "dbEvent.h" #ifdef __cplusplus extern "C" { #endif +/* + * event subscription + */ +typedef struct evSubscrip { + ELLNODE node; + struct dbChannel *chan; + EVENTFUNC *user_sub; + void *user_arg; + struct event_que *ev_que; + db_field_log **pLastLog; + unsigned long npend; /* n times this event is on the queue */ + unsigned long nreplace; /* n times replacing event on the queue */ + unsigned char select; + char useValque; + char callBackInProgress; + char enabled; +} evSubscrip; + +typedef struct chFilter chFilter; + +/* Prototype for the post event function that is called recursively in filter stacks */ +typedef void (chPostEventFunc)(void *pvt, struct evSubscrip *event, db_field_log *pLog); + /* A dbChannel points to a record field, and can have multiple filters */ typedef struct dbChannel { const char *name; dbAddr addr; + chPostEventFunc *post_event_cb; + void *post_event_arg; ELLLIST filters; } dbChannel; -typedef struct chFilter chFilter; - /* Return values from chFilterIf->parse_* routines: */ typedef enum { parse_stop, parse_continue @@ -68,6 +93,7 @@ typedef struct chFilterIf { /* Channel operations: */ long (* channel_open)(chFilter *filter); + long (* channel_register_pre_eventq_cb)(chFilter *filter, chPostEventFunc *cb_in, void *arg_in, chPostEventFunc **cb_out, void**arg_out); void (* channel_report)(chFilter *filter, const char *intro, int level); /* FIXME: More filter routines here ... */ void (* channel_close)(chFilter *filter); diff --git a/src/ioc/db/dbEvent.c b/src/ioc/db/dbEvent.c index 3c80031a0..66b96b982 100644 --- a/src/ioc/db/dbEvent.c +++ b/src/ioc/db/dbEvent.c @@ -49,24 +49,6 @@ #define EVENTQUESIZE (EVENTENTRIES * EVENTSPERQUE) #define EVENTQEMPTY ((struct evSubscrip *)NULL) -/* - * event subscruiption - */ -struct evSubscrip { - ELLNODE node; - struct dbChannel *chan; - EVENTFUNC *user_sub; - void *user_arg; - struct event_que *ev_que; - db_field_log *pLastLog; - unsigned long npend; /* n times this event is on the queue */ - unsigned long nreplace; /* n times replacing event on the queue */ - unsigned char select; - char useValque; - char callBackInProgress; - char enabled; -}; - /* * really a ring buffer */ @@ -487,6 +469,7 @@ void epicsShareAPI db_event_disable (dbEventSubscription event) /* * event_remove() * event queue lock _must_ be applied + * this nulls the entry in the queue, but doesn't delete the db_field_log chunk */ static void event_remove ( struct event_que *ev_que, unsigned short index, struct evSubscrip *placeHolder ) @@ -494,6 +477,7 @@ static void event_remove ( struct event_que *ev_que, struct evSubscrip * const pevent = ev_que->evque[index]; ev_que->evque[index] = placeHolder; + ev_que->valque[index] = NULL; if ( pevent->npend == 1u ) { pevent->pLastLog = NULL; } @@ -637,17 +621,48 @@ int epicsShareAPI db_post_extra_labor (dbEventCtx ctx) } /* - * DB_POST_SINGLE_EVENT_PRIVATE() + * DB_POST_SINGLE_EVENT_FIRST() * * NOTE: This assumes that the db scan lock is already applied + * (as it copies data from the record) */ -static void db_post_single_event_private (struct evSubscrip *event) +db_field_log* db_post_single_event_first (struct evSubscrip *pevent) { - struct event_que * const ev_que = event->ev_que; db_field_log *pLog = NULL; + + if (pevent->useValque) { + pLog = (db_field_log *) freeListCalloc(dbevFieldLogFreeList); + if (pLog) { + struct dbChannel *chan = pevent->chan; + struct dbCommon *prec = dbChannelRecord(chan); + pLog->stat = prec->stat; + pLog->sevr = prec->sevr; + pLog->time = prec->time; + + /* + * use memcpy to avoid a bus error on + * union copy of char in the db at an odd + * address + */ + memcpy(&pLog->field, + dbChannelField(chan), + dbChannelElementSize(chan)); + } + } + return pLog; +} + +/* + * DB_POST_SINGLE_EVENT_FINAL() + * + */ +void db_post_single_event_final (void *pvt, evSubscrip *pevent, db_field_log *pLog) +{ + struct event_que *ev_que; int firstEventFlag; unsigned rngSpace; + ev_que = pevent->ev_que; /* * evUser ring buffer must be locked for the multiple * threads writing/reading it @@ -662,7 +677,7 @@ static void db_post_single_event_private (struct evSubscrip *event) * events (saving them without the current valuye * serves no purpose) */ - if (!event->useValque && event->npend>0u) { + if (!pevent->useValque && pevent->npend>0u) { UNLOCKEVQUE (ev_que) return; } @@ -677,13 +692,16 @@ static void db_post_single_event_private (struct evSubscrip *event) * then replace the last event on the queue (for this monitor) */ rngSpace = ringSpace ( ev_que ); - if ( event->npend>0u && + if ( pevent->npend>0u && (ev_que->evUser->flowCtrlMode || rngSpace<=EVENTSPERQUE) ) { /* * replace last event if no space is left */ - pLog = event->pLastLog; - event->nreplace++; + if (*pevent->pLastLog) { + freeListFree(dbevFieldLogFreeList, *pevent->pLastLog); + *pevent->pLastLog = pLog; + } + pevent->nreplace++; /* * the event task has already been notified about * this so we dont need to post the semaphore @@ -691,16 +709,18 @@ static void db_post_single_event_private (struct evSubscrip *event) firstEventFlag = 0; } /* - * Otherwise, the current entry must be available. + * Otherwise, the current entry must be available. * Fill it in and advance the ring buffer. */ else { assert ( ev_que->evque[ev_que->putix] == EVENTQEMPTY ); - ev_que->evque[ev_que->putix] = event; - if (event->npend>0u) { + ev_que->evque[ev_que->putix] = pevent; + ev_que->valque[ev_que->putix] = pLog; + pevent->pLastLog = &ev_que->valque[ev_que->putix]; + if (pevent->npend>0u) { ev_que->nDuplicates++; } - event->npend++; + pevent->npend++; /* * if the ring buffer was empty before * adding this event @@ -714,30 +734,6 @@ static void db_post_single_event_private (struct evSubscrip *event) ev_que->putix = RNGINC ( ev_que->putix ); } - if (event->useValque) { - if (!pLog) { - pLog = (db_field_log *) freeListCalloc(dbevFieldLogFreeList); - } - if (pLog) { - struct dbChannel *chan = event->chan; - struct dbCommon *prec = dbChannelRecord(chan); - pLog->stat = prec->stat; - pLog->sevr = prec->sevr; - pLog->time = prec->time; - - /* - * use memcpy to avoid a bus error on - * union copy of char in the db at an odd - * address - */ - memcpy(& pLog->field, - dbChannelField(chan), - dbChannelElementSize(chan)); - - event->pLastLog = pLog; - } - } - UNLOCKEVQUE (ev_que) /* @@ -782,7 +778,10 @@ unsigned int caEventMask */ if ( (dbChannelField(pevent->chan) == (void *)pField || pField==NULL) && (caEventMask & pevent->select)) { - db_post_single_event_private (pevent); + /* Call the head of the filter chain */ + pevent->chan->post_event_cb(pevent->chan->post_event_arg, + pevent, + db_post_single_event_first(pevent)); } } @@ -800,7 +799,12 @@ void epicsShareAPI db_post_single_event (dbEventSubscription event) struct dbCommon * const prec = dbChannelRecord(pevent->chan); dbScanLock (prec); - db_post_single_event_private (pevent); + + /* Call the head of the filter chain */ + pevent->chan->post_event_cb(pevent->chan->post_event_arg, + pevent, + db_post_single_event_first(pevent)); + dbScanUnlock (prec); } @@ -835,6 +839,10 @@ static int event_read ( struct event_que *ev_que ) if ( pevent == &canceledEvent ) { ev_que->evque[ev_que->getix] = EVENTQEMPTY; + if (ev_que->valque[ev_que->getix]) { + freeListFree(dbevFieldLogFreeList, ev_que->valque[ev_que->getix]); + ev_que->valque[ev_que->getix] = NULL; + } ev_que->getix = RNGINC ( ev_que->getix ); assert ( ev_que->nCanceled > 0 ); ev_que->nCanceled--; diff --git a/src/ioc/db/dbEvent.h b/src/ioc/db/dbEvent.h index d93e31594..9c46b5f56 100644 --- a/src/ioc/db/dbEvent.h +++ b/src/ioc/db/dbEvent.h @@ -36,6 +36,7 @@ extern "C" { struct dbChannel; struct db_field_log; +struct evSubscrip; epicsShareFunc int epicsShareAPI db_event_list ( const char *name, unsigned level); @@ -71,6 +72,7 @@ epicsShareFunc void epicsShareAPI db_cancel_event (dbEventSubscription es); epicsShareFunc void epicsShareAPI db_post_single_event (dbEventSubscription es); epicsShareFunc void epicsShareAPI db_event_enable (dbEventSubscription es); epicsShareFunc void epicsShareAPI db_event_disable (dbEventSubscription es); +epicsShareFunc void epicsShareAPI db_post_single_event_final (void *pvt, struct evSubscrip *pevent, struct db_field_log *pLog); #define DB_EVENT_OK 0 #define DB_EVENT_ERROR (-1) diff --git a/src/ioc/db/db_field_log.h b/src/ioc/db/db_field_log.h index 2012f49f6..c38830bd9 100644 --- a/src/ioc/db/db_field_log.h +++ b/src/ioc/db/db_field_log.h @@ -14,6 +14,8 @@ #ifndef INCLdb_field_logh #define INCLdb_field_logh +#include "epicsTime.h" + #ifdef __cplusplus extern "C" { #endif @@ -49,7 +51,7 @@ typedef struct db_field_log { unsigned short sevr; /* Alarm Severity */ epicsTimeStamp time; /* time stamp */ union native_value field; /* field value */ -}db_field_log; +} db_field_log; #ifdef __cplusplus } diff --git a/src/ioc/db/test/chfPluginTest.c b/src/ioc/db/test/chfPluginTest.c index 95f0a3c79..564a81302 100644 --- a/src/ioc/db/test/chfPluginTest.c +++ b/src/ioc/db/test/chfPluginTest.c @@ -43,6 +43,8 @@ typedef struct myStruct { int sent6; char c; char c1[2]; + chPostEventFunc *callback; + void *arg; } myStruct; static const @@ -164,6 +166,15 @@ static long channel_open(dbChannel *chan, void *user) return c_open_return; } +static long channelRegisterPreEventQueCB(dbChannel *chan, void *user, chPostEventFunc *cb_in, void* arg_in, + chPostEventFunc **cb_out, void **arg_out) +{ + myStruct *my = (myStruct*)user; + my->callback = cb_in; + my->arg = arg_in; + return 0; +} + static void channel_report(dbChannel *chan, void *user, const char *intro, int level) { testOk(e & e_report, "channel_report called"); @@ -186,6 +197,7 @@ static chfPluginIf myPif = { parse_ok, channel_open, + channelRegisterPreEventQueCB, channel_report, channel_close }; diff --git a/src/ioc/db/test/dbChannelTest.c b/src/ioc/db/test/dbChannelTest.c index 68195a288..352eb2adb 100644 --- a/src/ioc/db/test/dbChannelTest.c +++ b/src/ioc/db/test/dbChannelTest.c @@ -26,8 +26,9 @@ #define e_start_array 0x00000800 #define e_end_array 0x00001000 #define e_open 0x00002000 -#define e_report 0x00004000 -#define e_close 0x00008000 +#define e_reg_pre_cb 0x00004000 +#define e_report 0x00008000 +#define e_close 0x00010000 #define r_any (e_start | e_abort | e_end | \ e_null | e_boolean | e_integer | e_double | e_string | \ @@ -114,6 +115,11 @@ long c_open(chFilter *filter) testOk(e & e_open, "channel_open called"); return 0; } +long c_reg_pre_cb(chFilter *filter, chPostEventFunc *cb_in, void *arg_in, chPostEventFunc **cb_out, void **arg_out) +{ + testOk(e & e_reg_pre_cb, "channel_register_pre_event_queue_callback called"); + return 0; +} void c_report(chFilter *filter, const char *intro, int level) { testOk(e & e_report, "channel_report called, level = %d", level); @@ -126,7 +132,7 @@ void c_close(chFilter *filter) chFilterIf testIf = { p_start, p_abort, p_end, p_null, p_boolean, p_integer, p_double, p_string, p_start_map, p_map_key, p_end_map, p_start_array, p_end_array, - c_open, c_report, c_close }; + c_open, c_reg_pre_cb, c_report, c_close }; MAIN(dbChannelTest) {