Add queue/controller level opaque pointer for units to coordinate controller access.

Read multiple bytes at a time and pass them sequentially to the input callback to reduce number of system calls.

r2104 | dcl | 2007-08-10 10:25:40 +1000 (Fri, 10 Aug 2007) | 3 lines
This commit is contained in:
Douglas Clowes
2007-08-10 10:25:40 +10:00
parent ecfd182a2e
commit b701a69b15

View File

@@ -58,6 +58,7 @@ struct __AsyncQueue {
pNWTimer nw_tmr; /* NetWait timer handle */ pNWTimer nw_tmr; /* NetWait timer handle */
mkChannel* pSock; /* socket address */ mkChannel* pSock; /* socket address */
pAsyncProtocol protocol; pAsyncProtocol protocol;
void* context; /**< opaque caller queue context */
}; };
static pAsyncQueue queue_array[FD_SETSIZE]; static pAsyncQueue queue_array[FD_SETSIZE];
@@ -303,9 +304,9 @@ static int MyCallback(void* context, int mode)
if (mode & nwatch_read) { if (mode & nwatch_read) {
int iRet; int iRet;
char reply[1]; char reply[100];
iRet = NETRead(self->pSock, reply, 1, 0); iRet = NETRead(self->pSock, reply, 100, 0);
if (iRet < 0) { /* EOF */ if (iRet < 0) { /* EOF */
iRet = AQ_Reconnect(self); iRet = AQ_Reconnect(self);
if (iRet <= 0) if (iRet <= 0)
@@ -317,18 +318,31 @@ static int MyCallback(void* context, int mode)
if (iRet == 0) { /* TODO: timeout or error */ if (iRet == 0) { /* TODO: timeout or error */
return 0; return 0;
} else { } else {
int nchars = iRet;
int i = 0;
pAQ_Cmd myCmd = self->command_head; pAQ_Cmd myCmd = self->command_head;
if (myCmd) { if (myCmd) {
iRet = self->protocol->handleInput(self->protocol, myCmd->tran, reply[0]); for (i = 0; i < nchars; ++i) {
iRet = self->protocol->handleInput(self->protocol, myCmd->tran, reply[i]);
if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */ if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */
if (myCmd->tran->handleResponse) if (myCmd->tran->handleResponse)
myCmd->tran->handleResponse(myCmd->tran); myCmd->tran->handleResponse(myCmd->tran);
PopCommand(self); PopCommand(self);
break;
}
else if (iRet < 0) {
SICSLogWrite("ERROR: Protocol error in AsyncQueue", eError);
/* TODO: error */
break;
}
}
if (i < nchars - 1) {
SICSLogWrite("ERROR: excess chars in AsyncQueue", eError);
/* TODO: handle unsolicited */
} }
else if (iRet < 0) /* TODO: error */
;
} }
else { else {
SICSLogWrite("ERROR: unsolicited input in AsyncQueue", eError);
/* TODO: handle unsolicited input */ /* TODO: handle unsolicited input */
} }
} }
@@ -954,3 +968,18 @@ int AsyncUnitDestroy(pAsyncUnit unit)
return 1; return 1;
} }
void* AsyncUnitSetQueueContext(pAsyncUnit unit, void* cntx) {
void* hold;
assert(unit);
assert(unit->queue);
hold = unit->queue->context;
unit->queue->context = cntx;
return hold;
}
void* AsyncUnitGetQueueContext(pAsyncUnit unit) {
assert(unit);
assert(unit->queue);
return unit->queue->context;
}