diff --git a/syncedprot.c b/syncedprot.c new file mode 100644 index 00000000..743882a3 --- /dev/null +++ b/syncedprot.c @@ -0,0 +1,410 @@ +#include +#include "sics.h" +#include "ascon.h" +#include "ascon.i" +#include "syncedprot.h" + +/* + * Markus Zolliker Aug 2012 + * + * This is a scriptcontext driver 'connecting' to its own sics server. + * The argument of the send command is a sync id. + * The next script of the scriptchain is called when all actions realted to + * this sync id are finished or on timeout. + * + * In addition the functions needed to implement other mechanisms than scriptcontext to + * support this type of synchronization are implemented. + * + * the command sync with its subcommands begin/end/incr/decr/pending act as the corresponding + * functions SyncedBegin, SyncedEnd, SyncedIncr, SyncedDecr, SyncedPending + * + * Usage of the syncedprot: + * ------------------------ + * + * For asynchronous execution of processes synchronized with commands implemented with + * scriptcontext machinery, you create an object with scriptcontext using syncprot as a protocol. + * + * with the following helper proc: + * + * proc sctsync {code} { + * set id [synced begin] + * uplevel 1 $code + * synced end $id + * sct send $id + * } + * + * In action scripts, you do all actions within the body of the sctsync command. The next script + * in the script chanin is not exectuted before all these actions are terminated. + * + * + * Usage outside of scriptcontext: + * ------------------------------- + * + * additional command to wait for a sync id + * + * synced wait [] + * + * + * if the following proc is implemented in tcl + * + * proc synceddo {code} { + * set id [synced begin] + * uplevel 1 $code + * synced end $id + * synced wait $id + * } + * + * you might do something like: + * + * synceddo {hset /path value} + * + * or + * synceddo { + * hset /path value + * ctrl queue /path2 write + * } + * + * the synceddo command does finish when the inititated actions have finished + */ + +typedef struct SyncedData { + struct SyncedData *next; + long id; + long count; +} SyncedData; + +#define NSTACK 100 + +static SyncedData *stack[NSTACK]; /* the stack is limited in order to detect missing calls to SyncedEnd */ +static int sp = 0; + +static SyncedData *syncList = NULL; +static SyncedData *actualSync = NULL; +static long nextId = 1; + +SyncedData *SyncedFind(long id) +{ + SyncedData *s; + for (s = syncList; s != NULL; s = s->next) { + if (s->id == id) { + return s; + } + } + return NULL; +} + +SyncedData *SyncedNew(void) +{ + SyncedData *new = NULL; + SyncedData *s, **ptr2previous, *tobekilled; + int cnt; + long id; + + id = 0; + for (cnt = 1000; id == 0 && cnt > 0; cnt--) { + id = nextId; + nextId++; + if (nextId > 999999999) { + nextId = 1; + } + ptr2previous = &syncList; + for (s = syncList; s != NULL; ) { + if (s->id == id) { + id = 0; + break; + } + if (s->count <= 0) { + /* remove unused syncs */ + *ptr2previous = s->next; + tobekilled = s; + s = s->next; + if (tobekilled == actualSync) { + actualSync = NULL; + } + free(tobekilled); + } else { + ptr2previous = &s->next; + s = s->next; + } + }; + } + if (id == 0) { + return NULL; + } + new = calloc(1,sizeof *new); + if (new) { + /* append at the end */ + new->count = 0; + new->id = id; + *ptr2previous = new; + new->next = NULL; + } + return new; +} + +long SyncedGet(void) +{ + if (actualSync == NULL) { + return SYNCED_NO_ID; + } + return actualSync->id; +} + +long SyncedBegin(long syncid) +{ + SyncedData *sync; + + if (syncid < 0) return syncid; + stack[sp] = actualSync; + sync = SyncedFind(syncid); + if (sync == NULL) { + sync = SyncedNew(); + if (sync == NULL) { + return SYNCED_NO_MEMORY; + } + } + sp++; + if (sp >= NSTACK) { + return SYNCED_STACK_OVERFLOW; + } + sync->count++; + actualSync = sync; + return sync->id; +} + +long SyncedEnd(long syncid) +{ + SyncedData *sync; + long id; + + if (syncid < 0) return syncid; + if (sp <= 0) { + return SYNCED_STACK_UNDERFLOW; + } + sync = actualSync; + sp--; + actualSync = stack[sp]; + sync->count--; + if (sync->count < 0) { + return SYNCED_COUNT_UNDERFLOW; + } + if (syncid != 0 && syncid != sync->id) { + return SYNCED_ID_MISMATCH; + } + return sync->id; +} + +long SyncedIncr(long syncid) +{ + SyncedData *sync; + + if (syncid < 0) return syncid; + if (syncid == 0) { + sync = actualSync; + } else { + sync = SyncedFind(syncid); + } + if (sync == NULL) { + return SYNCED_NOT_FOUND; + } + sync->count++; + return sync->id; +} + +long SyncedDecr(long syncid) +{ + SyncedData *sync; + + if (syncid < 0) return syncid; + if (syncid == 0) { + sync = actualSync; + } else { + sync = SyncedFind(syncid); + } + if (sync == NULL) { + return SYNCED_NOT_FOUND; + } + if (sync->count <= 0) { + return SYNCED_COUNT_UNDERFLOW; + } + sync->count--; + return sync->id; +} + +int SyncedPending(long syncid) +{ + SyncedData *sync; + + if (syncid == 0) { + return (actualSync != NULL); /* this makes no sense: actual syncid is always pending */ + } + sync = SyncedFind(syncid); + if (sync == NULL || sync->count <= 0) { + return 0; + } + return 1; +} + +int SyncedCommand(SConnection * con, SicsInterp * sics, void *object, + int argc, char *argv[]) +{ + long ret; + double start, tmo; + long syncid; + char *errmsg; + SyncedData *sync; + + if (argc < 2) { + goto badsyntax; + } + if (argc >= 3) { + syncid = atol(argv[2]); + if (syncid < 0) { + SCPrintf(con, eError, "ERROR: Usage: illegal sync id"); + } + } else { + syncid = 0; + } + if (strcasecmp(argv[1], "wait") == 0) { + if (argc == 3) { + tmo = 365 * 24 * 3600; + } else if (argc == 4) { + tmo = atof(argv[3]); + } else { + SCPrintf(con, eError, "ERROR: Usage: synced wait []"); + return 0; + } + start = DoubleTime(); + if (syncid == 0) { + SCPrintf(con, eError, "ERROR: missing sync id"); + return 0; + } + sync = SyncedFind(syncid); + if (sync == NULL || sync->count <= 0) { /* finished */ + return 1; + } + while (sync->count > 0) { + TaskYield(pServ->pTasker); + if (SCGetInterrupt(con) != eContinue) { + SCPrintf(con, eError, "ERROR: interrupted"); + return 0; + } + if (DoubleTime() > start + tmo) { + SCPrintf(con, eError, "ERROR: timeout"); + return 0; + } + } + return 1; + } + + if (argc > 3) { + SCPrintf(con, eError, "ERROR: Usage: synced %s []", argv[1]); + return 0; + } + if (strcasecmp(argv[1], "begin") == 0) { + ret = SyncedBegin(syncid); + } else if (strcasecmp(argv[1], "end") == 0) { + ret = SyncedEnd(syncid); + } else if (strcasecmp(argv[1], "incr") == 0) { + ret = SyncedIncr(syncid); + } else if (strcasecmp(argv[1], "decr") == 0) { + ret = SyncedDecr(syncid); + } else if (strcasecmp(argv[1], "pending") == 0) { + ret = SyncedPending(syncid); + } else { + goto badsyntax; + } + if (ret >= 0) { + SCPrintf(con, eValue, "%ld", ret); + return 1; + } + switch (ret) { + case SYNCED_NO_ID: errmsg = "no sync id defined"; break; + case SYNCED_NO_MEMORY: errmsg = "no memory"; break; + case SYNCED_STACK_OVERFLOW: errmsg = "stack overflow"; break; + case SYNCED_STACK_UNDERFLOW: errmsg = "stack underflow"; break; + case SYNCED_COUNT_UNDERFLOW: errmsg = "count underflow"; break; + case SYNCED_ID_MISMATCH: errmsg = "sync id mismatch"; break; + case SYNCED_NOT_FOUND: errmsg = "sync id not found"; break; + default: errmsg = "unknown error"; break; + } + SCPrintf(con, eError, "ERROR: %s", errmsg); + return 0; +badsyntax: + SCPrintf(con, eError, "ERROR: Usage: synced (begin|end|incr|decr|pending|wait) [ []]"); + return 0; +} + +/*----------------------------------------------------------------------------*/ +int SyncedProtHandler(Ascon *a) +{ + Tcl_Interp *pTcl; + int ret; + + char *result = NULL; + int iRet = 1; + long *syncid; + + switch (a->state) { + case AsconConnectStart: + case AsconConnecting: + a->state = AsconConnectDone; + break; + case AsconWriteStart: + syncid = a->private; + *syncid = atol(GetCharArray(a->wrBuffer)); + a->start = DoubleTime(); + + /* fall through */ + case AsconWriting: + a->state = AsconWriteDone; + break; + case AsconReadStart: + case AsconReading: + syncid = a->private; + if (!SyncedPending(*syncid)) { + a->state = AsconReadDone; + } else if (a->timeout > 0) { + if (DoubleTime() - a->start > a->timeout) { + AsconError(a, "timeout", 0); + a->state = AsconTimeout; + } + } + break; + } + return 0; +} + +/*----------------------------------------------------------------------------*/ +static void SyncedKillPrivate(void *private) { + if (private) { + free(private); + } +} + +/*----------------------------------------------------------------------------*/ +static int SyncedProtInit(Ascon * a, SConnection * con, int argc, char *argv[]) +{ + if (argc == 2) { + a->timeout = atof(argv[1]); + } else if (argc > 2) { + /* a dummy hostport is used */ + a->timeout = atof(argv[2]); + } else { + a->timeout = 0.0; + } + a->private = calloc(1, sizeof(long)); + a->killPrivate = SyncedKillPrivate; + return 1; +} + +/*----------------------------------------------------------------------------*/ +void AddSyncedProt() +{ + static AsconProtocol Syncedprot; + Syncedprot.name = "syncedprot"; + Syncedprot.handler = SyncedProtHandler; + Syncedprot.init = SyncedProtInit; + AsconInsertProtocol(&Syncedprot); + AddCmd("synced", SyncedCommand); +} diff --git a/syncedprot.h b/syncedprot.h new file mode 100644 index 00000000..7da5ac35 --- /dev/null +++ b/syncedprot.h @@ -0,0 +1,58 @@ +#ifndef SYNCEDPROT_H +#define SYNCEDPROT_H + +/** synchronized execution of scriptcontext and other operations + * + * see sctsyncprot.c for a description of the 'protocol' + * + * M. Zolliker Aug 2012 + */ + +typedef enum { + SYNCED_NO_ID = -1, + SYNCED_NO_MEMORY = -2, + SYNCED_STACK_OVERFLOW = -3, + SYNCED_STACK_UNDERFLOW = -4, + SYNCED_COUNT_UNDERFLOW = -5, + SYNCED_ID_MISMATCH = -6, + SYNCED_NOT_FOUND = -7 +} Synced_Error_Message; + +/** \brief set the actual sync id + * \param syncid the sync id or 0 to reate a new sync id + * \return the created or given sync id or < 0 on error + */ +long SyncedBegin(long syncid); + +/** \brief set the actual sync id back to value before the last call to SyncedBegin + * \param syncid the sync id (for checking) or 0 (for lazy programmers) + * \return the sync id on success or a negative value on failure (Synced_Error_Message) + */ +long SyncedEnd(long syncid); + +/** \brief increment the counter of syncid + * \parameter syncid the syncid to increment + * \return the sync id on success or a negative value on failure (Synced_Error_Message) + */ +long SyncedIncr(long syncid); + +/** \brief decrement the counter of syncid + * \parameter syncid the sync id to decrement (or 0 for using the actual sync id) + * \return the sync id on success, 0 if syncid was 0 and no actual sync id was defined + * or a negative value on failure (Synced_Error_Message) + */ +long SyncedDecr(long syncid); + +/** \brief get the actual syncid + * \return 1 the actual sync id or 0 + */ +long SyncedGet(void); + +/** + * \brief check if an action created with this sync id is pending + * \param syncid the sync id + * \return 1 for pending, 0 for not pending + */ +int SyncedPending(long syncid); + +#endif \ No newline at end of file