diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 835a1689d..fbcd57b21 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -65,7 +65,8 @@ const cac::pProtoStubTCP cac::tcpJumpTableCAC [] = &cac::badTCPRespAction, &cac::badTCPRespAction, &cac::badTCPRespAction, - &cac::badTCPRespAction, + // legacy CA_PROTO_READ_SYNC used as an echo with legacy server + &cac::echoRespAction, &cac::exceptionRespAction, &cac::clearChannelRespAction, &cac::badTCPRespAction, @@ -467,7 +468,7 @@ cacChannel & cac::createChannel ( } void cac::transferChanToVirtCircuit ( - epicsGuard < epicsMutex > & cbGuard, unsigned cid, unsigned sid, // X aCC 431 + unsigned cid, unsigned sid, // X aCC 431 ca_uint16_t typeCode, arrayElementCount count, unsigned minorVersionNumber, const osiSockAddr & addr, const epicsTime & currentTime ) @@ -549,12 +550,7 @@ void cac::transferChanToVirtCircuit ( pChan->getPIIU(guard)->uninstallChanDueToSuccessfulSearchResponse ( guard, *pChan, currentTime ); piiu->installChannel ( - cbGuard, guard, *pChan, sid, typeCode, count ); - - if ( ! piiu->ca_v42_ok ( guard ) ) { - // connect to old server with lock applied - pChan->connect ( cbGuard, guard ); - } + guard, *pChan, sid, typeCode, count ); if ( newIIU ) { piiu->start ( guard ); diff --git a/src/ca/cac.h b/src/ca/cac.h index 9c4c471a7..c4cf59a85 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -127,7 +127,6 @@ public: // channel routines void transferChanToVirtCircuit ( - epicsGuard < epicsMutex > &, unsigned cid, unsigned sid, ca_uint16_t typeCode, arrayElementCount count, unsigned minorVersionNumber, const osiSockAddr &, diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index ba08015b4..9d45584c2 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -476,12 +476,7 @@ double nciu::receiveWatchdogDelay ( bool nciu::connected ( epicsGuard < epicsMutex > & guard ) const { guard.assertIdenticalMutex ( this->cacCtx.mutexRef () ); - if ( this->piiu->ca_v42_ok ( guard ) ) { - return this->channelNode::isConnectedAtOrAfterV42 ( guard ); - } - else { - return this->channelNode::isConnectedBeforeV42 ( guard ); - } + return this->channelNode::isConnected ( guard ); } void nciu::show ( unsigned level ) const diff --git a/src/ca/nciu.h b/src/ca/nciu.h index 0890d6b89..56be56964 100644 --- a/src/ca/nciu.h +++ b/src/ca/nciu.h @@ -56,9 +56,8 @@ class channelNode : public tsDLNode < class nciu > { protected: channelNode (); - bool isConnectedAtOrAfterV42 ( epicsGuard < epicsMutex > & ) const; - bool isConnectedBeforeV42 ( epicsGuard < epicsMutex > & ) const; bool isInstalledInServer ( epicsGuard < epicsMutex > & ) const; + bool isConnected ( epicsGuard < epicsMutex > & ) const; static unsigned getMaxSearchTimerCount (); private: enum channelState { @@ -106,6 +105,7 @@ private: cs_searchRespPending17, cs_createReqPend, cs_createRespPend, + cs_v42ConnCallbackPend, cs_subscripReqPend, cs_connected, cs_unrespCircuit, @@ -354,7 +354,7 @@ inline channelNode::channelNode () : { } -inline bool channelNode::isConnectedAtOrAfterV42 ( epicsGuard < epicsMutex > & ) const +inline bool channelNode::isConnected ( epicsGuard < epicsMutex > & ) const { return this->listMember == cs_connected || @@ -362,15 +362,6 @@ inline bool channelNode::isConnectedAtOrAfterV42 ( epicsGuard < epicsMutex > & ) this->listMember == cs_subscripUpdateReqPend; } -inline bool channelNode::isConnectedBeforeV42 ( epicsGuard < epicsMutex > & ) const -{ - return - this->listMember == cs_connected || - this->listMember == cs_subscripReqPend || - this->listMember == cs_subscripUpdateReqPend || - this->listMember == cs_createReqPend; -} - inline bool channelNode::isInstalledInServer ( epicsGuard < epicsMutex > & ) const { return diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 96231edd2..09c9969fa 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -109,27 +109,30 @@ void tcpSendThread::run () } if ( echoLaborNeeded ) { - if ( CA_V43 ( this->iiu.minorProtocolVersion ) ) { - this->iiu.echoRequest ( guard ); - } - else { - this->iiu.versionMessage ( guard, this->iiu.priority() ); - } + this->iiu.echoRequest ( guard ); } while ( nciu * pChan = this->iiu.createReqPend.get () ) { this->iiu.createChannelRequest ( *pChan, guard ); + if ( CA_V42 ( this->iiu.minorProtocolVersion ) ) { this->iiu.createRespPend.add ( *pChan ); pChan->channelNode::listMember = channelNode::cs_createRespPend; } else { - this->iiu.subscripReqPend.add ( *pChan ); + // This wakes up the resp thread so that it can call + // the connect callback. This isnt maximally efficent + // but it has the excellent side effect of not requiring + // that the UDP thread take the callback lock. There are + // almost no V42 servers left at this point. + this->iiu.v42ConnCallbackPend.add ( *pChan ); pChan->channelNode::listMember = - channelNode::cs_subscripReqPend; + channelNode::cs_v42ConnCallbackPend; + this->iiu.echoRequestPending = true; + laborPending = true; } - + if ( this->iiu.sendQue.flushBlockThreshold ( 0u ) ) { laborPending = true; break; @@ -494,6 +497,13 @@ void tcpRecvThread::run () callbackManager mgr ( this->ctxNotify, this->cbMutex ); epicsGuard < epicsMutex > guard ( this->iiu.mutex ); + + // route legacy V42 channel connect through the recv thread - + // the only thread that should be taking the callback lock + while ( nciu * pChan = this->iiu.v42ConnCallbackPend.first () ) { + this->iiu.connectNotify ( guard, *pChan ); + pChan->connect ( mgr.cbGuard, guard ); + } // force the receive watchdog to be reset every 5 frames unsigned contiguousFrameCount = 0; @@ -1062,6 +1072,14 @@ void tcpiiu::show ( unsigned level ) const pChan++; } } + if ( this->v42ConnCallbackPend.count () ) { + ::printf ( "V42 Conn Callback pending channels\n" ); + tsDLIterConst < nciu > pChan = this->v42ConnCallbackPend.firstIter (); + while ( pChan.valid () ) { + pChan->show ( level - 2u ); + pChan++; + } + } if ( this->subscripReqPend.count () ) { ::printf ( "Subscription request pending channels\n" ); tsDLIterConst < nciu > pChan = this->subscripReqPend.firstIter (); @@ -1348,13 +1366,19 @@ void tcpiiu::versionMessage ( epicsGuard < epicsMutex > & guard, // X aCC 431 void tcpiiu::echoRequest ( epicsGuard < epicsMutex > & guard ) // X aCC 431 { guard.assertIdenticalMutex ( this->mutex ); + + int command = CA_PROTO_ECHO; + if ( ! CA_V43 ( this->minorProtocolVersion ) ) { + // we fake an echo to early server using a read sync + command = CA_PROTO_READ_SYNC; + } if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { this->flushRequest ( guard ); } comQueSendMsgMinder minder ( this->sendQue, guard ); this->sendQue.insertRequestHeader ( - CA_PROTO_ECHO, 0u, + command, 0u, 0u, 0u, 0u, 0u, CA_V49 ( this->minorProtocolVersion ) ); minder.commit (); @@ -1756,6 +1780,12 @@ void tcpiiu::disconnectAllChannels ( pChan->getSID(guard), pChan->getCID(guard) ); discIIU.installDisconnectedChannel ( guard, *pChan ); } + + while ( nciu * pChan = this->v42ConnCallbackPend.get () ) { + this->clearChannelRequest ( guard, + pChan->getSID(guard), pChan->getCID(guard) ); + discIIU.installDisconnectedChannel ( guard, *pChan ); + } while ( nciu * pChan = this->subscripReqPend.get () ) { pChan->disconnectAllIO ( cbGuard, guard ); @@ -1798,6 +1828,13 @@ void tcpiiu::unlinkAllChannels ( pChan->getSID(guard), pChan->getCID(guard) ); pChan->serviceShutdownNotify ( cbGuard, guard ); } + + while ( nciu * pChan = this->v42ConnCallbackPend.get () ) { + pChan->disconnectAllIO ( cbGuard, guard ); + this->clearChannelRequest ( guard, + pChan->getSID(guard), pChan->getCID(guard) ); + pChan->serviceShutdownNotify ( cbGuard, guard ); + } while ( nciu * pChan = this->subscripReqPend.get () ) { pChan->disconnectAllIO ( cbGuard, guard ); @@ -1820,7 +1857,6 @@ void tcpiiu::unlinkAllChannels ( } void tcpiiu::installChannel ( - epicsGuard < epicsMutex > & /* cbGuard */, epicsGuard < epicsMutex > & guard, nciu & chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn ) @@ -1861,6 +1897,11 @@ void tcpiiu::connectNotify ( this->subscripReqPend.add ( chan ); chan.channelNode::listMember = channelNode::cs_subscripReqPend; } + else if ( chan.channelNode::listMember == channelNode::cs_v42ConnCallbackPend ) { + this->v42ConnCallbackPend.remove ( chan ); + this->subscripReqPend.add ( chan ); + chan.channelNode::listMember = channelNode::cs_subscripReqPend; + } // the TCP send thread is awakened by its receive thread whenever the receive thread // is about to block if this->subscripReqPend has items in it } @@ -1877,6 +1918,9 @@ void tcpiiu::uninstallChan ( case channelNode::cs_createRespPend: this->createRespPend.remove ( chan ); break; + case channelNode::cs_v42ConnCallbackPend: + this->v42ConnCallbackPend.remove ( chan ); + break; case channelNode::cs_subscripReqPend: this->subscripReqPend.remove ( chan ); break; diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index f8413f0a7..5a0d3ccdd 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -235,7 +235,7 @@ udpiiu::udpiiu ( */ ellInit ( & this->dest ); // X aCC 392 configureChannelAccessAddressList ( & this->dest, this->sock, this->serverPort ); - + caStartRepeaterIfNotInstalled ( this->repeaterPort ); this->pushVersionMsg (); @@ -338,7 +338,7 @@ void udpRecvThread::show ( unsigned /* level */ ) const void udpRecvThread::run () { epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu ); - + if ( ellCount ( & this->iiu.dest ) == 0 ) { // X aCC 392 callbackManager mgr ( this->ctxNotify, this->cbMutex ); epicsGuard < epicsMutex > guard ( this->iiu.cacMutex ); @@ -353,8 +353,6 @@ void udpRecvThread::run () this->iiu.recvBuf, sizeof ( this->iiu.recvBuf ), 0, & src.sa, & src_size ); - callbackManager mgr ( this->ctxNotify, this->cbMutex ); - if ( status <= 0 ) { if ( status < 0 ) { @@ -379,7 +377,7 @@ void udpRecvThread::run () } } else if ( status > 0 ) { - this->iiu.postMsg ( mgr.cbGuard, src, this->iiu.recvBuf, + this->iiu.postMsg ( src, this->iiu.recvBuf, (arrayElementCount) status, epicsTime::getCurrent() ); } @@ -586,19 +584,18 @@ void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repeaterPort ) } bool udpiiu::badUDPRespAction ( - epicsGuard < epicsMutex > & guard, const caHdr &msg, - const osiSockAddr &netAddr, const epicsTime ¤tTime ) + const caHdr &msg, const osiSockAddr &netAddr, const epicsTime ¤tTime ) { char buf[64]; sockAddrToDottedIP ( &netAddr.sa, buf, sizeof ( buf ) ); char date[64]; currentTime.strftime ( date, sizeof ( date ), "%a %b %d %Y %H:%M:%S"); - this->printf ( guard, "CAC: Undecipherable ( bad msg code %u ) UDP message from %s at %s\n", + errlogPrintf ( "CAC: Undecipherable ( bad msg code %u ) UDP message from %s at %s\n", msg.m_cmmd, buf, date ); return false; } -bool udpiiu::versionAction ( epicsGuard < epicsMutex > &, +bool udpiiu::versionAction ( const caHdr & hdr, const osiSockAddr &, const epicsTime & /* currentTime */ ) { epicsGuard < epicsMutex > guard ( this->cacMutex ); @@ -613,7 +610,7 @@ bool udpiiu::versionAction ( epicsGuard < epicsMutex > &, } bool udpiiu::searchRespAction ( // X aCC 361 - epicsGuard < epicsMutex > & cbGuard, const caHdr &msg, + const caHdr &msg, const osiSockAddr & addr, const epicsTime & currentTime ) { if ( addr.sa.sa_family != AF_INET ) { @@ -666,12 +663,12 @@ bool udpiiu::searchRespAction ( // X aCC 361 if ( CA_V42 ( minorVersion ) ) { this->cacRef.transferChanToVirtCircuit - ( cbGuard, msg.m_available, msg.m_cid, 0xffff, + ( msg.m_available, msg.m_cid, 0xffff, 0, minorVersion, serverAddr, currentTime ); } else { this->cacRef.transferChanToVirtCircuit - ( cbGuard, msg.m_available, msg.m_cid, msg.m_dataType, + ( msg.m_available, msg.m_cid, msg.m_dataType, msg.m_count, minorVersion, serverAddr, currentTime ); } @@ -679,7 +676,7 @@ bool udpiiu::searchRespAction ( // X aCC 361 } bool udpiiu::beaconAction ( - epicsGuard < epicsMutex > &, const caHdr & msg, + const caHdr & msg, const osiSockAddr & net_addr, const epicsTime & currentTime ) { struct sockaddr_in ina; @@ -724,7 +721,7 @@ bool udpiiu::beaconAction ( } bool udpiiu::repeaterAckAction ( - epicsGuard < epicsMutex > & /* cbGuard */, const caHdr &, + const caHdr &, const osiSockAddr &, const epicsTime &) { this->repeaterSubscribeTmr.confirmNotify (); @@ -732,14 +729,14 @@ bool udpiiu::repeaterAckAction ( } bool udpiiu::notHereRespAction ( - epicsGuard < epicsMutex > &, const caHdr &, + const caHdr &, const osiSockAddr &, const epicsTime & ) { return true; } bool udpiiu::exceptionRespAction ( - epicsGuard < epicsMutex > & cbGuard, const caHdr &msg, + const caHdr &msg, const osiSockAddr & net_addr, const epicsTime & currentTime ) { const caHdr &reqMsg = * ( &msg + 1 ); @@ -749,13 +746,13 @@ bool udpiiu::exceptionRespAction ( currentTime.strftime ( date, sizeof ( date ), "%a %b %d %Y %H:%M:%S"); if ( msg.m_postsize > sizeof ( caHdr ) ){ - this->cacRef.printf ( cbGuard, + errlogPrintf ( "error condition \"%s\" detected by %s with context \"%s\" at %s\n", ca_message ( msg.m_available ), name, reinterpret_cast ( &reqMsg + 1 ), date ); } else{ - this->cacRef.printf ( cbGuard, + errlogPrintf ( "error condition \"%s\" detected by %s at %s\n", ca_message ( msg.m_available ), name, date ); } @@ -763,7 +760,7 @@ bool udpiiu::exceptionRespAction ( return true; } -void udpiiu::postMsg ( epicsGuard < epicsMutex > & cbGuard, +void udpiiu::postMsg ( const osiSockAddr & net_addr, char * pInBuf, arrayElementCount blockSize, const epicsTime & currentTime ) @@ -779,7 +776,7 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & cbGuard, if ( blockSize < sizeof ( *pCurMsg ) ) { char buf[64]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); - this->printf ( cbGuard, + errlogPrintf ( "%s: Undecipherable (too small) UDP msg from %s ignored\n", __FILE__, buf ); return; @@ -816,7 +813,7 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & cbGuard, if ( size > blockSize ) { char buf[64]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); - this->printf ( cbGuard, + errlogPrintf ( "%s: Undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__, buf ); return; @@ -832,11 +829,11 @@ void udpiiu::postMsg ( epicsGuard < epicsMutex > & cbGuard, else { pStub = &udpiiu::badUDPRespAction; } - bool success = ( this->*pStub ) ( cbGuard, *pCurMsg, net_addr, currentTime ); + bool success = ( this->*pStub ) ( *pCurMsg, net_addr, currentTime ); if ( ! success ) { char buf[256]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); - this->printf ( cbGuard, "CAC: Undecipherable UDP message from %s\n", buf ); + errlogPrintf ( "CAC: Undecipherable UDP message from %s\n", buf ); return; } diff --git a/src/ca/udpiiu.h b/src/ca/udpiiu.h index aa5256efd..639cebcfe 100644 --- a/src/ca/udpiiu.h +++ b/src/ca/udpiiu.h @@ -137,7 +137,7 @@ private: bool wakeupMsg (); - void postMsg ( epicsGuard < epicsMutex > & cbGuard, + void postMsg ( const osiSockAddr & net_addr, char *pInBuf, arrayElementCount blockSize, const epicsTime ¤Time ); @@ -147,7 +147,7 @@ private: ca_uint16_t extsize); typedef bool ( udpiiu::*pProtoStubUDP ) ( - epicsGuard < epicsMutex > &, const caHdr &, + const caHdr &, const osiSockAddr &, const epicsTime & ); // UDP protocol dispatch table @@ -155,25 +155,25 @@ private: // UDP protocol stubs bool versionAction ( - epicsGuard < epicsMutex > &, const caHdr &, + const caHdr &, const osiSockAddr &, const epicsTime & ); bool badUDPRespAction ( - epicsGuard < epicsMutex > &, const caHdr &msg, + const caHdr &msg, const osiSockAddr &netAddr, const epicsTime & ); bool searchRespAction ( - epicsGuard < epicsMutex > &, const caHdr &msg, + const caHdr &msg, const osiSockAddr &net_addr, const epicsTime & ); bool exceptionRespAction ( - epicsGuard < epicsMutex > &, const caHdr &msg, + const caHdr &msg, const osiSockAddr &net_addr, const epicsTime & ); bool beaconAction ( - epicsGuard < epicsMutex > &, const caHdr &msg, + const caHdr &msg, const osiSockAddr &net_addr, const epicsTime & ); bool notHereRespAction ( - epicsGuard < epicsMutex > &, const caHdr &msg, + const caHdr &msg, const osiSockAddr &net_addr, const epicsTime & ); bool repeaterAckAction ( - epicsGuard < epicsMutex > &, const caHdr &msg, + const caHdr &msg, const osiSockAddr &net_addr, const epicsTime & ); // netiiu stubs diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index 414d33d96..7532ce181 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -167,7 +167,7 @@ public: void unlinkAllChannels ( epicsGuard < epicsMutex > & cbGuard, epicsGuard < epicsMutex > & guard ); - void installChannel ( epicsGuard < epicsMutex > &, + void installChannel ( epicsGuard < epicsMutex > &, nciu & chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn ); void uninstallChan ( @@ -193,6 +193,7 @@ private: // protected by the callback mutex tsDLList < nciu > createReqPend; tsDLList < nciu > createRespPend; + tsDLList < nciu > v42ConnCallbackPend; tsDLList < nciu > subscripReqPend; tsDLList < nciu > connectedList; tsDLList < nciu > unrespCircuit;