/* monitorFactory.cpp */ /** * Copyright - See the COPYRIGHT that is included with this distribution. * EPICS pvData is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. */ /** * @author Marty Kraimer * @date 2013.04 */ #include #include #include #include #include #include #include #include #include #include #include #define epicsExportSharedSymbols #include "pv/pvDatabase.h" #include "pv/channelProviderLocal.h" using namespace epics::pvData; using namespace epics::pvAccess; using namespace epics::pvCopy; using std::tr1::static_pointer_cast; using std::cout; using std::endl; using std::string; namespace epics { namespace pvDatabase { class MonitorLocal; typedef std::tr1::shared_ptr MonitorLocalPtr; static MonitorPtr nullMonitor; static MonitorElementPtr NULLMonitorElement; static Status failedToCreateMonitorStatus( Status::STATUSTYPE_ERROR,"failed to create monitor"); static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,"already started"); static Status notStartedStatus(Status::STATUSTYPE_ERROR,"not started"); static Status deletedStatus(Status::STATUSTYPE_ERROR,"record is deleted"); class MonitorElementQueue; typedef std::tr1::shared_ptr MonitorElementQueuePtr; class MonitorElementQueue { private: MonitorElementPtrArray elements; // TODO use size_t instead int size; int numberFree; int numberUsed; int nextGetFree; int nextSetUsed; int nextGetUsed; int nextReleaseUsed; public: POINTER_DEFINITIONS(MonitorElementQueue); MonitorElementQueue(std::vector monitorElementArray) : elements(monitorElementArray), size(monitorElementArray.size()), numberFree(size), numberUsed(0), nextGetFree(0), nextSetUsed(0), nextGetUsed(0), nextReleaseUsed(0) { } virtual ~MonitorElementQueue() {} void clear() { numberFree = size; numberUsed = 0; nextGetFree = 0; nextSetUsed = 0; nextGetUsed = 0; nextReleaseUsed = 0; } MonitorElementPtr getFree() { if(numberFree==0) return MonitorElementPtr(); numberFree--; int ind = nextGetFree; MonitorElementPtr queueElement = elements[nextGetFree++]; if(nextGetFree>=size) nextGetFree = 0; return elements[ind]; } void setUsed(MonitorElementPtr const &element) { if(element!=elements[nextSetUsed++]) { throw std::logic_error("not correct queueElement"); } numberUsed++; if(nextSetUsed>=size) nextSetUsed = 0; } MonitorElementPtr getUsed() { if(numberUsed==0) return MonitorElementPtr(); int ind = nextGetUsed; MonitorElementPtr queueElement = elements[nextGetUsed++]; if(nextGetUsed>=size) nextGetUsed = 0; return elements[ind]; } void releaseUsed(MonitorElementPtr const &element) { if(element!=elements[nextReleaseUsed++]) { throw std::logic_error( "not queueElement returned by last call to getUsed"); } if(nextReleaseUsed>=size) nextReleaseUsed = 0; numberUsed--; numberFree++; } }; typedef std::tr1::shared_ptr MonitorRequesterPtr; class MonitorLocal : public Monitor, public PVListener, public std::tr1::enable_shared_from_this { enum MonitorState {idle,active,deleted}; public: POINTER_DEFINITIONS(MonitorLocal); virtual ~MonitorLocal(); virtual void destroy() {} // DEPRECATED virtual Status start(); virtual Status stop(); virtual MonitorElementPtr poll(); virtual void detach(PVRecordPtr const & pvRecord){} virtual void release(MonitorElementPtr const & monitorElement); virtual void dataPut(PVRecordFieldPtr const & pvRecordField); virtual void dataPut( PVRecordStructurePtr const & requested, PVRecordFieldPtr const & pvRecordField); virtual void beginGroupPut(PVRecordPtr const & pvRecord); virtual void endGroupPut(PVRecordPtr const & pvRecord); virtual void unlisten(PVRecordPtr const & pvRecord); MonitorElementPtr getActiveElement(); void releaseActiveElement(); bool init(PVStructurePtr const & pvRequest); MonitorLocal( MonitorRequester::shared_pointer const & channelMonitorRequester, PVRecordPtr const &pvRecord); PVCopyPtr getPVCopy() { return pvCopy;} private: MonitorLocalPtr getPtrSelf() { return shared_from_this(); } MonitorRequester::weak_pointer monitorRequester; PVRecordPtr pvRecord; MonitorState state; PVCopyPtr pvCopy; MonitorElementQueuePtr queue; MonitorElementPtr activeElement; bool isGroupPut; bool dataChanged; Mutex mutex; Mutex queueMutex; }; MonitorLocal::MonitorLocal( MonitorRequester::shared_pointer const & channelMonitorRequester, PVRecordPtr const &pvRecord) : monitorRequester(channelMonitorRequester), pvRecord(pvRecord), state(idle), isGroupPut(false), dataChanged(false) { } MonitorLocal::~MonitorLocal() { if(pvRecord->getTraceLevel()>0) { cout << "MonitorLocal::~MonitorLocal()" << endl; } } Status MonitorLocal::start() { if(pvRecord->getTraceLevel()>0) { cout << "MonitorLocal::start state " << state << endl; } { Lock xx(mutex); if(state==active) return alreadyStartedStatus; if(state==deleted) return deletedStatus; } pvRecord->addListener(getPtrSelf(),pvCopy); epicsGuard guard(*pvRecord); Lock xx(mutex); state = active; queue->clear(); isGroupPut = false; activeElement = queue->getFree(); activeElement->changedBitSet->clear(); activeElement->overrunBitSet->clear(); activeElement->changedBitSet->set(0); releaseActiveElement(); return Status::Ok; } Status MonitorLocal::stop() { if(pvRecord->getTraceLevel()>0){ cout << "MonitorLocal::stop state " << state << endl; } { Lock xx(mutex); if(state==idle) return notStartedStatus; if(state==deleted) return deletedStatus; state = idle; } pvRecord->removeListener(getPtrSelf(),pvCopy); return Status::Ok; } MonitorElementPtr MonitorLocal::poll() { if(pvRecord->getTraceLevel()>1) { cout << "MonitorLocal::poll state " << state << endl; } { Lock xx(queueMutex); if(state!=active) return NULLMonitorElement; return queue->getUsed(); } } void MonitorLocal::release(MonitorElementPtr const & monitorElement) { if(pvRecord->getTraceLevel()>1) { cout << "MonitorLocal::release state " << state << endl; } { Lock xx(queueMutex); if(state!=active) return; queue->releaseUsed(monitorElement); } } void MonitorLocal::releaseActiveElement() { if(pvRecord->getTraceLevel()>1) { cout << "MonitorLocal::releaseActiveElement state " << state << endl; } { Lock xx(queueMutex); if(state!=active) return; bool result = pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet); if(!result) return; MonitorElementPtr newActive = queue->getFree(); if(!newActive) return; BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr); BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr); queue->setUsed(activeElement); activeElement = newActive; activeElement->changedBitSet->clear(); activeElement->overrunBitSet->clear(); } MonitorRequesterPtr requester = monitorRequester.lock(); if(!requester) return; requester->monitorEvent(getPtrSelf()); return; } void MonitorLocal::dataPut(PVRecordFieldPtr const & pvRecordField) { if(pvRecord->getTraceLevel()>1) { cout << "PVCopyMonitor::dataPut(pvRecordField)" << endl; } if(state!=active) return; { Lock xx(mutex); size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField()); BitSetPtr const &changedBitSet = activeElement->changedBitSet; BitSetPtr const &overrunBitSet = activeElement->overrunBitSet; bool isSet = changedBitSet->get(offset); changedBitSet->set(offset); if(isSet) overrunBitSet->set(offset); dataChanged = true; } if(!isGroupPut) { releaseActiveElement(); dataChanged = false; } } void MonitorLocal::dataPut( PVRecordStructurePtr const & requested, PVRecordFieldPtr const & pvRecordField) { if(pvRecord->getTraceLevel()>1) { cout << "PVCopyMonitor::dataPut(requested,pvRecordField)" << endl; } if(state!=active) return; { Lock xx(mutex); BitSetPtr const &changedBitSet = activeElement->changedBitSet; BitSetPtr const &overrunBitSet = activeElement->overrunBitSet; size_t offsetCopyRequested = pvCopy->getCopyOffset( requested->getPVField()); size_t offset = offsetCopyRequested + (pvRecordField->getPVField()->getFieldOffset() - requested->getPVField()->getFieldOffset()); bool isSet = changedBitSet->get(offset); changedBitSet->set(offset); if(isSet) overrunBitSet->set(offset); dataChanged = true; } if(!isGroupPut) { releaseActiveElement(); dataChanged = false; } } void MonitorLocal::beginGroupPut(PVRecordPtr const & pvRecord) { if(pvRecord->getTraceLevel()>1) { cout << "PVCopyMonitor::beginGroupPut()" << endl; } if(state!=active) return; { Lock xx(mutex); isGroupPut = true; dataChanged = false; } } void MonitorLocal::endGroupPut(PVRecordPtr const & pvRecord) { if(pvRecord->getTraceLevel()>1) { cout << "PVCopyMonitor::endGroupPut dataChanged " << dataChanged << endl; } if(state!=active) return; { Lock xx(mutex); isGroupPut = false; } if(dataChanged) { dataChanged = false; releaseActiveElement(); } } void MonitorLocal::unlisten(PVRecordPtr const & pvRecord) { if(pvRecord->getTraceLevel()>1) { cout << "PVCopyMonitor::unlisten\n"; } { Lock xx(mutex); state = deleted; } MonitorRequesterPtr requester = monitorRequester.lock(); if(requester) { if(pvRecord->getTraceLevel()>1) { cout << "PVCopyMonitor::unlisten calling requester->unlisten\n"; } requester->unlisten(getPtrSelf()); } } bool MonitorLocal::init(PVStructurePtr const & pvRequest) { PVFieldPtr pvField; size_t queueSize = 2; PVStructurePtr pvOptions = pvRequest->getSubField("record._options"); MonitorRequesterPtr requester = monitorRequester.lock(); if(!requester) return false; if(pvOptions) { PVStringPtr pvString = pvOptions->getSubField("queueSize"); if(pvString) { try { int32 size; std::stringstream ss; ss << pvString->get(); ss >> size; queueSize = size; } catch (...) { requester->message("queueSize " +pvString->get() + " illegal",errorMessage); return false; } } } pvField = pvRequest->getSubField("field"); if(!pvField) { pvCopy = PVCopy::create( pvRecord->getPVRecordStructure()->getPVStructure(), pvRequest,""); if(!pvCopy) { requester->message("illegal pvRequest",errorMessage); return false; } } else { if(pvField->getField()->getType()!=structure) { requester->message("illegal pvRequest",errorMessage); return false; } pvCopy = PVCopy::create( pvRecord->getPVRecordStructure()->getPVStructure(), pvRequest,"field"); if(!pvCopy) { requester->message("illegal pvRequest",errorMessage); return false; } } if(queueSize<2) queueSize = 2; std::vector monitorElementArray; monitorElementArray.reserve(queueSize); for(size_t i=0; icreatePVStructure(); MonitorElementPtr monitorElement( new MonitorElement(pvStructure)); monitorElementArray.push_back(monitorElement); } queue = MonitorElementQueuePtr(new MonitorElementQueue(monitorElementArray)); requester->monitorConnect( Status::Ok, getPtrSelf(), pvCopy->getStructure()); return true; } MonitorPtr createMonitorLocal( PVRecordPtr const & pvRecord, MonitorRequester::shared_pointer const & monitorRequester, PVStructurePtr const & pvRequest) { MonitorLocalPtr monitor(new MonitorLocal( monitorRequester,pvRecord)); bool result = monitor->init(pvRequest); if(!result) { MonitorPtr monitor; StructureConstPtr structure; monitorRequester->monitorConnect( failedToCreateMonitorStatus,monitor,structure); return nullMonitor; } if(pvRecord->getTraceLevel()>0) { cout << "MonitorFactory::createMonitor" << " recordName " << pvRecord->getRecordName() << endl; } return monitor; } }}