From 41d57074bf89e925e6d6086eee8edbe7eca3400f Mon Sep 17 00:00:00 2001 From: Douglas Clowes Date: Tue, 13 Nov 2012 14:07:21 +1100 Subject: [PATCH] implement exponential backoff of reconnect (and some debug logging) r3791 | dcl | 2012-11-13 14:07:21 +1100 (Tue, 13 Nov 2012) | 1 line --- asyncqueue.c | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/asyncqueue.c b/asyncqueue.c index 11dcacc0..9faa9ed1 100644 --- a/asyncqueue.c +++ b/asyncqueue.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include "network.h" @@ -42,6 +43,8 @@ struct __AsyncUnit { void* notify_cntx; }; +typedef enum {eAsyncIdle, eAsyncWaiting, eAsyncConnecting, eAsyncConnected} AsyncState; + struct __AsyncQueue { pObjectDescriptor pDes; char* queue_name; @@ -50,6 +53,7 @@ struct __AsyncQueue { int iDelay; /* intercommand delay in milliseconds */ int timeout; int retries; + int retryTimer; /* mSec delay before next retry */ bool translate; /* translate binary output with escaped chars */ struct timeval tvLastCmd; /* time of completion of last command */ int unit_count; /* number of units connected */ @@ -59,6 +63,7 @@ struct __AsyncQueue { pNWContext nw_ctx; /* NetWait context handle */ pNWTimer nw_tmr; /* NetWait timer handle */ mkChannel* pSock; /* socket address */ + AsyncState state; /* Queue Connection State */ pAsyncProtocol protocol; void* context; /**< opaque caller queue context */ }; @@ -66,6 +71,20 @@ struct __AsyncQueue { static pAsyncQueue queue_array[FD_SETSIZE]; static int queue_index = 0; +static const char* state_name(AsyncState the_state) { + switch (the_state) { + case eAsyncIdle: + return "eAsyncIdle"; + case eAsyncWaiting: + return "eAsyncWaiting"; + case eAsyncConnecting: + return "eAsyncConnecting"; + case eAsyncConnected: + return "eAsyncConnected"; + } + return ""; +} + /* ---------------------------- Local ------------------------------------ CreateSocketAdress stolen from Tcl. Thanks to John Ousterhout */ @@ -108,11 +127,59 @@ CreateSocketAdress( static void AQ_Notify(pAsyncQueue self, int event) { pAsyncUnit unit; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__); for (unit = self->units; unit; unit = unit->next) if (unit->notify_func != NULL) unit->notify_func(unit->notify_cntx, event); } +static int TimedReconnect(void* cntx, int mode) +{ + int iRet; + char line[132]; + pAsyncQueue self = (pAsyncQueue) cntx; + + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__); + iRet = NETReconnect(self->pSock); + /* + * iRet can take the following values: + * -1: The request failed + * 0: The request is still in progress + * +1: The request succeeded + */ + if (iRet < 0) { + snprintf(line, 132, "Failed reconnect on AsyncQueue '%s'", self->queue_name); + SICSLogWrite(line, eStatus); + NetWatchSetMode(self->nw_ctx, 0); + /* implement an exponential backoff within limits */ + self->retryTimer = 2 * self->retryTimer; + if (self->retryTimer < 250) + self->retryTimer = 250; + if (self->retryTimer > 30000) + self->retryTimer = 30000; + NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, TimedReconnect, self); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n", self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncWaiting; + } + else if (iRet == 0) { + snprintf(line, 132, "Inprogress reconnect on AsyncQueue '%s'", self->queue_name); + NetWatchSetMode(self->nw_ctx, nwatch_write); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnecting\n", self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnecting; + } + else { + snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name); + SICSLogWrite(line, eStatus); + AQ_Notify(self, AQU_RECONNECT); + NetWatchSetMode(self->nw_ctx, nwatch_read); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnected; + } + return 1; +} + static int AQ_Reconnect(pAsyncQueue self) { int iRet; @@ -120,6 +187,14 @@ static int AQ_Reconnect(pAsyncQueue self) int flag = 1; char line[132]; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__); + /* + * Remove any old timer + */ + if (self->nw_tmr) + NetWatchRemoveTimer(self->nw_tmr); + iRet = NETReconnect(self->pSock); /* * iRet can take the following values: @@ -131,8 +206,24 @@ static int AQ_Reconnect(pAsyncQueue self) snprintf(line, 132, "Disconnect on AsyncQueue '%s'", self->queue_name); SICSLogWrite(line, eStatus); AQ_Notify(self, AQU_DISCONNECT); + if (iRet < 0) { + /* TODO Timer for retry */ + NetWatchSetMode(self->nw_ctx, 0); + self->retryTimer = 125; /* initial delay */ + NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, TimedReconnect, self); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n", self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncWaiting; + } + else { + NetWatchSetMode(self->nw_ctx, nwatch_write); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnecting\n", self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnecting; + /* TODO await reconnect result */ + } return iRet; } + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnected; snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name); SICSLogWrite(line, eStatus); AQ_Notify(self, AQU_RECONNECT); @@ -149,6 +240,8 @@ static int StartCommand(pAsyncQueue self) mkChannel* sock = self->pSock; int iRet = 0; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__); if (myCmd == NULL) return OKOK; @@ -319,6 +412,8 @@ static int CommandTimeout(void* cntx, int mode) static int DelayedStart(void* cntx, int mode) { pAsyncQueue self = (pAsyncQueue) cntx; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__); self->nw_tmr = 0; StartCommand(self); return 1; @@ -328,6 +423,8 @@ static int MyCallback(void* context, int mode) { pAsyncQueue self = (pAsyncQueue) context; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, __func__); if (mode & nwatch_read) { int iRet; char reply[100]; @@ -379,6 +476,13 @@ static int MyCallback(void* context, int mode) } } } + if (mode & nwatch_write) { + char line[132]; + SICSLogPrintf(eStatus, "Writeable socket callback on AsyncQueue %s", self->queue_name); + NetWatchSetMode(self->nw_ctx, nwatch_read); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnected; + } return 1; } @@ -905,6 +1009,7 @@ static int AQ_Init(pAsyncQueue self) self->pSock->sockid, MyCallback, self); + NetWatchSetMode(self->nw_ctx, nwatch_write | nwatch_read); return 1; }