From bd0c759af3b07b44073497d30efb5dd7eb71bedd Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Tue, 26 Aug 2014 10:36:32 -0700 Subject: [PATCH] ioc/db: change call back queue wakeup algorithm (only wake when threads are sleeping and enough work in the queue) --- src/ioc/db/callback.c | 80 +++++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index a3ae657df..d8da9fd42 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -29,6 +29,7 @@ #include "epicsString.h" #include "epicsTimer.h" #include "epicsRingPointer.h" +#include "epicsAtomic.h" #include "errlog.h" #include "dbStaticLib.h" #include "dbBase.h" @@ -47,13 +48,18 @@ static int callbackQueueSize = 2000; -static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES]; -static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES]; -static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES]; -/* Parallel callback threads (configured and actual counts) */ -static int callbackThreadCount[NUM_CALLBACK_PRIORITIES] = { 1, 1, 1 }; -static int callbackThreadsRunning[NUM_CALLBACK_PRIORITIES]; +typedef struct cbQueueSet { + epicsEventId semWakeUp; + epicsRingPointerId queue; + int queueOverflow; + int threadsConfigured; + int threadsRunning; + int threadsBusy; +} cbQueueSet; + +static cbQueueSet callbackQueue[NUM_CALLBACK_PRIORITIES]; + int callbackParallelThreadsDefault = 2; epicsExportAddress(int,callbackParallelThreadsDefault); @@ -106,7 +112,7 @@ int callbackParallelThreads(int count, const char *prio) if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) { for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - callbackThreadCount[i] = count; + callbackQueue[i].threadsConfigured = count; } } else { if (!pdbbase) { @@ -123,7 +129,7 @@ int callbackParallelThreads(int count, const char *prio) if (gotMatch) break; } if (gotMatch) { - callbackThreadCount[i] = count; + callbackQueue[i].threadsConfigured = count; break; } else { errlogPrintf("Unknown priority \"%s\"\n", prio); @@ -138,23 +144,22 @@ int callbackParallelThreads(int count, const char *prio) static void callbackTask(void *arg) { - int priority = *(int *)arg; + cbQueueSet *mySet = &callbackQueue[*(int*)arg]; taskwdInsert(0, NULL, NULL); epicsEventSignal(startStopEvent); + epicsAtomicIncrIntT(&mySet->threadsBusy); while(TRUE) { void *ptr; - epicsEventMustWait(callbackSem[priority]); - while ((ptr = epicsRingPointerPop(callbackQ[priority]))) { - /* Retrigger if there are more threads and more work */ - if (callbackThreadsRunning[priority] > 1 - && !epicsRingPointerIsEmpty(callbackQ[priority])) { - epicsEventTrigger(callbackSem[priority]); - } + epicsAtomicDecrIntT(&mySet->threadsBusy); + if (epicsRingPointerIsEmpty(mySet->queue)) + epicsEventMustWait(mySet->semWakeUp); + epicsAtomicIncrIntT(&mySet->threadsBusy); + while ((ptr = epicsRingPointerPop(mySet->queue))) { CALLBACK *pcallback = (CALLBACK *)ptr; if (ptr == &exitCallback) goto shutdown; - ringOverflow[priority] = FALSE; + mySet->queueOverflow = FALSE; (*pcallback->callback)(pcallback); } } @@ -172,9 +177,9 @@ void callbackShutdown(void) cbCtl = ctlExit; for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - while (callbackThreadsRunning[i]--) { - int ok = epicsRingPointerPush(callbackQ[i], &exitCallback); - epicsEventSignal(callbackSem[i]); + while (callbackQueue[i].threadsRunning--) { + int ok = epicsRingPointerPush(callbackQueue[i].queue, &exitCallback); + epicsEventSignal(callbackQueue[i].semWakeUp); if (ok) epicsEventWait(startStopEvent); } } @@ -189,25 +194,25 @@ void callbackInit(void) int j; char threadName[32]; - if(startStopEvent) + if (startStopEvent) return; startStopEvent = epicsEventMustCreate(epicsEventEmpty); cbCtl = ctlRun; - timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh); + timerQueue = epicsTimerQueueAllocate(0, epicsThreadPriorityScanHigh); for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { epicsThreadId tid; - callbackSem[i] = epicsEventMustCreate(epicsEventEmpty); - callbackQ[i] = epicsRingPointerLockedCreate(callbackQueueSize); - if (callbackQ[i] == 0) + callbackQueue[i].semWakeUp = epicsEventMustCreate(epicsEventEmpty); + callbackQueue[i].queue = epicsRingPointerLockedCreate(callbackQueueSize); + if (callbackQueue[i].queue == 0) cantProceed("epicsRingPointerLockedCreate failed for %s\n", threadNamePrefix[i]); - ringOverflow[i] = FALSE; + callbackQueue[i].queueOverflow = FALSE; - for (j = 0; j < callbackThreadCount[i]; j++) { - if (callbackThreadCount[i] > 1 ) + for (j = 0; j < callbackQueue[i].threadsConfigured; j++) { + if (callbackQueue[i].threadsConfigured > 1 ) sprintf(threadName, "%s-%d", threadNamePrefix[i], j); else strcpy(threadName, threadNamePrefix[i]); @@ -218,7 +223,7 @@ void callbackInit(void) cantProceed("Failed to spawn callback thread %s\n", threadName); } else { epicsEventWait(startStopEvent); - callbackThreadsRunning[i]++; + callbackQueue[i].threadsRunning++; } } } @@ -229,6 +234,8 @@ int callbackRequest(CALLBACK *pcallback) { int priority; int pushOK; + int threadsBusy; + cbQueueSet *mySet; if (!pcallback) { epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n"); @@ -239,9 +246,10 @@ int callbackRequest(CALLBACK *pcallback) epicsInterruptContextMessage("callbackRequest: Bad priority\n"); return S_db_badChoice; } - if (ringOverflow[priority]) return S_db_bufFull; + mySet = &callbackQueue[priority]; + if (mySet->queueOverflow) return S_db_bufFull; - pushOK = epicsRingPointerPush(callbackQ[priority], pcallback); + pushOK = epicsRingPointerPush(mySet->queue, pcallback); if (!pushOK) { char msg[48] = "callbackRequest: "; @@ -249,10 +257,16 @@ int callbackRequest(CALLBACK *pcallback) strcat(msg, threadNamePrefix[priority]); strcat(msg, " ring buffer full\n"); epicsInterruptContextMessage(msg); - ringOverflow[priority] = TRUE; + mySet->queueOverflow = TRUE; return S_db_bufFull; } - epicsEventSignal(callbackSem[priority]); + /* Wake up another sleeping thread, if threads are sleeping + * and there are more jobs in the queue than busy threads */ + threadsBusy = epicsAtomicGetIntT(mySet->threadsBusy); + if (threadsBusy < mySet->threadsRunning + && epicsRingPointerGetUsed(mySet->queue) > threadsBusy) { + epicsEventSignal(mySet->semWakeUp); + } return 0; }