- Fixed a core dump in the sycamore protocol
- Added missing files aynnet.* - Fixed the addition to root issue in scriptcontex
This commit is contained in:
576
asynnet.c
Normal file
576
asynnet.c
Normal file
@ -0,0 +1,576 @@
|
||||
/**
|
||||
* Asynchronous networking for SICS and other programs. This module centrally manages
|
||||
* a number of network connections for a client program. It is a layer between the
|
||||
* program and the network which manages non blocking network I/O. To this purpose, the
|
||||
* client program has to call ANETprocess at convenient intervalls. This module
|
||||
* has a couple of features:
|
||||
* - Connections are abstracted to handles which are guranteed to be unique
|
||||
* rather then socket numbers. Socket numbers may be reused by the OS.
|
||||
* - This module allows upper level code to figure out if a connection is still
|
||||
* connected or not.
|
||||
* - This module introduces a buffer layer between the socket and the application.
|
||||
* Thus the upper layer does not have to worry much about I/O blocking. This
|
||||
* is taken care of by this module both for reading and writing.
|
||||
* - All I/O is non blocking.
|
||||
* - This module can detect if a client is hanging and close the connection then.
|
||||
* Hanging is detected by not being able to write to the client for some period
|
||||
* of time.
|
||||
*
|
||||
* copyright: see file COPYRIGHT
|
||||
*
|
||||
* Mark Koennecke, January 2009
|
||||
*/
|
||||
#include <stdlib.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdio.h>
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <errno.h>
|
||||
#include <time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <netdb.h>
|
||||
#include "asynnet.h"
|
||||
#include "rwpuffer.h"
|
||||
|
||||
/*--------------------------------------------------------------------------*/
|
||||
#define SERVERSOCKET 0
|
||||
#define DATASOCKET 1
|
||||
#define MAXCONNECTIONS 1024
|
||||
#define RBUFFERSIZE 262144 /* 256kb */
|
||||
#define WBUFFERSIZE 2*262144 /* 512kb */
|
||||
/*--------------------------------------------------------------------------*/
|
||||
typedef struct {
|
||||
int socket;
|
||||
int handle;
|
||||
int type;
|
||||
prwBuffer readBuffer;
|
||||
prwBuffer writeBuffer;
|
||||
time_t lastOpenForWrite;
|
||||
ANETcallback readCallback;
|
||||
void *userData;
|
||||
ANETkill killUser;
|
||||
} SocketDescriptor, *pSocketDescriptor;
|
||||
|
||||
static SocketDescriptor connections[MAXCONNECTIONS];
|
||||
static int noConnections = 0;
|
||||
static unsigned int handleID = 0;
|
||||
/*------------------------------------------------------------------------*/
|
||||
static int SocketCompare(const void *s1, const void *s2)
|
||||
{
|
||||
pSocketDescriptor socke1, socke2;
|
||||
|
||||
socke1 = (pSocketDescriptor) s1;
|
||||
socke2 = (pSocketDescriptor) s2;
|
||||
return socke1->handle - socke2->handle;
|
||||
}
|
||||
|
||||
/*------------------------------------------------------------------------*/
|
||||
static void sortConnections()
|
||||
{
|
||||
qsort(connections, noConnections,
|
||||
sizeof(SocketDescriptor), SocketCompare);
|
||||
}
|
||||
|
||||
/*------------------------------------------------------------------------*/
|
||||
static pSocketDescriptor findSocketDescriptor(int handle)
|
||||
{
|
||||
SocketDescriptor key;
|
||||
pSocketDescriptor result;
|
||||
|
||||
key.handle = handle;
|
||||
result = bsearch(&key, connections, noConnections,
|
||||
sizeof(SocketDescriptor), SocketCompare);
|
||||
return result;
|
||||
}
|
||||
|
||||
/*------------------------------------------------------------------------*/
|
||||
static ANETlog logOutput = NULL;
|
||||
static void *logUserData = NULL;
|
||||
/*------------------------------------------------------------------------*/
|
||||
static void anetLog(int level, char *fmt, ...)
|
||||
{
|
||||
va_list ap;
|
||||
char buf[256];
|
||||
char *text = NULL;
|
||||
int l;
|
||||
|
||||
if (logOutput == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
va_start(ap, fmt);
|
||||
l = vsnprintf(buf, sizeof buf, fmt, ap);
|
||||
va_end(ap);
|
||||
if (l < sizeof buf) {
|
||||
text = buf;
|
||||
logOutput(level, text, logUserData);
|
||||
} else {
|
||||
/* assuming we have a C99 conforming snprintf and need a larger buffer */
|
||||
text = calloc(l, 1);
|
||||
va_start(ap, fmt);
|
||||
vsnprintf(text, l, fmt, ap);
|
||||
va_end(ap);
|
||||
logOutput(level, text, logUserData);
|
||||
free(text);
|
||||
}
|
||||
}
|
||||
|
||||
/*============= public interface =========================================*/
|
||||
void ANETsetLog(ANETlog lcb, void *userData)
|
||||
{
|
||||
logOutput = lcb;
|
||||
logUserData = userData;
|
||||
}
|
||||
|
||||
/*------------------------------------------------------------------------*/
|
||||
int ANETopenServerPort(int iPort, ANETcallback cb, void *userData)
|
||||
{
|
||||
SocketDescriptor socke;
|
||||
int i = 1, status;
|
||||
struct sockaddr_in addresse;
|
||||
|
||||
assert(iPort > 0);
|
||||
assert(cb != NULL);
|
||||
|
||||
memset(&socke, 0, sizeof(SocketDescriptor));
|
||||
socke.socket = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (socke.socket < 0) {
|
||||
anetLog(ANETERROR, "Failed to open server port: socket: %d", iPort);
|
||||
return ANETOPENFAIL;
|
||||
}
|
||||
status =
|
||||
setsockopt(socke.socket, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(int));
|
||||
if (status < 0) {
|
||||
anetLog(ANETERROR,
|
||||
"Failed to open server port: setsockopt: %d, errno = %d",
|
||||
iPort, errno);
|
||||
return ANETOPENFAIL;
|
||||
}
|
||||
memset(&addresse, 0, sizeof(struct sockaddr_in));
|
||||
addresse.sin_family = AF_INET;
|
||||
addresse.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||
addresse.sin_port = htons(iPort);
|
||||
status = bind(socke.socket, (struct sockaddr *) &addresse,
|
||||
sizeof(struct sockaddr_in));
|
||||
if (status < 0) {
|
||||
anetLog(ANETERROR, "Failed to open server port: bind: %d, errno = %d",
|
||||
iPort, errno);
|
||||
return ANETOPENFAIL;
|
||||
}
|
||||
status = listen(socke.socket, 8);
|
||||
if (status < 0) {
|
||||
anetLog(ANETERROR, "Failed to open server port: listen: %d", iPort);
|
||||
return ANETOPENFAIL;
|
||||
}
|
||||
socke.type = SERVERSOCKET;
|
||||
socke.handle = handleID;
|
||||
handleID++;
|
||||
socke.readCallback = cb;
|
||||
socke.userData = userData;
|
||||
socke.lastOpenForWrite = time(NULL);
|
||||
connections[noConnections] = socke;
|
||||
noConnections++;
|
||||
sortConnections();
|
||||
anetLog(ANETCON, "Opened server port %d", iPort);
|
||||
return socke.handle;
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------------------*/
|
||||
int ANETregisterSocket(int socket)
|
||||
{
|
||||
SocketDescriptor socke;
|
||||
int flags, status;
|
||||
|
||||
if (noConnections >= MAXCONNECTIONS) {
|
||||
anetLog(ANETERROR, "Maximum number of connections exceeded");
|
||||
return ANETOUTOFSOCKETS;
|
||||
}
|
||||
|
||||
memset(&socke, 0, sizeof(SocketDescriptor));
|
||||
flags = fcntl(socket, F_GETFL, 0);
|
||||
status = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
|
||||
if (status < 0) {
|
||||
return ANETSOCKERROR;
|
||||
}
|
||||
socke.readBuffer = MakeRWPuffer(RBUFFERSIZE);
|
||||
socke.writeBuffer = MakeRWPuffer(WBUFFERSIZE);
|
||||
if (socke.readBuffer == NULL || socke.writeBuffer == NULL) {
|
||||
return ANETMEM;
|
||||
}
|
||||
socke.socket = socket;
|
||||
socke.handle = handleID;
|
||||
handleID++;
|
||||
socke.type = DATASOCKET;
|
||||
socke.lastOpenForWrite = time(NULL);
|
||||
connections[noConnections] = socke;
|
||||
noConnections++;
|
||||
sortConnections();
|
||||
return socke.handle;
|
||||
}
|
||||
|
||||
/*-------------------------------------------------------------------------*/
|
||||
int ANETconnect(char *name, int iPort)
|
||||
{
|
||||
struct in_addr addr;
|
||||
struct sockaddr_in addresse;
|
||||
struct hostent *host;
|
||||
int socke, status;
|
||||
|
||||
/* check for aaa.bbb.ccc.ddd first */
|
||||
addr.s_addr = inet_addr(name);
|
||||
if (addr.s_addr < 0) {
|
||||
host = gethostbyname(name);
|
||||
if (host == NULL) {
|
||||
anetLog(ANETERROR, "Failed to locate host: %s", name);
|
||||
return ANETOPENFAIL;
|
||||
}
|
||||
memcpy(&addr, host->h_addr_list, sizeof(struct in_addr));
|
||||
}
|
||||
memset(&addresse, 0, sizeof(struct sockaddr_in));
|
||||
addresse.sin_family = AF_INET;
|
||||
addresse.sin_port = iPort;
|
||||
addresse.sin_addr = addr;
|
||||
socke = socket(AF_INET, SOCK_STREAM, 0);
|
||||
status = connect(socke, (struct sockaddr *) &addresse,
|
||||
sizeof(struct sockaddr_in));
|
||||
if (status < 0) {
|
||||
anetLog(ANETERROR, "Failed to open socket to %s:%d", name, iPort);
|
||||
return ANETOPENFAIL;
|
||||
}
|
||||
anetLog(ANETCON, "Opened socket %d to %s:%d", socke, name, iPort);
|
||||
return ANETregisterSocket(socke);
|
||||
}
|
||||
|
||||
/*--------------------------------------------------------------------------*/
|
||||
void ANETclose(int handle)
|
||||
{
|
||||
pSocketDescriptor socke = NULL;
|
||||
|
||||
socke = findSocketDescriptor(handle);
|
||||
if (socke == NULL) {
|
||||
return;
|
||||
}
|
||||
close(socke->socket);
|
||||
anetLog(ANETCON, "Closed socket %d", socke->socket);
|
||||
if (socke->readBuffer != NULL) {
|
||||
KillRWBuffer(socke->readBuffer);
|
||||
}
|
||||
if (socke->writeBuffer != NULL) {
|
||||
KillRWBuffer(socke->writeBuffer);
|
||||
}
|
||||
if (socke->userData && socke->killUser) {
|
||||
socke->killUser(socke->userData);
|
||||
}
|
||||
if (noConnections > 1) {
|
||||
*socke = connections[noConnections - 1];
|
||||
noConnections--;
|
||||
sortConnections();
|
||||
} else {
|
||||
noConnections = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/*--------------------------------------------------------------------------*/
|
||||
static int anetWrite(SocketDescriptor con)
|
||||
{
|
||||
int status, length;
|
||||
void *pPtr;
|
||||
|
||||
con.lastOpenForWrite = time(NULL);
|
||||
pPtr = GetRWBufferData(con.writeBuffer, &length);
|
||||
if (length > 0) {
|
||||
status = send(con.socket, pPtr, length, 0);
|
||||
if (status < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
return 1;
|
||||
}
|
||||
ANETclose(con.handle);
|
||||
return 0;
|
||||
}
|
||||
RemoveRWBufferData(con.writeBuffer, status);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*--------------------------------------------------------------------------*/
|
||||
static int anetRead(SocketDescriptor con)
|
||||
{
|
||||
int socke, handle, status;
|
||||
unsigned int len;
|
||||
struct sockaddr addresse;
|
||||
char buffer[8192];
|
||||
|
||||
switch (con.type) {
|
||||
case SERVERSOCKET:
|
||||
len = sizeof(struct sockaddr);
|
||||
socke = accept(con.socket, &addresse, &len);
|
||||
if (socke < 0) {
|
||||
return 1;
|
||||
}
|
||||
handle = ANETregisterSocket(socke);
|
||||
if (handle > 0) {
|
||||
status = con.readCallback(handle, con.userData);
|
||||
if (status != 1) {
|
||||
ANETclose(handle);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
anetLog(ANETCON, "Accepted socket %d on port %d, handle %d",
|
||||
socke, con.socket, handle);
|
||||
break;
|
||||
case DATASOCKET:
|
||||
memset(buffer, 0, 8192);
|
||||
status = recv(con.socket, buffer, 8192, 0);
|
||||
if (status < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
return 1;
|
||||
}
|
||||
ANETclose(con.handle);
|
||||
return 0;
|
||||
} else if (status == 0) {
|
||||
/* this means EOF */
|
||||
ANETclose(con.handle);
|
||||
return 0;
|
||||
} else {
|
||||
status = StoreRWBuffer(con.readBuffer, buffer, status);
|
||||
if (status != 1) {
|
||||
anetLog(ANETERROR, "Read buffer overrun at handle %d, socket %d",
|
||||
con.handle, con.socket);
|
||||
}
|
||||
if (con.readCallback != NULL) {
|
||||
con.readCallback(con.handle, con.userData);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*---------------------------------------------------------------------------*/
|
||||
void ANETprocess(void)
|
||||
{
|
||||
int i, status, count = 0, socke = 0;
|
||||
fd_set readMask, writeMask;
|
||||
struct timeval tmo = { 0, 10 };
|
||||
|
||||
FD_ZERO(&readMask);
|
||||
FD_ZERO(&writeMask);
|
||||
for (i = 0; i < noConnections; i++) {
|
||||
socke = connections[i].socket;
|
||||
FD_SET(socke, &readMask);
|
||||
FD_SET(socke, &writeMask);
|
||||
if (socke > count) {
|
||||
count = socke;
|
||||
}
|
||||
}
|
||||
|
||||
count++;
|
||||
status = select(count, &readMask, &writeMask, NULL, &tmo);
|
||||
if (status < 0) {
|
||||
if (errno == EINTR) {
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* I always jump out of this loop when a socket is created or closed
|
||||
* because then the order in the connections array is no longer valid.
|
||||
* Try again the next time round.
|
||||
*/
|
||||
for (i = 0; i < noConnections; i++) {
|
||||
socke = connections[i].socket;
|
||||
if (FD_ISSET(socke, &readMask)) {
|
||||
if (!anetRead(connections[i])) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (FD_ISSET(socke, &writeMask)) {
|
||||
if (!anetWrite(connections[i])) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
/*
|
||||
* if I could not write to the socket for three minutes,
|
||||
* the socket is considered broken and is closed
|
||||
*/
|
||||
if (time(NULL) > connections[i].lastOpenForWrite + 180 &&
|
||||
connections[i].type == DATASOCKET) {
|
||||
ANETclose(connections[i].handle);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*--------------------------------------------------------------------------*/
|
||||
int ANETvalidHandle(int handle)
|
||||
{
|
||||
pSocketDescriptor con = NULL;
|
||||
|
||||
con = findSocketDescriptor(handle);
|
||||
if (con != NULL) {
|
||||
return ANETOK;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/*---------------------------------------------------------------------------*/
|
||||
int ANETinfo(int handle, char *hostname, int hostnameLen)
|
||||
{
|
||||
pSocketDescriptor con = NULL;
|
||||
struct sockaddr_in sin;
|
||||
struct hostent *host;
|
||||
socklen_t len;
|
||||
|
||||
con = findSocketDescriptor(handle);
|
||||
if (con == NULL) {
|
||||
return ANETDISCONNECTED;
|
||||
} else {
|
||||
if (getpeername(con->socket, (struct sockaddr *) &sin, &len) < 0) {
|
||||
return ANETSOCKERROR;
|
||||
}
|
||||
if ((host = gethostbyaddr((char *) &sin.sin_addr,
|
||||
sizeof(sin.sin_addr), AF_INET)) == NULL) {
|
||||
return ANETSOCKERROR;
|
||||
}
|
||||
memset(hostname, 0, hostnameLen);
|
||||
strncpy(hostname, host->h_name, hostnameLen);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*---------------------------------------------------------------------------*/
|
||||
int ANETwrite(int handle, void *buffer, int count)
|
||||
{
|
||||
pSocketDescriptor con = NULL;
|
||||
int status;
|
||||
|
||||
con = findSocketDescriptor(handle);
|
||||
if (con == NULL) {
|
||||
return ANETDISCONNECTED;
|
||||
} else {
|
||||
status = StoreRWBuffer(con->writeBuffer, buffer, count);
|
||||
/*
|
||||
first try if ANETprocess can write some and free the buffer
|
||||
before giving up
|
||||
*/
|
||||
if (status != 1) {
|
||||
ANETprocess();
|
||||
status = StoreRWBuffer(con->writeBuffer, buffer, count);
|
||||
}
|
||||
if (status != 1) {
|
||||
anetLog(ANETERROR, "write buffer overrun on handle %d, socket %d",
|
||||
con->handle, con->socket);
|
||||
return ANETWRITEBUFFERFULL;
|
||||
}
|
||||
}
|
||||
return ANETOK;
|
||||
}
|
||||
|
||||
/*---------------------------------------------------------------------------*/
|
||||
int ANETread(int handle, void *buffer, int bufferLength)
|
||||
{
|
||||
pSocketDescriptor con = NULL;
|
||||
int status, length, len;
|
||||
void *data = NULL;
|
||||
|
||||
con = findSocketDescriptor(handle);
|
||||
if (con == NULL) {
|
||||
return ANETDISCONNECTED;
|
||||
} else {
|
||||
data = GetRWBufferData(con->readBuffer, &length);
|
||||
if (length == 0) {
|
||||
len = 0;
|
||||
} else if (length >= bufferLength) {
|
||||
len = bufferLength;
|
||||
} else {
|
||||
len = length;
|
||||
}
|
||||
if (len > 0) {
|
||||
memcpy(buffer, data, len);
|
||||
}
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
/*---------------------------------------------------------------------------*/
|
||||
void *ANETreadPtr(int handle, int *length)
|
||||
{
|
||||
pSocketDescriptor con = NULL;
|
||||
void *data = NULL;
|
||||
|
||||
con = findSocketDescriptor(handle);
|
||||
if (con == NULL) {
|
||||
return NULL;
|
||||
} else {
|
||||
data = GetRWBufferData(con->readBuffer, length);
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
/*---------------------------------------------------------------------------*/
|
||||
void ANETreadConsume(int handle, int count)
|
||||
{
|
||||
pSocketDescriptor con = NULL;
|
||||
|
||||
con = findSocketDescriptor(handle);
|
||||
if (con == NULL) {
|
||||
return;
|
||||
} else {
|
||||
RemoveRWBufferData(con->readBuffer, count);
|
||||
}
|
||||
}
|
||||
|
||||
/*---------------------------------------------------------------------------*/
|
||||
void ANETsetReadCallback(int handle, ANETcallback cb,
|
||||
void *userData, ANETkill killUser)
|
||||
{
|
||||
pSocketDescriptor con = NULL;
|
||||
|
||||
con = findSocketDescriptor(handle);
|
||||
if (con == NULL) {
|
||||
return;
|
||||
} else {
|
||||
con->readCallback = cb;
|
||||
con->userData = userData;
|
||||
con->killUser = killUser;
|
||||
}
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
int ANETreadTillTerm(int handle,
|
||||
ANETtermCallback tcb, void *termData,
|
||||
ANETwait wcb, void *waitData, char **buffer)
|
||||
{
|
||||
pSocketDescriptor con = NULL;
|
||||
char *data;
|
||||
int length, status;
|
||||
|
||||
while (wcb(waitData) > 0) {
|
||||
ANETprocess();
|
||||
con = findSocketDescriptor(handle);
|
||||
if (con == NULL) {
|
||||
return ANETDISCONNECTED;
|
||||
}
|
||||
data = GetRWBufferData(con->readBuffer, &length);
|
||||
if (length > 0) {
|
||||
status = tcb(data, length, termData);
|
||||
if (status > 0) {
|
||||
*buffer = malloc(status * sizeof(char));
|
||||
if (*buffer != NULL) {
|
||||
memcpy(*buffer, data, status);
|
||||
}
|
||||
RemoveRWBufferData(con->readBuffer, status);
|
||||
return ANETOK;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ANETTIMEOUT;
|
||||
}
|
Reference in New Issue
Block a user