thread pool implementation

This commit is contained in:
Michael Davidsaver
2014-07-24 14:22:52 -04:00
parent a8fd14aae1
commit 658bd0b570
5 changed files with 822 additions and 0 deletions

View File

@@ -0,0 +1,394 @@
/*************************************************************************\
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
* Brookhaven National Laboratory.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#define epicsExportSharedSymbols
#include "dbDefs.h"
#include "errlog.h"
#include "ellLib.h"
#include "epicsThread.h"
#include "epicsMutex.h"
#include "epicsEvent.h"
#include "epicsInterrupt.h"
#include "errCommon.h"
#include "cantProceed.h"
#include "epicsThreadPool.h"
#include "poolPriv.h"
/* Populate *opts with the built-in default pool configuration:
 * one worker per CPU core, a small worker stack, and a worker
 * priority one level above the CA server high priority (falling
 * back to Medium when no such level exists).
 */
void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
{
    memset(opts, 0, sizeof(*opts));

    opts->maxThreads = epicsThreadGetCPUs();
    opts->workerStack = epicsThreadGetStackSize(epicsThreadStackSmall);

    if (epicsThreadLowestPriorityLevelAbove(epicsThreadPriorityCAServerHigh,
                                            &opts->workerPriority)
            != epicsThreadBooleanStatusSuccess) {
        opts->workerPriority = epicsThreadPriorityMedium;
    }
}
/* Create a new thread pool.
 *
 * opts may be NULL, in which case built-in defaults are used (see
 * epicsThreadPoolConfigDefaults()).  A provided opts with
 * maxThreads==0 is treated as uninitialized and rejected.
 * The configuration is copied; the caller's opts is not retained.
 *
 * Returns the new pool, or NULL on failure (allocation failure,
 * primitive creation failure, or inability to start any of the
 * requested initial worker threads).  Partial thread creation is
 * only a warning; total failure releases all resources.
 */
epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
{
    size_t i;
    epicsThreadPool *pool;

    /* caller likely didn't initialize the options structure */
    if(opts && opts->maxThreads==0) {
        errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized");
        return NULL;
    }

    pool=calloc(1, sizeof(*pool));
    if(!pool)
        return NULL;

    if(opts)
        memcpy(&pool->conf, opts, sizeof(*opts));
    else
        epicsThreadPoolConfigDefaults(&pool->conf);

    /* never start more workers than the pool is allowed to hold */
    if(pool->conf.initialThreads > pool->conf.maxThreads)
        pool->conf.initialThreads = pool->conf.maxThreads;

    pool->workerWakeup=epicsEventCreate(epicsEventEmpty);
    pool->shutdownEvent=epicsEventCreate(epicsEventEmpty);
    pool->observerWakeup=epicsEventCreate(epicsEventEmpty);
    pool->guard=epicsMutexCreate();

    if(!pool->workerWakeup || !pool->shutdownEvent ||
       !pool->observerWakeup || !pool->guard)
        goto cleanup;

    ellInit(&pool->jobs);
    ellInit(&pool->owned);

    epicsMutexMustLock(pool->guard);
    for(i=0; i<pool->conf.initialThreads; i++) {
        createPoolThread(pool);
    }
    epicsMutexUnlock(pool->guard);

    if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) {
        errlogPrintf("Error: Unable to create any threads for thread pool\n");
        goto cleanup;

    }else if(pool->threadsRunning < pool->conf.initialThreads) {
        errlogPrintf("Warning: Unable to create all threads for thread pool (%lu/%lu)\n",
                     (unsigned long)pool->threadsRunning,
                     (unsigned long)pool->conf.initialThreads);
    }

    return pool;

cleanup:
    /* reached only when no worker threads are running, so the
     * primitives can be destroyed without further synchronization */
    if(pool->workerWakeup) epicsEventDestroy(pool->workerWakeup);
    if(pool->shutdownEvent) epicsEventDestroy(pool->shutdownEvent);
    if(pool->observerWakeup) epicsEventDestroy(pool->observerWakeup);
    if(pool->guard) epicsMutexDestroy(pool->guard);
    free(pool);
    return NULL;
}
/* Apply one control option to the pool.
 * Caller must hold pool->guard.
 *
 * epicsThreadPoolQueueAdd: val!=0 allows new jobs to be queued,
 *   val==0 inhibits queueing.
 * epicsThreadPoolQueueRun: val==0 pauses execution of queued jobs,
 *   val!=0 resumes it, waking or creating enough workers to cover
 *   the backlog that accumulated while paused.
 * Unknown options are silently ignored.
 */
static
void _epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val)
{
    /* options are frozen during pool destruction so a late caller
     * cannot re-enable queueing/running */
    if(pool->freezeopt)
        return;

    if(opt==epicsThreadPoolQueueAdd) {
        pool->pauseadd = !val;

    } else if(opt==epicsThreadPoolQueueRun) {
        if(!val && !pool->pauserun)
            pool->pauserun=1;

        else if(val && pool->pauserun) {
            /* resuming: hand the backlog out to workers */
            size_t jobs=(size_t)ellCount(&pool->jobs);
            pool->pauserun=0;

            if(jobs) {
                /* sleepers not already tagged for wakeup */
                size_t wakeable=pool->threadsSleeping - pool->threadsWaking;

                /* first try to give jobs to sleeping workers */
                if(wakeable) {
                    int wakeup = jobs > wakeable ? wakeable : jobs;
                    assert(wakeup>0);
                    jobs-=wakeup;
                    pool->threadsWaking+=wakeup;
                    /* NOTE(review): a single signal is posted for possibly
                     * several wakeups -- presumably workers re-signal as
                     * they wake; confirm against the worker loop */
                    epicsEventSignal(pool->workerWakeup);
                    CHECKCOUNT(pool);
                }
            }
            /* remaining jobs get newly created workers, up to the
             * configured maximum */
            while(jobs-- && pool->threadsRunning < pool->conf.maxThreads) {
                pool->threadsWaking++;
                epicsEventSignal(pool->workerWakeup);
                createPoolThread(pool);
            }
            CHECKCOUNT(pool);
        }
    }
    /* unknown options ignored */
}
/* Public entry point: apply a pool control option while holding
 * the pool guard.  See _epicsThreadPoolControl() for the option
 * semantics.
 */
void epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val)
{
    epicsMutexMustLock(pool->guard);
    _epicsThreadPoolControl(pool, opt, val);
    epicsMutexUnlock(pool->guard);
}
/* Block until the pool is idle: no jobs queued and no workers
 * actively running a job.
 *
 * timeout < 0 waits forever; otherwise it bounds each individual
 * wait on the observer event (in seconds).
 *
 * Returns 0 once the pool has been observed idle, or EWOULDBLOCK
 * when a timed wait expires first.
 */
int epicsThreadPoolWait(epicsThreadPool* pool, double timeout)
{
    int ret=0;
    epicsMutexMustLock(pool->guard);

    while(ellCount(&pool->jobs)>0 || pool->threadsAreAwake>0) {
        /* register as an observer before dropping the lock so a
         * wakeup between unlock and wait is not lost */
        pool->observerCount++;
        epicsMutexUnlock(pool->guard);

        if(timeout<0.0) {
            epicsEventMustWait(pool->observerWakeup);
        } else {
            switch(epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) {
            case epicsEventWaitError:
                cantProceed("epicsThreadPoolWait: failed to wait for Event");
                break;
            case epicsEventWaitTimeout:
                ret=EWOULDBLOCK;
                break;
            case epicsEventWaitOK:
                ret=0;
                break;
            }
        }

        epicsMutexMustLock(pool->guard);
        pool->observerCount--;

        /* the observer event is shared by all waiters: relay the
         * signal so the remaining observers also re-check the state */
        if(pool->observerCount)
            epicsEventSignal(pool->observerWakeup);

        if(ret!=0)
            break;
    }
    epicsMutexUnlock(pool->guard);
    return ret;
}
/* Shut down and free a pool.  NULL is accepted and ignored.
 *
 * Any still-queued jobs are first allowed to run to completion,
 * then all workers are stopped.  Every job still attached to the
 * pool afterwards receives a final (*func)(arg, epicsJobModeCleanup)
 * callback; jobs flagged freewhendone are freed, the rest are
 * orphaned (job->pool set to NULL).
 */
void epicsThreadPoolDestroy(epicsThreadPool *pool)
{
    size_t nThr;
    ELLLIST notify;
    ELLNODE *cur;

    if(!pool)
        return;

    ellInit(&notify);

    epicsMutexMustLock(pool->guard);

    /* run remaining queued jobs */
    _epicsThreadPoolControl(pool, epicsThreadPoolQueueAdd, 0);
    _epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
    nThr=pool->threadsRunning;
    /* lock out any further control changes during teardown */
    pool->freezeopt = 1;

    epicsMutexUnlock(pool->guard);

    epicsThreadPoolWait(pool, -1.0);
    /* At this point all queued jobs have run */

    epicsMutexMustLock(pool->guard);
    pool->shutdown=1;

    /* wakeup all */
    if(pool->threadsWaking < pool->threadsSleeping) {
        pool->threadsWaking = pool->threadsSleeping;
        epicsEventSignal(pool->workerWakeup);
    }

    /* collect every job still tied to the pool for the cleanup
     * callbacks below */
    ellConcat(&notify, &pool->owned);
    ellConcat(&notify, &pool->jobs);

    epicsMutexUnlock(pool->guard);

    /* NOTE(review): shutdownEvent is presumably signalled by the
     * last worker to exit -- confirm against the worker loop */
    if(nThr && epicsEventWait(pool->shutdownEvent)!=epicsEventWaitOK){
        errlogMessage("epicsThreadPoolDestroy: wait error");
        return;
    }

    /* all workers are now shutdown */

    /* notify remaining jobs that pool is being destroyed */
    while( (cur=ellGet(&notify))!=NULL )
    {
        epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
        job->running=1;
        (*job->func)(job->arg, epicsJobModeCleanup);
        job->running=0;
        if(job->freewhendone)
            free(job);
        else
            job->pool=NULL; /* orphan */
    }

    epicsEventDestroy(pool->workerWakeup);
    epicsEventDestroy(pool->shutdownEvent);
    epicsEventDestroy(pool->observerWakeup);
    epicsMutexDestroy(pool->guard);

    free(pool);
}
/* Print a human-readable summary of the pool's state, including all
 * currently queued jobs, to the given stream.  The pool guard is
 * held for the duration, so the output is a consistent snapshot.
 */
void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd)
{
    ELLNODE *cur;
    epicsMutexMustLock(pool->guard);

    fprintf(fd, "Thread Pool with %lu/%lu threads\n"
            " running %d jobs with %lu threads\n",
            (unsigned long)pool->threadsRunning,
            (unsigned long)pool->conf.maxThreads,
            ellCount(&pool->jobs),
            (unsigned long)pool->threadsAreAwake);

    if(pool->pauseadd)
        fprintf(fd, " Inhibit queueing\n");
    if(pool->pauserun)
        fprintf(fd, " Pause workers\n");
    if(pool->shutdown)
        fprintf(fd, " Shutdown in progress\n");

    for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur))
    {
        epicsJob *job=CONTAINER(cur, epicsJob, jobnode);

        /* %lx (hex) to match the "0x" prefix; the original "0x%lu"
         * printed the addresses in decimal */
        fprintf(fd, " job 0x%lx func: 0x%lx, arg: 0x%lx ",
                (unsigned long)job, (unsigned long)job->func,
                (unsigned long)job->arg);

        if(job->queued)
            fprintf(fd, "Queued ");
        if(job->running)
            fprintf(fd, "Running ");
        if(job->freewhendone)
            fprintf(fd, "Free ");
        fprintf(fd, "\n");
    }

    epicsMutexUnlock(pool->guard);
}
/* Return the number of worker threads currently running in the
 * pool (a snapshot taken under the pool guard).
 */
size_t epicsThreadPoolNThreads(epicsThreadPool *pool)
{
    size_t count;

    epicsMutexMustLock(pool->guard);
    count = pool->threadsRunning;
    epicsMutexUnlock(pool->guard);

    return count;
}
/* Registry of pools handed out by epicsThreadPoolGetShared() */
static
ELLLIST sharedPools = ELLLIST_INIT;

/* Guards sharedPools and the sharedCount of each member pool */
static
epicsMutexId sharedPoolsGuard;

static
epicsThreadOnceId sharedPoolsOnce = EPICS_THREAD_ONCE_INIT;

/* One-time initialization of the shared pool registry lock,
 * dispatched via epicsThreadOnce(); the argument is unused. */
static
void sharedPoolsInit(void* unused)
{
    sharedPoolsGuard = epicsMutexMustCreate();
}
/* Fetch (or create) a pool shared among callers with compatible
 * requirements, incrementing its reference count.  Release with
 * epicsThreadPoolReleaseShared().
 *
 * opts must be non-NULL and initialized.  It is both input and
 * output: maxThreads is first raised to at least the CPU count,
 * and when an existing pool matches, the matched pool's full
 * configuration is copied back so the caller sees the settings
 * actually in effect.
 *
 * Returns NULL when opts is NULL or pool creation fails.
 */
epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts)
{
    ELLNODE *node;
    epicsThreadPool *cur;
    size_t N=epicsThreadGetCPUs();

    if(!opts)
        return NULL;

    /* shared pools must have a minimum allowed number of workers.
     * Use the number of CPU cores
     */
    if(opts->maxThreads<N)
        opts->maxThreads=N;

    epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL);

    epicsMutexMustLock(sharedPoolsGuard);

    for(node=ellFirst(&sharedPools); node; node=ellNext(node))
    {
        cur=CONTAINER(node, epicsThreadPool, sharedNode);

        /* Must have exactly the requested priority
         * At least the requested max workers
         * and at least the requested stack size
         */
        if(cur->conf.workerPriority != opts->workerPriority)
            continue;
        if(cur->conf.maxThreads < opts->maxThreads)
            continue;
        if(cur->conf.workerStack < opts->workerStack)
            continue;

        /* take the reference while still holding the registry lock */
        cur->sharedCount++;
        assert(cur->sharedCount>0);
        epicsMutexUnlock(sharedPoolsGuard);

        /* report the matched pool's actual configuration back */
        epicsMutexMustLock(cur->guard);
        *opts = cur->conf;
        epicsMutexUnlock(cur->guard);

        return cur;
    }

    /* no match: create a new pool and register it */
    cur=epicsThreadPoolCreate(opts);
    if(!cur) {
        epicsMutexUnlock(sharedPoolsGuard);
        return NULL;
    }
    cur->sharedCount=1;

    ellAdd(&sharedPools, &cur->sharedNode);
    epicsMutexUnlock(sharedPoolsGuard);
    return cur;
}
/* Drop one reference to a pool obtained from
 * epicsThreadPoolGetShared().  The final release removes the pool
 * from the shared registry and destroys it.  NULL is ignored.
 */
epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool)
{
    if(!pool)
        return;

    epicsMutexMustLock(sharedPoolsGuard);

    assert(pool->sharedCount>0);

    if(--pool->sharedCount == 0) {
        /* last user: unregister and tear the pool down */
        ellDelete(&sharedPools, &pool->sharedNode);
        epicsThreadPoolDestroy(pool);
    }

    epicsMutexUnlock(sharedPoolsGuard);
}