From cc9fae09e50fd26387daba98535423c4ade088dc Mon Sep 17 00:00:00 2001 From: Marty Kraimer Date: Tue, 12 Nov 2013 10:55:21 -0500 Subject: [PATCH] new implementation of monitor queue --- .../remoteClient/clientContextImpl.cpp | 765 ++++++------------ 1 file changed, 254 insertions(+), 511 deletions(-) diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 14b29c7..8b48632 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -45,11 +46,12 @@ namespace epics { namespace pvAccess { String ClientContextImpl::PROVIDER_NAME = "pva"; - - Status ChannelImpl::channelDestroyed = Status(Status::STATUSTYPE_WARNING, "channel destroyed"); - Status ChannelImpl::channelDisconnected = Status(Status::STATUSTYPE_WARNING, "channel disconnected"); - + Status ChannelImpl::channelDestroyed = Status( + Status::STATUSTYPE_WARNING, "channel destroyed"); + Status ChannelImpl::channelDisconnected = Status( + Status::STATUSTYPE_WARNING, "channel disconnected"); String emptyString; + ConvertPtr convert = getConvert(); // TODO consider std::unordered_map //typedef std::tr1::unordered_map IOIDResponseRequestMap; @@ -1802,509 +1804,256 @@ namespace epics { - - - - - class MonitorStrategy : public Monitor { + class ElementQueue : public Monitor { public: - virtual ~MonitorStrategy() {}; + virtual ~ElementQueue() {}; virtual void init(StructureConstPtr const & structure) = 0; virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; }; - class MonitorStrategyNotify : - public MonitorStrategy, - public std::tr1::enable_shared_from_this + + + class SingleElementQueue : + public ElementQueue, + public std::tr1::enable_shared_from_this { private: MonitorRequester::shared_pointer m_callback; bool m_gotMonitor; + MonitorElement::shared_pointer m_monitorElement; Mutex m_mutex; - BitSet::shared_pointer nullBitSet; - PVStructure::shared_pointer nullPVStructure; - MonitorElement::shared_pointer m_nullMonitorElement; - MonitorElement::shared_pointer m_monitorElement; - + + PVStructurePtr m_pvStructure; + BitSetPtr m_responseChangeBitSet; + BitSetPtr m_responseOverrunBitSet; + BitSetPtr m_structureChangeBitSet; + BitSetPtr m_structureOverrunBitSet; + MonitorElementPtr m_nullMonitorElement; + public: - MonitorStrategyNotify(MonitorRequester::shared_pointer const & callback) : + SingleElementQueue(MonitorRequester::shared_pointer const & callback) : m_callback(callback), m_gotMonitor(false), - m_mutex(), m_monitorElement(new MonitorElement()) + m_monitorElement(new MonitorElement()) { } - virtual ~MonitorStrategyNotify() + virtual ~SingleElementQueue() { } - virtual void init(StructureConstPtr const & /*structure*/) { - // noop - } - - virtual void response(Transport::shared_pointer const & /*transport*/, ByteBuffer* /*payloadBuffer*/) { - Lock guard(m_mutex); - m_gotMonitor = true; - // no data, only notify - EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); - } - - virtual MonitorElement::shared_pointer poll() { - Lock guard(m_mutex); - // TODO PVAS when available - bool gotMonitor = m_gotMonitor; - m_gotMonitor = false; - if (gotMonitor) - return m_monitorElement; - else - return m_nullMonitorElement; - } - - virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) { - // noop - } - - Status start() { - return Status::Ok; - } - - Status stop() { - return Status::Ok; - } - - void destroy() { - } - - }; - - class MonitorStrategyEntire : - public MonitorStrategy, - public std::tr1::enable_shared_from_this - { - private: - - MonitorRequester::shared_pointer m_callback; - - bool m_gotMonitor; - Mutex m_mutex; - - MonitorElement::shared_pointer m_nullMonitorElement; - MonitorElement::shared_pointer m_monitorElement; - - public: - - MonitorStrategyEntire(MonitorRequester::shared_pointer const & callback) : - m_callback(callback), m_gotMonitor(false), - m_mutex(), m_monitorElement(new MonitorElement()) - { - } - - virtual ~MonitorStrategyEntire() - { - } - - virtual void init(StructureConstPtr const & structure) { - Lock guard(m_mutex); - - // reuse on reconnect - if (m_monitorElement->pvStructurePtr.get() == 0 || - *(m_monitorElement->pvStructurePtr->getStructure().get()) != *(structure.get())) - { - m_monitorElement->pvStructurePtr = getPVDataCreate()->createPVStructure(structure); - int numberFields = m_monitorElement->pvStructurePtr->getNumberFields(); - m_monitorElement->changedBitSet.reset(new BitSet(numberFields)); - m_monitorElement->overrunBitSet.reset(new BitSet(numberFields)); - } - } - - virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { - Lock guard(m_mutex); - // simply deserialize and notify - m_monitorElement->changedBitSet->deserialize(payloadBuffer, transport.get()); - m_monitorElement->pvStructurePtr->deserialize(payloadBuffer, transport.get(), m_monitorElement->changedBitSet.get()); - m_monitorElement->overrunBitSet->deserialize(payloadBuffer, transport.get()); - m_gotMonitor = true; - EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); - } - - virtual MonitorElement::shared_pointer poll() { - Lock guard(m_mutex); - // TODO PVAS when available - bool gotMonitor = m_gotMonitor; - m_gotMonitor = false; - if (gotMonitor) - return m_monitorElement; - else - return m_nullMonitorElement; - } - - virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) { - // noop - } - - Status start() { - Lock guard(m_mutex); - m_gotMonitor = false; - return Status::Ok; - } - - Status stop() { - return Status::Ok; - } - - void destroy() { - } - - }; - - - class MonitorStrategySingle : - public MonitorStrategy, - public std::tr1::enable_shared_from_this - { - private: - - MonitorRequester::shared_pointer m_callback; - - bool m_gotMonitor; - Mutex m_mutex; - - - BitSet::shared_pointer m_structureChangeBitSet; - BitSet::shared_pointer m_structureOverrunBitSet; - bool m_needToCompress; - - MonitorElement::shared_pointer m_nullMonitorElement; - MonitorElement::shared_pointer m_monitorElement; - - public: - - MonitorStrategySingle(MonitorRequester::shared_pointer const & callback) : - m_callback(callback), m_gotMonitor(false), m_mutex(), - m_needToCompress(false), m_monitorElement(new MonitorElement()) - { - } - - virtual ~MonitorStrategySingle() - { - } - - virtual void init(StructureConstPtr const & structure) { - Lock guard(m_mutex); - - // reuse on reconnect - if (m_monitorElement->pvStructurePtr.get() == 0 || - *(m_monitorElement->pvStructurePtr->getStructure().get()) == *(structure.get())) - { - m_monitorElement->pvStructurePtr = getPVDataCreate()->createPVStructure(structure); - int numberFields = m_monitorElement->pvStructurePtr->getNumberFields(); - m_monitorElement->changedBitSet.reset(new BitSet(numberFields)); - m_monitorElement->overrunBitSet.reset(new BitSet(numberFields)); - - m_structureChangeBitSet.reset(new BitSet(numberFields)); - m_structureOverrunBitSet.reset(new BitSet(numberFields)); - } - } - - virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { - Lock guard(m_mutex); - if (!m_gotMonitor) - { - // simply deserialize and notify - m_monitorElement->changedBitSet->deserialize(payloadBuffer, transport.get()); - m_monitorElement->pvStructurePtr->deserialize(payloadBuffer, transport.get(), m_monitorElement->changedBitSet.get()); - m_monitorElement->overrunBitSet->deserialize(payloadBuffer, transport.get()); - m_gotMonitor = true; - EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); - } - else - { - // deserialize first - m_structureChangeBitSet->deserialize(payloadBuffer, transport.get()); - m_monitorElement->pvStructurePtr->deserialize(payloadBuffer, transport.get(), m_structureChangeBitSet.get()); - m_structureOverrunBitSet->deserialize(payloadBuffer, transport.get()); - - // OR local overrun - // TODO should work only on uncompressed - m_monitorElement->overrunBitSet->or_and(*m_structureChangeBitSet.get(), *m_monitorElement->changedBitSet.get()); - - // OR new changes - *(m_monitorElement->changedBitSet) |= *m_structureChangeBitSet.get(); - - // OR remote overrun - *(m_monitorElement->overrunBitSet) |= *m_structureOverrunBitSet.get(); - } - } - - virtual MonitorElement::shared_pointer poll() { - Lock guard(m_mutex); - if (!m_gotMonitor) return m_nullMonitorElement; - m_gotMonitor = false; - - // compress if needed - if (m_needToCompress) - { - BitSetUtil::compress(m_monitorElement->changedBitSet, m_monitorElement->pvStructurePtr); - BitSetUtil::compress(m_monitorElement->overrunBitSet, m_monitorElement->pvStructurePtr); - m_needToCompress = false; - } - - return m_monitorElement; - } - - virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) { - // noop - } - - Status start() { - Lock guard(m_mutex); - // TODO no such check in Java - if (!m_monitorElement->changedBitSet.get()) - return Status(Status::STATUSTYPE_ERROR, "Monitor not connected."); - m_gotMonitor = false; - m_monitorElement->changedBitSet->clear(); - m_monitorElement->overrunBitSet->clear(); - return Status::Ok; - } - - Status stop() { - return Status::Ok; - } - - void destroy() { - } - - }; - - - - - class MonitorStrategyQueue : - public MonitorStrategy, - public std::tr1::enable_shared_from_this - { - private: - - int32 m_queueSize; - - StructureConstPtr m_lastStructure; - //MonitorQueue::shared_pointer m_monitorQueue; - - - MonitorRequester::shared_pointer m_callback; - - Mutex m_mutex; - - BitSet::shared_pointer m_bitSet1; - BitSet::shared_pointer m_bitSet2; - bool m_overrunInProgress; - - bool m_needToReleaseFirst; - - MonitorElement::shared_pointer m_nullMonitorElement; - MonitorElement::shared_pointer m_monitorElement; - - public: - - MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) : - m_queueSize(queueSize), m_lastStructure(),// m_monitorQueue(), - m_callback(callback), m_mutex(), - m_bitSet1(), m_bitSet2(), m_overrunInProgress(false), - m_needToReleaseFirst(false), - m_nullMonitorElement(), m_monitorElement() - { - if (queueSize <= 1) - throw std::invalid_argument("queueSize <= 1"); - } - - virtual ~MonitorStrategyQueue() - { - } - - virtual void init(StructureConstPtr const & structure) { - Lock guard(m_mutex); - - // reuse on reconnect - if (m_lastStructure.get() == 0 || - *(m_lastStructure.get()) == *(structure.get())) - { - /* - MonitorElement[] monitorElements = new MonitorElement[queueSize]; - for(int i=0; ipvStructurePtr; - getConvert()->copy(pvStructure, newElement->pvStructurePtr); - - BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure); - BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure); - - //monitorQueue.setUsed(monitorElement); - - m_monitorElement = newElement; - notify = true; - - m_overrunInProgress = false; - } - } - } - - if (notify) - { - EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); - } - - { - Lock guard(m_mutex); - - // setup current fields - PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr; - BitSet::shared_pointer changedBitSet = m_monitorElement->changedBitSet; - BitSet::shared_pointer overrunBitSet = m_monitorElement->overrunBitSet; - - // special treatment if in overrun state - if (m_overrunInProgress) - { - // lazy init - if (m_bitSet1.get() == 0) m_bitSet1.reset(new BitSet(changedBitSet->size())); - if (m_bitSet2.get() == 0) m_bitSet2.reset(new BitSet(overrunBitSet->size())); - - m_bitSet1->deserialize(payloadBuffer, transport.get()); - pvStructure->deserialize(payloadBuffer, transport.get(), m_bitSet1.get()); - m_bitSet2->deserialize(payloadBuffer, transport.get()); - - // OR local overrun - // TODO this does not work perfectly... uncompressed bitSets should be used!!! - overrunBitSet->or_and(*(changedBitSet.get()), *(m_bitSet1.get())); - - // OR remove change - *(changedBitSet.get()) |= *(m_bitSet1.get()); - - // OR remote overrun - *(overrunBitSet.get()) |= *(m_bitSet2.get()); - } - else - { - // deserialize changedBitSet and data, and overrun bit set - changedBitSet->deserialize(payloadBuffer, transport.get()); - pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get()); - overrunBitSet->deserialize(payloadBuffer, transport.get()); - } - - // prepare next free (if any) - MonitorElementPtr newElement; // = monitorQueue.getFree(); - if (newElement.get() == 0) { - m_overrunInProgress = true; - return; - } - - // if there was overrun in progress we manipulated bitSets... compress them - if (m_overrunInProgress) { - BitSetUtil::compress(changedBitSet, pvStructure); - BitSetUtil::compress(overrunBitSet, pvStructure); - - m_overrunInProgress = false; - } - - getConvert()->copy(pvStructure, newElement->pvStructurePtr); - - //monitorQueue.setUsed(monitorElement); - - m_monitorElement = newElement; - } - - EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); - - } - - virtual MonitorElement::shared_pointer poll() { - Lock guard(m_mutex); - - if (m_needToReleaseFirst) - return m_nullMonitorElement; - MonitorElementPtr retVal;// = monitorQueue.getUsed(); - if (retVal.get() != 0) - { - m_needToReleaseFirst = true; - return retVal; - } - - // if in overrun mode and we have free, make it as last element - if (m_overrunInProgress) - { - MonitorElementPtr newElement;// = monitorQueue.getFree(); - if (newElement.get() != 0) - { - // take new, put current in use - PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr; - getConvert()->copy(pvStructure, newElement->pvStructurePtr); - - BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure); - BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure); - //monitorQueue.setUsed(monitorElement); - - m_monitorElement = newElement; - - m_overrunInProgress = false; - - m_needToReleaseFirst = true; - return m_nullMonitorElement; // TODO monitorQueue.getUsed(); - } - else - return m_nullMonitorElement; // should never happen since queueSize >= 2, but a client not calling release can do this - } - else - return m_nullMonitorElement; + virtual void init(StructureConstPtr const & structure) { + Lock guard(m_mutex); + m_pvStructure = getPVDataCreate()->createPVStructure(structure); + int numberFields = m_pvStructure->getNumberFields(); + m_monitorElement->pvStructurePtr = getPVDataCreate()->createPVStructure(structure); + m_monitorElement->changedBitSet.reset(new BitSet(numberFields)); + m_monitorElement->overrunBitSet.reset(new BitSet(numberFields)); + m_responseChangeBitSet.reset(new BitSet(numberFields)); + m_responseOverrunBitSet.reset(new BitSet(numberFields)); + m_structureChangeBitSet.reset(new BitSet(numberFields)); + m_structureOverrunBitSet.reset(new BitSet(numberFields)); } + + virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { + bool monitorEvent(false); + { + Lock guard(m_mutex); + // deserialize first + m_responseChangeBitSet->deserialize(payloadBuffer, transport.get()); + m_pvStructure->deserialize( + payloadBuffer, + transport.get(), + m_responseChangeBitSet.get()); + m_responseOverrunBitSet->deserialize(payloadBuffer, transport.get()); + // OR local overrun + // OR new changes + m_structureOverrunBitSet->or_and( + *m_responseChangeBitSet.get(),*m_structureChangeBitSet.get()); + // OR new changes + *(m_structureChangeBitSet) |= (*m_responseChangeBitSet); + // OR remote overrun + *(m_structureOverrunBitSet) |= (*m_responseOverrunBitSet); + if (!m_gotMonitor) + { + m_gotMonitor = true; + monitorEvent = true; + } + } + if(monitorEvent) EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); + } + + virtual MonitorElement::shared_pointer poll() { + Lock guard(m_mutex); + if (!m_gotMonitor) return m_nullMonitorElement; + m_gotMonitor = false; + convert->copyStructure(m_pvStructure,m_monitorElement->pvStructurePtr); + BitSetUtil::compress(m_structureChangeBitSet,m_pvStructure); + BitSetUtil::compress(m_structureOverrunBitSet,m_pvStructure); + (*m_monitorElement->changedBitSet) = (*m_structureChangeBitSet); + (*m_monitorElement->overrunBitSet) = (*m_structureOverrunBitSet); + m_structureChangeBitSet->clear(); + m_structureOverrunBitSet->clear(); + return m_monitorElement; + } + + virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) { + // noop + } + + Status start() { + Lock guard(m_mutex); + // TODO no such check in Java + if (!m_monitorElement->changedBitSet.get()) + return Status(Status::STATUSTYPE_ERROR, "Monitor not connected."); + m_gotMonitor = false; + m_monitorElement->changedBitSet->clear(); + m_monitorElement->overrunBitSet->clear(); + return Status::Ok; + } + + Status stop() { + return Status::Ok; + } + + void destroy() { + } + + }; - virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) { - Lock guard(m_mutex); - //monitorQueue.releaseUsed(monitorElement); - m_needToReleaseFirst = false; - } + typedef Queue MonitorElementQueue; + typedef std::tr1::shared_ptr MonitorElementQueuePtr; - Status start() { - Lock guard(m_mutex); - m_overrunInProgress = false; - //monitorQueue.clear(); - //m_monitorElement = monitorQueue.getFree(); - m_needToReleaseFirst = false; - return Status::Ok; - } + class MultipleElementQueue : + public ElementQueue, + public std::tr1::enable_shared_from_this + { + private: + + MonitorRequester::shared_pointer m_callback; + int queueSize; + bool queueIsFull; + MonitorElementQueuePtr queue; + BitSetPtr changedBitSet; + BitSetPtr overrunBitSet; + MonitorElementPtr latestMonitorElement; + Mutex m_mutex; - Status stop() { - return Status::Ok; - } - - void destroy() { - } - - }; + public: + + MultipleElementQueue(MonitorRequester::shared_pointer const & callback,int queueSize) + : + m_callback(callback), + queueSize(queueSize), + queueIsFull(false) + { + } + + virtual ~MultipleElementQueue() + { + } + + virtual void init(StructureConstPtr const & structure) { + Lock guard(m_mutex); + size_t nfields = 0; + std::vector monitorElementArray; + monitorElementArray.reserve(queueSize); + for(int i=0; icreatePVStructure(structure); + if(nfields==0) nfields = pvStructure->getNumberFields(); + MonitorElementPtr monitorElement( + new MonitorElement(pvStructure)); + monitorElementArray.push_back(monitorElement); + } + queue.reset(new MonitorElementQueue(monitorElementArray)); + changedBitSet.reset(new BitSet(nfields)); + overrunBitSet.reset(new BitSet(nfields)); + } + + virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { + { + Lock guard(m_mutex); + if(queueIsFull) { + MonitorElementPtr monitorElement = latestMonitorElement; + PVStructurePtr pvStructure = monitorElement->pvStructurePtr; + changedBitSet->deserialize(payloadBuffer, transport.get()); + pvStructure->deserialize( + payloadBuffer, + transport.get(), + changedBitSet.get()); + overrunBitSet->deserialize(payloadBuffer, transport.get()); + (*monitorElement->changedBitSet)|= (*changedBitSet); + (*monitorElement->overrunBitSet)|= (*changedBitSet); + changedBitSet->clear(); + overrunBitSet->clear(); + return; + } + MonitorElementPtr monitorElement = queue->getFree(); + if(monitorElement==NULL) { + throw std::logic_error(String("RealQueue::dataChanged() logic error")); + } + if(queue->getNumberFree()==0){ + queueIsFull = true; + latestMonitorElement = monitorElement; + } + PVStructurePtr pvStructure = monitorElement->pvStructurePtr; + changedBitSet->deserialize(payloadBuffer, transport.get()); + pvStructure->deserialize( + payloadBuffer, + transport.get(), + changedBitSet.get()); + overrunBitSet->deserialize(payloadBuffer, transport.get()); + BitSetUtil::compress(changedBitSet,pvStructure); + BitSetUtil::compress(overrunBitSet,pvStructure); + monitorElement->changedBitSet->clear(); + (*monitorElement->changedBitSet)|=(*changedBitSet); + monitorElement->overrunBitSet->clear(); + (*monitorElement->overrunBitSet)|=(*overrunBitSet); + changedBitSet->clear(); + overrunBitSet->clear(); + queue->setUsed(monitorElement); + } + EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this())); + } + + virtual MonitorElement::shared_pointer poll() { + Lock guard(m_mutex); + return queue->getUsed(); + } + + virtual void release(MonitorElement::shared_pointer const & currentElement) { + Lock guard(m_mutex); + if(queueIsFull) { + MonitorElementPtr monitorElement = latestMonitorElement; + PVStructurePtr pvStructure = monitorElement->pvStructurePtr; + BitSetUtil::compress(monitorElement->changedBitSet,pvStructure); + BitSetUtil::compress(monitorElement->overrunBitSet,pvStructure); + queueIsFull = false; + latestMonitorElement.reset(); + } + queue->releaseUsed(currentElement); + } + + Status start() { + Lock guard(m_mutex); + queue->clear(); + changedBitSet->clear(); + overrunBitSet->clear(); + queueIsFull = false; + return Status::Ok; + } + + Status stop() { + return Status::Ok; + } + + void destroy() { + } + + }; @@ -2321,7 +2070,7 @@ namespace epics { PVStructure::shared_pointer m_pvRequest; - std::tr1::shared_ptr m_monitorStrategy; + std::tr1::shared_ptr m_monitorStrategy; ChannelMonitorImpl(ChannelImpl::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, PVStructure::shared_pointer const & pvRequest) : BaseRequestImpl(channel, monitorRequester), @@ -2341,37 +2090,31 @@ namespace epics { return; } - int queueSize = 2; - PVFieldPtr pvField = m_pvRequest->getSubField("record.queueSize"); - if (pvField.get()) { - PVStringPtr pvString = dynamic_pointer_cast(pvField); - if (pvString.get()) - { - String value = pvString->get(); - - istringstream buffer(value); - - if ((buffer >> queueSize).fail()) - { - Status failedToConvert(Status::STATUSTYPE_ERROR, "queueSize type is not a valid integer"); - Monitor::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(failedToConvert, thisPointer, StructureConstPtr())); - return; - } - } - } - + int queueSize = 2; + PVFieldPtr pvField = m_pvRequest->getSubField("record._options"); + if (pvField.get()) { + PVStructurePtr pvOptions = static_pointer_cast(pvField); + pvField = pvOptions->getSubField("queueSize"); + if (pvField.get()) { + PVStringPtr pvString = pvOptions->getStringField("queueSize"); + if(pvString.get()!=NULL) { + int32 size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + queueSize = size; + } + } + } + BaseRequestImpl::activate(); - if (queueSize == -1) - m_monitorStrategy.reset(new MonitorStrategyNotify(m_monitorRequester)); - else if (queueSize == 0) // 0 means all (old v3 style), some sending optimization can be done (not to send bit-sets) - m_monitorStrategy.reset(new MonitorStrategyEntire(m_monitorRequester)); - else //if (queueSize == 1) - m_monitorStrategy.reset(new MonitorStrategySingle(m_monitorRequester)); - /* else - m_monitorStrategy.reset(new MonitorStrategyQueue(queueSize)); - */ + if(queueSize<1) queueSize = 1; + if (queueSize == 1) { + m_monitorStrategy.reset(new SingleElementQueue(m_monitorRequester)); + } else { + m_monitorStrategy.reset(new MultipleElementQueue(m_monitorRequester,queueSize)); + } // subscribe