From 5ab2ead581178a30c9e8b567455fba3ba6ce09e3 Mon Sep 17 00:00:00 2001 From: damjankumar Date: Thu, 13 Feb 2014 15:52:02 +0100 Subject: [PATCH] internalDestroy problem solved + memory leak fixed in abstract codec --- pvAccessApp/remote/codec.cpp | 52 +++++++++++++++++++++----- pvAccessApp/remote/codec.h | 23 +++++++----- testApp/remote/channelAccessIFTest.cpp | 8 ++-- testApp/remote/testChannelAccess.cpp | 2 +- testApp/remote/testCodec.cpp | 35 ++++++++--------- 5 files changed, 78 insertions(+), 42 deletions(-) diff --git a/pvAccessApp/remote/codec.cpp b/pvAccessApp/remote/codec.cpp index e85818d..80a5a9c 100644 --- a/pvAccessApp/remote/codec.cpp +++ b/pvAccessApp/remote/codec.cpp @@ -39,8 +39,8 @@ namespace epics { const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024; AbstractCodec::AbstractCodec( - ByteBuffer *receiveBuffer, - ByteBuffer *sendBuffer, + std::tr1::shared_ptr receiveBuffer, + std::tr1::shared_ptr sendBuffer, int32_t socketSendBufferSize, bool blockingProcessQueue): //PROTECTED @@ -49,6 +49,8 @@ namespace epics { _blockingProcessQueue(false), _senderThread(0), _writeMode(PROCESS_SEND_QUEUE), _writeOpReady(false),_lowLatency(false), + _socketBuffer(receiveBuffer), + _sendBuffer(sendBuffer), //PRIVATE _storedPayloadSize(0), _storedPosition(0), _startPosition(0), _maxSendPayloadSize(0), @@ -76,9 +78,6 @@ namespace epics { throw std::invalid_argument( "sendBuffer() % PVAConstants.PVA_ALIGNMENT != 0"); - _socketBuffer.reset(receiveBuffer); - _sendBuffer.reset(sendBuffer); - // initialize to be empty _socketBuffer->setPosition(_socketBuffer->getLimit()); _startPosition = _socketBuffer->getPosition(); @@ -369,6 +368,11 @@ namespace epics { if (bytesRead < 0) { + + LOG(logLevelTrace, + "AbstractCodec::before close (threadId: %u)", + epicsThreadGetIdSelf()); + close(); throw connection_closed_exception("bytesRead < 0"); } @@ -1153,6 +1157,7 @@ namespace epics { BlockingAbstractCodec *bac = static_cast(param); + Transport::shared_pointer ptr (bac->shared_from_this()); while (bac->isOpen()) { @@ -1181,6 +1186,8 @@ namespace epics { BlockingAbstractCodec *bac = static_cast(param); + Transport::shared_pointer ptr (bac->shared_from_this()); + bac->setSenderThread(); while (bac->isOpen()) @@ -1209,7 +1216,7 @@ namespace epics { LOG(logLevelTrace, "XXXXXXXXXXXXXXXXXXXXXXXXXXXX" "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX (threadId: %u)", epicsThreadGetIdSelf()); - //bac->internalDestroy(); + bac->internalDestroy(); LOG(logLevelTrace, "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY" "YYYYYYYYYYYYYYYYYYYYYYYYYYYY (threadId: %u)", epicsThreadGetIdSelf()); @@ -1246,12 +1253,12 @@ namespace epics { int32_t sendBufferSize, int32_t receiveBufferSize): BlockingAbstractCodec( - new ByteBuffer((std::max((std::size_t)( + std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + - (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1))), - new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + + (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))), + std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) - & (~(PVA_ALIGNMENT - 1))), sendBufferSize), + & (~(PVA_ALIGNMENT - 1)))), sendBufferSize), _channel(channel) { @@ -1325,10 +1332,22 @@ namespace epics { std::size_t remaining; while((remaining=src->getRemaining()) > 0) { + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::write before send" + " (threadId: %u)", + epicsThreadGetIdSelf()); + + int bytesSent = ::send(_channel, &src->getArray()[src->getPosition()], remaining, 0); + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::write afer send, read:%d" + " (threadId: %u)", + bytesSent, epicsThreadGetIdSelf()); + + if(unlikely(bytesSent<0)) { int socketError = SOCKERRNO; @@ -1390,9 +1409,21 @@ namespace epics { // read std::size_t pos = dst->getPosition(); + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::read before recv" + " (threadId: %u)", + epicsThreadGetIdSelf()); + int bytesRead = recv(_channel, (char*)(dst->getArray()+pos), remaining, 0); + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::read after recv, read: %d", + bytesRead," (threadId: %u)", + epicsThreadGetIdSelf()); + if (IS_LOGGABLE(logLevelTrace)) { hexDump(std::string("READ"), @@ -1414,6 +1445,7 @@ namespace epics { return -1; // 0 means connection loss for blocking transport, notify codec by returning -1 } + dst->setPosition(dst->getPosition() + bytesRead); return bytesRead; } diff --git a/pvAccessApp/remote/codec.h b/pvAccessApp/remote/codec.h index f885d88..58339b5 100644 --- a/pvAccessApp/remote/codec.h +++ b/pvAccessApp/remote/codec.h @@ -280,8 +280,8 @@ namespace epics { static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE; AbstractCodec( - epics::pvData::ByteBuffer *receiveBuffer, - epics::pvData::ByteBuffer *sendBuffer, + std::tr1::shared_ptr receiveBuffer, + std::tr1::shared_ptr sendBuffer, int32_t socketSendBufferSize, bool blockingProcessQueue); @@ -354,8 +354,8 @@ namespace epics { bool _writeOpReady; bool _lowLatency; - std::auto_ptr _socketBuffer; - std::auto_ptr _sendBuffer; + std::tr1::shared_ptr _socketBuffer; + std::tr1::shared_ptr _sendBuffer; epics::pvAccess::queue _sendQueue; @@ -385,15 +385,17 @@ namespace epics { }; - class BlockingAbstractCodec: public AbstractCodec { + class BlockingAbstractCodec: + public AbstractCodec, + public std::tr1::enable_shared_from_this { public: POINTER_DEFINITIONS(BlockingAbstractCodec); - BlockingAbstractCodec( - epics::pvData::ByteBuffer *receiveBuffer, - epics::pvData::ByteBuffer *sendBuffer, + BlockingAbstractCodec( + std::tr1::shared_ptr receiveBuffer, + std::tr1::shared_ptr sendBuffer, int32_t socketSendBufferSize): AbstractCodec(receiveBuffer, sendBuffer, socketSendBufferSize, true), _readThread(0), _sendThread(0) { _isOpen.getAndSet(true);} @@ -448,8 +450,9 @@ namespace epics { class BlockingTCPTransportCodec : - public BlockingSocketAbstractCodec, - public std::tr1::enable_shared_from_this { + public BlockingSocketAbstractCodec + + { public: diff --git a/testApp/remote/channelAccessIFTest.cpp b/testApp/remote/channelAccessIFTest.cpp index cdf5f69..8f8b51b 100755 --- a/testApp/remote/channelAccessIFTest.cpp +++ b/testApp/remote/channelAccessIFTest.cpp @@ -48,7 +48,7 @@ int ChannelAccessIFTest::runAllTest() { testPlan(152); #endif -/* test_implementation(); + test_implementation(); test_providerName(); test_createEmptyChannel(); @@ -56,10 +56,10 @@ int ChannelAccessIFTest::runAllTest() { test_createChannel(); test_recreateChannelOnDestroyedProvider(); test_findEmptyChannel(); - test_findChannel();*/ + test_findChannel(); test_channel(); -/* test_channelGetWithInvalidChannelAndRequester(); + test_channelGetWithInvalidChannelAndRequester(); test_channelGetNoProcess(); test_channelGetIntProcess(); test_channelGetTestNoConnection(); @@ -95,7 +95,7 @@ int ChannelAccessIFTest::runAllTest() { test_channelArray(); test_channelArray_destroy(); - test_channelArrayTestNoConnection();*/ + test_channelArrayTestNoConnection(); #ifdef ENABLE_STRESS_TESTS test_stressConnectDisconnect(); diff --git a/testApp/remote/testChannelAccess.cpp b/testApp/remote/testChannelAccess.cpp index d6778c6..0f1021d 100755 --- a/testApp/remote/testChannelAccess.cpp +++ b/testApp/remote/testChannelAccess.cpp @@ -88,7 +88,7 @@ class ChannelAccessIFRemoteTest: public ChannelAccessIFTest { MAIN(testChannelProvider) { - SET_LOG_LEVEL(logLevelTrace); + SET_LOG_LEVEL(logLevelError); ChannelAccessIFRemoteTest caRemoteTest; return caRemoteTest.runAllTest(); } diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index 814b384..602ee82 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -6,7 +6,6 @@ #define NOMINMAX #endif -//testing #include #include @@ -25,13 +24,7 @@ namespace epics { class PVAMessage { public: - - int8_t _version; - int8_t _flags; - int8_t _command; - int32_t _payloadSize; - std::tr1::shared_ptr _payload; - + PVAMessage(int8_t version, int8_t flags, int8_t command, @@ -42,6 +35,12 @@ namespace epics { _payloadSize = payloadSize; } + int8_t _version; + int8_t _flags; + int8_t _command; + int32_t _payloadSize; + std::tr1::shared_ptr _payload; + //memberwise copy constructor/assigment operator //provided by the compiler }; @@ -67,7 +66,12 @@ namespace epics { std::size_t receiveBufferSize, std::size_t sendBufferSize, bool blocking = false): - _closedCount(0), + AbstractCodec( + std::tr1::shared_ptr(new ByteBuffer(receiveBufferSize)), + std::tr1::shared_ptr(new ByteBuffer(sendBufferSize)), + sendBufferSize/10, + blocking ), + _closedCount(0), _invalidDataStreamCount(0), _scheduleSendCount(0), _sendCompletedCount(0), @@ -79,12 +83,8 @@ namespace epics { _disconnected(false), _forcePayloadRead(-1), _readBuffer(new ByteBuffer(receiveBufferSize)), - _writeBuffer(sendBufferSize), - AbstractCodec( - new ByteBuffer(receiveBufferSize), - new ByteBuffer(sendBufferSize), - sendBufferSize/10, - blocking ) { + _writeBuffer(sendBufferSize) + { } @@ -253,6 +253,7 @@ namespace epics { void endBlockedProcessSendQueue() { + //TODO not thread safe _blockingProcessQueue = false; _sendQueue.wakeup(); } @@ -266,14 +267,14 @@ namespace epics { WriteMode getWriteMode() { return _writeMode;} - std::auto_ptr & getSendBuffer() + std::tr1::shared_ptr getSendBuffer() { return _sendBuffer; } osiSockAddr getLastReadBufferSocketAddress() { - osiSockAddr tmp = {0}; + osiSockAddr tmp = {{0}}; return tmp; }