more work on monitor queues

This commit is contained in:
Marty Kraimer
2013-11-12 11:09:25 -05:00
parent 006859120e
commit 3039e1cdb0
8 changed files with 2045 additions and 171 deletions

View File

@@ -35,12 +35,12 @@ static ConvertPtr convert = getConvert();
//class MonitorFieldNode;
//typedef std::tr1::shared_ptr<MonitorFieldNode> MonitorFieldNodePtr;
class MonitorQueue;
typedef std::tr1::shared_ptr<MonitorQueue> MonitorQueuePtr;
class NOQueue;
typedef std::tr1::shared_ptr<NOQueue> NOQueuePtr;
class RealQueue;
typedef std::tr1::shared_ptr<RealQueue> RealQueuePtr;
class ElementQueue;
typedef std::tr1::shared_ptr<ElementQueue> ElementQueuePtr;
class SingleElementQueue;
typedef std::tr1::shared_ptr<SingleElementQueue> SingleElementQueuePtr;
class MultipleElementQueue;
typedef std::tr1::shared_ptr<MultipleElementQueue> MultipleElementQueuePtr;
//class MonitorFieldNode
//{
@@ -49,36 +49,34 @@ typedef std::tr1::shared_ptr<RealQueue> RealQueuePtr;
// size_t bitOffset; // pv pvCopy
//};
class MonitorQueue :
public std::tr1::enable_shared_from_this<MonitorQueue>
class ElementQueue :
public Monitor,
public std::tr1::enable_shared_from_this<ElementQueue>
{
public:
POINTER_DEFINITIONS(MonitorQueue);
virtual ~MonitorQueue(){}
virtual Status start() = 0;
virtual void stop() = 0;
virtual bool dataChanged() = 0;
virtual MonitorElementPtr poll() = 0;
virtual void release(MonitorElementPtr const &monitorElement) = 0;
POINTER_DEFINITIONS(ElementQueue);
virtual ~ElementQueue(){}
virtual bool dataChanged(bool firstMonitor) = 0;
protected:
MonitorQueuePtr getPtrSelf()
ElementQueuePtr getPtrSelf()
{
return shared_from_this();
}
};
class NOQueue :
public MonitorQueue
class SingleElementQueue :
public ElementQueue
{
public:
POINTER_DEFINITIONS(NOQueue);
virtual ~NOQueue(){}
NOQueue(
POINTER_DEFINITIONS(SingleElementQueue);
virtual ~SingleElementQueue(){}
SingleElementQueue(
MonitorLocalPtr const &monitorLocal);
virtual void destroy(){}
virtual Status start();
virtual void stop();
virtual bool dataChanged();
virtual Status stop();
virtual bool dataChanged(bool firstMonitor);
virtual MonitorElementPtr poll();
virtual void release(MonitorElementPtr const &monitorElement);
private:
@@ -88,26 +86,32 @@ private:
bool gotMonitor;
BitSetPtr changedBitSet;
BitSetPtr overrunBitSet;
BitSetPtr dataChangedBitSet;
BitSetPtr dataOverrunBitSet;
};
class RealQueue :
public MonitorQueue
typedef Queue<MonitorElement> MonitorElementQueue;
typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
class MultipleElementQueue :
public ElementQueue
{
public:
POINTER_DEFINITIONS(RealQueue);
virtual ~RealQueue(){}
RealQueue(
POINTER_DEFINITIONS(MultipleElementQueue);
virtual ~MultipleElementQueue(){}
MultipleElementQueue(
MonitorLocalPtr const &monitorLocal,
std::vector<MonitorElementPtr> &monitorElementArray,
MonitorElementQueuePtr const &queue,
size_t nfields);
virtual void destroy(){}
virtual Status start();
virtual void stop();
virtual bool dataChanged();
virtual Status stop();
virtual bool dataChanged(bool firstMonitor);
virtual MonitorElementPtr poll();
virtual void release(MonitorElementPtr const &monitorElement);
private:
std::tr1::weak_ptr<MonitorLocal> monitorLocal;
Queue<MonitorElement> queue;
MonitorElementQueuePtr queue;
BitSetPtr changedBitSet;
BitSetPtr overrunBitSet;
MonitorElementPtr latestMonitorElement;
@@ -146,7 +150,7 @@ private:
bool isDestroyed;
bool firstMonitor;
PVCopyPtr pvCopy;
MonitorQueuePtr queue;
ElementQueuePtr queue;
PVCopyMonitorPtr pvCopyMonitor;
Mutex mutex;
};
@@ -205,55 +209,41 @@ Status MonitorLocal::start()
Status MonitorLocal::stop()
{
if(pvRecord->getTraceLevel()>0)
{
cout << "MonitorLocal::stop() " << endl;
}
pvCopyMonitor->stopMonitoring();
queue->stop();
{
Lock xx(mutex);
if(pvRecord->getTraceLevel()>0){
cout << "MonitorLocal::stop() " << endl;
}
if(!isDestroyed) queue->stop();
}
return Status::Ok;
}
MonitorElementPtr MonitorLocal::poll()
{
pvRecord->lock();
try {
Lock xx(mutex);
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::poll() " << endl;
}
if(isDestroyed) {
pvRecord->unlock();
return NULLMonitorElement;
}
MonitorElementPtr element = queue->poll();
pvRecord->unlock();
return element;
} catch(...) {
pvRecord->unlock();
throw;
Lock xx(mutex);
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::poll() " << endl;
}
if(isDestroyed) {
return NULLMonitorElement;
}
return queue->poll();
}
void MonitorLocal::release(MonitorElementPtr const & monitorElement)
{
pvRecord->lock();
try {
Lock xx(mutex);
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::release() " << endl;
}
if(isDestroyed) {
return;
}
queue->release(monitorElement);
pvRecord->unlock();
} catch(...) {
pvRecord->unlock();
throw;
Lock xx(mutex);
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::release() " << endl;
}
if(isDestroyed) {
return;
}
queue->release(monitorElement);
}
void MonitorLocal::dataChanged()
@@ -262,17 +252,14 @@ void MonitorLocal::dataChanged()
{
cout << "MonitorLocal::dataChanged() " "firstMonitor " << firstMonitor << endl;
}
Lock xx(mutex);
if(isDestroyed) return;
if(firstMonitor) {
queue->dataChanged();
firstMonitor = false;
monitorRequester->monitorEvent(getPtrSelf());
return;
}
if(queue->dataChanged()) {
monitorRequester->monitorEvent(getPtrSelf());
bool getMonitorEvent = false;
{
Lock xx(mutex);
if(isDestroyed) return;
getMonitorEvent = queue->dataChanged(firstMonitor);
firstMonitor = false;
}
if(getMonitorEvent) monitorRequester->monitorEvent(getPtrSelf());
}
void MonitorLocal::unlisten()
@@ -326,18 +313,23 @@ bool MonitorLocal::init(PVStructurePtr const & pvRequest)
pvCopyMonitor = pvCopy->createPVCopyMonitor(getPtrSelf());
// MARTY MUST IMPLEMENT periodic
if(queueSize<2) {
queue = NOQueuePtr(new NOQueue(getPtrSelf()));
queue = SingleElementQueuePtr(new SingleElementQueue(getPtrSelf()));
} else {
std::vector<MonitorElementPtr> monitorElementArray;
monitorElementArray.reserve(queueSize);
size_t nfields = 0;
for(size_t i=0; i<queueSize; i++) {
PVStructurePtr pvStructure = pvCopy->createPVStructure();
if(nfields==0) nfields = pvStructure->getNumberFields();
MonitorElementPtr monitorElement(
new MonitorElement(pvStructure));
monitorElementArray.push_back(monitorElement);
}
size_t nfields = monitorElementArray[0]->pvStructurePtr->getNumberFields();
queue = RealQueuePtr(new RealQueue(getPtrSelf(),monitorElementArray,nfields));
MonitorElementQueuePtr elementQueue(new MonitorElementQueue(monitorElementArray));
queue = MultipleElementQueuePtr(new MultipleElementQueue(
getPtrSelf(),
elementQueue,
nfields));
}
// MARTY MUST IMPLEMENT algorithm
monitorRequester->monitorConnect(
@@ -410,22 +402,26 @@ MonitorAlgorithmCreatePtr MonitorFactory::getMonitorAlgorithmCreate(
return nullMonitorAlgorithmCreate;
}
NOQueue::NOQueue(
SingleElementQueue::SingleElementQueue(
MonitorLocalPtr const &monitorLocal)
: monitorLocal(monitorLocal),
pvCopyStructure(monitorLocal->getPVCopy()->createPVStructure()),
monitorElement(new MonitorElement(pvCopyStructure)),
gotMonitor(false),
changedBitSet(new BitSet(pvCopyStructure->getNumberFields())),
overrunBitSet(new BitSet(pvCopyStructure->getNumberFields()))
overrunBitSet(new BitSet(pvCopyStructure->getNumberFields())),
dataChangedBitSet(new BitSet(pvCopyStructure->getNumberFields())),
dataOverrunBitSet(new BitSet(pvCopyStructure->getNumberFields()))
{
}
Status NOQueue::start()
Status SingleElementQueue::start()
{
gotMonitor = true;
changedBitSet->clear();
overrunBitSet->clear();
dataChangedBitSet->clear();
dataOverrunBitSet->clear();
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return wasDestroyedStatus;
ml->getPVCopyMonitor()->startMonitoring(
@@ -434,54 +430,61 @@ Status NOQueue::start()
return Status::Ok;
}
void NOQueue::stop()
Status SingleElementQueue::stop()
{
return Status::Ok;
}
bool NOQueue::dataChanged()
bool SingleElementQueue::dataChanged(bool firstMonitor)
{
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return false;
if(firstMonitor) changedBitSet->set(0);
ml->getPVCopy()->updateCopyFromBitSet(pvCopyStructure,changedBitSet);
(*dataChangedBitSet) |= (*changedBitSet);
(*dataOverrunBitSet) |= (*overrunBitSet);
changedBitSet->clear();
overrunBitSet->clear();
gotMonitor = true;
return true;
}
MonitorElementPtr NOQueue::poll()
MonitorElementPtr SingleElementQueue::poll()
{
if(!gotMonitor) return NULLMonitorElement;
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return NULLMonitorElement;
ml->getPVCopy()->updateCopyFromBitSet(
pvCopyStructure, changedBitSet);
BitSetUtil::compress(changedBitSet,pvCopyStructure);
BitSetUtil::compress(overrunBitSet,pvCopyStructure);
monitorElement->changedBitSet->clear();
monitorElement->overrunBitSet->clear();
(*monitorElement->changedBitSet) |= (*changedBitSet);
(*monitorElement->overrunBitSet) |= (*overrunBitSet);
changedBitSet->clear();
overrunBitSet->clear();
if(!gotMonitor) return NULLMonitorElement;
gotMonitor = false;
convert->copyStructure(pvCopyStructure,monitorElement->pvStructurePtr);
BitSetUtil::compress(dataChangedBitSet,pvCopyStructure);
BitSetUtil::compress(dataOverrunBitSet,pvCopyStructure);
(*monitorElement->changedBitSet) = (*dataChangedBitSet);
(*monitorElement->overrunBitSet) = (*dataOverrunBitSet);
dataChangedBitSet->clear();
dataOverrunBitSet->clear();
return monitorElement;
}
void NOQueue::release(MonitorElementPtr const &monitorElement)
void SingleElementQueue::release(MonitorElementPtr const &monitorElement)
{
gotMonitor = false;
}
RealQueue::RealQueue(
MultipleElementQueue::MultipleElementQueue(
MonitorLocalPtr const &monitorLocal,
std::vector<MonitorElementPtr> &monitorElementArray,
MonitorElementQueuePtr const &queue,
size_t nfields)
: monitorLocal(monitorLocal),
queue(monitorElementArray),
queue(queue),
changedBitSet(new BitSet(nfields)),
overrunBitSet(new BitSet(nfields)),
queueIsFull(false)
{
}
Status RealQueue::start()
Status MultipleElementQueue::start()
{
queue.clear();
queue->clear();
queueIsFull = false;
changedBitSet->clear();
overrunBitSet->clear();
MonitorLocalPtr ml = monitorLocal.lock();
@@ -490,11 +493,12 @@ Status RealQueue::start()
return Status::Ok;
}
void RealQueue::stop()
Status MultipleElementQueue::stop()
{
return Status::Ok;
}
bool RealQueue::dataChanged()
bool MultipleElementQueue::dataChanged(bool firstMonitor)
{
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return false;
@@ -508,14 +512,15 @@ bool RealQueue::dataChanged()
overrunBitSet->clear();
return false;
}
MonitorElementPtr monitorElement = queue.getFree();
MonitorElementPtr monitorElement = queue->getFree();
if(monitorElement==NULL) {
throw std::logic_error(String("RealQueue::dataChanged() logic error"));
throw std::logic_error(String("MultipleElementQueue::dataChanged() logic error"));
}
if(queue.getNumberFree()==0){
if(queue->getNumberFree()==0){
queueIsFull = true;
latestMonitorElement = monitorElement;
}
if(firstMonitor) changedBitSet->set(0);
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
ml->getPVCopy()->updateCopyFromBitSet(
pvStructure,changedBitSet);
@@ -527,16 +532,16 @@ bool RealQueue::dataChanged()
(*monitorElement->overrunBitSet)|=(*overrunBitSet);
changedBitSet->clear();
overrunBitSet->clear();
queue.setUsed(monitorElement);
queue->setUsed(monitorElement);
return true;
}
MonitorElementPtr RealQueue::poll()
MonitorElementPtr MultipleElementQueue::poll()
{
return queue.getUsed();
return queue->getUsed();
}
void RealQueue::release(MonitorElementPtr const &currentElement)
void MultipleElementQueue::release(MonitorElementPtr const &currentElement)
{
if(queueIsFull) {
MonitorElementPtr monitorElement = latestMonitorElement;
@@ -546,7 +551,7 @@ void RealQueue::release(MonitorElementPtr const &currentElement)
queueIsFull = false;
latestMonitorElement.reset();
}
queue.releaseUsed(currentElement);
queue->releaseUsed(currentElement);
}
MonitorFactoryPtr getMonitorFactory()