From 6763d932c0cdb5929973e935ba1d274917e6d7a4 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Tue, 15 Feb 2011 16:42:10 +0100 Subject: [PATCH] monitor simple queues --- .../remoteClient/clientContextImpl.cpp | 430 +++++++++++++++--- 1 file changed, 356 insertions(+), 74 deletions(-) diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index bbd4979..ea8d9fa 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -1,4 +1,5 @@ + /* clientClientContextImpl.cpp */ /* Author: Matej Sekoranja Date: 2011.1.1 */ @@ -25,6 +26,7 @@ #include #include #include +#include using namespace epics::pvData; @@ -1482,14 +1484,319 @@ namespace epics { + class MonitorStrategy : public Monitor { + public: + virtual ~MonitorStrategy() {}; + virtual void init(Structure* structure) = 0; + virtual void response(Transport* transport, ByteBuffer* payloadBuffer) = 0; + }; + + class MonitorStrategyNotify : public MonitorStrategy, public MonitorElement { + private: + + MonitorRequester* m_callback; + + bool m_gotMonitor; + Mutex m_mutex; + + public: + + MonitorStrategyNotify(MonitorRequester* callback) : + m_callback(callback), m_gotMonitor(false), m_mutex() + { + } + + virtual ~MonitorStrategyNotify() + { + } + + virtual void init(Structure* structure) { + // noop + } + + virtual void response(Transport* transport, ByteBuffer* payloadBuffer) { + Lock guard(&m_mutex); + m_gotMonitor = true; + // no data, only notify + m_callback->monitorEvent(this); + } + + virtual MonitorElement* poll() { + Lock guard(&m_mutex); + return m_gotMonitor ? this : 0; + } + + virtual void release(MonitorElement* monitorElement) { + Lock guard(&m_mutex); + m_gotMonitor = false; + } + + Status* start() { + return 0; + } + + Status* stop() { + return 0; + } + + void destroy() { + // noop + } + + // ============ MonitorElement ============ + + virtual PVStructure* getPVStructure() + { + return 0; + } + + virtual BitSet* getChangedBitSet() + { + return 0; + } + + virtual BitSet* getOverrunBitSet() + { + return 0; + } + }; + + class MonitorStrategyEntire : public MonitorStrategy, public MonitorElement { + private: + + MonitorRequester* m_callback; + + bool m_gotMonitor; + Mutex m_mutex; + + PVStructure* m_monitorElementStructure; + BitSet* m_monitorElementChangeBitSet; + BitSet* m_monitorElementOverrunBitSet; + + public: + + MonitorStrategyEntire(MonitorRequester* callback) : + m_callback(callback), m_gotMonitor(false), m_mutex(), + m_monitorElementStructure(0), + m_monitorElementChangeBitSet(0), + m_monitorElementOverrunBitSet(0) + { + } + + virtual ~MonitorStrategyEntire() + { + if (m_monitorElementStructure) delete m_monitorElementStructure; + if (m_monitorElementChangeBitSet) delete m_monitorElementChangeBitSet; + if (m_monitorElementOverrunBitSet) delete m_monitorElementOverrunBitSet; + } + + virtual void init(Structure* structure) { + Lock guard(&m_mutex); + + structure->incReferenceCount(); + m_monitorElementStructure = getPVDataCreate()->createPVStructure(0, structure); + int numberFields = m_monitorElementStructure->getNumberFields(); + m_monitorElementChangeBitSet = new BitSet(numberFields); + m_monitorElementOverrunBitSet = new BitSet(numberFields); + } + + virtual void response(Transport* transport, ByteBuffer* payloadBuffer) { + Lock guard(&m_mutex); + // simply deserialize and notify + m_monitorElementChangeBitSet->deserialize(payloadBuffer, transport); + m_monitorElementStructure->deserialize(payloadBuffer, transport, m_monitorElementChangeBitSet); + m_monitorElementOverrunBitSet->deserialize(payloadBuffer, transport); + m_gotMonitor = true; + m_callback->monitorEvent(this); + } + + virtual MonitorElement* poll() { + Lock guard(&m_mutex); + return m_gotMonitor ? this : 0; + } + + virtual void release(MonitorElement* monitorElement) { + Lock guard(&m_mutex); + m_gotMonitor = false; + } + + Status* start() { + Lock guard(&m_mutex); + m_gotMonitor = false; + return 0; + } + + Status* stop() { + return 0; + } + + void destroy() { + // noop + } + + // ============ MonitorElement ============ + + virtual PVStructure* getPVStructure() + { + return m_monitorElementStructure; + } + + virtual BitSet* getChangedBitSet() + { + return m_monitorElementChangeBitSet; + } + + virtual BitSet* getOverrunBitSet() + { + return m_monitorElementOverrunBitSet; + } + }; + + + class MonitorStrategySingle : public MonitorStrategy, public MonitorElement { + private: + + MonitorRequester* m_callback; + + bool m_gotMonitor; + Mutex m_mutex; + + PVStructure* m_monitorElementStructure; + BitSet* m_monitorElementChangeBitSet; + BitSet* m_monitorElementOverrunBitSet; + + BitSet* m_dataChangeBitSet; + BitSet* m_dataOverrunBitSet; + bool m_needToCompress; + + public: + + MonitorStrategySingle(MonitorRequester* callback) : + m_callback(callback), m_gotMonitor(false), m_mutex(), + m_monitorElementStructure(0), + m_monitorElementChangeBitSet(0), + m_monitorElementOverrunBitSet(0), + m_dataChangeBitSet(0), + m_dataOverrunBitSet(0), + m_needToCompress(false) + { + } + + virtual ~MonitorStrategySingle() + { + if (m_monitorElementStructure) delete m_monitorElementStructure; + if (m_monitorElementChangeBitSet) delete m_monitorElementChangeBitSet; + if (m_monitorElementOverrunBitSet) delete m_monitorElementOverrunBitSet; + + if (m_dataChangeBitSet) delete m_dataChangeBitSet; + if (m_dataOverrunBitSet) delete m_dataOverrunBitSet; + } + + virtual void init(Structure* structure) { + Lock guard(&m_mutex); + + structure->incReferenceCount(); + m_monitorElementStructure = getPVDataCreate()->createPVStructure(0, structure); + int numberFields = m_monitorElementStructure->getNumberFields(); + m_monitorElementChangeBitSet = new BitSet(numberFields); + m_monitorElementOverrunBitSet = new BitSet(numberFields); + + m_dataChangeBitSet = new BitSet(numberFields); + m_dataOverrunBitSet = new BitSet(numberFields); + + } + + virtual void response(Transport* transport, ByteBuffer* payloadBuffer) { + Lock guard(&m_mutex); + + if (!m_gotMonitor) + { + // simply deserialize and notify + m_monitorElementChangeBitSet->deserialize(payloadBuffer, transport); + m_monitorElementStructure->deserialize(payloadBuffer, transport, m_monitorElementChangeBitSet); + m_monitorElementOverrunBitSet->deserialize(payloadBuffer, transport); + m_gotMonitor = true; + m_callback->monitorEvent(this); + } + else + { + // deserialize first + m_dataChangeBitSet->deserialize(payloadBuffer, transport); + m_monitorElementStructure->deserialize(payloadBuffer, transport, m_dataChangeBitSet); + m_dataOverrunBitSet->deserialize(payloadBuffer, transport); + + // OR local overrun + // TODO should work only on uncompressed + m_monitorElementOverrunBitSet->or_and(*m_dataChangeBitSet, *m_monitorElementChangeBitSet); + + // OR new changes + *m_monitorElementChangeBitSet |= *m_dataChangeBitSet; + + // OR remote overrun + *m_monitorElementOverrunBitSet |= *m_dataOverrunBitSet; + } + } + + virtual MonitorElement* poll() { + Lock guard(&m_mutex); + if (!m_gotMonitor) return 0; + + // compress if needed + if (m_needToCompress) + { + BitSetUtil::compress(m_monitorElementChangeBitSet, m_monitorElementStructure); + BitSetUtil::compress(m_monitorElementOverrunBitSet, m_monitorElementStructure); + m_needToCompress = false; + } + + return this; + } + + virtual void release(MonitorElement* monitorElement) { + Lock guard(&m_mutex); + m_gotMonitor = false; + } + + Status* start() { + Lock guard(&m_mutex); + m_gotMonitor = false; + m_monitorElementChangeBitSet->clear(); + m_monitorElementOverrunBitSet->clear(); + return 0; + } + + Status* stop() { + return 0; + } + + void destroy() { + // noop + } + + // ============ MonitorElement ============ + + virtual PVStructure* getPVStructure() + { + return m_monitorElementStructure; + } + + virtual BitSet* getChangedBitSet() + { + return m_monitorElementChangeBitSet; + } + + virtual BitSet* getOverrunBitSet() + { + return m_monitorElementOverrunBitSet; + } + }; PVDATA_REFCOUNT_MONITOR_DEFINE(channelMonitor); - class ChannelMonitorImpl : public BaseRequestImpl, public Monitor, - public MonitorElement + class ChannelMonitorImpl : public BaseRequestImpl, public Monitor { private: MonitorRequester* m_monitorRequester; @@ -1497,15 +1804,8 @@ namespace epics { bool m_started; PVStructure* m_pvRequest; - - // TODO temp - PVStructure* m_pvStructure; - BitSet* m_changedBitSet; - BitSet* m_overrunBitSet; - - - bool m_gotMonitor; - Mutex m_lock; + + MonitorStrategy* m_monitorStrategy; private: ~ChannelMonitorImpl() @@ -1514,16 +1814,8 @@ namespace epics { // synced by code calling this //if (m_pvRequest) delete m_pvRequest; - // uncomment when m_pvStructure not destroyed if (m_structure) m_structure->decReferenceCount(); - - // TODO temp - if (m_pvStructure) - { - delete m_pvStructure; - delete m_overrunBitSet; - delete m_changedBitSet; - } - + + if (m_monitorStrategy) delete m_monitorStrategy; } public: @@ -1532,17 +1824,48 @@ namespace epics { m_monitorRequester(monitorRequester), m_structure(0), m_started(false), m_pvRequest(pvRequest), //(dynamic_cast(getPVDataCreate()->createPVField(0, "", pvRequest))), - m_pvStructure(0), m_gotMonitor(false) + m_monitorStrategy(0) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor); + /* if (pvRequest == 0) { EXCEPTION_GUARD(m_monitorRequester->monitorConnect(pvRequestNull, 0, 0)); return; } + */ + + int queueSize = 2; + PVField* pvField = pvRequest->getSubField("record.queueSize"); + if (pvField) { + PVString* pvString = dynamic_cast(pvField); + if (pvString) + { + String value = pvString->get(); + + istringstream buffer(value); + + if ((buffer >> queueSize).fail()) + { + Status* failedToConvert = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "queueSize type is not a valid integer"); + EXCEPTION_GUARD(m_monitorRequester->monitorConnect(failedToConvert, 0, 0)); + delete failedToConvert; + return; + } + } + } + + if (queueSize == -1) + m_monitorStrategy = new MonitorStrategyNotify(m_monitorRequester); + else if (queueSize == 0) // 0 means all (old v3 style), some sending optimization can be done (not to send bit-sets) + m_monitorStrategy = new MonitorStrategyEntire(m_monitorRequester); + else //if (queueSize == 1) + m_monitorStrategy = new MonitorStrategySingle(m_monitorRequester); + /* else + m_monitorStrategy = new MonitorStrategyQueue(queueSize); + */ - // TODO quques // subscribe try { @@ -1587,17 +1910,10 @@ namespace epics { return true; } - // create data and its bitSet - m_structure = const_cast(dynamic_cast(transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport))); - //monitorStrategy->init(structure); - - - // TODO temp - m_pvStructure = dynamic_cast(getPVDataCreate()->createPVField(0, m_structure)); - m_changedBitSet = new BitSet(m_pvStructure->getNumberFields()); - m_overrunBitSet = new BitSet(m_pvStructure->getNumberFields()); + Structure* structure = const_cast(dynamic_cast(transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport))); + m_monitorStrategy->init(structure); + structure->decReferenceCount(); - // notify EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, this, m_structure)); @@ -1614,16 +1930,7 @@ namespace epics { } else { - // TODO - m_changedBitSet->deserialize(payloadBuffer, transport); - m_pvStructure->deserialize(payloadBuffer, transport, m_changedBitSet); - m_overrunBitSet->deserialize(payloadBuffer, transport); - - m_lock.lock(); - m_gotMonitor = true; - m_lock.unlock(); - - EXCEPTION_GUARD(m_monitorRequester->monitorEvent(this)); + m_monitorStrategy->response(transport, payloadBuffer); } return true; } @@ -1668,13 +1975,12 @@ namespace epics { virtual Status* start() { - Lock guard(&m_lock); + Lock guard(&m_mutex); - // TODO sync if (m_destroyed) return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");; - // TODO monitorStrategy.start(); + m_monitorStrategy->start(); // start == process + get if (!startRequest(QOS_PROCESS | QOS_GET)) @@ -1695,13 +2001,12 @@ namespace epics { virtual Status* stop() { - Lock guard(&m_lock); + Lock guard(&m_mutex); - // TODO sync if (m_destroyed) return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");; - //monitorStrategy.stop(); + m_monitorStrategy->stop(); // stop == process + no get if (!startRequest(QOS_PROCESS)) @@ -1726,39 +2031,16 @@ namespace epics { BaseRequestImpl::destroy(); } - // ============ temp ============ - virtual MonitorElement* poll() { - Lock xx(&m_lock); - if (!m_gotMonitor) return 0; - return this; + return m_monitorStrategy->poll(); } virtual void release(MonitorElement* monitorElement) { - Lock xx(&m_lock); - m_gotMonitor = false; + m_monitorStrategy->release(monitorElement); } - // ============ MonitorElement ============ - - virtual PVStructure* getPVStructure() - { - return m_pvStructure; - } - - virtual BitSet* getChangedBitSet() - { - return m_changedBitSet; - } - - virtual BitSet* getOverrunBitSet() - { - return m_overrunBitSet; - } - - };