diff --git a/src/ca/CAref.html b/src/ca/CAref.html
index c423b72c3..2c52788d8 100644
--- a/src/ca/CAref.html
+++ b/src/ca/CAref.html
@@ -1804,8 +1804,14 @@ void ca_context_destroy();
Description
Shut down a channel access client context and free any resources
-allocated. On most operating systems this is performed automatically at
-process exit.
+allocated.
+
+On many OS that execute programs in a process based environment the
+resources used by the client library such as sockets and allocated memory are
+automatically released by the system when the process exits and
+ca_context_destroy() hasn't been called, but on light weight systems such as
+vxWorks or RTEMS no cleanup occurs unless the application call
+ca_context_destroy().
Returns
diff --git a/src/ca/access.cpp b/src/ca/access.cpp
index a0f9ed4a8..81a993d2a 100644
--- a/src/ca/access.cpp
+++ b/src/ca/access.cpp
@@ -382,6 +382,8 @@ int epicsShareAPI ca_array_get ( chtype type,
}
unsigned tmpType = static_cast < unsigned > ( type );
epicsGuard < epicsMutex > guard ( pChan->getClientCtx().mutex );
+ pChan->eliminateExcessiveSendBacklog (
+ pChan->getClientCtx().pCallbackGuard.get(), guard );
autoPtrFreeList < getCopy, 0x400, epicsMutexNOOP > pNotify
( pChan->getClientCtx().getCopyFreeList,
new ( pChan->getClientCtx().getCopyFreeList )
@@ -449,6 +451,8 @@ int epicsShareAPI ca_array_get_callback ( chtype type,
unsigned tmpType = static_cast < unsigned > ( type );
epicsGuard < epicsMutex > guard ( pChan->getClientCtx().mutex );
+ pChan->eliminateExcessiveSendBacklog (
+ pChan->getClientCtx().pCallbackGuard.get(), guard );
autoPtrFreeList < getCallback, 0x400, epicsMutexNOOP > pNotify
( pChan->getClientCtx().getCallbackFreeList,
new ( pChan->getClientCtx().getCallbackFreeList )
@@ -512,6 +516,8 @@ int epicsShareAPI ca_array_put_callback ( chtype type, arrayElementCount count,
return ECA_BADTYPE;
}
epicsGuard < epicsMutex > guard ( pChan->getClientCtx().mutex );
+ pChan->eliminateExcessiveSendBacklog (
+ pChan->getClientCtx().pCallbackGuard.get(), guard );
unsigned tmpType = static_cast < unsigned > ( type );
autoPtrFreeList < putCallback, 0x400, epicsMutexNOOP > pNotify
( pChan->getClientCtx().putCallbackFreeList,
@@ -575,6 +581,8 @@ int epicsShareAPI ca_array_put ( chtype type, arrayElementCount count,
int caStatus;
try {
epicsGuard < epicsMutex > guard ( pChan->getClientCtx().mutex );
+ pChan->eliminateExcessiveSendBacklog (
+ pChan->getClientCtx().pCallbackGuard.get(), guard );
pChan->write ( guard, tmpType, count, pValue );
caStatus = ECA_NORMAL;
}
@@ -683,6 +691,8 @@ int epicsShareAPI ca_create_subscription (
try {
epicsGuard < epicsMutex > guard ( pChan->getClientCtx().mutex );
+ pChan->eliminateExcessiveSendBacklog (
+ pChan->getClientCtx().pCallbackGuard.get(), guard );
autoPtrFreeList < oldSubscription, 0x400, epicsMutexNOOP > pSubsr
( pChan->getClientCtx().subscriptionFreeList,
new ( pChan->getClientCtx().subscriptionFreeList )
@@ -749,11 +759,15 @@ epicsShareFunc int epicsShareAPI ca_clear_subscription ( evid pMon )
ca_client_context & cac = chan.getClientCtx ();
if ( cac.pCallbackGuard.get() ) {
epicsGuard < epicsMutex > guard ( cac.mutex );
+ chan.eliminateExcessiveSendBacklog (
+ cac.pCallbackGuard.get(), guard );
pMon->ioCancel ( *cac.pCallbackGuard, guard );
}
else {
epicsGuard < epicsMutex > cbGuard ( cac.cbMutex );
epicsGuard < epicsMutex > guard ( cac.mutex );
+ chan.eliminateExcessiveSendBacklog (
+ &cbGuard, guard );
pMon->ioCancel ( cbGuard, guard );
}
return ECA_NORMAL;
diff --git a/src/ca/ca_client_context.cpp b/src/ca/ca_client_context.cpp
index efcac8f95..674377b7d 100644
--- a/src/ca/ca_client_context.cpp
+++ b/src/ca/ca_client_context.cpp
@@ -189,12 +189,15 @@ void ca_client_context::destroyChannel ( oldChannelNotify & chan )
{
if ( this->pCallbackGuard.get() ) {
epicsGuard < epicsMutex > guard ( this->mutex );
+ chan.eliminateExcessiveSendBacklog (
+ this->pCallbackGuard.get(), guard );
chan.destructor ( *this->pCallbackGuard.get(), guard );
this->oldChannelNotifyFreeList.release ( & chan );
}
else {
epicsGuard < epicsMutex > cbGuard ( this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
+ chan.eliminateExcessiveSendBacklog ( &cbGuard, guard );
chan.destructor ( cbGuard, guard );
this->oldChannelNotifyFreeList.release ( & chan );
}
diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp
index 7d1e78da0..5f5d30e6e 100644
--- a/src/ca/cac.cpp
+++ b/src/ca/cac.cpp
@@ -573,11 +573,8 @@ void cac::destroyChannel (
if ( this->chanTable.remove ( chan ) != & chan ) {
throw std::logic_error ( "Invalid channel identifier" );
}
-
- chan.getPIIU(guard)->uninstallChan ( cbGuard, guard, chan );
- chan.destructor ( cbGuard, guard );
-
+ chan.~nciu ();
this->channelFreeList.release ( & chan );
}
@@ -614,36 +611,11 @@ int cac::printf ( epicsGuard < epicsMutex > & callbackControl,
return status;
}
-void cac::flushIfRequired ( epicsGuard < epicsMutex > & guard, netiiu & iiu )
-{
- guard.assertIdenticalMutex ( this->mutex );
-
- if ( iiu.flushBlockThreshold ( guard ) ) {
- iiu.flushRequest ( guard );
- // the process thread is not permitted to flush as this
- // can result in a push / pull deadlock on the TCP pipe.
- // Instead, the process thread scheduals the flush with the
- // send thread which runs at a higher priority than the
- // send thread. The same applies to the UDP thread for
- // locking hierarchy reasons.
- if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
- // enable / disable of call back preemption must occur here
- // because the tcpiiu might disconnect while waiting and its
- // pointer to this cac might become invalid
- iiu.blockUntilSendBacklogIsReasonable ( this->notify, guard );
- }
- }
- else {
- iiu.flushRequestIfAboveEarlyThreshold ( guard );
- }
-}
-
void cac::writeRequest (
epicsGuard < epicsMutex > & guard, nciu & chan, unsigned type,
arrayElementCount nElem, const void * pValue )
{
guard.assertIdenticalMutex ( this->mutex );
- this->flushIfRequired ( guard, *chan.getPIIU(guard) );
chan.getPIIU(guard)->writeRequest ( guard, chan, type, nElem, pValue );
}
@@ -656,7 +628,6 @@ netWriteNotifyIO & cac::writeNotifyRequest (
guard, this->ioTable, *this,
netWriteNotifyIO::factory ( this->freeListWriteNotifyIO, icni, notifyIn ) );
this->ioTable.add ( *pIO );
- this->flushIfRequired ( guard, *chan.getPIIU(guard) );
chan.getPIIU(guard)->writeNotifyRequest (
guard, chan, *pIO, type, nElem, pValue );
return *pIO.release();
@@ -671,7 +642,6 @@ netReadNotifyIO & cac::readNotifyRequest (
guard, this->ioTable, *this,
netReadNotifyIO::factory ( this->freeListReadNotifyIO, icni, notifyIn ) );
this->ioTable.add ( *pIO );
- this->flushIfRequired ( guard, *chan.getPIIU(guard) );
chan.getPIIU(guard)->readNotifyRequest ( guard, chan, *pIO, type, nElem );
return *pIO.release();
}
@@ -684,13 +654,10 @@ baseNMIU * cac::destroyIO (
cbGuard.assertIdenticalMutex ( this->cbMutex );
guard.assertIdenticalMutex ( this->mutex );
- // unistall the IO object so that a receive thread will not find it,
- // but do _not_ hold the callback lock here because this could result
- // in deadlock
baseNMIU * pIO = this->ioTable.remove ( idIn );
if ( pIO ) {
class netSubscription * pSubscr = pIO->isSubscription ();
- if ( pSubscr && chan.connected ( guard ) ) {
+ if ( pSubscr ) {
chan.getPIIU(guard)->subscriptionCancelRequest (
guard, chan, *pSubscr );
}
@@ -760,7 +727,8 @@ netSubscription & cac::subscriptionRequest (
nciu & chan, privateInterfaceForIO & privChan,
unsigned type, // X aCC 361
arrayElementCount nElem, unsigned mask,
- cacStateNotify & notifyIn )
+ cacStateNotify & notifyIn,
+ bool chanIsInstalled )
{
guard.assertIdenticalMutex ( this->mutex );
autoPtrRecycle < netSubscription > pIO (
@@ -768,8 +736,7 @@ netSubscription & cac::subscriptionRequest (
netSubscription::factory ( this->freeListSubscription,
privChan, type, nElem, mask, notifyIn ) );
this->ioTable.add ( *pIO );
- if ( chan.connected ( guard ) ) {
- this->flushIfRequired ( guard, *chan.getPIIU(guard) );
+ if ( chanIsInstalled ) {
pIO->subscribeIfRequired ( guard, chan );
}
return *pIO.release ();
diff --git a/src/ca/cac.h b/src/ca/cac.h
index de9b1958e..40fd6e7c3 100644
--- a/src/ca/cac.h
+++ b/src/ca/cac.h
@@ -169,7 +169,7 @@ public:
netSubscription & subscriptionRequest (
epicsGuard < epicsMutex > &, nciu &, privateInterfaceForIO &,
unsigned type, arrayElementCount nElem, unsigned mask,
- cacStateNotify & );
+ cacStateNotify &, bool channelIsInstalled );
baseNMIU * destroyIO (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard,
@@ -224,7 +224,6 @@ public:
static unsigned lowestPriorityLevelAbove ( unsigned priority );
static unsigned highestPriorityLevelBelow ( unsigned priority );
void destroyIIU ( tcpiiu & iiu );
- void flushIfRequired ( epicsGuard < epicsMutex > &, netiiu & );
private:
localHostName hostNameCache;
diff --git a/src/ca/cacIO.h b/src/ca/cacIO.h
index 5f9637284..7cd4a8ddd 100644
--- a/src/ca/cacIO.h
+++ b/src/ca/cacIO.h
@@ -184,6 +184,9 @@ public:
unsigned level ) const = 0;
virtual void initiateConnect (
epicsGuard < epicsMutex > & ) = 0;
+ virtual void eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard ) = 0;
virtual ioStatus read (
epicsGuard < epicsMutex > &,
unsigned type, arrayElementCount count,
@@ -285,8 +288,6 @@ public:
virtual int vPrintf ( const char * pformat, va_list args ) const = 0;
// backwards compatibility (from here down)
virtual void attachToClientCtx () = 0;
- virtual void blockForEventAndEnableCallbacks (
- class epicsEvent & event, const double & timeout ) = 0;
virtual void callbackProcessingInitiateNotify () = 0;
virtual void callbackProcessingCompleteNotify () = 0;
};
diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp
index 667e633f3..e3435baeb 100644
--- a/src/ca/nciu.cpp
+++ b/src/ca/nciu.cpp
@@ -74,43 +74,33 @@ nciu::nciu ( cac & cacIn, netiiu & iiuIn, cacChannelNotify & chanIn,
nciu::~nciu ()
{
+ delete [] this->pNameStr;
}
+// channels are created by the user, and only destroyed by the user
+// using this routine
void nciu::destroy (
epicsGuard < epicsMutex > & callbackControlGuard,
epicsGuard < epicsMutex > & mutualExclusionGuard )
-{
- this->cacCtx.destroyChannel (
- callbackControlGuard, mutualExclusionGuard, *this );
-}
-
-void nciu::destructor (
- epicsGuard < epicsMutex > & cbGuard,
- epicsGuard < epicsMutex > & guard )
-{
- guard.assertIdenticalMutex ( this->cacCtx.mutexRef () );
- // Send any side effect IO requests w/o holding the callback lock so that
- // we do not dead lock.
- // There is special protection in this routine that prevents blocking if
- // this is the tcp receive thread.
- // must not hold callback lock here
- this->cacCtx.flushIfRequired ( guard, *this->piiu );
-
- while ( baseNMIU * pNetIO = this->eventq.first () ) {
- assert ( this->cacCtx.destroyIO ( cbGuard, guard,
- pNetIO->getId (), *this ) );
- }
-
+{
// if the claim reply has not returned yet then we will issue
// the clear channel request to the server when the claim reply
// arrives and there is no matching nciu in the client
- if ( this->connected ( guard ) ) {
- this->getPIIU(guard)->clearChannelRequest (
- guard, this->sid, this->id );
+ if ( this->channelNode::isInstalledInServer ( mutualExclusionGuard ) ) {
+ while ( baseNMIU * pNetIO = this->eventq.first () ) {
+ assert ( this->cacCtx.destroyIO (
+ callbackControlGuard, mutualExclusionGuard,
+ pNetIO->getId (), *this ) );
+ }
+ this->getPIIU(mutualExclusionGuard)->clearChannelRequest (
+ mutualExclusionGuard, this->sid, this->id );
}
-
- delete [] this->pNameStr;
- this->~nciu ();
+
+ this->piiu->uninstallChan (
+ callbackControlGuard, mutualExclusionGuard, *this );
+
+ this->cacCtx.destroyChannel (
+ callbackControlGuard, mutualExclusionGuard, *this );
}
void * nciu::operator new ( size_t ) // X aCC 361
@@ -265,6 +255,14 @@ unsigned nciu::nameLen (
return this->nameLength;
}
+void nciu::eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard )
+{
+ this->piiu->eliminateExcessiveSendBacklog (
+ pCallbackGuard, mutualExclusionGuard );
+}
+
cacChannel::ioStatus nciu::read (
epicsGuard < epicsMutex > & guard,
unsigned type, arrayElementCount countIn,
@@ -372,11 +370,12 @@ void nciu::subscribe (
cacStateNotify & notify, ioid *pId )
{
netSubscription & io = this->cacCtx.subscriptionRequest (
- guard, *this, *this, type, nElem, mask, notify );
+ guard, *this, *this, type, nElem, mask, notify,
+ this->channelNode::isInstalledInServer ( guard ) );
+ this->eventq.add ( io );
if ( pId ) {
*pId = io.getId ();
}
- this->eventq.add ( io );
}
void nciu::ioCancel (
diff --git a/src/ca/nciu.h b/src/ca/nciu.h
index 74ff6ee18..7baff8e9a 100644
--- a/src/ca/nciu.h
+++ b/src/ca/nciu.h
@@ -58,6 +58,7 @@ class channelNode : public tsDLNode < class nciu >
public:
channelNode ();
bool isConnected ( epicsGuard < epicsMutex > & ) const;
+ bool isInstalledInServer ( epicsGuard < epicsMutex > & ) const;
private:
enum channelState {
cs_none,
@@ -92,9 +93,7 @@ class nciu :
public:
nciu ( cac &, netiiu &, cacChannelNotify &,
const char * pNameIn, cacChannel::priLev );
- void destructor (
- epicsGuard < epicsMutex > & cbGuard,
- epicsGuard < epicsMutex > & guard );
+ ~nciu ();
void connect ( unsigned nativeType,
unsigned nativeCount, unsigned sid,
epicsGuard < epicsMutex > & cbGuard,
@@ -155,9 +154,6 @@ public:
epicsGuard < epicsMutex > &, epicsGuard < epicsMutex > & );
bool connected ( epicsGuard < epicsMutex > & ) const;
-protected:
- ~nciu ();
-
private:
tsDLList < class baseNMIU > eventq;
caAccessRights accessRightState;
@@ -175,6 +171,9 @@ private:
epicsGuard < epicsMutex > & mutualExclusionGuard );
void initiateConnect (
epicsGuard < epicsMutex > & );
+ void eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard );
ioStatus read (
epicsGuard < epicsMutex > &,
unsigned type, arrayElementCount count,
@@ -319,4 +318,13 @@ inline bool channelNode::isConnected ( epicsGuard < epicsMutex > & ) const
this->listMember == cs_subscripUpdateReqPend;
}
+inline bool channelNode::isInstalledInServer ( epicsGuard < epicsMutex > & ) const
+{
+ return
+ this->listMember == cs_connected ||
+ this->listMember == cs_subscripReqPend ||
+ this->listMember == cs_unrespCircuit ||
+ this->listMember == cs_subscripUpdateReqPend;
+}
+
#endif // ifdef nciuh
diff --git a/src/ca/netiiu.cpp b/src/ca/netiiu.cpp
index f2950f5d9..41cf4767b 100644
--- a/src/ca/netiiu.cpp
+++ b/src/ca/netiiu.cpp
@@ -115,22 +115,12 @@ void netiiu::flushRequest (
{
}
-bool netiiu::flushBlockThreshold (
- epicsGuard < epicsMutex > & ) const
-{
- return false;
-}
-
-void netiiu::flushRequestIfAboveEarlyThreshold (
+void netiiu::eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > *,
epicsGuard < epicsMutex > & )
{
}
-void netiiu::blockUntilSendBacklogIsReasonable
- ( cacContextNotify &, epicsGuard < epicsMutex > & )
-{
-}
-
void netiiu::requestRecvProcessPostponedFlush (
epicsGuard < epicsMutex > & )
{
diff --git a/src/ca/netiiu.h b/src/ca/netiiu.h
index 32a9c5535..db2d16616 100644
--- a/src/ca/netiiu.h
+++ b/src/ca/netiiu.h
@@ -49,6 +49,9 @@ public:
epicsGuard < epicsMutex > & ) const = 0;
virtual bool ca_v42_ok (
epicsGuard < epicsMutex > & ) const = 0;
+ virtual void eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard ) = 0;
virtual void writeRequest (
epicsGuard < epicsMutex > &, nciu &,
unsigned type, arrayElementCount nElem,
@@ -76,12 +79,6 @@ public:
nciu & chan, netSubscription & subscr ) = 0;
virtual void flushRequest (
epicsGuard < epicsMutex > & ) = 0;
- virtual bool flushBlockThreshold (
- epicsGuard < epicsMutex > & ) const = 0;
- virtual void flushRequestIfAboveEarlyThreshold (
- epicsGuard < epicsMutex > & ) = 0;
- virtual void blockUntilSendBacklogIsReasonable
- ( cacContextNotify &, epicsGuard < epicsMutex > & ) = 0;
virtual void requestRecvProcessPostponedFlush (
epicsGuard < epicsMutex > & ) = 0;
virtual osiSockAddr getNetworkAddress (
diff --git a/src/ca/oldAccess.h b/src/ca/oldAccess.h
index 12d249a3e..ff6d4acfe 100644
--- a/src/ca/oldAccess.h
+++ b/src/ca/oldAccess.h
@@ -71,6 +71,9 @@ public:
unsigned level ) const;
void initiateConnect (
epicsGuard < epicsMutex > & );
+ void eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard );
void read (
epicsGuard < epicsMutex > &,
unsigned type, arrayElementCount count,
@@ -285,6 +288,8 @@ public:
epicsGuard < epicsMutex > &, const char * pChannelName,
oldChannelNotify &, cacChannel::priLev pri );
void flush ( epicsGuard < epicsMutex > & );
+ void eliminateExcessiveSendBacklog (
+ oldChannelNotify & chan, epicsGuard < epicsMutex > & guard );
int pendIO ( const double & timeout );
int pendEvent ( const double & timeout );
bool ioComplete () const;
@@ -434,6 +439,14 @@ inline void oldChannelNotify::initiateConnect (
this->io.initiateConnect ( guard );
}
+inline void oldChannelNotify::eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard )
+{
+ this->io.eliminateExcessiveSendBacklog (
+ pCallbackGuard, mutualExclusionGuard );
+}
+
inline void oldChannelNotify::ioCancel (
epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard,
@@ -628,4 +641,11 @@ inline unsigned ca_client_context::sequenceNumberOfOutstandingIO (
return this->ioSeqNo;
}
+inline void ca_client_context::eliminateExcessiveSendBacklog (
+ oldChannelNotify & chan, epicsGuard < epicsMutex > & guard )
+{
+ chan.eliminateExcessiveSendBacklog (
+ this->pCallbackGuard.get(), guard );
+}
+
#endif // ifndef oldAccessh
diff --git a/src/ca/syncGroupReadNotify.cpp b/src/ca/syncGroupReadNotify.cpp
index 3ec917497..4aefc4d48 100644
--- a/src/ca/syncGroupReadNotify.cpp
+++ b/src/ca/syncGroupReadNotify.cpp
@@ -37,6 +37,8 @@ void syncGroupReadNotify::begin (
epicsGuard < epicsMutex > & guard,
unsigned type, arrayElementCount count )
{
+ this->chan->getClientCtx().
+ eliminateExcessiveSendBacklog ( *this->chan, guard );
this->ioComplete = false;
boolFlagManager mgr ( this->idIsValid );
this->chan->read ( guard, type, count, *this, &this->id );
diff --git a/src/ca/syncGroupWriteNotify.cpp b/src/ca/syncGroupWriteNotify.cpp
index 4f794c9a4..6f5e2e75e 100644
--- a/src/ca/syncGroupWriteNotify.cpp
+++ b/src/ca/syncGroupWriteNotify.cpp
@@ -35,6 +35,8 @@ void syncGroupWriteNotify::begin (
epicsGuard < epicsMutex > & guard, unsigned type,
arrayElementCount count, const void * pValueIn )
{
+ this->chan->getClientCtx().eliminateExcessiveSendBacklog (
+ *this->chan, guard );
this->ioComplete = false;
boolFlagManager mgr ( this->idIsValid );
this->chan->write ( guard, type, count,
diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp
index 7a34149a1..e155413c6 100644
--- a/src/ca/tcpiiu.cpp
+++ b/src/ca/tcpiiu.cpp
@@ -1540,43 +1540,52 @@ bool tcpiiu::flush ( epicsGuard < epicsMutex > & guard )
return true;
}
-// ~tcpiiu() will not return while this->blockingForFlush is greater than zero
-void tcpiiu::blockUntilSendBacklogIsReasonable (
- cacContextNotify & notify, epicsGuard < epicsMutex > & guard )
+void tcpiiu::eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard )
{
- guard.assertIdenticalMutex ( this->mutex );
+ mutualExclusionGuard.assertIdenticalMutex ( this->mutex );
- assert ( this->blockingForFlush < UINT_MAX );
- this->blockingForFlush++;
- while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiucs_connected ) {
- epicsGuardRelease < epicsMutex > autoRelease ( guard );
- notify.blockForEventAndEnableCallbacks ( this->flushBlockEvent, 30.0 );
+ if ( this->sendQue.flushBlockThreshold ( 0u ) ) {
+ this->flushRequest ( mutualExclusionGuard );
+ // the process thread is not permitted to flush as this
+ // can result in a push / pull deadlock on the TCP pipe.
+ // Instead, the process thread scheduals the flush with the
+ // send thread which runs at a higher priority than the
+ // receive thread. The same applies to the UDP thread for
+ // locking hierarchy reasons.
+ if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
+ // enable / disable of call back preemption must occur here
+ // because the tcpiiu might disconnect while waiting and its
+ // pointer to this cac might become invalid
+ assert ( this->blockingForFlush < UINT_MAX );
+ this->blockingForFlush++;
+ while ( this->sendQue.flushBlockThreshold(0u) &&
+ this->state == iiucs_connected ) {
+ epicsGuardRelease < epicsMutex > autoRelease (
+ mutualExclusionGuard );
+ if ( pCallbackGuard ) {
+ epicsGuardRelease < epicsMutex >
+ autoReleaseCB ( *pCallbackGuard );
+ this->flushBlockEvent.wait ( 30.0 );
+ }
+ else {
+ this->flushBlockEvent.wait ( 30.0 );
+ }
+ }
+ assert ( this->blockingForFlush > 0u );
+ this->blockingForFlush--;
+ if ( this->blockingForFlush == 0 ) {
+ this->flushBlockEvent.signal ();
+ }
+ }
}
- assert ( this->blockingForFlush > 0u );
- this->blockingForFlush--;
- if ( this->blockingForFlush == 0 ) {
- this->flushBlockEvent.signal ();
- }
-}
-
-void tcpiiu::flushRequestIfAboveEarlyThreshold (
- epicsGuard < epicsMutex > & guard )
-{
- guard.assertIdenticalMutex ( this->mutex );
-
- if ( ! this->earlyFlush && this->sendQue.flushEarlyThreshold(0u) ) {
+ else if ( ! this->earlyFlush && this->sendQue.flushEarlyThreshold(0u) ) {
this->earlyFlush = true;
this->sendThreadFlushEvent.signal ();
}
}
-bool tcpiiu::flushBlockThreshold (
- epicsGuard < epicsMutex > & guard ) const
-{
- guard.assertIdenticalMutex ( this->mutex );
- return this->sendQue.flushBlockThreshold ( 0u );
-}
-
osiSockAddr tcpiiu::getNetworkAddress (
epicsGuard < epicsMutex > & guard ) const
{
diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp
index b2a9dfced..2e5560a3b 100644
--- a/src/ca/udpiiu.cpp
+++ b/src/ca/udpiiu.cpp
@@ -1229,25 +1229,10 @@ void udpiiu::flushRequest (
netiiu::flushRequest ( guard );
}
-bool udpiiu::flushBlockThreshold (
- epicsGuard < epicsMutex > & guard ) const
+void udpiiu::eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > *,
+ epicsGuard < epicsMutex > & )
{
- guard.assertIdenticalMutex ( this->cacMutex );
- return netiiu::flushBlockThreshold ( guard );
-}
-
-void udpiiu::flushRequestIfAboveEarlyThreshold (
- epicsGuard < epicsMutex > & guard )
-{
- guard.assertIdenticalMutex ( this->cacMutex );
- netiiu::flushRequestIfAboveEarlyThreshold ( guard );
-}
-
-void udpiiu::blockUntilSendBacklogIsReasonable (
- cacContextNotify & notify, epicsGuard < epicsMutex > & guard )
-{
- guard.assertIdenticalMutex ( this->cacMutex );
- netiiu::blockUntilSendBacklogIsReasonable ( notify, guard );
}
void udpiiu::requestRecvProcessPostponedFlush (
diff --git a/src/ca/udpiiu.h b/src/ca/udpiiu.h
index 886f29602..8daedd6c6 100644
--- a/src/ca/udpiiu.h
+++ b/src/ca/udpiiu.h
@@ -215,12 +215,9 @@ private:
nciu & chan, netSubscription & subscr );
void flushRequest (
epicsGuard < epicsMutex > & );
- bool flushBlockThreshold (
- epicsGuard < epicsMutex > & ) const;
- void flushRequestIfAboveEarlyThreshold (
- epicsGuard < epicsMutex > & );
- void blockUntilSendBacklogIsReasonable
- ( cacContextNotify &, epicsGuard < epicsMutex > & );
+ void eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard );
void requestRecvProcessPostponedFlush (
epicsGuard < epicsMutex > & );
osiSockAddr getNetworkAddress (
diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h
index 47ab4257b..eb758ea8b 100644
--- a/src/ca/virtualCircuit.h
+++ b/src/ca/virtualCircuit.h
@@ -126,13 +126,10 @@ public:
void flushRequest (
epicsGuard < epicsMutex > & );
- bool flushBlockThreshold (
- epicsGuard < epicsMutex > & ) const;
- void flushRequestIfAboveEarlyThreshold (
- epicsGuard < epicsMutex > & );
- void blockUntilSendBacklogIsReasonable
- ( cacContextNotify &, epicsGuard < epicsMutex > & );
- virtual void show ( unsigned level ) const;
+ void eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard );
+ void show ( unsigned level ) const;
bool setEchoRequestPending (
epicsGuard < epicsMutex > & );
void requestRecvProcessPostponedFlush (
diff --git a/src/db/dbChannelIO.cpp b/src/db/dbChannelIO.cpp
index 78b3e2100..8e21713ab 100644
--- a/src/db/dbChannelIO.cpp
+++ b/src/db/dbChannelIO.cpp
@@ -221,4 +221,9 @@ void dbChannelIO::operator delete ( void * )
__FILE__, __LINE__ );
}
+void dbChannelIO::eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * /* pCallbackGuard */,
+ epicsGuard < epicsMutex > & /* mutualExclusionGuard */ )
+{
+}
diff --git a/src/db/dbChannelIO.h b/src/db/dbChannelIO.h
index 127b1e1e9..8dfa8e140 100644
--- a/src/db/dbChannelIO.h
+++ b/src/db/dbChannelIO.h
@@ -74,6 +74,9 @@ private:
dbAddr addr;
void initiateConnect (
epicsGuard < epicsMutex > & );
+ void eliminateExcessiveSendBacklog (
+ epicsGuard < epicsMutex > * pCallbackGuard,
+ epicsGuard < epicsMutex > & mutualExclusionGuard );
ioStatus read (
epicsGuard < epicsMutex > &,
unsigned type, unsigned long count,