diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index bd20304a3..719f5ced6 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -3,8 +3,9 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* in file LICENSE that is included with this distribution. \*************************************************************************/ /* callback.c */ @@ -25,6 +26,7 @@ #include "epicsThread.h" #include "epicsExit.h" #include "epicsInterrupt.h" +#include "epicsString.h" #include "epicsTimer.h" #include "epicsRingPointer.h" #include "errlog.h" @@ -36,6 +38,7 @@ #include "taskwd.h" #include "errMdef.h" #include "dbCommon.h" +#include "epicsExport.h" #define epicsExportSharedSymbols #include "dbAddr.h" #include "dbAccessDefs.h" @@ -48,6 +51,12 @@ static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES]; static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES]; static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES]; +/* Parallel callback threads (configured and actual counts) */ +static int callbackThreadCount[NUM_CALLBACK_PRIORITIES] = { 1, 1, 1 }; +static int callbackThreadsRunning[NUM_CALLBACK_PRIORITIES]; +int callbackParallelThreadsDefault = 2; +epicsExportAddress(int,callbackParallelThreadsDefault); + /* Timer for Delayed Requests */ static epicsTimerQueueId timerQueue; @@ -58,7 +67,7 @@ static epicsEventId startStopEvent; static void *exitCallback; /* Static data */ -static char *threadName[NUM_CALLBACK_PRIORITIES] = { +static char *threadNamePrefix[NUM_CALLBACK_PRIORITIES] = { "cbLow", "cbMedium", "cbHigh" }; static unsigned int threadPriority[NUM_CALLBACK_PRIORITIES] = { @@ -79,6 +88,54 @@ int callbackSetQueueSize(int size) return 0; } +int callbackParallelThreads(int count, const char *prio) +{ + int i; + dbMenu *pdbMenu; + int gotMatch; + + if (startStopEvent) { + errlogPrintf("Callback system already initialized\n"); + return -1; + } + + if (count < 0) + count = epicsThreadGetCPUs() + count; + else if (count == 0) + count = callbackParallelThreadsDefault; + + if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) { + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { + callbackThreadCount[i] = count; + } + } else { + if (!pdbbase) { + errlogPrintf("pdbbase not specified\n"); + return -1; + } + /* Find prio in menuPriority */ + pdbMenu = (dbMenu *)ellFirst(&pdbbase->menuList); + while (pdbMenu) { + gotMatch = (strcmp("menuPriority", pdbMenu->name)==0) ? TRUE : FALSE; + if (gotMatch) { + for (i = 0; i < pdbMenu->nChoice; i++) { + gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE; + if (gotMatch) break; + } + if (gotMatch) { + callbackThreadCount[i] = count; + break; + } else { + errlogPrintf("Unknown priority \"%s\"\n", prio); + return -1; + } + } + pdbMenu = (dbMenu *)ellNext(&pdbMenu->node); + } + } + return 0; +} + static void callbackTask(void *arg) { int priority = *(int *)arg; @@ -110,13 +167,11 @@ void callbackShutdown(void) cbCtl = ctlExit; for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - int lockKey = epicsInterruptLock(); - int ok = epicsRingPointerPush(callbackQ[i], &exitCallback); - epicsInterruptUnlock(lockKey); - epicsEventSignal(callbackSem[i]); - if (ok) epicsEventWait(startStopEvent); - epicsEventDestroy(callbackSem[i]); - epicsRingPointerDelete(callbackQ[i]); + while (callbackThreadsRunning[i]--) { + int ok = epicsRingPointerPush(callbackQ[i], &exitCallback); + epicsEventSignal(callbackSem[i]); + if (ok) epicsEventWait(startStopEvent); + } } epicsTimerQueueRelease(timerQueue); epicsEventDestroy(startStopEvent); @@ -126,6 +181,8 @@ void callbackShutdown(void) void callbackInit(void) { int i; + int j; + char threadName[32]; if(startStopEvent) return; @@ -133,22 +190,32 @@ void callbackInit(void) startStopEvent = epicsEventMustCreate(epicsEventEmpty); cbCtl = ctlRun; timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh); + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { epicsThreadId tid; callbackSem[i] = epicsEventMustCreate(epicsEventEmpty); - callbackQ[i] = epicsRingPointerCreate(callbackQueueSize); + callbackQ[i] = epicsRingPointerLockedCreate(callbackQueueSize); if (callbackQ[i] == 0) - cantProceed("epicsRingPointerCreate failed for %s\n", - threadName[i]); + cantProceed("epicsRingPointerLockedCreate failed for %s\n", + threadNamePrefix[i]); ringOverflow[i] = FALSE; - tid = epicsThreadCreate(threadName[i], threadPriority[i], - epicsThreadGetStackSize(epicsThreadStackBig), - (EPICSTHREADFUNC)callbackTask, &priorityValue[i]); - if (tid == 0) - cantProceed("Failed to spawn callback task %s\n", threadName[i]); - else - epicsEventWait(startStopEvent); + + for (j = 0; j < callbackThreadCount[i]; j++) { + if (callbackThreadCount[i] > 1 ) + sprintf(threadName, "%s-%d", threadNamePrefix[i], j); + else + strcpy(threadName, threadNamePrefix[i]); + tid = epicsThreadCreate(threadName, threadPriority[i], + epicsThreadGetStackSize(epicsThreadStackBig), + (EPICSTHREADFUNC)callbackTask, &priorityValue[i]); + if (tid == 0) { + cantProceed("Failed to spawn callback thread %s\n", threadName); + } else { + epicsEventWait(startStopEvent); + callbackThreadsRunning[i]++; + } + } } } @@ -157,7 +224,6 @@ void callbackRequest(CALLBACK *pcallback) { int priority; int pushOK; - int lockKey; if (!pcallback) { epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n"); @@ -170,14 +236,12 @@ void callbackRequest(CALLBACK *pcallback) } if (ringOverflow[priority]) return; - lockKey = epicsInterruptLock(); pushOK = epicsRingPointerPush(callbackQ[priority], pcallback); - epicsInterruptUnlock(lockKey); if (!pushOK) { char msg[48] = "callbackRequest: "; - strcat(msg, threadName[priority]); + strcat(msg, threadNamePrefix[priority]); strcat(msg, " ring buffer full\n"); epicsInterruptContextMessage(msg); ringOverflow[priority] = TRUE; diff --git a/src/ioc/db/callback.h b/src/ioc/db/callback.h index 6ce0b1adb..ae06406b5 100644 --- a/src/ioc/db/callback.h +++ b/src/ioc/db/callback.h @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -68,6 +69,7 @@ epicsShareFunc void callbackCancelDelayed(CALLBACK *pcallback); epicsShareFunc void callbackRequestProcessCallbackDelayed( CALLBACK *pCallback, int Priority, void *pRec, double seconds); epicsShareFunc int callbackSetQueueSize(int size); +epicsShareFunc int callbackParallelThreads(int count, const char *prio); #ifdef __cplusplus } diff --git a/src/ioc/db/dbIocRegister.c b/src/ioc/db/dbIocRegister.c index 0de2d7172..7ef15ac1f 100644 --- a/src/ioc/db/dbIocRegister.c +++ b/src/ioc/db/dbIocRegister.c @@ -23,6 +23,8 @@ #include "dbIocRegister.h" #include "dbState.h" +epicsShareDef int callbackParallelThreadsDefault; + /* dbLoadDatabase */ static const iocshArg dbLoadDatabaseArg0 = { "file name",iocshArgString}; static const iocshArg dbLoadDatabaseArg1 = { "path",iocshArgString}; @@ -298,6 +300,18 @@ static void callbackSetQueueSizeCallFunc(const iocshArgBuf *args) callbackSetQueueSize(args[0].ival); } +/* callbackParallelThreads */ +static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt}; +static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString}; +static const iocshArg * const callbackParallelThreadsArgs[2] = + {&callbackParallelThreadsArg0,&callbackParallelThreadsArg1}; +static const iocshFuncDef callbackParallelThreadsFuncDef = + {"callbackParallelThreads",2,callbackParallelThreadsArgs}; +static void callbackParallelThreadsCallFunc(const iocshArgBuf *args) +{ + callbackParallelThreads(args[0].ival, args[1].sval); +} + /* dbStateCreate */ static const iocshArg dbStateArgName = { "name", iocshArgString }; static const iocshArg * const dbStateCreateArgs[] = { &dbStateArgName }; @@ -394,6 +408,10 @@ void dbIocRegister(void) iocshRegister(&scanpiolFuncDef,scanpiolCallFunc); iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc); + iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc); + + /* Needed before callback system is initialized */ + callbackParallelThreadsDefault = epicsThreadGetCPUs(); iocshRegister(&dbStateCreateFuncDef, dbStateCreateCallFunc); iocshRegister(&dbStateSetFuncDef, dbStateSetCallFunc); diff --git a/src/ioc/db/test/Makefile b/src/ioc/db/test/Makefile index 72c6ed24f..dd98ffa9e 100644 --- a/src/ioc/db/test/Makefile +++ b/src/ioc/db/test/Makefile @@ -55,6 +55,11 @@ callbackTest_SRCS += callbackTest.c testHarness_SRCS += callbackTest.c TESTS += callbackTest +TESTPROD_HOST += callbackParallelTest +callbackParallelTest_SRCS += callbackParallelTest.c +testHarness_SRCS += callbackParallelTest.c +TESTS += callbackParallelTest + TESTPROD_HOST += dbStateTest dbStateTest_SRCS += dbStateTest.c testHarness_SRCS += dbStateTest.c diff --git a/src/ioc/db/test/callbackParallelTest.c b/src/ioc/db/test/callbackParallelTest.c new file mode 100644 index 000000000..d07b28346 --- /dev/null +++ b/src/ioc/db/test/callbackParallelTest.c @@ -0,0 +1,184 @@ +/*************************************************************************\ +* Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* $Revision-Id$ */ + +/* Author: Marty Kraimer Date: 26JAN2000 */ + +#include +#include +#include +#include +#include +#include + +#include "callback.h" +#include "cantProceed.h" +#include "epicsThread.h" +#include "epicsEvent.h" +#include "epicsTime.h" +#include "epicsUnitTest.h" +#include "testMain.h" + +/* + * This test checks both immediate and delayed callbacks in two steps. + * In the first step (pass1) NCALLBACKS immediate callbacks are queued. + * As each is run it starts a second delayed callback (pass2). + * The last delayed callback which runs signals an epicsEvent + * to the main thread. + * + * Two time intervals are measured. The time to queue and run each of + * the immediate callbacks, and the actual delay of the delayed callback. + */ + +#define NCALLBACKS 169 +#define DELAY_QUANTUM 0.25 + +#define TEST_DELAY(i) ((i / NUM_CALLBACK_PRIORITIES) * DELAY_QUANTUM) + +typedef struct myPvt { + CALLBACK cb1; + CALLBACK cb2; + epicsTimeStamp pass1Time; + epicsTimeStamp pass2Time; + double delay; + int pass; + int resultFail; +} myPvt; + +epicsEventId finished; + +static void myCallback(CALLBACK *pCallback) +{ + myPvt *pmyPvt; + + callbackGetUser(pmyPvt, pCallback); + + pmyPvt->pass++; + + if (pmyPvt->pass == 1) { + epicsTimeGetCurrent(&pmyPvt->pass1Time); + callbackRequestDelayed(&pmyPvt->cb2, pmyPvt->delay); + } else if (pmyPvt->pass == 2) { + epicsTimeGetCurrent(&pmyPvt->pass2Time); + } else { + pmyPvt->resultFail = 1; + return; + } +} + +static void finalCallback(CALLBACK *pCallback) +{ + myCallback(pCallback); + epicsEventSignal(finished); +} + +static void updateStats(double *stats, double val) +{ + if (stats[0] > val) stats[0] = val; + if (stats[1] < val) stats[1] = val; + stats[2] += val; + stats[3] += pow(val, 2.0); + stats[4] += 1.; +} + +static void printStats(double *stats, const char* tag) { + testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f", + tag, stats[0], stats[2]/stats[4], stats[1], + sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]); +} + +MAIN(callbackParallelTest) +{ + myPvt *pcbt[NCALLBACKS]; + epicsTimeStamp start; + int noCpus = epicsThreadGetCPUs(); + int i, j; + /* Statistics: min/max/sum/sum^2/n for each priority */ + double setupError[NUM_CALLBACK_PRIORITIES][5]; + double timeError[NUM_CALLBACK_PRIORITIES][5]; + double defaultError[5] = {1,-1,0,0,0}; + + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) + for (j = 0; j < 5; j++) + setupError[i][j] = timeError[i][j] = defaultError[j]; + + testPlan(NCALLBACKS * 2 + 1); + + testDiag("Starting %d parallel callback threads", noCpus); + + callbackParallelThreads(noCpus, ""); + callbackInit(); + epicsThreadSleep(1.0); + + finished = epicsEventMustCreate(epicsEventEmpty); + + for (i = 0; i < NCALLBACKS ; i++) { + pcbt[i] = callocMustSucceed(1, sizeof(myPvt), "pcbt"); + callbackSetCallback(myCallback, &pcbt[i]->cb1); + callbackSetCallback(myCallback, &pcbt[i]->cb2); + callbackSetUser(pcbt[i], &pcbt[i]->cb1); + callbackSetUser(pcbt[i], &pcbt[i]->cb2); + callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb1); + callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb2); + pcbt[i]->delay = TEST_DELAY(i); + pcbt[i]->pass = 0; + } + + /* Last callback is special */ + callbackSetCallback(finalCallback, &pcbt[NCALLBACKS-1]->cb2); + callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb1); + callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb2); + pcbt[NCALLBACKS-1]->delay = TEST_DELAY(NCALLBACKS) + 1.0; + pcbt[NCALLBACKS-1]->pass = 0; + + testOk1(epicsTimeGetCurrent(&start)==epicsTimeOK); + + for (i = 0; i < NCALLBACKS ; i++) { + callbackRequest(&pcbt[i]->cb1); + } + + testDiag("Waiting %.02f sec", pcbt[NCALLBACKS-1]->delay); + + epicsEventWait(finished); + + for (i = 0; i < NCALLBACKS ; i++) { + if(pcbt[i]->resultFail || pcbt[i]->pass!=2) + testFail("pass = %d for delay = %f", pcbt[i]->pass, pcbt[i]->delay); + else { + double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start); + testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05", + pcbt[i]->delay, delta); + updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta); + } + } + + for (i = 0; i < NCALLBACKS ; i++) { + double delta, error; + if(pcbt[i]->resultFail || pcbt[i]->pass!=2) + continue; + delta = epicsTimeDiffInSeconds(&pcbt[i]->pass2Time, &pcbt[i]->pass1Time); + error = delta - pcbt[i]->delay; + testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05", + pcbt[i]->delay, error); + updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error); + } + + testDiag("Setup time statistics"); + printStats(setupError[0], "LOW"); + printStats(setupError[1], "MID"); + printStats(setupError[2], "HIGH"); + + testDiag("Delay time statistics"); + printStats(timeError[0], "LOW"); + printStats(timeError[1], "MID"); + printStats(timeError[2], "HIGH"); + + return testDone(); +} diff --git a/src/ioc/db/test/callbackTest.c b/src/ioc/db/test/callbackTest.c index 47b5516d1..75e65765d 100644 --- a/src/ioc/db/test/callbackTest.c +++ b/src/ioc/db/test/callbackTest.c @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -79,11 +80,34 @@ static void finalCallback(CALLBACK *pCallback) epicsEventSignal(finished); } +static void updateStats(double *stats, double val) +{ + if (stats[0] > val) stats[0] = val; + if (stats[1] < val) stats[1] = val; + stats[2] += val; + stats[3] += pow(val, 2.0); + stats[4] += 1.; +} + +static void printStats(double *stats, const char* tag) { + testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f", + tag, stats[0], stats[2]/stats[4], stats[1], + sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]); +} + MAIN(callbackTest) { myPvt *pcbt[NCALLBACKS]; epicsTimeStamp start; - int i; + int i, j; + /* Statistics: min/max/sum/sum^2/n for each priority */ + double setupError[NUM_CALLBACK_PRIORITIES][5]; + double timeError[NUM_CALLBACK_PRIORITIES][5]; + double defaultError[5] = {1,-1,0,0,0}; + + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) + for (j = 0; j < 5; j++) + setupError[i][j] = timeError[i][j] = defaultError[j]; testPlan(NCALLBACKS * 2 + 1); @@ -128,8 +152,8 @@ MAIN(callbackTest) double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start); testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05", pcbt[i]->delay, delta); + updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta); } - } for (i = 0; i < NCALLBACKS ; i++) { @@ -140,7 +164,18 @@ MAIN(callbackTest) error = delta - pcbt[i]->delay; testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05", pcbt[i]->delay, error); + updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error); } + testDiag("Setup time statistics"); + printStats(setupError[0], "LOW"); + printStats(setupError[1], "MID"); + printStats(setupError[2], "HIGH"); + + testDiag("Delay time statistics"); + printStats(timeError[0], "LOW"); + printStats(timeError[1], "MID"); + printStats(timeError[2], "HIGH"); + return testDone(); } diff --git a/src/ioc/misc/dbCore.dbd b/src/ioc/misc/dbCore.dbd index 3b22f4663..b383efeb6 100644 --- a/src/ioc/misc/dbCore.dbd +++ b/src/ioc/misc/dbCore.dbd @@ -17,3 +17,5 @@ variable(dbBptNotMonotonic,int) # dbLoadTemplate settings variable(dbTemplateMaxVars,int) +# Default number of parallel callback threads +variable(callbackParallelThreadsDefault,int) diff --git a/src/libCom/ring/epicsRingBytes.c b/src/libCom/ring/epicsRingBytes.c index 2c15ee8f1..cb7e52e83 100644 --- a/src/libCom/ring/epicsRingBytes.c +++ b/src/libCom/ring/epicsRingBytes.c @@ -3,13 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. -* EPICS BASE Versions 3.13.7 -* and higher are distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* Copyright (c) 2012 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. \*************************************************************************/ -/* epicsRingBytes.cd */ -/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Eric Norum + * Ralph Lange + */ #include #include @@ -18,6 +21,7 @@ #include #define epicsExportSharedSymbols +#include "epicsSpin.h" #include "dbDefs.h" #include "epicsRingBytes.h" @@ -30,6 +34,7 @@ #define SLOP 16 typedef struct ringPvt { + epicsSpinId lock; volatile int nextPut; volatile int nextGet; int size; @@ -44,12 +49,23 @@ epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int size) pring->size = size + SLOP; pring->nextGet = 0; pring->nextPut = 0; + pring->lock = 0; + return((void *)pring); +} + +epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int size) +{ + ringPvt *pring = (ringPvt *)epicsRingBytesCreate(size); + if(!pring) + return NULL; + pring->lock = epicsSpinCreate(); return((void *)pring); } epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + if (pring->lock) epicsSpinDestroy(pring->lock); free((void *)pring); } @@ -57,11 +73,14 @@ epicsShareFunc int epicsShareAPI epicsRingBytesGet( epicsRingBytesId id, char *value,int nbytes) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; - int size = pring->size; + int nextGet, nextPut, size; int count; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + size = pring->size; + if (nextGet <= nextPut) { count = nextPut - nextGet; if (count < nbytes) @@ -89,6 +108,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesGet( } } pring->nextGet = nextGet; + + if (pring->lock) epicsSpinUnlock(pring->lock); return nbytes; } @@ -96,23 +117,30 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut( epicsRingBytesId id, char *value,int nbytes) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; - int size = pring->size; + int nextGet, nextPut, size; int freeCount, copyCount, topCount; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + size = pring->size; + if (nextPut < nextGet) { freeCount = nextGet - nextPut - SLOP; - if (nbytes > freeCount) + if (nbytes > freeCount) { + if (pring->lock) epicsSpinUnlock(pring->lock); return 0; + } if (nbytes) memcpy ((void *)&pring->buffer[nextPut], value, nbytes); nextPut += nbytes; } else { freeCount = size - nextPut + nextGet - SLOP; - if (nbytes > freeCount) + if (nbytes > freeCount) { + if (pring->lock) epicsSpinUnlock(pring->lock); return 0; + } topCount = size - nextPut; copyCount = (nbytes > topCount) ? topCount : nbytes; if (copyCount) @@ -126,6 +154,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut( } } pring->nextPut = nextPut; + + if (pring->lock) epicsSpinUnlock(pring->lock); return nbytes; } @@ -133,14 +163,20 @@ epicsShareFunc void epicsShareAPI epicsRingBytesFlush(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + if (pring->lock) epicsSpinLock(pring->lock); pring->nextGet = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); } epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; + int nextGet, nextPut; + + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); if (nextPut < nextGet) return nextGet - nextPut - SLOP; @@ -151,8 +187,18 @@ epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id) epicsShareFunc int epicsShareAPI epicsRingBytesUsedBytes(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + int nextGet, nextPut; + int used; - return pring->size - epicsRingBytesFreeBytes(id) - SLOP; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); + + used = nextPut - nextGet; + if (used < 0) used += pring->size; + + return used; } epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id) @@ -165,8 +211,13 @@ epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id) epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + int isEmpty; - return (pring->nextPut == pring->nextGet); + if (pring->lock) epicsSpinLock(pring->lock); + isEmpty = (pring->nextPut == pring->nextGet); + if (pring->lock) epicsSpinUnlock(pring->lock); + + return isEmpty; } epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id) diff --git a/src/libCom/ring/epicsRingBytes.h b/src/libCom/ring/epicsRingBytes.h index 9e5220513..011829bfe 100644 --- a/src/libCom/ring/epicsRingBytes.h +++ b/src/libCom/ring/epicsRingBytes.h @@ -3,13 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. -* EPICS BASE Versions 3.13.7 -* and higher are distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* Copyright (c) 2012 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingBytes.h */ -/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Eric Norum + * Ralph Lange + */ #ifndef INCepicsRingBytesh #define INCepicsRingBytesh @@ -23,6 +26,8 @@ extern "C" { typedef void *epicsRingBytesId; epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int nbytes); +/* Same, but secured by a spinlock */ +epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int nbytes); epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id); epicsShareFunc int epicsShareAPI epicsRingBytesGet( epicsRingBytesId id, char *value,int nbytes); @@ -42,6 +47,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id); /* NOTES If there is only one writer it is not necessary to lock for put If there is a single reader it is not necessary to lock for puts + + epicsRingBytesLocked uses a spinlock. */ #endif /* INCepicsRingBytesh */ diff --git a/src/libCom/ring/epicsRingPointer.cpp b/src/libCom/ring/epicsRingPointer.cpp index 3a5cc8c72..9c144cec1 100644 --- a/src/libCom/ring/epicsRingPointer.cpp +++ b/src/libCom/ring/epicsRingPointer.cpp @@ -3,11 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2012 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingPointer.cpp*/ -/* Author: Marty Kraimer Date: 13OCT2000 */ + +/* + * Author: Marty Kraimer Date: 13OCT2000 + * Ralph Lange + */ + #include #include @@ -22,7 +27,13 @@ typedef epicsRingPointer voidPointer; epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size) { - voidPointer *pvoidPointer = new voidPointer(size); + voidPointer *pvoidPointer = new voidPointer(size, false); + return(reinterpret_cast(pvoidPointer)); +} + +epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size) +{ + voidPointer *pvoidPointer = new voidPointer(size, true); return(reinterpret_cast(pvoidPointer)); } diff --git a/src/libCom/ring/epicsRingPointer.h b/src/libCom/ring/epicsRingPointer.h index 3332782a4..48d62036d 100644 --- a/src/libCom/ring/epicsRingPointer.h +++ b/src/libCom/ring/epicsRingPointer.h @@ -3,12 +3,15 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2012 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingPointer.h */ -/* Author: Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Ralph Lange + */ #ifndef INCepicsRingPointerh #define INCepicsRingPointerh @@ -16,15 +19,18 @@ /* NOTES * If there is only one writer it is not necessary to lock push * If there is a single reader it is not necessary to lock pop + * + * epicsRingPointerLocked uses a spinlock. */ +#include "epicsSpin.h" #include "shareLib.h" #ifdef __cplusplus template class epicsRingPointer { public: /* Functions */ - epicsRingPointer(int size); + epicsRingPointer(int size, bool locked); ~epicsRingPointer(); bool push(T *p); T* pop(); @@ -42,6 +48,7 @@ private: /* Prevent compiler-generated member functions */ epicsRingPointer& operator=(const epicsRingPointer &); private: /* Data */ + epicsSpinId lock; volatile int nextPush; volatile int nextPop; int size; @@ -54,6 +61,8 @@ extern "C" { typedef void *epicsRingPointerId; epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size); +/* Same, but secured by a spinlock */ +epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size); epicsShareFunc void epicsShareAPI epicsRingPointerDelete(epicsRingPointerId id); /*ringPointerPush returns (0,1) if p (was not, was) put on ring*/ epicsShareFunc int epicsShareAPI epicsRingPointerPush(epicsRingPointerId id,void *p); @@ -85,72 +94,105 @@ epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id); #ifdef __cplusplus template -inline epicsRingPointer::epicsRingPointer(int sz) : - nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) {} +inline epicsRingPointer::epicsRingPointer(int sz, bool locked) : + lock(0), nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) +{ + if (locked) + lock = epicsSpinCreate(); +} template inline epicsRingPointer::~epicsRingPointer() -{ delete [] buffer;} +{ + if (lock) epicsSpinDestroy(lock); + delete [] buffer; +} template inline bool epicsRingPointer::push(T *p) { + if (lock) epicsSpinLock(lock); int next = nextPush; int newNext = next + 1; if(newNext>=size) newNext=0; - if(newNext==nextPop) return(false); + if (newNext == nextPop) { + if (lock) epicsSpinUnlock(lock); + return(false); + } buffer[next] = p; nextPush = newNext; + if (lock) epicsSpinUnlock(lock); return(true); } template inline T* epicsRingPointer::pop() { + if (lock) epicsSpinLock(lock); int next = nextPop; - if(next == nextPush) return(0); + if (next == nextPush) { + if (lock) epicsSpinUnlock(lock); + return(0); + } T*p = buffer[next]; ++next; if(next >=size) next = 0; nextPop = next; + if (lock) epicsSpinUnlock(lock); return(p); } template inline void epicsRingPointer::flush() { + if (lock) epicsSpinLock(lock); nextPop = 0; nextPush = 0; + if (lock) epicsSpinUnlock(lock); } template inline int epicsRingPointer::getFree() const { + if (lock) epicsSpinLock(lock); int n = nextPop - nextPush - 1; if (n < 0) n += size; + if (lock) epicsSpinUnlock(lock); return n; } template inline int epicsRingPointer::getUsed() const { + if (lock) epicsSpinLock(lock); int n = nextPush - nextPop; if (n < 0) n += size; + if (lock) epicsSpinUnlock(lock); return n; } template inline int epicsRingPointer::getSize() const -{ return(size-1);} +{ + return(size-1); +} template inline bool epicsRingPointer::isEmpty() const -{ return(nextPush==nextPop);} +{ + bool isEmpty; + if (lock) epicsSpinLock(lock); + isEmpty = (nextPush == nextPop); + if (lock) epicsSpinUnlock(lock); + return isEmpty; +} template inline bool epicsRingPointer::isFull() const { + if (lock) epicsSpinLock(lock); int count = nextPush - nextPop +1; + if (lock) epicsSpinUnlock(lock); return((count == 0) || (count == size)); } diff --git a/src/libCom/test/ringPointerTest.c b/src/libCom/test/ringPointerTest.c index fd3171045..e50aeb639 100644 --- a/src/libCom/test/ringPointerTest.c +++ b/src/libCom/test/ringPointerTest.c @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -26,12 +27,17 @@ #include "testMain.h" #define ringSize 10 +#define consumerCount 4 +#define producerCount 4 static volatile int testExit = 0; +int value[ringSize*2]; typedef struct info { epicsEventId consumerEvent; epicsRingPointerId ring; + int checkOrder; + int value[ringSize*2]; }info; static void consumer(void *arg) @@ -39,16 +45,46 @@ static void consumer(void *arg) info *pinfo = (info *)arg; static int expectedValue=0; int *newvalue; + char myname[20]; - testDiag("Consumer starting"); + epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname)); + testDiag("%s starting", myname); while(1) { epicsEventMustWait(pinfo->consumerEvent); if (testExit) return; - while((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) { - testOk(expectedValue == *newvalue, - "Consumer: %d == %d", expectedValue, *newvalue); - expectedValue = *newvalue + 1; - } + while ((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) { + if (pinfo->checkOrder) { + testOk(expectedValue == *newvalue, + "%s: (got) %d == %d (expected)", myname, *newvalue, expectedValue); + expectedValue = *newvalue + 1; + } else { + testOk(pinfo->value[*newvalue] <= producerCount, "%s: got a %d (%d times seen before)", + myname, *newvalue, pinfo->value[*newvalue]); + } + /* This must be atomic... */ + pinfo->value[*newvalue]++; + epicsThreadSleep(0.05); + } + } +} + +static void producer(void *arg) +{ + info *pinfo = (info *)arg; + char myname[20]; + int i; + + epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname)); + testDiag("%s starting", myname); + for (i=0; iring)) { + epicsThreadSleep(0.2); + if (testExit) return; + } + testOk(epicsRingPointerPush(pinfo->ring, (void *)&value[i]), + "%s: Pushing %d, ring not full", myname, i); + epicsEventSignal(pinfo->consumerEvent); + if (testExit) return; } } @@ -57,21 +93,27 @@ MAIN(ringPointerTest) int i; info *pinfo; epicsEventId consumerEvent; - int value[ringSize*2]; int *pgetValue; epicsRingPointerId ring; epicsThreadId tid; + char threadName[20]; - testPlan(54); + testPlan(256); for (i=0; iconsumerEvent = consumerEvent = epicsEventMustCreate(epicsEventEmpty); if (!consumerEvent) { testAbort("epicsEventMustCreate failed"); } + testDiag("******************************************************"); + testDiag("** Test 1: local ring pointer, check size and order **"); + testDiag("******************************************************"); + pinfo->ring = ring = epicsRingPointerCreate(ringSize); if (!ring) { testAbort("epicsRingPointerCreate failed"); @@ -86,22 +128,74 @@ MAIN(ringPointerTest) } testOk(epicsRingPointerIsEmpty(ring), "Ring empty"); + testDiag("**************************************************************"); + testDiag("** Test 2: unlocked ring pointer, one consumer, check order **"); + testDiag("**************************************************************"); + + pinfo->checkOrder = 1; tid=epicsThreadCreate("consumer", 50, epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo); if(!tid) testAbort("epicsThreadCreate failed"); - epicsThreadSleep(0.1); + epicsThreadSleep(0.2); for (i=0; ivalue[i] == 1, "Value test: %d was processed", i); + } + + testExit = 1; + epicsEventSignal(consumerEvent); + epicsThreadSleep(1.0); + + epicsRingPointerDelete(pinfo->ring); + + testDiag("*************************************************************************************"); + testDiag("** Test 3: locked ring pointer, many consumers, many producers, check no of copies **"); + testDiag("*************************************************************************************"); + + pinfo->ring = ring = epicsRingPointerLockedCreate(ringSize); + if (!ring) { + testAbort("epicsRingPointerLockedCreate failed"); + } + testOk(epicsRingPointerIsEmpty(ring), "Ring empty"); + + for (i=0; ivalue[i] = 0; + testExit = 0; + pinfo->checkOrder = 0; + for (i=0; ivalue[i] == producerCount, "Value test: %d was processed %d times", i, producerCount); + } + testExit = 1; epicsEventSignal(consumerEvent); epicsThreadSleep(1.0);