Changed required privilege for chnaging the threshold from manager to user in counter.c Changed an output code in macro.c
656 lines
18 KiB
C
656 lines
18 KiB
C
/**
|
|
* Asynchronous networking for SICS and other programs. This module centrally manages
|
|
* a number of network connections for a client program. It is a layer between the
|
|
* program and the network which manages non blocking network I/O. To this purpose, the
|
|
* client program has to call ANETprocess at convenient intervalls. This module
|
|
* has a couple of features:
|
|
* - Connections are abstracted to handles which are guranteed to be unique
|
|
* rather then socket numbers. Socket numbers may be reused by the OS.
|
|
* - This module allows upper level code to figure out if a connection is still
|
|
* connected or not.
|
|
* - This module introduces a buffer layer between the socket and the application.
|
|
* Thus the upper layer does not have to worry much about I/O blocking. This
|
|
* is taken care of by this module both for reading and writing.
|
|
* - All I/O is non blocking.
|
|
* - This module can detect if a client is hanging and close the connection then.
|
|
* Hanging is detected by not being able to write to the client for some period
|
|
* of time.
|
|
*
|
|
* copyright: see file COPYRIGHT
|
|
*
|
|
* Mark Koennecke, January 2009
|
|
*/
|
|
#include <stdlib.h>
|
|
#include <stdarg.h>
|
|
#include <stdio.h>
|
|
#include <assert.h>
|
|
#include <string.h>
|
|
#include <unistd.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <time.h>
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <netinet/tcp.h>
|
|
#include <arpa/inet.h>
|
|
#include <netdb.h>
|
|
#include <strlutil.h>
|
|
#include "asynnet.h"
|
|
#include "rwpuffer.h"
|
|
|
|
/*--------------------------------------------------------------------------*/
|
|
#define SERVERSOCKET 0
|
|
#define DATASOCKET 1
|
|
#define MAXCONNECTIONS 1024
|
|
#define RBUFFERSIZE 262144 /* 256kb */
|
|
#define WBUFFERSIZE 20*262144
|
|
/* #define WBUFFERSIZE 100*262144 */
|
|
#define MAXWBUFFERSIZE 128*1000*1024
|
|
/*--------------------------------------------------------------------------*/
|
|
typedef struct {
|
|
int socket;
|
|
int handle;
|
|
int type;
|
|
prwBuffer readBuffer;
|
|
prwBuffer writeBuffer;
|
|
time_t lastOpenForWrite;
|
|
ANETcallback readCallback;
|
|
void *userData;
|
|
ANETkill killUser;
|
|
char host[132];
|
|
} SocketDescriptor, *pSocketDescriptor;
|
|
|
|
static SocketDescriptor connections[MAXCONNECTIONS];
|
|
static int noConnections = 0;
|
|
static unsigned int handleID = 0;
|
|
/*------------------------------------------------------------------------*/
|
|
static int SocketCompare(const void *s1, const void *s2)
|
|
{
|
|
pSocketDescriptor socke1, socke2;
|
|
|
|
socke1 = (pSocketDescriptor) s1;
|
|
socke2 = (pSocketDescriptor) s2;
|
|
return socke1->handle - socke2->handle;
|
|
}
|
|
|
|
/*------------------------------------------------------------------------*/
|
|
static void sortConnections()
|
|
{
|
|
qsort(connections, noConnections,
|
|
sizeof(SocketDescriptor), SocketCompare);
|
|
}
|
|
|
|
/*------------------------------------------------------------------------*/
|
|
static pSocketDescriptor findSocketDescriptor(int handle)
|
|
{
|
|
SocketDescriptor key;
|
|
pSocketDescriptor result;
|
|
|
|
key.handle = handle;
|
|
result = bsearch(&key, connections, noConnections,
|
|
sizeof(SocketDescriptor), SocketCompare);
|
|
return result;
|
|
}
|
|
|
|
/*------------------------------------------------------------------------*/
|
|
static ANETlog logOutput = NULL;
|
|
static void *logUserData = NULL;
|
|
/*------------------------------------------------------------------------*/
|
|
static void anetLog(int level, char *fmt, ...)
|
|
{
|
|
va_list ap;
|
|
char buf[256];
|
|
char *text = NULL;
|
|
int l;
|
|
|
|
if (logOutput == NULL) {
|
|
return;
|
|
}
|
|
|
|
va_start(ap, fmt);
|
|
l = vsnprintf(buf, sizeof buf, fmt, ap);
|
|
va_end(ap);
|
|
if (l < sizeof buf) {
|
|
text = buf;
|
|
logOutput(level, text, logUserData);
|
|
} else {
|
|
/* assuming we have a C99 conforming snprintf and need a larger buffer */
|
|
text = calloc(l, 1);
|
|
va_start(ap, fmt);
|
|
vsnprintf(text, l, fmt, ap);
|
|
va_end(ap);
|
|
logOutput(level, text, logUserData);
|
|
free(text);
|
|
}
|
|
}
|
|
|
|
/*============= public interface =========================================*/
|
|
void ANETsetLog(ANETlog lcb, void *userData)
|
|
{
|
|
logOutput = lcb;
|
|
logUserData = userData;
|
|
}
|
|
|
|
/*------------------------------------------------------------------------*/
|
|
int ANETopenServerPort(int iPort, ANETcallback cb, void *userData)
|
|
{
|
|
SocketDescriptor socke;
|
|
int i = 1, status;
|
|
struct sockaddr_in addresse;
|
|
|
|
assert(iPort > 0);
|
|
assert(cb != NULL);
|
|
|
|
memset(&socke, 0, sizeof(SocketDescriptor));
|
|
socke.socket = socket(AF_INET, SOCK_STREAM, 0);
|
|
if (socke.socket < 0) {
|
|
anetLog(ANETERROR, "Failed to open server port: socket: %d", iPort);
|
|
return ANETOPENFAIL;
|
|
}
|
|
status =
|
|
setsockopt(socke.socket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));
|
|
if (status < 0) {
|
|
anetLog(ANETERROR,
|
|
"Failed to open server port: setsockopt: %d, errno = %d",
|
|
iPort, errno);
|
|
return ANETOPENFAIL;
|
|
}
|
|
memset(&addresse, 0, sizeof(struct sockaddr_in));
|
|
addresse.sin_family = AF_INET;
|
|
addresse.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
addresse.sin_port = htons(iPort);
|
|
status = bind(socke.socket, (struct sockaddr *) &addresse,
|
|
sizeof(struct sockaddr_in));
|
|
if (status < 0) {
|
|
anetLog(ANETERROR, "Failed to open server port: bind: %d, errno = %d",
|
|
iPort, errno);
|
|
return ANETOPENFAIL;
|
|
}
|
|
status = listen(socke.socket, 8);
|
|
if (status < 0) {
|
|
anetLog(ANETERROR, "Failed to open server port: listen: %d", iPort);
|
|
return ANETOPENFAIL;
|
|
}
|
|
socke.type = SERVERSOCKET;
|
|
socke.handle = handleID;
|
|
handleID++;
|
|
socke.readCallback = cb;
|
|
socke.userData = userData;
|
|
socke.lastOpenForWrite = time(NULL);
|
|
connections[noConnections] = socke;
|
|
noConnections++;
|
|
sortConnections();
|
|
anetLog(ANETCON, "Opened server port %d", iPort);
|
|
return socke.handle;
|
|
}
|
|
|
|
/*-----------------------------------------------------------------------*/
|
|
int ANETregisterSocket(int socket)
|
|
{
|
|
SocketDescriptor socke;
|
|
int flags, status;
|
|
|
|
if (noConnections >= MAXCONNECTIONS) {
|
|
anetLog(ANETERROR, "Maximum number of connections exceeded");
|
|
return ANETOUTOFSOCKETS;
|
|
}
|
|
|
|
memset(&socke, 0, sizeof(SocketDescriptor));
|
|
flags = fcntl(socket, F_GETFL, 0);
|
|
status = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
|
|
if (status < 0) {
|
|
return ANETSOCKERROR;
|
|
}
|
|
flags =1;
|
|
setsockopt(socket,IPPROTO_TCP,TCP_NODELAY,(char *) &flags, sizeof(int));
|
|
socke.readBuffer = MakeRWPuffer(RBUFFERSIZE);
|
|
socke.writeBuffer = MakeBigRWPuffer(WBUFFERSIZE, MAXWBUFFERSIZE);
|
|
if (socke.readBuffer == NULL || socke.writeBuffer == NULL) {
|
|
return ANETMEM;
|
|
}
|
|
socke.socket = socket;
|
|
socke.handle = handleID;
|
|
handleID++;
|
|
socke.type = DATASOCKET;
|
|
socke.lastOpenForWrite = time(NULL);
|
|
connections[noConnections] = socke;
|
|
noConnections++;
|
|
sortConnections();
|
|
return socke.handle;
|
|
}
|
|
|
|
/*-------------------------------------------------------------------------*/
|
|
int ANETconnect(char *name, int iPort)
|
|
{
|
|
struct in_addr addr;
|
|
struct sockaddr_in addresse;
|
|
struct hostent *host;
|
|
int socke, status;
|
|
|
|
memset(&addresse, 0, sizeof(struct sockaddr_in));
|
|
addresse.sin_family = AF_INET;
|
|
addresse.sin_port = htons((unsigned short)(iPort &0xFFFF));
|
|
host = gethostbyname(name);
|
|
if (host != NULL) {
|
|
memcpy((char *) &addr,
|
|
(char *)host->h_addr_list[0], (size_t)host->h_length);
|
|
} else {
|
|
/* check for aaa.bbbb.ccc.dddd */
|
|
addr.s_addr = inet_addr(name);
|
|
if(addr.s_addr == (unsigned int) -1) {
|
|
anetLog(ANETERROR, "Failed to locate host: %s", name);
|
|
return ANETOPENFAIL;
|
|
}
|
|
}
|
|
addresse.sin_addr.s_addr = addr.s_addr;
|
|
socke = socket(AF_INET, SOCK_STREAM, 0);
|
|
status = connect(socke, (struct sockaddr *) &addresse,
|
|
sizeof(struct sockaddr_in));
|
|
if (status < 0) {
|
|
close(socke);
|
|
anetLog(ANETERROR, "Failed to open socket to %s:%d", name, iPort);
|
|
return ANETOPENFAIL;
|
|
}
|
|
anetLog(ANETCON, "Opened socket %d to %s:%d", socke, name, iPort);
|
|
return ANETregisterSocket(socke);
|
|
}
|
|
|
|
/*--------------------------------------------------------------------------*/
|
|
void ANETclose(int handle)
|
|
{
|
|
pSocketDescriptor socke = NULL;
|
|
|
|
socke = findSocketDescriptor(handle);
|
|
if (socke == NULL) {
|
|
return;
|
|
}
|
|
close(socke->socket);
|
|
anetLog(ANETCON, "Closed socket %d", socke->socket);
|
|
if (socke->readBuffer != NULL) {
|
|
KillRWBuffer(socke->readBuffer);
|
|
}
|
|
if (socke->writeBuffer != NULL) {
|
|
KillRWBuffer(socke->writeBuffer);
|
|
}
|
|
if (socke->userData && socke->killUser) {
|
|
socke->killUser(socke->userData);
|
|
}
|
|
if (noConnections > 1) {
|
|
*socke = connections[noConnections - 1];
|
|
noConnections--;
|
|
sortConnections();
|
|
} else {
|
|
noConnections = 0;
|
|
}
|
|
}
|
|
|
|
/*--------------------------------------------------------------------------*/
|
|
static int anetWrite(SocketDescriptor con)
|
|
{
|
|
int status, length;
|
|
void *pPtr;
|
|
|
|
pPtr = GetRWBufferData(con.writeBuffer, &length);
|
|
if (length > 0) {
|
|
status = send(con.socket, pPtr, length, 0);
|
|
if (status < 0) {
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
|
|
return 1;
|
|
}
|
|
ANETclose(con.handle);
|
|
return 0;
|
|
}
|
|
RemoveRWBufferData(con.writeBuffer, status);
|
|
}
|
|
return 1;
|
|
}
|
|
/*--------------------------------------------------------------------------
|
|
* I have seen that select did not report the write possibility set,
|
|
* though the send buffer was empty. Thus I try to write if data is there
|
|
* and leave the lastOpenForWrite flag only when the socket would block
|
|
* on write.
|
|
*--------------------------------------------------------------------------*/
|
|
static int anetTestWrite(SocketDescriptor con)
|
|
{
|
|
int status, length;
|
|
void *pPtr;
|
|
time_t lastTime;
|
|
|
|
if(con.type != DATASOCKET){
|
|
return 1;
|
|
}
|
|
|
|
lastTime = con.lastOpenForWrite;
|
|
con.lastOpenForWrite = time(NULL);
|
|
pPtr = GetRWBufferData(con.writeBuffer, &length);
|
|
if (length > 0) {
|
|
status = send(con.socket, pPtr, length, 0);
|
|
if (status < 0) {
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
|
|
con.lastOpenForWrite = lastTime;
|
|
return 1;
|
|
}
|
|
ANETclose(con.handle);
|
|
return 0;
|
|
}
|
|
RemoveRWBufferData(con.writeBuffer, status);
|
|
}
|
|
return 1;
|
|
}
|
|
/*--------------------------------------------------------------------------
|
|
There is something implicit in the return here: anetRead must return 0
|
|
when the list of sockets changes. Otherwise ANETprocess cannot detect that it has
|
|
an invalid socket list and jump out. In all other cases anetRead must return 1
|
|
-------------------------------------------------------------------------------*/
|
|
|
|
static int anetRead(SocketDescriptor con)
|
|
{
|
|
int socke, handle, status;
|
|
unsigned int len;
|
|
struct sockaddr addresse;
|
|
char buffer[8192];
|
|
|
|
switch (con.type) {
|
|
case SERVERSOCKET:
|
|
len = sizeof(struct sockaddr);
|
|
socke = accept(con.socket, &addresse, &len);
|
|
if (socke < 0) {
|
|
return 1;
|
|
}
|
|
handle = ANETregisterSocket(socke);
|
|
if (handle > 0) {
|
|
status = con.readCallback(handle, con.userData);
|
|
if (status != 1) {
|
|
ANETclose(handle);
|
|
return 0;
|
|
}
|
|
}
|
|
anetLog(ANETCON, "Accepted socket %d on port %d, handle %d",
|
|
socke, con.socket, handle);
|
|
break;
|
|
case DATASOCKET:
|
|
memset(buffer, 0, 8192);
|
|
status = recv(con.socket, buffer, 8192, 0);
|
|
if (status < 0) {
|
|
if (errno == EAGAIN) {
|
|
return 1;
|
|
}
|
|
ANETclose(con.handle);
|
|
return 0;
|
|
} else if (status == 0) {
|
|
/* this means EOF */
|
|
ANETclose(con.handle);
|
|
return 0;
|
|
} else {
|
|
status = StoreRWBuffer(con.readBuffer, buffer, status);
|
|
if (status != 1) {
|
|
anetLog(ANETERROR, "Read buffer overrun at handle %d, socket %d",
|
|
con.handle, con.socket);
|
|
}
|
|
if (con.readCallback != NULL) {
|
|
con.readCallback(con.handle, con.userData);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
/*---------------------------------------------------------------------------*/
|
|
void ANETprocess(void)
|
|
{
|
|
int i, status, count = 0, socke = 0, length;
|
|
fd_set readMask, writeMask;
|
|
struct timeval tmo = { 0, 10000 };
|
|
|
|
FD_ZERO(&readMask);
|
|
FD_ZERO(&writeMask);
|
|
for (i = 0; i < noConnections; i++) {
|
|
socke = connections[i].socket;
|
|
FD_SET(socke, &readMask);
|
|
if (connections[i].writeBuffer != NULL) {
|
|
GetRWBufferData(connections[i].writeBuffer, &length);
|
|
if (length > 0)
|
|
FD_SET(socke, &writeMask);
|
|
else
|
|
connections[i].lastOpenForWrite = time(NULL);
|
|
}
|
|
if (socke > count) {
|
|
count = socke;
|
|
}
|
|
}
|
|
|
|
count++;
|
|
status = select(count, &readMask, &writeMask, NULL, &tmo);
|
|
if (status < 0) {
|
|
if (errno == EINTR) {
|
|
return;
|
|
}
|
|
return;
|
|
}
|
|
|
|
/**
|
|
* I always jump out of this loop when a socket is created or closed
|
|
* because then the order in the connections array is no longer valid.
|
|
* Try again the next time round.
|
|
*/
|
|
for (i = 0; i < noConnections; i++) {
|
|
socke = connections[i].socket;
|
|
if (FD_ISSET(socke, &readMask)) {
|
|
if (anetRead(connections[i]) == 0) {
|
|
return;
|
|
}
|
|
}
|
|
if (FD_ISSET(socke, &writeMask)) {
|
|
/*
|
|
* This has to be here, as I found out tracing a subtle bug:
|
|
* If the time is set in writeANET the modification will not
|
|
* propagate into the array as C copies the structure for the function call.
|
|
*/
|
|
connections[i].lastOpenForWrite = time(NULL);
|
|
if (anetWrite(connections[i]) == 0) {
|
|
return;
|
|
}
|
|
} else {
|
|
/*
|
|
* If I could not write to the socket for 5 minutes,
|
|
* the socket is considered broken and is closed.
|
|
*/
|
|
if (time(NULL) > connections[i].lastOpenForWrite + 300 &&
|
|
connections[i].type == DATASOCKET) {
|
|
GetRWBufferData(connections[i].writeBuffer, &length);
|
|
anetLog(ANETCON, "Closing socket because of time overrun: %d, delay = %d, bytes to be written = %d, write bit = %d\n", connections[i].socket,
|
|
(int)(time(NULL) - connections[i].lastOpenForWrite), length, FD_ISSET(connections[i].socket, &writeMask) );
|
|
ANETclose(connections[i].handle);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/*--------------------------------------------------------------------------*/
|
|
int ANETvalidHandle(int handle)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
|
|
con = findSocketDescriptor(handle);
|
|
if (con != NULL) {
|
|
return ANETOK;
|
|
} else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/*---------------------------------------------------------------------------*/
|
|
int ANETinfo(int handle, char *hostname, int hostnameLen)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
struct sockaddr_in sin;
|
|
struct hostent *host;
|
|
socklen_t len;
|
|
|
|
con = findSocketDescriptor(handle);
|
|
if (con == NULL) {
|
|
return ANETDISCONNECTED;
|
|
} else {
|
|
if(strlen(con->host) < 3){
|
|
len = sizeof sin;
|
|
if (getpeername(con->socket, (struct sockaddr *) &sin, &len) < 0) {
|
|
return ANETSOCKERROR;
|
|
}
|
|
if ((host = gethostbyaddr((char *) &sin.sin_addr,
|
|
sizeof(sin.sin_addr), AF_INET)) == NULL) {
|
|
return ANETSOCKERROR;
|
|
}
|
|
strlcpy(con->host,host->h_name, 132);
|
|
}
|
|
memset(hostname, 0, hostnameLen);
|
|
strlcpy(hostname, con->host, hostnameLen);
|
|
}
|
|
return 1;
|
|
}
|
|
/*---------------------------------------------------------------------------*/
|
|
int ANETcanWrite(int handle, void *buffer, int count)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
int status;
|
|
|
|
con = findSocketDescriptor(handle);
|
|
if (con == NULL) {
|
|
return ANETDISCONNECTED;
|
|
} else {
|
|
ANETprocess();
|
|
return CanStoreRWBuffer(con->writeBuffer, buffer, count);
|
|
}
|
|
}
|
|
/*---------------------------------------------------------------------------*/
|
|
int ANETwrite(int handle, void *buffer, int count)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
int status;
|
|
|
|
con = findSocketDescriptor(handle);
|
|
if (con == NULL) {
|
|
return ANETDISCONNECTED;
|
|
} else {
|
|
status = StoreRWBuffer(con->writeBuffer, buffer, count);
|
|
/*
|
|
first try if ANETprocess can write some and free the buffer
|
|
before giving up
|
|
*/
|
|
if (status != 1) {
|
|
ANETprocess();
|
|
status = StoreRWBuffer(con->writeBuffer, buffer, count);
|
|
}
|
|
if (status != 1) {
|
|
anetLog(ANETERROR, "Write buffer overrun on handle %d, socket %d, trying to write %d bytes",
|
|
con->handle, con->socket, count);
|
|
return ANETWRITEBUFFERFULL;
|
|
}
|
|
}
|
|
return ANETOK;
|
|
}
|
|
|
|
/*---------------------------------------------------------------------------*/
|
|
int ANETread(int handle, void *buffer, int bufferLength)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
int status, length, len;
|
|
void *data = NULL;
|
|
|
|
con = findSocketDescriptor(handle);
|
|
if (con == NULL) {
|
|
return ANETDISCONNECTED;
|
|
} else {
|
|
data = GetRWBufferData(con->readBuffer, &length);
|
|
if (length == 0) {
|
|
len = 0;
|
|
} else if (length >= bufferLength) {
|
|
len = bufferLength;
|
|
} else {
|
|
len = length;
|
|
}
|
|
if (len > 0) {
|
|
memcpy(buffer, data, len);
|
|
}
|
|
}
|
|
return len;
|
|
}
|
|
|
|
/*---------------------------------------------------------------------------*/
|
|
void *ANETreadPtr(int handle, int *length)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
void *data = NULL;
|
|
|
|
con = findSocketDescriptor(handle);
|
|
if (con == NULL) {
|
|
*length = 0;
|
|
return NULL;
|
|
} else {
|
|
data = GetRWBufferData(con->readBuffer, length);
|
|
return data;
|
|
}
|
|
}
|
|
|
|
/*---------------------------------------------------------------------------*/
|
|
void ANETreadConsume(int handle, int count)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
|
|
con = findSocketDescriptor(handle);
|
|
if (con == NULL) {
|
|
return;
|
|
} else {
|
|
RemoveRWBufferData(con->readBuffer, count);
|
|
}
|
|
}
|
|
|
|
/*---------------------------------------------------------------------------*/
|
|
void ANETsetReadCallback(int handle, ANETcallback cb,
|
|
void *userData, ANETkill killUser)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
|
|
con = findSocketDescriptor(handle);
|
|
if (con == NULL) {
|
|
return;
|
|
} else {
|
|
con->readCallback = cb;
|
|
con->userData = userData;
|
|
con->killUser = killUser;
|
|
}
|
|
}
|
|
|
|
/*----------------------------------------------------------------------------*/
|
|
int ANETreadTillTerm(int handle,
|
|
ANETtermCallback tcb, void *termData,
|
|
ANETwait wcb, void *waitData, char **buffer)
|
|
{
|
|
pSocketDescriptor con = NULL;
|
|
char *data;
|
|
int length, status;
|
|
|
|
while (wcb(waitData) > 0) {
|
|
ANETprocess();
|
|
con = findSocketDescriptor(handle);
|
|
if (con == NULL) {
|
|
return ANETDISCONNECTED;
|
|
}
|
|
data = GetRWBufferData(con->readBuffer, &length);
|
|
if (length > 0) {
|
|
status = tcb(data, length, termData);
|
|
if (status > 0) {
|
|
*buffer = malloc(status * sizeof(char));
|
|
if (*buffer != NULL) {
|
|
memcpy(*buffer, data, status);
|
|
}
|
|
RemoveRWBufferData(con->readBuffer, status);
|
|
return ANETOK;
|
|
}
|
|
}
|
|
}
|
|
return ANETTIMEOUT;
|
|
}
|