From 58b04509ded63d9b8c7e73e46d515443d859a019 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Sun, 1 Feb 2015 00:45:13 +0100 Subject: [PATCH] client: reimplemented handling monitors --- src/remoteClient/clientContextImpl.cpp | 228 +++++++++++++++++++------ 1 file changed, 173 insertions(+), 55 deletions(-) diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index d47db17..8aaa489 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2089,8 +2089,9 @@ namespace epics { virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; }; + typedef vector FreeElementQueue; + typedef queue MonitorElementQueue; - typedef Queue MonitorElementQueue; class MonitorStrategyQueue : public MonitorStrategy, @@ -2101,7 +2102,8 @@ namespace epics { int32 m_queueSize; StructureConstPtr m_lastStructure; - std::tr1::shared_ptr m_monitorQueue; + FreeElementQueue m_freeQueue; + MonitorElementQueue m_monitorQueue; MonitorRequester::shared_pointer m_callback; @@ -2110,24 +2112,30 @@ namespace epics { BitSet::shared_pointer m_bitSet1; BitSet::shared_pointer m_bitSet2; + MonitorElement::shared_pointer m_overrunElement; bool m_overrunInProgress; - bool m_needToReleaseFirst; MonitorElement::shared_pointer m_nullMonitorElement; - MonitorElement::shared_pointer m_monitorElement; + + PVStructure::shared_pointer m_up2datePVStructure; public: MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) : - m_queueSize(queueSize), m_lastStructure(), m_monitorQueue(), + m_queueSize(queueSize), m_lastStructure(), + m_freeQueue(), + m_monitorQueue(), m_callback(callback), m_mutex(), m_bitSet1(), m_bitSet2(), m_overrunInProgress(false), - m_needToReleaseFirst(false), - m_nullMonitorElement(), m_monitorElement() + m_nullMonitorElement() { if (queueSize <= 1) throw std::invalid_argument("queueSize <= 1"); + + m_freeQueue.reserve(m_queueSize); + // TODO array based deque + //m_monitorQueue.reserve(m_queueSize); } virtual ~MonitorStrategyQueue() @@ -2141,20 +2149,67 @@ namespace epics { if (m_lastStructure.get() == 0 || *(m_lastStructure.get()) == *(structure.get())) { - std::vector monitorElementArray; - monitorElementArray.reserve(m_queueSize); - for (int32 i = 0; i < m_queueSize; i++) { PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure); MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure)); - monitorElementArray.push_back(monitorElement); + m_freeQueue.push_back(monitorElement); } - m_monitorQueue.reset(new MonitorElementQueue(monitorElementArray)); m_lastStructure = structure; } } + + + void partialCopy(PVStructure::shared_pointer const & from, + PVStructure::shared_pointer const & to, + BitSet::shared_pointer const & maskBitSet, + bool inverse = false) { + + size_t numberFields = from->getNumberFields(); + size_t offset = from->getFieldOffset(); + int32 next = inverse ? + maskBitSet->nextClearBit(static_cast(offset)) : + maskBitSet->nextSetBit(static_cast(offset)); + + // no more changes or no changes in this structure + if(next<0||next>=static_cast(offset+numberFields)) return; + + // entire structure + if(static_cast(offset)==next) { + getConvert()->copy(from, to); + return; + } + + PVFieldPtrArray const & fromPVFields = from->getPVFields(); + PVFieldPtrArray const & toPVFields = to->getPVFields(); + + size_t fieldsSize = fromPVFields.size(); + for(size_t i = 0; igetFieldOffset(); + int32 inumberFields = static_cast(pvField->getNumberFields()); + next = inverse ? + maskBitSet->nextClearBit(static_cast(offset)) : + maskBitSet->nextSetBit(static_cast(offset)); + + // no more changes + if(next<0) return; + // no change in this pvField + if(next>=static_cast(offset+inumberFields)) continue; + + // serialize field or fields + if(inumberFields==1) { + getConvert()->copy(pvField, toPVFields[i]); + } else { + PVStructure::shared_pointer fromPVStructure = std::tr1::static_pointer_cast(pvField); + PVStructure::shared_pointer toPVStructure = std::tr1::static_pointer_cast(toPVFields[i]); + partialCopy(fromPVStructure, toPVStructure, maskBitSet); + } + } + } + + /* virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { bool notify = false; @@ -2165,7 +2220,7 @@ namespace epics { // if in overrun mode, check if some is free if (m_overrunInProgress) { - MonitorElementPtr newElement = m_monitorQueue->getFree(); + MonitorElementPtr newElement = m_monitorQueue.getFree(); if (newElement.get() != 0) { // take new, put current in use @@ -2174,7 +2229,7 @@ namespace epics { BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure); BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure); - m_monitorQueue->setUsed(m_monitorElement); + m_monitorQueue.setUsed(m_monitorElement); m_monitorElement = newElement; notify = true; @@ -2228,7 +2283,7 @@ namespace epics { } // prepare next free (if any) - MonitorElementPtr newElement = m_monitorQueue->getFree(); + MonitorElementPtr newElement = m_monitorQueue.getFree(); if (newElement.get() == 0) { m_overrunInProgress = true; return; @@ -2244,7 +2299,7 @@ namespace epics { getConvert()->copy(pvStructure, newElement->pvStructurePtr); - m_monitorQueue->setUsed(m_monitorElement); + m_monitorQueue.setUsed(m_monitorElement); m_monitorElement = newElement; } @@ -2252,59 +2307,122 @@ namespace epics { EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); } + */ + + + virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { + + { + // TODO do not lock deserialization + Lock guard(m_mutex); + + if (m_overrunInProgress) + { + PVStructurePtr pvStructure = m_overrunElement->pvStructurePtr; + BitSet::shared_pointer changedBitSet = m_overrunElement->changedBitSet; + BitSet::shared_pointer overrunBitSet = m_overrunElement->overrunBitSet; + + // lazy init + if (m_bitSet1.get() == 0) m_bitSet1.reset(new BitSet(changedBitSet->size())); + if (m_bitSet2.get() == 0) m_bitSet2.reset(new BitSet(overrunBitSet->size())); + + m_bitSet1->deserialize(payloadBuffer, transport.get()); + pvStructure->deserialize(payloadBuffer, transport.get(), m_bitSet1.get()); + m_bitSet2->deserialize(payloadBuffer, transport.get()); + + // OR local overrun + // TODO this does not work perfectly if bitSet is compressed !!! + // uncompressed bitSets should be used !!! + overrunBitSet->or_and(*(changedBitSet.get()), *(m_bitSet1.get())); + + // OR remove change + *(changedBitSet.get()) |= *(m_bitSet1.get()); + + // OR remote overrun + *(overrunBitSet.get()) |= *(m_bitSet2.get()); + + // m_up2datePVStructure is already set + + return; + } + + MonitorElementPtr newElement = m_freeQueue.back(); + m_freeQueue.pop_back(); + + if (m_freeQueue.empty()) + { + m_overrunInProgress = true; + m_overrunElement = newElement; + } + + // setup current fields + PVStructurePtr pvStructure = newElement->pvStructurePtr; + BitSet::shared_pointer changedBitSet = newElement->changedBitSet; + BitSet::shared_pointer overrunBitSet = newElement->overrunBitSet; + + // deserialize changedBitSet and data, and overrun bit set + changedBitSet->deserialize(payloadBuffer, transport.get()); + if (m_up2datePVStructure && m_up2datePVStructure.get() != pvStructure.get()) + partialCopy(m_up2datePVStructure, pvStructure, changedBitSet, true); + pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get()); + overrunBitSet->deserialize(payloadBuffer, transport.get()); + + m_up2datePVStructure = pvStructure; + + m_monitorQueue.push(newElement); + } + + if (!m_overrunInProgress) + { + EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); + } + } + virtual MonitorElement::shared_pointer poll() { Lock guard(m_mutex); - if (m_needToReleaseFirst) - return m_nullMonitorElement; - MonitorElementPtr retVal = m_monitorQueue->getUsed(); - if (retVal.get() != 0) - { - m_needToReleaseFirst = true; - return retVal; - } - - // if in overrun mode and we have free, make it as last element - if (m_overrunInProgress) - { - MonitorElementPtr newElement = m_monitorQueue->getFree(); - if (newElement.get() != 0) - { - // take new, put current in use - PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr; - getConvert()->copy(pvStructure, newElement->pvStructurePtr); - - BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure); - BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure); - m_monitorQueue->setUsed(m_monitorElement); - - m_monitorElement = newElement; - - m_overrunInProgress = false; - - m_needToReleaseFirst = true; - return m_monitorQueue->getUsed(); - } - else - return m_nullMonitorElement; // should never happen since queueSize >= 2, but a client not calling release can do this - } - else + if (m_monitorQueue.empty()) return m_nullMonitorElement; + + MonitorElement::shared_pointer retVal = m_monitorQueue.front(); + m_monitorQueue.pop(); + return retVal; } + // NOTE: a client must always call poll() after release() to check the presence of any new monitor elements virtual void release(MonitorElement::shared_pointer const & monitorElement) { Lock guard(m_mutex); - m_monitorQueue->releaseUsed(monitorElement); - m_needToReleaseFirst = false; + + m_freeQueue.push_back(monitorElement); + + if (m_overrunInProgress) + { + // compress bit-set + PVStructurePtr pvStructure = m_overrunElement->pvStructurePtr; + BitSetUtil::compress(m_overrunElement->changedBitSet, pvStructure); + BitSetUtil::compress(m_overrunElement->overrunBitSet, pvStructure); + + m_monitorQueue.push(m_overrunElement); + + m_overrunElement.reset(); + m_overrunInProgress = false; + } } Status start() { Lock guard(m_mutex); + while (!m_monitorQueue.empty()) + { + m_freeQueue.push_back(m_monitorQueue.front()); + m_monitorQueue.pop(); + } + if (m_overrunElement) + { + m_freeQueue.push_back(m_overrunElement); + m_overrunElement.reset(); + } m_overrunInProgress = false; - m_monitorQueue->clear(); - m_monitorElement = m_monitorQueue->getFree(); - m_needToReleaseFirst = false; return Status::Ok; }