From 1ba658b4524c7e2dd7557afa0046157b78b8555e Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Sat, 25 Jul 2009 00:44:21 +0000 Subject: [PATCH] improved diagnostics --- src/cas/generic/st/casStreamOS.cc | 131 +++++++++++++++++------------- src/cas/generic/st/casStreamOS.h | 3 + 2 files changed, 77 insertions(+), 57 deletions(-) diff --git a/src/cas/generic/st/casStreamOS.cc b/src/cas/generic/st/casStreamOS.cc index d5a74d09e..7ab2aad7c 100644 --- a/src/cas/generic/st/casStreamOS.cc +++ b/src/cas/generic/st/casStreamOS.cc @@ -23,6 +23,30 @@ #define epicsExportSharedFunc #include "casStreamOS.h" +// +// printStatus () +// +#if defined(DEBUG) +void casStreamOS :: printStatus ( const char * pCtx ) const +{ + static epicsTime beginTime = epicsTime :: getCurrent (); + epicsTime current = epicsTime :: getCurrent (); + printf ( + "%03.3f, " + "Sock %d, %s, " + "recv backlog %u, " + "send backlog %u\n", + current - beginTime, + this->getFD(), + pCtx, + this->inBufBytesAvailable (), + this->outBytesPresent () ); + fflush ( stdout ); +} +#else +inline void casStreamOS :: printStatus ( const char * ) const {} +#endif + // // casStreamReadReg // @@ -44,13 +68,7 @@ private: inline casStreamReadReg::casStreamReadReg (casStreamOS &osIn) : fdReg (osIn.getFD(), fdrRead), os (osIn) { -# if defined(DEBUG) - printf ("Read on %d\n", this->os.getFD()); - printf ("Recv backlog %u\n", - this->os.in.bytesPresent()); - printf ("Send backlog %u\n", - this->os.out.bytesPresent()); -# endif + this->os.printStatus ( "read schedualed" ); } // @@ -58,13 +76,7 @@ inline casStreamReadReg::casStreamReadReg (casStreamOS &osIn) : // inline casStreamReadReg::~casStreamReadReg () { -# if defined(DEBUG) - printf ("Read off %d\n", this->os.getFD()); - printf ("Recv backlog %u\n", - this->os.in.bytesPresent()); - printf ("Send backlog %u\n", - this->os.out.bytesPresent()); -# endif + this->os.printStatus ( "read unschedualed" ); } // @@ -89,13 +101,7 @@ private: inline casStreamWriteReg::casStreamWriteReg (casStreamOS &osIn) : fdReg (osIn.getFD(), fdrWrite, true), os (osIn) { -# if defined(DEBUG) - printf ("Write on %d\n", this->os.getFD()); - printf ("Recv backlog %u\n", - this->os.in.bytesPresent()); - printf ("Send backlog %u\n", - this->os.out.bytesPresent()); -# endif + this->os.printStatus ( "write schedualed" ); } // @@ -103,13 +109,7 @@ inline casStreamWriteReg::casStreamWriteReg (casStreamOS &osIn) : // inline casStreamWriteReg::~casStreamWriteReg () { -# if defined(DEBUG) - printf ("Write off %d\n", this->os.getFD()); - printf ("Recv backlog %u\n", - this->os.in.bytesPresent()); - printf ("Send backlog %u\n", - this->os.out.bytesPresent()); -# endif + this->os.printStatus ( "write unschedualed" ); } // @@ -142,8 +142,10 @@ void casStreamEvWakeup::show(unsigned level) const // // casStreamEvWakeup::expire() // -epicsTimerNotify::expireStatus casStreamEvWakeup::expire ( const epicsTime & /* currentTime */ ) +epicsTimerNotify::expireStatus casStreamEvWakeup:: + expire ( const epicsTime & /* currentTime */ ) { + this->os.printStatus ( "casStreamEvWakeup tmr expire" ); casEventSys::processStatus ps = os.eventSysProcess (); if ( ps.nAccepted > 0u ) { this->os.eventFlush (); @@ -172,6 +174,7 @@ epicsTimerNotify::expireStatus casStreamEvWakeup::expire ( const epicsTime & /* // void casStreamEvWakeup::start( casStreamOS & ) { + this->os.printStatus ( "casStreamEvWakeup tmr start" ); // care is needed here because this is called // asynchronously by postEvent // @@ -215,9 +218,11 @@ void casStreamIOWakeup::show ( unsigned level ) const // guarantees that we will not call processInput() // recursively // -epicsTimerNotify::expireStatus casStreamIOWakeup::expire ( const epicsTime & /* currentTime */ ) +epicsTimerNotify::expireStatus casStreamIOWakeup :: + expire ( const epicsTime & /* currentTime */ ) { assert ( this->pOS ); + this->pOS->printStatus ( "casStreamIOWakeup tmr expire" ); casStreamOS & tmpOS = *this->pOS; this->pOS = 0; tmpOS.processInput(); @@ -236,6 +241,7 @@ void casStreamIOWakeup::start ( casStreamOS &os ) this->pOS = &os; this->timer.start ( *this, 0.0 ); } + this->pOS->printStatus ( "casStreamIOWakeup tmr start" ); } // @@ -264,7 +270,7 @@ inline void casStreamOS::disarmRecv () // inline void casStreamOS::armSend() { - if ( this->outBufBytesPresent() == 0u ) { + if ( this->outBytesPresent() == 0u ) { return; } @@ -299,15 +305,15 @@ void casStreamOS::eventSignal() } // -// casStreamOS::eventFlush() +// casStreamOS :: eventFlush() // -void casStreamOS::eventFlush() +void casStreamOS :: eventFlush() { // // if there is nothing pending in the input // queue, then flush the output queue // - if ( this->inBufBytesAvailable() == 0u ) { + if ( _sendNeeded () ) { this->armSend (); } } @@ -319,9 +325,15 @@ casStreamOS::casStreamOS ( caServerI & cas, clientBufMemoryManager & bufMgrIn, const ioArgsToNewStreamIO & ioArgs ) : casStreamIO ( cas, bufMgrIn, ioArgs ), - evWk ( *this ), pWtReg ( 0 ), pRdReg ( 0 ), + evWk ( *this ), + pWtReg ( 0 ), + pRdReg ( 0 ), + _sendBacklogThresh ( osSendBufferSize () / 2u ), sendBlocked ( false ) { + if ( _sendBacklogThresh < MAX_TCP / 2 ) { + _sendBacklogThresh = MAX_TCP / 2; + } this->xSetNonBlocking (); this->armRecv (); } @@ -387,6 +399,8 @@ void casStreamReadReg::callBack () void casStreamOS::recvCB () { assert ( this->pRdReg ); + + printStatus ( "receiving" ); // // copy in new messages @@ -396,7 +410,7 @@ void casStreamOS::recvCB () this->getCAS().destroyClient ( *this ); } else { - casProcCond procCond = this->processInput (); + casProcCond procCond = this->processInput (); if ( procCond == casProcDisconnect ) { this->getCAS().destroyClient ( *this ); } @@ -458,6 +472,8 @@ void casStreamOS::sendCB () { this->disarmSend (); + printStatus ( "writing" ); + // // attempt to flush the output buffer // @@ -508,13 +524,8 @@ void casStreamOS::sendCB () // return; } - -# if defined(DEBUG) - printf ( "write attempted on %d result was %d\n", - this->getFD(), flushCond ); - printf ( "Recv backlog %u\n", this->in.bytesPresent() ); - printf ( "Send backlog %u\n", this->out.bytesPresent() ); -# endif + + printStatus ( ppFlushCondText [ flushCond ] ); // // If we were able to send something then we need @@ -537,8 +548,7 @@ void casStreamOS::sendCB () // additional bytes may have been added since // we flushed the out buffer // - if ( this->outBufBytesPresent() > 0u && - this->inBufBytesAvailable() == 0u ) { + if ( _sendNeeded () ) { this->armSend(); } } @@ -551,27 +561,22 @@ void casStreamOS::sendCB () // // casStreamOS::processInput() // -casProcCond casStreamOS::processInput() // X aCC 361 +casProcCond casStreamOS :: processInput() { - caStatus status; + printStatus ( "req proc" ); -# ifdef DEBUG - printf( - "Resp bytes to send=%d, Req bytes pending %d\n", - this->out.bytesPresent(), - this->in.bytesPresent()); -# endif - - status = this->processMsg(); + caStatus status = this->processMsg(); if (status==S_cas_success || status==S_cas_sendBlocked || status==S_casApp_postponeAsyncIO) { // // if there is nothing pending in the input - // queue, then flush the output queue + // queue, or the output queue size has grown + // to be greater than one half of the os's + // buffer size then flush the output queue // - if ( this->inBufBytesAvailable() == 0u ) { + if ( _sendNeeded () ) { this->armSend (); } this->armRecv (); @@ -585,3 +590,15 @@ casProcCond casStreamOS::processInput() // X aCC 361 } } +// +// casStreamOS :: _sendNeeded () +// +bool casStreamOS :: + _sendNeeded () const +{ + bool sn = this->outBytesPresent() >= this->_sendBacklogThresh; + sn = sn || ( this->inBufBytesAvailable () == 0u ); + return sn; +} + + diff --git a/src/cas/generic/st/casStreamOS.h b/src/cas/generic/st/casStreamOS.h index 91d8c39cd..96f4c1341 100644 --- a/src/cas/generic/st/casStreamOS.h +++ b/src/cas/generic/st/casStreamOS.h @@ -68,11 +68,13 @@ public: void show ( unsigned level ) const; casProcCond processInput (); void eventFlush (); + void printStatus ( const char * pCtx ) const; private: casStreamEvWakeup evWk; casStreamIOWakeup ioWk; class casStreamWriteReg * pWtReg; class casStreamReadReg * pRdReg; + bufSizeT _sendBacklogThresh; bool sendBlocked; void armSend (); void armRecv (); @@ -83,6 +85,7 @@ private: void sendBlockSignal (); void ioBlockedSignal (); void eventSignal (); + bool _sendNeeded () const; casStreamOS ( const casStreamOS & ); casStreamOS & operator = ( const casStreamOS & ); friend class casStreamWriteReg;