diff --git a/documentation/RELEASE_NOTES.html b/documentation/RELEASE_NOTES.html index 4e4845715..0dfa814a9 100644 --- a/documentation/RELEASE_NOTES.html +++ b/documentation/RELEASE_NOTES.html @@ -15,6 +15,14 @@ EPICS Base 3.15.0.x releases are not intended for use in production systems.

Changes between 3.15.0.1 and 3.15.0.2

+

+General purpose thread pool

+ +

+A general purpose threaded work queue API epicsThreadPool is added. +Multiple pools can be created with controlable priority and number +of worker threads. Lazy worker startup is supported.

+

Database field setting updates

A database (.db) file loaded by an IOC does not have to repeat the record diff --git a/src/libCom/Makefile b/src/libCom/Makefile index c8f41b330..767180316 100644 --- a/src/libCom/Makefile +++ b/src/libCom/Makefile @@ -31,6 +31,7 @@ include $(LIBCOM)/log/Makefile include $(LIBCOM)/macLib/Makefile include $(LIBCOM)/misc/Makefile include $(LIBCOM)/osi/Makefile +include $(LIBCOM)/pool/Makefile include $(LIBCOM)/ring/Makefile include $(LIBCOM)/taskwd/Makefile include $(LIBCOM)/timer/Makefile diff --git a/src/libCom/pool/Makefile b/src/libCom/pool/Makefile new file mode 100644 index 000000000..efaf66729 --- /dev/null +++ b/src/libCom/pool/Makefile @@ -0,0 +1,16 @@ +#************************************************************************* +# Copyright (c) 2014 UChicago Argonne LLC, as Operator of Argonne +# National Laboratory. +# EPICS BASE is distributed subject to a Software License Agreement found +# in file LICENSE that is included with this distribution. +#************************************************************************* + +# This is a Makefile fragment, see src/libCom/Makefile. + +SRC_DIRS += $(LIBCOM)/pool + +INC += epicsThreadPool.h + +Com_SRCS += poolJob.c +Com_SRCS += threadPool.c + diff --git a/src/libCom/pool/epicsThreadPool.h b/src/libCom/pool/epicsThreadPool.h new file mode 100644 index 000000000..e6b3f03c8 --- /dev/null +++ b/src/libCom/pool/epicsThreadPool.h @@ -0,0 +1,152 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* General purpose worker thread pool manager + * mdavidsaver@bnl.gov + */ +#ifndef EPICSTHREADPOOL_H +#define EPICSTHREADPOOL_H + +#include +#include + +#include "shareLib.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + unsigned int initialThreads; + unsigned int maxThreads; + unsigned int workerStack; + unsigned int workerPriority; +} epicsThreadPoolConfig; + +typedef struct epicsThreadPool epicsThreadPool; + +/* Job function call modes */ +typedef enum { + /* Normal run of job */ + epicsJobModeRun, + + /* Thread pool is being destroyed. + * A chance to cleanup the job immediately with epicsJobDestroy(). + * If ignored, the job is orphaned (dissociated from the thread pool) + * and epicsJobDestroy() must be called later. + */ + epicsJobModeCleanup +} epicsJobMode; + +typedef void (*epicsJobFunction)(void* arg, epicsJobMode mode); + +typedef struct epicsJob epicsJob; + +/* Pool operations */ + +/* Initialize a pool config with default values. + * This much be done to preserve future compatibility + * when new options are added. + */ +epicsShareFunc void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *); + +/* fetch or create a thread pool which can be shared with other users. + * may return NULL for allocation failures + */ +epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts); +epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool); + +/* If opts is NULL then defaults are used. + * The opts pointer is not stored by this call, and may exist on the stack. + */ +epicsShareFunc epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts); + +/* Blocks until all worker threads have stopped. + * Any jobs still attached to this pool receive a callback with EPICSJOB_CLEANUP + * and are then orphaned. + */ +epicsShareFunc void epicsThreadPoolDestroy(epicsThreadPool *); + +/* pool control options */ +typedef enum { + epicsThreadPoolQueueAdd, /* val==0 causes epicsJobQueue to fail, 1 is default */ + epicsThreadPoolQueueRun /* val==0 prevents workers from running jobs, 1 is default */ +} epicsThreadPoolOption; + +epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool, + epicsThreadPoolOption opt, + unsigned int val); + +/* Block until job queue is emptied and no jobs are running. + * Useful after calling epicsThreadPoolControl() with option epicsThreadPoolQueueAdd=0 + * + * timeout<0 waits forever, timeout==0 polls, timeout>0 waits at most one timeout period + * Returns 0 for success or non-zero on error (timeout is ETIMEOUT) + */ +epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout); + + +/* Per job operations */ + +/* Special flag for epicsJobCreate(). + * When passed as the third argument "user" + * the argument passed to the job callback + * will be the epicsJob* + */ +#define EPICSJOB_SELF epicsJobArgSelfMagic +epicsShareExtern void* epicsJobArgSelfMagic; + +/* Creates, but does not add, a new job. + * If pool is NULL then the job is not associated with any pool and + * epicsJobMove() must be called before epicsJobQueue(). + * Safe to call from a running job function. + * Returns a new job pointer, or NULL on error. + */ +epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool* pool, + epicsJobFunction cb, + void* user); + +/* Cancel and free a job structure. Does not block. + * Job may not be immediately free'd. + * Safe to call from a running job function. + */ +epicsShareFunc void epicsJobDestroy(epicsJob*); + +/* Move the job to a different pool. + * If pool is NULL then the job will no longer be associated + * with any pool. + * Not thread safe. Job must not be running or queued. + * returns 0 on success, non-zero on error. + */ +epicsShareFunc int epicsJobMove(epicsJob* job, epicsThreadPool* pool); + +/* Adds the job to the run queue + * Safe to call from a running job function. + * returns 0 for success, non-zero on error. + */ +epicsShareFunc int epicsJobQueue(epicsJob*); + +/* Remove a job from the run queue if it is queued. + * Safe to call from a running job function. + * returns 0 if job was queued and now is not. + * 1 if job already ran, is running, or was not queued before, + * Other non-zero on error + */ +epicsShareFunc int epicsJobUnqueue(epicsJob*); + + +/* Mostly useful for debugging */ + +epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd); + +/* Current number of active workers. May be less than the maximum */ +epicsShareFunc unsigned int epicsThreadPoolNThreads(epicsThreadPool *); + +#ifdef __cplusplus +} +#endif + +#endif // EPICSTHREADPOOL_H diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c new file mode 100644 index 000000000..05645fea0 --- /dev/null +++ b/src/libCom/pool/poolJob.c @@ -0,0 +1,327 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#include +#include +#include + +#define epicsExportSharedSymbols + +#include "dbDefs.h" +#include "errlog.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsMutex.h" +#include "epicsEvent.h" +#include "epicsInterrupt.h" + +#include "epicsThreadPool.h" +#include "poolPriv.h" + +void *epicsJobArgSelfMagic = &epicsJobArgSelfMagic; + +static +void workerMain(void *arg) +{ + epicsThreadPool *pool = arg; + unsigned int nrun, ocnt; + + /* workers are created with counts + * in the running, sleeping, and (possibly) waking counters + */ + + epicsMutexMustLock(pool->guard); + pool->threadsAreAwake++; + pool->threadsSleeping--; + + while (1) { + ELLNODE *cur; + + pool->threadsAreAwake--; + pool->threadsSleeping++; + epicsMutexUnlock(pool->guard); + + epicsEventMustWait(pool->workerWakeup); + + epicsMutexMustLock(pool->guard); + pool->threadsSleeping--; + pool->threadsAreAwake++; + + if (pool->threadsWaking==0) + continue; + + pool->threadsWaking--; + + CHECKCOUNT(pool); + + if (pool->shutdown) + break; + + if (pool->pauserun) + continue; + + /* more threads to wakeup */ + if (pool->threadsWaking) { + epicsEventSignal(pool->workerWakeup); + } + + while ((cur=ellGet(&pool->jobs)) != NULL) { + epicsJob *job = CONTAINER(cur, epicsJob, jobnode); + + assert(job->queued && !job->running); + + job->queued=0; + job->running=1; + + epicsMutexUnlock(pool->guard); + (*job->func)(job->arg, epicsJobModeRun); + epicsMutexMustLock(pool->guard); + + if (job->freewhendone) { + job->dead=1; + free(job); + } + else { + job->running=0; + /* job may be re-queued from within callback */ + if (job->queued) + ellAdd(&pool->jobs, &job->jobnode); + else + ellAdd(&pool->owned, &job->jobnode); + } + } + + if (pool->observerCount) + epicsEventSignal(pool->observerWakeup); + } + + pool->threadsAreAwake--; + pool->threadsRunning--; + + nrun = pool->threadsRunning; + ocnt = pool->observerCount; + epicsMutexUnlock(pool->guard); + + if (ocnt) + epicsEventSignal(pool->observerWakeup); + + if (nrun) + epicsEventSignal(pool->workerWakeup); /* pass along */ + else + epicsEventSignal(pool->shutdownEvent); +} + +int createPoolThread(epicsThreadPool *pool) +{ + epicsThreadId tid; + + tid = epicsThreadCreate("PoolWorker", + pool->conf.workerPriority, + pool->conf.workerStack, + &workerMain, + pool); + if (!tid) + return 1; + + pool->threadsRunning++; + pool->threadsSleeping++; + return 0; +} + +epicsJob* epicsJobCreate(epicsThreadPool *pool, + epicsJobFunction func, + void *arg) +{ + epicsJob *job = calloc(1, sizeof(*job)); + + if (!job) + return NULL; + + if (arg == &epicsJobArgSelfMagic) + arg = job; + + job->pool = NULL; + job->func = func; + job->arg = arg; + + epicsJobMove(job, pool); + + return job; +} + +void epicsJobDestroy(epicsJob *job) +{ + epicsThreadPool *pool; + if (!job || !job->pool) { + free(job); + return; + } + pool = job->pool; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + epicsJobUnqueue(job); + + if (job->running || job->freewhendone) { + job->freewhendone = 1; + } + else { + ellDelete(&pool->owned, &job->jobnode); + job->dead = 1; + free(job); + } + + epicsMutexUnlock(pool->guard); +} + +int epicsJobMove(epicsJob *job, epicsThreadPool *newpool) +{ + epicsThreadPool *pool = job->pool; + + /* remove from current pool */ + if (pool) { + epicsMutexMustLock(pool->guard); + + if (job->queued || job->running) { + epicsMutexUnlock(pool->guard); + return EINVAL; + } + + ellDelete(&pool->owned, &job->jobnode); + + epicsMutexUnlock(pool->guard); + } + + pool = job->pool = newpool; + + /* add to new pool */ + if (pool) { + epicsMutexMustLock(pool->guard); + + ellAdd(&pool->owned, &job->jobnode); + + epicsMutexUnlock(pool->guard); + } + + return 0; +} + +int epicsJobQueue(epicsJob *job) +{ + int ret = 0; + epicsThreadPool *pool = job->pool; + + if (!pool) + return EINVAL; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + if (pool->pauseadd) { + ret = EPERM; + goto done; + } + else if (job->freewhendone) { + ret = EINVAL; + goto done; + } + else if (job->queued) { + goto done; + } + + job->queued = 1; + /* Job may be queued from within a callback */ + if (!job->running) { + ellDelete(&pool->owned, &job->jobnode); + ellAdd(&pool->jobs, &job->jobnode); + } + else { + /* some worker will find it again before sleeping */ + goto done; + } + + /* Since we hold the lock, we can be certain that all awake worker are + * executing work functions. The current thread may be a worker. + * We prefer to wakeup a new worker rather then wait for a busy worker to + * finish. However, after we initiate a wakeup there will be a race + * between the worker waking up, and a busy worker finishing. + * Thus we can't avoid spurious wakeups. + */ + + if (pool->threadsRunning >= pool->conf.maxThreads) { + /* all workers created... */ + /* ... but some are sleeping, so wake one up */ + if (pool->threadsWaking < pool->threadsSleeping) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + } + /*else one of the running workers will find this job before sleeping */ + CHECKCOUNT(pool); + + } + else { + /* could create more workers so + * will either create a new worker, or wakeup an existing worker + */ + + if (pool->threadsWaking >= pool->threadsSleeping) { + /* all sleeping workers have already been woken. + * start a new worker for this job + */ + if (createPoolThread(pool) && pool->threadsRunning == 0) { + /* oops, we couldn't lazy create our first worker + * so this job would never run! + */ + ret = EAGAIN; + job->queued = 0; + /* if threadsRunning==0 then no jobs can be running */ + assert(!job->running); + ellDelete(&pool->jobs, &job->jobnode); + ellAdd(&pool->owned, &job->jobnode); + } + } + if (ret == 0) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + } + CHECKCOUNT(pool); + } + +done: + epicsMutexUnlock(pool->guard); + return ret; +} + +int epicsJobUnqueue(epicsJob *job) +{ + int ret = 1; + epicsThreadPool *pool = job->pool; + + if (!pool) + return EINVAL; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + if (job->queued) { + if (!job->running) { + ellDelete(&pool->jobs, &job->jobnode); + ellAdd(&pool->owned, &job->jobnode); + } + job->queued = 0; + ret = 0; + } + + epicsMutexUnlock(pool->guard); + + return ret; +} + diff --git a/src/libCom/pool/poolPriv.h b/src/libCom/pool/poolPriv.h new file mode 100644 index 000000000..237cedf45 --- /dev/null +++ b/src/libCom/pool/poolPriv.h @@ -0,0 +1,99 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#ifndef POOLPRIV_H +#define POOLPRIV_H + +#include "epicsThreadPool.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsEvent.h" +#include "epicsMutex.h" + +struct epicsThreadPool { + ELLNODE sharedNode; + size_t sharedCount; + + ELLLIST jobs; /* run queue */ + ELLLIST owned; /* unqueued jobs. */ + + /* Worker state counters. + * The life cycle of a worker is + * Wakeup -> Awake -> Sleeping + * Newly created workers go into the wakeup state + */ + + /* # of running workers which are not waiting for a wakeup event */ + unsigned int threadsAreAwake; + /* # of sleeping workers which need to be awakened */ + unsigned int threadsWaking; + /* # of workers waiting on the workerWakeup event */ + unsigned int threadsSleeping; + /* # of threads started and not stopped */ + unsigned int threadsRunning; + + /* # of observers waiting on pool events */ + unsigned int observerCount; + + epicsEventId workerWakeup; + epicsEventId shutdownEvent; + + epicsEventId observerWakeup; + + /* Disallow epicsJobQueue */ + unsigned int pauseadd:1; + /* Prevent workers from running new jobs */ + unsigned int pauserun:1; + /* Prevent further changes to pool options */ + unsigned int freezeopt:1; + /* tell workers to exit */ + unsigned int shutdown:1; + + epicsMutexId guard; + + /* copy of config passed when created */ + epicsThreadPoolConfig conf; +}; + +/* Called after manipulating counters to check that invariants are preserved */ +#define CHECKCOUNT(pPool) do { \ + if (!(pPool)->shutdown) { \ + assert((pPool)->threadsAreAwake + (pPool)->threadsSleeping == (pPool)->threadsRunning); \ + assert((pPool)->threadsWaking <= (pPool)->threadsSleeping); \ + } \ +} while(0) + +/* When created a job is idle. queued and running are false + * and jobnode is in the thread pool's owned list. + * + * When the job is added, the queued flag is set and jobnode + * is in the jobs list. + * + * When the job starts running the queued flag is cleared and + * the running flag is set. jobnode is not in any list + * (held locally by worker). + * + * When the job has finished running, the running flag is cleared. + * The queued flag may be set if the job re-added itself. + * Based on the queued flag jobnode is added to the appropriate + * list. + */ +struct epicsJob { + ELLNODE jobnode; + epicsJobFunction func; + void *arg; + epicsThreadPool *pool; + + unsigned int queued:1; + unsigned int running:1; + unsigned int freewhendone:1; /* lazy delete of running job */ + unsigned int dead:1; /* flag to catch use of freed objects */ +}; + +int createPoolThread(epicsThreadPool *pool); + +#endif // POOLPRIV_H diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c new file mode 100644 index 000000000..317f4786a --- /dev/null +++ b/src/libCom/pool/threadPool.c @@ -0,0 +1,403 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#include +#include +#include + +#define epicsExportSharedSymbols + +#include "dbDefs.h" +#include "errlog.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsMutex.h" +#include "epicsEvent.h" +#include "epicsInterrupt.h" +#include "cantProceed.h" + +#include "epicsThreadPool.h" +#include "poolPriv.h" + + +void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts) +{ + memset(opts, 0, sizeof(*opts)); + opts->maxThreads = epicsThreadGetCPUs(); + opts->workerStack = epicsThreadGetStackSize(epicsThreadStackSmall); + + if (epicsThreadLowestPriorityLevelAbove(epicsThreadPriorityCAServerHigh, &opts->workerPriority) + != epicsThreadBooleanStatusSuccess) + opts->workerPriority = epicsThreadPriorityMedium; +} + +epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts) +{ + size_t i; + epicsThreadPool *pool; + + /* caller likely didn't initialize the options structure */ + if (opts && opts->maxThreads == 0) { + errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized"); + return NULL; + } + + pool = calloc(1, sizeof(*pool)); + if (!pool) + return NULL; + + if (opts) + memcpy(&pool->conf, opts, sizeof(*opts)); + else + epicsThreadPoolConfigDefaults(&pool->conf); + + if (pool->conf.initialThreads > pool->conf.maxThreads) + pool->conf.initialThreads = pool->conf.maxThreads; + + pool->workerWakeup = epicsEventCreate(epicsEventEmpty); + pool->shutdownEvent = epicsEventCreate(epicsEventEmpty); + pool->observerWakeup = epicsEventCreate(epicsEventEmpty); + pool->guard = epicsMutexCreate(); + + if (!pool->workerWakeup || !pool->shutdownEvent || + !pool->observerWakeup || !pool->guard) + goto cleanup; + + ellInit(&pool->jobs); + ellInit(&pool->owned); + + epicsMutexMustLock(pool->guard); + + for (i = 0; i < pool->conf.initialThreads; i++) { + createPoolThread(pool); + } + + if (pool->threadsRunning == 0 && pool->conf.initialThreads != 0) { + epicsMutexUnlock(pool->guard); + errlogPrintf("Error: Unable to create any threads for thread pool\n"); + goto cleanup; + + } + else if (pool->threadsRunning < pool->conf.initialThreads) { + errlogPrintf("Warning: Unable to create all threads for thread pool (%u/%u)\n", + pool->threadsRunning, pool->conf.initialThreads); + } + + epicsMutexUnlock(pool->guard); + + return pool; + +cleanup: + if (pool->workerWakeup) + epicsEventDestroy(pool->workerWakeup); + if (pool->shutdownEvent) + epicsEventDestroy(pool->shutdownEvent); + if (pool->observerWakeup) + epicsEventDestroy(pool->observerWakeup); + if (pool->guard) + epicsMutexDestroy(pool->guard); + + free(pool); + return 0; +} + +static +void epicsThreadPoolControlImpl(epicsThreadPool *pool, epicsThreadPoolOption opt, unsigned int val) +{ + if (pool->freezeopt) + return; + + if (opt == epicsThreadPoolQueueAdd) { + pool->pauseadd = !val; + } + else if (opt == epicsThreadPoolQueueRun) { + if (!val && !pool->pauserun) + pool->pauserun = 1; + + else if (val && pool->pauserun) { + int jobs = ellCount(&pool->jobs); + pool->pauserun = 0; + + if (jobs) { + int wakeable = pool->threadsSleeping - pool->threadsWaking; + + /* first try to give jobs to sleeping workers */ + if (wakeable) { + int wakeup = jobs > wakeable ? wakeable : jobs; + assert(wakeup > 0); + jobs -= wakeup; + pool->threadsWaking += wakeup; + epicsEventSignal(pool->workerWakeup); + CHECKCOUNT(pool); + } + } + while (jobs-- && pool->threadsRunning < pool->conf.maxThreads) { + if (createPoolThread(pool) == 0) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + } + else + break; /* oops, couldn't create worker */ + } + CHECKCOUNT(pool); + } + } + /* unknown options ignored */ + +} + +void epicsThreadPoolControl(epicsThreadPool *pool, epicsThreadPoolOption opt, unsigned int val) +{ + epicsMutexMustLock(pool->guard); + epicsThreadPoolControlImpl(pool, opt, val); + epicsMutexUnlock(pool->guard); +} + +int epicsThreadPoolWait(epicsThreadPool *pool, double timeout) +{ + int ret = 0; + epicsMutexMustLock(pool->guard); + + while (ellCount(&pool->jobs) > 0 || pool->threadsAreAwake > 0) { + pool->observerCount++; + epicsMutexUnlock(pool->guard); + + if (timeout < 0.0) { + epicsEventMustWait(pool->observerWakeup); + } + else { + switch (epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) { + case epicsEventWaitError: + cantProceed("epicsThreadPoolWait: failed to wait for Event"); + break; + case epicsEventWaitTimeout: + ret = ETIMEDOUT; + break; + case epicsEventWaitOK: + ret = 0; + break; + } + } + + epicsMutexMustLock(pool->guard); + pool->observerCount--; + + if (pool->observerCount) + epicsEventSignal(pool->observerWakeup); + + if (ret != 0) + break; + } + + epicsMutexUnlock(pool->guard); + return ret; +} + +void epicsThreadPoolDestroy(epicsThreadPool *pool) +{ + unsigned int nThr; + ELLLIST notify; + ELLNODE *cur; + + if (!pool) + return; + + ellInit(¬ify); + + epicsMutexMustLock(pool->guard); + + /* run remaining queued jobs */ + epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueAdd, 0); + epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueRun, 1); + nThr = pool->threadsRunning; + pool->freezeopt = 1; + + epicsMutexUnlock(pool->guard); + + epicsThreadPoolWait(pool, -1.0); + /* At this point all queued jobs have run */ + + epicsMutexMustLock(pool->guard); + + pool->shutdown = 1; + /* wakeup all */ + if (pool->threadsWaking < pool->threadsSleeping) { + pool->threadsWaking = pool->threadsSleeping; + epicsEventSignal(pool->workerWakeup); + } + + ellConcat(¬ify, &pool->owned); + ellConcat(¬ify, &pool->jobs); + + epicsMutexUnlock(pool->guard); + + if (nThr && epicsEventWait(pool->shutdownEvent) != epicsEventWaitOK){ + errlogMessage("epicsThreadPoolDestroy: wait error"); + return; + } + + /* all workers are now shutdown */ + + /* notify remaining jobs that pool is being destroyed */ + while ((cur = ellGet(¬ify)) != NULL) { + epicsJob *job = CONTAINER(cur, epicsJob, jobnode); + + job->running = 1; + job->func(job->arg, epicsJobModeCleanup); + job->running = 0; + if (job->freewhendone) + free(job); + else + job->pool = NULL; /* orphan */ + } + + epicsEventDestroy(pool->workerWakeup); + epicsEventDestroy(pool->shutdownEvent); + epicsEventDestroy(pool->observerWakeup); + epicsMutexDestroy(pool->guard); + + free(pool); +} + + +void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd) +{ + ELLNODE *cur; + epicsMutexMustLock(pool->guard); + + fprintf(fd, "Thread Pool with %u/%u threads\n" + " running %d jobs with %u threads\n", + pool->threadsRunning, + pool->conf.maxThreads, + ellCount(&pool->jobs), + pool->threadsAreAwake); + if (pool->pauseadd) + fprintf(fd, " Inhibit queueing\n"); + if (pool->pauserun) + fprintf(fd, " Pause workers\n"); + if (pool->shutdown) + fprintf(fd, " Shutdown in progress\n"); + + for (cur = ellFirst(&pool->jobs); cur; cur = ellNext(cur)) { + epicsJob *job = CONTAINER(cur, epicsJob, jobnode); + + fprintf(fd, " job %p func: %p, arg: %p ", + job, job->func, + job->arg); + if (job->queued) + fprintf(fd, "Queued "); + if (job->running) + fprintf(fd, "Running "); + if (job->freewhendone) + fprintf(fd, "Free "); + fprintf(fd, "\n"); + } + + epicsMutexUnlock(pool->guard); +} + +unsigned int epicsThreadPoolNThreads(epicsThreadPool *pool) +{ + unsigned int ret; + + epicsMutexMustLock(pool->guard); + ret = pool->threadsRunning; + epicsMutexUnlock(pool->guard); + + return ret; +} + +static +ELLLIST sharedPools = ELLLIST_INIT; + +static +epicsMutexId sharedPoolsGuard; + +static +epicsThreadOnceId sharedPoolsOnce = EPICS_THREAD_ONCE_INIT; + +static +void sharedPoolsInit(void* unused) +{ + sharedPoolsGuard = epicsMutexMustCreate(); +} + +epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts) +{ + ELLNODE *node; + epicsThreadPool *cur; + epicsThreadPoolConfig defopts; + size_t N = epicsThreadGetCPUs(); + + if (!opts) { + epicsThreadPoolConfigDefaults(&defopts); + opts = &defopts; + } + /* shared pools must have a minimum allowed number of workers. + * Use the number of CPU cores + */ + if (opts->maxThreads < N) + opts->maxThreads = N; + + epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL); + + epicsMutexMustLock(sharedPoolsGuard); + + for (node = ellFirst(&sharedPools); node; node = ellNext(node)) { + cur = CONTAINER(node, epicsThreadPool, sharedNode); + + /* Must have exactly the requested priority + * At least the requested max workers + * and at least the requested stack size + */ + if (cur->conf.workerPriority != opts->workerPriority) + continue; + if (cur->conf.maxThreads < opts->maxThreads) + continue; + if (cur->conf.workerStack < opts->workerStack) + continue; + + cur->sharedCount++; + assert(cur->sharedCount > 0); + epicsMutexUnlock(sharedPoolsGuard); + + epicsMutexMustLock(cur->guard); + *opts = cur->conf; + epicsMutexUnlock(cur->guard); + return cur; + } + + cur = epicsThreadPoolCreate(opts); + if (!cur) { + epicsMutexUnlock(sharedPoolsGuard); + return NULL; + } + cur->sharedCount = 1; + + ellAdd(&sharedPools, &cur->sharedNode); + epicsMutexUnlock(sharedPoolsGuard); + return cur; +} + +epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool) +{ + if (!pool) + return; + + epicsMutexMustLock(sharedPoolsGuard); + + assert(pool->sharedCount > 0); + + pool->sharedCount--; + + if (pool->sharedCount == 0) { + ellDelete(&sharedPools, &pool->sharedNode); + epicsThreadPoolDestroy(pool); + } + + epicsMutexUnlock(sharedPoolsGuard); +} diff --git a/src/libCom/test/Makefile b/src/libCom/test/Makefile index 870313e1c..fd4800953 100755 --- a/src/libCom/test/Makefile +++ b/src/libCom/test/Makefile @@ -106,6 +106,11 @@ epicsThreadHooksTest_SRCS += epicsThreadHooksTest.c testHarness_SRCS += epicsThreadHooksTest.c TESTS += epicsThreadHooksTest +TESTPROD_HOST += epicsThreadPoolTest +epicsThreadPoolTest_SRCS += epicsThreadPoolTest.c +testHarness_SRCS += epicsThreadPoolTest.c +TESTS += epicsThreadPoolTest + TESTPROD_HOST += epicsExitTest epicsExitTest_SRCS += epicsExitTest.c testHarness_SRCS += epicsExitTest.c diff --git a/src/libCom/test/epicsRunLibComTests.c b/src/libCom/test/epicsRunLibComTests.c index f77821199..f59d88b91 100644 --- a/src/libCom/test/epicsRunLibComTests.c +++ b/src/libCom/test/epicsRunLibComTests.c @@ -35,6 +35,7 @@ int epicsThreadOnceTest(void); int epicsThreadPriorityTest(void); int epicsThreadPrivateTest(void); int epicsThreadHooksTest(void); +int epicsThreadPoolTest(void); int epicsTimeTest(void); int epicsTypesTest(void); int macLibTest(void); @@ -94,6 +95,8 @@ void epicsRunLibComTests(void) runTest(epicsThreadHooksTest); + runTest(epicsThreadPoolTest); + runTest(epicsTimeTest); runTest(epicsTypesTest); diff --git a/src/libCom/test/epicsThreadPoolTest.c b/src/libCom/test/epicsThreadPoolTest.c new file mode 100644 index 000000000..f94b9eb5e --- /dev/null +++ b/src/libCom/test/epicsThreadPoolTest.c @@ -0,0 +1,447 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#include "epicsThreadPool.h" + +/* included to allow tests to peek */ +#include "../../pool/poolPriv.h" + +#include "testMain.h" +#include "epicsUnitTest.h" + +#include "cantProceed.h" +#include "epicsEvent.h" +#include "epicsMutex.h" +#include "epicsThread.h" + +/* Do nothing */ +static void nullop(void) +{ + epicsThreadPool *pool; + testDiag("nullop()"); + { + epicsThreadPoolConfig conf; + epicsThreadPoolConfigDefaults(&conf); + testOk1(conf.maxThreads>0); + + testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); + if(!pool) + return; + } + + epicsThreadPoolDestroy(pool); +} + +/* Just create and destroy worker threads */ +static void oneop(void) +{ + epicsThreadPool *pool; + testDiag("oneop()"); + { + epicsThreadPoolConfig conf; + epicsThreadPoolConfigDefaults(&conf); + conf.initialThreads=2; + testOk1(conf.maxThreads>0); + + testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); + if(!pool) + return; + } + + epicsThreadPoolDestroy(pool); +} + +/* Test that Bursts of jobs will create enough threads to + * run all in parallel + */ +typedef struct { + epicsMutexId guard; + unsigned int count; + epicsEventId allrunning; + epicsEventId done; + epicsJob **job; +} countPriv; + +static void countjob(void *param, epicsJobMode mode) +{ + countPriv *cnt=param; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + return; + + epicsMutexMustLock(cnt->guard); + testDiag("Job %lu", (unsigned long)cnt->count); + cnt->count--; + if(cnt->count==0) { + testDiag("All jobs running"); + epicsEventSignal(cnt->allrunning); + } + epicsMutexUnlock(cnt->guard); + + epicsEventMustWait(cnt->done); + epicsEventSignal(cnt->done); /* pass along to next thread */ +} + +/* Starts "mcnt" jobs in a pool with initial and max + * thread counts "icnt" and "mcnt". + * The test ensures that all jobs run in parallel. + * "cork" checks the function of pausing the run queue + * with epicsThreadPoolQueueRun + */ +static void postjobs(size_t icnt, size_t mcnt, int cork) +{ + size_t i; + epicsThreadPool *pool; + countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc"); + priv->guard=epicsMutexMustCreate(); + priv->done=epicsEventMustCreate(epicsEventEmpty); + priv->allrunning=epicsEventMustCreate(epicsEventEmpty); + priv->count=mcnt; + priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array"); + + testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt); + + { + epicsThreadPoolConfig conf; + epicsThreadPoolConfigDefaults(&conf); + conf.initialThreads=icnt; + conf.maxThreads=mcnt; + + testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); + if(!pool) + return; + } + + if(cork) + epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0); + + for(i=0; ijob[i] = epicsJobCreate(pool, &countjob, priv); + testOk1(priv->job[i]!=NULL); + testOk1(epicsJobQueue(priv->job[i])==0); + } + + if(cork) { + /* no jobs should have run */ + epicsMutexMustLock(priv->guard); + testOk1(priv->count==mcnt); + epicsMutexUnlock(priv->guard); + + epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); + } + + testDiag("Waiting for all jobs to start"); + epicsEventMustWait(priv->allrunning); + testDiag("Stop all"); + epicsEventSignal(priv->done); + + for(i=0; ijob[i]); + } + + epicsThreadPoolDestroy(pool); + epicsMutexDestroy(priv->guard); + epicsEventDestroy(priv->allrunning); + epicsEventDestroy(priv->done); + free(priv->job); + free(priv); +} + +static unsigned int flag0 = 0; + +/* Test cancel from job (no-op) + * and destroy from job (lazy free) + */ +static void cleanupjob0(void* arg, epicsJobMode mode) +{ + epicsJob *job=arg; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + return; + + assert(flag0==0); + flag0=1; + + testOk1(epicsJobQueue(job)==0); + + epicsJobDestroy(job); /* delete while job is running */ +} +static void cleanupjob1(void* arg, epicsJobMode mode) +{ + epicsJob *job=arg; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + return; + + testOk1(epicsJobQueue(job)==0); + + testOk1(epicsJobUnqueue(job)==0); + /* delete later after job finishes, but before pool is destroyed */ +} +static void cleanupjob2(void* arg, epicsJobMode mode) +{ + epicsJob *job=arg; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + epicsJobDestroy(job); /* delete when threadpool is destroyed */ + else if(mode==epicsJobModeRun) + testOk1(epicsJobUnqueue(job)==1); +} +static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2}; + +/* Tests three methods for job cleanup. + * 1. destroy which running + * 2. deferred cleanup after pool destroyed + * 3. immediate cleanup when pool destroyed + */ +static void testcleanup(void) +{ + int i=0; + epicsThreadPool *pool; + epicsJob *job[3]; + + testDiag("testcleanup()"); + + testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL); + if(!pool) + return; + + /* unrolled so that valgrind can show which methods leaks */ + testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], EPICSJOB_SELF))!=NULL); + testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], EPICSJOB_SELF))!=NULL); + testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], EPICSJOB_SELF))!=NULL); + for(i=0; i<3; i++) { + testOk1(epicsJobQueue(job[i])==0); + } + + epicsThreadPoolWait(pool, -1); + epicsJobDestroy(job[1]); + epicsThreadPoolDestroy(pool); +} + +/* Test re-add from inside job */ +typedef struct { + unsigned int count; + epicsEventId done; + epicsJob *job; + unsigned int inprogress; +} readdPriv; + +static void readdjob(void *arg, epicsJobMode mode) +{ + readdPriv *priv=arg; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + return; + testOk1(priv->inprogress==0); + testDiag("count==%u", priv->count); + + if(priv->count--) { + priv->inprogress=1; + epicsJobQueue(priv->job); + epicsThreadSleep(0.05); + priv->inprogress=0; + }else{ + epicsEventSignal(priv->done); + epicsJobDestroy(priv->job); + } +} + +/* Test re-queueing a job while it is running. + * Check that a single job won't run concurrently. + */ +static void testreadd(void) { + epicsThreadPoolConfig conf; + epicsThreadPool *pool; + readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testcleanup priv"); + readdPriv *priv2=callocMustSucceed(1, sizeof(*priv), "testcleanup priv"); + + testDiag("testreadd"); + + priv->done=epicsEventMustCreate(epicsEventEmpty); + priv->count=5; + priv2->done=epicsEventMustCreate(epicsEventEmpty); + priv2->count=5; + + epicsThreadPoolConfigDefaults(&conf); + conf.maxThreads = 2; + testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); + if(!pool) + return; + + testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL); + testOk1((priv2->job=epicsJobCreate(pool, &readdjob, priv2))!=NULL); + + testOk1(epicsJobQueue(priv->job)==0); + testOk1(epicsJobQueue(priv2->job)==0); + epicsEventMustWait(priv->done); + epicsEventMustWait(priv2->done); + + testOk1(epicsThreadPoolNThreads(pool)==2); + testDiag("epicsThreadPoolNThreads = %d", epicsThreadPoolNThreads(pool)); + + epicsThreadPoolDestroy(pool); + epicsEventDestroy(priv->done); + epicsEventDestroy(priv2->done); + free(priv); + free(priv2); + +} + +static int shouldneverrun = 0; +static int numtoolate = 0; + +/* test job canceling */ +static +void neverrun(void *arg, epicsJobMode mode) +{ + epicsJob *job=arg; + testOk1(mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + epicsJobDestroy(job); + else + shouldneverrun++; +} +static epicsEventId cancel[2]; +static +void toolate(void *arg, epicsJobMode mode) +{ + epicsJob *job=arg; + if(mode==epicsJobModeCleanup){ + epicsJobDestroy(job); + return; + } + testPass("Job runs"); + numtoolate++; + epicsEventSignal(cancel[0]); + epicsEventMustWait(cancel[1]); +} + +static +void testcancel(void) +{ + epicsJob *job[2]; + epicsThreadPool *pool; + testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL); + if(!pool) + return; + + cancel[0]=epicsEventCreate(epicsEventEmpty); + cancel[1]=epicsEventCreate(epicsEventEmpty); + + testOk1((job[0]=epicsJobCreate(pool, &neverrun, EPICSJOB_SELF))!=NULL); + testOk1((job[1]=epicsJobCreate(pool, &toolate, EPICSJOB_SELF))!=NULL); + + /* freeze */ + epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0); + + testOk1(epicsJobUnqueue(job[0])==1); /* not queued yet */ + + epicsJobQueue(job[0]); + testOk1(epicsJobUnqueue(job[0])==0); + testOk1(epicsJobUnqueue(job[0])==1); + + epicsThreadSleep(0.01); + epicsJobQueue(job[0]); + testOk1(epicsJobUnqueue(job[0])==0); + testOk1(epicsJobUnqueue(job[0])==1); + + epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); + + epicsJobQueue(job[1]); /* actually let it run this time */ + + epicsEventMustWait(cancel[0]); + testOk1(epicsJobUnqueue(job[0])==1); + epicsEventSignal(cancel[1]); + + epicsThreadPoolDestroy(pool); + epicsEventDestroy(cancel[0]); + epicsEventDestroy(cancel[1]); + + testOk1(shouldneverrun==0); + testOk1(numtoolate==1); +} + +static +unsigned int sharedWasDeleted=0; + +static +void lastjob(void *arg, epicsJobMode mode) +{ + epicsJob *job=arg; + if(mode==epicsJobModeCleanup) { + sharedWasDeleted=1; + epicsJobDestroy(job); + } +} + +static +void testshared(void) +{ + epicsThreadPool *poolA, *poolB; + epicsThreadPoolConfig conf; + epicsJob *job; + + epicsThreadPoolConfigDefaults(&conf); + + testDiag("Check reference counting of shared pools"); + + testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL); + + testOk1(poolA->sharedCount==1); + + testOk1((poolB=epicsThreadPoolGetShared(&conf))!=NULL); + + testOk1(poolA==poolB); + + testOk1(poolA->sharedCount==2); + + epicsThreadPoolReleaseShared(poolA); + + testOk1(poolB->sharedCount==1); + + testOk1((job=epicsJobCreate(poolB, &lastjob, EPICSJOB_SELF))!=NULL); + + epicsThreadPoolReleaseShared(poolB); + + testOk1(sharedWasDeleted==1); + + testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL); + + testOk1(poolA->sharedCount==1); + + epicsThreadPoolReleaseShared(poolA); + +} + +MAIN(epicsThreadPoolTest) +{ + testPlan(171); + + nullop(); + oneop(); + testDiag("Queue with delayed start"); + postjobs(1,1,1); + postjobs(0,1,1); + postjobs(4,4,1); + postjobs(0,4,1); + postjobs(2,4,1); + testDiag("Queue with immediate start"); + postjobs(1,1,0); + postjobs(0,1,0); + postjobs(4,4,0); + postjobs(0,4,0); + postjobs(2,4,0); + testcleanup(); + testreadd(); + testcancel(); + testshared(); + + return testDone(); +}