Implement task priorities and message queues

This commit is contained in:
Douglas Clowes
2015-07-29 17:47:46 +10:00
parent 0db57b9bae
commit eff54a5fd9
24 changed files with 1440 additions and 269 deletions

View File

@ -28,6 +28,7 @@ static void KillBckTask(void *data)
if (self->command) {
free(self->command);
}
free(self);
}
/*---------------------------------------------------------------------------*/
@ -44,6 +45,7 @@ static int BackgroundTask(void *data)
/*----------------------------------------------------------------------------*/
int BackgroundCommand(SConnection * pCon, char *command)
{
long lID;
pBckTask self = NULL;
self = calloc(1, sizeof(BckTask));
@ -58,7 +60,8 @@ int BackgroundCommand(SConnection * pCon, char *command)
return 0;
}
TaskRegisterN(pServ->pTasker, self->command, BackgroundTask, NULL, KillBckTask, self, 1);
lID = TaskRegisterN(pServ->pTasker, self->command, BackgroundTask, NULL, KillBckTask, self, TASK_PRIO_LOW);
SCPrintf(pCon, eValue, "bg = %ld", lID);
return 1;
}
@ -76,7 +79,6 @@ int BackgroundAction(SConnection * pCon, SicsInterp * pSics,
SCWrite(pCon, "ERROR: out of memory starting task", eError);
return 0;
}
SCSendOK(pCon);
return 1;
}

View File

@ -491,7 +491,7 @@ int CommandLog(SConnection * pCon, SicsInterp * pSics, void *pData,
SCWrite(pCon, "ERROR: autologging is already active", eError);
return 0;
}
TaskRegisterN(pServ->pTasker, "autologger", AutoTask, NULL, NULL, NULL, 1);
TaskRegisterN(pServ->pTasker, "autologger", AutoTask, NULL, NULL, NULL, TASK_PRIO_HIGH);
SCSendOK(pCon);
iAutoActive = 1;
return 1;

View File

@ -452,6 +452,9 @@ SConnection *SCCopyConnection(SConnection * pCon)
result->iMacro = pCon->iMacro;
result->iTelnet = pCon->iTelnet;
result->iOutput = pCon->iOutput;
if (pCon->oldWriteFunc != NULL)
result->write = pCon->oldWriteFunc;
else
result->write = pCon->write;
result->listening = pCon->listening;
result->eInterrupt = pCon->eInterrupt;
@ -467,7 +470,6 @@ SConnection *SCCopyConnection(SConnection * pCon)
result->contextStack = -1;
result->iList = -1;
result->runLevel = pCon->runLevel;
result->data = pCon->data;
return result;
}
@ -2272,7 +2274,7 @@ int LogCapture(SConnection * pCon, SicsInterp * pSics, void *pData,
if (code_bits & (1 << i)) {
if (GetDynStringLength(buffer) > 0)
DynStringConcatChar(buffer, ',');
DynStringConcat(buffer, OutCodeToTxt(i));
DynStringConcat(buffer, (char *)OutCodeToTxt(i));
}
}
SCPrintf(pCon, eLog, "getlog %s", GetCharArray(buffer));
@ -2291,7 +2293,7 @@ int LogCapture(SConnection * pCon, SicsInterp * pSics, void *pData,
if (code_bits & (1 << i)) {
if (GetDynStringLength(buffer) > 0)
DynStringConcatChar(buffer, ',');
DynStringConcat(buffer, OutCodeToTxt(i));
DynStringConcat(buffer, (char *)OutCodeToTxt(i));
}
}
SCPrintf(pCon, eLog, "getlog %s", GetCharArray(buffer));

View File

@ -250,7 +250,6 @@ int StartDevice(pExeList self, char *name, pObjectDescriptor pDes,
char *overwriteOption;
float oldVal;
long taskID;
pTaskGroupData taskGroup = NULL;
assert(self);
assert(pDes);
@ -329,7 +328,7 @@ int StartDevice(pExeList self, char *name, pObjectDescriptor pDes,
self->lTask = TaskRegisterN(self->pTask,"devexec",
DevExecTask,
DevExecSignal,
NULL, self,1);
NULL, self,TASK_PRIO_HIGH);
}
pCon->conStatus = HWBusy;
return 1;

View File

@ -311,7 +311,7 @@ DevSer *DevMake(SConnection * con, int argc, char *argv[])
devser->status = AsconUnconnected;
devser->startTime = -1;
TaskRegisterN(pServ->pTasker, AsconHostport(ascon),
DevQueueTask, NULL, NULL, devser, 0);
DevQueueTask, NULL, NULL, devser, TASK_PRIO_HIGH);
return devser;
}

View File

@ -475,7 +475,7 @@ int RunDiffScan(pDiffScan self, pScanData pScan,
InvokeCallBack(self->scanObject->pCall, SCANSTART, self->scanObject);
lID = TaskRegisterN(pServ->pTasker,"diffscan", DiffScanTask, NULL, NULL, self, 10);
lID = TaskRegisterN(pServ->pTasker,"diffscan", DiffScanTask, NULL, NULL, self, TASK_PRIO_HIGH);
TaskWait(pServ->pTasker, lID);

View File

@ -284,7 +284,7 @@ long StartDriveTask(void *obj, SConnection *pCon, char *name, float fTarget)
DriveTaskFunc,
DriveTaskSignal,
KillDriveTaskData,
taskData,0);
taskData, TASK_PRIO_HIGH);
}
/*--------------------------------------------------------------------------*/
pICountable GetCountableInterface(void *pObject)
@ -430,7 +430,7 @@ long StartCountTask(void *obj, SConnection *pCon, char *name)
CountTaskFunc,
CountTaskSignal,
KillCountTaskData,
taskData,0);
taskData, TASK_PRIO_HIGH);
}
/*--------------------------------------------------------------------------*/

16
nread.c
View File

@ -224,9 +224,9 @@ static int NetReadAccept(pNetRead self, mkChannel * pSock)
} else {
/* register the connection and create a task for it here */
NetReadRegister(self, pNew, command, pRes);
TaskRegister(self->pMain->pTasker,
TaskRegisterN(self->pMain->pTasker, "NetReadAccept",
SCTaskFunction,
SCSignalFunction, SCDeleteConnection, pRes, 1);
SCSignalFunction, SCDeleteConnection, pRes, TASK_PRIO_LOW);
SCSendOK(pRes);
return 1;
}
@ -377,8 +377,8 @@ static int TelnetAccept(pNetRead self, mkChannel * pSock)
/* register connection and task */
pRes->iTelnet = 1;
NetReadRegister(self, pNew, tcommand, pRes);
TaskRegister(self->pMain->pTasker,
TelnetTask, TelnetSignal, DeleteTelnet, pTel, 1);
TaskRegisterN(self->pMain->pTasker," TelnetAccept",
TelnetTask, TelnetSignal, DeleteTelnet, pTel, TASK_PRIO_LOW);
return 1;
}
} else {
@ -896,8 +896,8 @@ int NetReadWait4Data(pNetRead self, int iSocket)
pNew->pChan = pChan;
pNew->iEnd = 0;
pNew->pRead = self;
lID = TaskRegister(self->pMain->pTasker,
Wait4ReadTask, ReadWaitSignal, ReadWaitFree, pNew, 0);
lID = TaskRegisterN(self->pMain->pTasker, "NetReadWait4Data",
Wait4ReadTask, ReadWaitSignal, ReadWaitFree, pNew, TASK_PRIO_HIGH);
/* wait for finish */
TaskWait(self->pMain->pTasker, lID);
@ -1127,7 +1127,7 @@ static int CommandAcceptCB(int handle, void *userData)
TaskRegisterN(pServ->pTasker,
buffer,
SCTaskFunction,
SCSignalFunction, SCDeleteConnection, pCon, 1);
SCSignalFunction, SCDeleteConnection, pCon, TASK_PRIO_LOW);
ANETsetReadCallback(handle, CommandDataCB, usData, killCommandCBData);
SCSendOK(pCon);
return 1;
@ -1343,7 +1343,7 @@ static int TelnetAcceptCB(int handle, void *userData)
snprintf(buffer,sizeof(buffer),"con%ld", pCon->ident);
TaskRegisterN(pServ->pTasker,
buffer,
TelnetTask, TelnetSignal, DeleteTelnet, pTel, 1);
TelnetTask, TelnetSignal, DeleteTelnet, pTel, TASK_PRIO_LOW);
ANETsetReadCallback(handle, ANETTelnetProcess,
usData, killCommandCBData);
SCSendOK(pCon);

View File

@ -43,8 +43,6 @@
extern int openDevexecLog(); /* in devexec.c */
extern int NetWatchTask(void *pData); /* in nwatch.c */
/* ========================= Less dreadful file statics =================== */
/*------------------------------------------------------------------------*/
static void StopExit(void)
{
@ -116,13 +114,14 @@ int InitServer(char *file, pServer * pServ)
assert(self->pSics);
/* initialise tasker */
assert(TaskerInit(&self->pTasker));
TaskerInit(&self->pTasker);
assert(self->pTasker != NULL);
pSICSOptions = IFAddOption(pSICSOptions, "ConnectionCount", "0");
pSICSOptions = IFAddOption(pSICSOptions, "ConMask", "0");
/* initialize the network watcher */
TaskRegister(self->pTasker, NetWatchTask, NULL, NULL, NULL, 1);
TaskRegisterN(self->pTasker, "nwatch", NetWatchTask, NULL, NULL, NULL, TASK_PRIO_HIGH);
/* initialise the server from script */
SetWriteHistory(0);
@ -183,7 +182,7 @@ int InitServer(char *file, pServer * pServ)
iCommandTimeOut)) != NULL);
TaskRegisterN(self->pTasker, "Network Reader",
NetReaderTask, NetReaderSignal, NULL, /* call DeleteNetReader later than TaskerDelete */
pReader, 1);
pReader, TASK_PRIO_HIGH);
self->pReader = pReader;
/* the socket */
@ -245,17 +244,17 @@ int InitServer(char *file, pServer * pServ)
/* install environment monitor */
self->pMonitor = GetEnvMon(self->pSics);
TaskRegisterN(self->pTasker,"EV Monitor",
EnvMonTask, EnvMonSignal, NULL, self->pMonitor, 1);
EnvMonTask, EnvMonSignal, NULL, self->pMonitor, TASK_PRIO_HIGH);
/* install performance monitor */
pMon = CreatePerfMon(20);
AddCommand(self->pSics, "Performance", PerfMonWrapper, DeletePerfMon,
pMon);
TaskRegisterN(self->pTasker,"perfmon", PerfMonTask, PerfMonSignal, NULL, pMon, 1);
TaskRegisterN(self->pTasker,"perfmon", PerfMonTask, PerfMonSignal, NULL, pMon, TASK_PRIO_HIGH);
/* Install a second one for higher granularity measurement */
pMon = CreatePerfMon(2);
TaskRegisterN(self->pTasker,"perfmon2",
PerfMonTask, PerfMonSignal, DeletePerfMon, pMon, 1);
PerfMonTask, PerfMonSignal, DeletePerfMon, pMon, TASK_PRIO_HIGH);
/* install telnet port */
@ -467,7 +466,7 @@ int UserWait(SConnection * pCon, SicsInterp * pSics, void *pData,
sWait.dFinish = DoubleTime() + (double)fVal;
sWait.iEnd = 0;
lID = TaskRegisterN(pTask,"wait", WaitTask, WaitSignal, NULL, &sWait, 1);
lID = TaskRegisterN(pTask,"wait", WaitTask, WaitSignal, NULL, &sWait, TASK_PRIO_HIGH);
TaskWait(pTask, lID); if (SCGetInterrupt(pCon) != eContinue) {
return 0;
} else {

View File

@ -81,7 +81,7 @@ static int CountCallback(int iEvent, void *pEventData, void *pUser)
self->nextUpdate = time(NULL) + self->updateIntervall;
self->iEnd = 0;
self->pCon = pCon;
TaskRegisterN(pServ->pTasker, "NXupdater", UpdateTask, NULL, NULL, self, 1);
TaskRegisterN(pServ->pTasker, "NXupdater", UpdateTask, NULL, NULL, self, TASK_PRIO_HIGH);
return 1;
} else if (iEvent == COUNTEND) {
self->iEnd = 1;

View File

@ -165,7 +165,7 @@ static int StartOscillation(pOscillator self, SConnection * pCon)
*/
snprintf(pBueffel,sizeof(pBueffel),"Oscillate-%s", self->pMot->name);
self->taskID = TaskRegisterN(pServ->pTasker,pBueffel,
OscillationTask, NULL, NULL, self, 10);
OscillationTask, NULL, NULL, self, TASK_PRIO_HIGH);
if (self->taskID < 0) {
SCWrite(pCon, "ERROR: failed to start oscillation task", eError);
return 0;

View File

@ -954,7 +954,7 @@ static RemServer *RemServerInit(char *name, char *host, int port)
remserver->interestActive = 0;
remserver->forwardMessages = 1;
TaskRegisterN(pServ->pTasker,name, RemServerTask, NULL, RemServerKill,
remserver, 1);
remserver, TASK_PRIO_HIGH);
return remserver;
}

View File

@ -523,7 +523,7 @@ static char *SctActionHandler(void *actionData, char *lastReply,
traceIO(data->controller->node->name, "ERROR: action {%s} in {%s} node %s:\nERROR: %s",
data->name, origScript, path, result);
} else {
traceIO("sctunknown", "reply:%s", "ERROR: action {%s} in {%s} node %s:\nERROR: %s",
traceIO("sctunknown", "ERROR: action {%s} in {%s} node %s:\nERROR: %s",
data->name, origScript, path, result);
}
}
@ -1842,7 +1842,7 @@ static void SctKillController(void *c)
if (pServ->pTasker) {
TaskRegisterN(pServ->pTasker,"killsct", SctDeferredTask, NULL, SctDeferredFree,
controller, 0);
controller, TASK_PRIO_HIGH);
} else {
free(controller);
}

View File

@ -195,7 +195,7 @@ int SerialSicsExecute(void **pData, char *pCommand,
/* start task */
lTask = TaskRegisterN(pServ->pTasker,"serialwait",
SWTask, SWSignal, NULL, &control, 1);
SWTask, SWSignal, NULL, &control, TASK_PRIO_LOW);
/* wait for it to end */
TaskWait(pServ->pTasker, lTask);

View File

@ -237,7 +237,7 @@ int MakeCron(SConnection * pCon, SicsInterp * pSics, void *pData,
}
pNew->stat = StatisticsNew(cmd);
TaskRegisterN(pServ->pTasker, cmd, CronTask, CronSignal, KillCron, pNew, 1);
TaskRegisterN(pServ->pTasker, cmd, CronTask, CronSignal, KillCron, pNew, TASK_PRIO_LOW);
SCSendOK(pCon);
return 1;
}

View File

@ -76,7 +76,7 @@ int SicsExit(SConnection * pCon, SicsInterp * pInterp, void *pData,
if (SCMatchRights(pCon, usMugger)) { /* only Muggers are allowed to do it */
SetInterrupt(eEndServer);
lID = TaskRegisterN(pTask,"exittask", WaitTask, NULL, NULL, NULL, 1);
lID = TaskRegisterN(pTask,"exittask", WaitTask, NULL, NULL, NULL, TASK_PRIO_HIGH);
TaskWait(pTask, lID);
TaskStop(pTask);
return 1;

View File

@ -379,7 +379,7 @@ int InstallSICSPoll(SConnection * pCon, SicsInterp * pSics, void *pData,
pNew->pCon = defCon;
pNew->nPoll = 3;
TaskRegisterN(pServ->pTasker,"sicspoll", PollTask, SicsPollSignal, NULL, pNew, 10);
TaskRegisterN(pServ->pTasker,"sicspoll", PollTask, SicsPollSignal, NULL, pNew, TASK_PRIO_HIGH);
if (argc > 1) {
AddCommand(pSics, argv[1], SICSPollWrapper, killSicsPoll, pNew);

View File

@ -292,7 +292,7 @@ static void printPollList(pSicsPoll self, SConnection *pCon){
pNew->pCon = defCon;
pNew->nPoll = 3;
TaskRegister(pServ->pTasker,PollTask,SicsPollSignal,NULL,pNew, 10);
TaskRegister(pServ->pTasker,PollTask,SicsPollSignal,NULL,pNew, TASK_PRIO_HIGH);
if(argc > 1){
AddCommand(pSics,argv[1],SICSPollWrapper,

View File

@ -504,5 +504,5 @@ static int StatusTask(void *data)
void InitStatus(void)
{
BuildStatusChain();
TaskRegisterN(pServ->pTasker,"statustask",StatusTask, NULL, NULL, NULL,1);
TaskRegisterN(pServ->pTasker,"statustask",StatusTask, NULL, NULL, NULL, TASK_PRIO_HIGH);
}

View File

@ -296,10 +296,10 @@ static int cleanRestoreErr(pRestoreObj self, SConnection * pCon, int hard)
}
if (errMsg) {
SCWrite(pCon, errMsg, eError);
LLDstringDelete(newErrList);
LLDdeleteString(newErrList);
} else {
/* swap lists */
LLDstringDelete(self->errList);
LLDdeleteString(self->errList);
self->errList = newErrList;
SCPrintf(pCon, eLog, "CleanErr: %d pairs in, %d pairs out", count_in, count_ex);
}
@ -397,5 +397,5 @@ void StatusFileDirty(void)
/*-----------------------------------------------------------------------*/
void StatusFileInit(void)
{
TaskRegisterN(pServ->pTasker, "statusfile", StatusFileTask, NULL, NULL, NULL, 0);
TaskRegisterN(pServ->pTasker, "statusfile", StatusFileTask, NULL, NULL, NULL, TASK_PRIO_HIGH);
}

964
task.c

File diff suppressed because it is too large Load Diff

232
task.h
View File

@ -7,7 +7,7 @@
Mark Koennecke, September 1997
extended to suuport task groups
extended to support task groups
Mark Koennecke, December 2012
copyright: see implementation file
@ -15,6 +15,37 @@
#ifndef TASKOMAT
#define TASKOMAT
typedef long TaskTaskID;
typedef long TaskGroupID;
extern TaskTaskID TaskUnknownTaskID, TaskBadTaskID;
extern TaskGroupID TaskUnknownGroupID, TaskBadGroupID;
/*---------------------------------------------------------------------------*/
typedef enum eTaskLogLevel {
eTaskLogNone = 0,
eTaskLogDebug = 1,
eTaskLogInfo = 2,
eTaskLogWarning = 3,
eTaskLogError = 4,
eTaskLogFatal = 5
} eTaskLogLevel;
typedef void (*TaskLogFunc) (eTaskLogLevel, const char *buf);
/*
A TaskLogFunc can be registered for logging activity within the task module.
*/
/*
* Use these values for the Task Priority
*/
#define TASK_PRIO_LOW 10
#define TASK_PRIO_MED 30
#define TASK_PRIO_HIGH 50
/*--------------------------------------------------------------------------*/
typedef struct __TaskHead *pTaskHead;
typedef struct __TaskMan *pTaskMan;
typedef struct __TaskQueue *pTaskQueue;
typedef struct __TaskMessage *pTaskMessage;
/*
two data structure used internally and defined in task.c
*/
/*---------------------------------------------------------------------------*/
typedef int (*TaskFunc) (void *pData);
@ -28,6 +59,12 @@ typedef int (*TaskFunc) (void *pData);
1.
*/
/*--------------------------------------------------------------------------*/
typedef int (*TaskMsgFunc) (void *pData, pTaskMessage pMsg);
/*
* Like the TaskFunc but with a message extracted from the task queue
*/
/*--------------------------------------------------------------------------*/
typedef void (*TaskKillFunc) (void *pData);
/*
Each task using private data structures must define this functions. It's
@ -45,12 +82,6 @@ typedef void (*SignalFunc) (void *pUser, int iSignal, void *pSigData);
and a pointer to a datastructure for this signal. The meaning of signal
ID's and signal data structures is up to the client of this code.
*/
/*--------------------------------------------------------------------------*/
typedef struct __TaskHead *pTaskHead;
typedef struct __TaskMan *pTaskMan;
/*
two data structure used internally and defined in task.c
*/
/*===========================================================================
ALL FUNTIONS RETURN 0 on FAILURE, 1 ON SUCCESS WHEN NOT MENTIONED
OTHERWISE
@ -65,7 +96,7 @@ int TaskerDelete(pTaskMan * self);
tasks and the TaskManager.
*/
/*--------------------------------------------------------------------------*/
long TaskRegister(pTaskMan self, TaskFunc pTaskRun,
TaskTaskID TaskRegister(pTaskMan self, TaskFunc pTaskRun,
SignalFunc pSignalFunc,
TaskKillFunc pKillFunc, void *pData, int iPriority);
/*
@ -82,9 +113,9 @@ long TaskRegister(pTaskMan self, TaskFunc pTaskRun,
On error a negative value is returned.
*/
/*--------------------------------------------------------------------------*/
long TaskRegisterN(pTaskMan self, char *name, TaskFunc pTaskRun,
SignalFunc pSignalFunc,
TaskKillFunc pKillFunc, void *pData, int iPriority);
TaskTaskID TaskRegisterN(pTaskMan self, char *name, TaskFunc pTaskRun,
SignalFunc pSignalFunc, TaskKillFunc pKillFunc,
void *pData, int iPriority);
/*
This call enter a new task into the system. The caller has to
specify:
@ -98,6 +129,31 @@ long TaskRegisterN(pTaskMan self, char *name, TaskFunc pTaskRun,
On Success a positive value denoting the ID of the task is returned.
On error a negative value is returned.
*/
/*--------------------------------------------------------------------------*/
TaskTaskID TaskRegisterD(pTaskMan self, char *name, TaskFunc pTaskRun,
SignalFunc pSignalFunc, TaskKillFunc pKillFunc,
void *pData, int iPriority, double delay);
/*
This call enters a new task into the system.
The task will start running after the given delay in seconds.
*/
/*--------------------------------------------------------------------------*/
TaskTaskID TaskRegisterP(pTaskMan self, char *name, TaskFunc pTaskRun,
SignalFunc pSignalFunc, TaskKillFunc pKillFunc,
void *pData, int iPriority, double delay, double period);
/*
This call enters a new task into the system.
The task will run after delay seconds and then every period seconds.
*/
/*--------------------------------------------------------------------------*/
TaskTaskID TaskRegisterQ(pTaskMan self, char *name, TaskMsgFunc pTaskRun,
SignalFunc pSignalFunc, TaskKillFunc pKillFunc,
void *pData, int iPriority);
/*
This call enters a new task with queue into the system.
As for TaskRegisterN except the task function signature has
a message pointer.
*/
/*-------------------------------------------------------------------------*/
int TaskSchedule(pTaskMan self);
/*
@ -115,10 +171,10 @@ int TaskContinue(pTaskMan self);
the apopriate TaskYield, TaskSchedule or wahtever has to be called.
*/
/*-------------------------------------------------------------------------*/
int TaskWait(pTaskMan self, long lID);
int TaskWait(pTaskMan self, TaskTaskID taskID);
/*
Waits until the task specified by lID has finished. lID is obtained from
a call to TaskRegister.
Waits until the task specified by taskID has finished. taskID is obtained
from a call to TaskRegister.
*/
/*-------------------------------------------------------------------------*/
int TaskYield(pTaskMan self);
@ -148,7 +204,7 @@ int TaskSignal(pTaskMan self, int iSignal, void *pSigData);
returns 1 when task name is running, 0 else
*/
/*--------------------------------------------------------------------------*/
int isTaskIDRunning(pTaskMan self, long lID);
int isTaskIDRunning(pTaskMan self, TaskTaskID taskID);
/*
returns 1 when task name is running, 0 else
*/
@ -179,17 +235,30 @@ pTaskHead TaskIteratorNext(pTaskHead it);
Steps to the next element in the task list. Returns NULL when node.
Do NOT delete the returned pointer!
*/
pTaskHead TaskIteratorCurrent(pTaskMan self);
pTaskHead TaskIteratorByName(pTaskMan self, const char* name);
pTaskHead TaskIteratorByID(pTaskMan self, TaskTaskID taskID);
/*
Gets the task iterator for either the current, named or numbered task
Do NOT delete the returned pointer!
*/
char *TaskDescription(pTaskHead it);
/*
get a description of the task at the current iterator
You are responsible for deleting the returned character array.
*/
long GetTaskID(pTaskHead it);
char *TaskDetail(pTaskHead it);
/*
get a detailed description of the task at the current iterator
You are responsible for deleting the returned character array.
*/
TaskTaskID GetTaskID(pTaskHead it);
/*
get the ID of the current task
*/
long GetGroupID(pTaskHead it);
TaskGroupID GetGroupID(pTaskHead it);
/*
get the group ID of the current task
*/
@ -205,24 +274,19 @@ const void *GetTaskData(pTaskHead it);
Task Groups. The implementation has the limit that any given task can
only be member of one task group
===============================================================================*/
long GetTaskGroupID(pTaskMan self);
TaskGroupID GetTaskGroupID(pTaskMan self);
/*
get the ID for a task group
*/
void AddTaskToGroup(pTaskMan self, long taskID, long groupID);
void AddTaskToGroup(pTaskMan self, TaskTaskID taskID, TaskGroupID groupID);
/*
Add taskID to the task group groupID
*/
int isTaskGroupRunning(pTaskMan self, long groupID);
int isTaskGroupRunning(pTaskMan self, TaskGroupID groupID);
/*
Returns 1 when the task group is still running, 0 else
*/
typedef struct{
pTaskMan tasker;
long groupID;
} TaskGroupData, *pTaskGroupData;
int TaskGroupTask(void *data);
/*
This is a task function which implements the common task of waiting
@ -230,8 +294,124 @@ int TaskGroupTask(void *data);
structure.
*/
/*--------------------------------------------------------------------------*/
int TaskSignalGroup(pTaskMan self, int iSignal, void *pSigData, long groupID);
int TaskSignalGroup(pTaskMan self, int iSignal, void *pSigData, TaskGroupID groupID);
/*
signal only tasks in the group groupID
*/
/*--------------------------------------------------------------------------*/
void TaskSetLogFunc(TaskLogFunc);
TaskLogFunc TaskGetLogFunc(void);
eTaskLogLevel TaskSetLogLevel(eTaskLogLevel thresh);
eTaskLogLevel TaskGetLogLevel(void);
/*--------------------------------------------------------------------------*/
int TaskGetStack(pTaskMan self, pTaskHead it[]);
/*
* Returns the current stack depth of tasks and, if provided, fills the array
* of iterators. The iterators start at it[0] and can be used to get
* information about each task.
*/
/*--------------------------------------------------------------------------*/
void TaskRunMeAfter(pTaskMan self, double delay);
/*
* Run this task once in <delay> seconds
*/
/*--------------------------------------------------------------------------*/
void TaskRunMeEvery(pTaskMan self, double delay);
/*
* Run this task every <delay> seconds from now on
*/
/*--------------------------------------------------------------------------*/
double TaskRunMyPeriod(pTaskMan self);
/*
* Return this task run period in seconds
*/
/*--------------------------------------------------------------------------*/
pTaskQueue TaskQueueAlloc(void);
/*
* Task Queue constructor
*/
/*--------------------------------------------------------------------------*/
int TaskQueueFree(pTaskQueue);
/*
* Task Queue destructor
*/
/*--------------------------------------------------------------------------*/
int TaskQueueCount(pTaskQueue);
/*
* Returns the message count on the Task Queue
*/
/*--------------------------------------------------------------------------*/
int TaskQueueSend(pTaskQueue, pTaskMessage);
/*
* Pushes a Task Message onto the tail of the Task Queue
*/
/*--------------------------------------------------------------------------*/
int TaskQueueSendID(pTaskMan self, TaskTaskID taskID, pTaskMessage);
/*
* Pushes a Task Message onto the tail of the Task Queue
*/
/*--------------------------------------------------------------------------*/
pTaskMessage TaskQueueRecv(pTaskQueue);
/*
* Pops a Task Message off the head of the given Task Queue
*/
/*--------------------------------------------------------------------------*/
pTaskMessage TaskQueueRecvMine(pTaskMan self);
/*
* Pops a Task Message off the head of the owner's Task Queue
*/
/*--------------------------------------------------------------------------*/
int TaskQueueSet(pTaskMan self, TaskTaskID taskID, pTaskQueue);
/*
* Sets the Task Queue of the task
*/
/*--------------------------------------------------------------------------*/
int TaskQueueRem(pTaskMan self, TaskTaskID taskID);
/*
* Clears the Task Queue of the task
*/
/*--------------------------------------------------------------------------*/
pTaskQueue TaskQueueGet(pTaskMan self, TaskTaskID taskID);
/*
* Gets the Task Queue of the specified task
*/
/*--------------------------------------------------------------------------*/
pTaskQueue TaskQueueGetMine(pTaskMan self);
/*
* Gets the Task Queue of the current task
*/
/*--------------------------------------------------------------------------*/
pTaskMessage TaskMessageAlloc(size_t mSize, int mType);
/*
* Constructor for a Task Message of the requested size and type
*/
/*--------------------------------------------------------------------------*/
int TaskMessageFree(pTaskMessage);
/*
* Destructor for the Task Message
* returne 0=success, -1=fail
*/
/*--------------------------------------------------------------------------*/
int TaskMessageGetType(pTaskMessage self);
/*
* Get the type of the Task Message
*/
/*--------------------------------------------------------------------------*/
void TaskMessageSetType(pTaskMessage self, int mType);
/*
* Set the type of the Task Message
*/
/*--------------------------------------------------------------------------*/
void * TaskMessageGetData(pTaskMessage self);
/*
* Get the data pointer from the message
*/
/*--------------------------------------------------------------------------*/
void TaskMessageSetData(pTaskMessage self, void *mData);
/*
* Set the data pointer in the message
*/
/*--------------------------------------------------------------------------*/
#endif

270
taskobj.c
View File

@ -15,6 +15,37 @@ typedef struct {
char *scriptName;
SConnection *con;
} TclFunc, *pTclFunc;
/*-------------------------------------------------------------------------*/
void TaskObjLogFunc(eTaskLogLevel lvl, const char *buf)
{
OutCode eOut;
switch (lvl) {
case eTaskLogNone:
return;
case eTaskLogInfo:
eOut = eValue;
break;
case eTaskLogDebug:
eOut = eValue;
break;
case eTaskLogWarning:
eOut = eWarning;
break;
case eTaskLogError:
eOut = eError;
break;
case eTaskLogFatal:
eOut = eError;
break;
default:
eOut = eValue;
break;
}
SICSLogWrite(buf, eOut);
return;
}
/*-------------------------------------------------------------------------*/
static int ListCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
pHdb par[], int nPar)
@ -51,6 +82,103 @@ static int ListCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
return 1;
}
/*-------------------------------------------------------------------------*/
static int PerfCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
pHdb par[], int nPar)
{
pDynString result = NULL;
char buffer[256], *pDes, *pPtr, name[80];
char runs[80], waits[80], yields[80], cpu[80], wall[80], yield[80];
pTaskHead it = NULL;
result = CreateDynString(128,128);
if(result == NULL){
SCWrite(pCon,"ERROR: out of memory in PerfCmd", eError);
return 0;
}
snprintf(buffer,sizeof(buffer),"%18s %16s %16s %16s %16s %16s %16s",
"Task", "Runs", "Waits", "Yields", "Processor", "Elapsed", "Yield");
DynStringConcat(result,buffer);
DynStringConcatChar(result,'\n');
for(it = TaskIteratorStart(pServ->pTasker); it != NULL; it = TaskIteratorNext(it)){
pDes = TaskDetail(it);
if(pDes != NULL){
pPtr = stptok(pDes,name,sizeof(name),"|");
pPtr = stptok(pPtr,runs,sizeof(runs),"|");
pPtr = stptok(pPtr,waits,sizeof(waits),"|");
pPtr = stptok(pPtr,yields,sizeof(yields),"|");
pPtr = stptok(pPtr,cpu,sizeof(cpu),"|");
pPtr = stptok(pPtr,wall,sizeof(wall),"|");
pPtr = stptok(pPtr,yield,sizeof(yield),"|");
snprintf(buffer,sizeof(buffer),"%18s %16s %16s %16s %16s %16s %16s",
name,runs,waits,yields,cpu,wall,yield);
DynStringConcat(result,buffer);
DynStringConcatChar(result,'\n');
free(pDes);
}
}
SCWrite(pCon,GetCharArray(result),eValue);
DeleteDynString(result);
return 1;
}
/*----------------------------------------------------------------------*/
static int StackCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
pHdb par[], int nPar)
{
pDynString result = NULL;
char buffer[256], *pDes, *pPtr, name[80];
char runs[80], waits[80], yields[80], cpu[80], wall[80], yield[80];
pTaskHead it = NULL;
pTaskHead *stack = NULL;
int i, stackDepth;
result = CreateDynString(128,128);
if(result == NULL){
SCWrite(pCon,"ERROR: out of memory in StackCmd", eError);
return 0;
}
snprintf(buffer,sizeof(buffer),"%18s %16s %16s %16s %16s %16s %16s",
"Task", "Runs", "Waits", "Yields", "Processor", "Elapsed", "Yield");
DynStringConcat(result,buffer);
DynStringConcatChar(result,'\n');
stackDepth = TaskGetStack(pServ->pTasker, NULL);
if (stackDepth <= 0) {
SCWrite(pCon,"No task stack", eValue);
DeleteDynString(result);
return 0;
}
stack = calloc(stackDepth, sizeof(pTaskHead));
if (stack == NULL) {
SCWrite(pCon,"ERROR: out of memory in StackCmd", eError);
DeleteDynString(result);
return 0;
}
stackDepth = TaskGetStack(pServ->pTasker, stack);
for (i = 0; i < stackDepth; ++i) {
it = stack[i];
pDes = TaskDetail(it);
if(pDes != NULL){
pPtr = stptok(pDes,name,sizeof(name),"|");
pPtr = stptok(pPtr,runs,sizeof(runs),"|");
pPtr = stptok(pPtr,waits,sizeof(waits),"|");
pPtr = stptok(pPtr,yields,sizeof(yields),"|");
pPtr = stptok(pPtr,cpu,sizeof(cpu),"|");
pPtr = stptok(pPtr,wall,sizeof(wall),"|");
pPtr = stptok(pPtr,yield,sizeof(yield),"|");
snprintf(buffer,sizeof(buffer),"%18s %16s %16s %16s %16s %16s %16s",
name,runs,waits,yields,cpu,wall,yield);
DynStringConcat(result,buffer);
DynStringConcatChar(result,'\n');
free(pDes);
}
}
SCWrite(pCon,GetCharArray(result),eValue);
DeleteDynString(result);
free(stack);
return 1;
}
/*----------------------------------------------------------------------*/
static int KillCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
pHdb par[], int nPar)
@ -74,6 +202,126 @@ static int KillCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
SCSendOK(pCon);
return 1;
}
/*----------------------------------------------------------------------*/
static int InfoCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
pHdb par[], int nPar)
{
char buffer[256];
char *pArgs = NULL, *pPtr;
char task_task[20], task_name[80], task_info[80];
pTaskHead it = NULL;
if (nPar < 1) {
SCWrite(pCon,"Info.args: NULL", eError);
return 0;
}
if (!SCMatchRights(pCon,usMugger)) {
return 0;
}
pArgs = par[0]->value.v.text;
pPtr = pArgs;
while (pPtr && *pPtr == '/') ++pPtr;
pPtr = stptok(pPtr, task_task, sizeof(task_task), "/");
if (strcasecmp(task_task, "task") != 0) {
SCPrintf(pCon, eError, "ERROR: task info must start with /task/, not %s in %s",
task_task, pArgs);
return 0;
}
while (pPtr && *pPtr == '/') ++pPtr;
pPtr = stptok(pPtr, task_name, sizeof(task_name), "/");
while (pPtr && *pPtr == '/') ++pPtr;
if (strcasecmp(task_name, "count") == 0) {
int count = 0;
for(it = TaskIteratorStart(pServ->pTasker); it != NULL; it = TaskIteratorNext(it)){
++count;
}
SCPrintf(pCon, eValue, "Task.Count = %d", count);
return 1;
} else if (strcasecmp(task_name, "list") == 0) {
int count = 0;
pDynString result = CreateDynString(100, 100);
char txt[80], *pDes;
for(it = TaskIteratorStart(pServ->pTasker); it != NULL; it = TaskIteratorNext(it)) {
pDes = TaskDescription(it);
if (pDes != NULL) {
char buffer[256], *pPtr, name[80], time[80], id[80], gid[80];
pPtr = stptok(pDes,name,sizeof(name),"|");
pPtr = stptok(pPtr,time,sizeof(time),"|");
pPtr = stptok(pPtr,id,sizeof(id),"|");
snprintf(buffer,sizeof(buffer),"%s", id);
if (count++ > 0)
DynStringConcatChar(result,' ');
DynStringConcat(result, buffer);
free(pDes);
}
}
SCWrite(pCon,GetCharArray(result), eValue);
DeleteDynString(result);
return 1;
} else if (strcasecmp(task_name, "current") == 0) {
it = TaskIteratorCurrent(pServ->pTasker);
} else if (strcasecmp(task_name, "by-id") == 0) {
pPtr = stptok(pPtr, task_name, sizeof(task_name), "/");
while (*pPtr == '/') ++pPtr;
it = TaskIteratorByID(pServ->pTasker, atol(task_name));
} else if (strcasecmp(task_name, "by-name") == 0) {
pPtr = stptok(pPtr, task_name, sizeof(task_name), "/");
while (*pPtr == '/') ++pPtr;
it = TaskIteratorByName(pServ->pTasker, task_name);
} else {
SCPrintf(pCon, eError, "ERROR: expected current, by-name or by-id in %s", pArgs);
return 0;
}
if (it == NULL) {
SCPrintf(pCon, eError, "ERROR: task not found in %s", pArgs);
return 0;
}
pPtr = stptok(pPtr, task_info, sizeof(task_info), "/");
if (strcasecmp(task_info, "all") == 0) {
char *pDet, *pDes;
pDes = TaskDescription(it);
pDet = TaskDetail(it);
SCPrintf(pCon, eValue, "%s %s", pDes, pDet);
free(pDes);
free(pDet);
} else {
SCPrintf(pCon, eLog, "Info.args: %s", par[0]->value.v.text);
}
return 1;
}
/*----------------------------------------------------------------------*/
static int LogCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
pHdb par[], int nPar)
{
if (nPar < 1) {
SCWrite(pCon,"ERROR: need log level (none,debug,info,warning,error,fatal)", eError);
return 0;
}
if (!SCMatchRights(pCon,usMugger)) {
return 0;
}
if (strcasecmp("None", par[0]->value.v.text) == 0) {
TaskSetLogLevel(eTaskLogNone);
} else if (strcasecmp("Debug", par[0]->value.v.text) == 0) {
TaskSetLogLevel(eTaskLogDebug);
} else if (strcasecmp("Info", par[0]->value.v.text) == 0) {
TaskSetLogLevel(eTaskLogInfo);
} else if (strcasecmp("Warning", par[0]->value.v.text) == 0) {
TaskSetLogLevel(eTaskLogWarning);
} else if (strcasecmp("Error", par[0]->value.v.text) == 0) {
TaskSetLogLevel(eTaskLogError);
} else if (strcasecmp("Fatal", par[0]->value.v.text) == 0) {
TaskSetLogLevel(eTaskLogFatal);
}
TaskSetLogFunc((TaskLogFunc) TaskObjLogFunc);
SCSendOK(pCon);
return 1;
}
/*--------------------------------------------------------------------*/
static int TclTaskFunction(void *pData)
{
@ -153,7 +401,7 @@ static int RunCmd(pSICSOBJ self, SConnection *pCon, pHdb commandNode,
TclTaskFunction,
TclFuncSignal,
KillTclFunc,
data, 0);
data, TASK_PRIO_HIGH);
traceSys("task","Started task %s",data->scriptName);
SCSendOK(pCon);
return 1;
@ -172,6 +420,8 @@ int TaskOBJFactory(SConnection *pCon, SicsInterp *pSics, void *pData,
cmd = AddSICSHdbPar(pNew->objectNode, "ps", usSpy,
MakeSICSFunc(ListCmd));
cmd = AddSICSHdbPar(pNew->objectNode, "perf", usSpy,
MakeSICSFunc(PerfCmd));
cmd = AddSICSHdbPar(pNew->objectNode, "kill", usSpy,
MakeSICSFunc(KillCmd));
@ -205,6 +455,24 @@ void InitTaskOBJ()
cmd = AddSICSHdbPar(pNew->objectNode, "ps", usSpy,
MakeSICSFunc(ListCmd));
cmd = AddSICSHdbPar(pNew->objectNode, "perf", usSpy,
MakeSICSFunc(PerfCmd));
cmd = AddSICSHdbPar(pNew->objectNode, "stack", usSpy,
MakeSICSFunc(StackCmd));
cmd = AddSICSHdbPar(pNew->objectNode, "info", usSpy,
MakeSICSFunc(InfoCmd));
SetHdbProperty(cmd,"type","command");
SetHdbProperty(cmd,"priv","spy");
node = MakeSICSHdbPar("args",usSpy,MakeHdbText("banana"));
AddHipadabaChild(cmd,node,NULL);
cmd = AddSICSHdbPar(pNew->objectNode, "log", usSpy,
MakeSICSFunc(LogCmd));
SetHdbProperty(cmd,"type","command");
SetHdbProperty(cmd,"priv","spy");
node = MakeSICSHdbPar("level",usSpy,MakeHdbText("banana"));
AddHipadabaChild(cmd,node,NULL);
cmd = AddSICSHdbPar(pNew->objectNode, "kill", usSpy,
MakeSICSFunc(KillCmd));

21
trace.c
View File

@ -191,15 +191,11 @@ static int strrepc(char *pszStr, char cFrom, char cTo)
/*----------------------------------------------------------------*/
while( 0 != ( ptr = strchr( pszStr, cFrom ) ) )
{ /* WHILE cFrom occurs in pszStr */
pszStr[ (int) ptr - (int) pszStr ] = cTo ;
/*- Replace next cFrom with cTo */
iReturn++ ; /*- count */
for (ptr = pszStr; ptr && *ptr; ++ptr) {
if (*ptr == cFrom) {
*ptr = cTo;
++iReturn;
}
}
return( iReturn ) ;
@ -286,12 +282,13 @@ void traceCommand(char *id, char *format, ...)
va_end(argptr);
if(len >= sizeof(buffer)){
buf = malloc(len+1);
memset(buf,0,len+1);
if(buf != NULL){
memset(buf,0,len+1);
va_start(argptr,format);
len = vsnprintf(buf, len+1,format,argptr);
va_end(argptr);
traceprint("com",id,buf);
free(buf);
}
} else {
traceprint("com",id,buffer);
@ -471,7 +468,7 @@ static int TraceLog(pSICSOBJ ccmd, SConnection * con,
}
if(traceStamperID < 0){
traceStamperID = TaskRegisterN(pServ->pTasker,"tracestamper",
TraceLogTask, NULL, NULL, NULL, 1);
TraceLogTask, NULL, NULL, NULL, TASK_PRIO_HIGH);
}
}
}
@ -514,7 +511,7 @@ static int TraceAppend(pSICSOBJ ccmd, SConnection * con,
hdbInit = 1;
}
TaskRegisterN(pServ->pTasker,"tracestamper",
TraceLogTask, NULL, NULL, NULL, 1);
TraceLogTask, NULL, NULL, NULL, TASK_PRIO_HIGH);
}
return 1;
}