From a8fd14aae174583a9733d71038484d5fbf6dc801 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 24 Jul 2014 14:22:52 -0400 Subject: [PATCH 01/13] Thread pool API --- src/libCom/pool/epicsThreadPool.h | 148 ++++++++++++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 src/libCom/pool/epicsThreadPool.h diff --git a/src/libCom/pool/epicsThreadPool.h b/src/libCom/pool/epicsThreadPool.h new file mode 100644 index 000000000..85279fc3d --- /dev/null +++ b/src/libCom/pool/epicsThreadPool.h @@ -0,0 +1,148 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* General purpose worker thread pool manager + * mdavidsaver@bnl.gov + */ +#ifndef EPICSTHREADPOOL_H +#define EPICSTHREADPOOL_H + +#include +#include + +#include "shareLib.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + size_t initialThreads; + size_t maxThreads; + unsigned int workerStack; + unsigned int workerPriority; +} epicsThreadPoolConfig; + +typedef struct epicsThreadPool epicsThreadPool; + +/* Job function call modes */ +typedef enum { + /* Normal run of job */ + epicsJobModeRun, + /* Thread pool is being destroyed. + * A chance to cleanup the job immediately with epicsJobDestroy(). + * If ignored, the job is orphaned (dissociated from the thread pool) + * and epicsJobDestroy() must be called later. + */ + epicsJobModeCleanup +} epicsJobMode; + +typedef void (*epicsJobFunction)(void* arg, epicsJobMode mode); + +typedef struct epicsJob epicsJob; + +/* Pool operations */ + +/* Initialize a pool config with default values. + * This much be done to preserve future compatibility + * when new options are added. + */ +epicsShareFunc void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *); + +/* fetch or create a thread pool which can be shared with other users. + * may return NULL for allocation failures + */ +epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts); +epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool); + +/* If opts is NULL then defaults are used. + * The opts pointer is not stored by this call, and may exist on the stack. + */ +epicsShareFunc epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts); + +/* Blocks until all worker threads have stopped. + * Any jobs still attached to this pool receive a callback with EPICSJOB_CLEANUP + * and are then orphaned. + */ +epicsShareFunc void epicsThreadPoolDestroy(epicsThreadPool *); + +/* pool control options */ +typedef enum { + epicsThreadPoolQueueAdd, /* val==0 causes epicsJobQueue to fail, 1 is default */ + epicsThreadPoolQueueRun /* val==0 prevents workers from running jobs, 1 is default */ +} epicsThreadPoolOption; +epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool, + epicsThreadPoolOption opt, + unsigned int val); + +/* Block until job queue is emptied and no jobs are running. + * Useful after calling epicsThreadPoolControl() with option epicsThreadPoolQueueAdd=0 + * + * timeout<0 waits forever, timeout==0 polls, timeout>0 waits for a fixed time + * Returns 1 for timeout, 0 for success, >1 on errors + */ +epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout); + + +/* Per job operations */ + +/* special flag for epicsJobCreate(). + * When passed as the third argument "user" + * the argument passed to the job callback + * will be the epicsJob* + */ +#define EPICSJOB_SELF _epicsJobArgSelf +epicsShareExtern void* _epicsJobArgSelf; + +/* creates, but does not add, a new job. + * If pool in NULL then the job is not associated with any pool and + * epicsJobMove() must be called before epicsJobQueue() + * returns a new job pointer, or NULL on error + */ +epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool* pool, + epicsJobFunction cb, + void* user); + +/* Cancel and free a job structure. Does not block. + * job may not be immediately free'd. + * Safe to call from a running job function. + */ +epicsShareFunc void epicsJobDestroy(epicsJob*); + +/* Move the job to a different pool. + * If pool is NULL then the job will no longer be associated + * with any pool. + * Not thread safe. Job must not be running or queued. + * returns 0 on error, and non-zero on error + */ +epicsShareFunc int epicsJobMove(epicsJob* job, epicsThreadPool* pool); + +/* Adds the job to the run queue + * Safe to call from a running job function. + * returns 0 for success, non-zero on error. + */ +epicsShareFunc int epicsJobQueue(epicsJob*); + +/* Remove a job from the run queue if it is queued. + * Safe to call from a running job function. + * returns 0 if job already ran, is running, or was not queued before, + * 1 if job was queued and now is not. + */ +epicsShareFunc int epicsJobUnqueue(epicsJob*); + + +/* Mostly useful for debugging */ + +epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd); + +/* Current number of active workers. May be less than the maximum */ +epicsShareFunc size_t epicsThreadPoolNThreads(epicsThreadPool *); + +#ifdef __cplusplus +} +#endif + +#endif // EPICSTHREADPOOL_H From 658bd0b570f9bc911ca6fa193f4fb19e7ccb9d2c Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 24 Jul 2014 14:22:52 -0400 Subject: [PATCH 02/13] thread pool implementation --- src/libCom/Makefile | 1 + src/libCom/pool/Makefile | 16 ++ src/libCom/pool/poolJob.c | 314 ++++++++++++++++++++++++++++ src/libCom/pool/poolPriv.h | 97 +++++++++ src/libCom/pool/threadPool.c | 394 +++++++++++++++++++++++++++++++++++ 5 files changed, 822 insertions(+) create mode 100644 src/libCom/pool/Makefile create mode 100644 src/libCom/pool/poolJob.c create mode 100644 src/libCom/pool/poolPriv.h create mode 100644 src/libCom/pool/threadPool.c diff --git a/src/libCom/Makefile b/src/libCom/Makefile index c8f41b330..767180316 100644 --- a/src/libCom/Makefile +++ b/src/libCom/Makefile @@ -31,6 +31,7 @@ include $(LIBCOM)/log/Makefile include $(LIBCOM)/macLib/Makefile include $(LIBCOM)/misc/Makefile include $(LIBCOM)/osi/Makefile +include $(LIBCOM)/pool/Makefile include $(LIBCOM)/ring/Makefile include $(LIBCOM)/taskwd/Makefile include $(LIBCOM)/timer/Makefile diff --git a/src/libCom/pool/Makefile b/src/libCom/pool/Makefile new file mode 100644 index 000000000..efaf66729 --- /dev/null +++ b/src/libCom/pool/Makefile @@ -0,0 +1,16 @@ +#************************************************************************* +# Copyright (c) 2014 UChicago Argonne LLC, as Operator of Argonne +# National Laboratory. +# EPICS BASE is distributed subject to a Software License Agreement found +# in file LICENSE that is included with this distribution. +#************************************************************************* + +# This is a Makefile fragment, see src/libCom/Makefile. + +SRC_DIRS += $(LIBCOM)/pool + +INC += epicsThreadPool.h + +Com_SRCS += poolJob.c +Com_SRCS += threadPool.c + diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c new file mode 100644 index 000000000..00cc8dfaf --- /dev/null +++ b/src/libCom/pool/poolJob.c @@ -0,0 +1,314 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#include +#include +#include + +#define epicsExportSharedSymbols + +#include "dbDefs.h" +#include "errlog.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsMutex.h" +#include "epicsEvent.h" +#include "epicsInterrupt.h" +#include "errCommon.h" + +#include "epicsThreadPool.h" +#include "poolPriv.h" + +void* _epicsJobArgSelf = &_epicsJobArgSelf; + +static +void workerMain(void* arg) +{ + epicsThreadPool *pool=arg; + + /* workers are created with counts + * in the running, sleeping, and (possibly) waking counters + */ + + epicsMutexMustLock(pool->guard); + pool->threadsAreAwake++; + pool->threadsSleeping--; + + while(1) + { + ELLNODE *cur; + epicsJob *job; + + pool->threadsAreAwake--; + pool->threadsSleeping++; + epicsMutexUnlock(pool->guard); + + epicsEventMustWait(pool->workerWakeup); + + epicsMutexMustLock(pool->guard); + pool->threadsSleeping--; + pool->threadsAreAwake++; + + if(pool->threadsWaking==0) + continue; + + pool->threadsWaking--; + + CHECKCOUNT(pool); + + if(pool->shutdown) + break; + + if(pool->pauserun) + continue; + + /* more threads to wakeup */ + if(pool->threadsWaking) + { + epicsEventSignal(pool->workerWakeup); + } + + while ((cur=ellGet(&pool->jobs)) !=NULL) + { + job=CONTAINER(cur, epicsJob, jobnode); + + assert(job->queued && !job->running); + + job->queued=0; + job->running=1; + + epicsMutexUnlock(pool->guard); + (*job->func)(job->arg, epicsJobModeRun); + epicsMutexMustLock(pool->guard); + + if(job->freewhendone) { + job->dead=1; + free(job); + } else { + job->running=0; + /* job may be re-queued from within callback */ + if(job->queued) + ellAdd(&pool->jobs, &job->jobnode); + else + ellAdd(&pool->owned, &job->jobnode); + } + + } + + if(pool->observerCount) + epicsEventSignal(pool->observerWakeup); + } + + pool->threadsAreAwake--; + pool->threadsRunning--; + + { + size_t nrun = pool->threadsRunning, + ocnt = pool->observerCount; + epicsMutexUnlock(pool->guard); + + if(ocnt) + epicsEventSignal(pool->observerWakeup); + + if(nrun) + epicsEventSignal(pool->workerWakeup); /* pass along */ + else + epicsEventSignal(pool->shutdownEvent); + } + + return; +} + +void createPoolThread(epicsThreadPool *pool) +{ + epicsThreadId tid; + + tid = epicsThreadCreate("PoolWorker", + pool->conf.workerPriority, + pool->conf.workerStack, + &workerMain, + pool); + if(!tid) + return; + + pool->threadsRunning++; + pool->threadsSleeping++; +} + +epicsJob* epicsJobCreate(epicsThreadPool* pool, + epicsJobFunction func, + void* arg) +{ + epicsJob *job=calloc(1, sizeof(*job)); + + if(!job) + return NULL; + + if(arg==&_epicsJobArgSelf) + arg=job; + + job->pool=pool; + job->func=func; + job->arg=arg; + + if(pool) { + epicsMutexMustLock(pool->guard); + + ellAdd(&pool->owned, &job->jobnode); + + epicsMutexUnlock(pool->guard); + } + + return job; +} + +void epicsJobDestroy(epicsJob* job) +{ + epicsThreadPool *pool; + if(!job || !job->pool){ + free(job); + return; + } + pool=job->pool; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + epicsJobUnqueue(job); + + if(job->running || job->freewhendone) { + job->freewhendone=1; + } else { + ellDelete(&pool->owned, &job->jobnode); + job->dead=1; + free(job); + } + + epicsMutexUnlock(pool->guard); +} + +int epicsJobMove(epicsJob* job, epicsThreadPool* newpool) +{ + epicsThreadPool *pool=job->pool; + + /* remove from current pool */ + if(pool) { + epicsMutexMustLock(pool->guard); + + if(job->queued || job->running) { + epicsMutexUnlock(pool->guard); + return EINVAL; + } + + ellDelete(&pool->owned, &job->jobnode); + + epicsMutexUnlock(pool->guard); + } + + pool = job->pool = newpool; + + /* add to new pool */ + if(pool) { + epicsMutexMustLock(pool->guard); + + ellAdd(&pool->owned, &job->jobnode); + + epicsMutexUnlock(pool->guard); + } + + return 0; +} + +int epicsJobQueue(epicsJob* job) +{ + int ret=0; + epicsThreadPool *pool=job->pool; + if(!pool) + return EINVAL; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + if(pool->pauseadd || job->freewhendone) { + ret=EINVAL; + goto done; + } else if(job->queued) { + goto done; + } + + job->queued=1; + /* Job may be queued from within a callback */ + if(!job->running) { + ellDelete(&pool->owned, &job->jobnode); + ellAdd(&pool->jobs, &job->jobnode); + } else { + /* some worker will find it again before sleeping */ + goto done; + } + + /* Since we hold the lock, we can be certain that all awake worker are + * executing work functions. The current thread may be a worker. + * We prefer to wakeup a new worker rather then wait for a busy worker to + * finish. However, after we initiate a wakeup there will be a race + * between the worker waking up, and a busy worker finishing. + * Thus we can't avoid spurious wakeups. + */ + + if(pool->threadsRunning >= pool->conf.maxThreads) { + /* all workers created... */ + /* ... but some are sleeping, so wake one up */ + if(pool->threadsWaking < pool->threadsSleeping) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + } + /*else one of the running workers will find this job before sleeping */ + CHECKCOUNT(pool); + + } else { + /* could create more workers so + * will either create a new worker, or wakeup an existing worker + */ + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + if(pool->threadsWaking > pool->threadsSleeping) + createPoolThread(pool); + CHECKCOUNT(pool); + } + +done: + epicsMutexUnlock(pool->guard); + return ret; +} + +int epicsJobUnqueue(epicsJob* job) +{ + int ret=0; + epicsThreadPool *pool=job->pool; + + if(!pool) + return 0; + + epicsMutexMustLock(pool->guard); + + assert(!job->dead); + + if(job->queued) { + if(!job->running) { + ellDelete(&pool->jobs, &job->jobnode); + ellAdd(&pool->owned, &job->jobnode); + } + job->queued=0; + ret=1; + } + + epicsMutexUnlock(pool->guard); + + return ret; +} + diff --git a/src/libCom/pool/poolPriv.h b/src/libCom/pool/poolPriv.h new file mode 100644 index 000000000..789ea57a6 --- /dev/null +++ b/src/libCom/pool/poolPriv.h @@ -0,0 +1,97 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#ifndef POOLPRIV_H +#define POOLPRIV_H + +#include "epicsThreadPool.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsEvent.h" +#include "epicsMutex.h" + +struct epicsThreadPool { + ELLNODE sharedNode; + size_t sharedCount; + + ELLLIST jobs; /* run queue */ + ELLLIST owned; /* unqueued jobs. */ + + /* Worker state counters. + * The life cycle of a worker is + * Wakeup -> Awake -> Sleeping + * Newly created workers go into the wakeup state + */ + + /* # of running workers which are not waiting for a wakeup event */ + size_t threadsAreAwake; + /* # of sleeping workers which need to be awakened */ + size_t threadsWaking; + /* # of workers waiting on the workerWakeup event */ + size_t threadsSleeping; + /* # of threads started and not stopped */ + size_t threadsRunning; + + /* # of observers waiting on pool events */ + size_t observerCount; + + epicsEventId workerWakeup; + epicsEventId shutdownEvent; + + epicsEventId observerWakeup; + + /* Disallow epicsJobQueue */ + unsigned int pauseadd:1; + /* Prevent workers from running new jobs */ + unsigned int pauserun:1; + /* Prevent further changes to pool options */ + unsigned int freezeopt:1; + /* tell workers to exit */ + unsigned int shutdown:1; + + epicsMutexId guard; + + /* copy of config passed when created */ + epicsThreadPoolConfig conf; +}; + +/* Called after manipulating counters to check that invariants are preserved */ +#define CHECKCOUNT(pPool) do{if(!(pPool)->shutdown) { \ + assert( (pPool)->threadsAreAwake + (pPool)->threadsSleeping == (pPool)->threadsRunning ); \ + assert( (pPool)->threadsWaking <= (pPool)->threadsSleeping ); \ +}}while(0) + +/* When created a job is idle. queued and running are false + * and jobnode is in the thread pool's owned list. + * + * When the job is added, the queued flag is set and jobnode + * is in the jobs list. + * + * When the job starts running the queued flag is cleared and + * the running flag is set. jobnode is not in any list + * (held locally by worker). + * + * When the job has finished running, the running flag is cleared. + * The queued flag may be set if the job re-added itself. + * Based on the queued flag jobnode is added to the appropriate + * list. + */ +struct epicsJob { + ELLNODE jobnode; + epicsJobFunction func; + void *arg; + epicsThreadPool *pool; + + unsigned int queued:1; + unsigned int running:1; + unsigned int freewhendone:1; /* lazy delete of running job */ + unsigned int dead:1; /* flag to catch use of freed objects */ +}; + +void createPoolThread(epicsThreadPool *pool); + +#endif // POOLPRIV_H diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c new file mode 100644 index 000000000..7cb3bb39d --- /dev/null +++ b/src/libCom/pool/threadPool.c @@ -0,0 +1,394 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#include +#include +#include + +#define epicsExportSharedSymbols + +#include "dbDefs.h" +#include "errlog.h" +#include "ellLib.h" +#include "epicsThread.h" +#include "epicsMutex.h" +#include "epicsEvent.h" +#include "epicsInterrupt.h" +#include "errCommon.h" +#include "cantProceed.h" + +#include "epicsThreadPool.h" +#include "poolPriv.h" + + +void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts) +{ + memset(opts, 0, sizeof(*opts)); + opts->maxThreads=epicsThreadGetCPUs(); + opts->workerStack=epicsThreadGetStackSize(epicsThreadStackSmall); + + if(epicsThreadLowestPriorityLevelAbove(epicsThreadPriorityCAServerHigh, &opts->workerPriority) + !=epicsThreadBooleanStatusSuccess) + opts->workerPriority = epicsThreadPriorityMedium; +} + +epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts) +{ + size_t i; + epicsThreadPool *pool; + + /* caller likely didn't initialize the options structure */ + if(opts && opts->maxThreads==0) { + errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized"); + return NULL; + } + + pool=calloc(1, sizeof(*pool)); + if(!pool) + return NULL; + + if(opts) + memcpy(&pool->conf, opts, sizeof(*opts)); + else + epicsThreadPoolConfigDefaults(&pool->conf); + + if(pool->conf.initialThreads > pool->conf.maxThreads) + pool->conf.initialThreads = pool->conf.maxThreads; + + pool->workerWakeup=epicsEventCreate(epicsEventEmpty); + pool->shutdownEvent=epicsEventCreate(epicsEventEmpty); + pool->observerWakeup=epicsEventCreate(epicsEventEmpty); + pool->guard=epicsMutexCreate(); + + if(!pool->workerWakeup || !pool->shutdownEvent || + !pool->observerWakeup || !pool->guard) + goto cleanup; + + ellInit(&pool->jobs); + ellInit(&pool->owned); + + epicsMutexMustLock(pool->guard); + + for(i=0; iconf.initialThreads; i++) { + createPoolThread(pool); + } + + epicsMutexUnlock(pool->guard); + + if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) { + errlogPrintf("Error: Unable to create any threads for thread pool\n"); + goto cleanup; + + }else if(pool->threadsRunning < pool->conf.initialThreads) { + errlogPrintf("Warning: Unable to create all threads for thread pool (%lu/%lu)\n", + (unsigned long)pool->threadsRunning, + (unsigned long)pool->conf.initialThreads); + } + + return pool; + +cleanup: + if(pool->workerWakeup) epicsEventDestroy(pool->workerWakeup); + if(pool->shutdownEvent) epicsEventDestroy(pool->shutdownEvent); + if(pool->observerWakeup) epicsEventDestroy(pool->observerWakeup); + if(pool->guard) epicsMutexDestroy(pool->guard); + + free(pool); + return 0; +} + +static +void _epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val) +{ + if(pool->freezeopt) + return; + + if(opt==epicsThreadPoolQueueAdd) { + pool->pauseadd = !val; + + } else if(opt==epicsThreadPoolQueueRun) { + if(!val && !pool->pauserun) + pool->pauserun=1; + + else if(val && pool->pauserun) { + size_t jobs=(size_t)ellCount(&pool->jobs); + pool->pauserun=0; + + if(jobs) { + size_t wakeable=pool->threadsSleeping - pool->threadsWaking; + /* first try to give jobs to sleeping workers */ + if(wakeable) { + int wakeup = jobs > wakeable ? wakeable : jobs; + assert(wakeup>0); + jobs-=wakeup; + pool->threadsWaking+=wakeup; + epicsEventSignal(pool->workerWakeup); + CHECKCOUNT(pool); + } + } + while(jobs-- && pool->threadsRunning < pool->conf.maxThreads) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + createPoolThread(pool); + } + CHECKCOUNT(pool); + } + } + /* unknown options ignored */ + +} + +void epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val) +{ + epicsMutexMustLock(pool->guard); + _epicsThreadPoolControl(pool, opt, val); + epicsMutexUnlock(pool->guard); +} + +int epicsThreadPoolWait(epicsThreadPool* pool, double timeout) +{ + int ret=0; + epicsMutexMustLock(pool->guard); + + while(ellCount(&pool->jobs)>0 || pool->threadsAreAwake>0) { + pool->observerCount++; + epicsMutexUnlock(pool->guard); + + if(timeout<0.0) { + epicsEventMustWait(pool->observerWakeup); + + } else { + switch(epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) { + case epicsEventWaitError: + cantProceed("epicsThreadPoolWait: failed to wait for Event"); + break; + case epicsEventWaitTimeout: + ret=EWOULDBLOCK; + break; + case epicsEventWaitOK: + ret=0; + break; + } + + } + + epicsMutexMustLock(pool->guard); + pool->observerCount--; + + if(pool->observerCount) + epicsEventSignal(pool->observerWakeup); + + if(ret!=0) + break; + } + + epicsMutexUnlock(pool->guard); + return ret; +} + +void epicsThreadPoolDestroy(epicsThreadPool *pool) +{ + size_t nThr; + ELLLIST notify; + ELLNODE *cur; + + if(!pool) + return; + + ellInit(¬ify); + + epicsMutexMustLock(pool->guard); + + /* run remaining queued jobs */ + _epicsThreadPoolControl(pool, epicsThreadPoolQueueAdd, 0); + _epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); + nThr=pool->threadsRunning; + pool->freezeopt = 1; + + epicsMutexUnlock(pool->guard); + + epicsThreadPoolWait(pool, -1.0); + /* At this point all queued jobs have run */ + + epicsMutexMustLock(pool->guard); + + pool->shutdown=1; + /* wakeup all */ + if(pool->threadsWaking < pool->threadsSleeping) { + pool->threadsWaking = pool->threadsSleeping; + epicsEventSignal(pool->workerWakeup); + } + + ellConcat(¬ify, &pool->owned); + ellConcat(¬ify, &pool->jobs); + + epicsMutexUnlock(pool->guard); + + if(nThr && epicsEventWait(pool->shutdownEvent)!=epicsEventWaitOK){ + errlogMessage("epicsThreadPoolDestroy: wait error"); + return; + } + + /* all workers are now shutdown */ + + /* notify remaining jobs that pool is being destroyed */ + while( (cur=ellGet(¬ify))!=NULL ) + { + epicsJob *job=CONTAINER(cur, epicsJob, jobnode); + job->running=1; + (*job->func)(job->arg, epicsJobModeCleanup); + job->running=0; + if(job->freewhendone) + free(job); + else + job->pool=NULL; /* orphan */ + } + + epicsEventDestroy(pool->workerWakeup); + epicsEventDestroy(pool->shutdownEvent); + epicsEventDestroy(pool->observerWakeup); + epicsMutexDestroy(pool->guard); + + free(pool); +} + + +void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd) +{ + ELLNODE *cur; + epicsMutexMustLock(pool->guard); + + fprintf(fd, "Thread Pool with %lu/%lu threads\n" + " running %d jobs with %lu threads\n", + (unsigned long)pool->threadsRunning, + (unsigned long)pool->conf.maxThreads, + ellCount(&pool->jobs), + (unsigned long)pool->threadsAreAwake); + if(pool->pauseadd) + fprintf(fd, " Inhibit queueing\n"); + if(pool->pauserun) + fprintf(fd, " Pause workers\n"); + if(pool->shutdown) + fprintf(fd, " Shutdown in progress\n"); + + for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur)) + { + epicsJob *job=CONTAINER(cur, epicsJob, jobnode); + fprintf(fd, " job 0x%lu func: 0x%lu, arg: 0x%lu ", + (unsigned long)job, (unsigned long)job->func, + (unsigned long)job->arg); + if(job->queued) + fprintf(fd, "Queued "); + if(job->running) + fprintf(fd, "Running "); + if(job->freewhendone) + fprintf(fd, "Free "); + fprintf(fd, "\n"); + } + + epicsMutexUnlock(pool->guard); +} + +size_t epicsThreadPoolNThreads(epicsThreadPool *pool) +{ + size_t ret; + + epicsMutexMustLock(pool->guard); + ret=pool->threadsRunning; + epicsMutexUnlock(pool->guard); + + return ret; +} + +static +ELLLIST sharedPools = ELLLIST_INIT; + +static +epicsMutexId sharedPoolsGuard; + +static +epicsThreadOnceId sharedPoolsOnce = EPICS_THREAD_ONCE_INIT; + +static +void sharedPoolsInit(void* unused) +{ + sharedPoolsGuard = epicsMutexMustCreate(); +} + +epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts) +{ + ELLNODE *node; + epicsThreadPool *cur; + size_t N=epicsThreadGetCPUs(); + + if(!opts) + return NULL; + /* shared pools must have a minimum allowed number of workers. + * Use the number of CPU cores + */ + if(opts->maxThreadsmaxThreads=N; + + epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL); + + epicsMutexMustLock(sharedPoolsGuard); + + for(node=ellFirst(&sharedPools); node; node=ellNext(node)) + { + cur=CONTAINER(node, epicsThreadPool, sharedNode); + + /* Must have exactly the requested priority + * At least the requested max workers + * and at least the requested stack size + */ + if(cur->conf.workerPriority != opts->workerPriority) + continue; + if(cur->conf.maxThreads < opts->maxThreads) + continue; + if(cur->conf.workerStack < opts->workerStack) + continue; + + cur->sharedCount++; + assert(cur->sharedCount>0); + epicsMutexUnlock(sharedPoolsGuard); + + epicsMutexMustLock(cur->guard); + *opts = cur->conf; + epicsMutexUnlock(cur->guard); + return cur; + } + + cur=epicsThreadPoolCreate(opts); + if(!cur) { + epicsMutexUnlock(sharedPoolsGuard); + return NULL; + } + cur->sharedCount=1; + + ellAdd(&sharedPools, &cur->sharedNode); + epicsMutexUnlock(sharedPoolsGuard); + return cur; +} + +epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool) +{ + if(!pool) + return; + + epicsMutexMustLock(sharedPoolsGuard); + + assert(pool->sharedCount>0); + + pool->sharedCount--; + + if(pool->sharedCount==0) { + ellDelete(&sharedPools, &pool->sharedNode); + epicsThreadPoolDestroy(pool); + } + + epicsMutexUnlock(sharedPoolsGuard); +} From 01a50b5165ba2d211b73ee6a78628457e8ccaeb7 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 24 Jul 2014 14:22:52 -0400 Subject: [PATCH 03/13] thread pool test --- src/libCom/test/Makefile | 4 + src/libCom/test/epicsThreadPoolTest.c | 443 ++++++++++++++++++++++++++ 2 files changed, 447 insertions(+) create mode 100644 src/libCom/test/epicsThreadPoolTest.c diff --git a/src/libCom/test/Makefile b/src/libCom/test/Makefile index 870313e1c..bcc72c71e 100755 --- a/src/libCom/test/Makefile +++ b/src/libCom/test/Makefile @@ -12,6 +12,10 @@ include $(TOP)/configure/CONFIG PROD_LIBS += Com +TESTPROD_HOST += epicsThreadPoolTest +epicsThreadPoolTest_SRCS += epicsThreadPoolTest.c +TESTS += epicsThreadPoolTest + TESTPROD_HOST += epicsUnitTestTest epicsUnitTestTest_SRCS += epicsUnitTestTest.c # Not much point running this on vxWorks or RTEMS... diff --git a/src/libCom/test/epicsThreadPoolTest.c b/src/libCom/test/epicsThreadPoolTest.c new file mode 100644 index 000000000..660bdea87 --- /dev/null +++ b/src/libCom/test/epicsThreadPoolTest.c @@ -0,0 +1,443 @@ +/*************************************************************************\ +* Copyright (c) 2014 Brookhaven Science Associates, as Operator of +* Brookhaven National Laboratory. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +#include "epicsThreadPool.h" + +/* included to allow tests to peek */ +#include "../../pool/poolPriv.h" + +#include "testMain.h" +#include "epicsUnitTest.h" + +#include "cantProceed.h" +#include "epicsEvent.h" +#include "epicsMutex.h" +#include "epicsThread.h" + +/* Do nothing */ +static void nullop(void) +{ + epicsThreadPool *pool; + testDiag("nullop()"); + { + epicsThreadPoolConfig conf; + epicsThreadPoolConfigDefaults(&conf); + testOk1(conf.maxThreads>0); + + testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); + if(!pool) + return; + } + + epicsThreadPoolDestroy(pool); +} + +/* Just create and destroy worker threads */ +static void oneop(void) +{ + epicsThreadPool *pool; + testDiag("oneop()"); + { + epicsThreadPoolConfig conf; + epicsThreadPoolConfigDefaults(&conf); + conf.initialThreads=2; + testOk1(conf.maxThreads>0); + + testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); + if(!pool) + return; + } + + epicsThreadPoolDestroy(pool); +} + +/* Test that Bursts of jobs will create enough threads to + * run all in parallel + */ +typedef struct { + epicsMutexId guard; + unsigned int count; + epicsEventId allrunning; + epicsEventId done; + epicsJob **job; +} countPriv; + +static void countjob(void *param, epicsJobMode mode) +{ + countPriv *cnt=param; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + return; + + epicsMutexMustLock(cnt->guard); + testDiag("Job %lu", (unsigned long)cnt->count); + cnt->count--; + if(cnt->count==0) { + testDiag("All jobs running"); + epicsEventSignal(cnt->allrunning); + } + epicsMutexUnlock(cnt->guard); + + epicsEventMustWait(cnt->done); + epicsEventSignal(cnt->done); /* pass along to next thread */ +} + +/* Starts "mcnt" jobs in a pool with initial and max + * thread counts "icnt" and "mcnt". + * The test ensures that all jobs run in parallel. + * "cork" checks the function of pausing the run queue + * with epicsThreadPoolQueueRun + */ +static void postjobs(size_t icnt, size_t mcnt, int cork) +{ + size_t i; + epicsThreadPool *pool; + countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc"); + priv->guard=epicsMutexMustCreate(); + priv->done=epicsEventMustCreate(epicsEventEmpty); + priv->allrunning=epicsEventMustCreate(epicsEventEmpty); + priv->count=mcnt; + priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array"); + + testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt); + + { + epicsThreadPoolConfig conf; + epicsThreadPoolConfigDefaults(&conf); + conf.initialThreads=icnt; + conf.maxThreads=mcnt; + + testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL); + if(!pool) + return; + } + + if(cork) + epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0); + + for(i=0; ijob[i] = epicsJobCreate(pool, &countjob, priv); + testOk1(priv->job[i]!=NULL); + testOk1(epicsJobQueue(priv->job[i])==0); + } + + if(cork) { + /* no jobs should have run */ + epicsMutexMustLock(priv->guard); + testOk1(priv->count==mcnt); + epicsMutexUnlock(priv->guard); + + epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); + } + + testDiag("Waiting for all jobs to start"); + epicsEventMustWait(priv->allrunning); + testDiag("Stop all"); + epicsEventSignal(priv->done); + + for(i=0; ijob[i]); + } + + epicsThreadPoolDestroy(pool); + epicsMutexDestroy(priv->guard); + epicsEventDestroy(priv->allrunning); + epicsEventDestroy(priv->done); + free(priv->job); + free(priv); +} + +static unsigned int flag0 = 0; + +/* Test cancel from job (no-op) + * and destroy from job (lazy free) + */ +static void cleanupjob0(void* arg, epicsJobMode mode) +{ + epicsJob *job=arg; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + return; + + assert(flag0==0); + flag0=1; + + testOk1(epicsJobQueue(job)==0); + + epicsJobDestroy(job); /* delete while job is running */ +} +static void cleanupjob1(void* arg, epicsJobMode mode) +{ + epicsJob *job=arg; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + return; + + testOk1(epicsJobQueue(job)==0); + + testOk1(epicsJobUnqueue(job)==1); + /* delete later after job finishes, but before pool is destroyed */ +} +static void cleanupjob2(void* arg, epicsJobMode mode) +{ + epicsJob *job=arg; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + epicsJobDestroy(job); /* delete when threadpool is destroyed */ + else if(mode==epicsJobModeRun) + testOk1(epicsJobUnqueue(job)==0); +} +static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2}; + +/* Tests three methods for job cleanup. + * 1. destroy which running + * 2. deferred cleanup after pool destroyed + * 3. immediate cleanup when pool destroyed + */ +static void testcleanup(void) +{ + int i=0; + epicsThreadPool *pool; + epicsJob *job[3]; + + testDiag("testcleanup()"); + + testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL); + if(!pool) + return; + + /* unrolled so that valgrind can show which methods leaks */ + testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], EPICSJOB_SELF))!=NULL); + testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], EPICSJOB_SELF))!=NULL); + testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], EPICSJOB_SELF))!=NULL); + for(i=0; i<3; i++) { + testOk1(epicsJobQueue(job[i])==0); + } + + epicsThreadPoolWait(pool, -1); + epicsJobDestroy(job[1]); + epicsThreadPoolDestroy(pool); +} + +/* Test re-add from inside job */ +typedef struct { + unsigned int count; + epicsEventId done; + epicsJob *job; + unsigned int inprogress; +} readdPriv; + +static void readdjob(void *arg, epicsJobMode mode) +{ + readdPriv *priv=arg; + testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + return; + testOk1(priv->inprogress==0); + testDiag("count==%u", priv->count); + + if(priv->count--) { + priv->inprogress=1; + epicsJobQueue(priv->job); + epicsThreadSleep(0.05); + priv->inprogress=0; + }else{ + epicsEventSignal(priv->done); + epicsJobDestroy(priv->job); + } +} + +/* Test re-queueing a job while it is running. + * Check that a single job won't run concurrently. + */ +static void testreadd(void) { + epicsThreadPool *pool; + readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testcleanup priv"); + readdPriv *priv2=callocMustSucceed(1, sizeof(*priv), "testcleanup priv"); + + testDiag("testreadd"); + + priv->done=epicsEventMustCreate(epicsEventEmpty); + priv->count=5; + priv2->done=epicsEventMustCreate(epicsEventEmpty); + priv2->count=5; + + testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL); + if(!pool) + return; + + testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL); + testOk1((priv2->job=epicsJobCreate(pool, &readdjob, priv2))!=NULL); + + testOk1(epicsJobQueue(priv->job)==0); + testOk1(epicsJobQueue(priv2->job)==0); + epicsEventMustWait(priv->done); + epicsEventMustWait(priv2->done); + + testOk1(epicsThreadPoolNThreads(pool)==2); + + epicsThreadPoolDestroy(pool); + epicsEventDestroy(priv->done); + epicsEventDestroy(priv2->done); + free(priv); + free(priv2); + +} + +static int shouldneverrun = 0; +static int numtoolate = 0; + +/* test job canceling */ +static +void neverrun(void *arg, epicsJobMode mode) +{ + epicsJob *job=arg; + testOk1(mode==epicsJobModeCleanup); + if(mode==epicsJobModeCleanup) + epicsJobDestroy(job); + else + shouldneverrun++; +} +static epicsEventId cancel[2]; +static +void toolate(void *arg, epicsJobMode mode) +{ + epicsJob *job=arg; + if(mode==epicsJobModeCleanup){ + epicsJobDestroy(job); + return; + } + testPass("Job runs"); + numtoolate++; + epicsEventSignal(cancel[0]); + epicsEventMustWait(cancel[1]); +} + +static +void testcancel(void) +{ + epicsJob *job[2]; + epicsThreadPool *pool; + testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL); + if(!pool) + return; + + cancel[0]=epicsEventCreate(epicsEventEmpty); + cancel[1]=epicsEventCreate(epicsEventEmpty); + + testOk1((job[0]=epicsJobCreate(pool, &neverrun, EPICSJOB_SELF))!=NULL); + testOk1((job[1]=epicsJobCreate(pool, &toolate, EPICSJOB_SELF))!=NULL); + + /* freeze */ + epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0); + + testOk1(epicsJobUnqueue(job[0])==0); /* not queued yet */ + + epicsJobQueue(job[0]); + testOk1(epicsJobUnqueue(job[0])==1); + testOk1(epicsJobUnqueue(job[0])==0); + + epicsThreadSleep(0.01); + epicsJobQueue(job[0]); + testOk1(epicsJobUnqueue(job[0])==1); + testOk1(epicsJobUnqueue(job[0])==0); + + epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); + + epicsJobQueue(job[1]); /* actually let it run this time */ + + epicsEventMustWait(cancel[0]); + testOk1(epicsJobUnqueue(job[0])==0); + epicsEventSignal(cancel[1]); + + epicsThreadPoolDestroy(pool); + epicsEventDestroy(cancel[0]); + epicsEventDestroy(cancel[1]); + + testOk1(shouldneverrun==0); + testOk1(numtoolate==1); +} + +static +unsigned int sharedWasDeleted=0; + +static +void lastjob(void *arg, epicsJobMode mode) +{ + epicsJob *job=arg; + if(mode==epicsJobModeCleanup) { + sharedWasDeleted=1; + epicsJobDestroy(job); + } +} + +static +void testshared(void) +{ + epicsThreadPool *poolA, *poolB; + epicsThreadPoolConfig conf; + epicsJob *job; + + epicsThreadPoolConfigDefaults(&conf); + + testDiag("Check reference counting of shared pools"); + + testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL); + + testOk1(poolA->sharedCount==1); + + testOk1((poolB=epicsThreadPoolGetShared(&conf))!=NULL); + + testOk1(poolA==poolB); + + testOk1(poolA->sharedCount==2); + + epicsThreadPoolReleaseShared(poolA); + + testOk1(poolB->sharedCount==1); + + testOk1((job=epicsJobCreate(poolB, &lastjob, EPICSJOB_SELF))!=NULL); + + epicsThreadPoolReleaseShared(poolB); + + testOk1(sharedWasDeleted==1); + + testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL); + + testOk1(poolA->sharedCount==1); + + epicsThreadPoolReleaseShared(poolA); + +} + +MAIN(epicsThreadPoolTest) +{ + testPlan(171); + + nullop(); + oneop(); + testDiag("Queue with delayed start"); + postjobs(1,1,1); + postjobs(0,1,1); + postjobs(4,4,1); + postjobs(0,4,1); + postjobs(2,4,1); + testDiag("Queue with immediate start"); + postjobs(1,1,0); + postjobs(0,1,0); + postjobs(4,4,0); + postjobs(0,4,0); + postjobs(2,4,0); + testcleanup(); + testreadd(); + testcancel(); + testshared(); + + return testDone(); +} From fd2edfe94c9d0a142d4dfeb2052d7f74ac5500cc Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 24 Jul 2014 14:22:52 -0400 Subject: [PATCH 04/13] release notes --- documentation/RELEASE_NOTES.html | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/documentation/RELEASE_NOTES.html b/documentation/RELEASE_NOTES.html index 4e4845715..0dfa814a9 100644 --- a/documentation/RELEASE_NOTES.html +++ b/documentation/RELEASE_NOTES.html @@ -15,6 +15,14 @@ EPICS Base 3.15.0.x releases are not intended for use in production systems.

Changes between 3.15.0.1 and 3.15.0.2

+

+General purpose thread pool

+ +

+A general purpose threaded work queue API epicsThreadPool is added. +Multiple pools can be created with controlable priority and number +of worker threads. Lazy worker startup is supported.

+

Database field setting updates

A database (.db) file loaded by an IOC does not have to repeat the record From fc4119094fec1807ecb3ef0b73aabac2ffc1c077 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 24 Jul 2014 14:44:28 -0400 Subject: [PATCH 05/13] thread pool: epicsThreadPoolGetShared accepts NULL as defaults --- src/libCom/pool/threadPool.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c index 7cb3bb39d..4a7c03ea1 100644 --- a/src/libCom/pool/threadPool.c +++ b/src/libCom/pool/threadPool.c @@ -323,10 +323,13 @@ epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig * { ELLNODE *node; epicsThreadPool *cur; + epicsThreadPoolConfig defopts; size_t N=epicsThreadGetCPUs(); - if(!opts) - return NULL; + if(!opts) { + epicsThreadPoolConfigDefaults(&defopts); + opts = &defopts; + } /* shared pools must have a minimum allowed number of workers. * Use the number of CPU cores */ From 69d106306721f6add22528f69df03235ec475ade Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 24 Jul 2014 18:19:33 -0400 Subject: [PATCH 06/13] thread pool: epicsJobQueue return EPERM When pool control prevents operation --- src/libCom/pool/poolJob.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c index 00cc8dfaf..46db31d0e 100644 --- a/src/libCom/pool/poolJob.c +++ b/src/libCom/pool/poolJob.c @@ -235,7 +235,10 @@ int epicsJobQueue(epicsJob* job) assert(!job->dead); - if(pool->pauseadd || job->freewhendone) { + if(pool->pauseadd) { + ret=EPERM; + goto done; + } else if(job->freewhendone) { ret=EINVAL; goto done; } else if(job->queued) { From 83dfc7980dddf530dc7d7bb209d93720f1b21675 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 24 Jul 2014 18:50:19 -0400 Subject: [PATCH 07/13] thread pool: handle failure to create worker epicsJobQueue() returns EAGAIN when the first worker can't be lazily created. Failure to create workers beyond the first is silently ignored. --- src/libCom/pool/poolJob.c | 30 ++++++++++++++++++++++++------ src/libCom/pool/poolPriv.h | 2 +- src/libCom/pool/threadPool.c | 13 ++++++++----- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c index 46db31d0e..f7e9baa88 100644 --- a/src/libCom/pool/poolJob.c +++ b/src/libCom/pool/poolJob.c @@ -123,7 +123,7 @@ void workerMain(void* arg) return; } -void createPoolThread(epicsThreadPool *pool) +int createPoolThread(epicsThreadPool *pool) { epicsThreadId tid; @@ -133,10 +133,11 @@ void createPoolThread(epicsThreadPool *pool) &workerMain, pool); if(!tid) - return; + return 1; pool->threadsRunning++; pool->threadsSleeping++; + return 0; } epicsJob* epicsJobCreate(epicsThreadPool* pool, @@ -277,10 +278,27 @@ int epicsJobQueue(epicsJob* job) /* could create more workers so * will either create a new worker, or wakeup an existing worker */ - pool->threadsWaking++; - epicsEventSignal(pool->workerWakeup); - if(pool->threadsWaking > pool->threadsSleeping) - createPoolThread(pool); + + if(pool->threadsWaking >= pool->threadsSleeping) { + /* all sleeping workers have already been woken. + * start a new worker for this job + */ + if(createPoolThread(pool) && pool->threadsRunning==0) { + /* oops, we couldn't lazy create our first worker + * so this job would never run! + */ + ret = EAGAIN; + job->queued = 0; + /* if threadsRunning==0 then no jobs can be running */ + assert(!job->running); + ellDelete(&pool->jobs, &job->jobnode); + ellAdd(&pool->owned, &job->jobnode); + } + } + if(ret==0) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + } CHECKCOUNT(pool); } diff --git a/src/libCom/pool/poolPriv.h b/src/libCom/pool/poolPriv.h index 789ea57a6..4ad0d112c 100644 --- a/src/libCom/pool/poolPriv.h +++ b/src/libCom/pool/poolPriv.h @@ -92,6 +92,6 @@ struct epicsJob { unsigned int dead:1; /* flag to catch use of freed objects */ }; -void createPoolThread(epicsThreadPool *pool); +int createPoolThread(epicsThreadPool *pool); #endif // POOLPRIV_H diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c index 4a7c03ea1..ccdc7f2b1 100644 --- a/src/libCom/pool/threadPool.c +++ b/src/libCom/pool/threadPool.c @@ -77,9 +77,8 @@ epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts) createPoolThread(pool); } - epicsMutexUnlock(pool->guard); - if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) { + epicsMutexUnlock(pool->guard); errlogPrintf("Error: Unable to create any threads for thread pool\n"); goto cleanup; @@ -89,6 +88,8 @@ epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts) (unsigned long)pool->conf.initialThreads); } + epicsMutexUnlock(pool->guard); + return pool; cleanup: @@ -131,9 +132,11 @@ void _epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, u } } while(jobs-- && pool->threadsRunning < pool->conf.maxThreads) { - pool->threadsWaking++; - epicsEventSignal(pool->workerWakeup); - createPoolThread(pool); + if(createPoolThread(pool)==0) { + pool->threadsWaking++; + epicsEventSignal(pool->workerWakeup); + } else + break; /* oops, couldn't create worker */ } CHECKCOUNT(pool); } From b3a1fe9c6b29999299130aa01719c57fd9e835e5 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 24 Jul 2014 18:56:27 -0400 Subject: [PATCH 08/13] thread pool: fix return of epicsJobUnqueue() Return 0 on success (was queued, now is not), 1 if not queued initially, and EINVAL if orphaned. --- src/libCom/pool/epicsThreadPool.h | 5 +++-- src/libCom/pool/poolJob.c | 6 +++--- src/libCom/test/epicsThreadPoolTest.c | 12 ++++++------ 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/libCom/pool/epicsThreadPool.h b/src/libCom/pool/epicsThreadPool.h index 85279fc3d..e41dd3653 100644 --- a/src/libCom/pool/epicsThreadPool.h +++ b/src/libCom/pool/epicsThreadPool.h @@ -128,8 +128,9 @@ epicsShareFunc int epicsJobQueue(epicsJob*); /* Remove a job from the run queue if it is queued. * Safe to call from a running job function. - * returns 0 if job already ran, is running, or was not queued before, - * 1 if job was queued and now is not. + * returns 0 if job was queued and now is not. + * 1 if job already ran, is running, or was not queued before, + * Other non-zero on error */ epicsShareFunc int epicsJobUnqueue(epicsJob*); diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c index f7e9baa88..3016f4298 100644 --- a/src/libCom/pool/poolJob.c +++ b/src/libCom/pool/poolJob.c @@ -309,11 +309,11 @@ done: int epicsJobUnqueue(epicsJob* job) { - int ret=0; + int ret=1; epicsThreadPool *pool=job->pool; if(!pool) - return 0; + return EINVAL; epicsMutexMustLock(pool->guard); @@ -325,7 +325,7 @@ int epicsJobUnqueue(epicsJob* job) ellAdd(&pool->owned, &job->jobnode); } job->queued=0; - ret=1; + ret=0; } epicsMutexUnlock(pool->guard); diff --git a/src/libCom/test/epicsThreadPoolTest.c b/src/libCom/test/epicsThreadPoolTest.c index 660bdea87..196684e45 100644 --- a/src/libCom/test/epicsThreadPoolTest.c +++ b/src/libCom/test/epicsThreadPoolTest.c @@ -181,7 +181,7 @@ static void cleanupjob1(void* arg, epicsJobMode mode) testOk1(epicsJobQueue(job)==0); - testOk1(epicsJobUnqueue(job)==1); + testOk1(epicsJobUnqueue(job)==0); /* delete later after job finishes, but before pool is destroyed */ } static void cleanupjob2(void* arg, epicsJobMode mode) @@ -191,7 +191,7 @@ static void cleanupjob2(void* arg, epicsJobMode mode) if(mode==epicsJobModeCleanup) epicsJobDestroy(job); /* delete when threadpool is destroyed */ else if(mode==epicsJobModeRun) - testOk1(epicsJobUnqueue(job)==0); + testOk1(epicsJobUnqueue(job)==1); } static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2}; @@ -337,23 +337,23 @@ void testcancel(void) /* freeze */ epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0); - testOk1(epicsJobUnqueue(job[0])==0); /* not queued yet */ + testOk1(epicsJobUnqueue(job[0])==1); /* not queued yet */ epicsJobQueue(job[0]); - testOk1(epicsJobUnqueue(job[0])==1); testOk1(epicsJobUnqueue(job[0])==0); + testOk1(epicsJobUnqueue(job[0])==1); epicsThreadSleep(0.01); epicsJobQueue(job[0]); - testOk1(epicsJobUnqueue(job[0])==1); testOk1(epicsJobUnqueue(job[0])==0); + testOk1(epicsJobUnqueue(job[0])==1); epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); epicsJobQueue(job[1]); /* actually let it run this time */ epicsEventMustWait(cancel[0]); - testOk1(epicsJobUnqueue(job[0])==0); + testOk1(epicsJobUnqueue(job[0])==1); epicsEventSignal(cancel[1]); epicsThreadPoolDestroy(pool); From cbfbce54f9f8ddc71404de0e97efeae2aa2e2d3f Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Fri, 25 Jul 2014 09:59:46 -0400 Subject: [PATCH 09/13] thread pool: epicsThreadPoolWait return ETIMEOUT --- src/libCom/pool/epicsThreadPool.h | 4 ++-- src/libCom/pool/threadPool.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libCom/pool/epicsThreadPool.h b/src/libCom/pool/epicsThreadPool.h index e41dd3653..2bb176dec 100644 --- a/src/libCom/pool/epicsThreadPool.h +++ b/src/libCom/pool/epicsThreadPool.h @@ -81,8 +81,8 @@ epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool, /* Block until job queue is emptied and no jobs are running. * Useful after calling epicsThreadPoolControl() with option epicsThreadPoolQueueAdd=0 * - * timeout<0 waits forever, timeout==0 polls, timeout>0 waits for a fixed time - * Returns 1 for timeout, 0 for success, >1 on errors + * timeout<0 waits forever, timeout==0 polls, timeout>0 waits at most one timeout period + * Returns 0 for success or non-zero on error (timeout is ETIMEOUT) */ epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout); diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c index ccdc7f2b1..1cfc0e88b 100644 --- a/src/libCom/pool/threadPool.c +++ b/src/libCom/pool/threadPool.c @@ -170,7 +170,7 @@ int epicsThreadPoolWait(epicsThreadPool* pool, double timeout) cantProceed("epicsThreadPoolWait: failed to wait for Event"); break; case epicsEventWaitTimeout: - ret=EWOULDBLOCK; + ret=ETIMEDOUT; break; case epicsEventWaitOK: ret=0; From 95b916ecd4f7ace04dc5af43c4073add1eaaf8ee Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Fri, 25 Jul 2014 10:16:20 -0400 Subject: [PATCH 10/13] thread pool: mark epicsJobCreate() as safe for job functions Also, use epicsJobMove() to avoid some redundant code --- src/libCom/pool/epicsThreadPool.h | 1 + src/libCom/pool/poolJob.c | 10 ++-------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/libCom/pool/epicsThreadPool.h b/src/libCom/pool/epicsThreadPool.h index 2bb176dec..1f2d12069 100644 --- a/src/libCom/pool/epicsThreadPool.h +++ b/src/libCom/pool/epicsThreadPool.h @@ -100,6 +100,7 @@ epicsShareExtern void* _epicsJobArgSelf; /* creates, but does not add, a new job. * If pool in NULL then the job is not associated with any pool and * epicsJobMove() must be called before epicsJobQueue() + * Safe to call from a running job function. * returns a new job pointer, or NULL on error */ epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool* pool, diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c index 3016f4298..b351a37ab 100644 --- a/src/libCom/pool/poolJob.c +++ b/src/libCom/pool/poolJob.c @@ -152,17 +152,11 @@ epicsJob* epicsJobCreate(epicsThreadPool* pool, if(arg==&_epicsJobArgSelf) arg=job; - job->pool=pool; + job->pool=NULL; job->func=func; job->arg=arg; - if(pool) { - epicsMutexMustLock(pool->guard); - - ellAdd(&pool->owned, &job->jobnode); - - epicsMutexUnlock(pool->guard); - } + epicsJobMove(job, pool); return job; } From 6fee83900eecd04b1bb42108ad11351b87413496 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 29 Jul 2014 12:06:24 -0400 Subject: [PATCH 11/13] don't include errCommon.h doesn't exist anymore --- src/libCom/pool/poolJob.c | 1 - src/libCom/pool/threadPool.c | 1 - 2 files changed, 2 deletions(-) diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c index b351a37ab..0b731767b 100644 --- a/src/libCom/pool/poolJob.c +++ b/src/libCom/pool/poolJob.c @@ -18,7 +18,6 @@ #include "epicsMutex.h" #include "epicsEvent.h" #include "epicsInterrupt.h" -#include "errCommon.h" #include "epicsThreadPool.h" #include "poolPriv.h" diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c index 1cfc0e88b..70295fa5b 100644 --- a/src/libCom/pool/threadPool.c +++ b/src/libCom/pool/threadPool.c @@ -18,7 +18,6 @@ #include "epicsMutex.h" #include "epicsEvent.h" #include "epicsInterrupt.h" -#include "errCommon.h" #include "cantProceed.h" #include "epicsThreadPool.h" From b1a8b2f20e9b65fc14470fc9c00762432f127e0e Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 29 Jul 2014 12:18:19 -0400 Subject: [PATCH 12/13] thread pool: switch thread counts to unsigned int size_t is considered overly optimistic --- src/libCom/pool/epicsThreadPool.h | 6 +++--- src/libCom/pool/poolPriv.h | 10 +++++----- src/libCom/pool/threadPool.c | 20 ++++++++++---------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/libCom/pool/epicsThreadPool.h b/src/libCom/pool/epicsThreadPool.h index 1f2d12069..b8d991f58 100644 --- a/src/libCom/pool/epicsThreadPool.h +++ b/src/libCom/pool/epicsThreadPool.h @@ -20,8 +20,8 @@ extern "C" { #endif typedef struct { - size_t initialThreads; - size_t maxThreads; + unsigned int initialThreads; + unsigned int maxThreads; unsigned int workerStack; unsigned int workerPriority; } epicsThreadPoolConfig; @@ -141,7 +141,7 @@ epicsShareFunc int epicsJobUnqueue(epicsJob*); epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd); /* Current number of active workers. May be less than the maximum */ -epicsShareFunc size_t epicsThreadPoolNThreads(epicsThreadPool *); +epicsShareFunc unsigned int epicsThreadPoolNThreads(epicsThreadPool *); #ifdef __cplusplus } diff --git a/src/libCom/pool/poolPriv.h b/src/libCom/pool/poolPriv.h index 4ad0d112c..7142f69e1 100644 --- a/src/libCom/pool/poolPriv.h +++ b/src/libCom/pool/poolPriv.h @@ -28,16 +28,16 @@ struct epicsThreadPool { */ /* # of running workers which are not waiting for a wakeup event */ - size_t threadsAreAwake; + unsigned int threadsAreAwake; /* # of sleeping workers which need to be awakened */ - size_t threadsWaking; + unsigned int threadsWaking; /* # of workers waiting on the workerWakeup event */ - size_t threadsSleeping; + unsigned int threadsSleeping; /* # of threads started and not stopped */ - size_t threadsRunning; + unsigned int threadsRunning; /* # of observers waiting on pool events */ - size_t observerCount; + unsigned int observerCount; epicsEventId workerWakeup; epicsEventId shutdownEvent; diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c index 70295fa5b..2c373f23b 100644 --- a/src/libCom/pool/threadPool.c +++ b/src/libCom/pool/threadPool.c @@ -264,12 +264,12 @@ void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd) ELLNODE *cur; epicsMutexMustLock(pool->guard); - fprintf(fd, "Thread Pool with %lu/%lu threads\n" - " running %d jobs with %lu threads\n", - (unsigned long)pool->threadsRunning, - (unsigned long)pool->conf.maxThreads, + fprintf(fd, "Thread Pool with %u/%u threads\n" + " running %d jobs with %u threads\n", + pool->threadsRunning, + pool->conf.maxThreads, ellCount(&pool->jobs), - (unsigned long)pool->threadsAreAwake); + pool->threadsAreAwake); if(pool->pauseadd) fprintf(fd, " Inhibit queueing\n"); if(pool->pauserun) @@ -280,9 +280,9 @@ void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd) for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur)) { epicsJob *job=CONTAINER(cur, epicsJob, jobnode); - fprintf(fd, " job 0x%lu func: 0x%lu, arg: 0x%lu ", - (unsigned long)job, (unsigned long)job->func, - (unsigned long)job->arg); + fprintf(fd, " job %p func: %p, arg: %p ", + job, job->func, + job->arg); if(job->queued) fprintf(fd, "Queued "); if(job->running) @@ -295,9 +295,9 @@ void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd) epicsMutexUnlock(pool->guard); } -size_t epicsThreadPoolNThreads(epicsThreadPool *pool) +unsigned int epicsThreadPoolNThreads(epicsThreadPool *pool) { - size_t ret; + unsigned int ret; epicsMutexMustLock(pool->guard); ret=pool->threadsRunning; From 716f2679a7fe61be8d638d7da806e0a6809cc7b4 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 29 Jul 2014 12:21:07 -0400 Subject: [PATCH 13/13] thread pool: don't use reserved names Avoid global symbols with leading underscore --- src/libCom/pool/epicsThreadPool.h | 4 ++-- src/libCom/pool/poolJob.c | 4 ++-- src/libCom/pool/threadPool.c | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/libCom/pool/epicsThreadPool.h b/src/libCom/pool/epicsThreadPool.h index b8d991f58..ef6f2f3ba 100644 --- a/src/libCom/pool/epicsThreadPool.h +++ b/src/libCom/pool/epicsThreadPool.h @@ -94,8 +94,8 @@ epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout); * the argument passed to the job callback * will be the epicsJob* */ -#define EPICSJOB_SELF _epicsJobArgSelf -epicsShareExtern void* _epicsJobArgSelf; +#define EPICSJOB_SELF epicsJobArgSelfMagic +epicsShareExtern void* epicsJobArgSelfMagic; /* creates, but does not add, a new job. * If pool in NULL then the job is not associated with any pool and diff --git a/src/libCom/pool/poolJob.c b/src/libCom/pool/poolJob.c index 0b731767b..03dc100fa 100644 --- a/src/libCom/pool/poolJob.c +++ b/src/libCom/pool/poolJob.c @@ -22,7 +22,7 @@ #include "epicsThreadPool.h" #include "poolPriv.h" -void* _epicsJobArgSelf = &_epicsJobArgSelf; +void* epicsJobArgSelfMagic = &epicsJobArgSelfMagic; static void workerMain(void* arg) @@ -148,7 +148,7 @@ epicsJob* epicsJobCreate(epicsThreadPool* pool, if(!job) return NULL; - if(arg==&_epicsJobArgSelf) + if(arg==&epicsJobArgSelfMagic) arg=job; job->pool=NULL; diff --git a/src/libCom/pool/threadPool.c b/src/libCom/pool/threadPool.c index 2c373f23b..72677b2ce 100644 --- a/src/libCom/pool/threadPool.c +++ b/src/libCom/pool/threadPool.c @@ -102,7 +102,7 @@ cleanup: } static -void _epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val) +void epicsThreadPoolControlImpl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val) { if(pool->freezeopt) return; @@ -147,7 +147,7 @@ void _epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, u void epicsThreadPoolControl(epicsThreadPool* pool, epicsThreadPoolOption opt, unsigned int val) { epicsMutexMustLock(pool->guard); - _epicsThreadPoolControl(pool, opt, val); + epicsThreadPoolControlImpl(pool, opt, val); epicsMutexUnlock(pool->guard); } @@ -206,8 +206,8 @@ void epicsThreadPoolDestroy(epicsThreadPool *pool) epicsMutexMustLock(pool->guard); /* run remaining queued jobs */ - _epicsThreadPoolControl(pool, epicsThreadPoolQueueAdd, 0); - _epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1); + epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueAdd, 0); + epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueRun, 1); nThr=pool->threadsRunning; pool->freezeopt = 1;