Allow filter plugin to change the type of data and the array size

* Add field_size to db_field_log
* Add final type, dbr_type, no_elements, and field size to dbChannel
  (plus matching access methods)
* Add set-type callback chain to find out final sizes and types
* Add registration calls for pre-event-queue and post-event-queue insertion
This commit is contained in:
Ralph Lange
2012-04-27 13:21:54 -04:00
committed by Michael Davidsaver
parent b09a9758b0
commit b99975cf71
11 changed files with 273 additions and 56 deletions

View File

@@ -480,14 +480,38 @@ static long channel_open(chFilter *filter)
else return 0;
}
static long channel_register_pre_eventq_cb(chFilter *filter, chPostEventFunc* cb_in, void *arg_in,
chPostEventFunc **cb_out, void **arg_out)
static void channel_register_pre_eventq(chFilter *filter,
chPostEventFunc *pe_in, void *pe_arg_in,
chSetTypeFunc *st_in, void *st_arg_in,
chPostEventFunc **pe_out, void **pe_arg_out,
chSetTypeFunc **st_out, void **st_arg_out)
{
chfPlugin *p = (chfPlugin*) filter->plug->puser;
chfFilter *f = (chfFilter*) filter->puser;
if (p->pif->channelRegisterPreEventQueCB) return p->pif->channelRegisterPreEventQueCB(filter->chan, f->puser, cb_in, arg_in, cb_out, arg_out);
else return -1;
if (p->pif->channelRegisterPreEventQue)
p->pif->channelRegisterPreEventQue(filter->chan, f->puser,
pe_in, pe_arg_in,
st_in, st_arg_in,
pe_out, pe_arg_out,
st_out, st_arg_out);
}
static void channel_register_post_eventq(chFilter *filter,
chPostEventFunc *pe_in, void *pe_arg_in,
chSetTypeFunc *st_in, void *st_arg_in,
chPostEventFunc **pe_out, void **pe_arg_out,
chSetTypeFunc **st_out, void **st_arg_out)
{
chfPlugin *p = (chfPlugin*) filter->plug->puser;
chfFilter *f = (chfFilter*) filter->puser;
if (p->pif->channelRegisterPostEventQue)
p->pif->channelRegisterPostEventQue(filter->chan, f->puser,
pe_in, pe_arg_in,
st_in, st_arg_in,
pe_out, pe_arg_out,
st_out, st_arg_out);
}
static void channel_report(chFilter *filter, const char *intro, int level)
@@ -532,7 +556,8 @@ static chFilterIf wrapper_fif = {
NULL, /* parse_end_array, */
channel_open,
channel_register_pre_eventq_cb,
channel_register_pre_eventq,
channel_register_post_eventq,
channel_report,
channel_close
};

View File

@@ -130,28 +130,87 @@ typedef struct chfPluginIf {
*/
long (* channel_open) (dbChannel *chan, void *pvt);
/** @brief Register callback for pre-event-queue post_event operation.
/** @brief Register callbacks for pre-event-queue operation.
*
* <em>Called as part of the channel connection setup.</em>
*
* This function is called to establish the stack of plugins that an event
* is passed through between the database and the event queue.
*
* The plugin must call the supplied 'cb_in' function (usually within
* its own event callback) with 'arg_in' as first argument to forward the
* data to the next plugin towards the event queue.
* The plugin must call the supplied 'pe_in' function (usually within
* its own post-event callback) with 'pe_arg_in' as first argument to forward
* the data to the next plugin towards the event queue.
* It must set pe_out to point to its own post-event callback in order to be
* called when a data update is sent from the database towards the event queue.
*
* The plugin must call the supplied 'st_in' function (from within
* its own set-type callback) with 'st_arg_in' as first argument after changing
* the data type and/or array size of 'chan'.
*
* @param chan dbChannel for which the connection is being made.
* @param pvt Pointer to private structure.
* @param cb_in Pointer to the next plugin's event callback
* @param arg_in Argument that must be supplied when this plugin calls
* next plugin's callback
* @param cb_out Pointer to this plugin's event callback
* @param arg_out Argument that must be supplied when calling this plugin's callback
* @return 0 for success (cb_out and arg_out written), -1 if callback not required
* @param pe_in Pointer to the next plugin's post-event callback
* @param pe_arg_in Argument that must be supplied when this plugin calls
* the next plugin's post-event callback
* @param st_in Pointer to the next plugin's set-type callback
* @param st_arg_in Argument that must be supplied when this plugin calls
* the next plugin's set-type callback
* @param pe_out Pointer to this plugin's post-event callback (set to NULL to bypass
* this plugin)
* @param pe_arg_out Argument that must be supplied when calling
* this plugin's post-event callback
* @param st_out Pointer to this plugin's set-type callback (set to NULL if plugin
* does not change the data type or size)
* @param st_arg_out Argument that must be supplied when calling
* this plugin's set-type callback
*/
long (* channelRegisterPreEventQueCB) (dbChannel *chan, void *pvt, chPostEventFunc* cb_in, void *arg_in,
chPostEventFunc **cb_out, void **arg_out );
void (* channelRegisterPreEventQue) (dbChannel *chan, void *pvt,
chPostEventFunc *pe_in, void *pe_arg_in,
chSetTypeFunc *st_in, void *st_arg_in,
chPostEventFunc **pe_out, void **pe_arg_out,
chSetTypeFunc **st_out, void **st_arg_out);
/** @brief Register callbacks for post-event-queue operation.
*
* <em>Called as part of the channel connection setup.</em>
*
* This function is called to establish the stack of plugins that an event
* is passed through between the event queue and the final user (CA server or
* database access).
*
* The plugin must call the supplied 'pe_in' function (usually within
* its own post-event callback) with 'arg_in' as first argument to forward
* the data to the next plugin towards the final user.
* It must set pe_out to point to its own post-event callback in order to be
* called when a data update is sent from the event queue towards the final
* user.
*
* The plugin must call the supplied 'st_in' function (from within
* its own set-type callback) with 'arg_in' as first argument after changing
* the data type and/or array size of 'chan'.
*
* @param chan dbChannel for which the connection is being made.
* @param pvt Pointer to private structure.
* @param pe_in Pointer to the next plugin's post-event callback
* @param pe_arg_in Argument that must be supplied when this plugin calls
* the next plugin's post-event callback
* @param st_in Pointer to the next plugin's set-type callback
* @param st_arg_in Argument that must be supplied when this plugin calls
* the next plugin's set-type callback
* @param pe_out Pointer to this plugin's post-event callback (set to NULL to bypass
* this plugin)
* @param pe_arg_out Argument that must be supplied when calling
* this plugin's post-event callback
* @param st_out Pointer to this plugin's set-type callback (set to NULL if plugin
* does not change the data type or size)
* @param st_arg_out Argument that must be supplied when calling
* this plugin's set-type callback
*/
void (* channelRegisterPostEventQue) (dbChannel *chan, void *pvt,
chPostEventFunc *pe_in, void *pe_arg_in,
chSetTypeFunc *st_in, void *st_arg_in,
chPostEventFunc **pe_out, void **pe_arg_out,
chSetTypeFunc **st_out, void **st_arg_out);
/** @brief Channel report request.
*

View File

@@ -821,7 +821,8 @@ long epicsShareAPI dbGet(DBADDR *paddr, short dbrType,
}
/* check for array */
if (paddr->special == SPC_DBADDR &&
if ((!pfl || pfl->type == dbfl_type_rec) &&
paddr->special == SPC_DBADDR &&
no_elements > 1 &&
(prset = dbGetRset(paddr)) &&
prset->get_array_info) {
@@ -836,7 +837,9 @@ long epicsShareAPI dbGet(DBADDR *paddr, short dbrType,
(paddr->pfield, pbuffer, paddr);
} else {
DBADDR localAddr = *paddr; /* Structure copy */
localAddr.field_type = pfl->field_type;
localAddr.field_size = pfl->field_size;
localAddr.no_elements = pfl->no_elements;
if (pfl->type == dbfl_type_val)
localAddr.pfield = (char *) &pfl->u.v.field;
else
@@ -870,7 +873,9 @@ long epicsShareAPI dbGet(DBADDR *paddr, short dbrType,
status = convert(paddr, pbuffer, n, no_elements, offset);
} else {
DBADDR localAddr = *paddr; /* Structure copy */
localAddr.field_type = pfl->field_type;
localAddr.field_size = pfl->field_size;
localAddr.no_elements = pfl->no_elements;
if (pfl->type == dbfl_type_val)
localAddr.pfield = (char *) &pfl->u.v.field;
else

View File

@@ -19,6 +19,7 @@
#include "epicsTypes.h"
#include "epicsTime.h"
#include "dbBase.h"
#ifdef INCLdb_accessh_epicsExportSharedSymbols
# define epicsExportSharedSymbols

View File

@@ -24,6 +24,9 @@
#include "special.h"
#include "yajl_parse.h"
/* The following is defined in db_convert.h */
extern unsigned short dbDBRnewToDBRold[DBR_ENUM+1];
typedef struct parseContext {
dbChannel *chan;
chFilter *filter;
@@ -410,14 +413,31 @@ finish:
return chan;
}
static void setFinalType(void *pvt, long no_elements, short field_type, short element_size) {
dbChannel *chan = (dbChannel*) pvt;
chan->final_no_elements = no_elements;
chan->final_element_size = element_size;
chan->final_type = field_type;
chan->dbr_final_type = dbDBRnewToDBRold[mapDBFToDBR[field_type]];
}
long dbChannelOpen(dbChannel *chan)
{
chFilter *filter;
long status = 0;
chPostEventFunc *nextcb = db_post_single_event_final;
void *nextarg = NULL;
chPostEventFunc *thiscb = NULL;
void *thisarg = NULL;
chPostEventFunc *pre_pe_next = db_post_single_event_final;
void *pre_nextarg = NULL;
chPostEventFunc *pre_pe_this = NULL;
void *pre_thisarg = NULL;
chPostEventFunc *post_pe_next = NULL;
void *post_nextarg = NULL;
chPostEventFunc *post_pe_this = NULL;
void *post_thisarg = NULL;
chSetTypeFunc *st_next = setFinalType;
void *st_nextarg = chan;
chSetTypeFunc *st_this = NULL;
void *st_thisarg = NULL;
filter = (chFilter *) ellFirst(&chan->filters);
while (filter) {
@@ -426,18 +446,53 @@ long dbChannelOpen(dbChannel *chan)
filter = (chFilter *) ellNext(&filter->node);
}
/* Build up the pre-event-queue chain */
/*
* Build up the post-event-queue and pre-event-queue filter chains.
* Must be done top-down and separate, since there is only one callback chain
* for type/size changes.
*/
filter = (chFilter *) ellLast(&chan->filters);
while (filter) {
long status = filter->plug->fif->channel_register_pre_eventq_cb(filter, nextcb, nextarg, &thiscb, &thisarg);
if (status == 0) {
nextcb = thiscb;
nextarg = thisarg;
while (filter && filter->plug->fif->channel_register_post_eventq) {
filter->plug->fif->channel_register_post_eventq(filter,
post_pe_next, post_nextarg,
st_next, st_nextarg,
&post_pe_this, &post_thisarg,
&st_this, &st_thisarg);
if (post_pe_this) {
post_pe_next = post_pe_this;
post_nextarg = post_thisarg;
}
if (st_this) {
st_next = st_this;
st_nextarg = st_thisarg;
}
filter = (chFilter *) ellPrevious(&filter->node);
}
chan->post_event_cb = nextcb;
chan->post_event_arg = nextarg;
chan->post_event_cb = post_pe_next;
chan->post_event_arg = post_nextarg;
filter = (chFilter *) ellLast(&chan->filters);
while (filter && filter->plug->fif->channel_register_pre_eventq) {
filter->plug->fif->channel_register_pre_eventq(filter,
pre_pe_next, pre_nextarg,
st_next, st_nextarg,
&pre_pe_this, &pre_thisarg,
&st_this, &st_thisarg);
if (pre_pe_this) {
pre_pe_next = pre_pe_this;
pre_nextarg = pre_thisarg;
}
if (st_this) {
st_next = st_this;
st_nextarg = st_thisarg;
}
filter = (chFilter *) ellPrevious(&filter->node);
}
chan->pre_event_cb = pre_pe_next;
chan->pre_event_arg = pre_nextarg;
/* Call filter chain to determine data type/size changes */
if (st_next) st_next(st_nextarg, chan->addr.no_elements, chan->addr.field_type, chan->addr.field_size);
finish:
return status;
@@ -481,6 +536,26 @@ short dbChannelElementSize(dbChannel *chan)
return chan->addr.field_size;
}
long dbChannelFinalElements(dbChannel *chan)
{
return chan->final_no_elements;
}
short dbChannelFinalFieldType(dbChannel *chan)
{
return chan->final_type;
}
short dbChannelFinalExportType(dbChannel *chan)
{
return chan->dbr_final_type;
}
short dbChannelFinalElementSize(dbChannel *chan)
{
return chan->final_element_size;
}
short dbChannelSpecial(dbChannel *chan)
{
return chan->addr.special;

View File

@@ -48,11 +48,20 @@ typedef void (chPostEventFunc)(void *pvt, struct evSubscrip *event, db_field_log
typedef struct dbChannel {
const char *name;
dbAddr addr;
long final_no_elements; /* final number of elements (arrays) */
short final_element_size; /* final size of element */
short final_type; /* final type of database field */
short dbr_final_type; /* final field type as seen by database request */
chPostEventFunc *pre_event_cb;
void *pre_event_arg;
chPostEventFunc *post_event_cb;
void *post_event_arg;
ELLLIST filters;
} dbChannel;
/* Prototype for the set type function that is called recursively in filter stacks */
typedef void (chSetTypeFunc)(void *pvt, long no_elements, short field_type, short element_size);
/* Return values from chFilterIf->parse_* routines: */
typedef enum {
parse_stop, parse_continue
@@ -93,7 +102,16 @@ typedef struct chFilterIf {
/* Channel operations: */
long (* channel_open)(chFilter *filter);
long (* channel_register_pre_eventq_cb)(chFilter *filter, chPostEventFunc *cb_in, void *arg_in, chPostEventFunc **cb_out, void**arg_out);
void (* channel_register_pre_eventq) (chFilter *filter,
chPostEventFunc *pe_in, void *arg_pe_in,
chSetTypeFunc *st_in, void *arg_st_in,
chPostEventFunc **pe_out, void **arg_pe_out,
chSetTypeFunc **st_out, void **arg_st_out);
void (* channel_register_post_eventq)(chFilter *filter,
chPostEventFunc *pe_in, void *arg_pe_in,
chSetTypeFunc *st_in, void *arg_st_in,
chPostEventFunc **pe_out, void **arg_pe_out,
chSetTypeFunc **st_out, void **arg_st_out);
void (* channel_report)(chFilter *filter, const char *intro, int level);
/* FIXME: More filter routines here ... */
void (* channel_close)(chFilter *filter);
@@ -128,6 +146,10 @@ epicsShareFunc long dbChannelElements(dbChannel *chan);
epicsShareFunc short dbChannelFieldType(dbChannel *chan);
epicsShareFunc short dbChannelExportType(dbChannel *chan);
epicsShareFunc short dbChannelElementSize(dbChannel *chan);
epicsShareFunc long dbChannelFinalElements(dbChannel *chan);
epicsShareFunc short dbChannelFinalFieldType(dbChannel *chan);
epicsShareFunc short dbChannelFinalExportType(dbChannel *chan);
epicsShareFunc short dbChannelFinalElementSize(dbChannel *chan);
epicsShareFunc short dbChannelSpecial(dbChannel *chan);
epicsShareFunc void * dbChannelField(dbChannel *chan);
epicsShareFunc long dbChannelGet(dbChannel *chan, short type,

View File

@@ -784,7 +784,7 @@ unsigned int caEventMask
if ( (dbChannelField(pevent->chan) == (void *)pField || pField==NULL) &&
(caEventMask & pevent->select)) {
/* Call the head of the filter chain */
pevent->chan->post_event_cb(pevent->chan->post_event_arg,
pevent->chan->pre_event_cb(pevent->chan->pre_event_arg,
pevent,
db_post_single_event_first(pevent));
}
@@ -806,7 +806,7 @@ void epicsShareAPI db_post_single_event (dbEventSubscription event)
dbScanLock (prec);
/* Call the head of the filter chain */
pevent->chan->post_event_cb(pevent->chan->post_event_arg,
pevent->chan->pre_event_cb(pevent->chan->pre_event_arg,
pevent,
db_post_single_event_first(pevent));

View File

@@ -74,6 +74,7 @@ typedef struct db_field_log {
unsigned short stat; /* Alarm Status */
unsigned short sevr; /* Alarm Severity */
short field_type; /* DBF type of data */
short field_size; /* Data size */
long no_elements; /* No of array elements */
union {
struct dbfl_val v;

View File

@@ -44,8 +44,14 @@ typedef struct myStruct {
int sent6;
char c;
char c1[2];
chPostEventFunc *callback;
void *arg;
chPostEventFunc *pe_pre;
void *pe_pre_arg;
chPostEventFunc *pe_post;
void *pe_post_arg;
chPostEventFunc *st_pre;
void *st_pre_arg;
chPostEventFunc *st_post;
void *st_post_arg;
} myStruct;
static const
@@ -167,13 +173,26 @@ static long channel_open(dbChannel *chan, void *user)
return c_open_return;
}
static long channelRegisterPreEventQueCB(dbChannel *chan, void *user, chPostEventFunc *cb_in, void* arg_in,
chPostEventFunc **cb_out, void **arg_out)
static void channelRegisterPreEventQue(dbChannel *chan, void *user,
chPostEventFunc *pe_in, void *pe_arg_in,
chSetTypeFunc *st_in, void *st_arg_in,
chPostEventFunc **pe_out, void **pe_arg_out,
chSetTypeFunc **st_out, void **st_arg_out)
{
myStruct *my = (myStruct*)user;
my->callback = cb_in;
my->arg = arg_in;
return 0;
my->pe_pre = pe_in;
my->pe_pre_arg = pe_arg_in;
}
static void channelRegisterPostEventQue(dbChannel *chan, void *user,
chPostEventFunc *pe_in, void *pe_arg_in,
chSetTypeFunc *st_in, void *st_arg_in,
chPostEventFunc **pe_out, void **pe_arg_out,
chSetTypeFunc **st_out, void **st_arg_out)
{
myStruct *my = (myStruct*)user;
my->pe_post = pe_in;
my->pe_post_arg = pe_arg_in;
}
static void channel_report(dbChannel *chan, void *user, const char *intro, int level)
@@ -198,7 +217,8 @@ static chfPluginIf myPif = {
parse_ok,
channel_open,
channelRegisterPreEventQueCB,
channelRegisterPreEventQue,
channelRegisterPostEventQue,
channel_report,
channel_close
};

View File

@@ -26,9 +26,10 @@
#define e_start_array 0x00000800
#define e_end_array 0x00001000
#define e_open 0x00002000
#define e_reg_pre_cb 0x00004000
#define e_report 0x00008000
#define e_close 0x00010000
#define e_reg_pre 0x00004000
#define e_reg_post 0x00008000
#define e_report 0x00010000
#define e_close 0x00020000
#define r_any (e_start | e_abort | e_end | \
e_null | e_boolean | e_integer | e_double | e_string | \
@@ -115,10 +116,17 @@ long c_open(chFilter *filter)
testOk(e & e_open, "channel_open called");
return 0;
}
long c_reg_pre_cb(chFilter *filter, chPostEventFunc *cb_in, void *arg_in, chPostEventFunc **cb_out, void **arg_out)
void c_reg_pre(chFilter *filter,
chPostEventFunc *pe_in, void *pe_arg_in, chSetTypeFunc *st_in, void *st_arg_in,
chPostEventFunc **pe_out, void **arg_out, chSetTypeFunc **st_out, void **st_arg_out)
{
testOk(e & e_reg_pre_cb, "channel_register_pre_event_queue_callback called");
return 0;
testOk(e & e_reg_pre, "channel_register_pre_event_queue called");
}
void c_reg_post(chFilter *filter,
chPostEventFunc *pe_in, void *pe_arg_in, chSetTypeFunc *st_in, void *st_arg_in,
chPostEventFunc **pe_out, void **arg_out, chSetTypeFunc **st_out, void **st_arg_out)
{
testOk(e & e_reg_post, "channel_register_post_event_queue called");
}
void c_report(chFilter *filter, const char *intro, int level)
{
@@ -132,7 +140,7 @@ void c_close(chFilter *filter)
chFilterIf testIf =
{ p_start, p_abort, p_end, p_null, p_boolean, p_integer, p_double,
p_string, p_start_map, p_map_key, p_end_map, p_start_array, p_end_array,
c_open, c_reg_pre_cb, c_report, c_close };
c_open, c_reg_pre, c_reg_post, c_report, c_close };
MAIN(dbChannelTest)
{

View File

@@ -1200,7 +1200,7 @@ static void claim_ciu_reply ( struct channel_in_use * pciu )
ca_uint32_t nElem;
long dbElem;
SEND_LOCK ( pciu->client );
dbElem = dbChannelElements(pciu->dbch);
dbElem = dbChannelFinalElements(pciu->dbch);
if ( dbElem < 0 ) {
nElem = 0;
}
@@ -1219,7 +1219,7 @@ static void claim_ciu_reply ( struct channel_in_use * pciu )
}
status = cas_copy_in_header (
pciu->client, CA_PROTO_CREATE_CHAN, 0u,
dbChannelExportType(pciu->dbch), nElem, pciu->cid,
dbChannelFinalExportType(pciu->dbch), nElem, pciu->cid,
pciu->sid, NULL );
if ( status == ECA_NORMAL ) {
cas_commit_msg ( pciu->client, 0u );
@@ -2253,6 +2253,7 @@ static int search_reply_udp ( caHdrLargeArray *mp, void *pPayload, struct client
*
* New versions dont alloc the channel in response
* to a search request.
* For these, allocation has been moved to claim_ciu_action().
*
* m_count, m_cid are already in host format...
*/
@@ -2282,16 +2283,16 @@ static int search_reply_udp ( caHdrLargeArray *mp, void *pPayload, struct client
return RSRV_OK;
}
sid = pchannel->sid;
if ( dbChannelElements(dbch) < 0 ) {
if ( dbChannelFinalElements(dbch) < 0 ) {
count = 0;
}
else if ( dbChannelElements(dbch) > 0xffff ) {
else if ( dbChannelFinalElements(dbch) > 0xffff ) {
count = 0xfffe;
}
else {
count = (ca_uint16_t) dbChannelElements(dbch);
count = (ca_uint16_t) dbChannelFinalElements(dbch);
}
type = (ca_uint16_t) dbChannelExportType(dbch);
type = (ca_uint16_t) dbChannelFinalExportType(dbch);
}
SEND_LOCK ( client );