diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index ba9509b..a317037 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -227,7 +227,6 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess { private: ChannelProcessRequester* m_callback; - volatile bool m_initialized; PVStructure* m_pvRequest; private: @@ -239,10 +238,12 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess public: ChannelProcessRequestImpl(ChannelImpl* channel, ChannelProcessRequester* callback, PVStructure *pvRequest) : BaseRequestImpl(channel, callback), - m_callback(callback), m_initialized(false), m_pvRequest(pvRequest) + m_callback(callback), m_pvRequest(pvRequest) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess); + // TODO check for nulls!!!! + // TODO best-effort support // subscribe @@ -267,7 +268,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); buffer->putByte((int8)m_pendingRequest); - + if (pendingRequest & QOS_INIT) { // pvRequest @@ -471,6 +472,187 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet + + + + + +PVDATA_REFCOUNT_MONITOR_DEFINE(channelPut); + +class ChannelPutImpl : public BaseRequestImpl, public ChannelPut +{ + private: + ChannelPutRequester* m_channelPutRequester; + + PVStructure* m_pvRequest; + + PVStructure* m_data; + BitSet* m_bitSet; + + private: + ~ChannelPutImpl() + { + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPut); + } + + public: + ChannelPutImpl(ChannelImpl* channel, ChannelPutRequester* channelPutRequester, PVStructure *pvRequest) : + BaseRequestImpl(channel, channelPutRequester), + m_channelPutRequester(channelPutRequester), m_pvRequest(pvRequest), // TODO pvRequest + m_data(0), m_bitSet(0) + { + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPut); + + // TODO low-overhead put + // TODO best-effort put + + // subscribe +// try { + resubscribeSubscription(m_channel->checkAndGetTransport()); +// } catch (IllegalStateException ise) { +// TODO m_channelPutRequester->channelPutConnect(channelNotConnected, null, null, null); +// } catch (CAException caex) { +// TODO m_channelPutRequester->channelPutConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// } + + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)11, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + else if (!(pendingRequest & QOS_GET)) + { + // put + // serialize only what has been changed + m_bitSet->serialize(buffer, control); + m_data->serialize(buffer, control, m_bitSet); + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + m_channelPutRequester->putDone(status); + return true; + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_channelPutRequester->channelPutConnect(status, this, 0, 0); + return true; + } + + // create data and its bitSet + m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_bitSet = new BitSet(m_data->getNumberFields()); + + // notify + m_channelPutRequester->channelPutConnect(okStatus, this, m_data, m_bitSet); + return true; + } + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (qos & QOS_GET) + { + if (!status->isSuccess()) + { + m_channelPutRequester->getDone(status); + return true; + } + + m_data->deserialize(payloadBuffer, transport); + m_channelPutRequester->getDone(status); + return true; + } + else + { + m_channelPutRequester->putDone(okStatus); + return true; + } + } + + virtual void get() { + // TODO sync? + + if (m_destroyed) { + m_channelPutRequester->getDone(destroyedStatus); + return; + } + + if (!startRequest(QOS_GET)) { + m_channelPutRequester->getDone(otherRequestPendingStatus); + return; + } + + +// try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); +// } catch (IllegalStateException ise) { +// m_channelPutRequester->getDone(channelNotConnected); +// } + } + + virtual void put(bool lastRequest) { + // TODO sync? + + if (m_destroyed) { + m_channelPutRequester->putDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + m_channelPutRequester->putDone(otherRequestPendingStatus); + return; + } + + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelPutRequester->putDone(channelNotConnected); + //} + } + + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + } + + + virtual void destroy() + { + // TODO sync + if (m_data) delete m_data; + if (m_bitSet) delete m_bitSet; + delete this; + } + +}; + + + + + + + + + + PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField); class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender @@ -606,54 +788,6 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender -PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelPut); - -class ChannelImplPut : public ChannelPut -{ - private: - ChannelPutRequester* m_channelPutRequester; - PVStructure* m_pvStructure; - BitSet* m_bitSet; - volatile bool m_first; - - private: - ~ChannelImplPut() - { - PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannelPut); - } - - public: - ChannelImplPut(ChannelPutRequester* channelPutRequester, PVStructure *pvStructure, PVStructure *pvRequest) : - m_channelPutRequester(channelPutRequester), m_pvStructure(pvStructure), - m_bitSet(new BitSet(pvStructure->getNumberFields())), m_first(true) - { - PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut); - - // TODO pvRequest - m_channelPutRequester->channelPutConnect(g_statusOK, this, m_pvStructure, m_bitSet); - } - - virtual void put(bool lastRequest) - { - m_channelPutRequester->putDone(g_statusOK); - if (lastRequest) - destroy(); - } - - virtual void get() - { - m_channelPutRequester->getDone(g_statusOK); - } - - virtual void destroy() - { - delete m_bitSet; - delete this; - } - -}; - - @@ -1884,7 +2018,7 @@ class TestChannelImpl : public ChannelImpl { ChannelPutRequester *channelPutRequester, epics::pvData::PVStructure *pvRequest) { - return new ChannelImplPut(channelPutRequester, 0, pvRequest); + return new ChannelPutImpl(this, channelPutRequester, pvRequest); } virtual ChannelPutGet* createChannelPutGet( @@ -2949,14 +3083,14 @@ int main(int argc,char *argv[]) GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); - epicsThreadSleep ( 1.0 ); ChannelProcessRequesterImpl channelProcessRequester; ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); + epicsThreadSleep ( 1.0 ); channelProcess->process(false); + epicsThreadSleep ( 1.0 ); channelProcess->destroy(); - epicsThreadSleep ( 1.0 ); ChannelGetRequesterImpl channelGetRequesterImpl; @@ -2965,16 +3099,21 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 3.0 ); channelGet->get(false); epicsThreadSleep ( 3.0 ); - channelGet->destroy(); - // TODO delete pvRequest -/* + //TODOchannelGet->destroy(); + epicsThreadSleep ( 1.0 ); + ChannelPutRequesterImpl channelPutRequesterImpl; - ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0); + ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest); + epicsThreadSleep ( 1.0 ); channelPut->get(); + epicsThreadSleep ( 1.0 ); channelPut->put(false); - channelPut->destroy(); + epicsThreadSleep ( 1.0 ); + //TODOchannelPut->destroy(); + // TODO delete pvRequest +/* MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0);