Adjustments for handling binary protocols and multiple receive terminators

r2268 | dcl | 2008-01-15 12:19:37 +1100 (Tue, 15 Jan 2008) | 2 lines
This commit is contained in:
Douglas Clowes
2008-01-15 12:19:37 +11:00
parent 1250d902e7
commit fe2eabfc33
2 changed files with 56 additions and 29 deletions

View File

@@ -24,22 +24,29 @@ int defaultSendCommand(pAsyncProtocol p, pAsyncTxn txn) {
if (iRet <= 0) if (iRet <= 0)
return iRet; return iRet;
if (term[state] != 0) if (term[state] != 0)
iRet = AsyncUnitWrite(txn->unit, term, strlen(term)); iRet = AsyncUnitWrite(txn->unit, (void *)term, strlen(term));
return iRet; return iRet;
} }
int defaultHandleInput(pAsyncProtocol p, pAsyncTxn txn, int ch) { int defaultHandleInput(pAsyncProtocol p, pAsyncTxn txn, int ch) {
const char *term = "\r\n"; const char *term = "\r\n";
if (p->replyTerminator) if (txn->txn_state == 0) {
term = p->replyTerminator; int i;
if (ch == term[txn->txn_state]) for (i = 0; i < 10; ++i)
if (p->replyTerminator[i] && ch == p->replyTerminator[i][0]) {
txn->txn_state = i << 16;
break;
}
}
term = p->replyTerminator[txn->txn_state >> 16];
if (ch == term[txn->txn_state & 0xffff])
++txn->txn_state; ++txn->txn_state;
else else
txn->txn_state = 0; txn->txn_state = 0;
if (txn->inp_idx < txn->inp_len) if (txn->inp_idx < txn->inp_len)
txn->inp_buf[txn->inp_idx++] = ch; txn->inp_buf[txn->inp_idx++] = ch;
if (term[txn->txn_state] == 0) { if (term[txn->txn_state & 0xffff] == 0) {
if (txn->inp_idx < txn->inp_len) if (txn->inp_idx < txn->inp_len)
txn->inp_buf[txn->inp_idx] = '\0'; txn->inp_buf[txn->inp_idx] = '\0';
return AQU_POP_CMD; return AQU_POP_CMD;
@@ -60,11 +67,7 @@ int defaultPrepareTxn(pAsyncProtocol p, pAsyncTxn txn, const char* cmd, int cmd_
term = p->sendTerminator; term = p->sendTerminator;
state = 0; state = 0;
for (i = 0; i < cmd_len; ++i) { for (i = 0; i < cmd_len; ++i) {
if (cmd[i] == 0x00) { /* end of transmission */ if (cmd[i] == term[state]) {
cmd_len = i;
break;
}
else if (cmd[i] == term[state]) {
++state; ++state;
continue; continue;
} }
@@ -114,12 +117,17 @@ static void encodeTerminator(char *result, char *terminator)
{ {
if (terminator) if (terminator)
while (*terminator) { while (*terminator) {
if (*terminator <= 32 || *terminator >= 127) {
*result++ = '0'; *result++ = '0';
*result++ = 'x'; *result++ = 'x';
*result++ = hex[(*terminator >> 4) &0xF]; *result++ = hex[(*terminator >> 4) &0xF];
*result++ = hex[(*terminator) &0xF]; *result++ = hex[(*terminator) &0xF];
++terminator; ++terminator;
} }
else {
*result++ = *terminator++;
}
}
*result = '\0'; *result = '\0';
return; return;
} }
@@ -235,20 +243,36 @@ int AsyncProtocolAction(SConnection *pCon, SicsInterp *pSics,
} }
else if (strcasecmp(argv[1], "replyterminator") == 0) { else if (strcasecmp(argv[1], "replyterminator") == 0) {
if (argc > 2) { if (argc > 2) {
char* pPtr = decodeTerminator(argv[2]); int i;
for (i = 0; i < 10; ++i)
if (self->replyTerminator[i]) {
free(self->replyTerminator[i]);
self->replyTerminator[i] = NULL;
}
for (i = 0; i < 10 && i < argc - 2; ++i) {
char* pPtr = decodeTerminator(argv[i + 2]);
if (pPtr) { if (pPtr) {
if (self->replyTerminator) self->replyTerminator[i] = pPtr;
free(self->replyTerminator); }
self->replyTerminator = pPtr;
} }
SCSendOK(pCon); SCSendOK(pCon);
} }
else else
{ {
int i;
char term[132]; char term[132];
char line[1024]; char line[1024];
encodeTerminator(term, self->replyTerminator); term[0] = '\0';
sprintf(line, "%s.replyTerminator = \"%s\"", argv[0], term); sprintf(line, "%s.replyTerminator =", argv[0]);
for (i = 0; i < 10; ++i) {
if (self->replyTerminator[i] == NULL)
break;
term[0] = ' ';
term[1] = '"';
encodeTerminator(&term[2], self->replyTerminator[i]);
strcat(term, "\"");
strcat(line, term);
}
SCWrite(pCon, line, eValue); SCWrite(pCon, line, eValue);
} }
return 1; return 1;
@@ -276,12 +300,14 @@ void defaultKillPrivate(pAsyncProtocol p) {
void AsyncProtocolKill(void *pData) { void AsyncProtocolKill(void *pData) {
pAsyncProtocol self = (pAsyncProtocol) pData; pAsyncProtocol self = (pAsyncProtocol) pData;
int i;
if(self->pDes) if(self->pDes)
DeleteDescriptor(self->pDes); DeleteDescriptor(self->pDes);
if(self->sendTerminator != NULL) if(self->sendTerminator != NULL)
free(self->sendTerminator); free(self->sendTerminator);
if(self->replyTerminator != NULL) for (i = 0; i < 10; ++i)
free(self->replyTerminator); if(self->replyTerminator[i] != NULL)
free(self->replyTerminator[i]);
if (self->killPrivate) if (self->killPrivate)
self->killPrivate(self); self->killPrivate(self);
} }
@@ -292,7 +318,7 @@ pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
pAsyncProtocol self = NULL; pAsyncProtocol self = NULL;
/* try to find an existing queue with this name */ /* try to find an existing queue with this name */
self = (pAsyncProtocol) FindCommandData(pServ->pSics, protocolName, "AsyncProtocol"); self = (pAsyncProtocol) FindCommandData(pServ->pSics, (char *)protocolName, "AsyncProtocol");
if (self != NULL) { if (self != NULL) {
return self; return self;
} }
@@ -308,7 +334,7 @@ pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
pFunc = AsyncProtocolNoAction; pFunc = AsyncProtocolNoAction;
if (pKFunc == NULL) if (pKFunc == NULL)
pKFunc = AsyncProtocolKill; pKFunc = AsyncProtocolKill;
iRet = AddCommand(pSics, protocolName, pFunc, pKFunc, self); iRet = AddCommand(pSics, (char *)protocolName, pFunc, pKFunc, self);
if (!iRet ) { if (!iRet ) {
SICSLogWrite("AddCommand failed in AsyncProtocolCreate", eError); SICSLogWrite("AddCommand failed in AsyncProtocolCreate", eError);
AsyncProtocolKill(self); AsyncProtocolKill(self);
@@ -320,7 +346,7 @@ pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
self->prepareTxn = defaultPrepareTxn; self->prepareTxn = defaultPrepareTxn;
self->killPrivate = defaultKillPrivate; self->killPrivate = defaultKillPrivate;
self->sendTerminator = strdup("\r\n"); self->sendTerminator = strdup("\r\n");
self->replyTerminator = strdup("\r\n"); self->replyTerminator[0] = strdup("\r\n");
return self; return self;
} }

View File

@@ -91,11 +91,12 @@ int AsyncUnitSendTxn(pAsyncUnit unit,
* \param cmd_len length of data in command * \param cmd_len length of data in command
* \param responseHandler function to handle the response * \param responseHandler function to handle the response
* \param context to be used by handler function * \param context to be used by handler function
* \param resp_len maximum length to be allowed for response * \param resp_len [in] maximum length to be allowed for response
* [out] actual length returned
*/ */
int AsyncUnitTransact(pAsyncUnit unit, int AsyncUnitTransact(pAsyncUnit unit,
const char* command, int cmd_len, const char* command, int cmd_len,
char* response, int rsp_len); char* response, int *rsp_len);
/** \brief write to the AsyncQueue file descriptor /** \brief write to the AsyncQueue file descriptor
* *