implement some TODOs, add malloc checks and logging
r1984 | dcl | 2007-05-25 16:12:10 +1000 (Fri, 25 May 2007) | 2 lines
This commit is contained in:
@ -48,7 +48,7 @@ int defaultHandleInput(pAsyncProtocol p, pAsyncTxn txn, int ch) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int defaultHandleEvent(pAsyncProtocol p, pAsyncTxn txn, int event) {
|
int defaultHandleEvent(pAsyncProtocol p, pAsyncTxn txn, int event) {
|
||||||
/* TODO: handle the event */
|
/* TODO: what could or should we do to handle the event */
|
||||||
return AQU_POP_CMD;
|
return AQU_POP_CMD;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,12 +73,20 @@ int defaultPrepareTxn(pAsyncProtocol p, pAsyncTxn txn, const char* cmd, int cmd_
|
|||||||
if (term[state] == 0) {
|
if (term[state] == 0) {
|
||||||
/* outgoing command is correctly terminated */
|
/* outgoing command is correctly terminated */
|
||||||
txn->out_buf = malloc(cmd_len + 1);
|
txn->out_buf = malloc(cmd_len + 1);
|
||||||
|
if (txn->out_buf == NULL) {
|
||||||
|
SICSLogWrite("Out of memory in AsyncProtocol::defaultPrepareTxn", eError);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
memcpy(txn->out_buf, cmd, cmd_len + 1);
|
memcpy(txn->out_buf, cmd, cmd_len + 1);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
/* outgoing command is NOT correctly terminated */
|
/* outgoing command is NOT correctly terminated */
|
||||||
int tlen = strlen(term);
|
int tlen = strlen(term);
|
||||||
txn->out_buf = malloc(cmd_len + tlen + 1);
|
txn->out_buf = malloc(cmd_len + tlen + 1);
|
||||||
|
if (txn->out_buf == NULL) {
|
||||||
|
SICSLogWrite("Out of memory in AsyncProtocol::defaultPrepareTxn", eError);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
memcpy(txn->out_buf, cmd, cmd_len);
|
memcpy(txn->out_buf, cmd, cmd_len);
|
||||||
memcpy(txn->out_buf + cmd_len, term, tlen + 1);
|
memcpy(txn->out_buf + cmd_len, term, tlen + 1);
|
||||||
cmd_len += tlen;
|
cmd_len += tlen;
|
||||||
@ -86,6 +94,12 @@ int defaultPrepareTxn(pAsyncProtocol p, pAsyncTxn txn, const char* cmd, int cmd_
|
|||||||
txn->out_len = cmd_len;
|
txn->out_len = cmd_len;
|
||||||
txn->out_idx = 0;
|
txn->out_idx = 0;
|
||||||
txn->inp_buf = malloc(rsp_len);
|
txn->inp_buf = malloc(rsp_len);
|
||||||
|
if (txn->inp_buf == NULL) {
|
||||||
|
SICSLogWrite("Out of memory in AsyncProtocol::defaultPrepareTxn", eError);
|
||||||
|
free(txn->out_buf);
|
||||||
|
txn->out_buf = NULL;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
txn->inp_len = rsp_len;
|
txn->inp_len = rsp_len;
|
||||||
txn->inp_idx = 0;
|
txn->inp_idx = 0;
|
||||||
txn->txn_state = 0;
|
txn->txn_state = 0;
|
||||||
@ -149,8 +163,10 @@ static char *decodeTerminator(char *code)
|
|||||||
return NULL;
|
return NULL;
|
||||||
count = strlen(code);
|
count = strlen(code);
|
||||||
pResult = (char *) malloc(count + 1);
|
pResult = (char *) malloc(count + 1);
|
||||||
if (!pResult)
|
if (!pResult) {
|
||||||
|
SICSLogWrite("Out of memory in AsyncProtocol::decodeTerminator", eError);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
}
|
||||||
memset(pResult, 0, count + 1);
|
memset(pResult, 0, count + 1);
|
||||||
|
|
||||||
pCh = pResult;
|
pCh = pResult;
|
||||||
@ -217,7 +233,7 @@ int AsyncProtocolAction(SConnection *pCon, SicsInterp *pSics,
|
|||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
if (strcasecmp(argv[1], "replyterminator") == 0) {
|
else if (strcasecmp(argv[1], "replyterminator") == 0) {
|
||||||
if (argc > 2) {
|
if (argc > 2) {
|
||||||
char* pPtr = decodeTerminator(argv[2]);
|
char* pPtr = decodeTerminator(argv[2]);
|
||||||
if (pPtr) {
|
if (pPtr) {
|
||||||
@ -238,13 +254,23 @@ int AsyncProtocolAction(SConnection *pCon, SicsInterp *pSics,
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* TODO: other actions */
|
else if (strcasecmp(argv[1], "list") == 0) {
|
||||||
|
int ac = 2;
|
||||||
|
char* av[3] = { argv[0], 0, 0 };
|
||||||
|
av[1] = "sendterminator";
|
||||||
|
AsyncProtocolAction(pCon, pSics, pData, ac, av);
|
||||||
|
av[1] = "replyterminator";
|
||||||
|
AsyncProtocolAction(pCon, pSics, pData, ac, av);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
/* handle any other actions here */
|
||||||
return AsyncProtocolNoAction(pCon, pSics, pData, argc,argv);
|
return AsyncProtocolNoAction(pCon, pSics, pData, argc,argv);
|
||||||
}
|
}
|
||||||
|
|
||||||
void defaultKillPrivate(pAsyncProtocol p) {
|
void defaultKillPrivate(pAsyncProtocol p) {
|
||||||
if (p->privateData) {
|
if (p->privateData) {
|
||||||
/* TODO: anything? */
|
/* TODO: should we do anything? */
|
||||||
|
free(p->privateData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -252,7 +278,6 @@ void AsyncProtocolKill(void *pData) {
|
|||||||
pAsyncProtocol self = (pAsyncProtocol) pData;
|
pAsyncProtocol self = (pAsyncProtocol) pData;
|
||||||
if(self->pDes)
|
if(self->pDes)
|
||||||
DeleteDescriptor(self->pDes);
|
DeleteDescriptor(self->pDes);
|
||||||
/* TODO: more destruction maybe */
|
|
||||||
if(self->sendTerminator != NULL)
|
if(self->sendTerminator != NULL)
|
||||||
free(self->sendTerminator);
|
free(self->sendTerminator);
|
||||||
if(self->replyTerminator != NULL)
|
if(self->replyTerminator != NULL)
|
||||||
@ -274,7 +299,7 @@ pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
|
|||||||
|
|
||||||
self = (pAsyncProtocol) malloc(sizeof(AsyncProtocol));
|
self = (pAsyncProtocol) malloc(sizeof(AsyncProtocol));
|
||||||
if (self == NULL) {
|
if (self == NULL) {
|
||||||
/* TODO */
|
SICSLogWrite("Out of memory in AsyncProtocolCreate", eError);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memset(self, 0, sizeof(AsyncProtocol));
|
memset(self, 0, sizeof(AsyncProtocol));
|
||||||
@ -285,7 +310,7 @@ pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
|
|||||||
pKFunc = AsyncProtocolKill;
|
pKFunc = AsyncProtocolKill;
|
||||||
iRet = AddCommand(pSics, protocolName, pFunc, pKFunc, self);
|
iRet = AddCommand(pSics, protocolName, pFunc, pKFunc, self);
|
||||||
if (!iRet ) {
|
if (!iRet ) {
|
||||||
/* TODO */
|
SICSLogWrite("AddCommand failed in AsyncProtocolCreate", eError);
|
||||||
AsyncProtocolKill(self);
|
AsyncProtocolKill(self);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -307,7 +332,7 @@ int AsyncProtocolFactory(SConnection *pCon, SicsInterp *pSics,
|
|||||||
}
|
}
|
||||||
pAsyncProtocol pNew = AsyncProtocolCreate(pSics, argv[1],
|
pAsyncProtocol pNew = AsyncProtocolCreate(pSics, argv[1],
|
||||||
AsyncProtocolAction, AsyncProtocolKill);
|
AsyncProtocolAction, AsyncProtocolKill);
|
||||||
/* TODO: handle arguments */
|
/* handle any extra arguments here */
|
||||||
pNew->privateData = NULL;
|
pNew->privateData = NULL;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
44
asyncqueue.c
44
asyncqueue.c
@ -181,7 +181,7 @@ static int StartCommand(pAsyncQueue self)
|
|||||||
char reply[1];
|
char reply[1];
|
||||||
int iRet;
|
int iRet;
|
||||||
iRet = NETRead(sock, reply, 1, 0);
|
iRet = NETRead(sock, reply, 1, 0);
|
||||||
if (iRet < 0) { /* TODO: EOF */
|
if (iRet < 0) { /* EOF */
|
||||||
iRet = AQ_Reconnect(self);
|
iRet = AQ_Reconnect(self);
|
||||||
if (iRet == 0)
|
if (iRet == 0)
|
||||||
return 0;
|
return 0;
|
||||||
@ -306,7 +306,7 @@ static int MyCallback(void* context, int mode)
|
|||||||
char reply[1];
|
char reply[1];
|
||||||
|
|
||||||
iRet = NETRead(self->pSock, reply, 1, 0);
|
iRet = NETRead(self->pSock, reply, 1, 0);
|
||||||
if (iRet < 0) { /* TODO: EOF */
|
if (iRet < 0) { /* EOF */
|
||||||
iRet = AQ_Reconnect(self);
|
iRet = AQ_Reconnect(self);
|
||||||
if (iRet <= 0)
|
if (iRet <= 0)
|
||||||
return iRet;
|
return iRet;
|
||||||
@ -342,7 +342,10 @@ int AsyncUnitEnqueHead(pAsyncUnit unit, pAsyncTxn context)
|
|||||||
|
|
||||||
assert(unit && unit->queue && unit->queue->protocol);
|
assert(unit && unit->queue && unit->queue->protocol);
|
||||||
myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd));
|
myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd));
|
||||||
/* TODO: check malloc */
|
if (myCmd == NULL) {
|
||||||
|
SICSLogWrite("ERROR: Out of memory in AsyncUnitEnqueHead", eError);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
memset(myCmd, 0, sizeof(AQ_Cmd));
|
memset(myCmd, 0, sizeof(AQ_Cmd));
|
||||||
myCmd->tran = context;
|
myCmd->tran = context;
|
||||||
myCmd->unit = unit;
|
myCmd->unit = unit;
|
||||||
@ -358,7 +361,10 @@ int AsyncUnitEnqueueTxn(pAsyncUnit unit, pAsyncTxn pTxn)
|
|||||||
|
|
||||||
assert(unit && unit->queue && unit->queue->protocol);
|
assert(unit && unit->queue && unit->queue->protocol);
|
||||||
myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd));
|
myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd));
|
||||||
/* TODO: check malloc */
|
if (myCmd == NULL) {
|
||||||
|
SICSLogWrite("ERROR: Out of memory in AsyncUnitEnqueueTxn", eError);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
memset(myCmd, 0, sizeof(AQ_Cmd));
|
memset(myCmd, 0, sizeof(AQ_Cmd));
|
||||||
myCmd->tran = pTxn;
|
myCmd->tran = pTxn;
|
||||||
myCmd->unit = unit;
|
myCmd->unit = unit;
|
||||||
@ -377,7 +383,10 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
|||||||
|
|
||||||
assert(unit);
|
assert(unit);
|
||||||
myTxn = (pAsyncTxn) malloc(sizeof(AsyncTxn));
|
myTxn = (pAsyncTxn) malloc(sizeof(AsyncTxn));
|
||||||
assert(myTxn);
|
if (myTxn == NULL) {
|
||||||
|
SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
memset(myTxn, 0, sizeof(AsyncTxn));
|
memset(myTxn, 0, sizeof(AsyncTxn));
|
||||||
if (unit->queue->protocol->prepareTxn) {
|
if (unit->queue->protocol->prepareTxn) {
|
||||||
int iRet;
|
int iRet;
|
||||||
@ -385,6 +394,11 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
myTxn->out_buf = (char*) malloc(cmd_len + 5);
|
myTxn->out_buf = (char*) malloc(cmd_len + 5);
|
||||||
|
if (myTxn->out_buf == NULL) {
|
||||||
|
SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError);
|
||||||
|
free(myTxn);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
memcpy(myTxn->out_buf, command, cmd_len);
|
memcpy(myTxn->out_buf, command, cmd_len);
|
||||||
myTxn->out_len = cmd_len;
|
myTxn->out_len = cmd_len;
|
||||||
if (myTxn->out_len < 2 ||
|
if (myTxn->out_len < 2 ||
|
||||||
@ -399,6 +413,12 @@ pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
|
|||||||
myTxn->inp_buf = NULL;
|
myTxn->inp_buf = NULL;
|
||||||
else {
|
else {
|
||||||
myTxn->inp_buf = malloc(rsp_len + 1);
|
myTxn->inp_buf = malloc(rsp_len + 1);
|
||||||
|
if (myTxn->inp_buf == NULL) {
|
||||||
|
SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError);
|
||||||
|
free(myTxn->out_buf);
|
||||||
|
free(myTxn);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
memset(myTxn->inp_buf, 0, rsp_len + 1);
|
memset(myTxn->inp_buf, 0, rsp_len + 1);
|
||||||
}
|
}
|
||||||
myTxn->inp_len = rsp_len;
|
myTxn->inp_len = rsp_len;
|
||||||
@ -477,7 +497,7 @@ int AsyncUnitWrite(pAsyncUnit unit, void* buffer, int buflen)
|
|||||||
sock = AsyncUnitGetSocket(unit);
|
sock = AsyncUnitGetSocket(unit);
|
||||||
iRet = NETWrite(sock, buffer, buflen);
|
iRet = NETWrite(sock, buffer, buflen);
|
||||||
/* TODO handle errors */
|
/* TODO handle errors */
|
||||||
if (iRet < 0) { /* TODO: EOF */
|
if (iRet < 0) { /* EOF */
|
||||||
iRet = AQ_Reconnect(unit->queue);
|
iRet = AQ_Reconnect(unit->queue);
|
||||||
if (iRet == 0)
|
if (iRet == 0)
|
||||||
return 0;
|
return 0;
|
||||||
@ -708,7 +728,7 @@ static pAsyncQueue AQ_Create(const char* host, const char* port)
|
|||||||
}
|
}
|
||||||
if (self == NULL) {
|
if (self == NULL) {
|
||||||
channel = NETConnectWithFlags(host, port_no, 0);
|
channel = NETConnectWithFlags(host, port_no, 0);
|
||||||
/* TODO */
|
/* TODO handle asynchronous connection */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -738,7 +758,7 @@ static pAsyncQueue AQ_Create(const char* host, const char* port)
|
|||||||
|
|
||||||
static int AQ_Init(pAsyncQueue self)
|
static int AQ_Init(pAsyncQueue self)
|
||||||
{
|
{
|
||||||
/* TODO: Init the controller */
|
/* Init the controller */
|
||||||
if (self->nw_ctx == NULL)
|
if (self->nw_ctx == NULL)
|
||||||
NetWatchRegisterCallback(&self->nw_ctx,
|
NetWatchRegisterCallback(&self->nw_ctx,
|
||||||
self->pSock->sockid,
|
self->pSock->sockid,
|
||||||
@ -830,7 +850,7 @@ int AsyncQueueFactory(SConnection *pCon, SicsInterp *pSics,
|
|||||||
SCWrite(pCon, line, eError);
|
SCWrite(pCon, line, eError);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* TODO: asynchronous connection */
|
/* TODO: implement asynchronous connection */
|
||||||
channel = NETConnectWithFlags(argv[3], port_no, 0);
|
channel = NETConnectWithFlags(argv[3], port_no, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -895,7 +915,11 @@ int AsyncUnitCreateHost(const char* host, const char* port, pAsyncUnit* handle)
|
|||||||
status = AQ_Init(self);
|
status = AQ_Init(self);
|
||||||
|
|
||||||
unit = (pAsyncUnit) malloc(sizeof(AsyncUnit));
|
unit = (pAsyncUnit) malloc(sizeof(AsyncUnit));
|
||||||
/* TODO: check malloc failure */
|
if (unit == NULL) {
|
||||||
|
SICSLogWrite("ERROR: Out of memory in AsyncUnitCreateHost", eError);
|
||||||
|
*handle = NULL;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
memset(unit, 0, sizeof(AsyncUnit));
|
memset(unit, 0, sizeof(AsyncUnit));
|
||||||
++self->unit_count;
|
++self->unit_count;
|
||||||
unit->queue = self;
|
unit->queue = self;
|
||||||
|
Reference in New Issue
Block a user