Make Timer handling more robust and easier to use
This commit is contained in:
44
asyncqueue.c
44
asyncqueue.c
@@ -124,12 +124,30 @@ static int CreateSocketAdress(struct sockaddr_in *sockaddrPtr, /* Socket addres
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int AQ_ClearTimer(pAsyncQueue self) {
|
||||||
|
if (self->nw_tmr) {
|
||||||
|
NetWatchRemoveTimer(self->nw_tmr);
|
||||||
|
self->nw_tmr = 0;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int AQ_SetTimer(pAsyncQueue self, int msecs, pNWCallback callback, void *context) {
|
||||||
|
int ret = 1;
|
||||||
|
if (self->nw_tmr) {
|
||||||
|
ret = AQ_ClearTimer(self);
|
||||||
|
}
|
||||||
|
NetWatchRegisterTimer(&self->nw_tmr, msecs, callback, context);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static void AQ_Purge(pAsyncQueue self)
|
static void AQ_Purge(pAsyncQueue self)
|
||||||
{
|
{
|
||||||
pAQ_Cmd myCmd = self->command_head;
|
pAQ_Cmd myCmd = self->command_head;
|
||||||
if (self->nw_tmr)
|
if (self->nw_tmr)
|
||||||
NetWatchRemoveTimer(self->nw_tmr);
|
AQ_ClearTimer(self);
|
||||||
self->nw_tmr = 0;
|
|
||||||
gettimeofday(&self->tvLastCmd, NULL);
|
gettimeofday(&self->tvLastCmd, NULL);
|
||||||
while (myCmd) {
|
while (myCmd) {
|
||||||
/* Process any callback */
|
/* Process any callback */
|
||||||
@@ -195,7 +213,7 @@ static int TimedReconnect(void *cntx, int mode)
|
|||||||
self->retryTimer = 125;
|
self->retryTimer = 125;
|
||||||
if (self->retryTimer > 16000)
|
if (self->retryTimer > 16000)
|
||||||
self->retryTimer = 16000;
|
self->retryTimer = 16000;
|
||||||
NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer,
|
AQ_SetTimer(self, self->retryTimer,
|
||||||
TimedReconnect, self);
|
TimedReconnect, self);
|
||||||
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n",
|
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n",
|
||||||
self->queue_name, __func__, state_name(self->state));
|
self->queue_name, __func__, state_name(self->state));
|
||||||
@@ -231,7 +249,7 @@ static int AQ_Reconnect(pAsyncQueue self)
|
|||||||
* Remove any old timer
|
* Remove any old timer
|
||||||
*/
|
*/
|
||||||
if (self->nw_tmr)
|
if (self->nw_tmr)
|
||||||
NetWatchRemoveTimer(self->nw_tmr);
|
AQ_ClearTimer(self);
|
||||||
|
|
||||||
if (self->state == eAsyncConnected) {
|
if (self->state == eAsyncConnected) {
|
||||||
self->state = eAsyncIdle;
|
self->state = eAsyncIdle;
|
||||||
@@ -253,7 +271,7 @@ static int AQ_Reconnect(pAsyncQueue self)
|
|||||||
NetWatchSetMode(self->nw_ctx, 0);
|
NetWatchSetMode(self->nw_ctx, 0);
|
||||||
/* implement an exponential backoff within limits */
|
/* implement an exponential backoff within limits */
|
||||||
self->retryTimer = 125; /* initial delay */
|
self->retryTimer = 125; /* initial delay */
|
||||||
NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer,
|
AQ_SetTimer(self, self->retryTimer,
|
||||||
TimedReconnect, self);
|
TimedReconnect, self);
|
||||||
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n",
|
SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n",
|
||||||
self->queue_name, __func__, state_name(self->state));
|
self->queue_name, __func__, state_name(self->state));
|
||||||
@@ -297,7 +315,7 @@ static int StartCommand(pAsyncQueue self)
|
|||||||
* Remove any old command timeout timer
|
* Remove any old command timeout timer
|
||||||
*/
|
*/
|
||||||
if (self->nw_tmr)
|
if (self->nw_tmr)
|
||||||
NetWatchRemoveTimer(self->nw_tmr);
|
AQ_ClearTimer(self);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Implement the inter-command delay
|
* Implement the inter-command delay
|
||||||
@@ -318,7 +336,7 @@ static int StartCommand(pAsyncQueue self)
|
|||||||
int delay = when.tv_sec - now.tv_sec;
|
int delay = when.tv_sec - now.tv_sec;
|
||||||
delay *= 1000;
|
delay *= 1000;
|
||||||
delay += (when.tv_usec - now.tv_usec + (1000 - 1)) / 1000;
|
delay += (when.tv_usec - now.tv_usec + (1000 - 1)) / 1000;
|
||||||
NetWatchRegisterTimer(&self->nw_tmr, delay, DelayedStart, self);
|
AQ_SetTimer(self, delay, DelayedStart, self);
|
||||||
return OKOK;
|
return OKOK;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -357,10 +375,10 @@ static int StartCommand(pAsyncQueue self)
|
|||||||
* Add a new command timeout timer
|
* Add a new command timeout timer
|
||||||
*/
|
*/
|
||||||
if (myCmd->timeout > 0)
|
if (myCmd->timeout > 0)
|
||||||
NetWatchRegisterTimer(&self->nw_tmr, myCmd->timeout,
|
AQ_SetTimer(self, myCmd->timeout,
|
||||||
CommandTimeout, self);
|
CommandTimeout, self);
|
||||||
else
|
else
|
||||||
NetWatchRegisterTimer(&self->nw_tmr, 30000, CommandTimeout, self);
|
AQ_SetTimer(self, 30000, CommandTimeout, self);
|
||||||
myCmd->active = 1;
|
myCmd->active = 1;
|
||||||
return iRet;
|
return iRet;
|
||||||
}
|
}
|
||||||
@@ -448,8 +466,8 @@ static int PopCommand(pAsyncQueue self)
|
|||||||
{
|
{
|
||||||
pAQ_Cmd myCmd = self->command_head;
|
pAQ_Cmd myCmd = self->command_head;
|
||||||
if (self->nw_tmr)
|
if (self->nw_tmr)
|
||||||
NetWatchRemoveTimer(self->nw_tmr);
|
AQ_ClearTimer(self);
|
||||||
self->nw_tmr = 0;
|
|
||||||
gettimeofday(&self->tvLastCmd, NULL);
|
gettimeofday(&self->tvLastCmd, NULL);
|
||||||
/* Process any callback */
|
/* Process any callback */
|
||||||
if (myCmd->tran->handleResponse)
|
if (myCmd->tran->handleResponse)
|
||||||
@@ -1081,7 +1099,7 @@ static pAsyncQueue AQ_Create(const char *host, const char *port)
|
|||||||
#if 0
|
#if 0
|
||||||
if (channel == NULL) {
|
if (channel == NULL) {
|
||||||
/* TODO: all the rest */
|
/* TODO: all the rest */
|
||||||
NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, TimedReconnect,
|
AQ_SetTimer(self, self->retryTimer, TimedReconnect,
|
||||||
self);
|
self);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@@ -1111,7 +1129,7 @@ static void AQ_Kill(void *pData)
|
|||||||
if (self->nw_ctx)
|
if (self->nw_ctx)
|
||||||
NetWatchRemoveCallback(self->nw_ctx);
|
NetWatchRemoveCallback(self->nw_ctx);
|
||||||
if (self->nw_tmr)
|
if (self->nw_tmr)
|
||||||
NetWatchRemoveTimer(self->nw_tmr);
|
AQ_ClearTimer(self);
|
||||||
if (self->queue_name)
|
if (self->queue_name)
|
||||||
free(self->queue_name);
|
free(self->queue_name);
|
||||||
NETClosePort(self->pSock);
|
NETClosePort(self->pSock);
|
||||||
|
|||||||
Reference in New Issue
Block a user