- added syncedprot
This commit is contained in:
410
syncedprot.c
Normal file
410
syncedprot.c
Normal file
@ -0,0 +1,410 @@
|
||||
#include <stdio.h>
|
||||
#include "sics.h"
|
||||
#include "ascon.h"
|
||||
#include "ascon.i"
|
||||
#include "syncedprot.h"
|
||||
|
||||
/*
|
||||
* Markus Zolliker Aug 2012
|
||||
*
|
||||
* This is a scriptcontext driver 'connecting' to its own sics server.
|
||||
* The argument of the send command is a sync id.
|
||||
* The next script of the scriptchain is called when all actions realted to
|
||||
* this sync id are finished or on timeout.
|
||||
*
|
||||
* In addition the functions needed to implement other mechanisms than scriptcontext to
|
||||
* support this type of synchronization are implemented.
|
||||
*
|
||||
* the command sync with its subcommands begin/end/incr/decr/pending act as the corresponding
|
||||
* functions SyncedBegin, SyncedEnd, SyncedIncr, SyncedDecr, SyncedPending
|
||||
*
|
||||
* Usage of the syncedprot:
|
||||
* ------------------------
|
||||
*
|
||||
* For asynchronous execution of processes synchronized with commands implemented with
|
||||
* scriptcontext machinery, you create an object with scriptcontext using syncprot as a protocol.
|
||||
*
|
||||
* with the following helper proc:
|
||||
*
|
||||
* proc sctsync {code} {
|
||||
* set id [synced begin]
|
||||
* uplevel 1 $code
|
||||
* synced end $id
|
||||
* sct send $id
|
||||
* }
|
||||
*
|
||||
* In action scripts, you do all actions within the body of the sctsync command. The next script
|
||||
* in the script chanin is not exectuted before all these actions are terminated.
|
||||
*
|
||||
*
|
||||
* Usage outside of scriptcontext:
|
||||
* -------------------------------
|
||||
*
|
||||
* additional command to wait for a sync id
|
||||
*
|
||||
* synced wait <sync id> [<timeout>]
|
||||
*
|
||||
*
|
||||
* if the following proc is implemented in tcl
|
||||
*
|
||||
* proc synceddo {code} {
|
||||
* set id [synced begin]
|
||||
* uplevel 1 $code
|
||||
* synced end $id
|
||||
* synced wait $id
|
||||
* }
|
||||
*
|
||||
* you might do something like:
|
||||
*
|
||||
* synceddo {hset /path value}
|
||||
*
|
||||
* or
|
||||
* synceddo {
|
||||
* hset /path value
|
||||
* ctrl queue /path2 write
|
||||
* }
|
||||
*
|
||||
* the synceddo command does finish when the inititated actions have finished
|
||||
*/
|
||||
|
||||
typedef struct SyncedData {
|
||||
struct SyncedData *next;
|
||||
long id;
|
||||
long count;
|
||||
} SyncedData;
|
||||
|
||||
#define NSTACK 100
|
||||
|
||||
static SyncedData *stack[NSTACK]; /* the stack is limited in order to detect missing calls to SyncedEnd */
|
||||
static int sp = 0;
|
||||
|
||||
static SyncedData *syncList = NULL;
|
||||
static SyncedData *actualSync = NULL;
|
||||
static long nextId = 1;
|
||||
|
||||
SyncedData *SyncedFind(long id)
|
||||
{
|
||||
SyncedData *s;
|
||||
for (s = syncList; s != NULL; s = s->next) {
|
||||
if (s->id == id) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SyncedData *SyncedNew(void)
|
||||
{
|
||||
SyncedData *new = NULL;
|
||||
SyncedData *s, **ptr2previous, *tobekilled;
|
||||
int cnt;
|
||||
long id;
|
||||
|
||||
id = 0;
|
||||
for (cnt = 1000; id == 0 && cnt > 0; cnt--) {
|
||||
id = nextId;
|
||||
nextId++;
|
||||
if (nextId > 999999999) {
|
||||
nextId = 1;
|
||||
}
|
||||
ptr2previous = &syncList;
|
||||
for (s = syncList; s != NULL; ) {
|
||||
if (s->id == id) {
|
||||
id = 0;
|
||||
break;
|
||||
}
|
||||
if (s->count <= 0) {
|
||||
/* remove unused syncs */
|
||||
*ptr2previous = s->next;
|
||||
tobekilled = s;
|
||||
s = s->next;
|
||||
if (tobekilled == actualSync) {
|
||||
actualSync = NULL;
|
||||
}
|
||||
free(tobekilled);
|
||||
} else {
|
||||
ptr2previous = &s->next;
|
||||
s = s->next;
|
||||
}
|
||||
};
|
||||
}
|
||||
if (id == 0) {
|
||||
return NULL;
|
||||
}
|
||||
new = calloc(1,sizeof *new);
|
||||
if (new) {
|
||||
/* append at the end */
|
||||
new->count = 0;
|
||||
new->id = id;
|
||||
*ptr2previous = new;
|
||||
new->next = NULL;
|
||||
}
|
||||
return new;
|
||||
}
|
||||
|
||||
long SyncedGet(void)
|
||||
{
|
||||
if (actualSync == NULL) {
|
||||
return SYNCED_NO_ID;
|
||||
}
|
||||
return actualSync->id;
|
||||
}
|
||||
|
||||
long SyncedBegin(long syncid)
|
||||
{
|
||||
SyncedData *sync;
|
||||
|
||||
if (syncid < 0) return syncid;
|
||||
stack[sp] = actualSync;
|
||||
sync = SyncedFind(syncid);
|
||||
if (sync == NULL) {
|
||||
sync = SyncedNew();
|
||||
if (sync == NULL) {
|
||||
return SYNCED_NO_MEMORY;
|
||||
}
|
||||
}
|
||||
sp++;
|
||||
if (sp >= NSTACK) {
|
||||
return SYNCED_STACK_OVERFLOW;
|
||||
}
|
||||
sync->count++;
|
||||
actualSync = sync;
|
||||
return sync->id;
|
||||
}
|
||||
|
||||
long SyncedEnd(long syncid)
|
||||
{
|
||||
SyncedData *sync;
|
||||
long id;
|
||||
|
||||
if (syncid < 0) return syncid;
|
||||
if (sp <= 0) {
|
||||
return SYNCED_STACK_UNDERFLOW;
|
||||
}
|
||||
sync = actualSync;
|
||||
sp--;
|
||||
actualSync = stack[sp];
|
||||
sync->count--;
|
||||
if (sync->count < 0) {
|
||||
return SYNCED_COUNT_UNDERFLOW;
|
||||
}
|
||||
if (syncid != 0 && syncid != sync->id) {
|
||||
return SYNCED_ID_MISMATCH;
|
||||
}
|
||||
return sync->id;
|
||||
}
|
||||
|
||||
long SyncedIncr(long syncid)
|
||||
{
|
||||
SyncedData *sync;
|
||||
|
||||
if (syncid < 0) return syncid;
|
||||
if (syncid == 0) {
|
||||
sync = actualSync;
|
||||
} else {
|
||||
sync = SyncedFind(syncid);
|
||||
}
|
||||
if (sync == NULL) {
|
||||
return SYNCED_NOT_FOUND;
|
||||
}
|
||||
sync->count++;
|
||||
return sync->id;
|
||||
}
|
||||
|
||||
long SyncedDecr(long syncid)
|
||||
{
|
||||
SyncedData *sync;
|
||||
|
||||
if (syncid < 0) return syncid;
|
||||
if (syncid == 0) {
|
||||
sync = actualSync;
|
||||
} else {
|
||||
sync = SyncedFind(syncid);
|
||||
}
|
||||
if (sync == NULL) {
|
||||
return SYNCED_NOT_FOUND;
|
||||
}
|
||||
if (sync->count <= 0) {
|
||||
return SYNCED_COUNT_UNDERFLOW;
|
||||
}
|
||||
sync->count--;
|
||||
return sync->id;
|
||||
}
|
||||
|
||||
int SyncedPending(long syncid)
|
||||
{
|
||||
SyncedData *sync;
|
||||
|
||||
if (syncid == 0) {
|
||||
return (actualSync != NULL); /* this makes no sense: actual syncid is always pending */
|
||||
}
|
||||
sync = SyncedFind(syncid);
|
||||
if (sync == NULL || sync->count <= 0) {
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
int SyncedCommand(SConnection * con, SicsInterp * sics, void *object,
|
||||
int argc, char *argv[])
|
||||
{
|
||||
long ret;
|
||||
double start, tmo;
|
||||
long syncid;
|
||||
char *errmsg;
|
||||
SyncedData *sync;
|
||||
|
||||
if (argc < 2) {
|
||||
goto badsyntax;
|
||||
}
|
||||
if (argc >= 3) {
|
||||
syncid = atol(argv[2]);
|
||||
if (syncid < 0) {
|
||||
SCPrintf(con, eError, "ERROR: Usage: illegal sync id");
|
||||
}
|
||||
} else {
|
||||
syncid = 0;
|
||||
}
|
||||
if (strcasecmp(argv[1], "wait") == 0) {
|
||||
if (argc == 3) {
|
||||
tmo = 365 * 24 * 3600;
|
||||
} else if (argc == 4) {
|
||||
tmo = atof(argv[3]);
|
||||
} else {
|
||||
SCPrintf(con, eError, "ERROR: Usage: synced wait <sync id> [<tmo>]");
|
||||
return 0;
|
||||
}
|
||||
start = DoubleTime();
|
||||
if (syncid == 0) {
|
||||
SCPrintf(con, eError, "ERROR: missing sync id");
|
||||
return 0;
|
||||
}
|
||||
sync = SyncedFind(syncid);
|
||||
if (sync == NULL || sync->count <= 0) { /* finished */
|
||||
return 1;
|
||||
}
|
||||
while (sync->count > 0) {
|
||||
TaskYield(pServ->pTasker);
|
||||
if (SCGetInterrupt(con) != eContinue) {
|
||||
SCPrintf(con, eError, "ERROR: interrupted");
|
||||
return 0;
|
||||
}
|
||||
if (DoubleTime() > start + tmo) {
|
||||
SCPrintf(con, eError, "ERROR: timeout");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (argc > 3) {
|
||||
SCPrintf(con, eError, "ERROR: Usage: synced %s [<sync id>]", argv[1]);
|
||||
return 0;
|
||||
}
|
||||
if (strcasecmp(argv[1], "begin") == 0) {
|
||||
ret = SyncedBegin(syncid);
|
||||
} else if (strcasecmp(argv[1], "end") == 0) {
|
||||
ret = SyncedEnd(syncid);
|
||||
} else if (strcasecmp(argv[1], "incr") == 0) {
|
||||
ret = SyncedIncr(syncid);
|
||||
} else if (strcasecmp(argv[1], "decr") == 0) {
|
||||
ret = SyncedDecr(syncid);
|
||||
} else if (strcasecmp(argv[1], "pending") == 0) {
|
||||
ret = SyncedPending(syncid);
|
||||
} else {
|
||||
goto badsyntax;
|
||||
}
|
||||
if (ret >= 0) {
|
||||
SCPrintf(con, eValue, "%ld", ret);
|
||||
return 1;
|
||||
}
|
||||
switch (ret) {
|
||||
case SYNCED_NO_ID: errmsg = "no sync id defined"; break;
|
||||
case SYNCED_NO_MEMORY: errmsg = "no memory"; break;
|
||||
case SYNCED_STACK_OVERFLOW: errmsg = "stack overflow"; break;
|
||||
case SYNCED_STACK_UNDERFLOW: errmsg = "stack underflow"; break;
|
||||
case SYNCED_COUNT_UNDERFLOW: errmsg = "count underflow"; break;
|
||||
case SYNCED_ID_MISMATCH: errmsg = "sync id mismatch"; break;
|
||||
case SYNCED_NOT_FOUND: errmsg = "sync id not found"; break;
|
||||
default: errmsg = "unknown error"; break;
|
||||
}
|
||||
SCPrintf(con, eError, "ERROR: %s", errmsg);
|
||||
return 0;
|
||||
badsyntax:
|
||||
SCPrintf(con, eError, "ERROR: Usage: synced (begin|end|incr|decr|pending|wait) [<sync id> [<tmo>]]");
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
int SyncedProtHandler(Ascon *a)
|
||||
{
|
||||
Tcl_Interp *pTcl;
|
||||
int ret;
|
||||
|
||||
char *result = NULL;
|
||||
int iRet = 1;
|
||||
long *syncid;
|
||||
|
||||
switch (a->state) {
|
||||
case AsconConnectStart:
|
||||
case AsconConnecting:
|
||||
a->state = AsconConnectDone;
|
||||
break;
|
||||
case AsconWriteStart:
|
||||
syncid = a->private;
|
||||
*syncid = atol(GetCharArray(a->wrBuffer));
|
||||
a->start = DoubleTime();
|
||||
|
||||
/* fall through */
|
||||
case AsconWriting:
|
||||
a->state = AsconWriteDone;
|
||||
break;
|
||||
case AsconReadStart:
|
||||
case AsconReading:
|
||||
syncid = a->private;
|
||||
if (!SyncedPending(*syncid)) {
|
||||
a->state = AsconReadDone;
|
||||
} else if (a->timeout > 0) {
|
||||
if (DoubleTime() - a->start > a->timeout) {
|
||||
AsconError(a, "timeout", 0);
|
||||
a->state = AsconTimeout;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
static void SyncedKillPrivate(void *private) {
|
||||
if (private) {
|
||||
free(private);
|
||||
}
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
static int SyncedProtInit(Ascon * a, SConnection * con, int argc, char *argv[])
|
||||
{
|
||||
if (argc == 2) {
|
||||
a->timeout = atof(argv[1]);
|
||||
} else if (argc > 2) {
|
||||
/* a dummy hostport is used */
|
||||
a->timeout = atof(argv[2]);
|
||||
} else {
|
||||
a->timeout = 0.0;
|
||||
}
|
||||
a->private = calloc(1, sizeof(long));
|
||||
a->killPrivate = SyncedKillPrivate;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
void AddSyncedProt()
|
||||
{
|
||||
static AsconProtocol Syncedprot;
|
||||
Syncedprot.name = "syncedprot";
|
||||
Syncedprot.handler = SyncedProtHandler;
|
||||
Syncedprot.init = SyncedProtInit;
|
||||
AsconInsertProtocol(&Syncedprot);
|
||||
AddCmd("synced", SyncedCommand);
|
||||
}
|
Reference in New Issue
Block a user