Merged Michael's thread-pool branch.
Made various formatting changes, and fixed one problem in the test program which failed a test on UP machines.
This commit is contained in:
@@ -15,6 +15,14 @@ EPICS Base 3.15.0.x releases are not intended for use in production systems.</p>
|
||||
<h2 align="center">Changes between 3.15.0.1 and 3.15.0.2</h2>
|
||||
<!-- Insert new items immediately below here ... -->
|
||||
|
||||
<h3>
|
||||
General purpose thread pool</h3>
|
||||
|
||||
<p>
|
||||
A general purpose threaded work queue API epicsThreadPool is added.
|
||||
Multiple pools can be created with controlable priority and number
|
||||
of worker threads. Lazy worker startup is supported.</p>
|
||||
|
||||
<h3>Database field setting updates</h3>
|
||||
|
||||
<p>A database (.db) file loaded by an IOC does not have to repeat the record
|
||||
|
||||
@@ -31,6 +31,7 @@ include $(LIBCOM)/log/Makefile
|
||||
include $(LIBCOM)/macLib/Makefile
|
||||
include $(LIBCOM)/misc/Makefile
|
||||
include $(LIBCOM)/osi/Makefile
|
||||
include $(LIBCOM)/pool/Makefile
|
||||
include $(LIBCOM)/ring/Makefile
|
||||
include $(LIBCOM)/taskwd/Makefile
|
||||
include $(LIBCOM)/timer/Makefile
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
#*************************************************************************
|
||||
# Copyright (c) 2014 UChicago Argonne LLC, as Operator of Argonne
|
||||
# National Laboratory.
|
||||
# EPICS BASE is distributed subject to a Software License Agreement found
|
||||
# in file LICENSE that is included with this distribution.
|
||||
#*************************************************************************
|
||||
|
||||
# This is a Makefile fragment, see src/libCom/Makefile.
|
||||
|
||||
SRC_DIRS += $(LIBCOM)/pool
|
||||
|
||||
INC += epicsThreadPool.h
|
||||
|
||||
Com_SRCS += poolJob.c
|
||||
Com_SRCS += threadPool.c
|
||||
|
||||
@@ -0,0 +1,152 @@
|
||||
/*************************************************************************\
|
||||
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
|
||||
* Brookhaven National Laboratory.
|
||||
* EPICS BASE is distributed subject to a Software License Agreement found
|
||||
* in file LICENSE that is included with this distribution.
|
||||
\*************************************************************************/
|
||||
/* General purpose worker thread pool manager
|
||||
* mdavidsaver@bnl.gov
|
||||
*/
|
||||
#ifndef EPICSTHREADPOOL_H
|
||||
#define EPICSTHREADPOOL_H
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include "shareLib.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
unsigned int initialThreads;
|
||||
unsigned int maxThreads;
|
||||
unsigned int workerStack;
|
||||
unsigned int workerPriority;
|
||||
} epicsThreadPoolConfig;
|
||||
|
||||
typedef struct epicsThreadPool epicsThreadPool;
|
||||
|
||||
/* Job function call modes */
|
||||
typedef enum {
|
||||
/* Normal run of job */
|
||||
epicsJobModeRun,
|
||||
|
||||
/* Thread pool is being destroyed.
|
||||
* A chance to cleanup the job immediately with epicsJobDestroy().
|
||||
* If ignored, the job is orphaned (dissociated from the thread pool)
|
||||
* and epicsJobDestroy() must be called later.
|
||||
*/
|
||||
epicsJobModeCleanup
|
||||
} epicsJobMode;
|
||||
|
||||
typedef void (*epicsJobFunction)(void* arg, epicsJobMode mode);
|
||||
|
||||
typedef struct epicsJob epicsJob;
|
||||
|
||||
/* Pool operations */
|
||||
|
||||
/* Initialize a pool config with default values.
|
||||
* This much be done to preserve future compatibility
|
||||
* when new options are added.
|
||||
*/
|
||||
epicsShareFunc void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *);
|
||||
|
||||
/* fetch or create a thread pool which can be shared with other users.
|
||||
* may return NULL for allocation failures
|
||||
*/
|
||||
epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts);
|
||||
epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool);
|
||||
|
||||
/* If opts is NULL then defaults are used.
|
||||
* The opts pointer is not stored by this call, and may exist on the stack.
|
||||
*/
|
||||
epicsShareFunc epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts);
|
||||
|
||||
/* Blocks until all worker threads have stopped.
|
||||
* Any jobs still attached to this pool receive a callback with EPICSJOB_CLEANUP
|
||||
* and are then orphaned.
|
||||
*/
|
||||
epicsShareFunc void epicsThreadPoolDestroy(epicsThreadPool *);
|
||||
|
||||
/* pool control options */
|
||||
typedef enum {
|
||||
epicsThreadPoolQueueAdd, /* val==0 causes epicsJobQueue to fail, 1 is default */
|
||||
epicsThreadPoolQueueRun /* val==0 prevents workers from running jobs, 1 is default */
|
||||
} epicsThreadPoolOption;
|
||||
|
||||
epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool,
|
||||
epicsThreadPoolOption opt,
|
||||
unsigned int val);
|
||||
|
||||
/* Block until job queue is emptied and no jobs are running.
|
||||
* Useful after calling epicsThreadPoolControl() with option epicsThreadPoolQueueAdd=0
|
||||
*
|
||||
* timeout<0 waits forever, timeout==0 polls, timeout>0 waits at most one timeout period
|
||||
* Returns 0 for success or non-zero on error (timeout is ETIMEOUT)
|
||||
*/
|
||||
epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout);
|
||||
|
||||
|
||||
/* Per job operations */
|
||||
|
||||
/* Special flag for epicsJobCreate().
|
||||
* When passed as the third argument "user"
|
||||
* the argument passed to the job callback
|
||||
* will be the epicsJob*
|
||||
*/
|
||||
#define EPICSJOB_SELF epicsJobArgSelfMagic
|
||||
epicsShareExtern void* epicsJobArgSelfMagic;
|
||||
|
||||
/* Creates, but does not add, a new job.
|
||||
* If pool is NULL then the job is not associated with any pool and
|
||||
* epicsJobMove() must be called before epicsJobQueue().
|
||||
* Safe to call from a running job function.
|
||||
* Returns a new job pointer, or NULL on error.
|
||||
*/
|
||||
epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool* pool,
|
||||
epicsJobFunction cb,
|
||||
void* user);
|
||||
|
||||
/* Cancel and free a job structure. Does not block.
|
||||
* Job may not be immediately free'd.
|
||||
* Safe to call from a running job function.
|
||||
*/
|
||||
epicsShareFunc void epicsJobDestroy(epicsJob*);
|
||||
|
||||
/* Move the job to a different pool.
|
||||
* If pool is NULL then the job will no longer be associated
|
||||
* with any pool.
|
||||
* Not thread safe. Job must not be running or queued.
|
||||
* returns 0 on success, non-zero on error.
|
||||
*/
|
||||
epicsShareFunc int epicsJobMove(epicsJob* job, epicsThreadPool* pool);
|
||||
|
||||
/* Adds the job to the run queue
|
||||
* Safe to call from a running job function.
|
||||
* returns 0 for success, non-zero on error.
|
||||
*/
|
||||
epicsShareFunc int epicsJobQueue(epicsJob*);
|
||||
|
||||
/* Remove a job from the run queue if it is queued.
|
||||
* Safe to call from a running job function.
|
||||
* returns 0 if job was queued and now is not.
|
||||
* 1 if job already ran, is running, or was not queued before,
|
||||
* Other non-zero on error
|
||||
*/
|
||||
epicsShareFunc int epicsJobUnqueue(epicsJob*);
|
||||
|
||||
|
||||
/* Mostly useful for debugging */
|
||||
|
||||
epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd);
|
||||
|
||||
/* Current number of active workers. May be less than the maximum */
|
||||
epicsShareFunc unsigned int epicsThreadPoolNThreads(epicsThreadPool *);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // EPICSTHREADPOOL_H
|
||||
@@ -0,0 +1,327 @@
|
||||
/*************************************************************************\
|
||||
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
|
||||
* Brookhaven National Laboratory.
|
||||
* EPICS BASE is distributed subject to a Software License Agreement found
|
||||
* in file LICENSE that is included with this distribution.
|
||||
\*************************************************************************/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
|
||||
#include "dbDefs.h"
|
||||
#include "errlog.h"
|
||||
#include "ellLib.h"
|
||||
#include "epicsThread.h"
|
||||
#include "epicsMutex.h"
|
||||
#include "epicsEvent.h"
|
||||
#include "epicsInterrupt.h"
|
||||
|
||||
#include "epicsThreadPool.h"
|
||||
#include "poolPriv.h"
|
||||
|
||||
void *epicsJobArgSelfMagic = &epicsJobArgSelfMagic;
|
||||
|
||||
static
|
||||
void workerMain(void *arg)
|
||||
{
|
||||
epicsThreadPool *pool = arg;
|
||||
unsigned int nrun, ocnt;
|
||||
|
||||
/* workers are created with counts
|
||||
* in the running, sleeping, and (possibly) waking counters
|
||||
*/
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
pool->threadsAreAwake++;
|
||||
pool->threadsSleeping--;
|
||||
|
||||
while (1) {
|
||||
ELLNODE *cur;
|
||||
|
||||
pool->threadsAreAwake--;
|
||||
pool->threadsSleeping++;
|
||||
epicsMutexUnlock(pool->guard);
|
||||
|
||||
epicsEventMustWait(pool->workerWakeup);
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
pool->threadsSleeping--;
|
||||
pool->threadsAreAwake++;
|
||||
|
||||
if (pool->threadsWaking==0)
|
||||
continue;
|
||||
|
||||
pool->threadsWaking--;
|
||||
|
||||
CHECKCOUNT(pool);
|
||||
|
||||
if (pool->shutdown)
|
||||
break;
|
||||
|
||||
if (pool->pauserun)
|
||||
continue;
|
||||
|
||||
/* more threads to wakeup */
|
||||
if (pool->threadsWaking) {
|
||||
epicsEventSignal(pool->workerWakeup);
|
||||
}
|
||||
|
||||
while ((cur=ellGet(&pool->jobs)) != NULL) {
|
||||
epicsJob *job = CONTAINER(cur, epicsJob, jobnode);
|
||||
|
||||
assert(job->queued && !job->running);
|
||||
|
||||
job->queued=0;
|
||||
job->running=1;
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
(*job->func)(job->arg, epicsJobModeRun);
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
if (job->freewhendone) {
|
||||
job->dead=1;
|
||||
free(job);
|
||||
}
|
||||
else {
|
||||
job->running=0;
|
||||
/* job may be re-queued from within callback */
|
||||
if (job->queued)
|
||||
ellAdd(&pool->jobs, &job->jobnode);
|
||||
else
|
||||
ellAdd(&pool->owned, &job->jobnode);
|
||||
}
|
||||
}
|
||||
|
||||
if (pool->observerCount)
|
||||
epicsEventSignal(pool->observerWakeup);
|
||||
}
|
||||
|
||||
pool->threadsAreAwake--;
|
||||
pool->threadsRunning--;
|
||||
|
||||
nrun = pool->threadsRunning;
|
||||
ocnt = pool->observerCount;
|
||||
epicsMutexUnlock(pool->guard);
|
||||
|
||||
if (ocnt)
|
||||
epicsEventSignal(pool->observerWakeup);
|
||||
|
||||
if (nrun)
|
||||
epicsEventSignal(pool->workerWakeup); /* pass along */
|
||||
else
|
||||
epicsEventSignal(pool->shutdownEvent);
|
||||
}
|
||||
|
||||
int createPoolThread(epicsThreadPool *pool)
|
||||
{
|
||||
epicsThreadId tid;
|
||||
|
||||
tid = epicsThreadCreate("PoolWorker",
|
||||
pool->conf.workerPriority,
|
||||
pool->conf.workerStack,
|
||||
&workerMain,
|
||||
pool);
|
||||
if (!tid)
|
||||
return 1;
|
||||
|
||||
pool->threadsRunning++;
|
||||
pool->threadsSleeping++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
epicsJob* epicsJobCreate(epicsThreadPool *pool,
|
||||
epicsJobFunction func,
|
||||
void *arg)
|
||||
{
|
||||
epicsJob *job = calloc(1, sizeof(*job));
|
||||
|
||||
if (!job)
|
||||
return NULL;
|
||||
|
||||
if (arg == &epicsJobArgSelfMagic)
|
||||
arg = job;
|
||||
|
||||
job->pool = NULL;
|
||||
job->func = func;
|
||||
job->arg = arg;
|
||||
|
||||
epicsJobMove(job, pool);
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
void epicsJobDestroy(epicsJob *job)
|
||||
{
|
||||
epicsThreadPool *pool;
|
||||
if (!job || !job->pool) {
|
||||
free(job);
|
||||
return;
|
||||
}
|
||||
pool = job->pool;
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
assert(!job->dead);
|
||||
|
||||
epicsJobUnqueue(job);
|
||||
|
||||
if (job->running || job->freewhendone) {
|
||||
job->freewhendone = 1;
|
||||
}
|
||||
else {
|
||||
ellDelete(&pool->owned, &job->jobnode);
|
||||
job->dead = 1;
|
||||
free(job);
|
||||
}
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
}
|
||||
|
||||
int epicsJobMove(epicsJob *job, epicsThreadPool *newpool)
|
||||
{
|
||||
epicsThreadPool *pool = job->pool;
|
||||
|
||||
/* remove from current pool */
|
||||
if (pool) {
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
if (job->queued || job->running) {
|
||||
epicsMutexUnlock(pool->guard);
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
ellDelete(&pool->owned, &job->jobnode);
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
}
|
||||
|
||||
pool = job->pool = newpool;
|
||||
|
||||
/* add to new pool */
|
||||
if (pool) {
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
ellAdd(&pool->owned, &job->jobnode);
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int epicsJobQueue(epicsJob *job)
|
||||
{
|
||||
int ret = 0;
|
||||
epicsThreadPool *pool = job->pool;
|
||||
|
||||
if (!pool)
|
||||
return EINVAL;
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
assert(!job->dead);
|
||||
|
||||
if (pool->pauseadd) {
|
||||
ret = EPERM;
|
||||
goto done;
|
||||
}
|
||||
else if (job->freewhendone) {
|
||||
ret = EINVAL;
|
||||
goto done;
|
||||
}
|
||||
else if (job->queued) {
|
||||
goto done;
|
||||
}
|
||||
|
||||
job->queued = 1;
|
||||
/* Job may be queued from within a callback */
|
||||
if (!job->running) {
|
||||
ellDelete(&pool->owned, &job->jobnode);
|
||||
ellAdd(&pool->jobs, &job->jobnode);
|
||||
}
|
||||
else {
|
||||
/* some worker will find it again before sleeping */
|
||||
goto done;
|
||||
}
|
||||
|
||||
/* Since we hold the lock, we can be certain that all awake worker are
|
||||
* executing work functions. The current thread may be a worker.
|
||||
* We prefer to wakeup a new worker rather then wait for a busy worker to
|
||||
* finish. However, after we initiate a wakeup there will be a race
|
||||
* between the worker waking up, and a busy worker finishing.
|
||||
* Thus we can't avoid spurious wakeups.
|
||||
*/
|
||||
|
||||
if (pool->threadsRunning >= pool->conf.maxThreads) {
|
||||
/* all workers created... */
|
||||
/* ... but some are sleeping, so wake one up */
|
||||
if (pool->threadsWaking < pool->threadsSleeping) {
|
||||
pool->threadsWaking++;
|
||||
epicsEventSignal(pool->workerWakeup);
|
||||
}
|
||||
/*else one of the running workers will find this job before sleeping */
|
||||
CHECKCOUNT(pool);
|
||||
|
||||
}
|
||||
else {
|
||||
/* could create more workers so
|
||||
* will either create a new worker, or wakeup an existing worker
|
||||
*/
|
||||
|
||||
if (pool->threadsWaking >= pool->threadsSleeping) {
|
||||
/* all sleeping workers have already been woken.
|
||||
* start a new worker for this job
|
||||
*/
|
||||
if (createPoolThread(pool) && pool->threadsRunning == 0) {
|
||||
/* oops, we couldn't lazy create our first worker
|
||||
* so this job would never run!
|
||||
*/
|
||||
ret = EAGAIN;
|
||||
job->queued = 0;
|
||||
/* if threadsRunning==0 then no jobs can be running */
|
||||
assert(!job->running);
|
||||
ellDelete(&pool->jobs, &job->jobnode);
|
||||
ellAdd(&pool->owned, &job->jobnode);
|
||||
}
|
||||
}
|
||||
if (ret == 0) {
|
||||
pool->threadsWaking++;
|
||||
epicsEventSignal(pool->workerWakeup);
|
||||
}
|
||||
CHECKCOUNT(pool);
|
||||
}
|
||||
|
||||
done:
|
||||
epicsMutexUnlock(pool->guard);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int epicsJobUnqueue(epicsJob *job)
|
||||
{
|
||||
int ret = 1;
|
||||
epicsThreadPool *pool = job->pool;
|
||||
|
||||
if (!pool)
|
||||
return EINVAL;
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
assert(!job->dead);
|
||||
|
||||
if (job->queued) {
|
||||
if (!job->running) {
|
||||
ellDelete(&pool->jobs, &job->jobnode);
|
||||
ellAdd(&pool->owned, &job->jobnode);
|
||||
}
|
||||
job->queued = 0;
|
||||
ret = 0;
|
||||
}
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
/*************************************************************************\
|
||||
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
|
||||
* Brookhaven National Laboratory.
|
||||
* EPICS BASE is distributed subject to a Software License Agreement found
|
||||
* in file LICENSE that is included with this distribution.
|
||||
\*************************************************************************/
|
||||
|
||||
#ifndef POOLPRIV_H
|
||||
#define POOLPRIV_H
|
||||
|
||||
#include "epicsThreadPool.h"
|
||||
#include "ellLib.h"
|
||||
#include "epicsThread.h"
|
||||
#include "epicsEvent.h"
|
||||
#include "epicsMutex.h"
|
||||
|
||||
struct epicsThreadPool {
|
||||
ELLNODE sharedNode;
|
||||
size_t sharedCount;
|
||||
|
||||
ELLLIST jobs; /* run queue */
|
||||
ELLLIST owned; /* unqueued jobs. */
|
||||
|
||||
/* Worker state counters.
|
||||
* The life cycle of a worker is
|
||||
* Wakeup -> Awake -> Sleeping
|
||||
* Newly created workers go into the wakeup state
|
||||
*/
|
||||
|
||||
/* # of running workers which are not waiting for a wakeup event */
|
||||
unsigned int threadsAreAwake;
|
||||
/* # of sleeping workers which need to be awakened */
|
||||
unsigned int threadsWaking;
|
||||
/* # of workers waiting on the workerWakeup event */
|
||||
unsigned int threadsSleeping;
|
||||
/* # of threads started and not stopped */
|
||||
unsigned int threadsRunning;
|
||||
|
||||
/* # of observers waiting on pool events */
|
||||
unsigned int observerCount;
|
||||
|
||||
epicsEventId workerWakeup;
|
||||
epicsEventId shutdownEvent;
|
||||
|
||||
epicsEventId observerWakeup;
|
||||
|
||||
/* Disallow epicsJobQueue */
|
||||
unsigned int pauseadd:1;
|
||||
/* Prevent workers from running new jobs */
|
||||
unsigned int pauserun:1;
|
||||
/* Prevent further changes to pool options */
|
||||
unsigned int freezeopt:1;
|
||||
/* tell workers to exit */
|
||||
unsigned int shutdown:1;
|
||||
|
||||
epicsMutexId guard;
|
||||
|
||||
/* copy of config passed when created */
|
||||
epicsThreadPoolConfig conf;
|
||||
};
|
||||
|
||||
/* Called after manipulating counters to check that invariants are preserved */
|
||||
#define CHECKCOUNT(pPool) do { \
|
||||
if (!(pPool)->shutdown) { \
|
||||
assert((pPool)->threadsAreAwake + (pPool)->threadsSleeping == (pPool)->threadsRunning); \
|
||||
assert((pPool)->threadsWaking <= (pPool)->threadsSleeping); \
|
||||
} \
|
||||
} while(0)
|
||||
|
||||
/* When created a job is idle. queued and running are false
|
||||
* and jobnode is in the thread pool's owned list.
|
||||
*
|
||||
* When the job is added, the queued flag is set and jobnode
|
||||
* is in the jobs list.
|
||||
*
|
||||
* When the job starts running the queued flag is cleared and
|
||||
* the running flag is set. jobnode is not in any list
|
||||
* (held locally by worker).
|
||||
*
|
||||
* When the job has finished running, the running flag is cleared.
|
||||
* The queued flag may be set if the job re-added itself.
|
||||
* Based on the queued flag jobnode is added to the appropriate
|
||||
* list.
|
||||
*/
|
||||
struct epicsJob {
|
||||
ELLNODE jobnode;
|
||||
epicsJobFunction func;
|
||||
void *arg;
|
||||
epicsThreadPool *pool;
|
||||
|
||||
unsigned int queued:1;
|
||||
unsigned int running:1;
|
||||
unsigned int freewhendone:1; /* lazy delete of running job */
|
||||
unsigned int dead:1; /* flag to catch use of freed objects */
|
||||
};
|
||||
|
||||
int createPoolThread(epicsThreadPool *pool);
|
||||
|
||||
#endif // POOLPRIV_H
|
||||
@@ -0,0 +1,403 @@
|
||||
/*************************************************************************\
|
||||
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
|
||||
* Brookhaven National Laboratory.
|
||||
* EPICS BASE is distributed subject to a Software License Agreement found
|
||||
* in file LICENSE that is included with this distribution.
|
||||
\*************************************************************************/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
||||
#define epicsExportSharedSymbols
|
||||
|
||||
#include "dbDefs.h"
|
||||
#include "errlog.h"
|
||||
#include "ellLib.h"
|
||||
#include "epicsThread.h"
|
||||
#include "epicsMutex.h"
|
||||
#include "epicsEvent.h"
|
||||
#include "epicsInterrupt.h"
|
||||
#include "cantProceed.h"
|
||||
|
||||
#include "epicsThreadPool.h"
|
||||
#include "poolPriv.h"
|
||||
|
||||
|
||||
void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
|
||||
{
|
||||
memset(opts, 0, sizeof(*opts));
|
||||
opts->maxThreads = epicsThreadGetCPUs();
|
||||
opts->workerStack = epicsThreadGetStackSize(epicsThreadStackSmall);
|
||||
|
||||
if (epicsThreadLowestPriorityLevelAbove(epicsThreadPriorityCAServerHigh, &opts->workerPriority)
|
||||
!= epicsThreadBooleanStatusSuccess)
|
||||
opts->workerPriority = epicsThreadPriorityMedium;
|
||||
}
|
||||
|
||||
epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
|
||||
{
|
||||
size_t i;
|
||||
epicsThreadPool *pool;
|
||||
|
||||
/* caller likely didn't initialize the options structure */
|
||||
if (opts && opts->maxThreads == 0) {
|
||||
errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pool = calloc(1, sizeof(*pool));
|
||||
if (!pool)
|
||||
return NULL;
|
||||
|
||||
if (opts)
|
||||
memcpy(&pool->conf, opts, sizeof(*opts));
|
||||
else
|
||||
epicsThreadPoolConfigDefaults(&pool->conf);
|
||||
|
||||
if (pool->conf.initialThreads > pool->conf.maxThreads)
|
||||
pool->conf.initialThreads = pool->conf.maxThreads;
|
||||
|
||||
pool->workerWakeup = epicsEventCreate(epicsEventEmpty);
|
||||
pool->shutdownEvent = epicsEventCreate(epicsEventEmpty);
|
||||
pool->observerWakeup = epicsEventCreate(epicsEventEmpty);
|
||||
pool->guard = epicsMutexCreate();
|
||||
|
||||
if (!pool->workerWakeup || !pool->shutdownEvent ||
|
||||
!pool->observerWakeup || !pool->guard)
|
||||
goto cleanup;
|
||||
|
||||
ellInit(&pool->jobs);
|
||||
ellInit(&pool->owned);
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
for (i = 0; i < pool->conf.initialThreads; i++) {
|
||||
createPoolThread(pool);
|
||||
}
|
||||
|
||||
if (pool->threadsRunning == 0 && pool->conf.initialThreads != 0) {
|
||||
epicsMutexUnlock(pool->guard);
|
||||
errlogPrintf("Error: Unable to create any threads for thread pool\n");
|
||||
goto cleanup;
|
||||
|
||||
}
|
||||
else if (pool->threadsRunning < pool->conf.initialThreads) {
|
||||
errlogPrintf("Warning: Unable to create all threads for thread pool (%u/%u)\n",
|
||||
pool->threadsRunning, pool->conf.initialThreads);
|
||||
}
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
|
||||
return pool;
|
||||
|
||||
cleanup:
|
||||
if (pool->workerWakeup)
|
||||
epicsEventDestroy(pool->workerWakeup);
|
||||
if (pool->shutdownEvent)
|
||||
epicsEventDestroy(pool->shutdownEvent);
|
||||
if (pool->observerWakeup)
|
||||
epicsEventDestroy(pool->observerWakeup);
|
||||
if (pool->guard)
|
||||
epicsMutexDestroy(pool->guard);
|
||||
|
||||
free(pool);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static
|
||||
void epicsThreadPoolControlImpl(epicsThreadPool *pool, epicsThreadPoolOption opt, unsigned int val)
|
||||
{
|
||||
if (pool->freezeopt)
|
||||
return;
|
||||
|
||||
if (opt == epicsThreadPoolQueueAdd) {
|
||||
pool->pauseadd = !val;
|
||||
}
|
||||
else if (opt == epicsThreadPoolQueueRun) {
|
||||
if (!val && !pool->pauserun)
|
||||
pool->pauserun = 1;
|
||||
|
||||
else if (val && pool->pauserun) {
|
||||
int jobs = ellCount(&pool->jobs);
|
||||
pool->pauserun = 0;
|
||||
|
||||
if (jobs) {
|
||||
int wakeable = pool->threadsSleeping - pool->threadsWaking;
|
||||
|
||||
/* first try to give jobs to sleeping workers */
|
||||
if (wakeable) {
|
||||
int wakeup = jobs > wakeable ? wakeable : jobs;
|
||||
assert(wakeup > 0);
|
||||
jobs -= wakeup;
|
||||
pool->threadsWaking += wakeup;
|
||||
epicsEventSignal(pool->workerWakeup);
|
||||
CHECKCOUNT(pool);
|
||||
}
|
||||
}
|
||||
while (jobs-- && pool->threadsRunning < pool->conf.maxThreads) {
|
||||
if (createPoolThread(pool) == 0) {
|
||||
pool->threadsWaking++;
|
||||
epicsEventSignal(pool->workerWakeup);
|
||||
}
|
||||
else
|
||||
break; /* oops, couldn't create worker */
|
||||
}
|
||||
CHECKCOUNT(pool);
|
||||
}
|
||||
}
|
||||
/* unknown options ignored */
|
||||
|
||||
}
|
||||
|
||||
void epicsThreadPoolControl(epicsThreadPool *pool, epicsThreadPoolOption opt, unsigned int val)
|
||||
{
|
||||
epicsMutexMustLock(pool->guard);
|
||||
epicsThreadPoolControlImpl(pool, opt, val);
|
||||
epicsMutexUnlock(pool->guard);
|
||||
}
|
||||
|
||||
int epicsThreadPoolWait(epicsThreadPool *pool, double timeout)
|
||||
{
|
||||
int ret = 0;
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
while (ellCount(&pool->jobs) > 0 || pool->threadsAreAwake > 0) {
|
||||
pool->observerCount++;
|
||||
epicsMutexUnlock(pool->guard);
|
||||
|
||||
if (timeout < 0.0) {
|
||||
epicsEventMustWait(pool->observerWakeup);
|
||||
}
|
||||
else {
|
||||
switch (epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) {
|
||||
case epicsEventWaitError:
|
||||
cantProceed("epicsThreadPoolWait: failed to wait for Event");
|
||||
break;
|
||||
case epicsEventWaitTimeout:
|
||||
ret = ETIMEDOUT;
|
||||
break;
|
||||
case epicsEventWaitOK:
|
||||
ret = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
pool->observerCount--;
|
||||
|
||||
if (pool->observerCount)
|
||||
epicsEventSignal(pool->observerWakeup);
|
||||
|
||||
if (ret != 0)
|
||||
break;
|
||||
}
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void epicsThreadPoolDestroy(epicsThreadPool *pool)
|
||||
{
|
||||
unsigned int nThr;
|
||||
ELLLIST notify;
|
||||
ELLNODE *cur;
|
||||
|
||||
if (!pool)
|
||||
return;
|
||||
|
||||
ellInit(¬ify);
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
/* run remaining queued jobs */
|
||||
epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueAdd, 0);
|
||||
epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueRun, 1);
|
||||
nThr = pool->threadsRunning;
|
||||
pool->freezeopt = 1;
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
|
||||
epicsThreadPoolWait(pool, -1.0);
|
||||
/* At this point all queued jobs have run */
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
pool->shutdown = 1;
|
||||
/* wakeup all */
|
||||
if (pool->threadsWaking < pool->threadsSleeping) {
|
||||
pool->threadsWaking = pool->threadsSleeping;
|
||||
epicsEventSignal(pool->workerWakeup);
|
||||
}
|
||||
|
||||
ellConcat(¬ify, &pool->owned);
|
||||
ellConcat(¬ify, &pool->jobs);
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
|
||||
if (nThr && epicsEventWait(pool->shutdownEvent) != epicsEventWaitOK){
|
||||
errlogMessage("epicsThreadPoolDestroy: wait error");
|
||||
return;
|
||||
}
|
||||
|
||||
/* all workers are now shutdown */
|
||||
|
||||
/* notify remaining jobs that pool is being destroyed */
|
||||
while ((cur = ellGet(¬ify)) != NULL) {
|
||||
epicsJob *job = CONTAINER(cur, epicsJob, jobnode);
|
||||
|
||||
job->running = 1;
|
||||
job->func(job->arg, epicsJobModeCleanup);
|
||||
job->running = 0;
|
||||
if (job->freewhendone)
|
||||
free(job);
|
||||
else
|
||||
job->pool = NULL; /* orphan */
|
||||
}
|
||||
|
||||
epicsEventDestroy(pool->workerWakeup);
|
||||
epicsEventDestroy(pool->shutdownEvent);
|
||||
epicsEventDestroy(pool->observerWakeup);
|
||||
epicsMutexDestroy(pool->guard);
|
||||
|
||||
free(pool);
|
||||
}
|
||||
|
||||
|
||||
void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd)
|
||||
{
|
||||
ELLNODE *cur;
|
||||
epicsMutexMustLock(pool->guard);
|
||||
|
||||
fprintf(fd, "Thread Pool with %u/%u threads\n"
|
||||
" running %d jobs with %u threads\n",
|
||||
pool->threadsRunning,
|
||||
pool->conf.maxThreads,
|
||||
ellCount(&pool->jobs),
|
||||
pool->threadsAreAwake);
|
||||
if (pool->pauseadd)
|
||||
fprintf(fd, " Inhibit queueing\n");
|
||||
if (pool->pauserun)
|
||||
fprintf(fd, " Pause workers\n");
|
||||
if (pool->shutdown)
|
||||
fprintf(fd, " Shutdown in progress\n");
|
||||
|
||||
for (cur = ellFirst(&pool->jobs); cur; cur = ellNext(cur)) {
|
||||
epicsJob *job = CONTAINER(cur, epicsJob, jobnode);
|
||||
|
||||
fprintf(fd, " job %p func: %p, arg: %p ",
|
||||
job, job->func,
|
||||
job->arg);
|
||||
if (job->queued)
|
||||
fprintf(fd, "Queued ");
|
||||
if (job->running)
|
||||
fprintf(fd, "Running ");
|
||||
if (job->freewhendone)
|
||||
fprintf(fd, "Free ");
|
||||
fprintf(fd, "\n");
|
||||
}
|
||||
|
||||
epicsMutexUnlock(pool->guard);
|
||||
}
|
||||
|
||||
unsigned int epicsThreadPoolNThreads(epicsThreadPool *pool)
|
||||
{
|
||||
unsigned int ret;
|
||||
|
||||
epicsMutexMustLock(pool->guard);
|
||||
ret = pool->threadsRunning;
|
||||
epicsMutexUnlock(pool->guard);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static
|
||||
ELLLIST sharedPools = ELLLIST_INIT;
|
||||
|
||||
static
|
||||
epicsMutexId sharedPoolsGuard;
|
||||
|
||||
static
|
||||
epicsThreadOnceId sharedPoolsOnce = EPICS_THREAD_ONCE_INIT;
|
||||
|
||||
static
|
||||
void sharedPoolsInit(void* unused)
|
||||
{
|
||||
sharedPoolsGuard = epicsMutexMustCreate();
|
||||
}
|
||||
|
||||
epicsShareFunc epicsThreadPool* epicsThreadPoolGetShared(epicsThreadPoolConfig *opts)
|
||||
{
|
||||
ELLNODE *node;
|
||||
epicsThreadPool *cur;
|
||||
epicsThreadPoolConfig defopts;
|
||||
size_t N = epicsThreadGetCPUs();
|
||||
|
||||
if (!opts) {
|
||||
epicsThreadPoolConfigDefaults(&defopts);
|
||||
opts = &defopts;
|
||||
}
|
||||
/* shared pools must have a minimum allowed number of workers.
|
||||
* Use the number of CPU cores
|
||||
*/
|
||||
if (opts->maxThreads < N)
|
||||
opts->maxThreads = N;
|
||||
|
||||
epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL);
|
||||
|
||||
epicsMutexMustLock(sharedPoolsGuard);
|
||||
|
||||
for (node = ellFirst(&sharedPools); node; node = ellNext(node)) {
|
||||
cur = CONTAINER(node, epicsThreadPool, sharedNode);
|
||||
|
||||
/* Must have exactly the requested priority
|
||||
* At least the requested max workers
|
||||
* and at least the requested stack size
|
||||
*/
|
||||
if (cur->conf.workerPriority != opts->workerPriority)
|
||||
continue;
|
||||
if (cur->conf.maxThreads < opts->maxThreads)
|
||||
continue;
|
||||
if (cur->conf.workerStack < opts->workerStack)
|
||||
continue;
|
||||
|
||||
cur->sharedCount++;
|
||||
assert(cur->sharedCount > 0);
|
||||
epicsMutexUnlock(sharedPoolsGuard);
|
||||
|
||||
epicsMutexMustLock(cur->guard);
|
||||
*opts = cur->conf;
|
||||
epicsMutexUnlock(cur->guard);
|
||||
return cur;
|
||||
}
|
||||
|
||||
cur = epicsThreadPoolCreate(opts);
|
||||
if (!cur) {
|
||||
epicsMutexUnlock(sharedPoolsGuard);
|
||||
return NULL;
|
||||
}
|
||||
cur->sharedCount = 1;
|
||||
|
||||
ellAdd(&sharedPools, &cur->sharedNode);
|
||||
epicsMutexUnlock(sharedPoolsGuard);
|
||||
return cur;
|
||||
}
|
||||
|
||||
epicsShareFunc void epicsThreadPoolReleaseShared(epicsThreadPool *pool)
|
||||
{
|
||||
if (!pool)
|
||||
return;
|
||||
|
||||
epicsMutexMustLock(sharedPoolsGuard);
|
||||
|
||||
assert(pool->sharedCount > 0);
|
||||
|
||||
pool->sharedCount--;
|
||||
|
||||
if (pool->sharedCount == 0) {
|
||||
ellDelete(&sharedPools, &pool->sharedNode);
|
||||
epicsThreadPoolDestroy(pool);
|
||||
}
|
||||
|
||||
epicsMutexUnlock(sharedPoolsGuard);
|
||||
}
|
||||
@@ -106,6 +106,11 @@ epicsThreadHooksTest_SRCS += epicsThreadHooksTest.c
|
||||
testHarness_SRCS += epicsThreadHooksTest.c
|
||||
TESTS += epicsThreadHooksTest
|
||||
|
||||
TESTPROD_HOST += epicsThreadPoolTest
|
||||
epicsThreadPoolTest_SRCS += epicsThreadPoolTest.c
|
||||
testHarness_SRCS += epicsThreadPoolTest.c
|
||||
TESTS += epicsThreadPoolTest
|
||||
|
||||
TESTPROD_HOST += epicsExitTest
|
||||
epicsExitTest_SRCS += epicsExitTest.c
|
||||
testHarness_SRCS += epicsExitTest.c
|
||||
|
||||
@@ -35,6 +35,7 @@ int epicsThreadOnceTest(void);
|
||||
int epicsThreadPriorityTest(void);
|
||||
int epicsThreadPrivateTest(void);
|
||||
int epicsThreadHooksTest(void);
|
||||
int epicsThreadPoolTest(void);
|
||||
int epicsTimeTest(void);
|
||||
int epicsTypesTest(void);
|
||||
int macLibTest(void);
|
||||
@@ -94,6 +95,8 @@ void epicsRunLibComTests(void)
|
||||
|
||||
runTest(epicsThreadHooksTest);
|
||||
|
||||
runTest(epicsThreadPoolTest);
|
||||
|
||||
runTest(epicsTimeTest);
|
||||
|
||||
runTest(epicsTypesTest);
|
||||
|
||||
@@ -0,0 +1,447 @@
|
||||
/*************************************************************************\
|
||||
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
|
||||
* Brookhaven National Laboratory.
|
||||
* EPICS BASE is distributed subject to a Software License Agreement found
|
||||
* in file LICENSE that is included with this distribution.
|
||||
\*************************************************************************/
|
||||
|
||||
#include "epicsThreadPool.h"
|
||||
|
||||
/* included to allow tests to peek */
|
||||
#include "../../pool/poolPriv.h"
|
||||
|
||||
#include "testMain.h"
|
||||
#include "epicsUnitTest.h"
|
||||
|
||||
#include "cantProceed.h"
|
||||
#include "epicsEvent.h"
|
||||
#include "epicsMutex.h"
|
||||
#include "epicsThread.h"
|
||||
|
||||
/* Do nothing */
|
||||
static void nullop(void)
|
||||
{
|
||||
epicsThreadPool *pool;
|
||||
testDiag("nullop()");
|
||||
{
|
||||
epicsThreadPoolConfig conf;
|
||||
epicsThreadPoolConfigDefaults(&conf);
|
||||
testOk1(conf.maxThreads>0);
|
||||
|
||||
testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
|
||||
if(!pool)
|
||||
return;
|
||||
}
|
||||
|
||||
epicsThreadPoolDestroy(pool);
|
||||
}
|
||||
|
||||
/* Just create and destroy worker threads */
|
||||
static void oneop(void)
|
||||
{
|
||||
epicsThreadPool *pool;
|
||||
testDiag("oneop()");
|
||||
{
|
||||
epicsThreadPoolConfig conf;
|
||||
epicsThreadPoolConfigDefaults(&conf);
|
||||
conf.initialThreads=2;
|
||||
testOk1(conf.maxThreads>0);
|
||||
|
||||
testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
|
||||
if(!pool)
|
||||
return;
|
||||
}
|
||||
|
||||
epicsThreadPoolDestroy(pool);
|
||||
}
|
||||
|
||||
/* Test that Bursts of jobs will create enough threads to
|
||||
* run all in parallel
|
||||
*/
|
||||
typedef struct {
|
||||
epicsMutexId guard;
|
||||
unsigned int count;
|
||||
epicsEventId allrunning;
|
||||
epicsEventId done;
|
||||
epicsJob **job;
|
||||
} countPriv;
|
||||
|
||||
static void countjob(void *param, epicsJobMode mode)
|
||||
{
|
||||
countPriv *cnt=param;
|
||||
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
||||
if(mode==epicsJobModeCleanup)
|
||||
return;
|
||||
|
||||
epicsMutexMustLock(cnt->guard);
|
||||
testDiag("Job %lu", (unsigned long)cnt->count);
|
||||
cnt->count--;
|
||||
if(cnt->count==0) {
|
||||
testDiag("All jobs running");
|
||||
epicsEventSignal(cnt->allrunning);
|
||||
}
|
||||
epicsMutexUnlock(cnt->guard);
|
||||
|
||||
epicsEventMustWait(cnt->done);
|
||||
epicsEventSignal(cnt->done); /* pass along to next thread */
|
||||
}
|
||||
|
||||
/* Starts "mcnt" jobs in a pool with initial and max
|
||||
* thread counts "icnt" and "mcnt".
|
||||
* The test ensures that all jobs run in parallel.
|
||||
* "cork" checks the function of pausing the run queue
|
||||
* with epicsThreadPoolQueueRun
|
||||
*/
|
||||
static void postjobs(size_t icnt, size_t mcnt, int cork)
|
||||
{
|
||||
size_t i;
|
||||
epicsThreadPool *pool;
|
||||
countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc");
|
||||
priv->guard=epicsMutexMustCreate();
|
||||
priv->done=epicsEventMustCreate(epicsEventEmpty);
|
||||
priv->allrunning=epicsEventMustCreate(epicsEventEmpty);
|
||||
priv->count=mcnt;
|
||||
priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array");
|
||||
|
||||
testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt);
|
||||
|
||||
{
|
||||
epicsThreadPoolConfig conf;
|
||||
epicsThreadPoolConfigDefaults(&conf);
|
||||
conf.initialThreads=icnt;
|
||||
conf.maxThreads=mcnt;
|
||||
|
||||
testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
|
||||
if(!pool)
|
||||
return;
|
||||
}
|
||||
|
||||
if(cork)
|
||||
epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);
|
||||
|
||||
for(i=0; i<mcnt; i++) {
|
||||
testDiag("i=%lu", (unsigned long)i);
|
||||
priv->job[i] = epicsJobCreate(pool, &countjob, priv);
|
||||
testOk1(priv->job[i]!=NULL);
|
||||
testOk1(epicsJobQueue(priv->job[i])==0);
|
||||
}
|
||||
|
||||
if(cork) {
|
||||
/* no jobs should have run */
|
||||
epicsMutexMustLock(priv->guard);
|
||||
testOk1(priv->count==mcnt);
|
||||
epicsMutexUnlock(priv->guard);
|
||||
|
||||
epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
|
||||
}
|
||||
|
||||
testDiag("Waiting for all jobs to start");
|
||||
epicsEventMustWait(priv->allrunning);
|
||||
testDiag("Stop all");
|
||||
epicsEventSignal(priv->done);
|
||||
|
||||
for(i=0; i<mcnt; i++) {
|
||||
testDiag("i=%lu", (unsigned long)i);
|
||||
epicsJobDestroy(priv->job[i]);
|
||||
}
|
||||
|
||||
epicsThreadPoolDestroy(pool);
|
||||
epicsMutexDestroy(priv->guard);
|
||||
epicsEventDestroy(priv->allrunning);
|
||||
epicsEventDestroy(priv->done);
|
||||
free(priv->job);
|
||||
free(priv);
|
||||
}
|
||||
|
||||
static unsigned int flag0 = 0;
|
||||
|
||||
/* Test cancel from job (no-op)
|
||||
* and destroy from job (lazy free)
|
||||
*/
|
||||
static void cleanupjob0(void* arg, epicsJobMode mode)
|
||||
{
|
||||
epicsJob *job=arg;
|
||||
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
||||
if(mode==epicsJobModeCleanup)
|
||||
return;
|
||||
|
||||
assert(flag0==0);
|
||||
flag0=1;
|
||||
|
||||
testOk1(epicsJobQueue(job)==0);
|
||||
|
||||
epicsJobDestroy(job); /* delete while job is running */
|
||||
}
|
||||
static void cleanupjob1(void* arg, epicsJobMode mode)
|
||||
{
|
||||
epicsJob *job=arg;
|
||||
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
||||
if(mode==epicsJobModeCleanup)
|
||||
return;
|
||||
|
||||
testOk1(epicsJobQueue(job)==0);
|
||||
|
||||
testOk1(epicsJobUnqueue(job)==0);
|
||||
/* delete later after job finishes, but before pool is destroyed */
|
||||
}
|
||||
static void cleanupjob2(void* arg, epicsJobMode mode)
|
||||
{
|
||||
epicsJob *job=arg;
|
||||
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
||||
if(mode==epicsJobModeCleanup)
|
||||
epicsJobDestroy(job); /* delete when threadpool is destroyed */
|
||||
else if(mode==epicsJobModeRun)
|
||||
testOk1(epicsJobUnqueue(job)==1);
|
||||
}
|
||||
static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2};
|
||||
|
||||
/* Tests three methods for job cleanup.
|
||||
* 1. destroy which running
|
||||
* 2. deferred cleanup after pool destroyed
|
||||
* 3. immediate cleanup when pool destroyed
|
||||
*/
|
||||
static void testcleanup(void)
|
||||
{
|
||||
int i=0;
|
||||
epicsThreadPool *pool;
|
||||
epicsJob *job[3];
|
||||
|
||||
testDiag("testcleanup()");
|
||||
|
||||
testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
|
||||
if(!pool)
|
||||
return;
|
||||
|
||||
/* unrolled so that valgrind can show which methods leaks */
|
||||
testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], EPICSJOB_SELF))!=NULL);
|
||||
testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], EPICSJOB_SELF))!=NULL);
|
||||
testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], EPICSJOB_SELF))!=NULL);
|
||||
for(i=0; i<3; i++) {
|
||||
testOk1(epicsJobQueue(job[i])==0);
|
||||
}
|
||||
|
||||
epicsThreadPoolWait(pool, -1);
|
||||
epicsJobDestroy(job[1]);
|
||||
epicsThreadPoolDestroy(pool);
|
||||
}
|
||||
|
||||
/* Test re-add from inside job */
|
||||
typedef struct {
|
||||
unsigned int count;
|
||||
epicsEventId done;
|
||||
epicsJob *job;
|
||||
unsigned int inprogress;
|
||||
} readdPriv;
|
||||
|
||||
static void readdjob(void *arg, epicsJobMode mode)
|
||||
{
|
||||
readdPriv *priv=arg;
|
||||
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
||||
if(mode==epicsJobModeCleanup)
|
||||
return;
|
||||
testOk1(priv->inprogress==0);
|
||||
testDiag("count==%u", priv->count);
|
||||
|
||||
if(priv->count--) {
|
||||
priv->inprogress=1;
|
||||
epicsJobQueue(priv->job);
|
||||
epicsThreadSleep(0.05);
|
||||
priv->inprogress=0;
|
||||
}else{
|
||||
epicsEventSignal(priv->done);
|
||||
epicsJobDestroy(priv->job);
|
||||
}
|
||||
}
|
||||
|
||||
/* Test re-queueing a job while it is running.
|
||||
* Check that a single job won't run concurrently.
|
||||
*/
|
||||
static void testreadd(void) {
|
||||
epicsThreadPoolConfig conf;
|
||||
epicsThreadPool *pool;
|
||||
readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testcleanup priv");
|
||||
readdPriv *priv2=callocMustSucceed(1, sizeof(*priv), "testcleanup priv");
|
||||
|
||||
testDiag("testreadd");
|
||||
|
||||
priv->done=epicsEventMustCreate(epicsEventEmpty);
|
||||
priv->count=5;
|
||||
priv2->done=epicsEventMustCreate(epicsEventEmpty);
|
||||
priv2->count=5;
|
||||
|
||||
epicsThreadPoolConfigDefaults(&conf);
|
||||
conf.maxThreads = 2;
|
||||
testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
|
||||
if(!pool)
|
||||
return;
|
||||
|
||||
testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL);
|
||||
testOk1((priv2->job=epicsJobCreate(pool, &readdjob, priv2))!=NULL);
|
||||
|
||||
testOk1(epicsJobQueue(priv->job)==0);
|
||||
testOk1(epicsJobQueue(priv2->job)==0);
|
||||
epicsEventMustWait(priv->done);
|
||||
epicsEventMustWait(priv2->done);
|
||||
|
||||
testOk1(epicsThreadPoolNThreads(pool)==2);
|
||||
testDiag("epicsThreadPoolNThreads = %d", epicsThreadPoolNThreads(pool));
|
||||
|
||||
epicsThreadPoolDestroy(pool);
|
||||
epicsEventDestroy(priv->done);
|
||||
epicsEventDestroy(priv2->done);
|
||||
free(priv);
|
||||
free(priv2);
|
||||
|
||||
}
|
||||
|
||||
static int shouldneverrun = 0;
|
||||
static int numtoolate = 0;
|
||||
|
||||
/* test job canceling */
|
||||
static
|
||||
void neverrun(void *arg, epicsJobMode mode)
|
||||
{
|
||||
epicsJob *job=arg;
|
||||
testOk1(mode==epicsJobModeCleanup);
|
||||
if(mode==epicsJobModeCleanup)
|
||||
epicsJobDestroy(job);
|
||||
else
|
||||
shouldneverrun++;
|
||||
}
|
||||
static epicsEventId cancel[2];
|
||||
static
|
||||
void toolate(void *arg, epicsJobMode mode)
|
||||
{
|
||||
epicsJob *job=arg;
|
||||
if(mode==epicsJobModeCleanup){
|
||||
epicsJobDestroy(job);
|
||||
return;
|
||||
}
|
||||
testPass("Job runs");
|
||||
numtoolate++;
|
||||
epicsEventSignal(cancel[0]);
|
||||
epicsEventMustWait(cancel[1]);
|
||||
}
|
||||
|
||||
static
|
||||
void testcancel(void)
|
||||
{
|
||||
epicsJob *job[2];
|
||||
epicsThreadPool *pool;
|
||||
testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
|
||||
if(!pool)
|
||||
return;
|
||||
|
||||
cancel[0]=epicsEventCreate(epicsEventEmpty);
|
||||
cancel[1]=epicsEventCreate(epicsEventEmpty);
|
||||
|
||||
testOk1((job[0]=epicsJobCreate(pool, &neverrun, EPICSJOB_SELF))!=NULL);
|
||||
testOk1((job[1]=epicsJobCreate(pool, &toolate, EPICSJOB_SELF))!=NULL);
|
||||
|
||||
/* freeze */
|
||||
epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);
|
||||
|
||||
testOk1(epicsJobUnqueue(job[0])==1); /* not queued yet */
|
||||
|
||||
epicsJobQueue(job[0]);
|
||||
testOk1(epicsJobUnqueue(job[0])==0);
|
||||
testOk1(epicsJobUnqueue(job[0])==1);
|
||||
|
||||
epicsThreadSleep(0.01);
|
||||
epicsJobQueue(job[0]);
|
||||
testOk1(epicsJobUnqueue(job[0])==0);
|
||||
testOk1(epicsJobUnqueue(job[0])==1);
|
||||
|
||||
epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
|
||||
|
||||
epicsJobQueue(job[1]); /* actually let it run this time */
|
||||
|
||||
epicsEventMustWait(cancel[0]);
|
||||
testOk1(epicsJobUnqueue(job[0])==1);
|
||||
epicsEventSignal(cancel[1]);
|
||||
|
||||
epicsThreadPoolDestroy(pool);
|
||||
epicsEventDestroy(cancel[0]);
|
||||
epicsEventDestroy(cancel[1]);
|
||||
|
||||
testOk1(shouldneverrun==0);
|
||||
testOk1(numtoolate==1);
|
||||
}
|
||||
|
||||
static
|
||||
unsigned int sharedWasDeleted=0;
|
||||
|
||||
static
|
||||
void lastjob(void *arg, epicsJobMode mode)
|
||||
{
|
||||
epicsJob *job=arg;
|
||||
if(mode==epicsJobModeCleanup) {
|
||||
sharedWasDeleted=1;
|
||||
epicsJobDestroy(job);
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
void testshared(void)
|
||||
{
|
||||
epicsThreadPool *poolA, *poolB;
|
||||
epicsThreadPoolConfig conf;
|
||||
epicsJob *job;
|
||||
|
||||
epicsThreadPoolConfigDefaults(&conf);
|
||||
|
||||
testDiag("Check reference counting of shared pools");
|
||||
|
||||
testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);
|
||||
|
||||
testOk1(poolA->sharedCount==1);
|
||||
|
||||
testOk1((poolB=epicsThreadPoolGetShared(&conf))!=NULL);
|
||||
|
||||
testOk1(poolA==poolB);
|
||||
|
||||
testOk1(poolA->sharedCount==2);
|
||||
|
||||
epicsThreadPoolReleaseShared(poolA);
|
||||
|
||||
testOk1(poolB->sharedCount==1);
|
||||
|
||||
testOk1((job=epicsJobCreate(poolB, &lastjob, EPICSJOB_SELF))!=NULL);
|
||||
|
||||
epicsThreadPoolReleaseShared(poolB);
|
||||
|
||||
testOk1(sharedWasDeleted==1);
|
||||
|
||||
testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);
|
||||
|
||||
testOk1(poolA->sharedCount==1);
|
||||
|
||||
epicsThreadPoolReleaseShared(poolA);
|
||||
|
||||
}
|
||||
|
||||
MAIN(epicsThreadPoolTest)
|
||||
{
|
||||
testPlan(171);
|
||||
|
||||
nullop();
|
||||
oneop();
|
||||
testDiag("Queue with delayed start");
|
||||
postjobs(1,1,1);
|
||||
postjobs(0,1,1);
|
||||
postjobs(4,4,1);
|
||||
postjobs(0,4,1);
|
||||
postjobs(2,4,1);
|
||||
testDiag("Queue with immediate start");
|
||||
postjobs(1,1,0);
|
||||
postjobs(0,1,0);
|
||||
postjobs(4,4,0);
|
||||
postjobs(0,4,0);
|
||||
postjobs(2,4,0);
|
||||
testcleanup();
|
||||
testreadd();
|
||||
testcancel();
|
||||
testshared();
|
||||
|
||||
return testDone();
|
||||
}
|
||||
Reference in New Issue
Block a user