fixed disconnect callback when channel known to be disconnected

This commit is contained in:
Jeff Hill
2003-09-10 17:09:25 +00:00
parent 36b1f9e4ff
commit 245cf129ff
14 changed files with 170 additions and 149 deletions
+2 -1
View File
@@ -118,7 +118,8 @@ const char * ca_message_text []
"User destroyed channel",
"Invalid channel priority",
"Preemptive callback not enabled - additional threads may not join context",
"Client's protocol revision does not support transfers exceeding 16k bytes"
"Client's protocol revision does not support transfers exceeding 16k bytes",
"Virtual circuit connection sequence aborted"
};
static epicsThreadOnceId caClientContextIdOnce = EPICS_THREAD_ONCE_INIT;
+1
View File
@@ -481,6 +481,7 @@ void ca_client_context::blockForEventAndEnableCallbacks (
void ca_client_context::callbackLock ()
{
// if preemptive callback is enabled then this is a noop
if ( this->pCallbackGuard.get() ) {
bool sendNeeded = false;
+57 -92
View File
@@ -534,15 +534,13 @@ bool cac::transferChanToVirtCircuit (
return false;
}
bool v41Ok, v42Ok;
nciu *pChan;
{
epicsGuard < cacMutex > guard ( this->mutex );
/*
* ignore search replies for deleted channels
*/
pChan = this->chanTable.lookup ( cid );
nciu * pChan = this->chanTable.lookup ( cid );
if ( ! pChan ) {
return false;
}
@@ -607,36 +605,9 @@ bool cac::transferChanToVirtCircuit (
this->pudpiiu->uninstallChan ( guard, *pChan );
piiu->installChannel ( guard, *pChan, sid, typeCode, count );
v41Ok = piiu->ca_v41_ok ();
v42Ok = piiu->ca_v42_ok ();
if ( ! v42Ok ) {
if ( ! piiu->ca_v42_ok () ) {
// connect to old server with lock applied
pChan->connect ();
// resubscribe for monitors from this channel
this->connectAllIO ( guard, *pChan );
}
}
if ( ! v42Ok ) {
// channel uninstal routine grabs the callback lock so
// a channel will not be deleted while a call back is
// in progress
//
// the callback lock is also taken when a channel
// disconnects to prevent a race condition with the
// code below - ie we hold the callback lock here
// so a chanel cant be destroyed out from under us.
pChan->connectStateNotify ( cbGuard );
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access and also need to call
* their call back here
*/
if ( ! v41Ok ) {
pChan->accessRightsNotify ( cbGuard );
pChan->connect ( cbGuard, guard );
}
}
@@ -1027,7 +998,7 @@ void cac::connectAllIO ( epicsGuard < cacMutex > & guard, nciu & chan )
while ( pNetIO.valid () ) {
tsDLIter < baseNMIU > next = pNetIO;
next++;
class netSubscription *pSubscr = pNetIO->isSubscription ();
class netSubscription * pSubscr = pNetIO->isSubscription ();
// disconnected channels should have only subscription IO attached
assert ( pSubscr );
try {
@@ -1337,26 +1308,14 @@ bool cac::accessRightsRespAction (
epicsGuard < callbackMutex > & cbGuard, tcpiiu &, // X aCC 431
const epicsTime &, const caHdrLargeArray & hdr, void * /* pMsgBdy */ )
{
nciu * pChan;
{
epicsGuard < cacMutex > guard ( this->mutex );
pChan = this->chanTable.lookup ( hdr.m_cid );
if ( pChan ) {
unsigned ar = hdr.m_available;
caAccessRights accessRights (
( ar & CA_PROTO_ACCESS_RIGHT_READ ) ? true : false,
( ar & CA_PROTO_ACCESS_RIGHT_WRITE ) ? true : false);
pChan->accessRightsStateChange ( accessRights );
}
}
//
// the channel delete routine takes the call back lock so
// that this will not be called when the channel is being
// deleted.
//
epicsGuard < cacMutex > guard ( this->mutex );
nciu * pChan = this->chanTable.lookup ( hdr.m_cid );
if ( pChan ) {
pChan->accessRightsNotify ( cbGuard );
unsigned ar = hdr.m_available;
caAccessRights accessRights (
( ar & CA_PROTO_ACCESS_RIGHT_READ ) ? true : false,
( ar & CA_PROTO_ACCESS_RIGHT_WRITE ) ? true : false);
pChan->accessRightsStateChange ( accessRights, cbGuard, guard );
}
return true;
@@ -1366,33 +1325,28 @@ bool cac::claimCIURespAction (
epicsGuard < callbackMutex > &cbGuard, tcpiiu & iiu, // X aCC 431
const epicsTime &, const caHdrLargeArray & hdr, void * /* pMsgBdy */ )
{
nciu * pChan;
{
epicsGuard < cacMutex > guard ( this->mutex );
pChan = this->chanTable.lookup ( hdr.m_cid );
if ( pChan ) {
unsigned sidTmp;
if ( iiu.ca_v44_ok() ) {
sidTmp = hdr.m_available;
}
else {
sidTmp = pChan->getSID ();
}
pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp, iiu.ca_v41_ok() );
this->connectAllIO ( guard, *pChan );
}
else if ( iiu.ca_v44_ok() ) {
// this indicates a claim response for a resource that does
// not exist in the client - so just remove it from the server
iiu.clearChannelRequest ( guard, hdr.m_available, hdr.m_cid );
}
}
// the callback lock is taken when a channel is unistalled or when
// is disconnected to prevent race conditions here
epicsGuard < cacMutex > guard ( this->mutex );
nciu * pChan = this->chanTable.lookup ( hdr.m_cid );
if ( pChan ) {
pChan->connectStateNotify ( cbGuard );
unsigned sidTmp;
if ( iiu.ca_v44_ok() ) {
sidTmp = hdr.m_available;
}
else {
sidTmp = pChan->getSID ();
}
// the callback lock is taken when a channel is unistalled or when
// is disconnected to prevent race conditions here
pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp,
cbGuard, guard );
}
else if ( iiu.ca_v44_ok() ) {
// this indicates a claim response for a resource that does
// not exist in the client - so just remove it from the server
iiu.clearChannelRequest ( guard, hdr.m_available, hdr.m_cid );
}
return true;
}
@@ -1417,11 +1371,8 @@ void cac::disconnectChannel (
assert ( this->pudpiiu );
this->disconnectAllIO ( guard, chan, true );
chan.getPIIU()->uninstallChan ( guard, chan );
chan.disconnect ( *this->pudpiiu );
chan.disconnect ( *this->pudpiiu, cbGuard, guard );
this->pudpiiu->installDisconnectedChannel ( currentTime, chan );
epicsGuardRelease < cacMutex > autoMutexRelease ( guard );
chan.connectStateNotify ( cbGuard );
chan.accessRightsNotify ( cbGuard );
}
bool cac::badTCPRespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu,
@@ -1508,6 +1459,7 @@ void cac::vSignal ( int ca_status, const char *pfilenm,
void cac::selfTest () const
{
epicsGuard < cacMutex > guard ( this->mutex );
this->chanTable.verify ();
this->ioTable.verify ();
this->sgTable.verify ();
@@ -1522,21 +1474,34 @@ void cac::disconnectNotify ( tcpiiu & iiu )
void cac::initiateAbortShutdown ( tcpiiu & iiu )
{
int exception = ECA_DISCONN;
char hostNameTmp[64];
bool exceptionNeeded = false;
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
epicsGuard < cacMutex > guard ( this->mutex );
iiu.initiateAbortShutdown ( cbGuard, guard );
{
epicsGuard < cacMutex > guard ( this->mutex );
// Disconnect all channels immediately from the timer thread
// because on certain OS such as HPUX it's difficult to
// unblock a blocking send() call, and we need immediate
// disconnect notification.
if ( iiu.channelCount() ) {
char hostNameTmp[64];
iiu.hostName ( hostNameTmp, sizeof ( hostNameTmp ) );
genLocalExcep ( cbGuard, *this, ECA_DISCONN, hostNameTmp );
if ( iiu.channelCount() ) {
iiu.hostName ( hostNameTmp, sizeof ( hostNameTmp ) );
if ( iiu.connecting () ) {
exception = ECA_CONNSEQTMO;
}
exceptionNeeded = true;
}
iiu.initiateAbortShutdown ( cbGuard, guard );
// Disconnect all channels immediately from the timer thread
// because on certain OS such as HPUX it's difficult to
// unblock a blocking send() call, and we need immediate
// disconnect notification.
iiu.removeAllChannels ( cbGuard, guard, *this );
}
if ( exceptionNeeded ) {
genLocalExcep ( cbGuard, *this, exception, hostNameTmp );
}
iiu.removeAllChannels ( cbGuard, guard, *this );
}
void cac::destroyIIU ( tcpiiu & iiu )
+13 -14
View File
@@ -72,18 +72,6 @@ struct caHdrLargeArray;
extern epicsThreadPrivateId caClientCallbackThreadId;
class callbackMutex {
public:
callbackMutex ( cacNotify & );
~callbackMutex ();
void lock ();
void unlock ();
private:
cacNotify & notify;
callbackMutex ( callbackMutex & );
callbackMutex & operator = ( callbackMutex & );
};
class cacMutex {
public:
void lock ();
@@ -110,6 +98,18 @@ public:
epicsGuard < cacMutex > &, nciu & chan ) = 0;
};
class callbackMutex {
public:
callbackMutex ( cacNotify & );
~callbackMutex ();
void lock ();
void unlock ();
private:
cacNotify & notify;
callbackMutex ( callbackMutex & );
callbackMutex & operator = ( callbackMutex & );
};
class cac : private cacRecycle, private cacDisconnectChannelPrivate,
private callbackForMultiplyDefinedPV
{
@@ -134,7 +134,7 @@ public:
unsigned cid, unsigned sid,
ca_uint16_t typeCode, arrayElementCount count,
unsigned minorVersionNumber, const osiSockAddr & );
void connectAllIO ( epicsGuard < cacMutex > &, nciu & chan );
void destroyChannel ( nciu & );
cacChannel & createChannel ( const char *name_str,
cacChannelNotify &chan, cacChannel::priLev pri );
@@ -255,7 +255,6 @@ private:
unsigned beaconAnomalyCount;
void run ();
void connectAllIO ( epicsGuard < cacMutex > &, nciu &chan );
void disconnectAllIO ( epicsGuard < cacMutex > & locker, nciu & chan, bool enableCallbacks );
void flushIfRequired ( epicsGuard < cacMutex > &, netiiu & );
void recycleReadNotifyIO ( netReadNotifyIO &io );
+2
View File
@@ -55,6 +55,7 @@
#include "tsDLList.h"
#include "epicsMutex.h"
#include "epicsSingleton.h"
#ifdef cacIOh_restore_epicsExportSharedSymbols
# define epicsExportSharedSymbols
@@ -250,6 +251,7 @@ private:
};
template < class T > class epicsSingleton;
epicsShareExtern epicsSingleton < cacServiceList > globalServiceListCAC;
epicsShareFunc int epicsShareAPI ca_register_service ( cacService *pService );
+1
View File
@@ -142,6 +142,7 @@
#define ECA_BADPRIORITY DEFMSG(CA_K_ERROR, 56)
#define ECA_NOTTHREADED DEFMSG(CA_K_ERROR, 57)
#define ECA_16KARRAYCLIENT DEFMSG(CA_K_WARNING, 58)
#define ECA_CONNSEQTMO DEFMSG(CA_K_WARNING, 59)
#ifdef __STDC__
#define CAERR_USE_FUNC_PROTO
+56 -2
View File
@@ -109,7 +109,9 @@ void nciu::initiateConnect ()
}
void nciu::connect ( unsigned nativeType,
unsigned nativeCount, unsigned sidIn, bool v41Ok )
unsigned nativeCount, unsigned sidIn,
epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard )
{
if ( ! this->f_claimSent ) {
this->cacCtx.printf (
@@ -141,14 +143,46 @@ void nciu::connect ( unsigned nativeType,
* if less than v4.1 then the server will never
* send access rights and there will always be access
*/
bool v41Ok = this->piiu->ca_v41_ok ();
if ( ! v41Ok ) {
this->accessRightState.setReadPermit();
this->accessRightState.setWritePermit();
}
// this installs any subscriptions that
// might still be attached
this->cacCtx.connectAllIO ( guard, *this );
{
epicsGuardRelease < cacMutex > unguard ( guard );
// channel uninstal routine grabs the callback lock so
// a channel will not be deleted while a call back is
// in progress
//
// the callback lock is also taken when a channel
// disconnects to prevent a race condition with the
// code below - ie we hold the callback lock here
// so a chanel cant be destroyed out from under us.
this->notify().connectNotify ();
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access and also need to call
* their call back here
*/
if ( ! v41Ok ) {
this->notify().accessRightsNotify ( this->accessRightState );
}
}
}
void nciu::disconnect ( netiiu & newiiu )
void nciu::disconnect (
netiiu & newiiu, epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard )
{
bool currentlyConnected = this->f_connected;
this->piiu = & newiiu;
this->retry = disconnectRetrySetpoint;
this->typeCode = USHRT_MAX;
@@ -158,6 +192,26 @@ void nciu::disconnect ( netiiu & newiiu )
this->accessRightState.clrWritePermit();
this->f_claimSent = false;
this->f_connected = false;
if ( currentlyConnected ) {
epicsGuardRelease < cacMutex > autoMutexRelease ( guard );
this->notify().disconnectNotify ();
this->notify().accessRightsNotify ( this->accessRightState );
}
}
void nciu::accessRightsStateChange (
const caAccessRights & arIn, epicsGuard < callbackMutex > &,
epicsGuard < cacMutex > & guard )
{
this->accessRightState = arIn;
//
// the channel delete routine takes the call back lock so
// that this will not be called when the channel is being
// deleted.
//
epicsGuardRelease < cacMutex > unguard ( guard );
this->notify().accessRightsNotify ( this->accessRightState );
}
/*
+18 -37
View File
@@ -70,17 +70,19 @@ public:
~nciu ();
void destroy ();
void connect ( unsigned nativeType,
unsigned nativeCount, unsigned sid, bool v41Ok );
void connect ();
void connectStateNotify ( epicsGuard < callbackMutex > & ) const;
void accessRightsNotify ( epicsGuard < callbackMutex > & ) const;
void disconnect ( netiiu & newiiu );
unsigned nativeCount, unsigned sid,
epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard );
void connect ( epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard );
void disconnect ( netiiu & newiiu, epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard );
bool searchMsg ( class udpiiu & iiu, unsigned & retryNoForThisChannel );
void createChannelRequest ( class tcpiiu & iiu, epicsGuard < cacMutex > & );
bool identifierEquivelence ( unsigned idToMatch );
void beaconAnomalyNotify ();
void serviceShutdownNotify ();
void accessRightsStateChange ( const caAccessRights & );
void accessRightsStateChange ( const caAccessRights &,
epicsGuard < callbackMutex > &, epicsGuard < cacMutex > & );
ca_uint32_t getSID () const;
ca_uint32_t getCID () const;
netiiu * getPIIU ();
@@ -88,7 +90,8 @@ public:
cac & getClient ();
int printf ( const char *pFormat, ... );
void searchReplySetUp ( netiiu &iiu, unsigned sidIn,
ca_uint16_t typeIn, arrayElementCount countIn );
ca_uint16_t typeIn, arrayElementCount countIn,
epicsGuard < cacMutex > & );
void show ( unsigned level ) const;
const char *pName () const;
unsigned nameLen () const;
@@ -152,16 +155,6 @@ inline void nciu::operator delete ( void * pCadaver,
}
#endif
inline bool nciu::identifierEquivelence ( unsigned idToMatch )
{
return idToMatch == this->id;
}
inline void nciu::accessRightsStateChange ( const caAccessRights & arIn )
{
this->accessRightState = arIn;
}
inline ca_uint32_t nciu::getSID () const
{
return this->sid;
@@ -173,15 +166,18 @@ inline ca_uint32_t nciu::getCID () const
}
// this is to only be used by early protocol revisions
inline void nciu::connect ()
inline void nciu::connect ( epicsGuard < callbackMutex > & cbGuard,
epicsGuard < cacMutex > & guard )
{
this->connect ( this->typeCode, this->count, this->sid, false );
this->connect ( this->typeCode, this->count,
this->sid, cbGuard, guard );
}
inline void nciu::searchReplySetUp ( netiiu &iiu, unsigned sidIn,
ca_uint16_t typeIn, arrayElementCount countIn )
ca_uint16_t typeIn, arrayElementCount countIn,
epicsGuard < cacMutex > & )
{
this->piiu = &iiu;
this->piiu = & iiu;
this->typeCode = typeIn;
this->count = countIn;
this->sid = sidIn;
@@ -203,21 +199,6 @@ inline void nciu::writeException ( epicsGuard < callbackMutex > &, int status,
this->notify().writeException ( status, pContext, typeIn, countIn );
}
inline void nciu::accessRightsNotify ( epicsGuard < callbackMutex > & ) const
{
this->notify().accessRightsNotify ( this->accessRightState );
}
inline void nciu::connectStateNotify ( epicsGuard < callbackMutex > & ) const
{
if ( this->f_connected ) {
this->notify().connectNotify ();
}
else {
this->notify().disconnectNotify ();
}
}
inline const netiiu * nciu::getConstPIIU () const
{
return this->piiu;
+5
View File
@@ -37,6 +37,11 @@ bool netiiu::ca_v42_ok () const
return false;
}
bool netiiu::ca_v41_ok () const
{
return false;
}
void netiiu::writeRequest ( epicsGuard < cacMutex > &, nciu &,
unsigned, unsigned, const void * )
{
+1
View File
@@ -44,6 +44,7 @@ public:
virtual ~netiiu ();
virtual void hostName ( char *pBuf, unsigned bufLength ) const = 0;
virtual const char * pHostName () const = 0; // deprecated - please do not use
virtual bool ca_v41_ok () const = 0;
virtual bool ca_v42_ok () const = 0;
virtual void writeRequest ( epicsGuard < cacMutex > &, nciu &,
unsigned type, unsigned nElem, const void *pValue ) = 0;
+2 -2
View File
@@ -638,7 +638,7 @@ void tcpiiu::disconnectNotify ( epicsGuard < cacMutex > & )
}
void tcpiiu::initiateAbortShutdown ( epicsGuard < callbackMutex > &,
epicsGuard < cacMutex > & )
epicsGuard < cacMutex > & guard )
{
if ( ! this->discardingPendingData ) {
// force abortive shutdown sequence
@@ -1306,7 +1306,7 @@ void tcpiiu::installChannel ( epicsGuard < cacMutex > & guard,
ca_uint16_t typeIn, arrayElementCount countIn )
{
this->channelList.add ( chan );
chan.searchReplySetUp ( *this, sidIn, typeIn, countIn );
chan.searchReplySetUp ( *this, sidIn, typeIn, countIn, guard );
chan.createChannelRequest ( *this, guard );
this->flushRequest ();
}
+5 -1
View File
@@ -476,7 +476,6 @@ void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort )
struct sockaddr_in ia;
struct sockaddr sa;
} bd;
int flag;
if ( repeaterPort > 0xffff ) {
fprintf ( stderr, "caStartRepeaterIfNotInstalled () : strange repeater port specified\n" );
@@ -1094,6 +1093,11 @@ bool udpiiu::ca_v42_ok () const
return netiiu::ca_v42_ok ();
}
bool udpiiu::ca_v41_ok () const
{
return netiiu::ca_v41_ok ();
}
void udpiiu::writeRequest ( epicsGuard < cacMutex > & guard, nciu & chan, unsigned type,
unsigned nElem, const void * pValue )
{
+1
View File
@@ -163,6 +163,7 @@ private:
void hostName ( char *pBuf, unsigned bufLength ) const;
const char * pHostName () const; // deprecated - please do not use
bool ca_v42_ok () const;
bool ca_v41_ok () const;
void writeRequest ( epicsGuard < cacMutex > &, nciu &, unsigned type,
unsigned nElem, const void *pValue );
void writeNotifyRequest ( epicsGuard < cacMutex > &, nciu &, netWriteNotifyIO &,
+6
View File
@@ -124,6 +124,7 @@ public:
void hostName ( char *pBuf, unsigned bufLength ) const;
bool alive () const;
bool connecting () const;
osiSockAddr getNetworkAddress () const;
int printf ( const char *pformat, ... );
unsigned channelCount ();
@@ -251,6 +252,11 @@ inline bool tcpiiu::alive () const // X aCC 361
this->state == iiucs_connected );
}
inline bool tcpiiu::connecting () const
{
return ( this->state == iiucs_connecting );
}
inline void tcpiiu::beaconAnomalyNotify ()
{
this->recvDog.beaconAnomalyNotify ();