better fix for mantis 111

This commit is contained in:
Jeff Hill
2005-11-08 21:25:23 +00:00
parent d50d793467
commit 6f6f1d22cd
5 changed files with 101 additions and 97 deletions

View File

@@ -258,8 +258,6 @@ private:
// **** lock hierarchy ****
// 1) callback lock must always be acquired before
// the primary mutex if both locks are needed
// 2) tcpiiu::recvThreadIsRunning lock must always be
// acquired before callback lock if both locks are needed
mutable epicsMutex & mutex;
mutable epicsMutex & cbMutex;
epicsEvent iiuUninstall;

View File

@@ -47,32 +47,15 @@ tcpRecvWatchdog::~tcpRecvWatchdog ()
epicsTimerNotify::expireStatus
tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361
{
// allow pending receive traffic to run first
this->iiu.deferToRecvBacklog ();
// callback lock is required because channel disconnect
// state change is initiated from this thread, and
// this can cause their disconnect notify callback
// to be invoked.
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->shuttingDown ) {
return noRestart;
}
if ( this->probeResponsePending ) {
if ( this->iiu.bytesArePendingInOS() ) {
this->iiu.printf ( mgr.cbGuard,
"The CA client library's server inactivity timer initiated server disconnect\n" );
this->iiu.printf ( mgr.cbGuard,
"despite the fact that messages from this server are pending for processing in\n" );
this->iiu.printf ( mgr.cbGuard,
"the client library. Here are some possible causes of the unnecessary disconnect:\n" );
this->iiu.printf ( mgr.cbGuard,
"o ca_pend_event() or ca_poll() have not been called for %f seconds\n",
this->period );
this->iiu.printf ( mgr.cbGuard,
"o application is blocked in a callback from the client library\n" );
if ( this->iiu.receiveThreadIsBusy ( guard ) ) {
return expireStatus ( restart, CA_ECHO_TIMEOUT );
}
{
# ifdef DEBUG
char hostName[128];
@@ -81,12 +64,26 @@ tcpRecvWatchdog::expire ( const epicsTime & /* currentTime */ ) // X aCC 361
"- disconnecting.\n",
hostName, this->period ) );
# endif
this->iiu.receiveTimeoutNotify ( mgr, guard );
this->probeTimeoutDetected = true;
// to get the callback lock safely we must reorder
// the lock hierarchy
epicsGuardRelease < epicsMutex > unguard ( guard );
{
// callback lock is required because channel disconnect
// state change is initiated from this thread, and
// this can cause their disconnect notify callback
// to be invoked.
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > tmpGuard ( this->mutex );
this->iiu.receiveTimeoutNotify ( mgr, tmpGuard );
this->probeTimeoutDetected = true;
}
}
return noRestart;
}
else {
if ( this->iiu.receiveThreadIsBusy ( guard ) ) {
return expireStatus ( restart, this->period );
}
this->probeTimeoutDetected = false;
this->probeResponsePending = this->iiu.setEchoRequestPending ( guard );
debugPrintf ( ("circuit timed out - sending echo request\n") );

View File

@@ -43,27 +43,29 @@ tcpSendWatchdog::~tcpSendWatchdog ()
epicsTimerNotify::expireStatus tcpSendWatchdog::expire (
const epicsTime & /* currentTime */ )
{
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->iiu.bytesArePendingInOS() ) {
this->iiu.printf ( mgr.cbGuard,
"The CA client library is disconnecting after a flush request "
"timed out, but receive data is pending, probably because of an "
"application schedualing problem\n" );
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->iiu.receiveThreadIsBusy ( guard ) ) {
return expireStatus ( restart, this->period );
}
}
{
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
# ifdef DEBUG
char hostName[128];
this->iiu.getHostName ( guard, hostName, sizeof ( hostName ) );
debugPrintf ( ( "Request not accepted by CA server %s for %g sec. Disconnecting.\n",
hostName, this->period ) );
# endif
this->iiu.sendTimeoutNotify ( mgr, guard );
this->iiu.sendTimeoutNotify ( mgr, guard );
}
return noRestart;
}
void tcpSendWatchdog::start ( const epicsTime & currentTime )
void tcpSendWatchdog::start ( const epicsTime & /* currentTime */ )
{
this->timer.start ( *this, currentTime + this->period );
this->timer.start ( *this, this->period );
}
void tcpSendWatchdog::cancel ()

View File

@@ -452,26 +452,10 @@ void tcpRecvThread::run ()
epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu );
this->iiu.cacRef.attachToClientCtx ();
comBuf * pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
comBuf * pComBuf = 0;
bool breakOut = false;
while ( ! breakOut ) {
//
// if this thread has connected channels with subscriptions
// that need to be sent then wakeup the send thread
{
bool wakeupNeeded = false;
{
epicsGuard < epicsMutex > cacGuard ( this->iiu.mutex );
if ( this->iiu.subscripReqPend.count() ) {
wakeupNeeded = true;
}
}
if ( wakeupNeeded ) {
this->iiu.sendThreadFlushEvent.signal ();
}
}
//
// We leave the bytes pending and fetch them after
// callbacks are enabled when running in the old preemptive
@@ -479,6 +463,9 @@ void tcpRecvThread::run ()
// file manager call backs works correctly. This does not
// appear to impact performance.
//
if ( ! pComBuf ) {
pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
}
statusWireIO stat;
pComBuf->fillFromWire ( this->iiu, stat );
@@ -492,52 +479,48 @@ void tcpRecvThread::run ()
if ( stat.bytesCopied == 0u ) {
continue;
}
// reschedule connection activity watchdog
this->iiu.recvDog.messageArrivalNotify ( guard );
this->iiu.recvQue.pushLastComBufReceived ( *pComBuf );
pComBuf = 0;
this->iiu._receiveThreadIsBusy = true;
}
bool sendWakeupNeeded = false;
{
// This is used to enforce that the recv thread runs
// first when both the receive watchdog and the receive
// thread are both waiting for the callback lock.
// This is a workaround for a premature disconnect if
// they dont call ca_poll often enough.
epicsGuard < epicsMutex >
recvThreadBusy ( this->iiu.recvThreadIsRunning );
// only one recv thread at a time may call callbacks
// - pendEvent() blocks until threads waiting for
// this lock get a chance to run
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
// force the receive watchdog to be reset every 5 frames
unsigned contiguousFrameCount = 0;
while ( stat.bytesCopied ) {
{
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
if ( stat.bytesCopied == pComBuf->capacityBytes () ) {
if ( this->iiu.contigRecvMsgCount >=
contiguousMsgCountWhichTriggersFlowControl ) {
this->iiu.busyStateDetected = true;
}
else {
this->iiu.contigRecvMsgCount++;
}
if ( stat.bytesCopied == pComBuf->capacityBytes () ) {
if ( this->iiu.contigRecvMsgCount >=
contiguousMsgCountWhichTriggersFlowControl ) {
this->iiu.busyStateDetected = true;
}
else {
this->iiu.contigRecvMsgCount++;
}
else {
this->iiu.contigRecvMsgCount = 0u;
this->iiu.busyStateDetected = false;
}
this->iiu.unacknowledgedSendBytes = 0u;
this->iiu.recvQue.pushLastComBufReceived ( *pComBuf );
}
pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
else {
this->iiu.contigRecvMsgCount = 0u;
this->iiu.busyStateDetected = false;
}
this->iiu.unacknowledgedSendBytes = 0u;
bool protocolOK = false;
{
epicsGuardRelease < epicsMutex > unguard ( guard );
// execute receive labor
protocolOK = this->iiu.processIncoming ( currentTime, mgr );
}
// execute receive labor
bool protocolOK = this->iiu.processIncoming ( currentTime, mgr );
if ( ! protocolOK ) {
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
this->iiu.initiateAbortShutdown ( guard );
breakOut = true;
break;
@@ -548,15 +531,35 @@ void tcpRecvThread::run ()
break;
}
pComBuf->fillFromWire ( this->iiu, stat );
{
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
if ( ! this->validFillStatus ( guard, stat ) ) {
breakOut = true;
break;
epicsGuardRelease < epicsMutex > unguard ( guard );
if ( ! pComBuf ) {
pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
}
pComBuf->fillFromWire ( this->iiu, stat );
}
if ( this->validFillStatus ( guard, stat ) ) {
this->iiu.recvQue.pushLastComBufReceived ( *pComBuf );
pComBuf = 0;
}
else {
breakOut = true;
break;
}
}
this->iiu._receiveThreadIsBusy = false;
// reschedule connection activity watchdog
this->iiu.recvDog.messageArrivalNotify ( guard );
//
// if this thread has connected channels with subscriptions
// that need to be sent then wakeup the send thread
if ( this->iiu.subscripReqPend.count() ) {
sendWakeupNeeded = true;
}
}
if ( sendWakeupNeeded ) {
this->iiu.sendThreadFlushEvent.signal ();
}
}
@@ -672,6 +675,7 @@ tcpiiu::tcpiiu (
socketLibrarySendBufferSize ( 0x1000 ),
unacknowledgedSendBytes ( 0u ),
channelCountTot ( 0u ),
_receiveThreadIsBusy ( false ),
busyStateDetected ( false ),
flowControlActive ( false ),
echoRequestPending ( false ),
@@ -868,11 +872,6 @@ void tcpiiu::sendTimeoutNotify (
this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard );
}
void tcpiiu::deferToRecvBacklog ()
{
epicsGuard < epicsMutex > waitUntilRecvThreadIsntBusy ( this->recvThreadIsRunning );
}
void tcpiiu::receiveTimeoutNotify (
callbackManager & mgr,
epicsGuard < epicsMutex > & guard )
@@ -1033,6 +1032,8 @@ void tcpiiu::show ( unsigned level ) const
static_cast < void * > ( this->pCurData ), this->curDataMax );
::printf ( "\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n",
this->contigRecvMsgCount, this->busyStateDetected, this->flowControlActive );
::printf ( "\receive thread is busy=%u\n",
this->_receiveThreadIsBusy );
}
if ( level > 2u ) {
::printf ( "\tvirtual circuit socket identifier %d\n", this->sock );

View File

@@ -112,8 +112,6 @@ public:
void sendTimeoutNotify (
callbackManager & cbMgr,
epicsGuard < epicsMutex > & guard );
// dont call deferToRecvBacklog() while holding the callback lock.
void deferToRecvBacklog ();
void receiveTimeoutNotify(
callbackManager &,
epicsGuard < epicsMutex > & );
@@ -154,6 +152,8 @@ public:
epicsGuard < epicsMutex > & ) const;
bool connecting (
epicsGuard < epicsMutex > & ) const;
bool receiveThreadIsBusy (
epicsGuard < epicsMutex > & );
osiSockAddr getNetworkAddress (
epicsGuard < epicsMutex > & ) const;
int printf (
@@ -176,8 +176,6 @@ public:
epicsGuard < epicsMutex > &, nciu & chan );
void nameResolutionMsgEndNotify ();
bool bytesArePendingInOS () const;
void * operator new ( size_t size,
tsFreeList < class tcpiiu, 32, epicsMutexNOOP > & );
epicsPlacementDeleteOperator (( void *,
@@ -207,7 +205,6 @@ private:
char * pCurData;
epicsMutex & mutex;
epicsMutex & cbMutex;
epicsMutex recvThreadIsRunning;
unsigned minorProtocolVersion;
enum iiu_conn_state {
iiucs_connecting, // pending circuit connect
@@ -224,6 +221,7 @@ private:
unsigned socketLibrarySendBufferSize;
unsigned unacknowledgedSendBytes;
unsigned channelCountTot;
bool _receiveThreadIsBusy;
bool busyStateDetected; // only modified by the recv thread
bool flowControlActive; // only modified by the send process thread
bool echoRequestPending;
@@ -254,6 +252,7 @@ private:
epicsGuard < epicsMutex > & );
void disconnectNotify (
epicsGuard < epicsMutex > & );
bool bytesArePendingInOS () const;
// send protocol stubs
void echoRequest (
@@ -356,6 +355,13 @@ inline bool tcpiiu::connecting (
return ( this->state == iiucs_connecting );
}
inline bool tcpiiu::receiveThreadIsBusy (
epicsGuard < epicsMutex > & guard )
{
guard.assertIdenticalMutex ( this->mutex );
return this->_receiveThreadIsBusy;
}
inline void tcpiiu::beaconAnomalyNotify (
epicsGuard < epicsMutex > & guard )
{