From 0af1781b5eda4a7b96a1435187cf7cee5ae34094 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Wed, 5 Jan 2011 13:36:07 +0100 Subject: [PATCH] Added connected clients tracking to BlockingTCPAcceptor. For each new connection a new 'BlockingServerTCPTransport' object is created, but was never released. The acceptor now stores all the objects in a set, and registers a callback with the client, to be notified when the connection is closed. On notification the client is them removed from the set and the memory is released. When the acceptor is destroyed, it also closes and deleted all the connected clients. --- .../remote/blockingClientTCPTransport.cpp | 8 ++-- .../remote/blockingServerTCPTransport.cpp | 3 +- pvAccessApp/remote/blockingTCP.h | 31 ++++++++++++- pvAccessApp/remote/blockingTCPAcceptor.cpp | 45 +++++++++++++++++-- pvAccessApp/remote/blockingTCPTransport.cpp | 3 +- 5 files changed, 79 insertions(+), 11 deletions(-) diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index e380b58..75f15d4 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -125,12 +125,12 @@ namespace epics { // check if still acquired int refs = _owners->size(); if(refs>0) { - ostringstream temp; char ipAddrStr[48]; ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); - temp<<"Transport to "<::iterator it = _owners->begin(); for(; it!=_owners->end(); it++) diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 29272c5..9bd0a93 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -33,7 +33,7 @@ namespace epics { _introspectionRegistry(new IntrospectionRegistry(true)), _lastChannelSID(0), _channels( new map ()), _channelsMutex( - new Mutex()) { + new Mutex()), _notifyOnClose(NULL) { // NOTE: priority not yet known, default priority is used to register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! @@ -68,6 +68,7 @@ namespace epics { void BlockingServerTCPTransport::internalClose(bool force) { BlockingTCPTransport::internalClose(force); + if(_notifyOnClose!=NULL) _notifyOnClose->transportClosed(this); destroyAllChannels(); } diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index c6bd5dc..8e82224 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -37,6 +37,7 @@ namespace epics { namespace pvAccess { class MonitorSender; + class BlockingServerTCPTransport; enum ReceiveStage { READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE @@ -46,6 +47,19 @@ namespace epics { IMMEDIATE, DELAYED, USER_CONTROLED }; + class TransportCloseNotification { + public: + virtual ~TransportCloseNotification() { + } + + /** + * When transport closes, the owner will be notified through this + * callback + */ + virtual void + transportClosed(BlockingServerTCPTransport* transport) =0; + }; + class BlockingTCPTransport : public Transport, public TransportSendControl { public: @@ -504,7 +518,8 @@ namespace epics { /** * named lock */ - NamedLockPattern* _namedLocker; + NamedLockPattern + * _namedLocker; /** * Receive buffer size. @@ -623,6 +638,10 @@ namespace epics { virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + void addCloseNotification(TransportCloseNotification* notifyTarget) { + _notifyOnClose = notifyTarget; + } + protected: /** * Introspection registry. @@ -644,6 +663,8 @@ namespace epics { Mutex* _channelsMutex; + TransportCloseNotification* _notifyOnClose; + /** * Destroy all channels. */ @@ -655,7 +676,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $ */ - class BlockingTCPAcceptor { + class BlockingTCPAcceptor : public TransportCloseNotification { public: /** @@ -684,6 +705,8 @@ namespace epics { */ void destroy(); + virtual void transportClosed(BlockingServerTCPTransport* transport); + private: /** * Context instance. @@ -712,6 +735,10 @@ namespace epics { epicsThreadId _threadId; + std::set* _connectedClients; + + Mutex* _connectedClientsMutex; + /** * Initialize connection acception. * @return port where server is listening diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index d35b92c..3d83c31 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -24,6 +24,7 @@ #include using std::ostringstream; +using std::set; namespace epics { namespace pvAccess { @@ -32,12 +33,30 @@ namespace epics { int receiveBufferSize) : _context(context), _bindAddress(NULL), _serverSocketChannel( INVALID_SOCKET), _receiveBufferSize(receiveBufferSize), - _destroyed(false), _threadId(NULL) { + _destroyed(false), _threadId(NULL), _connectedClients( + new set ()), + _connectedClientsMutex(new Mutex()) { initialize(port); } BlockingTCPAcceptor::~BlockingTCPAcceptor() { if(_bindAddress!=NULL) delete _bindAddress; + + _connectedClientsMutex->lock(); + // go through all the connected clients, close them, and destroy + set::iterator it = + _connectedClients->begin(); + while(it!=_connectedClients->end()) { + BlockingServerTCPTransport* client = *it; + it++; + client->close(true); + delete client; + } + _connectedClients->clear(); + delete _connectedClients; + _connectedClientsMutex->unlock(); + + delete _connectedClientsMutex; } int BlockingTCPAcceptor::initialize(in_port_t port) { @@ -213,8 +232,10 @@ namespace epics { //socket.socket().setReceiveBufferSize(); //socket.socket().setSendBufferSize(); - // create transport - // each transport should have its own response handler since it is not "shareable" + /* create transport + * each transport should have its own response + * handler since it is not "shareable" + */ BlockingServerTCPTransport * transport = new BlockingServerTCPTransport( @@ -231,9 +252,16 @@ namespace epics { errlogInfo, "Connection to CA client %s failed to be validated, closing it.", ipAddrStr); + delete transport; return; } + // store the new connected client + _connectedClientsMutex->lock(); + _connectedClients->insert(transport); + transport->addCloseNotification(this); + _connectedClientsMutex->unlock(); + errlogSevPrintf(errlogInfo, "Serving to CA client: %s", ipAddrStr); @@ -277,5 +305,16 @@ namespace epics { } } + void BlockingTCPAcceptor::transportClosed( + BlockingServerTCPTransport* transport) { + Lock lock(_connectedClientsMutex); + + // remove the closed client from the list of connected clients + _connectedClients->erase(transport); + + // release the memory + delete transport; + } + } } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index b746ccd..e9a769c 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -208,7 +208,8 @@ namespace epics { } void BlockingTCPTransport::internalClose(bool force) { - // noop + // close the socket + epicsSocketDestroy(_channel); } int BlockingTCPTransport::getSocketReceiveBufferSize() const {