diff --git a/documentation/RELEASE_NOTES.html b/documentation/RELEASE_NOTES.html index b9c3f2d68..6415d0a7b 100644 --- a/documentation/RELEASE_NOTES.html +++ b/documentation/RELEASE_NOTES.html @@ -7,7 +7,7 @@ -

EPICS Base Release 3.15.0.1

+

EPICS Base Release 3.15.0.2

EPICS Base 3.15.0.x releases are not intended for use in production systems.

@@ -15,6 +15,23 @@ EPICS Base 3.15.0.x releases are not intended for use in production systems.

Changes between 3.15.0.1 and 3.15.0.2

+

Parallel callback threads

+ +

The general purpose callback facility can run multiple parallel callback +threads per priority level. This makes better use of SMP architectures (e.g. +processors with multiple cores), as callback work - which includes second +stage processing of records with asynchronuous device support and I/O +scanned processing - can be distributed over the available CPUs.

+ +

Note that by using parallel callback threads the order of scan callback +requests in the queue is not retained. If a device support needs to be +informed when scanIoRequest processing has finished, it should use the new +scanIoSetComplete() feature to add a user function that will be called after +the scanIoRequest record processing has finished.

+ +

Parallel callback threads have to be explicitly configured, by default +the IOC keeps the old behavior of running one callback thread per priority.

+

Merge MMIO API from devLib2

Added calls to handle 8, 16, and 32 bit Memory Mapped I/O reads and writes. @@ -108,8 +125,7 @@ backwards compatibility reasons. Only the alarm.h header needs to be included now to declare the epicsAlarmSeverityStrings and epicsAlarmConditionStrings arrays.

-

-General purpose thread pool

+

General purpose thread pool

A general purpose threaded work queue API epicsThreadPool is added. diff --git a/src/ioc/db/callback.c b/src/ioc/db/callback.c index bd20304a3..bcefe292c 100644 --- a/src/ioc/db/callback.c +++ b/src/ioc/db/callback.c @@ -3,8 +3,9 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* in file LICENSE that is included with this distribution. \*************************************************************************/ /* callback.c */ @@ -25,8 +26,10 @@ #include "epicsThread.h" #include "epicsExit.h" #include "epicsInterrupt.h" +#include "epicsString.h" #include "epicsTimer.h" #include "epicsRingPointer.h" +#include "epicsAtomic.h" #include "errlog.h" #include "dbStaticLib.h" #include "dbBase.h" @@ -36,6 +39,7 @@ #include "taskwd.h" #include "errMdef.h" #include "dbCommon.h" +#include "epicsExport.h" #define epicsExportSharedSymbols #include "dbAddr.h" #include "dbAccessDefs.h" @@ -44,9 +48,22 @@ static int callbackQueueSize = 2000; -static epicsEventId callbackSem[NUM_CALLBACK_PRIORITIES]; -static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES]; -static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES]; + +typedef struct cbQueueSet { + epicsEventId semWakeUp; + epicsRingPointerId queue; + int queueOverflow; + int threadsConfigured; + int threadsRunning; +} cbQueueSet; + +static cbQueueSet callbackQueue[NUM_CALLBACK_PRIORITIES]; + +int callbackThreadsDefault = 1; +/* Don't know what a reasonable default is (yet). + * For the time being: parallel means 2 if not explicitly specified */ +epicsShareDef int callbackParallelThreadsDefault = 2; +epicsExportAddress(int,callbackParallelThreadsDefault); /* Timer for Delayed Requests */ static epicsTimerQueueId timerQueue; @@ -58,7 +75,7 @@ static epicsEventId startStopEvent; static void *exitCallback; /* Static data */ -static char *threadName[NUM_CALLBACK_PRIORITIES] = { +static char *threadNamePrefix[NUM_CALLBACK_PRIORITIES] = { "cbLow", "cbMedium", "cbHigh" }; static unsigned int threadPriority[NUM_CALLBACK_PRIORITIES] = { @@ -79,25 +96,80 @@ int callbackSetQueueSize(int size) return 0; } +int callbackParallelThreads(int count, const char *prio) +{ + if (startStopEvent) { + errlogPrintf("Callback system already initialized\n"); + return -1; + } + + if (count < 0) + count = epicsThreadGetCPUs() + count; + else if (count == 0) + count = callbackParallelThreadsDefault; + if (count < 1) count = 1; + + if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) { + int i; + + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { + callbackQueue[i].threadsConfigured = count; + } + } + else { + dbMenu *pdbMenu; + + if (!pdbbase) { + errlogPrintf("callbackParallelThreads: pdbbase not set\n"); + return -1; + } + /* Find prio in menuPriority */ + pdbMenu = dbFindMenu(pdbbase, "menuPriority"); + if (pdbMenu) { + int i, gotMatch = 0; + + for (i = 0; i < pdbMenu->nChoice; i++) { + gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE; + if (gotMatch) + break; + } + if (gotMatch) { + callbackQueue[i].threadsConfigured = count; + return 0; + } + else { + errlogPrintf("Unknown priority \"%s\"\n", prio); + return -1; + } + } + } + return 0; +} + static void callbackTask(void *arg) { - int priority = *(int *)arg; + cbQueueSet *mySet = &callbackQueue[*(int*)arg]; taskwdInsert(0, NULL, NULL); epicsEventSignal(startStopEvent); while(TRUE) { void *ptr; - epicsEventMustWait(callbackSem[priority]); - while((ptr = epicsRingPointerPop(callbackQ[priority]))) { + if (epicsRingPointerIsEmpty(mySet->queue)) + epicsEventMustWait(mySet->semWakeUp); + + while ((ptr = epicsRingPointerPop(mySet->queue))) { CALLBACK *pcallback = (CALLBACK *)ptr; + if(!epicsRingPointerIsEmpty(mySet->queue)) + epicsEventMustTrigger(mySet->semWakeUp); if (ptr == &exitCallback) goto shutdown; - ringOverflow[priority] = FALSE; + mySet->queueOverflow = FALSE; (*pcallback->callback)(pcallback); } } shutdown: + mySet->threadsRunning--; taskwdRemove(0); epicsEventSignal(startStopEvent); } @@ -109,80 +181,101 @@ void callbackShutdown(void) if (cbCtl == ctlExit) return; cbCtl = ctlExit; + /* sequential shutdown of workers */ for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { - int lockKey = epicsInterruptLock(); - int ok = epicsRingPointerPush(callbackQ[i], &exitCallback); - epicsInterruptUnlock(lockKey); - epicsEventSignal(callbackSem[i]); - if (ok) epicsEventWait(startStopEvent); - epicsEventDestroy(callbackSem[i]); - epicsRingPointerDelete(callbackQ[i]); + while (callbackQueue[i].threadsRunning) { + if(epicsRingPointerPush(callbackQueue[i].queue, &exitCallback)) { + epicsEventSignal(callbackQueue[i].semWakeUp); + epicsEventWait(startStopEvent); + } else { + epicsThreadSleep(0.05); + } + } + assert(callbackQueue[i].threadsRunning==0); + epicsEventDestroy(callbackQueue[i].semWakeUp); + epicsRingPointerDelete(callbackQueue[i].queue); } epicsTimerQueueRelease(timerQueue); epicsEventDestroy(startStopEvent); startStopEvent = NULL; + memset(callbackQueue, 0, sizeof(callbackQueue)); } void callbackInit(void) { int i; + int j; + char threadName[32]; - if(startStopEvent) + if (startStopEvent) return; startStopEvent = epicsEventMustCreate(epicsEventEmpty); cbCtl = ctlRun; - timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh); + timerQueue = epicsTimerQueueAllocate(0, epicsThreadPriorityScanHigh); + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) { epicsThreadId tid; - callbackSem[i] = epicsEventMustCreate(epicsEventEmpty); - callbackQ[i] = epicsRingPointerCreate(callbackQueueSize); - if (callbackQ[i] == 0) - cantProceed("epicsRingPointerCreate failed for %s\n", - threadName[i]); - ringOverflow[i] = FALSE; - tid = epicsThreadCreate(threadName[i], threadPriority[i], - epicsThreadGetStackSize(epicsThreadStackBig), - (EPICSTHREADFUNC)callbackTask, &priorityValue[i]); - if (tid == 0) - cantProceed("Failed to spawn callback task %s\n", threadName[i]); - else - epicsEventWait(startStopEvent); + callbackQueue[i].semWakeUp = epicsEventMustCreate(epicsEventEmpty); + callbackQueue[i].queue = epicsRingPointerLockedCreate(callbackQueueSize); + if (callbackQueue[i].queue == 0) + cantProceed("epicsRingPointerLockedCreate failed for %s\n", + threadNamePrefix[i]); + callbackQueue[i].queueOverflow = FALSE; + if (callbackQueue[i].threadsConfigured == 0) + callbackQueue[i].threadsConfigured = callbackThreadsDefault; + + for (j = 0; j < callbackQueue[i].threadsConfigured; j++) { + if (callbackQueue[i].threadsConfigured > 1 ) + sprintf(threadName, "%s-%d", threadNamePrefix[i], j); + else + strcpy(threadName, threadNamePrefix[i]); + tid = epicsThreadCreate(threadName, threadPriority[i], + epicsThreadGetStackSize(epicsThreadStackBig), + (EPICSTHREADFUNC)callbackTask, &priorityValue[i]); + if (tid == 0) { + cantProceed("Failed to spawn callback thread %s\n", threadName); + } else { + epicsEventWait(startStopEvent); + callbackQueue[i].threadsRunning++; + } + } } } /* This routine can be called from interrupt context */ -void callbackRequest(CALLBACK *pcallback) +int callbackRequest(CALLBACK *pcallback) { int priority; int pushOK; - int lockKey; + cbQueueSet *mySet; if (!pcallback) { epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n"); - return; + return S_db_notInit; } priority = pcallback->priority; if (priority < 0 || priority >= NUM_CALLBACK_PRIORITIES) { epicsInterruptContextMessage("callbackRequest: Bad priority\n"); - return; + return S_db_badChoice; } - if (ringOverflow[priority]) return; + mySet = &callbackQueue[priority]; + if (mySet->queueOverflow) return S_db_bufFull; - lockKey = epicsInterruptLock(); - pushOK = epicsRingPointerPush(callbackQ[priority], pcallback); - epicsInterruptUnlock(lockKey); + pushOK = epicsRingPointerPush(mySet->queue, pcallback); if (!pushOK) { char msg[48] = "callbackRequest: "; - strcat(msg, threadName[priority]); + strcat(msg, threadNamePrefix[priority]); strcat(msg, " ring buffer full\n"); epicsInterruptContextMessage(msg); - ringOverflow[priority] = TRUE; + mySet->queueOverflow = TRUE; + return S_db_bufFull; } - epicsEventSignal(callbackSem[priority]); + epicsEventSignal(mySet->semWakeUp); + return 0; } static void ProcessCallback(CALLBACK *pcallback) @@ -203,11 +296,11 @@ void callbackSetProcess(CALLBACK *pcallback, int Priority, void *pRec) callbackSetUser(pRec, pcallback); } -void callbackRequestProcessCallback(CALLBACK *pcallback, +int callbackRequestProcessCallback(CALLBACK *pcallback, int Priority, void *pRec) { callbackSetProcess(pcallback, Priority, pRec); - callbackRequest(pcallback); + return callbackRequest(pcallback); } static void notify(void *pPrivate) diff --git a/src/ioc/db/callback.h b/src/ioc/db/callback.h index 6ce0b1adb..2732f9d5e 100644 --- a/src/ioc/db/callback.h +++ b/src/ioc/db/callback.h @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -56,11 +57,11 @@ typedef void (*CALLBACKFUNC)(struct callbackPvt*); ( (USER) = (void *)((CALLBACK *)(PCALLBACK))->user ) epicsShareFunc void callbackInit(void); -epicsShareFunc void callbackRequest(CALLBACK *pCallback); epicsShareFunc void callbackShutdown(void); +epicsShareFunc int callbackRequest(CALLBACK *pCallback); epicsShareFunc void callbackSetProcess( CALLBACK *pcallback, int Priority, void *pRec); -epicsShareFunc void callbackRequestProcessCallback( +epicsShareFunc int callbackRequestProcessCallback( CALLBACK *pCallback,int Priority, void *pRec); epicsShareFunc void callbackRequestDelayed( CALLBACK *pCallback,double seconds); @@ -68,6 +69,7 @@ epicsShareFunc void callbackCancelDelayed(CALLBACK *pcallback); epicsShareFunc void callbackRequestProcessCallbackDelayed( CALLBACK *pCallback, int Priority, void *pRec, double seconds); epicsShareFunc int callbackSetQueueSize(int size); +epicsShareFunc int callbackParallelThreads(int count, const char *prio); #ifdef __cplusplus } diff --git a/src/ioc/db/dbAccessDefs.h b/src/ioc/db/dbAccessDefs.h index ba794a520..31ffc18c2 100644 --- a/src/ioc/db/dbAccessDefs.h +++ b/src/ioc/db/dbAccessDefs.h @@ -199,6 +199,8 @@ struct dbr_alDouble {DBRalDouble}; #define S_db_cntSpwn (M_dbAccess|63) /*Cannot spawn dbContTask*/ #define S_db_cntCont (M_dbAccess|65) /*Cannot resume dbContTask*/ #define S_db_noMemory (M_dbAccess|66) /*unable to allocate data structure from pool*/ +#define S_db_notInit (M_dbAccess|67) /*Not initialized*/ +#define S_db_bufFull (M_dbAccess|68) /*Buffer full*/ epicsShareFunc long dbPutSpecial(struct dbAddr *paddr,int pass); epicsShareFunc struct rset * dbGetRset(const struct dbAddr *paddr); diff --git a/src/ioc/db/dbIocRegister.c b/src/ioc/db/dbIocRegister.c index c07dcc730..c543f98fa 100644 --- a/src/ioc/db/dbIocRegister.c +++ b/src/ioc/db/dbIocRegister.c @@ -24,6 +24,8 @@ #include "dbIocRegister.h" #include "dbState.h" +epicsShareExtern int callbackParallelThreadsDefault; + /* dbLoadDatabase */ static const iocshArg dbLoadDatabaseArg0 = { "file name",iocshArgString}; static const iocshArg dbLoadDatabaseArg1 = { "path",iocshArgString}; @@ -305,6 +307,18 @@ static void callbackSetQueueSizeCallFunc(const iocshArgBuf *args) callbackSetQueueSize(args[0].ival); } +/* callbackParallelThreads */ +static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt}; +static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString}; +static const iocshArg * const callbackParallelThreadsArgs[2] = + {&callbackParallelThreadsArg0,&callbackParallelThreadsArg1}; +static const iocshFuncDef callbackParallelThreadsFuncDef = + {"callbackParallelThreads",2,callbackParallelThreadsArgs}; +static void callbackParallelThreadsCallFunc(const iocshArgBuf *args) +{ + callbackParallelThreads(args[0].ival, args[1].sval); +} + /* dbStateCreate */ static const iocshArg dbStateArgName = { "name", iocshArgString }; static const iocshArg * const dbStateCreateArgs[] = { &dbStateArgName }; @@ -402,6 +416,10 @@ void dbIocRegister(void) iocshRegister(&scanpiolFuncDef,scanpiolCallFunc); iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc); + iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc); + + /* Needed before callback system is initialized */ + callbackParallelThreadsDefault = epicsThreadGetCPUs(); iocshRegister(&dbStateCreateFuncDef, dbStateCreateCallFunc); iocshRegister(&dbStateSetFuncDef, dbStateSetCallFunc); diff --git a/src/ioc/db/dbScan.c b/src/ioc/db/dbScan.c index a1ba12d9a..b7ac0e788 100644 --- a/src/ioc/db/dbScan.c +++ b/src/ioc/db/dbScan.c @@ -114,20 +114,24 @@ typedef struct event_list { char event_name[MAX_STRING_SIZE]; } event_list; static event_list * volatile pevent_list[256]; - +static epicsMutexId event_lock; /* IO_EVENT*/ typedef struct io_scan_list { - CALLBACK callback; - scan_list scan_list; - struct io_scan_list *next; + CALLBACK callback; + scan_list scan_list; } io_scan_list; -static io_scan_list *iosl_head[NUM_CALLBACK_PRIORITIES] = { - NULL, NULL, NULL -}; +typedef struct ioscan_head { + struct ioscan_head *next; + struct io_scan_list iosl[NUM_CALLBACK_PRIORITIES]; + io_scan_complete cb; + void *arg; +} ioscan_head; +static ioscan_head *pioscan_list = NULL; +static epicsMutexId ioscan_lock; /* Private routines */ static void onceTask(void *); @@ -136,9 +140,10 @@ static void periodicTask(void *arg); static void initPeriodic(void); static void deletePeriodic(void); static void spawnPeriodic(int ind); -static void initEvent(void); static void eventCallback(CALLBACK *pcallback); -static void ioeventCallback(CALLBACK *pcallback); +static void ioscanInit(void); +static void ioscanCallback(CALLBACK *pcallback); +static void ioscanDestroy(void); static void printList(scan_list *psl, char *message); static void scanList(scan_list *psl); static void buildScanLists(void); @@ -164,6 +169,7 @@ void scanShutdown(void) epicsEventWait(startStopEvent); deletePeriodic(); + ioscanDestroy(); epicsRingPointerDelete(onceQ); @@ -185,7 +191,6 @@ long scanInit(void) initPeriodic(); initOnce(); - initEvent(); buildScanLists(); for (i = 0; i < nPeriodic; i++) spawnPeriodic(i); @@ -245,7 +250,7 @@ void scanAdd(struct dbCommon *precord) pel = eventNameToHandle(eventname); if (pel) addToList(precord, &pel->scan_list[prio]); } else if (scan == menuScanI_O_Intr) { - io_scan_list *piosl = NULL; + ioscan_head *piosh = NULL; int prio; DEVSUPFUN get_ioint_info; @@ -262,11 +267,11 @@ void scanAdd(struct dbCommon *precord) precord->scan = menuScanPassive; return; } - if (get_ioint_info(0, precord, &piosl)) { + if (get_ioint_info(0, precord, &piosh)) { precord->scan = menuScanPassive; return; } - if (piosl == NULL) { + if (piosh == NULL) { recGblRecordError(-1, (void *)precord, "scanAdd: I/O Intr not valid"); precord->scan = menuScanPassive; @@ -279,8 +284,7 @@ void scanAdd(struct dbCommon *precord) precord->scan = menuScanPassive; return; } - piosl += prio; /* get piosl for correct priority*/ - addToList(precord, &piosl->scan_list); + addToList(precord, &piosh->iosl[prio].scan_list); } else if (scan >= SCAN_1ST_PERIODIC) { addToList(precord, &papPeriodic[scan - SCAN_1ST_PERIODIC]->scan_list); } @@ -319,7 +323,7 @@ void scanDelete(struct dbCommon *precord) if (pel && (psl = &pel->scan_list[prio])) deleteFromList(precord, psl); } else if (scan == menuScanI_O_Intr) { - io_scan_list *piosl=NULL; + ioscan_head *piosh = NULL; int prio; DEVSUPFUN get_ioint_info; @@ -334,8 +338,8 @@ void scanDelete(struct dbCommon *precord) "scanDelete: I/O Intr not valid (no get_ioint_info)"); return; } - if (get_ioint_info(1, precord, &piosl)) return; - if (piosl == NULL) { + if (get_ioint_info(1, precord, &piosh)) return; + if (piosh == NULL) { recGblRecordError(-1, (void *)precord, "scanDelete: I/O Intr not valid"); return; @@ -346,8 +350,7 @@ void scanDelete(struct dbCommon *precord) "scanDelete: get_ioint_info returned illegal priority"); return; } - piosl += prio; /*get piosl for correct priority*/ - deleteFromList(precord, &piosl->scan_list); + deleteFromList(precord, &piosh->iosl[prio].scan_list); } else if (scan >= SCAN_1ST_PERIODIC) { deleteFromList(precord, &papPeriodic[scan - SCAN_1ST_PERIODIC]->scan_list); } @@ -399,21 +402,28 @@ int scanpel(const char* eventname) /* print event list */ return 0; } -int scanpiol(void) /* print io_event list */ +int scanpiol(void) /* print pioscan_list */ { - io_scan_list *piosl; - int prio; - char message[80]; + ioscan_head *piosh; - for(prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { - piosl = iosl_head[prio]; - if (piosl == NULL) continue; - sprintf(message, "IO Event: Priority %s", priorityName[prio]); - while(piosl != NULL) { + ioscanInit(); + epicsMutexMustLock(ioscan_lock); + piosh = pioscan_list; + + while (piosh) { + int prio; + + for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { + io_scan_list *piosl = &piosh->iosl[prio]; + char message[80]; + + sprintf(message, "IO Event %p: Priority %s", + piosh, priorityName[prio]); printList(&piosl->scan_list, message); - piosl = piosl->next; } + piosh = piosh->next; } + epicsMutexUnlock(ioscan_lock); return 0; } @@ -425,19 +435,22 @@ static void eventCallback(CALLBACK *pcallback) scanList(psl); } -static void initEvent(void) +static void eventOnce(void *arg) { + event_lock = epicsMutexMustCreate(); } event_list *eventNameToHandle(const char *eventname) { int prio; event_list *pel; - static epicsMutexId lock = NULL; + static epicsThreadOnceId onceId = EPICS_THREAD_ONCE_INIT; - if (!lock) lock = epicsMutexMustCreate(); - if (!eventname || eventname[0] == 0) return NULL; - epicsMutexMustLock(lock); + if (!eventname || eventname[0] == 0) + return NULL; + + epicsThreadOnce(&onceId, eventOnce, NULL); + epicsMutexMustLock(event_lock); for (pel = pevent_list[0]; pel; pel=pel->next) { if (strcmp(pel->event_name, eventname) == 0) break; } @@ -460,7 +473,7 @@ event_list *eventNameToHandle(const char *eventname) pevent_list[e] = pel; } } - epicsMutexUnlock(lock); + epicsMutexUnlock(event_lock); return pel; } @@ -488,38 +501,91 @@ void post_event(int event) postEvent(pel); } -void scanIoInit(IOSCANPVT *ppioscanpvt) +static void ioscanOnce(void *arg) { + ioscan_lock = epicsMutexMustCreate(); +} + +static void ioscanInit(void) +{ + static epicsThreadOnceId onceId = EPICS_THREAD_ONCE_INIT; + + epicsThreadOnce(&onceId, ioscanOnce, NULL); +} + +static void ioscanDestroy(void) +{ + ioscan_head *piosh; + + ioscanInit(); + epicsMutexMustLock(ioscan_lock); + piosh = pioscan_list; + pioscan_list = NULL; + epicsMutexUnlock(ioscan_lock); + while (piosh) { + ioscan_head *pnext = piosh->next; + int prio; + + for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { + epicsMutexDestroy(piosh->iosl[prio].scan_list.lock); + ellFree(&piosh->iosl[prio].scan_list.list); + } + free(piosh); + piosh = pnext; + } +} + +void scanIoInit(IOSCANPVT *pioscanpvt) +{ + ioscan_head *piosh = dbCalloc(1, sizeof(ioscan_head)); int prio; - /* Allocate an array of io_scan_lists, one for each priority. */ - /* IOSCANPVT will hold the address of this array of structures */ - *ppioscanpvt = dbCalloc(NUM_CALLBACK_PRIORITIES, sizeof(io_scan_list)); + ioscanInit(); for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { - io_scan_list *piosl = &(*ppioscanpvt)[prio]; - callbackSetCallback(ioeventCallback, &piosl->callback); + io_scan_list *piosl = &piosh->iosl[prio]; + + callbackSetCallback(ioscanCallback, &piosl->callback); callbackSetPriority(prio, &piosl->callback); - callbackSetUser(piosl, &piosl->callback); + callbackSetUser(piosh, &piosl->callback); ellInit(&piosl->scan_list.list); piosl->scan_list.lock = epicsMutexMustCreate(); - piosl->next = iosl_head[prio]; - iosl_head[prio] = piosl; } + epicsMutexMustLock(ioscan_lock); + piosh->next = pioscan_list; + pioscan_list = piosh; + epicsMutexUnlock(ioscan_lock); + *pioscanpvt = piosh; } - -void scanIoRequest(IOSCANPVT pioscanpvt) +/* Return a bit mask indicating each priority level + * in which a callback request was successfully queued. + */ +unsigned int scanIoRequest(IOSCANPVT piosh) { int prio; + unsigned int queued = 0; + + if (scanCtl != ctlRun) + return 0; - if (scanCtl != ctlRun) return; for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) { - io_scan_list *piosl = &pioscanpvt[prio]; + io_scan_list *piosl = &piosh->iosl[prio]; + if (ellCount(&piosl->scan_list.list) > 0) - callbackRequest(&piosl->callback); + if (!callbackRequest(&piosl->callback)) + queued |= 1 << prio; } + + return queued; } - + +/* May not be called while a scan request is queued or running */ +void scanIoSetComplete(IOSCANPVT piosh, io_scan_complete cb, void *arg) +{ + piosh->cb = cb; + piosh->arg = arg; +} + void scanOnce(struct dbCommon *precord) { static int newOverflow = TRUE; @@ -744,12 +810,14 @@ static void spawnPeriodic(int ind) epicsEventWait(startStopEvent); } -static void ioeventCallback(CALLBACK *pcallback) +static void ioscanCallback(CALLBACK *pcallback) { - io_scan_list *piosl; + ioscan_head *piosh = (ioscan_head *) pcallback->user; + int prio = pcallback->priority; - callbackGetUser(piosl, pcallback); - scanList(&piosl->scan_list); + scanList(&piosh->iosl[prio].scan_list); + if (piosh->cb) + piosh->cb(piosh->arg, piosh, prio); } static void printList(scan_list *psl, char *message) @@ -759,14 +827,17 @@ static void printList(scan_list *psl, char *message) epicsMutexMustLock(psl->lock); pse = (scan_element *)ellFirst(&psl->list); epicsMutexUnlock(psl->lock); - if (pse == NULL) return; + + if (!pse) + return; + printf("%s\n", message); - while (pse != NULL) { + while (pse) { printf(" %-28s\n", pse->precord->name); epicsMutexMustLock(psl->lock); if (pse->pscan_list != psl) { epicsMutexUnlock(psl->lock); - printf("Scan list changed while processing."); + printf(" Scan list changed while printing, try again.\n"); return; } pse = (scan_element *)ellNext(&pse->node); diff --git a/src/ioc/db/dbScan.h b/src/ioc/db/dbScan.h index 03117f769..cd9666348 100644 --- a/src/ioc/db/dbScan.h +++ b/src/ioc/db/dbScan.h @@ -34,11 +34,13 @@ extern "C" { #define MIN_PHASE SHRT_MIN /*definitions for I/O Interrupt Scanning */ -struct io_scan_list; +struct ioscan_head; -typedef struct io_scan_list *IOSCANPVT; +typedef struct ioscan_head *IOSCANPVT; typedef struct event_list *EVENTPVT; +typedef void (*io_scan_complete)(void *usr, IOSCANPVT, int prio); + struct dbCommon; epicsShareFunc long scanInit(void); @@ -64,8 +66,9 @@ epicsShareFunc int scanpel(const char *event_name); /*print io_event list*/ epicsShareFunc int scanpiol(void); -epicsShareFunc void scanIoInit(IOSCANPVT *); -epicsShareFunc void scanIoRequest(IOSCANPVT); +epicsShareFunc void scanIoInit(IOSCANPVT *ppios); +epicsShareFunc unsigned int scanIoRequest(IOSCANPVT pios); +epicsShareFunc void scanIoSetComplete(IOSCANPVT, io_scan_complete, void *usr); #ifdef __cplusplus } diff --git a/src/ioc/db/test/Makefile b/src/ioc/db/test/Makefile index 2d4560846..320ea5260 100644 --- a/src/ioc/db/test/Makefile +++ b/src/ioc/db/test/Makefile @@ -55,11 +55,28 @@ callbackTest_SRCS += callbackTest.c testHarness_SRCS += callbackTest.c TESTS += callbackTest +TESTPROD_HOST += callbackParallelTest +callbackParallelTest_SRCS += callbackParallelTest.c +testHarness_SRCS += callbackParallelTest.c +TESTS += callbackParallelTest + TESTPROD_HOST += dbStateTest dbStateTest_SRCS += dbStateTest.c testHarness_SRCS += dbStateTest.c TESTS += dbStateTest +TARGETS += $(COMMON_DIR)/scanIoTest.dbd +scanIoTest_DBD += menuGlobal.dbd +scanIoTest_DBD += menuConvert.dbd +scanIoTest_DBD += yRecord.dbd +TESTPROD_HOST += scanIoTest +scanIoTest_SRCS += scanIoTest.c +scanIoTest_SRCS += scanIoTest_registerRecordDeviceDriver.cpp +testHarness_SRCS += scanIoTest.c +testHarness_SRCS += scanIoTest_registerRecordDeviceDriver.cpp +TESTFILES += $(COMMON_DIR)/scanIoTest.dbd ../scanIoTest.db +TESTS += scanIoTest + TESTPROD_HOST += dbChannelTest dbChannelTest_SRCS += dbChannelTest.c dbChannelTest_SRCS += dbTestIoc_registerRecordDeviceDriver.cpp @@ -105,4 +122,5 @@ include $(TOP)/configure/RULES xRecord$(DEP): $(COMMON_DIR)/xRecord.h dbPutLinkTest$(DEP): $(COMMON_DIR)/xRecord.h +scanIoTest$(DEP): $(COMMON_DIR)/yRecord.h diff --git a/src/ioc/db/test/callbackParallelTest.c b/src/ioc/db/test/callbackParallelTest.c new file mode 100644 index 000000000..eeb6ddaee --- /dev/null +++ b/src/ioc/db/test/callbackParallelTest.c @@ -0,0 +1,190 @@ +/*************************************************************************\ +* Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2002 The Regents of the University of California, as +* Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ +/* $Revision-Id$ */ + +/* Author: Marty Kraimer Date: 26JAN2000 */ + +#include +#include +#include +#include +#include +#include + +#include "callback.h" +#include "cantProceed.h" +#include "epicsThread.h" +#include "epicsEvent.h" +#include "epicsTime.h" +#include "epicsUnitTest.h" +#include "testMain.h" + +/* + * This test checks both immediate and delayed callbacks in two steps. + * In the first step (pass1) NCALLBACKS immediate callbacks are queued. + * As each is run it starts a second delayed callback (pass2). + * The last delayed callback which runs signals an epicsEvent + * to the main thread. + * + * Two time intervals are measured. The time to queue and run each of + * the immediate callbacks, and the actual delay of the delayed callback. + */ + +#define NCALLBACKS 169 +#define DELAY_QUANTUM 0.25 + +#define TEST_DELAY(i) ((i / NUM_CALLBACK_PRIORITIES) * DELAY_QUANTUM) + +typedef struct myPvt { + CALLBACK cb1; + CALLBACK cb2; + epicsTimeStamp pass1Time; + epicsTimeStamp pass2Time; + double delay; + int pass; + int resultFail; +} myPvt; + +epicsEventId finished; + +static void myCallback(CALLBACK *pCallback) +{ + myPvt *pmyPvt; + + callbackGetUser(pmyPvt, pCallback); + + pmyPvt->pass++; + + if (pmyPvt->pass == 1) { + epicsTimeGetCurrent(&pmyPvt->pass1Time); + callbackRequestDelayed(&pmyPvt->cb2, pmyPvt->delay); + } else if (pmyPvt->pass == 2) { + epicsTimeGetCurrent(&pmyPvt->pass2Time); + } else { + pmyPvt->resultFail = 1; + return; + } +} + +static void finalCallback(CALLBACK *pCallback) +{ + myCallback(pCallback); + epicsEventSignal(finished); +} + +static void updateStats(double *stats, double val) +{ + if (stats[0] > val) stats[0] = val; + if (stats[1] < val) stats[1] = val; + stats[2] += val; + stats[3] += pow(val, 2.0); + stats[4] += 1.; +} + +static void printStats(double *stats, const char* tag) { + testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f", + tag, stats[0], stats[2]/stats[4], stats[1], + sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]); +} + +MAIN(callbackParallelTest) +{ + myPvt *pcbt[NCALLBACKS]; + epicsTimeStamp start; + int noCpus = epicsThreadGetCPUs(); + int i, j; + /* Statistics: min/max/sum/sum^2/n for each priority */ + double setupError[NUM_CALLBACK_PRIORITIES][5]; + double timeError[NUM_CALLBACK_PRIORITIES][5]; + double defaultError[5] = {1,-1,0,0,0}; + + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) + for (j = 0; j < 5; j++) + setupError[i][j] = timeError[i][j] = defaultError[j]; + + testPlan(NCALLBACKS * 2 + 1); + + testDiag("Starting %d parallel callback threads", noCpus); + + callbackParallelThreads(noCpus, ""); + callbackInit(); + epicsThreadSleep(1.0); + + finished = epicsEventMustCreate(epicsEventEmpty); + + for (i = 0; i < NCALLBACKS ; i++) { + pcbt[i] = callocMustSucceed(1, sizeof(myPvt), "pcbt"); + callbackSetCallback(myCallback, &pcbt[i]->cb1); + callbackSetCallback(myCallback, &pcbt[i]->cb2); + callbackSetUser(pcbt[i], &pcbt[i]->cb1); + callbackSetUser(pcbt[i], &pcbt[i]->cb2); + callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb1); + callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb2); + pcbt[i]->delay = TEST_DELAY(i); + pcbt[i]->pass = 0; + } + + /* Last callback is special */ + callbackSetCallback(finalCallback, &pcbt[NCALLBACKS-1]->cb2); + callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb1); + callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb2); + pcbt[NCALLBACKS-1]->delay = TEST_DELAY(NCALLBACKS) + 1.0; + pcbt[NCALLBACKS-1]->pass = 0; + + testOk1(epicsTimeGetCurrent(&start)==epicsTimeOK); + + for (i = 0; i < NCALLBACKS ; i++) { + callbackRequest(&pcbt[i]->cb1); + } + + testDiag("Waiting %.02f sec", pcbt[NCALLBACKS-1]->delay); + + epicsEventWait(finished); + + for (i = 0; i < NCALLBACKS ; i++) { + if(pcbt[i]->resultFail || pcbt[i]->pass!=2) + testFail("pass = %d for delay = %f", pcbt[i]->pass, pcbt[i]->delay); + else { + double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start); + testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05", + pcbt[i]->delay, delta); + updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta); + } + } + + for (i = 0; i < NCALLBACKS ; i++) { + double delta, error; + if(pcbt[i]->resultFail || pcbt[i]->pass!=2) + continue; + delta = epicsTimeDiffInSeconds(&pcbt[i]->pass2Time, &pcbt[i]->pass1Time); + error = delta - pcbt[i]->delay; + testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05", + pcbt[i]->delay, error); + updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error); + } + + testDiag("Setup time statistics"); + printStats(setupError[0], "LOW"); + printStats(setupError[1], "MID"); + printStats(setupError[2], "HIGH"); + + testDiag("Delay time statistics"); + printStats(timeError[0], "LOW"); + printStats(timeError[1], "MID"); + printStats(timeError[2], "HIGH"); + + for (i = 0; i < NCALLBACKS ; i++) { + free(pcbt[i]); + } + + callbackShutdown(); + + return testDone(); +} diff --git a/src/ioc/db/test/callbackTest.c b/src/ioc/db/test/callbackTest.c index 47b5516d1..c4c6b31d9 100644 --- a/src/ioc/db/test/callbackTest.c +++ b/src/ioc/db/test/callbackTest.c @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -79,11 +80,34 @@ static void finalCallback(CALLBACK *pCallback) epicsEventSignal(finished); } +static void updateStats(double *stats, double val) +{ + if (stats[0] > val) stats[0] = val; + if (stats[1] < val) stats[1] = val; + stats[2] += val; + stats[3] += pow(val, 2.0); + stats[4] += 1.; +} + +static void printStats(double *stats, const char* tag) { + testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f", + tag, stats[0], stats[2]/stats[4], stats[1], + sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]); +} + MAIN(callbackTest) { myPvt *pcbt[NCALLBACKS]; epicsTimeStamp start; - int i; + int i, j; + /* Statistics: min/max/sum/sum^2/n for each priority */ + double setupError[NUM_CALLBACK_PRIORITIES][5]; + double timeError[NUM_CALLBACK_PRIORITIES][5]; + double defaultError[5] = {1,-1,0,0,0}; + + for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) + for (j = 0; j < 5; j++) + setupError[i][j] = timeError[i][j] = defaultError[j]; testPlan(NCALLBACKS * 2 + 1); @@ -128,8 +152,8 @@ MAIN(callbackTest) double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start); testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05", pcbt[i]->delay, delta); + updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta); } - } for (i = 0; i < NCALLBACKS ; i++) { @@ -140,7 +164,24 @@ MAIN(callbackTest) error = delta - pcbt[i]->delay; testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05", pcbt[i]->delay, error); + updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error); } + testDiag("Setup time statistics"); + printStats(setupError[0], "LOW"); + printStats(setupError[1], "MID"); + printStats(setupError[2], "HIGH"); + + testDiag("Delay time statistics"); + printStats(timeError[0], "LOW"); + printStats(timeError[1], "MID"); + printStats(timeError[2], "HIGH"); + + for (i = 0; i < NCALLBACKS ; i++) { + free(pcbt[i]); + } + + callbackShutdown(); + return testDone(); } diff --git a/src/ioc/db/test/epicsRunDbTests.c b/src/ioc/db/test/epicsRunDbTests.c index ae150b55a..349fabccd 100644 --- a/src/ioc/db/test/epicsRunDbTests.c +++ b/src/ioc/db/test/epicsRunDbTests.c @@ -20,6 +20,7 @@ int testdbConvert(void); int callbackTest(void); int dbStateTest(void); int dbShutdownTest(void); +int scanIoTest(void); int dbLockTest(void); int dbPutLinkTest(void); int testDbChannel(void); @@ -35,6 +36,7 @@ void epicsRunDbTests(void) runTest(callbackTest); runTest(dbStateTest); runTest(dbShutdownTest); + runTest(scanIoTest); runTest(dbLockTest); runTest(dbPutLinkTest); runTest(testDbChannel); diff --git a/src/ioc/db/test/scanIoTest.c b/src/ioc/db/test/scanIoTest.c new file mode 100644 index 000000000..39e9aa35f --- /dev/null +++ b/src/ioc/db/test/scanIoTest.c @@ -0,0 +1,519 @@ +/*************************************************************************\ +* Copyright (c) 2013 UChicago Argonne LLC, as Operator of Argonne +* National Laboratory. +* Copyright (c) 2013 Helmholtz-Zentrum Berlin +* für Materialien und Energie GmbH. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +\*************************************************************************/ + +/* + * Author: Ralph Lange + */ + +#include + +#include "epicsEvent.h" +#include "epicsMessageQueue.h" +#include "epicsPrint.h" +#include "epicsMath.h" +#include "alarm.h" +#include "menuPriority.h" +#include "dbChannel.h" +#include "dbStaticLib.h" +#include "dbAccessDefs.h" +#include "dbScan.h" +#include "dbLock.h" +#include "dbUnitTest.h" +#include "dbCommon.h" +#include "registry.h" +#include "registryRecordType.h" +#include "registryDeviceSupport.h" +#include "recSup.h" +#include "devSup.h" +#include "iocInit.h" +#include "callback.h" +#include "ellLib.h" +#include "epicsUnitTest.h" +#include "testMain.h" +#include "osiFileName.h" + +#define GEN_SIZE_OFFSET +#include "yRecord.h" + +#include "epicsExport.h" + +#define ONE_THREAD_LOOPS 101 +#define PAR_THREAD_LOOPS 53 +#define CB_THREAD_LOOPS 13 + +#define NO_OF_THREADS 7 +#define NO_OF_MEMBERS 5 +#define NO_OF_GROUPS 11 + +#define NO_OF_MID_THREADS 3 + +static int noOfGroups = NO_OF_GROUPS; +static int noOfIoscans = NO_OF_GROUPS; + +static IOSCANPVT *ioscanpvt; /* Soft interrupt sources */ +static ELLLIST *pvtList; /* Per group private part lists */ + +static int executionOrder; +static int orderFail; +static int testNo; +static epicsMessageQueueId *mq; /* Per group message queue */ +static epicsEventId *barrier; /* Per group barrier event */ +static int *cbCounter; + +struct pvtY { + ELLNODE node; + yRecord *prec; + int group; + int member; + int count; + int processed; + int callback; +}; + +/* test2: priority and ioscan index for each group + * used priorities are expressed in the bit pattern of (ioscan index + 1) */ +struct groupItem { + int prio; + int ioscan; +} groupTable[12] = { + { 0, 0 }, + { 1, 1 }, + { 0, 2 }, { 1, 2 }, + { 2, 3 }, + { 0, 4 }, { 2, 4 }, + { 1, 5 }, { 2, 5 }, + { 0, 6 }, { 1, 6 }, { 2, 6 } +}; +static int recsProcessed = 1; +static int noDoubleCallback = 1; + +void scanIoTest_registerRecordDeviceDriver(struct dbBase *); + +long count_bits(long n) { + unsigned int c; /* c accumulates the total bits set in v */ + for (c = 0; n; c++) + n &= n - 1; /* clear the least significant bit set */ + return c; +} + +/*************************************************************************\ +* yRecord: minimal record needed to test I/O Intr scanning +\*************************************************************************/ + +static long get_ioint_info(int cmd, yRecord *prec, IOSCANPVT *ppvt) +{ + struct pvtY *pvt = (struct pvtY *)(prec->dpvt); + + if (testNo == 2) + *ppvt = ioscanpvt[groupTable[pvt->group].ioscan]; + else + *ppvt = ioscanpvt[pvt->group]; + return 0; +} + +struct ydset { + long number; + DEVSUPFUN report; + DEVSUPFUN init; + DEVSUPFUN init_record; + DEVSUPFUN get_ioint_info; + DEVSUPFUN process; +} devY = { + 5, + NULL, + NULL, + NULL, + get_ioint_info, + NULL +}; +epicsExportAddress(dset, devY); + +static long init_record(yRecord *prec, int pass) +{ + struct pvtY *pvt; + + if (pass == 0) return 0; + + pvt = (struct pvtY *) calloc(1, sizeof(struct pvtY)); + prec->dpvt = pvt; + + pvt->prec = prec; + sscanf(prec->name, "g%dm%d", &pvt->group, &pvt->member); + ellAdd(&pvtList[pvt->group], &pvt->node); + + return 0; +} + +static long process(yRecord *prec) +{ + struct pvtY *pvt = (struct pvtY *)(prec->dpvt); + + if (testNo == 0) { + /* Single callback thread */ + if (executionOrder != pvt->member) { + orderFail = 1; + } + pvt->count++; + if (++executionOrder == NO_OF_MEMBERS) executionOrder = 0; + } else { + pvt->count++; + if (pvt->member == 0) { + epicsMessageQueueSend(mq[pvt->group], NULL, 0); + epicsEventMustWait(barrier[pvt->group]); + } + } + pvt->processed = 1; + return 0; +} + +rset yRSET={ + 4, + NULL, /* report */ + NULL, /* initialize */ + init_record, + process +}; +epicsExportAddress(rset, yRSET); + +static void startMockIoc(void) { + char substitutions[256]; + int i, j; + char *prio[] = { "LOW", "MEDIUM", "HIGH" }; + + if (testNo == 2) { + noOfGroups = 12; + noOfIoscans = 7; + } + ioscanpvt = calloc(noOfIoscans, sizeof(IOSCANPVT)); + mq = calloc(noOfGroups, sizeof(epicsMessageQueueId)); + barrier = calloc(noOfGroups, sizeof(epicsEventId)); + pvtList = calloc(noOfGroups, sizeof(ELLLIST)); + cbCounter = calloc(noOfGroups, sizeof(int)); + + if (dbReadDatabase(&pdbbase, "scanIoTest.dbd", + "." OSI_PATH_LIST_SEPARATOR ".." OSI_PATH_LIST_SEPARATOR + "../O.Common" OSI_PATH_LIST_SEPARATOR "O.Common", NULL)) + testAbort("Error reading database description 'scanIoTest.dbd'"); + + callbackParallelThreads(1, "Low"); + callbackParallelThreads(NO_OF_MID_THREADS, "Medium"); + callbackParallelThreads(NO_OF_THREADS, "High"); + + for (i = 0; i < noOfIoscans; i++) { + scanIoInit(&ioscanpvt[i]); + } + + for (i = 0; i < noOfGroups; i++) { + mq[i] = epicsMessageQueueCreate(NO_OF_MEMBERS, 1); + barrier[i] = epicsEventMustCreate(epicsEventEmpty); + ellInit(&pvtList[i]); + } + + scanIoTest_registerRecordDeviceDriver(pdbbase); + for (i = 0; i < noOfGroups; i++) { + for (j = 0; j < NO_OF_MEMBERS; j++) { + sprintf(substitutions, "GROUP=%d,MEMBER=%d,PRIO=%s", i, j, + testNo==0?"LOW":(testNo==1?"HIGH":prio[groupTable[i].prio])); + if (dbReadDatabase(&pdbbase, "scanIoTest.db", + "." OSI_PATH_LIST_SEPARATOR "..", substitutions)) + testAbort("Error reading test database 'scanIoTest.db'"); + } + } + + testIocInitOk(); +} + +static void stopMockIoc(void) { + int i; + + testIocShutdownOk(); + epicsThreadSleep(0.1); + for (i = 0; i < noOfGroups; i++) { + epicsMessageQueueDestroy(mq[i]); mq[i] = NULL; + epicsEventDestroy(barrier[i]); barrier[i] = NULL; + ellFree(&pvtList[i]); + } + free(mq); + free(barrier); + free(pvtList); + free(cbCounter); + testdbCleanup(); +} + +static void checkProcessed(void *user, IOSCANPVT ioscan, int prio) { + struct pvtY *pvt; + int group = -1; + int i; + + for (i = 0; i < noOfGroups; i++) { + if (ioscanpvt[groupTable[i].ioscan] == ioscan + && groupTable[i].prio == prio) { + group = i; + break; + } + } + if (group == -1) + testAbort("invalid ioscanpvt in scanio callback"); + + cbCounter[group]++; + for (pvt = (struct pvtY *)ellFirst(&pvtList[group]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + if (pvt->callback == 1) { + testDiag("callback for rec %s arrived twice\n", pvt->prec->name); + noDoubleCallback = 0; + } + if (pvt->processed == 0) { + testDiag("rec %s was not processed\n", pvt->prec->name); + recsProcessed = 0; + } + pvt->callback = 1; + } +} + +/*************************************************************************\ +* scanIoTest: Test I/O Intr scanning +* including parallel callback threads and scanio callbacks +\*************************************************************************/ + +MAIN(scanIoTest) +{ + int i, j; + int loop; + int max_one, max_one_all; + int parallel, parallel_all; + int result; + int cbCountOk; + long waiting; + struct pvtY *pvt; + + testPlan(10); + + if (noOfGroups < NO_OF_THREADS) + testAbort("ERROR: This test requires number of ioscan sources >= number of parallel threads"); + + /**************************************\ + * Single callback thread + \**************************************/ + + testNo = 0; + startMockIoc(); + + testDiag("Testing single callback thread"); + testDiag(" using %d ioscan sources, %d records for each, and %d loops", + noOfGroups, NO_OF_MEMBERS, ONE_THREAD_LOOPS); + + for (j = 0; j < ONE_THREAD_LOOPS; j++) { + for (i = 0; i < noOfIoscans; i++) { + scanIoRequest(ioscanpvt[i]); + } + } + + epicsThreadSleep(0.1); + + testOk((orderFail==0), "No out-of-order processing"); + + result = 1; + for (i = 0; i < noOfGroups; i++) { + for (pvt = (struct pvtY *)ellFirst(&pvtList[i]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + if (pvt->count != ONE_THREAD_LOOPS) result = 0; + } + } + + testOk(result, "All per-record process counters match number of loops"); + + stopMockIoc(); + + /**************************************\ + * Multiple parallel callback threads + \**************************************/ + + testNo = 1; + startMockIoc(); + + testDiag("Testing multiple parallel callback threads"); + testDiag(" using %d ioscan sources, %d records for each, %d loops, and %d parallel threads", + noOfIoscans, NO_OF_MEMBERS, PAR_THREAD_LOOPS, NO_OF_THREADS); + + for (j = 0; j < PAR_THREAD_LOOPS; j++) { + for (i = 0; i < noOfIoscans; i++) { + scanIoRequest(ioscanpvt[i]); + } + } + + /* With parallel cb threads, order and distribution to threads are not guaranteed. + * We have stop barrier events for each request (in the first record). + * Test schedule: + * - After the requests have been put in the queue, NO_OF_THREADS threads should have taken + * one request each. + * - Each barrier event is given PAR_THREAD_LOOPS times. + * - Whenever things stop, there should be four threads waiting, one request each. + * - After all loops, each record should have processed PAR_THREAD_LOOPS times. + */ + + max_one_all = 1; + parallel_all = 1; + + for (loop = 0; loop < (PAR_THREAD_LOOPS * noOfGroups) / NO_OF_THREADS + 1; loop++) { + max_one = 1; + parallel = 0; + waiting = 0; + j = 0; + do { + epicsThreadSleep(0.001); + j++; + for (i = 0; i < noOfGroups; i++) { + int l = epicsMessageQueuePending(mq[i]); + while (epicsMessageQueueTryReceive(mq[i], NULL, 0) != -1); + if (l == 1) { + waiting |= 1 << i; + } else if (l > 1) { + max_one = 0; + } + } + parallel = count_bits(waiting); + } while (j < 5 && parallel < NO_OF_THREADS); + + if (!max_one) max_one_all = 0; + if (loop < (PAR_THREAD_LOOPS * noOfGroups) / NO_OF_THREADS) { + if (!(parallel == NO_OF_THREADS)) parallel_all = 0; + } else { + /* In the last run of the loop only the remaining requests are processed */ + if (!(parallel == PAR_THREAD_LOOPS * noOfGroups % NO_OF_THREADS)) parallel_all = 0; + } + + for (i = 0; i < noOfGroups; i++) { + if (waiting & (1 << i)) { + epicsEventTrigger(barrier[i]); + } + } + } + + testOk(max_one_all, "No thread took more than one request per loop"); + testOk(parallel_all, "Correct number of requests were being processed in parallel in each loop"); + + epicsThreadSleep(0.1); + + result = 1; + for (i = 0; i < noOfGroups; i++) { + for (pvt = (struct pvtY *)ellFirst(&pvtList[i]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + if (pvt->count != PAR_THREAD_LOOPS) { + testDiag("Process counter for record %s (%d) does not match loop count (%d)", + pvt->prec->name, pvt->count, PAR_THREAD_LOOPS); + result = 0; + } + } + } + + testOk(result, "All per-record process counters match number of loops"); + + stopMockIoc(); + + /**************************************\ + * Scanio callback mechanism + \**************************************/ + + testNo = 2; + startMockIoc(); + + for (i = 0; i < noOfIoscans; i++) { + scanIoSetComplete(ioscanpvt[i], checkProcessed, NULL); + } + + testDiag("Testing scanio callback mechanism"); + testDiag(" using %d ioscan sources, %d records for each, %d loops, and 1 LOW / %d MEDIUM / %d HIGH parallel threads", + noOfIoscans, NO_OF_MEMBERS, CB_THREAD_LOOPS, NO_OF_MID_THREADS, NO_OF_THREADS); + + result = 1; + for (j = 0; j < CB_THREAD_LOOPS; j++) { + for (i = 0; i < noOfIoscans; i++) { + int prio_used; + prio_used = scanIoRequest(ioscanpvt[i]); + if (i+1 != prio_used) + result = 0; + } + } + testOk(result, "All requests return the correct priority callback mask (all 7 permutations covered)"); + + /* Test schedule: + * After the requests have been put in the queue, it is checked + * - that each callback arrives exactly once, + * - after all records in the group have been processed. + */ + + /* loop count times 4 since (worst case) one loop triggers 4 groups for the single LOW thread */ + for (loop = 0; loop < CB_THREAD_LOOPS * 4; loop++) { + max_one = 1; + parallel = 0; + waiting = 0; + j = 0; + do { + epicsThreadSleep(0.001); + j++; + for (i = 0; i < noOfGroups; i++) { + int l = epicsMessageQueuePending(mq[i]); + while (epicsMessageQueueTryReceive(mq[i], NULL, 0) != -1); + if (l == 1) { + waiting |= 1 << i; + } else if (l > 1) { + max_one = 0; + } + } + parallel = count_bits(waiting); + } while (j < 5); +\ + for (i = 0; i < noOfGroups; i++) { + if (waiting & (1 << i)) { + for (pvt = (struct pvtY *)ellFirst(&pvtList[i]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + pvt->processed = 0; + pvt->callback = 0; + /* record processing will set this at the end of process() */ + } + epicsEventTrigger(barrier[i]); + } + } + } + + epicsThreadSleep(0.1); + + testOk(recsProcessed, "Each callback occured after all records in the group were processed"); + testOk(noDoubleCallback, "No double callbacks occured in any loop"); + + result = 1; + cbCountOk = 1; + for (i = 0; i < noOfGroups; i++) { + if (cbCounter[i] != CB_THREAD_LOOPS) { + testDiag("Callback counter for group %d (%d) does not match loop count (%d)", + i, cbCounter[i], CB_THREAD_LOOPS); + cbCountOk = 0; + } + for (pvt = (struct pvtY *)ellFirst(&pvtList[i]); + pvt; + pvt = (struct pvtY *)ellNext(&pvt->node)) { + if (pvt->count != CB_THREAD_LOOPS) { + testDiag("Process counter for record %s (%d) does not match loop count (%d)", + pvt->prec->name, pvt->count, CB_THREAD_LOOPS); + result = 0; + } + } + } + + testOk(result, "All per-record process counters match number of loops"); + testOk(cbCountOk, "All per-group callback counters match number of loops"); + + stopMockIoc(); + + return testDone(); +} diff --git a/src/ioc/db/test/scanIoTest.db b/src/ioc/db/test/scanIoTest.db new file mode 100644 index 000000000..810a84edf --- /dev/null +++ b/src/ioc/db/test/scanIoTest.db @@ -0,0 +1,4 @@ +record(y, g$(GROUP)m$(MEMBER)) { + field(SCAN, "I/O Intr") + field(PRIO, "$(PRIO)") +} diff --git a/src/ioc/db/test/yRecord.dbd b/src/ioc/db/test/yRecord.dbd new file mode 100644 index 000000000..0aa970da5 --- /dev/null +++ b/src/ioc/db/test/yRecord.dbd @@ -0,0 +1,10 @@ +# This is a minimal I/O scanned record + +recordtype(y) { + include "dbCommon.dbd" + field(VAL, DBF_LONG) { + prompt("Value") + } +} + +device(y,CONSTANT,devY,"ScanIO Test") diff --git a/src/ioc/misc/dbCore.dbd b/src/ioc/misc/dbCore.dbd index 3b22f4663..b383efeb6 100644 --- a/src/ioc/misc/dbCore.dbd +++ b/src/ioc/misc/dbCore.dbd @@ -17,3 +17,5 @@ variable(dbBptNotMonotonic,int) # dbLoadTemplate settings variable(dbTemplateMaxVars,int) +# Default number of parallel callback threads +variable(callbackParallelThreadsDefault,int) diff --git a/src/libCom/ring/epicsRingBytes.c b/src/libCom/ring/epicsRingBytes.c index 2c15ee8f1..cb7e52e83 100644 --- a/src/libCom/ring/epicsRingBytes.c +++ b/src/libCom/ring/epicsRingBytes.c @@ -3,13 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. -* EPICS BASE Versions 3.13.7 -* and higher are distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* Copyright (c) 2012 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. \*************************************************************************/ -/* epicsRingBytes.cd */ -/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Eric Norum + * Ralph Lange + */ #include #include @@ -18,6 +21,7 @@ #include #define epicsExportSharedSymbols +#include "epicsSpin.h" #include "dbDefs.h" #include "epicsRingBytes.h" @@ -30,6 +34,7 @@ #define SLOP 16 typedef struct ringPvt { + epicsSpinId lock; volatile int nextPut; volatile int nextGet; int size; @@ -44,12 +49,23 @@ epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int size) pring->size = size + SLOP; pring->nextGet = 0; pring->nextPut = 0; + pring->lock = 0; + return((void *)pring); +} + +epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int size) +{ + ringPvt *pring = (ringPvt *)epicsRingBytesCreate(size); + if(!pring) + return NULL; + pring->lock = epicsSpinCreate(); return((void *)pring); } epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + if (pring->lock) epicsSpinDestroy(pring->lock); free((void *)pring); } @@ -57,11 +73,14 @@ epicsShareFunc int epicsShareAPI epicsRingBytesGet( epicsRingBytesId id, char *value,int nbytes) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; - int size = pring->size; + int nextGet, nextPut, size; int count; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + size = pring->size; + if (nextGet <= nextPut) { count = nextPut - nextGet; if (count < nbytes) @@ -89,6 +108,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesGet( } } pring->nextGet = nextGet; + + if (pring->lock) epicsSpinUnlock(pring->lock); return nbytes; } @@ -96,23 +117,30 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut( epicsRingBytesId id, char *value,int nbytes) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; - int size = pring->size; + int nextGet, nextPut, size; int freeCount, copyCount, topCount; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + size = pring->size; + if (nextPut < nextGet) { freeCount = nextGet - nextPut - SLOP; - if (nbytes > freeCount) + if (nbytes > freeCount) { + if (pring->lock) epicsSpinUnlock(pring->lock); return 0; + } if (nbytes) memcpy ((void *)&pring->buffer[nextPut], value, nbytes); nextPut += nbytes; } else { freeCount = size - nextPut + nextGet - SLOP; - if (nbytes > freeCount) + if (nbytes > freeCount) { + if (pring->lock) epicsSpinUnlock(pring->lock); return 0; + } topCount = size - nextPut; copyCount = (nbytes > topCount) ? topCount : nbytes; if (copyCount) @@ -126,6 +154,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesPut( } } pring->nextPut = nextPut; + + if (pring->lock) epicsSpinUnlock(pring->lock); return nbytes; } @@ -133,14 +163,20 @@ epicsShareFunc void epicsShareAPI epicsRingBytesFlush(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + if (pring->lock) epicsSpinLock(pring->lock); pring->nextGet = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); } epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; - int nextGet = pring->nextGet; - int nextPut = pring->nextPut; + int nextGet, nextPut; + + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); if (nextPut < nextGet) return nextGet - nextPut - SLOP; @@ -151,8 +187,18 @@ epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id) epicsShareFunc int epicsShareAPI epicsRingBytesUsedBytes(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + int nextGet, nextPut; + int used; - return pring->size - epicsRingBytesFreeBytes(id) - SLOP; + if (pring->lock) epicsSpinLock(pring->lock); + nextGet = pring->nextGet; + nextPut = pring->nextPut; + if (pring->lock) epicsSpinUnlock(pring->lock); + + used = nextPut - nextGet; + if (used < 0) used += pring->size; + + return used; } epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id) @@ -165,8 +211,13 @@ epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id) epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id) { ringPvt *pring = (ringPvt *)id; + int isEmpty; - return (pring->nextPut == pring->nextGet); + if (pring->lock) epicsSpinLock(pring->lock); + isEmpty = (pring->nextPut == pring->nextGet); + if (pring->lock) epicsSpinUnlock(pring->lock); + + return isEmpty; } epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id) diff --git a/src/libCom/ring/epicsRingBytes.h b/src/libCom/ring/epicsRingBytes.h index 9e5220513..011829bfe 100644 --- a/src/libCom/ring/epicsRingBytes.h +++ b/src/libCom/ring/epicsRingBytes.h @@ -3,13 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. -* EPICS BASE Versions 3.13.7 -* and higher are distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. +* Copyright (c) 2012 ITER Organization. +* EPICS BASE is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingBytes.h */ -/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Eric Norum + * Ralph Lange + */ #ifndef INCepicsRingBytesh #define INCepicsRingBytesh @@ -23,6 +26,8 @@ extern "C" { typedef void *epicsRingBytesId; epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int nbytes); +/* Same, but secured by a spinlock */ +epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int nbytes); epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id); epicsShareFunc int epicsShareAPI epicsRingBytesGet( epicsRingBytesId id, char *value,int nbytes); @@ -42,6 +47,8 @@ epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id); /* NOTES If there is only one writer it is not necessary to lock for put If there is a single reader it is not necessary to lock for puts + + epicsRingBytesLocked uses a spinlock. */ #endif /* INCepicsRingBytesh */ diff --git a/src/libCom/ring/epicsRingPointer.cpp b/src/libCom/ring/epicsRingPointer.cpp index 3a5cc8c72..9c144cec1 100644 --- a/src/libCom/ring/epicsRingPointer.cpp +++ b/src/libCom/ring/epicsRingPointer.cpp @@ -3,11 +3,16 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2012 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingPointer.cpp*/ -/* Author: Marty Kraimer Date: 13OCT2000 */ + +/* + * Author: Marty Kraimer Date: 13OCT2000 + * Ralph Lange + */ + #include #include @@ -22,7 +27,13 @@ typedef epicsRingPointer voidPointer; epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size) { - voidPointer *pvoidPointer = new voidPointer(size); + voidPointer *pvoidPointer = new voidPointer(size, false); + return(reinterpret_cast(pvoidPointer)); +} + +epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size) +{ + voidPointer *pvoidPointer = new voidPointer(size, true); return(reinterpret_cast(pvoidPointer)); } diff --git a/src/libCom/ring/epicsRingPointer.h b/src/libCom/ring/epicsRingPointer.h index 3332782a4..48d62036d 100644 --- a/src/libCom/ring/epicsRingPointer.h +++ b/src/libCom/ring/epicsRingPointer.h @@ -3,12 +3,15 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2012 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ -/*epicsRingPointer.h */ -/* Author: Marty Kraimer Date: 15JUL99 */ +/* + * Author: Marty Kraimer Date: 15JUL99 + * Ralph Lange + */ #ifndef INCepicsRingPointerh #define INCepicsRingPointerh @@ -16,15 +19,18 @@ /* NOTES * If there is only one writer it is not necessary to lock push * If there is a single reader it is not necessary to lock pop + * + * epicsRingPointerLocked uses a spinlock. */ +#include "epicsSpin.h" #include "shareLib.h" #ifdef __cplusplus template class epicsRingPointer { public: /* Functions */ - epicsRingPointer(int size); + epicsRingPointer(int size, bool locked); ~epicsRingPointer(); bool push(T *p); T* pop(); @@ -42,6 +48,7 @@ private: /* Prevent compiler-generated member functions */ epicsRingPointer& operator=(const epicsRingPointer &); private: /* Data */ + epicsSpinId lock; volatile int nextPush; volatile int nextPop; int size; @@ -54,6 +61,8 @@ extern "C" { typedef void *epicsRingPointerId; epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size); +/* Same, but secured by a spinlock */ +epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size); epicsShareFunc void epicsShareAPI epicsRingPointerDelete(epicsRingPointerId id); /*ringPointerPush returns (0,1) if p (was not, was) put on ring*/ epicsShareFunc int epicsShareAPI epicsRingPointerPush(epicsRingPointerId id,void *p); @@ -85,72 +94,105 @@ epicsShareFunc int epicsShareAPI epicsRingPointerIsFull(epicsRingPointerId id); #ifdef __cplusplus template -inline epicsRingPointer::epicsRingPointer(int sz) : - nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) {} +inline epicsRingPointer::epicsRingPointer(int sz, bool locked) : + lock(0), nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) +{ + if (locked) + lock = epicsSpinCreate(); +} template inline epicsRingPointer::~epicsRingPointer() -{ delete [] buffer;} +{ + if (lock) epicsSpinDestroy(lock); + delete [] buffer; +} template inline bool epicsRingPointer::push(T *p) { + if (lock) epicsSpinLock(lock); int next = nextPush; int newNext = next + 1; if(newNext>=size) newNext=0; - if(newNext==nextPop) return(false); + if (newNext == nextPop) { + if (lock) epicsSpinUnlock(lock); + return(false); + } buffer[next] = p; nextPush = newNext; + if (lock) epicsSpinUnlock(lock); return(true); } template inline T* epicsRingPointer::pop() { + if (lock) epicsSpinLock(lock); int next = nextPop; - if(next == nextPush) return(0); + if (next == nextPush) { + if (lock) epicsSpinUnlock(lock); + return(0); + } T*p = buffer[next]; ++next; if(next >=size) next = 0; nextPop = next; + if (lock) epicsSpinUnlock(lock); return(p); } template inline void epicsRingPointer::flush() { + if (lock) epicsSpinLock(lock); nextPop = 0; nextPush = 0; + if (lock) epicsSpinUnlock(lock); } template inline int epicsRingPointer::getFree() const { + if (lock) epicsSpinLock(lock); int n = nextPop - nextPush - 1; if (n < 0) n += size; + if (lock) epicsSpinUnlock(lock); return n; } template inline int epicsRingPointer::getUsed() const { + if (lock) epicsSpinLock(lock); int n = nextPush - nextPop; if (n < 0) n += size; + if (lock) epicsSpinUnlock(lock); return n; } template inline int epicsRingPointer::getSize() const -{ return(size-1);} +{ + return(size-1); +} template inline bool epicsRingPointer::isEmpty() const -{ return(nextPush==nextPop);} +{ + bool isEmpty; + if (lock) epicsSpinLock(lock); + isEmpty = (nextPush == nextPop); + if (lock) epicsSpinUnlock(lock); + return isEmpty; +} template inline bool epicsRingPointer::isFull() const { + if (lock) epicsSpinLock(lock); int count = nextPush - nextPop +1; + if (lock) epicsSpinUnlock(lock); return((count == 0) || (count == size)); } diff --git a/src/libCom/test/ringPointerTest.c b/src/libCom/test/ringPointerTest.c index fd3171045..e50aeb639 100644 --- a/src/libCom/test/ringPointerTest.c +++ b/src/libCom/test/ringPointerTest.c @@ -3,6 +3,7 @@ * National Laboratory. * Copyright (c) 2002 The Regents of the University of California, as * Operator of Los Alamos National Laboratory. +* Copyright (c) 2013 ITER Organization. * EPICS BASE is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. \*************************************************************************/ @@ -26,12 +27,17 @@ #include "testMain.h" #define ringSize 10 +#define consumerCount 4 +#define producerCount 4 static volatile int testExit = 0; +int value[ringSize*2]; typedef struct info { epicsEventId consumerEvent; epicsRingPointerId ring; + int checkOrder; + int value[ringSize*2]; }info; static void consumer(void *arg) @@ -39,16 +45,46 @@ static void consumer(void *arg) info *pinfo = (info *)arg; static int expectedValue=0; int *newvalue; + char myname[20]; - testDiag("Consumer starting"); + epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname)); + testDiag("%s starting", myname); while(1) { epicsEventMustWait(pinfo->consumerEvent); if (testExit) return; - while((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) { - testOk(expectedValue == *newvalue, - "Consumer: %d == %d", expectedValue, *newvalue); - expectedValue = *newvalue + 1; - } + while ((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) { + if (pinfo->checkOrder) { + testOk(expectedValue == *newvalue, + "%s: (got) %d == %d (expected)", myname, *newvalue, expectedValue); + expectedValue = *newvalue + 1; + } else { + testOk(pinfo->value[*newvalue] <= producerCount, "%s: got a %d (%d times seen before)", + myname, *newvalue, pinfo->value[*newvalue]); + } + /* This must be atomic... */ + pinfo->value[*newvalue]++; + epicsThreadSleep(0.05); + } + } +} + +static void producer(void *arg) +{ + info *pinfo = (info *)arg; + char myname[20]; + int i; + + epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname)); + testDiag("%s starting", myname); + for (i=0; iring)) { + epicsThreadSleep(0.2); + if (testExit) return; + } + testOk(epicsRingPointerPush(pinfo->ring, (void *)&value[i]), + "%s: Pushing %d, ring not full", myname, i); + epicsEventSignal(pinfo->consumerEvent); + if (testExit) return; } } @@ -57,21 +93,27 @@ MAIN(ringPointerTest) int i; info *pinfo; epicsEventId consumerEvent; - int value[ringSize*2]; int *pgetValue; epicsRingPointerId ring; epicsThreadId tid; + char threadName[20]; - testPlan(54); + testPlan(256); for (i=0; iconsumerEvent = consumerEvent = epicsEventMustCreate(epicsEventEmpty); if (!consumerEvent) { testAbort("epicsEventMustCreate failed"); } + testDiag("******************************************************"); + testDiag("** Test 1: local ring pointer, check size and order **"); + testDiag("******************************************************"); + pinfo->ring = ring = epicsRingPointerCreate(ringSize); if (!ring) { testAbort("epicsRingPointerCreate failed"); @@ -86,22 +128,74 @@ MAIN(ringPointerTest) } testOk(epicsRingPointerIsEmpty(ring), "Ring empty"); + testDiag("**************************************************************"); + testDiag("** Test 2: unlocked ring pointer, one consumer, check order **"); + testDiag("**************************************************************"); + + pinfo->checkOrder = 1; tid=epicsThreadCreate("consumer", 50, epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo); if(!tid) testAbort("epicsThreadCreate failed"); - epicsThreadSleep(0.1); + epicsThreadSleep(0.2); for (i=0; ivalue[i] == 1, "Value test: %d was processed", i); + } + + testExit = 1; + epicsEventSignal(consumerEvent); + epicsThreadSleep(1.0); + + epicsRingPointerDelete(pinfo->ring); + + testDiag("*************************************************************************************"); + testDiag("** Test 3: locked ring pointer, many consumers, many producers, check no of copies **"); + testDiag("*************************************************************************************"); + + pinfo->ring = ring = epicsRingPointerLockedCreate(ringSize); + if (!ring) { + testAbort("epicsRingPointerLockedCreate failed"); + } + testOk(epicsRingPointerIsEmpty(ring), "Ring empty"); + + for (i=0; ivalue[i] = 0; + testExit = 0; + pinfo->checkOrder = 0; + for (i=0; ivalue[i] == producerCount, "Value test: %d was processed %d times", i, producerCount); + } + testExit = 1; epicsEventSignal(consumerEvent); epicsThreadSleep(1.0);