New file for scriptcontext-asyncqueue adapter
This commit is contained in:
283
site_ansto/hardsup/sct_asyncqueue.c
Normal file
283
site_ansto/hardsup/sct_asyncqueue.c
Normal file
@@ -0,0 +1,283 @@
|
||||
/*
|
||||
* SCAQA: Script Context to Async Queue Adapter
|
||||
*
|
||||
* The purpose of this module is to provide the interface glue to allow Async
|
||||
* Queue communication devices to be used in Script Context drivers.
|
||||
*
|
||||
* The approach taken it to provide an Ascon device interface and state machine
|
||||
* on top of an AsyncUnit device.
|
||||
*/
|
||||
#include <ascon.h>
|
||||
#include <ascon.i>
|
||||
#include <dynstring.h>
|
||||
#include <asyncqueue.h>
|
||||
|
||||
typedef struct txn_s {
|
||||
int transWait;
|
||||
} TXN, *pTXN;
|
||||
|
||||
typedef struct scaq_private_s {
|
||||
pAsyncUnit unit;
|
||||
char *queue_name;
|
||||
TXN txn;
|
||||
} Private, *pPrivate;
|
||||
|
||||
static char *state_name(Ascon *a)
|
||||
{
|
||||
switch (a->state) {
|
||||
case AsconNotConnected:
|
||||
return "AsconNotConnected";
|
||||
break;
|
||||
case AsconConnectStart:
|
||||
return "AsconConnectStart";
|
||||
break;
|
||||
case AsconConnecting:
|
||||
return "AsconConnecting";
|
||||
break;
|
||||
case AsconConnectDone:
|
||||
return "AsconConnectDone";
|
||||
break;
|
||||
case AsconWriteStart:
|
||||
return "AsconWriteStart";
|
||||
break;
|
||||
case AsconWriting:
|
||||
return "AsconWriting";
|
||||
break;
|
||||
case AsconWriteDone:
|
||||
return "AsconWriteDone";
|
||||
break;
|
||||
case AsconReadStart:
|
||||
return "AsconReadStart";
|
||||
break;
|
||||
case AsconReading:
|
||||
return "AsconReading";
|
||||
break;
|
||||
case AsconReadDone:
|
||||
return "AsconReadDone";
|
||||
break;
|
||||
case AsconIdle:
|
||||
return "AsconIdle";
|
||||
break;
|
||||
case AsconFailed:
|
||||
return "AsconFailed";
|
||||
break;
|
||||
case AsconTimeout:
|
||||
return "AsconTimeout";
|
||||
break;
|
||||
case AsconMaxState:
|
||||
return "AsconMaxState";
|
||||
break;
|
||||
default:
|
||||
return "Unknown Ascon State";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief handle notification callbacks from the unit
|
||||
*/
|
||||
static void SCAQ_Notify(void *context, int event)
|
||||
{
|
||||
Ascon *a = (Ascon*) context;
|
||||
pPrivate pp = (pPrivate) a->private;
|
||||
pTXN txn = &pp->txn;
|
||||
char line[132];
|
||||
|
||||
if (1 == txn->transWait)
|
||||
txn->transWait = -1; /* TIMEOUT */
|
||||
switch (event) {
|
||||
case AQU_DISCONNECT:
|
||||
snprintf(line, 132, "Disconnect on aqadapter to '%s'", pp->queue_name);
|
||||
SICSLogWrite(line, eStatus);
|
||||
break;
|
||||
case AQU_RECONNECT:
|
||||
snprintf(line, 132, "Reconnect on aqadapter to '%s'", pp->queue_name);
|
||||
SICSLogWrite(line, eStatus);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief TransCallback is the callback for the general command transaction.
|
||||
*/
|
||||
static int TransCallback(pAsyncTxn pCmd)
|
||||
{
|
||||
char *resp = pCmd->inp_buf;
|
||||
int resp_len = pCmd->inp_idx;
|
||||
Ascon *a = (Ascon *) pCmd->cntx;
|
||||
pPrivate pp = (pPrivate) a->private;
|
||||
pTXN self = &pp->txn;
|
||||
|
||||
if (resp_len > 0) {
|
||||
DynStringConcatBytes(a->rdBuffer, resp, resp_len);
|
||||
}
|
||||
if (pCmd->txn_status == ATX_TIMEOUT) {
|
||||
self->transWait = -1;
|
||||
} else {
|
||||
self->transWait = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void SCAQTransact(Ascon *a)
|
||||
{
|
||||
pPrivate pp = NULL;
|
||||
pTXN txn = NULL;
|
||||
AsyncUnit *unit = NULL;
|
||||
const char *command = GetCharArray(a->wrBuffer);
|
||||
int cmd_len = GetDynStringLength(a->wrBuffer);
|
||||
pp = (pPrivate) a->private;
|
||||
assert(pp);
|
||||
unit = pp->unit;
|
||||
assert(unit);
|
||||
txn = &pp->txn;
|
||||
txn->transWait = 1;
|
||||
DynStringClear(a->rdBuffer);
|
||||
AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, a, 1024);
|
||||
}
|
||||
|
||||
static int scaqaProtHandler(Ascon *a)
|
||||
{
|
||||
pPrivate pp = (pPrivate) a->private;
|
||||
switch (a->state) {
|
||||
case AsconNotConnected:
|
||||
return 0;
|
||||
case AsconConnectStart:
|
||||
a->state = AsconConnecting;
|
||||
return 0;
|
||||
case AsconConnecting:
|
||||
if (1 == AsyncUnitIsQueueConnected(pp->unit)) {
|
||||
a->state = AsconConnectDone;
|
||||
}
|
||||
return 0;
|
||||
case AsconConnectDone:
|
||||
/* should not get here */
|
||||
a->state = AsconIdle;
|
||||
return 0;
|
||||
case AsconWriteStart:
|
||||
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
|
||||
AsconError(a, "Disconnected", 0);
|
||||
a->state = AsconFailed;
|
||||
return 0;
|
||||
}
|
||||
a->state = AsconWriting;
|
||||
return 0;
|
||||
case AsconWriting:
|
||||
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
|
||||
AsconError(a, "Disconnected", 0);
|
||||
a->state = AsconFailed;
|
||||
return 0;
|
||||
}
|
||||
SCAQTransact(a);
|
||||
a->state = AsconWriteDone;
|
||||
return 0;
|
||||
case AsconWriteDone:
|
||||
/* should not get here */
|
||||
a->state = AsconReadStart;
|
||||
return 0;
|
||||
case AsconReadStart:
|
||||
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
|
||||
AsconError(a, "Disconnected", 0);
|
||||
a->state = AsconFailed;
|
||||
return 0;
|
||||
}
|
||||
a->state = AsconReading;
|
||||
return 0;
|
||||
case AsconReading:
|
||||
if (0 == AsyncUnitIsQueueConnected(pp->unit)) {
|
||||
AsconError(a, "Disconnected", 0);
|
||||
a->state = AsconFailed;
|
||||
return 0;
|
||||
}
|
||||
if (pp->txn.transWait < 0) {
|
||||
a->state = AsconTimeout;
|
||||
} else if (pp->txn.transWait == 0) {
|
||||
a->state = AsconReadDone;
|
||||
}
|
||||
return 0;
|
||||
case AsconReadDone:
|
||||
/* should not get here */
|
||||
return 0;
|
||||
case AsconIdle:
|
||||
return 0;
|
||||
case AsconFailed:
|
||||
if (1 == AsyncUnitIsQueueConnected(pp->unit)) {
|
||||
a->state = AsconConnectStart;
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
case AsconTimeout:
|
||||
/* should not get here */
|
||||
a->state = AsconIdle;
|
||||
return 0;
|
||||
case AsconMaxState:
|
||||
return 0;
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Kill the private storage
|
||||
*
|
||||
* Clean up and release all resources associated with the private object
|
||||
*/
|
||||
static void SCAQ_KillPrivate(void *vp)
|
||||
{
|
||||
pPrivate pp = (pPrivate) vp;
|
||||
if (pp) {
|
||||
if (pp->unit) {
|
||||
AsyncUnit *asyncUnit = (AsyncUnit *) pp->unit;
|
||||
AsyncUnitDestroy(asyncUnit);
|
||||
}
|
||||
if (pp->queue_name)
|
||||
free(pp->queue_name);
|
||||
free(pp);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize the Ascon object for this device, the async queue argument.
|
||||
*/
|
||||
static int scaqaAsconInit(Ascon *a, SConnection *pCon, int argc, char *argv[])
|
||||
{
|
||||
int i;
|
||||
AsyncUnit *asyncUnit;
|
||||
pPrivate pp;
|
||||
for (i = 0; i < argc; ++i) {
|
||||
SCPrintf(pCon, eStatus, "scaqaAsconInit: arg[%d] = %s\n", i, argv[i]);
|
||||
}
|
||||
if (argc < 1) {
|
||||
SCPrintf(pCon, eError, "Insufficient arguments to scaqaAsconInit: %d\n", argc);
|
||||
return 0;
|
||||
}
|
||||
if (!AsyncUnitCreate(argv[1], &asyncUnit)) {
|
||||
SCPrintf(pCon, eError, "Cannot find AsyncQueue '%s' when creating script context adapter '%s'",
|
||||
argv[1], argv[0]);
|
||||
return 0;
|
||||
}
|
||||
pp = (pPrivate) calloc(sizeof(Private), 1);
|
||||
pp->unit = asyncUnit;
|
||||
pp->queue_name = strdup(argv[1]);
|
||||
pp->txn.transWait = 0;
|
||||
a->private = pp;
|
||||
a->hostport = strdup(argv[1]);
|
||||
a->killPrivate = SCAQ_KillPrivate;
|
||||
AsyncUnitSetNotify(asyncUnit, a, SCAQ_Notify);
|
||||
return 1;
|
||||
}
|
||||
/*
|
||||
* This procedure creates, initializes and registers the "scaqa" protocol
|
||||
* with the Ascon infrastructure
|
||||
*/
|
||||
void AddSCAQAProtocol(void)
|
||||
{
|
||||
AsconProtocol *prot = NULL;
|
||||
printf("AddSCAQAProtocol\n");
|
||||
prot = calloc(sizeof(AsconProtocol), 1);
|
||||
prot->name = strdup("aqadapter");
|
||||
prot->init = scaqaAsconInit;
|
||||
prot->handler = scaqaProtHandler;
|
||||
AsconInsertProtocol(prot);
|
||||
}
|
||||
Reference in New Issue
Block a user