From 3c03971939c4d8cecd21ff22d797b072eb21f534 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Fri, 7 Jan 2011 13:01:48 +0100 Subject: [PATCH] Fixed core dumps. Transport client now finishes successfully. --- .../remote/blockingServerTCPTransport.cpp | 3 +- pvAccessApp/remote/blockingTCP.h | 42 ++++--------------- pvAccessApp/remote/blockingTCPAcceptor.cpp | 39 +---------------- pvAccessApp/remote/blockingTCPTransport.cpp | 30 +++++++++---- pvAccessApp/server/responseHandlers.cpp | 2 +- pvAccessApp/utils/hexDump.cpp | 9 ++-- 6 files changed, 41 insertions(+), 84 deletions(-) diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 32bb1db..b051e2c 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -33,7 +33,7 @@ namespace epics { _introspectionRegistry(new IntrospectionRegistry(true)), _lastChannelSID(0), _channels( new map ()), _channelsMutex( - new Mutex()), _notifyOnClose(NULL) { + new Mutex()) { // NOTE: priority not yet known, default priority is used to register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! @@ -68,7 +68,6 @@ namespace epics { void BlockingServerTCPTransport::internalClose(bool force) { BlockingTCPTransport::internalClose(force); - if(_notifyOnClose!=NULL) _notifyOnClose->transportClosed(this); destroyAllChannels(); } diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 8e82224..52f9b2a 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -37,7 +37,6 @@ namespace epics { namespace pvAccess { class MonitorSender; - class BlockingServerTCPTransport; enum ReceiveStage { READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE @@ -47,19 +46,6 @@ namespace epics { IMMEDIATE, DELAYED, USER_CONTROLED }; - class TransportCloseNotification { - public: - virtual ~TransportCloseNotification() { - } - - /** - * When transport closes, the owner will be notified through this - * callback - */ - virtual void - transportClosed(BlockingServerTCPTransport* transport) =0; - }; - class BlockingTCPTransport : public Transport, public TransportSendControl { public: @@ -67,8 +53,6 @@ namespace epics { ResponseHandler* responseHandler, int receiveBufferSize, int16 priority); - virtual ~BlockingTCPTransport(); - virtual bool isClosed() const { return _closed; } @@ -271,6 +255,8 @@ namespace epics { */ virtual bool send(epics::pvData::ByteBuffer* buffer); + virtual ~BlockingTCPTransport(); + private: /** * Default marker period. @@ -352,6 +338,8 @@ namespace epics { Context* _context; + volatile bool _sendThreadRunning; + /** * Internal method that clears and releases buffer. * sendLock and sendBufferLock must be hold while calling this method. @@ -387,8 +375,6 @@ namespace epics { TransportClient* client, short remoteTransportRevision, float beaconInterval, int16 priority); - virtual ~BlockingClientTCPTransport(); - virtual void timerStopped() { // noop } @@ -444,6 +430,8 @@ namespace epics { virtual void internalClose(bool force); + virtual ~BlockingClientTCPTransport(); + private: /** @@ -549,8 +537,6 @@ namespace epics { BlockingServerTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize); - virtual ~BlockingServerTCPTransport(); - virtual IntrospectionRegistry* getIntrospectionRegistry() { return _introspectionRegistry; } @@ -638,10 +624,6 @@ namespace epics { virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); - void addCloseNotification(TransportCloseNotification* notifyTarget) { - _notifyOnClose = notifyTarget; - } - protected: /** * Introspection registry. @@ -650,6 +632,8 @@ namespace epics { virtual void internalClose(bool force); + virtual ~BlockingServerTCPTransport(); + private: /** * Last SID cache. @@ -663,8 +647,6 @@ namespace epics { Mutex* _channelsMutex; - TransportCloseNotification* _notifyOnClose; - /** * Destroy all channels. */ @@ -676,7 +658,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $ */ - class BlockingTCPAcceptor : public TransportCloseNotification { + class BlockingTCPAcceptor { public: /** @@ -705,8 +687,6 @@ namespace epics { */ void destroy(); - virtual void transportClosed(BlockingServerTCPTransport* transport); - private: /** * Context instance. @@ -735,10 +715,6 @@ namespace epics { epicsThreadId _threadId; - std::set* _connectedClients; - - Mutex* _connectedClientsMutex; - /** * Initialize connection acception. * @return port where server is listening diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index ab4c21e..c5bfcb7 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -24,7 +24,6 @@ #include using std::ostringstream; -using std::set; namespace epics { namespace pvAccess { @@ -33,9 +32,7 @@ namespace epics { int receiveBufferSize) : _context(context), _bindAddress(NULL), _serverSocketChannel( INVALID_SOCKET), _receiveBufferSize(receiveBufferSize), - _destroyed(false), _threadId(NULL), _connectedClients( - new set ()), - _connectedClientsMutex(new Mutex()) { + _destroyed(false), _threadId(NULL) { initialize(port); } @@ -43,22 +40,6 @@ namespace epics { destroy(); if(_bindAddress!=NULL) delete _bindAddress; - - _connectedClientsMutex->lock(); - // go through all the connected clients, close them, and destroy - set::iterator it = - _connectedClients->begin(); - while(it!=_connectedClients->end()) { - BlockingServerTCPTransport* client = *it; - it++; - client->close(true); - delete client; - } - _connectedClients->clear(); - delete _connectedClients; - _connectedClientsMutex->unlock(); - - delete _connectedClientsMutex; } int BlockingTCPAcceptor::initialize(in_port_t port) { @@ -254,16 +235,9 @@ namespace epics { errlogInfo, "Connection to CA client %s failed to be validated, closing it.", ipAddrStr); - delete transport; return; } - // store the new connected client - _connectedClientsMutex->lock(); - _connectedClients->insert(transport); - transport->addCloseNotification(this); - _connectedClientsMutex->unlock(); - errlogSevPrintf(errlogInfo, "Serving to CA client: %s", ipAddrStr); @@ -307,16 +281,5 @@ namespace epics { } } - void BlockingTCPAcceptor::transportClosed( - BlockingServerTCPTransport* transport) { - Lock lock(_connectedClientsMutex); - - // remove the closed client from the list of connected clients - _connectedClients->erase(transport); - - // release the memory - delete transport; - } - } } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 836a055..0f60846 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -87,7 +87,8 @@ namespace epics { _rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue( new GrowingCircularBuffer (100)), _monitorSender(new MonitorSender(_monitorMutex, - _monitorSendQueue)), _context(context) { + _monitorSendQueue)), _context(context), + _sendThreadRunning(false) { _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize)); @@ -141,6 +142,7 @@ namespace epics { } void BlockingTCPTransport::start() { + _sendThreadRunning = true; String threadName = "TCP-receive "+inetAddressToString( _socketAddress); @@ -210,7 +212,10 @@ namespace epics { void BlockingTCPTransport::internalClose(bool force) { // close the socket - epicsSocketDestroy(_channel); + if(_channel!=INVALID_SOCKET) { + epicsSocketDestroy(_channel); + _channel = INVALID_SOCKET; + } } int BlockingTCPTransport::getSocketReceiveBufferSize() const { @@ -462,11 +467,11 @@ namespace epics { maxToRead, 0); _socketBuffer->put(readBuffer, 0, bytesRead); - if(bytesRead<0) { + if(bytesRead<=0) { // error (disconnect, end-of-stream) detected close(true); - if(nestedCall) THROW_BASE_EXCEPTION( + if(bytesRead<0&&nestedCall) THROW_BASE_EXCEPTION( "bytesRead < 0"); return; @@ -834,12 +839,21 @@ namespace epics { errlogSevPrintf(errlogInfo, "Connection to %s closed.", inetAddressToString(_socketAddress).c_str()); - epicsSocketDestroy(_channel); + if(_channel!=INVALID_SOCKET) { + epicsSocketDestroy(_channel); + _channel = INVALID_SOCKET; + } } void BlockingTCPTransport::rcvThreadRunner(void* param) { - ((BlockingTCPTransport*)param)->processReadCached(false, NONE, - CA_MESSAGE_HEADER_SIZE, false); + BlockingTCPTransport* obj = (BlockingTCPTransport*)param; + + obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false); + + while(obj->_sendThreadRunning) + epicsThreadSleep(0.1); + + delete obj; } void BlockingTCPTransport::sendThreadRunner(void* param) { @@ -848,6 +862,8 @@ namespace epics { obj->processSendQueue(); obj->freeConnectionResorces(); + + obj->_sendThreadRunning = false; } void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) { diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 355c2c6..0471373 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -32,7 +32,7 @@ namespace epics { ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); ostringstream prologue; - prologue< using namespace epics::pvData; +using std::stringstream; +using std::endl; +using std::cout; namespace epics { namespace pvAccess { @@ -29,9 +32,9 @@ namespace epics { void hexDump(const String prologue, const String name, const int8 *bs, int start, int len) { - std::stringstream header; + stringstream header; - header<