From 59b45653d1bdade4375333809a3231e907f5afc1 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Fri, 7 Jan 2011 08:44:43 +0100 Subject: [PATCH 1/7] blockingClientTCPTransport.cpp: - changed magic numbers to enums blockingTCPTransport.cpp: - debug helpers only responseHandlers.*: - added two new handlers, NOOP and Echo --- .../remote/blockingClientTCPTransport.cpp | 4 +- pvAccessApp/remote/blockingTCPTransport.cpp | 16 +++++++ pvAccessApp/server/responseHandlers.cpp | 43 ++++++++++++++++++- pvAccessApp/server/responseHandlers.h | 35 +++++++++++++++ 4 files changed, 94 insertions(+), 4 deletions(-) diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index 75f15d4..27fe8b6 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -188,7 +188,7 @@ namespace epics { * send verification response message */ - control->startMessage(1, 2*sizeof(int32)+sizeof(int16)); + control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)+sizeof(int16)); // receive buffer size buffer->putInt(getReceiveBufferSize()); @@ -205,7 +205,7 @@ namespace epics { _verifyOrEcho = false; } else { - control->startMessage(2, 0); + control->startMessage(CMD_ECHO, 0); // send immediately control->flush(true); } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 836a055..f5ecb3f 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -417,6 +417,11 @@ namespace epics { void BlockingTCPTransport::processReadCached(bool nestedCall, ReceiveStage inStage, int requiredBytes, bool addToBuffer) { + // TODO remove debug + errlogSevPrintf(errlogInfo, + "processReadCached(%d, %d, %d, %d), _stage: %d", + nestedCall, inStage, requiredBytes, addToBuffer, _stage); + try { while(!_closed) { if(_stage==READ_FROM_SOCKET||inStage!=NONE) { @@ -453,6 +458,11 @@ namespace epics { int requiredPosition = (currentStartPosition +requiredBytes); + + // TODO remove debug + errlogSevPrintf(errlogInfo, + "requredPos:%d, buffer->pos:%d", + requiredPosition, _socketBuffer->getPosition()); while(_socketBuffer->getPosition()put(readBuffer, 0, bytesRead); + // TODO remove debug + if(bytesRead>0) errlogSevPrintf( + errlogInfo, + "***!!! got %d bytes of %d (reqPos=%d)!!!***", + bytesRead, requiredBytes, requiredPosition); + if(bytesRead<0) { // error (disconnect, end-of-stream) detected close(true); diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index f69c5c7..f0e99d0 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -63,9 +63,9 @@ namespace epics { _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; // TODO add real handlers, as they are developed - _handlerTable[0] = badResponse; + _handlerTable[0] = new NoopResponse(_context, "Beacon"); _handlerTable[1] = new ConnectionValidationHandler(_context); - _handlerTable[2] = badResponse; + _handlerTable[2] = new EchoHandler(_context); _handlerTable[3] = badResponse; _handlerTable[4] = badResponse; _handlerTable[5] = badResponse; @@ -96,6 +96,8 @@ namespace epics { ServerResponseHandler::~ServerResponseHandler() { delete _handlerTable[0]; delete _handlerTable[1]; + delete _handlerTable[2]; + delete _handlerTable[27]; delete[] _handlerTable; } @@ -137,5 +139,42 @@ namespace epics { //transport.setPriority(payloadBuffer.getShort()); } + class EchoTransportSender : public TransportSender { + public: + EchoTransportSender(osiSockAddr* echoFrom) { + memcpy(&_echoFrom, echoFrom, sizeof(osiSockAddr)); + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage(CMD_ECHO, 0); + control->setRecipient(&_echoFrom); + } + + virtual void lock() { + } + + virtual void unlock() { + delete this; + } + private: + osiSockAddr _echoFrom; + + virtual ~EchoTransportSender() { + } + }; + + void EchoHandler::handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + EchoTransportSender* echoReply = new EchoTransportSender( + responseFrom); + + // send back + transport->enqueueSendRequest(echoReply); + } + } } diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index f08c399..afbcdba 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -103,6 +103,41 @@ namespace epics { int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); }; + /** + * NOOP response. + * @author Matej Sekoranja + * @version $Id: NoopResponse.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class NoopResponse : public AbstractServerResponseHandler { + public: + /** + * @param context + * @param description + */ + NoopResponse(ServerContextImpl* context, String description) : + AbstractServerResponseHandler(context, description) { + } + }; + + /** + * Echo request handler. + * @author Matej Sekoranja + * @version $Id: EchoHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class EchoHandler : public AbstractServerResponseHandler { + public: + /** + * @param context + */ + EchoHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Echo request") { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; + } } From 387d8aa0dc52e6d5467cd9995fbab44dd0a5356c Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Fri, 7 Jan 2011 08:58:17 +0100 Subject: [PATCH 2/7] Some final fixes to addressUtils regarding byte-order. --- pvAccessApp/utils/inetAddressUtil.cpp | 12 ++++++------ pvAccessApp/utils/inetAddressUtil.h | 2 +- testApp/utils/inetAddressUtilsTest.cpp | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index 5fff2d2..217c740 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -115,7 +115,7 @@ namespace epics { // next 16-bits are 1 buffer->putShort(0xFFFF); // following IPv4 address in big-endian (network) byte order - in_addr_t ipv4Addr = address->ia.sin_addr.s_addr; + in_addr_t ipv4Addr = ntohl(address->ia.sin_addr.s_addr); buffer->putByte((int8)((ipv4Addr>>24)&0xFF)); buffer->putByte((int8)((ipv4Addr>>16)&0xFF)); buffer->putByte((int8)((ipv4Addr>>8)&0xFF)); @@ -166,7 +166,7 @@ namespace epics { retAddr <<= 8; retAddr |= byte; - return retAddr; + return htonl(retAddr); } InetAddrVector* getSocketAddressList(String list, int defaultPort, @@ -198,7 +198,7 @@ namespace epics { } const String inetAddressToString(const osiSockAddr *addr, - bool displayHex) { + bool displayPort, bool displayHex) { stringstream saddr; int ipa = ntohl(addr->ia.sin_addr.s_addr); @@ -207,9 +207,9 @@ namespace epics { saddr<<((int)(ipa>>16)&0xFF)<<'.'; saddr<<((int)(ipa>>8)&0xFF)<<'.'; saddr<<((int)ipa&0xFF); - if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port); - if(displayHex) saddr<<" ("<ia.sin_addr.s_addr)<<")"; + if(displayPort) saddr<<":"<ia.sin_port); + if(displayHex) saddr<<" ("<ia.sin_addr.s_addr) + <<")"; return saddr.str(); } diff --git a/pvAccessApp/utils/inetAddressUtil.h b/pvAccessApp/utils/inetAddressUtil.h index 6910279..0450839 100644 --- a/pvAccessApp/utils/inetAddressUtil.h +++ b/pvAccessApp/utils/inetAddressUtil.h @@ -71,7 +71,7 @@ namespace epics { const InetAddrVector* appendList = NULL); const String inetAddressToString(const osiSockAddr *addr, - bool displayHex = false); + bool displayPort = true, bool displayHex = false); /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index d6a55d4..ce77ae4 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -63,7 +63,7 @@ int main(int argc, char *argv[]) { assert(addr->ia.sin_port==htons(6789)); assert(addr->ia.sin_addr.s_addr==htonl(0xAC1037A0)); assert(inetAddressToString(addr)=="172.16.55.160:6789"); -cout<<'\t'<at(1); assert(addr->ia.sin_family==AF_INET); @@ -100,13 +100,13 @@ cout<<'\t'<ia.sin_family==AF_INET); - assert(inetAddressToString(addr)=="127.0.0.1"); + assert(inetAddressToString(addr)=="127.0.0.1:0"); cout<<'\t'<ia.sin_family==AF_INET); - assert(inetAddressToString(addr)=="10.10.12.11"); + assert(inetAddressToString(addr)=="10.10.12.11:0"); cout<<'\t'< Date: Fri, 7 Jan 2011 09:53:33 +0100 Subject: [PATCH 3/7] TCP transport now basically works. Still some bugs to squash. --- pvAccessApp/remote/blockingTCPTransport.cpp | 16 ---------------- testApp/remote/Makefile | 9 +++++++++ testApp/remote/testBlockingTCPClnt.cpp | 15 +++++++++++---- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index f5ecb3f..836a055 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -417,11 +417,6 @@ namespace epics { void BlockingTCPTransport::processReadCached(bool nestedCall, ReceiveStage inStage, int requiredBytes, bool addToBuffer) { - // TODO remove debug - errlogSevPrintf(errlogInfo, - "processReadCached(%d, %d, %d, %d), _stage: %d", - nestedCall, inStage, requiredBytes, addToBuffer, _stage); - try { while(!_closed) { if(_stage==READ_FROM_SOCKET||inStage!=NONE) { @@ -458,11 +453,6 @@ namespace epics { int requiredPosition = (currentStartPosition +requiredBytes); - - // TODO remove debug - errlogSevPrintf(errlogInfo, - "requredPos:%d, buffer->pos:%d", - requiredPosition, _socketBuffer->getPosition()); while(_socketBuffer->getPosition()put(readBuffer, 0, bytesRead); - // TODO remove debug - if(bytesRead>0) errlogSevPrintf( - errlogInfo, - "***!!! got %d bytes of %d (reqPos=%d)!!!***", - bytesRead, requiredBytes, requiredPosition); - if(bytesRead<0) { // error (disconnect, end-of-stream) detected close(true); diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 620b69c..77979b5 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -26,6 +26,15 @@ PROD_HOST += testChannelSearchManager testChannelSearchManager_SRCS += testChannelSearchManager.cpp testChannelSearchManager_LIBS += pvData pvAccess Com +PROD_HOST += testBlockingTCPSrv +testBlockingTCPSrv_SRCS += testBlockingTCPSrv.cpp +testBlockingTCPSrv_LIBS += pvData pvAccess Com + +PROD_HOST += testBlockingTCPClnt +testBlockingTCPClnt_SRCS += testBlockingTCPClnt.cpp +testBlockingTCPClnt_LIBS += pvData pvAccess Com + + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index 32c7c29..8218dfa 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -29,14 +29,19 @@ using std::sscanf; class ContextImpl : public Context { public: ContextImpl() : - _tr(new TransportRegistry()), - _timer(new Timer("client thread", lowPriority)) {} + _tr(new TransportRegistry()), _timer(new Timer("client thread", + lowPriority)) { + } virtual ~ContextImpl() { delete _tr; delete _timer; } - virtual Timer* getTimer() { return _timer; } - virtual TransportRegistry* getTransportRegistry() { return _tr; } + virtual Timer* getTimer() { + return _timer; + } + virtual TransportRegistry* getTransportRegistry() { + return _tr; + } private: TransportRegistry* _tr; Timer* _timer; @@ -47,6 +52,8 @@ public: virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { + + if(command==CMD_CONNECTION_VALIDATION) transport->verified(); } }; From b868736759f5799e16fb4fd59f6f2889994c2c5f Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Fri, 7 Jan 2011 10:03:20 +0100 Subject: [PATCH 4/7] Some formatting of the debug message. --- pvAccessApp/server/responseHandlers.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index f0e99d0..355c2c6 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -32,8 +32,8 @@ namespace epics { ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); ostringstream prologue; - prologue<<"Message ["<getArray(), From 3c03971939c4d8cecd21ff22d797b072eb21f534 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Fri, 7 Jan 2011 13:01:48 +0100 Subject: [PATCH 5/7] Fixed core dumps. Transport client now finishes successfully. --- .../remote/blockingServerTCPTransport.cpp | 3 +- pvAccessApp/remote/blockingTCP.h | 42 ++++--------------- pvAccessApp/remote/blockingTCPAcceptor.cpp | 39 +---------------- pvAccessApp/remote/blockingTCPTransport.cpp | 30 +++++++++---- pvAccessApp/server/responseHandlers.cpp | 2 +- pvAccessApp/utils/hexDump.cpp | 9 ++-- 6 files changed, 41 insertions(+), 84 deletions(-) diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 32bb1db..b051e2c 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -33,7 +33,7 @@ namespace epics { _introspectionRegistry(new IntrospectionRegistry(true)), _lastChannelSID(0), _channels( new map ()), _channelsMutex( - new Mutex()), _notifyOnClose(NULL) { + new Mutex()) { // NOTE: priority not yet known, default priority is used to register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! @@ -68,7 +68,6 @@ namespace epics { void BlockingServerTCPTransport::internalClose(bool force) { BlockingTCPTransport::internalClose(force); - if(_notifyOnClose!=NULL) _notifyOnClose->transportClosed(this); destroyAllChannels(); } diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 8e82224..52f9b2a 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -37,7 +37,6 @@ namespace epics { namespace pvAccess { class MonitorSender; - class BlockingServerTCPTransport; enum ReceiveStage { READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE @@ -47,19 +46,6 @@ namespace epics { IMMEDIATE, DELAYED, USER_CONTROLED }; - class TransportCloseNotification { - public: - virtual ~TransportCloseNotification() { - } - - /** - * When transport closes, the owner will be notified through this - * callback - */ - virtual void - transportClosed(BlockingServerTCPTransport* transport) =0; - }; - class BlockingTCPTransport : public Transport, public TransportSendControl { public: @@ -67,8 +53,6 @@ namespace epics { ResponseHandler* responseHandler, int receiveBufferSize, int16 priority); - virtual ~BlockingTCPTransport(); - virtual bool isClosed() const { return _closed; } @@ -271,6 +255,8 @@ namespace epics { */ virtual bool send(epics::pvData::ByteBuffer* buffer); + virtual ~BlockingTCPTransport(); + private: /** * Default marker period. @@ -352,6 +338,8 @@ namespace epics { Context* _context; + volatile bool _sendThreadRunning; + /** * Internal method that clears and releases buffer. * sendLock and sendBufferLock must be hold while calling this method. @@ -387,8 +375,6 @@ namespace epics { TransportClient* client, short remoteTransportRevision, float beaconInterval, int16 priority); - virtual ~BlockingClientTCPTransport(); - virtual void timerStopped() { // noop } @@ -444,6 +430,8 @@ namespace epics { virtual void internalClose(bool force); + virtual ~BlockingClientTCPTransport(); + private: /** @@ -549,8 +537,6 @@ namespace epics { BlockingServerTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize); - virtual ~BlockingServerTCPTransport(); - virtual IntrospectionRegistry* getIntrospectionRegistry() { return _introspectionRegistry; } @@ -638,10 +624,6 @@ namespace epics { virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); - void addCloseNotification(TransportCloseNotification* notifyTarget) { - _notifyOnClose = notifyTarget; - } - protected: /** * Introspection registry. @@ -650,6 +632,8 @@ namespace epics { virtual void internalClose(bool force); + virtual ~BlockingServerTCPTransport(); + private: /** * Last SID cache. @@ -663,8 +647,6 @@ namespace epics { Mutex* _channelsMutex; - TransportCloseNotification* _notifyOnClose; - /** * Destroy all channels. */ @@ -676,7 +658,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $ */ - class BlockingTCPAcceptor : public TransportCloseNotification { + class BlockingTCPAcceptor { public: /** @@ -705,8 +687,6 @@ namespace epics { */ void destroy(); - virtual void transportClosed(BlockingServerTCPTransport* transport); - private: /** * Context instance. @@ -735,10 +715,6 @@ namespace epics { epicsThreadId _threadId; - std::set* _connectedClients; - - Mutex* _connectedClientsMutex; - /** * Initialize connection acception. * @return port where server is listening diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index ab4c21e..c5bfcb7 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -24,7 +24,6 @@ #include using std::ostringstream; -using std::set; namespace epics { namespace pvAccess { @@ -33,9 +32,7 @@ namespace epics { int receiveBufferSize) : _context(context), _bindAddress(NULL), _serverSocketChannel( INVALID_SOCKET), _receiveBufferSize(receiveBufferSize), - _destroyed(false), _threadId(NULL), _connectedClients( - new set ()), - _connectedClientsMutex(new Mutex()) { + _destroyed(false), _threadId(NULL) { initialize(port); } @@ -43,22 +40,6 @@ namespace epics { destroy(); if(_bindAddress!=NULL) delete _bindAddress; - - _connectedClientsMutex->lock(); - // go through all the connected clients, close them, and destroy - set::iterator it = - _connectedClients->begin(); - while(it!=_connectedClients->end()) { - BlockingServerTCPTransport* client = *it; - it++; - client->close(true); - delete client; - } - _connectedClients->clear(); - delete _connectedClients; - _connectedClientsMutex->unlock(); - - delete _connectedClientsMutex; } int BlockingTCPAcceptor::initialize(in_port_t port) { @@ -254,16 +235,9 @@ namespace epics { errlogInfo, "Connection to CA client %s failed to be validated, closing it.", ipAddrStr); - delete transport; return; } - // store the new connected client - _connectedClientsMutex->lock(); - _connectedClients->insert(transport); - transport->addCloseNotification(this); - _connectedClientsMutex->unlock(); - errlogSevPrintf(errlogInfo, "Serving to CA client: %s", ipAddrStr); @@ -307,16 +281,5 @@ namespace epics { } } - void BlockingTCPAcceptor::transportClosed( - BlockingServerTCPTransport* transport) { - Lock lock(_connectedClientsMutex); - - // remove the closed client from the list of connected clients - _connectedClients->erase(transport); - - // release the memory - delete transport; - } - } } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 836a055..0f60846 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -87,7 +87,8 @@ namespace epics { _rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue( new GrowingCircularBuffer (100)), _monitorSender(new MonitorSender(_monitorMutex, - _monitorSendQueue)), _context(context) { + _monitorSendQueue)), _context(context), + _sendThreadRunning(false) { _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize)); @@ -141,6 +142,7 @@ namespace epics { } void BlockingTCPTransport::start() { + _sendThreadRunning = true; String threadName = "TCP-receive "+inetAddressToString( _socketAddress); @@ -210,7 +212,10 @@ namespace epics { void BlockingTCPTransport::internalClose(bool force) { // close the socket - epicsSocketDestroy(_channel); + if(_channel!=INVALID_SOCKET) { + epicsSocketDestroy(_channel); + _channel = INVALID_SOCKET; + } } int BlockingTCPTransport::getSocketReceiveBufferSize() const { @@ -462,11 +467,11 @@ namespace epics { maxToRead, 0); _socketBuffer->put(readBuffer, 0, bytesRead); - if(bytesRead<0) { + if(bytesRead<=0) { // error (disconnect, end-of-stream) detected close(true); - if(nestedCall) THROW_BASE_EXCEPTION( + if(bytesRead<0&&nestedCall) THROW_BASE_EXCEPTION( "bytesRead < 0"); return; @@ -834,12 +839,21 @@ namespace epics { errlogSevPrintf(errlogInfo, "Connection to %s closed.", inetAddressToString(_socketAddress).c_str()); - epicsSocketDestroy(_channel); + if(_channel!=INVALID_SOCKET) { + epicsSocketDestroy(_channel); + _channel = INVALID_SOCKET; + } } void BlockingTCPTransport::rcvThreadRunner(void* param) { - ((BlockingTCPTransport*)param)->processReadCached(false, NONE, - CA_MESSAGE_HEADER_SIZE, false); + BlockingTCPTransport* obj = (BlockingTCPTransport*)param; + + obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false); + + while(obj->_sendThreadRunning) + epicsThreadSleep(0.1); + + delete obj; } void BlockingTCPTransport::sendThreadRunner(void* param) { @@ -848,6 +862,8 @@ namespace epics { obj->processSendQueue(); obj->freeConnectionResorces(); + + obj->_sendThreadRunning = false; } void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) { diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 355c2c6..0471373 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -32,7 +32,7 @@ namespace epics { ipAddrToA(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr)); ostringstream prologue; - prologue< using namespace epics::pvData; +using std::stringstream; +using std::endl; +using std::cout; namespace epics { namespace pvAccess { @@ -29,9 +32,9 @@ namespace epics { void hexDump(const String prologue, const String name, const int8 *bs, int start, int len) { - std::stringstream header; + stringstream header; - header<enqueueSendRequest(&dts); + if(!transport->isClosed()) + transport->enqueueSendRequest(&dts); + else + break; sleep(1); } From 8f0b4d859444e428302a6b8ab53dae8af368aff1 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Fri, 7 Jan 2011 14:42:00 +0100 Subject: [PATCH 7/7] Using osiSockAddress parameter by reference wherever possible. --- .cproject | 5 +- pvAccessApp/remote/blockingTCP.h | 6 +- pvAccessApp/remote/blockingTCPConnector.cpp | 20 +++--- pvAccessApp/remote/blockingUDP.h | 20 +++--- pvAccessApp/remote/blockingUDPConnector.cpp | 6 +- pvAccessApp/remote/blockingUDPTransport.cpp | 72 ++++++++++++--------- pvAccessApp/remote/channelSearchManager.h | 2 +- pvAccessApp/remote/remote.h | 4 +- pvAccessApp/server/responseHandlers.cpp | 2 +- testApp/remote/testBeaconEmitter.cpp | 2 +- testApp/remote/testBeaconHandler.cpp | 2 +- testApp/remote/testBlockingTCPClnt.cpp | 2 +- testApp/remote/testBlockingUDPClnt.cpp | 12 ++-- testApp/remote/testBlockingUDPSrv.cpp | 2 +- 14 files changed, 86 insertions(+), 71 deletions(-) diff --git a/.cproject b/.cproject index 0e2a39f..e91f269 100644 --- a/.cproject +++ b/.cproject @@ -307,13 +307,15 @@ make + all true true - true + false make + clean true true @@ -321,7 +323,6 @@ make - uninstall true true diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 5e4a20d..d7302e3 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -117,7 +117,7 @@ namespace epics { _verified = true; } - virtual void setRecipient(const osiSockAddr* sendTo) { + virtual void setRecipient(const osiSockAddr& sendTo) { // noop } @@ -492,7 +492,7 @@ namespace epics { virtual ~BlockingTCPConnector(); virtual Transport* connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* address, + ResponseHandler* responseHandler, osiSockAddr& address, short transportRevision, int16 priority); private: /** @@ -528,7 +528,7 @@ namespace epics { * @return the SOCKET * @throws IOException */ - SOCKET tryConnect(osiSockAddr* address, int tries); + SOCKET tryConnect(osiSockAddr& address, int tries); }; diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 0fd7687..4a066b6 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -32,13 +32,13 @@ namespace epics { delete _namedLocker; } - SOCKET BlockingTCPConnector::tryConnect(osiSockAddr* address, int tries) { + SOCKET BlockingTCPConnector::tryConnect(osiSockAddr& address, int tries) { for(int tryCount = 0; tryCount0) epicsThreadSleep(0.1); char strBuffer[64]; - ipAddrToA(&address->ia, strBuffer, sizeof(strBuffer)); + ipAddrToA(&address.ia, strBuffer, sizeof(strBuffer)); errlogSevPrintf(errlogInfo, "Opening socket to CA server %s, attempt %d.", @@ -53,7 +53,7 @@ namespace epics { strBuffer); } else { - if(::connect(socket, &address->sa, sizeof(sockaddr))==0) + if(::connect(socket, &address.sa, sizeof(sockaddr))==0) return socket; else { epicsSocketConvertErrnoToString(strBuffer, @@ -67,19 +67,19 @@ namespace epics { } Transport* BlockingTCPConnector::connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* address, + ResponseHandler* responseHandler, osiSockAddr& address, short transportRevision, int16 priority) { SOCKET socket = INVALID_SOCKET; char ipAddrStr[64]; - ipAddrToA(&address->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToA(&address.ia, ipAddrStr, sizeof(ipAddrStr)); // first try to check cache w/o named lock... BlockingClientTCPTransport * transport = (BlockingClientTCPTransport*)(_context->getTransportRegistry()->get( - "TCP", address, priority)); + "TCP", &address, priority)); if(transport!=NULL) { errlogSevPrintf(errlogInfo, "Reusing existing connection to CA server: %s", @@ -88,13 +88,13 @@ namespace epics { } bool lockAcquired = _namedLocker->acquireSynchronizationObject( - address, LOCK_TIMEOUT); + &address, LOCK_TIMEOUT); if(lockAcquired) { try { // ... transport created during waiting in lock transport = (BlockingClientTCPTransport*)(_context->getTransportRegistry()->get( - "TCP", address, priority)); + "TCP", &address, priority)); if(transport!=NULL) { errlogSevPrintf(errlogInfo, "Reusing existing connection to CA server: %s", @@ -156,10 +156,10 @@ namespace epics { } catch(...) { // close socket, if open if(socket!=INVALID_SOCKET) epicsSocketDestroy(socket); - _namedLocker->releaseSynchronizationObject(address); + _namedLocker->releaseSynchronizationObject(&address); throw; } - _namedLocker->releaseSynchronizationObject(address); + _namedLocker->releaseSynchronizationObject(&address); } else { ostringstream temp; diff --git a/pvAccessApp/remote/blockingUDP.h b/pvAccessApp/remote/blockingUDP.h index 785f327..43b1b6f 100644 --- a/pvAccessApp/remote/blockingUDP.h +++ b/pvAccessApp/remote/blockingUDP.h @@ -32,7 +32,7 @@ namespace epics { public TransportSendControl { public: BlockingUDPTransport(ResponseHandler* responseHandler, - SOCKET channel, osiSockAddr* bindAddress, + SOCKET channel, osiSockAddr& bindAddress, InetAddrVector* sendAddresses, short remoteTransportRevision); @@ -107,8 +107,10 @@ namespace epics { // noop since all UDP requests are sent immediately } - virtual void setRecipient(const osiSockAddr* sendTo) { - _sendTo = sendTo; + virtual void setRecipient(const osiSockAddr& sendTo) { + if(_sendTo!=NULL) delete _sendTo; + _sendTo = new osiSockAddr; + memcpy(_sendTo, &sendTo, sizeof(osiSockAddr)); } virtual void flushSerializeBuffer() { @@ -135,7 +137,9 @@ namespace epics { return _ignoredAddresses; } - bool send(ByteBuffer* buffer, const osiSockAddr* address = NULL); + bool send(ByteBuffer* buffer, const osiSockAddr& address); + + bool send(ByteBuffer* buffer); /** * Get list of send addresses. @@ -149,7 +153,7 @@ namespace epics { * Get bind address. * @return bind address. */ - osiSockAddr* getBindAddress() { + const osiSockAddr* getBindAddress() const { return _bindAddress; } @@ -177,7 +181,7 @@ namespace epics { private: static void threadRunner(void* param); - bool processBuffer(osiSockAddr* fromAddress, + bool processBuffer(osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer); // Context only used for logging in this class @@ -207,7 +211,7 @@ namespace epics { */ InetAddrVector* _ignoredAddresses; - const osiSockAddr* _sendTo; + osiSockAddr* _sendTo; /** * Receive buffer. @@ -259,7 +263,7 @@ namespace epics { * NOTE: transport client is ignored for broadcast (UDP). */ virtual Transport* connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* bindAddress, + ResponseHandler* responseHandler, osiSockAddr& bindAddress, short transportRevision, int16 priority); private: diff --git a/pvAccessApp/remote/blockingUDPConnector.cpp b/pvAccessApp/remote/blockingUDPConnector.cpp index 002220b..3e97a93 100644 --- a/pvAccessApp/remote/blockingUDPConnector.cpp +++ b/pvAccessApp/remote/blockingUDPConnector.cpp @@ -24,10 +24,10 @@ namespace epics { namespace pvAccess { Transport* BlockingUDPConnector::connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* bindAddress, + ResponseHandler* responseHandler, osiSockAddr& bindAddress, short transportRevision, int16 priority) { errlogSevPrintf(errlogInfo, "Creating datagram socket to: %s", - inetAddressToString(bindAddress).c_str()); + inetAddressToString(&bindAddress).c_str()); SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if(socket==INVALID_SOCKET) { @@ -45,7 +45,7 @@ namespace epics { * because of an early setsockopt call failing. */ - int retval = ::bind(socket, (sockaddr*)&(bindAddress->sa), + int retval = ::bind(socket, (sockaddr*)&(bindAddress.sa), sizeof(sockaddr)); if(retval<0) { errlogSevPrintf(errlogMajor, "Error binding socket: %s", diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 9a24c71..3f53a8a 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -34,21 +34,28 @@ namespace epics { BlockingUDPTransport::BlockingUDPTransport( ResponseHandler* responseHandler, SOCKET channel, - osiSockAddr* bindAddress, InetAddrVector* sendAddresses, + osiSockAddr& bindAddress, InetAddrVector* sendAddresses, short remoteTransportRevision) : _closed(false), _responseHandler(responseHandler), - _channel(channel), _socketAddress(bindAddress), - _bindAddress(bindAddress), _sendAddresses(sendAddresses), + _channel(channel), _sendAddresses(sendAddresses), _ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer( new ByteBuffer(MAX_UDP_RECV)), _sendBuffer( new ByteBuffer(MAX_UDP_RECV)), _lastMessageStartPosition(0), _readBuffer( new char[MAX_UDP_RECV]), _mutex(new Mutex()), _threadId(NULL) { + _socketAddress = new osiSockAddr; + memcpy(_socketAddress, &bindAddress, sizeof(osiSockAddr)); + _bindAddress = _socketAddress; + } BlockingUDPTransport::~BlockingUDPTransport() { close(true); // close the socket and stop the thread. + if(_sendTo!=NULL) delete _sendTo; + delete _socketAddress; + // _bindAddress equals _socketAddress + delete _receiveBuffer; delete _sendBuffer; delete[] _readBuffer; @@ -89,7 +96,10 @@ namespace epics { sender->send(_sendBuffer, this); sender->unlock(); endMessage(); - send(_sendBuffer, _sendTo); + if(_sendTo==NULL) + send(_sendBuffer); + else + send(_sendBuffer, *_sendTo); } catch(...) { sender->unlock(); } @@ -172,7 +182,7 @@ namespace epics { _receiveBuffer->flip(); - processBuffer(&fromAddress, _receiveBuffer); + processBuffer(fromAddress, _receiveBuffer); } } else { @@ -210,7 +220,7 @@ namespace epics { errlogSevPrintf(errlogInfo, "Thread '%s' exiting", threadName); } - bool BlockingUDPTransport::processBuffer(osiSockAddr* fromAddress, + bool BlockingUDPTransport::processBuffer(osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) { // handle response(s) @@ -238,7 +248,7 @@ namespace epics { if(nextRequestPosition>receiveBuffer->getLimit()) return false; // handle - _responseHandler->handleResponse(fromAddress, this, + _responseHandler->handleResponse(&fromAddress, this, (int8)(magicAndVersion&0xFF), command, payloadSize, _receiveBuffer); @@ -251,32 +261,32 @@ namespace epics { } bool BlockingUDPTransport::send(ByteBuffer* buffer, - const osiSockAddr* address) { - if(address==NULL&&_sendAddresses==NULL) return false; + const osiSockAddr& address) { - if(address!=NULL) { - buffer->flip(); - int retval = - sendto(_channel, buffer->getArray(), - buffer->getLimit(), 0, &(address->sa), - sizeof(sockaddr)); - if(retval<0) { - errlogSevPrintf(errlogMajor, "Socket sendto error: %s", - strerror(errno)); - return false; - } + buffer->flip(); + int retval = sendto(_channel, buffer->getArray(), + buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr)); + if(retval<0) { + errlogSevPrintf(errlogMajor, "Socket sendto error: %s", + strerror(errno)); + return false; } - else { - for(size_t i = 0; i<_sendAddresses->size(); i++) { - buffer->flip(); - int retval = sendto(_channel, buffer->getArray(), - buffer->getLimit(), 0, - &(_sendAddresses->at(i)->sa), sizeof(sockaddr)); - { - if(retval<0) errlogSevPrintf(errlogMajor, - "Socket sendto error: %s", strerror(errno)); - return false; - } + + return true; + } + + bool BlockingUDPTransport::send(ByteBuffer* buffer) { + if(_sendAddresses==NULL) return false; + + for(size_t i = 0; i<_sendAddresses->size(); i++) { + buffer->flip(); + int retval = sendto(_channel, buffer->getArray(), + buffer->getLimit(), 0, &(_sendAddresses->at(i)->sa), + sizeof(sockaddr)); + { + if(retval<0) errlogSevPrintf(errlogMajor, + "Socket sendto error: %s", strerror(errno)); + return false; } } diff --git a/pvAccessApp/remote/channelSearchManager.h b/pvAccessApp/remote/channelSearchManager.h index b180e71..cb5f3c3 100644 --- a/pvAccessApp/remote/channelSearchManager.h +++ b/pvAccessApp/remote/channelSearchManager.h @@ -418,7 +418,7 @@ class MockTransportSendControl: public TransportSendControl public: void endMessage() {} void flush(bool lastMessageCompleted) {} - void setRecipient(const osiSockAddr* sendTo) {} + void setRecipient(const osiSockAddr& sendTo) {} void startMessage(int8 command, int32 ensureCapacity) {} void ensureBuffer(int32 size) {} void flushSerializeBuffer() {} diff --git a/pvAccessApp/remote/remote.h b/pvAccessApp/remote/remote.h index 4b55a0b..46f7a12 100644 --- a/pvAccessApp/remote/remote.h +++ b/pvAccessApp/remote/remote.h @@ -54,7 +54,7 @@ namespace epics { virtual void flush(bool lastMessageCompleted) =0; - virtual void setRecipient(const osiSockAddr* sendTo) =0; + virtual void setRecipient(const osiSockAddr& sendTo) =0; }; /** @@ -326,7 +326,7 @@ namespace epics { * @throws ConnectionException */ virtual Transport* connect(TransportClient* client, - ResponseHandler* responseHandler, osiSockAddr* address, + ResponseHandler* responseHandler, osiSockAddr& address, short transportRevision, int16 priority) =0; }; diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 0471373..e3bbad5 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -147,7 +147,7 @@ namespace epics { virtual void send(ByteBuffer* buffer, TransportSendControl* control) { control->startMessage(CMD_ECHO, 0); - control->setRecipient(&_echoFrom); + control->setRecipient(_echoFrom); } virtual void lock() { diff --git a/testApp/remote/testBeaconEmitter.cpp b/testApp/remote/testBeaconEmitter.cpp index 9ddd828..0d3c904 100644 --- a/testApp/remote/testBeaconEmitter.cpp +++ b/testApp/remote/testBeaconEmitter.cpp @@ -53,7 +53,7 @@ void testBeaconEmitter() bindAddr.ia.sin_family = AF_INET; bindAddr.ia.sin_port = htons(5066); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); - Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50); + Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50); cout<<"Sending beacons"<getRemoteAddress()); diff --git a/testApp/remote/testBeaconHandler.cpp b/testApp/remote/testBeaconHandler.cpp index 30a51e2..0ac231c 100644 --- a/testApp/remote/testBeaconHandler.cpp +++ b/testApp/remote/testBeaconHandler.cpp @@ -112,7 +112,7 @@ void testBeaconHandler() bindAddr.ia.sin_family = AF_INET; bindAddr.ia.sin_port = htons(5067); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); - Transport* transport = connector.connect(NULL, &brh, &bindAddr, 1, 50); + Transport* transport = connector.connect(NULL, &brh, bindAddr, 1, 50); (static_cast(transport))->start(); while(1) sleep(1); diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index c583b74..3f69392 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -118,7 +118,7 @@ void testBlockingTCPSender() { return; } - Transport* transport = connector.connect(&dtc, &drh, &srvAddr, + Transport* transport = connector.connect(&dtc, &drh, srvAddr, CA_MAGIC_AND_VERSION, CA_DEFAULT_PRIORITY); cout<<"Sending 10 messages..."< #include +#define SRV_IP "192.168.71.132" + using namespace epics::pvAccess; using namespace epics::pvData; @@ -41,15 +43,13 @@ public: } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - // set recipient - sendTo.ia.sin_family = AF_INET; - sendTo.ia.sin_port = htons(65000); - if(aToIPAddr("192.168.71.129", 65000, &sendTo.ia)<0) { + // SRV_IP defined at the top of the this file + if(aToIPAddr(SRV_IP, 65000, &sendTo.ia)<0) { cout<<"error in aToIPAddr(...)"<setRecipient(&sendTo); + control->setRecipient(sendTo); // send the packet count++; @@ -79,7 +79,7 @@ void testBlockingUDPSender() { bindAddr.ia.sin_port = htons(65001); bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); - Transport* transport = connector.connect(NULL, &drh, &bindAddr, 1, 50); + Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50); cout<<"Sending 10 packets..."<start();