- Connections write timeouts were incorrectly handled in asynnetc. Fixed.

- Implemented the desired run/drive behaviour: drive waits for what it started
  run starts, and success waits for everything to finish. This required
  changes to a lot of files.
- Fixed a bug in remob which supressed required messages
This commit is contained in:
koennecke
2009-04-17 12:52:01 +00:00
parent 50b0a5c4a7
commit 99d2485d22
39 changed files with 422 additions and 200 deletions

View File

@ -42,7 +42,7 @@
#define DATASOCKET 1
#define MAXCONNECTIONS 1024
#define RBUFFERSIZE 262144 /* 256kb */
#define WBUFFERSIZE 2*262144 /* 512kb */
#define WBUFFERSIZE 4*262144 /* 512kb */
/*--------------------------------------------------------------------------*/
typedef struct {
int socket;
@ -233,8 +233,8 @@ int ANETconnect(char *name, int iPort)
}
memset(&addresse, 0, sizeof(struct sockaddr_in));
addresse.sin_family = AF_INET;
addresse.sin_port = iPort;
addresse.sin_addr = addr;
addresse.sin_port = htons((unsigned short)(iPort &0xFFFF));
addresse.sin_addr.s_addr = addr.s_addr;
socke = socket(AF_INET, SOCK_STREAM, 0);
status = connect(socke, (struct sockaddr *) &addresse,
sizeof(struct sockaddr_in));
@ -281,12 +281,11 @@ static int anetWrite(SocketDescriptor con)
int status, length;
void *pPtr;
con.lastOpenForWrite = time(NULL);
pPtr = GetRWBufferData(con.writeBuffer, &length);
if (length > 0) {
status = send(con.socket, pPtr, length, 0);
if (status < 0) {
if (errno == EAGAIN) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
return 1;
}
ANETclose(con.handle);
@ -296,7 +295,39 @@ static int anetWrite(SocketDescriptor con)
}
return 1;
}
/*--------------------------------------------------------------------------
* I have seen that select did not report the write possibility set,
* though the send buffer was empty. Thus I try to write if data is there
* and leave the lastOpenForWrite flag only when the socket would block
* on write.
*--------------------------------------------------------------------------*/
static int anetTestWrite(SocketDescriptor con)
{
int status, length;
void *pPtr;
time_t lastTime;
if(con.type != DATASOCKET){
return 1;
}
lastTime = con.lastOpenForWrite;
con.lastOpenForWrite = time(NULL);
pPtr = GetRWBufferData(con.writeBuffer, &length);
if (length > 0) {
status = send(con.socket, pPtr, length, 0);
if (status < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
con.lastOpenForWrite = lastTime;
return 1;
}
ANETclose(con.handle);
return 0;
}
RemoveRWBufferData(con.writeBuffer, status);
}
return 1;
}
/*--------------------------------------------------------------------------*/
static int anetRead(SocketDescriptor con)
{
@ -354,7 +385,7 @@ static int anetRead(SocketDescriptor con)
/*---------------------------------------------------------------------------*/
void ANETprocess(void)
{
int i, status, count = 0, socke = 0;
int i, status, count = 0, socke = 0, length;
fd_set readMask, writeMask;
struct timeval tmo = { 0, 10 };
@ -378,7 +409,7 @@ void ANETprocess(void)
return;
}
/**
/**
* I always jump out of this loop when a socket is created or closed
* because then the order in the connections array is no longer valid.
* Try again the next time round.
@ -386,21 +417,30 @@ void ANETprocess(void)
for (i = 0; i < noConnections; i++) {
socke = connections[i].socket;
if (FD_ISSET(socke, &readMask)) {
if (!anetRead(connections[i])) {
if (anetRead(connections[i]) == 0) {
return;
}
}
if (FD_ISSET(socke, &writeMask)) {
if (!anetWrite(connections[i])) {
/*
* This has to be here, as I found out tracing a subtle bug:
* If the time is set in writeANET the modification will not
* propagate into the array as C copies the structure for the function call.
*/
connections[i].lastOpenForWrite = time(NULL);
if (anetWrite(connections[i]) == 0) {
return;
}
} else {
/*
* if I could not write to the socket for three minutes,
* the socket is considered broken and is closed
* If I could not write to the socket for 5 minutes,
* the socket is considered broken and is closed.
*/
if (time(NULL) > connections[i].lastOpenForWrite + 180 &&
if (time(NULL) > connections[i].lastOpenForWrite + 300 &&
connections[i].type == DATASOCKET) {
GetRWBufferData(connections[i].writeBuffer, &length);
anetLog(ANETCON, "Closing socket because of time overrun: %d, delay = %d, bytes to be written = %d, write bit = %d\n", connections[i].socket,
(int)(time(NULL) - connections[i].lastOpenForWrite), length, FD_ISSET(connections[i].socket, &writeMask) );
ANETclose(connections[i].handle);
return;
}
@ -467,8 +507,8 @@ int ANETwrite(int handle, void *buffer, int count)
status = StoreRWBuffer(con->writeBuffer, buffer, count);
}
if (status != 1) {
anetLog(ANETERROR, "write buffer overrun on handle %d, socket %d",
con->handle, con->socket);
anetLog(ANETERROR, "Write buffer overrun on handle %d, socket %d, trying to write %d bytes",
con->handle, con->socket, count);
return ANETWRITEBUFFERFULL;
}
}