diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 255360f..f37ca92 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2059,145 +2059,240 @@ namespace epics { - class ElementQueue : public Monitor { + class MonitorStrategy : public Monitor { public: - virtual ~ElementQueue() {}; + virtual ~MonitorStrategy() {}; virtual void init(StructureConstPtr const & structure) = 0; virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; }; typedef Queue MonitorElementQueue; - typedef std::tr1::shared_ptr MonitorElementQueuePtr; - class MultipleElementQueue : - public ElementQueue, - public std::tr1::enable_shared_from_this - { - private: - + class MonitorStrategyQueue : + public MonitorStrategy, + public std::tr1::enable_shared_from_this + { + private: + + int32 m_queueSize; + + StructureConstPtr m_lastStructure; + std::tr1::shared_ptr m_monitorQueue; + + MonitorRequester::shared_pointer m_callback; - int queueSize; - bool queueIsFull; - MonitorElementQueuePtr queue; - BitSetPtr changedBitSet; - BitSetPtr overrunBitSet; - MonitorElementPtr latestMonitorElement; - Mutex m_mutex; - public: - - MultipleElementQueue(MonitorRequester::shared_pointer const & callback,int queueSize) - : - m_callback(callback), - queueSize(queueSize), - queueIsFull(false) - { - } - - virtual ~MultipleElementQueue() - { - } - + Mutex m_mutex; + + BitSet::shared_pointer m_bitSet1; + BitSet::shared_pointer m_bitSet2; + bool m_overrunInProgress; + + bool m_needToReleaseFirst; + + MonitorElement::shared_pointer m_nullMonitorElement; + MonitorElement::shared_pointer m_monitorElement; + + public: + + MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) : + m_queueSize(queueSize), m_lastStructure(), m_monitorQueue(), + m_callback(callback), m_mutex(), + m_bitSet1(), m_bitSet2(), m_overrunInProgress(false), + m_needToReleaseFirst(false), + m_nullMonitorElement(), m_monitorElement() + { + if (queueSize <= 1) + throw std::invalid_argument("queueSize <= 1"); + } + + virtual ~MonitorStrategyQueue() + { + } + virtual void init(StructureConstPtr const & structure) { Lock guard(m_mutex); - size_t nfields = 0; - std::vector monitorElementArray; - monitorElementArray.reserve(queueSize); - for(int i=0; icreatePVStructure(structure); - if(nfields==0) nfields = pvStructure->getNumberFields(); - MonitorElementPtr monitorElement( - new MonitorElement(pvStructure)); - monitorElementArray.push_back(monitorElement); - } - queue.reset(new MonitorElementQueue(monitorElementArray)); - changedBitSet.reset(new BitSet(nfields)); - overrunBitSet.reset(new BitSet(nfields)); - } - - virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { - { - Lock guard(m_mutex); - if(queueIsFull) { - MonitorElementPtr monitorElement = latestMonitorElement; - PVStructurePtr pvStructure = monitorElement->pvStructurePtr; - changedBitSet->deserialize(payloadBuffer, transport.get()); - pvStructure->deserialize( - payloadBuffer, - transport.get(), - changedBitSet.get()); - overrunBitSet->deserialize(payloadBuffer, transport.get()); - (*monitorElement->changedBitSet)|= (*changedBitSet); - (*monitorElement->overrunBitSet)|= (*changedBitSet); - changedBitSet->clear(); - overrunBitSet->clear(); - return; + + // reuse on reconnect + if (m_lastStructure.get() == 0 || + *(m_lastStructure.get()) == *(structure.get())) + { + std::vector monitorElementArray; + monitorElementArray.reserve(m_queueSize); + + for (int32 i = 0; i < m_queueSize; i++) + { + PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure); + MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure)); + monitorElementArray.push_back(monitorElement); } - MonitorElementPtr monitorElement = queue->getFree(); - if(monitorElement==NULL) { - throw std::logic_error(std::string("RealQueue::dataChanged() logic error")); - } - if(queue->getNumberFree()==0){ - queueIsFull = true; - latestMonitorElement = monitorElement; - } - PVStructurePtr pvStructure = monitorElement->pvStructurePtr; - changedBitSet->deserialize(payloadBuffer, transport.get()); - pvStructure->deserialize( - payloadBuffer, - transport.get(), - changedBitSet.get()); - overrunBitSet->deserialize(payloadBuffer, transport.get()); - BitSetUtil::compress(changedBitSet,pvStructure); - BitSetUtil::compress(overrunBitSet,pvStructure); - monitorElement->changedBitSet->clear(); - (*monitorElement->changedBitSet)|=(*changedBitSet); - monitorElement->overrunBitSet->clear(); - (*monitorElement->overrunBitSet)|=(*overrunBitSet); - changedBitSet->clear(); - overrunBitSet->clear(); - queue->setUsed(monitorElement); - } - EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); + m_monitorQueue.reset(new MonitorElementQueue(monitorElementArray)); + m_lastStructure = structure; + } } - + + virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { + + bool notify = false; + + { + Lock guard(m_mutex); + + // if in overrun mode, check if some is free + if (m_overrunInProgress) + { + MonitorElementPtr newElement = m_monitorQueue->getFree(); + if (newElement.get() != 0) + { + // take new, put current in use + PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr; + getConvert()->copy(pvStructure, newElement->pvStructurePtr); + + BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure); + BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure); + m_monitorQueue->setUsed(m_monitorElement); + + m_monitorElement = newElement; + notify = true; + + m_overrunInProgress = false; + } + } + } + + if (notify) + { + EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); + } + + { + Lock guard(m_mutex); + + // setup current fields + PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr; + BitSet::shared_pointer changedBitSet = m_monitorElement->changedBitSet; + BitSet::shared_pointer overrunBitSet = m_monitorElement->overrunBitSet; + + // special treatment if in overrun state + if (m_overrunInProgress) + { + // lazy init + if (m_bitSet1.get() == 0) m_bitSet1.reset(new BitSet(changedBitSet->size())); + if (m_bitSet2.get() == 0) m_bitSet2.reset(new BitSet(overrunBitSet->size())); + + m_bitSet1->deserialize(payloadBuffer, transport.get()); + pvStructure->deserialize(payloadBuffer, transport.get(), m_bitSet1.get()); + m_bitSet2->deserialize(payloadBuffer, transport.get()); + + // OR local overrun + // TODO this does not work perfectly if bitSet is compressed !!! + // uncompressed bitSets should be used !!! + overrunBitSet->or_and(*(changedBitSet.get()), *(m_bitSet1.get())); + + // OR remove change + *(changedBitSet.get()) |= *(m_bitSet1.get()); + + // OR remote overrun + *(overrunBitSet.get()) |= *(m_bitSet2.get()); + } + else + { + // deserialize changedBitSet and data, and overrun bit set + changedBitSet->deserialize(payloadBuffer, transport.get()); + pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get()); + overrunBitSet->deserialize(payloadBuffer, transport.get()); + } + + // prepare next free (if any) + MonitorElementPtr newElement = m_monitorQueue->getFree(); + if (newElement.get() == 0) { + m_overrunInProgress = true; + return; + } + + // if there was overrun in progress we manipulated bitSets... compress them + if (m_overrunInProgress) { + BitSetUtil::compress(changedBitSet, pvStructure); + BitSetUtil::compress(overrunBitSet, pvStructure); + + m_overrunInProgress = false; + } + + getConvert()->copy(pvStructure, newElement->pvStructurePtr); + + m_monitorQueue->setUsed(m_monitorElement); + + m_monitorElement = newElement; + } + + EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); + + } + virtual MonitorElement::shared_pointer poll() { Lock guard(m_mutex); - return queue->getUsed(); - } - - virtual void release(MonitorElement::shared_pointer const & currentElement) { - Lock guard(m_mutex); - if(queueIsFull) { - MonitorElementPtr monitorElement = latestMonitorElement; - PVStructurePtr pvStructure = monitorElement->pvStructurePtr; - BitSetUtil::compress(monitorElement->changedBitSet,pvStructure); - BitSetUtil::compress(monitorElement->overrunBitSet,pvStructure); - queueIsFull = false; - latestMonitorElement.reset(); - } - queue->releaseUsed(currentElement); - } - - Status start() { - Lock guard(m_mutex); - queue->clear(); - changedBitSet->clear(); - overrunBitSet->clear(); - queueIsFull = false; - return Status::Ok; - } - - Status stop() { - return Status::Ok; - } - - void destroy() { - } - - }; + if (m_needToReleaseFirst) + return m_nullMonitorElement; + MonitorElementPtr retVal = m_monitorQueue->getUsed(); + if (retVal.get() != 0) + { + m_needToReleaseFirst = true; + return retVal; + } + + // if in overrun mode and we have free, make it as last element + if (m_overrunInProgress) + { + MonitorElementPtr newElement = m_monitorQueue->getFree(); + if (newElement.get() != 0) + { + // take new, put current in use + PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr; + getConvert()->copy(pvStructure, newElement->pvStructurePtr); + + BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure); + BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure); + m_monitorQueue->setUsed(m_monitorElement); + + m_monitorElement = newElement; + + m_overrunInProgress = false; + + m_needToReleaseFirst = true; + return m_monitorQueue->getUsed(); + } + else + return m_nullMonitorElement; // should never happen since queueSize >= 2, but a client not calling release can do this + } + else + return m_nullMonitorElement; + } + + virtual void release(MonitorElement::shared_pointer const & monitorElement) { + Lock guard(m_mutex); + m_monitorQueue->releaseUsed(monitorElement); + m_needToReleaseFirst = false; + } + + Status start() { + Lock guard(m_mutex); + m_overrunInProgress = false; + m_monitorQueue->clear(); + m_monitorElement = m_monitorQueue->getFree(); + m_needToReleaseFirst = false; + return Status::Ok; + } + + Status stop() { + return Status::Ok; + } + + void destroy() { + } + + }; @@ -2213,7 +2308,7 @@ namespace epics { PVStructure::shared_pointer m_pvRequest; - std::tr1::shared_ptr m_ElementQueue; + std::tr1::shared_ptr m_monitorStrategy; ChannelMonitorImpl( ChannelImpl::shared_pointer const & channel, @@ -2256,8 +2351,8 @@ namespace epics { BaseRequestImpl::activate(); - if(queueSize<2) queueSize = 2; - m_ElementQueue.reset(new MultipleElementQueue(m_monitorRequester,queueSize)); + if (queueSize<2) queueSize = 2; + m_monitorStrategy.reset(new MonitorStrategyQueue(m_monitorRequester, queueSize)); // subscribe try { @@ -2329,7 +2424,7 @@ namespace epics { dynamic_pointer_cast( transport->cachedDeserialize(payloadBuffer) ); - m_ElementQueue->init(structure); + m_monitorStrategy->init(structure); // notify Monitor::shared_pointer thisChannelMonitor = dynamic_pointer_cast(shared_from_this()); @@ -2352,7 +2447,7 @@ namespace epics { } else { - m_ElementQueue->response(transport, payloadBuffer); + m_monitorStrategy->response(transport, payloadBuffer); } } @@ -2403,7 +2498,7 @@ namespace epics { if (!m_initialized) return BaseRequestImpl::notInitializedStatus; - m_ElementQueue->start(); + m_monitorStrategy->start(); // start == process + get if (!startRequest(QOS_PROCESS | QOS_GET)) @@ -2429,7 +2524,7 @@ namespace epics { if (!m_initialized) return BaseRequestImpl::notInitializedStatus; - m_ElementQueue->stop(); + m_monitorStrategy->stop(); // stop == process + no get if (!startRequest(QOS_PROCESS)) @@ -2454,12 +2549,12 @@ namespace epics { virtual MonitorElement::shared_pointer poll() { - return m_ElementQueue->poll(); + return m_monitorStrategy->poll(); } virtual void release(MonitorElement::shared_pointer const & monitorElement) { - m_ElementQueue->release(monitorElement); + m_monitorStrategy->release(monitorElement); } virtual void lock()