Move some definitions and add something private
definitions from asyncqueue to asyncprotocol to reduce dependency loops add protocol-private data structure and killer routine
This commit is contained in:
@ -26,6 +26,12 @@ typedef enum {
|
|||||||
ATX_DISCO = 3
|
ATX_DISCO = 3
|
||||||
} ATX_STATUS;
|
} ATX_STATUS;
|
||||||
|
|
||||||
|
#define AQU_TIMEOUT -1
|
||||||
|
#define AQU_DISCONNECT -2
|
||||||
|
#define AQU_RECONNECT -3
|
||||||
|
#define AQU_RETRY_CMD -4
|
||||||
|
#define AQU_POP_CMD -5
|
||||||
|
|
||||||
struct __async_txn {
|
struct __async_txn {
|
||||||
pAsyncUnit unit; /**< unit that transaction is associated with */
|
pAsyncUnit unit; /**< unit that transaction is associated with */
|
||||||
int txn_state; /**< protocol handler transaction parse state */
|
int txn_state; /**< protocol handler transaction parse state */
|
||||||
@ -38,6 +44,8 @@ struct __async_txn {
|
|||||||
int inp_len; /**< length of input buffer */
|
int inp_len; /**< length of input buffer */
|
||||||
int inp_idx; /**< index of next character (number already received) */
|
int inp_idx; /**< index of next character (number already received) */
|
||||||
AsyncTxnHandler handleResponse; /**< Txn response handler of command sender */
|
AsyncTxnHandler handleResponse; /**< Txn response handler of command sender */
|
||||||
|
void *proto_private; /**< Protocol Private structure */
|
||||||
|
void (*kill_private) (struct __async_txn *pTxn); /**< if it needs killing */
|
||||||
void *cntx; /**< opaque context used by command sender */
|
void *cntx; /**< opaque context used by command sender */
|
||||||
/* The cntx field may be used by protocol handler from sendCommand
|
/* The cntx field may be used by protocol handler from sendCommand
|
||||||
* as long as it is restored when response is complete
|
* as long as it is restored when response is complete
|
||||||
|
14
asyncqueue.c
14
asyncqueue.c
@ -167,6 +167,11 @@ static void AQ_Purge(pAsyncQueue self)
|
|||||||
self->command_head = self->command_tail = NULL;
|
self->command_head = self->command_tail = NULL;
|
||||||
free(myCmd->tran->out_buf);
|
free(myCmd->tran->out_buf);
|
||||||
free(myCmd->tran->inp_buf);
|
free(myCmd->tran->inp_buf);
|
||||||
|
if (myCmd->tran->proto_private)
|
||||||
|
if (myCmd->tran->kill_private)
|
||||||
|
myCmd->tran->kill_private(myCmd->tran);
|
||||||
|
else
|
||||||
|
free(myCmd->tran->proto_private);
|
||||||
free(myCmd->tran);
|
free(myCmd->tran);
|
||||||
free(myCmd);
|
free(myCmd);
|
||||||
myCmd = self->command_head;
|
myCmd = self->command_head;
|
||||||
@ -455,6 +460,11 @@ static int PopCommand(pAsyncQueue self)
|
|||||||
self->command_head = self->command_tail = NULL;
|
self->command_head = self->command_tail = NULL;
|
||||||
free(myCmd->tran->out_buf);
|
free(myCmd->tran->out_buf);
|
||||||
free(myCmd->tran->inp_buf);
|
free(myCmd->tran->inp_buf);
|
||||||
|
if (myCmd->tran->proto_private)
|
||||||
|
if (myCmd->tran->kill_private)
|
||||||
|
myCmd->tran->kill_private(myCmd->tran);
|
||||||
|
else
|
||||||
|
free(myCmd->tran->proto_private);
|
||||||
free(myCmd->tran);
|
free(myCmd->tran);
|
||||||
free(myCmd);
|
free(myCmd);
|
||||||
return 1;
|
return 1;
|
||||||
@ -531,7 +541,7 @@ static int MyCallback(void *context, int mode)
|
|||||||
for (i = 0; i < nchars; ++i) {
|
for (i = 0; i < nchars; ++i) {
|
||||||
iRet =
|
iRet =
|
||||||
self->protocol->handleInput(self->protocol, myCmd->tran,
|
self->protocol->handleInput(self->protocol, myCmd->tran,
|
||||||
reply[i]);
|
reply[i] & 0xFF);
|
||||||
if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */
|
if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */
|
||||||
if (self->trace) {
|
if (self->trace) {
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
@ -666,6 +676,7 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
|||||||
}
|
}
|
||||||
if (unit->queue->protocol->prepareTxn) {
|
if (unit->queue->protocol->prepareTxn) {
|
||||||
int iRet;
|
int iRet;
|
||||||
|
myTxn->inp_len = rsp_len; /* allowing protocol to change it */
|
||||||
iRet =
|
iRet =
|
||||||
unit->queue->protocol->prepareTxn(unit->queue->protocol, myTxn,
|
unit->queue->protocol->prepareTxn(unit->queue->protocol, myTxn,
|
||||||
command, cmd_len, rsp_len);
|
command, cmd_len, rsp_len);
|
||||||
@ -673,6 +684,7 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
|||||||
free(myTxn);
|
free(myTxn);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
rsp_len = myTxn->inp_len; /* allowed protocol to change it */
|
||||||
} else {
|
} else {
|
||||||
myTxn->out_buf = (char *) malloc(cmd_len + 5);
|
myTxn->out_buf = (char *) malloc(cmd_len + 5);
|
||||||
if (myTxn->out_buf == NULL) {
|
if (myTxn->out_buf == NULL) {
|
||||||
|
@ -11,12 +11,6 @@
|
|||||||
|
|
||||||
#include "asyncprotocol.h"
|
#include "asyncprotocol.h"
|
||||||
|
|
||||||
#define AQU_TIMEOUT -1
|
|
||||||
#define AQU_DISCONNECT -2
|
|
||||||
#define AQU_RECONNECT -3
|
|
||||||
#define AQU_RETRY_CMD -4
|
|
||||||
#define AQU_POP_CMD -5
|
|
||||||
|
|
||||||
typedef struct __AsyncQueue AsyncQueue, *pAsyncQueue;
|
typedef struct __AsyncQueue AsyncQueue, *pAsyncQueue;
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user