simplify monitors; cleanup code; fix race condidition for monitor cleanup

This commit is contained in:
Marty Kraimer
2015-02-06 14:49:28 -05:00
parent e30e4f3638
commit 30dd2ed046
7 changed files with 262 additions and 674 deletions

View File

@@ -18,7 +18,6 @@
#define epicsExportSharedSymbols
#include <pv/pvCopyMonitor.h>
#include <pv/channelProviderLocal.h>
using namespace epics::pvData;
@@ -33,6 +32,7 @@ namespace epics { namespace pvDatabase {
static MonitorPtr nullMonitor;
static MonitorElementPtr NULLMonitorElement;
static Status failedToCreateMonitorStatus(Status::STATUSTYPE_ERROR,"failed to create monitor");
static Status wasDestroyedStatus(Status::STATUSTYPE_ERROR,"was destroyed");
static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,"already started");
static Status notStartedStatus(Status::STATUSTYPE_ERROR,"not started");
@@ -47,7 +47,7 @@ typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
class MonitorLocal :
public Monitor,
public PVCopyMonitorRequester,
public PVListener,
public std::tr1::enable_shared_from_this<MonitorLocal>
{
enum MonitorState {idle,active, destroyed};
@@ -58,16 +58,22 @@ public:
virtual Status stop();
virtual MonitorElementPtr poll();
virtual void destroy();
virtual void detach(PVRecordPtr const & pvRecord){destroy();}
virtual void release(MonitorElementPtr const & monitorElement);
virtual void dataPut(PVRecordFieldPtr const & pvRecordField);
virtual void dataPut(
PVRecordStructurePtr const & requested,
PVRecordFieldPtr const & pvRecordField);
virtual void beginGroupPut(PVRecordPtr const & pvRecord);
virtual void endGroupPut(PVRecordPtr const & pvRecord);
virtual void unlisten(PVRecordPtr const & pvRecord);
MonitorElementPtr getActiveElement();
MonitorElementPtr releaseActiveElement();
void unlisten();
void releaseActiveElement();
bool init(PVStructurePtr const & pvRequest);
MonitorLocal(
MonitorRequester::shared_pointer const & channelMonitorRequester,
PVRecordPtr const &pvRecord);
PVCopyPtr getPVCopy() { return pvCopy;}
PVCopyMonitorPtr getPVCopyMonitor() { return pvCopyMonitor;}
private:
MonitorLocalPtr getPtrSelf()
{
@@ -76,12 +82,13 @@ private:
MonitorRequester::shared_pointer monitorRequester;
PVRecordPtr pvRecord;
MonitorState state;
bool firstMonitor;
PVCopyPtr pvCopy;
MonitorElementQueuePtr queue;
MonitorElementPtr activeElement;
PVCopyMonitorPtr pvCopyMonitor;
bool isGroupPut;
bool dataChanged;
Mutex mutex;
Mutex queueMutex;
};
MonitorLocal::MonitorLocal(
@@ -90,7 +97,8 @@ MonitorLocal::MonitorLocal(
: monitorRequester(channelMonitorRequester),
pvRecord(pvRecord),
state(idle),
firstMonitor(true)
isGroupPut(false),
dataChanged(false)
{
}
@@ -111,12 +119,16 @@ void MonitorLocal::destroy()
{
Lock xx(mutex);
if(state==destroyed) return;
}
if(pvCopy) pvCopy->destroy();
{
Lock xx(mutex);
state = destroyed;
}
pvCopyMonitor->destroy();
pvCopy->destroy();
pvCopyMonitor.reset();
queue.reset();
{
Lock xx(queueMutex);
queue.reset();
}
pvCopy.reset();
}
@@ -130,14 +142,23 @@ Status MonitorLocal::start()
Lock xx(mutex);
if(state==destroyed) return wasDestroyedStatus;
if(state==active) return alreadyStartedStatus;
}
pvRecord->addListener(getPtrSelf(),pvCopy);
pvRecord->lock();
try {
Lock xx(mutex);
state = active;
firstMonitor = true;
queue->clear();
isGroupPut = false;
activeElement = queue->getFree();
activeElement->changedBitSet->clear();
activeElement->overrunBitSet->clear();
activeElement->changedBitSet->set(0);
releaseActiveElement();
pvRecord->unlock();
} catch(...) {
pvRecord->unlock();
}
pvCopyMonitor->startMonitoring(activeElement);
return Status::Ok;
}
@@ -152,7 +173,7 @@ Status MonitorLocal::stop()
if(state==idle) return notStartedStatus;
state = idle;
}
pvCopyMonitor->stopMonitoring();
pvRecord->removeListener(getPtrSelf(),pvCopy);
return Status::Ok;
}
@@ -162,9 +183,11 @@ MonitorElementPtr MonitorLocal::poll()
{
cout << "MonitorLocal::poll() " << endl;
}
Lock xx(mutex);
if(state!=active) return NULLMonitorElement;
return queue->getUsed();
{
Lock xx(queueMutex);
if(state!=active) return NULLMonitorElement;
return queue->getUsed();
}
}
void MonitorLocal::release(MonitorElementPtr const & monitorElement)
@@ -173,22 +196,24 @@ void MonitorLocal::release(MonitorElementPtr const & monitorElement)
{
cout << "MonitorLocal::release() " << endl;
}
Lock xx(mutex);
if(state!=active) return;
queue->releaseUsed(monitorElement);
{
Lock xx(queueMutex);
if(state!=active) return;
queue->releaseUsed(monitorElement);
}
}
MonitorElementPtr MonitorLocal::releaseActiveElement()
void MonitorLocal::releaseActiveElement()
{
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::releaseActiveElement() " << endl;
}
{
Lock xx(mutex);
if(state!=active) return NULLMonitorElement;
Lock xx(queueMutex);
if(state!=active) return;
MonitorElementPtr newActive = queue->getFree();
if(!newActive) return activeElement;
if(!newActive) return;
pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
@@ -198,45 +223,128 @@ MonitorElementPtr MonitorLocal::releaseActiveElement()
activeElement->overrunBitSet->clear();
}
monitorRequester->monitorEvent(getPtrSelf());
return activeElement;
return;
}
void MonitorLocal::unlisten()
void MonitorLocal::dataPut(PVRecordFieldPtr const & pvRecordField)
{
if(pvRecord->getTraceLevel()>0)
if(pvRecord->getTraceLevel()>1)
{
cout << "MonitorLocal::unlisten() " << endl;
cout << "PVCopyMonitor::dataPut(pvRecordField)" << endl;
}
if(state!=active) return;
{
Lock xx(mutex);
size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField());
BitSetPtr const &changedBitSet = activeElement->changedBitSet;
BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
bool isSet = changedBitSet->get(offset);
changedBitSet->set(offset);
if(isSet) overrunBitSet->set(offset);
dataChanged = true;
}
if(!isGroupPut) {
releaseActiveElement();
dataChanged = false;
}
monitorRequester->unlisten(getPtrSelf());
}
void MonitorLocal::dataPut(
PVRecordStructurePtr const & requested,
PVRecordFieldPtr const & pvRecordField)
{
if(pvRecord->getTraceLevel()>1)
{
cout << "PVCopyMonitor::dataPut(requested,pvRecordField)" << endl;
}
if(state!=active) return;
{
Lock xx(mutex);
BitSetPtr const &changedBitSet = activeElement->changedBitSet;
BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
size_t offsetCopyRequested = pvCopy->getCopyOffset(
requested->getPVField());
size_t offset = offsetCopyRequested
+ (pvRecordField->getPVField()->getFieldOffset()
- requested->getPVField()->getFieldOffset());
bool isSet = changedBitSet->get(offset);
changedBitSet->set(offset);
if(isSet) overrunBitSet->set(offset);
dataChanged = true;
}
if(!isGroupPut) {
releaseActiveElement();
dataChanged = false;
}
}
void MonitorLocal::beginGroupPut(PVRecordPtr const & pvRecord)
{
if(pvRecord->getTraceLevel()>1)
{
cout << "PVCopyMonitor::beginGroupPut()" << endl;
}
if(state!=active) return;
{
Lock xx(mutex);
isGroupPut = true;
dataChanged = false;
}
}
void MonitorLocal::endGroupPut(PVRecordPtr const & pvRecord)
{
if(pvRecord->getTraceLevel()>1)
{
cout << "PVCopyMonitor::endGroupPut() dataChanged " << dataChanged << endl;
}
if(state!=active) return;
{
Lock xx(mutex);
isGroupPut = false;
}
if(dataChanged) {
dataChanged = false;
releaseActiveElement();
}
}
void MonitorLocal::unlisten(PVRecordPtr const & pvRecord)
{
if(pvRecord->getTraceLevel()>1)
{
cout << "PVCopyMonitor::unlisten\n";
}
pvRecord->removeListener(getPtrSelf(),pvCopy);
}
bool MonitorLocal::init(PVStructurePtr const & pvRequest)
{
PVFieldPtr pvField;
PVStructurePtr pvOptions;
size_t queueSize = 2;
pvField = pvRequest->getSubField("record._options");
if(pvField.get()!=NULL) {
pvOptions = static_pointer_cast<PVStructure>(pvField);
pvField = pvOptions->getSubField("queueSize");
if(pvField.get()!=NULL) {
PVStringPtr pvString = pvOptions->getStringField("queueSize");
if(pvString.get()!=NULL) {
PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
if(pvOptions) {
PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
if(pvString) {
try {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
queueSize = size;
} catch (...) {
monitorRequester->message("queueSize " +pvString->get() + " illegal",errorMessage);
return false;
}
}
}
pvField = pvRequest->getSubField("field");
if(pvField.get()==NULL) {
if(!pvField) {
pvCopy = PVCopy::create(
pvRecord->getPVRecordStructure()->getPVStructure(),
pvRequest,"");
if(pvCopy.get()==NULL) {
if(!pvCopy) {
monitorRequester->message("illegal pvRequest",errorMessage);
return false;
}
@@ -248,27 +356,21 @@ bool MonitorLocal::init(PVStructurePtr const & pvRequest)
pvCopy = PVCopy::create(
pvRecord->getPVRecordStructure()->getPVStructure(),
pvRequest,"field");
if(pvCopy.get()==NULL) {
if(!pvCopy) {
monitorRequester->message("illegal pvRequest",errorMessage);
return false;
}
}
pvCopyMonitor = PVCopyMonitor::create(
getPtrSelf(),pvRecord,pvCopy);
// MARTY MUST IMPLEMENT periodic
if(queueSize<2) queueSize = 2;
std::vector<MonitorElementPtr> monitorElementArray;
monitorElementArray.reserve(queueSize);
size_t nfields = 0;
for(size_t i=0; i<queueSize; i++) {
PVStructurePtr pvStructure = pvCopy->createPVStructure();
if(nfields==0) nfields = pvStructure->getNumberFields();
MonitorElementPtr monitorElement(
new MonitorElement(pvStructure));
monitorElementArray.push_back(monitorElement);
}
queue = MonitorElementQueuePtr(new MonitorElementQueue(monitorElementArray));
// MARTY MUST IMPLEMENT algorithm
monitorRequester->monitorConnect(
Status::Ok,
getPtrSelf(),
@@ -289,7 +391,6 @@ MonitorFactory::~MonitorFactory()
void MonitorFactory::destroy()
{
Lock lock(mutex);
if(isDestroyed) return;
isDestroyed = true;
}
@@ -306,7 +407,12 @@ MonitorPtr MonitorFactory::createMonitor(
MonitorLocalPtr monitor(new MonitorLocal(
monitorRequester,pvRecord));
bool result = monitor->init(pvRequest);
if(!result) return nullMonitor;
if(!result) {
MonitorPtr monitor;
StructureConstPtr structure;
monitorRequester->monitorConnect(failedToCreateMonitorStatus,monitor,structure);
return nullMonitor;
}
if(pvRecord->getTraceLevel()>0)
{
cout << "MonitorFactory::createMonitor";