added beacon period diagnostic, fixed flush logic, made tcp

recv and send watch dogs embedded objects
This commit is contained in:
Jeff Hill
2001-01-27 00:15:19 +00:00
parent 5882c4d675
commit d85ee06cee
2 changed files with 46 additions and 55 deletions
+36 -41
View File
@@ -93,10 +93,10 @@ const static char nullBuff[32] = {
extern "C" void cacSendThreadTCP ( void *pParam )
{
tcpiiu *piiu = ( tcpiiu * ) pParam;
//claimMsgCache cache ( CA_V44 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersion ) );
bool laborNeeded;
while ( true ) {
bool flowControlLaborNeeded;
bool echoLaborNeeded;
epicsEventMustWait ( piiu->sendThreadFlushSignal );
@@ -106,10 +106,12 @@ extern "C" void cacSendThreadTCP ( void *pParam )
{
epicsAutoMutex autoMutex ( piiu->mutex );
laborNeeded = piiu->busyStateDetected != piiu->flowControlActive;
flowControlLaborNeeded = piiu->busyStateDetected != piiu->flowControlActive;
echoLaborNeeded = piiu->echoRequestPending;
piiu->echoRequestPending = false;
}
if ( laborNeeded ) {
if ( flowControlLaborNeeded ) {
if ( piiu->flowControlActive ) {
int status = piiu->disableFlowControlRequest ();
if ( status == ECA_NORMAL ) {
@@ -128,35 +130,20 @@ extern "C" void cacSendThreadTCP ( void *pParam )
printf ( "fc on\n" );
# endif
}
piiu->flushPending = true;
}
{
epicsAutoMutex autoMutex ( piiu->mutex );
laborNeeded = piiu->echoRequestPending;
piiu->echoRequestPending = false;
}
if ( laborNeeded ) {
if ( echoLaborNeeded ) {
if ( CA_V43 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersion ) ) {
piiu->echoRequest ();
}
else {
piiu->noopRequest ();
}
piiu->flushPending = true;
}
{
epicsAutoMutex autoMutex ( piiu->mutex );
laborNeeded = piiu->flushPending;
piiu->flushPending = false;
}
if ( laborNeeded ) {
if ( ! piiu->flushToWire ( false ) ) {
break;
}
if ( ! piiu->flushToWire ( false ) ) {
break;
}
}
@@ -175,7 +162,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
assert ( nBytesInBuf <= INT_MAX );
this->armSendWatchdog ();
this->sendDog.arm ();
while ( true ) {
status = ::send ( this->sock,
@@ -213,7 +200,7 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
}
}
this->cancelSendWatchdog ();
this->sendDog.cancel ();
return nBytes;
}
@@ -285,7 +272,7 @@ unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
}
}
this->messageArrivalNotify (); // reschedule connection activity watchdog
this->recvDog.messageArrivalNotify (); // reschedule connection activity watchdog
return totalBytes;
}
@@ -365,9 +352,9 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
// tcpiiu::tcpiiu ()
//
tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, osiTimerQueue &timerQueue ) :
tcpRecvWatchdog ( connectionTimeout, timerQueue ),
tcpSendWatchdog ( connectionTimeout, timerQueue ),
netiiu ( &cac ),
recvDog ( *this, connectionTimeout, timerQueue ),
sendDog ( *this, connectionTimeout, timerQueue ),
ioTable ( 1024 ),
sendQue ( *this ),
pHostNameCache ( 0 ),
@@ -381,7 +368,6 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, osiTimerQueue &timerQueue )
busyStateDetected ( false ),
flowControlActive ( false ),
echoRequestPending ( false ),
flushPending ( false ),
msgHeaderAvailable ( false ),
sockCloseCompleted ( false )
{
@@ -453,7 +439,6 @@ bool tcpiiu::initiateConnect ( const osiSockAddr &addrIn, unsigned minorVersion,
this->busyStateDetected = false;
this->flowControlActive = false;
this->echoRequestPending = false;
this->flushPending = false;
this->msgHeaderAvailable = false;
this->sockCloseCompleted = false;
@@ -534,14 +519,14 @@ void tcpiiu::connect ()
/*
* attempt to connect to a CA server
*/
this->armSendWatchdog ();
this->sendDog.arm ();
while ( ! this->sockCloseCompleted ) {
int status = ::connect ( this->sock, &this->addr.sa, sizeof ( addr.sa ) );
if ( status == 0 ) {
this->cancelSendWatchdog ();
this->sendDog.cancel ();
epicsAutoMutex autoMutex ( this->mutex );
@@ -550,7 +535,7 @@ void tcpiiu::connect ()
this->state = iiu_connected;
// start connection activity watchdog
this->connectNotify ();
this->recvDog.connectNotify ();
}
return;
@@ -560,7 +545,7 @@ void tcpiiu::connect ()
if ( errnoCpy == SOCK_EINTR ) {
if ( this->state != iiu_connecting ) {
this->cancelSendWatchdog ();
this->sendDog.cancel ();
return;
}
else {
@@ -568,11 +553,11 @@ void tcpiiu::connect ()
}
}
else if ( errnoCpy == SOCK_SHUTDOWN ) {
this->cancelSendWatchdog ();
this->sendDog.cancel ();
return;
}
else {
this->cancelSendWatchdog ();
this->sendDog.cancel ();
ca_printf ( "Unable to connect because %d=\"%s\"\n",
errnoCpy, SOCKERRSTR ( errnoCpy ) );
this->cleanShutdown ();
@@ -586,8 +571,8 @@ void tcpiiu::connect ()
*/
void tcpiiu::cleanShutdown ()
{
this->cancelSendWatchdog ();
this->cancelRecvWatchdog ();
this->sendDog.cancel ();
this->recvDog.cancel ();
epicsAutoMutex autoMutex ( this->mutex );
@@ -632,8 +617,8 @@ void tcpiiu::cleanShutdown ()
*/
void tcpiiu::forcedShutdown ()
{
this->cancelSendWatchdog ();
this->cancelRecvWatchdog ();
this->sendDog.cancel ();
this->recvDog.cancel ();
epicsAutoMutex autoMutex ( this->mutex );
@@ -864,7 +849,6 @@ void tcpiiu::show ( unsigned level ) const
epicsEventShow ( this->recvThreadExitSignal, level-3u );
printf ( "\tfully constructed bool %u\n", this->fullyConstructedFlag );
printf ("\techo pending bool = %u\n", this->echoRequestPending );
printf ("\tflush pending bool = %u\n", this->flushPending );
printf ("\treceive message header available bool = %u\n", this->msgHeaderAvailable );
if ( this->pBHE ) {
this->pBHE->show ( level - 3u );
@@ -877,7 +861,7 @@ void tcpiiu::show ( unsigned level ) const
bool tcpiiu::setEchoRequestPending ()
{
{
epicsAutoMutex autoMuext ( this->mutex );
epicsAutoMutex autoMutex ( this->mutex );
this->echoRequestPending = true;
}
this->flush ();
@@ -2007,4 +1991,15 @@ void tcpiiu::uninstallIO ( baseNMIU &io )
io.channel ().tcpiiuPrivateListOfIO::eventq.remove ( io );
}
double tcpiiu::beaconPeriod () const
{
epicsAutoMutex autoMutex ( this->mutex );
if ( this->pBHE ) {
return this->pBHE->period ();
}
else {
return netiiu::beaconPeriod ();
}
}
+10 -14
View File
@@ -47,20 +47,7 @@ inline SOCKET tcpiiu::getSock () const
inline void tcpiiu::flush ()
{
bool signalNeeded;
{
epicsAutoMutex autoMutex ( this->mutex );
if ( this->sendQue.occupiedBytes () ) {
this->flushPending = true;
signalNeeded = true;
}
else {
signalNeeded = false;
}
}
if ( signalNeeded ) {
epicsEventSignal ( this->sendThreadFlushSignal );
}
epicsEventSignal ( this->sendThreadFlushSignal );
}
inline bool tcpiiu::ca_v44_ok () const
@@ -94,3 +81,12 @@ inline bhe * tcpiiu::getBHE () const
return this->pBHE;
}
inline void tcpiiu::beaconAnomalyNotify ()
{
this->recvDog.beaconAnomalyNotify ();
}
inline void tcpiiu::beaconArrivalNotify ()
{
this->recvDog.beaconArrivalNotify ();
}