SICS-760 Add NOREPLY mechanism in aqadapter and asyncqueue
This removes the expectation of a reply when sending is suffixed with "{0}" or "@@NOREPLY@@" or, for asyncqueue, a custom string.
This commit is contained in:
36
asyncqueue.c
36
asyncqueue.c
@ -67,6 +67,8 @@ struct __AsyncQueue {
|
||||
mkChannel *pSock; /* socket address */
|
||||
AsyncState state; /* Queue Connection State */
|
||||
pAsyncProtocol protocol;
|
||||
char *noreply_text;
|
||||
int noreply_len;
|
||||
void *context; /**< opaque caller queue context */
|
||||
};
|
||||
|
||||
@ -374,6 +376,10 @@ static int StartCommand(pAsyncQueue self)
|
||||
/*
|
||||
* Handle case of no response expected
|
||||
*/
|
||||
if (myCmd->tran->inp_len == 0 || myCmd->tran->inp_buf == NULL) {
|
||||
myCmd->tran->txn_status = ATX_COMPLETE;
|
||||
return PopCommand(self);
|
||||
}
|
||||
if (iRet > 0)
|
||||
if (myCmd->tran->txn_status == ATX_COMPLETE)
|
||||
return PopCommand(self);
|
||||
@ -634,6 +640,23 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
||||
return NULL;
|
||||
}
|
||||
memset(myTxn, 0, sizeof(AsyncTxn));
|
||||
if (unit->queue->noreply_text) {
|
||||
if (cmd_len > unit->queue->noreply_len
|
||||
&& strncasecmp(&command[cmd_len - unit->queue->noreply_len],
|
||||
unit->queue->noreply_text, unit->queue->noreply_len) == 0) {
|
||||
rsp_len = 0;
|
||||
cmd_len -= unit->queue->noreply_len;
|
||||
}
|
||||
} else {
|
||||
if (cmd_len > 3 && strncmp(&command[cmd_len - 3], "{0}", 3) == 0) {
|
||||
rsp_len = 0;
|
||||
cmd_len -= 3;
|
||||
}
|
||||
else if (cmd_len > 11 && strncasecmp(&command[cmd_len - 11], "@@NOREPLY@@", 11) == 0) {
|
||||
rsp_len = 0;
|
||||
cmd_len -= 11;
|
||||
}
|
||||
}
|
||||
if (unit->queue->protocol->prepareTxn) {
|
||||
int iRet;
|
||||
iRet =
|
||||
@ -1047,12 +1070,25 @@ int AsyncQueueAction(SConnection * pCon, SicsInterp * pSics,
|
||||
}
|
||||
return OKOK;
|
||||
}
|
||||
if (strcasecmp(argv[1], "noreply") == 0) {
|
||||
if (argc > 2) {
|
||||
if (self->noreply_text)
|
||||
free(self->noreply_text);
|
||||
self->noreply_text = strdup(argv[2]);
|
||||
self->noreply_len = strlen(argv[2]);
|
||||
} else {
|
||||
SCPrintf(pCon, eValue, "%s.noreply = %s", argv[0], self->noreply_text);
|
||||
}
|
||||
return OKOK;
|
||||
}
|
||||
if (strcasecmp(argv[1], "list") == 0) {
|
||||
SCPrintf(pCon, eValue, "%s.delay = %d", argv[0], self->iDelay);
|
||||
SCPrintf(pCon, eValue, "%s.timeout = %d", argv[0], self->timeout);
|
||||
SCPrintf(pCon, eValue, "%s.retries = %d", argv[0], self->retries);
|
||||
SCPrintf(pCon, eValue, "%s.translate = %d", argv[0], self->translate);
|
||||
SCPrintf(pCon, eValue, "%s.trace = %d", argv[0], self->trace);
|
||||
if (self->noreply_text)
|
||||
SCPrintf(pCon, eValue, "%s.noreply = %s", argv[0], self->noreply_text);
|
||||
return OKOK;
|
||||
}
|
||||
}
|
||||
|
@ -127,6 +127,7 @@ static void SCAQTransact(Ascon *a)
|
||||
AsyncUnit *unit = NULL;
|
||||
const char *command = GetCharArray(a->wrBuffer);
|
||||
int cmd_len = GetDynStringLength(a->wrBuffer);
|
||||
int rsp_len = 1024;
|
||||
pp = (pPrivate) a->private;
|
||||
assert(pp);
|
||||
unit = pp->unit;
|
||||
@ -134,7 +135,15 @@ static void SCAQTransact(Ascon *a)
|
||||
txn = &pp->txn;
|
||||
txn->transWait = 1;
|
||||
DynStringClear(a->rdBuffer);
|
||||
AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, a, 1024);
|
||||
if (cmd_len > 3 && strncmp(&command[cmd_len - 3], "{0}", 3) == 0) {
|
||||
rsp_len = 0;
|
||||
cmd_len -= 3;
|
||||
}
|
||||
else if (cmd_len > 11 && strncasecmp(&command[cmd_len - 11], "@@NOREPLY@@", 11) == 0) {
|
||||
rsp_len = 0;
|
||||
cmd_len -= 11;
|
||||
}
|
||||
AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, a, rsp_len);
|
||||
}
|
||||
|
||||
static int scaqaNullHandler(Ascon *a)
|
||||
|
Reference in New Issue
Block a user