diff --git a/src/cas/generic/st/casStreamOS.cc b/src/cas/generic/st/casStreamOS.cc index 7ab2aad7c..45236b06e 100644 --- a/src/cas/generic/st/casStreamOS.cc +++ b/src/cas/generic/st/casStreamOS.cc @@ -10,8 +10,6 @@ // casStreamOS.cc // $Id$ // -// -// // TO DO: // o armRecv() and armSend() should return bad status when // there isnt enough memory @@ -23,6 +21,10 @@ #define epicsExportSharedFunc #include "casStreamOS.h" +#if 0 +#define DEBUG +#endif + // // printStatus () // @@ -34,13 +36,13 @@ void casStreamOS :: printStatus ( const char * pCtx ) const printf ( "%03.3f, " "Sock %d, %s, " - "recv backlog %u, " - "send backlog %u\n", + "RecvBuf %u, " + "SendBuf %u\n", current - beginTime, this->getFD(), pCtx, - this->inBufBytesAvailable (), - this->outBytesPresent () ); + this->inBufBytesPending (), + this->outBufBytesPending () ); fflush ( stdout ); } #else @@ -148,7 +150,23 @@ epicsTimerNotify::expireStatus casStreamEvWakeup:: this->os.printStatus ( "casStreamEvWakeup tmr expire" ); casEventSys::processStatus ps = os.eventSysProcess (); if ( ps.nAccepted > 0u ) { - this->os.eventFlush (); + // We do not wait for any impartial, or complete, + // messages in the input queue to be processed + // because. + // A) IO postponement might be preventing the + // input queue processing from proceeding. + // B) We dont want to interrupt subscription + // updates while waiting for very large arrays + // to be read in a packet at a time. + // C) Since both reads and events get processed + // before going back to select to find out if we + // can do a write then we naturally tend to + // combine get responses and subscription responses + // into one write. + // D) Its probably questionable to hold up event + // traffic (introduce latency) because a partial + // message is pending in the input queue. + this->os.armSend (); } if ( ps.cond != casProcOk ) { // @@ -214,8 +232,10 @@ void casStreamIOWakeup::show ( unsigned level ) const // // casStreamIOWakeup::expire() // +// This is called whenever asynchronous IO completes +// // Running this indirectly off of the timer queue -// guarantees that we will not call processInput() +// guarantees that we will not call processMsg() // recursively // epicsTimerNotify::expireStatus casStreamIOWakeup :: @@ -225,7 +245,29 @@ epicsTimerNotify::expireStatus casStreamIOWakeup :: this->pOS->printStatus ( "casStreamIOWakeup tmr expire" ); casStreamOS & tmpOS = *this->pOS; this->pOS = 0; - tmpOS.processInput(); + caStatus status = tmpOS.processMsg (); + if ( status == S_cas_success ) { + tmpOS.armRecv (); + if ( tmpOS._sendNeeded () ) { + tmpOS.armSend (); + } + } + else if ( status == S_cas_sendBlocked ) { + tmpOS.armSend (); + } + else if ( status == S_casApp_postponeAsyncIO ) { + tmpOS.armSend (); + } + else { + errMessage ( status, + "- unexpected problem with client's input - forcing disconnect"); + tmpOS.getCAS().destroyClient ( tmpOS ); + // + // must _not_ touch "tmpOS" ref + // after the destroy + // + return noRestart; + } return noRestart; } @@ -270,7 +312,7 @@ inline void casStreamOS::disarmRecv () // inline void casStreamOS::armSend() { - if ( this->outBytesPresent() == 0u ) { + if ( this->outBufBytesPending() == 0u ) { return; } @@ -304,20 +346,6 @@ void casStreamOS::eventSignal() this->evWk.start ( *this ); } -// -// casStreamOS :: eventFlush() -// -void casStreamOS :: eventFlush() -{ - // - // if there is nothing pending in the input - // queue, then flush the output queue - // - if ( _sendNeeded () ) { - this->armSend (); - } -} - // // casStreamOS::casStreamOS() // @@ -328,8 +356,7 @@ casStreamOS::casStreamOS ( evWk ( *this ), pWtReg ( 0 ), pRdReg ( 0 ), - _sendBacklogThresh ( osSendBufferSize () / 2u ), - sendBlocked ( false ) + _sendBacklogThresh ( osSendBufferSize () / 2u ) { if ( _sendBacklogThresh < MAX_TCP / 2 ) { _sendBacklogThresh = MAX_TCP / 2; @@ -360,7 +387,6 @@ void casStreamOS::show ( unsigned level ) const this->casStrmClient::show ( level ); printf ( "casStreamOS at %p\n", static_cast ( this ) ); - printf ( "\tsendBlocked = %d\n", this->sendBlocked ); if ( this->pWtReg ) { this->pWtReg->show ( level ); } @@ -396,7 +422,7 @@ void casStreamReadReg::callBack () // // casStreamOS::recvCB() // -void casStreamOS::recvCB () +void casStreamOS :: recvCB () { assert ( this->pRdReg ); @@ -408,38 +434,48 @@ void casStreamOS::recvCB () inBufClient::fillCondition fillCond = this->inBufFill (); if ( fillCond == casFillDisconnect ) { this->getCAS().destroyClient ( *this ); + // + // must _not_ touch "this" pointer + // after the destroy + // + return; } - else { - casProcCond procCond = this->processInput (); - if ( procCond == casProcDisconnect ) { - this->getCAS().destroyClient ( *this ); - } - else if ( this->inBufFull() ) { - // - // If there isnt any space then temporarily - // stop calling this routine until problem is resolved - // either by: - // (1) sending or - // (2) a blocked IO op unblocks - // - // (casStreamReadReg is _not_ a onceOnly fdReg - - // therefore an explicit delete is required here) - // - this->disarmRecv (); // this deletes the casStreamReadReg object - } + else if ( fillCond == casFillNone ) { + this->disarmRecv (); + } + else { + printStatus ( "recv CB req proc" ); + caStatus status = this->processMsg (); + if ( status == S_cas_success ) { + this->armRecv (); + if ( _sendNeeded () ) { + this->armSend (); + } + } + else if ( status == S_cas_sendBlocked ) { + this->armSend (); + } + else if ( status == S_casApp_postponeAsyncIO ) { + this->armSend (); + } + else { + errMessage ( status, + "- unexpected problem with client's input - forcing disconnect"); + this->getCAS().destroyClient ( *this ); + // + // must _not_ touch "this" pointer + // after the destroy + // + return; + } } - // - // NO CODE HERE - // (see delete above) - // } // -// casStreamOS::sendBlockSignal() +// casStreamOS :: sendBlockSignal() // -void casStreamOS::sendBlockSignal () +void casStreamOS :: sendBlockSignal () { - this->sendBlocked = true; this->armSend (); } @@ -470,6 +506,12 @@ void casStreamWriteReg::callBack() // void casStreamOS::sendCB () { + // we know that the fdManager will destroy the write + // registration after this function returns, and that + // it is robust in situations where the callback + // deletes its fdReg derived object so delete it now, + // because we can now reschedule a send as needed + // this->disarmSend (); printStatus ( "writing" ); @@ -478,12 +520,7 @@ void casStreamOS::sendCB () // attempt to flush the output buffer // outBufClient::flushCondition flushCond = this->flush (); - if ( flushCond == flushProgress ) { - if ( this->sendBlocked ) { - this->sendBlocked = false; - } - } - else if ( flushCond == outBufClient::flushDisconnect ) { + if ( flushCond == outBufClient::flushDisconnect ) { // // ok to delete the client here // because casStreamWriteReg::callBack() @@ -532,72 +569,48 @@ void casStreamOS::sendCB () // to process the input queue in case we were send // blocked. // - casProcCond procCond = this->processInput (); - if ( procCond == casProcDisconnect ) { - this->getCAS().destroyClient ( *this ); - } - else { - // - // if anything is left in the send buffer that - // still needs to be sent and there are not - // requests pending in the input buffer then - // keep sending the output buffer until it is - // empty - // - // do not test for this with flushCond since - // additional bytes may have been added since - // we flushed the out buffer - // - if ( _sendNeeded () ) { - this->armSend(); - } - } - // - // NO CODE HERE - // (see deletes above) - // + + bufSizeT inBufBytesPend = this->inBufBytesPending (); + if ( flushCond == flushProgress && inBufBytesPend ) { + printStatus ( "send CB req proc" ); + caStatus status = this->processMsg (); + if ( status == S_cas_success ) { + this->armRecv (); + } + else if ( status == S_cas_sendBlocked + || status == S_casApp_postponeAsyncIO ) { + bufSizeT inBufBytesPendNew = this->inBufBytesPending (); + if ( inBufBytesPendNew < inBufBytesPend ) { + this->armRecv (); + } + } + else { + errMessage ( status, + "- unexpected problem with client's input - forcing disconnect"); + this->getCAS().destroyClient ( *this ); + // + // must _not_ touch "this" pointer + // after the destroy + // + return; + } + } + + // Once a send starts we will keep going with it until + // it flushes all of the way out. Its important to + // perform this step only after processMsg so that we + // flush out any send blocks detected by processMsg. + this->armSend (); } // -// casStreamOS::processInput() -// -casProcCond casStreamOS :: processInput() -{ - printStatus ( "req proc" ); - - caStatus status = this->processMsg(); - if (status==S_cas_success || - status==S_cas_sendBlocked || - status==S_casApp_postponeAsyncIO) { - - // - // if there is nothing pending in the input - // queue, or the output queue size has grown - // to be greater than one half of the os's - // buffer size then flush the output queue - // - if ( _sendNeeded () ) { - this->armSend (); - } - this->armRecv (); - - return casProcOk; - } - else { - errMessage ( status, - "- unexpected problem with client's input - forcing disconnect"); - return casProcDisconnect; - } -} - -// -// casStreamOS :: _sendNeeded () +// casStreamOS :: sendNeeded () // bool casStreamOS :: _sendNeeded () const { - bool sn = this->outBytesPresent() >= this->_sendBacklogThresh; - sn = sn || ( this->inBufBytesAvailable () == 0u ); + bool sn = this->outBufBytesPending() >= this->_sendBacklogThresh; + sn = sn || ( this->inBufBytesPending () == 0u ); return sn; }