/* * A S Y N C Q U E U E * * This module manages AsyncQueue communications. * * The AsyncQueue is an asynchronous queue between drivers and the device. It * supports multiple logical units on a single device controller that share a * single command channel. * * Douglas Clowes, February 2007 * */ #include #include #include #include #include #include #include #include "network.h" #include "asyncqueue.h" #include "nwatch.h" #include #include typedef struct __async_command AQ_Cmd, *pAQ_Cmd; struct __async_command { pAQ_Cmd next; pAsyncTxn tran; pAsyncUnit unit; int timeout; int retries; int active; }; struct __AsyncUnit { pAsyncUnit next; pAsyncQueue queue; AQU_Notify notify_func; void *notify_cntx; }; typedef enum { eAsyncIdle, eAsyncWaiting, eAsyncConnecting, eAsyncConnected } AsyncState; struct __AsyncQueue { pObjectDescriptor pDes; char *queue_name; char *pHost; int iPort; int iDelay; /* intercommand delay in milliseconds */ int timeout; int retries; int retryTimer; /* mSec delay before next retry */ bool translate; /* translate binary output with escaped chars */ bool trace; struct timeval tvLastCmd; /* time of completion of last command */ int unit_count; /* number of units connected */ pAsyncUnit units; /* head of unit chain */ pAQ_Cmd command_head; /* first/next command in queue */ pAQ_Cmd command_tail; /* last command in queue */ pNWContext nw_ctx; /* NetWait context handle */ pNWTimer nw_tmr; /* NetWait timer handle */ mkChannel *pSock; /* socket address */ AsyncState state; /* Queue Connection State */ pAsyncProtocol protocol; char *noreply_text; int noreply_len; void *context; /**< opaque caller queue context */ }; static pAsyncQueue queue_array[FD_SETSIZE]; static int queue_index = 0; static const char *state_name(AsyncState the_state) { switch (the_state) { case eAsyncIdle: return "eAsyncIdle"; case eAsyncWaiting: return "eAsyncWaiting"; case eAsyncConnecting: return "eAsyncConnecting"; case eAsyncConnected: return "eAsyncConnected"; } return ""; } /* * Free the transaction and buffers */ static void free_transaction(pAsyncTxn myTxn) { if (myTxn) { if (--myTxn->ref_counter > 0) return; /* * Allow kill_private to clean it all if it wants */ if (myTxn->kill_private) myTxn->kill_private(myTxn); if (myTxn->out_buf) free(myTxn->out_buf); if (myTxn->inp_buf) free(myTxn->inp_buf); if (myTxn->proto_private) free(myTxn->proto_private); free(myTxn); } } /* * Free the command and transaction structures and their contents */ static void free_command(pAQ_Cmd myCmd) { if (myCmd) { free_transaction(myCmd->tran); free(myCmd); } } /* ---------------------------- Local ------------------------------------ CreateSocketAdress stolen from Tcl. Thanks to John Ousterhout */ static int CreateSocketAdress(struct sockaddr_in *sockaddrPtr, /* Socket address */ char *host, /* Host. NULL implies INADDR_ANY */ int port) { /* Port number */ struct hostent *hostent; /* Host database entry */ struct in_addr addr; /* For 64/32 bit madness */ (void) memset((char *) sockaddrPtr, '\0', sizeof(struct sockaddr_in)); sockaddrPtr->sin_family = AF_INET; sockaddrPtr->sin_port = htons((unsigned short) (port & 0xFFFF)); if (host == NULL) { addr.s_addr = INADDR_ANY; } else { hostent = gethostbyname(host); if (hostent != NULL) { memcpy((char *) &addr, (char *) hostent->h_addr_list[0], (size_t) hostent->h_length); } else { addr.s_addr = inet_addr(host); if (addr.s_addr == (unsigned int) -1) { return 0; /* error */ } } } /* * There is a rumor that this assignment may require care on * some 64 bit machines. */ sockaddrPtr->sin_addr.s_addr = addr.s_addr; return 1; } static int AQ_ClearTimer(pAsyncQueue self) { if (self->nw_tmr) { NetWatchRemoveTimer(self->nw_tmr); self->nw_tmr = 0; return 1; } return 0; } static int AQ_SetTimer(pAsyncQueue self, int msecs, pNWCallback callback, void *context) { int ret = 1; if (self->nw_tmr) { ret = AQ_ClearTimer(self); } NetWatchRegisterTimer(&self->nw_tmr, msecs, callback, context); return ret; } static void AQ_Purge(pAsyncQueue self) { pAQ_Cmd myCmd = self->command_head; if (self->nw_tmr) AQ_ClearTimer(self); gettimeofday(&self->tvLastCmd, NULL); while (myCmd) { /* Process any callback */ if (myCmd->tran->handleResponse) { myCmd->tran->txn_status = ATX_TIMEOUT; /* TODO should be ATX_DISCO */ myCmd->tran->handleResponse(myCmd->tran); } /* * Remove this transaction from the queue */ if (myCmd->next) { self->command_head = myCmd->next; } else self->command_head = self->command_tail = NULL; free_command(myCmd); myCmd = self->command_head; } } static void AQ_Notify(pAsyncQueue self, int event) { pAsyncUnit unit; if (self->state != eAsyncConnected) Log(DEBUG,"asquio", "Function:%s:%s", self->queue_name, __func__); for (unit = self->units; unit; unit = unit->next) if (unit->notify_func != NULL) unit->notify_func(unit->notify_cntx, event); } static int TimedReconnect(void *cntx, int mode) { int iRet; char line[132]; pAsyncQueue self = (pAsyncQueue) cntx; self->nw_tmr = 0; if (self->state != eAsyncConnected) Log(DEBUG,"asquio", "Function: %s:%s\n", self->queue_name, __func__); AQ_Purge(self); /* TODO: if self->pSock is NULL we haven't connected yet */ iRet = NETReconnect(self->pSock); /* * iRet can take the following values: * -1: The request failed * 0: The request is still in progress * +1: The request succeeded */ if (iRet <= 0) { if (iRet < 0) { snprintf(line, 132, "Failed reconnect on AsyncQueue '%s'", self->queue_name); Log(DEBUG,"asquio","%s",line); /* Timer for retry */ NetWatchSetMode(self->nw_ctx, 0); /* implement an exponential backoff within limits */ self->retryTimer = 2 * self->retryTimer; if (self->retryTimer < 125) self->retryTimer = 125; if (self->retryTimer > 16000) self->retryTimer = 16000; AQ_SetTimer(self, self->retryTimer, TimedReconnect, self); Log(DEBUG,"asquio", "In %s:%s: state %s => eAsyncWaiting", self->queue_name, __func__, state_name(self->state)); self->state = eAsyncWaiting; } else { NetWatchSetMode(self->nw_ctx, nwatch_write); Log(DEBUG,"asquio", "In %s:%s: state %s => eAsyncConnecting\n", self->queue_name, __func__, state_name(self->state)); self->state = eAsyncConnecting; /* await reconnect result in MyCallback */ } return 1; } NetWatchSetMode(self->nw_ctx, nwatch_read); Log(DEBUG,"asquio", "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state)); self->state = eAsyncConnected; snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name); Log(DEBUG,"asquio","%s",line); AQ_Purge(self); AQ_Notify(self, AQU_RECONNECT); return 1; } static int AQ_Reconnect(pAsyncQueue self) { int iRet; char line[132]; if (self->state != eAsyncConnected) Log(DEBUG,"asquio", "Function: %s:%s\n", self->queue_name, __func__); /* * Remove any old timer */ if (self->nw_tmr) AQ_ClearTimer(self); if (self->state == eAsyncConnected) { self->state = eAsyncIdle; Log(DEBUG,"asquio", "Disconnect on AsyncQueue '%s'", self->queue_name); AQ_Notify(self, AQU_DISCONNECT); AQ_Purge(self); } iRet = NETReconnect(self->pSock); /* * iRet can take the following values: * -1: The request failed * 0: The request is still in progress * +1: The request succeeded */ if (iRet <= 0) { if (iRet < 0) { /* Timer for retry */ NetWatchSetMode(self->nw_ctx, 0); /* implement an exponential backoff within limits */ self->retryTimer = 125; /* initial delay */ AQ_SetTimer(self, self->retryTimer, TimedReconnect, self); Log(DEBUG,"asquio", "In %s:%s: state %s => eAsyncWaiting\n", self->queue_name, __func__, state_name(self->state)); self->state = eAsyncWaiting; } else { NetWatchSetMode(self->nw_ctx, nwatch_write); Log(DEBUG,"asquio", "In %s:%s: state %s => eAsyncConnecting\n", self->queue_name, __func__, state_name(self->state)); self->state = eAsyncConnecting; /* await reconnect result in MyCallback */ } return iRet; } NetWatchSetMode(self->nw_ctx, nwatch_read); Log(DEBUG,"asquio", "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state)); self->state = eAsyncConnected; snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name); Log(DEBUG,"asquio",line); AQ_Purge(self); AQ_Notify(self, AQU_RECONNECT); return 1; } static int CommandTimeout(void *cntx, int mode); static int DelayedStart(void *cntx, int mode); static int PopCommand(pAsyncQueue self); static int StartCommand(pAsyncQueue self) { pAQ_Cmd myCmd = self->command_head; mkChannel *sock = self->pSock; int iRet = 0; if (self->state != eAsyncConnected) Log(DEBUG,"asquio", "Function: %s:%s\n", self->queue_name, __func__); if (myCmd == NULL) return OKOK; /* * Remove any old command timeout timer */ if (self->nw_tmr) AQ_ClearTimer(self); /* * Implement the inter-command delay */ if (self->iDelay) { struct timeval now, when; gettimeofday(&now, NULL); if (self->tvLastCmd.tv_sec == 0) self->tvLastCmd = now; when.tv_sec = self->tvLastCmd.tv_sec; when.tv_usec = self->tvLastCmd.tv_usec + 1000 * self->iDelay; if (when.tv_usec >= 1000000) { when.tv_sec += when.tv_usec / 1000000; when.tv_usec %= 1000000; } if (when.tv_sec > now.tv_sec || (when.tv_sec == now.tv_sec && when.tv_usec > now.tv_usec)) { int delay = when.tv_sec - now.tv_sec; delay *= 1000; delay += (when.tv_usec - now.tv_usec + (1000 - 1)) / 1000; AQ_SetTimer(self, delay, DelayedStart, self); return OKOK; } } /* * Discard any input before sending command */ if (NETAvailable(sock, 0)) { while (NETAvailable(sock, 0)) { /* TODO: handle unsolicited input */ char reply[128]; iRet = NETRead(sock, reply, 128, 0); if (iRet < 0) { /* EOF */ iRet = AQ_Reconnect(self); return 0; } else if (iRet > 0) { struct timeval tv; gettimeofday(&tv, NULL); Log(ERROR, "asquio","%d unsolicited chars in AsyncQueue %s",iRet, self->queue_name); LogHex(&tv,ERROR,SASQIO,reply,iRet); } } } myCmd->tran->txn_status = ATX_ACTIVE; if (self->protocol->sendCommand) { iRet = self->protocol->sendCommand(self->protocol, myCmd->tran); } else { pAsyncTxn txn = myCmd->tran; iRet = AsyncUnitWrite(txn->unit, txn->out_buf, txn->out_len); if (iRet < 0) iRet = 0; else iRet = 1; } /* * Handle case of no response expected */ if (myCmd->tran->inp_len == 0 || myCmd->tran->inp_buf == NULL) { myCmd->tran->txn_status = ATX_COMPLETE; return PopCommand(self); } if (iRet > 0) if (myCmd->tran->txn_status == ATX_COMPLETE) return PopCommand(self); /* * Add a new command timeout timer */ if (myCmd->timeout > 0) AQ_SetTimer(self, myCmd->timeout, CommandTimeout, self); else AQ_SetTimer(self, 30000, CommandTimeout, self); myCmd->active = 1; return iRet; } static int QueCommandHead(pAsyncQueue self, pAQ_Cmd cmd) { cmd->next = NULL; /* * If the command queue is empty, start transmission */ if (self->command_head == NULL) { self->command_head = self->command_tail = cmd; StartCommand(self); return 1; } if (self->command_head->active) { cmd->next = self->command_head->next; self->command_head->next = cmd; } else { cmd->next = self->command_head; self->command_head = cmd; } if (cmd->next == NULL) self->command_tail = cmd; return 1; } static int QueCommand(pAsyncQueue self, pAQ_Cmd cmd) { cmd->next = NULL; /* * If the command queue is empty, start transmission */ if (self->command_head == NULL) { self->command_head = self->command_tail = cmd; StartCommand(self); return 1; } self->command_tail->next = cmd; self->command_tail = cmd; return 1; } static int PopCommand(pAsyncQueue self) { pAQ_Cmd myCmd = self->command_head; if (self->nw_tmr) AQ_ClearTimer(self); gettimeofday(&self->tvLastCmd, NULL); /* Process any callback */ if (myCmd->tran->handleResponse) myCmd->tran->handleResponse(myCmd->tran); /* * If this is not the last in queue, start transmission */ if (myCmd->next) { pAQ_Cmd pNew = myCmd->next; self->command_head = pNew; StartCommand(self); } else self->command_head = self->command_tail = NULL; free_command(myCmd); return 1; } static int CommandTimeout(void *cntx, int mode) { pAsyncQueue self = (pAsyncQueue) cntx; pAQ_Cmd myCmd = self->command_head; self->nw_tmr = 0; if (self->trace) { struct timeval tv; gettimeofday(&tv, NULL); Log(DEBUG,"asquio", "Timeout Trace on AsyncQueue %s", self->queue_name); LogHex(&tv,DEBUG,SASQIO,myCmd->tran->inp_buf,myCmd->tran->inp_idx); } if (myCmd->retries > 0) { --myCmd->retries; StartCommand(self); } else { int iRet; iRet = self->protocol->handleEvent(self->protocol, myCmd->tran, AQU_TIMEOUT); if (iRet == AQU_POP_CMD) { PopCommand(self); /* remove command */ } else if (iRet == AQU_RETRY_CMD) StartCommand(self); /* restart command */ else if (iRet == AQU_RECONNECT) AQ_Reconnect(self); } return 1; } static int DelayedStart(void *cntx, int mode) { pAsyncQueue self = (pAsyncQueue) cntx; if (self->state != eAsyncConnected) Log(DEBUG,"asquio", "Function: %s:%s\n", self->queue_name, __func__); self->nw_tmr = 0; StartCommand(self); return 1; } static void LogHexPrefix(struct timeval *tv, unsigned int severity, char *prefix, char *buffer, int bufferLength) { char *hexData = NULL; hexData = bytesToHexString((uint8_t *)buffer,(size_t)bufferLength); if(hexData != NULL){ LogTS(tv,severity,SASQIO,"%s:%s", prefix,hexData); free(hexData); } } static int MyCallback(void *context, int mode) { pAsyncQueue self = (pAsyncQueue) context; if (self->state != eAsyncConnected) Log(DEBUG,"asquio", "Function: %s:%s\n", self->queue_name, __func__); if (mode & nwatch_read) { int iRet; char reply[100]; iRet = NETRead(self->pSock, reply, 100, 0); if (iRet < 0) { /* EOF */ iRet = AQ_Reconnect(self); if (iRet <= 0) return iRet; /* restart the command */ StartCommand(self); return 1; } if (iRet == 0) { /* TODO: timeout or error */ return 0; } else { int nchars = iRet; int i = 0; pAQ_Cmd myCmd = self->command_head; if (myCmd) { for (i = 0; i < nchars; ++i) { iRet = self->protocol->handleInput(self->protocol, myCmd->tran, reply[i] & 0xFF); if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */ if (self->trace) { struct timeval tv; gettimeofday(&tv, NULL); Log(INFO, "asquio","Input Trace on AsyncQueue %s", self->queue_name); LogHex(&tv,INFO,SASQIO,myCmd->tran->inp_buf, myCmd->tran->inp_idx); } PopCommand(self); break; } else if (iRet < 0) { int excess = nchars - 1 - i; struct timeval tv; gettimeofday(&tv, NULL); Log(ERROR,"asquio", "Protocol error %d in AsyncQueue %s", iRet, self->queue_name); LogHexPrefix(&tv,ERROR,"Sent", myCmd->tran->out_buf,myCmd->tran->out_len); LogHexPrefix(&tv,ERROR,"Received", myCmd->tran->inp_buf,myCmd->tran->inp_len); LogHexPrefix(&tv,ERROR,"Processed", &reply[0],i); LogHexPrefix(&tv,ERROR,"Unprocessed", &reply[i],excess); break; } } if (i < nchars - 1) { int excess = nchars - 1 - i; struct timeval tv; gettimeofday(&tv, NULL); Log(ERROR, "asquio", "%d excess chars in AsyncQueue %s", excess, self->queue_name); /* TODO: handle unsolicited */ } } else { int excess = nchars - 1 - i; struct timeval tv; gettimeofday(&tv, NULL); Log(ERROR, "asquio", "%d unsolicited chars in AsyncQueue %s", excess, self->queue_name); /* TODO: handle unsolicited input */ } } } if (mode & nwatch_write) { char line[132]; Log(DEBUG,"asquio", "Writeable socket callback on AsyncQueue %s", self->queue_name); NetWatchSetMode(self->nw_ctx, nwatch_read); Log(DEBUG,"asquio", "In %s:%s: state %s => eAsyncConnected\n", self->queue_name, __func__, state_name(self->state)); self->state = eAsyncConnected; } return 1; } int AsyncUnitEnqueueHead(pAsyncUnit unit, pAsyncTxn context) { pAQ_Cmd myCmd = NULL; assert(unit && unit->queue && unit->queue->protocol); myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd)); if (myCmd == NULL) { Log(ERROR,"asquio","Out of memory in AsyncUnitEnqueHead", eError); return 0; } memset(myCmd, 0, sizeof(AQ_Cmd)); myCmd->tran = context; myCmd->unit = unit; myCmd->timeout = unit->queue->timeout; myCmd->retries = unit->queue->retries; myCmd->active = 0; return QueCommandHead(unit->queue, myCmd); } int AsyncUnitEnqueueTxn(pAsyncUnit unit, pAsyncTxn pTxn) { pAQ_Cmd myCmd = NULL; assert(unit && unit->queue && unit->queue->protocol); myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd)); if (myCmd == NULL) { Log(ERROR,"asquio","%s","Out of memory in AsyncUnitEnqueueTxn"); return 0; } memset(myCmd, 0, sizeof(AQ_Cmd)); myCmd->tran = pTxn; myCmd->unit = unit; myCmd->timeout = unit->queue->timeout; myCmd->retries = unit->queue->retries; myCmd->active = 0; return QueCommand(unit->queue, myCmd); } pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit, const char *command, int cmd_len, AsyncTxnHandler callback, void *context, int rsp_len) { pAsyncTxn myTxn = NULL; assert(unit); myTxn = (pAsyncTxn) malloc(sizeof(AsyncTxn)); if (myTxn == NULL) { Log(ERROR,"asquio","%s","Out of memory in AsyncUnitPrepareTxn", eError); return NULL; } memset(myTxn, 0, sizeof(AsyncTxn)); if (unit->queue->noreply_text) { if (cmd_len > unit->queue->noreply_len && strncasecmp(&command[cmd_len - unit->queue->noreply_len], unit->queue->noreply_text, unit->queue->noreply_len) == 0) { rsp_len = 0; cmd_len -= unit->queue->noreply_len; } } else { if (cmd_len > 3 && strncmp(&command[cmd_len - 3], "{0}", 3) == 0) { rsp_len = 0; cmd_len -= 3; } else if (cmd_len > 11 && strncasecmp(&command[cmd_len - 11], "@@NOREPLY@@", 11) == 0) { rsp_len = 0; cmd_len -= 11; } } if (unit->queue->protocol->prepareTxn) { int iRet; myTxn->inp_len = rsp_len; /* allowing protocol to change it */ iRet = unit->queue->protocol->prepareTxn(unit->queue->protocol, myTxn, command, cmd_len, rsp_len); if (iRet == 0) { free(myTxn); return NULL; } rsp_len = myTxn->inp_len; /* allowed protocol to change it */ } else { myTxn->out_buf = (char *) malloc(cmd_len + 5); if (myTxn->out_buf == NULL) { Log(ERROR,"asquio","%s","Out of memory in AsyncUnitPrepareTxn"); free(myTxn); return NULL; } memcpy(myTxn->out_buf, command, cmd_len); myTxn->out_len = cmd_len; if (myTxn->out_len < 2 || myTxn->out_buf[myTxn->out_len - 1] != 0x0A || myTxn->out_buf[myTxn->out_len - 2] != 0x0D) { myTxn->out_buf[myTxn->out_len++] = 0x0D; myTxn->out_buf[myTxn->out_len++] = 0x0A; } myTxn->out_buf[myTxn->out_len] = '\0'; } if (rsp_len == 0) myTxn->inp_buf = NULL; else { myTxn->inp_buf = malloc(rsp_len + 1); if (myTxn->inp_buf == NULL) { Log(ERROR,"asquio","%s","Out of memory in AsyncUnitPrepareTxn", eError); free(myTxn->out_buf); free(myTxn); return NULL; } memset(myTxn->inp_buf, 0, rsp_len + 1); } myTxn->inp_len = rsp_len; myTxn->unit = unit; myTxn->handleResponse = callback; myTxn->cntx = context; myTxn->ref_counter = 1; return myTxn; } pAsyncTxn AsyncUnitHoldTxn(pAsyncTxn txn) { if (txn) txn->ref_counter++; return txn; } void AsyncUnitFreeTxn(pAsyncTxn txn) { if (txn) free_transaction(txn); } int AsyncUnitSendTxn(pAsyncUnit unit, const char *command, int cmd_len, AsyncTxnHandler callback, void *context, int rsp_len) { pAsyncTxn myTxn = NULL; myTxn = AsyncUnitPrepareTxn(unit, command, cmd_len, callback, context, rsp_len); if (myTxn == NULL) return -1; return AsyncUnitEnqueueTxn(unit, myTxn); } typedef struct txn_s { char *transReply; int transWait; int respLen; } TXN, *pTXN; /** * \brief TransCallback is the callback for the general command transaction. */ static int TransCallback(pAsyncTxn pCmd) { char *resp = pCmd->inp_buf; int resp_len = pCmd->inp_idx; pTXN self = (pTXN) pCmd->cntx; self->respLen = resp_len; if (resp_len > 0) { memcpy(self->transReply, resp, resp_len); self->transReply[resp_len] = '\0'; } else self->transReply[0] = '\0'; if (pCmd->txn_status == ATX_TIMEOUT) self->transWait = -1; else self->transWait = 0; return 0; } int AsyncUnitTransact(pAsyncUnit unit, const char *command, int cmd_len, char *response, int *rsp_len) { TXN txn; assert(unit); txn.transReply = response; txn.respLen = *rsp_len; txn.transWait = 1; AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, &txn, *rsp_len); while (txn.transWait == 1) TaskYield(pServ->pTasker); *rsp_len = txn.respLen; if (txn.transWait < 0) return txn.transWait; return 1; } int AsyncUnitWrite(pAsyncUnit unit, void *buffer, int buflen) { int iRet; mkChannel *sock; assert(unit); assert(unit->queue); if (buflen > 0) { if (unit->queue->trace) { struct timeval tv; gettimeofday(&tv, NULL); LogHexPrefix(&tv,DEBUG,"Output Trace on AsyncQueue", buffer, buflen); } sock = AsyncUnitGetSocket(unit); iRet = NETWrite(sock, buffer, buflen); /* TODO handle errors */ if (iRet < 0) { /* EOF */ iRet = AQ_Reconnect(unit->queue); if (iRet == 0) return 0; } } return 1; } void AsyncUnitSetNotify(pAsyncUnit unit, void *context, AQU_Notify notify) { assert(unit); unit->notify_func = notify; unit->notify_cntx = context; } int AsyncUnitGetDelay(pAsyncUnit unit) { assert(unit); return unit->queue->iDelay; } void AsyncUnitSetDelay(pAsyncUnit unit, int iDelay) { assert(unit); unit->queue->iDelay = iDelay; } int AsyncUnitGetTimeout(pAsyncUnit unit) { assert(unit); return unit->queue->timeout; } void AsyncUnitSetTimeout(pAsyncUnit unit, int timeout) { assert(unit); unit->queue->timeout = timeout; } int AsyncUnitGetRetries(pAsyncUnit unit) { assert(unit); return unit->queue->retries; } void AsyncUnitSetRetries(pAsyncUnit unit, int retries) { assert(unit); unit->queue->retries = retries; } pAsyncProtocol AsyncUnitGetProtocol(pAsyncUnit unit) { return unit->queue->protocol; } void AsyncUnitSetProtocol(pAsyncUnit unit, pAsyncProtocol protocol) { unit->queue->protocol = protocol; } mkChannel *AsyncUnitGetSocket(pAsyncUnit unit) { assert(unit); assert(unit->queue); return unit->queue->pSock; } int AsyncUnitReconnect(pAsyncUnit unit) { int iRet; assert(unit); assert(unit->queue); iRet = AQ_Reconnect(unit->queue); /* TODO: handle in-progress */ return iRet; } int AsyncQueueAction(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]) { char line[132]; pAsyncQueue self = (pAsyncQueue) pData; if (argc > 1) { if (strcasecmp("send", argv[1]) == 0 || strcasecmp("transact", argv[1]) == 0) { AsyncUnit myUnit; char cmd[10240]; char rsp[10240]; int idx = 0; int i, j, len; cmd[0] = '\0'; /* Managers only */ if (!SCMatchRights(pCon, usMugger)) return 0; for (i = 2; i < argc; ++i) { if (idx >= 10240) break; if (i > 2) cmd[idx++] = ' '; len = strlen(argv[i]); for (j = 0; j < len; ++j) { if (idx >= 10240) break; if (argv[i][j] == '\\') { ++j; if (argv[i][j] == '\\') cmd[idx++] = '\\'; else if (argv[i][j] == 'r') cmd[idx++] = '\r'; else if (argv[i][j] == 'n') cmd[idx++] = '\n'; else if (isdigit(argv[i][j])) { char *nptr = &argv[i][j]; char *endptr; long k = strtol(nptr, &endptr, 0); cmd[idx++] = k; j += (endptr - nptr); --j; /* prepare for loop increment */ } else cmd[idx++] = argv[i][j]; } else cmd[idx++] = argv[i][j]; } } cmd[idx] = '\0'; memset(&myUnit, 0, sizeof(AsyncUnit)); myUnit.queue = self; len = 10240; (void) AsyncUnitTransact(&myUnit, cmd, idx, rsp, &len); /* escape control characters in response */ j = 0; for (i = 0; i < len; ++i) { if (j >= 10230) break; if (self->translate) { if (rsp[i] < 32 || rsp[i] >= 127) { if (rsp[i] == '\r') { cmd[j++] = '\r'; } else if (rsp[i] == '\n') { cmd[j++] = '\n'; } else { j += snprintf(&cmd[j], 6, "\\0x%02x", rsp[i]); } } else if (rsp[i] == '\\') { cmd[j++] = '\\'; cmd[j++] = '\\'; } else cmd[j++] = rsp[i]; } else cmd[j++] = rsp[i]; } cmd[j++] = '\0'; SCWrite(pCon, cmd, eValue); return 1; } if (strcasecmp(argv[1], "reconnect") == 0) { AQ_Reconnect(self); return OKOK; } if (strcasecmp(argv[1], "delay") == 0) { if (argc > 2) { int delay; int iRet; iRet = sscanf(argv[2], "%d", &delay); if (iRet != 1) { snprintf(line, 132, "Invalid argument: %s", argv[2]); SCWrite(pCon, line, eError); return 0; } else { if (delay < 0 || delay > 300000) { snprintf(line, 132, "Value out of range: %d", delay); SCWrite(pCon, line, eError); return 0; } self->iDelay = delay; return OKOK; } } else { snprintf(line, 132, "%s.delay = %d", argv[0], self->iDelay); SCWrite(pCon, line, eValue); return OKOK; } return OKOK; } if (strcasecmp(argv[1], "timeout") == 0) { if (argc > 2) { int timeout; int iRet; iRet = sscanf(argv[2], "%d", &timeout); if (iRet != 1) { snprintf(line, 132, "Invalid argument: %s", argv[2]); SCWrite(pCon, line, eError); return 0; } else { if (timeout < 0 || timeout > 30000) { snprintf(line, 132, "Value out of range: %d", timeout); SCWrite(pCon, line, eError); return 0; } self->timeout = timeout; return OKOK; } } else { snprintf(line, 132, "%s.timeout = %d", argv[0], self->timeout); SCWrite(pCon, line, eValue); return OKOK; } return OKOK; } if (strcasecmp(argv[1], "retries") == 0) { if (argc > 2) { int retries; int iRet; iRet = sscanf(argv[2], "%d", &retries); if (iRet != 1) { snprintf(line, 132, "Invalid argument: %s", argv[2]); SCWrite(pCon, line, eError); return 0; } else { if (retries < 0 || retries > 30000) { snprintf(line, 132, "Value out of range: %d", retries); SCWrite(pCon, line, eError); return 0; } self->retries = retries; return OKOK; } } else { snprintf(line, 132, "%s.retries = %d", argv[0], self->retries); SCWrite(pCon, line, eValue); return OKOK; } return OKOK; } if (strcasecmp(argv[1], "translate") == 0) { if (argc > 2) { int value; int iRet; iRet = sscanf(argv[2], "%d", &value); if (iRet != 1) { snprintf(line, 132, "Invalid argument: %s", argv[2]); SCWrite(pCon, line, eError); return 0; } else { if (value == 0) { self->translate = false; return OKOK; } else if (value == 1) { self->translate = true; return OKOK; } snprintf(line, 132, "Invalid argument: %s", argv[2]); SCWrite(pCon, line, eError); return 0; } } else { snprintf(line, 132, "%s.translate = %d", argv[0], self->translate); SCWrite(pCon, line, eStatus); return OKOK; } return OKOK; } if (strcasecmp(argv[1], "trace") == 0) { if (argc > 2) { int value; int iRet; iRet = sscanf(argv[2], "%d", &value); if (iRet != 1) { snprintf(line, 132, "Invalid argument: %s", argv[2]); SCWrite(pCon, line, eError); return 0; } else { if (value == 0) { self->trace = false; return OKOK; } else if (value == 1) { self->trace = true; return OKOK; } snprintf(line, 132, "Invalid argument: %s", argv[2]); SCWrite(pCon, line, eError); return 0; } } else { snprintf(line, 132, "%s.trace = %d", argv[0], self->trace); SCWrite(pCon, line, eStatus); return OKOK; } return OKOK; } if (strcasecmp(argv[1], "noreply") == 0) { if (argc > 2) { if (self->noreply_text) free(self->noreply_text); self->noreply_text = strdup(argv[2]); self->noreply_len = strlen(argv[2]); } else { SCPrintf(pCon, eValue, "%s.noreply = %s", argv[0], self->noreply_text); } return OKOK; } if (strcasecmp(argv[1], "list") == 0) { SCPrintf(pCon, eValue, "%s.delay = %d", argv[0], self->iDelay); SCPrintf(pCon, eValue, "%s.timeout = %d", argv[0], self->timeout); SCPrintf(pCon, eValue, "%s.retries = %d", argv[0], self->retries); SCPrintf(pCon, eValue, "%s.translate = %d", argv[0], self->translate); SCPrintf(pCon, eValue, "%s.trace = %d", argv[0], self->trace); if (self->noreply_text) SCPrintf(pCon, eValue, "%s.noreply = %s", argv[0], self->noreply_text); if (self->protocol && self->protocol->protocolName) SCPrintf(pCon, eValue, "%s.protocol = %s", argv[0], self->protocol->protocolName); return OKOK; } } snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]); SCWrite(pCon, line, eError); return 0; } static pAsyncQueue AQ_Create(const char *host, const char *port) { int i; pAsyncQueue self = NULL; mkChannel *channel = NULL; if (host == NULL) return NULL; /* try the AsyncQueue with this name */ self = (pAsyncQueue) FindCommandData(pServ->pSics, (char *) host, "AsyncQueue"); /* try host and port */ if (self == NULL && port) { int port_no = atoi(port); if (port_no == 0) { struct servent *sp = NULL; sp = getservbyname(port, NULL); if (sp) port_no = ntohs(sp->s_port); } if (port_no > 0) { struct sockaddr_in sa; if (CreateSocketAdress(&sa, (char *) host, port_no)) { /* look for queue with same address */ for (i = 0; i < queue_index; ++i) if (queue_array[i]->pSock->adresse.sin_port == sa.sin_port && queue_array[i]->pSock->adresse.sin_addr.s_addr == sa.sin_addr.s_addr) { self = queue_array[i]; break; } } if (self == NULL) { channel = NETConnectWithFlags((char *) host, port_no, 0); /* TODO handle asynchronous connection */ } } } if (self == NULL) { /* TODO: if channel (self->pSock) is NULL we haven't connected yet, do it later */ if (channel == NULL) return NULL; self = (pAsyncQueue) malloc(sizeof(AsyncQueue)); if (self == NULL) return NULL; memset(self, 0, sizeof(AsyncQueue)); self->pSock = channel; self->pDes = CreateDescriptor("AsyncQueue"); queue_array[queue_index++] = self; } for (i = 0; i < queue_index; ++i) if (queue_array[i] == self) { break; } if (i == queue_index) queue_array[queue_index++] = self; /* TODO: if self->pSock is NULL we haven't connected yet */ #if 0 if (channel == NULL) { /* TODO: all the rest */ AQ_SetTimer(self, self->retryTimer, TimedReconnect, self); } #endif return self; } static int AQ_Init(pAsyncQueue self) { /* Init the controller */ if (self->nw_ctx == NULL) NetWatchRegisterCallback(&self->nw_ctx, self->pSock->sockid, MyCallback, self); NetWatchSetMode(self->nw_ctx, nwatch_write | nwatch_read); return 1; } static void AQ_Kill(void *pData) { int i; pAsyncQueue self = (pAsyncQueue) pData; for (i = 0; i < queue_index; ++i) if (queue_array[i] == self) { --queue_index; if (queue_index > 0) queue_array[i] = queue_array[queue_index]; if (self->nw_ctx) NetWatchRemoveCallback(self->nw_ctx); if (self->nw_tmr) AQ_ClearTimer(self); if (self->queue_name) free(self->queue_name); NETClosePort(self->pSock); free(self->pSock); DeleteDescriptor(self->pDes); free(self); return; } } /* * \brief make a AsyncQueue from the command line * * MakeAsyncQueue queueName protocolName hostName portname */ int AsyncQueueFactory(SConnection * pCon, SicsInterp * pSics, void *pData, int argc, char *argv[]) { pAsyncQueue pNew = NULL; mkChannel *channel = NULL; pAsyncProtocol pPro = NULL; int port_no; int iRet = 0; if (argc < 5) { SCWrite(pCon, "ERROR: insufficient arguments to AsyncQueueFactory", eError); return 0; } /* try to find an existing queue with this name */ pNew = (pAsyncQueue) FindCommandData(pServ->pSics, argv[1], "AsyncQueue"); if (pNew != NULL) { char line[132]; snprintf(line, 132, "WARNING: AsyncQueue '%s' already exists", argv[1]); SCWrite(pCon, line, eError); SCSendOK(pCon); return 1; } /* try to find an existing protocol with this name */ pPro = (pAsyncProtocol) FindCommandData(pServ->pSics, argv[2], "AsyncProtocol"); if (pPro == NULL) { char line[132]; snprintf(line, 132, "WARNING: AsyncQueue protocol '%s' not found", argv[2]); SCWrite(pCon, line, eError); return 0; } port_no = atoi(argv[4]); if (port_no == 0) { struct servent *sp = NULL; sp = getservbyname(argv[4], NULL); if (sp) port_no = ntohs(sp->s_port); } if (port_no > 0) { struct sockaddr_in sa; if (CreateSocketAdress(&sa, argv[3], port_no)) { int i; /* look for queue with same address */ for (i = 0; i < queue_index; ++i) if (queue_array[i]->pSock->adresse.sin_port == sa.sin_port && queue_array[i]->pSock->adresse.sin_addr.s_addr == sa.sin_addr.s_addr) { char line[132]; snprintf(line, 132, "WARNING: AsyncQueue '%s' has same address as %s", argv[1], queue_array[i]->queue_name); SCWrite(pCon, line, eError); } } /* TODO: implement asynchronous connection */ channel = NETConnectWithFlags(argv[3], port_no, 0); } if (channel == NULL) { char line[132]; snprintf(line, 132, "ERROR: AsyncQueue '%s' cannot connect", argv[1]); SCWrite(pCon, line, eError); return 0; } pNew = (pAsyncQueue) malloc(sizeof(AsyncQueue)); if (pNew == NULL) { char line[132]; snprintf(line, 132, "ERROR: AsyncQueue '%s' memory failure", argv[1]); SCWrite(pCon, line, eError); return 0; } memset(pNew, 0, sizeof(AsyncQueue)); pNew->pDes = CreateDescriptor("AsyncQueue"); pNew->queue_name = strdup(argv[1]); pNew->protocol = pPro; pNew->pSock = channel; queue_array[queue_index++] = pNew; AQ_Init(pNew); /* create the command */ iRet = AddCommand(pSics, argv[1], AsyncQueueAction, AQ_Kill, pNew); if (!iRet) { char line[132]; snprintf(line, 123, "ERROR: add command %s failed", argv[1]); SCWrite(pCon, line, eError); AQ_Kill(pNew); return 0; } SCSendOK(pCon); return 1; } /* * \brief make a AsyncQueue from a named rs232 controller * * \param name the name of the SICS "RS232 Controller" object * \param handle the handle to the AsyncQueue object * \return 0 for FAILURE, 1 for SUCCESS */ int AsyncUnitCreateHost(const char *host, const char *port, pAsyncUnit * handle) { int status; pAsyncQueue self = NULL; pAsyncUnit unit = NULL; *handle = NULL; self = AQ_Create(host, port); if (self == NULL) return 0; status = AQ_Init(self); unit = (pAsyncUnit) malloc(sizeof(AsyncUnit)); if (unit == NULL) { Log(ERROR,"asquio","%s","Out of memory in AsyncUnitCreateHost", eError); *handle = NULL; return 0; } memset(unit, 0, sizeof(AsyncUnit)); ++self->unit_count; unit->queue = self; unit->next = self->units; self->units = unit; *handle = unit; return 1; } int AsyncUnitCreate(const char *host, pAsyncUnit * handle) { return AsyncUnitCreateHost(host, NULL, handle); } int AsyncUnitDestroy(pAsyncUnit unit) { assert(unit); assert(unit->queue); pAsyncQueue self = unit->queue; pAsyncUnit *pNxt = &self->units; while (*pNxt) { if (*pNxt == unit) { *pNxt = (*pNxt)->next; break; } pNxt = &(*pNxt)->next; } --self->unit_count; if (self->unit_count <= 0) { AQ_Kill(self); } free(unit); return 1; } pAsyncUnit AsyncUnitFromQueue(pAsyncQueue queue) { pAsyncUnit result = NULL; result = malloc(sizeof(AsyncUnit)); if (result == NULL) { return NULL; } memset(result, 0, sizeof(AsyncUnit)); result->queue = queue; return result; } void *AsyncUnitSetQueueContext(pAsyncUnit unit, void *cntx) { void *hold; assert(unit); assert(unit->queue); hold = unit->queue->context; unit->queue->context = cntx; return hold; } void *AsyncUnitGetQueueContext(pAsyncUnit unit) { assert(unit); assert(unit->queue); return unit->queue->context; } int AsyncUnitIsQueueConnected(pAsyncUnit unit) { assert(unit); assert(unit->queue); if (unit && unit->queue) if (unit->queue->state == eAsyncConnected) return 1; return 0; }