moved someof the functionality into the old interface and improved shutdown sequence

This commit is contained in:
Jeff Hill
2002-05-09 00:36:55 +00:00
parent 71827bb2ce
commit 07bf67ef2f

View File

@@ -131,13 +131,11 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
pudpiiu ( 0 ),
tcpSmallRecvBufFreeList ( 0 ),
tcpLargeRecvBufFreeList ( 0 ),
pCallbackGuard ( 0 ),
notify ( notifyIn ),
initializingThreadsId ( epicsThreadGetIdSelf() ),
initializingThreadsPriority ( epicsThreadGetPrioritySelf() ),
maxRecvBytesTCP ( MAX_TCP ),
pndRecvCnt ( 0u ),
readSeq ( 0u )
preemptiveCallbackEnabled ( enablePreemptiveCallbackIn )
{
if ( ! osiSockAttach () ) {
throwWithLocation ( caErrorCode (ECA_INTERNAL) );
@@ -202,10 +200,6 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
if ( ! this->tcpLargeRecvBufFreeList ) {
throw std::bad_alloc ();
}
if ( ! enablePreemptiveCallbackIn ) {
this->pCallbackGuard = new epicsGuard < callbackMutex > ( this->cbMutex );
}
}
catch ( ... ) {
osiSockRelease ();
@@ -223,14 +217,6 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
cac::~cac ()
{
//
// release callback lock
// (disconnect callbacks will occur if they still have connected
// channels at this point)
//
delete this->pCallbackGuard;
//
// lock intentionally not held here so that we dont deadlock
// waiting for the UDP thread to exit while it is waiting to
@@ -239,16 +225,26 @@ cac::~cac ()
// this blocks until the UDP thread exits so that
// it will not sneak in any new clients
delete this->pudpiiu;
this->pudpiiu = 0; // for tcpCircuitShutdown
this->pudpiiu = 0; // for initiateAbortShutdown()
//
// shutdown all tcp connections
// (take both locks here in the proper order to avoid deadlocks)
//
tsSLList < tcpiiu > dest;
{
epicsGuard < callbackMutex > autoMutexCB ( this->cbMutex );
epicsGuard < cacMutex > autoMutexCAC ( this->mutex );
this->serverTable.traverse ( & tcpiiu::cleanShutdown );
epicsGuard < cacMutex > guard ( this->mutex );
this->serverTable.removeAll ( dest );
}
while ( tcpiiu * pIIU = dest.get () ) {
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
this->privateUninstallIIU ( cbGuard, *pIIU );
}
// this will block for oustanding sends to go out so dont
// hold a lock while destroying the tcpiiu
delete pIIU;
}
//
@@ -308,13 +304,7 @@ unsigned cac::highestPriorityLevelBelow ( unsigned priority )
//
void cac::flushRequest ()
{
epicsGuard < cacMutex > autoMutex ( this->mutex );
this->flushRequestPrivate ();
}
// lock must be applied
void cac::flushRequestPrivate ()
{
epicsGuard < cacMutex > guard ( this->mutex );
this->serverTable.traverse ( & tcpiiu::flushRequest );
}
@@ -333,8 +323,6 @@ void cac::show ( unsigned level ) const
if ( level > 0u ) {
this->serverTable.show ( level - 1u );
::printf ( "\tconnection time out watchdog period %f\n", this->connTMO );
::printf ( "\tpreemptive calback is %s\n",
this->pCallbackGuard ? "disabled" : "enabled" );
::printf ( "list of installed services:\n" );
this->services.show ( level - 1u );
}
@@ -343,8 +331,6 @@ void cac::show ( unsigned level ) const
if ( this->pudpiiu ) {
this->pudpiiu->show ( level - 2u );
}
::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n",
this->pndRecvCnt );
}
if ( level > 2u ) {
@@ -362,15 +348,11 @@ void cac::show ( unsigned level ) const
this->timerQueue.show ( level - 3u );
::printf ( "IP address to name conversion engine:\n" );
this->ipToAEngine.show ( level - 3u );
::printf ( "\tthe current read sequence number is %u\n",
this->readSeq );
}
if ( level > 3u ) {
::printf ( "Default mutex:\n");
this->mutex.show ( level - 4u );
::printf ( "IO done event:\n");
this->ioDone.show ( level - 3u );
::printf ( "mutex:\n" );
this->mutex.show ( level - 3u );
}
@@ -429,130 +411,6 @@ void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime,
# endif
}
// !!!! This routine is only visible in the old interface - or in a new ST interface.
// !!!! In the old interface we restrict thread attach so that calls from threads
// !!!! other than the initializing thread are not allowed if preemptive callback
// !!!! is disabled. This prevents the preemptive callback lock from being released
// !!!! by other threads than the one that locked it.
//
// this routine should probably be moved to the oldCAC?
int cac::pendIO ( const double & timeout )
{
// prevent recursion nightmares by disabling calls to
// pendIO () from within a CA callback.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
return ECA_EVDISALLOW;
}
int status = ECA_NORMAL;
epicsTime beg_time = epicsTime::getCurrent ();
double remaining = timeout;
{
epicsGuard < cacMutex > guard ( this->mutex );
this->flushRequestPrivate ();
}
while ( this->pndRecvCnt > 0 ) {
if ( remaining < CAC_SIGNIFICANT_DELAY ) {
status = ECA_TIMEOUT;
break;
}
if ( this->pCallbackGuard ) {
epicsGuardRelease < callbackMutex > cbRelease ( *this->pCallbackGuard );
this->ioDone.wait ( remaining );
}
else {
this->ioDone.wait ( remaining );
}
double delay = epicsTime::getCurrent () - beg_time;
if ( delay < timeout ) {
remaining = timeout - delay;
}
else {
remaining = 0.0;
}
}
{
epicsGuard < cacMutex > guard ( this->mutex );
this->readSeq++;
this->pndRecvCnt = 0u;
}
if ( this->pudpiiu ) {
this->pudpiiu->pendioTimeoutNotify ();
}
return status;
}
int cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout )
{
if ( this->pCallbackGuard ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
event.wait ( timeout );
}
else {
event.wait ( timeout );
}
return ECA_NORMAL;
}
// !!!! This routine is only visible in the old interface - or in a new ST interface.
// !!!! In the old interface we restrict thread attach so that calls from threads
// !!!! other than the initializing thread are not allowed if preemptive callback
// !!!! is disabled. This prevents the preemptive callback lock from being released
// !!!! by other threads than the one that locked it.
//
// this routine should probably be moved to the oldCAC?
int cac::pendEvent ( const double & timeout )
{
// prevent recursion nightmares by disabling calls to
// pendIO () from within a CA callback.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
return ECA_EVDISALLOW;
}
epicsTime current = epicsTime::getCurrent ();
{
epicsGuard < cacMutex > guard ( this->mutex );
this->flushRequestPrivate ();
}
// process at least once if preemptive callback is disabled
if ( this->pCallbackGuard ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
this->cbMutex.waitUntilNoRecvThreadsPending ();
}
double elapsed = epicsTime::getCurrent() - current;
double delay;
if ( timeout > elapsed ) {
delay = timeout - elapsed;
}
else {
delay = 0.0;
}
if ( delay >= CAC_SIGNIFICANT_DELAY ) {
if ( this->pCallbackGuard ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
epicsThreadSleep ( delay );
}
else {
epicsThreadSleep ( delay );
}
}
return ECA_TIMEOUT;
}
void cac::installCASG ( CASG &sg )
{
epicsGuard < cacMutex > guard ( this->mutex );
@@ -608,7 +466,6 @@ cacChannel & cac::createChannel ( const char * pName,
epicsGuard < cacMutex > guard ( this->mutex );
epics_auto_ptr < nciu > pNetChan
( new nciu ( *this, *this->pudpiiu, chan, pName, pri ) );
pNetChan->notifyStateChangeFirstConnectInCountOfOutstandingIO ();
this->chanTable.add ( *pNetChan );
return *pNetChan.release ();
}
@@ -714,7 +571,7 @@ bool cac::lookupChannelAndTransferToTCP (
}
}
this->pudpiiu->uninstallChannel ( cbGuard, guard, *pChan );
this->pudpiiu->uninstallChanAndReturnDestroyPtr ( guard, *pChan );
piiu->installChannel ( guard, *pChan, sid, typeCode, count );
v41Ok = piiu->ca_v41_ok ();
@@ -818,23 +675,7 @@ void cac::uninstallChannel ( nciu & chan )
}
}
// take callback lock to ensure that any callbacks in
// progress complete prior to destroying channel and any
// associated IO
//
// If this is a callback thread then it already owns the
// CB lock at this point. If this is the main thread then
// it is possible that this->pCallbackGuard already owns
// the callback lock. Otherwise if this is a preemptive
// callback enabled app then this->pCallbackGuard
// isnt set and we must take the call back lock.
//
// We take the callback lock again here and rely on recursive
// mutex capabilities, but perhaps this should be handled
// differently in the future.
//
// bool alreadyLocked = epicsThreadPrivateGet ( caClientCallbackThreadId )
// || this->pCallbackGuard;
tcpiiu * pDestroyIIU;
{
// taking this mutex prior to deleting the IO and channel guarantees
// that we will not delete a channel out from under a callback
@@ -857,10 +698,18 @@ void cac::uninstallChannel ( nciu & chan )
// o chan destroy exception has been delivered
{
epicsGuard < cacMutex > guard ( this->mutex );
// this destroys the tcpiiu if its the last channel
chan.getPIIU()->uninstallChannel ( cbGuard, guard, chan );
pDestroyIIU = chan.getPIIU()->uninstallChanAndReturnDestroyPtr
( guard, chan );
if ( pDestroyIIU ) {
this->serverTable.remove ( *pDestroyIIU );
this->privateUninstallIIU ( cbGuard, *pDestroyIIU );
}
}
}
// this blocks for all outstanding messages to be sent so
// no lock can be held here
delete pDestroyIIU;
}
int cac::printf ( const char *pformat, ... ) const
@@ -892,13 +741,7 @@ void cac::flushIfRequired ( epicsGuard < cacMutex > & guard, netiiu & iiu )
// enable / disable of call back preemption must occur here
// because the tcpiiu might disconnect while waiting and its
// pointer to this cac might become invalid
if ( this->pCallbackGuard ) {
iiu.blockUntilSendBacklogIsReasonable
( this->pCallbackGuard, guard );
}
else {
iiu.blockUntilSendBacklogIsReasonable ( 0, guard );
}
iiu.blockUntilSendBacklogIsReasonable ( this->notify, guard );
}
}
else {
@@ -970,18 +813,7 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
}
// wait for any IO callbacks in progress to complete
// prior to destroying the IO object
//
// If this is a callback thread then it already owns the
// CB lock at this point. If this is the main thread then we
// are not in pendEvent, pendIO, SG block, etc and
// this->pCallbackGuard protects. Otherwise if this id
// the users auxillary thread then this->pCallbackGuard
// isnt set and we must take the call back lock.
bool alreadyLocked = ( epicsThreadPrivateGet ( caClientCallbackThreadId )
|| this->pCallbackGuard );
if ( ! alreadyLocked ) {
epicsGuard < callbackMutex > autoMutex ( this->cbMutex );
}
epicsGuard < callbackMutex > autoMutex ( this->cbMutex );
// now it is safe to destroy the IO object
{
epicsGuard < cacMutex > autoMutex ( this->mutex );
@@ -1651,72 +1483,6 @@ void cac::vSignal ( int ca_status, const char *pfilenm,
this->printf ( "..................................................................\n" );
}
void cac::incrementOutstandingIO ()
{
epicsGuard < cacMutex > guard ( this->mutex );
if ( this->pndRecvCnt < UINT_MAX ) {
this->pndRecvCnt++;
}
else {
throwWithLocation ( caErrorCode (ECA_INTERNAL) );
}
}
void cac::decrementOutstandingIO ()
{
bool signalNeeded;
{
epicsGuard < cacMutex > guard ( this->mutex );
if ( this->pndRecvCnt > 0u ) {
this->pndRecvCnt--;
if ( this->pndRecvCnt == 0u ) {
signalNeeded = true;
}
else {
signalNeeded = false;
}
}
else {
signalNeeded = true;
}
}
if ( signalNeeded ) {
this->ioDone.signal ();
}
}
void cac::decrementOutstandingIO ( unsigned sequenceNo )
{
bool signalNeeded;
{
epicsGuard < cacMutex > guard ( this->mutex );
if ( this->readSeq == sequenceNo ) {
if ( this->pndRecvCnt > 0u ) {
this->pndRecvCnt--;
if ( this->pndRecvCnt == 0u ) {
signalNeeded = true;
}
else {
signalNeeded = false;
}
}
else {
signalNeeded = true;
}
}
else {
signalNeeded = false;
}
}
if ( signalNeeded ) {
this->ioDone.signal ();
}
}
void cac::selfTest () const
{
this->chanTable.verify ();
@@ -1735,7 +1501,7 @@ void cac::notifyDestroyFD ( epicsGuard < callbackMutex > &, SOCKET sock ) const
this->notify.fdWasDestroyed ( sock );
}
void cac::tcpCircuitShutdown ( tcpiiu & iiu, bool discardPendingMessages )
void cac::initiateAbortShutdown ( tcpiiu & iiu )
{
// generate some NOOP UDP traffic so that ca_pend_event()
// will get called in preemptive callback disabled
@@ -1747,16 +1513,10 @@ void cac::tcpCircuitShutdown ( tcpiiu & iiu, bool discardPendingMessages )
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
epicsGuard < cacMutex > guard ( this->mutex );
iiu.shutdown ( cbGuard, guard, discardPendingMessages );
iiu.initiateAbortShutdown ( cbGuard, guard );
}
}
void cac::uninstallIIU ( tcpiiu & iiu )
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
this->privateUninstallIIU ( cbGuard, iiu );
}
void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & iiu )
{
epicsGuard < cacMutex > guard ( this->mutex );
@@ -1773,16 +1533,10 @@ void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu &
pBHE->unregisterIIU ( iiu );
}
}
// dont allow any channels to jump back onto
// this iiu while the lock is removed below
this->serverTable.remove ( iiu );
assert ( this->pudpiiu );
iiu.removeAllChannels ( cbGuard, guard, *this );
delete &iiu;
this->pudpiiu->resetSearchTimerPeriod ( 0.0 );
// signal iiu uninstal event so that cac can properly shut down