diff --git a/site_ansto/hardsup/sct_asyncqueue.c b/site_ansto/hardsup/sct_asyncqueue.c new file mode 100644 index 00000000..1dcfae9f --- /dev/null +++ b/site_ansto/hardsup/sct_asyncqueue.c @@ -0,0 +1,283 @@ +/* + * SCAQA: Script Context to Async Queue Adapter + * + * The purpose of this module is to provide the interface glue to allow Async + * Queue communication devices to be used in Script Context drivers. + * + * The approach taken it to provide an Ascon device interface and state machine + * on top of an AsyncUnit device. + */ +#include +#include +#include +#include + +typedef struct txn_s { + int transWait; +} TXN, *pTXN; + +typedef struct scaq_private_s { + pAsyncUnit unit; + char *queue_name; + TXN txn; +} Private, *pPrivate; + +static char *state_name(Ascon *a) +{ + switch (a->state) { + case AsconNotConnected: + return "AsconNotConnected"; + break; + case AsconConnectStart: + return "AsconConnectStart"; + break; + case AsconConnecting: + return "AsconConnecting"; + break; + case AsconConnectDone: + return "AsconConnectDone"; + break; + case AsconWriteStart: + return "AsconWriteStart"; + break; + case AsconWriting: + return "AsconWriting"; + break; + case AsconWriteDone: + return "AsconWriteDone"; + break; + case AsconReadStart: + return "AsconReadStart"; + break; + case AsconReading: + return "AsconReading"; + break; + case AsconReadDone: + return "AsconReadDone"; + break; + case AsconIdle: + return "AsconIdle"; + break; + case AsconFailed: + return "AsconFailed"; + break; + case AsconTimeout: + return "AsconTimeout"; + break; + case AsconMaxState: + return "AsconMaxState"; + break; + default: + return "Unknown Ascon State"; + break; + } +} + +/** + * \brief handle notification callbacks from the unit + */ +static void SCAQ_Notify(void *context, int event) +{ + Ascon *a = (Ascon*) context; + pPrivate pp = (pPrivate) a->private; + pTXN txn = &pp->txn; + char line[132]; + + if (1 == txn->transWait) + txn->transWait = -1; /* TIMEOUT */ + switch (event) { + case AQU_DISCONNECT: + snprintf(line, 132, "Disconnect on aqadapter to '%s'", pp->queue_name); + SICSLogWrite(line, eStatus); + break; + case AQU_RECONNECT: + snprintf(line, 132, "Reconnect on aqadapter to '%s'", pp->queue_name); + SICSLogWrite(line, eStatus); + break; + } +} + +/** + * \brief TransCallback is the callback for the general command transaction. + */ +static int TransCallback(pAsyncTxn pCmd) +{ + char *resp = pCmd->inp_buf; + int resp_len = pCmd->inp_idx; + Ascon *a = (Ascon *) pCmd->cntx; + pPrivate pp = (pPrivate) a->private; + pTXN self = &pp->txn; + + if (resp_len > 0) { + DynStringConcatBytes(a->rdBuffer, resp, resp_len); + } + if (pCmd->txn_status == ATX_TIMEOUT) { + self->transWait = -1; + } else { + self->transWait = 0; + } + + return 0; +} + +static void SCAQTransact(Ascon *a) +{ + pPrivate pp = NULL; + pTXN txn = NULL; + AsyncUnit *unit = NULL; + const char *command = GetCharArray(a->wrBuffer); + int cmd_len = GetDynStringLength(a->wrBuffer); + pp = (pPrivate) a->private; + assert(pp); + unit = pp->unit; + assert(unit); + txn = &pp->txn; + txn->transWait = 1; + DynStringClear(a->rdBuffer); + AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, a, 1024); +} + +static int scaqaProtHandler(Ascon *a) +{ + pPrivate pp = (pPrivate) a->private; + switch (a->state) { + case AsconNotConnected: + return 0; + case AsconConnectStart: + a->state = AsconConnecting; + return 0; + case AsconConnecting: + if (1 == AsyncUnitIsQueueConnected(pp->unit)) { + a->state = AsconConnectDone; + } + return 0; + case AsconConnectDone: + /* should not get here */ + a->state = AsconIdle; + return 0; + case AsconWriteStart: + if (0 == AsyncUnitIsQueueConnected(pp->unit)) { + AsconError(a, "Disconnected", 0); + a->state = AsconFailed; + return 0; + } + a->state = AsconWriting; + return 0; + case AsconWriting: + if (0 == AsyncUnitIsQueueConnected(pp->unit)) { + AsconError(a, "Disconnected", 0); + a->state = AsconFailed; + return 0; + } + SCAQTransact(a); + a->state = AsconWriteDone; + return 0; + case AsconWriteDone: + /* should not get here */ + a->state = AsconReadStart; + return 0; + case AsconReadStart: + if (0 == AsyncUnitIsQueueConnected(pp->unit)) { + AsconError(a, "Disconnected", 0); + a->state = AsconFailed; + return 0; + } + a->state = AsconReading; + return 0; + case AsconReading: + if (0 == AsyncUnitIsQueueConnected(pp->unit)) { + AsconError(a, "Disconnected", 0); + a->state = AsconFailed; + return 0; + } + if (pp->txn.transWait < 0) { + a->state = AsconTimeout; + } else if (pp->txn.transWait == 0) { + a->state = AsconReadDone; + } + return 0; + case AsconReadDone: + /* should not get here */ + return 0; + case AsconIdle: + return 0; + case AsconFailed: + if (1 == AsyncUnitIsQueueConnected(pp->unit)) { + a->state = AsconConnectStart; + return 0; + } + return 0; + case AsconTimeout: + /* should not get here */ + a->state = AsconIdle; + return 0; + case AsconMaxState: + return 0; + default: + return 0; + } +} + +/* + * Kill the private storage + * + * Clean up and release all resources associated with the private object + */ +static void SCAQ_KillPrivate(void *vp) +{ + pPrivate pp = (pPrivate) vp; + if (pp) { + if (pp->unit) { + AsyncUnit *asyncUnit = (AsyncUnit *) pp->unit; + AsyncUnitDestroy(asyncUnit); + } + if (pp->queue_name) + free(pp->queue_name); + free(pp); + } +} + +/* + * Initialize the Ascon object for this device, the async queue argument. + */ +static int scaqaAsconInit(Ascon *a, SConnection *pCon, int argc, char *argv[]) +{ + int i; + AsyncUnit *asyncUnit; + pPrivate pp; + for (i = 0; i < argc; ++i) { + SCPrintf(pCon, eStatus, "scaqaAsconInit: arg[%d] = %s\n", i, argv[i]); + } + if (argc < 1) { + SCPrintf(pCon, eError, "Insufficient arguments to scaqaAsconInit: %d\n", argc); + return 0; + } + if (!AsyncUnitCreate(argv[1], &asyncUnit)) { + SCPrintf(pCon, eError, "Cannot find AsyncQueue '%s' when creating script context adapter '%s'", + argv[1], argv[0]); + return 0; + } + pp = (pPrivate) calloc(sizeof(Private), 1); + pp->unit = asyncUnit; + pp->queue_name = strdup(argv[1]); + pp->txn.transWait = 0; + a->private = pp; + a->hostport = strdup(argv[1]); + a->killPrivate = SCAQ_KillPrivate; + AsyncUnitSetNotify(asyncUnit, a, SCAQ_Notify); + return 1; +} +/* + * This procedure creates, initializes and registers the "scaqa" protocol + * with the Ascon infrastructure + */ +void AddSCAQAProtocol(void) +{ + AsconProtocol *prot = NULL; + printf("AddSCAQAProtocol\n"); + prot = calloc(sizeof(AsconProtocol), 1); + prot->name = strdup("aqadapter"); + prot->init = scaqaAsconInit; + prot->handler = scaqaProtHandler; + AsconInsertProtocol(prot); +}