Fix shutdown issues with scan and callback.
The main reason for this merge proposal is the change to "public" API functions. Use atomic counter to resolve data race on threadsRunning in callback. Split up callbackShutdown() and scanShutdown() into two phases *Stop() and *Cleanup(). The *Stop() functions signal worker threads, and wait for them to exit. The *Cleanup() functions actually reclaim global resources. These two mechanisms have couplings which are quite complex. I/O Intr scans involve both scan lists and callbacks.
This commit is contained in:
committed by
Andrew Johnson
parent
5eb49ebaf0
commit
fefe6fd1fc
@@ -54,6 +54,7 @@ typedef struct cbQueueSet {
|
||||
epicsEventId semWakeUp;
|
||||
epicsRingPointerId queue;
|
||||
int queueOverflow;
|
||||
int shutdown;
|
||||
int threadsConfigured;
|
||||
int threadsRunning;
|
||||
} cbQueueSet;
|
||||
@@ -73,7 +74,6 @@ static epicsTimerQueueId timerQueue;
|
||||
enum ctl {ctlInit, ctlRun, ctlPause, ctlExit};
|
||||
static volatile enum ctl cbCtl;
|
||||
static epicsEventId startStopEvent;
|
||||
static void *exitCallback;
|
||||
|
||||
/* Static data */
|
||||
static char *threadNamePrefix[NUM_CALLBACK_PRIORITIES] = {
|
||||
@@ -149,12 +149,13 @@ int callbackParallelThreads(int count, const char *prio)
|
||||
|
||||
static void callbackTask(void *arg)
|
||||
{
|
||||
cbQueueSet *mySet = &callbackQueue[*(int*)arg];
|
||||
int prio = *(int*)arg;
|
||||
cbQueueSet *mySet = &callbackQueue[prio];
|
||||
|
||||
taskwdInsert(0, NULL, NULL);
|
||||
epicsEventSignal(startStopEvent);
|
||||
|
||||
while(TRUE) {
|
||||
while(!mySet->shutdown) {
|
||||
void *ptr;
|
||||
if (epicsRingPointerIsEmpty(mySet->queue))
|
||||
epicsEventMustWait(mySet->semWakeUp);
|
||||
@@ -163,39 +164,50 @@ static void callbackTask(void *arg)
|
||||
CALLBACK *pcallback = (CALLBACK *)ptr;
|
||||
if(!epicsRingPointerIsEmpty(mySet->queue))
|
||||
epicsEventMustTrigger(mySet->semWakeUp);
|
||||
if (ptr == &exitCallback) goto shutdown;
|
||||
mySet->queueOverflow = FALSE;
|
||||
(*pcallback->callback)(pcallback);
|
||||
}
|
||||
}
|
||||
|
||||
shutdown:
|
||||
mySet->threadsRunning--;
|
||||
if(!epicsAtomicDecrIntT(&mySet->threadsRunning))
|
||||
epicsEventSignal(startStopEvent);
|
||||
taskwdRemove(0);
|
||||
epicsEventSignal(startStopEvent);
|
||||
}
|
||||
|
||||
void callbackShutdown(void)
|
||||
void callbackStop(void)
|
||||
{
|
||||
int i;
|
||||
|
||||
if (cbCtl == ctlExit) return;
|
||||
cbCtl = ctlExit;
|
||||
|
||||
/* sequential shutdown of workers */
|
||||
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
|
||||
while (callbackQueue[i].threadsRunning) {
|
||||
if(epicsRingPointerPush(callbackQueue[i].queue, &exitCallback)) {
|
||||
epicsEventSignal(callbackQueue[i].semWakeUp);
|
||||
epicsEventWait(startStopEvent);
|
||||
} else {
|
||||
epicsThreadSleep(0.05);
|
||||
}
|
||||
}
|
||||
assert(callbackQueue[i].threadsRunning==0);
|
||||
epicsEventDestroy(callbackQueue[i].semWakeUp);
|
||||
epicsRingPointerDelete(callbackQueue[i].queue);
|
||||
callbackQueue[i].shutdown = 1;
|
||||
epicsEventSignal(callbackQueue[i].semWakeUp);
|
||||
}
|
||||
|
||||
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
|
||||
cbQueueSet *mySet = &callbackQueue[i];
|
||||
|
||||
while (epicsAtomicGetIntT(&mySet->threadsRunning)) {
|
||||
epicsEventSignal(mySet->semWakeUp);
|
||||
epicsEventWaitWithTimeout(startStopEvent, 0.1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void callbackCleanup(void)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
|
||||
cbQueueSet *mySet = &callbackQueue[i];
|
||||
|
||||
assert(epicsAtomicGetIntT(&mySet->threadsRunning)==0);
|
||||
epicsEventDestroy(mySet->semWakeUp);
|
||||
epicsRingPointerDelete(mySet->queue);
|
||||
}
|
||||
|
||||
epicsTimerQueueRelease(timerQueue);
|
||||
epicsEventDestroy(startStopEvent);
|
||||
startStopEvent = NULL;
|
||||
@@ -239,7 +251,7 @@ void callbackInit(void)
|
||||
cantProceed("Failed to spawn callback thread %s\n", threadName);
|
||||
} else {
|
||||
epicsEventWait(startStopEvent);
|
||||
callbackQueue[i].threadsRunning++;
|
||||
epicsAtomicIncrIntT(&callbackQueue[i].threadsRunning);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +57,8 @@ typedef void (*CALLBACKFUNC)(struct callbackPvt*);
|
||||
( (USER) = (void *)((CALLBACK *)(PCALLBACK))->user )
|
||||
|
||||
epicsShareFunc void callbackInit(void);
|
||||
epicsShareFunc void callbackShutdown(void);
|
||||
epicsShareFunc void callbackStop(void);
|
||||
epicsShareFunc void callbackCleanup(void);
|
||||
epicsShareFunc int callbackRequest(CALLBACK *pCallback);
|
||||
epicsShareFunc void callbackSetProcess(
|
||||
CALLBACK *pcallback, int Priority, void *pRec);
|
||||
|
||||
@@ -151,7 +151,7 @@ static void buildScanLists(void);
|
||||
static void addToList(struct dbCommon *precord, scan_list *psl);
|
||||
static void deleteFromList(struct dbCommon *precord, scan_list *psl);
|
||||
|
||||
void scanShutdown(void)
|
||||
void scanStop(void)
|
||||
{
|
||||
int i;
|
||||
|
||||
@@ -168,6 +168,10 @@ void scanShutdown(void)
|
||||
|
||||
scanOnce((dbCommon *)&exitOnce);
|
||||
epicsEventWait(startStopEvent);
|
||||
}
|
||||
|
||||
void scanCleanup(void)
|
||||
{
|
||||
|
||||
deletePeriodic();
|
||||
ioscanDestroy();
|
||||
|
||||
@@ -46,7 +46,8 @@ struct dbCommon;
|
||||
epicsShareFunc long scanInit(void);
|
||||
epicsShareFunc void scanRun(void);
|
||||
epicsShareFunc void scanPause(void);
|
||||
epicsShareFunc void scanShutdown(void);
|
||||
epicsShareFunc void scanStop(void);
|
||||
epicsShareFunc void scanCleanup(void);
|
||||
|
||||
epicsShareFunc EVENTPVT eventNameToHandle(const char* event);
|
||||
epicsShareFunc void postEvent(EVENTPVT epvt);
|
||||
|
||||
@@ -184,7 +184,8 @@ MAIN(callbackParallelTest)
|
||||
free(pcbt[i]);
|
||||
}
|
||||
|
||||
callbackShutdown();
|
||||
callbackStop();
|
||||
callbackCleanup();
|
||||
|
||||
return testDone();
|
||||
}
|
||||
|
||||
@@ -181,7 +181,8 @@ MAIN(callbackTest)
|
||||
free(pcbt[i]);
|
||||
}
|
||||
|
||||
callbackShutdown();
|
||||
callbackStop();
|
||||
callbackCleanup();
|
||||
|
||||
return testDone();
|
||||
}
|
||||
|
||||
@@ -607,7 +607,8 @@ static void initialProcess(void)
|
||||
|
||||
|
||||
/*
|
||||
* Shutdown processing.
|
||||
* set DB_LINK and CA_LINK to PV_LINK
|
||||
* Delete record scans
|
||||
*/
|
||||
static void doCloseLinks(dbRecordType *pdbRecordType, dbCommon *precord,
|
||||
void *user)
|
||||
@@ -678,8 +679,12 @@ int iocShutdown(void)
|
||||
if (iocState == iocVirgin || iocState == iocStopped) return 0;
|
||||
iterateRecords(doCloseLinks, NULL);
|
||||
if (iocBuildMode==buildIsolated) {
|
||||
scanShutdown();
|
||||
callbackShutdown();
|
||||
/* stop and "join" threads */
|
||||
scanStop();
|
||||
callbackStop();
|
||||
/* free resources */
|
||||
scanCleanup();
|
||||
callbackCleanup();
|
||||
iterateRecords(doFreeRecord, NULL);
|
||||
dbLockCleanupRecords(pdbbase);
|
||||
asShutdown();
|
||||
|
||||
Reference in New Issue
Block a user