Merged Ralph's ca-over-tcp branch.

Fixed some vxWorks compile errors, and made a number of edits
to the HTML in the CAref.html documentation.
This commit is contained in:
Andrew Johnson
2010-08-10 16:05:46 -05:00
24 changed files with 1450 additions and 771 deletions
+1
View File
@@ -31,6 +31,7 @@
# (see CAref.html in this distribution)
EPICS_CA_ADDR_LIST=""
EPICS_CA_AUTO_ADDR_LIST=YES
EPICS_CA_NAME_SERVERS=""
EPICS_CA_CONN_TMO=30.0
EPICS_CA_REPEATER_PORT=5065
EPICS_CA_SERVER_PORT=5064
+2 -7
View File
@@ -3,14 +3,12 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
*/
#include <string>
@@ -314,6 +312,3 @@ void CASG::operator delete ( void * )
errlogPrintf ( "%s:%d this compiler is confused about placement delete - memory was probably leaked",
__FILE__, __LINE__ );
}
+388 -369
View File
File diff suppressed because it is too large Load Diff
+39
View File
@@ -0,0 +1,39 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#ifndef SearchDest_h
#define SearchDest_h
#include <osiSock.h>
#include <epicsTime.h>
#include <tsDLList.h>
#include "caProto.h"
class channelNode;
class epicsMutex;
template < class T > class epicsGuard;
struct SearchDest :
public tsDLNode < SearchDest > {
virtual ~SearchDest () {};
struct Callback {
virtual ~Callback () {};
virtual void notify (
const caHdr & msg, const void * pPayload,
const osiSockAddr & addr, const epicsTime & ) = 0;
virtual void show (
epicsGuard < epicsMutex > &, unsigned level ) const = 0;
};
virtual void searchRequest ( epicsGuard < epicsMutex > &,
const char * pbuf, size_t len ) = 0;
virtual void show ( epicsGuard < epicsMutex > &, unsigned level ) const = 0;
};
#endif // SearchDest_h
+4 -1
View File
@@ -40,6 +40,7 @@
# define CA_V49(MINOR) ((MINOR)>=9u) /* large arrays, dispatch priorities */
# define CA_V410(MINOR) ((MINOR)>=10u) /* beacon counter */
# define CA_V411(MINOR) ((MINOR)>=11u) /* sequence numbers in UDP version command */
# define CA_V412(MINOR) ((MINOR)>=12u) /* TCP-based search requests */
#elif CA_MAJOR_PROTOCOL_REVISION > 4u
# define CA_V41(MINOR) ( 1u )
# define CA_V42(MINOR) ( 1u )
@@ -52,6 +53,7 @@
# define CA_V49(MINOR) ( 1u )
# define CA_V410(MINOR) ( 1u )
# define CA_V411(MINOR) ( 1u )
# define CA_V412(MINOR) ( 1u )
#else
# define CA_V41(MINOR) ( 0u )
# define CA_V42(MINOR) ( 0u )
@@ -64,7 +66,8 @@
# define CA_V49(MINOR) ( 0u )
# define CA_V410(MINOR) ( 0u )
# define CA_V411(MINOR) ( 0u )
#endif
# define CA_V412(MINOR) ( 0u )
#endif
/*
* These port numbers are only used if the CA repeater and
+2 -2
View File
@@ -3,10 +3,10 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
*
*
+120 -55
View File
@@ -3,12 +3,11 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
*
/*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
@@ -16,6 +15,7 @@
* Copyright, 1986, The Regents of the University of California.
*
* Author: Jeff Hill
*
*/
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
@@ -34,6 +34,7 @@
#include "errlog.h"
#define epicsExportSharedSymbols
#include "addrList.h"
#include "iocinf.h"
#include "cac.h"
#include "inetAddrID.h"
@@ -62,7 +63,7 @@ const cac::pProtoStubTCP cac::tcpJumpTableCAC [] =
&cac::readRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::searchRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
@@ -177,6 +178,10 @@ cac::cac (
strncpy ( this->pUserName, tmp, len );
}
this->_serverPort =
envGetInetPortConfigParam ( &EPICS_CA_SERVER_PORT,
static_cast <unsigned short> (CA_SERVER_PORT) );
status = envGetDoubleConfigParam ( &EPICS_CA_CONN_TMO, &this->connTMO );
if ( status ) {
this->connTMO = CA_CONN_VERIFY_PERIOD;
@@ -234,6 +239,34 @@ cac::cac (
this->timerQueue.release ();
throw;
}
/*
* load user configured tcp name server address list,
* create virtual circuits, and add them to server table
*/
ELLLIST dest, tmpList;
ellInit ( & dest );
ellInit ( & tmpList );
addAddrToChannelAccessAddressList ( &tmpList, &EPICS_CA_NAME_SERVERS, this->_serverPort, false );
removeDuplicateAddresses ( &dest, &tmpList, 0 );
epicsGuard < epicsMutex > guard ( this->mutex );
while ( osiSockAddrNode *
pNode = reinterpret_cast < osiSockAddrNode * > ( ellGet ( & dest ) ) ) {
tcpiiu * piiu = NULL;
SearchDestTCP * pdst = new SearchDestTCP ( *this, pNode->addr );
this->registerSearchDest ( guard, * pdst );
bool newIIU = findOrCreateVirtCircuit (
guard, pNode->addr, cacChannel::priorityDefault,
piiu, CA_UKN_MINOR_VERSION, pdst );
free ( pNode );
if ( newIIU ) {
piiu->start ( guard );
}
}
}
cac::~cac ()
@@ -468,7 +501,8 @@ cacChannel & cac::createChannel (
if ( ! this->pudpiiu ) {
this->pudpiiu = new udpiiu (
guard, this->timerQueue, this->cbMutex,
this->mutex, this->notify, *this );
this->mutex, this->notify, *this, this->_serverPort,
this->searchDestList );
}
nciu * pNetChan = new ( this->channelFreeList )
@@ -477,6 +511,57 @@ cacChannel & cac::createChannel (
return *pNetChan;
}
bool cac::findOrCreateVirtCircuit (
epicsGuard < epicsMutex > & guard, const osiSockAddr & addr,
unsigned priority, tcpiiu *& piiu, unsigned minorVersionNumber,
SearchDestTCP * pSearchDest )
{
guard.assertIdenticalMutex ( this->mutex );
bool newIIU = false;
if ( piiu ) {
if ( ! piiu->alive ( guard ) ) {
return newIIU;
}
}
else {
try {
autoPtrFreeList < tcpiiu, 32, epicsMutexNOOP > pnewiiu (
this->freeListVirtualCircuit,
new ( this->freeListVirtualCircuit ) tcpiiu (
*this, this->mutex, this->cbMutex, this->notify, this->connTMO,
this->timerQueue, addr, this->comBufMemMgr, minorVersionNumber,
this->ipToAEngine, priority, pSearchDest ) );
bhe * pBHE = this->beaconTable.lookup ( addr.ia );
if ( ! pBHE ) {
pBHE = new ( this->bheFreeList )
bhe ( this->mutex, epicsTime (), 0u, addr.ia );
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
return newIIU;
}
}
this->serverTable.add ( *pnewiiu );
this->circuitList.add ( *pnewiiu );
pBHE->registerIIU ( guard, *pnewiiu );
piiu = pnewiiu.release ();
newIIU = true;
}
catch ( std :: exception & except ) {
errlogPrintf (
"CAC: exception during virtual circuit creation \"%s\"\n",
except.what () );
return newIIU;
}
catch ( ... ) {
errlogPrintf (
"CAC: Nonstandard exception during virtual circuit creation\n" );
return newIIU;
}
}
return newIIU;
}
void cac::transferChanToVirtCircuit (
unsigned cid, unsigned sid, // X aCC 431
ca_uint16_t typeCode, arrayElementCount count,
@@ -484,7 +569,7 @@ void cac::transferChanToVirtCircuit (
const epicsTime & currentTime )
{
if ( addr.sa.sa_family != AF_INET ) {
return ;
return;
}
epicsGuard < epicsMutex > guard ( this->mutex );
@@ -501,6 +586,7 @@ void cac::transferChanToVirtCircuit (
* Ignore duplicate search replies
*/
osiSockAddr chanAddr = pChan->getPIIU(guard)->getNetworkAddress (guard);
if ( chanAddr.sa.sa_family != AF_UNSPEC ) {
if ( ! sockAddrAreIdentical ( &addr, &chanAddr ) ) {
char acc[64];
@@ -519,52 +605,12 @@ void cac::transferChanToVirtCircuit (
return;
}
/*
* look for an existing virtual circuit
*/
bool newIIU = false;
caServerID servID ( addr.ia, pChan->getPriority(guard) );
tcpiiu * piiu = this->serverTable.lookup ( servID );
if ( piiu ) {
if ( ! piiu->alive ( guard ) ) {
return;
}
}
else {
try {
autoPtrFreeList < tcpiiu, 32, epicsMutexNOOP > pnewiiu (
this->freeListVirtualCircuit,
new ( this->freeListVirtualCircuit ) tcpiiu (
*this, this->mutex, this->cbMutex, this->notify, this->connTMO,
this->timerQueue, addr, this->comBufMemMgr, minorVersionNumber,
this->ipToAEngine, pChan->getPriority(guard) ) );
bhe * pBHE = this->beaconTable.lookup ( addr.ia );
if ( ! pBHE ) {
pBHE = new ( this->bheFreeList )
bhe ( this->mutex, epicsTime (), 0u, addr.ia );
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
return;
}
}
this->serverTable.add ( *pnewiiu );
this->circuitList.add ( *pnewiiu );
this->iiuExistenceCount++;
pBHE->registerIIU ( guard, *pnewiiu );
piiu = pnewiiu.release ();
newIIU = true;
}
catch ( std :: exception & except ) {
errlogPrintf (
"CAC: exception during virtual circuit creation \"%s\"\n",
except.what () );
return;
}
catch ( ... ) {
errlogPrintf (
"CAC: nonstandard exception during virtual circuit creation\n" );
return;
}
}
bool newIIU = findOrCreateVirtCircuit (
guard, addr,
pChan->getPriority(guard), piiu, minorVersionNumber );
// must occur before moving to new iiu
pChan->getPIIU(guard)->uninstallChanDueToSuccessfulSearchResponse (
@@ -746,9 +792,10 @@ netSubscription & cac::subscriptionRequest (
return *pIO.release ();
}
bool cac::versionAction ( callbackManager &, tcpiiu &,
const epicsTime &, const caHdrLargeArray &, void * )
bool cac::versionAction ( callbackManager &, tcpiiu & iiu,
const epicsTime &, const caHdrLargeArray & msg, void * )
{
iiu.versionRespNotify ( msg );
return true;
}
@@ -831,6 +878,15 @@ bool cac::readNotifyRespAction ( callbackManager &, tcpiiu & iiu,
return true;
}
bool cac::searchRespAction ( callbackManager &, tcpiiu & iiu,
const epicsTime & currentTime, const caHdrLargeArray & msg,
void * /* pMsgBdy */ )
{
assert ( this->pudpiiu );
iiu.searchRespNotify ( currentTime, msg );
return true;
}
bool cac::eventRespAction ( callbackManager &, tcpiiu &iiu,
const epicsTime &, const caHdrLargeArray & hdr, void * pMsgBdy )
{
@@ -1076,7 +1132,7 @@ bool cac::createChannelRespAction (
}
bool cac::verifyAndDisconnectChan (
callbackManager & mgr, tcpiiu &,
callbackManager & mgr, tcpiiu &,
const epicsTime &, const caHdrLargeArray & hdr, void * /* pMsgBody */ )
{
epicsGuard < epicsMutex > guard ( this->mutex );
@@ -1139,6 +1195,7 @@ void cac::destroyIIU ( tcpiiu & iiu )
{
callbackManager mgr ( this->notify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->mutex );
if ( iiu.channelCount ( guard ) ) {
char hostNameTmp[64];
iiu.getHostName ( guard, hostNameTmp, sizeof ( hostNameTmp ) );
@@ -1166,13 +1223,14 @@ void cac::destroyIIU ( tcpiiu & iiu )
// this waits for send/recv threads to exit
// this also uses the cac free lists so cac must wait
// for this to finish before it shuts down
iiu.~tcpiiu ();
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->freeListVirtualCircuit.release ( & iiu );
this->iiuExistenceCount--;
// signal iiu uninstal event so that cac can properly shut down
// signal iiu uninstall event so that cac can properly shut down
this->iiuUninstall.signal();
}
// do not touch "this" after lock is released above
@@ -1230,3 +1288,10 @@ void cac::pvMultiplyDefinedNotify ( msgForMultiplyDefinedPV & mfmdpv,
this->mdpvFreeList.release ( & mfmdpv );
}
void cac::registerSearchDest (
epicsGuard < epicsMutex > & guard,
SearchDest & req )
{
guard.assertIdenticalMutex ( this->mutex );
this->searchDestList.add ( req );
}
+28 -13
View File
@@ -3,23 +3,20 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
*
*
/*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*
*
* Author: Jeff Hill
*
*/
#ifndef cach
@@ -182,6 +179,13 @@ public:
int status, const char * pContext,
const char * pFileName, unsigned lineNo );
// search destination management
void registerSearchDest (
epicsGuard < epicsMutex > &, SearchDest & req );
bool findOrCreateVirtCircuit (
epicsGuard < epicsMutex > &, const osiSockAddr &,
unsigned, tcpiiu *&, unsigned, SearchDestTCP * pSearchDest = NULL );
// diagnostics
unsigned circuitCount ( epicsGuard < epicsMutex > & ) const;
void show ( epicsGuard < epicsMutex > &, unsigned level ) const;
@@ -191,6 +195,7 @@ public:
int varArgsPrintFormated (
epicsGuard < epicsMutex > & callbackControl,
const char *pformat, va_list args ) const;
double connectionTimeout ( epicsGuard < epicsMutex > & );
// buffer management
char * allocateSmallBufferTCP ();
@@ -235,6 +240,7 @@ private:
resTable < bhe, inetAddrID > beaconTable;
resTable < tcpiiu, caServerID > serverTable;
tsDLList < tcpiiu > circuitList;
tsDLList < SearchDest > searchDestList;
tsFreeList
< class tcpiiu, 32, epicsMutexNOOP >
freeListVirtualCircuit;
@@ -275,6 +281,7 @@ private:
unsigned maxRecvBytesTCP;
unsigned maxContigFrames;
unsigned beaconAnomalyCount;
unsigned short _serverPort;
unsigned iiuExistenceCount;
void recycleReadNotifyIO (
@@ -303,6 +310,8 @@ private:
const epicsTime & currentTime, const caHdrLargeArray &, void *pMsgBdy );
bool writeNotifyRespAction ( callbackManager &, tcpiiu &,
const epicsTime & currentTime, const caHdrLargeArray &, void *pMsgBdy );
bool searchRespAction ( callbackManager &, tcpiiu &,
const epicsTime & currentTime, const caHdrLargeArray &, void *pMsgBdy );
bool readNotifyRespAction ( callbackManager &, tcpiiu &,
const epicsTime & currentTime, const caHdrLargeArray &, void *pMsgBdy );
bool eventRespAction ( callbackManager &, tcpiiu &,
@@ -444,11 +453,17 @@ inline const char * cac :: pLocalHostName ()
return _refLocalHostName->pointer ();
}
inline unsigned cac ::
inline unsigned cac ::
maxContiguousFrames ( epicsGuard < epicsMutex > & ) const
{
return maxContigFrames;
}
inline double cac ::
connectionTimeout ( epicsGuard < epicsMutex > & guard )
{
guard.assertIdenticalMutex ( this->mutex );
return this->connTMO;
}
#endif // ifdef cach
+2 -2
View File
@@ -3,10 +3,10 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
*
* L O S A L A M O S
+1 -1
View File
@@ -41,7 +41,7 @@
# include "shareLib.h"
#endif
#define CA_MINOR_PROTOCOL_REVISION 11
#define CA_MINOR_PROTOCOL_REVISION 12
#include "caProto.h"
#include "cacIO.h"
+119 -32
View File
@@ -3,14 +3,11 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
@@ -18,6 +15,7 @@
* Copyright, 1986, The Regents of the University of California.
*
* Author: Jeff Hill
*
*/
#ifdef _MSC_VER
@@ -228,7 +226,6 @@ void tcpSendThread::run ()
epicsThreadSleep ( 0.1 );
}
}
this->iiu.cacRef.destroyIIU ( this->iiu );
}
@@ -236,7 +233,6 @@ unsigned tcpiiu::sendBytes ( const void *pBuf,
unsigned nBytesInBuf, const epicsTime & currentTime )
{
unsigned nBytes = 0u;
assert ( nBytesInBuf <= INT_MAX );
this->sendDog.start ( currentTime );
@@ -453,6 +449,10 @@ void tcpRecvThread::run ()
this->iiu.cacRef.destroyIIU ( this->iiu );
return;
}
if ( this->iiu.isNameService () ) {
this->iiu.pSearchDest->setCircuit ( &this->iiu );
this->iiu.pSearchDest->enable ();
}
}
this->iiu.sendThread.start ();
@@ -477,7 +477,7 @@ void tcpRecvThread::run ()
pComBuf->fillFromWire ( this->iiu, stat );
epicsTime currentTime = epicsTime::getCurrent ();
{
epicsGuard < epicsMutex > guard ( this->iiu.mutex );
@@ -634,17 +634,27 @@ void tcpRecvThread::connect (
continue;
}
else if ( errnoCpy == SOCK_SHUTDOWN ) {
break;
if ( ! this->iiu.isNameService () ) {
break;
}
}
else {
else {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf ( "CAC: Unable to connect because \"%s\"\n",
errlogPrintf ( "CAC: Unable to connect because \"%s\"\n",
sockErrBuf );
this->iiu.disconnectNotify ( guard );
break;
if ( ! this->iiu.isNameService () ) {
this->iiu.disconnectNotify ( guard );
break;
}
}
{
double sleepTime = this->iiu.cacRef.connectionTimeout ( guard );
epicsGuardRelease < epicsMutex > unguard ( guard );
epicsThreadSleep ( sleepTime );
}
continue;
}
}
return;
@@ -659,7 +669,8 @@ tcpiiu::tcpiiu (
epicsTimerQueue & timerQueue, const osiSockAddr & addrIn,
comBufMemoryManager & comBufMemMgrIn,
unsigned minorVersion, ipAddrToAsciiEngine & engineIn,
const cacChannel::priLev & priorityIn ) :
const cacChannel::priLev & priorityIn,
SearchDestTCP * pSearchDestIn ) :
caServerID ( addrIn.ia, priorityIn ),
hostNameCacheInstance ( addrIn, engineIn ),
recvThread ( *this, cbMutexIn, ctxNotifyIn, "CAC-TCP-recv",
@@ -680,6 +691,7 @@ tcpiiu::tcpiiu (
comBufMemMgr ( comBufMemMgrIn ),
cacRef ( cac ),
pCurData ( cac.allocateSmallBufferTCP () ),
pSearchDest ( pSearchDestIn ),
mutex ( mutexIn ),
cbMutex ( cbMutexIn ),
minorProtocolVersion ( minorVersion ),
@@ -815,6 +827,10 @@ tcpiiu::tcpiiu (
}
# endif
if ( isNameService() ) {
pSearchDest->setCircuit ( this );
}
memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) );
}
@@ -831,6 +847,7 @@ void tcpiiu::initiateCleanShutdown (
epicsGuard < epicsMutex > & guard )
{
guard.assertIdenticalMutex ( this->mutex );
if ( this->state == iiucs_connected ) {
if ( this->unresponsiveCircuit ) {
this->initiateAbortShutdown ( guard );
@@ -1014,8 +1031,12 @@ void tcpiiu::initiateAbortShutdown (
//
// tcpiiu::~tcpiiu ()
//
tcpiiu::~tcpiiu ()
tcpiiu :: ~tcpiiu ()
{
if ( this->pSearchDest ) {
this->pSearchDest->disable ();
}
this->sendThread.exitWait ();
this->recvThread.exitWait ();
this->sendDog.cancel ();
@@ -1830,7 +1851,9 @@ void tcpiiu::disconnectAllChannels (
this->channelCountTot = 0u;
this->initiateCleanShutdown ( guard );
if ( ! isNameService () ) {
this->initiateCleanShutdown ( guard );
}
}
void tcpiiu::unlinkAllChannels (
@@ -1839,7 +1862,7 @@ void tcpiiu::unlinkAllChannels (
{
cbGuard.assertIdenticalMutex ( this->cbMutex );
guard.assertIdenticalMutex ( this->mutex );
while ( nciu * pChan = this->createReqPend.get () ) {
pChan->serviceShutdownNotify ( cbGuard, guard );
}
@@ -1890,7 +1913,9 @@ void tcpiiu::unlinkAllChannels (
this->channelCountTot = 0u;
this->initiateCleanShutdown ( guard );
if ( ! isNameService () ) {
this->initiateCleanShutdown ( guard );
}
}
void tcpiiu::installChannel (
@@ -1909,20 +1934,6 @@ void tcpiiu::installChannel (
this->sendThreadFlushEvent.signal ();
}
void tcpiiu::nameResolutionMsgEndNotify ()
{
bool wakeupNeeded = false;
{
epicsGuard < epicsMutex > autoMutex ( this->mutex );
if ( this->createReqPend.count () ) {
wakeupNeeded = true;
}
}
if ( wakeupNeeded ) {
this->sendThreadFlushEvent.signal ();
}
}
bool tcpiiu :: connectNotify (
epicsGuard < epicsMutex > & guard, nciu & chan )
{
@@ -1980,7 +1991,7 @@ void tcpiiu::uninstallChan (
}
chan.channelNode::listMember = channelNode::cs_none;
this->channelCountTot--;
if ( this->channelCountTot == 0 ) {
if ( this->channelCountTot == 0 && ! this->isNameService() ) {
this->initiateCleanShutdown ( guard );
}
}
@@ -2100,5 +2111,81 @@ bool tcpiiu::searchMsg (
guard, id, pName, nameLength );
}
SearchDestTCP :: SearchDestTCP (
cac & cacIn, const osiSockAddr & addrIn ) :
_ptcpiiu ( NULL ),
_cac ( cacIn ),
_addr ( addrIn ),
_active ( false )
{
}
void SearchDestTCP :: disable ()
{
_active = false;
_ptcpiiu = NULL;
}
void SearchDestTCP :: enable ()
{
_active = true;
}
void SearchDestTCP :: searchRequest (
epicsGuard < epicsMutex > & guard,
const char * pBuf, size_t len )
{
// restart circuit if it was shut down
if ( ! _ptcpiiu ) {
tcpiiu * piiu = NULL;
bool newIIU = _cac.findOrCreateVirtCircuit (
guard, _addr, cacChannel::priorityDefault,
piiu, CA_UKN_MINOR_VERSION, this );
if ( newIIU ) {
piiu->start ( guard );
}
_ptcpiiu = piiu;
}
// does this server support TCP-based name resolution?
if ( CA_V412 ( _ptcpiiu->minorProtocolVersion ) ) {
guard.assertIdenticalMutex ( _ptcpiiu->mutex );
assert ( CA_MESSAGE_ALIGN ( len ) == len );
comQueSendMsgMinder minder ( _ptcpiiu->sendQue, guard );
_ptcpiiu->sendQue.pushString ( pBuf, len );
minder.commit ();
_ptcpiiu->flushRequest ( guard );
}
}
void SearchDestTCP :: show (
epicsGuard < epicsMutex > & guard, unsigned level ) const
{
:: printf ( "tcpiiu :: SearchDestTCP\n" );
}
void tcpiiu :: versionRespNotify ( const caHdrLargeArray & msg )
{
this->minorProtocolVersion = msg.m_count;
}
void tcpiiu :: searchRespNotify (
const epicsTime & currentTime, const caHdrLargeArray & msg )
{
/*
* the type field is abused to carry the port number
* so that we can have multiple servers on one host
*/
osiSockAddr serverAddr;
if ( msg.m_cid != INADDR_BROADCAST ) {
serverAddr.ia.sin_family = AF_INET;
serverAddr.ia.sin_addr.s_addr = htonl ( msg.m_cid );
serverAddr.ia.sin_port = htons ( msg.m_dataType );
}
else {
serverAddr = this->address ();
}
cacRef.transferChanToVirtCircuit
( msg.m_available, msg.m_cid, 0xffff,
0, minorProtocolVersion, serverAddr, currentTime );
}
+207 -79
View File
@@ -3,10 +3,10 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
*
* L O S A L A M O S
@@ -73,7 +73,9 @@ udpiiu::udpiiu (
epicsMutex & cbMutexIn,
epicsMutex & cacMutexIn,
cacContextNotify & ctxNotifyIn,
cac & cac ) :
cac & cac,
unsigned port,
tsDLList < SearchDest > & searchDestListIn ) :
recvThread ( *this, ctxNotifyIn, cbMutexIn, "CAC-UDP",
epicsThreadGetStackSize ( epicsThreadStackMedium ),
cac::lowestPriorityLevelAbove (
@@ -95,7 +97,7 @@ udpiiu::udpiiu (
lastReceivedSeqNo ( 0 ),
sock ( 0 ),
repeaterPort ( 0 ),
serverPort ( 0 ),
serverPort ( port ),
localPort ( 0 ),
shutdownCmd ( false ),
lastReceivedSeqNoIsValid ( false )
@@ -150,10 +152,6 @@ udpiiu::udpiiu (
envGetInetPortConfigParam ( &EPICS_CA_REPEATER_PORT,
static_cast <unsigned short> (CA_REPEATER_PORT) );
this->serverPort =
envGetInetPortConfigParam ( &EPICS_CA_SERVER_PORT,
static_cast <unsigned short> (CA_SERVER_PORT) );
this->sock = epicsSocketCreate ( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
if ( this->sock == INVALID_SOCKET ) {
char sockErrBuf[64];
@@ -238,8 +236,19 @@ udpiiu::udpiiu (
* load user and auto configured
* broadcast address list
*/
ellInit ( & this->dest ); // X aCC 392
configureChannelAccessAddressList ( & this->dest, this->sock, this->serverPort );
ELLLIST dest;
ellInit ( & dest );
configureChannelAccessAddressList ( & dest, this->sock, this->serverPort );
while ( osiSockAddrNode *
pNode = reinterpret_cast < osiSockAddrNode * > ( ellGet ( & dest ) ) ) {
SearchDestUDP & searchDest = *
new SearchDestUDP ( pNode->addr, *this );
_searchDestList.add ( searchDest );
free ( pNode );
}
/* add list of tcp name service addresses */
_searchDestList.add ( searchDestListIn );
caStartRepeaterIfNotInstalled ( this->repeaterPort );
@@ -265,14 +274,12 @@ udpiiu::~udpiiu ()
this->shutdown ( cbGuard, guard );
}
// avoid use of ellFree because problems on windows occur if the
// free is in a different DLL than the malloc
ELLNODE * nnode = this->dest.node.next;
while ( nnode )
tsDLIter < SearchDest > iter ( _searchDestList.firstIter () );
while ( iter.valid () )
{
ELLNODE * pnode = nnode;
nnode = nnode->next;
free ( pnode );
SearchDest & curr ( *iter );
iter++;
delete & curr;
}
epicsSocketDestroy ( this->sock );
@@ -344,7 +351,7 @@ void udpRecvThread::run ()
{
epicsThreadPrivateSet ( caClientCallbackThreadId, &this->iiu );
if ( ellCount ( & this->iiu.dest ) == 0 ) { // X aCC 392
if ( this->iiu._searchDestList.count () == 0 ) {
callbackManager mgr ( this->ctxNotify, this->cbMutex );
epicsGuard < epicsMutex > guard ( this->iiu.cacMutex );
genLocalExcep ( mgr.cbGuard, guard,
@@ -614,26 +621,31 @@ bool udpiiu::versionAction (
return true;
}
bool udpiiu::searchRespAction ( // X aCC 361
const caHdr &msg,
const osiSockAddr & addr, const epicsTime & currentTime )
bool udpiiu :: searchRespAction (
const caHdr & msg, const osiSockAddr & addr,
const epicsTime & currentTime )
{
/*
* we dont currently know what to do with channel's
* found to be at non-IP type addresses
*/
if ( addr.sa.sa_family != AF_INET ) {
return false;
return true;
}
/*
* Starting with CA V4.1 the minor version number
* is appended to the end of each search reply.
* is appended to the end of each UDP search reply.
* This value is ignored by earlier clients.
*/
ca_uint32_t minorVersion;
if ( msg.m_postsize >= sizeof (minorVersion) ){
if ( msg.m_postsize >= sizeof ( minorVersion ) ){
/*
* care is taken here not to break gcc 3.2 aggressive alias
* analysis rules
*/
ca_uint8_t * pPayLoad = ( ca_uint8_t *) ( &msg + 1 );
const ca_uint8_t * pPayLoad =
reinterpret_cast < const ca_uint8_t *> ( & msg + 1 );
unsigned byte0 = pPayLoad[0];
unsigned byte1 = pPayLoad[1];
minorVersion = ( byte0 << 8u ) | byte1;
@@ -667,12 +679,12 @@ bool udpiiu::searchRespAction ( // X aCC 361
}
if ( CA_V42 ( minorVersion ) ) {
this->cacRef.transferChanToVirtCircuit
cacRef.transferChanToVirtCircuit
( msg.m_available, msg.m_cid, 0xffff,
0, minorVersion, serverAddr, currentTime );
}
else {
this->cacRef.transferChanToVirtCircuit
cacRef.transferChanToVirtCircuit
( msg.m_available, msg.m_cid, msg.m_dataType,
msg.m_count, minorVersion, serverAddr, currentTime );
}
@@ -895,62 +907,168 @@ bool udpiiu::pushDatagramMsg ( epicsGuard < epicsMutex > & guard,
return true;
}
bool udpiiu::datagramFlush (
epicsGuard < epicsMutex > &, const epicsTime & /* currentTime */ )
udpiiu :: SearchDestUDP :: SearchDestUDP (
const osiSockAddr & destAddr, udpiiu & udpiiuIn ) :
_destAddr ( destAddr ), _udpiiu ( udpiiuIn )
{
}
void udpiiu :: SearchDestUDP :: searchRequest (
epicsGuard < epicsMutex > & guard, const char * pBuf, size_t bufSize )
{
guard.assertIdenticalMutex ( _udpiiu.cacMutex );
assert ( bufSize <= INT_MAX );
int bufSizeAsInt = static_cast < int > ( bufSize );
while ( true ) {
// This const_cast is needed for vxWorks:
int status = sendto ( _udpiiu.sock, const_cast<char *>(pBuf), bufSizeAsInt, 0,
& _destAddr.sa, sizeof ( _destAddr.sa ) );
if ( status == bufSizeAsInt ) {
break;
}
if ( status >= 0 ) {
errlogPrintf ( "CAC: UDP sendto () call returned strange xmit count?\n" );
break;
}
else {
int localErrno = SOCKERRNO;
if ( localErrno == SOCK_EINTR ) {
if ( _udpiiu.shutdownCmd ) {
break;
}
else {
continue;
}
}
else if ( localErrno == SOCK_SHUTDOWN ) {
break;
}
else if ( localErrno == SOCK_ENOTSOCK ) {
break;
}
else if ( localErrno == SOCK_EBADF ) {
break;
}
else {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
char buf[64];
sockAddrToDottedIP ( &_destAddr.sa, buf, sizeof ( buf ) );
errlogPrintf (
"CAC: error = \"%s\" sending UDP msg to %s\n",
sockErrBuf, buf);
break;
}
}
}
}
void udpiiu :: SearchDestUDP :: show (
epicsGuard < epicsMutex > & guard, unsigned level ) const
{
guard.assertIdenticalMutex ( _udpiiu.cacMutex );
char buf[64];
sockAddrToDottedIP ( &_destAddr.sa, buf, sizeof ( buf ) );
:: printf ( "UDP Search destination \"%s\"\n", buf );
}
udpiiu :: SearchRespCallback :: SearchRespCallback ( udpiiu & udpiiuIn ) :
_udpiiu ( udpiiuIn )
{
}
void udpiiu :: SearchRespCallback :: notify (
const caHdr & msg, const void * pPayloadUntyped,
const osiSockAddr & addr, const epicsTime & currentTime )
{
/*
* we dont currently know what to do with channel's
* found to be at non-IP type addresses
*/
if ( addr.sa.sa_family != AF_INET ) {
return;
}
/*
* Starting with CA V4.1 the minor version number
* is appended to the end of each search reply.
* This value is ignored by earlier clients.
*/
ca_uint32_t minorVersion;
if ( msg.m_postsize >= sizeof ( minorVersion ) ){
/*
* care is taken here not to break gcc 3.2 aggressive alias
* analysis rules
*/
const ca_uint8_t * pPayLoad = reinterpret_cast < const ca_uint8_t *> ( pPayloadUntyped );
unsigned byte0 = pPayLoad[0];
unsigned byte1 = pPayLoad[1];
minorVersion = ( byte0 << 8u ) | byte1;
}
else {
minorVersion = CA_UKN_MINOR_VERSION;
}
/*
* the type field is abused to carry the port number
* so that we can have multiple servers on one host
*/
osiSockAddr serverAddr;
serverAddr.ia.sin_family = AF_INET;
if ( CA_V48 ( minorVersion ) ) {
if ( msg.m_cid != INADDR_BROADCAST ) {
serverAddr.ia.sin_addr.s_addr = htonl ( msg.m_cid );
}
else {
serverAddr.ia.sin_addr = addr.ia.sin_addr;
}
serverAddr.ia.sin_port = htons ( msg.m_dataType );
}
else if ( CA_V45 (minorVersion) ) {
serverAddr.ia.sin_port = htons ( msg.m_dataType );
serverAddr.ia.sin_addr = addr.ia.sin_addr;
}
else {
serverAddr.ia.sin_port = htons ( _udpiiu.serverPort );
serverAddr.ia.sin_addr = addr.ia.sin_addr;
}
if ( CA_V42 ( minorVersion ) ) {
_udpiiu.cacRef.transferChanToVirtCircuit
( msg.m_available, msg.m_cid, 0xffff,
0, minorVersion, serverAddr, currentTime );
}
else {
_udpiiu.cacRef.transferChanToVirtCircuit
( msg.m_available, msg.m_cid, msg.m_dataType,
msg.m_count, minorVersion, serverAddr, currentTime );
}
}
void udpiiu :: SearchRespCallback :: show (
epicsGuard < epicsMutex > & guard, unsigned level ) const
{
guard.assertIdenticalMutex ( _udpiiu.cacMutex );
::printf ( "udpiiu :: SearchRespCallback\n" );
}
bool udpiiu :: datagramFlush (
epicsGuard < epicsMutex > & guard, const epicsTime & currentTime )
{
guard.assertIdenticalMutex ( cacMutex );
// dont send the version header by itself
if ( this->nBytesInXmitBuf <= sizeof ( caHdr ) ) {
return false;
}
osiSockAddrNode *pNode = ( osiSockAddrNode * ) // X aCC 749
ellFirst ( & this->dest );
while ( pNode ) {
int status;
assert ( this->nBytesInXmitBuf <= INT_MAX );
status = sendto ( this->sock, this->xmitBuf,
(int) this->nBytesInXmitBuf, 0,
&pNode->addr.sa, sizeof ( pNode->addr.sa ) );
if ( status != (int) this->nBytesInXmitBuf ) {
if ( status >= 0 ) {
errlogPrintf ( "CAC: UDP sendto () call returned strange xmit count?\n" );
break;
}
else {
int localErrno = SOCKERRNO;
if ( localErrno == SOCK_EINTR ) {
if ( this->shutdownCmd ) {
break;
}
else {
continue;
}
}
else if ( localErrno == SOCK_SHUTDOWN ) {
break;
}
else if ( localErrno == SOCK_ENOTSOCK ) {
break;
}
else if ( localErrno == SOCK_EBADF ) {
break;
}
else {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
char buf[64];
sockAddrToDottedIP ( &pNode->addr.sa, buf, sizeof ( buf ) );
errlogPrintf (
"CAC: error = \"%s\" sending UDP msg to %s\n",
sockErrBuf, buf);
break;
}
}
}
pNode = (osiSockAddrNode *) ellNext ( &pNode->node ); // X aCC 749
tsDLIter < SearchDest > iter ( _searchDestList.firstIter () );
while ( iter.valid () )
{
iter->searchRequest ( guard, this->xmitBuf, this->nBytesInXmitBuf );
iter++;
}
this->nBytesInXmitBuf = 0u;
@@ -960,7 +1078,7 @@ bool udpiiu::datagramFlush (
return true;
}
void udpiiu::show ( unsigned level ) const
void udpiiu :: show ( unsigned level ) const
{
epicsGuard < epicsMutex > guard ( this->cacMutex );
@@ -968,7 +1086,17 @@ void udpiiu::show ( unsigned level ) const
if ( level > 1u ) {
::printf ("\trepeater port %u\n", this->repeaterPort );
::printf ("\tdefault server port %u\n", this->serverPort );
printChannelAccessAddressList ( & this->dest );
::printf ( "Search Destination List with %u items\n",
_searchDestList.count () );
if ( level > 2u ) {
tsDLIterConst < SearchDest > iter (
_searchDestList.firstIter () );
while ( iter.valid () )
{
iter->show ( guard, level - 2 );
iter++;
}
}
}
if ( level > 2u ) {
::printf ("\tsocket identifier %d\n", this->sock );
+35 -5
View File
@@ -3,8 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -47,6 +46,7 @@
#include "searchTimer.h"
#include "disconnectGovernorTimer.h"
#include "repeaterSubscribeTimer.h"
#include "SearchDest.h"
extern "C" void cacRecvThreadUDP ( void *pParam );
@@ -97,7 +97,9 @@ public:
epicsMutex & callbackControl,
epicsMutex & mutualExclusion,
cacContextNotify &,
class cac & );
class cac &,
unsigned port,
tsDLList < SearchDest > & );
virtual ~udpiiu ();
void installNewChannel (
epicsGuard < epicsMutex > &, nciu &, netiiu * & );
@@ -108,17 +110,41 @@ public:
void shutdown ( epicsGuard < epicsMutex > & cbGuard,
epicsGuard < epicsMutex > & guard );
void show ( unsigned level ) const;
// exceptions
class noSocket {};
private:
class SearchDestUDP :
public SearchDest {
public:
SearchDestUDP ( const osiSockAddr &, udpiiu & );
void searchRequest (
epicsGuard < epicsMutex > &, const char * pBuf, size_t bufLen );
void show (
epicsGuard < epicsMutex > &, unsigned level ) const;
private:
osiSockAddr _destAddr;
udpiiu & _udpiiu;
};
class SearchRespCallback :
public SearchDest :: Callback {
public:
SearchRespCallback ( udpiiu & );
void notify (
const caHdr &, const void * pPayload,
const osiSockAddr &, const epicsTime & );
void show (
epicsGuard < epicsMutex > &, unsigned level ) const;
private:
udpiiu & _udpiiu;
};
char xmitBuf [MAX_UDP_SEND];
char recvBuf [MAX_UDP_RECV];
udpRecvThread recvThread;
repeaterSubscribeTimer repeaterSubscribeTmr;
disconnectGovernorTimer govTmr;
ELLLIST dest;
tsDLList < SearchDest > _searchDestList;
double maxPeriod;
double rtteMean;
double rtteMeanDev;
@@ -264,6 +290,10 @@ private:
udpiiu & operator = ( const udpiiu & );
friend class udpRecvThread;
// These are needed for the vxWorks 5.5 compiler:
friend class udpiiu::SearchDestUDP;
friend class udpiiu::SearchRespCallback;
};
#endif // udpiiuh
+42 -8
View File
@@ -3,10 +3,10 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
*
*
@@ -37,6 +37,7 @@
#include "tcpRecvWatchdog.h"
#include "tcpSendWatchdog.h"
#include "hostNameCache.h"
#include "SearchDest.h"
#include "compilerDependencies.h"
class callbackManager;
@@ -93,15 +94,34 @@ private:
void run ();
};
class tcpiiu :
class SearchDestTCP : public SearchDest {
public:
SearchDestTCP ( cac &, const osiSockAddr & );
void searchRequest ( epicsGuard < epicsMutex > & guard,
const char * pbuf, size_t len );
void show ( epicsGuard < epicsMutex > & guard, unsigned level ) const;
void setCircuit ( tcpiiu * );
void disable ();
void enable ();
private:
tcpiiu * _ptcpiiu;
cac & _cac;
const osiSockAddr _addr;
bool _active;
};
class tcpiiu :
public netiiu, public tsDLNode < tcpiiu >,
public tsSLNode < tcpiiu >, public caServerID,
private wireSendAdapter, private wireRecvAdapter {
friend void SearchDestTCP::searchRequest ( epicsGuard < epicsMutex > & guard,
const char * pbuf, size_t len );
public:
tcpiiu ( cac & cac, epicsMutex & mutualExclusion, epicsMutex & callbackControl,
cacContextNotify &, double connectionTimeout, epicsTimerQueue & timerQueue,
const osiSockAddr & addrIn, comBufMemoryManager &, unsigned minorVersion,
ipAddrToAsciiEngine & engineIn, const cacChannel::priLev & priorityIn );
ipAddrToAsciiEngine & engineIn, const cacChannel::priLev & priorityIn,
SearchDestTCP * pSearchDestIn = NULL);
~tcpiiu ();
void start (
epicsGuard < epicsMutex > & );
@@ -175,12 +195,15 @@ public:
epicsGuard < epicsMutex > & guard, nciu & chan );
bool connectNotify (
epicsGuard < epicsMutex > &, nciu & chan );
void nameResolutionMsgEndNotify ();
void searchRespNotify (
const epicsTime &, const caHdrLargeArray & );
void versionRespNotify ( const caHdrLargeArray & );
void * operator new ( size_t size,
tsFreeList < class tcpiiu, 32, epicsMutexNOOP > & );
epicsPlacementDeleteOperator (( void *,
tsFreeList < class tcpiiu, 32, epicsMutexNOOP > & ))
epicsPlacementDeleteOperator (( void *,
tsFreeList < class tcpiiu, 32, epicsMutexNOOP > & ));
private:
hostNameCache hostNameCacheInstance;
@@ -205,6 +228,7 @@ private:
comBufMemoryManager & comBufMemMgr;
cac & cacRef;
char * pCurData;
SearchDestTCP * pSearchDest;
epicsMutex & mutex;
epicsMutex & cbMutex;
unsigned minorProtocolVersion;
@@ -257,6 +281,7 @@ private:
bool bytesArePendingInOS () const;
void decrementBlockingForFlushCount (
epicsGuard < epicsMutex > & guard );
bool isNameService () const;
// send protocol stubs
void echoRequest (
@@ -386,5 +411,14 @@ inline void tcpiiu::probeResponseNotify (
this->recvDog.probeResponseNotify ( cbGuard );
}
#endif // ifdef virtualCircuith
inline bool tcpiiu::isNameService () const
{
return ( this->pSearchDest != NULL );
}
inline void SearchDestTCP::setCircuit ( tcpiiu * piiu )
{
_ptcpiiu = piiu;
}
#endif // ifdef virtualCircuith
+1 -1
View File
@@ -32,7 +32,7 @@
# include "shareLib.h"
#endif
static const unsigned char CA_MINOR_PROTOCOL_REVISION = 11;
static const unsigned char CA_MINOR_PROTOCOL_REVISION = 12;
typedef ca_uint32_t caResId;
+250 -36
View File
@@ -3,27 +3,24 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
// *must* be defined before including net_convert.h
typedef unsigned long arrayElementCount;
#include "osiWireFormat.h"
#include "net_convert.h" // byte order conversion from libca
#include "dbMapper.h" // ait to dbr types
#include "net_convert.h" // byte order conversion from libca
#include "dbMapper.h" // ait to dbr types
#include "gddAppTable.h" // EPICS application type table
#include "gddApps.h" // gdd predefined application type codes
#include "gddApps.h" // gdd predefined application type codes
#include "errlog.h"
#include "osiPoolStatus.h" // is there sufficent space in pool
#define epicsExportSharedSymbols
#include "casStrmClient.h"
@@ -47,7 +44,7 @@ casStrmClient::pCASMsgHandler const casStrmClient::msgHandlers[] =
& casStrmClient::readAction,
& casStrmClient::writeAction,
& casStrmClient::uknownMessageAction,
& casStrmClient::uknownMessageAction,
& casStrmClient::searchAction,
& casStrmClient::uknownMessageAction,
& casStrmClient::eventsOffAction,
& casStrmClient::eventsOnAction,
@@ -74,10 +71,13 @@ casStrmClient::pCASMsgHandler const casStrmClient::msgHandlers[] =
//
// casStrmClient::casStrmClient()
//
casStrmClient::casStrmClient ( caServerI & cas, clientBufMemoryManager & mgrIn ) :
casStrmClient::casStrmClient (
caServerI & cas, clientBufMemoryManager & mgrIn,
const caNetAddr & clientAddr ) :
casCoreClient ( cas ),
in ( *this, mgrIn, 1 ),
out ( *this, mgrIn ),
_clientAddr ( clientAddr ),
pUserName ( 0 ),
pHostName ( 0 ),
incommingBytesToDrain ( 0 ),
@@ -141,7 +141,7 @@ caStatus casStrmClient :: processMsg ()
this->incommingBytesToDrain = 0u;
}
}
//
// process any messages in the in buffer
//
@@ -618,30 +618,30 @@ caStatus casStrmClient::readNotifyAction ( epicsGuard < casClientMutex > & guard
}
{
caStatus servStat = this->read ();
if ( servStat == S_casApp_success ) {
caStatus servStat = this->read ();
if ( servStat == S_casApp_success ) {
assert ( pValueRead.valid () );
caStatus status = this->readNotifyResponse (
guard, pChan, *mp,
*pValueRead, servStat );
this->responseIsPending = ( status != S_cas_success );
return status;
}
else if ( servStat == S_casApp_asyncCompletion ) {
return S_cas_success;
}
else if ( servStat == S_casApp_postponeAsyncIO ) {
}
else if ( servStat == S_casApp_asyncCompletion ) {
return S_cas_success;
}
else if ( servStat == S_casApp_postponeAsyncIO ) {
return S_casApp_postponeAsyncIO;
}
else {
caStatus status = this->readNotifyFailureResponse (
guard, *mp, ECA_GETFAIL );
}
else {
caStatus status = this->readNotifyFailureResponse (
guard, *mp, ECA_GETFAIL );
if ( status != S_cas_success ) {
this->pendingResponseStatus = servStat;
this->responseIsPending = true;
}
this->responseIsPending = true;
}
return status;
}
}
}
}
@@ -1025,23 +1025,23 @@ caStatus casStrmClient::writeAction ( epicsGuard < casClientMutex > & guard )
//
{
caStatus servStat = this->write ( & casChannelI :: write );
if ( servStat == S_casApp_success ||
if ( servStat == S_casApp_success ||
servStat == S_casApp_asyncCompletion ) {
return S_cas_success;
}
else if ( servStat == S_casApp_postponeAsyncIO ) {
}
else if ( servStat == S_casApp_postponeAsyncIO ) {
return S_casApp_postponeAsyncIO;
}
else {
}
else {
caStatus status =
this->writeActionSendFailureStatus ( guard, *mp,
pChan->getCID(), servStat );
if ( status != S_cas_success ) {
this->pendingResponseStatus = servStat;
this->responseIsPending = true;
}
}
return status;
}
}
}
//
@@ -1194,6 +1194,221 @@ caStatus casStrmClient::writeNotifyResponseECA_XXX (
return status;
}
//
// casStrmClient :: asyncSearchResp()
//
caStatus casStrmClient :: asyncSearchResponse (
epicsGuard < casClientMutex > & guard, const caNetAddr & /* outAddr */,
const caHdrLargeArray & msg, const pvExistReturn & retVal,
ca_uint16_t /* protocolRevision */, ca_uint32_t /* sequenceNumber */ )
{
return this->searchResponse ( guard, msg, retVal );
}
// casStrmClient :: hostName()
void casStrmClient :: hostName ( char * pInBuf, unsigned bufSizeIn ) const
{
_clientAddr.stringConvert ( pInBuf, bufSizeIn );
}
//
// caStatus casStrmClient :: searchResponse()
//
caStatus casStrmClient :: searchResponse (
epicsGuard < casClientMutex > & guard,
const caHdrLargeArray & msg,
const pvExistReturn & retVal )
{
if ( retVal.getStatus() != pverExistsHere ) {
return S_cas_success;
}
//
// starting with V4.1 the count field is used (abused)
// by the client to store the minor version number of
// the client.
//
// Old versions expect alloc of channel in response
// to a search request. This is no longer supported.
//
if ( !CA_V44( msg.m_count ) ) {
errlogPrintf (
"client \"%s\" using EPICS R3.11 CA "
"connect protocol was ignored\n",
pHostName );
//
// old connect protocol was dropped when the
// new API was added to the server (they must
// now use clients at EPICS 3.12 or higher)
//
caStatus status = this->sendErr (
guard, & msg, invalidResID, ECA_DEFUNCT,
"R3.11 connect sequence from old client was ignored" );
return status;
}
//
// cid field is abused to carry the IP
// address in CA_V48 or higher
// (this allows a CA servers to serve
// as a directory service)
//
// data type field is abused to carry the IP
// port number here CA_V44 or higher
// (this allows multiple CA servers on one
// host)
//
ca_uint32_t serverAddr;
ca_uint16_t serverPort;
if ( CA_V48( msg.m_count ) ) {
struct sockaddr_in ina;
if ( retVal.addrIsValid() ) {
caNetAddr addr = retVal.getAddr();
ina = addr.getSockIP();
//
// If they dont specify a port number then the default
// CA server port is assumed when it it is a server
// address redirect (it is never correct to use this
// server's port when it is a redirect).
//
if ( ina.sin_port == 0u ) {
ina.sin_port = htons ( CA_SERVER_PORT );
}
}
else {
//
// We dont fill in the servers address here because
// the client has a tcp circuit to us and he knows
// our address
//
ina.sin_addr.s_addr = htonl ( ~0U );
ina.sin_port = htons ( 0 );
}
serverAddr = ntohl ( ina.sin_addr.s_addr );
serverPort = ntohs ( ina.sin_port );
}
else {
serverAddr = ntohl ( ~0U );
serverPort = ntohs ( 0 );
}
caStatus status = this->out.copyInHeader ( CA_PROTO_SEARCH,
0, serverPort, 0, serverAddr, msg.m_available, 0 );
//
// Starting with CA V4.1 the minor version number
// is appended to the end of each search reply.
// This value is ignored by earlier clients.
//
if ( status == S_cas_success ) {
this->out.commitMsg ();
}
return status;
}
//
// casStrmClient :: searchAction()
//
caStatus casStrmClient :: searchAction ( epicsGuard < casClientMutex > & guard )
{
const caHdrLargeArray *mp = this->ctx.getMsg();
const char *pChanName = static_cast <char * > ( this->ctx.getData() );
caStatus status;
//
// check the sanity of the message
//
if ( mp->m_postsize <= 1 ) {
caServerI::dumpMsg ( this->pHostName, "?", mp, this->ctx.getData(),
"empty PV name extension in TCP search request?\n" );
return S_cas_success;
}
if ( pChanName[0] == '\0' ) {
caServerI::dumpMsg ( this->pHostName, "?", mp, this->ctx.getData(),
"zero length PV name in UDP search request?\n" );
return S_cas_success;
}
// check for an unterminated string before calling server tool
// by searching backwards through the string (some early versions
// of the client library might not be setting the pad bytes to nill)
for ( unsigned i = mp->m_postsize-1; pChanName[i] != '\0'; i-- ) {
if ( i <= 1 ) {
caServerI::dumpMsg ( pHostName, "?", mp, this->ctx.getData(),
"unterminated PV name in UDP search request?\n" );
return S_cas_success;
}
}
if ( this->getCAS().getDebugLevel() > 6u ) {
this->hostName ( this->pHostName, sizeof ( pHostName ) );
printf ( "\"%s\" is searching for \"%s\"\n",
pHostName, pChanName );
}
//
// verify that we have sufficent memory for a PV and a
// monitor prior to calling PV exist test so that when
// the server runs out of memory we dont reply to
// search requests, and therefore dont thrash through
// caServer::pvExistTest() and casCreatePV::pvAttach()
//
if ( ! osiSufficentSpaceInPool ( 0 ) ) {
return S_cas_success;
}
//
// ask the server tool if this PV exists
//
this->userStartedAsyncIO = false;
pvExistReturn pver =
this->getCAS()->pvExistTest (
this->ctx, _clientAddr, pChanName );
//
// prevent problems when they initiate
// async IO but dont return status
// indicating so (and vise versa)
//
if ( this->userStartedAsyncIO ) {
if ( pver.getStatus() != pverAsyncCompletion ) {
errMessage ( S_cas_badParameter,
"- assuming asynch IO status from caServer::pvExistTest()");
}
status = S_cas_success;
}
else {
//
// otherwise we assume sync IO operation was initiated
//
switch ( pver.getStatus() ) {
case pverExistsHere:
status = this->searchResponse ( guard, *mp, pver );
break;
case pverDoesNotExistHere:
status = S_cas_success;
break;
case pverAsyncCompletion:
errMessage ( S_cas_badParameter,
"- unexpected asynch IO status from "
"caServer::pvExistTest() ignored");
status = S_cas_success;
break;
default:
errMessage ( S_cas_badParameter,
"- invalid return from "
"caServer::pvExistTest() ignored");
status = S_cas_success;
break;
}
}
return status;
}
/*
* casStrmClient::hostNameAction()
*/
@@ -2610,9 +2825,8 @@ inBufClient::fillCondition casStrmClient::inBufFill ()
epicsGuard < epicsMutex > guard ( this->mutex );
return this->in.fill ();
}
bufSizeT casStrmClient ::
inBufBytesPending () const
bufSizeT casStrmClient :: inBufBytesPending () const
{
epicsGuard < epicsMutex > guard ( this->mutex );
return this->in.bytesPresent ();
+69 -64
View File
@@ -1,11 +1,9 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -37,43 +35,44 @@ class casStrmClient :
public casCoreClient, public outBufClient,
public inBufClient, public tsDLNode < casStrmClient > {
public:
casStrmClient ( caServerI &, clientBufMemoryManager & );
virtual ~casStrmClient();
void show ( unsigned level ) const;
casStrmClient ( caServerI &, clientBufMemoryManager &, const caNetAddr & clientAddr );
virtual ~casStrmClient();
void show ( unsigned level ) const;
outBufClient::flushCondition flush ();
unsigned getDebugLevel () const;
virtual void hostName ( char * pBuf, unsigned bufSize ) const = 0;
void userName ( char * pBuf, unsigned bufSize ) const;
ca_uint16_t protocolRevision () const;
unsigned getDebugLevel () const;
void hostName ( char * pBuf, unsigned bufSize ) const;
void userName ( char * pBuf, unsigned bufSize ) const;
ca_uint16_t protocolRevision () const;
void sendVersion ();
protected:
caStatus processMsg ();
caStatus processMsg ();
bool inBufFull () const;
inBufClient::fillCondition inBufFill ();
bufSizeT inBufBytesPending () const;
bufSizeT outBufBytesPending () const;
private:
char hostNameStr [32];
//char hostNameStr [32];
inBuf in;
outBuf out;
chronIntIdResTable < casChannelI > chanTable;
tsDLList < casChannelI > chanList;
epicsTime lastSendTS;
epicsTime lastRecvTS;
char * pUserName;
char * pHostName;
chronIntIdResTable < casChannelI > chanTable;
tsDLList < casChannelI > chanList;
epicsTime lastSendTS;
epicsTime lastRecvTS;
caNetAddr _clientAddr;
char * pUserName;
char * pHostName;
smartGDDPointer pValueRead;
unsigned incommingBytesToDrain;
caStatus pendingResponseStatus;
ca_uint16_t minor_version_number;
ca_uint16_t minor_version_number;
bool reqPayloadNeedsByteSwap;
bool responseIsPending;
caStatus createChannel ( const char * pName );
caStatus verifyRequest ( casChannelI * & pChan );
caStatus createChannel ( const char * pName );
caStatus verifyRequest ( casChannelI * & pChan );
typedef caStatus ( casStrmClient :: * pCASMsgHandler )
( epicsGuard < casClientMutex > & );
static pCASMsgHandler const msgHandlers[CA_PROTO_LAST_CMMD+1u];
( epicsGuard < casClientMutex > & );
static pCASMsgHandler const msgHandlers[CA_PROTO_LAST_CMMD+1u];
//
// one function for each CA request type
@@ -82,19 +81,20 @@ private:
caStatus ignoreMsgAction ( epicsGuard < casClientMutex > & );
caStatus versionAction ( epicsGuard < casClientMutex > & );
caStatus echoAction ( epicsGuard < casClientMutex > & );
caStatus eventAddAction ( epicsGuard < casClientMutex > & );
caStatus eventCancelAction ( epicsGuard < casClientMutex > & );
caStatus readAction ( epicsGuard < casClientMutex > & );
caStatus readNotifyAction ( epicsGuard < casClientMutex > & );
caStatus writeAction ( epicsGuard < casClientMutex > & );
caStatus eventsOffAction ( epicsGuard < casClientMutex > & );
caStatus eventsOnAction ( epicsGuard < casClientMutex > & );
caStatus readSyncAction ( epicsGuard < casClientMutex > & );
caStatus clearChannelAction ( epicsGuard < casClientMutex > & );
caStatus claimChannelAction ( epicsGuard < casClientMutex > & );
caStatus writeNotifyAction ( epicsGuard < casClientMutex > & );
caStatus clientNameAction ( epicsGuard < casClientMutex > & );
caStatus hostNameAction ( epicsGuard < casClientMutex > & );
caStatus eventAddAction ( epicsGuard < casClientMutex > & );
caStatus eventCancelAction ( epicsGuard < casClientMutex > & );
caStatus readAction ( epicsGuard < casClientMutex > & );
caStatus readNotifyAction ( epicsGuard < casClientMutex > & );
caStatus writeAction ( epicsGuard < casClientMutex > & );
caStatus eventsOffAction ( epicsGuard < casClientMutex > & );
caStatus eventsOnAction ( epicsGuard < casClientMutex > & );
caStatus readSyncAction ( epicsGuard < casClientMutex > & );
caStatus clearChannelAction ( epicsGuard < casClientMutex > & );
caStatus claimChannelAction ( epicsGuard < casClientMutex > & );
caStatus writeNotifyAction ( epicsGuard < casClientMutex > & );
caStatus clientNameAction ( epicsGuard < casClientMutex > & );
caStatus hostNameAction ( epicsGuard < casClientMutex > & );
caStatus searchAction ( epicsGuard < casClientMutex > & );
caStatus sendErr ( epicsGuard < casClientMutex > &,
const caHdrLargeArray *curp, ca_uint32_t cid,
const int reportedStatus, const char * pformat, ... );
@@ -114,34 +114,40 @@ private:
// one function for each CA request type that has
// asynchronous completion
//
caStatus createChanResponse ( epicsGuard < casClientMutex > &,
casCtx &, const pvAttachReturn & );
caStatus readResponse ( epicsGuard < casClientMutex > &,
casChannelI * pChan, const caHdrLargeArray & msg,
const gdd & desc, const caStatus status );
caStatus readNotifyResponse ( epicsGuard < casClientMutex > &,
caStatus createChanResponse ( epicsGuard < casClientMutex > &,
casCtx &, const pvAttachReturn & );
caStatus readResponse ( epicsGuard < casClientMutex > &,
casChannelI * pChan, const caHdrLargeArray & msg,
const gdd & desc, const caStatus status );
caStatus readNotifyResponse ( epicsGuard < casClientMutex > &,
casChannelI *pChan, const caHdrLargeArray & msg,
const gdd & desc, const caStatus status );
caStatus writeResponse ( epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray & msg, const caStatus status );
caStatus writeNotifyResponse ( epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const caStatus status );
caStatus monitorResponse ( epicsGuard < casClientMutex > &,
const gdd & desc, const caStatus status );
caStatus writeResponse ( epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray & msg, const caStatus status );
caStatus writeNotifyResponse ( epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const caStatus status );
caStatus monitorResponse ( epicsGuard < casClientMutex > &,
casChannelI & chan, const caHdrLargeArray & msg,
const gdd &, const caStatus status );
const gdd & desc, const caStatus status );
caStatus enumPostponedCreateChanResponse ( epicsGuard < casClientMutex > &,
casChannelI & chan, const caHdrLargeArray & hdr );
caStatus privateCreateChanResponse ( epicsGuard < casClientMutex > &,
casChannelI & chan, const caHdrLargeArray & hdr, unsigned dbrType );
caStatus channelCreateFailedResp ( epicsGuard < casClientMutex > &,
caStatus channelCreateFailedResp ( epicsGuard < casClientMutex > &,
const caHdrLargeArray &, const caStatus createStatus );
caStatus channelDestroyEventNotify (
epicsGuard < casClientMutex > & guard,
casChannelI * const pChan, ca_uint32_t sid );
caStatus accessRightsResponse (
caStatus accessRightsResponse (
casChannelI * pciu );
caStatus accessRightsResponse (
caStatus accessRightsResponse (
epicsGuard < casClientMutex > &, casChannelI * pciu );
caStatus searchResponse (
epicsGuard < casClientMutex > &, const caHdrLargeArray &, const pvExistReturn & );
caStatus asyncSearchResponse (
epicsGuard < casClientMutex > &, const caNetAddr & outAddr,
const caHdrLargeArray & msg, const pvExistReturn & retVal,
ca_uint16_t protocolRevision, ca_uint32_t sequenceNumber );
typedef caStatus ( casChannelI :: * PWriteMethod ) (
@@ -152,29 +158,29 @@ private:
caStatus writeScalarData( PWriteMethod );
outBufClient::flushCondition xSend ( char * pBuf, bufSizeT nBytesToSend,
bufSizeT & nBytesSent );
bufSizeT & nBytesSent );
inBufClient::fillCondition xRecv ( char * pBuf, bufSizeT nBytesToRecv,
inBufClient::fillParameter parm, bufSizeT & nByesRecv );
inBufClient::fillParameter parm, bufSizeT & nByesRecv );
virtual xBlockingStatus blockingState () const = 0;
virtual xBlockingStatus blockingState () const = 0;
virtual outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual ) = 0;
virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
bufSizeT &nBytesActual ) = 0;
virtual outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual ) = 0;
virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
bufSizeT &nBytesActual ) = 0;
virtual void forceDisconnect () = 0;
caStatus casMonitorCallBack (
caStatus casMonitorCallBack (
epicsGuard < casClientMutex > &, casMonitor &, const gdd & );
caStatus logBadIdWithFileAndLineno (
epicsGuard < casClientMutex > & guard, const caHdrLargeArray * mp,
const void * dp, const int cacStatus, const char * pFileName,
const void * dp, const int cacStatus, const char * pFileName,
const unsigned lineno, const unsigned idIn );
void casChannelDestroyFromInterfaceNotify ( casChannelI & chan,
bool immediatedSestroyNeeded );
static void issuePosponeWhenNonePendingWarning ( const char * pReqTypeStr );
casStrmClient ( const casStrmClient & );
casStrmClient & operator = ( const casStrmClient & );
casStrmClient ( const casStrmClient & );
casStrmClient & operator = ( const casStrmClient & );
};
#define logBadId(GUARD, MP, DP, CACSTAT, RESID) \
@@ -188,4 +194,3 @@ inline ca_uint16_t casStrmClient::protocolRevision () const
}
#endif // casStrmClienth
+26 -28
View File
@@ -3,14 +3,12 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
//
// $Id$
//
// Author Jeff Hill
// Author: Jeff Hill
//
#include <stdio.h>
@@ -35,11 +33,11 @@ const unsigned caServerConnectPendQueueSize = 5u;
// casIntfIO::casIntfIO()
//
casIntfIO::casIntfIO ( const caNetAddr & addrIn ) :
sock ( INVALID_SOCKET ),
sock ( INVALID_SOCKET ),
addr ( addrIn.getSockIP() )
{
int status;
osiSocklen_t addrSize;
int status;
osiSocklen_t addrSize;
bool portChange;
if ( ! osiSockAttach () ) {
@@ -152,9 +150,9 @@ casStreamOS *casIntfIO::newStreamClient ( caServerI & cas,
{
static bool oneMsgFlag = false;
struct sockaddr newAddr;
osiSocklen_t length = ( osiSocklen_t ) sizeof ( newAddr );
SOCKET newSock = epicsSocketAccept ( this->sock, & newAddr, & length );
struct sockaddr newClientAddr;
osiSocklen_t length = ( osiSocklen_t ) sizeof ( newClientAddr );
SOCKET newSock = epicsSocketAccept ( this->sock, & newClientAddr, & length );
if ( newSock == INVALID_SOCKET ) {
int errnoCpy = SOCKERRNO;
if ( errnoCpy != SOCK_EWOULDBLOCK && ! oneMsgFlag ) {
@@ -166,14 +164,14 @@ casStreamOS *casIntfIO::newStreamClient ( caServerI & cas,
}
return NULL;
}
else if ( sizeof ( newAddr ) > (size_t) length ) {
else if ( sizeof ( newClientAddr ) > (size_t) length ) {
epicsSocketDestroy ( newSock );
errlogPrintf ( "CAS: accept returned bad address len?\n" );
return NULL;
}
oneMsgFlag = false;
ioArgsToNewStreamIO args;
args.addr = newAddr;
args.clientAddr = newClientAddr;
args.sock = newSock;
casStreamOS * pOS = new casStreamOS ( cas, bufMgr, args );
if ( ! pOS ) {
@@ -197,25 +195,25 @@ casStreamOS *casIntfIO::newStreamClient ( caServerI & cas,
//
void casIntfIO::setNonBlocking()
{
int status;
osiSockIoctl_t yes = true;
int status;
osiSockIoctl_t yes = true;
status = socket_ioctl(this->sock, FIONBIO, &yes); // X aCC 392
if ( status < 0 ) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf (
"%s:CAS: server non blocking IO set fail because \"%s\"\n",
__FILE__, sockErrBuf );
}
status = socket_ioctl(this->sock, FIONBIO, &yes); // X aCC 392
if ( status < 0 ) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
errlogPrintf (
"%s:CAS: server non blocking IO set fail because \"%s\"\n",
__FILE__, sockErrBuf );
}
}
//
// casIntfIO::getFD()
//
int casIntfIO::getFD() const
{
return this->sock;
return this->sock;
}
//
@@ -223,9 +221,9 @@ int casIntfIO::getFD() const
//
void casIntfIO::show(unsigned level) const
{
if (level>2u) {
printf(" casIntfIO::sock = %d\n", this->sock);
}
if (level>2u) {
printf(" casIntfIO::sock = %d\n", this->sock);
}
}
//
+14 -19
View File
@@ -6,8 +6,9 @@
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
//
// $Id$
// Author: Jeff Hill
//
#include "errlog.h"
@@ -17,11 +18,13 @@
// casStreamIO::casStreamIO()
casStreamIO::casStreamIO ( caServerI & cas, clientBufMemoryManager & bufMgr,
const ioArgsToNewStreamIO & args ) :
casStrmClient ( cas, bufMgr ), sock ( args.sock ), addr ( args.addr),
_osSendBufferSize ( MAX_TCP ), blockingFlag ( xIsBlocking ),
sockHasBeenShutdown ( false )
{
const ioArgsToNewStreamIO & args ) :
casStrmClient ( cas, bufMgr, args.clientAddr ),
sock ( args.sock ),
_osSendBufferSize ( MAX_TCP ),
blockingFlag ( xIsBlocking ),
sockHasBeenShutdown ( false )
{
assert ( sock >= 0 );
int yes = true;
int status;
@@ -152,7 +155,7 @@ outBufClient::flushCondition casStreamIO::osdSend ( const char *pInBuf, bufSizeT
char sockErrBuf[64];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
char buf[64];
ipAddrToA (&this->addr, buf, sizeof(buf));
this->hostName ( buf, sizeof ( buf ) );
errlogPrintf (
"CAS: TCP socket send to \"%s\" failed because \"%s\"\n",
buf, sockErrBuf );
@@ -197,7 +200,7 @@ casStreamIO::osdRecv ( char * pInBuf, bufSizeT nBytes, // X aCC 361
myerrno != SOCK_ETIMEDOUT ) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
ipAddrToA (&this->addr, buf, sizeof(buf));
this->hostName ( buf, sizeof ( buf ) );
errlogPrintf(
"CAS: client %s disconnected because \"%s\"\n",
buf, sockErrBuf );
@@ -236,10 +239,8 @@ void casStreamIO::osdShow (unsigned level) const
static_cast <const void *> ( this ) );
if (level>1u) {
char buf[64];
ipAddrToA(&this->addr, buf, sizeof(buf));
printf (
"client=%s, port=%x\n",
buf, ntohs(this->addr.sin_port));
this->hostName ( buf, sizeof ( buf ) );
printf ( "client = \"%s\"\n", buf );
}
}
@@ -286,7 +287,7 @@ bufSizeT casStreamIO :: inCircuitBytesPending () const
char sockErrBuf[64];
epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
char buf[64];
ipAddrToA ( &this->addr, buf, sizeof(buf) );
this->hostName ( buf, sizeof ( buf ) );
errlogPrintf ("CAS: FIONREAD for %s failed because \"%s\"\n",
buf, sockErrBuf );
}
@@ -300,12 +301,6 @@ bufSizeT casStreamIO :: inCircuitBytesPending () const
}
}
// casStreamIO::hostName()
void casStreamIO::hostName ( char * pInBuf, unsigned bufSizeIn ) const
{
ipAddrToA ( & this->addr, pInBuf, bufSizeIn );
}
// casStreamIO :: osSendBufferSize ()
bufSizeT casStreamIO :: osSendBufferSize () const
{
+18 -27
View File
@@ -1,13 +1,12 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
//
// $Id$
//
@@ -18,42 +17,34 @@
#include "casStrmClient.h"
struct ioArgsToNewStreamIO {
caNetAddr addr;
SOCKET sock;
caNetAddr clientAddr;
SOCKET sock;
};
class casStreamIO : public casStrmClient {
public:
casStreamIO ( caServerI &, clientBufMemoryManager &,
casStreamIO ( caServerI &, clientBufMemoryManager &,
const ioArgsToNewStreamIO & );
~casStreamIO ();
int getFD () const;
void xSetNonBlocking ();
const caNetAddr getAddr() const;
void hostName ( char *pBuf, unsigned bufSize ) const;
~casStreamIO ();
int getFD () const;
void xSetNonBlocking ();
bufSizeT inCircuitBytesPending () const;
bufSizeT osSendBufferSize () const;
private:
SOCKET sock;
struct sockaddr_in addr;
SOCKET sock;
bufSizeT _osSendBufferSize;
xBlockingStatus blockingFlag;
xBlockingStatus blockingFlag;
bool sockHasBeenShutdown;
xBlockingStatus blockingState() const;
void osdShow ( unsigned level ) const;
outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual );
inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual );
xBlockingStatus blockingState() const;
void osdShow ( unsigned level ) const;
outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual );
inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual );
void forceDisconnect ();
casStreamIO ( const casStreamIO & );
casStreamIO & operator = ( const casStreamIO & );
casStreamIO ( const casStreamIO & );
casStreamIO & operator = ( const casStreamIO & );
};
inline const caNetAddr casStreamIO::getAddr() const
{
return caNetAddr ( this->addr );
}
#endif // casStreamIOh
+1
View File
@@ -49,6 +49,7 @@ epicsShareExtern const ENV_PARAM EPICS_CA_REPEATER_PORT;
epicsShareExtern const ENV_PARAM EPICS_CA_SERVER_PORT;
epicsShareExtern const ENV_PARAM EPICS_CA_MAX_ARRAY_BYTES;
epicsShareExtern const ENV_PARAM EPICS_CA_MAX_SEARCH_PERIOD;
epicsShareExtern const ENV_PARAM EPICS_CA_NAME_SERVERS;
epicsShareExtern const ENV_PARAM EPICS_CAS_INTF_ADDR_LIST;
epicsShareExtern const ENV_PARAM EPICS_CAS_IGNORE_ADDR_LIST;
epicsShareExtern const ENV_PARAM EPICS_CAS_AUTO_BEACON_ADDR_LIST;
+68 -10
View File
@@ -3,15 +3,13 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
/*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
*
*/
#include <stddef.h>
@@ -2190,9 +2188,9 @@ int rsrv_version_reply ( struct client *client )
}
/*
* search_reply()
* search_reply_udp ()
*/
static int search_reply ( caHdrLargeArray *mp, void *pPayload, struct client *client )
static int search_reply_udp ( caHdrLargeArray *mp, void *pPayload, struct client *client )
{
struct dbAddr tmp_addr;
ca_uint16_t *pMinorVersion;
@@ -2299,6 +2297,66 @@ static int search_reply ( caHdrLargeArray *mp, void *pPayload, struct client *cl
return RSRV_OK;
}
/*
* search_reply_tcp ()
*/
static int search_reply_tcp (
caHdrLargeArray *mp, void *pPayload, struct client *client )
{
struct dbAddr tmp_addr;
char *pName = (char *) pPayload;
int status;
int spaceAvailOnFreeList;
size_t spaceNeeded;
size_t reasonableMonitorSpace = 10;
/*
* check the sanity of the message
*/
if (mp->m_postsize<=1) {
log_header ("empty PV name in UDP search request?",
client, mp, pPayload, 0);
return RSRV_OK;
}
pName[mp->m_postsize-1] = '\0';
/* Exit quickly if channel not on this node */
status = db_name_to_addr (pName, &tmp_addr);
if (status) {
DLOG ( 2, ( "CAS: Lookup for channel \"%s\" failed\n", pPayLoad ) );
if (mp->m_dataType == DOREPLY)
search_fail_reply ( mp, pPayload, client );
return RSRV_OK;
}
/*
* stop further use of server if memory becomes scarse
*/
spaceAvailOnFreeList = freeListItemsAvail ( rsrvChanFreeList ) > 0
&& freeListItemsAvail ( rsrvEventFreeList ) > reasonableMonitorSpace;
spaceNeeded = sizeof (struct channel_in_use) +
reasonableMonitorSpace * sizeof (struct event_ext);
if ( ! ( osiSufficentSpaceInPool(spaceNeeded) || spaceAvailOnFreeList ) ) {
SEND_LOCK(client);
send_err ( mp, ECA_ALLOCMEM, client, "Server memory exhausted" );
SEND_UNLOCK(client);
return RSRV_OK;
}
SEND_LOCK ( client );
status = cas_copy_in_header ( client, CA_PROTO_SEARCH,
0, ca_server_port, 0, ~0U, mp->m_available, 0 );
if ( status != ECA_NORMAL ) {
SEND_UNLOCK ( client );
return RSRV_ERROR;
}
cas_commit_msg ( client, 0 );
SEND_UNLOCK ( client );
return RSRV_OK;
}
typedef int (*pProtoStubTCP) (caHdrLargeArray *mp, void *pPayload, struct client *client);
/*
@@ -2312,7 +2370,7 @@ static const pProtoStubTCP tcpJumpTable[] =
read_action,
write_action,
bad_tcp_cmd_action,
bad_tcp_cmd_action,
search_reply_tcp,
bad_tcp_cmd_action,
events_off_action,
events_on_action,
@@ -2348,7 +2406,7 @@ static const pProtoStubUDP udpJumpTable[] =
bad_udp_cmd_action,
bad_udp_cmd_action,
bad_udp_cmd_action,
search_reply,
search_reply_udp,
bad_udp_cmd_action,
bad_udp_cmd_action,
bad_udp_cmd_action,
+8 -5
View File
@@ -3,17 +3,15 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
*
*/
#include <stddef.h>
@@ -938,6 +936,11 @@ struct client *create_tcp_client ( SOCKET sock )
return NULL;
}
/*
* add first version message should it be needed
*/
rsrv_version_reply ( client );
if ( CASDEBUG > 0 ) {
char buf[64];
ipAddrToDottedIP ( &client->addr, buf, sizeof(buf) );
+5 -7
View File
@@ -3,15 +3,13 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* Author: Jeffrey O. Hill
* hill@luke.lanl.gov
* (505) 665 1831
* Date: 5-88
*
*/
#ifndef INCLserverh
@@ -29,7 +27,7 @@
#include "asLib.h"
#include "dbAddr.h"
#include "dbNotify.h"
#define CA_MINOR_PROTOCOL_REVISION 11
#define CA_MINOR_PROTOCOL_REVISION 12
#include "caProto.h"
#include "ellLib.h"
#include "epicsTime.h"
@@ -47,7 +45,7 @@ typedef struct caHdrLargeArray {
ca_uint32_t m_available; /* protocol stub dependent */
ca_uint16_t m_dataType; /* operation data type */
ca_uint16_t m_cmmd; /* operation to be performed */
}caHdrLargeArray;
} caHdrLargeArray;
/*
* !! buf must be the first item in this structure !!