improved diagnostics

This commit is contained in:
Jeff Hill
2009-07-25 00:44:21 +00:00
parent 6eb25148c5
commit 1ba658b452
2 changed files with 77 additions and 57 deletions

View File

@@ -23,6 +23,30 @@
#define epicsExportSharedFunc
#include "casStreamOS.h"
//
// printStatus ()
//
#if defined(DEBUG)
void casStreamOS :: printStatus ( const char * pCtx ) const
{
static epicsTime beginTime = epicsTime :: getCurrent ();
epicsTime current = epicsTime :: getCurrent ();
printf (
"%03.3f, "
"Sock %d, %s, "
"recv backlog %u, "
"send backlog %u\n",
current - beginTime,
this->getFD(),
pCtx,
this->inBufBytesAvailable (),
this->outBytesPresent () );
fflush ( stdout );
}
#else
inline void casStreamOS :: printStatus ( const char * ) const {}
#endif
//
// casStreamReadReg
//
@@ -44,13 +68,7 @@ private:
inline casStreamReadReg::casStreamReadReg (casStreamOS &osIn) :
fdReg (osIn.getFD(), fdrRead), os (osIn)
{
# if defined(DEBUG)
printf ("Read on %d\n", this->os.getFD());
printf ("Recv backlog %u\n",
this->os.in.bytesPresent());
printf ("Send backlog %u\n",
this->os.out.bytesPresent());
# endif
this->os.printStatus ( "read schedualed" );
}
//
@@ -58,13 +76,7 @@ inline casStreamReadReg::casStreamReadReg (casStreamOS &osIn) :
//
inline casStreamReadReg::~casStreamReadReg ()
{
# if defined(DEBUG)
printf ("Read off %d\n", this->os.getFD());
printf ("Recv backlog %u\n",
this->os.in.bytesPresent());
printf ("Send backlog %u\n",
this->os.out.bytesPresent());
# endif
this->os.printStatus ( "read unschedualed" );
}
//
@@ -89,13 +101,7 @@ private:
inline casStreamWriteReg::casStreamWriteReg (casStreamOS &osIn) :
fdReg (osIn.getFD(), fdrWrite, true), os (osIn)
{
# if defined(DEBUG)
printf ("Write on %d\n", this->os.getFD());
printf ("Recv backlog %u\n",
this->os.in.bytesPresent());
printf ("Send backlog %u\n",
this->os.out.bytesPresent());
# endif
this->os.printStatus ( "write schedualed" );
}
//
@@ -103,13 +109,7 @@ inline casStreamWriteReg::casStreamWriteReg (casStreamOS &osIn) :
//
inline casStreamWriteReg::~casStreamWriteReg ()
{
# if defined(DEBUG)
printf ("Write off %d\n", this->os.getFD());
printf ("Recv backlog %u\n",
this->os.in.bytesPresent());
printf ("Send backlog %u\n",
this->os.out.bytesPresent());
# endif
this->os.printStatus ( "write unschedualed" );
}
//
@@ -142,8 +142,10 @@ void casStreamEvWakeup::show(unsigned level) const
//
// casStreamEvWakeup::expire()
//
epicsTimerNotify::expireStatus casStreamEvWakeup::expire ( const epicsTime & /* currentTime */ )
epicsTimerNotify::expireStatus casStreamEvWakeup::
expire ( const epicsTime & /* currentTime */ )
{
this->os.printStatus ( "casStreamEvWakeup tmr expire" );
casEventSys::processStatus ps = os.eventSysProcess ();
if ( ps.nAccepted > 0u ) {
this->os.eventFlush ();
@@ -172,6 +174,7 @@ epicsTimerNotify::expireStatus casStreamEvWakeup::expire ( const epicsTime & /*
//
void casStreamEvWakeup::start( casStreamOS & )
{
this->os.printStatus ( "casStreamEvWakeup tmr start" );
// care is needed here because this is called
// asynchronously by postEvent
//
@@ -215,9 +218,11 @@ void casStreamIOWakeup::show ( unsigned level ) const
// guarantees that we will not call processInput()
// recursively
//
epicsTimerNotify::expireStatus casStreamIOWakeup::expire ( const epicsTime & /* currentTime */ )
epicsTimerNotify::expireStatus casStreamIOWakeup ::
expire ( const epicsTime & /* currentTime */ )
{
assert ( this->pOS );
this->pOS->printStatus ( "casStreamIOWakeup tmr expire" );
casStreamOS & tmpOS = *this->pOS;
this->pOS = 0;
tmpOS.processInput();
@@ -236,6 +241,7 @@ void casStreamIOWakeup::start ( casStreamOS &os )
this->pOS = &os;
this->timer.start ( *this, 0.0 );
}
this->pOS->printStatus ( "casStreamIOWakeup tmr start" );
}
//
@@ -264,7 +270,7 @@ inline void casStreamOS::disarmRecv ()
//
inline void casStreamOS::armSend()
{
if ( this->outBufBytesPresent() == 0u ) {
if ( this->outBytesPresent() == 0u ) {
return;
}
@@ -299,15 +305,15 @@ void casStreamOS::eventSignal()
}
//
// casStreamOS::eventFlush()
// casStreamOS :: eventFlush()
//
void casStreamOS::eventFlush()
void casStreamOS :: eventFlush()
{
//
// if there is nothing pending in the input
// queue, then flush the output queue
//
if ( this->inBufBytesAvailable() == 0u ) {
if ( _sendNeeded () ) {
this->armSend ();
}
}
@@ -319,9 +325,15 @@ casStreamOS::casStreamOS (
caServerI & cas, clientBufMemoryManager & bufMgrIn,
const ioArgsToNewStreamIO & ioArgs ) :
casStreamIO ( cas, bufMgrIn, ioArgs ),
evWk ( *this ), pWtReg ( 0 ), pRdReg ( 0 ),
evWk ( *this ),
pWtReg ( 0 ),
pRdReg ( 0 ),
_sendBacklogThresh ( osSendBufferSize () / 2u ),
sendBlocked ( false )
{
if ( _sendBacklogThresh < MAX_TCP / 2 ) {
_sendBacklogThresh = MAX_TCP / 2;
}
this->xSetNonBlocking ();
this->armRecv ();
}
@@ -387,6 +399,8 @@ void casStreamReadReg::callBack ()
void casStreamOS::recvCB ()
{
assert ( this->pRdReg );
printStatus ( "receiving" );
//
// copy in new messages
@@ -396,7 +410,7 @@ void casStreamOS::recvCB ()
this->getCAS().destroyClient ( *this );
}
else {
casProcCond procCond = this->processInput ();
casProcCond procCond = this->processInput ();
if ( procCond == casProcDisconnect ) {
this->getCAS().destroyClient ( *this );
}
@@ -458,6 +472,8 @@ void casStreamOS::sendCB ()
{
this->disarmSend ();
printStatus ( "writing" );
//
// attempt to flush the output buffer
//
@@ -508,13 +524,8 @@ void casStreamOS::sendCB ()
//
return;
}
# if defined(DEBUG)
printf ( "write attempted on %d result was %d\n",
this->getFD(), flushCond );
printf ( "Recv backlog %u\n", this->in.bytesPresent() );
printf ( "Send backlog %u\n", this->out.bytesPresent() );
# endif
printStatus ( ppFlushCondText [ flushCond ] );
//
// If we were able to send something then we need
@@ -537,8 +548,7 @@ void casStreamOS::sendCB ()
// additional bytes may have been added since
// we flushed the out buffer
//
if ( this->outBufBytesPresent() > 0u &&
this->inBufBytesAvailable() == 0u ) {
if ( _sendNeeded () ) {
this->armSend();
}
}
@@ -551,27 +561,22 @@ void casStreamOS::sendCB ()
//
// casStreamOS::processInput()
//
casProcCond casStreamOS::processInput() // X aCC 361
casProcCond casStreamOS :: processInput()
{
caStatus status;
printStatus ( "req proc" );
# ifdef DEBUG
printf(
"Resp bytes to send=%d, Req bytes pending %d\n",
this->out.bytesPresent(),
this->in.bytesPresent());
# endif
status = this->processMsg();
caStatus status = this->processMsg();
if (status==S_cas_success ||
status==S_cas_sendBlocked ||
status==S_casApp_postponeAsyncIO) {
//
// if there is nothing pending in the input
// queue, then flush the output queue
// queue, or the output queue size has grown
// to be greater than one half of the os's
// buffer size then flush the output queue
//
if ( this->inBufBytesAvailable() == 0u ) {
if ( _sendNeeded () ) {
this->armSend ();
}
this->armRecv ();
@@ -585,3 +590,15 @@ casProcCond casStreamOS::processInput() // X aCC 361
}
}
//
// casStreamOS :: _sendNeeded ()
//
bool casStreamOS ::
_sendNeeded () const
{
bool sn = this->outBytesPresent() >= this->_sendBacklogThresh;
sn = sn || ( this->inBufBytesAvailable () == 0u );
return sn;
}

View File

@@ -68,11 +68,13 @@ public:
void show ( unsigned level ) const;
casProcCond processInput ();
void eventFlush ();
void printStatus ( const char * pCtx ) const;
private:
casStreamEvWakeup evWk;
casStreamIOWakeup ioWk;
class casStreamWriteReg * pWtReg;
class casStreamReadReg * pRdReg;
bufSizeT _sendBacklogThresh;
bool sendBlocked;
void armSend ();
void armRecv ();
@@ -83,6 +85,7 @@ private:
void sendBlockSignal ();
void ioBlockedSignal ();
void eventSignal ();
bool _sendNeeded () const;
casStreamOS ( const casStreamOS & );
casStreamOS & operator = ( const casStreamOS & );
friend class casStreamWriteReg;