diff --git a/src/ca/CASG.cpp b/src/ca/CASG.cpp index c2af2d4fb..cc32594ff 100644 --- a/src/ca/CASG.cpp +++ b/src/ca/CASG.cpp @@ -87,7 +87,7 @@ int CASG::block ( double timeout ) cur_time = epicsTime::getCurrent (); - this->client.flush (); + this->client.flushRequest (); beg_time = cur_time; delay = 0.0; diff --git a/src/ca/access.cpp b/src/ca/access.cpp index c681bad45..3ddbcd4d5 100644 --- a/src/ca/access.cpp +++ b/src/ca/access.cpp @@ -500,7 +500,7 @@ extern "C" int epicsShareAPI ca_flush_io () return caStatus; } - pcac->flush (); + pcac->flushRequest (); return ECA_NORMAL; } diff --git a/src/ca/baseNMIU.cpp b/src/ca/baseNMIU.cpp index 737a428fe..84825f6d7 100644 --- a/src/ca/baseNMIU.cpp +++ b/src/ca/baseNMIU.cpp @@ -25,18 +25,11 @@ baseNMIU::~baseNMIU () void baseNMIU::cancel () { - unsigned i = 0u; - while ( ! this->chan.getPIIU ()->uninstallIO ( *this ) ) { - if ( i++ > 1000u ) { - ca_printf ( "CAC: unable to destroy IO\n" ); - break; - } - } - this->ioCancelRequest (); + this->chan.uninstallIO ( *this ); delete this; } -class netSubscription * baseNMIU::isSubscription () +class netSubscription * baseNMIU::isSubscription () { return 0; } @@ -52,8 +45,4 @@ cacChannelIO & baseNMIU::channelIO () const return this->chan; } -void baseNMIU::ioCancelRequest () -{ -} - diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 9ae2ec69f..e41a75758 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -22,6 +22,9 @@ #include "comQueSend_IL.h" #include "recvProcessThread_IL.h" #include "netiiu_IL.h" +#include "netWriteNotifyIO_IL.h" +#include "netReadNotifyIO_IL.h" +#include "baseNMIU_IL.h" // // cac::cac () @@ -29,6 +32,7 @@ cac::cac ( bool enablePreemptiveCallbackIn ) : ipToAEngine ( "caIPAddrToAsciiEngine" ), chanTable ( 1024 ), + ioTable ( 1024 ), sgTable ( 128 ), beaconTable ( 1024 ), fdRegFunc ( 0 ), @@ -123,10 +127,13 @@ cac::~cac () this->pRecvProcThread->disable (); } - if ( this->pudpiiu ) { - // this blocks until the UDP thread exits so that - // it will not sneak in any new clients - this->pudpiiu->shutdown (); + { + epicsAutoMutex autoMutex ( this->defaultMutex ); + if ( this->pudpiiu ) { + // this blocks until the UDP thread exits so that + // it will not sneak in any new clients + this->pudpiiu->shutdown (); + } } // @@ -169,18 +176,17 @@ cac::~cac () delete this->pSearchTmr; } - if ( this->pudpiiu ) { + { + epicsAutoMutex autoMutex ( this->defaultMutex ); + if ( this->pudpiiu ) { - // - // make certain that the UDP thread isnt starting - // up new clients. this adds an additional - // requirement that threads - // - { - epicsAutoMutex autoMutex ( this->defaultMutex ); + // + // make certain that the UDP thread isnt starting + // up new clients. + // this->pudpiiu->disconnectAllChan ( limboIIU ); + delete this->pudpiiu; } - delete this->pudpiiu; } // @@ -192,6 +198,13 @@ cac::~cac () this->beaconTable.traverse ( &bhe::destroy ); + // if we get here and the IO is still attached then we have a + // leaked io block that was not registered with a channel. + if ( this->ioTable.numEntriesInstalled () ) { + this->printf ( "CAC %u orphaned IO items?\n", + this->ioTable.numEntriesInstalled () ); + } + osiSockRelease (); this->pTimerQueue->release (); @@ -244,10 +257,7 @@ void cac::processRecvBacklog () } } -/* - * cac::flush () - */ -void cac::flush () +void cac::flushRequest () { /* * set the push pending flag on all virtual circuits @@ -255,7 +265,7 @@ void cac::flush () epicsAutoMutex autoMutex ( this->iiuListMutex ); tsDLIterBD piiu = this->iiuList.firstIter (); while ( piiu.valid () ) { - piiu->flush (); + piiu->flushRequest (); piiu++; } } @@ -299,6 +309,8 @@ void cac::show ( unsigned level ) const this->programBeginTime.show ( level - 3u ); ::printf ( "Channel identifier hash table:\n" ); this->chanTable.show ( level - 3u ); + ::printf ( "IO identifier hash table:\n" ); + this->ioTable.show ( level - 3u ); ::printf ( "Synchronous group identifier hash table:\n" ); this->sgTable.show ( level - 3u ); ::printf ( "Beacon source identifier hash table:\n" ); @@ -426,7 +438,7 @@ int cac::pendIO ( const double &timeout ) this->enableCallbackPreemption (); - this->flush (); + this->flushRequest (); int status = ECA_NORMAL; epicsTime beg_time = epicsTime::getCurrent (); @@ -450,8 +462,11 @@ int cac::pendIO ( const double &timeout ) } this->ioCounter.cleanUp (); - if ( this->pudpiiu ) { - this->pudpiiu->connectTimeoutNotify (); + { + epicsAutoMutex autoMutex ( this->defaultMutex ); + if ( this->pudpiiu ) { + this->pudpiiu->connectTimeoutNotify (); + } } this->disableCallbackPreemption (); @@ -469,7 +484,7 @@ int cac::pendEvent ( const double &timeout ) this->enableCallbackPreemption (); - this->flush (); + this->flushRequest (); if ( timeout == 0.0 ) { while ( true ) { @@ -646,7 +661,7 @@ bool cac::setupUDP () return false; } - this->pSearchTmr = new searchTimer ( *this->pudpiiu, *this->pTimerQueue ); + this->pSearchTmr = new searchTimer ( *this->pudpiiu, *this->pTimerQueue, this->defaultMutex ); if ( ! this->pSearchTmr ) { delete this->pudpiiu; this->pudpiiu = 0; @@ -837,9 +852,7 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, piiu->attachChannel ( *chan ); chan->createChannelRequest (); - - // wake up send thread which ultimately sends the claim message - piiu->flush (); + piiu->flushRequest (); if ( ! piiu->ca_v42_ok () ) { chan->connect (); @@ -858,7 +871,9 @@ void cac::uninstallChannel ( nciu & chan ) epicsAutoMutex autoMutex ( this->defaultMutex ); nciu *pChan = this->chanTable.remove ( chan ); assert ( pChan = &chan ); - chan.getPIIU ()->detachChannel ( chan ); + this->flushIfRequired ( chan ); + chan.getPIIU()->clearChannelRequest ( chan ); + chan.getPIIU()->detachChannel ( chan ); } void cac::getFDRegCallback ( CAFDHANDLER *&fdRegFuncOut, void *&fdRegArgOut ) const @@ -888,5 +903,311 @@ int cac::printf ( const char *pformat, ... ) return status; } +// lock must be applied before calling this cac private routine +void cac::flushIfRequired ( nciu &chan ) +{ + if ( chan.getPIIU()->flushBlockThreshold() ) { + // the process thread is not permitted to flush as this + // can result in a push / pull deadlock on the TCP pipe. + // Instead, the process thread scheduals the flush with the + // send thread which runs at a higher priority than the + // send thread. The same applies to the UDP thread for + // locking hierarchy reasons. + bool flushPermit = true; + if ( this->pRecvProcThread ) { + if ( this->pRecvProcThread->isCurrentThread () ) { + flushPermit = false; + } + } + if ( this->pudpiiu ) { + if ( this->pudpiiu->isCurrentThread () ) { + flushPermit = false; + } + } + if ( flushPermit ) { + chan.getPIIU()->blockUntilSendBacklogIsReasonable ( this->defaultMutex ); + } + else { + this->flushRequest (); + } + } + else { + chan.getPIIU()->flushRequestIfAboveEarlyThreshold (); + } +} +int cac::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + this->flushIfRequired ( chan ); + return chan.getPIIU()->writeRequest ( chan, type, nElem, pValue ); +} + +int cac::writeNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type, unsigned nElem, const void *pValue ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + this->flushIfRequired ( chan ); + netWriteNotifyIO *pIO = new netWriteNotifyIO ( chan, notify ); + if ( pIO ) { + this->ioTable.add ( *pIO ); + chan.cacPrivateListOfIO::eventq.add ( *pIO ); + int status = chan.getPIIU()->writeNotifyRequest ( chan, *pIO, type, nElem, pValue ); + if ( status != ECA_NORMAL ) { + this->ioTable.remove ( *pIO ); + chan.cacPrivateListOfIO::eventq.remove ( *pIO ); + delete static_cast < baseNMIU * > ( pIO ); + } + return status; + } + else { + return ECA_ALLOCMEM; + } +} + +int cac::readNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type, unsigned nElem ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + this->flushIfRequired ( chan ); + netReadNotifyIO *pIO = new netReadNotifyIO ( chan, notify ); + if ( pIO ) { + this->ioTable.add ( *pIO ); + chan.cacPrivateListOfIO::eventq.add ( *pIO ); + int status = chan.getPIIU()->readNotifyRequest ( chan, *pIO, type, nElem ); + if ( status != ECA_NORMAL ) { + this->ioTable.remove ( *pIO ); + chan.cacPrivateListOfIO::eventq.remove ( *pIO ); + delete static_cast < baseNMIU * > ( pIO ); + } + return status; + } + else { + return ECA_ALLOCMEM; + } +} + +bool cac::ioCompletionNotify ( unsigned id, unsigned type, + unsigned long count, const void *pData ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + baseNMIU * pmiu = this->ioTable.lookup ( id ); + if ( pmiu ) { + pmiu->notify ().completionNotify ( pmiu->channel (), type, count, pData ); + return true; + } + else { + return false; + } +} + +bool cac::ioExceptionNotify ( unsigned id, int status, const char *pContext ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + baseNMIU * pmiu = this->ioTable.lookup ( id ); + if ( pmiu ) { + pmiu->notify ().exceptionNotify ( pmiu->channel (), status, pContext ); + return true; + } + else { + return false; + } +} + +bool cac::ioExceptionNotify ( unsigned id, int status, + const char *pContext, unsigned type, unsigned long count ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + baseNMIU * pmiu = this->ioTable.lookup ( id ); + if ( pmiu ) { + pmiu->notify ().exceptionNotify ( pmiu->channel (), + status, pContext, type, count ); + return true; + } + else { + return false; + } +} + +bool cac::ioCompletionNotifyAndDestroy ( unsigned id ) +{ + baseNMIU * pmiu; + + { + epicsAutoMutex autoMutex ( this->defaultMutex ); + pmiu = this->ioTable.remove ( id ); + if ( pmiu ) { + pmiu->channel ().cacPrivateListOfIO::eventq.remove ( *pmiu ); + } + } + + if ( pmiu ) { + pmiu->notify ().completionNotify ( pmiu->channel () ); + delete pmiu; + return true; + } + else { + return false; + } +} + +bool cac::ioCompletionNotifyAndDestroy ( unsigned id, + unsigned type, unsigned long count, const void *pData ) +{ + baseNMIU * pmiu; + + { + epicsAutoMutex autoMutex ( this->defaultMutex ); + pmiu = this->ioTable.remove ( id ); + if ( pmiu ) { + pmiu->channel ().cacPrivateListOfIO::eventq.remove ( *pmiu ); + } + } + + if ( pmiu ) { + pmiu->notify ().completionNotify ( pmiu->channel (), type, count, pData ); + delete pmiu; + return true; + } + else { + return false; + } +} + +bool cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext ) +{ + baseNMIU * pmiu; + + { + epicsAutoMutex autoMutex ( this->defaultMutex ); + pmiu = this->ioTable.remove ( id ); + if ( pmiu ) { + pmiu->channel ().cacPrivateListOfIO::eventq.remove ( *pmiu ); + } + } + + if ( pmiu ) { + pmiu->notify ().exceptionNotify ( pmiu->channel (), status, pContext ); + delete pmiu; + return true; + } + else { + return false; + } +} + +bool cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, + const char *pContext, unsigned type, unsigned long count ) +{ + baseNMIU * pmiu; + + { + epicsAutoMutex autoMutex ( this->defaultMutex ); + pmiu = this->ioTable.remove ( id ); + if ( pmiu ) { + pmiu->channel ().cacPrivateListOfIO::eventq.remove ( *pmiu ); + } + } + + if ( pmiu ) { + pmiu->notify ().exceptionNotify ( pmiu->channel (), status, + pContext, type, count ); + delete pmiu; + return true; + } + else { + return false; + } +} + +// resubscribe for monitors from this channel +void cac::connectAllIO ( nciu &chan ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + tsDLIterBD < baseNMIU > pNetIO = + chan.cacPrivateListOfIO::eventq.firstIter (); + while ( pNetIO.valid () ) { + tsDLIterBD < baseNMIU > next = pNetIO; + next++; + class netSubscription *pSubscr = pNetIO->isSubscription (); + if ( pSubscr ) { + chan.getPIIU()->subscriptionRequest ( *pSubscr ); + } + else { + // it shouldnt be here at this point - so uninstall it + this->ioTable.remove ( *pNetIO ); + chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); + pNetIO->notify().exceptionNotify ( pNetIO->channel(), ECA_DISCONN, chan.pHostName() ); + delete pNetIO.pointer (); + } + pNetIO = next; + } + chan.getPIIU()->flushRequest (); +} + +// cancel IO operations and monitor subscriptions +void cac::disconnectAllIO ( nciu &chan ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + tsDLIterBD < baseNMIU > pNetIO = + chan.cacPrivateListOfIO::eventq.firstIter (); + while ( pNetIO.valid () ) { + tsDLIterBD < baseNMIU > next = pNetIO; + next++; + class netSubscription *pSubscr = pNetIO->isSubscription (); + this->ioTable.remove ( *pNetIO ); + if ( pSubscr ) { + chan.getPIIU()->subscriptionCancelRequest ( *pSubscr ); + } + else { + // no use after disconnected - so uninstall it + chan.cacPrivateListOfIO::eventq.remove ( *pNetIO ); + pNetIO->notify ().exceptionNotify ( pNetIO->channel (), ECA_DISCONN, chan.pHostName () ); + delete pNetIO.pointer (); + } + pNetIO = next; + } +} + +// +// care is taken to not hold the lock while deleting the +// IO so that subscription delete request (sent by the +// IO's destructor) do not deadlock +// +void cac::destroyAllIO ( nciu &chan ) +{ + tsDLList < baseNMIU > eventQ; + { + epicsAutoMutex autoMutex ( this->defaultMutex ); + while ( baseNMIU *pIO = eventQ.get () ) { + this->ioTable.remove ( *pIO ); + eventQ.add ( *pIO ); + } + } + while ( baseNMIU *pIO = eventQ.get () ) { + delete pIO; + } +} + +void cac::uninstallIO ( baseNMIU &io ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + baseNMIU *pIO = this->ioTable.remove ( io ); + assert ( &io == pIO ); + io.channel().cacPrivateListOfIO::eventq.remove ( io ); + netSubscription * pSubscr = io.isSubscription (); + if ( pSubscr ) { + this->flushIfRequired ( io.channel() ); + io.channel().getPIIU()->subscriptionCancelRequest ( *pSubscr ); + } +} + +void cac::installSubscription ( netSubscription &subscr ) +{ + epicsAutoMutex autoMutex ( this->defaultMutex ); + subscr.channel().cacPrivateListOfIO::eventq.add ( subscr ); + this->ioTable.add ( subscr ); + if ( subscr.channel().connected() ) { + this->flushIfRequired ( subscr.channel() ); + subscr.channel().getPIIU()->subscriptionRequest ( subscr ); + } +} diff --git a/src/ca/cac_IL.h b/src/ca/cac_IL.h index bb8c3a864..53e08cda9 100644 --- a/src/ca/cac_IL.h +++ b/src/ca/cac_IL.h @@ -46,30 +46,6 @@ inline unsigned cac::getInitializingThreadsPriority () const return this->initializingThreadsPriority; } -// the process thread is not permitted to flush as this -// can result in a push / pull deadlock on the TCP pipe. -// Instead, the process thread scheduals the flush with the -// send thread which runs at a higher priority than the -// send thread. The same applies to the UDP thread for -// locking hierarchy reasons. -// -// this is only called when we detect send queue quota -// exceeded -inline bool cac::flushPermit () const -{ - if ( this->pRecvProcThread ) { - if ( this->pRecvProcThread->isCurrentThread () ) { - return false; - } - } - if ( this->pudpiiu ) { - if ( this->pudpiiu->isCurrentThread () ) { - return false; - } - } - return true; -} - inline void cac::incrementOutstandingIO () { this->ioCounter.increment (); @@ -90,5 +66,11 @@ inline unsigned cac::sequenceNumberOfOutstandingIO () const return this->ioCounter.sequenceNumber (); } +inline epicsMutex & cac::mutex () +{ + return this->defaultMutex; +} + + #endif // cac_ILh diff --git a/src/ca/comBuf.cpp b/src/ca/comBuf.cpp index 98b92caf6..d87470566 100644 --- a/src/ca/comBuf.cpp +++ b/src/ca/comBuf.cpp @@ -24,7 +24,6 @@ bool comBuf::flushToWire ( wireSendAdapter &wire ) unsigned nBytes = wire.sendBytes ( &this->buf[this->nextReadIndex], occupied ); if ( nBytes == 0u ) { - this->nextReadIndex = this->nextWriteIndex; return false; } this->nextReadIndex += nBytes; diff --git a/src/ca/comQueSend_IL.h b/src/ca/comQueSend_IL.h index df44b0e7c..e27ba5684 100644 --- a/src/ca/comQueSend_IL.h +++ b/src/ca/comQueSend_IL.h @@ -179,7 +179,12 @@ inline unsigned comQueSend::occupiedBytes () const return this->nBytesPending; } -inline bool comQueSend::flushThreshold ( unsigned nBytesThisMsg ) const +inline bool comQueSend::flushBlockThreshold ( unsigned nBytesThisMsg ) const +{ + return ( this->nBytesPending + nBytesThisMsg > 16 * comBuf::capacityBytes () ); +} + +inline bool comQueSend::flushEarlyThreshold ( unsigned nBytesThisMsg ) const { return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::capacityBytes () ); } diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index d2a824a99..742761d7a 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -166,31 +166,26 @@ public: void clear (); int reserveSpace ( unsigned msgSize ); unsigned occupiedBytes () const; - bool flushThreshold ( unsigned nBytesThisMsg ) const; + bool flushEarlyThreshold ( unsigned nBytesThisMsg ) const; + bool flushBlockThreshold ( unsigned nBytesThisMsg ) const; bool dbr_type_ok ( unsigned type ); - void pushUInt16 ( const ca_uint16_t value ); void pushUInt32 ( const ca_uint32_t value ); void pushFloat32 ( const ca_float32_t value ); void pushString ( const char *pVal, unsigned nElem ); void push_dbr_type ( unsigned type, const void *pVal, unsigned nElem ); - comBuf * popNextComBufToSend (); - private: - + wireSendAdapter & wire; + tsDLList < comBuf > bufs; + bufferReservoir reservoir; + unsigned nBytesPending; void copy_dbr_string ( const void *pValue, unsigned nElem ); void copy_dbr_short ( const void *pValue, unsigned nElem ); void copy_dbr_float ( const void *pValue, unsigned nElem ); void copy_dbr_char ( const void *pValue, unsigned nElem ); void copy_dbr_long ( const void *pValue, unsigned nElem ); void copy_dbr_double ( const void *pValue, unsigned nElem ); - - wireSendAdapter & wire; - tsDLList < comBuf > bufs; - bufferReservoir reservoir; - unsigned nBytesPending; - typedef void ( comQueSend::*copyFunc_t ) ( const void *pValue, unsigned nElem ); static const copyFunc_t dbrCopyVector [39]; @@ -220,29 +215,20 @@ public: }; class netiiu; - -class nciuPrivate { -private: - epicsMutex mutex; - - friend class nciu; -}; - class tcpiiu; class baseNMIU; // // fields in class nciu which really belong to tcpiiu // -class tcpiiuPrivateListOfIO { +class cacPrivateListOfIO { private: tsDLList < class baseNMIU > eventq; - friend tcpiiu; - friend netiiu; // used to install subscriptions when not connected + friend class cac; }; class nciu : public cacChannelIO, public tsDLNode < nciu >, - public chronIntIdRes < nciu >, public tcpiiuPrivateListOfIO { + public chronIntIdRes < nciu >, public cacPrivateListOfIO { public: nciu ( class cac &, netiiu &, cacChannelNotify &, const char *pNameIn ); @@ -267,13 +253,13 @@ public: void searchReplySetUp ( netiiu &iiu, unsigned sidIn, unsigned typeIn, unsigned long countIn ); void show ( unsigned level ) const; - bool verifyIIU ( netiiu & ); - bool verifyConnected ( netiiu & ); void connectTimeoutNotify (); - unsigned long nativeElementCount () const; const char *pName () const; unsigned nameLen () const; + const char * pHostName () const; // deprecated - please do not use + unsigned long nativeElementCount () const; bool connected () const; + void uninstallIO ( baseNMIU &io ); protected: ~nciu (); // force pool allocation private: @@ -303,14 +289,13 @@ private: int subscribe ( unsigned type, unsigned long nElem, unsigned mask, cacNotify ¬ify, cacNotifyIO *&pNotifyIO ); - void hostName ( char *pBuf, unsigned bufLength ) const; - bool ca_v42_ok () const; short nativeType () const; channel_state state () const; caar accessRights () const; unsigned searchAttempts () const; double beaconPeriod () const; - const char * pHostName () const; // deprecated - please do not use + bool ca_v42_ok () const; + void hostName ( char *pBuf, unsigned bufLength ) const; void notifyStateChangeFirstConnectInCountOfOutstandingIO (); static tsFreeList < class nciu, 1024 > freeList; static epicsMutex freeListMutex; @@ -322,7 +307,6 @@ public: baseNMIU ( cacNotify ¬ifyIn, nciu &chan ); virtual ~baseNMIU () = 0; virtual class netSubscription * isSubscription (); - virtual void ioCancelRequest (); void show ( unsigned level ) const; ca_uint32_t getID () const; nciu & channel () const; @@ -349,31 +333,10 @@ private: const unsigned type; const unsigned mask; class netSubscription * isSubscription (); - void ioCancelRequest (); static tsFreeList < class netSubscription, 1024 > freeList; static epicsMutex freeListMutex; }; -#if 0 -class netReadCopyIO : public baseNMIU { -public: - netReadCopyIO ( nciu &chan, unsigned type, unsigned long count, - void *pValue, unsigned seqNumber, cacNotify ¬ifyIn ); - void show ( unsigned level ) const; - void * operator new ( size_t size ); - void operator delete ( void *pCadaver, size_t size ); -protected: - ~netReadCopyIO (); // must be allocated from pool -private: - unsigned type; - unsigned long count; - void *pValue; - unsigned seqNumber; - static tsFreeList < class netReadCopyIO, 1024 > freeList; - static epicsMutex freeListMutex; -}; -#endif - class netReadNotifyIO : public baseNMIU { public: netReadNotifyIO ( nciu &chan, cacNotify ¬ify ); @@ -458,7 +421,6 @@ public: void resetChannelRetryCounts (); void attachChannel ( nciu &chan ); void detachChannel ( nciu &chan ); - void installSubscription ( netSubscription &subscr ); virtual void hostName (char *pBuf, unsigned bufLength) const; virtual const char * pHostName () const; // deprecated - please do not use virtual bool isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &addr ) const; @@ -466,24 +428,25 @@ public: virtual bool ca_v41_ok () const; virtual bool pushDatagramMsg ( const caHdr &hdr, const void *pExt, ca_uint16_t extsize); virtual int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue); - virtual int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue ); - virtual int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem ); + virtual int writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned type, unsigned nElem, const void *pValue ); + virtual int readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned type, unsigned nElem ); virtual int createChannelRequest ( nciu & ); virtual void connectAllIO ( nciu &chan ); virtual void disconnectAllIO ( nciu &chan ); virtual int clearChannelRequest ( nciu & ); - virtual bool uninstallIO ( baseNMIU & ); - virtual void subscriptionCancelRequest ( netSubscription &subscr, bool userThread ); + virtual void subscriptionRequest ( netSubscription &subscr ); + virtual void subscriptionCancelRequest ( netSubscription &subscr ); virtual double beaconPeriod () const; - virtual bool destroyAllIO ( nciu &chan ); + virtual void flushRequest (); + virtual bool flushBlockThreshold () const; + virtual void flushRequestIfAboveEarlyThreshold (); + virtual void blockUntilSendBacklogIsReasonable ( epicsMutex & ); protected: cac * pCAC () const; - mutable epicsMutex mutex; private: tsDLList < nciu > channelList; class cac *pClientCtx; virtual void lastChannelDetachNotify (); - virtual void subscriptionRequest ( netSubscription &subscr, bool userThread ); }; class limboiiu : public netiiu { @@ -497,14 +460,14 @@ class udpiiu; class searchTimer : private epicsTimerNotify { public: - searchTimer ( udpiiu &iiu, epicsTimerQueue &queue ); + searchTimer ( udpiiu &iiu, epicsTimerQueue &queue, epicsMutex & ); virtual ~searchTimer (); void notifySearchResponse ( unsigned short retrySeqNo ); void resetPeriod ( double delayToNextTry ); void show ( unsigned level ) const; private: epicsTimer &timer; - epicsMutex mutex; + epicsMutex &mutex; udpiiu &iiu; unsigned framesPerTry; /* # of UDP frames per search try */ unsigned framesPerTryCongestThresh; /* one half N tries w congest */ @@ -525,7 +488,6 @@ public: virtual ~repeaterSubscribeTimer (); void confirmNotify (); void show (unsigned level) const; - private: epicsTimer &timer; udpiiu &iiu; @@ -549,7 +511,7 @@ public: void postMsg ( const osiSockAddr &net_addr, char *pInBuf, unsigned long blockSize ); void repeaterRegistrationMessage ( unsigned attemptNumber ); - void flush (); + void datagramFlush (); unsigned getPort () const; void show ( unsigned level ) const; bool isCurrentThread () const; @@ -573,8 +535,6 @@ private: bool pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t extsize ); - friend void cacRecvThreadUDP ( void *pParam ); - typedef bool (udpiiu::*pProtoStubUDP) ( const caHdr &, const osiSockAddr & ); // UDP protocol dispatch table @@ -588,6 +548,8 @@ private: bool beaconAction ( const caHdr &msg, const osiSockAddr &net_addr ); bool notHereRespAction ( const caHdr &msg, const osiSockAddr &net_addr ); bool repeaterAckAction ( const caHdr &msg, const osiSockAddr &net_addr ); + + friend void cacRecvThreadUDP ( void *pParam ); }; class tcpRecvWatchdog : private epicsTimerNotify { @@ -679,9 +641,11 @@ public: void beaconArrivalNotify (); bool fullyConstructed () const; - void flush (); + void flushRequest (); + bool flushBlockThreshold () const; + void flushRequestIfAboveEarlyThreshold (); + void blockUntilSendBacklogIsReasonable ( epicsMutex & ); virtual void show ( unsigned level ) const; - //osiSockAddr address () const; bool setEchoRequestPending (); void processIncoming (); @@ -698,8 +662,6 @@ public: private: tcpRecvWatchdog recvDog; tcpSendWatchdog sendDog; - epicsMutex flushMutex; // only one thread flushes at a time - chronIntIdResTable < baseNMIU > ioTable; comQueSend sendQue; comQueRecv recvQue; osiSockAddr addr; @@ -714,8 +676,10 @@ private: epicsEventId recvThreadRingBufferSpaceAvailableSignal; epicsEventId sendThreadExitSignal; epicsEventId recvThreadExitSignal; + epicsEventId flushBlockSignal; SOCKET sock; unsigned contigRecvMsgCount; + unsigned blockingForFlush; bool fullyConstructedFlag; bool busyStateDetected; // only modified by the recv thread bool flowControlActive; // only modified by the send process thread @@ -723,11 +687,10 @@ private: bool msgHeaderAvailable; bool sockCloseCompleted; bool fdRegCallbackNeeded; + bool earlyFlush; unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ); unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf ); - bool flushToWire ( bool userThread ); - bool threadContextSensitiveFlushToWire ( bool userThread ); friend void cacSendThreadTCP ( void *pParam ); friend void cacRecvThreadTCP ( void *pParam ); @@ -742,10 +705,12 @@ private: int hostNameSetRequest (); int userNameSetRequest (); int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue ); - int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue ); - int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem ); + int writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned type, unsigned nElem, const void *pValue ); + int readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned type, unsigned nElem ); int createChannelRequest ( nciu & ); int clearChannelRequest ( nciu & ); + void subscriptionRequest ( netSubscription &subscr ); + void subscriptionCancelRequest ( netSubscription &subscr ); // recv protocol stubs bool noopAction (); @@ -761,27 +726,7 @@ private: bool verifyAndDisconnectChan (); bool badTCPRespAction (); - // IO management routines - bool ioCompletionNotify ( unsigned id, unsigned type, - unsigned long count, const void *pData ); - bool ioExceptionNotify ( unsigned id, - int status, const char *pContext ); - bool ioExceptionNotify ( unsigned id, int status, - const char *pContext, unsigned type, unsigned long count ); - bool ioCompletionNotifyAndDestroy ( unsigned id ); - bool ioCompletionNotifyAndDestroy ( unsigned id, - unsigned type, unsigned long count, const void *pData ); - bool ioExceptionNotifyAndDestroy ( unsigned id, - int status, const char *pContext ); - bool ioExceptionNotifyAndDestroy ( unsigned id, - int status, const char *pContext, unsigned type, unsigned long count ); - void connectAllIO ( nciu &chan ); - void disconnectAllIO ( nciu &chan ); - bool uninstallIO ( baseNMIU & ); - bool destroyAllIO ( nciu &chan ); - - void subscriptionRequest ( netSubscription &subscr, bool userThread ); - void subscriptionCancelRequest ( netSubscription &subscr, bool userThread ); + bool flush (); // only to be called by the send thread typedef bool ( tcpiiu::*pProtoStubTCP ) (); static const pProtoStubTCP tcpJumpTableCAC []; @@ -963,7 +908,7 @@ private: // w/o taking the defaultMutex in this class first // // -class cac : public caClient, public nciuPrivate +class cac : public caClient { public: cac ( bool enablePreemptiveCallback = false ); @@ -985,10 +930,27 @@ public: bool ioComplete () const; // IO management - void flush (); - bool flushPermit () const; + void flushRequest (); int pendIO ( const double &timeout ); int pendEvent ( const double &timeout ); + void uninstallIO ( baseNMIU &io ); + bool ioCompletionNotify ( unsigned id, unsigned type, + unsigned long count, const void *pData ); + bool ioExceptionNotify ( unsigned id, + int status, const char *pContext ); + bool ioExceptionNotify ( unsigned id, int status, + const char *pContext, unsigned type, unsigned long count ); + bool ioCompletionNotifyAndDestroy ( unsigned id ); + bool ioCompletionNotifyAndDestroy ( unsigned id, + unsigned type, unsigned long count, const void *pData ); + bool ioExceptionNotifyAndDestroy ( unsigned id, + int status, const char *pContext ); + bool ioExceptionNotifyAndDestroy ( unsigned id, + int status, const char *pContext, unsigned type, unsigned long count ); + void connectAllIO ( nciu &chan ); + void disconnectAllIO ( nciu &chan ); + void destroyAllIO ( nciu &chan ); + void installSubscription ( netSubscription &subscr ); // exception routines void exceptionNotify ( int status, const char *pContext, @@ -1013,6 +975,11 @@ public: cacChannelIO * createChannelIO ( const char *name_str, cacChannelNotify &chan ); void registerService ( cacServiceIO &service ); + // IO request stubs + int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue ); + int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue ); + int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem ); + // sync group routines CASG * lookupCASG ( unsigned id ); void installCASG ( CASG & ); @@ -1038,6 +1005,8 @@ public: const char * userNamePointer () const; unsigned getInitializingThreadsPriority () const; + epicsMutex & mutex (); + private: ioCounterNet ioCounter; ipAddrToAsciiEngine ipToAEngine; @@ -1046,6 +1015,8 @@ private: tsDLList iiuListLimbo; chronIntIdResTable < nciu > chanTable; + chronIntIdResTable + < baseNMIU > ioTable; chronIntIdResTable < CASG > sgTable; resTable @@ -1070,6 +1041,7 @@ private: unsigned initializingThreadsPriority; bool enablePreemptiveCallback; bool setupUDP (); + void flushIfRequired ( nciu & ); // lock must be applied }; /* diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index 48c1cc798..b13268e53 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -71,15 +71,7 @@ nciu::~nciu () } // care is taken so that a lock is not applied during this phase - unsigned i = 0u; - while ( ! this->piiu->destroyAllIO ( *this ) ) { - if ( i++ > 1000u ) { - this->cacCtx.printf ( - "CAC: unable to destroy IO when channel destroyed?\n" ); - break; - } - } - this->piiu->clearChannelRequest ( *this ); + this->cacCtx.destroyAllIO ( *this ); this->cacCtx.uninstallChannel ( *this ); if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) { @@ -91,6 +83,155 @@ nciu::~nciu () delete [] this->pNameStr; } +void nciu::connect ( unsigned nativeType, + unsigned long nativeCount, unsigned sidIn ) +{ + bool v41Ok; + + if ( ! this->f_claimSent ) { + ca_printf ( + "CAC: Ignored conn resp to chan lacking virtual circuit CID=%u SID=%u?\n", + this->getId (), sidIn ); + return; + } + + if ( this->f_connected ) { + ca_printf ( + "CAC: Ignored conn resp to conn chan CID=%u SID=%u?\n", + this->getId (), sidIn ); + return; + } + + + if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) { + if ( this->f_firstConnectDecrementsOutstandingIO ) { + this->cacCtx.decrementOutstandingIO (); + } + } + + if ( this->piiu ) { + v41Ok = this->piiu->ca_v41_ok (); + } + else { + v41Ok = false; + } + this->typeCode = nativeType; + this->count = nativeCount; + this->sid = sidIn; + this->f_connected = true; + this->f_previousConn = true; + + /* + * if less than v4.1 then the server will never + * send access rights and we know that there + * will always be access + */ + if ( ! v41Ok ) { + this->accessRightState.read_access = true; + this->accessRightState.write_access = true; + } + + // resubscribe for monitors from this channel + this->cacCtx.connectAllIO ( *this ); + + this->notify().connectNotify ( *this ); + + /* + * if less than v4.1 then the server will never + * send access rights and we know that there + * will always be access and also need to call + * their call back here + */ + if ( ! v41Ok ) { + this->notify ().accessRightsNotify ( *this, this->accessRightState ); + } +} + +void nciu::disconnect ( netiiu &newiiu ) +{ + bool wasConnected; + + this->piiu->disconnectAllIO ( *this ); + + this->piiu = &newiiu; + this->retry = 0u; + this->typeCode = USHRT_MAX; + this->count = 0u; + this->sid = UINT_MAX; + this->accessRightState.read_access = false; + this->accessRightState.write_access = false; + this->f_claimSent = false; + + if ( this->f_connected ) { + wasConnected = true; + } + else { + wasConnected = false; + } + this->f_connected = false; + + if ( wasConnected ) { + /* + * look for events that have an event cancel in progress + */ + this->notify ().disconnectNotify ( *this ); + this->notify ().accessRightsNotify ( *this, this->accessRightState ); + } + + this->resetRetryCount (); +} + +/* + * nciu::searchMsg () + */ +bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel ) +{ + caHdr msg; + bool success; + + msg.m_cmmd = htons ( CA_PROTO_SEARCH ); + msg.m_available = this->getId (); + msg.m_dataType = htons ( DONTREPLY ); + msg.m_count = htons ( CA_MINOR_VERSION ); + msg.m_cid = this->getId (); + + success = this->piiu->pushDatagramMsg ( msg, + this->pNameStr, this->nameLength ); + if ( success ) { + // + // increment the number of times we have tried + // to find this channel + // + if ( this->retry < MAXCONNTRIES ) { + this->retry++; + } + this->retrySeqNo = retrySeqNumber; + retryNoForThisChannel = this->retry; + } + + return success; +} + + +const char *nciu::pName () const +{ + return this->pNameStr; +} + +unsigned nciu::nameLen () const +{ + return this->nameLength; +} + +int nciu::createChannelRequest () +{ + int status = this->piiu->createChannelRequest ( *this ); + if ( status == ECA_NORMAL ) { + this->f_claimSent = true; + } + return status; +} + int nciu::read ( unsigned type, unsigned long countIn, cacNotify ¬ify ) { // @@ -112,13 +253,13 @@ int nciu::read ( unsigned type, unsigned long countIn, cacNotify ¬ify ) countIn = this->count; } - return this->piiu->readNotifyRequest ( *this, notify, type, countIn ); + return this->cacCtx.readNotifyRequest ( *this, notify, type, countIn ); } /* * check_a_dbr_string() */ -LOCAL int check_a_dbr_string ( const char *pStr, const unsigned count ) +static int check_a_dbr_string ( const char *pStr, const unsigned count ) { for ( unsigned i = 0; i < count; i++ ) { unsigned int strsize = 0; @@ -155,7 +296,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue ) } } - return this->piiu->writeRequest ( *this, type, countIn, pValue ); + return this->cacCtx.writeRequest ( *this, type, countIn, pValue ); } int nciu::write ( unsigned type, unsigned long countIn, const void *pValue, cacNotify ¬ify ) @@ -180,264 +321,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue, cacN } } - return this->piiu->writeNotifyRequest ( *this, notify, type, countIn, pValue ); -} - -void nciu::initiateConnect () -{ - this->notifyStateChangeFirstConnectInCountOfOutstandingIO (); - this->cacCtx.installNetworkChannel ( *this, this->piiu ); -} - -void nciu::connect ( unsigned nativeType, - unsigned long nativeCount, unsigned sidIn ) -{ - bool v41Ok; - { - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - - if ( ! this->f_claimSent ) { - ca_printf ( - "CAC: Ignored conn resp to chan lacking virtual circuit CID=%u SID=%u?\n", - this->getId (), sidIn ); - return; - } - - if ( this->f_connected ) { - ca_printf ( - "CAC: Ignored conn resp to conn chan CID=%u SID=%u?\n", - this->getId (), sidIn ); - return; - } - - - if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) { - if ( this->f_firstConnectDecrementsOutstandingIO ) { - this->cacCtx.decrementOutstandingIO (); - } - } - - if ( this->piiu ) { - v41Ok = this->piiu->ca_v41_ok (); - } - else { - v41Ok = false; - } - this->typeCode = nativeType; - this->count = nativeCount; - this->sid = sidIn; - this->f_connected = true; - this->f_previousConn = true; - - /* - * if less than v4.1 then the server will never - * send access rights and we know that there - * will always be access - */ - if ( ! v41Ok ) { - this->accessRightState.read_access = true; - this->accessRightState.write_access = true; - } - } - - // resubscribe for monitors from this channel - this->piiu->connectAllIO ( *this ); - - this->notify ().connectNotify ( *this ); - - /* - * if less than v4.1 then the server will never - * send access rights and we know that there - * will always be access and also need to call - * their call back here - */ - if ( ! v41Ok ) { - this->notify ().accessRightsNotify ( *this, this->accessRightState ); - } -} - -void nciu::connectTimeoutNotify () -{ - if ( ! this->f_connectTimeOutSeen ) { - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - this->f_connectTimeOutSeen = true; - } -} - -void nciu::disconnect ( netiiu &newiiu ) -{ - bool wasConnected; - - this->piiu->disconnectAllIO ( *this ); - - { - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - - this->piiu = &newiiu; - this->retry = 0u; - this->typeCode = USHRT_MAX; - this->count = 0u; - this->sid = UINT_MAX; - this->accessRightState.read_access = false; - this->accessRightState.write_access = false; - this->f_claimSent = false; - - if ( this->f_connected ) { - wasConnected = true; - } - else { - wasConnected = false; - } - this->f_connected = false; - } - - if ( wasConnected ) { - /* - * look for events that have an event cancel in progress - */ - this->notify ().disconnectNotify ( *this ); - this->notify ().accessRightsNotify ( *this, this->accessRightState ); - } - - this->resetRetryCount (); -} - -/* - * nciu::searchMsg () - */ -bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel ) -{ - caHdr msg; - bool success; - - msg.m_cmmd = htons ( CA_PROTO_SEARCH ); - msg.m_available = this->getId (); - msg.m_dataType = htons ( DONTREPLY ); - msg.m_count = htons ( CA_MINOR_VERSION ); - msg.m_cid = this->getId (); - - success = this->piiu->pushDatagramMsg ( msg, - this->pNameStr, this->nameLength ); - if ( success ) { - // - // increment the number of times we have tried - // to find this channel - // - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - if ( this->retry < MAXCONNTRIES ) { - this->retry++; - } - this->retrySeqNo = retrySeqNumber; - retryNoForThisChannel = this->retry; - } - - return success; -} - -void nciu::hostName ( char *pBuf, unsigned bufLength ) const -{ - this->piiu->hostName ( pBuf, bufLength ); -} - -// deprecated - please do not use, this is _not_ thread safe -const char * nciu::pHostName () const -{ - return this->piiu->pHostName (); // ouch ! -} - -bool nciu::ca_v42_ok () const -{ - return this->piiu->ca_v42_ok (); -} - -short nciu::nativeType () const -{ - short type; - - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - if ( this->f_connected ) { - if ( this->typeCode < SHRT_MAX ) { - type = static_cast ( this->typeCode ); - } - else { - type = TYPENOTCONN; - } - } - else { - type = TYPENOTCONN; - } - - return type; -} - -unsigned long nciu::nativeElementCount () const -{ - unsigned long countOut; - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - - if ( this->f_connected ) { - countOut = this->count; - } - else { - countOut = 0ul; - } - - return countOut; -} - -channel_state nciu::state () const -{ - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - channel_state stateOut; - - if ( this->f_connected ) { - stateOut = cs_conn; - } - else if ( this->f_previousConn ) { - stateOut = cs_prev_conn; - } - else { - stateOut = cs_never_conn; - } - - return stateOut; -} - -caar nciu::accessRights () const -{ - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - caar tmp = this->accessRightState; - return tmp; -} - -const char *nciu::pName () const -{ - return this->pNameStr; -} - -unsigned nciu::nameLen () const -{ - return this->nameLength; -} - -unsigned nciu::searchAttempts () const -{ - return this->retry; -} - -double nciu::beaconPeriod () const -{ - return this->piiu->beaconPeriod (); -} - -int nciu::createChannelRequest () -{ - int status = this->piiu->createChannelRequest ( *this ); - if ( status == ECA_NORMAL ) { - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - this->f_claimSent = true; - } - return status; + return this->cacCtx.writeNotifyRequest ( *this, notify, type, countIn, pValue ); } int nciu::subscribe ( unsigned type, unsigned long nElem, @@ -455,7 +339,7 @@ int nciu::subscribe ( unsigned type, unsigned long nElem, netSubscription *pSubcr = new netSubscription ( *this, type, nElem, mask, notify ); if ( pSubcr ) { - this->piiu->installSubscription ( *pSubcr ); + this->cacCtx.installSubscription ( *pSubcr ); pNotifyIO = pSubcr; return ECA_NORMAL;; } @@ -464,9 +348,100 @@ int nciu::subscribe ( unsigned type, unsigned long nElem, } } +void nciu::initiateConnect () +{ + this->notifyStateChangeFirstConnectInCountOfOutstandingIO (); + this->cacCtx.installNetworkChannel ( *this, this->piiu ); +} + +void nciu::hostName ( char *pBuf, unsigned bufLength ) const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + this->piiu->hostName ( pBuf, bufLength ); +} + +// deprecated - please do not use, this is _not_ thread safe +const char * nciu::pHostName () const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + return this->piiu->pHostName (); // ouch ! +} + +bool nciu::ca_v42_ok () const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + return this->piiu->ca_v42_ok (); +} + +short nciu::nativeType () const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + short type; + if ( this->f_connected ) { + if ( this->typeCode < SHRT_MAX ) { + type = static_cast ( this->typeCode ); + } + else { + type = TYPENOTCONN; + } + } + else { + type = TYPENOTCONN; + } + return type; +} + +unsigned long nciu::nativeElementCount () const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + unsigned long countOut; + if ( this->f_connected ) { + countOut = this->count; + } + else { + countOut = 0ul; + } + return countOut; +} + +channel_state nciu::state () const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + channel_state stateOut; + if ( this->f_connected ) { + stateOut = cs_conn; + } + else if ( this->f_previousConn ) { + stateOut = cs_prev_conn; + } + else { + stateOut = cs_never_conn; + } + return stateOut; +} + +caar nciu::accessRights () const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + caar tmp = this->accessRightState; + return tmp; +} + +unsigned nciu::searchAttempts () const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + return this->retry; +} + +double nciu::beaconPeriod () const +{ + epicsAutoMutex locker ( this->cacCtx.mutex() ); + return this->piiu->beaconPeriod (); +} + void nciu::notifyStateChangeFirstConnectInCountOfOutstandingIO () { - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); + epicsAutoMutex locker ( this->cacCtx.mutex() ); // test is performed via a callback so that locking is correct if ( ! this->f_connectTimeOutSeen && ! this->f_previousConn ) { if ( this->notify ().includeFirstConnectInCountOfOutstandingIO () ) { @@ -486,6 +461,7 @@ void nciu::notifyStateChangeFirstConnectInCountOfOutstandingIO () void nciu::show ( unsigned level ) const { + epicsAutoMutex locker ( this->cacCtx.mutex() ); if ( this->f_connected ) { char hostNameTmp [256]; this->hostName ( hostNameTmp, sizeof ( hostNameTmp ) ); @@ -520,3 +496,4 @@ void nciu::show ( unsigned level ) const } + diff --git a/src/ca/nciu_IL.h b/src/ca/nciu_IL.h index aae46f2db..b4e403105 100644 --- a/src/ca/nciu_IL.h +++ b/src/ca/nciu_IL.h @@ -49,10 +49,7 @@ inline void nciu::resetRetryCount () inline void nciu::accessRightsStateChange ( const caar &arIn ) { - { - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); - this->accessRightState = arIn; - } + this->accessRightState = arIn; this->notify ().accessRightsNotify ( *this, arIn ); } @@ -66,23 +63,6 @@ inline ca_uint32_t nciu::getCID () const return this->id; } - -// -// this routine is used to verify that the channel's -// iiu pointer and connection state has not changed -// before the iiu's mutex was applied -// - -inline bool nciu::verifyConnected ( netiiu &iiuIn ) -{ - return ( this->piiu == &iiuIn ) && this->f_connected; -} - -inline bool nciu::verifyIIU ( netiiu &iiuIn ) -{ - return ( this->piiu == &iiuIn ); -} - inline unsigned nciu::getRetrySeqNo () const { return this->retrySeqNo; @@ -97,7 +77,6 @@ inline void nciu::connect () inline void nciu::searchReplySetUp ( netiiu &iiu, unsigned sidIn, unsigned typeIn, unsigned long countIn ) { - epicsAutoMutex locker ( this->cacCtx.nciuPrivate::mutex ); this->piiu = &iiu; this->typeCode = typeIn; this->count = countIn; @@ -121,4 +100,17 @@ inline netiiu * nciu::getPIIU () return this->piiu; } +inline void nciu::uninstallIO ( baseNMIU &io ) +{ + this->cacCtx.uninstallIO ( io ); +} + +inline void nciu::connectTimeoutNotify () +{ + this->f_connectTimeOutSeen = true; +} + + + + diff --git a/src/ca/netSubscription.cpp b/src/ca/netSubscription.cpp index 7139bf454..3aa70103d 100644 --- a/src/ca/netSubscription.cpp +++ b/src/ca/netSubscription.cpp @@ -34,11 +34,6 @@ class netSubscription * netSubscription::isSubscription () return this; } -void netSubscription::ioCancelRequest () -{ - this->chan.getPIIU ()->subscriptionCancelRequest ( *this, true ); -} - void netSubscription::show ( unsigned level ) const { printf ( "event subscription IO at %p, type %s, element count %lu, mask %u\n", diff --git a/src/ca/netiiu.cpp b/src/ca/netiiu.cpp index 5b1b36ac4..022a1b27c 100644 --- a/src/ca/netiiu.cpp +++ b/src/ca/netiiu.cpp @@ -28,8 +28,6 @@ netiiu::~netiiu () void netiiu::show ( unsigned level ) const { - epicsAutoMutex autoMutex ( this->mutex ); - printf ( "network IO base class\n" ); if ( level > 1 ) { tsDLIterConstBD < nciu > pChan = this->channelList.firstIter (); @@ -41,26 +39,6 @@ void netiiu::show ( unsigned level ) const if ( level > 2u ) { printf ( "\tcac pointer %p\n", static_cast ( this->pClientCtx ) ); - this->mutex.show ( level - 2u ); - } -} - -// cac lock must also be applied when -// calling this -void netiiu::attachChannel ( nciu &chan ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - this->channelList.add ( chan ); -} - -// cac lock must also be applied when -// calling this -void netiiu::detachChannel ( nciu &chan ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - this->channelList.remove ( chan ); - if ( this->channelList.count () == 0u ) { - this->lastChannelDetachNotify (); } } @@ -68,56 +46,20 @@ void netiiu::detachChannel ( nciu &chan ) // calling this void netiiu::disconnectAllChan ( netiiu & newiiu ) { - tsDLList < nciu > list; - - { - epicsAutoMutex autoMutex ( this->mutex ); - tsDLIterBD < nciu > chan = this->channelList.firstIter (); - while ( chan.valid () ) { - tsDLIterBD < nciu > next = chan; - next++; - this->clearChannelRequest ( *chan ); - this->channelList.remove ( *chan ); - chan->disconnect ( newiiu ); - list.add ( *chan ); - chan = next; - } + tsDLIterBD < nciu > chan = this->channelList.firstIter (); + while ( chan.valid () ) { + tsDLIterBD < nciu > next = chan; + next++; + this->clearChannelRequest ( *chan ); + this->channelList.remove ( *chan ); + chan->disconnect ( newiiu ); + newiiu.channelList.add ( *chan ); + chan = next; } - - { - epicsAutoMutex autoMutex ( newiiu.mutex ); - newiiu.channelList.add ( list ); - } -} - -// -// netiiu::destroyAllIO () -// -// care is taken to not hold the lock while deleting the -// IO so that subscription delete request (sent by the -// IO's destructor) do not deadlock -// -bool netiiu::destroyAllIO ( nciu &chan ) -{ - tsDLList < baseNMIU > eventQ; - { - epicsAutoMutex autoMutex ( this->mutex ); - if ( chan.verifyIIU ( *this ) ) { - eventQ.add ( chan.tcpiiuPrivateListOfIO::eventq ); - } - else { - return false; - } - } - while ( baseNMIU *pIO = eventQ.get () ) { - delete pIO; - } - return true; } void netiiu::connectTimeoutNotify () { - epicsAutoMutex autoMutex ( this->mutex ); tsDLIterBD < nciu > chan = this->channelList.firstIter (); while ( chan.valid () ) { chan->connectTimeoutNotify (); @@ -127,7 +69,6 @@ void netiiu::connectTimeoutNotify () void netiiu::resetChannelRetryCounts () { - epicsAutoMutex autoMutex ( this->mutex ); tsDLIterBD < nciu > chan = this->channelList.firstIter (); while ( chan.valid () ) { chan->resetRetryCount (); @@ -139,8 +80,6 @@ bool netiiu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThis { bool success; - epicsAutoMutex autoMutex ( this->mutex ); - if ( nciu *pChan = this->channelList.get () ) { success = pChan->searchMsg ( retrySeqNumber, retryNoForThisChannel ); if ( success ) { @@ -186,12 +125,12 @@ int netiiu::writeRequest ( nciu &, unsigned, unsigned, const void * ) return ECA_DISCONNCHID; } -int netiiu::writeNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned, const void * ) +int netiiu::writeNotifyRequest ( nciu &, netWriteNotifyIO &, unsigned, unsigned, const void * ) { return ECA_DISCONNCHID; } -int netiiu::readNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned ) +int netiiu::readNotifyRequest ( nciu &, netReadNotifyIO &, unsigned, unsigned ) { return ECA_DISCONNCHID; } @@ -206,28 +145,14 @@ int netiiu::clearChannelRequest ( nciu & ) return ECA_DISCONNCHID; } -void netiiu::subscriptionRequest ( netSubscription &, bool ) +void netiiu::subscriptionRequest ( netSubscription & ) { } -void netiiu::subscriptionCancelRequest ( netSubscription &, bool ) +void netiiu::subscriptionCancelRequest ( netSubscription & ) { } -void netiiu::installSubscription ( netSubscription &subscr ) -{ - bool connectedWhenInstalled; - { - epicsAutoMutex autoMutex ( this->mutex ); - subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr ); - connectedWhenInstalled = subscr.channel ().connected (); - } - // iiu pointer briefly points at tcpiiu before the channel is connected - if ( connectedWhenInstalled ) { - this->subscriptionRequest ( subscr, true ); - } -} - void netiiu::hostName ( char *pBuf, unsigned bufLength ) const { if ( bufLength ) { @@ -249,17 +174,24 @@ void netiiu::connectAllIO ( nciu & ) { } -bool netiiu::uninstallIO ( baseNMIU &io ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - if ( io.channel ().verifyIIU ( *this ) ) { - io.channel ().tcpiiuPrivateListOfIO::eventq.remove ( io ); - return true; - } - return false; -} - double netiiu::beaconPeriod () const { return ( - DBL_MAX ); } + +void netiiu::flushRequest () +{ +} + +bool netiiu::flushBlockThreshold () const +{ + return false; +} + +void netiiu::flushRequestIfAboveEarlyThreshold () +{ +} + +void netiiu::blockUntilSendBacklogIsReasonable ( epicsMutex & ) +{ +} diff --git a/src/ca/netiiu_IL.h b/src/ca/netiiu_IL.h index 4386556ab..24b5acfba 100644 --- a/src/ca/netiiu_IL.h +++ b/src/ca/netiiu_IL.h @@ -28,4 +28,21 @@ inline unsigned netiiu::channelCount () const return this->channelList.count (); } +// cac lock must also be applied when +// calling this +inline void netiiu::attachChannel ( nciu &chan ) +{ + this->channelList.add ( chan ); +} + +// cac lock must also be applied when +// calling this +inline void netiiu::detachChannel ( nciu &chan ) +{ + this->channelList.remove ( chan ); + if ( this->channelList.count () == 0u ) { + this->lastChannelDetachNotify (); + } +} + #endif // netiiu_ILh diff --git a/src/ca/searchTimer.cpp b/src/ca/searchTimer.cpp index 248089aa0..efa936359 100644 --- a/src/ca/searchTimer.cpp +++ b/src/ca/searchTimer.cpp @@ -18,8 +18,9 @@ // // searchTimer::searchTimer () // -searchTimer::searchTimer ( udpiiu &iiuIn, epicsTimerQueue &queueIn ) : +searchTimer::searchTimer ( udpiiu &iiuIn, epicsTimerQueue &queueIn, epicsMutex &mutexIn ) : timer ( queueIn.createTimer ( *this ) ), + mutex ( mutexIn ), iiu ( iiuIn ), framesPerTry ( INITIALTRIESPERFRAME ), framesPerTryCongestThresh ( UINT_MAX ), @@ -133,6 +134,7 @@ void searchTimer::notifySearchResponse ( unsigned short retrySeqNoIn ) // epicsTimerNotify::expireStatus searchTimer::expire () { + epicsAutoMutex locker ( this->mutex ); unsigned nFrameSent = 0u; unsigned nChanSent = 0u; @@ -142,165 +144,161 @@ epicsTimerNotify::expireStatus searchTimer::expire () if ( this->iiu.channelCount () == 0 ) { return noRestart; } - - { - epicsAutoMutex locker ( this->mutex ); + /* + * increment the retry sequence number + */ + this->retrySeqNo++; /* allowed to roll over */ + + /* + * dynamically adjust the number of UDP frames per + * try depending how many search requests are not + * replied to + * + * This determines how many search request can be + * sent together (at the same instant in time). + * + * The variable this->framesPerTry + * determines the number of UDP frames to be sent + * each time that expire() is called. + * If this value is too high we will waste some + * network bandwidth. If it is too low we will + * use very little of the incoming UDP message + * buffer associated with the server's port and + * will therefore take longer to connect. We + * initialize this->framesPerTry + * to a prime number so that it is less likely that the + * same channel is in the last UDP frame + * sent every time that this is called (and + * potentially discarded by a CA server with + * a small UDP input queue). + */ + /* + * increase frames per try only if we see better than + * a 93.75% success rate for one pass through the list + */ + if (this->searchResponsesWithinThisPass > + (this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/16u)) ) { /* - * increment the retry sequence number + * increase UDP frames per try if we have a good score */ - this->retrySeqNo++; /* allowed to roll over */ - - /* - * dynamically adjust the number of UDP frames per - * try depending how many search requests are not - * replied to - * - * This determines how many search request can be - * sent together (at the same instant in time). - * - * The variable this->framesPerTry - * determines the number of UDP frames to be sent - * each time that expire() is called. - * If this value is too high we will waste some - * network bandwidth. If it is too low we will - * use very little of the incoming UDP message - * buffer associated with the server's port and - * will therefore take longer to connect. We - * initialize this->framesPerTry - * to a prime number so that it is less likely that the - * same channel is in the last UDP frame - * sent every time that this is called (and - * potentially discarded by a CA server with - * a small UDP input queue). - */ - /* - * increase frames per try only if we see better than - * a 93.75% success rate for one pass through the list - */ - if (this->searchResponsesWithinThisPass > - (this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/16u)) ) { + if ( this->framesPerTry < MAXTRIESPERFRAME ) { /* - * increase UDP frames per try if we have a good score + * a congestion avoidance threshold similar to TCP is now used */ - if ( this->framesPerTry < MAXTRIESPERFRAME ) { - /* - * a congestion avoidance threshold similar to TCP is now used - */ - if ( this->framesPerTry < this->framesPerTryCongestThresh ) { - this->framesPerTry += this->framesPerTry; - } - else { - this->framesPerTry += (this->framesPerTry/8) + 1; - } - debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n", - this->framesPerTry, this->searchTriesWithinThisPass, this->searchResponsesWithinThisPass) ); + if ( this->framesPerTry < this->framesPerTryCongestThresh ) { + this->framesPerTry += this->framesPerTry; } + else { + this->framesPerTry += (this->framesPerTry/8) + 1; + } + debugPrintf ( ("Increasing frame count to %u t=%u r=%u\n", + this->framesPerTry, this->searchTriesWithinThisPass, this->searchResponsesWithinThisPass) ); } + } + /* + * if we detect congestion because we have less than a 87.5% success + * rate then gradually reduce the frames per try + */ + else if ( this->searchResponsesWithinThisPass < + (this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/8u)) ) { + if (this->framesPerTry>1) { + this->framesPerTry--; + } + this->framesPerTryCongestThresh = this->framesPerTry/2 + 1; + debugPrintf ( ("Congestion detected - set frames per try to %u t=%u r=%u\n", + this->framesPerTry, this->searchTriesWithinThisPass, + this->searchResponsesWithinThisPass) ); + } + + while ( 1 ) { + /* - * if we detect congestion because we have less than a 87.5% success - * rate then gradually reduce the frames per try + * clear counter when we reach the end of the list + * + * if we are making some progress then + * dont increase the delay between search + * requests */ - else if ( this->searchResponsesWithinThisPass < - (this->searchTriesWithinThisPass-(this->searchTriesWithinThisPass/8u)) ) { - if (this->framesPerTry>1) { - this->framesPerTry--; - } - this->framesPerTryCongestThresh = this->framesPerTry/2 + 1; - debugPrintf ( ("Congestion detected - set frames per try to %u t=%u r=%u\n", - this->framesPerTry, this->searchTriesWithinThisPass, - this->searchResponsesWithinThisPass) ); - } - - while ( 1 ) { - - /* - * clear counter when we reach the end of the list - * - * if we are making some progress then - * dont increase the delay between search - * requests - */ - if ( this->searchTriesWithinThisPass >= this->iiu.channelCount () ) { - if ( this->searchResponsesWithinThisPass == 0u ) { - debugPrintf ( ("increasing search try interval\n") ); - this->setRetryInterval ( this->minRetry + 1u ); - } - - this->minRetry = UINT_MAX; - - /* - * increment the retry sequence number - * (this prevents the time of the next search - * try from being set to the current time if - * we are handling a response from an old - * search message) - */ - this->retrySeqNo++; /* allowed to roll over */ - - /* - * so that old search tries will not update the counters - */ - this->retrySeqAtPassBegin = this->retrySeqNo; - - this->searchTriesWithinThisPass = 0; - this->searchResponsesWithinThisPass = 0; - - debugPrintf ( ("saw end of list\n") ); + if ( this->searchTriesWithinThisPass >= this->iiu.channelCount () ) { + if ( this->searchResponsesWithinThisPass == 0u ) { + debugPrintf ( ("increasing search try interval\n") ); + this->setRetryInterval ( this->minRetry + 1u ); } - unsigned retryNoForThisChannel; - if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) { - nFrameSent++; - - if ( nFrameSent >= this->framesPerTry ) { - break; - } - - this->iiu.flush (); - - if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) { - break; - } - } - - if ( this->minRetry > retryNoForThisChannel ) { - this->minRetry = retryNoForThisChannel; - } - - if ( this->searchTriesWithinThisPass < UINT_MAX ) { - this->searchTriesWithinThisPass++; - } - if ( nChanSent < UINT_MAX ) { - nChanSent++; - } - + this->minRetry = UINT_MAX; + /* - * dont send any of the channels twice within one try + * increment the retry sequence number + * (this prevents the time of the next search + * try from being set to the current time if + * we are handling a response from an old + * search message) */ - if ( nChanSent >= this->iiu.channelCount () ) { - /* - * add one to nFrameSent because there may be - * one more partial frame to be sent - */ - nFrameSent++; - - /* - * cap this->framesPerTry to - * the number of frames required for all of - * the unresolved channels - */ - if ( this->framesPerTry > nFrameSent ) { - this->framesPerTry = nFrameSent; - } - + this->retrySeqNo++; /* allowed to roll over */ + + /* + * so that old search tries will not update the counters + */ + this->retrySeqAtPassBegin = this->retrySeqNo; + + this->searchTriesWithinThisPass = 0; + this->searchResponsesWithinThisPass = 0; + + debugPrintf ( ("saw end of list\n") ); + } + + unsigned retryNoForThisChannel; + if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) { + nFrameSent++; + + if ( nFrameSent >= this->framesPerTry ) { break; } + + this->iiu.datagramFlush (); + + if ( ! this->iiu.searchMsg ( this->retrySeqNo, retryNoForThisChannel ) ) { + break; + } + } + + if ( this->minRetry > retryNoForThisChannel ) { + this->minRetry = retryNoForThisChannel; + } + + if ( this->searchTriesWithinThisPass < UINT_MAX ) { + this->searchTriesWithinThisPass++; + } + if ( nChanSent < UINT_MAX ) { + nChanSent++; + } + + /* + * dont send any of the channels twice within one try + */ + if ( nChanSent >= this->iiu.channelCount () ) { + /* + * add one to nFrameSent because there may be + * one more partial frame to be sent + */ + nFrameSent++; + + /* + * cap this->framesPerTry to + * the number of frames required for all of + * the unresolved channels + */ + if ( this->framesPerTry > nFrameSent ) { + this->framesPerTry = nFrameSent; + } + + break; } } // flush out the search request buffer - this->iiu.flush (); + this->iiu.datagramFlush (); debugPrintf ( ("sent %u delay sec=%f\n", nFrameSent, this->period) ); diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 12b023e9a..7e58d8dcd 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -87,7 +87,7 @@ extern "C" void cacSendThreadTCP ( void *pParam ) } { - epicsAutoMutex autoMutex ( piiu->mutex ); + epicsAutoMutex autoMutex ( piiu->pCAC()->mutex() ); flowControlLaborNeeded = piiu->busyStateDetected != piiu->flowControlActive; echoLaborNeeded = piiu->echoRequestPending; piiu->echoRequestPending = false; @@ -119,7 +119,7 @@ extern "C" void cacSendThreadTCP ( void *pParam ) } } - if ( ! piiu->flushToWire ( false ) ) { + if ( ! piiu->flush () ) { break; } } @@ -241,7 +241,7 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) piiu->connect (); { - epicsAutoMutex autoMutex ( piiu->mutex ); + epicsAutoMutex autoMutex ( piiu->pCAC()->mutex() ); if ( piiu->state == iiu_connected ) { unsigned priorityOfSend; epicsThreadBooleanStatus tbs; @@ -272,7 +272,7 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) while ( piiu->state == iiu_connected ) { if ( nBytes >= maxBytesPendingTCP ) { epicsEventMustWait ( piiu->recvThreadRingBufferSpaceAvailableSignal ); - epicsAutoMutex autoMutex ( piiu->mutex ); + epicsAutoMutex autoMutex ( piiu->pCAC()->mutex() ); nBytes = piiu->recvQue.occupiedBytes (); } else { @@ -282,7 +282,7 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) if ( nBytesIn ) { bool msgHeaderButNoBody; { - epicsAutoMutex autoMutex ( piiu->mutex ); + epicsAutoMutex autoMutex ( piiu->pCAC()->mutex() ); nBytes = piiu->recvQue.occupiedBytes (); msgHeaderButNoBody = piiu->msgHeaderAvailable; piiu->recvQue.pushLastComBufReceived ( *pComBuf ); @@ -316,14 +316,14 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) } else { pComBuf->destroy (); - epicsAutoMutex autoMutex ( piiu->mutex ); + epicsAutoMutex autoMutex ( piiu->pCAC()->mutex() ); nBytes = piiu->recvQue.occupiedBytes (); } } else { // no way to be informed when memory is available epicsThreadSleep ( 0.5 ); - epicsAutoMutex autoMutex ( piiu->mutex ); + epicsAutoMutex autoMutex ( piiu->pCAC()->mutex() ); nBytes = piiu->recvQue.occupiedBytes (); } } @@ -339,7 +339,6 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, epicsTimerQueue &timerQueue netiiu ( &cac ), recvDog ( *this, connectionTimeout, timerQueue ), sendDog ( *this, connectionTimeout, timerQueue ), - ioTable ( 1024 ), sendQue ( *this ), pHostNameCache ( 0 ), curDataMax ( 0ul ), @@ -349,12 +348,14 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, epicsTimerQueue &timerQueue state ( iiu_connecting ), sock ( INVALID_SOCKET ), contigRecvMsgCount ( 0u ), + blockingForFlush ( 0u ), busyStateDetected ( false ), flowControlActive ( false ), echoRequestPending ( false ), msgHeaderAvailable ( false ), sockCloseCompleted ( false ), - fdRegCallbackNeeded ( true ) + fdRegCallbackNeeded ( true ), + earlyFlush ( false ) { this->addr.sa.sa_family = AF_UNSPEC; @@ -381,11 +382,21 @@ tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, epicsTimerQueue &timerQueue return; } + this->flushBlockSignal = epicsEventCreate ( epicsEventEmpty ); + if ( ! this->flushBlockSignal ) { + ca_printf ("CA: unable to create flushBlockSignal object\n"); + epicsEventDestroy (this->sendThreadExitSignal); + epicsEventDestroy (this->sendThreadFlushSignal); + this->fullyConstructedFlag = false; + return; + } + this->recvThreadRingBufferSpaceAvailableSignal = epicsEventCreate ( epicsEventEmpty ); if ( ! this->recvThreadRingBufferSpaceAvailableSignal ) { ca_printf ("CA: unable to create recvThreadRingBufferSpaceAvailableSignal object\n"); epicsEventDestroy (this->sendThreadExitSignal); epicsEventDestroy (this->sendThreadFlushSignal); + epicsEventDestroy (this->flushBlockSignal); this->fullyConstructedFlag = false; return; } @@ -405,95 +416,91 @@ bool tcpiiu::initiateConnect ( const osiSockAddr &addrIn, unsigned minorVersion, int status; int flag; + this->addr = addrIn; + + this->pHostNameCache = new hostNameCache ( addrIn, engineIn ); + if ( ! this->pHostNameCache ) { + return false; + } + + this->pBHE = &bhe; + bhe.bindToIIU ( *this ); + + this->state = iiu_connecting; + this->minorProtocolVersion = minorVersion; + + this->contigRecvMsgCount = 0u; + this->busyStateDetected = false; + this->flowControlActive = false; + this->echoRequestPending = false; + this->msgHeaderAvailable = false; + this->sockCloseCompleted = false; + + // first message informs server of user and host name of client + this->userNameSetRequest (); + this->hostNameSetRequest (); + + this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP ); + if ( this->sock == INVALID_SOCKET ) { + ca_printf ( "CAC: unable to create virtual circuit because \"%s\"\n", + SOCKERRSTR ( SOCKERRNO ) ); + return false; + } + + flag = true; + status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY, + (char *) &flag, sizeof ( flag ) ); + if ( status < 0 ) { + ca_printf ("CAC: problems setting socket option TCP_NODELAY = \"%s\"\n", + SOCKERRSTR (SOCKERRNO)); + } + + flag = true; + status = setsockopt ( this->sock , SOL_SOCKET, SO_KEEPALIVE, + ( char * ) &flag, sizeof ( flag ) ); + if ( status < 0 ) { + ca_printf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n", + SOCKERRSTR ( SOCKERRNO ) ); + } + +# if 0 { - epicsAutoMutex autoMutex ( this->mutex ); + int i; - this->addr = addrIn; - - this->pHostNameCache = new hostNameCache ( addrIn, engineIn ); - if ( ! this->pHostNameCache ) { - return false; - } - - this->pBHE = &bhe; - bhe.bindToIIU ( *this ); - - this->state = iiu_connecting; - this->minorProtocolVersion = minorVersion; - - this->contigRecvMsgCount = 0u; - this->busyStateDetected = false; - this->flowControlActive = false; - this->echoRequestPending = false; - this->msgHeaderAvailable = false; - this->sockCloseCompleted = false; - - // first message informs server of user and host name of client - this->userNameSetRequest (); - this->hostNameSetRequest (); - - this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP ); - if ( this->sock == INVALID_SOCKET ) { - ca_printf ( "CAC: unable to create virtual circuit because \"%s\"\n", + /* + * some concern that vxWorks will run out of mBuf's + * if this change is made joh 11-10-98 + */ + i = MAX_MSG_SIZE; + status = setsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF, + ( char * ) &i, sizeof ( i ) ); + if (status < 0) { + ca_printf ("CAC: problems setting socket option SO_SNDBUF = \"%s\"\n", SOCKERRSTR ( SOCKERRNO ) ); - return false; } - - flag = true; - status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY, - (char *) &flag, sizeof ( flag ) ); + i = MAX_MSG_SIZE; + status = setsockopt ( this->sock, SOL_SOCKET, SO_RCVBUF, + ( char * ) &i, sizeof ( i ) ); if ( status < 0 ) { - ca_printf ("CAC: problems setting socket option TCP_NODELAY = \"%s\"\n", + ca_printf ("CAC: problems setting socket option SO_RCVBUF = \"%s\"\n", SOCKERRSTR (SOCKERRNO)); } + } +# endif - flag = true; - status = setsockopt ( this->sock , SOL_SOCKET, SO_KEEPALIVE, - ( char * ) &flag, sizeof ( flag ) ); - if ( status < 0 ) { - ca_printf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n", - SOCKERRSTR ( SOCKERRNO ) ); - } + memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) ); - #if 0 - { - int i; + tbs = epicsThreadHighestPriorityLevelBelow ( this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv ); + if ( tbs != epicsThreadBooleanStatusSuccess ) { + priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority (); + } - /* - * some concern that vxWorks will run out of mBuf's - * if this change is made joh 11-10-98 - */ - i = MAX_MSG_SIZE; - status = setsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF, - ( char * ) &i, sizeof ( i ) ); - if (status < 0) { - ca_printf ("CAC: problems setting socket option SO_SNDBUF = \"%s\"\n", - SOCKERRSTR ( SOCKERRNO ) ); - } - i = MAX_MSG_SIZE; - status = setsockopt ( this->sock, SOL_SOCKET, SO_RCVBUF, - ( char * ) &i, sizeof ( i ) ); - if ( status < 0 ) { - ca_printf ("CAC: problems setting socket option SO_RCVBUF = \"%s\"\n", - SOCKERRSTR (SOCKERRNO)); - } - } - #endif - - memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) ); - - tbs = epicsThreadHighestPriorityLevelBelow ( this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv ); - if ( tbs != epicsThreadBooleanStatusSuccess ) { - priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority (); - } - - tid = epicsThreadCreate ("CAC-TCP-recv", priorityOfRecv, - epicsThreadGetStackSize (epicsThreadStackMedium), cacRecvThreadTCP, this); - if ( tid == 0 ) { - ca_printf ("CA: unable to create CA client receive thread\n"); - socket_close ( this->sock ); - return false; - } + tid = epicsThreadCreate ("CAC-TCP-recv", priorityOfRecv, + epicsThreadGetStackSize (epicsThreadStackMedium), cacRecvThreadTCP, this); + if ( tid == 0 ) { + ca_printf ("CA: unable to create CA client receive thread\n"); + socket_close ( this->sock ); + return false; } return true; @@ -516,7 +523,7 @@ void tcpiiu::connect () this->sendDog.cancel (); - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); if ( this->state == iiu_connecting ) { // put the iiu into the connected state @@ -562,7 +569,7 @@ void tcpiiu::cleanShutdown () this->sendDog.cancel (); this->recvDog.cancel (); - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); if ( this->state == iiu_connected ) { int status = ::shutdown ( this->sock, SD_BOTH ); @@ -607,7 +614,7 @@ void tcpiiu::forcedShutdown () this->sendDog.cancel (); this->recvDog.cancel (); - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); if ( this->state != iiu_disconnected ) { // force abortive shutdown sequence (discard outstanding sends @@ -642,13 +649,6 @@ void tcpiiu::disconnect () { assert ( this->fullyConstructedFlag ); - // if we get here and the IO is still attached then we have an - // io block that was not registered with a channel. - if ( this->ioTable.numEntriesInstalled () ) { - this->pCAC ()->printf ( "CA connection disconnect with %u IO items still installed?\n", - this->ioTable.numEntriesInstalled () ); - } - CAFDHANDLER *fdRegFunc; void *fdRegArg; this->pCAC ()->getFDRegCallback ( fdRegFunc, fdRegArg ); @@ -719,7 +719,7 @@ void tcpiiu::disconnect () * free message body cache */ { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); if ( this->pCurData ) { delete [] this->pCurData; @@ -739,6 +739,13 @@ void tcpiiu::disconnect () this->recvQue.clear (); } this->fdRegCallbackNeeded = true; + + // wakeup user threads blocking for send backlog to be reduced + // and wait for them to stop using this IIU + epicsEventSignal ( this->flushBlockSignal ); + while ( this->blockingForFlush ) { + epicsThreadSleep ( 0.1 ); + } } @@ -755,12 +762,11 @@ tcpiiu::~tcpiiu () epicsEventDestroy ( this->recvThreadExitSignal ); epicsEventDestroy ( this->sendThreadFlushSignal ); epicsEventDestroy ( this->recvThreadRingBufferSpaceAvailableSignal ); + epicsEventDestroy ( this->flushBlockSignal ); if ( this->pHostNameCache ) { this->pHostNameCache->destroy (); } - - // this->pBHE lifetime management is handled by the class that creates this object } void tcpiiu::suicide () @@ -795,7 +801,7 @@ bool tcpiiu::isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &add } if ( ! match ) { - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); char acc[64]; if ( this->pHostNameCache ) { this->pHostNameCache->hostName ( acc, sizeof ( acc ) ); @@ -813,7 +819,7 @@ bool tcpiiu::isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &add void tcpiiu::show ( unsigned level ) const { - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); char buf[256]; if ( this->pHostNameCache ) { this->pHostNameCache->hostName ( buf, sizeof ( buf ) ); @@ -851,17 +857,16 @@ void tcpiiu::show ( unsigned level ) const this->pBHE->show ( level - 3u ); } ::printf ( "IO identifier hash table:\n" ); - this->ioTable.show ( level - 3u ); } } bool tcpiiu::setEchoRequestPending () { { - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); this->echoRequestPending = true; } - this->flush (); + this->flushRequest (); if ( CA_V43 (CA_PROTOCOL_VERSION, this->minorProtocolVersion ) ) { // we send an echo return true; @@ -886,11 +891,11 @@ int tcpiiu::hostNameSetRequest () unsigned postSize = CA_MESSAGE_ALIGN ( size ); assert ( postSize < 0xffff ); - if ( this->sendQue.flushThreshold ( postSize + 16u ) ) { - this->flush (); + if ( this->sendQue.flushEarlyThreshold ( postSize + 16u ) ) { + this->flushRequest (); } - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); int status = this->sendQue.reserveSpace ( postSize + 16u ); if ( status == ECA_NORMAL ) { @@ -922,11 +927,11 @@ int tcpiiu::userNameSetRequest () unsigned postSize = CA_MESSAGE_ALIGN ( size ); assert ( postSize < 0xffff ); - if ( this->sendQue.flushThreshold ( postSize + 16u ) ) { - this->flush (); + if ( this->sendQue.flushEarlyThreshold ( postSize + 16u ) ) { + this->flushRequest (); } - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); int status = this->sendQue.reserveSpace ( postSize + 16u ); if ( status == ECA_NORMAL ) { @@ -946,11 +951,11 @@ int tcpiiu::userNameSetRequest () int tcpiiu::disableFlowControlRequest () { - if ( this->sendQue.flushThreshold ( 16u ) ) { - this->flush (); + if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { + this->flushRequest (); } - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { @@ -967,11 +972,11 @@ int tcpiiu::disableFlowControlRequest () int tcpiiu::enableFlowControlRequest () { - if ( this->sendQue.flushThreshold ( 16u ) ) { - this->flush (); + if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { + this->flushRequest (); } - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { @@ -988,11 +993,11 @@ int tcpiiu::enableFlowControlRequest () int tcpiiu::noopRequest () { - if ( this->sendQue.flushThreshold ( 16u ) ) { - this->flush (); + if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { + this->flushRequest (); } - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { @@ -1009,11 +1014,11 @@ int tcpiiu::noopRequest () int tcpiiu::echoRequest () { - if ( this->sendQue.flushThreshold ( 16u ) ) { - this->flush (); + if ( this->sendQue.flushEarlyThreshold ( 16u ) ) { + this->flushRequest (); } - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { @@ -1042,10 +1047,10 @@ bool tcpiiu::writeNotifyRespAction () { int status = this->curMsg.m_cid; if ( status == ECA_NORMAL ) { - return this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available ); + return this->pCAC()->ioCompletionNotifyAndDestroy ( this->curMsg.m_available ); } else { - return this->ioExceptionNotifyAndDestroy ( this->curMsg.m_available, + return this->pCAC()->ioExceptionNotifyAndDestroy ( this->curMsg.m_available, status, "write notify request rejected" ); } } @@ -1083,11 +1088,11 @@ bool tcpiiu::readNotifyRespAction () } if ( status == ECA_NORMAL ) { - return this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available, + return this->pCAC()->ioCompletionNotifyAndDestroy ( this->curMsg.m_available, this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData ); } else { - return this->ioExceptionNotifyAndDestroy ( this->curMsg.m_available, + return this->pCAC()->ioExceptionNotifyAndDestroy ( this->curMsg.m_available, status, "read failed", this->curMsg.m_dataType, this->curMsg.m_count ); } } @@ -1131,11 +1136,11 @@ bool tcpiiu::eventRespAction () status = ECA_NORMAL; } if ( status == ECA_NORMAL ) { - return this->ioCompletionNotify ( this->curMsg.m_available, + return this->pCAC()->ioCompletionNotify ( this->curMsg.m_available, this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData ); } else { - return this->ioExceptionNotify ( this->curMsg.m_available, + return this->pCAC()->ioExceptionNotify ( this->curMsg.m_available, status, "subscription update failed", this->curMsg.m_dataType, this->curMsg.m_count ); } @@ -1143,7 +1148,7 @@ bool tcpiiu::eventRespAction () bool tcpiiu::readRespAction () { - return this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available, + return this->pCAC()->ioCompletionNotifyAndDestroy ( this->curMsg.m_available, this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData ); } @@ -1159,7 +1164,7 @@ bool tcpiiu::exceptionRespAction () char hostName[64]; { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); const char *pName = reinterpret_cast < const char * > ( req + 1 ); @@ -1180,30 +1185,30 @@ bool tcpiiu::exceptionRespAction () switch ( ntohs ( req->m_cmmd ) ) { case CA_PROTO_READ_NOTIFY: - return this->ioExceptionNotifyAndDestroy ( ntohl ( req->m_available ), + return this->pCAC()->ioExceptionNotifyAndDestroy ( ntohl ( req->m_available ), ntohl ( this->curMsg.m_available ), context, ntohs ( req->m_dataType ), ntohs ( req->m_count ) ); case CA_PROTO_READ: - return this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), + return this->pCAC()->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), ntohl ( this->curMsg.m_available ), context, ntohs ( req->m_dataType ), ntohs ( req->m_count ) ); case CA_PROTO_WRITE_NOTIFY: - return this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), + return this->pCAC()->ioExceptionNotifyAndDestroy ( ntohl (req->m_available), ntohl ( this->curMsg.m_available ), context, ntohs ( req->m_dataType ), ntohs ( req->m_count ) ); case CA_PROTO_WRITE: - this->pCAC ()->exceptionNotify ( ntohl ( this->curMsg.m_available ), + this->pCAC()->exceptionNotify ( ntohl ( this->curMsg.m_available ), context, ntohs ( req->m_dataType ), ntohs ( req->m_count ), __FILE__, __LINE__); return true; case CA_PROTO_EVENT_ADD: - return this->ioExceptionNotify ( ntohl ( req->m_available ), + return this->pCAC()->ioExceptionNotify ( ntohl ( req->m_available ), ntohl ( this->curMsg.m_available ), context, ntohs ( req->m_dataType ), ntohs ( req->m_count ) ); case CA_PROTO_EVENT_CANCEL: - return this->ioExceptionNotifyAndDestroy ( ntohl ( req->m_available ), + return this->pCAC()->ioExceptionNotifyAndDestroy ( ntohl ( req->m_available ), ntohl ( this->curMsg.m_available ), context ); default: - this->pCAC ()->exceptionNotify ( ntohl ( this->curMsg.m_available ), + this->pCAC()->exceptionNotify ( ntohl ( this->curMsg.m_available ), context, __FILE__, __LINE__ ); return true; } @@ -1240,7 +1245,7 @@ bool tcpiiu::badTCPRespAction () char hostName[64]; bool hostNameInit; { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); if ( this->pHostNameCache ) { this->pHostNameCache->hostName ( hostName, sizeof ( hostName ) ); hostNameInit = true; @@ -1286,7 +1291,7 @@ void tcpiiu::processIncoming () // fetch a complete message header // { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); nBytes = this->recvQue.occupiedBytes (); @@ -1403,6 +1408,10 @@ void tcpiiu::processIncoming () int tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue ) { + if ( ! chan.connected () ) { + return ECA_DISCONNCHID; + } + if ( ! this->sendQue.dbr_type_ok ( type ) ) { return ECA_BADTYPE; } @@ -1428,41 +1437,33 @@ int tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void return ECA_BADCOUNT; } - if ( this->sendQue.flushThreshold ( postcnt + sizeof ( caHdr ) ) ) { - this->threadContextSensitiveFlushToWire ( true ); - } - - epicsAutoMutex autoMutex ( this->mutex ); - - int status; - if ( chan.verifyConnected ( *this ) ) { - status = this->sendQue.reserveSpace ( postcnt + 16u ); - if ( status == ECA_NORMAL ) { - this->sendQue.pushUInt16 ( CA_PROTO_WRITE ); // cmd - this->sendQue.pushUInt16 ( postcnt ); // postsize - this->sendQue.pushUInt16 ( type ); // dataType - this->sendQue.pushUInt16 ( nElem ); // count - this->sendQue.pushUInt32 ( chan.getSID () ); // cid - this->sendQue.pushUInt32 ( ~0UL ); // available - if ( stringOptim ) { - this->sendQue.pushString ( static_cast < const char * > ( pValue ), size ); - } - else { - this->sendQue.push_dbr_type ( type, pValue, nElem ); - } - this->sendQue.pushString ( nillBytes, postcnt - size ); + int status = this->sendQue.reserveSpace ( postcnt + 16u ); + if ( status == ECA_NORMAL ) { + this->sendQue.pushUInt16 ( CA_PROTO_WRITE ); // cmd + this->sendQue.pushUInt16 ( postcnt ); // postsize + this->sendQue.pushUInt16 ( type ); // dataType + this->sendQue.pushUInt16 ( nElem ); // count + this->sendQue.pushUInt32 ( chan.getSID () ); // cid + this->sendQue.pushUInt32 ( ~0UL ); // available + if ( stringOptim ) { + this->sendQue.pushString ( static_cast < const char * > ( pValue ), size ); } - } - else { - status = ECA_DISCONNCHID; + else { + this->sendQue.push_dbr_type ( type, pValue, nElem ); + } + this->sendQue.pushString ( nillBytes, postcnt - size ); } return status; } -int tcpiiu::writeNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type, +int tcpiiu::writeNotifyRequest ( nciu &chan, netWriteNotifyIO &io, unsigned type, unsigned nElem, const void *pValue ) { + if ( ! chan.connected () ) { + return ECA_DISCONNCHID; + } + if ( ! this->ca_v41_ok () ) { return ECA_NOSUPPORT; } @@ -1491,52 +1492,32 @@ int tcpiiu::writeNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type, return ECA_BADCOUNT; } - if ( this->sendQue.flushThreshold ( postcnt + 16u ) ) { - this->threadContextSensitiveFlushToWire ( true ); - } - - epicsAutoMutex autoMutex ( this->mutex ); - - int status; - if ( chan.verifyConnected ( *this ) ) { - netWriteNotifyIO * pIO = new netWriteNotifyIO ( chan, notify ); - if ( pIO ) { - status = this->sendQue.reserveSpace ( postcnt + 16u ); - if ( status == ECA_NORMAL ) { - this->ioTable.add ( *pIO ); - chan.tcpiiuPrivateListOfIO::eventq.add ( *pIO ); - this->sendQue.pushUInt16 ( CA_PROTO_WRITE_NOTIFY ); // cmd - this->sendQue.pushUInt16 ( postcnt ); // postsize - this->sendQue.pushUInt16 ( type ); // dataType - this->sendQue.pushUInt16 ( nElem ); // count - this->sendQue.pushUInt32 ( chan.getSID () ); // cid - this->sendQue.pushUInt32 ( pIO->getID () ); // available - if ( stringOptim ) { - this->sendQue.pushString ( static_cast < const char * > ( pValue ), size ); - } - else { - this->sendQue.push_dbr_type ( type, pValue, nElem ); - } - this->sendQue.pushString ( nillBytes, postcnt - size ); - } - else { - delete static_cast < baseNMIU * > ( pIO ); - } + int status = this->sendQue.reserveSpace ( postcnt + 16u ); + if ( status == ECA_NORMAL ) { + this->sendQue.pushUInt16 ( CA_PROTO_WRITE_NOTIFY ); // cmd + this->sendQue.pushUInt16 ( postcnt ); // postsize + this->sendQue.pushUInt16 ( type ); // dataType + this->sendQue.pushUInt16 ( nElem ); // count + this->sendQue.pushUInt32 ( chan.getSID () ); // cid + this->sendQue.pushUInt32 ( io.getID () ); // available + if ( stringOptim ) { + this->sendQue.pushString ( static_cast < const char * > ( pValue ), size ); } else { - status = ECA_ALLOCMEM; + this->sendQue.push_dbr_type ( type, pValue, nElem ); } + this->sendQue.pushString ( nillBytes, postcnt - size ); } - else { - status = ECA_DISCONNCHID; - } - return status; } -int tcpiiu::readNotifyRequest ( nciu &chan, cacNotify ¬ify, +int tcpiiu::readNotifyRequest ( nciu &chan, netReadNotifyIO &io, unsigned type, unsigned nElem ) { + if ( ! chan.connected () ) { + return ECA_DISCONNCHID; + } + if ( nElem > 0xffff) { return ECA_BADCOUNT; } @@ -1544,39 +1525,15 @@ int tcpiiu::readNotifyRequest ( nciu &chan, cacNotify ¬ify, return ECA_BADTYPE; } - if ( this->sendQue.flushThreshold ( 16u ) ) { - this->threadContextSensitiveFlushToWire ( true ); + int status = this->sendQue.reserveSpace ( 16u ); + if ( status == ECA_NORMAL ) { + this->sendQue.pushUInt16 ( CA_PROTO_READ_NOTIFY ); // cmd + this->sendQue.pushUInt16 ( 0u ); // postsize + this->sendQue.pushUInt16 ( type ); // dataType + this->sendQue.pushUInt16 ( nElem ); // count + this->sendQue.pushUInt32 ( chan.getSID () ); // cid + this->sendQue.pushUInt32 ( io.getID () ); // available } - - epicsAutoMutex autoMutex ( this->mutex ); - - int status; - if ( chan.verifyConnected ( *this ) ) { - netReadNotifyIO *pIO = new netReadNotifyIO ( chan, notify ); - if ( pIO ) { - status = this->sendQue.reserveSpace ( 16u ); - if ( status == ECA_NORMAL ) { - this->ioTable.add ( *pIO ); - chan.tcpiiuPrivateListOfIO::eventq.add ( *pIO ); - this->sendQue.pushUInt16 ( CA_PROTO_READ_NOTIFY ); // cmd - this->sendQue.pushUInt16 ( 0u ); // postsize - this->sendQue.pushUInt16 ( type ); // dataType - this->sendQue.pushUInt16 ( nElem ); // count - this->sendQue.pushUInt32 ( chan.getSID () ); // cid - this->sendQue.pushUInt32 ( pIO->getID () ); // available - } - else { - delete static_cast < baseNMIU * > ( pIO ); - } - } - else { - status = ECA_ALLOCMEM; - } - } - else { - status = ECA_DISCONNCHID; - } - return status; } @@ -1602,37 +1559,25 @@ int tcpiiu::createChannelRequest ( nciu &chan ) return ECA_INTERNAL; } - if ( this->sendQue.flushThreshold ( postCnt + 16u ) ) { - this->flush (); - } - - epicsAutoMutex autoMutex ( this->mutex ); - - int status; - if ( chan.verifyIIU ( *this ) ) { - status = this->sendQue.reserveSpace ( postCnt + 16u ); - if ( status == ECA_NORMAL ) { - this->sendQue.pushUInt16 ( CA_PROTO_CLAIM_CIU ); // cmd - this->sendQue.pushUInt16 ( postCnt ); // postsize - this->sendQue.pushUInt16 ( 0u ); // dataType - this->sendQue.pushUInt16 ( 0u ); // count - this->sendQue.pushUInt32 ( identity ); // cid - // - // The available field is used (abused) - // here to communicate the minor version number - // starting with CA 4.1. - // - this->sendQue.pushUInt32 ( CA_MINOR_VERSION ); // available - if ( nameLength ) { - this->sendQue.pushString ( pName, nameLength ); - } - if ( postCnt > nameLength ) { - this->sendQue.pushString ( nillBytes, postCnt - nameLength ); - } + int status = this->sendQue.reserveSpace ( postCnt + 16u ); + if ( status == ECA_NORMAL ) { + this->sendQue.pushUInt16 ( CA_PROTO_CLAIM_CIU ); // cmd + this->sendQue.pushUInt16 ( postCnt ); // postsize + this->sendQue.pushUInt16 ( 0u ); // dataType + this->sendQue.pushUInt16 ( 0u ); // count + this->sendQue.pushUInt32 ( identity ); // cid + // + // The available field is used (abused) + // here to communicate the minor version number + // starting with CA 4.1. + // + this->sendQue.pushUInt32 ( CA_MINOR_VERSION ); // available + if ( nameLength ) { + this->sendQue.pushString ( pName, nameLength ); + } + if ( postCnt > nameLength ) { + this->sendQue.pushString ( nillBytes, postCnt - nameLength ); } - } - else { - status = ECA_DISCONNCHID; } return status; @@ -1640,29 +1585,18 @@ int tcpiiu::createChannelRequest ( nciu &chan ) int tcpiiu::clearChannelRequest ( nciu &chan ) { - int status; - - if ( this->sendQue.flushThreshold ( 16u ) ) { - this->threadContextSensitiveFlushToWire ( true ); + if ( ! chan.connected () ) { + return ECA_DISCONNCHID; } - - epicsAutoMutex autoMutex ( this->mutex ); - - if ( ! chan.verifyConnected ( *this ) ) { - status = ECA_DISCONNCHID; + int status = this->sendQue.reserveSpace ( 16u ); + if ( status == ECA_NORMAL ) { + this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd + this->sendQue.pushUInt16 ( 0u ); // postsize + this->sendQue.pushUInt16 ( 0u ); // dataType + this->sendQue.pushUInt16 ( 0u ); // count + this->sendQue.pushUInt32 ( chan.getSID () ); // cid + this->sendQue.pushUInt32 ( chan.getCID () ); // available } - else { - status = this->sendQue.reserveSpace ( 16u ); - if ( status == ECA_NORMAL ) { - this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd - this->sendQue.pushUInt16 ( 0u ); // postsize - this->sendQue.pushUInt16 ( 0u ); // dataType - this->sendQue.pushUInt16 ( 0u ); // count - this->sendQue.pushUInt32 ( chan.getSID () ); // cid - this->sendQue.pushUInt32 ( chan.getCID () ); // available - } - } - return status; } @@ -1670,87 +1604,58 @@ int tcpiiu::clearChannelRequest ( nciu &chan ) // this routine return void because if this internally fails the best response // is to try again the next time that we reconnect // -void tcpiiu::subscriptionRequest ( netSubscription & subscr, bool userThread ) +void tcpiiu::subscriptionRequest ( netSubscription & subscr ) { + if ( ! subscr.channel().connected() ) { + return; + } + if ( subscr.getType() > 0xffff ) { - this->pCAC () -> printf ( "CAC: subscriptionRequest() ignored because of unexpected bad type that was checked earlier\n" ); + this->pCAC()->printf ( "CAC: subscriptionRequest() ignored because of unexpected bad type that was checked earlier\n" ); return; } if ( subscr.getMask() > 0xffff || subscr.getMask () == 0u ) { - this->pCAC () -> printf ( "CAC: subscriptionRequest() ignored because of unexpected bad mask that was checked earlier\n" ); + this->pCAC()->printf ( "CAC: subscriptionRequest() ignored because of unexpected bad mask that was checked earlier\n" ); return; } unsigned long count = subscr.getCount (); if ( count == 0u || count > 0xffff ) { - this->pCAC () -> printf ( "CAC: subscriptionRequest() ignored because of unexpected bad count that was checked earlier\n" ); + this->pCAC()->printf ( "CAC: subscriptionRequest() ignored because of unexpected bad count that was checked earlier\n" ); return; } - if ( this->sendQue.flushThreshold ( 32u ) ) { - if ( userThread ) { - this->threadContextSensitiveFlushToWire ( true ); - } - else { - this->flush (); - } + int status = this->sendQue.reserveSpace ( 32u ); + if ( status == ECA_NORMAL ) { + + // header + this->sendQue.pushUInt16 ( CA_PROTO_EVENT_ADD ); // cmd + this->sendQue.pushUInt16 ( 16u ); // postsize + this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType + this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( count ) ); // count + this->sendQue.pushUInt32 ( subscr . channel () . getSID () ); // cid + this->sendQue.pushUInt32 ( subscr . getID () ); // available + + // extension + this->sendQue.pushFloat32 ( 0.0 ); // m_lval + this->sendQue.pushFloat32 ( 0.0 ); // m_hval + this->sendQue.pushFloat32 ( 0.0 ); // m_toval + this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getMask () ) ); // m_mask + this->sendQue.pushUInt16 ( 0u ); // m_pad } - - epicsAutoMutex autoMutex ( this->mutex ); - - int status; - if ( subscr.channel().verifyConnected ( *this ) ) { - status = this->sendQue.reserveSpace ( 32u ); - if ( status == ECA_NORMAL ) { - this->ioTable . add ( subscr ); - - // header - this->sendQue.pushUInt16 ( CA_PROTO_EVENT_ADD ); // cmd - this->sendQue.pushUInt16 ( 16u ); // postsize - this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType - this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( count ) ); // count - this->sendQue.pushUInt32 ( subscr . channel () . getSID () ); // cid - this->sendQue.pushUInt32 ( subscr . getID () ); // available - - // extension - this->sendQue.pushFloat32 ( 0.0 ); // m_lval - this->sendQue.pushFloat32 ( 0.0 ); // m_hval - this->sendQue.pushFloat32 ( 0.0 ); // m_toval - this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getMask () ) ); // m_mask - this->sendQue.pushUInt16 ( 0u ); // m_pad - } - } - else { - this->pCAC () -> printf ( "CAC: subscriptionRequest() ignored because of insufficient memory\n" ); - } - - return; } -void tcpiiu::subscriptionCancelRequest ( netSubscription &subscr, bool userThread ) +void tcpiiu::subscriptionCancelRequest ( netSubscription &subscr ) { - if ( this->sendQue.flushThreshold(16u) ) { - if ( userThread ) { - this->threadContextSensitiveFlushToWire ( true ); - } - else { - this->flush (); - } - } - - epicsAutoMutex autoMutex ( this->mutex ); - int status = this->sendQue.reserveSpace ( 16u ); if ( status == ECA_NORMAL ) { - if ( subscr.channel().verifyConnected(*this) ) { - this->sendQue.pushUInt16 ( CA_PROTO_EVENT_CANCEL ); // cmd - this->sendQue.pushUInt16 ( 0u ); // postsize - this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType - this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getCount () ) ); // count - this->sendQue.pushUInt32 ( subscr.channel ().getSID () ); // cid - this->sendQue.pushUInt32 ( subscr.getID () ); // available - } + this->sendQue.pushUInt16 ( CA_PROTO_EVENT_CANCEL ); // cmd + this->sendQue.pushUInt16 ( 0u ); // postsize + this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType + this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getCount () ) ); // count + this->sendQue.pushUInt32 ( subscr.channel ().getSID () ); // cid + this->sendQue.pushUInt32 ( subscr.getID () ); // available } } @@ -1759,293 +1664,72 @@ void tcpiiu::lastChannelDetachNotify () this->cleanShutdown (); } -bool tcpiiu::threadContextSensitiveFlushToWire ( bool userThread ) +bool tcpiiu::flush () { - // the recv thread is not permitted to flush as this - // can result in a push / pull deadlock on the TCP pipe, - // but in that case we still schedual the flush through - // the higher priority send thread - if ( pCAC ()->flushPermit () ) { - return this->flushToWire ( userThread ); - } - this->flush (); - return true; -} - -bool tcpiiu::flushToWire ( bool userThread ) -{ - bool success = true; - - // enable callback processing prior to taking the flush lock - if ( userThread ) { - this->pCAC ()->enableCallbackPreemption (); - } - - // only one thread at a time can perform a flush. Nevertheless, - // the primary lock must not be held while sending in order - // to prevent push pull deadlocks - epicsAutoMutex autoFlushMutex ( this->flushMutex ); - while ( true ) { comBuf * pBuf; { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); pBuf = this->sendQue.popNextComBufToSend (); + if ( ! pBuf ) { + if ( this->blockingForFlush ) { + epicsEventSignal ( this->flushBlockSignal ); + } + this->earlyFlush = false; + return true; + } } - if ( ! pBuf ) { - break; - } - - success = pBuf->flushToWire ( *this ); + bool success = pBuf->flushToWire ( *this ); pBuf->destroy (); if ( ! success ) { - epicsAutoMutex autoMutex ( this->mutex ); + epicsAutoMutex autoMutex ( this->pCAC()->mutex() ); while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) { pBuf->destroy (); } - break; - } - } - - if ( userThread ) { - this->pCAC ()->disableCallbackPreemption (); - } - - return success; -} - -bool tcpiiu::ioCompletionNotify ( unsigned id, unsigned type, - unsigned long count, const void *pData ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - baseNMIU * pmiu = this->ioTable.lookup ( id ); - if ( pmiu ) { - pmiu->notify ().completionNotify ( pmiu->channel (), type, count, pData ); - return true; - } - else { - return false; - } -} - -bool tcpiiu::ioExceptionNotify ( unsigned id, int status, const char *pContext ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - baseNMIU * pmiu = this->ioTable.lookup ( id ); - if ( pmiu ) { - pmiu->notify ().exceptionNotify ( pmiu->channel (), status, pContext ); - return true; - } - else { - return false; - } -} - -bool tcpiiu::ioExceptionNotify ( unsigned id, int status, - const char *pContext, unsigned type, unsigned long count ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - baseNMIU * pmiu = this->ioTable.lookup ( id ); - if ( pmiu ) { - pmiu->notify ().exceptionNotify ( pmiu->channel (), - status, pContext, type, count ); - return true; - } - else { - return false; - } -} - -bool tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id ) -{ - baseNMIU * pmiu; - - { - epicsAutoMutex autoMutex ( this->mutex ); - pmiu = this->ioTable.remove ( id ); - if ( pmiu ) { - pmiu->channel ().tcpiiuPrivateListOfIO::eventq.remove ( *pmiu ); - } - } - - if ( pmiu ) { - pmiu->notify ().completionNotify ( pmiu->channel () ); - delete pmiu; - return true; - } - else { - return false; - } -} - -bool tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id, - unsigned type, unsigned long count, const void *pData ) -{ - baseNMIU * pmiu; - - { - epicsAutoMutex autoMutex ( this->mutex ); - pmiu = this->ioTable.remove ( id ); - if ( pmiu ) { - pmiu->channel ().tcpiiuPrivateListOfIO::eventq.remove ( *pmiu ); - } - } - - if ( pmiu ) { - pmiu->notify ().completionNotify ( pmiu->channel (), type, count, pData ); - delete pmiu; - return true; - } - else { - return false; - } -} - -bool tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext ) -{ - baseNMIU * pmiu; - - { - epicsAutoMutex autoMutex ( this->mutex ); - pmiu = this->ioTable.remove ( id ); - if ( pmiu ) { - pmiu->channel ().tcpiiuPrivateListOfIO::eventq.remove ( *pmiu ); - } - } - - if ( pmiu ) { - pmiu->notify ().exceptionNotify ( pmiu->channel (), status, pContext ); - delete pmiu; - return true; - } - else { - return false; - } -} - -bool tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, - const char *pContext, unsigned type, unsigned long count ) -{ - baseNMIU * pmiu; - - { - epicsAutoMutex autoMutex ( this->mutex ); - pmiu = this->ioTable.remove ( id ); - if ( pmiu ) { - pmiu->channel ().tcpiiuPrivateListOfIO::eventq.remove ( *pmiu ); - } - } - - if ( pmiu ) { - pmiu->notify ().exceptionNotify ( pmiu->channel (), status, - pContext, type, count ); - delete pmiu; - return true; - } - else { - return false; - } -} - -// resubscribe for monitors from this channel -void tcpiiu::connectAllIO ( nciu &chan ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - if ( chan.verifyConnected ( *this ) ) { - tsDLIterBD < baseNMIU > pNetIO = - chan.tcpiiuPrivateListOfIO::eventq.firstIter (); - while ( pNetIO.valid () ) { - tsDLIterBD < baseNMIU > next = pNetIO; - next++; - class netSubscription *pSubscr = pNetIO->isSubscription (); - if ( pSubscr ) { - this->subscriptionRequest ( *pSubscr, false ); + if ( this->blockingForFlush ) { + epicsEventSignal ( this->flushBlockSignal ); } - else { - // it shouldnt be here at this point - so uninstall it - this->ioTable.remove ( *pNetIO ); - chan.tcpiiuPrivateListOfIO::eventq.remove ( *pNetIO ); - pNetIO->notify ().exceptionNotify ( pNetIO->channel (), ECA_DISCONN, this->pHostName () ); - delete pNetIO.pointer (); - } - pNetIO = next; - } - } - this->flush (); -} - -// cancel IO operations and monitor subscriptions -void tcpiiu::disconnectAllIO ( nciu &chan ) -{ - epicsAutoMutex autoMutex ( this->mutex ); - if ( chan.verifyConnected ( *this ) ) { - tsDLIterBD < baseNMIU > pNetIO = - chan.tcpiiuPrivateListOfIO::eventq.firstIter (); - while ( pNetIO.valid () ) { - tsDLIterBD < baseNMIU > next = pNetIO; - next++; - class netSubscription *pSubscr = pNetIO->isSubscription (); - this->ioTable.remove ( *pNetIO ); - if ( pSubscr ) { - this->subscriptionCancelRequest ( *pSubscr, false ); - } - else { - // no use after disconnected - so uninstall it - chan.tcpiiuPrivateListOfIO::eventq.remove ( *pNetIO ); - pNetIO->notify ().exceptionNotify ( pNetIO->channel (), ECA_DISCONN, this->pHostName () ); - delete pNetIO.pointer (); - } - pNetIO = next; - } - } -} - -// -// care is taken to not hold the lock while deleting the -// IO so that subscription delete request (sent by the -// IO's destructor) do not deadlock -// -bool tcpiiu::destroyAllIO ( nciu &chan ) -{ - tsDLList < baseNMIU > eventQ; - { - epicsAutoMutex autoMutex ( this->mutex ); - if ( chan.verifyIIU ( *this ) ) { - while ( baseNMIU *pIO = eventQ.get () ) { - this->ioTable.remove ( *pIO ); - eventQ.add ( *pIO ); - } - } - else { return false; } } - while ( baseNMIU *pIO = eventQ.get () ) { - delete pIO; - } - return true; } -bool tcpiiu::uninstallIO ( baseNMIU &io ) +void tcpiiu::blockUntilSendBacklogIsReasonable ( epicsMutex &mutex ) { - epicsAutoMutex autoMutex ( this->mutex ); - if ( io.channel ().verifyIIU ( *this ) ) { - this->ioTable.remove ( io ); + assert ( this->blockingForFlush < UINT_MAX ); + this->blockingForFlush++; + while ( this->sendQue.flushBlockThreshold(0u) && this->state == iiu_connected ) { + epicsAutoMutexRelease autoMutex ( mutex ); + this->pCAC()->enableCallbackPreemption (); + epicsEventWaitWithTimeout ( this->flushBlockSignal, 5.0 ); + this->pCAC()->disableCallbackPreemption (); } - else { - return false; + assert ( this->blockingForFlush > 0u ); + this->blockingForFlush--; + if ( this->blockingForFlush ) { + epicsEventSignal ( this->flushBlockSignal ); } - io.channel ().tcpiiuPrivateListOfIO::eventq.remove ( io ); - return true; +} + +void tcpiiu::flushRequestIfAboveEarlyThreshold () +{ + if ( ! this->earlyFlush && this->sendQue.flushEarlyThreshold(0u) ) { + this->earlyFlush = true; + epicsEventSignal ( this->sendThreadFlushSignal ); + } +} + +bool tcpiiu::flushBlockThreshold () const +{ + return this->sendQue.flushBlockThreshold ( 0u ); } double tcpiiu::beaconPeriod () const { - epicsAutoMutex autoMutex ( this->mutex ); if ( this->pBHE ) { return this->pBHE->period (); } @@ -2062,3 +1746,4 @@ bool tcpiiu::ca_v42_ok () const + diff --git a/src/ca/tcpiiu_IL.h b/src/ca/tcpiiu_IL.h index bfe41563c..c574b7993 100644 --- a/src/ca/tcpiiu_IL.h +++ b/src/ca/tcpiiu_IL.h @@ -23,7 +23,7 @@ inline bool tcpiiu::fullyConstructed () const inline void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const { - epicsAutoMutex locker ( this->mutex ); + epicsAutoMutex locker ( this->pCAC()->mutex() ); if ( this->pHostNameCache ) { this->pHostNameCache->hostName ( pBuf, bufLength ); } @@ -40,7 +40,7 @@ inline const char * tcpiiu::pHostName () const return nameBuf; // ouch !! } -inline void tcpiiu::flush () +inline void tcpiiu::flushRequest () { epicsEventSignal ( this->sendThreadFlushSignal ); } diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index 04b59a0f1..95be8de66 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -454,13 +454,10 @@ epicsShareFunc void epicsShareAPI caStartRepeaterIfNotInstalled ( unsigned repea void udpiiu::shutdown () { - { - epicsAutoMutex autoMutex ( this->mutex ); - if ( this->shutdownCmd ) { - return; - } - this->shutdownCmd = true; + if ( this->shutdownCmd ) { + return; } + this->shutdownCmd = true; caHdr msg; msg.m_cmmd = htons ( CA_PROTO_NOOP ); @@ -735,8 +732,6 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e return false; } - epicsAutoMutex autoMutex ( this->mutex ); - if ( msgsize + this->nBytesInXmitBuf > sizeof ( this->xmitBuf ) ) { return false; } @@ -754,15 +749,10 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e return true; } -// -// udpiiu::flush () -// -void udpiiu::flush () +void udpiiu::datagramFlush () { osiSockAddrNode *pNode; - epicsAutoMutex autoMutex ( this->mutex ); - if ( this->nBytesInXmitBuf == 0u ) { return; } @@ -820,7 +810,6 @@ void udpiiu::flush () void udpiiu::show ( unsigned level ) const { - epicsAutoMutex autoMutex ( this->mutex ); printf ( "Datagram IO circuit (and disconnected channel repository)\n"); if ( level > 1u ) { this->netiiu::show ( level - 1u );