Added decimation filter, documentation and tests

This commit is contained in:
Andrew Johnson
2019-06-04 12:23:13 -05:00
parent 96259b7bdc
commit deb9dbcd77
7 changed files with 454 additions and 0 deletions

View File

@@ -16,6 +16,23 @@
<!-- Insert new items immediately below here ... -->
<h3>Added new decimation channel filter</h3>
<p>A new server-side filter has been added to the IOC for reducing the number
and frequency of monitor updates from a channel by a client-specified factor.
The filter's behaviour is quite simplistic, it passes the first monitor event it
sees to the client and then drops the next N-1 events before passing another
event. For example to sample a 60Hz channel at 1Hz, a 10Hz channel every 6
seconds, or a 1Hz channel once every minute:</p>
<pre> Hal$ camonitor 'test:channel.{"dec":{"n":60}}'
...</pre>
<p>More information is included in the
<a href="filters.html">filters</a><!-- href for the EPICS website -->
<a href="../html/filters.html">documentation</a><!-- href for install tree -->
file.</p>
<h3>Cleaning up with Multiple CA contexts in a Process</h3>
<p>Bruno Martins reported a problem with the CA client library at shutdown in a

View File

@@ -15,6 +15,7 @@ dbRecStd_SRCS += ts.c
dbRecStd_SRCS += dbnd.c
dbRecStd_SRCS += arr.c
dbRecStd_SRCS += sync.c
dbRecStd_SRCS += decimate.c
HTMLS += filters.html

115
src/std/filters/decimate.c Normal file
View File

@@ -0,0 +1,115 @@
/*************************************************************************\
* Copyright (c) 2019 UChicago Argonne LLC, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2010 Brookhaven National Laboratory.
* Copyright (c) 2010 Helmholtz-Zentrum Berlin
* fuer Materialien und Energie GmbH.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* Authors: Ralph Lange <Ralph.Lange@bessy.de>,
* Andrew Johnson <anj@anl.gov>
*/
#include <stdio.h>
#include "freeList.h"
#include "db_field_log.h"
#include "chfPlugin.h"
#include "epicsExport.h"
typedef struct myStruct {
epicsInt32 n, i;
} myStruct;
static void *myStructFreeList;
static const
chfPluginArgDef opts[] = {
chfInt32(myStruct, n, "n", 1, 0),
chfPluginArgEnd
};
static void * allocPvt(void)
{
myStruct *my = (myStruct*) freeListCalloc(myStructFreeList);
return (void *) my;
}
static void freePvt(void *pvt)
{
freeListFree(myStructFreeList, pvt);
}
static int parse_ok(void *pvt)
{
myStruct *my = (myStruct*) pvt;
if (my->n < 1)
return -1;
return 0;
}
static db_field_log* filter(void* pvt, dbChannel *chan, db_field_log *pfl) {
db_field_log *passfl = NULL;
myStruct *my = (myStruct*) pvt;
epicsInt32 i = my->i;
if (pfl->ctx == dbfl_context_read)
return pfl;
if (i++ == 0)
passfl = pfl;
if (i >= my->n)
i = 0;
my->i = i;
return passfl;
}
static void channelRegisterPre(dbChannel *chan, void *pvt,
chPostEventFunc **cb_out, void **arg_out, db_field_log *probe)
{
*cb_out = filter;
*arg_out = pvt;
}
static void channel_report(dbChannel *chan, void *pvt, int level, const unsigned short indent)
{
myStruct *my = (myStruct*) pvt;
printf("%*sDecimate (dec): n=%d, i=%d\n", indent, "",
my->n, my->i);
}
static chfPluginIf pif = {
allocPvt,
freePvt,
NULL, /* parse_error, */
parse_ok,
NULL, /* channel_open, */
channelRegisterPre,
NULL, /* channelRegisterPost, */
channel_report,
NULL /* channel_close */
};
static void decInitialize(void)
{
static int firstTime = 1;
if (!firstTime) return;
firstTime = 0;
if (!myStructFreeList)
freeListInitPvt(&myStructFreeList, sizeof(myStruct), 64);
chfPluginRegister("dec", &pif, opts);
}
epicsExportRegistrar(decInitialize);

View File

@@ -14,6 +14,8 @@ The following filters are available in this release:
=item * L<Synchronize|/"Synchronize Filter sync">
=item * L<Decimation|/"Decimation Filter dec">
=back
=head2 Using Filters
@@ -245,3 +247,41 @@ periods only when "blue" is true by using
...
=cut
registrar(decInitialize)
=head3 Decimation Filter C<"dec">
This filter is used to reduce the number or rate of monitor updates from a
channel by an integer factor C<n> that is provided as a filter argument,
discarding the other updates. A true decimation following the original meaning
of the word would be achieved by giving C<n> as 10, to only allow every tenth
update through.
=head4 Parameters
=over
=item Number C<"n">
The decimation factor, a positive integer. Giving n=1 is equivalent to a no-op
that allows all updates to be passed to the client.
=back
This filter is intentionally very simplistic. It passes on the first monitor
event that it sees after the channel connects, then discards the next N-1 events
before sending the next event. If several clients connect to a channel using the
same filter settings they may see completely different data streams since each
client gets its own instance of the filter whose event counter starts when that
client connects.
=head4 Example
To sample a 60Hz channel at 1Hz, a 10Hz channel every 6 seconds or a 1Hz channel
once every minute:
Hal$ camonitor 'test:channel' 'test:channel.{"dec":{"n":60}}'
...
=cut

View File

@@ -56,6 +56,12 @@ syncTest_SRCS += filterTest_registerRecordDeviceDriver.cpp
testHarness_SRCS += syncTest.c
TESTS += syncTest
TESTPROD_HOST += decTest
decTest_SRCS += decTest.c
decTest_SRCS += filterTest_registerRecordDeviceDriver.cpp
testHarness_SRCS += decTest.c
TESTS += decTest
# epicsRunFilterTests runs all the test programs in a known working order.
testHarness_SRCS += epicsRunFilterTests.c

View File

@@ -0,0 +1,273 @@
/*************************************************************************\
* Copyright (c) 2019 UChicago Argonne LLC, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2010 Brookhaven National Laboratory.
* Copyright (c) 2010 Helmholtz-Zentrum Berlin
* fuer Materialien und Energie GmbH.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* Authors: Ralph Lange <Ralph.Lange@bessy.de>,
* Andrew Johnson <anj@anl.gov>
*/
#include <string.h>
#include "dbStaticLib.h"
#include "dbAccessDefs.h"
#include "db_field_log.h"
#include "dbCommon.h"
#include "dbChannel.h"
#include "registry.h"
#include "chfPlugin.h"
#include "errlog.h"
#include "dbmf.h"
#include "epicsUnitTest.h"
#include "dbUnitTest.h"
#include "epicsTime.h"
#include "testMain.h"
#include "osiFileName.h"
void filterTest_registerRecordDeviceDriver(struct dbBase *);
static int fl_equal(const db_field_log *pfl1, const db_field_log *pfl2) {
return !(memcmp(pfl1, pfl2, sizeof(db_field_log)));
}
static void fl_setup(dbChannel *chan, db_field_log *pfl, long val) {
struct dbCommon *prec = dbChannelRecord(chan);
pfl->ctx = dbfl_context_event;
pfl->type = dbfl_type_val;
pfl->stat = prec->stat;
pfl->sevr = prec->sevr;
pfl->time = prec->time;
pfl->field_type = DBF_LONG;
pfl->no_elements = 1;
/*
* use memcpy to avoid a bus error on
* union copy of char in the db at an odd
* address
*/
memcpy(&pfl->u.v.field,
dbChannelField(chan),
dbChannelFieldSize(chan));
pfl->u.v.field.dbf_long = val;
}
static void testHead (char* title) {
testDiag("--------------------------------------------------------");
testDiag("%s", title);
testDiag("--------------------------------------------------------");
}
static void mustDrop(dbChannel *pch, db_field_log *pfl, char* m) {
db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl);
testOk(NULL == pfl2, "filter drops field_log (%s)", m);
}
static void mustPass(dbChannel *pch, db_field_log *pfl, char* m) {
db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl);
testOk(pfl == pfl2, "filter passes field_log (%s)", m);
}
static void checkAndOpenChannel(dbChannel *pch, const chFilterPlugin *plug) {
ELLNODE *node;
chFilter *filter;
chPostEventFunc *cb_out = NULL;
void *arg_out = NULL;
db_field_log fl, fl1;
testDiag("Test filter structure and open channel");
testOk((ellCount(&pch->filters) == 1), "channel has one plugin");
fl_setup(pch, &fl, 1);
fl1 = fl;
node = ellFirst(&pch->filters);
filter = CONTAINER(node, chFilter, list_node);
plug->fif->channel_register_pre(filter, &cb_out, &arg_out, &fl1);
testOk(cb_out && arg_out,
"register_pre registers one filter with argument");
testOk(fl_equal(&fl1, &fl),
"register_pre does not change field_log data type");
testOk(!(dbChannelOpen(pch)), "dbChannel with plugin dec opened");
node = ellFirst(&pch->pre_chain);
filter = CONTAINER(node, chFilter, pre_node);
testOk((ellCount(&pch->pre_chain) == 1 && filter->pre_arg != NULL),
"dec has one filter with argument in pre chain");
testOk((ellCount(&pch->post_chain) == 0),
"sync has no filter in post chain");
}
MAIN(decTest)
{
dbChannel *pch;
const chFilterPlugin *plug;
char myname[] = "dec";
db_field_log *pfl[10];
int i;
dbEventCtx evtctx;
testPlan(68);
testdbPrepare();
testdbReadDatabase("filterTest.dbd", NULL, NULL);
filterTest_registerRecordDeviceDriver(pdbbase);
testdbReadDatabase("xRecord.db", NULL, NULL);
eltc(0);
testIocInitOk();
eltc(1);
evtctx = db_init_events();
testOk(!!(plug = dbFindFilter(myname, strlen(myname))),
"plugin '%s' registered correctly", myname);
/* N < 1 */
testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":-1}}")),
"dbChannel with dec (n=-1) failed");
testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":0}}")),
"dbChannel with dec (n=0) failed");
/* Bad parms */
testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{}}")),
"dbChannel with dec (no parm) failed");
testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"x\":true}}")),
"dbChannel with dec (x=true) failed");
/* No Decimation (N=1) */
testHead("No Decimation (n=1)");
testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":1}}")),
"dbChannel with plugin dec (n=1) created");
checkAndOpenChannel(pch, plug);
for (i = 0; i < 5; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 10 + i);
}
testDiag("Test event stream");
mustPass(pch, pfl[0], "i=0");
mustPass(pch, pfl[1], "i=1");
mustPass(pch, pfl[2], "i=2");
mustPass(pch, pfl[3], "i=3");
mustPass(pch, pfl[4], "i=4");
for (i = 0; i < 5; i++)
db_delete_field_log(pfl[i]);
dbChannelDelete(pch);
/* Decimation (N=2) */
testHead("Decimation (n=2)");
testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":2}}")),
"dbChannel with plugin dec (n=2) created");
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 20 + i);
}
testDiag("Test event stream");
mustPass(pch, pfl[0], "i=0");
mustDrop(pch, pfl[1], "i=1");
mustPass(pch, pfl[2], "i=2");
mustDrop(pch, pfl[3], "i=3");
mustPass(pch, pfl[4], "i=4");
mustDrop(pch, pfl[5], "i=5");
mustPass(pch, pfl[6], "i=6");
mustDrop(pch, pfl[7], "i=7");
mustPass(pch, pfl[8], "i=8");
mustDrop(pch, pfl[9], "i=9");
for (i = 0; i < 10; i++)
db_delete_field_log(pfl[i]);
dbChannelDelete(pch);
/* Decimation (N=3) */
testHead("Decimation (n=3)");
testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":3}}")),
"dbChannel with plugin dec (n=3) created");
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 30 + i);
}
testDiag("Test event stream");
mustPass(pch, pfl[0], "i=0");
mustDrop(pch, pfl[1], "i=1");
mustDrop(pch, pfl[2], "i=2");
mustPass(pch, pfl[3], "i=3");
mustDrop(pch, pfl[4], "i=4");
mustDrop(pch, pfl[5], "i=5");
mustPass(pch, pfl[6], "i=6");
mustDrop(pch, pfl[7], "i=7");
mustDrop(pch, pfl[8], "i=8");
mustPass(pch, pfl[9], "i=9");
for (i = 0; i < 10; i++)
db_delete_field_log(pfl[i]);
dbChannelDelete(pch);
/* Decimation (N=4) */
testHead("Decimation (n=4)");
testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":4}}")),
"dbChannel with plugin dec (n=4) created");
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 40 + i);
}
testDiag("Test event stream");
mustPass(pch, pfl[0], "i=0");
mustDrop(pch, pfl[1], "i=1");
mustDrop(pch, pfl[2], "i=2");
mustDrop(pch, pfl[3], "i=3");
mustPass(pch, pfl[4], "i=4");
mustDrop(pch, pfl[5], "i=5");
mustDrop(pch, pfl[6], "i=6");
mustDrop(pch, pfl[7], "i=7");
mustPass(pch, pfl[8], "i=8");
mustDrop(pch, pfl[9], "i=9");
for (i = 0; i < 10; i++)
db_delete_field_log(pfl[i]);
dbChannelDelete(pch);
db_close_events(evtctx);
testIocShutdownOk();
testdbCleanup();
return testDone();
}

View File

@@ -17,6 +17,7 @@ int tsTest(void);
int dbndTest(void);
int syncTest(void);
int arrTest(void);
int decTest(void);
void epicsRunFilterTests(void)
{
@@ -26,6 +27,7 @@ void epicsRunFilterTests(void)
runTest(dbndTest);
runTest(syncTest);
runTest(arrTest);
runTest(decTest);
dbmfFreeChunks();