make compatible with latest pvDataCPP and pvAccessCPP; lots of work on queues.

This commit is contained in:
Marty Kraimer
2013-11-06 15:44:58 -05:00
parent 14b3640e9a
commit 006859120e
12 changed files with 78 additions and 554 deletions

View File

@@ -98,7 +98,8 @@ public:
virtual ~RealQueue(){}
RealQueue(
MonitorLocalPtr const &monitorLocal,
std::vector<MonitorElementPtr> &monitorElementArray);
std::vector<MonitorElementPtr> &monitorElementArray,
size_t nfields);
virtual Status start();
virtual void stop();
virtual bool dataChanged();
@@ -107,8 +108,10 @@ public:
private:
std::tr1::weak_ptr<MonitorLocal> monitorLocal;
Queue<MonitorElement> queue;
BitSetPtr changedBitSet;
BitSetPtr overrunBitSet;
MonitorElementPtr latestMonitorElement;
bool queueIsFull;
MonitorElementPtr monitorElement;
};
@@ -333,7 +336,8 @@ bool MonitorLocal::init(PVStructurePtr const & pvRequest)
new MonitorElement(pvStructure));
monitorElementArray.push_back(monitorElement);
}
queue = RealQueuePtr(new RealQueue(getPtrSelf(),monitorElementArray));
size_t nfields = monitorElementArray[0]->pvStructurePtr->getNumberFields();
queue = RealQueuePtr(new RealQueue(getPtrSelf(),monitorElementArray,nfields));
}
// MARTY MUST IMPLEMENT algorithm
monitorRequester->monitorConnect(
@@ -447,11 +451,12 @@ MonitorElementPtr NOQueue::poll()
if(ml==NULL) return NULLMonitorElement;
ml->getPVCopy()->updateCopyFromBitSet(
pvCopyStructure, changedBitSet);
getConvert()->copyStructure(pvCopyStructure,monitorElement->pvStructurePtr);
BitSetUtil::compress(changedBitSet,pvCopyStructure);
BitSetUtil::compress(overrunBitSet,pvCopyStructure);
(*monitorElement->changedBitSet) = (*changedBitSet);
(*monitorElement->overrunBitSet) = (*overrunBitSet);
monitorElement->changedBitSet->clear();
monitorElement->overrunBitSet->clear();
(*monitorElement->changedBitSet) |= (*changedBitSet);
(*monitorElement->overrunBitSet) |= (*overrunBitSet);
changedBitSet->clear();
overrunBitSet->clear();
return monitorElement;
@@ -464,9 +469,12 @@ void NOQueue::release(MonitorElementPtr const &monitorElement)
RealQueue::RealQueue(
MonitorLocalPtr const &monitorLocal,
std::vector<MonitorElementPtr> &monitorElementArray)
std::vector<MonitorElementPtr> &monitorElementArray,
size_t nfields)
: monitorLocal(monitorLocal),
queue(monitorElementArray),
changedBitSet(new BitSet(nfields)),
overrunBitSet(new BitSet(nfields)),
queueIsFull(false)
{
}
@@ -474,14 +482,11 @@ RealQueue::RealQueue(
Status RealQueue::start()
{
queue.clear();
monitorElement = queue.getFree();
monitorElement->changedBitSet->clear();
monitorElement->overrunBitSet->clear();
changedBitSet->clear();
overrunBitSet->clear();
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return wasDestroyedStatus;
ml->getPVCopyMonitor()->startMonitoring(
monitorElement->changedBitSet,
monitorElement->overrunBitSet);
ml->getPVCopyMonitor()->startMonitoring(changedBitSet,overrunBitSet);
return Status::Ok;
}
@@ -491,29 +496,38 @@ void RealQueue::stop()
bool RealQueue::dataChanged()
{
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return false;
ml->getPVCopy()->updateCopyFromBitSet(
pvStructure,monitorElement->changedBitSet);
if(queue.getNumberFree()==0) {
if(queueIsFull) return false;
queueIsFull = true;
return true;
if(queueIsFull) {
MonitorElementPtr monitorElement = latestMonitorElement;
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
ml->getPVCopy()->updateCopyFromBitSet(pvStructure,changedBitSet);
(*monitorElement->changedBitSet)|= (*changedBitSet);
(*monitorElement->overrunBitSet)|= (*changedBitSet);
changedBitSet->clear();
overrunBitSet->clear();
return false;
}
MonitorElementPtr newElement = queue.getFree();
if(newElement==NULL) {
MonitorElementPtr monitorElement = queue.getFree();
if(monitorElement==NULL) {
throw std::logic_error(String("RealQueue::dataChanged() logic error"));
}
BitSetUtil::compress(monitorElement->changedBitSet,pvStructure);
BitSetUtil::compress(monitorElement->overrunBitSet,pvStructure);
convert->copy(pvStructure,newElement->pvStructurePtr);
newElement->changedBitSet->clear();
newElement->overrunBitSet->clear();
ml->getPVCopyMonitor()->switchBitSets(
newElement->changedBitSet,newElement->overrunBitSet);
if(queue.getNumberFree()==0){
queueIsFull = true;
latestMonitorElement = monitorElement;
}
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
ml->getPVCopy()->updateCopyFromBitSet(
pvStructure,changedBitSet);
BitSetUtil::compress(changedBitSet,pvStructure);
BitSetUtil::compress(overrunBitSet,pvStructure);
monitorElement->changedBitSet->clear();
(*monitorElement->changedBitSet)|=(*changedBitSet);
monitorElement->overrunBitSet->clear();
(*monitorElement->overrunBitSet)|=(*overrunBitSet);
changedBitSet->clear();
overrunBitSet->clear();
queue.setUsed(monitorElement);
monitorElement = newElement;
return true;
}
@@ -524,8 +538,15 @@ MonitorElementPtr RealQueue::poll()
void RealQueue::release(MonitorElementPtr const &currentElement)
{
if(queueIsFull) {
MonitorElementPtr monitorElement = latestMonitorElement;
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
BitSetUtil::compress(monitorElement->changedBitSet,pvStructure);
BitSetUtil::compress(monitorElement->overrunBitSet,pvStructure);
queueIsFull = false;
latestMonitorElement.reset();
}
queue.releaseUsed(currentElement);
queueIsFull = false;
}
MonitorFactoryPtr getMonitorFactory()