diff --git a/pvAccessApp/client/pvAccess.h b/pvAccessApp/client/pvAccess.h index ac9ea13..11fea9d 100644 --- a/pvAccessApp/client/pvAccess.h +++ b/pvAccessApp/client/pvAccess.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace epics { namespace pvAccess { diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h index a83e644..af43709 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.h +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -35,6 +35,7 @@ namespace epics { virtual void registerResponseRequest(ResponseRequest* responseRequest) = 0; virtual void unregisterResponseRequest(ResponseRequest* responseRequest) = 0; virtual Transport* checkAndGetTransport() = 0; + virtual Transport* getTransport() = 0; static Status* channelDestroyed; static Status* channelDisconnected; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index f3fc02d..7f75082 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -32,7 +32,6 @@ static Status* g_statusOK = getStatusCreate()->getStatusOK(); - static StatusCreate* statusCreate = getStatusCreate(); static Status* okStatus = g_statusOK; static Status* destroyedStatus = statusCreate->createStatus(STATUSTYPE_ERROR, "request destroyed"); @@ -40,6 +39,10 @@ static Status* g_statusOK = getStatusCreate()->getStatusOK(); static Status* otherRequestPendingStatus = statusCreate->createStatus(STATUSTYPE_ERROR, "other request pending"); static PVDataCreate* pvDataCreate = getPVDataCreate(); +// TODO +Status* ChannelImpl::channelDestroyed = statusCreate->createStatus(STATUSTYPE_WARNING, "channel destroyed"); +Status* ChannelImpl::channelDisconnected = statusCreate->createStatus(STATUSTYPE_WARNING, "channel disconnected");; + /** * Base channel request. * @author Matej Sekoranja @@ -110,9 +113,9 @@ public: return m_ioid; } - virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status); - virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status); - virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status); + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) = 0; + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) = 0; + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) = 0; virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { // TODO? @@ -362,7 +365,7 @@ class ChannelImplProcess : public ChannelProcess PVDATA_REFCOUNT_MONITOR_DEFINE(channelGet); -class ChannelImplGet : public ChannelGet +class ChannelImplGet : public BaseRequestImpl, public ChannelGet { private: ChannelImpl* m_channel; @@ -381,22 +384,111 @@ class ChannelImplGet : public ChannelGet public: ChannelImplGet(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) : - m_channel(channel), m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), + BaseRequestImpl(channel, channelGetRequester), + m_channel(channel), m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest m_data(0), m_bitSet(0) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet); - // TODO pvRequest - m_channelGetRequester->channelGetConnect(g_statusOK, this, m_data, m_bitSet); + // TODO immediate get, i.e. get data with init message + // TODO one-time get, i.e. immediate get + lastRequest + + // subscribe +// try { + resubscribeSubscription(m_channel->checkAndGetTransport()); +// } catch (IllegalStateException ise) { +// TODO m_channelGetRequester->channelGetConnect(channelNotConnected, null, null, null); +// } catch (CAException caex) { +// TODO m_channelGetRequester->channelGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// } + } - virtual void get(bool lastRequest) - { - m_channelGetRequester->getDone(g_statusOK); + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)10, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + // data available + if (qos & QOS_GET) + normalResponse(transport, version, payloadBuffer, qos, status); + return true; + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_channelGetRequester->channelGetConnect(status, this, 0, 0); + return true; + } + + // create data and its bitSet + m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_bitSet = new BitSet(m_data->getNumberFields()); + + // notify + m_channelGetRequester->channelGetConnect(okStatus, this, m_data, m_bitSet); + return true; + } + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_channelGetRequester->getDone(status); + return true; + } + + // deserialize bitSet and data + m_bitSet->deserialize(payloadBuffer, transport); + m_data->deserialize(payloadBuffer, transport, m_bitSet); + + m_channelGetRequester->getDone(okStatus); + return true; + } + + virtual void get(bool lastRequest) { + // TODO sync? + + if (m_destroyed) { + m_channelGetRequester->getDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) { + m_channelGetRequester->getDone(otherRequestPendingStatus); + return; + } + + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelGetRequester->getDone(channelNotConnected); + //} + } + + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + } - if (lastRequest) - destroy(); - } virtual void destroy() { @@ -1552,6 +1644,12 @@ class TestChannelImpl : public ChannelImpl { return m_transport; // TODO transport can be 0 !!!!!!!!!! } + virtual Transport* getTransport() + { + Lock guard(&m_channelMutex); + return m_transport; + } + virtual void transportResponsive(Transport* transport) { Lock guard(&m_channelMutex); if (m_connectionState == DISCONNECTED) @@ -2741,15 +2839,19 @@ int main(int argc,char *argv[]) ChannelRequesterImpl channelRequester; Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester); channel->printInfo(); - /* - GetFieldRequesterImpl getFieldRequesterImpl; - channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); + + //GetFieldRequesterImpl getFieldRequesterImpl; + //channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); + + epicsThreadSleep ( 3.0 ); ChannelGetRequesterImpl channelGetRequesterImpl; ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, 0); + epicsThreadSleep ( 3.0 ); channelGet->get(false); + epicsThreadSleep ( 3.0 ); channelGet->destroy(); - +/* ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0); channelPut->get(); @@ -2778,7 +2880,7 @@ int main(int argc,char *argv[]) monitor->destroy(); */ - epicsThreadSleep ( 100.0 ); + epicsThreadSleep ( 20.0 ); channel->destroy(); context->destroy();