From e4075da4d795f465af56e9d158f0cc602a094e09 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 13 Aug 2009 23:49:59 +0000 Subject: [PATCH] o added new ioQue so the server will not wedge one of its clients when simultaneouly in flow control mode and also asynchronous io postponed mode. o simplified the status from the process func because callers should not need to know how many messages have been sent --- src/cas/generic/casEventSys.cc | 145 +++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 54 deletions(-) diff --git a/src/cas/generic/casEventSys.cc b/src/cas/generic/casEventSys.cc index dbeb62f91..f86feb227 100644 --- a/src/cas/generic/casEventSys.cc +++ b/src/cas/generic/casEventSys.cc @@ -33,11 +33,13 @@ void casEventSys::show ( unsigned level ) const if (level>=1u) { printf ( "\numSubscriptions = %u, maxLogEntries = %u\n", this->numSubscriptions, this->maxLogEntries ); - printf ( "\tthere are %d events in the queue\n", + printf ( "\tthere are %d items in the event queue\n", this->eventLogQue.count() ); - printf ( "Replace events flag = %d, dontProcess flag = %d\n", + printf ( "\tthere are %d items in the io queue\n", + this->ioQue.count() ); + printf ( "Replace events flag = %d, dontProcessSubscr flag = %d\n", static_cast < int > ( this->replaceEvents ), - static_cast < int > ( this->dontProcess ) ); + static_cast < int > ( this->dontProcessSubscr ) ); } } @@ -49,13 +51,14 @@ casEventSys::~casEventSys() } // at this point: - // o all channels delete - // o all IO deleted + // o all channels have been deleted + // o all IO has been deleted // o any subscription events remaining on the queue // are pending destroy // verify above assertion is true casVerify ( this->eventLogQue.count() == 0 ); + casVerify ( this->ioQue.count() == 0 ); // all active subscriptions should also have been // uninstalled @@ -81,20 +84,18 @@ void casEventSys::removeMonitor () this->maxLogEntries -= averageEventEntries; } -casEventSys::processStatus casEventSys::process ( +casProcCond casEventSys :: process ( epicsGuard < casClientMutex > & casClientGuard ) { - casEventSys::processStatus ps; - ps.cond = casProcOk; - ps.nAccepted = 0u; + casProcCond cond = casProcOk; epicsGuard < evSysMutex > evGuard ( this->mutex ); - while ( ! this->dontProcess ) { - casEvent * pEvent; - - pEvent = this->eventLogQue.get (); - + // we need two queues, one for io and one for subscriptions, + // so that we dont hang up the server when in an IO postponed + // state simultaneouly with a flow control active state + while ( true ) { + casEvent * pEvent = this->ioQue.get (); if ( pEvent == NULL ) { break; } @@ -102,27 +103,59 @@ casEventSys::processStatus casEventSys::process ( caStatus status = pEvent->cbFunc ( this->client, casClientGuard, evGuard ); if ( status == S_cas_success ) { - ps.nAccepted++; + cond = casProcOk; } else if ( status == S_cas_sendBlocked ) { // not accepted so return to the head of the list // (we will try again later) - this->eventLogQue.push ( *pEvent ); - ps.cond = casProcOk; + this->ioQue.push ( *pEvent ); + cond = casProcOk; break; } else if ( status == S_cas_disconnect ) { - ps.cond = casProcDisconnect; + cond = casProcDisconnect; break; } else { errMessage ( status, - "- unexpected error processing event" ); - ps.cond = casProcDisconnect; + "- unexpected error, processing io queue" ); + cond = casProcDisconnect; break; } } + if ( cond == casProcOk ) { + while ( ! this->dontProcessSubscr ) { + casEvent * pEvent = this->eventLogQue.get (); + if ( pEvent == NULL ) { + break; + } + + caStatus status = pEvent->cbFunc ( + this->client, casClientGuard, evGuard ); + if ( status == S_cas_success ) { + cond = casProcOk; + } + else if ( status == S_cas_sendBlocked ) { + // not accepted so return to the head of the list + // (we will try again later) + this->eventLogQue.push ( *pEvent ); + cond = casProcOk; + break; + } + else if ( status == S_cas_disconnect ) { + cond = casProcDisconnect; + break; + } + else { + errMessage ( status, + "- unexpected error, processing event queue" ); + cond = casProcDisconnect; + break; + } + } + } + // // allows the derived class to be informed that it // needs to delete itself via the event system @@ -133,10 +166,10 @@ casEventSys::processStatus casEventSys::process ( // pointer. // if ( this->destroyPending ) { - ps.cond = casProcDisconnect; + cond = casProcDisconnect; } - return ps; + return cond; } void casEventSys::eventsOn () @@ -151,7 +184,7 @@ void casEventSys::eventsOn () // // allow the event queue to be processed // - this->dontProcess = false; + this->dontProcessSubscr = false; // // remove purge event if it is still pending @@ -188,7 +221,7 @@ bool casEventSys::eventsOff () // stop processing and sending events to the client // until we exit flow control // - this->dontProcess = true; + this->dontProcessSubscr = true; } else { if ( this->eventLogQue.count() == 0 ) { @@ -211,7 +244,7 @@ caStatus casEventPurgeEv::cbFunc ( epicsGuard < casClientMutex > &, epicsGuard < evSysMutex > & ) { - this->evSys.dontProcess = true; + this->evSys.dontProcessSubscr = true; this->evSys.pPurgeEvent = NULL; delete this; return S_cas_success; @@ -229,51 +262,57 @@ caStatus casEventSys::addToEventQueue ( casAsyncIOI & event, } posted = true; onTheQueue = true; - wakeupNeeded = ! this->dontProcess && this->eventLogQue.count() == 0; - this->eventLogQue.add ( event ); + wakeupNeeded = + ( this->dontProcessSubscr || this->eventLogQue.count() == 0 ) && + this->ioQue.count() == 0; + this->ioQue.add ( event ); } return S_cas_success; } -void casEventSys::removeFromEventQueue ( casAsyncIOI & io, bool & onTheEventQueue ) +void casEventSys::removeFromEventQueue ( casAsyncIOI & io, bool & onTheIOQueue ) { epicsGuard < epicsMutex > guard ( this->mutex ); - if ( onTheEventQueue ) { - onTheEventQueue = false; - this->eventLogQue.remove ( io ); + if ( onTheIOQueue ) { + onTheIOQueue = false; + this->ioQue.remove ( io ); } } bool casEventSys::addToEventQueue ( casChannelI & event, - bool & inTheEventQueue ) + bool & onTheIOQueue ) { - bool wakeupRequired = false; + bool wakeupNeeded = false; { epicsGuard < epicsMutex > guard ( this->mutex ); - if ( ! inTheEventQueue ) { - inTheEventQueue = true; - wakeupRequired = ! this->dontProcess && this->eventLogQue.count()==0; - this->eventLogQue.add ( event ); + if ( ! onTheIOQueue ) { + onTheIOQueue = true; + wakeupNeeded = + ( this->dontProcessSubscr || this->eventLogQue.count() == 0 ) && + this->ioQue.count() == 0; + this->ioQue.add ( event ); } } - return wakeupRequired; + return wakeupNeeded; } void casEventSys::removeFromEventQueue ( class casChannelI & io, - bool & inTheEventQueue ) + bool & onTheIOQueue ) { epicsGuard < epicsMutex > guard ( this->mutex ); - if ( inTheEventQueue ) { - inTheEventQueue = false; - this->eventLogQue.remove ( io ); + if ( onTheIOQueue ) { + onTheIOQueue = false; + this->ioQue.remove ( io ); } } bool casEventSys::addToEventQueue ( channelDestroyEvent & event ) { epicsGuard < epicsMutex > guard ( this->mutex ); - bool wakeupRequired = ! this->dontProcess && this->eventLogQue.count()==0; - this->eventLogQue.add ( event ); + bool wakeupRequired = + ( this->dontProcessSubscr || this->eventLogQue.count() == 0 ) && + this->ioQue.count() == 0; + this->ioQue.add ( event ); return wakeupRequired; } @@ -283,14 +322,10 @@ void casEventSys::setDestroyPending () this->destroyPending = true; } -inline bool casEventSys::full () const // X aCC 361 +inline bool casEventSys::full () const { - if ( this->replaceEvents || this->eventLogQue.count() >= this->maxLogEntries ) { - return true; - } - else { - return false; - } + return this->replaceEvents || + this->eventLogQue.count() >= this->maxLogEntries; } bool casEventSys::postEvent ( tsDLList < casMonitor > & monitorList, @@ -323,9 +358,11 @@ bool casEventSys::postEvent ( tsDLList < casMonitor > & monitorList, pLog = 0; } - if ( this->eventLogQue.count() == 0 ) { - signalNeeded = true; - } + signalNeeded = + !this->dontProcessSubscr && + this->eventLogQue.count() == 0 && + this->ioQue.count() == 0; + iter->installNewEventLog ( this->eventLogQue, pLog, event ); }