288 lines
6.6 KiB
C
288 lines
6.6 KiB
C
/*
 * SCAQA: Script Context to Async Queue Adapter
 *
 * The purpose of this module is to provide the interface glue to allow Async
 * Queue communication devices to be used in Script Context drivers.
 *
 * The approach taken is to provide an Ascon device interface and state machine
 * on top of an AsyncUnit device.
 */
|
|
#include <ascon.h>
|
|
#include <ascon.i>
|
|
#include <dynstring.h>
|
|
#include <asyncqueue.h>
|
|
|
|
/*
 * Bookkeeping for the single transaction an adapter may have in flight.
 *
 * transWait takes the values used throughout this file:
 *    1  a transaction has been sent and no reply has arrived yet
 *    0  the transaction completed (also the initial value)
 *   -1  the transaction timed out
 */
typedef struct txn_s {
  int transWait;
} TXN, *pTXN;
|
|
|
|
typedef struct scaq_private_s {
|
|
pAsyncUnit unit;
|
|
char *queue_name;
|
|
TXN txn;
|
|
} Private, *pPrivate;
|
|
|
|
/*
 * Translate the current state of an Ascon object into its symbolic name.
 *
 * Returns a pointer to a string literal, which the caller must neither
 * modify nor free.  Any state without its own case is reported as
 * "Unknown Ascon State".
 */
static char *state_name(Ascon *a)
{
  char *name = "Unknown Ascon State";

  switch (a->state) {
  case AsconNotConnected:
    name = "AsconNotConnected";
    break;
  case AsconConnectStart:
    name = "AsconConnectStart";
    break;
  case AsconConnecting:
    name = "AsconConnecting";
    break;
  case AsconConnectDone:
    name = "AsconConnectDone";
    break;
  case AsconWriteStart:
    name = "AsconWriteStart";
    break;
  case AsconWriting:
    name = "AsconWriting";
    break;
  case AsconWriteDone:
    name = "AsconWriteDone";
    break;
  case AsconReadStart:
    name = "AsconReadStart";
    break;
  case AsconReading:
    name = "AsconReading";
    break;
  case AsconReadDone:
    name = "AsconReadDone";
    break;
  case AsconIdle:
    name = "AsconIdle";
    break;
  case AsconFailed:
    name = "AsconFailed";
    break;
  case AsconTimeout:
    name = "AsconTimeout";
    break;
  case AsconMaxState:
    name = "AsconMaxState";
    break;
  default:
    /* keep the "unknown" fallback assigned above */
    break;
  }
  return name;
}
|
|
|
|
/**
|
|
* \brief handle notification callbacks from the unit
|
|
*/
|
|
static void SCAQ_Notify(void *context, int event)
|
|
{
|
|
Ascon *a = (Ascon*) context;
|
|
pPrivate pp = (pPrivate) a->private;
|
|
pTXN txn = &pp->txn;
|
|
char line[132];
|
|
|
|
if (1 == txn->transWait)
|
|
txn->transWait = -1; /* TIMEOUT */
|
|
switch (event) {
|
|
case AQU_DISCONNECT:
|
|
snprintf(line, 132, "Disconnect on aqadapter to '%s'", pp->queue_name);
|
|
SICSLogWrite(line, eStatus);
|
|
break;
|
|
case AQU_RECONNECT:
|
|
snprintf(line, 132, "Reconnect on aqadapter to '%s'", pp->queue_name);
|
|
SICSLogWrite(line, eStatus);
|
|
break;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* \brief TransCallback is the callback for the general command transaction.
|
|
*/
|
|
static int TransCallback(pAsyncTxn pCmd)
|
|
{
|
|
char *resp = pCmd->inp_buf;
|
|
int resp_len = pCmd->inp_idx;
|
|
Ascon *a = (Ascon *) pCmd->cntx;
|
|
pPrivate pp = (pPrivate) a->private;
|
|
pTXN self = &pp->txn;
|
|
|
|
if (resp_len > 0) {
|
|
DynStringConcatBytes(a->rdBuffer, resp, resp_len);
|
|
}
|
|
if (pCmd->txn_status == ATX_TIMEOUT) {
|
|
self->transWait = -1;
|
|
} else {
|
|
self->transWait = 0;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
 * Start a transaction on the async unit of this adapter.
 *
 * The content of a->wrBuffer is sent as the command.  Before sending, the
 * transaction is marked as outstanding (transWait = 1) and a->rdBuffer is
 * cleared; TransCallback is registered as the handler with `a` as context.
 *
 * Both the private data and its unit must exist (asserted).
 */
static void SCAQTransact(Ascon *a)
{
  const char *cmd = GetCharArray(a->wrBuffer);
  int len = GetDynStringLength(a->wrBuffer);
  pPrivate priv = (pPrivate) a->private;

  assert(priv);
  assert(priv->unit);

  priv->txn.transWait = 1;
  DynStringClear(a->rdBuffer);
  /*
   * NOTE(review): 1024 is presumably the maximum response length - confirm
   * against AsyncUnitSendTxn.  Its return value is not checked here.
   */
  AsyncUnitSendTxn(priv->unit, cmd, len, TransCallback, a, 1024);
}
|
|
|
|
static int scaqaProtHandler(Ascon *a)
|
|
{
|
|
pPrivate pp = (pPrivate) a->private;
|
|
switch (a->state) {
|
|
case AsconNotConnected:
|
|
return 0;
|
|
case AsconConnectStart:
|
|
a->state = AsconConnecting;
|
|
return 0;
|
|
case AsconConnecting:
|
|
if (1 == AsyncUnitIsQueueConnected(pp->unit)) {
|
|
a->state = AsconConnectDone;
|
|
}
|
|
return 0;
|
|
case AsconConnectDone:
|
|
/* should not get here */
|
|
a->state = AsconIdle;
|
|
return 0;
|
|
case AsconWriteStart:
|
|
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
|
|
AsconError(a, "Disconnected", 0);
|
|
a->state = AsconFailed;
|
|
return 0;
|
|
}
|
|
a->state = AsconWriting;
|
|
return 0;
|
|
case AsconWriting:
|
|
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
|
|
AsconError(a, "Disconnected", 0);
|
|
a->state = AsconFailed;
|
|
return 0;
|
|
}
|
|
SCAQTransact(a);
|
|
a->state = AsconWriteDone;
|
|
return 0;
|
|
case AsconWriteDone:
|
|
/* should not get here */
|
|
a->state = AsconReadStart;
|
|
return 0;
|
|
case AsconReadStart:
|
|
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
|
|
AsconError(a, "Disconnected", 0);
|
|
a->state = AsconFailed;
|
|
return 0;
|
|
}
|
|
a->state = AsconReading;
|
|
return 0;
|
|
case AsconReading:
|
|
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
|
|
AsconError(a, "Disconnected", 0);
|
|
a->state = AsconFailed;
|
|
return 0;
|
|
}
|
|
if (pp->txn.transWait < 0) {
|
|
if (GetDynStringLength(a->rdBuffer) == 0) {
|
|
a->state = AsconTimeout;
|
|
} else {
|
|
a->state = AsconReadDone;
|
|
}
|
|
} else if (pp->txn.transWait == 0) {
|
|
a->state = AsconReadDone;
|
|
}
|
|
return 0;
|
|
case AsconReadDone:
|
|
/* should not get here */
|
|
return 0;
|
|
case AsconIdle:
|
|
return 0;
|
|
case AsconFailed:
|
|
if (1 == AsyncUnitIsQueueConnected(pp->unit)) {
|
|
a->state = AsconConnectStart;
|
|
return 0;
|
|
}
|
|
return 0;
|
|
case AsconTimeout:
|
|
/* should not get here */
|
|
a->state = AsconIdle;
|
|
return 0;
|
|
case AsconMaxState:
|
|
return 0;
|
|
default:
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Kill the private storage
|
|
*
|
|
* Clean up and release all resources associated with the private object
|
|
*/
|
|
static void SCAQ_KillPrivate(void *vp)
|
|
{
|
|
pPrivate pp = (pPrivate) vp;
|
|
if (pp) {
|
|
if (pp->unit) {
|
|
AsyncUnit *asyncUnit = (AsyncUnit *) pp->unit;
|
|
AsyncUnitDestroy(asyncUnit);
|
|
}
|
|
if (pp->queue_name)
|
|
free(pp->queue_name);
|
|
free(pp);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Initialize the Ascon object for this device, the async queue argument.
|
|
*/
|
|
static int scaqaAsconInit(Ascon *a, SConnection *pCon, int argc, char *argv[])
|
|
{
|
|
int i;
|
|
AsyncUnit *asyncUnit;
|
|
pPrivate pp;
|
|
for (i = 0; i < argc; ++i) {
|
|
SCPrintf(pCon, eStatus, "scaqaAsconInit: arg[%d] = %s\n", i, argv[i]);
|
|
}
|
|
if (argc < 1) {
|
|
SCPrintf(pCon, eError, "Insufficient arguments to scaqaAsconInit: %d\n", argc);
|
|
return 0;
|
|
}
|
|
if (!AsyncUnitCreate(argv[1], &asyncUnit)) {
|
|
SCPrintf(pCon, eError, "Cannot find AsyncQueue '%s' when creating script context adapter '%s'",
|
|
argv[1], argv[0]);
|
|
return 0;
|
|
}
|
|
pp = (pPrivate) calloc(sizeof(Private), 1);
|
|
pp->unit = asyncUnit;
|
|
pp->queue_name = strdup(argv[1]);
|
|
pp->txn.transWait = 0;
|
|
a->private = pp;
|
|
a->hostport = strdup(argv[1]);
|
|
a->killPrivate = SCAQ_KillPrivate;
|
|
AsyncUnitSetNotify(asyncUnit, a, SCAQ_Notify);
|
|
return 1;
|
|
}
|
|
/*
|
|
* This procedure creates, initializes and registers the "scaqa" protocol
|
|
* with the Ascon infrastructure
|
|
*/
|
|
void AddSCAQAProtocol(void)
|
|
{
|
|
AsconProtocol *prot = NULL;
|
|
printf("AddSCAQAProtocol\n");
|
|
prot = calloc(sizeof(AsconProtocol), 1);
|
|
prot->name = strdup("aqadapter");
|
|
prot->init = scaqaAsconInit;
|
|
prot->handler = scaqaProtHandler;
|
|
AsconInsertProtocol(prot);
|
|
}
|