fixed event subscriptions

This commit is contained in:
Jeff Hill
2001-01-19 00:58:01 +00:00
parent 7a334eb417
commit a74b043aff
5 changed files with 56 additions and 39 deletions

View File

@@ -251,6 +251,7 @@ class baseNMIU;
class tcpiiuPrivateListOfIO {
private:
friend tcpiiu;
friend netiiu; // used to install subscriptions when not connected
tsDLList < class baseNMIU > eventq;
};
@@ -513,6 +514,7 @@ public:
virtual int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
virtual int subscriptionRequest ( netSubscription &subscr, bool userThread );
virtual int subscriptionCancelRequest ( netSubscription &subscr );
virtual int installSubscription ( netSubscription &subscr );
virtual void unistallSubscription ( nciu &chan, netSubscription &subscr );
virtual void subscribeAllIO ( nciu &chan );
virtual int createChannelRequest ( nciu & );
@@ -753,6 +755,7 @@ public:
int clearChannelRequest ( nciu & );
int subscriptionRequest ( netSubscription &subscr, bool userThread );
int subscriptionCancelRequest ( netSubscription &subscr );
int installSubscription ( netSubscription &subscr );
void unistallSubscription ( nciu &chan, netSubscription &subscr );
void hostName ( char *pBuf, unsigned bufLength ) const;

View File

@@ -353,22 +353,12 @@ bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisCh
int nciu::subscriptionMsg ( netSubscription &subscr, bool userThread )
{
int status;
if ( this->f_connected ) {
status = this->piiu->subscriptionRequest ( subscr, userThread );
}
else {
status = ECA_NORMAL;
}
return status;
return this->piiu->subscriptionRequest ( subscr, userThread );
}
void nciu::unistallSubscription ( netSubscription &subscr )
{
if ( this->f_connected ) {
this->piiu->unistallSubscription ( *this, subscr );
}
this->piiu->unistallSubscription ( *this, subscr );
}
void nciu::incrementOutstandingIO ()
@@ -518,7 +508,7 @@ int nciu::subscribe ( unsigned type, unsigned long nElem,
netSubscription *pSubcr = new netSubscription ( *this,
type, nElem, mask, notify );
if ( pSubcr ) {
int status = this->piiu->subscriptionRequest ( *pSubcr, true );
int status = this->piiu->installSubscription ( *pSubcr );
if ( status != ECA_NORMAL ) {
pSubcr->destroy ();
}

View File

@@ -190,7 +190,7 @@ int netiiu::clearChannelRequest ( nciu & )
return ECA_DISCONNCHID;
}
int netiiu::subscriptionRequest ( netSubscription &, bool )
int netiiu::subscriptionRequest ( netSubscription &subscr, bool )
{
return ECA_NORMAL;
}
@@ -200,8 +200,17 @@ int netiiu::subscriptionCancelRequest ( netSubscription & )
return ECA_DISCONNCHID;
}
void netiiu::unistallSubscription ( nciu &, netSubscription & )
int netiiu::installSubscription ( netSubscription &subscr )
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
return ECA_NORMAL;
}
void netiiu::unistallSubscription ( nciu &, netSubscription &subscr )
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.remove ( subscr );
}
void netiiu::hostName ( char *pBuf, unsigned bufLength ) const

View File

@@ -1679,25 +1679,25 @@ int tcpiiu::createChannelRequest ( nciu &chan )
int tcpiiu::clearChannelRequest ( nciu &chan )
{
int status;
if ( this->sendQue.flushThreshold ( 16u ) ) {
this->flushToWire ( true );
}
epicsAutoMutex autoMutex ( this->mutex );
int status = this->sendQue.reserveSpace ( 16u );
if ( status == ECA_NORMAL ) {
if ( ! chan.verifyConnected ( *this ) ) {
status = ECA_DISCONNCHID;
}
else {
baseNMIU *pNMIU;
while ( ( pNMIU = chan.tcpiiuPrivateListOfIO::eventq.get () ) ) {
baseNMIU *pFound = this->ioTable.remove ( *pNMIU );
assert ( pFound == pNMIU );
pNMIU->subscriptionCancelMsg ();
delete pNMIU;
}
baseNMIU *pNMIU;
while ( ( pNMIU = chan.tcpiiuPrivateListOfIO::eventq.first () ) ) {
delete pNMIU;
}
if ( ! chan.verifyConnected ( *this ) ) {
status = ECA_DISCONNCHID;
}
else {
status = this->sendQue.reserveSpace ( 16u );
if ( status == ECA_NORMAL ) {
this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
@@ -1741,9 +1741,6 @@ int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread )
status = ECA_NORMAL;
}
else {
this->ioTable.add ( subscr );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
// header
this->sendQue.pushUInt16 ( CA_PROTO_EVENT_ADD ); // cmd
this->sendQue.pushUInt16 ( 16u ); // postsize
@@ -1962,10 +1959,12 @@ void tcpiiu::subscribeAllIO ( nciu &chan )
tsDLIterBD < baseNMIU > iter =
chan.tcpiiuPrivateListOfIO::eventq.first ();
while ( iter.valid () ) {
this->ioTable.add ( *iter );
iter->subscriptionMsg ();
iter++;
}
}
this->flush ();
}
// cancel IO operations and monitor subscriptions
@@ -1988,13 +1987,20 @@ void tcpiiu::disconnectAllIO ( nciu &chan )
}
}
int tcpiiu::installSubscription ( netSubscription &subscr )
{
{
epicsAutoMutex autoMutex ( this->mutex );
subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );
this->ioTable.add ( subscr );
}
return this->subscriptionRequest ( subscr, true );
}
void tcpiiu::unistallSubscription ( nciu &chan, netSubscription &subscr )
{
epicsAutoMutex autoMutex ( this->mutex );
if ( chan.verifyConnected ( *this ) ) {
baseNMIU *p = this->ioTable.remove ( subscr );
if ( p ) {
chan.tcpiiuPrivateListOfIO::eventq.remove ( subscr );
}
}
chan.tcpiiuPrivateListOfIO::eventq.remove ( subscr );
baseNMIU *pIO = this->ioTable.remove ( subscr );
assert ( pIO == &subscr );
}

View File

@@ -47,11 +47,20 @@ inline SOCKET tcpiiu::getSock () const
inline void tcpiiu::flush ()
{
bool signalNeeded;
{
epicsAutoMutex autoMutex ( this->mutex );
this->flushPending = true;
if ( this->sendQue.occupiedBytes () ) {
this->flushPending = true;
signalNeeded = true;
}
else {
signalNeeded = false;
}
}
if ( signalNeeded ) {
epicsEventSignal ( this->sendThreadFlushSignal );
}
epicsEventSignal ( this->sendThreadFlushSignal );
}
inline bool tcpiiu::ca_v44_ok () const