diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index e380b58..75f15d4 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -125,12 +125,12 @@ namespace epics { // check if still acquired int refs = _owners->size(); if(refs>0) { - ostringstream temp; char ipAddrStr[48]; ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); - temp<<"Transport to "<::iterator it = _owners->begin(); for(; it!=_owners->end(); it++) diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 29272c5..9bd0a93 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -33,7 +33,7 @@ namespace epics { _introspectionRegistry(new IntrospectionRegistry(true)), _lastChannelSID(0), _channels( new map ()), _channelsMutex( - new Mutex()) { + new Mutex()), _notifyOnClose(NULL) { // NOTE: priority not yet known, default priority is used to register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! @@ -68,6 +68,7 @@ namespace epics { void BlockingServerTCPTransport::internalClose(bool force) { BlockingTCPTransport::internalClose(force); + if(_notifyOnClose!=NULL) _notifyOnClose->transportClosed(this); destroyAllChannels(); } diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index c6bd5dc..8e82224 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -37,6 +37,7 @@ namespace epics { namespace pvAccess { class MonitorSender; + class BlockingServerTCPTransport; enum ReceiveStage { READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE @@ -46,6 +47,19 @@ namespace epics { IMMEDIATE, DELAYED, USER_CONTROLED }; + class TransportCloseNotification { + public: + virtual ~TransportCloseNotification() { + } + + /** + * When transport closes, the owner will be notified through this + * callback + */ + virtual void + transportClosed(BlockingServerTCPTransport* transport) =0; + }; + class BlockingTCPTransport : public Transport, public TransportSendControl { public: @@ -504,7 +518,8 @@ namespace epics { /** * named lock */ - NamedLockPattern* _namedLocker; + NamedLockPattern + * _namedLocker; /** * Receive buffer size. @@ -623,6 +638,10 @@ namespace epics { virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + void addCloseNotification(TransportCloseNotification* notifyTarget) { + _notifyOnClose = notifyTarget; + } + protected: /** * Introspection registry. @@ -644,6 +663,8 @@ namespace epics { Mutex* _channelsMutex; + TransportCloseNotification* _notifyOnClose; + /** * Destroy all channels. */ @@ -655,7 +676,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $ */ - class BlockingTCPAcceptor { + class BlockingTCPAcceptor : public TransportCloseNotification { public: /** @@ -684,6 +705,8 @@ namespace epics { */ void destroy(); + virtual void transportClosed(BlockingServerTCPTransport* transport); + private: /** * Context instance. @@ -712,6 +735,10 @@ namespace epics { epicsThreadId _threadId; + std::set* _connectedClients; + + Mutex* _connectedClientsMutex; + /** * Initialize connection acception. * @return port where server is listening diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index d35b92c..3d83c31 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -24,6 +24,7 @@ #include using std::ostringstream; +using std::set; namespace epics { namespace pvAccess { @@ -32,12 +33,30 @@ namespace epics { int receiveBufferSize) : _context(context), _bindAddress(NULL), _serverSocketChannel( INVALID_SOCKET), _receiveBufferSize(receiveBufferSize), - _destroyed(false), _threadId(NULL) { + _destroyed(false), _threadId(NULL), _connectedClients( + new set ()), + _connectedClientsMutex(new Mutex()) { initialize(port); } BlockingTCPAcceptor::~BlockingTCPAcceptor() { if(_bindAddress!=NULL) delete _bindAddress; + + _connectedClientsMutex->lock(); + // go through all the connected clients, close them, and destroy + set::iterator it = + _connectedClients->begin(); + while(it!=_connectedClients->end()) { + BlockingServerTCPTransport* client = *it; + it++; + client->close(true); + delete client; + } + _connectedClients->clear(); + delete _connectedClients; + _connectedClientsMutex->unlock(); + + delete _connectedClientsMutex; } int BlockingTCPAcceptor::initialize(in_port_t port) { @@ -213,8 +232,10 @@ namespace epics { //socket.socket().setReceiveBufferSize(); //socket.socket().setSendBufferSize(); - // create transport - // each transport should have its own response handler since it is not "shareable" + /* create transport + * each transport should have its own response + * handler since it is not "shareable" + */ BlockingServerTCPTransport * transport = new BlockingServerTCPTransport( @@ -231,9 +252,16 @@ namespace epics { errlogInfo, "Connection to CA client %s failed to be validated, closing it.", ipAddrStr); + delete transport; return; } + // store the new connected client + _connectedClientsMutex->lock(); + _connectedClients->insert(transport); + transport->addCloseNotification(this); + _connectedClientsMutex->unlock(); + errlogSevPrintf(errlogInfo, "Serving to CA client: %s", ipAddrStr); @@ -277,5 +305,16 @@ namespace epics { } } + void BlockingTCPAcceptor::transportClosed( + BlockingServerTCPTransport* transport) { + Lock lock(_connectedClientsMutex); + + // remove the closed client from the list of connected clients + _connectedClients->erase(transport); + + // release the memory + delete transport; + } + } } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index b746ccd..e9a769c 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -208,7 +208,8 @@ namespace epics { } void BlockingTCPTransport::internalClose(bool force) { - // noop + // close the socket + epicsSocketDestroy(_channel); } int BlockingTCPTransport::getSocketReceiveBufferSize() const {