/* * SCAQA: Script Context to Async Queue Adapter * * The purpose of this module is to provide the interface glue to allow Async * Queue communication devices to be used in Script Context drivers. * * The approach taken it to provide an Ascon device interface and state machine * on top of an AsyncUnit device. */ #include #include #include #include typedef struct txn_s { int transWait; } TXN, *pTXN; typedef struct scaq_private_s { pAsyncUnit unit; char *queue_name; TXN txn; } Private, *pPrivate; static char *state_name(Ascon *a) { switch (a->state) { case AsconNotConnected: return "AsconNotConnected"; break; case AsconConnectStart: return "AsconConnectStart"; break; case AsconConnecting: return "AsconConnecting"; break; case AsconConnectDone: return "AsconConnectDone"; break; case AsconWriteStart: return "AsconWriteStart"; break; case AsconWriting: return "AsconWriting"; break; case AsconWriteDone: return "AsconWriteDone"; break; case AsconReadStart: return "AsconReadStart"; break; case AsconReading: return "AsconReading"; break; case AsconReadDone: return "AsconReadDone"; break; case AsconIdle: return "AsconIdle"; break; case AsconFailed: return "AsconFailed"; break; case AsconTimeout: return "AsconTimeout"; break; case AsconMaxState: return "AsconMaxState"; break; default: return "Unknown Ascon State"; break; } } /** * \brief handle notification callbacks from the unit */ static void SCAQ_Notify(void *context, int event) { Ascon *a = (Ascon*) context; pPrivate pp = (pPrivate) a->private; pTXN txn = &pp->txn; char line[132]; if (1 == txn->transWait) txn->transWait = -1; /* TIMEOUT */ switch (event) { case AQU_DISCONNECT: snprintf(line, 132, "Disconnect on aqadapter to '%s'", pp->queue_name); SICSLogWrite(line, eStatus); break; case AQU_RECONNECT: snprintf(line, 132, "Reconnect on aqadapter to '%s'", pp->queue_name); SICSLogWrite(line, eStatus); break; } } /** * \brief TransCallback is the callback for the general command transaction. */ static int TransCallback(pAsyncTxn pCmd) { char *resp = pCmd->inp_buf; int resp_len = pCmd->inp_idx; Ascon *a = (Ascon *) pCmd->cntx; pPrivate pp = (pPrivate) a->private; pTXN self = &pp->txn; if (resp_len > 0) { DynStringConcatBytes(a->rdBuffer, resp, resp_len); } if (pCmd->txn_status == ATX_TIMEOUT) { self->transWait = -1; } else { self->transWait = 0; } return 0; } static void SCAQTransact(Ascon *a) { pPrivate pp = NULL; pTXN txn = NULL; AsyncUnit *unit = NULL; const char *command = GetCharArray(a->wrBuffer); int cmd_len = GetDynStringLength(a->wrBuffer); int rsp_len = 1024; pp = (pPrivate) a->private; assert(pp); unit = pp->unit; assert(unit); txn = &pp->txn; txn->transWait = 1; DynStringClear(a->rdBuffer); if (cmd_len > 3 && strncmp(&command[cmd_len - 3], "{0}", 3) == 0) { rsp_len = 0; cmd_len -= 3; } else if (cmd_len > 11 && strncasecmp(&command[cmd_len - 11], "@@NOREPLY@@", 11) == 0) { rsp_len = 0; cmd_len -= 11; } AsyncUnitSendTxn(unit, command, cmd_len, TransCallback, a, rsp_len); } static int scaqaNullHandler(Ascon *a) { const char *command; int cmd_len; struct timeval tv; pPrivate pp = (pPrivate) a->private; switch (a->state) { case AsconNotConnected: return 0; case AsconConnectStart: a->state = AsconConnecting; return 0; case AsconConnecting: a->state = AsconConnectDone; return 0; case AsconConnectDone: /* should not get here */ a->state = AsconIdle; return 0; case AsconWriteStart: a->state = AsconWriting; return 0; case AsconWriting: /* Log this as a Warning */ command = GetCharArray(a->wrBuffer); cmd_len = GetDynStringLength(a->wrBuffer); gettimeofday(&tv, NULL); SICSLogWriteTime("WARNING: writing to NULL aqadapter", eWarning, &tv); SICSLogWriteHexTime(command, cmd_len, eWarning, &tv); a->state = AsconWriteDone; return 0; case AsconWriteDone: /* should not get here */ a->state = AsconReadStart; return 0; case AsconReadStart: /* Log this as a Warning and Return an ASCERR message */ SICSLogWrite("ASCERR: Reading from NULL aqadapter", eWarning); DynStringCopy(a->rdBuffer, "ASCERR: Reading from NULL aqadapter"); a->state = AsconReadDone; return 0; case AsconReadDone: /* should not get here */ return 0; case AsconIdle: return 0; case AsconFailed: a->state = AsconConnectStart; return 0; case AsconTimeout: /* should not get here */ a->state = AsconIdle; return 0; case AsconMaxState: return 0; default: return 0; } } static int scaqaProtHandler(Ascon *a) { pPrivate pp = (pPrivate) a->private; if (NULL == pp->unit) return scaqaNullHandler(a); switch (a->state) { case AsconNotConnected: return 0; case AsconConnectStart: a->state = AsconConnecting; return 0; case AsconConnecting: if (1 == AsyncUnitIsQueueConnected(pp->unit)) { a->state = AsconConnectDone; } return 0; case AsconConnectDone: /* should not get here */ a->state = AsconIdle; return 0; case AsconWriteStart: if (0 == AsyncUnitIsQueueConnected(pp->unit)) { AsconError(a, "Disconnected", 0); a->state = AsconFailed; return 0; } a->state = AsconWriting; return 0; case AsconWriting: if (0 == AsyncUnitIsQueueConnected(pp->unit)) { AsconError(a, "Disconnected", 0); a->state = AsconFailed; return 0; } SCAQTransact(a); a->state = AsconWriteDone; return 0; case AsconWriteDone: /* should not get here */ a->state = AsconReadStart; return 0; case AsconReadStart: if (0 == AsyncUnitIsQueueConnected(pp->unit)) { AsconError(a, "Disconnected", 0); a->state = AsconFailed; return 0; } a->state = AsconReading; return 0; case AsconReading: if (0 == AsyncUnitIsQueueConnected(pp->unit)) { AsconError(a, "Disconnected", 0); a->state = AsconFailed; return 0; } if (pp->txn.transWait < 0) { if (GetDynStringLength(a->rdBuffer) == 0) { a->state = AsconTimeout; } else { a->state = AsconReadDone; } } else if (pp->txn.transWait == 0) { a->state = AsconReadDone; } return 0; case AsconReadDone: /* should not get here */ return 0; case AsconIdle: return 0; case AsconFailed: if (1 == AsyncUnitIsQueueConnected(pp->unit)) { a->state = AsconConnectStart; return 0; } return 0; case AsconTimeout: /* should not get here */ a->state = AsconIdle; return 0; case AsconMaxState: return 0; default: return 0; } } /* * Kill the private storage * * Clean up and release all resources associated with the private object */ static void SCAQ_KillPrivate(void *vp) { pPrivate pp = (pPrivate) vp; if (pp) { if (pp->unit) { AsyncUnit *asyncUnit = (AsyncUnit *) pp->unit; AsyncUnitDestroy(asyncUnit); } if (pp->queue_name) free(pp->queue_name); free(pp); } } /* * Initialize the Ascon object for this device, the async queue argument. */ static int scaqaAsconInit(Ascon *a, SConnection *pCon, int argc, char *argv[]) { int i; AsyncUnit *asyncUnit; pPrivate pp; for (i = 0; i < argc; ++i) { SCPrintf(pCon, eStatus, "scaqaAsconInit: arg[%d] = %s\n", i, argv[i]); } if (argc < 1) { SCPrintf(pCon, eError, "Insufficient arguments to scaqaAsconInit: %d\n", argc); return 0; } if (strcasecmp("null", argv[1]) == 0) { asyncUnit = NULL; } else if (!AsyncUnitCreate(argv[1], &asyncUnit)) { SCPrintf(pCon, eError, "Cannot find AsyncQueue '%s' when creating script context adapter '%s'", argv[1], argv[0]); return 0; } pp = (pPrivate) calloc(sizeof(Private), 1); pp->unit = asyncUnit; pp->queue_name = strdup(argv[1]); pp->txn.transWait = 0; a->private = pp; a->hostport = strdup(argv[1]); a->killPrivate = SCAQ_KillPrivate; if (asyncUnit) AsyncUnitSetNotify(asyncUnit, a, SCAQ_Notify); return 1; } /* * This procedure creates, initializes and registers the "scaqa" protocol * with the Ascon infrastructure */ void AddSCAQAProtocol(void) { AsconProtocol *prot = NULL; printf("AddSCAQAProtocol\n"); prot = calloc(sizeof(AsconProtocol), 1); prot->name = strdup("aqadapter"); prot->init = scaqaAsconInit; prot->handler = scaqaProtHandler; AsconInsertProtocol(prot); }