Files
sics/site_ansto/hardsup/sct_asyncqueue.c
Douglas Clowes 29583c51c0 Add a NULL aqadapter for use by script context pseudo devices
This allows the creation of a script context device without the need for
a physical device to exist. This pseudo device could derive its values
from other things and apply control to other things.

Writes to this device are not expected and are logged. Reads from this
device are not expected and are logged and return an "ASCERR: " response.
2014-04-23 16:51:05 +10:00

355 lines
8.3 KiB
C

/*
* SCAQA: Script Context to Async Queue Adapter
*
* The purpose of this module is to provide the interface glue to allow Async
* Queue communication devices to be used in Script Context drivers.
*
* The approach taken is to provide an Ascon device interface and state machine
* on top of an AsyncUnit device.
*/
#include <ascon.h>
#include <ascon.i>
#include <dynstring.h>
#include <asyncqueue.h>
/*
 * Per-adapter transaction status.
 *
 * transWait takes the following values in this file:
 *    1  a transaction has been sent and no result has arrived yet
 *    0  the transaction callback ran without a timeout status
 *   -1  the transaction timed out, or a unit notification arrived while
 *       a transaction was still pending
 */
struct txn_s {
  int transWait;
};
typedef struct txn_s TXN;
typedef struct txn_s *pTXN;
/*
 * Private data hung off Ascon::private by scaqaAsconInit and released by
 * SCAQ_KillPrivate.
 */
struct scaq_private_s {
  pAsyncUnit unit;   /* async queue unit; NULL selects the NULL handler */
  char *queue_name;  /* heap copy of the queue name given at init time */
  TXN txn;           /* status of the current transaction */
};
typedef struct scaq_private_s Private;
typedef struct scaq_private_s *pPrivate;
/**
 * \brief map the current Ascon state to a printable name
 *
 * \param a the Ascon object whose state field is inspected
 * \return a string literal naming a->state, or "Unknown Ascon State" when
 *         the value matches none of the listed states; the result must not
 *         be modified or freed
 */
static char *state_name(Ascon *a)
{
  char *label = "Unknown Ascon State";

  switch (a->state) {
  case AsconNotConnected: label = "AsconNotConnected"; break;
  case AsconConnectStart: label = "AsconConnectStart"; break;
  case AsconConnecting:   label = "AsconConnecting";   break;
  case AsconConnectDone:  label = "AsconConnectDone";  break;
  case AsconWriteStart:   label = "AsconWriteStart";   break;
  case AsconWriting:      label = "AsconWriting";      break;
  case AsconWriteDone:    label = "AsconWriteDone";    break;
  case AsconReadStart:    label = "AsconReadStart";    break;
  case AsconReading:      label = "AsconReading";      break;
  case AsconReadDone:     label = "AsconReadDone";     break;
  case AsconIdle:         label = "AsconIdle";         break;
  case AsconFailed:       label = "AsconFailed";       break;
  case AsconTimeout:      label = "AsconTimeout";      break;
  case AsconMaxState:     label = "AsconMaxState";     break;
  default:
    break;
  }
  return label;
}
/**
* \brief handle notification callbacks from the unit
*/
static void SCAQ_Notify(void *context, int event)
{
Ascon *a = (Ascon*) context;
pPrivate pp = (pPrivate) a->private;
pTXN txn = &pp->txn;
char line[132];
if (1 == txn->transWait)
txn->transWait = -1; /* TIMEOUT */
switch (event) {
case AQU_DISCONNECT:
snprintf(line, 132, "Disconnect on aqadapter to '%s'", pp->queue_name);
SICSLogWrite(line, eStatus);
break;
case AQU_RECONNECT:
snprintf(line, 132, "Reconnect on aqadapter to '%s'", pp->queue_name);
SICSLogWrite(line, eStatus);
break;
}
}
/**
 * \brief TransCallback is the callback for the general command transaction.
 *
 * Appends any received bytes to the read buffer of the Ascon object carried
 * in the transaction context, then records the outcome in transWait:
 * -1 when the transaction status is ATX_TIMEOUT, 0 otherwise.
 *
 * \param pCmd the transaction this callback is invoked for
 * \return always 0
 */
static int TransCallback(pAsyncTxn pCmd)
{
  Ascon *ascon = (Ascon *) pCmd->cntx;
  pPrivate priv = (pPrivate) ascon->private;
  char *reply = pCmd->inp_buf;
  int reply_bytes = pCmd->inp_idx;

  if (reply_bytes > 0) {
    DynStringConcatBytes(ascon->rdBuffer, reply, reply_bytes);
  }
  priv->txn.transWait = (pCmd->txn_status == ATX_TIMEOUT) ? -1 : 0;
  return 0;
}
/**
 * \brief send the contents of the write buffer as a transaction on the unit
 *
 * Marks the transaction as pending (transWait = 1), empties the read buffer
 * and hands the command to the async unit with TransCallback as the
 * response handler and the Ascon object as its context.
 *
 * \param a the Ascon object; its private data and unit must be non-NULL
 */
static void SCAQTransact(Ascon *a)
{
  const char *cmd_text = GetCharArray(a->wrBuffer);
  int cmd_bytes = GetDynStringLength(a->wrBuffer);
  pPrivate priv = (pPrivate) a->private;
  AsyncUnit *aq_unit = NULL;

  assert(priv);
  aq_unit = priv->unit;
  assert(aq_unit);

  priv->txn.transWait = 1;
  DynStringClear(a->rdBuffer);
  /* NOTE(review): 1024 is presumably the expected response size - confirm
   * against AsyncUnitSendTxn; its result is not checked here. */
  AsyncUnitSendTxn(aq_unit, cmd_text, cmd_bytes, TransCallback, a, 1024);
}
/**
 * \brief state machine for an adapter that has no async unit behind it
 *
 * Connection states advance unconditionally. A write is logged as a warning
 * together with a hex dump of the write buffer and is then treated as done.
 * A read is logged as a warning and answered with an "ASCERR: " message in
 * the read buffer.
 *
 * \param a the Ascon object to advance by one step
 * \return always 0
 */
static int scaqaNullHandler(Ascon *a)
{
  const char *cmd_text;
  int cmd_bytes;
  struct timeval now;

  switch (a->state) {
  case AsconConnectStart:
    a->state = AsconConnecting;
    break;

  case AsconConnecting:
    a->state = AsconConnectDone;
    break;

  case AsconConnectDone:
  case AsconTimeout:
    /* should not get here */
    a->state = AsconIdle;
    break;

  case AsconWriteStart:
    a->state = AsconWriting;
    break;

  case AsconWriting:
    /* Log this as a Warning */
    cmd_text = GetCharArray(a->wrBuffer);
    cmd_bytes = GetDynStringLength(a->wrBuffer);
    gettimeofday(&now, NULL);
    SICSLogWriteTime("WARNING: writing to NULL aqadapter", eWarning, &now);
    SICSLogWriteHexTime(cmd_text, cmd_bytes, eWarning, &now);
    a->state = AsconWriteDone;
    break;

  case AsconWriteDone:
    /* should not get here */
    a->state = AsconReadStart;
    break;

  case AsconReadStart:
    /* Log this as a Warning and Return an ASCERR message */
    SICSLogWrite("ASCERR: Reading from NULL aqadapter", eWarning);
    DynStringCopy(a->rdBuffer, "ASCERR: Reading from NULL aqadapter");
    a->state = AsconReadDone;
    break;

  case AsconFailed:
    a->state = AsconConnectStart;
    break;

  case AsconNotConnected:
  case AsconReadDone:
  case AsconIdle:
  case AsconMaxState:
  default:
    /* nothing to do in these states */
    break;
  }
  return 0;
}
static int scaqaProtHandler(Ascon *a)
{
pPrivate pp = (pPrivate) a->private;
if (NULL == pp->unit)
return scaqaNullHandler(a);
switch (a->state) {
case AsconNotConnected:
return 0;
case AsconConnectStart:
a->state = AsconConnecting;
return 0;
case AsconConnecting:
if (1 == AsyncUnitIsQueueConnected(pp->unit)) {
a->state = AsconConnectDone;
}
return 0;
case AsconConnectDone:
/* should not get here */
a->state = AsconIdle;
return 0;
case AsconWriteStart:
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
AsconError(a, "Disconnected", 0);
a->state = AsconFailed;
return 0;
}
a->state = AsconWriting;
return 0;
case AsconWriting:
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
AsconError(a, "Disconnected", 0);
a->state = AsconFailed;
return 0;
}
SCAQTransact(a);
a->state = AsconWriteDone;
return 0;
case AsconWriteDone:
/* should not get here */
a->state = AsconReadStart;
return 0;
case AsconReadStart:
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
AsconError(a, "Disconnected", 0);
a->state = AsconFailed;
return 0;
}
a->state = AsconReading;
return 0;
case AsconReading:
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
AsconError(a, "Disconnected", 0);
a->state = AsconFailed;
return 0;
}
if (pp->txn.transWait < 0) {
if (GetDynStringLength(a->rdBuffer) == 0) {
a->state = AsconTimeout;
} else {
a->state = AsconReadDone;
}
} else if (pp->txn.transWait == 0) {
a->state = AsconReadDone;
}
return 0;
case AsconReadDone:
/* should not get here */
return 0;
case AsconIdle:
return 0;
case AsconFailed:
if (1 == AsyncUnitIsQueueConnected(pp->unit)) {
a->state = AsconConnectStart;
return 0;
}
return 0;
case AsconTimeout:
/* should not get here */
a->state = AsconIdle;
return 0;
case AsconMaxState:
return 0;
default:
return 0;
}
}
/*
* Kill the private storage
*
* Clean up and release all resources associated with the private object
*/
static void SCAQ_KillPrivate(void *vp)
{
pPrivate pp = (pPrivate) vp;
if (pp) {
if (pp->unit) {
AsyncUnit *asyncUnit = (AsyncUnit *) pp->unit;
AsyncUnitDestroy(asyncUnit);
}
if (pp->queue_name)
free(pp->queue_name);
free(pp);
}
}
/*
 * Initialize the Ascon object for this device.
 *
 * argv[0] is used as the adapter name in messages and argv[1] names the
 * async queue to attach to. The queue name "null" (case-insensitive)
 * creates an adapter with no unit, which is served by scaqaNullHandler.
 *
 * On success the private data, hostport and killPrivate members of the
 * Ascon object are set and, when a unit exists, SCAQ_Notify is registered
 * for its notifications.
 *
 * Returns 1 on success. Returns 0, after reporting on pCon, when there are
 * fewer than two arguments, when the queue cannot be found, or when memory
 * cannot be allocated; in the failure cases nothing is left allocated and
 * the Ascon object is not modified.
 */
static int scaqaAsconInit(Ascon *a, SConnection *pCon, int argc, char *argv[])
{
  int i;
  AsyncUnit *asyncUnit = NULL;
  pPrivate pp = NULL;
  char *hostport = NULL;

  for (i = 0; i < argc; ++i) {
    SCPrintf(pCon, eStatus, "scaqaAsconInit: arg[%d] = %s\n", i, argv[i]);
  }
  /* argv[1] is dereferenced below, so at least two arguments are needed */
  if (argc < 2) {
    SCPrintf(pCon, eError, "Insufficient arguments to scaqaAsconInit: %d\n", argc);
    return 0;
  }
  if (strcasecmp("null", argv[1]) == 0) {
    asyncUnit = NULL;
  }
  else if (!AsyncUnitCreate(argv[1], &asyncUnit)) {
    SCPrintf(pCon, eError, "Cannot find AsyncQueue '%s' when creating script context adapter '%s'",
             argv[1], argv[0]);
    return 0;
  }

  /* Allocate everything before touching the Ascon object so that a
   * failure leaves it unchanged. */
  pp = calloc(1, sizeof *pp);
  hostport = strdup(argv[1]);
  if (pp != NULL) {
    pp->queue_name = strdup(argv[1]);
  }
  if (pp == NULL || pp->queue_name == NULL || hostport == NULL) {
    SCPrintf(pCon, eError, "Out of memory when creating script context adapter '%s'\n",
             argv[0]);
    free(hostport);
    if (pp != NULL) {
      free(pp->queue_name);
      free(pp);
    }
    if (asyncUnit != NULL) {
      AsyncUnitDestroy(asyncUnit);
    }
    return 0;
  }

  pp->unit = asyncUnit;
  pp->txn.transWait = 0;
  a->private = pp;
  a->hostport = hostport;
  a->killPrivate = SCAQ_KillPrivate;
  if (asyncUnit != NULL) {
    AsyncUnitSetNotify(asyncUnit, a, SCAQ_Notify);
  }
  return 1;
}
/*
 * This procedure creates, initializes and registers the "aqadapter"
 * protocol with the Ascon infrastructure, using scaqaAsconInit as the
 * init function and scaqaProtHandler as the handler.
 *
 * If memory for the protocol record or its name cannot be allocated, a
 * message is written to stderr and the protocol is not registered.
 */
void AddSCAQAProtocol(void)
{
  AsconProtocol *prot = NULL;

  printf("AddSCAQAProtocol\n");
  prot = calloc(1, sizeof *prot);
  if (prot == NULL) {
    fprintf(stderr, "AddSCAQAProtocol: out of memory\n");
    return;
  }
  prot->name = strdup("aqadapter");
  if (prot->name == NULL) {
    fprintf(stderr, "AddSCAQAProtocol: out of memory\n");
    free(prot);
    return;
  }
  prot->init = scaqaAsconInit;
  prot->handler = scaqaProtHandler;
  AsconInsertProtocol(prot);
}