Reference count async queue transactions
so we can put them of task message queues
This commit is contained in:
@ -33,6 +33,7 @@ typedef enum {
|
|||||||
#define AQU_POP_CMD -5
|
#define AQU_POP_CMD -5
|
||||||
|
|
||||||
struct __async_txn {
|
struct __async_txn {
|
||||||
|
int ref_counter; /**< reference counter (reference counted object) */
|
||||||
pAsyncUnit unit; /**< unit that transaction is associated with */
|
pAsyncUnit unit; /**< unit that transaction is associated with */
|
||||||
int txn_state; /**< protocol handler transaction parse state */
|
int txn_state; /**< protocol handler transaction parse state */
|
||||||
ATX_STATUS txn_status; /**< status of the transaction OK, Error, ... */
|
ATX_STATUS txn_status; /**< status of the transaction OK, Error, ... */
|
||||||
|
31
asyncqueue.c
31
asyncqueue.c
@ -90,13 +90,13 @@ static const char *state_name(AsyncState the_state)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Free the command and transaction structures and their contents
|
* Free the transaction and buffers
|
||||||
*/
|
*/
|
||||||
static void free_command(pAQ_Cmd myCmd)
|
static void free_transaction(pAsyncTxn myTxn)
|
||||||
{
|
{
|
||||||
if (myCmd) {
|
|
||||||
pAsyncTxn myTxn = myCmd->tran;
|
|
||||||
if (myTxn) {
|
if (myTxn) {
|
||||||
|
if (--myTxn->ref_counter > 0)
|
||||||
|
return;
|
||||||
/*
|
/*
|
||||||
* Allow kill_private to clean it all if it wants
|
* Allow kill_private to clean it all if it wants
|
||||||
*/
|
*/
|
||||||
@ -110,6 +110,15 @@ static void free_command(pAQ_Cmd myCmd)
|
|||||||
free(myTxn->proto_private);
|
free(myTxn->proto_private);
|
||||||
free(myTxn);
|
free(myTxn);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Free the command and transaction structures and their contents
|
||||||
|
*/
|
||||||
|
static void free_command(pAQ_Cmd myCmd)
|
||||||
|
{
|
||||||
|
if (myCmd) {
|
||||||
|
free_transaction(myCmd->tran);
|
||||||
free(myCmd);
|
free(myCmd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -735,9 +744,23 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
|||||||
myTxn->unit = unit;
|
myTxn->unit = unit;
|
||||||
myTxn->handleResponse = callback;
|
myTxn->handleResponse = callback;
|
||||||
myTxn->cntx = context;
|
myTxn->cntx = context;
|
||||||
|
myTxn->ref_counter = 1;
|
||||||
return myTxn;
|
return myTxn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pAsyncTxn AsyncUnitHoldTxn(pAsyncTxn txn)
|
||||||
|
{
|
||||||
|
if (txn)
|
||||||
|
txn->ref_counter++;
|
||||||
|
return txn;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncUnitFreeTxn(pAsyncTxn txn)
|
||||||
|
{
|
||||||
|
if (txn)
|
||||||
|
free_transaction(txn);
|
||||||
|
}
|
||||||
|
|
||||||
int AsyncUnitSendTxn(pAsyncUnit unit,
|
int AsyncUnitSendTxn(pAsyncUnit unit,
|
||||||
const char *command, int cmd_len,
|
const char *command, int cmd_len,
|
||||||
AsyncTxnHandler callback, void *context, int rsp_len)
|
AsyncTxnHandler callback, void *context, int rsp_len)
|
||||||
|
10
asyncqueue.h
10
asyncqueue.h
@ -75,6 +75,16 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
|||||||
AsyncTxnHandler responseHandler,
|
AsyncTxnHandler responseHandler,
|
||||||
void *context, int rsp_len);
|
void *context, int rsp_len);
|
||||||
|
|
||||||
|
/** \brief clone a pointer to the transaction - must use AsyncUnitFreeTxn
|
||||||
|
* \param txn the transaction to clone
|
||||||
|
*/
|
||||||
|
pAsyncTxn AsyncUnitHoldTxn(pAsyncTxn txn);
|
||||||
|
|
||||||
|
/** \brief free a cloned transaction
|
||||||
|
* \param txn the transaction to free (returned by AsyncUnitHoldTxn)
|
||||||
|
*/
|
||||||
|
void AsyncUnitFreeTxn(pAsyncTxn txn);
|
||||||
|
|
||||||
/** \brief prepare and queue a transaction
|
/** \brief prepare and queue a transaction
|
||||||
*
|
*
|
||||||
* \param unit AsyncUnit
|
* \param unit AsyncUnit
|
||||||
|
Reference in New Issue
Block a user