From 8339c338de54f68bf3c2314aba681fbde3966eb3 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Thu, 6 Jan 2011 14:58:32 +0100 Subject: [PATCH] blockingServerTCPTransport.cpp: - using enum to specify command. BlockingTCPAcceptor.cpp: - added 'destroy()' to dtor - added parentheses to expressions blockingTCPCinnector.cpp: - fixed log message blockingTCPTransport.cpp: - added _socketAddress allocation - fixed waiting for timeout in 'waitUntilVerified' - fixed how many bytes are copied from the buffer responseHandlers.cpp: - added 'ConnectionValidationHandler' implementation - added 'ConnectionValidationHandler' to 'ServerResponseHandler' responseHandlers.h: - added 'ConnectionValidationHandler' declaration inetAddressUtil.cpp: - fixed all issues with sockaddr_in byte-order - removed function 'processAddressForList', using EPICSv3 'aToIPAddr' instead inetAddressUtilsTest.cpp: - fixed the tests in accordance with the correct function implementation testBlockingUDPClnt.cpp: - deleting transport at the end of the test testBlockingTCPSrv.cpp, testBlockingTCPClnt.cpp: - added tests (work in progress). Makefile: - added blockingTCP tests --- .../remote/blockingServerTCPTransport.cpp | 2 +- pvAccessApp/remote/blockingTCPAcceptor.cpp | 6 +- pvAccessApp/remote/blockingTCPConnector.cpp | 2 +- pvAccessApp/remote/blockingTCPTransport.cpp | 9 +- pvAccessApp/server/responseHandlers.cpp | 20 ++- pvAccessApp/server/responseHandlers.h | 18 ++- pvAccessApp/utils/inetAddressUtil.cpp | 39 ++--- testApp/remote/Makefile | 8 ++ testApp/remote/testBlockingTCPClnt.cpp | 134 ++++++++++++++++++ testApp/remote/testBlockingTCPSrv.cpp | 53 +++++++ testApp/remote/testBlockingUDPClnt.cpp | 2 + testApp/utils/inetAddressUtilsTest.cpp | 42 +++--- 12 files changed, 282 insertions(+), 53 deletions(-) create mode 100644 testApp/remote/testBlockingTCPClnt.cpp create mode 100644 testApp/remote/testBlockingTCPSrv.cpp diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 9bd0a93..32bb1db 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -111,7 +111,7 @@ namespace epics { // // send verification message // - control->startMessage(1, 2*sizeof(int32)); + control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)); // receive buffer size buffer->putInt(getReceiveBufferSize()); diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index 3d83c31..ab4c21e 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -40,6 +40,8 @@ namespace epics { } BlockingTCPAcceptor::~BlockingTCPAcceptor() { + destroy(); + if(_bindAddress!=NULL) delete _bindAddress; _connectedClientsMutex->lock(); @@ -190,7 +192,7 @@ namespace epics { } else if(retval>0) { // some event on a socket - if(sockets[0].revents&POLLIN!=0) { + if((sockets[0].revents&POLLIN)!=0) { // connection waiting osiSockAddr address; @@ -267,7 +269,7 @@ namespace epics { }// accept succeeded } // connection waiting - if(sockets[0].revents&(POLLERR|POLLHUP|POLLNVAL)!=0) { + if((sockets[0].revents&(POLLERR|POLLHUP|POLLNVAL))!=0) { errlogSevPrintf(errlogMajor, "error on a socket: POLLERR|POLLHUP|POLLNVAL"); socketOpen = false; diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 0c7e7f1..0fd7687 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -139,7 +139,7 @@ namespace epics { transport->close(true); errlogSevPrintf( errlogInfo, - "Connection to CA client %s failed to be validated, closing it.", + "Connection to CA server %s failed to be validated, closing it.", ipAddrStr); ostringstream temp; temp<<"Failed to verify TCP connection to '"<unlock(); - while(!internalVerified&&internalTimeout<=0) { + while(!internalVerified&&internalTimeout>0) { epicsThreadSleep(min(0.1, internalTimeout)); internalTimeout -= 0.1; @@ -459,7 +460,7 @@ namespace epics { _socketBuffer->getRemaining()); ssize_t bytesRead = recv(_channel, readBuffer, maxToRead, 0); - _socketBuffer->put(readBuffer, 0, maxToRead); + _socketBuffer->put(readBuffer, 0, bytesRead); if(bytesRead<0) { // error (disconnect, end-of-stream) detected diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 745387a..f69c5c7 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -64,7 +64,7 @@ namespace epics { _handlerTable = new ResponseHandler*[HANDLER_TABLE_LENGTH]; // TODO add real handlers, as they are developed _handlerTable[0] = badResponse; - _handlerTable[1] = badResponse; + _handlerTable[1] = new ConnectionValidationHandler(_context); _handlerTable[2] = badResponse; _handlerTable[3] = badResponse; _handlerTable[4] = badResponse; @@ -95,6 +95,7 @@ namespace epics { ServerResponseHandler::~ServerResponseHandler() { delete _handlerTable[0]; + delete _handlerTable[1]; delete[] _handlerTable; } @@ -119,5 +120,22 @@ namespace epics { version, command, payloadSize, payloadBuffer); } + void ConnectionValidationHandler::handleResponse( + osiSockAddr* responseFrom, Transport* transport, int8 version, + int8 command, int payloadSize, + epics::pvData::ByteBuffer* payloadBuffer) { + AbstractServerResponseHandler::handleResponse(responseFrom, + transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData(2*sizeof(int32)+sizeof(int16)); + transport->setRemoteTransportReceiveBufferSize( + payloadBuffer->getInt()); + transport->setRemoteTransportSocketReceiveBufferSize( + payloadBuffer->getInt()); + transport->setRemoteMinorRevision(version); + // TODO support priority !!! + //transport.setPriority(payloadBuffer.getShort()); + } + } } diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index ef74d08..f08c399 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -84,10 +84,26 @@ namespace epics { }; + /** + * Connection validation message handler. + * @author Matej Sekoranja + * @version $Id: ConnectionValidationHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $ + */ + class ConnectionValidationHandler : public AbstractServerResponseHandler { + public: + /** + * @param context + */ + ConnectionValidationHandler(ServerContextImpl* context) : + AbstractServerResponseHandler(context, "Connection validation") { + } + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, + int payloadSize, epics::pvData::ByteBuffer* payloadBuffer); + }; } } - #endif /* RESPONSEHANDLERS_H_ */ diff --git a/pvAccessApp/utils/inetAddressUtil.cpp b/pvAccessApp/utils/inetAddressUtil.cpp index e513380..5fff2d2 100644 --- a/pvAccessApp/utils/inetAddressUtil.cpp +++ b/pvAccessApp/utils/inetAddressUtil.cpp @@ -21,6 +21,7 @@ #include #include #include +#include using namespace std; using namespace epics::pvData; @@ -124,14 +125,14 @@ namespace epics { osiSockAddr* intToIPv4Address(int32 addr) { osiSockAddr* ret = new osiSockAddr; ret->ia.sin_family = AF_INET; - ret->ia.sin_addr.s_addr = (in_addr_t)addr; + ret->ia.sin_addr.s_addr = htonl(addr); ret->ia.sin_port = 0; return ret; } int32 ipv4AddressToInt(const osiSockAddr& addr) { - return (int32)(addr.ia.sin_addr.s_addr); + return (int32)ntohl(addr.ia.sin_addr.s_addr); } int32 parseInetAddress(const String addr) { @@ -168,24 +169,6 @@ namespace epics { return retAddr; } - osiSockAddr* processAddressForList(String address, int defaultPort) { - // check port - int port = defaultPort; - size_t pos = address.find(':'); - if(pos!=String::npos) { - port = atoi(address.substr(pos+1).c_str()); - address = address.substr(0, pos); - } - - // add parsed address - osiSockAddr* addr = new osiSockAddr; - addr->ia.sin_family = AF_INET; - addr->ia.sin_port = port; - addr->ia.sin_addr.s_addr = parseInetAddress(address); - - return addr; - } - InetAddrVector* getSocketAddressList(String list, int defaultPort, const InetAddrVector* appendList) { InetAddrVector* iav = new InetAddrVector(); @@ -195,13 +178,17 @@ namespace epics { size_t subEnd; while((subEnd = list.find(' ', subStart))!=String::npos) { String address = list.substr(subStart, (subEnd-subStart)); - - iav->push_back(processAddressForList(address, defaultPort)); + osiSockAddr* addr = new osiSockAddr; + aToIPAddr(address.c_str(), defaultPort, &addr->ia); + iav->push_back(addr); subStart = list.find_first_not_of(" \t\r\n\v", subEnd); } - if(subStart!=String::npos&&list.length()>0) iav->push_back( - processAddressForList(list.substr(subStart), defaultPort)); + if(subStart!=String::npos&&list.length()>0) { + osiSockAddr* addr = new osiSockAddr; + aToIPAddr(list.substr(subStart).c_str(), defaultPort, &addr->ia); + iav->push_back(addr); + } if(appendList!=NULL) { for(size_t i = 0; isize(); i++) @@ -221,8 +208,8 @@ namespace epics { saddr<<((int)(ipa>>8)&0xFF)<<'.'; saddr<<((int)ipa&0xFF); if(addr->ia.sin_port>0) saddr<<":"<ia.sin_port); - if(displayHex) saddr<<" ("<ia.sin_addr.s_addr))<<")"; + if(displayHex) saddr<<" ("<ia.sin_addr.s_addr)<<")"; return saddr.str(); } diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 0ddd4f4..3939bb3 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -22,6 +22,14 @@ PROD_HOST += testBeaconHandler testBeaconHandler_SRCS += testBeaconHandler.cpp testBeaconHandler_LIBS += pvData pvAccess Com +PROD_HOST += testBlockingTCPSrv +testBlockingTCPSrv_SRCS += testBlockingTCPSrv.cpp +testBlockingTCPSrv_LIBS += pvData pvAccess Com + +PROD_HOST += testBlockingTCPClnt +testBlockingTCPClnt_SRCS += testBlockingTCPClnt +testBlockingTCPClnt_LIBS += pvData pvAccess Com + include $(TOP)/configure/RULES #---------------------------------------- # ADD RULES AFTER THIS LINE diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp new file mode 100644 index 0000000..32c7c29 --- /dev/null +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -0,0 +1,134 @@ +/* + * testBlockingTCPClnt.cpp + * + * Created on: Jan 6, 2011 + * Author: Miha Vitorovic + */ + +#include "remote.h" +#include "blockingTCP.h" +#include "logger.h" +#include "inetAddressUtil.h" +#include "caConstants.h" + +#include + +#include +#include + +#include +#include + +using namespace epics::pvAccess; +using namespace epics::pvData; + +using std::cout; +using std::endl; +using std::sscanf; + +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), + _timer(new Timer("client thread", lowPriority)) {} + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } +private: + TransportRegistry* _tr; + Timer* _timer; +}; + +class DummyResponseHandler : public ResponseHandler { +public: + virtual void handleResponse(osiSockAddr* responseFrom, + Transport* transport, int8 version, int8 command, int payloadSize, + ByteBuffer* payloadBuffer) { + } +}; + +class DummyTransportClient : public TransportClient { +public: + DummyTransportClient() { + } + virtual ~DummyTransportClient() { + } + virtual void transportUnresponsive() { + errlogSevPrintf(errlogInfo, "unresponsive"); + } + virtual void transportResponsive(Transport* transport) { + errlogSevPrintf(errlogInfo, "responsive"); + } + virtual void transportChanged() { + errlogSevPrintf(errlogInfo, "changed"); + } + virtual void transportClosed() { + errlogSevPrintf(errlogInfo, "closed"); + } +}; + +class DummyTransportSender : public TransportSender { +public: + DummyTransportSender() { + for(int i = 0; i<20; i++) + data[i] = (char)(i+1); + count = 0; + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + // send the packet + count++; + control->startMessage(0, count); + buffer->put(data, 0, count); + //control->endMessage(); + } + + virtual void lock() { + } + virtual void unlock() { + } +private: + char data[20]; + int count; +}; + +void testBlockingTCPSender() { + ContextImpl ctx; + BlockingTCPConnector connector(&ctx, 1024, 1.0); + + DummyTransportClient dtc; + DummyTransportSender dts; + DummyResponseHandler drh; + + osiSockAddr srvAddr; + + srvAddr.ia.sin_family = AF_INET; + //srvAddr.ia.sin_port = htons(CA_SERVER_PORT); + if(aToIPAddr("192.168.71.132", CA_SERVER_PORT, &srvAddr.ia)<0) { + cout<<"error in aToIPAddr(...)"<enqueueSendRequest(&dts); + sleep(1); + } + + delete transport; +} + +int main(int argc, char *argv[]) { + createFileLogger("testBlockingTCPClnt.log"); + + testBlockingTCPSender(); + return 0; +} diff --git a/testApp/remote/testBlockingTCPSrv.cpp b/testApp/remote/testBlockingTCPSrv.cpp new file mode 100644 index 0000000..704093a --- /dev/null +++ b/testApp/remote/testBlockingTCPSrv.cpp @@ -0,0 +1,53 @@ +/* + * testBlockingTCPSrv.cpp + * + * Created on: Jan 6, 2011 + * Author: Miha Vitorovic + */ + +#include "blockingTCP.h" +#include "remote.h" +#include "logger.h" + +#include + +using namespace epics::pvData; +using namespace epics::pvAccess; + +using std::cin; +using std::cout; + +class ContextImpl : public Context { +public: + ContextImpl() : + _tr(new TransportRegistry()), + _timer(new Timer("server thread", lowPriority)) {} + virtual ~ContextImpl() { + delete _tr; + delete _timer; + } + virtual Timer* getTimer() { return _timer; } + virtual TransportRegistry* getTransportRegistry() { return _tr; } +private: + TransportRegistry* _tr; + Timer* _timer; +}; + +void testServerConnections() { + ContextImpl ctx; + + BlockingTCPAcceptor* srv = new BlockingTCPAcceptor(&ctx, CA_SERVER_PORT, + 1024); + + cout<<"Press any key to stop the server..."; + char c = cin.peek(); + + delete srv; +} + +int main(int argc, char *argv[]) { + + createFileLogger("testBlockingTCPSrv.log"); + + testServerConnections(); +} diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index 8437a62..cefed59 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -89,6 +89,8 @@ void testBlockingUDPSender() { sleep(1); } + delete transport; + } int main(int argc, char *argv[]) { diff --git a/testApp/utils/inetAddressUtilsTest.cpp b/testApp/utils/inetAddressUtilsTest.cpp index 16695e0..d6a55d4 100644 --- a/testApp/utils/inetAddressUtilsTest.cpp +++ b/testApp/utils/inetAddressUtilsTest.cpp @@ -32,20 +32,23 @@ int main(int argc, char *argv[]) { osiSockAddr* addr; addr = vec->at(0); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==555); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0x7F000001); + assert(addr->ia.sin_port==htons(555)); + assert(addr->ia.sin_addr.s_addr==htonl(0x7F000001)); + assert(inetAddressToString(addr)=="127.0.0.1:555"); cout<<'\t'<at(1); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==1234); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0x0A0A0C0B); + assert(addr->ia.sin_port==htons(1234)); + assert(addr->ia.sin_addr.s_addr==htonl(0x0A0A0C0B)); + assert(inetAddressToString(addr)=="10.10.12.11:1234"); cout<<'\t'<at(2); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==555); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0xC0A80304); + assert(addr->ia.sin_port==htons(555)); + assert(addr->ia.sin_addr.s_addr==htonl(0xC0A80304)); + assert(inetAddressToString(addr)=="192.168.3.4:555"); cout<<'\t'<at(0); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==6789); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0xAC1037A0); - cout<<'\t'<ia.sin_port==htons(6789)); + assert(addr->ia.sin_addr.s_addr==htonl(0xAC1037A0)); + assert(inetAddressToString(addr)=="172.16.55.160:6789"); +cout<<'\t'<at(1); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==555); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0x7F000001); + assert(addr->ia.sin_port==htons(555)); + assert(addr->ia.sin_addr.s_addr==htonl(0x7F000001)); + assert(inetAddressToString(addr)=="127.0.0.1:555"); cout<<'\t'<at(2); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==1234); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0x0A0A0C0B); + assert(addr->ia.sin_port==htons(1234)); + assert(addr->ia.sin_addr.s_addr==htonl(0x0A0A0C0B)); + assert(inetAddressToString(addr)=="10.10.12.11:1234"); cout<<'\t'<at(3); assert(addr->ia.sin_family==AF_INET); - assert(addr->ia.sin_port==555); - assert(addr->ia.sin_addr.s_addr==(uint32_t)0xC0A80304); + assert(addr->ia.sin_port==htons(555)); + assert(addr->ia.sin_addr.s_addr==htonl(0xC0A80304)); + assert(inetAddressToString(addr)=="192.168.3.4:555"); cout<<'\t'<ia.sin_family==AF_INET); + assert(inetAddressToString(addr)=="127.0.0.1"); cout<<'\t'<ia.sin_family==AF_INET); + assert(inetAddressToString(addr)=="10.10.12.11"); cout<<'\t'<getArray(),src,16)==0); + assert(strncmp(buff->getArray(), src, 16)==0); cout<<"\nPASSED!\n"; // TODO add test for 'getBroadcastAddresses'