Files
sics/syncedprot.c
Ferdi Franceschini 10d29d597c Cleaned up ANSTO code to merge with sinqdev.sics
This is our new RELEASE-4_0 branch which was taken from ansto/93d9a7c
Conflicts:
	.gitignore
	SICSmain.c
	asynnet.c
	confvirtualmot.c
	counter.c
	devexec.c
	drive.c
	event.h
	exebuf.c
	exeman.c
	histmem.c
	interface.h
	motor.c
	motorlist.c
	motorsec.c
	multicounter.c
	napi.c
	napi.h
	napi4.c
	network.c
	nwatch.c
	nxscript.c
	nxxml.c
	nxxml.h
	ofac.c
	reflist.c
	scan.c
	sicshipadaba.c
	sicsobj.c
	site_ansto/docs/Copyright.txt
	site_ansto/instrument/lyrebird/config/tasmad/sicscommon/nxsupport.tcl
	site_ansto/instrument/lyrebird/config/tasmad/taspub_sics/tasscript.tcl
	statusfile.c
	tasdrive.c
	tasub.c
	tasub.h
	tasublib.c
	tasublib.h
2015-04-23 20:49:26 +10:00

413 lines
9.4 KiB
C

#include <stdio.h>
#include "sics.h"
#include "ascon.h"
#include "ascon.i"
#include "syncedprot.h"
/*
* Markus Zolliker Aug 2012
*
* This is a scriptcontext driver 'connecting' to its own sics server.
* The argument of the send command is a sync id.
* The next script of the scriptchain is called when all actions related to
* this sync id are finished or on timeout.
*
* In addition the functions needed to implement other mechanisms than scriptcontext to
* support this type of synchronization are implemented.
*
* the command sync with its subcommands begin/end/incr/decr/pending act as the corresponding
* functions SyncedBegin, SyncedEnd, SyncedIncr, SyncedDecr, SyncedPending
*
* Usage of the syncedprot:
* ------------------------
*
* For asynchronous execution of processes synchronized with commands implemented with
* scriptcontext machinery, you create an object with scriptcontext using syncprot as a protocol.
*
* with the following helper proc:
*
* proc sctsync {code} {
* set id [synced begin]
* uplevel 1 $code
* synced end $id
* sct send $id
* }
*
* In action scripts, you do all actions within the body of the sctsync command. The next script
* in the script chanin is not exectuted before all these actions are terminated.
*
*
* Usage outside of scriptcontext:
* -------------------------------
*
* additional command to wait for a sync id
*
* synced wait <sync id> [<timeout>]
*
*
* if the following proc is implemented in tcl
*
* proc synceddo {code} {
* set id [synced begin]
* uplevel 1 $code
* synced end $id
* synced wait $id
* }
*
* you might do something like:
*
* synceddo {hset /path value}
*
* or
* synceddo {
* hset /path value
* ctrl queue /path2 write
* }
*
* the synceddo command does finish when the inititated actions have finished
*/
typedef struct SyncedData {
struct SyncedData *next;
long id;
long count;
} SyncedData;
#define NSTACK 100
static SyncedData *stack[NSTACK]; /* the stack is limited in order to detect missing calls to SyncedEnd */
static int sp = 0;
static SyncedData *syncList = NULL;
static SyncedData *actualSync = NULL;
static long nextId = 1;
SyncedData *SyncedFind(long id)
{
SyncedData *s;
for (s = syncList; s != NULL; s = s->next) {
if (s->id == id) {
return s;
}
}
return NULL;
}
SyncedData *SyncedNew(void)
{
SyncedData *new = NULL;
SyncedData *s, **ptr2previous, *tobekilled;
int cnt;
long id;
id = 0;
for (cnt = 1000; id == 0 && cnt > 0; cnt--) {
id = nextId;
nextId++;
if (nextId > 999999999) {
nextId = 1;
}
ptr2previous = &syncList;
for (s = syncList; s != NULL; ) {
if (s->id == id) {
id = 0;
break;
}
if (s->count <= 0) {
/* remove unused syncs */
*ptr2previous = s->next;
tobekilled = s;
s = s->next;
if (tobekilled == actualSync) {
actualSync = NULL;
}
free(tobekilled);
} else {
ptr2previous = &s->next;
s = s->next;
}
};
}
if (id == 0) {
return NULL;
}
new = calloc(1,sizeof *new);
if (new) {
/* append at the end */
new->count = 0;
new->id = id;
*ptr2previous = new;
new->next = NULL;
}
return new;
}
long SyncedGet(void)
{
if (actualSync == NULL) {
return SYNCED_NO_ID;
}
return actualSync->id;
}
long SyncedBegin(long syncid)
{
SyncedData *sync;
if (syncid < 0) return syncid;
stack[sp] = actualSync;
sync = SyncedFind(syncid);
if (sync == NULL) {
sync = SyncedNew();
if (sync == NULL) {
return SYNCED_NO_MEMORY;
}
}
if (sp >= NSTACK) {
return SYNCED_STACK_OVERFLOW;
}
sp++;
sync->count++;
actualSync = sync;
return sync->id;
}
long SyncedEnd(long syncid)
{
SyncedData *sync;
long id;
if (syncid < 0) return syncid;
if (sp <= 0) {
return SYNCED_STACK_UNDERFLOW;
}
sp--;
sync = actualSync;
actualSync = stack[sp];
if (sync->count <= 0) {
return SYNCED_COUNT_UNDERFLOW;
}
sync->count--;
if (syncid != 0 && syncid != sync->id) {
return SYNCED_ID_MISMATCH;
}
return sync->id;
}
long SyncedIncr(long syncid)
{
SyncedData *sync;
if (syncid < 0) return syncid;
if (syncid == 0) {
sync = actualSync;
} else {
sync = SyncedFind(syncid);
}
if (sync == NULL) {
return SYNCED_NOT_FOUND;
}
sync->count++;
return sync->id;
}
long SyncedDecr(long syncid)
{
SyncedData *sync;
if (syncid < 0) return syncid;
if (syncid == 0) {
sync = actualSync;
} else {
sync = SyncedFind(syncid);
}
if (sync == NULL) {
return SYNCED_NOT_FOUND;
}
if (sync->count <= 0) {
return SYNCED_COUNT_UNDERFLOW;
}
sync->count--;
return sync->id;
}
int SyncedPending(long syncid)
{
SyncedData *sync;
if (syncid == 0) {
return (actualSync != NULL); /* this makes no sense: actual syncid is always pending */
}
sync = SyncedFind(syncid);
if (sync == NULL || sync->count <= 0) {
return 0;
}
return 1;
}
int SyncedCommand(SConnection * con, SicsInterp * sics, void *object,
int argc, char *argv[])
{
long ret;
double start, tmo;
long syncid;
char *errmsg;
SyncedData *sync;
if (argc < 2) {
goto badsyntax;
}
if (argc >= 3) {
syncid = atol(argv[2]);
if (syncid < 0) {
SCPrintf(con, eError, "ERROR: Usage: illegal sync id");
}
} else {
syncid = 0;
}
if (strcasecmp(argv[1], "wait") == 0) {
if (argc == 3) {
tmo = 365 * 24 * 3600;
} else if (argc == 4) {
tmo = atof(argv[3]);
} else {
SCPrintf(con, eError, "ERROR: Usage: synced wait <sync id> [<tmo>]");
return 0;
}
start = DoubleTime();
if (syncid == 0) {
SCPrintf(con, eError, "ERROR: missing sync id");
return 0;
}
sync = SyncedFind(syncid);
if (sync == NULL || sync->count <= 0) { /* finished */
return 1;
}
while (sync->count > 0) {
TaskYield(pServ->pTasker);
if (SCGetInterrupt(con) != eContinue) {
SCPrintf(con, eError, "ERROR: interrupted");
return 0;
}
if (DoubleTime() > start + tmo) {
SCPrintf(con, eError, "ERROR: timeout");
return 0;
}
}
return 1;
}
if (argc > 3) {
SCPrintf(con, eError, "ERROR: Usage: synced %s [<sync id>]", argv[1]);
return 0;
}
if (strcasecmp(argv[1], "begin") == 0) {
ret = SyncedBegin(syncid);
} else if (strcasecmp(argv[1], "end") == 0) {
ret = SyncedEnd(syncid);
} else if (strcasecmp(argv[1], "incr") == 0) {
ret = SyncedIncr(syncid);
} else if (strcasecmp(argv[1], "decr") == 0) {
ret = SyncedDecr(syncid);
} else if (strcasecmp(argv[1], "pending") == 0) {
ret = SyncedPending(syncid);
} else {
goto badsyntax;
}
if (ret >= 0) {
SCPrintf(con, eValue, "%ld", ret);
return 1;
}
switch (ret) {
case SYNCED_NO_ID: errmsg = "no sync id defined"; break;
case SYNCED_NO_MEMORY: errmsg = "no memory"; break;
case SYNCED_STACK_OVERFLOW: errmsg = "stack overflow"; break;
case SYNCED_STACK_UNDERFLOW: errmsg = "stack underflow"; break;
case SYNCED_COUNT_UNDERFLOW: errmsg = "count underflow"; break;
case SYNCED_ID_MISMATCH: errmsg = "sync id mismatch"; break;
case SYNCED_NOT_FOUND: errmsg = "sync id not found"; break;
default: errmsg = "unknown error"; break;
}
SCPrintf(con, eError, "ERROR: %s", errmsg);
return 0;
badsyntax:
SCPrintf(con, eError, "ERROR: Usage: synced (begin|end|incr|decr|pending|wait) [<sync id> [<tmo>]]");
return 0;
}
/*----------------------------------------------------------------------------*/
int SyncedProtHandler(Ascon *a)
{
Tcl_Interp *pTcl;
int ret;
char *result = NULL;
int iRet = 1;
long *syncid;
switch (a->state) {
case AsconConnectStart:
case AsconConnecting:
a->state = AsconConnectDone;
break;
case AsconWriteStart:
syncid = a->private;
*syncid = atol(GetCharArray(a->wrBuffer));
a->start = DoubleTime();
/* fall through */
case AsconWriting:
a->state = AsconWriteDone;
break;
case AsconReadStart:
case AsconReading:
syncid = a->private;
if (!SyncedPending(*syncid)) {
a->state = AsconReadDone;
} else if (a->timeout > 0) {
if (DoubleTime() - a->start > a->timeout) {
AsconError(a, "timeout", 0);
a->state = AsconTimeout;
}
}
break;
}
return 0;
}
/*----------------------------------------------------------------------------*/
static void SyncedKillPrivate(void *private) {
if (private) {
free(private);
}
}
/*----------------------------------------------------------------------------*/
static int SyncedProtInit(Ascon * a, SConnection * con, int argc, char *argv[])
{
a->timeout = 0.0;
if (argc > 1) {
/* a dummy hostport is used, here a unique name should be given */
a->hostport = strdup(argv[1]);
if (argc > 2) {
a->timeout = atof(argv[2]);
}
} else {
a->hostport = strdup("synced"); /* WARNING: this name is not unique, but used by Marks new Task manager */
}
a->private = calloc(1, sizeof(long));
a->killPrivate = SyncedKillPrivate;
return 1;
}
/*----------------------------------------------------------------------------*/
void AddSyncedProt()
{
static AsconProtocol Syncedprot;
Syncedprot.name = "syncedprot";
Syncedprot.handler = SyncedProtHandler;
Syncedprot.init = SyncedProtInit;
AsconInsertProtocol(&Syncedprot);
AddCmd("synced", SyncedCommand);
}