From 0c1583ff7b61087de2b089e36bfc683851bcc196 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Fri, 20 Oct 2000 23:34:39 +0000 Subject: [PATCH] fixed deadlock discovered during connect / disconnect test --- src/ca/cacServiceList.cpp | 2 - src/ca/comBuf_IL.h | 4 +- src/ca/comQueSend.cpp | 96 +++++++++++++++++++------------------- src/ca/comQueSend_IL.h | 3 -- src/ca/iocinf.h | 24 ++++++---- src/ca/nciu.cpp | 6 ++- src/ca/netSubscription.cpp | 9 ++-- src/ca/netiiu.cpp | 2 +- src/ca/tcpiiu.cpp | 25 +++++++--- 9 files changed, 94 insertions(+), 77 deletions(-) diff --git a/src/ca/cacServiceList.cpp b/src/ca/cacServiceList.cpp index 83a386639..1ddab1b70 100644 --- a/src/ca/cacServiceList.cpp +++ b/src/ca/cacServiceList.cpp @@ -50,8 +50,6 @@ cacLocalChannelIO * cacServiceList::createChannelIO (const char *pName, cac &cac void cacServiceList::show ( unsigned level ) const { - cacLocalChannelIO *pChanIO = 0; - this->lock (); tsDLIterConstBD < cacServiceIO > iter ( this->services.first () ); while ( iter.valid () ) { diff --git a/src/ca/comBuf_IL.h b/src/ca/comBuf_IL.h index 8d00f36b5..e7097a120 100644 --- a/src/ca/comBuf_IL.h +++ b/src/ca/comBuf_IL.h @@ -119,11 +119,11 @@ inline unsigned comBuf::maxBytes () return comBufSize; } -inline bool comBuf::flushToWire ( class comQueSend &que ) +inline bool comBuf::flushToWire ( class comQueSend &que, bool enablePreemptionDuringFlush ) { unsigned occupied = this->occupiedBytes (); while ( occupied ) { - unsigned nBytes = que.sendBytes ( &this->buf[this->nextReadIndex], occupied ); + unsigned nBytes = que.sendBytes ( &this->buf[this->nextReadIndex], occupied, enablePreemptionDuringFlush ); if ( nBytes == 0u ) { this->nextReadIndex = this->nextWriteIndex; return false; diff --git a/src/ca/comQueSend.cpp b/src/ca/comQueSend.cpp index 3b87b320b..12ca17836 100644 --- a/src/ca/comQueSend.cpp +++ b/src/ca/comQueSend.cpp @@ -99,42 +99,38 @@ inline unsigned bufferReservoir::nBytes () return ( this->reservedBufs.count () * comBuf::maxBytes () ); } -// o lock the comQueSend -// o reserve sufficent space for entire message +// reserve sufficent space for entire message // (this allows the recv thread to add a message // to the que while some other thread is flushing // and therefore prevents deadlocks, and it also // allows proper status to be returned) -// o unlock comQueSend if status is not ECA_NORMAL -inline int comQueSend::lockAndReserveSpace ( unsigned msgSize, bufferReservoir &reservoir ) +inline int comQueSend::lockAndReserveSpace ( unsigned msgSize, + bufferReservoir &reservoir, bool enablePreemptionDuringFlush ) { unsigned bytesReserved = reservoir.nBytes (); - unsigned unoccupied; this->mutex.lock (); - comBuf *pComBuf = this->bufs.last (); - if ( pComBuf ) { - unoccupied = pComBuf->unoccupiedBytes (); - } - else { - unoccupied = 0u; - } + while ( true ) { + comBuf *pComBuf = this->bufs.last (); + if ( pComBuf ) { + bytesReserved += pComBuf->unoccupiedBytes (); + } + + if ( bytesReserved >= msgSize || this->bufs.count () < 4 ) { + break; + } + + if ( ! this->flushToWirePermit () ) { + if ( this->bufs.count () >= 32u ) { + this->mutex.unlock (); + return ECA_TOLARGE; + } + break; + } - // flush if conditions indicate. second part of this guarantees - // that we will not flush out a buffer with almost nothing - // in it (this has a large impact on performance) - if ( this->bufs.count () <= 1u || unoccupied >= msgSize ) { - bytesReserved = unoccupied; - } - else { this->mutex.unlock (); - if ( ! this->flushToWire () ) { - return ECA_DISCONNCHID; - } - if ( this->bufs.count () >= 32u ) { - return ECA_TOLARGE; - } + this->flushToWire ( enablePreemptionDuringFlush ); this->mutex.lock (); } @@ -320,7 +316,7 @@ unsigned comQueSend::occupiedBytes () const return nBytes; } -bool comQueSend::flushToWire () +bool comQueSend::flushToWire ( bool enablePreemptionDuringFlush ) { bool success = false; @@ -345,7 +341,7 @@ bool comQueSend::flushToWire () success = true; break; } - success = pBuf->flushToWire ( *this ); + success = pBuf->flushToWire ( *this, enablePreemptionDuringFlush ); pBuf->destroy (); if ( ! success ) { this->mutex.lock (); @@ -396,7 +392,7 @@ int comQueSend::writeRequest ( unsigned serverId, unsigned type, unsigned nElem, assert ( serverId <= 0xffffffff ); - int status = this->lockAndReserveSpace ( postcnt + 16u, reservoir ); + int status = this->lockAndReserveSpace ( postcnt + 16u, reservoir, true ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_WRITE ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( postcnt ) ); // postsize @@ -446,7 +442,7 @@ int comQueSend::writeNotifyRequest ( unsigned ioId, unsigned serverId, unsigned assert ( serverId <= 0xffffffff ); - int status = this->lockAndReserveSpace ( postcnt + 16u, reservoir ); + int status = this->lockAndReserveSpace ( postcnt + 16u, reservoir, true ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_WRITE_NOTIFY ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( postcnt ) ); // postsize @@ -475,7 +471,7 @@ int comQueSend::readCopyRequest ( unsigned ioId, unsigned serverId, unsigned typ assert ( serverId <= 0xffffffff ); - int status = this->lockAndReserveSpace ( 16u, reservoir ); + int status = this->lockAndReserveSpace ( 16u, reservoir, true ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_READ ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( 0u ) ); // postsize @@ -502,7 +498,7 @@ int comQueSend::readNotifyRequest ( unsigned ioId, unsigned serverId, unsigned t assert ( serverId <= 0xffffffff ); - int status = this->lockAndReserveSpace ( 16u, reservoir ); + int status = this->lockAndReserveSpace ( 16u, reservoir, true ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_READ_NOTIFY ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( 0u ) ); // postsize @@ -524,7 +520,7 @@ int comQueSend::createChannelRequest ( unsigned id, const char *pName, unsigned assert ( id <= 0xffffffff ); assert ( postCnt <= 0xffff ); - int status = this->lockAndReserveSpace ( postCnt + 16u, reservoir ); + int status = this->lockAndReserveSpace ( postCnt + 16u, reservoir, false ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_CLAIM_CIU ) ); // cmd @@ -544,6 +540,7 @@ int comQueSend::createChannelRequest ( unsigned id, const char *pName, unsigned if ( postCnt > nameLength ) { comQueSend_copyIn ( this->bufs, reservoir, nillBytes, postCnt - nameLength ); } + this->mutex.unlock (); } @@ -557,7 +554,7 @@ int comQueSend::clearChannelRequest ( unsigned clientId, unsigned serverId ) assert ( serverId <= 0xffffffff ); assert ( clientId <= 0xffffffff ); - int status = this->lockAndReserveSpace ( 16u, reservoir ); + int status = this->lockAndReserveSpace ( 16u, reservoir, true ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_CLEAR_CHANNEL ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( 0u ) ); // postsize @@ -571,7 +568,8 @@ int comQueSend::clearChannelRequest ( unsigned clientId, unsigned serverId ) return status; } -int comQueSend::subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, unsigned mask ) +int comQueSend::subscriptionRequest ( unsigned ioId, unsigned serverId, + unsigned type, unsigned nElem, unsigned mask, bool enablePreemptionDuringFlush ) { bufferReservoir reservoir; @@ -584,7 +582,7 @@ int comQueSend::subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned assert ( serverId <= 0xffffffff ); assert ( ioId <= 0xffffffff ); - int status = this->lockAndReserveSpace ( 32u, reservoir ); + int status = this->lockAndReserveSpace ( 32u, reservoir, enablePreemptionDuringFlush ); if ( status == ECA_NORMAL ) { // header @@ -617,14 +615,14 @@ int comQueSend::subscriptionCancelRequest ( unsigned ioId, unsigned serverId, un assert ( serverId <= 0xffffffff ); assert ( ioId <= 0xffffffff ); - int status = this->lockAndReserveSpace ( 16u, reservoir ); + int status = this->lockAndReserveSpace ( 16u, reservoir, true ); if ( status == ECA_NORMAL ) { - comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_EVENT_CANCEL ) ); // cmd - comQueSend_copyIn ( this->bufs, reservoir, static_cast ( 0u ) ); // postsize - comQueSend_copyIn ( this->bufs, reservoir, static_cast ( type ) ); // dataType - comQueSend_copyIn ( this->bufs, reservoir, static_cast ( nElem ) ); // count - comQueSend_copyIn ( this->bufs, reservoir, static_cast ( serverId ) ); // cid - comQueSend_copyIn ( this->bufs, reservoir, static_cast ( ioId ) ); // available + comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( CA_PROTO_EVENT_CANCEL ) ); // cmd + comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( 0u ) ); // postsize + comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( type ) ); // dataType + comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( nElem ) ); // count + comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint32_t > ( serverId ) ); // cid + comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint32_t > ( ioId ) ); // available this->mutex.unlock (); } @@ -635,7 +633,7 @@ int comQueSend::disableFlowControlRequest () { bufferReservoir reservoir; - int status = this->lockAndReserveSpace ( 16u, reservoir ); + int status = this->lockAndReserveSpace ( 16u, reservoir, false ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_EVENTS_ON ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( 0u ) ); // postsize @@ -652,7 +650,7 @@ int comQueSend::enableFlowControlRequest () { bufferReservoir reservoir; - int status = this->lockAndReserveSpace ( 16u, reservoir ); + int status = this->lockAndReserveSpace ( 16u, reservoir, false ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_EVENTS_OFF ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( 0u ) ); // postsize @@ -669,7 +667,7 @@ int comQueSend::noopRequest () { bufferReservoir reservoir; - int status = this->lockAndReserveSpace ( 16u, reservoir ); + int status = this->lockAndReserveSpace ( 16u, reservoir, false ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_NOOP ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( 0u ) ); // postsize @@ -686,7 +684,7 @@ int comQueSend::echoRequest () { bufferReservoir reservoir; - int status = this->lockAndReserveSpace ( 16u, reservoir ); + int status = this->lockAndReserveSpace ( 16u, reservoir, false ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_ECHO ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( 0u ) ); // postsize @@ -706,7 +704,7 @@ int comQueSend::hostNameSetRequest ( const char *pName ) unsigned postSize = CA_MESSAGE_ALIGN ( size ); assert ( postSize < 0xffff ); - int status = this->lockAndReserveSpace ( postSize + 16u, reservoir ); + int status = this->lockAndReserveSpace ( postSize + 16u, reservoir, false ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_HOST_NAME ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( postSize ) ); // postsize @@ -717,6 +715,7 @@ int comQueSend::hostNameSetRequest ( const char *pName ) comQueSend_copyIn ( this->bufs, reservoir, pName, size ); comQueSend_copyIn ( this->bufs, reservoir, nillBytes, postSize - size ); + this->mutex.unlock (); } return status; @@ -729,7 +728,7 @@ int comQueSend::userNameSetRequest ( const char *pName ) unsigned postSize = CA_MESSAGE_ALIGN ( size ); assert ( postSize < 0xffff ); - int status = this->lockAndReserveSpace ( postSize + 16u, reservoir ); + int status = this->lockAndReserveSpace ( postSize + 16u, reservoir, false ); if ( status == ECA_NORMAL ) { comQueSend_copyIn ( this->bufs, reservoir, static_cast ( CA_PROTO_CLIENT_NAME ) ); // cmd comQueSend_copyIn ( this->bufs, reservoir, static_cast ( postSize ) ); // postsize @@ -740,6 +739,7 @@ int comQueSend::userNameSetRequest ( const char *pName ) comQueSend_copyIn ( this->bufs, reservoir, pName, size ); comQueSend_copyIn ( this->bufs, reservoir, nillBytes, postSize - size ); + this->mutex.unlock (); } return status; diff --git a/src/ca/comQueSend_IL.h b/src/ca/comQueSend_IL.h index e1f0cad79..715ca58df 100644 --- a/src/ca/comQueSend_IL.h +++ b/src/ca/comQueSend_IL.h @@ -18,8 +18,5 @@ #ifndef comQueSend_ILh #define comQueSend_ILh - - - #endif // comQueSend_ILh diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index d230c40fd..3c95f6b7a 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -132,7 +132,7 @@ public: unsigned removeBytes ( unsigned nBytes ); void * operator new ( size_t size ); void operator delete ( void *pCadaver, size_t size ); - bool flushToWire ( class comQueSend & ); + bool flushToWire ( class comQueSend &, bool enablePreemptionDuringFlush ); unsigned fillFromWire ( class comQueRecv & ); private: static tsFreeList < class comBuf, 0x20 > freeList; @@ -170,14 +170,15 @@ class comQueSend { public: virtual ~comQueSend (); unsigned occupiedBytes () const; - bool flushToWire (); + bool flushToWire ( bool enablePreemptionDuringFlush ); int writeRequest ( unsigned serverId, unsigned type, unsigned nElem, const void *pValue ); int writeNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, const void *pValue ); int readCopyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem ); int readNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem ); int createChannelRequest ( unsigned clientId, const char *pName, unsigned nameLength ); int clearChannelRequest ( unsigned clientId, unsigned serverId ); - int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, unsigned mask ); + int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, + unsigned nElem, unsigned mask, bool enablePreemptionDuringFlush ); int subscriptionCancelRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem ); int disableFlowControlRequest (); int enableFlowControlRequest (); @@ -186,10 +187,11 @@ public: int hostNameSetRequest ( const char *pName ); int userNameSetRequest ( const char *pName ); - virtual unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ) = 0; + virtual unsigned sendBytes ( const void *pBuf, + unsigned nBytesInBuf, bool enablePreemptionDuringFlush ) = 0; private: - int lockAndReserveSpace ( unsigned msgSize, bufferReservoir & ); + int lockAndReserveSpace ( unsigned msgSize, bufferReservoir &, bool enablePreemptionDuringFlush ); virtual bool flushToWirePermit () = 0; void copy_dbr_string ( bufferReservoir &, const void *pValue, unsigned nElem ); @@ -295,7 +297,8 @@ public: void operator delete ( void *pCadaver, size_t size ); int subscriptionMsg ( unsigned subscriptionId, - unsigned typeIn, unsigned long countIn, unsigned short maskIn ); + unsigned typeIn, unsigned long countIn, unsigned short maskIn, + bool enablePreemptionDuringFlush ); void resetRetryCount (); unsigned getRetrySeqNo () const; void accessRightsStateChange ( const caar &arIn ); @@ -513,7 +516,8 @@ public: virtual int writeNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, const void *pValue ); virtual int readCopyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem ); virtual int readNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem ); - virtual int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, unsigned mask ); + virtual int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, + unsigned nElem, unsigned mask, bool enablePreemptionDuringFlush ); virtual int subscriptionCancelRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem ); virtual int createChannelRequest ( unsigned clientId, const char *pName, unsigned nameLength ); virtual int clearChannelRequest ( unsigned clientId, unsigned serverId ); @@ -745,7 +749,8 @@ public: int readNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem ); int createChannelRequest ( unsigned clientId, const char *pName, unsigned nameLength ); int clearChannelRequest ( unsigned clientId, unsigned serverId ); - int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, unsigned mask ); + int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, + unsigned mask, bool enablePreemptionDuringFlush ); int subscriptionCancelRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem ); void hostName (char *pBuf, unsigned bufLength) const; @@ -778,7 +783,8 @@ private: bool ca_v42_ok () const; void postMsg (); - unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ); + unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf, + bool enablePreemptionDuringFlush ); unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf ); bool flushToWirePermit (); diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index 33a8b1a12..df7fe349a 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -456,7 +456,8 @@ bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisCh } int nciu::subscriptionMsg ( unsigned subscriptionId, unsigned typeIn, - unsigned long countIn, unsigned short maskIn) + unsigned long countIn, unsigned short maskIn, + bool enablePreemptionDuringFlush ) { int status; @@ -471,7 +472,8 @@ int nciu::subscriptionMsg ( unsigned subscriptionId, unsigned typeIn, if ( this->f_connected ) { this->lockPIIU (); if ( this->piiu ) { - status = this->piiu->subscriptionRequest ( subscriptionId, this->sid, typeIn, countIn, maskIn ); + status = this->piiu->subscriptionRequest ( subscriptionId, this->sid, + typeIn, countIn, maskIn, enablePreemptionDuringFlush ); } else { status = ECA_NORMAL; diff --git a/src/ca/netSubscription.cpp b/src/ca/netSubscription.cpp index 975bc4ad1..038e99caa 100644 --- a/src/ca/netSubscription.cpp +++ b/src/ca/netSubscription.cpp @@ -18,10 +18,11 @@ tsFreeList < class netSubscription, 1024 > netSubscription::freeList; netSubscription::netSubscription ( nciu &chan, chtype typeIn, unsigned long countIn, unsigned short maskIn, cacNotify ¬ifyIn ) : - cacNotifyIO (notifyIn), baseNMIU (chan), - type (typeIn), count (countIn), mask (maskIn) + cacNotifyIO ( notifyIn ), baseNMIU ( chan ), + type ( typeIn ), count ( countIn ), mask ( maskIn ) { - this->subscriptionMsg (); + this->chan.subscriptionMsg ( this->getId (), this->type, + this->count, this->mask, true ); } netSubscription::~netSubscription () @@ -37,7 +38,7 @@ void netSubscription::destroy() int netSubscription::subscriptionMsg () { return this->chan.subscriptionMsg ( this->getId (), this->type, - this->count, this->mask ); + this->count, this->mask, false ); } void netSubscription::disconnect ( const char * /* pHostName */ ) diff --git a/src/ca/netiiu.cpp b/src/ca/netiiu.cpp index dcaf706ec..fa61c9e12 100644 --- a/src/ca/netiiu.cpp +++ b/src/ca/netiiu.cpp @@ -218,7 +218,7 @@ int netiiu::clearChannelRequest ( unsigned, unsigned ) return ECA_DISCONNCHID; } -int netiiu::subscriptionRequest ( unsigned, unsigned, unsigned, unsigned, unsigned ) +int netiiu::subscriptionRequest ( unsigned, unsigned, unsigned, unsigned, unsigned, bool ) { return ECA_DISCONNCHID; } diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index f6c44c2f5..69be3a42b 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -197,7 +197,7 @@ extern "C" void cacSendThreadTCP ( void *pParam ) piiu->unlock (); if ( laborNeeded ) { - if ( ! piiu->flushToWire () ) { + if ( ! piiu->flushToWire ( false ) ) { break; } } @@ -206,7 +206,8 @@ extern "C" void cacSendThreadTCP ( void *pParam ) semBinaryGive ( piiu->sendThreadExitSignal ); } -unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned nBytesInBuf ) +unsigned tcpiiu::sendBytes ( const void *pBuf, + unsigned nBytesInBuf, bool enablePreemptionDuringFlush ) { int status; unsigned nBytes; @@ -216,8 +217,13 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned nBytesInBuf ) } assert ( nBytesInBuf <= INT_MAX ); - this->clientCtx ().enableCallbackPreemption (); + + if ( enablePreemptionDuringFlush ) { + this->clientCtx ().enableCallbackPreemption (); + } + this->armSendWatchdog (); + while ( true ) { status = ::send ( this->sock, static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 ); @@ -253,8 +259,13 @@ unsigned tcpiiu::sendBytes ( const void *pBuf, unsigned nBytesInBuf ) break; } } + this->cancelSendWatchdog (); - this->clientCtx ().disableCallbackPreemption (); + + if ( enablePreemptionDuringFlush ) { + this->clientCtx ().disableCallbackPreemption (); + } + return nBytes; } @@ -1142,9 +1153,11 @@ int tcpiiu::clearChannelRequest ( unsigned clientId, unsigned serverId ) return this->comQueSend::clearChannelRequest ( clientId, serverId ); } -int tcpiiu::subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, unsigned mask ) +int tcpiiu::subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, + unsigned nElem, unsigned mask, bool enablePreemptionDuringFlush ) { - return this->comQueSend::subscriptionRequest ( ioId, serverId, type, nElem, mask ); + return this->comQueSend::subscriptionRequest ( ioId, serverId, type, nElem, + mask, enablePreemptionDuringFlush ); } int tcpiiu::subscriptionCancelRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem )