Merge decimate filter branch

This commit is contained in:
Andrew Johnson
2019-10-21 22:56:05 -05:00
12 changed files with 655 additions and 71 deletions

View File

@@ -6,6 +6,24 @@ This version of EPICS Base has not been released yet.
<!-- Insert new items immediately below here ... -->
### Added new decimation channel filter
A new server-side filter has been added to the IOC for reducing the number
and frequency of monitor updates from a channel by a client-specified factor.
The filter's behaviour is quite simplistic, it passes the first monitor event it
sees to the client and then drops the next N-1 events before passing another
event. For example to sample a 60Hz channel at 1Hz, a 10Hz channel every 6
seconds, or a 1Hz channel once every minute:
```
Hal$ camonitor 'test:channel.{"dec":{"n":60}}'
...
```
More information is included in the filters documentation, which can be found
[here](filters.html) or [here](../html/filters.html) depending on where you're
reading this document from.
### Imported Record Reference Documentation from Wiki
The remaining record types that had 3.14 reference documentation in the EPICS

View File

@@ -1165,3 +1165,8 @@ void db_delete_field_log (db_field_log *pfl)
freeListFree(dbevFieldLogFreeList, pfl);
}
}
int db_available_logs(void)
{
return (int) freeListItemsAvail(dbevFieldLogFreeList);
}

View File

@@ -78,6 +78,7 @@ epicsShareFunc void db_event_disable (dbEventSubscription es);
epicsShareFunc struct db_field_log* db_create_event_log (struct evSubscrip *pevent);
epicsShareFunc struct db_field_log* db_create_read_log (struct dbChannel *chan);
epicsShareFunc void db_delete_field_log (struct db_field_log *pfl);
epicsShareFunc int db_available_logs(void);
#define DB_EVENT_OK 0
#define DB_EVENT_ERROR (-1)
@@ -87,4 +88,3 @@ epicsShareFunc void db_delete_field_log (struct db_field_log *pfl);
#endif
#endif /*INCLdbEventh*/

View File

@@ -15,6 +15,7 @@ dbRecStd_SRCS += ts.c
dbRecStd_SRCS += dbnd.c
dbRecStd_SRCS += arr.c
dbRecStd_SRCS += sync.c
dbRecStd_SRCS += decimate.c
HTMLS += filters.html

117
src/std/filters/decimate.c Normal file
View File

@@ -0,0 +1,117 @@
/*************************************************************************\
* Copyright (c) 2019 UChicago Argonne LLC, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2010 Brookhaven National Laboratory.
* Copyright (c) 2010 Helmholtz-Zentrum Berlin
* fuer Materialien und Energie GmbH.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* Authors: Ralph Lange <Ralph.Lange@bessy.de>,
* Andrew Johnson <anj@anl.gov>
*/
#include <stdio.h>
#include "freeList.h"
#include "db_field_log.h"
#include "chfPlugin.h"
#include "epicsExport.h"
typedef struct myStruct {
epicsInt32 n, i;
} myStruct;
static void *myStructFreeList;
static const
chfPluginArgDef opts[] = {
chfInt32(myStruct, n, "n", 1, 0),
chfPluginArgEnd
};
static void * allocPvt(void)
{
myStruct *my = (myStruct*) freeListCalloc(myStructFreeList);
return (void *) my;
}
static void freePvt(void *pvt)
{
freeListFree(myStructFreeList, pvt);
}
static int parse_ok(void *pvt)
{
myStruct *my = (myStruct*) pvt;
if (my->n < 1)
return -1;
return 0;
}
static db_field_log* filter(void* pvt, dbChannel *chan, db_field_log *pfl) {
db_field_log *passfl = NULL;
myStruct *my = (myStruct*) pvt;
epicsInt32 i = my->i;
if (pfl->ctx == dbfl_context_read)
return pfl;
if (i++ == 0)
passfl = pfl;
else
db_delete_field_log(pfl);
if (i >= my->n)
i = 0;
my->i = i;
return passfl;
}
static void channelRegisterPre(dbChannel *chan, void *pvt,
chPostEventFunc **cb_out, void **arg_out, db_field_log *probe)
{
*cb_out = filter;
*arg_out = pvt;
}
static void channel_report(dbChannel *chan, void *pvt, int level, const unsigned short indent)
{
myStruct *my = (myStruct*) pvt;
printf("%*sDecimate (dec): n=%d, i=%d\n", indent, "",
my->n, my->i);
}
static chfPluginIf pif = {
allocPvt,
freePvt,
NULL, /* parse_error, */
parse_ok,
NULL, /* channel_open, */
channelRegisterPre,
NULL, /* channelRegisterPost, */
channel_report,
NULL /* channel_close */
};
static void decInitialize(void)
{
static int firstTime = 1;
if (!firstTime) return;
firstTime = 0;
if (!myStructFreeList)
freeListInitPvt(&myStructFreeList, sizeof(myStruct), 64);
chfPluginRegister("dec", &pif, opts);
}
epicsExportRegistrar(decInitialize);

View File

@@ -14,6 +14,8 @@ The following filters are available in this release:
=item * L<Synchronize|/"Synchronize Filter sync">
=item * L<Decimation|/"Decimation Filter dec">
=back
=head2 Using Filters
@@ -245,3 +247,41 @@ periods only when "blue" is true by using
...
=cut
registrar(decInitialize)
=head3 Decimation Filter C<"dec">
This filter is used to reduce the number or rate of monitor updates from a
channel by an integer factor C<n> that is provided as a filter argument,
discarding the other updates. A true decimation following the original meaning
of the word would be achieved by giving C<n> as 10, to only allow every tenth
update through.
=head4 Parameters
=over
=item Number C<"n">
The decimation factor, a positive integer. Giving n=1 is equivalent to a no-op
that allows all updates to be passed to the client.
=back
This filter is intentionally very simplistic. It passes on the first monitor
event that it sees after the channel connects, then discards the next N-1 events
before sending the next event. If several clients connect to a channel using the
same filter settings they may see completely different data streams since each
client gets its own instance of the filter whose event counter starts when that
client connects.
=head4 Example
To sample a 60Hz channel at 1Hz, a 10Hz channel every 6 seconds or a 1Hz channel
once every minute:
Hal$ camonitor 'test:channel' 'test:channel.{"dec":{"n":60}}'
...
=cut

View File

@@ -109,7 +109,9 @@ static db_field_log* filter(void* pvt, dbChannel *chan, db_field_log *pfl) {
passfl = pfl;
pfl = NULL;
}
break;
else
db_delete_field_log(pfl);
goto save_state;
case syncModeLast:
if (!actstate && my->laststate) {
passfl = my->lastfl;
@@ -121,28 +123,34 @@ static db_field_log* filter(void* pvt, dbChannel *chan, db_field_log *pfl) {
passfl = pfl;
pfl = NULL;
}
break;
else
db_delete_field_log(pfl);
goto save_state;
case syncModeWhile:
if (actstate) {
if (actstate)
passfl = pfl;
}
else
db_delete_field_log(pfl);
goto no_shift;
case syncModeUnless:
if (!actstate) {
if (!actstate)
passfl = pfl;
}
else
db_delete_field_log(pfl);
goto no_shift;
}
if (my->lastfl)
db_delete_field_log(my->lastfl);
my->lastfl = pfl;
my->laststate = actstate;
/* since no copy is made we can't keep a reference to the returned fl */
assert(my->lastfl != passfl);
no_shift:
save_state:
my->laststate = actstate;
no_shift:
return passfl;
}

View File

@@ -56,6 +56,12 @@ syncTest_SRCS += filterTest_registerRecordDeviceDriver.cpp
testHarness_SRCS += syncTest.c
TESTS += syncTest
TESTPROD_HOST += decTest
decTest_SRCS += decTest.c
decTest_SRCS += filterTest_registerRecordDeviceDriver.cpp
testHarness_SRCS += decTest.c
TESTS += decTest
# epicsRunFilterTests runs all the test programs in a known working order.
testHarness_SRCS += epicsRunFilterTests.c

View File

@@ -39,12 +39,14 @@ static int fl_equal(const db_field_log *pfl1, const db_field_log *pfl2) {
static void fl_setup(dbChannel *chan, db_field_log *pfl) {
struct dbCommon *prec = dbChannelRecord(chan);
memset(pfl, 0, sizeof(db_field_log));
pfl->ctx = dbfl_context_read;
pfl->type = dbfl_type_val;
pfl->stat = prec->stat;
pfl->sevr = prec->sevr;
pfl->time = prec->time;
pfl->field_type = dbChannelFieldType(chan);
pfl->field_size = dbChannelFieldSize(chan);
pfl->no_elements = dbChannelElements(chan);
/*
* use memcpy to avoid a bus error on
@@ -62,6 +64,7 @@ static void changeValue(db_field_log *pfl2, long val) {
}
static void mustPassOnce(dbChannel *pch, db_field_log *pfl2, char* m, double d, long val) {
int oldFree = db_available_logs(), newFree;
db_field_log *pfl;
changeValue(pfl2, val);
@@ -71,18 +74,26 @@ static void mustPassOnce(dbChannel *pch, db_field_log *pfl2, char* m, double d,
testOk(fl_equal(pfl, pfl2), "call 1 does not change field_log data");
pfl = dbChannelRunPreChain(pch, pfl2);
testOk(NULL == pfl, "call 2 drops field_log");
newFree = db_available_logs();
testOk(newFree == oldFree + 1, "field_log was freed - %d+1 => %d",
oldFree, newFree);
}
static void mustDrop(dbChannel *pch, db_field_log *pfl2, char* m, double d, long val) {
int oldFree = db_available_logs(), newFree;
db_field_log *pfl;
changeValue(pfl2, val);
testDiag("mode=%s delta=%g filter must drop", m, d);
pfl = dbChannelRunPreChain(pch, pfl2);
testOk(NULL == pfl, "call 1 drops field_log");
newFree = db_available_logs();
testOk(newFree == oldFree + 1, "field_log was freed - %d+1 => %d",
oldFree, newFree);
}
static void mustPassTwice(dbChannel *pch, db_field_log *pfl2, char* m, double d, long val) {
int oldFree = db_available_logs(), newFree;
db_field_log *pfl;
changeValue(pfl2, val);
@@ -93,6 +104,9 @@ static void mustPassTwice(dbChannel *pch, db_field_log *pfl2, char* m, double d,
pfl = dbChannelRunPreChain(pch, pfl2);
testOk(pfl2 == pfl, "call 2 does not drop or replace field_log");
testOk(fl_equal(pfl, pfl2), "call 2 does not change field_log data");
newFree = db_available_logs();
testOk(newFree == oldFree, "field_log was not freed - %d => %d",
oldFree, newFree);
}
static void testHead (char* title) {
@@ -113,8 +127,9 @@ MAIN(dbndTest)
db_field_log *pfl2;
db_field_log fl1;
dbEventCtx evtctx;
int logsFree, logsFinal;
testPlan(59);
testPlan(77);
testdbPrepare();
@@ -135,6 +150,11 @@ MAIN(dbndTest)
testOk(!!(pch = dbChannelCreate("x.VAL{\"dbnd\":{}}")), "dbChannel with plugin dbnd (delta=0) created");
testOk((ellCount(&pch->filters) == 1), "channel has one plugin");
/* Start the free-list */
db_delete_field_log(db_create_read_log(pch));
logsFree = db_available_logs();
testDiag("%d field_logs on free-list", logsFree);
memset(&fl, PATTERN, sizeof(fl));
fl1 = fl;
node = ellFirst(&pch->filters);
@@ -176,6 +196,8 @@ MAIN(dbndTest)
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* Delta = -1: pass any update */
testHead("Delta = -1: pass any update");
@@ -192,6 +214,8 @@ MAIN(dbndTest)
db_delete_field_log(pfl2);
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* Delta = absolute */
testHead("Delta = absolute");
@@ -224,6 +248,8 @@ MAIN(dbndTest)
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* Delta = relative */
testHead("Delta = relative");
@@ -275,6 +301,9 @@ MAIN(dbndTest)
dbChannelDelete(pch);
logsFinal = db_available_logs();
testOk(logsFree == logsFinal, "%d field_logs on free-list", logsFinal);
db_close_events(evtctx);
testIocShutdownOk();

View File

@@ -0,0 +1,289 @@
/*************************************************************************\
* Copyright (c) 2019 UChicago Argonne LLC, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2010 Brookhaven National Laboratory.
* Copyright (c) 2010 Helmholtz-Zentrum Berlin
* fuer Materialien und Energie GmbH.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* Authors: Ralph Lange <Ralph.Lange@bessy.de>,
* Andrew Johnson <anj@anl.gov>
*/
#include <string.h>
#include "dbStaticLib.h"
#include "dbAccessDefs.h"
#include "db_field_log.h"
#include "dbCommon.h"
#include "dbChannel.h"
#include "registry.h"
#include "chfPlugin.h"
#include "errlog.h"
#include "dbmf.h"
#include "epicsUnitTest.h"
#include "dbUnitTest.h"
#include "epicsTime.h"
#include "testMain.h"
#include "osiFileName.h"
void filterTest_registerRecordDeviceDriver(struct dbBase *);
static int fl_equal(const db_field_log *pfl1, const db_field_log *pfl2) {
return !(memcmp(pfl1, pfl2, sizeof(db_field_log)));
}
static void fl_setup(dbChannel *chan, db_field_log *pfl, long val) {
struct dbCommon *prec = dbChannelRecord(chan);
memset(pfl, 0, sizeof(db_field_log));
pfl->ctx = dbfl_context_event;
pfl->type = dbfl_type_val;
pfl->stat = prec->stat;
pfl->sevr = prec->sevr;
pfl->time = prec->time;
pfl->field_type = DBF_LONG;
pfl->field_size = sizeof(epicsInt32);
pfl->no_elements = 1;
/*
* use memcpy to avoid a bus error on
* union copy of char in the db at an odd
* address
*/
memcpy(&pfl->u.v.field,
dbChannelField(chan),
dbChannelFieldSize(chan));
pfl->u.v.field.dbf_long = val;
}
static void testHead (char* title) {
testDiag("--------------------------------------------------------");
testDiag("%s", title);
testDiag("--------------------------------------------------------");
}
static void mustDrop(dbChannel *pch, db_field_log *pfl, char* m) {
int oldFree = db_available_logs();
db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl);
int newFree = db_available_logs();
testOk(NULL == pfl2, "filter drops field_log (%s)", m);
testOk(newFree == oldFree + 1, "field_log was freed - %d+1 => %d",
oldFree, newFree);
db_delete_field_log(pfl2);
}
static void mustPass(dbChannel *pch, db_field_log *pfl, char* m) {
int oldFree = db_available_logs();
db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl);
int newFree = db_available_logs();
testOk(pfl == pfl2, "filter passes field_log (%s)", m);
testOk(newFree == oldFree, "field_log was not freed - %d => %d",
oldFree, newFree);
db_delete_field_log(pfl2);
}
static void checkAndOpenChannel(dbChannel *pch, const chFilterPlugin *plug) {
ELLNODE *node;
chFilter *filter;
chPostEventFunc *cb_out = NULL;
void *arg_out = NULL;
db_field_log fl, fl1;
testDiag("Test filter structure and open channel");
testOk((ellCount(&pch->filters) == 1), "channel has one plugin");
fl_setup(pch, &fl, 1);
fl1 = fl;
node = ellFirst(&pch->filters);
filter = CONTAINER(node, chFilter, list_node);
plug->fif->channel_register_pre(filter, &cb_out, &arg_out, &fl1);
testOk(cb_out && arg_out,
"register_pre registers one filter with argument");
testOk(fl_equal(&fl1, &fl),
"register_pre does not change field_log data type");
testOk(!(dbChannelOpen(pch)), "dbChannel with plugin dec opened");
node = ellFirst(&pch->pre_chain);
filter = CONTAINER(node, chFilter, pre_node);
testOk((ellCount(&pch->pre_chain) == 1 && filter->pre_arg != NULL),
"dec has one filter with argument in pre chain");
testOk((ellCount(&pch->post_chain) == 0),
"sync has no filter in post chain");
}
MAIN(decTest)
{
dbChannel *pch;
const chFilterPlugin *plug;
char myname[] = "dec";
db_field_log *pfl[10];
int i, logsFree, logsFinal;
dbEventCtx evtctx;
testPlan(104);
testdbPrepare();
testdbReadDatabase("filterTest.dbd", NULL, NULL);
filterTest_registerRecordDeviceDriver(pdbbase);
testdbReadDatabase("xRecord.db", NULL, NULL);
eltc(0);
testIocInitOk();
eltc(1);
evtctx = db_init_events();
testOk(!!(plug = dbFindFilter(myname, strlen(myname))),
"plugin '%s' registered correctly", myname);
/* N < 1 */
testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":-1}}")),
"dbChannel with dec (n=-1) failed");
testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":0}}")),
"dbChannel with dec (n=0) failed");
/* Bad parms */
testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{}}")),
"dbChannel with dec (no parm) failed");
testOk(!(pch = dbChannelCreate("x.VAL{\"dec\":{\"x\":true}}")),
"dbChannel with dec (x=true) failed");
/* No Decimation (N=1) */
testHead("No Decimation (n=1)");
testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":1}}")),
"dbChannel with plugin dec (n=1) created");
/* Start the free-list */
db_delete_field_log(db_create_read_log(pch));
logsFree = db_available_logs();
testDiag("%d field_logs on free-list", logsFree);
checkAndOpenChannel(pch, plug);
for (i = 0; i < 5; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 10 + i);
}
testDiag("Test event stream");
mustPass(pch, pfl[0], "i=0");
mustPass(pch, pfl[1], "i=1");
mustPass(pch, pfl[2], "i=2");
mustPass(pch, pfl[3], "i=3");
mustPass(pch, pfl[4], "i=4");
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* Decimation (N=2) */
testHead("Decimation (n=2)");
testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":2}}")),
"dbChannel with plugin dec (n=2) created");
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 20 + i);
}
testDiag("Test event stream");
mustPass(pch, pfl[0], "i=0");
mustDrop(pch, pfl[1], "i=1");
mustPass(pch, pfl[2], "i=2");
mustDrop(pch, pfl[3], "i=3");
mustPass(pch, pfl[4], "i=4");
mustDrop(pch, pfl[5], "i=5");
mustPass(pch, pfl[6], "i=6");
mustDrop(pch, pfl[7], "i=7");
mustPass(pch, pfl[8], "i=8");
mustDrop(pch, pfl[9], "i=9");
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* Decimation (N=3) */
testHead("Decimation (n=3)");
testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":3}}")),
"dbChannel with plugin dec (n=3) created");
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 30 + i);
}
testDiag("Test event stream");
mustPass(pch, pfl[0], "i=0");
mustDrop(pch, pfl[1], "i=1");
mustDrop(pch, pfl[2], "i=2");
mustPass(pch, pfl[3], "i=3");
mustDrop(pch, pfl[4], "i=4");
mustDrop(pch, pfl[5], "i=5");
mustPass(pch, pfl[6], "i=6");
mustDrop(pch, pfl[7], "i=7");
mustDrop(pch, pfl[8], "i=8");
mustPass(pch, pfl[9], "i=9");
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* Decimation (N=4) */
testHead("Decimation (n=4)");
testOk(!!(pch = dbChannelCreate("x.VAL{\"dec\":{\"n\":4}}")),
"dbChannel with plugin dec (n=4) created");
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 40 + i);
}
testDiag("Test event stream");
mustPass(pch, pfl[0], "i=0");
mustDrop(pch, pfl[1], "i=1");
mustDrop(pch, pfl[2], "i=2");
mustDrop(pch, pfl[3], "i=3");
mustPass(pch, pfl[4], "i=4");
mustDrop(pch, pfl[5], "i=5");
mustDrop(pch, pfl[6], "i=6");
mustDrop(pch, pfl[7], "i=7");
mustPass(pch, pfl[8], "i=8");
mustDrop(pch, pfl[9], "i=9");
dbChannelDelete(pch);
logsFinal = db_available_logs();
testOk(logsFree == logsFinal, "%d field_logs on free-list", logsFinal);
db_close_events(evtctx);
testIocShutdownOk();
testdbCleanup();
return testDone();
}

View File

@@ -17,6 +17,7 @@ int tsTest(void);
int dbndTest(void);
int syncTest(void);
int arrTest(void);
int decTest(void);
void epicsRunFilterTests(void)
{
@@ -26,6 +27,7 @@ void epicsRunFilterTests(void)
runTest(dbndTest);
runTest(syncTest);
runTest(arrTest);
runTest(decTest);
dbmfFreeChunks();

View File

@@ -42,12 +42,14 @@ static int fl_equal(const db_field_log *pfl1, const db_field_log *pfl2) {
static void fl_setup(dbChannel *chan, db_field_log *pfl, long val) {
struct dbCommon *prec = dbChannelRecord(chan);
memset(pfl, 0, sizeof(db_field_log));
pfl->ctx = dbfl_context_event;
pfl->type = dbfl_type_val;
pfl->stat = prec->stat;
pfl->sevr = prec->sevr;
pfl->time = prec->time;
pfl->field_type = DBF_LONG;
pfl->field_size = sizeof(epicsInt32);
pfl->no_elements = 1;
/*
* use memcpy to avoid a bus error on
@@ -66,31 +68,92 @@ static void testHead (char* title) {
testDiag("--------------------------------------------------------");
}
static void mustDrop(dbChannel *pch, db_field_log *pfl2, char* m) {
db_field_log *pfl = dbChannelRunPreChain(pch, pfl2);
testOk(NULL == pfl, "filter drops field_log (%s)", m);
/*
* Use mustDrop() and mustPass() to test filters with no memory
* of previous field_log pointers.
*/
static void mustDrop(dbChannel *pch, db_field_log *pfl, char* m) {
int oldFree = db_available_logs();
db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl);
int newFree = db_available_logs();
testOk(NULL == pfl2, "filter drops field_log (%s)", m);
testOk(newFree == oldFree + 1, "a field_log was freed - %d+1 => %d",
oldFree, newFree);
db_delete_field_log(pfl2);
}
static void mustPassTwice(dbChannel *pch, db_field_log *pfl2, char* m) {
db_field_log *pfl;
static void mustPass(dbChannel *pch, db_field_log *pfl, char* m) {
int oldFree = db_available_logs();
db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl);
int newFree = db_available_logs();
testOk(pfl == pfl2, "filter passes field_log (%s)", m);
testOk(newFree == oldFree, "no field_logs were freed - %d => %d",
oldFree, newFree);
db_delete_field_log(pfl2);
}
/*
* Use mustStash() and mustSwap() to test filters that save
* field_log pointers and return them later.
*
* mustStash() expects the filter to save the current pointer
* (freeing any previously saved pointer) and return NULL.
* mustSwap() expects the filter to return the previously
* saved pointer and save the current pointer.
*/
static db_field_log *stashed;
static void streamReset(void) {
stashed = NULL;
}
static void mustStash(dbChannel *pch, db_field_log *pfl, char* m) {
int oldFree = db_available_logs();
db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl);
int newFree = db_available_logs();
testOk(NULL == pfl2, "filter stashes field_log (%s)", m);
if (stashed) {
testOk(newFree == oldFree + 1, "a field_log was freed - %d+1 => %d",
oldFree, newFree);
}
else {
testOk(newFree == oldFree, "no field_logs were freed - %d => %d",
oldFree, newFree);
}
stashed = pfl;
db_delete_field_log(pfl2);
}
static void mustSwap(dbChannel *pch, db_field_log *pfl, char* m) {
int oldFree = db_available_logs();
db_field_log *pfl2 = dbChannelRunPreChain(pch, pfl);
int newFree = db_available_logs();
testOk(stashed == pfl2, "filter returns stashed field log (%s)", m);
testOk(newFree == oldFree, "no field_logs were freed - %d => %d",
oldFree, newFree);
stashed = pfl;
db_delete_field_log(pfl2);
}
static void mustPassTwice(dbChannel *pch, db_field_log *pfl, char* m) {
int oldFree = db_available_logs(), newFree;
db_field_log *pfl2;
testDiag("%s: filter must pass twice", m);
pfl = dbChannelRunPreChain(pch, pfl2);
pfl2 = dbChannelRunPreChain(pch, pfl);
testOk(pfl2 == pfl, "call 1 does not drop or replace field_log");
pfl = dbChannelRunPreChain(pch, pfl2);
pfl2 = dbChannelRunPreChain(pch, pfl);
testOk(pfl2 == pfl, "call 2 does not drop or replace field_log");
}
static void mustPassOld(dbChannel *pch, db_field_log *old, db_field_log *cur, char* m) {
db_field_log *pfl = dbChannelRunPreChain(pch, cur);
testOk(old == pfl, "filter passes previous field log (%s)", m);
}
static void mustPass(dbChannel *pch, db_field_log *cur, char* m) {
db_field_log *pfl = dbChannelRunPreChain(pch, cur);
testOk(cur == pfl, "filter passes field_log (%s)", m);
newFree = db_available_logs();
testOk(newFree == oldFree, "no field_logs were freed - %d => %d",
oldFree, newFree);
}
static void checkCtxRead(dbChannel *pch, dbStateId id) {
@@ -138,10 +201,10 @@ MAIN(syncTest)
const chFilterPlugin *plug;
char myname[] = "sync";
db_field_log *pfl[10];
int i;
int i, logsFree, logsFinal;
dbEventCtx evtctx;
testPlan(139);
testPlan(214);
testdbPrepare();
@@ -176,9 +239,14 @@ MAIN(syncTest)
testOk(!!(pch = dbChannelCreate("x.VAL{\"sync\":{\"m\":\"while\",\"s\":\"red\"}}")),
"dbChannel with plugin sync (m='while' s='red') created");
/* Start the free-list */
db_delete_field_log(db_create_read_log(pch));
logsFree = db_available_logs();
testDiag("%d field_logs on free-list", logsFree);
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
for (i = 0; i < 9; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 120 + i);
}
@@ -198,11 +266,10 @@ MAIN(syncTest)
mustDrop(pch, pfl[7], "state=FALSE, log7");
mustDrop(pch, pfl[8], "state=FALSE, log8");
for (i = 0; i < 10; i++)
db_delete_field_log(pfl[i]);
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* mode UNLESS */
testHead("Mode UNLESS (m='unless', s='red')");
@@ -211,7 +278,7 @@ MAIN(syncTest)
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
for (i = 0; i < 9; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 120 + i);
}
@@ -231,11 +298,10 @@ MAIN(syncTest)
mustPass(pch, pfl[7], "state=FALSE, log7");
mustPass(pch, pfl[8], "state=FALSE, log8");
for (i = 0; i < 10; i++)
db_delete_field_log(pfl[i]);
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* mode BEFORE */
testHead("Mode BEFORE (m='before', s='red')");
@@ -251,24 +317,25 @@ MAIN(syncTest)
testDiag("Test event stream");
streamReset();
dbStateClear(red);
mustDrop(pch, pfl[0], "state=FALSE, log0");
mustDrop(pch, pfl[1], "state=FALSE, log1");
mustDrop(pch, pfl[2], "state=FALSE, log2");
mustStash(pch, pfl[0], "state=FALSE, log0");
mustStash(pch, pfl[1], "state=FALSE, log1");
mustStash(pch, pfl[2], "state=FALSE, log2");
dbStateSet(red);
mustPassOld(pch, pfl[2], pfl[3], "state=TRUE, log3, pass=log2");
mustDrop(pch, pfl[4], "state=TRUE, log4");
mustDrop(pch, pfl[5], "state=TRUE, log5");
mustDrop(pch, pfl[6], "state=TRUE, log6");
mustSwap(pch, pfl[3], "state=TRUE, log3");
mustStash(pch, pfl[4], "state=TRUE, log4");
mustStash(pch, pfl[5], "state=TRUE, log5");
mustStash(pch, pfl[6], "state=TRUE, log6");
dbStateClear(red);
mustDrop(pch, pfl[7], "state=FALSE, log7");
mustDrop(pch, pfl[8], "state=FALSE, log8");
mustDrop(pch, pfl[9], "state=FALSE, log9");
db_delete_field_log(pfl[2]);
mustStash(pch, pfl[7], "state=FALSE, log7");
mustStash(pch, pfl[8], "state=FALSE, log8");
mustStash(pch, pfl[9], "state=FALSE, log9");
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* mode FIRST */
testHead("Mode FIRST (m='first', s='red')");
@@ -277,13 +344,14 @@ MAIN(syncTest)
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
for (i = 0; i < 9; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 120 + i);
}
testDiag("Test event stream");
streamReset();
dbStateClear(red);
mustDrop(pch, pfl[0], "state=FALSE, log0");
mustDrop(pch, pfl[1], "state=FALSE, log1");
@@ -297,11 +365,10 @@ MAIN(syncTest)
mustDrop(pch, pfl[7], "state=FALSE, log7");
mustDrop(pch, pfl[8], "state=FALSE, log8");
db_delete_field_log(pfl[3]);
db_delete_field_log(pfl[9]);
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* mode LAST */
testHead("Mode LAST (m='last', s='red')");
@@ -317,24 +384,25 @@ MAIN(syncTest)
testDiag("Test event stream");
streamReset();
dbStateClear(red);
mustDrop(pch, pfl[0], "state=FALSE, log0");
mustDrop(pch, pfl[1], "state=FALSE, log1");
mustDrop(pch, pfl[2], "state=FALSE, log2");
mustStash(pch, pfl[0], "state=FALSE, log0");
mustStash(pch, pfl[1], "state=FALSE, log1");
mustStash(pch, pfl[2], "state=FALSE, log2");
dbStateSet(red);
mustDrop(pch, pfl[3], "state=TRUE, log3");
mustDrop(pch, pfl[4], "state=TRUE, log4");
mustDrop(pch, pfl[5], "state=TRUE, log5");
mustStash(pch, pfl[3], "state=TRUE, log3");
mustStash(pch, pfl[4], "state=TRUE, log4");
mustStash(pch, pfl[5], "state=TRUE, log5");
dbStateClear(red);
mustPassOld(pch, pfl[5], pfl[6], "state=TRUE, log6, pass=log5");
mustDrop(pch, pfl[7], "state=FALSE, log7");
mustDrop(pch, pfl[8], "state=FALSE, log8");
mustDrop(pch, pfl[9], "state=FALSE, log9");
db_delete_field_log(pfl[5]);
mustSwap(pch, pfl[6], "state=TRUE, log6");
mustStash(pch, pfl[7], "state=FALSE, log7");
mustStash(pch, pfl[8], "state=FALSE, log8");
mustStash(pch, pfl[9], "state=FALSE, log9");
dbChannelDelete(pch);
testDiag("%d field_logs on free-list", db_available_logs());
/* mode AFTER */
testHead("Mode AFTER (m='after', s='red')");
@@ -343,13 +411,14 @@ MAIN(syncTest)
checkAndOpenChannel(pch, plug);
for (i = 0; i < 10; i++) {
for (i = 0; i < 9; i++) {
pfl[i] = db_create_read_log(pch);
fl_setup(pch, pfl[i], 120 + i);
}
testDiag("Test event stream");
streamReset();
dbStateClear(red);
mustDrop(pch, pfl[0], "state=FALSE, log0");
mustDrop(pch, pfl[1], "state=FALSE, log1");
@@ -363,11 +432,11 @@ MAIN(syncTest)
mustDrop(pch, pfl[7], "state=FALSE, log7");
mustDrop(pch, pfl[8], "state=FALSE, log8");
db_delete_field_log(pfl[6]);
db_delete_field_log(pfl[9]);
dbChannelDelete(pch);
logsFinal = db_available_logs();
testOk(logsFree == logsFinal, "%d field_logs on free-list", logsFinal);
db_close_events(evtctx);
testIocShutdownOk();