From 88e33f115593a9c1cc4136ab9afb89bcba3c2bee Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Mon, 7 Feb 2011 22:19:46 +0100 Subject: [PATCH] channel/responseRequest destruction --- pvAccessApp/remote/blockingTCPTransport.cpp | 2 +- pvAccessApp/remote/remote.h | 16 +- .../remoteClient/clientContextImpl.cpp | 153 +++++++++++------- 3 files changed, 111 insertions(+), 60 deletions(-) diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 4452616..ac051bb 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -587,7 +587,7 @@ namespace epics { this, version, _command, _payloadSize, _socketBuffer); } catch(...) { - //noop + //noop // TODO print? } /* diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 24f4672..6c4ecb0 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -98,13 +98,22 @@ namespace epics { virtual void setRecipient(const osiSockAddr& sendTo) =0; }; + + /** + * Reference counting instance. + */ + class ReferenceCountingInstance { + public: + virtual void acquire() =0; + virtual void release() =0; + }; /** * Interface defining transport sender (instance sending data over transport). * @author Matej Sekoranja * @version $Id: TransportSender.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ */ - class TransportSender { + class TransportSender : public ReferenceCountingInstance { public: virtual ~TransportSender() { } @@ -122,9 +131,6 @@ namespace epics { virtual void lock() =0; virtual void unlock() =0; - - virtual void acquire() =0; - virtual void release() =0; }; /** @@ -507,7 +513,7 @@ namespace epics { * This interface needs to be extended (to provide method called on response). * @author Matej Sekoranja */ - class ResponseRequest { + class ResponseRequest : public ReferenceCountingInstance { public: virtual ~ResponseRequest() {} diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index f799a81..71a5920 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -43,6 +43,16 @@ namespace epics { catch (std::exception &e) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \ catch (...) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); } + class ResponseRequestGuard { + private: + ResponseRequest* m_rr; + public: + // no, don't be tempted to acquire here (must be done in getResponseRequest()) + ResponseRequestGuard(ResponseRequest* rr) : m_rr(rr) {}; + ~ResponseRequestGuard() { if (m_rr) m_rr->release(); }; + ResponseRequest* get() const { return m_rr; }; + }; + /** * Base channel request. * @author Matej Sekoranja @@ -237,6 +247,8 @@ namespace epics { else if (qos == PURE_DESTROY_REQUEST) { control->startMessage((int8)15, 8); + // NOTE: reference to the channel that can be deleted + // however CHANNEL_DESTROY request to the server keeps it alive buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); } @@ -1799,10 +1811,10 @@ namespace epics { AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); transport->ensureData(4); - ResponseRequest* rr = _context->getResponseRequest(payloadBuffer->getInt()); - if (rr) + ResponseRequestGuard rr(_context->getResponseRequest(payloadBuffer->getInt())); + if (rr.get()) { - DataResponse* nrr = dynamic_cast(rr); + DataResponse* nrr = dynamic_cast(rr.get()); if (nrr) nrr->response(transport, version, payloadBuffer); } @@ -1987,13 +1999,17 @@ namespace epics { transport->ensureData(5); - DataResponse* nrr = dynamic_cast(_context->getResponseRequest(payloadBuffer->getInt())); - Requester* requester; - if (nrr && (requester = nrr->getRequester())) + ResponseRequestGuard rr(_context->getResponseRequest(payloadBuffer->getInt())); + if (rr.get()) { - MessageType type = (MessageType)payloadBuffer->getByte(); - String message = SerializeHelper::deserializeString(payloadBuffer, transport); - requester->message(message, type); // TODO do we need to guard from exceptions + DataResponse* nrr = dynamic_cast(rr.get()); + Requester* requester; + if (nrr && (requester = nrr->getRequester())) + { + MessageType type = (MessageType)payloadBuffer->getByte(); + String message = SerializeHelper::deserializeString(payloadBuffer, transport); + requester->message(message, type); + } } } @@ -2266,6 +2282,7 @@ namespace epics { ~InternalChannelImpl() { PVDATA_REFCOUNT_MONITOR_DESTRUCT(channel); + if (m_addresses) delete m_addresses; } public: @@ -2309,9 +2326,7 @@ namespace epics { virtual void destroy() { - destroy(false); //TODO guard - if (m_addresses) delete m_addresses; - delete this; + destroy(false); }; virtual String getRequesterName() @@ -2520,18 +2535,21 @@ namespace epics { * @param force force destruction regardless of reference count */ void destroy(bool force) { + { Lock guard(&m_channelMutex); if (m_connectionState == DESTROYED) throw std::runtime_error("Channel already destroyed."); - + } + // do destruction via context m_context->destroyChannel(this, force); } - - /** - * Increment reference. - */ + + + // NOTE: this is used only to keep instance in memory + // it is not related to channel destroy; not a mechanism to + // allow channel sharing void acquire() { Lock guard(&m_channelMutex); m_references++; @@ -2541,10 +2559,19 @@ namespace epics { m_channelMutex.lock(); m_references--; m_channelMutex.unlock(); - // if (m_references == 0) - // delete this; + if (m_references == 0) + { + if (m_transport) + { + // unresponsive state, do not forget to release transport + ReferenceCountingTransport* rct = dynamic_cast(m_transport); + if (rct) rct->release(this); + m_transport = 0; + } + + delete this; + } } -// TTTOOOOOOODOOOOOO !!! /** * Actual destroy method, to be called CAJContext. @@ -2554,37 +2581,39 @@ namespace epics { * @throws IOException */ void destroyChannel(bool force) { - Lock guard(&m_channelMutex); - - if (m_connectionState == DESTROYED) - throw std::runtime_error("Channel already destroyed."); - - m_references--; - if (m_references > 0 && !force) - return; - - // stop searching... - m_context->getChannelSearchManager()->unregisterChannel(this); - cancel(); - - disconnectPendingIO(true); - - if (m_connectionState == CONNECTED) { - disconnect(false, true); + Lock guard(&m_channelMutex); + + if (m_connectionState == DESTROYED) + throw std::runtime_error("Channel already destroyed."); + + // stop searching... + m_context->getChannelSearchManager()->unregisterChannel(this); + cancel(); + + disconnectPendingIO(true); + + if (m_connectionState == CONNECTED) + { + disconnect(false, true); + }/* + // transport is release on instance release + else if (m_transport) + { + // unresponsive state, do not forget to release transport + ReferenceCountingTransport* rct = dynamic_cast(m_transport); + if (rct) rct->release(this); + m_transport = 0; + }*/ + + setConnectionState(DESTROYED); + + // unregister + m_context->unregisterChannel(this); } - else if (m_transport) - { - // unresponsive state, do not forget to release transport - ReferenceCountingTransport* rct = dynamic_cast(m_transport); - if (rct) rct->release(this); - m_transport = 0; - } - - setConnectionState(DESTROYED); - - // unregister - m_context->unregisterChannel(this); + + // can be delete this + release(); } /** @@ -2612,13 +2641,17 @@ namespace epics { { if (remoteDestroy) { m_issueCreateMessage = false; - // TODO !!! this causes problems.. since qnqueueSendRequest is added and this instance deleted - //m_transport->enqueueSendRequest(this); + // NOTE: this is neccesary, this holds this channel instance reference + // and keeps it alive so that ResponseRequest reference to this instance + // is valid; otherwise ResponseRequests should acquire this instance + m_transport->enqueueSendRequest(this); } - + /* + // will be release on this instance release ReferenceCountingTransport* rct = dynamic_cast(m_transport); if (rct) rct->release(this); m_transport = 0; + */ } if (initiateSearch) @@ -2796,11 +2829,18 @@ namespace epics { m_needSubscriptionUpdate = true; + int count = 0; + ResponseRequest* rrs[m_responseRequests.size()]; for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin(); iter != m_responseRequests.end(); iter++) { - EXCEPTION_GUARD(iter->second->reportStatus(status)); + rrs[count++] = iter->second; + } + + for (int i = 0; i< count; i++) + { + EXCEPTION_GUARD(rrs[i]->reportStatus(status)); } } @@ -2814,6 +2854,7 @@ namespace epics { Transport* transport = getTransport(); + // NOTE: elements cannot be removed within rrs->updateSubscription callbacks for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin(); iter != m_responseRequests.end(); iter++) @@ -2837,6 +2878,7 @@ namespace epics { else return; // noop + // NOTE: elements cannot be removed within rrs->updateSubscription callbacks for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin(); iter != m_responseRequests.end(); iter++) @@ -3359,7 +3401,10 @@ TODO { Lock guard(&m_ioidMapMutex); IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid); - return (it == m_pendingResponseRequests.end() ? 0 : it->second); + if (it == m_pendingResponseRequests.end()) return 0; + ResponseRequest* rr = it->second; + rr->acquire(); + return rr; } /**