From c1c391854cd274348840c246863410d4645cc085 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Tue, 25 Jan 2011 23:43:07 +0100 Subject: [PATCH] putGet impl --- testApp/remote/testRemoteClientImpl.cpp | 319 +++++++++++++++++++++++- 1 file changed, 310 insertions(+), 9 deletions(-) diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index a0297f2..4a440c8 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -400,7 +400,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { // data available if (qos & QOS_GET) - normalResponse(transport, version, payloadBuffer, qos, status); + return normalResponse(transport, version, payloadBuffer, qos, status); return true; } @@ -660,6 +660,220 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut +PVDATA_REFCOUNT_MONITOR_DEFINE(channelPutGet); + +class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet +{ + private: + ChannelPutGetRequester* m_channelPutGetRequester; + + PVStructure* m_pvRequest; + + PVStructure* m_putData; + PVStructure* m_getData; + + private: + ~ChannelPutGetImpl() + { + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPutGet); + } + + public: + ChannelPutGetImpl(ChannelImpl* channel, ChannelPutGetRequester* channelPutGetRequester, PVStructure *pvRequest) : + BaseRequestImpl(channel, channelPutGetRequester), + m_channelPutGetRequester(channelPutGetRequester), m_pvRequest(pvRequest), + //(dynamic_cast(getPVDataCreate()->createPVField(0, "", pvRequest))), + m_putData(0), m_getData(0) + { + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPutGet); + + // subscribe +// try { + resubscribeSubscription(m_channel->checkAndGetTransport()); +// } catch (IllegalStateException ise) { +// TODO m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, null, null, null); +// } catch (CAException caex) { +// TODO m_channelPutGetRequester->channelPutGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// } + + } + + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)12, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + if ((pendingRequest & QOS_INIT) == 0) + buffer->putByte((int8)pendingRequest); + + if (pendingRequest & QOS_INIT) + { + buffer->putByte((int8)QOS_INIT); + + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + else if (pendingRequest & (QOS_GET | QOS_GET_PUT)) { + // noop + } + else + { + m_putData->serialize(buffer, control); + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + // data available + // TODO we need a flag here... + return normalResponse(transport, version, payloadBuffer, qos, status); + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_channelPutGetRequester->channelPutGetConnect(status, this, 0, 0); + return true; + } + + IntrospectionRegistry* registry = transport->getIntrospectionRegistry(); + m_putData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_getData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + + // notify + m_channelPutGetRequester->channelPutGetConnect(okStatus, this, m_putData, m_getData); + return true; + } + + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (qos & QOS_GET) + { + if (!status->isSuccess()) + { + m_channelPutGetRequester->getGetDone(status); + return true; + } + + // deserialize get data + m_getData->deserialize(payloadBuffer, transport); + m_channelPutGetRequester->getGetDone(status); + return true; + } + else if (qos & QOS_GET_PUT) + { + if (!status->isSuccess()) + { + m_channelPutGetRequester->getPutDone(status); + return true; + } + + // deserialize put data + m_putData->deserialize(payloadBuffer, transport); + m_channelPutGetRequester->getPutDone(status); + return true; + } + else + { + if (!status->isSuccess()) + { + m_channelPutGetRequester->putGetDone(status); + return true; + } + + // deserialize data + m_getData->deserialize(payloadBuffer, transport); + m_channelPutGetRequester->putGetDone(status); + return true; + } + } + + + virtual void putGet(bool lastRequest) { + if (m_destroyed) { + m_channelPutGetRequester->putGetDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + m_channelPutGetRequester->putGetDone(otherRequestPendingStatus); + return; + } + +// try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); +// } catch (IllegalStateException ise) { +// m_channelPutGetRequester->putGetDone(channelNotConnected); +// } + } + + virtual void getGet() { + if (m_destroyed) { + m_channelPutGetRequester->getGetDone(destroyedStatus); + return; + } + + if (!startRequest(QOS_GET)) { + m_channelPutGetRequester->getGetDone(otherRequestPendingStatus); + return; + } + + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelPutGetRequester->getGetDone(channelNotConnected); + //} + } + + virtual void getPut() { + if (m_destroyed) { + m_channelPutGetRequester->getPutDone(destroyedStatus); + return; + } + + if (!startRequest(QOS_GET_PUT)) { + m_channelPutGetRequester->getPutDone(otherRequestPendingStatus); + return; + } + + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelPutGetRequester->getPutDone(channelNotConnected); + //} + } + + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + } + + + virtual void destroy() + { + BaseRequestImpl::destroy(); + // TODO sync + if (m_putData) delete m_putData; + if (m_getData) delete m_getData; + if (m_pvRequest) delete m_pvRequest; + delete this; + } + +}; + + + + + + PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField); @@ -882,8 +1096,7 @@ public MonitorElement virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { // data available // TODO if (qos & QOS_GET) - normalResponse(transport, version, payloadBuffer, qos, status); - return true; + return normalResponse(transport, version, payloadBuffer, qos, status); } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { @@ -2215,8 +2428,7 @@ class TestChannelImpl : public ChannelImpl { ChannelPutGetRequester *channelPutGetRequester, epics::pvData::PVStructure *pvRequest) { - // TODO return new ChannelPutGetImpl(this, channelPutGetRequester, pvRequest); - return 0; + return new ChannelPutGetImpl(this, channelPutGetRequester, pvRequest); } virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester, @@ -3180,6 +3392,83 @@ class ChannelPutRequesterImpl : public ChannelPutRequester }; +class ChannelPutGetRequesterImpl : public ChannelPutGetRequester +{ + ChannelPutGet *m_channelPutGet; + epics::pvData::PVStructure *m_putData; + epics::pvData::PVStructure *m_getData; + + virtual String getRequesterName() + { + return "ChannelGetPutRequesterImpl"; + }; + + virtual void message(String message,MessageType messageType) + { + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + } + + virtual void channelPutGetConnect(epics::pvData::Status *status,ChannelPutGet *channelPutGet, + epics::pvData::PVStructure *putData,epics::pvData::PVStructure *getData) + { + std::cout << "channelGetPutConnect(" << status->toString() << ")" << std::endl; + // TODO sync + m_putData = putData; + m_getData = getData; + + if (m_putData) + { + String str; + m_putData->toString(&str); + std::cout << str; + std::cout << std::endl; + } + if (m_getData) + { + String str; + m_getData->toString(&str); + std::cout << str; + std::cout << std::endl; + } + } + + virtual void getGetDone(epics::pvData::Status *status) + { + std::cout << "getGetDone(" << status->toString() << ")" << std::endl; + if (m_getData) + { + String str; + m_getData->toString(&str); + std::cout << str; + std::cout << std::endl; + } + } + + virtual void getPutDone(epics::pvData::Status *status) + { + std::cout << "getPutDone(" << status->toString() << ")" << std::endl; + if (m_putData) + { + String str; + m_putData->toString(&str); + std::cout << str; + std::cout << std::endl; + } + } + + virtual void putGetDone(epics::pvData::Status *status) + { + std::cout << "putGetDone(" << status->toString() << ")" << std::endl; + if (m_putData) + { + String str; + m_putData->toString(&str); + std::cout << str; + std::cout << std::endl; + } + } + +}; class MonitorRequesterImpl : public MonitorRequester { @@ -3291,7 +3580,7 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); ChannelGetRequesterImpl channelGetRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&channelGetRequesterImpl); + PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); epicsThreadSleep ( 3.0 ); channelGet->get(false); @@ -3301,7 +3590,7 @@ int main(int argc,char *argv[]) ChannelPutRequesterImpl channelPutRequesterImpl; - pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&channelPutRequesterImpl); + pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelPutRequesterImpl); ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest); epicsThreadSleep ( 1.0 ); channelPut->get(); @@ -3310,10 +3599,22 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channelPut->destroy(); */ + ChannelPutGetRequesterImpl channelPutGetRequesterImpl; + PVStructure* pvRequest = getCreateRequest()->createRequest("putField(value,timeStamp)getField(timeStamp)",&channelPutGetRequesterImpl); + ChannelPutGet* channelPutGet = channel->createChannelPutGet(&channelPutGetRequesterImpl, pvRequest); + epicsThreadSleep ( 1.0 ); + channelPutGet->getGet(); + epicsThreadSleep ( 1.0 ); + channelPutGet->getPut(); + epicsThreadSleep ( 1.0 ); + channelPutGet->putGet(false); + epicsThreadSleep ( 1.0 ); + channelPutGet->destroy(); +/* MonitorRequesterImpl monitorRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&monitorRequesterImpl); + PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&monitorRequesterImpl); Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, pvRequest); epicsThreadSleep( 1.0 ); @@ -3331,7 +3632,7 @@ int main(int argc,char *argv[]) monitor->destroy(); - +*/ epicsThreadSleep ( 3.0 ); printf("Destroying channel... \n"); channel->destroy();