diff --git a/asyncqueue.c b/asyncqueue.c index 232d96b5..f8afb941 100644 --- a/asyncqueue.c +++ b/asyncqueue.c @@ -124,6 +124,34 @@ static int CreateSocketAdress(struct sockaddr_in *sockaddrPtr, /* Socket addres return 1; } +static void AQ_Purge(pAsyncQueue self) +{ + pAQ_Cmd myCmd = self->command_head; + if (self->nw_tmr) + NetWatchRemoveTimer(self->nw_tmr); + self->nw_tmr = 0; + gettimeofday(&self->tvLastCmd, NULL); + while (myCmd) { + /* Process any callback */ + if (myCmd->tran->handleResponse) { + myCmd->tran->txn_status = ATX_TIMEOUT; /* TODO should be ATX_DISCO */ + myCmd->tran->handleResponse(myCmd->tran); + } + /* + * Remove this transaction from the queue + */ + if (myCmd->next) { + self->command_head = myCmd->next; + } else + self->command_head = self->command_tail = NULL; + free(myCmd->tran->out_buf); + free(myCmd->tran->inp_buf); + free(myCmd->tran); + free(myCmd); + myCmd = self->command_head; + } +} + static void AQ_Notify(pAsyncQueue self, int event) { pAsyncUnit unit; @@ -205,6 +233,13 @@ static int AQ_Reconnect(pAsyncQueue self) if (self->nw_tmr) NetWatchRemoveTimer(self->nw_tmr); + if (self->state == eAsyncConnected) { + self->state = eAsyncIdle; + SICSLogPrintf(eStatus, "Disconnect on AsyncQueue '%s'", self->queue_name); + AQ_Notify(self, AQU_DISCONNECT); + AQ_Purge(self); + } + iRet = NETReconnect(self->pSock); /* * iRet can take the following values: @@ -213,9 +248,6 @@ static int AQ_Reconnect(pAsyncQueue self) * +1: The request succeeded */ if (iRet <= 0) { - snprintf(line, 132, "Disconnect on AsyncQueue '%s'", self->queue_name); - SICSLogWrite(line, eStatus); - AQ_Notify(self, AQU_DISCONNECT); if (iRet < 0) { /* Timer for retry */ NetWatchSetMode(self->nw_ctx, 0); @@ -335,6 +367,26 @@ static int StartCommand(pAsyncQueue self) static int QueCommandHead(pAsyncQueue self, pAQ_Cmd cmd) { cmd->next = NULL; +#if 0 + if (self->state != eAsyncConnected) { + if (cmd->tran->handleResponse) { + cmd->tran->txn_status == ATX_TIMEOUT; /* TODO should be ATX_DISCO */ + cmd->tran->handleResponse(cmd->tran); + } + /* + * Remove this transaction from the queue + */ + if (cmd->next) { + self->command_head = cmd->next; + } else + self->command_head = self->command_tail = NULL; + free(cmd->tran->out_buf); + free(cmd->tran->inp_buf); + free(cmd->tran); + free(cmd); + return 0; + } +#endif /* * If the command queue is empty, start transmission */ @@ -358,6 +410,26 @@ static int QueCommandHead(pAsyncQueue self, pAQ_Cmd cmd) static int QueCommand(pAsyncQueue self, pAQ_Cmd cmd) { cmd->next = NULL; +#if 0 + if (self->state != eAsyncConnected) { + if (cmd->tran->handleResponse) { + cmd->tran->txn_status == ATX_TIMEOUT; /* TODO should be ATX_DISCO */ + cmd->tran->handleResponse(cmd->tran); + } + /* + * Remove this transaction from the queue + */ + if (cmd->next) { + self->command_head = cmd->next; + } else + self->command_head = self->command_tail = NULL; + free(cmd->tran->out_buf); + free(cmd->tran->inp_buf); + free(cmd->tran); + free(cmd); + return 0; + } +#endif /* * If the command queue is empty, start transmission */