@ -4,6 +4,28 @@ This document summarizes the changes to the module between releases.
|
||||
|
||||
## Release 4.7.1 (EPICS 7.0.x, UNRELEASED)
|
||||
|
||||
* Added data distributor plugin which can be used for distributing data between
|
||||
a group of clients. The plugin is triggered by the request string of the
|
||||
form:
|
||||
|
||||
`_[distributor=group:<group id>;set:<set_id>;trigger:<field_name>;updates:<n_updates>;mode:<update_mode>]`
|
||||
|
||||
The plugin parameters are optional and are described bellow:
|
||||
|
||||
- group: this parameter indicates a group that client application belongs to (default value: "default"); groups of clients are completely independent of each other
|
||||
|
||||
- set: this parameter designates a client set that application belongs to within its group (default value: "default")
|
||||
|
||||
- trigger: this is the PV structure field that distinguishes different channel updates (default value: "timeStamp"); for example, for area detector images one could use the "uniqueId" field of the NTND structure
|
||||
|
||||
- updates: this parameter configures how many sequential updates a client (or a set of clients) will receive before the data distributor starts updating the next one (default value: "1")
|
||||
|
||||
- mode: this parameter configures how channel updates are to be distributed between clients in a set:
|
||||
- one: update goes to one client per set
|
||||
- all: update goes to all clients in a set
|
||||
- default is "one" if client set id is not specified, and "all" if set id is specified
|
||||
|
||||
For more information and examples of usage see the [plugin documentation](dataDistributorPlugin.md).
|
||||
|
||||
## Release 4.7.0 (EPICS 7.0.7, Sep 2022)
|
||||
|
||||
|
233
documentation/dataDistributorPlugin.md
Normal file
233
documentation/dataDistributorPlugin.md
Normal file
@ -0,0 +1,233 @@
|
||||
# Data Distributor Plugin
|
||||
|
||||
The data distributor plugin enables distribution of channel data between
|
||||
multiple client applications. The plugin considers two basic use cases
|
||||
for a group of clients:
|
||||
|
||||
- For simple parallel processing where client applications do not need
|
||||
to share data all clients in a group receive n sequential updates
|
||||
in a round-robin fashion: client \#1 sees the first n updates, client \#2 the
|
||||
second n updates, and so on.
|
||||
|
||||
- For data analysis where several cooperating client applications must all
|
||||
see the same data in order to process it the applications are grouped
|
||||
into sets, and each set of clients receives the same number of sequential
|
||||
updates. The first n updates are sent to all members of client set #1, the second n updates are sent to all members of client set #2, and so on.
|
||||
|
||||
## Requirements
|
||||
|
||||
This plugin relies on the pvDatabase plugin framework and requires
|
||||
epics base version > 7.0.7
|
||||
|
||||
## Usage
|
||||
|
||||
The PV request object which triggers plugin instantiation is defined below:
|
||||
|
||||
```
|
||||
"_[distributor=group:<group id>;set:<set_id>;trigger:<field_name>;updates:<n_updates>;mode:<update_mode>]"
|
||||
```
|
||||
|
||||
The underscore character at the begining of the PV request object
|
||||
indicates that the data distributor will be targeting entire PV structure.
|
||||
The same PV request object format should work regardless of the language
|
||||
in which a particular client application is written.
|
||||
|
||||
The plugin parameters are the following:
|
||||
|
||||
- `group:<group_id>`: specifying a `group_id` names a group the client application belongs to (default value: `default`); clients with different group names are
|
||||
completely independent of each other
|
||||
|
||||
- `set:<set_id>`: this parameter designates a client set that application belongs to within its group (default value: `default`)
|
||||
|
||||
- `trigger:<field_name>`: this is the PV structure field that distinguishes
|
||||
different channel updates (default value: `timeStamp`); for example,
|
||||
for area detector images one could use the `uniqueId` field of the NTND
|
||||
structure
|
||||
|
||||
- `updates:<n_updates>`: this parameter must be an integer and configures how many sequential updates a client (or a set of clients) will receive before the data distributor starts updating the next one (default value: `1`)
|
||||
|
||||
- `mode:<update_mode>`: this parameter configures how channel updates are to be
|
||||
distributed between clients in a set:
|
||||
- `one`: update goes to one client per set
|
||||
- `all`: update goes to all clients in a set
|
||||
- default is `one` if client set id is not specified, and `all` if set
|
||||
id is specified
|
||||
|
||||
The plugin obeys the following rules:
|
||||
|
||||
- Parameter names are case insensitive, but the string values
|
||||
are not. For example, "group=abc" and "group=ABC" would indicate two
|
||||
different groups of clients. String values allow alphanumeric characters,
|
||||
as well as dashes and underscores.
|
||||
|
||||
- Updates for a set of clients are configured when the first client in
|
||||
the set requests data. Configuration values (i.e., "trigger",
|
||||
"updates", and "mode"), passed in the PV request by the subsequent
|
||||
clients are ignored.
|
||||
|
||||
- A set is removed from the group once the last client in that
|
||||
set disconnects.
|
||||
|
||||
- A group is removed from the distributor plugin once all of its
|
||||
clients have disconnected.
|
||||
|
||||
- Different client groups are completely independent of each other.
|
||||
In other words, channel updates sent to clients belonging to
|
||||
group A do not interfere with updates sent to clients
|
||||
belonging to group B.
|
||||
|
||||
- The order in which clients and groups receive data is on a
|
||||
"first connected, first served basis".
|
||||
|
||||
- The current channel PV object is always distributed to a client on an
|
||||
initial connect.
|
||||
|
||||
- Data distribution is dynamic with respect to the number of clients.
|
||||
As clients connect and disconnect, the data distribution in a group adjusts
|
||||
accordingly. For example, with a group of clients configured to
|
||||
distribute one sequential update to each client, three clients would each be
|
||||
receiving every third update; after client number four connects, all
|
||||
clients would start receiving every fourth update; if one of those then
|
||||
disconnects, remaining three clients would again be receiving every third
|
||||
update.
|
||||
|
||||
## Examples
|
||||
|
||||
For all examples below we assume that PVDatabase server is serving
|
||||
area detector images on the channel 'image'. All clients are started before
|
||||
the server itself, and the initial (empty) object has unique id of 0.
|
||||
|
||||
### Example 1
|
||||
|
||||
This example show behavior of three clients that belong to the same (default)
|
||||
group. Each client receives one sequential update in a round-robin fashion.
|
||||
Note that all clients received current object on initial connection,
|
||||
and every third object afterward:
|
||||
|
||||
Client 1:
|
||||
```
|
||||
$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 1
|
||||
int uniqueId 4
|
||||
int uniqueId 7
|
||||
int uniqueId 10
|
||||
```
|
||||
|
||||
Client 2:
|
||||
```
|
||||
$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 2
|
||||
int uniqueId 5
|
||||
int uniqueId 8
|
||||
int uniqueId 11
|
||||
```
|
||||
|
||||
Client 3:
|
||||
```
|
||||
$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 3
|
||||
int uniqueId 6
|
||||
int uniqueId 9
|
||||
int uniqueId 12
|
||||
```
|
||||
|
||||
### Example 2
|
||||
|
||||
In this example we have two sets of two clients, each client set receiving
|
||||
three sequential updates. Both clients from client set \#1 receive updates
|
||||
(1,2,3), both clients from client set \#2 receive updates (4,5,6),
|
||||
client set \#1 receives updates (7,8,9), and so on.
|
||||
|
||||
Client 1 and Client 2/Set 1:
|
||||
```
|
||||
$ pvget -m -r "_[distributor=set:S1;trigger:uniqueId;updates:3]" image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 1
|
||||
int uniqueId 2
|
||||
int uniqueId 3
|
||||
int uniqueId 7
|
||||
int uniqueId 8
|
||||
int uniqueId 9
|
||||
int uniqueId 13
|
||||
int uniqueId 14
|
||||
int uniqueId 15
|
||||
```
|
||||
|
||||
Client 3 and Client 4/Set 2:
|
||||
```
|
||||
$ pvget -m -r "_[distributor=set:S2;trigger:uniqueId;updates:3]" image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 4
|
||||
int uniqueId 5
|
||||
int uniqueId 6
|
||||
int uniqueId 10
|
||||
int uniqueId 11
|
||||
int uniqueId 12
|
||||
int uniqueId 16
|
||||
int uniqueId 17
|
||||
int uniqueId 18
|
||||
```
|
||||
|
||||
### Example 3
|
||||
|
||||
This example illustrates what happens when multiple independent groups of
|
||||
clients connect to the same channel. Group G1 has two clients belonging
|
||||
to the same default set, and requesting one sequential update per client, while
|
||||
Group G2 has two clients in the default set requesting three
|
||||
sequential updates per client.
|
||||
|
||||
In this case the first client in group G1 receives updates
|
||||
(1,3,5,...), while the second one receives updates (2,4,6,...). On the
|
||||
other hand, the first client in group G2 receives updates
|
||||
(1,2,3,7,8,9,...), while the second one receives updates (4,5,6,10,11,12,...).
|
||||
|
||||
Client 1/Group G1:
|
||||
```
|
||||
$ pvget -m -r "_[distributor=group:G1;trigger:uniqueId]" image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 1
|
||||
int uniqueId 3
|
||||
int uniqueId 5
|
||||
int uniqueId 7
|
||||
int uniqueId 9
|
||||
```
|
||||
|
||||
Client 2/Group G1:
|
||||
```
|
||||
pvget -m -r "_[distributor=group:G1;trigger:uniqueId]" image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 2
|
||||
int uniqueId 4
|
||||
int uniqueId 6
|
||||
int uniqueId 8
|
||||
```
|
||||
|
||||
Client 1/Group G2:
|
||||
```
|
||||
$ pvget -m -r "_[distributor=group:G2;trigger:uniqueId;updates:3]" image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 1
|
||||
int uniqueId 2
|
||||
int uniqueId 3
|
||||
int uniqueId 7
|
||||
int uniqueId 8
|
||||
int uniqueId 9
|
||||
```
|
||||
|
||||
Client 2/Group G2:
|
||||
```
|
||||
$ pvget -m -r "_[distributor=group:G2;trigger:uniqueId;updates:3]" image | grep uniqueId
|
||||
int uniqueId 0
|
||||
int uniqueId 4
|
||||
int uniqueId 5
|
||||
int uniqueId 6
|
||||
int uniqueId 10
|
||||
int uniqueId 11
|
||||
int uniqueId 12
|
||||
```
|
||||
|
||||
The above shows that the two client groups do not interfere with each other.
|
||||
|
@ -8,5 +8,6 @@ LIBSRCS += pvCopy.cpp
|
||||
LIBSRCS += pvArrayPlugin.cpp
|
||||
LIBSRCS += pvDeadbandPlugin.cpp
|
||||
LIBSRCS += pvTimestampPlugin.cpp
|
||||
LIBSRCS += dataDistributorPlugin.cpp
|
||||
|
||||
|
||||
|
424
src/copy/dataDistributorPlugin.cpp
Normal file
424
src/copy/dataDistributorPlugin.cpp
Normal file
@ -0,0 +1,424 @@
|
||||
// Copyright information and license terms for this software can be
|
||||
// found in the file LICENSE that is included with the distribution
|
||||
|
||||
#include <stdlib.h>
|
||||
|
||||
#include <string>
|
||||
#include <algorithm>
|
||||
#include <pv/lock.h>
|
||||
#include <pv/pvData.h>
|
||||
#include <pv/bitSet.h>
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
#include "pv/dataDistributorPlugin.h"
|
||||
|
||||
using std::string;
|
||||
using std::size_t;
|
||||
using std::endl;
|
||||
using std::tr1::static_pointer_cast;
|
||||
using std::vector;
|
||||
using namespace epics::pvData;
|
||||
namespace epvd = epics::pvData;
|
||||
|
||||
namespace epics { namespace pvCopy {
|
||||
|
||||
// Utilities for manipulating strings
|
||||
static std::string leftTrim(const std::string& s)
|
||||
{
|
||||
int i;
|
||||
int n = s.length();
|
||||
for (i = 0; i < n; i++) {
|
||||
if (!isspace(s[i])) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return s.substr(i,n-i);
|
||||
}
|
||||
|
||||
static std::string rightTrim(const std::string& s)
|
||||
{
|
||||
int i;
|
||||
int n = s.length();
|
||||
for (i = n; i > 0; i--) {
|
||||
if (!isspace(s[i-1])) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return s.substr(0,i);
|
||||
}
|
||||
|
||||
static std::string trim(const std::string& s)
|
||||
{
|
||||
return rightTrim(leftTrim(s));
|
||||
}
|
||||
|
||||
static std::vector<std::string>& split(const std::string& s, char delimiter, std::vector<std::string>& elements)
|
||||
{
|
||||
std::stringstream ss(s);
|
||||
std::string item;
|
||||
while (std::getline(ss, item, delimiter)) {
|
||||
elements.push_back(trim(item));
|
||||
}
|
||||
return elements;
|
||||
}
|
||||
|
||||
static std::vector<std::string> split(const std::string& s, char delimiter)
|
||||
{
|
||||
std::vector<std::string> elements;
|
||||
split(s, delimiter, elements);
|
||||
return elements;
|
||||
}
|
||||
|
||||
static std::string toLowerCase(const std::string& input)
|
||||
{
|
||||
std::stringstream ss;
|
||||
for (unsigned int i = 0; i < input.size(); i++) {
|
||||
char c = std::tolower(input.at(i));
|
||||
ss << c;
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
// Data distributor class
|
||||
|
||||
static std::string name("distributor");
|
||||
bool DataDistributorPlugin::initialized(DataDistributorPlugin::initialize());
|
||||
|
||||
std::map<std::string, DataDistributorPtr> DataDistributor::dataDistributorMap;
|
||||
epics::pvData::Mutex DataDistributor::dataDistributorMapMutex;
|
||||
|
||||
DataDistributorPtr DataDistributor::getInstance(const std::string& groupId)
|
||||
{
|
||||
epvd::Lock lock(dataDistributorMapMutex);
|
||||
std::map<std::string,DataDistributorPtr>::iterator ddit = dataDistributorMap.find(groupId);
|
||||
if (ddit != dataDistributorMap.end()) {
|
||||
DataDistributorPtr ddPtr = ddit->second;
|
||||
return ddPtr;
|
||||
}
|
||||
else {
|
||||
DataDistributorPtr ddPtr(new DataDistributor(groupId));
|
||||
dataDistributorMap[groupId] = ddPtr;
|
||||
return ddPtr;
|
||||
}
|
||||
}
|
||||
|
||||
void DataDistributor::removeUnusedInstance(DataDistributorPtr dataDistributorPtr)
|
||||
{
|
||||
epvd::Lock lock(dataDistributorMapMutex);
|
||||
std::string groupId = dataDistributorPtr->getGroupId();
|
||||
std::map<std::string,DataDistributorPtr>::iterator ddit = dataDistributorMap.find(groupId);
|
||||
if (ddit != dataDistributorMap.end()) {
|
||||
DataDistributorPtr ddPtr = ddit->second;
|
||||
int nSets = ddPtr->clientSetMap.size();
|
||||
if (nSets == 0) {
|
||||
dataDistributorMap.erase(ddit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DataDistributor::DataDistributor(const std::string& groupId_)
|
||||
: groupId(groupId_)
|
||||
, mutex()
|
||||
, clientSetMap()
|
||||
, clientSetIdList()
|
||||
, currentSetIdIter(clientSetIdList.end())
|
||||
, lastUpdateValue()
|
||||
{
|
||||
}
|
||||
|
||||
DataDistributor::~DataDistributor()
|
||||
{
|
||||
epvd::Lock lock(mutex);
|
||||
clientSetMap.clear();
|
||||
clientSetIdList.clear();
|
||||
}
|
||||
|
||||
std::string DataDistributor::addClient(int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode)
|
||||
{
|
||||
epvd::Lock lock(mutex);
|
||||
std::map<std::string,ClientSetPtr>::iterator git = clientSetMap.find(setId);
|
||||
if (git != clientSetMap.end()) {
|
||||
ClientSetPtr setPtr = git->second;
|
||||
setPtr->clientIdList.push_back(clientId);
|
||||
return setPtr->triggerField;
|
||||
}
|
||||
else {
|
||||
ClientSetPtr setPtr(new ClientSet(setId, triggerField, nUpdatesPerClient, updateMode));
|
||||
setPtr->clientIdList.push_back(clientId);
|
||||
clientSetMap[setId] = setPtr;
|
||||
clientSetIdList.push_back(setId);
|
||||
return triggerField;
|
||||
}
|
||||
}
|
||||
|
||||
void DataDistributor::removeClient(int clientId, const std::string& setId)
|
||||
{
|
||||
epvd::Lock lock(mutex);
|
||||
std::map<std::string,ClientSetPtr>::iterator git = clientSetMap.find(setId);
|
||||
if (git != clientSetMap.end()) {
|
||||
ClientSetPtr setPtr = git->second;
|
||||
std::list<int>::iterator cit = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), clientId);
|
||||
if (cit != setPtr->clientIdList.end()) {
|
||||
// If we are removing current client id, advance iterator
|
||||
if (cit == setPtr->currentClientIdIter) {
|
||||
setPtr->currentClientIdIter++;
|
||||
}
|
||||
|
||||
// Find current client id
|
||||
int currentClientId = -1;
|
||||
if (setPtr->currentClientIdIter != setPtr->clientIdList.end()) {
|
||||
currentClientId = *(setPtr->currentClientIdIter);
|
||||
}
|
||||
|
||||
// Remove client id from the list
|
||||
setPtr->clientIdList.erase(cit);
|
||||
|
||||
// Reset current client id iterator
|
||||
setPtr->currentClientIdIter = setPtr->clientIdList.end();
|
||||
if (currentClientId >= 0) {
|
||||
std::list<int>::iterator cit2 = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), currentClientId);
|
||||
if (cit2 != setPtr->clientIdList.end()) {
|
||||
setPtr->currentClientIdIter = cit2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (setPtr->clientIdList.size() == 0) {
|
||||
clientSetMap.erase(git);
|
||||
std::list<std::string>::iterator git2 = std::find(clientSetIdList.begin(), clientSetIdList.end(), setId);
|
||||
if (git2 == currentSetIdIter) {
|
||||
currentSetIdIter++;
|
||||
}
|
||||
if (git2 != clientSetIdList.end()) {
|
||||
clientSetIdList.erase(git2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool DataDistributor::updateClient(int clientId, const std::string& setId, const std::string& triggerFieldValue)
|
||||
{
|
||||
epvd::Lock lock(mutex);
|
||||
bool proceedWithUpdate = false;
|
||||
if (currentSetIdIter == clientSetIdList.end()) {
|
||||
currentSetIdIter = clientSetIdList.begin();
|
||||
}
|
||||
std::string currentSetId = *currentSetIdIter;
|
||||
if (setId != currentSetId) {
|
||||
// We are not distributing data to this set at the moment
|
||||
return proceedWithUpdate;
|
||||
}
|
||||
ClientSetPtr setPtr = clientSetMap[currentSetId];
|
||||
if (setPtr->currentClientIdIter == setPtr->clientIdList.end()) {
|
||||
// Move current client iterator to the beginning of the list
|
||||
setPtr->currentClientIdIter = setPtr->clientIdList.begin();
|
||||
}
|
||||
if (lastUpdateValue == triggerFieldValue) {
|
||||
// This update was already distributed.
|
||||
return proceedWithUpdate;
|
||||
}
|
||||
switch (setPtr->updateMode) {
|
||||
case(DD_UPDATE_ONE_PER_GROUP): {
|
||||
if (clientId != *(setPtr->currentClientIdIter)) {
|
||||
// Not this client's turn.
|
||||
return proceedWithUpdate;
|
||||
}
|
||||
proceedWithUpdate = true;
|
||||
lastUpdateValue = triggerFieldValue;
|
||||
setPtr->lastUpdateValue = triggerFieldValue;
|
||||
setPtr->updateCounter++;
|
||||
if (setPtr->updateCounter >= setPtr->nUpdatesPerClient) {
|
||||
// This client and set are done.
|
||||
setPtr->currentClientIdIter++;
|
||||
setPtr->updateCounter = 0;
|
||||
currentSetIdIter++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case(DD_UPDATE_ALL_IN_GROUP): {
|
||||
proceedWithUpdate = true;
|
||||
static unsigned int nClientsUpdated = 0;
|
||||
if (setPtr->lastUpdateValue != triggerFieldValue) {
|
||||
setPtr->lastUpdateValue = triggerFieldValue;
|
||||
setPtr->updateCounter++;
|
||||
nClientsUpdated = 0;
|
||||
}
|
||||
nClientsUpdated++;
|
||||
if (nClientsUpdated == setPtr->clientIdList.size() && setPtr->updateCounter >= setPtr->nUpdatesPerClient) {
|
||||
// This set is done.
|
||||
lastUpdateValue = triggerFieldValue;
|
||||
setPtr->updateCounter = 0;
|
||||
currentSetIdIter++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
proceedWithUpdate = true;
|
||||
}
|
||||
}
|
||||
return proceedWithUpdate;
|
||||
}
|
||||
|
||||
DataDistributorPlugin::DataDistributorPlugin()
|
||||
{
|
||||
}
|
||||
|
||||
DataDistributorPlugin::~DataDistributorPlugin()
|
||||
{
|
||||
}
|
||||
|
||||
void DataDistributorPlugin::create()
|
||||
{
|
||||
initialize();
|
||||
}
|
||||
|
||||
bool DataDistributorPlugin::initialize()
|
||||
{
|
||||
DataDistributorPluginPtr pvPlugin = DataDistributorPluginPtr(new DataDistributorPlugin());
|
||||
PVPluginRegistry::registerPlugin(name,pvPlugin);
|
||||
return true;
|
||||
}
|
||||
|
||||
PVFilterPtr DataDistributorPlugin::create(
|
||||
const std::string& requestValue,
|
||||
const PVCopyPtr& pvCopy,
|
||||
const PVFieldPtr& master)
|
||||
{
|
||||
return DataDistributorFilter::create(requestValue,pvCopy,master);
|
||||
}
|
||||
|
||||
DataDistributorFilter::~DataDistributorFilter()
|
||||
{
|
||||
dataDistributorPtr->removeClient(clientId, setId);
|
||||
DataDistributor::removeUnusedInstance(dataDistributorPtr);
|
||||
}
|
||||
|
||||
DataDistributorFilterPtr DataDistributorFilter::create(
|
||||
const std::string& requestValue,
|
||||
const PVCopyPtr& pvCopy,
|
||||
const PVFieldPtr& master)
|
||||
{
|
||||
static int clientId = 0;
|
||||
clientId++;
|
||||
|
||||
std::vector<std::string> configItems = split(requestValue, ';');
|
||||
// Use lowercase keys if possible.
|
||||
std::string requestValue2 = toLowerCase(requestValue);
|
||||
std::vector<std::string> configItems2 = split(requestValue2, ';');
|
||||
int nUpdatesPerClient = 1;
|
||||
int updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP;
|
||||
std::string groupId = "default";
|
||||
std::string setId = "default";
|
||||
std::string triggerField = "timeStamp";
|
||||
bool hasUpdateMode = false;
|
||||
bool hasSetId = false;
|
||||
for(unsigned int i = 0; i < configItems2.size(); i++) {
|
||||
std::string configItem2 = configItems2[i];
|
||||
size_t ind = configItem2.find(':');
|
||||
if (ind == string::npos) {
|
||||
continue;
|
||||
}
|
||||
if(configItem2.find("updates") == 0) {
|
||||
std::string svalue = configItem2.substr(ind+1);
|
||||
nUpdatesPerClient = atoi(svalue.c_str());
|
||||
}
|
||||
else if(configItem2.find("group") == 0) {
|
||||
std::string configItem = configItems[i];
|
||||
groupId = configItem.substr(ind+1);
|
||||
}
|
||||
else if(configItem2.find("set") == 0) {
|
||||
std::string configItem = configItems[i];
|
||||
setId = configItem.substr(ind+1);
|
||||
hasSetId = true;
|
||||
}
|
||||
else if(configItem2.find("mode") == 0) {
|
||||
std::string svalue = toLowerCase(configItem2.substr(ind+1));
|
||||
if (svalue == "one") {
|
||||
updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP;
|
||||
hasUpdateMode = true;
|
||||
}
|
||||
else if (svalue == "all") {
|
||||
updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP;
|
||||
hasUpdateMode = true;
|
||||
}
|
||||
}
|
||||
else if(configItem2.find("trigger") == 0) {
|
||||
std::string configItem = configItems[i];
|
||||
triggerField = configItem.substr(ind+1);
|
||||
}
|
||||
}
|
||||
// If request does not have update mode specified, but has set id
|
||||
// then use a different update mode
|
||||
if(!hasUpdateMode && hasSetId) {
|
||||
updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP;
|
||||
}
|
||||
|
||||
// Make sure request is valid
|
||||
if(nUpdatesPerClient <= 0) {
|
||||
return DataDistributorFilterPtr();
|
||||
}
|
||||
DataDistributorFilterPtr filter =
|
||||
DataDistributorFilterPtr(new DataDistributorFilter(groupId, clientId, setId, triggerField, nUpdatesPerClient, updateMode, pvCopy, master));
|
||||
return filter;
|
||||
}
|
||||
|
||||
DataDistributorFilter::DataDistributorFilter(const std::string& groupId_, int clientId_, const std::string& setId_, const std::string& triggerField_, int nUpdatesPerClient, int updateMode, const PVCopyPtr& copyPtr_, const epvd::PVFieldPtr& masterFieldPtr_)
|
||||
: dataDistributorPtr(DataDistributor::getInstance(groupId_))
|
||||
, clientId(clientId_)
|
||||
, setId(setId_)
|
||||
, triggerField(triggerField_)
|
||||
, masterFieldPtr(masterFieldPtr_)
|
||||
, triggerFieldPtr()
|
||||
, firstUpdate(true)
|
||||
{
|
||||
triggerField = dataDistributorPtr->addClient(clientId, setId, triggerField, nUpdatesPerClient, updateMode);
|
||||
if(masterFieldPtr->getField()->getType() == epvd::structure) {
|
||||
epvd::PVStructurePtr pvStructurePtr = static_pointer_cast<epvd::PVStructure>(masterFieldPtr);
|
||||
if(pvStructurePtr) {
|
||||
triggerFieldPtr = pvStructurePtr->getSubField(triggerField);
|
||||
}
|
||||
}
|
||||
if(!triggerFieldPtr) {
|
||||
triggerFieldPtr = masterFieldPtr;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool DataDistributorFilter::filter(const PVFieldPtr& pvCopy, const BitSetPtr& bitSet, bool toCopy)
|
||||
{
|
||||
if(!toCopy) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool proceedWithUpdate = false;
|
||||
if(firstUpdate) {
|
||||
// Always send first update
|
||||
firstUpdate = false;
|
||||
proceedWithUpdate = true;
|
||||
}
|
||||
else {
|
||||
std::stringstream ss;
|
||||
ss << triggerFieldPtr;
|
||||
std::string triggerFieldValue = ss.str();
|
||||
proceedWithUpdate = dataDistributorPtr->updateClient(clientId, setId, triggerFieldValue);
|
||||
}
|
||||
|
||||
if(proceedWithUpdate) {
|
||||
pvCopy->copyUnchecked(*masterFieldPtr);
|
||||
bitSet->set(pvCopy->getFieldOffset());
|
||||
}
|
||||
else {
|
||||
// Clear all bits
|
||||
//bitSet->clear(pvCopy->getFieldOffset());
|
||||
bitSet->clear();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
string DataDistributorFilter::getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
}}
|
167
src/pv/dataDistributorPlugin.h
Normal file
167
src/pv/dataDistributorPlugin.h
Normal file
@ -0,0 +1,167 @@
|
||||
// Copyright information and license terms for this software can be
|
||||
// found in the file LICENSE that is included with the distribution
|
||||
|
||||
#ifndef DATA_DISTRIBUTOR_PLUGIN_H
|
||||
#define DATA_DISTRIBUTOR_PLUGIN_H
|
||||
|
||||
// The data distributor plugin enables distribution of channel data between
|
||||
// multiple client applications.
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <list>
|
||||
#include <pv/lock.h>
|
||||
#include <pv/pvData.h>
|
||||
#include <pv/pvPlugin.h>
|
||||
|
||||
#include <shareLib.h>
|
||||
|
||||
namespace epics { namespace pvCopy {
|
||||
|
||||
class DataDistributorPlugin;
|
||||
class DataDistributorFilter;
|
||||
class DataDistributor;
|
||||
|
||||
typedef std::tr1::shared_ptr<DataDistributorPlugin> DataDistributorPluginPtr;
|
||||
typedef std::tr1::shared_ptr<DataDistributorFilter> DataDistributorFilterPtr;
|
||||
typedef std::tr1::shared_ptr<DataDistributor> DataDistributorPtr;
|
||||
|
||||
struct ClientSet;
|
||||
typedef std::tr1::shared_ptr<ClientSet> ClientSetPtr;
|
||||
typedef std::tr1::shared_ptr<const ClientSet> ClientSetConstPtr;
|
||||
|
||||
struct ClientSet
|
||||
{
|
||||
POINTER_DEFINITIONS(ClientSet);
|
||||
|
||||
ClientSet(const std::string& setId_, const std::string triggerField_, int nUpdatesPerClient_, int updateMode_)
|
||||
: setId(setId_)
|
||||
, triggerField(triggerField_)
|
||||
, nUpdatesPerClient(nUpdatesPerClient_)
|
||||
, updateMode(updateMode_)
|
||||
, clientIdList()
|
||||
, lastUpdateValue()
|
||||
, updateCounter(0)
|
||||
, currentClientIdIter(clientIdList.end())
|
||||
{}
|
||||
~ClientSet() {}
|
||||
std::string setId;
|
||||
std::string triggerField;
|
||||
int nUpdatesPerClient;
|
||||
int updateMode;
|
||||
std::list<int> clientIdList;
|
||||
std::string lastUpdateValue;
|
||||
int updateCounter;
|
||||
std::list<int>::iterator currentClientIdIter;
|
||||
};
|
||||
|
||||
class DataDistributor
|
||||
{
|
||||
public:
|
||||
enum ClientUpdateMode {
|
||||
DD_UPDATE_ONE_PER_GROUP = 0, // Update goes to one client per set
|
||||
DD_UPDATE_ALL_IN_GROUP = 1, // Update goes to all clients in set
|
||||
DD_N_UPDATE_MODES = 2 // Number of valid update modes
|
||||
};
|
||||
|
||||
static DataDistributorPtr getInstance(const std::string& groupId);
|
||||
static void removeUnusedInstance(DataDistributorPtr dataDistributorPtr);
|
||||
|
||||
virtual ~DataDistributor();
|
||||
std::string getGroupId() const { return groupId; }
|
||||
std::string addClient(int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode);
|
||||
void removeClient(int clientId, const std::string& setId);
|
||||
bool updateClient(int clientId, const std::string& setId, const std::string& triggerFieldValue);
|
||||
|
||||
private:
|
||||
DataDistributor(const std::string& id);
|
||||
DataDistributor(const DataDistributor& distributor);
|
||||
DataDistributor& operator=(const DataDistributor& distributor);
|
||||
|
||||
static std::map<std::string, DataDistributorPtr> dataDistributorMap;
|
||||
static epics::pvData::Mutex dataDistributorMapMutex;
|
||||
|
||||
std::string groupId;
|
||||
epics::pvData::Mutex mutex;
|
||||
std::map<std::string, ClientSetPtr> clientSetMap;
|
||||
std::list<std::string> clientSetIdList;
|
||||
std::list<std::string>::iterator currentSetIdIter;
|
||||
std::string lastUpdateValue;
|
||||
};
|
||||
|
||||
class epicsShareClass DataDistributorPlugin : public PVPlugin
|
||||
{
|
||||
private:
|
||||
DataDistributorPlugin();
|
||||
public:
|
||||
POINTER_DEFINITIONS(DataDistributorPlugin);
|
||||
virtual ~DataDistributorPlugin();
|
||||
/**
|
||||
* Factory
|
||||
*/
|
||||
static void create();
|
||||
/**
|
||||
* Create a PVFilter.
|
||||
* @param requestValue The value part of a name=value request option.
|
||||
* @param pvCopy The PVCopy to which the PVFilter will be attached.
|
||||
* @param master The field in the master PVStructure to which the PVFilter will be attached
|
||||
* @return The PVFilter.
|
||||
* Null is returned if master or requestValue is not appropriate for the plugin.
|
||||
*/
|
||||
virtual PVFilterPtr create(
|
||||
const std::string& requestValue,
|
||||
const PVCopyPtr& pvCopy,
|
||||
const epics::pvData::PVFieldPtr& master);
|
||||
private:
|
||||
static bool initialize();
|
||||
static bool initialized;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief A Plugin for a filter that gets a sub array from a PVScalarDeadband.
|
||||
*/
|
||||
class epicsShareClass DataDistributorFilter : public PVFilter
|
||||
{
|
||||
private:
|
||||
DataDistributorPtr dataDistributorPtr;
|
||||
int clientId;
|
||||
std::string setId;
|
||||
std::string triggerField;
|
||||
epics::pvData::PVFieldPtr masterFieldPtr;
|
||||
epics::pvData::PVFieldPtr triggerFieldPtr;
|
||||
bool firstUpdate;
|
||||
|
||||
DataDistributorFilter(const std::string& groupId, int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode, const epics::pvCopy::PVCopyPtr& copyPtr, const epics::pvData::PVFieldPtr& masterFieldPtr);
|
||||
|
||||
public:
|
||||
POINTER_DEFINITIONS(DataDistributorFilter);
|
||||
virtual ~DataDistributorFilter();
|
||||
/**
|
||||
* Create a DataDistributorFilter.
|
||||
* @param requestValue The value part of a name=value request option.
|
||||
* @param master The field in the master PVStructure to which the PVFilter will be attached.
|
||||
* @return The PVFilter.
|
||||
* A null is returned if master or requestValue is not appropriate for the plugin.
|
||||
*/
|
||||
static DataDistributorFilterPtr create(
|
||||
const std::string& requestValue,
|
||||
const PVCopyPtr& pvCopy,
|
||||
const epics::pvData::PVFieldPtr & master);
|
||||
/**
|
||||
* Perform a filter operation
|
||||
* @param pvCopy The field in the copy PVStructure.
|
||||
* @param bitSet A bitSet for copyPVStructure.
|
||||
* @param toCopy (true,false) means copy (from master to copy,from copy to master)
|
||||
* @return if filter (modified, did not modify) destination.
|
||||
* Null is returned if master or requestValue is not appropriate for the plugin.
|
||||
*/
|
||||
bool filter(const epics::pvData::PVFieldPtr & pvCopy,const epics::pvData::BitSetPtr & bitSet,bool toCopy);
|
||||
/**
|
||||
* Get the filter name.
|
||||
* @return The name.
|
||||
*/
|
||||
std::string getName();
|
||||
};
|
||||
|
||||
}}
|
||||
#endif
|
@ -332,14 +332,128 @@ static void ignoreTest()
|
||||
testOk1(nset==3);
|
||||
}
|
||||
|
||||
static void debugOutput(const string& what, bool result, uint32 nSet, BitSetPtr bitSet, PVStructurePtr pvStructureCopy)
|
||||
{
|
||||
if(debug) {
|
||||
cout << what
|
||||
<< " result " << (result ? "true" : "false")
|
||||
<< " nSet " << nSet
|
||||
<< " bitSet " << *bitSet
|
||||
<< "\n pvStructureCopy\n" << pvStructureCopy
|
||||
<< "\n";
|
||||
}
|
||||
}
|
||||
|
||||
static void dataDistributorTest()
|
||||
{
|
||||
if(debug) {cout << endl << endl << "****dataDistributorTest****" << endl;}
|
||||
bool result = false;
|
||||
uint32 nSet = 0;
|
||||
|
||||
// Create test structure
|
||||
PVStructurePtr pvRecordStructure(getStandardPVField()->scalar(pvInt,""));
|
||||
PVIntPtr pvValue(pvRecordStructure->getSubField<PVInt>("value"));
|
||||
PVRecordPtr pvRecord(PVRecord::create("intRecord",pvRecordStructure));
|
||||
if(debug) {
|
||||
cout << " pvRecordStructure\n" << pvRecordStructure
|
||||
<< "\n";
|
||||
}
|
||||
|
||||
// Request distributor plugin with trigger field value
|
||||
PVStructurePtr pvRequest(CreateRequest::create()->createRequest("_[distributor=trigger:value]"));
|
||||
|
||||
// Create clients
|
||||
PVCopyPtr pvCopy1(PVCopy::create(pvRecordStructure,pvRequest,""));
|
||||
PVStructurePtr pvStructureCopy1(pvCopy1->createPVStructure());
|
||||
BitSetPtr bitSet1(new BitSet(pvStructureCopy1->getNumberFields()));
|
||||
|
||||
PVCopyPtr pvCopy2(PVCopy::create(pvRecordStructure,pvRequest,""));
|
||||
PVStructurePtr pvStructureCopy2(pvCopy2->createPVStructure());
|
||||
BitSetPtr bitSet2(new BitSet(pvStructureCopy2->getNumberFields()));
|
||||
|
||||
// Update 0: both clients get it
|
||||
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
|
||||
nSet = bitSet1->cardinality();
|
||||
debugOutput("client 1: update 0", result, nSet, bitSet1, pvStructureCopy1);
|
||||
testOk1(result==true);
|
||||
testOk1(nSet==1);
|
||||
|
||||
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
|
||||
nSet = bitSet2->cardinality();
|
||||
debugOutput("client 2: update 0", result, nSet, bitSet2, pvStructureCopy2);
|
||||
testOk1(result==true);
|
||||
testOk1(nSet==1);
|
||||
|
||||
// Update 1: only client 1 gets it
|
||||
pvValue->put(1);
|
||||
|
||||
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
|
||||
nSet = bitSet1->cardinality();
|
||||
debugOutput("client 1: update 1", result, nSet, bitSet1, pvStructureCopy1);
|
||||
testOk1(result==true);
|
||||
testOk1(nSet==1);
|
||||
|
||||
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
|
||||
nSet = bitSet2->cardinality();
|
||||
debugOutput("client 2: update 1", result, nSet, bitSet2, pvStructureCopy2);
|
||||
testOk1(result==false);
|
||||
testOk1(nSet==0);
|
||||
|
||||
// Update 2: only client 2 gets it
|
||||
pvValue->put(2);
|
||||
|
||||
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
|
||||
nSet = bitSet1->cardinality();
|
||||
debugOutput("client 1: update 2", result, nSet, bitSet1, pvStructureCopy1);
|
||||
testOk1(result==false);
|
||||
testOk1(nSet==0);
|
||||
|
||||
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
|
||||
nSet = bitSet2->cardinality();
|
||||
debugOutput("client 2: update 2", result, nSet, bitSet2, pvStructureCopy2);
|
||||
testOk1(result==true);
|
||||
testOk1(nSet==1);
|
||||
|
||||
// Update 3: only client 1 gets it
|
||||
pvValue->put(3);
|
||||
|
||||
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
|
||||
nSet = bitSet1->cardinality();
|
||||
debugOutput("client 1: update 3", result, nSet, bitSet1, pvStructureCopy1);
|
||||
testOk1(result==true);
|
||||
testOk1(nSet==1);
|
||||
|
||||
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
|
||||
nSet = bitSet2->cardinality();
|
||||
debugOutput("client 2: update 3", result, nSet, bitSet2, pvStructureCopy2);
|
||||
testOk1(result==false);
|
||||
testOk1(nSet==0);
|
||||
|
||||
// Update 4: only client 2 gets it
|
||||
pvValue->put(4);
|
||||
|
||||
result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1);
|
||||
nSet = bitSet1->cardinality();
|
||||
debugOutput("client 1: update 4", result, nSet, bitSet1, pvStructureCopy1);
|
||||
testOk1(result==false);
|
||||
testOk1(nSet==0);
|
||||
|
||||
result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2);
|
||||
nSet = bitSet2->cardinality();
|
||||
debugOutput("client 2: update 4", result, nSet, bitSet2, pvStructureCopy2);
|
||||
testOk1(result==true);
|
||||
testOk1(nSet==1);
|
||||
}
|
||||
|
||||
MAIN(testPlugin)
|
||||
{
|
||||
testPlan(26);
|
||||
testPlan(46);
|
||||
PVDatabasePtr pvDatabase(PVDatabase::getMaster());
|
||||
deadbandTest();
|
||||
arrayTest();
|
||||
unionArrayTest();
|
||||
timeStampTest();
|
||||
ignoreTest();
|
||||
dataDistributorTest();
|
||||
return 0;
|
||||
}
|
||||
|
Reference in New Issue
Block a user