multichan becomes AsyncQueue and AsyncProtocol

r1957 | dcl | 2007-05-11 17:28:31 +1000 (Fri, 11 May 2007) | 2 lines
This commit is contained in:
Douglas Clowes
2007-05-11 17:28:31 +10:00
parent 6d9120f796
commit a18500cbf0
13 changed files with 1785 additions and 503 deletions

154
asyncprotocol.c Normal file
View File

@@ -0,0 +1,154 @@
#include <sics.h>
#include <asyncprotocol.h>
#include <asyncqueue.h>
int defaultSendCommand(pAsyncProtocol p, pAsyncTxn txn) {
txn->txn_state = 0;
/* TODO: anything? */
return AsyncUnitWrite(txn->unit, txn->out_buf, txn->out_len);
}
int defaultHandleInput(pAsyncProtocol p, pAsyncTxn txn, int ch) {
/* TODO: generic terminators */
char term[] = { 0x0D, 0x0A, 0x00 };
if (ch == term[txn->txn_state])
++txn->txn_state;
else
txn->txn_state = 0;
if (txn->inp_idx < txn->inp_len)
txn->inp_buf[txn->inp_idx++] = ch;
if (term[txn->txn_state] == 0) {
if (txn->inp_idx < txn->inp_len)
txn->inp_buf[txn->inp_idx] = '\0';
return AQU_POP_CMD;
}
return 1;
}
int defaultHandleEvent(pAsyncProtocol p, pAsyncTxn txn, int event) {
/* TODO */
}
int defaultPrepareTxn(pAsyncProtocol p, pAsyncTxn txn, const char* cmd, int cmd_len, int rsp_len) {
/* TODO: generic terminators */
char term[] = { 0x0D, 0x0A, 0x00 };
int i;
int state;
state = 0;
for (i = 0; i < cmd_len; ++i) {
if (cmd[i] == 0x00) { /* end of transmission */
cmd_len = i;
break;
}
else if (cmd[i] == term[state]) {
++state;
continue;
}
state = 0;
}
if (term[state] == 0) {
txn->out_buf = malloc(cmd_len + 1);
memcpy(txn->out_buf, cmd, cmd_len + 1);
}
else {
int tlen = strlen(term);
txn->out_buf = malloc(cmd_len + tlen + 1);
memcpy(txn->out_buf, cmd, cmd_len);
memcpy(txn->out_buf + cmd_len, term, tlen + 1);
cmd_len += tlen;
}
txn->out_len = cmd_len;
txn->out_idx = 0;
txn->inp_buf = malloc(rsp_len);
txn->inp_len = rsp_len;
txn->inp_idx = 0;
txn->txn_state = 0;
txn->txn_status = 0;
return 1;
}
int AsyncProtocolNoAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
char line[132];
pAsyncProtocol self = (pAsyncProtocol) pData;
if (argc > 1) {
/* TODO: handle parameters like terminators */
}
snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
int AsyncProtocolAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
char line[132];
pAsyncProtocol self = (pAsyncProtocol) pData;
/* TODO: terminators */
snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
void defaultKillPrivate(pAsyncProtocol p) {
/* TODO: anything? */
}
void AsyncProtocolKill(void *pData) {
pAsyncProtocol self = (pAsyncProtocol) pData;
DeleteDescriptor(self->pDes);
/* TODO: more destruction maybe */
if (self->killPrivate)
self->killPrivate(self);
}
pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
ObjectFunc pFunc, KillFunc pKFunc) {
int iRet;
pAsyncProtocol self = NULL;
/* try to find an existing queue with this name */
self = (pAsyncProtocol) FindCommandData(pServ->pSics, protocolName, "AsyncProtocol");
if (self != NULL) {
return self;
}
self = (pAsyncProtocol) malloc(sizeof(AsyncProtocol));
if (self == NULL) {
/* TODO */
return NULL;
}
memset(self, 0, sizeof(AsyncProtocol));
self->pDes = CreateDescriptor("AsyncProtocol");
if (pFunc == NULL)
pFunc = AsyncProtocolNoAction;
if (pKFunc == NULL)
pKFunc = AsyncProtocolKill;
iRet = AddCommand(pSics, protocolName, pFunc, pKFunc, self);
if (!iRet ) {
/* TODO */
AsyncProtocolKill(self);
return NULL;
}
self->sendCommand = defaultSendCommand;
self->handleInput = defaultHandleInput;
self->handleEvent = defaultHandleEvent;
self->prepareTxn = defaultPrepareTxn;
self->killPrivate = defaultKillPrivate;
return self;
}
int AsyncProtocolFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]) {
if (argc < 2) {
SCWrite(pCon,"ERROR: insufficient arguments to AsyncProtocolFactory", eError);
return 0;
}
pAsyncProtocol pNew = AsyncProtocolCreate(pSics, argv[1],
AsyncProtocolAction, AsyncProtocolKill);
/* TODO: handle arguments */
pNew->privateData = NULL;
return 1;
}

59
asyncprotocol.h Normal file
View File

@@ -0,0 +1,59 @@
#ifndef ASYNCPROTOCOL
#define ASYNCPROTOCOL
typedef struct __AsyncUnit AsyncUnit, *pAsyncUnit;
typedef struct __async_txn AsyncTxn, *pAsyncTxn;
typedef int (*AsyncTxnHandler)(pAsyncTxn pTxn);
typedef struct __async_protocol AsyncProtocol, *pAsyncProtocol;
pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
ObjectFunc pFunc, KillFunc pKFunc);
int AsyncProtocolAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
int AsyncProtocolFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
typedef enum {
ATX_NULL=0,
ATX_TIMEOUT=-1,
ATX_ACTIVE=1,
ATX_COMPLETE=2
} ATX_STATUS;
struct __async_txn {
pAsyncUnit unit; /**< unit that transaction is associated with */
int txn_state; /**< protocol handler transaction parse state */
ATX_STATUS txn_status; /**< status of the transaction OK, Error, ... */
int txn_timeout; /**< transaction timeout in milliseconds */
char* out_buf; /**< output buffer for sendCommand */
int out_len; /**< length of data to be sent */
int out_idx; /**< index of next character to transmit */
char* inp_buf; /**< input buffer for transaction response */
int inp_len; /**< length of input buffer */
int inp_idx; /**< index of next character (number already received) */
AsyncTxnHandler handleResponse; /**< Txn response handler of command sender */
void* cntx; /**< opaque context used by command sender */
/* The cntx field may be used by protocol handler from sendCommand
* as long as it is restored when response is complete
*/
};
/*
* The async protocol interface virtual function table
*/
struct __async_protocol {
pObjectDescriptor pDes;
char* protocolName;
void* privateData;
int (* sendCommand)(pAsyncProtocol p, pAsyncTxn txn);
int (* handleInput)(pAsyncProtocol p, pAsyncTxn txn, int ch);
int (* handleEvent)(pAsyncProtocol p, pAsyncTxn txn, int event);
int (* prepareTxn)(pAsyncProtocol p, pAsyncTxn txn, const char* cmd, int cmd_len, int rsp_len);
void (* killPrivate)(pAsyncProtocol p);
};
#endif /* ASYNCPROTOCOL */

985
asyncqueue.c Normal file
View File

@@ -0,0 +1,985 @@
/*
* A S Y N C Q U E U E
*
* This module manages AsyncQueue communications.
*
* The AsyncQueue is an asynchronous queue between drivers and the device. It
* supports multiple logical units on a single device controller that share a
* single command channel.
*
* Douglas Clowes, February 2007
*
*/
#include <sys/time.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <sics.h>
#include <rs232controller.h>
#include "network.h"
#include "asyncqueue.h"
#include "nwatch.h"
typedef struct __AsyncQueue AsyncQueue, *pAsyncQueue;
typedef struct __async_command AQ_Cmd, *pAQ_Cmd;
struct __async_command {
pAQ_Cmd next;
pAsyncTxn tran;
pAsyncUnit unit;
int timeout;
int retries;
int active;
};
struct __AsyncUnit {
pAsyncUnit next;
pAsyncQueue queue;
AQU_Notify notify_func;
void* notify_cntx;
};
struct __AsyncQueue {
pObjectDescriptor pDes;
char* queue_name;
char* pHost;
int iPort;
int iDelay; /* intercommand delay in milliseconds */
int timeout;
int retries;
struct timeval tvLastCmd; /* time of completion of last command */
int unit_count; /* number of units connected */
pAsyncUnit units; /* head of unit chain */
pAQ_Cmd command_head; /* first/next command in queue */
pAQ_Cmd command_tail; /* last command in queue */
pNWContext nw_ctx; /* NetWait context handle */
pNWTimer nw_tmr; /* NetWait timer handle */
mkChannel* pSock; /* socket address */
pAsyncProtocol protocol;
};
static pAsyncQueue queue_array[FD_SETSIZE];
static int queue_index = 0;
/* ---------------------------- Local ------------------------------------
CreateSocketAdress stolen from Tcl. Thanks to John Ousterhout
*/
static int
CreateSocketAdress(
struct sockaddr_in *sockaddrPtr, /* Socket address */
char *host, /* Host. NULL implies INADDR_ANY */
int port) /* Port number */
{
struct hostent *hostent; /* Host database entry */
struct in_addr addr; /* For 64/32 bit madness */
(void) memset((char *) sockaddrPtr, '\0', sizeof(struct sockaddr_in));
sockaddrPtr->sin_family = AF_INET;
sockaddrPtr->sin_port = htons((unsigned short) (port & 0xFFFF));
if (host == NULL) {
addr.s_addr = INADDR_ANY;
} else {
hostent = gethostbyname(host);
if (hostent != NULL) {
memcpy((char *) &addr,
(char *) hostent->h_addr_list[0], (size_t) hostent->h_length);
} else {
addr.s_addr = inet_addr(host);
if (addr.s_addr == (unsigned long)-1) {
return 0; /* error */
}
}
}
/*
* There is a rumor that this assignment may require care on
* some 64 bit machines.
*/
sockaddrPtr->sin_addr.s_addr = addr.s_addr;
return 1;
}
static void AQ_Notify(pAsyncQueue self, int event)
{
pAsyncUnit unit;
for (unit = self->units; unit; unit = unit->next)
if (unit->notify_func != NULL)
unit->notify_func(unit->notify_cntx, event);
}
static int AQ_Reconnect(pAsyncQueue self)
{
int iRet;
int sock;
int flag = 1;
char line[132];
iRet = NETReconnect(self->pSock);
if (iRet <= 0) {
snprintf(line, 132, "Disconnect on AsyncQueue '%s'", self->queue_name);
SICSLogWrite(line, eStatus);
AQ_Notify(self, AQU_DISCONNECT);
return iRet;
}
snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name);
SICSLogWrite(line, eStatus);
AQ_Notify(self, AQU_RECONNECT);
return 1;
}
static int CommandTimeout(void* cntx, int mode);
static int StartCommand(pAsyncQueue self)
{
pAQ_Cmd myCmd = self->command_head;
mkChannel* sock = self->pSock;
if (myCmd == NULL)
return OKOK;
/*
* Remove any old command timeout timer
*/
if (self->nw_tmr)
NetWatchRemoveTimer(self->nw_tmr);
/*
* Implement the inter-command delay
*/
if (self->iDelay) {
struct timeval now, when;
gettimeofday(&now, NULL);
if (self->tvLastCmd.tv_sec == 0)
self->tvLastCmd = now;
when.tv_sec = self->tvLastCmd.tv_sec;
when.tv_usec = self->tvLastCmd.tv_usec + 1000 * self->iDelay;
if (when.tv_usec >= 1000000) {
when.tv_sec += when.tv_usec / 1000000;
when.tv_usec %= 1000000;
}
if (when.tv_sec > now.tv_sec ||
(when.tv_sec == now.tv_sec && when.tv_usec > now.tv_usec)) {
int delay = when.tv_sec - now.tv_sec;
delay *= 1000;
delay += (when.tv_usec - now.tv_usec + (1000 - 1)) / 1000;
NetWatchRegisterTimer(&self->nw_tmr, delay,
CommandTimeout, self);
return OKOK;
}
}
/*
* Discard any input before sending command
*/
while (NETAvailable(sock, 0)) {
/* TODO: handle unsolicited input */
char reply[1];
int iRet;
iRet = NETRead(sock, reply, 1, 0);
if (iRet < 0) { /* TODO: EOF */
iRet = AQ_Reconnect(self);
if (iRet == 0)
return 0;
}
}
/*
* Add a new command timeout timer
*/
if (myCmd->timeout > 0)
NetWatchRegisterTimer(&self->nw_tmr, myCmd->timeout,
CommandTimeout, self);
else
NetWatchRegisterTimer(&self->nw_tmr, 30000,
CommandTimeout, self);
myCmd->active = 1;
return self->protocol->sendCommand(self->protocol, myCmd->tran);
}
static int QueCommandHead(pAsyncQueue self, pAQ_Cmd cmd)
{
cmd->next = NULL;
/*
* If the command queue is empty, start transmission
*/
if (self->command_head == NULL) {
self->command_head = self->command_tail = cmd;
StartCommand(self);
return 1;
}
if (self->command_head->active) {
cmd->next = self->command_head->next;
self->command_head->next = cmd;
}
else {
cmd->next = self->command_head;
self->command_head = cmd;
}
if (cmd->next == NULL)
self->command_tail = cmd;
return 1;
}
static int QueCommand(pAsyncQueue self, pAQ_Cmd cmd)
{
cmd->next = NULL;
/*
* If the command queue is empty, start transmission
*/
if (self->command_head == NULL) {
self->command_head = self->command_tail = cmd;
StartCommand(self);
return 1;
}
self->command_tail->next = cmd;
self->command_tail = cmd;
return 1;
}
static int PopCommand(pAsyncQueue self)
{
pAQ_Cmd myCmd = self->command_head;
if (self->nw_tmr)
NetWatchRemoveTimer(self->nw_tmr);
self->nw_tmr = 0;
gettimeofday(&self->tvLastCmd, NULL);
/*
* If this is not the last in queue, start transmission
*/
if (myCmd->next) {
pAQ_Cmd pNew = myCmd->next;
self->command_head = pNew;
StartCommand(self);
}
else
self->command_head = self->command_tail = NULL;
free(myCmd->tran->out_buf);
free(myCmd->tran->inp_buf);
free(myCmd->tran);
free(myCmd);
return 1;
}
static int CommandTimeout(void* cntx, int mode)
{
pAsyncQueue self = (pAsyncQueue) cntx;
pAQ_Cmd myCmd = self->command_head;
self->nw_tmr = 0;
if (myCmd->retries > 0) {
--myCmd->retries;
StartCommand(self);
}
else {
int iRet;
iRet = self->protocol->handleEvent(self->protocol, myCmd->tran, AQU_TIMEOUT);
if (iRet == AQU_POP_CMD) {
if (myCmd->tran->handleResponse)
myCmd->tran->handleResponse(myCmd->tran);
PopCommand(self); /* remove command */
}
else if (iRet == AQU_RETRY_CMD)
StartCommand(self); /* restart command */
else if (iRet == AQU_RECONNECT)
AQ_Reconnect(self);
}
return 1;
}
static int MyCallback(void* context, int mode)
{
pAsyncQueue self = (pAsyncQueue) context;
if (mode & nwatch_read) {
int iRet;
char reply[1];
iRet = NETRead(self->pSock, reply, 1, 0);
if (iRet < 0) { /* TODO: EOF */
iRet = AQ_Reconnect(self);
if (iRet <= 0)
return iRet;
/* restart the command */
StartCommand(self);
return 1;
}
if (iRet == 0) { /* TODO: timeout or error */
return 0;
} else {
pAQ_Cmd myCmd = self->command_head;
if (myCmd) {
iRet = self->protocol->handleInput(self->protocol, myCmd->tran, reply[0]);
if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */
if (myCmd->tran->handleResponse)
myCmd->tran->handleResponse(myCmd->tran);
PopCommand(self);
}
else if (iRet < 0) /* TODO: error */
;
}
else {
/* TODO: handle unsolicited input */
}
}
}
return 1;
}
int AsyncUnitEnqueHead(pAsyncUnit unit, pAsyncTxn context)
{
pAQ_Cmd myCmd = NULL;
assert(unit && unit->queue && unit->queue->protocol);
myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd));
/* TODO: check malloc */
memset(myCmd, 0, sizeof(AQ_Cmd));
myCmd->tran = context;
myCmd->unit = unit;
myCmd->timeout = unit->queue->timeout;
myCmd->retries = unit->queue->retries;
myCmd->active = 0;
return QueCommandHead(unit->queue, myCmd);
}
int AsyncUnitEnqueueTxn(pAsyncUnit unit, pAsyncTxn pTxn)
{
pAQ_Cmd myCmd = NULL;
assert(unit && unit->queue && unit->queue->protocol);
myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd));
/* TODO: check malloc */
memset(myCmd, 0, sizeof(AQ_Cmd));
myCmd->tran = pTxn;
myCmd->unit = unit;
myCmd->timeout = unit->queue->timeout;
myCmd->retries = unit->queue->retries;
myCmd->active = 0;
return QueCommand(unit->queue, myCmd);
}
pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
const char* command, int cmd_len,
AsyncTxnHandler callback, void* context,
int rsp_len)
{
pAsyncTxn myTxn = NULL;
assert(unit);
myTxn = (pAsyncTxn) malloc(sizeof(AsyncTxn));
assert(myTxn);
memset(myTxn, 0, sizeof(AsyncTxn));
if (unit->queue->protocol->prepareTxn) {
int iRet;
iRet = unit->queue->protocol->prepareTxn(unit->queue->protocol, myTxn, command, cmd_len, rsp_len);
}
else {
myTxn->out_buf = (char*) malloc(cmd_len + 5);
memcpy(myTxn->out_buf, command, cmd_len);
myTxn->out_len = cmd_len;
if (myTxn->out_len < 2 ||
myTxn->out_buf[myTxn->out_len - 1] != 0x0A ||
myTxn->out_buf[myTxn->out_len - 2] != 0x0D) {
myTxn->out_buf[myTxn->out_len++] = 0x0D;
myTxn->out_buf[myTxn->out_len++] = 0x0A;
}
myTxn->out_buf[myTxn->out_len] = '\0';
}
if (rsp_len == 0)
myTxn->inp_buf = NULL;
else {
myTxn->inp_buf = malloc(rsp_len + 1);
memset(myTxn->inp_buf, 0, rsp_len + 1);
}
myTxn->inp_len = rsp_len;
myTxn->unit = unit;
myTxn->handleResponse = callback;
myTxn->cntx = context;
return myTxn;
}
int AsyncUnitSendTxn(pAsyncUnit unit,
const char* command, int cmd_len,
AsyncTxnHandler callback, void* context,
int rsp_len)
{
pAsyncTxn myTxn = NULL;
myTxn = AsyncUnitPrepareTxn(unit, command, cmd_len,
callback, context, rsp_len);
if (myTxn == NULL)
return -1;
return AsyncUnitEnqueueTxn(unit, myTxn);
}
typedef struct txn_s {
char* transReply;
int transWait;
} TXN, *pTXN;
/**
* \brief TransCallback is the callback for the general command transaction.
*/
static int TransCallback(pAsyncTxn pCmd) {
char* resp = pCmd->inp_buf;
int resp_len = pCmd->inp_idx;
pTXN self = (pTXN) pCmd->cntx;
if (pCmd->txn_status == ATX_TIMEOUT) {
memcpy(self->transReply, resp, resp_len);
self->transReply[resp_len] = '\0';
self->transReply[0] = '\0';
self->transWait = -1;
}
else {
memcpy(self->transReply, resp, resp_len);
self->transReply[resp_len] = '\0';
self->transWait = 0;
}
return 0;
}
int AsyncUnitTransact(pAsyncUnit unit,
const char* command, int cmd_len,
char* response, int rsp_len)
{
TXN txn;
assert(unit);
txn.transReply = response;
txn.transWait = 1;
AsyncUnitSendTxn(unit,
command, cmd_len,
TransCallback, &txn, rsp_len);
while (txn.transWait == 1)
TaskYield(pServ->pTasker);
if (txn.transWait < 0)
return txn.transWait;
return 1;
}
int AsyncUnitWrite(pAsyncUnit unit, void* buffer, int buflen)
{
int iRet;
mkChannel* sock;
assert(unit);
assert(unit->queue);
if (buflen > 0) {
sock = AsyncUnitGetSocket(unit);
iRet = NETWrite(sock, buffer, buflen);
/* TODO handle errors */
if (iRet < 0) { /* TODO: EOF */
iRet = AQ_Reconnect(unit->queue);
if (iRet == 0)
return 0;
}
}
return 1;
}
void AsyncUnitSetNotify(pAsyncUnit unit, void* context, AQU_Notify notify)
{
assert(unit);
unit->notify_func = notify;
unit->notify_cntx = context;
}
int AsyncUnitGetDelay(pAsyncUnit unit)
{
assert(unit);
return unit->queue->iDelay;
}
void AsyncUnitSetDelay(pAsyncUnit unit, int iDelay)
{
assert(unit);
unit->queue->iDelay = iDelay;
}
int AsyncUnitGetTimeout(pAsyncUnit unit)
{
assert(unit);
return unit->queue->timeout;
}
void AsyncUnitSetTimeout(pAsyncUnit unit, int timeout)
{
assert(unit);
unit->queue->timeout = timeout;
}
int AsyncUnitGetRetries(pAsyncUnit unit)
{
assert(unit);
return unit->queue->retries;
}
void AsyncUnitSetRetries(pAsyncUnit unit, int retries)
{
assert(unit);
unit->queue->retries = retries;
}
pAsyncProtocol AsyncUnitGetProtocol(pAsyncUnit unit)
{
return unit->queue->protocol;
}
void AsyncUnitSetProtocol(pAsyncUnit unit, pAsyncProtocol protocol)
{
unit->queue->protocol = protocol;
}
mkChannel* AsyncUnitGetSocket(pAsyncUnit unit)
{
assert(unit);
assert(unit->queue);
return unit->queue->pSock;
}
int AsyncUnitReconnect(pAsyncUnit unit)
{
int iRet;
assert(unit);
assert(unit->queue);
iRet = AQ_Reconnect(unit->queue);
/* TODO: handle in-progress */
return iRet;
}
int AsyncQueueAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
char line[132];
pAsyncQueue self = (pAsyncQueue) pData;
if (argc > 1) {
if (strcasecmp("send", argv[1]) == 0) {
AsyncUnit myUnit;
char cmd[10240];
char rsp[10240];
int idx = 0;
int i, j;
cmd[0] = '\0';
for (i = 2; i < argc; ++i) {
j = snprintf(&cmd[idx], 10240 - j, "%s%s",
(i > 2) ? " " : "",
argv[i]);
if (j < 0)
break;
idx += j;
}
memset(&myUnit, 0, sizeof(AsyncUnit));
myUnit.queue = self;
AsyncUnitTransact(&myUnit, cmd, idx, rsp, 10240);
SCWrite(pCon, rsp, eValue);
return 1;
}
if (strcasecmp(argv[1], "reconnect") == 0) {
AQ_Reconnect(self);
return OKOK;
}
if (strcasecmp(argv[1], "delay") == 0) {
if (argc > 2) {
int delay;
int iRet;
iRet = sscanf(argv[2], "%d", &delay);
if (iRet != 1) {
snprintf(line, 132, "Invalid argument: %s", argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
else {
if (delay < 0 || delay > 30000) {
snprintf(line, 132, "Value out of range: %d", delay);
SCWrite(pCon, line, eError);
return 0;
}
self->iDelay = delay;
return OKOK;
}
}
else {
snprintf(line, 132, "%s.delay = %d", argv[0], self->iDelay);
SCWrite(pCon, line, eStatus);
return OKOK;
}
return OKOK;
}
if (strcasecmp(argv[1], "timeout") == 0) {
if (argc > 2) {
int timeout;
int iRet;
iRet = sscanf(argv[2], "%d", &timeout);
if (iRet != 1) {
snprintf(line, 132, "Invalid argument: %s", argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
else {
if (timeout < 0 || timeout > 30000) {
snprintf(line, 132, "Value out of range: %d", timeout);
SCWrite(pCon, line, eError);
return 0;
}
self->timeout = timeout;
return OKOK;
}
}
else {
snprintf(line, 132, "%s.timeout = %d", argv[0], self->timeout);
SCWrite(pCon, line, eStatus);
return OKOK;
}
return OKOK;
}
if (strcasecmp(argv[1], "retries") == 0) {
if (argc > 2) {
int retries;
int iRet;
iRet = sscanf(argv[2], "%d", &retries);
if (iRet != 1) {
snprintf(line, 132, "Invalid argument: %s", argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
else {
if (retries < 0 || retries > 30000) {
snprintf(line, 132, "Value out of range: %d", retries);
SCWrite(pCon, line, eError);
return 0;
}
self->retries = retries;
return OKOK;
}
}
else {
snprintf(line, 132, "%s.retries = %d", argv[0], self->retries);
SCWrite(pCon, line, eStatus);
return OKOK;
}
return OKOK;
}
}
snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
static pAsyncQueue AQ_Create(const char* host, const char* port)
{
int i;
pAsyncQueue self = NULL;
mkChannel* channel = NULL;
if (host == NULL)
return NULL;
/* try the AsyncQueue with this name */
self = (pAsyncQueue) FindCommandData(pServ->pSics, host, "AsyncQueue");
/* try host and port */
if (self == NULL) {
int port_no = atoi(port);
if (port_no == 0) {
struct servent *sp=NULL;
sp = getservbyname(port, NULL);
if (sp)
port_no = ntohs(sp->s_port);
}
if (port_no > 0) {
struct sockaddr_in sa;
if (CreateSocketAdress(&sa, host, port_no)) {
/* look for queue with same address */
for (i = 0; i < queue_index; ++i)
if (queue_array[i]->pSock->adresse.sin_port == sa.sin_port
&& queue_array[i]->pSock->adresse.sin_addr.s_addr == sa.sin_addr.s_addr) {
self = queue_array[i];
break;
}
}
if (self == NULL) {
channel = NETConnectWithFlags(host, port_no, 0);
/* TODO */
}
}
}
if (self == NULL) {
if (channel == NULL)
return NULL;
self = (pAsyncQueue) malloc(sizeof(AsyncQueue));
if (self == NULL)
return NULL;
memset(self, 0, sizeof(AsyncQueue));
self->pSock = channel;
self->pDes = CreateDescriptor("AsyncQueue");
queue_array[queue_index++] = self;
}
for (i = 0; i < queue_index; ++i)
if (queue_array[i] == self) {
break;
}
if (i == queue_index)
queue_array[queue_index++] = self;
return self;
}
static int AQ_Init(pAsyncQueue self)
{
/* TODO: Init the controller */
if (self->nw_ctx == NULL)
NetWatchRegisterCallback(&self->nw_ctx,
self->pSock->sockid,
MyCallback,
self);
return 1;
}
static void AQ_Kill(void* pData)
{
int i;
pAsyncQueue self = (pAsyncQueue) pData;
for (i = 0; i < queue_index; ++i)
if (queue_array[i] == self) {
--queue_index;
if (queue_index > 0)
queue_array[i] = queue_array[queue_index];
if (self->nw_ctx)
NetWatchRemoveCallback(self->nw_ctx);
if (self->nw_tmr)
NetWatchRemoveTimer(self->nw_tmr);
if (self->queue_name)
free(self->queue_name);
NETClosePort(self->pSock);
free(self->pSock);
DeleteDescriptor(self->pDes);
free(self);
return;
}
}
/*
* \brief make a AsyncQueue from the command line
*
* MakeAsyncQueue queueName protocolName hostName portname
*/
int AsyncQueueFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
pAsyncQueue pNew = NULL;
mkChannel* channel = NULL;
pAsyncProtocol pPro = NULL;
int port_no;
int iRet = 0;
if (argc < 5) {
SCWrite(pCon,"ERROR: insufficient arguments to AsyncQueueFactory", eError);
return 0;
}
/* try to find an existing queue with this name */
pNew = (pAsyncQueue) FindCommandData(pServ->pSics, argv[1], "AsyncQueue");
if (pNew != NULL) {
char line[132];
snprintf(line, 132, "WARNING: AsyncQueue '%s' already exists", argv[1]);
SCWrite(pCon, line, eError);
SCSendOK(pCon);
return 1;
}
/* try to find an existing protocol with this name */
pPro = (pAsyncProtocol) FindCommandData(pServ->pSics, argv[2], "AsyncProtocol");
if (pPro == NULL) {
char line[132];
snprintf(line, 132, "WARNING: AsyncQueue protocol '%s' not found", argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
port_no = atoi(argv[4]);
if (port_no == 0) {
struct servent *sp=NULL;
sp = getservbyname(argv[4], NULL);
if (sp)
port_no = ntohs(sp->s_port);
}
if (port_no > 0) {
struct sockaddr_in sa;
if (CreateSocketAdress(&sa, argv[3], port_no)) {
int i;
/* look for queue with same address */
for (i = 0; i < queue_index; ++i)
if (queue_array[i]->pSock->adresse.sin_port == sa.sin_port
&& queue_array[i]->pSock->adresse.sin_addr.s_addr == sa.sin_addr.s_addr) {
char line[132];
snprintf(line, 132, "WARNING: AsyncQueue '%s' has same address as %s",
argv[1],
queue_array[i]->queue_name);
SCWrite(pCon, line, eError);
}
}
/* TODO: asynchronous connection */
channel = NETConnectWithFlags(argv[3], port_no, 0);
}
if (channel == NULL) {
char line[132];
snprintf(line, 132, "ERROR: AsyncQueue '%s' cannot connect", argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
pNew = (pAsyncQueue) malloc(sizeof(AsyncQueue));
if (pNew == NULL) {
char line[132];
snprintf(line, 132, "ERROR: AsyncQueue '%s' memory failure", argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
memset(pNew, 0, sizeof(AsyncQueue));
pNew->pDes = CreateDescriptor("AsyncQueue");
pNew->queue_name = strdup(argv[1]);
pNew->protocol = pPro;
pNew->pSock = channel;
queue_array[queue_index++] = pNew;
AQ_Init(pNew);
/*
create the command
*/
iRet = AddCommand(pSics, argv[1], AsyncQueueAction, AQ_Kill, pNew);
if(!iRet)
{
char line[132];
snprintf(line, 123, "ERROR: add command %s failed", argv[1]);
SCWrite(pCon, line, eError);
AQ_Kill(pNew);
return 0;
}
SCSendOK(pCon);
return 1;
}
int MultiChanFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
pAsyncQueue pNew = NULL;
int iRet, status;
char pError[256];
if(argc < 3)
{
SCWrite(pCon,"ERROR: insufficient no of arguments to AsyncQueueFactory",
eError);
return 0;
}
/* try to find an existing queue with this name */
pNew = (pAsyncQueue) FindCommandData(pServ->pSics, argv[1], "AsyncQueue");
if (pNew != NULL) {
char line[132];
snprintf(line, 132, "ERROR: AsyncQueue '%s' already exists", argv[1]);
SCWrite(pCon, line, eError);
return 1;
}
/*
create data structure and open port
*/
if (argc > 3)
pNew = AQ_Create(argv[2], argv[3]);
else
pNew = AQ_Create(argv[2], NULL) ;
if(!pNew)
{
SCWrite(pCon,"ERROR: failed to create AsyncQueue in AsyncQueueFactory",eError);
return 0;
}
if (pNew->queue_name)
free(pNew->queue_name);
pNew->queue_name = strdup(argv[1]);
status = AQ_Init(pNew);
if(status != 1)
{
sprintf(pError,"ERROR: failed to connect to %s at port %d",
pNew->pHost, pNew->iPort);
SCWrite(pCon,pError,eError);
}
/*
create the command
*/
iRet = AddCommand(pSics, argv[1], AsyncQueueAction, AQ_Kill, pNew);
if(!iRet)
{
sprintf(pError,"ERROR: duplicate command %s not created", argv[1]);
SCWrite(pCon,pError,eError);
AQ_Kill(pNew);
return 0;
}
return 1;
}
/*
* \brief make a AsyncQueue from a named rs232 controller
*
* \param name the name of the SICS "RS232 Controller" object
* \param handle the handle to the AsyncQueue object
* \return 0 for FAILURE, 1 for SUCCESS
*/
int AsyncUnitCreateHost(const char* host, const char* port, pAsyncUnit* handle)
{
int status;
pAsyncQueue self = NULL;
pAsyncUnit unit = NULL;
*handle = NULL;
self = AQ_Create(host, port);
if (self == NULL)
return 0;
status = AQ_Init(self);
unit = (pAsyncUnit) malloc(sizeof(AsyncUnit));
/* TODO: check malloc failure */
memset(unit, 0, sizeof(AsyncUnit));
++self->unit_count;
unit->queue = self;
unit->next = self->units;
self->units = unit;
*handle = unit;
return 1;
}
int AsyncUnitCreate(const char* host, pAsyncUnit* handle) {
return AsyncUnitCreateHost(host, NULL, handle);
}
int AsyncUnitDestroy(pAsyncUnit unit)
{
assert(unit);
assert(unit->queue);
pAsyncQueue self = unit->queue;
pAsyncUnit* pNxt = &self->units;
while (*pNxt) {
if (*pNxt == unit) {
*pNxt = (*pNxt)->next;
break;
}
pNxt = &(*pNxt)->next;
}
--self->unit_count;
if (self->unit_count <= 0) {
AQ_Kill(self);
}
free(unit);
return 1;
}

178
asyncqueue.h Normal file
View File

@@ -0,0 +1,178 @@
/*
* A S Y N C Q U E U E
*
* This module manages communications on an asynchronous connection.
*
* Douglas Clowes, May 2007
*
*/
#ifndef SICSASYNCQUEUE
#define SICSASYNCQUEUE
#include "asyncprotocol.h"
#define AQU_TIMEOUT -1
#define AQU_DISCONNECT -2
#define AQU_RECONNECT -3
#define AQU_RETRY_CMD -4
#define AQU_POP_CMD -5
/** \brief create an AsyncUnit attached to a named AsyncQueue.
*
* \param queueName the name of the AsyncQueue to be used
* \param unit pointer to the AsyncUnit created on positive return
* \return positive if successful
*/
int AsyncUnitCreate(const char* queueName, pAsyncUnit* unit);
/** \brief create an AsyncUnit attached to an anonymous AsyncQueue.
*
* \param host name or address of the target host
* \param port number or service name on the target host
* \param unit pointer to the AsyncUnit created on positive return
* \return positive if successful
*/
int AsyncUnitCreateHost(const char* host,
const char* port,
pAsyncUnit* unit);
/** \brief destroys an AsyncUnit
*
* \param unit pointer to the AsyncUnit to be destroyed
*/
int AsyncUnitDestroy(pAsyncUnit unit);
/** \brief Queue a transaction at the head of the associated AsyncQueue
*
* \param unit AsyncUnit
* \param pTxn pointer to transaction
*/
int AsyncUnitEnqueueHead(pAsyncUnit unit, pAsyncTxn pTxn);
/** \brief Queue a transaction at the tail of the associated AsyncQueue
*
* \param unit AsyncUnit
* \param pTxn pointer to transaction
*/
int AsyncUnitEnqueueTxn(pAsyncUnit unit, pAsyncTxn pTxn);
/** \brief prepare a transaction according to the protocol (default is CRLF)
*
* \param unit AsyncUnit
* \param command text string to be sent
* \param cmd_len length of data in command
* \param responseHandler function to handle the response
* \param context to be used by handler function
* \param resp_len maximum length to be allowed for response
*/
pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
const char* command, int cmd_len,
AsyncTxnHandler responseHandler, void* context,
int rsp_len);
/** \brief prepare and queue a transaction
*
* \param unit AsyncUnit
* \param command text string to be sent
* \param cmd_len length of data in command
* \param responseHandler function to handle the response
* \param context to be used by handler function
* \param resp_len maximum length to be allowed for response
*/
int AsyncUnitSendTxn(pAsyncUnit unit,
const char* command, int cmd_len,
AsyncTxnHandler responseHandler, void* context,
int rsp_len);
/** \brief send a transaction and wait for the response
*
* \param unit AsyncUnit
* \param command text string to be sent
* \param cmd_len length of data in command
* \param responseHandler function to handle the response
* \param context to be used by handler function
* \param resp_len maximum length to be allowed for response
*/
int AsyncUnitTransact(pAsyncUnit unit,
const char* command, int cmd_len,
char* response, int rsp_len);
/** \brief write to the AsyncQueue file descriptor
*
* The data is transmitted directly to the descriptor without being queued.
* This may be used by the protocol transmit function or for retrieving error
* text associated with the current transmission.
*
* \param unit AsyncUnit
* \param data to be transmitted
* \param buflen lenght of data
*/
int AsyncUnitWrite(pAsyncUnit unit, void* buffer, int buflen);
/** \brief registers a notification callback
*
* The notification callback may notify unsolicited or unusual events
*
* \param unit AsyncUnit
* \param context passed in callback
* \param notify function to be called
*/
typedef void (*AQU_Notify)(void* context, int event);
void AsyncUnitSetNotify(pAsyncUnit unit, void* context, AQU_Notify notify);
/** \brief get the intertransaction delay in milliseconds
*/
int AsyncUnitGetDelay(pAsyncUnit unit);
/** \brief set the intertransaction delay in milliseconds
*/
void AsyncUnitSetDelay(pAsyncUnit unit, int iDelay);
/** \brief get the default transaction timeout in milliseconds
*/
int AsyncUnitGetTimeout(pAsyncUnit unit);
/** \brief set the default transaction timeout in milliseconds
*/
void AsyncUnitSetTimeout(pAsyncUnit unit, int timeout);
/** \brief get the number of retries
*/
int AsyncUnitGetRetries(pAsyncUnit unit);
/** \brief set the number of retries
*/
void AsyncUnitSetRetries(pAsyncUnit unit, int retries);
/** \brief get the associated protocol handler
*/
pAsyncProtocol AsyncUnitGetProtocol(pAsyncUnit unit);
/** \brief set the associated protocol handler
*/
void AsyncUnitSetProtocol(pAsyncUnit unit, pAsyncProtocol protocol);
/** \brief retrieves the socket/channel associated with the AsyncQueue
*
* \param unit AsyncUnit
* \return channel or NULL
*/
mkChannel* AsyncUnitGetSocket(pAsyncUnit unit);
/** \brief attempt to reconnect the socket of the associated AsyncQueue
*
* \param unit pointer to AsyncUnit
*/
int AsyncUnitReconnect(pAsyncUnit handle);
/** \brief create an AsyncQueue from the SICS command MakeAsyncQueue
*/
int AsyncQueueFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
/** \brief SICS command handler for the AsyncQueue object
*/
int AsyncQueueAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
#endif /* SICSASYNCQUEUE */

View File

@@ -12,43 +12,37 @@
#ifndef SICSMULTICHAN #ifndef SICSMULTICHAN
#define SICSMULTICHAN #define SICSMULTICHAN
#define MCC_TIMEOUT -1 #include <asyncqueue.h>
#define MCC_DISCONNECT -2
#define MCC_RECONNECT -3
#define MCC_RETRY_CMD -4
#define MCC_POP_CMD -5
typedef struct __MultiChan MultiChan, *pMultiChan; /* transitional definitions*/
#define MCC_TIMEOUT AQU_TIMEOUT
#define MCC_DISCONNECT AQU_DISCONNECT
#define MCC_RECONNECT AQU_RECONNECT
#define MCC_RETRY_CMD AQU_RETRY_CMD
#define MCC_POP_CMD AQU_POP_CMD
#define MCC_Transmit AQU_Transmit
#define MCC_Receive AQU_Receive
#define MCC_Notify AQU_Notify
int MultiChanFactory(SConnection *pCon, SicsInterp *pSics, #define __MultiChan __AsyncUnit
void *pData, int argc, char *argv[]); #define MultiChan AsyncUnit
#define pMultiChan pAsyncUnit
int MultiChanCreate(const char* controller, pMultiChan* handle); #define MultiChanCreate AsyncUnitCreate
#define MultiChanCreateHost AsyncUnitCreateHost
#define MultiChanDestroy AsyncUnitDestroy
#define MultiChanReconnect AsyncUnitReconnect
#define MultiChanGetSocket AsyncUnitGetSocket
#define MultiChanEnqueTxn AsyncUnitEnqueTxn
#define MultiChanEnque AsyncUnitEnque
#define MultiChanWrite AsyncUnitWrite
#define MultiChanSetNotify AsyncUnitSetNotify
#define MultiChanGetDelay AsyncUnitGetDelay
#define MultiChanSetDelay AsyncUnitSetDelay
#define MultiChanGetTimeout AsyncUnitGetTimeout
#define MultiChanSetTimeout AsyncUnitSetTimeout
int MultiChanCreateHost(const char* host, #define MultiChanFactory AsyncQueueFactory
const char* port, #define MultiChanAction AsyncQueueAction
pMultiChan* handle);
int MultiChanDestroy(pMultiChan handle);
int MultiChanReconnect(pMultiChan handle);
mkChannel* MultiChanGetSocket(pMultiChan handle);
typedef int (*MCC_Transmit)(void* context);
typedef int (*MCC_Receive)(void* context, int ch);
int MultiChanEnque(pMultiChan unit, void* context, MCC_Transmit tx, MCC_Receive rx);
int MultiChanWrite(pMultiChan unit, void* buffer, int buflen);
typedef void (*MCC_Notify)(void* context, int event);
void MultiChanSetNotify(pMultiChan unit, void* context, MCC_Notify notify);
int MultiChanGetDelay(pMultiChan unit);
int MultiChanSetDelay(pMultiChan unit, int iDelay);
int MultiChanGetTimeout(pMultiChan unit);
int MultiChanSetTimeout(pMultiChan unit, int timeout);
int MultiChanAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
#endif /* SICSMULTICHAN */ #endif /* SICSMULTICHAN */

View File

@@ -51,67 +51,39 @@
#include <sics.h> #include <sics.h>
#include <modriv.h> #include <modriv.h>
#include <nwatch.h> #include <nwatch.h>
#include <multichan.h> #include <asyncqueue.h>
#include "nhq200util.h" #include "nhq200util.h"
/* -------------------------------------------------------------------*/ /* -------------------------------------------------------------------*/
typedef struct __command Command, *pCommand; static pAsyncProtocol NHQ_Protocol = NULL;
typedef int (*CommandCallback)(void* ctx, const char* resp, int resp_len);
struct __command { static int NHQ_Tx1(pAsyncProtocol p, void* ctx)
pMultiChan unit;
int cstate;
int lstate;
char* out_buf;
int out_len;
int out_idx;
char* inp_buf;
int inp_len;
int inp_idx;
CommandCallback func;
void* cntx;
};
static int NHQ_Tx1(void* ctx)
{ {
int iRet = 1; int iRet = 1;
pCommand myCmd = (pCommand) ctx; pAsyncTxn myCmd = (pAsyncTxn) ctx;
assert(myCmd); assert(myCmd);
iRet = MultiChanWrite(myCmd->unit, &myCmd->out_buf[myCmd->out_idx], 1); iRet = AsyncUnitWrite(myCmd->unit, &myCmd->out_buf[myCmd->out_idx], 1);
return iRet; return iRet;
} }
static int NHQ_Tx(void* ctx) static int NHQ_Tx(pAsyncProtocol p, pAsyncTxn myCmd)
{ {
pCommand myCmd = (pCommand) ctx;
/* /*
* Set/reset command states for send/resend of command * Set/reset command states for send/resend of command
*/ */
myCmd->cstate = 0; myCmd->txn_state = 0;
myCmd->lstate = 0;
myCmd->out_idx = 0; myCmd->out_idx = 0;
myCmd->inp_idx = 0; myCmd->inp_idx = 0;
return NHQ_Tx1(myCmd); myCmd->txn_status = ATX_ACTIVE;
return NHQ_Tx1(p, myCmd);
} }
static int NHQ_Rx(void* ctx, int rxchar) static int NHQ_Rx(pAsyncProtocol p, pAsyncTxn myCmd, int rxchar)
{ {
int iRet = 1; int iRet = 1;
pCommand myCmd = (pCommand) ctx;
if (rxchar == MCC_TIMEOUT) { switch (myCmd->txn_state) {
/* TODO: handle command timeout */
if (myCmd->func)
iRet = myCmd->func(myCmd->cntx, NULL, MCC_TIMEOUT);
free(myCmd->out_buf);
free(myCmd->inp_buf);
free(myCmd);
return MCC_POP_CMD;
}
switch (myCmd->cstate) {
case 0: /* send with echo */ case 0: /* send with echo */
if (rxchar != myCmd->out_buf[myCmd->out_idx]) { if (rxchar != myCmd->out_buf[myCmd->out_idx]) {
/* TODO: bad echo */ /* TODO: bad echo */
@@ -120,12 +92,12 @@ static int NHQ_Rx(void* ctx, int rxchar)
myCmd->out_idx > 0 && myCmd->out_idx > 0 &&
myCmd->out_buf[myCmd->out_idx - 1] == 0x0D) { myCmd->out_buf[myCmd->out_idx - 1] == 0x0D) {
myCmd->inp_idx = 0; myCmd->inp_idx = 0;
myCmd->cstate = 1; myCmd->txn_state = 1;
/* TODO: end of line */ /* TODO: end of line */
} }
else if (myCmd->out_idx < myCmd->out_len) { else if (myCmd->out_idx < myCmd->out_len) {
myCmd->out_idx++; myCmd->out_idx++;
iRet = NHQ_Tx1(myCmd); iRet = NHQ_Tx1(p, myCmd);
} }
else { else {
/* TODO: out of data */ /* TODO: out of data */
@@ -135,65 +107,37 @@ static int NHQ_Rx(void* ctx, int rxchar)
if (myCmd->inp_idx < myCmd->inp_len) if (myCmd->inp_idx < myCmd->inp_len)
myCmd->inp_buf[myCmd->inp_idx++] = rxchar; myCmd->inp_buf[myCmd->inp_idx++] = rxchar;
if (rxchar == 0x0D) if (rxchar == 0x0D)
myCmd->cstate = 2; myCmd->txn_state = 2;
break; break;
case 2: /* received CR and looking for LF */ case 2: /* received CR and looking for LF */
if (myCmd->inp_idx < myCmd->inp_len) if (myCmd->inp_idx < myCmd->inp_len)
myCmd->inp_buf[myCmd->inp_idx++] = rxchar; myCmd->inp_buf[myCmd->inp_idx++] = rxchar;
if (rxchar == 0x0A) { if (rxchar == 0x0A) {
/* end of line */ /* end of line */
myCmd->cstate = 3; myCmd->txn_state = 3;
myCmd->inp_idx -= 2; myCmd->inp_idx -= 2;
myCmd->inp_buf[myCmd->inp_idx] = '\0'; myCmd->inp_buf[myCmd->inp_idx] = '\0';
if (myCmd->func) myCmd->txn_status = ATX_COMPLETE;
iRet = myCmd->func(myCmd->cntx, myCmd->inp_buf, myCmd->inp_idx);
else
iRet = 0; iRet = 0;
} }
else else
myCmd->cstate = 1; myCmd->txn_state = 1;
break; break;
} }
if (iRet == 0) { /* end of command */ if (iRet == 0) { /* end of command */
free(myCmd->out_buf); return AQU_POP_CMD;
free(myCmd->inp_buf);
free(myCmd);
return MCC_POP_CMD;
} }
return iRet; return iRet;
} }
int NHQ_SendCmd(pMultiChan unit, static int NHQ_Ev(pAsyncProtocol p, pAsyncTxn myCmd, int event)
char* command, int cmd_len,
CommandCallback callback, void* context, int rsp_len)
{ {
pCommand myCmd = NULL; if (event == AQU_TIMEOUT) {
/* TODO: handle command timeout */
assert(unit); myCmd->txn_status = ATX_TIMEOUT;
myCmd = (pCommand) malloc(sizeof(Command)); return AQU_POP_CMD;
assert(myCmd);
memset(myCmd, 0, sizeof(Command));
myCmd->out_buf = (char*) malloc(cmd_len + 5);
memcpy(myCmd->out_buf, command, cmd_len);
myCmd->out_len = cmd_len;
if (myCmd->out_len < 2 ||
myCmd->out_buf[myCmd->out_len - 1] != 0x0A ||
myCmd->out_buf[myCmd->out_len - 2] != 0x0D) {
myCmd->out_buf[myCmd->out_len++] = 0x0D;
myCmd->out_buf[myCmd->out_len++] = 0x0A;
} }
myCmd->out_buf[myCmd->out_len] = '\0'; return AQU_POP_CMD;
myCmd->func = callback;
myCmd->cntx = context;
if (rsp_len == 0)
myCmd->inp_buf = NULL;
else {
myCmd->inp_buf = malloc(rsp_len + 1);
memset(myCmd->inp_buf, 0, rsp_len + 1);
}
myCmd->inp_len = rsp_len;
myCmd->unit = unit;
return MultiChanEnque(unit, myCmd, NHQ_Tx, NHQ_Rx);
} }
static void NHQ_Notify(void* context, int event) static void NHQ_Notify(void* context, int event)
@@ -201,14 +145,14 @@ static void NHQ_Notify(void* context, int event)
pNHQ200 self = (pNHQ200) context; pNHQ200 self = (pNHQ200) context;
switch (event) { switch (event) {
case MCC_DISCONNECT: case AQU_DISCONNECT:
if (self->transWait == 1) { if (self->transWait == 1) {
self->transWait = NHQ200__FAULT; self->transWait = NHQ200__FAULT;
strcpy(self->pAns, "DISCONNECTED"); strcpy(self->pAns, "DISCONNECTED");
} }
case MCC_RECONNECT: case AQU_RECONNECT:
do { do {
mkChannel* sock = MultiChanGetSocket(self->mcc); mkChannel* sock = AsyncUnitGetSocket(self->unit);
int flag = 1; int flag = 1;
setsockopt(sock->sockid, /* socket affected */ setsockopt(sock->sockid, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */ IPPROTO_TCP, /* set option at TCP level */
@@ -302,13 +246,15 @@ static void parse_Vx(pNHQ200 self, const char* resp, int resp_len)
#define STATE_NX 5 #define STATE_NX 5
#define STATE_VX 6 #define STATE_VX 6
#define STATE_END 9 #define STATE_END 9
static int InitCallback(void* ctx, const char* resp, int resp_len) static int InitCallback(pAsyncTxn pTxn)
{ {
char cmd[20]; char cmd[20];
int cmd_len; int cmd_len;
pNHQ200 self = (pNHQ200) ctx; const char* resp = pTxn->inp_buf;
int resp_len = pTxn->inp_idx;
pNHQ200 self = (pNHQ200) pTxn->cntx;
/* TODO: FIXME finish initialisation */ /* TODO: FIXME finish initialisation */
if (resp_len < 0) { if (pTxn->txn_status == ATX_TIMEOUT) {
self->iError = NHQ200__BADSET; self->iError = NHQ200__BADSET;
self->iState = 0; self->iState = 0;
} }
@@ -316,37 +262,37 @@ static int InitCallback(void* ctx, const char* resp, int resp_len)
switch (self->iState) { switch (self->iState) {
case 0: /* Initial */ case 0: /* Initial */
cmd_len = snprintf(cmd, sizeof(cmd), "#"); cmd_len = snprintf(cmd, sizeof(cmd), "#");
NHQ_SendCmd(self->mcc, cmd, cmd_len, InitCallback, self, 80); AsyncUnitSendTxn(self->unit, cmd, cmd_len, InitCallback, self, 80);
self->iState = STATE_HASH; self->iState = STATE_HASH;
break; break;
case STATE_HASH: /* # */ case STATE_HASH: /* # */
parse_hash(self, resp, resp_len); parse_hash(self, resp, resp_len);
cmd_len = snprintf(cmd, sizeof(cmd), "S%d", self->iControl); cmd_len = snprintf(cmd, sizeof(cmd), "S%d", self->iControl);
NHQ_SendCmd(self->mcc, cmd, cmd_len, InitCallback, self, 80); AsyncUnitSendTxn(self->unit, cmd, cmd_len, InitCallback, self, 80);
self->iState = STATE_SX; self->iState = STATE_SX;
break; break;
case STATE_SX: /* Sx */ case STATE_SX: /* Sx */
parse_Sx(self, resp, resp_len); parse_Sx(self, resp, resp_len);
cmd_len = snprintf(cmd, sizeof(cmd), "T%d", self->iControl); cmd_len = snprintf(cmd, sizeof(cmd), "T%d", self->iControl);
NHQ_SendCmd(self->mcc, cmd, cmd_len, InitCallback, self, 80); AsyncUnitSendTxn(self->unit, cmd, cmd_len, InitCallback, self, 80);
self->iState = STATE_TX; self->iState = STATE_TX;
break; break;
case STATE_TX: /* Tx */ case STATE_TX: /* Tx */
parse_Tx(self, resp, resp_len); parse_Tx(self, resp, resp_len);
cmd_len = snprintf(cmd, sizeof(cmd), "M%d", self->iControl); cmd_len = snprintf(cmd, sizeof(cmd), "M%d", self->iControl);
NHQ_SendCmd(self->mcc, cmd, cmd_len, InitCallback, self, 80); AsyncUnitSendTxn(self->unit, cmd, cmd_len, InitCallback, self, 80);
self->iState = STATE_MX; self->iState = STATE_MX;
break; break;
case STATE_MX: /* Mx */ case STATE_MX: /* Mx */
parse_Mx(self, resp, resp_len); parse_Mx(self, resp, resp_len);
cmd_len = snprintf(cmd, sizeof(cmd), "N%d", self->iControl); cmd_len = snprintf(cmd, sizeof(cmd), "N%d", self->iControl);
NHQ_SendCmd(self->mcc, cmd, cmd_len, InitCallback, self, 80); AsyncUnitSendTxn(self->unit, cmd, cmd_len, InitCallback, self, 80);
self->iState = STATE_NX; self->iState = STATE_NX;
break; break;
case STATE_NX: /* Nx */ case STATE_NX: /* Nx */
parse_Nx(self, resp, resp_len); parse_Nx(self, resp, resp_len);
cmd_len = snprintf(cmd, sizeof(cmd), "V%d", self->iControl); cmd_len = snprintf(cmd, sizeof(cmd), "V%d", self->iControl);
NHQ_SendCmd(self->mcc, cmd, cmd_len, InitCallback, self, 80); AsyncUnitSendTxn(self->unit, cmd, cmd_len, InitCallback, self, 80);
self->iState = STATE_VX; self->iState = STATE_VX;
break; break;
case STATE_VX: /* Vx */ case STATE_VX: /* Vx */
@@ -363,19 +309,20 @@ static int InitCallback(void* ctx, const char* resp, int resp_len)
static void NHQ_Init(pNHQ200 self) static void NHQ_Init(pNHQ200 self)
{ {
self->iState = 0; self->iState = 0;
NHQ_SendCmd(self->mcc, "", 0, InitCallback, self, 80); AsyncUnitSendTxn(self->unit, "", 0, InitCallback, self, 80);
} }
/* /*
* \brief GetCallback is the callback for the get position/value command. * \brief GetCallback is the callback for the get position/value command.
*/ */
static int GetCallback(void* ctx, const char* resp, int resp_len) static int GetCallback(pAsyncTxn pTxn)
{ {
int iRet; int iRet;
float fRead; float fRead;
pNHQ200 self = (pNHQ200) ctx; const char* resp = pTxn->inp_buf;
pNHQ200 self = (pNHQ200) pTxn->cntx;
if (resp_len < 0) { if (pTxn->txn_status == ATX_TIMEOUT) {
self->iError = NHQ200__BADREAD; self->iError = NHQ200__BADREAD;
} }
else { else {
@@ -396,11 +343,13 @@ static int GetCallback(void* ctx, const char* resp, int resp_len)
/* /*
* \brief TransCallback is the callback for the general command transaction. * \brief TransCallback is the callback for the general command transaction.
*/ */
static int TransCallback(void* ctx, const char* resp, int resp_len) static int TransCallback(pAsyncTxn pTxn)
{ {
pNHQ200 self = (pNHQ200) ctx; const char* resp = pTxn->inp_buf;
int resp_len = pTxn->inp_idx;
pNHQ200 self = (pNHQ200) pTxn->cntx;
if (resp_len < 0) { if (pTxn->txn_status == ATX_TIMEOUT) {
self->transReply[0] = '\0'; self->transReply[0] = '\0';
self->transWait = -1; self->transWait = -1;
} }
@@ -419,7 +368,7 @@ int transactNHQ200(pNHQ200 self, void *send, int sendLen,
assert(self); assert(self);
self->transReply = reply; self->transReply = reply;
self->transWait = 1; self->transWait = 1;
NHQ_SendCmd(self->mcc, AsyncUnitSendTxn(self->unit,
send, sendLen, send, sendLen,
TransCallback, self, replyLen); TransCallback, self, replyLen);
while (self->transWait == 1) while (self->transWait == 1)
@@ -448,7 +397,8 @@ int NHQ200_Check_Status(pNHQ200 self)
do do
{ {
sprintf(pCommand,"S%d", self->iControl); sprintf(pCommand,"S%d", self->iControl);
if ((iRet=transactNHQ200(self,pCommand,strlen(pCommand),pReply,79))<=0) iRet=AsyncUnitTransact(self->unit, pCommand, strlen(pCommand), pReply, 79);
if (iRet <= 0)
{ {
printf("Comms error!\n"); printf("Comms error!\n");
return iRet; // Comms problem return iRet; // Comms problem
@@ -472,10 +422,6 @@ int NHQ200_Check_Status(pNHQ200 self)
/* Operations common to both Open and Config functions */ /* Operations common to both Open and Config functions */
static int NHQ200_Setup(pNHQ200 self, int iControl) static int NHQ200_Setup(pNHQ200 self, int iControl)
{ {
int iRet;
char pCommand[20];
char pReply[132];
if (!self) if (!self)
return NHQ200__BADCOM; return NHQ200__BADCOM;
@@ -505,12 +451,12 @@ int NHQ200_Open(pNHQ200 *pData, char *pName, int iSensor, int iCTRL, int iMode)
self->fDiv = 1.0; self->fDiv = 1.0;
self->fMult = 1.0; self->fMult = 1.0;
if (MultiChanCreate(pName, &self->mcc) == 0) { if (AsyncUnitCreate(pName, &self->unit) == 0) {
return NHQ200__NONHQ200; return NHQ200__NONHQ200;
} }
MultiChanSetNotify(self->mcc, self, NHQ_Notify); AsyncUnitSetNotify(self->unit, self, NHQ_Notify);
sock = MultiChanGetSocket(self->mcc); sock = AsyncUnitGetSocket(self->unit);
if (sock) { if (sock) {
int flag = 1; int flag = 1;
iRet = setsockopt(sock->sockid, /* socket affected */ iRet = setsockopt(sock->sockid, /* socket affected */
@@ -560,7 +506,7 @@ int NHQ200_Send(pNHQ200 *pData, char *pCommand, char *pReply, int iLen)
/* Send command direct to the NHQ200 */ /* Send command direct to the NHQ200 */
commandlen=strlen(pCommand); commandlen=strlen(pCommand);
iRet=transactNHQ200(self,pCommand,commandlen,pReply,iLen); iRet=AsyncUnitTransact(self->unit, pCommand, commandlen, pReply, iLen);
return iRet; return iRet;
} }
@@ -581,13 +527,20 @@ int NHQ200_Read(pNHQ200 *pData, float *fVal)
struct timeval tv_this; struct timeval tv_this;
gettimeofday(&tv_this, NULL); gettimeofday(&tv_this, NULL);
if ((tv_this.tv_sec - self->tv_last.tv_sec) > 0) { if ((tv_this.tv_sec - self->tv_last.tv_sec) > 0) {
NHQ_SendCmd(self->mcc, AsyncUnitSendTxn(self->unit,
pCommand, 2, pCommand, 2,
GetCallback, self, 132); GetCallback, self, 132);
self->iGetOut = 1; self->iGetOut = 1;
self->tv_last = tv_this; self->tv_last = tv_this;
} }
} }
while (self->iGetOut) {
struct timeval tv_this;
gettimeofday(&tv_this, NULL);
if ((tv_this.tv_sec - self->tv_last.tv_sec) > 1)
break;
TaskYield(pServ->pTasker);
}
*fVal = self->fValue; *fVal = self->fValue;
iRet = 1; iRet = 1;
@@ -597,7 +550,7 @@ int NHQ200_Read(pNHQ200 *pData, float *fVal)
int NHQ200_Set(pNHQ200 *pData, float fVal) int NHQ200_Set(pNHQ200 *pData, float fVal)
{ {
char pCommand[20], pCommandRead[20], pReply[132], pCommandGo[20]; char pCommand[20], pCommandRead[20], pReply[132], pCommandGo[20];
int iRet, i; int iRet;
const float fPrecision = 0.1; const float fPrecision = 0.1;
float fDelta, fRead; float fDelta, fRead;
pNHQ200 self; pNHQ200 self;
@@ -615,11 +568,11 @@ int NHQ200_Set(pNHQ200 *pData, float fVal)
sprintf(pCommandRead,"D%d", self->iControl); sprintf(pCommandRead,"D%d", self->iControl);
/* send Dn=nnn command, we get a blank line response */ /* send Dn=nnn command, we get a blank line response */
iRet = transactNHQ200(self,pCommand,strlen(pCommand),pReply,131); iRet = AsyncUnitTransact(self->unit,pCommand,strlen(pCommand),pReply,131);
if (iRet <= 0) if (iRet <= 0)
return iRet; return iRet;
/* read the set value again using the Dn command */ /* read the set value again using the Dn command */
iRet = transactNHQ200(self,pCommandRead,strlen(pCommandRead),pReply,131); iRet = AsyncUnitTransact(self->unit,pCommandRead,strlen(pCommandRead),pReply,131);
if (iRet <= 0) if (iRet <= 0)
return iRet; return iRet;
printf("D%d: Response %d chars: '%s'\n",self->iControl, iRet, pReply); printf("D%d: Response %d chars: '%s'\n",self->iControl, iRet, pReply);
@@ -636,7 +589,7 @@ int NHQ200_Set(pNHQ200 *pData, float fVal)
if(fDelta < fPrecision) if(fDelta < fPrecision)
{ {
sprintf(pCommandGo, "G%d", self->iControl); sprintf(pCommandGo, "G%d", self->iControl);
iRet = transactNHQ200(self,pCommandGo,strlen(pCommandGo),pReply,131); iRet = AsyncUnitTransact(self->unit,pCommandGo,strlen(pCommandGo),pReply,131);
if (iRet <= 0) if (iRet <= 0)
return iRet; return iRet;
printf("G%d: Response %d chars: '%s'\n",self->iControl, iRet, pReply); printf("G%d: Response %d chars: '%s'\n",self->iControl, iRet, pReply);
@@ -686,3 +639,14 @@ void NHQ200_ErrorTxt(pNHQ200 *pData,int iCode, char *pError, int iLen)
break; break;
} }
} }
void NHQ200InitProtocol(SicsInterp *pSics) {
if (NHQ_Protocol == NULL) {
NHQ_Protocol = AsyncProtocolCreate(pSics, "NHQ200", NULL, NULL);
NHQ_Protocol->sendCommand = NHQ_Tx;
NHQ_Protocol->handleInput = NHQ_Rx;
NHQ_Protocol->handleEvent = NHQ_Ev;
NHQ_Protocol->prepareTxn = NULL;
NHQ_Protocol->killPrivate = NULL;
}
}

View File

@@ -29,7 +29,7 @@
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
typedef struct __NHQ200 { typedef struct __NHQ200 {
pMultiChan mcc; pAsyncUnit unit;
int iRead; int iRead;
int iControl; int iControl;
void *pData; void *pData;

View File

@@ -26,7 +26,8 @@ SOBJ = network.o ifile.o conman.o SCinter.o splitter.o passwd.o \
mcstashm.o initializer.o remob.o tclmotdriv.o protocol.o \ mcstashm.o initializer.o remob.o tclmotdriv.o protocol.o \
sinfox.o sicslist.o cone.o hipadaba.o sicshipadaba.o statistics.o \ sinfox.o sicslist.o cone.o hipadaba.o sicshipadaba.o statistics.o \
moregress.o hdbcommand.o multicounter.o regresscter.o histregress.o \ moregress.o hdbcommand.o multicounter.o regresscter.o histregress.o \
sicshdbadapter.o polldriv.o sicspoll.o statemon.o multichan.o sicshdbadapter.o polldriv.o sicspoll.o statemon.o \
asyncqueue.o asyncprotocol.o
# These are intermediate files generated from .tc files, marking # These are intermediate files generated from .tc files, marking
# them as SECONDARY prevents make from removing them. # them as SECONDARY prevents make from removing them.

View File

@@ -22,7 +22,7 @@
#include <sys/time.h> #include <sys/time.h>
#include <fortify.h> #include <fortify.h>
#include <sics.h> #include <sics.h>
#include <multichan.h> #include <asyncqueue.h>
#include <nwatch.h> #include <nwatch.h>
#include <modriv.h> #include <modriv.h>
#include <motor.h> #include <motor.h>
@@ -40,12 +40,8 @@
enum dmcsetting {dmcspeed, dmcacceleration, dmcdeceleration}; enum dmcsetting {dmcspeed, dmcacceleration, dmcdeceleration};
enum commandtype {CMD_RUN=1, CMD_HALT=2}; enum commandtype {CMD_RUN=1, CMD_HALT=2};
typedef struct __MoDriv DMC2280Driv, *pDMC2280Driv; typedef struct __MoDriv DMC2280Driv, *pDMC2280Driv;
typedef struct __command Command, *pCommand;
typedef int (*CommandCallback)(pCommand pCmd);
enum eventtype {eTimerEvent, eMessageEvent, eCommandEvent, eTimeoutEvent}; enum eventtype {eTimerEvent, eMessageEvent, eCommandEvent, eTimeoutEvent};
typedef struct EvtEvent_s EvtEvent, *pEvtEvent; typedef struct EvtEvent_s EvtEvent, *pEvtEvent;
@@ -54,7 +50,7 @@ typedef void (*StateFunc)(pDMC2280Driv self, pEvtEvent event);
typedef struct EvtTimer_s { } EvtTimer; typedef struct EvtTimer_s { } EvtTimer;
typedef struct EvtMessage_s { typedef struct EvtMessage_s {
pCommand cmd; pAsyncTxn cmd;
} EvtMessage; } EvtMessage;
typedef struct EvtCommand_s { typedef struct EvtCommand_s {
@@ -73,6 +69,8 @@ struct EvtEvent_s {
} event; } event;
}; };
static pAsyncProtocol DMC2280_Protocol = NULL;
/*----------------------------------------------------------------------- /*-----------------------------------------------------------------------
The motor driver structure. Please note that the first set of fields has The motor driver structure. Please note that the first set of fields has
be identical with the fields of AbstractModriv in ../modriv.h be identical with the fields of AbstractModriv in ../modriv.h
@@ -102,7 +100,7 @@ struct __MoDriv {
/* DMC-2280 specific fields */ /* DMC-2280 specific fields */
pMultiChan mcc; pAsyncUnit asyncUnit;
pMotor pMot; /**< Points to logical motor object */ pMotor pMot; /**< Points to logical motor object */
int errorCode; int errorCode;
char *errorMsg; /**< Points to memory for error messages */ char *errorMsg; /**< Points to memory for error messages */
@@ -226,20 +224,6 @@ static int DMC2280Halt(void *pData);
static int DMC2280SetPar(void *pData, SConnection *pCon, static int DMC2280SetPar(void *pData, SConnection *pCon,
char *name, float newValue); char *name, float newValue);
struct __command {
pMultiChan unit;
int cstate;
int lstate;
char* out_buf;
int out_len;
int out_idx;
char* inp_buf;
int inp_len;
int inp_idx;
CommandCallback func;
void* cntx;
};
/** \brief Convert axis speed in physical units to /** \brief Convert axis speed in physical units to
* motor speed in steps/sec. * motor speed in steps/sec.
* \param self (r) provides access to the motor's data structure * \param self (r) provides access to the motor's data structure
@@ -397,17 +381,18 @@ static int motCreep(pDMC2280Driv self, float target) {
return target_steps; return target_steps;
} }
static int DMC_Tx(void* ctx) static int DMC_Tx(pAsyncProtocol p, pAsyncTxn ctx)
{ {
int iRet = 1; int iRet = 1;
pCommand myCmd = (pCommand) ctx; pAsyncTxn myCmd = (pAsyncTxn) ctx;
if (myCmd) { if (myCmd) {
iRet = MultiChanWrite(myCmd->unit, myCmd->out_buf, myCmd->out_len); myCmd->txn_status = ATX_ACTIVE;
iRet = AsyncUnitWrite(myCmd->unit, myCmd->out_buf, myCmd->out_len);
/* TODO handle errors */ /* TODO handle errors */
if (iRet < 0) { /* TODO: EOF */ if (iRet < 0) { /* TODO: EOF */
/* /*
iRet = MultiChanReconnect(myCmd->unit); iRet = AsyncUnitReconnect(myCmd->unit);
if (iRet == 0) if (iRet == 0)
*/ */
return 0; return 0;
@@ -416,42 +401,32 @@ static int DMC_Tx(void* ctx)
return 1; return 1;
} }
static int DMC_Rx(void* ctx, int rxchar) { static int DMC_Rx(pAsyncProtocol p, pAsyncTxn ctx, int rxchar) {
int iRet = 1; int iRet = 1;
pCommand myCmd = (pCommand) ctx; pAsyncTxn myCmd = (pAsyncTxn) ctx;
if (rxchar == MCC_TIMEOUT) { switch (myCmd->txn_state) {
/* handle command timeout */
myCmd->inp_idx = MCC_TIMEOUT;
if (myCmd->func)
iRet = myCmd->func(myCmd);
free(myCmd->out_buf);
free(myCmd->inp_buf);
free(myCmd);
return MCC_POP_CMD;
}
switch (myCmd->cstate) {
case 0: /* first character */ case 0: /* first character */
if (rxchar == ':') { if (rxchar == ':') {
/* normal prompt */ /* normal prompt */
myCmd->cstate = 99; myCmd->txn_state = 99;
myCmd->txn_status = ATX_COMPLETE;
} }
else if (rxchar == '?') { else if (rxchar == '?') {
/* error prompt, send TC1 ahead of any queued commands */ /* error prompt, send TC1 ahead of any queued commands */
iRet = MultiChanWrite(myCmd->unit, "TC1\r\n", 5); iRet = AsyncUnitWrite(myCmd->unit, "TC1\r\n", 5);
myCmd->cstate = 1; myCmd->txn_state = 1;
} }
else { else {
/* normal data */ /* normal data */
myCmd->cstate = 1; myCmd->txn_state = 1;
} }
/* note fallthrough */ /* note fallthrough */
case 1: /* receiving reply */ case 1: /* receiving reply */
if (myCmd->inp_idx < myCmd->inp_len) if (myCmd->inp_idx < myCmd->inp_len)
myCmd->inp_buf[myCmd->inp_idx++] = rxchar; myCmd->inp_buf[myCmd->inp_idx++] = rxchar;
if (rxchar == 0x0D) if (rxchar == 0x0D)
myCmd->cstate = 2; myCmd->txn_state = 2;
break; break;
case 2: /* received CR and looking for LF */ case 2: /* received CR and looking for LF */
if (myCmd->inp_idx < myCmd->inp_len) if (myCmd->inp_idx < myCmd->inp_len)
@@ -462,13 +437,13 @@ static int DMC_Rx(void* ctx, int rxchar) {
myCmd->inp_idx -= 2; myCmd->inp_idx -= 2;
myCmd->inp_buf[myCmd->inp_idx++] = rxchar; myCmd->inp_buf[myCmd->inp_idx++] = rxchar;
*/ */
myCmd->cstate = 0; myCmd->txn_state = 0;
} }
else else
myCmd->cstate = 1; myCmd->txn_state = 1;
break; break;
} }
if (myCmd->cstate == 99) { if (myCmd->txn_state == 99) {
myCmd->inp_buf[myCmd->inp_idx] = '\0'; myCmd->inp_buf[myCmd->inp_idx] = '\0';
if (strncmp(myCmd->inp_buf, myCmd->out_buf, myCmd->out_len) == 0) { if (strncmp(myCmd->inp_buf, myCmd->out_buf, myCmd->out_len) == 0) {
int i; int i;
@@ -477,60 +452,28 @@ static int DMC_Rx(void* ctx, int rxchar) {
myCmd->inp_buf[i - myCmd->out_len] = myCmd->inp_buf[i]; myCmd->inp_buf[i - myCmd->out_len] = myCmd->inp_buf[i];
} }
} }
if (myCmd->func)
iRet = myCmd->func(myCmd);
else
iRet = 0; iRet = 0;
myCmd->cstate = 0;
myCmd->inp_idx = 0;
} }
if (iRet == 0) { /* end of command */ if (iRet == 0) { /* end of command */
free(myCmd->out_buf); return AQU_POP_CMD;
free(myCmd->inp_buf);
free(myCmd);
return MCC_POP_CMD;
} }
return iRet; return iRet;
} }
static int DMC_SendCommand(pMultiChan unit, static int DMC_Ev(pAsyncProtocol p, pAsyncTxn pTxn, int event) {
char* command, int cmd_len, if (event == AQU_TIMEOUT) {
CommandCallback callback, void* context, int rsp_len) /* handle command timeout */
{ pTxn->txn_status = ATX_TIMEOUT;
pCommand myCmd = NULL; return AQU_POP_CMD;
assert(unit);
myCmd = (pCommand) malloc(sizeof(Command));
assert(myCmd);
memset(myCmd, 0, sizeof(Command));
myCmd->out_buf = (char*) malloc(cmd_len + 5);
memcpy(myCmd->out_buf, command, cmd_len);
myCmd->out_len = cmd_len;
if (myCmd->out_len < 2 ||
myCmd->out_buf[myCmd->out_len - 1] != 0x0A ||
myCmd->out_buf[myCmd->out_len - 2] != 0x0D) {
myCmd->out_buf[myCmd->out_len++] = 0x0D;
myCmd->out_buf[myCmd->out_len++] = 0x0A;
} }
myCmd->out_buf[myCmd->out_len] = '\0'; return AQU_POP_CMD;
myCmd->func = callback;
myCmd->cntx = context;
if (rsp_len == 0)
myCmd->inp_buf = NULL;
else {
myCmd->inp_buf = malloc(rsp_len + 1);
memset(myCmd->inp_buf, 0, rsp_len + 1);
}
myCmd->inp_len = rsp_len;
myCmd->unit = unit;
return MultiChanEnque(unit, myCmd, DMC_Tx, DMC_Rx);
} }
static int DMC_SendCmd(pDMC2280Driv self, static int DMC_SendCmd(pDMC2280Driv self,
char* command, char* command,
CommandCallback callback) AsyncTxnHandler callback)
{ {
return DMC_SendCommand(self->mcc, return AsyncUnitSendTxn(self->asyncUnit,
command, strlen(command), command, strlen(command),
callback, self, CMDLEN); callback, self, CMDLEN);
} }
@@ -541,12 +484,12 @@ static void DMC_Notify(void* context, int event)
char line[132]; char line[132];
switch (event) { switch (event) {
case MCC_DISCONNECT: case AQU_DISCONNECT:
snprintf(line, 132, "Disconnect on Motor '%s'", self->name); snprintf(line, 132, "Disconnect on Motor '%s'", self->name);
SICSLogWrite(line, eStatus); SICSLogWrite(line, eStatus);
/* TODO: disconnect */ /* TODO: disconnect */
break; break;
case MCC_RECONNECT: case AQU_RECONNECT:
snprintf(line, 132, "Reconnect on Motor '%s'", self->name); snprintf(line, 132, "Reconnect on Motor '%s'", self->name);
SICSLogWrite(line, eStatus); SICSLogWrite(line, eStatus);
/* TODO: reconnect */ /* TODO: reconnect */
@@ -555,22 +498,24 @@ static void DMC_Notify(void* context, int event)
return; return;
} }
typedef struct txn_s {
char* transReply;
int transWait;
} TXN, *pTXN;
/** /**
* \brief SendCallback is the callback for the general command. * \brief SendCallback is the callback for the general command.
*/ */
static int SendCallback(pCommand pCmd) static int SendCallback(pAsyncTxn pCmd)
{ {
char* cmnd = pCmd->out_buf; char* cmnd = pCmd->out_buf;
char* resp = pCmd->inp_buf; char* resp = pCmd->inp_buf;
int resp_len = pCmd->inp_idx;
pDMC2280Driv self = (pDMC2280Driv) pCmd->cntx; pDMC2280Driv self = (pDMC2280Driv) pCmd->cntx;
if (resp_len > 0) { if (pCmd->txn_status == ATX_TIMEOUT) {
if (self->debug) {
SICSLogWrite(pCmd->out_buf, eStatus);
SICSLogWrite("<TIMEOUT>", eStatus);
}
strncpy(self->lastCmd, pCmd->out_buf, CMDLEN);
self->errorCode = MOTCMDTMO;
}
else {
switch (resp[0]) { switch (resp[0]) {
case ':': case ':':
case ' ': case ' ':
@@ -593,52 +538,11 @@ static int SendCallback(pCommand pCmd)
break; break;
} }
} }
else {
/* TODO: timeout */
}
return 0; return 0;
} }
/** static int DMC2280Queue(pDMC2280Driv self, char *cmd, AsyncTxnHandler cb) {
* \brief TransCallback is the callback for the general command transaction.
*/
static int TransCallback(pCommand pCmd) {
char* resp = pCmd->inp_buf;
int resp_len = pCmd->inp_idx;
pTXN self = (pTXN) pCmd->cntx;
if (resp_len < 0) {
self->transReply[0] = '\0';
self->transWait = -1;
}
else {
memcpy(self->transReply, resp, resp_len);
self->transReply[resp_len] = '\0';
self->transWait = 0;
}
return 0;
}
/*------------------------------------------------------------------------*/
static int DMC_transact(pDMC2280Driv self, void *send, int sendLen,
void *reply, int replyLen)
{
TXN txn;
assert(self);
txn.transReply = reply;
txn.transWait = 1;
DMC_SendCommand(self->mcc,
send, sendLen,
TransCallback, &txn, replyLen);
while (txn.transWait == 1)
TaskYield(pServ->pTasker);
if (txn.transWait < 0)
return txn.transWait;
return 1;
}
static int DMC2280Queue(pDMC2280Driv self, char *cmd, CommandCallback cb) {
if (cb == NULL) if (cb == NULL)
cb = SendCallback; cb = SendCallback;
return DMC_SendCmd(self, cmd, cb); return DMC_SendCmd(self, cmd, cb);
@@ -677,7 +581,7 @@ static int DMC2280Send(pDMC2280Driv self, char *command) {
static int DMC2280SendReceive(pDMC2280Driv self, char *cmd, char* reply) { static int DMC2280SendReceive(pDMC2280Driv self, char *cmd, char* reply) {
int status; int status;
status = DMC_transact(self, cmd, strlen(cmd), reply, CMDLEN); status = AsyncUnitTransact(self->asyncUnit, cmd, strlen(cmd), reply, CMDLEN);
if (status != 1) { if (status != 1) {
if (self->debug) if (self->debug)
@@ -985,12 +889,19 @@ static int DMC2280RunCommon(pDMC2280Driv self,float fValue){
/** /**
* \brief process the airpad status response * \brief process the airpad status response
*/ */
static int airpad_callback(pCommand pCmd) { static int airpad_callback(pAsyncTxn pCmd) {
char* resp = pCmd->inp_buf; char* resp = pCmd->inp_buf;
int resp_len = pCmd->inp_idx;
pDMC2280Driv self = (pDMC2280Driv) pCmd->cntx; pDMC2280Driv self = (pDMC2280Driv) pCmd->cntx;
if (resp_len > 0) { if (pCmd->txn_status == ATX_TIMEOUT) {
if (self->debug) {
SICSLogWrite(pCmd->out_buf, eStatus);
SICSLogWrite("<TIMEOUT>", eStatus);
}
strncpy(self->lastCmd, pCmd->out_buf, CMDLEN);
self->errorCode = MOTCMDTMO;
}
else {
float fReply; float fReply;
if (self->debug) { if (self->debug) {
SICSLogWrite(pCmd->inp_buf, eStatus); SICSLogWrite(pCmd->inp_buf, eStatus);
@@ -1010,9 +921,6 @@ static int airpad_callback(pCommand pCmd) {
return 0; return 0;
} }
} }
else {
/* TODO: timeout */
}
return 0; return 0;
} }
@@ -1228,15 +1136,36 @@ static char* state_name(StateFunc func) {
return "<unknown_state>"; return "<unknown_state>";
} }
void str_n_cat(char* s1, int len, const char* s2) {
int i = strlen(s1);
const char* p = s2;
while (i < len - 3 && *p) {
if (*p == '\r') {
s1[i++] = '\\';
s1[i++] = 'r';
++p;
}
else if (*p == '\n') {
s1[i++] = '\\';
s1[i++] = 'n';
++p;
}
else
s1[i++] = *p++;
}
s1[i] = '\0';
}
static char* event_name(pEvtEvent event, char* text, int length) { static char* event_name(pEvtEvent event, char* text, int length) {
switch (event->event_type) { switch (event->event_type) {
case eTimerEvent: case eTimerEvent:
snprintf(text, length, "eTimerEvent"); snprintf(text, length, "eTimerEvent");
return text; return text;
case eMessageEvent: case eMessageEvent:
snprintf(text, length, "eMessageEvent:%s:%s", snprintf(text, length, "eMessageEvent:");
event->event.msg.cmd->out_buf, str_n_cat(text, length, event->event.msg.cmd->out_buf);
event->event.msg.cmd->inp_buf); str_n_cat(text, length, "|");
str_n_cat(text, length, event->event.msg.cmd->inp_buf);
return text; return text;
case eCommandEvent: case eCommandEvent:
switch (event->event.cmd.cmd_type) { switch (event->event.cmd.cmd_type) {
@@ -1285,19 +1214,12 @@ static void change_state(pDMC2280Driv self, StateFunc func) {
self->subState = 0; self->subState = 0;
} }
static int state_msg_callback(pCommand pCmd) static int state_msg_callback(pAsyncTxn pCmd)
{ {
pDMC2280Driv self = (pDMC2280Driv) pCmd->cntx; pDMC2280Driv self = (pDMC2280Driv) pCmd->cntx;
EvtEvent event; EvtEvent event;
if (pCmd->inp_idx > 0) {
if (self->debug) { if (pCmd->txn_status == ATX_TIMEOUT) {
SICSLogWrite(pCmd->out_buf, eStatus);
SICSLogWrite(pCmd->inp_buf, eStatus);
}
event.event_type = eMessageEvent;
event.event.msg.cmd = pCmd;
}
else {
if (self->debug) { if (self->debug) {
SICSLogWrite(pCmd->out_buf, eStatus); SICSLogWrite(pCmd->out_buf, eStatus);
SICSLogWrite("<TIMEOUT>", eStatus); SICSLogWrite("<TIMEOUT>", eStatus);
@@ -1305,6 +1227,14 @@ static int state_msg_callback(pCommand pCmd)
event.event_type = eTimeoutEvent; event.event_type = eTimeoutEvent;
event.event.msg.cmd = pCmd; event.event.msg.cmd = pCmd;
} }
else {
if (self->debug) {
SICSLogWrite(pCmd->out_buf, eStatus);
SICSLogWrite(pCmd->inp_buf, eStatus);
}
event.event_type = eMessageEvent;
event.event.msg.cmd = pCmd;
}
if (self->debug || self->trace) if (self->debug || self->trace)
report_event(self, &event); report_event(self, &event);
self->myState(self, &event); self->myState(self, &event);
@@ -1336,7 +1266,6 @@ static int state_cmd_execute(pDMC2280Driv self, enum commandtype cmd) {
static void DMCState_Unknown(pDMC2280Driv self, pEvtEvent event) { static void DMCState_Unknown(pDMC2280Driv self, pEvtEvent event) {
char cmd[CMDLEN]; char cmd[CMDLEN];
int value; int value;
float steps, counts;
switch (event->event_type) { switch (event->event_type) {
case eTimerEvent: case eTimerEvent:
@@ -1363,7 +1292,7 @@ static void DMCState_Unknown(pDMC2280Driv self, pEvtEvent event) {
return; return;
case eMessageEvent: case eMessageEvent:
do { do {
pCommand pCmd = event->event.msg.cmd; pAsyncTxn pCmd = event->event.msg.cmd;
if (pCmd->out_buf[0] == 'M') { /* MG */ if (pCmd->out_buf[0] == 'M') { /* MG */
int iRet; int iRet;
iRet = set_currMotion(self, pCmd->inp_buf); iRet = set_currMotion(self, pCmd->inp_buf);
@@ -1374,7 +1303,7 @@ static void DMCState_Unknown(pDMC2280Driv self, pEvtEvent event) {
change_state(self, DMCState_Idle); change_state(self, DMCState_Idle);
return; return;
} }
value = ((counts - self->absEncHome)/ self->cntsPerX) * self->stepsPerX; value = ((self->currCounts - self->absEncHome) / self->cntsPerX) * self->stepsPerX;
self->currSteps = value; self->currSteps = value;
snprintf(cmd, CMDLEN, "DP%c=%d", self->axisLabel, value); snprintf(cmd, CMDLEN, "DP%c=%d", self->axisLabel, value);
DMC_SendCmd(self, cmd, state_msg_callback); DMC_SendCmd(self, cmd, state_msg_callback);
@@ -1399,7 +1328,7 @@ static void DMCState_Idle(pDMC2280Driv self, pEvtEvent event) {
switch (event->event_type) { switch (event->event_type) {
case eMessageEvent: case eMessageEvent:
do { do {
pCommand pCmd = event->event.msg.cmd; pAsyncTxn pCmd = event->event.msg.cmd;
if (pCmd->out_buf[0] == 'M') { /* MG _XQ0,_TSx */ if (pCmd->out_buf[0] == 'M') { /* MG _XQ0,_TSx */
float fReply; float fReply;
int iRet, iFlags; int iRet, iFlags;
@@ -1479,7 +1408,7 @@ static void DMCState_AirOn(pDMC2280Driv self, pEvtEvent event) {
return; return;
case eMessageEvent: case eMessageEvent:
do { do {
pCommand pCmd = event->event.msg.cmd; pAsyncTxn pCmd = event->event.msg.cmd;
if (pCmd->out_buf[0] == 'F') { /* FTUBE */ if (pCmd->out_buf[0] == 'F') { /* FTUBE */
NetWatchRegisterTimer(&self->state_timer, NetWatchRegisterTimer(&self->state_timer,
AIR_POLL_TIMER, AIR_POLL_TIMER,
@@ -1523,7 +1452,7 @@ static void DMCState_MotorOn(pDMC2280Driv self, pEvtEvent event) {
return; return;
case eMessageEvent: case eMessageEvent:
do { do {
pCommand pCmd = event->event.msg.cmd; pAsyncTxn pCmd = event->event.msg.cmd;
if (pCmd->out_buf[0] == 'S') { /* SH */ if (pCmd->out_buf[0] == 'S') { /* SH */
NetWatchRegisterTimer(&self->state_timer, NetWatchRegisterTimer(&self->state_timer,
ON_SETTLE_TIMER, ON_SETTLE_TIMER,
@@ -1532,7 +1461,7 @@ static void DMCState_MotorOn(pDMC2280Driv self, pEvtEvent event) {
} }
else if (pCmd->out_buf[0] == 'M') { /* MG */ else if (pCmd->out_buf[0] == 'M') { /* MG */
int iRet, absolute; int iRet, absolute;
float steps, counts, target; float target;
iRet = set_currMotion(self, pCmd->inp_buf); iRet = set_currMotion(self, pCmd->inp_buf);
if (iRet == 0) if (iRet == 0)
break; break;
@@ -1602,7 +1531,7 @@ static void DMCState_Moving(pDMC2280Driv self, pEvtEvent event) {
return; return;
case eMessageEvent: case eMessageEvent:
do { do {
pCommand pCmd = event->event.msg.cmd; pAsyncTxn pCmd = event->event.msg.cmd;
if (pCmd->out_buf[0] == 'B') { /* BG */ if (pCmd->out_buf[0] == 'B') { /* BG */
NetWatchRegisterTimer(&self->state_timer, NetWatchRegisterTimer(&self->state_timer,
MOTOR_POLL_TIMER, MOTOR_POLL_TIMER,
@@ -1779,6 +1708,7 @@ static void DMCState_Moving(pDMC2280Driv self, pEvtEvent event) {
} }
break; break;
case eTimeoutEvent: case eTimeoutEvent:
strncpy(self->lastCmd, event->event.msg.cmd->out_buf, CMDLEN);
self->errorCode = MOTCMDTMO; self->errorCode = MOTCMDTMO;
self->driver_status = HWFault; self->driver_status = HWFault;
state_cmd_execute(self, CMD_HALT); state_cmd_execute(self, CMD_HALT);
@@ -1800,7 +1730,7 @@ static void DMCState_MotorHalt(pDMC2280Driv self, pEvtEvent event)
return; return;
case eMessageEvent: case eMessageEvent:
do { do {
pCommand pCmd = event->event.msg.cmd; pAsyncTxn pCmd = event->event.msg.cmd;
if (pCmd->out_buf[0] == 'S') { /* ST */ if (pCmd->out_buf[0] == 'S') { /* ST */
NetWatchRegisterTimer(&self->state_timer, NetWatchRegisterTimer(&self->state_timer,
MOTOR_POLL_TIMER, MOTOR_POLL_TIMER,
@@ -1881,14 +1811,13 @@ static void DMCState_OffTimer(pDMC2280Driv self, pEvtEvent event) {
} }
static void DMCState_AirOff(pDMC2280Driv self, pEvtEvent event) { static void DMCState_AirOff(pDMC2280Driv self, pEvtEvent event) {
char cmd[CMDLEN];
switch (event->event_type) { switch (event->event_type) {
case eTimerEvent: case eTimerEvent:
DMC_SendCmd(self, "MG APDONE", state_msg_callback); DMC_SendCmd(self, "MG APDONE", state_msg_callback);
return; return;
case eMessageEvent: case eMessageEvent:
do { do {
pCommand pCmd = event->event.msg.cmd; pAsyncTxn pCmd = event->event.msg.cmd;
if (pCmd->out_buf[0] == 'F') { /* FTUBE */ if (pCmd->out_buf[0] == 'F') { /* FTUBE */
} }
else if (pCmd->out_buf[0] == 'M') { /* MG APDONE */ else if (pCmd->out_buf[0] == 'M') { /* MG APDONE */
@@ -2534,6 +2463,7 @@ static int DMC2280GetPar(void *pData, char *name,
*fValue = self->absEncHome; *fValue = self->absEncHome;
return 1; return 1;
} }
if (self->has_fsm) {
if(strcasecmp(name,"creep_offset") == 0) { if(strcasecmp(name,"creep_offset") == 0) {
*fValue = self->creep_offset; *fValue = self->creep_offset;
return 1; return 1;
@@ -2543,6 +2473,7 @@ static int DMC2280GetPar(void *pData, char *name,
return 1; return 1;
} }
} }
}
else { else {
if (strcasecmp(name,"homerun") == 0) { if (strcasecmp(name,"homerun") == 0) {
if (readHomeRun(self, fValue) == SUCCESS) if (readHomeRun(self, fValue) == SUCCESS)
@@ -2717,10 +2648,11 @@ static int DMC2280SetPar(void *pData, SConnection *pCon,
} }
} }
/* Set creep offset, if (self->abs_encoder) { /* If we DO have an absolute encoder */
* managers only */ if (self->has_fsm) { /* If we DO have a finite state machine */
if(self->abs_encoder && strcasecmp(name,"creep_offset") == 0) { /* Set creep offset */
if(!SCMatchRights(pCon,usMugger)) if (strcasecmp(name,"creep_offset") == 0) {
if(!SCMatchRights(pCon,usMugger)) /* managers only */
return 1; return 1;
else { else {
self->creep_offset = fabs(newValue); self->creep_offset = fabs(newValue);
@@ -2728,21 +2660,22 @@ static int DMC2280SetPar(void *pData, SConnection *pCon,
} }
} }
/* Set creep_precision, /* Set creep_precision */
* managers only */ if (strcasecmp(name,"creep_precision") == 0) {
if(self->abs_encoder && strcasecmp(name,"creep_precision") == 0) { if(!SCMatchRights(pCon,usMugger)) /* managers only */
if(!SCMatchRights(pCon,usMugger))
return 1; return 1;
else { else {
self->creep_precision = fabs(newValue); self->creep_precision = fabs(newValue);
return 1; return 1;
} }
} }
}
}
else { /* If we do NOT have an absolute encoder */
/* Invoke Home Run routine in controller, /* Invoke Home Run routine in controller */
* managers only */ if(strcasecmp(name,"homerun") == 0) {
if(self->abs_encoder == 0 && strcasecmp(name,"homerun") == 0) { if(!SCMatchRights(pCon,usMugger)) /* managers only */
if(!SCMatchRights(pCon,usMugger))
return 1; return 1;
else { else {
if (DMC2280MotionControl != 1 && newValue > 0.5) { if (DMC2280MotionControl != 1 && newValue > 0.5) {
@@ -2753,6 +2686,7 @@ static int DMC2280SetPar(void *pData, SConnection *pCon,
return 1; return 1;
} }
} }
}
/* Set speed */ /* Set speed */
if(strcasecmp(name,SPEED) == 0) { if(strcasecmp(name,SPEED) == 0) {
@@ -2877,6 +2811,7 @@ static void DMC2280List(void *pData, char *name, SConnection *pCon){
if (self->abs_encoder) { if (self->abs_encoder) {
snprintf(buffer, BUFFLEN, "%s.absEncHome = %d\n", name, self->absEncHome); snprintf(buffer, BUFFLEN, "%s.absEncHome = %d\n", name, self->absEncHome);
SCWrite(pCon, buffer, eStatus); SCWrite(pCon, buffer, eStatus);
if (self->has_fsm) {
snprintf(buffer, BUFFLEN, "%s.cntsPerX = %f\n", name, self->cntsPerX); snprintf(buffer, BUFFLEN, "%s.cntsPerX = %f\n", name, self->cntsPerX);
SCWrite(pCon, buffer, eStatus); SCWrite(pCon, buffer, eStatus);
snprintf(buffer, BUFFLEN, "%s.Creep_Offset = %f\n", name, self->creep_offset); snprintf(buffer, BUFFLEN, "%s.Creep_Offset = %f\n", name, self->creep_offset);
@@ -2884,6 +2819,7 @@ static void DMC2280List(void *pData, char *name, SConnection *pCon){
snprintf(buffer, BUFFLEN, "%s.Creep_Precision = %f\n", name, self->creep_precision); snprintf(buffer, BUFFLEN, "%s.Creep_Precision = %f\n", name, self->creep_precision);
SCWrite(pCon, buffer, eStatus); SCWrite(pCon, buffer, eStatus);
} }
}
snprintf(buffer, BUFFLEN, "%s.stepsPerX = %f\n", name, self->stepsPerX); snprintf(buffer, BUFFLEN, "%s.stepsPerX = %f\n", name, self->stepsPerX);
SCWrite(pCon, buffer, eStatus); SCWrite(pCon, buffer, eStatus);
return; return;
@@ -2903,9 +2839,9 @@ static void KillDMC2280(/*@only@*/void *pData){
free(self->errorMsg); free(self->errorMsg);
self->errorMsg = NULL; self->errorMsg = NULL;
} }
if (self->mcc) { if (self->asyncUnit) {
MultiChanDestroy(self->mcc); AsyncUnitDestroy(self->asyncUnit);
self->mcc = NULL; self->asyncUnit = NULL;
} }
/* Not required as performed in caller /* Not required as performed in caller
* free(self); * free(self);
@@ -2957,17 +2893,18 @@ MotorDriver *CreateDMC2280(SConnection *pCon, char *motor, char *params) {
} }
memset(pNew, 0, sizeof(DMC2280Driv)); memset(pNew, 0, sizeof(DMC2280Driv));
/* Get multichan from the list of named parameters */ /* Get AsyncQueue from the list of named parameters */
if ((pPtr=getParam(pCon, interp, params, "multichan", _OPTIONAL)) != NULL) { if ((pPtr=getParam(pCon, interp, params, "multichan", _OPTIONAL)) != NULL ||
/* MultiChan */ (pPtr=getParam(pCon, interp, params, "asyncqueue", _OPTIONAL)) != NULL ||
if (!MultiChanCreate(pPtr, &pNew->mcc)) { (pPtr=getParam(pCon, interp, params, "asyncunit", _OPTIONAL)) != NULL) {
snprintf(pError, ERRLEN, "Cannot find MultiChan '%s' when creating DMC2280 motor '%s'", if (!AsyncUnitCreate(pPtr, &pNew->asyncUnit)) {
snprintf(pError, ERRLEN, "Cannot find AsyncQueue '%s' when creating DMC2280 motor '%s'",
pPtr, motor); pPtr, motor);
SCWrite(pCon,pError,eError); SCWrite(pCon,pError,eError);
KillDMC2280(pNew); KillDMC2280(pNew);
return NULL; return NULL;
} }
MultiChanSetNotify(pNew->mcc, pNew, DMC_Notify); AsyncUnitSetNotify(pNew->asyncUnit, pNew, DMC_Notify);
} }
else if ((pPtr=getParam(pCon, interp, params, "host", _OPTIONAL)) != NULL) { else if ((pPtr=getParam(pCon, interp, params, "host", _OPTIONAL)) != NULL) {
char* host = pPtr; char* host = pPtr;
@@ -2977,16 +2914,16 @@ MotorDriver *CreateDMC2280(SConnection *pCon, char *motor, char *params) {
KillDMC2280(pNew); KillDMC2280(pNew);
return NULL; return NULL;
} }
/* MultiChan */ /* AsyncUnit */
if (!MultiChanCreateHost(host, pPtr, &pNew->mcc)) { if (!AsyncUnitCreateHost(host, pPtr, &pNew->asyncUnit)) {
snprintf(pError, ERRLEN, snprintf(pError, ERRLEN,
"Cannot create MultiChan '%s:%s' for DMC2280 motor '%s'", "Cannot create AsyncUnit '%s:%s' for DMC2280 motor '%s'",
host, pPtr, motor); host, pPtr, motor);
SCWrite(pCon,pError,eError); SCWrite(pCon,pError,eError);
KillDMC2280(pNew); KillDMC2280(pNew);
return NULL; return NULL;
} }
MultiChanSetNotify(pNew->mcc, pNew, DMC_Notify); AsyncUnitSetNotify(pNew->asyncUnit, pNew, DMC_Notify);
} }
else { else {
snprintf(pError, ERRLEN, "\tError occurred when creating DMC2280 motor '%s'", motor); snprintf(pError, ERRLEN, "\tError occurred when creating DMC2280 motor '%s'", motor);
@@ -3002,6 +2939,7 @@ MotorDriver *CreateDMC2280(SConnection *pCon, char *motor, char *params) {
KillDMC2280(pNew); KillDMC2280(pNew);
return NULL; return NULL;
} }
pNew->pMot = NULL; pNew->pMot = NULL;
strcpy(pNew->name, motor); strcpy(pNew->name, motor);
pNew->home = 0.0; pNew->home = 0.0;
@@ -3135,6 +3073,7 @@ MotorDriver *CreateDMC2280(SConnection *pCon, char *motor, char *params) {
pNew->cntsPerX = 1.0; pNew->cntsPerX = 1.0;
else else
sscanf(pPtr,"%f",&(pNew->cntsPerX)); sscanf(pPtr,"%f",&(pNew->cntsPerX));
if (pNew->has_fsm) {
/* CREEP_OFFSET: this controls unidirectional driving */ /* CREEP_OFFSET: this controls unidirectional driving */
if ((pPtr=getParam(pCon, interp, params,"creep_offset",_OPTIONAL)) == NULL) if ((pPtr=getParam(pCon, interp, params,"creep_offset",_OPTIONAL)) == NULL)
pNew->creep_offset = 0.0; pNew->creep_offset = 0.0;
@@ -3152,7 +3091,7 @@ MotorDriver *CreateDMC2280(SConnection *pCon, char *motor, char *params) {
if (pNew->creep_precision < 0) if (pNew->creep_precision < 0)
pNew->creep_precision = -pNew->creep_precision; pNew->creep_precision = -pNew->creep_precision;
} }
}
} }
if (pNew->has_fsm) { if (pNew->has_fsm) {
@@ -3297,7 +3236,7 @@ int DMC2280Action(SConnection *pCon, SicsInterp *pSics, void *pData,
return 1; return 1;
} }
else if(strcasecmp("trace", argv[1]) == 0) { else if(strcasecmp("trace", argv[1]) == 0) {
if (strcasecmp("on", argv[2]) == 0) { if (argc > 2 && strcasecmp("on", argv[2]) == 0) {
self->trace = pCon; self->trace = pCon;
SCWrite(pCon, "TRACE ON", eValue); SCWrite(pCon, "TRACE ON", eValue);
} }
@@ -3310,3 +3249,14 @@ int DMC2280Action(SConnection *pCon, SicsInterp *pSics, void *pData,
} }
return MotorAction(pCon, pSics, pData, argc, argv); return MotorAction(pCon, pSics, pData, argc, argv);
} }
void DMC2280InitProtocol(SicsInterp *pSics) {
if (DMC2280_Protocol == NULL) {
DMC2280_Protocol = AsyncProtocolCreate(pSics, "DMC2280", NULL, NULL);
DMC2280_Protocol->sendCommand = DMC_Tx;
DMC2280_Protocol->handleInput = DMC_Rx;
DMC2280_Protocol->handleEvent = DMC_Ev;
DMC2280_Protocol->prepareTxn = NULL;
DMC2280_Protocol->killPrivate = NULL;
}
}

View File

@@ -11,10 +11,14 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
void DMC2280InitProtocol(SicsInterp *pSics);
MotorDriver *CreateDMC2280(SConnection *pCon, char* motor, char *params); MotorDriver *CreateDMC2280(SConnection *pCon, char* motor, char *params);
int DMC2280Action(SConnection *pCon, SicsInterp *pSics, void *pData, int DMC2280Action(SConnection *pCon, SicsInterp *pSics, void *pData,
int argc, char *argv[]); int argc, char *argv[]);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@@ -44,6 +44,9 @@ int NHQ200SetPar(pEVControl self, char *name, float fNew,
int NHQ200GetPar(pEVControl self, char *name, float *fVal); int NHQ200GetPar(pEVControl self, char *name, float *fVal);
int NHQ200List(pEVControl self, SConnection *pCon); int NHQ200List(pEVControl self, SConnection *pCon);
/*------------------------- The NHQ200 protocol ------------------------------*/
void NHQ200InitProtocol(SicsInterp *pSics);
#endif #endif

View File

@@ -9,7 +9,7 @@
#include <sys/time.h> #include <sys/time.h>
#include <sics.h> #include <sics.h>
#include "network.h" #include "network.h"
#include "multichan.h" #include "asyncqueue.h"
#include "nwatch.h" #include "nwatch.h"
#include "safetyplc.h" #include "safetyplc.h"
#include "sicsvar.h" #include "sicsvar.h"
@@ -41,6 +41,8 @@ extern int DMC2280MotionControl;
#define MOTOR_BOTH_BITS (MOTOR_ENABLED_BIT | MOTOR_DISABLED_BIT) #define MOTOR_BOTH_BITS (MOTOR_ENABLED_BIT | MOTOR_DISABLED_BIT)
#define ACCESS_BOTH_BITS (ACCESS_LOCKED_BIT | ACCESS_UNLOCKED_BIT) #define ACCESS_BOTH_BITS (ACCESS_LOCKED_BIT | ACCESS_UNLOCKED_BIT)
static pAsyncProtocol PLC_Protocol = NULL;
int PLC_UserPriv = 0; /* Internal */ int PLC_UserPriv = 0; /* Internal */
typedef enum { typedef enum {
Unknown_low, Invalid_high, Enabled, Disabled, Unknown_low, Invalid_high, Enabled, Disabled,
@@ -64,45 +66,27 @@ typedef struct __SafetyPLCController SafetyPLCController, *pSafetyPLCController;
struct __SafetyPLCController { struct __SafetyPLCController {
pObjectDescriptor pDes; pObjectDescriptor pDes;
pMultiChan mcc; /* associated MultiChan object */ pAsyncUnit unit; /* associated AsyncUnit object */
int iGetOut; int iGetOut;
int iValue; int iValue;
int oldValue; int oldValue;
pNWTimer nw_tmr; /* NetWait timer handle */ pNWTimer nw_tmr; /* periodic timer handle */
pNWTimer oneshot; /* oneshot timer handle */
int timeout; int timeout;
struct timeval tvSend;
};
typedef struct __command Command, *pCommand;
typedef int (*CommandCallback)(void* ctx, const char* resp, int resp_len);
struct __command {
pSafetyPLCController plc;
int cstate;
int lstate;
char* out_buf;
int out_len;
int out_idx;
char* inp_buf;
int inp_len;
int inp_idx;
CommandCallback func;
void* cntx;
}; };
static int PLC_GetState(void *pData, char *param, PLC_STATUS *retState); static int PLC_GetState(void *pData, char *param, PLC_STATUS *retState);
static int PLC_Tx(void* ctx) static int PLC_Tx(pAsyncProtocol p, pAsyncTxn myCmd)
{ {
int iRet = 1; int iRet = 1;
pCommand myCmd = (pCommand) ctx;
if (myCmd) { if (myCmd) {
gettimeofday(&myCmd->plc->tvSend, NULL); /* refresh */ myCmd->txn_status = ATX_ACTIVE;
iRet = MultiChanWrite(myCmd->plc->mcc, myCmd->out_buf, myCmd->out_len); iRet = AsyncUnitWrite(myCmd->unit, myCmd->out_buf, myCmd->out_len);
/* TODO handle errors */ /* TODO handle errors */
if (iRet < 0) { /* TODO: EOF */ if (iRet < 0) { /* TODO: EOF */
iRet = MultiChanReconnect(myCmd->plc->mcc); iRet = AsyncUnitReconnect(myCmd->unit);
if (iRet == 0) if (iRet == 0)
return 0; return 0;
} }
@@ -110,94 +94,52 @@ static int PLC_Tx(void* ctx)
return 1; return 1;
} }
static int PLC_Rx(void* ctx, int rxchar) static int PLC_Rx(pAsyncProtocol p, pAsyncTxn myCmd, int rxchar)
{ {
int iRet = 1; int iRet = 1;
pCommand myCmd = (pCommand) ctx;
if (rxchar == MCC_TIMEOUT) { switch (myCmd->txn_state) {
/* TODO: handle command timeout */
if (myCmd->func)
iRet = myCmd->func(myCmd->cntx, NULL, MCC_TIMEOUT);
free(myCmd->out_buf);
free(myCmd->inp_buf);
free(myCmd);
return MCC_POP_CMD;
}
switch (myCmd->cstate) {
case 0: /* first character */ case 0: /* first character */
/* normal data */ /* normal data */
myCmd->cstate = 1; myCmd->txn_state = 1;
/* note fallthrough */ /* note fallthrough */
case 1: /* receiving reply */ case 1: /* receiving reply */
if (myCmd->inp_idx < myCmd->inp_len) if (myCmd->inp_idx < myCmd->inp_len)
myCmd->inp_buf[myCmd->inp_idx++] = rxchar; myCmd->inp_buf[myCmd->inp_idx++] = rxchar;
if (rxchar == 0x0D) if (rxchar == 0x0D)
myCmd->cstate = 2; myCmd->txn_state = 2;
break; break;
case 2: /* received CR and looking for LF */ case 2: /* received CR and looking for LF */
if (myCmd->inp_idx < myCmd->inp_len) if (myCmd->inp_idx < myCmd->inp_len)
myCmd->inp_buf[myCmd->inp_idx++] = rxchar; myCmd->inp_buf[myCmd->inp_idx++] = rxchar;
if (rxchar == 0x0A) { if (rxchar == 0x0A) {
myCmd->cstate = 99; myCmd->txn_state = 99;
/* end of line */ /* end of line */
} }
else else
myCmd->cstate = 1; myCmd->txn_state = 1;
break; break;
} }
if (myCmd->cstate == 99) { if (myCmd->txn_state == 99) {
myCmd->inp_buf[myCmd->inp_idx] = '\0'; myCmd->inp_buf[myCmd->inp_idx] = '\0';
if (myCmd->func)
iRet = myCmd->func(myCmd->cntx, myCmd->inp_buf, myCmd->inp_idx);
else
iRet = 0; iRet = 0;
myCmd->cstate = 0; myCmd->txn_state = 0;
myCmd->inp_idx = 0; myCmd->txn_status = ATX_COMPLETE;
} }
if (iRet == 0) { /* end of command */ if (iRet == 0) { /* end of command */
free(myCmd->out_buf); return AQU_POP_CMD;
free(myCmd->inp_buf);
free(myCmd);
return MCC_POP_CMD;
} }
return iRet; return iRet;
} }
static int PLC_SendCmd(pSafetyPLCController self, static int PLC_Ev(pAsyncProtocol p, pAsyncTxn myCmd, int event)
char* command, int cmd_len,
CommandCallback callback, void* context, int rsp_len)
{ {
pCommand myCmd = NULL; if (event == AQU_TIMEOUT) {
/* TODO: handle command timeout */
assert(self); myCmd->txn_status = ATX_TIMEOUT;
assert(self->mcc); return AQU_POP_CMD;
myCmd = (pCommand) malloc(sizeof(Command));
assert(myCmd);
memset(myCmd, 0, sizeof(Command));
myCmd->out_buf = (char*) malloc(cmd_len + 5);
memcpy(myCmd->out_buf, command, cmd_len);
myCmd->out_len = cmd_len;
if (myCmd->out_len < 2 ||
myCmd->out_buf[myCmd->out_len - 1] != 0x0A ||
myCmd->out_buf[myCmd->out_len - 2] != 0x0D) {
myCmd->out_buf[myCmd->out_len++] = 0x0D;
myCmd->out_buf[myCmd->out_len++] = 0x0A;
} }
myCmd->out_buf[myCmd->out_len] = '\0'; return AQU_POP_CMD;
myCmd->func = callback;
myCmd->cntx = context;
if (rsp_len == 0)
myCmd->inp_buf = NULL;
else {
myCmd->inp_buf = malloc(rsp_len + 1);
memset(myCmd->inp_buf, 0, rsp_len + 1);
}
myCmd->inp_len = rsp_len;
myCmd->plc = self;
gettimeofday(&self->tvSend, NULL); /* refresh */
return MultiChanEnque(self->mcc, myCmd, PLC_Tx, PLC_Rx);
} }
static void PLC_Notify(void* context, int event) static void PLC_Notify(void* context, int event)
@@ -205,9 +147,9 @@ static void PLC_Notify(void* context, int event)
pSafetyPLCController self = (pSafetyPLCController) context; pSafetyPLCController self = (pSafetyPLCController) context;
switch (event) { switch (event) {
case MCC_RECONNECT: case AQU_RECONNECT:
do { do {
mkChannel* sock = MultiChanGetSocket(self->mcc); mkChannel* sock = AsyncUnitGetSocket(self->unit);
int flag = 1; int flag = 1;
setsockopt(sock->sockid, /* socket affected */ setsockopt(sock->sockid, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */ IPPROTO_TCP, /* set option at TCP level */
@@ -223,14 +165,16 @@ static void PLC_Notify(void* context, int event)
/* /*
* \brief GetCallback is the callback for the read command. * \brief GetCallback is the callback for the read command.
*/ */
static int GetCallback(void* ctx, const char* resp, int resp_len) static int GetCallback(pAsyncTxn txn)
{ {
int iRet,i; int iRet,i;
unsigned int iRead; unsigned int iRead;
char* resp = txn->inp_buf;
int resp_len = txn->inp_idx;
PLC_STATUS plcState; PLC_STATUS plcState;
pSicsVariable plcVar=NULL; pSicsVariable plcVar=NULL;
pSafetyPLCController self = (pSafetyPLCController) ctx; pSafetyPLCController self = (pSafetyPLCController) txn->cntx;
if (resp_len < 0) { if (resp_len < 0) {
DMC2280MotionControl = -1; DMC2280MotionControl = -1;
} }
@@ -269,20 +213,33 @@ static int MyTimerCallback(void* context, int mode)
{ {
pSafetyPLCController self = (pSafetyPLCController) context; pSafetyPLCController self = (pSafetyPLCController) context;
if (self->iGetOut) { if (self->iGetOut) {
struct timeval now;
gettimeofday(&now, NULL);
/* TODO error handling */ /* TODO error handling */
if (((now.tv_sec - self->tvSend.tv_sec) * 1000
+ (now.tv_usec / 1000)
- (self->tvSend.tv_usec / 1000)) < self->timeout)
return 1;
DMC2280MotionControl = -1;
} }
self->iGetOut = 1; self->iGetOut = 1;
PLC_SendCmd(self, "READ", 4, GetCallback, self, 132); AsyncUnitSendTxn(self->unit, "READ", 4, GetCallback, self, 132);
return 1; return 1;
} }
static int MyOneShotCallback(void* context, int mode)
{
pSafetyPLCController self = (pSafetyPLCController) context;
self->oneshot = 0;
AsyncUnitSendTxn(self->unit, "WRITE 0", 7, NULL, NULL, 132);
return 0;
}
/*
* \brief PutCallback is the callback for the write command.
*/
static int PutCallback(pAsyncTxn txn)
{
pSafetyPLCController self = (pSafetyPLCController) txn->cntx;
if (self->oneshot)
NetWatchRemoveTimer(self->oneshot);
NetWatchRegisterTimer(&self->oneshot, 500, MyOneShotCallback, self);
return 0;
}
static int PLC_GetState(void *pData, char *param, PLC_STATUS *retState) static int PLC_GetState(void *pData, char *param, PLC_STATUS *retState)
{ {
pSafetyPLCController self = (pSafetyPLCController) pData; pSafetyPLCController self = (pSafetyPLCController) pData;
@@ -427,13 +384,32 @@ static int PLC_Action(SConnection *pCon, SicsInterp *pSics,
} else if (argc == 3) { } else if (argc == 3) {
if (strcasecmp(argv[1], "hattach") == 0) { if (strcasecmp(argv[1], "hattach") == 0) {
} }
else if (strcasecmp(argv[1], "shutter") == 0) {
if (strcasecmp(argv[2], "open") == 0) {
/* open shutter */
AsyncUnitSendTxn(self->unit, "WRITE 1", 4, PutCallback, self, 132);
return OKOK;
}
else if (strcasecmp(argv[2], "close") == 0 ||
strcasecmp(argv[2], "shut") == 0) {
/* close shutter */
AsyncUnitSendTxn(self->unit, "WRITE 2", 4, PutCallback, self, 132);
return OKOK;
}
else {
snprintf(line, 132, "%s %s does not understand %s",
argv[0], argv[1], argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
}
} }
snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]); snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]);
SCWrite(pCon, line, eError); SCWrite(pCon, line, eError);
return 0; return 0;
} }
static pSafetyPLCController PLC_Create(const char* pName, int port) static pSafetyPLCController PLC_Create(const char* pName)
{ {
pSafetyPLCController self = NULL; pSafetyPLCController self = NULL;
@@ -441,11 +417,11 @@ static pSafetyPLCController PLC_Create(const char* pName, int port)
if (self == NULL) if (self == NULL)
return NULL; return NULL;
memset(self, 0, sizeof(SafetyPLCController)); memset(self, 0, sizeof(SafetyPLCController));
if (MultiChanCreate(pName, &self->mcc) == 0) { if (AsyncUnitCreate(pName, &self->unit) == 0) {
free(self); free(self);
return NULL; return NULL;
} }
MultiChanSetNotify(self->mcc, self, PLC_Notify); AsyncUnitSetNotify(self->unit, self, PLC_Notify);
self->pDes = CreateDescriptor("SafetyPLC"); self->pDes = CreateDescriptor("SafetyPLC");
return self; return self;
@@ -457,7 +433,7 @@ static int PLC_Init(pSafetyPLCController self)
if (self->nw_tmr != NULL) if (self->nw_tmr != NULL)
NetWatchRemoveTimer(self->nw_tmr); NetWatchRemoveTimer(self->nw_tmr);
NetWatchRegisterTimerPeriodic(&self->nw_tmr, NetWatchRegisterTimerPeriodic(&self->nw_tmr,
1000, 100, 1000, 1000,
MyTimerCallback, MyTimerCallback,
self); self);
self->timeout=120000; /* huge */ self->timeout=120000; /* huge */
@@ -473,6 +449,17 @@ static void PLC_Kill(void* pData)
return; return;
} }
void SafetyPLCInitProtocol(SicsInterp *pSics) {
if (PLC_Protocol == NULL) {
PLC_Protocol = AsyncProtocolCreate(pSics, "SafetyPLC", NULL, NULL);
PLC_Protocol->sendCommand = PLC_Tx;
PLC_Protocol->handleInput = PLC_Rx;
PLC_Protocol->handleEvent = PLC_Ev;
PLC_Protocol->prepareTxn = NULL;
PLC_Protocol->killPrivate = NULL;
}
}
int SafetyPLCFactory(SConnection *pCon, SicsInterp *pSics, int SafetyPLCFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]) void *pData, int argc, char *argv[])
{ {
@@ -482,7 +469,7 @@ int SafetyPLCFactory(SConnection *pCon, SicsInterp *pSics,
pSicsVariable plcVar=NULL; pSicsVariable plcVar=NULL;
PLC_STATUS plcState; PLC_STATUS plcState;
if(argc < 4) if(argc < 3)
{ {
SCWrite(pCon,"ERROR: insufficient no of arguments to SafetyPLCFactory", SCWrite(pCon,"ERROR: insufficient no of arguments to SafetyPLCFactory",
eError); eError);
@@ -492,7 +479,7 @@ int SafetyPLCFactory(SConnection *pCon, SicsInterp *pSics,
/* /*
create data structure and open port create data structure and open port
*/ */
pNew = PLC_Create(argv[2], atoi(argv[3])); pNew = PLC_Create(argv[2]);
if(!pNew) if(!pNew)
{ {
@@ -524,5 +511,6 @@ int SafetyPLCFactory(SConnection *pCon, SicsInterp *pSics,
PLC_Kill(pNew); PLC_Kill(pNew);
return 0; return 0;
} }
SCSendOK(pCon);
return 1; return 1;
} }

View File

@@ -7,6 +7,8 @@
#ifndef SICSSAFETYPLC #ifndef SICSSAFETYPLC
#define SICSSAFETYPLC #define SICSSAFETYPLC
void SafetyPLCInitProtocol(SicsInterp *pSics);
int SafetyPLCFactory(SConnection *pCon, SicsInterp *pSics, int SafetyPLCFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]); void *pData, int argc, char *argv[]);