many structural improvements

This commit is contained in:
Jeff Hill
2002-04-25 18:03:17 +00:00
parent afc108fde2
commit bc1972ccd2
2 changed files with 177 additions and 291 deletions

View File

@@ -15,7 +15,6 @@
#include <new>
#include "epicsMemory.h"
#include "epicsMutex.h"
#include "epicsGuard.h"
#include "osiProcess.h"
#include "osiSigPipeIgnore.h"
@@ -30,34 +29,11 @@
#include "syncGroup.h"
#include "nciu.h"
#include "autoPtrRecycle.h"
#include "searchTimer.h"
#include "repeaterSubscribeTimer.h"
#include "msgForMultiplyDefinedPV.h"
#include "udpiiu.h"
#include "bhe.h"
#include "net_convert.h"
#ifdef _MSC_VER
# pragma warning ( push )
# pragma warning ( disable:4660 )
#endif
template class resTable < nciu, chronIntId >;
template class chronIntIdResTable < nciu >;
template class resTable < baseNMIU, chronIntId >;
template class chronIntIdResTable < baseNMIU >;
template class resTable < CASG, chronIntId >;
template class chronIntIdResTable < CASG >;
template class resTable < bhe, inetAddrID >;
template class resTable < tcpiiu, caServerID >;
template class tsFreeList < netReadNotifyIO, 1024, epicsMutexNOOP >;
template class tsFreeList < netWriteNotifyIO, 1024, epicsMutexNOOP >;
template class tsFreeList < netSubscription, 1024, epicsMutexNOOP >;
#ifdef _MSC_VER
# pragma warning ( pop )
#endif
// TCP response dispatch table
const cac::pProtoStubTCP cac::tcpJumpTableCAC [] =
{
@@ -137,12 +113,8 @@ extern "C" void cacExitHandler ()
extern "C" void cacOnceFunc ( void * )
{
caClientCallbackThreadId = epicsThreadPrivateCreate ();
if ( caClientCallbackThreadId ) {
atexit ( cacExitHandler );
}
else {
throw std::bad_alloc ();
}
assert ( caClientCallbackThreadId );
atexit ( cacExitHandler );
}
//
@@ -157,8 +129,6 @@ cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
lowestPriorityLevelAbove(epicsThreadGetPrioritySelf()) ) ),
pUserName ( 0 ),
pudpiiu ( 0 ),
pSearchTmr ( 0 ),
pRepeaterSubscribeTmr ( 0 ),
tcpSmallRecvBufFreeList ( 0 ),
tcpLargeRecvBufFreeList ( 0 ),
pCallbackGuard ( 0 ),
@@ -265,11 +235,10 @@ cac::~cac ()
// lock intentionally not held here so that we dont deadlock
// waiting for the UDP thread to exit while it is waiting to
// get the lock.
if ( this->pudpiiu ) {
// this blocks until the UDP thread exits so that
// it will not sneak in any new clients
this->pudpiiu->shutdown ();
}
// this blocks until the UDP thread exits so that
// it will not sneak in any new clients
delete this->pudpiiu;
//
// shutdown all tcp connections
@@ -277,7 +246,7 @@ cac::~cac ()
//
{
epicsGuard < callbackMutex > autoMutexCB ( this->cbMutex );
epicsGuard < epicsMutex > autoMutexCAC ( this->mutex );
epicsGuard < cacMutex > autoMutexCAC ( this->mutex );
this->serverTable.traverse ( & tcpiiu::cleanShutdown );
}
@@ -288,26 +257,14 @@ cac::~cac ()
this->iiuUninstall.wait ();
}
// (after this point all threads that know about this object
// have shut down so we no longer need to lock)
delete this->pRepeaterSubscribeTmr;
delete this->pSearchTmr;
freeListCleanup ( this->tcpSmallRecvBufFreeList );
freeListCleanup ( this->tcpLargeRecvBufFreeList );
if ( this->pudpiiu ) {
while ( nciu *pChan = this->pudpiiu->firstChannel() ) {
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
this->pudpiiu->detachChannel ( cbGuard, *pChan );
}
pChan->disconnect ( limboIIU );
limboIIU.attachChannel ( *pChan );
}
epicsGuard < callbackMutex > autoMutexCB ( this->cbMutex );
this->pudpiiu->removeAllChannels ( autoMutexCB );
}
delete this->pudpiiu;
delete [] this->pUserName;
this->beaconTable.traverse ( &bhe::destroy );
@@ -321,21 +278,6 @@ cac::~cac ()
// its his responsibility to clean them up.
}
// Detach every channel from srcIIU and reattach each one to dstIIU.
// NOTE(review): caller supplies both the callback-mutex guard and the
// primary-mutex guard; channel delete also takes the callback mutex, so
// iterating srcIIU's channel list here cannot race channel destruction.
void cac::removeAllChan ( epicsGuard < callbackMutex > & cbLocker, epicsGuard < epicsMutex > & locker,
netiiu & srcIIU, netiiu & dstIIU )
{
// we are protected here because channel delete takes the callback mutex
while ( nciu *pChan = srcIIU.firstChannel() ) {
// if the claim reply has not returned then we will issue
// the clear channel request to the server when the claim reply
// arrives and there is no matching nciu in the client
// (only a connected channel has a server-side resource to clear now)
if ( pChan->connected() ) {
srcIIU.clearChannelRequest ( pChan->getSID(), pChan->getCID() );
}
this->disconnectChannelPrivate ( cbLocker, locker, *pChan, dstIIU );
}
}
unsigned cac::lowestPriorityLevelAbove ( unsigned priority )
{
unsigned abovePriority;
@@ -365,7 +307,7 @@ unsigned cac::highestPriorityLevelBelow ( unsigned priority )
//
void cac::flushRequest ()
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
this->flushRequestPrivate ();
}
@@ -377,13 +319,13 @@ void cac::flushRequestPrivate ()
unsigned cac::connectionCount () const
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
return this->serverTable.numEntriesInstalled ();
}
void cac::show ( unsigned level ) const
{
epicsGuard < epicsMutex > autoMutex2 ( this->mutex );
epicsGuard < cacMutex > autoMutex2 ( this->mutex );
::printf ( "Channel Access Client Context at %p for user %s\n",
static_cast <const void *> ( this ), this->pUserName );
@@ -417,14 +359,6 @@ void cac::show ( unsigned level ) const
this->beaconTable.show ( level - 3u );
::printf ( "Timer queue:\n" );
this->timerQueue.show ( level - 3u );
if ( this->pSearchTmr ) {
::printf ( "search message timer:\n" );
this->pSearchTmr->show ( level - 3u );
}
if ( this->pRepeaterSubscribeTmr ) {
::printf ( "repeater subscribee timer:\n" );
this->pRepeaterSubscribeTmr->show ( level - 3u );
}
::printf ( "IP address to name conversion engine:\n" );
this->ipToAEngine.show ( level - 3u );
::printf ( "\tthe current read sequence number is %u\n",
@@ -447,7 +381,7 @@ void cac::show ( unsigned level ) const
void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime,
unsigned beaconNumber, unsigned protocolRevision )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
if ( ! this->pudpiiu ) {
return;
@@ -483,28 +417,7 @@ void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime,
return;
}
/*
* This part is needed when many machines
* have channels in a disconnected state that
* dont exist anywhere on the network. This insures
* that we dont have many CA clients synchronously
* flooding the network with broadcasts (and swamping
* out requests for valid channels).
*
* I fetch the local UDP port number and use the low
* order bits as a pseudo random delay to prevent every
* one from replying at once.
*/
if ( this->pSearchTmr ) {
static const double portTicksPerSec = 1000u;
static const unsigned portBasedDelayMask = 0xff;
unsigned port = this->pudpiiu->getPort ();
double delay = ( port & portBasedDelayMask );
delay /= portTicksPerSec;
this->pSearchTmr->resetPeriod ( delay );
}
this->pudpiiu->resetChannelRetryCounts ();
this->pudpiiu->beaconAnomalyNotify ();
# if DEBUG
{
@@ -535,7 +448,7 @@ int cac::pendIO ( const double & timeout )
double remaining = timeout;
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
this->flushRequestPrivate ();
}
@@ -563,12 +476,13 @@ int cac::pendIO ( const double & timeout )
}
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
this->readSeq++;
this->pndRecvCnt = 0u;
if ( this->pudpiiu ) {
this->pudpiiu->connectTimeoutNotify ();
}
}
if ( this->pudpiiu ) {
this->pudpiiu->pendioTimeoutNotify ();
}
return status;
@@ -605,7 +519,7 @@ int cac::pendEvent ( const double & timeout )
epicsTime current = epicsTime::getCurrent ();
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
this->flushRequestPrivate ();
}
@@ -640,19 +554,19 @@ int cac::pendEvent ( const double & timeout )
void cac::installCASG ( CASG &sg )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
this->sgTable.add ( sg );
}
void cac::uninstallCASG ( CASG &sg )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
this->sgTable.remove ( sg );
}
CASG * cac::lookupCASG ( unsigned id )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
CASG * psg = this->sgTable.lookup ( id );
if ( psg ) {
if ( ! psg->verify () ) {
@@ -684,74 +598,27 @@ cacChannel & cac::createChannel ( const char * pName,
if ( ! pIO ) {
pIO = pGlobalServiceListCAC->createChannel ( pName, chan, pri );
if ( ! pIO ) {
if ( ! this->pudpiiu || ! this->pSearchTmr ) {
if ( ! this->setupUDP () ) {
throw ECA_INTERNAL;
if ( ! this->pudpiiu ) {
epicsGuard < cacMutex > guard ( this->mutex );
if ( ! this->pudpiiu ) {
this->pudpiiu = new udpiiu ( this->timerQueue, this->cbMutex, *this );
}
}
epics_auto_ptr < cacChannel > pNetChan
( new nciu ( *this, limboIIU, chan, pName, pri ) );
epicsGuard < cacMutex > guard ( this->mutex );
epics_auto_ptr < nciu > pNetChan
( new nciu ( *this, *this->pudpiiu, chan, pName, pri ) );
pNetChan->notifyStateChangeFirstConnectInCountOfOutstandingIO ();
this->chanTable.add ( *pNetChan );
return *pNetChan.release ();
}
}
return *pIO;
}
// Register a new channel in the client-wide channel table, attach it to
// the UDP iiu, and return that iiu to the caller through piiu.
// NOTE(review): dereferences pudpiiu and pSearchTmr unconditionally —
// assumes setupUDP() succeeded before this is called; confirm at call sites.
void cac::installNetworkChannel ( nciu & chan, netiiu * & piiu )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
this->chanTable.add ( chan );
this->pudpiiu->attachChannel ( chan );
piiu = this->pudpiiu;
// zero period forces the search timer to run immediately for the new channel
this->pSearchTmr->resetPeriod ( 0.0 );
}
// Lazily create the UDP iiu, the search timer, and the repeater subscribe
// timer. Each object uses a check / lock / re-check sequence so that
// concurrent callers construct it at most once. Returns false on failure.
// NOTE(review): in standard C++ operator new throws std::bad_alloc rather
// than returning 0, so the null tests after each new look like dead code
// unless a non-throwing allocator is configured — confirm project policy.
bool cac::setupUDP ()
{
if ( ! this->pudpiiu ) {
udpiiu *piiu = 0;
// callback lock acquired before the primary mutex (documented lock order)
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( ! this->pudpiiu ) {
piiu = this->pudpiiu = new udpiiu ( this->cbMutex, *this );
if ( ! this->pudpiiu ) {
return false;
}
}
}
// only the thread that created the iiu starts it, outside the primary mutex
if ( piiu ) {
piiu->start ( cbGuard );
}
}
if ( ! this->pSearchTmr ) {
epicsGuard < epicsMutex > autoMutex ( this->mutex );
if ( ! this->pSearchTmr ) {
this->pSearchTmr = new searchTimer ( *this->pudpiiu, this->timerQueue, this->mutex );
if ( ! this->pSearchTmr ) {
return false;
}
}
}
if ( ! this->pRepeaterSubscribeTmr ) {
epicsGuard < epicsMutex > autoMutex ( this->mutex );
if ( ! this->pRepeaterSubscribeTmr ) {
this->pRepeaterSubscribeTmr = new repeaterSubscribeTimer ( *this->pudpiiu, this->timerQueue );
if ( ! this->pRepeaterSubscribeTmr ) {
return false;
}
}
}
return true;
}
void cac::repeaterSubscribeConfirmNotify ()
{
if ( this->pRepeaterSubscribeTmr ) {
this->pRepeaterSubscribeTmr->confirmNotify ();
if ( this->pudpiiu ) {
this->pudpiiu->repeaterConfirmNotify ();
}
}
@@ -769,30 +636,30 @@ bool cac::lookupChannelAndTransferToTCP (
}
bool v41Ok, v42Ok;
nciu *chan;
nciu *pChan;
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
/*
* ignore search replies for deleted channels
*/
chan = this->chanTable.lookup ( cid );
if ( ! chan ) {
pChan = this->chanTable.lookup ( cid );
if ( ! pChan ) {
return true;
}
retrySeqNumber = chan->getRetrySeqNo ();
retrySeqNumber = pChan->getRetrySeqNo ();
/*
* Ignore duplicate search replies
*/
osiSockAddr chanAddr = chan->getPIIU()->getNetworkAddress ();
osiSockAddr chanAddr = pChan->getPIIU()->getNetworkAddress ();
if ( chanAddr.sa.sa_family != AF_UNSPEC ) {
if ( ! sockAddrAreIdentical ( &addr, &chanAddr ) ) {
char acc[64];
chan->getPIIU()->hostName ( acc, sizeof ( acc ) );
pChan->getPIIU()->hostName ( acc, sizeof ( acc ) );
msgForMultiplyDefinedPV *pMsg = new msgForMultiplyDefinedPV (
this->cbMutex, *this, chan->pName (), acc, addr );
this->cbMutex, *this, pChan->pName (), acc, addr );
if ( pMsg ) {
this->ipAddrToAsciiAsynchronousRequestInstall ( *pMsg );
}
@@ -803,7 +670,7 @@ bool cac::lookupChannelAndTransferToTCP (
/*
* look for an existing virtual circuit
*/
caServerID servID ( addr.ia, chan->getPriority() );
caServerID servID ( addr.ia, pChan->getPriority() );
tcpiiu * piiu = this->serverTable.lookup ( servID );
if ( piiu ) {
if ( ! piiu->alive () ) {
@@ -815,7 +682,7 @@ bool cac::lookupChannelAndTransferToTCP (
pnewiiu = piiu = new tcpiiu (
*this, this->cbMutex, this->connTMO, this->timerQueue,
addr, minorVersionNumber, this->ipToAEngine,
chan->getPriority() );
pChan->getPriority() );
if ( ! piiu ) {
return true;
}
@@ -841,21 +708,17 @@ bool cac::lookupChannelAndTransferToTCP (
}
}
this->pudpiiu->detachChannel ( cbGuard, *chan );
chan->searchReplySetUp ( *piiu, sid, typeCode, count );
piiu->attachChannel ( *chan );
chan->createChannelRequest ();
piiu->flushRequest ();
this->pudpiiu->uninstallChannel ( cbGuard, guard, *pChan );
piiu->installChannel ( guard, *pChan, sid, typeCode, count );
v41Ok = piiu->ca_v41_ok ();
v42Ok = piiu->ca_v42_ok ();
if ( ! v42Ok ) {
// connect to old server with lock applied
chan->connect ();
pChan->connect ();
// resubscribe for monitors from this channel
this->connectAllIO ( *chan );
this->connectAllIO ( guard, *pChan );
}
}
@@ -868,7 +731,7 @@ bool cac::lookupChannelAndTransferToTCP (
// disconnects to prevent a race condition with the
// code below - ie we hold the callback lock here
// so a chanel cant be destroyed out from under us.
chan->connectStateNotify ();
pChan->connectStateNotify ();
/*
* if less than v4.1 then the server will never
@@ -877,7 +740,7 @@ bool cac::lookupChannelAndTransferToTCP (
* their call back here
*/
if ( ! v41Ok ) {
chan->accessRightsNotify ();
pChan->accessRightsNotify ();
}
}
@@ -885,10 +748,8 @@ bool cac::lookupChannelAndTransferToTCP (
pnewiiu->start ( cbGuard );
}
if ( this->pSearchTmr ) {
// deadlock can result if this is called while holding the primary
// mutex (because the primary mutex is used in the search timer callback)
this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime );
if ( this->pudpiiu ) {
this->pudpiiu->notifySearchResponse ( retrySeqNumber, currentTime );
}
return true;
@@ -903,7 +764,7 @@ void cac::uninstallChannel ( nciu & chan )
// side effect IO requests w/o holding the callback lock so that
// we do not dead lock
{
epicsGuard < epicsMutex > guard ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
// if the send backlog is too high send some frames before we get entagled
// in the channel shutdown sequence below. There is special protection in
@@ -938,7 +799,7 @@ void cac::uninstallChannel ( nciu & chan )
class netSubscription *pSubscr = pIO->isSubscription ();
if ( pSubscr && chan.connected() ) {
// we will deadlock if we hold the callback lock here
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
chan.getPIIU()->subscriptionCancelRequest ( guard, chan, *pSubscr );
}
tmpList.add ( *pIO );
}
@@ -947,7 +808,7 @@ void cac::uninstallChannel ( nciu & chan )
// the clear channel request to the server when the claim reply
// arrives and there is no matching nciu in the client
if ( chan.connected() ) {
chan.getPIIU()->clearChannelRequest ( chan.getSID(), chan.getCID() );
chan.getPIIU()->clearChannelRequest ( guard, chan.getSID(), chan.getCID() );
}
}
@@ -988,9 +849,11 @@ void cac::uninstallChannel ( nciu & chan )
// o clear channel request
// o outstanding callbacks using this channel have completed
// o chan destroy exception has been delivered
epicsGuard < epicsMutex > autoMutex ( this->mutex );
// this destroys the tcpiiu if its the last channel
chan.getPIIU()->detachChannel ( cbGuard, chan );
{
epicsGuard < cacMutex > guard ( this->mutex );
// this destroys the tcpiiu if its the last channel
chan.getPIIU()->uninstallChannel ( cbGuard, guard, chan );
}
}
}
@@ -1009,9 +872,9 @@ int cac::printf ( const char *pformat, ... ) const
}
// lock must be applied before calling this cac private routine
void cac::flushIfRequired ( epicsGuard < epicsMutex > & guard, netiiu & iiu )
void cac::flushIfRequired ( epicsGuard < cacMutex > & guard, netiiu & iiu )
{
if ( iiu.flushBlockThreshold() ) {
if ( iiu.flushBlockThreshold ( guard ) ) {
iiu.flushRequest ();
// the process thread is not permitted to flush as this
// can result in a push / pull deadlock on the TCP pipe.
@@ -1033,29 +896,29 @@ void cac::flushIfRequired ( epicsGuard < epicsMutex > & guard, netiiu & iiu )
}
}
else {
iiu.flushRequestIfAboveEarlyThreshold ();
iiu.flushRequestIfAboveEarlyThreshold ( guard );
}
}
void cac::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue )
void cac::writeRequest ( nciu & chan, unsigned type, unsigned nElem, const void * pValue )
{
epicsGuard < epicsMutex > guard ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
this->flushIfRequired ( guard, *chan.getPIIU() );
chan.getPIIU()->writeRequest ( chan, type, nElem, pValue );
chan.getPIIU()->writeRequest ( guard, chan, type, nElem, pValue );
}
cacChannel::ioid
cac::writeNotifyRequest ( nciu &chan, unsigned type, // X aCC 361
unsigned nElem, const void *pValue, cacWriteNotify &notifyIn )
{
epicsGuard < epicsMutex > guard ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
autoPtrRecycle < netWriteNotifyIO > pIO ( this->ioTable, chan.cacPrivateListOfIO::eventq,
*this, netWriteNotifyIO::factory ( this->freeListWriteNotifyIO, chan, notifyIn ) );
this->ioTable.add ( *pIO );
chan.cacPrivateListOfIO::eventq.add ( *pIO );
this->flushIfRequired ( guard, *chan.getPIIU() );
chan.getPIIU()->writeNotifyRequest (
chan, *pIO, type, nElem, pValue );
guard, chan, *pIO, type, nElem, pValue );
return pIO.release()->getId ();
}
@@ -1063,14 +926,14 @@ cacChannel::ioid
cac::readNotifyRequest ( nciu &chan, unsigned type, // X aCC 361
unsigned nElem, cacReadNotify &notifyIn )
{
epicsGuard < epicsMutex > guard ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
autoPtrRecycle < netReadNotifyIO > pIO ( this->ioTable,
chan.cacPrivateListOfIO::eventq, *this,
netReadNotifyIO::factory ( this->freeListReadNotifyIO, chan, notifyIn ) );
this->ioTable.add ( *pIO );
chan.cacPrivateListOfIO::eventq.add ( *pIO );
this->flushIfRequired ( guard, *chan.getPIIU() );
chan.getPIIU()->readNotifyRequest ( chan, *pIO, type, nElem );
chan.getPIIU()->readNotifyRequest ( guard, chan, *pIO, type, nElem );
return pIO.release()->getId ();
}
@@ -1082,7 +945,7 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
// but do _not_ hold the callback lock here because this could result
// in deadlock
{
epicsGuard < epicsMutex > guard ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
@@ -1091,7 +954,7 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
if ( pSubscr ) {
this->flushIfRequired ( guard, *chan.getPIIU() );
if ( chan.connected() ) {
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
chan.getPIIU()->subscriptionCancelRequest ( guard, chan, *pSubscr );
}
}
// must be uninstalled and also removed from the table
@@ -1115,14 +978,14 @@ void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
}
// now it is safe to destroy the IO object
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
pmiu->destroy ( *this );
}
}
void cac::ioShow ( const cacChannel::ioid &id, unsigned level ) const
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->show ( level );
@@ -1135,7 +998,7 @@ void cac::ioCompletionNotify ( unsigned id, unsigned type,
baseNMIU * pmiu;
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
pmiu = this->ioTable.lookup ( id );
if ( ! pmiu ) {
return;
@@ -1155,7 +1018,7 @@ void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext )
{
baseNMIU * pmiu;
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
pmiu = this->ioTable.lookup ( id );
}
@@ -1179,7 +1042,7 @@ void cac::ioExceptionNotify ( unsigned id, int status,
baseNMIU * pmiu;
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
pmiu = this->ioTable.lookup ( id );
if ( ! pmiu ) {
return;
@@ -1197,7 +1060,7 @@ void cac::ioExceptionNotify ( unsigned id, int status,
void cac::ioCompletionNotifyAndDestroy ( unsigned id )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
@@ -1212,7 +1075,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id )
// it is in use here.
//
{
epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex );
epicsGuardRelease < cacMutex > autoMutexRelease ( autoMutex );
pmiu->completion ();
}
@@ -1222,7 +1085,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id )
void cac::ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, arrayElementCount count, const void *pData )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
@@ -1236,7 +1099,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id,
// it is in use here.
//
{
epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex );
epicsGuardRelease < cacMutex > autoMutexRelease ( autoMutex );
pmiu->completion ( type, count, pData );
}
@@ -1246,7 +1109,7 @@ void cac::ioCompletionNotifyAndDestroy ( unsigned id,
void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
const char *pContext )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
@@ -1260,7 +1123,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
// it is in use here.
//
{
epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex );
epicsGuardRelease < cacMutex > autoMutexRelease ( autoMutex );
pmiu->exception ( status, pContext );
}
@@ -1270,7 +1133,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
const char *pContext, unsigned type, arrayElementCount count )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
@@ -1285,7 +1148,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
//
{
epicsGuardRelease < epicsMutex > autoMutexRelease ( autoMutex );
epicsGuardRelease < cacMutex > autoMutexRelease ( autoMutex );
pmiu->exception ( status, pContext, type, count );
}
@@ -1293,8 +1156,7 @@ void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
}
// resubscribe for monitors from this channel
// (lock must be applied)
void cac::connectAllIO ( nciu & chan )
void cac::connectAllIO ( epicsGuard < cacMutex > & guard, nciu & chan )
{
tsDLIterBD < baseNMIU > pNetIO =
chan.cacPrivateListOfIO::eventq.firstIter ();
@@ -1305,7 +1167,7 @@ void cac::connectAllIO ( nciu & chan )
// disconnected channels should have only subscription IO attached
assert ( pSubscr );
try {
chan.getPIIU()->subscriptionRequest ( chan, *pSubscr );
chan.getPIIU()->subscriptionRequest ( guard, chan, *pSubscr );
}
catch ( ... ) {
this->printf ( "CAC: failed to send subscription request during channel connect\n" );
@@ -1317,7 +1179,7 @@ void cac::connectAllIO ( nciu & chan )
// cancel IO operations and monitor subscriptions
// -- callback lock and cac lock must be applied here
void cac::disconnectAllIO ( epicsGuard < epicsMutex > &locker, nciu & chan, bool enableCallbacks )
void cac::disconnectAllIO ( epicsGuard < cacMutex > &locker, nciu & chan, bool enableCallbacks )
{
tsDLIterBD<baseNMIU> pNetIO = chan.cacPrivateListOfIO::eventq.firstIter();
while ( pNetIO.valid() ) {
@@ -1330,8 +1192,8 @@ void cac::disconnectAllIO ( epicsGuard < epicsMutex > &locker, nciu & chan, bool
}
if ( enableCallbacks ) {
char buf[128];
sprintf ( buf, "host = %100s", chan.pHostName() );
epicsGuardRelease < epicsMutex > unlocker ( locker );
sprintf ( buf, "host = %.100s", chan.pHostName() );
epicsGuardRelease < cacMutex > unlocker ( locker );
pNetIO->exception ( ECA_DISCONN, buf );
}
if ( ! pNetIO->isSubscription() ) {
@@ -1361,7 +1223,7 @@ cac::subscriptionRequest ( nciu &chan, unsigned type, // X aCC 361
arrayElementCount nElem, unsigned mask,
cacStateNotify &notifyIn )
{
epicsGuard < epicsMutex > guard ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
autoPtrRecycle < netSubscription > pIO ( this->ioTable,
chan.cacPrivateListOfIO::eventq, *this,
netSubscription::factory ( this->freeListSubscription,
@@ -1370,7 +1232,7 @@ cac::subscriptionRequest ( nciu &chan, unsigned type, // X aCC 361
chan.cacPrivateListOfIO::eventq.add ( *pIO );
if ( chan.connected () ) {
this->flushIfRequired ( guard, *chan.getPIIU() );
chan.getPIIU()->subscriptionRequest ( chan, *pIO );
chan.getPIIU()->subscriptionRequest ( guard, chan, *pIO );
}
cacChannel::ioid id = pIO->getId ();
pIO.release ();
@@ -1513,7 +1375,7 @@ bool cac::defaultExcep ( epicsGuard < callbackMutex > &, tcpiiu &iiu,
char buf[512];
char hostName[64];
iiu.hostName ( hostName, sizeof ( hostName ) );
sprintf ( buf, "host=%64s ctx=%400s", hostName, pCtx );
sprintf ( buf, "host=%s ctx=%.400s", hostName, pCtx );
this->notify.exception ( status, buf, 0, 0u );
return true;
}
@@ -1611,7 +1473,7 @@ bool cac::accessRightsRespAction ( epicsGuard < callbackMutex > &, tcpiiu &,
{
nciu * pChan;
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
pChan = this->chanTable.lookup ( hdr.m_cid );
if ( pChan ) {
unsigned ar = hdr.m_available;
@@ -1640,7 +1502,7 @@ bool cac::claimCIURespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu,
nciu * pChan;
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
pChan = this->chanTable.lookup ( hdr.m_cid );
if ( pChan ) {
unsigned sidTmp;
@@ -1651,12 +1513,12 @@ bool cac::claimCIURespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu,
sidTmp = pChan->getSID ();
}
pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp, iiu.ca_v41_ok() );
this->connectAllIO ( *pChan );
this->connectAllIO ( guard, *pChan );
}
else if ( iiu.ca_v44_ok() ) {
// this indicates a claim response for a resource that does
// not exist in the client - so just remove it from the server
iiu.clearChannelRequest ( hdr.m_available, hdr.m_cid );
iiu.clearChannelRequest ( guard, hdr.m_available, hdr.m_cid );
}
}
// the callback lock is taken when a channel is unistalled or when
@@ -1668,37 +1530,32 @@ bool cac::claimCIURespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu,
}
bool cac::verifyAndDisconnectChan (
epicsGuard < callbackMutex > & cbMutexIn, tcpiiu & /* iiu */,
epicsGuard < callbackMutex > & cbGuard, tcpiiu & /* iiu */,
const caHdrLargeArray & hdr, void * /* pMsgBdy */ )
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
nciu * pChan = this->chanTable.lookup ( hdr.m_cid );
if ( ! pChan ) {
return true;
}
assert ( this->pudpiiu );
this->disconnectChannelPrivate ( cbMutexIn, autoMutex,
*pChan, *this->pudpiiu );
this->pSearchTmr->resetPeriod ( 0.0 );
this->disconnectChannel ( cbGuard, guard, *pChan );
if ( this->pudpiiu ) {
this->pudpiiu->resetSearchTimerPeriod ( 0.0 );
}
return true;
}
void cac::disconnectChannelPrivate ( epicsGuard < callbackMutex > & cbLocker,
epicsGuard < epicsMutex > & locker,
nciu & chan, netiiu & dstIIU )
void cac::disconnectChannel ( epicsGuard < callbackMutex > & cbLocker,
epicsGuard < cacMutex > & locker,
nciu & chan )
{
this->disconnectAllIO ( locker, chan, true );
chan.getPIIU()->detachChannel ( cbLocker, chan );
chan.disconnect ( limboIIU );
limboIIU.attachChannel ( chan );
{
epicsGuardRelease < epicsMutex > autoMutexRelease ( locker );
chan.connectStateNotify ();
chan.accessRightsNotify ();
}
limboIIU.detachChannel ( cbLocker, chan );
chan.disconnect ( dstIIU );
dstIIU.attachChannel ( chan );
chan.disconnect ( *this->pudpiiu );
this->pudpiiu->installChannel ( chan );
epicsGuardRelease < cacMutex > autoMutexRelease ( locker );
chan.connectStateNotify ();
chan.accessRightsNotify ();
}
bool cac::badTCPRespAction ( epicsGuard < callbackMutex > &, tcpiiu & iiu,
@@ -1785,7 +1642,7 @@ void cac::vSignal ( int ca_status, const char *pfilenm,
void cac::incrementOutstandingIO ()
{
epicsGuard < epicsMutex > locker ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
if ( this->pndRecvCnt < UINT_MAX ) {
this->pndRecvCnt++;
}
@@ -1799,7 +1656,7 @@ void cac::decrementOutstandingIO ()
bool signalNeeded;
{
epicsGuard < epicsMutex > locker ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
if ( this->pndRecvCnt > 0u ) {
this->pndRecvCnt--;
if ( this->pndRecvCnt == 0u ) {
@@ -1824,7 +1681,7 @@ void cac::decrementOutstandingIO ( unsigned sequenceNo )
bool signalNeeded;
{
epicsGuard < epicsMutex > locker ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
if ( this->readSeq == sequenceNo ) {
if ( this->pndRecvCnt > 0u ) {
this->pndRecvCnt--;
@@ -1873,15 +1730,14 @@ void cac::tcpCircuitShutdown ( tcpiiu & iiu, bool discardPendingMessages )
// will get called in preemptive callback disabled
// applications, and therefore the callback lock below
// will not block
{
epicsGuard < epicsMutex > autoMutexCAC ( this->mutex );
if ( this->pudpiiu ) {
this->pudpiiu->wakeupMsg ();
}
if ( this->pudpiiu ) {
this->pudpiiu->wakeupMsg ();
}
{
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
epicsGuard < cacMutex > guard ( this->mutex );
iiu.shutdown ( cbGuard, guard, discardPendingMessages );
}
epicsGuard < callbackMutex > cbGuard ( this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
iiu.shutdown ( cbGuard, discardPendingMessages );
}
void cac::uninstallIIU ( tcpiiu & iiu )
@@ -1890,9 +1746,9 @@ void cac::uninstallIIU ( tcpiiu & iiu )
this->privateUninstallIIU ( cbGuard, iiu );
}
void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbMutexIn, tcpiiu & iiu )
void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbGuard, tcpiiu & iiu )
{
epicsGuard < epicsMutex > autoMutexCAC ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
if ( iiu.channelCount() ) {
char hostNameTmp[64];
iiu.hostName ( hostNameTmp, sizeof ( hostNameTmp ) );
@@ -1912,11 +1768,11 @@ void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbMutexIn, tcpiiu
this->serverTable.remove ( iiu );
assert ( this->pudpiiu );
this->removeAllChan ( cbMutexIn, autoMutexCAC, iiu, *this->pudpiiu );
iiu.removeAllChannels ( cbGuard, guard, *this );
delete &iiu;
this->pSearchTmr->resetPeriod ( 0.0 );
this->pudpiiu->resetSearchTimerPeriod ( 0.0 );
// signal iiu uninstal event so that cac can properly shut down
this->iiuUninstall.signal();
@@ -1924,7 +1780,7 @@ void cac::privateUninstallIIU ( epicsGuard < callbackMutex > & cbMutexIn, tcpiiu
double cac::beaconPeriod ( const nciu & chan ) const
{
epicsGuard < epicsMutex > locker ( this->mutex );
epicsGuard < cacMutex > guard ( this->mutex );
const netiiu * pIIU = chan.getConstPIIU ();
if ( pIIU ) {
osiSockAddr addr = pIIU->getNetworkAddress ();
@@ -1938,3 +1794,10 @@ double cac::beaconPeriod ( const nciu & chan ) const
}
return - DBL_MAX;
}
// Begin the connect sequence for a channel by installing it in the UDP iiu.
void cac::initiateConnect ( nciu & chan )
{
// presumably createChannel() constructed pudpiiu before any channel can
// reach this point — the assert enforces that precondition
assert ( this->pudpiiu );
this->pudpiiu->installChannel ( chan );
}

View File

@@ -75,11 +75,27 @@ private:
epicsEvent noRecvThreadsPending;
unsigned recvThreadsPendingCount;
bool threadsMayBeBlockingForRecvThreadsToFinish;
//callbackMutex ( callbackMutex & );
//callbackMutex & operator = ( callbackMutex & );
callbackMutex ( callbackMutex & );
callbackMutex & operator = ( callbackMutex & );
};
class cac : private cacRecycle
// Thin wrapper around epicsMutex that gives the client-context lock its
// own distinct type, so epicsGuard < cacMutex > parameters state (and the
// type system checks) which lock a routine expects, as opposed to any
// other epicsMutex in the process.
class cacMutex {
public:
void lock ();
void unlock ();
void show ( unsigned level ) const;
private:
// the underlying primitive; all three members simply forward to it
epicsMutex mutex;
};
// Narrow interface through which lower layers request that the client
// context disconnect a channel, without depending on the full cac class
// (cac derives privately from this interface).
class cacDisconnectChannelPrivate {
public:
// virtual destructor so deletion through an interface pointer is well
// defined for any future user of this abstraction
virtual ~cacDisconnectChannelPrivate () {}
// disconnect chan; caller holds both the callback and cac mutex guards
virtual void disconnectChannel (
epicsGuard < callbackMutex > &,
epicsGuard < cacMutex > &, nciu & chan ) = 0;
};
class cac : private cacRecycle, private cacDisconnectChannelPrivate
{
public:
cac ( cacNotify &, bool enablePreemptiveCallback = false );
@@ -107,7 +123,6 @@ public:
void ioShow ( const cacChannel::ioid &id, unsigned level ) const;
// channel routines
void installNetworkChannel ( nciu &, netiiu *&piiu );
bool lookupChannelAndTransferToTCP (
epicsGuard < callbackMutex > &,
unsigned cid, unsigned sid,
@@ -118,6 +133,7 @@ public:
cacChannel & createChannel ( const char *name_str,
cacChannelNotify &chan, cacChannel::priLev pri );
void registerService ( cacService &service );
void initiateConnect ( nciu & );
// IO request stubs
void writeRequest ( nciu &, unsigned type,
@@ -162,7 +178,7 @@ public:
// misc
const char * userNamePointer () const;
unsigned getInitializingThreadsPriority () const;
epicsMutex & mutexRef ();
cacMutex & mutexRef ();
void attachToClientCtx ();
void selfTest () const;
void notifyNewFD ( epicsGuard < callbackMutex > &, SOCKET ) const;
@@ -202,15 +218,12 @@ private:
// callback lock must always be acquired before
// the primary mutex if both locks are needed
callbackMutex cbMutex;
mutable epicsMutex mutex;
mutable cacMutex mutex;
epicsEvent ioDone;
epicsEvent iiuUninstall;
epicsTimerQueueActive & timerQueue;
char * pUserName;
class udpiiu * pudpiiu;
class searchTimer * pSearchTmr;
class repeaterSubscribeTimer
* pRepeaterSubscribeTmr;
void * tcpSmallRecvBufFreeList;
void * tcpLargeRecvBufFreeList;
epicsGuard <callbackMutex> * pCallbackGuard;
@@ -224,20 +237,15 @@ private:
void privateUninstallIIU ( epicsGuard < callbackMutex > &, tcpiiu &iiu );
void flushRequestPrivate ();
void run ();
bool setupUDP ();
void connectAllIO ( nciu &chan );
void disconnectAllIO ( epicsGuard < epicsMutex > & locker, nciu & chan, bool enableCallbacks );
void flushIfRequired ( epicsGuard < epicsMutex > &, netiiu & );
void connectAllIO ( epicsGuard < cacMutex > &, nciu &chan );
void disconnectAllIO ( epicsGuard < cacMutex > & locker, nciu & chan, bool enableCallbacks );
void flushIfRequired ( epicsGuard < cacMutex > &, netiiu & );
void recycleReadNotifyIO ( netReadNotifyIO &io );
void recycleWriteNotifyIO ( netWriteNotifyIO &io );
void recycleSubscription ( netSubscription &io );
void removeAllChan (
epicsGuard < callbackMutex > & cbLocker, epicsGuard < epicsMutex > &locker,
netiiu & srcIIU, netiiu & dstIIU );
void disconnectChannelPrivate (
epicsGuard < callbackMutex > &, epicsGuard < epicsMutex > &,
nciu & chan, netiiu & dstIIU );
void disconnectChannel (
epicsGuard < callbackMutex > &, epicsGuard < cacMutex > &, nciu & chan );
void ioCompletionNotify ( unsigned id, unsigned type,
arrayElementCount count, const void *pData );
@@ -327,7 +335,7 @@ inline unsigned cac::sequenceNumberOfOutstandingIO () const
return this->readSeq;
}
inline epicsMutex & cac::mutexRef ()
inline cacMutex & cac::mutexRef ()
{
return this->mutex;
}
@@ -387,5 +395,20 @@ inline bool cac::preemptiveCallbakIsEnabled () const
return ! this->pCallbackGuard;
}
// forward to the wrapped epicsMutex
inline void cacMutex::lock ()
{
this->mutex.lock ();
}
// forward to the wrapped epicsMutex
inline void cacMutex::unlock ()
{
this->mutex.unlock ();
}
// diagnostic dump of the wrapped epicsMutex at the requested detail level
inline void cacMutex::show ( unsigned level ) const
{
this->mutex.show ( level );
}
#endif // ifdef cach