From ad819cba657ab07a65bae659bdbd828331fd31f0 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Fri, 7 Sep 2001 23:02:32 +0000 Subject: [PATCH] restructure to eliminate use of select --- src/ca/CASG.cpp | 11 +- src/ca/acctst.c | 2 +- src/ca/cac.cpp | 212 ++++++++++++++----------- src/ca/cac.h | 22 +-- src/ca/comBuf.cpp | 4 +- src/ca/comBuf.h | 33 +++- src/ca/comQueRecv.cpp | 2 + src/ca/comQueRecv.h | 95 ++++++++++++ src/ca/comQueSend.cpp | 28 ++++ src/ca/comQueSend.h | 195 +++++++++++++++++++++++ src/ca/hostNameCache.cpp | 2 +- src/ca/hostNameCache.h | 40 +++++ src/ca/syncGroup.h | 1 - src/ca/tcpRecvWatchdog.h | 47 ++++++ src/ca/tcpSendWatchdog.h | 36 +++++ src/ca/tcpiiu.cpp | 244 ++++++++++++++++------------- src/ca/udpiiu.cpp | 73 +++++---- src/ca/udpiiu.h | 5 +- src/ca/virtualCircuit.h | 325 ++------------------------------------- 19 files changed, 807 insertions(+), 570 deletions(-) create mode 100644 src/ca/comQueRecv.h create mode 100644 src/ca/comQueSend.h create mode 100644 src/ca/hostNameCache.h create mode 100644 src/ca/tcpRecvWatchdog.h create mode 100644 src/ca/tcpSendWatchdog.h diff --git a/src/ca/CASG.cpp b/src/ca/CASG.cpp index b4b1fa7fb..0fa4fded0 100644 --- a/src/ca/CASG.cpp +++ b/src/ca/CASG.cpp @@ -111,14 +111,9 @@ int CASG::block ( double timeout ) break; } - { - // serialize access the blocking mechanism below - epicsAutoMutex autoMutex ( this->serializeBlock ); - - status = this->client.blockForEventAndEnableCallbacks ( this->sem, remaining ); - if ( status != ECA_NORMAL ) { - return status; - } + status = this->client.blockForEventAndEnableCallbacks ( this->sem, remaining ); + if ( status != ECA_NORMAL ) { + return status; } /* diff --git a/src/ca/acctst.c b/src/ca/acctst.c index 6e9f4eb40..d18eb45ae 100644 --- a/src/ca/acctst.c +++ b/src/ca/acctst.c @@ -1986,7 +1986,7 @@ void verifyOldPend () { int status; /* - * at least verify that the old ca_pend() is in the symbol table + * verify that the old ca_pend() is in the symbol table */ status = ca_pend ( 100000.0, 1 ); assert ( status = ECA_NORMAL ); diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index d30bda80b..c4c407e3c 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -12,6 +12,8 @@ #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" +#include + #include "epicsMemory.h" #include "osiProcess.h" #include "osiSigPipeIgnore.h" @@ -124,20 +126,20 @@ extern "C" void cacOnceFunc ( void * ) // // cac::cac () // -cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : +cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) : ipToAEngine ( "caIPAddrToAsciiEngine" ), pudpiiu ( 0 ), pSearchTmr ( 0 ), pRepeaterSubscribeTmr ( 0 ), tcpSmallRecvBufFreeList ( 0 ), tcpLargeRecvBufFreeList ( 0 ), + pCallbackLocker ( 0 ), notify ( notifyIn ), initializingThreadsPriority ( epicsThreadGetPrioritySelf () ), maxRecvBytesTCP ( MAX_TCP ), pndRecvCnt ( 0u ), readSeq ( 0u ), - recvThreadsPendingCount ( 0u ), - enablePreemptiveCallback ( enablePreemptiveCallbackIn ) + recvThreadsPendingCount ( 0u ) { long status; unsigned abovePriority; @@ -230,20 +232,26 @@ cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) : freeListCleanup ( this->tcpLargeRecvBufFreeList ); throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) ); } - if ( ! this->enablePreemptiveCallback ) { - this->callbackMutex.lock (); + + if ( ! enablePreemptiveCallbackIn ) { + this->pCallbackLocker = new ( std::nothrow ) callbackAutoMutex ( *this ); + if ( ! this->pCallbackLocker ) { + osiSockRelease (); + free ( this->pUserName ); + freeListCleanup ( this->tcpSmallRecvBufFreeList ); + freeListCleanup ( this->tcpLargeRecvBufFreeList ); + this->pTimerQueue->release (); + throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) ); + } } } cac::~cac () { // - // make certain that process thread isnt deleting - // tcpiiu objects at the same that this thread is + // release callback lock // - if ( ! this->enablePreemptiveCallback ) { - this->callbackMutex.unlock (); - } + delete this->pCallbackLocker; // // lock intentionally not held here so that we dont deadlock @@ -339,7 +347,7 @@ void cac::show ( unsigned level ) const this->serverTable.show ( level - 1u ); ::printf ( "\tconnection time out watchdog period %f\n", this->connTMO ); ::printf ( "\tpreemptive calback is %s\n", - this->enablePreemptiveCallback ? "enabled" : "disabled" ); + this->pCallbackLocker ? "disabled" : "enabled" ); ::printf ( "list of installed services:\n" ); this->services.show ( level - 1u ); } @@ -480,30 +488,32 @@ int cac::pendIO ( const double & timeout ) this->flushRequestPrivate (); } - { - // serialize access the blocking mechanism below - epicsAutoMutex autoMutex ( this->serializePendIO ); + while ( this->pndRecvCnt > 0 ) { + if ( remaining < CAC_SIGNIFICANT_DELAY ) { + status = ECA_TIMEOUT; + break; + } - while ( this->pndRecvCnt > 0 ) { - if ( remaining < CAC_SIGNIFICANT_DELAY ) { - status = ECA_TIMEOUT; - break; - } - if ( this->enablePreemptiveCallback ) { - this->ioDone.wait ( remaining ); - } - else { + { + // serialize access the blocking mechanism below + epicsAutoMutex autoMutex ( this->serializePendIO ); + + if ( this->pCallbackLocker ) { epicsAutoMutexRelease autoRelease ( this->callbackMutex ); this->ioDone.wait ( remaining ); } - double delay = epicsTime::getCurrent () - beg_time; - if ( delay < timeout ) { - remaining = timeout - delay; - } else { - remaining = 0.0; + this->ioDone.wait ( remaining ); } } + + double delay = epicsTime::getCurrent () - beg_time; + if ( delay < timeout ) { + remaining = timeout - delay; + } + else { + remaining = 0.0; + } } { @@ -520,11 +530,13 @@ int cac::pendIO ( const double & timeout ) int cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout ) { - if ( this->enablePreemptiveCallback ) { + epicsAutoMutex autoMutex ( this->serializeCallbackMutexUsage ); + + if ( this->pCallbackLocker ) { + epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); event.wait ( timeout ); } else { - epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); event.wait ( timeout ); } @@ -548,13 +560,13 @@ int cac::pendEvent ( const double & timeout ) { // serialize access the blocking mechanism below - epicsAutoMutex autoMutex ( this->serializePendEvent ); + epicsAutoMutex autoMutex ( this->serializeCallbackMutexUsage ); // process at least once if preemptive callback // isnt enabled - if ( ! this->enablePreemptiveCallback ) { + if ( this->pCallbackLocker ) { epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); - while ( this->recvThreadsPendingCount ) { + while ( this->recvThreadsPendingCount > 1 ) { this->noRecvThreadsPending.wait (); } } @@ -571,11 +583,11 @@ int cac::pendEvent ( const double & timeout ) } if ( delay >= CAC_SIGNIFICANT_DELAY ) { - if ( this->enablePreemptiveCallback ) { + if ( this->pCallbackLocker ) { + epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); epicsThreadSleep ( delay ); } else { - epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex ); epicsThreadSleep ( delay ); } } @@ -734,22 +746,7 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, return true; } } - - bhe * pBHE = this->beaconTable.lookup ( addr.ia ); - if ( ! pBHE ) { - pBHE = new bhe ( epicsTime (), addr.ia ); - if ( pBHE ) { - if ( this->beaconTable.add ( *pBHE ) < 0 ) { - pBHE->destroy (); - return true; - } - } - else { - return true; - } - } - - if ( ! piiu ) { + else { try { piiu = new tcpiiu ( *this, this->connTMO, *this->pTimerQueue, addr, minorVersionNumber, this->ipToAEngine, @@ -758,6 +755,19 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, return true; } this->serverTable.add ( *piiu ); + bhe * pBHE = this->beaconTable.lookup ( addr.ia ); + if ( ! pBHE ) { + pBHE = new bhe ( epicsTime (), addr.ia ); + if ( pBHE ) { + if ( this->beaconTable.add ( *pBHE ) < 0 ) { + pBHE->destroy (); + return true; + } + } + else { + return true; + } + } pBHE->registerIIU ( *piiu ); } catch ( ... ) { @@ -814,19 +824,33 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid, void cac::uninstallChannel ( nciu & chan ) { - { - epicsAutoMutex autoMutex ( this->mutex ); - nciu *pChan = this->chanTable.remove ( chan ); - assert ( pChan = &chan ); - // flush prior to taking the callback lock - this->flushIfRequired ( *chan.getPIIU() ); - chan.getPIIU()->clearChannelRequest ( chan ); - chan.getPIIU()->detachChannel ( chan ); - } + // + // dont block on the call back lock if this isnt the + // primary thread + this->udpWakeup (); - // taking this mutex guarantees that we will not delete - // a channel out from under a callback - epicsAutoMutex autoCallbackMutex ( this->callbackMutex ); + epicsAutoMutex autoMutex ( this->serializeCallbackMutexUsage ); + + if ( this->pCallbackLocker ) { + this->uninstallChannelPrivate ( chan ); + } + else { + // taking this mutex guarantees that we will not delete + // a channel out from under a callback + epicsAutoMutex autoCallbackMutex ( this->callbackMutex ); + this->uninstallChannelPrivate ( chan ); + } +} + +void cac::uninstallChannelPrivate ( nciu & chan ) +{ + epicsAutoMutex autoMutex ( this->mutex ); + nciu * pChan = this->chanTable.remove ( chan ); + assert ( pChan = &chan ); + // flush prior to taking the callback lock + this->flushIfRequired ( *chan.getPIIU() ); + chan.getPIIU()->clearChannelRequest ( chan ); + chan.getPIIU()->detachChannel ( chan ); } int cac::printf ( const char *pformat, ... ) const @@ -858,13 +882,13 @@ void cac::flushIfRequired ( netiiu & iiu ) // enable / disable of call back preemption must occur here // because the tcpiiu might disconnect while waiting and its // pointer to this cac might become invalid - if ( this->enablePreemptiveCallback ) { - iiu.blockUntilSendBacklogIsReasonable ( 0, this->mutex ); - } - else { + if ( this->pCallbackLocker ) { iiu.blockUntilSendBacklogIsReasonable ( &this->callbackMutex, this->mutex ); } + else { + iiu.blockUntilSendBacklogIsReasonable ( 0, this->mutex ); + } } } else { @@ -919,13 +943,13 @@ cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type, void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id ) { if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) && - this->enablePreemptiveCallback ) { - // wait for any IO callbacks in progress to complete - // prior to destroying the IO object - epicsAutoMutex autoMutex ( this->callbackMutex ); + this->pCallbackLocker ) { this->ioCancelPrivate ( chan, id ); } else { + // wait for any IO callbacks in progress to complete + // prior to destroying the IO object + epicsAutoMutex autoMutex ( this->callbackMutex ); this->ioCancelPrivate ( chan, id ); } } @@ -1172,13 +1196,13 @@ void cac::disconnectAllIO ( nciu & chan, bool enableCallbacks ) void cac::destroyAllIO ( nciu & chan ) { if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) && - this->enablePreemptiveCallback ) { - // force any callbacks in progress to complete - // before deleting the IO - epicsAutoMutex autoMutex ( this->callbackMutex ); + this->pCallbackLocker ) { this->privateDestroyAllIO ( chan ); } else { + // force any callbacks in progress to complete + // before deleting the IO + epicsAutoMutex autoMutex ( this->callbackMutex ); this->privateDestroyAllIO ( chan ); } } @@ -1692,14 +1716,14 @@ void cac::selfTest () const void cac::notifyNewFD ( SOCKET sock ) const { - if ( ! this->enablePreemptiveCallback ) { + if ( this->pCallbackLocker ) { this->notify.fdWasCreated ( sock ); } } void cac::notifyDestroyFD ( SOCKET sock ) const { - if ( ! this->enablePreemptiveCallback ) { + if ( this->pCallbackLocker ) { this->notify.fdWasDestroyed ( sock ); } } @@ -1739,10 +1763,10 @@ void cac::uninstallIIU ( tcpiiu & iiu ) this->iiuUninstal.signal(); } -void cac::preemptiveCallbackLock() +void cac::preemptiveCallbackLock () { // the count must be incremented prior to taking the lock - if ( ! this->enablePreemptiveCallback ) { + { epicsAutoMutex autoMutex ( this->mutex ); assert ( this->recvThreadsPendingCount < UINT_MAX ); this->recvThreadsPendingCount++; @@ -1750,25 +1774,26 @@ void cac::preemptiveCallbackLock() this->callbackMutex.lock (); } -void cac::preemptiveCallbackUnlock() +void cac::preemptiveCallbackUnlock () { this->callbackMutex.unlock (); - if ( ! this->enablePreemptiveCallback ) { - bool signalRequired; - { - epicsAutoMutex autoMutex ( this->mutex ); - assert ( this->recvThreadsPendingCount > 0 ); - this->recvThreadsPendingCount--; - if ( this->recvThreadsPendingCount == 0u ) { + bool signalRequired; + { + epicsAutoMutex autoMutex ( this->mutex ); + assert ( this->recvThreadsPendingCount > 0 ); + this->recvThreadsPendingCount--; + unsigned noThreadsWaiting; + if ( this->pCallbackLocker ) { + if ( this->recvThreadsPendingCount == 1u ) { signalRequired = true; } else { signalRequired = false; } } - if ( signalRequired ) { - this->noRecvThreadsPending.signal (); - } + } + if ( signalRequired ) { + this->noRecvThreadsPending.signal (); } } @@ -1789,3 +1814,10 @@ double cac::beaconPeriod ( const nciu & chan ) const return - DBL_MAX; } +void cac::udpWakeup () +{ + epicsAutoMutex locker ( this->mutex ); + if ( this->pudpiiu ) { + this->pudpiiu->wakeupMsg (); + } +} diff --git a/src/ca/cac.h b/src/ca/cac.h index a1e7c46e2..48e0db0c0 100644 --- a/src/ca/cac.h +++ b/src/ca/cac.h @@ -141,6 +141,7 @@ public: void uninstallIIU ( tcpiiu &iiu ); bool preemptiveCallbackEnable () const; double beaconPeriod ( const nciu & chan ) const; + void udpWakeup (); private: ipAddrToAsciiEngine ipToAEngine; @@ -169,27 +170,28 @@ private: mutable epicsMutex mutex; epicsMutex callbackMutex; epicsMutex serializePendIO; - epicsMutex serializePendEvent; + epicsMutex serializeCallbackMutexUsage; epicsEvent ioDone; epicsEvent noRecvThreadsPending; epicsEvent iiuUninstal; - epicsTimerQueueActive *pTimerQueue; - char *pUserName; - class udpiiu *pudpiiu; - class searchTimer *pSearchTmr; + epicsTimerQueueActive * pTimerQueue; + char * pUserName; + class udpiiu * pudpiiu; + class searchTimer * pSearchTmr; class repeaterSubscribeTimer - *pRepeaterSubscribeTmr; - void *tcpSmallRecvBufFreeList; - void *tcpLargeRecvBufFreeList; + * pRepeaterSubscribeTmr; + void * tcpSmallRecvBufFreeList; + void * tcpLargeRecvBufFreeList; + class callbackAutoMutex * pCallbackLocker; cacNotify & notify; unsigned initializingThreadsPriority; unsigned maxRecvBytesTCP; unsigned pndRecvCnt; unsigned readSeq; unsigned recvThreadsPendingCount; - bool enablePreemptiveCallback; void flushRequestPrivate (); + void uninstallChannelPrivate ( nciu & ); void run (); bool setupUDP (); void connectAllIO ( nciu &chan ); @@ -348,7 +350,7 @@ inline bool cac::ioComplete () const inline bool cac::preemptiveCallbackEnable () const { - return this->enablePreemptiveCallback; + return ! this->pCallbackLocker; } #endif // ifdef cach diff --git a/src/ca/comBuf.cpp b/src/ca/comBuf.cpp index c56b88447..11f424907 100644 --- a/src/ca/comBuf.cpp +++ b/src/ca/comBuf.cpp @@ -23,8 +23,8 @@ bool comBuf::flushToWire ( wireSendAdapter &wire ) { unsigned occupied = this->occupiedBytes (); while ( occupied ) { - unsigned nBytes = wire.sendBytes ( &this->buf[this->nextReadIndex], - occupied ); + unsigned nBytes = wire.sendBytes ( + &this->buf[this->nextReadIndex], occupied ); if ( nBytes == 0u ) { return false; } diff --git a/src/ca/comBuf.h b/src/ca/comBuf.h index 15866c3e2..1ad228438 100644 --- a/src/ca/comBuf.h +++ b/src/ca/comBuf.h @@ -33,7 +33,6 @@ class wireSendAdapter { public: virtual unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ) = 0; - virtual void forcedShutdown () = 0; }; class wireRecvAdapter { @@ -48,6 +47,7 @@ public: void destroy (); unsigned unoccupiedBytes () const; unsigned occupiedBytes () const; + unsigned uncommittedBytes () const; static unsigned capacityBytes (); void clear (); unsigned copyInBytes ( const void *pBuf, unsigned nBytes ); @@ -61,6 +61,8 @@ public: unsigned copyIn ( const epicsFloat32 *pValue, unsigned nElem ); unsigned copyIn ( const epicsFloat64 *pValue, unsigned nElem ); unsigned copyIn ( const epicsOldString *pValue, unsigned nElem ); + void commitIncomming (); + void clearUncommittedIncomming (); bool copyInAllBytes ( const void *pBuf, unsigned nBytes ); unsigned copyOutBytes ( void *pBuf, unsigned nBytes ); bool copyOutAllBytes ( void *pBuf, unsigned nBytes ); @@ -84,6 +86,7 @@ public: protected: ~comBuf (); private: + unsigned commitIndex; unsigned nextWriteIndex; unsigned nextReadIndex; epicsUInt8 buf [ comBufSize ]; @@ -93,7 +96,8 @@ private: static epicsMutex freeListMutex; }; -inline comBuf::comBuf () : nextWriteIndex ( 0u ), nextReadIndex ( 0u ) +inline comBuf::comBuf () : nextWriteIndex ( 0u ), + nextReadIndex ( 0u ), commitIndex ( 0u ) { } @@ -108,6 +112,7 @@ inline void comBuf::destroy () inline void comBuf::clear () { + this->commitIndex = 0u; this->nextWriteIndex = 0u; this->nextReadIndex = 0u; } @@ -131,8 +136,12 @@ inline unsigned comBuf::unoccupiedBytes () const inline unsigned comBuf::occupiedBytes () const { - // assert (this->nextWriteIndex >= this->nextReadIndex); - return this->nextWriteIndex - this->nextReadIndex; + return this->commitIndex - this->nextReadIndex; +} + +inline unsigned comBuf::uncommittedBytes () const +{ + return this->nextWriteIndex - this->commitIndex; } inline bool comBuf::copyInAllBytes ( const void *pBuf, unsigned nBytes ) @@ -158,15 +167,15 @@ inline unsigned comBuf::copyInBytes ( const void *pBuf, unsigned nBytes ) return nBytes; } -inline unsigned comBuf::copyIn ( comBuf &bufIn ) +inline unsigned comBuf::copyIn ( comBuf & bufIn ) { unsigned nBytes = this->copyInBytes ( &bufIn.buf[bufIn.nextReadIndex], - bufIn.nextWriteIndex - bufIn.nextReadIndex ); + bufIn.commitIndex - bufIn.nextReadIndex ); bufIn.nextReadIndex += nBytes; return nBytes; } -inline bool comBuf::copyOutAllBytes ( void *pBuf, unsigned nBytes ) +inline bool comBuf::copyOutAllBytes ( void * pBuf, unsigned nBytes ) { if ( nBytes <= this->occupiedBytes () ) { memcpy ( pBuf, &this->buf[this->nextReadIndex], nBytes); @@ -371,4 +380,14 @@ inline comBuf::statusPopUInt32 comBuf::popUInt32 () return tmp; } +inline void comBuf::commitIncomming () +{ + this->commitIndex = this->nextWriteIndex; +} + +inline void comBuf::clearUncommittedIncomming () +{ + this->nextWriteIndex = this->commitIndex; +} + #endif // ifndef comBufh diff --git a/src/ca/comQueRecv.cpp b/src/ca/comQueRecv.cpp index def465bed..a88c6cda8 100644 --- a/src/ca/comQueRecv.cpp +++ b/src/ca/comQueRecv.cpp @@ -84,10 +84,12 @@ unsigned comQueRecv::removeBytes ( unsigned nBytes ) void comQueRecv::pushLastComBufReceived ( comBuf & bufIn ) { + bufIn.commitIncomming (); comBuf * pComBuf = this->bufs.last (); if ( pComBuf ) { if ( pComBuf->unoccupiedBytes() ) { this->nBytesPending += pComBuf->copyIn ( bufIn ); + pComBuf->commitIncomming (); } } unsigned bufBytes = bufIn.occupiedBytes(); diff --git a/src/ca/comQueRecv.h b/src/ca/comQueRecv.h new file mode 100644 index 000000000..95555caad --- /dev/null +++ b/src/ca/comQueRecv.h @@ -0,0 +1,95 @@ + +/* + * $Id$ + * + * + * L O S A L A M O S + * Los Alamos National Laboratory + * Los Alamos, New Mexico 87545 + * + * Copyright, 1986, The Regents of the University of California. + * + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + */ + +#ifndef comQueRecvh +#define comQueRecvh + +class comQueRecv { +public: + comQueRecv (); + ~comQueRecv (); + unsigned occupiedBytes () const; + unsigned copyOutBytes ( epicsInt8 *pBuf, unsigned nBytes ); + unsigned removeBytes ( unsigned nBytes ); + void pushLastComBufReceived ( comBuf & ); + void clear (); + epicsInt8 popInt8 (); + epicsUInt8 popUInt8 (); + epicsInt16 popInt16 (); + epicsUInt16 popUInt16 (); + epicsInt32 popInt32 (); + epicsUInt32 popUInt32 (); + epicsFloat32 popFloat32 (); + epicsFloat64 popFloat64 (); + void popString ( epicsOldString * ); + class insufficentBytesAvailable {}; +private: + tsDLList < comBuf > bufs; + unsigned nBytesPending; +}; + +inline unsigned comQueRecv::occupiedBytes () const +{ + return this->nBytesPending; +} + +inline epicsInt8 comQueRecv::popInt8 () +{ + return static_cast < epicsInt8 > ( this->popUInt8() ); +} + +inline epicsInt16 comQueRecv::popInt16 () +{ + epicsInt16 tmp; + tmp = this->popInt8() << 8u; + tmp |= this->popInt8() << 0u; + return tmp; +} + +inline epicsInt32 comQueRecv::popInt32 () +{ + epicsInt32 tmp ; + tmp |= this->popInt8() << 24u; + tmp |= this->popInt8() << 16u; + tmp |= this->popInt8() << 8u; + tmp |= this->popInt8() << 0u; + return tmp; +} + +inline epicsFloat32 comQueRecv::popFloat32 () +{ + epicsFloat32 tmp; + epicsUInt8 wire[ sizeof ( tmp ) ]; + for ( unsigned i = 0u; i < sizeof ( tmp ); i++ ) { + wire[i] = this->popUInt8 (); + } + osiConvertFromWireFormat ( tmp, wire ); + return tmp; +} + +inline epicsFloat64 comQueRecv::popFloat64 () +{ + epicsFloat64 tmp; + epicsUInt8 wire[ sizeof ( tmp ) ]; + for ( unsigned i = 0u; i < sizeof ( tmp ); i++ ) { + wire[i] = this->popUInt8 (); + } + osiConvertFromWireFormat ( tmp, wire ); + return tmp; +} + +#endif // ifndef comQueRecvh \ No newline at end of file diff --git a/src/ca/comQueSend.cpp b/src/ca/comQueSend.cpp index 549c19a91..db56f5919 100644 --- a/src/ca/comQueSend.cpp +++ b/src/ca/comQueSend.cpp @@ -87,6 +87,8 @@ void comQueSend::clear () this->nBytesPending -= pBuf->occupiedBytes (); pBuf->destroy (); } + this->pFirstUncommited = tsDLIterBD < comBuf > (); + assert ( this->nBytesPending == 0 ); } void comQueSend::copy_dbr_string ( const void *pValue, unsigned nElem ) @@ -161,3 +163,29 @@ const comQueSend::copyFunc_t comQueSend::dbrCopyVector [39] = { 0 // DBR_CLASS_NAME }; +comBuf * comQueSend::popNextComBufToSend () +{ + comBuf *pBuf = this->bufs.get (); + if ( pBuf ) { + unsigned nBytesThisBuf = pBuf->occupiedBytes (); + if ( nBytesThisBuf ) { + assert ( this->nBytesPending >= nBytesThisBuf ); + this->nBytesPending -= nBytesThisBuf; + } + else { + this->bufs.push ( *pBuf ); + pBuf = 0; + } + } + else { + assert ( this->nBytesPending == 0u ); + } + return pBuf; +} + +void comQueRecv::popString ( epicsOldString *pStr ) +{ + for ( unsigned i = 0u; i < sizeof ( *pStr ); i++ ) { + pStr[0][i] = this->popInt8 (); + } +} diff --git a/src/ca/comQueSend.h b/src/ca/comQueSend.h new file mode 100644 index 000000000..a95c275c5 --- /dev/null +++ b/src/ca/comQueSend.h @@ -0,0 +1,195 @@ + + +/* + * $Id$ + * + * + * L O S A L A M O S + * Los Alamos National Laboratory + * Los Alamos, New Mexico 87545 + * + * Copyright, 1986, The Regents of the University of California. + * + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + */ + +#ifndef comQueSendh +#define comQueSendh + +#include + +#include "tsDLList.h" +#include "comBuf.h" + +// +// Notes. +// o calling popNextComBufToSend() will clear +// any uncommitted bytes +// +class comQueSend { +public: + comQueSend ( wireSendAdapter & ); + ~comQueSend (); + void clear (); + void beginMsg (); + void commitMsg (); + unsigned occupiedBytes () const; + bool flushEarlyThreshold ( unsigned nBytesThisMsg ) const; + bool flushBlockThreshold ( unsigned nBytesThisMsg ) const; + bool dbr_type_ok ( unsigned type ); + void pushUInt16 ( const ca_uint16_t value ); + void pushUInt32 ( const ca_uint32_t value ); + void pushFloat32 ( const ca_float32_t value ); + void pushString ( const char *pVal, unsigned nChar ); + void push_dbr_type ( unsigned type, const void *pVal, unsigned nElem ); + comBuf * popNextComBufToSend (); +private: + tsDLList < comBuf > bufs; + tsDLIterBD < comBuf > pFirstUncommited; + wireSendAdapter & wire; + unsigned nBytesPending; + void copy_dbr_string ( const void *pValue, unsigned nElem ); + void copy_dbr_short ( const void *pValue, unsigned nElem ); + void copy_dbr_float ( const void *pValue, unsigned nElem ); + void copy_dbr_char ( const void *pValue, unsigned nElem ); + void copy_dbr_long ( const void *pValue, unsigned nElem ); + void copy_dbr_double ( const void *pValue, unsigned nElem ); + typedef void ( comQueSend::*copyFunc_t ) ( + const void *pValue, unsigned nElem ); + static const copyFunc_t dbrCopyVector [39]; + + // + // visual C++ version 6.0 does not allow out of + // class member template function definition + // + template < class T > + inline void copyIn ( const T *pVal, unsigned nElem ) + { + comBuf * pLastBuf = this->bufs.last (); + unsigned nCopied; + if ( pLastBuf ) { + nCopied = pLastBuf->copyIn ( pVal, nElem ); + } + else { + nCopied = 0u; + } + while ( nElem > nCopied ) { + comBuf * pComBuf = new ( std::nothrow ) comBuf; + if ( ! pComBuf ) { + throw std::bad_alloc (); + } + unsigned nNew = pComBuf->copyIn ( &pVal[nCopied], nElem - nCopied ); + nCopied += nNew; + this->bufs.add ( *pComBuf ); + if ( ! this->pFirstUncommited.valid() ) { + this->pFirstUncommited = this->bufs.lastIter (); + } + } + } + + // + // visual C++ version 6.0 does not allow out of + // class member template function definition + // + template < class T > + inline void copyIn ( const T &val ) + { + comBuf *pComBuf = this->bufs.last (); + if ( pComBuf ) { + if ( pComBuf->copyIn ( &val, 1u ) >= 1u ) { + return; + } + } + pComBuf = new ( std::nothrow ) comBuf; + if ( ! pComBuf ) { + throw std::bad_alloc (); + } + assert ( pComBuf->copyIn ( &val, 1u ) == 1u ); + this->bufs.add ( *pComBuf ); + if ( ! this->pFirstUncommited.valid() ) { + this->pFirstUncommited = this->bufs.lastIter (); + } + return; + } +}; + +inline bool comQueSend::dbr_type_ok ( unsigned type ) +{ + if ( type >= ( sizeof ( this->dbrCopyVector ) / sizeof ( this->dbrCopyVector[0] ) ) ) { + return false; + } + if ( ! this->dbrCopyVector [type] ) { + return false; + } + return true; +} + +inline void comQueSend::pushUInt16 ( const ca_uint16_t value ) +{ + this->copyIn ( value ); +} + +inline void comQueSend::pushUInt32 ( const ca_uint32_t value ) +{ + this->copyIn ( value ); +} + +inline void comQueSend::pushFloat32 ( const ca_float32_t value ) +{ + this->copyIn ( value ); +} + +inline void comQueSend::pushString ( const char *pVal, unsigned nChar ) +{ + this->copyIn ( pVal, nChar ); +} + +// it is assumed that dbr_type_ok() was called prior to calling this routine +// to check the type code +inline void comQueSend::push_dbr_type ( unsigned type, const void *pVal, unsigned nElem ) +{ + ( this->*dbrCopyVector [type] ) ( pVal, nElem ); +} + +inline unsigned comQueSend::occupiedBytes () const +{ + return this->nBytesPending; +} + +inline bool comQueSend::flushBlockThreshold ( unsigned nBytesThisMsg ) const +{ + return ( this->nBytesPending + nBytesThisMsg > 16 * comBuf::capacityBytes () ); +} + +inline bool comQueSend::flushEarlyThreshold ( unsigned nBytesThisMsg ) const +{ + return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::capacityBytes () ); +} + +inline void comQueSend::beginMsg () +{ + while ( this->pFirstUncommited.valid() ) { + tsDLIterBD < comBuf > next = this->pFirstUncommited; + next++; + this->pFirstUncommited->clearUncommittedIncomming (); + if ( this->pFirstUncommited->occupiedBytes() == 0u ) { + this->bufs.remove ( *this->pFirstUncommited ); + } + this->pFirstUncommited = next; + } + this->pFirstUncommited = this->bufs.lastIter (); +} + +inline void comQueSend::commitMsg () +{ + while ( this->pFirstUncommited.valid() ) { + this->nBytesPending += this->pFirstUncommited->uncommittedBytes (); + this->pFirstUncommited->commitIncomming (); + this->pFirstUncommited++; + } +} + +#endif // ifndef comQueSendh diff --git a/src/ca/hostNameCache.cpp b/src/ca/hostNameCache.cpp index 41945a09a..ee9bde5ef 100644 --- a/src/ca/hostNameCache.cpp +++ b/src/ca/hostNameCache.cpp @@ -17,7 +17,7 @@ #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" #include "iocinf.h" -#include "virtualCircuit.h" +#include "hostNameCache.h" tsFreeList < hostNameCache, 16 > hostNameCache::freeList; epicsMutex hostNameCache::freeListMutex; diff --git a/src/ca/hostNameCache.h b/src/ca/hostNameCache.h new file mode 100644 index 000000000..dc39b0ecc --- /dev/null +++ b/src/ca/hostNameCache.h @@ -0,0 +1,40 @@ + +/* + * $Id$ + * + * + * L O S A L A M O S + * Los Alamos National Laboratory + * Los Alamos, New Mexico 87545 + * + * Copyright, 1986, The Regents of the University of California. + * + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + */ + +#ifndef hostNameCacheh +#define hostNameCacheh + +#include "ipAddrToAsciiAsynchronous.h" +#include "tsFreeList.h" + +class hostNameCache : public ipAddrToAsciiAsynchronous { +public: + hostNameCache ( const osiSockAddr &addr, ipAddrToAsciiEngine &engine ); + ~hostNameCache (); + void destroy (); + void ioCompletionNotify ( const char *pHostName ); + void hostName ( char *pBuf, unsigned bufLength ) const; + void * operator new ( size_t size ); + void operator delete ( void *pCadaver, size_t size ); +private: + bool ioComplete; + char hostNameBuf [128]; + static tsFreeList < class hostNameCache, 16 > freeList; + static epicsMutex freeListMutex; +}; + +#endif // #ifndef hostNameCacheh diff --git a/src/ca/syncGroup.h b/src/ca/syncGroup.h index 70cb73df1..7ab756687 100644 --- a/src/ca/syncGroup.h +++ b/src/ca/syncGroup.h @@ -130,7 +130,6 @@ protected: private: tsDLList < syncGroupNotify > ioList; epicsMutex mutable mutex; - epicsMutex serializeBlock; epicsEvent sem; oldCAC & client; unsigned magic; diff --git a/src/ca/tcpRecvWatchdog.h b/src/ca/tcpRecvWatchdog.h new file mode 100644 index 000000000..98b1b95fe --- /dev/null +++ b/src/ca/tcpRecvWatchdog.h @@ -0,0 +1,47 @@ + +/* + * $Id$ + * + * + * L O S A L A M O S + * Los Alamos National Laboratory + * Los Alamos, New Mexico 87545 + * + * Copyright, 1986, The Regents of the University of California. + * + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + */ + +#ifndef tcpRecvWatchdogh +#define tcpRecvWatchdogh + +#include "epicsTimer.h" + +class tcpiiu; + +class tcpRecvWatchdog : private epicsTimerNotify { +public: + tcpRecvWatchdog ( tcpiiu &, double periodIn, epicsTimerQueue & ); + virtual ~tcpRecvWatchdog (); + void rescheduleRecvTimer (); + void sendBacklogProgressNotify (); + void messageArrivalNotify (); + void beaconArrivalNotify (); + void beaconAnomalyNotify (); + void connectNotify (); + void cancel (); + void show ( unsigned level ) const; +private: + const double period; + epicsTimer & timer; + tcpiiu &iiu; + bool responsePending; + bool beaconAnomaly; + expireStatus expire ( const epicsTime & currentTime ); +}; + +#endif // #ifndef tcpRecvWatchdogh + diff --git a/src/ca/tcpSendWatchdog.h b/src/ca/tcpSendWatchdog.h new file mode 100644 index 000000000..b0e766079 --- /dev/null +++ b/src/ca/tcpSendWatchdog.h @@ -0,0 +1,36 @@ + +/* + * $Id$ + * + * + * L O S A L A M O S + * Los Alamos National Laboratory + * Los Alamos, New Mexico 87545 + * + * Copyright, 1986, The Regents of the University of California. + * + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + */ + +#ifndef tcpSendWatchdogh +#define tcpSendWatchdogh + +#include "epicsTimer.h" + +class tcpSendWatchdog : private epicsTimerNotify { +public: + tcpSendWatchdog ( tcpiiu &, double periodIn, epicsTimerQueue & queueIn ); + virtual ~tcpSendWatchdog (); + void start (); + void cancel (); +private: + const double period; + epicsTimer & timer; + tcpiiu & iiu; + expireStatus expire ( const epicsTime & currentTime ); +}; + +#endif // #ifndef tcpSendWatchdog diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index 4998315ab..b61b436bc 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -19,6 +19,7 @@ #include "cac.h" #include "netiiu.h" #include "msgForMultiplyDefinedPV.h" +#include "hostNameCache.h" #define epicsExportSharedSymbols #include "net_convert.h" @@ -266,44 +267,18 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) // file manager call backs works correctly. This does not // appear to impact performance. // - // We also use select() here because shutdown() does not - // unblock a thread in recv() on WIN32 and probably also on - // vxWorks. This is a problem even if preemptive callbacks - // are enabled. - // - while ( true ) { - fd_set recvInterest; - struct timeval tmo; - tmo.tv_sec = 5; // seconds - tmo.tv_usec = 0; // micro seconds - FD_ZERO ( & recvInterest ); - FD_SET ( piiu->sock, & recvInterest ); - int status = select ( piiu->sock + 1, - & recvInterest, 0, 0, & tmo ); - if ( piiu->state != iiu_connected ) { - break; - } - if ( status < 0 ) { - int localErrno = SOCKERRNO; - if ( localErrno == SOCK_EINTR ) { - continue; - } - else if ( localErrno == SOCK_EBADF ) { - piiu->state = iiu_disconnected; - break; - } - else { - errlogPrintf ( "Select error was %s\n", - SOCKERRSTR ( localErrno ) ); - epicsThreadSleep ( 1.0 ); - continue; - } - } - else if ( status == 1 ) { + unsigned nBytesIn; + if ( piiu->pCAC()->preemptiveCallbackEnable() ) { + nBytesIn = pComBuf->fillFromWire ( *piiu ); + if ( nBytesIn == 0u ) { break; } } - + else { + char buf; + ::recv ( piiu->sock, &buf, 1, MSG_PEEK ); + } + if ( piiu->state != iiu_connected ) { break; } @@ -311,16 +286,16 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) // only one recv thread at a time may call callbacks callbackAutoMutex autoMutex ( *piiu->pCAC() ); - osiSockIoctl_t bytesPending = 0; - do { - unsigned nBytesIn = pComBuf->fillFromWire ( *piiu ); + if ( ! piiu->pCAC()->preemptiveCallbackEnable() ) { + nBytesIn = pComBuf->fillFromWire ( *piiu ); if ( nBytesIn == 0u ) { // outer loop checks to see if state is connected // ( properly set by fillFromWire() ) break; } + } - piiu->recvQue.pushLastComBufReceived ( *pComBuf ); + while ( true ) { if ( nBytesIn == pComBuf->capacityBytes () ) { if ( piiu->contigRecvMsgCount >= @@ -337,6 +312,9 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) } piiu->unacknowledgedSendBytes = 0u; + piiu->recvQue.pushLastComBufReceived ( *pComBuf ); + pComBuf = 0; + // reschedule connection activity watchdog // but dont hold the lock for fear of deadlocking // because cancel is blocking for the completion @@ -344,20 +322,34 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) piiu->recvDog.messageArrivalNotify (); // execute receive labor - piiu->processIncoming (); + bool noProtocolViolation = piiu->processIncoming (); + if ( ! noProtocolViolation ) { + piiu->state = iiu_disconnected; + break; + } // allocate a new com buf pComBuf = new ( std::nothrow ) comBuf; nBytesIn = 0u; + if ( ! pComBuf ) { + break; + } { int status; + osiSockIoctl_t bytesPending = 0; status = socket_ioctl ( piiu->sock, FIONREAD, & bytesPending ); - if ( status ) { - bytesPending = 0u; + if ( status || bytesPending == 0u ) { + break; + } + nBytesIn = pComBuf->fillFromWire ( *piiu ); + if ( nBytesIn == 0u ) { + // outer loop checks to see if state is connected + // ( properly set by fillFromWire() ) + break; } } - } while ( bytesPending && pComBuf ); + } } if ( pComBuf ) { @@ -367,7 +359,6 @@ extern "C" void cacRecvThreadTCP ( void *pParam ) { callbackAutoMutex autoMutex ( *piiu->pCAC() ); piiu->pCAC()->uninstallIIU ( *piiu ); - piiu->pCAC()->notifyDestroyFD ( piiu->sock ); } piiu->destroy (); } @@ -556,60 +547,69 @@ void tcpiiu::connect () } } -/* - * tcpiiu::cleanShutdown () - */ -void tcpiiu::cleanShutdown () -{ - epicsAutoMutex autoMutex ( this->pCAC()->mutexRef() ); - if ( this->state == iiu_connected || this->state == iiu_connecting ) { - int status; - /* - * on winsock and probably vxWorks shutdown() does not - * unblock a thread in recv() so we use close and introduce - * some complexity because we must unregister the fd early - */ - status = shutdown ( this->sock, SD_BOTH ); - if ( status ) { - errlogPrintf ("CAC TCP socket shutdown error was %s\n", - SOCKERRSTR (SOCKERRNO) ); - status = socket_close ( this->sock ); - if ( status ) { - errlogPrintf ("CAC TCP socket close error was %s\n", - SOCKERRSTR (SOCKERRNO) ); - } - else { - this->sockCloseCompleted = true; - this->state = iiu_disconnected; - } - } - else { - this->state = iiu_disconnected; - } - this->sendThreadFlushEvent.signal (); - } -} - -/* - * tcpiiu::forcedShutdown () - */ void tcpiiu::forcedShutdown () { - epicsAutoMutex autoMutex ( this->pCAC()->mutexRef() ); + // generate some NOOP UDP traffic so that ca_pend_event() + // will get called in preemptive callback disabled + // applications, and therefore the callback lock below + // will not block + this->pCAC()->udpWakeup (); + callbackAutoMutex autoMutexCB ( *this->pCAC() ); + epicsAutoMutex autoMutexCAC ( this->pCAC()->mutexRef() ); + this->shutdown ( true ); +} - if ( this->state != iiu_disconnected || this->state == iiu_connecting ) { - // force abortive shutdown sequence (discard outstanding sends - // and receives) - struct linger tmpLinger; - tmpLinger.l_onoff = true; - tmpLinger.l_linger = 0u; - int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER, - reinterpret_cast ( &tmpLinger ), sizeof (tmpLinger) ); - if ( status != 0 ) { - errlogPrintf ( "CAC TCP socket linger set error was %s\n", +void tcpiiu::cleanShutdown () +{ + // generate some NOOP UDP traffic so that ca_pend_event() + // will get called in preemptive callback disabled + // applications, and therefore the callback lock below + // will not block + this->pCAC()->udpWakeup (); + callbackAutoMutex autoMutexCB ( *this->pCAC() ); + epicsAutoMutex autoMutexCAC ( this->pCAC()->mutexRef() ); + this->shutdown ( false ); +} + +// +// tcpiiu::shutdown () +// +// caller must hold callback mutex and also primary cac mutex +// when calling this routine +// +void tcpiiu::shutdown ( bool discardPendingMessages ) +{ + if ( ! this->sockCloseCompleted ) { + this->state = iiu_disconnected; + this->sockCloseCompleted = true; + + this->pCAC()->notifyDestroyFD ( this->sock ); + + if ( discardPendingMessages ) { + // force abortive shutdown sequence + // (discard outstanding sends and receives) + struct linger tmpLinger; + tmpLinger.l_onoff = true; + tmpLinger.l_linger = 0u; + int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER, + reinterpret_cast ( &tmpLinger ), sizeof (tmpLinger) ); + if ( status != 0 ) { + errlogPrintf ( "CAC TCP socket linger set error was %s\n", + SOCKERRSTR (SOCKERRNO) ); + } + } + + // + // on winsock and probably vxWorks shutdown() does not + // unblock a thread in recv() so we use close and introduce + // some complexity because we must unregister the fd early + // + int status = socket_close ( this->sock ); + if ( status ) { + errlogPrintf ("CAC TCP socket close error was %s\n", SOCKERRSTR (SOCKERRNO) ); } - this->cleanShutdown (); + this->sendThreadFlushEvent.signal (); } } @@ -768,7 +768,7 @@ bool tcpiiu::setEchoRequestPending () // // tcpiiu::processIncoming() // -void tcpiiu::processIncoming () +bool tcpiiu::processIncoming () { while ( true ) { @@ -781,7 +781,7 @@ void tcpiiu::processIncoming () if ( ! this->oldMsgHeaderAvailable ) { if ( nBytes < sizeof ( caHdr ) ) { this->flushIfRecvProcessRequested (); - return; + return true; } this->curMsg.m_cmmd = this->recvQue.popUInt16 (); this->curMsg.m_postsize = this->recvQue.popUInt16 (); @@ -796,7 +796,7 @@ void tcpiiu::processIncoming () sizeof ( this->curMsg.m_postsize ) + sizeof ( this->curMsg.m_count ); if ( this->recvQue.occupiedBytes () < annexSize ) { this->flushIfRecvProcessRequested (); - return; + return true; } this->curMsg.m_postsize = this->recvQue.popUInt32 (); this->curMsg.m_count = this->recvQue.popUInt32 (); @@ -840,14 +840,13 @@ void tcpiiu::processIncoming () this->curMsg.m_postsize - this->curDataBytes ); if ( this->curDataBytes < this->curMsg.m_postsize ) { this->flushIfRecvProcessRequested (); - return; + return true; } } bool msgOK = this->pCAC()->executeResponse ( *this, this->curMsg, this->pCurData ); if ( ! msgOK ) { - this->cleanShutdown (); - return; + return false; } } else { @@ -862,7 +861,7 @@ void tcpiiu::processIncoming () this->curMsg.m_postsize - this->curDataBytes ); if ( this->curDataBytes < this->curMsg.m_postsize ) { this->flushIfRecvProcessRequested (); - return; + return true; } } @@ -877,6 +876,7 @@ inline void insertRequestHeader ( ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok ) { + sendQue.beginMsg (); if ( payloadSize < 0xffff && nElem < 0xffff ) { sendQue.pushUInt16 ( request ); sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( payloadSize ) ); @@ -920,6 +920,7 @@ void tcpiiu::hostNameSetRequest () epicsAutoMutex locker ( this->pCAC()->mutexRef() ); + this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_HOST_NAME ); // cmd this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( postSize ) ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType @@ -928,6 +929,7 @@ void tcpiiu::hostNameSetRequest () this->sendQue.pushUInt32 ( 0u ); // available this->sendQue.pushString ( pName, size ); this->sendQue.pushString ( nillBytes, postSize - size ); + this->sendQue.commitMsg (); } /* @@ -949,6 +951,7 @@ void tcpiiu::userNameSetRequest () } epicsAutoMutex locker ( this->pCAC()->mutexRef() ); + this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_CLIENT_NAME ); // cmd this->sendQue.pushUInt16 ( postSize ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType @@ -957,6 +960,7 @@ void tcpiiu::userNameSetRequest () this->sendQue.pushUInt32 ( 0u ); // available this->sendQue.pushString ( pName, size ); this->sendQue.pushString ( nillBytes, postSize - size ); + this->sendQue.commitMsg (); } void tcpiiu::disableFlowControlRequest () @@ -966,13 +970,14 @@ void tcpiiu::disableFlowControlRequest () } epicsAutoMutex locker ( this->pCAC()->mutexRef() ); - + this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_ON ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType this->sendQue.pushUInt16 ( 0u ); // count this->sendQue.pushUInt32 ( 0u ); // cid this->sendQue.pushUInt32 ( 0u ); // available + this->sendQue.commitMsg (); } void tcpiiu::enableFlowControlRequest () @@ -982,13 +987,14 @@ void tcpiiu::enableFlowControlRequest () } epicsAutoMutex locker ( this->pCAC()->mutexRef() ); - + this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_OFF ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType this->sendQue.pushUInt16 ( 0u ); // count this->sendQue.pushUInt32 ( 0u ); // cid this->sendQue.pushUInt32 ( 0u ); // available + this->sendQue.commitMsg (); } void tcpiiu::versionMessage ( const cacChannel::priLev & priority ) @@ -1000,13 +1006,14 @@ void tcpiiu::versionMessage ( const cacChannel::priLev & priority ) } epicsAutoMutex locker ( this->pCAC()->mutexRef() ); - + this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_VERSION ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize ( old possize field ) this->sendQue.pushUInt16 ( priority ); // old dataType field this->sendQue.pushUInt16 ( CA_MINOR_PROTOCOL_REVISION ); // old count field this->sendQue.pushUInt32 ( 0u ); // ( old cid field ) this->sendQue.pushUInt32 ( 0u ); // ( old available field ) + this->sendQue.commitMsg (); } void tcpiiu::echoRequest () @@ -1016,13 +1023,14 @@ void tcpiiu::echoRequest () } epicsAutoMutex locker ( this->pCAC()->mutexRef() ); - + this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_ECHO ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType this->sendQue.pushUInt16 ( 0u ); // count this->sendQue.pushUInt32 ( 0u ); // cid this->sendQue.pushUInt32 ( 0u ); // available + this->sendQue.commitMsg (); } inline void insertRequestWithPayLoad ( @@ -1069,7 +1077,9 @@ inline void insertRequestWithPayLoad ( else { sendQue.push_dbr_type ( dataType, pPayload, nElem ); } + // set pad bytes to nill sendQue.pushString ( nillBytes, payloadSize - size ); + sendQue.commitMsg (); } void tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue ) @@ -1125,6 +1135,7 @@ void tcpiiu::readNotifyRequest ( nciu &chan, netReadNotifyIO &io, static_cast < ca_uint16_t > ( dataType ), nElem, chan.getSID(), io.getID(), CA_V49 ( this->minorProtocolVersion ) ); + this->sendQue.commitMsg (); } void tcpiiu::createChannelRequest ( nciu &chan ) @@ -1149,6 +1160,7 @@ void tcpiiu::createChannelRequest ( nciu &chan ) throw cacChannel::unsupportedByService(); } + this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_CLAIM_CIU ); // cmd this->sendQue.pushUInt16 ( postCnt ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType @@ -1166,17 +1178,20 @@ void tcpiiu::createChannelRequest ( nciu &chan ) if ( postCnt > nameLength ) { this->sendQue.pushString ( nillBytes, postCnt - nameLength ); } + this->sendQue.commitMsg (); } void tcpiiu::clearChannelRequest ( nciu &chan ) { if ( chan.connected () ) { + this->sendQue.beginMsg (); this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd this->sendQue.pushUInt16 ( 0u ); // postsize this->sendQue.pushUInt16 ( 0u ); // dataType this->sendQue.pushUInt16 ( 0u ); // count this->sendQue.pushUInt32 ( chan.getSID () ); // cid this->sendQue.pushUInt32 ( chan.getCID () ); // available + this->sendQue.commitMsg (); } } @@ -1219,6 +1234,7 @@ void tcpiiu::subscriptionRequest ( nciu &chan, netSubscription & subscr ) this->sendQue.pushFloat32 ( 0.0 ); // m_toval this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( mask ) ); // m_mask this->sendQue.pushUInt16 ( 0u ); // m_pad + this->sendQue.commitMsg (); } void tcpiiu::subscriptionCancelRequest ( nciu &chan, netSubscription &subscr ) @@ -1229,11 +1245,16 @@ void tcpiiu::subscriptionCancelRequest ( nciu &chan, netSubscription &subscr ) static_cast < ca_uint16_t > ( subscr.getCount () ), chan.getSID(), subscr.getID(), CA_V49 ( this->minorProtocolVersion ) ); + this->sendQue.commitMsg (); } +// +// caller must hold both the callback mutex and +// also the cac primary mutex +// void tcpiiu::lastChannelDetachNotify () { - this->cleanShutdown (); + this->shutdown ( false ); } bool tcpiiu::flush () @@ -1336,6 +1357,19 @@ void tcpiiu::requestRecvProcessPostponedFlush () this->recvProcessPostponedFlush = true; } +void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const +{ + this->pHostNameCache->hostName ( pBuf, bufLength ); +} + +// deprecated - please dont use - this is _not_ thread safe +const char * tcpiiu::pHostName () const +{ + static char nameBuf [128]; + this->hostName ( nameBuf, sizeof ( nameBuf ) ); + return nameBuf; // ouch !! +} + diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index dec6407b2..911a1acbb 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -199,7 +199,6 @@ udpiiu::~udpiiu () // void udpiiu::recvMsg () { - char peek; osiSockAddr src; int status; @@ -212,6 +211,7 @@ void udpiiu::recvMsg () // peek first at the message so that file descriptor managers will wake up // in single threaded applications osiSocklen_t src_size = sizeof ( src ); + char peek; recvfrom ( this->sock, & peek, sizeof ( peek ), MSG_PEEK, &src.sa, &src_size ); status = 0; @@ -486,36 +486,10 @@ void udpiiu::shutdown () if ( this->shutdownCmd ) { return; } + this->shutdownCmd = true; - caHdr msg; - msg.m_cmmd = htons ( CA_PROTO_VERSION ); - msg.m_available = htonl ( 0u ); - msg.m_dataType = htons ( 0u ); - msg.m_count = htons ( 0u ); - msg.m_cid = htonl ( 0u ); - msg.m_postsize = htons ( 0u ); - - osiSockAddr addr; - addr.ia.sin_family = AF_INET; - addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); - addr.ia.sin_port = htons ( this->localPort ); - - // send a wakeup msg so the UDP recv thread will exit - int status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ), - sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) ); - if ( status < 0 ) { - // this knocks the UDP input thread out of recv () - // on all os except linux - status = socket_close ( this->sock ); - if ( status == 0 ) { - this->sockCloseCompleted = true; - } - else { - errlogPrintf ("CAC UDP socket close error was %s\n", - SOCKERRSTR ( SOCKERRNO ) ); - } - } + this->wakeupMsg (); // wait for recv threads to exit epicsEventMustWait ( this->recvThreadExitSignal ); @@ -528,7 +502,7 @@ bool udpiiu::badUDPRespAction ( const caHdr &msg, sockAddrToDottedIP ( &netAddr.sa, buf, sizeof ( buf ) ); char date[64]; currentTime.strftime ( date, sizeof ( date ), "%a %b %d %Y %H:%M:%S"); - this->printf ( "CAC: undecipherable ( bad msg code %u ) UDP message from %s at %s\n", + this->printf ( "CAC: Undecipherable ( bad msg code %u ) UDP message from %s at %s\n", msg.m_cmmd, buf, date ); return false; } @@ -690,7 +664,7 @@ void udpiiu::postMsg ( const osiSockAddr & net_addr, char buf[64]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); this->printf ( - "%s: undecipherable (too small) UDP msg from %s ignored\n", + "%s: Undecipherable (too small) UDP msg from %s ignored\n", __FILE__, buf ); return; } @@ -725,7 +699,7 @@ void udpiiu::postMsg ( const osiSockAddr & net_addr, char buf[64]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); this->printf ( - "%s: undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__, + "%s: Undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__, buf ); return; } @@ -744,7 +718,7 @@ void udpiiu::postMsg ( const osiSockAddr & net_addr, if ( ! success ) { char buf[256]; sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) ); - this->printf ( "CAC: undecipherable UDP message from %s\n", buf ); + this->printf ( "CAC: Undecipherable UDP message from %s\n", buf ); return; } @@ -867,3 +841,36 @@ void udpiiu::show ( unsigned level ) const } } +void udpiiu::wakeupMsg () +{ + caHdr msg; + msg.m_cmmd = htons ( CA_PROTO_VERSION ); + msg.m_available = htonl ( 0u ); + msg.m_dataType = htons ( 0u ); + msg.m_count = htons ( 0u ); + msg.m_cid = htonl ( 0u ); + msg.m_postsize = htons ( 0u ); + + osiSockAddr addr; + addr.ia.sin_family = AF_INET; + addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK ); + addr.ia.sin_port = htons ( this->localPort ); + + // send a wakeup msg so the UDP recv thread will exit + int status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ), + sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) ); + if ( status < 0 ) { + // this knocks the UDP input thread out of recv () + // on all os except linux + status = socket_close ( this->sock ); + if ( status == 0 ) { + this->sockCloseCompleted = true; + } + else { + errlogPrintf ("CAC UDP socket close error was %s\n", + SOCKERRSTR ( SOCKERRNO ) ); + } + } +} + + diff --git a/src/ca/udpiiu.h b/src/ca/udpiiu.h index dad8f9e5b..bc635f4e1 100644 --- a/src/ca/udpiiu.h +++ b/src/ca/udpiiu.h @@ -47,14 +47,15 @@ public: virtual ~udpiiu (); void shutdown (); void recvMsg (); - void postMsg ( const osiSockAddr &net_addr, + void postMsg ( const osiSockAddr & net_addr, char *pInBuf, arrayElementCount blockSize, - const epicsTime ¤Time); + const epicsTime ¤Time ); void repeaterRegistrationMessage ( unsigned attemptNumber ); void datagramFlush (); unsigned getPort () const; void show ( unsigned level ) const; bool isCurrentThread () const; + void wakeupMsg (); // exceptions class noSocket {}; diff --git a/src/ca/virtualCircuit.h b/src/ca/virtualCircuit.h index 323a49594..8057fb31c 100644 --- a/src/ca/virtualCircuit.h +++ b/src/ca/virtualCircuit.h @@ -18,183 +18,20 @@ #ifndef virtualCircuith #define virtualCircuith -#include // needed by comQueueSend - -#include "epicsTimer.h" #include "epicsMemory.h" -#include "ipAddrToAsciiAsynchronous.h" -#include "caServerID.h" +#include "tsSLList.h" +#include "tsDLList.h" + #include "comBuf.h" +#include "caServerID.h" #include "netiiu.h" +#include "comQueSend.h" +#include "comQueRecv.h" +#include "tcpRecvWatchdog.h" +#include "tcpSendWatchdog.h" enum iiu_conn_state { iiu_connecting, iiu_connected, iiu_disconnected }; -class nciu; -class tcpiiu; - -class comQueSend { -public: - comQueSend ( wireSendAdapter & ); - ~comQueSend (); - void clear (); - unsigned occupiedBytes () const; - bool flushEarlyThreshold ( unsigned nBytesThisMsg ) const; - bool flushBlockThreshold ( unsigned nBytesThisMsg ) const; - bool dbr_type_ok ( unsigned type ); - void pushUInt16 ( const ca_uint16_t value ); - void pushUInt32 ( const ca_uint32_t value ); - void pushFloat32 ( const ca_float32_t value ); - void pushString ( const char *pVal, unsigned nChar ); - void push_dbr_type ( unsigned type, const void *pVal, unsigned nElem ); - comBuf * popNextComBufToSend (); -private: - wireSendAdapter & wire; - tsDLList < comBuf > bufs; - unsigned nBytesPending; - void copy_dbr_string ( const void *pValue, unsigned nElem ); - void copy_dbr_short ( const void *pValue, unsigned nElem ); - void copy_dbr_float ( const void *pValue, unsigned nElem ); - void copy_dbr_char ( const void *pValue, unsigned nElem ); - void copy_dbr_long ( const void *pValue, unsigned nElem ); - void copy_dbr_double ( const void *pValue, unsigned nElem ); - typedef void ( comQueSend::*copyFunc_t ) ( - const void *pValue, unsigned nElem ); - static const copyFunc_t dbrCopyVector [39]; - - // - // visual C++ version 6.0 does not allow out of - // class member template function definition - // - template < class T > - inline void copyIn ( const T *pVal, unsigned nElem ) - { - unsigned nCopied; - comBuf *pComBuf = this->bufs.last (); - if ( pComBuf ) { - nCopied = pComBuf->copyIn ( pVal, nElem ); - this->nBytesPending += nCopied * sizeof ( T ); - } - else { - nCopied = 0u; - } - while ( nElem > nCopied ) { - pComBuf = new ( std::nothrow ) comBuf; - if ( ! pComBuf ) { - this->wire.forcedShutdown (); - throw std::bad_alloc (); - } - unsigned nNew = pComBuf->copyIn ( &pVal[nCopied], nElem - nCopied ); - nCopied += nNew; - this->nBytesPending += nNew * sizeof ( T ); - this->bufs.add ( *pComBuf ); - } - } - - // - // visual C++ version 6.0 does not allow out of - // class member template function definition - // - template < class T > - inline void copyIn ( const T &val ) - { - comBuf *pComBuf = this->bufs.last (); - if ( pComBuf ) { - if ( pComBuf->copyIn ( &val, 1u ) >= 1u ) { - this->nBytesPending += sizeof ( T ); - return; - } - } - pComBuf = new ( std::nothrow ) comBuf; - if ( ! pComBuf ) { - this->wire.forcedShutdown (); - throw std::bad_alloc (); - } - if ( pComBuf->copyIn ( &val, 1u ) == 0u ) { - this->wire.forcedShutdown (); - throw -1; - } - this->bufs.add ( *pComBuf ); - this->nBytesPending += sizeof ( T ); - return; - } -}; - -static const unsigned maxBytesPendingTCP = 0x4000; - -class comQueRecv { -public: - comQueRecv (); - ~comQueRecv (); - unsigned occupiedBytes () const; - unsigned copyOutBytes ( epicsInt8 *pBuf, unsigned nBytes ); - unsigned removeBytes ( unsigned nBytes ); - void pushLastComBufReceived ( comBuf & ); - void clear (); - epicsInt8 popInt8 (); - epicsUInt8 popUInt8 (); - epicsInt16 popInt16 (); - epicsUInt16 popUInt16 (); - epicsInt32 popInt32 (); - epicsUInt32 popUInt32 (); - epicsFloat32 popFloat32 (); - epicsFloat64 popFloat64 (); - void popString ( epicsOldString * ); - class insufficentBytesAvailable {}; -private: - tsDLList < comBuf > bufs; - unsigned nBytesPending; -}; - -class tcpRecvWatchdog : private epicsTimerNotify { -public: - tcpRecvWatchdog ( tcpiiu &, double periodIn, epicsTimerQueue & ); - virtual ~tcpRecvWatchdog (); - void rescheduleRecvTimer (); - void sendBacklogProgressNotify (); - void messageArrivalNotify (); - void beaconArrivalNotify (); - void beaconAnomalyNotify (); - void connectNotify (); - void cancel (); - void show ( unsigned level ) const; -private: - const double period; - epicsTimer & timer; - tcpiiu &iiu; - bool responsePending; - bool beaconAnomaly; - expireStatus expire ( const epicsTime & currentTime ); -}; - -class tcpSendWatchdog : private epicsTimerNotify { -public: - tcpSendWatchdog ( tcpiiu &, double periodIn, epicsTimerQueue & queueIn ); - virtual ~tcpSendWatchdog (); - void start (); - void cancel (); -private: - const double period; - epicsTimer & timer; - tcpiiu & iiu; - expireStatus expire ( const epicsTime & currentTime ); -}; - -class hostNameCache : public ipAddrToAsciiAsynchronous { -public: - hostNameCache ( const osiSockAddr &addr, ipAddrToAsciiEngine &engine ); - ~hostNameCache (); - void destroy (); - void ioCompletionNotify ( const char *pHostName ); - void hostName ( char *pBuf, unsigned bufLength ) const; - void * operator new ( size_t size ); - void operator delete ( void *pCadaver, size_t size ); -private: - bool ioComplete; - char hostNameBuf [128]; - static tsFreeList < class hostNameCache, 16 > freeList; - static epicsMutex freeListMutex; -}; - extern "C" void cacSendThreadTCP ( void *pParam ); extern "C" void cacRecvThreadTCP ( void *pParam ); @@ -208,6 +45,9 @@ struct caHdrLargeArray { ca_uint16_t m_cmmd; // operation to be performed }; +class hostNameCache; +class ipAddrToAsciiEngine; + class tcpiiu : public netiiu, public tsDLNode < tcpiiu >, public tsSLNode < tcpiiu >, public caServerID, @@ -220,10 +60,10 @@ public: ~tcpiiu (); void connect (); void destroy (); - void forcedShutdown (); void cleanShutdown (); void beaconAnomalyNotify (); void beaconArrivalNotify (); + void forcedShutdown (); void flushRequest (); bool flushBlockThreshold () const; @@ -274,7 +114,9 @@ private: bool earlyFlush; bool recvProcessPostponedFlush; - void processIncoming (); + void shutdown ( bool discardPendingMessages ); + + bool processIncoming (); unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf ); unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf ); @@ -302,143 +144,6 @@ private: bool flush (); // only to be called by the send thread }; -inline bool comQueSend::dbr_type_ok ( unsigned type ) -{ - if ( type >= ( sizeof ( this->dbrCopyVector ) / sizeof ( this->dbrCopyVector[0] ) ) ) { - return false; - } - if ( ! this->dbrCopyVector [type] ) { - return false; - } - return true; -} - -inline void comQueSend::pushUInt16 ( const ca_uint16_t value ) -{ - this->copyIn ( value ); -} - -inline void comQueSend::pushUInt32 ( const ca_uint32_t value ) -{ - this->copyIn ( value ); -} - -inline void comQueSend::pushFloat32 ( const ca_float32_t value ) -{ - this->copyIn ( value ); -} - -inline void comQueSend::pushString ( const char *pVal, unsigned nChar ) -{ - this->copyIn ( pVal, nChar ); -} - -// it is assumed that dbr_type_ok() was called prior to calling this routine -// to check the type code -inline void comQueSend::push_dbr_type ( unsigned type, const void *pVal, unsigned nElem ) -{ - ( this->*dbrCopyVector [type] ) ( pVal, nElem ); -} - -inline unsigned comQueSend::occupiedBytes () const -{ - return this->nBytesPending; -} - -inline bool comQueSend::flushBlockThreshold ( unsigned nBytesThisMsg ) const -{ - return ( this->nBytesPending + nBytesThisMsg > 16 * comBuf::capacityBytes () ); -} - -inline bool comQueSend::flushEarlyThreshold ( unsigned nBytesThisMsg ) const -{ - return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::capacityBytes () ); -} - -inline comBuf * comQueSend::popNextComBufToSend () -{ - comBuf *pBuf = this->bufs.get (); - if ( pBuf ) { - unsigned nBytesThisBuf = pBuf->occupiedBytes (); - assert ( this->nBytesPending >= nBytesThisBuf ); - this->nBytesPending -= pBuf->occupiedBytes (); - } - else { - assert ( this->nBytesPending == 0u ); - } - return pBuf; -} - -inline unsigned comQueRecv::occupiedBytes () const -{ - return this->nBytesPending; -} - -inline epicsInt8 comQueRecv::popInt8 () -{ - return static_cast < epicsInt8 > ( this->popUInt8() ); -} - -inline epicsInt16 comQueRecv::popInt16 () -{ - epicsInt16 tmp; - tmp = this->popInt8() << 8u; - tmp |= this->popInt8() << 0u; - return tmp; -} - -inline epicsInt32 comQueRecv::popInt32 () -{ - epicsInt32 tmp ; - tmp |= this->popInt8() << 24u; - tmp |= this->popInt8() << 16u; - tmp |= this->popInt8() << 8u; - tmp |= this->popInt8() << 0u; - return tmp; -} - -inline epicsFloat32 comQueRecv::popFloat32 () -{ - epicsFloat32 tmp; - epicsUInt8 wire[ sizeof ( tmp ) ]; - for ( unsigned i = 0u; i < sizeof ( tmp ); i++ ) { - wire[i] = this->popUInt8 (); - } - osiConvertFromWireFormat ( tmp, wire ); - return tmp; -} - -inline epicsFloat64 comQueRecv::popFloat64 () -{ - epicsFloat64 tmp; - epicsUInt8 wire[ sizeof ( tmp ) ]; - for ( unsigned i = 0u; i < sizeof ( tmp ); i++ ) { - wire[i] = this->popUInt8 (); - } - osiConvertFromWireFormat ( tmp, wire ); - return tmp; -} - -inline void comQueRecv::popString ( epicsOldString *pStr ) -{ - for ( unsigned i = 0u; i < sizeof ( *pStr ); i++ ) { - pStr[0][i] = this->popInt8 (); - } -} - -inline void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const -{ - this->pHostNameCache->hostName ( pBuf, bufLength ); -} - -// deprecated - please dont use - this is _not_ thread safe -inline const char * tcpiiu::pHostName () const -{ - static char nameBuf [128]; - this->hostName ( nameBuf, sizeof ( nameBuf ) ); - return nameBuf; // ouch !! -} - inline void tcpiiu::flushRequest () { this->sendThreadFlushEvent.signal ();