diff --git a/documentation/RELEASE_NOTES.html b/documentation/RELEASE_NOTES.html index 8ed4e04bf..c7fbc02b2 100644 --- a/documentation/RELEASE_NOTES.html +++ b/documentation/RELEASE_NOTES.html @@ -16,6 +16,23 @@ +

Added new decimation channel filter

+ +

A new server-side filter has been added to the IOC for reducing the number +and frequency of monitor updates from a channel by a client-specified factor. +The filter's behaviour is quite simplistic, it passes the first monitor event it +sees to the client and then drops the next N-1 events before passing another +event. For example to sample a 60Hz channel at 1Hz, a 10Hz channel every 6 +seconds, or a 1Hz channel once every minute:

+ +
    Hal$ camonitor 'test:channel.{"dec":{"n":60}}'
+    ...
+ +

More information is included in the +filters +documentation +file.

+

Cleaning up with Multiple CA contexts in a Process

Bruno Martins reported a problem with the CA client library at shutdown in a diff --git a/src/std/filters/Makefile b/src/std/filters/Makefile index d4539898f..3a27361f6 100644 --- a/src/std/filters/Makefile +++ b/src/std/filters/Makefile @@ -15,6 +15,7 @@ dbRecStd_SRCS += ts.c dbRecStd_SRCS += dbnd.c dbRecStd_SRCS += arr.c dbRecStd_SRCS += sync.c +dbRecStd_SRCS += decimate.c HTMLS += filters.html diff --git a/src/std/filters/decimate.c b/src/std/filters/decimate.c new file mode 100644 index 000000000..d34884ebe --- /dev/null +++ b/src/std/filters/decimate.c @@ -0,0 +1,115 @@ +/*************************************************************************\ +* Copyright (c) 2019 UChicago Argonne LLC, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2010 Brookhaven National Laboratory. +* Copyright (c) 2010 Helmholtz-Zentrum Berlin +* fuer Materialien und Energie GmbH. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +/* + * Authors: Ralph Lange , + * Andrew Johnson + */ + +#include + +#include "freeList.h" +#include "db_field_log.h" +#include "chfPlugin.h" +#include "epicsExport.h" + +typedef struct myStruct { + epicsInt32 n, i; +} myStruct; + +static void *myStructFreeList; + +static const +chfPluginArgDef opts[] = { + chfInt32(myStruct, n, "n", 1, 0), + chfPluginArgEnd +}; + +static void * allocPvt(void) +{ + myStruct *my = (myStruct*) freeListCalloc(myStructFreeList); + return (void *) my; +} + +static void freePvt(void *pvt) +{ + freeListFree(myStructFreeList, pvt); +} + +static int parse_ok(void *pvt) +{ + myStruct *my = (myStruct*) pvt; + + if (my->n < 1) + return -1; + + return 0; +} + +static db_field_log* filter(void* pvt, dbChannel *chan, db_field_log *pfl) { + db_field_log *passfl = NULL; + myStruct *my = (myStruct*) pvt; + epicsInt32 i = my->i; + + if (pfl->ctx == dbfl_context_read) + return pfl; + + if (i++ == 0) + passfl = pfl; + + if (i >= my->n) + i = 0; + + my->i = i; + return passfl; +} + +static void channelRegisterPre(dbChannel *chan, void *pvt, + chPostEventFunc **cb_out, void **arg_out, db_field_log *probe) +{ + *cb_out = filter; + *arg_out = pvt; +} + +static void channel_report(dbChannel *chan, void *pvt, int level, const unsigned short indent) +{ + myStruct *my = (myStruct*) pvt; + printf("%*sDecimate (dec): n=%d, i=%d\n", indent, "", + my->n, my->i); +} + +static chfPluginIf pif = { + allocPvt, + freePvt, + + NULL, /* parse_error, */ + parse_ok, + + NULL, /* channel_open, */ + channelRegisterPre, + NULL, /* channelRegisterPost, */ + channel_report, + NULL /* channel_close */ +}; + +static void decInitialize(void) +{ + static int firstTime = 1; + + if (!firstTime) return; + firstTime = 0; + + if (!myStructFreeList) + freeListInitPvt(&myStructFreeList, sizeof(myStruct), 64); + + chfPluginRegister("dec", &pif, opts); +} + +epicsExportRegistrar(decInitialize); diff --git a/src/std/filters/filters.dbd.pod b/src/std/filters/filters.dbd.pod index f1a848469..d352d0e5c 100644 --- a/src/std/filters/filters.dbd.pod +++ b/src/std/filters/filters.dbd.pod @@ -14,6 +14,8 @@ The following filters are available in this release: =item * L +=item * L + =back =head2 Using Filters @@ -245,3 +247,41 @@ periods only when "blue" is true by using ... =cut + +registrar(decInitialize) + +=head3 Decimation Filter C<"dec"> + +This filter is used to reduce the number or rate of monitor updates from a +channel by an integer factor C that is provided as a filter argument, +discarding the other updates. A true decimation following the original meaning +of the word would be achieved by giving C as 10, to only allow every tenth +update through. + +=head4 Parameters + +=over + +=item Number C<"n"> + +The decimation factor, a positive integer. Giving n=1 is equivalent to a no-op +that allows all updates to be passed to the client. + +=back + +This filter is intentionally very simplistic. It passes on the first monitor +event that it sees after the channel connects, then discards the next N-1 events +before sending the next event. If several clients connect to a channel using the +same filter settings they may see completely different data streams since each +client gets its own instance of the filter whose event counter starts when that +client connects. + +=head4 Example + +To sample a 60Hz channel at 1Hz, a 10Hz channel every 6 seconds or a 1Hz channel +once every minute: + + Hal$ camonitor 'test:channel' 'test:channel.{"dec":{"n":60}}' + ... + +=cut diff --git a/src/std/filters/test/Makefile b/src/std/filters/test/Makefile index 6e6ad79c6..ad77c5d11 100644 --- a/src/std/filters/test/Makefile +++ b/src/std/filters/test/Makefile @@ -56,6 +56,12 @@ syncTest_SRCS += filterTest_registerRecordDeviceDriver.cpp testHarness_SRCS += syncTest.c TESTS += syncTest +TESTPROD_HOST += decTest +decTest_SRCS += decTest.c +decTest_SRCS += filterTest_registerRecordDeviceDriver.cpp +testHarness_SRCS += decTest.c +TESTS += decTest + # epicsRunFilterTests runs all the test programs in a known working order. testHarness_SRCS += epicsRunFilterTests.c diff --git a/src/std/filters/test/decTest.c b/src/std/filters/test/decTest.c new file mode 100644 index 000000000..cc9317eb4 --- /dev/null +++ b/src/std/filters/test/decTest.c @@ -0,0 +1,273 @@ +/*************************************************************************\ +* Copyright (c) 2019 UChicago Argonne LLC, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2010 Brookhaven National Laboratory. +* Copyright (c) 2010 Helmholtz-Zentrum Berlin +* fuer Materialien und Energie GmbH. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +/* + * Authors: Ralph Lange , + * Andrew Johnson + */ + +#include + +#include "dbStaticLib.h" +#include "dbAccessDefs.h" +#include "db_field_log.h" +#include "dbCommon.h" +#include "dbChannel.h" +#include "registry.h" +#include "chfPlugin.h" +#include "errlog.h" +#include "dbmf.h" +#include "epicsUnitTest.h" +#include "dbUnitTest.h" +#include "epicsTime.h" +#include "testMain.h" +#include "osiFileName.h" + +void filterTest_registerRecordDeviceDriver(struct dbBase *); + +static int fl_equal(const db_field_log *pfl1, const db_field_log *pfl2) { + return !(memcmp(pfl1, pfl2, sizeof(db_field_log))); +} + +static void fl_setup(dbChannel *chan, db_field_log *pfl, long val) { + struct dbCommon *prec = dbChannelRecord(chan); + + pfl->ctx = dbfl_context_event; + pfl->type = dbfl_type_val; + pfl->stat = prec->stat; + pfl->sevr = prec->sevr; + pfl->time = prec->time; + pfl->field_type = DBF_LONG; + pfl->no_elements = 1; + /* + * use memcpy to avoid a bus error on + * union copy of char in the db at an odd + * address + */ + memcpy(&pfl->u.v.field, + dbChannelField(chan), + dbChannelFieldSize(chan)); + pfl->u.v.field.dbf_long = val; +} + +static void testHead (char* title) { + testDiag("--------------------------------------------------------"); + testDiag("%s", title); + testDiag("--------------------------------------------------------"); +} + +static void mustDrop(dbChannel *pch, db_field_log *pfl, char* m) { + db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl); + + testOk(NULL == pfl2, "filter drops field_log (%s)", m); +} + +static void mustPass(dbChannel *pch, db_field_log *pfl, char* m) { + db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl); + + testOk(pfl == pfl2, "filter passes field_log (%s)", m); +} + +static void checkAndOpenChannel(dbChannel *pch, const chFilterPlugin *plug) { + ELLNODE *node; + chFilter *filter; + chPostEventFunc *cb_out = NULL; + void *arg_out = NULL; + db_field_log fl, fl1; + + testDiag("Test filter structure and open channel"); + + testOk((ellCount(&pch->filters) == 1), "channel has one plugin"); + + fl_setup(pch, &fl, 1); + fl1 = fl; + node = ellFirst(&pch->filters); + filter = CONTAINER(node, chFilter, list_node); + plug->fif->channel_register_pre(filter, &cb_out, &arg_out, &fl1); + testOk(cb_out && arg_out, + "register_pre registers one filter with argument"); + testOk(fl_equal(&fl1, &fl), + "register_pre does not change field_log data type"); + + testOk(!(dbChannelOpen(pch)), "dbChannel with plugin dec opened"); + node = ellFirst(&pch->pre_chain); + filter = CONTAINER(node, chFilter, pre_node); + testOk((ellCount(&pch->pre_chain) == 1 && filter->pre_arg != NULL), + "dec has one filter with argument in pre chain"); + testOk((ellCount(&pch->post_chain) == 0), + "sync has no filter in post chain"); +} + +MAIN(decTest) +{ + dbChannel *pch; + const chFilterPlugin *plug; + char myname[] = "dec"; + db_field_log *pfl[10]; + int i; + dbEventCtx evtctx; + + testPlan(68); + + testdbPrepare(); + + testdbReadDatabase("filterTest.dbd", NULL, NULL); + + filterTest_registerRecordDeviceDriver(pdbbase); + + testdbReadDatabase("xRecord.db", NULL, NULL); + + eltc(0); + testIocInitOk(); + eltc(1); + + evtctx = db_init_events(); + + testOk(!!(plug = dbFindFilter(myname, strlen(myname))), + "plugin '%s' registered correctly", myname); + + /* N < 1 */ + testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":-1}}")), + "dbChannel with dec (n=-1) failed"); + testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":0}}")), + "dbChannel with dec (n=0) failed"); + /* Bad parms */ + testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{}}")), + "dbChannel with dec (no parm) failed"); + testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"x\":true}}")), + "dbChannel with dec (x=true) failed"); + + /* No Decimation (N=1) */ + + testHead("No Decimation (n=1)"); + testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":1}}")), + "dbChannel with plugin dec (n=1) created"); + + checkAndOpenChannel(pch, plug); + + for (i = 0; i < 5; i++) { + pfl[i] = db_create_read_log(pch); + fl_setup(pch, pfl[i], 10 + i); + } + + testDiag("Test event stream"); + + mustPass(pch, pfl[0], "i=0"); + mustPass(pch, pfl[1], "i=1"); + mustPass(pch, pfl[2], "i=2"); + mustPass(pch, pfl[3], "i=3"); + mustPass(pch, pfl[4], "i=4"); + + for (i = 0; i < 5; i++) + db_delete_field_log(pfl[i]); + + dbChannelDelete(pch); + + /* Decimation (N=2) */ + + testHead("Decimation (n=2)"); + testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":2}}")), + "dbChannel with plugin dec (n=2) created"); + + checkAndOpenChannel(pch, plug); + + for (i = 0; i < 10; i++) { + pfl[i] = db_create_read_log(pch); + fl_setup(pch, pfl[i], 20 + i); + } + + testDiag("Test event stream"); + + mustPass(pch, pfl[0], "i=0"); + mustDrop(pch, pfl[1], "i=1"); + mustPass(pch, pfl[2], "i=2"); + mustDrop(pch, pfl[3], "i=3"); + mustPass(pch, pfl[4], "i=4"); + mustDrop(pch, pfl[5], "i=5"); + mustPass(pch, pfl[6], "i=6"); + mustDrop(pch, pfl[7], "i=7"); + mustPass(pch, pfl[8], "i=8"); + mustDrop(pch, pfl[9], "i=9"); + + for (i = 0; i < 10; i++) + db_delete_field_log(pfl[i]); + + dbChannelDelete(pch); + + /* Decimation (N=3) */ + + testHead("Decimation (n=3)"); + testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":3}}")), + "dbChannel with plugin dec (n=3) created"); + + checkAndOpenChannel(pch, plug); + + for (i = 0; i < 10; i++) { + pfl[i] = db_create_read_log(pch); + fl_setup(pch, pfl[i], 30 + i); + } + + testDiag("Test event stream"); + + mustPass(pch, pfl[0], "i=0"); + mustDrop(pch, pfl[1], "i=1"); + mustDrop(pch, pfl[2], "i=2"); + mustPass(pch, pfl[3], "i=3"); + mustDrop(pch, pfl[4], "i=4"); + mustDrop(pch, pfl[5], "i=5"); + mustPass(pch, pfl[6], "i=6"); + mustDrop(pch, pfl[7], "i=7"); + mustDrop(pch, pfl[8], "i=8"); + mustPass(pch, pfl[9], "i=9"); + + for (i = 0; i < 10; i++) + db_delete_field_log(pfl[i]); + + dbChannelDelete(pch); + + /* Decimation (N=4) */ + + testHead("Decimation (n=4)"); + testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":4}}")), + "dbChannel with plugin dec (n=4) created"); + + checkAndOpenChannel(pch, plug); + + for (i = 0; i < 10; i++) { + pfl[i] = db_create_read_log(pch); + fl_setup(pch, pfl[i], 40 + i); + } + + testDiag("Test event stream"); + + mustPass(pch, pfl[0], "i=0"); + mustDrop(pch, pfl[1], "i=1"); + mustDrop(pch, pfl[2], "i=2"); + mustDrop(pch, pfl[3], "i=3"); + mustPass(pch, pfl[4], "i=4"); + mustDrop(pch, pfl[5], "i=5"); + mustDrop(pch, pfl[6], "i=6"); + mustDrop(pch, pfl[7], "i=7"); + mustPass(pch, pfl[8], "i=8"); + mustDrop(pch, pfl[9], "i=9"); + + for (i = 0; i < 10; i++) + db_delete_field_log(pfl[i]); + + dbChannelDelete(pch); + + db_close_events(evtctx); + + testIocShutdownOk(); + + testdbCleanup(); + + return testDone(); +} diff --git a/src/std/filters/test/epicsRunFilterTests.c b/src/std/filters/test/epicsRunFilterTests.c index 236364391..5737d77dc 100644 --- a/src/std/filters/test/epicsRunFilterTests.c +++ b/src/std/filters/test/epicsRunFilterTests.c @@ -17,6 +17,7 @@ int tsTest(void); int dbndTest(void); int syncTest(void); int arrTest(void); +int decTest(void); void epicsRunFilterTests(void) { @@ -26,6 +27,7 @@ void epicsRunFilterTests(void) runTest(dbndTest); runTest(syncTest); runTest(arrTest); + runTest(decTest); dbmfFreeChunks();