From 4a1bfff40fe6ff0cfb978f0a29ae26bc14488a1e Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 18 Nov 2015 16:20:49 -0600 Subject: [PATCH] Remove clearSendQueue Use BreakTransport exception instead --- src/remote/codec.cpp | 56 +++++++++++++++----------- src/remote/codec.h | 2 +- src/remoteClient/clientContextImpl.cpp | 5 ++- testApp/remote/testCodec.cpp | 37 +---------------- 4 files changed, 38 insertions(+), 62 deletions(-) diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 9bb4e6d..7036d1a 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -32,6 +32,18 @@ using namespace std; using namespace epics::pvData; using namespace epics::pvAccess; +namespace { +struct BreakTransport : TransportSender +{ + virtual ~BreakTransport() {} + virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) + { + throw epics::pvAccess::detail::connection_closed_exception("Break"); + } + virtual void lock() {} + virtual void unlock() {} +}; +} // namespace namespace epics { namespace pvAccess { @@ -883,12 +895,6 @@ namespace epics { } - void AbstractCodec::clearSendQueue() - { - _sendQueue.clear(); - } - - void AbstractCodec::enqueueSendRequest( TransportSender::shared_pointer const & sender) { _sendQueue.push_back(sender); @@ -1044,6 +1050,13 @@ namespace epics { .autostart(false)) { _isOpen.getAndSet(true);} + BlockingAbstractCodec::~BlockingAbstractCodec() + { + assert(!_isOpen.get()); + _sendThread.exitWait(); + _readThread.exitWait(); + } + void BlockingAbstractCodec::readPollOne() { throw std::logic_error("should not be called for blocking IO"); } @@ -1061,18 +1074,21 @@ namespace epics { // always close in the same thread, same way, etc. // wakeup processSendQueue - // clean resources + // clean resources (close socket) internalClose(true); - // this is important to avoid cyclic refs (memory leak) - clearSendQueue(); + // Break sender from queue wait + BreakTransport::shared_pointer B(new BreakTransport); + enqueueSendRequest(B); // post close internalPostClose(true); } } - void BlockingAbstractCodec::internalClose(bool /*force*/) { + void BlockingAbstractCodec::internalClose(bool /*force*/) + { + this->internalDestroy(); } void BlockingAbstractCodec::internalPostClose(bool /*force*/) { @@ -1143,18 +1159,7 @@ namespace epics { __FILE__, __LINE__); } } - - /* - // wait read thread to die - // TODO rewise if this is really needed - // this timeout is needed where close() is initiated from the send thread, - // and not from the read thread as usualy - recv() does not exit until socket is not destroyed, - // which is done the internalDestroy() call below - bac->_shutdownEvent.wait(3.0); - */ - - // call internal destroy - this->internalDestroy(); + _sendQueue.clear(); } @@ -1233,7 +1238,7 @@ namespace epics { epicsSocketDestroy(_channel); } - _channel = INVALID_SOCKET; + _channel = INVALID_SOCKET; //TODO: mutex to guard _channel } } @@ -1863,7 +1868,10 @@ namespace epics { // not used anymore, close it // TODO consider delayed destruction (can improve performance!!!) - if(_owners.size()==0) close(); // TODO close(false) + if(_owners.size()==0) { + lock.unlock(); + close(); + } } void BlockingClientTCPTransportCodec::aliveNotification() { diff --git a/src/remote/codec.h b/src/remote/codec.h index 8d3084d..e01d863 100644 --- a/src/remote/codec.h +++ b/src/remote/codec.h @@ -193,7 +193,6 @@ namespace epics { void processWrite(); void processRead(); void processSendQueue(); - void clearSendQueue(); void enqueueSendRequest(TransportSender::shared_pointer const & sender); void enqueueSendRequest(TransportSender::shared_pointer const & sender, std::size_t requiredBufferSize); @@ -285,6 +284,7 @@ namespace epics { std::tr1::shared_ptr const & receiveBuffer, std::tr1::shared_ptr const & sendBuffer, int32_t socketSendBufferSize); + virtual ~BlockingAbstractCodec(); void readPollOne(); void writePollOne(); diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index ffe72bb..84a2e66 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -3877,6 +3877,9 @@ namespace epics { * @param remoteDestroy issue channel destroy request. */ void disconnect(bool initiateSearch, bool remoteDestroy) { + // order of oldchan and guard is important to ensure + // oldchan is destoryed after unlock + Transport::shared_pointer oldchan; Lock guard(m_channelMutex); if (m_connectionState != CONNECTED) @@ -3900,7 +3903,7 @@ namespace epics { } m_transport->release(getID()); - m_transport.reset(); + oldchan.swap(m_transport); } if (initiateSearch) diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index c476cfe..9afa90d 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -436,7 +436,7 @@ namespace epics { public: int runAllTest() { - testPlan(5885); + testPlan(5883); testHeaderProcess(); testInvalidHeaderMagic(); testInvalidHeaderSegmentedInNormal(); @@ -462,7 +462,6 @@ namespace epics { testSendException(); testSendHugeMessagePartes(); testRecipient(); - testClearSendQueue(); testInvalidArguments(); testDefaultModes(); testEnqueueSendRequestExceptionThrown(); @@ -2935,40 +2934,6 @@ namespace epics { TestCodec &_codec; }; - - void testClearSendQueue() - { - testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); - TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); - - std::tr1::shared_ptr sender = - std::tr1::shared_ptr( - new TransportSenderForTestClearSendQueue(codec)); - - std::tr1::shared_ptr sender2 = - std::tr1::shared_ptr( - new TransportSender2ForTestClearSendQueue(codec)); - - - codec.enqueueSendRequest(sender); - codec.enqueueSendRequest(sender2); - - codec.clearSendQueue(); - - codec.breakSender(); - try{ - codec.processSendQueue(); - }catch(sender_break&) {} - - testOk(0 == codec.getSendBuffer()->getPosition(), - "%s: 0 == codec.getSendBuffer()->getPosition()", - CURRENT_FUNCTION); - testOk(0 == codec._writeBuffer.getPosition(), - "%s: 0 == codec._writeBuffer.getPosition()", - CURRENT_FUNCTION); - } - - void testInvalidArguments() { testDiag("BEGIN TEST %s:", CURRENT_FUNCTION);