34 Commits
4.6.0 ... 4.7.2

Author SHA1 Message Date
Andrew Johnson
8cf550ff57 Final commit for 4.7.2 2025-02-21 15:29:09 -06:00
Andrew Johnson
c070a3485b Merge pull request #82 from sveseli/changed-field-fix
Changed Field Fix
2024-07-09 10:04:14 -05:00
Sinisa Veseli
b31d5079bf updated release notes 2024-07-03 21:55:48 -05:00
Sinisa Veseli
7c78b15a5c added tests for changed set 2024-05-06 10:30:45 -05:00
Sinisa Veseli
cd7d8735af update master field tests 2024-04-30 13:09:59 -05:00
Sinisa Veseli
94b48e4893 do not proceed with pvcopy in master field callback unless master field was requested 2024-04-30 13:09:33 -05:00
Sinisa Veseli
9d94e95521 make sure only one record field has pointer to the master field; fix code indents to 4 spaces 2024-04-30 13:08:06 -05:00
Sinisa Veseli
e0d5925af3 add interface to check if master field was requested 2024-04-30 13:04:08 -05:00
Andrew Johnson
f207e512d6 Updates to the GHA build settings 2023-12-28 20:45:21 +00:00
Andrew Johnson
0a7d5b16aa Configure CI using ci-scripts for GHA and Appveyor 2023-12-28 14:05:15 +00:00
Andrew Johnson
aea8d9105e Delete now-unused CI configuration files 2023-12-27 22:02:28 +00:00
Andrew Johnson
d18e0c913a Set next development version 2023-12-13 17:12:22 -06:00
Andrew Johnson
e4cb8d2397 Set version numbers for release 2023-12-13 17:09:58 -06:00
Andrew Johnson
71b2a4bfe1 Update Release notes 2023-12-13 17:07:59 -06:00
Andrew Johnson
80703b6de1 Merge pull request #80 from sveseli/master
Static Build Fixes
2023-11-17 15:02:06 -06:00
Sinisa Veseli
203ee1b450 fix windows warnings 2023-11-17 14:23:10 -06:00
Sinisa Veseli
f43d50768e fix windows warnings 2023-11-17 14:05:30 -06:00
Sinisa Veseli
8ec8e47924 fix static build 2023-11-17 12:58:58 -06:00
Sinisa Veseli
d374669f04 include cctype, fixes Windows builds 2023-06-28 16:43:22 -05:00
Andrew Johnson
2f53d13021 Merge pull request #79 from sveseli/master
Data Distributor Plugin
2023-06-28 14:10:21 -05:00
Sinisa Veseli
00a8459944 address ANL feedback 2023-06-28 13:11:56 -05:00
Sinisa Veseli
91d0d2c315 added data distributor plugin documentation and updated release notes 2023-05-26 13:35:06 -05:00
Sinisa Veseli
c41f7cb3dc added data distributor plugin tests 2023-05-26 13:34:43 -05:00
Sinisa Veseli
dbf83dd017 added data distributor plugin 2023-05-26 13:34:34 -05:00
Andrew Johnson
1b787c5149 Set next development version 2022-09-07 12:14:21 -05:00
Andrew Johnson
ee3027f641 Update release notes and version numbers for release 2022-09-07 12:10:55 -05:00
Andrew Johnson
6cd0a1bdb0 Merge pull request #76 from sveseli/master
Resolve monitor overrun issue from PR #75.
2022-06-24 10:29:44 -05:00
Sinisa Veseli
0d85561481 this resolves issue with monitor overrun counter; master field listeners will be called only once, before calling listeners for the first subfield 2022-03-14 09:17:33 -05:00
Andrew Johnson
05d54c61e1 Merge pull request #75 from sveseli/master
Support requests to apply a pvCopy filter plugin to a whole PVRecord structure by using a field name `_` in the pvRequest.
2021-12-06 11:28:22 -06:00
Sinisa Veseli
182eee57e2 add tests for whole structure request 2021-12-01 14:36:50 -06:00
Sinisa Veseli
8daf322025 update module release notes 2021-12-01 14:36:20 -06:00
Sinisa Veseli
a68ef61a10 use _ instead of masterField to designate top level structure in the request 2021-11-30 10:32:38 -06:00
Sinisa Veseli
9dfebf1897 allow writing pvCopy plugins for the master field 2021-11-30 08:54:57 -06:00
Andrew Johnson
0cf706511e Set next development version 2021-07-02 15:52:05 -05:00
27 changed files with 1685 additions and 312 deletions

93
.appveyor.yml Normal file
View File

@@ -0,0 +1,93 @@
# .appveyor.yml for use with EPICS Base ci-scripts
# (see: https://github.com/epics-base/ci-scripts)
# This is YAML - indentation levels are crucial
cache:
- C:\Users\appveyor\.tools
#---------------------------------#
# repository cloning #
#---------------------------------#
init:
# Set autocrlf to make batch files work
- git config --global core.autocrlf true
clone_depth: 5
# Build Configurations: dll/static, regular/debug
configuration:
- dynamic
- static
- dynamic-debug
- static-debug
# Default OS Image
image: Visual Studio 2019
# Environment variables: compiler toolchain, base version, setup file, ...
environment:
# common / default variables for all jobs
SETUP_PATH: .ci-local:.ci
matrix:
- BASE: 3.15
CMP: vs2019
- BASE: 7.0
CMP: vs2019
- BASE: 7.0
CMP: gcc
- BASE: 7.0
CMP: vs2017
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2017
# Platform: processor architecture
platform:
- x64
#---------------------------------#
# building & testing #
#---------------------------------#
install:
- cmd: git submodule update --init --recursive
- cmd: pip install git+https://github.com/mdavidsaver/ci-core-dumper#egg=ci-core-dumper
- cmd: python .ci/cue.py prepare
build_script:
- cmd: python .ci/cue.py build
test_script:
- cmd: python -m ci_core_dumper install
- cmd: python .ci/cue.py -T 20M test
on_finish:
- ps: Get-ChildItem *.tap -Recurse -Force | % { Push-AppveyorArtifact $_.FullName -FileName $_.Name }
- cmd: python .ci/cue.py -T 5M test-results
on_failure:
- cmd: python -m ci_core_dumper report
#---------------------------------#
# debugging #
#---------------------------------#
## if you want to connect by remote desktop to a failed build, uncomment these lines
## note that you will need to connect within the usual build timeout limit (60 minutes)
## so you may want to adjust the build matrix above to just build the one of interest
#on_failure:
# - ps: iex ((new-object net.webclient).DownloadString('https://raw.githubusercontent.com/appveyor/ci/master/scripts/enable-rdp.ps1'))
# - ps: $blockRdp = $true; iex ((new-object net.webclient).DownloadString('https://raw.githubusercontent.com/appveyor/ci/master/scripts/enable-rdp.ps1'))
#---------------------------------#
# notifications #
#---------------------------------#
notifications:
- provider: Email
to:
- core-talk@aps.anl.gov
on_build_success: false
- provider: GitHubPullRequest

1
.ci Submodule

Submodule .ci added at 130e88b709

14
.ci-local/defaults.set Normal file
View File

@@ -0,0 +1,14 @@
# EPICS Base
BASE_DIRNAME=base
BASE_REPONAME=epics-base
BASE_REPOOWNER=epics-base
BASE_VARNAME=EPICS_BASE
BASE_RECURSIVE=NO
MODULES=PVDATA PVACCESS
PVDATA_REPONAME=pvDataCPP
PVDATA_REPOOWNER=epics-base
PVACCESS_REPONAME=pvAccessCPP
PVACCESS_REPOOWNER=epics-base

View File

@@ -1,10 +0,0 @@
#!/bin/sh
set -e -x
make -j2 $EXTRA
if [ "$TEST" != "NO" ]
then
make -j2 tapfiles
make -j2 -s test-results
fi

View File

@@ -1,109 +0,0 @@
#!/bin/sh
set -e -x
CURDIR="$PWD"
cat << EOF > $CURDIR/configure/RELEASE.local
EPICS_BASE=$HOME/.source/epics-base
EOF
install -d "$HOME/.source"
cd "$HOME/.source"
add_gh_flat() {
MODULE=$1
REPOOWNER=$2
REPONAME=$3
BRANCH=$4
MODULE_UC=$(echo $MODULE | tr 'a-z' 'A-Z')
( git clone --quiet --depth 5 --branch $BRANCH https://github.com/$REPOOWNER/$REPONAME.git $MODULE && \
cd $MODULE && git log -n1 )
cat < $CURDIR/configure/RELEASE.local > $MODULE/configure/RELEASE.local
cat << EOF >> $CURDIR/configure/RELEASE.local
${MODULE_UC}=$HOME/.source/$MODULE
EOF
}
# not recursive
git clone --quiet --depth 5 --branch "$BRBASE" https://github.com/${REPOBASE:-epics-base}/epics-base.git epics-base
(cd epics-base && git log -n1 )
add_gh_flat pvData ${REPOPVD:-epics-base} pvDataCPP ${BRPVD:-master}
add_gh_flat pvAccess ${REPOPVA:-epics-base} pvAccessCPP ${BRPVA:-master}
if [ -e $CURDIR/configure/RELEASE.local ]
then
cat $CURDIR/configure/RELEASE.local
fi
EPICS_HOST_ARCH=`sh epics-base/startup/EpicsHostArch`
# requires wine and g++-mingw-w64-i686
if [ "$WINE" = "32" ]
then
echo "Cross mingw32"
sed -i -e '/CMPLR_PREFIX/d' epics-base/configure/os/CONFIG_SITE.linux-x86.win32-x86-mingw
cat << EOF >> epics-base/configure/os/CONFIG_SITE.linux-x86.win32-x86-mingw
CMPLR_PREFIX=i686-w64-mingw32-
EOF
cat << EOF >> epics-base/configure/CONFIG_SITE
CROSS_COMPILER_TARGET_ARCHS+=win32-x86-mingw
EOF
fi
if [ "$STATIC" = "YES" ]
then
echo "Build static libraries/executables"
cat << EOF >> epics-base/configure/CONFIG_SITE
SHARED_LIBRARIES=NO
STATIC_BUILD=YES
EOF
fi
case "$CMPLR" in
clang)
echo "Host compiler is clang"
cat << EOF >> epics-base/configure/os/CONFIG_SITE.Common.$EPICS_HOST_ARCH
GNU = NO
CMPLR_CLASS = clang
CC = clang
CCC = clang++
EOF
# hack
sed -i -e 's/CMPLR_CLASS = gcc/CMPLR_CLASS = clang/' epics-base/configure/CONFIG.gnuCommon
clang --version
;;
*)
echo "Host compiler is default"
gcc --version
;;
esac
cat <<EOF >> epics-base/configure/CONFIG_SITE
USR_CPPFLAGS += $USR_CPPFLAGS
USR_CFLAGS += $USR_CFLAGS
USR_CXXFLAGS += $USR_CXXFLAGS
EOF
# set RTEMS to eg. "4.9" or "4.10"
# requires qemu, bison, flex, texinfo, install-info
if [ -n "$RTEMS" ]
then
echo "Cross RTEMS${RTEMS} for pc386"
curl -L "https://github.com/mdavidsaver/rsb/releases/download/20171203-${RTEMS}/i386-rtems${RTEMS}-trusty-20171203-${RTEMS}.tar.bz2" \
| tar -C / -xmj
sed -i -e '/^RTEMS_VERSION/d' -e '/^RTEMS_BASE/d' epics-base/configure/os/CONFIG_SITE.Common.RTEMS
cat << EOF >> epics-base/configure/os/CONFIG_SITE.Common.RTEMS
RTEMS_VERSION=$RTEMS
RTEMS_BASE=$HOME/.rtems
EOF
cat << EOF >> epics-base/configure/CONFIG_SITE
CROSS_COMPILER_TARGET_ARCHS += RTEMS-pc386-qemu
EOF
fi
make -j2 -C epics-base $EXTRA
make -j2 -C pvData $EXTRA
make -j2 -C pvAccess $EXTRA

163
.github/workflows/ci-scripts-build.yml vendored Normal file
View File

@@ -0,0 +1,163 @@
# .github/workflows/ci-scripts-build.yml for use with EPICS Base ci-scripts
# (see: https://github.com/epics-base/ci-scripts)
# This is YAML - indentation levels are crucial
# Workflow name
name: Base
# Trigger on pushes and PRs to any branch
on:
push:
paths-ignore:
- 'documentation/*'
- '.*.yml'
pull_request:
paths-ignore:
- 'documentation/*'
- '.*.yml'
env:
SETUP_PATH: .ci-local:.ci
EPICS_TEST_IMPRECISE_TIMING: YES
jobs:
build-base:
name: ${{ matrix.name }}
runs-on: ${{ matrix.os }}
# Set environment variables from matrix parameters
env:
BASE: ${{ matrix.base }}
CMP: ${{ matrix.cmp }}
BCFG: ${{ matrix.configuration }}
CI_CROSS_TARGETS: ${{ matrix.cross }}
EXTRA: ${{ matrix.extra }}
TEST: ${{ matrix.test }}
strategy:
fail-fast: false
matrix:
# Job names also name artifacts, character limitations apply
include:
- os: ubuntu-20.04
cmp: gcc
configuration: default
base: "7.0"
cross: "windows-x64-mingw"
name: "7.0 Ub-20 gcc-9 + MinGW"
- os: ubuntu-20.04
cmp: gcc
configuration: static
base: "7.0"
cross: "windows-x64-mingw"
name: "7.0 Ub-20 gcc-9 + MinGW, static"
- os: ubuntu-20.04
cmp: gcc
configuration: static
base: "7.0"
extra: "CMD_CXXFLAGS=-std=c++11"
name: "7.0 Ub-20 gcc-9 C++11, static"
- os: ubuntu-20.04
cmp: clang
configuration: default
base: "7.0"
name: "7.0 Ub-20 clang-10"
- os: ubuntu-20.04
cmp: clang
configuration: default
base: "7.0"
extra: "CMD_CXXFLAGS=-std=c++11"
name: "7.0 Ub-20 clang-10 C++11"
- os: ubuntu-20.04
cmp: gcc
configuration: default
base: "7.0"
cross: "RTEMS-pc686-qemu@5"
name: "7.0 Ub-20 gcc-9 + RT-5.1 pc686"
- os: ubuntu-20.04
cmp: gcc
configuration: default
base: "7.0"
cross: "RTEMS-pc386-qemu@4.10"
test: NO
name: "7.0 Ub-20 gcc-9 + RT-4.10"
- os: ubuntu-20.04
cmp: gcc
configuration: default
base: "7.0"
cross: "RTEMS-pc386-qemu@4.9"
name: "7.0 Ub-20 gcc-9 + RT-4.9"
- os: macos-latest
cmp: clang
configuration: default
base: "7.0"
name: "7.0 MacOS clang-12"
- os: windows-2019
cmp: vs2019
configuration: default
base: "7.0"
name: "7.0 Win2019 MSC-19"
extra: "CMD_CXXFLAGS=-analysis"
- os: windows-2019
cmp: vs2019
configuration: static
base: "7.0"
name: "7.0 Win2019 MSC-19, static"
extra: "CMD_CXXFLAGS=-analysis"
- os: windows-2019
cmp: vs2019
configuration: debug
base: "7.0"
name: "7.0 Win2019 MSC-19, debug"
- os: windows-2019
cmp: gcc
configuration: default
base: "7.0"
name: "7.0 Win2019 mingw"
- os: ubuntu-20.04
cmp: gcc
configuration: default
base: "3.15"
wine: "64"
name: "3.15 Ub-20 gcc-9 + MinGW"
steps:
- uses: actions/checkout@v3
with:
submodules: true
- name: Automatic core dump analysis
uses: mdavidsaver/ci-core-dumper@master
- name: "apt-get install"
run: |
sudo apt-get update
sudo apt-get -y install qemu-system-x86 g++-mingw-w64-x86-64 gdb
if: runner.os == 'Linux'
- name: Prepare and compile dependencies
run: python .ci/cue.py prepare
- name: Build main module
run: python .ci/cue.py build
- name: Run main module tests
run: python .ci/cue.py -T 20M test
- name: Upload tapfiles Artifact
if: ${{ always() }}
uses: actions/upload-artifact@v3
with:
name: tapfiles ${{ matrix.name }}
path: '**/O.*/*.tap'
if-no-files-found: ignore
- name: Collect and show test results
if: ${{ always() }}
run: python .ci/cue.py -T 5M test-results

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule ".ci"]
path = .ci
url = https://github.com/epics-base/ci-scripts

View File

@@ -1,31 +0,0 @@
sudo: false
dist: trusty
language: c++
compiler:
- gcc
addons:
apt:
packages:
- libreadline6-dev
- libncurses5-dev
- perl
- clang
- g++-mingw-w64-i686
- qemu-system-x86
install:
- ./.ci/travis-prepare.sh
script:
- ./.ci/travis-build.sh
env:
- BRBASE=7.0
- BRBASE=7.0 CMPLR=clang
- BRBASE=7.0 EXTRA="CMD_CXXFLAGS=-std=c++98"
- BRBASE=7.0 EXTRA="CMD_CXXFLAGS=-std=c++11"
- BRBASE=7.0 CMPLR=clang EXTRA="CMD_CXXFLAGS=-std=c++11"
- BRBASE=7.0 WINE=32 TEST=NO STATIC=YES
- BRBASE=7.0 WINE=32 TEST=NO STATIC=NO
- BRBASE=7.0 RTEMS=4.10 TEST=NO
- BRBASE=7.0 RTEMS=4.9 TEST=NO
- BRBASE=3.16
- BRBASE=3.15

View File

@@ -38,7 +38,7 @@ PROJECT_NAME = pvDatabaseCPP
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 4.6.0
PROJECT_NUMBER = 4.7.2
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -1,8 +1,8 @@
# Version number for the PV Database API and shared library
EPICS_PVDATABASE_MAJOR_VERSION = 4
EPICS_PVDATABASE_MINOR_VERSION = 6
EPICS_PVDATABASE_MAINTENANCE_VERSION = 0
EPICS_PVDATABASE_MINOR_VERSION = 7
EPICS_PVDATABASE_MAINTENANCE_VERSION = 2
# Development flag, set to zero for release versions

View File

@@ -2,6 +2,49 @@
This document summarizes the changes to the module between releases.
## Release 4.7.2 (EPICS 7.0.9, Feb 2025)
* Resolved issue with changed field set in the case where the top level (master)
field ("_") is not requested by the client, but the master field callback causes
all fields to be marked as updated, rather than only those fields that have
actually been modified.
## Release 4.7.1 (EPICS 7.0.8, Dec 2023)
* Added data distributor plugin which can be used for distributing data between
a group of clients. The plugin is triggered by the request string of the
form:
`_[distributor=group:<group id>;set:<set_id>;trigger:<field_name>;updates:<n_updates>;mode:<update_mode>]`
The plugin parameters are optional and are described bellow:
- group: this parameter indicates a group that client application belongs to (default value: "default"); groups of clients are completely independent of each other
- set: this parameter designates a client set that application belongs to within its group (default value: "default")
- trigger: this is the PV structure field that distinguishes different channel updates (default value: "timeStamp"); for example, for area detector images one could use the "uniqueId" field of the NTND structure
- updates: this parameter configures how many sequential updates a client (or a set of clients) will receive before the data distributor starts updating the next one (default value: "1")
- mode: this parameter configures how channel updates are to be distributed between clients in a set:
- one: update goes to one client per set
- all: update goes to all clients in a set
- default is "one" if client set id is not specified, and "all" if set id is specified
For more information and examples of usage see the [plugin documentation](dataDistributorPlugin.md).
## Release 4.7.0 (EPICS 7.0.7, Sep 2022)
* Added support for the whole structure (master field) server side plugins.
The whole structure is identified as the `_` string, and a pvRequest string
that applies a plugin to it takes the form:
`field(_[XYZ=A:3;B:uniqueId])`
where `XYZ` is the name of a specific filter plugin that takes parameters
`A` and `B` with values `3` and `uniqueId` respectively.
## Release 4.6.0 (EPICS 7.0.6, Jul 2021)
* Access Security is now supported.

View File

@@ -0,0 +1,233 @@
# Data Distributor Plugin
The data distributor plugin enables distribution of channel data between
multiple client applications. The plugin considers two basic use cases
for a group of clients:
- For simple parallel processing where client applications do not need
to share data all clients in a group receive n sequential updates
in a round-robin fashion: client \#1 sees the first n updates, client \#2 the
second n updates, and so on.
- For data analysis where several cooperating client applications must all
see the same data in order to process it the applications are grouped
into sets, and each set of clients receives the same number of sequential
updates. The first n updates are sent to all members of client set #1, the second n updates are sent to all members of client set #2, and so on.
## Requirements
This plugin relies on the pvDatabase plugin framework and requires
epics base version > 7.0.7
## Usage
The PV request object which triggers plugin instantiation is defined below:
```
"_[distributor=group:<group id>;set:<set_id>;trigger:<field_name>;updates:<n_updates>;mode:<update_mode>]"
```
The underscore character at the begining of the PV request object
indicates that the data distributor will be targeting entire PV structure.
The same PV request object format should work regardless of the language
in which a particular client application is written.
The plugin parameters are the following:
- `group:<group_id>`: specifying a `group_id` names a group the client application belongs to (default value: `default`); clients with different group names are
completely independent of each other
- `set:<set_id>`: this parameter designates a client set that application belongs to within its group (default value: `default`)
- `trigger:<field_name>`: this is the PV structure field that distinguishes
different channel updates (default value: `timeStamp`); for example,
for area detector images one could use the `uniqueId` field of the NTND
structure
- `updates:<n_updates>`: this parameter must be an integer and configures how many sequential updates a client (or a set of clients) will receive before the data distributor starts updating the next one (default value: `1`)
- `mode:<update_mode>`: this parameter configures how channel updates are to be
distributed between clients in a set:
- `one`: update goes to one client per set
- `all`: update goes to all clients in a set
- default is `one` if client set id is not specified, and `all` if set
id is specified
The plugin obeys the following rules:
- Parameter names are case insensitive, but the string values
are not. For example, "group=abc" and "group=ABC" would indicate two
different groups of clients. String values allow alphanumeric characters,
as well as dashes and underscores.
- Updates for a set of clients are configured when the first client in
the set requests data. Configuration values (i.e., "trigger",
"updates", and "mode"), passed in the PV request by the subsequent
clients are ignored.
- A set is removed from the group once the last client in that
set disconnects.
- A group is removed from the distributor plugin once all of its
clients have disconnected.
- Different client groups are completely independent of each other.
In other words, channel updates sent to clients belonging to
group A do not interfere with updates sent to clients
belonging to group B.
- The order in which clients and groups receive data is on a
"first connected, first served basis".
- The current channel PV object is always distributed to a client on an
initial connect.
- Data distribution is dynamic with respect to the number of clients.
As clients connect and disconnect, the data distribution in a group adjusts
accordingly. For example, with a group of clients configured to
distribute one sequential update to each client, three clients would each be
receiving every third update; after client number four connects, all
clients would start receiving every fourth update; if one of those then
disconnects, remaining three clients would again be receiving every third
update.
## Examples
For all examples below we assume that PVDatabase server is serving
area detector images on the channel 'image'. All clients are started before
the server itself, and the initial (empty) object has unique id of 0.
### Example 1
This example show behavior of three clients that belong to the same (default)
group. Each client receives one sequential update in a round-robin fashion.
Note that all clients received current object on initial connection,
and every third object afterward:
Client 1:
```
$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId
int uniqueId 0
int uniqueId 1
int uniqueId 4
int uniqueId 7
int uniqueId 10
```
Client 2:
```
$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId
int uniqueId 0
int uniqueId 2
int uniqueId 5
int uniqueId 8
int uniqueId 11
```
Client 3:
```
$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId
int uniqueId 0
int uniqueId 3
int uniqueId 6
int uniqueId 9
int uniqueId 12
```
### Example 2
In this example we have two sets of two clients, each client set receiving
three sequential updates. Both clients from client set \#1 receive updates
(1,2,3), both clients from client set \#2 receive updates (4,5,6),
client set \#1 receives updates (7,8,9), and so on.
Client 1 and Client 2/Set 1:
```
$ pvget -m -r "_[distributor=set:S1;trigger:uniqueId;updates:3]" image | grep uniqueId
int uniqueId 0
int uniqueId 1
int uniqueId 2
int uniqueId 3
int uniqueId 7
int uniqueId 8
int uniqueId 9
int uniqueId 13
int uniqueId 14
int uniqueId 15
```
Client 3 and Client 4/Set 2:
```
$ pvget -m -r "_[distributor=set:S2;trigger:uniqueId;updates:3]" image | grep uniqueId
int uniqueId 0
int uniqueId 4
int uniqueId 5
int uniqueId 6
int uniqueId 10
int uniqueId 11
int uniqueId 12
int uniqueId 16
int uniqueId 17
int uniqueId 18
```
### Example 3
This example illustrates what happens when multiple independent groups of
clients connect to the same channel. Group G1 has two clients belonging
to the same default set, and requesting one sequential update per client, while
Group G2 has two clients in the default set requesting three
sequential updates per client.
In this case the first client in group G1 receives updates
(1,3,5,...), while the second one receives updates (2,4,6,...). On the
other hand, the first client in group G2 receives updates
(1,2,3,7,8,9,...), while the second one receives updates (4,5,6,10,11,12,...).
Client 1/Group G1:
```
$ pvget -m -r "_[distributor=group:G1;trigger:uniqueId]" image | grep uniqueId
int uniqueId 0
int uniqueId 1
int uniqueId 3
int uniqueId 5
int uniqueId 7
int uniqueId 9
```
Client 2/Group G1:
```
pvget -m -r "_[distributor=group:G1;trigger:uniqueId]" image | grep uniqueId
int uniqueId 0
int uniqueId 2
int uniqueId 4
int uniqueId 6
int uniqueId 8
```
Client 1/Group G2:
```
$ pvget -m -r "_[distributor=group:G2;trigger:uniqueId;updates:3]" image | grep uniqueId
int uniqueId 0
int uniqueId 1
int uniqueId 2
int uniqueId 3
int uniqueId 7
int uniqueId 8
int uniqueId 9
```
Client 2/Group G2:
```
$ pvget -m -r "_[distributor=group:G2;trigger:uniqueId;updates:3]" image | grep uniqueId
int uniqueId 0
int uniqueId 4
int uniqueId 5
int uniqueId 6
int uniqueId 10
int uniqueId 11
int uniqueId 12
```
The above shows that the two client groups do not interfere with each other.

View File

@@ -1,79 +0,0 @@
# pvDatabase C++ implementation
# Jenkins @ Cloudbees build script
#
# Jenkins invokes scripts with the "-ex" option. So the build is considered a failure
# if any of the commands exits with a non-zero exit code.
#
# Author: Ralph Lange <ralph.lange@gmx.de>
# Copyright (C) 2014 Helmholtz-Zentrum Berlin für Materialien und Energie GmbH
# Copyright (C) 2014-2016 ITER Organization.
# All rights reserved. Use is subject to license terms.
installTool () {
local module=$1
local version=$2
wget -nv https://openepics.ci.cloudbees.com/job/${module}-${version}_Build/lastSuccessfulBuild/artifact/${module,,}-${version}.CB-dist.tar.gz
tar -xzf ${module,,}-${version}.CB-dist.tar.gz
}
installE4 () {
local module=$1
local branch=$2
wget -nv https://openepics.ci.cloudbees.com/job/e4-cpp-${module}-${branch}-build/BASE=${BASE}/lastSuccessfulBuild/artifact/${module}.CB-dist.tar.gz
tar -xzf ${module}.CB-dist.tar.gz
}
###########################################
# Defaults for EPICS Base
DEFAULT_BASE=3.15.4
BASE=${BASE:-${DEFAULT_BASE}}
###########################################
# Dependent module branches
PVDATA_BRANCH="master"
PVACCESS_BRANCH="master"
###########################################
# Fetch and unpack dependencies
export STUFF=/tmp/stuff
rm -fr ${STUFF}
mkdir -p ${STUFF}
cd ${STUFF}
installTool Boost 1.61.0
installTool Base ${BASE}
installE4 pvData ${PVDATA_BRANCH}
installE4 pvAccess ${PVACCESS_BRANCH}
###########################################
# Build
cd ${WORKSPACE}
export EPICS_BASE=${STUFF}
export EPICS_HOST_ARCH=$(${EPICS_BASE}/startup/EpicsHostArch)
export LD_LIBRARY_PATH=${EPICS_BASE}/lib/${EPICS_HOST_ARCH}
export PATH=${STUFF}/bin:${PATH}
cat > configure/RELEASE.local << EOF
EPICS_BASE=${EPICS_BASE}
EOF
make distclean all
###########################################
# Test
make runtests
###########################################
# Create cache
tar -czf pvDatabase.CB-dist.tar.gz lib include dbd LICENSE

View File

@@ -1,66 +0,0 @@
# pvDatabase C++ implementation
# Jenkins @ Cloudbees documentation generation and deployment
#
# Jenkins invokes scripts with the "-ex" option. So the build is considered a failure
# if any of the commands exits with a non-zero exit code.
#
# Author: Ralph Lange <ralph.lange@gmx.de>
# Copyright (C) 2014 Helmholtz-Zentrum Berlin für Materialien und Energie GmbH
# Copyright (C) 2014-2016 ITER Organization.
# All rights reserved. Use is subject to license terms.
installTool () {
local module=$1
local version=$2
wget -nv https://openepics.ci.cloudbees.com/job/${module}-${version}_Build/lastSuccessfulBuild/artifact/${module,,}-${version}.CB-dist.tar.gz
tar -xzf ${module,,}-${version}.CB-dist.tar.gz
}
installE4 () {
local module=$1
local branch=$2
wget -nv https://openepics.ci.cloudbees.com/job/e4-cpp-${module}-${branch}-build/BASE=${BASE}/lastSuccessfulBuild/artifact/${module}.CB-dist.tar.gz
tar -xzf ${module}.CB-dist.tar.gz
}
###########################################
# Defaults for EPICS Base and parameters
BASE=3.15.4
PUBLISH=${PUBLISH:-NO}
BRANCH=${BRANCH:-master}
###########################################
# Fetch and unpack dependencies
export STUFF=/tmp/stuff
rm -fr ${STUFF}
mkdir -p ${STUFF}
cd ${STUFF}
installTool Doxygen 1.8.11
###########################################
# Generate
cd ${WORKSPACE}
installE4 pvDatabase ${BRANCH}
export PATH=${STUFF}/bin:${PATH}
doxygen
###########################################
# Publish
if [ "${PUBLISH}" != "DONT" ]; then
# Upload explicit dummy to ensure target directory exists
echo "Created by CloudBees Jenkins upload job. Should be deleted as part of the job." > DUMMY
rsync -q -e ssh DUMMY epics-jenkins@web.sourceforge.net:/home/project-web/epics-pvdata/htdocs/docbuild/pvDatabaseCPP/${PUBLISH}/
rsync -aqP --delete -e ssh documentation epics-jenkins@web.sourceforge.net:/home/project-web/epics-pvdata/htdocs/docbuild/pvDatabaseCPP/${PUBLISH}/
fi

View File

@@ -8,5 +8,6 @@ LIBSRCS += pvCopy.cpp
LIBSRCS += pvArrayPlugin.cpp
LIBSRCS += pvDeadbandPlugin.cpp
LIBSRCS += pvTimestampPlugin.cpp
LIBSRCS += dataDistributorPlugin.cpp

View File

@@ -0,0 +1,428 @@
// Copyright information and license terms for this software can be
// found in the file LICENSE that is included with the distribution
#include <stdlib.h>
#include <cctype>
#include <string>
#include <algorithm>
#include <pv/lock.h>
#include <pv/pvData.h>
#include <pv/bitSet.h>
#define epicsExportSharedSymbols
#include "pv/dataDistributorPlugin.h"
using std::string;
using std::size_t;
using std::endl;
using std::tr1::static_pointer_cast;
using std::vector;
using namespace epics::pvData;
namespace epvd = epics::pvData;
namespace epics { namespace pvCopy {
// Utilities for manipulating strings
static std::string leftTrim(const std::string& s)
{
unsigned int i;
unsigned int n = (unsigned int)s.length();
for (i = 0; i < n; i++) {
if (!isspace(s[i])) {
break;
}
}
return s.substr(i,n-i);
}
static std::string rightTrim(const std::string& s)
{
unsigned int i;
unsigned int n = (unsigned int)s.length();
for (i = n; i > 0; i--) {
if (!isspace(s[i-1])) {
break;
}
}
return s.substr(0,i);
}
static std::string trim(const std::string& s)
{
return rightTrim(leftTrim(s));
}
static std::vector<std::string>& split(const std::string& s, char delimiter, std::vector<std::string>& elements)
{
std::stringstream ss(s);
std::string item;
while (std::getline(ss, item, delimiter)) {
elements.push_back(trim(item));
}
return elements;
}
static std::vector<std::string> split(const std::string& s, char delimiter)
{
std::vector<std::string> elements;
split(s, delimiter, elements);
return elements;
}
static std::string toLowerCase(const std::string& input)
{
std::stringstream ss;
for (unsigned int i = 0; i < input.size(); i++) {
char c = std::tolower(input.at(i));
ss << c;
}
return ss.str();
}
// Data distributor class
static std::string name("distributor");
bool DataDistributorPlugin::initialized(false);
std::map<std::string, DataDistributorPtr> DataDistributor::dataDistributorMap;
epics::pvData::Mutex DataDistributor::dataDistributorMapMutex;
DataDistributorPtr DataDistributor::getInstance(const std::string& groupId)
{
epvd::Lock lock(dataDistributorMapMutex);
std::map<std::string,DataDistributorPtr>::iterator ddit = dataDistributorMap.find(groupId);
if (ddit != dataDistributorMap.end()) {
DataDistributorPtr ddPtr = ddit->second;
return ddPtr;
}
else {
DataDistributorPtr ddPtr(new DataDistributor(groupId));
dataDistributorMap[groupId] = ddPtr;
return ddPtr;
}
}
void DataDistributor::removeUnusedInstance(DataDistributorPtr dataDistributorPtr)
{
epvd::Lock lock(dataDistributorMapMutex);
std::string groupId = dataDistributorPtr->getGroupId();
std::map<std::string,DataDistributorPtr>::iterator ddit = dataDistributorMap.find(groupId);
if (ddit != dataDistributorMap.end()) {
DataDistributorPtr ddPtr = ddit->second;
size_t nSets = ddPtr->clientSetMap.size();
if (nSets == 0) {
dataDistributorMap.erase(ddit);
}
}
}
DataDistributor::DataDistributor(const std::string& groupId_)
: groupId(groupId_)
, mutex()
, clientSetMap()
, clientSetIdList()
, currentSetIdIter(clientSetIdList.end())
, lastUpdateValue()
{
}
DataDistributor::~DataDistributor()
{
epvd::Lock lock(mutex);
clientSetMap.clear();
clientSetIdList.clear();
}
std::string DataDistributor::addClient(int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode)
{
epvd::Lock lock(mutex);
std::map<std::string,ClientSetPtr>::iterator git = clientSetMap.find(setId);
if (git != clientSetMap.end()) {
ClientSetPtr setPtr = git->second;
setPtr->clientIdList.push_back(clientId);
return setPtr->triggerField;
}
else {
ClientSetPtr setPtr(new ClientSet(setId, triggerField, nUpdatesPerClient, updateMode));
setPtr->clientIdList.push_back(clientId);
clientSetMap[setId] = setPtr;
clientSetIdList.push_back(setId);
return triggerField;
}
}
void DataDistributor::removeClient(int clientId, const std::string& setId)
{
epvd::Lock lock(mutex);
std::map<std::string,ClientSetPtr>::iterator git = clientSetMap.find(setId);
if (git != clientSetMap.end()) {
ClientSetPtr setPtr = git->second;
std::list<int>::iterator cit = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), clientId);
if (cit != setPtr->clientIdList.end()) {
// If we are removing current client id, advance iterator
if (cit == setPtr->currentClientIdIter) {
setPtr->currentClientIdIter++;
}
// Find current client id
int currentClientId = -1;
if (setPtr->currentClientIdIter != setPtr->clientIdList.end()) {
currentClientId = *(setPtr->currentClientIdIter);
}
// Remove client id from the list
setPtr->clientIdList.erase(cit);
// Reset current client id iterator
setPtr->currentClientIdIter = setPtr->clientIdList.end();
if (currentClientId >= 0) {
std::list<int>::iterator cit2 = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), currentClientId);
if (cit2 != setPtr->clientIdList.end()) {
setPtr->currentClientIdIter = cit2;
}
}
}
if (setPtr->clientIdList.size() == 0) {
clientSetMap.erase(git);
std::list<std::string>::iterator git2 = std::find(clientSetIdList.begin(), clientSetIdList.end(), setId);
if (git2 == currentSetIdIter) {
currentSetIdIter++;
}
if (git2 != clientSetIdList.end()) {
clientSetIdList.erase(git2);
}
}
}
}
bool DataDistributor::updateClient(int clientId, const std::string& setId, const std::string& triggerFieldValue)
{
epvd::Lock lock(mutex);
bool proceedWithUpdate = false;
if (currentSetIdIter == clientSetIdList.end()) {
currentSetIdIter = clientSetIdList.begin();
}
std::string currentSetId = *currentSetIdIter;
if (setId != currentSetId) {
// We are not distributing data to this set at the moment
return proceedWithUpdate;
}
ClientSetPtr setPtr = clientSetMap[currentSetId];
if (setPtr->currentClientIdIter == setPtr->clientIdList.end()) {
// Move current client iterator to the beginning of the list
setPtr->currentClientIdIter = setPtr->clientIdList.begin();
}
if (lastUpdateValue == triggerFieldValue) {
// This update was already distributed.
return proceedWithUpdate;
}
switch (setPtr->updateMode) {
case(DD_UPDATE_ONE_PER_GROUP): {
if (clientId != *(setPtr->currentClientIdIter)) {
// Not this client's turn.
return proceedWithUpdate;
}
proceedWithUpdate = true;
lastUpdateValue = triggerFieldValue;
setPtr->lastUpdateValue = triggerFieldValue;
setPtr->updateCounter++;
if (setPtr->updateCounter >= setPtr->nUpdatesPerClient) {
// This client and set are done.
setPtr->currentClientIdIter++;
setPtr->updateCounter = 0;
currentSetIdIter++;
}
break;
}
case(DD_UPDATE_ALL_IN_GROUP): {
proceedWithUpdate = true;
static unsigned int nClientsUpdated = 0;
if (setPtr->lastUpdateValue != triggerFieldValue) {
setPtr->lastUpdateValue = triggerFieldValue;
setPtr->updateCounter++;
nClientsUpdated = 0;
}
nClientsUpdated++;
if (nClientsUpdated == setPtr->clientIdList.size() && setPtr->updateCounter >= setPtr->nUpdatesPerClient) {
// This set is done.
lastUpdateValue = triggerFieldValue;
setPtr->updateCounter = 0;
currentSetIdIter++;
}
break;
}
default: {
proceedWithUpdate = true;
}
}
return proceedWithUpdate;
}
DataDistributorPlugin::DataDistributorPlugin()
{
}
DataDistributorPlugin::~DataDistributorPlugin()
{
}
void DataDistributorPlugin::create()
{
initialize();
}
bool DataDistributorPlugin::initialize()
{
if (!initialized) {
initialized = true;
DataDistributorPluginPtr pvPlugin = DataDistributorPluginPtr(new DataDistributorPlugin());
PVPluginRegistry::registerPlugin(name,pvPlugin);
}
return true;
}
PVFilterPtr DataDistributorPlugin::create(
const std::string& requestValue,
const PVCopyPtr& pvCopy,
const PVFieldPtr& master)
{
return DataDistributorFilter::create(requestValue,pvCopy,master);
}
DataDistributorFilter::~DataDistributorFilter()
{
dataDistributorPtr->removeClient(clientId, setId);
DataDistributor::removeUnusedInstance(dataDistributorPtr);
}
DataDistributorFilterPtr DataDistributorFilter::create(
const std::string& requestValue,
const PVCopyPtr& pvCopy,
const PVFieldPtr& master)
{
static int clientId = 0;
clientId++;
std::vector<std::string> configItems = split(requestValue, ';');
// Use lowercase keys if possible.
std::string requestValue2 = toLowerCase(requestValue);
std::vector<std::string> configItems2 = split(requestValue2, ';');
int nUpdatesPerClient = 1;
int updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP;
std::string groupId = "default";
std::string setId = "default";
std::string triggerField = "timeStamp";
bool hasUpdateMode = false;
bool hasSetId = false;
for(unsigned int i = 0; i < configItems2.size(); i++) {
std::string configItem2 = configItems2[i];
size_t ind = configItem2.find(':');
if (ind == string::npos) {
continue;
}
if(configItem2.find("updates") == 0) {
std::string svalue = configItem2.substr(ind+1);
nUpdatesPerClient = atoi(svalue.c_str());
}
else if(configItem2.find("group") == 0) {
std::string configItem = configItems[i];
groupId = configItem.substr(ind+1);
}
else if(configItem2.find("set") == 0) {
std::string configItem = configItems[i];
setId = configItem.substr(ind+1);
hasSetId = true;
}
else if(configItem2.find("mode") == 0) {
std::string svalue = toLowerCase(configItem2.substr(ind+1));
if (svalue == "one") {
updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP;
hasUpdateMode = true;
}
else if (svalue == "all") {
updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP;
hasUpdateMode = true;
}
}
else if(configItem2.find("trigger") == 0) {
std::string configItem = configItems[i];
triggerField = configItem.substr(ind+1);
}
}
// If request does not have update mode specified, but has set id
// then use a different update mode
if(!hasUpdateMode && hasSetId) {
updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP;
}
// Make sure request is valid
if(nUpdatesPerClient <= 0) {
return DataDistributorFilterPtr();
}
DataDistributorFilterPtr filter =
DataDistributorFilterPtr(new DataDistributorFilter(groupId, clientId, setId, triggerField, nUpdatesPerClient, updateMode, pvCopy, master));
return filter;
}
DataDistributorFilter::DataDistributorFilter(const std::string& groupId_, int clientId_, const std::string& setId_, const std::string& triggerField_, int nUpdatesPerClient, int updateMode, const PVCopyPtr& copyPtr_, const epvd::PVFieldPtr& masterFieldPtr_)
: dataDistributorPtr(DataDistributor::getInstance(groupId_))
, clientId(clientId_)
, setId(setId_)
, triggerField(triggerField_)
, masterFieldPtr(masterFieldPtr_)
, triggerFieldPtr()
, firstUpdate(true)
{
triggerField = dataDistributorPtr->addClient(clientId, setId, triggerField, nUpdatesPerClient, updateMode);
if(masterFieldPtr->getField()->getType() == epvd::structure) {
epvd::PVStructurePtr pvStructurePtr = static_pointer_cast<epvd::PVStructure>(masterFieldPtr);
if(pvStructurePtr) {
triggerFieldPtr = pvStructurePtr->getSubField(triggerField);
}
}
if(!triggerFieldPtr) {
triggerFieldPtr = masterFieldPtr;
}
}
bool DataDistributorFilter::filter(const PVFieldPtr& pvCopy, const BitSetPtr& bitSet, bool toCopy)
{
if(!toCopy) {
return false;
}
bool proceedWithUpdate = false;
if(firstUpdate) {
// Always send first update
firstUpdate = false;
proceedWithUpdate = true;
}
else {
std::stringstream ss;
ss << triggerFieldPtr;
std::string triggerFieldValue = ss.str();
proceedWithUpdate = dataDistributorPtr->updateClient(clientId, setId, triggerFieldValue);
}
if(proceedWithUpdate) {
pvCopy->copyUnchecked(*masterFieldPtr);
bitSet->set((unsigned int)pvCopy->getFieldOffset());
}
else {
// Clear all bits
//bitSet->clear(pvCopy->getFieldOffset());
bitSet->clear();
}
return true;
}
string DataDistributorFilter::getName()
{
return name;
}
}}

View File

@@ -88,7 +88,6 @@ PVCopyPtr PVCopy::create(
bool result = pvCopy->init(pvStructure);
if(!result) return PVCopyPtr();
pvCopy->traverseMasterInitPlugin();
//cout << pvCopy->dump() << endl;
return pvCopy;
}
@@ -433,10 +432,23 @@ bool PVCopy::init(epics::pvData::PVStructurePtr const &pvRequest)
PVStructurePtr pvMasterStructure = pvMaster;
size_t len = pvRequest->getPVFields().size();
bool entireMaster = false;
if(len==0) entireMaster = true;
requestHasMasterField = false;
PVStructurePtr pvOptions;
if(len==1) {
pvOptions = pvRequest->getSubField<PVStructure>("_options");
if(len==0) {
entireMaster = true;
}
else {
// If "_" is in the request, but not in the master structure,
// then assume the top level PV structure is requested
PVStructurePtr masterFieldPtr = pvMaster->getSubField<PVStructure>("_");
PVStructurePtr requestFieldPtr = pvRequest->getSubField<PVStructure>("_");
if (requestFieldPtr) {
requestHasMasterField = true;
}
if (!masterFieldPtr && requestFieldPtr) {
entireMaster = true;
pvOptions = requestFieldPtr->getSubField<PVStructure>("_options");
}
}
if(entireMaster) {
structure = pvMasterStructure->getStructure();

View File

@@ -23,6 +23,7 @@
#include "pv/pvArrayPlugin.h"
#include "pv/pvTimestampPlugin.h"
#include "pv/pvDeadbandPlugin.h"
#include "pv/dataDistributorPlugin.h"
using std::tr1::static_pointer_cast;
using namespace epics::pvData;
@@ -44,6 +45,7 @@ PVDatabasePtr PVDatabase::getMaster()
PVArrayPlugin::create();
PVTimestampPlugin::create();
PVDeadbandPlugin::create();
DataDistributorPlugin::create();
}
return pvDatabaseMaster;
}

View File

@@ -332,6 +332,7 @@ PVRecordField::PVRecordField(
PVRecordPtr const & pvRecord)
: pvField(pvField),
isStructure(pvField->getField()->getType()==structure ? true : false),
master(),
parent(parent),
pvRecord(pvRecord)
{
@@ -418,11 +419,18 @@ void PVRecordField::postParent(PVRecordFieldPtr const & subField)
listener->dataPut(pvrs,subField);
}
PVRecordStructurePtr parent(this->parent.lock());
if(parent) parent->postParent(subField);
if(parent) {
parent->postParent(subField);
}
}
void PVRecordField::postSubField()
{
// Master field pointer will be set in only one subfield
PVRecordStructurePtr master(this->master.lock());
if(master) {
master->callListener();
}
callListener();
if(isStructure) {
PVRecordStructurePtr pvrs =
@@ -465,19 +473,38 @@ void PVRecordStructure::init()
PVRecordStructurePtr self =
static_pointer_cast<PVRecordStructure>(shared_from_this());
PVRecordPtr pvRecord = getPVRecord();
static bool masterFieldCallbackSet = false;
bool isMasterField = (!getFullFieldName().size());
if (isMasterField) {
masterFieldCallbackSet = false;
}
for(size_t i=0; i<numFields; i++) {
PVFieldPtr pvField = pvFields[i];
if(pvField->getField()->getType()==structure) {
PVStructurePtr xxx = static_pointer_cast<PVStructure>(pvField);
PVRecordStructurePtr pvRecordStructure(
PVStructurePtr xxx = static_pointer_cast<PVStructure>(pvField);
PVRecordStructurePtr pvRecordStructure(
new PVRecordStructure(xxx,self,pvRecord));
pvRecordFields->push_back(pvRecordStructure);
pvRecordStructure->init();
pvRecordFields->push_back(pvRecordStructure);
pvRecordStructure->init();
} else {
PVRecordFieldPtr pvRecordField(
PVRecordFieldPtr pvRecordField(
new PVRecordField(pvField,self,pvRecord));
pvRecordFields->push_back(pvRecordField);
pvRecordField->init();
pvRecordFields->push_back(pvRecordField);
pvRecordField->init();
// Master field listeners will be called before
// calling listeners for the first subfield
if (!masterFieldCallbackSet) {
masterFieldCallbackSet = true;
// Find master field
PVRecordStructurePtr p = pvRecordField->parent.lock();
while (p) {
PVRecordStructurePtr p2 = p->parent.lock();
if (!p2) {
pvRecordField->master = p;
}
p = p2;
}
}
}
}
}

View File

@@ -0,0 +1,167 @@
// Copyright information and license terms for this software can be
// found in the file LICENSE that is included with the distribution
#ifndef DATA_DISTRIBUTOR_PLUGIN_H
#define DATA_DISTRIBUTOR_PLUGIN_H
// The data distributor plugin enables distribution of channel data between
// multiple client applications.
#include <string>
#include <map>
#include <list>
#include <pv/lock.h>
#include <pv/pvData.h>
#include <pv/pvPlugin.h>
#include <shareLib.h>
namespace epics { namespace pvCopy {
class DataDistributorPlugin;
class DataDistributorFilter;
class DataDistributor;
typedef std::tr1::shared_ptr<DataDistributorPlugin> DataDistributorPluginPtr;
typedef std::tr1::shared_ptr<DataDistributorFilter> DataDistributorFilterPtr;
typedef std::tr1::shared_ptr<DataDistributor> DataDistributorPtr;
struct ClientSet;
typedef std::tr1::shared_ptr<ClientSet> ClientSetPtr;
typedef std::tr1::shared_ptr<const ClientSet> ClientSetConstPtr;
struct ClientSet
{
POINTER_DEFINITIONS(ClientSet);
ClientSet(const std::string& setId_, const std::string triggerField_, int nUpdatesPerClient_, int updateMode_)
: setId(setId_)
, triggerField(triggerField_)
, nUpdatesPerClient(nUpdatesPerClient_)
, updateMode(updateMode_)
, clientIdList()
, lastUpdateValue()
, updateCounter(0)
, currentClientIdIter(clientIdList.end())
{}
~ClientSet() {}
std::string setId;
std::string triggerField;
int nUpdatesPerClient;
int updateMode;
std::list<int> clientIdList;
std::string lastUpdateValue;
int updateCounter;
std::list<int>::iterator currentClientIdIter;
};
class DataDistributor
{
public:
enum ClientUpdateMode {
DD_UPDATE_ONE_PER_GROUP = 0, // Update goes to one client per set
DD_UPDATE_ALL_IN_GROUP = 1, // Update goes to all clients in set
DD_N_UPDATE_MODES = 2 // Number of valid update modes
};
static DataDistributorPtr getInstance(const std::string& groupId);
static void removeUnusedInstance(DataDistributorPtr dataDistributorPtr);
virtual ~DataDistributor();
std::string getGroupId() const { return groupId; }
std::string addClient(int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode);
void removeClient(int clientId, const std::string& setId);
bool updateClient(int clientId, const std::string& setId, const std::string& triggerFieldValue);
private:
DataDistributor(const std::string& id);
DataDistributor(const DataDistributor& distributor);
DataDistributor& operator=(const DataDistributor& distributor);
static std::map<std::string, DataDistributorPtr> dataDistributorMap;
static epics::pvData::Mutex dataDistributorMapMutex;
std::string groupId;
epics::pvData::Mutex mutex;
std::map<std::string, ClientSetPtr> clientSetMap;
std::list<std::string> clientSetIdList;
std::list<std::string>::iterator currentSetIdIter;
std::string lastUpdateValue;
};
class epicsShareClass DataDistributorPlugin : public PVPlugin
{
private:
DataDistributorPlugin();
public:
POINTER_DEFINITIONS(DataDistributorPlugin);
virtual ~DataDistributorPlugin();
/**
* Factory
*/
static void create();
/**
* Create a PVFilter.
* @param requestValue The value part of a name=value request option.
* @param pvCopy The PVCopy to which the PVFilter will be attached.
* @param master The field in the master PVStructure to which the PVFilter will be attached
* @return The PVFilter.
* Null is returned if master or requestValue is not appropriate for the plugin.
*/
virtual PVFilterPtr create(
const std::string& requestValue,
const PVCopyPtr& pvCopy,
const epics::pvData::PVFieldPtr& master);
private:
static bool initialize();
static bool initialized;
};
/**
* @brief A Plugin for a filter that gets a sub array from a PVScalarDeadband.
*/
class epicsShareClass DataDistributorFilter : public PVFilter
{
private:
DataDistributorPtr dataDistributorPtr;
int clientId;
std::string setId;
std::string triggerField;
epics::pvData::PVFieldPtr masterFieldPtr;
epics::pvData::PVFieldPtr triggerFieldPtr;
bool firstUpdate;
DataDistributorFilter(const std::string& groupId, int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode, const epics::pvCopy::PVCopyPtr& copyPtr, const epics::pvData::PVFieldPtr& masterFieldPtr);
public:
POINTER_DEFINITIONS(DataDistributorFilter);
virtual ~DataDistributorFilter();
/**
* Create a DataDistributorFilter.
* @param requestValue The value part of a name=value request option.
* @param master The field in the master PVStructure to which the PVFilter will be attached.
* @return The PVFilter.
* A null is returned if master or requestValue is not appropriate for the plugin.
*/
static DataDistributorFilterPtr create(
const std::string& requestValue,
const PVCopyPtr& pvCopy,
const epics::pvData::PVFieldPtr & master);
/**
* Perform a filter operation
* @param pvCopy The field in the copy PVStructure.
* @param bitSet A bitSet for copyPVStructure.
* @param toCopy (true,false) means copy (from master to copy,from copy to master)
* @return if filter (modified, did not modify) destination.
* Null is returned if master or requestValue is not appropriate for the plugin.
*/
bool filter(const epics::pvData::PVFieldPtr & pvCopy,const epics::pvData::BitSetPtr & bitSet,bool toCopy);
/**
* Get the filter name.
* @return The name.
*/
std::string getName();
};
}}
#endif

View File

@@ -376,6 +376,7 @@ private:
std::list<PVListenerWPtr> pvListenerList;
epics::pvData::PVField::weak_pointer pvField;
bool isStructure;
PVRecordStructureWPtr master;
PVRecordStructureWPtr parent;
PVRecordWPtr pvRecord;
std::string fullName;

View File

@@ -167,6 +167,10 @@ public:
* name is the subField name and value is the subField value.
*/
epics::pvData::PVStructurePtr getOptions(std::size_t fieldOffset);
/**
* Is master field requested?
*/
bool isMasterFieldRequested() const {return requestHasMasterField;}
/**
* For debugging.
*/
@@ -183,6 +187,7 @@ private:
CopyNodePtr headNode;
epics::pvData::PVStructurePtr cacheInitStructure;
epics::pvData::BitSetPtr ignorechangeBitSet;
bool requestHasMasterField;
void traverseMaster(
CopyNodePtr const &node,

View File

@@ -292,6 +292,12 @@ void MonitorLocal::dataPut(PVRecordFieldPtr const & pvRecordField)
{
cout << "MonitorLocal::dataPut(pvRecordField)" << endl;
}
// If this record field is the master field, and the master field was not
// requested, we do not proceed with copy
bool isMasterField = pvRecordField->getPVRecord()->getPVStructure()->getFieldOffset()==0;
if (isMasterField && !pvCopy->isMasterFieldRequested()) {
return;
}
if(state!=active) return;
{
Lock xx(mutex);

View File

@@ -33,3 +33,8 @@ TESTPROD_HOST += testPVAServer
testPVAServer_SRCS += testPVAServer.cpp
testHarness_SRCS += testPVAServer.cpp
TESTS += testPVAServer
TESTPROD_HOST += testChannelMonitor
testChannelMonitor_SRCS += testChannelMonitor.cpp
testHarness_SRCS += testChannelMonitor.cpp
TESTS += testChannelMonitor

View File

@@ -0,0 +1,312 @@
/* testChannelMonitor.cpp */
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* EPICS pvData is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <epicsUnitTest.h>
#include <testMain.h>
#include <cstddef>
#include <cstdlib>
#include <cstddef>
#include <string>
#include <cstdio>
#include <memory>
#include <iostream>
#include <epicsStdio.h>
#include <epicsMutex.h>
#include <epicsEvent.h>
#include <epicsThread.h>
#include <pv/standardField.h>
#include <pv/standardPVField.h>
#include <pv/pvData.h>
#include <pv/pvAccess.h>
#include <pv/channelProviderLocal.h>
#include <pv/serverContext.h>
#include <pv/event.h>
#include <pv/clientFactory.h>
using namespace std;
using std::tr1::static_pointer_cast;
using namespace epics::pvData;
using namespace epics::pvAccess;
using namespace epics::pvDatabase;
namespace TR1 = std::tr1;
static bool debug = true;
PVStructurePtr createTestPvStructure()
{
FieldCreatePtr fieldCreate = getFieldCreate();
StandardFieldPtr standardField = getStandardField()
;
PVDataCreatePtr pvDataCreate = getPVDataCreate();
return pvDataCreate->createPVStructure(
fieldCreate->createFieldBuilder()->
add("id",pvInt) ->
add("x",pvInt) ->
add("y",pvInt) ->
add("z",pvInt) ->
add("alarm",standardField->alarm()) ->
add("timeStamp",standardField->timeStamp()) ->
createStructure());
}
class ChannelMonitorRequesterImpl : public MonitorRequester
{
public:
ChannelMonitorRequesterImpl(const std::string& channelName_)
: channelName(channelName_)
, lastReceivedPvStructure(createTestPvStructure())
, lastReceivedBitSet()
{
}
virtual string getRequesterName()
{
return "ChannelMonitorRequesterImpl";
}
virtual void message(const std::string& message, MessageType messageType)
{
cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << endl;
}
virtual void monitorConnect(const epics::pvData::Status& status, const Monitor::shared_pointer& /*monitor*/, const Structure::const_shared_pointer& /*structure*/)
{
if (status.isSuccess()) {
// show warning
if (!status.isOK()) {
cout << "[" << channelName << "] channel monitor create: " << status << endl;
}
connectionEvent.signal();
}
else {
cout << "[" << channelName << "] failed to create channel monitor: " << status << endl;
}
}
virtual void monitorEvent(const Monitor::shared_pointer& monitor)
{
MonitorElement::shared_pointer element;
while ((element = monitor->poll())) {
cout << "changed/overrun " << *element->changedBitSet << '/' << *element->overrunBitSet << endl;
if (!lastReceivedBitSet) {
lastReceivedBitSet = BitSet::create(element->changedBitSet->size());
}
lastReceivedBitSet->clear();
*lastReceivedBitSet |= *element->changedBitSet;
lastReceivedPvStructure->copyUnchecked(*element->pvStructurePtr);
monitor->release(element);
}
}
virtual void unlisten(const Monitor::shared_pointer& /*monitor*/)
{
}
bool waitUntilConnected(double timeOut)
{
return connectionEvent.wait(timeOut);
}
PVStructurePtr getLastReceivedPvStructure()
{
return lastReceivedPvStructure;
}
BitSetPtr getLastReceivedBitSet()
{
return lastReceivedBitSet;
}
private:
Event event;
Event connectionEvent;
string channelName;
PVStructurePtr lastReceivedPvStructure;
BitSetPtr lastReceivedBitSet;
};
class ChannelRequesterImpl : public ChannelRequester
{
private:
Event event;
public:
virtual string getRequesterName()
{
return "ChannelRequesterImpl";
};
virtual void message(const std::string& message, MessageType messageType)
{
cout << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << endl;
}
virtual void channelCreated(const Status& status, const Channel::shared_pointer& channel)
{
if (status.isSuccess()) {
// show warning
if (!status.isOK()) {
cout << "[" << channel->getChannelName() << "] channel create: " << status << endl;
}
}
else {
cout << "[" << channel->getChannelName() << "] failed to create a channel: " << status << endl;
}
}
virtual void channelStateChange(const Channel::shared_pointer& /*channel*/, Channel::ConnectionState connectionState)
{
if (connectionState == Channel::CONNECTED) {
event.signal();
}
else {
cout << Channel::ConnectionStateNames[connectionState] << endl;
exit(3);
}
}
bool waitUntilConnected(double timeOut)
{
return event.wait(timeOut);
}
};
static void test()
{
PVDatabasePtr master = PVDatabase::getMaster();
ChannelProviderLocalPtr channelProvider = getChannelProviderLocal();
string recordName = "positions";
PVStructurePtr pvStructure = createTestPvStructure();
PVRecordPtr pvRecord = PVRecord::create(recordName,pvStructure);
master->addRecord(pvRecord);
pvRecord = master->findRecord(recordName);
{
pvRecord->lock();
pvRecord->process();
pvRecord->unlock();
}
if(debug) {cout << "processed positions" << endl; }
ServerContext::shared_pointer ctx = startPVAServer("local",0,true,true);
testOk1(ctx.get() != 0);
ClientFactory::start();
ChannelProvider::shared_pointer provider = ChannelProviderRegistry::clients()->getProvider("pva");
cout << "creating channel: " << recordName << endl;
TR1::shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl());
Channel::shared_pointer channel = provider->createChannel(recordName, channelRequesterImpl);
bool channelConnected = channelRequesterImpl->waitUntilConnected(1.0);
testOk1(channelConnected);
if (channelConnected) {
string remoteAddress = channel->getRemoteAddress();
cout << "remote address: " << remoteAddress << endl;
}
string request = "";
PVStructure::shared_pointer pvRequest = CreateRequest::create()->createRequest(request);
TR1::shared_ptr<ChannelMonitorRequesterImpl> cmRequesterImpl(new ChannelMonitorRequesterImpl(channel->getChannelName()));
Monitor::shared_pointer monitor = channel->createMonitor(cmRequesterImpl, pvRequest);
bool monitorConnected = cmRequesterImpl->waitUntilConnected(1.0);
testOk1(monitorConnected);
Status status = monitor->start();
testOk1(status.isOK());
epicsThreadSleep(1);
// Set id, x
{
pvRecord->beginGroupPut();
PVIntPtr id = pvStructure->getSubField<PVInt>("id");
id->put(1);
PVIntPtr x = pvStructure->getSubField<PVInt>("x");
x->put(1);
pvRecord->endGroupPut();
}
epicsThreadSleep(1);
// Changed set for (id,x): 0 unset, 1 set, 2 set, 3 unset, 4 unset
BitSetPtr changedSet = cmRequesterImpl->getLastReceivedBitSet();
testOk1(!changedSet->get(0) && changedSet->get(1) && changedSet->get(2) && !changedSet->get(3) && !changedSet->get(4));
testOk1(*pvStructure == *cmRequesterImpl->getLastReceivedPvStructure());
// Set id, y
{
pvRecord->beginGroupPut();
PVIntPtr id = pvStructure->getSubField<PVInt>("id");
id->put(2);
PVIntPtr y = pvStructure->getSubField<PVInt>("y");
y->put(2);
pvRecord->endGroupPut();
}
epicsThreadSleep(1);
// Changed set for (id,y): 0 unset, 1 set, 2 unset, 3 set, 4 unset
changedSet = cmRequesterImpl->getLastReceivedBitSet();
testOk1(!changedSet->get(0) && changedSet->get(1) && !changedSet->get(2) && changedSet->get(3) && !changedSet->get(4));
testOk1(*pvStructure == *cmRequesterImpl->getLastReceivedPvStructure());
// Set id, z
{
pvRecord->beginGroupPut();
PVIntPtr id = pvStructure->getSubField<PVInt>("id");
id->put(3);
PVIntPtr z = pvStructure->getSubField<PVInt>("z");
z->put(3);
pvRecord->endGroupPut();
}
epicsThreadSleep(1);
// Changed set for (id,z): 0 unset, 1 set, 2 unset, 3 unset, 4 set
changedSet = cmRequesterImpl->getLastReceivedBitSet();
testOk1(!changedSet->get(0) && changedSet->get(1) && !changedSet->get(2) && !changedSet->get(3) && changedSet->get(4));
testOk1(*pvStructure == *cmRequesterImpl->getLastReceivedPvStructure());
status = monitor->stop();
testOk1(status.isOK());
// Test master field
request = "field(_)";
pvRequest = CreateRequest::create()->createRequest(request);
cmRequesterImpl = TR1::shared_ptr<ChannelMonitorRequesterImpl>(new ChannelMonitorRequesterImpl(channel->getChannelName()));
monitor = channel->createMonitor(cmRequesterImpl, pvRequest);
monitorConnected = cmRequesterImpl->waitUntilConnected(1.0);
testOk1(monitorConnected);
status = monitor->start();
testOk1(status.isOK());
epicsThreadSleep(1);
{
pvRecord->beginGroupPut();
PVIntPtr id = pvStructure->getSubField<PVInt>("id");
id->put(4);
PVIntPtr x = pvStructure->getSubField<PVInt>("x");
x->put(4);
pvRecord->endGroupPut();
}
epicsThreadSleep(1);
// Changed set with master field requested: 0 set
changedSet = cmRequesterImpl->getLastReceivedBitSet();
testOk1(changedSet->get(0));
testOk1(*pvStructure == *cmRequesterImpl->getLastReceivedPvStructure());
status = monitor->stop();
testOk1(status.isOK());
}
MAIN(testChannelMonitor)
{
testPlan(16);
test();
return 0;
}

View File

@@ -274,6 +274,34 @@ static void testPVScalarArray(
}
}
static void testMasterField(PVRecordPtr const& pvRecord)
{
CreateRequest::shared_pointer createRequest = CreateRequest::create();
PVStructurePtr pvRequest = createRequest->createRequest("field(_)");
if(debug) {
cout << "pvRequest" << *pvRequest << endl ;
}
PVStructurePtr pvStructureRecord = pvRecord->getPVRecordStructure()->getPVStructure();
PVCopyPtr pvCopy = PVCopy::create(pvStructureRecord,pvRequest,"");
PVStructurePtr pvMasterField = pvCopy->getPVMaster();
if(debug) {
cout << "PV structure from record" << endl << *pvStructureRecord << endl;
cout << "Master PV structure from copy" << endl << *pvMasterField << endl;
cout << "Master PV structure from copy offset " << pvMasterField->getFieldOffset() << endl;
}
testOk1(pvCopy->isMasterFieldRequested());
testOk1(pvMasterField->getNumberFields() == pvStructureRecord->getNumberFields());
testOk1(pvMasterField->getFieldOffset() == 0);
PVStructurePtr pvStructureCopy = pvCopy->createPVStructure();
BitSetPtr bitSet = BitSetPtr(new BitSet(pvStructureCopy->getNumberFields()));
pvCopy->initCopy(pvStructureCopy, bitSet);
if(debug) {
cout << "PV structure from copy" << endl << *pvStructureCopy << endl;
cout << "PV structure from copy offset " << pvStructureCopy->getFieldOffset() << endl;
}
testOk1(pvMasterField->getNumberFields() == pvStructureCopy->getNumberFields());
}
static void scalarTest()
{
if(debug) {cout << endl << endl << "****scalarTest****" << endl;}
@@ -393,11 +421,21 @@ static void powerSupplyTest()
testPVScalar(valueNameRecord,valueNameCopy,pvRecord,pvCopy);
}
static void masterFieldTest()
{
if(debug) {
cout << endl << endl << "****masterFieldTest****" << endl;
}
PVRecordPtr pvRecord = createScalar("doubleRecord",pvDouble,"alarm,timeStamp,display");
testMasterField(pvRecord);
}
MAIN(testPVCopy)
{
testPlan(67);
testPlan(71);
scalarTest();
arrayTest();
powerSupplyTest();
masterFieldTest();
return 0;
}

View File

@@ -332,14 +332,128 @@ static void ignoreTest()
testOk1(nset==3);
}
static void debugOutput(const string& what, bool result, uint32 nSet, BitSetPtr bitSet, PVStructurePtr pvStructureCopy)
{
if(debug) {
cout << what
<< " result " << (result ? "true" : "false")
<< " nSet " << nSet
<< " bitSet " << *bitSet
<< "\n pvStructureCopy\n" << pvStructureCopy
<< "\n";
}
}
static void dataDistributorTest()
{
if(debug) {cout << endl << endl << "****dataDistributorTest****" << endl;}
bool result = false;
uint32 nSet = 0;
// Create test structure
PVStructurePtr pvRecordStructure(getStandardPVField()->scalar(pvInt,""));
PVIntPtr pvValue(pvRecordStructure->getSubField<PVInt>("value"));
PVRecordPtr pvRecord(PVRecord::create("intRecord",pvRecordStructure));
if(debug) {
cout << " pvRecordStructure\n" << pvRecordStructure
<< "\n";
}
// Request distributor plugin with trigger field value
PVStructurePtr pvRequest(CreateRequest::create()->createRequest("_[distributor=trigger:value]"));
// Create clients
PVCopyPtr pvCopy1(PVCopy::create(pvRecordStructure,pvRequest,""));
PVStructurePtr pvStructureCopy1(pvCopy1->createPVStructure());
BitSetPtr bitSet1(new BitSet(pvStructureCopy1->getNumberFields()));
PVCopyPtr pvCopy2(PVCopy::create(pvRecordStructure,pvRequest,""));
PVStructurePtr pvStructureCopy2(pvCopy2->createPVStructure());
BitSetPtr bitSet2(new BitSet(pvStructureCopy2->getNumberFields()));
// Update 0: both clients get it
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
nSet = bitSet1->cardinality();
debugOutput("client 1: update 0", result, nSet, bitSet1, pvStructureCopy1);
testOk1(result==true);
testOk1(nSet==1);
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
nSet = bitSet2->cardinality();
debugOutput("client 2: update 0", result, nSet, bitSet2, pvStructureCopy2);
testOk1(result==true);
testOk1(nSet==1);
// Update 1: only client 1 gets it
pvValue->put(1);
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
nSet = bitSet1->cardinality();
debugOutput("client 1: update 1", result, nSet, bitSet1, pvStructureCopy1);
testOk1(result==true);
testOk1(nSet==1);
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
nSet = bitSet2->cardinality();
debugOutput("client 2: update 1", result, nSet, bitSet2, pvStructureCopy2);
testOk1(result==false);
testOk1(nSet==0);
// Update 2: only client 2 gets it
pvValue->put(2);
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
nSet = bitSet1->cardinality();
debugOutput("client 1: update 2", result, nSet, bitSet1, pvStructureCopy1);
testOk1(result==false);
testOk1(nSet==0);
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
nSet = bitSet2->cardinality();
debugOutput("client 2: update 2", result, nSet, bitSet2, pvStructureCopy2);
testOk1(result==true);
testOk1(nSet==1);
// Update 3: only client 1 gets it
pvValue->put(3);
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
nSet = bitSet1->cardinality();
debugOutput("client 1: update 3", result, nSet, bitSet1, pvStructureCopy1);
testOk1(result==true);
testOk1(nSet==1);
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
nSet = bitSet2->cardinality();
debugOutput("client 2: update 3", result, nSet, bitSet2, pvStructureCopy2);
testOk1(result==false);
testOk1(nSet==0);
// Update 4: only client 2 gets it
pvValue->put(4);
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
nSet = bitSet1->cardinality();
debugOutput("client 1: update 4", result, nSet, bitSet1, pvStructureCopy1);
testOk1(result==false);
testOk1(nSet==0);
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
nSet = bitSet2->cardinality();
debugOutput("client 2: update 4", result, nSet, bitSet2, pvStructureCopy2);
testOk1(result==true);
testOk1(nSet==1);
}
MAIN(testPlugin)
{
testPlan(26);
testPlan(46);
PVDatabasePtr pvDatabase(PVDatabase::getMaster());
deadbandTest();
arrayTest();
unionArrayTest();
timeStampTest();
ignoreTest();
dataDistributorTest();
return 0;
}