Removed socket polling from transports.

This commit is contained in:
miha_vitorovic
2011-01-12 14:01:54 +01:00
parent f72e495a99
commit 2565191fb7
7 changed files with 118 additions and 181 deletions

View File

@@ -21,7 +21,6 @@
/* standard */
#include <sstream>
#include <poll.h>
using std::ostringstream;
@@ -116,7 +115,7 @@ namespace epics {
}
}
retval = ::listen(_serverSocketChannel, 5);
retval = ::listen(_serverSocketChannel, 1024);
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
@@ -157,99 +156,72 @@ namespace epics {
bool socketOpen = true;
char strBuffer[64];
pollfd sockets[1];
sockets[0].fd = _serverSocketChannel;
sockets[0].events = POLLIN;
while(!_destroyed&&socketOpen) {
int retval = ::poll(sockets, 1, 50);
if(retval<0) {
// error in poll
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMajor, "socket poll error: %s",
strBuffer);
socketOpen = false;
}
else if(retval>0) {
// some event on a socket
if((sockets[0].revents&POLLIN)!=0) {
// connection waiting
osiSockAddr address;
osiSocklen_t len = sizeof(sockaddr);
osiSockAddr address;
osiSocklen_t len = sizeof(sockaddr);
SOCKET newClient = epicsSocketAccept(_serverSocketChannel,
&address.sa, &len);
if(newClient!=INVALID_SOCKET) {
// accept succeeded
ipAddrToDottedIP(&address.ia, ipAddrStr, sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo,
"Accepted connection from CA client: %s", ipAddrStr);
SOCKET newClient = epicsSocketAccept(
_serverSocketChannel, &address.sa, &len);
if(newClient!=INVALID_SOCKET) {
// accept succeeded
ipAddrToDottedIP(&address.ia, ipAddrStr,
sizeof(ipAddrStr));
errlogSevPrintf(errlogInfo,
"Accepted connection from CA client: %s",
ipAddrStr);
// enable TCP_NODELAY (disable Nagle's algorithm)
int optval = 1; // true
retval = ::setsockopt(newClient, IPPROTO_TCP,
TCP_NODELAY, &optval, sizeof(int));
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor,
"Error setting TCP_NODELAY: %s",
strBuffer);
}
// enable TCP_KEEPALIVE
retval = ::setsockopt(newClient, SOL_SOCKET,
SO_KEEPALIVE, &optval, sizeof(int));
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor,
"Error setting SO_KEEPALIVE: %s",
strBuffer);
}
// TODO tune buffer sizes?!
//socket.socket().setReceiveBufferSize();
//socket.socket().setSendBufferSize();
/* create transport
* each transport should have its own response
* handler since it is not "shareable"
*/
BlockingServerTCPTransport
* transport =
new BlockingServerTCPTransport(
_context,
newClient,
new ServerResponseHandler(
(ServerContextImpl*)_context),
_receiveBufferSize);
// validate connection
if(!validateConnection(transport, ipAddrStr)) {
transport->close(true);
errlogSevPrintf(
errlogInfo,
"Connection to CA client %s failed to be validated, closing it.",
ipAddrStr);
return;
}
errlogSevPrintf(errlogInfo,
"Serving to CA client: %s", ipAddrStr);
}// accept succeeded
} // connection waiting
if((sockets[0].revents&(POLLERR|POLLHUP|POLLNVAL))!=0) {
errlogSevPrintf(errlogMajor,
"error on a socket: POLLERR|POLLHUP|POLLNVAL");
socketOpen = false;
// enable TCP_NODELAY (disable Nagle's algorithm)
int optval = 1; // true
int retval = ::setsockopt(newClient, IPPROTO_TCP,
TCP_NODELAY, &optval, sizeof(int));
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor,
"Error setting TCP_NODELAY: %s", strBuffer);
}
} // some event on a socket
// enable TCP_KEEPALIVE
retval = ::setsockopt(newClient, SOL_SOCKET, SO_KEEPALIVE,
&optval, sizeof(int));
if(retval<0) {
epicsSocketConvertErrnoToString(strBuffer,
sizeof(strBuffer));
errlogSevPrintf(errlogMinor,
"Error setting SO_KEEPALIVE: %s", strBuffer);
}
// TODO tune buffer sizes?!
//socket.socket().setReceiveBufferSize();
//socket.socket().setSendBufferSize();
/* create transport
* each transport should have its own response
* handler since it is not "shareable"
*/
BlockingServerTCPTransport
* transport =
new BlockingServerTCPTransport(
_context,
newClient,
new ServerResponseHandler(
dynamic_cast<ServerContextImpl*> (_context)),
_receiveBufferSize);
// validate connection
if(!validateConnection(transport, ipAddrStr)) {
transport->close(true);
errlogSevPrintf(
errlogInfo,
"Connection to CA client %s failed to be validated, closing it.",
ipAddrStr);
return;
}
errlogSevPrintf(errlogInfo, "Serving to CA client: %s",
ipAddrStr);
}// accept succeeded
else
socketOpen = false;
} // while
}

View File

@@ -108,10 +108,8 @@ namespace epics {
socket = tryConnect(address, 3);
// verify
if(socket==INVALID_SOCKET) {
errlogSevPrintf(
errlogMajor,
"Connection to CA server %s failed.",
ipAddrStr);
errlogSevPrintf(errlogMajor,
"Connection to CA server %s failed.", ipAddrStr);
ostringstream temp;
temp<<"Failed to verify TCP connection to '"<<ipAddrStr
<<"'.";
@@ -148,6 +146,7 @@ namespace epics {
// verify
if(!transport->waitUntilVerified(3.0)) {
transport->close(true);
socket = INVALID_SOCKET;
errlogSevPrintf(
errlogMinor,
"Connection to CA server %s failed to be validated, closing it.",

View File

@@ -24,7 +24,6 @@
#include <cstdio>
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>
#include <errno.h>
namespace epics {
@@ -36,11 +35,15 @@ namespace epics {
ResponseHandler* responseHandler, SOCKET channel,
osiSockAddr& bindAddress, InetAddrVector* sendAddresses,
short remoteTransportRevision) :
_closed(false), _responseHandler(responseHandler),
_channel(channel), _sendAddresses(sendAddresses),
_ignoredAddresses(NULL), _sendTo(NULL), _receiveBuffer(
new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)), _sendBuffer(
new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)),
_closed(false),
_responseHandler(responseHandler),
_channel(channel),
_sendAddresses(sendAddresses),
_ignoredAddresses(NULL),
_sendTo(NULL),
_receiveBuffer(new ByteBuffer(MAX_UDP_RECV,
EPICS_ENDIAN_BIG)),
_sendBuffer(new ByteBuffer(MAX_UDP_RECV, EPICS_ENDIAN_BIG)),
_lastMessageStartPosition(0), _readBuffer(
new char[MAX_UDP_RECV]), _mutex(new Mutex()),
_threadId(NULL) {
@@ -125,10 +128,6 @@ namespace epics {
// This function is always called from only one thread - this
// object's own thread.
pollfd pfd;
pfd.fd = _channel;
pfd.events = POLLIN;
osiSockAddr fromAddress;
try {
@@ -136,79 +135,44 @@ namespace epics {
while(!_closed) {
// we poll to prevent blocking indefinitely
/* From 'accept' man page:
* In order to be notified of incoming connections on
* a socket, you can use select(2) or poll(2). A readable
* event will be delivered when a new connection is
* attempted and you may then call accept() to get a
* socket for that connection.
*/
// data ready to be read
_receiveBuffer->clear();
int retval = poll(&pfd, 1, 100);
socklen_t addrStructSize = sizeof(sockaddr);
if(_closed) break; // if the dtor was called during wait
// none of the object properties are no longer valid.
int bytesRead = recvfrom(_channel, _readBuffer,
MAX_UDP_RECV, 0, (sockaddr*)&fromAddress,
&addrStructSize);
if(retval>0) {
// activity on SOCKET
if(pfd.revents&POLLIN) {
// data ready to be read
_receiveBuffer->clear();
socklen_t addrStructSize = sizeof(sockaddr);
int bytesRead = recvfrom(_channel, _readBuffer,
MAX_UDP_RECV, 0, (sockaddr*)&fromAddress,
&addrStructSize);
if(bytesRead>0) {
// successfully got datagram
bool ignore = false;
if(_ignoredAddresses!=NULL) for(size_t i = 0; i
<_ignoredAddresses->size(); i++)
if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr
==fromAddress.ia.sin_addr.s_addr) {
ignore = true;
break;
}
if(!ignore) {
_receiveBuffer->put(
_readBuffer,
0,
bytesRead
<_receiveBuffer->getRemaining() ? bytesRead
: _receiveBuffer->getRemaining());
_receiveBuffer->flip();
processBuffer(fromAddress, _receiveBuffer);
}
if(bytesRead>0) {
// successfully got datagram
bool ignore = false;
if(_ignoredAddresses!=NULL) for(size_t i = 0; i
<_ignoredAddresses->size(); i++)
if(_ignoredAddresses->at(i)->ia.sin_addr.s_addr
==fromAddress.ia.sin_addr.s_addr) {
ignore = true;
break;
}
else {
// log a 'recvfrom' error
if(bytesRead==-1) errlogSevPrintf(errlogMajor,
"Socket recv error: %s",
strerror(errno));
}
}
else {
// error (POLLERR, POLLHUP, or POLLNVAL)
if(pfd.revents&POLLERR) errlogSevPrintf(
errlogMajor, "Socket poll error (POLLERR)");
if(pfd.revents&POLLHUP) errlogSevPrintf(
errlogMinor, "Socket poll error (POLLHUP)");
if(pfd.revents&POLLNVAL) errlogSevPrintf(
errlogMajor,
"Socket poll error: server socket no longer bound.");
if(!ignore) {
_receiveBuffer->put(_readBuffer, 0, bytesRead
<_receiveBuffer->getRemaining() ? bytesRead
: _receiveBuffer->getRemaining());
_receiveBuffer->flip();
processBuffer(fromAddress, _receiveBuffer);
}
}
else {
// 0 == socket close
// retval == 0 : timeout
// log a 'recvfrom' error
if(bytesRead==-1) errlogSevPrintf(errlogMajor,
"Socket recv error: %s", strerror(errno));
}
// retval < 0 : error
if(retval<0) errlogSevPrintf(errlogMajor,
"Socket poll error: %s", strerror(errno));
}
} catch(...) {
// TODO: catch all exceptions, and act accordingly

View File

@@ -88,11 +88,11 @@ namespace epics {
int payloadSize, ByteBuffer* payloadBuffer) {
if(command<0||command>=HANDLER_TABLE_LENGTH) {
errlogSevPrintf(errlogMinor,
"Invalid (or unsupported) command: %d.", command);
"Invalid (or unsupported) command: %x.", (0xFF&command));
// TODO remove debug output
ostringstream name;
name<<"Invalid CA header "<<command;
name<<" + , its payload buffer";
name<<"Invalid CA header "<<hex<<(int)(0xFF&command);
name<<", its payload buffer";
hexDump(name.str(), (const int8*)payloadBuffer->getArray(),
payloadBuffer->getPosition(), payloadSize);

View File

@@ -92,7 +92,8 @@ public:
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
// send the packet
count++;
control->startMessage(0, count);
// using invalid command to force msg dump
control->startMessage(0xC0, count);
buffer->put(data, 0, count);
//control->endMessage();
}

View File

@@ -9,6 +9,7 @@
#include "remote.h"
#include "logger.h"
#include "configuration.h"
#include "serverContext.h"
#include <iostream>
@@ -18,7 +19,7 @@ using namespace epics::pvAccess;
using std::cin;
using std::cout;
class ContextImpl : public Context {
class ContextImpl : public ServerContextImpl {
public:
ContextImpl() :
_tr(new TransportRegistry()),

View File

@@ -78,12 +78,6 @@ public:
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
// SRV_IP defined at the top of the this file
if(aToIPAddr(SRV_IP, 65000, &sendTo.ia)<0) {
cout<<"error in aToIPAddr(...)"<<endl;
return;
}
control->setRecipient(sendTo);
// send the packet
@@ -117,6 +111,12 @@ void testBlockingUDPSender() {
Transport* transport = connector.connect(NULL, &drh, bindAddr, 1, 50);
// SRV_IP defined at the top of the this file
if(aToIPAddr(SRV_IP, 65000, &sendTo.ia)<0) {
cout<<"error in aToIPAddr(...)"<<endl;
return;
}
cout<<"Sending 10 packets..."<<endl;
for(int i = 0; i<10; i++) {