From 2565191fb770e90a13a6fd1f4c7dab5e77272bd2 Mon Sep 17 00:00:00 2001 From: miha_vitorovic Date: Wed, 12 Jan 2011 14:01:54 +0100 Subject: [PATCH] Removed socket polling from transports. --- pvAccessApp/remote/blockingTCPAcceptor.cpp | 154 ++++++++------------ pvAccessApp/remote/blockingTCPConnector.cpp | 7 +- pvAccessApp/remote/blockingUDPTransport.cpp | 114 +++++---------- pvAccessApp/server/responseHandlers.cpp | 6 +- testApp/remote/testBlockingTCPClnt.cpp | 3 +- testApp/remote/testBlockingTCPSrv.cpp | 3 +- testApp/remote/testBlockingUDPClnt.cpp | 12 +- 7 files changed, 118 insertions(+), 181 deletions(-) diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index 10c5f21..33368cb 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -21,7 +21,6 @@ /* standard */ #include -#include using std::ostringstream; @@ -116,7 +115,7 @@ namespace epics { } } - retval = ::listen(_serverSocketChannel, 5); + retval = ::listen(_serverSocketChannel, 1024); if(retval<0) { epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); @@ -157,99 +156,72 @@ namespace epics { bool socketOpen = true; char strBuffer[64]; - pollfd sockets[1]; - sockets[0].fd = _serverSocketChannel; - sockets[0].events = POLLIN; - while(!_destroyed&&socketOpen) { - int retval = ::poll(sockets, 1, 50); - if(retval<0) { - // error in poll - epicsSocketConvertErrnoToString(strBuffer, - sizeof(strBuffer)); - errlogSevPrintf(errlogMajor, "socket poll error: %s", - strBuffer); - socketOpen = false; - } - else if(retval>0) { - // some event on a socket - if((sockets[0].revents&POLLIN)!=0) { - // connection waiting + osiSockAddr address; + osiSocklen_t len = sizeof(sockaddr); - osiSockAddr address; - osiSocklen_t len = sizeof(sockaddr); + SOCKET newClient = epicsSocketAccept(_serverSocketChannel, + &address.sa, &len); + if(newClient!=INVALID_SOCKET) { + // accept succeeded + ipAddrToDottedIP(&address.ia, ipAddrStr, sizeof(ipAddrStr)); + errlogSevPrintf(errlogInfo, + "Accepted connection from CA client: %s", ipAddrStr); - SOCKET newClient = epicsSocketAccept( - _serverSocketChannel, &address.sa, &len); - if(newClient!=INVALID_SOCKET) { - // accept succeeded - ipAddrToDottedIP(&address.ia, ipAddrStr, - sizeof(ipAddrStr)); - errlogSevPrintf(errlogInfo, - "Accepted connection from CA client: %s", - ipAddrStr); - - // enable TCP_NODELAY (disable Nagle's algorithm) - int optval = 1; // true - retval = ::setsockopt(newClient, IPPROTO_TCP, - TCP_NODELAY, &optval, sizeof(int)); - if(retval<0) { - epicsSocketConvertErrnoToString(strBuffer, - sizeof(strBuffer)); - errlogSevPrintf(errlogMinor, - "Error setting TCP_NODELAY: %s", - strBuffer); - } - - // enable TCP_KEEPALIVE - retval = ::setsockopt(newClient, SOL_SOCKET, - SO_KEEPALIVE, &optval, sizeof(int)); - if(retval<0) { - epicsSocketConvertErrnoToString(strBuffer, - sizeof(strBuffer)); - errlogSevPrintf(errlogMinor, - "Error setting SO_KEEPALIVE: %s", - strBuffer); - } - - // TODO tune buffer sizes?! - //socket.socket().setReceiveBufferSize(); - //socket.socket().setSendBufferSize(); - - /* create transport - * each transport should have its own response - * handler since it is not "shareable" - */ - BlockingServerTCPTransport - * transport = - new BlockingServerTCPTransport( - _context, - newClient, - new ServerResponseHandler( - (ServerContextImpl*)_context), - _receiveBufferSize); - - // validate connection - if(!validateConnection(transport, ipAddrStr)) { - transport->close(true); - errlogSevPrintf( - errlogInfo, - "Connection to CA client %s failed to be validated, closing it.", - ipAddrStr); - return; - } - - errlogSevPrintf(errlogInfo, - "Serving to CA client: %s", ipAddrStr); - - }// accept succeeded - } // connection waiting - if((sockets[0].revents&(POLLERR|POLLHUP|POLLNVAL))!=0) { - errlogSevPrintf(errlogMajor, - "error on a socket: POLLERR|POLLHUP|POLLNVAL"); - socketOpen = false; + // enable TCP_NODELAY (disable Nagle's algorithm) + int optval = 1; // true + int retval = ::setsockopt(newClient, IPPROTO_TCP, + TCP_NODELAY, &optval, sizeof(int)); + if(retval<0) { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMinor, + "Error setting TCP_NODELAY: %s", strBuffer); } - } // some event on a socket + + // enable TCP_KEEPALIVE + retval = ::setsockopt(newClient, SOL_SOCKET, SO_KEEPALIVE, + &optval, sizeof(int)); + if(retval<0) { + epicsSocketConvertErrnoToString(strBuffer, + sizeof(strBuffer)); + errlogSevPrintf(errlogMinor, + "Error setting SO_KEEPALIVE: %s", strBuffer); + } + + // TODO tune buffer sizes?! + //socket.socket().setReceiveBufferSize(); + //socket.socket().setSendBufferSize(); + + /* create transport + * each transport should have its own response + * handler since it is not "shareable" + */ + BlockingServerTCPTransport + * transport = + new BlockingServerTCPTransport( + _context, + newClient, + new ServerResponseHandler( + dynamic_cast (_context)), + _receiveBufferSize); + + // validate connection + if(!validateConnection(transport, ipAddrStr)) { + transport->close(true); + errlogSevPrintf( + errlogInfo, + "Connection to CA client %s failed to be validated, closing it.", + ipAddrStr); + return; + } + + errlogSevPrintf(errlogInfo, "Serving to CA client: %s", + ipAddrStr); + + }// accept succeeded + else + socketOpen = false; } // while } diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index 5c3b021..024654d 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -108,10 +108,8 @@ namespace epics { socket = tryConnect(address, 3); // verify if(socket==INVALID_SOCKET) { - errlogSevPrintf( - errlogMajor, - "Connection to CA server %s failed.", - ipAddrStr); + errlogSevPrintf(errlogMajor, + "Connection to CA server %s failed.", ipAddrStr); ostringstream temp; temp<<"Failed to verify TCP connection to '"<waitUntilVerified(3.0)) { transport->close(true); + socket = INVALID_SOCKET; errlogSevPrintf( errlogMinor, "Connection to CA server %s failed to be validated, closing it.", diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index bb61ea9..7e5cba5 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include namespace epics { @@ -36,11 +35,15 @@ namespace epics { ResponseHandler* responseHandler, SOCKET channel, osiSockAddr& bindAddress, InetAddrVector* sendAddresses, short remoteTransportRevision) : - _closed(false), _responseHandler(responseHandler), - _channel(channel), _sendAddresses(sendAddresses), - _ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer( - new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)), _sendBuffer( - new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)), + _closed(false), + _responseHandler(responseHandler), + _channel(channel), + _sendAddresses(sendAddresses), + _ignoredAddresses(NULL), + _sendTo(NULL), + _receiveBuffer(new ByteBuffer(MAX_UDP_RECV, + EPICS_ENDIAN_BIG)), + _sendBuffer(new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)), _lastMessageStartPosition(0), _readBuffer( new char[MAX_UDP_RECV]), _mutex(new Mutex()), _threadId(NULL) { @@ -125,10 +128,6 @@ namespace epics { // This function is always called from only one thread - this // object's own thread. - pollfd pfd; - pfd.fd = _channel; - pfd.events = POLLIN; - osiSockAddr fromAddress; try { @@ -136,79 +135,44 @@ namespace epics { while(!_closed) { // we poll to prevent blocking indefinitely - /* From 'accept' man page: - * In order to be notified of incoming connections on - * a socket, you can use select(2) or poll(2). A readable - * event will be delivered when a new connection is - * attempted and you may then call accept() to get a - * socket for that connection. - */ + // data ready to be read + _receiveBuffer->clear(); - int retval = poll(&pfd, 1, 100); + socklen_t addrStructSize = sizeof(sockaddr); - if(_closed) break; // if the dtor was called during wait - // none of the object properties are no longer valid. + int bytesRead = recvfrom(_channel, _readBuffer, + MAX_UDP_RECV, 0, (sockaddr*)&fromAddress, + &addrStructSize); - if(retval>0) { - // activity on SOCKET - if(pfd.revents&POLLIN) { - // data ready to be read - _receiveBuffer->clear(); - - socklen_t addrStructSize = sizeof(sockaddr); - - int bytesRead = recvfrom(_channel, _readBuffer, - MAX_UDP_RECV, 0, (sockaddr*)&fromAddress, - &addrStructSize); - - if(bytesRead>0) { - // successfully got datagram - bool ignore = false; - if(_ignoredAddresses!=NULL) for(size_t i = 0; i - <_ignoredAddresses->size(); i++) - if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr - ==fromAddress.ia.sin_addr.s_addr) { - ignore = true; - break; - } - - if(!ignore) { - _receiveBuffer->put( - _readBuffer, - 0, - bytesRead - <_receiveBuffer->getRemaining() ? bytesRead - : _receiveBuffer->getRemaining()); - - _receiveBuffer->flip(); - - processBuffer(fromAddress, _receiveBuffer); - } + if(bytesRead>0) { + // successfully got datagram + bool ignore = false; + if(_ignoredAddresses!=NULL) for(size_t i = 0; i + <_ignoredAddresses->size(); i++) + if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr + ==fromAddress.ia.sin_addr.s_addr) { + ignore = true; + break; } - else { - // log a 'recvfrom' error - if(bytesRead==-1) errlogSevPrintf(errlogMajor, - "Socket recv error: %s", - strerror(errno)); - } - } - else { - // error (POLLERR, POLLHUP, or POLLNVAL) - if(pfd.revents&POLLERR) errlogSevPrintf( - errlogMajor, "Socket poll error (POLLERR)"); - if(pfd.revents&POLLHUP) errlogSevPrintf( - errlogMinor, "Socket poll error (POLLHUP)"); - if(pfd.revents&POLLNVAL) errlogSevPrintf( - errlogMajor, - "Socket poll error: server socket no longer bound."); + + if(!ignore) { + _receiveBuffer->put(_readBuffer, 0, bytesRead + <_receiveBuffer->getRemaining() ? bytesRead + : _receiveBuffer->getRemaining()); + + _receiveBuffer->flip(); + + processBuffer(fromAddress, _receiveBuffer); } } + else { + // 0 == socket close - // retval == 0 : timeout + // log a 'recvfrom' error + if(bytesRead==-1) errlogSevPrintf(errlogMajor, + "Socket recv error: %s", strerror(errno)); + } - // retval < 0 : error - if(retval<0) errlogSevPrintf(errlogMajor, - "Socket poll error: %s", strerror(errno)); } } catch(...) { // TODO: catch all exceptions, and act accordingly diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index b96c659..2e65cbe 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -88,11 +88,11 @@ namespace epics { int payloadSize, ByteBuffer* payloadBuffer) { if(command<0||command>=HANDLER_TABLE_LENGTH) { errlogSevPrintf(errlogMinor, - "Invalid (or unsupported) command: %d.", command); + "Invalid (or unsupported) command: %x.", (0xFF&command)); // TODO remove debug output ostringstream name; - name<<"Invalid CA header "<getArray(), payloadBuffer->getPosition(), payloadSize); diff --git a/testApp/remote/testBlockingTCPClnt.cpp b/testApp/remote/testBlockingTCPClnt.cpp index 4340caa..0cacc8f 100644 --- a/testApp/remote/testBlockingTCPClnt.cpp +++ b/testApp/remote/testBlockingTCPClnt.cpp @@ -92,7 +92,8 @@ public: virtual void send(ByteBuffer* buffer, TransportSendControl* control) { // send the packet count++; - control->startMessage(0, count); + // using invalid command to force msg dump + control->startMessage(0xC0, count); buffer->put(data, 0, count); //control->endMessage(); } diff --git a/testApp/remote/testBlockingTCPSrv.cpp b/testApp/remote/testBlockingTCPSrv.cpp index c7ced35..7d086e3 100644 --- a/testApp/remote/testBlockingTCPSrv.cpp +++ b/testApp/remote/testBlockingTCPSrv.cpp @@ -9,6 +9,7 @@ #include "remote.h" #include "logger.h" #include "configuration.h" +#include "serverContext.h" #include @@ -18,7 +19,7 @@ using namespace epics::pvAccess; using std::cin; using std::cout; -class ContextImpl : public Context { +class ContextImpl : public ServerContextImpl { public: ContextImpl() : _tr(new TransportRegistry()), diff --git a/testApp/remote/testBlockingUDPClnt.cpp b/testApp/remote/testBlockingUDPClnt.cpp index 7fb490c..2117906 100644 --- a/testApp/remote/testBlockingUDPClnt.cpp +++ b/testApp/remote/testBlockingUDPClnt.cpp @@ -78,12 +78,6 @@ public: } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - // SRV_IP defined at the top of the this file - if(aToIPAddr(SRV_IP, 65000, &sendTo.ia)<0) { - cout<<"error in aToIPAddr(...)"<setRecipient(sendTo); // send the packet @@ -117,6 +111,12 @@ void testBlockingUDPSender() { Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50); + // SRV_IP defined at the top of the this file + if(aToIPAddr(SRV_IP, 65000, &sendTo.ia)<0) { + cout<<"error in aToIPAddr(...)"<