ioc/db: change call back queue wakeup algorithm (only wake when threads are sleeping and enough work in the queue)

This commit is contained in:
Ralph Lange
2014-08-26 10:36:32 -07:00
parent 15415b5590
commit bd0c759af3

View File

@@ -29,6 +29,7 @@
#include "epicsString.h"
#include "epicsTimer.h"
#include "epicsRingPointer.h"
#include "epicsAtomic.h"
#include "errlog.h"
#include "dbStaticLib.h"
#include "dbBase.h"
@@ -47,13 +48,18 @@
static int callbackQueueSize = 2000;
static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES];
static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES];
static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES];
/* Parallel callback threads (configured and actual counts) */
static int callbackThreadCount[NUM_CALLBACK_PRIORITIES] = { 1, 1, 1 };
static int callbackThreadsRunning[NUM_CALLBACK_PRIORITIES];
typedef struct cbQueueSet {
epicsEventId semWakeUp;
epicsRingPointerId queue;
int queueOverflow;
int threadsConfigured;
int threadsRunning;
int threadsBusy;
} cbQueueSet;
static cbQueueSet callbackQueue[NUM_CALLBACK_PRIORITIES];
int callbackParallelThreadsDefault = 2;
epicsExportAddress(int,callbackParallelThreadsDefault);
@@ -106,7 +112,7 @@ int callbackParallelThreads(int count, const char *prio)
if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) {
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
callbackThreadCount[i] = count;
callbackQueue[i].threadsConfigured = count;
}
} else {
if (!pdbbase) {
@@ -123,7 +129,7 @@ int callbackParallelThreads(int count, const char *prio)
if (gotMatch) break;
}
if (gotMatch) {
callbackThreadCount[i] = count;
callbackQueue[i].threadsConfigured = count;
break;
} else {
errlogPrintf("Unknown priority \"%s\"\n", prio);
@@ -138,23 +144,22 @@ int callbackParallelThreads(int count, const char *prio)
static void callbackTask(void *arg)
{
int priority = *(int *)arg;
cbQueueSet *mySet = &callbackQueue[*(int*)arg];
taskwdInsert(0, NULL, NULL);
epicsEventSignal(startStopEvent);
epicsAtomicIncrIntT(&mySet->threadsBusy);
while(TRUE) {
void *ptr;
epicsEventMustWait(callbackSem[priority]);
while ((ptr = epicsRingPointerPop(callbackQ[priority]))) {
/* Retrigger if there are more threads and more work */
if (callbackThreadsRunning[priority] > 1
&& !epicsRingPointerIsEmpty(callbackQ[priority])) {
epicsEventTrigger(callbackSem[priority]);
}
epicsAtomicDecrIntT(&mySet->threadsBusy);
if (epicsRingPointerIsEmpty(mySet->queue))
epicsEventMustWait(mySet->semWakeUp);
epicsAtomicIncrIntT(&mySet->threadsBusy);
while ((ptr = epicsRingPointerPop(mySet->queue))) {
CALLBACK *pcallback = (CALLBACK *)ptr;
if (ptr == &exitCallback) goto shutdown;
ringOverflow[priority] = FALSE;
mySet->queueOverflow = FALSE;
(*pcallback->callback)(pcallback);
}
}
@@ -172,9 +177,9 @@ void callbackShutdown(void)
cbCtl = ctlExit;
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
while (callbackThreadsRunning[i]--) {
int ok = epicsRingPointerPush(callbackQ[i], &exitCallback);
epicsEventSignal(callbackSem[i]);
while (callbackQueue[i].threadsRunning--) {
int ok = epicsRingPointerPush(callbackQueue[i].queue, &exitCallback);
epicsEventSignal(callbackQueue[i].semWakeUp);
if (ok) epicsEventWait(startStopEvent);
}
}
@@ -189,25 +194,25 @@ void callbackInit(void)
int j;
char threadName[32];
if(startStopEvent)
if (startStopEvent)
return;
startStopEvent = epicsEventMustCreate(epicsEventEmpty);
cbCtl = ctlRun;
timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh);
timerQueue = epicsTimerQueueAllocate(0, epicsThreadPriorityScanHigh);
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
epicsThreadId tid;
callbackSem[i] = epicsEventMustCreate(epicsEventEmpty);
callbackQ[i] = epicsRingPointerLockedCreate(callbackQueueSize);
if (callbackQ[i] == 0)
callbackQueue[i].semWakeUp = epicsEventMustCreate(epicsEventEmpty);
callbackQueue[i].queue = epicsRingPointerLockedCreate(callbackQueueSize);
if (callbackQueue[i].queue == 0)
cantProceed("epicsRingPointerLockedCreate failed for %s\n",
threadNamePrefix[i]);
ringOverflow[i] = FALSE;
callbackQueue[i].queueOverflow = FALSE;
for (j = 0; j < callbackThreadCount[i]; j++) {
if (callbackThreadCount[i] > 1 )
for (j = 0; j < callbackQueue[i].threadsConfigured; j++) {
if (callbackQueue[i].threadsConfigured > 1 )
sprintf(threadName, "%s-%d", threadNamePrefix[i], j);
else
strcpy(threadName, threadNamePrefix[i]);
@@ -218,7 +223,7 @@ void callbackInit(void)
cantProceed("Failed to spawn callback thread %s\n", threadName);
} else {
epicsEventWait(startStopEvent);
callbackThreadsRunning[i]++;
callbackQueue[i].threadsRunning++;
}
}
}
@@ -229,6 +234,8 @@ int callbackRequest(CALLBACK *pcallback)
{
int priority;
int pushOK;
int threadsBusy;
cbQueueSet *mySet;
if (!pcallback) {
epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n");
@@ -239,9 +246,10 @@ int callbackRequest(CALLBACK *pcallback)
epicsInterruptContextMessage("callbackRequest: Bad priority\n");
return S_db_badChoice;
}
if (ringOverflow[priority]) return S_db_bufFull;
mySet = &callbackQueue[priority];
if (mySet->queueOverflow) return S_db_bufFull;
pushOK = epicsRingPointerPush(callbackQ[priority], pcallback);
pushOK = epicsRingPointerPush(mySet->queue, pcallback);
if (!pushOK) {
char msg[48] = "callbackRequest: ";
@@ -249,10 +257,16 @@ int callbackRequest(CALLBACK *pcallback)
strcat(msg, threadNamePrefix[priority]);
strcat(msg, " ring buffer full\n");
epicsInterruptContextMessage(msg);
ringOverflow[priority] = TRUE;
mySet->queueOverflow = TRUE;
return S_db_bufFull;
}
epicsEventSignal(callbackSem[priority]);
/* Wake up another sleeping thread, if threads are sleeping
* and there are more jobs in the queue than busy threads */
threadsBusy = epicsAtomicGetIntT(mySet->threadsBusy);
if (threadsBusy < mySet->threadsRunning
&& epicsRingPointerGetUsed(mySet->queue) > threadsBusy) {
epicsEventSignal(mySet->semWakeUp);
}
return 0;
}