From 7d650f3d680ec0400b3c19f8878da3af51021468 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 26 Jan 2011 10:46:51 +0100 Subject: [PATCH] channelRPC done --- testApp/remote/testRemoteClientImpl.cpp | 222 +++++++++++++++++++++++- 1 file changed, 216 insertions(+), 6 deletions(-) diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 4a440c8..c3d419b 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -876,6 +876,157 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet + + +PVDATA_REFCOUNT_MONITOR_DEFINE(channelRPC); + +class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC +{ + private: + ChannelRPCRequester* m_channelRPCRequester; + + PVStructure* m_pvRequest; + + PVStructure* m_data; + BitSet* m_bitSet; + + private: + ~ChannelRPCImpl() + { + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelRPC); + } + + public: + ChannelRPCImpl(ChannelImpl* channel, ChannelRPCRequester* channelRPCRequester, PVStructure *pvRequest) : + BaseRequestImpl(channel, channelRPCRequester), + m_channelRPCRequester(channelRPCRequester), m_pvRequest(pvRequest), + //(dynamic_cast(getPVDataCreate()->createPVField(0, "", pvRequest))), + m_data(0), m_bitSet(0) + { + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelRPC); + + // subscribe +// try { + resubscribeSubscription(m_channel->checkAndGetTransport()); +// } catch (IllegalStateException ise) { +// TODO m_channelRPCRequester->channelRPCConnect(channelNotConnected, null, null, null); +// } catch (CAException caex) { +// TODO m_channelRPCRequester->channelRPCConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// } + + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)20, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + if ((m_pendingRequest & QOS_INIT) == 0) + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { + buffer->putByte((int8)QOS_INIT); + + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + else + { + m_bitSet->serialize(buffer, control); + m_data->serialize(buffer, control, m_bitSet); + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + // data available + // TODO we need a flag here... + return normalResponse(transport, version, payloadBuffer, qos, status); + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_channelRPCRequester->channelRPCConnect(status, this, 0, 0); + return true; + } + + // create data and its bitSet + m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_bitSet = new BitSet(m_data->getNumberFields()); + + // notify + m_channelRPCRequester->channelRPCConnect(okStatus, this, m_data, m_bitSet); + return true; + } + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_channelRPCRequester->requestDone(status, 0); + return true; + } + + auto_ptr response(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport)); + m_channelRPCRequester->requestDone(okStatus, response.get()); + return true; + } + + virtual void request(bool lastRequest) { + // TODO sync? + + if (m_destroyed) { + m_channelRPCRequester->requestDone(destroyedStatus, 0); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + m_channelRPCRequester->requestDone(otherRequestPendingStatus, 0); + return; + } + + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelRPCRequester->requestDone(channelNotConnected, 0); + //} + } + + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + } + + + virtual void destroy() + { + BaseRequestImpl::destroy(); + // TODO sync + if (m_data) delete m_data; + if (m_bitSet) delete m_bitSet; + if (m_pvRequest) delete m_pvRequest; + delete this; + } + +}; + + + + + + + + + + PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField); // NOTE: this instance is not returned as Request, so it must self-destruct @@ -2434,8 +2585,7 @@ class TestChannelImpl : public ChannelImpl { virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester, epics::pvData::PVStructure *pvRequest) { - // TODO - return 0; + return new ChannelRPCImpl(this, channelRPCRequester, pvRequest); } virtual epics::pvData::Monitor* createMonitor( @@ -3413,6 +3563,7 @@ class ChannelPutGetRequesterImpl : public ChannelPutGetRequester { std::cout << "channelGetPutConnect(" << status->toString() << ")" << std::endl; // TODO sync + m_channelPutGet = channelPutGet; m_putData = putData; m_getData = getData; @@ -3470,6 +3621,54 @@ class ChannelPutGetRequesterImpl : public ChannelPutGetRequester }; + +class ChannelRPCRequesterImpl : public ChannelRPCRequester +{ + ChannelRPC *m_channelRPC; + epics::pvData::PVStructure *m_pvStructure; + epics::pvData::BitSet *m_bitSet; + + virtual String getRequesterName() + { + return "ChannelRPCRequesterImpl"; + }; + + virtual void message(String message,MessageType messageType) + { + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + } + + virtual void channelRPCConnect(epics::pvData::Status *status,ChannelRPC *channelRPC, + epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet) + { + std::cout << "channelRPCConnect(" << status->toString() << ")" << std::endl; + if (pvStructure) + { + String st; + pvStructure->toString(&st); + std::cout << st << std::endl; + } + + // TODO sync + m_channelRPC = channelRPC; + m_pvStructure = pvStructure; + m_bitSet = bitSet; + } + + virtual void requestDone(epics::pvData::Status *status,epics::pvData::PVStructure *pvResponse) + { + std::cout << "requestDone(" << status->toString() << ")" << std::endl; + if (pvResponse) + { + String str; + pvResponse->toString(&str); + std::cout << str; + std::cout << std::endl; + } + } +}; + + class MonitorRequesterImpl : public MonitorRequester { virtual String getRequesterName() @@ -3566,6 +3765,8 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channel->printInfo(); + + PVStructure* pvRequest; /* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); @@ -3580,7 +3781,7 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); ChannelGetRequesterImpl channelGetRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelGetRequesterImpl); + pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); epicsThreadSleep ( 3.0 ); channelGet->get(false); @@ -3598,9 +3799,9 @@ int main(int argc,char *argv[]) channelPut->put(false); epicsThreadSleep ( 1.0 ); channelPut->destroy(); -*/ + ChannelPutGetRequesterImpl channelPutGetRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("putField(value,timeStamp)getField(timeStamp)",&channelPutGetRequesterImpl); + pvRequest = getCreateRequest()->createRequest("putField(value,timeStamp)getField(timeStamp)",&channelPutGetRequesterImpl); ChannelPutGet* channelPutGet = channel->createChannelPutGet(&channelPutGetRequesterImpl, pvRequest); epicsThreadSleep ( 1.0 ); channelPutGet->getGet(); @@ -3610,8 +3811,17 @@ int main(int argc,char *argv[]) channelPutGet->putGet(false); epicsThreadSleep ( 1.0 ); channelPutGet->destroy(); -/* +*/ + ChannelRPCRequesterImpl channelRPCRequesterImpl; + pvRequest = getCreateRequest()->createRequest("record[]field(arguments)",&channelRPCRequesterImpl); + ChannelRPC* channelRPC = channel->createChannelRPC(&channelRPCRequesterImpl, pvRequest); + epicsThreadSleep ( 1.0 ); + channelRPC->request(false); + epicsThreadSleep ( 1.0 ); + channelRPC->destroy(); + +/* MonitorRequesterImpl monitorRequesterImpl; PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&monitorRequesterImpl);