implement exponential backoff of reconnect (and some debug logging)

r3791 | dcl | 2012-11-13 14:07:21 +1100 (Tue, 13 Nov 2012) | 1 line
This commit is contained in:
Douglas Clowes
2012-11-13 14:07:21 +11:00
parent fd064f3df9
commit 41d57074bf

View File

@ -16,6 +16,7 @@
#include <netinet/tcp.h>
#include <netdb.h>
#include <ctype.h>
#include <stdarg.h>
#include <sics.h>
#include <rs232controller.h>
#include "network.h"
@ -42,6 +43,8 @@ struct __AsyncUnit {
void* notify_cntx;
};
typedef enum {eAsyncIdle, eAsyncWaiting, eAsyncConnecting, eAsyncConnected} AsyncState;
struct __AsyncQueue {
pObjectDescriptor pDes;
char* queue_name;
@ -50,6 +53,7 @@ struct __AsyncQueue {
int iDelay; /* intercommand delay in milliseconds */
int timeout;
int retries;
int retryTimer; /* mSec delay before next retry */
bool translate; /* translate binary output with escaped chars */
struct timeval tvLastCmd; /* time of completion of last command */
int unit_count; /* number of units connected */
@ -59,6 +63,7 @@ struct __AsyncQueue {
pNWContext nw_ctx; /* NetWait context handle */
pNWTimer nw_tmr; /* NetWait timer handle */
mkChannel* pSock; /* socket address */
AsyncState state; /* Queue Connection State */
pAsyncProtocol protocol;
void* context; /**< opaque caller queue context */
};
@ -66,6 +71,20 @@ struct __AsyncQueue {
static pAsyncQueue queue_array[FD_SETSIZE];
static int queue_index = 0;
static const char* state_name(AsyncState the_state) {
switch (the_state) {
case eAsyncIdle:
return "eAsyncIdle";
case eAsyncWaiting:
return "eAsyncWaiting";
case eAsyncConnecting:
return "eAsyncConnecting";
case eAsyncConnected:
return "eAsyncConnected";
}
return "<unknown>";
}
/* ---------------------------- Local ------------------------------------
CreateSocketAdress stolen from Tcl. Thanks to John Ousterhout
*/
@ -108,11 +127,59 @@ CreateSocketAdress(
static void AQ_Notify(pAsyncQueue self, int event)
{
pAsyncUnit unit;
if (self->state != eAsyncConnected)
SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__);
for (unit = self->units; unit; unit = unit->next)
if (unit->notify_func != NULL)
unit->notify_func(unit->notify_cntx, event);
}
static int TimedReconnect(void* cntx, int mode)
{
int iRet;
char line[132];
pAsyncQueue self = (pAsyncQueue) cntx;
if (self->state != eAsyncConnected)
SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__);
iRet = NETReconnect(self->pSock);
/*
* iRet can take the following values:
* -1: The request failed
* 0: The request is still in progress
* +1: The request succeeded
*/
if (iRet < 0) {
snprintf(line, 132, "Failed reconnect on AsyncQueue '%s'", self->queue_name);
SICSLogWrite(line, eStatus);
NetWatchSetMode(self->nw_ctx, 0);
/* implement an exponential backoff within limits */
self->retryTimer = 2 * self->retryTimer;
if (self->retryTimer < 250)
self->retryTimer = 250;
if (self->retryTimer > 30000)
self->retryTimer = 30000;
NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, TimedReconnect, self);
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n", self->queue_name, __func__, state_name(self->state));
self->state = eAsyncWaiting;
}
else if (iRet == 0) {
snprintf(line, 132, "Inprogress reconnect on AsyncQueue '%s'", self->queue_name);
NetWatchSetMode(self->nw_ctx, nwatch_write);
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnecting\n", self->queue_name, __func__, state_name(self->state));
self->state = eAsyncConnecting;
}
else {
snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name);
SICSLogWrite(line, eStatus);
AQ_Notify(self, AQU_RECONNECT);
NetWatchSetMode(self->nw_ctx, nwatch_read);
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state));
self->state = eAsyncConnected;
}
return 1;
}
static int AQ_Reconnect(pAsyncQueue self)
{
int iRet;
@ -120,6 +187,14 @@ static int AQ_Reconnect(pAsyncQueue self)
int flag = 1;
char line[132];
if (self->state != eAsyncConnected)
SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__);
/*
* Remove any old timer
*/
if (self->nw_tmr)
NetWatchRemoveTimer(self->nw_tmr);
iRet = NETReconnect(self->pSock);
/*
* iRet can take the following values:
@ -131,8 +206,24 @@ static int AQ_Reconnect(pAsyncQueue self)
snprintf(line, 132, "Disconnect on AsyncQueue '%s'", self->queue_name);
SICSLogWrite(line, eStatus);
AQ_Notify(self, AQU_DISCONNECT);
if (iRet < 0) {
/* TODO Timer for retry */
NetWatchSetMode(self->nw_ctx, 0);
self->retryTimer = 125; /* initial delay */
NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, TimedReconnect, self);
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n", self->queue_name, __func__, state_name(self->state));
self->state = eAsyncWaiting;
}
else {
NetWatchSetMode(self->nw_ctx, nwatch_write);
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnecting\n", self->queue_name, __func__, state_name(self->state));
self->state = eAsyncConnecting;
/* TODO await reconnect result */
}
return iRet;
}
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state));
self->state = eAsyncConnected;
snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name);
SICSLogWrite(line, eStatus);
AQ_Notify(self, AQU_RECONNECT);
@ -149,6 +240,8 @@ static int StartCommand(pAsyncQueue self)
mkChannel* sock = self->pSock;
int iRet = 0;
if (self->state != eAsyncConnected)
SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__);
if (myCmd == NULL)
return OKOK;
@ -319,6 +412,8 @@ static int CommandTimeout(void* cntx, int mode)
static int DelayedStart(void* cntx, int mode)
{
pAsyncQueue self = (pAsyncQueue) cntx;
if (self->state != eAsyncConnected)
SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__);
self->nw_tmr = 0;
StartCommand(self);
return 1;
@ -328,6 +423,8 @@ static int MyCallback(void* context, int mode)
{
pAsyncQueue self = (pAsyncQueue) context;
if (self->state != eAsyncConnected)
SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__);
if (mode & nwatch_read) {
int iRet;
char reply[100];
@ -379,6 +476,13 @@ static int MyCallback(void* context, int mode)
}
}
}
if (mode & nwatch_write) {
char line[132];
SICSLogPrintf(eStatus, "Writeable socket callback on AsyncQueue %s", self->queue_name);
NetWatchSetMode(self->nw_ctx, nwatch_read);
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state));
self->state = eAsyncConnected;
}
return 1;
}
@ -905,6 +1009,7 @@ static int AQ_Init(pAsyncQueue self)
self->pSock->sockid,
MyCallback,
self);
NetWatchSetMode(self->nw_ctx, nwatch_write | nwatch_read);
return 1;
}