diff --git a/src/ca/Makefile b/src/ca/Makefile index 5598f02f5..3aef5d42b 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -69,7 +69,6 @@ LIBSRCS += comQueSend.cpp LIBSRCS += comBuf.cpp LIBSRCS += hostNameCache.cpp LIBSRCS += msgForMultiplyDefinedPV.cpp -LIBSRCS += callbackMutex.cpp LIBSRCS += templateInstances.cpp LIBRARY=ca diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index c2b73166d..9e915d0f3 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -138,7 +138,6 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) : ipToAEngine ( "dnsQuery" ), programBeginTime ( epicsTime::getCurrent() ), connTMO ( CA_CONN_VERIFY_PERIOD ), - cbMutex ( ! enablePreemptiveCallbackIn ), globalServiceList ( globalServiceListCAC.getReference () ), timerQueue ( epicsTimerQueueActive::allocate ( false, lowestPriorityLevelAbove(epicsThreadGetPrioritySelf()) ) ), @@ -1606,5 +1605,67 @@ void cac::pvMultiplyDefinedNotify ( msgForMultiplyDefinedPV & mfmdpv, this->mdpvFreeList.release ( & mfmdpv ); } +// +// This is needed because in non-preemptive callback mode +// legacy applications that use file descriptor managers +// will register for ca receive thread activity and keep +// calling ca_pend_event until all of the socket data has +// been read. We must guarantee that other threads get a +// chance to run if there is data in any of the sockets. +// +void cac::waitUntilNoRecvThreadsPending () +{ + if ( ! this->preemptiveCallbackEnabled ) { + { + const struct timeval delay = { 0, 0 }; + fd_set mask; + FD_ZERO ( & mask ); + int count = 0; + epicsGuard < cacMutex > guard ( this->mutex ); + tsDLIter < tcpiiu > iter = this->serverList.firstIter (); + if ( this->pudpiiu ) { + this->pudpiiu->fdMaskSet ( mask ); + count++; + } + while ( iter.valid() ) { + iter->fdMaskSet ( mask ); + count++; + iter++; + } + + int status = select ( count, & mask, 0, 0, & delay ); + if ( status <= 0 ) { + return; + } + this->nRecvThreadsPending = + static_cast < unsigned > ( count ); + } + + this->recvThreadActivityComplete.wait ( 0.1 ); + } +} + +void cac::signalRecvThreadActivity () +{ + if ( ! this->preemptiveCallbackEnabled ) { + bool signalNeeded = false; + { + epicsGuard < cacMutex > guard ( this->mutex ); + if ( this->nRecvThreadsPending <= 1 ) { + if ( this->nRecvThreadsPending == 1 ) { + this->nRecvThreadsPending = 0; + signalNeeded = true; + } + } + else { + this->nRecvThreadsPending--; + } + } + if ( signalNeeded ) { + this->recvThreadActivityComplete.signal (); + } + } +} + diff --git a/src/ca/cac.h b/src/ca/cac.h index 3b583fc48..d1becde71 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -73,17 +73,12 @@ extern epicsThreadPrivateId caClientCallbackThreadId; class callbackMutex { public: - callbackMutex ( bool threadsMayBeBlockingForRecvThreadsToFinish ); + callbackMutex (); ~callbackMutex (); void lock (); void unlock (); - void waitUntilNoRecvThreadsPending (); private: - epicsMutex countMutex; epicsMutex primaryMutex; - epicsEvent noRecvThreadsPending; - unsigned recvThreadsPendingCount; - bool threadsMayBeBlockingForRecvThreadsToFinish; callbackMutex ( callbackMutex & ); callbackMutex & operator = ( callbackMutex & ); }; @@ -198,6 +193,7 @@ public: void initiateAbortShutdown ( tcpiiu & ); void disconnectNotify ( tcpiiu & ); void uninstallIIU ( tcpiiu & ); + void signalRecvThreadActivity (); private: localHostName hostNameCache; @@ -244,6 +240,7 @@ private: callbackMutex cbMutex; mutable cacMutex mutex; epicsEvent iiuUninstall; + epicsEvent recvThreadActivityComplete; epicsSingleton < cacServiceList >::reference globalServiceList; @@ -256,6 +253,7 @@ private: epicsThreadId initializingThreadsId; unsigned initializingThreadsPriority; unsigned maxRecvBytesTCP; + unsigned nRecvThreadsPending; bool preemptiveCallbackEnabled; void privateUninstallIIU ( epicsGuard < callbackMutex > &, tcpiiu &iiu ); @@ -406,11 +404,6 @@ inline bool cac::preemptiveCallbakIsEnabled () const return this->preemptiveCallbackEnabled; } -inline void cac::waitUntilNoRecvThreadsPending () -{ - this->cbMutex.waitUntilNoRecvThreadsPending (); -} - inline epicsGuard < callbackMutex > cac::callbackGuardFactory () { // facilitate the return value optimization @@ -432,5 +425,23 @@ inline void cacMutex::show ( unsigned level ) const this->mutex.show ( level ); } +inline callbackMutex::callbackMutex () +{ +} + +inline callbackMutex::~callbackMutex () +{ +} + +inline void callbackMutex::lock () +{ + this->primaryMutex.lock (); +} + +inline void callbackMutex::unlock () +{ + this->primaryMutex.unlock (); +} + #endif // ifdef cach diff --git a/src/ca/callbackMutex.cpp b/src/ca/callbackMutex.cpp deleted file mode 100644 index c8d9d0071..000000000 --- a/src/ca/callbackMutex.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/*************************************************************************\ -* Copyright (c) 2002 The University of Chicago, as Operator of Argonne -* National Laboratory. -* Copyright (c) 2002 The Regents of the University of California, as -* Operator of Los Alamos National Laboratory. -* EPICS BASE Versions 3.13.7 -* and higher are distributed subject to a Software License Agreement found -* in file LICENSE that is included with this distribution. -\*************************************************************************/ - -/* - * $Id$ - * - * L O S A L A M O S - * Los Alamos National Laboratory - * Los Alamos, New Mexico 87545 - * - * Copyright, 1986, The Regents of the University of California. - * - * Author: Jeff Hill - */ - -#define epicsAssertAuthor "Jeff Hill johill@lanl.gov" - -#include "cac.h" - -callbackMutex::callbackMutex ( bool threadsMayBeBlockingForRecvThreadsToFinishIn ) : - recvThreadsPendingCount ( 0u ), - threadsMayBeBlockingForRecvThreadsToFinish ( threadsMayBeBlockingForRecvThreadsToFinishIn ) -{ -} - -callbackMutex::~callbackMutex () -{ -} - -void callbackMutex::lock () -{ - if ( this->threadsMayBeBlockingForRecvThreadsToFinish ) { - if ( ! this->primaryMutex.tryLock () ) { - // the count must be incremented prior to blocking for the lock - { - epicsGuard < epicsMutex > autoMutex ( this->countMutex ); - assert ( this->recvThreadsPendingCount < UINT_MAX ); - this->recvThreadsPendingCount++; - } - - this->primaryMutex.lock (); - - bool signalRequired; - { - epicsGuard < epicsMutex > autoMutex ( this->countMutex ); - assert ( this->recvThreadsPendingCount > 0 ); - this->recvThreadsPendingCount--; - if ( this->recvThreadsPendingCount == 0u ) { - signalRequired = true; - } - else { - signalRequired = false; - } - } - - if ( signalRequired ) { - this->noRecvThreadsPending.signal (); - } - } - } - else { - this->primaryMutex.lock (); - } -} - -void callbackMutex::unlock () -{ - this->primaryMutex.unlock (); -} - -void callbackMutex::waitUntilNoRecvThreadsPending () -{ - epicsGuard < epicsMutex > autoMutex ( this->countMutex ); - while ( this->recvThreadsPendingCount > 0 ) { - epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex ); - this->noRecvThreadsPending.wait (); - } -}