diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index 19b855534..ebfc8f135 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -251,6 +251,7 @@ class baseNMIU; class tcpiiuPrivateListOfIO { private: friend tcpiiu; + friend netiiu; // used to install subscriptions when not connected tsDLList < class baseNMIU > eventq; }; @@ -513,6 +514,7 @@ public: virtual int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem ); virtual int subscriptionRequest ( netSubscription &subscr, bool userThread ); virtual int subscriptionCancelRequest ( netSubscription &subscr ); + virtual int installSubscription ( netSubscription &subscr ); virtual void unistallSubscription ( nciu &chan, netSubscription &subscr ); virtual void subscribeAllIO ( nciu &chan ); virtual int createChannelRequest ( nciu & ); @@ -753,6 +755,7 @@ public: int clearChannelRequest ( nciu & ); int subscriptionRequest ( netSubscription &subscr, bool userThread ); int subscriptionCancelRequest ( netSubscription &subscr ); + int installSubscription ( netSubscription &subscr ); void unistallSubscription ( nciu &chan, netSubscription &subscr ); void hostName ( char *pBuf, unsigned bufLength ) const; diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index 9b8cd7888..6a1a475e8 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -353,22 +353,12 @@ bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisCh int nciu::subscriptionMsg ( netSubscription &subscr, bool userThread ) { - int status; - if ( this->f_connected ) { - status = this->piiu->subscriptionRequest ( subscr, userThread ); - } - else { - status = ECA_NORMAL; - } - - return status; + return this->piiu->subscriptionRequest ( subscr, userThread ); } void nciu::unistallSubscription ( netSubscription &subscr ) { - if ( this->f_connected ) { - this->piiu->unistallSubscription ( *this, subscr ); - } + this->piiu->unistallSubscription ( *this, subscr ); } void nciu::incrementOutstandingIO () @@ -518,7 +508,7 @@ int nciu::subscribe ( unsigned type, unsigned long nElem, netSubscription *pSubcr = new netSubscription ( *this, type, nElem, mask, notify ); if ( pSubcr ) { - int status = this->piiu->subscriptionRequest ( *pSubcr, true ); + int status = this->piiu->installSubscription ( *pSubcr ); if ( status != ECA_NORMAL ) { pSubcr->destroy (); } diff --git a/src/ca/netiiu.cpp b/src/ca/netiiu.cpp index 734c2d2d5..163a44ce4 100644 --- a/src/ca/netiiu.cpp +++ b/src/ca/netiiu.cpp @@ -190,7 +190,7 @@ int netiiu::clearChannelRequest ( nciu & ) return ECA_DISCONNCHID; } -int netiiu::subscriptionRequest ( netSubscription &, bool ) +int netiiu::subscriptionRequest ( netSubscription &subscr, bool ) { return ECA_NORMAL; } @@ -200,8 +200,17 @@ int netiiu::subscriptionCancelRequest ( netSubscription & ) return ECA_DISCONNCHID; } -void netiiu::unistallSubscription ( nciu &, netSubscription & ) +int netiiu::installSubscription ( netSubscription &subscr ) { + epicsAutoMutex autoMutex ( this->mutex ); + subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr ); + return ECA_NORMAL; +} + +void netiiu::unistallSubscription ( nciu &, netSubscription &subscr ) +{ + epicsAutoMutex autoMutex ( this->mutex ); + subscr.channel ().tcpiiuPrivateListOfIO::eventq.remove ( subscr ); } void netiiu::hostName ( char *pBuf, unsigned bufLength ) const diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 6bd1b4150..97622eece 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -1679,25 +1679,25 @@ int tcpiiu::createChannelRequest ( nciu &chan ) int tcpiiu::clearChannelRequest ( nciu &chan ) { + int status; + if ( this->sendQue.flushThreshold ( 16u ) ) { this->flushToWire ( true ); } epicsAutoMutex autoMutex ( this->mutex ); - int status = this->sendQue.reserveSpace ( 16u ); - if ( status == ECA_NORMAL ) { - if ( ! chan.verifyConnected ( *this ) ) { - status = ECA_DISCONNCHID; - } - else { - baseNMIU *pNMIU; - while ( ( pNMIU = chan.tcpiiuPrivateListOfIO::eventq.get () ) ) { - baseNMIU *pFound = this->ioTable.remove ( *pNMIU ); - assert ( pFound == pNMIU ); - pNMIU->subscriptionCancelMsg (); - delete pNMIU; - } + baseNMIU *pNMIU; + while ( ( pNMIU = chan.tcpiiuPrivateListOfIO::eventq.first () ) ) { + delete pNMIU; + } + + if ( ! chan.verifyConnected ( *this ) ) { + status = ECA_DISCONNCHID; + } + else { + status = this->sendQue.reserveSpace ( 16u ); + if ( status == ECA_NORMAL ) { this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType @@ -1741,9 +1741,6 @@ int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread ) status = ECA_NORMAL; } else { - this->ioTable.add ( subscr ); - subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr ); - // header this->sendQue.pushUInt16 ( CA_PROTO_EVENT_ADD ); // cmd this->sendQue.pushUInt16 ( 16u ); // postsize @@ -1962,10 +1959,12 @@ void tcpiiu::subscribeAllIO ( nciu &chan ) tsDLIterBD < baseNMIU > iter = chan.tcpiiuPrivateListOfIO::eventq.first (); while ( iter.valid () ) { + this->ioTable.add ( *iter ); iter->subscriptionMsg (); iter++; } } + this->flush (); } // cancel IO operations and monitor subscriptions @@ -1988,13 +1987,20 @@ void tcpiiu::disconnectAllIO ( nciu &chan ) } } +int tcpiiu::installSubscription ( netSubscription &subscr ) +{ + { + epicsAutoMutex autoMutex ( this->mutex ); + subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr ); + this->ioTable.add ( subscr ); + } + return this->subscriptionRequest ( subscr, true ); +} + void tcpiiu::unistallSubscription ( nciu &chan, netSubscription &subscr ) { epicsAutoMutex autoMutex ( this->mutex ); - if ( chan.verifyConnected ( *this ) ) { - baseNMIU *p = this->ioTable.remove ( subscr ); - if ( p ) { - chan.tcpiiuPrivateListOfIO::eventq.remove ( subscr ); - } - } + chan.tcpiiuPrivateListOfIO::eventq.remove ( subscr ); + baseNMIU *pIO = this->ioTable.remove ( subscr ); + assert ( pIO == &subscr ); } diff --git a/src/ca/tcpiiu_IL.h b/src/ca/tcpiiu_IL.h index f40aae4b1..17808875a 100644 --- a/src/ca/tcpiiu_IL.h +++ b/src/ca/tcpiiu_IL.h @@ -47,11 +47,20 @@ inline SOCKET tcpiiu::getSock () const inline void tcpiiu::flush () { + bool signalNeeded; { epicsAutoMutex autoMutex ( this->mutex ); - this->flushPending = true; + if ( this->sendQue.occupiedBytes () ) { + this->flushPending = true; + signalNeeded = true; + } + else { + signalNeeded = false; + } + } + if ( signalNeeded ) { + epicsEventSignal ( this->sendThreadFlushSignal ); } - epicsEventSignal ( this->sendThreadFlushSignal ); } inline bool tcpiiu::ca_v44_ok () const