Files
pcas/src/ca/cac.cpp

1895 lines
55 KiB
C++

/* $Id$
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
* Author: Jeff Hill
*/
#define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
#include <new>
#include "epicsMemory.h"
#include "osiProcess.h"
#include "osiSigPipeIgnore.h"
#include "envDefs.h"
#include "iocinf.h"
#include "cac.h"
#include "inetAddrID.h"
#include "caServerID.h"
#include "virtualCircuit.h"
#include "syncGroup.h"
#include "nciu.h"
#include "autoPtrRecycle.h"
#include "searchTimer.h"
#include "repeaterSubscribeTimer.h"
#define epicsExportSharedSymbols
#include "udpiiu.h"
#include "bhe.h"
#include "net_convert.h"
#undef epicsExportSharedSymbols
#if defined ( _MSC_VER )
# pragma warning ( push )
# pragma warning ( disable: 4660 )
#endif
template class tsSLNode < nciu >;
template class resTable < nciu, chronIntId >;
template class chronIntIdResTable < nciu >;
template class tsSLNode < tcpiiu >;
template class resTable < tcpiiu, caServerID >;
template class tsSLNode < bhe >;
template class resTable < bhe, inetAddrID >;
template class tsSLNode < baseNMIU >;
template class resTable < baseNMIU, chronIntId >;
template class chronIntIdResTable < baseNMIU >;
template class resTable < CASG, chronIntId >;
template class chronIntIdResTable < CASG >;
template class tsFreeList < netReadNotifyIO, 1024, 0 >;
template class tsFreeList < netWriteNotifyIO, 1024, 0 >;
template class tsFreeList < netSubscription, 1024, 0 >;
#if defined ( _MSC_VER )
# pragma warning ( pop )
#endif
// TCP response dispatch table
const cac::pProtoStubTCP cac::tcpJumpTableCAC [] =
{
&cac::noopAction,
&cac::eventRespAction,
&cac::badTCPRespAction,
&cac::readRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::exceptionRespAction,
&cac::clearChannelRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::readNotifyRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::claimCIURespAction,
&cac::writeNotifyRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::accessRightsRespAction,
&cac::echoRespAction,
&cac::badTCPRespAction,
&cac::badTCPRespAction,
&cac::verifyAndDisconnectChan,
&cac::verifyAndDisconnectChan
};
// TCP exception dispatch table
const cac::pExcepProtoStubTCP cac::tcpExcepJumpTableCAC [] =
{
&cac::defaultExcep, // CA_PROTO_VERSION
&cac::eventAddExcep, // CA_PROTO_EVENT_ADD
&cac::defaultExcep, // CA_PROTO_EVENT_CANCEL
&cac::readExcep, // CA_PROTO_READ
&cac::writeExcep, // CA_PROTO_WRITE
&cac::defaultExcep, // CA_PROTO_SNAPSHOT
&cac::defaultExcep, // CA_PROTO_SEARCH
&cac::defaultExcep, // CA_PROTO_BUILD
&cac::defaultExcep, // CA_PROTO_EVENTS_OFF
&cac::defaultExcep, // CA_PROTO_EVENTS_ON
&cac::defaultExcep, // CA_PROTO_READ_SYNC
&cac::defaultExcep, // CA_PROTO_ERROR
&cac::defaultExcep, // CA_PROTO_CLEAR_CHANNEL
&cac::defaultExcep, // CA_PROTO_RSRV_IS_UP
&cac::defaultExcep, // CA_PROTO_NOT_FOUND
&cac::readNotifyExcep, // CA_PROTO_READ_NOTIFY
&cac::defaultExcep, // CA_PROTO_READ_BUILD
&cac::defaultExcep, // REPEATER_CONFIRM
&cac::defaultExcep, // CA_PROTO_CLAIM_CIU
&cac::writeNotifyExcep, // CA_PROTO_WRITE_NOTIFY
&cac::defaultExcep, // CA_PROTO_CLIENT_NAME
&cac::defaultExcep, // CA_PROTO_HOST_NAME
&cac::defaultExcep, // CA_PROTO_ACCESS_RIGHTS
&cac::defaultExcep, // CA_PROTO_ECHO
&cac::defaultExcep, // REPEATER_REGISTER
&cac::defaultExcep, // CA_PROTO_SIGNAL
&cac::defaultExcep, // CA_PROTO_CLAIM_CIU_FAILED
&cac::defaultExcep // CA_PROTO_SERVER_DISCONN
};
epicsThreadPrivateId caClientCallbackThreadId;
static epicsThreadOnceId cacOnce = EPICS_THREAD_ONCE_INIT;
extern "C" void cacExitHandler ()
{
epicsThreadPrivateDelete ( caClientCallbackThreadId );
}
// runs once only for each process
extern "C" void cacOnceFunc ( void * )
{
caClientCallbackThreadId = epicsThreadPrivateCreate ();
if ( caClientCallbackThreadId ) {
atexit ( cacExitHandler );
}
else {
throw std::bad_alloc ();
}
}
//
// cac::cac ()
//
cac::cac ( cacNotify & notifyIn, bool enablePreemptiveCallbackIn ) :
ipToAEngine ( "caIPAddrToAsciiEngine" ),
programBeginTime ( epicsTime::getCurrent() ),
connTMO ( CA_CONN_VERIFY_PERIOD ),
timerQueue ( epicsTimerQueueActive::allocate ( false,
lowestPriorityLevelAbove(epicsThreadGetPrioritySelf()) ) ),
pUserName ( 0 ),
pudpiiu ( 0 ),
pSearchTmr ( 0 ),
pRepeaterSubscribeTmr ( 0 ),
tcpSmallRecvBufFreeList ( 0 ),
tcpLargeRecvBufFreeList ( 0 ),
pCallbackLocker ( 0 ),
notify ( notifyIn ),
initializingThreadsId ( epicsThreadGetIdSelf() ),
initializingThreadsPriority ( epicsThreadGetPrioritySelf() ),
maxRecvBytesTCP ( MAX_TCP ),
pndRecvCnt ( 0u ),
readSeq ( 0u ),
recvThreadsPendingCount ( 0u )
{
if ( ! osiSockAttach () ) {
throwWithLocation ( caErrorCode (ECA_INTERNAL) );
}
epicsThreadOnce ( &cacOnce, cacOnceFunc, 0 );
try {
long status;
installSigPipeIgnore ();
{
char tmp[256];
size_t len;
osiGetUserNameReturn gunRet;
gunRet = osiGetUserName ( tmp, sizeof (tmp) );
if ( gunRet != osiGetUserNameSuccess ) {
tmp[0] = '\0';
}
len = strlen ( tmp ) + 1;
this->pUserName = new ( std::nothrow ) char [ len ];
if ( ! this->pUserName ) {
throw std::bad_alloc ();
}
strncpy ( this->pUserName, tmp, len );
}
status = envGetDoubleConfigParam ( &EPICS_CA_CONN_TMO, &this->connTMO );
if ( status ) {
this->connTMO = CA_CONN_VERIFY_PERIOD;
this->printf ( "EPICS \"%s\" double fetch failed\n", EPICS_CA_CONN_TMO.name);
this->printf ( "Defaulting \"%s\" = %f\n", EPICS_CA_CONN_TMO.name, this->connTMO);
}
long maxBytesAsALong;
status = envGetLongConfigParam ( &EPICS_CA_MAX_ARRAY_BYTES, &maxBytesAsALong );
if ( status || maxBytesAsALong < 0 ) {
errlogPrintf ( "cac: EPICS_CA_MAX_ARRAY_BYTES was not a positive integer\n" );
}
else {
/* allow room for the protocol header so that they get the array size they requested */
static const unsigned headerSize = sizeof ( caHdr ) + 2 * sizeof ( ca_uint32_t );
ca_uint32_t maxBytes = ( unsigned ) maxBytesAsALong;
if ( maxBytes < 0xffffffff - headerSize ) {
maxBytes += headerSize;
}
else {
maxBytes = 0xffffffff;
}
if ( maxBytes < MAX_TCP ) {
errlogPrintf ( "cac: EPICS_CA_MAX_ARRAY_BYTES was rounded up to %u\n", MAX_TCP );
}
else {
this->maxRecvBytesTCP = maxBytes;
}
}
freeListInitPvt ( &this->tcpSmallRecvBufFreeList, MAX_TCP, 1 );
if ( ! this->tcpSmallRecvBufFreeList ) {
throw std::bad_alloc ();
}
freeListInitPvt ( &this->tcpLargeRecvBufFreeList, this->maxRecvBytesTCP, 1 );
if ( ! this->tcpLargeRecvBufFreeList ) {
throw std::bad_alloc ();
}
if ( ! enablePreemptiveCallbackIn ) {
this->pCallbackLocker = new callbackAutoMutex ( *this );
}
}
catch ( ... ) {
osiSockRelease ();
delete [] this->pUserName;
if ( this->tcpSmallRecvBufFreeList ) {
freeListCleanup ( this->tcpSmallRecvBufFreeList );
}
if ( this->tcpLargeRecvBufFreeList ) {
freeListCleanup ( this->tcpLargeRecvBufFreeList );
}
this->timerQueue.release ();
throw;
}
}
cac::~cac ()
{
//
// release callback lock
//
delete this->pCallbackLocker;
//
// lock intentionally not held here so that we dont deadlock
// waiting for the UDP thread to exit while it is waiting to
// get the lock.
if ( this->pudpiiu ) {
// this blocks until the UDP thread exits so that
// it will not sneak in any new clients
this->pudpiiu->shutdown ();
}
//
// shutdown all tcp connections
//
{
epicsAutoMutex autoMutex ( this->mutex );
this->serverTable.traverse ( & tcpiiu::cleanShutdown );
}
//
// wait for tcp threads to exit
//
while ( this->serverTable.numEntriesInstalled() ) {
this->iiuUninstal.wait ();
}
delete this->pRepeaterSubscribeTmr;
delete this->pSearchTmr;
freeListCleanup ( this->tcpSmallRecvBufFreeList );
freeListCleanup ( this->tcpLargeRecvBufFreeList );
{
epicsAutoMutex autoMutexCB ( this->callbackMutex );
epicsAutoMutex autoMutexCAC ( this->mutex );
if ( this->pudpiiu ) {
this->removeAllChan ( *this->pudpiiu );
}
}
delete this->pudpiiu;
delete [] this->pUserName;
this->beaconTable.traverse ( &bhe::destroy );
// its ok for channels and subscriptions to still
// exist at this point. The user created them and
// its his responsibility to clean them up.
osiSockRelease ();
this->timerQueue.release ();
}
// must have callback lock and also cac lock
void cac::removeAllChan ( netiiu & srcIIU, netiiu *pDstIIU )
{
// we are protected here because channel delete takes the callback mutex
while ( nciu *pChan = srcIIU.firstChannel() ) {
// if the claim reply has not returned then we will issue
// the clear channel request to the server when the claim reply
// arrives and there is no matching nciu in the client
if ( pChan->connected() ) {
srcIIU.clearChannelRequest ( pChan->getSID(), pChan->getCID() );
}
this->disconnectChannelPrivate ( *pChan, pDstIIU );
}
}
unsigned cac::lowestPriorityLevelAbove ( unsigned priority )
{
unsigned abovePriority;
epicsThreadBooleanStatus tbs;
tbs = epicsThreadLowestPriorityLevelAbove (
priority, & abovePriority );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
abovePriority = priority;
}
return abovePriority;
}
unsigned cac::highestPriorityLevelBelow ( unsigned priority )
{
unsigned belowPriority;
epicsThreadBooleanStatus tbs;
tbs = epicsThreadHighestPriorityLevelBelow (
priority, & belowPriority );
if ( tbs != epicsThreadBooleanStatusSuccess ) {
belowPriority = priority;
}
return belowPriority;
}
//
// set the push pending flag on all virtual circuits
//
void cac::flushRequest ()
{
epicsAutoMutex autoMutex ( this->mutex );
this->flushRequestPrivate ();
}
// lock must be applied
void cac::flushRequestPrivate ()
{
this->serverTable.traverse ( & tcpiiu::flushRequest );
}
unsigned cac::connectionCount () const
{
epicsAutoMutex autoMutex ( this->mutex );
return this->serverTable.numEntriesInstalled ();
}
void cac::show ( unsigned level ) const
{
epicsAutoMutex autoMutex2 ( this->mutex );
::printf ( "Channel Access Client Context at %p for user %s\n",
static_cast <const void *> ( this ), this->pUserName );
if ( level > 0u ) {
this->serverTable.show ( level - 1u );
::printf ( "\tconnection time out watchdog period %f\n", this->connTMO );
::printf ( "\tpreemptive calback is %s\n",
this->pCallbackLocker ? "disabled" : "enabled" );
::printf ( "list of installed services:\n" );
this->services.show ( level - 1u );
}
if ( level > 1u ) {
if ( this->pudpiiu ) {
this->pudpiiu->show ( level - 2u );
}
::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n",
this->pndRecvCnt );
}
if ( level > 2u ) {
::printf ( "Program begin time:\n");
this->programBeginTime.show ( level - 3u );
::printf ( "Channel identifier hash table:\n" );
this->chanTable.show ( level - 3u );
::printf ( "IO identifier hash table:\n" );
this->ioTable.show ( level - 3u );
::printf ( "Synchronous group identifier hash table:\n" );
this->sgTable.show ( level - 3u );
::printf ( "Beacon source identifier hash table:\n" );
this->beaconTable.show ( level - 3u );
::printf ( "Timer queue:\n" );
this->timerQueue.show ( level - 3u );
if ( this->pSearchTmr ) {
::printf ( "search message timer:\n" );
this->pSearchTmr->show ( level - 3u );
}
if ( this->pRepeaterSubscribeTmr ) {
::printf ( "repeater subscribee timer:\n" );
this->pRepeaterSubscribeTmr->show ( level - 3u );
}
::printf ( "IP address to name conversion engine:\n" );
this->ipToAEngine.show ( level - 3u );
::printf ( "\tthe current read sequence number is %u\n",
this->readSeq );
}
if ( level > 3u ) {
::printf ( "Default mutex:\n");
this->mutex.show ( level - 4u );
::printf ( "IO done event:\n");
this->ioDone.show ( level - 3u );
::printf ( "mutex:\n" );
this->mutex.show ( level - 3u );
}
}
/*
* cac::beaconNotify
*/
void cac::beaconNotify ( const inetAddrID & addr, const epicsTime & currentTime )
{
epicsAutoMutex autoMutex ( this->mutex );
if ( ! this->pudpiiu ) {
return;
}
/*
* look for it in the hash table
*/
bhe *pBHE = this->beaconTable.lookup ( addr );
if ( pBHE ) {
/*
* return if the beacon period has not changed significantly
*/
if ( ! pBHE->updatePeriod ( this->programBeginTime, currentTime ) ) {
return;
}
}
else {
/*
* This is the first beacon seen from this server.
* Wait until 2nd beacon is seen before deciding
* if it is a new server (or just the first
* time that we have seen a server's beacon
* shortly after the program started up)
*/
pBHE = new bhe ( currentTime, addr );
if ( pBHE ) {
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
pBHE->destroy ();
}
}
return;
}
/*
* This part is needed when many machines
* have channels in a disconnected state that
* dont exist anywhere on the network. This insures
* that we dont have many CA clients synchronously
* flooding the network with broadcasts (and swamping
* out requests for valid channels).
*
* I fetch the local UDP port number and use the low
* order bits as a pseudo random delay to prevent every
* one from replying at once.
*/
if ( this->pSearchTmr ) {
static const double portTicksPerSec = 1000u;
static const unsigned portBasedDelayMask = 0xff;
unsigned port = this->pudpiiu->getPort ();
double delay = ( port & portBasedDelayMask );
delay /= portTicksPerSec;
this->pSearchTmr->resetPeriod ( delay );
}
this->pudpiiu->resetChannelRetryCounts ();
# if DEBUG
{
char buf[64];
ipAddrToDottedIP (pnet_addr, buf, sizeof ( buf ) );
printf ("new server available: %s\n", buf);
}
# endif
}
// !!!! This routine is only visible in the old interface - or in a new ST interface.
// !!!! In the old interface we restrict thread attach so that calls from threads
// !!!! other than the initializing thread are not allowed if preemptive callback
// !!!! is disabled. This prevents the preemptive callback lock from being released
// !!!! by other threads than the one that locked it.
//
// this routine should probably be moved to the oldCAC?
int cac::pendIO ( const double & timeout )
{
// prevent recursion nightmares by disabling calls to
// pendIO () from within a CA callback.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
return ECA_EVDISALLOW;
}
int status = ECA_NORMAL;
epicsTime beg_time = epicsTime::getCurrent ();
double remaining = timeout;
{
epicsAutoMutex autoMutex ( this->mutex );
this->flushRequestPrivate ();
}
while ( this->pndRecvCnt > 0 ) {
if ( remaining < CAC_SIGNIFICANT_DELAY ) {
status = ECA_TIMEOUT;
break;
}
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoRelease ( this->callbackMutex );
this->ioDone.wait ( remaining );
}
else {
this->ioDone.wait ( remaining );
}
double delay = epicsTime::getCurrent () - beg_time;
if ( delay < timeout ) {
remaining = timeout - delay;
}
else {
remaining = 0.0;
}
}
{
epicsAutoMutex autoMutex ( this->mutex );
this->readSeq++;
this->pndRecvCnt = 0u;
if ( this->pudpiiu ) {
this->pudpiiu->connectTimeoutNotify ();
}
}
return status;
}
int cac::blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout )
{
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
event.wait ( timeout );
}
else {
event.wait ( timeout );
}
return ECA_NORMAL;
}
// !!!! This routine is only visible in the old interface - or in a new ST interface.
// !!!! In the old interface we restrict thread attach so that calls from threads
// !!!! other than the initializing thread are not allowed if preemptive callback
// !!!! is disabled. This prevents the preemptive callback lock from being released
// !!!! by other threads than the one that locked it.
//
// this routine should probably be moved to the oldCAC?
int cac::pendEvent ( const double & timeout )
{
// prevent recursion nightmares by disabling calls to
// pendIO () from within a CA callback.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
return ECA_EVDISALLOW;
}
epicsTime current = epicsTime::getCurrent ();
{
epicsAutoMutex autoMutex ( this->mutex );
this->flushRequestPrivate ();
}
// process at least once if preemptive callback
// isnt enabled
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
while ( this->recvThreadsPendingCount > 1 ) {
this->noRecvThreadsPending.wait ();
}
}
double elapsed = epicsTime::getCurrent() - current;
double delay;
if ( timeout > elapsed ) {
delay = timeout - elapsed;
}
else {
delay = 0.0;
}
if ( delay >= CAC_SIGNIFICANT_DELAY ) {
if ( this->pCallbackLocker ) {
epicsAutoMutexRelease autoMutexRelease ( this->callbackMutex );
epicsThreadSleep ( delay );
}
else {
epicsThreadSleep ( delay );
}
}
return ECA_TIMEOUT;
}
void cac::installCASG ( CASG &sg )
{
epicsAutoMutex autoMutex ( this->mutex );
this->sgTable.add ( sg );
}
void cac::uninstallCASG ( CASG &sg )
{
epicsAutoMutex autoMutex ( this->mutex );
this->sgTable.remove ( sg );
}
CASG * cac::lookupCASG ( unsigned id )
{
epicsAutoMutex autoMutex ( this->mutex );
CASG * psg = this->sgTable.lookup ( id );
if ( psg ) {
if ( ! psg->verify () ) {
psg = 0;
}
}
return psg;
}
void cac::registerService ( cacService &service )
{
this->services.registerService ( service );
}
cacChannel & cac::createChannel ( const char * pName,
cacChannelNotify & chan, cacChannel::priLev pri )
{
cacChannel *pIO;
if ( pri > cacChannel::priorityMax ) {
throw cacChannel::badPriority ();
}
if ( pName == 0 || pName[0] == '\0' ) {
throw cacChannel::badString ();
}
pIO = this->services.createChannel ( pName, chan, pri );
if ( ! pIO ) {
pIO = cacGlobalServiceList.createChannel ( pName, chan, pri );
if ( ! pIO ) {
if ( ! this->pudpiiu || ! this->pSearchTmr ) {
if ( ! this->setupUDP () ) {
throw ECA_INTERNAL;
}
}
epics_auto_ptr < cacChannel > pNetChan
( new nciu ( *this, limboIIU, chan, pName, pri ) );
if ( pNetChan.get() ) {
return *pNetChan.release ();
}
else {
throw std::bad_alloc ();
}
}
}
return *pIO;
}
void cac::installNetworkChannel ( nciu & chan, netiiu * & piiu )
{
epicsAutoMutex autoMutex ( this->mutex );
this->chanTable.add ( chan );
this->pudpiiu->attachChannel ( chan );
piiu = this->pudpiiu;
this->pSearchTmr->resetPeriod ( 0.0 );
}
bool cac::setupUDP ()
{
epicsAutoMutex autoMutex ( this->mutex );
if ( ! this->pudpiiu ) {
this->pudpiiu = new udpiiu ( *this );
if ( ! this->pudpiiu ) {
return false;
}
}
if ( ! this->pSearchTmr ) {
this->pSearchTmr = new searchTimer ( *this->pudpiiu, this->timerQueue, this->mutex );
if ( ! this->pSearchTmr ) {
return false;
}
}
if ( ! this->pRepeaterSubscribeTmr ) {
this->pRepeaterSubscribeTmr = new repeaterSubscribeTimer ( *this->pudpiiu, this->timerQueue );
if ( ! this->pRepeaterSubscribeTmr ) {
return false;
}
}
return true;
}
void cac::repeaterSubscribeConfirmNotify ()
{
if ( this->pRepeaterSubscribeTmr ) {
this->pRepeaterSubscribeTmr->confirmNotify ();
}
}
bool cac::lookupChannelAndTransferToTCP ( unsigned cid, unsigned sid,
ca_uint16_t typeCode, arrayElementCount count,
unsigned minorVersionNumber, const osiSockAddr & addr,
const epicsTime & currentTime )
{
tcpiiu * pnewiiu = 0;
unsigned retrySeqNumber;
if ( addr.sa.sa_family != AF_INET ) {
return false;
}
bool v41Ok, v42Ok;
nciu *chan;
{
epicsAutoMutex autoMutex ( this->mutex );
/*
* ignore search replies for deleted channels
*/
chan = this->chanTable.lookup ( cid );
if ( ! chan ) {
return true;
}
retrySeqNumber = chan->getRetrySeqNo ();
/*
* Ignore duplicate search replies
*/
if ( chan->getPIIU()->isVirtualCircuit( chan->pName(), addr ) ) {
return true;
}
/*
* look for an existing virtual circuit
*/
caServerID servID ( addr.ia, chan->getPriority() );
tcpiiu * piiu = this->serverTable.lookup ( servID );
if ( piiu ) {
if ( ! piiu->alive () ) {
return true;
}
}
else {
try {
pnewiiu = piiu = new tcpiiu (
*this, this->connTMO, this->timerQueue,
addr, minorVersionNumber, this->ipToAEngine,
chan->getPriority() );
if ( ! piiu ) {
return true;
}
this->serverTable.add ( *piiu );
bhe * pBHE = this->beaconTable.lookup ( addr.ia );
if ( ! pBHE ) {
pBHE = new bhe ( epicsTime (), addr.ia );
if ( pBHE ) {
if ( this->beaconTable.add ( *pBHE ) < 0 ) {
pBHE->destroy ();
return true;
}
}
else {
return true;
}
}
pBHE->registerIIU ( *piiu );
}
catch ( ... ) {
this->printf ( "CAC: Exception during virtual circuit creation\n" );
return true;
}
}
this->pudpiiu->detachChannel ( *chan );
chan->searchReplySetUp ( *piiu, sid, typeCode, count );
piiu->attachChannel ( *chan );
chan->createChannelRequest ();
piiu->flushRequest ();
v41Ok = piiu->ca_v41_ok ();
v42Ok = piiu->ca_v42_ok ();
if ( ! v42Ok ) {
// connect to old server with lock applied
chan->connect ();
// resubscribe for monitors from this channel
this->connectAllIO ( *chan );
}
if ( this->pSearchTmr ) {
this->pSearchTmr->notifySearchResponse ( retrySeqNumber, currentTime );
}
}
if ( ! v42Ok ) {
// channel uninstal routine grabs the callback lock so
// a channel will not be deleted while a call back is
// in progress
//
// the callback lock is also taken when a channel
// disconnects to prevent a race condition with the
// code below - ie we hold the callback lock here
// so a chanel cant be destroyed out from under us.
chan->connectStateNotify ();
/*
* if less than v4.1 then the server will never
* send access rights and we know that there
* will always be access and also need to call
* their call back here
*/
if ( ! v41Ok ) {
chan->accessRightsNotify ();
}
}
if ( pnewiiu ) {
// this is done here after we release the priamry
// lock so that we will hold the callback lock but
// not the primary lock when the fd is registered
// with the user
bool success = pnewiiu->start ();
if ( ! success ) {
this->privateUninstallIIU ( *pnewiiu );
}
}
return true;
}
void cac::uninstallChannel ( nciu & chan )
{
// wait for any IO callbacks in progress to complete
// prior to destroying the IO object
//
// If this is a callback thread then it already owns the
// CB lock at this point. If this is the main thread then we
// are not in pendEvent, pendIO, SG block, etc and
// this->pCallbackLocker protects. Otherwise if this is
// the users auxillary thread then this->pCallbackLocker
// isnt set and we must take the call back lock.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
this->uninstallChannelPrivate ( chan );
}
else if ( this->pCallbackLocker ) {
this->uninstallChannelPrivate ( chan );
}
else {
// taking this mutex guarantees that we will not delete
// a channel out from under a callback
epicsAutoMutex autoCallbackMutex ( this->callbackMutex );
this->uninstallChannelPrivate ( chan );
}
}
void cac::uninstallChannelPrivate ( nciu & chan )
{
epicsAutoMutex autoMutex ( this->mutex );
this->flushIfRequired ( *chan.getPIIU() );
while ( baseNMIU *pIO = chan.cacPrivateListOfIO::eventq.get() ) {
this->ioTable.remove ( *pIO );
class netSubscription *pSubscr = pIO->isSubscription ();
if ( pSubscr && chan.connected() ) {
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
}
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
// If they call ioCancel() here it will be ignored
// because the IO has been unregistered above
pIO->exception ( ECA_CHANDESTROY, chan.pName() );
}
pIO->destroy ( *this );
}
nciu * pChan = this->chanTable.remove ( chan );
assert ( pChan = &chan );
// if the claim reply has not returned yet then we will issue
// the clear channel request to the server when the claim reply
// arrives and there is no matching nciu in the client
if ( pChan->connected() ) {
chan.getPIIU()->clearChannelRequest ( chan.getSID(), chan.getCID() );
}
chan.getPIIU()->detachChannel ( chan );
}
int cac::printf ( const char *pformat, ... ) const
{
va_list theArgs;
int status;
va_start ( theArgs, pformat );
status = this->vPrintf ( pformat, theArgs );
va_end ( theArgs );
return status;
}
// lock must be applied before calling this cac private routine
void cac::flushIfRequired ( netiiu & iiu )
{
if ( iiu.flushBlockThreshold() ) {
iiu.flushRequest ();
// the process thread is not permitted to flush as this
// can result in a push / pull deadlock on the TCP pipe.
// Instead, the process thread scheduals the flush with the
// send thread which runs at a higher priority than the
// send thread. The same applies to the UDP thread for
// locking hierarchy reasons.
if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
// enable / disable of call back preemption must occur here
// because the tcpiiu might disconnect while waiting and its
// pointer to this cac might become invalid
if ( this->pCallbackLocker ) {
iiu.blockUntilSendBacklogIsReasonable
( &this->callbackMutex, this->mutex );
}
else {
iiu.blockUntilSendBacklogIsReasonable ( 0, this->mutex );
}
}
}
else {
iiu.flushRequestIfAboveEarlyThreshold ();
}
}
void cac::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue )
{
epicsAutoMutex autoMutex ( this->mutex );
this->flushIfRequired ( *chan.getPIIU() );
chan.getPIIU()->writeRequest ( chan, type, nElem, pValue );
}
cacChannel::ioid cac::writeNotifyRequest ( nciu &chan, unsigned type, unsigned nElem,
const void *pValue, cacWriteNotify &notifyIn )
{
epicsAutoMutex autoMutex ( this->mutex );
autoPtrRecycle < netWriteNotifyIO > pIO ( this->ioTable, chan.cacPrivateListOfIO::eventq,
*this, netWriteNotifyIO::factory ( this->freeListWriteNotifyIO, chan, notifyIn ) );
if ( pIO.get() ) {
this->ioTable.add ( *pIO );
chan.cacPrivateListOfIO::eventq.add ( *pIO );
this->flushIfRequired ( *chan.getPIIU() );
chan.getPIIU()->writeNotifyRequest (
chan, *pIO, type, nElem, pValue );
return pIO.release()->getId ();
}
else {
throw std::bad_alloc ();
}
}
cacChannel::ioid cac::readNotifyRequest ( nciu &chan, unsigned type,
unsigned nElem, cacReadNotify &notifyIn )
{
epicsAutoMutex autoMutex ( this->mutex );
autoPtrRecycle < netReadNotifyIO > pIO ( this->ioTable, chan.cacPrivateListOfIO::eventq, *this,
netReadNotifyIO::factory ( this->freeListReadNotifyIO, chan, notifyIn ) );
if ( pIO.get() ) {
this->ioTable.add ( *pIO );
chan.cacPrivateListOfIO::eventq.add ( *pIO );
this->flushIfRequired ( *chan.getPIIU() );
chan.getPIIU()->readNotifyRequest ( chan, *pIO, type, nElem );
return pIO.release()->getId ();
}
else {
throw std::bad_alloc ();
}
}
void cac::ioCancel ( nciu &chan, const cacChannel::ioid &id )
{
// wait for any IO callbacks in progress to complete
// prior to destroying the IO object
//
// If this is a callback thread then it already owns the
// CB lock at this point. If this is the main thread then we
// are not in pendEvent, pendIO, SG block, etc and
// this->pCallbackLocker protects. Otherwise if this id
// the users auxillary thread then this->pCallbackLocker
// isnt set and we must take the call back lock.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
this->ioCancelPrivate ( chan, id );
}
else if ( this->pCallbackLocker ) {
this->ioCancelPrivate ( chan, id );
}
else {
epicsAutoMutex autoMutex ( this->callbackMutex );
this->ioCancelPrivate ( chan, id );
}
}
void cac::ioCancelPrivate ( nciu & chan, const cacChannel::ioid & id )
{
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( pmiu ) {
chan.cacPrivateListOfIO::eventq.remove ( *pmiu );
class netSubscription *pSubscr = pmiu->isSubscription ();
if ( pSubscr ) {
this->flushIfRequired ( *chan.getPIIU() );
if ( chan.connected() ) {
chan.getPIIU()->subscriptionCancelRequest ( chan, *pSubscr );
}
}
pmiu->destroy ( *this );
}
}
void cac::ioShow ( const cacChannel::ioid &id, unsigned level ) const
{
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.lookup ( id );
if ( pmiu ) {
pmiu->show ( level );
}
}
void cac::ioCompletionNotify ( unsigned id, unsigned type,
arrayElementCount count, const void *pData )
{
baseNMIU * pmiu;
{
epicsAutoMutex autoMutex ( this->mutex );
pmiu = this->ioTable.lookup ( id );
if ( ! pmiu ) {
return;
}
}
//
// The IO destroy routines take the call back mutex
// when uninstalling and deleting the baseNMIU so there is
// no need to worry here about the baseNMIU being deleted while
// it is in use here.
//
pmiu->completion ( type, count, pData );
}
void cac::ioExceptionNotify ( unsigned id, int status, const char *pContext )
{
baseNMIU * pmiu;
{
epicsAutoMutex autoMutex ( this->mutex );
pmiu = this->ioTable.lookup ( id );
}
if ( ! pmiu ) {
return;
}
//
// The IO destroy routines take the call back mutex
// when uninstalling and deleting the baseNMIU so there is
// no need to worry here about the baseNMIU being deleted while
// it is in use here.
//
pmiu->exception ( status, pContext );
}
void cac::ioExceptionNotify ( unsigned id, int status,
const char *pContext, unsigned type, arrayElementCount count )
{
baseNMIU * pmiu;
{
epicsAutoMutex autoMutex ( this->mutex );
pmiu = this->ioTable.lookup ( id );
if ( ! pmiu ) {
return;
}
}
//
// The IO destroy routines take the call back mutex
// when uninstalling and deleting the baseNMIU so there is
// no need to worry here about the baseNMIU being deleted while
// it is in use here.
//
pmiu->exception ( status, pContext, type, count );
}
void cac::ioCompletionNotifyAndDestroy ( unsigned id )
{
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
}
pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu );
//
// The IO destroy routines take the call back mutex
// when uninstalling and deleting the baseNMIU so there is
// no need to worry here about the baseNMIU being deleted while
// it is in use here.
//
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
pmiu->completion ();
}
pmiu->destroy ( *this );
}
void cac::ioCompletionNotifyAndDestroy ( unsigned id,
unsigned type, arrayElementCount count, const void *pData )
{
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
}
pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu );
//
// The IO destroy routines take the call back mutex
// when uninstalling and deleting the baseNMIU so there is
// no need to worry here about the baseNMIU being deleted while
// it is in use here.
//
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
pmiu->completion ( type, count, pData );
}
pmiu->destroy ( *this );
}
void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
const char *pContext )
{
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
}
pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu );
//
// The IO destroy routines take the call back mutex
// when uninstalling and deleting the baseNMIU so there is
// no need to worry here about the baseNMIU being deleted while
// it is in use here.
//
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
pmiu->exception ( status, pContext );
}
pmiu->destroy ( *this );
}
void cac::ioExceptionNotifyAndDestroy ( unsigned id, int status,
const char *pContext, unsigned type, arrayElementCount count )
{
epicsAutoMutex autoMutex ( this->mutex );
baseNMIU * pmiu = this->ioTable.remove ( id );
if ( ! pmiu ) {
return;
}
pmiu->channel().cacPrivateListOfIO::eventq.remove ( *pmiu );
//
// The IO destroy routines take the call back mutex
// when uninstalling and deleting the baseNMIU so there is
// no need to worry here about the baseNMIU being deleted while
// it is in use here.
//
{
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
pmiu->exception ( status, pContext, type, count );
}
pmiu->destroy ( *this );
}
// resubscribe for monitors from this channel
// (lock must be applied)
void cac::connectAllIO ( nciu & chan )
{
tsDLIterBD < baseNMIU > pNetIO =
chan.cacPrivateListOfIO::eventq.firstIter ();
while ( pNetIO.valid () ) {
tsDLIterBD < baseNMIU > next = pNetIO;
next++;
class netSubscription *pSubscr = pNetIO->isSubscription ();
// disconnected channels should have only subscription IO attached
assert ( pSubscr );
try {
chan.getPIIU()->subscriptionRequest ( chan, *pSubscr );
}
catch ( ... ) {
this->printf ( "CAC: failed to send subscription request during channel connect\n" );
}
pNetIO = next;
}
chan.getPIIU()->requestRecvProcessPostponedFlush ();
}
// cancel IO operations and monitor subscriptions
// -- callback lock and cac lock must be applied here
void cac::disconnectAllIO ( nciu & chan, bool enableCallbacks )
{
tsDLIterBD<baseNMIU> pNetIO = chan.cacPrivateListOfIO::eventq.firstIter();
while ( pNetIO.valid() ) {
tsDLIterBD<baseNMIU> pNext = pNetIO;
pNext++;
if ( ! pNetIO->isSubscription() ) {
// no use after disconnected - so uninstall it
this->ioTable.remove ( *pNetIO );
chan.cacPrivateListOfIO::eventq.remove ( *pNetIO );
}
if ( enableCallbacks ) {
char buf[128];
sprintf ( buf, "host = %100s", chan.pHostName() );
epicsAutoMutexRelease unlocker ( this->mutex );
pNetIO->exception ( ECA_DISCONN, buf );
}
if ( ! pNetIO->isSubscription() ) {
pNetIO->destroy ( *this );
}
pNetIO = pNext;
}
}
void cac::recycleReadNotifyIO ( netReadNotifyIO &io )
{
this->freeListReadNotifyIO.release ( &io, sizeof ( io ) );
}
void cac::recycleWriteNotifyIO ( netWriteNotifyIO &io )
{
this->freeListWriteNotifyIO.release ( &io, sizeof ( io ) );
}
void cac::recycleSubscription ( netSubscription &io )
{
this->freeListSubscription.release ( &io, sizeof ( io ) );
}
cacChannel::ioid cac::subscriptionRequest ( nciu &chan, unsigned type,
arrayElementCount nElem, unsigned mask, cacStateNotify &notifyIn )
{
epicsAutoMutex autoMutex ( this->mutex );
autoPtrRecycle < netSubscription > pIO ( this->ioTable, chan.cacPrivateListOfIO::eventq, *this,
netSubscription::factory ( this->freeListSubscription, chan, type, nElem, mask, notifyIn ) );
if ( pIO.get() ) {
this->ioTable.add ( *pIO );
chan.cacPrivateListOfIO::eventq.add ( *pIO );
if ( chan.connected () ) {
this->flushIfRequired ( *chan.getPIIU() );
chan.getPIIU()->subscriptionRequest ( chan, *pIO );
}
cacChannel::ioid id = pIO->getId ();
pIO.release ();
return id;
}
else {
throw std::bad_alloc();
}
}
bool cac::noopAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
{
return true;
}
bool cac::echoRespAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
{
return true;
}
bool cac::writeNotifyRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
{
int caStatus = hdr.m_cid;
if ( caStatus == ECA_NORMAL ) {
this->ioCompletionNotifyAndDestroy ( hdr.m_available );
}
else {
this->ioExceptionNotifyAndDestroy ( hdr.m_available,
caStatus, "write notify request rejected" );
}
return true;
}
bool cac::readNotifyRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
{
/*
* the channel id field is abused for
* read notify status starting with CA V4.1
*/
int caStatus;
if ( iiu.ca_v41_ok() ) {
caStatus = hdr.m_cid;
}
else {
caStatus = ECA_NORMAL;
}
/*
* convert the data buffer from net
* format to host format
*/
# ifdef CONVERSION_REQUIRED
if ( hdr.m_dataType < NELEMENTS ( cac_dbr_cvrt ) ) {
( *cac_dbr_cvrt[ hdr.m_dataType ] ) (
pMsgBdy, pMsgBdy, false, hdr.m_count);
}
else {
caStatus = ECA_BADTYPE;
}
# endif
if ( caStatus == ECA_NORMAL ) {
this->ioCompletionNotifyAndDestroy ( hdr.m_available,
hdr.m_dataType, hdr.m_count, pMsgBdy );
}
else {
this->ioExceptionNotifyAndDestroy ( hdr.m_available,
caStatus, "read failed", hdr.m_dataType, hdr.m_count );
}
return true;
}
bool cac::eventRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
{
int caStatus;
/*
* m_postsize = 0 used to be a subscription cancel confirmation,
* but is now a noop because the IO block is immediately deleted
*/
if ( ! hdr.m_postsize ) {
return true;
}
/*
* the channel id field is abused for
* read notify status starting with CA V4.1
*/
if ( iiu.ca_v41_ok() ) {
caStatus = hdr.m_cid;
}
else {
caStatus = ECA_NORMAL;
}
/*
* convert the data buffer from net format to host format
*/
# ifdef CONVERSION_REQUIRED
if ( hdr.m_dataType < NELEMENTS ( cac_dbr_cvrt ) ) {
( *cac_dbr_cvrt [ hdr.m_dataType ] )(
pMsgBdy, pMsgBdy, false, hdr.m_count);
}
else {
caStatus = htonl ( ECA_BADTYPE );
}
# endif
if ( caStatus == ECA_NORMAL ) {
this->ioCompletionNotify ( hdr.m_available,
hdr.m_dataType, hdr.m_count, pMsgBdy );
}
else {
this->ioExceptionNotify ( hdr.m_available,
caStatus, "subscription update failed",
hdr.m_dataType, hdr.m_count );
}
return true;
}
bool cac::readRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void *pMsgBdy )
{
this->ioCompletionNotifyAndDestroy ( hdr.m_available,
hdr.m_dataType, hdr.m_count, pMsgBdy );
return true;
}
bool cac::clearChannelRespAction ( tcpiiu &, const caHdrLargeArray &, void * /* pMsgBdy */ )
{
return true; // currently a noop
}
bool cac::defaultExcep ( tcpiiu &iiu, const caHdrLargeArray &,
const char *pCtx, unsigned status )
{
char buf[512];
char hostName[64];
iiu.hostName ( hostName, sizeof ( hostName ) );
sprintf ( buf, "host=%64s ctx=%400s", hostName, pCtx );
this->notify.exception ( status, buf, 0, 0u );
return true;
}
bool cac::eventAddExcep ( tcpiiu & /* iiu */, const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
this->ioExceptionNotify ( hdr.m_available, status, pCtx,
hdr.m_dataType, hdr.m_count );
return true;
}
bool cac::readExcep ( tcpiiu &, const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
this->ioExceptionNotifyAndDestroy ( hdr.m_available,
status, pCtx, hdr.m_dataType, hdr.m_count );
return true;
}
bool cac::writeExcep ( tcpiiu &, const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
nciu * pChan = this->chanTable.lookup ( hdr.m_available );
if ( pChan ) {
pChan->writeException ( status, pCtx,
hdr.m_dataType, hdr.m_count );
}
return true;
}
bool cac::readNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
this->ioExceptionNotifyAndDestroy ( hdr.m_available,
status, pCtx, hdr.m_dataType, hdr.m_count );
return true;
}
bool cac::writeNotifyExcep ( tcpiiu &, const caHdrLargeArray &hdr,
const char *pCtx, unsigned status )
{
this->ioExceptionNotifyAndDestroy ( hdr.m_available,
status, pCtx, hdr.m_dataType, hdr.m_count );
return true;
}
bool cac::exceptionRespAction ( tcpiiu &iiu, const caHdrLargeArray &hdr, void *pMsgBdy )
{
const caHdr * pReq = reinterpret_cast < const caHdr * > ( pMsgBdy );
unsigned bytesSoFar = sizeof ( *pReq );
if ( hdr.m_postsize < bytesSoFar ) {
return false;
}
caHdrLargeArray req;
req.m_cmmd = ntohs ( pReq->m_cmmd );
req.m_postsize = ntohs ( pReq->m_postsize );
req.m_dataType = ntohs ( pReq->m_dataType );
req.m_count = ntohs ( pReq->m_count );
req.m_cid = ntohl ( pReq->m_cid );
req.m_available = ntohl ( pReq->m_available );
const ca_uint32_t * pLW = reinterpret_cast < const ca_uint32_t * > ( pReq + 1 );
if ( req.m_postsize == 0xffff ) {
static const unsigned annexSize =
sizeof ( req.m_postsize ) + sizeof ( req.m_count );
bytesSoFar += annexSize;
if ( hdr.m_postsize < bytesSoFar ) {
return false;
}
req.m_postsize = ntohl ( pLW[0] );
req.m_count = ntohl ( pLW[1] );
pLW += 2u;
}
// execute the exception message
pExcepProtoStubTCP pStub;
if ( hdr.m_cmmd >= NELEMENTS ( cac::tcpExcepJumpTableCAC ) ) {
pStub = &cac::defaultExcep;
}
else {
pStub = cac::tcpExcepJumpTableCAC [req.m_cmmd];
}
const char *pCtx = reinterpret_cast < const char * > ( pLW );
return ( this->*pStub ) ( iiu, req, pCtx, hdr.m_available );
}
bool cac::accessRightsRespAction ( tcpiiu &, const caHdrLargeArray &hdr, void * /* pMsgBdy */ )
{
nciu * pChan;
{
epicsAutoMutex autoMutex ( this->mutex );
pChan = this->chanTable.lookup ( hdr.m_cid );
if ( pChan ) {
unsigned ar = hdr.m_available;
caAccessRights accessRights (
( ar & CA_PROTO_ACCESS_RIGHT_READ ) ? true : false,
( ar & CA_PROTO_ACCESS_RIGHT_WRITE ) ? true : false);
pChan->accessRightsStateChange ( accessRights );
}
}
//
// the channel delete routine takes the call back lock so
// that this will not be called when the channel is being
// deleted.
//
if ( pChan ) {
pChan->accessRightsNotify ();
}
return true;
}
bool cac::claimCIURespAction ( tcpiiu & iiu,
const caHdrLargeArray & hdr, void * /*pMsgBdy */ )
{
nciu * pChan;
{
epicsAutoMutex autoMutex ( this->mutex );
pChan = this->chanTable.lookup ( hdr.m_cid );
if ( pChan ) {
unsigned sidTmp;
if ( iiu.ca_v44_ok() ) {
sidTmp = hdr.m_available;
}
else {
sidTmp = pChan->getSID ();
}
pChan->connect ( hdr.m_dataType, hdr.m_count, sidTmp, iiu.ca_v41_ok() );
this->connectAllIO ( *pChan );
}
else if ( iiu.ca_v44_ok() ) {
// this indicates a claim response for a resource that does
// not exist in the client - so just remove it from the server
iiu.clearChannelRequest ( hdr.m_available, hdr.m_cid );
}
}
// the callback lock is taken when a channel is unistalled or when
// is disconnected to prevent race conditions here
if ( pChan ) {
pChan->connectStateNotify ();
}
return true;
}
bool cac::verifyAndDisconnectChan ( tcpiiu & /* iiu */,
const caHdrLargeArray & hdr, void * /* pMsgBdy */ )
{
epicsAutoMutex autoMutex ( this->mutex );
nciu * pChan = this->chanTable.lookup ( hdr.m_cid );
if ( ! pChan ) {
return true;
}
assert ( this->pudpiiu );
this->disconnectChannelPrivate ( *pChan, this->pudpiiu );
this->pSearchTmr->resetPeriod ( 0.0 );
return true;
}
// callback lock and cac lock must be applied
void cac::disconnectChannelPrivate ( nciu & chan, netiiu *pDstIIU )
{
this->disconnectAllIO ( chan, true );
chan.getPIIU()->detachChannel ( chan );
chan.disconnect ( limboIIU );
limboIIU.attachChannel ( chan );
if ( pDstIIU ) {
epicsAutoMutexRelease autoMutexRelease ( this->mutex );
chan.connectStateNotify ();
chan.accessRightsNotify ();
}
if ( pDstIIU ) {
limboIIU.detachChannel ( chan );
chan.disconnect ( *pDstIIU );
pDstIIU->attachChannel ( chan );
}
}
bool cac::badTCPRespAction ( tcpiiu & iiu,
const caHdrLargeArray & hdr, void * /* pMsgBdy */ )
{
char hostName[64];
iiu.hostName ( hostName, sizeof ( hostName ) );
this->printf ( "CAC: Undecipherable TCP message ( bad response type %u ) from %s\n",
hdr.m_cmmd, hostName );
return false;
}
bool cac::executeResponse ( tcpiiu &iiu, caHdrLargeArray &hdr, char *pMshBody )
{
// execute the response message
pProtoStubTCP pStub;
if ( hdr.m_cmmd >= NELEMENTS ( cac::tcpJumpTableCAC ) ) {
pStub = &cac::badTCPRespAction;
}
else {
pStub = cac::tcpJumpTableCAC [hdr.m_cmmd];
}
return ( this->*pStub ) ( iiu, hdr, pMshBody );
}
void cac::signal ( int ca_status, const char *pfilenm,
int lineno, const char *pFormat, ... )
{
va_list theArgs;
va_start ( theArgs, pFormat );
this->vSignal ( ca_status, pfilenm, lineno, pFormat, theArgs);
va_end ( theArgs );
}
void cac::vSignal ( int ca_status, const char *pfilenm,
int lineno, const char *pFormat, va_list args )
{
static const char *severity[] =
{
"Warning",
"Success",
"Error",
"Info",
"Fatal",
"Fatal",
"Fatal",
"Fatal"
};
this->printf ( "CA.Client.Exception...............................................\n" );
this->printf ( " %s: \"%s\"\n",
severity[ CA_EXTRACT_SEVERITY ( ca_status ) ],
ca_message ( ca_status ) );
if ( pFormat ) {
this->printf ( " Context: \"" );
this->vPrintf ( pFormat, args );
this->printf ( "\"\n" );
}
if ( pfilenm ) {
this->printf ( " Source File: %s line %d\n",
pfilenm, lineno );
}
epicsTime current = epicsTime::getCurrent ();
char date[64];
current.strftime ( date, sizeof ( date ), "%a %b %d %Y %H:%M:%S.%f");
this->printf ( " Current Time: %s\n", date );
/*
* Terminate execution if unsuccessful
*/
if( ! ( ca_status & CA_M_SUCCESS ) &&
CA_EXTRACT_SEVERITY ( ca_status ) != CA_K_WARNING ){
errlogFlush();
abort();
}
this->printf ( "..................................................................\n" );
}
void cac::incrementOutstandingIO ()
{
epicsAutoMutex locker ( this->mutex );
if ( this->pndRecvCnt < UINT_MAX ) {
this->pndRecvCnt++;
}
else {
throwWithLocation ( caErrorCode (ECA_INTERNAL) );
}
}
void cac::decrementOutstandingIO ()
{
bool signalNeeded;
{
epicsAutoMutex locker ( this->mutex );
if ( this->pndRecvCnt > 0u ) {
this->pndRecvCnt--;
if ( this->pndRecvCnt == 0u ) {
signalNeeded = true;
}
else {
signalNeeded = false;
}
}
else {
signalNeeded = true;
}
}
if ( signalNeeded ) {
this->ioDone.signal ();
}
}
void cac::decrementOutstandingIO ( unsigned sequenceNo )
{
bool signalNeeded;
{
epicsAutoMutex locker ( this->mutex );
if ( this->readSeq == sequenceNo ) {
if ( this->pndRecvCnt > 0u ) {
this->pndRecvCnt--;
if ( this->pndRecvCnt == 0u ) {
signalNeeded = true;
}
else {
signalNeeded = false;
}
}
else {
signalNeeded = true;
}
}
else {
signalNeeded = false;
}
}
if ( signalNeeded ) {
this->ioDone.signal ();
}
}
void cac::selfTest () const
{
this->chanTable.verify ();
this->ioTable.verify ();
this->sgTable.verify ();
this->beaconTable.verify ();
}
void cac::notifyNewFD ( SOCKET sock ) const
{
if ( this->pCallbackLocker ) {
this->notify.fdWasCreated ( sock );
}
}
void cac::notifyDestroyFD ( SOCKET sock ) const
{
if ( this->pCallbackLocker ) {
this->notify.fdWasDestroyed ( sock );
}
}
void cac::uninstallIIU ( tcpiiu & iiu )
{
epicsAutoMutex autoMutexCB ( this->callbackMutex );
this->privateUninstallIIU ( iiu );
}
void cac::privateUninstallIIU ( tcpiiu & iiu )
{
epicsAutoMutex autoMutexCAC ( this->mutex );
if ( iiu.channelCount() ) {
char hostNameTmp[64];
iiu.hostName ( hostNameTmp, sizeof ( hostNameTmp ) );
genLocalExcep ( *this, ECA_DISCONN, hostNameTmp );
}
osiSockAddr addr = iiu.getNetworkAddress();
if ( addr.sa.sa_family == AF_INET ) {
inetAddrID tmp ( addr.ia );
bhe *pBHE = this->beaconTable.lookup ( tmp );
if ( pBHE ) {
pBHE->unregisterIIU ( iiu );
}
}
// dont allow any channels to jump back onto
// this iiu while the lock is removed below
this->serverTable.remove ( iiu );
assert ( this->pudpiiu );
this->removeAllChan ( iiu, this->pudpiiu );
delete &iiu;
this->pSearchTmr->resetPeriod ( 0.0 );
// signal iiu uninstal event so that cac can properly shut down
this->iiuUninstal.signal();
}
void cac::preemptiveCallbackLock ()
{
// the count must be incremented prior to taking the lock
{
epicsAutoMutex autoMutex ( this->mutex );
assert ( this->recvThreadsPendingCount < UINT_MAX );
this->recvThreadsPendingCount++;
}
this->callbackMutex.lock ();
}
void cac::preemptiveCallbackUnlock ()
{
this->callbackMutex.unlock ();
bool signalRequired;
{
epicsAutoMutex autoMutex ( this->mutex );
assert ( this->recvThreadsPendingCount > 0 );
this->recvThreadsPendingCount--;
if ( this->pCallbackLocker ) {
if ( this->recvThreadsPendingCount == 1u ) {
signalRequired = true;
}
else {
signalRequired = false;
}
}
else {
signalRequired = false;
}
}
if ( signalRequired ) {
this->noRecvThreadsPending.signal ();
}
}
double cac::beaconPeriod ( const nciu & chan ) const
{
epicsAutoMutex locker ( this->mutex );
const netiiu * pIIU = chan.getConstPIIU ();
if ( pIIU ) {
osiSockAddr addr = pIIU->getNetworkAddress ();
if ( addr.sa.sa_family == AF_INET ) {
inetAddrID tmp ( addr.ia );
bhe *pBHE = this->beaconTable.lookup ( tmp );
if ( pBHE ) {
return pBHE->period ();
}
}
}
return - DBL_MAX;
}
void cac::udpWakeup ()
{
epicsAutoMutex locker ( this->mutex );
if ( this->pudpiiu ) {
this->pudpiiu->wakeupMsg ();
}
}