restructure to eliminate use of select

This commit is contained in:
Jeff Hill
2001-09-07 23:02:32 +00:00
parent 6517109e7e
commit ad819cba65
19 changed files with 807 additions and 570 deletions

View File

@@ -111,14 +111,9 @@ int CASG::block ( double timeout )
break;
}
{
// serialize access the blocking mechanism below
epicsAutoMutex autoMutex ( this->serializeBlock );
status = this->client.blockForEventAndEnableCallbacks ( this->sem, remaining );
if ( status != ECA_NORMAL ) {
return status;
}
status = this->client.blockForEventAndEnableCallbacks ( this->sem, remaining );
if ( status != ECA_NORMAL ) {
return status;
}
/*

View File

@@ -1986,7 +1986,7 @@ void verifyOldPend ()
{
int status;
/*
* at least verify that the old ca_pend() is in the symbol table
* verify that the old ca_pend() is in the symbol table
*/
status = ca_pend ( 100000.0, 1 );
assert ( status = ECA_NORMAL );

View File

@@ -12,6 +12,8 @@
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
#include <new>
#include "epicsMemory.h"
#include "osiProcess.h"
#include "osiSigPipeIgnore.h"
@@ -124,20 +126,20 @@ extern "C" void cacOnceFunc ( void * )
//
// cac::cac ()
//
cac::cac ( cacNotify &notifyIn, bool enablePreemptiveCallbackIn ) :
cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
ipToAEngine ( "caIPAddrToAsciiEngine" ),
pudpiiu ( 0 ),
pSearchTmr ( 0 ),
pRepeaterSubscribeTmr ( 0 ),
tcpSmallRecvBufFreeList ( 0 ),
tcpLargeRecvBufFreeList ( 0 ),
pCallbackLocker ( 0 ),
notify ( notifyIn ),
initializingThreadsPriority ( epicsThreadGetPrioritySelf () ),
maxRecvBytesTCP ( MAX_TCP ),
pndRecvCnt ( 0u ),
readSeq ( 0u ),
recvThreadsPendingCount ( 0u ),
enablePreemptiveCallback ( enablePreemptiveCallbackIn )
recvThreadsPendingCount ( 0u )
{
long status;
unsigned abovePriority;
@@ -230,20 +232,26 @@ cac::cac ( cacNotify &notifyIn, bool enablePreemptiveCallbackIn ) :
freeListCleanup ( this->tcpLargeRecvBufFreeList );
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
}
if ( ! this->enablePreemptiveCallback ) {
this->callbackMutex.lock ();
if ( ! enablePreemptiveCallbackIn ) {
this->pCallbackLocker = new ( std::nothrow ) callbackAutoMutex ( *this );
if ( ! this->pCallbackLocker ) {
osiSockRelease ();
free ( this->pUserName );
freeListCleanup ( this->tcpSmallRecvBufFreeList );
freeListCleanup ( this->tcpLargeRecvBufFreeList );
this->pTimerQueue->release ();
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
}
}
}
cac::~cac ()
{
//
// make certain that process thread isnt deleting
// tcpiiu objects at the same that this thread is
// release callback lock
//
if ( ! this->enablePreemptiveCallback ) {
this->callbackMutex.unlock ();
}
delete this->pCallbackLocker;
//
// lock intentionally not held here so that we dont deadlock
@@ -339,7 +347,7 @@ void cac::show ( unsigned level ) const
this->serverTable.show ( level - 1u );
::printf ( "\tconnection time out watchdog period %f\n", this->connTMO );
::printf ( "\tpreemptive calback is %s\n",
this->enablePreemptiveCallback ? "enabled" : "disabled" );
this->pCallbackLocker ? "disabled" : "enabled" );
::printf ( "list of installed services:\n" );
this->services.show ( level - 1u );
}
@@ -480,30 +488,32 @@ int cac::pendIO ( const double & timeout )
this->flushRequestPrivate ();
}
{
// serialize access the blocking mechanism below
epicsAutoMutex autoMutex ( this->serializePendIO );
while ( this->pndRecvCnt > 0 ) {
if ( remaining < CAC_SIGNIFICANT_DELAY ) {
status = ECA_TIMEOUT;
break;
}
while ( this->pndRecvCnt > 0 ) {
if ( remaining < CAC_SIGNIFICANT_DELAY ) {
status = ECA_TIMEOUT;
break;
}
if ( this->enablePreemptiveCallback ) {
this->ioDone.wait ( remaining );
}
else {
{
// serialize access the blocking mechanism below
epicsAutoMutex autoMutex ( this->serializePendIO );
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoRelease ( this->callbackMutex );
this->ioDone.wait ( remaining );
}
double delay = epicsTime::getCurrent () - beg_time;
if ( delay < timeout ) {
remaining = timeout - delay;
}
else {
remaining = 0.0;
this->ioDone.wait ( remaining );
}
}
double delay = epicsTime::getCurrent () - beg_time;
if ( delay < timeout ) {
remaining = timeout - delay;
}
else {
remaining = 0.0;
}
}
{
@@ -520,11 +530,13 @@ int cac::pendIO ( const double & timeout )
int cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout )
{
if ( this->enablePreemptiveCallback ) {
epicsAutoMutex autoMutex ( this->serializeCallbackMutexUsage );
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
event.wait ( timeout );
}
else {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
event.wait ( timeout );
}
@@ -548,13 +560,13 @@ int cac::pendEvent ( const double & timeout )
{
// serialize access the blocking mechanism below
epicsAutoMutex autoMutex ( this->serializePendEvent );
epicsAutoMutex autoMutex ( this->serializeCallbackMutexUsage );
// process at least once if preemptive callback
// isnt enabled
if ( ! this->enablePreemptiveCallback ) {
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
while ( this->recvThreadsPendingCount ) {
while ( this->recvThreadsPendingCount > 1 ) {
this->noRecvThreadsPending.wait ();
}
}
@@ -571,11 +583,11 @@ int cac::pendEvent ( const double & timeout )
}
if ( delay >= CAC_SIGNIFICANT_DELAY ) {
if ( this->enablePreemptiveCallback ) {
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
epicsThreadSleep ( delay );
}
else {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
epicsThreadSleep ( delay );
}
}
@@ -734,22 +746,7 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
return true;
}
}
bhe * pBHE = this->beaconTable.lookup ( addr.ia );
if ( ! pBHE ) {
pBHE = new bhe ( epicsTime (), addr.ia );
if ( pBHE ) {
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
pBHE->destroy ();
return true;
}
}
else {
return true;
}
}
if ( ! piiu ) {
else {
try {
piiu = new tcpiiu ( *this, this->connTMO, *this->pTimerQueue,
addr, minorVersionNumber, this->ipToAEngine,
@@ -758,6 +755,19 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
return true;
}
this->serverTable.add ( *piiu );
bhe * pBHE = this->beaconTable.lookup ( addr.ia );
if ( ! pBHE ) {
pBHE = new bhe ( epicsTime (), addr.ia );
if ( pBHE ) {
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
pBHE->destroy ();
return true;
}
}
else {
return true;
}
}
pBHE->registerIIU ( *piiu );
}
catch ( ... ) {
@@ -814,19 +824,33 @@ bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
void cac::uninstallChannel ( nciu & chan )
{
{
epicsAutoMutex autoMutex ( this->mutex );
nciu *pChan = this->chanTable.remove ( chan );
assert ( pChan = &chan );
// flush prior to taking the callback lock
this->flushIfRequired ( *chan.getPIIU() );
chan.getPIIU()->clearChannelRequest ( chan );
chan.getPIIU()->detachChannel ( chan );
}
//
// dont block on the call back lock if this isnt the
// primary thread
this->udpWakeup ();
// taking this mutex guarantees that we will not delete
// a channel out from under a callback
epicsAutoMutex autoCallbackMutex ( this->callbackMutex );
epicsAutoMutex autoMutex ( this->serializeCallbackMutexUsage );
if ( this->pCallbackLocker ) {
this->uninstallChannelPrivate ( chan );
}
else {
// taking this mutex guarantees that we will not delete
// a channel out from under a callback
epicsAutoMutex autoCallbackMutex ( this->callbackMutex );
this->uninstallChannelPrivate ( chan );
}
}
void cac::uninstallChannelPrivate ( nciu & chan )
{
epicsAutoMutex autoMutex ( this->mutex );
nciu * pChan = this->chanTable.remove ( chan );
assert ( pChan = &chan );
// flush prior to taking the callback lock
this->flushIfRequired ( *chan.getPIIU() );
chan.getPIIU()->clearChannelRequest ( chan );
chan.getPIIU()->detachChannel ( chan );
}
int cac::printf ( const char *pformat, ... ) const
@@ -858,13 +882,13 @@ void cac::flushIfRequired ( netiiu & iiu )
// enable / disable of call back preemption must occur here
// because the tcpiiu might disconnect while waiting and its
// pointer to this cac might become invalid
if ( this->enablePreemptiveCallback ) {
iiu.blockUntilSendBacklogIsReasonable ( 0, this->mutex );
}
else {
if ( this->pCallbackLocker ) {
iiu.blockUntilSendBacklogIsReasonable
( &this->callbackMutex, this->mutex );
}
else {
iiu.blockUntilSendBacklogIsReasonable ( 0, this->mutex );
}
}
}
else {
@@ -919,13 +943,13 @@ cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type,
void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
{
if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) &&
this->enablePreemptiveCallback ) {
// wait for any IO callbacks in progress to complete
// prior to destroying the IO object
epicsAutoMutex autoMutex ( this->callbackMutex );
this->pCallbackLocker ) {
this->ioCancelPrivate ( chan, id );
}
else {
// wait for any IO callbacks in progress to complete
// prior to destroying the IO object
epicsAutoMutex autoMutex ( this->callbackMutex );
this->ioCancelPrivate ( chan, id );
}
}
@@ -1172,13 +1196,13 @@ void cac::disconnectAllIO ( nciu & chan, bool enableCallbacks )
void cac::destroyAllIO ( nciu & chan )
{
if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) &&
this->enablePreemptiveCallback ) {
// force any callbacks in progress to complete
// before deleting the IO
epicsAutoMutex autoMutex ( this->callbackMutex );
this->pCallbackLocker ) {
this->privateDestroyAllIO ( chan );
}
else {
// force any callbacks in progress to complete
// before deleting the IO
epicsAutoMutex autoMutex ( this->callbackMutex );
this->privateDestroyAllIO ( chan );
}
}
@@ -1692,14 +1716,14 @@ void cac::selfTest () const
void cac::notifyNewFD ( SOCKET sock ) const
{
if ( ! this->enablePreemptiveCallback ) {
if ( this->pCallbackLocker ) {
this->notify.fdWasCreated ( sock );
}
}
void cac::notifyDestroyFD ( SOCKET sock ) const
{
if ( ! this->enablePreemptiveCallback ) {
if ( this->pCallbackLocker ) {
this->notify.fdWasDestroyed ( sock );
}
}
@@ -1739,10 +1763,10 @@ void cac::uninstallIIU ( tcpiiu & iiu )
this->iiuUninstal.signal();
}
void cac::preemptiveCallbackLock()
void cac::preemptiveCallbackLock ()
{
// the count must be incremented prior to taking the lock
if ( ! this->enablePreemptiveCallback ) {
{
epicsAutoMutex autoMutex ( this->mutex );
assert ( this->recvThreadsPendingCount < UINT_MAX );
this->recvThreadsPendingCount++;
@@ -1750,25 +1774,26 @@ void cac::preemptiveCallbackLock()
this->callbackMutex.lock ();
}
void cac::preemptiveCallbackUnlock()
void cac::preemptiveCallbackUnlock ()
{
this->callbackMutex.unlock ();
if ( ! this->enablePreemptiveCallback ) {
bool signalRequired;
{
epicsAutoMutex autoMutex ( this->mutex );
assert ( this->recvThreadsPendingCount > 0 );
this->recvThreadsPendingCount--;
if ( this->recvThreadsPendingCount == 0u ) {
bool signalRequired;
{
epicsAutoMutex autoMutex ( this->mutex );
assert ( this->recvThreadsPendingCount > 0 );
this->recvThreadsPendingCount--;
unsigned noThreadsWaiting;
if ( this->pCallbackLocker ) {
if ( this->recvThreadsPendingCount == 1u ) {
signalRequired = true;
}
else {
signalRequired = false;
}
}
if ( signalRequired ) {
this->noRecvThreadsPending.signal ();
}
}
if ( signalRequired ) {
this->noRecvThreadsPending.signal ();
}
}
@@ -1789,3 +1814,10 @@ double cac::beaconPeriod ( const nciu & chan ) const
return - DBL_MAX;
}
void cac::udpWakeup ()
{
epicsAutoMutex locker ( this->mutex );
if ( this->pudpiiu ) {
this->pudpiiu->wakeupMsg ();
}
}

View File

@@ -141,6 +141,7 @@ public:
void uninstallIIU ( tcpiiu &iiu );
bool preemptiveCallbackEnable () const;
double beaconPeriod ( const nciu & chan ) const;
void udpWakeup ();
private:
ipAddrToAsciiEngine ipToAEngine;
@@ -169,27 +170,28 @@ private:
mutable epicsMutex mutex;
epicsMutex callbackMutex;
epicsMutex serializePendIO;
epicsMutex serializePendEvent;
epicsMutex serializeCallbackMutexUsage;
epicsEvent ioDone;
epicsEvent noRecvThreadsPending;
epicsEvent iiuUninstal;
epicsTimerQueueActive *pTimerQueue;
char *pUserName;
class udpiiu *pudpiiu;
class searchTimer *pSearchTmr;
epicsTimerQueueActive * pTimerQueue;
char * pUserName;
class udpiiu * pudpiiu;
class searchTimer * pSearchTmr;
class repeaterSubscribeTimer
*pRepeaterSubscribeTmr;
void *tcpSmallRecvBufFreeList;
void *tcpLargeRecvBufFreeList;
* pRepeaterSubscribeTmr;
void * tcpSmallRecvBufFreeList;
void * tcpLargeRecvBufFreeList;
class callbackAutoMutex * pCallbackLocker;
cacNotify & notify;
unsigned initializingThreadsPriority;
unsigned maxRecvBytesTCP;
unsigned pndRecvCnt;
unsigned readSeq;
unsigned recvThreadsPendingCount;
bool enablePreemptiveCallback;
void flushRequestPrivate ();
void uninstallChannelPrivate ( nciu & );
void run ();
bool setupUDP ();
void connectAllIO ( nciu &chan );
@@ -348,7 +350,7 @@ inline bool cac::ioComplete () const
inline bool cac::preemptiveCallbackEnable () const
{
return this->enablePreemptiveCallback;
return ! this->pCallbackLocker;
}
#endif // ifdef cach

View File

@@ -23,8 +23,8 @@ bool comBuf::flushToWire ( wireSendAdapter &wire )
{
unsigned occupied = this->occupiedBytes ();
while ( occupied ) {
unsigned nBytes = wire.sendBytes ( &this->buf[this->nextReadIndex],
occupied );
unsigned nBytes = wire.sendBytes (
&this->buf[this->nextReadIndex], occupied );
if ( nBytes == 0u ) {
return false;
}

View File

@@ -33,7 +33,6 @@ class wireSendAdapter {
public:
virtual unsigned sendBytes ( const void *pBuf,
unsigned nBytesInBuf ) = 0;
virtual void forcedShutdown () = 0;
};
class wireRecvAdapter {
@@ -48,6 +47,7 @@ public:
void destroy ();
unsigned unoccupiedBytes () const;
unsigned occupiedBytes () const;
unsigned uncommittedBytes () const;
static unsigned capacityBytes ();
void clear ();
unsigned copyInBytes ( const void *pBuf, unsigned nBytes );
@@ -61,6 +61,8 @@ public:
unsigned copyIn ( const epicsFloat32 *pValue, unsigned nElem );
unsigned copyIn ( const epicsFloat64 *pValue, unsigned nElem );
unsigned copyIn ( const epicsOldString *pValue, unsigned nElem );
void commitIncomming ();
void clearUncommittedIncomming ();
bool copyInAllBytes ( const void *pBuf, unsigned nBytes );
unsigned copyOutBytes ( void *pBuf, unsigned nBytes );
bool copyOutAllBytes ( void *pBuf, unsigned nBytes );
@@ -84,6 +86,7 @@ public:
protected:
~comBuf ();
private:
unsigned commitIndex;
unsigned nextWriteIndex;
unsigned nextReadIndex;
epicsUInt8 buf [ comBufSize ];
@@ -93,7 +96,8 @@ private:
static epicsMutex freeListMutex;
};
inline comBuf::comBuf () : nextWriteIndex ( 0u ), nextReadIndex ( 0u )
inline comBuf::comBuf () : nextWriteIndex ( 0u ),
nextReadIndex ( 0u ), commitIndex ( 0u )
{
}
@@ -108,6 +112,7 @@ inline void comBuf::destroy ()
inline void comBuf::clear ()
{
this->commitIndex = 0u;
this->nextWriteIndex = 0u;
this->nextReadIndex = 0u;
}
@@ -131,8 +136,12 @@ inline unsigned comBuf::unoccupiedBytes () const
inline unsigned comBuf::occupiedBytes () const
{
// assert (this->nextWriteIndex >= this->nextReadIndex);
return this->nextWriteIndex - this->nextReadIndex;
return this->commitIndex - this->nextReadIndex;
}
inline unsigned comBuf::uncommittedBytes () const
{
return this->nextWriteIndex - this->commitIndex;
}
inline bool comBuf::copyInAllBytes ( const void *pBuf, unsigned nBytes )
@@ -158,15 +167,15 @@ inline unsigned comBuf::copyInBytes ( const void *pBuf, unsigned nBytes )
return nBytes;
}
inline unsigned comBuf::copyIn ( comBuf &bufIn )
inline unsigned comBuf::copyIn ( comBuf & bufIn )
{
unsigned nBytes = this->copyInBytes ( &bufIn.buf[bufIn.nextReadIndex],
bufIn.nextWriteIndex - bufIn.nextReadIndex );
bufIn.commitIndex - bufIn.nextReadIndex );
bufIn.nextReadIndex += nBytes;
return nBytes;
}
inline bool comBuf::copyOutAllBytes ( void *pBuf, unsigned nBytes )
inline bool comBuf::copyOutAllBytes ( void * pBuf, unsigned nBytes )
{
if ( nBytes <= this->occupiedBytes () ) {
memcpy ( pBuf, &this->buf[this->nextReadIndex], nBytes);
@@ -371,4 +380,14 @@ inline comBuf::statusPopUInt32 comBuf::popUInt32 ()
return tmp;
}
inline void comBuf::commitIncomming ()
{
this->commitIndex = this->nextWriteIndex;
}
inline void comBuf::clearUncommittedIncomming ()
{
this->nextWriteIndex = this->commitIndex;
}
#endif // ifndef comBufh

View File

@@ -84,10 +84,12 @@ unsigned comQueRecv::removeBytes ( unsigned nBytes )
void comQueRecv::pushLastComBufReceived ( comBuf & bufIn )
{
bufIn.commitIncomming ();
comBuf * pComBuf = this->bufs.last ();
if ( pComBuf ) {
if ( pComBuf->unoccupiedBytes() ) {
this->nBytesPending += pComBuf->copyIn ( bufIn );
pComBuf->commitIncomming ();
}
}
unsigned bufBytes = bufIn.occupiedBytes();

95
src/ca/comQueRecv.h Normal file
View File

@@ -0,0 +1,95 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef comQueRecvh
#define comQueRecvh
class comQueRecv {
public:
comQueRecv ();
~comQueRecv ();
unsigned occupiedBytes () const;
unsigned copyOutBytes ( epicsInt8 *pBuf, unsigned nBytes );
unsigned removeBytes ( unsigned nBytes );
void pushLastComBufReceived ( comBuf & );
void clear ();
epicsInt8 popInt8 ();
epicsUInt8 popUInt8 ();
epicsInt16 popInt16 ();
epicsUInt16 popUInt16 ();
epicsInt32 popInt32 ();
epicsUInt32 popUInt32 ();
epicsFloat32 popFloat32 ();
epicsFloat64 popFloat64 ();
void popString ( epicsOldString * );
class insufficentBytesAvailable {};
private:
tsDLList < comBuf > bufs;
unsigned nBytesPending;
};
inline unsigned comQueRecv::occupiedBytes () const
{
return this->nBytesPending;
}
inline epicsInt8 comQueRecv::popInt8 ()
{
return static_cast < epicsInt8 > ( this->popUInt8() );
}
inline epicsInt16 comQueRecv::popInt16 ()
{
epicsInt16 tmp;
tmp = this->popInt8() << 8u;
tmp |= this->popInt8() << 0u;
return tmp;
}
inline epicsInt32 comQueRecv::popInt32 ()
{
epicsInt32 tmp ;
tmp |= this->popInt8() << 24u;
tmp |= this->popInt8() << 16u;
tmp |= this->popInt8() << 8u;
tmp |= this->popInt8() << 0u;
return tmp;
}
inline epicsFloat32 comQueRecv::popFloat32 ()
{
epicsFloat32 tmp;
epicsUInt8 wire[ sizeof ( tmp ) ];
for ( unsigned i = 0u; i < sizeof ( tmp ); i++ ) {
wire[i] = this->popUInt8 ();
}
osiConvertFromWireFormat ( tmp, wire );
return tmp;
}
inline epicsFloat64 comQueRecv::popFloat64 ()
{
epicsFloat64 tmp;
epicsUInt8 wire[ sizeof ( tmp ) ];
for ( unsigned i = 0u; i < sizeof ( tmp ); i++ ) {
wire[i] = this->popUInt8 ();
}
osiConvertFromWireFormat ( tmp, wire );
return tmp;
}
#endif // ifndef comQueRecvh

View File

@@ -87,6 +87,8 @@ void comQueSend::clear ()
this->nBytesPending -= pBuf->occupiedBytes ();
pBuf->destroy ();
}
this->pFirstUncommited = tsDLIterBD < comBuf > ();
assert ( this->nBytesPending == 0 );
}
void comQueSend::copy_dbr_string ( const void *pValue, unsigned nElem )
@@ -161,3 +163,29 @@ const comQueSend::copyFunc_t comQueSend::dbrCopyVector [39] = {
0 // DBR_CLASS_NAME
};
comBuf * comQueSend::popNextComBufToSend ()
{
comBuf *pBuf = this->bufs.get ();
if ( pBuf ) {
unsigned nBytesThisBuf = pBuf->occupiedBytes ();
if ( nBytesThisBuf ) {
assert ( this->nBytesPending >= nBytesThisBuf );
this->nBytesPending -= nBytesThisBuf;
}
else {
this->bufs.push ( *pBuf );
pBuf = 0;
}
}
else {
assert ( this->nBytesPending == 0u );
}
return pBuf;
}
void comQueRecv::popString ( epicsOldString *pStr )
{
for ( unsigned i = 0u; i < sizeof ( *pStr ); i++ ) {
pStr[0][i] = this->popInt8 ();
}
}

195
src/ca/comQueSend.h Normal file
View File

@@ -0,0 +1,195 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef comQueSendh
#define comQueSendh
#include <new>
#include "tsDLList.h"
#include "comBuf.h"
//
// Notes.
// o calling popNextComBufToSend() will clear
// any uncommitted bytes
//
class comQueSend {
public:
comQueSend ( wireSendAdapter & );
~comQueSend ();
void clear ();
void beginMsg ();
void commitMsg ();
unsigned occupiedBytes () const;
bool flushEarlyThreshold ( unsigned nBytesThisMsg ) const;
bool flushBlockThreshold ( unsigned nBytesThisMsg ) const;
bool dbr_type_ok ( unsigned type );
void pushUInt16 ( const ca_uint16_t value );
void pushUInt32 ( const ca_uint32_t value );
void pushFloat32 ( const ca_float32_t value );
void pushString ( const char *pVal, unsigned nChar );
void push_dbr_type ( unsigned type, const void *pVal, unsigned nElem );
comBuf * popNextComBufToSend ();
private:
tsDLList < comBuf > bufs;
tsDLIterBD < comBuf > pFirstUncommited;
wireSendAdapter & wire;
unsigned nBytesPending;
void copy_dbr_string ( const void *pValue, unsigned nElem );
void copy_dbr_short ( const void *pValue, unsigned nElem );
void copy_dbr_float ( const void *pValue, unsigned nElem );
void copy_dbr_char ( const void *pValue, unsigned nElem );
void copy_dbr_long ( const void *pValue, unsigned nElem );
void copy_dbr_double ( const void *pValue, unsigned nElem );
typedef void ( comQueSend::*copyFunc_t ) (
const void *pValue, unsigned nElem );
static const copyFunc_t dbrCopyVector [39];
//
// visual C++ version 6.0 does not allow out of
// class member template function definition
//
template < class T >
inline void copyIn ( const T *pVal, unsigned nElem )
{
comBuf * pLastBuf = this->bufs.last ();
unsigned nCopied;
if ( pLastBuf ) {
nCopied = pLastBuf->copyIn ( pVal, nElem );
}
else {
nCopied = 0u;
}
while ( nElem > nCopied ) {
comBuf * pComBuf = new ( std::nothrow ) comBuf;
if ( ! pComBuf ) {
throw std::bad_alloc ();
}
unsigned nNew = pComBuf->copyIn ( &pVal[nCopied], nElem - nCopied );
nCopied += nNew;
this->bufs.add ( *pComBuf );
if ( ! this->pFirstUncommited.valid() ) {
this->pFirstUncommited = this->bufs.lastIter ();
}
}
}
//
// visual C++ version 6.0 does not allow out of
// class member template function definition
//
template < class T >
inline void copyIn ( const T &val )
{
comBuf *pComBuf = this->bufs.last ();
if ( pComBuf ) {
if ( pComBuf->copyIn ( &val, 1u ) >= 1u ) {
return;
}
}
pComBuf = new ( std::nothrow ) comBuf;
if ( ! pComBuf ) {
throw std::bad_alloc ();
}
assert ( pComBuf->copyIn ( &val, 1u ) == 1u );
this->bufs.add ( *pComBuf );
if ( ! this->pFirstUncommited.valid() ) {
this->pFirstUncommited = this->bufs.lastIter ();
}
return;
}
};
inline bool comQueSend::dbr_type_ok ( unsigned type )
{
if ( type >= ( sizeof ( this->dbrCopyVector ) / sizeof ( this->dbrCopyVector[0] ) ) ) {
return false;
}
if ( ! this->dbrCopyVector [type] ) {
return false;
}
return true;
}
inline void comQueSend::pushUInt16 ( const ca_uint16_t value )
{
this->copyIn ( value );
}
inline void comQueSend::pushUInt32 ( const ca_uint32_t value )
{
this->copyIn ( value );
}
inline void comQueSend::pushFloat32 ( const ca_float32_t value )
{
this->copyIn ( value );
}
inline void comQueSend::pushString ( const char *pVal, unsigned nChar )
{
this->copyIn ( pVal, nChar );
}
// it is assumed that dbr_type_ok() was called prior to calling this routine
// to check the type code
inline void comQueSend::push_dbr_type ( unsigned type, const void *pVal, unsigned nElem )
{
( this->*dbrCopyVector [type] ) ( pVal, nElem );
}
inline unsigned comQueSend::occupiedBytes () const
{
return this->nBytesPending;
}
inline bool comQueSend::flushBlockThreshold ( unsigned nBytesThisMsg ) const
{
return ( this->nBytesPending + nBytesThisMsg > 16 * comBuf::capacityBytes () );
}
inline bool comQueSend::flushEarlyThreshold ( unsigned nBytesThisMsg ) const
{
return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::capacityBytes () );
}
inline void comQueSend::beginMsg ()
{
while ( this->pFirstUncommited.valid() ) {
tsDLIterBD < comBuf > next = this->pFirstUncommited;
next++;
this->pFirstUncommited->clearUncommittedIncomming ();
if ( this->pFirstUncommited->occupiedBytes() == 0u ) {
this->bufs.remove ( *this->pFirstUncommited );
}
this->pFirstUncommited = next;
}
this->pFirstUncommited = this->bufs.lastIter ();
}
inline void comQueSend::commitMsg ()
{
while ( this->pFirstUncommited.valid() ) {
this->nBytesPending += this->pFirstUncommited->uncommittedBytes ();
this->pFirstUncommited->commitIncomming ();
this->pFirstUncommited++;
}
}
#endif // ifndef comQueSendh

View File

@@ -17,7 +17,7 @@
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
#include "iocinf.h"
#include "virtualCircuit.h"
#include "hostNameCache.h"
tsFreeList < hostNameCache, 16 > hostNameCache::freeList;
epicsMutex hostNameCache::freeListMutex;

40
src/ca/hostNameCache.h Normal file
View File

@@ -0,0 +1,40 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef hostNameCacheh
#define hostNameCacheh
#include "ipAddrToAsciiAsynchronous.h"
#include "tsFreeList.h"
class hostNameCache : public ipAddrToAsciiAsynchronous {
public:
hostNameCache ( const osiSockAddr &addr, ipAddrToAsciiEngine &engine );
~hostNameCache ();
void destroy ();
void ioCompletionNotify ( const char *pHostName );
void hostName ( char *pBuf, unsigned bufLength ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
private:
bool ioComplete;
char hostNameBuf [128];
static tsFreeList < class hostNameCache, 16 > freeList;
static epicsMutex freeListMutex;
};
#endif // #ifndef hostNameCacheh

View File

@@ -130,7 +130,6 @@ protected:
private:
tsDLList < syncGroupNotify > ioList;
epicsMutex mutable mutex;
epicsMutex serializeBlock;
epicsEvent sem;
oldCAC & client;
unsigned magic;

47
src/ca/tcpRecvWatchdog.h Normal file
View File

@@ -0,0 +1,47 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef tcpRecvWatchdogh
#define tcpRecvWatchdogh
#include "epicsTimer.h"
class tcpiiu;
class tcpRecvWatchdog : private epicsTimerNotify {
public:
tcpRecvWatchdog ( tcpiiu &, double periodIn, epicsTimerQueue & );
virtual ~tcpRecvWatchdog ();
void rescheduleRecvTimer ();
void sendBacklogProgressNotify ();
void messageArrivalNotify ();
void beaconArrivalNotify ();
void beaconAnomalyNotify ();
void connectNotify ();
void cancel ();
void show ( unsigned level ) const;
private:
const double period;
epicsTimer & timer;
tcpiiu &iiu;
bool responsePending;
bool beaconAnomaly;
expireStatus expire ( const epicsTime & currentTime );
};
#endif // #ifndef tcpRecvWatchdogh

36
src/ca/tcpSendWatchdog.h Normal file
View File

@@ -0,0 +1,36 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef tcpSendWatchdogh
#define tcpSendWatchdogh
#include "epicsTimer.h"
class tcpSendWatchdog : private epicsTimerNotify {
public:
tcpSendWatchdog ( tcpiiu &, double periodIn, epicsTimerQueue & queueIn );
virtual ~tcpSendWatchdog ();
void start ();
void cancel ();
private:
const double period;
epicsTimer & timer;
tcpiiu & iiu;
expireStatus expire ( const epicsTime & currentTime );
};
#endif // #ifndef tcpSendWatchdog

View File

@@ -19,6 +19,7 @@
#include "cac.h"
#include "netiiu.h"
#include "msgForMultiplyDefinedPV.h"
#include "hostNameCache.h"
#define epicsExportSharedSymbols
#include "net_convert.h"
@@ -266,44 +267,18 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
// file manager call backs works correctly. This does not
// appear to impact performance.
//
// We also use select() here because shutdown() does not
// unblock a thread in recv() on WIN32 and probably also on
// vxWorks. This is a problem even if preemptive callbacks
// are enabled.
//
while ( true ) {
fd_set recvInterest;
struct timeval tmo;
tmo.tv_sec = 5; // seconds
tmo.tv_usec = 0; // micro seconds
FD_ZERO ( & recvInterest );
FD_SET ( piiu->sock, & recvInterest );
int status = select ( piiu->sock + 1,
& recvInterest, 0, 0, & tmo );
if ( piiu->state != iiu_connected ) {
break;
}
if ( status < 0 ) {
int localErrno = SOCKERRNO;
if ( localErrno == SOCK_EINTR ) {
continue;
}
else if ( localErrno == SOCK_EBADF ) {
piiu->state = iiu_disconnected;
break;
}
else {
errlogPrintf ( "Select error was %s\n",
SOCKERRSTR ( localErrno ) );
epicsThreadSleep ( 1.0 );
continue;
}
}
else if ( status == 1 ) {
unsigned nBytesIn;
if ( piiu->pCAC()->preemptiveCallbackEnable() ) {
nBytesIn = pComBuf->fillFromWire ( *piiu );
if ( nBytesIn == 0u ) {
break;
}
}
else {
char buf;
::recv ( piiu->sock, &buf, 1, MSG_PEEK );
}
if ( piiu->state != iiu_connected ) {
break;
}
@@ -311,16 +286,16 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
// only one recv thread at a time may call callbacks
callbackAutoMutex autoMutex ( *piiu->pCAC() );
osiSockIoctl_t bytesPending = 0;
do {
unsigned nBytesIn = pComBuf->fillFromWire ( *piiu );
if ( ! piiu->pCAC()->preemptiveCallbackEnable() ) {
nBytesIn = pComBuf->fillFromWire ( *piiu );
if ( nBytesIn == 0u ) {
// outer loop checks to see if state is connected
// ( properly set by fillFromWire() )
break;
}
}
piiu->recvQue.pushLastComBufReceived ( *pComBuf );
while ( true ) {
if ( nBytesIn == pComBuf->capacityBytes () ) {
if ( piiu->contigRecvMsgCount >=
@@ -337,6 +312,9 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
}
piiu->unacknowledgedSendBytes = 0u;
piiu->recvQue.pushLastComBufReceived ( *pComBuf );
pComBuf = 0;
// reschedule connection activity watchdog
// but dont hold the lock for fear of deadlocking
// because cancel is blocking for the completion
@@ -344,20 +322,34 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
piiu->recvDog.messageArrivalNotify ();
// execute receive labor
piiu->processIncoming ();
bool noProtocolViolation = piiu->processIncoming ();
if ( ! noProtocolViolation ) {
piiu->state = iiu_disconnected;
break;
}
// allocate a new com buf
pComBuf = new ( std::nothrow ) comBuf;
nBytesIn = 0u;
if ( ! pComBuf ) {
break;
}
{
int status;
osiSockIoctl_t bytesPending = 0;
status = socket_ioctl ( piiu->sock, FIONREAD, & bytesPending );
if ( status ) {
bytesPending = 0u;
if ( status || bytesPending == 0u ) {
break;
}
nBytesIn = pComBuf->fillFromWire ( *piiu );
if ( nBytesIn == 0u ) {
// outer loop checks to see if state is connected
// ( properly set by fillFromWire() )
break;
}
}
} while ( bytesPending && pComBuf );
}
}
if ( pComBuf ) {
@@ -367,7 +359,6 @@ extern "C" void cacRecvThreadTCP ( void *pParam )
{
callbackAutoMutex autoMutex ( *piiu->pCAC() );
piiu->pCAC()->uninstallIIU ( *piiu );
piiu->pCAC()->notifyDestroyFD ( piiu->sock );
}
piiu->destroy ();
}
@@ -556,60 +547,69 @@ void tcpiiu::connect ()
}
}
/*
* tcpiiu::cleanShutdown ()
*/
void tcpiiu::cleanShutdown ()
{
epicsAutoMutex autoMutex ( this->pCAC()->mutexRef() );
if ( this->state == iiu_connected || this->state == iiu_connecting ) {
int status;
/*
* on winsock and probably vxWorks shutdown() does not
* unblock a thread in recv() so we use close and introduce
* some complexity because we must unregister the fd early
*/
status = shutdown ( this->sock, SD_BOTH );
if ( status ) {
errlogPrintf ("CAC TCP socket shutdown error was %s\n",
SOCKERRSTR (SOCKERRNO) );
status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC TCP socket close error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
else {
this->sockCloseCompleted = true;
this->state = iiu_disconnected;
}
}
else {
this->state = iiu_disconnected;
}
this->sendThreadFlushEvent.signal ();
}
}
/*
* tcpiiu::forcedShutdown ()
*/
void tcpiiu::forcedShutdown ()
{
epicsAutoMutex autoMutex ( this->pCAC()->mutexRef() );
// generate some NOOP UDP traffic so that ca_pend_event()
// will get called in preemptive callback disabled
// applications, and therefore the callback lock below
// will not block
this->pCAC()->udpWakeup ();
callbackAutoMutex autoMutexCB ( *this->pCAC() );
epicsAutoMutex autoMutexCAC ( this->pCAC()->mutexRef() );
this->shutdown ( true );
}
if ( this->state != iiu_disconnected || this->state == iiu_connecting ) {
// force abortive shutdown sequence (discard outstanding sends
// and receives)
struct linger tmpLinger;
tmpLinger.l_onoff = true;
tmpLinger.l_linger = 0u;
int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER,
reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
if ( status != 0 ) {
errlogPrintf ( "CAC TCP socket linger set error was %s\n",
void tcpiiu::cleanShutdown ()
{
// generate some NOOP UDP traffic so that ca_pend_event()
// will get called in preemptive callback disabled
// applications, and therefore the callback lock below
// will not block
this->pCAC()->udpWakeup ();
callbackAutoMutex autoMutexCB ( *this->pCAC() );
epicsAutoMutex autoMutexCAC ( this->pCAC()->mutexRef() );
this->shutdown ( false );
}
//
// tcpiiu::shutdown ()
//
// caller must hold callback mutex and also primary cac mutex
// when calling this routine
//
void tcpiiu::shutdown ( bool discardPendingMessages )
{
if ( ! this->sockCloseCompleted ) {
this->state = iiu_disconnected;
this->sockCloseCompleted = true;
this->pCAC()->notifyDestroyFD ( this->sock );
if ( discardPendingMessages ) {
// force abortive shutdown sequence
// (discard outstanding sends and receives)
struct linger tmpLinger;
tmpLinger.l_onoff = true;
tmpLinger.l_linger = 0u;
int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER,
reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
if ( status != 0 ) {
errlogPrintf ( "CAC TCP socket linger set error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
}
//
// on winsock and probably vxWorks shutdown() does not
// unblock a thread in recv() so we use close and introduce
// some complexity because we must unregister the fd early
//
int status = socket_close ( this->sock );
if ( status ) {
errlogPrintf ("CAC TCP socket close error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
this->cleanShutdown ();
this->sendThreadFlushEvent.signal ();
}
}
@@ -768,7 +768,7 @@ bool tcpiiu::setEchoRequestPending ()
//
// tcpiiu::processIncoming()
//
void tcpiiu::processIncoming ()
bool tcpiiu::processIncoming ()
{
while ( true ) {
@@ -781,7 +781,7 @@ void tcpiiu::processIncoming ()
if ( ! this->oldMsgHeaderAvailable ) {
if ( nBytes < sizeof ( caHdr ) ) {
this->flushIfRecvProcessRequested ();
return;
return true;
}
this->curMsg.m_cmmd = this->recvQue.popUInt16 ();
this->curMsg.m_postsize = this->recvQue.popUInt16 ();
@@ -796,7 +796,7 @@ void tcpiiu::processIncoming ()
sizeof ( this->curMsg.m_postsize ) + sizeof ( this->curMsg.m_count );
if ( this->recvQue.occupiedBytes () < annexSize ) {
this->flushIfRecvProcessRequested ();
return;
return true;
}
this->curMsg.m_postsize = this->recvQue.popUInt32 ();
this->curMsg.m_count = this->recvQue.popUInt32 ();
@@ -840,14 +840,13 @@ void tcpiiu::processIncoming ()
this->curMsg.m_postsize - this->curDataBytes );
if ( this->curDataBytes < this->curMsg.m_postsize ) {
this->flushIfRecvProcessRequested ();
return;
return true;
}
}
bool msgOK = this->pCAC()->executeResponse ( *this,
this->curMsg, this->pCurData );
if ( ! msgOK ) {
this->cleanShutdown ();
return;
return false;
}
}
else {
@@ -862,7 +861,7 @@ void tcpiiu::processIncoming ()
this->curMsg.m_postsize - this->curDataBytes );
if ( this->curDataBytes < this->curMsg.m_postsize ) {
this->flushIfRecvProcessRequested ();
return;
return true;
}
}
@@ -877,6 +876,7 @@ inline void insertRequestHeader (
ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid,
ca_uint32_t requestDependent, bool v49Ok )
{
sendQue.beginMsg ();
if ( payloadSize < 0xffff && nElem < 0xffff ) {
sendQue.pushUInt16 ( request );
sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( payloadSize ) );
@@ -920,6 +920,7 @@ void tcpiiu::hostNameSetRequest ()
epicsAutoMutex locker ( this->pCAC()->mutexRef() );
this->sendQue.beginMsg ();
this->sendQue.pushUInt16 ( CA_PROTO_HOST_NAME ); // cmd
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( postSize ) ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
@@ -928,6 +929,7 @@ void tcpiiu::hostNameSetRequest ()
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.pushString ( pName, size );
this->sendQue.pushString ( nillBytes, postSize - size );
this->sendQue.commitMsg ();
}
/*
@@ -949,6 +951,7 @@ void tcpiiu::userNameSetRequest ()
}
epicsAutoMutex locker ( this->pCAC()->mutexRef() );
this->sendQue.beginMsg ();
this->sendQue.pushUInt16 ( CA_PROTO_CLIENT_NAME ); // cmd
this->sendQue.pushUInt16 ( postSize ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
@@ -957,6 +960,7 @@ void tcpiiu::userNameSetRequest ()
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.pushString ( pName, size );
this->sendQue.pushString ( nillBytes, postSize - size );
this->sendQue.commitMsg ();
}
void tcpiiu::disableFlowControlRequest ()
@@ -966,13 +970,14 @@ void tcpiiu::disableFlowControlRequest ()
}
epicsAutoMutex locker ( this->pCAC()->mutexRef() );
this->sendQue.beginMsg ();
this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_ON ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
this->sendQue.pushUInt16 ( 0u ); // count
this->sendQue.pushUInt32 ( 0u ); // cid
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.commitMsg ();
}
void tcpiiu::enableFlowControlRequest ()
@@ -982,13 +987,14 @@ void tcpiiu::enableFlowControlRequest ()
}
epicsAutoMutex locker ( this->pCAC()->mutexRef() );
this->sendQue.beginMsg ();
this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_OFF ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
this->sendQue.pushUInt16 ( 0u ); // count
this->sendQue.pushUInt32 ( 0u ); // cid
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.commitMsg ();
}
void tcpiiu::versionMessage ( const cacChannel::priLev & priority )
@@ -1000,13 +1006,14 @@ void tcpiiu::versionMessage ( const cacChannel::priLev & priority )
}
epicsAutoMutex locker ( this->pCAC()->mutexRef() );
this->sendQue.beginMsg ();
this->sendQue.pushUInt16 ( CA_PROTO_VERSION ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize ( old possize field )
this->sendQue.pushUInt16 ( priority ); // old dataType field
this->sendQue.pushUInt16 ( CA_MINOR_PROTOCOL_REVISION ); // old count field
this->sendQue.pushUInt32 ( 0u ); // ( old cid field )
this->sendQue.pushUInt32 ( 0u ); // ( old available field )
this->sendQue.commitMsg ();
}
void tcpiiu::echoRequest ()
@@ -1016,13 +1023,14 @@ void tcpiiu::echoRequest ()
}
epicsAutoMutex locker ( this->pCAC()->mutexRef() );
this->sendQue.beginMsg ();
this->sendQue.pushUInt16 ( CA_PROTO_ECHO ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
this->sendQue.pushUInt16 ( 0u ); // count
this->sendQue.pushUInt32 ( 0u ); // cid
this->sendQue.pushUInt32 ( 0u ); // available
this->sendQue.commitMsg ();
}
inline void insertRequestWithPayLoad (
@@ -1069,7 +1077,9 @@ inline void insertRequestWithPayLoad (
else {
sendQue.push_dbr_type ( dataType, pPayload, nElem );
}
// set pad bytes to nill
sendQue.pushString ( nillBytes, payloadSize - size );
sendQue.commitMsg ();
}
void tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue )
@@ -1125,6 +1135,7 @@ void tcpiiu::readNotifyRequest ( nciu &chan, netReadNotifyIO &io,
static_cast < ca_uint16_t > ( dataType ),
nElem, chan.getSID(), io.getID(),
CA_V49 ( this->minorProtocolVersion ) );
this->sendQue.commitMsg ();
}
void tcpiiu::createChannelRequest ( nciu &chan )
@@ -1149,6 +1160,7 @@ void tcpiiu::createChannelRequest ( nciu &chan )
throw cacChannel::unsupportedByService();
}
this->sendQue.beginMsg ();
this->sendQue.pushUInt16 ( CA_PROTO_CLAIM_CIU ); // cmd
this->sendQue.pushUInt16 ( postCnt ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
@@ -1166,17 +1178,20 @@ void tcpiiu::createChannelRequest ( nciu &chan )
if ( postCnt > nameLength ) {
this->sendQue.pushString ( nillBytes, postCnt - nameLength );
}
this->sendQue.commitMsg ();
}
void tcpiiu::clearChannelRequest ( nciu &chan )
{
if ( chan.connected () ) {
this->sendQue.beginMsg ();
this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd
this->sendQue.pushUInt16 ( 0u ); // postsize
this->sendQue.pushUInt16 ( 0u ); // dataType
this->sendQue.pushUInt16 ( 0u ); // count
this->sendQue.pushUInt32 ( chan.getSID () ); // cid
this->sendQue.pushUInt32 ( chan.getCID () ); // available
this->sendQue.commitMsg ();
}
}
@@ -1219,6 +1234,7 @@ void tcpiiu::subscriptionRequest ( nciu &chan, netSubscription & subscr )
this->sendQue.pushFloat32 ( 0.0 ); // m_toval
this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( mask ) ); // m_mask
this->sendQue.pushUInt16 ( 0u ); // m_pad
this->sendQue.commitMsg ();
}
void tcpiiu::subscriptionCancelRequest ( nciu &chan, netSubscription &subscr )
@@ -1229,11 +1245,16 @@ void tcpiiu::subscriptionCancelRequest ( nciu &chan, netSubscription &subscr )
static_cast < ca_uint16_t > ( subscr.getCount () ),
chan.getSID(), subscr.getID(),
CA_V49 ( this->minorProtocolVersion ) );
this->sendQue.commitMsg ();
}
//
// caller must hold both the callback mutex and
// also the cac primary mutex
//
void tcpiiu::lastChannelDetachNotify ()
{
this->cleanShutdown ();
this->shutdown ( false );
}
bool tcpiiu::flush ()
@@ -1336,6 +1357,19 @@ void tcpiiu::requestRecvProcessPostponedFlush ()
this->recvProcessPostponedFlush = true;
}
void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const
{
this->pHostNameCache->hostName ( pBuf, bufLength );
}
// deprecated - please dont use - this is _not_ thread safe
const char * tcpiiu::pHostName () const
{
static char nameBuf [128];
this->hostName ( nameBuf, sizeof ( nameBuf ) );
return nameBuf; // ouch !!
}

View File

@@ -199,7 +199,6 @@ udpiiu::~udpiiu ()
//
void udpiiu::recvMsg ()
{
char peek;
osiSockAddr src;
int status;
@@ -212,6 +211,7 @@ void udpiiu::recvMsg ()
// peek first at the message so that file descriptor managers will wake up
// in single threaded applications
osiSocklen_t src_size = sizeof ( src );
char peek;
recvfrom ( this->sock, & peek, sizeof ( peek ), MSG_PEEK,
&src.sa, &src_size );
status = 0;
@@ -486,36 +486,10 @@ void udpiiu::shutdown ()
if ( this->shutdownCmd ) {
return;
}
this->shutdownCmd = true;
caHdr msg;
msg.m_cmmd = htons ( CA_PROTO_VERSION );
msg.m_available = htonl ( 0u );
msg.m_dataType = htons ( 0u );
msg.m_count = htons ( 0u );
msg.m_cid = htonl ( 0u );
msg.m_postsize = htons ( 0u );
osiSockAddr addr;
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
addr.ia.sin_port = htons ( this->localPort );
// send a wakeup msg so the UDP recv thread will exit
int status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ),
sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) );
if ( status < 0 ) {
// this knocks the UDP input thread out of recv ()
// on all os except linux
status = socket_close ( this->sock );
if ( status == 0 ) {
this->sockCloseCompleted = true;
}
else {
errlogPrintf ("CAC UDP socket close error was %s\n",
SOCKERRSTR ( SOCKERRNO ) );
}
}
this->wakeupMsg ();
// wait for recv threads to exit
epicsEventMustWait ( this->recvThreadExitSignal );
@@ -528,7 +502,7 @@ bool udpiiu::badUDPRespAction ( const caHdr &msg,
sockAddrToDottedIP ( &netAddr.sa, buf, sizeof ( buf ) );
char date[64];
currentTime.strftime ( date, sizeof ( date ), "%a %b %d %Y %H:%M:%S");
this->printf ( "CAC: undecipherable ( bad msg code %u ) UDP message from %s at %s\n",
this->printf ( "CAC: Undecipherable ( bad msg code %u ) UDP message from %s at %s\n",
msg.m_cmmd, buf, date );
return false;
}
@@ -690,7 +664,7 @@ void udpiiu::postMsg ( const osiSockAddr & net_addr,
char buf[64];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
this->printf (
"%s: undecipherable (too small) UDP msg from %s ignored\n",
"%s: Undecipherable (too small) UDP msg from %s ignored\n",
__FILE__, buf );
return;
}
@@ -725,7 +699,7 @@ void udpiiu::postMsg ( const osiSockAddr & net_addr,
char buf[64];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
this->printf (
"%s: undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__,
"%s: Undecipherable (payload too small) UDP msg from %s ignored\n", __FILE__,
buf );
return;
}
@@ -744,7 +718,7 @@ void udpiiu::postMsg ( const osiSockAddr & net_addr,
if ( ! success ) {
char buf[256];
sockAddrToDottedIP ( &net_addr.sa, buf, sizeof ( buf ) );
this->printf ( "CAC: undecipherable UDP message from %s\n", buf );
this->printf ( "CAC: Undecipherable UDP message from %s\n", buf );
return;
}
@@ -867,3 +841,36 @@ void udpiiu::show ( unsigned level ) const
}
}
void udpiiu::wakeupMsg ()
{
caHdr msg;
msg.m_cmmd = htons ( CA_PROTO_VERSION );
msg.m_available = htonl ( 0u );
msg.m_dataType = htons ( 0u );
msg.m_count = htons ( 0u );
msg.m_cid = htonl ( 0u );
msg.m_postsize = htons ( 0u );
osiSockAddr addr;
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
addr.ia.sin_port = htons ( this->localPort );
// send a wakeup msg so the UDP recv thread will exit
int status = sendto ( this->sock, reinterpret_cast < const char * > ( &msg ),
sizeof (msg), 0, &addr.sa, sizeof ( addr.sa ) );
if ( status < 0 ) {
// this knocks the UDP input thread out of recv ()
// on all os except linux
status = socket_close ( this->sock );
if ( status == 0 ) {
this->sockCloseCompleted = true;
}
else {
errlogPrintf ("CAC UDP socket close error was %s\n",
SOCKERRSTR ( SOCKERRNO ) );
}
}
}

View File

@@ -47,14 +47,15 @@ public:
virtual ~udpiiu ();
void shutdown ();
void recvMsg ();
void postMsg ( const osiSockAddr &net_addr,
void postMsg ( const osiSockAddr & net_addr,
char *pInBuf, arrayElementCount blockSize,
const epicsTime &currenTime);
const epicsTime &currenTime );
void repeaterRegistrationMessage ( unsigned attemptNumber );
void datagramFlush ();
unsigned getPort () const;
void show ( unsigned level ) const;
bool isCurrentThread () const;
void wakeupMsg ();
// exceptions
class noSocket {};

View File

@@ -18,183 +18,20 @@
#ifndef virtualCircuith
#define virtualCircuith
#include <new> // needed by comQueueSend
#include "epicsTimer.h"
#include "epicsMemory.h"
#include "ipAddrToAsciiAsynchronous.h"
#include "caServerID.h"
#include "tsSLList.h"
#include "tsDLList.h"
#include "comBuf.h"
#include "caServerID.h"
#include "netiiu.h"
#include "comQueSend.h"
#include "comQueRecv.h"
#include "tcpRecvWatchdog.h"
#include "tcpSendWatchdog.h"
enum iiu_conn_state { iiu_connecting, iiu_connected, iiu_disconnected };
class nciu;
class tcpiiu;
class comQueSend {
public:
comQueSend ( wireSendAdapter & );
~comQueSend ();
void clear ();
unsigned occupiedBytes () const;
bool flushEarlyThreshold ( unsigned nBytesThisMsg ) const;
bool flushBlockThreshold ( unsigned nBytesThisMsg ) const;
bool dbr_type_ok ( unsigned type );
void pushUInt16 ( const ca_uint16_t value );
void pushUInt32 ( const ca_uint32_t value );
void pushFloat32 ( const ca_float32_t value );
void pushString ( const char *pVal, unsigned nChar );
void push_dbr_type ( unsigned type, const void *pVal, unsigned nElem );
comBuf * popNextComBufToSend ();
private:
wireSendAdapter & wire;
tsDLList < comBuf > bufs;
unsigned nBytesPending;
void copy_dbr_string ( const void *pValue, unsigned nElem );
void copy_dbr_short ( const void *pValue, unsigned nElem );
void copy_dbr_float ( const void *pValue, unsigned nElem );
void copy_dbr_char ( const void *pValue, unsigned nElem );
void copy_dbr_long ( const void *pValue, unsigned nElem );
void copy_dbr_double ( const void *pValue, unsigned nElem );
typedef void ( comQueSend::*copyFunc_t ) (
const void *pValue, unsigned nElem );
static const copyFunc_t dbrCopyVector [39];
//
// visual C++ version 6.0 does not allow out of
// class member template function definition
//
template < class T >
inline void copyIn ( const T *pVal, unsigned nElem )
{
unsigned nCopied;
comBuf *pComBuf = this->bufs.last ();
if ( pComBuf ) {
nCopied = pComBuf->copyIn ( pVal, nElem );
this->nBytesPending += nCopied * sizeof ( T );
}
else {
nCopied = 0u;
}
while ( nElem > nCopied ) {
pComBuf = new ( std::nothrow ) comBuf;
if ( ! pComBuf ) {
this->wire.forcedShutdown ();
throw std::bad_alloc ();
}
unsigned nNew = pComBuf->copyIn ( &pVal[nCopied], nElem - nCopied );
nCopied += nNew;
this->nBytesPending += nNew * sizeof ( T );
this->bufs.add ( *pComBuf );
}
}
//
// visual C++ version 6.0 does not allow out of
// class member template function definition
//
template < class T >
inline void copyIn ( const T &val )
{
comBuf *pComBuf = this->bufs.last ();
if ( pComBuf ) {
if ( pComBuf->copyIn ( &val, 1u ) >= 1u ) {
this->nBytesPending += sizeof ( T );
return;
}
}
pComBuf = new ( std::nothrow ) comBuf;
if ( ! pComBuf ) {
this->wire.forcedShutdown ();
throw std::bad_alloc ();
}
if ( pComBuf->copyIn ( &val, 1u ) == 0u ) {
this->wire.forcedShutdown ();
throw -1;
}
this->bufs.add ( *pComBuf );
this->nBytesPending += sizeof ( T );
return;
}
};
static const unsigned maxBytesPendingTCP = 0x4000;
class comQueRecv {
public:
comQueRecv ();
~comQueRecv ();
unsigned occupiedBytes () const;
unsigned copyOutBytes ( epicsInt8 *pBuf, unsigned nBytes );
unsigned removeBytes ( unsigned nBytes );
void pushLastComBufReceived ( comBuf & );
void clear ();
epicsInt8 popInt8 ();
epicsUInt8 popUInt8 ();
epicsInt16 popInt16 ();
epicsUInt16 popUInt16 ();
epicsInt32 popInt32 ();
epicsUInt32 popUInt32 ();
epicsFloat32 popFloat32 ();
epicsFloat64 popFloat64 ();
void popString ( epicsOldString * );
class insufficentBytesAvailable {};
private:
tsDLList < comBuf > bufs;
unsigned nBytesPending;
};
class tcpRecvWatchdog : private epicsTimerNotify {
public:
tcpRecvWatchdog ( tcpiiu &, double periodIn, epicsTimerQueue & );
virtual ~tcpRecvWatchdog ();
void rescheduleRecvTimer ();
void sendBacklogProgressNotify ();
void messageArrivalNotify ();
void beaconArrivalNotify ();
void beaconAnomalyNotify ();
void connectNotify ();
void cancel ();
void show ( unsigned level ) const;
private:
const double period;
epicsTimer & timer;
tcpiiu &iiu;
bool responsePending;
bool beaconAnomaly;
expireStatus expire ( const epicsTime & currentTime );
};
class tcpSendWatchdog : private epicsTimerNotify {
public:
tcpSendWatchdog ( tcpiiu &, double periodIn, epicsTimerQueue & queueIn );
virtual ~tcpSendWatchdog ();
void start ();
void cancel ();
private:
const double period;
epicsTimer & timer;
tcpiiu & iiu;
expireStatus expire ( const epicsTime & currentTime );
};
class hostNameCache : public ipAddrToAsciiAsynchronous {
public:
hostNameCache ( const osiSockAddr &addr, ipAddrToAsciiEngine &engine );
~hostNameCache ();
void destroy ();
void ioCompletionNotify ( const char *pHostName );
void hostName ( char *pBuf, unsigned bufLength ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
private:
bool ioComplete;
char hostNameBuf [128];
static tsFreeList < class hostNameCache, 16 > freeList;
static epicsMutex freeListMutex;
};
extern "C" void cacSendThreadTCP ( void *pParam );
extern "C" void cacRecvThreadTCP ( void *pParam );
@@ -208,6 +45,9 @@ struct caHdrLargeArray {
ca_uint16_t m_cmmd; // operation to be performed
};
class hostNameCache;
class ipAddrToAsciiEngine;
class tcpiiu :
public netiiu, public tsDLNode < tcpiiu >,
public tsSLNode < tcpiiu >, public caServerID,
@@ -220,10 +60,10 @@ public:
~tcpiiu ();
void connect ();
void destroy ();
void forcedShutdown ();
void cleanShutdown ();
void beaconAnomalyNotify ();
void beaconArrivalNotify ();
void forcedShutdown ();
void flushRequest ();
bool flushBlockThreshold () const;
@@ -274,7 +114,9 @@ private:
bool earlyFlush;
bool recvProcessPostponedFlush;
void processIncoming ();
void shutdown ( bool discardPendingMessages );
bool processIncoming ();
unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf );
unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf );
@@ -302,143 +144,6 @@ private:
bool flush (); // only to be called by the send thread
};
inline bool comQueSend::dbr_type_ok ( unsigned type )
{
if ( type >= ( sizeof ( this->dbrCopyVector ) / sizeof ( this->dbrCopyVector[0] ) ) ) {
return false;
}
if ( ! this->dbrCopyVector [type] ) {
return false;
}
return true;
}
inline void comQueSend::pushUInt16 ( const ca_uint16_t value )
{
this->copyIn ( value );
}
inline void comQueSend::pushUInt32 ( const ca_uint32_t value )
{
this->copyIn ( value );
}
inline void comQueSend::pushFloat32 ( const ca_float32_t value )
{
this->copyIn ( value );
}
inline void comQueSend::pushString ( const char *pVal, unsigned nChar )
{
this->copyIn ( pVal, nChar );
}
// it is assumed that dbr_type_ok() was called prior to calling this routine
// to check the type code
inline void comQueSend::push_dbr_type ( unsigned type, const void *pVal, unsigned nElem )
{
( this->*dbrCopyVector [type] ) ( pVal, nElem );
}
inline unsigned comQueSend::occupiedBytes () const
{
return this->nBytesPending;
}
inline bool comQueSend::flushBlockThreshold ( unsigned nBytesThisMsg ) const
{
return ( this->nBytesPending + nBytesThisMsg > 16 * comBuf::capacityBytes () );
}
inline bool comQueSend::flushEarlyThreshold ( unsigned nBytesThisMsg ) const
{
return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::capacityBytes () );
}
inline comBuf * comQueSend::popNextComBufToSend ()
{
comBuf *pBuf = this->bufs.get ();
if ( pBuf ) {
unsigned nBytesThisBuf = pBuf->occupiedBytes ();
assert ( this->nBytesPending >= nBytesThisBuf );
this->nBytesPending -= pBuf->occupiedBytes ();
}
else {
assert ( this->nBytesPending == 0u );
}
return pBuf;
}
inline unsigned comQueRecv::occupiedBytes () const
{
return this->nBytesPending;
}
inline epicsInt8 comQueRecv::popInt8 ()
{
return static_cast < epicsInt8 > ( this->popUInt8() );
}
inline epicsInt16 comQueRecv::popInt16 ()
{
epicsInt16 tmp;
tmp = this->popInt8() << 8u;
tmp |= this->popInt8() << 0u;
return tmp;
}
inline epicsInt32 comQueRecv::popInt32 ()
{
epicsInt32 tmp ;
tmp |= this->popInt8() << 24u;
tmp |= this->popInt8() << 16u;
tmp |= this->popInt8() << 8u;
tmp |= this->popInt8() << 0u;
return tmp;
}
inline epicsFloat32 comQueRecv::popFloat32 ()
{
epicsFloat32 tmp;
epicsUInt8 wire[ sizeof ( tmp ) ];
for ( unsigned i = 0u; i < sizeof ( tmp ); i++ ) {
wire[i] = this->popUInt8 ();
}
osiConvertFromWireFormat ( tmp, wire );
return tmp;
}
inline epicsFloat64 comQueRecv::popFloat64 ()
{
epicsFloat64 tmp;
epicsUInt8 wire[ sizeof ( tmp ) ];
for ( unsigned i = 0u; i < sizeof ( tmp ); i++ ) {
wire[i] = this->popUInt8 ();
}
osiConvertFromWireFormat ( tmp, wire );
return tmp;
}
inline void comQueRecv::popString ( epicsOldString *pStr )
{
for ( unsigned i = 0u; i < sizeof ( *pStr ); i++ ) {
pStr[0][i] = this->popInt8 ();
}
}
inline void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const
{
this->pHostNameCache->hostName ( pBuf, bufLength );
}
// deprecated - please dont use - this is _not_ thread safe
inline const char * tcpiiu::pHostName () const
{
static char nameBuf [128];
this->hostName ( nameBuf, sizeof ( nameBuf ) );
return nameBuf; // ouch !!
}
inline void tcpiiu::flushRequest ()
{
this->sendThreadFlushEvent.signal ();