reverted back previous monitor queue code and fully implemented it, now Queue exits in pvData

This commit is contained in:
Matej Sekoranja
2014-08-11 23:14:13 +02:00
parent 742f043491
commit b51b0798bc

View File

@@ -2059,145 +2059,240 @@ namespace epics {
class ElementQueue : public Monitor {
class MonitorStrategy : public Monitor {
public:
virtual ~ElementQueue() {};
virtual ~MonitorStrategy() {};
virtual void init(StructureConstPtr const & structure) = 0;
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
};
typedef Queue<MonitorElement> MonitorElementQueue;
typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
class MultipleElementQueue :
public ElementQueue,
public std::tr1::enable_shared_from_this<MultipleElementQueue>
{
private:
class MonitorStrategyQueue :
public MonitorStrategy,
public std::tr1::enable_shared_from_this<MonitorStrategyQueue>
{
private:
int32 m_queueSize;
StructureConstPtr m_lastStructure;
std::tr1::shared_ptr<MonitorElementQueue> m_monitorQueue;
MonitorRequester::shared_pointer m_callback;
int queueSize;
bool queueIsFull;
MonitorElementQueuePtr queue;
BitSetPtr changedBitSet;
BitSetPtr overrunBitSet;
MonitorElementPtr latestMonitorElement;
Mutex m_mutex;
public:
MultipleElementQueue(MonitorRequester::shared_pointer const & callback,int queueSize)
:
m_callback(callback),
queueSize(queueSize),
queueIsFull(false)
{
}
virtual ~MultipleElementQueue()
{
}
Mutex m_mutex;
BitSet::shared_pointer m_bitSet1;
BitSet::shared_pointer m_bitSet2;
bool m_overrunInProgress;
bool m_needToReleaseFirst;
MonitorElement::shared_pointer m_nullMonitorElement;
MonitorElement::shared_pointer m_monitorElement;
public:
MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) :
m_queueSize(queueSize), m_lastStructure(), m_monitorQueue(),
m_callback(callback), m_mutex(),
m_bitSet1(), m_bitSet2(), m_overrunInProgress(false),
m_needToReleaseFirst(false),
m_nullMonitorElement(), m_monitorElement()
{
if (queueSize <= 1)
throw std::invalid_argument("queueSize <= 1");
}
virtual ~MonitorStrategyQueue()
{
}
virtual void init(StructureConstPtr const & structure) {
Lock guard(m_mutex);
size_t nfields = 0;
std::vector<MonitorElementPtr> monitorElementArray;
monitorElementArray.reserve(queueSize);
for(int i=0; i<queueSize; i++) {
PVStructurePtr pvStructure = getPVDataCreate()->createPVStructure(structure);
if(nfields==0) nfields = pvStructure->getNumberFields();
MonitorElementPtr monitorElement(
new MonitorElement(pvStructure));
monitorElementArray.push_back(monitorElement);
}
queue.reset(new MonitorElementQueue(monitorElementArray));
changedBitSet.reset(new BitSet(nfields));
overrunBitSet.reset(new BitSet(nfields));
}
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
{
Lock guard(m_mutex);
if(queueIsFull) {
MonitorElementPtr monitorElement = latestMonitorElement;
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
changedBitSet->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(
payloadBuffer,
transport.get(),
changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
(*monitorElement->changedBitSet)|= (*changedBitSet);
(*monitorElement->overrunBitSet)|= (*changedBitSet);
changedBitSet->clear();
overrunBitSet->clear();
return;
// reuse on reconnect
if (m_lastStructure.get() == 0 ||
*(m_lastStructure.get()) == *(structure.get()))
{
std::vector<MonitorElementPtr> monitorElementArray;
monitorElementArray.reserve(m_queueSize);
for (int32 i = 0; i < m_queueSize; i++)
{
PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure);
MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure));
monitorElementArray.push_back(monitorElement);
}
MonitorElementPtr monitorElement = queue->getFree();
if(monitorElement==NULL) {
throw std::logic_error(std::string("RealQueue::dataChanged() logic error"));
}
if(queue->getNumberFree()==0){
queueIsFull = true;
latestMonitorElement = monitorElement;
}
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
changedBitSet->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(
payloadBuffer,
transport.get(),
changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
BitSetUtil::compress(changedBitSet,pvStructure);
BitSetUtil::compress(overrunBitSet,pvStructure);
monitorElement->changedBitSet->clear();
(*monitorElement->changedBitSet)|=(*changedBitSet);
monitorElement->overrunBitSet->clear();
(*monitorElement->overrunBitSet)|=(*overrunBitSet);
changedBitSet->clear();
overrunBitSet->clear();
queue->setUsed(monitorElement);
}
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
m_monitorQueue.reset(new MonitorElementQueue(monitorElementArray));
m_lastStructure = structure;
}
}
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
bool notify = false;
{
Lock guard(m_mutex);
// if in overrun mode, check if some is free
if (m_overrunInProgress)
{
MonitorElementPtr newElement = m_monitorQueue->getFree();
if (newElement.get() != 0)
{
// take new, put current in use
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure);
m_monitorQueue->setUsed(m_monitorElement);
m_monitorElement = newElement;
notify = true;
m_overrunInProgress = false;
}
}
}
if (notify)
{
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
{
Lock guard(m_mutex);
// setup current fields
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
BitSet::shared_pointer changedBitSet = m_monitorElement->changedBitSet;
BitSet::shared_pointer overrunBitSet = m_monitorElement->overrunBitSet;
// special treatment if in overrun state
if (m_overrunInProgress)
{
// lazy init
if (m_bitSet1.get() == 0) m_bitSet1.reset(new BitSet(changedBitSet->size()));
if (m_bitSet2.get() == 0) m_bitSet2.reset(new BitSet(overrunBitSet->size()));
m_bitSet1->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(payloadBuffer, transport.get(), m_bitSet1.get());
m_bitSet2->deserialize(payloadBuffer, transport.get());
// OR local overrun
// TODO this does not work perfectly if bitSet is compressed !!!
// uncompressed bitSets should be used !!!
overrunBitSet->or_and(*(changedBitSet.get()), *(m_bitSet1.get()));
// OR remove change
*(changedBitSet.get()) |= *(m_bitSet1.get());
// OR remote overrun
*(overrunBitSet.get()) |= *(m_bitSet2.get());
}
else
{
// deserialize changedBitSet and data, and overrun bit set
changedBitSet->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
}
// prepare next free (if any)
MonitorElementPtr newElement = m_monitorQueue->getFree();
if (newElement.get() == 0) {
m_overrunInProgress = true;
return;
}
// if there was overrun in progress we manipulated bitSets... compress them
if (m_overrunInProgress) {
BitSetUtil::compress(changedBitSet, pvStructure);
BitSetUtil::compress(overrunBitSet, pvStructure);
m_overrunInProgress = false;
}
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
m_monitorQueue->setUsed(m_monitorElement);
m_monitorElement = newElement;
}
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
return queue->getUsed();
}
virtual void release(MonitorElement::shared_pointer const & currentElement) {
Lock guard(m_mutex);
if(queueIsFull) {
MonitorElementPtr monitorElement = latestMonitorElement;
PVStructurePtr pvStructure = monitorElement->pvStructurePtr;
BitSetUtil::compress(monitorElement->changedBitSet,pvStructure);
BitSetUtil::compress(monitorElement->overrunBitSet,pvStructure);
queueIsFull = false;
latestMonitorElement.reset();
}
queue->releaseUsed(currentElement);
}
Status start() {
Lock guard(m_mutex);
queue->clear();
changedBitSet->clear();
overrunBitSet->clear();
queueIsFull = false;
return Status::Ok;
}
Status stop() {
return Status::Ok;
}
void destroy() {
}
};
if (m_needToReleaseFirst)
return m_nullMonitorElement;
MonitorElementPtr retVal = m_monitorQueue->getUsed();
if (retVal.get() != 0)
{
m_needToReleaseFirst = true;
return retVal;
}
// if in overrun mode and we have free, make it as last element
if (m_overrunInProgress)
{
MonitorElementPtr newElement = m_monitorQueue->getFree();
if (newElement.get() != 0)
{
// take new, put current in use
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure);
m_monitorQueue->setUsed(m_monitorElement);
m_monitorElement = newElement;
m_overrunInProgress = false;
m_needToReleaseFirst = true;
return m_monitorQueue->getUsed();
}
else
return m_nullMonitorElement; // should never happen since queueSize >= 2, but a client not calling release can do this
}
else
return m_nullMonitorElement;
}
virtual void release(MonitorElement::shared_pointer const & monitorElement) {
Lock guard(m_mutex);
m_monitorQueue->releaseUsed(monitorElement);
m_needToReleaseFirst = false;
}
Status start() {
Lock guard(m_mutex);
m_overrunInProgress = false;
m_monitorQueue->clear();
m_monitorElement = m_monitorQueue->getFree();
m_needToReleaseFirst = false;
return Status::Ok;
}
Status stop() {
return Status::Ok;
}
void destroy() {
}
};
@@ -2213,7 +2308,7 @@ namespace epics {
PVStructure::shared_pointer m_pvRequest;
std::tr1::shared_ptr<ElementQueue> m_ElementQueue;
std::tr1::shared_ptr<MonitorStrategy> m_monitorStrategy;
ChannelMonitorImpl(
ChannelImpl::shared_pointer const & channel,
@@ -2256,8 +2351,8 @@ namespace epics {
BaseRequestImpl::activate();
if(queueSize<2) queueSize = 2;
m_ElementQueue.reset(new MultipleElementQueue(m_monitorRequester,queueSize));
if (queueSize<2) queueSize = 2;
m_monitorStrategy.reset(new MonitorStrategyQueue(m_monitorRequester, queueSize));
// subscribe
try {
@@ -2329,7 +2424,7 @@ namespace epics {
dynamic_pointer_cast<const Structure>(
transport->cachedDeserialize(payloadBuffer)
);
m_ElementQueue->init(structure);
m_monitorStrategy->init(structure);
// notify
Monitor::shared_pointer thisChannelMonitor = dynamic_pointer_cast<Monitor>(shared_from_this());
@@ -2352,7 +2447,7 @@ namespace epics {
}
else
{
m_ElementQueue->response(transport, payloadBuffer);
m_monitorStrategy->response(transport, payloadBuffer);
}
}
@@ -2403,7 +2498,7 @@ namespace epics {
if (!m_initialized)
return BaseRequestImpl::notInitializedStatus;
m_ElementQueue->start();
m_monitorStrategy->start();
// start == process + get
if (!startRequest(QOS_PROCESS | QOS_GET))
@@ -2429,7 +2524,7 @@ namespace epics {
if (!m_initialized)
return BaseRequestImpl::notInitializedStatus;
m_ElementQueue->stop();
m_monitorStrategy->stop();
// stop == process + no get
if (!startRequest(QOS_PROCESS))
@@ -2454,12 +2549,12 @@ namespace epics {
virtual MonitorElement::shared_pointer poll()
{
return m_ElementQueue->poll();
return m_monitorStrategy->poll();
}
virtual void release(MonitorElement::shared_pointer const & monitorElement)
{
m_ElementQueue->release(monitorElement);
m_monitorStrategy->release(monitorElement);
}
virtual void lock()