fixed and also simplified locking logic
This commit is contained in:
+58
-74
@@ -119,11 +119,9 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) :
|
||||
recvProcessEnableRefCount ( 0u ),
|
||||
pndRecvCnt ( 0u ),
|
||||
readSeq ( 0u ),
|
||||
recvProcessCompletionSignalRequestCount ( 0u ),
|
||||
enablePreemptiveCallback ( enablePreemptiveCallbackIn ),
|
||||
ioInProgress ( false ),
|
||||
recvProcessThreadExitRequest ( false ),
|
||||
recvProcessPending ( false )
|
||||
recvProcessThreadExitRequest ( false )
|
||||
{
|
||||
long status;
|
||||
unsigned abovePriority;
|
||||
@@ -223,7 +221,10 @@ cac::~cac ()
|
||||
if ( this->pRecvProcessThread ) {
|
||||
this->recvProcessThreadExitRequest = true;
|
||||
this->recvProcessActivityEvent.signal ();
|
||||
this->enableCallbackPreemption ();
|
||||
{
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
this->enableCallbackPreemption ();
|
||||
}
|
||||
this->recvProcessThreadExit.wait ();
|
||||
delete this->pRecvProcessThread;
|
||||
}
|
||||
@@ -577,8 +578,10 @@ void cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout )
|
||||
this->disableCallbackPreemption ();
|
||||
}
|
||||
|
||||
int cac::pendEvent ( const double &timeout )
|
||||
int cac::pendEvent ( const double & timeout )
|
||||
{
|
||||
epicsTime current = epicsTime::getCurrent ();
|
||||
|
||||
// prevent recursion nightmares by disabling calls to
|
||||
// pendIO () from within a CA callback
|
||||
if ( this->recvProcessThreadIsCurrentThread () ) {
|
||||
@@ -590,39 +593,31 @@ int cac::pendEvent ( const double &timeout )
|
||||
|
||||
this->flushRequestPrivate ();
|
||||
|
||||
bool waitForSignal;
|
||||
if ( this->recvProcessPending ) {
|
||||
this->recvProcessCompletionSignalRequestCount++;
|
||||
waitForSignal = true;
|
||||
// process at least once if preemptive callback
|
||||
// isnt enabled (this avoids complications associated
|
||||
// with forcing the recv processing thread to run.
|
||||
if ( ! this->enablePreemptiveCallback &&
|
||||
this->recvProcessEnableRefCount == 0 ) {
|
||||
this->processRecvBacklog ();
|
||||
}
|
||||
|
||||
double elapsed = epicsTime::getCurrent() - current;
|
||||
double delay;
|
||||
|
||||
if ( timeout > elapsed ) {
|
||||
delay = timeout - elapsed;
|
||||
}
|
||||
else {
|
||||
waitForSignal = false;
|
||||
delay = 0.0;
|
||||
}
|
||||
|
||||
this->enableCallbackPreemption ();
|
||||
|
||||
{
|
||||
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
|
||||
if ( timeout == 0.0 ) {
|
||||
while ( true ) {
|
||||
epicsThreadSleep ( 60.0 );
|
||||
}
|
||||
}
|
||||
else if ( timeout >= CAC_SIGNIFICANT_DELAY ) {
|
||||
epicsThreadSleep ( timeout );
|
||||
}
|
||||
if ( waitForSignal ) {
|
||||
this->recvProcessCompleted.wait ();
|
||||
}
|
||||
}
|
||||
|
||||
this->disableCallbackPreemption ();
|
||||
|
||||
if ( waitForSignal ) {
|
||||
this->recvProcessCompletionSignalRequestCount--;
|
||||
if ( this->recvProcessCompletionSignalRequestCount ) {
|
||||
this->recvProcessCompleted.signal ();
|
||||
if ( delay >= CAC_SIGNIFICANT_DELAY ) {
|
||||
this->enableCallbackPreemption ();
|
||||
{
|
||||
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
|
||||
epicsThreadSleep ( delay );
|
||||
}
|
||||
this->disableCallbackPreemption ();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -674,28 +669,24 @@ void cac::registerService ( cacService &service )
|
||||
|
||||
void cac::startRecvProcessThread ()
|
||||
{
|
||||
bool newThread = false;
|
||||
{
|
||||
epicsAutoMutex epicsMutex ( this->mutex );
|
||||
if ( ! this->pRecvProcessThread ) {
|
||||
unsigned priorityOfProcess;
|
||||
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
|
||||
this->getInitializingThreadsPriority (), &priorityOfProcess );
|
||||
if ( tbs != epicsThreadBooleanStatusSuccess ) {
|
||||
priorityOfProcess = this->getInitializingThreadsPriority ();
|
||||
}
|
||||
|
||||
this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process",
|
||||
epicsThreadGetStackSize ( epicsThreadStackSmall ), priorityOfProcess );
|
||||
if ( ! this->pRecvProcessThread ) {
|
||||
throw std::bad_alloc ();
|
||||
}
|
||||
this->pRecvProcessThread->start ();
|
||||
newThread = true;
|
||||
epicsAutoMutex epicsMutex ( this->mutex );
|
||||
if ( ! this->pRecvProcessThread ) {
|
||||
unsigned priorityOfProcess;
|
||||
epicsThreadBooleanStatus tbs = epicsThreadLowestPriorityLevelAbove (
|
||||
this->getInitializingThreadsPriority (), &priorityOfProcess );
|
||||
if ( tbs != epicsThreadBooleanStatusSuccess ) {
|
||||
priorityOfProcess = this->getInitializingThreadsPriority ();
|
||||
}
|
||||
|
||||
this->pRecvProcessThread = new epicsThread ( *this, "CAC-recv-process",
|
||||
epicsThreadGetStackSize ( epicsThreadStackSmall ), priorityOfProcess );
|
||||
if ( ! this->pRecvProcessThread ) {
|
||||
throw std::bad_alloc ();
|
||||
}
|
||||
this->pRecvProcessThread->start ();
|
||||
if ( this->enablePreemptiveCallback ) {
|
||||
this->enableCallbackPreemption ();
|
||||
}
|
||||
}
|
||||
if ( this->enablePreemptiveCallback && newThread ) {
|
||||
this->enableCallbackPreemption ();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -704,17 +695,10 @@ void cac::run ()
|
||||
{
|
||||
this->attachToClientCtx ();
|
||||
while ( ! this->recvProcessThreadExitRequest ) {
|
||||
bool wakeupNeeded;
|
||||
{
|
||||
this->recvProcessPending = true;
|
||||
epicsAutoMutex autoMutexPCB ( this->preemptiveCallbackLock );
|
||||
epicsAutoMutex autoMutex ( this->mutex );
|
||||
this->processRecvBacklog ();
|
||||
this->recvProcessPending = false;
|
||||
wakeupNeeded = this->recvProcessCompletionSignalRequestCount > 0u;
|
||||
}
|
||||
if ( wakeupNeeded ) {
|
||||
this->recvProcessCompleted.signal ();
|
||||
}
|
||||
this->recvProcessActivityEvent.wait ();
|
||||
}
|
||||
@@ -802,20 +786,14 @@ void cac::enableCallbackPreemption ()
|
||||
// lock must already be applied
|
||||
void cac::disableCallbackPreemption ()
|
||||
{
|
||||
if ( this->recvProcessEnableRefCount == 1u ) {
|
||||
if ( ! this->preemptiveCallbackLock.tryLock () ) {
|
||||
{
|
||||
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
|
||||
this->preemptiveCallbackLock.lock ();
|
||||
}
|
||||
// in case some thread enabled it while this->mutex was unlocked
|
||||
if ( this->recvProcessEnableRefCount > 1u ) {
|
||||
this->preemptiveCallbackLock.unlock ();
|
||||
}
|
||||
}
|
||||
}
|
||||
assert ( this->recvProcessEnableRefCount > 0u );
|
||||
this->recvProcessEnableRefCount--;
|
||||
if ( this->recvProcessEnableRefCount == 0u ) {
|
||||
if ( ! this->preemptiveCallbackLock.tryLock () ) {
|
||||
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
|
||||
this->preemptiveCallbackLock.lock ();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void cac::repeaterSubscribeConfirmNotify ()
|
||||
@@ -1045,7 +1023,13 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
|
||||
if ( pSubscr ) {
|
||||
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
|
||||
}
|
||||
if ( pRecvProcessThread->isCurrentThread() ) {
|
||||
//
|
||||
// if preemptive callback is not enabled then
|
||||
// the code will block until the recv process
|
||||
// thread is idle before returning control back
|
||||
// to the user
|
||||
if ( ! this->enablePreemptiveCallback ||
|
||||
pRecvProcessThread->isCurrentThread() ) {
|
||||
signalNeeded = false;
|
||||
}
|
||||
else {
|
||||
|
||||
@@ -163,7 +163,6 @@ private:
|
||||
epicsEvent notifyCompletionEvent;
|
||||
epicsEvent recvProcessActivityEvent;
|
||||
epicsEvent recvProcessThreadExit;
|
||||
epicsEvent recvProcessCompleted;
|
||||
epicsEvent ioDone;
|
||||
epicsTimerQueueActive *pTimerQueue;
|
||||
char *pUserName;
|
||||
@@ -182,11 +181,9 @@ private:
|
||||
unsigned recvProcessEnableRefCount;
|
||||
unsigned pndRecvCnt;
|
||||
unsigned readSeq;
|
||||
unsigned recvProcessCompletionSignalRequestCount;
|
||||
bool enablePreemptiveCallback;
|
||||
bool ioInProgress;
|
||||
bool recvProcessThreadExitRequest;
|
||||
bool recvProcessPending;
|
||||
|
||||
void processRecvBacklog ();
|
||||
void flushRequestPrivate ();
|
||||
|
||||
Reference in New Issue
Block a user