From 06d11b736afee5b91566da9f4467a8cb62842840 Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Thu, 15 May 2008 17:41:02 +0000 Subject: [PATCH] Cleanup, rework shutdown mechanism. --- src/db/callback.c | 210 +++++++++++++++++++++------------------------- src/db/callback.h | 16 ++-- 2 files changed, 104 insertions(+), 122 deletions(-) diff --git a/src/db/callback.c b/src/db/callback.c index 91009afce..6c90e0cb4 100644 --- a/src/db/callback.c +++ b/src/db/callback.c @@ -1,5 +1,5 @@ /*************************************************************************\ -* Copyright (c) 2007 UChicago Argonne LLC, as Operator of Argonne +* Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. @@ -18,6 +18,7 @@ #include #include +#include "cantProceed.h" #include "dbDefs.h" #include "epicsEvent.h" #include "epicsThread.h" @@ -40,150 +41,132 @@ #include "dbLock.h" #include "callback.h" -int callbackQueueSize = 2000; + +static epicsThreadOnceId callbackOnceFlag = EPICS_THREAD_ONCE_INIT; +static int callbackQueueSize = 2000; static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES]; static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES]; -static epicsThreadId callbackTaskId[NUM_CALLBACK_PRIORITIES]; -static int ringOverflow[NUM_CALLBACK_PRIORITIES]; +static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES]; -volatile int callbackRestart = FALSE; -volatile int callbackExit = FALSE; - -static int priorityValue[NUM_CALLBACK_PRIORITIES] = {0,1,2}; - - -/*for Delayed Requests */ -static void notify(void *pPrivate); +/* Timer for Delayed Requests */ static epicsTimerQueueId timerQueue; +/* Shutdown handling */ +static epicsEventId exitEvent; +static void *exitValue; -/* forward references */ -static void wdCallback(void *ind); /*callback from taskwd*/ -static void start(int ind); /*start or restart a callbackTask*/ - -/*public routines */ -int epicsShareAPI callbackSetQueueSize(int size) +/* Static data */ +static char *threadName[NUM_CALLBACK_PRIORITIES] = { + "cbLow", "cbMedium", "cbHigh" +}; +static unsigned int threadPriority[NUM_CALLBACK_PRIORITIES] = { + epicsThreadPriorityScanLow - 1, + epicsThreadPriorityScanLow + 4, + epicsThreadPriorityScanHigh + 1 +}; +static int priorityValue[NUM_CALLBACK_PRIORITIES] = {0, 1, 2}; + + +int callbackSetQueueSize(int size) { + if (callbackOnceFlag != EPICS_THREAD_ONCE_INIT) { + errlogPrintf("Callback system already initialized\n"); + return -1; + } callbackQueueSize = size; return 0; } +static void callbackTask(void *arg) +{ + int priority = *(int *)arg; + + taskwdInsert(epicsThreadGetIdSelf(), NULL, NULL); + while(TRUE) { + epicsEventMustWait(callbackSem[priority]); + void *ptr; + while((ptr = epicsRingPointerPop(callbackQ[priority]))) { + if (ptr == &exitValue) goto shutdown; + CALLBACK *pcallback = (CALLBACK *)ptr; + ringOverflow[priority] = FALSE; + (*pcallback->callback)(pcallback); + } + } +shutdown: + taskwdRemove(epicsThreadGetIdSelf()); + epicsEventSignal(exitEvent); +} + static void callbackShutdown(void *arg) { int i; - callbackExit = TRUE; - for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - epicsEventSignal(callbackSem[i]); - } -} -static int callbackWait(int priority) -{ - if (callbackExit) return FALSE; - epicsEventMustWait(callbackSem[priority]); - return !callbackExit; + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { + int lockKey = epicsInterruptLock(); + int ok = epicsRingPointerPush(callbackQ[i], &exitValue); + epicsInterruptUnlock(lockKey); + epicsEventSignal(callbackSem[i]); + if (ok) epicsEventWait(exitEvent); + } } static void callbackInitPvt(void *arg) { int i; + exitEvent = epicsEventMustCreate(epicsEventEmpty); timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh); for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - start(i); + epicsThreadId tid; + + callbackSem[i] = epicsEventMustCreate(epicsEventEmpty); + callbackQ[i] = epicsRingPointerCreate(callbackQueueSize); + if (callbackQ[i] == 0) + cantProceed("epicsRingPointerCreate failed for %s\n", + threadName[i]); + ringOverflow[i] = FALSE; + tid = epicsThreadCreate(threadName[i], threadPriority[i], + epicsThreadGetStackSize(epicsThreadStackBig), + (EPICSTHREADFUNC)callbackTask, &priorityValue[i]); + if (tid == 0) + cantProceed("Failed to spawn callback task %s\n", threadName[i]); } epicsAtExit(callbackShutdown, NULL); } -void epicsShareAPI callbackInit(void) +void callbackInit(void) { - static epicsThreadOnceId callbackOnceFlag = EPICS_THREAD_ONCE_INIT; epicsThreadOnce(&callbackOnceFlag,callbackInitPvt,NULL); } -/* Routine which places requests into callback queue*/ -/* This routine can be called from interrupt routine*/ -void epicsShareAPI callbackRequest(CALLBACK *pcallback) +/* This routine can be called from interrupt context */ +void callbackRequest(CALLBACK *pcallback) { int priority = pcallback->priority; int pushOK; int lockKey; - if(priority<0 || priority>=(NUM_CALLBACK_PRIORITIES)) { - epicsPrintf("callbackRequest called with invalid priority\n"); - return; + if (priority < 0 || priority >= NUM_CALLBACK_PRIORITIES) { + epicsPrintf("callbackRequest called with invalid priority\n"); + return; } - if(ringOverflow[priority]) return; + if (ringOverflow[priority]) return; + lockKey = epicsInterruptLock(); - pushOK = epicsRingPointerPush(callbackQ[priority],(void *)pcallback); + pushOK = epicsRingPointerPush(callbackQ[priority], pcallback); epicsInterruptUnlock(lockKey); - if(!pushOK) { - epicsPrintf("callbackRequest ring buffer full\n"); - ringOverflow[priority] = TRUE; + + if (!pushOK) { + errlogPrintf("callbackRequest: %s ring buffer full\n", + threadName[priority]); + ringOverflow[priority] = TRUE; } epicsEventSignal(callbackSem[priority]); - return; -} - -/* General purpose callback task */ -static void callbackTask(int *ppriority) -{ - int priority = *ppriority; - CALLBACK *pcallback; - - taskwdInsert(epicsThreadGetIdSelf(), - wdCallback,(void *)&priorityValue[priority]); - ringOverflow[priority] = FALSE; - while(callbackWait(priority)) { - while((pcallback = (CALLBACK *) - epicsRingPointerPop(callbackQ[priority]))) { - ringOverflow[priority] = FALSE; - (*pcallback->callback)(pcallback); - } - } - taskwdRemove(epicsThreadGetIdSelf()); -} - -static char *priorityName[3] = {"Low","Medium","High"}; -static void start(int ind) -{ - unsigned int priority; - char taskName[20]; - - callbackSem[ind] = epicsEventMustCreate(epicsEventEmpty); - if(ind==0) priority = epicsThreadPriorityScanLow - 1; - else if(ind==1) priority = epicsThreadPriorityScanLow +4; - else if(ind==2) priority = epicsThreadPriorityScanHigh + 1; - else { - errMessage(0,"callback start called with illegal priority\n"); - return; - } - if((callbackQ[ind]=epicsRingPointerCreate(callbackQueueSize)) == 0) - errMessage(0,"epicsRingPointerCreate failed while starting a callback task"); - sprintf(taskName,"cb%s",priorityName[ind]); - callbackTaskId[ind] = epicsThreadCreate(taskName,priority, - epicsThreadGetStackSize(epicsThreadStackBig),(EPICSTHREADFUNC)callbackTask, - &priorityValue[ind]); - if(callbackTaskId[ind]==0) { - errMessage(0,"Failed to spawn a callback task"); - return; - } } - -static void wdCallback(void *pind) -{ - int ind = *(int *)pind; - taskwdRemove(callbackTaskId[ind]); - if(!callbackRestart)return; - epicsEventDestroy(callbackSem[ind]); - epicsRingPointerDelete(callbackQ[ind]); - start(ind); -} - static void ProcessCallback(CALLBACK *pcallback) { - dbCommon *pRec; + dbCommon *pRec; callbackGetUser(pRec, pcallback); dbScanLock(pRec); @@ -191,50 +174,49 @@ static void ProcessCallback(CALLBACK *pcallback) dbScanUnlock(pRec); } -void epicsShareAPI callbackSetProcess(CALLBACK *pcallback, - int Priority, void *pRec) +void callbackSetProcess(CALLBACK *pcallback, int Priority, void *pRec) { callbackSetCallback(ProcessCallback, pcallback); callbackSetPriority(Priority, pcallback); callbackSetUser(pRec, pcallback); } -void epicsShareAPI callbackRequestProcessCallback(CALLBACK *pcallback, - int Priority, void *pRec) +void callbackRequestProcessCallback(CALLBACK *pcallback, + int Priority, void *pRec) { callbackSetProcess(pcallback, Priority, pRec); callbackRequest(pcallback); } - + static void notify(void *pPrivate) { CALLBACK *pcallback = (CALLBACK *)pPrivate; callbackRequest(pcallback); } -void epicsShareAPI callbackRequestDelayed(CALLBACK *pcallback,double seconds) +void callbackRequestDelayed(CALLBACK *pcallback, double seconds) { epicsTimerId timer = (epicsTimerId)pcallback->timer; - if(timer==0) { - timer = epicsTimerQueueCreateTimer(timerQueue,notify,(void *)pcallback); + if (timer == 0) { + timer = epicsTimerQueueCreateTimer(timerQueue, notify, pcallback); pcallback->timer = timer; } - epicsTimerStartDelay(timer,seconds); + epicsTimerStartDelay(timer, seconds); } -void epicsShareAPI callbackCancelDelayed(CALLBACK *pcallback) +void callbackCancelDelayed(CALLBACK *pcallback) { epicsTimerId timer = (epicsTimerId)pcallback->timer; - if (timer!=0) { + if (timer != 0) { epicsTimerCancel(timer); } } -void epicsShareAPI callbackRequestProcessCallbackDelayed(CALLBACK *pcallback, - int Priority, void *pRec,double seconds) +void callbackRequestProcessCallbackDelayed(CALLBACK *pcallback, + int Priority, void *pRec, double seconds) { callbackSetProcess(pcallback, Priority, pRec); - callbackRequestDelayed(pcallback,seconds); + callbackRequestDelayed(pcallback, seconds); } diff --git a/src/db/callback.h b/src/db/callback.h index aa13a754b..023a4c3c8 100644 --- a/src/db/callback.h +++ b/src/db/callback.h @@ -55,18 +55,18 @@ typedef void (*CALLBACKFUNC)(struct callbackPvt*); #define callbackGetUser(USER,PCALLBACK)\ ( (USER) = (void *)((CALLBACK *)(PCALLBACK))->user ) -epicsShareFunc void epicsShareAPI callbackInit(void); -epicsShareFunc void epicsShareAPI callbackRequest(CALLBACK *pCallback); -epicsShareFunc void epicsShareAPI callbackSetProcess( +epicsShareFunc void callbackInit(void); +epicsShareFunc void callbackRequest(CALLBACK *pCallback); +epicsShareFunc void callbackSetProcess( CALLBACK *pcallback, int Priority, void *pRec); -epicsShareFunc void epicsShareAPI callbackRequestProcessCallback( +epicsShareFunc void callbackRequestProcessCallback( CALLBACK *pCallback,int Priority, void *pRec); -epicsShareFunc void epicsShareAPI callbackRequestDelayed( +epicsShareFunc void callbackRequestDelayed( CALLBACK *pCallback,double seconds); -epicsShareFunc void epicsShareAPI callbackCancelDelayed(CALLBACK *pcallback); -epicsShareFunc void epicsShareAPI callbackRequestProcessCallbackDelayed( +epicsShareFunc void callbackCancelDelayed(CALLBACK *pcallback); +epicsShareFunc void callbackRequestProcessCallbackDelayed( CALLBACK *pCallback, int Priority, void *pRec, double seconds); -epicsShareFunc int epicsShareAPI callbackSetQueueSize(int size); +epicsShareFunc int callbackSetQueueSize(int size); #ifdef __cplusplus }