no-queue monitor implementation

This commit is contained in:
Matej Sekoranja
2011-01-24 23:20:19 +01:00
parent 5cb78eac05
commit 8dbc67377c
3 changed files with 239 additions and 73 deletions

View File

@@ -239,6 +239,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
ChannelProcessRequestImpl(ChannelImpl* channel, ChannelProcessRequester* callback, PVStructure *pvRequest) :
BaseRequestImpl(channel, callback),
m_callback(callback), m_pvRequest(pvRequest)
//(dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, "", pvRequest)))
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess);
@@ -321,6 +322,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
virtual void destroy()
{
BaseRequestImpl::destroy();
if (m_pvRequest) delete m_pvRequest;
delete this;
}
@@ -353,7 +355,8 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
public:
ChannelGetImpl(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) :
BaseRequestImpl(channel, channelGetRequester),
m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest
m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest),
//(dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, "", pvRequest))),
m_data(0), m_bitSet(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet);
@@ -464,6 +467,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
// TODO sync
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
@@ -500,7 +504,8 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
public:
ChannelPutImpl(ChannelImpl* channel, ChannelPutRequester* channelPutRequester, PVStructure *pvRequest) :
BaseRequestImpl(channel, channelPutRequester),
m_channelPutRequester(channelPutRequester), m_pvRequest(pvRequest), // TODO pvRequest
m_channelPutRequester(channelPutRequester), m_pvRequest(pvRequest),
//(dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, "", pvRequest))),
m_data(0), m_bitSet(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPut);
@@ -642,6 +647,7 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
// TODO sync
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
@@ -800,60 +806,246 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender
PVDATA_REFCOUNT_MONITOR_DEFINE(mockMonitor);
class MockMonitor : public Monitor, public MonitorElement
PVDATA_REFCOUNT_MONITOR_DEFINE(channelMonitor);
class ChannelMonitorImpl : public BaseRequestImpl, public Monitor,
public MonitorElement
{
private:
MonitorRequester* m_monitorRequester;
Structure* m_structure;
bool m_started;
PVStructure* m_pvRequest;
// TODO temp
PVStructure* m_pvStructure;
BitSet* m_changedBitSet;
BitSet* m_overrunBitSet;
volatile bool m_first;
Mutex* m_lock;
volatile int m_count;
int m_count;
Mutex m_lock;
private:
~MockMonitor()
~ChannelMonitorImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockMonitor);
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelMonitor);
}
public:
MockMonitor(MonitorRequester* monitorRequester, PVStructure *pvStructure, PVStructure *pvRequest) :
m_monitorRequester(monitorRequester), m_pvStructure(pvStructure),
m_changedBitSet(new BitSet(pvStructure->getNumberFields())),
m_overrunBitSet(new BitSet(pvStructure->getNumberFields())),
m_first(true),
m_lock(new Mutex()),
m_count(0)
ChannelMonitorImpl(ChannelImpl* channel, MonitorRequester* monitorRequester, PVStructure *pvRequest) :
BaseRequestImpl(channel, monitorRequester),
m_monitorRequester(monitorRequester), m_structure(0),
m_started(false), m_pvRequest(pvRequest),
//(dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, "", pvRequest))),
m_pvStructure(0), m_count(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockMonitor);
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor);
m_changedBitSet->set(0);
// TODO quques
// subscribe
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_monitorRequester->monitorConnect(channelNotConnected, null, null);
// } catch (CAException caex) {
// TODO m_monitorRequester->monitorConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null);
// }
// TODO pvRequest
m_monitorRequester->monitorConnect(g_statusOK, this, const_cast<Structure*>(m_pvStructure->getStructure()));
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
int32 pendingRequest = getPendingRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
return;
}
control->startMessage((int8)13, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
if (pendingRequest & QOS_INIT)
{
// pvRequest
m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest);
}
stopRequest();
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
// data available
// TODO if (qos & QOS_GET)
normalResponse(transport, version, payloadBuffer, qos, status);
return true;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_monitorRequester->monitorConnect(status, this, 0);
return true;
}
// create data and its bitSet
m_structure = const_cast<Structure*>(dynamic_cast<const Structure*>(transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport)));
//monitorStrategy->init(structure);
// TODO temp
m_pvStructure = dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, m_structure));
m_changedBitSet = new BitSet(m_pvStructure->getNumberFields());
m_overrunBitSet = new BitSet(m_pvStructure->getNumberFields());
// notify
m_monitorRequester->monitorConnect(okStatus, this, m_structure);
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (qos & QOS_GET)
{
// TODO not supported by IF yet...
}
else
{
// TODO
m_changedBitSet->deserialize(payloadBuffer, transport);
m_pvStructure->deserialize(payloadBuffer, transport, m_changedBitSet);
m_overrunBitSet->deserialize(payloadBuffer, transport);
m_monitorRequester->monitorEvent(this);
}
return true;
}
virtual void resubscribeSubscription(Transport* transport) {
startRequest(QOS_INIT);
transport->enqueueSendRequest(this);
if (m_started)
start();
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_pvRequest) delete m_pvRequest;
// uncomment when m_pvStructure not destroyed if (m_structure) m_structure->decReferenceCount();
// TODO temp
if (m_pvStructure)
{
delete m_pvStructure;
delete m_overrunBitSet;
delete m_changedBitSet;
}
delete this;
}
// override, since we optimize status
virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) {
// TODO?
// try
// {
transport->ensureData(1);
int8 qos = payloadBuffer->getByte();
if (qos & QOS_INIT)
{
Status* status = statusCreate->deserializeStatus(payloadBuffer, transport);
initResponse(transport, version, payloadBuffer, qos, status);
// TODO
if (status != okStatus)
delete status;
}
else if (qos & QOS_DESTROY)
{
Status* status = statusCreate->deserializeStatus(payloadBuffer, transport);
m_remotelyDestroyed = true;
if (!destroyResponse(transport, version, payloadBuffer, qos, status))
cancel();
// TODO
if (status != okStatus)
delete status;
}
else
{
normalResponse(transport, version, payloadBuffer, qos, okStatus);
}
}
virtual Status* start()
{
// fist monitor
m_monitorRequester->monitorEvent(this);
Lock guard(&m_lock);
// TODO sync
if (m_destroyed)
return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");;
// TODO monitorStrategy.start();
//try {
// start == process + get
if (!startRequest(QOS_PROCESS | QOS_GET))
{
return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Other request pending.");
}
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
m_started = true;
// client needs to delete status, so passing shared OK instance is not right thing to do
return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started.");
//} catch (IllegalStateException ise) {
// return channelNotConnected;
//}
// client needs to delete status, so passing shared OK instance is not right thing to do
return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started.");
}
virtual Status* stop()
{
// client needs to delete status, so passing shared OK instance is not right thing to do
return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor stopped.");
Lock guard(&m_lock);
// TODO sync
if (m_destroyed)
return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");;
//monitorStrategy.stop();
//try {
// stop == process + no get
if (!startRequest(QOS_PROCESS))
{
return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Other request pending.");
}
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
m_started = false;
// client needs to delete status, so passing shared OK instance is not right thing to do
return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor stopped.");
//} catch (IllegalStateException ise) {
// return channelNotConnected;
//}
}
// ============ temp ============
virtual MonitorElement* poll()
{
Lock xx(m_lock);
Lock xx(&m_lock);
if (m_count)
{
return 0;
@@ -867,21 +1059,11 @@ class MockMonitor : public Monitor, public MonitorElement
virtual void release(MonitorElement* monitorElement)
{
Lock xx(m_lock);
Lock xx(&m_lock);
if (m_count)
m_count--;
}
virtual void destroy()
{
delete stop();
delete m_lock;
delete m_overrunBitSet;
delete m_changedBitSet;
delete this;
}
// ============ MonitorElement ============
virtual PVStructure* getPVStructure()
@@ -898,12 +1080,16 @@ class MockMonitor : public Monitor, public MonitorElement
{
return m_overrunBitSet;
}
};
// TODO consider std::unordered_map
typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
@@ -2029,8 +2215,8 @@ class TestChannelImpl : public ChannelImpl {
ChannelPutGetRequester *channelPutGetRequester,
epics::pvData::PVStructure *pvRequest)
{
// TODO
return 0;
// TODO return new ChannelPutGetImpl(this, channelPutGetRequester, pvRequest);
return 0;
}
virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester,
@@ -2044,7 +2230,7 @@ class TestChannelImpl : public ChannelImpl {
epics::pvData::MonitorRequester *monitorRequester,
epics::pvData::PVStructure *pvRequest)
{
return new MockMonitor(monitorRequester, 0, pvRequest);
return new ChannelMonitorImpl(this, monitorRequester, pvRequest);
}
virtual ChannelArray* createChannelArray(
@@ -2513,7 +2699,6 @@ TODO
{
Lock guard(&m_ioidMapMutex);
IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
printf("getResponseRequest %d = %d\n", ioid, (it == m_pendingResponseRequests.end() ? 0 : it->second));
return (it == m_pendingResponseRequests.end() ? 0 : it->second);
}
@@ -2526,7 +2711,6 @@ TODO
{
Lock guard(&m_ioidMapMutex);
pvAccessID ioid = generateIOID();
printf("registerResponseRequest %d = %d\n", ioid, request);
m_pendingResponseRequests[ioid] = request;
return ioid;
}
@@ -2540,12 +2724,10 @@ TODO
{
Lock guard(&m_ioidMapMutex);
IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(request->getIOID());
printf("unregisterResponseRequest %d = %d\n", request->getIOID(), request);
if (it == m_pendingResponseRequests.end())
return 0;
ResponseRequest* retVal = it->second;
printf("unregisterResponseRequest %d = %d==%d\n", request->getIOID(), request, retVal);
m_pendingResponseRequests.erase(it);
return retVal;
}
@@ -3095,7 +3277,7 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
channel->printInfo();
/*
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "");
epicsThreadSleep ( 1.0 );
@@ -3119,6 +3301,7 @@ int main(int argc,char *argv[])
ChannelPutRequesterImpl channelPutRequesterImpl;
pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&channelPutRequesterImpl);
ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest);
epicsThreadSleep ( 1.0 );
channelPut->get();
@@ -3126,17 +3309,20 @@ int main(int argc,char *argv[])
channelPut->put(false);
epicsThreadSleep ( 1.0 );
channelPut->destroy();
*/
/*
MonitorRequesterImpl monitorRequesterImpl;
Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0);
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&monitorRequesterImpl);
Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, pvRequest);
epicsThreadSleep( 1.0 );
Status* status = monitor->start();
std::cout << "monitor->start() = " << status->toString() << std::endl;
delete status;
epicsThreadSleep( 3.0 );
status = monitor->stop();
std::cout << "monitor->stop() = " << status->toString() << std::endl;
@@ -3144,10 +3330,7 @@ int main(int argc,char *argv[])
monitor->destroy();
*/
// TODO share it?
delete pvRequest;
epicsThreadSleep ( 3.0 );
printf("Destroying channel... \n");