monitor simple queues

This commit is contained in:
Matej Sekoranja
2011-02-15 16:42:10 +01:00
parent a8abd07282
commit 6763d932c0

View File

@@ -1,4 +1,5 @@
/* clientClientContextImpl.cpp */
/* Author: Matej Sekoranja Date: 2011.1.1 */
@@ -25,6 +26,7 @@
#include <configuration.h>
#include <beaconHandler.h>
#include <errlog.h>
#include <bitSetUtil.h>
using namespace epics::pvData;
@@ -1482,14 +1484,319 @@ namespace epics {
class MonitorStrategy : public Monitor {
public:
virtual ~MonitorStrategy() {};
virtual void init(Structure* structure) = 0;
virtual void response(Transport* transport, ByteBuffer* payloadBuffer) = 0;
};
class MonitorStrategyNotify : public MonitorStrategy, public MonitorElement {
private:
MonitorRequester* m_callback;
bool m_gotMonitor;
Mutex m_mutex;
public:
MonitorStrategyNotify(MonitorRequester* callback) :
m_callback(callback), m_gotMonitor(false), m_mutex()
{
}
virtual ~MonitorStrategyNotify()
{
}
virtual void init(Structure* structure) {
// noop
}
virtual void response(Transport* transport, ByteBuffer* payloadBuffer) {
Lock guard(&m_mutex);
m_gotMonitor = true;
// no data, only notify
m_callback->monitorEvent(this);
}
virtual MonitorElement* poll() {
Lock guard(&m_mutex);
return m_gotMonitor ? this : 0;
}
virtual void release(MonitorElement* monitorElement) {
Lock guard(&m_mutex);
m_gotMonitor = false;
}
Status* start() {
return 0;
}
Status* stop() {
return 0;
}
void destroy() {
// noop
}
// ============ MonitorElement ============
virtual PVStructure* getPVStructure()
{
return 0;
}
virtual BitSet* getChangedBitSet()
{
return 0;
}
virtual BitSet* getOverrunBitSet()
{
return 0;
}
};
class MonitorStrategyEntire : public MonitorStrategy, public MonitorElement {
private:
MonitorRequester* m_callback;
bool m_gotMonitor;
Mutex m_mutex;
PVStructure* m_monitorElementStructure;
BitSet* m_monitorElementChangeBitSet;
BitSet* m_monitorElementOverrunBitSet;
public:
MonitorStrategyEntire(MonitorRequester* callback) :
m_callback(callback), m_gotMonitor(false), m_mutex(),
m_monitorElementStructure(0),
m_monitorElementChangeBitSet(0),
m_monitorElementOverrunBitSet(0)
{
}
virtual ~MonitorStrategyEntire()
{
if (m_monitorElementStructure) delete m_monitorElementStructure;
if (m_monitorElementChangeBitSet) delete m_monitorElementChangeBitSet;
if (m_monitorElementOverrunBitSet) delete m_monitorElementOverrunBitSet;
}
virtual void init(Structure* structure) {
Lock guard(&m_mutex);
structure->incReferenceCount();
m_monitorElementStructure = getPVDataCreate()->createPVStructure(0, structure);
int numberFields = m_monitorElementStructure->getNumberFields();
m_monitorElementChangeBitSet = new BitSet(numberFields);
m_monitorElementOverrunBitSet = new BitSet(numberFields);
}
virtual void response(Transport* transport, ByteBuffer* payloadBuffer) {
Lock guard(&m_mutex);
// simply deserialize and notify
m_monitorElementChangeBitSet->deserialize(payloadBuffer, transport);
m_monitorElementStructure->deserialize(payloadBuffer, transport, m_monitorElementChangeBitSet);
m_monitorElementOverrunBitSet->deserialize(payloadBuffer, transport);
m_gotMonitor = true;
m_callback->monitorEvent(this);
}
virtual MonitorElement* poll() {
Lock guard(&m_mutex);
return m_gotMonitor ? this : 0;
}
virtual void release(MonitorElement* monitorElement) {
Lock guard(&m_mutex);
m_gotMonitor = false;
}
Status* start() {
Lock guard(&m_mutex);
m_gotMonitor = false;
return 0;
}
Status* stop() {
return 0;
}
void destroy() {
// noop
}
// ============ MonitorElement ============
virtual PVStructure* getPVStructure()
{
return m_monitorElementStructure;
}
virtual BitSet* getChangedBitSet()
{
return m_monitorElementChangeBitSet;
}
virtual BitSet* getOverrunBitSet()
{
return m_monitorElementOverrunBitSet;
}
};
class MonitorStrategySingle : public MonitorStrategy, public MonitorElement {
private:
MonitorRequester* m_callback;
bool m_gotMonitor;
Mutex m_mutex;
PVStructure* m_monitorElementStructure;
BitSet* m_monitorElementChangeBitSet;
BitSet* m_monitorElementOverrunBitSet;
BitSet* m_dataChangeBitSet;
BitSet* m_dataOverrunBitSet;
bool m_needToCompress;
public:
MonitorStrategySingle(MonitorRequester* callback) :
m_callback(callback), m_gotMonitor(false), m_mutex(),
m_monitorElementStructure(0),
m_monitorElementChangeBitSet(0),
m_monitorElementOverrunBitSet(0),
m_dataChangeBitSet(0),
m_dataOverrunBitSet(0),
m_needToCompress(false)
{
}
virtual ~MonitorStrategySingle()
{
if (m_monitorElementStructure) delete m_monitorElementStructure;
if (m_monitorElementChangeBitSet) delete m_monitorElementChangeBitSet;
if (m_monitorElementOverrunBitSet) delete m_monitorElementOverrunBitSet;
if (m_dataChangeBitSet) delete m_dataChangeBitSet;
if (m_dataOverrunBitSet) delete m_dataOverrunBitSet;
}
virtual void init(Structure* structure) {
Lock guard(&m_mutex);
structure->incReferenceCount();
m_monitorElementStructure = getPVDataCreate()->createPVStructure(0, structure);
int numberFields = m_monitorElementStructure->getNumberFields();
m_monitorElementChangeBitSet = new BitSet(numberFields);
m_monitorElementOverrunBitSet = new BitSet(numberFields);
m_dataChangeBitSet = new BitSet(numberFields);
m_dataOverrunBitSet = new BitSet(numberFields);
}
virtual void response(Transport* transport, ByteBuffer* payloadBuffer) {
Lock guard(&m_mutex);
if (!m_gotMonitor)
{
// simply deserialize and notify
m_monitorElementChangeBitSet->deserialize(payloadBuffer, transport);
m_monitorElementStructure->deserialize(payloadBuffer, transport, m_monitorElementChangeBitSet);
m_monitorElementOverrunBitSet->deserialize(payloadBuffer, transport);
m_gotMonitor = true;
m_callback->monitorEvent(this);
}
else
{
// deserialize first
m_dataChangeBitSet->deserialize(payloadBuffer, transport);
m_monitorElementStructure->deserialize(payloadBuffer, transport, m_dataChangeBitSet);
m_dataOverrunBitSet->deserialize(payloadBuffer, transport);
// OR local overrun
// TODO should work only on uncompressed
m_monitorElementOverrunBitSet->or_and(*m_dataChangeBitSet, *m_monitorElementChangeBitSet);
// OR new changes
*m_monitorElementChangeBitSet |= *m_dataChangeBitSet;
// OR remote overrun
*m_monitorElementOverrunBitSet |= *m_dataOverrunBitSet;
}
}
virtual MonitorElement* poll() {
Lock guard(&m_mutex);
if (!m_gotMonitor) return 0;
// compress if needed
if (m_needToCompress)
{
BitSetUtil::compress(m_monitorElementChangeBitSet, m_monitorElementStructure);
BitSetUtil::compress(m_monitorElementOverrunBitSet, m_monitorElementStructure);
m_needToCompress = false;
}
return this;
}
virtual void release(MonitorElement* monitorElement) {
Lock guard(&m_mutex);
m_gotMonitor = false;
}
Status* start() {
Lock guard(&m_mutex);
m_gotMonitor = false;
m_monitorElementChangeBitSet->clear();
m_monitorElementOverrunBitSet->clear();
return 0;
}
Status* stop() {
return 0;
}
void destroy() {
// noop
}
// ============ MonitorElement ============
virtual PVStructure* getPVStructure()
{
return m_monitorElementStructure;
}
virtual BitSet* getChangedBitSet()
{
return m_monitorElementChangeBitSet;
}
virtual BitSet* getOverrunBitSet()
{
return m_monitorElementOverrunBitSet;
}
};
PVDATA_REFCOUNT_MONITOR_DEFINE(channelMonitor);
class ChannelMonitorImpl : public BaseRequestImpl, public Monitor,
public MonitorElement
class ChannelMonitorImpl : public BaseRequestImpl, public Monitor
{
private:
MonitorRequester* m_monitorRequester;
@@ -1497,15 +1804,8 @@ namespace epics {
bool m_started;
PVStructure* m_pvRequest;
// TODO temp
PVStructure* m_pvStructure;
BitSet* m_changedBitSet;
BitSet* m_overrunBitSet;
bool m_gotMonitor;
Mutex m_lock;
MonitorStrategy* m_monitorStrategy;
private:
~ChannelMonitorImpl()
@@ -1514,16 +1814,8 @@ namespace epics {
// synced by code calling this
//if (m_pvRequest) delete m_pvRequest;
// uncomment when m_pvStructure not destroyed if (m_structure) m_structure->decReferenceCount();
// TODO temp
if (m_pvStructure)
{
delete m_pvStructure;
delete m_overrunBitSet;
delete m_changedBitSet;
}
if (m_monitorStrategy) delete m_monitorStrategy;
}
public:
@@ -1532,17 +1824,48 @@ namespace epics {
m_monitorRequester(monitorRequester), m_structure(0),
m_started(false), m_pvRequest(pvRequest),
//(dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, "", pvRequest))),
m_pvStructure(0), m_gotMonitor(false)
m_monitorStrategy(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor);
/*
if (pvRequest == 0)
{
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(pvRequestNull, 0, 0));
return;
}
*/
int queueSize = 2;
PVField* pvField = pvRequest->getSubField("record.queueSize");
if (pvField) {
PVString* pvString = dynamic_cast<PVString*>(pvField);
if (pvString)
{
String value = pvString->get();
istringstream buffer(value);
if ((buffer >> queueSize).fail())
{
Status* failedToConvert = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "queueSize type is not a valid integer");
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(failedToConvert, 0, 0));
delete failedToConvert;
return;
}
}
}
if (queueSize == -1)
m_monitorStrategy = new MonitorStrategyNotify(m_monitorRequester);
else if (queueSize == 0) // 0 means all (old v3 style), some sending optimization can be done (not to send bit-sets)
m_monitorStrategy = new MonitorStrategyEntire(m_monitorRequester);
else //if (queueSize == 1)
m_monitorStrategy = new MonitorStrategySingle(m_monitorRequester);
/* else
m_monitorStrategy = new MonitorStrategyQueue(queueSize);
*/
// TODO quques
// subscribe
try {
@@ -1587,17 +1910,10 @@ namespace epics {
return true;
}
// create data and its bitSet
m_structure = const_cast<Structure*>(dynamic_cast<const Structure*>(transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport)));
//monitorStrategy->init(structure);
// TODO temp
m_pvStructure = dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, m_structure));
m_changedBitSet = new BitSet(m_pvStructure->getNumberFields());
m_overrunBitSet = new BitSet(m_pvStructure->getNumberFields());
Structure* structure = const_cast<Structure*>(dynamic_cast<const Structure*>(transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport)));
m_monitorStrategy->init(structure);
structure->decReferenceCount();
// notify
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, this, m_structure));
@@ -1614,16 +1930,7 @@ namespace epics {
}
else
{
// TODO
m_changedBitSet->deserialize(payloadBuffer, transport);
m_pvStructure->deserialize(payloadBuffer, transport, m_changedBitSet);
m_overrunBitSet->deserialize(payloadBuffer, transport);
m_lock.lock();
m_gotMonitor = true;
m_lock.unlock();
EXCEPTION_GUARD(m_monitorRequester->monitorEvent(this));
m_monitorStrategy->response(transport, payloadBuffer);
}
return true;
}
@@ -1668,13 +1975,12 @@ namespace epics {
virtual Status* start()
{
Lock guard(&m_lock);
Lock guard(&m_mutex);
// TODO sync
if (m_destroyed)
return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");;
// TODO monitorStrategy.start();
m_monitorStrategy->start();
// start == process + get
if (!startRequest(QOS_PROCESS | QOS_GET))
@@ -1695,13 +2001,12 @@ namespace epics {
virtual Status* stop()
{
Lock guard(&m_lock);
Lock guard(&m_mutex);
// TODO sync
if (m_destroyed)
return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");;
//monitorStrategy.stop();
m_monitorStrategy->stop();
// stop == process + no get
if (!startRequest(QOS_PROCESS))
@@ -1726,39 +2031,16 @@ namespace epics {
BaseRequestImpl::destroy();
}
// ============ temp ============
virtual MonitorElement* poll()
{
Lock xx(&m_lock);
if (!m_gotMonitor) return 0;
return this;
return m_monitorStrategy->poll();
}
virtual void release(MonitorElement* monitorElement)
{
Lock xx(&m_lock);
m_gotMonitor = false;
m_monitorStrategy->release(monitorElement);
}
// ============ MonitorElement ============
virtual PVStructure* getPVStructure()
{
return m_pvStructure;
}
virtual BitSet* getChangedBitSet()
{
return m_changedBitSet;
}
virtual BitSet* getOverrunBitSet()
{
return m_overrunBitSet;
}
};