448 lines
11 KiB
C
448 lines
11 KiB
C
/*************************************************************************\
|
|
* Copyright (c) 2014 Brookhaven Science Associates, as Operator of
|
|
* Brookhaven National Laboratory.
|
|
* EPICS BASE is distributed subject to a Software License Agreement found
|
|
* in file LICENSE that is included with this distribution.
|
|
\*************************************************************************/
|
|
|
|
#include "epicsThreadPool.h"
|
|
|
|
/* included to allow tests to peek */
|
|
#include "../../src/pool/poolPriv.h"
|
|
|
|
#include "testMain.h"
|
|
#include "epicsUnitTest.h"
|
|
|
|
#include "cantProceed.h"
|
|
#include "epicsEvent.h"
|
|
#include "epicsMutex.h"
|
|
#include "epicsThread.h"
|
|
|
|
/* Do nothing */
|
|
static void nullop(void)
|
|
{
|
|
epicsThreadPool *pool;
|
|
testDiag("nullop()");
|
|
{
|
|
epicsThreadPoolConfig conf;
|
|
epicsThreadPoolConfigDefaults(&conf);
|
|
testOk1(conf.maxThreads>0);
|
|
|
|
testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
|
|
if(!pool)
|
|
return;
|
|
}
|
|
|
|
epicsThreadPoolDestroy(pool);
|
|
}
|
|
|
|
/* Just create and destroy worker threads */
|
|
static void oneop(void)
|
|
{
|
|
epicsThreadPool *pool;
|
|
testDiag("oneop()");
|
|
{
|
|
epicsThreadPoolConfig conf;
|
|
epicsThreadPoolConfigDefaults(&conf);
|
|
conf.initialThreads=2;
|
|
testOk1(conf.maxThreads>0);
|
|
|
|
testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
|
|
if(!pool)
|
|
return;
|
|
}
|
|
|
|
epicsThreadPoolDestroy(pool);
|
|
}
|
|
|
|
/* Test that Bursts of jobs will create enough threads to
|
|
* run all in parallel
|
|
*/
|
|
typedef struct {
|
|
epicsMutexId guard;
|
|
unsigned int count;
|
|
epicsEventId allrunning;
|
|
epicsEventId done;
|
|
epicsJob **job;
|
|
} countPriv;
|
|
|
|
static void countjob(void *param, epicsJobMode mode)
|
|
{
|
|
countPriv *cnt=param;
|
|
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
|
if(mode==epicsJobModeCleanup)
|
|
return;
|
|
|
|
epicsMutexMustLock(cnt->guard);
|
|
testDiag("Job %lu", (unsigned long)cnt->count);
|
|
cnt->count--;
|
|
if(cnt->count==0) {
|
|
testDiag("All jobs running");
|
|
epicsEventSignal(cnt->allrunning);
|
|
}
|
|
epicsMutexUnlock(cnt->guard);
|
|
|
|
epicsEventMustWait(cnt->done);
|
|
epicsEventSignal(cnt->done); /* pass along to next thread */
|
|
}
|
|
|
|
/* Starts "mcnt" jobs in a pool with initial and max
|
|
* thread counts "icnt" and "mcnt".
|
|
* The test ensures that all jobs run in parallel.
|
|
* "cork" checks the function of pausing the run queue
|
|
* with epicsThreadPoolQueueRun
|
|
*/
|
|
static void postjobs(size_t icnt, size_t mcnt, int cork)
|
|
{
|
|
size_t i;
|
|
epicsThreadPool *pool;
|
|
countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc");
|
|
priv->guard=epicsMutexMustCreate();
|
|
priv->done=epicsEventMustCreate(epicsEventEmpty);
|
|
priv->allrunning=epicsEventMustCreate(epicsEventEmpty);
|
|
priv->count=mcnt;
|
|
priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array");
|
|
|
|
testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt);
|
|
|
|
{
|
|
epicsThreadPoolConfig conf;
|
|
epicsThreadPoolConfigDefaults(&conf);
|
|
conf.initialThreads=icnt;
|
|
conf.maxThreads=mcnt;
|
|
|
|
testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
|
|
if(!pool)
|
|
return;
|
|
}
|
|
|
|
if(cork)
|
|
epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);
|
|
|
|
for(i=0; i<mcnt; i++) {
|
|
testDiag("i=%lu", (unsigned long)i);
|
|
priv->job[i] = epicsJobCreate(pool, &countjob, priv);
|
|
testOk1(priv->job[i]!=NULL);
|
|
testOk1(epicsJobQueue(priv->job[i])==0);
|
|
}
|
|
|
|
if(cork) {
|
|
/* no jobs should have run */
|
|
epicsMutexMustLock(priv->guard);
|
|
testOk1(priv->count==mcnt);
|
|
epicsMutexUnlock(priv->guard);
|
|
|
|
epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
|
|
}
|
|
|
|
testDiag("Waiting for all jobs to start");
|
|
epicsEventMustWait(priv->allrunning);
|
|
testDiag("Stop all");
|
|
epicsEventSignal(priv->done);
|
|
|
|
for(i=0; i<mcnt; i++) {
|
|
testDiag("i=%lu", (unsigned long)i);
|
|
epicsJobDestroy(priv->job[i]);
|
|
}
|
|
|
|
epicsThreadPoolDestroy(pool);
|
|
epicsMutexDestroy(priv->guard);
|
|
epicsEventDestroy(priv->allrunning);
|
|
epicsEventDestroy(priv->done);
|
|
free(priv->job);
|
|
free(priv);
|
|
}
|
|
|
|
static unsigned int flag0 = 0;
|
|
|
|
/* Test cancel from job (no-op)
|
|
* and destroy from job (lazy free)
|
|
*/
|
|
static void cleanupjob0(void* arg, epicsJobMode mode)
|
|
{
|
|
epicsJob *job=arg;
|
|
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
|
if(mode==epicsJobModeCleanup)
|
|
return;
|
|
|
|
assert(flag0==0);
|
|
flag0=1;
|
|
|
|
testOk1(epicsJobQueue(job)==0);
|
|
|
|
epicsJobDestroy(job); /* delete while job is running */
|
|
}
|
|
static void cleanupjob1(void* arg, epicsJobMode mode)
|
|
{
|
|
epicsJob *job=arg;
|
|
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
|
if(mode==epicsJobModeCleanup)
|
|
return;
|
|
|
|
testOk1(epicsJobQueue(job)==0);
|
|
|
|
testOk1(epicsJobUnqueue(job)==0);
|
|
/* delete later after job finishes, but before pool is destroyed */
|
|
}
|
|
static void cleanupjob2(void* arg, epicsJobMode mode)
|
|
{
|
|
epicsJob *job=arg;
|
|
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
|
if(mode==epicsJobModeCleanup)
|
|
epicsJobDestroy(job); /* delete when threadpool is destroyed */
|
|
else if(mode==epicsJobModeRun)
|
|
testOk1(epicsJobUnqueue(job)==S_pool_jobIdle);
|
|
}
|
|
static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2};
|
|
|
|
/* Tests three methods for job cleanup.
|
|
* 1. destroy which running
|
|
* 2. deferred cleanup after pool destroyed
|
|
* 3. immediate cleanup when pool destroyed
|
|
*/
|
|
static void testcleanup(void)
|
|
{
|
|
int i=0;
|
|
epicsThreadPool *pool;
|
|
epicsJob *job[3];
|
|
|
|
testDiag("testcleanup()");
|
|
|
|
testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
|
|
if(!pool)
|
|
return;
|
|
|
|
/* unrolled so that valgrind can show which methods leaks */
|
|
testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], EPICSJOB_SELF))!=NULL);
|
|
testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], EPICSJOB_SELF))!=NULL);
|
|
testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], EPICSJOB_SELF))!=NULL);
|
|
for(i=0; i<3; i++) {
|
|
testOk1(epicsJobQueue(job[i])==0);
|
|
}
|
|
|
|
epicsThreadPoolWait(pool, -1);
|
|
epicsJobDestroy(job[1]);
|
|
epicsThreadPoolDestroy(pool);
|
|
}
|
|
|
|
/* Test re-add from inside job */
|
|
typedef struct {
|
|
unsigned int count;
|
|
epicsEventId done;
|
|
epicsJob *job;
|
|
unsigned int inprogress;
|
|
} readdPriv;
|
|
|
|
static void readdjob(void *arg, epicsJobMode mode)
|
|
{
|
|
readdPriv *priv=arg;
|
|
testOk1(mode==epicsJobModeRun||mode==epicsJobModeCleanup);
|
|
if(mode==epicsJobModeCleanup)
|
|
return;
|
|
testOk1(priv->inprogress==0);
|
|
testDiag("count==%u", priv->count);
|
|
|
|
if(priv->count--) {
|
|
priv->inprogress=1;
|
|
epicsJobQueue(priv->job);
|
|
epicsThreadSleep(0.05);
|
|
priv->inprogress=0;
|
|
}else{
|
|
epicsEventSignal(priv->done);
|
|
epicsJobDestroy(priv->job);
|
|
}
|
|
}
|
|
|
|
/* Test re-queueing a job while it is running.
|
|
* Check that a single job won't run concurrently.
|
|
*/
|
|
static void testreadd(void) {
|
|
epicsThreadPoolConfig conf;
|
|
epicsThreadPool *pool;
|
|
readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testcleanup priv");
|
|
readdPriv *priv2=callocMustSucceed(1, sizeof(*priv), "testcleanup priv");
|
|
|
|
testDiag("testreadd");
|
|
|
|
priv->done=epicsEventMustCreate(epicsEventEmpty);
|
|
priv->count=5;
|
|
priv2->done=epicsEventMustCreate(epicsEventEmpty);
|
|
priv2->count=5;
|
|
|
|
epicsThreadPoolConfigDefaults(&conf);
|
|
conf.maxThreads = 2;
|
|
testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
|
|
if(!pool)
|
|
return;
|
|
|
|
testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL);
|
|
testOk1((priv2->job=epicsJobCreate(pool, &readdjob, priv2))!=NULL);
|
|
|
|
testOk1(epicsJobQueue(priv->job)==0);
|
|
testOk1(epicsJobQueue(priv2->job)==0);
|
|
epicsEventMustWait(priv->done);
|
|
epicsEventMustWait(priv2->done);
|
|
|
|
testOk1(epicsThreadPoolNThreads(pool)==2);
|
|
testDiag("epicsThreadPoolNThreads = %d", epicsThreadPoolNThreads(pool));
|
|
|
|
epicsThreadPoolDestroy(pool);
|
|
epicsEventDestroy(priv->done);
|
|
epicsEventDestroy(priv2->done);
|
|
free(priv);
|
|
free(priv2);
|
|
|
|
}
|
|
|
|
static int shouldneverrun = 0;
|
|
static int numtoolate = 0;
|
|
|
|
/* test job canceling */
|
|
static
|
|
void neverrun(void *arg, epicsJobMode mode)
|
|
{
|
|
epicsJob *job=arg;
|
|
testOk1(mode==epicsJobModeCleanup);
|
|
if(mode==epicsJobModeCleanup)
|
|
epicsJobDestroy(job);
|
|
else
|
|
shouldneverrun++;
|
|
}
|
|
static epicsEventId cancel[2];
|
|
static
|
|
void toolate(void *arg, epicsJobMode mode)
|
|
{
|
|
epicsJob *job=arg;
|
|
if(mode==epicsJobModeCleanup){
|
|
epicsJobDestroy(job);
|
|
return;
|
|
}
|
|
testPass("Job runs");
|
|
numtoolate++;
|
|
epicsEventSignal(cancel[0]);
|
|
epicsEventMustWait(cancel[1]);
|
|
}
|
|
|
|
static
|
|
void testcancel(void)
|
|
{
|
|
epicsJob *job[2];
|
|
epicsThreadPool *pool;
|
|
testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
|
|
if(!pool)
|
|
return;
|
|
|
|
cancel[0]=epicsEventCreate(epicsEventEmpty);
|
|
cancel[1]=epicsEventCreate(epicsEventEmpty);
|
|
|
|
testOk1((job[0]=epicsJobCreate(pool, &neverrun, EPICSJOB_SELF))!=NULL);
|
|
testOk1((job[1]=epicsJobCreate(pool, &toolate, EPICSJOB_SELF))!=NULL);
|
|
|
|
/* freeze */
|
|
epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 0);
|
|
|
|
testOk1(epicsJobUnqueue(job[0])==S_pool_jobIdle); /* not queued yet */
|
|
|
|
epicsJobQueue(job[0]);
|
|
testOk1(epicsJobUnqueue(job[0])==0);
|
|
testOk1(epicsJobUnqueue(job[0])==S_pool_jobIdle);
|
|
|
|
epicsThreadSleep(0.01);
|
|
epicsJobQueue(job[0]);
|
|
testOk1(epicsJobUnqueue(job[0])==0);
|
|
testOk1(epicsJobUnqueue(job[0])==S_pool_jobIdle);
|
|
|
|
epicsThreadPoolControl(pool, epicsThreadPoolQueueRun, 1);
|
|
|
|
epicsJobQueue(job[1]); /* actually let it run this time */
|
|
|
|
epicsEventMustWait(cancel[0]);
|
|
testOk1(epicsJobUnqueue(job[0])==S_pool_jobIdle);
|
|
epicsEventSignal(cancel[1]);
|
|
|
|
epicsThreadPoolDestroy(pool);
|
|
epicsEventDestroy(cancel[0]);
|
|
epicsEventDestroy(cancel[1]);
|
|
|
|
testOk1(shouldneverrun==0);
|
|
testOk1(numtoolate==1);
|
|
}
|
|
|
|
static
|
|
unsigned int sharedWasDeleted=0;
|
|
|
|
static
|
|
void lastjob(void *arg, epicsJobMode mode)
|
|
{
|
|
epicsJob *job=arg;
|
|
if(mode==epicsJobModeCleanup) {
|
|
sharedWasDeleted=1;
|
|
epicsJobDestroy(job);
|
|
}
|
|
}
|
|
|
|
static
|
|
void testshared(void)
|
|
{
|
|
epicsThreadPool *poolA, *poolB;
|
|
epicsThreadPoolConfig conf;
|
|
epicsJob *job;
|
|
|
|
epicsThreadPoolConfigDefaults(&conf);
|
|
|
|
testDiag("Check reference counting of shared pools");
|
|
|
|
testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);
|
|
|
|
testOk1(poolA->sharedCount==1);
|
|
|
|
testOk1((poolB=epicsThreadPoolGetShared(&conf))!=NULL);
|
|
|
|
testOk1(poolA==poolB);
|
|
|
|
testOk1(poolA->sharedCount==2);
|
|
|
|
epicsThreadPoolReleaseShared(poolA);
|
|
|
|
testOk1(poolB->sharedCount==1);
|
|
|
|
testOk1((job=epicsJobCreate(poolB, &lastjob, EPICSJOB_SELF))!=NULL);
|
|
|
|
epicsThreadPoolReleaseShared(poolB);
|
|
|
|
testOk1(sharedWasDeleted==1);
|
|
|
|
testOk1((poolA=epicsThreadPoolGetShared(&conf))!=NULL);
|
|
|
|
testOk1(poolA->sharedCount==1);
|
|
|
|
epicsThreadPoolReleaseShared(poolA);
|
|
|
|
}
|
|
|
|
MAIN(epicsThreadPoolTest)
|
|
{
|
|
testPlan(171);
|
|
|
|
nullop();
|
|
oneop();
|
|
testDiag("Queue with delayed start");
|
|
postjobs(1,1,1);
|
|
postjobs(0,1,1);
|
|
postjobs(4,4,1);
|
|
postjobs(0,4,1);
|
|
postjobs(2,4,1);
|
|
testDiag("Queue with immediate start");
|
|
postjobs(1,1,0);
|
|
postjobs(0,1,0);
|
|
postjobs(4,4,0);
|
|
postjobs(0,4,0);
|
|
postjobs(2,4,0);
|
|
testcleanup();
|
|
testreadd();
|
|
testcancel();
|
|
testshared();
|
|
|
|
return testDone();
|
|
}
|