new implementation of monitor queue

This commit is contained in:
Marty Kraimer
2013-11-12 10:55:21 -05:00
parent 530a96746b
commit cc9fae09e5

View File

@@ -30,6 +30,7 @@
#include <pv/bitSetUtil.h>
#include <pv/serializationHelper.h>
#include <pv/convert.h>
#include <pv/queue.h>
#include <pv/pvAccessMB.h>
@@ -45,11 +46,12 @@ namespace epics {
namespace pvAccess {
String ClientContextImpl::PROVIDER_NAME = "pva";
Status ChannelImpl::channelDestroyed = Status(Status::STATUSTYPE_WARNING, "channel destroyed");
Status ChannelImpl::channelDisconnected = Status(Status::STATUSTYPE_WARNING, "channel disconnected");
Status ChannelImpl::channelDestroyed = Status(
Status::STATUSTYPE_WARNING, "channel destroyed");
Status ChannelImpl::channelDisconnected = Status(
Status::STATUSTYPE_WARNING, "channel disconnected");
String emptyString;
ConvertPtr convert = getConvert();
// TODO consider std::unordered_map
//typedef std::tr1::unordered_map<pvAccessID, ResponseRequest::weak_pointer> IOIDResponseRequestMap;
@@ -1802,509 +1804,256 @@ namespace epics {
class MonitorStrategy : public Monitor {
class ElementQueue : public Monitor {
public:
virtual ~MonitorStrategy() {};
virtual ~ElementQueue() {};
virtual void init(StructureConstPtr const & structure) = 0;
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
};
class MonitorStrategyNotify :
public MonitorStrategy,
public std::tr1::enable_shared_from_this<MonitorStrategyNotify>
class SingleElementQueue :
public ElementQueue,
public std::tr1::enable_shared_from_this<SingleElementQueue>
{
private:
MonitorRequester::shared_pointer m_callback;
bool m_gotMonitor;
MonitorElement::shared_pointer m_monitorElement;
Mutex m_mutex;
BitSet::shared_pointer nullBitSet;
PVStructure::shared_pointer nullPVStructure;
MonitorElement::shared_pointer m_nullMonitorElement;
MonitorElement::shared_pointer m_monitorElement;
PVStructurePtr m_pvStructure;
BitSetPtr m_responseChangeBitSet;
BitSetPtr m_responseOverrunBitSet;
BitSetPtr m_structureChangeBitSet;
BitSetPtr m_structureOverrunBitSet;
MonitorElementPtr m_nullMonitorElement;
public:
MonitorStrategyNotify(MonitorRequester::shared_pointer const & callback) :
SingleElementQueue(MonitorRequester::shared_pointer const & callback) :
m_callback(callback), m_gotMonitor(false),
m_mutex(), m_monitorElement(new MonitorElement())
m_monitorElement(new MonitorElement())
{
}
virtual ~MonitorStrategyNotify()
virtual ~SingleElementQueue()
{
}
virtual void init(StructureConstPtr const & /*structure*/) {
// noop
}
virtual void response(Transport::shared_pointer const & /*transport*/, ByteBuffer* /*payloadBuffer*/) {
Lock guard(m_mutex);
m_gotMonitor = true;
// no data, only notify
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
// TODO PVAS when available
bool gotMonitor = m_gotMonitor;
m_gotMonitor = false;
if (gotMonitor)
return m_monitorElement;
else
return m_nullMonitorElement;
}
virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) {
// noop
}
Status start() {
return Status::Ok;
}
Status stop() {
return Status::Ok;
}
void destroy() {
}
};
class MonitorStrategyEntire :
public MonitorStrategy,
public std::tr1::enable_shared_from_this<MonitorStrategyEntire>
{
private:
MonitorRequester::shared_pointer m_callback;
bool m_gotMonitor;
Mutex m_mutex;
MonitorElement::shared_pointer m_nullMonitorElement;
MonitorElement::shared_pointer m_monitorElement;
public:
MonitorStrategyEntire(MonitorRequester::shared_pointer const & callback) :
m_callback(callback), m_gotMonitor(false),
m_mutex(), m_monitorElement(new MonitorElement())
{
}
virtual ~MonitorStrategyEntire()
{
}
virtual void init(StructureConstPtr const & structure) {
Lock guard(m_mutex);
// reuse on reconnect
if (m_monitorElement->pvStructurePtr.get() == 0 ||
*(m_monitorElement->pvStructurePtr->getStructure().get()) != *(structure.get()))
{
m_monitorElement->pvStructurePtr = getPVDataCreate()->createPVStructure(structure);
int numberFields = m_monitorElement->pvStructurePtr->getNumberFields();
m_monitorElement->changedBitSet.reset(new BitSet(numberFields));
m_monitorElement->overrunBitSet.reset(new BitSet(numberFields));
}
}
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
Lock guard(m_mutex);
// simply deserialize and notify
m_monitorElement->changedBitSet->deserialize(payloadBuffer, transport.get());
m_monitorElement->pvStructurePtr->deserialize(payloadBuffer, transport.get(), m_monitorElement->changedBitSet.get());
m_monitorElement->overrunBitSet->deserialize(payloadBuffer, transport.get());
m_gotMonitor = true;
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
// TODO PVAS when available
bool gotMonitor = m_gotMonitor;
m_gotMonitor = false;
if (gotMonitor)
return m_monitorElement;
else
return m_nullMonitorElement;
}
virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) {
// noop
}
Status start() {
Lock guard(m_mutex);
m_gotMonitor = false;
return Status::Ok;
}
Status stop() {
return Status::Ok;
}
void destroy() {
}
};
class MonitorStrategySingle :
public MonitorStrategy,
public std::tr1::enable_shared_from_this<MonitorStrategySingle>
{
private:
MonitorRequester::shared_pointer m_callback;
bool m_gotMonitor;
Mutex m_mutex;
BitSet::shared_pointer m_structureChangeBitSet;
BitSet::shared_pointer m_structureOverrunBitSet;
bool m_needToCompress;
MonitorElement::shared_pointer m_nullMonitorElement;
MonitorElement::shared_pointer m_monitorElement;
public:
MonitorStrategySingle(MonitorRequester::shared_pointer const & callback) :
m_callback(callback), m_gotMonitor(false), m_mutex(),
m_needToCompress(false), m_monitorElement(new MonitorElement())
{
}
virtual ~MonitorStrategySingle()
{
}
virtual void init(StructureConstPtr const & structure) {
Lock guard(m_mutex);
// reuse on reconnect
if (m_monitorElement->pvStructurePtr.get() == 0 ||
*(m_monitorElement->pvStructurePtr->getStructure().get()) == *(structure.get()))
{
m_monitorElement->pvStructurePtr = getPVDataCreate()->createPVStructure(structure);
int numberFields = m_monitorElement->pvStructurePtr->getNumberFields();
m_monitorElement->changedBitSet.reset(new BitSet(numberFields));
m_monitorElement->overrunBitSet.reset(new BitSet(numberFields));
m_structureChangeBitSet.reset(new BitSet(numberFields));
m_structureOverrunBitSet.reset(new BitSet(numberFields));
}
}
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
Lock guard(m_mutex);
if (!m_gotMonitor)
{
// simply deserialize and notify
m_monitorElement->changedBitSet->deserialize(payloadBuffer, transport.get());
m_monitorElement->pvStructurePtr->deserialize(payloadBuffer, transport.get(), m_monitorElement->changedBitSet.get());
m_monitorElement->overrunBitSet->deserialize(payloadBuffer, transport.get());
m_gotMonitor = true;
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
else
{
// deserialize first
m_structureChangeBitSet->deserialize(payloadBuffer, transport.get());
m_monitorElement->pvStructurePtr->deserialize(payloadBuffer, transport.get(), m_structureChangeBitSet.get());
m_structureOverrunBitSet->deserialize(payloadBuffer, transport.get());
// OR local overrun
// TODO should work only on uncompressed
m_monitorElement->overrunBitSet->or_and(*m_structureChangeBitSet.get(), *m_monitorElement->changedBitSet.get());
// OR new changes
*(m_monitorElement->changedBitSet) |= *m_structureChangeBitSet.get();
// OR remote overrun
*(m_monitorElement->overrunBitSet) |= *m_structureOverrunBitSet.get();
}
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
if (!m_gotMonitor) return m_nullMonitorElement;
m_gotMonitor = false;
// compress if needed
if (m_needToCompress)
{
BitSetUtil::compress(m_monitorElement->changedBitSet, m_monitorElement->pvStructurePtr);
BitSetUtil::compress(m_monitorElement->overrunBitSet, m_monitorElement->pvStructurePtr);
m_needToCompress = false;
}
return m_monitorElement;
}
virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) {
// noop
}
Status start() {
Lock guard(m_mutex);
// TODO no such check in Java
if (!m_monitorElement->changedBitSet.get())
return Status(Status::STATUSTYPE_ERROR, "Monitor not connected.");
m_gotMonitor = false;
m_monitorElement->changedBitSet->clear();
m_monitorElement->overrunBitSet->clear();
return Status::Ok;
}
Status stop() {
return Status::Ok;
}
void destroy() {
}
};
class MonitorStrategyQueue :
public MonitorStrategy,
public std::tr1::enable_shared_from_this<MonitorStrategyQueue>
{
private:
int32 m_queueSize;
StructureConstPtr m_lastStructure;
//MonitorQueue::shared_pointer m_monitorQueue;
MonitorRequester::shared_pointer m_callback;
Mutex m_mutex;
BitSet::shared_pointer m_bitSet1;
BitSet::shared_pointer m_bitSet2;
bool m_overrunInProgress;
bool m_needToReleaseFirst;
MonitorElement::shared_pointer m_nullMonitorElement;
MonitorElement::shared_pointer m_monitorElement;
public:
MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) :
m_queueSize(queueSize), m_lastStructure(),// m_monitorQueue(),
m_callback(callback), m_mutex(),
m_bitSet1(), m_bitSet2(), m_overrunInProgress(false),
m_needToReleaseFirst(false),
m_nullMonitorElement(), m_monitorElement()
{
if (queueSize <= 1)
throw std::invalid_argument("queueSize <= 1");
}
virtual ~MonitorStrategyQueue()
{
}
virtual void init(StructureConstPtr const & structure) {
Lock guard(m_mutex);
// reuse on reconnect
if (m_lastStructure.get() == 0 ||
*(m_lastStructure.get()) == *(structure.get()))
{
/*
MonitorElement[] monitorElements = new MonitorElement[queueSize];
for(int i=0; i<queueSize; i++) {
PVStructure pvNew = pvDataCreate.createPVStructure(structure);
monitorElements[i] = MonitorQueueFactory.createMonitorElement(pvNew);
}
monitorQueue = MonitorQueueFactory.create(monitorElements);
lastStructure = structure;
}
*/
}
}
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
bool notify = false;
{
Lock guard(m_mutex);
// if in overrun mode, check if some is free
if (m_overrunInProgress)
{
MonitorElementPtr newElement;//TODO = monitorQueue.getFree();
if (newElement.get() != 0)
{
// take new, put current in use
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure);
//monitorQueue.setUsed(monitorElement);
m_monitorElement = newElement;
notify = true;
m_overrunInProgress = false;
}
}
}
if (notify)
{
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
{
Lock guard(m_mutex);
// setup current fields
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
BitSet::shared_pointer changedBitSet = m_monitorElement->changedBitSet;
BitSet::shared_pointer overrunBitSet = m_monitorElement->overrunBitSet;
// special treatment if in overrun state
if (m_overrunInProgress)
{
// lazy init
if (m_bitSet1.get() == 0) m_bitSet1.reset(new BitSet(changedBitSet->size()));
if (m_bitSet2.get() == 0) m_bitSet2.reset(new BitSet(overrunBitSet->size()));
m_bitSet1->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(payloadBuffer, transport.get(), m_bitSet1.get());
m_bitSet2->deserialize(payloadBuffer, transport.get());
// OR local overrun
// TODO this does not work perfectly... uncompressed bitSets should be used!!!
overrunBitSet->or_and(*(changedBitSet.get()), *(m_bitSet1.get()));
// OR remove change
*(changedBitSet.get()) |= *(m_bitSet1.get());
// OR remote overrun
*(overrunBitSet.get()) |= *(m_bitSet2.get());
}
else
{
// deserialize changedBitSet and data, and overrun bit set
changedBitSet->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
}
// prepare next free (if any)
MonitorElementPtr newElement; // = monitorQueue.getFree();
if (newElement.get() == 0) {
m_overrunInProgress = true;
return;
}
// if there was overrun in progress we manipulated bitSets... compress them
if (m_overrunInProgress) {
BitSetUtil::compress(changedBitSet, pvStructure);
BitSetUtil::compress(overrunBitSet, pvStructure);
m_overrunInProgress = false;
}
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
//monitorQueue.setUsed(monitorElement);
m_monitorElement = newElement;
}
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
if (m_needToReleaseFirst)
return m_nullMonitorElement;
MonitorElementPtr retVal;// = monitorQueue.getUsed();
if (retVal.get() != 0)
{
m_needToReleaseFirst = true;
return retVal;
}
// if in overrun mode and we have free, make it as last element
if (m_overrunInProgress)
{
MonitorElementPtr newElement;// = monitorQueue.getFree();
if (newElement.get() != 0)
{
// take new, put current in use
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure);
//monitorQueue.setUsed(monitorElement);
m_monitorElement = newElement;
m_overrunInProgress = false;
m_needToReleaseFirst = true;
return m_nullMonitorElement; // TODO monitorQueue.getUsed();
}
else
return m_nullMonitorElement; // should never happen since queueSize >= 2, but a client not calling release can do this
}
else
return m_nullMonitorElement;
virtual void init(StructureConstPtr const & structure) {
Lock guard(m_mutex);
m_pvStructure = getPVDataCreate()->createPVStructure(structure);
int numberFields = m_pvStructure->getNumberFields();
m_monitorElement->pvStructurePtr = getPVDataCreate()->createPVStructure(structure);
m_monitorElement->changedBitSet.reset(new BitSet(numberFields));
m_monitorElement->overrunBitSet.reset(new BitSet(numberFields));
m_responseChangeBitSet.reset(new BitSet(numberFields));
m_responseOverrunBitSet.reset(new BitSet(numberFields));
m_structureChangeBitSet.reset(new BitSet(numberFields));
m_structureOverrunBitSet.reset(new BitSet(numberFields));
}
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
bool monitorEvent(false);
{
Lock guard(m_mutex);
// deserialize first
m_responseChangeBitSet->deserialize(payloadBuffer, transport.get());
m_pvStructure->deserialize(
payloadBuffer,
transport.get(),
m_responseChangeBitSet.get());
m_responseOverrunBitSet->deserialize(payloadBuffer, transport.get());
// OR local overrun
// OR new changes
m_structureOverrunBitSet->or_and(
*m_responseChangeBitSet.get(),*m_structureChangeBitSet.get());
// OR new changes
*(m_structureChangeBitSet) |= (*m_responseChangeBitSet);
// OR remote overrun
*(m_structureOverrunBitSet) |= (*m_responseOverrunBitSet);
if (!m_gotMonitor)
{
m_gotMonitor = true;
monitorEvent = true;
}
}
if(monitorEvent) EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
if (!m_gotMonitor) return m_nullMonitorElement;
m_gotMonitor = false;
convert->copyStructure(m_pvStructure,m_monitorElement->pvStructurePtr);
BitSetUtil::compress(m_structureChangeBitSet,m_pvStructure);
BitSetUtil::compress(m_structureOverrunBitSet,m_pvStructure);
(*m_monitorElement->changedBitSet) = (*m_structureChangeBitSet);
(*m_monitorElement->overrunBitSet) = (*m_structureOverrunBitSet);
m_structureChangeBitSet->clear();
m_structureOverrunBitSet->clear();
return m_monitorElement;
}
virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) {
// noop
}
Status start() {
Lock guard(m_mutex);
// TODO no such check in Java
if (!m_monitorElement->changedBitSet.get())
return Status(Status::STATUSTYPE_ERROR, "Monitor not connected.");
m_gotMonitor = false;
m_monitorElement->changedBitSet->clear();
m_monitorElement->overrunBitSet->clear();
return Status::Ok;
}
Status stop() {
return Status::Ok;
}
void destroy() {
}
};
virtual void release(MonitorElement::shared_pointer const & /*monitorElement*/) {
Lock guard(m_mutex);
//monitorQueue.releaseUsed(monitorElement);
m_needToReleaseFirst = false;
}
typedef Queue<MonitorElement> MonitorElementQueue;
typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
Status start() {
Lock guard(m_mutex);
m_overrunInProgress = false;
//monitorQueue.clear();
//m_monitorElement = monitorQueue.getFree();
m_needToReleaseFirst = false;
return Status::Ok;
}
class MultipleElementQueue :
public ElementQueue,
public std::tr1::enable_shared_from_this<MultipleElementQueue>
{
private:
MonitorRequester::shared_pointer m_callback;
int queueSize;
bool queueIsFull;
MonitorElementQueuePtr queue;
BitSetPtr changedBitSet;
BitSetPtr overrunBitSet;
MonitorElementPtr latestMonitorElement;
Mutex m_mutex;
Status stop() {
return Status::Ok;
}
void destroy() {
}
};
public:
MultipleElementQueue(MonitorRequester::shared_pointer const & callback,int queueSize)
:
m_callback(callback),
queueSize(queueSize),
queueIsFull(false)
{
}
virtual ~MultipleElementQueue()
{
}
virtual void init(StructureConstPtr const & structure) {
Lock guard(m_mutex);
size_t nfields = 0;
std::vector<MonitorElementPtr> monitorElementArray;
monitorElementArray.reserve(queueSize);
for(int i=0; i<queueSize; i++) {
PVStructurePtr pvStructure = getPVDataCreate()->createPVStructure(structure);
if(nfields==0) nfields = pvStructure->getNumberFields();
MonitorElementPtr monitorElement(
new MonitorElement(pvStructure));
monitorElementArray.push_back(monitorElement);
}
queue.reset(new MonitorElementQueue(monitorElementArray));
changedBitSet.reset(new BitSet(nfields));
overrunBitSet.reset(new BitSet(nfields));
}
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
{
Lock guard(m_mutex);
if(queueIsFull) {
MonitorElementPtr monitorElement = latestMonitorElement;
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
changedBitSet->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(
payloadBuffer,
transport.get(),
changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
(*monitorElement->changedBitSet)|= (*changedBitSet);
(*monitorElement->overrunBitSet)|= (*changedBitSet);
changedBitSet->clear();
overrunBitSet->clear();
return;
}
MonitorElementPtr monitorElement = queue->getFree();
if(monitorElement==NULL) {
throw std::logic_error(String("RealQueue::dataChanged() logic error"));
}
if(queue->getNumberFree()==0){
queueIsFull = true;
latestMonitorElement = monitorElement;
}
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
changedBitSet->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(
payloadBuffer,
transport.get(),
changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
BitSetUtil::compress(changedBitSet,pvStructure);
BitSetUtil::compress(overrunBitSet,pvStructure);
monitorElement->changedBitSet->clear();
(*monitorElement->changedBitSet)|=(*changedBitSet);
monitorElement->overrunBitSet->clear();
(*monitorElement->overrunBitSet)|=(*overrunBitSet);
changedBitSet->clear();
overrunBitSet->clear();
queue->setUsed(monitorElement);
}
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
return queue->getUsed();
}
virtual void release(MonitorElement::shared_pointer const & currentElement) {
Lock guard(m_mutex);
if(queueIsFull) {
MonitorElementPtr monitorElement = latestMonitorElement;
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
BitSetUtil::compress(monitorElement->changedBitSet,pvStructure);
BitSetUtil::compress(monitorElement->overrunBitSet,pvStructure);
queueIsFull = false;
latestMonitorElement.reset();
}
queue->releaseUsed(currentElement);
}
Status start() {
Lock guard(m_mutex);
queue->clear();
changedBitSet->clear();
overrunBitSet->clear();
queueIsFull = false;
return Status::Ok;
}
Status stop() {
return Status::Ok;
}
void destroy() {
}
};
@@ -2321,7 +2070,7 @@ namespace epics {
PVStructure::shared_pointer m_pvRequest;
std::tr1::shared_ptr<MonitorStrategy> m_monitorStrategy;
std::tr1::shared_ptr<ElementQueue> m_monitorStrategy;
ChannelMonitorImpl(ChannelImpl::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, PVStructure::shared_pointer const & pvRequest) :
BaseRequestImpl(channel, monitorRequester),
@@ -2341,37 +2090,31 @@ namespace epics {
return;
}
int queueSize = 2;
PVFieldPtr pvField = m_pvRequest->getSubField("record.queueSize");
if (pvField.get()) {
PVStringPtr pvString = dynamic_pointer_cast<PVString>(pvField);
if (pvString.get())
{
String value = pvString->get();
istringstream buffer(value);
if ((buffer >> queueSize).fail())
{
Status failedToConvert(Status::STATUSTYPE_ERROR, "queueSize type is not a valid integer");
Monitor::shared_pointer thisPointer = dynamic_pointer_cast<Monitor>(shared_from_this());
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(failedToConvert, thisPointer, StructureConstPtr()));
return;
}
}
}
int queueSize = 2;
PVFieldPtr pvField = m_pvRequest->getSubField("record._options");
if (pvField.get()) {
PVStructurePtr pvOptions = static_pointer_cast<PVStructure>(pvField);
pvField = pvOptions->getSubField("queueSize");
if (pvField.get()) {
PVStringPtr pvString = pvOptions->getStringField("queueSize");
if(pvString.get()!=NULL) {
int32 size;
std::stringstream ss;
ss << pvString->get();
ss >> size;
queueSize = size;
}
}
}
BaseRequestImpl::activate();
if (queueSize == -1)
m_monitorStrategy.reset(new MonitorStrategyNotify(m_monitorRequester));
else if (queueSize == 0) // 0 means all (old v3 style), some sending optimization can be done (not to send bit-sets)
m_monitorStrategy.reset(new MonitorStrategyEntire(m_monitorRequester));
else //if (queueSize == 1)
m_monitorStrategy.reset(new MonitorStrategySingle(m_monitorRequester));
/* else
m_monitorStrategy.reset(new MonitorStrategyQueue(queueSize));
*/
if(queueSize<1) queueSize = 1;
if (queueSize == 1) {
m_monitorStrategy.reset(new SingleElementQueue(m_monitorRequester));
} else {
m_monitorStrategy.reset(new MultipleElementQueue(m_monitorRequester,queueSize));
}
// subscribe