From ff25642cd189cc691e36a158755e22f4d52db2fa Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 19 Jan 2011 20:01:56 +0100 Subject: [PATCH 1/7] process, getField implementation --- QtC-pvAccess.files | 73 +++- QtC-pvAccess.includes | 3 +- pvAccessApp/factory/CreateRequestFactory.cpp | 16 +- testApp/remote/testRemoteClientImpl.cpp | 371 ++++++++++++------- 4 files changed, 320 insertions(+), 143 deletions(-) diff --git a/QtC-pvAccess.files b/QtC-pvAccess.files index e0ea208..5ba5e53 100644 --- a/QtC-pvAccess.files +++ b/QtC-pvAccess.files @@ -1,3 +1,72 @@ +pvAccessApp/ca/version.cpp +pvAccessApp/client/pvAccess.cpp +pvAccessApp/factory/ChannelAccessFactory.cpp +pvAccessApp/factory/CreateRequestFactory.cpp +pvAccessApp/remote/abstractResponseHandler.cpp +pvAccessApp/remote/beaconEmitter.cpp +pvAccessApp/remote/beaconHandler.cpp +pvAccessApp/remote/beaconServerStatusProvider.cpp +pvAccessApp/remote/blockingClientTCPTransport.cpp +pvAccessApp/remote/blockingServerTCPTransport.cpp +pvAccessApp/remote/blockingTCPAcceptor.cpp +pvAccessApp/remote/blockingTCPConnector.cpp +pvAccessApp/remote/blockingTCPTransport.cpp +pvAccessApp/remote/blockingUDPConnector.cpp +pvAccessApp/remote/blockingUDPTransport.cpp +pvAccessApp/remote/channelSearchManager.cpp +pvAccessApp/server/responseHandlers.cpp +pvAccessApp/utils/configuration.cpp +pvAccessApp/utils/hexDump.cpp +pvAccessApp/utils/inetAddressUtil.cpp +pvAccessApp/utils/introspectionRegistry.cpp +pvAccessApp/utils/logger.cpp +pvAccessApp/utils/namedLockPattern.cpp +pvAccessApp/utils/referenceCountingLock.cpp +pvAccessApp/utils/transportRegistry.cpp +pvAccessApp/utils/wildcharMatcher.cpp +testApp/client/MockClientImpl.cpp +testApp/client/testChannelAccessFactory.cpp +testApp/client/testCreateRequest.cpp +testApp/client/testMockClient.cpp +testApp/remote/testBeaconEmitter.cpp +testApp/remote/testBeaconHandler.cpp +testApp/remote/testBlockingTCPClnt.cpp +testApp/remote/testBlockingTCPSrv.cpp +testApp/remote/testBlockingUDPClnt.cpp +testApp/remote/testBlockingUDPSrv.cpp +testApp/remote/testChannelSearchManager.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/utils/arrayFIFOTest.cpp +testApp/utils/configurationTest.cpp +testApp/utils/growingCircularBufferTest.cpp +testApp/utils/hexDumpTest.cpp +testApp/utils/inetAddressUtilsTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/namedLockPatternTest.cpp +testApp/utils/transportRegistryTest.cpp +testApp/utils/wildcharMatcherTest.cpp +pvAccessApp/ca/caConstants.h +pvAccessApp/ca/version.h pvAccessApp/client/pvAccess.h -pvAccessApp/client/ChannelAccessFactory.cpp -pvAccessApp/testClient/testChannelAccessFactory.cpp +pvAccessApp/remote/beaconEmitter.h +pvAccessApp/remote/beaconHandler.h +pvAccessApp/remote/beaconServerStatusProvider.h +pvAccessApp/remote/blockingTCP.h +pvAccessApp/remote/blockingUDP.h +pvAccessApp/remote/channelSearchManager.h +pvAccessApp/remote/remote.h +pvAccessApp/remoteClient/clientContextImpl.h +pvAccessApp/server/responseHandlers.h +pvAccessApp/server/serverContext.h +pvAccessApp/utils/arrayFIFO.h +pvAccessApp/utils/configuration.h +pvAccessApp/utils/growingCircularBuffer.h +pvAccessApp/utils/hexDump.h +pvAccessApp/utils/inetAddressUtil.h +pvAccessApp/utils/introspectionRegistry.h +pvAccessApp/utils/logger.h +pvAccessApp/utils/namedLockPattern.h +pvAccessApp/utils/referenceCountingLock.h +pvAccessApp/utils/transportRegistry.h +pvAccessApp/utils/wildcharMatcher.h diff --git a/QtC-pvAccess.includes b/QtC-pvAccess.includes index 3ee20f7..267e3ef 100644 --- a/QtC-pvAccess.includes +++ b/QtC-pvAccess.includes @@ -1 +1,2 @@ -pvAccessApp/client \ No newline at end of file +include +../pvDataCPP/include diff --git a/pvAccessApp/factory/CreateRequestFactory.cpp b/pvAccessApp/factory/CreateRequestFactory.cpp index ce9e05d..23fb95e 100644 --- a/pvAccessApp/factory/CreateRequestFactory.cpp +++ b/pvAccessApp/factory/CreateRequestFactory.cpp @@ -142,10 +142,13 @@ class CreateRequestImpl : public CreateRequest { requester->message("illegal option", errorMessage); return false; } - - PVString* pvStringField = static_cast(getPVDataCreate()->createPVScalar(pvParent, token.substr(0, equalsPos), pvString)); - pvStringField->put(token.substr(equalsPos+1)); - pvParent->appendPVField(pvStringField); + + if (equalsPos != std::string::npos) + { + PVString* pvStringField = static_cast(getPVDataCreate()->createPVScalar(pvParent, token.substr(0, equalsPos), pvString)); + pvStringField->put(token.substr(equalsPos+1)); + pvParent->appendPVField(pvStringField); + } } return true; } @@ -160,13 +163,14 @@ class CreateRequestImpl : public CreateRequest { { return getPVDataCreate()->createPVStructure(0, emptyString, 0); } - + size_t offsetRecord = request.find("record["); size_t offsetField = request.find("field("); size_t offsetPutField = request.find("putField("); size_t offsetGetField = request.find("getField("); PVStructure* pvStructure = getPVDataCreate()->createPVStructure(0, emptyString, 0); + if (offsetRecord != std::string::npos) { size_t offsetBegin = request.find('[', offsetRecord); size_t offsetEnd = request.find(']', offsetBegin); @@ -199,7 +203,7 @@ class CreateRequestImpl : public CreateRequest { delete pvStructure; return 0; } - pvStructure->appendPVField(pvStruct); + pvStructure->appendPVField(pvStruct); } if (offsetPutField != std::string::npos) { size_t offsetBegin = request.find('(', offsetPutField); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index b92d1f1..ba9509b 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -221,135 +221,102 @@ public: -PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelProcess); +PVDATA_REFCOUNT_MONITOR_DEFINE(channelProcess); -class ChannelImplProcess : public ChannelProcess +class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess { private: - ChannelProcessRequester* m_channelProcessRequester; - PVStructure* m_pvStructure; - PVScalar* m_valueField; + ChannelProcessRequester* m_callback; + volatile bool m_initialized; + PVStructure* m_pvRequest; private: - ~ChannelImplProcess() + ~ChannelProcessRequestImpl() { - PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannelProcess); + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelProcess); } public: - ChannelImplProcess(ChannelProcessRequester* channelProcessRequester, PVStructure *pvStructure, PVStructure *pvRequest) : - m_channelProcessRequester(channelProcessRequester), m_pvStructure(pvStructure) + ChannelProcessRequestImpl(ChannelImpl* channel, ChannelProcessRequester* callback, PVStructure *pvRequest) : + BaseRequestImpl(channel, callback), + m_callback(callback), m_initialized(false), m_pvRequest(pvRequest) { - PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelProcess); + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess); - PVField* field = pvStructure->getSubField(String("value")); - if (field == 0) - { - Status* noValueFieldStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "no 'value' field"); - m_channelProcessRequester->channelProcessConnect(noValueFieldStatus, this); - delete noValueFieldStatus; + // TODO best-effort support - // NOTE client must destroy this instance... - // do not access any fields and return ASAP - return; - } - - if (field->getField()->getType() != scalar) - { - Status* notAScalarStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "'value' field not scalar type"); - m_channelProcessRequester->channelProcessConnect(notAScalarStatus, this); - delete notAScalarStatus; - - // NOTE client must destroy this instance…. - // do not access any fields and return ASAP - return; - } - - m_valueField = static_cast(field); - - // TODO pvRequest - m_channelProcessRequester->channelProcessConnect(g_statusOK, this); + // subscribe +// try { + resubscribeSubscription(channel->checkAndGetTransport()); +/* } catch (IllegalStateException ise) { + callback.channelProcessConnect(channelNotConnected, null); + } catch (CAException e) { + callback.channelProcessConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", e), null); + }*/ } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)16, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + m_callback->processDone(status); + return true; + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + m_callback->channelProcessConnect(status, this); + return true; + } + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + m_callback->processDone(status); + return true; + } + virtual void process(bool lastRequest) { - switch (m_valueField->getScalar()->getScalarType()) - { - case pvBoolean: - { - // negate - PVBoolean *pvBoolean = static_cast(m_valueField); - pvBoolean->put(!pvBoolean->get()); - break; - } - case pvByte: - { - // increment by one - PVByte *pvByte = static_cast(m_valueField); - pvByte->put(pvByte->get() + 1); - break; - } - case pvShort: - { - // increment by one - PVShort *pvShort = static_cast(m_valueField); - pvShort->put(pvShort->get() + 1); - break; - } - case pvInt: - { - // increment by one - PVInt *pvInt = static_cast(m_valueField); - pvInt->put(pvInt->get() + 1); - break; - } - case pvLong: - { - // increment by one - PVLong *pvLong = static_cast(m_valueField); - pvLong->put(pvLong->get() + 1); - break; - } - case pvFloat: - { - // increment by one - PVFloat *pvFloat = static_cast(m_valueField); - pvFloat->put(pvFloat->get() + 1.0f); - break; - } - case pvDouble: - { - // increment by one - PVDouble *pvDouble = static_cast(m_valueField); - pvDouble->put(pvDouble->get() + 1.0); - break; - } - case pvString: - { - // increment by one - PVString *pvString = static_cast(m_valueField); - String val = pvString->get(); - if (val.empty()) - pvString->put("gen0"); - else - { - char c = val[0]; - c++; - pvString->put("gen" + c); - } - break; - } - default: - // noop - break; - - } - m_channelProcessRequester->processDone(g_statusOK); - - if (lastRequest) - destroy(); + // TODO sync + if (m_destroyed) { + m_callback->processDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + m_callback->processDone(otherRequestPendingStatus); + return; + } + + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + // m_callback->processDone(channelNotConnected); + //} } + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + } + virtual void destroy() { delete this; @@ -362,12 +329,12 @@ class ChannelImplProcess : public ChannelProcess + PVDATA_REFCOUNT_MONITOR_DEFINE(channelGet); -class ChannelImplGet : public BaseRequestImpl, public ChannelGet +class ChannelGetImpl : public BaseRequestImpl, public ChannelGet { private: - ChannelImpl* m_channel; ChannelGetRequester* m_channelGetRequester; PVStructure* m_pvRequest; @@ -376,15 +343,15 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet BitSet* m_bitSet; private: - ~ChannelImplGet() + ~ChannelGetImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGet); } public: - ChannelImplGet(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) : + ChannelGetImpl(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) : BaseRequestImpl(channel, channelGetRequester), - m_channel(channel), m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest + m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest m_data(0), m_bitSet(0) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet); @@ -491,7 +458,9 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet virtual void destroy() { -// delete m_bitSet; + // TODO sync + if (m_data) delete m_data; + if (m_bitSet) delete m_bitSet; delete this; } @@ -502,6 +471,139 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet +PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField); + +class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender +{ + private: + ChannelImpl* m_channel; + ClientContextImpl* m_context; + pvAccessID m_ioid; + GetFieldRequester* m_callback; + String m_subField; + Mutex m_mutex; + bool m_destroyed; + + private: + ~ChannelGetFieldRequestImpl() + { + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGetField); + } + + + public: + ChannelGetFieldRequestImpl(ChannelImpl* channel, GetFieldRequester* callback, String subField) : + m_channel(channel), m_context(channel->getContext()), + m_callback(callback), m_subField(subField), + m_destroyed(false) + { + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGetField); + + // register response request + m_ioid = m_context->registerResponseRequest(this); + channel->registerResponseRequest(this); + + // TODO + // enqueue send request + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + // callback.getDone(BaseRequestImpl.channelNotConnected, null); + //} + } + + Requester* getRequester() { + return m_callback; + } + + pvAccessID getIOID() { + return m_ioid; + } + + virtual void lock() { + // noop + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage((int8)17, 8); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + SerializeHelper::serializeString(m_subField, buffer, control); + } + + + virtual void cancel() { + destroy(); + // TODO notify? + } + + virtual void timeout() { + cancel(); + } + + void reportStatus(Status* status) { + // destroy, since channel (parent) was destroyed + if (status == ChannelImpl::channelDestroyed) + destroy(); + // TODO notify? + } + + virtual void unlock() { + // noop + } + + virtual void destroy() + { + { + Lock guard(&m_mutex); + if (m_destroyed) + return; + m_destroyed = true; + } + + // unregister response request + m_context->unregisterResponseRequest(this); + m_channel->unregisterResponseRequest(this); + + delete this; + } + + virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { +// TODO? +// try +// { + Status* status = statusCreate->deserializeStatus(payloadBuffer, transport); + if (status->isSuccess()) + { + // deserialize Field... + const Field* field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport); + m_callback->getDone(status, field); + } + else + { + m_callback->getDone(status, 0); + } + + // TODO + if (status != okStatus) + delete status; +// } // TODO guard callback +// finally +// { + // always cancel request +// cancel(); +// } + + } + + +}; + + + + + + PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelPut); @@ -1223,9 +1325,6 @@ class TestChannelImpl : public ChannelImpl { */ bool m_issueCreateMessage; - // TODO mock - PVStructure* m_pvStructure; - private: ~TestChannelImpl() { @@ -1273,7 +1372,6 @@ class TestChannelImpl : public ChannelImpl { virtual void destroy() { if (m_addresses) delete m_addresses; - delete m_pvStructure; delete this; }; @@ -1765,28 +1863,28 @@ class TestChannelImpl : public ChannelImpl { virtual void getField(GetFieldRequester *requester,epics::pvData::String subField) { - requester->getDone(g_statusOK,m_pvStructure->getSubField(subField)->getField()); + new ChannelGetFieldRequestImpl(this, requester, subField); } virtual ChannelProcess* createChannelProcess( ChannelProcessRequester *channelProcessRequester, epics::pvData::PVStructure *pvRequest) { - return new ChannelImplProcess(channelProcessRequester, m_pvStructure, pvRequest); + return new ChannelProcessRequestImpl(this, channelProcessRequester, pvRequest); } virtual ChannelGet* createChannelGet( ChannelGetRequester *channelGetRequester, epics::pvData::PVStructure *pvRequest) { - return new ChannelImplGet(this, channelGetRequester, pvRequest); + return new ChannelGetImpl(this, channelGetRequester, pvRequest); } virtual ChannelPut* createChannelPut( ChannelPutRequester *channelPutRequester, epics::pvData::PVStructure *pvRequest) { - return new ChannelImplPut(channelPutRequester, m_pvStructure, pvRequest); + return new ChannelImplPut(channelPutRequester, 0, pvRequest); } virtual ChannelPutGet* createChannelPutGet( @@ -1808,7 +1906,7 @@ class TestChannelImpl : public ChannelImpl { epics::pvData::MonitorRequester *monitorRequester, epics::pvData::PVStructure *pvRequest) { - return new MockMonitor(monitorRequester, m_pvStructure, pvRequest); + return new MockMonitor(monitorRequester, 0, pvRequest); } virtual ChannelArray* createChannelArray( @@ -2114,6 +2212,7 @@ class TestChannelImpl : public ChannelImpl { listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, CA_DEFAULT_PRIORITY); + BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, broadcastAddresses, true); // undefined address @@ -2845,12 +2944,21 @@ int main(int argc,char *argv[]) ChannelRequesterImpl channelRequester; Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester); channel->printInfo(); - - //GetFieldRequesterImpl getFieldRequesterImpl; - //channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch"); epicsThreadSleep ( 3.0 ); + GetFieldRequesterImpl getFieldRequesterImpl; + channel->getField(&getFieldRequesterImpl, ""); + + epicsThreadSleep ( 1.0 ); + + ChannelProcessRequesterImpl channelProcessRequester; + ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); + channelProcess->process(false); + channelProcess->destroy(); + + epicsThreadSleep ( 1.0 ); + ChannelGetRequesterImpl channelGetRequesterImpl; PVStructure* pvRequest = getCreateRequest()->createRequest("field(timeStamp,value)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); @@ -2875,11 +2983,6 @@ int main(int argc,char *argv[]) delete status; - ChannelProcessRequesterImpl channelProcessRequester; - ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); - channelProcess->process(false); - channelProcess->destroy(); - status = monitor->stop(); std::cout << "monitor->stop() = " << status->toString() << std::endl; From d4d82f97886109f5a4412c1829e75642d8a55246 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 19 Jan 2011 21:24:41 +0100 Subject: [PATCH 2/7] channelPut --- testApp/remote/testRemoteClientImpl.cpp | 257 ++++++++++++++++++------ 1 file changed, 198 insertions(+), 59 deletions(-) diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index ba9509b..a317037 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -227,7 +227,6 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess { private: ChannelProcessRequester* m_callback; - volatile bool m_initialized; PVStructure* m_pvRequest; private: @@ -239,10 +238,12 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess public: ChannelProcessRequestImpl(ChannelImpl* channel, ChannelProcessRequester* callback, PVStructure *pvRequest) : BaseRequestImpl(channel, callback), - m_callback(callback), m_initialized(false), m_pvRequest(pvRequest) + m_callback(callback), m_pvRequest(pvRequest) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess); + // TODO check for nulls!!!! + // TODO best-effort support // subscribe @@ -267,7 +268,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); buffer->putByte((int8)m_pendingRequest); - + if (pendingRequest & QOS_INIT) { // pvRequest @@ -471,6 +472,187 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet + + + + + +PVDATA_REFCOUNT_MONITOR_DEFINE(channelPut); + +class ChannelPutImpl : public BaseRequestImpl, public ChannelPut +{ + private: + ChannelPutRequester* m_channelPutRequester; + + PVStructure* m_pvRequest; + + PVStructure* m_data; + BitSet* m_bitSet; + + private: + ~ChannelPutImpl() + { + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPut); + } + + public: + ChannelPutImpl(ChannelImpl* channel, ChannelPutRequester* channelPutRequester, PVStructure *pvRequest) : + BaseRequestImpl(channel, channelPutRequester), + m_channelPutRequester(channelPutRequester), m_pvRequest(pvRequest), // TODO pvRequest + m_data(0), m_bitSet(0) + { + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPut); + + // TODO low-overhead put + // TODO best-effort put + + // subscribe +// try { + resubscribeSubscription(m_channel->checkAndGetTransport()); +// } catch (IllegalStateException ise) { +// TODO m_channelPutRequester->channelPutConnect(channelNotConnected, null, null, null); +// } catch (CAException caex) { +// TODO m_channelPutRequester->channelPutConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// } + + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)11, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + else if (!(pendingRequest & QOS_GET)) + { + // put + // serialize only what has been changed + m_bitSet->serialize(buffer, control); + m_data->serialize(buffer, control, m_bitSet); + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + m_channelPutRequester->putDone(status); + return true; + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_channelPutRequester->channelPutConnect(status, this, 0, 0); + return true; + } + + // create data and its bitSet + m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_bitSet = new BitSet(m_data->getNumberFields()); + + // notify + m_channelPutRequester->channelPutConnect(okStatus, this, m_data, m_bitSet); + return true; + } + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (qos & QOS_GET) + { + if (!status->isSuccess()) + { + m_channelPutRequester->getDone(status); + return true; + } + + m_data->deserialize(payloadBuffer, transport); + m_channelPutRequester->getDone(status); + return true; + } + else + { + m_channelPutRequester->putDone(okStatus); + return true; + } + } + + virtual void get() { + // TODO sync? + + if (m_destroyed) { + m_channelPutRequester->getDone(destroyedStatus); + return; + } + + if (!startRequest(QOS_GET)) { + m_channelPutRequester->getDone(otherRequestPendingStatus); + return; + } + + +// try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); +// } catch (IllegalStateException ise) { +// m_channelPutRequester->getDone(channelNotConnected); +// } + } + + virtual void put(bool lastRequest) { + // TODO sync? + + if (m_destroyed) { + m_channelPutRequester->putDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + m_channelPutRequester->putDone(otherRequestPendingStatus); + return; + } + + //try { + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelPutRequester->putDone(channelNotConnected); + //} + } + + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + } + + + virtual void destroy() + { + // TODO sync + if (m_data) delete m_data; + if (m_bitSet) delete m_bitSet; + delete this; + } + +}; + + + + + + + + + + PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField); class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender @@ -606,54 +788,6 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender -PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelPut); - -class ChannelImplPut : public ChannelPut -{ - private: - ChannelPutRequester* m_channelPutRequester; - PVStructure* m_pvStructure; - BitSet* m_bitSet; - volatile bool m_first; - - private: - ~ChannelImplPut() - { - PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannelPut); - } - - public: - ChannelImplPut(ChannelPutRequester* channelPutRequester, PVStructure *pvStructure, PVStructure *pvRequest) : - m_channelPutRequester(channelPutRequester), m_pvStructure(pvStructure), - m_bitSet(new BitSet(pvStructure->getNumberFields())), m_first(true) - { - PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut); - - // TODO pvRequest - m_channelPutRequester->channelPutConnect(g_statusOK, this, m_pvStructure, m_bitSet); - } - - virtual void put(bool lastRequest) - { - m_channelPutRequester->putDone(g_statusOK); - if (lastRequest) - destroy(); - } - - virtual void get() - { - m_channelPutRequester->getDone(g_statusOK); - } - - virtual void destroy() - { - delete m_bitSet; - delete this; - } - -}; - - @@ -1884,7 +2018,7 @@ class TestChannelImpl : public ChannelImpl { ChannelPutRequester *channelPutRequester, epics::pvData::PVStructure *pvRequest) { - return new ChannelImplPut(channelPutRequester, 0, pvRequest); + return new ChannelPutImpl(this, channelPutRequester, pvRequest); } virtual ChannelPutGet* createChannelPutGet( @@ -2949,14 +3083,14 @@ int main(int argc,char *argv[]) GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); - epicsThreadSleep ( 1.0 ); ChannelProcessRequesterImpl channelProcessRequester; ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); + epicsThreadSleep ( 1.0 ); channelProcess->process(false); + epicsThreadSleep ( 1.0 ); channelProcess->destroy(); - epicsThreadSleep ( 1.0 ); ChannelGetRequesterImpl channelGetRequesterImpl; @@ -2965,16 +3099,21 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 3.0 ); channelGet->get(false); epicsThreadSleep ( 3.0 ); - channelGet->destroy(); - // TODO delete pvRequest -/* + //TODOchannelGet->destroy(); + epicsThreadSleep ( 1.0 ); + ChannelPutRequesterImpl channelPutRequesterImpl; - ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0); + ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest); + epicsThreadSleep ( 1.0 ); channelPut->get(); + epicsThreadSleep ( 1.0 ); channelPut->put(false); - channelPut->destroy(); + epicsThreadSleep ( 1.0 ); + //TODOchannelPut->destroy(); + // TODO delete pvRequest +/* MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); From b6bbfc7ddf915c8b3ee626842be719136b22ab3f Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 19 Jan 2011 21:39:51 +0100 Subject: [PATCH 3/7] uncommented --- testApp/utils/introspectionRegistryTest.cpp | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/testApp/utils/introspectionRegistryTest.cpp b/testApp/utils/introspectionRegistryTest.cpp index 1770875..ab9694c 100644 --- a/testApp/utils/introspectionRegistryTest.cpp +++ b/testApp/utils/introspectionRegistryTest.cpp @@ -407,12 +407,6 @@ void testSerializeStatus() int main(int argc, char *argv[]) { - cout << "DONE0" << endl; - cout << "DONE0" << endl; - cout << "DONE0" << endl; - cout << "DONE0" << endl; - cout << "DONE0" << endl; -/* pvDataCreate = getPVDataCreate(); statusCreate = getStatusCreate(); fieldCreate = getFieldCreate(); @@ -451,6 +445,6 @@ int main(int argc, char *argv[]) if(serverRegistry) delete serverRegistry; getShowConstructDestruct()->showDeleteStaticExit(stdout); - cout << "DONE" << endl;*/ + cout << "DONE" << endl; return 0; } From c6d51067d1f8e82c6f0142a6d63411c91570bc8f Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Thu, 20 Jan 2011 17:47:13 +0100 Subject: [PATCH 4/7] UDP transport cleanup --- pvAccessApp/remote/blockingUDP.h | 84 +++++++------ pvAccessApp/remote/blockingUDPConnector.cpp | 44 +++---- pvAccessApp/remote/blockingUDPTransport.cpp | 123 ++++++++++-------- testApp/remote/testBeaconEmitter.cpp | 26 ++-- testApp/remote/testBeaconHandler.cpp | 10 +- testApp/remote/testBlockingUDPClnt.cpp | 39 +++--- testApp/remote/testBlockingUDPSrv.cpp | 23 ++-- testApp/remote/testRemoteClientImpl.cpp | 130 +++++++++----------- 8 files changed, 233 insertions(+), 246 deletions(-) diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index 43b1b6f..d0b7990 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -17,7 +17,7 @@ #include #include #include -#include +#include /* EPICSv3 */ #include @@ -33,7 +33,6 @@ namespace epics { public: BlockingUDPTransport(ResponseHandler* responseHandler, SOCKET channel, osiSockAddr& bindAddress, - InetAddrVector* sendAddresses, short remoteTransportRevision); virtual ~BlockingUDPTransport(); @@ -43,7 +42,7 @@ namespace epics { } virtual const osiSockAddr* getRemoteAddress() const { - return _socketAddress; + return &_bindAddress; } virtual const String getType() const { @@ -97,7 +96,7 @@ namespace epics { virtual void close(bool forced); virtual void ensureData(int size) { - // TODO Auto-generated method stub + // noop } virtual void startMessage(int8 command, int ensureCapacity); @@ -108,13 +107,12 @@ namespace epics { } virtual void setRecipient(const osiSockAddr& sendTo) { - if(_sendTo!=NULL) delete _sendTo; - _sendTo = new osiSockAddr; - memcpy(_sendTo, &sendTo, sizeof(osiSockAddr)); + _sendToEnabled = true; + _sendTo = sendTo; } virtual void flushSerializeBuffer() { - // TODO Auto-generated method stub + // noop } virtual void ensureBuffer(int size) { @@ -126,7 +124,15 @@ namespace epics { * @param addresses list of ignored addresses. */ void setIgnoredAddresses(InetAddrVector* addresses) { - _ignoredAddresses = addresses; + if (addresses) + { + if (!_ignoredAddresses) _ignoredAddresses = new InetAddrVector; + *_ignoredAddresses = *addresses; + } + else + { + if (_ignoredAddresses) { delete _ignoredAddresses; _ignoredAddresses = 0; } + } } /** @@ -154,7 +160,7 @@ namespace epics { * @return bind address. */ const osiSockAddr* getBindAddress() const { - return _bindAddress; + return &_bindAddress; } /** @@ -162,11 +168,19 @@ namespace epics { * @param addresses list of send addresses, non-null. */ void setBroadcastAddresses(InetAddrVector* addresses) { - _sendAddresses = addresses; + if (addresses) + { + if (!_sendAddresses) _sendAddresses = new InetAddrVector; + *_sendAddresses = *addresses; + } + else + { + if (_sendAddresses) { delete _sendAddresses; _sendAddresses = 0; } + } } virtual IntrospectionRegistry* getIntrospectionRegistry() { - THROW_BASE_EXCEPTION("not supported by UDP transport"); + return 0; } protected: @@ -184,6 +198,8 @@ namespace epics { bool processBuffer(osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer); + void close(bool forced, bool waitForThreadToComplete); + // Context only used for logging in this class /** @@ -191,15 +207,10 @@ namespace epics { */ SOCKET _channel; - /** - * Cached socket address. - */ - osiSockAddr* _socketAddress; - /** * Bind address. */ - osiSockAddr* _bindAddress; + osiSockAddr _bindAddress; /** * Send addresses. @@ -211,8 +222,12 @@ namespace epics { */ InetAddrVector* _ignoredAddresses; - osiSockAddr* _sendTo; - + /** + * Send address. + */ + osiSockAddr _sendTo; + bool _sendToEnabled; + /** * Receive buffer. */ @@ -228,15 +243,12 @@ namespace epics { */ int _lastMessageStartPosition; - /** - * Read buffer - */ - char* _readBuffer; - /** * Used for process sync. */ - Mutex* _mutex; + Mutex _mutex; + Mutex _sendMutex; + Event _shutdownEvent; /** * Thread ID @@ -245,18 +257,19 @@ namespace epics { }; - class BlockingUDPConnector : public Connector, - public epics::pvData::NoDefaultMethods { + class BlockingUDPConnector : + public Connector, + private epics::pvData::NoDefaultMethods { public: - BlockingUDPConnector(bool reuseSocket, - InetAddrVector* sendAddresses, bool broadcast) : - _sendAddresses(sendAddresses), _reuseSocket(reuseSocket), - _broadcast(broadcast) { + BlockingUDPConnector( + bool reuseSocket, + bool broadcast) : + _reuseSocket(reuseSocket), + _broadcast(broadcast) { } virtual ~BlockingUDPConnector() { - // TODO: delete _sendAddresses here? } /** @@ -268,11 +281,6 @@ namespace epics { private: - /** - * Send address. - */ - InetAddrVector* _sendAddresses; - /** * Reuse socket flag. */ diff --git a/pvAccessApp/remote/blockingUDPConnector.cpp b/pvAccessApp/remote/blockingUDPConnector.cpp index 00225bc..6f66853 100644 --- a/pvAccessApp/remote/blockingUDPConnector.cpp +++ b/pvAccessApp/remote/blockingUDPConnector.cpp @@ -9,9 +9,6 @@ #include "blockingUDP.h" #include "remote.h" -/* pvData */ -#include - /* EPICSv3 */ #include #include @@ -26,6 +23,7 @@ namespace epics { Transport* BlockingUDPConnector::connect(TransportClient* client, ResponseHandler* responseHandler, osiSockAddr& bindAddress, short transportRevision, int16 priority) { + errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s", inetAddressToString(&bindAddress).c_str()); @@ -33,40 +31,32 @@ namespace epics { if(socket==INVALID_SOCKET) { char errStr[64]; epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); - errlogSevPrintf(errlogMajor, "Error creating socket: %s", - errStr); + errlogSevPrintf(errlogMajor, "Error creating socket: %s", errStr); + return 0; } int optval = _broadcast ? 1 : 0; - int retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval, - sizeof(optval)); - if(retval<0) errlogSevPrintf(errlogMajor, - "Error setting SO_BROADCAST: %s", strerror(errno)); - - // set the socket options + int retval = ::setsockopt(socket, SOL_SOCKET, SO_BROADCAST, &optval, sizeof(optval)); + if(retval<0) + { + errlogSevPrintf(errlogMajor, "Error setting SO_BROADCAST: %s", strerror(errno)); + epicsSocketDestroy (socket); + return 0; + } + + // set SO_REUSEADDR or SO_REUSEPORT, OS dependant if (_reuseSocket) epicsSocketEnableAddressUseForDatagramFanout(socket); -/* - optval = _reuseSocket ? 1 : 0; - // or SO_REUSEADDR, OS dependant - retval = ::setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, &optval, - sizeof(optval)); - if(retval<0) errlogSevPrintf(errlogMajor, - "Error setting SO_REUSEADDR: %s", strerror(errno)); -*/ - retval = ::bind(socket, (sockaddr*)&(bindAddress.sa), - sizeof(sockaddr)); + retval = ::bind(socket, (sockaddr*)&(bindAddress.sa), sizeof(sockaddr)); if(retval<0) { - errlogSevPrintf(errlogMajor, "Error binding socket: %s", - strerror(errno)); - THROW_BASE_EXCEPTION(strerror(errno)); + errlogSevPrintf(errlogMajor, "Error binding socket: %s", strerror(errno)); + epicsSocketDestroy (socket); + return 0; } // sockets are blocking by default - - return new BlockingUDPTransport(responseHandler, socket, - bindAddress, _sendAddresses, transportRevision); + return new BlockingUDPTransport(responseHandler, socket, bindAddress, transportRevision); } } diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 7e5cba5..08a00db 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -33,83 +33,85 @@ namespace epics { BlockingUDPTransport::BlockingUDPTransport( ResponseHandler* responseHandler, SOCKET channel, - osiSockAddr& bindAddress, InetAddrVector* sendAddresses, + osiSockAddr& bindAddress, short remoteTransportRevision) : _closed(false), _responseHandler(responseHandler), _channel(channel), - _sendAddresses(sendAddresses), - _ignoredAddresses(NULL), - _sendTo(NULL), - _receiveBuffer(new ByteBuffer(MAX_UDP_RECV, - EPICS_ENDIAN_BIG)), + _bindAddress(bindAddress), + _sendAddresses(0), + _ignoredAddresses(0), + _sendToEnabled(false), + _receiveBuffer(new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)), _sendBuffer(new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)), - _lastMessageStartPosition(0), _readBuffer( - new char[MAX_UDP_RECV]), _mutex(new Mutex()), - _threadId(NULL) { - _socketAddress = new osiSockAddr; - memcpy(_socketAddress, &bindAddress, sizeof(osiSockAddr)); - _bindAddress = _socketAddress; - + _lastMessageStartPosition(0), + _threadId(0) + { } BlockingUDPTransport::~BlockingUDPTransport() { close(true); // close the socket and stop the thread. - if(_sendTo!=NULL) delete _sendTo; - delete _socketAddress; - // _bindAddress equals _socketAddress + + if (_sendAddresses) delete _sendAddresses; + if (_ignoredAddresses) delete _ignoredAddresses; delete _receiveBuffer; delete _sendBuffer; - delete[] _readBuffer; - delete _mutex; } void BlockingUDPTransport::start() { - String threadName = "UDP-receive "+inetAddressToString( - _socketAddress); + String threadName = "UDP-receive "+inetAddressToString(&_bindAddress); - errlogSevPrintf(errlogInfo, "Starting thread: %s", - threadName.c_str()); + errlogSevPrintf(errlogInfo, "Starting thread: %s",threadName.c_str()); _threadId = epicsThreadCreate(threadName.c_str(), - epicsThreadPriorityMedium, epicsThreadGetStackSize( - epicsThreadStackMedium), + epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackMedium), BlockingUDPTransport::threadRunner, this); } void BlockingUDPTransport::close(bool forced) { - if(_closed) return; - _closed = true; + close(forced, true); + } - if(_bindAddress!=NULL) errlogSevPrintf(errlogInfo, + void BlockingUDPTransport::close(bool forced, bool waitForThreadToComplete) { + { + Lock guard(&_mutex); + if(_closed) return; + _closed = true; + + errlogSevPrintf(errlogInfo, "UDP socket %s closed.", - inetAddressToString(_bindAddress).c_str()); - - epicsSocketDestroy(_channel); + inetAddressToString(&_bindAddress).c_str()); + + epicsSocketDestroy(_channel); + } + + // wait for send thread to exit cleanly + if (waitForThreadToComplete) + _shutdownEvent.wait(3.0); } void BlockingUDPTransport::enqueueSendRequest(TransportSender* sender) { - Lock lock(_mutex); + Lock lock(&_sendMutex); - _sendTo = NULL; + _sendToEnabled = false; _sendBuffer->clear(); sender->lock(); try { sender->send(_sendBuffer, this); sender->unlock(); endMessage(); - if(_sendTo==NULL) + if(!_sendToEnabled) send(_sendBuffer); else - send(_sendBuffer, *_sendTo); + send(_sendBuffer, _sendTo); } catch(...) { sender->unlock(); } } - void BlockingUDPTransport::startMessage(int8 command, - int ensureCapacity) { + void BlockingUDPTransport::startMessage(int8 command, int ensureCapacity) { _lastMessageStartPosition = _sendBuffer->getPosition(); _sendBuffer->putShort(CA_MAGIC_AND_VERSION); _sendBuffer->putByte(0); // data @@ -118,21 +120,29 @@ namespace epics { } void BlockingUDPTransport::endMessage() { - _sendBuffer->putInt(_lastMessageStartPosition+(sizeof(int16)+2), - _sendBuffer->getPosition()-_lastMessageStartPosition - -CA_MESSAGE_HEADER_SIZE); - + _sendBuffer->putInt( + _lastMessageStartPosition+(sizeof(int16)+2), + _sendBuffer->getPosition()-_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE); } void BlockingUDPTransport::processRead() { // This function is always called from only one thread - this // object's own thread. + char _readBuffer[MAX_UDP_RECV]; osiSockAddr fromAddress; try { + bool closed; while(!_closed) { + + _mutex.lock(); + closed = _closed; + _mutex.unlock(); + if (closed) + break; + // we poll to prevent blocking indefinitely // data ready to be read @@ -147,18 +157,23 @@ namespace epics { if(bytesRead>0) { // successfully got datagram bool ignore = false; - if(_ignoredAddresses!=NULL) for(size_t i = 0; i - <_ignoredAddresses->size(); i++) + if(_ignoredAddresses!=0) + { + for(size_t i = 0; i <_ignoredAddresses->size(); i++) if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr ==fromAddress.ia.sin_addr.s_addr) { ignore = true; break; } - + } + if(!ignore) { - _receiveBuffer->put(_readBuffer, 0, bytesRead - <_receiveBuffer->getRemaining() ? bytesRead - : _receiveBuffer->getRemaining()); + // TODO do not copy.... wrap the buffer!!! + _receiveBuffer->put(_readBuffer, 0, + bytesRead <_receiveBuffer->getRemaining() ? + bytesRead : + _receiveBuffer->getRemaining() + ); _receiveBuffer->flip(); @@ -166,22 +181,26 @@ namespace epics { } } else { - // 0 == socket close - + // 0 == socket remotely closed // log a 'recvfrom' error - if(bytesRead==-1) errlogSevPrintf(errlogMajor, + if(!_closed && bytesRead==-1) errlogSevPrintf(errlogMajor, "Socket recv error: %s", strerror(errno)); + + close(true, false); + break; } } } catch(...) { // TODO: catch all exceptions, and act accordingly - close(true); + close(true, false); } char threadName[40]; epicsThreadGetName(_threadId, threadName, 40); errlogSevPrintf(errlogInfo, "Thread '%s' exiting", threadName); + + _shutdownEvent.signal(); } bool BlockingUDPTransport::processBuffer(osiSockAddr& fromAddress, @@ -240,7 +259,7 @@ namespace epics { } bool BlockingUDPTransport::send(ByteBuffer* buffer) { - if(_sendAddresses==NULL) return false; + if(!_sendAddresses) return false; for(size_t i = 0; i<_sendAddresses->size(); i++) { buffer->flip(); @@ -262,7 +281,7 @@ namespace epics { // that is the buffer size used by the platform for input on // this DatagramSocket. - int sockBufSize; + int sockBufSize = -1; socklen_t intLen = sizeof(int); int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index 9e98d85..4c6cdaa 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -56,36 +56,26 @@ void testBeaconEmitter() { ContextImpl ctx; DummyResponseHandler drh(&ctx); -/* SOCKET mysocket; - if ((mysocket = socket (AF_INET, SOCK_DGRAM, 0)) == -1) - { - assert(false); - } - InetAddrVector* broadcastAddresses = getBroadcastAddresses(mysocket);*/ + SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0); + auto_ptr broadcastAddresses(getBroadcastAddresses(socket, 5067)); + epicsSocketDestroy (socket); - InetAddrVector* broadcastAddresses = new InetAddrVector; - osiSockAddr* addr = new osiSockAddr; - addr->ia.sin_family = AF_INET; - addr->ia.sin_port = htons(5067); - if(inet_aton("255.255.255.255",&addr->ia.sin_addr)==0) - { - assert(false); - } - broadcastAddresses->push_back(addr); - BlockingUDPConnector connector(true, broadcastAddresses, true); + BlockingUDPConnector connector(true, true); osiSockAddr bindAddr; bindAddr.ia.sin_family = AF_INET; bindAddr.ia.sin_port = htons(5066); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); - Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50); + + BlockingUDPTransport* transport = (BlockingUDPTransport*)connector.connect(NULL, &drh, bindAddr, 1, 50); + transport->setBroadcastAddresses(broadcastAddresses.get()); cout<<"Sending beacons"<getRemoteAddress()); beaconEmitter.start(); - while(1) sleep(1); + epicsThreadSleep (60.0); delete transport; } diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index 0e5ec28..c72194d 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -57,15 +57,15 @@ public: transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8)); - const int32 sequentalID = payloadBuffer->getShort() & 0x0000FFFF; - const TimeStamp startupTimestamp(payloadBuffer->getInt() & 0x00000000FFFFFFFFL,(int32)(payloadBuffer->getInt() & 0x00000000FFFFFFFFL)); + /*const int32 sequentalID = */ payloadBuffer->getShort(); + const TimeStamp startupTimestamp(payloadBuffer->getInt(),payloadBuffer->getInt()); // 128-bit IPv6 address osiSockAddr address; decodeFromIPv6Address(payloadBuffer, &address); // get port - const int32 port = payloadBuffer->getShort() & 0xFFFF; + const int32 port = payloadBuffer->getShort(); address.ia.sin_port = ntohs(port); // accept given address if explicitly specified by sender @@ -130,7 +130,7 @@ void testBeaconHandler() { ContextImpl ctx; BeaconResponseHandler brh(&ctx); - BlockingUDPConnector connector(false, NULL, true); + BlockingUDPConnector connector(false, true); osiSockAddr bindAddr; bindAddr.ia.sin_family = AF_INET; @@ -139,7 +139,7 @@ void testBeaconHandler() Transport* transport = connector.connect(NULL, &brh, bindAddr, 1, 50); (static_cast(transport))->start(); - while(1) sleep(1); + epicsThreadSleep (60.0); delete transport; } diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index 2117906..d34ae35 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -5,17 +5,19 @@ * Author: Miha Vitorovic */ -#include "remote.h" -#include "blockingUDP.h" -#include "logger.h" -#include "inetAddressUtil.h" +#include +#include +#include +#include + +//#include #include #include #include -#define SRV_IP "192.168.71.132" +#define SRV_IP "127.0.0.1" using namespace epics::pvAccess; using namespace epics::pvData; @@ -28,19 +30,16 @@ static osiSockAddr sendTo; class ContextImpl : public Context { public: - ContextImpl() : - _tr(new TransportRegistry()), _timer(new Timer("server thread", - lowPriority)), _conf(new SystemConfigurationImpl()) { - } + ContextImpl() + {} + virtual ~ContextImpl() { - delete _tr; - delete _timer; } virtual Timer* getTimer() { - return _timer; + return 0; } virtual TransportRegistry* getTransportRegistry() { - return _tr; + return 0; } virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; @@ -49,13 +48,8 @@ public: return 0; } virtual Configuration* getConfiguration() { - return _conf; + return 0; } - -private: - TransportRegistry* _tr; - Timer* _timer; - Configuration* _conf; }; class DummyResponseHandler : public ResponseHandler { @@ -97,7 +91,7 @@ private: }; void testBlockingUDPSender() { - BlockingUDPConnector connector(false, NULL, true); + BlockingUDPConnector connector(false, true); ContextImpl ctx; DummyTransportSender dts; @@ -130,8 +124,11 @@ void testBlockingUDPSender() { } int main(int argc, char *argv[]) { - createFileLogger("testBlockingUDPClnt.log"); +// createFileLogger("testBlockingUDPClnt.log"); testBlockingUDPSender(); + +// std::cout << "-----------------------------------------------------------------------" << std::endl; +// getShowConstructDestruct()->constuctDestructTotals(stdout); return (0); } diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp index 5308428..a8c3aa4 100644 --- a/testApp/remote/testBlockingUDPSrv.cpp +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -23,19 +23,15 @@ using std::dec; class ContextImpl : public Context { public: - ContextImpl() : - _tr(new TransportRegistry()), _timer(new Timer("server thread", - lowPriority)), _conf(new SystemConfigurationImpl()) { - } + ContextImpl() {} + virtual ~ContextImpl() { - delete _tr; - delete _timer; } virtual Timer* getTimer() { - return _timer; + return 0; } virtual TransportRegistry* getTransportRegistry() { - return _tr; + return 0; } virtual Channel* getChannel(epics::pvAccess::pvAccessID) { return 0; @@ -44,13 +40,8 @@ public: return 0; } virtual Configuration* getConfiguration() { - return _conf; + return 0; } - -private: - TransportRegistry* _tr; - Timer* _timer; - Configuration* _conf; }; class DummyResponseHandler : public ResponseHandler { @@ -102,7 +93,7 @@ void DummyResponseHandler::handleResponse(osiSockAddr* responseFrom, } void testBlockingUDPConnector() { - BlockingUDPConnector connector(false, NULL, true); + BlockingUDPConnector connector(false, true); ContextImpl ctx; DummyResponseHandler drh(&ctx); @@ -127,7 +118,7 @@ void testBlockingUDPConnector() { } int main(int argc, char *argv[]) { - createFileLogger("testBlockingUDPSrv.log"); +// createFileLogger("testBlockingUDPSrv.log"); testBlockingUDPConnector(); return (0); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index a317037..2e00223 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -7,7 +7,7 @@ #include #include #include -#include +#include ƒ #include #include @@ -2309,6 +2309,7 @@ class TestChannelImpl : public ChannelImpl { // setup UDP transport initializeUDPTransport(); + // TODO what if initialization failed!!! // setup search manager m_channelSearchManager = new ChannelSearchManager(this); @@ -2317,73 +2318,63 @@ class TestChannelImpl : public ChannelImpl { /** * Initialized UDP transport (broadcast socket and repeater connection). */ - void initializeUDPTransport() { - // setup UDP transport - try + bool initializeUDPTransport() { + + // quary broadcast addresses of all IFs + SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0); + auto_ptr broadcastAddresses(getBroadcastAddresses(socket, m_broadcastPort)); + epicsSocketDestroy (socket); + + // set broadcast address list + if (!m_addressList.empty()) { - // where to bind (listen) address - osiSockAddr listenLocalAddress; - listenLocalAddress.ia.sin_family = AF_INET; - listenLocalAddress.ia.sin_port = htons(m_broadcastPort); - listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); + // if auto is true, add it to specified list + InetAddrVector* appendList = 0; + if (m_autoAddressList) + appendList = broadcastAddresses.get(); - // where to send address - SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0); - InetAddrVector* broadcastAddresses = getBroadcastAddresses(socket, m_broadcastPort); - cout<<"Broadcast addresses: "<size()<size(); i++) { - cout<<"Broadcast address: "; - cout<at(i))<connect( - 0, new ClientResponseHandler(this), - listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, - CA_DEFAULT_PRIORITY); - - - BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, broadcastAddresses, true); - - // undefined address - osiSockAddr undefinedAddress; - undefinedAddress.ia.sin_family = AF_INET; - undefinedAddress.ia.sin_port = htons(0); - undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - - m_searchTransport = (BlockingUDPTransport*)searchConnector->connect( - 0, new ClientResponseHandler(this), - undefinedAddress, CA_MINOR_PROTOCOL_REVISION, - CA_DEFAULT_PRIORITY); - - // set broadcast address list - if (!m_addressList.empty()) - { - // if auto is true, add it to specified list - InetAddrVector* appendList = 0; - if (m_autoAddressList) - appendList = m_broadcastTransport->getSendAddresses(); - - InetAddrVector* list = getSocketAddressList(m_addressList, m_broadcastPort, appendList); - // TODO delete !!!! - if (list && list->size()) { - m_broadcastTransport->setBroadcastAddresses(list); - m_searchTransport->setBroadcastAddresses(list); - } + auto_ptr list(getSocketAddressList(m_addressList, m_broadcastPort, appendList)); + if (list.get() && list->size()) { + // delete old list and take ownership of a new one + broadcastAddresses = list; } - - m_broadcastTransport->start(); - m_searchTransport->start(); - - } - catch (...) - { - // TODO } + + // where to bind (listen) address + osiSockAddr listenLocalAddress; + listenLocalAddress.ia.sin_family = AF_INET; + listenLocalAddress.ia.sin_port = htons(m_broadcastPort); + listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); + + BlockingUDPConnector* broadcastConnector = new BlockingUDPConnector(true, true); + m_broadcastTransport = (BlockingUDPTransport*)broadcastConnector->connect( + 0, new ClientResponseHandler(this), + listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, + CA_DEFAULT_PRIORITY); + if (!m_broadcastTransport) + return false; + m_broadcastTransport->setBroadcastAddresses(broadcastAddresses.get()); + + // undefined address + osiSockAddr undefinedAddress; + undefinedAddress.ia.sin_family = AF_INET; + undefinedAddress.ia.sin_port = htons(0); + undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); + + BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, true); + m_searchTransport = (BlockingUDPTransport*)searchConnector->connect( + 0, new ClientResponseHandler(this), + undefinedAddress, CA_MINOR_PROTOCOL_REVISION, + CA_DEFAULT_PRIORITY); + if (!m_searchTransport) + return false; + m_searchTransport->setBroadcastAddresses(broadcastAddresses.get()); + + // become active + m_broadcastTransport->start(); + m_searchTransport->start(); + + return true; } void internalDestroy() { @@ -3077,10 +3068,11 @@ int main(int argc,char *argv[]) ChannelRequesterImpl channelRequester; Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester); + + epicsThreadSleep ( 1.0 ); + channel->printInfo(); - - epicsThreadSleep ( 3.0 ); - +/* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); @@ -3112,7 +3104,7 @@ int main(int argc,char *argv[]) //TODOchannelPut->destroy(); // TODO delete pvRequest - +*/ /* MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); @@ -3130,7 +3122,7 @@ int main(int argc,char *argv[]) monitor->destroy(); */ - epicsThreadSleep ( 20.0 ); + epicsThreadSleep ( 3.0 ); channel->destroy(); context->destroy(); From d8f81d79dbea21aa638a267dde27322cac8fc606 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Thu, 20 Jan 2011 19:01:25 +0100 Subject: [PATCH 5/7] MM cleanup --- pvAccessApp/remote/blockingTCPTransport.cpp | 16 +++-- pvAccessApp/remote/blockingUDPConnector.cpp | 2 +- pvAccessApp/remote/blockingUDPTransport.cpp | 9 +-- pvAccessApp/remote/remote.h | 7 ++ pvAccessApp/utils/inetAddressUtil.cpp | 45 +++++-------- pvAccessApp/utils/inetAddressUtil.h | 4 +- testApp/remote/testBlockingUDPClnt.cpp | 2 + testApp/remote/testBlockingUDPSrv.cpp | 2 + testApp/remote/testRemoteClientImpl.cpp | 74 ++++++++++----------- testApp/utils/inetAddressUtilsTest.cpp | 74 +++++++++++---------- 10 files changed, 119 insertions(+), 116 deletions(-) diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index a4778ec..8d3dd00 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -140,12 +140,14 @@ namespace epics { delete _sendQueueMutex; delete _verifiedMutex; delete _monitorMutex; + + delete _responseHandler; } void BlockingTCPTransport::start() { _sendThreadRunning = true; String threadName = "TCP-receive "+inetAddressToString( - _socketAddress); + * _socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); @@ -155,7 +157,7 @@ namespace epics { epicsThreadStackMedium), BlockingTCPTransport::rcvThreadRunner, this); - threadName = "TCP-send "+inetAddressToString(_socketAddress); + threadName = "TCP-send "+inetAddressToString(*_socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); @@ -506,7 +508,7 @@ namespace epics { errlogSevPrintf( errlogMinor, "Invalid header received from client %s, disconnecting...", - inetAddressToString(_socketAddress).c_str()); + inetAddressToString(*_socketAddress).c_str()); close(true); return; } @@ -552,7 +554,7 @@ namespace epics { errlogMajor, "Unknown packet type %d, received from client %s, disconnecting...", type, - inetAddressToString(_socketAddress).c_str()); + inetAddressToString(*_socketAddress).c_str()); close(true); return; } @@ -690,7 +692,7 @@ namespace epics { //errlogSevPrintf(errlogInfo, // "Sending %d of total %d bytes in the packet to %s.", // bytesToSend, limit, - // inetAddressToString(_socketAddress).c_str()); + // inetAddressToString(*_socketAddress).c_str()); while(buffer->getRemaining()>0) { ssize_t bytesSent = ::send(_channel, @@ -716,7 +718,7 @@ namespace epics { //errlogSevPrintf(errlogInfo, // "Send buffer full for %s, waiting...", - // inetAddressToString(_socketAddress)); + // inetAddressToString(*_socketAddress)); return false; } @@ -838,7 +840,7 @@ namespace epics { freeSendBuffers(); errlogSevPrintf(errlogInfo, "Connection to %s closed.", - inetAddressToString(_socketAddress).c_str()); + inetAddressToString(*_socketAddress).c_str()); if(_channel!=INVALID_SOCKET) { epicsSocketDestroy(_channel); diff --git a/pvAccessApp/remote/blockingUDPConnector.cpp b/pvAccessApp/remote/blockingUDPConnector.cpp index 6f66853..629df10 100644 --- a/pvAccessApp/remote/blockingUDPConnector.cpp +++ b/pvAccessApp/remote/blockingUDPConnector.cpp @@ -25,7 +25,7 @@ namespace epics { short transportRevision, int16 priority) { errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s", - inetAddressToString(&bindAddress).c_str()); + inetAddressToString(bindAddress).c_str()); SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(socket==INVALID_SOCKET) { diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 08a00db..ac47644 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -57,10 +57,11 @@ namespace epics { delete _receiveBuffer; delete _sendBuffer; + delete _responseHandler; } void BlockingUDPTransport::start() { - String threadName = "UDP-receive "+inetAddressToString(&_bindAddress); + String threadName = "UDP-receive "+inetAddressToString(_bindAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s",threadName.c_str()); @@ -82,7 +83,7 @@ namespace epics { errlogSevPrintf(errlogInfo, "UDP socket %s closed.", - inetAddressToString(&_bindAddress).c_str()); + inetAddressToString(_bindAddress).c_str()); epicsSocketDestroy(_channel); } @@ -160,7 +161,7 @@ namespace epics { if(_ignoredAddresses!=0) { for(size_t i = 0; i <_ignoredAddresses->size(); i++) - if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr + if(_ignoredAddresses->at(i).ia.sin_addr.s_addr ==fromAddress.ia.sin_addr.s_addr) { ignore = true; break; @@ -264,7 +265,7 @@ namespace epics { for(size_t i = 0; i<_sendAddresses->size(); i++) { buffer->flip(); int retval = sendto(_channel, buffer->getArray(), - buffer->getLimit(), 0, &(_sendAddresses->at(i)->sa), + buffer->getLimit(), 0, &(_sendAddresses->at(i).sa), sizeof(sockaddr)); { if(retval<0) errlogSevPrintf(errlogMajor, diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index a341de6..2e6a7f3 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -289,6 +289,8 @@ namespace epics { */ class ResponseHandler { public: + virtual ~ResponseHandler() {} + /** * Handle response. * @param[in] responseFrom remote address of the responder, 0 if unknown. @@ -504,6 +506,7 @@ namespace epics { */ class ResponseRequest { public: + virtual ~ResponseRequest() {} /** * Get I/O ID. @@ -540,6 +543,8 @@ namespace epics { */ class DataResponse : public ResponseRequest { public: + virtual ~DataResponse() {} + /** * Notification response. * @param transport @@ -559,6 +564,8 @@ namespace epics { */ class SubscriptionRequest /*: public ResponseRequest*/ { public: + virtual ~SubscriptionRequest() {} + /** * Update (e.g. after some time of unresponsiveness) - report current value. */ diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 6ee7ab7..17f43b2 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -66,10 +66,10 @@ namespace epics { namespace pvAccess { void addDefaultBroadcastAddress(InetAddrVector* v, in_port_t p) { - osiSockAddr* pNewNode = new osiSockAddr; - pNewNode->ia.sin_family = AF_INET; - pNewNode->ia.sin_addr.s_addr = htonl(INADDR_BROADCAST); - pNewNode->ia.sin_port = htons(p); + osiSockAddr pNewNode; + pNewNode.ia.sin_family = AF_INET; + pNewNode.ia.sin_addr.s_addr = htonl(INADDR_BROADCAST); + pNewNode.ia.sin_port = htons(p); v->push_back(pNewNode); } @@ -84,7 +84,7 @@ namespace epics { struct ifreq* pIfreqList; struct ifreq* pifreq; struct ifreq ifrBuff; - osiSockAddr* pNewNode; + osiSockAddr pNewNode; InetAddrVector* retVector = new InetAddrVector(); @@ -157,16 +157,6 @@ namespace epics { */ if(ifrBuff.ifr_flags&IFF_LOOPBACK) continue; - pNewNode = new osiSockAddr; - if(pNewNode==NULL) { - errlogSevPrintf(errlogMajor, - "getBroadcastAddresses(): no memory available for configuration"); - delete[] pIfreqList; - if(retVector->size()==0) addDefaultBroadcastAddress( - retVector, defaultPort); - return retVector; - } - /* * If this is an interface that supports * broadcast fetch the broadcast address. @@ -186,10 +176,9 @@ namespace epics { errlogMinor, "getBroadcastAddresses(): net intf \"%s\": bcast addr fetch fail", pifreq->ifr_name); - delete pNewNode; continue; } - pNewNode->sa = ifrBuff.ifr_broadaddr; + pNewNode.sa = ifrBuff.ifr_broadaddr; } #ifdef IFF_POINTOPOINT else if(ifrBuff.ifr_flags&IFF_POINTOPOINT) { @@ -201,10 +190,9 @@ namespace epics { errlogMinor, "getBroadcastAddresses(): net intf \"%s\": pt to pt addr fetch fail", pifreq->ifr_name); - delete pNewNode; continue; } - pNewNode->sa = ifrBuff.ifr_dstaddr; + pNewNode.sa = ifrBuff.ifr_dstaddr; } #endif else { @@ -212,10 +200,9 @@ namespace epics { errlogMinor, "getBroadcastAddresses(): net intf \"%s\": not point to point or bcast?", pifreq->ifr_name); - delete pNewNode; continue; } - pNewNode->ia.sin_port = htons(defaultPort); + pNewNode.ia.sin_port = htons(defaultPort); retVector->push_back(pNewNode); } @@ -296,15 +283,15 @@ namespace epics { size_t subEnd; while((subEnd = list.find(' ', subStart))!=String::npos) { String address = list.substr(subStart, (subEnd-subStart)); - osiSockAddr* addr = new osiSockAddr; - aToIPAddr(address.c_str(), defaultPort, &addr->ia); + osiSockAddr addr; + aToIPAddr(address.c_str(), defaultPort, &addr.ia); iav->push_back(addr); subStart = list.find_first_not_of(" \t\r\n\v", subEnd); } if(subStart!=String::npos&&list.length()>0) { - osiSockAddr* addr = new osiSockAddr; - aToIPAddr(list.substr(subStart).c_str(), defaultPort, &addr->ia); + osiSockAddr addr; + aToIPAddr(list.substr(subStart).c_str(), defaultPort, &addr.ia); iav->push_back(addr); } @@ -315,18 +302,18 @@ namespace epics { return iav; } - const String inetAddressToString(const osiSockAddr *addr, + const String inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex) { stringstream saddr; - int ipa = ntohl(addr->ia.sin_addr.s_addr); + int ipa = ntohl(addr.ia.sin_addr.s_addr); saddr<<((int)(ipa>>24)&0xFF)<<'.'; saddr<<((int)(ipa>>16)&0xFF)<<'.'; saddr<<((int)(ipa>>8)&0xFF)<<'.'; saddr<<((int)ipa&0xFF); - if(displayPort) saddr<<":"<ia.sin_port); - if(displayHex) saddr<<" ("<ia.sin_addr.s_addr) + if(displayPort) saddr<<":"< InetAddrVector; + typedef std::vector InetAddrVector; /** * returns a vector containing all the IPv4 broadcast addresses @@ -72,7 +72,7 @@ namespace epics { InetAddrVector* getSocketAddressList(epics::pvData::String list, int defaultPort, const InetAddrVector* appendList = NULL); - const epics::pvData::String inetAddressToString(const osiSockAddr *addr, + const epics::pvData::String inetAddressToString(const osiSockAddr &addr, bool displayPort = true, bool displayHex = false); /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index d34ae35..c74b6ad 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -57,6 +57,8 @@ public: DummyResponseHandler(Context* ctx) { } + virtual ~DummyResponseHandler() {} + virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { diff --git a/testApp/remote/testBlockingUDPSrv.cpp b/testApp/remote/testBlockingUDPSrv.cpp index a8c3aa4..7bbed6a 100644 --- a/testApp/remote/testBlockingUDPSrv.cpp +++ b/testApp/remote/testBlockingUDPSrv.cpp @@ -49,6 +49,8 @@ public: DummyResponseHandler(Context* context) : packets(0) { } + + virtual ~DummyResponseHandler() {} int getPackets() { return packets; diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 2e00223..272fb11 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -655,6 +655,7 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField); +// NOTE: this instance is not returned as Request, so it must self-destruct class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender { private: @@ -775,6 +776,8 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender // always cancel request // cancel(); // } + + cancel(); } @@ -1251,7 +1254,19 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD public: - ~ClientResponseHandler() { + virtual ~ClientResponseHandler() { + delete m_handlerTable[ 0]; + delete m_handlerTable[ 1]; + delete m_handlerTable[ 2]; + delete m_handlerTable[ 3]; + delete m_handlerTable[ 4]; + delete m_handlerTable[ 5]; + delete m_handlerTable[ 6]; + delete m_handlerTable[ 7]; + delete m_handlerTable[ 8]; + delete m_handlerTable[ 9]; + delete m_handlerTable[18]; + delete[] m_handlerTable; } @@ -1315,27 +1330,6 @@ class ClientResponseHandler : public ResponseHandler, private epics::pvData::NoD } }; - class TCI : public TransportSendControl { - public: - virtual void flushSerializeBuffer() { - } - - virtual void ensureBuffer(int size) { - } - - virtual void startMessage(int8 command, int ensureCapacity){} - virtual void endMessage() {} - - virtual void flush(bool lastMessageCompleted) {} - - virtual void setRecipient(const osiSockAddr& sendTo) {} - }; - - - - - - @@ -1534,7 +1528,7 @@ class TestChannelImpl : public ChannelImpl { } else { - return inetAddressToString(m_transport->getRemoteAddress()); + return inetAddressToString(*m_transport->getRemoteAddress()); } } @@ -1840,7 +1834,7 @@ class TestChannelImpl : public ChannelImpl { if (sockAddrAreIdentical(transport->getRemoteAddress(), serverAddress)) { m_requester->message("More than one channel with name '" + m_name + - "' detected, additional response from: " + inetAddressToString(serverAddress), warningMessage); + "' detected, additional response from: " + inetAddressToString(*serverAddress), warningMessage); return; } } @@ -2184,6 +2178,14 @@ class TestChannelImpl : public ChannelImpl { } virtual Configuration* getConfiguration() { +/* +TODO + final ConfigurationProvider configurationProvider = ConfigurationFactory.getProvider(); + Configuration config = configurationProvider.getConfiguration("pvAccess-client"); + if (config == null) + config = configurationProvider.getConfiguration("system"); + return config; +*/ return m_configuration; } @@ -2289,15 +2291,12 @@ class TestChannelImpl : public ChannelImpl { ~TestClientContextImpl() {}; void loadConfiguration() { - // TODO - /* - m_addressList = config->getPropertyAsString("EPICS4_CA_ADDR_LIST", m_addressList); - m_autoAddressList = config->getPropertyAsBoolean("EPICS4_CA_AUTO_ADDR_LIST", m_autoAddressList); - m_connectionTimeout = config->getPropertyAsFloat("EPICS4_CA_CONN_TMO", m_connectionTimeout); - m_beaconPeriod = config->getPropertyAsFloat("EPICS4_CA_BEACON_PERIOD", m_beaconPeriod); - m_broadcastPort = config->getPropertyAsInteger("EPICS4_CA_BROADCAST_PORT", m_broadcastPort); - m_receiveBufferSize = config->getPropertyAsInteger("EPICS4_CA_MAX_ARRAY_BYTES", m_receiveBufferSize); - */ + m_addressList = m_configuration->getPropertyAsString("EPICS4_CA_ADDR_LIST", m_addressList); + m_autoAddressList = m_configuration->getPropertyAsBoolean("EPICS4_CA_AUTO_ADDR_LIST", m_autoAddressList); + m_connectionTimeout = m_configuration->getPropertyAsFloat("EPICS4_CA_CONN_TMO", m_connectionTimeout); + m_beaconPeriod = m_configuration->getPropertyAsFloat("EPICS4_CA_BEACON_PERIOD", m_beaconPeriod); + m_broadcastPort = m_configuration->getPropertyAsInteger("EPICS4_CA_BROADCAST_PORT", m_broadcastPort); + m_receiveBufferSize = m_configuration->getPropertyAsInteger("EPICS4_CA_MAX_ARRAY_BYTES", m_receiveBufferSize); } void internalInitialize() { @@ -2322,6 +2321,7 @@ class TestChannelImpl : public ChannelImpl { // quary broadcast addresses of all IFs SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0); + if (socket == INVALID_SOCKET) return false; auto_ptr broadcastAddresses(getBroadcastAddresses(socket, m_broadcastPort)); epicsSocketDestroy (socket); @@ -2346,7 +2346,7 @@ class TestChannelImpl : public ChannelImpl { listenLocalAddress.ia.sin_port = htons(m_broadcastPort); listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - BlockingUDPConnector* broadcastConnector = new BlockingUDPConnector(true, true); + auto_ptr broadcastConnector(new BlockingUDPConnector(true, true)); m_broadcastTransport = (BlockingUDPTransport*)broadcastConnector->connect( 0, new ClientResponseHandler(this), listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, @@ -2361,7 +2361,7 @@ class TestChannelImpl : public ChannelImpl { undefinedAddress.ia.sin_port = htons(0); undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, true); + auto_ptr searchConnector(new BlockingUDPConnector(false, true)); m_searchTransport = (BlockingUDPTransport*)searchConnector->connect( 0, new ClientResponseHandler(this), undefinedAddress, CA_MINOR_PROTOCOL_REVISION, @@ -3072,11 +3072,11 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channel->printInfo(); -/* + GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); - +/* ChannelProcessRequesterImpl channelProcessRequester; ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); epicsThreadSleep ( 1.0 ); diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index bffa82f..eb7c33b 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -32,25 +32,25 @@ int main(int argc, char *argv[]) { assert(vec->size()==3); - osiSockAddr* addr; + osiSockAddr addr; addr = vec->at(0); - assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==htons(555)); - assert(addr->ia.sin_addr.s_addr==htonl(0x7F000001)); + assert(addr.ia.sin_family==AF_INET); + assert(addr.ia.sin_port==htons(555)); + assert(addr.ia.sin_addr.s_addr==htonl(0x7F000001)); assert(inetAddressToString(addr)=="127.0.0.1:555"); cout<<'\t'<at(1); - assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==htons(1234)); - assert(addr->ia.sin_addr.s_addr==htonl(0x0A0A0C0B)); + assert(addr.ia.sin_family==AF_INET); + assert(addr.ia.sin_port==htons(1234)); + assert(addr.ia.sin_addr.s_addr==htonl(0x0A0A0C0B)); assert(inetAddressToString(addr)=="10.10.12.11:1234"); cout<<'\t'<at(2); - assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==htons(555)); - assert(addr->ia.sin_addr.s_addr==htonl(0xC0A80304)); + assert(addr.ia.sin_family==AF_INET); + assert(addr.ia.sin_port==htons(555)); + assert(addr.ia.sin_addr.s_addr==htonl(0xC0A80304)); assert(inetAddressToString(addr)=="192.168.3.4:555"); cout<<'\t'<size()==4); addr = vec1->at(0); - assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==htons(6789)); - assert(addr->ia.sin_addr.s_addr==htonl(0xAC1037A0)); + assert(addr.ia.sin_family==AF_INET); + assert(addr.ia.sin_port==htons(6789)); + assert(addr.ia.sin_addr.s_addr==htonl(0xAC1037A0)); assert(inetAddressToString(addr)=="172.16.55.160:6789"); cout<<'\t'<at(1); - assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==htons(555)); - assert(addr->ia.sin_addr.s_addr==htonl(0x7F000001)); + assert(addr.ia.sin_family==AF_INET); + assert(addr.ia.sin_port==htons(555)); + assert(addr.ia.sin_addr.s_addr==htonl(0x7F000001)); assert(inetAddressToString(addr)=="127.0.0.1:555"); cout<<'\t'<at(2); - assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==htons(1234)); - assert(addr->ia.sin_addr.s_addr==htonl(0x0A0A0C0B)); + assert(addr.ia.sin_family==AF_INET); + assert(addr.ia.sin_port==htons(1234)); + assert(addr.ia.sin_addr.s_addr==htonl(0x0A0A0C0B)); assert(inetAddressToString(addr)=="10.10.12.11:1234"); cout<<'\t'<at(3); - assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==htons(555)); - assert(addr->ia.sin_addr.s_addr==htonl(0xC0A80304)); + assert(addr.ia.sin_family==AF_INET); + assert(addr.ia.sin_port==htons(555)); + assert(addr.ia.sin_addr.s_addr==htonl(0xC0A80304)); assert(inetAddressToString(addr)=="192.168.3.4:555"); cout<<'\t'<at(0)))==(int32)0x7F000001); - assert(ipv4AddressToInt(*(vec->at(1)))==(int32)0x0A0A0C0B); - assert(ipv4AddressToInt(*(vec->at(2)))==(int32)0xC0A80304); + assert(ipv4AddressToInt((vec->at(0)))==(int32)0x7F000001); + assert(ipv4AddressToInt((vec->at(1)))==(int32)0x0A0A0C0B); + assert(ipv4AddressToInt((vec->at(2)))==(int32)0xC0A80304); cout<<"\nPASSED!\n"; delete vec; delete vec1; + osiSockAddr* paddr; cout<<"Testing \"intToIPv4Address\""<ia.sin_family==AF_INET); - assert(inetAddressToString(addr)=="127.0.0.1:0"); - cout<<'\t'<ia.sin_family==AF_INET); + assert(inetAddressToString(*paddr)=="127.0.0.1:0"); + cout<<'\t'<ia.sin_family==AF_INET); - assert(inetAddressToString(addr)=="10.10.12.11:0"); - cout<<'\t'<ia.sin_family==AF_INET); + assert(inetAddressToString(*paddr)=="10.10.12.11:0"); + cout<<'\t'<getArray(), src, 16)==0); cout<<"\nPASSED!\n"; @@ -135,7 +138,6 @@ int main(int argc, char *argv[]) { } delete broadcasts; - delete addr; return 0; } From b23e7f13d8f61e01b437a1807bd85dc74dadd4cf Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Sun, 23 Jan 2011 23:47:13 +0100 Subject: [PATCH 6/7] memory management fixes --- .../remote/blockingClientTCPTransport.cpp | 54 +++++++++---------- .../remote/blockingServerTCPTransport.cpp | 40 +++++++------- pvAccessApp/remote/blockingTCP.h | 14 ++--- pvAccessApp/remote/blockingTCPTransport.cpp | 8 +-- pvAccessApp/utils/introspectionRegistry.cpp | 9 ++-- testApp/remote/testRemoteClientImpl.cpp | 23 ++++---- 6 files changed, 74 insertions(+), 74 deletions(-) diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index b793374..5f4a30f 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -34,11 +34,9 @@ namespace epics { float beaconInterval, int16 priority) : BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, priority), _introspectionRegistry( - new IntrospectionRegistry(false)), _owners(new set< - TransportClient*> ()), _connectionTimeout(beaconInterval + new IntrospectionRegistry(false)), _connectionTimeout(beaconInterval *1000), _unresponsiveTransport(false), _timerNode( - new TimerNode(this)), _mutex(new Mutex()), _ownersMutex( - new Mutex()), _verifyOrEcho(true) { + new TimerNode(this)), _verifyOrEcho(true) { _autoDelete = false; // initialize owners list, send queue @@ -58,11 +56,9 @@ namespace epics { } BlockingClientTCPTransport::~BlockingClientTCPTransport() { + printf("========== ~BlockingClientTCPTransport\n"); delete _introspectionRegistry; - delete _owners; delete _timerNode; - delete _mutex; - delete _ownersMutex; } void BlockingClientTCPTransport::callback() { @@ -84,15 +80,15 @@ namespace epics { if(!_unresponsiveTransport) { _unresponsiveTransport = true; - Lock lock(_ownersMutex); - set::iterator it = _owners->begin(); - for(; it!=_owners->end(); it++) + Lock lock(&_ownersMutex); + set::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) (*it)->transportUnresponsive(); } } bool BlockingClientTCPTransport::acquire(TransportClient* client) { - Lock lock(_mutex); + Lock lock(&_mutex); if(_closed) return false; @@ -100,11 +96,9 @@ namespace epics { ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr); - _ownersMutex->lock(); - if(_closed) return false; - - _owners->insert(client); - _ownersMutex->unlock(); + Lock lock2(&_ownersMutex); +// TODO double check? if(_closed) return false; + _owners.insert(client); return true; } @@ -121,10 +115,10 @@ namespace epics { * Notifies clients about disconnect. */ void BlockingClientTCPTransport::closedNotifyClients() { - Lock lock(_ownersMutex); + Lock lock(&_ownersMutex); // check if still acquired - int refs = _owners->size(); + int refs = _owners.size(); if(refs>0) { char ipAddrStr[48]; ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); @@ -133,12 +127,12 @@ namespace epics { "Transport to %s still has %d client(s) active and closing...", ipAddrStr, refs); - set::iterator it = _owners->begin(); - for(; it!=_owners->end(); it++) + set::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) (*it)->transportClosed(); } - _owners->clear(); + _owners.clear(); } void BlockingClientTCPTransport::release(TransportClient* client) { @@ -149,12 +143,12 @@ namespace epics { errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr); - Lock lock(_ownersMutex); - _owners->erase(client); + Lock lock(&_ownersMutex); + _owners.erase(client); // not used anymore // TODO consider delayed destruction (can improve performance!!!) - if(_owners->size()==0) close(false); + if(_owners.size()==0) close(false); } void BlockingClientTCPTransport::aliveNotification() { @@ -165,20 +159,20 @@ namespace epics { void BlockingClientTCPTransport::responsiveTransport() { if(_unresponsiveTransport) { _unresponsiveTransport = false; - Lock lock(_ownersMutex); + Lock lock(&_ownersMutex); - set::iterator it = _owners->begin(); - for(; it!=_owners->end(); it++) + set::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) (*it)->transportResponsive(this); } } void BlockingClientTCPTransport::changedTransport() { _introspectionRegistry->reset(); - Lock lock(_ownersMutex); + Lock lock(&_ownersMutex); - set::iterator it = _owners->begin(); - for(; it!=_owners->end(); it++) + set::iterator it = _owners.begin(); + for(; it!=_owners.end(); it++) (*it)->transportChanged(); } diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 079e2f4..86ab989 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -31,9 +31,7 @@ namespace epics { BlockingTCPTransport(context, channel, responseHandler, receiveBufferSize, CA_DEFAULT_PRIORITY), _introspectionRegistry(new IntrospectionRegistry(true)), - _lastChannelSID(0), _channels( - new map ()), _channelsMutex( - new Mutex()) { + _lastChannelSID(0) { // NOTE: priority not yet known, default priority is used to register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! @@ -43,13 +41,11 @@ namespace epics { BlockingServerTCPTransport::~BlockingServerTCPTransport() { delete _introspectionRegistry; - delete _channels; - delete _channelsMutex; } void BlockingServerTCPTransport::destroyAllChannels() { - Lock lock(_channelsMutex); - if(_channels->size()==0) return; + Lock lock(&_channelsMutex); + if(_channels.size()==0) return; char ipAddrStr[64]; ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); @@ -57,13 +53,13 @@ namespace epics { errlogSevPrintf( errlogInfo, "Transport to %s still has %u channel(s) active and closing...", - ipAddrStr, _channels->size()); + ipAddrStr, _channels.size()); - map::iterator it = _channels->begin(); - for(; it!=_channels->end(); it++) + map::iterator it = _channels.begin(); + for(; it!=_channels.end(); it++) it->second->destroy(); - _channels->clear(); + _channels.clear(); } void BlockingServerTCPTransport::internalClose(bool force) { @@ -72,37 +68,37 @@ namespace epics { } pvAccessID BlockingServerTCPTransport::preallocateChannelSID() { - Lock lock(_channelsMutex); + Lock lock(&_channelsMutex); // search first free (theoretically possible loop of death) pvAccessID sid = ++_lastChannelSID; - while(_channels->find(sid)!=_channels->end()) + while(_channels.find(sid)!=_channels.end()) sid = ++_lastChannelSID; return sid; } void BlockingServerTCPTransport::registerChannel(pvAccessID sid, ServerChannel* channel) { - Lock lock(_channelsMutex); - (*_channels)[sid] = channel; + Lock lock(&_channelsMutex); + _channels[sid] = channel; } void BlockingServerTCPTransport::unregisterChannel(pvAccessID sid) { - Lock lock(_channelsMutex); - _channels->erase(sid); + Lock lock(&_channelsMutex); + _channels.erase(sid); } ServerChannel* BlockingServerTCPTransport::getChannel(pvAccessID sid) { - Lock lock(_channelsMutex); + Lock lock(&_channelsMutex); - map::iterator it = _channels->find(sid); - if(it!=_channels->end()) return it->second; + map::iterator it = _channels.find(sid); + if(it!=_channels.end()) return it->second; return NULL; } int BlockingServerTCPTransport::getChannelCount() { - Lock lock(_channelsMutex); - return _channels->size(); + Lock lock(&_channelsMutex); + return _channels.size(); } void BlockingServerTCPTransport::send(ByteBuffer* buffer, diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index b017341..19fc260 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -52,7 +52,7 @@ namespace epics { BlockingTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize, int16 priority); - + virtual bool isClosed() const { return _closed; } @@ -439,7 +439,7 @@ namespace epics { /** * Owners (users) of the transport. */ - std::set* _owners; + std::set _owners; /** * Connection timeout (no-traffic) flag. @@ -461,8 +461,8 @@ namespace epics { */ volatile epicsTimeStamp _aliveTimestamp; - epics::pvData::Mutex* _mutex; - epics::pvData::Mutex* _ownersMutex; + epics::pvData::Mutex _mutex; + epics::pvData::Mutex _ownersMutex; bool _verifyOrEcho; @@ -645,9 +645,9 @@ namespace epics { /** * Channel table (SID -> channel mapping). */ - std::map* _channels; + std::map _channels; - Mutex* _channelsMutex; + Mutex _channelsMutex; /** * Destroy all channels. @@ -672,7 +672,7 @@ namespace epics { BlockingTCPAcceptor(Context* context, int port, int receiveBufferSize); - ~BlockingTCPAcceptor(); + virtual ~BlockingTCPAcceptor(); void handleEvents(); diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 8d3dd00..930edf5 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -130,6 +130,8 @@ namespace epics { BlockingTCPTransport::~BlockingTCPTransport() { close(true); + // TODO remove + epicsThreadSleep(3.0); delete _socketAddress; delete _sendQueue; @@ -659,7 +661,7 @@ namespace epics { } catch(BaseException* e) { String trace; e->toString(trace); - errlogSevPrintf(errlogMajor, trace.c_str()); + errlogSevPrintf(errlogMajor, "%s", trace.c_str()); // error, release lock clearAndReleaseBuffer(); } catch(...) { @@ -703,7 +705,7 @@ namespace epics { // connection lost ostringstream temp; temp<<"error in sending TCP data: "<toString(trace); - errlogSevPrintf(errlogMajor, trace.c_str()); + errlogSevPrintf(errlogMajor, "%s", trace.c_str()); _sendBuffer->setPosition(_lastMessageStartPosition); } catch(...) { _sendBuffer->setPosition(_lastMessageStartPosition); diff --git a/pvAccessApp/utils/introspectionRegistry.cpp b/pvAccessApp/utils/introspectionRegistry.cpp index a50ee41..5e0e229 100644 --- a/pvAccessApp/utils/introspectionRegistry.cpp +++ b/pvAccessApp/utils/introspectionRegistry.cpp @@ -26,6 +26,7 @@ IntrospectionRegistry::IntrospectionRegistry(bool serverSide) : _mutex(Mutex()) IntrospectionRegistry::~IntrospectionRegistry() { + reset(); } void IntrospectionRegistry::reset() @@ -242,7 +243,9 @@ FieldConstPtr IntrospectionRegistry::deserialize(ByteBuffer* buffer, Deserializa else if(typeCode == IntrospectionRegistry::ONLY_ID_TYPE_CODE) { control->ensureData(sizeof(int16)/sizeof(int8)); - return registry->getIntrospectionInterface(buffer->getShort()); + FieldConstPtr field = registry->getIntrospectionInterface(buffer->getShort()); + field->incReferenceCount(); // we inc, so that deserialize always returns a field with +1 ref. count (as when created) + return field; } // could also be a mask @@ -305,7 +308,6 @@ StructureConstPtr IntrospectionRegistry::deserializeStructureField(ByteBuffer* b } StructureConstPtr structure = _fieldCreate->createStructure(structureFieldName, size, fields); - //???????delete [] fields; return structure; } @@ -353,7 +355,8 @@ PVStructurePtr IntrospectionRegistry::deserializeStructureAndCreatePVStructure(B { return NULL; } - return _pvDataCreate->createPVStructure(NULL,static_cast(field)); + PVStructurePtr retVal = _pvDataCreate->createPVStructure(NULL,static_cast(field)); + return retVal; } void IntrospectionRegistry::serializeStatus(ByteBuffer* buffer, SerializableControl* control, Status* status) diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 272fb11..b26afac 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -761,6 +761,7 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender // deserialize Field... const Field* field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport); m_callback->getDone(status, field); + field->decReferenceCount(); } else { @@ -1499,6 +1500,7 @@ class TestChannelImpl : public ChannelImpl { virtual void destroy() { + destroy(false); //TODO guard if (m_addresses) delete m_addresses; delete this; }; @@ -3072,11 +3074,11 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channel->printInfo(); - +/* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); -/* + ChannelProcessRequesterImpl channelProcessRequester; ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0); epicsThreadSleep ( 1.0 ); @@ -3084,16 +3086,16 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channelProcess->destroy(); epicsThreadSleep ( 1.0 ); - +*/ ChannelGetRequesterImpl channelGetRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("field(timeStamp,value)",&channelGetRequesterImpl); + PVStructure* pvRequest = getCreateRequest()->createRequest("field(value)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); epicsThreadSleep ( 3.0 ); channelGet->get(false); epicsThreadSleep ( 3.0 ); - //TODOchannelGet->destroy(); + channelGet->destroy(); epicsThreadSleep ( 1.0 ); - +/* ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest); epicsThreadSleep ( 1.0 ); @@ -3101,10 +3103,9 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channelPut->put(false); epicsThreadSleep ( 1.0 ); - //TODOchannelPut->destroy(); + channelPut->destroy(); + - // TODO delete pvRequest -*/ /* MonitorRequesterImpl monitorRequesterImpl; Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); @@ -3122,6 +3123,10 @@ int main(int argc,char *argv[]) monitor->destroy(); */ + + // TODO share it? + delete pvRequest; + epicsThreadSleep ( 3.0 ); channel->destroy(); From 5cb78eac0506a5e9da3ecc4e658389516f88cd49 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Mon, 24 Jan 2011 18:07:37 +0100 Subject: [PATCH 7/7] TCP transport cleanup, lots of printf still in use --- .../remote/blockingClientTCPTransport.cpp | 8 +- .../remote/blockingServerTCPTransport.cpp | 2 +- pvAccessApp/remote/blockingTCP.h | 256 +++++++++++------- pvAccessApp/remote/blockingTCPTransport.cpp | 167 ++++++------ testApp/remote/testRemoteClientImpl.cpp | 74 +++-- 5 files changed, 300 insertions(+), 207 deletions(-) diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index 5f4a30f..1f9fe55 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -37,7 +37,7 @@ namespace epics { new IntrospectionRegistry(false)), _connectionTimeout(beaconInterval *1000), _unresponsiveTransport(false), _timerNode( new TimerNode(this)), _verifyOrEcho(true) { - _autoDelete = false; +// _autoDelete = false; // initialize owners list, send queue acquire(client); @@ -93,7 +93,7 @@ namespace epics { if(_closed) return false; char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr); Lock lock2(&_ownersMutex); @@ -121,7 +121,7 @@ namespace epics { int refs = _owners.size(); if(refs>0) { char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, "Transport to %s still has %d client(s) active and closing...", @@ -139,7 +139,7 @@ namespace epics { if(_closed) return; char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr); diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 86ab989..1deadb4 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -48,7 +48,7 @@ namespace epics { if(_channels.size()==0) return; char ipAddrStr[64]; - ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 19fc260..6061b17 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -22,6 +22,7 @@ #include #include #include +#include /* EPICSv3 */ #include @@ -86,7 +87,7 @@ namespace epics { } virtual const osiSockAddr* getRemoteAddress() const { - return _socketAddress; + return &_socketAddress; } virtual int16 getPriority() const { @@ -108,12 +109,12 @@ namespace epics { virtual int getSocketReceiveBufferSize() const; virtual bool isVerified() const { - Lock lock(_verifiedMutex); + Lock lock(const_cast(&_verifiedMutex)); return _verified; } virtual void verified() { - Lock lock(_verifiedMutex); + Lock lock(&_verifiedMutex); _verified = true; } @@ -148,7 +149,7 @@ namespace epics { _flushStrategy = flushStrategy; } - void requestFlush(); + //void requestFlush(); /** * Close and free connection resources. @@ -165,80 +166,7 @@ namespace epics { void enqueueMonitorSendRequest(TransportSender* sender); protected: - /** - * Connection status - */ - bool volatile _closed; - - /** - * Corresponding channel. - */ - SOCKET _channel; - - /** - * Cached socket address. - */ - osiSockAddr* _socketAddress; - - /** - * Send buffer. - */ - epics::pvData::ByteBuffer* _sendBuffer; - - /** - * Remote side transport revision (minor). - */ - int8 _remoteTransportRevision; - - /** - * Remote side transport receive buffer size. - */ - int _remoteTransportReceiveBufferSize; - - /** - * Remote side transport socket receive buffer size. - */ - int _remoteTransportSocketReceiveBufferSize; - - /** - * Priority. - * NOTE: Priority cannot just be changed, since it is registered - * in transport registry with given priority. - */ - int16 _priority; - // TODO to be implemeneted - - /** - * CAS response handler. - */ - ResponseHandler* _responseHandler; - - /** - * Read sync. object monitor. - */ - //Object _readMonitor = new Object(); - - /** - * Total bytes received. - */ - int64 volatile _totalBytesReceived; - - /** - * Total bytes sent. - */ - int64 volatile _totalBytesSent; - - /** - * Marker to send. - */ - volatile int _markerToSend; - - volatile bool _verified; - - volatile int64 _remoteBufferFreeSpace; - - volatile bool _autoDelete; - + virtual void processReadCached(bool nestedCall, ReceiveStage inStage, int requiredBytes, bool addToBuffer); @@ -259,7 +187,8 @@ namespace epics { virtual ~BlockingTCPTransport(); - private: + + /** * Default marker period. */ @@ -267,7 +196,32 @@ namespace epics { static const int MAX_ENSURE_DATA_BUFFER_SIZE = 1024; - static const double delay = 0.01; + static const double _delay = 0.01; + + /****** finally initialized at construction time and after start (called by the same thread) ********/ + + /** + * Corresponding channel. + */ + SOCKET _channel; + + /** + * Cached socket address. + */ + osiSockAddr _socketAddress; + + /** + * Priority. + * NOTE: Priority cannot just be changed, since it is registered + * in transport registry with given priority. + */ + int16 _priority; + // TODO to be implemeneted + + /** + * CAS response handler. + */ + ResponseHandler* _responseHandler; /** * Send buffer size. @@ -284,6 +238,58 @@ namespace epics { */ int64 _markerPeriodBytes; + + SendQueueFlushStrategy _flushStrategy; + + + epicsThreadId _rcvThreadId; + + epicsThreadId _sendThreadId; + + MonitorSender* _monitorSender; + + Context* _context; + + bool _autoDelete; + + + + /**** after verification ****/ + + /** + * Remote side transport revision (minor). + */ + int8 _remoteTransportRevision; + + /** + * Remote side transport receive buffer size. + */ + int _remoteTransportReceiveBufferSize; + + /** + * Remote side transport socket receive buffer size. + */ + int _remoteTransportSocketReceiveBufferSize; + + + + /*** send thread only - no need to sync ***/ + // NOTE: now all send-related external calls are TransportSender IF + // and its reference is only valid when called from send thread + + // initialized at construction time + GrowingCircularBuffer* _sendQueue; + epics::pvData::Mutex _sendQueueMutex; + + // initialized at construction time + GrowingCircularBuffer* _monitorSendQueue; + epics::pvData::Mutex _monitorMutex; + + /** + * Send buffer. + */ + epics::pvData::ByteBuffer* _sendBuffer; + /** * Next planned marker position. */ @@ -299,20 +305,28 @@ namespace epics { */ int _lastMessageStartPosition; + int8 _lastSegmentedMessageType; + int8 _lastSegmentedMessageCommand; + + bool _flushRequested; + + int _sendBufferSentPosition; + + + + + + + + + + /*** receive thread only - no need to sync ***/ + + // initialized at construction time epics::pvData::ByteBuffer* _socketBuffer; int _startPosition; - epics::pvData::Mutex* _mutex; - epics::pvData::Mutex* _sendQueueMutex; - epics::pvData::Mutex* _verifiedMutex; - epics::pvData::Mutex* _monitorMutex; - - ReceiveStage _stage; - - int8 _lastSegmentedMessageType; - int8 _lastSegmentedMessageCommand; - int _storedPayloadSize; int _storedPosition; int _storedLimit; @@ -322,26 +336,68 @@ namespace epics { int8 _command; int _payloadSize; - volatile bool _flushRequested; + ReceiveStage _stage; - int _sendBufferSentPosition; + /** + * Total bytes received. + */ + int64 _totalBytesReceived; - SendQueueFlushStrategy _flushStrategy; - GrowingCircularBuffer* _sendQueue; - epicsThreadId _rcvThreadId; - epicsThreadId _sendThreadId; - GrowingCircularBuffer* _monitorSendQueue; - MonitorSender* _monitorSender; + /*** send/receive thread shared ***/ - Context* _context; + /** + * Connection status + * NOTE: synced by _mutex + */ + bool volatile _closed; - volatile bool _sendThreadRunning; + // NOTE: synced by _mutex + bool _sendThreadExited; + epics::pvData::Mutex _mutex; + + + bool _verified; + epics::pvData::Mutex _verifiedMutex; + + + + + Event _sendQueueEvent; + + + + + + + /** + * Marker to send. + * NOTE: synced by _flowControlMutex + */ + int _markerToSend; + + /** + * Total bytes sent. + * NOTE: synced by _flowControlMutex + */ + int64 _totalBytesSent; + + /** + * Calculated remote free buffer size. + * NOTE: synced by _flowControlMutex + */ + int64 _remoteBufferFreeSpace; + + epics::pvData::Mutex _flowControlMutex; + + + private: + /** * Internal method that clears and releases buffer. * sendLock and sendBufferLock must be hold while calling this method. diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 930edf5..2a88afb 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -65,7 +65,7 @@ namespace epics { BlockingTCPTransport::BlockingTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize, int16 priority) : - _closed(false), _channel(channel), _socketAddress(new osiSockAddr), + _closed(false), _channel(channel), _remoteTransportRevision(0), _remoteTransportReceiveBufferSize(MAX_TCP_RECV), _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), @@ -75,9 +75,7 @@ namespace epics { LONG_LONG_MAX), _autoDelete(true), _markerPeriodBytes(MARKER_PERIOD), _nextMarkerPosition( _markerPeriodBytes), _sendPending(false), - _lastMessageStartPosition(0), _mutex(new Mutex()), - _sendQueueMutex(new Mutex()), _verifiedMutex(new Mutex()), - _monitorMutex(new Mutex()), _stage(READ_FROM_SOCKET), + _lastMessageStartPosition(0), _stage(READ_FROM_SOCKET), _lastSegmentedMessageType(0), _lastSegmentedMessageCommand( 0), _storedPayloadSize(0), _storedPosition(0), _storedLimit(0), _magicAndVersion(0), _packetType(0), @@ -87,9 +85,9 @@ namespace epics { new GrowingCircularBuffer (100)), _rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue( new GrowingCircularBuffer (100)), - _monitorSender(new MonitorSender(_monitorMutex, + _monitorSender(new MonitorSender(&_monitorMutex, _monitorSendQueue)), _context(context), - _sendThreadRunning(false) { + _sendThreadExited(false) { _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize), EPICS_ENDIAN_BIG); @@ -114,7 +112,7 @@ namespace epics { } socklen_t saSize = sizeof(sockaddr); - retval = getpeername(_channel, &(_socketAddress->sa), &saSize); + retval = getpeername(_channel, &(_socketAddress.sa), &saSize); if(retval<0) { errlogSevPrintf(errlogMajor, "Error fetching socket remote address: %s", strerror( @@ -130,26 +128,20 @@ namespace epics { BlockingTCPTransport::~BlockingTCPTransport() { close(true); - // TODO remove - epicsThreadSleep(3.0); - delete _socketAddress; delete _sendQueue; delete _socketBuffer; delete _sendBuffer; - delete _mutex; - delete _sendQueueMutex; - delete _verifiedMutex; - delete _monitorMutex; - delete _responseHandler; } void BlockingTCPTransport::start() { - _sendThreadRunning = true; + + // TODO consuder epics::pvData::Thread + String threadName = "TCP-receive "+inetAddressToString( - * _socketAddress); + _socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); @@ -159,7 +151,7 @@ namespace epics { epicsThreadStackMedium), BlockingTCPTransport::rcvThreadRunner, this); - threadName = "TCP-send "+inetAddressToString(*_socketAddress); + threadName = "TCP-send "+inetAddressToString(_socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); @@ -178,9 +170,9 @@ namespace epics { _nextMarkerPosition -= _sendBuffer->getPosition() -CA_MESSAGE_HEADER_SIZE; - _sendQueueMutex->lock(); + _sendQueueMutex.lock(); _flushRequested = false; - _sendQueueMutex->unlock(); + _sendQueueMutex.unlock(); _sendBuffer->clear(); @@ -194,12 +186,13 @@ namespace epics { } void BlockingTCPTransport::close(bool force) { - Lock lock(_mutex); + Lock lock(&_mutex); // already closed check if(_closed) return; _closed = true; + printf("closing.\n"); // remove from registry _context->getTransportRegistry()->remove(this); @@ -207,12 +200,8 @@ namespace epics { // clean resources internalClose(force); - // threads cannot "wait" Epics, no need to notify - // TODO check alternatives to "wait" // notify send queue - //synchronized (sendQueue) { - // sendQueue.notifyAll(); - //} + _sendQueueEvent.signal(); } void BlockingTCPTransport::internalClose(bool force) { @@ -239,21 +228,22 @@ namespace epics { return sockBufSize; } + // TODO reimplement using Event bool BlockingTCPTransport::waitUntilVerified(double timeout) { double internalTimeout = timeout; bool internalVerified = false; - _verifiedMutex->lock(); + _verifiedMutex.lock(); internalVerified = _verified; - _verifiedMutex->unlock(); + _verifiedMutex.unlock(); while(!internalVerified&&internalTimeout>0) { epicsThreadSleep(min(0.1, internalTimeout)); internalTimeout -= 0.1; - _verifiedMutex->lock(); + _verifiedMutex.lock(); internalVerified = _verified; - _verifiedMutex->unlock(); + _verifiedMutex.unlock(); } return internalVerified; } @@ -510,7 +500,7 @@ namespace epics { errlogSevPrintf( errlogMinor, "Invalid header received from client %s, disconnecting...", - inetAddressToString(*_socketAddress).c_str()); + inetAddressToString(_socketAddress).c_str()); close(true); return; } @@ -531,11 +521,14 @@ namespace epics { } else if(type==1) { if(_command==0) { - if(_markerToSend==0) _markerToSend - = _payloadSize; // TODO send back response + _flowControlMutex.lock(); + if(_markerToSend==0) + _markerToSend = _payloadSize; // TODO send back response + _flowControlMutex.unlock(); } else //if (command == 1) { + _flowControlMutex.lock(); int difference = (int)_totalBytesSent -_payloadSize+CA_MESSAGE_HEADER_SIZE; // overrun check @@ -545,6 +538,7 @@ namespace epics { +_remoteTransportSocketReceiveBufferSize -difference; // TODO if this is calculated wrong, this can be critical !!! + _flowControlMutex.unlock(); } // no payload @@ -556,7 +550,7 @@ namespace epics { errlogMajor, "Unknown packet type %d, received from client %s, disconnecting...", type, - inetAddressToString(*_socketAddress).c_str()); + inetAddressToString(_socketAddress).c_str()); close(true); return; } @@ -580,7 +574,7 @@ namespace epics { +_storedPayloadSize, _storedLimit)); try { // handle response - _responseHandler->handleResponse(_socketAddress, + _responseHandler->handleResponse(&_socketAddress, this, version, _command, _payloadSize, _socketBuffer); } catch(...) { @@ -628,8 +622,10 @@ namespace epics { _sendBufferSentPosition = 0; // if not set skip marker otherwise set it + _flowControlMutex.lock(); int markerValue = _markerToSend; _markerToSend = 0; + _flowControlMutex.unlock(); if(markerValue==0) _sendBufferSentPosition = CA_MESSAGE_HEADER_SIZE; else @@ -694,7 +690,7 @@ namespace epics { //errlogSevPrintf(errlogInfo, // "Sending %d of total %d bytes in the packet to %s.", // bytesToSend, limit, - // inetAddressToString(*_socketAddress).c_str()); + // inetAddressToString(_socketAddress).c_str()); while(buffer->getRemaining()>0) { ssize_t bytesSent = ::send(_channel, @@ -720,13 +716,15 @@ namespace epics { //errlogSevPrintf(errlogInfo, // "Send buffer full for %s, waiting...", - // inetAddressToString(*_socketAddress)); + // inetAddressToString(_socketAddress)); return false; } buffer->setPosition(buffer->getPosition()+bytesSent); + _flowControlMutex.lock(); _totalBytesSent += bytesSent; + _flowControlMutex.unlock(); // readjust limit if(bytesToSend==maxBytesToSend) { @@ -752,18 +750,9 @@ namespace epics { TransportSender* BlockingTCPTransport::extractFromSendQueue() { TransportSender* retval; - _sendQueueMutex->lock(); - try { - if(_sendQueue->size()>0) - retval = _sendQueue->extract(); - else - retval = NULL; - } catch(...) { - // not expecting the exception here, but just to be safe - retval = NULL; - } - - _sendQueueMutex->unlock(); + _sendQueueMutex.lock(); + retval = _sendQueue->extract(); + _sendQueueMutex.unlock(); return retval; } @@ -772,23 +761,35 @@ namespace epics { while(!_closed) { TransportSender* sender; +// TODO race! sender = extractFromSendQueue(); + printf("extraced %d\n", sender); // wait for new message - while(sender==NULL&&!_flushRequested&&!_closed) { + while(sender==NULL&&!_flushRequested/*&&!_closed*/) { + + + bool c; + _mutex.lock(); + c = _closed; + printf("closed %d\n", c); + _mutex.unlock(); + if (c) + break; + if(_flushStrategy==DELAYED) { - if(delay>0) epicsThreadSleep(delay); + if(_delay>0) epicsThreadSleep(_delay); if(_sendQueue->size()==0) { // if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE) - if(_sendBuffer->getPosition() - >CA_MESSAGE_HEADER_SIZE) + if(_sendBuffer->getPosition()>CA_MESSAGE_HEADER_SIZE) _flushRequested = true; else - epicsThreadSleep(0); + _sendQueueEvent.wait(); } } else - epicsThreadSleep(0); + _sendQueueEvent.wait(); sender = extractFromSendQueue(); + printf("extraced2 %d\n", sender); } // always do flush from this thread @@ -827,13 +828,6 @@ namespace epics { } // while(!_closed) } - void BlockingTCPTransport::requestFlush() { - // needless lock, manipulating a single byte - //Lock lock(_sendQueueMutex); - if(_flushRequested) return; - _flushRequested = true; - } - void BlockingTCPTransport::freeSendBuffers() { // TODO ? } @@ -842,7 +836,7 @@ namespace epics { freeSendBuffers(); errlogSevPrintf(errlogInfo, "Connection to %s closed.", - inetAddressToString(*_socketAddress).c_str()); + inetAddressToString(_socketAddress).c_str()); if(_channel!=INVALID_SOCKET) { epicsSocketDestroy(_channel); @@ -853,53 +847,68 @@ namespace epics { void BlockingTCPTransport::rcvThreadRunner(void* param) { BlockingTCPTransport* obj = (BlockingTCPTransport*)param; +try{ obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false); - +} catch (...) { +printf("rcvThreadRunnner exception\n"); +} + printf("rcvThreadRunner done, autodelete %d-\n", obj->_autoDelete); if(obj->_autoDelete) { - while(obj->_sendThreadRunning) + while(true) + { + printf("waiting send thread to exit.\n"); + bool exited; + obj->_mutex.lock(); + exited = obj->_sendThreadExited; + obj->_mutex.unlock(); + if (exited) + break; epicsThreadSleep(0.1); - + } + printf("deleting.\n"); delete obj; } } void BlockingTCPTransport::sendThreadRunner(void* param) { BlockingTCPTransport* obj = (BlockingTCPTransport*)param; - +try { obj->processSendQueue(); +} catch (...) { +printf("sendThreadRunnner exception\n"); +} obj->freeConnectionResorces(); + printf("exited.\n"); - obj->_sendThreadRunning = false; + // TODO possible crash on unlock + obj->_mutex.lock(); + obj->_sendThreadExited = true; + obj->_mutex.unlock(); } void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) { + Lock lock(&_sendQueueMutex); if(_closed) return; - Lock lock(_sendQueueMutex); _sendQueue->insert(sender); + _sendQueueEvent.signal(); } - void BlockingTCPTransport::enqueueMonitorSendRequest( - TransportSender* sender) { + void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender* sender) { + Lock lock(&_monitorMutex); if(_closed) return; - Lock lock(_monitorMutex); _monitorSendQueue->insert(sender); if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender); } - void MonitorSender::send(ByteBuffer* buffer, - TransportSendControl* control) { + void MonitorSender::send(ByteBuffer* buffer, TransportSendControl* control) { control->startMessage(19, 0); while(true) { TransportSender* sender; _monitorMutex->lock(); if(_monitorSendQueue->size()>0) - try { - sender = _monitorSendQueue->extract(); - } catch(...) { - sender = NULL; - } + sender = _monitorSendQueue->extract(); else sender = NULL; _monitorMutex->unlock(); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index b26afac..4e1a6fb 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -165,8 +165,8 @@ public: // destroy remote instance if (!m_remotelyDestroyed) { - startRequest(PURE_DESTROY_REQUEST); - m_channel->checkAndGetTransport()->enqueueSendRequest(this); +// TODO !!! startRequest(PURE_DESTROY_REQUEST); +/// TODO !!! causes crash m_channel->checkAndGetTransport()->enqueueSendRequest(this); } } @@ -320,6 +320,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess virtual void destroy() { + BaseRequestImpl::destroy(); delete this; } @@ -459,6 +460,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet virtual void destroy() { + BaseRequestImpl::destroy(); // TODO sync if (m_data) delete m_data; if (m_bitSet) delete m_bitSet; @@ -636,6 +638,7 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut virtual void destroy() { + BaseRequestImpl::destroy(); // TODO sync if (m_data) delete m_data; if (m_bitSet) delete m_bitSet; @@ -993,10 +996,14 @@ typedef std::map IOIDResponseRequestMap; AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); transport->ensureData(4); - DataResponse* nrr = dynamic_cast(_context->getResponseRequest(payloadBuffer->getInt())); - if (nrr) - nrr->response(transport, version, payloadBuffer); - } + ResponseRequest* rr = _context->getResponseRequest(payloadBuffer->getInt()); + if (rr) + { + DataResponse* nrr = dynamic_cast(rr); + if (nrr) + nrr->response(transport, version, payloadBuffer); + } + } }; @@ -1794,7 +1801,8 @@ class TestChannelImpl : public ChannelImpl { { if (remoteDestroy) { m_issueCreateMessage = false; - m_transport->enqueueSendRequest(this); + // TODO !!! this causes problems.. since qnqueueSendRequest is added and this instance deleted + //m_transport->enqueueSendRequest(this); } ReferenceCountingTransport* rct = dynamic_cast(m_transport); @@ -2505,6 +2513,7 @@ TODO { Lock guard(&m_ioidMapMutex); IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid); + printf("getResponseRequest %d = %d\n", ioid, (it == m_pendingResponseRequests.end() ? 0 : it->second)); return (it == m_pendingResponseRequests.end() ? 0 : it->second); } @@ -2517,6 +2526,7 @@ TODO { Lock guard(&m_ioidMapMutex); pvAccessID ioid = generateIOID(); + printf("registerResponseRequest %d = %d\n", ioid, request); m_pendingResponseRequests[ioid] = request; return ioid; } @@ -2530,10 +2540,12 @@ TODO { Lock guard(&m_ioidMapMutex); IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(request->getIOID()); + printf("unregisterResponseRequest %d = %d\n", request->getIOID(), request); if (it == m_pendingResponseRequests.end()) return 0; ResponseRequest* retVal = it->second; + printf("unregisterResponseRequest %d = %d==%d\n", request->getIOID(), request, retVal); m_pendingResponseRequests.erase(it); return retVal; } @@ -2923,10 +2935,13 @@ class ChannelGetRequesterImpl : public ChannelGetRequester virtual void getDone(epics::pvData::Status *status) { std::cout << "getDone(" << status->toString() << ")" << std::endl; - String str; - m_pvStructure->toString(&str); - std::cout << str; - std::cout << std::endl; + if (m_pvStructure) + { + String str; + m_pvStructure->toString(&str); + std::cout << str; + std::cout << std::endl; + } } }; @@ -2960,19 +2975,25 @@ class ChannelPutRequesterImpl : public ChannelPutRequester virtual void getDone(epics::pvData::Status *status) { std::cout << "getDone(" << status->toString() << ")" << std::endl; - String str; - m_pvStructure->toString(&str); - std::cout << str; - std::cout << std::endl; + if (m_pvStructure) + { + String str; + m_pvStructure->toString(&str); + std::cout << str; + std::cout << std::endl; + } } virtual void putDone(epics::pvData::Status *status) { std::cout << "putDone(" << status->toString() << ")" << std::endl; - String str; - m_pvStructure->toString(&str); - std::cout << str; - std::cout << std::endl; + if (m_pvStructure) + { + String str; + m_pvStructure->toString(&str); + std::cout << str; + std::cout << std::endl; + } } }; @@ -3074,7 +3095,7 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channel->printInfo(); -/* + GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); @@ -3086,16 +3107,17 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channelProcess->destroy(); epicsThreadSleep ( 1.0 ); -*/ + ChannelGetRequesterImpl channelGetRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("field(value)",&channelGetRequesterImpl); + PVStructure* pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); epicsThreadSleep ( 3.0 ); channelGet->get(false); epicsThreadSleep ( 3.0 ); channelGet->destroy(); epicsThreadSleep ( 1.0 ); -/* + + ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest); epicsThreadSleep ( 1.0 ); @@ -3128,9 +3150,15 @@ int main(int argc,char *argv[]) delete pvRequest; epicsThreadSleep ( 3.0 ); + printf("Destroying channel... \n"); channel->destroy(); + printf("done.\n"); + epicsThreadSleep ( 3.0 ); + + printf("Destroying context... \n"); context->destroy(); + printf("done.\n"); std::cout << "-----------------------------------------------------------------------" << std::endl; getShowConstructDestruct()->constuctDestructTotals(stdout);