*** empty log message ***
This commit is contained in:
45
asyncqueue.c
45
asyncqueue.c
@@ -8,7 +8,7 @@
|
||||
* single command channel.
|
||||
*
|
||||
* Douglas Clowes, February 2007
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
#include <sys/time.h>
|
||||
@@ -21,7 +21,6 @@
|
||||
#include "asyncqueue.h"
|
||||
#include "nwatch.h"
|
||||
|
||||
typedef struct __AsyncQueue AsyncQueue, *pAsyncQueue;
|
||||
|
||||
typedef struct __async_command AQ_Cmd, *pAQ_Cmd;
|
||||
|
||||
@@ -130,9 +129,9 @@ static int AQ_Reconnect(pAsyncQueue self)
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
||||
static int CommandTimeout(void* cntx, int mode);
|
||||
static int DelayedStart(void* cntx, int mode);
|
||||
static int PopCommand(pAsyncQueue self);
|
||||
|
||||
static int StartCommand(pAsyncQueue self)
|
||||
{
|
||||
@@ -183,8 +182,14 @@ static int StartCommand(pAsyncQueue self)
|
||||
iRet = NETRead(sock, reply, 1, 0);
|
||||
if (iRet < 0) { /* EOF */
|
||||
iRet = AQ_Reconnect(self);
|
||||
if (iRet == 0)
|
||||
if (iRet <= 0) {
|
||||
myCmd->tran->txn_state = ATX_DISCO;
|
||||
if(myCmd->tran->handleResponse){
|
||||
myCmd->tran->handleResponse(myCmd->tran);
|
||||
}
|
||||
PopCommand(self);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
@@ -239,7 +244,6 @@ static int QueCommand(pAsyncQueue self, pAQ_Cmd cmd)
|
||||
self->command_tail = cmd;
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int PopCommand(pAsyncQueue self)
|
||||
{
|
||||
pAQ_Cmd myCmd = self->command_head;
|
||||
@@ -306,10 +310,22 @@ static int MyCallback(void* context, int mode)
|
||||
char reply[1];
|
||||
|
||||
iRet = NETRead(self->pSock, reply, 1, 0);
|
||||
/* printf(" iRet, char = %d, %d\n", iRet, (int)reply[0]); */
|
||||
if (iRet < 0) { /* EOF */
|
||||
iRet = AQ_Reconnect(self);
|
||||
if (iRet <= 0)
|
||||
if (iRet <= 0){
|
||||
/* changed to call handleResponse with a bad status code: MK
|
||||
*/
|
||||
pAQ_Cmd myCmd = self->command_head;
|
||||
if(myCmd){
|
||||
myCmd->tran->txn_state = ATX_DISCO;
|
||||
if(myCmd->tran->handleResponse){
|
||||
myCmd->tran->handleResponse(myCmd->tran);
|
||||
}
|
||||
PopCommand(self);
|
||||
}
|
||||
return iRet;
|
||||
}
|
||||
/* restart the command */
|
||||
StartCommand(self);
|
||||
return 1;
|
||||
@@ -336,7 +352,7 @@ static int MyCallback(void* context, int mode)
|
||||
return 1;
|
||||
}
|
||||
|
||||
int AsyncUnitEnqueHead(pAsyncUnit unit, pAsyncTxn context)
|
||||
int AsyncUnitEnqueueHead(pAsyncUnit unit, pAsyncTxn context)
|
||||
{
|
||||
pAQ_Cmd myCmd = NULL;
|
||||
|
||||
@@ -412,6 +428,9 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
||||
if (rsp_len == 0)
|
||||
myTxn->inp_buf = NULL;
|
||||
else {
|
||||
if(myTxn->inp_buf != NULL){
|
||||
free(myTxn->inp_buf);
|
||||
}
|
||||
myTxn->inp_buf = malloc(rsp_len + 1);
|
||||
if (myTxn->inp_buf == NULL) {
|
||||
SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError);
|
||||
@@ -954,3 +973,15 @@ int AsyncUnitDestroy(pAsyncUnit unit)
|
||||
return 1;
|
||||
}
|
||||
|
||||
pAsyncUnit AsyncUnitFromQueue(pAsyncQueue queue){
|
||||
pAsyncUnit result = NULL;
|
||||
|
||||
result = malloc(sizeof(AsyncUnit));
|
||||
if(result == NULL){
|
||||
return NULL;
|
||||
}
|
||||
memset(result,0,sizeof(AsyncUnit));
|
||||
result->queue = queue;
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user