removed mmsg minder and eliminate dlow level knowledge of cb locker

This commit is contained in:
Jeff Hill
2003-04-18 22:10:39 +00:00
parent 2399da3a1a
commit 35d0d41f22
2 changed files with 15 additions and 114 deletions
+8 -67
View File
@@ -134,10 +134,11 @@ extern "C" void cacOnceFunc ( void * )
//
// cac::cac ()
//
cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
cac::cac ( cacNotify & notifyIn ) :
ipToAEngine ( "dnsQuery" ),
programBeginTime ( epicsTime::getCurrent() ),
connTMO ( CA_CONN_VERIFY_PERIOD ),
cbMutex ( notifyIn ),
globalServiceList ( globalServiceListCAC.getReference () ),
timerQueue ( epicsTimerQueueActive::allocate ( false,
lowestPriorityLevelAbove(epicsThreadGetPrioritySelf()) ) ),
@@ -149,9 +150,7 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
initializingThreadsId ( epicsThreadGetIdSelf() ),
initializingThreadsPriority ( epicsThreadGetPrioritySelf() ),
maxRecvBytesTCP ( MAX_TCP ),
nRecvThreadsPending ( 0u ),
beaconAnomalyCount ( 0u ),
preemptiveCallbackEnabled ( enablePreemptiveCallbackIn )
beaconAnomalyCount ( 0u )
{
if ( ! osiSockAttach () ) {
throwWithLocation ( caErrorCode (ECA_INTERNAL) );
@@ -1505,12 +1504,9 @@ void cac::disconnectNotify ( tcpiiu & iiu )
void cac::initiateAbortShutdown ( tcpiiu & iiu )
{
cacMessageProcessingMinder msgProcMinder ( *this );
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
epicsGuard < cacMutex > guard ( this->mutex );
iiu.initiateAbortShutdown ( cbGuard, guard );
}
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
epicsGuard < cacMutex > guard ( this->mutex );
iiu.initiateAbortShutdown ( cbGuard, guard );
}
void cac::uninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & iiu )
@@ -1581,64 +1577,9 @@ void cac::pvMultiplyDefinedNotify ( msgForMultiplyDefinedPV & mfmdpv,
sprintf ( buf, "Channel: \"%.64s\", Connecting to: %.64s, Ignored: %.64s",
pChannelName, pAcc, pRej );
{
cacMessageProcessingMinder msgProcMinder ( *this );
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
this->exception ( cbGuard, ECA_DBLCHNL, buf, __FILE__, __LINE__ );
}
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
this->exception ( cbGuard, ECA_DBLCHNL, buf, __FILE__, __LINE__ );
}
mfmdpv.~msgForMultiplyDefinedPV ();
this->mdpvFreeList.release ( & mfmdpv );
}
//
// This is needed because in non-preemptive callback mode
// legacy applications that use file descriptor managers
// will register for ca receive thread activity and keep
// calling ca_pend_event until all of the socket data has
// been read. We must guarantee that other threads get a
// chance to run if there is data in any of the sockets.
//
void cac::waitUntilNoRecvThreadsPending ()
{
if ( ! this->preemptiveCallbackEnabled ) {
epicsGuard < cacMutex > guard ( this->mutex );
while ( this->nRecvThreadsPending > 0 ) {
epicsGuardRelease < cacMutex > unguard ( guard );
this->recvThreadActivityComplete.wait ( 30.0 );
}
}
}
void cac::messageProcessingCompleteNotify ()
{
if ( ! this->preemptiveCallbackEnabled ) {
bool signalNeeded = false;
{
epicsGuard < cacMutex > guard ( this->mutex );
if ( this->nRecvThreadsPending <= 1 ) {
if ( this->nRecvThreadsPending == 1 ) {
this->nRecvThreadsPending = 0;
signalNeeded = true;
}
}
else {
this->nRecvThreadsPending--;
}
}
if ( signalNeeded ) {
this->recvThreadActivityComplete.signal ();
}
}
}
void cac::messageArrivalNotify ()
{
if ( ! this->preemptiveCallbackEnabled ) {
epicsGuard < cacMutex > guard ( this->mutex );
this->nRecvThreadsPending++;
}
this->notify.messageArrivalNotify ();
}
+7 -47
View File
@@ -73,12 +73,12 @@ extern epicsThreadPrivateId caClientCallbackThreadId;
class callbackMutex {
public:
callbackMutex ();
callbackMutex ( cacNotify & );
~callbackMutex ();
void lock ();
void unlock ();
private:
epicsMutex primaryMutex;
cacNotify & notify;
callbackMutex ( callbackMutex & );
callbackMutex & operator = ( callbackMutex & );
};
@@ -109,19 +109,11 @@ public:
epicsGuard < cacMutex > &, nciu & chan ) = 0;
};
class cacMessageProcessingMinder {
public:
cacMessageProcessingMinder ( class cac & );
~cacMessageProcessingMinder ();
private:
class cac & cacRef;
};
class cac : private cacRecycle, private cacDisconnectChannelPrivate,
private callbackForMultiplyDefinedPV
{
public:
cac ( cacNotify &, bool enablePreemptiveCallbackIn );
cac ( cacNotify & );
virtual ~cac ();
// beacon management
@@ -132,8 +124,6 @@ public:
// IO management
void flushRequest ();
void waitUntilNoRecvThreadsPending ();
epicsGuard < callbackMutex > callbackGuardFactory ();
bool executeResponse ( epicsGuard < callbackMutex > &, tcpiiu &,
const epicsTime & currentTime, caHdrLargeArray &, char *pMsgBody );
@@ -194,7 +184,6 @@ public:
cacMutex & mutexRef ();
void attachToClientCtx ();
void selfTest () const;
bool preemptiveCallbakIsEnabled () const;
double beaconPeriod ( const nciu & chan ) const;
static unsigned lowestPriorityLevelAbove ( unsigned priority );
static unsigned highestPriorityLevelBelow ( unsigned priority );
@@ -247,7 +236,6 @@ private:
callbackMutex cbMutex;
mutable cacMutex mutex;
epicsEvent iiuUninstall;
epicsEvent recvThreadActivityComplete;
epicsSingleton
< cacServiceList >::reference
globalServiceList;
@@ -260,9 +248,7 @@ private:
epicsThreadId initializingThreadsId;
unsigned initializingThreadsPriority;
unsigned maxRecvBytesTCP;
unsigned nRecvThreadsPending;
unsigned beaconAnomalyCount;
bool preemptiveCallbackEnabled;
void run ();
void connectAllIO ( epicsGuard < cacMutex > &, nciu &chan );
@@ -345,26 +331,10 @@ private:
const char *pCtx, unsigned status );
static const pExcepProtoStubTCP tcpExcepJumpTableCAC [];
void messageArrivalNotify ();
void messageProcessingCompleteNotify ();
cac ( const cac & );
cac & operator = ( const cac & );
friend class cacMessageProcessingMinder;
};
inline cacMessageProcessingMinder::cacMessageProcessingMinder ( cac & cacIn ) :
cacRef ( cacIn )
{
cacIn.messageArrivalNotify ();
}
inline cacMessageProcessingMinder::~cacMessageProcessingMinder ()
{
cacRef.messageProcessingCompleteNotify ();
}
inline const char * cac::userNamePointer () const
{
return this->pUserName;
@@ -425,22 +395,11 @@ inline void cac::releaseLargeBufferTCP ( char *pBuf )
freeListFree ( this->tcpLargeRecvBufFreeList, pBuf );
}
inline bool cac::preemptiveCallbakIsEnabled () const
{
return this->preemptiveCallbackEnabled;
}
inline unsigned cac::beaconAnomaliesSinceProgramStart () const
{
return this->beaconAnomalyCount;
}
inline epicsGuard < callbackMutex > cac::callbackGuardFactory ()
{
// facilitate the return value optimization
return ( epicsGuard < callbackMutex > ( this->cbMutex ) );
}
inline void cacMutex::lock ()
{
this->mutex.lock ();
@@ -456,7 +415,8 @@ inline void cacMutex::show ( unsigned level ) const
this->mutex.show ( level );
}
inline callbackMutex::callbackMutex ()
inline callbackMutex::callbackMutex ( cacNotify & notifyIn ) :
notify ( notifyIn )
{
}
@@ -466,12 +426,12 @@ inline callbackMutex::~callbackMutex ()
inline void callbackMutex::lock ()
{
this->primaryMutex.lock ();
this->notify.callbackLock ();
}
inline void callbackMutex::unlock ()
{
this->primaryMutex.unlock ();
this->notify.callbackUnlock ();
}
#endif // ifdef cach