fixed problem where send was not always rearmed if this

was indirectly necessary in the send callback because
in this callback the code considered sends to be still armed
until the send callback completed
This commit is contained in:
Jeff Hill
1998-10-23 00:27:15 +00:00
parent 624f3e2284
commit 36b70e243f
5 changed files with 96 additions and 56 deletions

View File

@@ -29,7 +29,6 @@ private:
void callBack ();
};
//
// casDGIntfOS::casDGIntfOS()
//
@@ -39,7 +38,6 @@ casDGIntfOS::casDGIntfOS(casDGClient &clientIn) :
{
}
//
// casDGIntfOS::~casDGIntfOS()
//
@@ -61,15 +59,16 @@ void casDGIntfOS::show(unsigned level) const
}
}
/*
* casDGIntfOS::start()
*/
caStatus casDGIntfOS::start()
{
this->pRdReg = new casDGReadReg (*this);
if (!this->pRdReg) {
return S_cas_noMemory;
if (this->pRdReg==NULL) {
this->pRdReg = new casDGReadReg (*this);
if (this->pRdReg==NULL) {
return S_cas_noMemory;
}
}
return S_cas_success;
}
@@ -79,8 +78,16 @@ caStatus casDGIntfOS::start()
//
void casDGReadReg::callBack()
{
assert (os.pRdReg);
os.processDG();
this->os.recvCB();
}
//
// casDGIntfOS::recvCB()
//
void casDGIntfOS::recvCB()
{
assert (this->pRdReg);
this->processDG();
}
//
@@ -88,7 +95,6 @@ void casDGReadReg::callBack()
//
casDGReadReg::~casDGReadReg()
{
this->os.pRdReg = NULL;
}
//

View File

@@ -6,6 +6,9 @@
*
*
* $Log$
* Revision 1.5 1998/07/08 15:38:10 jhill
* fixed lost monitors during flow control problem
*
* Revision 1.4 1997/08/05 00:47:19 jhill
* fixed warnings
*
@@ -99,6 +102,14 @@ void casDGEvWakeup::expire()
}
}
//
// casDGOS::sendBlockSignal()
// (not inline because its virtual)
//
void casDGOS::sendBlockSignal()
{
}
//
// casDGOS::eventSignal()
//

View File

@@ -44,9 +44,11 @@ caStatus casIntfOS::init(const caNetAddr &addrIn, casDGClient &dgClientIn,
this->setNonBlocking();
this->pRdReg = new casServerReg(*this);
if (!this->pRdReg) {
return S_cas_noMemory;
if (this->pRdReg==NULL) {
this->pRdReg = new casServerReg(*this);
if (this->pRdReg==NULL) {
return S_cas_noMemory;
}
}
return S_cas_success;
@@ -86,6 +88,5 @@ void casServerReg::callBack()
//
casServerReg::~casServerReg()
{
this->os.pRdReg = NULL;
}

View File

@@ -48,7 +48,6 @@ class casDGReadReg;
// casDGIntfOS
//
class casDGIntfOS : public casDGIntfIO {
friend class casDGReadReg;
public:
casDGIntfOS(casDGClient &client);
virtual ~casDGIntfOS();
@@ -94,8 +93,6 @@ class casStreamIOWakeup;
// casStreamOS
//
class casStreamOS : public casStreamIO {
friend class casStreamReadReg;
friend class casStreamWriteReg;
friend class casStreamEvWakeup;
friend class casStreamIOWakeup;
public:
@@ -151,7 +148,7 @@ public:
//
casProcCond processInput();
void sendBlockSignal() {}
void sendBlockSignal();
void eventSignal();
void eventFlush();

View File

@@ -4,6 +4,10 @@
//
//
// $Log$
// Revision 1.9 1998/09/24 20:50:50 jhill
// subtle changes which relate to not flushing the output buffer if there are bytes
// pending in the input queue (improves performance)
//
// Revision 1.8 1998/05/05 16:29:58 jhill
// fixed warnings
//
@@ -80,7 +84,6 @@ inline casStreamReadReg::casStreamReadReg (casStreamOS &osIn) :
//
inline casStreamReadReg::~casStreamReadReg ()
{
this->os.pRdReg = NULL;
# if defined(DEBUG)
printf ("Read off %d\n", this->os.getFD());
printf ("Recv backlog %u\n",
@@ -125,7 +128,6 @@ inline casStreamWriteReg::casStreamWriteReg (casStreamOS &osIn) :
//
inline casStreamWriteReg::~casStreamWriteReg ()
{
this->os.pWtReg = NULL;
# if defined(DEBUG)
printf ("Write off %d\n", this->os.getFD());
printf ("Recv backlog %u\n",
@@ -254,14 +256,14 @@ void casStreamIOWakeup::show(unsigned level) const
//
inline void casStreamOS::armRecv()
{
if (!this->pRdReg) {
if (!this->pRdReg) {
if (this->inBuf::full()!=aitTrue) {
this->pRdReg = new casStreamReadReg(*this);
if (!this->pRdReg) {
errMessage(S_cas_noMemory, "armRecv()");
}
}
}
}
}
//
@@ -290,8 +292,9 @@ void casStreamIOWakeup::expire()
//
inline void casStreamOS::disarmRecv()
{
if (this->pRdReg) {
if (this->pRdReg) {
delete this->pRdReg;
this->pRdReg = NULL;
}
}
@@ -319,6 +322,7 @@ inline void casStreamOS::disarmSend ()
{
if (this->pWtReg) {
delete this->pWtReg;
this->pWtReg = NULL;
}
}
@@ -438,6 +442,9 @@ void casStreamOS::show(unsigned level) const
if (this->pEvWk) {
this->pEvWk->show(level);
}
if (this->pIOWk) {
this->pIOWk->show(level);
}
}
@@ -464,22 +471,34 @@ void casStreamReadReg::show(unsigned level) const
// casStreamReadReg::callBack ()
//
void casStreamReadReg::callBack ()
{
this->os.recvCB();
//
// NO CODE HERE
// (casStreamOS::recvCB() may up indirectly deleting this object)
//
}
//
// casStreamOS::recvCB()
//
void casStreamOS::recvCB()
{
casFillCondition fillCond;
casProcCond procCond;
assert (this->os.pRdReg);
assert (this->pRdReg);
//
// copy in new messages
//
fillCond = os.fill();
procCond = os.processInput();
//
// copy in new messages
//
fillCond = this->fill();
procCond = this->processInput();
if (fillCond == casFillDisconnect ||
procCond == casProcDisconnect) {
delete &this->os;
delete this;
}
else if (os.inBuf::full()==aitTrue) {
else if (this->inBuf::full()==aitTrue) {
//
// If there isnt any space then temporarily
// stop calling this routine until problem is resolved
@@ -490,11 +509,11 @@ void casStreamReadReg::callBack ()
// (casStreamReadReg is _not_ a onceOnly fdReg -
// therefore an explicit delete is required here)
//
delete this;
this->disarmRecv(); // this deletes the casStreamReadReg object
}
//
// NO CODE HERE
// (see deletes above)
// (see delete above)
//
}
@@ -508,7 +527,6 @@ void casStreamOS::sendBlockSignal()
this->armSend();
}
//
// casStreamWriteReg::show()
@@ -524,20 +542,33 @@ void casStreamWriteReg::show(unsigned level) const
// casStreamWriteReg::callBack()
//
void casStreamWriteReg::callBack()
{
casStreamOS *pSOS = &this->os;
delete this; // allows rearm to occur if required
pSOS->sendCB();
//
// NO CODE HERE - see delete above
//
}
//
// casStreamOS::sendCB()
//
void casStreamOS::sendCB()
{
casFlushCondition flushCond;
casProcCond procCond;
assert (os.pWtReg);
this->pWtReg = NULL; // allow rearm (send callbacks are one shots)
//
// attempt to flush the output buffer
//
flushCond = os.flush();
if ( flushCond==casFlushCompleted ||
flushCond = this->flush();
if (flushCond==casFlushCompleted ||
flushCond==casFlushPartial) {
if (os.sendBlocked) {
os.sendBlocked = FALSE;
if (this->sendBlocked) {
this->sendBlocked = FALSE;
}
}
else if (flushCond==casFlushDisconnect) {
@@ -558,7 +589,7 @@ void casStreamWriteReg::callBack()
// we _are_ able to write to see if additional events
// can be sent to the slow client.
//
procCond = this->os.casEventSys::process();
procCond = this->casEventSys::process();
if (procCond != casProcOk) {
//
// ok to delete the client here
@@ -568,19 +599,19 @@ void casStreamWriteReg::callBack()
// called from a client member function
// higher up on the stack
//
this->os.destroy();
this->destroy();
//
// must not touch "this" pointer
// after the destroy however
// must _not_ touch "this" pointer
// after the destroy
//
return;
}
# if defined(DEBUG)
printf ("write attempted on %d result was %d\n",
os.getFD(), flushCond);
printf ("Recv backlog %u\n", os.inBuf::bytesPresent());
printf ("Send backlog %u\n", os.outBuf::bytesPresent());
this->getFD(), flushCond);
printf ("Recv backlog %u\n", this->inBuf::bytesPresent());
printf ("Send backlog %u\n", this->outBuf::bytesPresent());
# endif
//
@@ -588,12 +619,11 @@ void casStreamWriteReg::callBack()
// to process the input queue in case we were send
// blocked.
//
procCond = this->os.processInput();
procCond = this->processInput();
if (procCond == casProcDisconnect) {
delete &this->os;
delete this;
}
else {
casStreamOS *pStrmOS = &this->os;
//
// if anything is left in the send buffer that
// still needs to be sent and there are not
@@ -605,14 +635,9 @@ void casStreamWriteReg::callBack()
// additional bytes may have been added since
// we flushed the out buffer
//
if (pStrmOS->outBuf::bytesPresent()>0u &&
pStrmOS->inBuf::bytesAvailable()==0u) {
//
// delete this object now so that the
// arm will work
//
delete this;
pStrmOS->armSend();
if (this->outBuf::bytesPresent()>0u &&
this->inBuf::bytesAvailable()==0u) {
this->armSend();
}
}
//