From 3718cfa673531ce0ca625080ccc7d0b1bad6bc66 Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Fri, 29 Mar 2013 11:59:30 +0100 Subject: [PATCH 01/21] ioc/db/test: add timing statistics to callbackTest --- src/ioc/db/test/callbackTest.c | 39 ++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/src/ioc/db/test/callbackTest.c b/src/ioc/db/test/callbackTest.c index 47b5516d1..75e65765d 100644 --- a/src/ioc/db/test/callbackTest.c +++ b/src/ioc/db/test/callbackTest.c @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -79,11 +80,34 @@ static void finalCallback(CALLBACK *pCallback) epicsEventSignal(finished); } +static void updateStats(double *stats, double val) +{ + if (stats[0] > val) stats[0] = val; + if (stats[1] < val) stats[1] = val; + stats[2] += val; + stats[3] += pow(val, 2.0); + stats[4] += 1.; +} + +static void printStats(double *stats, const char* tag) { + testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f", + tag, stats[0], stats[2]/stats[4], stats[1], + sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]); +} + MAIN(callbackTest) { myPvt *pcbt[NCALLBACKS]; epicsTimeStamp start; - int i; + int i, j; + /* Statistics: min/max/sum/sum^2/n for each priority */ + double setupError[NUM_CALLBACK_PRIORITIES][5]; + double timeError[NUM_CALLBACK_PRIORITIES][5]; + double defaultError[5] = {1,-1,0,0,0}; + + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) + for (j = 0; j < 5; j++) + setupError[i][j] = timeError[i][j] = defaultError[j]; testPlan(NCALLBACKS * 2 + 1); @@ -128,8 +152,8 @@ MAIN(callbackTest) double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start); testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05", pcbt[i]->delay, delta); + updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta); } - } for (i = 0; i < NCALLBACKS ; i++) { @@ -140,7 +164,18 @@ MAIN(callbackTest) error = delta - pcbt[i]->delay; testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05", pcbt[i]->delay, error); + updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error); } + testDiag("Setup time statistics"); + printStats(setupError[0], "LOW"); + printStats(setupError[1], "MID"); + printStats(setupError[2], "HIGH"); + + testDiag("Delay time statistics"); + printStats(timeError[0], "LOW"); + printStats(timeError[1], "MID"); + printStats(timeError[2], "HIGH"); + return testDone(); } From d6f5b505503feb11720a647fe35d6984bd860536 Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Fri, 29 Mar 2013 17:23:24 +0100 Subject: [PATCH 02/21] libCom/ring: add thread safe versions of ring buffers (using spinlocks) --- src/libCom/ring/epicsRingBytes.c | 85 ++++++++++++++++++++++------ src/libCom/ring/epicsRingBytes.h | 17 ++++-- src/libCom/ring/epicsRingPointer.cpp | 17 +++++- src/libCom/ring/epicsRingPointer.h | 62 ++++++++++++++++---- 4 files changed, 146 insertions(+), 35 deletions(-) diff --git a/src/libCom/ring/epicsRingBytes.c b/src/libCom/ring/epicsRingBytes.c index 2c15ee8f1..cb7e52e83 100644 --- a/src/libCom/ring/epicsRingBytes.c +++ b/src/libCom/ring/epicsRingBytes.c @@ -3,13 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. -* EPICS BASE Versions 3.13.7 -* and higher are distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* Copyright (c) 2012 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. \*************************************************************************/ -/* epicsRingBytes.cd */ -/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Eric Norum + * Ralph Lange + */ #include #include @@ -18,6 +21,7 @@ #include #define epicsExportSharedSymbols +#include "epicsSpin.h" #include "dbDefs.h" #include "epicsRingBytes.h" @@ -30,6 +34,7 @@ #define SLOP 16 typedef struct ringPvt { + epicsSpinId lock; volatile int nextPut; volatile int nextGet; int size; @@ -44,12 +49,23 @@ epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int size) pring->size = size + SLOP; pring->nextGet = 0; pring->nextPut = 0; + pring->lock = 0; + return((void *)pring); +} + +epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int size) +{ + ringPvt *pring = (ringPvt *)epicsRingBytesCreate(size); + if(!pring) + return NULL; + pring->lock = epicsSpinCreate(); return((void *)pring); } epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + if (pring->lock) epicsSpinDestroy(pring->lock); free((void *)pring); } @@ -57,11 +73,14 @@ epicsShareFunc int epicsShareAPI epicsRingBytesGet( epicsRingBytesId id, char *value,int nbytes) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; - int size = pring->size; + int nextGet, nextPut, size; int count; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + size = pring->size; + if (nextGet <= nextPut) { count = nextPut - nextGet; if (count < nbytes) @@ -89,6 +108,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesGet( } } pring->nextGet = nextGet; + + if (pring->lock) epicsSpinUnlock(pring->lock); return nbytes; } @@ -96,23 +117,30 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut( epicsRingBytesId id, char *value,int nbytes) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; - int size = pring->size; + int nextGet, nextPut, size; int freeCount, copyCount, topCount; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + size = pring->size; + if (nextPut < nextGet) { freeCount = nextGet - nextPut - SLOP; - if (nbytes > freeCount) + if (nbytes > freeCount) { + if (pring->lock) epicsSpinUnlock(pring->lock); return 0; + } if (nbytes) memcpy ((void *)&pring->buffer[nextPut], value, nbytes); nextPut += nbytes; } else { freeCount = size - nextPut + nextGet - SLOP; - if (nbytes > freeCount) + if (nbytes > freeCount) { + if (pring->lock) epicsSpinUnlock(pring->lock); return 0; + } topCount = size - nextPut; copyCount = (nbytes > topCount) ? topCount : nbytes; if (copyCount) @@ -126,6 +154,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut( } } pring->nextPut = nextPut; + + if (pring->lock) epicsSpinUnlock(pring->lock); return nbytes; } @@ -133,14 +163,20 @@ epicsShareFunc void epicsShareAPI epicsRingBytesFlush(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + if (pring->lock) epicsSpinLock(pring->lock); pring->nextGet = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); } epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; + int nextGet, nextPut; + + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); if (nextPut < nextGet) return nextGet - nextPut - SLOP; @@ -151,8 +187,18 @@ epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id) epicsShareFunc int epicsShareAPI epicsRingBytesUsedBytes(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + int nextGet, nextPut; + int used; - return pring->size - epicsRingBytesFreeBytes(id) - SLOP; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); + + used = nextPut - nextGet; + if (used < 0) used += pring->size; + + return used; } epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id) @@ -165,8 +211,13 @@ epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id) epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + int isEmpty; - return (pring->nextPut == pring->nextGet); + if (pring->lock) epicsSpinLock(pring->lock); + isEmpty = (pring->nextPut == pring->nextGet); + if (pring->lock) epicsSpinUnlock(pring->lock); + + return isEmpty; } epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id) diff --git a/src/libCom/ring/epicsRingBytes.h b/src/libCom/ring/epicsRingBytes.h index 9e5220513..011829bfe 100644 --- a/src/libCom/ring/epicsRingBytes.h +++ b/src/libCom/ring/epicsRingBytes.h @@ -3,13 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. -* EPICS BASE Versions 3.13.7 -* and higher are distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* Copyright (c) 2012 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingBytes.h */ -/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Eric Norum + * Ralph Lange + */ #ifndef INCepicsRingBytesh #define INCepicsRingBytesh @@ -23,6 +26,8 @@ extern "C" { typedef void *epicsRingBytesId; epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int nbytes); +/* Same, but secured by a spinlock */ +epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int nbytes); epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id); epicsShareFunc int epicsShareAPI epicsRingBytesGet( epicsRingBytesId id, char *value,int nbytes); @@ -42,6 +47,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id); /* NOTES If there is only one writer it is not necessary to lock for put If there is a single reader it is not necessary to lock for puts + + epicsRingBytesLocked uses a spinlock. */ #endif /* INCepicsRingBytesh */ diff --git a/src/libCom/ring/epicsRingPointer.cpp b/src/libCom/ring/epicsRingPointer.cpp index 3a5cc8c72..9c144cec1 100644 --- a/src/libCom/ring/epicsRingPointer.cpp +++ b/src/libCom/ring/epicsRingPointer.cpp @@ -3,11 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2012 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingPointer.cpp*/ -/* Author: Marty Kraimer Date: 13OCT2000 */ + +/* + * Author: Marty Kraimer Date: 13OCT2000 + * Ralph Lange + */ + #include #include @@ -22,7 +27,13 @@ typedef epicsRingPointer voidPointer; epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size) { - voidPointer *pvoidPointer = new voidPointer(size); + voidPointer *pvoidPointer = new voidPointer(size, false); + return(reinterpret_cast(pvoidPointer)); +} + +epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size) +{ + voidPointer *pvoidPointer = new voidPointer(size, true); return(reinterpret_cast(pvoidPointer)); } diff --git a/src/libCom/ring/epicsRingPointer.h b/src/libCom/ring/epicsRingPointer.h index 3332782a4..48d62036d 100644 --- a/src/libCom/ring/epicsRingPointer.h +++ b/src/libCom/ring/epicsRingPointer.h @@ -3,12 +3,15 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2012 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingPointer.h */ -/* Author: Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Ralph Lange + */ #ifndef INCepicsRingPointerh #define INCepicsRingPointerh @@ -16,15 +19,18 @@ /* NOTES * If there is only one writer it is not necessary to lock push * If there is a single reader it is not necessary to lock pop + * + * epicsRingPointerLocked uses a spinlock. */ +#include "epicsSpin.h" #include "shareLib.h" #ifdef __cplusplus template class epicsRingPointer { public: /* Functions */ - epicsRingPointer(int size); + epicsRingPointer(int size, bool locked); ~epicsRingPointer(); bool push(T *p); T* pop(); @@ -42,6 +48,7 @@ private: /* Prevent compiler-generated member functions */ epicsRingPointer& operator=(const epicsRingPointer &); private: /* Data */ + epicsSpinId lock; volatile int nextPush; volatile int nextPop; int size; @@ -54,6 +61,8 @@ extern "C" { typedef void *epicsRingPointerId; epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size); +/* Same, but secured by a spinlock */ +epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size); epicsShareFunc void epicsShareAPI epicsRingPointerDelete(epicsRingPointerId id); /*ringPointerPush returns (0,1) if p (was not, was) put on ring*/ epicsShareFunc int epicsShareAPI epicsRingPointerPush(epicsRingPointerId id,void *p); @@ -85,72 +94,105 @@ epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id); #ifdef __cplusplus template -inline epicsRingPointer::epicsRingPointer(int sz) : - nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) {} +inline epicsRingPointer::epicsRingPointer(int sz, bool locked) : + lock(0), nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) +{ + if (locked) + lock = epicsSpinCreate(); +} template inline epicsRingPointer::~epicsRingPointer() -{ delete [] buffer;} +{ + if (lock) epicsSpinDestroy(lock); + delete [] buffer; +} template inline bool epicsRingPointer::push(T *p) { + if (lock) epicsSpinLock(lock); int next = nextPush; int newNext = next + 1; if(newNext>=size) newNext=0; - if(newNext==nextPop) return(false); + if (newNext == nextPop) { + if (lock) epicsSpinUnlock(lock); + return(false); + } buffer[next] = p; nextPush = newNext; + if (lock) epicsSpinUnlock(lock); return(true); } template inline T* epicsRingPointer::pop() { + if (lock) epicsSpinLock(lock); int next = nextPop; - if(next == nextPush) return(0); + if (next == nextPush) { + if (lock) epicsSpinUnlock(lock); + return(0); + } T*p = buffer[next]; ++next; if(next >=size) next = 0; nextPop = next; + if (lock) epicsSpinUnlock(lock); return(p); } template inline void epicsRingPointer::flush() { + if (lock) epicsSpinLock(lock); nextPop = 0; nextPush = 0; + if (lock) epicsSpinUnlock(lock); } template inline int epicsRingPointer::getFree() const { + if (lock) epicsSpinLock(lock); int n = nextPop - nextPush - 1; if (n < 0) n += size; + if (lock) epicsSpinUnlock(lock); return n; } template inline int epicsRingPointer::getUsed() const { + if (lock) epicsSpinLock(lock); int n = nextPush - nextPop; if (n < 0) n += size; + if (lock) epicsSpinUnlock(lock); return n; } template inline int epicsRingPointer::getSize() const -{ return(size-1);} +{ + return(size-1); +} template inline bool epicsRingPointer::isEmpty() const -{ return(nextPush==nextPop);} +{ + bool isEmpty; + if (lock) epicsSpinLock(lock); + isEmpty = (nextPush == nextPop); + if (lock) epicsSpinUnlock(lock); + return isEmpty; +} template inline bool epicsRingPointer::isFull() const { + if (lock) epicsSpinLock(lock); int count = nextPush - nextPop +1; + if (lock) epicsSpinUnlock(lock); return((count == 0) || (count == size)); } From 8abb1ed2557d4b2652a39d7a79467d9321ae519b Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Fri, 29 Mar 2013 17:24:11 +0100 Subject: [PATCH 03/21] libCom/test: add test for thread safe (spinlocked) ringPointer buffer --- src/libCom/test/ringPointerTest.c | 120 ++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 13 deletions(-) diff --git a/src/libCom/test/ringPointerTest.c b/src/libCom/test/ringPointerTest.c index fd3171045..e50aeb639 100644 --- a/src/libCom/test/ringPointerTest.c +++ b/src/libCom/test/ringPointerTest.c @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -26,12 +27,17 @@ #include "testMain.h" #define ringSize 10 +#define consumerCount 4 +#define producerCount 4 static volatile int testExit = 0; +int value[ringSize*2]; typedef struct info { epicsEventId consumerEvent; epicsRingPointerId ring; + int checkOrder; + int value[ringSize*2]; }info; static void consumer(void *arg) @@ -39,16 +45,46 @@ static void consumer(void *arg) info *pinfo = (info *)arg; static int expectedValue=0; int *newvalue; + char myname[20]; - testDiag("Consumer starting"); + epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname)); + testDiag("%s starting", myname); while(1) { epicsEventMustWait(pinfo->consumerEvent); if (testExit) return; - while((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) { - testOk(expectedValue == *newvalue, - "Consumer: %d == %d", expectedValue, *newvalue); - expectedValue = *newvalue + 1; - } + while ((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) { + if (pinfo->checkOrder) { + testOk(expectedValue == *newvalue, + "%s: (got) %d == %d (expected)", myname, *newvalue, expectedValue); + expectedValue = *newvalue + 1; + } else { + testOk(pinfo->value[*newvalue] <= producerCount, "%s: got a %d (%d times seen before)", + myname, *newvalue, pinfo->value[*newvalue]); + } + /* This must be atomic... */ + pinfo->value[*newvalue]++; + epicsThreadSleep(0.05); + } + } +} + +static void producer(void *arg) +{ + info *pinfo = (info *)arg; + char myname[20]; + int i; + + epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname)); + testDiag("%s starting", myname); + for (i=0; iring)) { + epicsThreadSleep(0.2); + if (testExit) return; + } + testOk(epicsRingPointerPush(pinfo->ring, (void *)&value[i]), + "%s: Pushing %d, ring not full", myname, i); + epicsEventSignal(pinfo->consumerEvent); + if (testExit) return; } } @@ -57,21 +93,27 @@ MAIN(ringPointerTest) int i; info *pinfo; epicsEventId consumerEvent; - int value[ringSize*2]; int *pgetValue; epicsRingPointerId ring; epicsThreadId tid; + char threadName[20]; - testPlan(54); + testPlan(256); for (i=0; iconsumerEvent = consumerEvent = epicsEventMustCreate(epicsEventEmpty); if (!consumerEvent) { testAbort("epicsEventMustCreate failed"); } + testDiag("******************************************************"); + testDiag("** Test 1: local ring pointer, check size and order **"); + testDiag("******************************************************"); + pinfo->ring = ring = epicsRingPointerCreate(ringSize); if (!ring) { testAbort("epicsRingPointerCreate failed"); @@ -86,22 +128,74 @@ MAIN(ringPointerTest) } testOk(epicsRingPointerIsEmpty(ring), "Ring empty"); + testDiag("**************************************************************"); + testDiag("** Test 2: unlocked ring pointer, one consumer, check order **"); + testDiag("**************************************************************"); + + pinfo->checkOrder = 1; tid=epicsThreadCreate("consumer", 50, epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo); if(!tid) testAbort("epicsThreadCreate failed"); - epicsThreadSleep(0.1); + epicsThreadSleep(0.2); for (i=0; ivalue[i] == 1, "Value test: %d was processed", i); + } + + testExit = 1; + epicsEventSignal(consumerEvent); + epicsThreadSleep(1.0); + + epicsRingPointerDelete(pinfo->ring); + + testDiag("*************************************************************************************"); + testDiag("** Test 3: locked ring pointer, many consumers, many producers, check no of copies **"); + testDiag("*************************************************************************************"); + + pinfo->ring = ring = epicsRingPointerLockedCreate(ringSize); + if (!ring) { + testAbort("epicsRingPointerLockedCreate failed"); + } + testOk(epicsRingPointerIsEmpty(ring), "Ring empty"); + + for (i=0; ivalue[i] = 0; + testExit = 0; + pinfo->checkOrder = 0; + for (i=0; ivalue[i] == producerCount, "Value test: %d was processed %d times", i, producerCount); + } + testExit = 1; epicsEventSignal(consumerEvent); epicsThreadSleep(1.0); From c1318e7d5528eb93c8967e93d5e40476329da8f7 Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Fri, 29 Mar 2013 17:27:56 +0100 Subject: [PATCH 04/21] ioc/db: add support for parallel callback threads - use thread safe (spinlocked) callback queues - add callbackParallelThreads() configuration call (with iocShell binding) - add callbackParallelThreadsDefault variable, preset to number of CPUs - remove calls to epicsInterruptLock() --- src/ioc/db/callback.c | 108 +++++++++++++++++++++++++++++-------- src/ioc/db/callback.h | 2 + src/ioc/db/dbIocRegister.c | 18 +++++++ src/ioc/misc/dbCore.dbd | 2 + 4 files changed, 109 insertions(+), 21 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index 4e364b181..91e29243c 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -3,8 +3,9 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* in file LICENSE that is included with this distribution. \*************************************************************************/ /* callback.c */ @@ -25,6 +26,7 @@ #include "epicsThread.h" #include "epicsExit.h" #include "epicsInterrupt.h" +#include "epicsString.h" #include "epicsTimer.h" #include "epicsRingPointer.h" #include "errlog.h" @@ -36,6 +38,7 @@ #include "taskwd.h" #include "errMdef.h" #include "dbCommon.h" +#include "epicsExport.h" #define epicsExportSharedSymbols #include "dbAddr.h" #include "dbAccessDefs.h" @@ -49,6 +52,12 @@ static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES]; static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES]; static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES]; +/* Parallel callback threads (configured and actual counts) */ +static int callbackThreadCount[NUM_CALLBACK_PRIORITIES] = { 1, 1, 1 }; +static int callbackThreadsRunning[NUM_CALLBACK_PRIORITIES]; +int callbackParallelThreadsDefault = 2; +epicsExportAddress(int,callbackParallelThreadsDefault); + /* Timer for Delayed Requests */ static epicsTimerQueueId timerQueue; @@ -57,7 +66,7 @@ static epicsEventId startStopEvent; static void *exitCallback; /* Static data */ -static char *threadName[NUM_CALLBACK_PRIORITIES] = { +static char *threadNamePrefix[NUM_CALLBACK_PRIORITIES] = { "cbLow", "cbMedium", "cbHigh" }; static unsigned int threadPriority[NUM_CALLBACK_PRIORITIES] = { @@ -78,6 +87,54 @@ int callbackSetQueueSize(int size) return 0; } +int callbackParallelThreads(int count, const char *prio) +{ + int i; + dbMenu *pdbMenu; + int gotMatch; + + if (callbackOnceFlag != EPICS_THREAD_ONCE_INIT) { + errlogPrintf("Callback system already initialized\n"); + return -1; + } + + if (count < 0) + count = epicsThreadGetCPUs() + count; + else if (count == 0) + count = callbackParallelThreadsDefault; + + if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) { + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { + callbackThreadCount[i] = count; + } + } else { + if (!pdbbase) { + errlogPrintf("pdbbase not specified\n"); + return -1; + } + /* Find prio in menuPriority */ + pdbMenu = (dbMenu *)ellFirst(&pdbbase->menuList); + while (pdbMenu) { + gotMatch = (strcmp("menuPriority", pdbMenu->name)==0) ? TRUE : FALSE; + if (gotMatch) { + for (i = 0; i < pdbMenu->nChoice; i++) { + gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE; + if (gotMatch) break; + } + if (gotMatch) { + callbackThreadCount[i] = count; + break; + } else { + errlogPrintf("Unknown priority \"%s\"\n", prio); + return -1; + } + } + pdbMenu = (dbMenu *)ellNext(&pdbMenu->node); + } + } + return 0; +} + static void callbackTask(void *arg) { int priority = *(int *)arg; @@ -106,36 +163,48 @@ static void callbackShutdown(void *arg) int i; for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - int lockKey = epicsInterruptLock(); - int ok = epicsRingPointerPush(callbackQ[i], &exitCallback); - epicsInterruptUnlock(lockKey); - epicsEventSignal(callbackSem[i]); - if (ok) epicsEventWait(startStopEvent); + while (callbackThreadsRunning[i]--) { + int ok = epicsRingPointerPush(callbackQ[i], &exitCallback); + epicsEventSignal(callbackSem[i]); + if (ok) epicsEventWait(startStopEvent); + } } } static void callbackInitOnce(void *arg) { int i; + int j; + char threadName[32]; startStopEvent = epicsEventMustCreate(epicsEventEmpty); timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh); + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { epicsThreadId tid; callbackSem[i] = epicsEventMustCreate(epicsEventEmpty); - callbackQ[i] = epicsRingPointerCreate(callbackQueueSize); + callbackQ[i] = epicsRingPointerLockedCreate(callbackQueueSize); if (callbackQ[i] == 0) - cantProceed("epicsRingPointerCreate failed for %s\n", - threadName[i]); + cantProceed("epicsRingPointerLockedCreate failed for %s\n", + threadNamePrefix[i]); ringOverflow[i] = FALSE; - tid = epicsThreadCreate(threadName[i], threadPriority[i], - epicsThreadGetStackSize(epicsThreadStackBig), - (EPICSTHREADFUNC)callbackTask, &priorityValue[i]); - if (tid == 0) - cantProceed("Failed to spawn callback task %s\n", threadName[i]); - else - epicsEventWait(startStopEvent); + + for (j = 0; j < callbackThreadCount[i]; j++) { + if (callbackThreadCount[i] > 1 ) + sprintf(threadName, "%s-%d", threadNamePrefix[i], j); + else + strcpy(threadName, threadNamePrefix[i]); + tid = epicsThreadCreate(threadName, threadPriority[i], + epicsThreadGetStackSize(epicsThreadStackBig), + (EPICSTHREADFUNC)callbackTask, &priorityValue[i]); + if (tid == 0) { + cantProceed("Failed to spawn callback thread %s\n", threadName); + } else { + epicsEventWait(startStopEvent); + callbackThreadsRunning[i]++; + } + } } epicsAtExit(callbackShutdown, NULL); } @@ -150,7 +219,6 @@ void callbackRequest(CALLBACK *pcallback) { int priority; int pushOK; - int lockKey; if (!pcallback) { epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n"); @@ -163,14 +231,12 @@ void callbackRequest(CALLBACK *pcallback) } if (ringOverflow[priority]) return; - lockKey = epicsInterruptLock(); pushOK = epicsRingPointerPush(callbackQ[priority], pcallback); - epicsInterruptUnlock(lockKey); if (!pushOK) { char msg[48] = "callbackRequest: "; - strcat(msg, threadName[priority]); + strcat(msg, threadNamePrefix[priority]); strcat(msg, " ring buffer full\n"); epicsInterruptContextMessage(msg); ringOverflow[priority] = TRUE; diff --git a/src/ioc/db/callback.h b/src/ioc/db/callback.h index 1b539c13e..d29031496 100644 --- a/src/ioc/db/callback.h +++ b/src/ioc/db/callback.h @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -67,6 +68,7 @@ epicsShareFunc void callbackCancelDelayed(CALLBACK *pcallback); epicsShareFunc void callbackRequestProcessCallbackDelayed( CALLBACK *pCallback, int Priority, void *pRec, double seconds); epicsShareFunc int callbackSetQueueSize(int size); +epicsShareFunc int callbackParallelThreads(int count, const char *prio); #ifdef __cplusplus } diff --git a/src/ioc/db/dbIocRegister.c b/src/ioc/db/dbIocRegister.c index 0de2d7172..7ef15ac1f 100644 --- a/src/ioc/db/dbIocRegister.c +++ b/src/ioc/db/dbIocRegister.c @@ -23,6 +23,8 @@ #include "dbIocRegister.h" #include "dbState.h" +epicsShareDef int callbackParallelThreadsDefault; + /* dbLoadDatabase */ static const iocshArg dbLoadDatabaseArg0 = { "file name",iocshArgString}; static const iocshArg dbLoadDatabaseArg1 = { "path",iocshArgString}; @@ -298,6 +300,18 @@ static void callbackSetQueueSizeCallFunc(const iocshArgBuf *args) callbackSetQueueSize(args[0].ival); } +/* callbackParallelThreads */ +static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt}; +static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString}; +static const iocshArg * const callbackParallelThreadsArgs[2] = + {&callbackParallelThreadsArg0,&callbackParallelThreadsArg1}; +static const iocshFuncDef callbackParallelThreadsFuncDef = + {"callbackParallelThreads",2,callbackParallelThreadsArgs}; +static void callbackParallelThreadsCallFunc(const iocshArgBuf *args) +{ + callbackParallelThreads(args[0].ival, args[1].sval); +} + /* dbStateCreate */ static const iocshArg dbStateArgName = { "name", iocshArgString }; static const iocshArg * const dbStateCreateArgs[] = { &dbStateArgName }; @@ -394,6 +408,10 @@ void dbIocRegister(void) iocshRegister(&scanpiolFuncDef,scanpiolCallFunc); iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc); + iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc); + + /* Needed before callback system is initialized */ + callbackParallelThreadsDefault = epicsThreadGetCPUs(); iocshRegister(&dbStateCreateFuncDef, dbStateCreateCallFunc); iocshRegister(&dbStateSetFuncDef, dbStateSetCallFunc); diff --git a/src/ioc/misc/dbCore.dbd b/src/ioc/misc/dbCore.dbd index eae942b18..4a5ec96ba 100644 --- a/src/ioc/misc/dbCore.dbd +++ b/src/ioc/misc/dbCore.dbd @@ -12,3 +12,5 @@ variable(asCaDebug,int) variable(dbRecordsOnceOnly,int) variable(dbBptNotMonotonic,int) +# Default number of parallel callback threads +variable(callbackParallelThreadsDefault,int) From ddadd9b62ea2956b6457b6c2e397c784456d17dc Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Fri, 29 Mar 2013 17:28:26 +0100 Subject: [PATCH 05/21] ioc/db/test: add test for parallel callback threads --- src/ioc/db/test/Makefile | 5 + src/ioc/db/test/callbackParallelTest.c | 184 +++++++++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 src/ioc/db/test/callbackParallelTest.c diff --git a/src/ioc/db/test/Makefile b/src/ioc/db/test/Makefile index 2f428aa5b..0027b1447 100644 --- a/src/ioc/db/test/Makefile +++ b/src/ioc/db/test/Makefile @@ -21,6 +21,11 @@ callbackTest_SRCS += callbackTest.c testHarness_SRCS += callbackTest.c TESTS += callbackTest +TESTPROD_HOST += callbackParallelTest +callbackParallelTest_SRCS += callbackParallelTest.c +testHarness_SRCS += callbackParallelTest.c +TESTS += callbackParallelTest + TESTPROD_HOST += dbStateTest dbStateTest_SRCS += dbStateTest.c testHarness_SRCS += dbStateTest.c diff --git a/src/ioc/db/test/callbackParallelTest.c b/src/ioc/db/test/callbackParallelTest.c new file mode 100644 index 000000000..d07b28346 --- /dev/null +++ b/src/ioc/db/test/callbackParallelTest.c @@ -0,0 +1,184 @@ +/*************************************************************************\ +* Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* $Revision-Id$ */ + +/* Author: Marty Kraimer Date: 26JAN2000 */ + +#include +#include +#include +#include +#include +#include + +#include "callback.h" +#include "cantProceed.h" +#include "epicsThread.h" +#include "epicsEvent.h" +#include "epicsTime.h" +#include "epicsUnitTest.h" +#include "testMain.h" + +/* + * This test checks both immediate and delayed callbacks in two steps. + * In the first step (pass1) NCALLBACKS immediate callbacks are queued. + * As each is run it starts a second delayed callback (pass2). + * The last delayed callback which runs signals an epicsEvent + * to the main thread. + * + * Two time intervals are measured. The time to queue and run each of + * the immediate callbacks, and the actual delay of the delayed callback. + */ + +#define NCALLBACKS 169 +#define DELAY_QUANTUM 0.25 + +#define TEST_DELAY(i) ((i / NUM_CALLBACK_PRIORITIES) * DELAY_QUANTUM) + +typedef struct myPvt { + CALLBACK cb1; + CALLBACK cb2; + epicsTimeStamp pass1Time; + epicsTimeStamp pass2Time; + double delay; + int pass; + int resultFail; +} myPvt; + +epicsEventId finished; + +static void myCallback(CALLBACK *pCallback) +{ + myPvt *pmyPvt; + + callbackGetUser(pmyPvt, pCallback); + + pmyPvt->pass++; + + if (pmyPvt->pass == 1) { + epicsTimeGetCurrent(&pmyPvt->pass1Time); + callbackRequestDelayed(&pmyPvt->cb2, pmyPvt->delay); + } else if (pmyPvt->pass == 2) { + epicsTimeGetCurrent(&pmyPvt->pass2Time); + } else { + pmyPvt->resultFail = 1; + return; + } +} + +static void finalCallback(CALLBACK *pCallback) +{ + myCallback(pCallback); + epicsEventSignal(finished); +} + +static void updateStats(double *stats, double val) +{ + if (stats[0] > val) stats[0] = val; + if (stats[1] < val) stats[1] = val; + stats[2] += val; + stats[3] += pow(val, 2.0); + stats[4] += 1.; +} + +static void printStats(double *stats, const char* tag) { + testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f", + tag, stats[0], stats[2]/stats[4], stats[1], + sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]); +} + +MAIN(callbackParallelTest) +{ + myPvt *pcbt[NCALLBACKS]; + epicsTimeStamp start; + int noCpus = epicsThreadGetCPUs(); + int i, j; + /* Statistics: min/max/sum/sum^2/n for each priority */ + double setupError[NUM_CALLBACK_PRIORITIES][5]; + double timeError[NUM_CALLBACK_PRIORITIES][5]; + double defaultError[5] = {1,-1,0,0,0}; + + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) + for (j = 0; j < 5; j++) + setupError[i][j] = timeError[i][j] = defaultError[j]; + + testPlan(NCALLBACKS * 2 + 1); + + testDiag("Starting %d parallel callback threads", noCpus); + + callbackParallelThreads(noCpus, ""); + callbackInit(); + epicsThreadSleep(1.0); + + finished = epicsEventMustCreate(epicsEventEmpty); + + for (i = 0; i < NCALLBACKS ; i++) { + pcbt[i] = callocMustSucceed(1, sizeof(myPvt), "pcbt"); + callbackSetCallback(myCallback, &pcbt[i]->cb1); + callbackSetCallback(myCallback, &pcbt[i]->cb2); + callbackSetUser(pcbt[i], &pcbt[i]->cb1); + callbackSetUser(pcbt[i], &pcbt[i]->cb2); + callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb1); + callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb2); + pcbt[i]->delay = TEST_DELAY(i); + pcbt[i]->pass = 0; + } + + /* Last callback is special */ + callbackSetCallback(finalCallback, &pcbt[NCALLBACKS-1]->cb2); + callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb1); + callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb2); + pcbt[NCALLBACKS-1]->delay = TEST_DELAY(NCALLBACKS) + 1.0; + pcbt[NCALLBACKS-1]->pass = 0; + + testOk1(epicsTimeGetCurrent(&start)==epicsTimeOK); + + for (i = 0; i < NCALLBACKS ; i++) { + callbackRequest(&pcbt[i]->cb1); + } + + testDiag("Waiting %.02f sec", pcbt[NCALLBACKS-1]->delay); + + epicsEventWait(finished); + + for (i = 0; i < NCALLBACKS ; i++) { + if(pcbt[i]->resultFail || pcbt[i]->pass!=2) + testFail("pass = %d for delay = %f", pcbt[i]->pass, pcbt[i]->delay); + else { + double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start); + testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05", + pcbt[i]->delay, delta); + updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta); + } + } + + for (i = 0; i < NCALLBACKS ; i++) { + double delta, error; + if(pcbt[i]->resultFail || pcbt[i]->pass!=2) + continue; + delta = epicsTimeDiffInSeconds(&pcbt[i]->pass2Time, &pcbt[i]->pass1Time); + error = delta - pcbt[i]->delay; + testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05", + pcbt[i]->delay, error); + updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error); + } + + testDiag("Setup time statistics"); + printStats(setupError[0], "LOW"); + printStats(setupError[1], "MID"); + printStats(setupError[2], "HIGH"); + + testDiag("Delay time statistics"); + printStats(timeError[0], "LOW"); + printStats(timeError[1], "MID"); + printStats(timeError[2], "HIGH"); + + return testDone(); +} From 15415b5590c15b337f73400daaad5f8f76bd560b Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Mon, 25 Aug 2014 14:40:37 -0700 Subject: [PATCH 06/21] Merge from parallel-cbthreads-2: return value for callbackRequest, add tests --- src/ioc/db/callback.c | 21 +- src/ioc/db/callback.h | 4 +- src/ioc/db/dbAccessDefs.h | 2 + src/ioc/db/dbScan.c | 30 +- src/ioc/db/dbScan.h | 5 +- src/ioc/db/test/Makefile | 13 + src/ioc/db/test/epicsRunDbTests.c | 1 + src/ioc/db/test/scanIoTest.c | 519 ++++++++++++++++++++++++++++++ src/ioc/db/test/scanIoTest.db | 4 + src/ioc/db/test/yRecord.dbd | 10 + 10 files changed, 594 insertions(+), 15 deletions(-) create mode 100644 src/ioc/db/test/scanIoTest.c create mode 100644 src/ioc/db/test/scanIoTest.db create mode 100644 src/ioc/db/test/yRecord.dbd diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index 719f5ced6..a3ae657df 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -146,7 +146,12 @@ static void callbackTask(void *arg) while(TRUE) { void *ptr; epicsEventMustWait(callbackSem[priority]); - while((ptr = epicsRingPointerPop(callbackQ[priority]))) { + while ((ptr = epicsRingPointerPop(callbackQ[priority]))) { + /* Retrigger if there are more threads and more work */ + if (callbackThreadsRunning[priority] > 1 + && !epicsRingPointerIsEmpty(callbackQ[priority])) { + epicsEventTrigger(callbackSem[priority]); + } CALLBACK *pcallback = (CALLBACK *)ptr; if (ptr == &exitCallback) goto shutdown; ringOverflow[priority] = FALSE; @@ -220,21 +225,21 @@ void callbackInit(void) } /* This routine can be called from interrupt context */ -void callbackRequest(CALLBACK *pcallback) +int callbackRequest(CALLBACK *pcallback) { int priority; int pushOK; if (!pcallback) { epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n"); - return; + return S_db_notInit; } priority = pcallback->priority; if (priority < 0 || priority >= NUM_CALLBACK_PRIORITIES) { epicsInterruptContextMessage("callbackRequest: Bad priority\n"); - return; + return S_db_badChoice; } - if (ringOverflow[priority]) return; + if (ringOverflow[priority]) return S_db_bufFull; pushOK = epicsRingPointerPush(callbackQ[priority], pcallback); @@ -245,8 +250,10 @@ void callbackRequest(CALLBACK *pcallback) strcat(msg, " ring buffer full\n"); epicsInterruptContextMessage(msg); ringOverflow[priority] = TRUE; + return S_db_bufFull; } epicsEventSignal(callbackSem[priority]); + return 0; } static void ProcessCallback(CALLBACK *pcallback) @@ -267,11 +274,11 @@ void callbackSetProcess(CALLBACK *pcallback, int Priority, void *pRec) callbackSetUser(pRec, pcallback); } -void callbackRequestProcessCallback(CALLBACK *pcallback, +int callbackRequestProcessCallback(CALLBACK *pcallback, int Priority, void *pRec) { callbackSetProcess(pcallback, Priority, pRec); - callbackRequest(pcallback); + return callbackRequest(pcallback); } static void notify(void *pPrivate) diff --git a/src/ioc/db/callback.h b/src/ioc/db/callback.h index ae06406b5..2732f9d5e 100644 --- a/src/ioc/db/callback.h +++ b/src/ioc/db/callback.h @@ -57,11 +57,11 @@ typedef void (*CALLBACKFUNC)(struct callbackPvt*); ( (USER) = (void *)((CALLBACK *)(PCALLBACK))->user ) epicsShareFunc void callbackInit(void); -epicsShareFunc void callbackRequest(CALLBACK *pCallback); epicsShareFunc void callbackShutdown(void); +epicsShareFunc int callbackRequest(CALLBACK *pCallback); epicsShareFunc void callbackSetProcess( CALLBACK *pcallback, int Priority, void *pRec); -epicsShareFunc void callbackRequestProcessCallback( +epicsShareFunc int callbackRequestProcessCallback( CALLBACK *pCallback,int Priority, void *pRec); epicsShareFunc void callbackRequestDelayed( CALLBACK *pCallback,double seconds); diff --git a/src/ioc/db/dbAccessDefs.h b/src/ioc/db/dbAccessDefs.h index ba794a520..31ffc18c2 100644 --- a/src/ioc/db/dbAccessDefs.h +++ b/src/ioc/db/dbAccessDefs.h @@ -199,6 +199,8 @@ struct dbr_alDouble {DBRalDouble}; #define S_db_cntSpwn (M_dbAccess|63) /*Cannot spawn dbContTask*/ #define S_db_cntCont (M_dbAccess|65) /*Cannot resume dbContTask*/ #define S_db_noMemory (M_dbAccess|66) /*unable to allocate data structure from pool*/ +#define S_db_notInit (M_dbAccess|67) /*Not initialized*/ +#define S_db_bufFull (M_dbAccess|68) /*Buffer full*/ epicsShareFunc long dbPutSpecial(struct dbAddr *paddr,int pass); epicsShareFunc struct rset * dbGetRset(const struct dbAddr *paddr); diff --git a/src/ioc/db/dbScan.c b/src/ioc/db/dbScan.c index a1ba12d9a..7f2337178 100644 --- a/src/ioc/db/dbScan.c +++ b/src/ioc/db/dbScan.c @@ -122,6 +122,8 @@ typedef struct io_scan_list { CALLBACK callback; scan_list scan_list; struct io_scan_list *next; + io_scan_complete cb; + void * arg; } io_scan_list; static io_scan_list *iosl_head[NUM_CALLBACK_PRIORITIES] = { @@ -507,19 +509,31 @@ void scanIoInit(IOSCANPVT *ppioscanpvt) } } - -void scanIoRequest(IOSCANPVT pioscanpvt) +/* return a bit mask indicating each prioity level + * in which a callback request was queued. + */ +unsigned int scanIoRequest(IOSCANPVT pioscanpvt) { int prio; + unsigned int queued = 0; - if (scanCtl != ctlRun) return; + if (scanCtl != ctlRun) return 0; for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { io_scan_list *piosl = &pioscanpvt[prio]; if (ellCount(&piosl->scan_list.list) > 0) - callbackRequest(&piosl->callback); + if(!callbackRequest(&piosl->callback)) + queued |= 1<cb = cb; + pioscanpvt->arg = arg; +} + void scanOnce(struct dbCommon *precord) { static int newOverflow = TRUE; @@ -747,9 +761,15 @@ static void spawnPeriodic(int ind) static void ioeventCallback(CALLBACK *pcallback) { io_scan_list *piosl; + io_scan_list *pioslLow; callbackGetUser(piosl, pcallback); scanList(&piosl->scan_list); + pioslLow = piosl - pcallback->priority; + if(pioslLow->cb) + (*pioslLow->cb)(pioslLow->arg, + pioslLow, + pcallback->priority); } static void printList(scan_list *psl, char *message) diff --git a/src/ioc/db/dbScan.h b/src/ioc/db/dbScan.h index 03117f769..42ba46c37 100644 --- a/src/ioc/db/dbScan.h +++ b/src/ioc/db/dbScan.h @@ -39,6 +39,8 @@ struct io_scan_list; typedef struct io_scan_list *IOSCANPVT; typedef struct event_list *EVENTPVT; +typedef void (*io_scan_complete)(void *, IOSCANPVT, int); + struct dbCommon; epicsShareFunc long scanInit(void); @@ -65,7 +67,8 @@ epicsShareFunc int scanpel(const char *event_name); epicsShareFunc int scanpiol(void); epicsShareFunc void scanIoInit(IOSCANPVT *); -epicsShareFunc void scanIoRequest(IOSCANPVT); +epicsShareFunc unsigned int scanIoRequest(IOSCANPVT); +epicsShareFunc void scanIoSetComplete(IOSCANPVT, io_scan_complete, void*); #ifdef __cplusplus } diff --git a/src/ioc/db/test/Makefile b/src/ioc/db/test/Makefile index dd98ffa9e..fa186a601 100644 --- a/src/ioc/db/test/Makefile +++ b/src/ioc/db/test/Makefile @@ -65,6 +65,18 @@ dbStateTest_SRCS += dbStateTest.c testHarness_SRCS += dbStateTest.c TESTS += dbStateTest +TARGETS += $(COMMON_DIR)/scanIoTest.dbd +scanIoTest_DBD += menuGlobal.dbd +scanIoTest_DBD += menuConvert.dbd +scanIoTest_DBD += yRecord.dbd +TESTPROD_HOST += scanIoTest +scanIoTest_SRCS += scanIoTest.c +scanIoTest_SRCS += scanIoTest_registerRecordDeviceDriver.cpp +testHarness_SRCS += scanIoTest.c +testHarness_SRCS += scanIoTest_registerRecordDeviceDriver.cpp +TESTFILES += $(COMMON_DIR)/scanIoTest.dbd ../scanIoTest.db +TESTS += scanIoTest + TESTPROD_HOST += dbChannelTest dbChannelTest_SRCS += dbChannelTest.c dbChannelTest_SRCS += dbTestIoc_registerRecordDeviceDriver.cpp @@ -109,3 +121,4 @@ TESTSCRIPTS_HOST += $(TESTS:%=%.t) include $(TOP)/configure/RULES xRecord$(DEP): $(COMMON_DIR)/xRecord.h +scanIoTest$(DEP): $(COMMON_DIR)/yRecord.h diff --git a/src/ioc/db/test/epicsRunDbTests.c b/src/ioc/db/test/epicsRunDbTests.c index ae150b55a..0217436db 100644 --- a/src/ioc/db/test/epicsRunDbTests.c +++ b/src/ioc/db/test/epicsRunDbTests.c @@ -35,6 +35,7 @@ void epicsRunDbTests(void) runTest(callbackTest); runTest(dbStateTest); runTest(dbShutdownTest); + runTest(scanIoTest); runTest(dbLockTest); runTest(dbPutLinkTest); runTest(testDbChannel); diff --git a/src/ioc/db/test/scanIoTest.c b/src/ioc/db/test/scanIoTest.c new file mode 100644 index 000000000..b4d2b0d25 --- /dev/null +++ b/src/ioc/db/test/scanIoTest.c @@ -0,0 +1,519 @@ +/*************************************************************************\ +* Copyright (c) 2013 UChicago Argonne LLC, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2013 Helmholtz-Zentrum Berlin +* für Materialien und Energie GmbH. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +/* + * Author: Ralph Lange + */ + +#include + +#include "epicsEvent.h" +#include "epicsMessageQueue.h" +#include "epicsPrint.h" +#include "epicsMath.h" +#include "alarm.h" +#include "menuPriority.h" +#include "dbChannel.h" +#include "dbStaticLib.h" +#include "dbAccessDefs.h" +#include "dbScan.h" +#include "dbLock.h" +#include "dbUnitTest.h" +#include "dbCommon.h" +#include "registry.h" +#include "registryRecordType.h" +#include "registryDeviceSupport.h" +#include "recSup.h" +#include "devSup.h" +#include "iocInit.h" +#include "callback.h" +#include "ellLib.h" +#include "epicsUnitTest.h" +#include "testMain.h" +#include "osiFileName.h" + +#define GEN_SIZE_OFFSET +#include "yRecord.h" + +#include "epicsExport.h" + +#define ONE_THREAD_LOOPS 101 +#define PAR_THREAD_LOOPS 53 +#define CB_THREAD_LOOPS 13 + +#define NO_OF_THREADS 7 +#define NO_OF_MEMBERS 5 +#define NO_OF_GROUPS 11 + +#define NO_OF_MID_THREADS 3 + +static int noOfGroups = NO_OF_GROUPS; +static int noOfIoscans = NO_OF_GROUPS; + +static IOSCANPVT *ioscanpvt; /* Soft interrupt sources */ +static ELLLIST *pvtList; /* Per group private part lists */ + +static int executionOrder; +static int orderFail; +static int testNo; +static epicsMessageQueueId *mq; /* Per group message queue */ +static epicsEventId *barrier; /* Per group barrier event */ +static int *cbCounter; + +struct pvtY { + ELLNODE node; + yRecord *prec; + int group; + int member; + int count; + int processed; + int callback; +}; + +/* test2: priority and ioscan index for each group + * used priorities are expressed in the bit pattern of (ioscan index + 1) */ +struct groupItem { + int prio; + int ioscan; +} groupTable[12] = { + { 0, 0 }, + { 1, 1 }, + { 0, 2 }, { 1, 2 }, + { 2, 3 }, + { 0, 4 }, { 2, 4 }, + { 1, 5 }, { 2, 5 }, + { 0, 6 }, { 1, 6 }, { 2, 6 } +}; +static int recsProcessed = 1; +static int noDoubleCallback = 1; + +void scanIoTest_registerRecordDeviceDriver(struct dbBase *); + +long count_bits(long n) { + unsigned int c; // c accumulates the total bits set in v + for (c = 0; n; c++) + n &= n - 1; // clear the least significant bit set + return c; +} + +/*************************************************************************\ +* yRecord: minimal record needed to test I/O Intr scanning +\*************************************************************************/ + +static long get_ioint_info(int cmd, yRecord *prec, IOSCANPVT *ppvt) +{ + struct pvtY *pvt = (struct pvtY *)(prec->dpvt); + + if (testNo == 2) + *ppvt = ioscanpvt[groupTable[pvt->group].ioscan]; + else + *ppvt = ioscanpvt[pvt->group]; + return 0; +} + +struct ydset { + long number; + DEVSUPFUN report; + DEVSUPFUN init; + DEVSUPFUN init_record; + DEVSUPFUN get_ioint_info; + DEVSUPFUN process; +} devY = { + 5, + NULL, + NULL, + NULL, + get_ioint_info, + NULL +}; +epicsExportAddress(dset, devY); + +static long init_record(yRecord *prec, int pass) +{ + struct pvtY *pvt; + + if (pass == 0) return 0; + + pvt = (struct pvtY *) calloc(1, sizeof(struct pvtY)); + prec->dpvt = pvt; + + pvt->prec = prec; + sscanf(prec->name, "g%dm%d", &pvt->group, &pvt->member); + ellAdd(&pvtList[pvt->group], &pvt->node); + + return 0; +} + +static long process(yRecord *prec) +{ + struct pvtY *pvt = (struct pvtY *)(prec->dpvt); + + if (testNo == 0) { + // Single callback thread + if (executionOrder != pvt->member) { + orderFail = 1; + } + pvt->count++; + if (++executionOrder == NO_OF_MEMBERS) executionOrder = 0; + } else { + pvt->count++; + if (pvt->member == 0) { + epicsMessageQueueSend(mq[pvt->group], NULL, 0); + epicsEventMustWait(barrier[pvt->group]); + } + } + pvt->processed = 1; + return 0; +} + +rset yRSET={ + 4, + NULL, //report, + NULL, //initialize, + init_record, + process +}; +epicsExportAddress(rset, yRSET); + +static void startMockIoc(void) { + char substitutions[256]; + int i, j; + char *prio[] = { "LOW", "MEDIUM", "HIGH" }; + + if (testNo == 2) { + noOfGroups = 12; + noOfIoscans = 7; + } + ioscanpvt = calloc(noOfIoscans, sizeof(IOSCANPVT)); + mq = calloc(noOfGroups, sizeof(epicsMessageQueueId)); + barrier = calloc(noOfGroups, sizeof(epicsEventId)); + pvtList = calloc(noOfGroups, sizeof(ELLLIST)); + cbCounter = calloc(noOfGroups, sizeof(int)); + + if (dbReadDatabase(&pdbbase, "scanIoTest.dbd", + "." OSI_PATH_LIST_SEPARATOR ".." OSI_PATH_LIST_SEPARATOR + "../O.Common" OSI_PATH_LIST_SEPARATOR "O.Common", NULL)) + testAbort("Error reading database description 'scanIoTest.dbd'"); + + callbackParallelThreads(1, "Low"); + callbackParallelThreads(NO_OF_MID_THREADS, "Medium"); + callbackParallelThreads(NO_OF_THREADS, "High"); + + for (i = 0; i < noOfIoscans; i++) { + scanIoInit(&ioscanpvt[i]); + } + + for (i = 0; i < noOfGroups; i++) { + mq[i] = epicsMessageQueueCreate(NO_OF_MEMBERS, 1); + barrier[i] = epicsEventMustCreate(epicsEventEmpty); + ellInit(&pvtList[i]); + } + + scanIoTest_registerRecordDeviceDriver(pdbbase); + for (i = 0; i < noOfGroups; i++) { + for (j = 0; j < NO_OF_MEMBERS; j++) { + sprintf(substitutions, "GROUP=%d,MEMBER=%d,PRIO=%s", i, j, + testNo==0?"LOW":(testNo==1?"HIGH":prio[groupTable[i].prio])); + if (dbReadDatabase(&pdbbase, "scanIoTest.db", + "." OSI_PATH_LIST_SEPARATOR "..", substitutions)) + testAbort("Error reading test database 'scanIoTest.db'"); + } + } + + testIocInitOk(); +} + +static void stopMockIoc(void) { + int i; + + testIocShutdownOk(); + epicsThreadSleep(0.1); + for (i = 0; i < noOfGroups; i++) { + epicsMessageQueueDestroy(mq[i]); mq[i] = NULL; + epicsEventDestroy(barrier[i]); barrier[i] = NULL; + ellFree(&pvtList[i]); + } + free(mq); + free(barrier); + free(pvtList); + free(cbCounter); + testdbCleanup(); +} + +static void checkProcessed(void *user, IOSCANPVT ioscan, int prio) { + struct pvtY *pvt; + int group = -1; + int i; + + for (i = 0; i < noOfGroups; i++) { + if (ioscanpvt[groupTable[i].ioscan] == ioscan + && groupTable[i].prio == prio) { + group = i; + break; + } + } + if (group == -1) + testAbort("invalid ioscanpvt in scanio callback"); + + cbCounter[group]++; + for (pvt = (struct pvtY *)ellFirst(&pvtList[group]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + if (pvt->callback == 1) { + testDiag("callback for rec %s arrived twice\n", pvt->prec->name); + noDoubleCallback = 0; + } + if (pvt->processed == 0) { + testDiag("rec %s was not processed\n", pvt->prec->name); + recsProcessed = 0; + } + pvt->callback = 1; + } +} + +/*************************************************************************\ +* scanIoTest: Test I/O Intr scanning +* including parallel callback threads and scanio callbacks +\*************************************************************************/ + +MAIN(scanIoTest) +{ + int i, j; + int loop; + int max_one, max_one_all; + int parallel, parallel_all; + int result; + int cbCountOk; + long waiting; + struct pvtY *pvt; + + testPlan(10); + + if (noOfGroups < NO_OF_THREADS) + testAbort("ERROR: This test requires number of ioscan sources >= number of parallel threads"); + + /**************************************\ + * Single callback thread + \**************************************/ + + testNo = 0; + startMockIoc(); + + testDiag("Testing single callback thread"); + testDiag(" using %d ioscan sources, %d records for each, and %d loops", + noOfGroups, NO_OF_MEMBERS, ONE_THREAD_LOOPS); + + for (j = 0; j < ONE_THREAD_LOOPS; j++) { + for (i = 0; i < noOfIoscans; i++) { + scanIoRequest(ioscanpvt[i]); + } + } + + epicsThreadSleep(0.1); + + testOk((orderFail==0), "No out-of-order processing"); + + result = 1; + for (i = 0; i < noOfGroups; i++) { + for (pvt = (struct pvtY *)ellFirst(&pvtList[i]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + if (pvt->count != ONE_THREAD_LOOPS) result = 0; + } + } + + testOk(result, "All per-record process counters match number of loops"); + + stopMockIoc(); + + /**************************************\ + * Multiple parallel callback threads + \**************************************/ + + testNo = 1; + startMockIoc(); + + testDiag("Testing multiple parallel callback threads"); + testDiag(" using %d ioscan sources, %d records for each, %d loops, and %d parallel threads", + noOfIoscans, NO_OF_MEMBERS, PAR_THREAD_LOOPS, NO_OF_THREADS); + + for (j = 0; j < PAR_THREAD_LOOPS; j++) { + for (i = 0; i < noOfIoscans; i++) { + scanIoRequest(ioscanpvt[i]); + } + } + + /* With parallel cb threads, order and distribution to threads are not guaranteed. + * We have stop barrier events for each request (in the first record). + * Test schedule: + * - After the requests have been put in the queue, NO_OF_THREADS threads should have taken + * one request each. + * - Each barrier event is given PAR_THREAD_LOOPS times. + * - Whenever things stop, there should be four threads waiting, one request each. + * - After all loops, each record should have processed PAR_THREAD_LOOPS times. + */ + + max_one_all = 1; + parallel_all = 1; + + for (loop = 0; loop < (PAR_THREAD_LOOPS * noOfGroups) / NO_OF_THREADS + 1; loop++) { + max_one = 1; + parallel = 0; + waiting = 0; + j = 0; + do { + epicsThreadSleep(0.001); + j++; + for (i = 0; i < noOfGroups; i++) { + int l = epicsMessageQueuePending(mq[i]); + while (epicsMessageQueueTryReceive(mq[i], NULL, 0) != -1); + if (l == 1) { + waiting |= 1 << i; + } else if (l > 1) { + max_one = 0; + } + } + parallel = count_bits(waiting); + } while (j < 5 && parallel < NO_OF_THREADS); + + if (!max_one) max_one_all = 0; + if (loop < (PAR_THREAD_LOOPS * noOfGroups) / NO_OF_THREADS) { + if (!(parallel == NO_OF_THREADS)) parallel_all = 0; + } else { + /* In the last run of the loop only the remaining requests are processed */ + if (!(parallel == PAR_THREAD_LOOPS * noOfGroups % NO_OF_THREADS)) parallel_all = 0; + } + + for (i = 0; i < noOfGroups; i++) { + if (waiting & (1 << i)) { + epicsEventTrigger(barrier[i]); + } + } + } + + testOk(max_one_all, "No thread took more than one request per loop"); + testOk(parallel_all, "Correct number of requests were being processed in parallel in each loop"); + + epicsThreadSleep(0.1); + + result = 1; + for (i = 0; i < noOfGroups; i++) { + for (pvt = (struct pvtY *)ellFirst(&pvtList[i]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + if (pvt->count != PAR_THREAD_LOOPS) { + testDiag("Process counter for record %s (%d) does not match loop count (%d)", + pvt->prec->name, pvt->count, PAR_THREAD_LOOPS); + result = 0; + } + } + } + + testOk(result, "All per-record process counters match number of loops"); + + stopMockIoc(); + + /**************************************\ + * Scanio callback mechanism + \**************************************/ + + testNo = 2; + startMockIoc(); + + for (i = 0; i < noOfIoscans; i++) { + scanIoSetComplete(ioscanpvt[i], checkProcessed, NULL); + } + + testDiag("Testing scanio callback mechanism"); + testDiag(" using %d ioscan sources, %d records for each, %d loops, and 1 LOW / %d MEDIUM / %d HIGH parallel threads", + noOfIoscans, NO_OF_MEMBERS, CB_THREAD_LOOPS, NO_OF_MID_THREADS, NO_OF_THREADS); + + result = 1; + for (j = 0; j < CB_THREAD_LOOPS; j++) { + for (i = 0; i < noOfIoscans; i++) { + int prio_used; + prio_used = scanIoRequest(ioscanpvt[i]); + if (i+1 != prio_used) + result = 0; + } + } + testOk(result, "All requests return the correct priority callback mask (all 7 permutations covered)"); + + /* Test schedule: + * After the requests have been put in the queue, it is checked + * - that each callback arrives exactly once, + * - after all records in the group have been processed. + */ + + /* loop count times 4 since (worst case) one loop triggers 4 groups for the single LOW thread */ + for (loop = 0; loop < CB_THREAD_LOOPS * 4; loop++) { + max_one = 1; + parallel = 0; + waiting = 0; + j = 0; + do { + epicsThreadSleep(0.001); + j++; + for (i = 0; i < noOfGroups; i++) { + int l = epicsMessageQueuePending(mq[i]); + while (epicsMessageQueueTryReceive(mq[i], NULL, 0) != -1); + if (l == 1) { + waiting |= 1 << i; + } else if (l > 1) { + max_one = 0; + } + } + parallel = count_bits(waiting); + } while (j < 5); +\ + for (i = 0; i < noOfGroups; i++) { + if (waiting & (1 << i)) { + for (pvt = (struct pvtY *)ellFirst(&pvtList[i]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + pvt->processed = 0; + pvt->callback = 0; + /* record processing will set this at the end of process() */ + } + epicsEventTrigger(barrier[i]); + } + } + } + + epicsThreadSleep(0.1); + + testOk(recsProcessed, "Each callback occured after all records in the group were processed"); + testOk(noDoubleCallback, "No double callbacks occured in any loop"); + + result = 1; + cbCountOk = 1; + for (i = 0; i < noOfGroups; i++) { + if (cbCounter[i] != CB_THREAD_LOOPS) { + testDiag("Callback counter for group %d (%d) does not match loop count (%d)", + i, cbCounter[i], CB_THREAD_LOOPS); + cbCountOk = 0; + } + for (pvt = (struct pvtY *)ellFirst(&pvtList[i]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + if (pvt->count != CB_THREAD_LOOPS) { + testDiag("Process counter for record %s (%d) does not match loop count (%d)", + pvt->prec->name, pvt->count, CB_THREAD_LOOPS); + result = 0; + } + } + } + + testOk(result, "All per-record process counters match number of loops"); + testOk(cbCountOk, "All per-group callback counters match number of loops"); + + stopMockIoc(); + + return testDone(); +} diff --git a/src/ioc/db/test/scanIoTest.db b/src/ioc/db/test/scanIoTest.db new file mode 100644 index 000000000..810a84edf --- /dev/null +++ b/src/ioc/db/test/scanIoTest.db @@ -0,0 +1,4 @@ +record(y, g$(GROUP)m$(MEMBER)) { + field(SCAN, "I/O Intr") + field(PRIO, "$(PRIO)") +} diff --git a/src/ioc/db/test/yRecord.dbd b/src/ioc/db/test/yRecord.dbd new file mode 100644 index 000000000..0aa970da5 --- /dev/null +++ b/src/ioc/db/test/yRecord.dbd @@ -0,0 +1,10 @@ +# This is a minimal I/O scanned record + +recordtype(y) { + include "dbCommon.dbd" + field(VAL, DBF_LONG) { + prompt("Value") + } +} + +device(y,CONSTANT,devY,"ScanIO Test") From bd0c759af3b07b44073497d30efb5dd7eb71bedd Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Tue, 26 Aug 2014 10:36:32 -0700 Subject: [PATCH 07/21] ioc/db: change call back queue wakeup algorithm (only wake when threads are sleeping and enough work in the queue) --- src/ioc/db/callback.c | 80 +++++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index a3ae657df..d8da9fd42 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -29,6 +29,7 @@ #include "epicsString.h" #include "epicsTimer.h" #include "epicsRingPointer.h" +#include "epicsAtomic.h" #include "errlog.h" #include "dbStaticLib.h" #include "dbBase.h" @@ -47,13 +48,18 @@ static int callbackQueueSize = 2000; -static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES]; -static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES]; -static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES]; -/* Parallel callback threads (configured and actual counts) */ -static int callbackThreadCount[NUM_CALLBACK_PRIORITIES] = { 1, 1, 1 }; -static int callbackThreadsRunning[NUM_CALLBACK_PRIORITIES]; +typedef struct cbQueueSet { + epicsEventId semWakeUp; + epicsRingPointerId queue; + int queueOverflow; + int threadsConfigured; + int threadsRunning; + int threadsBusy; +} cbQueueSet; + +static cbQueueSet callbackQueue[NUM_CALLBACK_PRIORITIES]; + int callbackParallelThreadsDefault = 2; epicsExportAddress(int,callbackParallelThreadsDefault); @@ -106,7 +112,7 @@ int callbackParallelThreads(int count, const char *prio) if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) { for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - callbackThreadCount[i] = count; + callbackQueue[i].threadsConfigured = count; } } else { if (!pdbbase) { @@ -123,7 +129,7 @@ int callbackParallelThreads(int count, const char *prio) if (gotMatch) break; } if (gotMatch) { - callbackThreadCount[i] = count; + callbackQueue[i].threadsConfigured = count; break; } else { errlogPrintf("Unknown priority \"%s\"\n", prio); @@ -138,23 +144,22 @@ int callbackParallelThreads(int count, const char *prio) static void callbackTask(void *arg) { - int priority = *(int *)arg; + cbQueueSet *mySet = &callbackQueue[*(int*)arg]; taskwdInsert(0, NULL, NULL); epicsEventSignal(startStopEvent); + epicsAtomicIncrIntT(&mySet->threadsBusy); while(TRUE) { void *ptr; - epicsEventMustWait(callbackSem[priority]); - while ((ptr = epicsRingPointerPop(callbackQ[priority]))) { - /* Retrigger if there are more threads and more work */ - if (callbackThreadsRunning[priority] > 1 - && !epicsRingPointerIsEmpty(callbackQ[priority])) { - epicsEventTrigger(callbackSem[priority]); - } + epicsAtomicDecrIntT(&mySet->threadsBusy); + if (epicsRingPointerIsEmpty(mySet->queue)) + epicsEventMustWait(mySet->semWakeUp); + epicsAtomicIncrIntT(&mySet->threadsBusy); + while ((ptr = epicsRingPointerPop(mySet->queue))) { CALLBACK *pcallback = (CALLBACK *)ptr; if (ptr == &exitCallback) goto shutdown; - ringOverflow[priority] = FALSE; + mySet->queueOverflow = FALSE; (*pcallback->callback)(pcallback); } } @@ -172,9 +177,9 @@ void callbackShutdown(void) cbCtl = ctlExit; for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - while (callbackThreadsRunning[i]--) { - int ok = epicsRingPointerPush(callbackQ[i], &exitCallback); - epicsEventSignal(callbackSem[i]); + while (callbackQueue[i].threadsRunning--) { + int ok = epicsRingPointerPush(callbackQueue[i].queue, &exitCallback); + epicsEventSignal(callbackQueue[i].semWakeUp); if (ok) epicsEventWait(startStopEvent); } } @@ -189,25 +194,25 @@ void callbackInit(void) int j; char threadName[32]; - if(startStopEvent) + if (startStopEvent) return; startStopEvent = epicsEventMustCreate(epicsEventEmpty); cbCtl = ctlRun; - timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh); + timerQueue = epicsTimerQueueAllocate(0, epicsThreadPriorityScanHigh); for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { epicsThreadId tid; - callbackSem[i] = epicsEventMustCreate(epicsEventEmpty); - callbackQ[i] = epicsRingPointerLockedCreate(callbackQueueSize); - if (callbackQ[i] == 0) + callbackQueue[i].semWakeUp = epicsEventMustCreate(epicsEventEmpty); + callbackQueue[i].queue = epicsRingPointerLockedCreate(callbackQueueSize); + if (callbackQueue[i].queue == 0) cantProceed("epicsRingPointerLockedCreate failed for %s\n", threadNamePrefix[i]); - ringOverflow[i] = FALSE; + callbackQueue[i].queueOverflow = FALSE; - for (j = 0; j < callbackThreadCount[i]; j++) { - if (callbackThreadCount[i] > 1 ) + for (j = 0; j < callbackQueue[i].threadsConfigured; j++) { + if (callbackQueue[i].threadsConfigured > 1 ) sprintf(threadName, "%s-%d", threadNamePrefix[i], j); else strcpy(threadName, threadNamePrefix[i]); @@ -218,7 +223,7 @@ void callbackInit(void) cantProceed("Failed to spawn callback thread %s\n", threadName); } else { epicsEventWait(startStopEvent); - callbackThreadsRunning[i]++; + callbackQueue[i].threadsRunning++; } } } @@ -229,6 +234,8 @@ int callbackRequest(CALLBACK *pcallback) { int priority; int pushOK; + int threadsBusy; + cbQueueSet *mySet; if (!pcallback) { epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n"); @@ -239,9 +246,10 @@ int callbackRequest(CALLBACK *pcallback) epicsInterruptContextMessage("callbackRequest: Bad priority\n"); return S_db_badChoice; } - if (ringOverflow[priority]) return S_db_bufFull; + mySet = &callbackQueue[priority]; + if (mySet->queueOverflow) return S_db_bufFull; - pushOK = epicsRingPointerPush(callbackQ[priority], pcallback); + pushOK = epicsRingPointerPush(mySet->queue, pcallback); if (!pushOK) { char msg[48] = "callbackRequest: "; @@ -249,10 +257,16 @@ int callbackRequest(CALLBACK *pcallback) strcat(msg, threadNamePrefix[priority]); strcat(msg, " ring buffer full\n"); epicsInterruptContextMessage(msg); - ringOverflow[priority] = TRUE; + mySet->queueOverflow = TRUE; return S_db_bufFull; } - epicsEventSignal(callbackSem[priority]); + /* Wake up another sleeping thread, if threads are sleeping + * and there are more jobs in the queue than busy threads */ + threadsBusy = epicsAtomicGetIntT(mySet->threadsBusy); + if (threadsBusy < mySet->threadsRunning + && epicsRingPointerGetUsed(mySet->queue) > threadsBusy) { + epicsEventSignal(mySet->semWakeUp); + } return 0; } From 862abba4cb3165dae916823c580d32da6f053ce6 Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Tue, 26 Aug 2014 10:58:59 -0700 Subject: [PATCH 08/21] ioc/db: fix bug in callback.c --- src/ioc/db/callback.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index d8da9fd42..3b0e48e51 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -262,7 +262,7 @@ int callbackRequest(CALLBACK *pcallback) } /* Wake up another sleeping thread, if threads are sleeping * and there are more jobs in the queue than busy threads */ - threadsBusy = epicsAtomicGetIntT(mySet->threadsBusy); + threadsBusy = epicsAtomicGetIntT(&mySet->threadsBusy); if (threadsBusy < mySet->threadsRunning && epicsRingPointerGetUsed(mySet->queue) > threadsBusy) { epicsEventSignal(mySet->semWakeUp); From 08bf6a1081bba466a6b92341072beb0a5ec8503b Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Tue, 26 Aug 2014 11:58:42 -0700 Subject: [PATCH 09/21] ioc/db: fix bug in callback thread initialization --- src/ioc/db/callback.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index 3b0e48e51..52d72dbf5 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -60,6 +60,7 @@ typedef struct cbQueueSet { static cbQueueSet callbackQueue[NUM_CALLBACK_PRIORITIES]; +int callbackThreadsDefault = 1; int callbackParallelThreadsDefault = 2; epicsExportAddress(int,callbackParallelThreadsDefault); @@ -210,6 +211,8 @@ void callbackInit(void) cantProceed("epicsRingPointerLockedCreate failed for %s\n", threadNamePrefix[i]); callbackQueue[i].queueOverflow = FALSE; + if (callbackQueue[i].threadsConfigured == 0) + callbackQueue[i].threadsConfigured = callbackThreadsDefault; for (j = 0; j < callbackQueue[i].threadsConfigured; j++) { if (callbackQueue[i].threadsConfigured > 1 ) From 51c7dea0709d3d18810025e5cd5a2d6db0c9e82a Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Thu, 28 Aug 2014 12:30:07 -0700 Subject: [PATCH 10/21] ioc/db: make sure each priority gets at least one thread. --- src/ioc/db/callback.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index 52d72dbf5..eca624798 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -61,6 +61,8 @@ typedef struct cbQueueSet { static cbQueueSet callbackQueue[NUM_CALLBACK_PRIORITIES]; int callbackThreadsDefault = 1; +/* Don't know what a reasonable default is (yet). + * For the time being: parallel means 2 if not explicitly specified */ int callbackParallelThreadsDefault = 2; epicsExportAddress(int,callbackParallelThreadsDefault); @@ -110,6 +112,7 @@ int callbackParallelThreads(int count, const char *prio) count = epicsThreadGetCPUs() + count; else if (count == 0) count = callbackParallelThreadsDefault; + if (count < 1) count = 1; if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) { for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { From 7f7e63fee13407bb1dcdde588b6892849e3dfce8 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Fri, 29 Aug 2014 15:43:16 -0700 Subject: [PATCH 11/21] dbScan.c: clarify ioeventCallback --- src/ioc/db/dbScan.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/ioc/db/dbScan.c b/src/ioc/db/dbScan.c index 7f2337178..61cb90bb7 100644 --- a/src/ioc/db/dbScan.c +++ b/src/ioc/db/dbScan.c @@ -761,14 +761,16 @@ static void spawnPeriodic(int ind) static void ioeventCallback(CALLBACK *pcallback) { io_scan_list *piosl; - io_scan_list *pioslLow; callbackGetUser(piosl, pcallback); scanList(&piosl->scan_list); - pioslLow = piosl - pcallback->priority; - if(pioslLow->cb) - (*pioslLow->cb)(pioslLow->arg, - pioslLow, + /* the callback function and argument are only stored in the + * first element of the array. So skip back to the beginning. + */ + piosl -= pcallback->priority; + if(piosl->cb) + (*piosl->cb)(piosl->arg, + piosl, pcallback->priority); } From b3c1fef110de6ed33ed7613fc72443531bfc4302 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 16 Sep 2014 18:55:27 -0400 Subject: [PATCH 12/21] cleanup callback and callback*Test --- src/ioc/db/callback.c | 17 +++++++++++++---- src/ioc/db/test/callbackParallelTest.c | 6 ++++++ src/ioc/db/test/callbackTest.c | 6 ++++++ 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index eca624798..574e05dd8 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -169,6 +169,7 @@ static void callbackTask(void *arg) } shutdown: + mySet->threadsRunning--; taskwdRemove(0); epicsEventSignal(startStopEvent); } @@ -180,16 +181,24 @@ void callbackShutdown(void) if (cbCtl == ctlExit) return; cbCtl = ctlExit; + /* sequential shutdown of workers */ for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - while (callbackQueue[i].threadsRunning--) { - int ok = epicsRingPointerPush(callbackQueue[i].queue, &exitCallback); - epicsEventSignal(callbackQueue[i].semWakeUp); - if (ok) epicsEventWait(startStopEvent); + while (callbackQueue[i].threadsRunning) { + if(epicsRingPointerPush(callbackQueue[i].queue, &exitCallback)) { + epicsEventSignal(callbackQueue[i].semWakeUp); + epicsEventWait(startStopEvent); + } else { + epicsThreadSleep(0.05); + } } + assert(callbackQueue[i].threadsRunning==0); + epicsEventDestroy(callbackQueue[i].semWakeUp); + epicsRingPointerDelete(callbackQueue[i].queue); } epicsTimerQueueRelease(timerQueue); epicsEventDestroy(startStopEvent); startStopEvent = NULL; + memset(callbackQueue, 0, sizeof(callbackQueue)); } void callbackInit(void) diff --git a/src/ioc/db/test/callbackParallelTest.c b/src/ioc/db/test/callbackParallelTest.c index d07b28346..eeb6ddaee 100644 --- a/src/ioc/db/test/callbackParallelTest.c +++ b/src/ioc/db/test/callbackParallelTest.c @@ -180,5 +180,11 @@ MAIN(callbackParallelTest) printStats(timeError[1], "MID"); printStats(timeError[2], "HIGH"); + for (i = 0; i < NCALLBACKS ; i++) { + free(pcbt[i]); + } + + callbackShutdown(); + return testDone(); } diff --git a/src/ioc/db/test/callbackTest.c b/src/ioc/db/test/callbackTest.c index 75e65765d..c4c6b31d9 100644 --- a/src/ioc/db/test/callbackTest.c +++ b/src/ioc/db/test/callbackTest.c @@ -177,5 +177,11 @@ MAIN(callbackTest) printStats(timeError[1], "MID"); printStats(timeError[2], "HIGH"); + for (i = 0; i < NCALLBACKS ; i++) { + free(pcbt[i]); + } + + callbackShutdown(); + return testDone(); } From 1a8620f03ea3224e0040caee8863842f3d1df095 Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Fri, 19 Sep 2014 10:13:29 +0200 Subject: [PATCH 13/21] documentation: add parallel callback threads to RELEASE_NOTES --- documentation/RELEASE_NOTES.html | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/documentation/RELEASE_NOTES.html b/documentation/RELEASE_NOTES.html index 3db8233c0..fbbc645de 100644 --- a/documentation/RELEASE_NOTES.html +++ b/documentation/RELEASE_NOTES.html @@ -7,7 +7,7 @@ -

EPICS Base Release 3.15.0.1

+

EPICS Base Release 3.15.0.2

EPICS Base 3.15.0.x releases are not intended for use in production systems.

@@ -15,6 +15,23 @@ EPICS Base 3.15.0.x releases are not intended for use in production systems.

Changes between 3.15.0.1 and 3.15.0.2

+

Parallel callback threads

+ +

The general purpose callback facility can run multiple parallel callback +threads per priority level. This makes better use of SMP architectures (e.g. +processors with multiple cores), as callback work - which includes second +stage processing of records with asynchronuous device support and I/O +scanned processing - can be distributed over the available CPUs.

+ +

Note that by using parallel callback threads the order of scan callback +requests in the queue is not retained. If a device support needs to be +informed when scanIoRequest processing has finished, it should use the new +scanIoSetComplete() feature to add a user function that will be called after +the scanIoRequest record processing has finished.

+ +

Parallel callback threads have to be explicitly configured, by default +the IOC keeps the old behavior of running one callback thread per priority.

+

Implement EPICS_CAS_INTF_ADDR_LIST in rsrv

The IOC server can now bind to a single IP address (and optional port number) @@ -31,8 +48,7 @@ backwards compatibility reasons. Only the alarm.h header needs to be included now to declare the epicsAlarmSeverityStrings and epicsAlarmConditionStrings arrays.

-

-General purpose thread pool

+

General purpose thread pool

A general purpose threaded work queue API epicsThreadPool is added. From e7d186eaf85148343b2a22e251c0e416cffd680e Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Fri, 19 Sep 2014 13:09:52 +0200 Subject: [PATCH 14/21] ioc/db: use dbFindMenu() in callback.c --- src/ioc/db/callback.c | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index 574e05dd8..7a140c856 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -124,23 +124,19 @@ int callbackParallelThreads(int count, const char *prio) return -1; } /* Find prio in menuPriority */ - pdbMenu = (dbMenu *)ellFirst(&pdbbase->menuList); - while (pdbMenu) { - gotMatch = (strcmp("menuPriority", pdbMenu->name)==0) ? TRUE : FALSE; - if (gotMatch) { - for (i = 0; i < pdbMenu->nChoice; i++) { - gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE; - if (gotMatch) break; - } - if (gotMatch) { - callbackQueue[i].threadsConfigured = count; - break; - } else { - errlogPrintf("Unknown priority \"%s\"\n", prio); - return -1; - } + pdbMenu = dbFindMenu(&pdbbase, "menuPriority"); + if (pdbMenu) { + for (i = 0; i < pdbMenu->nChoice; i++) { + gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE; + if (gotMatch) break; + } + if (gotMatch) { + callbackQueue[i].threadsConfigured = count; + return 0; + } else { + errlogPrintf("Unknown priority \"%s\"\n", prio); + return -1; } - pdbMenu = (dbMenu *)ellNext(&pdbMenu->node); } } return 0; From f84b73602254ca39982a5d125f22b7773291c874 Mon Sep 17 00:00:00 2001 From: Ralph Lange Date: Mon, 29 Sep 2014 10:59:00 +0200 Subject: [PATCH 15/21] ioc/db: back to old signalling policy - wake up worker thread as long as there are any sleeping --- src/ioc/db/callback.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index 7a140c856..e4cfd74d2 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -271,11 +271,9 @@ int callbackRequest(CALLBACK *pcallback) mySet->queueOverflow = TRUE; return S_db_bufFull; } - /* Wake up another sleeping thread, if threads are sleeping - * and there are more jobs in the queue than busy threads */ + /* Wake up another sleeping thread, if threads are sleeping */ threadsBusy = epicsAtomicGetIntT(&mySet->threadsBusy); - if (threadsBusy < mySet->threadsRunning - && epicsRingPointerGetUsed(mySet->queue) > threadsBusy) { + if (threadsBusy < mySet->threadsRunning) { epicsEventSignal(mySet->semWakeUp); } return 0; From 4c761eeba07d940c574db687d450e398ec3db249 Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Tue, 30 Sep 2014 12:36:32 -0500 Subject: [PATCH 16/21] Fixes for VxWorks & RTEMS builds * C++ comments are not legal in C code * Declare scanIoTest() in epicsRunDbTests.c --- src/ioc/db/test/epicsRunDbTests.c | 1 + src/ioc/db/test/scanIoTest.c | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/ioc/db/test/epicsRunDbTests.c b/src/ioc/db/test/epicsRunDbTests.c index 0217436db..349fabccd 100644 --- a/src/ioc/db/test/epicsRunDbTests.c +++ b/src/ioc/db/test/epicsRunDbTests.c @@ -20,6 +20,7 @@ int testdbConvert(void); int callbackTest(void); int dbStateTest(void); int dbShutdownTest(void); +int scanIoTest(void); int dbLockTest(void); int dbPutLinkTest(void); int testDbChannel(void); diff --git a/src/ioc/db/test/scanIoTest.c b/src/ioc/db/test/scanIoTest.c index b4d2b0d25..39e9aa35f 100644 --- a/src/ioc/db/test/scanIoTest.c +++ b/src/ioc/db/test/scanIoTest.c @@ -96,9 +96,9 @@ static int noDoubleCallback = 1; void scanIoTest_registerRecordDeviceDriver(struct dbBase *); long count_bits(long n) { - unsigned int c; // c accumulates the total bits set in v + unsigned int c; /* c accumulates the total bits set in v */ for (c = 0; n; c++) - n &= n - 1; // clear the least significant bit set + n &= n - 1; /* clear the least significant bit set */ return c; } @@ -155,7 +155,7 @@ static long process(yRecord *prec) struct pvtY *pvt = (struct pvtY *)(prec->dpvt); if (testNo == 0) { - // Single callback thread + /* Single callback thread */ if (executionOrder != pvt->member) { orderFail = 1; } @@ -174,8 +174,8 @@ static long process(yRecord *prec) rset yRSET={ 4, - NULL, //report, - NULL, //initialize, + NULL, /* report */ + NULL, /* initialize */ init_record, process }; From 04848e0766cbfee22db53ea21a125637d5f6f99e Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Tue, 30 Sep 2014 13:00:39 -0500 Subject: [PATCH 17/21] Fix problems in callbackParallelThreads() --- src/ioc/db/callback.c | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index e4cfd74d2..2a4a291cc 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -99,10 +99,6 @@ int callbackSetQueueSize(int size) int callbackParallelThreads(int count, const char *prio) { - int i; - dbMenu *pdbMenu; - int gotMatch; - if (startStopEvent) { errlogPrintf("Callback system already initialized\n"); return -1; @@ -115,25 +111,34 @@ int callbackParallelThreads(int count, const char *prio) if (count < 1) count = 1; if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) { + int i; + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { callbackQueue[i].threadsConfigured = count; } - } else { + } + else { + dbMenu *pdbMenu; + if (!pdbbase) { - errlogPrintf("pdbbase not specified\n"); + errlogPrintf("callbackParallelThreads: pdbbase not set\n"); return -1; } /* Find prio in menuPriority */ - pdbMenu = dbFindMenu(&pdbbase, "menuPriority"); + pdbMenu = dbFindMenu(pdbbase, "menuPriority"); if (pdbMenu) { + int i, gotMatch = 0; + for (i = 0; i < pdbMenu->nChoice; i++) { gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE; - if (gotMatch) break; + if (gotMatch) + break; } if (gotMatch) { callbackQueue[i].threadsConfigured = count; return 0; - } else { + } + else { errlogPrintf("Unknown priority \"%s\"\n", prio); return -1; } From c9f7a32c8210fb4b353c70cc05b20b59909ec508 Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Tue, 30 Sep 2014 16:10:24 -0500 Subject: [PATCH 18/21] Mark callbackParallelThreadsDefault properly The definition gets epicsShareDef and may provide a value. Additional declarations get epicsShareExtern. --- src/ioc/db/callback.c | 2 +- src/ioc/db/dbIocRegister.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index 2a4a291cc..76fb154b7 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -63,7 +63,7 @@ static cbQueueSet callbackQueue[NUM_CALLBACK_PRIORITIES]; int callbackThreadsDefault = 1; /* Don't know what a reasonable default is (yet). * For the time being: parallel means 2 if not explicitly specified */ -int callbackParallelThreadsDefault = 2; +epicsShareDef int callbackParallelThreadsDefault = 2; epicsExportAddress(int,callbackParallelThreadsDefault); /* Timer for Delayed Requests */ diff --git a/src/ioc/db/dbIocRegister.c b/src/ioc/db/dbIocRegister.c index 7ef15ac1f..8b4be4fd9 100644 --- a/src/ioc/db/dbIocRegister.c +++ b/src/ioc/db/dbIocRegister.c @@ -23,7 +23,7 @@ #include "dbIocRegister.h" #include "dbState.h" -epicsShareDef int callbackParallelThreadsDefault; +epicsShareExtern int callbackParallelThreadsDefault; /* dbLoadDatabase */ static const iocshArg dbLoadDatabaseArg0 = { "file name",iocshArgString}; From f9628646d1245860e1d4829a475eaf6ab9f5e6b0 Mon Sep 17 00:00:00 2001 From: Andrew Johnson Date: Tue, 30 Sep 2014 16:22:03 -0500 Subject: [PATCH 19/21] Make IOSCANPVT a single structure Not an array of 3 io_scan_list objects. --- src/ioc/db/dbScan.c | 203 +++++++++++++++++++++++++++----------------- src/ioc/db/dbScan.h | 12 +-- 2 files changed, 132 insertions(+), 83 deletions(-) diff --git a/src/ioc/db/dbScan.c b/src/ioc/db/dbScan.c index 61cb90bb7..b7ac0e788 100644 --- a/src/ioc/db/dbScan.c +++ b/src/ioc/db/dbScan.c @@ -114,22 +114,24 @@ typedef struct event_list { char event_name[MAX_STRING_SIZE]; } event_list; static event_list * volatile pevent_list[256]; - +static epicsMutexId event_lock; /* IO_EVENT*/ typedef struct io_scan_list { - CALLBACK callback; - scan_list scan_list; - struct io_scan_list *next; - io_scan_complete cb; - void * arg; + CALLBACK callback; + scan_list scan_list; } io_scan_list; -static io_scan_list *iosl_head[NUM_CALLBACK_PRIORITIES] = { - NULL, NULL, NULL -}; +typedef struct ioscan_head { + struct ioscan_head *next; + struct io_scan_list iosl[NUM_CALLBACK_PRIORITIES]; + io_scan_complete cb; + void *arg; +} ioscan_head; +static ioscan_head *pioscan_list = NULL; +static epicsMutexId ioscan_lock; /* Private routines */ static void onceTask(void *); @@ -138,9 +140,10 @@ static void periodicTask(void *arg); static void initPeriodic(void); static void deletePeriodic(void); static void spawnPeriodic(int ind); -static void initEvent(void); static void eventCallback(CALLBACK *pcallback); -static void ioeventCallback(CALLBACK *pcallback); +static void ioscanInit(void); +static void ioscanCallback(CALLBACK *pcallback); +static void ioscanDestroy(void); static void printList(scan_list *psl, char *message); static void scanList(scan_list *psl); static void buildScanLists(void); @@ -166,6 +169,7 @@ void scanShutdown(void) epicsEventWait(startStopEvent); deletePeriodic(); + ioscanDestroy(); epicsRingPointerDelete(onceQ); @@ -187,7 +191,6 @@ long scanInit(void) initPeriodic(); initOnce(); - initEvent(); buildScanLists(); for (i = 0; i < nPeriodic; i++) spawnPeriodic(i); @@ -247,7 +250,7 @@ void scanAdd(struct dbCommon *precord) pel = eventNameToHandle(eventname); if (pel) addToList(precord, &pel->scan_list[prio]); } else if (scan == menuScanI_O_Intr) { - io_scan_list *piosl = NULL; + ioscan_head *piosh = NULL; int prio; DEVSUPFUN get_ioint_info; @@ -264,11 +267,11 @@ void scanAdd(struct dbCommon *precord) precord->scan = menuScanPassive; return; } - if (get_ioint_info(0, precord, &piosl)) { + if (get_ioint_info(0, precord, &piosh)) { precord->scan = menuScanPassive; return; } - if (piosl == NULL) { + if (piosh == NULL) { recGblRecordError(-1, (void *)precord, "scanAdd: I/O Intr not valid"); precord->scan = menuScanPassive; @@ -281,8 +284,7 @@ void scanAdd(struct dbCommon *precord) precord->scan = menuScanPassive; return; } - piosl += prio; /* get piosl for correct priority*/ - addToList(precord, &piosl->scan_list); + addToList(precord, &piosh->iosl[prio].scan_list); } else if (scan >= SCAN_1ST_PERIODIC) { addToList(precord, &papPeriodic[scan - SCAN_1ST_PERIODIC]->scan_list); } @@ -321,7 +323,7 @@ void scanDelete(struct dbCommon *precord) if (pel && (psl = &pel->scan_list[prio])) deleteFromList(precord, psl); } else if (scan == menuScanI_O_Intr) { - io_scan_list *piosl=NULL; + ioscan_head *piosh = NULL; int prio; DEVSUPFUN get_ioint_info; @@ -336,8 +338,8 @@ void scanDelete(struct dbCommon *precord) "scanDelete: I/O Intr not valid (no get_ioint_info)"); return; } - if (get_ioint_info(1, precord, &piosl)) return; - if (piosl == NULL) { + if (get_ioint_info(1, precord, &piosh)) return; + if (piosh == NULL) { recGblRecordError(-1, (void *)precord, "scanDelete: I/O Intr not valid"); return; @@ -348,8 +350,7 @@ void scanDelete(struct dbCommon *precord) "scanDelete: get_ioint_info returned illegal priority"); return; } - piosl += prio; /*get piosl for correct priority*/ - deleteFromList(precord, &piosl->scan_list); + deleteFromList(precord, &piosh->iosl[prio].scan_list); } else if (scan >= SCAN_1ST_PERIODIC) { deleteFromList(precord, &papPeriodic[scan - SCAN_1ST_PERIODIC]->scan_list); } @@ -401,21 +402,28 @@ int scanpel(const char* eventname) /* print event list */ return 0; } -int scanpiol(void) /* print io_event list */ +int scanpiol(void) /* print pioscan_list */ { - io_scan_list *piosl; - int prio; - char message[80]; + ioscan_head *piosh; - for(prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { - piosl = iosl_head[prio]; - if (piosl == NULL) continue; - sprintf(message, "IO Event: Priority %s", priorityName[prio]); - while(piosl != NULL) { + ioscanInit(); + epicsMutexMustLock(ioscan_lock); + piosh = pioscan_list; + + while (piosh) { + int prio; + + for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { + io_scan_list *piosl = &piosh->iosl[prio]; + char message[80]; + + sprintf(message, "IO Event %p: Priority %s", + piosh, priorityName[prio]); printList(&piosl->scan_list, message); - piosl = piosl->next; } + piosh = piosh->next; } + epicsMutexUnlock(ioscan_lock); return 0; } @@ -427,19 +435,22 @@ static void eventCallback(CALLBACK *pcallback) scanList(psl); } -static void initEvent(void) +static void eventOnce(void *arg) { + event_lock = epicsMutexMustCreate(); } event_list *eventNameToHandle(const char *eventname) { int prio; event_list *pel; - static epicsMutexId lock = NULL; + static epicsThreadOnceId onceId = EPICS_THREAD_ONCE_INIT; - if (!lock) lock = epicsMutexMustCreate(); - if (!eventname || eventname[0] == 0) return NULL; - epicsMutexMustLock(lock); + if (!eventname || eventname[0] == 0) + return NULL; + + epicsThreadOnce(&onceId, eventOnce, NULL); + epicsMutexMustLock(event_lock); for (pel = pevent_list[0]; pel; pel=pel->next) { if (strcmp(pel->event_name, eventname) == 0) break; } @@ -462,7 +473,7 @@ event_list *eventNameToHandle(const char *eventname) pevent_list[e] = pel; } } - epicsMutexUnlock(lock); + epicsMutexUnlock(event_lock); return pel; } @@ -490,48 +501,89 @@ void post_event(int event) postEvent(pel); } -void scanIoInit(IOSCANPVT *ppioscanpvt) +static void ioscanOnce(void *arg) { - int prio; + ioscan_lock = epicsMutexMustCreate(); +} - /* Allocate an array of io_scan_lists, one for each priority. */ - /* IOSCANPVT will hold the address of this array of structures */ - *ppioscanpvt = dbCalloc(NUM_CALLBACK_PRIORITIES, sizeof(io_scan_list)); - for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { - io_scan_list *piosl = &(*ppioscanpvt)[prio]; - callbackSetCallback(ioeventCallback, &piosl->callback); - callbackSetPriority(prio, &piosl->callback); - callbackSetUser(piosl, &piosl->callback); - ellInit(&piosl->scan_list.list); - piosl->scan_list.lock = epicsMutexMustCreate(); - piosl->next = iosl_head[prio]; - iosl_head[prio] = piosl; +static void ioscanInit(void) +{ + static epicsThreadOnceId onceId = EPICS_THREAD_ONCE_INIT; + + epicsThreadOnce(&onceId, ioscanOnce, NULL); +} + +static void ioscanDestroy(void) +{ + ioscan_head *piosh; + + ioscanInit(); + epicsMutexMustLock(ioscan_lock); + piosh = pioscan_list; + pioscan_list = NULL; + epicsMutexUnlock(ioscan_lock); + while (piosh) { + ioscan_head *pnext = piosh->next; + int prio; + + for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { + epicsMutexDestroy(piosh->iosl[prio].scan_list.lock); + ellFree(&piosh->iosl[prio].scan_list.list); + } + free(piosh); + piosh = pnext; } } -/* return a bit mask indicating each prioity level - * in which a callback request was queued. +void scanIoInit(IOSCANPVT *pioscanpvt) +{ + ioscan_head *piosh = dbCalloc(1, sizeof(ioscan_head)); + int prio; + + ioscanInit(); + for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { + io_scan_list *piosl = &piosh->iosl[prio]; + + callbackSetCallback(ioscanCallback, &piosl->callback); + callbackSetPriority(prio, &piosl->callback); + callbackSetUser(piosh, &piosl->callback); + ellInit(&piosl->scan_list.list); + piosl->scan_list.lock = epicsMutexMustCreate(); + } + epicsMutexMustLock(ioscan_lock); + piosh->next = pioscan_list; + pioscan_list = piosh; + epicsMutexUnlock(ioscan_lock); + *pioscanpvt = piosh; +} + +/* Return a bit mask indicating each priority level + * in which a callback request was successfully queued. */ -unsigned int scanIoRequest(IOSCANPVT pioscanpvt) +unsigned int scanIoRequest(IOSCANPVT piosh) { int prio; unsigned int queued = 0; - if (scanCtl != ctlRun) return 0; + if (scanCtl != ctlRun) + return 0; + for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { - io_scan_list *piosl = &pioscanpvt[prio]; + io_scan_list *piosl = &piosh->iosl[prio]; + if (ellCount(&piosl->scan_list.list) > 0) - if(!callbackRequest(&piosl->callback)) - queued |= 1<callback)) + queued |= 1 << prio; } + return queued; } /* May not be called while a scan request is queued or running */ -void scanIoSetComplete(IOSCANPVT pioscanpvt, io_scan_complete cb, void* arg) +void scanIoSetComplete(IOSCANPVT piosh, io_scan_complete cb, void *arg) { - pioscanpvt->cb = cb; - pioscanpvt->arg = arg; + piosh->cb = cb; + piosh->arg = arg; } void scanOnce(struct dbCommon *precord) @@ -758,20 +810,14 @@ static void spawnPeriodic(int ind) epicsEventWait(startStopEvent); } -static void ioeventCallback(CALLBACK *pcallback) +static void ioscanCallback(CALLBACK *pcallback) { - io_scan_list *piosl; + ioscan_head *piosh = (ioscan_head *) pcallback->user; + int prio = pcallback->priority; - callbackGetUser(piosl, pcallback); - scanList(&piosl->scan_list); - /* the callback function and argument are only stored in the - * first element of the array. So skip back to the beginning. - */ - piosl -= pcallback->priority; - if(piosl->cb) - (*piosl->cb)(piosl->arg, - piosl, - pcallback->priority); + scanList(&piosh->iosl[prio].scan_list); + if (piosh->cb) + piosh->cb(piosh->arg, piosh, prio); } static void printList(scan_list *psl, char *message) @@ -781,14 +827,17 @@ static void printList(scan_list *psl, char *message) epicsMutexMustLock(psl->lock); pse = (scan_element *)ellFirst(&psl->list); epicsMutexUnlock(psl->lock); - if (pse == NULL) return; + + if (!pse) + return; + printf("%s\n", message); - while (pse != NULL) { + while (pse) { printf(" %-28s\n", pse->precord->name); epicsMutexMustLock(psl->lock); if (pse->pscan_list != psl) { epicsMutexUnlock(psl->lock); - printf("Scan list changed while processing."); + printf(" Scan list changed while printing, try again.\n"); return; } pse = (scan_element *)ellNext(&pse->node); diff --git a/src/ioc/db/dbScan.h b/src/ioc/db/dbScan.h index 42ba46c37..cd9666348 100644 --- a/src/ioc/db/dbScan.h +++ b/src/ioc/db/dbScan.h @@ -34,12 +34,12 @@ extern "C" { #define MIN_PHASE SHRT_MIN /*definitions for I/O Interrupt Scanning */ -struct io_scan_list; +struct ioscan_head; -typedef struct io_scan_list *IOSCANPVT; +typedef struct ioscan_head *IOSCANPVT; typedef struct event_list *EVENTPVT; -typedef void (*io_scan_complete)(void *, IOSCANPVT, int); +typedef void (*io_scan_complete)(void *usr, IOSCANPVT, int prio); struct dbCommon; @@ -66,9 +66,9 @@ epicsShareFunc int scanpel(const char *event_name); /*print io_event list*/ epicsShareFunc int scanpiol(void); -epicsShareFunc void scanIoInit(IOSCANPVT *); -epicsShareFunc unsigned int scanIoRequest(IOSCANPVT); -epicsShareFunc void scanIoSetComplete(IOSCANPVT, io_scan_complete, void*); +epicsShareFunc void scanIoInit(IOSCANPVT *ppios); +epicsShareFunc unsigned int scanIoRequest(IOSCANPVT pios); +epicsShareFunc void scanIoSetComplete(IOSCANPVT, io_scan_complete, void *usr); #ifdef __cplusplus } From c2631cdc8ae3509d1f011f33528ac5296b493dfe Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 1 Oct 2014 18:35:42 -0400 Subject: [PATCH 20/21] callback: don't track busy workers wakeup unconditionally. Perhaps inefficient, but simple. --- src/ioc/db/callback.c | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index 76fb154b7..c37e1cc0f 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -55,7 +55,6 @@ typedef struct cbQueueSet { int queueOverflow; int threadsConfigured; int threadsRunning; - int threadsBusy; } cbQueueSet; static cbQueueSet callbackQueue[NUM_CALLBACK_PRIORITIES]; @@ -154,13 +153,11 @@ static void callbackTask(void *arg) taskwdInsert(0, NULL, NULL); epicsEventSignal(startStopEvent); - epicsAtomicIncrIntT(&mySet->threadsBusy); while(TRUE) { void *ptr; - epicsAtomicDecrIntT(&mySet->threadsBusy); if (epicsRingPointerIsEmpty(mySet->queue)) epicsEventMustWait(mySet->semWakeUp); - epicsAtomicIncrIntT(&mySet->threadsBusy); + while ((ptr = epicsRingPointerPop(mySet->queue))) { CALLBACK *pcallback = (CALLBACK *)ptr; if (ptr == &exitCallback) goto shutdown; @@ -250,7 +247,6 @@ int callbackRequest(CALLBACK *pcallback) { int priority; int pushOK; - int threadsBusy; cbQueueSet *mySet; if (!pcallback) { @@ -276,11 +272,7 @@ int callbackRequest(CALLBACK *pcallback) mySet->queueOverflow = TRUE; return S_db_bufFull; } - /* Wake up another sleeping thread, if threads are sleeping */ - threadsBusy = epicsAtomicGetIntT(&mySet->threadsBusy); - if (threadsBusy < mySet->threadsRunning) { - epicsEventSignal(mySet->semWakeUp); - } + epicsEventSignal(mySet->semWakeUp); return 0; } From c0a4ebebff172145e33145ff57f881042abbf8f0 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 1 Oct 2014 18:38:14 -0400 Subject: [PATCH 21/21] callback: workers wake up peer if necessary. --- src/ioc/db/callback.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index c37e1cc0f..bcefe292c 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -160,6 +160,8 @@ static void callbackTask(void *arg) while ((ptr = epicsRingPointerPop(mySet->queue))) { CALLBACK *pcallback = (CALLBACK *)ptr; + if(!epicsRingPointerIsEmpty(mySet->queue)) + epicsEventMustTrigger(mySet->semWakeUp); if (ptr == &exitCallback) goto shutdown; mySet->queueOverflow = FALSE; (*pcallback->callback)(pcallback);