Merge (cherry-pick) original parts from parallel-cbthreads branch

This commit is contained in:
Ralph Lange
2014-08-25 13:40:18 -07:00
12 changed files with 588 additions and 73 deletions
+87 -23
View File
@@ -3,8 +3,9 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/* callback.c */
@@ -25,6 +26,7 @@
#include "epicsThread.h"
#include "epicsExit.h"
#include "epicsInterrupt.h"
#include "epicsString.h"
#include "epicsTimer.h"
#include "epicsRingPointer.h"
#include "errlog.h"
@@ -36,6 +38,7 @@
#include "taskwd.h"
#include "errMdef.h"
#include "dbCommon.h"
#include "epicsExport.h"
#define epicsExportSharedSymbols
#include "dbAddr.h"
#include "dbAccessDefs.h"
@@ -48,6 +51,12 @@ static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES];
static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES];
static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES];
/* Parallel callback threads (configured and actual counts) */
static int callbackThreadCount[NUM_CALLBACK_PRIORITIES] = { 1, 1, 1 };
static int callbackThreadsRunning[NUM_CALLBACK_PRIORITIES];
int callbackParallelThreadsDefault = 2;
epicsExportAddress(int,callbackParallelThreadsDefault);
/* Timer for Delayed Requests */
static epicsTimerQueueId timerQueue;
@@ -58,7 +67,7 @@ static epicsEventId startStopEvent;
static void *exitCallback;
/* Static data */
static char *threadName[NUM_CALLBACK_PRIORITIES] = {
static char *threadNamePrefix[NUM_CALLBACK_PRIORITIES] = {
"cbLow", "cbMedium", "cbHigh"
};
static unsigned int threadPriority[NUM_CALLBACK_PRIORITIES] = {
@@ -79,6 +88,54 @@ int callbackSetQueueSize(int size)
return 0;
}
int callbackParallelThreads(int count, const char *prio)
{
int i;
dbMenu *pdbMenu;
int gotMatch;
if (startStopEvent) {
errlogPrintf("Callback system already initialized\n");
return -1;
}
if (count < 0)
count = epicsThreadGetCPUs() + count;
else if (count == 0)
count = callbackParallelThreadsDefault;
if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) {
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
callbackThreadCount[i] = count;
}
} else {
if (!pdbbase) {
errlogPrintf("pdbbase not specified\n");
return -1;
}
/* Find prio in menuPriority */
pdbMenu = (dbMenu *)ellFirst(&pdbbase->menuList);
while (pdbMenu) {
gotMatch = (strcmp("menuPriority", pdbMenu->name)==0) ? TRUE : FALSE;
if (gotMatch) {
for (i = 0; i < pdbMenu->nChoice; i++) {
gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE;
if (gotMatch) break;
}
if (gotMatch) {
callbackThreadCount[i] = count;
break;
} else {
errlogPrintf("Unknown priority \"%s\"\n", prio);
return -1;
}
}
pdbMenu = (dbMenu *)ellNext(&pdbMenu->node);
}
}
return 0;
}
static void callbackTask(void *arg)
{
int priority = *(int *)arg;
@@ -110,13 +167,11 @@ void callbackShutdown(void)
cbCtl = ctlExit;
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
int lockKey = epicsInterruptLock();
int ok = epicsRingPointerPush(callbackQ[i], &exitCallback);
epicsInterruptUnlock(lockKey);
epicsEventSignal(callbackSem[i]);
if (ok) epicsEventWait(startStopEvent);
epicsEventDestroy(callbackSem[i]);
epicsRingPointerDelete(callbackQ[i]);
while (callbackThreadsRunning[i]--) {
int ok = epicsRingPointerPush(callbackQ[i], &exitCallback);
epicsEventSignal(callbackSem[i]);
if (ok) epicsEventWait(startStopEvent);
}
}
epicsTimerQueueRelease(timerQueue);
epicsEventDestroy(startStopEvent);
@@ -126,6 +181,8 @@ void callbackShutdown(void)
void callbackInit(void)
{
int i;
int j;
char threadName[32];
if(startStopEvent)
return;
@@ -133,22 +190,32 @@ void callbackInit(void)
startStopEvent = epicsEventMustCreate(epicsEventEmpty);
cbCtl = ctlRun;
timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh);
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
epicsThreadId tid;
callbackSem[i] = epicsEventMustCreate(epicsEventEmpty);
callbackQ[i] = epicsRingPointerCreate(callbackQueueSize);
callbackQ[i] = epicsRingPointerLockedCreate(callbackQueueSize);
if (callbackQ[i] == 0)
cantProceed("epicsRingPointerCreate failed for %s\n",
threadName[i]);
cantProceed("epicsRingPointerLockedCreate failed for %s\n",
threadNamePrefix[i]);
ringOverflow[i] = FALSE;
tid = epicsThreadCreate(threadName[i], threadPriority[i],
epicsThreadGetStackSize(epicsThreadStackBig),
(EPICSTHREADFUNC)callbackTask, &priorityValue[i]);
if (tid == 0)
cantProceed("Failed to spawn callback task %s\n", threadName[i]);
else
epicsEventWait(startStopEvent);
for (j = 0; j < callbackThreadCount[i]; j++) {
if (callbackThreadCount[i] > 1 )
sprintf(threadName, "%s-%d", threadNamePrefix[i], j);
else
strcpy(threadName, threadNamePrefix[i]);
tid = epicsThreadCreate(threadName, threadPriority[i],
epicsThreadGetStackSize(epicsThreadStackBig),
(EPICSTHREADFUNC)callbackTask, &priorityValue[i]);
if (tid == 0) {
cantProceed("Failed to spawn callback thread %s\n", threadName);
} else {
epicsEventWait(startStopEvent);
callbackThreadsRunning[i]++;
}
}
}
}
@@ -157,7 +224,6 @@ void callbackRequest(CALLBACK *pcallback)
{
int priority;
int pushOK;
int lockKey;
if (!pcallback) {
epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n");
@@ -170,14 +236,12 @@ void callbackRequest(CALLBACK *pcallback)
}
if (ringOverflow[priority]) return;
lockKey = epicsInterruptLock();
pushOK = epicsRingPointerPush(callbackQ[priority], pcallback);
epicsInterruptUnlock(lockKey);
if (!pushOK) {
char msg[48] = "callbackRequest: ";
strcat(msg, threadName[priority]);
strcat(msg, threadNamePrefix[priority]);
strcat(msg, " ring buffer full\n");
epicsInterruptContextMessage(msg);
ringOverflow[priority] = TRUE;
+2
View File
@@ -3,6 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -68,6 +69,7 @@ epicsShareFunc void callbackCancelDelayed(CALLBACK *pcallback);
epicsShareFunc void callbackRequestProcessCallbackDelayed(
CALLBACK *pCallback, int Priority, void *pRec, double seconds);
epicsShareFunc int callbackSetQueueSize(int size);
epicsShareFunc int callbackParallelThreads(int count, const char *prio);
#ifdef __cplusplus
}
+18
View File
@@ -23,6 +23,8 @@
#include "dbIocRegister.h"
#include "dbState.h"
epicsShareDef int callbackParallelThreadsDefault;
/* dbLoadDatabase */
static const iocshArg dbLoadDatabaseArg0 = { "file name",iocshArgString};
static const iocshArg dbLoadDatabaseArg1 = { "path",iocshArgString};
@@ -298,6 +300,18 @@ static void callbackSetQueueSizeCallFunc(const iocshArgBuf *args)
callbackSetQueueSize(args[0].ival);
}
/* callbackParallelThreads */
static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt};
static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString};
static const iocshArg * const callbackParallelThreadsArgs[2] =
{&callbackParallelThreadsArg0,&callbackParallelThreadsArg1};
static const iocshFuncDef callbackParallelThreadsFuncDef =
{"callbackParallelThreads",2,callbackParallelThreadsArgs};
static void callbackParallelThreadsCallFunc(const iocshArgBuf *args)
{
callbackParallelThreads(args[0].ival, args[1].sval);
}
/* dbStateCreate */
static const iocshArg dbStateArgName = { "name", iocshArgString };
static const iocshArg * const dbStateCreateArgs[] = { &dbStateArgName };
@@ -394,6 +408,10 @@ void dbIocRegister(void)
iocshRegister(&scanpiolFuncDef,scanpiolCallFunc);
iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc);
iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc);
/* Needed before callback system is initialized */
callbackParallelThreadsDefault = epicsThreadGetCPUs();
iocshRegister(&dbStateCreateFuncDef, dbStateCreateCallFunc);
iocshRegister(&dbStateSetFuncDef, dbStateSetCallFunc);
+5
View File
@@ -55,6 +55,11 @@ callbackTest_SRCS += callbackTest.c
testHarness_SRCS += callbackTest.c
TESTS += callbackTest
TESTPROD_HOST += callbackParallelTest
callbackParallelTest_SRCS += callbackParallelTest.c
testHarness_SRCS += callbackParallelTest.c
TESTS += callbackParallelTest
TESTPROD_HOST += dbStateTest
dbStateTest_SRCS += dbStateTest.c
testHarness_SRCS += dbStateTest.c
+184
View File
@@ -0,0 +1,184 @@
/*************************************************************************\
* Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/* $Revision-Id$ */
/* Author: Marty Kraimer Date: 26JAN2000 */
#include <stddef.h>
#include <stdlib.h>
#include <stddef.h>
#include <string.h>
#include <stdio.h>
#include <math.h>
#include "callback.h"
#include "cantProceed.h"
#include "epicsThread.h"
#include "epicsEvent.h"
#include "epicsTime.h"
#include "epicsUnitTest.h"
#include "testMain.h"
/*
* This test checks both immediate and delayed callbacks in two steps.
* In the first step (pass1) NCALLBACKS immediate callbacks are queued.
* As each is run it starts a second delayed callback (pass2).
* The last delayed callback which runs signals an epicsEvent
* to the main thread.
*
* Two time intervals are measured. The time to queue and run each of
* the immediate callbacks, and the actual delay of the delayed callback.
*/
#define NCALLBACKS 169
#define DELAY_QUANTUM 0.25
#define TEST_DELAY(i) ((i / NUM_CALLBACK_PRIORITIES) * DELAY_QUANTUM)
typedef struct myPvt {
CALLBACK cb1;
CALLBACK cb2;
epicsTimeStamp pass1Time;
epicsTimeStamp pass2Time;
double delay;
int pass;
int resultFail;
} myPvt;
epicsEventId finished;
static void myCallback(CALLBACK *pCallback)
{
myPvt *pmyPvt;
callbackGetUser(pmyPvt, pCallback);
pmyPvt->pass++;
if (pmyPvt->pass == 1) {
epicsTimeGetCurrent(&pmyPvt->pass1Time);
callbackRequestDelayed(&pmyPvt->cb2, pmyPvt->delay);
} else if (pmyPvt->pass == 2) {
epicsTimeGetCurrent(&pmyPvt->pass2Time);
} else {
pmyPvt->resultFail = 1;
return;
}
}
static void finalCallback(CALLBACK *pCallback)
{
myCallback(pCallback);
epicsEventSignal(finished);
}
static void updateStats(double *stats, double val)
{
if (stats[0] > val) stats[0] = val;
if (stats[1] < val) stats[1] = val;
stats[2] += val;
stats[3] += pow(val, 2.0);
stats[4] += 1.;
}
static void printStats(double *stats, const char* tag) {
testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f",
tag, stats[0], stats[2]/stats[4], stats[1],
sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]);
}
MAIN(callbackParallelTest)
{
myPvt *pcbt[NCALLBACKS];
epicsTimeStamp start;
int noCpus = epicsThreadGetCPUs();
int i, j;
/* Statistics: min/max/sum/sum^2/n for each priority */
double setupError[NUM_CALLBACK_PRIORITIES][5];
double timeError[NUM_CALLBACK_PRIORITIES][5];
double defaultError[5] = {1,-1,0,0,0};
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++)
for (j = 0; j < 5; j++)
setupError[i][j] = timeError[i][j] = defaultError[j];
testPlan(NCALLBACKS * 2 + 1);
testDiag("Starting %d parallel callback threads", noCpus);
callbackParallelThreads(noCpus, "");
callbackInit();
epicsThreadSleep(1.0);
finished = epicsEventMustCreate(epicsEventEmpty);
for (i = 0; i < NCALLBACKS ; i++) {
pcbt[i] = callocMustSucceed(1, sizeof(myPvt), "pcbt");
callbackSetCallback(myCallback, &pcbt[i]->cb1);
callbackSetCallback(myCallback, &pcbt[i]->cb2);
callbackSetUser(pcbt[i], &pcbt[i]->cb1);
callbackSetUser(pcbt[i], &pcbt[i]->cb2);
callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb1);
callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb2);
pcbt[i]->delay = TEST_DELAY(i);
pcbt[i]->pass = 0;
}
/* Last callback is special */
callbackSetCallback(finalCallback, &pcbt[NCALLBACKS-1]->cb2);
callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb1);
callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb2);
pcbt[NCALLBACKS-1]->delay = TEST_DELAY(NCALLBACKS) + 1.0;
pcbt[NCALLBACKS-1]->pass = 0;
testOk1(epicsTimeGetCurrent(&start)==epicsTimeOK);
for (i = 0; i < NCALLBACKS ; i++) {
callbackRequest(&pcbt[i]->cb1);
}
testDiag("Waiting %.02f sec", pcbt[NCALLBACKS-1]->delay);
epicsEventWait(finished);
for (i = 0; i < NCALLBACKS ; i++) {
if(pcbt[i]->resultFail || pcbt[i]->pass!=2)
testFail("pass = %d for delay = %f", pcbt[i]->pass, pcbt[i]->delay);
else {
double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start);
testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05",
pcbt[i]->delay, delta);
updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta);
}
}
for (i = 0; i < NCALLBACKS ; i++) {
double delta, error;
if(pcbt[i]->resultFail || pcbt[i]->pass!=2)
continue;
delta = epicsTimeDiffInSeconds(&pcbt[i]->pass2Time, &pcbt[i]->pass1Time);
error = delta - pcbt[i]->delay;
testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05",
pcbt[i]->delay, error);
updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error);
}
testDiag("Setup time statistics");
printStats(setupError[0], "LOW");
printStats(setupError[1], "MID");
printStats(setupError[2], "HIGH");
testDiag("Delay time statistics");
printStats(timeError[0], "LOW");
printStats(timeError[1], "MID");
printStats(timeError[2], "HIGH");
return testDone();
}
+37 -2
View File
@@ -3,6 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -79,11 +80,34 @@ static void finalCallback(CALLBACK *pCallback)
epicsEventSignal(finished);
}
static void updateStats(double *stats, double val)
{
if (stats[0] > val) stats[0] = val;
if (stats[1] < val) stats[1] = val;
stats[2] += val;
stats[3] += pow(val, 2.0);
stats[4] += 1.;
}
static void printStats(double *stats, const char* tag) {
testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f",
tag, stats[0], stats[2]/stats[4], stats[1],
sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]);
}
MAIN(callbackTest)
{
myPvt *pcbt[NCALLBACKS];
epicsTimeStamp start;
int i;
int i, j;
/* Statistics: min/max/sum/sum^2/n for each priority */
double setupError[NUM_CALLBACK_PRIORITIES][5];
double timeError[NUM_CALLBACK_PRIORITIES][5];
double defaultError[5] = {1,-1,0,0,0};
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++)
for (j = 0; j < 5; j++)
setupError[i][j] = timeError[i][j] = defaultError[j];
testPlan(NCALLBACKS * 2 + 1);
@@ -128,8 +152,8 @@ MAIN(callbackTest)
double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start);
testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05",
pcbt[i]->delay, delta);
updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta);
}
}
for (i = 0; i < NCALLBACKS ; i++) {
@@ -140,7 +164,18 @@ MAIN(callbackTest)
error = delta - pcbt[i]->delay;
testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05",
pcbt[i]->delay, error);
updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error);
}
testDiag("Setup time statistics");
printStats(setupError[0], "LOW");
printStats(setupError[1], "MID");
printStats(setupError[2], "HIGH");
testDiag("Delay time statistics");
printStats(timeError[0], "LOW");
printStats(timeError[1], "MID");
printStats(timeError[2], "HIGH");
return testDone();
}
+2
View File
@@ -17,3 +17,5 @@ variable(dbBptNotMonotonic,int)
# dbLoadTemplate settings
variable(dbTemplateMaxVars,int)
# Default number of parallel callback threads
variable(callbackParallelThreadsDefault,int)
+68 -17
View File
@@ -3,13 +3,16 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
* Copyright (c) 2012 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/* epicsRingBytes.cd */
/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */
/*
* Author: Marty Kraimer Date: 15JUL99
* Eric Norum
* Ralph Lange <Ralph.Lange@gmx.de>
*/
#include <stddef.h>
#include <string.h>
@@ -18,6 +21,7 @@
#include <stdio.h>
#define epicsExportSharedSymbols
#include "epicsSpin.h"
#include "dbDefs.h"
#include "epicsRingBytes.h"
@@ -30,6 +34,7 @@
#define SLOP 16
typedef struct ringPvt {
epicsSpinId lock;
volatile int nextPut;
volatile int nextGet;
int size;
@@ -44,12 +49,23 @@ epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int size)
pring->size = size + SLOP;
pring->nextGet = 0;
pring->nextPut = 0;
pring->lock = 0;
return((void *)pring);
}
epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int size)
{
ringPvt *pring = (ringPvt *)epicsRingBytesCreate(size);
if(!pring)
return NULL;
pring->lock = epicsSpinCreate();
return((void *)pring);
}
epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
if (pring->lock) epicsSpinDestroy(pring->lock);
free((void *)pring);
}
@@ -57,11 +73,14 @@ epicsShareFunc int epicsShareAPI epicsRingBytesGet(
epicsRingBytesId id, char *value,int nbytes)
{
ringPvt *pring = (ringPvt *)id;
int nextGet = pring->nextGet;
int nextPut = pring->nextPut;
int size = pring->size;
int nextGet, nextPut, size;
int count;
if (pring->lock) epicsSpinLock(pring->lock);
nextGet = pring->nextGet;
nextPut = pring->nextPut;
size = pring->size;
if (nextGet <= nextPut) {
count = nextPut - nextGet;
if (count < nbytes)
@@ -89,6 +108,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesGet(
}
}
pring->nextGet = nextGet;
if (pring->lock) epicsSpinUnlock(pring->lock);
return nbytes;
}
@@ -96,23 +117,30 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
epicsRingBytesId id, char *value,int nbytes)
{
ringPvt *pring = (ringPvt *)id;
int nextGet = pring->nextGet;
int nextPut = pring->nextPut;
int size = pring->size;
int nextGet, nextPut, size;
int freeCount, copyCount, topCount;
if (pring->lock) epicsSpinLock(pring->lock);
nextGet = pring->nextGet;
nextPut = pring->nextPut;
size = pring->size;
if (nextPut < nextGet) {
freeCount = nextGet - nextPut - SLOP;
if (nbytes > freeCount)
if (nbytes > freeCount) {
if (pring->lock) epicsSpinUnlock(pring->lock);
return 0;
}
if (nbytes)
memcpy ((void *)&pring->buffer[nextPut], value, nbytes);
nextPut += nbytes;
}
else {
freeCount = size - nextPut + nextGet - SLOP;
if (nbytes > freeCount)
if (nbytes > freeCount) {
if (pring->lock) epicsSpinUnlock(pring->lock);
return 0;
}
topCount = size - nextPut;
copyCount = (nbytes > topCount) ? topCount : nbytes;
if (copyCount)
@@ -126,6 +154,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
}
}
pring->nextPut = nextPut;
if (pring->lock) epicsSpinUnlock(pring->lock);
return nbytes;
}
@@ -133,14 +163,20 @@ epicsShareFunc void epicsShareAPI epicsRingBytesFlush(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
if (pring->lock) epicsSpinLock(pring->lock);
pring->nextGet = pring->nextPut;
if (pring->lock) epicsSpinUnlock(pring->lock);
}
epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
int nextGet = pring->nextGet;
int nextPut = pring->nextPut;
int nextGet, nextPut;
if (pring->lock) epicsSpinLock(pring->lock);
nextGet = pring->nextGet;
nextPut = pring->nextPut;
if (pring->lock) epicsSpinUnlock(pring->lock);
if (nextPut < nextGet)
return nextGet - nextPut - SLOP;
@@ -151,8 +187,18 @@ epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id)
epicsShareFunc int epicsShareAPI epicsRingBytesUsedBytes(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
int nextGet, nextPut;
int used;
return pring->size - epicsRingBytesFreeBytes(id) - SLOP;
if (pring->lock) epicsSpinLock(pring->lock);
nextGet = pring->nextGet;
nextPut = pring->nextPut;
if (pring->lock) epicsSpinUnlock(pring->lock);
used = nextPut - nextGet;
if (used < 0) used += pring->size;
return used;
}
epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id)
@@ -165,8 +211,13 @@ epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id)
epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
int isEmpty;
return (pring->nextPut == pring->nextGet);
if (pring->lock) epicsSpinLock(pring->lock);
isEmpty = (pring->nextPut == pring->nextGet);
if (pring->lock) epicsSpinUnlock(pring->lock);
return isEmpty;
}
epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id)
+12 -5
View File
@@ -3,13 +3,16 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
* Copyright (c) 2012 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*epicsRingBytes.h */
/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */
/*
* Author: Marty Kraimer Date: 15JUL99
* Eric Norum
* Ralph Lange <Ralph.Lange@gmx.de>
*/
#ifndef INCepicsRingBytesh
#define INCepicsRingBytesh
@@ -23,6 +26,8 @@ extern "C" {
typedef void *epicsRingBytesId;
epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int nbytes);
/* Same, but secured by a spinlock */
epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int nbytes);
epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id);
epicsShareFunc int epicsShareAPI epicsRingBytesGet(
epicsRingBytesId id, char *value,int nbytes);
@@ -42,6 +47,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id);
/* NOTES
If there is only one writer it is not necessary to lock for put
If there is a single reader it is not necessary to lock for puts
epicsRingBytesLocked uses a spinlock.
*/
#endif /* INCepicsRingBytesh */
+14 -3
View File
@@ -3,11 +3,16 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* Copyright (c) 2012 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*epicsRingPointer.cpp*/
/* Author: Marty Kraimer Date: 13OCT2000 */
/*
* Author: Marty Kraimer Date: 13OCT2000
* Ralph Lange <Ralph.Lange@gmx.de>
*/
#include <stddef.h>
#include <string.h>
@@ -22,7 +27,13 @@ typedef epicsRingPointer<void> voidPointer;
epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size)
{
voidPointer *pvoidPointer = new voidPointer(size);
voidPointer *pvoidPointer = new voidPointer(size, false);
return(reinterpret_cast<void *>(pvoidPointer));
}
epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size)
{
voidPointer *pvoidPointer = new voidPointer(size, true);
return(reinterpret_cast<void *>(pvoidPointer));
}
+52 -10
View File
@@ -3,12 +3,15 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* Copyright (c) 2012 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*epicsRingPointer.h */
/* Author: Marty Kraimer Date: 15JUL99 */
/*
* Author: Marty Kraimer Date: 15JUL99
* Ralph Lange <Ralph.Lange@gmx.de>
*/
#ifndef INCepicsRingPointerh
#define INCepicsRingPointerh
@@ -16,15 +19,18 @@
/* NOTES
* If there is only one writer it is not necessary to lock push
* If there is a single reader it is not necessary to lock pop
*
* epicsRingPointerLocked uses a spinlock.
*/
#include "epicsSpin.h"
#include "shareLib.h"
#ifdef __cplusplus
template <class T>
class epicsRingPointer {
public: /* Functions */
epicsRingPointer(int size);
epicsRingPointer(int size, bool locked);
~epicsRingPointer();
bool push(T *p);
T* pop();
@@ -42,6 +48,7 @@ private: /* Prevent compiler-generated member functions */
epicsRingPointer& operator=(const epicsRingPointer &);
private: /* Data */
epicsSpinId lock;
volatile int nextPush;
volatile int nextPop;
int size;
@@ -54,6 +61,8 @@ extern "C" {
typedef void *epicsRingPointerId;
epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size);
/* Same, but secured by a spinlock */
epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size);
epicsShareFunc void epicsShareAPI epicsRingPointerDelete(epicsRingPointerId id);
/*ringPointerPush returns (0,1) if p (was not, was) put on ring*/
epicsShareFunc int epicsShareAPI epicsRingPointerPush(epicsRingPointerId id,void *p);
@@ -85,72 +94,105 @@ epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id);
#ifdef __cplusplus
template <class T>
inline epicsRingPointer<T>::epicsRingPointer(int sz) :
nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) {}
inline epicsRingPointer<T>::epicsRingPointer(int sz, bool locked) :
lock(0), nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1])
{
if (locked)
lock = epicsSpinCreate();
}
template <class T>
inline epicsRingPointer<T>::~epicsRingPointer()
{ delete [] buffer;}
{
if (lock) epicsSpinDestroy(lock);
delete [] buffer;
}
template <class T>
inline bool epicsRingPointer<T>::push(T *p)
{
if (lock) epicsSpinLock(lock);
int next = nextPush;
int newNext = next + 1;
if(newNext>=size) newNext=0;
if(newNext==nextPop) return(false);
if (newNext == nextPop) {
if (lock) epicsSpinUnlock(lock);
return(false);
}
buffer[next] = p;
nextPush = newNext;
if (lock) epicsSpinUnlock(lock);
return(true);
}
template <class T>
inline T* epicsRingPointer<T>::pop()
{
if (lock) epicsSpinLock(lock);
int next = nextPop;
if(next == nextPush) return(0);
if (next == nextPush) {
if (lock) epicsSpinUnlock(lock);
return(0);
}
T*p = buffer[next];
++next;
if(next >=size) next = 0;
nextPop = next;
if (lock) epicsSpinUnlock(lock);
return(p);
}
template <class T>
inline void epicsRingPointer<T>::flush()
{
if (lock) epicsSpinLock(lock);
nextPop = 0;
nextPush = 0;
if (lock) epicsSpinUnlock(lock);
}
template <class T>
inline int epicsRingPointer<T>::getFree() const
{
if (lock) epicsSpinLock(lock);
int n = nextPop - nextPush - 1;
if (n < 0) n += size;
if (lock) epicsSpinUnlock(lock);
return n;
}
template <class T>
inline int epicsRingPointer<T>::getUsed() const
{
if (lock) epicsSpinLock(lock);
int n = nextPush - nextPop;
if (n < 0) n += size;
if (lock) epicsSpinUnlock(lock);
return n;
}
template <class T>
inline int epicsRingPointer<T>::getSize() const
{ return(size-1);}
{
return(size-1);
}
template <class T>
inline bool epicsRingPointer<T>::isEmpty() const
{ return(nextPush==nextPop);}
{
bool isEmpty;
if (lock) epicsSpinLock(lock);
isEmpty = (nextPush == nextPop);
if (lock) epicsSpinUnlock(lock);
return isEmpty;
}
template <class T>
inline bool epicsRingPointer<T>::isFull() const
{
if (lock) epicsSpinLock(lock);
int count = nextPush - nextPop +1;
if (lock) epicsSpinUnlock(lock);
return((count == 0) || (count == size));
}
+107 -13
View File
@@ -3,6 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -26,12 +27,17 @@
#include "testMain.h"
#define ringSize 10
#define consumerCount 4
#define producerCount 4
static volatile int testExit = 0;
int value[ringSize*2];
typedef struct info {
epicsEventId consumerEvent;
epicsRingPointerId ring;
int checkOrder;
int value[ringSize*2];
}info;
static void consumer(void *arg)
@@ -39,16 +45,46 @@ static void consumer(void *arg)
info *pinfo = (info *)arg;
static int expectedValue=0;
int *newvalue;
char myname[20];
testDiag("Consumer starting");
epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname));
testDiag("%s starting", myname);
while(1) {
epicsEventMustWait(pinfo->consumerEvent);
if (testExit) return;
while((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) {
testOk(expectedValue == *newvalue,
"Consumer: %d == %d", expectedValue, *newvalue);
expectedValue = *newvalue + 1;
}
while ((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) {
if (pinfo->checkOrder) {
testOk(expectedValue == *newvalue,
"%s: (got) %d == %d (expected)", myname, *newvalue, expectedValue);
expectedValue = *newvalue + 1;
} else {
testOk(pinfo->value[*newvalue] <= producerCount, "%s: got a %d (%d times seen before)",
myname, *newvalue, pinfo->value[*newvalue]);
}
/* This must be atomic... */
pinfo->value[*newvalue]++;
epicsThreadSleep(0.05);
}
}
}
static void producer(void *arg)
{
info *pinfo = (info *)arg;
char myname[20];
int i;
epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname));
testDiag("%s starting", myname);
for (i=0; i<ringSize*2; i++) {
while (epicsRingPointerIsFull(pinfo->ring)) {
epicsThreadSleep(0.2);
if (testExit) return;
}
testOk(epicsRingPointerPush(pinfo->ring, (void *)&value[i]),
"%s: Pushing %d, ring not full", myname, i);
epicsEventSignal(pinfo->consumerEvent);
if (testExit) return;
}
}
@@ -57,21 +93,27 @@ MAIN(ringPointerTest)
int i;
info *pinfo;
epicsEventId consumerEvent;
int value[ringSize*2];
int *pgetValue;
epicsRingPointerId ring;
epicsThreadId tid;
char threadName[20];
testPlan(54);
testPlan(256);
for (i=0; i<ringSize*2; i++) value[i] = i;
pinfo = calloc(1,sizeof(info));
if(!pinfo) testAbort("calloc failed");
pinfo->consumerEvent = consumerEvent = epicsEventMustCreate(epicsEventEmpty);
if (!consumerEvent) {
testAbort("epicsEventMustCreate failed");
}
testDiag("******************************************************");
testDiag("** Test 1: local ring pointer, check size and order **");
testDiag("******************************************************");
pinfo->ring = ring = epicsRingPointerCreate(ringSize);
if (!ring) {
testAbort("epicsRingPointerCreate failed");
@@ -86,22 +128,74 @@ MAIN(ringPointerTest)
}
testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
testDiag("**************************************************************");
testDiag("** Test 2: unlocked ring pointer, one consumer, check order **");
testDiag("**************************************************************");
pinfo->checkOrder = 1;
tid=epicsThreadCreate("consumer", 50,
epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo);
if(!tid) testAbort("epicsThreadCreate failed");
epicsThreadSleep(0.1);
epicsThreadSleep(0.2);
for (i=0; i<ringSize*2; i++) {
if (epicsRingPointerIsFull(ring)) {
epicsEventSignal(consumerEvent);
epicsThreadSleep(0.2);
}
testOk(epicsRingPointerPush(ring, (void *)&value[i]), "Ring not full");
testOk(epicsRingPointerPush(ring, (void *)&value[i]), "Pushing %d, ring not full", i);
epicsEventSignal(consumerEvent);
}
epicsEventSignal(consumerEvent);
epicsThreadSleep(0.2);
epicsThreadSleep(1.0);
testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
for (i=0; i<ringSize*2; i++) {
testOk(pinfo->value[i] == 1, "Value test: %d was processed", i);
}
testExit = 1;
epicsEventSignal(consumerEvent);
epicsThreadSleep(1.0);
epicsRingPointerDelete(pinfo->ring);
testDiag("*************************************************************************************");
testDiag("** Test 3: locked ring pointer, many consumers, many producers, check no of copies **");
testDiag("*************************************************************************************");
pinfo->ring = ring = epicsRingPointerLockedCreate(ringSize);
if (!ring) {
testAbort("epicsRingPointerLockedCreate failed");
}
testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
for (i=0; i<ringSize*2; i++) pinfo->value[i] = 0;
testExit = 0;
pinfo->checkOrder = 0;
for (i=0; i<consumerCount; i++) {
sprintf(threadName, "consumer%d", i);
tid=epicsThreadCreate(threadName, 50,
epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo);
if(!tid) testAbort("epicsThreadCreate failed");
}
epicsThreadSleep(0.2);
for (i=0; i<producerCount; i++) {
sprintf(threadName, "producer%d", i);
tid=epicsThreadCreate(threadName, 50,
epicsThreadGetStackSize(epicsThreadStackSmall), producer, pinfo);
if(!tid) testAbort("epicsThreadCreate failed");
}
epicsThreadSleep(0.5);
epicsEventSignal(consumerEvent);
epicsThreadSleep(1.0);
testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
for (i=0; i<ringSize*2; i++) {
testOk(pinfo->value[i] == producerCount, "Value test: %d was processed %d times", i, producerCount);
}
testExit = 1;
epicsEventSignal(consumerEvent);
epicsThreadSleep(1.0);