upgraded event subscription logic

This commit is contained in:
Jeff Hill
2001-01-23 23:56:05 +00:00
parent debb5b5814
commit db43b6e983
14 changed files with 189 additions and 151 deletions
+2 -11
View File
@@ -27,18 +27,9 @@ void baseNMIU::destroy ()
delete this;
}
int baseNMIU::subscriptionMsg ()
class netSubscription * baseNMIU::isSubscription ()
{
return ECA_NORMAL;
}
void baseNMIU::subscriptionCancelMsg ()
{
}
bool baseNMIU::isSubscription () const
{
return false;
return 0;
}
void baseNMIU::show ( unsigned /* level */ ) const
+9 -23
View File
@@ -21,6 +21,7 @@
#include "nciu_IL.h"
#include "ioCounter_IL.h"
#include "comQueSend_IL.h"
#include "recvProcessThread_IL.h"
extern "C" void cacRecursionLockExitHandler ()
{
@@ -157,9 +158,9 @@ cac::~cac ()
// make certain that process thread isnt deleting
// tcpiiu objects at the same that this thread is
//
recvProcessThread *pTmp = this->pRecvProcThread;
this->pRecvProcThread = 0;
delete pTmp;
if ( this->pRecvProcThread ) {
this->pRecvProcThread->disable ();
}
if ( this->pudpiiu ) {
// this blocks until the UDP thread exits so that
@@ -193,6 +194,10 @@ cac::~cac ()
}
}
if ( this->pRecvProcThread ) {
delete this->pRecvProcThread;
}
if ( this->pRepeaterSubscribeTmr ) {
delete this->pRepeaterSubscribeTmr;
}
@@ -1031,23 +1036,4 @@ void cac::destroyNCIU ( nciu & chan )
chan.cacDestroy ();
}
// the recv thread is not permitted to flush as this
// can result in a push / pull deadlock on the TCP pipe.
// Instead, the recv thread scheduals the flush with the
// send thread which runs at a higher priority than the
// send thread. The same applies to the UDP thread for
// locking hierarchy reasons.
bool cac::flushPermit () const
{
if ( this->pRecvProcThread ) {
if ( this->pRecvProcThread->thread.isCurrentThread () ) {
return false;
}
}
if ( this->pudpiiu ) {
if ( this->pudpiiu->isCurrentThread () ) {
return false;
}
}
return true;
}
+27
View File
@@ -15,6 +15,12 @@
* 505 665 1831
*/
#ifndef cac_ILh
#define cac_ILh
#include "recvProcessThread_IL.h"
#include "udpiiu_IL.h"
inline int cac::vPrintf ( const char *pformat, va_list args )
{
return ( *this->pVPrintfFunc ) ( pformat, args );
@@ -54,4 +60,25 @@ inline unsigned cac::getInitializingThreadsPriority () const
return this->initializingThreadsPriority;
}
// the recv thread is not permitted to flush as this
// can result in a push / pull deadlock on the TCP pipe.
// Instead, the recv thread scheduals the flush with the
// send thread which runs at a higher priority than the
// send thread. The same applies to the UDP thread for
// locking hierarchy reasons.
inline bool cac::flushPermit () const
{
if ( this->pRecvProcThread ) {
if ( this->pRecvProcThread->isCurrentThread () ) {
return false;
}
}
if ( this->pudpiiu ) {
if ( this->pudpiiu->isCurrentThread () ) {
return false;
}
}
return true;
}
#endif // cac_ILh
+1 -1
View File
@@ -152,7 +152,7 @@ READONLY char *ca_message_text[]
"Database value get for that channel failed during channel search",
"Unable to initialize without the vxWorks VX_FP_TASK task option set",
"Event queue overflow has prevented first pass event after event add",
"A monitor by that id cant be found",
"bad event subscription identifier",
"Remote channel has new network address",
"New or resumed network connection",
"Specified task isnt a member of a CA context",
+16 -24
View File
@@ -293,16 +293,12 @@ public:
void decrementOutstandingIO ( unsigned seqNumber );
bool searchMsg ( unsigned short retrySeqNumber,
unsigned &retryNoForThisChannel );
void subscriptionCancelMsg ( class netSubscription &subsc );
bool fullyConstructed () const;
bool isAttachedToVirtaulCircuit ( const osiSockAddr & );
bool identifierEquivelence ( unsigned idToMatch );
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
int subscriptionMsg ( netSubscription &, bool userThread );
void unistallSubscription ( netSubscription & );
void unistallSubscription ( class netSubscription & );
void resetRetryCount ();
unsigned getRetrySeqNo () const;
void accessRightsStateChange ( const caar &arIn );
@@ -350,10 +346,8 @@ public:
virtual void completionNotify ( unsigned type, unsigned long count, const void *pData ) = 0;
virtual void exceptionNotify ( int status, const char *pContext ) = 0;
virtual void exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count ) = 0;
virtual bool isSubscription () const;
virtual class netSubscription * isSubscription ();
virtual void show ( unsigned level ) const;
virtual int subscriptionMsg ();
virtual void subscriptionCancelMsg ();
ca_uint32_t getID () const;
nciu & channel ();
void destroy ();
@@ -381,9 +375,7 @@ private:
void completionNotify ( unsigned type, unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count );
int subscriptionMsg ();
void subscriptionCancelMsg ();
bool isSubscription () const;
class netSubscription * isSubscription ();
~netSubscription ();
static tsFreeList < class netSubscription, 1024 > freeList;
};
@@ -501,6 +493,8 @@ public:
void resetChannelRetryCounts ();
void attachChannel ( nciu &chan );
void detachChannel ( nciu &chan );
int installSubscription ( netSubscription &subscr );
void unistallSubscription ( netSubscription &subscr );
virtual void hostName (char *pBuf, unsigned bufLength) const;
virtual const char * pHostName () const; // deprecated - please do not use
virtual bool isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &addr ) const;
@@ -512,22 +506,22 @@ public:
virtual int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
virtual int readCopyRequest ( nciu &, unsigned type, unsigned nElem, void *pValue );
virtual int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
virtual int subscriptionRequest ( netSubscription &subscr, bool userThread );
virtual int subscriptionCancelRequest ( netSubscription &subscr );
virtual int installSubscription ( netSubscription &subscr );
virtual void unistallSubscription ( nciu &chan, netSubscription &subscr );
virtual void subscribeAllIO ( nciu &chan );
virtual int createChannelRequest ( nciu & );
virtual void connectAllIO ( nciu &chan );
virtual void disconnectAllIO ( nciu &chan );
virtual int clearChannelRequest ( nciu & );
protected:
cac * pCAC () const;
mutable epicsMutex mutex;
private:
tsDLList < nciu > channelList;
class cac *pClientCtx;
virtual void lastChannelDetachNotify ();
virtual int subscriptionRequest ( netSubscription &subscr, bool userThread );
virtual int subscriptionCancelRequest ( netSubscription &subscr, bool userThread );
};
class limboiiu : public netiiu {
@@ -753,10 +747,6 @@ public:
int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
int createChannelRequest ( nciu & );
int clearChannelRequest ( nciu & );
int subscriptionRequest ( netSubscription &subscr, bool userThread );
int subscriptionCancelRequest ( netSubscription &subscr );
int installSubscription ( netSubscription &subscr );
void unistallSubscription ( nciu &chan, netSubscription &subscr );
void hostName ( char *pBuf, unsigned bufLength ) const;
const char * pHostName () const; // deprecated - please do not use
@@ -839,9 +829,12 @@ private:
int status, const char *pContext );
void ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext, unsigned type, unsigned long count );
void subscribeAllIO ( nciu &chan );
void connectAllIO ( nciu &chan );
void disconnectAllIO ( nciu &chan );
int subscriptionRequest ( netSubscription &subscr, bool userThread );
int subscriptionCancelRequest ( netSubscription &subscr, bool userThread );
typedef void ( tcpiiu::*pProtoStubTCP ) ();
static const pProtoStubTCP tcpJumpTableCAC [];
};
@@ -908,12 +901,11 @@ public:
recvProcessThread ( class cac *pcacIn );
virtual ~recvProcessThread ();
void run ();
void signalShutDown ();
void enable ();
void disable ();
void signalActivity ();
bool isCurrentThread () const;
void show ( unsigned level ) const;
epicsThread thread;
private:
//
// The additional complexity associated with
@@ -922,6 +914,7 @@ private:
// and therefore reduces the chance of creating
// a deadlock window during code maintenance.
//
epicsThread thread;
epicsEvent recvActivity;
class cac *pcac;
epicsEvent exit;
@@ -938,7 +931,6 @@ public:
sendProcessThread ( class cac &cacIn );
virtual ~sendProcessThread ();
void run ();
void signalShutDown ();
void signalActivity ();
epicsThread thread;
private:
+2 -11
View File
@@ -67,6 +67,7 @@ void nciu::destroy ()
{
// this occurs here so that it happens when
// a lock is not applied
this->piiu->disconnectAllIO ( *this );
this->piiu->clearChannelRequest ( *this );
this->cacCtx.destroyNCIU ( *this );
}
@@ -260,7 +261,7 @@ void nciu::connect ( unsigned nativeType,
this->unlock ();
// resubscribe for monitors from this channel
this->piiu->subscribeAllIO ( *this );
this->piiu->connectAllIO ( *this );
this->connectNotify ();
@@ -351,16 +352,6 @@ bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisCh
return status;
}
int nciu::subscriptionMsg ( netSubscription &subscr, bool userThread )
{
return this->piiu->subscriptionRequest ( subscr, userThread );
}
void nciu::unistallSubscription ( netSubscription &subscr )
{
this->piiu->unistallSubscription ( *this, subscr );
}
void nciu::incrementOutstandingIO ()
{
this->cacCtx.incrementOutstandingIO ();
+6 -5
View File
@@ -93,11 +93,6 @@ inline unsigned nciu::getRetrySeqNo () const
return this->retrySeqNo;
}
inline void nciu::subscriptionCancelMsg ( netSubscription &subsc )
{
this->piiu->subscriptionCancelRequest ( subsc );
}
// this is to only be used by early protocol revisions
inline void nciu::connect ()
{
@@ -132,3 +127,9 @@ inline netiiu * nciu::getPIIU ()
return this->piiu;
}
inline void nciu::unistallSubscription ( netSubscription &subscr )
{
this->piiu->unistallSubscription ( subscr );
}
+2 -13
View File
@@ -25,7 +25,6 @@ netSubscription::netSubscription ( nciu &chan, unsigned typeIn, unsigned long co
netSubscription::~netSubscription ()
{
this->chan.subscriptionCancelMsg ( *this );
this->chan.unistallSubscription ( *this );
}
@@ -34,19 +33,9 @@ void netSubscription::destroy ()
delete this;
}
int netSubscription::subscriptionMsg ()
class netSubscription * netSubscription::isSubscription ()
{
return this->chan.subscriptionMsg ( *this, false );
}
void netSubscription::subscriptionCancelMsg ()
{
this->chan.subscriptionCancelMsg ( *this );
}
bool netSubscription::isSubscription () const
{
return true;
return this;
}
void netSubscription::completionNotify ()
+20 -9
View File
@@ -196,22 +196,33 @@ int netiiu::subscriptionRequest ( netSubscription &subscr, bool )
return ECA_NORMAL;
}
int netiiu::subscriptionCancelRequest ( netSubscription & )
int netiiu::subscriptionCancelRequest ( netSubscription &, bool userThread )
{
return ECA_DISCONNCHID;
return ECA_NORMAL;
}
int netiiu::installSubscription ( netSubscription &subscr )
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
return ECA_NORMAL;
// we must install the subscription first on the channel so that
// proper installation is guaranteed to occur if a connect occurs
// beteen these two steps
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
}
return this->subscriptionRequest ( subscr, true );
}
void netiiu::unistallSubscription ( nciu &, netSubscription &subscr )
void netiiu::unistallSubscription ( netSubscription &subscr )
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.remove ( subscr );
// we must cancel the subscription first so that clean up
// is guaranteed to occur if a disconnect occurs beteen
// these two steps
this->subscriptionCancelRequest ( subscr, true );
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.remove ( subscr );
}
}
void netiiu::hostName ( char *pBuf, unsigned bufLength ) const
@@ -231,6 +242,6 @@ void netiiu::disconnectAllIO ( nciu & )
{
}
void netiiu::subscribeAllIO ( nciu & )
void netiiu::connectAllIO ( nciu & )
{
}
+2 -7
View File
@@ -34,7 +34,8 @@ recvProcessThread::recvProcessThread (cac *pcacIn) :
recvProcessThread::~recvProcessThread ()
{
this->signalShutDown ();
this->shutDown = true;
this->recvActivity.signal ();
this->exit.wait ();
}
@@ -71,12 +72,6 @@ void recvProcessThread::run ()
this->exit.signal ();
}
void recvProcessThread::signalShutDown ()
{
this->shutDown = true;
this->recvActivity.signal ();
}
void recvProcessThread::enable ()
{
unsigned copy;
+26
View File
@@ -0,0 +1,26 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef recvProcessThread_ILh
#define recvProcessThread_ILh
inline bool recvProcessThread::isCurrentThread () const
{
return this->thread.isCurrentThread ();
}
#endif // recvProcessThread_ILh
+50 -43
View File
@@ -1687,11 +1687,6 @@ int tcpiiu::clearChannelRequest ( nciu &chan )
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU *pNMIU;
while ( ( pNMIU = chan.tcpiiuPrivateListOfIO::eventq.first () ) ) {
delete pNMIU;
}
if ( ! chan.verifyConnected ( *this ) ) {
status = ECA_DISCONNCHID;
}
@@ -1741,6 +1736,8 @@ int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread )
status = ECA_NORMAL;
}
else {
this->ioTable.add ( subscr );
// header
this->sendQue.pushUInt16 ( CA_PROTO_EVENT_ADD ); // cmd
this->sendQue.pushUInt16 ( 16u ); // postsize
@@ -1761,28 +1758,40 @@ int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread )
return status;
}
int tcpiiu::subscriptionCancelRequest ( netSubscription &subscr )
int tcpiiu::subscriptionCancelRequest ( netSubscription &subscr, bool userThread )
{
if ( this->sendQue.flushThreshold ( 16u ) ) {
this->flushToWire ( true );
if ( userThread ) {
this->flushToWire ( true );
}
else {
this->flush ();
}
}
epicsAutoMutex autoMutex ( this->mutex );
int status = this->sendQue.reserveSpace ( 16u );
if ( status == ECA_NORMAL ) {
if ( ! subscr.channel ().verifyConnected ( *this ) ) {
status = ECA_DISCONNCHID;
}
else {
this->sendQue.pushUInt16 ( CA_PROTO_EVENT_CANCEL ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getCount () ) ); // count
this->sendQue.pushUInt32 ( subscr.channel ().getSID () ); // cid
this->sendQue.pushUInt32 ( subscr.getID () ); // available
int status;
baseNMIU *pIO = this->ioTable.remove ( subscr );
if ( pIO == &subscr ) {
status = this->sendQue.reserveSpace ( 16u );
if ( status == ECA_NORMAL ) {
if ( ! subscr.channel ().verifyConnected ( *this ) ) {
status = ECA_DISCONNCHID;
}
else {
this->sendQue.pushUInt16 ( CA_PROTO_EVENT_CANCEL ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getCount () ) ); // count
this->sendQue.pushUInt32 ( subscr.channel ().getSID () ); // cid
this->sendQue.pushUInt32 ( subscr.getID () ); // available
}
}
}
else {
status = ECA_BADMONID;
}
return status;
}
@@ -1950,18 +1959,27 @@ void tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status,
}
}
// resubscribe for monitors from this channel
void tcpiiu::subscribeAllIO ( nciu &chan )
void tcpiiu::connectAllIO ( nciu &chan )
{
epicsAutoMutex autoMutex ( this->mutex );
if ( chan.verifyConnected ( *this ) ) {
tsDLIterBD < baseNMIU > iter =
chan.tcpiiuPrivateListOfIO::eventq.first ();
while ( iter.valid () ) {
this->ioTable.add ( *iter );
iter->subscriptionMsg ();
iter++;
tsDLIterBD < baseNMIU > next = iter.itemAfter ();
class netSubscription *pSubscr = iter->isSubscription ();
if ( pSubscr ) {
this->subscriptionRequest ( *pSubscr, false );
}
else {
// it shouldnt be here at this point - so uninstall it
this->ioTable.remove ( *iter );
chan.tcpiiuPrivateListOfIO::eventq.remove ( *iter );
iter->exceptionNotify ( ECA_DISCONN, this->pHostName () );
iter->destroy ();
}
iter = next;
}
}
this->flush ();
@@ -1976,10 +1994,15 @@ void tcpiiu::disconnectAllIO ( nciu &chan )
chan.tcpiiuPrivateListOfIO::eventq.first ();
while ( iter.valid () ) {
tsDLIterBD < baseNMIU > next = iter.itemAfter ();
this->ioTable.remove ( *iter );
if ( ! iter->isSubscription () ) {
iter->exceptionNotify ( ECA_DISCONN, this->pHostName () );
class netSubscription *pSubscr = iter->isSubscription ();
if ( pSubscr ) {
this->subscriptionCancelRequest ( *pSubscr, false );
}
else {
// no use after disconnected - so uninstall it
this->ioTable.remove ( *iter );
chan.tcpiiuPrivateListOfIO::eventq.remove ( *iter );
iter->exceptionNotify ( ECA_DISCONN, this->pHostName () );
iter->destroy ();
}
iter = next;
@@ -1987,20 +2010,4 @@ void tcpiiu::disconnectAllIO ( nciu &chan )
}
}
int tcpiiu::installSubscription ( netSubscription &subscr )
{
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
this->ioTable.add ( subscr );
}
return this->subscriptionRequest ( subscr, true );
}
void tcpiiu::unistallSubscription ( nciu &chan, netSubscription &subscr )
{
epicsAutoMutex autoMutex ( this->mutex );
chan.tcpiiuPrivateListOfIO::eventq.remove ( subscr );
baseNMIU *pIO = this->ioTable.remove ( subscr );
assert ( pIO == &subscr );
}
-4
View File
@@ -850,7 +850,3 @@ void udpiiu::show ( unsigned level ) const
}
}
bool udpiiu::isCurrentThread () const
{
return ( this->recvThreadId == epicsThreadGetIdSelf () );
}
+26
View File
@@ -0,0 +1,26 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef udpiiu_ILh
#define udpiiu_ILh
inline bool udpiiu::isCurrentThread () const
{
return ( this->recvThreadId == epicsThreadGetIdSelf () );
}
#endif // udpiiu_ILh