Merged Martin Konrad's callbackQueueStatus branch into 3.16

This commit is contained in:
Andrew Johnson
2018-12-07 13:47:56 -06:00
11 changed files with 232 additions and 20 deletions

View File

@@ -54,6 +54,7 @@ typedef struct cbQueueSet {
epicsEventId semWakeUp;
epicsRingPointerId queue;
int queueOverflow;
int queueOverflows;
int shutdown;
int threadsConfigured;
int threadsRunning;
@@ -103,6 +104,51 @@ int callbackSetQueueSize(int size)
return 0;
}
int callbackQueueStatus(const int reset, callbackQueueStats *result)
{
int ret;
if (!callbackIsInit) return -1;
if (result) {
int prio;
result->size = callbackQueueSize;
for(prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) {
epicsRingPointerId qId = callbackQueue[prio].queue;
result->numUsed[prio] = epicsRingPointerGetUsed(qId);
result->maxUsed[prio] = epicsRingPointerGetHighWaterMark(qId);
result->numOverflow[prio] = epicsAtomicGetIntT(&callbackQueue[prio].queueOverflows);
}
ret = 0;
} else {
ret = -2;
}
if (reset) {
int prio;
for(prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) {
epicsRingPointerResetHighWaterMark(callbackQueue[prio].queue);
}
}
return ret;
}
void callbackQueuePrintStatus(const int reset)
{
callbackQueueStats stats;
if (callbackQueueStatus(reset, &stats) == -1) {
fprintf(stderr, "Callback system not initialized, yet. Please run "
"iocInit before using this command.\n");
} else {
int prio;
printf("PRIORITY HIGH-WATER MARK ITEMS IN Q Q SIZE %% USED Q OVERFLOWS\n");
for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) {
double qusage = 100.0 * stats.numUsed[prio] / stats.size;
printf("%8s %15d %10d %6d %6.1f %11d\n",
threadNamePrefix[prio], stats.maxUsed[prio],
stats.numUsed[prio], stats.size, qusage,
stats.numOverflow[prio]);
}
}
}
int callbackParallelThreads(int count, const char *prio)
{
if (callbackIsInit) {
@@ -290,6 +336,7 @@ int callbackRequest(CALLBACK *pcallback)
if (!pushOK) {
epicsInterruptContextMessage(fullMessage[priority]);
mySet->queueOverflow = TRUE;
epicsAtomicIncrIntT(&mySet->queueOverflows);
return S_db_bufFull;
}
epicsEventSignal(mySet->semWakeUp);

View File

@@ -48,6 +48,13 @@ typedef epicsCallback CALLBACK;
typedef void (*CALLBACKFUNC)(struct callbackPvt*);
typedef struct callbackQueueStats {
int size;
int numUsed[NUM_CALLBACK_PRIORITIES];
int maxUsed[NUM_CALLBACK_PRIORITIES];
int numOverflow[NUM_CALLBACK_PRIORITIES];
} callbackQueueStats;
#define callbackSetCallback(PFUN, PCALLBACK) \
( (PCALLBACK)->callback = (PFUN) )
#define callbackSetPriority(PRIORITY, PCALLBACK) \
@@ -73,6 +80,8 @@ epicsShareFunc void callbackCancelDelayed(CALLBACK *pcallback);
epicsShareFunc void callbackRequestProcessCallbackDelayed(
CALLBACK *pCallback, int Priority, void *pRec, double seconds);
epicsShareFunc int callbackSetQueueSize(int size);
epicsShareFunc int callbackQueueStatus(const int reset, callbackQueueStats *result);
void callbackQueuePrintStatus(const int reset);
epicsShareFunc int callbackParallelThreads(int count, const char *prio);
#ifdef __cplusplus

View File

@@ -296,6 +296,17 @@ static void scanOnceSetQueueSizeCallFunc(const iocshArgBuf *args)
scanOnceSetQueueSize(args[0].ival);
}
/* scanOnceQueueStatus */
static const iocshArg scanOnceQueueStatusArg0 = { "reset",iocshArgInt};
static const iocshArg * const scanOnceQueueStatusArgs[1] =
{&scanOnceQueueStatusArg0};
static const iocshFuncDef scanOnceQueueStatusFuncDef =
{"scanOnceQueueStatus",1,scanOnceQueueStatusArgs};
static void scanOnceQueueStatusCallFunc(const iocshArgBuf *args)
{
scanOnceQueuePrintStatus(args[0].ival);
}
/* scanppl */
static const iocshArg scanpplArg0 = { "rate",iocshArgDouble};
static const iocshArg * const scanpplArgs[1] = {&scanpplArg0};
@@ -335,6 +346,17 @@ static void callbackSetQueueSizeCallFunc(const iocshArgBuf *args)
callbackSetQueueSize(args[0].ival);
}
/* callbackQueueStatus */
static const iocshArg callbackQueueStatusArg0 = { "reset", iocshArgInt};
static const iocshArg * const callbackQueueStatusArgs[1] =
{&callbackQueueStatusArg0};
static const iocshFuncDef callbackQueueStatusFuncDef =
{"callbackQueueStatus",1,callbackQueueStatusArgs};
static void callbackQueueStatusCallFunc(const iocshArgBuf *args)
{
callbackQueuePrintStatus(args[0].ival);
}
/* callbackParallelThreads */
static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt};
static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString};
@@ -441,12 +463,14 @@ void dbIocRegister(void)
iocshRegister(&dbLockShowLockedFuncDef,dbLockShowLockedCallFunc);
iocshRegister(&scanOnceSetQueueSizeFuncDef,scanOnceSetQueueSizeCallFunc);
iocshRegister(&scanOnceQueueStatusFuncDef,scanOnceQueueStatusCallFunc);
iocshRegister(&scanpplFuncDef,scanpplCallFunc);
iocshRegister(&scanpelFuncDef,scanpelCallFunc);
iocshRegister(&postEventFuncDef,postEventCallFunc);
iocshRegister(&scanpiolFuncDef,scanpiolCallFunc);
iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc);
iocshRegister(&callbackQueueStatusFuncDef,callbackQueueStatusCallFunc);
iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc);
/* Needed before callback system is initialized */

View File

@@ -24,6 +24,7 @@
#include "cantProceed.h"
#include "dbDefs.h"
#include "ellLib.h"
#include "epicsAtomic.h"
#include "epicsEvent.h"
#include "epicsMutex.h"
#include "epicsPrint.h"
@@ -63,6 +64,7 @@ static volatile enum ctl scanCtl;
static int onceQueueSize = 1000;
static epicsEventId onceSem;
static epicsRingBytesId onceQ;
static int onceQOverruns = 0;
static epicsThreadId onceTaskId;
static void *exitOnce;
@@ -676,6 +678,7 @@ int scanOnceCallback(struct dbCommon *precord, once_complete cb, void *usr)
if (!pushOK) {
if (newOverflow) errlogPrintf("scanOnce: Ring buffer overflow\n");
newOverflow = FALSE;
epicsAtomicIncrIntT(&onceQOverruns);
} else {
newOverflow = TRUE;
}
@@ -722,6 +725,40 @@ int scanOnceSetQueueSize(int size)
return 0;
}
int scanOnceQueueStatus(const int reset, scanOnceQueueStats *result)
{
int ret;
if (!onceQ) return -1;
if (result) {
result->size = epicsRingBytesSize(onceQ) / sizeof(onceEntry);
result->numUsed = epicsRingBytesUsedBytes(onceQ) / sizeof(onceEntry);
result->maxUsed = epicsRingBytesHighWaterMark(onceQ) / sizeof(onceEntry);
result->numOverflow = epicsAtomicGetIntT(&onceQOverruns);
ret = 0;
} else {
ret = -2;
}
if (reset) {
epicsRingBytesResetHighWaterMark(onceQ);
}
return ret;
}
void scanOnceQueuePrintStatus(const int reset)
{
scanOnceQueueStats stats;
if (scanOnceQueueStatus(reset, &stats) == -1) {
fprintf(stderr, "scanOnce system not initialized, yet. Please run "
"iocInit before using this command.\n");
} else {
double qusage = 100.0 * stats.numUsed / stats.size;
printf("PRIORITY HIGH-WATER MARK ITEMS IN Q Q SIZE %% USED Q OVERFLOWS\n");
printf("%8s %15d %10d %6d %6.1f %11d\n", "scanOnce", stats.maxUsed,
stats.numUsed, stats.size, qusage,
epicsAtomicGetIntT(&onceQOverruns));
}
}
static void initOnce(void)
{
if ((onceQ = epicsRingBytesLockedCreate(sizeof(onceEntry)*onceQueueSize)) == NULL) {

View File

@@ -42,6 +42,13 @@ struct dbCommon;
typedef void (*io_scan_complete)(void *usr, IOSCANPVT, int prio);
typedef void (*once_complete)(void *usr, struct dbCommon*);
typedef struct scanOnceQueueStats {
int size;
int numUsed;
int maxUsed;
int numOverflow;
} scanOnceQueueStats;
epicsShareFunc long scanInit(void);
epicsShareFunc void scanRun(void);
epicsShareFunc void scanPause(void);
@@ -57,6 +64,8 @@ epicsShareFunc double scanPeriod(int scan);
epicsShareFunc int scanOnce(struct dbCommon *);
epicsShareFunc int scanOnceCallback(struct dbCommon *, once_complete cb, void *usr);
epicsShareFunc int scanOnceSetQueueSize(int size);
epicsShareFunc int scanOnceQueueStatus(const int reset, scanOnceQueueStats *result);
void scanOnceQueuePrintStatus(const int reset);
/*print periodic lists*/
epicsShareFunc int scanppl(double rate);

View File

@@ -38,6 +38,7 @@ typedef struct ringPvt {
volatile int nextPut;
volatile int nextGet;
int size;
int highWaterMark;
volatile char buffer[1]; /* actually larger */
}ringPvt;
@@ -47,6 +48,7 @@ epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int size)
if(!pring)
return NULL;
pring->size = size + SLOP;
pring->highWaterMark = 0;
pring->nextGet = 0;
pring->nextPut = 0;
pring->lock = 0;
@@ -118,7 +120,7 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
{
ringPvt *pring = (ringPvt *)id;
int nextGet, nextPut, size;
int freeCount, copyCount, topCount;
int freeCount, copyCount, topCount, used;
if (pring->lock) epicsSpinLock(pring->lock);
nextGet = pring->nextGet;
@@ -131,8 +133,9 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
if (pring->lock) epicsSpinUnlock(pring->lock);
return 0;
}
if (nbytes)
if (nbytes) {
memcpy ((void *)&pring->buffer[nextPut], value, nbytes);
}
nextPut += nbytes;
}
else {
@@ -143,8 +146,9 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
}
topCount = size - nextPut;
copyCount = (nbytes > topCount) ? topCount : nbytes;
if (copyCount)
if (copyCount) {
memcpy ((void *)&pring->buffer[nextPut], value, copyCount);
}
nextPut += copyCount;
if (nextPut == size) {
int nLeft = nbytes - copyCount;
@@ -155,6 +159,10 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut(
}
pring->nextPut = nextPut;
used = nextPut - nextGet;
if (used < 0) used += pring->size;
if (used > pring->highWaterMark) pring->highWaterMark = used;
if (pring->lock) epicsSpinUnlock(pring->lock);
return nbytes;
}
@@ -224,3 +232,20 @@ epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id)
{
return (epicsRingBytesFreeBytes(id) <= 0);
}
epicsShareFunc int epicsShareAPI epicsRingBytesHighWaterMark(epicsRingBytesIdConst id)
{
ringPvt *pring = (ringPvt *)id;
return pring->highWaterMark;
}
epicsShareFunc void epicsShareAPI epicsRingBytesResetHighWaterMark(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
int used;
if (pring->lock) epicsSpinLock(pring->lock);
used = pring->nextGet - pring->nextPut;
if (used < 0) used += pring->size;
pring->highWaterMark = used;
if (pring->lock) epicsSpinUnlock(pring->lock);
}

View File

@@ -24,6 +24,7 @@ extern "C" {
#include "shareLib.h"
typedef void *epicsRingBytesId;
typedef void const *epicsRingBytesIdConst;
epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int nbytes);
/* Same, but secured by a spinlock */
@@ -39,6 +40,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesUsedBytes(epicsRingBytesId id);
epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id);
epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id);
epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id);
epicsShareFunc int epicsShareAPI epicsRingBytesHighWaterMark(epicsRingBytesIdConst id);
epicsShareFunc void epicsShareAPI epicsRingBytesResetHighWaterMark(epicsRingBytesId id);
#ifdef __cplusplus
}

View File

@@ -90,3 +90,15 @@ epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id)
voidPointer *pvoidPointer = reinterpret_cast<voidPointer*>(id);
return((pvoidPointer->isFull()) ? 1 : 0);
}
epicsShareFunc int epicsShareAPI epicsRingPointerGetHighWaterMark(epicsRingPointerIdConst id)
{
voidPointer const *pvoidPointer = reinterpret_cast<voidPointer const*>(id);
return(pvoidPointer->getHighWaterMark());
}
epicsShareFunc void epicsShareAPI epicsRingPointerResetHighWaterMark(epicsRingPointerId id)
{
voidPointer *pvoidPointer = reinterpret_cast<voidPointer*>(id);
pvoidPointer->resetHighWaterMark();
}

View File

@@ -40,18 +40,22 @@ public: /* Functions */
int getSize() const;
bool isEmpty() const;
bool isFull() const;
int getHighWaterMark() const;
void resetHighWaterMark();
private: /* Prevent compiler-generated member functions */
/* default constructor, copy constructor, assignment operator */
epicsRingPointer();
epicsRingPointer(const epicsRingPointer &);
epicsRingPointer& operator=(const epicsRingPointer &);
int getUsedNoLock() const;
private: /* Data */
epicsSpinId lock;
volatile int nextPush;
volatile int nextPop;
int size;
int highWaterMark;
T * volatile * buffer;
};
@@ -59,6 +63,7 @@ extern "C" {
#endif /*__cplusplus */
typedef void *epicsRingPointerId;
typedef void const *epicsRingPointerIdConst;
epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size);
/* Same, but secured by a spinlock */
@@ -74,6 +79,8 @@ epicsShareFunc int epicsShareAPI epicsRingPointerGetUsed(epicsRingPointerId id)
epicsShareFunc int epicsShareAPI epicsRingPointerGetSize(epicsRingPointerId id);
epicsShareFunc int epicsShareAPI epicsRingPointerIsEmpty(epicsRingPointerId id);
epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id);
epicsShareFunc int epicsShareAPI epicsRingPointerGetHighWaterMark(epicsRingPointerIdConst id);
epicsShareFunc void epicsShareAPI epicsRingPointerResetHighWaterMark(epicsRingPointerId id);
/* This routine was incorrectly named in previous releases */
#define epicsRingPointerSize epicsRingPointerGetSize
@@ -95,7 +102,8 @@ epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id);
template <class T>
inline epicsRingPointer<T>::epicsRingPointer(int sz, bool locked) :
lock(0), nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1])
lock(0), nextPush(0), nextPop(0), size(sz+1), highWaterMark(0),
buffer(new T* [sz+1])
{
if (locked)
lock = epicsSpinCreate();
@@ -121,6 +129,8 @@ inline bool epicsRingPointer<T>::push(T *p)
}
buffer[next] = p;
nextPush = newNext;
int used = getUsedNoLock();
if (used > highWaterMark) highWaterMark = used;
if (lock) epicsSpinUnlock(lock);
return(true);
}
@@ -161,12 +171,19 @@ inline int epicsRingPointer<T>::getFree() const
return n;
}
template <class T>
inline int epicsRingPointer<T>::getUsedNoLock() const
{
int n = nextPush - nextPop;
if (n < 0) n += size;
return n;
}
template <class T>
inline int epicsRingPointer<T>::getUsed() const
{
if (lock) epicsSpinLock(lock);
int n = nextPush - nextPop;
if (n < 0) n += size;
int n = getUsedNoLock();
if (lock) epicsSpinUnlock(lock);
return n;
}
@@ -196,6 +213,20 @@ inline bool epicsRingPointer<T>::isFull() const
return((count == 0) || (count == size));
}
template <class T>
inline int epicsRingPointer<T>::getHighWaterMark() const
{
return highWaterMark;
}
template <class T>
inline void epicsRingPointer<T>::resetHighWaterMark()
{
if (lock) epicsSpinLock(lock);
highWaterMark = getUsedNoLock();
if (lock) epicsSpinUnlock(lock);
}
#endif /* __cplusplus */
#endif /* INCepicsRingPointerh */

View File

@@ -30,7 +30,8 @@ typedef struct info {
epicsRingBytesId ring;
}info;
static void check(epicsRingBytesId ring, int expectedFree)
static void check(epicsRingBytesId ring, int expectedFree,
int expectedHighWaterMark)
{
int expectedUsed = RINGSIZE - expectedFree;
int expectedEmpty = (expectedUsed == 0);
@@ -39,11 +40,14 @@ static void check(epicsRingBytesId ring, int expectedFree)
int nUsed = epicsRingBytesUsedBytes(ring);
int isEmpty = epicsRingBytesIsEmpty(ring);
int isFull = epicsRingBytesIsFull(ring);
int highWaterMark = epicsRingBytesHighWaterMark(ring);
testOk(nFree == expectedFree, "Free: %d == %d", nFree, expectedFree);
testOk(nUsed == expectedUsed, "Used: %d == %d", nUsed, expectedUsed);
testOk(isEmpty == expectedEmpty, "Empty: %d == %d", isEmpty, expectedEmpty);
testOk(isFull == expectedFull, "Full: %d == %d", isFull, expectedFull);
testOk(highWaterMark == expectedHighWaterMark, "HighWaterMark: %d == %d",
highWaterMark, expectedHighWaterMark);
}
MAIN(ringBytesTest)
@@ -55,7 +59,7 @@ MAIN(ringBytesTest)
char get[RINGSIZE+1];
epicsRingBytesId ring;
testPlan(245);
testPlan(292);
pinfo = calloc(1,sizeof(info));
if (!pinfo) {
@@ -70,50 +74,54 @@ MAIN(ringBytesTest)
if (!ring) {
testAbort("epicsRingBytesCreate failed");
}
check(ring, RINGSIZE);
check(ring, RINGSIZE, 0);
for (i = 0 ; i < sizeof(put) ; i++)
put[i] = i;
for(i = 0 ; i < RINGSIZE ; i++) {
n = epicsRingBytesPut(ring, put, i);
testOk(n==i, "ring put %d", i);
check(ring, RINGSIZE-i);
check(ring, RINGSIZE-i, i);
n = epicsRingBytesGet(ring, get, i);
testOk(n==i, "ring get %d", i);
check(ring, RINGSIZE);
check(ring, RINGSIZE, i);
testOk(memcmp(put,get,i)==0, "get matches write");
}
epicsRingBytesResetHighWaterMark(ring);
for(i = 0 ; i < RINGSIZE ; i++) {
n = epicsRingBytesPut(ring, put+i, 1);
testOk(n==1, "ring put 1, %d", i);
check(ring, RINGSIZE-1-i);
check(ring, RINGSIZE-1-i, i + 1);
}
n = epicsRingBytesPut(ring, put+RINGSIZE, 1);
testOk(n==0, "put to full ring");
check(ring, 0);
check(ring, 0, RINGSIZE);
for(i = 0 ; i < RINGSIZE ; i++) {
n = epicsRingBytesGet(ring, get+i, 1);
testOk(n==1, "ring get 1, %d", i);
check(ring, 1+i);
check(ring, 1+i, RINGSIZE);
}
testOk(memcmp(put,get,RINGSIZE)==0, "get matches write");
n = epicsRingBytesGet(ring, get+RINGSIZE, 1);
testOk(n==0, "get from empty ring");
check(ring, RINGSIZE);
check(ring, RINGSIZE, RINGSIZE);
epicsRingBytesResetHighWaterMark(ring);
n = epicsRingBytesPut(ring, put, RINGSIZE+1);
testOk(n==0, "ring put beyond ring capacity (%d, expected 0)",n);
check(ring, RINGSIZE);
check(ring, RINGSIZE, 0);
n = epicsRingBytesPut(ring, put, 1);
testOk(n==1, "ring put %d", 1);
check(ring, RINGSIZE-1);
check(ring, RINGSIZE-1, 1);
n = epicsRingBytesPut(ring, put, RINGSIZE);
testOk(n==0, "ring put beyond ring capacity (%d, expected 0)",n);
check(ring, RINGSIZE-1);
check(ring, RINGSIZE-1, 1);
n = epicsRingBytesGet(ring, get, 1);
testOk(n==1, "ring get %d", 1);
check(ring, RINGSIZE);
check(ring, RINGSIZE, 1);
epicsRingBytesDelete(ring);
epicsEventDestroy(consumerEvent);

View File

@@ -64,6 +64,7 @@ static void testSingle(void)
testOk1(epicsRingPointerGetFree(ring)==rsize);
testOk1(epicsRingPointerGetSize(ring)==rsize);
testOk1(epicsRingPointerGetUsed(ring)==0);
testOk1(epicsRingPointerGetHighWaterMark(ring)==0);
testOk1(epicsRingPointerPop(ring)==NULL);
@@ -75,6 +76,10 @@ static void testSingle(void)
testOk1(epicsRingPointerGetFree(ring)==rsize-1);
testOk1(epicsRingPointerGetSize(ring)==rsize);
testOk1(epicsRingPointerGetUsed(ring)==1);
testOk1(epicsRingPointerGetHighWaterMark(ring)==1);
epicsRingPointerResetHighWaterMark(ring);
testOk1(epicsRingPointerGetHighWaterMark(ring)==1);
testDiag("Fill it up");
for(i=2; i<2*rsize; i++) {
@@ -92,6 +97,7 @@ static void testSingle(void)
testOk1(epicsRingPointerGetFree(ring)==0);
testOk1(epicsRingPointerGetSize(ring)==rsize);
testOk1(epicsRingPointerGetUsed(ring)==rsize);
testOk1(epicsRingPointerGetHighWaterMark(ring)==rsize);
testDiag("Drain it out");
for(i=1; i<2*rsize; i++) {
@@ -108,6 +114,7 @@ static void testSingle(void)
testOk1(epicsRingPointerGetFree(ring)==rsize);
testOk1(epicsRingPointerGetSize(ring)==rsize);
testOk1(epicsRingPointerGetUsed(ring)==0);
testOk1(epicsRingPointerGetHighWaterMark(ring)==rsize);
testDiag("Fill it up again");
for(i=2; i<2*rsize; i++) {
@@ -236,7 +243,7 @@ MAIN(ringPointerTest)
{
int prio = epicsThreadGetPrioritySelf();
testPlan(37);
testPlan(42);
testSingle();
if (prio)
epicsThreadSetPriority(epicsThreadGetIdSelf(), epicsThreadPriorityScanLow);