diff --git a/asyncqueue.c b/asyncqueue.c index af2b59a1..e24c0b7f 100644 --- a/asyncqueue.c +++ b/asyncqueue.c @@ -124,12 +124,30 @@ static int CreateSocketAdress(struct sockaddr_in *sockaddrPtr, /* Socket addres return 1; } +static int AQ_ClearTimer(pAsyncQueue self) { + if (self->nw_tmr) { + NetWatchRemoveTimer(self->nw_tmr); + self->nw_tmr = 0; + return 1; + } + return 0; +} + +static int AQ_SetTimer(pAsyncQueue self, int msecs, pNWCallback callback, void *context) { + int ret = 1; + if (self->nw_tmr) { + ret = AQ_ClearTimer(self); + } + NetWatchRegisterTimer(&self->nw_tmr, msecs, callback, context); + return ret; +} + static void AQ_Purge(pAsyncQueue self) { pAQ_Cmd myCmd = self->command_head; if (self->nw_tmr) - NetWatchRemoveTimer(self->nw_tmr); - self->nw_tmr = 0; + AQ_ClearTimer(self); + gettimeofday(&self->tvLastCmd, NULL); while (myCmd) { /* Process any callback */ @@ -195,7 +213,7 @@ static int TimedReconnect(void *cntx, int mode) self->retryTimer = 125; if (self->retryTimer > 16000) self->retryTimer = 16000; - NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, + AQ_SetTimer(self, self->retryTimer, TimedReconnect, self); SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n", self->queue_name, __func__, state_name(self->state)); @@ -231,7 +249,7 @@ static int AQ_Reconnect(pAsyncQueue self) * Remove any old timer */ if (self->nw_tmr) - NetWatchRemoveTimer(self->nw_tmr); + AQ_ClearTimer(self); if (self->state == eAsyncConnected) { self->state = eAsyncIdle; @@ -253,7 +271,7 @@ static int AQ_Reconnect(pAsyncQueue self) NetWatchSetMode(self->nw_ctx, 0); /* implement an exponential backoff within limits */ self->retryTimer = 125; /* initial delay */ - NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, + AQ_SetTimer(self, self->retryTimer, TimedReconnect, self); SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n", self->queue_name, __func__, state_name(self->state)); @@ -297,7 +315,7 @@ static int StartCommand(pAsyncQueue self) * Remove any old command timeout timer */ if (self->nw_tmr) - NetWatchRemoveTimer(self->nw_tmr); + AQ_ClearTimer(self); /* * Implement the inter-command delay @@ -318,7 +336,7 @@ static int StartCommand(pAsyncQueue self) int delay = when.tv_sec - now.tv_sec; delay *= 1000; delay += (when.tv_usec - now.tv_usec + (1000 - 1)) / 1000; - NetWatchRegisterTimer(&self->nw_tmr, delay, DelayedStart, self); + AQ_SetTimer(self, delay, DelayedStart, self); return OKOK; } } @@ -357,10 +375,10 @@ static int StartCommand(pAsyncQueue self) * Add a new command timeout timer */ if (myCmd->timeout > 0) - NetWatchRegisterTimer(&self->nw_tmr, myCmd->timeout, + AQ_SetTimer(self, myCmd->timeout, CommandTimeout, self); else - NetWatchRegisterTimer(&self->nw_tmr, 30000, CommandTimeout, self); + AQ_SetTimer(self, 30000, CommandTimeout, self); myCmd->active = 1; return iRet; } @@ -448,8 +466,8 @@ static int PopCommand(pAsyncQueue self) { pAQ_Cmd myCmd = self->command_head; if (self->nw_tmr) - NetWatchRemoveTimer(self->nw_tmr); - self->nw_tmr = 0; + AQ_ClearTimer(self); + gettimeofday(&self->tvLastCmd, NULL); /* Process any callback */ if (myCmd->tran->handleResponse) @@ -1081,7 +1099,7 @@ static pAsyncQueue AQ_Create(const char *host, const char *port) #if 0 if (channel == NULL) { /* TODO: all the rest */ - NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, TimedReconnect, + AQ_SetTimer(self, self->retryTimer, TimedReconnect, self); } #endif @@ -1111,7 +1129,7 @@ static void AQ_Kill(void *pData) if (self->nw_ctx) NetWatchRemoveCallback(self->nw_ctx); if (self->nw_tmr) - NetWatchRemoveTimer(self->nw_tmr); + AQ_ClearTimer(self); if (self->queue_name) free(self->queue_name); NETClosePort(self->pSock);