fixed cpu consumption problems in clients that use fd managers
together with ca fd registration
This commit is contained in:
@@ -69,7 +69,6 @@ LIBSRCS += comQueSend.cpp
|
||||
LIBSRCS += comBuf.cpp
|
||||
LIBSRCS += hostNameCache.cpp
|
||||
LIBSRCS += msgForMultiplyDefinedPV.cpp
|
||||
LIBSRCS += callbackMutex.cpp
|
||||
LIBSRCS += templateInstances.cpp
|
||||
|
||||
LIBRARY=ca
|
||||
|
||||
+62
-1
@@ -138,7 +138,6 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
|
||||
ipToAEngine ( "dnsQuery" ),
|
||||
programBeginTime ( epicsTime::getCurrent() ),
|
||||
connTMO ( CA_CONN_VERIFY_PERIOD ),
|
||||
cbMutex ( ! enablePreemptiveCallbackIn ),
|
||||
globalServiceList ( globalServiceListCAC.getReference () ),
|
||||
timerQueue ( epicsTimerQueueActive::allocate ( false,
|
||||
lowestPriorityLevelAbove(epicsThreadGetPrioritySelf()) ) ),
|
||||
@@ -1606,5 +1605,67 @@ void cac::pvMultiplyDefinedNotify ( msgForMultiplyDefinedPV & mfmdpv,
|
||||
this->mdpvFreeList.release ( & mfmdpv );
|
||||
}
|
||||
|
||||
//
|
||||
// This is needed because in non-preemptive callback mode
|
||||
// legacy applications that use file descriptor managers
|
||||
// will register for ca receive thread activity and keep
|
||||
// calling ca_pend_event until all of the socket data has
|
||||
// been read. We must guarantee that other threads get a
|
||||
// chance to run if there is data in any of the sockets.
|
||||
//
|
||||
void cac::waitUntilNoRecvThreadsPending ()
|
||||
{
|
||||
if ( ! this->preemptiveCallbackEnabled ) {
|
||||
{
|
||||
const struct timeval delay = { 0, 0 };
|
||||
fd_set mask;
|
||||
FD_ZERO ( & mask );
|
||||
int count = 0;
|
||||
epicsGuard < cacMutex > guard ( this->mutex );
|
||||
tsDLIter < tcpiiu > iter = this->serverList.firstIter ();
|
||||
if ( this->pudpiiu ) {
|
||||
this->pudpiiu->fdMaskSet ( mask );
|
||||
count++;
|
||||
}
|
||||
while ( iter.valid() ) {
|
||||
iter->fdMaskSet ( mask );
|
||||
count++;
|
||||
iter++;
|
||||
}
|
||||
|
||||
int status = select ( count, & mask, 0, 0, & delay );
|
||||
if ( status <= 0 ) {
|
||||
return;
|
||||
}
|
||||
this->nRecvThreadsPending =
|
||||
static_cast < unsigned > ( count );
|
||||
}
|
||||
|
||||
this->recvThreadActivityComplete.wait ( 0.1 );
|
||||
}
|
||||
}
|
||||
|
||||
void cac::signalRecvThreadActivity ()
|
||||
{
|
||||
if ( ! this->preemptiveCallbackEnabled ) {
|
||||
bool signalNeeded = false;
|
||||
{
|
||||
epicsGuard < cacMutex > guard ( this->mutex );
|
||||
if ( this->nRecvThreadsPending <= 1 ) {
|
||||
if ( this->nRecvThreadsPending == 1 ) {
|
||||
this->nRecvThreadsPending = 0;
|
||||
signalNeeded = true;
|
||||
}
|
||||
}
|
||||
else {
|
||||
this->nRecvThreadsPending--;
|
||||
}
|
||||
}
|
||||
if ( signalNeeded ) {
|
||||
this->recvThreadActivityComplete.signal ();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
+22
-11
@@ -73,17 +73,12 @@ extern epicsThreadPrivateId caClientCallbackThreadId;
|
||||
|
||||
class callbackMutex {
|
||||
public:
|
||||
callbackMutex ( bool threadsMayBeBlockingForRecvThreadsToFinish );
|
||||
callbackMutex ();
|
||||
~callbackMutex ();
|
||||
void lock ();
|
||||
void unlock ();
|
||||
void waitUntilNoRecvThreadsPending ();
|
||||
private:
|
||||
epicsMutex countMutex;
|
||||
epicsMutex primaryMutex;
|
||||
epicsEvent noRecvThreadsPending;
|
||||
unsigned recvThreadsPendingCount;
|
||||
bool threadsMayBeBlockingForRecvThreadsToFinish;
|
||||
callbackMutex ( callbackMutex & );
|
||||
callbackMutex & operator = ( callbackMutex & );
|
||||
};
|
||||
@@ -198,6 +193,7 @@ public:
|
||||
void initiateAbortShutdown ( tcpiiu & );
|
||||
void disconnectNotify ( tcpiiu & );
|
||||
void uninstallIIU ( tcpiiu & );
|
||||
void signalRecvThreadActivity ();
|
||||
|
||||
private:
|
||||
localHostName hostNameCache;
|
||||
@@ -244,6 +240,7 @@ private:
|
||||
callbackMutex cbMutex;
|
||||
mutable cacMutex mutex;
|
||||
epicsEvent iiuUninstall;
|
||||
epicsEvent recvThreadActivityComplete;
|
||||
epicsSingleton
|
||||
< cacServiceList >::reference
|
||||
globalServiceList;
|
||||
@@ -256,6 +253,7 @@ private:
|
||||
epicsThreadId initializingThreadsId;
|
||||
unsigned initializingThreadsPriority;
|
||||
unsigned maxRecvBytesTCP;
|
||||
unsigned nRecvThreadsPending;
|
||||
bool preemptiveCallbackEnabled;
|
||||
|
||||
void privateUninstallIIU ( epicsGuard < callbackMutex > &, tcpiiu &iiu );
|
||||
@@ -406,11 +404,6 @@ inline bool cac::preemptiveCallbakIsEnabled () const
|
||||
return this->preemptiveCallbackEnabled;
|
||||
}
|
||||
|
||||
inline void cac::waitUntilNoRecvThreadsPending ()
|
||||
{
|
||||
this->cbMutex.waitUntilNoRecvThreadsPending ();
|
||||
}
|
||||
|
||||
inline epicsGuard < callbackMutex > cac::callbackGuardFactory ()
|
||||
{
|
||||
// facilitate the return value optimization
|
||||
@@ -432,5 +425,23 @@ inline void cacMutex::show ( unsigned level ) const
|
||||
this->mutex.show ( level );
|
||||
}
|
||||
|
||||
inline callbackMutex::callbackMutex ()
|
||||
{
|
||||
}
|
||||
|
||||
inline callbackMutex::~callbackMutex ()
|
||||
{
|
||||
}
|
||||
|
||||
inline void callbackMutex::lock ()
|
||||
{
|
||||
this->primaryMutex.lock ();
|
||||
}
|
||||
|
||||
inline void callbackMutex::unlock ()
|
||||
{
|
||||
this->primaryMutex.unlock ();
|
||||
}
|
||||
|
||||
#endif // ifdef cach
|
||||
|
||||
|
||||
@@ -1,85 +0,0 @@
|
||||
/*************************************************************************\
|
||||
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
|
||||
* National Laboratory.
|
||||
* Copyright (c) 2002 The Regents of the University of California, as
|
||||
* Operator of Los Alamos National Laboratory.
|
||||
* EPICS BASE Versions 3.13.7
|
||||
* and higher are distributed subject to a Software License Agreement found
|
||||
* in file LICENSE that is included with this distribution.
|
||||
\*************************************************************************/
|
||||
|
||||
/*
|
||||
* $Id$
|
||||
*
|
||||
* L O S A L A M O S
|
||||
* Los Alamos National Laboratory
|
||||
* Los Alamos, New Mexico 87545
|
||||
*
|
||||
* Copyright, 1986, The Regents of the University of California.
|
||||
*
|
||||
* Author: Jeff Hill
|
||||
*/
|
||||
|
||||
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
|
||||
|
||||
#include "cac.h"
|
||||
|
||||
callbackMutex::callbackMutex ( bool threadsMayBeBlockingForRecvThreadsToFinishIn ) :
|
||||
recvThreadsPendingCount ( 0u ),
|
||||
threadsMayBeBlockingForRecvThreadsToFinish ( threadsMayBeBlockingForRecvThreadsToFinishIn )
|
||||
{
|
||||
}
|
||||
|
||||
callbackMutex::~callbackMutex ()
|
||||
{
|
||||
}
|
||||
|
||||
void callbackMutex::lock ()
|
||||
{
|
||||
if ( this->threadsMayBeBlockingForRecvThreadsToFinish ) {
|
||||
if ( ! this->primaryMutex.tryLock () ) {
|
||||
// the count must be incremented prior to blocking for the lock
|
||||
{
|
||||
epicsGuard < epicsMutex > autoMutex ( this->countMutex );
|
||||
assert ( this->recvThreadsPendingCount < UINT_MAX );
|
||||
this->recvThreadsPendingCount++;
|
||||
}
|
||||
|
||||
this->primaryMutex.lock ();
|
||||
|
||||
bool signalRequired;
|
||||
{
|
||||
epicsGuard < epicsMutex > autoMutex ( this->countMutex );
|
||||
assert ( this->recvThreadsPendingCount > 0 );
|
||||
this->recvThreadsPendingCount--;
|
||||
if ( this->recvThreadsPendingCount == 0u ) {
|
||||
signalRequired = true;
|
||||
}
|
||||
else {
|
||||
signalRequired = false;
|
||||
}
|
||||
}
|
||||
|
||||
if ( signalRequired ) {
|
||||
this->noRecvThreadsPending.signal ();
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
this->primaryMutex.lock ();
|
||||
}
|
||||
}
|
||||
|
||||
void callbackMutex::unlock ()
|
||||
{
|
||||
this->primaryMutex.unlock ();
|
||||
}
|
||||
|
||||
void callbackMutex::waitUntilNoRecvThreadsPending ()
|
||||
{
|
||||
epicsGuard < epicsMutex > autoMutex ( this->countMutex );
|
||||
while ( this->recvThreadsPendingCount > 0 ) {
|
||||
epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex );
|
||||
this->noRecvThreadsPending.wait ();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user