channelPut

This commit is contained in:
Matej Sekoranja
2011-01-19 21:24:41 +01:00
parent ff25642cd1
commit d4d82f9788

View File

@@ -227,7 +227,6 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
{
private:
ChannelProcessRequester* m_callback;
volatile bool m_initialized;
PVStructure* m_pvRequest;
private:
@@ -239,10 +238,12 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
public:
ChannelProcessRequestImpl(ChannelImpl* channel, ChannelProcessRequester* callback, PVStructure *pvRequest) :
BaseRequestImpl(channel, callback),
m_callback(callback), m_initialized(false), m_pvRequest(pvRequest)
m_callback(callback), m_pvRequest(pvRequest)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess);
// TODO check for nulls!!!!
// TODO best-effort support
// subscribe
@@ -267,7 +268,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
if (pendingRequest & QOS_INIT)
{
// pvRequest
@@ -471,6 +472,187 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
PVDATA_REFCOUNT_MONITOR_DEFINE(channelPut);
class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
{
private:
ChannelPutRequester* m_channelPutRequester;
PVStructure* m_pvRequest;
PVStructure* m_data;
BitSet* m_bitSet;
private:
~ChannelPutImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPut);
}
public:
ChannelPutImpl(ChannelImpl* channel, ChannelPutRequester* channelPutRequester, PVStructure *pvRequest) :
BaseRequestImpl(channel, channelPutRequester),
m_channelPutRequester(channelPutRequester), m_pvRequest(pvRequest), // TODO pvRequest
m_data(0), m_bitSet(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPut);
// TODO low-overhead put
// TODO best-effort put
// subscribe
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelPutRequester->channelPutConnect(channelNotConnected, null, null, null);
// } catch (CAException caex) {
// TODO m_channelPutRequester->channelPutConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null);
// }
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
int32 pendingRequest = getPendingRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
return;
}
control->startMessage((int8)11, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
if (pendingRequest & QOS_INIT)
{
// pvRequest
m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest);
}
else if (!(pendingRequest & QOS_GET))
{
// put
// serialize only what has been changed
m_bitSet->serialize(buffer, control);
m_data->serialize(buffer, control, m_bitSet);
}
stopRequest();
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
m_channelPutRequester->putDone(status);
return true;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelPutRequester->channelPutConnect(status, this, 0, 0);
return true;
}
// create data and its bitSet
m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport);
m_bitSet = new BitSet(m_data->getNumberFields());
// notify
m_channelPutRequester->channelPutConnect(okStatus, this, m_data, m_bitSet);
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (qos & QOS_GET)
{
if (!status->isSuccess())
{
m_channelPutRequester->getDone(status);
return true;
}
m_data->deserialize(payloadBuffer, transport);
m_channelPutRequester->getDone(status);
return true;
}
else
{
m_channelPutRequester->putDone(okStatus);
return true;
}
}
virtual void get() {
// TODO sync?
if (m_destroyed) {
m_channelPutRequester->getDone(destroyedStatus);
return;
}
if (!startRequest(QOS_GET)) {
m_channelPutRequester->getDone(otherRequestPendingStatus);
return;
}
// try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
// } catch (IllegalStateException ise) {
// m_channelPutRequester->getDone(channelNotConnected);
// }
}
virtual void put(bool lastRequest) {
// TODO sync?
if (m_destroyed) {
m_channelPutRequester->putDone(destroyedStatus);
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_channelPutRequester->putDone(otherRequestPendingStatus);
return;
}
//try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelPutRequester->putDone(channelNotConnected);
//}
}
virtual void resubscribeSubscription(Transport* transport) {
startRequest(QOS_INIT);
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
// TODO sync
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
delete this;
}
};
PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField);
class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender
@@ -606,54 +788,6 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender
PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelPut);
class ChannelImplPut : public ChannelPut
{
private:
ChannelPutRequester* m_channelPutRequester;
PVStructure* m_pvStructure;
BitSet* m_bitSet;
volatile bool m_first;
private:
~ChannelImplPut()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannelPut);
}
public:
ChannelImplPut(ChannelPutRequester* channelPutRequester, PVStructure *pvStructure, PVStructure *pvRequest) :
m_channelPutRequester(channelPutRequester), m_pvStructure(pvStructure),
m_bitSet(new BitSet(pvStructure->getNumberFields())), m_first(true)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut);
// TODO pvRequest
m_channelPutRequester->channelPutConnect(g_statusOK, this, m_pvStructure, m_bitSet);
}
virtual void put(bool lastRequest)
{
m_channelPutRequester->putDone(g_statusOK);
if (lastRequest)
destroy();
}
virtual void get()
{
m_channelPutRequester->getDone(g_statusOK);
}
virtual void destroy()
{
delete m_bitSet;
delete this;
}
};
@@ -1884,7 +2018,7 @@ class TestChannelImpl : public ChannelImpl {
ChannelPutRequester *channelPutRequester,
epics::pvData::PVStructure *pvRequest)
{
return new ChannelImplPut(channelPutRequester, 0, pvRequest);
return new ChannelPutImpl(this, channelPutRequester, pvRequest);
}
virtual ChannelPutGet* createChannelPutGet(
@@ -2949,14 +3083,14 @@ int main(int argc,char *argv[])
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "");
epicsThreadSleep ( 1.0 );
ChannelProcessRequesterImpl channelProcessRequester;
ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0);
epicsThreadSleep ( 1.0 );
channelProcess->process(false);
epicsThreadSleep ( 1.0 );
channelProcess->destroy();
epicsThreadSleep ( 1.0 );
ChannelGetRequesterImpl channelGetRequesterImpl;
@@ -2965,16 +3099,21 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 3.0 );
channelGet->get(false);
epicsThreadSleep ( 3.0 );
channelGet->destroy();
// TODO delete pvRequest
/*
//TODOchannelGet->destroy();
epicsThreadSleep ( 1.0 );
ChannelPutRequesterImpl channelPutRequesterImpl;
ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0);
ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest);
epicsThreadSleep ( 1.0 );
channelPut->get();
epicsThreadSleep ( 1.0 );
channelPut->put(false);
channelPut->destroy();
epicsThreadSleep ( 1.0 );
//TODOchannelPut->destroy();
// TODO delete pvRequest
/*
MonitorRequesterImpl monitorRequesterImpl;
Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0);