Cleanup, rework shutdown mechanism.

This commit is contained in:
Andrew Johnson
2008-05-15 17:41:02 +00:00
parent 24edb594cc
commit 06d11b736a
2 changed files with 104 additions and 122 deletions

View File

@@ -1,5 +1,5 @@
/*************************************************************************\
* Copyright (c) 2007 UChicago Argonne LLC, as Operator of Argonne
* Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
@@ -18,6 +18,7 @@
#include <stdlib.h>
#include <stdio.h>
#include "cantProceed.h"
#include "dbDefs.h"
#include "epicsEvent.h"
#include "epicsThread.h"
@@ -40,150 +41,132 @@
#include "dbLock.h"
#include "callback.h"
int callbackQueueSize = 2000;
static epicsThreadOnceId callbackOnceFlag = EPICS_THREAD_ONCE_INIT;
static int callbackQueueSize = 2000;
static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES];
static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES];
static epicsThreadId callbackTaskId[NUM_CALLBACK_PRIORITIES];
static int ringOverflow[NUM_CALLBACK_PRIORITIES];
static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES];
volatile int callbackRestart = FALSE;
volatile int callbackExit = FALSE;
static int priorityValue[NUM_CALLBACK_PRIORITIES] = {0,1,2};
/*for Delayed Requests */
static void notify(void *pPrivate);
/* Timer for Delayed Requests */
static epicsTimerQueueId timerQueue;
/* Shutdown handling */
static epicsEventId exitEvent;
static void *exitValue;
/* forward references */
static void wdCallback(void *ind); /*callback from taskwd*/
static void start(int ind); /*start or restart a callbackTask*/
/*public routines */
int epicsShareAPI callbackSetQueueSize(int size)
/* Static data */
static char *threadName[NUM_CALLBACK_PRIORITIES] = {
"cbLow", "cbMedium", "cbHigh"
};
static unsigned int threadPriority[NUM_CALLBACK_PRIORITIES] = {
epicsThreadPriorityScanLow - 1,
epicsThreadPriorityScanLow + 4,
epicsThreadPriorityScanHigh + 1
};
static int priorityValue[NUM_CALLBACK_PRIORITIES] = {0, 1, 2};
int callbackSetQueueSize(int size)
{
if (callbackOnceFlag != EPICS_THREAD_ONCE_INIT) {
errlogPrintf("Callback system already initialized\n");
return -1;
}
callbackQueueSize = size;
return 0;
}
static void callbackTask(void *arg)
{
int priority = *(int *)arg;
taskwdInsert(epicsThreadGetIdSelf(), NULL, NULL);
while(TRUE) {
epicsEventMustWait(callbackSem[priority]);
void *ptr;
while((ptr = epicsRingPointerPop(callbackQ[priority]))) {
if (ptr == &exitValue) goto shutdown;
CALLBACK *pcallback = (CALLBACK *)ptr;
ringOverflow[priority] = FALSE;
(*pcallback->callback)(pcallback);
}
}
shutdown:
taskwdRemove(epicsThreadGetIdSelf());
epicsEventSignal(exitEvent);
}
static void callbackShutdown(void *arg)
{
int i;
callbackExit = TRUE;
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
epicsEventSignal(callbackSem[i]);
}
}
static int callbackWait(int priority)
{
if (callbackExit) return FALSE;
epicsEventMustWait(callbackSem[priority]);
return !callbackExit;
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
int lockKey = epicsInterruptLock();
int ok = epicsRingPointerPush(callbackQ[i], &exitValue);
epicsInterruptUnlock(lockKey);
epicsEventSignal(callbackSem[i]);
if (ok) epicsEventWait(exitEvent);
}
}
static void callbackInitPvt(void *arg)
{
int i;
exitEvent = epicsEventMustCreate(epicsEventEmpty);
timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh);
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
start(i);
epicsThreadId tid;
callbackSem[i] = epicsEventMustCreate(epicsEventEmpty);
callbackQ[i] = epicsRingPointerCreate(callbackQueueSize);
if (callbackQ[i] == 0)
cantProceed("epicsRingPointerCreate failed for %s\n",
threadName[i]);
ringOverflow[i] = FALSE;
tid = epicsThreadCreate(threadName[i], threadPriority[i],
epicsThreadGetStackSize(epicsThreadStackBig),
(EPICSTHREADFUNC)callbackTask, &priorityValue[i]);
if (tid == 0)
cantProceed("Failed to spawn callback task %s\n", threadName[i]);
}
epicsAtExit(callbackShutdown, NULL);
}
void epicsShareAPI callbackInit(void)
void callbackInit(void)
{
static epicsThreadOnceId callbackOnceFlag = EPICS_THREAD_ONCE_INIT;
epicsThreadOnce(&callbackOnceFlag,callbackInitPvt,NULL);
}
/* Routine which places requests into callback queue*/
/* This routine can be called from interrupt routine*/
void epicsShareAPI callbackRequest(CALLBACK *pcallback)
/* This routine can be called from interrupt context */
void callbackRequest(CALLBACK *pcallback)
{
int priority = pcallback->priority;
int pushOK;
int lockKey;
if(priority<0 || priority>=(NUM_CALLBACK_PRIORITIES)) {
epicsPrintf("callbackRequest called with invalid priority\n");
return;
if (priority < 0 || priority >= NUM_CALLBACK_PRIORITIES) {
epicsPrintf("callbackRequest called with invalid priority\n");
return;
}
if(ringOverflow[priority]) return;
if (ringOverflow[priority]) return;
lockKey = epicsInterruptLock();
pushOK = epicsRingPointerPush(callbackQ[priority],(void *)pcallback);
pushOK = epicsRingPointerPush(callbackQ[priority], pcallback);
epicsInterruptUnlock(lockKey);
if(!pushOK) {
epicsPrintf("callbackRequest ring buffer full\n");
ringOverflow[priority] = TRUE;
if (!pushOK) {
errlogPrintf("callbackRequest: %s ring buffer full\n",
threadName[priority]);
ringOverflow[priority] = TRUE;
}
epicsEventSignal(callbackSem[priority]);
return;
}
/* General purpose callback task */
static void callbackTask(int *ppriority)
{
int priority = *ppriority;
CALLBACK *pcallback;
taskwdInsert(epicsThreadGetIdSelf(),
wdCallback,(void *)&priorityValue[priority]);
ringOverflow[priority] = FALSE;
while(callbackWait(priority)) {
while((pcallback = (CALLBACK *)
epicsRingPointerPop(callbackQ[priority]))) {
ringOverflow[priority] = FALSE;
(*pcallback->callback)(pcallback);
}
}
taskwdRemove(epicsThreadGetIdSelf());
}
static char *priorityName[3] = {"Low","Medium","High"};
static void start(int ind)
{
unsigned int priority;
char taskName[20];
callbackSem[ind] = epicsEventMustCreate(epicsEventEmpty);
if(ind==0) priority = epicsThreadPriorityScanLow - 1;
else if(ind==1) priority = epicsThreadPriorityScanLow +4;
else if(ind==2) priority = epicsThreadPriorityScanHigh + 1;
else {
errMessage(0,"callback start called with illegal priority\n");
return;
}
if((callbackQ[ind]=epicsRingPointerCreate(callbackQueueSize)) == 0)
errMessage(0,"epicsRingPointerCreate failed while starting a callback task");
sprintf(taskName,"cb%s",priorityName[ind]);
callbackTaskId[ind] = epicsThreadCreate(taskName,priority,
epicsThreadGetStackSize(epicsThreadStackBig),(EPICSTHREADFUNC)callbackTask,
&priorityValue[ind]);
if(callbackTaskId[ind]==0) {
errMessage(0,"Failed to spawn a callback task");
return;
}
}
static void wdCallback(void *pind)
{
int ind = *(int *)pind;
taskwdRemove(callbackTaskId[ind]);
if(!callbackRestart)return;
epicsEventDestroy(callbackSem[ind]);
epicsRingPointerDelete(callbackQ[ind]);
start(ind);
}
static void ProcessCallback(CALLBACK *pcallback)
{
dbCommon *pRec;
dbCommon *pRec;
callbackGetUser(pRec, pcallback);
dbScanLock(pRec);
@@ -191,50 +174,49 @@ static void ProcessCallback(CALLBACK *pcallback)
dbScanUnlock(pRec);
}
void epicsShareAPI callbackSetProcess(CALLBACK *pcallback,
int Priority, void *pRec)
void callbackSetProcess(CALLBACK *pcallback, int Priority, void *pRec)
{
callbackSetCallback(ProcessCallback, pcallback);
callbackSetPriority(Priority, pcallback);
callbackSetUser(pRec, pcallback);
}
void epicsShareAPI callbackRequestProcessCallback(CALLBACK *pcallback,
int Priority, void *pRec)
void callbackRequestProcessCallback(CALLBACK *pcallback,
int Priority, void *pRec)
{
callbackSetProcess(pcallback, Priority, pRec);
callbackRequest(pcallback);
}
static void notify(void *pPrivate)
{
CALLBACK *pcallback = (CALLBACK *)pPrivate;
callbackRequest(pcallback);
}
void epicsShareAPI callbackRequestDelayed(CALLBACK *pcallback,double seconds)
void callbackRequestDelayed(CALLBACK *pcallback, double seconds)
{
epicsTimerId timer = (epicsTimerId)pcallback->timer;
if(timer==0) {
timer = epicsTimerQueueCreateTimer(timerQueue,notify,(void *)pcallback);
if (timer == 0) {
timer = epicsTimerQueueCreateTimer(timerQueue, notify, pcallback);
pcallback->timer = timer;
}
epicsTimerStartDelay(timer,seconds);
epicsTimerStartDelay(timer, seconds);
}
void epicsShareAPI callbackCancelDelayed(CALLBACK *pcallback)
void callbackCancelDelayed(CALLBACK *pcallback)
{
epicsTimerId timer = (epicsTimerId)pcallback->timer;
if (timer!=0) {
if (timer != 0) {
epicsTimerCancel(timer);
}
}
void epicsShareAPI callbackRequestProcessCallbackDelayed(CALLBACK *pcallback,
int Priority, void *pRec,double seconds)
void callbackRequestProcessCallbackDelayed(CALLBACK *pcallback,
int Priority, void *pRec, double seconds)
{
callbackSetProcess(pcallback, Priority, pRec);
callbackRequestDelayed(pcallback,seconds);
callbackRequestDelayed(pcallback, seconds);
}

View File

@@ -55,18 +55,18 @@ typedef void (*CALLBACKFUNC)(struct callbackPvt*);
#define callbackGetUser(USER,PCALLBACK)\
( (USER) = (void *)((CALLBACK *)(PCALLBACK))->user )
epicsShareFunc void epicsShareAPI callbackInit(void);
epicsShareFunc void epicsShareAPI callbackRequest(CALLBACK *pCallback);
epicsShareFunc void epicsShareAPI callbackSetProcess(
epicsShareFunc void callbackInit(void);
epicsShareFunc void callbackRequest(CALLBACK *pCallback);
epicsShareFunc void callbackSetProcess(
CALLBACK *pcallback, int Priority, void *pRec);
epicsShareFunc void epicsShareAPI callbackRequestProcessCallback(
epicsShareFunc void callbackRequestProcessCallback(
CALLBACK *pCallback,int Priority, void *pRec);
epicsShareFunc void epicsShareAPI callbackRequestDelayed(
epicsShareFunc void callbackRequestDelayed(
CALLBACK *pCallback,double seconds);
epicsShareFunc void epicsShareAPI callbackCancelDelayed(CALLBACK *pcallback);
epicsShareFunc void epicsShareAPI callbackRequestProcessCallbackDelayed(
epicsShareFunc void callbackCancelDelayed(CALLBACK *pcallback);
epicsShareFunc void callbackRequestProcessCallbackDelayed(
CALLBACK *pCallback, int Priority, void *pRec, double seconds);
epicsShareFunc int epicsShareAPI callbackSetQueueSize(int size);
epicsShareFunc int callbackSetQueueSize(int size);
#ifdef __cplusplus
}