diff --git a/ascon.c b/ascon.c index d5d5f96a..39a7facb 100644 --- a/ascon.c +++ b/ascon.c @@ -84,6 +84,8 @@ void AsconError(Ascon *a, char *msg, int errorno) DynStringConcat(a->errmsg, " ("); DynStringConcat(a->errmsg, state); DynStringConcat(a->errmsg, " state)"); + DynStringConcat(a->errmsg, " on "); + DynStringConcat(a->errmsg, a->hostport); } else { DynStringConcat(a->errmsg, strerror(errorno)); DynStringConcat(a->errmsg, " ("); @@ -91,6 +93,8 @@ void AsconError(Ascon *a, char *msg, int errorno) DynStringConcat(a->errmsg, " state, "); DynStringConcat(a->errmsg, msg); DynStringConcat(a->errmsg, ")"); + DynStringConcat(a->errmsg, " on "); + DynStringConcat(a->errmsg, a->hostport); } a->state = AsconFailed; } @@ -803,7 +807,7 @@ char *AsconRead(Ascon * a) { if (a->noResponse) { a->noResponse = 0; - return ""; + return NULL; } if (a->state != AsconIdle) { a->state = AsconIdle; diff --git a/asyncprotocol.c b/asyncprotocol.c index 49a0118c..9fcf6827 100644 --- a/asyncprotocol.c +++ b/asyncprotocol.c @@ -31,16 +31,23 @@ int defaultSendCommand(pAsyncProtocol p, pAsyncTxn txn) int defaultHandleInput(pAsyncProtocol p, pAsyncTxn txn, int ch) { const char *term = "\r\n"; - if (p->replyTerminator) - term = p->replyTerminator; - if (ch == term[txn->txn_state]) + if (txn->txn_state == 0) { + int i; + for (i = 0; i < 10; ++i) + if (p->replyTerminator[i] && ch == p->replyTerminator[i][0]) { + txn->txn_state = i << 16; + break; + } + } + term = p->replyTerminator[txn->txn_state >> 16]; + if (ch == term[txn->txn_state & 0xffff]) ++txn->txn_state; else txn->txn_state = 0; if (txn->inp_idx < txn->inp_len) txn->inp_buf[txn->inp_idx++] = ch; - if (term[txn->txn_state] == 0) { + if (term[txn->txn_state & 0xffff] == 0) { if (txn->inp_idx < txn->inp_len) txn->inp_buf[txn->inp_idx] = '\0'; return AQU_POP_CMD; @@ -64,10 +71,7 @@ int defaultPrepareTxn(pAsyncProtocol p, pAsyncTxn txn, const char *cmd, term = p->sendTerminator; state = 0; for (i = 0; i < cmd_len; ++i) { - if (cmd[i] == 0x00) { /* end of transmission */ - cmd_len = i; - break; - } else if (cmd[i] == term[state]) { + if (cmd[i] == term[state]) { ++state; continue; } @@ -100,14 +104,6 @@ int defaultPrepareTxn(pAsyncProtocol p, pAsyncTxn txn, const char *cmd, if (txn->inp_buf != NULL) { free(txn->inp_buf); } - txn->inp_buf = malloc(rsp_len); - if (txn->inp_buf == NULL) { - SICSLogWrite("Out of memory in AsyncProtocol::defaultPrepareTxn", - eError); - free(txn->out_buf); - txn->out_buf = NULL; - return 0; - } txn->inp_len = rsp_len; txn->inp_idx = 0; txn->txn_state = 0; @@ -122,11 +118,15 @@ static void encodeTerminator(char *result, char *terminator) { if (terminator) while (*terminator) { - *result++ = '0'; - *result++ = 'x'; - *result++ = hex[(*terminator >> 4) & 0xF]; - *result++ = hex[(*terminator) & 0xF]; - ++terminator; + if (*terminator <= 32 || *terminator >= 127) { + *result++ = '0'; + *result++ = 'x'; + *result++ = hex[(*terminator >> 4) & 0xF]; + *result++ = hex[(*terminator) & 0xF]; + ++terminator; + } else { + *result++ = *terminator++; + } } *result = '\0'; return; @@ -208,7 +208,6 @@ int AsyncProtocolNoAction(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]) { char line[132]; - pAsyncProtocol self = (pAsyncProtocol) pData; snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]); SCWrite(pCon, line, eError); return 0; @@ -217,7 +216,6 @@ int AsyncProtocolNoAction(SConnection * pCon, SicsInterp * pSics, int AsyncProtocolAction(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]) { - char line[132]; pAsyncProtocol self = (pAsyncProtocol) pData; if (argc > 1) { /* handle genecic parameters like terminators */ @@ -240,18 +238,34 @@ int AsyncProtocolAction(SConnection * pCon, SicsInterp * pSics, return 1; } else if (strcasecmp(argv[1], "replyterminator") == 0) { if (argc > 2) { - char *pPtr = decodeTerminator(argv[2]); - if (pPtr) { - if (self->replyTerminator) - free(self->replyTerminator); - self->replyTerminator = pPtr; + int i; + for (i = 0; i < 10; ++i) + if (self->replyTerminator[i]) { + free(self->replyTerminator[i]); + self->replyTerminator[i] = NULL; + } + for (i = 0; i < 10 && i < argc - 2; ++i) { + char* pPtr = decodeTerminator(argv[i + 2]); + if (pPtr) { + self->replyTerminator[i] = pPtr; + } } SCSendOK(pCon); } else { + int i; char term[132]; char line[1024]; - encodeTerminator(term, self->replyTerminator); - sprintf(line, "%s.replyTerminator = \"%s\"", argv[0], term); + term[0] = '\0'; + sprintf(line, "%s.replyTerminator =", argv[0]); + for (i = 0; i < 10; ++i) { + if (self->replyTerminator[i] == NULL) + break; + term[0] = ' '; + term[1] = '"'; + encodeTerminator(&term[2], self->replyTerminator[i]); + strcat(term, "\""); + strcat(line, term); + } SCWrite(pCon, line, eValue); } return 1; @@ -280,12 +294,14 @@ void defaultKillPrivate(pAsyncProtocol p) void AsyncProtocolKill(void *pData) { pAsyncProtocol self = (pAsyncProtocol) pData; + int i; if (self->pDes) DeleteDescriptor(self->pDes); if (self->sendTerminator != NULL) free(self->sendTerminator); - if (self->replyTerminator != NULL) - free(self->replyTerminator); + for (i = 0; i < 10; ++i) + if (self->replyTerminator[i] != NULL) + free(self->replyTerminator[i]); if (self->killPrivate) self->killPrivate(self); } @@ -328,7 +344,7 @@ pAsyncProtocol AsyncProtocolCreate(SicsInterp * pSics, self->prepareTxn = defaultPrepareTxn; self->killPrivate = defaultKillPrivate; self->sendTerminator = strdup("\r\n"); - self->replyTerminator = strdup("\r\n"); + self->replyTerminator[0] = strdup("\r\n"); return self; } diff --git a/asyncprotocol.h b/asyncprotocol.h index 3c943666..4bc183ba 100644 --- a/asyncprotocol.h +++ b/asyncprotocol.h @@ -51,7 +51,7 @@ struct __async_protocol { pObjectDescriptor pDes; char *protocolName; char *sendTerminator; - char *replyTerminator; + char *replyTerminator[10]; void *privateData; int (*sendCommand) (pAsyncProtocol p, pAsyncTxn txn); int (*handleInput) (pAsyncProtocol p, pAsyncTxn txn, int ch); diff --git a/asyncqueue.c b/asyncqueue.c index 6b75a9f8..f34c47eb 100644 --- a/asyncqueue.c +++ b/asyncqueue.c @@ -8,18 +8,21 @@ * single command channel. * * Douglas Clowes, February 2007 - * + * */ #include #include #include #include +#include +#include #include #include #include "network.h" #include "asyncqueue.h" #include "nwatch.h" +#include typedef struct __async_command AQ_Cmd, *pAQ_Cmd; @@ -40,6 +43,9 @@ struct __AsyncUnit { void *notify_cntx; }; +typedef enum { eAsyncIdle, eAsyncWaiting, eAsyncConnecting, + eAsyncConnected } AsyncState; + struct __AsyncQueue { pObjectDescriptor pDes; char *queue_name; @@ -48,6 +54,8 @@ struct __AsyncQueue { int iDelay; /* intercommand delay in milliseconds */ int timeout; int retries; + int retryTimer; /* mSec delay before next retry */ + bool translate; /* translate binary output with escaped chars */ struct timeval tvLastCmd; /* time of completion of last command */ int unit_count; /* number of units connected */ pAsyncUnit units; /* head of unit chain */ @@ -56,12 +64,29 @@ struct __AsyncQueue { pNWContext nw_ctx; /* NetWait context handle */ pNWTimer nw_tmr; /* NetWait timer handle */ mkChannel *pSock; /* socket address */ + AsyncState state; /* Queue Connection State */ pAsyncProtocol protocol; + void *context; /**< opaque caller queue context */ }; static pAsyncQueue queue_array[FD_SETSIZE]; static int queue_index = 0; +static const char *state_name(AsyncState the_state) +{ + switch (the_state) { + case eAsyncIdle: + return "eAsyncIdle"; + case eAsyncWaiting: + return "eAsyncWaiting"; + case eAsyncConnecting: + return "eAsyncConnecting"; + case eAsyncConnected: + return "eAsyncConnected"; + } + return ""; +} + /* ---------------------------- Local ------------------------------------ CreateSocketAdress stolen from Tcl. Thanks to John Ousterhout */ @@ -102,11 +127,65 @@ static int CreateSocketAdress(struct sockaddr_in *sockaddrPtr, /* Socket addres static void AQ_Notify(pAsyncQueue self, int event) { pAsyncUnit unit; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, + __func__); for (unit = self->units; unit; unit = unit->next) if (unit->notify_func != NULL) unit->notify_func(unit->notify_cntx, event); } +static int TimedReconnect(void *cntx, int mode) +{ + int iRet; + char line[132]; + pAsyncQueue self = (pAsyncQueue) cntx; + + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, + __func__); + iRet = NETReconnect(self->pSock); + /* + * iRet can take the following values: + * -1: The request failed + * 0: The request is still in progress + * +1: The request succeeded + */ + if (iRet < 0) { + snprintf(line, 132, "Failed reconnect on AsyncQueue '%s'", + self->queue_name); + SICSLogWrite(line, eStatus); + NetWatchSetMode(self->nw_ctx, 0); + /* implement an exponential backoff within limits */ + self->retryTimer = 2 * self->retryTimer; + if (self->retryTimer < 250) + self->retryTimer = 250; + if (self->retryTimer > 30000) + self->retryTimer = 30000; + NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, TimedReconnect, + self); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n", + self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncWaiting; + } else if (iRet == 0) { + snprintf(line, 132, "Inprogress reconnect on AsyncQueue '%s'", + self->queue_name); + NetWatchSetMode(self->nw_ctx, nwatch_write); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnecting\n", + self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnecting; + } else { + snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name); + SICSLogWrite(line, eStatus); + AQ_Notify(self, AQU_RECONNECT); + NetWatchSetMode(self->nw_ctx, nwatch_read); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", + self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnected; + } + return 1; +} + static int AQ_Reconnect(pAsyncQueue self) { int iRet; @@ -114,13 +193,47 @@ static int AQ_Reconnect(pAsyncQueue self) int flag = 1; char line[132]; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, + __func__); + /* + * Remove any old timer + */ + if (self->nw_tmr) + NetWatchRemoveTimer(self->nw_tmr); + iRet = NETReconnect(self->pSock); + /* + * iRet can take the following values: + * -1: The request failed + * 0: The request is still in progress + * +1: The request succeeded + */ if (iRet <= 0) { snprintf(line, 132, "Disconnect on AsyncQueue '%s'", self->queue_name); SICSLogWrite(line, eStatus); AQ_Notify(self, AQU_DISCONNECT); + if (iRet < 0) { + /* Timer for retry */ + NetWatchSetMode(self->nw_ctx, 0); + self->retryTimer = 125; /* initial delay */ + NetWatchRegisterTimer(&self->nw_tmr, self->retryTimer, + TimedReconnect, self); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncWaiting\n", + self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncWaiting; + } else { + NetWatchSetMode(self->nw_ctx, nwatch_write); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnecting\n", + self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnecting; + /* TODO await reconnect result */ + } return iRet; } + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", + self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnected; snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name); SICSLogWrite(line, eStatus); AQ_Notify(self, AQU_RECONNECT); @@ -135,7 +248,11 @@ static int StartCommand(pAsyncQueue self) { pAQ_Cmd myCmd = self->command_head; mkChannel *sock = self->pSock; + int iRet = 0; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, + __func__); if (myCmd == NULL) return OKOK; @@ -172,23 +289,34 @@ static int StartCommand(pAsyncQueue self) /* * Discard any input before sending command */ - while (NETAvailable(sock, 0)) { - /* TODO: handle unsolicited input */ - char reply[1]; - int iRet; - iRet = NETRead(sock, reply, 1, 0); - if (iRet < 0) { /* EOF */ - iRet = AQ_Reconnect(self); - if (iRet <= 0) { - myCmd->tran->txn_state = ATX_DISCO; - if (myCmd->tran->handleResponse) { - myCmd->tran->handleResponse(myCmd->tran); - } - PopCommand(self); - return 0; + if (NETAvailable(sock, 0)) { + while (NETAvailable(sock, 0)) { + /* TODO: handle unsolicited input */ + char reply[128]; + iRet = NETRead(sock, reply, 128, 0); + if (iRet < 0) { /* EOF */ + iRet = AQ_Reconnect(self); + if (iRet == 0) + return 0; + } else if (iRet > 0) { + struct timeval tv; + gettimeofday(&tv, NULL); + SICSLogTimePrintf(eError, &tv, + "ERROR: %d unsolicited chars in AsyncQueue %s", + iRet, self->queue_name); + SICSLogWriteHexTime(reply, iRet, eError, &tv); } } } + + myCmd->tran->txn_status = ATX_ACTIVE; + iRet = self->protocol->sendCommand(self->protocol, myCmd->tran); + /* + * Handle case of no response expected + */ + if (iRet > 0) + if (myCmd->tran->txn_status == ATX_COMPLETE) + return PopCommand(self); /* * Add a new command timeout timer */ @@ -198,7 +326,7 @@ static int StartCommand(pAsyncQueue self) else NetWatchRegisterTimer(&self->nw_tmr, 30000, CommandTimeout, self); myCmd->active = 1; - return self->protocol->sendCommand(self->protocol, myCmd->tran); + return iRet; } static int QueCommandHead(pAsyncQueue self, pAQ_Cmd cmd) @@ -239,6 +367,7 @@ static int QueCommand(pAsyncQueue self, pAQ_Cmd cmd) self->command_tail = cmd; return 1; } + static int PopCommand(pAsyncQueue self) { pAQ_Cmd myCmd = self->command_head; @@ -246,6 +375,9 @@ static int PopCommand(pAsyncQueue self) NetWatchRemoveTimer(self->nw_tmr); self->nw_tmr = 0; gettimeofday(&self->tvLastCmd, NULL); + /* Process any callback */ + if (myCmd->tran->handleResponse) + myCmd->tran->handleResponse(myCmd->tran); /* * If this is not the last in queue, start transmission */ @@ -276,8 +408,6 @@ static int CommandTimeout(void *cntx, int mode) self->protocol->handleEvent(self->protocol, myCmd->tran, AQU_TIMEOUT); if (iRet == AQU_POP_CMD) { - if (myCmd->tran->handleResponse) - myCmd->tran->handleResponse(myCmd->tran); PopCommand(self); /* remove command */ } else if (iRet == AQU_RETRY_CMD) StartCommand(self); /* restart command */ @@ -290,6 +420,9 @@ static int CommandTimeout(void *cntx, int mode) static int DelayedStart(void *cntx, int mode) { pAsyncQueue self = (pAsyncQueue) cntx; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, + __func__); self->nw_tmr = 0; StartCommand(self); return 1; @@ -299,27 +432,18 @@ static int MyCallback(void *context, int mode) { pAsyncQueue self = (pAsyncQueue) context; + if (self->state != eAsyncConnected) + SICSLogPrintf(eStatus, "Function: %s:%s\n", self->queue_name, + __func__); if (mode & nwatch_read) { int iRet; - char reply[1]; + char reply[100]; - iRet = NETRead(self->pSock, reply, 1, 0); - /* printf(" iRet, char = %d, %d\n", iRet, (int)reply[0]); */ + iRet = NETRead(self->pSock, reply, 100, 0); if (iRet < 0) { /* EOF */ iRet = AQ_Reconnect(self); - if (iRet <= 0) { - /* changed to call handleResponse with a bad status code: MK - */ - pAQ_Cmd myCmd = self->command_head; - if (myCmd) { - myCmd->tran->txn_state = ATX_DISCO; - if (myCmd->tran->handleResponse) { - myCmd->tran->handleResponse(myCmd->tran); - } - PopCommand(self); - } + if (iRet <= 0) return iRet; - } /* restart the command */ StartCommand(self); return 1; @@ -327,22 +451,54 @@ static int MyCallback(void *context, int mode) if (iRet == 0) { /* TODO: timeout or error */ return 0; } else { + int nchars = iRet; + int i = 0; pAQ_Cmd myCmd = self->command_head; if (myCmd) { - iRet = - self->protocol->handleInput(self->protocol, myCmd->tran, - reply[0]); - if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */ - if (myCmd->tran->handleResponse) - myCmd->tran->handleResponse(myCmd->tran); - PopCommand(self); - } else if (iRet < 0) /* TODO: error */ - ; + for (i = 0; i < nchars; ++i) { + iRet = + self->protocol->handleInput(self->protocol, myCmd->tran, + reply[i]); + if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */ + PopCommand(self); + break; + } else if (iRet < 0) { + SICSLogWrite("ERROR: Protocol error in AsyncQueue", eError); + /* TODO: error */ + break; + } + } + if (i < nchars - 1) { + int excess = nchars - 1 - i; + struct timeval tv; + gettimeofday(&tv, NULL); + SICSLogTimePrintf(eError, &tv, + "ERROR: %d excess chars in AsyncQueue %s", + excess, self->queue_name); + SICSLogWriteHexTime(&reply[i], excess, eError, &tv); + /* TODO: handle unsolicited */ + } } else { + int excess = nchars - 1 - i; + struct timeval tv; + gettimeofday(&tv, NULL); + SICSLogTimePrintf(eError, &tv, + "ERROR: %d unsolicited chars in AsyncQueue %s", + excess, self->queue_name); + SICSLogWriteHexTime(&reply[i], excess, eError, &tv); /* TODO: handle unsolicited input */ } } } + if (mode & nwatch_write) { + char line[132]; + SICSLogPrintf(eStatus, "Writeable socket callback on AsyncQueue %s", + self->queue_name); + NetWatchSetMode(self->nw_ctx, nwatch_read); + SICSLogPrintf(eStatus, "In %s:%s: state %s => eAsyncConnected\n", + self->queue_name, __func__, state_name(self->state)); + self->state = eAsyncConnected; + } return 1; } @@ -395,7 +551,7 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit, myTxn = (pAsyncTxn) malloc(sizeof(AsyncTxn)); if (myTxn == NULL) { SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError); - return 0; + return NULL; } memset(myTxn, 0, sizeof(AsyncTxn)); if (unit->queue->protocol->prepareTxn) { @@ -403,12 +559,16 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit, iRet = unit->queue->protocol->prepareTxn(unit->queue->protocol, myTxn, command, cmd_len, rsp_len); + if (iRet == 0) { + free(myTxn); + return NULL; + } } else { myTxn->out_buf = (char *) malloc(cmd_len + 5); if (myTxn->out_buf == NULL) { SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError); free(myTxn); - return 0; + return NULL; } memcpy(myTxn->out_buf, command, cmd_len); myTxn->out_len = cmd_len; @@ -423,15 +583,12 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit, if (rsp_len == 0) myTxn->inp_buf = NULL; else { - if (myTxn->inp_buf != NULL) { - free(myTxn->inp_buf); - } myTxn->inp_buf = malloc(rsp_len + 1); if (myTxn->inp_buf == NULL) { SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError); free(myTxn->out_buf); free(myTxn); - return 0; + return NULL; } memset(myTxn->inp_buf, 0, rsp_len + 1); } @@ -458,6 +615,7 @@ int AsyncUnitSendTxn(pAsyncUnit unit, typedef struct txn_s { char *transReply; int transWait; + int respLen; } TXN, *pTXN; /** @@ -469,30 +627,33 @@ static int TransCallback(pAsyncTxn pCmd) int resp_len = pCmd->inp_idx; pTXN self = (pTXN) pCmd->cntx; - if (pCmd->txn_status == ATX_TIMEOUT) { + self->respLen = resp_len; + if (resp_len > 0) { memcpy(self->transReply, resp, resp_len); self->transReply[resp_len] = '\0'; + } else self->transReply[0] = '\0'; + if (pCmd->txn_status == ATX_TIMEOUT) self->transWait = -1; - } else { - memcpy(self->transReply, resp, resp_len); - self->transReply[resp_len] = '\0'; + else self->transWait = 0; - } + return 0; } int AsyncUnitTransact(pAsyncUnit unit, const char *command, int cmd_len, - char *response, int rsp_len) + char *response, int *rsp_len) { TXN txn; assert(unit); txn.transReply = response; + txn.respLen = *rsp_len; txn.transWait = 1; - AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, &txn, rsp_len); + AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, &txn, *rsp_len); while (txn.transWait == 1) TaskYield(pServ->pTasker); + *rsp_len = txn.respLen; if (txn.transWait < 0) return txn.transWait; return 1; @@ -598,19 +759,70 @@ int AsyncQueueAction(SConnection * pCon, SicsInterp * pSics, char cmd[10240]; char rsp[10240]; int idx = 0; - int i, j; + int i, j, len; cmd[0] = '\0'; + /* Managers only */ + if (!SCMatchRights(pCon, usMugger)) + return 0; for (i = 2; i < argc; ++i) { - j = snprintf(&cmd[idx], 10240 - idx, "%s%s", - (i > 2) ? " " : "", argv[i]); - if (j < 0) + if (idx >= 10240) break; - idx += j; + if (i > 2) + cmd[idx++] = ' '; + len = strlen(argv[i]); + for (j = 0; j < len; ++j) { + if (idx >= 10240) + break; + if (argv[i][j] == '\\') { + ++j; + if (argv[i][j] == '\\') + cmd[idx++] = '\\'; + else if (argv[i][j] == 'r') + cmd[idx++] = '\r'; + else if (argv[i][j] == 'n') + cmd[idx++] = '\n'; + else if (isdigit(argv[i][j])) { + char *nptr = &argv[i][j]; + char *endptr; + long k = strtol(nptr, &endptr, 0); + cmd[idx++] = k; + j += (endptr - nptr); + --j; /* prepare for loop increment */ + } else + cmd[idx++] = argv[i][j]; + } else + cmd[idx++] = argv[i][j]; + } } + cmd[idx] = '\0'; memset(&myUnit, 0, sizeof(AsyncUnit)); myUnit.queue = self; - AsyncUnitTransact(&myUnit, cmd, idx, rsp, 10240); - SCWrite(pCon, rsp, eValue); + len = 10240; + (void) AsyncUnitTransact(&myUnit, cmd, idx, rsp, &len); + /* escape control characters in response */ + j = 0; + for (i = 0; i < len; ++i) { + if (j >= 10230) + break; + if (self->translate) { + if (rsp[i] < 32 || rsp[i] >= 127) { + if (rsp[i] == '\r') { + cmd[j++] = '\r'; + } else if (rsp[i] == '\n') { + cmd[j++] = '\n'; + } else { + j += snprintf(&cmd[j], 6, "\\0x%02x", rsp[i]); + } + } else if (rsp[i] == '\\') { + cmd[j++] = '\\'; + cmd[j++] = '\\'; + } else + cmd[j++] = rsp[i]; + } else + cmd[j++] = rsp[i]; + } + cmd[j++] = '\0'; + SCWrite(pCon, cmd, eValue); return 1; } if (strcasecmp(argv[1], "reconnect") == 0) { @@ -627,7 +839,7 @@ int AsyncQueueAction(SConnection * pCon, SicsInterp * pSics, SCWrite(pCon, line, eError); return 0; } else { - if (delay < 0 || delay > 30000) { + if (delay < 0 || delay > 300000) { snprintf(line, 132, "Value out of range: %d", delay); SCWrite(pCon, line, eError); return 0; @@ -692,6 +904,34 @@ int AsyncQueueAction(SConnection * pCon, SicsInterp * pSics, } return OKOK; } + if (strcasecmp(argv[1], "translate") == 0) { + if (argc > 2) { + int value; + int iRet; + iRet = sscanf(argv[2], "%d", &value); + if (iRet != 1) { + snprintf(line, 132, "Invalid argument: %s", argv[2]); + SCWrite(pCon, line, eError); + return 0; + } else { + if (value == 0) { + self->translate = false; + return OKOK; + } else if (value == 1) { + self->translate = true; + return OKOK; + } + snprintf(line, 132, "Invalid argument: %s", argv[2]); + SCWrite(pCon, line, eError); + return 0; + } + } else { + snprintf(line, 132, "%s.translate = %d", argv[0], self->translate); + SCWrite(pCon, line, eStatus); + return OKOK; + } + return OKOK; + } } snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]); SCWrite(pCon, line, eError); @@ -769,6 +1009,7 @@ static int AQ_Init(pAsyncQueue self) if (self->nw_ctx == NULL) NetWatchRegisterCallback(&self->nw_ctx, self->pSock->sockid, MyCallback, self); + NetWatchSetMode(self->nw_ctx, nwatch_write | nwatch_read); return 1; } @@ -979,3 +1220,20 @@ pAsyncUnit AsyncUnitFromQueue(pAsyncQueue queue) result->queue = queue; return result; } + +void *AsyncUnitSetQueueContext(pAsyncUnit unit, void *cntx) +{ + void *hold; + assert(unit); + assert(unit->queue); + hold = unit->queue->context; + unit->queue->context = cntx; + return hold; +} + +void *AsyncUnitGetQueueContext(pAsyncUnit unit) +{ + assert(unit); + assert(unit->queue); + return unit->queue->context; +} diff --git a/asyncqueue.h b/asyncqueue.h index 2359d575..60fed9ca 100644 --- a/asyncqueue.h +++ b/asyncqueue.h @@ -28,7 +28,7 @@ typedef struct __AsyncQueue AsyncQueue, *pAsyncQueue; */ int AsyncUnitCreate(const char *queueName, pAsyncUnit * unit); /** \brief Get an AsyncUnit from a given AsyncQueue - * \param queue The AsyncQueue fro which this AsyncUnit is valid + * \param queue The AsyncQueue for which this AsyncUnit is valid * \return a new AsyncUnit or NULL on error */ pAsyncUnit AsyncUnitFromQueue(pAsyncQueue queue); @@ -36,9 +36,13 @@ pAsyncUnit AsyncUnitFromQueue(pAsyncQueue queue); /** \brief create an AsyncUnit attached to an anonymous AsyncQueue. * * \param host name or address of the target host - * \param port number or service name on the target host + * \param port number or service name on the target host * \param unit pointer to the AsyncUnit created on positive return * \return positive if successful + * + * If is null then points to an existing queue name. + * If a queue exists for /, then that queue is used. + * If neither of the above hold, a new queue is created. */ int AsyncUnitCreateHost(const char *host, const char *port, pAsyncUnit * unit); @@ -98,11 +102,12 @@ int AsyncUnitSendTxn(pAsyncUnit unit, * \param cmd_len length of data in command * \param responseHandler function to handle the response * \param context to be used by handler function - * \param resp_len maximum length to be allowed for response + * \param resp_len [in] maximum length to be allowed for response + * [out] actual length returned */ int AsyncUnitTransact(pAsyncUnit unit, const char *command, int cmd_len, - char *response, int rsp_len); + char *response, int *rsp_len); /** \brief write to the AsyncQueue file descriptor * @@ -182,4 +187,12 @@ int AsyncQueueFactory(SConnection * pCon, SicsInterp * pSics, int AsyncQueueAction(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]); +/** \brief store queue level context + */ +void *AsyncUnitSetQueueContext(pAsyncUnit handle, void *cntx); + +/** \brief retrieve queue level context + */ +void *AsyncUnitGetQueueContext(pAsyncUnit handle); + #endif /* SICSASYNCQUEUE */