client: reimplemented handling monitors

This commit is contained in:
Matej Sekoranja
2015-02-01 00:45:13 +01:00
parent fd1e0dd636
commit 58b04509de

View File

@@ -2089,8 +2089,9 @@ namespace epics {
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
};
typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
typedef queue<MonitorElement::shared_pointer> MonitorElementQueue;
typedef Queue<MonitorElement> MonitorElementQueue;
class MonitorStrategyQueue :
public MonitorStrategy,
@@ -2101,7 +2102,8 @@ namespace epics {
int32 m_queueSize;
StructureConstPtr m_lastStructure;
std::tr1::shared_ptr<MonitorElementQueue> m_monitorQueue;
FreeElementQueue m_freeQueue;
MonitorElementQueue m_monitorQueue;
MonitorRequester::shared_pointer m_callback;
@@ -2110,24 +2112,30 @@ namespace epics {
BitSet::shared_pointer m_bitSet1;
BitSet::shared_pointer m_bitSet2;
MonitorElement::shared_pointer m_overrunElement;
bool m_overrunInProgress;
bool m_needToReleaseFirst;
MonitorElement::shared_pointer m_nullMonitorElement;
MonitorElement::shared_pointer m_monitorElement;
PVStructure::shared_pointer m_up2datePVStructure;
public:
MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) :
m_queueSize(queueSize), m_lastStructure(), m_monitorQueue(),
m_queueSize(queueSize), m_lastStructure(),
m_freeQueue(),
m_monitorQueue(),
m_callback(callback), m_mutex(),
m_bitSet1(), m_bitSet2(), m_overrunInProgress(false),
m_needToReleaseFirst(false),
m_nullMonitorElement(), m_monitorElement()
m_nullMonitorElement()
{
if (queueSize <= 1)
throw std::invalid_argument("queueSize <= 1");
m_freeQueue.reserve(m_queueSize);
// TODO array based deque
//m_monitorQueue.reserve(m_queueSize);
}
virtual ~MonitorStrategyQueue()
@@ -2141,20 +2149,67 @@ namespace epics {
if (m_lastStructure.get() == 0 ||
*(m_lastStructure.get()) == *(structure.get()))
{
std::vector<MonitorElementPtr> monitorElementArray;
monitorElementArray.reserve(m_queueSize);
for (int32 i = 0; i < m_queueSize; i++)
{
PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure);
MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure));
monitorElementArray.push_back(monitorElement);
m_freeQueue.push_back(monitorElement);
}
m_monitorQueue.reset(new MonitorElementQueue(monitorElementArray));
m_lastStructure = structure;
}
}
void partialCopy(PVStructure::shared_pointer const & from,
PVStructure::shared_pointer const & to,
BitSet::shared_pointer const & maskBitSet,
bool inverse = false) {
size_t numberFields = from->getNumberFields();
size_t offset = from->getFieldOffset();
int32 next = inverse ?
maskBitSet->nextClearBit(static_cast<uint32>(offset)) :
maskBitSet->nextSetBit(static_cast<uint32>(offset));
// no more changes or no changes in this structure
if(next<0||next>=static_cast<int32>(offset+numberFields)) return;
// entire structure
if(static_cast<int32>(offset)==next) {
getConvert()->copy(from, to);
return;
}
PVFieldPtrArray const & fromPVFields = from->getPVFields();
PVFieldPtrArray const & toPVFields = to->getPVFields();
size_t fieldsSize = fromPVFields.size();
for(size_t i = 0; i<fieldsSize; i++) {
PVFieldPtr pvField = fromPVFields[i];
offset = pvField->getFieldOffset();
int32 inumberFields = static_cast<int32>(pvField->getNumberFields());
next = inverse ?
maskBitSet->nextClearBit(static_cast<uint32>(offset)) :
maskBitSet->nextSetBit(static_cast<uint32>(offset));
// no more changes
if(next<0) return;
// no change in this pvField
if(next>=static_cast<int32>(offset+inumberFields)) continue;
// serialize field or fields
if(inumberFields==1) {
getConvert()->copy(pvField, toPVFields[i]);
} else {
PVStructure::shared_pointer fromPVStructure = std::tr1::static_pointer_cast<PVStructure>(pvField);
PVStructure::shared_pointer toPVStructure = std::tr1::static_pointer_cast<PVStructure>(toPVFields[i]);
partialCopy(fromPVStructure, toPVStructure, maskBitSet);
}
}
}
/*
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
bool notify = false;
@@ -2165,7 +2220,7 @@ namespace epics {
// if in overrun mode, check if some is free
if (m_overrunInProgress)
{
MonitorElementPtr newElement = m_monitorQueue->getFree();
MonitorElementPtr newElement = m_monitorQueue.getFree();
if (newElement.get() != 0)
{
// take new, put current in use
@@ -2174,7 +2229,7 @@ namespace epics {
BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure);
m_monitorQueue->setUsed(m_monitorElement);
m_monitorQueue.setUsed(m_monitorElement);
m_monitorElement = newElement;
notify = true;
@@ -2228,7 +2283,7 @@ namespace epics {
}
// prepare next free (if any)
MonitorElementPtr newElement = m_monitorQueue->getFree();
MonitorElementPtr newElement = m_monitorQueue.getFree();
if (newElement.get() == 0) {
m_overrunInProgress = true;
return;
@@ -2244,7 +2299,7 @@ namespace epics {
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
m_monitorQueue->setUsed(m_monitorElement);
m_monitorQueue.setUsed(m_monitorElement);
m_monitorElement = newElement;
}
@@ -2252,59 +2307,122 @@ namespace epics {
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
*/
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
{
// TODO do not lock deserialization
Lock guard(m_mutex);
if (m_overrunInProgress)
{
PVStructurePtr pvStructure = m_overrunElement->pvStructurePtr;
BitSet::shared_pointer changedBitSet = m_overrunElement->changedBitSet;
BitSet::shared_pointer overrunBitSet = m_overrunElement->overrunBitSet;
// lazy init
if (m_bitSet1.get() == 0) m_bitSet1.reset(new BitSet(changedBitSet->size()));
if (m_bitSet2.get() == 0) m_bitSet2.reset(new BitSet(overrunBitSet->size()));
m_bitSet1->deserialize(payloadBuffer, transport.get());
pvStructure->deserialize(payloadBuffer, transport.get(), m_bitSet1.get());
m_bitSet2->deserialize(payloadBuffer, transport.get());
// OR local overrun
// TODO this does not work perfectly if bitSet is compressed !!!
// uncompressed bitSets should be used !!!
overrunBitSet->or_and(*(changedBitSet.get()), *(m_bitSet1.get()));
// OR remove change
*(changedBitSet.get()) |= *(m_bitSet1.get());
// OR remote overrun
*(overrunBitSet.get()) |= *(m_bitSet2.get());
// m_up2datePVStructure is already set
return;
}
MonitorElementPtr newElement = m_freeQueue.back();
m_freeQueue.pop_back();
if (m_freeQueue.empty())
{
m_overrunInProgress = true;
m_overrunElement = newElement;
}
// setup current fields
PVStructurePtr pvStructure = newElement->pvStructurePtr;
BitSet::shared_pointer changedBitSet = newElement->changedBitSet;
BitSet::shared_pointer overrunBitSet = newElement->overrunBitSet;
// deserialize changedBitSet and data, and overrun bit set
changedBitSet->deserialize(payloadBuffer, transport.get());
if (m_up2datePVStructure && m_up2datePVStructure.get() != pvStructure.get())
partialCopy(m_up2datePVStructure, pvStructure, changedBitSet, true);
pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
m_up2datePVStructure = pvStructure;
m_monitorQueue.push(newElement);
}
if (!m_overrunInProgress)
{
EXCEPTION_GUARD(m_callback->monitorEvent(shared_from_this()));
}
}
virtual MonitorElement::shared_pointer poll() {
Lock guard(m_mutex);
if (m_needToReleaseFirst)
return m_nullMonitorElement;
MonitorElementPtr retVal = m_monitorQueue->getUsed();
if (retVal.get() != 0)
{
m_needToReleaseFirst = true;
return retVal;
}
// if in overrun mode and we have free, make it as last element
if (m_overrunInProgress)
{
MonitorElementPtr newElement = m_monitorQueue->getFree();
if (newElement.get() != 0)
{
// take new, put current in use
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure);
m_monitorQueue->setUsed(m_monitorElement);
m_monitorElement = newElement;
m_overrunInProgress = false;
m_needToReleaseFirst = true;
return m_monitorQueue->getUsed();
}
else
return m_nullMonitorElement; // should never happen since queueSize >= 2, but a client not calling release can do this
}
else
if (m_monitorQueue.empty())
return m_nullMonitorElement;
MonitorElement::shared_pointer retVal = m_monitorQueue.front();
m_monitorQueue.pop();
return retVal;
}
// NOTE: a client must always call poll() after release() to check the presence of any new monitor elements
virtual void release(MonitorElement::shared_pointer const & monitorElement) {
Lock guard(m_mutex);
m_monitorQueue->releaseUsed(monitorElement);
m_needToReleaseFirst = false;
m_freeQueue.push_back(monitorElement);
if (m_overrunInProgress)
{
// compress bit-set
PVStructurePtr pvStructure = m_overrunElement->pvStructurePtr;
BitSetUtil::compress(m_overrunElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_overrunElement->overrunBitSet, pvStructure);
m_monitorQueue.push(m_overrunElement);
m_overrunElement.reset();
m_overrunInProgress = false;
}
}
Status start() {
Lock guard(m_mutex);
while (!m_monitorQueue.empty())
{
m_freeQueue.push_back(m_monitorQueue.front());
m_monitorQueue.pop();
}
if (m_overrunElement)
{
m_freeQueue.push_back(m_overrunElement);
m_overrunElement.reset();
}
m_overrunInProgress = false;
m_monitorQueue->clear();
m_monitorElement = m_monitorQueue->getFree();
m_needToReleaseFirst = false;
return Status::Ok;
}