diff --git a/src/ca/bhe.cpp b/src/ca/bhe.cpp index 1ada50554..aecf2f5a8 100644 --- a/src/ca/bhe.cpp +++ b/src/ca/bhe.cpp @@ -26,6 +26,11 @@ bool bhe::updateBeaconPeriod (osiTime programBeginTime) osiTime current = osiTime::getCurrent (); if ( this->timeStamp == osiTime () ) { + + if ( this->piiu ) { + this->piiu->beaconAnomalyNotify (); + } + /* * this is the 1st beacon seen - the beacon time stamp * was not initialized during BHE create because @@ -34,17 +39,6 @@ bool bhe::updateBeaconPeriod (osiTime programBeginTime) */ this->timeStamp = current; - /* - * be careful about using beacons to reset the connection - * time out watchdog until we have received a ping response - * from the IOC (this makes the software detect reconnects - * faster when the server is rebooted twice in rapid - * succession before a 1st or 2nd beacon has been received) - */ - if (this->piiu) { - this->piiu->beaconAnomaly = TRUE; - } - return netChange; } @@ -55,6 +49,10 @@ bool bhe::updateBeaconPeriod (osiTime programBeginTime) if ( this->averagePeriod < 0.0 ) { ca_real totalRunningTime; + if ( this->piiu ) { + this->piiu->beaconAnomalyNotify (); + } + /* * this is the 2nd beacon seen. We cant tell about * the change in period at this point so we just @@ -62,17 +60,6 @@ bool bhe::updateBeaconPeriod (osiTime programBeginTime) */ this->averagePeriod = currentPeriod; - /* - * be careful about using beacons to reset the connection - * time out watchdog until we have received a ping response - * from the IOC (this makes the software detect reconnects - * faster when the server is rebooted twice in rapid - * succession before a 2nd beacon has been received) - */ - if (this->piiu) { - this->piiu->beaconAnomaly = TRUE; - } - /* * ignore beacons seen for the first time shortly after * init, but do not ignore beacons arriving with a short @@ -82,16 +69,6 @@ bool bhe::updateBeaconPeriod (osiTime programBeginTime) totalRunningTime = this->timeStamp - programBeginTime; if ( currentPeriod <= totalRunningTime ) { netChange = true; -# ifdef DEBUG - { - char name[64]; - - ipAddrToA ( &this->inetAddr, name, sizeof (name) ); - ca_printf ( - "new beacon from %s with period=%f running time to first beacon=%f\n", - name, currentPeriod, totalRunningTime ); - } -# endif } } else { @@ -107,12 +84,12 @@ bool bhe::updateBeaconPeriod (osiTime programBeginTime) */ if ( currentPeriod >= this->averagePeriod * 1.25 ) { + /* + * trigger on any missing beacon + * if connected to this server + */ if ( this->piiu ) { - /* - * trigger on any missing beacon - * if connected to this server - */ - this->piiu->beaconAnomaly = true; + this->piiu->beaconAnomalyNotify (); } if ( currentPeriod >= this->averagePeriod * 3.25 ) { @@ -124,18 +101,6 @@ bool bhe::updateBeaconPeriod (osiTime programBeginTime) } } - -# ifdef DEBUG - if (netChange) { - char name[64]; - - ipAddrToA (&pBHE->inetAddr, name, sizeof(name)); - ca_printf ( - "net resume seen %s cur=%f avg=%f\n", - name, currentPeriod, pBHE->averagePeriod); - } -# endif - /* * Is this an IOC seen because of an IOC reboot * (beacon come at a higher rate just after the @@ -147,20 +112,19 @@ bool bhe::updateBeaconPeriod (osiTime programBeginTime) * problems because the echo response will tell us * that the server is available */ - if ( currentPeriod <= this->averagePeriod * 0.80 ) { -# ifdef DEBUG - { - char name[64]; - - ipAddrToA ( &this->inetAddr, name, sizeof (name) ); - ca_printf ( - "reboot seen %s cur=%f avg=%f\n", - name, currentPeriod, this->averagePeriod); - } -# endif - netChange = true; + else if ( currentPeriod <= this->averagePeriod * 0.80 ) { if ( this->piiu ) { - this->piiu->beaconAnomaly = true; + this->piiu->beaconAnomalyNotify (); + } + netChange = true; + } + else { + /* + * update state of health for active virtual circuits + * if the beacon looks ok + */ + if ( this->piiu ) { + piiu->beaconArrivalNotify (); // reset connection activity watchdog } } diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 61f337bec..dd8b13059 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -478,21 +478,8 @@ void cac::beaconNotify ( const inetAddrID &addr ) * look for it in the hash table */ pBHE = this->lookupBeaconInetAddr ( addr ); - if (pBHE) { - - netChange = pBHE->updateBeaconPeriod (this->programBeginTime); - - /* - * update state of health for active virtual circuits - * (only if we are not suspicious about past beacon changes - * until the next echo reply) - */ - tcpiiu *piiu = pBHE->getIIU (); - if ( piiu ) { - if ( ! piiu->beaconAnomaly ) { - piiu->rescheduleRecvTimer (); // reset connection activity watchdog - } - } + if ( pBHE ) { + netChange = pBHE->updateBeaconPeriod ( this->programBeginTime ); } else { /* @@ -503,7 +490,7 @@ void cac::beaconNotify ( const inetAddrID &addr ) * shortly after the program started up) */ netChange = FALSE; - this->createBeaconHashEntry (addr, osiTime::getCurrent () ); + this->createBeaconHashEntry ( addr, osiTime::getCurrent () ); } if ( ! netChange ) { diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index 88c83306b..ef5f48d7a 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -454,8 +454,13 @@ private: class tcpRecvWatchdog : public osiTimer { public: tcpRecvWatchdog (double periodIn, osiTimerQueue & queueIn, bool echoProtocolAcceptedIn); - void echoResponseNotify (); + ~tcpRecvWatchdog (); void rescheduleRecvTimer (); + void messageArrivalNotify (); + void beaconArrivalNotify (); + void beaconAnomalyNotify (); + void connectNotify (); + private: void expire (); void destroy (); @@ -465,16 +470,21 @@ private: virtual void shutdown () = 0; virtual void noopRequestMsg () = 0; virtual void echoRequestMsg () = 0; + virtual void hostName ( char *pBuf, unsigned bufLength ) const = 0; const double period; const bool echoProtocolAccepted; - bool echoResponsePending; + bool responsePending; + bool beaconAnomaly; + bool dead; }; class tcpSendWatchdog : public osiTimer { public: tcpSendWatchdog (double periodIn, osiTimerQueue & queueIn); - void rescheduleSendTimer (); + ~tcpSendWatchdog (); + void armSendWatchdog (); + void cancelSendWatchdog (); private: void expire (); void destroy (); @@ -482,6 +492,7 @@ private: double delay () const; const char *name () const; virtual void shutdown () = 0; + virtual void hostName ( char *pBuf, unsigned bufLength ) const = 0; const double period; }; @@ -531,19 +542,17 @@ public: unsigned contiguous_msg_count; unsigned curMsgBytes; SOCKET sock; - unsigned char state; /* for use with iiu_conn_state enum */ + iiu_conn_state state; bool client_busy; bool echoRequestPending; bool claimRequestsPending; bool sendPending; bool recvPending; bool pushPending; - bool beaconAnomaly; virtual void show (unsigned level) const; private: - bool compareIfTCP (nciu &chan, const sockaddr_in &) const; int pushDatagramMsg (const caHdr *pMsg, const void *pExt, ca_uint16_t extsize); diff --git a/src/ca/tcpRecvWatchdog.cpp b/src/ca/tcpRecvWatchdog.cpp index 845a1a198..a196726bd 100644 --- a/src/ca/tcpRecvWatchdog.cpp +++ b/src/ca/tcpRecvWatchdog.cpp @@ -13,18 +13,18 @@ #include "iocinf.h" tcpRecvWatchdog::tcpRecvWatchdog - (double periodIn, osiTimerQueue & queueIn, bool echoProtocolAcceptedIn) : - osiTimer (periodIn, queueIn), - period (periodIn), - echoProtocolAccepted (echoProtocolAcceptedIn), - echoResponsePending (false) + ( double periodIn, osiTimerQueue & queueIn, bool echoProtocolAcceptedIn ) : + osiTimer ( queueIn ), + period ( periodIn ), + echoProtocolAccepted ( echoProtocolAcceptedIn ), + responsePending ( false ), + beaconAnomaly ( true ), + dead (true) { } -void tcpRecvWatchdog::echoResponseNotify () +tcpRecvWatchdog::~tcpRecvWatchdog () { - this->echoResponsePending = false; - this->reschedule ( this->period ); } void tcpRecvWatchdog::expire () @@ -36,13 +36,16 @@ void tcpRecvWatchdog::expire () if ( ! this->echoProtocolAccepted ) { this->noopRequestMsg (); } - else if ( this->echoResponsePending ) { - ca_printf ( "CA server unresponsive for %f sec. Disconnecting\n", this->period ); + else if ( this->responsePending ) { + char hostName[128]; + this->hostName ( hostName, sizeof (hostName) ); + ca_printf ( "CA server %s unresponsive for %g sec. Disconnecting.\n", + hostName, this->period ); this->shutdown (); } else { this->echoRequestMsg (); - this->echoResponsePending = true; + this->responsePending = true; } } @@ -53,12 +56,12 @@ void tcpRecvWatchdog::destroy () bool tcpRecvWatchdog::again () const { - return true; + return ( ! this->dead ); } double tcpRecvWatchdog::delay () const { - if (this->echoResponsePending) { + if ( this->responsePending ) { return CA_ECHO_TIMEOUT; } else { @@ -66,12 +69,39 @@ double tcpRecvWatchdog::delay () const } } +void tcpRecvWatchdog::beaconArrivalNotify () +{ + if ( ! this->beaconAnomaly && ! this->responsePending ) { + this->reschedule ( this->period ); + } +} + +/* + * be careful about using beacons to reset the connection + * time out watchdog until we have received a ping response + * from the IOC (this makes the software detect reconnects + * faster when the server is rebooted twice in rapid + * succession before a 1st or 2nd beacon has been received) + */ +void tcpRecvWatchdog::beaconAnomalyNotify () +{ + this->beaconAnomaly = true; +} + +void tcpRecvWatchdog::messageArrivalNotify () +{ + this->beaconAnomaly = false; + this->responsePending = false; + this->reschedule ( this->period ); +} + +void tcpRecvWatchdog::connectNotify () +{ + this->reschedule ( this->period ); +} + const char *tcpRecvWatchdog::name () const { return "TCP Receive Watchdog"; } -void tcpRecvWatchdog::rescheduleRecvTimer () -{ - this->reschedule (); -} \ No newline at end of file diff --git a/src/ca/tcpSendWatchdog.cpp b/src/ca/tcpSendWatchdog.cpp index 8b0b24cf0..5a7c2a07f 100644 --- a/src/ca/tcpSendWatchdog.cpp +++ b/src/ca/tcpSendWatchdog.cpp @@ -19,9 +19,16 @@ tcpSendWatchdog::tcpSendWatchdog { } +tcpSendWatchdog::~tcpSendWatchdog () +{ +} + void tcpSendWatchdog::expire () { - ca_printf ( "Request was pending for %f sec. Disconnecting from CA server.\n", this->period); + char hostName[128]; + this->hostName ( hostName, sizeof (hostName) ); + ca_printf ( "Request not accepted by CA server %s for %g sec. Disconnecting.\n", + hostName, this->period); this->shutdown (); } @@ -45,7 +52,12 @@ const char *tcpSendWatchdog::name () const return "TCP Send Watchdog"; } -void tcpSendWatchdog::rescheduleSendTimer () +void tcpSendWatchdog::armSendWatchdog () { this->reschedule (); +} + +void tcpSendWatchdog::cancelSendWatchdog () +{ + this->cancel (); } \ No newline at end of file diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 9b80c0943..d549a84bb 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -93,37 +93,24 @@ void tcpiiu::connect () /* * attempt to connect to a CA server */ + this->armSendWatchdog (); while (1) { int errnoCpy; status = ::connect ( this->sock, &this->dest.sa, sizeof ( this->dest.sa ) ); if ( status == 0 ) { - break; + this->cancelSendWatchdog (); + // put the iiu into the connected state + this->state = iiu_connected; + // start connection activity watchdog + this->connectNotify (); + return; } errnoCpy = SOCKERRNO; - if ( errnoCpy == SOCK_EISCONN ) { - /* - * called connect after we are already connected - * (this appears to be how they provide - * connect completion notification) - */ - break; - } - else if ( errnoCpy == SOCK_EALREADY ) { - return; - } -#ifdef _WIN32 - /* - * including this with vxWorks appears to - * cause trouble - */ - else if ( errnoCpy == SOCK_EINVAL ) { /* a SOCK_EALREADY alias used by early WINSOCK */ - return; - } -#endif - else if ( errnoCpy == SOCK_EINTR ) { + + if ( errnoCpy == SOCK_EINTR ) { if ( this->state == iiu_disconnected ) { return; } @@ -135,21 +122,13 @@ void tcpiiu::connect () return; } else { + this->cancelSendWatchdog (); ca_printf ( "Unable to connect because %d=\"%s\"\n", errnoCpy, SOCKERRSTR (errnoCpy) ); this->shutdown (); return; } } - - /* - * put the iiu into the connected state - */ - this->state = iiu_connected; - - this->rescheduleRecvTimer (); // reset connection activity watchdog - - return; } /* @@ -199,7 +178,7 @@ extern "C" void cacSendThreadTCP (void *pParam) pOutBuf = static_cast ( cacRingBufferReadReserveNoBlock (&piiu->send, &sendCnt) ); while ( ! pOutBuf ) { - piiu->tcpSendWatchdog::cancel (); + piiu->cancelSendWatchdog (); pOutBuf = (char *) cacRingBufferReadReserve (&piiu->send, &sendCnt); if ( piiu->state != iiu_connected ) { semBinaryGive ( piiu->sendThreadExitSignal ); @@ -302,7 +281,7 @@ void tcpiiu::recvMsg () cacRingBufferWriteCommit (&this->recv, totalBytes); // cacRingBufferWriteFlush (&this->recv); - this->rescheduleRecvTimer (); // reschedule connection activity watchdog + this->messageArrivalNotify (); // reschedule connection activity watchdog return; } @@ -419,7 +398,6 @@ tcpiiu::tcpiiu (cac *pcac, const struct sockaddr_in &ina, unsigned minorVersion, this->echoRequestPending = false; this->sendPending = false; this->pushPending = false; - this->beaconAnomaly = false; this->curDataMax = 0ul; this->curMsgBytes = 0ul; this->curDataBytes = 0ul; @@ -607,7 +585,7 @@ bool tcpiiu::compareIfTCP ( nciu &chan, const sockaddr_in &addr ) const void tcpiiu::flush () { if ( cacRingBufferWriteFlush ( &this->send ) ) { - this->rescheduleSendTimer (); + this->armSendWatchdog (); } } @@ -769,8 +747,6 @@ LOCAL void tcp_noop_action (tcpiiu * /* piiu */) */ LOCAL void echo_resp_action (tcpiiu *piiu) { - piiu->echoResponseNotify (); - piiu->beaconAnomaly = false; return; } @@ -1372,7 +1348,7 @@ int tcpiiu::pushStreamMsg (const caHdr *pmsg, if ( ! cacRingBufferWriteLockNoBlock ( &this->send, msgsize ) ) { if ( BlockingOk ) { - this->rescheduleSendTimer (); + this->armSendWatchdog (); cacRingBufferWriteLock ( &this->send ); } else {