#include #include #include "ascon.h" #include "devser.h" #include "dynstring.h" typedef struct DevAction { struct DevAction *next; void *data; DevActionHandler *hdl; DevPrio prio; DevKillActionData *kill; double interval; /* -1 for a queued action */ double timeDue; /* 0 for a queued action */ DevInfoFunc *infoFunc; } DevAction; struct DevSer { Ascon *ascon; /* connection */ DevAction *current; int killCurrent; DevAction *actions; /* the action queue */ DevAction *toKill; /* list of actions to be killed */ int steps; AsconStatus status; /* fields for statistics: */ double startTime; double comCount; long nComCount; int comMaxState; double comMax; long errorCount; int inError; double asconStart; double asconSum; double asconMax; long asconCount; int asconState, asconMaxState; int maxCount; }; static char *devPrio[NumberOfPRIO] = { "null", "slow", "read", "progress", "write", "halt", "start" }; char *DevPrio2Text(DevPrio prio) { if (prio <= 0 || prio >= NumberOfPRIO) { prio = NullPRIO; } return devPrio[prio]; } DevPrio DevText2Prio(char *text) { DevPrio prio; for (prio = 0; prio < NumberOfPRIO; prio++) { if (strcasecmp(text, devPrio[prio]) == 0) return prio; } return NullPRIO; } static void DevFreeActionList(DevAction * actions) { DevAction *victim; while (actions != NULL) { victim = actions; actions = victim->next; assert(victim->data != NULL); if (victim->kill != NULL) victim->kill(victim->data); victim->data=NULL; free(victim); } } static double nextDue(double due, double now, double interval) { /* calculate next due value. the phase is rounded to usec in order to prevent phase drifts by summed rounding errors */ double p; /* phase */ if (interval <= 0) return now; if (now < due) return due; p = round(fmod(due, interval)*1e6)/1e6; return p + interval * floor((due-p)/interval + (now-due)/interval + 1.000001); } static DevAction *DevNextAction(DevSer * devser) { /* the action queue is primarily ordered by priority (high priority first), * within one priority by usage (last used action at end) */ DevPrio prio; double now, next; DevAction *action, **ptr2prev; devser->current = NULL; now = DoubleTime(); for (ptr2prev = &devser->actions, action = devser->actions; action != NULL; ptr2prev = &action->next, action = action->next) { if (now >= action->timeDue) { /* remove action from queue */ *ptr2prev = action->next; devser->current = action; if (action->interval >= 0) { /* this is a scheduled action, to be preserved after use */ devser->killCurrent = 0; if (action->interval == 0) { action->timeDue = now; } else { /* increase timeDue according to interval */ action->timeDue = nextDue(action->timeDue, now, action->interval); } prio = action->prio; /* insert devser->current before the next lower priority */ for (action = action->next; /* start after this */ action != NULL && action->prio >= prio; ptr2prev = &action->next, action = action->next); /* action is now NULL or the next action with lower priority */ *ptr2prev = devser->current; devser->current->next = action; } else { /* this is a queued action, to be killed after use */ devser->killCurrent = 1; } return devser->current; } if (action->prio == StartPRIO) { /* if a poll with StartPRIO is scheduled, block all other actions */ return NULL; } } return NULL; } static void LogStart(DevSer *self) { self->startTime = DoubleTime(); } static void LogResponse(DevSer *self, int error) { double responseTime; if(self->startTime < 0){ printf("DEVSER: there is something fucked up in LogResponse, Investigate!\n"); self->startTime = -1; return; } responseTime = DoubleTime() - self->startTime; self->comCount += responseTime; if(responseTime > self->comMax){ self->comMax = responseTime; } self->nComCount++; if(error == 1 && self->inError == 0){ self->errorCount++; self->inError = 1; } else if(error == 0){ self->inError = 0; } self->startTime = -1; if(responseTime > self->comMax/2.){ self->comMaxState++; } } static void StartAscon(DevSer *self) { self->asconStart = DoubleTime(); self->asconState = AsconLastState(self->ascon); } static void AsconLog(DevSer *self) { double used; used = DoubleTime() - self->asconStart; self->asconSum += used; self->asconCount++; if(used > self->asconMax){ self->asconMax = used; self->asconMaxState = self->asconState; } if(used > self->asconMax/2.){ self->maxCount++; } } void DevStatistics(DevSer *devser, double *avg, double *max, int *maxCount, long *errCount, int *errState) { if(devser->nComCount > 0){ *avg = devser->comCount/devser->nComCount; } else { *avg = 0; /* no data!*/ } *max = devser->comMax; *errCount = devser->errorCount; *errState = devser->inError; *maxCount = devser->maxCount; } void DevAsconStatistics(DevSer *self, double *avg, \ double *max, int *maxState, int *longCount) { if(self->asconCount > 0){ *avg = self->asconSum/self->asconCount; } else { *avg = .0; } *max = self->asconMax; *maxState = self->asconMaxState; *longCount = self->maxCount; } int DevQueueTask(void *ds) { DevSer *devser = ds; DevAction *action; char *sendData; char *replyData = NULL; if (devser->steps == 0) return 1; /* deferred deallocation of removed actions */ DevFreeActionList(devser->toKill); devser->toKill = NULL; action = devser->current; if (action == NULL) { action = DevNextAction(devser); } StartAscon(devser); devser->status = AsconTask(devser->ascon); while (action != NULL) { AsconLog(devser); if (devser->status >= AsconFailure) { replyData = AsconGetError(devser->ascon); /** * TODO: this may be a place to record the end time */ if(devser->startTime > 0){ LogResponse(devser,1); } else { /* This is a follow up error and should not go into statistics */ } } else if (devser->status == AsconReady) { replyData = AsconRead(devser->ascon); if(replyData != NULL){ LogResponse(devser,0); } } else if (devser->status == AsconOffline) { replyData = "ASCERR: disconnected"; } else { return 1; } if (devser->steps > 0) { /* debugging mode */ devser->steps--; } sendData = action->hdl(action->data, replyData, (devser->status != AsconReady)); if (sendData != NULL) { devser->status = AsconWrite(devser->ascon, sendData, 0); if (devser->status >= AsconFailure) { replyData = AsconGetError(devser->ascon); continue; } LogStart(devser); return 1; } if (devser->killCurrent) { if (action->kill != NULL) action->kill(action->data); devser->killCurrent = 0; free(action); devser->current = NULL; } action = DevNextAction(devser); StartAscon(devser); devser->status = AsconTask(devser->ascon); } return 1; } DevSer *DevMake(SConnection * con, int argc, char *argv[]) { DevSer *devser = NULL; Ascon *ascon = NULL; ascon = AsconMake(con, argc, argv); if (!ascon) { return NULL; } devser = calloc(1, sizeof(*devser)); assert(devser); devser->ascon = ascon; devser->current = NULL; devser->killCurrent = 0; devser->actions = NULL; devser->toKill = NULL; devser->steps = -1; /* no debugging by default */ devser->status = AsconUnconnected; devser->startTime = -1; TaskRegisterN(pServ->pTasker, AsconHostport(ascon), DevQueueTask, NULL, NULL, devser, TASK_PRIO_HIGH); return devser; } void DevDebugMode(DevSer * devser, int steps) { devser->steps = steps; } static void DevReset(DevSer * devser) { /* DevFreeActionList(devser->actions); devser->actions = NULL; */ DevFreeActionList(devser->toKill); devser->toKill = NULL; if (devser->killCurrent) { if (devser->current->kill != NULL) { devser->current->kill(devser->current->data); } devser->killCurrent = 0; free(devser->current); devser->current = NULL; } } void DevKill(DevSer * devser) { if (devser->ascon) { AsconKill(devser->ascon); } DevReset(devser); TaskRemove(pServ->pTasker, DevQueueTask, devser); free(devser); } void DevDisconnect(DevSer * devser) { DevReset(devser); if (devser->ascon) { AsconDisconnect(devser->ascon); } devser->status = AsconOffline; } void DevReconnect(DevSer * devser, char *hostport) { /* DevReset(devser); */ if (devser->ascon) { AsconReconnect(devser->ascon, hostport); } } int DevUnschedule(DevSer * devser, void *callData, DevActionHandler * hdl, DevActionMatch * matchFunc) { DevAction **ptr2prev = NULL; DevAction *action = NULL; int cnt = 0; /* scan through the queue */ for (ptr2prev = &devser->actions, action = devser->actions; action != NULL; action = action->next) { if (action->hdl == hdl && matchFunc(callData, action->data)) { if (action == devser->current) { devser->current = NULL; devser->killCurrent = 0; } cnt++; /* remove from list */ *ptr2prev = action->next; /* add to kill list */ action->next = devser->toKill; devser->toKill = action; } else { ptr2prev = &action->next; } } return cnt; } int DevScheduleS(DevSer * devser, void *actionData, DevPrio prio, double interval, double start, DevActionHandler * hdl, DevActionMatch * matchFunc, DevKillActionData * killFunc, DevInfoFunc * infoFunc) { DevAction *action; DevAction *foundAction = NULL; DevAction **ptr2prev; DevAction **ptr2insertPos = NULL; int ret; assert(killFunc == NULL || actionData != NULL); if (prio <= NullPRIO) { prio = NullPRIO + 1; } if (prio >= NumberOfPRIO) { prio = NumberOfPRIO - 1; } /* find similar action and the point, where we have to insert */ for (ptr2prev = &devser->actions, action = devser->actions; action != NULL; action = action->next) { if (action->prio < prio && ptr2insertPos == NULL) { ptr2insertPos = ptr2prev; } /* check if it is the same action (only once) */ if (action->hdl == hdl && action->kill == killFunc && (interval < 0) == (action->interval < 0) && foundAction == NULL && matchFunc(actionData, action->data)) { if (prio == action->prio && interval < 0) { /* do not move an action with equal prio */ if (killFunc) { killFunc(actionData); } return 0; /* not queued */ } /* remove action from list */ *ptr2prev = action->next; foundAction = action; } else { ptr2prev = &action->next; } } if (foundAction != NULL) { /* a similar action was found */ action = foundAction; if (killFunc) { killFunc(actionData); } ret = 0; } else { /* create if needed */ action = calloc(1, sizeof(*action)); assert(action); action->data = actionData; action->hdl = hdl; action->kill = killFunc; action->infoFunc = infoFunc; action->timeDue = 0; action->interval = 0; ret = 1; } action->prio = prio; /* insert into queue */ if (ptr2insertPos == NULL) { ptr2insertPos = ptr2prev; } action->next = *ptr2insertPos; *ptr2insertPos = action; if (interval < 0) { /* case "queued" */ action->interval = -1.0; } else { /* case "scheduled" */ /* nextDue calculates the next due date after the last call however, it returns the 'start' value - when this is a new action and start > 0 - when the presumed last call time is bigger the 'start' value */ action->timeDue = nextDue(start, action->timeDue - action->interval, interval); action->interval = interval; } return ret; /* when 0, actionData was killed */ } int DevSchedule(DevSer * devser, void *actionData, DevPrio prio, double interval, DevActionHandler * hdl, DevActionMatch * matchFunc, DevKillActionData * killFunc, DevInfoFunc * infoFunc) { return DevScheduleS(devser, actionData, prio, interval, 0.0, hdl, matchFunc, killFunc, infoFunc); } int DevQueue(DevSer * devser, void *actionData, DevPrio prio, DevActionHandler * hdl, DevActionMatch * matchFunc, DevKillActionData * killFunc, DevInfoFunc *infoFunc) { return DevScheduleS(devser, actionData, prio, -1.0, 0.0, hdl , matchFunc, killFunc, infoFunc); } int DevRemoveAction(DevSer * devser, void *actionData) { DevAction **ptr2prev = NULL; DevAction *action = NULL; int cnt = 0; /* Remove current action, if matched. If a reply is pending, the next action will get the reply. But as in the inital state no reply is expected, this should not harm. */ action = devser->current; if (action != NULL && actionData == action->data) { if (devser->killCurrent) { if (action->kill != NULL) action->kill(action->data); devser->killCurrent = 0; free(action); } devser->current = NULL; } /* remove from queue */ ptr2prev = &devser->actions; for (action = devser->actions; action != NULL; action = *ptr2prev) { if (actionData == action->data) { cnt++; /* remove from list */ *ptr2prev = action->next; if (action->kill != NULL) action->kill(action->data); free(action); } else { ptr2prev = &action->next; } } return cnt++; } int DevIsPending(DevSer * devser, void *callData, DevActionHandler * hdl, DevActionMatch * matchFunc) { DevAction *action = devser->current; if (action) { if (action->hdl == hdl && matchFunc(callData, action->data)) { return 1; } } return 0; } char * DevList(DevSer * devser) { DevAction * action; pDynString result, info; char text[80]; char *str; result = CreateDynString(63,64); for (action = devser->actions; action != NULL; action = action->next) { if (action->interval < 0) { snprintf(text, sizeof text, "%-8s queued ", DevPrio2Text(action->prio)); } else { snprintf(text, sizeof text, "%-8s %8.3g %14.3f ", DevPrio2Text(action->prio), action->interval, action->timeDue); } DynStringConcat(result, text); if (action == devser->current) { DynStringConcat(result, "ACTIVE "); } if (action->infoFunc) { str = action->infoFunc(action->data); DynStringConcat(result, str); free(str); } else { snprintf(text, sizeof text, "%p", action->data); DynStringConcat(result, text); } DynStringConcat(result, "\n"); } str = strdup(GetCharArray(result)); DeleteDynString(result); return str; } char *DevHostport(DevSer *devser) { return AsconHostport(devser->ascon); } char *DevIP(DevSer *devser) { return AsconIP(devser->ascon); } char *DevStatus(DevSer *devser) { char *str, *pos; static char buf[64]; switch (devser->status) { case AsconOffline: return "disconnected"; case AsconUnconnected: return "unconnected"; case AsconConnectPending: return "connecting"; /* case AsconPending: return "busy"; case AsconReady: return "ready"; case AsconFailure: return AsconGetError(devser->ascon); */ } str = AsconGetError(devser->ascon); if (strncmp(str, "ASCERR: ", 8) == 0) { str += 8; } pos = strstr(str, " ("); if (pos != 0) { snprintf(buf, sizeof buf, "%s", str); buf[pos-str] = 0; str = buf; } return str; } double DevGetSetTimeout(DevSer *devser, double timeout, int setmode) { return AsconGetSetTimeout(devser->ascon, timeout, setmode); } int DevReconnectInterval(DevSer *devser, int interval) { return AsconReconnectInterval(devser->ascon, interval); }