1583 lines
46 KiB
C++
1583 lines
46 KiB
C++
|
|
/* $Id$
 *
 *      L O S A L A M O S
 *      Los Alamos National Laboratory
 *      Los Alamos, New Mexico 87545
 *
 *      Copyright, 1986, The Regents of the University of California.
 *
 *      Author: Jeff Hill
 */
|
|
|
|
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
|
|
|
|
#include "epicsMemory.h"
|
|
#include "osiProcess.h"
|
|
#include "osiSigPipeIgnore.h"
|
|
#include "envDefs.h"
|
|
|
|
#include "iocinf.h"
|
|
#include "cac.h"
|
|
#include "inetAddrID.h"
|
|
#include "virtualCircuit.h"
|
|
#include "netIO.h"
|
|
#include "syncGroup.h"
|
|
#include "nciu.h"
|
|
#include "autoPtrRecycle.h"
|
|
#include "searchTimer.h"
|
|
#include "repeaterSubscribeTimer.h"
|
|
|
|
#define epicsExportSharedSymbols
|
|
#include "udpiiu.h"
|
|
#include "bhe.h"
|
|
#include "net_convert.h"
|
|
#undef epicsExportSharedSymbols
|
|
|
|
// TCP response dispatch table
|
|
const cac::pProtoStubTCP cac::tcpJumpTableCAC [] =
|
|
{
|
|
&cac::noopAction,
|
|
&cac::eventRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::readRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::exceptionRespAction,
|
|
&cac::clearChannelRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::readNotifyRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::claimCIURespAction,
|
|
&cac::writeNotifyRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::accessRightsRespAction,
|
|
&cac::echoRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::badTCPRespAction,
|
|
&cac::verifyAndDisconnectChan,
|
|
&cac::verifyAndDisconnectChan
|
|
};
|
|
|
|
// TCP exception dispatch table
|
|
const cac::pExcepProtoStubTCP cac::tcpExcepJumpTableCAC [] =
|
|
{
|
|
&cac::defaultExcep, // CA_PROTO_NOOP
|
|
&cac::eventAddExcep, // CA_PROTO_EVENT_ADD
|
|
&cac::defaultExcep, // CA_PROTO_EVENT_CANCEL
|
|
&cac::readExcep, // CA_PROTO_READ
|
|
&cac::writeExcep, // CA_PROTO_WRITE
|
|
&cac::defaultExcep, // CA_PROTO_SNAPSHOT
|
|
&cac::defaultExcep, // CA_PROTO_SEARCH
|
|
&cac::defaultExcep, // CA_PROTO_BUILD
|
|
&cac::defaultExcep, // CA_PROTO_EVENTS_OFF
|
|
&cac::defaultExcep, // CA_PROTO_EVENTS_ON
|
|
&cac::defaultExcep, // CA_PROTO_READ_SYNC
|
|
&cac::defaultExcep, // CA_PROTO_ERROR
|
|
&cac::defaultExcep, // CA_PROTO_CLEAR_CHANNEL
|
|
&cac::defaultExcep, // CA_PROTO_RSRV_IS_UP
|
|
&cac::defaultExcep, // CA_PROTO_NOT_FOUND
|
|
&cac::readNotifyExcep, // CA_PROTO_READ_NOTIFY
|
|
&cac::defaultExcep, // CA_PROTO_READ_BUILD
|
|
&cac::defaultExcep, // REPEATER_CONFIRM
|
|
&cac::defaultExcep, // CA_PROTO_CLAIM_CIU
|
|
&cac::writeNotifyExcep, // CA_PROTO_WRITE_NOTIFY
|
|
&cac::defaultExcep, // CA_PROTO_CLIENT_NAME
|
|
&cac::defaultExcep, // CA_PROTO_HOST_NAME
|
|
&cac::defaultExcep, // CA_PROTO_ACCESS_RIGHTS
|
|
&cac::defaultExcep, // CA_PROTO_ECHO
|
|
&cac::defaultExcep, // REPEATER_REGISTER
|
|
&cac::defaultExcep, // CA_PROTO_SIGNAL
|
|
&cac::defaultExcep, // CA_PROTO_CLAIM_CIU_FAILED
|
|
&cac::defaultExcep // CA_PROTO_SERVER_DISCONN
|
|
};
|
|
|
|
//
|
|
// cac::cac ()
|
|
//
|
|
cac::cac ( cacNotify ¬ifyIn, bool enablePreemptiveCallbackIn ) :
|
|
ipToAEngine ( "caIPAddrToAsciiEngine" ),
|
|
chanTable ( 1024 ),
|
|
ioTable ( 1024 ),
|
|
sgTable ( 128 ),
|
|
beaconTable ( 1024 ),
|
|
pudpiiu ( 0 ),
|
|
pSearchTmr ( 0 ),
|
|
pRepeaterSubscribeTmr ( 0 ),
|
|
tcpSmallRecvBufFreeList ( 0 ),
|
|
tcpLargeRecvBufFreeList ( 0 ),
|
|
notify ( notifyIn ),
|
|
ioNotifyInProgressId ( 0 ),
|
|
initializingThreadsPriority ( epicsThreadGetPrioritySelf () ),
|
|
threadsBlockingOnNotifyCompletion ( 0u ),
|
|
maxRecvBytesTCP ( MAX_TCP ),
|
|
enablePreemptiveCallback ( enablePreemptiveCallbackIn ),
|
|
ioInProgress ( false )
|
|
{
|
|
long status;
|
|
unsigned abovePriority;
|
|
|
|
if ( ! osiSockAttach () ) {
|
|
throwWithLocation ( caErrorCode (ECA_INTERNAL) );
|
|
}
|
|
|
|
{
|
|
epicsThreadBooleanStatus tbs;
|
|
|
|
tbs = epicsThreadLowestPriorityLevelAbove ( this->initializingThreadsPriority, &abovePriority);
|
|
if ( tbs != epicsThreadBooleanStatusSuccess ) {
|
|
abovePriority = this->initializingThreadsPriority;
|
|
}
|
|
}
|
|
|
|
installSigPipeIgnore ();
|
|
|
|
{
|
|
char tmp[256];
|
|
size_t len;
|
|
osiGetUserNameReturn gunRet;
|
|
|
|
gunRet = osiGetUserName ( tmp, sizeof (tmp) );
|
|
if ( gunRet != osiGetUserNameSuccess ) {
|
|
tmp[0] = '\0';
|
|
}
|
|
len = strlen ( tmp ) + 1;
|
|
this->pUserName = new char [len];
|
|
if ( ! this->pUserName ) {
|
|
throwWithLocation ( caErrorCode (ECA_ALLOCMEM) );
|
|
}
|
|
strncpy ( this->pUserName, tmp, len );
|
|
}
|
|
|
|
this->programBeginTime = epicsTime::getCurrent ();
|
|
|
|
status = envGetDoubleConfigParam ( &EPICS_CA_CONN_TMO, &this->connTMO );
|
|
if ( status ) {
|
|
this->connTMO = CA_CONN_VERIFY_PERIOD;
|
|
this->printf ( "EPICS \"%s\" double fetch failed\n", EPICS_CA_CONN_TMO.name);
|
|
this->printf ( "Defaulting \"%s\" = %f\n", EPICS_CA_CONN_TMO.name, this->connTMO);
|
|
}
|
|
|
|
long maxBytesAsALong;
|
|
status = envGetLongConfigParam ( &EPICS_CA_MAX_ARRAY_BYTES, &maxBytesAsALong );
|
|
if ( status || maxBytesAsALong < 0 ) {
|
|
errlogPrintf ( "cac: EPICS_CA_MAX_ARRAY_BYTES was not a positive integer\n" );
|
|
}
|
|
else {
|
|
/* allow room for the protocol header so that they get the array size they requested */
|
|
static const unsigned headerSize = sizeof ( caHdr ) + 2 * sizeof ( ca_uint32_t );
|
|
ca_uint32_t maxBytes = ( unsigned ) maxBytesAsALong;
|
|
if ( maxBytes < 0xffffffff - headerSize ) {
|
|
maxBytes += headerSize;
|
|
}
|
|
else {
|
|
maxBytes = 0xffffffff;
|
|
}
|
|
if ( maxBytes < MAX_TCP ) {
|
|
errlogPrintf ( "cac: EPICS_CA_MAX_ARRAY_BYTES was rounded up to %u\n", MAX_TCP );
|
|
}
|
|
else {
|
|
this->maxRecvBytesTCP = maxBytes;
|
|
}
|
|
}
|
|
freeListInitPvt ( &this->tcpSmallRecvBufFreeList, MAX_TCP, 1 );
|
|
if ( ! this->tcpSmallRecvBufFreeList ) {
|
|
free ( this->pUserName );
|
|
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
|
|
}
|
|
|
|
freeListInitPvt ( &this->tcpLargeRecvBufFreeList, this->maxRecvBytesTCP, 1 );
|
|
if ( ! this->tcpLargeRecvBufFreeList ) {
|
|
free ( this->pUserName );
|
|
freeListCleanup ( this->tcpSmallRecvBufFreeList );
|
|
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
|
|
}
|
|
|
|
this->pTimerQueue = & epicsTimerQueueActive::allocate ( false, abovePriority );
|
|
if ( ! this->pTimerQueue ) {
|
|
free ( this->pUserName );
|
|
freeListCleanup ( this->tcpSmallRecvBufFreeList );
|
|
freeListCleanup ( this->tcpLargeRecvBufFreeList );
|
|
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
|
|
}
|
|
|
|
//
|
|
// unfortunately, this must be created here in the
|
|
// constructor, and not on demand (only when it is needed)
|
|
// because the enable reference count must be
|
|
// maintained whenever this object exists.
|
|
//
|
|
this->pRecvProcThread = new recvProcessThread ( this );
|
|
if ( ! this->pRecvProcThread ) {
|
|
this->pTimerQueue->release ();
|
|
free ( this->pUserName );
|
|
freeListCleanup ( this->tcpSmallRecvBufFreeList );
|
|
freeListCleanup ( this->tcpLargeRecvBufFreeList );
|
|
throwWithLocation ( caErrorCode ( ECA_ALLOCMEM ) );
|
|
}
|
|
else if ( this->enablePreemptiveCallback ) {
|
|
// only after this->pRecvProcThread is valid
|
|
this->enableCallbackPreemption ();
|
|
}
|
|
}
|
|
|
|
cac::~cac ()
|
|
{
|
|
this->enableCallbackPreemption ();
|
|
|
|
//
|
|
// make certain that process thread isnt deleting
|
|
// tcpiiu objects at the same that this thread is
|
|
//
|
|
if ( this->pRecvProcThread ) {
|
|
this->pRecvProcThread->disable ();
|
|
}
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
if ( this->pudpiiu ) {
|
|
// this blocks until the UDP thread exits so that
|
|
// it will not sneak in any new clients
|
|
this->pudpiiu->shutdown ();
|
|
}
|
|
}
|
|
|
|
//
|
|
// shutdown all tcp connections and wait for threads to exit
|
|
//
|
|
while ( true ) {
|
|
tcpiiu * piiu;
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
if ( ( piiu = this->iiuList.get() ) ) {
|
|
piiu->disconnectAllChan ( limboIIU );
|
|
}
|
|
}
|
|
if ( ! piiu ) {
|
|
break;
|
|
}
|
|
piiu->destroy ();
|
|
}
|
|
|
|
delete this->pRecvProcThread;
|
|
delete this->pRepeaterSubscribeTmr;
|
|
delete this->pSearchTmr;
|
|
|
|
freeListCleanup ( this->tcpSmallRecvBufFreeList );
|
|
freeListCleanup ( this->tcpLargeRecvBufFreeList );
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
if ( this->pudpiiu ) {
|
|
|
|
//
|
|
// make certain that the UDP thread isnt starting
|
|
// up new clients.
|
|
//
|
|
this->pudpiiu->disconnectAllChan ( limboIIU );
|
|
}
|
|
}
|
|
|
|
if ( ! this->enablePreemptiveCallback ) {
|
|
this->notify.fdWasDestroyed ( this->pudpiiu->getSock() );
|
|
}
|
|
delete this->pudpiiu;
|
|
delete this->pUserName;
|
|
|
|
this->beaconTable.traverse ( &bhe::destroy );
|
|
|
|
// if we get here and the IO is still attached then we have a
|
|
// leaked io block that was not registered with a channel.
|
|
if ( this->ioTable.numEntriesInstalled () ) {
|
|
this->printf ( "CAC %u orphaned IO items?\n",
|
|
this->ioTable.numEntriesInstalled () );
|
|
}
|
|
|
|
osiSockRelease ();
|
|
|
|
this->pTimerQueue->release ();
|
|
}
|
|
|
|
void cac::processRecvBacklog ()
|
|
{
|
|
tsDLList < tcpiiu > deadIIU;
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
tsDLIterBD < tcpiiu > piiu = this->iiuList.firstIter ();
|
|
while ( piiu.valid () ) {
|
|
tsDLIterBD < tcpiiu > pNext = piiu;
|
|
pNext++;
|
|
|
|
if ( ! piiu->alive () ) {
|
|
assert ( this->pudpiiu && this->pSearchTmr );
|
|
|
|
piiu->getBHE().unbindFromIIU ();
|
|
|
|
if ( piiu->channelCount () ) {
|
|
char hostNameTmp[64];
|
|
piiu->hostName ( hostNameTmp, sizeof ( hostNameTmp ) );
|
|
genLocalExcep ( *this, ECA_DISCONN, hostNameTmp );
|
|
}
|
|
|
|
piiu->disconnectAllChan ( *this->pudpiiu );
|
|
|
|
// make certain that:
|
|
// 1) this is called from the appropriate thread
|
|
// 2) lock is not held while in call back
|
|
if ( ! this->enablePreemptiveCallback ) {
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
this->notify.fdWasDestroyed ( piiu->getSock() );
|
|
}
|
|
this->iiuList.remove ( *piiu );
|
|
deadIIU.add ( *piiu ); // postpone destroy and avoid deadlock
|
|
}
|
|
else {
|
|
// make certain that:
|
|
// 1) this is called from the appropriate thread
|
|
// 2) lock is not held while in call back
|
|
if ( piiu->trueOnceOnly() && ! this->enablePreemptiveCallback ) {
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
this->notify.fdWasCreated ( piiu->getSock() );
|
|
}
|
|
piiu->processIncoming ();
|
|
}
|
|
|
|
piiu = pNext;
|
|
}
|
|
}
|
|
if ( deadIIU.count() ) {
|
|
while ( tcpiiu *piiu = deadIIU.get() ) {
|
|
piiu->destroy ();
|
|
}
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
this->pSearchTmr->resetPeriod ( 0.0 );
|
|
}
|
|
}
|
|
}
|
|
|
|
//
|
|
// set the push pending flag on all virtual circuits
|
|
//
|
|
void cac::flushRequest ()
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
tsDLIterBD <tcpiiu> piiu = this->iiuList.firstIter ();
|
|
while ( piiu.valid () ) {
|
|
piiu->flushRequest ();
|
|
piiu++;
|
|
}
|
|
}
|
|
|
|
unsigned cac::connectionCount () const
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
return this->iiuList.count ();
|
|
}
|
|
|
|
void cac::show ( unsigned level ) const
|
|
{
|
|
epicsAutoMutex autoMutex2 ( this->mutex );
|
|
|
|
::printf ( "Channel Access Client Context at %p for user %s\n",
|
|
static_cast <const void *> ( this ), this->pUserName );
|
|
if ( level > 0u ) {
|
|
tsDLIterConstBD < tcpiiu > piiu = this->iiuList.firstIter ();
|
|
while ( piiu.valid () ) {
|
|
piiu->show ( level - 1u );
|
|
piiu++;
|
|
}
|
|
|
|
::printf ( "\tconnection time out watchdog period %f\n", this->connTMO );
|
|
::printf ( "\tpreemptive calback is %s\n",
|
|
this->enablePreemptiveCallback ? "enabled" : "disabled" );
|
|
::printf ( "list of installed services:\n" );
|
|
this->services.show ( level - 1u );
|
|
}
|
|
|
|
if ( level > 1u ) {
|
|
if ( this->pudpiiu ) {
|
|
this->pudpiiu->show ( level - 2u );
|
|
}
|
|
this->ioCounter.show ( level - 2u );
|
|
}
|
|
|
|
if ( level > 2u ) {
|
|
::printf ( "Program begin time:\n");
|
|
this->programBeginTime.show ( level - 3u );
|
|
::printf ( "Channel identifier hash table:\n" );
|
|
this->chanTable.show ( level - 3u );
|
|
::printf ( "IO identifier hash table:\n" );
|
|
this->ioTable.show ( level - 3u );
|
|
::printf ( "Synchronous group identifier hash table:\n" );
|
|
this->sgTable.show ( level - 3u );
|
|
::printf ( "Beacon source identifier hash table:\n" );
|
|
this->beaconTable.show ( level - 3u );
|
|
if ( this->pTimerQueue ) {
|
|
::printf ( "Timer queue:\n" );
|
|
this->pTimerQueue->show ( level - 3u );
|
|
}
|
|
if ( this->pRecvProcThread ) {
|
|
::printf ( "incoming messages processing thread:\n" );
|
|
this->pRecvProcThread->show ( level - 3u );
|
|
}
|
|
if ( this->pSearchTmr ) {
|
|
::printf ( "search message timer:\n" );
|
|
this->pSearchTmr->show ( level - 3u );
|
|
}
|
|
if ( this->pRepeaterSubscribeTmr ) {
|
|
::printf ( "repeater subscribee timer:\n" );
|
|
this->pRepeaterSubscribeTmr->show ( level - 3u );
|
|
}
|
|
::printf ( "IP address to name conversion engine:\n" );
|
|
this->ipToAEngine.show ( level - 3u );
|
|
}
|
|
|
|
if ( level > 3u ) {
|
|
::printf ( "Default mutex:\n");
|
|
this->mutex.show ( level - 4u );
|
|
}
|
|
}
|
|
|
|
void cac::signalRecvActivity ()
|
|
{
|
|
if ( this->pRecvProcThread ) {
|
|
this->pRecvProcThread->signalActivity ();
|
|
}
|
|
}
|
|
|
|
/*
|
|
* cac::beaconNotify
|
|
*/
|
|
void cac::beaconNotify ( const inetAddrID &addr, const epicsTime ¤tTime )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
if ( ! this->pudpiiu ) {
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* look for it in the hash table
|
|
*/
|
|
bhe *pBHE = this->beaconTable.lookup ( addr );
|
|
if ( pBHE ) {
|
|
/*
|
|
* return if the beacon period has not changed significantly
|
|
*/
|
|
if ( ! pBHE->updatePeriod ( this->programBeginTime, currentTime ) ) {
|
|
return;
|
|
}
|
|
}
|
|
else {
|
|
/*
|
|
* This is the first beacon seen from this server.
|
|
* Wait until 2nd beacon is seen before deciding
|
|
* if it is a new server (or just the first
|
|
* time that we have seen a server's beacon
|
|
* shortly after the program started up)
|
|
*/
|
|
pBHE = new bhe ( currentTime, addr );
|
|
if ( pBHE ) {
|
|
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
|
|
pBHE->destroy ();
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* This part is needed when many machines
|
|
* have channels in a disconnected state that
|
|
* dont exist anywhere on the network. This insures
|
|
* that we dont have many CA clients synchronously
|
|
* flooding the network with broadcasts (and swamping
|
|
* out requests for valid channels).
|
|
*
|
|
* I fetch the local UDP port number and use the low
|
|
* order bits as a pseudo random delay to prevent every
|
|
* one from replying at once.
|
|
*/
|
|
if ( this->pSearchTmr ) {
|
|
static const double portTicksPerSec = 1000u;
|
|
static const unsigned portBasedDelayMask = 0xff;
|
|
unsigned port = this->pudpiiu->getPort ();
|
|
double delay = ( port & portBasedDelayMask );
|
|
delay /= portTicksPerSec;
|
|
this->pSearchTmr->resetPeriod ( delay );
|
|
}
|
|
|
|
this->pudpiiu->resetChannelRetryCounts ();
|
|
|
|
# if DEBUG
|
|
{
|
|
char buf[64];
|
|
ipAddrToDottedIP (pnet_addr, buf, sizeof ( buf ) );
|
|
printf ("new server available: %s\n", buf);
|
|
}
|
|
# endif
|
|
}
|
|
|
|
int cac::pendIO ( const double &timeout )
|
|
{
|
|
// prevent recursion nightmares by disabling calls to
|
|
// pendIO () from within a CA callback
|
|
if ( this->pRecvProcThread->isCurrentThread () ) {
|
|
return ECA_EVDISALLOW;
|
|
}
|
|
|
|
this->enableCallbackPreemption ();
|
|
|
|
this->flushRequest ();
|
|
|
|
int status = ECA_NORMAL;
|
|
epicsTime beg_time = epicsTime::getCurrent ();
|
|
double remaining;
|
|
if ( timeout == 0.0 ) {
|
|
remaining = 60.0;
|
|
}
|
|
else{
|
|
remaining = timeout;
|
|
}
|
|
while ( this->ioCounter.currentCount () > 0 ) {
|
|
if ( remaining < CAC_SIGNIFICANT_SELECT_DELAY ) {
|
|
status = ECA_TIMEOUT;
|
|
break;
|
|
}
|
|
this->ioCounter.waitForCompletion ( remaining );
|
|
if ( timeout != 0.0 ) {
|
|
double delay = epicsTime::getCurrent () - beg_time;
|
|
remaining = timeout - delay;
|
|
}
|
|
}
|
|
|
|
this->ioCounter.cleanUp ();
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
if ( this->pudpiiu ) {
|
|
this->pudpiiu->connectTimeoutNotify ();
|
|
}
|
|
}
|
|
|
|
this->disableCallbackPreemption ();
|
|
|
|
return status;
|
|
}
|
|
|
|
int cac::pendEvent ( const double &timeout )
|
|
{
|
|
// prevent recursion nightmares by disabling calls to
|
|
// pendIO () from within a CA callback
|
|
if ( this->pRecvProcThread->isCurrentThread () ) {
|
|
return ECA_EVDISALLOW;
|
|
}
|
|
|
|
this->enableCallbackPreemption ();
|
|
|
|
this->flushRequest ();
|
|
|
|
if ( timeout == 0.0 ) {
|
|
while ( true ) {
|
|
epicsThreadSleep ( 60.0 );
|
|
}
|
|
}
|
|
else if ( timeout >= CAC_SIGNIFICANT_SELECT_DELAY ) {
|
|
epicsThreadSleep ( timeout );
|
|
}
|
|
|
|
this->disableCallbackPreemption ();
|
|
|
|
return ECA_TIMEOUT;
|
|
}
|
|
|
|
bool cac::ioComplete () const
|
|
{
|
|
if ( this->ioCounter.currentCount () == 0u ) {
|
|
return true;
|
|
}
|
|
else{
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// this is to only be used by early protocol revisions
|
|
bool cac::connectChannel ( unsigned id )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
nciu * pChan = this->chanTable.lookup ( id );
|
|
if ( pChan ) {
|
|
pChan->connect ();
|
|
return true;
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void cac::installCASG ( CASG &sg )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
this->sgTable.add ( sg );
|
|
}
|
|
|
|
void cac::uninstallCASG ( CASG &sg )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
this->sgTable.remove ( sg );
|
|
}
|
|
|
|
CASG * cac::lookupCASG ( unsigned id )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
CASG * psg = this->sgTable.lookup ( id );
|
|
if ( psg ) {
|
|
if ( ! psg->verify () ) {
|
|
psg = 0;
|
|
}
|
|
}
|
|
return psg;
|
|
}
|
|
|
|
void cac::registerService ( cacService &service )
|
|
{
|
|
this->services.registerService ( service );
|
|
}
|
|
|
|
cacChannel & cac::createChannel ( const char *pName, cacChannelNotify &chan )
|
|
{
|
|
cacChannel *pIO;
|
|
|
|
pIO = this->services.createChannel ( pName, chan );
|
|
if ( ! pIO ) {
|
|
pIO = cacGlobalServiceList.createChannel ( pName, chan );
|
|
if ( ! pIO ) {
|
|
if ( ! this->pudpiiu || ! this->pSearchTmr ) {
|
|
if ( ! this->setupUDP () ) {
|
|
throw ECA_INTERNAL;
|
|
}
|
|
}
|
|
epics_auto_ptr < cacChannel > pNetChan
|
|
( new nciu ( *this, limboIIU, chan, pName ) );
|
|
if ( pNetChan.get() ) {
|
|
return *pNetChan.release ();
|
|
}
|
|
else {
|
|
throw std::bad_alloc ();
|
|
}
|
|
}
|
|
}
|
|
return *pIO;
|
|
}
|
|
|
|
void cac::installNetworkChannel ( nciu & chan, netiiu * & piiu )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
this->chanTable.add ( chan );
|
|
this->pudpiiu->attachChannel ( chan );
|
|
piiu = this->pudpiiu;
|
|
this->pSearchTmr->resetPeriod ( 0.0 );
|
|
}
|
|
|
|
bool cac::setupUDP ()
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
if ( ! this->pudpiiu ) {
|
|
this->pudpiiu = new udpiiu ( *this );
|
|
if ( ! this->pudpiiu ) {
|
|
return false;
|
|
}
|
|
if ( ! this->enablePreemptiveCallback ) {
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
this->notify.fdWasCreated ( this->pudpiiu->getSock() );
|
|
}
|
|
}
|
|
|
|
if ( ! this->pSearchTmr ) {
|
|
this->pSearchTmr = new searchTimer ( *this->pudpiiu, *this->pTimerQueue, this->mutex );
|
|
if ( ! this->pSearchTmr ) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if ( ! this->pRepeaterSubscribeTmr ) {
|
|
this->pRepeaterSubscribeTmr = new repeaterSubscribeTimer ( *this->pudpiiu, *this->pTimerQueue );
|
|
if ( ! this->pRepeaterSubscribeTmr ) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void cac::enableCallbackPreemption ()
|
|
{
|
|
if ( this->pRecvProcThread ) {
|
|
this->pRecvProcThread->enable ();
|
|
}
|
|
}
|
|
|
|
void cac::disableCallbackPreemption ()
|
|
{
|
|
if ( this->pRecvProcThread ) {
|
|
this->pRecvProcThread->disable ();
|
|
}
|
|
}
|
|
|
|
void cac::repeaterSubscribeConfirmNotify ()
|
|
{
|
|
if ( this->pRepeaterSubscribeTmr ) {
|
|
this->pRepeaterSubscribeTmr->confirmNotify ();
|
|
}
|
|
}
|
|
|
|
bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
|
|
ca_uint16_t typeCode, arrayElementCount count,
|
|
unsigned minorVersionNumber, const osiSockAddr &addr,
|
|
const epicsTime & currentTime )
|
|
{
|
|
unsigned retrySeqNumber;
|
|
|
|
if ( addr.sa.sa_family != AF_INET ) {
|
|
return false;
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
nciu *chan;
|
|
|
|
/*
|
|
* ignore search replies for deleted channels
|
|
*/
|
|
chan = this->chanTable.lookup ( cid );
|
|
if ( ! chan ) {
|
|
return true;
|
|
}
|
|
|
|
retrySeqNumber = chan->getRetrySeqNo ();
|
|
|
|
/*
|
|
* Ignore duplicate search replies
|
|
*/
|
|
if ( chan->getPIIU()->isVirtaulCircuit( chan->pName(), addr ) ) {
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* look for an existing virtual circuit
|
|
*/
|
|
tcpiiu *piiu;
|
|
bhe *pBHE = this->beaconTable.lookup ( addr.ia );
|
|
if ( pBHE ) {
|
|
piiu = pBHE->getIIU ();
|
|
if ( piiu ) {
|
|
if ( ! piiu->alive () ) {
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
else {
|
|
pBHE = new bhe ( epicsTime (), addr.ia );
|
|
if ( pBHE ) {
|
|
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
|
|
pBHE->destroy ();
|
|
return true;
|
|
}
|
|
}
|
|
else {
|
|
return true;
|
|
}
|
|
piiu = 0;
|
|
}
|
|
|
|
if ( ! piiu ) {
|
|
try {
|
|
piiu = new tcpiiu ( *this, this->connTMO, *this->pTimerQueue,
|
|
addr, minorVersionNumber, *pBHE, this->ipToAEngine );
|
|
if ( ! piiu ) {
|
|
return true;
|
|
}
|
|
this->iiuList.add ( *piiu );
|
|
|
|
}
|
|
catch ( ... ) {
|
|
this->printf ( "CAC: Exception caught during virtual circuit creation\n" );
|
|
return true;
|
|
}
|
|
}
|
|
|
|
this->pudpiiu->detachChannel ( *chan );
|
|
chan->searchReplySetUp ( *piiu, sid, typeCode, count );
|
|
piiu->attachChannel ( *chan );
|
|
|
|
chan->createChannelRequest ();
|
|
piiu->flushRequest ();
|
|
|
|
if ( ! piiu->ca_v42_ok () ) {
|
|
chan->connect ();
|
|
}
|
|
|
|
if ( this->pSearchTmr ) {
|
|
this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime );
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void cac::uninstallChannel ( nciu & chan )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
nciu *pChan = this->chanTable.remove ( chan );
|
|
assert ( pChan = &chan );
|
|
this->flushIfRequired ( chan );
|
|
chan.getPIIU()->clearChannelRequest ( chan );
|
|
chan.getPIIU()->detachChannel ( chan );
|
|
}
|
|
|
|
int cac::printf ( const char *pformat, ... ) const
|
|
{
|
|
va_list theArgs;
|
|
int status;
|
|
|
|
va_start ( theArgs, pformat );
|
|
|
|
status = this->vPrintf ( pformat, theArgs );
|
|
|
|
va_end ( theArgs );
|
|
|
|
return status;
|
|
}
|
|
|
|
// lock must be applied before calling this cac private routine
|
|
void cac::flushIfRequired ( nciu &chan )
|
|
{
|
|
if ( chan.getPIIU()->flushBlockThreshold() ) {
|
|
// the process thread is not permitted to flush as this
|
|
// can result in a push / pull deadlock on the TCP pipe.
|
|
// Instead, the process thread scheduals the flush with the
|
|
// send thread which runs at a higher priority than the
|
|
// send thread. The same applies to the UDP thread for
|
|
// locking hierarchy reasons.
|
|
bool blockPermit = true;
|
|
if ( this->pRecvProcThread ) {
|
|
if ( this->pRecvProcThread->isCurrentThread () ) {
|
|
blockPermit = false;
|
|
}
|
|
}
|
|
if ( this->pudpiiu ) {
|
|
if ( this->pudpiiu->isCurrentThread () ) {
|
|
blockPermit = false;
|
|
}
|
|
}
|
|
this->flushRequest ();
|
|
if ( blockPermit ) {
|
|
chan.getPIIU()->blockUntilSendBacklogIsReasonable ( this->mutex );
|
|
}
|
|
}
|
|
else {
|
|
chan.getPIIU()->flushRequestIfAboveEarlyThreshold ();
|
|
}
|
|
}
|
|
|
|
void cac::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
this->flushIfRequired ( chan );
|
|
chan.getPIIU()->writeRequest ( chan, type, nElem, pValue );
|
|
}
|
|
|
|
cacChannel::ioid cac::writeNotifyRequest ( nciu &chan, unsigned type, unsigned nElem,
|
|
const void *pValue, cacWriteNotify ¬ifyIn )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
autoPtrRecycle < netWriteNotifyIO > pIO ( *this, netWriteNotifyIO::factory (
|
|
this->freeListWriteNotifyIO, chan, notifyIn ) );
|
|
if ( pIO.get() ) {
|
|
this->ioTable.add ( *pIO );
|
|
chan.cacPrivateListOfIO::eventq.add ( *pIO );
|
|
this->flushIfRequired ( chan );
|
|
chan.getPIIU()->writeNotifyRequest (
|
|
chan, *pIO, type, nElem, pValue );
|
|
return pIO.release()->getId ();
|
|
}
|
|
else {
|
|
throw std::bad_alloc ();
|
|
}
|
|
}
|
|
|
|
cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type, unsigned nElem, cacReadNotify ¬ifyIn )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
autoPtrRecycle < netReadNotifyIO > pIO ( *this, netReadNotifyIO::factory (
|
|
this->freeListReadNotifyIO, chan, notifyIn ) );
|
|
if ( pIO.get() ) {
|
|
this->flushIfRequired ( chan );
|
|
this->ioTable.add ( *pIO );
|
|
chan.cacPrivateListOfIO::eventq.add ( *pIO );
|
|
chan.getPIIU()->readNotifyRequest ( chan, *pIO, type, nElem );
|
|
return pIO.release()->getId ();
|
|
}
|
|
else {
|
|
throw std::bad_alloc ();
|
|
}
|
|
}
|
|
|
|
void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
|
|
{
|
|
bool signalNeeded;
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
baseNMIU * pmiu = this->ioTable.remove ( id );
|
|
if ( pmiu ) {
|
|
chan.cacPrivateListOfIO::eventq.remove ( *pmiu );
|
|
pmiu->destroy ( *this );
|
|
}
|
|
assert ( this->threadsBlockingOnNotifyCompletion < UINT_MAX );
|
|
this->threadsBlockingOnNotifyCompletion++;
|
|
while ( this->ioInProgress && this->ioNotifyInProgressId == id ) {
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
this->notifyCompletionEvent.wait ( 0.5 );
|
|
}
|
|
assert ( this->threadsBlockingOnNotifyCompletion > 0u );
|
|
this->threadsBlockingOnNotifyCompletion--;
|
|
signalNeeded = this->threadsBlockingOnNotifyCompletion > 0u;
|
|
}
|
|
if ( signalNeeded ) {
|
|
this->notifyCompletionEvent.signal ();
|
|
}
|
|
}
|
|
|
|
void cac::ioShow ( const cacChannel::ioid &id, unsigned level ) const
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
baseNMIU * pmiu = this->ioTable.lookup ( id );
|
|
if ( pmiu ) {
|
|
pmiu->show ( level );
|
|
}
|
|
}
|
|
|
|
void cac::ioCompletionNotify ( unsigned id, unsigned type,
|
|
arrayElementCount count, const void *pData )
|
|
{
|
|
baseNMIU * pmiu = this->ioTable.lookup ( id );
|
|
if ( ! pmiu ) {
|
|
return;
|
|
}
|
|
assert ( ! this->ioInProgress );
|
|
this->ioInProgress = true;
|
|
this->ioNotifyInProgressId = id;
|
|
{
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
pmiu->completion ( type, count, pData );
|
|
}
|
|
// threads blocked canceling this IO will wait
|
|
// until we stop processing this IO
|
|
this->ioInProgress = false;
|
|
if ( this->threadsBlockingOnNotifyCompletion ) {
|
|
this->notifyCompletionEvent.signal ();
|
|
}
|
|
}
|
|
|
|
void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext )
|
|
{
|
|
baseNMIU * pmiu = this->ioTable.lookup ( id );
|
|
if ( ! pmiu ) {
|
|
return;
|
|
}
|
|
assert ( ! this->ioInProgress );
|
|
this->ioInProgress = true;
|
|
this->ioNotifyInProgressId = id;
|
|
{
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
pmiu->exception ( status, pContext );
|
|
}
|
|
// threads blocked canceling this IO will wait
|
|
// until we stop processing this IO
|
|
this->ioInProgress = false;
|
|
if ( this->threadsBlockingOnNotifyCompletion ) {
|
|
this->notifyCompletionEvent.signal ();
|
|
}
|
|
}
|
|
|
|
void cac::ioExceptionNotify ( unsigned id, int status,
|
|
const char *pContext, unsigned type, arrayElementCount count )
|
|
{
|
|
baseNMIU * pmiu = this->ioTable.lookup ( id );
|
|
if ( ! pmiu ) {
|
|
return;
|
|
}
|
|
assert ( ! this->ioInProgress );
|
|
this->ioInProgress = true;
|
|
this->ioNotifyInProgressId = id;
|
|
{
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
pmiu->exception ( status, pContext, type, count );
|
|
}
|
|
// threads blocked canceling this IO will wait
|
|
// until we stop processing this IO
|
|
this->ioInProgress = false;
|
|
if ( this->threadsBlockingOnNotifyCompletion ) {
|
|
this->notifyCompletionEvent.signal ();
|
|
}
|
|
}
|
|
|
|
void cac::ioCompletionNotifyAndDestroy ( unsigned id )
|
|
{
|
|
baseNMIU * pmiu = this->ioTable.remove ( id );
|
|
if ( ! pmiu ) {
|
|
return;
|
|
}
|
|
pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu );
|
|
assert ( ! this->ioInProgress );
|
|
this->ioInProgress = true;
|
|
this->ioNotifyInProgressId = id;
|
|
{
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
pmiu->completion ();
|
|
}
|
|
pmiu->destroy ( *this );
|
|
// threads blocked canceling this IO will wait
|
|
// until we stop processing this IO
|
|
this->ioInProgress = false;
|
|
if ( this->threadsBlockingOnNotifyCompletion ) {
|
|
this->notifyCompletionEvent.signal ();
|
|
}
|
|
}
|
|
|
|
void cac::ioCompletionNotifyAndDestroy ( unsigned id,
|
|
unsigned type, arrayElementCount count, const void *pData )
|
|
{
|
|
baseNMIU * pmiu = this->ioTable.remove ( id );
|
|
if ( ! pmiu ) {
|
|
return;
|
|
}
|
|
assert ( ! this->ioInProgress );
|
|
this->ioInProgress = true;
|
|
this->ioNotifyInProgressId = id;
|
|
pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu );
|
|
{
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
pmiu->completion ( type, count, pData );
|
|
}
|
|
pmiu->destroy ( *this );
|
|
// threads blocked canceling this IO will wait
|
|
// until we stop processing this IO
|
|
this->ioInProgress = false;
|
|
if ( this->threadsBlockingOnNotifyCompletion ) {
|
|
this->notifyCompletionEvent.signal ();
|
|
}
|
|
}
|
|
|
|
void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
|
|
const char *pContext )
|
|
{
|
|
baseNMIU * pmiu = this->ioTable.remove ( id );
|
|
if ( ! pmiu ) {
|
|
return;
|
|
}
|
|
assert ( ! this->ioInProgress );
|
|
this->ioInProgress = true;
|
|
this->ioNotifyInProgressId = id;
|
|
pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu );
|
|
{
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
pmiu->exception ( status, pContext );
|
|
}
|
|
pmiu->destroy ( *this );
|
|
// threads blocked canceling this IO will wait
|
|
// until we stop processing this IO
|
|
this->ioInProgress = false;
|
|
if ( this->threadsBlockingOnNotifyCompletion ) {
|
|
this->notifyCompletionEvent.signal ();
|
|
}
|
|
}
|
|
|
|
void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
|
|
const char *pContext, unsigned type, arrayElementCount count )
|
|
{
|
|
baseNMIU * pmiu = this->ioTable.remove ( id );
|
|
if ( ! pmiu ) {
|
|
return;
|
|
}
|
|
assert ( ! this->ioInProgress );
|
|
this->ioInProgress = true;
|
|
this->ioNotifyInProgressId = id;
|
|
pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu );
|
|
{
|
|
epicsAutoMutexRelease autoRelease ( this->mutex );
|
|
pmiu->exception ( status, pContext, type, count );
|
|
}
|
|
pmiu->destroy ( *this );
|
|
// threads blocked canceling this IO will wait
|
|
// until we stop processing this IO
|
|
this->ioInProgress = false;
|
|
if ( this->threadsBlockingOnNotifyCompletion ) {
|
|
this->notifyCompletionEvent.signal ();
|
|
}
|
|
}
|
|
|
|
// resubscribe for monitors from this channel
|
|
void cac::connectAllIO ( nciu &chan )
|
|
{
|
|
tsDLList < baseNMIU > tmpList;
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
tsDLIterBD < baseNMIU > pNetIO =
|
|
chan.cacPrivateListOfIO::eventq.firstIter ();
|
|
while ( pNetIO.valid () ) {
|
|
tsDLIterBD < baseNMIU > next = pNetIO;
|
|
next++;
|
|
class netSubscription *pSubscr = pNetIO->isSubscription ();
|
|
if ( pSubscr ) {
|
|
try {
|
|
chan.getPIIU()->subscriptionRequest ( chan, *pSubscr );
|
|
}
|
|
catch (...) {
|
|
this->printf ( "cac: no memory to queue event subscription\n" );
|
|
}
|
|
}
|
|
else {
|
|
// it shouldnt be here at this point - so uninstall it
|
|
this->ioTable.remove ( *pNetIO );
|
|
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
|
|
tmpList.add ( *pNetIO );
|
|
}
|
|
pNetIO = next;
|
|
}
|
|
chan.getPIIU()->flushRequest ();
|
|
}
|
|
while ( baseNMIU *pIO = tmpList.get () ) {
|
|
pIO->destroy ( *this );
|
|
}
|
|
}
|
|
|
|
// cancel IO operations and monitor subscriptions
|
|
void cac::disconnectAllIO ( nciu &chan )
|
|
{
|
|
tsDLList < baseNMIU > tmpList;
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
tsDLIterBD < baseNMIU > pNetIO =
|
|
chan.cacPrivateListOfIO::eventq.firstIter ();
|
|
while ( pNetIO.valid () ) {
|
|
tsDLIterBD < baseNMIU > next = pNetIO;
|
|
next++;
|
|
if ( ! pNetIO->isSubscription () ) {
|
|
// no use after disconnected - so uninstall it
|
|
this->ioTable.remove ( *pNetIO );
|
|
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
|
|
tmpList.add ( *pNetIO );
|
|
}
|
|
pNetIO = next;
|
|
}
|
|
}
|
|
while ( baseNMIU *pIO = tmpList.get () ) {
|
|
char buf[128];
|
|
sprintf ( buf, "host = %100s", chan.pHostName() );
|
|
pIO->exception ( ECA_DISCONN, buf );
|
|
pIO->destroy ( *this );
|
|
}
|
|
}
|
|
|
|
//
// cac::destroyAllIO ()
//
// Drains the channel's IO list under the epicsAutoMutex guard. Each IO
// is removed from the IO table and destroyed; for a subscription a
// cancel request is first issued through the channel's IIU.
//
void cac::destroyAllIO ( nciu &chan )
{
    epicsAutoMutex guard ( this->mutex );
    for ( ;; ) {
        baseNMIU * const pIO = chan.cacPrivateListOfIO::eventq.get ();
        if ( ! pIO ) {
            // list is empty
            break;
        }
        this->ioTable.remove ( *pIO );
        this->flushIfRequired ( chan );
        netSubscription * const pSubscr = pIO->isSubscription ();
        if ( pSubscr ) {
            chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
        }
        pIO->destroy ( *this );
    }
}
|
|
|
|
//
// cac::recycleReadNotifyIO ()
//
// Hands the storage of a read notify IO back to its free list.
//
void cac::recycleReadNotifyIO ( netReadNotifyIO &io )
{
    this->freeListReadNotifyIO.release ( &io, sizeof ( netReadNotifyIO ) );
}
|
|
|
|
//
// cac::recycleWriteNotifyIO ()
//
// Hands the storage of a write notify IO back to its free list.
//
void cac::recycleWriteNotifyIO ( netWriteNotifyIO &io )
{
    this->freeListWriteNotifyIO.release ( &io, sizeof ( netWriteNotifyIO ) );
}
|
|
|
|
//
// cac::recycleSubscription ()
//
// Hands the storage of a subscription IO back to its free list.
//
void cac::recycleSubscription ( netSubscription &io )
{
    this->freeListSubscription.release ( &io, sizeof ( netSubscription ) );
}
|
|
|
|
//
// cac::subscriptionRequest ()
//
// Creates a subscription IO for "chan" from the subscription free list
// and installs it on the channel's IO list and in the IO table. If the
// channel is currently connected the subscription request is also
// issued through the channel's IIU.
//
// Returns the id of the new IO.
//
// Throws std::bad_alloc when the factory supplies no IO object. If
// issuing the request throws, the IO is uninstalled again and the
// exception is passed on to the caller.
//
cacChannel::ioid cac::subscriptionRequest ( nciu &chan, unsigned type,
    arrayElementCount nElem, unsigned mask, cacStateNotify &notifyIn )
{
    epicsAutoMutex autoMutex ( this->mutex );
    // NOTE(review): presumably autoPtrRecycle returns the IO to this
    // cac unless release() is called - confirm against autoPtrRecycle.h
    autoPtrRecycle < netSubscription > pIO ( *this, netSubscription::factory (
        this->freeListSubscription, chan, type, nElem, mask, notifyIn ) );
    if ( ! pIO.get() ) {
        throw std::bad_alloc ();
    }

    chan.cacPrivateListOfIO::eventq.add ( *pIO );
    this->ioTable.add ( *pIO );
    if ( chan.connected () ) {
        try {
            this->flushIfRequired ( chan );
            chan.getPIIU()->subscriptionRequest ( chan, *pIO );
        }
        catch ( ... ) {
            // undo the installation before passing the failure on
            chan.cacPrivateListOfIO::eventq.remove ( *pIO );
            this->ioTable.remove ( *pIO );
            throw;
        }
    }

    cacChannel::ioid id = pIO->getId ();
    pIO.release ();
    return id;
}
|
|
|
|
//
// cac::noopAction ()
//
// Response stub that ignores the message and always reports success.
//
bool cac::noopAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
{
    // nothing to do for this message
    return true;
}
|
|
|
|
//
// cac::echoRespAction ()
//
// Response stub for echo replies; the message is ignored here and
// success is always reported.
//
bool cac::echoRespAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
{
    // no processing is performed on the reply in this function
    return true;
}
|
|
|
|
//
// cac::writeNotifyRespAction ()
//
// Handles a write notify response. The IO named by hdr.m_available is
// completed when the status is ECA_NORMAL and otherwise receives an
// exception; in both cases the IO is destroyed. Always returns true.
//
bool cac::writeNotifyRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
{
    // the status of the request is taken from the channel id field
    const int caStatus = hdr.m_cid;
    if ( caStatus != ECA_NORMAL ) {
        this->ioExceptionNotifyAndDestroy ( hdr.m_available,
            caStatus, "write notify request rejected" );
        return true;
    }
    this->ioCompletionNotifyAndDestroy ( hdr.m_available );
    return true;
}
|
|
|
|
//
// cac::readNotifyRespAction ()
//
// Handles a read notify response. The payload is converted in place
// when CONVERSION_REQUIRED is defined, then the IO named by
// hdr.m_available is either completed with the data or given an
// exception; in both cases the IO is destroyed. Always returns true.
//
bool cac::readNotifyRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
{
    /*
     * the channel id field is abused for
     * read notify status starting with CA V4.1
     */
    int caStatus = ECA_NORMAL;
    if ( iiu.ca_v41_ok() ) {
        caStatus = hdr.m_cid;
    }

    /*
     * convert the data buffer from net
     * format to host format
     */
#ifdef CONVERSION_REQUIRED
    if ( hdr.m_dataType >= NELEMENTS ( cac_dbr_cvrt ) ) {
        // no conversion routine exists for this type
        caStatus = ECA_BADTYPE;
    }
    else {
        ( *cac_dbr_cvrt[ hdr.m_dataType ] ) (
            pMsgBdy, pMsgBdy, false, hdr.m_count);
    }
#endif

    if ( caStatus != ECA_NORMAL ) {
        this->ioExceptionNotifyAndDestroy ( hdr.m_available,
            caStatus, "read failed", hdr.m_dataType, hdr.m_count );
    }
    else {
        this->ioCompletionNotifyAndDestroy ( hdr.m_available,
            hdr.m_dataType, hdr.m_count, pMsgBdy );
    }
    return true;
}
|
|
|
|
//
// cac::eventRespAction ()
//
// Handles a subscription update. A message without payload is ignored.
// Otherwise the payload is converted in place when CONVERSION_REQUIRED
// is defined, and the IO named by hdr.m_available is notified either of
// the new data or of an exception. The IO is not destroyed here.
// Always returns true.
//
bool cac::eventRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
{
    int caStatus;

    /*
     * m_postsize = 0 used to be a confirmation, but is
     * now a noop because the IO block is immediately
     * deleted
     */
    if ( ! hdr.m_postsize ) {
        return true;
    }

    /*
     * the channel id field is abused for
     * read notify status starting with CA V4.1
     */
    if ( iiu.ca_v41_ok() ) {
        caStatus = hdr.m_cid;
    }
    else {
        caStatus = ECA_NORMAL;
    }

    /*
     * convert the data buffer from net format to host format
     */
#   ifdef CONVERSION_REQUIRED
        if ( hdr.m_dataType < NELEMENTS ( cac_dbr_cvrt ) ) {
            ( *cac_dbr_cvrt [ hdr.m_dataType ] )(
                 pMsgBdy, pMsgBdy, false, hdr.m_count);
        }
        else {
            // caStatus is compared against, and reported as, a host
            // order status code below - so it must not be byte swapped
            // ( this matches readNotifyRespAction )
            caStatus = ECA_BADTYPE;
        }
#   endif

    if ( caStatus == ECA_NORMAL ) {
        this->ioCompletionNotify ( hdr.m_available,
            hdr.m_dataType, hdr.m_count, pMsgBdy );
    }
    else {
        this->ioExceptionNotify ( hdr.m_available,
                caStatus, "subscription update failed",
                hdr.m_dataType, hdr.m_count );
    }
    return true;
}
|
|
|
|
//
// cac::readRespAction ()
//
// Handles a read response: the IO named by hdr.m_available is completed
// with the payload and destroyed. No status check or data conversion is
// performed in this function. Always returns true.
//
bool cac::readRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void *pMsgBdy )
{
    this->ioCompletionNotifyAndDestroy (
        hdr.m_available, hdr.m_dataType,
        hdr.m_count, pMsgBdy );
    return true;
}
|
|
|
|
//
// cac::clearChannelRespAction ()
//
// Response stub for clear channel replies. Always returns true.
//
bool cac::clearChannelRespAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
{
    // currently a noop
    return true;
}
|
|
|
|
//
// cac::defaultExcep ()
//
// Exception stub used when no more specific handler applies. Builds a
// context string from the host name of "iiu" and the context "pCtx",
// and reports it together with "status" through this->notify ( with
// type 0 and count 0 ). Always returns true.
//
bool cac::defaultExcep ( tcpiiu &iiu, const caHdrLargeArray &,
    const char *pCtx, unsigned status )
{
    char buf[512];
    char hostName[64];
    iiu.hostName ( hostName, sizeof ( hostName ) );
    // Precisions ( "%.Ns" ) cap the number of characters copied so that
    // the result always fits in buf; a bare width ( "%Ns" ) is only a
    // minimum, and pCtx arrives in a message from the network.
    sprintf ( buf, "host=%.64s ctx=%.400s", hostName, pCtx );
    this->notify.exception ( status, buf, 0, 0u );
    return true;
}
|
|
|
|
//
// cac::eventAddExcep ()
//
// Exception stub for a failed subscription request: the IO named by
// hdr.m_available is notified of the exception, but is not destroyed
// here. Always returns true.
//
bool cac::eventAddExcep ( tcpiiu &iiu, const caHdrLargeArray &hdr,
    const char *pCtx, unsigned status )
{
    this->ioExceptionNotify (
        hdr.m_available, status, pCtx,
        hdr.m_dataType, hdr.m_count );
    return true;
}
|
|
|
|
//
// cac::readExcep ()
//
// Exception stub for a failed read request: the IO named by
// hdr.m_available is notified of the exception and destroyed.
// Always returns true.
//
bool cac::readExcep ( tcpiiu &, const caHdrLargeArray &hdr,
    const char *pCtx, unsigned status )
{
    this->ioExceptionNotifyAndDestroy (
        hdr.m_available, status, pCtx,
        hdr.m_dataType, hdr.m_count );
    return true;
}
|
|
|
|
//
// cac::writeExcep ()
//
// Exception stub for a failed write request. Here hdr.m_available is
// used to look up a channel ( not an IO ); if the channel is found its
// writeException() is called. Always returns true.
//
bool cac::writeExcep ( tcpiiu &, const caHdrLargeArray &hdr,
    const char *pCtx, unsigned status )
{
    nciu * const pChan = this->chanTable.lookup ( hdr.m_available );
    if ( ! pChan ) {
        // no such channel - nothing to report to
        return true;
    }
    pChan->writeException ( status, pCtx,
        hdr.m_dataType, hdr.m_count );
    return true;
}
|
|
|
|
//
// cac::readNotifyExcep ()
//
// Exception stub for a failed read notify request: the IO named by
// hdr.m_available is notified of the exception and destroyed.
// Always returns true.
//
bool cac::readNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr,
    const char *pCtx, unsigned status )
{
    this->ioExceptionNotifyAndDestroy (
        hdr.m_available, status, pCtx,
        hdr.m_dataType, hdr.m_count );
    return true;
}
|
|
|
|
//
// cac::writeNotifyExcep ()
//
// Exception stub for a failed write notify request: the IO named by
// hdr.m_available is notified of the exception and destroyed.
// Always returns true.
//
bool cac::writeNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr,
    const char *pCtx, unsigned status )
{
    this->ioExceptionNotifyAndDestroy (
        hdr.m_available, status, pCtx,
        hdr.m_dataType, hdr.m_count );
    return true;
}
|
|
|
|
//
// cac::exceptionRespAction ()
//
// Handles an exception message. The payload starts with a header ( in
// network byte order ) which is decoded into "req" below. When its 16 bit
// postsize field is 0xffff, two additional 32 bit words follow that
// replace the postsize and the count. Whatever comes after that is
// passed on as the context string.
//
// The handler is selected from tcpExcepJumpTableCAC by the command code
// of the embedded header, with defaultExcep used for codes outside of
// the table. hdr.m_available is passed to the handler as the status.
//
// Returns false when the payload is too short to hold the embedded
// header ( and its extension words, when present ); otherwise returns
// whatever the selected handler returns.
//
bool cac::exceptionRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
{
    const caHdr * pReq = reinterpret_cast < const caHdr * > ( pMsgBdy );
    unsigned bytesSoFar = sizeof ( *pReq );
    if ( hdr.m_postsize < bytesSoFar ) {
        return false;
    }
    caHdrLargeArray req;
    req.m_cmmd = ntohs ( pReq->m_cmmd );
    req.m_postsize = ntohs ( pReq->m_postsize );
    req.m_dataType = ntohs ( pReq->m_dataType );
    req.m_count = ntohs ( pReq->m_count );
    req.m_cid = ntohl ( pReq->m_cid );
    req.m_available = ntohl ( pReq->m_available );
    const ca_uint32_t * pLW = reinterpret_cast < const ca_uint32_t * > ( pReq + 1 );
    if ( req.m_postsize == 0xffff ) {
        static const unsigned annexSize =
            sizeof ( req.m_postsize ) + sizeof ( req.m_count );
        bytesSoFar += annexSize;
        if ( hdr.m_postsize < bytesSoFar ) {
            return false;
        }
        req.m_postsize = ntohl ( pLW[0] );
        req.m_count = ntohl ( pLW[1] );
        pLW += 2u;
    }

    // execute the exception message
    //
    // The range check must be made on the same value that indexes the
    // table: req.m_cmmd comes from the message body, so an unchecked
    // value would read past the end of the jump table.
    pExcepProtoStubTCP pStub;
    if ( req.m_cmmd >= NELEMENTS ( cac::tcpExcepJumpTableCAC ) ) {
        pStub = &cac::defaultExcep;
    }
    else {
        pStub = cac::tcpExcepJumpTableCAC [req.m_cmmd];
    }
    const char *pCtx = reinterpret_cast < const char * > ( pLW );
    return ( this->*pStub ) ( iiu, req, pCtx, hdr.m_available );
}
|
|
|
|
//
// cac::accessRightsRespAction ()
//
// Handles an access rights message: the channel is looked up by
// hdr.m_cid and, if found, told about the read and write permission
// bits carried in hdr.m_available. Always returns true.
//
bool cac::accessRightsRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
{
    nciu * const pChan = this->chanTable.lookup ( hdr.m_cid );
    if ( ! pChan ) {
        // no such channel - nothing to update
        return true;
    }
    const unsigned ar = hdr.m_available;
    const bool readOk = ( ar & CA_PROTO_ACCESS_RIGHT_READ ) != 0;
    const bool writeOk = ( ar & CA_PROTO_ACCESS_RIGHT_WRITE ) != 0;
    caAccessRights accessRights ( readOk, writeOk );
    pChan->accessRightsStateChange ( accessRights );
    return true;
}
|
|
|
|
//
// cac::claimCIURespAction ()
//
// Handles a channel claim response: the channel is looked up by
// hdr.m_cid and, if found, connected with the type and count from the
// header. Always returns true.
//
bool cac::claimCIURespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
{
    nciu * const pChan = this->chanTable.lookup ( hdr.m_cid );
    if ( ! pChan ) {
        return true; // ignore claim response to deleted channel
    }

    // the sid is taken from the message only when ca_v44_ok() is true,
    // otherwise the sid already held by the channel is used again
    unsigned sid;
    if ( ! iiu.ca_v44_ok() ) {
        sid = pChan->getSID ();
    }
    else {
        sid = hdr.m_available;
    }
    pChan->connect ( hdr.m_dataType, hdr.m_count, sid );
    return true;
}
|
|
|
|
//
// cac::verifyAndDisconnectChan ()
//
// Looks up the channel named by hdr.m_cid. If it exists it is
// disconnected, with the UDP IIU passed to disconnect(), and the period
// of the search timer is reset to zero. Always returns true.
//
bool cac::verifyAndDisconnectChan ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
{
    nciu * const pChan = this->chanTable.lookup ( hdr.m_cid );
    if ( ! pChan ) {
        // unknown channel - nothing to disconnect
        return true;
    }
    assert ( this->pudpiiu && this->pSearchTmr );
    pChan->disconnect ( *this->pudpiiu );
    this->pSearchTmr->resetPeriod ( 0.0 );
    return true;
}
|
|
|
|
//
// cac::badTCPRespAction ()
//
// Response stub for message types that have no handler: prints a
// diagnostic naming the response type and the host, and returns false.
//
bool cac::badTCPRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
{
    char nameOfHost[64];
    iiu.hostName ( nameOfHost, sizeof ( nameOfHost ) );
    this->printf (
        "CAC: Undecipherable TCP message ( bad response type %u ) from %s\n",
        hdr.m_cmmd, nameOfHost );
    return false;
}
|
|
|
|
//
// cac::executeResponse ()
//
// Dispatches a TCP response to the handler that tcpJumpTableCAC lists
// for hdr.m_cmmd; command codes outside of the table are routed to
// badTCPRespAction. Returns whatever the handler returns.
//
bool cac::executeResponse ( tcpiiu &iiu, caHdrLargeArray &hdr, char *pMshBody )
{
    // start with the fallback, replace it when the command is in range
    pProtoStubTCP pStub = &cac::badTCPRespAction;
    if ( hdr.m_cmmd < NELEMENTS ( cac::tcpJumpTableCAC ) ) {
        pStub = cac::tcpJumpTableCAC [hdr.m_cmmd];
    }
    return ( this->*pStub ) ( iiu, hdr, pMshBody );
}
|
|
|
|
void cac::signal ( int ca_status, const char *pfilenm,
|
|
int lineno, const char *pFormat, ... )
|
|
{
|
|
va_list theArgs;
|
|
va_start ( theArgs, pFormat );
|
|
this->vSignal ( ca_status, pfilenm, lineno, pFormat, theArgs);
|
|
va_end ( theArgs );
|
|
}
|
|
|
|
void cac::vSignal ( int ca_status, const char *pfilenm,
|
|
int lineno, const char *pFormat, va_list args )
|
|
{
|
|
static const char *severity[] =
|
|
{
|
|
"Warning",
|
|
"Success",
|
|
"Error",
|
|
"Info",
|
|
"Fatal",
|
|
"Fatal",
|
|
"Fatal",
|
|
"Fatal"
|
|
};
|
|
|
|
this->printf ( "CA.Client.Diagnostic..............................................\n" );
|
|
|
|
this->printf ( " %s: \"%s\"\n",
|
|
severity[ CA_EXTRACT_SEVERITY ( ca_status ) ],
|
|
ca_message ( ca_status ) );
|
|
|
|
if ( pFormat ) {
|
|
this->printf ( " Context: \"" );
|
|
this->vPrintf ( pFormat, args );
|
|
this->printf ( "\"\n" );
|
|
}
|
|
|
|
if (pfilenm) {
|
|
this->printf ( " Source File: %s Line Number: %d\n",
|
|
pfilenm, lineno );
|
|
}
|
|
|
|
/*
|
|
* Terminate execution if unsuccessful
|
|
*/
|
|
if( ! ( ca_status & CA_M_SUCCESS ) &&
|
|
CA_EXTRACT_SEVERITY ( ca_status ) != CA_K_WARNING ){
|
|
errlogFlush();
|
|
abort();
|
|
}
|
|
|
|
this->printf ( "..................................................................\n" );
|
|
}
|
|
|