/** * Asynchronous networking for SICS and other programs. This module centrally manages * a number of network connections for a client program. It is a layer between the * program and the network which manages non blocking network I/O. To this purpose, the * client program has to call ANETprocess at convenient intervalls. This module * has a couple of features: * - Connections are abstracted to handles which are guranteed to be unique * rather then socket numbers. Socket numbers may be reused by the OS. * - This module allows upper level code to figure out if a connection is still * connected or not. * - This module introduces a buffer layer between the socket and the application. * Thus the upper layer does not have to worry much about I/O blocking. This * is taken care of by this module both for reading and writing. * - All I/O is non blocking. * - This module can detect if a client is hanging and close the connection then. * Hanging is detected by not being able to write to the client for some period * of time. * * copyright: see file COPYRIGHT * * Mark Koennecke, January 2009 */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "asynnet.h" #include "rwpuffer.h" /*--------------------------------------------------------------------------*/ #define SERVERSOCKET 0 #define DATASOCKET 1 #define MAXCONNECTIONS 1024 #define RBUFFERSIZE 262144 /* 256kb */ #define WBUFFERSIZE 20*262144 /* #define WBUFFERSIZE 100*262144 */ #define MAXWBUFFERSIZE 128*1000*1024 /*--------------------------------------------------------------------------*/ typedef struct { int socket; int handle; int type; prwBuffer readBuffer; prwBuffer writeBuffer; time_t lastOpenForWrite; ANETcallback readCallback; void *userData; ANETkill killUser; char host[132]; } SocketDescriptor, *pSocketDescriptor; static SocketDescriptor connections[MAXCONNECTIONS]; static int noConnections = 0; static unsigned int handleID = 0; /*------------------------------------------------------------------------*/ static int SocketCompare(const void *s1, const void *s2) { pSocketDescriptor socke1, socke2; socke1 = (pSocketDescriptor) s1; socke2 = (pSocketDescriptor) s2; return socke1->handle - socke2->handle; } /*------------------------------------------------------------------------*/ static void sortConnections() { qsort(connections, noConnections, sizeof(SocketDescriptor), SocketCompare); } /*------------------------------------------------------------------------*/ static pSocketDescriptor findSocketDescriptor(int handle) { SocketDescriptor key; pSocketDescriptor result; key.handle = handle; result = bsearch(&key, connections, noConnections, sizeof(SocketDescriptor), SocketCompare); return result; } /*------------------------------------------------------------------------*/ static ANETlog logOutput = NULL; static void *logUserData = NULL; /*------------------------------------------------------------------------*/ static void anetLog(int level, char *fmt, ...) { va_list ap; char buf[256]; char *text = NULL; int l; if (logOutput == NULL) { return; } va_start(ap, fmt); l = vsnprintf(buf, sizeof buf, fmt, ap); va_end(ap); if (l < sizeof buf) { text = buf; logOutput(level, text, logUserData); } else { /* assuming we have a C99 conforming snprintf and need a larger buffer */ text = calloc(l, 1); va_start(ap, fmt); vsnprintf(text, l, fmt, ap); va_end(ap); logOutput(level, text, logUserData); free(text); } } /*============= public interface =========================================*/ void ANETsetLog(ANETlog lcb, void *userData) { logOutput = lcb; logUserData = userData; } /*------------------------------------------------------------------------*/ int ANETopenServerPort(int iPort, ANETcallback cb, void *userData) { SocketDescriptor socke; int i = 1, status; struct sockaddr_in addresse; assert(iPort > 0); assert(cb != NULL); memset(&socke, 0, sizeof(SocketDescriptor)); socke.socket = socket(AF_INET, SOCK_STREAM, 0); if (socke.socket < 0) { anetLog(ANETERROR, "Failed to open server port: socket: %d", iPort); return ANETOPENFAIL; } status = setsockopt(socke.socket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int)); if (status < 0) { anetLog(ANETERROR, "Failed to open server port: setsockopt: %d, errno = %d", iPort, errno); return ANETOPENFAIL; } memset(&addresse, 0, sizeof(struct sockaddr_in)); addresse.sin_family = AF_INET; addresse.sin_addr.s_addr = htonl(INADDR_ANY); addresse.sin_port = htons(iPort); status = bind(socke.socket, (struct sockaddr *) &addresse, sizeof(struct sockaddr_in)); if (status < 0) { anetLog(ANETERROR, "Failed to open server port: bind: %d, errno = %d", iPort, errno); return ANETOPENFAIL; } status = listen(socke.socket, 8); if (status < 0) { anetLog(ANETERROR, "Failed to open server port: listen: %d", iPort); return ANETOPENFAIL; } socke.type = SERVERSOCKET; socke.handle = handleID; handleID++; socke.readCallback = cb; socke.userData = userData; socke.lastOpenForWrite = time(NULL); connections[noConnections] = socke; noConnections++; sortConnections(); anetLog(ANETCON, "Opened server port %d", iPort); return socke.handle; } /*-----------------------------------------------------------------------*/ int ANETregisterSocket(int socket) { SocketDescriptor socke; int flags, status; if (noConnections >= MAXCONNECTIONS) { anetLog(ANETERROR, "Maximum number of connections exceeded"); return ANETOUTOFSOCKETS; } memset(&socke, 0, sizeof(SocketDescriptor)); flags = fcntl(socket, F_GETFL, 0); status = fcntl(socket, F_SETFL, flags | O_NONBLOCK); if (status < 0) { return ANETSOCKERROR; } flags =1; setsockopt(socket,IPPROTO_TCP,TCP_NODELAY,(char *) &flags, sizeof(int)); socke.readBuffer = MakeRWPuffer(RBUFFERSIZE); socke.writeBuffer = MakeBigRWPuffer(WBUFFERSIZE, MAXWBUFFERSIZE); if (socke.readBuffer == NULL || socke.writeBuffer == NULL) { return ANETMEM; } socke.socket = socket; socke.handle = handleID; handleID++; socke.type = DATASOCKET; socke.lastOpenForWrite = time(NULL); connections[noConnections] = socke; noConnections++; sortConnections(); return socke.handle; } /*-------------------------------------------------------------------------*/ int ANETconnect(char *name, int iPort) { struct in_addr addr; struct sockaddr_in addresse; struct hostent *host; int socke, status; memset(&addresse, 0, sizeof(struct sockaddr_in)); addresse.sin_family = AF_INET; addresse.sin_port = htons((unsigned short)(iPort &0xFFFF)); host = gethostbyname(name); if (host != NULL) { memcpy((char *) &addr, (char *)host->h_addr_list[0], (size_t)host->h_length); } else { /* check for aaa.bbbb.ccc.dddd */ addr.s_addr = inet_addr(name); if(addr.s_addr == (unsigned int) -1) { anetLog(ANETERROR, "Failed to locate host: %s", name); return ANETOPENFAIL; } } addresse.sin_addr.s_addr = addr.s_addr; socke = socket(AF_INET, SOCK_STREAM, 0); status = connect(socke, (struct sockaddr *) &addresse, sizeof(struct sockaddr_in)); if (status < 0) { close(socke); anetLog(ANETERROR, "Failed to open socket to %s:%d", name, iPort); return ANETOPENFAIL; } anetLog(ANETCON, "Opened socket %d to %s:%d", socke, name, iPort); return ANETregisterSocket(socke); } /*--------------------------------------------------------------------------*/ void ANETclose(int handle) { pSocketDescriptor socke = NULL; socke = findSocketDescriptor(handle); if (socke == NULL) { return; } close(socke->socket); anetLog(ANETCON, "Closed socket %d", socke->socket); if (socke->readBuffer != NULL) { KillRWBuffer(socke->readBuffer); } if (socke->writeBuffer != NULL) { KillRWBuffer(socke->writeBuffer); } if (socke->userData && socke->killUser) { socke->killUser(socke->userData); } if (noConnections > 1) { *socke = connections[noConnections - 1]; noConnections--; sortConnections(); } else { noConnections = 0; } } /*--------------------------------------------------------------------------*/ static int anetWrite(SocketDescriptor con) { int status, length; void *pPtr; pPtr = GetRWBufferData(con.writeBuffer, &length); if (length > 0) { status = send(con.socket, pPtr, length, 0); if (status < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { return 1; } ANETclose(con.handle); return 0; } RemoveRWBufferData(con.writeBuffer, status); } return 1; } /*-------------------------------------------------------------------------- * I have seen that select did not report the write possibility set, * though the send buffer was empty. Thus I try to write if data is there * and leave the lastOpenForWrite flag only when the socket would block * on write. *--------------------------------------------------------------------------*/ static int anetTestWrite(SocketDescriptor con) { int status, length; void *pPtr; time_t lastTime; if(con.type != DATASOCKET){ return 1; } lastTime = con.lastOpenForWrite; con.lastOpenForWrite = time(NULL); pPtr = GetRWBufferData(con.writeBuffer, &length); if (length > 0) { status = send(con.socket, pPtr, length, 0); if (status < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { con.lastOpenForWrite = lastTime; return 1; } ANETclose(con.handle); return 0; } RemoveRWBufferData(con.writeBuffer, status); } return 1; } /*-------------------------------------------------------------------------- There is something implicit in the return here: anetRead must return 0 when the list of sockets changes. Otherwise ANETprocess cannot detect that it has an invalid socket list and jump out. In all other cases anetRead must return 1 -------------------------------------------------------------------------------*/ static int anetRead(SocketDescriptor con) { int socke, handle, status; unsigned int len; struct sockaddr addresse; char buffer[8192]; switch (con.type) { case SERVERSOCKET: len = sizeof(struct sockaddr); socke = accept(con.socket, &addresse, &len); if (socke < 0) { return 1; } handle = ANETregisterSocket(socke); if (handle > 0) { status = con.readCallback(handle, con.userData); if (status != 1) { ANETclose(handle); return 0; } } anetLog(ANETCON, "Accepted socket %d on port %d, handle %d", socke, con.socket, handle); break; case DATASOCKET: memset(buffer, 0, 8192); status = recv(con.socket, buffer, 8192, 0); if (status < 0) { if (errno == EAGAIN) { return 1; } ANETclose(con.handle); return 0; } else if (status == 0) { /* this means EOF */ ANETclose(con.handle); return 0; } else { status = StoreRWBuffer(con.readBuffer, buffer, status); if (status != 1) { anetLog(ANETERROR, "Read buffer overrun at handle %d, socket %d", con.handle, con.socket); } if (con.readCallback != NULL) { con.readCallback(con.handle, con.userData); } } break; } return 1; } /*---------------------------------------------------------------------------*/ void ANETprocess(void) { int i, status, count = 0, socke = 0, length; fd_set readMask, writeMask; struct timeval tmo = { 0, 10000 }; FD_ZERO(&readMask); FD_ZERO(&writeMask); for (i = 0; i < noConnections; i++) { socke = connections[i].socket; FD_SET(socke, &readMask); if (connections[i].writeBuffer != NULL) { GetRWBufferData(connections[i].writeBuffer, &length); if (length > 0) FD_SET(socke, &writeMask); else connections[i].lastOpenForWrite = time(NULL); } if (socke > count) { count = socke; } } count++; status = select(count, &readMask, &writeMask, NULL, &tmo); if (status < 0) { if (errno == EINTR) { return; } return; } /** * I always jump out of this loop when a socket is created or closed * because then the order in the connections array is no longer valid. * Try again the next time round. */ for (i = 0; i < noConnections; i++) { socke = connections[i].socket; if (FD_ISSET(socke, &readMask)) { if (anetRead(connections[i]) == 0) { return; } } if (FD_ISSET(socke, &writeMask)) { /* * This has to be here, as I found out tracing a subtle bug: * If the time is set in writeANET the modification will not * propagate into the array as C copies the structure for the function call. */ connections[i].lastOpenForWrite = time(NULL); if (anetWrite(connections[i]) == 0) { return; } } else { /* * If I could not write to the socket for 5 minutes, * the socket is considered broken and is closed. */ if (time(NULL) > connections[i].lastOpenForWrite + 300 && connections[i].type == DATASOCKET) { GetRWBufferData(connections[i].writeBuffer, &length); anetLog(ANETCON, "Closing socket because of time overrun: %d, delay = %d, bytes to be written = %d, write bit = %d\n", connections[i].socket, (int)(time(NULL) - connections[i].lastOpenForWrite), length, FD_ISSET(connections[i].socket, &writeMask) ); ANETclose(connections[i].handle); return; } } } } /*--------------------------------------------------------------------------*/ int ANETvalidHandle(int handle) { pSocketDescriptor con = NULL; con = findSocketDescriptor(handle); if (con != NULL) { return ANETOK; } else { return 0; } } /*---------------------------------------------------------------------------*/ int ANETinfo(int handle, char *hostname, int hostnameLen) { pSocketDescriptor con = NULL; struct sockaddr_in sin; struct hostent *host; socklen_t len; con = findSocketDescriptor(handle); if (con == NULL) { return ANETDISCONNECTED; } else { if(strlen(con->host) < 3){ len = sizeof sin; if (getpeername(con->socket, (struct sockaddr *) &sin, &len) < 0) { return ANETSOCKERROR; } if ((host = gethostbyaddr((char *) &sin.sin_addr, sizeof(sin.sin_addr), AF_INET)) == NULL) { return ANETSOCKERROR; } strlcpy(con->host,host->h_name, 132); } memset(hostname, 0, hostnameLen); strlcpy(hostname, con->host, hostnameLen); } return 1; } /*---------------------------------------------------------------------------*/ int ANETcanWrite(int handle, void *buffer, int count) { pSocketDescriptor con = NULL; int status; con = findSocketDescriptor(handle); if (con == NULL) { return ANETDISCONNECTED; } else { ANETprocess(); return CanStoreRWBuffer(con->writeBuffer, buffer, count); } } /*---------------------------------------------------------------------------*/ int ANETwrite(int handle, void *buffer, int count) { pSocketDescriptor con = NULL; int status; con = findSocketDescriptor(handle); if (con == NULL) { return ANETDISCONNECTED; } else { status = StoreRWBuffer(con->writeBuffer, buffer, count); /* first try if ANETprocess can write some and free the buffer before giving up */ if (status != 1) { ANETprocess(); status = StoreRWBuffer(con->writeBuffer, buffer, count); } if (status != 1) { anetLog(ANETERROR, "Write buffer overrun on handle %d, socket %d, trying to write %d bytes", con->handle, con->socket, count); return ANETWRITEBUFFERFULL; } } return ANETOK; } /*---------------------------------------------------------------------------*/ int ANETread(int handle, void *buffer, int bufferLength) { pSocketDescriptor con = NULL; int status, length, len; void *data = NULL; con = findSocketDescriptor(handle); if (con == NULL) { return ANETDISCONNECTED; } else { data = GetRWBufferData(con->readBuffer, &length); if (length == 0) { len = 0; } else if (length >= bufferLength) { len = bufferLength; } else { len = length; } if (len > 0) { memcpy(buffer, data, len); } } return len; } /*---------------------------------------------------------------------------*/ void *ANETreadPtr(int handle, int *length) { pSocketDescriptor con = NULL; void *data = NULL; con = findSocketDescriptor(handle); if (con == NULL) { *length = 0; return NULL; } else { data = GetRWBufferData(con->readBuffer, length); return data; } } /*---------------------------------------------------------------------------*/ void ANETreadConsume(int handle, int count) { pSocketDescriptor con = NULL; con = findSocketDescriptor(handle); if (con == NULL) { return; } else { RemoveRWBufferData(con->readBuffer, count); } } /*---------------------------------------------------------------------------*/ void ANETsetReadCallback(int handle, ANETcallback cb, void *userData, ANETkill killUser) { pSocketDescriptor con = NULL; con = findSocketDescriptor(handle); if (con == NULL) { return; } else { con->readCallback = cb; con->userData = userData; con->killUser = killUser; } } /*----------------------------------------------------------------------------*/ int ANETreadTillTerm(int handle, ANETtermCallback tcb, void *termData, ANETwait wcb, void *waitData, char **buffer) { pSocketDescriptor con = NULL; char *data; int length, status; while (wcb(waitData) > 0) { ANETprocess(); con = findSocketDescriptor(handle); if (con == NULL) { return ANETDISCONNECTED; } data = GetRWBufferData(con->readBuffer, &length); if (length > 0) { status = tcb(data, length, termData); if (status > 0) { *buffer = malloc(status * sizeof(char)); if (*buffer != NULL) { memcpy(*buffer, data, status); } RemoveRWBufferData(con->readBuffer, status); return ANETOK; } } } return ANETTIMEOUT; }