Files
sics/asynnet.c
koennecke fd5451e8fd Enhanced comments in asynnet
Changed required privilege for changing the threshold from manager to user in counter.c
Changed an output code in macro.c
2016-10-25 17:03:21 +02:00

656 lines
18 KiB
C

/**
* Asynchronous networking for SICS and other programs. This module centrally manages
* a number of network connections for a client program. It is a layer between the
* program and the network which manages non-blocking network I/O. For this purpose, the
* client program has to call ANETprocess at convenient intervals. This module
* has a couple of features:
* - Connections are abstracted to handles which are guaranteed to be unique,
* rather than socket numbers. Socket numbers may be reused by the OS.
* - This module allows upper level code to figure out if a connection is still
* connected or not.
* - This module introduces a buffer layer between the socket and the application.
* Thus the upper layer does not have to worry much about I/O blocking. This
* is taken care of by this module both for reading and writing.
* - All I/O is non blocking.
* - This module can detect if a client is hanging and close the connection then.
* Hanging is detected by not being able to write to the client for some period
* of time.
*
* copyright: see file COPYRIGHT
*
* Mark Koennecke, January 2009
*/
#include <stdlib.h>
#include <stdarg.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <strlutil.h>
#include "asynnet.h"
#include "rwpuffer.h"
/*--------------------------------------------------------------------------*/
/* values of SocketDescriptor.type */
#define SERVERSOCKET 0
#define DATASOCKET 1
/* capacity of the static connection table */
#define MAXCONNECTIONS 1024
/*
 * Buffer sizes in bytes: RBUFFERSIZE is handed to MakeRWPuffer,
 * WBUFFERSIZE and MAXWBUFFERSIZE are handed to MakeBigRWPuffer.
 * Computed values are parenthesized so that the macros evaluate
 * correctly when used inside a larger expression.
 */
#define RBUFFERSIZE 262144      /* 256kb */
#define WBUFFERSIZE (20*262144)
/* #define WBUFFERSIZE (100*262144) */
#define MAXWBUFFERSIZE (128*1000*1024)
/*--------------------------------------------------------------------------*/
/*
 * Bookkeeping record for one managed socket, which is either a listening
 * server socket or a data connection.
 */
typedef struct {
/* descriptor used with accept/send/recv/close */
int socket;
/* id taken from handleID; the key by which the table is sorted and searched */
int handle;
/* SERVERSOCKET or DATASOCKET */
int type;
/* created by ANETregisterSocket; left NULL (zeroed) by ANETopenServerPort */
prwBuffer readBuffer;
prwBuffer writeBuffer;
/*
 * Refreshed by ANETprocess when the socket is writable or has nothing
 * queued; a data socket lagging more than 300 seconds behind is closed.
 */
time_t lastOpenForWrite;
/* invoked by anetRead with (handle, userData) */
ANETcallback readCallback;
void *userData;
/* if set together with userData, called with userData in ANETclose */
ANETkill killUser;
/* peer host name, cached by ANETinfo */
char host[132];
} SocketDescriptor, *pSocketDescriptor;
/* connection table; the first noConnections entries are in use and kept sorted by handle */
static SocketDescriptor connections[MAXCONNECTIONS];
static int noConnections = 0;
/* next handle to hand out; incremented on every registration */
static unsigned int handleID = 0;
/*------------------------------------------------------------------------*/
static int SocketCompare(const void *s1, const void *s2)
{
pSocketDescriptor socke1, socke2;
socke1 = (pSocketDescriptor) s1;
socke2 = (pSocketDescriptor) s2;
return socke1->handle - socke2->handle;
}
/*------------------------------------------------------------------------*/
/**
 * Sort the used part of the connection table by handle (see
 * SocketCompare) so that it can be searched with bsearch().
 */
static void sortConnections()
{
  size_t used = noConnections;

  qsort(&connections[0], used, sizeof connections[0], SocketCompare);
}
/*------------------------------------------------------------------------*/
/**
 * Look up the table entry belonging to a handle.
 *
 * @param handle the handle to search for
 * @return pointer into the connection table, or NULL if no entry carries
 * this handle. The pointer is only valid until the table changes.
 */
static pSocketDescriptor findSocketDescriptor(int handle)
{
  SocketDescriptor probe;

  /* SocketCompare looks at the handle field only */
  probe.handle = handle;
  return bsearch(&probe, connections, noConnections,
                 sizeof connections[0], SocketCompare);
}
/*------------------------------------------------------------------------*/
/* log callback installed through ANETsetLog; while NULL, anetLog discards all messages */
static ANETlog logOutput = NULL;
/* opaque pointer handed back to logOutput as its last argument */
static void *logUserData = NULL;
/*------------------------------------------------------------------------*/
/**
 * Format a message printf-style and pass it to the installed log
 * callback. Does nothing when no callback is installed.
 *
 * @param level the log level handed through to the callback
 * @param fmt printf-style format string, followed by its arguments
 */
static void anetLog(int level, char *fmt, ...)
{
  va_list ap;
  char buf[256];
  char *text = NULL;
  int l;

  if (logOutput == NULL) {
    return;
  }

  va_start(ap, fmt);
  l = vsnprintf(buf, sizeof buf, fmt, ap);
  va_end(ap);

  if (l < 0) {
    /* formatting failed: at least report the raw format string */
    logOutput(level, fmt, logUserData);
    return;
  }
  if ((size_t) l < sizeof buf) {
    logOutput(level, buf, logUserData);
    return;
  }

  /*
   * The message did not fit. A C99 vsnprintf returns the length without
   * the terminating NUL, so one more byte is needed for the terminator.
   */
  text = calloc((size_t) l + 1, 1);
  if (text == NULL) {
    /* out of memory: a truncated message is better than none */
    logOutput(level, buf, logUserData);
    return;
  }
  va_start(ap, fmt);
  vsnprintf(text, (size_t) l + 1, fmt, ap);
  va_end(ap);
  logOutput(level, text, logUserData);
  free(text);
}
/*============= public interface =========================================*/
/**
 * Install the callback which receives all log messages of this module.
 *
 * @param lcb the log callback; NULL switches logging off
 * @param userData opaque pointer passed to lcb with every message
 */
void ANETsetLog(ANETlog lcb, void *userData)
{
  logUserData = userData;
  logOutput = lcb;
}
/*------------------------------------------------------------------------*/
/**
 * Open a listening TCP socket on all interfaces and register it in the
 * connection table.
 *
 * @param iPort the port to listen on, must be > 0
 * @param cb callback invoked for each accepted connection, must not be NULL
 * @param userData opaque pointer passed to cb
 * @return the handle of the server socket, or ANETOPENFAIL on failure
 */
int ANETopenServerPort(int iPort, ANETcallback cb, void *userData)
{
  SocketDescriptor socke;
  int i = 1, status;
  struct sockaddr_in addresse;

  assert(iPort > 0);
  assert(cb != NULL);

  /* never write past the end of the connection table */
  if (noConnections >= MAXCONNECTIONS) {
    anetLog(ANETERROR, "Maximum number of connections exceeded");
    return ANETOPENFAIL;
  }

  memset(&socke, 0, sizeof(SocketDescriptor));
  socke.socket = socket(AF_INET, SOCK_STREAM, 0);
  if (socke.socket < 0) {
    anetLog(ANETERROR, "Failed to open server port: socket: %d", iPort);
    return ANETOPENFAIL;
  }

  /* from here on every failure path has to release the socket again */
  status =
      setsockopt(socke.socket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));
  if (status < 0) {
    anetLog(ANETERROR,
            "Failed to open server port: setsockopt: %d, errno = %d",
            iPort, errno);
    close(socke.socket);
    return ANETOPENFAIL;
  }

  memset(&addresse, 0, sizeof(struct sockaddr_in));
  addresse.sin_family = AF_INET;
  addresse.sin_addr.s_addr = htonl(INADDR_ANY);
  addresse.sin_port = htons(iPort);
  status = bind(socke.socket, (struct sockaddr *) &addresse,
                sizeof(struct sockaddr_in));
  if (status < 0) {
    anetLog(ANETERROR, "Failed to open server port: bind: %d, errno = %d",
            iPort, errno);
    close(socke.socket);
    return ANETOPENFAIL;
  }

  status = listen(socke.socket, 8);
  if (status < 0) {
    anetLog(ANETERROR, "Failed to open server port: listen: %d", iPort);
    close(socke.socket);
    return ANETOPENFAIL;
  }

  socke.type = SERVERSOCKET;
  socke.handle = handleID;
  handleID++;
  socke.readCallback = cb;
  socke.userData = userData;
  socke.lastOpenForWrite = time(NULL);
  connections[noConnections] = socke;
  noConnections++;
  sortConnections();
  anetLog(ANETCON, "Opened server port %d", iPort);
  return socke.handle;
}
/*-----------------------------------------------------------------------*/
/**
 * Put an already connected socket under the management of this module:
 * the socket is switched to non-blocking mode, gets its read and write
 * buffers and is entered into the connection table.
 *
 * @param socket the connected socket descriptor
 * @return the new handle on success, else ANETOUTOFSOCKETS, ANETSOCKERROR
 * or ANETMEM. On failure the socket is NOT closed; it stays with the caller.
 */
int ANETregisterSocket(int socket)
{
  SocketDescriptor socke;
  int flags, status;

  if (noConnections >= MAXCONNECTIONS) {
    anetLog(ANETERROR, "Maximum number of connections exceeded");
    return ANETOUTOFSOCKETS;
  }
  memset(&socke, 0, sizeof(SocketDescriptor));

  /* an invalid descriptor must not end up OR-ed into the new flag set */
  flags = fcntl(socket, F_GETFL, 0);
  if (flags < 0) {
    return ANETSOCKERROR;
  }
  status = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
  if (status < 0) {
    return ANETSOCKERROR;
  }

  /* best effort only: the result of switching on TCP_NODELAY is ignored */
  flags = 1;
  setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (char *) &flags,
             sizeof(int));

  socke.readBuffer = MakeRWPuffer(RBUFFERSIZE);
  socke.writeBuffer = MakeBigRWPuffer(WBUFFERSIZE, MAXWBUFFERSIZE);
  if (socke.readBuffer == NULL || socke.writeBuffer == NULL) {
    /* do not leak the buffer which could be allocated */
    if (socke.readBuffer != NULL) {
      KillRWBuffer(socke.readBuffer);
    }
    if (socke.writeBuffer != NULL) {
      KillRWBuffer(socke.writeBuffer);
    }
    return ANETMEM;
  }

  socke.socket = socket;
  socke.handle = handleID;
  handleID++;
  socke.type = DATASOCKET;
  socke.lastOpenForWrite = time(NULL);
  connections[noConnections] = socke;
  noConnections++;
  sortConnections();
  return socke.handle;
}
/*-------------------------------------------------------------------------*/
/**
 * Open a TCP connection to a server and register it with this module.
 *
 * @param name host name or dotted IPv4 address of the server
 * @param iPort the port to connect to
 * @return the handle of the new connection, ANETOPENFAIL if the host
 * cannot be found or connected, or the error code of ANETregisterSocket
 */
int ANETconnect(char *name, int iPort)
{
  struct in_addr addr;
  struct sockaddr_in addresse;
  struct hostent *host;
  int socke, status, handle;

  memset(&addr, 0, sizeof addr);
  memset(&addresse, 0, sizeof(struct sockaddr_in));
  addresse.sin_family = AF_INET;
  addresse.sin_port = htons((unsigned short) (iPort & 0xFFFF));

  host = gethostbyname(name);
  if (host != NULL && host->h_addrtype == AF_INET
      && host->h_length > 0 && (size_t) host->h_length <= sizeof addr) {
    /* only accept an answer which really fits into an IPv4 address */
    memcpy((char *) &addr,
           (char *) host->h_addr_list[0], (size_t) host->h_length);
  } else {
    /* check for aaa.bbbb.ccc.dddd */
    addr.s_addr = inet_addr(name);
    if (addr.s_addr == INADDR_NONE) {
      anetLog(ANETERROR, "Failed to locate host: %s", name);
      return ANETOPENFAIL;
    }
  }
  addresse.sin_addr.s_addr = addr.s_addr;

  socke = socket(AF_INET, SOCK_STREAM, 0);
  if (socke < 0) {
    anetLog(ANETERROR, "Failed to open socket to %s:%d", name, iPort);
    return ANETOPENFAIL;
  }
  status = connect(socke, (struct sockaddr *) &addresse,
                   sizeof(struct sockaddr_in));
  if (status < 0) {
    close(socke);
    anetLog(ANETERROR, "Failed to open socket to %s:%d", name, iPort);
    return ANETOPENFAIL;
  }
  anetLog(ANETCON, "Opened socket %d to %s:%d", socke, name, iPort);

  handle = ANETregisterSocket(socke);
  if (findSocketDescriptor(handle) == NULL) {
    /*
     * Registration failed and nobody else knows about the descriptor:
     * close it here or it is leaked.
     */
    close(socke);
  }
  return handle;
}
/*--------------------------------------------------------------------------*/
/**
 * Close a connection and remove it from the connection table. The
 * socket is closed, both buffers are released and, if user data and a
 * kill function are installed, the kill function is called on the user
 * data. Unknown handles are silently ignored.
 *
 * @param handle the handle of the connection to close
 */
void ANETclose(int handle)
{
  pSocketDescriptor entry = findSocketDescriptor(handle);

  if (entry == NULL) {
    return;
  }

  close(entry->socket);
  anetLog(ANETCON, "Closed socket %d", entry->socket);

  if (entry->readBuffer != NULL) {
    KillRWBuffer(entry->readBuffer);
  }
  if (entry->writeBuffer != NULL) {
    KillRWBuffer(entry->writeBuffer);
  }
  if (entry->userData && entry->killUser) {
    entry->killUser(entry->userData);
  }

  if (noConnections <= 1) {
    /* this was the only entry */
    noConnections = 0;
    return;
  }

  /* fill the hole with the last entry, then restore the handle order */
  *entry = connections[noConnections - 1];
  noConnections--;
  sortConnections();
}
/*--------------------------------------------------------------------------*/
/**
 * Try to send the data waiting in the write buffer of a connection.
 * Whatever the socket accepted is removed from the buffer.
 *
 * @param con copy of the descriptor of the connection
 * @return 0 when the connection was closed because of a send error,
 * 1 in all other cases
 */
static int anetWrite(SocketDescriptor con)
{
  int sent, pending;
  void *data;

  data = GetRWBufferData(con.writeBuffer, &pending);
  if (pending <= 0) {
    return 1;
  }

  sent = send(con.socket, data, pending, 0);
  if (sent >= 0) {
    RemoveRWBufferData(con.writeBuffer, sent);
    return 1;
  }

  /* the socket is merely busy: try again later */
  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
    return 1;
  }

  ANETclose(con.handle);
  return 0;
}
/*--------------------------------------------------------------------------
 * I have seen that select did not report the write possibility set,
 * though the send buffer was empty. Thus I try to write if data is there
 * and leave the lastOpenForWrite flag untouched only when the socket would
 * block on write.
 *
 * con is passed by value. Storing the time stamp in con would only change
 * the local copy, therefore the entry in the connection table is looked up
 * by handle and updated instead.
 *
 * Returns 0 when the connection was closed because of a send error, else 1.
 *--------------------------------------------------------------------------*/
static int anetTestWrite(SocketDescriptor con)
{
  int status, length;
  void *pPtr;
  pSocketDescriptor entry;

  if (con.type != DATASOCKET) {
    return 1;
  }

  pPtr = GetRWBufferData(con.writeBuffer, &length);
  if (length > 0) {
    status = send(con.socket, pPtr, length, 0);
    if (status < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
        /* would block: keep the old time stamp so that the timeout runs */
        return 1;
      }
      ANETclose(con.handle);
      return 0;
    }
    RemoveRWBufferData(con.writeBuffer, status);
  }

  /* nothing to write or write succeeded: the connection is alive */
  entry = findSocketDescriptor(con.handle);
  if (entry != NULL) {
    entry->lastOpenForWrite = time(NULL);
  }
  return 1;
}
/*--------------------------------------------------------------------------
 * Serve a socket which select() reported as readable. On a server socket
 * the pending connection is accepted, registered and announced through the
 * read callback. On a data socket the available bytes are moved into the
 * read buffer and the read callback, if any, is called.
 *
 * There is something implicit in the return here: anetRead must return 0
 * when the list of sockets changes. Otherwise ANETprocess cannot detect
 * that it has an invalid socket list and jump out. In all other cases
 * anetRead must return 1. As a read callback may close connections, the
 * list is checked again after the callback of a data socket has run.
 *--------------------------------------------------------------------------*/
static int anetRead(SocketDescriptor con)
{
  int socke, handle, status, countBefore;
  socklen_t len;
  struct sockaddr addresse;
  char buffer[8192];

  switch (con.type) {
  case SERVERSOCKET:
    len = sizeof(struct sockaddr);
    socke = accept(con.socket, &addresse, &len);
    if (socke < 0) {
      return 1;
    }
    handle = ANETregisterSocket(socke);
    if (findSocketDescriptor(handle) == NULL) {
      /*
       * Registration failed, so nobody owns the accepted descriptor:
       * close it here. The list of sockets is unchanged.
       */
      anetLog(ANETERROR,
              "Failed to register accepted socket %d on port %d, error code %d",
              socke, con.socket, handle);
      close(socke);
      return 1;
    }
    status = con.readCallback(handle, con.userData);
    if (status != 1) {
      ANETclose(handle);
      return 0;
    }
    anetLog(ANETCON, "Accepted socket %d on port %d, handle %d",
            socke, con.socket, handle);
    break;
  case DATASOCKET:
    status = recv(con.socket, buffer, sizeof buffer, 0);
    if (status < 0) {
      /* same set of "try again later" errors as in anetWrite */
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
        return 1;
      }
      ANETclose(con.handle);
      return 0;
    }
    if (status == 0) {
      /* this means EOF */
      ANETclose(con.handle);
      return 0;
    }
    status = StoreRWBuffer(con.readBuffer, buffer, status);
    if (status != 1) {
      anetLog(ANETERROR, "Read buffer overrun at handle %d, socket %d",
              con.handle, con.socket);
    }
    if (con.readCallback != NULL) {
      countBefore = noConnections;
      con.readCallback(con.handle, con.userData);
      if (noConnections != countBefore
          || findSocketDescriptor(con.handle) == NULL) {
        /* the callback opened or closed a connection */
        return 0;
      }
    }
    break;
  default:
    break;
  }
  return 1;
}
/*---------------------------------------------------------------------------*/
void ANETprocess(void)
{
int i, status, count = 0, socke = 0, length;
fd_set readMask, writeMask;
struct timeval tmo = { 0, 10000 };
FD_ZERO(&readMask);
FD_ZERO(&writeMask);
for (i = 0; i < noConnections; i++) {
socke = connections[i].socket;
FD_SET(socke, &readMask);
if (connections[i].writeBuffer != NULL) {
GetRWBufferData(connections[i].writeBuffer, &length);
if (length > 0)
FD_SET(socke, &writeMask);
else
connections[i].lastOpenForWrite = time(NULL);
}
if (socke > count) {
count = socke;
}
}
count++;
status = select(count, &readMask, &writeMask, NULL, &tmo);
if (status < 0) {
if (errno == EINTR) {
return;
}
return;
}
/**
* I always jump out of this loop when a socket is created or closed
* because then the order in the connections array is no longer valid.
* Try again the next time round.
*/
for (i = 0; i < noConnections; i++) {
socke = connections[i].socket;
if (FD_ISSET(socke, &readMask)) {
if (anetRead(connections[i]) == 0) {
return;
}
}
if (FD_ISSET(socke, &writeMask)) {
/*
* This has to be here, as I found out tracing a subtle bug:
* If the time is set in writeANET the modification will not
* propagate into the array as C copies the structure for the function call.
*/
connections[i].lastOpenForWrite = time(NULL);
if (anetWrite(connections[i]) == 0) {
return;
}
} else {
/*
* If I could not write to the socket for 5 minutes,
* the socket is considered broken and is closed.
*/
if (time(NULL) > connections[i].lastOpenForWrite + 300 &&
connections[i].type == DATASOCKET) {
GetRWBufferData(connections[i].writeBuffer, &length);
anetLog(ANETCON, "Closing socket because of time overrun: %d, delay = %d, bytes to be written = %d, write bit = %d\n", connections[i].socket,
(int)(time(NULL) - connections[i].lastOpenForWrite), length, FD_ISSET(connections[i].socket, &writeMask) );
ANETclose(connections[i].handle);
return;
}
}
}
}
/*--------------------------------------------------------------------------*/
/**
 * Test if a handle belongs to a connection which is still managed by
 * this module.
 *
 * @param handle the handle to test
 * @return ANETOK if the handle is known, else 0
 */
int ANETvalidHandle(int handle)
{
  return (findSocketDescriptor(handle) != NULL) ? ANETOK : 0;
}
/*---------------------------------------------------------------------------*/
/**
 * Get the name of the host at the other end of a connection. The name
 * is looked up when the cached name is shorter than 3 characters and is
 * then stored with the connection for later calls.
 *
 * @param handle the handle of the connection
 * @param hostname buffer which receives the host name
 * @param hostnameLen the size of hostname in bytes
 * @return 1 on success, ANETDISCONNECTED for an unknown handle,
 * ANETSOCKERROR when the peer cannot be determined
 */
int ANETinfo(int handle, char *hostname, int hostnameLen)
{
  struct sockaddr_in peer;
  struct hostent *entry;
  socklen_t peerLen;
  pSocketDescriptor con = findSocketDescriptor(handle);

  if (con == NULL) {
    return ANETDISCONNECTED;
  }

  if (strlen(con->host) < 3) {
    /* no name cached yet: ask the socket for its peer */
    peerLen = sizeof peer;
    if (getpeername(con->socket, (struct sockaddr *) &peer, &peerLen) < 0) {
      return ANETSOCKERROR;
    }
    entry = gethostbyaddr((char *) &peer.sin_addr,
                          sizeof(peer.sin_addr), AF_INET);
    if (entry == NULL) {
      return ANETSOCKERROR;
    }
    strlcpy(con->host, entry->h_name, 132);
  }

  memset(hostname, 0, hostnameLen);
  strlcpy(hostname, con->host, hostnameLen);
  return 1;
}
/*---------------------------------------------------------------------------*/
/**
 * Test if data would fit into the write buffer of a connection. One
 * round of ANETprocess is run first in order to give the write buffer
 * a chance to drain.
 *
 * @param handle the handle of the connection
 * @param buffer the data intended for writing
 * @param count the number of bytes in buffer
 * @return ANETDISCONNECTED for an unknown or meanwhile closed handle,
 * else the result of CanStoreRWBuffer
 */
int ANETcanWrite(int handle, void *buffer, int count)
{
  pSocketDescriptor con = NULL;

  if (findSocketDescriptor(handle) == NULL) {
    return ANETDISCONNECTED;
  }

  ANETprocess();

  /*
   * ANETprocess may close connections and reshuffle the table, so a
   * pointer obtained before the call cannot be trusted: look up again.
   */
  con = findSocketDescriptor(handle);
  if (con == NULL) {
    return ANETDISCONNECTED;
  }
  return CanStoreRWBuffer(con->writeBuffer, buffer, count);
}
/*---------------------------------------------------------------------------*/
/**
 * Queue data for sending on a connection. The data is copied into the
 * write buffer; the actual sending happens in ANETprocess.
 *
 * @param handle the handle of the connection
 * @param buffer the data to write
 * @param count the number of bytes in buffer
 * @return ANETOK on success, ANETDISCONNECTED for an unknown or meanwhile
 * closed handle, ANETWRITEBUFFERFULL when the data does not fit
 */
int ANETwrite(int handle, void *buffer, int count)
{
  pSocketDescriptor con = NULL;
  int status;

  con = findSocketDescriptor(handle);
  if (con == NULL) {
    return ANETDISCONNECTED;
  }

  status = StoreRWBuffer(con->writeBuffer, buffer, count);
  if (status != 1) {
    /*
       first try if ANETprocess can write some and free the buffer
       before giving up
     */
    ANETprocess();
    /*
     * ANETprocess may have closed this very connection or reshuffled
     * the table: the old pointer is stale, look the handle up again.
     */
    con = findSocketDescriptor(handle);
    if (con == NULL) {
      return ANETDISCONNECTED;
    }
    status = StoreRWBuffer(con->writeBuffer, buffer, count);
  }
  if (status != 1) {
    anetLog(ANETERROR, "Write buffer overrun on handle %d, socket %d, trying to write %d bytes",
            con->handle, con->socket, count);
    return ANETWRITEBUFFERFULL;
  }
  return ANETOK;
}
/*---------------------------------------------------------------------------*/
/**
 * Copy data from the read buffer of a connection into a caller supplied
 * buffer. The data is not removed from the read buffer by this function;
 * see ANETreadConsume for that.
 *
 * @param handle the handle of the connection
 * @param buffer the buffer to copy to
 * @param bufferLength the size of buffer in bytes
 * @return the number of bytes copied, or ANETDISCONNECTED for an unknown
 * handle
 */
int ANETread(int handle, void *buffer, int bufferLength)
{
  int available, copied;
  void *source;
  pSocketDescriptor con = findSocketDescriptor(handle);

  if (con == NULL) {
    return ANETDISCONNECTED;
  }

  source = GetRWBufferData(con->readBuffer, &available);
  if (available == 0) {
    copied = 0;
  } else {
    /* never hand out more than fits into the buffer of the caller */
    copied = (available >= bufferLength) ? bufferLength : available;
  }
  if (copied > 0) {
    memcpy(buffer, source, copied);
  }
  return copied;
}
/*---------------------------------------------------------------------------*/
/**
 * Get direct access to the data in the read buffer of a connection
 * without copying it.
 *
 * @param handle the handle of the connection
 * @param length is set to the number of bytes available, 0 for an
 * unknown handle
 * @return pointer to the buffered data, NULL for an unknown handle
 */
void *ANETreadPtr(int handle, int *length)
{
  pSocketDescriptor con = findSocketDescriptor(handle);

  if (con != NULL) {
    return GetRWBufferData(con->readBuffer, length);
  }
  *length = 0;
  return NULL;
}
/*---------------------------------------------------------------------------*/
/**
 * Remove data from the read buffer of a connection. Unknown handles are
 * silently ignored.
 *
 * @param handle the handle of the connection
 * @param count the number of bytes to remove
 */
void ANETreadConsume(int handle, int count)
{
  pSocketDescriptor con = findSocketDescriptor(handle);

  if (con != NULL) {
    RemoveRWBufferData(con->readBuffer, count);
  }
}
/*---------------------------------------------------------------------------*/
/**
 * Install the read callback, its user data and the function which
 * disposes of the user data for a connection. Unknown handles are
 * silently ignored.
 *
 * @param handle the handle of the connection
 * @param cb the callback invoked when data has been read
 * @param userData opaque pointer passed to cb
 * @param killUser called with userData when the connection is closed
 */
void ANETsetReadCallback(int handle, ANETcallback cb,
                         void *userData, ANETkill killUser)
{
  pSocketDescriptor con = findSocketDescriptor(handle);

  if (con != NULL) {
    con->killUser = killUser;
    con->userData = userData;
    con->readCallback = cb;
  }
}
/*----------------------------------------------------------------------------*/
/**
 * Process network I/O until the terminator callback finds a complete
 * message in the read buffer of a connection or the wait callback gives up.
 *
 * @param handle the handle of the connection
 * @param tcb terminator callback, called with the buffered data, its
 * length and termData. A result > 0 is taken as the number of bytes
 * making up the message.
 * @param termData opaque pointer passed to tcb
 * @param wcb wait callback; looping continues as long as it returns > 0
 * @param waitData opaque pointer passed to wcb
 * @param buffer on ANETOK set to a malloc'ed copy of the message which
 * the caller has to free. The copy is not NUL terminated by this function.
 * @return ANETOK when a message was found, ANETDISCONNECTED when the
 * connection vanished, ANETMEM when the copy cannot be allocated (the
 * data then stays in the read buffer), ANETTIMEOUT when wcb gave up
 */
int ANETreadTillTerm(int handle,
                     ANETtermCallback tcb, void *termData,
                     ANETwait wcb, void *waitData, char **buffer)
{
  pSocketDescriptor con = NULL;
  char *data;
  int length, status;

  while (wcb(waitData) > 0) {
    ANETprocess();
    /* ANETprocess can close connections: look up after each round */
    con = findSocketDescriptor(handle);
    if (con == NULL) {
      return ANETDISCONNECTED;
    }
    data = GetRWBufferData(con->readBuffer, &length);
    if (length > 0) {
      status = tcb(data, length, termData);
      if (status > 0) {
        *buffer = malloc(status * sizeof(char));
        if (*buffer == NULL) {
          /*
           * Do not throw the message away and pretend success when
           * there is no memory for the copy.
           */
          return ANETMEM;
        }
        memcpy(*buffer, data, status);
        RemoveRWBufferData(con->readBuffer, status);
        return ANETOK;
      }
    }
  }
  return ANETTIMEOUT;
}