- Added asynchronous IO code from ANSTO

- Added new ACT protocol
- Extended sicshdbadapter to cover counters and status to put the status
 into Hipadaba
- Fixes to napi5.c
- Exe now supports hdbrun which allows to write output for a buffer into
 hdb node.
This commit is contained in:
koennecke
2007-06-22 11:44:46 +00:00
parent d5ff6410bc
commit 08c5e037a0
24 changed files with 13224 additions and 1291 deletions

338
asyncprotocol.c Normal file
View File

@ -0,0 +1,338 @@
#include <sics.h>
#include <asyncprotocol.h>
#include <asyncqueue.h>
int defaultSendCommand(pAsyncProtocol p, pAsyncTxn txn) {
int i, iRet;
int state;
const char *term = "\r\n";
if (p->sendTerminator)
term = p->sendTerminator;
state = 0;
for (i = 0; i < txn->out_len; ++i) {
if (txn->out_buf[i] == 0x00) { /* end of transmission */
break;
}
else if (txn->out_buf[i] == term[state]) {
++state;
continue;
}
state = 0;
}
txn->txn_state = 0;
iRet = AsyncUnitWrite(txn->unit, txn->out_buf, txn->out_len);
if (iRet <= 0)
return iRet;
if (term[state] != 0)
iRet = AsyncUnitWrite(txn->unit,(void *) term, strlen(term));
return iRet;
}
int defaultHandleInput(pAsyncProtocol p, pAsyncTxn txn, int ch) {
const char *term = "\r\n";
if (p->replyTerminator)
term = p->replyTerminator;
if (ch == term[txn->txn_state])
++txn->txn_state;
else
txn->txn_state = 0;
if (txn->inp_idx < txn->inp_len)
txn->inp_buf[txn->inp_idx++] = ch;
if (term[txn->txn_state] == 0) {
if (txn->inp_idx < txn->inp_len)
txn->inp_buf[txn->inp_idx] = '\0';
return AQU_POP_CMD;
}
return 1;
}
int defaultHandleEvent(pAsyncProtocol p, pAsyncTxn txn, int event) {
/* TODO: what could or should we do to handle the event */
return AQU_POP_CMD;
}
int defaultPrepareTxn(pAsyncProtocol p, pAsyncTxn txn, const char* cmd, int cmd_len, int rsp_len) {
int i;
int state;
const char *term = "\r\n";
if (p->sendTerminator)
term = p->sendTerminator;
state = 0;
for (i = 0; i < cmd_len; ++i) {
if (cmd[i] == 0x00) { /* end of transmission */
cmd_len = i;
break;
}
else if (cmd[i] == term[state]) {
++state;
continue;
}
state = 0;
}
if (term[state] == 0) {
/* outgoing command is correctly terminated */
txn->out_buf = malloc(cmd_len + 1);
if (txn->out_buf == NULL) {
SICSLogWrite("Out of memory in AsyncProtocol::defaultPrepareTxn", eError);
return 0;
}
memcpy(txn->out_buf, cmd, cmd_len + 1);
}
else {
/* outgoing command is NOT correctly terminated */
int tlen = strlen(term);
txn->out_buf = malloc(cmd_len + tlen + 1);
if (txn->out_buf == NULL) {
SICSLogWrite("Out of memory in AsyncProtocol::defaultPrepareTxn", eError);
return 0;
}
memcpy(txn->out_buf, cmd, cmd_len);
memcpy(txn->out_buf + cmd_len, term, tlen + 1);
cmd_len += tlen;
}
txn->out_len = cmd_len;
txn->out_idx = 0;
txn->inp_buf = malloc(rsp_len);
if (txn->inp_buf == NULL) {
SICSLogWrite("Out of memory in AsyncProtocol::defaultPrepareTxn", eError);
free(txn->out_buf);
txn->out_buf = NULL;
return 0;
}
txn->inp_len = rsp_len;
txn->inp_idx = 0;
txn->txn_state = 0;
txn->txn_status = 0;
return 1;
}
static const char* hex = "0123456789ABCDEF";
/*--------------------------------------------------------------------*/
static void encodeTerminator(char *result, char *terminator)
{
if (terminator)
while (*terminator) {
*result++ = '0';
*result++ = 'x';
*result++ = hex[(*terminator >> 4) &0xF];
*result++ = hex[(*terminator) &0xF];
++terminator;
}
*result = '\0';
return;
}
static int fromHex(const char* code) {
int icode = -1;
int result = -1;
if (code[0] == '0' && (code[1] == 'x' || code[1] == 'X')) {
if (code[2] >= '0' && code[2] <= '9')
icode = (code[2] - '0');
else if (code[2] >= 'a' && code[2] <= 'f')
icode = 10 + (code[2] - 'a');
else if (code[2] >= 'A' && code[2] <= 'F')
icode = 10 + (code[2] - 'A');
if (icode < 0)
return -1;
result = icode << 4;
icode = -1;
if (code[3] >= '0' && code[3] <= '9')
icode = (code[3] - '0');
else if (code[3] >= 'a' && code[3] <= 'f')
icode = 10 + (code[3] - 'a');
else if (code[3] >= 'A' && code[3] <= 'F')
icode = 10 + (code[3] - 'A');
if (icode < 0)
return -1;
result |= icode;
return result;
}
return -1;
}
/*--------------------------------------------------------------------*/
static char *decodeTerminator(char *code)
{
int count = 0, icode;
char *pResult;
char* pCh;
char* pQt = NULL; /* pointer to quote character if found */
if (code == NULL)
return NULL;
count = strlen(code);
pResult = (char *) malloc(count + 1);
if (!pResult) {
SICSLogWrite("Out of memory in AsyncProtocol::decodeTerminator", eError);
return NULL;
}
memset(pResult, 0, count + 1);
pCh = pResult;
if (*code == '\'' || *code == '"') /* check for leading quote */
pQt = code++;
while (*code) {
if (pQt && *code == *pQt) /* check for trailing quote */
break;
if (code[0] == '\\' && code[1] == 'r') { /* CR */
*pCh++ = '\r';
code += 2;
}
else if (code[0] == '\\' && code[1] == 'n') { /* LF */
*pCh++ = '\n';
code += 2;
}
else if ((icode = fromHex(code)) >= 0) { /* Hex: 0xFF */
*pCh++ = icode;
code += 4;
}
else /* literal */
*pCh++ = *code++;
}
*pCh = '\0';
return pResult;
}
int AsyncProtocolNoAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
char line[132];
pAsyncProtocol self = (pAsyncProtocol) pData;
snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
int AsyncProtocolAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
char line[132];
pAsyncProtocol self = (pAsyncProtocol) pData;
if (argc > 1) {
/* handle genecic parameters like terminators */
if (strcasecmp(argv[1], "sendterminator") == 0) {
if (argc > 2) {
char* pPtr = decodeTerminator(argv[2]);
if (pPtr) {
if (self->sendTerminator)
free(self->sendTerminator);
self->sendTerminator = pPtr;
}
SCSendOK(pCon);
}
else
{
char term[132];
char line[1024];
encodeTerminator(term, self->sendTerminator);
sprintf(line, "%s.sendTerminator = \"%s\"", argv[0], term);
SCWrite(pCon, line, eValue);
}
return 1;
}
else if (strcasecmp(argv[1], "replyterminator") == 0) {
if (argc > 2) {
char* pPtr = decodeTerminator(argv[2]);
if (pPtr) {
if (self->replyTerminator)
free(self->replyTerminator);
self->replyTerminator = pPtr;
}
SCSendOK(pCon);
}
else
{
char term[132];
char line[1024];
encodeTerminator(term, self->replyTerminator);
sprintf(line, "%s.replyTerminator = \"%s\"", argv[0], term);
SCWrite(pCon, line, eValue);
}
return 1;
}
}
else if (strcasecmp(argv[1], "list") == 0) {
int ac = 2;
char* av[3] = { argv[0], 0, 0 };
av[1] = "sendterminator";
AsyncProtocolAction(pCon, pSics, pData, ac, av);
av[1] = "replyterminator";
AsyncProtocolAction(pCon, pSics, pData, ac, av);
return 1;
}
/* handle any other actions here */
return AsyncProtocolNoAction(pCon, pSics, pData, argc,argv);
}
void defaultKillPrivate(pAsyncProtocol p) {
if (p->privateData) {
/* TODO: should we do anything? */
free(p->privateData);
}
}
void AsyncProtocolKill(void *pData) {
pAsyncProtocol self = (pAsyncProtocol) pData;
if(self->pDes)
DeleteDescriptor(self->pDes);
if(self->sendTerminator != NULL)
free(self->sendTerminator);
if(self->replyTerminator != NULL)
free(self->replyTerminator);
if (self->killPrivate)
self->killPrivate(self);
}
pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
ObjectFunc pFunc, KillFunc pKFunc) {
int iRet;
pAsyncProtocol self = NULL;
/* try to find an existing queue with this name */
self = (pAsyncProtocol) FindCommandData(pServ->pSics,(char *)protocolName, "AsyncProtocol");
if (self != NULL) {
return self;
}
self = (pAsyncProtocol) malloc(sizeof(AsyncProtocol));
if (self == NULL) {
SICSLogWrite("Out of memory in AsyncProtocolCreate", eError);
return NULL;
}
memset(self, 0, sizeof(AsyncProtocol));
self->pDes = CreateDescriptor("AsyncProtocol");
if (pFunc == NULL)
pFunc = AsyncProtocolNoAction;
if (pKFunc == NULL)
pKFunc = AsyncProtocolKill;
iRet = AddCommand(pSics, (char *)protocolName, pFunc, pKFunc, self);
if (!iRet ) {
SICSLogWrite("AddCommand failed in AsyncProtocolCreate", eError);
AsyncProtocolKill(self);
return NULL;
}
self->sendCommand = defaultSendCommand;
self->handleInput = defaultHandleInput;
self->handleEvent = defaultHandleEvent;
self->prepareTxn = defaultPrepareTxn;
self->killPrivate = defaultKillPrivate;
self->sendTerminator = strdup("\r\n");
self->replyTerminator = strdup("\r\n");
return self;
}
int AsyncProtocolFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]) {
if (argc < 2) {
SCWrite(pCon,"ERROR: insufficient arguments to AsyncProtocolFactory", eError);
return 0;
}
pAsyncProtocol pNew = AsyncProtocolCreate(pSics, argv[1],
AsyncProtocolAction, AsyncProtocolKill);
/* handle any extra arguments here */
pNew->privateData = NULL;
return 1;
}

61
asyncprotocol.h Normal file
View File

@ -0,0 +1,61 @@
#ifndef ASYNCPROTOCOL
#define ASYNCPROTOCOL
typedef struct __AsyncUnit AsyncUnit, *pAsyncUnit;
typedef struct __async_txn AsyncTxn, *pAsyncTxn;
typedef int (*AsyncTxnHandler)(pAsyncTxn pTxn);
typedef struct __async_protocol AsyncProtocol, *pAsyncProtocol;
pAsyncProtocol AsyncProtocolCreate(SicsInterp *pSics, const char* protocolName,
ObjectFunc pFunc, KillFunc pKFunc);
int AsyncProtocolAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
int AsyncProtocolFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
typedef enum {
ATX_NULL=0,
ATX_TIMEOUT=-1,
ATX_ACTIVE=1,
ATX_COMPLETE=2
} ATX_STATUS;
struct __async_txn {
pAsyncUnit unit; /**< unit that transaction is associated with */
int txn_state; /**< protocol handler transaction parse state */
ATX_STATUS txn_status; /**< status of the transaction OK, Error, ... */
int txn_timeout; /**< transaction timeout in milliseconds */
char* out_buf; /**< output buffer for sendCommand */
int out_len; /**< length of data to be sent */
int out_idx; /**< index of next character to transmit */
char* inp_buf; /**< input buffer for transaction response */
int inp_len; /**< length of input buffer */
int inp_idx; /**< index of next character (number already received) */
AsyncTxnHandler handleResponse; /**< Txn response handler of command sender */
void* cntx; /**< opaque context used by command sender */
/* The cntx field may be used by protocol handler from sendCommand
* as long as it is restored when response is complete
*/
};
/*
* The async protocol interface virtual function table
*/
struct __async_protocol {
pObjectDescriptor pDes;
char* protocolName;
char *sendTerminator;
char *replyTerminator;
void* privateData;
int (* sendCommand)(pAsyncProtocol p, pAsyncTxn txn);
int (* handleInput)(pAsyncProtocol p, pAsyncTxn txn, int ch);
int (* handleEvent)(pAsyncProtocol p, pAsyncTxn txn, int event);
int (* prepareTxn)(pAsyncProtocol p, pAsyncTxn txn, const char* cmd, int cmd_len, int rsp_len);
void (* killPrivate)(pAsyncProtocol p);
};
#endif /* ASYNCPROTOCOL */

956
asyncqueue.c Normal file
View File

@ -0,0 +1,956 @@
/*
* A S Y N C Q U E U E
*
* This module manages AsyncQueue communications.
*
* The AsyncQueue is an asynchronous queue between drivers and the device. It
* supports multiple logical units on a single device controller that share a
* single command channel.
*
* Douglas Clowes, February 2007
*
*/
#include <sys/time.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <sics.h>
#include <rs232controller.h>
#include "network.h"
#include "asyncqueue.h"
#include "nwatch.h"
typedef struct __AsyncQueue AsyncQueue, *pAsyncQueue;
typedef struct __async_command AQ_Cmd, *pAQ_Cmd;
struct __async_command {
pAQ_Cmd next;
pAsyncTxn tran;
pAsyncUnit unit;
int timeout;
int retries;
int active;
};
struct __AsyncUnit {
pAsyncUnit next;
pAsyncQueue queue;
AQU_Notify notify_func;
void* notify_cntx;
};
struct __AsyncQueue {
pObjectDescriptor pDes;
char* queue_name;
char* pHost;
int iPort;
int iDelay; /* intercommand delay in milliseconds */
int timeout;
int retries;
struct timeval tvLastCmd; /* time of completion of last command */
int unit_count; /* number of units connected */
pAsyncUnit units; /* head of unit chain */
pAQ_Cmd command_head; /* first/next command in queue */
pAQ_Cmd command_tail; /* last command in queue */
pNWContext nw_ctx; /* NetWait context handle */
pNWTimer nw_tmr; /* NetWait timer handle */
mkChannel* pSock; /* socket address */
pAsyncProtocol protocol;
};
static pAsyncQueue queue_array[FD_SETSIZE];
static int queue_index = 0;
/* ---------------------------- Local ------------------------------------
CreateSocketAdress stolen from Tcl. Thanks to John Ousterhout
*/
static int
CreateSocketAdress(
struct sockaddr_in *sockaddrPtr, /* Socket address */
char *host, /* Host. NULL implies INADDR_ANY */
int port) /* Port number */
{
struct hostent *hostent; /* Host database entry */
struct in_addr addr; /* For 64/32 bit madness */
(void) memset((char *) sockaddrPtr, '\0', sizeof(struct sockaddr_in));
sockaddrPtr->sin_family = AF_INET;
sockaddrPtr->sin_port = htons((unsigned short) (port & 0xFFFF));
if (host == NULL) {
addr.s_addr = INADDR_ANY;
} else {
hostent = gethostbyname(host);
if (hostent != NULL) {
memcpy((char *) &addr,
(char *) hostent->h_addr_list[0], (size_t) hostent->h_length);
} else {
addr.s_addr = inet_addr(host);
if (addr.s_addr == (unsigned long)-1) {
return 0; /* error */
}
}
}
/*
* There is a rumor that this assignment may require care on
* some 64 bit machines.
*/
sockaddrPtr->sin_addr.s_addr = addr.s_addr;
return 1;
}
static void AQ_Notify(pAsyncQueue self, int event)
{
pAsyncUnit unit;
for (unit = self->units; unit; unit = unit->next)
if (unit->notify_func != NULL)
unit->notify_func(unit->notify_cntx, event);
}
static int AQ_Reconnect(pAsyncQueue self)
{
int iRet;
int sock;
int flag = 1;
char line[132];
iRet = NETReconnect(self->pSock);
if (iRet <= 0) {
snprintf(line, 132, "Disconnect on AsyncQueue '%s'", self->queue_name);
SICSLogWrite(line, eStatus);
AQ_Notify(self, AQU_DISCONNECT);
return iRet;
}
snprintf(line, 132, "Reconnect on AsyncQueue '%s'", self->queue_name);
SICSLogWrite(line, eStatus);
AQ_Notify(self, AQU_RECONNECT);
return 1;
}
static int CommandTimeout(void* cntx, int mode);
static int DelayedStart(void* cntx, int mode);
static int StartCommand(pAsyncQueue self)
{
pAQ_Cmd myCmd = self->command_head;
mkChannel* sock = self->pSock;
if (myCmd == NULL)
return OKOK;
/*
* Remove any old command timeout timer
*/
if (self->nw_tmr)
NetWatchRemoveTimer(self->nw_tmr);
/*
* Implement the inter-command delay
*/
if (self->iDelay) {
struct timeval now, when;
gettimeofday(&now, NULL);
if (self->tvLastCmd.tv_sec == 0)
self->tvLastCmd = now;
when.tv_sec = self->tvLastCmd.tv_sec;
when.tv_usec = self->tvLastCmd.tv_usec + 1000 * self->iDelay;
if (when.tv_usec >= 1000000) {
when.tv_sec += when.tv_usec / 1000000;
when.tv_usec %= 1000000;
}
if (when.tv_sec > now.tv_sec ||
(when.tv_sec == now.tv_sec && when.tv_usec > now.tv_usec)) {
int delay = when.tv_sec - now.tv_sec;
delay *= 1000;
delay += (when.tv_usec - now.tv_usec + (1000 - 1)) / 1000;
NetWatchRegisterTimer(&self->nw_tmr, delay,
DelayedStart, self);
return OKOK;
}
}
/*
* Discard any input before sending command
*/
while (NETAvailable(sock, 0)) {
/* TODO: handle unsolicited input */
char reply[1];
int iRet;
iRet = NETRead(sock, reply, 1, 0);
if (iRet < 0) { /* EOF */
iRet = AQ_Reconnect(self);
if (iRet == 0)
return 0;
}
}
/*
* Add a new command timeout timer
*/
if (myCmd->timeout > 0)
NetWatchRegisterTimer(&self->nw_tmr, myCmd->timeout,
CommandTimeout, self);
else
NetWatchRegisterTimer(&self->nw_tmr, 30000,
CommandTimeout, self);
myCmd->active = 1;
return self->protocol->sendCommand(self->protocol, myCmd->tran);
}
static int QueCommandHead(pAsyncQueue self, pAQ_Cmd cmd)
{
cmd->next = NULL;
/*
* If the command queue is empty, start transmission
*/
if (self->command_head == NULL) {
self->command_head = self->command_tail = cmd;
StartCommand(self);
return 1;
}
if (self->command_head->active) {
cmd->next = self->command_head->next;
self->command_head->next = cmd;
}
else {
cmd->next = self->command_head;
self->command_head = cmd;
}
if (cmd->next == NULL)
self->command_tail = cmd;
return 1;
}
static int QueCommand(pAsyncQueue self, pAQ_Cmd cmd)
{
cmd->next = NULL;
/*
* If the command queue is empty, start transmission
*/
if (self->command_head == NULL) {
self->command_head = self->command_tail = cmd;
StartCommand(self);
return 1;
}
self->command_tail->next = cmd;
self->command_tail = cmd;
return 1;
}
static int PopCommand(pAsyncQueue self)
{
pAQ_Cmd myCmd = self->command_head;
if (self->nw_tmr)
NetWatchRemoveTimer(self->nw_tmr);
self->nw_tmr = 0;
gettimeofday(&self->tvLastCmd, NULL);
/*
* If this is not the last in queue, start transmission
*/
if (myCmd->next) {
pAQ_Cmd pNew = myCmd->next;
self->command_head = pNew;
StartCommand(self);
}
else
self->command_head = self->command_tail = NULL;
free(myCmd->tran->out_buf);
free(myCmd->tran->inp_buf);
free(myCmd->tran);
free(myCmd);
return 1;
}
static int CommandTimeout(void* cntx, int mode)
{
pAsyncQueue self = (pAsyncQueue) cntx;
pAQ_Cmd myCmd = self->command_head;
self->nw_tmr = 0;
if (myCmd->retries > 0) {
--myCmd->retries;
StartCommand(self);
}
else {
int iRet;
iRet = self->protocol->handleEvent(self->protocol, myCmd->tran, AQU_TIMEOUT);
if (iRet == AQU_POP_CMD) {
if (myCmd->tran->handleResponse)
myCmd->tran->handleResponse(myCmd->tran);
PopCommand(self); /* remove command */
}
else if (iRet == AQU_RETRY_CMD)
StartCommand(self); /* restart command */
else if (iRet == AQU_RECONNECT)
AQ_Reconnect(self);
}
return 1;
}
static int DelayedStart(void* cntx, int mode)
{
pAsyncQueue self = (pAsyncQueue) cntx;
self->nw_tmr = 0;
StartCommand(self);
return 1;
}
static int MyCallback(void* context, int mode)
{
pAsyncQueue self = (pAsyncQueue) context;
if (mode & nwatch_read) {
int iRet;
char reply[1];
iRet = NETRead(self->pSock, reply, 1, 0);
if (iRet < 0) { /* EOF */
iRet = AQ_Reconnect(self);
if (iRet <= 0)
return iRet;
/* restart the command */
StartCommand(self);
return 1;
}
if (iRet == 0) { /* TODO: timeout or error */
return 0;
} else {
pAQ_Cmd myCmd = self->command_head;
if (myCmd) {
iRet = self->protocol->handleInput(self->protocol, myCmd->tran, reply[0]);
if (iRet == 0 || iRet == AQU_POP_CMD) { /* end of command */
if (myCmd->tran->handleResponse)
myCmd->tran->handleResponse(myCmd->tran);
PopCommand(self);
}
else if (iRet < 0) /* TODO: error */
;
}
else {
/* TODO: handle unsolicited input */
}
}
}
return 1;
}
int AsyncUnitEnqueHead(pAsyncUnit unit, pAsyncTxn context)
{
pAQ_Cmd myCmd = NULL;
assert(unit && unit->queue && unit->queue->protocol);
myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd));
if (myCmd == NULL) {
SICSLogWrite("ERROR: Out of memory in AsyncUnitEnqueHead", eError);
return 0;
}
memset(myCmd, 0, sizeof(AQ_Cmd));
myCmd->tran = context;
myCmd->unit = unit;
myCmd->timeout = unit->queue->timeout;
myCmd->retries = unit->queue->retries;
myCmd->active = 0;
return QueCommandHead(unit->queue, myCmd);
}
int AsyncUnitEnqueueTxn(pAsyncUnit unit, pAsyncTxn pTxn)
{
pAQ_Cmd myCmd = NULL;
assert(unit && unit->queue && unit->queue->protocol);
myCmd = (pAQ_Cmd) malloc(sizeof(AQ_Cmd));
if (myCmd == NULL) {
SICSLogWrite("ERROR: Out of memory in AsyncUnitEnqueueTxn", eError);
return 0;
}
memset(myCmd, 0, sizeof(AQ_Cmd));
myCmd->tran = pTxn;
myCmd->unit = unit;
myCmd->timeout = unit->queue->timeout;
myCmd->retries = unit->queue->retries;
myCmd->active = 0;
return QueCommand(unit->queue, myCmd);
}
pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
const char* command, int cmd_len,
AsyncTxnHandler callback, void* context,
int rsp_len)
{
pAsyncTxn myTxn = NULL;
assert(unit);
myTxn = (pAsyncTxn) malloc(sizeof(AsyncTxn));
if (myTxn == NULL) {
SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError);
return 0;
}
memset(myTxn, 0, sizeof(AsyncTxn));
if (unit->queue->protocol->prepareTxn) {
int iRet;
iRet = unit->queue->protocol->prepareTxn(unit->queue->protocol, myTxn, command, cmd_len, rsp_len);
}
else {
myTxn->out_buf = (char*) malloc(cmd_len + 5);
if (myTxn->out_buf == NULL) {
SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError);
free(myTxn);
return 0;
}
memcpy(myTxn->out_buf, command, cmd_len);
myTxn->out_len = cmd_len;
if (myTxn->out_len < 2 ||
myTxn->out_buf[myTxn->out_len - 1] != 0x0A ||
myTxn->out_buf[myTxn->out_len - 2] != 0x0D) {
myTxn->out_buf[myTxn->out_len++] = 0x0D;
myTxn->out_buf[myTxn->out_len++] = 0x0A;
}
myTxn->out_buf[myTxn->out_len] = '\0';
}
if (rsp_len == 0)
myTxn->inp_buf = NULL;
else {
myTxn->inp_buf = malloc(rsp_len + 1);
if (myTxn->inp_buf == NULL) {
SICSLogWrite("ERROR: Out of memory in AsyncUnitPrepareTxn", eError);
free(myTxn->out_buf);
free(myTxn);
return 0;
}
memset(myTxn->inp_buf, 0, rsp_len + 1);
}
myTxn->inp_len = rsp_len;
myTxn->unit = unit;
myTxn->handleResponse = callback;
myTxn->cntx = context;
return myTxn;
}
int AsyncUnitSendTxn(pAsyncUnit unit,
const char* command, int cmd_len,
AsyncTxnHandler callback, void* context,
int rsp_len)
{
pAsyncTxn myTxn = NULL;
myTxn = AsyncUnitPrepareTxn(unit, command, cmd_len,
callback, context, rsp_len);
if (myTxn == NULL)
return -1;
return AsyncUnitEnqueueTxn(unit, myTxn);
}
typedef struct txn_s {
char* transReply;
int transWait;
} TXN, *pTXN;
/**
* \brief TransCallback is the callback for the general command transaction.
*/
static int TransCallback(pAsyncTxn pCmd) {
char* resp = pCmd->inp_buf;
int resp_len = pCmd->inp_idx;
pTXN self = (pTXN) pCmd->cntx;
if (pCmd->txn_status == ATX_TIMEOUT) {
memcpy(self->transReply, resp, resp_len);
self->transReply[resp_len] = '\0';
self->transReply[0] = '\0';
self->transWait = -1;
}
else {
memcpy(self->transReply, resp, resp_len);
self->transReply[resp_len] = '\0';
self->transWait = 0;
}
return 0;
}
int AsyncUnitTransact(pAsyncUnit unit,
const char* command, int cmd_len,
char* response, int rsp_len)
{
TXN txn;
assert(unit);
txn.transReply = response;
txn.transWait = 1;
AsyncUnitSendTxn(unit,
command, cmd_len,
TransCallback, &txn, rsp_len);
while (txn.transWait == 1)
TaskYield(pServ->pTasker);
if (txn.transWait < 0)
return txn.transWait;
return 1;
}
int AsyncUnitWrite(pAsyncUnit unit, void* buffer, int buflen)
{
int iRet;
mkChannel* sock;
assert(unit);
assert(unit->queue);
if (buflen > 0) {
sock = AsyncUnitGetSocket(unit);
iRet = NETWrite(sock, buffer, buflen);
/* TODO handle errors */
if (iRet < 0) { /* EOF */
iRet = AQ_Reconnect(unit->queue);
if (iRet == 0)
return 0;
}
}
return 1;
}
void AsyncUnitSetNotify(pAsyncUnit unit, void* context, AQU_Notify notify)
{
assert(unit);
unit->notify_func = notify;
unit->notify_cntx = context;
}
int AsyncUnitGetDelay(pAsyncUnit unit)
{
assert(unit);
return unit->queue->iDelay;
}
void AsyncUnitSetDelay(pAsyncUnit unit, int iDelay)
{
assert(unit);
unit->queue->iDelay = iDelay;
}
int AsyncUnitGetTimeout(pAsyncUnit unit)
{
assert(unit);
return unit->queue->timeout;
}
void AsyncUnitSetTimeout(pAsyncUnit unit, int timeout)
{
assert(unit);
unit->queue->timeout = timeout;
}
int AsyncUnitGetRetries(pAsyncUnit unit)
{
assert(unit);
return unit->queue->retries;
}
void AsyncUnitSetRetries(pAsyncUnit unit, int retries)
{
assert(unit);
unit->queue->retries = retries;
}
pAsyncProtocol AsyncUnitGetProtocol(pAsyncUnit unit)
{
return unit->queue->protocol;
}
void AsyncUnitSetProtocol(pAsyncUnit unit, pAsyncProtocol protocol)
{
unit->queue->protocol = protocol;
}
mkChannel* AsyncUnitGetSocket(pAsyncUnit unit)
{
assert(unit);
assert(unit->queue);
return unit->queue->pSock;
}
int AsyncUnitReconnect(pAsyncUnit unit)
{
int iRet;
assert(unit);
assert(unit->queue);
iRet = AQ_Reconnect(unit->queue);
/* TODO: handle in-progress */
return iRet;
}
int AsyncQueueAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
char line[132];
pAsyncQueue self = (pAsyncQueue) pData;
if (argc > 1) {
if (strcasecmp("send", argv[1]) == 0) {
AsyncUnit myUnit;
char cmd[10240];
char rsp[10240];
int idx = 0;
int i, j;
cmd[0] = '\0';
for (i = 2; i < argc; ++i) {
j = snprintf(&cmd[idx], 10240 - idx, "%s%s",
(i > 2) ? " " : "",
argv[i]);
if (j < 0)
break;
idx += j;
}
memset(&myUnit, 0, sizeof(AsyncUnit));
myUnit.queue = self;
AsyncUnitTransact(&myUnit, cmd, idx, rsp, 10240);
SCWrite(pCon, rsp, eValue);
return 1;
}
if (strcasecmp(argv[1], "reconnect") == 0) {
AQ_Reconnect(self);
return OKOK;
}
if (strcasecmp(argv[1], "delay") == 0) {
if (argc > 2) {
int delay;
int iRet;
iRet = sscanf(argv[2], "%d", &delay);
if (iRet != 1) {
snprintf(line, 132, "Invalid argument: %s", argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
else {
if (delay < 0 || delay > 30000) {
snprintf(line, 132, "Value out of range: %d", delay);
SCWrite(pCon, line, eError);
return 0;
}
self->iDelay = delay;
return OKOK;
}
}
else {
snprintf(line, 132, "%s.delay = %d", argv[0], self->iDelay);
SCWrite(pCon, line, eStatus);
return OKOK;
}
return OKOK;
}
if (strcasecmp(argv[1], "timeout") == 0) {
if (argc > 2) {
int timeout;
int iRet;
iRet = sscanf(argv[2], "%d", &timeout);
if (iRet != 1) {
snprintf(line, 132, "Invalid argument: %s", argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
else {
if (timeout < 0 || timeout > 30000) {
snprintf(line, 132, "Value out of range: %d", timeout);
SCWrite(pCon, line, eError);
return 0;
}
self->timeout = timeout;
return OKOK;
}
}
else {
snprintf(line, 132, "%s.timeout = %d", argv[0], self->timeout);
SCWrite(pCon, line, eStatus);
return OKOK;
}
return OKOK;
}
if (strcasecmp(argv[1], "retries") == 0) {
if (argc > 2) {
int retries;
int iRet;
iRet = sscanf(argv[2], "%d", &retries);
if (iRet != 1) {
snprintf(line, 132, "Invalid argument: %s", argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
else {
if (retries < 0 || retries > 30000) {
snprintf(line, 132, "Value out of range: %d", retries);
SCWrite(pCon, line, eError);
return 0;
}
self->retries = retries;
return OKOK;
}
}
else {
snprintf(line, 132, "%s.retries = %d", argv[0], self->retries);
SCWrite(pCon, line, eStatus);
return OKOK;
}
return OKOK;
}
}
snprintf(line, 132, "%s does not understand %s", argv[0], argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
static pAsyncQueue AQ_Create(const char* host, const char* port)
{
int i;
pAsyncQueue self = NULL;
mkChannel* channel = NULL;
if (host == NULL)
return NULL;
/* try the AsyncQueue with this name */
self = (pAsyncQueue) FindCommandData(pServ->pSics,(char *) host, "AsyncQueue");
/* try host and port */
if (self == NULL && port) {
int port_no = atoi(port);
if (port_no == 0) {
struct servent *sp=NULL;
sp = getservbyname(port, NULL);
if (sp)
port_no = ntohs(sp->s_port);
}
if (port_no > 0) {
struct sockaddr_in sa;
if (CreateSocketAdress(&sa,(char *) host, port_no)) {
/* look for queue with same address */
for (i = 0; i < queue_index; ++i)
if (queue_array[i]->pSock->adresse.sin_port == sa.sin_port
&& queue_array[i]->pSock->adresse.sin_addr.s_addr == sa.sin_addr.s_addr) {
self = queue_array[i];
break;
}
}
if (self == NULL) {
channel = NETConnectWithFlags((char *)host, port_no, 0);
/* TODO handle asynchronous connection */
}
}
}
if (self == NULL) {
if (channel == NULL)
return NULL;
self = (pAsyncQueue) malloc(sizeof(AsyncQueue));
if (self == NULL)
return NULL;
memset(self, 0, sizeof(AsyncQueue));
self->pSock = channel;
self->pDes = CreateDescriptor("AsyncQueue");
queue_array[queue_index++] = self;
}
for (i = 0; i < queue_index; ++i)
if (queue_array[i] == self) {
break;
}
if (i == queue_index)
queue_array[queue_index++] = self;
return self;
}
static int AQ_Init(pAsyncQueue self)
{
/* Init the controller */
if (self->nw_ctx == NULL)
NetWatchRegisterCallback(&self->nw_ctx,
self->pSock->sockid,
MyCallback,
self);
return 1;
}
static void AQ_Kill(void* pData)
{
int i;
pAsyncQueue self = (pAsyncQueue) pData;
for (i = 0; i < queue_index; ++i)
if (queue_array[i] == self) {
--queue_index;
if (queue_index > 0)
queue_array[i] = queue_array[queue_index];
if (self->nw_ctx)
NetWatchRemoveCallback(self->nw_ctx);
if (self->nw_tmr)
NetWatchRemoveTimer(self->nw_tmr);
if (self->queue_name)
free(self->queue_name);
NETClosePort(self->pSock);
free(self->pSock);
DeleteDescriptor(self->pDes);
free(self);
return;
}
}
/*
* \brief make a AsyncQueue from the command line
*
* MakeAsyncQueue queueName protocolName hostName portname
*/
int AsyncQueueFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[])
{
pAsyncQueue pNew = NULL;
mkChannel* channel = NULL;
pAsyncProtocol pPro = NULL;
int port_no;
int iRet = 0;
if (argc < 5) {
SCWrite(pCon,"ERROR: insufficient arguments to AsyncQueueFactory", eError);
return 0;
}
/* try to find an existing queue with this name */
pNew = (pAsyncQueue) FindCommandData(pServ->pSics, argv[1], "AsyncQueue");
if (pNew != NULL) {
char line[132];
snprintf(line, 132, "WARNING: AsyncQueue '%s' already exists", argv[1]);
SCWrite(pCon, line, eError);
SCSendOK(pCon);
return 1;
}
/* try to find an existing protocol with this name */
pPro = (pAsyncProtocol) FindCommandData(pServ->pSics, argv[2], "AsyncProtocol");
if (pPro == NULL) {
char line[132];
snprintf(line, 132, "WARNING: AsyncQueue protocol '%s' not found", argv[2]);
SCWrite(pCon, line, eError);
return 0;
}
port_no = atoi(argv[4]);
if (port_no == 0) {
struct servent *sp=NULL;
sp = getservbyname(argv[4], NULL);
if (sp)
port_no = ntohs(sp->s_port);
}
if (port_no > 0) {
struct sockaddr_in sa;
if (CreateSocketAdress(&sa, argv[3], port_no)) {
int i;
/* look for queue with same address */
for (i = 0; i < queue_index; ++i)
if (queue_array[i]->pSock->adresse.sin_port == sa.sin_port
&& queue_array[i]->pSock->adresse.sin_addr.s_addr == sa.sin_addr.s_addr) {
char line[132];
snprintf(line, 132, "WARNING: AsyncQueue '%s' has same address as %s",
argv[1],
queue_array[i]->queue_name);
SCWrite(pCon, line, eError);
}
}
/* TODO: implement asynchronous connection */
channel = NETConnectWithFlags(argv[3], port_no, 0);
}
if (channel == NULL) {
char line[132];
snprintf(line, 132, "ERROR: AsyncQueue '%s' cannot connect", argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
pNew = (pAsyncQueue) malloc(sizeof(AsyncQueue));
if (pNew == NULL) {
char line[132];
snprintf(line, 132, "ERROR: AsyncQueue '%s' memory failure", argv[1]);
SCWrite(pCon, line, eError);
return 0;
}
memset(pNew, 0, sizeof(AsyncQueue));
pNew->pDes = CreateDescriptor("AsyncQueue");
pNew->queue_name = strdup(argv[1]);
pNew->protocol = pPro;
pNew->pSock = channel;
queue_array[queue_index++] = pNew;
AQ_Init(pNew);
/*
create the command
*/
iRet = AddCommand(pSics, argv[1], AsyncQueueAction, AQ_Kill, pNew);
if(!iRet)
{
char line[132];
snprintf(line, 123, "ERROR: add command %s failed", argv[1]);
SCWrite(pCon, line, eError);
AQ_Kill(pNew);
return 0;
}
SCSendOK(pCon);
return 1;
}
/*
* \brief make a AsyncQueue from a named rs232 controller
*
* \param name the name of the SICS "RS232 Controller" object
* \param handle the handle to the AsyncQueue object
* \return 0 for FAILURE, 1 for SUCCESS
*/
int AsyncUnitCreateHost(const char* host, const char* port, pAsyncUnit* handle)
{
int status;
pAsyncQueue self = NULL;
pAsyncUnit unit = NULL;
*handle = NULL;
self = AQ_Create(host, port);
if (self == NULL)
return 0;
status = AQ_Init(self);
unit = (pAsyncUnit) malloc(sizeof(AsyncUnit));
if (unit == NULL) {
SICSLogWrite("ERROR: Out of memory in AsyncUnitCreateHost", eError);
*handle = NULL;
return 0;
}
memset(unit, 0, sizeof(AsyncUnit));
++self->unit_count;
unit->queue = self;
unit->next = self->units;
self->units = unit;
*handle = unit;
return 1;
}
int AsyncUnitCreate(const char* host, pAsyncUnit* handle) {
return AsyncUnitCreateHost(host, NULL, handle);
}
int AsyncUnitDestroy(pAsyncUnit unit)
{
assert(unit);
assert(unit->queue);
pAsyncQueue self = unit->queue;
pAsyncUnit* pNxt = &self->units;
while (*pNxt) {
if (*pNxt == unit) {
*pNxt = (*pNxt)->next;
break;
}
pNxt = &(*pNxt)->next;
}
--self->unit_count;
if (self->unit_count <= 0) {
AQ_Kill(self);
}
free(unit);
return 1;
}

178
asyncqueue.h Normal file
View File

@ -0,0 +1,178 @@
/*
* A S Y N C Q U E U E
*
* This module manages communications on an asynchronous connection.
*
* Douglas Clowes, May 2007
*
*/
#ifndef SICSASYNCQUEUE
#define SICSASYNCQUEUE
#include "asyncprotocol.h"
#define AQU_TIMEOUT -1
#define AQU_DISCONNECT -2
#define AQU_RECONNECT -3
#define AQU_RETRY_CMD -4
#define AQU_POP_CMD -5
/** \brief create an AsyncUnit attached to a named AsyncQueue.
*
* \param queueName the name of the AsyncQueue to be used
* \param unit pointer to the AsyncUnit created on positive return
* \return positive if successful
*/
int AsyncUnitCreate(const char* queueName, pAsyncUnit* unit);
/** \brief create an AsyncUnit attached to an anonymous AsyncQueue.
*
* \param host name or address of the target host
* \param port number or service name on the target host
* \param unit pointer to the AsyncUnit created on positive return
* \return positive if successful
*/
int AsyncUnitCreateHost(const char* host,
const char* port,
pAsyncUnit* unit);
/** \brief destroys an AsyncUnit
*
* \param unit pointer to the AsyncUnit to be destroyed
*/
int AsyncUnitDestroy(pAsyncUnit unit);
/** \brief Queue a transaction at the head of the associated AsyncQueue
*
* \param unit AsyncUnit
* \param pTxn pointer to transaction
*/
int AsyncUnitEnqueueHead(pAsyncUnit unit, pAsyncTxn pTxn);
/** \brief Queue a transaction at the tail of the associated AsyncQueue
*
* \param unit AsyncUnit
* \param pTxn pointer to transaction
*/
int AsyncUnitEnqueueTxn(pAsyncUnit unit, pAsyncTxn pTxn);
/** \brief prepare a transaction according to the protocol (default is CRLF)
*
* \param unit AsyncUnit
* \param command text string to be sent
* \param cmd_len length of data in command
* \param responseHandler function to handle the response
* \param context to be used by handler function
* \param resp_len maximum length to be allowed for response
*/
pAsyncTxn AsyncUnitPrepareTxn(pAsyncUnit unit,
const char* command, int cmd_len,
AsyncTxnHandler responseHandler, void* context,
int rsp_len);
/** \brief prepare and queue a transaction
*
* \param unit AsyncUnit
* \param command text string to be sent
* \param cmd_len length of data in command
* \param responseHandler function to handle the response
* \param context to be used by handler function
* \param resp_len maximum length to be allowed for response
*/
int AsyncUnitSendTxn(pAsyncUnit unit,
const char* command, int cmd_len,
AsyncTxnHandler responseHandler, void* context,
int rsp_len);
/** \brief send a transaction and wait for the response
*
* \param unit AsyncUnit
* \param command text string to be sent
* \param cmd_len length of data in command
* \param responseHandler function to handle the response
* \param context to be used by handler function
* \param resp_len maximum length to be allowed for response
*/
int AsyncUnitTransact(pAsyncUnit unit,
const char* command, int cmd_len,
char* response, int rsp_len);
/** \brief write to the AsyncQueue file descriptor
*
* The data is transmitted directly to the descriptor without being queued.
* This may be used by the protocol transmit function or for retrieving error
* text associated with the current transmission.
*
* \param unit AsyncUnit
* \param data to be transmitted
* \param buflen lenght of data
*/
int AsyncUnitWrite(pAsyncUnit unit, void* buffer, int buflen);
/** \brief registers a notification callback
*
* The notification callback may notify unsolicited or unusual events
*
* \param unit AsyncUnit
* \param context passed in callback
* \param notify function to be called
*/
typedef void (*AQU_Notify)(void* context, int event);
void AsyncUnitSetNotify(pAsyncUnit unit, void* context, AQU_Notify notify);
/** \brief get the intertransaction delay in milliseconds
*/
int AsyncUnitGetDelay(pAsyncUnit unit);
/** \brief set the intertransaction delay in milliseconds
*/
void AsyncUnitSetDelay(pAsyncUnit unit, int iDelay);
/** \brief get the default transaction timeout in milliseconds
*/
int AsyncUnitGetTimeout(pAsyncUnit unit);
/** \brief set the default transaction timeout in milliseconds
*/
void AsyncUnitSetTimeout(pAsyncUnit unit, int timeout);
/** \brief get the number of retries
*/
int AsyncUnitGetRetries(pAsyncUnit unit);
/** \brief set the number of retries
*/
void AsyncUnitSetRetries(pAsyncUnit unit, int retries);
/** \brief get the associated protocol handler
*/
pAsyncProtocol AsyncUnitGetProtocol(pAsyncUnit unit);
/** \brief set the associated protocol handler
*/
void AsyncUnitSetProtocol(pAsyncUnit unit, pAsyncProtocol protocol);
/** \brief retrieves the socket/channel associated with the AsyncQueue
*
* \param unit AsyncUnit
* \return channel or NULL
*/
mkChannel* AsyncUnitGetSocket(pAsyncUnit unit);
/** \brief attempt to reconnect the socket of the associated AsyncQueue
*
* \param unit pointer to AsyncUnit
*/
int AsyncUnitReconnect(pAsyncUnit handle);
/** \brief create an AsyncQueue from the SICS command MakeAsyncQueue
*/
int AsyncQueueFactory(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
/** \brief SICS command handler for the AsyncQueue object
*/
int AsyncQueueAction(SConnection *pCon, SicsInterp *pSics,
void *pData, int argc, char *argv[]);
#endif /* SICSASYNCQUEUE */

119
conman.c
View File

@ -745,6 +745,94 @@ static void writeToLogFiles(SConnection *self, char *buffer)
} }
return 1; return 1;
} }
/*--------------------------------------------------------------------------*/
int SCACTWrite(SConnection *self, char *buffer, int iOut)
{
int i, iPtr, iRet;
char pBueffel[1024];
char *pPtr = pBueffel;
commandContext cx;
if(!VerifyConnection(self))
{
return 0;
}
if (buffer[0] == '\0' && iOut >= eStart && iOut <= eEvent) {
return 1; /* do not write empty line */
}
/* log it for any case */
if(self->pSock)
{
iRet = self->pSock->sockid;
}
else
{
iRet = 0;
}
sprintf(pBueffel,"Next line intended for socket: %d",iRet);
SICSLogWrite(pBueffel,eInternal);
SICSLogWrite(buffer,iOut);
/* write to commandlog if user or manager privilege */
if(SCGetRights(self) <= usUser)
{
if(self->iMacro != 1)
{
sprintf(pBueffel,"To sock %d :",iRet);
sendingConnection = self;
WriteToCommandLog(pBueffel,buffer);
sendingConnection = NULL;
}
else
{
if(iOut == eError || iOut == eWarning)
{
sprintf(pBueffel,"To sock %d :",iRet);
sendingConnection = self;
WriteToCommandLog(pBueffel,buffer);
sendingConnection = NULL;
}
}
}
/*
* copy in ACT
*/
if(strlen(buffer) + 30 > 1024){
pPtr = (char *)malloc((strlen(buffer)+30)*sizeof(char));
memset(pPtr,0,strlen(buffer)+20);
}
cx = SCGetContext(self);
sprintf(pPtr,"%d::>%s<::", cx.transID, buffer);
/* put it into the interpreter if present */
if(SCinMacro(self))
{
InterpWrite(pServ->pSics,buffer);
/* print it to client if error message */
if((iOut== eError) || (iOut == eWarning) )
{
iRet = doSockWrite(self,pPtr);
}
}
else /* not in interpreter, normal logic */
{
/* is this really to be printed ? */
if(iOut < self->iOutput)
return 0;
/* first the socket */
iRet = doSockWrite(self,pPtr);
writeToLogFiles(self,buffer);
}
if(pPtr != pBueffel){
free(pPtr);
}
return 1;
}
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
int SCWriteWithOutcode(SConnection *self, char *buffer, int iOut) int SCWriteWithOutcode(SConnection *self, char *buffer, int iOut)
{ {
@ -964,14 +1052,35 @@ pDynString SCEndBuffering(SConnection *pCon)
/* put into Serverlog */ /* put into Serverlog */
sprintf(pBueffel,"Next line intended for socket: %d",-10); sprintf(pBueffel,"Next line intended for socket: %d",-10);
SICSLogWrite(pBueffel,eInternal); SICSLogWrite(pBueffel,eInternal);
SICSLogWrite(buffer,iOut); SICSLogWrite(buffer,iOut);
/* log it for any case */
if(self->pSock)
{
iRet = self->pSock->sockid;
}
else
{
iRet = -10;
}
/* write to commandlog if user or manager privilege */ /* write to commandlog if user or manager privilege */
if(SCGetRights(self) <= usUser && self->iMacro != 1) if(SCGetRights(self) <= usUser)
{ {
sprintf(pBueffel,"To sock %d :",-10); if(self->iMacro != 1)
{
sprintf(pBueffel,"To sock %d :",iRet);
WriteToCommandLog(pBueffel,buffer); WriteToCommandLog(pBueffel,buffer);
} }
else
{
if(iOut == eError || iOut == eWarning)
{
sprintf(pBueffel,"To sock %d :",iRet);
WriteToCommandLog(pBueffel,buffer);
}
}
}
/* put it into the interpreter if present */ /* put it into the interpreter if present */
if(SCinMacro(self)) if(SCinMacro(self))
@ -1494,7 +1603,7 @@ pDynString SCEndBuffering(SConnection *pCon)
config OutCode val sets an new output code config OutCode val sets an new output code
config Rights User Password sets and verifies new user rights config Rights User Password sets and verifies new user rights
config File Filename Logs to another file config File Filename Logs to another file
config output normal | withcode Sets output mode config output normal | withcode | ACT Sets output mode
config listen 0 | 1 enables commandlog listen mode config listen 0 | 1 enables commandlog listen mode
---------------------------------------------------------------------------*/ ---------------------------------------------------------------------------*/
@ -1633,6 +1742,10 @@ pDynString SCEndBuffering(SConnection *pCon)
{ {
SCSetWriteFunc(pCon,SCWriteWithOutcode); SCSetWriteFunc(pCon,SCWriteWithOutcode);
} }
else if(strcmp(argv[2],"act") == 0)
{
SCSetWriteFunc(pCon,SCACTWrite);
}
else else
{ {
SCWrite(pCon,"ERROT: output mode not recognised",eError); SCWrite(pCon,"ERROT: output mode not recognised",eError);

View File

@ -123,6 +123,7 @@ typedef int (*writeFunc)(struct __SConnection *pCon,
int SCNotWrite(SConnection *self, char *buffer, int iOut); int SCNotWrite(SConnection *self, char *buffer, int iOut);
int SCNormalWrite(SConnection *self, char *buffer, int iOut); int SCNormalWrite(SConnection *self, char *buffer, int iOut);
int SCWriteWithOutcode(SConnection *self, char *buffer, int iOut); int SCWriteWithOutcode(SConnection *self, char *buffer, int iOut);
int SCACTWrite(SConnection *self, char *buffer, int iOut);
/*********************** I/O Buffering ***********************************/ /*********************** I/O Buffering ***********************************/
int SCStartBuffering(SConnection *pCon); int SCStartBuffering(SConnection *pCon);
pDynString SCEndBuffering(SConnection *pCon); pDynString SCEndBuffering(SConnection *pCon);

142
exeman.c
View File

@ -22,7 +22,9 @@
#include "exebuf.h" #include "exebuf.h"
#include "exeman.i" #include "exeman.i"
#include "exeman.h" #include "exeman.h"
#include "sicshipadaba.h"
#include "commandlog.h"
#include "protocol.h"
/*-------------------------------------------------------------------*/ /*-------------------------------------------------------------------*/
static void KillExeMan(void *data){ static void KillExeMan(void *data){
pExeMan self = (pExeMan)data; pExeMan self = (pExeMan)data;
@ -252,6 +254,134 @@ static int runBatchBuffer(pExeMan self, SConnection *pCon,
return status; return status;
} }
/*-------------------------------------------------------------------*/ /*-------------------------------------------------------------------*/
static char bufferNode[512];
static int SCHdbWrite(SConnection *self, char *message, int outCode){
pHdb node = NULL;
char pBueffel[512];
commandContext cc;
hdbValue v;
pDynString val = NULL;
writeFunc defWrite = NULL;
cc = SCGetContext(self);
node = GetHipadabaNode(GetHipadabaRoot(),cc.deviceID);
if(node == NULL || strstr(cc.deviceID,bufferNode) == NULL){
/*
* this means the deviceId is wrong and the output is for another
* operation.
*/
defWrite = GetProtocolWriteFunc(self);
if(defWrite == NULL){
defWrite = SCNormalWrite;
}
defWrite(self,message,outCode);
return 1;
}
SCFileWrite(self,message,outCode);
if(SCinMacro(self) && (outCode != eError && outCode != eWarning) ){
return 1;
}
v = MakeHdbText(strdup(""));
GetHipadabaPar(node,&v,NULL);
v.dataType = HIPTEXT;
val = CreateDynString(128,128);
if(val == NULL){
WriteToCommandLog("INTERNAL ERROR>>",
"No memory to append to log in SCHdbWrite");
return 0;
}
if(v.v.text != NULL){
DynStringConcat(val,v.v.text);
if(strrchr(v.v.text,(int)'\n') == NULL && strlen(v.v.text) > 2){
DynStringConcatChar(val,'\n');
}
}
DynStringConcat(val,message);
if(strrchr(message,(int)'\n') == NULL && strlen(message) > 2){
DynStringConcatChar(val,'\n');
}
if(v.v.text != NULL){
free(v.v.text);
}
v.v.text = GetCharArray(val);
UpdateHipadabaPar(node,v,NULL);
DeleteDynString(val);
return 1;
}
/*--------------------------------------------------------------------*/
static int runHdbBuffer(pExeMan self, SConnection *pCon,
SicsInterp *pSics, char *name){
char pBueffel[512];
pExeBuf buffer = NULL;
pHdb node = NULL;
hdbValue v;
int status;
commandContext cc;
writeFunc oldWrite;
if(!SCMatchRights(pCon,usUser)) {
return 0;
}
/*
* clear log buffer
*/
snprintf(pBueffel,511,"%s/log",name);
node = GetHipadabaNode(GetHipadabaRoot(),pBueffel);
if(node == NULL){
SCWrite(pCon,"ERROR: Hdb node not found or in wrong format",eError);
return 0;
}
v = MakeHdbText(strdup(""));
UpdateHipadabaPar(node,v,pCon);
/*
* prepare context
*/
cc = SCGetContext(pCon);
strcpy(cc.deviceID, pBueffel);
/*
* load commands into buffer
*/
snprintf(pBueffel,511,"%s/commands",name);
node = GetHipadabaNode(GetHipadabaRoot(),pBueffel);
if(node == NULL){
SCWrite(pCon,"ERROR: Hdb node not found or in wrong format",eError);
return 0;
}
GetHipadabaPar(node,&v,pCon);
if(v.dataType != HIPTEXT || v.v.text == NULL){
SCWrite(pCon,"ERROR: Hdb node is of wrong type or contains no data",eError);
return 0;
}
buffer = exeBufCreate(name);
if(!buffer){
SCWrite(pCon,"ERROR: out of memory creating batch buffer",eError);
return 0;
}
exeBufAppend(buffer,v.v.text);
strncpy(bufferNode,name,511);
oldWrite = SCGetWriteFunc(pCon);
SCSetWriteFunc(pCon,SCHdbWrite);
SCPushContext2(pCon,cc);
self->exeStackPtr++;
DynarPut(self->exeStack,self->exeStackPtr,buffer);
status = exeBufProcess(buffer,pSics,pCon,self->pCall,self->echo);
self->exeStackPtr--;
SCSetWriteFunc(pCon,oldWrite);
SCPopContext(pCon);
return status;
}
/*-------------------------------------------------------------------*/
int runExeBatchBuffer(void *pData, SConnection *pCon, int runExeBatchBuffer(void *pData, SConnection *pCon,
SicsInterp *pSics, char *name){ SicsInterp *pSics, char *name){
int status, oldEcho; int status, oldEcho;
@ -1090,6 +1220,16 @@ int ExeManagerWrapper(SConnection *pCon, SicsInterp *pSics, void *pData,
SCPrintf(pCon, eValue, "exe echo = %d", self->echo); SCPrintf(pCon, eValue, "exe echo = %d", self->echo);
} }
return 1; return 1;
}else if(strcmp(argv[1],"runhdb") == 0){
if (argc < 2) {
SCWrite(pCon,"ERROR: require path to root of queue node",eError);
SCSendOK(pCon);
}
status = runHdbBuffer(self,pCon,pSics,argv[2]);
if(self->exeStackPtr < 0){
SCWrite(pCon,"EXE TERMINATED",eWarning);
}
return status;
} else { } else {
status = runBatchBuffer(self,pCon,pSics,pBufferName); status = runBatchBuffer(self,pCon,pSics,pBufferName);
if(self->exeStackPtr < 0){ if(self->exeStackPtr < 0){

View File

@ -32,7 +32,8 @@ SOBJ = network.o ifile.o conman.o SCinter.o splitter.o passwd.o \
mcstashm.o initializer.o remob.o tclmotdriv.o protocol.o \ mcstashm.o initializer.o remob.o tclmotdriv.o protocol.o \
sinfox.o sicslist.o cone.o hipadaba.o sicshipadaba.o statistics.o \ sinfox.o sicslist.o cone.o hipadaba.o sicshipadaba.o statistics.o \
moregress.o hdbcommand.o multicounter.o regresscter.o histregress.o \ moregress.o hdbcommand.o multicounter.o regresscter.o histregress.o \
sicshdbadapter.o polldriv.o sicspoll.o statemon.o hmslave.o sicshdbadapter.o polldriv.o sicspoll.o statemon.o hmslave.o \
nwatch.o asyncqueue.o asyncprotocol.o
MOTOROBJ = motor.o simdriv.o MOTOROBJ = motor.o simdriv.o
COUNTEROBJ = countdriv.o simcter.o counter.o COUNTEROBJ = countdriv.o simcter.o counter.o

View File

@ -171,8 +171,10 @@
fputs(pBueffel,fd); fputs(pBueffel,fd);
sprintf(pBueffel,"%s AccessCode %f\n",name,ObVal(self->ParArray,USRIGHTS)); sprintf(pBueffel,"%s AccessCode %f\n",name,ObVal(self->ParArray,USRIGHTS));
fputs(pBueffel,fd); fputs(pBueffel,fd);
sprintf(pBueffel,"%s poscount %f\n",name, sprintf(pBueffel,"%s failafter %f\n",name,ObVal(self->ParArray,ECOUNT));
ObVal(self->ParArray,POSCOUNT)); fputs(pBueffel,fd);
sprintf(pBueffel,"%s maxretry %f\n",name,ObVal(self->ParArray,POSCOUNT));
fputs(pBueffel,fd);
sprintf(pBueffel,"%s movecount %f\n",name, sprintf(pBueffel,"%s movecount %f\n",name,
ObVal(self->ParArray,MOVECOUNT)); ObVal(self->ParArray,MOVECOUNT));
fputs(pBueffel,fd); fputs(pBueffel,fd);

3
napi.h
View File

@ -21,7 +21,7 @@
For further information, see <http://www.neutron.anl.gov/NeXus/> For further information, see <http://www.neutron.anl.gov/NeXus/>
$Id: napi.h,v 1.10 2006/03/03 15:30:55 koennecke Exp $ $Id: napi.h,v 1.11 2007/06/22 11:44:47 koennecke Exp $
----------------------------------------------------------------------------*/ ----------------------------------------------------------------------------*/
@ -117,6 +117,7 @@ typedef struct {
char iRef5[1024]; /* HDF5 variable */ char iRef5[1024]; /* HDF5 variable */
char iRefd[1024]; /* HDF5 variable */ char iRefd[1024]; /* HDF5 variable */
char targetPath[1024]; /* XML path */ char targetPath[1024]; /* XML path */
int linkType;
} NXlink; } NXlink;
#define NXMAXSTACK 50 #define NXMAXSTACK 50

1256
napi5.c

File diff suppressed because it is too large Load Diff

View File

@ -609,7 +609,7 @@ int NETReadTillTerm(mkChannel *self, long timeout,
pBuffer[bufPtr] = '\0'; pBuffer[bufPtr] = '\0';
return bufPtr; return bufPtr;
} else { } else {
matchIndex == 1; matchIndex = 1;
} }
} }
} }
@ -668,6 +668,89 @@ int NETReadTillTerm(mkChannel *self, long timeout,
return 1; return 1;
} }
/*---------------------------------------------------------------------------*/
int NETReconnectWithFlags(mkChannel* self, int flags)
{
int iRet;
int sock;
int oldopts;
/*
* Get the flags and close the old socket
*/
oldopts = fcntl(self->sockid, F_GETFL, 0);
close(self->sockid);
/* Reopen and try to get it on the olf fd */
sock = socket(AF_INET,SOCK_STREAM,0);
if (self->sockid != sock) {
iRet = fcntl(sock, F_DUPFD, self->sockid);
if (iRet != sock)
self->sockid = sock;
else
close(sock);
sock = self->sockid;
}
/* restore the old flags */
fcntl(self->sockid, F_SETFL, oldopts);
/* set socket non-blocking */
oldopts = fcntl(self->sockid, F_GETFL, 0);
if (/*(flags & 1) &&*/ !(oldopts & O_NONBLOCK))
fcntl(self->sockid, F_SETFL, oldopts | O_NONBLOCK);
/* try to reconnect */
iRet = connect(self->sockid,
(struct sockaddr *)&(self->adresse),
sizeof(struct sockaddr_in));
if (iRet < 0) {
if (errno == EINPROGRESS) {
if ((flags & 1)) {
iRet = 0; /* in progress */
} else {
fd_set rmask;
fd_set wmask;
struct timeval tmo = {1,0};
FD_ZERO(&rmask);
FD_ZERO(&wmask);
FD_SET(self->sockid, &rmask);
FD_SET(self->sockid, &wmask);
iRet = select(self->sockid+1, &rmask, &wmask, NULL, &tmo);
if (iRet < 0) /* error */
iRet = -1;
else if (iRet == 0) /* timeout */
iRet = 0; /* in progress */
else {
char reply[1];
if (FD_ISSET(self->sockid, &rmask)) {
iRet = recv(self->sockid, reply, 1, MSG_PEEK);
if (iRet <= 0)
iRet = -1; /* failure */
}
if (FD_ISSET(self->sockid, &wmask)) {
iRet = send(self->sockid, NULL, 0, 0);
if (iRet < 0)
iRet = -1; /* failure */
else
iRet = 1; /* success */
}
}
}
}
else /* other error */
iRet = -1; /* error */
}
else
iRet = 1; /* success */
if (iRet != 0 && !(oldopts & O_NONBLOCK))
fcntl(self->sockid, F_SETFL, oldopts);
return iRet;
}
int NETReconnect(mkChannel* self)
{
return NETReconnectWithFlags(self, 0);
}
/* ################### UDP -functions ######################################*/ /* ################### UDP -functions ######################################*/
mkChannel *UDPOpen(int iPort) mkChannel *UDPOpen(int iPort)
{ {

View File

@ -76,10 +76,22 @@
of hostname are copied to pComposter of hostname are copied to pComposter
*/ */
int NETReconnect(mkChannel* self);
/* If a connection has been lost, try to reconnect using the same
* socket id if possible. Blocks for up to one second.
* returns 0 if in progress, 1 on success, a negative value on error
*/
int NETReconnectWithFlags(mkChannel* self, int flags);
/* If a connection has been lost, try to reconnect using the same
* socket id if possible. If (flags & 1) do not block, use
* NETConnectFinished to check success.
* returns 0 if in progress, 1 on success, a negative value on error
*/
/* *********************** DATA TRANSFER ******************************** */ /* *********************** DATA TRANSFER ******************************** */
int NETWrite(mkChannel *self, char *buffer, long lLen); int NETWrite(mkChannel *self, char *buffer, long lLen);
/* writes data to socket self, returns True if succes, /* writes data to socket self, returns True if success,
false otherwise. false otherwise.
*/ */

View File

@ -45,6 +45,8 @@
extern void StopExit(void); /* in SICSmain.c */ extern void StopExit(void); /* in SICSmain.c */
extern int openDevexecLog(); /* in devexec.c */ extern int openDevexecLog(); /* in devexec.c */
extern void NetWatchInit(void); /* in nwatch.c */
/* ========================= Less dreadful file statics =================== */ /* ========================= Less dreadful file statics =================== */
#define DEFAULTINIFILE "servo.tcl" #define DEFAULTINIFILE "servo.tcl"
@ -111,6 +113,9 @@
pSICSOptions = IFAddOption(pSICSOptions, "ConnectionCount","0"); pSICSOptions = IFAddOption(pSICSOptions, "ConnectionCount","0");
/* initialize the network watcher */
NetWatchInit();
/* initialise the server from script */ /* initialise the server from script */
if(file == NULL) if(file == NULL)
{ {

464
nwatch.c Normal file
View File

@ -0,0 +1,464 @@
/*
* N E T W A T C H E R
*
* This module watches network connections for sockets becoming readable or
* writeable and invokes callbacks. It also provides a timer mechanism.
*
* Douglas Clowes, February 2007
*
*/
#include <stdlib.h>
#include <assert.h>
#ifdef CYGNUS
#include <sys/socket.h>
#else
#include <sys/select.h>
#endif
#include <sys/time.h>
#include "fortify.h"
#include "nwatch.h"
#include "sics.h"
#define NWMAGIC 51966
/* Net Watcher control structure */
typedef struct __netwatcher_s {
pNWContext cq_head; /* head of socket context queue */
pNWContext cq_tail; /* tail of socket context queue */
int nInvalid; /* number of invalidated entries */
pNWTimer tq_head; /* head of timer context queue */
pNWTimer tq_tail; /* tail of timer context queue */
long lMagic; /* integrity check */
} NetWatch, *pNetWatch;
/* Singleton pattern */
static pNetWatch instance = NULL;
static int NetWatchTask(void* pData);
/**
* \brief Initialises the Net Watcher singleton and starts the task
*
* \return 1=success, 0=failure
*/
int NetWatchInit(void) {
/*
* If the singleton has not yet been created, do so now
*/
if (instance == NULL) {
instance = (pNetWatch) malloc(sizeof(NetWatch));
if (instance == NULL)
return 0;
memset(instance, 0, sizeof(NetWatch));
instance->lMagic = NWMAGIC;
TaskRegister(pServ->pTasker, NetWatchTask, NULL, NULL, NULL, 1);
}
return 1;
}
/*
* The timer context object private definition
*/
typedef struct __netwatchtimer {
pNWTimer next; /* chain to next event */
struct timeval tv; /* time when event is due */
pNWCallback func; /* function to call */
void* cntx; /* abstract context to pass to callback */
long int tick; /* millisecond repeat rate */
long int vrfy; /* integrity check */
} NWTimer;
/*
* \brief private function to insert an entry into the sorted timer queue.
*
* \param self singleton
* \param handle new timer to insert
*/
static int NetWatchTimerInsQue(pNetWatch self, pNWTimer handle)
{
/* if the queue is empty, just stick new one in */
if (self->tq_head == NULL) {
self->tq_head = self->tq_tail = handle;
handle->next = NULL;
return 1;
}
/* if new one is not earlier than latest one, insert after latest */
if (handle->tv.tv_sec > self->tq_tail->tv.tv_sec ||
(handle->tv.tv_sec == self->tq_tail->tv.tv_sec &&
handle->tv.tv_usec >= self->tq_tail->tv.tv_usec)) {
self->tq_tail->next = handle;
self->tq_tail = handle;
handle->next = NULL;
return 1;
}
/* if new one is not later than earliest one, insert before earliest */
if (handle->tv.tv_sec < self->tq_head->tv.tv_sec ||
(handle->tv.tv_sec == self->tq_head->tv.tv_sec &&
handle->tv.tv_usec <= self->tq_head->tv.tv_usec)) {
handle->next = self->tq_head;
self->tq_head = handle;
return 1;
}
else
{
/* must be in between two so start at the first entry */
pNWTimer pNxt = self->tq_head;
/* follow chain until the one after this one is greater than new one */
while (pNxt->next &&
(handle->tv.tv_sec > pNxt->next->tv.tv_sec ||
(handle->tv.tv_sec == pNxt->next->tv.tv_sec &&
handle->tv.tv_usec > pNxt->next->tv.tv_usec)))
pNxt = pNxt->next;
/* slip new one in between this one and the next one */
handle->next = pNxt->next;
pNxt->next = handle ;
}
return 1;
}
/*
* \brief private function to remove an entry from the sorted timer queue.
*
* \param self singleton
* \param handle existing timer to remove
*/
static int NetWatchTimerRemQue(pNetWatch self, pNWTimer handle)
{
/* handle the case of first and possibly only */
if (handle == self->tq_head) {
self->tq_head = self->tq_head->next; /* may be NULL */
if (handle == self->tq_tail)
self->tq_tail = NULL;
}
/* handle general case */
else {
pNWTimer pNxt = self->tq_head;
while (pNxt) {
if (handle == pNxt->next) {
pNxt->next = pNxt->next->next;
break;
}
pNxt = pNxt->next;
}
/* It it was the last entry, point tail to its predecessor */
if (handle == self->tq_tail)
self->tq_tail = pNxt;
}
return 1;
}
int NetWatchRegisterTimer(pNWTimer* handle, int mSec,
pNWCallback callback, void* context)
{
pNetWatch self = instance;
if(!self || self->lMagic != NWMAGIC)
return 0;
pNWTimer pNew = (pNWTimer) malloc(sizeof(NWTimer));
if (pNew == NULL)
return 0;
memset(pNew, 0, sizeof(NWTimer));
gettimeofday(&pNew->tv, NULL);
pNew->tv.tv_sec += mSec / 1000;
pNew->tv.tv_usec += 1000 * (mSec % 1000);
if (pNew->tv.tv_usec > 1000000) {
pNew->tv.tv_sec ++;
pNew->tv.tv_usec -= 1000000;
}
pNew->tick = 0;
pNew->func = callback;
pNew->cntx = context;
pNew->vrfy = NWMAGIC;
NetWatchTimerInsQue(self, pNew);
*handle = pNew;
return 1;
}
int NetWatchRegisterTimerPeriodic(pNWTimer* handle, int mSecInitial, int mSecPeriod,
pNWCallback callback, void* context)
{
if (NetWatchRegisterTimer(handle, mSecInitial, callback, context)) {
pNWTimer pNew = *handle;
if (pNew == NULL)
return 0;
if (mSecPeriod > 0)
pNew->tick = mSecPeriod;
return 1;
}
return 0;
}
int NetWatchGetTimerPeriod(pNWTimer handle)
{
if (handle == NULL || handle->vrfy != NWMAGIC)
return 0;
return handle->tick;
}
int NetWatchSetTimerPeriod(pNWTimer handle, int mSecPeriod)
{
if (handle == NULL || handle->vrfy != NWMAGIC)
return 0;
handle->tick = mSecPeriod;
return 1;
}
int NetWatchRemoveTimer(pNWTimer handle)
{
pNetWatch self = instance;
if (!self || self->lMagic != NWMAGIC)\
return 0;
NetWatchTimerRemQue(self, handle);
handle->vrfy = 0;
free(handle);
return 1;
}
/* private data */
typedef struct __netwatchcontext {
pNWContext next; /* chain pointer */
int sock; /* socket to watch */
int mode; /* read or write */
pNWCallback func; /* user supplied callback function */
void* cntx; /* user supplied callback context */
long vrfy; /* integrity check */
} NWContext;
/**
* \brief private function to insert entry into unsorted queue
*
* \param self singleton
* \param handle entry to insert
*/
static int NetWatchContextInsQue(pNetWatch self, pNWContext handle)
{
if (self->cq_head == NULL) /* empty */
self->cq_head = self->cq_tail = handle;
else {
self->cq_tail->next = handle;
self->cq_tail = handle;
}
return 1;
}
/**
* \brief private function to remove entry from unsorted queue
*
* \param self singleton
* \param handle entry to insert
*/
static void NetWatchContextRemQue(pNetWatch self, pNWContext handle)
{
if (handle == self->cq_head) { /* if first */
self->cq_head = self->cq_head->next;
if (handle == self->cq_tail) /* if also last */
self->cq_tail = NULL;
}
else {
pNWContext pNxt = self->cq_head;
while (pNxt) {
if (handle == pNxt->next) {
pNxt->next = pNxt->next->next;
break;
}
pNxt = pNxt->next;
}
if (handle == self->cq_tail) /* if last */
self->cq_tail = pNxt;
}
return;
}
/**
* \brief private function to purge invalid entries
*
* \param self singleton
*/
static void NetWatchContextPrgQue(pNetWatch self)
{
pNWContext pNxt = NULL;
/* while the first entry is invalid remove it */
while (self->cq_head && self->cq_head->sock < 0) {
pNWContext tmp = NULL;
tmp = self->cq_head;
self->cq_head = self->cq_head->next;
tmp->vrfy = 0;
free(tmp);
}
pNxt = self->cq_head;
while (pNxt) {
if (pNxt->next && pNxt->next->sock < 0) {
pNWContext tmp = NULL;
tmp = pNxt->next;
pNxt->next = pNxt->next->next;
tmp->vrfy = 0;
free(tmp);
}
pNxt = pNxt->next;
}
/* if the queue is empty clear the tail */
if (self->cq_head == NULL)
self->cq_tail = pNxt;
self->nInvalid = 0;
return;
}
int NetWatchRegisterCallback(pNWContext* handle, int iSocket,
pNWCallback callback, void* context)
{
pNWContext pNew = NULL;
pNetWatch self = instance;
if(!self || self->lMagic != NWMAGIC)
return 0;
if (iSocket < 0 || iSocket > 65535)
return 0;
pNew = (pNWContext) malloc(sizeof(NWContext));
if (pNew == NULL)
return 0;
memset(pNew, 0, sizeof(NWContext));
pNew->sock = iSocket;
pNew->mode = nwatch_read;
pNew->func = callback;
pNew->cntx = context;
pNew->vrfy = NWMAGIC;
*handle = pNew;
NetWatchContextInsQue(self, pNew);
return 1;
}
int NetWatchRemoveCallback(pNWContext handle)
{
pNetWatch self = instance;
if (handle == NULL || handle->vrfy != NWMAGIC)
return 0;
if(!self || self->lMagic != NWMAGIC)
return 0;
handle->sock = -1;
self->nInvalid++;
return 1;
}
int NetWatchGetMode(pNWContext handle)
{
if (handle == NULL || handle->vrfy != NWMAGIC)
return 0;
return handle->mode;
}
int NetWatchSetMode(pNWContext handle, int mode)
{
if (handle == NULL || handle->vrfy != NWMAGIC)
return 0;
handle->mode = mode;
return 1;
}
/**
* \brief the registered SICS Task to drive all this
*/
int NetWatchTask (void* pData)
{
pNetWatch self = NULL;
pNWContext pNWC = NULL;
fd_set rMask;
fd_set wMask;
struct timeval tmo = {0,0};
int iRet;
int iCount;
/* Check the singleton */
self = (pNetWatch) instance;
if(!self || self->lMagic != NWMAGIC)
return 0;
/* Purge the invalidated */
if (self->nInvalid > 0)
NetWatchContextPrgQue(self);
/* build the select mask */
FD_ZERO(&rMask);
FD_ZERO(&wMask);
pNWC = self->cq_head;
iCount = -1;
while(pNWC) {
if (pNWC->sock >= 0 && pNWC->sock <= 65535) {
if (pNWC->mode & nwatch_read)
FD_SET(pNWC->sock,&rMask);
if (pNWC->mode & nwatch_write)
FD_SET(pNWC->sock,&wMask);
if(pNWC->sock > iCount) {
iCount = pNWC->sock;
}
}
pNWC = pNWC->next;
}
iRet = 0;
if (iCount >= 0)
iRet = select(iCount+1, &rMask, &wMask, NULL, &tmo);
if(iRet > 0) {
/* invoke the active callbacks */
iCount = 0;
pNWC = self->cq_head;
while(pNWC)
{
if (pNWC->sock >= 0 && pNWC->sock <= 65535) {
int action_mode = 0;
if ((pNWC->mode & nwatch_read) && FD_ISSET(pNWC->sock, &rMask))
action_mode |= nwatch_read;
if ((pNWC->mode & nwatch_write) && FD_ISSET(pNWC->sock, &wMask))
action_mode |= nwatch_write;
if (action_mode != 0) {
int iStatus;
iStatus = (*pNWC->func)(pNWC->cntx, action_mode);
}
}
pNWC = pNWC->next;
}
}
/* Now do the timers */
if (self->tq_head) {
int iStatus;
struct timeval tv;
gettimeofday(&tv, NULL);
while (self->tq_head) {
pNWTimer pNew = self->tq_head;
if (tv.tv_sec < pNew->tv.tv_sec ||
(tv.tv_sec == pNew->tv.tv_sec &&
tv.tv_usec < pNew->tv.tv_usec)) {
break;
}
NetWatchTimerRemQue(self, pNew);
iStatus = pNew->func(pNew->cntx, 0);
/*
* If this is a recurrent timer and the function
* indicates to keep it going, put it back in
*/
if (pNew->tick && iStatus == 1) {
/*
* While the expiration time is in the past, increment
*/
gettimeofday(&tv, NULL);
while (tv.tv_sec > pNew->tv.tv_sec ||
(tv.tv_sec == pNew->tv.tv_sec &&
tv.tv_usec > pNew->tv.tv_usec)) {
pNew->tv.tv_usec += 1000 * pNew->tick;
if (pNew->tv.tv_usec > 1000000) {
pNew->tv.tv_sec += pNew->tv.tv_usec / 1000000;
pNew->tv.tv_usec %= 1000000;
}
}
NetWatchTimerInsQue(self, pNew);
}
else {
pNew->vrfy = 0;
free(pNew);
}
}
}
/* done, finally */
return 1;
}

105
nwatch.h Normal file
View File

@ -0,0 +1,105 @@
/*
* N E T W A T C H E R
*
* This module watches network connections for sockets becoming readable or
* writeable and invokes callbacks. It also provides a timer mechanism.
*
* Douglas Clowes, February 2007
*
*/
#ifndef SICSNETWATCHER
#define SICSNETWATCHER
#define nwatch_read 1
#define nwatch_write 2
/**
* \brief network or timer callback function
*
* \param context from the network/timer registration
* \param mode
* for network, nwatch_read or nwatch_write
* for timer, zero, reserved for future use
*
* \return normally zero, for future use
*/
typedef int (*pNWCallback)(void* context, int mode);
/* the abstract timer object handle */
typedef struct __netwatchtimer *pNWTimer;
/**
* \brief register a one-shot timer event
*
* \param handle pointer to location to receive the timer object handle
* \param mSec milliseconds after which the timer should expire
* \param callback function when timer expires
* \param context abstract context passed to callback function
* \return success=1, failure=0
*/
int NetWatchRegisterTimer(pNWTimer* handle, int mSec,
pNWCallback callback, void* context);
/**
* \brief register a periodic timer
*
* \param handle pointer to location to receive the timer object handle
* \param mSec milliseconds after which the timer should expire
* \param mSecPeriod milliseconds after which the timer should repeat
* \param callback function when timer expires
* \param context abstract context passed to callback function
* \return success=1, failure=0
*/
int NetWatchRegisterTimerPeriodic(pNWTimer* handle, int mSecInitial, int mSecPeriod,
pNWCallback callback, void* context);
int NetWatchGetTimerPeriod(pNWTimer handle);
int NetWatchSetTimerPeriod(pNWTimer handle, int mSecPeriod);
/**
* \brief remove a registered timer event
*
* \param handle from the timer registration
* \return success=1, failure=0
*/
int NetWatchRemoveTimer(pNWTimer handle);
/* the abstract socket object handle */
typedef struct __netwatchcontext *pNWContext;
/**
* \brief register a socket to be watched in read mode
*
* \param handle pointer to location to receive the socket object handle
* \param iSocket file descriptor number of the socket to watch
* \param callback function when socket readable/writeable
* \param context abstract context passed to callback function
* \return success=1, failure=0
*/
int NetWatchRegisterCallback(pNWContext* handle, int iSocket,
pNWCallback callback, void* context);
/**
* \brief remove a socket callback registration
*
* \param handle from the socket registration
* \return success=1, failure=0
*/
int NetWatchRemoveCallback(pNWContext handle);
/**
* \brief retrieve the mode of a socket callback registration
*
* \param handle from the socket registration
* \return 0=failure else the mode (read and/or write)
*/
int NetWatchGetMode(pNWContext handle);
/**
* \brief set the mode of a socket callback registration
*
* \param handle from the socket registration
* \param mode read and/or write
* \return 0=failure, 1=success
*/
int NetWatchSetMode(pNWContext handle, int mode);
#endif /* SICSNETWATCHER */

10
ofac.c
View File

@ -122,6 +122,8 @@
#include "multicounter.h" #include "multicounter.h"
#include "sicspoll.h" #include "sicspoll.h"
#include "statemon.h" #include "statemon.h"
#include "asyncqueue.h"
#include "asyncprotocol.h"
/*----------------------- Server options creation -------------------------*/ /*----------------------- Server options creation -------------------------*/
static int IFServerOption(SConnection *pCon, SicsInterp *pSics, void *pData, static int IFServerOption(SConnection *pCon, SicsInterp *pSics, void *pData,
int argc, char *argv[]) int argc, char *argv[])
@ -250,6 +252,7 @@
AddCommand(pInter,"help",SicsHelp,KillHelp,NULL); AddCommand(pInter,"help",SicsHelp,KillHelp,NULL);
AddCommand(pInter,"list",SicsList,NULL,NULL); AddCommand(pInter,"list",SicsList,NULL,NULL);
AddCommand(pInter,"InstallHdb",InstallSICSHipadaba,NULL,NULL); AddCommand(pInter,"InstallHdb",InstallSICSHipadaba,NULL,NULL);
MakeProtocol(pInter);
/* commands to do with the executor. Only StopExe carries the /* commands to do with the executor. Only StopExe carries the
DeleteFunction in order to avoid double deletion. All the DeleteFunction in order to avoid double deletion. All the
@ -326,8 +329,6 @@
McStasReaderFactory,NULL,NULL); McStasReaderFactory,NULL,NULL);
AddCommand(pInter,"MakeMcStasController", AddCommand(pInter,"MakeMcStasController",
McStasControllerFactory,NULL,NULL); McStasControllerFactory,NULL,NULL);
AddCommand(pInter,"InstallProtocolHandler",
InstallProtocol,NULL,NULL);
AddCommand(pInter,"InstallSinfox", AddCommand(pInter,"InstallSinfox",
InstallSinfox,NULL,NULL); InstallSinfox,NULL,NULL);
AddCommand(pInter,"MakeCone", AddCommand(pInter,"MakeCone",
@ -338,6 +339,8 @@
InstallSICSPoll,NULL,NULL); InstallSICSPoll,NULL,NULL);
AddCommand(pInter,"MakeStateMon", AddCommand(pInter,"MakeStateMon",
StateMonFactory,NULL,NULL); StateMonFactory,NULL,NULL);
AddCommand(pInter,"MakeAsyncProtocol",AsyncProtocolFactory,NULL,NULL);
AddCommand(pInter,"MakeAsyncQueue",AsyncQueueFactory,NULL,NULL);
/* /*
install site specific commands install site specific commands
@ -403,11 +406,12 @@
RemoveCommand(pSics,"MakeTasUB"); RemoveCommand(pSics,"MakeTasUB");
RemoveCommand(pSics,"MakeTasScan"); RemoveCommand(pSics,"MakeTasScan");
RemoveCommand(pSics,"MakemcStasReader"); RemoveCommand(pSics,"MakemcStasReader");
RemoveCommand(pSics,"InstallProtocolHandler");
RemoveCommand(pSics,"InstallSinfox"); RemoveCommand(pSics,"InstallSinfox");
RemoveCommand(pSics,"MakeCone"); RemoveCommand(pSics,"MakeCone");
RemoveCommand(pSics,"MakeMultiCounter"); RemoveCommand(pSics,"MakeMultiCounter");
RemoveCommand(pSics,"MakeStateMon"); RemoveCommand(pSics,"MakeStateMon");
RemoveCommand(pSics,"MakeAsyncQueue");
RemoveCommand(pSics,"MakeAsyncProtocol");
/* /*
remove site specific installation commands remove site specific installation commands
*/ */

View File

@ -21,8 +21,8 @@
#define MAXMSG 1024 #define MAXMSG 1024
#define INIT_STR_SIZE 256 #define INIT_STR_SIZE 256
#define STR_RESIZE_LENGTH 256 #define STR_RESIZE_LENGTH 256
#define NUMPROS 5 #define NUMPROS 6
#define PROLISTLEN 6 #define PROLISTLEN 7
typedef struct __Protocol { typedef struct __Protocol {
pObjectDescriptor pDes; /* required as first field */ pObjectDescriptor pDes; /* required as first field */
char *name; /* protocol handler name */ char *name; /* protocol handler name */
@ -106,6 +106,7 @@ pProtocol CreateProtocol(void)
"withcode", "withcode",
"sycamore", "sycamore",
"json", "json",
"act",
NULL NULL
}; };
pProtocol pNew = NULL; pProtocol pNew = NULL;
@ -217,7 +218,16 @@ int InstallProtocol(SConnection *pCon, SicsInterp *pSics, void *pData,
SCSendOK(pCon); SCSendOK(pCon);
return 1; return 1;
} }
/*------------------------------------------------------------------------*/
void MakeProtocol(SicsInterp *pSics){
pProtocol pNew = NULL;
pNew = CreateProtocol();
if(NULL!= pNew)
{
AddCommand(pSics,"Protocol",ProtocolAction,DeleteProtocol,pNew);
AddCommand(pSics,"contextdo",ContextDo,NULL,NULL);
}
}
/*------------------------------------------------------------------------*/ /*------------------------------------------------------------------------*/
static int ProtocolOptions(SConnection* pCon, pProtocol pPro) static int ProtocolOptions(SConnection* pCon, pProtocol pPro)
{ {
@ -281,6 +291,9 @@ static int ProtocolSet(SConnection* pCon, Protocol* pPro, char *pProName)
case 4: /* json */ case 4: /* json */
SCSetWriteFunc(pCon,SCWriteJSON_String); SCSetWriteFunc(pCon,SCWriteJSON_String);
break; break;
case 5:
SCSetWriteFunc(pCon,SCACTWrite);
break;
case 0: /* default = psi_sics */ case 0: /* default = psi_sics */
default: default:
SCSetWriteFunc(pCon,pPro->defaultWriter); SCSetWriteFunc(pCon,pPro->defaultWriter);
@ -327,6 +340,7 @@ int ProtocolGet(SConnection* pCon, void* pData, char *pProName, int len)
case 2: /* outcodes */ case 2: /* outcodes */
case 3: /* sycamore */ case 3: /* sycamore */
case 4: /* json */ case 4: /* json */
case 5: /* act */
pProName = pPro->pProList[Index]; pProName = pPro->pProList[Index];
return 1; return 1;
break; break;
@ -785,3 +799,26 @@ int GetProtocolID(SConnection* pCon)
} }
return -1; return -1;
} }
/*---------------------------------------------------------------------------*/
writeFunc GetProtocolWriteFunc(SConnection *pCon){
if(pCon != NULL){
switch(pCon->iProtocolID){
case 2: /* outcodes */
return SCWriteWithOutcode;
break;
case 3: /* sycamore */
return SCWriteSycamore;
break;
case 4: /* json */
return SCWriteJSON_String;
break;
case 5:
return SCACTWrite;
break;
default:
return SCNormalWrite;
break;
}
}
return SCNormalWrite;
}

View File

@ -19,11 +19,11 @@ static char *pProTags[3] = {
int InstallProtocol(SConnection *pCon, SicsInterp *pSics, void *pData, int InstallProtocol(SConnection *pCon, SicsInterp *pSics, void *pData,
int argc, char *argv[]); int argc, char *argv[]);
void DeleteProtocol(void *pSelf); void DeleteProtocol(void *pSelf);
void MakeProtocol(SicsInterp *pSics);
/*--------------------- operations --------------------------------------*/ /*--------------------- operations --------------------------------------*/
int ProtocolAction(SConnection *pCon, SicsInterp *pSics, void *pData, int ProtocolAction(SConnection *pCon, SicsInterp *pSics, void *pData,
int argc, char *argv[]); int argc, char *argv[]);
/*--------------------- implement protocol sycamore ---------------------*/ /*--------------------- implement protocol sycamore ---------------------*/
int SCWriteSycamore(SConnection *pCon, char *pBuffer, int iOut); int SCWriteSycamore(SConnection *pCon, char *pBuffer, int iOut);
@ -31,5 +31,6 @@ int SCWriteSycamore(SConnection *pCon, char *pBuffer, int iOut);
char * GetProtocolName(SConnection *pCon); char * GetProtocolName(SConnection *pCon);
int GetProtocolID(SConnection *pCon); int GetProtocolID(SConnection *pCon);
int ProtocolGet(SConnection* pCon, void* pData, char *pProName, int len); int ProtocolGet(SConnection* pCon, void* pData, char *pProName, int len);
writeFunc GetProtocolWriteFunc(SConnection *pCon);
/*-----------------------------------------------------------------------*/ /*-----------------------------------------------------------------------*/
#endif #endif

View File

@ -180,8 +180,16 @@ static void netEncode(pSICSData self){
/*---------------------------------------------------------------------*/ /*---------------------------------------------------------------------*/
void clearSICSData(pSICSData self){ void clearSICSData(pSICSData self){
assert(self); assert(self);
int clearSize = 8192;
self->dataUsed = 0; self->dataUsed = 0;
if(self->currentDataSize > clearSize){
free(self->data);
free(self->dataType);
self->data = (int *)malloc(clearSize*sizeof(int));
self->dataType = (char *)malloc(clearSize*sizeof(char));
self->currentDataSize = clearSize;
}
memset(self->data,0,self->currentDataSize*sizeof(int)); memset(self->data,0,self->currentDataSize*sizeof(int));
memset(self->dataType,0,self->currentDataSize*sizeof(char)); memset(self->dataType,0,self->currentDataSize*sizeof(char));
} }

View File

@ -21,6 +21,8 @@
#include "motor.h" #include "motor.h"
#include "HistMem.h" #include "HistMem.h"
#include "sicsvar.h" #include "sicsvar.h"
#include "counter.h"
#include "lld.h"
#include "sicshipadaba.h" #include "sicshipadaba.h"
#include "sicshdbadapter.h" #include "sicshdbadapter.h"
@ -411,6 +413,54 @@ static pHdb MakeSicsVarNode(pSicsVariable pVar, char *name){
node->protected = 1; node->protected = 1;
return node; return node;
} }
/*================ counter =============================================*/
typedef struct {
pHdb node;
int monitor; /* -1 == time */
pCounter counter;
} CountEntry;
static int countList = -10;
/*---------------------------------------------------------------------*/
static void updateCountList(){
int status;
hdbValue v;
CountEntry hugo;
long monitor;
float time;
SConnection *pDummy = NULL;
if(countList < 0){
return;
}
pDummy = SCCreateDummyConnection(pServ->pSics);
if(pDummy == NULL){
return;
}
status = LLDnodePtr2First(countList);
while(status != 0){
LLDnodeDataTo(countList,&hugo);
if(hugo.monitor < 0){
time = GetCountTime(hugo.counter,pDummy);
v = MakeHdbFloat((double)time);
UpdateHipadabaPar(hugo.node,v, NULL);
} else {
monitor = GetMonitor(hugo.counter, hugo.monitor, pDummy);
v = MakeHdbInt((int)monitor);
UpdateHipadabaPar(hugo.node,v, NULL);
}
status = LLDnodePtr2Next(countList);
}
SCDeleteConnection(pDummy);
}
/*---------------------------------------------------------------------------*/
static int CounterCallback(int iEvent, void *eventData, void *userData,
commandContext cc){
if(iEvent == MONITOR || iEvent == COUNTEND || iEvent == COUNTSTART){
updateCountList();
}
return 1;
}
/*============== interpreter function ==================================*/ /*============== interpreter function ==================================*/
int SICSHdbAdapter(SConnection *pCon, SicsInterp *pSics, void *pData, int SICSHdbAdapter(SConnection *pCon, SicsInterp *pSics, void *pData,
int argc, char *argv[]){ int argc, char *argv[]){
@ -423,6 +473,8 @@ int SICSHdbAdapter(SConnection *pCon, SicsInterp *pSics, void *pData,
pIDrivable pDriv = NULL; pIDrivable pDriv = NULL;
pSicsVariable pVar = NULL; pSicsVariable pVar = NULL;
char buffer[512]; char buffer[512];
pCounter pCount = NULL;
CountEntry hugo;
root = GetHipadabaRoot(); root = GetHipadabaRoot();
assert(root != NULL); assert(root != NULL);
@ -505,6 +557,31 @@ int SICSHdbAdapter(SConnection *pCon, SicsInterp *pSics, void *pData,
return 1; return 1;
} }
/**
* look for counters
*/
pCount = (pCounter)FindCommandData(pSics,argv[2],"SingleCounter");
if(pCount != NULL){
hugo.monitor = atoi(argv[3]);
hugo.counter = pCount;
hugo.node = path;
if(countList < 0){
countList = LLDcreate(sizeof(CountEntry));
RegisterCallback(pCount->pCall, SCGetContext(pCon),
COUNTSTART, CounterCallback,
NULL, NULL);
RegisterCallback(pCount->pCall, SCGetContext(pCon),
COUNTEND, CounterCallback,
NULL, NULL);
RegisterCallback(pCount->pCall, SCGetContext(pCon),
MONITOR, CounterCallback,
NULL, NULL);
}
LLDnodeAppendFrom(countList,&hugo);
SCSendOK(pCon);
return 1;
}
snprintf(buffer,511, snprintf(buffer,511,
"ERROR: attaching this type of object: %s at %s not implemented", "ERROR: attaching this type of object: %s at %s not implemented",
argv[2], argv[1]); argv[2], argv[1]);

View File

@ -229,7 +229,7 @@ static int SICSNotifyCallback(void *userData, void *callData, pHdb node,
pDynString result = NULL; pDynString result = NULL;
char *pPath = NULL; char *pPath = NULL;
Protocol protocol = normal_protocol; Protocol protocol = normal_protocol;
int outCode; int outCode, macro;
cbInfo = (HdbCBInfo *)userData; cbInfo = (HdbCBInfo *)userData;
pPath = GetHipadabaPath(node); pPath = GetHipadabaPath(node);
@ -239,11 +239,17 @@ static int SICSNotifyCallback(void *userData, void *callData, pHdb node,
else else
outCode = eEvent; outCode = eEvent;
/*
* we want our notifications to come even when called from a macro
*/
macro = SCinMacro(cbInfo->pCon);
SCsetMacro(cbInfo->pCon,0);
if(v.arrayLength < 100){ if(v.arrayLength < 100){
printedData = formatValue(v); printedData = formatValue(v);
if(pPath == NULL || printedData == NULL || result == NULL){ if(pPath == NULL || printedData == NULL || result == NULL){
SCWriteInContext(cbInfo->pCon,"ERROR: out of memory formatting data" , SCWriteInContext(cbInfo->pCon,"ERROR: out of memory formatting data" ,
eEvent,cbInfo->context); eEvent,cbInfo->context);
SCsetMacro(cbInfo->pCon,macro);
/* /*
* no need to interrupt something because writing data to a client does * no need to interrupt something because writing data to a client does
* not work * not work
@ -259,6 +265,7 @@ static int SICSNotifyCallback(void *userData, void *callData, pHdb node,
SCWriteInContext(cbInfo->pCon,GetCharArray(result), SCWriteInContext(cbInfo->pCon,GetCharArray(result),
outCode,cbInfo->context); outCode,cbInfo->context);
} }
SCsetMacro(cbInfo->pCon,macro);
free(pPath); free(pPath);
DeleteDynString(result); DeleteDynString(result);

View File

@ -49,6 +49,7 @@
#include "status.h" #include "status.h"
#include "interrupt.h" #include "interrupt.h"
#include "devexec.h" #include "devexec.h"
#include "sicshipadaba.h"
#undef VALUECHANGE #undef VALUECHANGE
#define VALUECHANGE 2 #define VALUECHANGE 2
@ -197,13 +198,30 @@
SCPopContext(pCon); SCPopContext(pCon);
return 1; return 1;
} }
/*------------------- The CallBack function for interest ------------------*/
static int StatusHDBCallback(int iEvent, void *pEvent, void *pUser,
commandContext cc)
{
pHdb node = NULL;
char pBueffel[80];
hdbValue v;
assert(pUser);
node = (pHdb)pUser;
v = MakeHdbText(pText[eCode]);
if(node != NULL && iEvent == VALUECHANGE){
UpdateHipadabaPar(node,v,NULL);
}
return 1;
}
/*-----------------------------------------------------------------------*/ /*-----------------------------------------------------------------------*/
int UserStatus(SConnection *pCon, SicsInterp *pSics, void *pData, int UserStatus(SConnection *pCon, SicsInterp *pSics, void *pData,
int argc, char *argv[]) int argc, char *argv[])
{ {
char pBueffel[512]; char pBueffel[512];
long lID; long lID;
pHdb node = NULL;
assert(pSics); assert(pSics);
assert(pCon); assert(pCon);
@ -227,6 +245,27 @@
SCSendOK(pCon); SCSendOK(pCon);
return 1; return 1;
} }
else if(strcmp(argv[1],"hdbinterest") == 0)
{
if(argc > 2){
node = GetHipadabaNode(GetHipadabaRoot(),argv[2]);
if(node != NULL){
lID = RegisterCallback(pCall, SCGetContext(pCon),
VALUECHANGE, StatusHDBCallback,
node, NULL);
/* SCRegister(pCon,pSics, pCall,lID); */
SCSendOK(pCon);
return 1;
} else {
SCWrite(pCon,"ERROR: Hipadaba node not found",eError);
return 0;
}
}
} else {
SCWrite(pCon,"ERROR: require node parameter to register status callback",
eError);
return 0;
}
} }
/* else just print value */ /* else just print value */

10396
val.lis

File diff suppressed because it is too large Load Diff