Changes to support communication with protocol revisions prior to 4.2 (EPICS 3.11)

This commit is contained in:
Jeff Hill
2004-10-04 16:39:00 +00:00
parent 46cfe589d5
commit 753ecff270
6 changed files with 216 additions and 182 deletions

View File

@@ -252,7 +252,7 @@ cacChannel::ioStatus nciu::read (
{
guard.assertIdenticalMutex ( this->cacCtx.mutexRef () );
if ( ! this->isConnected ( guard ) ) {
if ( ! this->connected ( guard ) ) {
throw cacChannel::notConnected ();
}
if ( ! this->accessRightState.readPermit () ) {
@@ -398,17 +398,11 @@ bool nciu::ca_v42_ok (
short nciu::nativeType (
epicsGuard < epicsMutex > & guard ) const
{
short type;
if ( this->channelNode::isConnected ( guard ) ) {
short type = TYPENOTCONN;
if ( this->connected ( guard ) ) {
if ( this->typeCode < SHRT_MAX ) {
type = static_cast <short> ( this->typeCode );
}
else {
type = TYPENOTCONN;
}
}
else {
type = TYPENOTCONN;
}
return type;
}
@@ -416,13 +410,10 @@ short nciu::nativeType (
arrayElementCount nciu::nativeElementCount (
epicsGuard < epicsMutex > & guard ) const
{
arrayElementCount countOut;
if ( this->channelNode::isConnected ( guard ) ) {
arrayElementCount countOut = 0ul;
if ( this->connected ( guard ) ) {
countOut = this->count;
}
else {
countOut = 0ul;
}
return countOut;
}
@@ -455,7 +446,12 @@ double nciu::receiveWatchdogDelay (
bool nciu::connected ( epicsGuard < epicsMutex > & guard ) const
{
guard.assertIdenticalMutex ( this->cacCtx.mutexRef () );
return this->channelNode::isConnected ( guard );
if ( this->piiu->ca_v42_ok ( guard ) ) {
return this->channelNode::isConnectedAtOrAfterV42 ( guard );
}
else {
return this->channelNode::isConnectedBeforeV42 ( guard );
}
}
void nciu::show ( unsigned level ) const
@@ -467,7 +463,7 @@ void nciu::show ( unsigned level ) const
void nciu::show (
epicsGuard < epicsMutex > & guard, unsigned level ) const
{
if ( this->channelNode::isConnected ( guard ) ) {
if ( this->connected ( guard ) ) {
char hostNameTmp [256];
this->hostName ( guard, hostNameTmp, sizeof ( hostNameTmp ) );
::printf ( "Channel \"%s\", connected to server %s",

View File

@@ -57,7 +57,8 @@ class channelNode : public tsDLNode < class nciu >
{
protected:
channelNode ();
bool isConnected ( epicsGuard < epicsMutex > & ) const;
bool isConnectedAtOrAfterV42 ( epicsGuard < epicsMutex > & ) const;
bool isConnectedBeforeV42 ( epicsGuard < epicsMutex > & ) const;
bool isInstalledInServer ( epicsGuard < epicsMutex > & ) const;
static unsigned getMaxSearchTimerCount ();
private:
@@ -351,7 +352,7 @@ inline channelNode::channelNode () :
{
}
inline bool channelNode::isConnected ( epicsGuard < epicsMutex > & ) const
inline bool channelNode::isConnectedAtOrAfterV42 ( epicsGuard < epicsMutex > & ) const
{
return
this->listMember == cs_connected ||
@@ -359,6 +360,15 @@ inline bool channelNode::isConnected ( epicsGuard < epicsMutex > & ) const
this->listMember == cs_subscripUpdateReqPend;
}
inline bool channelNode::isConnectedBeforeV42 ( epicsGuard < epicsMutex > & ) const
{
return
this->listMember == cs_connected ||
this->listMember == cs_subscripReqPend ||
this->listMember == cs_subscripUpdateReqPend ||
this->listMember == cs_createReqPend;
}
inline bool channelNode::isInstalledInServer ( epicsGuard < epicsMutex > & ) const
{
return

View File

@@ -115,21 +115,18 @@ void tcpRecvWatchdog::beaconAnomalyNotify (
}
void tcpRecvWatchdog::messageArrivalNotify (
epicsGuard < epicsMutex > & guard,
const epicsTime & currentTime )
{
bool restartNeeded = false;
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( ! ( this->shuttingDown || this->probeResponsePending ) ) {
this->beaconAnomaly = false;
restartNeeded = true;
}
}
// dont hold the lock for fear of deadlocking
// because cancel is blocking for the completion
// of expire() which takes the lock - it take also
// the callback lock
if ( restartNeeded ) {
guard.assertIdenticalMutex ( this->mutex );
if ( ! ( this->shuttingDown || this->probeResponsePending ) ) {
this->beaconAnomaly = false;
// dont hold the lock for fear of deadlocking
// because cancel is blocking for the completion
// of expire() which takes the lock - it take also
// the callback lock
epicsGuardRelease < epicsMutex > unguard ( guard );
this->timer.start ( *this, currentTime + this->period );
debugPrintf ( ("received a message - reseting circuit recv watchdog\n") );
}
@@ -203,15 +200,17 @@ void tcpRecvWatchdog::sendBacklogProgressNotify (
}
}
void tcpRecvWatchdog::connectNotify ()
void tcpRecvWatchdog::connectNotify (
epicsGuard < epicsMutex > & guard )
{
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->shuttingDown ) {
return;
}
guard.assertIdenticalMutex ( this->mutex );
if ( this->shuttingDown ) {
return;
}
{
epicsGuardRelease < epicsMutex > guardRelease ( guard );
this->timer.start ( *this, this->period );
}
this->timer.start ( *this, this->period );
debugPrintf ( ("connected to the server - initiating circuit recv watchdog\n") );
}

View File

@@ -42,6 +42,7 @@ public:
epicsGuard < epicsMutex > &,
const epicsTime & currentTime );
void messageArrivalNotify (
epicsGuard < epicsMutex > & guard,
const epicsTime & currentTime );
void probeResponseNotify (
epicsGuard < epicsMutex > &,
@@ -50,10 +51,11 @@ public:
epicsGuard < epicsMutex > &,
const epicsTime & currentTime );
void beaconAnomalyNotify ( epicsGuard < epicsMutex > & );
void connectNotify ();
void connectNotify (
epicsGuard < epicsMutex > & );
void sendTimeoutNotify (
epicsGuard < epicsMutex > & cbMutex,
epicsGuard < epicsMutex > & mutex,
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard,
const epicsTime & currentTime );
void cancel ();
void shutdown ();

View File

@@ -117,9 +117,17 @@ void tcpSendThread::run ()
while ( nciu * pChan = this->iiu.createReqPend.get () ) {
this->iiu.createChannelRequest ( *pChan, guard );
this->iiu.createRespPend.add ( *pChan );
pChan->channelNode::listMember =
channelNode::cs_createRespPend;
if ( CA_V42 ( this->iiu.minorProtocolVersion ) ) {
this->iiu.createRespPend.add ( *pChan );
pChan->channelNode::listMember =
channelNode::cs_createRespPend;
}
else {
this->iiu.subscripReqPend.add ( *pChan );
pChan->channelNode::listMember =
channelNode::cs_subscripReqPend;
}
if ( this->iiu.sendQue.flushBlockThreshold ( 0u ) ) {
laborPending = true;
break;
@@ -226,8 +234,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
this->sendDog.start ( currentTime );
while ( this->state == iiucs_connected ||
this->state == iiucs_clean_shutdown ) {
while ( true ) {
int status = ::send ( this->sock,
static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 );
if ( status > 0 ) {
@@ -236,13 +243,18 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
break;
}
else {
int localError = SOCKERRNO;
// winsock indicates disconnect by returning zero here
if ( status == 0 ) {
this->disconnectNotify ();
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->state != iiucs_connected &&
this->state != iiucs_clean_shutdown ) {
break;
}
// winsock indicates disconnect by returning zero here
if ( status == 0 ) {
this->disconnectNotify ( guard );
break;
}
int localError = SOCKERRNO;
if ( localError == SOCK_EINTR ) {
continue;
@@ -252,7 +264,10 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
errlogPrintf (
"CAC: system low on network buffers "
"- send retry in 15 seconds\n" );
epicsThreadSleep ( 15.0 );
{
epicsGuardRelease < epicsMutex > unguard ( guard );
epicsThreadSleep ( 15.0 );
}
continue;
}
@@ -269,7 +284,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
sockErrBuf );
}
this->disconnectNotify ();
this->disconnectNotify ( guard );
break;
}
}
@@ -284,9 +299,7 @@ void tcpiiu::recvBytes (
{
assert ( nBytesInBuf <= INT_MAX );
while ( this->state == iiucs_connected ||
this->state == iiucs_clean_shutdown ) {
while ( true ) {
int status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
static_cast <int> ( nBytesInBuf ), 0 );
@@ -297,10 +310,10 @@ void tcpiiu::recvBytes (
return;
}
else {
int localErrno = SOCKERRNO;
epicsGuard < epicsMutex > guard ( this->mutex );
if ( status == 0 ) {
this->disconnectNotify ();
this->disconnectNotify ( guard );
stat.bytesCopied = 0u;
stat.circuitState = swioPeerHangup;
return;
@@ -315,6 +328,8 @@ void tcpiiu::recvBytes (
return;
}
int localErrno = SOCKERRNO;
if ( localErrno == SOCK_SHUTDOWN ) {
stat.bytesCopied = 0u;
stat.circuitState = swioPeerHangup;
@@ -329,7 +344,10 @@ void tcpiiu::recvBytes (
errlogPrintf (
"CAC: system low on network buffers "
"- receive retry in 15 seconds\n" );
epicsThreadSleep ( 15.0 );
{
epicsGuardRelease < epicsMutex > unguard ( guard );
epicsThreadSleep ( 15.0 );
}
continue;
}
@@ -386,25 +404,59 @@ void tcpRecvThread::exitWait ()
this->thread.exitWait ();
}
bool tcpRecvThread::validFillStatus (
epicsGuard < epicsMutex > & guard, const statusWireIO & stat )
{
if ( this->iiu.state != tcpiiu::iiucs_connected &&
this->iiu.state != tcpiiu::iiucs_clean_shutdown ) {
return false;
}
if ( stat.circuitState == swioConnected ) {
return true;
}
if ( stat.circuitState == swioPeerHangup ||
stat.circuitState == swioPeerAbort ) {
this->iiu.disconnectNotify ( guard );
}
else if ( stat.circuitState == swioLinkFailure ) {
this->iiu.initiateAbortShutdown ( guard );
}
else if ( stat.circuitState == swioLocalAbort ) {
// state change already occurred
}
else {
errlogMessage ( "cac: invalid fill status - disconnecting" );
this->iiu.disconnectNotify ( guard );
}
return false;
}
void tcpRecvThread::run ()
{
try {
this->iiu.cacRef.attachToClientCtx ();
epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu );
this->connect ();
this->iiu.sendThread.start ();
if ( this->iiu.state != tcpiiu::iiucs_connected ) {
this->iiu.disconnectNotify ();
return;
{
bool connectSuccess = false;
{
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
this->connect ( guard );
connectSuccess = this->iiu.state == tcpiiu::iiucs_connected;
}
if ( ! connectSuccess ) {
this->iiu.recvDog.shutdown ();
this->thread.exitWaitRelease ();
this->iiu.cacRef.destroyIIU ( this->iiu );
return;
}
}
this->iiu.sendThread.start ();
epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu );
this->iiu.cacRef.attachToClientCtx ();
comBuf * pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
while ( this->iiu.state == tcpiiu::iiucs_connected ||
this->iiu.state == tcpiiu::iiucs_clean_shutdown ) {
bool breakOut = false;
while ( ! breakOut ) {
//
// if this thread has connected channels with subscriptions
// that need to be sent then wakeup the send thread
@@ -430,32 +482,21 @@ void tcpRecvThread::run ()
//
statusWireIO stat;
pComBuf->fillFromWire ( this->iiu, stat );
if ( stat.circuitState != swioConnected ) {
if ( stat.circuitState == swioPeerHangup ||
stat.circuitState == swioPeerAbort ) {
this->iiu.disconnectNotify ();
}
else if ( stat.circuitState == swioLinkFailure ) {
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
this->iiu.initiateAbortShutdown ( guard );
break;
}
else if ( stat.circuitState == swioLocalAbort ) {
break;
}
else {
assert ( 0 );
}
}
if ( stat.bytesCopied == 0u ) {
continue;
}
epicsTime currentTime = epicsTime::getCurrent ();
// reschedule connection activity watchdog
this->iiu.recvDog.messageArrivalNotify ( currentTime );
{
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
if ( ! this->validFillStatus ( guard, stat ) ) {
break;
}
if ( stat.bytesCopied == 0u ) {
continue;
}
// reschedule connection activity watchdog
this->iiu.recvDog.messageArrivalNotify (
guard, currentTime );
}
// only one recv thread at a time may call callbacks
// - pendEvent() blocks until threads waiting for
@@ -488,6 +529,7 @@ void tcpRecvThread::run ()
if ( ! protocolOK ) {
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
this->iiu.initiateAbortShutdown ( guard );
breakOut = true;
break;
}
@@ -497,20 +539,12 @@ void tcpRecvThread::run ()
}
pComBuf->fillFromWire ( this->iiu, stat );
if ( stat.circuitState != swioConnected ) {
if ( stat.circuitState == swioPeerHangup ) {
this->iiu.disconnectNotify ();
}
else if ( stat.circuitState == swioLinkFailure ) {
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
this->iiu.initiateAbortShutdown ( guard );
}
else if ( stat.circuitState == swioLocalAbort ) {
{
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
if ( ! this->validFillStatus ( guard, stat ) ) {
breakOut = true;
break;
}
else {
assert ( 0 );
}
}
}
}
@@ -542,48 +576,46 @@ void tcpRecvThread::run ()
/*
* tcpRecvThread::connect ()
*/
void tcpRecvThread::connect ()
void tcpRecvThread::connect (
epicsGuard < epicsMutex > & guard )
{
// attempt to connect to a CA server
while ( this->iiu.state == tcpiiu::iiucs_connecting ) {
osiSockAddr tmp = this->iiu.address ();
int status = ::connect ( this->iiu.sock,
& tmp.sa, sizeof ( tmp.sa ) );
if ( status == 0 ) {
bool enteredConnectedState = false;
{
epicsGuard < epicsMutex > autoMutex ( this->iiu.mutex );
while ( true ) {
int status;
{
epicsGuardRelease < epicsMutex > unguard ( guard );
osiSockAddr tmp = this->iiu.address ();
status = ::connect ( this->iiu.sock,
& tmp.sa, sizeof ( tmp.sa ) );
}
if ( this->iiu.state == tcpiiu::iiucs_connecting ) {
// put the iiu into the connected state
this->iiu.state = tcpiiu::iiucs_connected;
enteredConnectedState = true;
}
if ( this->iiu.state != tcpiiu::iiucs_connecting ) {
break;
}
if ( status >= 0 ) {
// put the iiu into the connected state
this->iiu.state = tcpiiu::iiucs_connected;
this->iiu.recvDog.connectNotify ( guard );
break;
}
else {
int errnoCpy = SOCKERRNO;
if ( errnoCpy == SOCK_EINTR ) {
continue;
}
if ( enteredConnectedState ) {
// start connection activity watchdog
this->iiu.recvDog.connectNotify ();
else if ( errnoCpy == SOCK_SHUTDOWN ) {
break;
}
else {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "CAC: Unable to connect because \"%s\"\n",
sockErrBuf );
this->iiu.disconnectNotify ( guard );
break;
}
break;
}
int errnoCpy = SOCKERRNO;
if ( errnoCpy == SOCK_EINTR ) {
continue;
}
else if ( errnoCpy == SOCK_SHUTDOWN ) {
break;
}
else {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
callbackManager mgr ( this->ctxNotify, this->cbMutex );
this->iiu.printf ( mgr.cbGuard, "Unable to connect because \"%s\"\n",
sockErrBuf );
this->iiu.disconnectNotify ();
break;
}
}
return;
@@ -768,28 +800,30 @@ void tcpiiu::initiateCleanShutdown (
{
guard.assertIdenticalMutex ( this->mutex );
if ( this->state == iiucs_connected ) {
if ( ! this->unresponsiveCircuit ) {
if ( this->unresponsiveCircuit ) {
this->initiateAbortShutdown ( guard );
}
else {
this->state = iiucs_clean_shutdown;
this->sendThreadFlushEvent.signal ();
this->flushBlockEvent.signal ();
}
else {
this->initiateAbortShutdown ( guard );
}
}
else if ( this->state == iiucs_clean_shutdown ) {
if ( this->unresponsiveCircuit ) {
this->initiateAbortShutdown ( guard );
}
}
else if ( this->state == iiucs_connecting ) {
this->initiateAbortShutdown ( guard );
}
}
void tcpiiu::disconnectNotify ()
void tcpiiu::disconnectNotify (
epicsGuard < epicsMutex > & guard )
{
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->state = iiucs_disconnected;
}
guard.assertIdenticalMutex ( this->mutex );
this->state = iiucs_disconnected;
this->sendThreadFlushEvent.signal ();
this->flushBlockEvent.signal ();
}
@@ -1314,11 +1348,6 @@ void tcpiiu::writeRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431
nciu &chan, unsigned type, arrayElementCount nElem, const void *pValue )
{
guard.assertIdenticalMutex ( this->mutex );
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
throw cacChannel::notConnected ();
}
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE,
type, nElem, chan.getSID(guard), chan.getCID(guard), pValue,
@@ -1336,11 +1365,6 @@ void tcpiiu::writeNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 43
if ( ! this->ca_v41_ok ( guard ) ) {
throw cacChannel::unsupportedByService();
}
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
throw cacChannel::notConnected ();
}
comQueSendMsgMinder minder ( this->sendQue, guard );
this->sendQue.insertRequestWithPayLoad ( CA_PROTO_WRITE_NOTIFY,
type, nElem, chan.getSID(guard), io.getId(), pValue,
@@ -1353,11 +1377,6 @@ void tcpiiu::readNotifyRequest ( epicsGuard < epicsMutex > & guard, // X aCC 431
unsigned dataType, arrayElementCount nElem )
{
guard.assertIdenticalMutex ( this->mutex );
// there are situations where the circuit is disconnected, but
// the channel does not know this yet
if ( this->state != iiucs_connected ) {
throw cacChannel::notConnected ();
}
if ( INVALID_DB_REQ ( dataType ) ) {
throw cacChannel::badType ();
}
@@ -1608,13 +1627,6 @@ void tcpiiu::eliminateExcessiveSendBacklog (
{
mutualExclusionGuard.assertIdenticalMutex ( this->mutex );
// this is used to slow down applications that use too much
// send cache, but this is inappropriate when the circuit hasnt
// connected yet.
if ( this->state == iiucs_connecting ) {
return;
}
if ( this->sendQue.flushBlockThreshold ( 0u ) ) {
this->flushRequest ( mutualExclusionGuard );
// the process thread is not permitted to flush as this
@@ -1631,9 +1643,13 @@ void tcpiiu::eliminateExcessiveSendBacklog (
this->blockingForFlush++;
while ( this->sendQue.flushBlockThreshold(0u) ) {
bool userRequestsCanBeAccepted =
this->state == iiucs_connected ||
( ! this->ca_v42_ok ( mutualExclusionGuard ) &&
this->state == iiucs_connecting );
// fail the users request if we have a disconnected
// or unresponsive circuit
if ( this->state != iiucs_connected ||
if ( ! userRequestsCanBeAccepted ||
this->unresponsiveCircuit ) {
throw cacChannel::notConnected ();
}
@@ -1749,10 +1765,13 @@ void tcpiiu::unlinkAllChannels (
guard.assertIdenticalMutex ( this->mutex );
while ( nciu * pChan = this->createReqPend.get () ) {
// with server prior to V42 IO could exit here
pChan->disconnectAllIO ( cbGuard, guard );
pChan->serviceShutdownNotify ( cbGuard, guard );
}
while ( nciu * pChan = this->createRespPend.get () ) {
pChan->disconnectAllIO ( cbGuard, guard );
this->clearChannelRequest ( guard,
pChan->getSID(guard), pChan->getCID(guard) );
pChan->serviceShutdownNotify ( cbGuard, guard );
@@ -1813,10 +1832,13 @@ void tcpiiu::connectNotify (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
guard.assertIdenticalMutex ( this->mutex );
this->createRespPend.remove ( chan );
this->subscripReqPend.add ( chan );
chan.channelNode::listMember = channelNode::cs_subscripReqPend;
// this improves robustness in the face of a server sending
// protocol that does not match its declared protocol revision
if ( chan.channelNode::listMember == channelNode::cs_createRespPend ) {
this->createRespPend.remove ( chan );
this->subscripReqPend.add ( chan );
chan.channelNode::listMember = channelNode::cs_subscripReqPend;
}
// the TCP send thread is awakened by its receive thread whenever the receive thread
// is about to block if this->subscripReqPend has items in it
}

View File

@@ -72,7 +72,11 @@ private:
epicsMutex & cbMutex;
cacContextNotify & ctxNotify;
void run ();
void connect ();
void connect (
epicsGuard < epicsMutex > & guard );
bool validFillStatus (
epicsGuard < epicsMutex > & guard,
const statusWireIO & stat );
};
class tcpSendThread : private epicsThreadRunable {
@@ -250,7 +254,8 @@ private:
epicsGuard < epicsMutex > & );
void initiateAbortShutdown (
epicsGuard < epicsMutex > & );
void disconnectNotify ();
void disconnectNotify (
epicsGuard < epicsMutex > & );
// send protocol stubs
void echoRequest (