improved performance

This commit is contained in:
Jeff Hill
2000-11-08 03:52:18 +00:00
parent 2ded135da9
commit 4784b73a9f
39 changed files with 2480 additions and 2589 deletions
+2 -1
View File
@@ -50,10 +50,11 @@ LIBSRCS += syncGroupNotify.cpp
LIBSRCS += localHostName.cpp
LIBSRCS += comQueRecv.cpp
LIBSRCS += comQueSend.cpp
LIBSRCS += cacPrivateListOfIO.cpp
LIBSRCS += tcpiiuPrivateListOfIO.cpp
LIBSRCS += hostNameCache.cpp
LIBSRCS += ioCounter.cpp
LIBSRCS += msgForMultiplyDefinedPV.cpp
LIBSRCS += limboiiu.cpp
LIBRARY=ca
+3 -2
View File
@@ -346,12 +346,13 @@ int epicsShareAPI ca_add_exception_event ( caExceptionHandler *pfunc, void *arg
/*
* ca_add_masked_array_event
*/
int epicsShareAPI ca_add_masked_array_event ( chtype type, unsigned long count, chid pChan,
int epicsShareAPI ca_add_masked_array_event (
chtype type, unsigned long count, chid pChan,
caEventCallBackFunc *pCallBack, void *pCallBackArg,
ca_real, ca_real, ca_real,
evid *monixptr, long mask )
{
static const long maskMask = USHRT_MAX;
static const long maskMask = 0xffff;
oldSubscription *pSubsr;
int status;
+2 -2
View File
@@ -349,10 +349,10 @@ void verifyBlockingConnect ( appChan *pChans, unsigned chanCount, unsigned repet
* verify that connections to IOC's that are
* not in use are dropped
*/
if ( ca_get_ioc_connection_count() != 0u ) {
if ( ca_get_ioc_connection_count () != 0u ) {
ca_pend_event ( 1.0 );
j=0;
while ( ca_get_ioc_connection_count() != 0u ) {
while ( ca_get_ioc_connection_count () != 0u ) {
printf ( "-" );
ca_pend_event ( 1.0 );
assert ( ++j < 100 );
+8 -17
View File
@@ -13,29 +13,13 @@
#include "iocinf.h"
#include "nciu_IL.h"
osiMutex baseNMIU::mutex;
baseNMIU::baseNMIU ( nciu &chanIn ) :
chan ( chanIn ), attachedToChannel ( true )
chan ( chanIn )
{
chanIn.ioInstall ( *this );
}
baseNMIU::~baseNMIU ()
{
this->uninstallFromChannel ();
}
void baseNMIU::uninstallFromChannel ()
{
this->mutex.lock ();
bool attached = this->attachedToChannel;
this->attachedToChannel = false;
this->mutex.unlock ();
if ( attached ) {
this->chan.ioUninstall ( *this );
}
}
void baseNMIU::destroy ()
@@ -48,8 +32,15 @@ int baseNMIU::subscriptionMsg ()
return ECA_NORMAL;
}
void baseNMIU::subscriptionCancelMsg ()
{
}
void baseNMIU::show ( unsigned /* level */ ) const
{
printf ( "CA IO primitive at %p for channel %s\n",
this, this->chan.pName () );
}
+31
View File
@@ -0,0 +1,31 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef baseNMIU_ILh
#define baseNMIU_ILh
inline ca_uint32_t baseNMIU::getID () const
{
return this->id;
}
inline nciu & baseNMIU::channel ()
{
return this->chan;
}
#endif // baseNMIU
+227 -377
View File
@@ -42,7 +42,6 @@ static void cacInitRecursionLock ( void * )
//
cac::cac ( bool enablePreemptiveCallbackIn ) :
ipToAEngine ( "caIPAddrToAsciiEngine" ),
ioTable ( 1024 ),
chanTable ( 1024 ),
sgTable ( 128 ),
beaconTable ( 1024 ),
@@ -51,6 +50,7 @@ cac::cac ( bool enablePreemptiveCallbackIn ) :
pudpiiu ( 0 ),
pSearchTmr ( 0 ),
pRepeaterSubscribeTmr ( 0 ),
initializingThreadsPriority ( threadGetPrioritySelf () ),
enablePreemptiveCallback ( enablePreemptiveCallbackIn )
{
long status;
@@ -69,11 +69,10 @@ cac::cac ( bool enablePreemptiveCallbackIn ) :
{
threadBoolStatus tbs;
unsigned selfPriority = threadGetPrioritySelf ();
tbs = threadLowestPriorityLevelAbove ( selfPriority, &abovePriority);
tbs = threadLowestPriorityLevelAbove ( this->initializingThreadsPriority, &abovePriority);
if ( tbs != tbsSuccess ) {
abovePriority = selfPriority;
abovePriority = this->initializingThreadsPriority;
}
}
@@ -143,14 +142,15 @@ cac::~cac ()
//
// destroy local IO channels
//
this->defaultMutex.lock ();
tsDLIterBD < cacLocalChannelIO > iter ( this->localChanList.first () );
while ( iter.valid () ) {
tsDLIterBD <cacLocalChannelIO> pnext = iter.itemAfter ();
iter->destroy ();
iter = pnext;
{
osiAutoMutex autoMutex ( this->defaultMutex );
tsDLIterBD < cacLocalChannelIO > iter ( this->localChanList.first () );
while ( iter.valid () ) {
tsDLIterBD < cacLocalChannelIO > pnext = iter.itemAfter ();
iter->destroy ();
iter = pnext;
}
}
this->defaultMutex.unlock ();
//
// make certain that process thread isnt deleting
@@ -169,14 +169,24 @@ cac::~cac ()
//
// shutdown all tcp connections and wait for threads to exit
//
this->iiuListMutex.lock ();
tsDLIterBD <tcpiiu> piiu ( this->iiuList.first () );
while ( piiu.valid () ) {
tsDLIterBD <tcpiiu> pnext = piiu.itemAfter ();
piiu->suicide ();
piiu = pnext;
{
osiAutoMutex autoMutex ( this->iiuListMutex );
tsDLIterBD <tcpiiu> piiu ( this->iiuList.first () );
while ( piiu.valid () ) {
tsDLIterBD <tcpiiu> pnext = piiu.itemAfter ();
piiu->disconnect ();
piiu->suicide ();
piiu = pnext;
}
piiu = this->iiuListLimbo.first ();
while ( piiu.valid () ) {
tsDLIterBD <tcpiiu> pnext = piiu.itemAfter ();
piiu->suicide ();
piiu = pnext;
}
}
this->iiuListMutex.unlock ();
if ( this->pRepeaterSubscribeTmr ) {
delete this->pRepeaterSubscribeTmr;
@@ -198,6 +208,10 @@ cac::~cac ()
// up new clients. this adds an additional
// requirement that threads
//
{
osiAutoMutex autoMutex ( this->defaultMutex );
this->pudpiiu->disconnectAllChan ( limboIIU );
}
delete this->pudpiiu;
}
@@ -211,7 +225,6 @@ cac::~cac ()
this->sgTable.destroyAllEntries ();
this->beaconTable.destroyAllEntries ();
this->chanTable.destroyAllEntries ();
this->ioTable.destroyAllEntries ();
osiSockRelease ();
@@ -220,16 +233,46 @@ cac::~cac ()
void cac::processRecvBacklog ()
{
this->iiuListMutex.lock ();
osiAutoMutex autoMutex ( this->iiuListMutex );
tsDLIterBD <tcpiiu> piiu ( this->iiuList.first () );
tsDLIterBD < tcpiiu > piiu ( this->iiuList.first () );
while ( piiu.valid () ) {
tsDLIterBD <tcpiiu> pNext = piiu.itemAfter ();
piiu->processIncomingAndDestroySelfIfDisconnected ();
tsDLIterBD < tcpiiu > pNext = piiu.itemAfter ();
if ( ! piiu->alive () ) {
assert ( this->pudpiiu && this->pSearchTmr );
bhe *pBHE = piiu->getBHE ();
if ( ! this->enablePreemptiveCallback ) {
if ( this->fdRegFunc ) {
( *this->fdRegFunc )
( (void *) this->fdRegArg, piiu->getSock (), FALSE );
}
}
{
osiAutoMutex autoMutex ( this->defaultMutex );
piiu->disconnectAllChan ( *this->pudpiiu );
}
piiu->disconnect ();
this->iiuList.remove ( *piiu );
this->iiuListLimbo.add ( *piiu );
if ( pBHE ) {
pBHE->destroy ();
}
this->pSearchTmr->resetPeriod ( CA_RECAST_DELAY );
}
else {
piiu->processIncoming ();
}
piiu = pNext;
}
this->iiuListMutex.unlock ();
}
/*
@@ -240,13 +283,12 @@ void cac::flush ()
/*
* set the push pending flag on all virtual circuits
*/
this->iiuListMutex.lock ();
osiAutoMutex autoMutex ( this->iiuListMutex );
tsDLIterBD <tcpiiu> piiu ( this->iiuList.first () );
while ( piiu.valid () ) {
piiu->flush ();
piiu++;
}
this->iiuListMutex.unlock ();
}
unsigned cac::connectionCount () const
@@ -259,21 +301,21 @@ void cac::show ( unsigned level ) const
::printf ( "Channel Access Client Context at %p for user %s\n",
this, this->pUserName );
if (level > 0u ) {
this->iiuListMutex.lock ();
tsDLIterConstBD < tcpiiu > piiu ( this->iiuList.first () );
while ( piiu.valid () ) {
piiu->show ( level - 1u );
piiu++;
}
this->iiuListMutex.unlock ();
{
osiAutoMutex autoMutex ( this->iiuListMutex );
this->defaultMutex.lock ();
tsDLIterConstBD < cacLocalChannelIO > pChan ( this->localChanList.first () );
while ( pChan.valid () ) {
pChan->show ( level - 1u );
pChan++;
}
this->defaultMutex.unlock ();
tsDLIterConstBD < tcpiiu > piiu ( this->iiuList.first () );
while ( piiu.valid () ) {
piiu->show ( level - 1u );
piiu++;
}
tsDLIterConstBD < cacLocalChannelIO > pChan ( this->localChanList.first () );
while ( pChan.valid () ) {
pChan->show ( level - 1u );
pChan++;
}
}
::printf ( "\tconnection time out watchdog period %f\n", this->connTMO );
::printf ( "\tpreemptive calback is %s\n",
@@ -298,8 +340,6 @@ void cac::show ( unsigned level ) const
if ( level > 2u ) {
::printf ( "Program begin time:\n");
this->programBeginTime.show ( level - 3u );
::printf ( "IO identifier hash table:\n" );
this->ioTable.show ( level - 3u );
::printf ( "Channel identifier hash table:\n" );
this->chanTable.show ( level - 3u );
::printf ( "Synchronous group identifier hash table:\n" );
@@ -334,20 +374,6 @@ void cac::show ( unsigned level ) const
}
}
void cac::installIIU ( tcpiiu &iiu )
{
this->iiuListMutex.lock ();
this->iiuList.add (iiu);
this->iiuListMutex.unlock ();
this->defaultMutex.lock ();
if ( ! this->enablePreemptiveCallback && this->fdRegFunc ) {
( * this->fdRegFunc )
( (void *) this->fdRegArg, iiu.getSock (), TRUE );
}
this->defaultMutex.unlock ();
}
void cac::signalRecvActivity ()
{
if ( this->pRecvProcThread ) {
@@ -355,44 +381,14 @@ void cac::signalRecvActivity ()
}
}
void cac::removeIIU ( tcpiiu &iiu )
{
this->defaultMutex.lock ();
osiSockAddr addr = iiu.address ();
if ( addr.sa.sa_family == AF_INET ) {
bhe *pBHE = this->lookupBeaconInetAddr ( addr.ia );
if ( pBHE ) {
pBHE->destroy ();
}
}
else {
errlogPrintf ( "CA server didnt have inet type address?\n" );
}
this->defaultMutex.unlock ();
this->iiuListMutex.lock ();
this->iiuList.remove (iiu);
if ( ! this->enablePreemptiveCallback ) {
if ( this->fdRegFunc ) {
(*this->fdRegFunc)
((void *)this->fdRegArg, iiu.getSock (), FALSE);
}
}
this->iiuListMutex.unlock ();
}
/*
* cac::lookupBeaconInetAddr()
*/
bhe * cac::lookupBeaconInetAddr (const inetAddrID &ina)
{
osiAutoMutex autoMutex ( this->defaultMutex );
bhe *pBHE;
this->defaultMutex.lock ();
pBHE = this->beaconTable.lookup (ina);
this->defaultMutex.unlock ();
return pBHE;
}
@@ -401,10 +397,9 @@ bhe * cac::lookupBeaconInetAddr (const inetAddrID &ina)
*/
bhe *cac::createBeaconHashEntry (const inetAddrID &ina, const osiTime &initialTimeStamp)
{
osiAutoMutex autoMutex ( this->defaultMutex );
bhe *pBHE;
this->defaultMutex.lock ();
pBHE = this->beaconTable.lookup ( ina );
if ( !pBHE ) {
pBHE = new bhe (*this, initialTimeStamp, ina);
@@ -416,8 +411,6 @@ bhe *cac::createBeaconHashEntry (const inetAddrID &ina, const osiTime &initialTi
}
}
this->defaultMutex.unlock ();
return pBHE;
}
@@ -426,6 +419,7 @@ bhe *cac::createBeaconHashEntry (const inetAddrID &ina, const osiTime &initialTi
*/
void cac::beaconNotify ( const inetAddrID &addr )
{
osiAutoMutex autoMutex ( this->defaultMutex );
bhe *pBHE;
unsigned port;
int netChange;
@@ -434,8 +428,6 @@ void cac::beaconNotify ( const inetAddrID &addr )
return;
}
this->defaultMutex.lock ();
/*
* look for it in the hash table
*/
@@ -456,7 +448,6 @@ void cac::beaconNotify ( const inetAddrID &addr )
}
if ( ! netChange ) {
this->defaultMutex.unlock ();
return;
}
@@ -480,7 +471,6 @@ void cac::beaconNotify ( const inetAddrID &addr )
status = getsockname ( this->pudpiiu->getSock (), (struct sockaddr *) &saddr, &saddr_length );
if ( status < 0 ) {
this->printf ( "CAC: getsockname () error was \"%s\"\n", SOCKERRSTR (SOCKERRNO) );
this->defaultMutex.unlock ();
return;
}
port = ntohs ( saddr.sin_port );
@@ -498,8 +488,6 @@ void cac::beaconNotify ( const inetAddrID &addr )
}
}
this->defaultMutex.unlock ();
this->pudpiiu->resetChannelRetryCounts ();
# if DEBUG
@@ -517,12 +505,10 @@ void cac::beaconNotify ( const inetAddrID &addr )
*/
void cac::removeBeaconInetAddr (const inetAddrID &ina)
{
osiAutoMutex autoMutex ( this->defaultMutex );
bhe *pBHE;
this->defaultMutex.lock ();
pBHE = this->beaconTable.remove ( ina );
this->defaultMutex.unlock ();
assert (pBHE);
}
@@ -630,186 +616,19 @@ bool cac::ioComplete () const
}
}
void cac::ioInstall ( baseNMIU &io )
{
this->defaultMutex.lock ();
this->ioTable.add ( io );
this->defaultMutex.unlock ();
}
void cac::ioUninstall ( unsigned id )
{
this->defaultMutex.lock ();
this->ioTable.remove ( id );
this->defaultMutex.unlock ();
}
void cac::ioDestroy ( unsigned id )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->uninstallFromChannel ();
}
this->defaultMutex.unlock ();
// care is taken to not destroy with the cac lock
// applied because we could potentially hold the
// cac lock while sending and deadlock with the
// recv thread, but we must uninstall the IO
// before accessing it with the lock released
if ( pmiu ) {
pmiu->destroy ();
}
}
void cac::ioCompletionNotify ( unsigned id )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->completionNotify ();
}
this->defaultMutex.unlock ();
}
void cac::ioCompletionNotify ( unsigned id, unsigned type,
unsigned long count, const void *pData )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->completionNotify ( type, count, pData );
}
this->defaultMutex.unlock ();
}
void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->exceptionNotify ( status, pContext );
}
this->defaultMutex.unlock ();
}
void cac::ioExceptionNotify ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->exceptionNotify ( status, pContext, type, count );
}
this->defaultMutex.unlock ();
}
void cac::ioCompletionNotifyAndDestroy ( unsigned id )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->uninstallFromChannel ();
}
this->defaultMutex.unlock ();
// care is taken to not destroy with the cac lock
// applied because we could potentially hold the
// cac lock while sending and deadlock with the
// recv thread, but we must uninstall the IO
// before accessing it with the lock released
if ( pmiu ) {
pmiu->completionNotify ();
pmiu->destroy ();
}
}
void cac::ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, unsigned long count, const void *pData )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->uninstallFromChannel ();
}
this->defaultMutex.unlock ();
// care is taken to not destroy with the cac lock
// applied because we could potentially hold the
// cac lock while sending and deadlock with the
// recv thread, but we must uninstall the IO
// before accessing it with the lock released
if ( pmiu ) {
pmiu->completionNotify ( type, count, pData );
pmiu->destroy ();
}
}
void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->uninstallFromChannel ();
}
this->defaultMutex.unlock ();
// care is taken to not destroy with the cac lock
// applied because we could potentially hold the
// cac lock while sending and deadlock with the
// recv thread, but we must uninstall the IO
// before accessing it with the lock released
if ( pmiu ) {
pmiu->exceptionNotify ( status, pContext );
pmiu->destroy ();
}
}
void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count )
{
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
pmiu->uninstallFromChannel ();
}
this->defaultMutex.unlock ();
// care is taken to not destroy with the cac lock
// applied because we could potentially hold the
// cac lock while sending and deadlock with the
// recv thread, but we must uninstall the IO
// before accessing it with the lock released
if ( pmiu ) {
pmiu->exceptionNotify ( status, pContext, type, count );
pmiu->destroy ();
}
}
void cac::registerChannel (nciu &chan)
{
this->defaultMutex.lock ();
this->chanTable.add ( chan );
this->defaultMutex.unlock ();
}
void cac::unregisterChannel ( nciu &chan )
{
this->defaultMutex.lock ();
this->chanTable.remove ( chan );
this->defaultMutex.unlock ();
}
void cac::accessRightsNotify ( unsigned id, caar ar )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
nciu * pChan = this->chanTable.lookup ( id );
if ( pChan ) {
pChan->accessRightsStateChange ( ar );
}
this->defaultMutex.unlock ();
}
void cac::connectChannel ( bool v44Ok, unsigned id,
unsigned nativeType, unsigned long nativeCount, unsigned sid )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
nciu * pChan = this->chanTable.lookup ( id );
if ( pChan ) {
unsigned sidTmp;
@@ -821,64 +640,60 @@ void cac::connectChannel ( bool v44Ok, unsigned id,
}
pChan->connect ( nativeType, nativeCount, sidTmp );
}
this->defaultMutex.unlock ();
}
// this is to only be used by early protocol revisions
void cac::connectChannel ( unsigned id )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
nciu * pChan = this->chanTable.lookup ( id );
if ( pChan ) {
pChan->connect ();
}
this->defaultMutex.unlock ();
}
void cac::channelDestroy ( unsigned id )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
nciu * pChan = this->chanTable.lookup ( id );
// channel should already have been deleted
if ( pChan ) {
pChan->destroy ();
epicsPrintf ( "cac: received invalid channel delete verification?\n" );
}
this->defaultMutex.unlock ();
}
void cac::disconnectChannel ( unsigned id )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
nciu * pChan = this->chanTable.lookup ( id );
if ( pChan ) {
pChan->disconnect ();
assert ( this->pudpiiu && this->pSearchTmr );
pChan->disconnect ( *this->pudpiiu );
this->pSearchTmr->resetPeriod ( CA_RECAST_DELAY );
}
this->defaultMutex.unlock ();
}
void cac::installCASG (CASG &sg)
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
this->sgTable.add ( sg );
this->defaultMutex.unlock ();
}
void cac::uninstallCASG (CASG &sg)
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
this->sgTable.remove ( sg );
this->defaultMutex.unlock ();
}
CASG * cac::lookupCASG (unsigned id)
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
CASG * psg = this->sgTable.lookup ( id );
if ( psg ) {
if ( ! psg->verify () ) {
psg = 0;
}
}
this->defaultMutex.unlock ();
return psg;
}
@@ -909,18 +724,23 @@ bool cac::createChannelIO (const char *pName, cacChannel &chan)
if ( ! pIO ) {
pIO = cacGlobalServiceList.createChannelIO ( pName, *this, chan );
if ( ! pIO ) {
if ( ! this->pudpiiu ) {
if ( ! this->pudpiiu || ! this->pSearchTmr ) {
if ( ! this->setupUDP () ) {
return false;
}
}
nciu *pNetChan = new nciu ( *this, chan, pName );
nciu *pNetChan = new nciu ( *this, *this->pudpiiu, chan, pName );
if ( pNetChan ) {
if ( ! pNetChan->fullyConstructed () ) {
pNetChan->destroy ();
return false;
}
else {
osiAutoMutex autoMutex ( this->defaultMutex );
chan.attachIO ( *pNetChan );
this->chanTable.add ( *pNetChan );
this->pudpiiu->attachChannel ( *pNetChan );
this->pSearchTmr->resetPeriod ( CA_RECAST_DELAY );
return true;
}
}
@@ -929,53 +749,49 @@ bool cac::createChannelIO (const char *pName, cacChannel &chan)
}
}
}
this->defaultMutex.lock ();
this->localChanList.add ( *pIO );
this->defaultMutex.unlock ();
{
osiAutoMutex autoMutex ( this->defaultMutex );
this->localChanList.add ( *pIO );
}
return true;
}
void cac::uninstallLocalChannel ( cacLocalChannelIO &localIO )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
this->localChanList.remove ( localIO );
this->defaultMutex.unlock ();
}
bool cac::setupUDP ()
{
this->defaultMutex.lock ();
{
osiAutoMutex autoMutex ( this->defaultMutex );
if ( this->pudpiiu ) {
this->defaultMutex.unlock ();
return true;
if ( this->pudpiiu ) {
return true;
}
this->pudpiiu = new udpiiu ( *this );
if ( ! this->pudpiiu ) {
return false;
}
this->pSearchTmr = new searchTimer ( *this->pudpiiu, *this->pTimerQueue );
if ( ! this->pSearchTmr ) {
delete this->pudpiiu;
this->pudpiiu = 0;
return false;
}
this->pRepeaterSubscribeTmr = new repeaterSubscribeTimer ( *this->pudpiiu, *this->pTimerQueue );
if ( ! this->pRepeaterSubscribeTmr ) {
delete this->pSearchTmr;
delete this->pudpiiu;
this->pudpiiu = 0;
return false;
}
}
this->pudpiiu = new udpiiu ( *this );
if ( ! this->pudpiiu ) {
this->defaultMutex.unlock ();
return false;
}
this->pSearchTmr = new searchTimer ( *this->pudpiiu, *this->pTimerQueue );
if ( ! this->pSearchTmr ) {
delete this->pudpiiu;
this->pudpiiu = 0;
this->defaultMutex.unlock ();
return false;
}
this->pRepeaterSubscribeTmr = new repeaterSubscribeTimer ( *this->pudpiiu, *this->pTimerQueue );
if ( ! this->pRepeaterSubscribeTmr ) {
delete this->pSearchTmr;
delete this->pudpiiu;
this->pudpiiu = 0;
this->defaultMutex.unlock ();
return false;
}
this->defaultMutex.unlock ();
if ( ! this->enablePreemptiveCallback ) {
if ( this->fdRegFunc ) {
( *this->fdRegFunc )
@@ -988,10 +804,9 @@ bool cac::setupUDP ()
void cac::registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
this->fdRegFunc = pFunc;
this->fdRegArg = pArg;
this->defaultMutex.unlock ();
}
void cac::enableCallbackPreemption ()
@@ -1010,7 +825,7 @@ void cac::disableCallbackPreemption ()
void cac::changeExceptionEvent ( caExceptionHandler *pfunc, void *arg )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
if ( pfunc ) {
this->ca_exception_func = pfunc;
this->ca_exception_arg = arg;
@@ -1019,7 +834,6 @@ void cac::changeExceptionEvent ( caExceptionHandler *pfunc, void *arg )
this->ca_exception_func = ca_default_exception_handler;
this->ca_exception_arg = NULL;
}
this->defaultMutex.unlock ();
}
//
@@ -1045,25 +859,16 @@ void cac::genLocalExcepWFL (long stat, const char *ctx, const char *pFile, unsig
args.pFile = pFile;
args.lineNo = lineNo;
this->defaultMutex.lock ();
pExceptionFunc = this->ca_exception_func;
args.usr = this->ca_exception_arg;
this->defaultMutex.unlock ();
{
osiAutoMutex autoMutex ( this->defaultMutex );
pExceptionFunc = this->ca_exception_func;
args.usr = this->ca_exception_arg;
}
(*pExceptionFunc) (args);
}
}
void cac::installDisconnectedChannel ( nciu &chan )
{
assert ( this->pudpiiu && this->pSearchTmr );
chan.attachChanToIIU ( *this->pudpiiu );
chan.resetRetryCount ();
this->pSearchTmr->resetPeriod ( CA_RECAST_DELAY );
}
void cac::notifySearchResponse ( unsigned short retrySeqNo )
{
if ( this->pSearchTmr ) {
@@ -1080,14 +885,13 @@ void cac::repeaterSubscribeConfirmNotify ()
void cac::replaceErrLogHandler ( caPrintfFunc *ca_printf_func )
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
if ( ca_printf_func ) {
this->pVPrintfFunc = ca_printf_func;
}
else {
this->pVPrintfFunc = epicsVprintf;
}
this->defaultMutex.unlock ();
}
/*
@@ -1105,37 +909,59 @@ tcpiiu * cac::constructTCPIIU ( const osiSockAddr &addr, unsigned minorVersion )
/*
* look for an existing virtual circuit
*/
this->defaultMutex.lock ();
pBHE = this->lookupBeaconInetAddr ( addr.ia );
if ( ! pBHE ) {
pBHE = this->createBeaconHashEntry ( addr.ia, osiTime () );
{
osiAutoMutex autoMutex ( this->defaultMutex );
pBHE = this->lookupBeaconInetAddr ( addr.ia );
if ( ! pBHE ) {
this->defaultMutex.unlock ();
return NULL;
pBHE = this->createBeaconHashEntry ( addr.ia, osiTime () );
if ( ! pBHE ) {
return NULL;
}
}
}
piiu = pBHE->getIIU ();
if ( piiu ) {
if ( piiu->alive () ) {
this->defaultMutex.unlock ();
return piiu;
}
else {
this->defaultMutex.unlock ();
return NULL;
piiu = pBHE->getIIU ();
if ( piiu ) {
if ( piiu->alive () ) {
return piiu;
}
else {
return NULL;
}
}
}
this->defaultMutex.unlock ();
piiu = new tcpiiu ( *this, addr, minorVersion,
*pBHE, this->connTMO, *this->pTimerQueue,
this->ipToAEngine );
{
osiAutoMutex autoMutex ( this->iiuListMutex );
piiu = iiuListLimbo.get ();
}
if ( ! piiu ) {
return NULL;
piiu = new tcpiiu ( *this, this->connTMO, *this->pTimerQueue );
if ( ! piiu ) {
return NULL;
}
}
if ( piiu->fullyConstructed () ) {
{
osiAutoMutex autoMutex ( this->iiuListMutex );
this->iiuList.add ( *piiu );
}
if ( ! piiu->initiateConnect ( addr, minorVersion, *pBHE, this->ipToAEngine ) ) {
osiAutoMutex autoMutex ( this->iiuListMutex );
this->iiuList.remove ( *piiu );
this->iiuListLimbo.add ( *piiu );
return NULL;
}
{
osiAutoMutex autoMutex ( this->defaultMutex );
if ( ! this->enablePreemptiveCallback && this->fdRegFunc ) {
( * this->fdRegFunc )
( (void *) this->fdRegArg, piiu->getSock (), TRUE );
}
}
return piiu;
}
else {
@@ -1152,7 +978,7 @@ void cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
tcpiiu *allocpiiu;
{
this->defaultMutex.lock ();
osiAutoMutex autoMutex ( this->defaultMutex );
nciu *chan;
/*
@@ -1160,7 +986,6 @@ void cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
*/
chan = this->chanTable.lookup ( cid );
if ( ! chan ) {
this->defaultMutex.unlock ();
return;
}
@@ -1169,36 +994,61 @@ void cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
/*
* Ignore duplicate search replies
*/
if ( chan->connectionInProgress ( addr ) ) {
this->defaultMutex.unlock ();
if ( chan->isAttachedToVirtaulCircuit ( addr ) ) {
return;
}
allocpiiu = this->constructTCPIIU ( addr, minorVersionNumber );
if ( ! allocpiiu ) {
this->defaultMutex.unlock ();
return;
}
/*
* remove it from the broadcast niiu
*/
chan->searchReplySetUp ( sid, typeCode, count );
allocpiiu->installChannelPendingClaim ( *chan );
this->pudpiiu->detachChannel ( *chan );
chan->searchReplySetUp ( *allocpiiu, sid, typeCode, count );
allocpiiu->attachChannel ( *chan );
this->defaultMutex.unlock ();
chan->createChannelRequest ();
// wake up send thread which ultimately sends the claim message
allocpiiu->flush ();
if ( ! allocpiiu->ca_v42_ok () ) {
chan->connect ();
}
}
this->notifySearchResponse ( retrySeqNumber );
return;
}
bool cac::currentThreadIsRecvProcessThread ()
void cac::destroyNCIU ( nciu & chan )
{
{
osiAutoMutex autoMutex ( this->defaultMutex );
nciu *pChan = this->chanTable.remove ( chan );
assert ( pChan = &chan );
chan.getPIIU ()->detachChannel ( chan );
}
chan.cacDestroy ();
}
// the recv thread is not permitted to flush as this
// can result in a push / pull deadlock on the TCP pipe.
// Instead, the recv thread scheduals the flush with the
// send thread which runs at a higher priority than the
// send thread. The same applies to the UDP thread for
// locking hierarchy reasons.
bool cac::flushPermit () const
{
if ( this->pRecvProcThread ) {
return this->pRecvProcThread->isCurrentThread ();
if ( this->pRecvProcThread->isCurrentThread () ) {
return false;
}
}
else {
return false;
if ( this->pudpiiu ) {
if ( this->pudpiiu->isCurrentThread () ) {
return false;
}
}
return true;
}
+1 -1
View File
@@ -86,7 +86,7 @@ int cacChannel::subscribe ( unsigned type, unsigned long count,
{
cacChannelIO *pIO = this->pChannelIO;
if ( pIO ) {
return pIO->subscribe (type, count, mask, notify);
return pIO->subscribe ( type, count, mask, notify );
}
else {
return ECA_DISCONNCHID;
-21
View File
@@ -30,24 +30,3 @@ void cacNotifyIO::destroy ()
{
delete this;
}
void cacNotifyIO::completionNotify ()
{
this->notify.completionNotify ();
}
void cacNotifyIO::completionNotify ( unsigned type, unsigned long count, const void *pData )
{
this->notify.completionNotify ( type, count, pData );
}
void cacNotifyIO::exceptionNotify ( int status, const char *pContext )
{
this->notify.exceptionNotify ( status, pContext );
}
void cacNotifyIO::exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count )
{
this->notify.exceptionNotify ( status, pContext, type, count );
}
+31
View File
@@ -0,0 +1,31 @@
/* $Id$
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
* Author: Jeff Hill
*/
inline void cacNotifyIO::completionNotify ()
{
this->notify.completionNotify ();
}
inline void cacNotifyIO::completionNotify ( unsigned type, unsigned long count, const void *pData )
{
this->notify.completionNotify ( type, count, pData );
}
inline void cacNotifyIO::exceptionNotify ( int status, const char *pContext )
{
this->notify.exceptionNotify ( status, pContext );
}
inline void cacNotifyIO::exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count )
{
this->notify.exceptionNotify ( status, pContext, type, count );
}
-95
View File
@@ -1,95 +0,0 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#include "iocinf.h"
cacPrivateListOfIO::cacPrivateListOfIO ( cac &cacIn ) :
cacCtx ( cacIn )
{
}
void cacPrivateListOfIO::addIO ( baseNMIU &io )
{
this->cacCtx.cacPrivateListOfIOPrivate::mutex.lock ();
this->eventq.add ( io );
this->cacCtx.cacPrivateListOfIOPrivate::mutex.unlock ();
}
void cacPrivateListOfIO::removeIO ( baseNMIU &io )
{
this->cacCtx.cacPrivateListOfIOPrivate::mutex.lock ();
this->eventq.remove ( io );
this->cacCtx.cacPrivateListOfIOPrivate::mutex.unlock ();
}
// Destroy all IO blocks attached.
// Care is taken here not to hold the lock while
// sending a subscription delete message (which
// would result in deadlocks)
void cacPrivateListOfIO::destroyAllIO ()
{
while ( true ) {
unsigned id;
bool done;
this->cacCtx.cacPrivateListOfIOPrivate::mutex.lock ();
{
baseNMIU *pNMIU = this->eventq.first ();
if ( pNMIU ) {
id = pNMIU->getId ();
done = false;
}
else {
id = UINT_MAX;
done = true;
}
}
this->cacCtx.cacPrivateListOfIOPrivate::mutex.unlock ();
if ( done ) {
break;
}
// care is taken to not hold a lock when
// executing this
this->cacCtx.ioDestroy ( id );
}
}
// resubscribe for monitors from this channel
void cacPrivateListOfIO::subscribeAllIO ()
{
this->cacCtx.cacPrivateListOfIOPrivate::mutex.lock ();
tsDLIterBD < baseNMIU > iter = this->eventq.first ();
while ( iter.valid () ) {
iter->subscriptionMsg ();
iter++;
}
this->cacCtx.cacPrivateListOfIOPrivate::mutex.unlock ();
}
// cancel IO operations and monitor subscriptions
void cacPrivateListOfIO::disconnectAllIO ( const char *pHostName )
{
this->cacCtx.cacPrivateListOfIOPrivate::mutex.lock ();
tsDLIterBD < baseNMIU > iter = this->eventq.first ();
while ( iter.valid () ) {
tsDLIterBD < baseNMIU > next = iter.itemAfter ();
iter->disconnect ( pHostName );
iter = next;
}
this->cacCtx.cacPrivateListOfIOPrivate::mutex.unlock ();
}
+6
View File
@@ -49,3 +49,9 @@ inline void cac::ipAddrToAsciiAsynchronousRequestInstall ( ipAddrToAsciiAsynchro
request.ioInitiate ( this->ipToAEngine );
}
inline unsigned cac::getInitializingThreadsPriority () const
{
return this->initializingThreadsPriority;
}
-39
View File
@@ -1,39 +0,0 @@
/* $Id$
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
* Author: Jeff Hill
*/
inline claimMsgCache::claimMsgCache ( bool v44In ) :
pStr ( 0 ), clientId ( UINT_MAX ), serverId ( UINT_MAX ), currentStrLen ( 0u ),
bufLen ( 0u ), v44 ( v44In )
{
}
inline claimMsgCache::~claimMsgCache ()
{
if ( this->pStr ) {
delete this->pStr;
}
}
inline int claimMsgCache::deliverMsg ( netiiu &iiu )
{
if ( v44 ) {
return iiu.createChannelRequest ( this->clientId, this->pStr, this->currentStrLen );
}
else {
return iiu.createChannelRequest ( this->serverId, 0u, 0u );
}
}
inline void claimMsgCache::connectChannel ( cac & cacRef )
{
cacRef.connectChannel ( this->clientId );
}
+22 -9
View File
@@ -66,12 +66,14 @@ inline bool comBuf::copyInAllBytes ( const void *pBuf, unsigned nBytes )
inline unsigned comBuf::copyInBytes ( const void *pBuf, unsigned nBytes )
{
unsigned available = this->unoccupiedBytes ();
if ( nBytes > available ) {
nBytes = available;
if ( nBytes > 0u ) {
unsigned available = this->unoccupiedBytes ();
if ( nBytes > available ) {
nBytes = available;
}
memcpy ( &this->buf[this->nextWriteIndex], pBuf, nBytes);
this->nextWriteIndex += nBytes;
}
memcpy ( &this->buf[this->nextWriteIndex], pBuf, nBytes);
this->nextWriteIndex += nBytes;
return nBytes;
}
@@ -119,11 +121,22 @@ inline unsigned comBuf::maxBytes ()
return comBufSize;
}
inline bool comBuf::flushToWire ( class comQueSend &que, bool enablePreemptionDuringFlush )
inline void comBuf::compress ()
{
if ( this->nextReadIndex > 0u ) {
memmove ( this->buf, &this->buf[this->nextReadIndex],
this->nextWriteIndex - this->nextReadIndex );
this->nextWriteIndex -= this->nextReadIndex;
this->nextReadIndex = 0u;
}
}
inline bool comBuf::flushToWire ( wireSendAdapter &wire )
{
unsigned occupied = this->occupiedBytes ();
while ( occupied ) {
unsigned nBytes = que.sendBytes ( &this->buf[this->nextReadIndex], occupied, enablePreemptionDuringFlush );
unsigned nBytes = wire.sendBytes ( &this->buf[this->nextReadIndex],
occupied );
if ( nBytes == 0u ) {
this->nextReadIndex = this->nextWriteIndex;
return false;
@@ -134,9 +147,9 @@ inline bool comBuf::flushToWire ( class comQueSend &que, bool enablePreemptionDu
return true;
}
inline unsigned comBuf::fillFromWire ( class comQueRecv &que )
inline unsigned comBuf::fillFromWire ( wireRecvAdapter &wire )
{
unsigned nNewBytes = que.recvBytes ( &this->buf[this->nextWriteIndex],
unsigned nNewBytes = wire.recvBytes ( &this->buf[this->nextWriteIndex],
sizeof ( this->buf ) - this->nextWriteIndex );
this->nextWriteIndex += nNewBytes;
return nNewBytes;
+24 -39
View File
@@ -17,37 +17,43 @@
#include "iocinf.h"
#include "comBuf_IL.h"
comQueRecv::comQueRecv ()
{
}
comQueRecv::~comQueRecv ()
{
this->clear ();
}
void comQueRecv::clear ()
{
comBuf *pBuf;
this->mutex.lock ();
while ( ( pBuf = this->bufs.get () ) ) {
pBuf->destroy ();
}
this->mutex.unlock ();
}
unsigned comQueRecv::occupiedBytes () const
{
this->mutex.lock ();
unsigned count = this->bufs.count ();
unsigned nBytes;
if ( count >= 2u ) {
nBytes = this->bufs.first ()->occupiedBytes ();
nBytes += this->bufs.last ()->occupiedBytes ();
nBytes += ( count - 2u ) * comBuf::maxBytes ();
if ( count == 0u ) {
nBytes = 0u;
}
else if ( count == 1u ) {
nBytes = this->bufs.first ()->occupiedBytes ();
}
else {
nBytes = 0u;
// this requires the compress operation in
// copyIn ( comBuf & bufIn )
nBytes = this->bufs.first ()->occupiedBytes ();
nBytes += this->bufs.last ()->occupiedBytes ();
nBytes += ( count - 2u ) * comBuf::maxBytes ();
}
this->mutex.unlock ();
return nBytes;
}
@@ -55,11 +61,8 @@ bool comQueRecv::copyOutBytes ( void *pBuf, unsigned nBytes )
{
char *pCharBuf = static_cast < char * > ( pBuf );
this->mutex.lock ();
// dont return partial message
if ( nBytes > this->occupiedBytes () ) {
this->mutex.unlock ();
return false;
}
@@ -74,40 +77,22 @@ bool comQueRecv::copyOutBytes ( void *pBuf, unsigned nBytes )
}
}
this->mutex.unlock ();
return true;
}
unsigned comQueRecv::fillFromWire ()
void comQueRecv::pushLastComBufReceived ( comBuf & bufIn )
{
// this approach requires that only one thread performs fill
// but its advantage is that the lock is not held while filling
comBuf *pComBuf = new comBuf;
if ( ! pComBuf ) {
// no way to be informed when memory is available
threadSleep ( 0.5 );
return 0u;
}
unsigned nNewBytes = pComBuf->fillFromWire ( *this );
this->mutex.lock ();
comBuf *pLastBuf = this->bufs.last ();
if ( pLastBuf ) {
pLastBuf->copyIn ( *pComBuf );
pLastBuf->copyIn ( bufIn );
}
if ( pComBuf->occupiedBytes () ) {
this->bufs.add ( *pComBuf );
if ( bufIn.occupiedBytes () ) {
// move occupied bytes to the start of the buffer
bufIn.compress ();
this->bufs.add ( bufIn );
}
else {
pComBuf->destroy ();
bufIn.destroy ();
}
this->mutex.unlock ();
return nNewBytes;
}
+50 -647
View File
@@ -59,22 +59,13 @@
#include "iocinf.h"
#include "comBuf_IL.h"
#include "comQueSend_IL.h"
tsFreeList < class comBuf, 0x20 > comBuf::freeList;
// nill message pad bytes
static const char nillBytes [] =
{
0, 0, 0, 0,
0, 0, 0, 0
};
inline bufferReservoir::~bufferReservoir ()
{
comBuf *pBuf;
while ( ( pBuf = this->reservedBufs.get () ) ) {
pBuf->destroy ();
}
this->drain ();
}
inline comBuf *bufferReservoir::fetchOneBuffer ()
@@ -99,47 +90,49 @@ inline unsigned bufferReservoir::nBytes ()
return ( this->reservedBufs.count () * comBuf::maxBytes () );
}
comQueSend::comQueSend ( wireSendAdapter & wireIn ) :
wire ( wireIn ), nBytesPending ( 0u )
{
}
comQueSend::~comQueSend ()
{
this->clear ();
}
void comQueSend::clear ()
{
comBuf *pBuf;
while ( ( pBuf = this->bufs.get () ) ) {
this->nBytesPending -= pBuf->occupiedBytes ();
pBuf->destroy ();
}
this->reservoir.drain ();
}
// reserve sufficent space for entire message
// (this allows the recv thread to add a message
// to the que while some other thread is flushing
// and therefore prevents deadlocks, and it also
// allows proper status to be returned)
inline int comQueSend::lockAndReserveSpace ( unsigned msgSize,
bufferReservoir &reservoir, bool enablePreemptionDuringFlush )
int comQueSend::reserveSpace ( unsigned msgSize )
{
unsigned bytesReserved = reservoir.nBytes ();
unsigned bytesReserved;
this->mutex.lock ();
bytesReserved = this->reservoir.nBytes ();
while ( true ) {
comBuf *pComBuf = this->bufs.last ();
if ( pComBuf ) {
bytesReserved += pComBuf->unoccupiedBytes ();
}
if ( bytesReserved >= msgSize || this->bufs.count () < 4 ) {
break;
}
if ( ! this->flushToWirePermit () ) {
if ( this->bufs.count () >= 32u ) {
this->mutex.unlock ();
return ECA_TOLARGE;
}
break;
}
this->mutex.unlock ();
this->flushToWire ( enablePreemptionDuringFlush );
this->mutex.lock ();
comBuf *pComBuf = this->bufs.last ();
if ( pComBuf ) {
bytesReserved += pComBuf->unoccupiedBytes ();
}
while ( bytesReserved < msgSize ) {
if ( reservoir.addOneBuffer() ) {
if ( reservoir.addOneBuffer () ) {
bytesReserved += comBuf::maxBytes ();
}
else {
this->mutex.unlock ();
return ECA_ALLOCMEM;
}
}
@@ -147,100 +140,43 @@ inline int comQueSend::lockAndReserveSpace ( unsigned msgSize,
return ECA_NORMAL;
}
// 1) This routine is private because it assumes that the lock
// is applied
//
// 2) This routine does not return status because of the following
// argument. The routine can fail because the wire disconnects or
// because their isnt memory to create a buffer. For the former we
// just discard the message, but do not fail. For the latter we
// shutdown() the connection and discard the rest of the message
// (this eliminates the possibility of message fragments getting
// onto the wire).
//
// 3) Arguments here are a bit verbose until compilers all implement
// member template functions.
//
template < class T >
inline void comQueSend_copyIn ( tsDLList < comBuf > &comBufList,
bufferReservoir &reservoir, const T *pVal, unsigned nElem )
void comQueSend::copy_dbr_string ( const void *pValue, unsigned nElem )
{
unsigned nCopied;
comBuf *pComBuf = comBufList.last ();
if ( pComBuf ) {
nCopied = pComBuf->copyIn ( pVal, nElem );
}
else {
nCopied = 0u;
}
while ( nElem > nCopied ) {
pComBuf = reservoir.fetchOneBuffer ();
//
// This fails only if space was not preallocated.
// See comments at the top of this program on
// why space must always be preallocated.
//
assert ( pComBuf );
nCopied += pComBuf->copyIn ( &pVal[nCopied], nElem - nCopied );
comBufList.add ( *pComBuf );
}
comQueSend_copyIn ( this->nBytesPending, this->bufs, this->reservoir,
static_cast <const dbr_string_t *> ( pValue ), nElem );
}
template < class T >
inline void comQueSend_copyIn ( tsDLList < comBuf > &comBufList, bufferReservoir &reservoir, const T &val )
void comQueSend::copy_dbr_short ( const void *pValue, unsigned nElem )
{
comBuf *pComBuf = comBufList.last ();
if ( pComBuf ) {
if ( pComBuf->copyIn ( &val, 1u ) >= 1u ) {
return;
}
}
pComBuf = reservoir.fetchOneBuffer ();
//
// This fails only if space was not preallocated.
// See comments at the top of this program on
// space must always be preallocated.
//
assert ( pComBuf );
pComBuf->copyIn ( &val, 1u );
comBufList.add ( *pComBuf );
comQueSend_copyIn ( this->nBytesPending, this->bufs, this->reservoir,
static_cast <const dbr_short_t *> ( pValue ), nElem );
}
void comQueSend::copy_dbr_string ( bufferReservoir &reservoir, const void *pValue, unsigned nElem )
void comQueSend::copy_dbr_float ( const void *pValue, unsigned nElem )
{
comQueSend_copyIn ( this->bufs, reservoir, static_cast <const dbr_string_t *> ( pValue ), nElem );
comQueSend_copyIn ( this->nBytesPending, this->bufs, this->reservoir,
static_cast <const dbr_float_t *> ( pValue ), nElem );
}
void comQueSend::copy_dbr_short ( bufferReservoir &reservoir, const void *pValue, unsigned nElem )
void comQueSend::copy_dbr_char ( const void *pValue, unsigned nElem )
{
comQueSend_copyIn ( this->bufs, reservoir, static_cast <const dbr_short_t *> ( pValue ), nElem );
comQueSend_copyIn ( this->nBytesPending, this->bufs, this->reservoir,
static_cast <const dbr_char_t *> ( pValue ), nElem );
}
void comQueSend::copy_dbr_float ( bufferReservoir &reservoir, const void *pValue, unsigned nElem )
void comQueSend::copy_dbr_long ( const void *pValue, unsigned nElem )
{
comQueSend_copyIn ( this->bufs, reservoir, static_cast <const dbr_float_t *> ( pValue ), nElem );
comQueSend_copyIn ( this->nBytesPending, this->bufs, this->reservoir,
static_cast <const dbr_long_t *> ( pValue ), nElem );
}
void comQueSend::copy_dbr_char ( bufferReservoir &reservoir, const void *pValue, unsigned nElem )
void comQueSend::copy_dbr_double ( const void *pValue, unsigned nElem )
{
comQueSend_copyIn ( this->bufs, reservoir, static_cast <const dbr_char_t *> ( pValue ), nElem );
comQueSend_copyIn ( this->nBytesPending, this->bufs, this->reservoir,
static_cast <const dbr_double_t *> ( pValue ), nElem );
}
void comQueSend::copy_dbr_long ( bufferReservoir &reservoir, const void *pValue, unsigned nElem )
{
comQueSend_copyIn ( this->bufs, reservoir, static_cast <const dbr_long_t *> ( pValue ), nElem );
}
void comQueSend::copy_dbr_double ( bufferReservoir &reservoir, const void *pValue, unsigned nElem )
{
comQueSend_copyIn ( this->bufs, reservoir, static_cast <const dbr_double_t *> ( pValue ), nElem );
}
const comQueSend::copyFunc_t comQueSend::dbrCopyVector [] = {
const comQueSend::copyFunc_t comQueSend::dbrCopyVector [39] = {
&comQueSend::copy_dbr_string,
&comQueSend::copy_dbr_short,
&comQueSend::copy_dbr_float,
@@ -282,536 +218,3 @@ const comQueSend::copyFunc_t comQueSend::dbrCopyVector [] = {
0 // DBR_CLASS_NAME
};
comQueSend::~comQueSend ()
{
comBuf *pBuf;
this->mutex.lock ();
while ( ( pBuf = this->bufs.get () ) ) {
pBuf->destroy ();
}
this->mutex.unlock ();
}
unsigned comQueSend::occupiedBytes () const
{
this->mutex.lock ();
unsigned count = this->bufs.count ();
unsigned nBytes;
if ( count >= 2u ) {
nBytes = this->bufs.first ()->occupiedBytes ();
nBytes += this->bufs.last ()->occupiedBytes ();
nBytes += ( count - 2u ) * comBuf::maxBytes ();
}
else if ( count == 1u ) {
nBytes = this->bufs.first ()->occupiedBytes ();
}
else {
nBytes = 0u;
}
this->mutex.unlock ();
return nBytes;
}
bool comQueSend::flushToWire ( bool enablePreemptionDuringFlush )
{
bool success = false;
// the recv thread is not permitted to flush as this
// can result in a push / pull deadlock on the TCP pipe,
// but in that case this does schedual the flush through
// the higher priority send thread
if ( ! this->flushToWirePermit () ) {
return true;
}
// this approach requires that only one thread at a time
// performs flushes but its advantage is that the primary
// lock is not held while sending and this prevents deadlocks
this->flushMutex.lock ();
while ( true ) {
this->mutex.lock ();
comBuf * pBuf = this->bufs.get ();
this->mutex.unlock ();
if ( ! pBuf ) {
success = true;
break;
}
success = pBuf->flushToWire ( *this, enablePreemptionDuringFlush );
pBuf->destroy ();
if ( ! success ) {
this->mutex.lock ();
while ( ( pBuf = this->bufs.get () ) ) {
pBuf->destroy ();
}
this->mutex.unlock ();
break;
}
}
this->flushMutex.unlock ();
return success;
}
int comQueSend::writeRequest ( unsigned serverId, unsigned type, unsigned nElem, const void *pValue )
{
bufferReservoir reservoir;
unsigned size, postcnt;
bool stringOptim;
if ( type < 0 || type >= NELEMENTS ( this->dbrCopyVector ) ) {
return ECA_BADTYPE;
}
if ( ! this->dbrCopyVector [type] ) {
return ECA_BADTYPE;
}
if ( nElem > 0xffff) {
return ECA_BADCOUNT;
}
if ( type == DBR_STRING && nElem == 1 ) {
char *pstr = (char *) pValue;
size = strlen ( pstr ) +1;
stringOptim = true;
}
else {
size = dbr_size_n ( type, nElem );
stringOptim = false;
}
postcnt = CA_MESSAGE_ALIGN ( size );
if ( postcnt > 0xffff ) {
return ECA_BADCOUNT;
}
assert ( serverId <= 0xffffffff );
int status = this->lockAndReserveSpace ( postcnt + 16u, reservoir, true );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_WRITE ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( postcnt ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( type ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( nElem ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( serverId ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( ~0UL ) ); // available
if ( stringOptim ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <const unsigned char *> ( pValue ), size );
}
else {
( this->*dbrCopyVector [type] ) ( reservoir, pValue, nElem );
}
comQueSend_copyIn ( this->bufs, reservoir, nillBytes, postcnt - size );
this->mutex.unlock ();
}
return status;
}
int comQueSend::writeNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, const void *pValue )
{
bufferReservoir reservoir;
ca_uint32_t size, postcnt;
if ( type < 0 || type >= NELEMENTS ( this->dbrCopyVector ) ) {
return ECA_BADTYPE;
}
if ( ! this->dbrCopyVector [type] ) {
return ECA_BADTYPE;
}
if ( nElem > 0xffff) {
return ECA_BADCOUNT;
}
if ( type == DBR_STRING && nElem == 1 ) {
char *pstr = (char *) pValue;
size = strlen ( pstr ) +1;
}
else {
size = dbr_size_n ( type, nElem );
}
postcnt = CA_MESSAGE_ALIGN ( size );
if ( postcnt > 0xffff ) {
return ECA_BADCOUNT;
}
assert ( serverId <= 0xffffffff );
int status = this->lockAndReserveSpace ( postcnt + 16u, reservoir, true );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_WRITE_NOTIFY ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( postcnt ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( type ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( nElem ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( serverId ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( ioId ) ); // available
( this->*dbrCopyVector [type] ) ( reservoir, pValue, nElem );
comQueSend_copyIn ( this->bufs, reservoir, nillBytes, postcnt - size );
this->mutex.unlock ();
}
return status;
}
int comQueSend::readCopyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem )
{
bufferReservoir reservoir;
if ( nElem > 0xffff) {
return ECA_BADCOUNT;
}
if ( type > 0xffff) {
return ECA_BADTYPE;
}
assert ( serverId <= 0xffffffff );
int status = this->lockAndReserveSpace ( 16u, reservoir, true );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_READ ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( type ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( nElem ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( serverId ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( ioId ) ); // available
this->mutex.unlock ();
}
return status;
}
int comQueSend::readNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem )
{
bufferReservoir reservoir;
if ( nElem > 0xffff) {
return ECA_BADCOUNT;
}
if ( type > 0xffff) {
return ECA_BADTYPE;
}
assert ( serverId <= 0xffffffff );
int status = this->lockAndReserveSpace ( 16u, reservoir, true );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_READ_NOTIFY ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( type ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( nElem ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( serverId ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( ioId ) ); // available
this->mutex.unlock ();
}
return status;
}
int comQueSend::createChannelRequest ( unsigned id, const char *pName, unsigned nameLength )
{
bufferReservoir reservoir;
unsigned postCnt = CA_MESSAGE_ALIGN ( nameLength );
assert ( id <= 0xffffffff );
assert ( postCnt <= 0xffff );
int status = this->lockAndReserveSpace ( postCnt + 16u, reservoir, false );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_CLAIM_CIU ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( postCnt ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( id ) ); // cid
//
// The available field is used (abused)
// here to communicate the minor version number
// starting with CA 4.1.
//
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( CA_MINOR_VERSION ) ); // available
if ( nameLength ) {
comQueSend_copyIn ( this->bufs, reservoir, pName, nameLength );
}
if ( postCnt > nameLength ) {
comQueSend_copyIn ( this->bufs, reservoir, nillBytes, postCnt - nameLength );
}
this->mutex.unlock ();
}
return status;
}
int comQueSend::clearChannelRequest ( unsigned clientId, unsigned serverId )
{
bufferReservoir reservoir;
assert ( serverId <= 0xffffffff );
assert ( clientId <= 0xffffffff );
int status = this->lockAndReserveSpace ( 16u, reservoir, true );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_CLEAR_CHANNEL ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( serverId ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( clientId ) ); // available
this->mutex.unlock ();
}
return status;
}
int comQueSend::subscriptionRequest ( unsigned ioId, unsigned serverId,
unsigned type, unsigned nElem, unsigned mask, bool enablePreemptionDuringFlush )
{
bufferReservoir reservoir;
if ( nElem > 0xffff) {
return ECA_BADCOUNT;
}
if ( type > 0xffff) {
return ECA_BADTYPE;
}
assert ( serverId <= 0xffffffff );
assert ( ioId <= 0xffffffff );
int status = this->lockAndReserveSpace ( 32u, reservoir, enablePreemptionDuringFlush );
if ( status == ECA_NORMAL ) {
// header
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_EVENT_ADD ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 16u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( type ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( nElem ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( serverId ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( ioId ) ); // available
// extension
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_float32_t > ( 0.0 ) ); // m_lval
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_float32_t > ( 0.0 ) ); // m_hval
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_float32_t > ( 0.0 ) ); // m_toval
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( mask ) ); // m_mask
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( 0u ) ); // m_pad
this->mutex.unlock ();
}
return status;
}
int comQueSend::subscriptionCancelRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem )
{
bufferReservoir reservoir;
assert ( type <= 0xffff );
assert ( nElem <= 0xffff );
assert ( serverId <= 0xffffffff );
assert ( ioId <= 0xffffffff );
int status = this->lockAndReserveSpace ( 16u, reservoir, true );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( CA_PROTO_EVENT_CANCEL ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( 0u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( type ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint16_t > ( nElem ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint32_t > ( serverId ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast < ca_uint32_t > ( ioId ) ); // available
this->mutex.unlock ();
}
return status;
}
int comQueSend::disableFlowControlRequest ()
{
bufferReservoir reservoir;
int status = this->lockAndReserveSpace ( 16u, reservoir, false );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_EVENTS_ON ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // available
this->mutex.unlock ();
}
return status;
}
int comQueSend::enableFlowControlRequest ()
{
bufferReservoir reservoir;
int status = this->lockAndReserveSpace ( 16u, reservoir, false );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_EVENTS_OFF ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // available
this->mutex.unlock ();
}
return status;
}
int comQueSend::noopRequest ()
{
bufferReservoir reservoir;
int status = this->lockAndReserveSpace ( 16u, reservoir, false );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_NOOP ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // available
this->mutex.unlock ();
}
return status;
}
int comQueSend::echoRequest ()
{
bufferReservoir reservoir;
int status = this->lockAndReserveSpace ( 16u, reservoir, false );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_ECHO ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // available
this->mutex.unlock ();
}
return status;
}
int comQueSend::hostNameSetRequest ( const char *pName )
{
bufferReservoir reservoir;
unsigned size = strlen ( pName ) + 1u;
unsigned postSize = CA_MESSAGE_ALIGN ( size );
assert ( postSize < 0xffff );
int status = this->lockAndReserveSpace ( postSize + 16u, reservoir, false );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_HOST_NAME ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( postSize ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // available
comQueSend_copyIn ( this->bufs, reservoir, pName, size );
comQueSend_copyIn ( this->bufs, reservoir, nillBytes, postSize - size );
this->mutex.unlock ();
}
return status;
}
int comQueSend::userNameSetRequest ( const char *pName )
{
bufferReservoir reservoir;
unsigned size = strlen ( pName ) + 1u;
unsigned postSize = CA_MESSAGE_ALIGN ( size );
assert ( postSize < 0xffff );
int status = this->lockAndReserveSpace ( postSize + 16u, reservoir, false );
if ( status == ECA_NORMAL ) {
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( CA_PROTO_CLIENT_NAME ) ); // cmd
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( postSize ) ); // postsize
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // dataType
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint16_t> ( 0u ) ); // count
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // cid
comQueSend_copyIn ( this->bufs, reservoir, static_cast <ca_uint32_t> ( 0u ) ); // available
comQueSend_copyIn ( this->bufs, reservoir, pName, size );
comQueSend_copyIn ( this->bufs, reservoir, nillBytes, postSize - size );
this->mutex.unlock ();
}
return status;
}
#ifdef JUNKYARD
/*
* tcpiiu::pushStreamMsg ()
*/
int tcpiiu::pushStreamMsg ( const caHdr &hdr, const void *pext, unsigned extsize )
{
unsigned alignedExtSize;
bool status;
msgDescriptor msgs[3];
caHdr msgHdr = hdr;
if ( extsize > 0xffff - 7 ) {
return ECA_TOLARGE;
}
alignedExtSize = CA_MESSAGE_ALIGN ( extsize );
msgHdr.m_postsize = htons ( alignedExtSize );
debugPrintf ( (
"CAC: Request => cmmd=%x cid=0x%x type=%x count=%x postsize=%x\n",
hdr.m_cmmd, hdr.m_cid, hdr.m_dataType,
hdr.m_count, hdr.m_postsize ) );
msgs[0].pBuf = &msgHdr;
msgs[0].nBytes = sizeof ( msgHdr );
msgs[1].pBuf = pext;
msgs[1].nBytes = extsize;
if ( alignedExtSize > extsize ) {
unsigned diff = alignedExtSize - extsize;
assert ( diff <= sizeof ( nullBuff ) );
msgs[2].pBuf = nullBuff;
msgs[2].nBytes = diff;
status = this->copyInBytes ( msgs, 3u );
}
else {
status = this->copyInBytes ( msgs, 2u );
}
if ( status ) {
return ECA_NORMAL;
}
else {
this->shutdown ();
return ECA_ALLOCMEM;
}
}
/*
* tcpiiu::pushStreamMsg ()
*/
int tcpiiu::pushStreamMsg ( const caHdr &hdr )
{
caHdr msgHdr = hdr;
msgHdr.m_postsize = htons ( 0 );
debugPrintf ( (
"CAC: Request => cmmd=%x cid=0x%x type=%x count=%x postsize=%x\n",
hdr.m_cmmd, hdr.m_cid, hdr.m_dataType,
hdr.m_count, hdr.m_postsize ) );
bool status = this->copyIn ( msgHdr );
if ( status ) {
return ECA_NORMAL;
}
else {
this->shutdown ();
return ECA_ALLOCMEM;
}
}
#endif
+142
View File
@@ -18,5 +18,147 @@
#ifndef comQueSend_ILh
#define comQueSend_ILh
inline void bufferReservoir::drain ()
{
comBuf *pBuf;
while ( ( pBuf = this->reservedBufs.get () ) ) {
pBuf->destroy ();
}
}
inline bool comQueSend::dbr_type_ok ( unsigned type )
{
if ( type >= ( sizeof ( this->dbrCopyVector ) / sizeof ( this->dbrCopyVector[0] ) ) ) {
return false;
}
if ( ! this->dbrCopyVector [type] ) {
return false;
}
return true;
}
//
// 1) This routine does not return status because of the following
// argument. The routine can fail because the wire disconnects or
// because their isnt memory to create a buffer. For the former we
// just discard the message, but do not fail. For the latter we
// shutdown() the connection and discard the rest of the message
// (this eliminates the possibility of message fragments getting
// onto the wire).
//
// 2) Arguments here are a bit verbose until compilers all implement
// member template functions.
//
template < class T >
inline void comQueSend_copyIn ( unsigned &nBytesPending,
tsDLList < comBuf > &comBufList, bufferReservoir &reservoir,
const T *pVal, unsigned nElem )
{
unsigned nCopied;
nBytesPending += sizeof ( T ) * nElem;
comBuf *pComBuf = comBufList.last ();
if ( pComBuf ) {
nCopied = pComBuf->copyIn ( pVal, nElem );
}
else {
nCopied = 0u;
}
while ( nElem > nCopied ) {
pComBuf = reservoir.fetchOneBuffer ();
//
// This fails only if space was not preallocated.
// See comments at the top of this program on
// why space must always be preallocated.
//
assert ( pComBuf );
nCopied += pComBuf->copyIn ( &pVal[nCopied], nElem - nCopied );
comBufList.add ( *pComBuf );
}
}
template < class T >
inline void comQueSend_copyIn ( unsigned &nBytesPending,
tsDLList < comBuf > &comBufList, bufferReservoir &reservoir,
const T &val )
{
nBytesPending += sizeof ( T );
comBuf *pComBuf = comBufList.last ();
if ( pComBuf ) {
if ( pComBuf->copyIn ( &val, 1u ) >= 1u ) {
return;
}
}
pComBuf = reservoir.fetchOneBuffer ();
//
// This fails only if space was not preallocated.
// See comments at the top of this program on
// space must always be preallocated.
//
assert ( pComBuf );
pComBuf->copyIn ( &val, 1u );
comBufList.add ( *pComBuf );
}
inline void comQueSend::pushUInt16 ( const ca_uint16_t value )
{
comQueSend_copyIn ( this->nBytesPending,
this->bufs, this->reservoir, value );
}
inline void comQueSend::pushUInt32 ( const ca_uint32_t value )
{
comQueSend_copyIn ( this->nBytesPending,
this->bufs, this->reservoir, value );
}
inline void comQueSend::pushFloat32 ( const ca_float32_t value )
{
comQueSend_copyIn ( this->nBytesPending,
this->bufs, this->reservoir, value );
}
inline void comQueSend::pushString ( const char *pVal, unsigned nElem )
{
comQueSend_copyIn ( this->nBytesPending,
this->bufs, this->reservoir, pVal, nElem );
}
// it is assumed that dbr_type_ok() was called prior to calling this routine
// to check the type code
inline void comQueSend::push_dbr_type ( unsigned type, const void *pVal, unsigned nElem )
{
( this->*dbrCopyVector [type] ) ( pVal, nElem );
}
inline unsigned comQueSend::occupiedBytes () const
{
return this->nBytesPending;
}
inline bool comQueSend::flushThreshold ( unsigned nBytesThisMsg ) const
{
return ( this->nBytesPending + nBytesThisMsg > 4 * comBuf::maxBytes () );
}
inline comBuf * comQueSend::popNextComBufToSend ()
{
comBuf *pBuf = this->bufs.get ();
if ( pBuf ) {
unsigned nBytesThisBuf = pBuf->occupiedBytes ();
assert ( this->nBytesPending >= nBytesThisBuf );
this->nBytesPending -= pBuf->occupiedBytes ();
}
else {
assert ( this->nBytesPending == 0u );
}
return pBuf;
}
#endif // comQueSend_ILh
+24 -16
View File
@@ -16,42 +16,50 @@
#include "iocinf.h"
tsFreeList < hostNameCache, 16 > hostNameCache::freeList;
hostNameCache::hostNameCache ( const osiSockAddr &addr, ipAddrToAsciiEngine &engine ) :
ipAddrToAsciiAsynchronous ( addr ),
pHostName ( 0u )
ioComplete ( false )
{
this->ioInitiate ( engine );
}
void hostNameCache::destroy ()
{
delete this;
}
hostNameCache::~hostNameCache ()
{
if ( this->pHostName ) {
delete [] this->pHostName;
}
}
void hostNameCache::ioCompletionNotify ( const char *pHostNameIn )
{
if ( ! this->pHostName ) {
unsigned size = strlen ( pHostNameIn ) + 1u;
char *pTmp = new char [size];
if ( ! pTmp ) {
// we fail over to using the IP address for the name
return;
}
strcpy ( pTmp, pHostNameIn );
this->pHostName = pTmp;
if ( ! this->ioComplete ) {
strncpy ( this->hostNameBuf, pHostNameIn, sizeof ( this->hostNameBuf ) );
this->hostNameBuf[ sizeof ( this->hostNameBuf ) - 1 ] = '\0';
}
}
void hostNameCache::hostName ( char *pBuf, unsigned bufSize ) const
{
if ( this->pHostName ) {
strncpy ( pBuf, this->pHostName, bufSize);
if ( this->ioComplete ) {
strncpy ( pBuf, this->hostNameBuf, bufSize);
pBuf [ bufSize - 1u ] = '\0';
}
else {
osiSockAddr tmpAddr = this->address ();
sockAddrToDottedIP ( &tmpAddr.sa, pBuf, bufSize );
}
pBuf [ bufSize - 1u ] = '\0';
}
void * hostNameCache::operator new ( size_t size )
{
return hostNameCache::freeList.allocate ( size );
}
void hostNameCache::operator delete ( void *pCadaver, size_t size )
{
hostNameCache::freeList.release ( pCadaver, size );
}
+266 -243
View File
@@ -108,12 +108,25 @@
static const unsigned comBufSize = 0x4000;
class wireSendAdapter {
public:
virtual unsigned sendBytes ( const void *pBuf,
unsigned nBytesInBuf ) = 0;
};
class wireRecvAdapter {
public:
virtual unsigned recvBytes ( void *pBuf,
unsigned nBytesInBuf ) = 0;
};
class comBuf : public tsDLNode < comBuf > {
public:
comBuf ();
void destroy ();
unsigned unoccupiedBytes () const;
unsigned occupiedBytes () const;
void compress ();
static unsigned maxBytes ();
unsigned copyInBytes ( const void *pBuf, unsigned nBytes );
unsigned copyIn ( comBuf & );
@@ -132,8 +145,8 @@ public:
unsigned removeBytes ( unsigned nBytes );
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
bool flushToWire ( class comQueSend &, bool enablePreemptionDuringFlush );
unsigned fillFromWire ( class comQueRecv & );
bool flushToWire ( wireSendAdapter & );
unsigned fillFromWire ( wireRecvAdapter & );
private:
static tsFreeList < class comBuf, 0x20 > freeList;
@@ -150,76 +163,64 @@ struct msgDescriptor {
unsigned nBytes;
};
template < class T >
void comQueSend_copyIn ( comQueSend &que, const T *pVal, unsigned nElem );
template < class T >
void comQueSend_copyIn ( comQueSend &que, const T &val );
class bufferReservoir {
public:
~bufferReservoir ();
bool addOneBuffer ();
comBuf *fetchOneBuffer ();
unsigned nBytes ();
void drain ();
private:
tsDLList < comBuf > reservedBufs;
};
class comQueSend {
public:
virtual ~comQueSend ();
comQueSend ( wireSendAdapter & );
~comQueSend ();
void clear ();
int reserveSpace ( unsigned msgSize );
unsigned occupiedBytes () const;
bool flushToWire ( bool enablePreemptionDuringFlush );
int writeRequest ( unsigned serverId, unsigned type, unsigned nElem, const void *pValue );
int writeNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, const void *pValue );
int readCopyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
int readNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
int createChannelRequest ( unsigned clientId, const char *pName, unsigned nameLength );
int clearChannelRequest ( unsigned clientId, unsigned serverId );
int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type,
unsigned nElem, unsigned mask, bool enablePreemptionDuringFlush );
int subscriptionCancelRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
int disableFlowControlRequest ();
int enableFlowControlRequest ();
int echoRequest ();
int noopRequest ();
int hostNameSetRequest ( const char *pName );
int userNameSetRequest ( const char *pName );
bool flushThreshold ( unsigned nBytesThisMsg ) const;
bool dbr_type_ok ( unsigned type );
virtual unsigned sendBytes ( const void *pBuf,
unsigned nBytesInBuf, bool enablePreemptionDuringFlush ) = 0;
void pushUInt16 ( const ca_uint16_t value );
void pushUInt32 ( const ca_uint32_t value );
void pushFloat32 ( const ca_float32_t value );
void pushString ( const char *pVal, unsigned nElem );
void push_dbr_type ( unsigned type, const void *pVal, unsigned nElem );
comBuf * popNextComBufToSend ();
private:
int lockAndReserveSpace ( unsigned msgSize, bufferReservoir &, bool enablePreemptionDuringFlush );
virtual bool flushToWirePermit () = 0;
void copy_dbr_string ( bufferReservoir &, const void *pValue, unsigned nElem );
void copy_dbr_short ( bufferReservoir &, const void *pValue, unsigned nElem );
void copy_dbr_float ( bufferReservoir &, const void *pValue, unsigned nElem );
void copy_dbr_char ( bufferReservoir &, const void *pValue, unsigned nElem );
void copy_dbr_long ( bufferReservoir &, const void *pValue, unsigned nElem );
void copy_dbr_double ( bufferReservoir &, const void *pValue, unsigned nElem );
void copy_dbr_string ( const void *pValue, unsigned nElem );
void copy_dbr_short ( const void *pValue, unsigned nElem );
void copy_dbr_float ( const void *pValue, unsigned nElem );
void copy_dbr_char ( const void *pValue, unsigned nElem );
void copy_dbr_long ( const void *pValue, unsigned nElem );
void copy_dbr_double ( const void *pValue, unsigned nElem );
wireSendAdapter & wire;
tsDLList < comBuf > bufs;
osiMutex mutex;
osiMutex flushMutex; // only one thread flushes at a time
bufferReservoir reservoir;
unsigned nBytesPending;
typedef void ( comQueSend::*copyFunc_t ) ( bufferReservoir &, const void *pValue, unsigned nElem );
static const copyFunc_t dbrCopyVector [];
typedef void ( comQueSend::*copyFunc_t ) (
const void *pValue, unsigned nElem );
static const copyFunc_t dbrCopyVector [39];
};
class comQueRecv {
public:
virtual ~comQueRecv ();
comQueRecv ();
~comQueRecv ();
unsigned occupiedBytes () const;
bool copyOutBytes ( void *pBuf, unsigned nBytes );
unsigned fillFromWire ();
virtual unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf ) = 0;
void pushLastComBufReceived ( comBuf & );
void clear ();
private:
tsDLList < comBuf > bufs;
osiMutex mutex;
};
class caClient {
@@ -229,7 +230,6 @@ public:
virtual void exceptionNotify (int status, const char *pContext,
unsigned type, unsigned long count,
const char *pFileName, unsigned lineNo) = 0;
private:
};
class netiiu;
@@ -242,37 +242,40 @@ private:
friend class nciu;
};
class cac;
class tcpiiu;
class baseNMIU;
class cacPrivateListOfIO {
public:
cacPrivateListOfIO ( cac & );
void destroyAllIO ();
void subscribeAllIO ();
void disconnectAllIO ( const char *pHostName );
void addIO ( baseNMIU &io );
void removeIO ( baseNMIU &io );
protected:
cac &cacCtx;
//
// fields in class nciu which really belong to tcpiiu
//
class tcpiiuPrivateListOfIO {
private:
friend tcpiiu;
tsDLList < class baseNMIU > eventq;
};
class nciu : public cacChannelIO, public tsDLNode < nciu >,
public chronIntIdRes < nciu >, private cacPrivateListOfIO {
public chronIntIdRes < nciu >, public tcpiiuPrivateListOfIO {
public:
nciu ( class cac &cac, cacChannel &chan, const char *pNameIn );
nciu ( class cac &, netiiu &,
cacChannel &, const char *pNameIn );
void destroy ();
void connect ( unsigned nativeType, unsigned long nativeCount, unsigned sid );
void cacDestroy ();
void connect ( unsigned nativeType,
unsigned long nativeCount, unsigned sid );
void connect ();
void disconnect ();
void searchReplySetUp ( unsigned sid, unsigned typeCode, unsigned long count );
int read ( unsigned type, unsigned long count, void *pValue );
int read ( unsigned type, unsigned long count, cacNotify &notify );
int write ( unsigned type, unsigned long count, const void *pValue );
int write ( unsigned type, unsigned long count, const void *pValue, cacNotify & );
int subscribe ( unsigned type, unsigned long count, unsigned mask, cacNotify &notify );
void disconnect ( netiiu &newiiu );
int createChannelRequest ();
int read ( unsigned type,
unsigned long count, void *pValue );
int read ( unsigned type,
unsigned long count, cacNotify &notify );
int write ( unsigned type,
unsigned long count, const void *pValue );
int write ( unsigned type,
unsigned long count, const void *pValue, cacNotify & );
int subscribe ( unsigned type,
unsigned long count, unsigned mask, cacNotify &notify );
void hostName ( char *pBuf, unsigned bufLength ) const;
bool ca_v42_ok () const;
short nativeType () const;
@@ -280,45 +283,44 @@ public:
channel_state state () const;
caar accessRights () const;
const char *pName () const;
unsigned nameLen () const;
unsigned searchAttempts () const;
bool connected () const;
bool claimSent () const;
unsigned readSequence () const;
void incrementOutstandingIO ();
void decrementOutstandingIO ();
void decrementOutstandingIO ( unsigned seqNumber );
bool searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel );
void subscriptionCancelMsg ( ca_uint32_t clientId );
bool searchMsg ( unsigned short retrySeqNumber,
unsigned &retryNoForThisChannel );
void subscriptionCancelMsg ( class netSubscription &subsc );
bool fullyConstructed () const;
bool connectionInProgress ( const osiSockAddr & );
bool isAttachedToVirtaulCircuit ( const osiSockAddr & );
bool identifierEquivelence ( unsigned idToMatch );
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
int subscriptionMsg ( unsigned subscriptionId,
unsigned typeIn, unsigned long countIn, unsigned short maskIn,
bool enablePreemptionDuringFlush );
int subscriptionMsg ( netSubscription &, bool userThread );
void resetRetryCount ();
unsigned getRetrySeqNo () const;
void accessRightsStateChange ( const caar &arIn );
unsigned getSID () const;
void attachChanToIIU ( netiiu &iiu );
void detachChanFromIIU ();
void ioInstall ( class baseNMIU & );
void ioUninstall ( class baseNMIU & );
bool setClaimMsgCache ( class claimMsgCache & );
ca_uint32_t getSID () const;
ca_uint32_t getCID () const;
netiiu * getPIIU ();
void searchReplySetUp ( netiiu &iiu, unsigned sidIn,
unsigned typeIn, unsigned long countIn );
void show ( unsigned level ) const;
bool verifyIIU ( netiiu & );
bool verifyConnected ( netiiu & );
private:
cac &cacCtx;
caar ar; // access rights
unsigned count;
char *pNameStr;
netiiu *piiu;
unsigned sid; // server id
ca_uint32_t sid; // server id
unsigned retry; // search retry number
mutable unsigned short ptrLockCount; // number of times IIU pointer was locked
mutable unsigned short ptrUnlockWaitCount; // number of threads waiting for IIU pointer unlock
unsigned short retrySeqNo; // search retry seq number
unsigned short nameLength; // channel name length
unsigned short typeCode;
@@ -332,17 +334,17 @@ private:
~nciu (); // force pool allocation
void lock () const;
void unlock () const;
void lockPIIU () const;
void unlockPIIU () const;
void lockOutstandingIO () const;
void unlockOutstandingIO () const;
const char * pHostName () const; // deprecated - please do not use
};
class baseNMIU : public tsDLNode < baseNMIU >, public chronIntIdRes < baseNMIU > {
class baseNMIU : public tsDLNode < baseNMIU >,
public chronIntIdRes < baseNMIU > {
public:
baseNMIU ( nciu &chan );
class cacChannelIO & channelIO () const;
virtual ~baseNMIU () = 0;
ca_uint32_t getID () const;
virtual void completionNotify () = 0;
virtual void completionNotify ( unsigned type, unsigned long count, const void *pData ) = 0;
virtual void exceptionNotify ( int status, const char *pContext ) = 0;
@@ -350,98 +352,94 @@ public:
virtual void disconnect ( const char *pHostName ) = 0;
virtual void show ( unsigned level ) const;
virtual int subscriptionMsg ();
virtual void subscriptionCancelMsg ();
nciu & channel ();
void destroy ();
void uninstallFromChannel ();
protected:
virtual ~baseNMIU (); // must be allocated from pool
nciu &chan;
bool attachedToChannel;
static osiMutex mutex;
};
class netSubscription : private cacNotifyIO, private baseNMIU {
class netSubscription : private cacNotifyIO, public baseNMIU {
public:
netSubscription ( nciu &chan, unsigned type, unsigned long count,
unsigned mask, cacNotify &notify );
void disconnect ( const char *pHostName );
static bool factory ( nciu &chan, chtype type, unsigned long count,
unsigned short mask, cacNotify &notify, unsigned &id );
void show ( unsigned level ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
void destroy ();
unsigned long getCount ();
unsigned getType ();
unsigned getMask ();
private:
chtype type;
unsigned long count;
unsigned short mask;
unsigned type;
unsigned mask;
netSubscription ( nciu &chan, chtype type, unsigned long count,
unsigned short mask, cacNotify &notify );
~netSubscription ();
void completionNotify ();
void completionNotify ( unsigned type, unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count );
int subscriptionMsg ();
void destroy ();
void subscriptionCancelMsg ();
~netSubscription ();
static tsFreeList < class netSubscription, 1024 > freeList;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
};
class netReadCopyIO : private baseNMIU {
class netReadCopyIO : public baseNMIU {
public:
netReadCopyIO ( nciu &chan, unsigned type, unsigned long count,
void *pValue, unsigned seqNumber );
void disconnect ( const char *pHostName );
static bool factory ( nciu &chan, unsigned type, unsigned long count,
void *pValue, unsigned seqNumber, ca_uint32_t &id );
void show ( unsigned level ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
private:
unsigned type;
unsigned long count;
void *pValue;
unsigned seqNumber;
netReadCopyIO ( nciu &chan, unsigned type, unsigned long count,
void *pValue, unsigned seqNumber );
void destroy ();
void completionNotify ();
void completionNotify ( unsigned type, unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count );
~netReadCopyIO (); // must be allocated from pool
void destroy ();
void completionNotify ();
void completionNotify ( unsigned type, unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count );
static tsFreeList < class netReadCopyIO, 1024 > freeList;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
};
class netReadNotifyIO : public cacNotifyIO, private baseNMIU {
class netReadNotifyIO : public cacNotifyIO, public baseNMIU {
public:
void disconnect ( const char *pHostName );
static bool factory ( nciu &chan, cacNotify &notify, ca_uint32_t &id );
void show ( unsigned level ) const;
private:
netReadNotifyIO ( nciu &chan, cacNotify &notify );
~netReadNotifyIO ();
void disconnect ( const char *pHostName );
void show ( unsigned level ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
private:
void destroy ();
void completionNotify ();
void completionNotify ( unsigned type, unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count );
~netReadNotifyIO ();
static tsFreeList < class netReadNotifyIO, 1024 > freeList;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
};
class netWriteNotifyIO : public cacNotifyIO, private baseNMIU {
class netWriteNotifyIO : public cacNotifyIO, public baseNMIU {
public:
void disconnect ( const char *pHostName );
static bool factory ( nciu &chan, cacNotify &notify, ca_uint32_t &id );
void show ( unsigned level ) const;
private:
netWriteNotifyIO ( nciu &chan, cacNotify &notify );
~netWriteNotifyIO ();
void disconnect ( const char *pHostName );
void show ( unsigned level ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
private:
void destroy ();
void completionNotify ();
void completionNotify ( unsigned type, unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
void exceptionNotify ( int status, const char *pContext, unsigned type, unsigned long count );
~netWriteNotifyIO ();
static tsFreeList < class netWriteNotifyIO, 1024 > freeList;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
};
/*
@@ -494,47 +492,50 @@ extern threadPrivateId cacRecursionLock;
class netiiu {
public:
netiiu ( class cac &cac );
netiiu ( class cac * );
virtual ~netiiu ();
void show ( unsigned level ) const;
unsigned channelCount () const;
cac & clientCtx () const;
void disconnectAllChan ();
void detachAllChan ();
void disconnectAllChan ( netiiu & newiiu );
void connectTimeoutNotify ();
void sendPendingClaims ( bool v42Ok, class claimMsgCache &cache );
bool searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel );
void resetChannelRetryCounts ();
virtual void hostName (char *pBuf, unsigned bufLength) const = 0;
virtual const char * pHostName () const = 0; // deprecated - please do not use
virtual bool connectionInProgress ( const char *pChannelName, const osiSockAddr &addr ) const;
void attachChannel ( nciu &chan );
void detachChannel ( nciu &chan );
virtual void hostName (char *pBuf, unsigned bufLength) const;
virtual const char * pHostName () const; // deprecated - please do not use
virtual bool isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &addr ) const;
virtual bool ca_v42_ok () const;
virtual bool ca_v41_ok () const;
virtual bool pushDatagramMsg (const caHdr &hdr, const void *pExt, ca_uint16_t extsize);
virtual int writeRequest ( unsigned serverId, unsigned type, unsigned nElem, const void *pValue);
virtual int writeNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, const void *pValue );
virtual int readCopyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
virtual int readNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
virtual int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type,
unsigned nElem, unsigned mask, bool enablePreemptionDuringFlush );
virtual int subscriptionCancelRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
virtual int createChannelRequest ( unsigned clientId, const char *pName, unsigned nameLength );
virtual int clearChannelRequest ( unsigned clientId, unsigned serverId );
virtual bool pushDatagramMsg ( const caHdr &hdr, const void *pExt, ca_uint16_t extsize);
virtual int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue);
virtual int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
virtual int readCopyRequest ( nciu &, unsigned type, unsigned nElem, void *pValue );
virtual int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
virtual int subscriptionRequest ( netSubscription &subscr, bool userThread );
virtual int subscriptionCancelRequest ( netSubscription &subscr );
virtual void subscribeAllIO ( nciu &chan );
virtual int createChannelRequest ( nciu & );
virtual void disconnectAllIO ( nciu &chan );
virtual int clearChannelRequest ( nciu & );
protected:
void lock () const;
void unlock () const;
cac * pCAC () const;
mutable osiMutex mutex;
private:
tsDLList < nciu > channelList;
class cac &cacRef;
osiMutex mutex;
friend class nciu;
class cac *pClientCtx;
virtual void lastChannelDetachNotify ();
};
class limboiiu : public netiiu {
public:
limboiiu::limboiiu ();
};
extern limboiiu limboIIU;
class udpiiu;
class searchTimer : private osiTimer, private osiMutex {
@@ -599,6 +600,7 @@ public:
SOCKET getSock () const;
bool repeaterInstalled ();
void show ( unsigned level ) const;
bool isCurrentThread () const;
// exceptions
class noSocket {};
@@ -607,6 +609,7 @@ private:
char xmitBuf [MAX_UDP_SEND];
char recvBuf [MAX_UDP_RECV];
ELLLIST dest;
threadId recvThreadId;
semBinaryId recvThreadExitSignal;
unsigned nBytesInXmitBuf;
SOCKET sock;
@@ -615,9 +618,6 @@ private:
bool shutdownCmd;
bool sockCloseCompleted;
const char * pHostName () const; // deprecated - please do not use
void hostName ( char *pBuf, unsigned bufLength ) const;
bool pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t extsize );
friend void cacRecvThreadUDP ( void *pParam );
@@ -639,7 +639,7 @@ private:
class tcpRecvWatchdog : private osiTimer {
public:
tcpRecvWatchdog (double periodIn, osiTimerQueue & queueIn, bool echoProtocolAcceptedIn);
tcpRecvWatchdog ( double periodIn, osiTimerQueue & queueIn );
~tcpRecvWatchdog ();
void rescheduleRecvTimer ();
void messageArrivalNotify ();
@@ -656,11 +656,10 @@ private:
double delay () const;
const char *name () const;
virtual void forcedShutdown () = 0;
virtual void echoRequest () = 0;
virtual bool setEchoRequestPending () = 0;
virtual void hostName ( char *pBuf, unsigned bufLength ) const = 0;
const double period;
const bool echoProtocolAccepted;
bool responsePending;
bool beaconAnomaly;
};
@@ -703,11 +702,17 @@ private:
class hostNameCache : public ipAddrToAsciiAsynchronous {
public:
hostNameCache ( const osiSockAddr &addr, ipAddrToAsciiEngine &engine );
~hostNameCache ();
void destroy ();
void ioCompletionNotify ( const char *pHostName );
void hostName (char *pBuf, unsigned bufLength) const;
void hostName ( char *pBuf, unsigned bufLength ) const;
void * operator new ( size_t size );
void operator delete ( void *pCadaver, size_t size );
private:
char *pHostName;
bool ioComplete;
char hostNameBuf [128];
~hostNameCache ();
static tsFreeList < class hostNameCache, 16 > freeList;
};
extern "C" void cacSendThreadTCP ( void *pParam );
@@ -716,54 +721,55 @@ extern "C" void cacRecvThreadTCP ( void *pParam );
class tcpiiu :
public tcpRecvWatchdog, public tcpSendWatchdog,
public netiiu, public tsDLNode < tcpiiu >,
private comQueSend, private comQueRecv {
private wireSendAdapter, private wireRecvAdapter {
public:
tcpiiu ( cac &cac, const osiSockAddr &addrIn,
unsigned minorVersion, class bhe &bhe,
double connectionTimeout, osiTimerQueue &timerQueue,
ipAddrToAsciiEngine & );
tcpiiu ( cac &cac, double connectionTimeout, osiTimerQueue &timerQueue );
~tcpiiu ();
bool initiateConnect ( const osiSockAddr &addrIn, unsigned minorVersion,
class bhe &bhe, ipAddrToAsciiEngine &engineIn );
void connect ();
void disconnect ();
void suicide ();
void cleanShutdown ();
void forcedShutdown ();
void * operator new (size_t size);
void operator delete (void *pCadaver, size_t size);
bool fullyConstructed () const;
void flush ();
virtual void show ( unsigned level ) const;
osiSockAddr address () const;
//osiSockAddr address () const;
SOCKET getSock () const;
void echoRequest ();
bool setEchoRequestPending ();
void hostNameSetMsg ();
void userNameSetMsg ();
void processIncomingAndDestroySelfIfDisconnected ();
void installChannelPendingClaim ( nciu & );
bool ca_v44_ok () const;
void processIncoming ();
bool ca_v41_ok () const;
void connect ();
int writeRequest ( unsigned serverId, unsigned type, unsigned nElem, const void *pValue );
int writeNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem, const void *pValue );
int readCopyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
int readNotifyRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
int createChannelRequest ( unsigned clientId, const char *pName, unsigned nameLength );
int clearChannelRequest ( unsigned clientId, unsigned serverId );
int subscriptionRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem,
unsigned mask, bool enablePreemptionDuringFlush );
int subscriptionCancelRequest ( unsigned ioId, unsigned serverId, unsigned type, unsigned nElem );
bool ca_v42_ok () const;
bool ca_v44_ok () const;
int writeRequest ( nciu &, unsigned type, unsigned nElem, const void *pValue );
int writeNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem, const void *pValue );
int readCopyRequest ( nciu &, unsigned type, unsigned nElem, void *pValue );
int readNotifyRequest ( nciu &, cacNotify &, unsigned type, unsigned nElem );
int createChannelRequest ( nciu & );
int clearChannelRequest ( nciu & );
int subscriptionRequest ( netSubscription &subscr, bool userThread );
int subscriptionCancelRequest ( netSubscription &subscr );
void hostName (char *pBuf, unsigned bufLength) const;
void hostName ( char *pBuf, unsigned bufLength ) const;
const char * pHostName () const; // deprecated - please do not use
bool connectionInProgress ( const char *pChannelName, const osiSockAddr &addr ) const;
bool isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &addr ) const;
bool alive () const;
bhe * getBHE () const;
private:
hostNameCache ipToA;
osiMutex flushMutex; // only one thread flushes at a time
chronIntIdResTable < baseNMIU > ioTable;
comQueSend sendQue;
comQueRecv recvQue;
osiSockAddr addr;
hostNameCache *pHostNameCache;
caHdr curMsg;
unsigned long curDataMax;
class bhe &bhe;
class bhe *pBHE;
void *pCurData;
unsigned minorProtocolVersionNumber;
unsigned minorProtocolVersion;
iiu_conn_state state;
semBinaryId sendThreadFlushSignal;
semBinaryId recvThreadRingBufferSpaceAvailableSignal;
@@ -775,18 +781,13 @@ private:
bool busyStateDetected; // only modified by the recv thread
bool flowControlActive; // only modified by the send process thread
bool echoRequestPending;
bool recvMessagePending;
bool flushPending;
bool msgHeaderAvailable;
bool claimsPending;
bool sockCloseCompleted;
bool ca_v42_ok () const;
void postMsg ();
unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf,
bool enablePreemptionDuringFlush );
unsigned sendBytes ( const void *pBuf, unsigned nBytesInBuf );
unsigned recvBytes ( void *pBuf, unsigned nBytesInBuf );
bool flushToWirePermit ();
bool flushToWire ( bool userThread );
friend void cacSendThreadTCP ( void *pParam );
friend void cacRecvThreadTCP ( void *pParam );
@@ -794,7 +795,15 @@ private:
void lastChannelDetachNotify ();
int requestStubStatus ();
// protocol stubs
// send protocol stubs
int echoRequest ();
int noopRequest ();
int disableFlowControlRequest ();
int enableFlowControlRequest ();
int hostNameSetRequest ();
int userNameSetRequest ();
// recv protocol stubs
void noopAction ();
void echoRespAction ();
void writeNotifyRespAction ();
@@ -808,11 +817,31 @@ private:
void verifyAndDisconnectChan ();
void badTCPRespAction ();
// IO management routines
//void ioInstall ( baseNMIU &io );
//void ioUninstall ( unsigned id );
//void ioDestroy ( unsigned id );
void ioCompletionNotify ( unsigned id, unsigned type,
unsigned long count, const void *pData );
void ioExceptionNotify ( unsigned id,
int status, const char *pContext );
void ioExceptionNotify ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count );
void ioCompletionNotifyAndDestroy ( unsigned id );
void ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, unsigned long count, const void *pData );
void ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext );
void ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext, unsigned type, unsigned long count );
void subscribeAllIO ( nciu &chan );
void disconnectAllIO ( nciu &chan );
typedef void ( tcpiiu::*pProtoStubTCP ) ();
static const pProtoStubTCP tcpJumpTableCAC [];
static tsFreeList < class tcpiiu, 16 > freeList;
};
#if 0
class claimMsgCache {
public:
claimMsgCache ( bool v44 );
@@ -829,6 +858,7 @@ private:
friend bool nciu::setClaimMsgCache ( class claimMsgCache & );
};
#endif
class inetAddrID {
public:
@@ -892,7 +922,7 @@ private:
class cac *pcac;
osiEvent exit;
osiEvent processingDone;
osiMutex mutex;
mutable osiMutex mutex;
unsigned enableRefCount;
unsigned blockingForCompletion;
bool processing;
@@ -974,12 +1004,6 @@ private:
friend class syncGroupNotify;
};
class cacPrivateListOfIOPrivate {
private:
osiMutex mutex;
friend class cacPrivateListOfIO;
};
class ioCounter {
public:
ioCounter ();
@@ -1000,14 +1024,32 @@ private:
osiEvent ioDone;
};
//
// mutex strategy
// 1) mutex hierarchy
//
// (if multiple lock are applied simultaneously then they
// must be applied in this order)
//
// cac::iiuListMutex
// cac::defaultMutex:
// netiiu::mutex
// nciu::mutex
// baseNMIU::mutex
//
// 2) channels can not be moved between netiiu derived classes
// w/o taking the defaultMutex in this class first
//
//
class cac : public caClient, public nciuPrivate,
public ioCounter, public cacPrivateListOfIOPrivate
public ioCounter
{
public:
cac ( bool enablePreemptiveCallback = false );
virtual ~cac ();
void flush ();
int pend ( double timeout, int early );
unsigned getInitializingThreadsPriority () const;
// beacon management
void beaconNotify ( const inetAddrID &addr );
@@ -1018,12 +1060,11 @@ public:
void repeaterSubscribeConfirmNotify ();
// IIU routines
void installIIU ( tcpiiu &iiu );
void removeIIU ( tcpiiu &iiu );
void signalRecvActivity ();
void processRecvBacklog ();
tcpiiu * constructTCPIIU ( const osiSockAddr &, unsigned minorVersion );
double connectionTimeout () const;
// IO management routines
bool ioComplete () const;
// exception routines
void exceptionNotify ( int status, const char *pContext,
@@ -1034,41 +1075,19 @@ public:
void changeExceptionEvent ( caExceptionHandler *pfunc, void *arg );
void genLocalExcepWFL ( long stat, const char *ctx, const char *pFile, unsigned lineNo );
// IO management routines
bool ioComplete () const;
void ioInstall ( baseNMIU &io );
void ioUninstall ( unsigned id );
void ioDestroy ( unsigned id );
void ioCompletionNotify ( unsigned id );
void ioCompletionNotify ( unsigned id, unsigned type,
unsigned long count, const void *pData );
void ioExceptionNotify ( unsigned id,
int status, const char *pContext );
void ioExceptionNotify ( unsigned id, int status,
const char *pContext, unsigned type, unsigned long count );
void ioCompletionNotifyAndDestroy ( unsigned id );
void ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, unsigned long count, const void *pData );
void ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext );
void ioExceptionNotifyAndDestroy ( unsigned id,
int status, const char *pContext, unsigned type, unsigned long count );
// channel routines
void connectChannel ( unsigned id );
void connectChannel ( bool v44Ok, unsigned id,
unsigned nativeType, unsigned long nativeCount, unsigned sid );
void channelDestroy ( unsigned id );
void disconnectChannel ( unsigned id );
void registerChannel ( nciu &chan );
void unregisterChannel ( nciu &chan );
bool createChannelIO ( const char *name_str, cacChannel &chan );
void lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
unsigned typeCode, unsigned long count, unsigned minorVersionNumber,
const osiSockAddr & );
void installDisconnectedChannel ( nciu &chan );
void accessRightsNotify ( unsigned id, caar );
void uninstallLocalChannel ( cacLocalChannelIO & );
void destroyNCIU ( nciu & );
// sync group routines
CASG * lookupCASG ( unsigned id );
@@ -1080,7 +1099,7 @@ public:
void enableCallbackPreemption ();
void disableCallbackPreemption ();
void notifySearchResponse ( unsigned short retrySeqNo );
bool currentThreadIsRecvProcessThread ();
bool flushPermit () const;
const char * userNamePointer ();
// diagnostics
@@ -1095,10 +1114,9 @@ private:
ipAddrToAsciiEngine ipToAEngine;
cacServiceList services;
tsDLList <tcpiiu> iiuList;
tsDLList <tcpiiu> iiuListLimbo;
tsDLList
<cacLocalChannelIO> localChanList;
chronIntIdResTable
< baseNMIU > ioTable;
chronIntIdResTable
< nciu > chanTable;
chronIntIdResTable
@@ -1107,8 +1125,10 @@ private:
< bhe, inetAddrID > beaconTable;
osiTime programBeginTime;
double connTMO;
osiMutex defaultMutex;
osiMutex iiuListMutex;
// defaultMutex can be applied if iiuListMutex is already applied
mutable osiMutex defaultMutex;
// iiuListMutex must not be applied if defaultMutex is already applied
mutable osiMutex iiuListMutex;
osiTimerQueue *pTimerQueue;
caExceptionHandler *ca_exception_func;
void *ca_exception_arg;
@@ -1120,14 +1140,17 @@ private:
udpiiu *pudpiiu;
searchTimer *pSearchTmr;
repeaterSubscribeTimer *pRepeaterSubscribeTmr;
unsigned initializingThreadsPriority;
bool enablePreemptiveCallback;
// IIU routines
tcpiiu * constructTCPIIU ( const osiSockAddr &, unsigned minorVersion );
double connectionTimeout () const;
int pendPrivate ( double timeout, int early );
bool setupUDP ();
};
extern const caHdr cacnullmsg;
/*
* CA internal functions
*/
+26
View File
@@ -0,0 +1,26 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#include "iocinf.h"
limboiiu limboIIU;
limboiiu::limboiiu () : netiiu ( 0 )
{
}
+70 -269
View File
@@ -31,48 +31,47 @@
tsFreeList < class nciu, 1024 > nciu::freeList;
nciu::nciu ( cac &cacIn, cacChannel &chanIn, const char *pNameIn ) :
cacChannelIO ( chanIn ), cacPrivateListOfIO ( cacIn )
{
static const caar defaultAccessRights = { false, false };
size_t strcnt;
static const caar defaultAccessRights = { false, false };
strcnt = strlen ( pNameIn ) + 1;
nciu::nciu ( cac &cacIn, netiiu &iiuIn, cacChannel &chanIn,
const char *pNameIn ) :
cacChannelIO ( chanIn ),
cacCtx ( cacIn ),
ar ( defaultAccessRights ),
count ( 0 ),
piiu ( &iiuIn ),
sid ( UINT_MAX ),
retry ( 0u ),
retrySeqNo ( 0u ),
nameLength ( strlen ( pNameIn ) + 1 ),
typeCode ( USHRT_MAX ),
f_connected ( false ),
f_fullyConstructed ( true ),
f_previousConn ( false ),
f_claimSent ( false )
{
// second constraint is imposed by size field in protocol header
if ( strcnt > MAX_UDP_SEND - sizeof ( caHdr ) || strcnt > 0xffff ) {
if ( this->nameLength > MAX_UDP_SEND - sizeof ( caHdr ) || this->nameLength > 0xffff ) {
throwWithLocation ( caErrorCode ( ECA_STRTOBIG ) );
}
this->pNameStr = new char [ strcnt ];
this->pNameStr = new char [ this->nameLength ];
if ( ! this->pNameStr ) {
this->f_fullyConstructed = false;
return;
}
strcpy ( this->pNameStr, pNameIn );
this->piiu = 0u;
this->typeCode = USHRT_MAX; /* invalid initial type */
this->count = 0; /* invalid initial count */
this->sid = UINT_MAX; /* invalid initial server id */
this->ar = defaultAccessRights;
this->nameLength = strcnt;
this->f_previousConn = false;
this->f_connected = false;
this->f_fullyConstructed = true;
this->f_claimSent = false;
this->retry = 0u;
this->retrySeqNo = 0u;
this->ptrLockCount = 0u;
this->ptrUnlockWaitCount = 0u;
this->cacCtx.registerChannel ( *this );
this->cacCtx.installDisconnectedChannel ( *this );
chanIn.attachIO ( *this );
}
void nciu::destroy ()
{
// this occurs here so that it happens when
// a lock is not applied
this->piiu->clearChannelRequest ( *this );
this->cacCtx.destroyNCIU ( *this );
}
void nciu::cacDestroy ()
{
delete this;
}
@@ -87,29 +86,11 @@ nciu::~nciu ()
// this calls virtual functions in the cacChannelIO base
this->ioReleaseNotify ();
this->destroyAllIO ();
this->lockPIIU ();
if ( this->f_connected && this->piiu ) {
this->piiu->clearChannelRequest ( this->getId (), this->sid );
}
this->unlockPIIU ();
this->detachChanFromIIU ();
this->cacCtx.unregisterChannel ( *this );
delete [] this->pNameStr;
}
int nciu::read ( unsigned type, unsigned long countIn, cacNotify &notify )
{
int status;
unsigned idCopy;
//
// fail out if their arguments are invalid
//
@@ -129,33 +110,11 @@ int nciu::read ( unsigned type, unsigned long countIn, cacNotify &notify )
countIn = this->count;
}
bool success = netReadNotifyIO::factory ( *this, notify, idCopy );
if ( ! success ) {
return ECA_ALLOCMEM;
}
this->lockPIIU ();
if ( this->piiu ) {
status = this->piiu->readNotifyRequest ( idCopy, this->sid, type, countIn );
}
else {
status = ECA_DISCONNCHID;
}
this->unlockPIIU ();
if ( status != ECA_NORMAL ) {
this->cacCtx.ioDestroy ( id );
}
return status;
return this->piiu->readNotifyRequest ( *this, notify, type, countIn );
}
int nciu::read ( unsigned type, unsigned long countIn, void *pValue )
{
unsigned idCopy;
bool success;
int status;
/*
* fail out if channel isnt connected or arguments are
* otherwise invalid
@@ -176,26 +135,7 @@ int nciu::read ( unsigned type, unsigned long countIn, void *pValue )
countIn = this->count;
}
success = netReadCopyIO::factory ( *this, type, countIn, pValue,
this->readSequence (), idCopy );
if ( ! success ) {
return ECA_ALLOCMEM;
}
this->lockPIIU ();
if ( this->piiu ) {
status = this->piiu->readCopyRequest ( idCopy, this->sid, type, countIn );
}
else {
status = ECA_DISCONNCHID;
}
this->unlockPIIU ();
if ( status != ECA_NORMAL ) {
this->cacCtx.ioDestroy ( id );
}
return status;
return this->piiu->readCopyRequest ( *this, type, countIn, pValue );
}
/*
@@ -246,20 +186,11 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue )
}
}
this->lockPIIU ();
if ( this->piiu ) {
status = this->piiu->writeRequest ( this->sid, type, countIn, pValue );
}
else {
status = ECA_DISCONNCHID;
}
this->unlockPIIU ();
return status;
return this->piiu->writeRequest ( *this, type, countIn, pValue );
}
int nciu::write ( unsigned type, unsigned long countIn, const void *pValue, cacNotify &notify )
{
ca_uint32_t newId;
int status;
// check this first so thet get a decent diagnostic
@@ -282,33 +213,7 @@ int nciu::write ( unsigned type, unsigned long countIn, const void *pValue, cacN
}
}
// dont use monix pointer because monix could be deleted
// when the channel disconnects or when the IO completes
bool success = netWriteNotifyIO::factory ( *this, notify, newId );
if ( ! success ) {
return ECA_ALLOCMEM;
}
this->lockPIIU ();
if ( this->piiu ) {
status = this->piiu->writeNotifyRequest ( newId, this->sid, type, countIn, pValue );
}
else {
status = ECA_DISCONNCHID;
}
this->unlockPIIU ();
if ( status != ECA_NORMAL ) {
/*
* we need to be careful about touching the monix
* pointer after the lock has been released
*/
this->cacCtx.ioDestroy ( newId );
}
return status;
return this->piiu->writeNotifyRequest ( *this, notify, type, countIn, pValue );
}
void nciu::connect ( unsigned nativeType,
@@ -355,7 +260,7 @@ void nciu::connect ( unsigned nativeType,
this->unlock ();
// resubscribe for monitors from this channel
this->subscribeAllIO ();
this->piiu->subscribeAllIO ( *this );
this->connectNotify ();
@@ -370,14 +275,17 @@ void nciu::connect ( unsigned nativeType,
}
}
void nciu::disconnect ()
void nciu::disconnect ( netiiu &newiiu )
{
char hostNameBuf[64];
this->hostName ( hostNameBuf, sizeof (hostNameBuf) );
this->hostName ( hostNameBuf, sizeof ( hostNameBuf ) );
bool wasConnected;
this->piiu->disconnectAllIO ( *this );
this->lock ();
this->piiu = &newiiu;
this->retry = 0u;
this->typeCode = USHRT_MAX;
this->count = 0u;
@@ -396,17 +304,15 @@ void nciu::disconnect ()
this->unlock ();
if ( wasConnected ) {
/*
* look for events that have an event cancel in progress
*/
disconnectAllIO ( hostNameBuf );
this->disconnectNotify ();
this->accessRightsNotify ( this->ar );
}
this->cacCtx.installDisconnectedChannel ( *this );
this->resetRetryCount ();
}
/*
@@ -423,18 +329,8 @@ bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisCh
msg.m_count = htons ( CA_MINOR_VERSION );
msg.m_cid = this->getId ();
this->lockPIIU ();
if ( this->piiu ) {
status = this->piiu->pushDatagramMsg ( msg,
this->pNameStr, this->nameLength );
}
else {
status = false;
}
this->unlockPIIU ();
status = this->piiu->pushDatagramMsg ( msg,
this->pNameStr, this->nameLength );
if ( status ) {
//
// increment the number of times we have tried
@@ -455,30 +351,11 @@ bool nciu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisCh
return status;
}
int nciu::subscriptionMsg ( unsigned subscriptionId, unsigned typeIn,
unsigned long countIn, unsigned short maskIn,
bool enablePreemptionDuringFlush )
int nciu::subscriptionMsg ( netSubscription &subscr, bool userThread )
{
int status;
/*
* clip to the native count and set to the native count if they
* specify zero
*/
if ( countIn > this->count ){
countIn = this->count;
}
if ( this->f_connected ) {
this->lockPIIU ();
if ( this->piiu ) {
status = this->piiu->subscriptionRequest ( subscriptionId, this->sid,
typeIn, countIn, maskIn, enablePreemptionDuringFlush );
}
else {
status = ECA_NORMAL;
}
this->unlockPIIU ();
status = this->piiu->subscriptionRequest ( subscr, userThread );
}
else {
status = ECA_NORMAL;
@@ -487,64 +364,6 @@ int nciu::subscriptionMsg ( unsigned subscriptionId, unsigned typeIn,
return status;
}
void nciu::attachChanToIIU ( netiiu &iiu )
{
this->lockPIIU ();
if ( this->piiu ) {
this->piiu->mutex.lock ();
this->piiu->channelList.remove ( *this );
if ( this->piiu->channelList.count () == 0u ) {
this->piiu->lastChannelDetachNotify ();
}
this->piiu->mutex.unlock ();
}
this->unlockPIIU ();
this->lock ();
while ( this->ptrLockCount ) {
this->unlock ();
this->cacCtx.nciuPrivate::ptrLockReleaseWakeup.wait ();
this->lock ();
}
this->piiu = &iiu;
this->unlock ();
iiu.mutex.lock ();
// add to the front of the list so that search requests
// for new channels will be sent first and so that
// channels lacking a claim message prior to connecting
// are located
iiu.channelList.push ( *this );
iiu.mutex.unlock ();
}
void nciu::detachChanFromIIU ()
{
this->lockPIIU ();
if ( this->piiu ) {
this->piiu->mutex.lock ();
this->piiu->channelList.remove ( *this );
if ( this->piiu->channelList.count () == 0u ) {
this->piiu->lastChannelDetachNotify ();
}
this->piiu->mutex.unlock ();
}
this->unlockPIIU ();
this->lock ();
while ( this->ptrLockCount ) {
this->unlock ();
this->cacCtx.nciuPrivate::ptrLockReleaseWakeup.wait ();
this->lock ();
}
this->piiu = 0u;
this->unlock ();
}
void nciu::incrementOutstandingIO ()
{
this->cacCtx.incrementOutstandingIO ();
@@ -577,15 +396,7 @@ unsigned nciu::readSequence () const
void nciu::hostName ( char *pBuf, unsigned bufLength ) const
{
this->lockPIIU ();
if ( this->piiu ) {
this->piiu->hostName ( pBuf, bufLength );
}
else {
strncpy (pBuf, "<disconnected>", bufLength);
pBuf[bufLength-1] = '\0';
}
this->unlockPIIU ();
this->piiu->hostName ( pBuf, bufLength );
}
// deprecated - please do not use, this is _not_ thread safe
@@ -675,21 +486,38 @@ const char *nciu::pName () const
return this->pNameStr;
}
unsigned nciu::nameLen () const
{
return this->nameLength;
}
unsigned nciu::searchAttempts () const
{
return this->retry;
}
int nciu::subscribe ( unsigned type, unsigned long countIn,
int nciu::createChannelRequest ()
{
int status = this->piiu->createChannelRequest ( *this );
if ( status == ECA_NORMAL ) {
this->f_claimSent = true;
}
return status;
}
int nciu::subscribe ( unsigned type, unsigned long nElem,
unsigned mask, cacNotify &notify )
{
unsigned idCopy;
bool success = netSubscription::factory ( *this, type, countIn,
static_cast <unsigned short> (mask), notify, idCopy );
if ( success ) {
return ECA_NORMAL;
netSubscription *pSubcr = new netSubscription ( *this,
type, nElem, mask, notify );
if ( pSubcr ) {
int status = this->piiu->subscriptionRequest ( *pSubcr, true );
if ( status != ECA_NORMAL ) {
pSubcr->destroy ();
}
return status;
}
else {
else {
return ECA_ALLOCMEM;
}
}
@@ -718,8 +546,8 @@ void nciu::show ( unsigned level ) const
}
if ( level > 2u ) {
printf ( "\tnetwork IO pointer=%p, ptr lock count=%u, ptr unlock wait count=%u\n",
this->piiu, this->ptrLockCount, this->ptrUnlockWaitCount );
printf ( "\tnetwork IO pointer=%p\n",
this->piiu );
printf ( "\tserver identifier %u\n", this->sid );
printf ( "\tsearch retry number=%u, search retry sequence number=%u\n",
this->retry, this->retrySeqNo );
@@ -728,31 +556,4 @@ void nciu::show ( unsigned level ) const
}
}
bool nciu::setClaimMsgCache ( claimMsgCache &cache )
{
cache.clientId = this->id;
cache.serverId = this->sid;
if ( cache.v44 ) {
unsigned len = strlen ( this->pNameStr ) + 1u;
if ( cache.bufLen < len ) {
unsigned newBufLen = 2 * len;
char *pNewStr = new char [ newBufLen ];
if ( pNewStr ) {
delete [] cache.pStr;
cache.pStr = pNewStr;
cache.bufLen = newBufLen;
}
else {
return false;
}
}
strcpy ( cache.pStr, this->pNameStr );
cache.currentStrLen = len;
}
else {
cache.currentStrLen = 0u;
}
this->f_claimSent = true;
return true;
}
+31 -70
View File
@@ -61,54 +61,41 @@ inline void nciu::accessRightsStateChange ( const caar &arIn )
this->accessRightsNotify ( this->ar );
}
inline unsigned nciu::getSID () const
inline ca_uint32_t nciu::getSID () const
{
return this->sid;
}
inline ca_uint32_t nciu::getCID () const
{
return this->id;
}
//
// this routine is used to verify that the channel's
// iiu pointer and connection state has not changed
// before the iiu's mutex was applied
//
inline bool nciu::verifyConnected ( netiiu &iiuIn )
{
return ( this->piiu == &iiuIn ) && this->f_connected;
}
inline bool nciu::verifyIIU ( netiiu &iiuIn )
{
return ( this->piiu == &iiuIn );
}
inline unsigned nciu::getRetrySeqNo () const
{
return this->retrySeqNo;
}
inline void nciu::lockPIIU () const
inline void nciu::subscriptionCancelMsg ( netSubscription &subsc )
{
this->lock ();
assert ( this->ptrLockCount < USHRT_MAX );
this->ptrLockCount++;
this->unlock ();
}
inline void nciu::unlockPIIU () const
{
bool signalNeeded;
this->lock ();
assert ( this->ptrLockCount > 0 );
this->ptrLockCount--;
if ( this->ptrLockCount == 0u && this->ptrUnlockWaitCount > 0u ) {
this->ptrUnlockWaitCount--;
signalNeeded = true;
}
else {
signalNeeded = false;
}
this->unlock ();
if ( signalNeeded ) {
this->cacCtx.nciuPrivate::ptrLockReleaseWakeup.signal ();
}
}
inline void nciu::subscriptionCancelMsg ( ca_uint32_t clientId )
{
this->lockPIIU ();
if ( this->piiu ) {
this->piiu->subscriptionCancelRequest ( clientId, this->sid, this->typeCode, this->count );
}
this->unlockPIIU ();
this->piiu->subscriptionCancelRequest ( subsc );
}
// this is to only be used by early protocol revisions
@@ -117,17 +104,16 @@ inline void nciu::connect ()
this->connect ( this->typeCode, this->count, this->sid );
}
inline void nciu::searchReplySetUp ( unsigned sidIn,
inline void nciu::searchReplySetUp ( netiiu &iiu, unsigned sidIn,
unsigned typeIn, unsigned long countIn )
{
this->lock ();
this->piiu = &iiu;
this->typeCode = typeIn;
this->count = countIn;
this->sid = sidIn;
this->ar.read_access = true;
this->ar.write_access = true;
this->unlock ();
}
@@ -136,38 +122,13 @@ inline bool nciu::connected () const
return this->f_connected;
}
inline bool nciu::claimSent () const
inline bool nciu::isAttachedToVirtaulCircuit ( const osiSockAddr &addrIn )
{
return this->f_claimSent;
return this->piiu->isVirtaulCircuit ( this->pNameStr, addrIn );
}
inline bool nciu::connectionInProgress ( const osiSockAddr &addrIn )
inline netiiu * nciu::getPIIU ()
{
bool status;
this->lock ();
if ( this->piiu ) {
status = this->piiu->connectionInProgress ( this->pNameStr, addrIn );
}
else {
status = false;
}
this->unlock ();
return status;
}
inline void nciu::ioInstall ( class baseNMIU &nmiu )
{
this->addIO ( nmiu );
this->cacCtx.ioInstall ( nmiu );
}
inline void nciu::ioUninstall ( class baseNMIU &nmiu )
{
this->cacCtx.ioUninstall ( nmiu.getId () );
this->removeIO ( nmiu );
return this->piiu;
}
+2 -2
View File
@@ -18,8 +18,8 @@ tsFreeList < class netReadCopyIO, 1024 > netReadCopyIO::freeList;
netReadCopyIO::netReadCopyIO ( nciu &chanIn, unsigned typeIn, unsigned long countIn,
void *pValueIn, unsigned seqNumberIn ) :
baseNMIU (chanIn), type (typeIn), count (countIn),
pValue (pValueIn), seqNumber (seqNumberIn)
baseNMIU ( chanIn ), type ( typeIn ), count ( countIn ),
pValue ( pValueIn ), seqNumber ( seqNumberIn )
{
chanIn.incrementOutstandingIO ();
}
-20
View File
@@ -18,26 +18,6 @@
#ifndef netReadCopyIO_ILh
#define netReadCopyIO_ILh
//
// we need to be careful about exporting a raw IO
// pointer because the IO object may be deleted
// at any time when the channel disconnects or the
// IO completes
//
inline bool netReadCopyIO::factory ( nciu &chan, unsigned type,
unsigned long count, void *pValue, unsigned seqNumber, unsigned &id )
{
netReadCopyIO *pIO = new netReadCopyIO ( chan,
type, count, pValue, seqNumber );
if ( pIO ) {
id = pIO->getId ();
return true;
}
else {
return false;
}
}
inline void * netReadCopyIO::operator new ( size_t size )
{
return netReadCopyIO::freeList.allocate ( size );
+1
View File
@@ -12,6 +12,7 @@
#include "iocinf.h"
#include "netReadNotifyIO_IL.h"
#include "cacNotifyIO_IL.h"
tsFreeList < class netReadNotifyIO, 1024 > netReadNotifyIO::freeList;
+11 -11
View File
@@ -34,16 +34,16 @@ inline void netReadNotifyIO::operator delete ( void *pCadaver, size_t size )
// at any time when the channel disconnects or the
// IO completes
//
inline bool netReadNotifyIO::factory ( nciu &chan, cacNotify &notify, ca_uint32_t &id )
{
netReadNotifyIO *pIO = new netReadNotifyIO ( chan, notify );
if ( pIO ) {
id = pIO->getId ();
return true;
}
else {
return false;
}
}
//inline bool netReadNotifyIO::factory ( nciu &chan, cacNotify &notify, ca_uint32_t &id )
//{
// netReadNotifyIO *pIO = new netReadNotifyIO ( chan, notify );
// if ( pIO ) {
// id = pIO->getId ();
// return true;
// }
// else {
// return false;
// }
//}
#endif // netReadNotifyIO_ILh
+12 -9
View File
@@ -12,33 +12,36 @@
#include "iocinf.h"
#include "netSubscription_IL.h"
#include "cacNotifyIO_IL.h"
#include "nciu_IL.h"
tsFreeList < class netSubscription, 1024 > netSubscription::freeList;
netSubscription::netSubscription ( nciu &chan, chtype typeIn, unsigned long countIn,
unsigned short maskIn, cacNotify &notifyIn ) :
netSubscription::netSubscription ( nciu &chan, unsigned typeIn, unsigned long countIn,
unsigned maskIn, cacNotify &notifyIn ) :
cacNotifyIO ( notifyIn ), baseNMIU ( chan ),
type ( typeIn ), count ( countIn ), mask ( maskIn )
count ( countIn ), type ( typeIn ), mask ( maskIn )
{
this->chan.subscriptionMsg ( this->getId (), this->type,
this->count, this->mask, true );
}
netSubscription::~netSubscription ()
{
this->chan.subscriptionCancelMsg ( this->getId () );
this->chan.subscriptionCancelMsg ( *this );
}
void netSubscription::destroy()
void netSubscription::destroy ()
{
delete this;
}
int netSubscription::subscriptionMsg ()
{
return this->chan.subscriptionMsg ( this->getId (), this->type,
this->count, this->mask, false );
return this->chan.subscriptionMsg ( *this, false );
}
void netSubscription::subscriptionCancelMsg ()
{
this->chan.subscriptionCancelMsg ( *this );
}
void netSubscription::disconnect ( const char * /* pHostName */ )
+27 -18
View File
@@ -18,36 +18,45 @@
#ifndef netSubscription_ILh
#define netSubscription_ILh
inline void * netSubscription::operator new (size_t size)
inline void * netSubscription::operator new ( size_t size )
{
return netSubscription::freeList.allocate (size);
return netSubscription::freeList.allocate ( size );
}
inline void netSubscription::operator delete (void *pCadaver, size_t size)
inline void netSubscription::operator delete ( void *pCadaver, size_t size )
{
netSubscription::freeList.release (pCadaver,size);
netSubscription::freeList.release ( pCadaver, size );
}
//
// we need to be careful about exporting a raw IO
// pointer because the IO object may be deleted
// at any time when the channel disconnects or the
// IO completes
//
inline bool netSubscription::factory ( nciu &chan, chtype type, unsigned long count,
unsigned short mask, cacNotify &notify, unsigned &id )
inline unsigned long netSubscription::getCount ()
{
netSubscription *pIO = new netSubscription ( chan, type, count, mask, notify );
if ( pIO ) {
id = pIO->getId ();
return true;
if ( this->chan.connected () ) {
unsigned long nativeCount = chan.nativeElementCount ();
if ( this->count == 0u ) {
return nativeCount;
}
else if ( this->count > nativeCount ) {
return nativeCount;
}
else {
return this->count;
}
}
else {
return false;
return this->count;
}
}
inline unsigned netSubscription::getType ()
{
return this->type;
}
inline unsigned netSubscription::getMask ()
{
return this->mask;
}
#endif // netSubscription_ILh
+1
View File
@@ -12,6 +12,7 @@
#include "iocinf.h"
#include "netWriteNotifyIO_IL.h"
#include "cacNotifyIO_IL.h"
tsFreeList < class netWriteNotifyIO, 1024 > netWriteNotifyIO::freeList;
+11 -11
View File
@@ -34,16 +34,16 @@ inline void netWriteNotifyIO::operator delete ( void *pCadaver, size_t size )
// at any time when the channel disconnects or the
// IO completes
//
inline bool netWriteNotifyIO::factory ( nciu &chan, cacNotify &notify, ca_uint32_t &id )
{
netWriteNotifyIO *pIO = new netWriteNotifyIO ( chan, notify );
if ( pIO ) {
id = pIO->getId ();
return true;
}
else {
return false;
}
}
//inline bool netWriteNotifyIO::factory ( nciu &chan, cacNotify &notify, ca_uint32_t &id )
//{
// netWriteNotifyIO *pIO = new netWriteNotifyIO ( chan, notify );
// if ( pIO ) {
// id = pIO->getId ();
// return true;
// }
// else {
// return false;
// }
//}
+78 -87
View File
@@ -10,11 +10,15 @@
* Author: Jeff Hill
*/
#include <limits.h>
#include "iocinf.h"
#include "netiiu_IL.h"
#include "nciu_IL.h"
#include "claimMsgCache_IL.h"
netiiu::netiiu ( cac *pClientCtxIn ) : pClientCtx ( pClientCtxIn )
{
}
netiiu::~netiiu ()
{
@@ -23,7 +27,8 @@ netiiu::~netiiu ()
void netiiu::show ( unsigned level ) const
{
this->lock ();
osiAutoMutex autoMutex ( this->mutex );
printf ( "network IO base class\n" );
if ( level > 1 ) {
tsDLIterConstBD < nciu > pChan ( this->channelList.first () );
@@ -33,10 +38,9 @@ void netiiu::show ( unsigned level ) const
}
}
if ( level > 2u ) {
printf ("\tcac pointer %p\n", &this->cacRef );
printf ( "\tcac pointer %p\n", this->pClientCtx );
this->mutex.show ( level - 2u );
}
this->unlock ();
}
unsigned netiiu::channelCount () const
@@ -44,69 +48,79 @@ unsigned netiiu::channelCount () const
return this->channelList.count ();
}
void netiiu::lock () const
// cac lock must also be applied when
// calling this
void netiiu::attachChannel ( nciu &chan )
{
this->mutex.lock ();
osiAutoMutex autoMutex ( this->mutex );
this->channelList.add ( chan );
}
void netiiu::unlock () const
// cac lock must also be applied when
// calling this
void netiiu::detachChannel ( nciu &chan )
{
this->mutex.unlock ();
}
void netiiu::detachAllChan ()
{
this->lock ();
tsDLIterBD <nciu> chan ( this->channelList.first () );
while ( chan.valid () ) {
tsDLIterBD <nciu> next = chan.itemAfter ();
chan->detachChanFromIIU ();
chan = next;
{
osiAutoMutex autoMutex ( this->mutex );
this->channelList.remove ( chan );
if ( this->channelList.count () == 0u ) {
this->lastChannelDetachNotify ();
}
}
this->unlock ();
}
void netiiu::disconnectAllChan ()
// cac lock must also be applied when
// calling this
void netiiu::disconnectAllChan ( netiiu & newiiu )
{
this->lock ();
tsDLIterBD <nciu> chan ( this->channelList.first () );
while ( chan.valid () ) {
tsDLIterBD <nciu> next = chan.itemAfter ();
chan->disconnect ();
chan = next;
tsDLList < nciu > list;
{
osiAutoMutex autoMutex ( this->mutex );
tsDLIterBD < nciu > chan ( this->channelList.first () );
while ( chan.valid () ) {
tsDLIterBD < nciu > next = chan.itemAfter ();
this->clearChannelRequest ( *chan );
this->channelList.remove ( *chan );
chan->disconnect ( newiiu );
list.add ( *chan );
chan = next;
}
}
{
osiAutoMutex autoMutex ( newiiu.mutex );
newiiu.channelList.add ( list );
}
this->unlock ();
}
void netiiu::connectTimeoutNotify ()
{
this->lock ();
tsDLIterBD <nciu> chan ( this->channelList.first () );
osiAutoMutex autoMutex ( this->mutex );
tsDLIterBD < nciu > chan ( this->channelList.first () );
while ( chan.valid () ) {
chan->connectTimeoutNotify ();
chan++;
}
this->unlock ();
}
void netiiu::resetChannelRetryCounts ()
{
this->lock ();
tsDLIterBD <nciu> chan ( this->channelList.first () );
osiAutoMutex autoMutex ( this->mutex );
tsDLIterBD < nciu > chan ( this->channelList.first () );
while ( chan.valid () ) {
chan->resetRetryCount ();
chan++;
}
this->unlock ();
}
bool netiiu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThisChannel )
{
bool status;
this->lock ();
osiAutoMutex autoMutex ( this->mutex );
tsDLIterBD <nciu> chan = this->channelList.first ();
tsDLIterBD < nciu > chan = this->channelList.first ();
if ( chan.valid () ) {
status = chan->searchMsg ( retrySeqNumber, retryNoForThisChannel );
if ( status ) {
@@ -118,52 +132,10 @@ bool netiiu::searchMsg ( unsigned short retrySeqNumber, unsigned &retryNoForThis
status = false;
}
this->unlock ();
return status;
}
//
// considerable extra effort is taken in this routine to
// guarantee that the lock is not held while blocking
// in ::send () for buffer space.
//
void netiiu::sendPendingClaims ( bool v42Ok, claimMsgCache &cache )
{
while ( 1 ) {
this->lock ();
tsDLIterBD < nciu > chan ( this->channelList.first () );
if ( ! chan.valid () ) {
this->unlock ();
return;
}
if ( chan->claimSent () ) {
this->unlock ();
return;
}
if ( ! chan->setClaimMsgCache ( cache ) ) {
this->unlock ();
return;
}
// move channel to the end of the list so that it
// will not be considered again for a claim message
this->channelList.remove ( *chan );
this->channelList.add ( *chan );
this->unlock ();
int status = cache.deliverMsg ( *this );
if ( status != ECA_NORMAL ) {
// this indicates diconnect condition
// therefore no cleanup required
return;
}
if ( ! v42Ok ) {
cache.connectChannel ( this->cacRef );
}
}
}
bool netiiu::ca_v42_ok () const
{
return false;
@@ -179,7 +151,7 @@ bool netiiu::pushDatagramMsg ( const caHdr &, const void *, ca_uint16_t )
return false;
}
bool netiiu::connectionInProgress ( const char *, const osiSockAddr & ) const
bool netiiu::isVirtaulCircuit ( const char *, const osiSockAddr & ) const
{
return false;
}
@@ -188,44 +160,63 @@ void netiiu::lastChannelDetachNotify ()
{
}
int netiiu::writeRequest ( unsigned, unsigned, unsigned, const void * )
int netiiu::writeRequest ( nciu &, unsigned, unsigned, const void * )
{
return ECA_DISCONNCHID;
}
int netiiu::writeNotifyRequest ( unsigned, unsigned, unsigned, unsigned, const void * )
int netiiu::writeNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned, const void * )
{
return ECA_DISCONNCHID;
}
int netiiu::readCopyRequest ( unsigned, unsigned, unsigned, unsigned )
int netiiu::readCopyRequest ( nciu &, unsigned, unsigned, void * )
{
return ECA_DISCONNCHID;
}
int netiiu::readNotifyRequest ( unsigned, unsigned, unsigned, unsigned )
int netiiu::readNotifyRequest ( nciu &, cacNotify &, unsigned, unsigned )
{
return ECA_DISCONNCHID;
}
int netiiu::createChannelRequest ( unsigned, const char *, unsigned )
int netiiu::createChannelRequest ( nciu & )
{
return ECA_DISCONNCHID;
}
int netiiu::clearChannelRequest ( unsigned, unsigned )
int netiiu::clearChannelRequest ( nciu & )
{
return ECA_DISCONNCHID;
}
int netiiu::subscriptionRequest ( unsigned, unsigned, unsigned, unsigned, unsigned, bool )
int netiiu::subscriptionRequest ( netSubscription &, bool )
{
return ECA_NORMAL;
}
int netiiu::subscriptionCancelRequest ( netSubscription & )
{
return ECA_DISCONNCHID;
}
int netiiu::subscriptionCancelRequest ( unsigned, unsigned, unsigned, unsigned )
void netiiu::hostName ( char *pBuf, unsigned bufLength ) const
{
return ECA_DISCONNCHID;
if ( bufLength ) {
strncpy ( pBuf, this->pHostName (), bufLength );
pBuf[bufLength - 1u] = '\0';
}
}
const char * netiiu::pHostName () const
{
return "<disconnected>";
}
void netiiu::disconnectAllIO ( nciu & )
{
}
void netiiu::subscribeAllIO ( nciu & )
{
}
+2 -7
View File
@@ -18,14 +18,9 @@
#ifndef netiiu_ILh
#define netiiu_ILh
inline netiiu::netiiu ( cac &cacIn ) :
cacRef ( cacIn )
inline cac * netiiu::pCAC () const
{
}
inline cac & netiiu::clientCtx () const
{
return this->cacRef;
return this->pClientCtx;
}
#endif // netiiu_ILh
+3 -2
View File
@@ -93,9 +93,10 @@ public:
void operator delete ( void *pCadaver, size_t size );
private:
oldChannel &chan;
oldChannel &chan;
caEventCallBackFunc *pFunc;
void *pPrivate;
void *pPrivate;
void completionNotify ( unsigned type, unsigned long count, const void *pData );
void exceptionNotify ( int status, const char *pContext );
+1
View File
@@ -70,3 +70,4 @@ void oldSubscription::operator delete ( void *pCadaver, size_t size )
{
oldSubscription::freeList.release ( pCadaver, size );
}
+52 -34
View File
@@ -19,7 +19,8 @@
#include "iocinf.h"
recvProcessThread::recvProcessThread (cac *pcacIn) :
osiThread ( "CAC-recv-process", threadGetStackSize (threadStackSmall), threadPriorityMedium ),
osiThread ( "CAC-recv-process", threadGetStackSize (threadStackSmall),
pcacIn->getInitializingThreadsPriority () ),
pcac ( pcacIn ),
enableRefCount ( 0u ),
blockingForCompletion ( 0u ),
@@ -41,18 +42,25 @@ void recvProcessThread::entryPoint ()
SEVCHK ( status, "attaching to client context in recv process thread" );
while ( ! this->shutDown ) {
this->mutex.lock ();
if ( this->enableRefCount ) {
this->processing = true;
{
osiAutoMutex autoMutex ( this->mutex );
if ( this->enableRefCount ) {
this->processing = true;
}
}
this->mutex.unlock ();
if ( this->processing ) {
pcac->processRecvBacklog ();
}
this->processing = false;
if ( this->blockingForCompletion ) {
bool signalNeeded;
{
osiAutoMutex autoMutex ( this->mutex );
this->processing = false;
signalNeeded = this->blockingForCompletion > 0u;
}
if ( signalNeeded ) {
this->processingDone.signal ();
}
@@ -71,11 +79,12 @@ void recvProcessThread::enable ()
{
unsigned copy;
this->mutex.lock ();
assert ( this->enableRefCount < UINT_MAX );
copy = this->enableRefCount;
this->enableRefCount++;
this->mutex.unlock ();
{
osiAutoMutex autoMutex ( this->mutex );
assert ( this->enableRefCount < UINT_MAX );
copy = this->enableRefCount;
this->enableRefCount++;
}
if ( copy == 0u ) {
this->recvActivity.signal ();
@@ -84,31 +93,41 @@ void recvProcessThread::enable ()
void recvProcessThread::disable ()
{
bool waitNeeded;
bool wakeupNeeded;
this->mutex.lock ();
assert ( this->enableRefCount != 0u );
this->enableRefCount--;
if ( this->enableRefCount == 0u && this->processing ) {
waitNeeded = true;
this->blockingForCompletion++;
}
else {
waitNeeded = false;
}
this->mutex.unlock ();
{
osiAutoMutex autoMutex ( this->mutex );
if ( waitNeeded ) {
while ( this->processing ) {
this->processingDone.wait ();
if ( ! this->processing ) {
assert ( this->enableRefCount != 0u );
this->enableRefCount--;
return;
}
this->mutex.lock ();
this->blockingForCompletion--;
this->mutex.unlock ();
if ( this->blockingForCompletion ) {
this->processingDone.signal ();
else {
this->blockingForCompletion++;
}
}
while ( true ) {
this->processingDone.wait ();
{
osiAutoMutex autoMutex ( this->mutex );
if ( ! this->processing ) {
assert ( this->enableRefCount > 0u );
this->enableRefCount--;
assert ( this->blockingForCompletion > 0u );
this->blockingForCompletion--;
wakeupNeeded = this->blockingForCompletion > 0u;
break;
}
}
}
if ( wakeupNeeded ) {
this->processingDone.signal ();
}
}
void recvProcessThread::signalActivity ()
@@ -118,7 +137,7 @@ void recvProcessThread::signalActivity ()
void recvProcessThread::show ( unsigned level ) const
{
this->mutex.lock ();
osiAutoMutex autoMutex ( this->mutex );
printf ( "CA receive processing thread at %p state=%s\n",
this, this->processing ? "busy" : "idle");
if ( level > 0u ) {
@@ -139,5 +158,4 @@ void recvProcessThread::show ( unsigned level ) const
printf ( "mutex:\n" );
this->mutex.show ( level - 3u );
}
this->mutex.unlock ();
}
+2 -10
View File
@@ -13,10 +13,9 @@
#include "iocinf.h"
tcpRecvWatchdog::tcpRecvWatchdog
( double periodIn, osiTimerQueue & queueIn, bool echoProtocolAcceptedIn ) :
( double periodIn, osiTimerQueue & queueIn ) :
osiTimer ( queueIn ),
period ( periodIn ),
echoProtocolAccepted ( echoProtocolAcceptedIn ),
responsePending ( false ),
beaconAnomaly ( true )
{
@@ -37,10 +36,7 @@ void tcpRecvWatchdog::expire ()
this->forcedShutdown ();
}
else {
this->echoRequest ();
if ( this->echoProtocolAccepted ) {
this->responsePending = true;
}
this->responsePending = this->setEchoRequestPending ();
}
}
@@ -113,8 +109,4 @@ void tcpRecvWatchdog::show ( unsigned level ) const
printf ( "\tresponse pending boolean %u, beacon anomaly boolean %u\n",
this->responsePending, this->beaconAnomaly );
}
if ( level > 1u ) {
printf ( "\techo protocol accepted boolean %u\n",
this->echoProtocolAccepted );
}
}
+1243 -465
View File
File diff suppressed because it is too large Load Diff
+20
View File
@@ -0,0 +1,20 @@
/*
* $Id$
*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
+22 -24
View File
@@ -16,36 +16,27 @@
* 505 665 1831
*/
inline osiSockAddr tcpiiu::address () const
{
return this->ipToA.address ();
}
inline void * tcpiiu::operator new (size_t size)
{
return tcpiiu::freeList.allocate (size);
}
inline void tcpiiu::operator delete (void *pCadaver, size_t size)
{
tcpiiu::freeList.release (pCadaver,size);
}
inline bool tcpiiu::fullyConstructed () const
{
return this->fullyConstructedFlag;
}
inline void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const
{
this->ipToA.hostName ( pBuf, bufLength );
{
osiAutoMutex autoMutex ( this->mutex );
if ( this->pHostNameCache ) {
this->pHostNameCache->hostName ( pBuf, bufLength );
}
else {
netiiu::hostName ( pBuf, bufLength );
}
}
// deprecated - please dont use
inline const char * tcpiiu::pHostName () const
{
static char nameBuf [128];
this->ipToA.hostName ( nameBuf, sizeof ( nameBuf ) );
this->hostName ( nameBuf, sizeof ( nameBuf ) );
return nameBuf; // ouch !!
}
@@ -56,25 +47,26 @@ inline SOCKET tcpiiu::getSock () const
inline void tcpiiu::flush ()
{
this->lock ();
this->flushPending = true;
this->unlock ();
{
osiAutoMutex autoMutex ( this->mutex );
this->flushPending = true;
}
semBinaryGive ( this->sendThreadFlushSignal );
}
inline bool tcpiiu::ca_v44_ok () const
{
return CA_V44 ( CA_PROTOCOL_VERSION, this->minorProtocolVersionNumber );
return CA_V44 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion );
}
inline bool tcpiiu::ca_v42_ok () const
{
return CA_V42 ( CA_PROTOCOL_VERSION, this->minorProtocolVersionNumber );
return CA_V42 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion );
}
inline bool tcpiiu::ca_v41_ok () const
{
return CA_V41 ( CA_PROTOCOL_VERSION, this->minorProtocolVersionNumber );
return CA_V41 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion );
}
inline bool tcpiiu::alive () const
@@ -87,3 +79,9 @@ inline bool tcpiiu::alive () const
return false;
}
}
inline bhe * tcpiiu::getBHE () const
{
return this->pBHE;
}
+26 -41
View File
@@ -16,6 +16,7 @@
#include "addrList.h"
#include "inetAddrID_IL.h"
#include "netiiu_IL.h"
#include "cac_IL.h"
typedef void (*pProtoStubUDP) (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_in *pnet_addr);
@@ -291,7 +292,7 @@ bool udpiiu::repeaterInstalled ()
// udpiiu::udpiiu ()
//
udpiiu::udpiiu ( cac &cac ) :
netiiu ( cac ), shutdownCmd ( false ), sockCloseCompleted ( false )
netiiu ( &cac ), shutdownCmd ( false ), sockCloseCompleted ( false )
{
static const unsigned short PORT_ANY = 0u;
osiSockAddr addr;
@@ -367,23 +368,22 @@ udpiiu::udpiiu ( cac &cac ) :
ellInit ( &this->dest );
configureChannelAccessAddressList ( &this->dest, this->sock, this->serverPort );
if ( ellCount ( &this->dest ) == 0 ) {
genLocalExcep ( this->clientCtx (), ECA_NOSEARCHADDR, NULL );
genLocalExcep ( *this->pCAC (), ECA_NOSEARCHADDR, NULL );
}
{
unsigned priorityOfSelf = threadGetPrioritySelf ();
unsigned priorityOfRecv;
threadId tid;
threadBoolStatus tbs;
tbs = threadLowestPriorityLevelAbove ( priorityOfSelf, &priorityOfRecv );
tbs = threadLowestPriorityLevelAbove (
this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv );
if ( tbs != tbsSuccess ) {
priorityOfRecv = priorityOfSelf;
priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority ();
}
tid = threadCreate ( "CAC-UDP", priorityOfRecv,
this->recvThreadId = threadCreate ( "CAC-UDP", priorityOfRecv,
threadGetStackSize (threadStackMedium), cacRecvThreadUDP, this );
if (tid==0) {
if ( this->recvThreadId == 0 ) {
ca_printf ("CA: unable to create UDP receive thread\n");
semBinaryDestroy (this->recvThreadExitSignal);
socket_close (this->sock);
@@ -424,8 +424,6 @@ udpiiu::~udpiiu ()
// closes the udp socket and waits for its recv thread to exit
this->shutdown ();
this->detachAllChan ();
semBinaryDestroy ( this->recvThreadExitSignal );
ellFree ( &this->dest );
@@ -440,10 +438,13 @@ udpiiu::~udpiiu ()
*/
void udpiiu::shutdown ()
{
this->lock ();
bool laborNeeded = ! this->shutdownCmd;
this->shutdownCmd = true;
this->unlock ();
bool laborNeeded;
{
osiAutoMutex autoMutex ( this->mutex );
laborNeeded = ! this->shutdownCmd;
this->shutdownCmd = true;
}
if ( laborNeeded ) {
int status;
@@ -559,12 +560,12 @@ void udpiiu::searchRespAction ( const caHdr &msg, const osiSockAddr &addr )
}
if ( CA_V42 ( CA_PROTOCOL_VERSION, minorVersion ) ) {
this->clientCtx ().lookupChannelAndTransferToTCP
this->pCAC ()->lookupChannelAndTransferToTCP
( msg.m_available, msg.m_cid, USHRT_MAX, 0,
minorVersion, serverAddr );
}
else {
this->clientCtx ().lookupChannelAndTransferToTCP
this->pCAC ()->lookupChannelAndTransferToTCP
( msg.m_available, msg.m_cid, msg.m_dataType,
minorVersion, msg.m_count, serverAddr );
}
@@ -610,14 +611,14 @@ void udpiiu::beaconAction ( const caHdr &msg, const osiSockAddr &net_addr )
ina.sin_port = htons ( this->serverPort );
}
this->clientCtx ().beaconNotify ( ina );
this->pCAC ()->beaconNotify ( ina );
return;
}
void udpiiu::repeaterAckAction ( const caHdr &, const osiSockAddr &)
{
this->clientCtx ().repeaterSubscribeConfirmNotify ();
this->pCAC ()->repeaterSubscribeConfirmNotify ();
}
void udpiiu::notHereRespAction ( const caHdr &, const osiSockAddr &)
@@ -709,19 +710,6 @@ int udpiiu::postMsg ( const osiSockAddr &net_addr,
return ECA_NORMAL;
}
void udpiiu::hostName ( char *pBuf, unsigned bufLength ) const
{
if ( bufLength ) {
strncpy ( pBuf, this->pHostName (), bufLength );
pBuf[bufLength - 1u] = '\0';
}
}
const char * udpiiu::pHostName () const
{
return "<disconnected>";
}
/*
* udpiiu::pushDatagramMsg ()
*/
@@ -740,10 +728,9 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e
return false;
}
this->lock ();
osiAutoMutex autoMutex ( this->mutex );
if ( msgsize + this->nBytesInXmitBuf > sizeof ( this->xmitBuf ) ) {
this->unlock ();
return false;
}
@@ -757,8 +744,6 @@ bool udpiiu::pushDatagramMsg ( const caHdr &msg, const void *pExt, ca_uint16_t e
pbufmsg->m_postsize = htons ( allignedExtSize );
this->nBytesInXmitBuf += msgsize;
this->unlock ();
return true;
}
@@ -769,10 +754,9 @@ void udpiiu::flush ()
{
osiSockAddrNode *pNode;
this->lock ();
osiAutoMutex autoMutex ( this->mutex );
if ( this->nBytesInXmitBuf == 0u ) {
this->unlock ();
return;
}
@@ -825,8 +809,6 @@ void udpiiu::flush ()
}
this->nBytesInXmitBuf = 0u;
this->unlock ();
}
SOCKET udpiiu::getSock () const
@@ -836,7 +818,7 @@ SOCKET udpiiu::getSock () const
void udpiiu::show ( unsigned level ) const
{
this->lock ();
osiAutoMutex autoMutex ( this->mutex );
printf ( "Datagram IO circuit (and disconnected channel repository)\n");
if ( level > 1u ) {
this->netiiu::show ( level - 1u );
@@ -853,6 +835,9 @@ void udpiiu::show ( unsigned level ) const
printf ( "\trecv thread exit signal:\n" );
semBinaryShow ( this->recvThreadExitSignal, level-3u );
}
this->unlock ();
}
bool udpiiu::isCurrentThread () const
{
return ( this->recvThreadId == threadGetIdSelf () );
}