From 0af1781b5eda4a7b96a1435187cf7cee5ae34094 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Wed, 5 Jan 2011 13:36:07 +0100 Subject: [PATCH 1/4] Added connected clients tracking to BlockingTCPAcceptor. For each new connection a new 'BlockingServerTCPTransport' object is created, but was never released. The acceptor now stores all the objects in a set, and registers a callback with the client, to be notified when the connection is closed. On notification the client is them removed from the set and the memory is released. When the acceptor is destroyed, it also closes and deleted all the connected clients. --- .../remote/blockingClientTCPTransport.cpp | 8 ++-- .../remote/blockingServerTCPTransport.cpp | 3 +- pvAccessApp/remote/blockingTCP.h | 31 ++++++++++++- pvAccessApp/remote/blockingTCPAcceptor.cpp | 45 +++++++++++++++++-- pvAccessApp/remote/blockingTCPTransport.cpp | 3 +- 5 files changed, 79 insertions(+), 11 deletions(-) diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index e380b58..75f15d4 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -125,12 +125,12 @@ namespace epics { // check if still acquired int refs = _owners->size(); if(refs>0) { - ostringstream temp; char ipAddrStr[48]; ipAddrToA(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); - temp<<"Transport to "<::iterator it = _owners->begin(); for(; it!=_owners->end(); it++) diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 29272c5..9bd0a93 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -33,7 +33,7 @@ namespace epics { _introspectionRegistry(new IntrospectionRegistry(true)), _lastChannelSID(0), _channels( new map ()), _channelsMutex( - new Mutex()) { + new Mutex()), _notifyOnClose(NULL) { // NOTE: priority not yet known, default priority is used to register/unregister // TODO implement priorities in Reactor... not that user will // change it.. still getPriority() must return "registered" priority! @@ -68,6 +68,7 @@ namespace epics { void BlockingServerTCPTransport::internalClose(bool force) { BlockingTCPTransport::internalClose(force); + if(_notifyOnClose!=NULL) _notifyOnClose->transportClosed(this); destroyAllChannels(); } diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index c6bd5dc..8e82224 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -37,6 +37,7 @@ namespace epics { namespace pvAccess { class MonitorSender; + class BlockingServerTCPTransport; enum ReceiveStage { READ_FROM_SOCKET, PROCESS_HEADER, PROCESS_PAYLOAD, NONE @@ -46,6 +47,19 @@ namespace epics { IMMEDIATE, DELAYED, USER_CONTROLED }; + class TransportCloseNotification { + public: + virtual ~TransportCloseNotification() { + } + + /** + * When transport closes, the owner will be notified through this + * callback + */ + virtual void + transportClosed(BlockingServerTCPTransport* transport) =0; + }; + class BlockingTCPTransport : public Transport, public TransportSendControl { public: @@ -504,7 +518,8 @@ namespace epics { /** * named lock */ - NamedLockPattern* _namedLocker; + NamedLockPattern + * _namedLocker; /** * Receive buffer size. @@ -623,6 +638,10 @@ namespace epics { virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); + void addCloseNotification(TransportCloseNotification* notifyTarget) { + _notifyOnClose = notifyTarget; + } + protected: /** * Introspection registry. @@ -644,6 +663,8 @@ namespace epics { Mutex* _channelsMutex; + TransportCloseNotification* _notifyOnClose; + /** * Destroy all channels. */ @@ -655,7 +676,7 @@ namespace epics { * @author Matej Sekoranja * @version $Id: BlockingTCPAcceptor.java,v 1.1 2010/05/03 14:45:42 mrkraimer Exp $ */ - class BlockingTCPAcceptor { + class BlockingTCPAcceptor : public TransportCloseNotification { public: /** @@ -684,6 +705,8 @@ namespace epics { */ void destroy(); + virtual void transportClosed(BlockingServerTCPTransport* transport); + private: /** * Context instance. @@ -712,6 +735,10 @@ namespace epics { epicsThreadId _threadId; + std::set* _connectedClients; + + Mutex* _connectedClientsMutex; + /** * Initialize connection acception. * @return port where server is listening diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index d35b92c..3d83c31 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -24,6 +24,7 @@ #include using std::ostringstream; +using std::set; namespace epics { namespace pvAccess { @@ -32,12 +33,30 @@ namespace epics { int receiveBufferSize) : _context(context), _bindAddress(NULL), _serverSocketChannel( INVALID_SOCKET), _receiveBufferSize(receiveBufferSize), - _destroyed(false), _threadId(NULL) { + _destroyed(false), _threadId(NULL), _connectedClients( + new set ()), + _connectedClientsMutex(new Mutex()) { initialize(port); } BlockingTCPAcceptor::~BlockingTCPAcceptor() { if(_bindAddress!=NULL) delete _bindAddress; + + _connectedClientsMutex->lock(); + // go through all the connected clients, close them, and destroy + set::iterator it = + _connectedClients->begin(); + while(it!=_connectedClients->end()) { + BlockingServerTCPTransport* client = *it; + it++; + client->close(true); + delete client; + } + _connectedClients->clear(); + delete _connectedClients; + _connectedClientsMutex->unlock(); + + delete _connectedClientsMutex; } int BlockingTCPAcceptor::initialize(in_port_t port) { @@ -213,8 +232,10 @@ namespace epics { //socket.socket().setReceiveBufferSize(); //socket.socket().setSendBufferSize(); - // create transport - // each transport should have its own response handler since it is not "shareable" + /* create transport + * each transport should have its own response + * handler since it is not "shareable" + */ BlockingServerTCPTransport * transport = new BlockingServerTCPTransport( @@ -231,9 +252,16 @@ namespace epics { errlogInfo, "Connection to CA client %s failed to be validated, closing it.", ipAddrStr); + delete transport; return; } + // store the new connected client + _connectedClientsMutex->lock(); + _connectedClients->insert(transport); + transport->addCloseNotification(this); + _connectedClientsMutex->unlock(); + errlogSevPrintf(errlogInfo, "Serving to CA client: %s", ipAddrStr); @@ -277,5 +305,16 @@ namespace epics { } } + void BlockingTCPAcceptor::transportClosed( + BlockingServerTCPTransport* transport) { + Lock lock(_connectedClientsMutex); + + // remove the closed client from the list of connected clients + _connectedClients->erase(transport); + + // release the memory + delete transport; + } + } } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index b746ccd..e9a769c 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -208,7 +208,8 @@ namespace epics { } void BlockingTCPTransport::internalClose(bool force) { - // noop + // close the socket + epicsSocketDestroy(_channel); } int BlockingTCPTransport::getSocketReceiveBufferSize() const { From 3c1b9d9743b3065b19472741db537c36e7cf857d Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 5 Jan 2011 18:02:09 +0100 Subject: [PATCH 2/4] MacOSX port, this needs to be portable... epicsMutex, but there is no timedlock. --- pvAccessApp/utils/referenceCountingLock.cpp | 5 +++++ testApp/utils/namedLockPatternTest.cpp | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pvAccessApp/utils/referenceCountingLock.cpp b/pvAccessApp/utils/referenceCountingLock.cpp index abd2250..11ab0ea 100644 --- a/pvAccessApp/utils/referenceCountingLock.cpp +++ b/pvAccessApp/utils/referenceCountingLock.cpp @@ -41,6 +41,10 @@ ReferenceCountingLock::~ReferenceCountingLock() bool ReferenceCountingLock::acquire(int64 msecs) { +#ifdef darwin + // timedlock not supported by Darwin OS + return (pthread_mutex_lock(&_mutex) == 0); +#else struct timespec deltatime; deltatime.tv_sec = msecs / 1000; deltatime.tv_nsec = (msecs % 1000) * 1000; @@ -51,6 +55,7 @@ bool ReferenceCountingLock::acquire(int64 msecs) return true; } return false; +#endif } void ReferenceCountingLock::release() diff --git a/testApp/utils/namedLockPatternTest.cpp b/testApp/utils/namedLockPatternTest.cpp index 9224800..edbfbbe 100644 --- a/testApp/utils/namedLockPatternTest.cpp +++ b/testApp/utils/namedLockPatternTest.cpp @@ -156,7 +156,7 @@ void* testWorker2(void* p) assert(namedGuard.acquireSynchronizationObject(addr,timeout)); usleep(1); } - +#ifndef darwin //this thread sleeps a while and gets timeout on lock { sleep(1); @@ -167,7 +167,7 @@ void* testWorker2(void* p) NamedLock namedGuard(namedLockPattern); assert(!namedGuard.acquireSynchronizationObject(addr,timeout)); } - +#endif return NULL; } From 6c3b00dc86c0d17a910a5df8e240fda483f75291 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Thu, 6 Jan 2011 09:49:22 +0100 Subject: [PATCH 3/4] configure/CONFIG_SITE: - added explicit instructions to link pthread library, since it is not included by default on Fedora linux (possibly others?) testRemoteClientImpl.cpp: - added "#include " needed for std::auto_ptr --- configure/CONFIG_SITE | 2 ++ testApp/remote/testRemoteClientImpl.cpp | 1 + 2 files changed, 3 insertions(+) diff --git a/configure/CONFIG_SITE b/configure/CONFIG_SITE index f3bfd36..287667b 100644 --- a/configure/CONFIG_SITE +++ b/configure/CONFIG_SITE @@ -23,3 +23,5 @@ # You must rebuild in the iocBoot directory for this to # take effect. #IOCS_APPL_TOP = + +USR_LDFLAGS += -lpthread diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 11893a8..846d7c7 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include From 8339c338de54f68bf3c2314aba681fbde3966eb3 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Thu, 6 Jan 2011 14:58:32 +0100 Subject: [PATCH 4/4] blockingServerTCPTransport.cpp: - using enum to specify command. BlockingTCPAcceptor.cpp: - added 'destroy()' to dtor - added parentheses to expressions blockingTCPCinnector.cpp: - fixed log message blockingTCPTransport.cpp: - added _socketAddress allocation - fixed waiting for timeout in 'waitUntilVerified' - fixed how many bytes are copied from the buffer responseHandlers.cpp: - added 'ConnectionValidationHandler' implementation - added 'ConnectionValidationHandler' to 'ServerResponseHandler' responseHandlers.h: - added 'ConnectionValidationHandler' declaration inetAddressUtil.cpp: - fixed all issues with sockaddr_in byte-order - removed function 'processAddressForList', using EPICSv3 'aToIPAddr' instead inetAddressUtilsTest.cpp: - fixed the tests in accordance with the correct function implementation testBlockingUDPClnt.cpp: - deleting transport at the end of the test testBlockingTCPSrv.cpp, testBlockingTCPClnt.cpp: - added tests (work in progress). Makefile: - added blockingTCP tests --- .../remote/blockingServerTCPTransport.cpp | 2 +- pvAccessApp/remote/blockingTCPAcceptor.cpp | 6 +- pvAccessApp/remote/blockingTCPConnector.cpp | 2 +- pvAccessApp/remote/blockingTCPTransport.cpp | 9 +- pvAccessApp/server/responseHandlers.cpp | 20 ++- pvAccessApp/server/responseHandlers.h | 18 ++- pvAccessApp/utils/inetAddressUtil.cpp | 39 ++--- testApp/remote/Makefile | 8 ++ testApp/remote/testBlockingTCPClnt.cpp | 134 ++++++++++++++++++ testApp/remote/testBlockingTCPSrv.cpp | 53 +++++++ testApp/remote/testBlockingUDPClnt.cpp | 2 + testApp/utils/inetAddressUtilsTest.cpp | 42 +++--- 12 files changed, 282 insertions(+), 53 deletions(-) create mode 100644 testApp/remote/testBlockingTCPClnt.cpp create mode 100644 testApp/remote/testBlockingTCPSrv.cpp diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 9bd0a93..32bb1db 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -111,7 +111,7 @@ namespace epics { // // send verification message // - control->startMessage(1, 2*sizeof(int32)); + control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)); // receive buffer size buffer->putInt(getReceiveBufferSize()); diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index 3d83c31..ab4c21e 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -40,6 +40,8 @@ namespace epics { } BlockingTCPAcceptor::~BlockingTCPAcceptor() { + destroy(); + if(_bindAddress!=NULL) delete _bindAddress; _connectedClientsMutex->lock(); @@ -190,7 +192,7 @@ namespace epics { } else if(retval>0) { // some event on a socket - if(sockets[0].revents&POLLIN!=0) { + if((sockets[0].revents&POLLIN)!=0) { // connection waiting osiSockAddr address; @@ -267,7 +269,7 @@ namespace epics { }// accept succeeded } // connection waiting - if(sockets[0].revents&(POLLERR|POLLHUP|POLLNVAL)!=0) { + if((sockets[0].revents&(POLLERR|POLLHUP|POLLNVAL))!=0) { errlogSevPrintf(errlogMajor, "error on a socket: POLLERR|POLLHUP|POLLNVAL"); socketOpen = false; diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 0c7e7f1..0fd7687 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -139,7 +139,7 @@ namespace epics { transport->close(true); errlogSevPrintf( errlogInfo, - "Connection to CA client %s failed to be validated, closing it.", + "Connection to CA server %s failed to be validated, closing it.", ipAddrStr); ostringstream temp; temp<<"Failed to verify TCP connection to '"<unlock(); - while(!internalVerified&&internalTimeout<=0) { + while(!internalVerified&&internalTimeout>0) { epicsThreadSleep(min(0.1, internalTimeout)); internalTimeout -= 0.1; @@ -459,7 +460,7 @@ namespace epics { _socketBuffer->getRemaining()); ssize_t bytesRead = recv(_channel, readBuffer, maxToRead, 0); - _socketBuffer->put(readBuffer, 0, maxToRead); + _socketBuffer->put(readBuffer, 0, bytesRead); if(bytesRead<0) { // error (disconnect, end-of-stream) detected diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 745387a..f69c5c7 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -64,7 +64,7 @@ namespace epics { _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; // TODO add real handlers, as they are developed _handlerTable[0] = badResponse; - _handlerTable[1] = badResponse; + _handlerTable[1] = new ConnectionValidationHandler(_context); _handlerTable[2] = badResponse; _handlerTable[3] = badResponse; _handlerTable[4] = badResponse; @@ -95,6 +95,7 @@ namespace epics { ServerResponseHandler::~ServerResponseHandler() { delete _handlerTable[0]; + delete _handlerTable[1]; delete[] _handlerTable; } @@ -119,5 +120,22 @@ namespace epics { version, command, payloadSize, payloadBuffer); } + void ConnectionValidationHandler::handleResponse( + osiSockAddr* responseFrom, Transport* transport, int8 version, + int8 command, int payloadSize, + epics::pvData::ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData(2*sizeof(int32)+sizeof(int16)); + transport->setRemoteTransportReceiveBufferSize( + payloadBuffer->getInt()); + transport->setRemoteTransportSocketReceiveBufferSize( + payloadBuffer->getInt()); + transport->setRemoteMinorRevision(version); + // TODO support priority !!! + //transport.setPriority(payloadBuffer.getShort()); + } + } } diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index ef74d08..f08c399 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -84,10 +84,26 @@ namespace epics { }; + /** + * Connection validation message handler. + * @author Matej Sekoranja + * @version $Id: ConnectionValidationHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class ConnectionValidationHandler : public AbstractServerResponseHandler { + public: + /** + * @param context + */ + ConnectionValidationHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Connection validation") { + } + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; } } - #endif /* RESPONSEHANDLERS_H_ */ diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index e513380..5fff2d2 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -21,6 +21,7 @@ #include #include #include +#include using namespace std; using namespace epics::pvData; @@ -124,14 +125,14 @@ namespace epics { osiSockAddr* intToIPv4Address(int32 addr) { osiSockAddr* ret = new osiSockAddr; ret->ia.sin_family = AF_INET; - ret->ia.sin_addr.s_addr = (in_addr_t)addr; + ret->ia.sin_addr.s_addr = htonl(addr); ret->ia.sin_port = 0; return ret; } int32 ipv4AddressToInt(const osiSockAddr& addr) { - return (int32)(addr.ia.sin_addr.s_addr); + return (int32)ntohl(addr.ia.sin_addr.s_addr); } int32 parseInetAddress(const String addr) { @@ -168,24 +169,6 @@ namespace epics { return retAddr; } - osiSockAddr* processAddressForList(String address, int defaultPort) { - // check port - int port = defaultPort; - size_t pos = address.find(':'); - if(pos!=String::npos) { - port = atoi(address.substr(pos+1).c_str()); - address = address.substr(0, pos); - } - - // add parsed address - osiSockAddr* addr = new osiSockAddr; - addr->ia.sin_family = AF_INET; - addr->ia.sin_port = port; - addr->ia.sin_addr.s_addr = parseInetAddress(address); - - return addr; - } - InetAddrVector* getSocketAddressList(String list, int defaultPort, const InetAddrVector* appendList) { InetAddrVector* iav = new InetAddrVector(); @@ -195,13 +178,17 @@ namespace epics { size_t subEnd; while((subEnd = list.find(' ', subStart))!=String::npos) { String address = list.substr(subStart, (subEnd-subStart)); - - iav->push_back(processAddressForList(address, defaultPort)); + osiSockAddr* addr = new osiSockAddr; + aToIPAddr(address.c_str(), defaultPort, &addr->ia); + iav->push_back(addr); subStart = list.find_first_not_of(" \t\r\n\v", subEnd); } - if(subStart!=String::npos&&list.length()>0) iav->push_back( - processAddressForList(list.substr(subStart), defaultPort)); + if(subStart!=String::npos&&list.length()>0) { + osiSockAddr* addr = new osiSockAddr; + aToIPAddr(list.substr(subStart).c_str(), defaultPort, &addr->ia); + iav->push_back(addr); + } if(appendList!=NULL) { for(size_t i = 0; isize(); i++) @@ -221,8 +208,8 @@ namespace epics { saddr<<((int)(ipa>>8)&0xFF)<<'.'; saddr<<((int)ipa&0xFF); if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port); - if(displayHex) saddr<<" ("<ia.sin_addr.s_addr))<<")"; + if(displayHex) saddr<<" ("<ia.sin_addr.s_addr)<<")"; return saddr.str(); } diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 0ddd4f4..3939bb3 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -22,6 +22,14 @@ PROD_HOST += testBeaconHandler testBeaconHandler_SRCS += testBeaconHandler.cpp testBeaconHandler_LIBS += pvData pvAccess Com +PROD_HOST += testBlockingTCPSrv +testBlockingTCPSrv_SRCS += testBlockingTCPSrv.cpp +testBlockingTCPSrv_LIBS += pvData pvAccess Com + +PROD_HOST += testBlockingTCPClnt +testBlockingTCPClnt_SRCS += testBlockingTCPClnt +testBlockingTCPClnt_LIBS += pvData pvAccess Com + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp new file mode 100644 index 0000000..32c7c29 --- /dev/null +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -0,0 +1,134 @@ +/* + * testBlockingTCPClnt.cpp + * + * Created on: Jan 6, 2011 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "blockingTCP.h" +#include "logger.h" +#include "inetAddressUtil.h" +#include "caConstants.h" + +#include + +#include +#include + +#include +#include + +using namespace epics::pvAccess; +using namespace epics::pvData; + +using std::cout; +using std::endl; +using std::sscanf; + +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), + _timer(new Timer("client thread", lowPriority)) {} + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } +private: + TransportRegistry* _tr; + Timer* _timer; +}; + +class DummyResponseHandler : public ResponseHandler { +public: + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer) { + } +}; + +class DummyTransportClient : public TransportClient { +public: + DummyTransportClient() { + } + virtual ~DummyTransportClient() { + } + virtual void transportUnresponsive() { + errlogSevPrintf(errlogInfo, "unresponsive"); + } + virtual void transportResponsive(Transport* transport) { + errlogSevPrintf(errlogInfo, "responsive"); + } + virtual void transportChanged() { + errlogSevPrintf(errlogInfo, "changed"); + } + virtual void transportClosed() { + errlogSevPrintf(errlogInfo, "closed"); + } +}; + +class DummyTransportSender : public TransportSender { +public: + DummyTransportSender() { + for(int i = 0; i<20; i++) + data[i] = (char)(i+1); + count = 0; + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + // send the packet + count++; + control->startMessage(0, count); + buffer->put(data, 0, count); + //control->endMessage(); + } + + virtual void lock() { + } + virtual void unlock() { + } +private: + char data[20]; + int count; +}; + +void testBlockingTCPSender() { + ContextImpl ctx; + BlockingTCPConnector connector(&ctx, 1024, 1.0); + + DummyTransportClient dtc; + DummyTransportSender dts; + DummyResponseHandler drh; + + osiSockAddr srvAddr; + + srvAddr.ia.sin_family = AF_INET; + //srvAddr.ia.sin_port = htons(CA_SERVER_PORT); + if(aToIPAddr("192.168.71.132", CA_SERVER_PORT, &srvAddr.ia)<0) { + cout<<"error in aToIPAddr(...)"<enqueueSendRequest(&dts); + sleep(1); + } + + delete transport; +} + +int main(int argc, char *argv[]) { + createFileLogger("testBlockingTCPClnt.log"); + + testBlockingTCPSender(); + return 0; +} diff --git a/testApp/remote/testBlockingTCPSrv.cpp b/testApp/remote/testBlockingTCPSrv.cpp new file mode 100644 index 0000000..704093a --- /dev/null +++ b/testApp/remote/testBlockingTCPSrv.cpp @@ -0,0 +1,53 @@ +/* + * testBlockingTCPSrv.cpp + * + * Created on: Jan 6, 2011 + * Author: Miha Vitorovic + */ + +#include "blockingTCP.h" +#include "remote.h" +#include "logger.h" + +#include + +using namespace epics::pvData; +using namespace epics::pvAccess; + +using std::cin; +using std::cout; + +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), + _timer(new Timer("server thread", lowPriority)) {} + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } +private: + TransportRegistry* _tr; + Timer* _timer; +}; + +void testServerConnections() { + ContextImpl ctx; + + BlockingTCPAcceptor* srv = new BlockingTCPAcceptor(&ctx, CA_SERVER_PORT, + 1024); + + cout<<"Press any key to stop the server..."; + char c = cin.peek(); + + delete srv; +} + +int main(int argc, char *argv[]) { + + createFileLogger("testBlockingTCPSrv.log"); + + testServerConnections(); +} diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index 8437a62..cefed59 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -89,6 +89,8 @@ void testBlockingUDPSender() { sleep(1); } + delete transport; + } int main(int argc, char *argv[]) { diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index 16695e0..d6a55d4 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -32,20 +32,23 @@ int main(int argc, char *argv[]) { osiSockAddr* addr; addr = vec->at(0); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==555); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0x7F000001); + assert(addr->ia.sin_port==htons(555)); + assert(addr->ia.sin_addr.s_addr==htonl(0x7F000001)); + assert(inetAddressToString(addr)=="127.0.0.1:555"); cout<<'\t'<at(1); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==1234); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0x0A0A0C0B); + assert(addr->ia.sin_port==htons(1234)); + assert(addr->ia.sin_addr.s_addr==htonl(0x0A0A0C0B)); + assert(inetAddressToString(addr)=="10.10.12.11:1234"); cout<<'\t'<at(2); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==555); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0xC0A80304); + assert(addr->ia.sin_port==htons(555)); + assert(addr->ia.sin_addr.s_addr==htonl(0xC0A80304)); + assert(inetAddressToString(addr)=="192.168.3.4:555"); cout<<'\t'<at(0); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==6789); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0xAC1037A0); - cout<<'\t'<ia.sin_port==htons(6789)); + assert(addr->ia.sin_addr.s_addr==htonl(0xAC1037A0)); + assert(inetAddressToString(addr)=="172.16.55.160:6789"); +cout<<'\t'<at(1); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==555); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0x7F000001); + assert(addr->ia.sin_port==htons(555)); + assert(addr->ia.sin_addr.s_addr==htonl(0x7F000001)); + assert(inetAddressToString(addr)=="127.0.0.1:555"); cout<<'\t'<at(2); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==1234); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0x0A0A0C0B); + assert(addr->ia.sin_port==htons(1234)); + assert(addr->ia.sin_addr.s_addr==htonl(0x0A0A0C0B)); + assert(inetAddressToString(addr)=="10.10.12.11:1234"); cout<<'\t'<at(3); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==555); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0xC0A80304); + assert(addr->ia.sin_port==htons(555)); + assert(addr->ia.sin_addr.s_addr==htonl(0xC0A80304)); + assert(inetAddressToString(addr)=="192.168.3.4:555"); cout<<'\t'<ia.sin_family==AF_INET); + assert(inetAddressToString(addr)=="127.0.0.1"); cout<<'\t'<ia.sin_family==AF_INET); + assert(inetAddressToString(addr)=="10.10.12.11"); cout<<'\t'<getArray(),src,16)==0); + assert(strncmp(buff->getArray(), src, 16)==0); cout<<"\nPASSED!\n"; // TODO add test for 'getBroadcastAddresses'