Files
sics/site_ansto/hardsup/sct_asyncqueue.c
2015-01-23 17:13:50 +11:00

364 lines
8.5 KiB
C

/*
* SCAQA: Script Context to Async Queue Adapter
*
* The purpose of this module is to provide the interface glue to allow Async
* Queue communication devices to be used in Script Context drivers.
*
* The approach taken it to provide an Ascon device interface and state machine
* on top of an AsyncUnit device.
*/
#include <ascon.h>
#include <ascon.i>
#include <dynstring.h>
#include <asyncqueue.h>
typedef struct txn_s {
int transWait;
} TXN, *pTXN;
typedef struct scaq_private_s {
pAsyncUnit unit;
char *queue_name;
TXN txn;
} Private, *pPrivate;
static char *state_name(Ascon *a)
{
switch (a->state) {
case AsconNotConnected:
return "AsconNotConnected";
break;
case AsconConnectStart:
return "AsconConnectStart";
break;
case AsconConnecting:
return "AsconConnecting";
break;
case AsconConnectDone:
return "AsconConnectDone";
break;
case AsconWriteStart:
return "AsconWriteStart";
break;
case AsconWriting:
return "AsconWriting";
break;
case AsconWriteDone:
return "AsconWriteDone";
break;
case AsconReadStart:
return "AsconReadStart";
break;
case AsconReading:
return "AsconReading";
break;
case AsconReadDone:
return "AsconReadDone";
break;
case AsconIdle:
return "AsconIdle";
break;
case AsconFailed:
return "AsconFailed";
break;
case AsconTimeout:
return "AsconTimeout";
break;
case AsconMaxState:
return "AsconMaxState";
break;
default:
return "Unknown Ascon State";
break;
}
}
/**
* \brief handle notification callbacks from the unit
*/
static void SCAQ_Notify(void *context, int event)
{
Ascon *a = (Ascon*) context;
pPrivate pp = (pPrivate) a->private;
pTXN txn = &pp->txn;
char line[132];
if (1 == txn->transWait)
txn->transWait = -1; /* TIMEOUT */
switch (event) {
case AQU_DISCONNECT:
snprintf(line, 132, "Disconnect on aqadapter to '%s'", pp->queue_name);
SICSLogWrite(line, eStatus);
break;
case AQU_RECONNECT:
snprintf(line, 132, "Reconnect on aqadapter to '%s'", pp->queue_name);
SICSLogWrite(line, eStatus);
break;
}
}
/**
* \brief TransCallback is the callback for the general command transaction.
*/
static int TransCallback(pAsyncTxn pCmd)
{
char *resp = pCmd->inp_buf;
int resp_len = pCmd->inp_idx;
Ascon *a = (Ascon *) pCmd->cntx;
pPrivate pp = (pPrivate) a->private;
pTXN self = &pp->txn;
if (resp_len > 0) {
DynStringConcatBytes(a->rdBuffer, resp, resp_len);
}
if (pCmd->txn_status == ATX_TIMEOUT) {
self->transWait = -1;
} else {
self->transWait = 0;
}
return 0;
}
static void SCAQTransact(Ascon *a)
{
pPrivate pp = NULL;
pTXN txn = NULL;
AsyncUnit *unit = NULL;
const char *command = GetCharArray(a->wrBuffer);
int cmd_len = GetDynStringLength(a->wrBuffer);
int rsp_len = 1024;
pp = (pPrivate) a->private;
assert(pp);
unit = pp->unit;
assert(unit);
txn = &pp->txn;
txn->transWait = 1;
DynStringClear(a->rdBuffer);
if (cmd_len > 3 && strncmp(&command[cmd_len - 3], "{0}", 3) == 0) {
rsp_len = 0;
cmd_len -= 3;
}
else if (cmd_len > 11 && strncasecmp(&command[cmd_len - 11], "@@NOREPLY@@", 11) == 0) {
rsp_len = 0;
cmd_len -= 11;
}
AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, a, rsp_len);
}
static int scaqaNullHandler(Ascon *a)
{
const char *command;
int cmd_len;
struct timeval tv;
pPrivate pp = (pPrivate) a->private;
switch (a->state) {
case AsconNotConnected:
return 0;
case AsconConnectStart:
a->state = AsconConnecting;
return 0;
case AsconConnecting:
a->state = AsconConnectDone;
return 0;
case AsconConnectDone:
/* should not get here */
a->state = AsconIdle;
return 0;
case AsconWriteStart:
a->state = AsconWriting;
return 0;
case AsconWriting:
/* Log this as a Warning */
command = GetCharArray(a->wrBuffer);
cmd_len = GetDynStringLength(a->wrBuffer);
gettimeofday(&tv, NULL);
SICSLogWriteTime("WARNING: writing to NULL aqadapter", eWarning, &tv);
SICSLogWriteHexTime(command, cmd_len, eWarning, &tv);
a->state = AsconWriteDone;
return 0;
case AsconWriteDone:
/* should not get here */
a->state = AsconReadStart;
return 0;
case AsconReadStart:
/* Log this as a Warning and Return an ASCERR message */
SICSLogWrite("ASCERR: Reading from NULL aqadapter", eWarning);
DynStringCopy(a->rdBuffer, "ASCERR: Reading from NULL aqadapter");
a->state = AsconReadDone;
return 0;
case AsconReadDone:
/* should not get here */
return 0;
case AsconIdle:
return 0;
case AsconFailed:
a->state = AsconConnectStart;
return 0;
case AsconTimeout:
/* should not get here */
a->state = AsconIdle;
return 0;
case AsconMaxState:
return 0;
default:
return 0;
}
}
static int scaqaProtHandler(Ascon *a)
{
pPrivate pp = (pPrivate) a->private;
if (NULL == pp->unit)
return scaqaNullHandler(a);
switch (a->state) {
case AsconNotConnected:
return 0;
case AsconConnectStart:
a->state = AsconConnecting;
return 0;
case AsconConnecting:
if (1 == AsyncUnitIsQueueConnected(pp->unit)) {
a->state = AsconConnectDone;
}
return 0;
case AsconConnectDone:
/* should not get here */
a->state = AsconIdle;
return 0;
case AsconWriteStart:
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
AsconError(a, "Disconnected", 0);
a->state = AsconFailed;
return 0;
}
a->state = AsconWriting;
return 0;
case AsconWriting:
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
AsconError(a, "Disconnected", 0);
a->state = AsconFailed;
return 0;
}
SCAQTransact(a);
a->state = AsconWriteDone;
return 0;
case AsconWriteDone:
/* should not get here */
a->state = AsconReadStart;
return 0;
case AsconReadStart:
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
AsconError(a, "Disconnected", 0);
a->state = AsconFailed;
return 0;
}
a->state = AsconReading;
return 0;
case AsconReading:
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
AsconError(a, "Disconnected", 0);
a->state = AsconFailed;
return 0;
}
if (pp->txn.transWait < 0) {
if (GetDynStringLength(a->rdBuffer) == 0) {
a->state = AsconTimeout;
} else {
a->state = AsconReadDone;
}
} else if (pp->txn.transWait == 0) {
a->state = AsconReadDone;
}
return 0;
case AsconReadDone:
/* should not get here */
return 0;
case AsconIdle:
return 0;
case AsconFailed:
if (1 == AsyncUnitIsQueueConnected(pp->unit)) {
a->state = AsconConnectStart;
return 0;
}
return 0;
case AsconTimeout:
/* should not get here */
a->state = AsconIdle;
return 0;
case AsconMaxState:
return 0;
default:
return 0;
}
}
/*
* Kill the private storage
*
* Clean up and release all resources associated with the private object
*/
static void SCAQ_KillPrivate(void *vp)
{
pPrivate pp = (pPrivate) vp;
if (pp) {
if (pp->unit) {
AsyncUnit *asyncUnit = (AsyncUnit *) pp->unit;
AsyncUnitDestroy(asyncUnit);
}
if (pp->queue_name)
free(pp->queue_name);
free(pp);
}
}
/*
* Initialize the Ascon object for this device, the async queue argument.
*/
static int scaqaAsconInit(Ascon *a, SConnection *pCon, int argc, char *argv[])
{
int i;
AsyncUnit *asyncUnit;
pPrivate pp;
for (i = 0; i < argc; ++i) {
SCPrintf(pCon, eStatus, "scaqaAsconInit: arg[%d] = %s\n", i, argv[i]);
}
if (argc < 2) {
SCPrintf(pCon, eError, "Insufficient arguments to scaqaAsconInit: %d\n", argc);
return 0;
}
if (strcasecmp("null", argv[1]) == 0) {
asyncUnit = NULL;
}
else if (!AsyncUnitCreate(argv[1], &asyncUnit)) {
SCPrintf(pCon, eError, "Cannot find AsyncQueue '%s' when creating script context adapter '%s'",
argv[1], argv[0]);
return 0;
}
pp = (pPrivate) calloc(sizeof(Private), 1);
pp->unit = asyncUnit;
pp->queue_name = strdup(argv[1]);
pp->txn.transWait = 0;
a->private = pp;
a->hostport = strdup(argv[1]);
a->killPrivate = SCAQ_KillPrivate;
if (asyncUnit)
AsyncUnitSetNotify(asyncUnit, a, SCAQ_Notify);
return 1;
}
/*
* This procedure creates, initializes and registers the "scaqa" protocol
* with the Ascon infrastructure
*/
void AddSCAQAProtocol(void)
{
AsconProtocol *prot = NULL;
printf("AddSCAQAProtocol\n");
prot = calloc(sizeof(AsconProtocol), 1);
prot->name = strdup("aqadapter");
prot->init = scaqaAsconInit;
prot->handler = scaqaProtHandler;
AsconInsertProtocol(prot);
}