o added new ioQue so the server will not wedge one of its clients

when simultaneouly in flow control mode and also asynchronous io
postponed mode.
o simplified the status from the process func because callers should
not need to know how many messages have been sent
This commit is contained in:
Jeff Hill
2009-08-13 23:49:59 +00:00
parent 8ace886cfe
commit e4075da4d7

View File

@@ -33,11 +33,13 @@ void casEventSys::show ( unsigned level ) const
if (level>=1u) {
printf ( "\numSubscriptions = %u, maxLogEntries = %u\n",
this->numSubscriptions, this->maxLogEntries );
printf ( "\tthere are %d events in the queue\n",
printf ( "\tthere are %d items in the event queue\n",
this->eventLogQue.count() );
printf ( "Replace events flag = %d, dontProcess flag = %d\n",
printf ( "\tthere are %d items in the io queue\n",
this->ioQue.count() );
printf ( "Replace events flag = %d, dontProcessSubscr flag = %d\n",
static_cast < int > ( this->replaceEvents ),
static_cast < int > ( this->dontProcess ) );
static_cast < int > ( this->dontProcessSubscr ) );
}
}
@@ -49,13 +51,14 @@ casEventSys::~casEventSys()
}
// at this point:
// o all channels delete
// o all IO deleted
// o all channels have been deleted
// o all IO has been deleted
// o any subscription events remaining on the queue
// are pending destroy
// verify above assertion is true
casVerify ( this->eventLogQue.count() == 0 );
casVerify ( this->ioQue.count() == 0 );
// all active subscriptions should also have been
// uninstalled
@@ -81,20 +84,18 @@ void casEventSys::removeMonitor ()
this->maxLogEntries -= averageEventEntries;
}
casEventSys::processStatus casEventSys::process (
casProcCond casEventSys :: process (
epicsGuard < casClientMutex > & casClientGuard )
{
casEventSys::processStatus ps;
ps.cond = casProcOk;
ps.nAccepted = 0u;
casProcCond cond = casProcOk;
epicsGuard < evSysMutex > evGuard ( this->mutex );
while ( ! this->dontProcess ) {
casEvent * pEvent;
pEvent = this->eventLogQue.get ();
// we need two queues, one for io and one for subscriptions,
// so that we dont hang up the server when in an IO postponed
// state simultaneouly with a flow control active state
while ( true ) {
casEvent * pEvent = this->ioQue.get ();
if ( pEvent == NULL ) {
break;
}
@@ -102,27 +103,59 @@ casEventSys::processStatus casEventSys::process (
caStatus status = pEvent->cbFunc (
this->client, casClientGuard, evGuard );
if ( status == S_cas_success ) {
ps.nAccepted++;
cond = casProcOk;
}
else if ( status == S_cas_sendBlocked ) {
// not accepted so return to the head of the list
// (we will try again later)
this->eventLogQue.push ( *pEvent );
ps.cond = casProcOk;
this->ioQue.push ( *pEvent );
cond = casProcOk;
break;
}
else if ( status == S_cas_disconnect ) {
ps.cond = casProcDisconnect;
cond = casProcDisconnect;
break;
}
else {
errMessage ( status,
"- unexpected error processing event" );
ps.cond = casProcDisconnect;
"- unexpected error, processing io queue" );
cond = casProcDisconnect;
break;
}
}
if ( cond == casProcOk ) {
while ( ! this->dontProcessSubscr ) {
casEvent * pEvent = this->eventLogQue.get ();
if ( pEvent == NULL ) {
break;
}
caStatus status = pEvent->cbFunc (
this->client, casClientGuard, evGuard );
if ( status == S_cas_success ) {
cond = casProcOk;
}
else if ( status == S_cas_sendBlocked ) {
// not accepted so return to the head of the list
// (we will try again later)
this->eventLogQue.push ( *pEvent );
cond = casProcOk;
break;
}
else if ( status == S_cas_disconnect ) {
cond = casProcDisconnect;
break;
}
else {
errMessage ( status,
"- unexpected error, processing event queue" );
cond = casProcDisconnect;
break;
}
}
}
//
// allows the derived class to be informed that it
// needs to delete itself via the event system
@@ -133,10 +166,10 @@ casEventSys::processStatus casEventSys::process (
// pointer.
//
if ( this->destroyPending ) {
ps.cond = casProcDisconnect;
cond = casProcDisconnect;
}
return ps;
return cond;
}
void casEventSys::eventsOn ()
@@ -151,7 +184,7 @@ void casEventSys::eventsOn ()
//
// allow the event queue to be processed
//
this->dontProcess = false;
this->dontProcessSubscr = false;
//
// remove purge event if it is still pending
@@ -188,7 +221,7 @@ bool casEventSys::eventsOff ()
// stop processing and sending events to the client
// until we exit flow control
//
this->dontProcess = true;
this->dontProcessSubscr = true;
}
else {
if ( this->eventLogQue.count() == 0 ) {
@@ -211,7 +244,7 @@ caStatus casEventPurgeEv::cbFunc (
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & )
{
this->evSys.dontProcess = true;
this->evSys.dontProcessSubscr = true;
this->evSys.pPurgeEvent = NULL;
delete this;
return S_cas_success;
@@ -229,51 +262,57 @@ caStatus casEventSys::addToEventQueue ( casAsyncIOI & event,
}
posted = true;
onTheQueue = true;
wakeupNeeded = ! this->dontProcess && this->eventLogQue.count() == 0;
this->eventLogQue.add ( event );
wakeupNeeded =
( this->dontProcessSubscr || this->eventLogQue.count() == 0 ) &&
this->ioQue.count() == 0;
this->ioQue.add ( event );
}
return S_cas_success;
}
void casEventSys::removeFromEventQueue ( casAsyncIOI & io, bool & onTheEventQueue )
void casEventSys::removeFromEventQueue ( casAsyncIOI & io, bool & onTheIOQueue )
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( onTheEventQueue ) {
onTheEventQueue = false;
this->eventLogQue.remove ( io );
if ( onTheIOQueue ) {
onTheIOQueue = false;
this->ioQue.remove ( io );
}
}
bool casEventSys::addToEventQueue ( casChannelI & event,
bool & inTheEventQueue )
bool & onTheIOQueue )
{
bool wakeupRequired = false;
bool wakeupNeeded = false;
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( ! inTheEventQueue ) {
inTheEventQueue = true;
wakeupRequired = ! this->dontProcess && this->eventLogQue.count()==0;
this->eventLogQue.add ( event );
if ( ! onTheIOQueue ) {
onTheIOQueue = true;
wakeupNeeded =
( this->dontProcessSubscr || this->eventLogQue.count() == 0 ) &&
this->ioQue.count() == 0;
this->ioQue.add ( event );
}
}
return wakeupRequired;
return wakeupNeeded;
}
void casEventSys::removeFromEventQueue ( class casChannelI & io,
bool & inTheEventQueue )
bool & onTheIOQueue )
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( inTheEventQueue ) {
inTheEventQueue = false;
this->eventLogQue.remove ( io );
if ( onTheIOQueue ) {
onTheIOQueue = false;
this->ioQue.remove ( io );
}
}
bool casEventSys::addToEventQueue ( channelDestroyEvent & event )
{
epicsGuard < epicsMutex > guard ( this->mutex );
bool wakeupRequired = ! this->dontProcess && this->eventLogQue.count()==0;
this->eventLogQue.add ( event );
bool wakeupRequired =
( this->dontProcessSubscr || this->eventLogQue.count() == 0 ) &&
this->ioQue.count() == 0;
this->ioQue.add ( event );
return wakeupRequired;
}
@@ -283,14 +322,10 @@ void casEventSys::setDestroyPending ()
this->destroyPending = true;
}
inline bool casEventSys::full () const // X aCC 361
inline bool casEventSys::full () const
{
if ( this->replaceEvents || this->eventLogQue.count() >= this->maxLogEntries ) {
return true;
}
else {
return false;
}
return this->replaceEvents ||
this->eventLogQue.count() >= this->maxLogEntries;
}
bool casEventSys::postEvent ( tsDLList < casMonitor > & monitorList,
@@ -323,9 +358,11 @@ bool casEventSys::postEvent ( tsDLList < casMonitor > & monitorList,
pLog = 0;
}
if ( this->eventLogQue.count() == 0 ) {
signalNeeded = true;
}
signalNeeded =
!this->dontProcessSubscr &&
this->eventLogQue.count() == 0 &&
this->ioQue.count() == 0;
iter->installNewEventLog (
this->eventLogQue, pLog, event );
}