#include #include "sics.h" #include "ascon.h" #include "ascon.i" #include "syncedprot.h" /* * Markus Zolliker Aug 2012 * * This is a scriptcontext driver 'connecting' to its own sics server. * The argument of the send command is a sync id. * The next script of the scriptchain is called when all actions related to * this sync id are finished or on timeout. * * In addition the functions needed to implement other mechanisms than scriptcontext to * support this type of synchronization are implemented. * * the command sync with its subcommands begin/end/incr/decr/pending act as the corresponding * functions SyncedBegin, SyncedEnd, SyncedIncr, SyncedDecr, SyncedPending * * Usage of the syncedprot: * ------------------------ * * For asynchronous execution of processes synchronized with commands implemented with * scriptcontext machinery, you create an object with scriptcontext using syncprot as a protocol. * * with the following helper proc: * * proc sctsync {code} { * set id [synced begin] * uplevel 1 $code * synced end $id * sct send $id * } * * In action scripts, you do all actions within the body of the sctsync command. The next script * in the script chanin is not exectuted before all these actions are terminated. * * * Usage outside of scriptcontext: * ------------------------------- * * additional command to wait for a sync id * * synced wait [] * * * if the following proc is implemented in tcl * * proc synceddo {code} { * set id [synced begin] * uplevel 1 $code * synced end $id * synced wait $id * } * * you might do something like: * * synceddo {hset /path value} * * or * synceddo { * hset /path value * ctrl queue /path2 write * } * * the synceddo command does finish when the inititated actions have finished */ typedef struct SyncedData { struct SyncedData *next; long id; long count; } SyncedData; #define NSTACK 100 static SyncedData *stack[NSTACK]; /* the stack is limited in order to detect missing calls to SyncedEnd */ static int sp = 0; static SyncedData *syncList = NULL; static SyncedData *actualSync = NULL; static long nextId = 1; SyncedData *SyncedFind(long id) { SyncedData *s; for (s = syncList; s != NULL; s = s->next) { if (s->id == id) { return s; } } return NULL; } SyncedData *SyncedNew(void) { SyncedData *new = NULL; SyncedData *s, **ptr2previous, *tobekilled; int cnt; long id; id = 0; for (cnt = 1000; id == 0 && cnt > 0; cnt--) { id = nextId; nextId++; if (nextId > 999999999) { nextId = 1; } ptr2previous = &syncList; for (s = syncList; s != NULL; ) { if (s->id == id) { id = 0; break; } if (s->count <= 0) { /* remove unused syncs */ *ptr2previous = s->next; tobekilled = s; s = s->next; if (tobekilled == actualSync) { actualSync = NULL; } free(tobekilled); } else { ptr2previous = &s->next; s = s->next; } }; } if (id == 0) { return NULL; } new = calloc(1,sizeof *new); if (new) { /* append at the end */ new->count = 0; new->id = id; *ptr2previous = new; new->next = NULL; } return new; } long SyncedGet(void) { if (actualSync == NULL) { return SYNCED_NO_ID; } return actualSync->id; } long SyncedBegin(long syncid) { SyncedData *sync; if (syncid < 0) return syncid; stack[sp] = actualSync; sync = SyncedFind(syncid); if (sync == NULL) { sync = SyncedNew(); if (sync == NULL) { return SYNCED_NO_MEMORY; } } if (sp >= NSTACK) { return SYNCED_STACK_OVERFLOW; } sp++; sync->count++; actualSync = sync; return sync->id; } long SyncedEnd(long syncid) { SyncedData *sync; long id; if (syncid < 0) return syncid; if (sp <= 0) { return SYNCED_STACK_UNDERFLOW; } sp--; sync = actualSync; actualSync = stack[sp]; if (sync->count <= 0) { return SYNCED_COUNT_UNDERFLOW; } sync->count--; if (syncid != 0 && syncid != sync->id) { return SYNCED_ID_MISMATCH; } return sync->id; } long SyncedIncr(long syncid) { SyncedData *sync; if (syncid < 0) return syncid; if (syncid == 0) { sync = actualSync; } else { sync = SyncedFind(syncid); } if (sync == NULL) { return SYNCED_NOT_FOUND; } sync->count++; return sync->id; } long SyncedDecr(long syncid) { SyncedData *sync; if (syncid < 0) return syncid; if (syncid == 0) { sync = actualSync; } else { sync = SyncedFind(syncid); } if (sync == NULL) { return SYNCED_NOT_FOUND; } if (sync->count <= 0) { return SYNCED_COUNT_UNDERFLOW; } sync->count--; return sync->id; } int SyncedPending(long syncid) { SyncedData *sync; if (syncid == 0) { return (actualSync != NULL); /* this makes no sense: actual syncid is always pending */ } sync = SyncedFind(syncid); if (sync == NULL || sync->count <= 0) { return 0; } return 1; } int SyncedCommand(SConnection * con, SicsInterp * sics, void *object, int argc, char *argv[]) { long ret; double start, tmo; long syncid; char *errmsg; SyncedData *sync; if (argc < 2) { goto badsyntax; } if (argc >= 3) { syncid = atol(argv[2]); if (syncid < 0) { SCPrintf(con, eError, "ERROR: Usage: illegal sync id"); } } else { syncid = 0; } if (strcasecmp(argv[1], "wait") == 0) { if (argc == 3) { tmo = 365 * 24 * 3600; } else if (argc == 4) { tmo = atof(argv[3]); } else { SCPrintf(con, eError, "ERROR: Usage: synced wait []"); return 0; } start = DoubleTime(); if (syncid == 0) { SCPrintf(con, eError, "ERROR: missing sync id"); return 0; } sync = SyncedFind(syncid); if (sync == NULL || sync->count <= 0) { /* finished */ return 1; } while (sync->count > 0) { TaskYield(pServ->pTasker); if (SCGetInterrupt(con) != eContinue) { SCPrintf(con, eError, "ERROR: interrupted"); return 0; } if (DoubleTime() > start + tmo) { SCPrintf(con, eError, "ERROR: timeout"); return 0; } } return 1; } if (argc > 3) { SCPrintf(con, eError, "ERROR: Usage: synced %s []", argv[1]); return 0; } if (strcasecmp(argv[1], "begin") == 0) { ret = SyncedBegin(syncid); } else if (strcasecmp(argv[1], "end") == 0) { ret = SyncedEnd(syncid); } else if (strcasecmp(argv[1], "incr") == 0) { ret = SyncedIncr(syncid); } else if (strcasecmp(argv[1], "decr") == 0) { ret = SyncedDecr(syncid); } else if (strcasecmp(argv[1], "pending") == 0) { ret = SyncedPending(syncid); } else { goto badsyntax; } if (ret >= 0) { SCPrintf(con, eValue, "%ld", ret); return 1; } switch (ret) { case SYNCED_NO_ID: errmsg = "no sync id defined"; break; case SYNCED_NO_MEMORY: errmsg = "no memory"; break; case SYNCED_STACK_OVERFLOW: errmsg = "stack overflow"; break; case SYNCED_STACK_UNDERFLOW: errmsg = "stack underflow"; break; case SYNCED_COUNT_UNDERFLOW: errmsg = "count underflow"; break; case SYNCED_ID_MISMATCH: errmsg = "sync id mismatch"; break; case SYNCED_NOT_FOUND: errmsg = "sync id not found"; break; default: errmsg = "unknown error"; break; } SCPrintf(con, eError, "ERROR: %s", errmsg); return 0; badsyntax: SCPrintf(con, eError, "ERROR: Usage: synced (begin|end|incr|decr|pending|wait) [ []]"); return 0; } /*----------------------------------------------------------------------------*/ int SyncedProtHandler(Ascon *a) { Tcl_Interp *pTcl; int ret; char *result = NULL; int iRet = 1; long *syncid; switch (a->state) { case AsconConnectStart: case AsconConnecting: a->state = AsconConnectDone; break; case AsconWriteStart: syncid = a->private; *syncid = atol(GetCharArray(a->wrBuffer)); a->start = DoubleTime(); /* fall through */ case AsconWriting: a->state = AsconWriteDone; break; case AsconReadStart: case AsconReading: syncid = a->private; if (!SyncedPending(*syncid)) { a->state = AsconReadDone; } else if (a->timeout > 0) { if (DoubleTime() - a->start > a->timeout) { AsconError(a, "timeout", 0); a->state = AsconTimeout; } } break; } return 0; } /*----------------------------------------------------------------------------*/ static void SyncedKillPrivate(void *private) { if (private) { free(private); } } /*----------------------------------------------------------------------------*/ static int SyncedProtInit(Ascon * a, SConnection * con, int argc, char *argv[]) { a->timeout = 0.0; if (argc > 1) { /* a dummy hostport is used, here a unique name should be given */ a->hostport = strdup(argv[1]); if (argc > 2) { a->timeout = atof(argv[2]); } } else { a->hostport = strdup("synced"); /* WARNING: this name is not unique, but used by Marks new Task manager */ } a->private = calloc(1, sizeof(long)); a->killPrivate = SyncedKillPrivate; return 1; } /*----------------------------------------------------------------------------*/ void AddSyncedProt() { static AsconProtocol Syncedprot; Syncedprot.name = "syncedprot"; Syncedprot.handler = SyncedProtHandler; Syncedprot.init = SyncedProtInit; AsconInsertProtocol(&Syncedprot); AddCmd("synced", SyncedCommand); }