o when processing subscription update events, always activate the send
independent of whether receive bytes are pending or not. This improves event latency, and allows events to flow if we are blocked due to the service postponing an IO operation. This was a bug fix. o when finishing processing input activate a send if more than one half of the TCP buffering would be used. In the past the send was not activated until either the send buffer was full or the receive buffer (including any bytes waiting in the socket) was empty. This could cause latency and performance issues because send receive piplining would not occur if they have very large buffers due to a large EPICS_CA_MAX_ARRAY_SIZE. o always activate a send if we enter a service is postponing IO state. Users would probably prefer to receive past responses and not wait until the postponed IO initiates. o casStrmClient::inBufBytesAvailable renamed to casStrmClient:: inBufBytesPending reflecting the fact that we nlonger use a socket ioctl to check how many bytes are pending in the sockets input queue o casStrmClient::outBufBytesPresent renamed to casStrmClient::outBufBytesPending for consistency with inBufBytesPending o removed the eventFlush function o removed sendBlocked flag o call processMessage directly from the IO completion callback that restarts when in an IO postponement state. This makes the logic easier to understand and maintain, but isnt a functional change o call processMessage directly from the receive callback. This makes the logic easier to understand and maintain, but isnt a functional change o call processMessage directly from the send callback. This makes the logic easier to understand and maintain, but isnt a functional change
This commit is contained in:
@@ -10,8 +10,6 @@
|
||||
// casStreamOS.cc
|
||||
// $Id$
|
||||
//
|
||||
//
|
||||
//
|
||||
// TO DO:
|
||||
// o armRecv() and armSend() should return bad status when
|
||||
// there isnt enough memory
|
||||
@@ -23,6 +21,10 @@
|
||||
#define epicsExportSharedFunc
|
||||
#include "casStreamOS.h"
|
||||
|
||||
#if 0
|
||||
#define DEBUG
|
||||
#endif
|
||||
|
||||
//
|
||||
// printStatus ()
|
||||
//
|
||||
@@ -34,13 +36,13 @@ void casStreamOS :: printStatus ( const char * pCtx ) const
|
||||
printf (
|
||||
"%03.3f, "
|
||||
"Sock %d, %s, "
|
||||
"recv backlog %u, "
|
||||
"send backlog %u\n",
|
||||
"RecvBuf %u, "
|
||||
"SendBuf %u\n",
|
||||
current - beginTime,
|
||||
this->getFD(),
|
||||
pCtx,
|
||||
this->inBufBytesAvailable (),
|
||||
this->outBytesPresent () );
|
||||
this->inBufBytesPending (),
|
||||
this->outBufBytesPending () );
|
||||
fflush ( stdout );
|
||||
}
|
||||
#else
|
||||
@@ -148,7 +150,23 @@ epicsTimerNotify::expireStatus casStreamEvWakeup::
|
||||
this->os.printStatus ( "casStreamEvWakeup tmr expire" );
|
||||
casEventSys::processStatus ps = os.eventSysProcess ();
|
||||
if ( ps.nAccepted > 0u ) {
|
||||
this->os.eventFlush ();
|
||||
// We do not wait for any impartial, or complete,
|
||||
// messages in the input queue to be processed
|
||||
// because.
|
||||
// A) IO postponement might be preventing the
|
||||
// input queue processing from proceeding.
|
||||
// B) We dont want to interrupt subscription
|
||||
// updates while waiting for very large arrays
|
||||
// to be read in a packet at a time.
|
||||
// C) Since both reads and events get processed
|
||||
// before going back to select to find out if we
|
||||
// can do a write then we naturally tend to
|
||||
// combine get responses and subscription responses
|
||||
// into one write.
|
||||
// D) Its probably questionable to hold up event
|
||||
// traffic (introduce latency) because a partial
|
||||
// message is pending in the input queue.
|
||||
this->os.armSend ();
|
||||
}
|
||||
if ( ps.cond != casProcOk ) {
|
||||
//
|
||||
@@ -214,8 +232,10 @@ void casStreamIOWakeup::show ( unsigned level ) const
|
||||
//
|
||||
// casStreamIOWakeup::expire()
|
||||
//
|
||||
// This is called whenever asynchronous IO completes
|
||||
//
|
||||
// Running this indirectly off of the timer queue
|
||||
// guarantees that we will not call processInput()
|
||||
// guarantees that we will not call processMsg()
|
||||
// recursively
|
||||
//
|
||||
epicsTimerNotify::expireStatus casStreamIOWakeup ::
|
||||
@@ -225,7 +245,29 @@ epicsTimerNotify::expireStatus casStreamIOWakeup ::
|
||||
this->pOS->printStatus ( "casStreamIOWakeup tmr expire" );
|
||||
casStreamOS & tmpOS = *this->pOS;
|
||||
this->pOS = 0;
|
||||
tmpOS.processInput();
|
||||
caStatus status = tmpOS.processMsg ();
|
||||
if ( status == S_cas_success ) {
|
||||
tmpOS.armRecv ();
|
||||
if ( tmpOS._sendNeeded () ) {
|
||||
tmpOS.armSend ();
|
||||
}
|
||||
}
|
||||
else if ( status == S_cas_sendBlocked ) {
|
||||
tmpOS.armSend ();
|
||||
}
|
||||
else if ( status == S_casApp_postponeAsyncIO ) {
|
||||
tmpOS.armSend ();
|
||||
}
|
||||
else {
|
||||
errMessage ( status,
|
||||
"- unexpected problem with client's input - forcing disconnect");
|
||||
tmpOS.getCAS().destroyClient ( tmpOS );
|
||||
//
|
||||
// must _not_ touch "tmpOS" ref
|
||||
// after the destroy
|
||||
//
|
||||
return noRestart;
|
||||
}
|
||||
return noRestart;
|
||||
}
|
||||
|
||||
@@ -270,7 +312,7 @@ inline void casStreamOS::disarmRecv ()
|
||||
//
|
||||
inline void casStreamOS::armSend()
|
||||
{
|
||||
if ( this->outBytesPresent() == 0u ) {
|
||||
if ( this->outBufBytesPending() == 0u ) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -304,20 +346,6 @@ void casStreamOS::eventSignal()
|
||||
this->evWk.start ( *this );
|
||||
}
|
||||
|
||||
//
|
||||
// casStreamOS :: eventFlush()
|
||||
//
|
||||
void casStreamOS :: eventFlush()
|
||||
{
|
||||
//
|
||||
// if there is nothing pending in the input
|
||||
// queue, then flush the output queue
|
||||
//
|
||||
if ( _sendNeeded () ) {
|
||||
this->armSend ();
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// casStreamOS::casStreamOS()
|
||||
//
|
||||
@@ -328,8 +356,7 @@ casStreamOS::casStreamOS (
|
||||
evWk ( *this ),
|
||||
pWtReg ( 0 ),
|
||||
pRdReg ( 0 ),
|
||||
_sendBacklogThresh ( osSendBufferSize () / 2u ),
|
||||
sendBlocked ( false )
|
||||
_sendBacklogThresh ( osSendBufferSize () / 2u )
|
||||
{
|
||||
if ( _sendBacklogThresh < MAX_TCP / 2 ) {
|
||||
_sendBacklogThresh = MAX_TCP / 2;
|
||||
@@ -360,7 +387,6 @@ void casStreamOS::show ( unsigned level ) const
|
||||
this->casStrmClient::show ( level );
|
||||
printf ( "casStreamOS at %p\n",
|
||||
static_cast <const void *> ( this ) );
|
||||
printf ( "\tsendBlocked = %d\n", this->sendBlocked );
|
||||
if ( this->pWtReg ) {
|
||||
this->pWtReg->show ( level );
|
||||
}
|
||||
@@ -396,7 +422,7 @@ void casStreamReadReg::callBack ()
|
||||
//
|
||||
// casStreamOS::recvCB()
|
||||
//
|
||||
void casStreamOS::recvCB ()
|
||||
void casStreamOS :: recvCB ()
|
||||
{
|
||||
assert ( this->pRdReg );
|
||||
|
||||
@@ -408,38 +434,48 @@ void casStreamOS::recvCB ()
|
||||
inBufClient::fillCondition fillCond = this->inBufFill ();
|
||||
if ( fillCond == casFillDisconnect ) {
|
||||
this->getCAS().destroyClient ( *this );
|
||||
//
|
||||
// must _not_ touch "this" pointer
|
||||
// after the destroy
|
||||
//
|
||||
return;
|
||||
}
|
||||
else {
|
||||
casProcCond procCond = this->processInput ();
|
||||
if ( procCond == casProcDisconnect ) {
|
||||
this->getCAS().destroyClient ( *this );
|
||||
}
|
||||
else if ( this->inBufFull() ) {
|
||||
//
|
||||
// If there isnt any space then temporarily
|
||||
// stop calling this routine until problem is resolved
|
||||
// either by:
|
||||
// (1) sending or
|
||||
// (2) a blocked IO op unblocks
|
||||
//
|
||||
// (casStreamReadReg is _not_ a onceOnly fdReg -
|
||||
// therefore an explicit delete is required here)
|
||||
//
|
||||
this->disarmRecv (); // this deletes the casStreamReadReg object
|
||||
}
|
||||
else if ( fillCond == casFillNone ) {
|
||||
this->disarmRecv ();
|
||||
}
|
||||
else {
|
||||
printStatus ( "recv CB req proc" );
|
||||
caStatus status = this->processMsg ();
|
||||
if ( status == S_cas_success ) {
|
||||
this->armRecv ();
|
||||
if ( _sendNeeded () ) {
|
||||
this->armSend ();
|
||||
}
|
||||
}
|
||||
else if ( status == S_cas_sendBlocked ) {
|
||||
this->armSend ();
|
||||
}
|
||||
else if ( status == S_casApp_postponeAsyncIO ) {
|
||||
this->armSend ();
|
||||
}
|
||||
else {
|
||||
errMessage ( status,
|
||||
"- unexpected problem with client's input - forcing disconnect");
|
||||
this->getCAS().destroyClient ( *this );
|
||||
//
|
||||
// must _not_ touch "this" pointer
|
||||
// after the destroy
|
||||
//
|
||||
return;
|
||||
}
|
||||
}
|
||||
//
|
||||
// NO CODE HERE
|
||||
// (see delete above)
|
||||
//
|
||||
}
|
||||
|
||||
//
|
||||
// casStreamOS::sendBlockSignal()
|
||||
// casStreamOS :: sendBlockSignal()
|
||||
//
|
||||
void casStreamOS::sendBlockSignal ()
|
||||
void casStreamOS :: sendBlockSignal ()
|
||||
{
|
||||
this->sendBlocked = true;
|
||||
this->armSend ();
|
||||
}
|
||||
|
||||
@@ -470,6 +506,12 @@ void casStreamWriteReg::callBack()
|
||||
//
|
||||
void casStreamOS::sendCB ()
|
||||
{
|
||||
// we know that the fdManager will destroy the write
|
||||
// registration after this function returns, and that
|
||||
// it is robust in situations where the callback
|
||||
// deletes its fdReg derived object so delete it now,
|
||||
// because we can now reschedule a send as needed
|
||||
//
|
||||
this->disarmSend ();
|
||||
|
||||
printStatus ( "writing" );
|
||||
@@ -478,12 +520,7 @@ void casStreamOS::sendCB ()
|
||||
// attempt to flush the output buffer
|
||||
//
|
||||
outBufClient::flushCondition flushCond = this->flush ();
|
||||
if ( flushCond == flushProgress ) {
|
||||
if ( this->sendBlocked ) {
|
||||
this->sendBlocked = false;
|
||||
}
|
||||
}
|
||||
else if ( flushCond == outBufClient::flushDisconnect ) {
|
||||
if ( flushCond == outBufClient::flushDisconnect ) {
|
||||
//
|
||||
// ok to delete the client here
|
||||
// because casStreamWriteReg::callBack()
|
||||
@@ -532,72 +569,48 @@ void casStreamOS::sendCB ()
|
||||
// to process the input queue in case we were send
|
||||
// blocked.
|
||||
//
|
||||
casProcCond procCond = this->processInput ();
|
||||
if ( procCond == casProcDisconnect ) {
|
||||
this->getCAS().destroyClient ( *this );
|
||||
}
|
||||
else {
|
||||
//
|
||||
// if anything is left in the send buffer that
|
||||
// still needs to be sent and there are not
|
||||
// requests pending in the input buffer then
|
||||
// keep sending the output buffer until it is
|
||||
// empty
|
||||
//
|
||||
// do not test for this with flushCond since
|
||||
// additional bytes may have been added since
|
||||
// we flushed the out buffer
|
||||
//
|
||||
if ( _sendNeeded () ) {
|
||||
this->armSend();
|
||||
}
|
||||
}
|
||||
//
|
||||
// NO CODE HERE
|
||||
// (see deletes above)
|
||||
//
|
||||
|
||||
bufSizeT inBufBytesPend = this->inBufBytesPending ();
|
||||
if ( flushCond == flushProgress && inBufBytesPend ) {
|
||||
printStatus ( "send CB req proc" );
|
||||
caStatus status = this->processMsg ();
|
||||
if ( status == S_cas_success ) {
|
||||
this->armRecv ();
|
||||
}
|
||||
else if ( status == S_cas_sendBlocked
|
||||
|| status == S_casApp_postponeAsyncIO ) {
|
||||
bufSizeT inBufBytesPendNew = this->inBufBytesPending ();
|
||||
if ( inBufBytesPendNew < inBufBytesPend ) {
|
||||
this->armRecv ();
|
||||
}
|
||||
}
|
||||
else {
|
||||
errMessage ( status,
|
||||
"- unexpected problem with client's input - forcing disconnect");
|
||||
this->getCAS().destroyClient ( *this );
|
||||
//
|
||||
// must _not_ touch "this" pointer
|
||||
// after the destroy
|
||||
//
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Once a send starts we will keep going with it until
|
||||
// it flushes all of the way out. Its important to
|
||||
// perform this step only after processMsg so that we
|
||||
// flush out any send blocks detected by processMsg.
|
||||
this->armSend ();
|
||||
}
|
||||
|
||||
//
|
||||
// casStreamOS::processInput()
|
||||
//
|
||||
casProcCond casStreamOS :: processInput()
|
||||
{
|
||||
printStatus ( "req proc" );
|
||||
|
||||
caStatus status = this->processMsg();
|
||||
if (status==S_cas_success ||
|
||||
status==S_cas_sendBlocked ||
|
||||
status==S_casApp_postponeAsyncIO) {
|
||||
|
||||
//
|
||||
// if there is nothing pending in the input
|
||||
// queue, or the output queue size has grown
|
||||
// to be greater than one half of the os's
|
||||
// buffer size then flush the output queue
|
||||
//
|
||||
if ( _sendNeeded () ) {
|
||||
this->armSend ();
|
||||
}
|
||||
this->armRecv ();
|
||||
|
||||
return casProcOk;
|
||||
}
|
||||
else {
|
||||
errMessage ( status,
|
||||
"- unexpected problem with client's input - forcing disconnect");
|
||||
return casProcDisconnect;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// casStreamOS :: _sendNeeded ()
|
||||
// casStreamOS :: sendNeeded ()
|
||||
//
|
||||
bool casStreamOS ::
|
||||
_sendNeeded () const
|
||||
{
|
||||
bool sn = this->outBytesPresent() >= this->_sendBacklogThresh;
|
||||
sn = sn || ( this->inBufBytesAvailable () == 0u );
|
||||
bool sn = this->outBufBytesPending() >= this->_sendBacklogThresh;
|
||||
sn = sn || ( this->inBufBytesPending () == 0u );
|
||||
return sn;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user