Fixed issue "TCP Nameserver Connection Down Blocks All Callbacks" and cleaned up the SearchDest list / TCP circuit relation

* Pulled out tcpiiu::SearchDestTCP to have cac create the SearchDest independent from the TCP circuit
* Reorganized the relation between the SearchDestTCP and the TCP circuit:
  - SearchDest stays always on the cac list
  - TCP connection going down sets SearchDest inactive and SearchDestTCP's circuit member to NULL
  - Next seach request creates the new TCP circuit and sets SearchDestTCP's member, but leaves SearchDest inactive
  - Successful connect sets SearchDest active
* ReceiveThread releases the cac mutex before going to sleep in connect loop
This commit is contained in:
Ralph Lange
2010-04-15 17:06:16 -04:00
parent a0e868e2e1
commit be978d6499
4 changed files with 72 additions and 50 deletions

View File

@@ -258,9 +258,11 @@ cac::cac (
while ( osiSockAddrNode *
pNode = reinterpret_cast < osiSockAddrNode * > ( ellGet ( & dest ) ) ) {
tcpiiu * piiu = NULL;
SearchDestTCP * pdst = new SearchDestTCP ( *this, pNode->addr );
this->registerSearchDest ( guard, * pdst );
bool newIIU = findOrCreateVirtCircuit (
guard, pNode->addr, cacChannel::priorityDefault,
piiu, CA_UKN_MINOR_VERSION, true );
piiu, CA_UKN_MINOR_VERSION, pdst );
free ( pNode );
if ( newIIU ) {
piiu->start ( guard );
@@ -513,7 +515,7 @@ cacChannel & cac::createChannel (
bool cac::findOrCreateVirtCircuit (
epicsGuard < epicsMutex > & guard, const osiSockAddr & addr,
unsigned priority, tcpiiu *& piiu, unsigned minorVersionNumber,
const bool nameService )
SearchDestTCP * pSearchDest )
{
guard.assertIdenticalMutex ( this->mutex );
bool newIIU = false;
@@ -530,7 +532,7 @@ bool cac::findOrCreateVirtCircuit (
new ( this->freeListVirtualCircuit ) tcpiiu (
*this, this->mutex, this->cbMutex, this->notify, this->connTMO,
this->timerQueue, addr, this->comBufMemMgr, minorVersionNumber,
this->ipToAEngine, priority, nameService ) );
this->ipToAEngine, priority, pSearchDest ) );
bhe * pBHE = this->beaconTable.lookup ( addr.ia );
if ( ! pBHE ) {

View File

@@ -185,7 +185,7 @@ public:
epicsGuard < epicsMutex > &, SearchDest & req );
bool findOrCreateVirtCircuit (
epicsGuard < epicsMutex > &, const osiSockAddr &,
unsigned, tcpiiu *&, unsigned, const bool );
unsigned, tcpiiu *&, unsigned, SearchDestTCP * pSearchDest = NULL );
// diagnostics
unsigned circuitCount ( epicsGuard < epicsMutex > & ) const;

View File

@@ -450,6 +450,10 @@ void tcpRecvThread::run ()
this->iiu.cacRef.destroyIIU ( this->iiu );
return;
}
if ( this->iiu.isNameService () ) {
this->iiu.pSearchDest->setCircuit ( &this->iiu );
this->iiu.pSearchDest->enable ();
}
}
this->iiu.sendThread.start ();
@@ -631,7 +635,7 @@ void tcpRecvThread::connect (
continue;
}
else if ( errnoCpy == SOCK_SHUTDOWN ) {
if ( ! this->iiu._nameService ) {
if ( ! this->iiu.isNameService () ) {
break;
}
}
@@ -639,14 +643,18 @@ void tcpRecvThread::connect (
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "CAC: Unable to connect because \"%s\"\n",
errlogPrintf ( "CAC: Unable to connect because \"%s\"\n",
sockErrBuf );
if ( ! this->iiu._nameService ) {
if ( ! this->iiu.isNameService () ) {
this->iiu.disconnectNotify ( guard );
break;
}
}
epicsThreadSleep ( this->iiu.cacRef.connectionTimeout ( guard ) );
{
double sleepTime = this->iiu.cacRef.connectionTimeout ( guard );
epicsGuardRelease < epicsMutex > unguard ( guard );
epicsThreadSleep ( sleepTime );
}
continue;
}
}
@@ -663,7 +671,7 @@ tcpiiu::tcpiiu (
comBufMemoryManager & comBufMemMgrIn,
unsigned minorVersion, ipAddrToAsciiEngine & engineIn,
const cacChannel::priLev & priorityIn,
const bool nameService ) :
SearchDestTCP * pSearchDestIn ) :
caServerID ( addrIn.ia, priorityIn ),
hostNameCacheInstance ( addrIn, engineIn ),
recvThread ( *this, cbMutexIn, ctxNotifyIn, "CAC-TCP-recv",
@@ -684,7 +692,7 @@ tcpiiu::tcpiiu (
comBufMemMgr ( comBufMemMgrIn ),
cacRef ( cac ),
pCurData ( cac.allocateSmallBufferTCP () ),
pSearchDest ( NULL ),
pSearchDest ( pSearchDestIn ),
mutex ( mutexIn ),
cbMutex ( cbMutexIn ),
minorProtocolVersion ( minorVersion ),
@@ -696,7 +704,6 @@ tcpiiu::tcpiiu (
unacknowledgedSendBytes ( 0u ),
channelCountTot ( 0u ),
_receiveThreadIsBusy ( false ),
_nameService ( nameService ),
busyStateDetected ( false ),
flowControlActive ( false ),
echoRequestPending ( false ),
@@ -821,12 +828,8 @@ tcpiiu::tcpiiu (
}
# endif
if ( _nameService ) {
#pragma message ( "exception thrown here might not cleanup socket" )
epicsGuard < epicsMutex > guard ( this->mutex );
SearchDestTCP * pSearchDestTCP = new SearchDestTCP ( this, cacRef, addrIn );
cacRef.registerSearchDest ( guard, *pSearchDestTCP );
this->pSearchDest = pSearchDestTCP;
if ( isNameService() ) {
pSearchDest->setCircuit ( this );
}
memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) );
@@ -1849,7 +1852,7 @@ void tcpiiu::disconnectAllChannels (
this->channelCountTot = 0u;
if ( ! _nameService ) {
if ( ! isNameService () ) {
this->initiateCleanShutdown ( guard );
}
}
@@ -1911,7 +1914,7 @@ void tcpiiu::unlinkAllChannels (
this->channelCountTot = 0u;
if ( ! _nameService ) {
if ( ! isNameService () ) {
this->initiateCleanShutdown ( guard );
}
}
@@ -1989,7 +1992,7 @@ void tcpiiu::uninstallChan (
}
chan.channelNode::listMember = channelNode::cs_none;
this->channelCountTot--;
if ( this->channelCountTot == 0 && ! _nameService ) {
if ( this->channelCountTot == 0 && ! this->isNameService() ) {
this->initiateCleanShutdown ( guard );
}
}
@@ -2109,38 +2112,43 @@ bool tcpiiu::searchMsg (
guard, id, pName, nameLength );
}
tcpiiu :: SearchDestTCP :: SearchDestTCP (
tcpiiu * tcpiiuIn, cac & cacIn, const osiSockAddr & addrIn ) :
_ptcpiiu ( tcpiiuIn ),
SearchDestTCP :: SearchDestTCP (
cac & cacIn, const osiSockAddr & addrIn ) :
_ptcpiiu ( NULL ),
_cac ( cacIn ),
_addr ( addrIn ),
_active ( true )
_active ( false )
{
}
void tcpiiu :: SearchDestTCP :: disable ()
void SearchDestTCP :: disable ()
{
_active = false;
_ptcpiiu = NULL;
}
void tcpiiu :: SearchDestTCP :: searchRequest (
void SearchDestTCP :: enable ()
{
_active = true;
}
void SearchDestTCP :: searchRequest (
epicsGuard < epicsMutex > & guard,
const char * pBuf, size_t len )
{
// restart circuit if it was shut down
if ( ! _active ) {
if ( ! _ptcpiiu ) {
tcpiiu * piiu = NULL;
bool newIIU = _cac.findOrCreateVirtCircuit (
guard, _addr, cacChannel::priorityDefault,
piiu, CA_UKN_MINOR_VERSION, true );
piiu, CA_UKN_MINOR_VERSION, this );
if ( newIIU ) {
piiu->start ( guard );
}
_ptcpiiu = piiu;
_active = true;
}
// does this server support TCP-based name resolution
// does this server support TCP-based name resolution?
if ( CA_V412 ( _ptcpiiu->minorProtocolVersion ) ) {
guard.assertIdenticalMutex ( _ptcpiiu->mutex );
assert ( CA_MESSAGE_ALIGN ( len ) == len );
@@ -2151,7 +2159,7 @@ void tcpiiu :: SearchDestTCP :: searchRequest (
}
}
void tcpiiu :: SearchDestTCP :: show (
void SearchDestTCP :: show (
epicsGuard < epicsMutex > & guard, unsigned level ) const
{
:: printf ( "tcpiiu :: SearchDestTCP\n" );

View File

@@ -95,16 +95,34 @@ private:
void run ();
};
class tcpiiu :
class SearchDestTCP : public SearchDest {
public:
SearchDestTCP ( cac &, const osiSockAddr & );
void searchRequest ( epicsGuard < epicsMutex > & guard,
const char * pbuf, size_t len );
void show ( epicsGuard < epicsMutex > & guard, unsigned level ) const;
void setCircuit ( tcpiiu * );
void disable ();
void enable ();
private:
tcpiiu * _ptcpiiu;
cac & _cac;
const osiSockAddr _addr;
bool _active;
};
class tcpiiu :
public netiiu, public tsDLNode < tcpiiu >,
public tsSLNode < tcpiiu >, public caServerID,
private wireSendAdapter, private wireRecvAdapter {
friend void SearchDestTCP::searchRequest ( epicsGuard < epicsMutex > & guard,
const char * pbuf, size_t len );
public:
tcpiiu ( cac & cac, epicsMutex & mutualExclusion, epicsMutex & callbackControl,
cacContextNotify &, double connectionTimeout, epicsTimerQueue & timerQueue,
const osiSockAddr & addrIn, comBufMemoryManager &, unsigned minorVersion,
ipAddrToAsciiEngine & engineIn, const cacChannel::priLev & priorityIn,
const bool nameService );
SearchDestTCP * pSearchDestIn = NULL);
~tcpiiu ();
void start (
epicsGuard < epicsMutex > & );
@@ -189,21 +207,6 @@ public:
tsFreeList < class tcpiiu, 32, epicsMutexNOOP > & ));
private:
class SearchDestTCP :
public SearchDest {
public:
SearchDestTCP ( tcpiiu *, cac &, const osiSockAddr & );
void searchRequest ( epicsGuard < epicsMutex > & guard,
const char * pbuf, size_t len );
void show ( epicsGuard < epicsMutex > & guard, unsigned level ) const;
void disable ();
private:
tcpiiu * _ptcpiiu;
cac & _cac;
const osiSockAddr _addr;
bool _active;
};
hostNameCache hostNameCacheInstance;
tcpRecvThread recvThread;
tcpSendThread sendThread;
@@ -246,7 +249,6 @@ private:
unsigned unacknowledgedSendBytes;
unsigned channelCountTot;
bool _receiveThreadIsBusy;
bool _nameService;
bool busyStateDetected; // only modified by the recv thread
bool flowControlActive; // only modified by the send process thread
bool echoRequestPending;
@@ -280,6 +282,7 @@ private:
bool bytesArePendingInOS () const;
void decrementBlockingForFlushCount (
epicsGuard < epicsMutex > & guard );
bool isNameService () const;
// send protocol stubs
void echoRequest (
@@ -409,5 +412,14 @@ inline void tcpiiu::probeResponseNotify (
this->recvDog.probeResponseNotify ( cbGuard );
}
#endif // ifdef virtualCircuith
inline bool tcpiiu::isNameService () const
{
return ( this->pSearchDest != NULL );
}
inline void SearchDestTCP::setCircuit ( tcpiiu * piiu )
{
_ptcpiiu = piiu;
}
#endif // ifdef virtualCircuith