From dbf83dd0172996bfaf17baf3247389ac43151794 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 26 May 2023 13:34:34 -0500 Subject: [PATCH 1/4] added data distributor plugin --- src/copy/Makefile | 1 + src/copy/dataDistributorPlugin.cpp | 424 +++++++++++++++++++++++++++++ src/pv/dataDistributorPlugin.h | 167 ++++++++++++ 3 files changed, 592 insertions(+) create mode 100644 src/copy/dataDistributorPlugin.cpp create mode 100644 src/pv/dataDistributorPlugin.h diff --git a/src/copy/Makefile b/src/copy/Makefile index 3b506fe..3c7abd2 100644 --- a/src/copy/Makefile +++ b/src/copy/Makefile @@ -8,5 +8,6 @@ LIBSRCS += pvCopy.cpp LIBSRCS += pvArrayPlugin.cpp LIBSRCS += pvDeadbandPlugin.cpp LIBSRCS += pvTimestampPlugin.cpp +LIBSRCS += dataDistributorPlugin.cpp diff --git a/src/copy/dataDistributorPlugin.cpp b/src/copy/dataDistributorPlugin.cpp new file mode 100644 index 0000000..10f4394 --- /dev/null +++ b/src/copy/dataDistributorPlugin.cpp @@ -0,0 +1,424 @@ +// Copyright information and license terms for this software can be +// found in the file LICENSE that is included with the distribution + +#include + +#include +#include +#include +#include +#include + +#define epicsExportSharedSymbols +#include "pv/dataDistributorPlugin.h" + +using std::string; +using std::size_t; +using std::endl; +using std::tr1::static_pointer_cast; +using std::vector; +using namespace epics::pvData; +namespace epvd = epics::pvData; + +namespace epics { namespace pvCopy { + +// Utilities for manipulating strings +static std::string leftTrim(const std::string& s) +{ + int i; + int n = s.length(); + for (i = 0; i < n; i++) { + if (!isspace(s[i])) { + break; + } + } + return s.substr(i,n-i); +} + +static std::string rightTrim(const std::string& s) +{ + int i; + int n = s.length(); + for (i = n; i > 0; i--) { + if (!isspace(s[i-1])) { + break; + } + } + return s.substr(0,i); +} + +static std::string trim(const std::string& s) +{ + return rightTrim(leftTrim(s)); +} + +static std::vector& split(const std::string& s, char delimiter, std::vector& elements) +{ + std::stringstream ss(s); + std::string item; + while (std::getline(ss, item, delimiter)) { + elements.push_back(trim(item)); + } + return elements; +} + +static std::vector split(const std::string& s, char delimiter) +{ + std::vector elements; + split(s, delimiter, elements); + return elements; +} + +static std::string toLowerCase(const std::string& input) +{ + std::stringstream ss; + for (unsigned int i = 0; i < input.size(); i++) { + char c = std::tolower(input.at(i)); + ss << c; + } + return ss.str(); +} + +// Data distributor class + +static std::string name("distributor"); +bool DataDistributorPlugin::initialized(DataDistributorPlugin::initialize()); + +std::map DataDistributor::dataDistributorMap; +epics::pvData::Mutex DataDistributor::dataDistributorMapMutex; + +DataDistributorPtr DataDistributor::getInstance(const std::string& groupId) +{ + epvd::Lock lock(dataDistributorMapMutex); + std::map::iterator ddit = dataDistributorMap.find(groupId); + if (ddit != dataDistributorMap.end()) { + DataDistributorPtr ddPtr = ddit->second; + return ddPtr; + } + else { + DataDistributorPtr ddPtr(new DataDistributor(groupId)); + dataDistributorMap[groupId] = ddPtr; + return ddPtr; + } +} + +void DataDistributor::removeUnusedInstance(DataDistributorPtr dataDistributorPtr) +{ + epvd::Lock lock(dataDistributorMapMutex); + std::string groupId = dataDistributorPtr->getGroupId(); + std::map::iterator ddit = dataDistributorMap.find(groupId); + if (ddit != dataDistributorMap.end()) { + DataDistributorPtr ddPtr = ddit->second; + int nSets = ddPtr->clientSetMap.size(); + if (nSets == 0) { + dataDistributorMap.erase(ddit); + } + } +} + +DataDistributor::DataDistributor(const std::string& groupId_) + : groupId(groupId_) + , mutex() + , clientSetMap() + , clientSetIdList() + , currentSetIdIter(clientSetIdList.end()) + , lastUpdateValue() +{ +} + +DataDistributor::~DataDistributor() +{ + epvd::Lock lock(mutex); + clientSetMap.clear(); + clientSetIdList.clear(); +} + +std::string DataDistributor::addClient(int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode) +{ + epvd::Lock lock(mutex); + std::map::iterator git = clientSetMap.find(setId); + if (git != clientSetMap.end()) { + ClientSetPtr setPtr = git->second; + setPtr->clientIdList.push_back(clientId); + return setPtr->triggerField; + } + else { + ClientSetPtr setPtr(new ClientSet(setId, triggerField, nUpdatesPerClient, updateMode)); + setPtr->clientIdList.push_back(clientId); + clientSetMap[setId] = setPtr; + clientSetIdList.push_back(setId); + return triggerField; + } +} + +void DataDistributor::removeClient(int clientId, const std::string& setId) +{ + epvd::Lock lock(mutex); + std::map::iterator git = clientSetMap.find(setId); + if (git != clientSetMap.end()) { + ClientSetPtr setPtr = git->second; + std::list::iterator cit = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), clientId); + if (cit != setPtr->clientIdList.end()) { + // If we are removing current client id, advance iterator + if (cit == setPtr->currentClientIdIter) { + setPtr->currentClientIdIter++; + } + + // Find current client id + int currentClientId = -1; + if (setPtr->currentClientIdIter != setPtr->clientIdList.end()) { + currentClientId = *(setPtr->currentClientIdIter); + } + + // Remove client id from the list + setPtr->clientIdList.erase(cit); + + // Reset current client id iterator + setPtr->currentClientIdIter = setPtr->clientIdList.end(); + if (currentClientId >= 0) { + std::list::iterator cit2 = std::find(setPtr->clientIdList.begin(), setPtr->clientIdList.end(), currentClientId); + if (cit2 != setPtr->clientIdList.end()) { + setPtr->currentClientIdIter = cit2; + } + } + } + + if (setPtr->clientIdList.size() == 0) { + clientSetMap.erase(git); + std::list::iterator git2 = std::find(clientSetIdList.begin(), clientSetIdList.end(), setId); + if (git2 == currentSetIdIter) { + currentSetIdIter++; + } + if (git2 != clientSetIdList.end()) { + clientSetIdList.erase(git2); + } + } + } +} + +bool DataDistributor::updateClient(int clientId, const std::string& setId, const std::string& triggerFieldValue) +{ + epvd::Lock lock(mutex); + bool proceedWithUpdate = false; + if (currentSetIdIter == clientSetIdList.end()) { + currentSetIdIter = clientSetIdList.begin(); + } + std::string currentSetId = *currentSetIdIter; + if (setId != currentSetId) { + // We are not distributing data to this set at the moment + return proceedWithUpdate; + } + ClientSetPtr setPtr = clientSetMap[currentSetId]; + if (setPtr->currentClientIdIter == setPtr->clientIdList.end()) { + // Move current client iterator to the beginning of the list + setPtr->currentClientIdIter = setPtr->clientIdList.begin(); + } + if (lastUpdateValue == triggerFieldValue) { + // This update was already distributed. + return proceedWithUpdate; + } + switch (setPtr->updateMode) { + case(DD_UPDATE_ONE_PER_GROUP): { + if (clientId != *(setPtr->currentClientIdIter)) { + // Not this client's turn. + return proceedWithUpdate; + } + proceedWithUpdate = true; + lastUpdateValue = triggerFieldValue; + setPtr->lastUpdateValue = triggerFieldValue; + setPtr->updateCounter++; + if (setPtr->updateCounter >= setPtr->nUpdatesPerClient) { + // This client and set are done. + setPtr->currentClientIdIter++; + setPtr->updateCounter = 0; + currentSetIdIter++; + } + break; + } + case(DD_UPDATE_ALL_IN_GROUP): { + proceedWithUpdate = true; + static unsigned int nClientsUpdated = 0; + if (setPtr->lastUpdateValue != triggerFieldValue) { + setPtr->lastUpdateValue = triggerFieldValue; + setPtr->updateCounter++; + nClientsUpdated = 0; + } + nClientsUpdated++; + if (nClientsUpdated == setPtr->clientIdList.size() && setPtr->updateCounter >= setPtr->nUpdatesPerClient) { + // This set is done. + lastUpdateValue = triggerFieldValue; + setPtr->updateCounter = 0; + currentSetIdIter++; + } + break; + } + default: { + proceedWithUpdate = true; + } + } + return proceedWithUpdate; +} + +DataDistributorPlugin::DataDistributorPlugin() +{ +} + +DataDistributorPlugin::~DataDistributorPlugin() +{ +} + +void DataDistributorPlugin::create() +{ + initialize(); +} + +bool DataDistributorPlugin::initialize() +{ + DataDistributorPluginPtr pvPlugin = DataDistributorPluginPtr(new DataDistributorPlugin()); + PVPluginRegistry::registerPlugin(name,pvPlugin); + return true; +} + +PVFilterPtr DataDistributorPlugin::create( + const std::string& requestValue, + const PVCopyPtr& pvCopy, + const PVFieldPtr& master) +{ + return DataDistributorFilter::create(requestValue,pvCopy,master); +} + +DataDistributorFilter::~DataDistributorFilter() +{ + dataDistributorPtr->removeClient(clientId, setId); + DataDistributor::removeUnusedInstance(dataDistributorPtr); +} + +DataDistributorFilterPtr DataDistributorFilter::create( + const std::string& requestValue, + const PVCopyPtr& pvCopy, + const PVFieldPtr& master) +{ + static int clientId = 0; + clientId++; + + std::vector configItems = split(requestValue, ';'); + // Use lowercase keys if possible. + std::string requestValue2 = toLowerCase(requestValue); + std::vector configItems2 = split(requestValue2, ';'); + int nUpdatesPerClient = 1; + int updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP; + std::string groupId = "default"; + std::string setId = "default"; + std::string triggerField = "timeStamp"; + bool hasUpdateMode = false; + bool hasSetId = false; + for(unsigned int i = 0; i < configItems2.size(); i++) { + std::string configItem2 = configItems2[i]; + size_t ind = configItem2.find(':'); + if (ind == string::npos) { + continue; + } + if(configItem2.find("updates") == 0) { + std::string svalue = configItem2.substr(ind+1); + nUpdatesPerClient = atoi(svalue.c_str()); + } + else if(configItem2.find("group") == 0) { + std::string configItem = configItems[i]; + groupId = configItem.substr(ind+1); + } + else if(configItem2.find("set") == 0) { + std::string configItem = configItems[i]; + setId = configItem.substr(ind+1); + hasSetId = true; + } + else if(configItem2.find("mode") == 0) { + std::string svalue = toLowerCase(configItem2.substr(ind+1)); + if (svalue == "one") { + updateMode = DataDistributor::DD_UPDATE_ONE_PER_GROUP; + hasUpdateMode = true; + } + else if (svalue == "all") { + updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP; + hasUpdateMode = true; + } + } + else if(configItem2.find("trigger") == 0) { + std::string configItem = configItems[i]; + triggerField = configItem.substr(ind+1); + } + } + // If request does not have update mode specified, but has set id + // then use a different update mode + if(!hasUpdateMode && hasSetId) { + updateMode = DataDistributor::DD_UPDATE_ALL_IN_GROUP; + } + + // Make sure request is valid + if(nUpdatesPerClient <= 0) { + return DataDistributorFilterPtr(); + } + DataDistributorFilterPtr filter = + DataDistributorFilterPtr(new DataDistributorFilter(groupId, clientId, setId, triggerField, nUpdatesPerClient, updateMode, pvCopy, master)); + return filter; +} + +DataDistributorFilter::DataDistributorFilter(const std::string& groupId_, int clientId_, const std::string& setId_, const std::string& triggerField_, int nUpdatesPerClient, int updateMode, const PVCopyPtr& copyPtr_, const epvd::PVFieldPtr& masterFieldPtr_) + : dataDistributorPtr(DataDistributor::getInstance(groupId_)) + , clientId(clientId_) + , setId(setId_) + , triggerField(triggerField_) + , masterFieldPtr(masterFieldPtr_) + , triggerFieldPtr() + , firstUpdate(true) +{ + triggerField = dataDistributorPtr->addClient(clientId, setId, triggerField, nUpdatesPerClient, updateMode); + if(masterFieldPtr->getField()->getType() == epvd::structure) { + epvd::PVStructurePtr pvStructurePtr = static_pointer_cast(masterFieldPtr); + if(pvStructurePtr) { + triggerFieldPtr = pvStructurePtr->getSubField(triggerField); + } + } + if(!triggerFieldPtr) { + triggerFieldPtr = masterFieldPtr; + } +} + + +bool DataDistributorFilter::filter(const PVFieldPtr& pvCopy, const BitSetPtr& bitSet, bool toCopy) +{ + if(!toCopy) { + return false; + } + + bool proceedWithUpdate = false; + if(firstUpdate) { + // Always send first update + firstUpdate = false; + proceedWithUpdate = true; + } + else { + std::stringstream ss; + ss << triggerFieldPtr; + std::string triggerFieldValue = ss.str(); + proceedWithUpdate = dataDistributorPtr->updateClient(clientId, setId, triggerFieldValue); + } + + if(proceedWithUpdate) { + pvCopy->copyUnchecked(*masterFieldPtr); + bitSet->set(pvCopy->getFieldOffset()); + } + else { + // Clear all bits + //bitSet->clear(pvCopy->getFieldOffset()); + bitSet->clear(); + } + + return true; +} + +string DataDistributorFilter::getName() +{ + return name; +} + +}} diff --git a/src/pv/dataDistributorPlugin.h b/src/pv/dataDistributorPlugin.h new file mode 100644 index 0000000..b4db73f --- /dev/null +++ b/src/pv/dataDistributorPlugin.h @@ -0,0 +1,167 @@ +// Copyright information and license terms for this software can be +// found in the file LICENSE that is included with the distribution + +#ifndef DATA_DISTRIBUTOR_PLUGIN_H +#define DATA_DISTRIBUTOR_PLUGIN_H + +// The data distributor plugin enables distribution of channel data between +// multiple client applications. + +#include +#include +#include +#include +#include +#include + +#include + +namespace epics { namespace pvCopy { + +class DataDistributorPlugin; +class DataDistributorFilter; +class DataDistributor; + +typedef std::tr1::shared_ptr DataDistributorPluginPtr; +typedef std::tr1::shared_ptr DataDistributorFilterPtr; +typedef std::tr1::shared_ptr DataDistributorPtr; + +struct ClientSet; +typedef std::tr1::shared_ptr ClientSetPtr; +typedef std::tr1::shared_ptr ClientSetConstPtr; + +struct ClientSet +{ + POINTER_DEFINITIONS(ClientSet); + + ClientSet(const std::string& setId_, const std::string triggerField_, int nUpdatesPerClient_, int updateMode_) + : setId(setId_) + , triggerField(triggerField_) + , nUpdatesPerClient(nUpdatesPerClient_) + , updateMode(updateMode_) + , clientIdList() + , lastUpdateValue() + , updateCounter(0) + , currentClientIdIter(clientIdList.end()) + {} + ~ClientSet() {} + std::string setId; + std::string triggerField; + int nUpdatesPerClient; + int updateMode; + std::list clientIdList; + std::string lastUpdateValue; + int updateCounter; + std::list::iterator currentClientIdIter; +}; + +class DataDistributor +{ +public: + enum ClientUpdateMode { + DD_UPDATE_ONE_PER_GROUP = 0, // Update goes to one client per set + DD_UPDATE_ALL_IN_GROUP = 1, // Update goes to all clients in set + DD_N_UPDATE_MODES = 2 // Number of valid update modes + }; + + static DataDistributorPtr getInstance(const std::string& groupId); + static void removeUnusedInstance(DataDistributorPtr dataDistributorPtr); + + virtual ~DataDistributor(); + std::string getGroupId() const { return groupId; } + std::string addClient(int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode); + void removeClient(int clientId, const std::string& setId); + bool updateClient(int clientId, const std::string& setId, const std::string& triggerFieldValue); + +private: + DataDistributor(const std::string& id); + DataDistributor(const DataDistributor& distributor); + DataDistributor& operator=(const DataDistributor& distributor); + + static std::map dataDistributorMap; + static epics::pvData::Mutex dataDistributorMapMutex; + + std::string groupId; + epics::pvData::Mutex mutex; + std::map clientSetMap; + std::list clientSetIdList; + std::list::iterator currentSetIdIter; + std::string lastUpdateValue; +}; + +class epicsShareClass DataDistributorPlugin : public PVPlugin +{ +private: + DataDistributorPlugin(); +public: + POINTER_DEFINITIONS(DataDistributorPlugin); + virtual ~DataDistributorPlugin(); + /** + * Factory + */ + static void create(); + /** + * Create a PVFilter. + * @param requestValue The value part of a name=value request option. + * @param pvCopy The PVCopy to which the PVFilter will be attached. + * @param master The field in the master PVStructure to which the PVFilter will be attached + * @return The PVFilter. + * Null is returned if master or requestValue is not appropriate for the plugin. + */ + virtual PVFilterPtr create( + const std::string& requestValue, + const PVCopyPtr& pvCopy, + const epics::pvData::PVFieldPtr& master); +private: + static bool initialize(); + static bool initialized; +}; + +/** + * @brief A Plugin for a filter that gets a sub array from a PVScalarDeadband. + */ +class epicsShareClass DataDistributorFilter : public PVFilter +{ +private: + DataDistributorPtr dataDistributorPtr; + int clientId; + std::string setId; + std::string triggerField; + epics::pvData::PVFieldPtr masterFieldPtr; + epics::pvData::PVFieldPtr triggerFieldPtr; + bool firstUpdate; + + DataDistributorFilter(const std::string& groupId, int clientId, const std::string& setId, const std::string& triggerField, int nUpdatesPerClient, int updateMode, const epics::pvCopy::PVCopyPtr& copyPtr, const epics::pvData::PVFieldPtr& masterFieldPtr); + +public: + POINTER_DEFINITIONS(DataDistributorFilter); + virtual ~DataDistributorFilter(); + /** + * Create a DataDistributorFilter. + * @param requestValue The value part of a name=value request option. + * @param master The field in the master PVStructure to which the PVFilter will be attached. + * @return The PVFilter. + * A null is returned if master or requestValue is not appropriate for the plugin. + */ + static DataDistributorFilterPtr create( + const std::string& requestValue, + const PVCopyPtr& pvCopy, + const epics::pvData::PVFieldPtr & master); + /** + * Perform a filter operation + * @param pvCopy The field in the copy PVStructure. + * @param bitSet A bitSet for copyPVStructure. + * @param toCopy (true,false) means copy (from master to copy,from copy to master) + * @return if filter (modified, did not modify) destination. + * Null is returned if master or requestValue is not appropriate for the plugin. + */ + bool filter(const epics::pvData::PVFieldPtr & pvCopy,const epics::pvData::BitSetPtr & bitSet,bool toCopy); + /** + * Get the filter name. + * @return The name. + */ + std::string getName(); +}; + +}} +#endif From c41f7cb3dc98025e27b985c40f248fa18769d08f Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 26 May 2023 13:34:43 -0500 Subject: [PATCH 2/4] added data distributor plugin tests --- test/src/testPlugin.cpp | 116 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/test/src/testPlugin.cpp b/test/src/testPlugin.cpp index f2b341a..4f83a6e 100644 --- a/test/src/testPlugin.cpp +++ b/test/src/testPlugin.cpp @@ -332,14 +332,128 @@ static void ignoreTest() testOk1(nset==3); } +static void debugOutput(const string& what, bool result, uint32 nSet, BitSetPtr bitSet, PVStructurePtr pvStructureCopy) +{ + if(debug) { + cout << what + << " result " << (result ? "true" : "false") + << " nSet " << nSet + << " bitSet " << *bitSet + << "\n pvStructureCopy\n" << pvStructureCopy + << "\n"; + } +} + +static void dataDistributorTest() +{ + if(debug) {cout << endl << endl << "****dataDistributorTest****" << endl;} + bool result = false; + uint32 nSet = 0; + + // Create test structure + PVStructurePtr pvRecordStructure(getStandardPVField()->scalar(pvInt,"")); + PVIntPtr pvValue(pvRecordStructure->getSubField("value")); + PVRecordPtr pvRecord(PVRecord::create("intRecord",pvRecordStructure)); + if(debug) { + cout << " pvRecordStructure\n" << pvRecordStructure + << "\n"; + } + + // Request distributor plugin with trigger field value + PVStructurePtr pvRequest(CreateRequest::create()->createRequest("_[distributor=trigger:value]")); + + // Create clients + PVCopyPtr pvCopy1(PVCopy::create(pvRecordStructure,pvRequest,"")); + PVStructurePtr pvStructureCopy1(pvCopy1->createPVStructure()); + BitSetPtr bitSet1(new BitSet(pvStructureCopy1->getNumberFields())); + + PVCopyPtr pvCopy2(PVCopy::create(pvRecordStructure,pvRequest,"")); + PVStructurePtr pvStructureCopy2(pvCopy2->createPVStructure()); + BitSetPtr bitSet2(new BitSet(pvStructureCopy2->getNumberFields())); + + // Update 0: both clients get it + result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1); + nSet = bitSet1->cardinality(); + debugOutput("client 1: update 0", result, nSet, bitSet1, pvStructureCopy1); + testOk1(result==true); + testOk1(nSet==1); + + result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2); + nSet = bitSet2->cardinality(); + debugOutput("client 2: update 0", result, nSet, bitSet2, pvStructureCopy2); + testOk1(result==true); + testOk1(nSet==1); + + // Update 1: only client 1 gets it + pvValue->put(1); + + result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1); + nSet = bitSet1->cardinality(); + debugOutput("client 1: update 1", result, nSet, bitSet1, pvStructureCopy1); + testOk1(result==true); + testOk1(nSet==1); + + result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2); + nSet = bitSet2->cardinality(); + debugOutput("client 2: update 1", result, nSet, bitSet2, pvStructureCopy2); + testOk1(result==false); + testOk1(nSet==0); + + // Update 2: only client 2 gets it + pvValue->put(2); + + result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1); + nSet = bitSet1->cardinality(); + debugOutput("client 1: update 2", result, nSet, bitSet1, pvStructureCopy1); + testOk1(result==false); + testOk1(nSet==0); + + result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2); + nSet = bitSet2->cardinality(); + debugOutput("client 2: update 2", result, nSet, bitSet2, pvStructureCopy2); + testOk1(result==true); + testOk1(nSet==1); + + // Update 3: only client 1 gets it + pvValue->put(3); + + result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1); + nSet = bitSet1->cardinality(); + debugOutput("client 1: update 3", result, nSet, bitSet1, pvStructureCopy1); + testOk1(result==true); + testOk1(nSet==1); + + result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2); + nSet = bitSet2->cardinality(); + debugOutput("client 2: update 3", result, nSet, bitSet2, pvStructureCopy2); + testOk1(result==false); + testOk1(nSet==0); + + // Update 4: only client 2 gets it + pvValue->put(4); + + result = pvCopy1->updateCopySetBitSet(pvStructureCopy1,bitSet1); + nSet = bitSet1->cardinality(); + debugOutput("client 1: update 4", result, nSet, bitSet1, pvStructureCopy1); + testOk1(result==false); + testOk1(nSet==0); + + result = pvCopy2->updateCopySetBitSet(pvStructureCopy2,bitSet2); + nSet = bitSet2->cardinality(); + debugOutput("client 2: update 4", result, nSet, bitSet2, pvStructureCopy2); + testOk1(result==true); + testOk1(nSet==1); +} + MAIN(testPlugin) { - testPlan(26); + testPlan(46); PVDatabasePtr pvDatabase(PVDatabase::getMaster()); deadbandTest(); arrayTest(); unionArrayTest(); timeStampTest(); ignoreTest(); + dataDistributorTest(); return 0; } From 91d0d2c315b02c9d4335cf8337a44adacfeaf4e9 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Fri, 26 May 2023 13:35:06 -0500 Subject: [PATCH 3/4] added data distributor plugin documentation and updated release notes --- documentation/RELEASE_NOTES.md | 22 +++ documentation/dataDistributorPlugin.md | 236 +++++++++++++++++++++++++ 2 files changed, 258 insertions(+) create mode 100644 documentation/dataDistributorPlugin.md diff --git a/documentation/RELEASE_NOTES.md b/documentation/RELEASE_NOTES.md index cf3ab93..0166a5e 100644 --- a/documentation/RELEASE_NOTES.md +++ b/documentation/RELEASE_NOTES.md @@ -4,6 +4,28 @@ This document summarizes the changes to the module between releases. ## Release 4.7.1 (EPICS 7.0.x, UNRELEASED) +* Added data distributor plugin which can be used for distributing data between + a group of clients. The plugin is triggered by the request string of the + form: + + `_[distributor=group:;set:;trigger:;updates:;mode:]` + + The plugin parameters are described bellow: + + - group: this parameter indicates a group that client application belongs to (default value: "default"); groups of clients are completely independent of each other + + - set: this parameter designates a client set that application belongs to within its group (default value: "default") + + - trigger: this is the PV structure field that distinguishes different channel updates (default value: "timeStamp"); for example, for area detector images one could use the "uniqueId" field of the NTND structure + + - updates: this parameter configures how many sequential updates a client (or a set of clients) will receive before the data distributor starts updating the next one (default value: "1") + + - mode: this parameter configures how channel updates are to be distributed between clients in a set: + - one: update goes to one client per set + - all: update goes to all clients in a set + - default is "one" if client set id is not specified, and "all" if set id is specified + + For more information and examples of usage see the [plugin documentation](dataDistributorPlugin.md). ## Release 4.7.0 (EPICS 7.0.7, Sep 2022) diff --git a/documentation/dataDistributorPlugin.md b/documentation/dataDistributorPlugin.md new file mode 100644 index 0000000..6ede405 --- /dev/null +++ b/documentation/dataDistributorPlugin.md @@ -0,0 +1,236 @@ +# Data Distributor Plugin + +The data distributor plugin enables distribution of channel data between +multiple client applications. The plugin considers two basic use cases +for a group of clients: + +- For simple parallel processing where client applications do not need +to share data all clients in a group receive n sequential updates +in a round-robin fashion: client \#1 sees the first n updates, client \#2 the +second n updates, and so on. + +- For data analysis where several cooperating client applications must all +see the same data in order to process it the applications are grouped +into sets, and each set of clients receives the same number of sequential +updates. The first n updates are sent to all members of client set #1, the second n updates are sent to all members of client set #2, and so on. + +## Requirements + +This plugin relies on the pvDatabase plugin framework and requires +epics base version > 7.0.7 + +## Usage + +The PV request object which triggers plugin instantiation is defined below: + +``` +"_[distributor=group:;set:;trigger:;updates:;mode:]" +``` + +The underscore character at the begining of the PV request object +indicates that the data distributor will be targeting entire PV structure. +The same PV request object format should work regardless of the language +in which a particular client application is written. + +The plugin parameters are the following: + +- group: this parameter indicates a group that client application belongs to +(default value: "default"); groups of clients are completely independent +of each other + +- set: this parameter designates a client set that application belongs to +within its group (default value: "default") + +- trigger: this is the PV structure field that distinguishes +different channel updates (default value: "timeStamp"); for example, +for area detector images one could use the "uniqueId" field of the NTND +structure + +- updates: this parameter configures how many sequential updates +a client (or a set of clients) will receive before the data distributor +starts updating the next one (default value: "1") + +- mode: this parameter configures how channel updates are to be +distributed between clients in a set: + - one: update goes to one client per set + - all: update goes to all clients in a set + - default is "one" if client set id is not specified, and "all" if set + id is specified + +The plugin obeys the following rules: + +- Parameter names are case insensitive, but the string values +are not. For example, "group=abc" and "group=ABC" would indicate two +different groups of clients. + +- Updates for a set of clients are configured when the first client in +the set requests data. Configuration values (i.e., "trigger", +"updates", and "mode"), passed in the PV request by the subsequent +clients are ignored. + +- A set is removed from the group once the last client in that +set disconnects. + +- A group is removed from the distributor plugin once all of its +clients have disconnected. + +- Different client groups are completely independent of each other. +In other words, channel updates sent to clients belonging to +group A do not interfere with updates sent to clients +belonging to group B. + +- The order in which clients and groups receive data is on a +"first connected, first served basis". + +- The current channel PV object is always distributed to a client on an +initial connect. + +- Data distribution is dynamic with respect to the number of clients. +As clients connect and disconnect, the data distribution in a group adjusts +accordingly. For example, with a group of clients configured to +distribute one sequential update to each client, three clients would each be +receiving every third update; after client number four connects, all +clients would start receiving every fourth update; if one of those then +disconnects, remaining three clients would again be receiving every third +update. + +## Examples + +For all examples below we assume that PVDatabase server is serving +area detector images on the channel 'image'. All clients are started before +the server itself, and the initial (empty) object has unique id of 0. + +### Example 1 + +This example show behavior of three clients that belong to the same (default) +group. Each client receives one sequential update in a round-robin fashion. +Note that all clients received current object on initial connection, +and every third object afterward: + +Client 1: +``` +$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId + int uniqueId 0 + int uniqueId 1 + int uniqueId 4 + int uniqueId 7 + int uniqueId 10 +``` + +Client 2: +``` +$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId + int uniqueId 0 + int uniqueId 2 + int uniqueId 5 + int uniqueId 8 + int uniqueId 11 +``` + +Client 3: +``` +$ pvget -m -r _[distributor=trigger:uniqueId] image | grep uniqueId + int uniqueId 0 + int uniqueId 3 + int uniqueId 6 + int uniqueId 9 + int uniqueId 12 +``` + +### Example 2 + +In this example we have two sets of two clients, each client set receiving +three sequential updates. Both clients from client set \#1 receive updates +(1,2,3), both clients from client set \#2 receive updates (4,5,6), +client set \#1 receives updates (7,8,9), and so on. + +Client 1 and Client 2/Set 1: +``` +$ pvget -m -r "_[distributor=set:S1;trigger:uniqueId;updates:3]" image | grep uniqueId + int uniqueId 0 + int uniqueId 1 + int uniqueId 2 + int uniqueId 3 + int uniqueId 7 + int uniqueId 8 + int uniqueId 9 + int uniqueId 13 + int uniqueId 14 + int uniqueId 15 +``` + +Client 3 and Client 4/Set 2: +``` +$ pvget -m -r "_[distributor=set:S2;trigger:uniqueId;updates:3]" image | grep uniqueId + int uniqueId 0 + int uniqueId 4 + int uniqueId 5 + int uniqueId 6 + int uniqueId 10 + int uniqueId 11 + int uniqueId 12 + int uniqueId 16 + int uniqueId 17 + int uniqueId 18 +``` + +### Example 3 + +This example illustrates what happens when multiple independent groups of +clients connect to the same channel. Group G1 has two clients belonging +to the same default set, and requesting one sequential update per client, while +Group G2 has two clients in the default set requesting three +sequential updates per client. + +In this case the first client in group G1 receives updates +(1,3,5,...), while the second one receives updates (2,4,6,...). On the +other hand, the first client in group G2 receives updates +(1,2,3,7,8,9,...), while the second one receives updates (4,5,6,10,11,12,...). + +Client 1/Group G1: +``` +$ pvget -m -r "_[distributor=group:G1;trigger:uniqueId]" image | grep uniqueId + int uniqueId 0 + int uniqueId 1 + int uniqueId 3 + int uniqueId 5 + int uniqueId 7 + int uniqueId 9 +``` + +Client 2/Group G1: +``` +pvget -m -r "_[distributor=group:G1;trigger:uniqueId]" image | grep uniqueId + int uniqueId 0 + int uniqueId 2 + int uniqueId 4 + int uniqueId 6 + int uniqueId 8 +``` + +Client 1/Group G2: +``` +$ pvget -m -r "_[distributor=group:G2;trigger:uniqueId;updates:3]" image | grep uniqueId + int uniqueId 0 + int uniqueId 1 + int uniqueId 2 + int uniqueId 3 + int uniqueId 7 + int uniqueId 8 + int uniqueId 9 +``` + +Client 2/Group G2: +``` +$ pvget -m -r "_[distributor=group:G2;trigger:uniqueId;updates:3]" image | grep uniqueId + int uniqueId 0 + int uniqueId 4 + int uniqueId 5 + int uniqueId 6 + int uniqueId 10 + int uniqueId 11 + int uniqueId 12 +``` + +The above shows that the two client groups do not interfere with each other. + From 00a84599447447dd26ddcc9ae4b7da0ec4a00501 Mon Sep 17 00:00:00 2001 From: Sinisa Veseli Date: Wed, 28 Jun 2023 13:11:56 -0500 Subject: [PATCH 4/4] address ANL feedback --- documentation/RELEASE_NOTES.md | 2 +- documentation/dataDistributorPlugin.md | 29 ++++++++++++-------------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/documentation/RELEASE_NOTES.md b/documentation/RELEASE_NOTES.md index 0166a5e..a28de92 100644 --- a/documentation/RELEASE_NOTES.md +++ b/documentation/RELEASE_NOTES.md @@ -10,7 +10,7 @@ This document summarizes the changes to the module between releases. `_[distributor=group:;set:;trigger:;updates:;mode:]` - The plugin parameters are described bellow: + The plugin parameters are optional and are described bellow: - group: this parameter indicates a group that client application belongs to (default value: "default"); groups of clients are completely independent of each other diff --git a/documentation/dataDistributorPlugin.md b/documentation/dataDistributorPlugin.md index 6ede405..fdb353d 100644 --- a/documentation/dataDistributorPlugin.md +++ b/documentation/dataDistributorPlugin.md @@ -34,34 +34,31 @@ in which a particular client application is written. The plugin parameters are the following: -- group: this parameter indicates a group that client application belongs to -(default value: "default"); groups of clients are completely independent -of each other +- `group:`: specifying a `group_id` names a group the client application belongs to (default value: `default`); clients with different group names are +completely independent of each other -- set: this parameter designates a client set that application belongs to -within its group (default value: "default") +- `set:`: this parameter designates a client set that application belongs to within its group (default value: `default`) -- trigger: this is the PV structure field that distinguishes -different channel updates (default value: "timeStamp"); for example, -for area detector images one could use the "uniqueId" field of the NTND +- `trigger:`: this is the PV structure field that distinguishes +different channel updates (default value: `timeStamp`); for example, +for area detector images one could use the `uniqueId` field of the NTND structure -- updates: this parameter configures how many sequential updates -a client (or a set of clients) will receive before the data distributor -starts updating the next one (default value: "1") +- `updates:`: this parameter must be an integer and configures how many sequential updates a client (or a set of clients) will receive before the data distributor starts updating the next one (default value: `1`) -- mode: this parameter configures how channel updates are to be +- `mode:`: this parameter configures how channel updates are to be distributed between clients in a set: - - one: update goes to one client per set - - all: update goes to all clients in a set - - default is "one" if client set id is not specified, and "all" if set + - `one`: update goes to one client per set + - `all`: update goes to all clients in a set + - default is `one` if client set id is not specified, and `all` if set id is specified The plugin obeys the following rules: - Parameter names are case insensitive, but the string values are not. For example, "group=abc" and "group=ABC" would indicate two -different groups of clients. +different groups of clients. String values allow alphanumeric characters, +as well as dashes and underscores. - Updates for a set of clients are configured when the first client in the set requests data. Configuration values (i.e., "trigger",