more work on tcp shutdown sequence
This commit is contained in:
@@ -228,28 +228,25 @@ cac::~cac ()
|
||||
}
|
||||
|
||||
//
|
||||
// shutdown all tcp connections
|
||||
// shutdown all tcp circuits
|
||||
// (take both locks here in the proper order to avoid deadlocks)
|
||||
//
|
||||
tsSLList < tcpiiu > dest;
|
||||
{
|
||||
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
|
||||
epicsGuard < cacMutex > guard ( this->mutex );
|
||||
this->serverTable.removeAll ( dest );
|
||||
}
|
||||
|
||||
while ( tcpiiu * pIIU = dest.get () ) {
|
||||
{
|
||||
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
|
||||
this->privateUninstallIIU ( cbGuard, *pIIU );
|
||||
resTableIter < tcpiiu, caServerID > iter = this->serverTable.firstIter ();
|
||||
while ( iter.valid() ) {
|
||||
iter->initiateAbortShutdown ( cbGuard, guard );
|
||||
iter++;
|
||||
}
|
||||
// this will block for oustanding sends to go out so dont
|
||||
// hold a lock while destroying the tcpiiu
|
||||
delete pIIU;
|
||||
}
|
||||
|
||||
//
|
||||
// wait for tcp threads to exit
|
||||
//
|
||||
// this will block for oustanding sends to go out so dont
|
||||
// hold a lock while waiting
|
||||
//
|
||||
while ( this->serverTable.numEntriesInstalled() ) {
|
||||
this->iiuUninstall.wait ();
|
||||
}
|
||||
@@ -570,7 +567,7 @@ bool cac::lookupChannelAndTransferToTCP (
|
||||
}
|
||||
}
|
||||
|
||||
this->pudpiiu->uninstallChanAndReturnDestroyPtr ( guard, *pChan );
|
||||
this->pudpiiu->uninstallChan ( guard, *pChan );
|
||||
piiu->installChannel ( guard, *pChan, sid, typeCode, count );
|
||||
|
||||
v41Ok = piiu->ca_v41_ok ();
|
||||
@@ -674,7 +671,6 @@ void cac::uninstallChannel ( nciu & chan )
|
||||
}
|
||||
}
|
||||
|
||||
tcpiiu * pDestroyIIU;
|
||||
{
|
||||
// taking this mutex prior to deleting the IO and channel guarantees
|
||||
// that we will not delete a channel out from under a callback
|
||||
@@ -697,18 +693,9 @@ void cac::uninstallChannel ( nciu & chan )
|
||||
// o chan destroy exception has been delivered
|
||||
{
|
||||
epicsGuard < cacMutex > guard ( this->mutex );
|
||||
pDestroyIIU = chan.getPIIU()->uninstallChanAndReturnDestroyPtr
|
||||
( guard, chan );
|
||||
if ( pDestroyIIU ) {
|
||||
this->serverTable.remove ( *pDestroyIIU );
|
||||
this->privateUninstallIIU ( cbGuard, *pDestroyIIU );
|
||||
}
|
||||
chan.getPIIU()->uninstallChan ( guard, chan );
|
||||
}
|
||||
}
|
||||
|
||||
// this blocks for all outstanding messages to be sent so
|
||||
// no lock can be held here
|
||||
delete pDestroyIIU;
|
||||
}
|
||||
|
||||
int cac::printf ( const char *pformat, ... ) const
|
||||
@@ -1518,6 +1505,12 @@ void cac::initiateAbortShutdown ( tcpiiu & iiu )
|
||||
}
|
||||
}
|
||||
|
||||
void cac::uninstallIIU ( tcpiiu & iiu )
|
||||
{
|
||||
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
|
||||
this->privateUninstallIIU ( cbGuard, iiu );
|
||||
}
|
||||
|
||||
void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & iiu )
|
||||
{
|
||||
epicsGuard < cacMutex > guard ( this->mutex );
|
||||
@@ -1538,6 +1531,8 @@ void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu &
|
||||
assert ( this->pudpiiu );
|
||||
iiu.removeAllChannels ( cbGuard, guard, *this );
|
||||
|
||||
this->serverTable.remove ( iiu );
|
||||
|
||||
this->pudpiiu->resetSearchTimerPeriod ( 0.0 );
|
||||
|
||||
// signal iiu uninstal event so that cac can properly shut down
|
||||
|
||||
@@ -177,6 +177,7 @@ public:
|
||||
static unsigned lowestPriorityLevelAbove ( unsigned priority );
|
||||
static unsigned highestPriorityLevelBelow ( unsigned priority );
|
||||
void initiateAbortShutdown ( tcpiiu & );
|
||||
void uninstallIIU ( tcpiiu & );
|
||||
|
||||
private:
|
||||
ipAddrToAsciiEngine ipToAEngine;
|
||||
|
||||
@@ -108,10 +108,9 @@ osiSockAddr limboiiu::getNetworkAddress () const
|
||||
return netiiu::getNetworkAddress ();
|
||||
}
|
||||
|
||||
class tcpiiu * limboiiu::uninstallChanAndReturnDestroyPtr
|
||||
( epicsGuard < cacMutex > & guard, nciu & chan )
|
||||
void limboiiu::uninstallChan ( epicsGuard < cacMutex > & guard, nciu & chan )
|
||||
{
|
||||
return netiiu::uninstallChanAndReturnDestroyPtr ( guard, chan );
|
||||
return netiiu::uninstallChan( guard, chan );
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -103,8 +103,7 @@ void netiiu::requestRecvProcessPostponedFlush ()
|
||||
return;
|
||||
}
|
||||
|
||||
class tcpiiu * netiiu::uninstallChanAndReturnDestroyPtr
|
||||
( epicsGuard < cacMutex > &, nciu & )
|
||||
void netiiu::uninstallChan ( epicsGuard < cacMutex > &, nciu & )
|
||||
{
|
||||
throw cacChannel::notConnected();
|
||||
}
|
||||
|
||||
@@ -56,8 +56,7 @@ public:
|
||||
( cacNotify &, epicsGuard < cacMutex > & ) = 0;
|
||||
virtual void requestRecvProcessPostponedFlush () = 0;
|
||||
virtual osiSockAddr getNetworkAddress () const = 0;
|
||||
virtual class tcpiiu * uninstallChanAndReturnDestroyPtr
|
||||
( epicsGuard < cacMutex > &, nciu & ) = 0;
|
||||
virtual void uninstallChan ( epicsGuard < cacMutex > &, nciu & ) = 0;
|
||||
};
|
||||
|
||||
class limboiiu : public netiiu { // X aCC 655
|
||||
@@ -86,8 +85,7 @@ private:
|
||||
( cacNotify &, epicsGuard < cacMutex > & );
|
||||
void requestRecvProcessPostponedFlush ();
|
||||
osiSockAddr getNetworkAddress () const;
|
||||
class tcpiiu * uninstallChanAndReturnDestroyPtr
|
||||
( epicsGuard < cacMutex > &, nciu & );
|
||||
void uninstallChan ( epicsGuard < cacMutex > &, nciu & );
|
||||
limboiiu ( const limboiiu & );
|
||||
limboiiu & operator = ( const limboiiu & );
|
||||
};
|
||||
|
||||
@@ -114,6 +114,16 @@ void tcpSendThread::run ()
|
||||
while ( this->iiu.blockingForFlush ) {
|
||||
epicsThreadSleep ( 0.1 );
|
||||
}
|
||||
|
||||
this->iiu.recvThread.exitWait ();
|
||||
|
||||
this->thread.exitWaitRelease ();
|
||||
|
||||
this->iiu.cacRef.uninstallIIU ( this->iiu );
|
||||
|
||||
delete & this->iiu;
|
||||
|
||||
epicsThread::exit ();
|
||||
}
|
||||
|
||||
unsigned tcpiiu::sendBytes ( const void *pBuf,
|
||||
@@ -257,13 +267,20 @@ void tcpRecvThread::run ()
|
||||
|
||||
this->iiu.connect ();
|
||||
|
||||
if ( this->iiu.state == tcpiiu::iiucs_connected ) {
|
||||
this->iiu.sendThread.start ();
|
||||
}
|
||||
else {
|
||||
if ( this->iiu.state != tcpiiu::iiucs_connected ) {
|
||||
this->iiu.cacRef.initiateAbortShutdown ( this->iiu );
|
||||
|
||||
this->thread.exitWaitRelease ();
|
||||
|
||||
this->iiu.cacRef.uninstallIIU ( this->iiu );
|
||||
|
||||
delete & this->iiu;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
this->iiu.sendThread.start ();
|
||||
|
||||
comBuf * pComBuf = new comBuf;
|
||||
while ( this->iiu.state == tcpiiu::iiucs_connected ||
|
||||
this->iiu.state == tcpiiu::iiucs_clean_shutdown ) {
|
||||
@@ -373,8 +390,9 @@ void tcpRecvThread::run ()
|
||||
}
|
||||
}
|
||||
catch ( ... ) {
|
||||
errlogPrintf ("cac tcp receive thread terminating due to a c++ exception\n" );
|
||||
}
|
||||
errlogPrintf ( "cac tcp receive thread terminating due to a c++ exception\n" );
|
||||
this->iiu.cacRef.initiateAbortShutdown ( this->iiu );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -556,8 +574,17 @@ void tcpiiu::connect ()
|
||||
}
|
||||
}
|
||||
|
||||
void tcpiiu::initiateCleanShutdown ( epicsGuard < cacMutex > & )
|
||||
{
|
||||
if ( this->state == iiucs_connected || this->state == iiucs_connecting ) {
|
||||
this->state = iiucs_clean_shutdown;
|
||||
}
|
||||
this->sendThreadFlushEvent.signal ();
|
||||
}
|
||||
|
||||
|
||||
void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard,
|
||||
epicsGuard < cacMutex > & guard )
|
||||
epicsGuard <cacMutex > & guard )
|
||||
{
|
||||
if ( this->state != iiucs_abort_shutdown ) {
|
||||
this->state = iiucs_abort_shutdown;
|
||||
@@ -607,12 +634,9 @@ tcpiiu::~tcpiiu ()
|
||||
{
|
||||
{
|
||||
epicsGuard < cacMutex > guard ( this->cacRef.mutexRef() );
|
||||
if ( this->state == iiucs_connected || this->state == iiucs_connecting ) {
|
||||
this->state = iiucs_clean_shutdown;
|
||||
}
|
||||
this->initiateCleanShutdown ( guard );
|
||||
}
|
||||
|
||||
this->sendThreadFlushEvent.signal ();
|
||||
this->sendThread.exitWait ();
|
||||
this->recvThread.exitWait ();
|
||||
|
||||
@@ -1218,15 +1242,12 @@ void tcpiiu::installChannel ( epicsGuard < cacMutex > &, nciu & chan, unsigned s
|
||||
this->flushRequest ();
|
||||
}
|
||||
|
||||
tcpiiu * tcpiiu::uninstallChanAndReturnDestroyPtr
|
||||
void tcpiiu::uninstallChan
|
||||
( epicsGuard < cacMutex > & guard, nciu & chan )
|
||||
{
|
||||
this->channelList.remove ( chan );
|
||||
if ( channelList.count() == 0 ) {
|
||||
return this;
|
||||
}
|
||||
else {
|
||||
return 0;
|
||||
this->initiateCleanShutdown ( guard );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -995,12 +995,11 @@ int udpiiu::printf ( const char *pformat, ... )
|
||||
return status;
|
||||
}
|
||||
|
||||
class tcpiiu * udpiiu::uninstallChanAndReturnDestroyPtr (
|
||||
void udpiiu::uninstallChan (
|
||||
epicsGuard < cacMutex > &, nciu & chan )
|
||||
{
|
||||
epicsGuard < udpMutex > guard ( this->mutex );
|
||||
this->channelList.remove ( chan );
|
||||
return 0;
|
||||
}
|
||||
|
||||
void udpiiu::hostName ( char *pBuf, unsigned bufLength ) const
|
||||
|
||||
@@ -88,7 +88,7 @@ public:
|
||||
void beaconAnomalyNotify ();
|
||||
int printf ( const char *pformat, ... );
|
||||
unsigned channelCount ();
|
||||
class tcpiiu * uninstallChanAndReturnDestroyPtr ( epicsGuard < cacMutex > &, nciu & );
|
||||
void uninstallChan ( epicsGuard < cacMutex > &, nciu & );
|
||||
bool pushDatagramMsg ( const caHdr &hdr, const void *pExt, ca_uint16_t extsize);
|
||||
void shutdown ();
|
||||
|
||||
|
||||
@@ -64,6 +64,7 @@ public:
|
||||
virtual ~tcpSendThread ();
|
||||
void start ();
|
||||
void exitWait ();
|
||||
void exitWaitRelease ();
|
||||
private:
|
||||
class tcpiiu & iiu;
|
||||
epicsThread thread;
|
||||
@@ -81,8 +82,8 @@ public:
|
||||
const cacChannel::priLev & priorityIn );
|
||||
~tcpiiu ();
|
||||
void start ( epicsGuard < callbackMutex > & );
|
||||
void initiateAbortShutdown (
|
||||
epicsGuard < callbackMutex > &, epicsGuard < cacMutex > & );
|
||||
void initiateAbortShutdown ( epicsGuard < callbackMutex > & cbGuard,
|
||||
epicsGuard <cacMutex > & guard );
|
||||
void beaconAnomalyNotify ();
|
||||
void beaconArrivalNotify ();
|
||||
|
||||
@@ -90,7 +91,7 @@ public:
|
||||
bool flushBlockThreshold ( epicsGuard < cacMutex > & ) const;
|
||||
void flushRequestIfAboveEarlyThreshold ( epicsGuard < cacMutex > & );
|
||||
void blockUntilSendBacklogIsReasonable
|
||||
( cacNotify & notify, epicsGuard < cacMutex > & primaryGuard );
|
||||
( cacNotify &, epicsGuard < cacMutex > & );
|
||||
virtual void show ( unsigned level ) const;
|
||||
bool setEchoRequestPending ();
|
||||
void createChannelRequest ( nciu & );
|
||||
@@ -113,8 +114,7 @@ public:
|
||||
class cacDisconnectChannelPrivate & );
|
||||
void installChannel ( epicsGuard < cacMutex > &, nciu & chan,
|
||||
unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn );
|
||||
class tcpiiu * uninstallChanAndReturnDestroyPtr (
|
||||
epicsGuard < cacMutex > &, nciu & chan );
|
||||
void uninstallChan ( epicsGuard < cacMutex > &, nciu & chan );
|
||||
|
||||
private:
|
||||
tcpRecvThread recvThread;
|
||||
@@ -149,6 +149,7 @@ private:
|
||||
bool earlyFlush;
|
||||
bool recvProcessPostponedFlush;
|
||||
|
||||
void initiateCleanShutdown ( epicsGuard < cacMutex > & );
|
||||
bool processIncoming ( epicsGuard < callbackMutex > & );
|
||||
unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf );
|
||||
unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf );
|
||||
|
||||
Reference in New Issue
Block a user