From ff25642cd189cc691e36a158755e22f4d52db2fa Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 19 Jan 2011 20:01:56 +0100 Subject: [PATCH] process, getField implementation --- QtC-pvAccess.files | 73 +++- QtC-pvAccess.includes | 3 +- pvAccessApp/factory/CreateRequestFactory.cpp | 16 +- testApp/remote/testRemoteClientImpl.cpp | 371 ++++++++++++------- 4 files changed, 320 insertions(+), 143 deletions(-) diff --git a/QtC-pvAccess.files b/QtC-pvAccess.files index e0ea208..5ba5e53 100644 --- a/QtC-pvAccess.files +++ b/QtC-pvAccess.files @@ -1,3 +1,72 @@ +pvAccessApp/ca/version.cpp +pvAccessApp/client/pvAccess.cpp +pvAccessApp/factory/ChannelAccessFactory.cpp +pvAccessApp/factory/CreateRequestFactory.cpp +pvAccessApp/remote/abstractResponseHandler.cpp +pvAccessApp/remote/beaconEmitter.cpp +pvAccessApp/remote/beaconHandler.cpp +pvAccessApp/remote/beaconServerStatusProvider.cpp +pvAccessApp/remote/blockingClientTCPTransport.cpp +pvAccessApp/remote/blockingServerTCPTransport.cpp +pvAccessApp/remote/blockingTCPAcceptor.cpp +pvAccessApp/remote/blockingTCPConnector.cpp +pvAccessApp/remote/blockingTCPTransport.cpp +pvAccessApp/remote/blockingUDPConnector.cpp +pvAccessApp/remote/blockingUDPTransport.cpp +pvAccessApp/remote/channelSearchManager.cpp +pvAccessApp/server/responseHandlers.cpp +pvAccessApp/utils/configuration.cpp +pvAccessApp/utils/hexDump.cpp +pvAccessApp/utils/inetAddressUtil.cpp +pvAccessApp/utils/introspectionRegistry.cpp +pvAccessApp/utils/logger.cpp +pvAccessApp/utils/namedLockPattern.cpp +pvAccessApp/utils/referenceCountingLock.cpp +pvAccessApp/utils/transportRegistry.cpp +pvAccessApp/utils/wildcharMatcher.cpp +testApp/client/MockClientImpl.cpp +testApp/client/testChannelAccessFactory.cpp +testApp/client/testCreateRequest.cpp +testApp/client/testMockClient.cpp +testApp/remote/testBeaconEmitter.cpp +testApp/remote/testBeaconHandler.cpp +testApp/remote/testBlockingTCPClnt.cpp +testApp/remote/testBlockingTCPSrv.cpp +testApp/remote/testBlockingUDPClnt.cpp +testApp/remote/testBlockingUDPSrv.cpp +testApp/remote/testChannelSearchManager.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/utils/arrayFIFOTest.cpp +testApp/utils/configurationTest.cpp +testApp/utils/growingCircularBufferTest.cpp +testApp/utils/hexDumpTest.cpp +testApp/utils/inetAddressUtilsTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/namedLockPatternTest.cpp +testApp/utils/transportRegistryTest.cpp +testApp/utils/wildcharMatcherTest.cpp +pvAccessApp/ca/caConstants.h +pvAccessApp/ca/version.h pvAccessApp/client/pvAccess.h -pvAccessApp/client/ChannelAccessFactory.cpp -pvAccessApp/testClient/testChannelAccessFactory.cpp +pvAccessApp/remote/beaconEmitter.h +pvAccessApp/remote/beaconHandler.h +pvAccessApp/remote/beaconServerStatusProvider.h +pvAccessApp/remote/blockingTCP.h +pvAccessApp/remote/blockingUDP.h +pvAccessApp/remote/channelSearchManager.h +pvAccessApp/remote/remote.h +pvAccessApp/remoteClient/clientContextImpl.h +pvAccessApp/server/responseHandlers.h +pvAccessApp/server/serverContext.h +pvAccessApp/utils/arrayFIFO.h +pvAccessApp/utils/configuration.h +pvAccessApp/utils/growingCircularBuffer.h +pvAccessApp/utils/hexDump.h +pvAccessApp/utils/inetAddressUtil.h +pvAccessApp/utils/introspectionRegistry.h +pvAccessApp/utils/logger.h +pvAccessApp/utils/namedLockPattern.h +pvAccessApp/utils/referenceCountingLock.h +pvAccessApp/utils/transportRegistry.h +pvAccessApp/utils/wildcharMatcher.h diff --git a/QtC-pvAccess.includes b/QtC-pvAccess.includes index 3ee20f7..267e3ef 100644 --- a/QtC-pvAccess.includes +++ b/QtC-pvAccess.includes @@ -1 +1,2 @@ -pvAccessApp/client \ No newline at end of file +include +../pvDataCPP/include diff --git a/pvAccessApp/factory/CreateRequestFactory.cpp b/pvAccessApp/factory/CreateRequestFactory.cpp index ce9e05d..23fb95e 100644 --- a/pvAccessApp/factory/CreateRequestFactory.cpp +++ b/pvAccessApp/factory/CreateRequestFactory.cpp @@ -142,10 +142,13 @@ class CreateRequestImpl : public CreateRequest { requester->message("illegal option", errorMessage); return false; } - - PVString* pvStringField = static_cast(getPVDataCreate()->createPVScalar(pvParent, token.substr(0, equalsPos), pvString)); - pvStringField->put(token.substr(equalsPos+1)); - pvParent->appendPVField(pvStringField); + + if (equalsPos != std::string::npos) + { + PVString* pvStringField = static_cast(getPVDataCreate()->createPVScalar(pvParent, token.substr(0, equalsPos), pvString)); + pvStringField->put(token.substr(equalsPos+1)); + pvParent->appendPVField(pvStringField); + } } return true; } @@ -160,13 +163,14 @@ class CreateRequestImpl : public CreateRequest { { return getPVDataCreate()->createPVStructure(0, emptyString, 0); } - + size_t offsetRecord = request.find("record["); size_t offsetField = request.find("field("); size_t offsetPutField = request.find("putField("); size_t offsetGetField = request.find("getField("); PVStructure* pvStructure = getPVDataCreate()->createPVStructure(0, emptyString, 0); + if (offsetRecord != std::string::npos) { size_t offsetBegin = request.find('[', offsetRecord); size_t offsetEnd = request.find(']', offsetBegin); @@ -199,7 +203,7 @@ class CreateRequestImpl : public CreateRequest { delete pvStructure; return 0; } - pvStructure->appendPVField(pvStruct); + pvStructure->appendPVField(pvStruct); } if (offsetPutField != std::string::npos) { size_t offsetBegin = request.find('(', offsetPutField); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index b92d1f1..ba9509b 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -221,135 +221,102 @@ public: -PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelProcess); +PVDATA_REFCOUNT_MONITOR_DEFINE(channelProcess); -class ChannelImplProcess : public ChannelProcess +class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess { private: - ChannelProcessRequester* m_channelProcessRequester; - PVStructure* m_pvStructure; - PVScalar* m_valueField; + ChannelProcessRequester* m_callback; + volatile bool m_initialized; + PVStructure* m_pvRequest; private: - ~ChannelImplProcess() + ~ChannelProcessRequestImpl() { - PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannelProcess); + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelProcess); } public: - ChannelImplProcess(ChannelProcessRequester* channelProcessRequester, PVStructure *pvStructure, PVStructure *pvRequest) : - m_channelProcessRequester(channelProcessRequester), m_pvStructure(pvStructure) + ChannelProcessRequestImpl(ChannelImpl* channel, ChannelProcessRequester* callback, PVStructure *pvRequest) : + BaseRequestImpl(channel, callback), + m_callback(callback), m_initialized(false), m_pvRequest(pvRequest) { - PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelProcess); + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess); - PVField* field = pvStructure->getSubField(String("value")); - if (field == 0) - { - Status* noValueFieldStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "no 'value' field"); - m_channelProcessRequester->channelProcessConnect(noValueFieldStatus, this); - delete noValueFieldStatus; + // TODO best-effort support - // NOTE client must destroy this instance... - // do not access any fields and return ASAP - return; - } - - if (field->getField()->getType() != scalar) - { - Status* notAScalarStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "'value' field not scalar type"); - m_channelProcessRequester->channelProcessConnect(notAScalarStatus, this); - delete notAScalarStatus; - - // NOTE client must destroy this instance…. - // do not access any fields and return ASAP - return; - } - - m_valueField = static_cast(field); - - // TODO pvRequest - m_channelProcessRequester->channelProcessConnect(g_statusOK, this); + // subscribe +// try { + resubscribeSubscription(channel->checkAndGetTransport()); +/* } catch (IllegalStateException ise) { + callback.channelProcessConnect(channelNotConnected, null); + } catch (CAException e) { + callback.channelProcessConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", e), null); + }*/ } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)16, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + m_callback->processDone(status); + return true; + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + m_callback->channelProcessConnect(status, this); + return true; + } + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + m_callback->processDone(status); + return true; + } + virtual void process(bool lastRequest) { - switch (m_valueField->getScalar()->getScalarType()) - { - case pvBoolean: - { - // negate - PVBoolean *pvBoolean = static_cast(m_valueField); - pvBoolean->put(!pvBoolean->get()); - break; - } - case pvByte: - { - // increment by one - PVByte *pvByte = static_cast(m_valueField); - pvByte->put(pvByte->get() + 1); - break; - } - case pvShort: - { - // increment by one - PVShort *pvShort = static_cast(m_valueField); - pvShort->put(pvShort->get() + 1); - break; - } - case pvInt: - { - // increment by one - PVInt *pvInt = static_cast(m_valueField); - pvInt->put(pvInt->get() + 1); - break; - } - case pvLong: - { - // increment by one - PVLong *pvLong = static_cast(m_valueField); - pvLong->put(pvLong->get() + 1); - break; - } - case pvFloat: - { - // increment by one - PVFloat *pvFloat = static_cast(m_valueField); - pvFloat->put(pvFloat->get() + 1.0f); - break; - } - case pvDouble: - { - // increment by one - PVDouble *pvDouble = static_cast(m_valueField); - pvDouble->put(pvDouble->get() + 1.0); - break; - } - case pvString: - { - // increment by one - PVString *pvString = static_cast(m_valueField); - String val = pvString->get(); - if (val.empty()) - pvString->put("gen0"); - else - { - char c = val[0]; - c++; - pvString->put("gen" + c); - } - break; - } - default: - // noop - break; - - } - m_channelProcessRequester->processDone(g_statusOK); - - if (lastRequest) - destroy(); + // TODO sync + if (m_destroyed) { + m_callback->processDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + m_callback->processDone(otherRequestPendingStatus); + return; + } + + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + // m_callback->processDone(channelNotConnected); + //} } + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + } + virtual void destroy() { delete this; @@ -362,12 +329,12 @@ class ChannelImplProcess : public ChannelProcess + PVDATA_REFCOUNT_MONITOR_DEFINE(channelGet); -class ChannelImplGet : public BaseRequestImpl, public ChannelGet +class ChannelGetImpl : public BaseRequestImpl, public ChannelGet { private: - ChannelImpl* m_channel; ChannelGetRequester* m_channelGetRequester; PVStructure* m_pvRequest; @@ -376,15 +343,15 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet BitSet* m_bitSet; private: - ~ChannelImplGet() + ~ChannelGetImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGet); } public: - ChannelImplGet(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) : + ChannelGetImpl(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) : BaseRequestImpl(channel, channelGetRequester), - m_channel(channel), m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest + m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest m_data(0), m_bitSet(0) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet); @@ -491,7 +458,9 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet virtual void destroy() { -// delete m_bitSet; + // TODO sync + if (m_data) delete m_data; + if (m_bitSet) delete m_bitSet; delete this; } @@ -502,6 +471,139 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet +PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField); + +class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender +{ + private: + ChannelImpl* m_channel; + ClientContextImpl* m_context; + pvAccessID m_ioid; + GetFieldRequester* m_callback; + String m_subField; + Mutex m_mutex; + bool m_destroyed; + + private: + ~ChannelGetFieldRequestImpl() + { + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGetField); + } + + + public: + ChannelGetFieldRequestImpl(ChannelImpl* channel, GetFieldRequester* callback, String subField) : + m_channel(channel), m_context(channel->getContext()), + m_callback(callback), m_subField(subField), + m_destroyed(false) + { + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGetField); + + // register response request + m_ioid = m_context->registerResponseRequest(this); + channel->registerResponseRequest(this); + + // TODO + // enqueue send request + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + // callback.getDone(BaseRequestImpl.channelNotConnected, null); + //} + } + + Requester* getRequester() { + return m_callback; + } + + pvAccessID getIOID() { + return m_ioid; + } + + virtual void lock() { + // noop + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage((int8)17, 8); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + SerializeHelper::serializeString(m_subField, buffer, control); + } + + + virtual void cancel() { + destroy(); + // TODO notify? + } + + virtual void timeout() { + cancel(); + } + + void reportStatus(Status* status) { + // destroy, since channel (parent) was destroyed + if (status == ChannelImpl::channelDestroyed) + destroy(); + // TODO notify? + } + + virtual void unlock() { + // noop + } + + virtual void destroy() + { + { + Lock guard(&m_mutex); + if (m_destroyed) + return; + m_destroyed = true; + } + + // unregister response request + m_context->unregisterResponseRequest(this); + m_channel->unregisterResponseRequest(this); + + delete this; + } + + virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { +// TODO? +// try +// { + Status* status = statusCreate->deserializeStatus(payloadBuffer, transport); + if (status->isSuccess()) + { + // deserialize Field... + const Field* field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport); + m_callback->getDone(status, field); + } + else + { + m_callback->getDone(status, 0); + } + + // TODO + if (status != okStatus) + delete status; +// } // TODO guard callback +// finally +// { + // always cancel request +// cancel(); +// } + + } + + +}; + + + + + + PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelPut); @@ -1223,9 +1325,6 @@ class TestChannelImpl : public ChannelImpl { */ bool m_issueCreateMessage; - // TODO mock - PVStructure* m_pvStructure; - private: ~TestChannelImpl() { @@ -1273,7 +1372,6 @@ class TestChannelImpl : public ChannelImpl { virtual void destroy() { if (m_addresses) delete m_addresses; - delete m_pvStructure; delete this; }; @@ -1765,28 +1863,28 @@ class TestChannelImpl : public ChannelImpl { virtual void getField(GetFieldRequester *requester,epics::pvData::String subField) { - requester->getDone(g_statusOK,m_pvStructure->getSubField(subField)->getField()); + new ChannelGetFieldRequestImpl(this, requester, subField); } virtual ChannelProcess* createChannelProcess( ChannelProcessRequester *channelProcessRequester, epics::pvData::PVStructure *pvRequest) { - return new ChannelImplProcess(channelProcessRequester, m_pvStructure, pvRequest); + return new ChannelProcessRequestImpl(this, channelProcessRequester, pvRequest); } virtual ChannelGet* createChannelGet( ChannelGetRequester *channelGetRequester, epics::pvData::PVStructure *pvRequest) { - return new ChannelImplGet(this, channelGetRequester, pvRequest); + return new ChannelGetImpl(this, channelGetRequester, pvRequest); } virtual ChannelPut* createChannelPut( ChannelPutRequester *channelPutRequester, epics::pvData::PVStructure *pvRequest) { - return new ChannelImplPut(channelPutRequester, m_pvStructure, pvRequest); + return new ChannelImplPut(channelPutRequester, 0, pvRequest); } virtual ChannelPutGet* createChannelPutGet( @@ -1808,7 +1906,7 @@ class TestChannelImpl : public ChannelImpl { epics::pvData::MonitorRequester *monitorRequester, epics::pvData::PVStructure *pvRequest) { - return new MockMonitor(monitorRequester, m_pvStructure, pvRequest); + return new MockMonitor(monitorRequester, 0, pvRequest); } virtual ChannelArray* createChannelArray( @@ -2114,6 +2212,7 @@ class TestChannelImpl : public ChannelImpl { listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, CA_DEFAULT_PRIORITY); + BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, broadcastAddresses, true); // undefined address @@ -2845,12 +2944,21 @@ int main(int argc,char *argv[]) ChannelRequesterImpl channelRequester; Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester); channel->printInfo(); - - //GetFieldRequesterImpl getFieldRequesterImpl; - //channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); epicsThreadSleep ( 3.0 ); + GetFieldRequesterImpl getFieldRequesterImpl; + channel->getField(&getFieldRequesterImpl, ""); + + epicsThreadSleep ( 1.0 ); + + ChannelProcessRequesterImpl channelProcessRequester; + ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); + channelProcess->process(false); + channelProcess->destroy(); + + epicsThreadSleep ( 1.0 ); + ChannelGetRequesterImpl channelGetRequesterImpl; PVStructure* pvRequest = getCreateRequest()->createRequest("field(timeStamp,value)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); @@ -2875,11 +2983,6 @@ int main(int argc,char *argv[]) delete status; - ChannelProcessRequesterImpl channelProcessRequester; - ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); - channelProcess->process(false); - channelProcess->destroy(); - status = monitor->stop(); std::cout << "monitor->stop() = " << status->toString() << std::endl;