fixed bugs in implementation of montor; connection problems still exist

This commit is contained in:
Marty Kraimer
2014-08-07 08:02:53 -04:00
parent d6aa03815e
commit 9551b0e4c6
10 changed files with 152 additions and 224 deletions

View File

@@ -30,6 +30,7 @@ using std::string;
namespace epics { namespace pvDatabase {
static MonitorPtr nullMonitor;
static MonitorElementPtr NULLMonitorElement;
static Status wasDestroyedStatus(Status::STATUSTYPE_ERROR,"was destroyed");
@@ -37,52 +38,9 @@ static Status wasDestroyedStatus(Status::STATUSTYPE_ERROR,"was destroyed");
static ConvertPtr convert = getConvert();
class ElementQueue;
typedef std::tr1::shared_ptr<ElementQueue> ElementQueuePtr;
class MultipleElementQueue;
typedef std::tr1::shared_ptr<MultipleElementQueue> MultipleElementQueuePtr;
class ElementQueue :
public Monitor,
public std::tr1::enable_shared_from_this<ElementQueue>
{
public:
POINTER_DEFINITIONS(ElementQueue);
virtual ~ElementQueue(){}
virtual bool dataChanged() = 0;
protected:
ElementQueuePtr getPtrSelf()
{
return shared_from_this();
}
};
typedef Queue<MonitorElement> MonitorElementQueue;
typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
class MultipleElementQueue :
public ElementQueue
{
public:
POINTER_DEFINITIONS(MultipleElementQueue);
virtual ~MultipleElementQueue(){}
MultipleElementQueue(
MonitorLocalPtr const &monitorLocal,
MonitorElementQueuePtr const &queue,
size_t nfields);
virtual void destroy(){}
virtual Status start();
virtual Status stop();
virtual bool dataChanged();
virtual MonitorElementPtr poll();
virtual void release(MonitorElementPtr const &monitorElement);
private:
std::tr1::weak_ptr<MonitorLocal> monitorLocal;
MonitorElementQueuePtr queue;
MonitorElementPtr activeElement;
bool queueIsFull;
};
class MonitorLocal :
@@ -97,9 +55,10 @@ public:
virtual Status stop();
virtual MonitorElementPtr poll();
virtual void destroy();
virtual void dataChanged();
virtual void unlisten();
virtual void release(MonitorElementPtr const & monitorElement);
MonitorElementPtr getActiveElement();
MonitorElementPtr releaseActiveElement();
void unlisten();
bool init(PVStructurePtr const & pvRequest);
MonitorLocal(
MonitorRequester::shared_pointer const & channelMonitorRequester,
@@ -116,7 +75,10 @@ private:
bool isDestroyed;
bool firstMonitor;
PVCopyPtr pvCopy;
ElementQueuePtr queue;
// MultipleElementQueuePtr queue;
MonitorElementQueuePtr queue;
MonitorElementPtr activeElement;
bool queueIsFull;
PVCopyMonitorPtr pvCopyMonitor;
Mutex mutex;
};
@@ -161,69 +123,93 @@ void MonitorLocal::destroy()
Status MonitorLocal::start()
{
Lock xx(mutex);
if(pvRecord->getTraceLevel()>0)
{
cout << "MonitorLocal::start() " << endl;
}
Lock xx(mutex);
if(isDestroyed) return wasDestroyedStatus;
firstMonitor = true;
return queue->start();
queue->clear();
queueIsFull = false;
activeElement = queue->getFree();
activeElement->changedBitSet->clear();
activeElement->overrunBitSet->clear();
pvCopyMonitor->startMonitoring();
return Status::Ok;
}
Status MonitorLocal::stop()
{
pvCopyMonitor->stopMonitoring();
{
Lock xx(mutex);
if(pvRecord->getTraceLevel()>0){
cout << "MonitorLocal::stop() " << endl;
}
if(!isDestroyed) queue->stop();
if(pvRecord->getTraceLevel()>0){
cout << "MonitorLocal::stop() " << endl;
}
Lock xx(mutex);
pvCopyMonitor->stopMonitoring();
return Status::Ok;
}
MonitorElementPtr MonitorLocal::poll()
{
Lock xx(mutex);
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::poll() " << endl;
}
Lock xx(mutex);
if(isDestroyed) {
return NULLMonitorElement;
}
return queue->poll();
return queue->getUsed();
}
void MonitorLocal::release(MonitorElementPtr const & monitorElement)
{
Lock xx(mutex);
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::release() " << endl;
}
Lock xx(mutex);
if(isDestroyed) {
return;
}
queue->release(monitorElement);
queue->releaseUsed(monitorElement);
queueIsFull = false;
}
void MonitorLocal::dataChanged()
MonitorElementPtr MonitorLocal::getActiveElement()
{
if(pvRecord->getTraceLevel()>1)
if(pvRecord->getTraceLevel()>0)
{
cout << "MonitorLocal::dataChanged() " "firstMonitor " << firstMonitor << endl;
cout << "MonitorLocal::getActiveElement() " << endl;
}
Lock xx(mutex);
return activeElement;
}
MonitorElementPtr MonitorLocal::releaseActiveElement()
{
if(pvRecord->getTraceLevel()>0)
{
cout << "MonitorLocal::releaseActiveElement() " << endl;
}
bool getMonitorEvent = false;
{
Lock xx(mutex);
if(isDestroyed) return;
getMonitorEvent = queue->dataChanged();
firstMonitor = false;
if(queueIsFull) return activeElement;
pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
queue->setUsed(activeElement);
activeElement = queue->getFree();
if(activeElement==NULL) {
throw std::logic_error("MultipleLocal::releaseActiveElement logic error");
}
if(queue->getNumberFree()==0) queueIsFull = true;
activeElement->changedBitSet->clear();
activeElement->overrunBitSet->clear();
}
if(getMonitorEvent) monitorRequester->monitorEvent(getPtrSelf());
monitorRequester->monitorEvent(getPtrSelf());
return activeElement;
}
void MonitorLocal::unlisten()
@@ -292,11 +278,7 @@ bool MonitorLocal::init(PVStructurePtr const & pvRequest)
new MonitorElement(pvStructure));
monitorElementArray.push_back(monitorElement);
}
MonitorElementQueuePtr elementQueue(new MonitorElementQueue(monitorElementArray));
queue = MultipleElementQueuePtr(new MultipleElementQueue(
getPtrSelf(),
elementQueue,
nfields));
queue = MonitorElementQueuePtr(new MonitorElementQueue(monitorElementArray));
// MARTY MUST IMPLEMENT algorithm
monitorRequester->monitorConnect(
Status::Ok,
@@ -345,75 +327,6 @@ MonitorPtr MonitorFactory::createMonitor(
}
MultipleElementQueue::MultipleElementQueue(
MonitorLocalPtr const &monitorLocal,
MonitorElementQueuePtr const &queue,
size_t nfields)
: monitorLocal(monitorLocal),
queue(queue),
queueIsFull(false)
{
}
Status MultipleElementQueue::start()
{
queue->clear();
queueIsFull = false;
activeElement = queue->getFree();
activeElement->changedBitSet->clear();
activeElement->overrunBitSet->clear();
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return wasDestroyedStatus;
ml->getPVCopyMonitor()->setMonitorElement(activeElement);
ml->getPVCopyMonitor()->startMonitoring();
return Status::Ok;
}
Status MultipleElementQueue::stop()
{
return Status::Ok;
}
bool MultipleElementQueue::dataChanged()
{
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return false;
if(queueIsFull) return false;
ml->getPVCopy()->updateCopyFromBitSet(
activeElement->pvStructurePtr,activeElement->changedBitSet);
BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
queue->setUsed(activeElement);
activeElement = queue->getFree();
if(activeElement==NULL) {
throw std::logic_error("MultipleElementQueue::dataChanged() logic error");
}
if(queue->getNumberFree()==0) queueIsFull = true;
activeElement->changedBitSet->clear();
activeElement->overrunBitSet->clear();
ml->getPVCopyMonitor()->setMonitorElement(activeElement);
return true;
}
MonitorElementPtr MultipleElementQueue::poll()
{
MonitorLocalPtr ml = monitorLocal.lock();
if(ml==NULL) return MonitorElementPtr();
MonitorElementPtr monitorElement = queue->getUsed();
if(monitorElement==NULL) return monitorElement;
ml->getPVCopyMonitor()->monitorDone(monitorElement);
return monitorElement;
}
void MultipleElementQueue::release(MonitorElementPtr const &element)
{
queue->releaseUsed(element);
if(!queueIsFull) return;
queueIsFull = false;
if(!activeElement->changedBitSet->isEmpty()) {
dataChanged();
}
}
MonitorFactoryPtr getMonitorFactory()
{