diff --git a/src/libCom/Makefile b/src/libCom/Makefile index c8f41b330..767180316 100644 --- a/src/libCom/Makefile +++ b/src/libCom/Makefile @@ -31,6 +31,7 @@ include $(LIBCOM)/log/Makefile include $(LIBCOM)/macLib/Makefile include $(LIBCOM)/misc/Makefile include $(LIBCOM)/osi/Makefile +include $(LIBCOM)/pool/Makefile include $(LIBCOM)/ring/Makefile include $(LIBCOM)/taskwd/Makefile include $(LIBCOM)/timer/Makefile diff --git a/src/libCom/pool/Makefile b/src/libCom/pool/Makefile new file mode 100644 index 000000000..efaf66729 --- /dev/null +++ b/src/libCom/pool/Makefile @@ -0,0 +1,16 @@ +#************************************************************************* +# Copyright (c) 2014 UChicago Argonne LLC, as Operator of Argonne +# National Laboratory. +# EPICS BASE is distributed subject to a Software License Agreement found +# in file LICENSE that is included with this distribution. +#************************************************************************* + +# This is a Makefile fragment, see src/libCom/Makefile. + +SRC_DIRS += $(LIBCOM)/pool + +INC += epicsThreadPool.h + +Com_SRCS += poolJob.c +Com_SRCS += threadPool.c + diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c new file mode 100644 index 000000000..00cc8dfaf --- /dev/null +++ b/src/libCom/pool/poolJob.c @@ -0,0 +1,314 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#include +#include +#include + +#define epicsExportSharedSymbols + +#include "dbDefs.h" +#include "errlog.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsMutex.h" +#include "epicsEvent.h" +#include "epicsInterrupt.h" +#include "errCommon.h" + +#include "epicsThreadPool.h" +#include "poolPriv.h" + +void* _epicsJobArgSelf = &_epicsJobArgSelf; + +static +void workerMain(void* arg) +{ + epicsThreadPool *pool=arg; + + /* workers are created with counts + * in the running, sleeping, and (possibly) waking counters + */ + + epicsMutexMustLock(pool->guard); + pool->threadsAreAwake++; + pool->threadsSleeping--; + + while(1) + { + ELLNODE *cur; + epicsJob *job; + + pool->threadsAreAwake--; + pool->threadsSleeping++; + epicsMutexUnlock(pool->guard); + + epicsEventMustWait(pool->workerWakeup); + + epicsMutexMustLock(pool->guard); + pool->threadsSleeping--; + pool->threadsAreAwake++; + + if(pool->threadsWaking==0) + continue; + + pool->threadsWaking--; + + CHECKCOUNT(pool); + + if(pool->shutdown) + break; + + if(pool->pauserun) + continue; + + /* more threads to wakeup */ + if(pool->threadsWaking) + { + epicsEventSignal(pool->workerWakeup); + } + + while ((cur=ellGet(&pool->jobs)) !=NULL) + { + job=CONTAINER(cur, epicsJob, jobnode); + + assert(job->queued && !job->running); + + job->queued=0; + job->running=1; + + epicsMutexUnlock(pool->guard); + (*job->func)(job->arg, epicsJobModeRun); + epicsMutexMustLock(pool->guard); + + if(job->freewhendone) { + job->dead=1; + free(job); + } else { + job->running=0; + /* job may be re-queued from within callback */ + if(job->queued) + ellAdd(&pool->jobs, &job->jobnode); + else + ellAdd(&pool->owned, &job->jobnode); + } + + } + + if(pool->observerCount) + epicsEventSignal(pool->observerWakeup); + } + + pool->threadsAreAwake--; + pool->threadsRunning--; + + { + size_t nrun = pool->threadsRunning, + ocnt = pool->observerCount; + epicsMutexUnlock(pool->guard); + + if(ocnt) + epicsEventSignal(pool->observerWakeup); + + if(nrun) + epicsEventSignal(pool->workerWakeup); /* pass along */ + else + epicsEventSignal(pool->shutdownEvent); + } + + return; +} + +void createPoolThread(epicsThreadPool *pool) +{ + epicsThreadId tid; + + tid = epicsThreadCreate("PoolWorker", + pool->conf.workerPriority, + pool->conf.workerStack, + &workerMain, + pool); + if(!tid) + return; + + pool->threadsRunning++; + pool->threadsSleeping++; +} + +epicsJob* epicsJobCreate(epicsThreadPool* pool, + epicsJobFunction func, + void* arg) +{ + epicsJob *job=calloc(1, sizeof(*job)); + + if(!job) + return NULL; + + if(arg==&_epicsJobArgSelf) + arg=job; + + job->pool=pool; + job->func=func; + job->arg=arg; + + if(pool) { + epicsMutexMustLock(pool->guard); + + ellAdd(&pool->owned, &job->jobnode); + + epicsMutexUnlock(pool->guard); + } + + return job; +} + +void epicsJobDestroy(epicsJob* job) +{ + epicsThreadPool *pool; + if(!job || !job->pool){ + free(job); + return; + } + pool=job->pool; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + epicsJobUnqueue(job); + + if(job->running || job->freewhendone) { + job->freewhendone=1; + } else { + ellDelete(&pool->owned, &job->jobnode); + job->dead=1; + free(job); + } + + epicsMutexUnlock(pool->guard); +} + +int epicsJobMove(epicsJob* job, epicsThreadPool* newpool) +{ + epicsThreadPool *pool=job->pool; + + /* remove from current pool */ + if(pool) { + epicsMutexMustLock(pool->guard); + + if(job->queued || job->running) { + epicsMutexUnlock(pool->guard); + return EINVAL; + } + + ellDelete(&pool->owned, &job->jobnode); + + epicsMutexUnlock(pool->guard); + } + + pool = job->pool = newpool; + + /* add to new pool */ + if(pool) { + epicsMutexMustLock(pool->guard); + + ellAdd(&pool->owned, &job->jobnode); + + epicsMutexUnlock(pool->guard); + } + + return 0; +} + +int epicsJobQueue(epicsJob* job) +{ + int ret=0; + epicsThreadPool *pool=job->pool; + if(!pool) + return EINVAL; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + if(pool->pauseadd || job->freewhendone) { + ret=EINVAL; + goto done; + } else if(job->queued) { + goto done; + } + + job->queued=1; + /* Job may be queued from within a callback */ + if(!job->running) { + ellDelete(&pool->owned, &job->jobnode); + ellAdd(&pool->jobs, &job->jobnode); + } else { + /* some worker will find it again before sleeping */ + goto done; + } + + /* Since we hold the lock, we can be certain that all awake worker are + * executing work functions. The current thread may be a worker. + * We prefer to wakeup a new worker rather then wait for a busy worker to + * finish. However, after we initiate a wakeup there will be a race + * between the worker waking up, and a busy worker finishing. + * Thus we can't avoid spurious wakeups. + */ + + if(pool->threadsRunning >= pool->conf.maxThreads) { + /* all workers created... */ + /* ... but some are sleeping, so wake one up */ + if(pool->threadsWaking < pool->threadsSleeping) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + } + /*else one of the running workers will find this job before sleeping */ + CHECKCOUNT(pool); + + } else { + /* could create more workers so + * will either create a new worker, or wakeup an existing worker + */ + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + if(pool->threadsWaking > pool->threadsSleeping) + createPoolThread(pool); + CHECKCOUNT(pool); + } + +done: + epicsMutexUnlock(pool->guard); + return ret; +} + +int epicsJobUnqueue(epicsJob* job) +{ + int ret=0; + epicsThreadPool *pool=job->pool; + + if(!pool) + return 0; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + if(job->queued) { + if(!job->running) { + ellDelete(&pool->jobs, &job->jobnode); + ellAdd(&pool->owned, &job->jobnode); + } + job->queued=0; + ret=1; + } + + epicsMutexUnlock(pool->guard); + + return ret; +} + diff --git a/src/libCom/pool/poolPriv.h b/src/libCom/pool/poolPriv.h new file mode 100644 index 000000000..789ea57a6 --- /dev/null +++ b/src/libCom/pool/poolPriv.h @@ -0,0 +1,97 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#ifndef POOLPRIV_H +#define POOLPRIV_H + +#include "epicsThreadPool.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsEvent.h" +#include "epicsMutex.h" + +struct epicsThreadPool { + ELLNODE sharedNode; + size_t sharedCount; + + ELLLIST jobs; /* run queue */ + ELLLIST owned; /* unqueued jobs. */ + + /* Worker state counters. + * The life cycle of a worker is + * Wakeup -> Awake -> Sleeping + * Newly created workers go into the wakeup state + */ + + /* # of running workers which are not waiting for a wakeup event */ + size_t threadsAreAwake; + /* # of sleeping workers which need to be awakened */ + size_t threadsWaking; + /* # of workers waiting on the workerWakeup event */ + size_t threadsSleeping; + /* # of threads started and not stopped */ + size_t threadsRunning; + + /* # of observers waiting on pool events */ + size_t observerCount; + + epicsEventId workerWakeup; + epicsEventId shutdownEvent; + + epicsEventId observerWakeup; + + /* Disallow epicsJobQueue */ + unsigned int pauseadd:1; + /* Prevent workers from running new jobs */ + unsigned int pauserun:1; + /* Prevent further changes to pool options */ + unsigned int freezeopt:1; + /* tell workers to exit */ + unsigned int shutdown:1; + + epicsMutexId guard; + + /* copy of config passed when created */ + epicsThreadPoolConfig conf; +}; + +/* Called after manipulating counters to check that invariants are preserved */ +#define CHECKCOUNT(pPool) do{if(!(pPool)->shutdown) { \ + assert( (pPool)->threadsAreAwake + (pPool)->threadsSleeping == (pPool)->threadsRunning ); \ + assert( (pPool)->threadsWaking <= (pPool)->threadsSleeping ); \ +}}while(0) + +/* When created a job is idle. queued and running are false + * and jobnode is in the thread pool's owned list. + * + * When the job is added, the queued flag is set and jobnode + * is in the jobs list. + * + * When the job starts running the queued flag is cleared and + * the running flag is set. jobnode is not in any list + * (held locally by worker). + * + * When the job has finished running, the running flag is cleared. + * The queued flag may be set if the job re-added itself. + * Based on the queued flag jobnode is added to the appropriate + * list. + */ +struct epicsJob { + ELLNODE jobnode; + epicsJobFunction func; + void *arg; + epicsThreadPool *pool; + + unsigned int queued:1; + unsigned int running:1; + unsigned int freewhendone:1; /* lazy delete of running job */ + unsigned int dead:1; /* flag to catch use of freed objects */ +}; + +void createPoolThread(epicsThreadPool *pool); + +#endif // POOLPRIV_H diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c new file mode 100644 index 000000000..7cb3bb39d --- /dev/null +++ b/src/libCom/pool/threadPool.c @@ -0,0 +1,394 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#include +#include +#include + +#define epicsExportSharedSymbols + +#include "dbDefs.h" +#include "errlog.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsMutex.h" +#include "epicsEvent.h" +#include "epicsInterrupt.h" +#include "errCommon.h" +#include "cantProceed.h" + +#include "epicsThreadPool.h" +#include "poolPriv.h" + + +void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts) +{ + memset(opts, 0, sizeof(*opts)); + opts->maxThreads=epicsThreadGetCPUs(); + opts->workerStack=epicsThreadGetStackSize(epicsThreadStackSmall); + + if(epicsThreadLowestPriorityLevelAbove(epicsThreadPriorityCAServerHigh, &opts->workerPriority) + !=epicsThreadBooleanStatusSuccess) + opts->workerPriority = epicsThreadPriorityMedium; +} + +epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts) +{ + size_t i; + epicsThreadPool *pool; + + /* caller likely didn't initialize the options structure */ + if(opts && opts->maxThreads==0) { + errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized"); + return NULL; + } + + pool=calloc(1, sizeof(*pool)); + if(!pool) + return NULL; + + if(opts) + memcpy(&pool->conf, opts, sizeof(*opts)); + else + epicsThreadPoolConfigDefaults(&pool->conf); + + if(pool->conf.initialThreads > pool->conf.maxThreads) + pool->conf.initialThreads = pool->conf.maxThreads; + + pool->workerWakeup=epicsEventCreate(epicsEventEmpty); + pool->shutdownEvent=epicsEventCreate(epicsEventEmpty); + pool->observerWakeup=epicsEventCreate(epicsEventEmpty); + pool->guard=epicsMutexCreate(); + + if(!pool->workerWakeup || !pool->shutdownEvent || + !pool->observerWakeup || !pool->guard) + goto cleanup; + + ellInit(&pool->jobs); + ellInit(&pool->owned); + + epicsMutexMustLock(pool->guard); + + for(i=0; iconf.initialThreads; i++) { + createPoolThread(pool); + } + + epicsMutexUnlock(pool->guard); + + if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) { + errlogPrintf("Error: Unable to create any threads for thread pool\n"); + goto cleanup; + + }else if(pool->threadsRunning < pool->conf.initialThreads) { + errlogPrintf("Warning: Unable to create all threads for thread pool (%lu/%lu)\n", + (unsigned long)pool->threadsRunning, + (unsigned long)pool->conf.initialThreads); + } + + return pool; + +cleanup: + if(pool->workerWakeup) epicsEventDestroy(pool->workerWakeup); + if(pool->shutdownEvent) epicsEventDestroy(pool->shutdownEvent); + if(pool->observerWakeup) epicsEventDestroy(pool->observerWakeup); + if(pool->guard) epicsMutexDestroy(pool->guard); + + free(pool); + return 0; +} + +static +void _epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val) +{ + if(pool->freezeopt) + return; + + if(opt==epicsThreadPoolQueueAdd) { + pool->pauseadd = !val; + + } else if(opt==epicsThreadPoolQueueRun) { + if(!val && !pool->pauserun) + pool->pauserun=1; + + else if(val && pool->pauserun) { + size_t jobs=(size_t)ellCount(&pool->jobs); + pool->pauserun=0; + + if(jobs) { + size_t wakeable=pool->threadsSleeping - pool->threadsWaking; + /* first try to give jobs to sleeping workers */ + if(wakeable) { + int wakeup = jobs > wakeable ? wakeable : jobs; + assert(wakeup>0); + jobs-=wakeup; + pool->threadsWaking+=wakeup; + epicsEventSignal(pool->workerWakeup); + CHECKCOUNT(pool); + } + } + while(jobs-- && pool->threadsRunning < pool->conf.maxThreads) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + createPoolThread(pool); + } + CHECKCOUNT(pool); + } + } + /* unknown options ignored */ + +} + +void epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val) +{ + epicsMutexMustLock(pool->guard); + _epicsThreadPoolControl(pool, opt, val); + epicsMutexUnlock(pool->guard); +} + +int epicsThreadPoolWait(epicsThreadPool* pool, double timeout) +{ + int ret=0; + epicsMutexMustLock(pool->guard); + + while(ellCount(&pool->jobs)>0 || pool->threadsAreAwake>0) { + pool->observerCount++; + epicsMutexUnlock(pool->guard); + + if(timeout<0.0) { + epicsEventMustWait(pool->observerWakeup); + + } else { + switch(epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) { + case epicsEventWaitError: + cantProceed("epicsThreadPoolWait: failed to wait for Event"); + break; + case epicsEventWaitTimeout: + ret=EWOULDBLOCK; + break; + case epicsEventWaitOK: + ret=0; + break; + } + + } + + epicsMutexMustLock(pool->guard); + pool->observerCount--; + + if(pool->observerCount) + epicsEventSignal(pool->observerWakeup); + + if(ret!=0) + break; + } + + epicsMutexUnlock(pool->guard); + return ret; +} + +void epicsThreadPoolDestroy(epicsThreadPool *pool) +{ + size_t nThr; + ELLLIST notify; + ELLNODE *cur; + + if(!pool) + return; + + ellInit(¬ify); + + epicsMutexMustLock(pool->guard); + + /* run remaining queued jobs */ + _epicsThreadPoolControl(pool, epicsThreadPoolQueueAdd, 0); + _epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); + nThr=pool->threadsRunning; + pool->freezeopt = 1; + + epicsMutexUnlock(pool->guard); + + epicsThreadPoolWait(pool, -1.0); + /* At this point all queued jobs have run */ + + epicsMutexMustLock(pool->guard); + + pool->shutdown=1; + /* wakeup all */ + if(pool->threadsWaking < pool->threadsSleeping) { + pool->threadsWaking = pool->threadsSleeping; + epicsEventSignal(pool->workerWakeup); + } + + ellConcat(¬ify, &pool->owned); + ellConcat(¬ify, &pool->jobs); + + epicsMutexUnlock(pool->guard); + + if(nThr && epicsEventWait(pool->shutdownEvent)!=epicsEventWaitOK){ + errlogMessage("epicsThreadPoolDestroy: wait error"); + return; + } + + /* all workers are now shutdown */ + + /* notify remaining jobs that pool is being destroyed */ + while( (cur=ellGet(¬ify))!=NULL ) + { + epicsJob *job=CONTAINER(cur, epicsJob, jobnode); + job->running=1; + (*job->func)(job->arg, epicsJobModeCleanup); + job->running=0; + if(job->freewhendone) + free(job); + else + job->pool=NULL; /* orphan */ + } + + epicsEventDestroy(pool->workerWakeup); + epicsEventDestroy(pool->shutdownEvent); + epicsEventDestroy(pool->observerWakeup); + epicsMutexDestroy(pool->guard); + + free(pool); +} + + +void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd) +{ + ELLNODE *cur; + epicsMutexMustLock(pool->guard); + + fprintf(fd, "Thread Pool with %lu/%lu threads\n" + " running %d jobs with %lu threads\n", + (unsigned long)pool->threadsRunning, + (unsigned long)pool->conf.maxThreads, + ellCount(&pool->jobs), + (unsigned long)pool->threadsAreAwake); + if(pool->pauseadd) + fprintf(fd, " Inhibit queueing\n"); + if(pool->pauserun) + fprintf(fd, " Pause workers\n"); + if(pool->shutdown) + fprintf(fd, " Shutdown in progress\n"); + + for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur)) + { + epicsJob *job=CONTAINER(cur, epicsJob, jobnode); + fprintf(fd, " job 0x%lu func: 0x%lu, arg: 0x%lu ", + (unsigned long)job, (unsigned long)job->func, + (unsigned long)job->arg); + if(job->queued) + fprintf(fd, "Queued "); + if(job->running) + fprintf(fd, "Running "); + if(job->freewhendone) + fprintf(fd, "Free "); + fprintf(fd, "\n"); + } + + epicsMutexUnlock(pool->guard); +} + +size_t epicsThreadPoolNThreads(epicsThreadPool *pool) +{ + size_t ret; + + epicsMutexMustLock(pool->guard); + ret=pool->threadsRunning; + epicsMutexUnlock(pool->guard); + + return ret; +} + +static +ELLLIST sharedPools = ELLLIST_INIT; + +static +epicsMutexId sharedPoolsGuard; + +static +epicsThreadOnceId sharedPoolsOnce = EPICS_THREAD_ONCE_INIT; + +static +void sharedPoolsInit(void* unused) +{ + sharedPoolsGuard = epicsMutexMustCreate(); +} + +epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts) +{ + ELLNODE *node; + epicsThreadPool *cur; + size_t N=epicsThreadGetCPUs(); + + if(!opts) + return NULL; + /* shared pools must have a minimum allowed number of workers. + * Use the number of CPU cores + */ + if(opts->maxThreadsmaxThreads=N; + + epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL); + + epicsMutexMustLock(sharedPoolsGuard); + + for(node=ellFirst(&sharedPools); node; node=ellNext(node)) + { + cur=CONTAINER(node, epicsThreadPool, sharedNode); + + /* Must have exactly the requested priority + * At least the requested max workers + * and at least the requested stack size + */ + if(cur->conf.workerPriority != opts->workerPriority) + continue; + if(cur->conf.maxThreads < opts->maxThreads) + continue; + if(cur->conf.workerStack < opts->workerStack) + continue; + + cur->sharedCount++; + assert(cur->sharedCount>0); + epicsMutexUnlock(sharedPoolsGuard); + + epicsMutexMustLock(cur->guard); + *opts = cur->conf; + epicsMutexUnlock(cur->guard); + return cur; + } + + cur=epicsThreadPoolCreate(opts); + if(!cur) { + epicsMutexUnlock(sharedPoolsGuard); + return NULL; + } + cur->sharedCount=1; + + ellAdd(&sharedPools, &cur->sharedNode); + epicsMutexUnlock(sharedPoolsGuard); + return cur; +} + +epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool) +{ + if(!pool) + return; + + epicsMutexMustLock(sharedPoolsGuard); + + assert(pool->sharedCount>0); + + pool->sharedCount--; + + if(pool->sharedCount==0) { + ellDelete(&sharedPools, &pool->sharedNode); + epicsThreadPoolDestroy(pool); + } + + epicsMutexUnlock(sharedPoolsGuard); +}