get and monitor: only report fields that have changed

This commit is contained in:
mrkraimer
2018-02-16 06:03:57 -05:00
parent 952c482d68
commit 99d20ae903
4 changed files with 72 additions and 35 deletions

View File

@@ -755,7 +755,8 @@ CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
:
channel(channel),
channelGetRequester(channelGetRequester),
pvRequest(pvRequest)
pvRequest(pvRequest),
firstTime(true)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelGet::CAChannelGet() " << channel->getChannelName() << endl;
@@ -780,7 +781,9 @@ void CAChannelGet::activate()
getType = getDBRType(pvRequest, channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
bitSet->set(0);
pvCopy = PVCopy::create(
createPVStructure(channel, getType, pvRequest),
pvRequest,"");
channel->addChannelGet(shared_from_this());
if(channel->getConnectionState()==Channel::CONNECTED) {
EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(),
@@ -793,6 +796,7 @@ void CAChannelGet::channelCreated(const Status& status,Channel::shared_pointer c
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelGet::channelCreated " << channel->getChannelName() << endl;
}
firstTime = true;
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
chtype newType = getDBRType(pvRequest, channel->getNativeType());
@@ -800,7 +804,7 @@ void CAChannelGet::channelCreated(const Status& status,Channel::shared_pointer c
getType = getDBRType(pvRequest, channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
bitSet->set(0);
pvCopy = PVCopy::create(pvStructure,pvRequest,"");
}
EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(),
pvStructure->getStructure()));
@@ -1195,7 +1199,10 @@ static copyDBRtoPVStructure copyFuncTable[] =
void CAChannelGet::getDone(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelGet::getDone " << channel->getChannelName() << endl;
std::cout << "CAChannelGet::getDone "
<< channel->getChannelName()
<< " firstTime " << (firstTime ? "true" : "false")
<< endl;
}
ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
if(!getRequester) return;
@@ -1208,6 +1215,12 @@ void CAChannelGet::getDone(struct event_handler_args &args)
{
throw std::runtime_error("CAChannelGet::getDone no copy func implemented");
}
pvCopy->updateMasterSetBitSet(pvStructure,bitSet);
if(firstTime) {
bitSet->clear();
bitSet->set(0);
firstTime = false;
}
EXCEPTION_GUARD(getRequester->getDone(Status::Ok, shared_from_this(), pvStructure, bitSet));
}
else
@@ -1235,7 +1248,7 @@ void CAChannelGet::get()
Prior to R3.14.12 requesting zero elements in a ca_array_get_callback() call was illegal and would fail
immediately.
*/
bitSet->clear();
int result = ca_array_get_callback(getType,
0,
channel->getChannelID(), ca_get_handler, this);
@@ -1720,7 +1733,6 @@ public:
POINTER_DEFINITIONS(CACMonitorQueue);
private:
size_t queueSize;
bool overrunInProgress;
bool isStarted;
Mutex mutex;
@@ -1729,7 +1741,6 @@ public:
CACMonitorQueue(
int32 queueSize)
: queueSize(queueSize),
overrunInProgress(false),
isStarted(false)
{}
~CACMonitorQueue()
@@ -1739,39 +1750,29 @@ public:
{
Lock guard(mutex);
while(!monitorElementQueue.empty()) monitorElementQueue.pop();
overrunInProgress = false;
isStarted = true;
}
void stop()
{
Lock guard(mutex);
while(!monitorElementQueue.empty()) monitorElementQueue.pop();
overrunInProgress = false;
isStarted = false;
}
// return true if added to queue
bool event(const PVStructurePtr &pvStructure)
bool event(
const PVStructurePtr &pvStructure,
const MonitorElementPtr & activeElement)
{
Lock guard(mutex);
if(!isStarted) return false;
if(monitorElementQueue.size()==queueSize)
{
overrunInProgress = true;
return false;
} else {
PVStructure::shared_pointer pvs =
getPVDataCreate()->createPVStructure(pvStructure->getStructure());
pvs->copy(*pvStructure);
MonitorElementPtr monitorElement(new MonitorElement(pvs));
monitorElement->changedBitSet->set(0);
if(overrunInProgress) {
overrunInProgress = false;
monitorElement->overrunBitSet->set(0);
}
monitorElementQueue.push(monitorElement);
return true;
}
if(monitorElementQueue.size()==queueSize) return false;
PVStructure::shared_pointer pvs =
getPVDataCreate()->createPVStructure(pvStructure);
MonitorElementPtr monitorElement(new MonitorElement(pvs));
*(monitorElement->changedBitSet) = *(activeElement->changedBitSet);
*(monitorElement->overrunBitSet) = *(activeElement->overrunBitSet);
monitorElementQueue.push(monitorElement);
return true;
}
MonitorElementPtr poll()
{
@@ -1823,7 +1824,8 @@ CAChannelMonitor::CAChannelMonitor(
channel(channel),
monitorRequester(monitorRequester),
pvRequest(pvRequest),
isStarted(false)
isStarted(false),
firstTime(true)
{
if(DEBUG_LEVEL>0) {
cout << "CAChannelMonitor::CAChannelMonitor() " << channel->getChannelName() << endl;
@@ -1840,6 +1842,10 @@ void CAChannelMonitor::activate()
if(pvStructure) throw std::runtime_error("CAChannelMonitor::activate() was called twice");
getType = getDBRType(pvRequest, channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
activeElement = MonitorElementPtr(new MonitorElement(pvStructure));
pvCopy = PVCopy::create(
createPVStructure(channel, getType, pvRequest),
pvRequest,"");
int32 queueSize = 2;
PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
@@ -1865,12 +1871,17 @@ void CAChannelMonitor::channelCreated(const Status& status,Channel::shared_point
if(DEBUG_LEVEL>0) {
std::cout << "CAChannelMonitor::channelCreated " << channel->getChannelName() << endl;
}
firstTime = true;
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
chtype newType = getDBRType(pvRequest, channel->getNativeType());
if(newType!=getType) {
getType = getDBRType(pvRequest, channel->getNativeType());
pvStructure = createPVStructure(channel, getType, pvRequest);
activeElement = MonitorElementPtr(new MonitorElement(pvStructure));
pvCopy = PVCopy::create(
createPVStructure(channel, getType, pvRequest),
pvRequest,"");
int32 queueSize = 2;
PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if (pvOptions) {
@@ -1917,7 +1928,10 @@ void CAChannelMonitor::channelDisconnect(bool destroy)
void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
{
if(DEBUG_LEVEL>1) {
std::cout << "CAChannelMonitor::subscriptionEvent " << channel->getChannelName() << endl;
std::cout << "CAChannelMonitor::subscriptionEvent "
<< channel->getChannelName()
<< " firstTime " << (firstTime ? "true" : "false")
<< endl;
}
MonitorRequester::shared_pointer requester(monitorRequester.lock());
if(!requester) return;
@@ -1926,7 +1940,20 @@ void CAChannelMonitor::subscriptionEvent(struct event_handler_args &args)
copyDBRtoPVStructure copyFunc = copyFuncTable[getType];
if (copyFunc) {
copyFunc(args.dbr, args.count, pvStructure);
monitorQueue->event(pvStructure);
pvCopy->updateMasterSetBitSet(pvStructure,activeElement->changedBitSet);
if(firstTime) {
activeElement->changedBitSet->clear();
activeElement->overrunBitSet->clear();
activeElement->changedBitSet->set(0);
firstTime = false;
}
if(monitorQueue->event(pvStructure,activeElement)) {
activeElement->changedBitSet->clear();
activeElement->overrunBitSet->clear();
} else {
*(activeElement->overrunBitSet) |= *(activeElement->changedBitSet);
}
// call monitorRequester even if queue is full
requester->monitorEvent(shared_from_this());
} else {
@@ -1966,6 +1993,7 @@ epics::pvData::Status CAChannelMonitor::start()
*/
// TODO DBE_PROPERTY support
monitorQueue->start();
int result = ca_create_subscription(getType,
0,
channel->getChannelID(), DBE_VALUE,
@@ -1974,7 +2002,6 @@ epics::pvData::Status CAChannelMonitor::start()
if (result == ECA_NORMAL)
{
isStarted = true;
monitorQueue->start();
result = ca_flush_io();
}
if (result == ECA_NORMAL) return status;