Add filter/plugin insertion into the high-priority filter list (pre event queue).

* Move evSubscrip from dbEvent.[ch] (private) to dbChannel.h (public).
* Split up db_post_single_event_private into two parts:
  ..._first creates a new db_field_log chunk and copies from db.
  ..._final puts a db_field_log into the event queue.
* Add a typedef for the pre-event-queue recursive callbacks.
* Add a register function for the callback (plus void* arg) to filter and
  plugin interfaces.
* Add the code that builds up the pre-event-queue callback chain.
* Changes to the test files to compile (no tests added yet).
* Fix missing include in db_field_log.h.
This commit is contained in:
Ralph Lange
2012-04-27 13:21:53 -04:00
committed by Michael Davidsaver
parent dd03d19cf2
commit 4dd42383ef
9 changed files with 176 additions and 64 deletions

View File

@@ -367,7 +367,6 @@ static parse_result parse_end(chFilter *filter)
{
chfPlugin *p = (chfPlugin*) filter->plug->puser;
chfFilter *f = (chfFilter*) filter->puser;
const chfPluginArgDef *cur;
int i;
/* Check if all required arguments were supplied */
@@ -481,6 +480,16 @@ static long channel_open(chFilter *filter)
else return 0;
}
static long channel_register_pre_eventq_cb(chFilter *filter, chPostEventFunc* cb_in, void *arg_in,
chPostEventFunc **cb_out, void **arg_out)
{
chfPlugin *p = (chfPlugin*) filter->plug->puser;
chfFilter *f = (chfFilter*) filter->puser;
if (p->pif->channelRegisterPreEventQueCB) return p->pif->channelRegisterPreEventQueCB(filter->chan, f->puser, cb_in, arg_in, cb_out, arg_out);
else return -1;
}
static void channel_report(chFilter *filter, const char *intro, int level)
{
chfPlugin *p = (chfPlugin*) filter->plug->puser;
@@ -523,6 +532,7 @@ static chFilterIf wrapper_fif = {
NULL, /* parse_end_array, */
channel_open,
channel_register_pre_eventq_cb,
channel_report,
channel_close
};

View File

@@ -14,6 +14,8 @@
#include <epicsTypes.h>
#include <dbChannel.h>
struct db_field_log;
/* Based on the linkoptions utility by Michael Davidsaver (BNL) */
/** @file chfPlugin.h
@@ -121,15 +123,40 @@ typedef struct chfPluginIf {
/** @brief Open channel.
*
* <em>Called as part of the channel connection setup.</em>
*
* @param chan dbChannel for which the connection is being made.
* @param pvt Pointer to private structure.
* @return 0 for success, -1 if operation is to be aborted.
*/
long (* channel_open) (dbChannel *chan, void *pvt);
/** @brief Register callback for pre-event-queue post_event operation.
*
* <em>Called as part of the channel connection setup.</em>
*
* This function is called to establish the stack of plugins that an event
* is passed through between the database and the event queue.
*
* The plugin must call the supplied 'cb_in' function (usually within
* its own event callback) with 'arg_in' as first argument to forward the
* data to the next plugin towards the event queue.
*
* @param chan dbChannel for which the connection is being made.
* @param pvt Pointer to private structure.
* @param cb_in Pointer to the next plugin's event callback
* @param arg_in Argument that must be supplied when this plugin calls
* next plugin's callback
* @param cb_out Pointer to this plugin's event callback
* @param arg_out Argument that must be supplied when calling this plugin's callback
* @return 0 for success (cb_out and arg_out written), -1 if callback not required
*/
long (* channelRegisterPreEventQueCB) (dbChannel *chan, void *pvt, chPostEventFunc* cb_in, void *arg_in,
chPostEventFunc **cb_out, void **arg_out );
/** @brief Channel report request.
*
* <em>Called as part of show... routines.</em>
*
* @param chan dbChannel for which the report is requested.
* @param pvt Pointer to private structure.
* @param intro Line header string to be printed at the beginning of each line.

View File

@@ -12,6 +12,7 @@
#include "dbChannel.h"
#include "dbCommon.h"
#include "dbBase.h"
#include "dbEvent.h"
#include "link.h"
#include "dbAccessDefs.h"
#include "dbLock.h"
@@ -413,14 +414,32 @@ long dbChannelOpen(dbChannel *chan)
{
chFilter *filter;
long status = 0;
chPostEventFunc *nextcb = db_post_single_event_final;
void *nextarg = NULL;
chPostEventFunc *thiscb = NULL;
void *thisarg = NULL;
filter = (chFilter *) ellFirst(&chan->filters);
while (filter) {
status = filter->plug->fif->channel_open(filter);
if (status)
break;
if (status) goto finish;
filter = (chFilter *) ellNext(&filter->node);
}
/* Build up the pre-event-queue chain */
filter = (chFilter *) ellLast(&chan->filters);
while (filter) {
long status = filter->plug->fif->channel_register_pre_eventq_cb(filter, nextcb, nextarg, &thiscb, &thisarg);
if (status == 0) {
nextcb = thiscb;
nextarg = thisarg;
}
filter = (chFilter *) ellPrevious(&filter->node);
}
chan->post_event_cb = nextcb;
chan->post_event_arg = nextarg;
finish:
return status;
}

View File

@@ -14,20 +14,45 @@
#include "epicsTypes.h"
#include "errMdef.h"
#include "shareLib.h"
#include "db_field_log.h"
#include "dbEvent.h"
#ifdef __cplusplus
extern "C" {
#endif
/*
* event subscription
*/
typedef struct evSubscrip {
ELLNODE node;
struct dbChannel *chan;
EVENTFUNC *user_sub;
void *user_arg;
struct event_que *ev_que;
db_field_log **pLastLog;
unsigned long npend; /* n times this event is on the queue */
unsigned long nreplace; /* n times replacing event on the queue */
unsigned char select;
char useValque;
char callBackInProgress;
char enabled;
} evSubscrip;
typedef struct chFilter chFilter;
/* Prototype for the post event function that is called recursively in filter stacks */
typedef void (chPostEventFunc)(void *pvt, struct evSubscrip *event, db_field_log *pLog);
/* A dbChannel points to a record field, and can have multiple filters */
typedef struct dbChannel {
const char *name;
dbAddr addr;
chPostEventFunc *post_event_cb;
void *post_event_arg;
ELLLIST filters;
} dbChannel;
typedef struct chFilter chFilter;
/* Return values from chFilterIf->parse_* routines: */
typedef enum {
parse_stop, parse_continue
@@ -68,6 +93,7 @@ typedef struct chFilterIf {
/* Channel operations: */
long (* channel_open)(chFilter *filter);
long (* channel_register_pre_eventq_cb)(chFilter *filter, chPostEventFunc *cb_in, void *arg_in, chPostEventFunc **cb_out, void**arg_out);
void (* channel_report)(chFilter *filter, const char *intro, int level);
/* FIXME: More filter routines here ... */
void (* channel_close)(chFilter *filter);

View File

@@ -49,24 +49,6 @@
#define EVENTQUESIZE (EVENTENTRIES * EVENTSPERQUE)
#define EVENTQEMPTY ((struct evSubscrip *)NULL)
/*
* event subscruiption
*/
struct evSubscrip {
ELLNODE node;
struct dbChannel *chan;
EVENTFUNC *user_sub;
void *user_arg;
struct event_que *ev_que;
db_field_log *pLastLog;
unsigned long npend; /* n times this event is on the queue */
unsigned long nreplace; /* n times replacing event on the queue */
unsigned char select;
char useValque;
char callBackInProgress;
char enabled;
};
/*
* really a ring buffer
*/
@@ -487,6 +469,7 @@ void epicsShareAPI db_event_disable (dbEventSubscription event)
/*
* event_remove()
* event queue lock _must_ be applied
* this nulls the entry in the queue, but doesn't delete the db_field_log chunk
*/
static void event_remove ( struct event_que *ev_que,
unsigned short index, struct evSubscrip *placeHolder )
@@ -494,6 +477,7 @@ static void event_remove ( struct event_que *ev_que,
struct evSubscrip * const pevent = ev_que->evque[index];
ev_que->evque[index] = placeHolder;
ev_que->valque[index] = NULL;
if ( pevent->npend == 1u ) {
pevent->pLastLog = NULL;
}
@@ -637,17 +621,48 @@ int epicsShareAPI db_post_extra_labor (dbEventCtx ctx)
}
/*
* DB_POST_SINGLE_EVENT_PRIVATE()
* DB_POST_SINGLE_EVENT_FIRST()
*
* NOTE: This assumes that the db scan lock is already applied
* (as it copies data from the record)
*/
static void db_post_single_event_private (struct evSubscrip *event)
db_field_log* db_post_single_event_first (struct evSubscrip *pevent)
{
struct event_que * const ev_que = event->ev_que;
db_field_log *pLog = NULL;
if (pevent->useValque) {
pLog = (db_field_log *) freeListCalloc(dbevFieldLogFreeList);
if (pLog) {
struct dbChannel *chan = pevent->chan;
struct dbCommon *prec = dbChannelRecord(chan);
pLog->stat = prec->stat;
pLog->sevr = prec->sevr;
pLog->time = prec->time;
/*
* use memcpy to avoid a bus error on
* union copy of char in the db at an odd
* address
*/
memcpy(&pLog->field,
dbChannelField(chan),
dbChannelElementSize(chan));
}
}
return pLog;
}
/*
* DB_POST_SINGLE_EVENT_FINAL()
*
*/
void db_post_single_event_final (void *pvt, evSubscrip *pevent, db_field_log *pLog)
{
struct event_que *ev_que;
int firstEventFlag;
unsigned rngSpace;
ev_que = pevent->ev_que;
/*
* evUser ring buffer must be locked for the multiple
* threads writing/reading it
@@ -662,7 +677,7 @@ static void db_post_single_event_private (struct evSubscrip *event)
* events (saving them without the current valuye
* serves no purpose)
*/
if (!event->useValque && event->npend>0u) {
if (!pevent->useValque && pevent->npend>0u) {
UNLOCKEVQUE (ev_que)
return;
}
@@ -677,13 +692,16 @@ static void db_post_single_event_private (struct evSubscrip *event)
* then replace the last event on the queue (for this monitor)
*/
rngSpace = ringSpace ( ev_que );
if ( event->npend>0u &&
if ( pevent->npend>0u &&
(ev_que->evUser->flowCtrlMode || rngSpace<=EVENTSPERQUE) ) {
/*
* replace last event if no space is left
*/
pLog = event->pLastLog;
event->nreplace++;
if (*pevent->pLastLog) {
freeListFree(dbevFieldLogFreeList, *pevent->pLastLog);
*pevent->pLastLog = pLog;
}
pevent->nreplace++;
/*
* the event task has already been notified about
* this so we dont need to post the semaphore
@@ -691,16 +709,18 @@ static void db_post_single_event_private (struct evSubscrip *event)
firstEventFlag = 0;
}
/*
* Otherwise, the current entry must be available.
* Otherwise, the current entry must be available.
* Fill it in and advance the ring buffer.
*/
else {
assert ( ev_que->evque[ev_que->putix] == EVENTQEMPTY );
ev_que->evque[ev_que->putix] = event;
if (event->npend>0u) {
ev_que->evque[ev_que->putix] = pevent;
ev_que->valque[ev_que->putix] = pLog;
pevent->pLastLog = &ev_que->valque[ev_que->putix];
if (pevent->npend>0u) {
ev_que->nDuplicates++;
}
event->npend++;
pevent->npend++;
/*
* if the ring buffer was empty before
* adding this event
@@ -714,30 +734,6 @@ static void db_post_single_event_private (struct evSubscrip *event)
ev_que->putix = RNGINC ( ev_que->putix );
}
if (event->useValque) {
if (!pLog) {
pLog = (db_field_log *) freeListCalloc(dbevFieldLogFreeList);
}
if (pLog) {
struct dbChannel *chan = event->chan;
struct dbCommon *prec = dbChannelRecord(chan);
pLog->stat = prec->stat;
pLog->sevr = prec->sevr;
pLog->time = prec->time;
/*
* use memcpy to avoid a bus error on
* union copy of char in the db at an odd
* address
*/
memcpy(& pLog->field,
dbChannelField(chan),
dbChannelElementSize(chan));
event->pLastLog = pLog;
}
}
UNLOCKEVQUE (ev_que)
/*
@@ -782,7 +778,10 @@ unsigned int caEventMask
*/
if ( (dbChannelField(pevent->chan) == (void *)pField || pField==NULL) &&
(caEventMask & pevent->select)) {
db_post_single_event_private (pevent);
/* Call the head of the filter chain */
pevent->chan->post_event_cb(pevent->chan->post_event_arg,
pevent,
db_post_single_event_first(pevent));
}
}
@@ -800,7 +799,12 @@ void epicsShareAPI db_post_single_event (dbEventSubscription event)
struct dbCommon * const prec = dbChannelRecord(pevent->chan);
dbScanLock (prec);
db_post_single_event_private (pevent);
/* Call the head of the filter chain */
pevent->chan->post_event_cb(pevent->chan->post_event_arg,
pevent,
db_post_single_event_first(pevent));
dbScanUnlock (prec);
}
@@ -835,6 +839,10 @@ static int event_read ( struct event_que *ev_que )
if ( pevent == &canceledEvent ) {
ev_que->evque[ev_que->getix] = EVENTQEMPTY;
if (ev_que->valque[ev_que->getix]) {
freeListFree(dbevFieldLogFreeList, ev_que->valque[ev_que->getix]);
ev_que->valque[ev_que->getix] = NULL;
}
ev_que->getix = RNGINC ( ev_que->getix );
assert ( ev_que->nCanceled > 0 );
ev_que->nCanceled--;

View File

@@ -36,6 +36,7 @@ extern "C" {
struct dbChannel;
struct db_field_log;
struct evSubscrip;
epicsShareFunc int epicsShareAPI db_event_list (
const char *name, unsigned level);
@@ -71,6 +72,7 @@ epicsShareFunc void epicsShareAPI db_cancel_event (dbEventSubscription es);
epicsShareFunc void epicsShareAPI db_post_single_event (dbEventSubscription es);
epicsShareFunc void epicsShareAPI db_event_enable (dbEventSubscription es);
epicsShareFunc void epicsShareAPI db_event_disable (dbEventSubscription es);
epicsShareFunc void epicsShareAPI db_post_single_event_final (void *pvt, struct evSubscrip *pevent, struct db_field_log *pLog);
#define DB_EVENT_OK 0
#define DB_EVENT_ERROR (-1)

View File

@@ -14,6 +14,8 @@
#ifndef INCLdb_field_logh
#define INCLdb_field_logh
#include "epicsTime.h"
#ifdef __cplusplus
extern "C" {
#endif
@@ -49,7 +51,7 @@ typedef struct db_field_log {
unsigned short sevr; /* Alarm Severity */
epicsTimeStamp time; /* time stamp */
union native_value field; /* field value */
}db_field_log;
} db_field_log;
#ifdef __cplusplus
}

View File

@@ -43,6 +43,8 @@ typedef struct myStruct {
int sent6;
char c;
char c1[2];
chPostEventFunc *callback;
void *arg;
} myStruct;
static const
@@ -164,6 +166,15 @@ static long channel_open(dbChannel *chan, void *user)
return c_open_return;
}
static long channelRegisterPreEventQueCB(dbChannel *chan, void *user, chPostEventFunc *cb_in, void* arg_in,
chPostEventFunc **cb_out, void **arg_out)
{
myStruct *my = (myStruct*)user;
my->callback = cb_in;
my->arg = arg_in;
return 0;
}
static void channel_report(dbChannel *chan, void *user, const char *intro, int level)
{
testOk(e & e_report, "channel_report called");
@@ -186,6 +197,7 @@ static chfPluginIf myPif = {
parse_ok,
channel_open,
channelRegisterPreEventQueCB,
channel_report,
channel_close
};

View File

@@ -26,8 +26,9 @@
#define e_start_array 0x00000800
#define e_end_array 0x00001000
#define e_open 0x00002000
#define e_report 0x00004000
#define e_close 0x00008000
#define e_reg_pre_cb 0x00004000
#define e_report 0x00008000
#define e_close 0x00010000
#define r_any (e_start | e_abort | e_end | \
e_null | e_boolean | e_integer | e_double | e_string | \
@@ -114,6 +115,11 @@ long c_open(chFilter *filter)
testOk(e & e_open, "channel_open called");
return 0;
}
long c_reg_pre_cb(chFilter *filter, chPostEventFunc *cb_in, void *arg_in, chPostEventFunc **cb_out, void **arg_out)
{
testOk(e & e_reg_pre_cb, "channel_register_pre_event_queue_callback called");
return 0;
}
void c_report(chFilter *filter, const char *intro, int level)
{
testOk(e & e_report, "channel_report called, level = %d", level);
@@ -126,7 +132,7 @@ void c_close(chFilter *filter)
chFilterIf testIf =
{ p_start, p_abort, p_end, p_null, p_boolean, p_integer, p_double,
p_string, p_start_map, p_map_key, p_end_map, p_start_array, p_end_array,
c_open, c_report, c_close };
c_open, c_reg_pre_cb, c_report, c_close };
MAIN(dbChannelTest)
{