changed locking hierarchy

This commit is contained in:
Jeff Hill
2001-03-21 01:07:27 +00:00
parent 0a9f914076
commit c0c2d30209
17 changed files with 1181 additions and 1328 deletions
+1 -1
View File
@@ -87,7 +87,7 @@ int CASG::block ( double timeout )
cur_time = epicsTime::getCurrent ();
this->client.flush ();
this->client.flushRequest ();
beg_time = cur_time;
delay = 0.0;
+1 -1
View File
@@ -500,7 +500,7 @@ extern "C" int epicsShareAPI ca_flush_io ()
return caStatus;
}
pcac->flush ();
pcac->flushRequest ();
return ECA_NORMAL;
}
+2 -13
View File
@@ -25,18 +25,11 @@ baseNMIU::~baseNMIU ()
void baseNMIU::cancel ()
{
unsigned i = 0u;
while ( ! this->chan.getPIIU ()->uninstallIO ( *this ) ) {
if ( i++ > 1000u ) {
ca_printf ( "CAC: unable to destroy IO\n" );
break;
}
}
this->ioCancelRequest ();
this->chan.uninstallIO ( *this );
delete this;
}
class netSubscription * baseNMIU::isSubscription ()
class netSubscription * baseNMIU::isSubscription ()
{
return 0;
}
@@ -52,8 +45,4 @@ cacChannelIO & baseNMIU::channelIO () const
return this->chan;
}
void baseNMIU::ioCancelRequest ()
{
}
+348 -27
View File
@@ -22,6 +22,9 @@
#include "comQueSend_IL.h"
#include "recvProcessThread_IL.h"
#include "netiiu_IL.h"
#include "netWriteNotifyIO_IL.h"
#include "netReadNotifyIO_IL.h"
#include "baseNMIU_IL.h"
//
// cac::cac ()
@@ -29,6 +32,7 @@
cac::cac ( bool enablePreemptiveCallbackIn ) :
ipToAEngine ( "caIPAddrToAsciiEngine" ),
chanTable ( 1024 ),
ioTable ( 1024 ),
sgTable ( 128 ),
beaconTable ( 1024 ),
fdRegFunc ( 0 ),
@@ -123,10 +127,13 @@ cac::~cac ()
this->pRecvProcThread->disable ();
}
if ( this->pudpiiu ) {
// this blocks until the UDP thread exits so that
// it will not sneak in any new clients
this->pudpiiu->shutdown ();
{
epicsAutoMutex autoMutex ( this->defaultMutex );
if ( this->pudpiiu ) {
// this blocks until the UDP thread exits so that
// it will not sneak in any new clients
this->pudpiiu->shutdown ();
}
}
//
@@ -169,18 +176,17 @@ cac::~cac ()
delete this->pSearchTmr;
}
if ( this->pudpiiu ) {
{
epicsAutoMutex autoMutex ( this->defaultMutex );
if ( this->pudpiiu ) {
//
// make certain that the UDP thread isnt starting
// up new clients. this adds an additional
// requirement that threads
//
{
epicsAutoMutex autoMutex ( this->defaultMutex );
//
// make certain that the UDP thread isnt starting
// up new clients.
//
this->pudpiiu->disconnectAllChan ( limboIIU );
delete this->pudpiiu;
}
delete this->pudpiiu;
}
//
@@ -192,6 +198,13 @@ cac::~cac ()
this->beaconTable.traverse ( &bhe::destroy );
// if we get here and the IO is still attached then we have a
// leaked io block that was not registered with a channel.
if ( this->ioTable.numEntriesInstalled () ) {
this->printf ( "CAC %u orphaned IO items?\n",
this->ioTable.numEntriesInstalled () );
}
osiSockRelease ();
this->pTimerQueue->release ();
@@ -244,10 +257,7 @@ void cac::processRecvBacklog ()
}
}
/*
* cac::flush ()
*/
void cac::flush ()
void cac::flushRequest ()
{
/*
* set the push pending flag on all virtual circuits
@@ -255,7 +265,7 @@ void cac::flush ()
epicsAutoMutex autoMutex ( this->iiuListMutex );
tsDLIterBD <tcpiiu> piiu = this->iiuList.firstIter ();
while ( piiu.valid () ) {
piiu->flush ();
piiu->flushRequest ();
piiu++;
}
}
@@ -299,6 +309,8 @@ void cac::show ( unsigned level ) const
this->programBeginTime.show ( level - 3u );
::printf ( "Channel identifier hash table:\n" );
this->chanTable.show ( level - 3u );
::printf ( "IO identifier hash table:\n" );
this->ioTable.show ( level - 3u );
::printf ( "Synchronous group identifier hash table:\n" );
this->sgTable.show ( level - 3u );
::printf ( "Beacon source identifier hash table:\n" );
@@ -426,7 +438,7 @@ int cac::pendIO ( const double &timeout )
this->enableCallbackPreemption ();
this->flush ();
this->flushRequest ();
int status = ECA_NORMAL;
epicsTime beg_time = epicsTime::getCurrent ();
@@ -450,8 +462,11 @@ int cac::pendIO ( const double &timeout )
}
this->ioCounter.cleanUp ();
if ( this->pudpiiu ) {
this->pudpiiu->connectTimeoutNotify ();
{
epicsAutoMutex autoMutex ( this->defaultMutex );
if ( this->pudpiiu ) {
this->pudpiiu->connectTimeoutNotify ();
}
}
this->disableCallbackPreemption ();
@@ -469,7 +484,7 @@ int cac::pendEvent ( const double &timeout )
this->enableCallbackPreemption ();
this->flush ();
this->flushRequest ();
if ( timeout == 0.0 ) {
while ( true ) {
@@ -646,7 +661,7 @@ bool cac::setupUDP ()
return false;
}
this->pSearchTmr = new searchTimer ( *this->pudpiiu, *this->pTimerQueue );
this->pSearchTmr = new searchTimer ( *this->pudpiiu, *this->pTimerQueue, this->defaultMutex );
if ( ! this->pSearchTmr ) {
delete this->pudpiiu;
this->pudpiiu = 0;
@@ -837,9 +852,7 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
piiu->attachChannel ( *chan );
chan->createChannelRequest ();
// wake up send thread which ultimately sends the claim message
piiu->flush ();
piiu->flushRequest ();
if ( ! piiu->ca_v42_ok () ) {
chan->connect ();
@@ -858,7 +871,9 @@ void cac::uninstallChannel ( nciu & chan )
epicsAutoMutex autoMutex ( this->defaultMutex );
nciu *pChan = this->chanTable.remove ( chan );
assert ( pChan = &chan );
chan.getPIIU ()->detachChannel ( chan );
this->flushIfRequired ( chan );
chan.getPIIU()->clearChannelRequest ( chan );
chan.getPIIU()->detachChannel ( chan );
}
void cac::getFDRegCallback ( CAFDHANDLER *&fdRegFuncOut, void *&fdRegArgOut ) const
@@ -888,5 +903,311 @@ int cac::printf ( const char *pformat, ... )
return status;
}
// lock must be applied before calling this cac private routine
void cac::flushIfRequired ( nciu &chan )
{
if ( chan.getPIIU()->flushBlockThreshold() ) {
// the process thread is not permitted to flush as this
// can result in a push / pull deadlock on the TCP pipe.
// Instead, the process thread scheduals the flush with the
// send thread which runs at a higher priority than the
// send thread. The same applies to the UDP thread for
// locking hierarchy reasons.
bool flushPermit = true;
if ( this->pRecvProcThread ) {
if ( this->pRecvProcThread->isCurrentThread () ) {
flushPermit = false;
}
}
if ( this->pudpiiu ) {
if ( this->pudpiiu->isCurrentThread () ) {
flushPermit = false;
}
}
if ( flushPermit ) {
chan.getPIIU()->blockUntilSendBacklogIsReasonable ( this->defaultMutex );
}
else {
this->flushRequest ();
}
}
else {
chan.getPIIU()->flushRequestIfAboveEarlyThreshold ();
}
}
int cac::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
this->flushIfRequired ( chan );
return chan.getPIIU()->writeRequest ( chan, type, nElem, pValue );
}
int cac::writeNotifyRequest ( nciu &chan, cacNotify &notify, unsigned type, unsigned nElem, const void *pValue )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
this->flushIfRequired ( chan );
netWriteNotifyIO *pIO = new netWriteNotifyIO ( chan, notify );
if ( pIO ) {
this->ioTable.add ( *pIO );
chan.cacPrivateListOfIO::eventq.add ( *pIO );
int status = chan.getPIIU()->writeNotifyRequest ( chan, *pIO, type, nElem, pValue );
if ( status != ECA_NORMAL ) {
this->ioTable.remove ( *pIO );
chan.cacPrivateListOfIO::eventq.remove ( *pIO );
delete static_cast < baseNMIU * > ( pIO );
}
return status;
}
else {
return ECA_ALLOCMEM;
}
}
int cac::readNotifyRequest ( nciu &chan, cacNotify &notify, unsigned type, unsigned nElem )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
this->flushIfRequired ( chan );
netReadNotifyIO *pIO = new netReadNotifyIO ( chan, notify );
if ( pIO ) {
this->ioTable.add ( *pIO );
chan.cacPrivateListOfIO::eventq.add ( *pIO );
int status = chan.getPIIU()->readNotifyRequest ( chan, *pIO, type, nElem );
if ( status != ECA_NORMAL ) {
this->ioTable.remove ( *pIO );
chan.cacPrivateListOfIO::eventq.remove ( *pIO );
delete static_cast < baseNMIU * > ( pIO );
}
return status;
}
else {
return ECA_ALLOCMEM;
}
}
bool cac::ioCompletionNotify ( unsigned id, unsigned type,
unsigned long count, const void *pData )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->notify ().completionNotify ( pmiu->channel (), type, count, pData );
return true;
}
else {
return false;
}
}
bool cac::ioExceptionNotify ( unsigned id, int status, const char *pContext )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->notify ().exceptionNotify ( pmiu->channel (), status, pContext );
return true;
}
else {
return false;
}
}
bool cac::ioExceptionNotify ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->notify ().exceptionNotify ( pmiu->channel (),
status, pContext, type, count );
return true;
}
else {
return false;
}
}
bool cac::ioCompletionNotifyAndDestroy ( unsigned id )
{
baseNMIU * pmiu;
{
epicsAutoMutex autoMutex ( this->defaultMutex );
pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->channel ().cacPrivateListOfIO::eventq.remove ( *pmiu );
}
}
if ( pmiu ) {
pmiu->notify ().completionNotify ( pmiu->channel () );
delete pmiu;
return true;
}
else {
return false;
}
}
bool cac::ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, unsigned long count, const void *pData )
{
baseNMIU * pmiu;
{
epicsAutoMutex autoMutex ( this->defaultMutex );
pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->channel ().cacPrivateListOfIO::eventq.remove ( *pmiu );
}
}
if ( pmiu ) {
pmiu->notify ().completionNotify ( pmiu->channel (), type, count, pData );
delete pmiu;
return true;
}
else {
return false;
}
}
bool cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext )
{
baseNMIU * pmiu;
{
epicsAutoMutex autoMutex ( this->defaultMutex );
pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->channel ().cacPrivateListOfIO::eventq.remove ( *pmiu );
}
}
if ( pmiu ) {
pmiu->notify ().exceptionNotify ( pmiu->channel (), status, pContext );
delete pmiu;
return true;
}
else {
return false;
}
}
bool cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count )
{
baseNMIU * pmiu;
{
epicsAutoMutex autoMutex ( this->defaultMutex );
pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->channel ().cacPrivateListOfIO::eventq.remove ( *pmiu );
}
}
if ( pmiu ) {
pmiu->notify ().exceptionNotify ( pmiu->channel (), status,
pContext, type, count );
delete pmiu;
return true;
}
else {
return false;
}
}
// resubscribe for monitors from this channel
void cac::connectAllIO ( nciu &chan )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
tsDLIterBD < baseNMIU > pNetIO =
chan.cacPrivateListOfIO::eventq.firstIter ();
while ( pNetIO.valid () ) {
tsDLIterBD < baseNMIU > next = pNetIO;
next++;
class netSubscription *pSubscr = pNetIO->isSubscription ();
if ( pSubscr ) {
chan.getPIIU()->subscriptionRequest ( *pSubscr );
}
else {
// it shouldnt be here at this point - so uninstall it
this->ioTable.remove ( *pNetIO );
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
pNetIO->notify().exceptionNotify ( pNetIO->channel(), ECA_DISCONN, chan.pHostName() );
delete pNetIO.pointer ();
}
pNetIO = next;
}
chan.getPIIU()->flushRequest ();
}
// cancel IO operations and monitor subscriptions
void cac::disconnectAllIO ( nciu &chan )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
tsDLIterBD < baseNMIU > pNetIO =
chan.cacPrivateListOfIO::eventq.firstIter ();
while ( pNetIO.valid () ) {
tsDLIterBD < baseNMIU > next = pNetIO;
next++;
class netSubscription *pSubscr = pNetIO->isSubscription ();
this->ioTable.remove ( *pNetIO );
if ( pSubscr ) {
chan.getPIIU()->subscriptionCancelRequest ( *pSubscr );
}
else {
// no use after disconnected - so uninstall it
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
pNetIO->notify ().exceptionNotify ( pNetIO->channel (), ECA_DISCONN, chan.pHostName () );
delete pNetIO.pointer ();
}
pNetIO = next;
}
}
//
// care is taken to not hold the lock while deleting the
// IO so that subscription delete request (sent by the
// IO's destructor) do not deadlock
//
void cac::destroyAllIO ( nciu &chan )
{
tsDLList < baseNMIU > eventQ;
{
epicsAutoMutex autoMutex ( this->defaultMutex );
while ( baseNMIU *pIO = eventQ.get () ) {
this->ioTable.remove ( *pIO );
eventQ.add ( *pIO );
}
}
while ( baseNMIU *pIO = eventQ.get () ) {
delete pIO;
}
}
void cac::uninstallIO ( baseNMIU &io )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
baseNMIU *pIO = this->ioTable.remove ( io );
assert ( &io == pIO );
io.channel().cacPrivateListOfIO::eventq.remove ( io );
netSubscription * pSubscr = io.isSubscription ();
if ( pSubscr ) {
this->flushIfRequired ( io.channel() );
io.channel().getPIIU()->subscriptionCancelRequest ( *pSubscr );
}
}
void cac::installSubscription ( netSubscription &subscr )
{
epicsAutoMutex autoMutex ( this->defaultMutex );
subscr.channel().cacPrivateListOfIO::eventq.add ( subscr );
this->ioTable.add ( subscr );
if ( subscr.channel().connected() ) {
this->flushIfRequired ( subscr.channel() );
subscr.channel().getPIIU()->subscriptionRequest ( subscr );
}
}
+6 -24
View File
@@ -46,30 +46,6 @@ inline unsigned cac::getInitializingThreadsPriority () const
return this->initializingThreadsPriority;
}
// the process thread is not permitted to flush as this
// can result in a push / pull deadlock on the TCP pipe.
// Instead, the process thread scheduals the flush with the
// send thread which runs at a higher priority than the
// send thread. The same applies to the UDP thread for
// locking hierarchy reasons.
//
// this is only called when we detect send queue quota
// exceeded
inline bool cac::flushPermit () const
{
if ( this->pRecvProcThread ) {
if ( this->pRecvProcThread->isCurrentThread () ) {
return false;
}
}
if ( this->pudpiiu ) {
if ( this->pudpiiu->isCurrentThread () ) {
return false;
}
}
return true;
}
inline void cac::incrementOutstandingIO ()
{
this->ioCounter.increment ();
@@ -90,5 +66,11 @@ inline unsigned cac::sequenceNumberOfOutstandingIO () const
return this->ioCounter.sequenceNumber ();
}
inline epicsMutex & cac::mutex ()
{
return this->defaultMutex;
}
#endif // cac_ILh
-1
View File
@@ -24,7 +24,6 @@ bool comBuf::flushToWire ( wireSendAdapter &wire )
unsigned nBytes = wire.sendBytes ( &this->buf[this->nextReadIndex],
occupied );
if ( nBytes == 0u ) {
this->nextReadIndex = this->nextWriteIndex;
return false;
}
this->nextReadIndex += nBytes;
+6 -1
View File
@@ -179,7 +179,12 @@ inline unsigned comQueSend::occupiedBytes () const
return this->nBytesPending;
}
inline bool comQueSend::flushThreshold ( unsigned nBytesThisMsg ) const
inline bool comQueSend::flushBlockThreshold ( unsigned nBytesThisMsg ) const
{
return ( this->nBytesPending + nBytesThisMsg > 16 * comBuf::capacityBytes () );
}
inline bool comQueSend::flushEarlyThreshold ( unsigned nBytesThisMsg ) const
{
return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::capacityBytes () );
}
+69 -97
View File
@@ -166,31 +166,26 @@ public:
void clear ();
int reserveSpace ( unsigned msgSize );
unsigned occupiedBytes () const;
bool flushThreshold ( unsigned nBytesThisMsg ) const;
bool flushEarlyThreshold ( unsigned nBytesThisMsg ) const;
bool flushBlockThreshold ( unsigned nBytesThisMsg ) const;
bool dbr_type_ok ( unsigned type );
void pushUInt16 ( const ca_uint16_t value );
void pushUInt32 ( const ca_uint32_t value );
void pushFloat32 ( const ca_float32_t value );
void pushString ( const char *pVal, unsigned nElem );
void push_dbr_type ( unsigned type, const void *pVal, unsigned nElem );
comBuf * popNextComBufToSend ();
private:
wireSendAdapter & wire;
tsDLList < comBuf > bufs;
bufferReservoir reservoir;
unsigned nBytesPending;
void copy_dbr_string ( const void *pValue, unsigned nElem );
void copy_dbr_short ( const void *pValue, unsigned nElem );
void copy_dbr_float ( const void *pValue, unsigned nElem );
void copy_dbr_char ( const void *pValue, unsigned nElem );
void copy_dbr_long ( const void *pValue, unsigned nElem );
void copy_dbr_double ( const void *pValue, unsigned nElem );
wireSendAdapter & wire;
tsDLList < comBuf > bufs;
bufferReservoir reservoir;
unsigned nBytesPending;
typedef void ( comQueSend::*copyFunc_t ) (
const void *pValue, unsigned nElem );
static const copyFunc_t dbrCopyVector [39];
@@ -220,29 +215,20 @@ public:
};
class netiiu;
class nciuPrivate {
private:
epicsMutex mutex;
friend class nciu;
};
class tcpiiu;
class baseNMIU;
//
// fields in class nciu which really belong to tcpiiu
//
class tcpiiuPrivateListOfIO {
class cacPrivateListOfIO {
private:
tsDLList < class baseNMIU > eventq;
friend tcpiiu;
friend netiiu; // used to install subscriptions when not connected
friend class cac;
};
class nciu : public cacChannelIO, public tsDLNode < nciu >,
public chronIntIdRes < nciu >, public tcpiiuPrivateListOfIO {
public chronIntIdRes < nciu >, public cacPrivateListOfIO {
public:
nciu ( class cac &, netiiu &,
cacChannelNotify &, const char *pNameIn );
@@ -267,13 +253,13 @@ public:
void searchReplySetUp ( netiiu &iiu, unsigned sidIn,
unsigned typeIn, unsigned long countIn );
void show ( unsigned level ) const;
bool verifyIIU ( netiiu & );
bool verifyConnected ( netiiu & );
void connectTimeoutNotify ();
unsigned long nativeElementCount () const;
const char *pName () const;
unsigned nameLen () const;
const char * pHostName () const; // deprecated - please do not use
unsigned long nativeElementCount () const;
bool connected () const;
void uninstallIO ( baseNMIU &io );
protected:
~nciu (); // force pool allocation
private:
@@ -303,14 +289,13 @@ private:
int subscribe ( unsigned type, unsigned long nElem,
unsigned mask, cacNotify &notify,
cacNotifyIO *&pNotifyIO );
void hostName ( char *pBuf, unsigned bufLength ) const;
bool ca_v42_ok () const;
short nativeType () const;
channel_state state () const;
caar accessRights () const;
unsigned searchAttempts () const;
double beaconPeriod () const;
const char * pHostName () const; // deprecated - please do not use
bool ca_v42_ok () const;
void hostName ( char *pBuf, unsigned bufLength ) const;
void notifyStateChangeFirstConnectInCountOfOutstandingIO ();
static tsFreeList < class nciu, 1024 > freeList;
static epicsMutex freeListMutex;
@@ -322,7 +307,6 @@ public:
baseNMIU ( cacNotify &notifyIn, nciu &chan );
virtual ~baseNMIU () = 0;
virtual class netSubscription * isSubscription ();
virtual void ioCancelRequest ();
void show ( unsigned level ) const;
ca_uint32_t getID () const;
nciu & channel () const;
@@ -349,31 +333,10 @@ private:
const unsigned type;
const unsigned mask;
class netSubscription * isSubscription ();
void ioCancelRequest ();
static tsFreeList < class netSubscription, 1024 > freeList;
static epicsMutex freeListMutex;
};
#if 0
class netReadCopyIO : public baseNMIU {
public:
netReadCopyIO ( nciu &chan, unsigned type, unsigned long count,
void *pValue, unsigned seqNumber, cacNotify &notifyIn );
void show ( unsigned level ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
protected:
~netReadCopyIO (); // must be allocated from pool
private:
unsigned type;
unsigned long count;
void *pValue;
unsigned seqNumber;
static tsFreeList < class netReadCopyIO, 1024 > freeList;
static epicsMutex freeListMutex;
};
#endif
class netReadNotifyIO : public baseNMIU {
public:
netReadNotifyIO ( nciu &chan, cacNotify &notify );
@@ -458,7 +421,6 @@ public:
void resetChannelRetryCounts ();
void attachChannel ( nciu &chan );
void detachChannel ( nciu &chan );
void installSubscription ( netSubscription &subscr );
virtual void hostName (char *pBuf, unsigned bufLength) const;
virtual const char * pHostName () const; // deprecated - please do not use
virtual bool isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &addr ) const;
@@ -466,24 +428,25 @@ public:
virtual bool ca_v41_ok () const;
virtual bool pushDatagramMsg ( const caHdr &hdr, const void *pExt, ca_uint16_t extsize);
virtual int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue);
virtual int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
virtual int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
virtual int writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned type, unsigned nElem, const void *pValue );
virtual int readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned type, unsigned nElem );
virtual int createChannelRequest ( nciu & );
virtual void connectAllIO ( nciu &chan );
virtual void disconnectAllIO ( nciu &chan );
virtual int clearChannelRequest ( nciu & );
virtual bool uninstallIO ( baseNMIU & );
virtual void subscriptionCancelRequest ( netSubscription &subscr, bool userThread );
virtual void subscriptionRequest ( netSubscription &subscr );
virtual void subscriptionCancelRequest ( netSubscription &subscr );
virtual double beaconPeriod () const;
virtual bool destroyAllIO ( nciu &chan );
virtual void flushRequest ();
virtual bool flushBlockThreshold () const;
virtual void flushRequestIfAboveEarlyThreshold ();
virtual void blockUntilSendBacklogIsReasonable ( epicsMutex & );
protected:
cac * pCAC () const;
mutable epicsMutex mutex;
private:
tsDLList < nciu > channelList;
class cac *pClientCtx;
virtual void lastChannelDetachNotify ();
virtual void subscriptionRequest ( netSubscription &subscr, bool userThread );
};
class limboiiu : public netiiu {
@@ -497,14 +460,14 @@ class udpiiu;
class searchTimer : private epicsTimerNotify {
public:
searchTimer ( udpiiu &iiu, epicsTimerQueue &queue );
searchTimer ( udpiiu &iiu, epicsTimerQueue &queue, epicsMutex & );
virtual ~searchTimer ();
void notifySearchResponse ( unsigned short retrySeqNo );
void resetPeriod ( double delayToNextTry );
void show ( unsigned level ) const;
private:
epicsTimer &timer;
epicsMutex mutex;
epicsMutex &mutex;
udpiiu &iiu;
unsigned framesPerTry; /* # of UDP frames per search try */
unsigned framesPerTryCongestThresh; /* one half N tries w congest */
@@ -525,7 +488,6 @@ public:
virtual ~repeaterSubscribeTimer ();
void confirmNotify ();
void show (unsigned level) const;
private:
epicsTimer &timer;
udpiiu &iiu;
@@ -549,7 +511,7 @@ public:
void postMsg ( const osiSockAddr &net_addr,
char *pInBuf, unsigned long blockSize );
void repeaterRegistrationMessage ( unsigned attemptNumber );
void flush ();
void datagramFlush ();
unsigned getPort () const;
void show ( unsigned level ) const;
bool isCurrentThread () const;
@@ -573,8 +535,6 @@ private:
bool pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t extsize );
friend void cacRecvThreadUDP ( void *pParam );
typedef bool (udpiiu::*pProtoStubUDP) ( const caHdr &, const osiSockAddr & );
// UDP protocol dispatch table
@@ -588,6 +548,8 @@ private:
bool beaconAction ( const caHdr &msg, const osiSockAddr &net_addr );
bool notHereRespAction ( const caHdr &msg, const osiSockAddr &net_addr );
bool repeaterAckAction ( const caHdr &msg, const osiSockAddr &net_addr );
friend void cacRecvThreadUDP ( void *pParam );
};
class tcpRecvWatchdog : private epicsTimerNotify {
@@ -679,9 +641,11 @@ public:
void beaconArrivalNotify ();
bool fullyConstructed () const;
void flush ();
void flushRequest ();
bool flushBlockThreshold () const;
void flushRequestIfAboveEarlyThreshold ();
void blockUntilSendBacklogIsReasonable ( epicsMutex & );
virtual void show ( unsigned level ) const;
//osiSockAddr address () const;
bool setEchoRequestPending ();
void processIncoming ();
@@ -698,8 +662,6 @@ public:
private:
tcpRecvWatchdog recvDog;
tcpSendWatchdog sendDog;
epicsMutex flushMutex; // only one thread flushes at a time
chronIntIdResTable < baseNMIU > ioTable;
comQueSend sendQue;
comQueRecv recvQue;
osiSockAddr addr;
@@ -714,8 +676,10 @@ private:
epicsEventId recvThreadRingBufferSpaceAvailableSignal;
epicsEventId sendThreadExitSignal;
epicsEventId recvThreadExitSignal;
epicsEventId flushBlockSignal;
SOCKET sock;
unsigned contigRecvMsgCount;
unsigned blockingForFlush;
bool fullyConstructedFlag;
bool busyStateDetected; // only modified by the recv thread
bool flowControlActive; // only modified by the send process thread
@@ -723,11 +687,10 @@ private:
bool msgHeaderAvailable;
bool sockCloseCompleted;
bool fdRegCallbackNeeded;
bool earlyFlush;
unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf );
unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf );
bool flushToWire ( bool userThread );
bool threadContextSensitiveFlushToWire ( bool userThread );
friend void cacSendThreadTCP ( void *pParam );
friend void cacRecvThreadTCP ( void *pParam );
@@ -742,10 +705,12 @@ private:
int hostNameSetRequest ();
int userNameSetRequest ();
int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue );
int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
int writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned type, unsigned nElem, const void *pValue );
int readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned type, unsigned nElem );
int createChannelRequest ( nciu & );
int clearChannelRequest ( nciu & );
void subscriptionRequest ( netSubscription &subscr );
void subscriptionCancelRequest ( netSubscription &subscr );
// recv protocol stubs
bool noopAction ();
@@ -761,27 +726,7 @@ private:
bool verifyAndDisconnectChan ();
bool badTCPRespAction ();
// IO management routines
bool ioCompletionNotify ( unsigned id, unsigned type,
unsigned long count, const void *pData );
bool ioExceptionNotify ( unsigned id,
int status, const char *pContext );
bool ioExceptionNotify ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count );
bool ioCompletionNotifyAndDestroy ( unsigned id );
bool ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, unsigned long count, const void *pData );
bool ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext );
bool ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext, unsigned type, unsigned long count );
void connectAllIO ( nciu &chan );
void disconnectAllIO ( nciu &chan );
bool uninstallIO ( baseNMIU & );
bool destroyAllIO ( nciu &chan );
void subscriptionRequest ( netSubscription &subscr, bool userThread );
void subscriptionCancelRequest ( netSubscription &subscr, bool userThread );
bool flush (); // only to be called by the send thread
typedef bool ( tcpiiu::*pProtoStubTCP ) ();
static const pProtoStubTCP tcpJumpTableCAC [];
@@ -963,7 +908,7 @@ private:
// w/o taking the defaultMutex in this class first
//
//
class cac : public caClient, public nciuPrivate
class cac : public caClient
{
public:
cac ( bool enablePreemptiveCallback = false );
@@ -985,10 +930,27 @@ public:
bool ioComplete () const;
// IO management
void flush ();
bool flushPermit () const;
void flushRequest ();
int pendIO ( const double &timeout );
int pendEvent ( const double &timeout );
void uninstallIO ( baseNMIU &io );
bool ioCompletionNotify ( unsigned id, unsigned type,
unsigned long count, const void *pData );
bool ioExceptionNotify ( unsigned id,
int status, const char *pContext );
bool ioExceptionNotify ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count );
bool ioCompletionNotifyAndDestroy ( unsigned id );
bool ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, unsigned long count, const void *pData );
bool ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext );
bool ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext, unsigned type, unsigned long count );
void connectAllIO ( nciu &chan );
void disconnectAllIO ( nciu &chan );
void destroyAllIO ( nciu &chan );
void installSubscription ( netSubscription &subscr );
// exception routines
void exceptionNotify ( int status, const char *pContext,
@@ -1013,6 +975,11 @@ public:
cacChannelIO * createChannelIO ( const char *name_str, cacChannelNotify &chan );
void registerService ( cacServiceIO &service );
// IO request stubs
int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue );
int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
// sync group routines
CASG * lookupCASG ( unsigned id );
void installCASG ( CASG & );
@@ -1038,6 +1005,8 @@ public:
const char * userNamePointer () const;
unsigned getInitializingThreadsPriority () const;
epicsMutex & mutex ();
private:
ioCounterNet ioCounter;
ipAddrToAsciiEngine ipToAEngine;
@@ -1046,6 +1015,8 @@ private:
tsDLList <tcpiiu> iiuListLimbo;
chronIntIdResTable
< nciu > chanTable;
chronIntIdResTable
< baseNMIU > ioTable;
chronIntIdResTable
< CASG > sgTable;
resTable
@@ -1070,6 +1041,7 @@ private:
unsigned initializingThreadsPriority;
bool enablePreemptiveCallback;
bool setupUDP ();
void flushIfRequired ( nciu & ); // lock must be applied
};
/*
+249 -272
View File
@@ -71,15 +71,7 @@ nciu::~nciu ()
}
// care is taken so that a lock is not applied during this phase
unsigned i = 0u;
while ( ! this->piiu->destroyAllIO ( *this ) ) {
if ( i++ > 1000u ) {
this->cacCtx.printf (
"CAC: unable to destroy IO when channel destroyed?\n" );
break;
}
}
this->piiu->clearChannelRequest ( *this );
this->cacCtx.destroyAllIO ( *this );
this->cacCtx.uninstallChannel ( *this );
if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) {
@@ -91,6 +83,155 @@ nciu::~nciu ()
delete [] this->pNameStr;
}
void nciu::connect ( unsigned nativeType,
unsigned long nativeCount, unsigned sidIn )
{
bool v41Ok;
if ( ! this->f_claimSent ) {
ca_printf (
"CAC: Ignored conn resp to chan lacking virtual circuit CID=%u SID=%u?\n",
this->getId (), sidIn );
return;
}
if ( this->f_connected ) {
ca_printf (
"CAC: Ignored conn resp to conn chan CID=%u SID=%u?\n",
this->getId (), sidIn );
return;
}
if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) {
if ( this->f_firstConnectDecrementsOutstandingIO ) {
this->cacCtx.decrementOutstandingIO ();
}
}
if ( this->piiu ) {
v41Ok = this->piiu->ca_v41_ok ();
}
else {
v41Ok = false;
}
this->typeCode = nativeType;
this->count = nativeCount;
this->sid = sidIn;
this->f_connected = true;
this->f_previousConn = true;
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access
*/
if ( ! v41Ok ) {
this->accessRightState.read_access = true;
this->accessRightState.write_access = true;
}
// resubscribe for monitors from this channel
this->cacCtx.connectAllIO ( *this );
this->notify().connectNotify ( *this );
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access and also need to call
* their call back here
*/
if ( ! v41Ok ) {
this->notify ().accessRightsNotify ( *this, this->accessRightState );
}
}
void nciu::disconnect ( netiiu &newiiu )
{
bool wasConnected;
this->piiu->disconnectAllIO ( *this );
this->piiu = &newiiu;
this->retry = 0u;
this->typeCode = USHRT_MAX;
this->count = 0u;
this->sid = UINT_MAX;
this->accessRightState.read_access = false;
this->accessRightState.write_access = false;
this->f_claimSent = false;
if ( this->f_connected ) {
wasConnected = true;
}
else {
wasConnected = false;
}
this->f_connected = false;
if ( wasConnected ) {
/*
* look for events that have an event cancel in progress
*/
this->notify ().disconnectNotify ( *this );
this->notify ().accessRightsNotify ( *this, this->accessRightState );
}
this->resetRetryCount ();
}
/*
* nciu::searchMsg ()
*/
bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel )
{
caHdr msg;
bool success;
msg.m_cmmd = htons ( CA_PROTO_SEARCH );
msg.m_available = this->getId ();
msg.m_dataType = htons ( DONTREPLY );
msg.m_count = htons ( CA_MINOR_VERSION );
msg.m_cid = this->getId ();
success = this->piiu->pushDatagramMsg ( msg,
this->pNameStr, this->nameLength );
if ( success ) {
//
// increment the number of times we have tried
// to find this channel
//
if ( this->retry < MAXCONNTRIES ) {
this->retry++;
}
this->retrySeqNo = retrySeqNumber;
retryNoForThisChannel = this->retry;
}
return success;
}
const char *nciu::pName () const
{
return this->pNameStr;
}
unsigned nciu::nameLen () const
{
return this->nameLength;
}
int nciu::createChannelRequest ()
{
int status = this->piiu->createChannelRequest ( *this );
if ( status == ECA_NORMAL ) {
this->f_claimSent = true;
}
return status;
}
int nciu::read ( unsigned type, unsigned long countIn, cacNotify &notify )
{
//
@@ -112,13 +253,13 @@ int nciu::read ( unsigned type, unsigned long countIn, cacNotify &notify )
countIn = this->count;
}
return this->piiu->readNotifyRequest ( *this, notify, type, countIn );
return this->cacCtx.readNotifyRequest ( *this, notify, type, countIn );
}
/*
* check_a_dbr_string()
*/
LOCAL int check_a_dbr_string ( const char *pStr, const unsigned count )
static int check_a_dbr_string ( const char *pStr, const unsigned count )
{
for ( unsigned i = 0; i < count; i++ ) {
unsigned int strsize = 0;
@@ -155,7 +296,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue )
}
}
return this->piiu->writeRequest ( *this, type, countIn, pValue );
return this->cacCtx.writeRequest ( *this, type, countIn, pValue );
}
int nciu::write ( unsigned type, unsigned long countIn, const void *pValue, cacNotify &notify )
@@ -180,264 +321,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue, cacN
}
}
return this->piiu->writeNotifyRequest ( *this, notify, type, countIn, pValue );
}
void nciu::initiateConnect ()
{
this->notifyStateChangeFirstConnectInCountOfOutstandingIO ();
this->cacCtx.installNetworkChannel ( *this, this->piiu );
}
void nciu::connect ( unsigned nativeType,
unsigned long nativeCount, unsigned sidIn )
{
bool v41Ok;
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
if ( ! this->f_claimSent ) {
ca_printf (
"CAC: Ignored conn resp to chan lacking virtual circuit CID=%u SID=%u?\n",
this->getId (), sidIn );
return;
}
if ( this->f_connected ) {
ca_printf (
"CAC: Ignored conn resp to conn chan CID=%u SID=%u?\n",
this->getId (), sidIn );
return;
}
if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) {
if ( this->f_firstConnectDecrementsOutstandingIO ) {
this->cacCtx.decrementOutstandingIO ();
}
}
if ( this->piiu ) {
v41Ok = this->piiu->ca_v41_ok ();
}
else {
v41Ok = false;
}
this->typeCode = nativeType;
this->count = nativeCount;
this->sid = sidIn;
this->f_connected = true;
this->f_previousConn = true;
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access
*/
if ( ! v41Ok ) {
this->accessRightState.read_access = true;
this->accessRightState.write_access = true;
}
}
// resubscribe for monitors from this channel
this->piiu->connectAllIO ( *this );
this->notify ().connectNotify ( *this );
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access and also need to call
* their call back here
*/
if ( ! v41Ok ) {
this->notify ().accessRightsNotify ( *this, this->accessRightState );
}
}
void nciu::connectTimeoutNotify ()
{
if ( ! this->f_connectTimeOutSeen ) {
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->f_connectTimeOutSeen = true;
}
}
void nciu::disconnect ( netiiu &newiiu )
{
bool wasConnected;
this->piiu->disconnectAllIO ( *this );
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->piiu = &newiiu;
this->retry = 0u;
this->typeCode = USHRT_MAX;
this->count = 0u;
this->sid = UINT_MAX;
this->accessRightState.read_access = false;
this->accessRightState.write_access = false;
this->f_claimSent = false;
if ( this->f_connected ) {
wasConnected = true;
}
else {
wasConnected = false;
}
this->f_connected = false;
}
if ( wasConnected ) {
/*
* look for events that have an event cancel in progress
*/
this->notify ().disconnectNotify ( *this );
this->notify ().accessRightsNotify ( *this, this->accessRightState );
}
this->resetRetryCount ();
}
/*
* nciu::searchMsg ()
*/
bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel )
{
caHdr msg;
bool success;
msg.m_cmmd = htons ( CA_PROTO_SEARCH );
msg.m_available = this->getId ();
msg.m_dataType = htons ( DONTREPLY );
msg.m_count = htons ( CA_MINOR_VERSION );
msg.m_cid = this->getId ();
success = this->piiu->pushDatagramMsg ( msg,
this->pNameStr, this->nameLength );
if ( success ) {
//
// increment the number of times we have tried
// to find this channel
//
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
if ( this->retry < MAXCONNTRIES ) {
this->retry++;
}
this->retrySeqNo = retrySeqNumber;
retryNoForThisChannel = this->retry;
}
return success;
}
void nciu::hostName ( char *pBuf, unsigned bufLength ) const
{
this->piiu->hostName ( pBuf, bufLength );
}
// deprecated - please do not use, this is _not_ thread safe
const char * nciu::pHostName () const
{
return this->piiu->pHostName (); // ouch !
}
bool nciu::ca_v42_ok () const
{
return this->piiu->ca_v42_ok ();
}
short nciu::nativeType () const
{
short type;
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
if ( this->f_connected ) {
if ( this->typeCode < SHRT_MAX ) {
type = static_cast <short> ( this->typeCode );
}
else {
type = TYPENOTCONN;
}
}
else {
type = TYPENOTCONN;
}
return type;
}
unsigned long nciu::nativeElementCount () const
{
unsigned long countOut;
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
if ( this->f_connected ) {
countOut = this->count;
}
else {
countOut = 0ul;
}
return countOut;
}
channel_state nciu::state () const
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
channel_state stateOut;
if ( this->f_connected ) {
stateOut = cs_conn;
}
else if ( this->f_previousConn ) {
stateOut = cs_prev_conn;
}
else {
stateOut = cs_never_conn;
}
return stateOut;
}
caar nciu::accessRights () const
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
caar tmp = this->accessRightState;
return tmp;
}
const char *nciu::pName () const
{
return this->pNameStr;
}
unsigned nciu::nameLen () const
{
return this->nameLength;
}
unsigned nciu::searchAttempts () const
{
return this->retry;
}
double nciu::beaconPeriod () const
{
return this->piiu->beaconPeriod ();
}
int nciu::createChannelRequest ()
{
int status = this->piiu->createChannelRequest ( *this );
if ( status == ECA_NORMAL ) {
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->f_claimSent = true;
}
return status;
return this->cacCtx.writeNotifyRequest ( *this, notify, type, countIn, pValue );
}
int nciu::subscribe ( unsigned type, unsigned long nElem,
@@ -455,7 +339,7 @@ int nciu::subscribe ( unsigned type, unsigned long nElem,
netSubscription *pSubcr = new netSubscription ( *this,
type, nElem, mask, notify );
if ( pSubcr ) {
this->piiu->installSubscription ( *pSubcr );
this->cacCtx.installSubscription ( *pSubcr );
pNotifyIO = pSubcr;
return ECA_NORMAL;;
}
@@ -464,9 +348,100 @@ int nciu::subscribe ( unsigned type, unsigned long nElem,
}
}
void nciu::initiateConnect ()
{
this->notifyStateChangeFirstConnectInCountOfOutstandingIO ();
this->cacCtx.installNetworkChannel ( *this, this->piiu );
}
void nciu::hostName ( char *pBuf, unsigned bufLength ) const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
this->piiu->hostName ( pBuf, bufLength );
}
// deprecated - please do not use, this is _not_ thread safe
const char * nciu::pHostName () const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
return this->piiu->pHostName (); // ouch !
}
bool nciu::ca_v42_ok () const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
return this->piiu->ca_v42_ok ();
}
short nciu::nativeType () const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
short type;
if ( this->f_connected ) {
if ( this->typeCode < SHRT_MAX ) {
type = static_cast <short> ( this->typeCode );
}
else {
type = TYPENOTCONN;
}
}
else {
type = TYPENOTCONN;
}
return type;
}
unsigned long nciu::nativeElementCount () const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
unsigned long countOut;
if ( this->f_connected ) {
countOut = this->count;
}
else {
countOut = 0ul;
}
return countOut;
}
channel_state nciu::state () const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
channel_state stateOut;
if ( this->f_connected ) {
stateOut = cs_conn;
}
else if ( this->f_previousConn ) {
stateOut = cs_prev_conn;
}
else {
stateOut = cs_never_conn;
}
return stateOut;
}
caar nciu::accessRights () const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
caar tmp = this->accessRightState;
return tmp;
}
unsigned nciu::searchAttempts () const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
return this->retry;
}
double nciu::beaconPeriod () const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
return this->piiu->beaconPeriod ();
}
void nciu::notifyStateChangeFirstConnectInCountOfOutstandingIO ()
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
epicsAutoMutex locker ( this->cacCtx.mutex() );
// test is performed via a callback so that locking is correct
if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) {
if ( this->notify ().includeFirstConnectInCountOfOutstandingIO () ) {
@@ -486,6 +461,7 @@ void nciu::notifyStateChangeFirstConnectInCountOfOutstandingIO ()
void nciu::show ( unsigned level ) const
{
epicsAutoMutex locker ( this->cacCtx.mutex() );
if ( this->f_connected ) {
char hostNameTmp [256];
this->hostName ( hostNameTmp, sizeof ( hostNameTmp ) );
@@ -520,3 +496,4 @@ void nciu::show ( unsigned level ) const
}
+14 -22
View File
@@ -49,10 +49,7 @@ inline void nciu::resetRetryCount ()
inline void nciu::accessRightsStateChange ( const caar &arIn )
{
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->accessRightState = arIn;
}
this->accessRightState = arIn;
this->notify ().accessRightsNotify ( *this, arIn );
}
@@ -66,23 +63,6 @@ inline ca_uint32_t nciu::getCID () const
return this->id;
}
//
// this routine is used to verify that the channel's
// iiu pointer and connection state has not changed
// before the iiu's mutex was applied
//
inline bool nciu::verifyConnected ( netiiu &iiuIn )
{
return ( this->piiu == &iiuIn ) && this->f_connected;
}
inline bool nciu::verifyIIU ( netiiu &iiuIn )
{
return ( this->piiu == &iiuIn );
}
inline unsigned nciu::getRetrySeqNo () const
{
return this->retrySeqNo;
@@ -97,7 +77,6 @@ inline void nciu::connect ()
inline void nciu::searchReplySetUp ( netiiu &iiu, unsigned sidIn,
unsigned typeIn, unsigned long countIn )
{
epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex );
this->piiu = &iiu;
this->typeCode = typeIn;
this->count = countIn;
@@ -121,4 +100,17 @@ inline netiiu * nciu::getPIIU ()
return this->piiu;
}
inline void nciu::uninstallIO ( baseNMIU &io )
{
this->cacCtx.uninstallIO ( io );
}
inline void nciu::connectTimeoutNotify ()
{
this->f_connectTimeOutSeen = true;
}
-5
View File
@@ -34,11 +34,6 @@ class netSubscription * netSubscription::isSubscription ()
return this;
}
void netSubscription::ioCancelRequest ()
{
this->chan.getPIIU ()->subscriptionCancelRequest ( *this, true );
}
void netSubscription::show ( unsigned level ) const
{
printf ( "event subscription IO at %p, type %s, element count %lu, mask %u\n",
+30 -98
View File
@@ -28,8 +28,6 @@ netiiu::~netiiu ()
void netiiu::show ( unsigned level ) const
{
epicsAutoMutex autoMutex ( this->mutex );
printf ( "network IO base class\n" );
if ( level > 1 ) {
tsDLIterConstBD < nciu > pChan = this->channelList.firstIter ();
@@ -41,26 +39,6 @@ void netiiu::show ( unsigned level ) const
if ( level > 2u ) {
printf ( "\tcac pointer %p\n",
static_cast <void *> ( this->pClientCtx ) );
this->mutex.show ( level - 2u );
}
}
// cac lock must also be applied when
// calling this
void netiiu::attachChannel ( nciu &chan )
{
epicsAutoMutex autoMutex ( this->mutex );
this->channelList.add ( chan );
}
// cac lock must also be applied when
// calling this
void netiiu::detachChannel ( nciu &chan )
{
epicsAutoMutex autoMutex ( this->mutex );
this->channelList.remove ( chan );
if ( this->channelList.count () == 0u ) {
this->lastChannelDetachNotify ();
}
}
@@ -68,56 +46,20 @@ void netiiu::detachChannel ( nciu &chan )
// calling this
void netiiu::disconnectAllChan ( netiiu & newiiu )
{
tsDLList < nciu > list;
{
epicsAutoMutex autoMutex ( this->mutex );
tsDLIterBD < nciu > chan = this->channelList.firstIter ();
while ( chan.valid () ) {
tsDLIterBD < nciu > next = chan;
next++;
this->clearChannelRequest ( *chan );
this->channelList.remove ( *chan );
chan->disconnect ( newiiu );
list.add ( *chan );
chan = next;
}
tsDLIterBD < nciu > chan = this->channelList.firstIter ();
while ( chan.valid () ) {
tsDLIterBD < nciu > next = chan;
next++;
this->clearChannelRequest ( *chan );
this->channelList.remove ( *chan );
chan->disconnect ( newiiu );
newiiu.channelList.add ( *chan );
chan = next;
}
{
epicsAutoMutex autoMutex ( newiiu.mutex );
newiiu.channelList.add ( list );
}
}
//
// netiiu::destroyAllIO ()
//
// care is taken to not hold the lock while deleting the
// IO so that subscription delete request (sent by the
// IO's destructor) do not deadlock
//
bool netiiu::destroyAllIO ( nciu &chan )
{
tsDLList < baseNMIU > eventQ;
{
epicsAutoMutex autoMutex ( this->mutex );
if ( chan.verifyIIU ( *this ) ) {
eventQ.add ( chan.tcpiiuPrivateListOfIO::eventq );
}
else {
return false;
}
}
while ( baseNMIU *pIO = eventQ.get () ) {
delete pIO;
}
return true;
}
void netiiu::connectTimeoutNotify ()
{
epicsAutoMutex autoMutex ( this->mutex );
tsDLIterBD < nciu > chan = this->channelList.firstIter ();
while ( chan.valid () ) {
chan->connectTimeoutNotify ();
@@ -127,7 +69,6 @@ void netiiu::connectTimeoutNotify ()
void netiiu::resetChannelRetryCounts ()
{
epicsAutoMutex autoMutex ( this->mutex );
tsDLIterBD < nciu > chan = this->channelList.firstIter ();
while ( chan.valid () ) {
chan->resetRetryCount ();
@@ -139,8 +80,6 @@ bool netiiu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThis
{
bool success;
epicsAutoMutex autoMutex ( this->mutex );
if ( nciu *pChan = this->channelList.get () ) {
success = pChan->searchMsg ( retrySeqNumber, retryNoForThisChannel );
if ( success ) {
@@ -186,12 +125,12 @@ int netiiu::writeRequest ( nciu &, unsigned, unsigned, const void * )
return ECA_DISCONNCHID;
}
int netiiu::writeNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned, const void * )
int netiiu::writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned, unsigned, const void * )
{
return ECA_DISCONNCHID;
}
int netiiu::readNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned )
int netiiu::readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned, unsigned )
{
return ECA_DISCONNCHID;
}
@@ -206,28 +145,14 @@ int netiiu::clearChannelRequest ( nciu & )
return ECA_DISCONNCHID;
}
void netiiu::subscriptionRequest ( netSubscription &, bool )
void netiiu::subscriptionRequest ( netSubscription & )
{
}
void netiiu::subscriptionCancelRequest ( netSubscription &, bool )
void netiiu::subscriptionCancelRequest ( netSubscription & )
{
}
void netiiu::installSubscription ( netSubscription &subscr )
{
bool connectedWhenInstalled;
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
connectedWhenInstalled = subscr.channel ().connected ();
}
// iiu pointer briefly points at tcpiiu before the channel is connected
if ( connectedWhenInstalled ) {
this->subscriptionRequest ( subscr, true );
}
}
void netiiu::hostName ( char *pBuf, unsigned bufLength ) const
{
if ( bufLength ) {
@@ -249,17 +174,24 @@ void netiiu::connectAllIO ( nciu & )
{
}
bool netiiu::uninstallIO ( baseNMIU &io )
{
epicsAutoMutex autoMutex ( this->mutex );
if ( io.channel ().verifyIIU ( *this ) ) {
io.channel ().tcpiiuPrivateListOfIO::eventq.remove ( io );
return true;
}
return false;
}
double netiiu::beaconPeriod () const
{
return ( - DBL_MAX );
}
void netiiu::flushRequest ()
{
}
bool netiiu::flushBlockThreshold () const
{
return false;
}
void netiiu::flushRequestIfAboveEarlyThreshold ()
{
}
void netiiu::blockUntilSendBacklogIsReasonable ( epicsMutex & )
{
}
+17
View File
@@ -28,4 +28,21 @@ inline unsigned netiiu::channelCount () const
return this->channelList.count ();
}
// cac lock must also be applied when
// calling this
inline void netiiu::attachChannel ( nciu &chan )
{
this->channelList.add ( chan );
}
// cac lock must also be applied when
// calling this
inline void netiiu::detachChannel ( nciu &chan )
{
this->channelList.remove ( chan );
if ( this->channelList.count () == 0u ) {
this->lastChannelDetachNotify ();
}
}
#endif // netiiu_ILh
+139 -141
View File
@@ -18,8 +18,9 @@
//
// searchTimer::searchTimer ()
//
searchTimer::searchTimer ( udpiiu &iiuIn, epicsTimerQueue &queueIn ) :
searchTimer::searchTimer ( udpiiu &iiuIn, epicsTimerQueue &queueIn, epicsMutex &mutexIn ) :
timer ( queueIn.createTimer ( *this ) ),
mutex ( mutexIn ),
iiu ( iiuIn ),
framesPerTry ( INITIALTRIESPERFRAME ),
framesPerTryCongestThresh ( UINT_MAX ),
@@ -133,6 +134,7 @@ void searchTimer::notifySearchResponse ( unsigned short retrySeqNoIn )
//
epicsTimerNotify::expireStatus searchTimer::expire ()
{
epicsAutoMutex locker ( this->mutex );
unsigned nFrameSent = 0u;
unsigned nChanSent = 0u;
@@ -142,165 +144,161 @@ epicsTimerNotify::expireStatus searchTimer::expire ()
if ( this->iiu.channelCount () == 0 ) {
return noRestart;
}
{
epicsAutoMutex locker ( this->mutex );
/*
* increment the retry sequence number
*/
this->retrySeqNo++; /* allowed to roll over */
/*
* dynamically adjust the number of UDP frames per
* try depending how many search requests are not
* replied to
*
* This determines how many search request can be
* sent together (at the same instant in time).
*
* The variable this->framesPerTry
* determines the number of UDP frames to be sent
* each time that expire() is called.
* If this value is too high we will waste some
* network bandwidth. If it is too low we will
* use very little of the incoming UDP message
* buffer associated with the server's port and
* will therefore take longer to connect. We
* initialize this->framesPerTry
* to a prime number so that it is less likely that the
* same channel is in the last UDP frame
* sent every time that this is called (and
* potentially discarded by a CA server with
* a small UDP input queue).
*/
/*
* increase frames per try only if we see better than
* a 93.75% success rate for one pass through the list
*/
if (this->searchResponsesWithinThisPass >
(this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/16u)) ) {
/*
* increment the retry sequence number
* increase UDP frames per try if we have a good score
*/
this->retrySeqNo++; /* allowed to roll over */
/*
* dynamically adjust the number of UDP frames per
* try depending how many search requests are not
* replied to
*
* This determines how many search request can be
* sent together (at the same instant in time).
*
* The variable this->framesPerTry
* determines the number of UDP frames to be sent
* each time that expire() is called.
* If this value is too high we will waste some
* network bandwidth. If it is too low we will
* use very little of the incoming UDP message
* buffer associated with the server's port and
* will therefore take longer to connect. We
* initialize this->framesPerTry
* to a prime number so that it is less likely that the
* same channel is in the last UDP frame
* sent every time that this is called (and
* potentially discarded by a CA server with
* a small UDP input queue).
*/
/*
* increase frames per try only if we see better than
* a 93.75% success rate for one pass through the list
*/
if (this->searchResponsesWithinThisPass >
(this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/16u)) ) {
if ( this->framesPerTry < MAXTRIESPERFRAME ) {
/*
* increase UDP frames per try if we have a good score
* a congestion avoidance threshold similar to TCP is now used
*/
if ( this->framesPerTry < MAXTRIESPERFRAME ) {
/*
* a congestion avoidance threshold similar to TCP is now used
*/
if ( this->framesPerTry < this->framesPerTryCongestThresh ) {
this->framesPerTry += this->framesPerTry;
}
else {
this->framesPerTry += (this->framesPerTry/8) + 1;
}
debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n",
this->framesPerTry, this->searchTriesWithinThisPass, this->searchResponsesWithinThisPass) );
if ( this->framesPerTry < this->framesPerTryCongestThresh ) {
this->framesPerTry += this->framesPerTry;
}
else {
this->framesPerTry += (this->framesPerTry/8) + 1;
}
debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n",
this->framesPerTry, this->searchTriesWithinThisPass, this->searchResponsesWithinThisPass) );
}
}
/*
* if we detect congestion because we have less than a 87.5% success
* rate then gradually reduce the frames per try
*/
else if ( this->searchResponsesWithinThisPass <
(this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/8u)) ) {
if (this->framesPerTry>1) {
this->framesPerTry--;
}
this->framesPerTryCongestThresh = this->framesPerTry/2 + 1;
debugPrintf ( ("Congestion detected - set frames per try to %u t=%u r=%u\n",
this->framesPerTry, this->searchTriesWithinThisPass,
this->searchResponsesWithinThisPass) );
}
while ( 1 ) {
/*
* if we detect congestion because we have less than a 87.5% success
* rate then gradually reduce the frames per try
* clear counter when we reach the end of the list
*
* if we are making some progress then
* dont increase the delay between search
* requests
*/
else if ( this->searchResponsesWithinThisPass <
(this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/8u)) ) {
if (this->framesPerTry>1) {
this->framesPerTry--;
}
this->framesPerTryCongestThresh = this->framesPerTry/2 + 1;
debugPrintf ( ("Congestion detected - set frames per try to %u t=%u r=%u\n",
this->framesPerTry, this->searchTriesWithinThisPass,
this->searchResponsesWithinThisPass) );
}
while ( 1 ) {
/*
* clear counter when we reach the end of the list
*
* if we are making some progress then
* dont increase the delay between search
* requests
*/
if ( this->searchTriesWithinThisPass >= this->iiu.channelCount () ) {
if ( this->searchResponsesWithinThisPass == 0u ) {
debugPrintf ( ("increasing search try interval\n") );
this->setRetryInterval ( this->minRetry + 1u );
}
this->minRetry = UINT_MAX;
/*
* increment the retry sequence number
* (this prevents the time of the next search
* try from being set to the current time if
* we are handling a response from an old
* search message)
*/
this->retrySeqNo++; /* allowed to roll over */
/*
* so that old search tries will not update the counters
*/
this->retrySeqAtPassBegin = this->retrySeqNo;
this->searchTriesWithinThisPass = 0;
this->searchResponsesWithinThisPass = 0;
debugPrintf ( ("saw end of list\n") );
if ( this->searchTriesWithinThisPass >= this->iiu.channelCount () ) {
if ( this->searchResponsesWithinThisPass == 0u ) {
debugPrintf ( ("increasing search try interval\n") );
this->setRetryInterval ( this->minRetry + 1u );
}
unsigned retryNoForThisChannel;
if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) {
nFrameSent++;
if ( nFrameSent >= this->framesPerTry ) {
break;
}
this->iiu.flush ();
if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) {
break;
}
}
if ( this->minRetry > retryNoForThisChannel ) {
this->minRetry = retryNoForThisChannel;
}
if ( this->searchTriesWithinThisPass < UINT_MAX ) {
this->searchTriesWithinThisPass++;
}
if ( nChanSent < UINT_MAX ) {
nChanSent++;
}
this->minRetry = UINT_MAX;
/*
* dont send any of the channels twice within one try
* increment the retry sequence number
* (this prevents the time of the next search
* try from being set to the current time if
* we are handling a response from an old
* search message)
*/
if ( nChanSent >= this->iiu.channelCount () ) {
/*
* add one to nFrameSent because there may be
* one more partial frame to be sent
*/
nFrameSent++;
/*
* cap this->framesPerTry to
* the number of frames required for all of
* the unresolved channels
*/
if ( this->framesPerTry > nFrameSent ) {
this->framesPerTry = nFrameSent;
}
this->retrySeqNo++; /* allowed to roll over */
/*
* so that old search tries will not update the counters
*/
this->retrySeqAtPassBegin = this->retrySeqNo;
this->searchTriesWithinThisPass = 0;
this->searchResponsesWithinThisPass = 0;
debugPrintf ( ("saw end of list\n") );
}
unsigned retryNoForThisChannel;
if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) {
nFrameSent++;
if ( nFrameSent >= this->framesPerTry ) {
break;
}
this->iiu.datagramFlush ();
if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) {
break;
}
}
if ( this->minRetry > retryNoForThisChannel ) {
this->minRetry = retryNoForThisChannel;
}
if ( this->searchTriesWithinThisPass < UINT_MAX ) {
this->searchTriesWithinThisPass++;
}
if ( nChanSent < UINT_MAX ) {
nChanSent++;
}
/*
* dont send any of the channels twice within one try
*/
if ( nChanSent >= this->iiu.channelCount () ) {
/*
* add one to nFrameSent because there may be
* one more partial frame to be sent
*/
nFrameSent++;
/*
* cap this->framesPerTry to
* the number of frames required for all of
* the unresolved channels
*/
if ( this->framesPerTry > nFrameSent ) {
this->framesPerTry = nFrameSent;
}
break;
}
}
// flush out the search request buffer
this->iiu.flush ();
this->iiu.datagramFlush ();
debugPrintf ( ("sent %u delay sec=%f\n", nFrameSent, this->period) );
+293 -608
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -23,7 +23,7 @@ inline bool tcpiiu::fullyConstructed () const
inline void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const
{
epicsAutoMutex locker ( this->mutex );
epicsAutoMutex locker ( this->pCAC()->mutex() );
if ( this->pHostNameCache ) {
this->pHostNameCache->hostName ( pBuf, bufLength );
}
@@ -40,7 +40,7 @@ inline const char * tcpiiu::pHostName () const
return nameBuf; // ouch !!
}
inline void tcpiiu::flush ()
inline void tcpiiu::flushRequest ()
{
epicsEventSignal ( this->sendThreadFlushSignal );
}
+4 -15
View File
@@ -454,13 +454,10 @@ epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repea
void udpiiu::shutdown ()
{
{
epicsAutoMutex autoMutex ( this->mutex );
if ( this->shutdownCmd ) {
return;
}
this->shutdownCmd = true;
if ( this->shutdownCmd ) {
return;
}
this->shutdownCmd = true;
caHdr msg;
msg.m_cmmd = htons ( CA_PROTO_NOOP );
@@ -735,8 +732,6 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e
return false;
}
epicsAutoMutex autoMutex ( this->mutex );
if ( msgsize + this->nBytesInXmitBuf > sizeof ( this->xmitBuf ) ) {
return false;
}
@@ -754,15 +749,10 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e
return true;
}
//
// udpiiu::flush ()
//
void udpiiu::flush ()
void udpiiu::datagramFlush ()
{
osiSockAddrNode *pNode;
epicsAutoMutex autoMutex ( this->mutex );
if ( this->nBytesInXmitBuf == 0u ) {
return;
}
@@ -820,7 +810,6 @@ void udpiiu::flush ()
void udpiiu::show ( unsigned level ) const
{
epicsAutoMutex autoMutex ( this->mutex );
printf ( "Datagram IO circuit (and disconnected channel repository)\n");
if ( level > 1u ) {
this->netiiu::show ( level - 1u );