added preemptive callback control

reinstalled flow control
This commit is contained in:
Jeff Hill
2000-06-22 23:59:51 +00:00
parent a0c9a0ea91
commit 47729fed41
14 changed files with 480 additions and 354 deletions
+5 -3
View File
@@ -89,6 +89,7 @@ int CASG::block ( double timeout )
if ( p ) {
return ECA_EVDISALLOW;
}
threadPrivateSet (cacRecursionLock, &cacRecursionLock);
cur_time = osiTime::getCurrent ();
@@ -98,6 +99,8 @@ int CASG::block ( double timeout )
beg_time = cur_time;
delay = 0.0;
this->client.enableCallbackPreemption ();
status = ECA_NORMAL;
while ( 1 ) {
this->mutex.lock ();
@@ -135,9 +138,6 @@ int CASG::block ( double timeout )
break;
}
/*
* wait for asynch notification
*/
this->sem.wait ( remaining );
/*
@@ -148,6 +148,8 @@ int CASG::block ( double timeout )
delay = cur_time - beg_time;
}
this->client.disableCallbackPreemption ();
threadPrivateSet (cacRecursionLock, NULL);
return status;
+3 -2
View File
@@ -22,7 +22,6 @@ LIBSRCS += cacServiceList.cpp
LIBSRCS += access.cpp
LIBSRCS += processThread.cpp
LIBSRCS += iocinf.cpp
LIBSRCS += flow_control.cpp
LIBSRCS += convert.cpp
LIBSRCS += test_event.cpp
LIBSRCS += repeater.cpp
@@ -73,10 +72,12 @@ Com_DIR = $(INSTALL_LIB)
caRepeater_SRCS = caRepeater.cpp
PROD += caRepeater
PROD += catime acctst
PROD_DEFAULT += catime acctst
catime_SRCS = catimeMain.c catime.c
acctst_SRCS = acctstMain.c acctst.c
PROD_vxWorks = catime acctst
include $(TOP)/configure/RULES
+36 -30
View File
@@ -111,6 +111,11 @@ extern "C" void ca_default_exception_handler (struct exception_handler_args args
* ca_task_initialize ()
*/
int epicsShareAPI ca_task_initialize (void)
{
return ca_context_create ( false );
}
epicsShareFunc int epicsShareAPI ca_context_create ( int preemptiveCallBackEnable )
{
cac *pcac;
@@ -125,7 +130,7 @@ int epicsShareAPI ca_task_initialize (void)
return ECA_NORMAL;
}
pcac = new cac;
pcac = new cac ( preemptiveCallBackEnable ? true : false );
if ( ! pcac ) {
return ECA_ALLOCMEM;
}
@@ -179,12 +184,7 @@ int epicsShareAPI ca_modify_user_name (const char *)
}
/*
* ca_task_exit()
*
* releases all resources alloc to a channel access client
*/
epicsShareFunc int epicsShareAPI ca_task_exit (void)
epicsShareFunc int epicsShareAPI ca_context_destroy (void)
{
cac *pcac;
@@ -199,6 +199,16 @@ epicsShareFunc int epicsShareAPI ca_task_exit (void)
return ECA_NORMAL;
}
/*
* ca_task_exit()
*
* releases all resources alloc to a channel access client
*/
epicsShareFunc int epicsShareAPI ca_task_exit (void)
{
return ca_context_destroy ();
}
/*
*
* CA_BUILD_AND_CONNECT
@@ -323,13 +333,13 @@ int epicsShareAPI ca_add_exception_event (caExceptionHandler *pfunc, void *arg)
cac *pcac;
int caStatus;
caStatus = fetchClientContext (&pcac);
caStatus = fetchClientContext ( &pcac );
if ( caStatus != ECA_NORMAL ) {
return caStatus;
}
LOCK (pcac);
if (pfunc) {
pcac->lock ();
if ( pfunc ) {
pcac->ca_exception_func = pfunc;
pcac->ca_exception_arg = arg;
}
@@ -337,7 +347,7 @@ int epicsShareAPI ca_add_exception_event (caExceptionHandler *pfunc, void *arg)
pcac->ca_exception_func = ca_default_exception_handler;
pcac->ca_exception_arg = NULL;
}
UNLOCK (pcac);
pcac->unlock ();
return ECA_NORMAL;
}
@@ -515,9 +525,9 @@ void genLocalExcepWFL (cac *pcac, long stat, const char *ctx, const char *pFile,
else if (pcac->ca_exception_func!=NULL) {
args.usr = pcac->ca_exception_arg;
LOCK (pcac);
pcac->lock ();
(*pcac->ca_exception_func) (args);
UNLOCK (pcac);
pcac->unlock ();
}
}
@@ -626,20 +636,17 @@ void epicsShareAPI ca_signal_formated (long ca_status, const char *pfilenm,
* (for a manager of the select system call under UNIX)
*
*/
int epicsShareAPI ca_add_fd_registration(CAFDHANDLER *func, void *arg)
int epicsShareAPI ca_add_fd_registration (CAFDHANDLER *func, void *arg)
{
cac *pcac;
int caStatus;
cac *pcac;
int caStatus;
caStatus = fetchClientContext (&pcac);
caStatus = fetchClientContext ( &pcac );
if ( caStatus != ECA_NORMAL ) {
return caStatus;
}
LOCK (pcac);
pcac->ca_fd_register_func = func;
pcac->ca_fd_register_arg = arg;
UNLOCK (pcac);
pcac->registerForFileDescriptorCallBack ( func, arg );
return ECA_NORMAL;
}
@@ -703,14 +710,14 @@ int epicsShareAPI ca_replace_printf_handler (caPrintfFunc *ca_printf_func)
return caStatus;
}
LOCK (pcac);
pcac->lock ();
if (ca_printf_func) {
pcac->ca_printf_func = ca_printf_func;
}
else {
pcac->ca_printf_func = epicsVprintf;
}
UNLOCK (pcac);
pcac->unlock ();
return ECA_NORMAL;
}
@@ -844,14 +851,12 @@ unsigned epicsShareAPI ca_get_ioc_connection_count ()
return pcac->connectionCount ();
}
void netiiu::show (unsigned /* level */) const
void netiiu::show ( unsigned /* level */ ) const
{
nciu *pChan;
this->pcas->lock ();
LOCK (this->pcas);
tsDLIter<nciu> iter (this->pcas->pudpiiu->chidList);
while ( ( pChan = iter () ) ) {
tsDLIterConstBD <nciu> pChan ( this->chidList.first () );
while ( pChan != pChan.eol () ) {
char hostName [256];
printf( "%s native type=%d ",
pChan->pName (), pChan->nativeType () );
@@ -877,7 +882,8 @@ void netiiu::show (unsigned /* level */) const
printf("\n");
}
UNLOCK (this->pcas);
this->pcas->unlock ();
}
epicsShareFunc int epicsShareAPI ca_channel_status (threadId /* tid */)
+2 -2
View File
@@ -57,9 +57,9 @@ LOCAL void caExtraEventLabor (void *pArg)
args.status = ECA_NORMAL;
}
LOCK (pcac);
pcac->lock ();
(*ppnb->caUserCallback) (args);
UNLOCK (pcac);
pcac->unlock ();
ppnb->busy = FALSE;
}
+146 -97
View File
@@ -35,13 +35,16 @@ static void cacInitRecursionLock ( void * dummy )
//
// cac::cac ()
//
cac::cac () :
beaconTable (1024),
endOfBCastList (0),
ioTable (1024),
chanTable (1024),
sgTable (128),
pndrecvcnt (0)
cac::cac ( bool enablePreemptiveCallbackIn ) :
beaconTable ( 1024 ),
endOfBCastList ( 0 ),
ioTable ( 1024 ),
chanTable ( 1024 ),
sgTable ( 128 ),
fdRegFunc ( 0 ),
fdRegArg ( 0 ),
pndrecvcnt ( 0 ),
enablePreemptiveCallback ( enablePreemptiveCallbackIn )
{
long status;
static threadOnceId once = OSITHREAD_ONCE_INIT;
@@ -80,24 +83,16 @@ cac::cac () :
this->pudpiiu = NULL;
this->ca_exception_func = ca_default_exception_handler;
this->ca_exception_arg = NULL;
this->ca_fd_register_func = NULL;
this->ca_fd_register_arg = NULL;
this->ca_number_iiu_in_fc = 0u;
this->readSeq = 0u;
this->ca_client_lock = semMutexCreate();
if (!this->ca_client_lock) {
throwWithLocation ( caErrorCode (ECA_ALLOCMEM) );
}
this->ca_io_done_sem = semBinaryCreate(semEmpty);
if (!this->ca_io_done_sem) {
semMutexDestroy (this->ca_client_lock);
throwWithLocation ( caErrorCode (ECA_ALLOCMEM) );
}
this->ca_blockSem = semBinaryCreate(semEmpty);
if (!this->ca_blockSem) {
semMutexDestroy (this->ca_client_lock);
semBinaryDestroy (this->ca_io_done_sem);
throwWithLocation ( caErrorCode (ECA_ALLOCMEM) );
}
@@ -116,7 +111,6 @@ cac::cac () :
len = strlen (tmp) + 1;
this->ca_pUserName = (char *) malloc ( len );
if ( ! this->ca_pUserName ) {
semMutexDestroy (this->ca_client_lock);
semBinaryDestroy (this->ca_io_done_sem);
semBinaryDestroy (this->ca_blockSem);
throwWithLocation ( caErrorCode (ECA_ALLOCMEM) );
@@ -126,8 +120,7 @@ cac::cac () :
/* record the host name */
this->ca_pHostName = localHostName();
if (!this->ca_pHostName) {
semMutexDestroy (this->ca_client_lock);
if ( ! this->ca_pHostName ) {
semBinaryDestroy (this->ca_io_done_sem);
semBinaryDestroy (this->ca_blockSem);
free (this->ca_pUserName);
@@ -151,15 +144,20 @@ cac::cac () :
this->ca_server_port =
envGetInetPortConfigParam (&EPICS_CA_SERVER_PORT, CA_SERVER_PORT);
this->pProcThread = new processThread (this);
//
// unfortunately, this must be created her in the
// constructor, and not on demand (only when it is needed)
// because the enable reference count must be
// maintained whenever this object exists.
//
this->pProcThread = new processThread ( this );
if ( ! this->pProcThread ) {
semMutexDestroy (this->ca_client_lock);
semBinaryDestroy (this->ca_io_done_sem);
semBinaryDestroy (this->ca_blockSem);
free (this->ca_pUserName);
free (this->ca_pHostName);
throwWithLocation ( caErrorCode (ECA_ALLOCMEM) );
}
else if ( this->enablePreemptiveCallback ) {
// only after this->pProcThread is valid
this->enableCallbackPreemption ();
}
}
/*
@@ -169,6 +167,8 @@ cac::cac () :
*/
cac::~cac ()
{
this->enableCallbackPreemption ();
//
// destroy local IO channels
//
@@ -205,45 +205,49 @@ cac::~cac ()
}
this->iiuListMutex.unlock ();
this->defaultMutex.lock ();
//
// shutdown udp and wait for threads to exit
//
if (this->pudpiiu) {
if ( this->pudpiiu ) {
if ( ! this->enablePreemptiveCallback ) {
if ( this->fdRegFunc ) {
( *this->fdRegFunc )
( this->fdRegArg, this->pudpiiu->getSock (), FALSE );
}
}
delete this->pudpiiu;
}
LOCK (this);
/* remove put convert block free list */
ellFree (&this->putCvrtBuf);
ellFree ( &this->putCvrtBuf );
/* reclaim sync group resources */
this->sgTable.destroyAllEntries ();
/* free select context lists */
ellFree (&this->fdInfoFreeList);
ellFree (&this->fdInfoList);
ellFree ( &this->fdInfoFreeList );
ellFree ( &this->fdInfoList );
/*
* free user name string
*/
if (this->ca_pUserName) {
free (this->ca_pUserName);
if ( this->ca_pUserName ) {
free ( this->ca_pUserName );
}
/*
* free host name string
*/
if (this->ca_pHostName) {
free (this->ca_pHostName);
if ( this->ca_pHostName ) {
free ( this->ca_pHostName );
}
this->beaconTable.destroyAllEntries ();
semBinaryDestroy (this->ca_io_done_sem);
semBinaryDestroy (this->ca_blockSem);
semMutexDestroy (this->ca_client_lock);
semBinaryDestroy ( this->ca_io_done_sem );
semBinaryDestroy ( this->ca_blockSem );
osiSockRelease ();
@@ -252,14 +256,14 @@ cac::~cac ()
void cac::safeDestroyNMIU (unsigned id)
{
LOCK (this);
this->defaultMutex.lock ();
baseNMIU *pIOBlock = this->ioTable.lookup (id);
if (pIOBlock) {
if ( pIOBlock ) {
pIOBlock->destroy ();
}
UNLOCK (this);
this->defaultMutex.unlock ();
}
void cac::processRecvBacklog ()
@@ -271,6 +275,7 @@ void cac::processRecvBacklog ()
unsigned bytesToProcess;
this->iiuListMutex.lock ();
piiu = this->iiuListRecvPending.get ();
if ( ! piiu ) {
this->iiuListMutex.unlock ();
@@ -279,6 +284,7 @@ void cac::processRecvBacklog ()
piiu->recvPending = false;
this->iiuListIdle.add (*piiu);
this->iiuListMutex.unlock ();
if ( piiu->state == iiu_disconnected ) {
@@ -338,17 +344,19 @@ void cac::cleanUpPendIO ()
{
nciu *pchan;
LOCK (this);
this->defaultMutex.lock ();
this->readSeq++;
this->pndrecvcnt = 0u;
tsDLIter <nciu> iter ( this->pudpiiu->chidList );
while ( ( pchan = iter () ) ) {
pchan->connectTimeoutNotify ();
if ( this->pudpiiu ) {
tsDLIter <nciu> iter ( this->pudpiiu->chidList );
while ( ( pchan = iter () ) ) {
pchan->connectTimeoutNotify ();
}
}
UNLOCK (this);
this->defaultMutex.unlock ();
}
unsigned cac::connectionCount () const
@@ -363,8 +371,9 @@ unsigned cac::connectionCount () const
void cac::show (unsigned level) const
{
LOCK (this);
if (this->pudpiiu) {
this->defaultMutex.lock ();
if ( this->pudpiiu ) {
this->pudpiiu->show (level);
}
@@ -384,19 +393,23 @@ void cac::show (unsigned level) const
this->iiuListMutex.unlock ();
UNLOCK (this);
this->defaultMutex.unlock ();
}
void cac::installIIU (tcpiiu &iiu)
void cac::installIIU ( tcpiiu &iiu )
{
this->iiuListMutex.lock ();
iiu.recvPending = false;
this->iiuListIdle.add (iiu);
this->iiuListMutex.unlock ();
if (this->ca_fd_register_func) {
( * this->ca_fd_register_func ) ( (void *) this->ca_fd_register_arg, iiu.sock, TRUE);
this->defaultMutex.lock ();
if ( ! this->enablePreemptiveCallback && this->fdRegFunc ) {
( * this->fdRegFunc )
( (void *) this->fdRegArg, iiu.getSock (), TRUE );
}
this->defaultMutex.unlock ();
}
void cac::signalRecvActivityIIU (tcpiiu &iiu)
@@ -429,27 +442,33 @@ void cac::removeIIU (tcpiiu &iiu)
{
this->iiuListMutex.lock ();
if (iiu.recvPending) {
if ( iiu.recvPending ) {
this->iiuListRecvPending.remove (iiu);
}
else {
this->iiuListIdle.remove (iiu);
}
if ( ! this->enablePreemptiveCallback ) {
if ( this->fdRegFunc ) {
(*this->fdRegFunc)
((void *)this->fdRegArg, iiu.getSock (), FALSE);
}
}
this->iiuListMutex.unlock ();
}
/*
* cac::lookupBeaconInetAddr()
*
* LOCK must be applied
*/
bhe * cac::lookupBeaconInetAddr (const inetAddrID &ina)
{
bhe *pBHE;
LOCK (this);
this->defaultMutex.lock ();
pBHE = this->beaconTable.lookup (ina);
UNLOCK (this);
this->defaultMutex.unlock ();
return pBHE;
}
@@ -460,7 +479,7 @@ bhe *cac::createBeaconHashEntry (const inetAddrID &ina, const osiTime &initialTi
{
bhe *pBHE;
LOCK (this);
this->defaultMutex.lock ();
pBHE = this->beaconTable.lookup ( ina );
if ( !pBHE ) {
@@ -473,7 +492,7 @@ bhe *cac::createBeaconHashEntry (const inetAddrID &ina, const osiTime &initialTi
}
}
UNLOCK (this);
this->defaultMutex.unlock ();
return pBHE;
}
@@ -491,7 +510,8 @@ void cac::beaconNotify ( const inetAddrID &addr )
return;
}
LOCK ( this );
this->defaultMutex.lock ();
/*
* look for it in the hash table
*/
@@ -512,7 +532,7 @@ void cac::beaconNotify ( const inetAddrID &addr )
}
if ( ! netChange ) {
UNLOCK (this);
this->defaultMutex.unlock ();
return;
}
@@ -533,8 +553,7 @@ void cac::beaconNotify ( const inetAddrID &addr )
int saddr_length = sizeof(saddr);
int status;
status = getsockname ( this->pudpiiu->sock, (struct sockaddr *) &saddr,
&saddr_length );
status = getsockname ( this->pudpiiu->getSock (), (struct sockaddr *) &saddr, &saddr_length );
if ( status < 0 ) {
epicsPrintf ( "CAC: getsockname () error was \"%s\"\n", SOCKERRSTR (SOCKERRNO) );
return;
@@ -562,7 +581,7 @@ void cac::beaconNotify ( const inetAddrID &addr )
iter++;
}
UNLOCK ( this );
this->defaultMutex.unlock ();
# if DEBUG
{
@@ -581,22 +600,22 @@ void cac::removeBeaconInetAddr (const inetAddrID &ina)
{
bhe *pBHE;
LOCK (this);
this->defaultMutex.lock ();
pBHE = this->beaconTable.remove ( ina );
UNLOCK (this);
this->defaultMutex.unlock ();
assert (pBHE);
}
void cac::decrementOutstandingIO (unsigned seqNumber)
{
LOCK (this);
this->defaultMutex.lock ();
if ( this->readSeq == seqNumber ) {
if ( this->pndrecvcnt > 0u ) {
this->pndrecvcnt--;
}
}
UNLOCK (this);
this->defaultMutex.unlock ();
if ( this->pndrecvcnt == 0u ) {
semBinaryGive (this->ca_io_done_sem);
}
@@ -604,11 +623,11 @@ void cac::decrementOutstandingIO (unsigned seqNumber)
void cac::decrementOutstandingIO ()
{
LOCK (this);
this->defaultMutex.lock ();
if ( this->pndrecvcnt > 0u ) {
this->pndrecvcnt--;
}
UNLOCK (this);
this->defaultMutex.unlock ();
if ( this->pndrecvcnt == 0u ) {
semBinaryGive (this->ca_io_done_sem);
}
@@ -616,11 +635,11 @@ void cac::decrementOutstandingIO ()
void cac::incrementOutstandingIO ()
{
LOCK (this);
this->defaultMutex.lock ();
if ( this->pndrecvcnt < UINT_MAX ) {
this->pndrecvcnt++;
}
UNLOCK (this);
this->defaultMutex.unlock ();
}
unsigned cac::readSequence () const
@@ -628,7 +647,7 @@ unsigned cac::readSequence () const
return this->readSeq;
}
int cac::pend (double timeout, int early)
int cac::pend ( double timeout, int early )
{
int status;
void *p;
@@ -636,16 +655,20 @@ int cac::pend (double timeout, int early)
/*
* dont allow recursion
*/
p = threadPrivateGet (cacRecursionLock);
p = threadPrivateGet ( cacRecursionLock );
if (p) {
return ECA_EVDISALLOW;
}
threadPrivateSet (cacRecursionLock, &cacRecursionLock);
threadPrivateSet ( cacRecursionLock, &cacRecursionLock );
status = this->pendPrivate (timeout, early);
this->enableCallbackPreemption ();
threadPrivateSet (cacRecursionLock, NULL);
status = this->pendPrivate ( timeout, early );
this->disableCallbackPreemption ();
threadPrivateSet ( cacRecursionLock, NULL );
return status;
}
@@ -698,7 +721,6 @@ int cac::pendPrivate (double timeout, int early)
}
}
/* recv occurs in another thread */
semBinaryTakeTimeout ( this->ca_io_done_sem, remaining );
if ( this->pndrecvcnt == 0 && early ) {
@@ -725,72 +747,72 @@ bool cac::ioComplete () const
void cac::installIO ( baseNMIU &io )
{
LOCK (this);
this->defaultMutex.lock ();
this->ioTable.add ( io );
UNLOCK (this);
this->defaultMutex.unlock ();
}
void cac::uninstallIO ( baseNMIU &io )
{
LOCK (this);
this->defaultMutex.lock ();
this->ioTable.remove ( io );
UNLOCK (this);
this->defaultMutex.unlock ();
}
baseNMIU * cac::lookupIO (unsigned id)
{
LOCK (this);
this->defaultMutex.lock ();
baseNMIU * pmiu = this->ioTable.lookup ( id );
UNLOCK (this);
this->defaultMutex.unlock ();
return pmiu;
}
void cac::installChannel (nciu &chan)
{
LOCK (this);
this->defaultMutex.lock ();
this->chanTable.add ( chan );
UNLOCK (this);
this->defaultMutex.unlock ();
}
void cac::uninstallChannel (nciu &chan)
{
LOCK (this);
this->defaultMutex.lock ();
this->chanTable.remove ( chan );
UNLOCK (this);
this->defaultMutex.unlock ();
}
nciu * cac::lookupChan (unsigned id)
{
LOCK (this);
this->defaultMutex.lock ();
nciu * pchan = this->chanTable.lookup ( id );
UNLOCK (this);
this->defaultMutex.unlock ();
return pchan;
}
void cac::installCASG (CASG &sg)
{
LOCK (this);
this->defaultMutex.lock ();
this->sgTable.add ( sg );
UNLOCK (this);
this->defaultMutex.unlock ();
}
void cac::uninstallCASG (CASG &sg)
{
LOCK (this);
this->defaultMutex.lock ();
this->sgTable.remove ( sg );
UNLOCK (this);
this->defaultMutex.unlock ();
}
CASG * cac::lookupCASG (unsigned id)
{
LOCK (this);
this->defaultMutex.lock ();
CASG * psg = this->sgTable.lookup ( id );
if ( psg ) {
if ( ! psg->verify () ) {
psg = 0;
}
}
UNLOCK (this);
this->defaultMutex.unlock ();
return psg;
}
@@ -822,10 +844,19 @@ bool cac::createChannelIO (const char *pName, cacChannel &chan)
pIO = cacGlobalServiceList.createChannelIO ( pName, chan );
if ( ! pIO ) {
if ( ! this->pudpiiu ) {
this->defaultMutex.lock ();
this->pudpiiu = new udpiiu ( this );
if ( ! this->pudpiiu ) {
this->defaultMutex.unlock ();
return false;
}
if ( ! this->enablePreemptiveCallback ) {
if ( this->fdRegFunc ) {
( *this->fdRegFunc )
( this->fdRegArg, this->pudpiiu->getSock (), TRUE );
}
}
this->defaultMutex.unlock ();
}
nciu *pNetChan = new nciu ( this, chan, pName );
if ( pNetChan ) {
@@ -850,10 +881,28 @@ bool cac::createChannelIO (const char *pName, cacChannel &chan)
void cac::lock () const
{
semMutexMustTake ( this->ca_client_lock );
this->defaultMutex.lock ();
}
void cac::unlock () const
{
semMutexGive ( this->ca_client_lock );
this->defaultMutex.unlock ();
}
void cac::registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg )
{
this->defaultMutex.lock ();
this->fdRegFunc = pFunc;
this->fdRegArg = pArg;
this->defaultMutex.unlock ();
}
void cac::enableCallbackPreemption ()
{
this->pProcThread->enable ();
}
void cac::disableCallbackPreemption ()
{
this->pProcThread->disable ();
}
+2
View File
@@ -188,6 +188,7 @@ epicsShareFunc enum channel_state epicsShareAPI ca_state (chid chan);
/* Must be called once before calling any of the other routines */
/************************************************************************/
epicsShareFunc int epicsShareAPI ca_task_initialize (void);
epicsShareFunc int epicsShareAPI ca_context_create ( int preemptiveCallBackEnable );
/************************************************************************/
/* Remove CA facility from your task */
@@ -195,6 +196,7 @@ epicsShareFunc int epicsShareAPI ca_task_initialize (void);
/* Normally called automatically at task exit */
/************************************************************************/
epicsShareFunc int epicsShareAPI ca_task_exit (void);
epicsShareFunc int epicsShareAPI ca_context_destroy (void);
/************************************************************************
* anachronistic entry points *
-76
View File
@@ -1,76 +0,0 @@
/* $Id$ */
/*
*
* L O S A L A M O S
* Los Alamos National Laboratory
* Los Alamos, New Mexico 87545
*
* Copyright, 1986, The Regents of the University of California.
*
* Author: Jeff Hill
*/
#include "iocinf.h"
/*
* FLOW CONTROL
*
* Keep track of how many times messages have
* come with out a break in between and
* suppress monitors if we are behind
* (an update is sent when we catch up)
*/
void flow_control_on (tcpiiu *piiu)
{
int status;
LOCK (piiu->pcas);
/*
* I prefer to avoid going into flow control
* as this impacts the performance of batched fetches
*/
if (piiu->contiguous_msg_count >= MAX_CONTIGUOUS_MSG_COUNT) {
if (!piiu->client_busy) {
status = piiu->busyRequestMsg ();
if (status==ECA_NORMAL) {
assert(piiu->pcas->ca_number_iiu_in_fc<UINT_MAX);
piiu->pcas->ca_number_iiu_in_fc++;
piiu->client_busy = TRUE;
# if defined(DEBUG)
printf("fc on\n");
# endif
}
}
}
else {
piiu->contiguous_msg_count++;
}
UNLOCK (piiu->pcas);
return;
}
void flow_control_off (tcpiiu *piiu)
{
int status;
LOCK (piiu->pcas);
piiu->contiguous_msg_count = 0;
if (piiu->client_busy) {
status = piiu->readyRequestMsg ();
if (status==ECA_NORMAL) {
assert (piiu->pcas->ca_number_iiu_in_fc>0u);
piiu->pcas->ca_number_iiu_in_fc--;
piiu->client_busy = FALSE;
# if defined(DEBUG)
printf("fc off\n");
# endif
}
}
UNLOCK (piiu->pcas);
return;
}
+36 -13
View File
@@ -108,9 +108,6 @@
#define MSEC_PER_SEC 1000L
#define USEC_PER_SEC 1000000L
#define LOCK(PCAC) semMutexMustTake ( (PCAC)->ca_client_lock );
#define UNLOCK(PCAC) semMutexGive ( (PCAC)->ca_client_lock );
/*
* catch when they use really large strings
*/
@@ -429,6 +426,7 @@ public:
int pushDatagramMsg (const caHdr *pMsg, const void *pExt, ca_uint16_t extsize);
void repeaterRegistrationMessage ( unsigned attemptNumber );
void flush ();
SOCKET getSock () const;
osiTime recvTime;
char xmitBuf[MAX_UDP];
@@ -437,7 +435,6 @@ public:
repeaterSubscribeTimer repeaterSubscribeTmr;
semMutexId xmitBufLock;
ELLLIST dest;
SOCKET sock;
semBinaryId recvThreadExitSignal;
unsigned nBytesInXmitBuf;
unsigned short repeaterPort;
@@ -446,7 +443,8 @@ public:
// exceptions
class noSocket {};
class noMemory {};
private:
private:
SOCKET sock;
bool compareIfTCP (nciu &chan, const sockaddr_in &) const;
};
@@ -496,6 +494,8 @@ private:
const double period;
};
extern "C" void cacSendThreadTCP ( void *pParam );
class tcpiiu : public tcpRecvWatchdog, public tcpSendWatchdog,
public netiiu, public tsDLNode <tcpiiu> {
public:
@@ -520,6 +520,7 @@ public:
void flush ();
virtual void show (unsigned level) const;
osiSockAddr ipAddress () const;
SOCKET getSock () const;
void noopRequestMsg ();
void echoRequestMsg ();
@@ -542,7 +543,6 @@ public:
unsigned minor_version_number;
unsigned contiguous_msg_count;
unsigned curMsgBytes;
SOCKET sock;
iiu_conn_state state;
bool client_busy;
bool echoRequestPending;
@@ -554,9 +554,16 @@ public:
private:
bool compareIfTCP (nciu &chan, const sockaddr_in &) const;
int pushDatagramMsg (const caHdr *pMsg, const void *pExt, ca_uint16_t extsize);
int pushStreamMsgPrivate ( const caHdr *pmsg, const void *pext,
unsigned extsize, unsigned actualextsize );
void flowControlOn ();
void flowControlOff ();
SOCKET sock;
bool fc;
static tsFreeList < class tcpiiu, 16 > freeList;
friend void cacSendThreadTCP ( void *pParam );
};
class inetAddrID {
@@ -614,9 +621,23 @@ public:
~processThread ();
void entryPoint ();
void signalShutDown ();
void enable ();
void disable ();
private:
//
// The additional complexity associated with
// "processingDone" event and the "processing" flag
// avoid complex locking hierarchy constraints
// and therefore reduces the chance of creating
// a deadlock window during code maintenance.
//
class cac *pcac;
osiEvent exit;
osiEvent processingDone;
osiMutex mutex;
unsigned enableRefCount;
unsigned blockingForCompletion;
bool processing;
bool shutDown;
};
@@ -683,7 +704,7 @@ private:
class cac : public caClient {
public:
cac ();
cac ( bool enablePreemptiveCallback = false );
virtual ~cac ();
void safeDestroyNMIU (unsigned id);
void processRecvBacklog ();
@@ -721,8 +742,11 @@ public:
void uninstallCASG (CASG &);
void registerService ( cacServiceIO &service );
bool createChannelIO (const char *name_str, cacChannel &chan);
void registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg );
void lock () const;
void unlock () const;
void enableCallbackPreemption ();
void disableCallbackPreemption ();
osiTimerQueue *pTimerQueue;
ELLLIST activeCASGOP;
@@ -735,8 +759,6 @@ public:
caExceptionHandler *ca_exception_func;
void *ca_exception_arg;
caPrintfFunc *ca_printf_func;
CAFDHANDLER *ca_fd_register_func;
void *ca_fd_register_arg;
char *ca_pUserName;
char *ca_pHostName;
resTable
@@ -745,8 +767,6 @@ public:
semBinaryId ca_io_done_sem;
osiEvent recvActivity;
semBinaryId ca_blockSem;
semMutexId ca_client_lock;
processThread *pProcThread;
unsigned readSeq;
unsigned ca_nextSlowBucketId;
unsigned ca_number_iiu_in_fc;
@@ -769,7 +789,12 @@ private:
< nciu > chanTable;
chronIntIdResTable
< CASG > sgTable;
processThread *pProcThread;
CAFDHANDLER *fdRegFunc;
void *fdRegArg;
unsigned pndrecvcnt;
bool enablePreemptiveCallback;
int pendPrivate (double timeout, int early);
};
@@ -782,8 +807,6 @@ int ca_defunct (void);
int ca_printf (const char *pformat, ...);
int ca_vPrintf (const char *pformat, va_list args);
void manage_conn (cac *pcac);
void flow_control_on (tcpiiu *piiu);
void flow_control_off (tcpiiu *piiu);
epicsShareFunc void epicsShareAPI ca_repeater (void);
int cac_select_io (cac *pcac, double maxDelay, int flags);
+5 -8
View File
@@ -678,11 +678,10 @@ void nciu::disconnect ()
*/
int nciu::searchMsg ()
{
udpiiu *pudpiiu = this->piiu->pcas->pudpiiu;
int status;
caHdr msg;
if ( this->piiu != static_cast<netiiu *> (pudpiiu) ) {
if ( this->piiu != static_cast<netiiu *> (this->piiu->pcas->pudpiiu) ) {
return ECA_INTERNAL;
}
@@ -690,10 +689,10 @@ int nciu::searchMsg ()
return ECA_STRTOBIG;
}
msg.m_cmmd = htons (CA_PROTO_SEARCH);
msg.m_cmmd = htons ( CA_PROTO_SEARCH );
msg.m_available = this->getId ();
msg.m_dataType = htons (DONTREPLY);
msg.m_count = htons (CA_MINOR_VERSION);
msg.m_dataType = htons ( DONTREPLY );
msg.m_count = htons ( CA_MINOR_VERSION );
msg.m_cid = this->getId ();
status = this->piiu->pushDatagramMsg (&msg, this->pNameStr, this->nameLength);
@@ -704,7 +703,7 @@ int nciu::searchMsg ()
/*
* increment the number of times we have tried to find this thisnel
*/
if (this->retry<MAXCONNTRIES) {
if ( this->retry < MAXCONNTRIES ) {
this->retry++;
}
@@ -891,14 +890,12 @@ int nciu::subscriptionMsg ( unsigned subscriptionId, unsigned typeIn,
msg.m_info.m_mask = htons ( maskIn );
msg.m_info.m_pad = 0; /* allow future use */
this->lock ();
if ( this->f_connected ) {
status = this->piiu->pushStreamMsg ( &msg.m_header, &msg.m_info, true );
}
else {
status = ECA_NORMAL;
}
this->unlock ();
return status;
}
+63 -1
View File
@@ -13,6 +13,7 @@
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*
*/
#include <iocinf.h>
@@ -20,6 +21,9 @@
processThread::processThread (cac *pcacIn) :
osiThread ( "CAC-process", threadGetStackSize (threadStackSmall), threadPriorityMedium ),
pcac ( pcacIn ),
enableRefCount ( 0u ),
blockingForCompletion ( 0u ),
processing ( false ),
shutDown ( false )
{
this->start ();
@@ -38,7 +42,22 @@ void processThread::entryPoint ()
int status = ca_attach_context ( this->pcac );
SEVCHK ( status, "attaching to client context in process thread" );
while ( ! this->shutDown ) {
pcac->processRecvBacklog ();
this->mutex.lock ();
if ( this->enableRefCount ) {
this->processing = true;
}
this->mutex.unlock ();
if ( this->processing ) {
pcac->processRecvBacklog ();
}
this->processing = false;
if ( this->blockingForCompletion ) {
this->processingDone.signal ();
}
this->pcac->recvActivity.wait ();
}
this->exit.signal ();
@@ -50,3 +69,46 @@ void processThread::signalShutDown ()
this->pcac->recvActivity.signal ();
}
void processThread::enable ()
{
unsigned copy;
this->mutex.lock ();
assert ( this->enableRefCount < UINT_MAX );
copy = this->enableRefCount;
this->enableRefCount++;
this->mutex.unlock ();
if ( copy == 0u ) {
this->pcac->recvActivity.signal ();
}
}
void processThread::disable ()
{
bool waitNeeded;
this->mutex.lock ();
assert ( this->enableRefCount != 0u );
this->enableRefCount--;
if ( this->enableRefCount == 0u && this->processing ) {
waitNeeded = true;
this->blockingForCompletion++;
}
else {
waitNeeded = false;
}
this->mutex.unlock ();
if ( waitNeeded ) {
while ( this->processing ) {
this->processingDone.wait ();
}
this->mutex.lock ();
this->blockingForCompletion--;
this->mutex.unlock ();
if ( this->blockingForCompletion ) {
this->processingDone.signal ();
}
}
}
-1
View File
@@ -331,7 +331,6 @@ void cacRingBufferWriteLock (ringBuffer *pBuf)
bool cacRingBufferWriteLockNoBlock (ringBuffer *pBuf, unsigned bytesRequired)
{
semMutexMustTake (pBuf->writeLock);
if ( cacRingBufferWriteSize (pBuf) < bytesRequired ) {
semMutexGive (pBuf->writeLock);
return false;
+8 -8
View File
@@ -47,7 +47,7 @@ void searchTimer::reset ( double delayToNextTry )
delayToNextTry = CA_RECAST_DELAY;
}
LOCK (this->iiu.pcas);
this->iiu.pcas->lock ();
this->retry = 0;
if ( this->period > delayToNextTry ) {
reschedule = true;
@@ -56,7 +56,7 @@ void searchTimer::reset ( double delayToNextTry )
reschedule = false;
}
this->period = CA_RECAST_DELAY;
UNLOCK (this->iiu.pcas);
this->iiu.pcas->unlock ();
if ( reschedule ) {
this->reschedule ( delayToNextTry );
@@ -76,7 +76,7 @@ void searchTimer::setRetryInterval (unsigned retryNo)
unsigned idelay;
double delay;
LOCK (this->iiu.pcas);
this->iiu.pcas->lock ();
/*
* set the retry number
@@ -93,7 +93,7 @@ void searchTimer::setRetryInterval (unsigned retryNo)
*/
this->period = min (CA_RECAST_PERIOD, delay);
UNLOCK (this->iiu.pcas);
this->iiu.pcas->unlock ();
debugPrintf ( ("new CA search period is %f sec\n", this->period) );
}
@@ -107,7 +107,7 @@ void searchTimer::setRetryInterval (unsigned retryNo)
//
void searchTimer::notifySearchResponse (nciu *pChan)
{
LOCK (this->iiu.pcas);
this->iiu.pcas->lock ();
if ( this->retrySeqNoAtListBegin <= pChan->retrySeqNo ) {
if ( this->searchResponses < ULONG_MAX ) {
@@ -115,7 +115,7 @@ void searchTimer::notifySearchResponse (nciu *pChan)
}
}
UNLOCK (this->iiu.pcas);
this->iiu.pcas->unlock ();
if ( pChan->retrySeqNo == this->retrySeqNo ) {
this->reschedule (0.0);
@@ -139,7 +139,7 @@ void searchTimer::expire ()
return;
}
LOCK ( this->iiu.pcas );
this->iiu.pcas->lock ();
/*
* increment the retry sequence number
@@ -314,7 +314,7 @@ void searchTimer::expire ()
}
}
UNLOCK (this->iiu.pcas);
this->iiu.pcas->unlock ();
// flush out the search request buffer
this->iiu.flush ();
+142 -71
View File
@@ -145,23 +145,23 @@ LOCAL void retryPendingClaims (tcpiiu *piiu)
{
bool success;
LOCK (piiu->pcas);
piiu->pcas->lock ();
tsDLIterBD<nciu> chan ( piiu->chidList.first () );
while ( chan != chan.eol () ) {
if (!chan->claimPending) {
if ( ! chan->claimPending ) {
piiu->claimRequestsPending = false;
piiu->flush ();
break;
}
// this moves the channel to the end of the list
success = chan->claimMsg (piiu);
success = chan->claimMsg ( piiu );
if ( ! success ) {
piiu->flush ();
break;
}
chan = piiu->chidList.first ();
}
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
}
/*
@@ -291,8 +291,15 @@ void tcpiiu::recvMsg ()
assert ( ( (unsigned) status ) <= writeSpace );
totalBytes = (unsigned) status;
if ( writeSpace == totalBytes ) {
this->flowControlOn ();
}
else {
this->flowControlOff ();
}
cacRingBufferWriteCommit (&this->recv, totalBytes);
cacRingBufferWriteCommit ( &this->recv, totalBytes );
// cacRingBufferWriteFlush (&this->recv);
this->messageArrivalNotify (); // reschedule connection activity watchdog
@@ -508,7 +515,7 @@ tcpiiu::tcpiiu (cac *pcac, const struct sockaddr_in &ina, unsigned minorVersion,
*/
void tcpiiu::shutdown ()
{
LOCK ( this->pcas );
this->pcas->lock ();
if ( this->state != iiu_disconnected ) {
int status;
@@ -522,7 +529,7 @@ void tcpiiu::shutdown ()
cacRingBufferShutDown ( &this->recv );
this->pcas->signalRecvActivityIIU ( *this );
}
UNLOCK ( this->pcas );
this->pcas->unlock ();
}
//
@@ -540,7 +547,7 @@ tcpiiu::~tcpiiu ()
this->shutdown ();
LOCK (this->pcas);
this->pcas->lock ();
chanDisconnectCount = ellCount (&this->chidList);
if ( chanDisconnectCount ) {
@@ -554,7 +561,7 @@ tcpiiu::~tcpiiu ()
iter = next;
}
UNLOCK (this->pcas);
this->pcas->unlock ();
// wait for send and recv threads to exit
semBinaryMustTake ( this->sendThreadExitSignal );
@@ -566,10 +573,6 @@ tcpiiu::~tcpiiu ()
this->pcas->removeBeaconInetAddr ( this->dest.ia );
if ( this->pcas->ca_fd_register_func ) {
(*this->pcas->ca_fd_register_func)
((void *)this->pcas->ca_fd_register_arg, this->sock, FALSE);
}
socket_close (this->sock);
cacRingBufferDestroy (&this->recv);
@@ -596,12 +599,12 @@ bool tcpiiu::compareIfTCP ( nciu &chan, const sockaddr_in &addr ) const
char rej[64];
ipAddrToA ( &addr, rej, sizeof (rej) );
LOCK ( this->pcas );
this->pcas->lock ();
sprintf ( this->pcas->ca_sprintf_buf,
"Channel: %s Accepted: %s Rejected: %s ",
chan.pName (), this->host_name_str, rej );
genLocalExcep (this->pcas, ECA_DBLCHNL, this->pcas->ca_sprintf_buf);
UNLOCK ( this->pcas );
this->pcas->unlock ();
}
return true;
}
@@ -781,7 +784,7 @@ LOCAL void write_notify_resp_action (tcpiiu *piiu)
{
baseNMIU *monix;
LOCK (piiu->pcas);
piiu->pcas->lock ();
monix = piiu->pcas->lookupIO ( piiu->curMsg.m_available );
if (monix) {
int status = ntohl (piiu->curMsg.m_cid);
@@ -793,7 +796,7 @@ LOCAL void write_notify_resp_action (tcpiiu *piiu)
}
monix->destroy ();
}
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
}
/*
@@ -803,7 +806,7 @@ LOCAL void read_notify_resp_action ( tcpiiu *piiu )
{
baseNMIU *monix;
LOCK (piiu->pcas);
piiu->pcas->lock ();
monix = piiu->pcas->lookupIO ( piiu->curMsg.m_available );
if (monix) {
@@ -849,7 +852,7 @@ LOCAL void read_notify_resp_action ( tcpiiu *piiu )
monix->destroy ();
}
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -864,7 +867,7 @@ LOCAL void event_resp_action (tcpiiu *piiu)
/*
* run the user's event handler
*/
LOCK (piiu->pcas);
piiu->pcas->lock ();
monix = piiu->pcas->lookupIO ( piiu->curMsg.m_available );
if ( monix ) {
int v41;
@@ -877,7 +880,7 @@ LOCAL void event_resp_action (tcpiiu *piiu)
*/
if ( ! piiu->curMsg.m_postsize ) {
monix->destroy ();
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -920,7 +923,7 @@ LOCAL void event_resp_action (tcpiiu *piiu)
}
}
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -932,7 +935,7 @@ LOCAL void read_resp_action (tcpiiu *piiu)
{
baseNMIU *pIOBlock;
LOCK (piiu->pcas);
piiu->pcas->lock ();
/*
* verify the event id
@@ -948,7 +951,7 @@ LOCAL void read_resp_action (tcpiiu *piiu)
pIOBlock->destroy ();
}
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -979,7 +982,7 @@ LOCAL void exception_resp_action (tcpiiu *piiu)
sprintf (context, "detected by: %s", piiu->host_name_str);
}
LOCK (piiu->pcas);
piiu->pcas->lock ();
switch ( ntohs (req->m_cmmd) ) {
case CA_PROTO_READ_NOTIFY:
monix = piiu->pcas->lookupIO ( piiu->curMsg.m_available );
@@ -1048,7 +1051,7 @@ LOCAL void exception_resp_action (tcpiiu *piiu)
break;
}
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -1061,14 +1064,14 @@ LOCAL void access_rights_resp_action (tcpiiu *piiu)
int ar;
nciu *chan;
LOCK (piiu->pcas);
piiu->pcas->lock ();
chan = piiu->pcas->lookupChan (piiu->curMsg.m_cid);
if ( ! chan ) {
/*
* end up here if they delete the channel
* prior to connecting
*/
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -1078,7 +1081,7 @@ LOCAL void access_rights_resp_action (tcpiiu *piiu)
chan->accessRightsNotify (chan->ar);
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -1090,11 +1093,11 @@ LOCAL void claim_ciu_resp_action (tcpiiu *piiu)
unsigned sid;
nciu *chan;
LOCK (piiu->pcas);
piiu->pcas->lock ();
chan = piiu->pcas->lookupChan (piiu->curMsg.m_cid);
if ( ! chan ) {
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -1107,7 +1110,7 @@ LOCAL void claim_ciu_resp_action (tcpiiu *piiu)
chan->connect (*piiu, piiu->curMsg.m_dataType, piiu->curMsg.m_count, sid);
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -1119,14 +1122,14 @@ LOCAL void verifyAndDisconnectChan (tcpiiu *piiu)
{
nciu *chan;
LOCK (piiu->pcas);
piiu->pcas->lock ();
chan = piiu->pcas->lookupChan (piiu->curMsg.m_cid);
if (!chan) {
/*
* end up here if they delete the channel
* prior to this response
*/
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -1138,7 +1141,7 @@ LOCAL void verifyAndDisconnectChan (tcpiiu *piiu)
* count goes to zero
*/
chan->disconnect ();
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -1189,9 +1192,6 @@ LOCAL const pProtoStubTCP tcpJumpTableCAC[] =
/*
* post_tcp_msg ()
*
* LOCK should be applied when calling this routine
*
*/
int tcpiiu::post_msg (char *pInBuf, unsigned long blockSize)
{
@@ -1271,7 +1271,7 @@ int tcpiiu::post_msg (char *pInBuf, unsigned long blockSize)
*/
cacheSize = max ( this->curMsg.m_postsize, MAX_STRING_SIZE );
pData = (void *) calloc (1u, cacheSize);
if (!pData) {
if ( ! pData ) {
return ECA_ALLOCMEM;
}
if (this->pCurData) {
@@ -1333,27 +1333,25 @@ void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const
bool tcpiiu::ca_v42_ok () const
{
return CA_V42 (CA_PROTOCOL_VERSION,
this->minor_version_number);
return CA_V42 (CA_PROTOCOL_VERSION, this->minor_version_number);
}
bool tcpiiu::ca_v41_ok () const
{
return CA_V41 (CA_PROTOCOL_VERSION,
this->minor_version_number);
return CA_V41 (CA_PROTOCOL_VERSION, this->minor_version_number);
}
/*
* tcpiiu::pushStreamMsg ()
*/
int tcpiiu::pushStreamMsg (const caHdr *pmsg,
const void *pext, bool BlockingOk)
int tcpiiu::pushStreamMsg ( const caHdr *pmsg,
const void *pext, bool BlockingOk )
{
caHdr msg;
ca_uint16_t actualextsize;
ca_uint16_t extsize;
unsigned msgsize;
unsigned bytesSent;
int status;
if ( pext == NULL ) {
extsize = actualextsize = 0;
@@ -1367,25 +1365,38 @@ int tcpiiu::pushStreamMsg (const caHdr *pmsg,
}
msg = *pmsg;
msg.m_postsize = htons (extsize);
msgsize = extsize + sizeof (msg);
msg.m_postsize = htons ( extsize );
msgsize = extsize + sizeof ( msg );
if ( ! cacRingBufferWriteLockNoBlock ( &this->send, msgsize ) ) {
if ( BlockingOk ) {
this->armSendWatchdog ();
cacRingBufferWriteLock ( &this->send );
}
else {
return ECA_OPWILLBLOCK;
}
if ( cacRingBufferWriteLockNoBlock ( &this->send, msgsize ) ) {
status = this->pushStreamMsgPrivate ( &msg, pext, extsize, actualextsize );
cacRingBufferWriteUnlock ( &this->send );
return status;
}
else if ( BlockingOk ) {
this->armSendWatchdog ();
this->pcas->enableCallbackPreemption ();
cacRingBufferWriteLock ( &this->send );
status = this->pushStreamMsgPrivate ( &msg, pext, extsize, actualextsize );
cacRingBufferWriteUnlock ( &this->send );
this->pcas->disableCallbackPreemption ();
return status;
}
else {
return ECA_OPWILLBLOCK;
}
}
int tcpiiu::pushStreamMsgPrivate ( const caHdr *pmsg, const void *pext,
unsigned extsize, unsigned actualextsize )
{
unsigned bytesSent;
/*
* push the header onto the ring
*/
bytesSent = cacRingBufferWrite ( &this->send, &msg, sizeof (msg) );
if ( bytesSent != sizeof (msg) ) {
cacRingBufferWriteUnlock ( &this->send );
bytesSent = cacRingBufferWrite ( &this->send, pmsg, sizeof ( *pmsg ) );
if ( bytesSent != sizeof ( *pmsg ) ) {
return ECA_DISCONNCHID;
}
@@ -1397,7 +1408,6 @@ int tcpiiu::pushStreamMsg (const caHdr *pmsg,
if ( extsize > 0u ) {
bytesSent = cacRingBufferWrite ( &this->send, pext, actualextsize );
if ( bytesSent != actualextsize ) {
cacRingBufferWriteUnlock ( &this->send );
return ECA_DISCONNCHID;
}
/*
@@ -1408,19 +1418,15 @@ int tcpiiu::pushStreamMsg (const caHdr *pmsg,
unsigned long n;
n = extsize-actualextsize;
if (n) {
if ( n ) {
assert ( n <= sizeof (nullBuff) );
bytesSent = cacRingBufferWrite ( &this->send, nullBuff, n );
if ( bytesSent != n ) {
cacRingBufferWriteUnlock ( &this->send );
return ECA_DISCONNCHID;
}
}
}
}
cacRingBufferWriteUnlock ( &this->send );
return ECA_NORMAL;
}
@@ -1437,19 +1443,19 @@ int tcpiiu::pushDatagramMsg (const caHdr * /* pMsg */,
*/
void tcpiiu::addToChanList (nciu *chan)
{
LOCK (this->pcas);
this->pcas->lock ();
chan->claimPending = TRUE;
this->chidList.push (*chan);
chan->piiu = this;
UNLOCK (this->pcas);
this->pcas->unlock ();
}
void tcpiiu::removeFromChanList (nciu *chan)
{
LOCK (this->pcas);
this->pcas->lock ();
this->chidList.remove (*chan);
chan->piiu = NULL;
UNLOCK (this->pcas);
this->pcas->unlock ();
if ( this->chidList.count () == 0 ) {
this->shutdown ();
@@ -1458,7 +1464,7 @@ void tcpiiu::removeFromChanList (nciu *chan)
void tcpiiu::disconnect (nciu *chan)
{
LOCK (this->pcas);
this->pcas->lock ();
this->removeFromChanList (chan);
/*
* try to reconnect
@@ -1466,5 +1472,70 @@ void tcpiiu::disconnect (nciu *chan)
assert (this->pcas->pudpiiu);
this->pcas->pudpiiu->addToChanList ( chan );
this->pcas->pudpiiu->searchTmr.reset ( CA_RECAST_DELAY );
UNLOCK (this->pcas);
this->pcas->unlock ();
}
SOCKET tcpiiu::getSock () const
{
return this->sock;
}
/*
* FLOW CONTROL
*
* Keep track of how many times messages have
* come with out a break in between and
* suppress monitors if we are behind
* (an update is sent when we catch up)
*/
void tcpiiu::flowControlOn ()
{
int status;
this->pcas->lock ();
/*
* I prefer to avoid going into flow control
* as this impacts the performance of batched fetches
*/
if ( this->contiguous_msg_count >= MAX_CONTIGUOUS_MSG_COUNT ) {
if ( ! this->client_busy ) {
status = this->busyRequestMsg ();
if ( status == ECA_NORMAL ) {
assert ( this->pcas->ca_number_iiu_in_fc < UINT_MAX );
this->pcas->ca_number_iiu_in_fc++;
this->client_busy = TRUE;
# if defined(DEBUG)
printf("fc on\n");
# endif
}
}
}
else {
this->contiguous_msg_count++;
}
this->pcas->unlock ();
}
void tcpiiu::flowControlOff ()
{
int status;
this->pcas->lock ();
this->contiguous_msg_count = 0;
if ( this->client_busy ) {
status = this->readyRequestMsg ();
if ( status == ECA_NORMAL ) {
assert ( this->pcas->ca_number_iiu_in_fc > 0u );
this->pcas->ca_number_iiu_in_fc--;
this->client_busy = FALSE;
# if defined (DEBUG)
printf("fc off\n");
# endif
}
}
this->pcas->unlock ();
}
+32 -42
View File
@@ -184,7 +184,7 @@ void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber )
errnoCpy != SOCK_ECONNREFUSED ) {
ca_printf (
"CAC: error sending to repeater was \"%s\"\n",
SOCKERRSTR(errnoCpy));
SOCKERRSTR (errnoCpy) );
}
}
}
@@ -354,7 +354,7 @@ udpiiu::udpiiu ( cac *pcac ) :
priorityOfRecv = priorityOfSelf;
}
tid = threadCreate ("CAC-UDP-recv", priorityOfRecv,
tid = threadCreate ("CAC-UDP", priorityOfRecv,
threadGetStackSize (threadStackMedium), cacRecvThreadUDP, this);
if (tid==0) {
ca_printf ("CA: unable to create UDP receive thread\n");
@@ -365,10 +365,6 @@ udpiiu::udpiiu ( cac *pcac ) :
}
}
if ( pcac->ca_fd_register_func ) {
( *pcac->ca_fd_register_func ) ( pcac->ca_fd_register_arg, this->sock, TRUE );
}
if ( ! repeater_installed (this) ) {
osiSpawnDetachedProcessReturn osptr;
@@ -380,24 +376,16 @@ udpiiu::udpiiu ( cac *pcac ) :
*/
osptr = osiSpawnDetachedProcess ("CA Repeater", "caRepeater");
if ( osptr == osiSpawnDetachedProcessNoSupport ) {
unsigned priorityOfSelf = threadGetPrioritySelf ();
unsigned priorityOfRepeater;
threadId tid;
threadBoolStatus tbs;
tbs = threadLowestPriorityLevelAbove ( priorityOfSelf, &priorityOfRepeater );
if ( tbs != tbsSuccess ) {
priorityOfRepeater = priorityOfSelf;
}
tid = threadCreate ( "CAC-repeater", priorityOfRepeater,
tid = threadCreate ( "CAC-repeater", threadPriorityLow,
threadGetStackSize (threadStackMedium), caRepeaterThread, 0);
if (tid==0) {
if ( tid == 0 ) {
ca_printf ("CA: unable to create CA repeater daemon thread\n");
}
}
else if (osptr==osiSpawnDetachedProcessFail) {
ca_printf ("CA: unable to start CA repeater daemon detached process\n");
else if ( osptr == osiSpawnDetachedProcessFail ) {
ca_printf ( "CA: unable to start CA repeater daemon detached process\n" );
}
}
}
@@ -409,10 +397,12 @@ udpiiu::~udpiiu ()
{
nciu *pChan, *pNext;
this->searchTmr.cancel ();
// closes the udp socket
this->shutdown ();
LOCK (this->pcas);
this->pcas->lock ();
tsDLIter<nciu> iter (this->chidList);
pChan = iter ();
while (pChan) {
@@ -420,19 +410,15 @@ udpiiu::~udpiiu ()
pChan->destroy ();
pChan = pNext;
}
UNLOCK (this->pcas);
this->pcas->unlock ();
// wait for recv threads to exit
semBinaryMustTake (this->recvThreadExitSignal);
semMutexDestroy (this->xmitBufLock);
semBinaryDestroy (this->recvThreadExitSignal);
ellFree (&this->dest);
if (this->pcas->ca_fd_register_func) {
(*this->pcas->ca_fd_register_func)
(this->pcas->ca_fd_register_arg, this->sock, FALSE);
}
ellFree (&this->dest);
}
/*
@@ -440,7 +426,7 @@ udpiiu::~udpiiu ()
*/
void udpiiu::shutdown ()
{
LOCK (this->pcas);
this->pcas->lock ();
if ( ! this->shutdownCmd ) {
int status;
@@ -455,7 +441,7 @@ void udpiiu::shutdown ()
SOCKERRSTR (SOCKERRNO) );
}
}
UNLOCK (this->pcas);
this->pcas->unlock ();
}
/*
@@ -495,10 +481,10 @@ LOCAL void search_resp_action (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_
*
* lock required around use of the sprintf buffer
*/
LOCK (piiu->pcas);
piiu->pcas->lock ();
chan = piiu->pcas->lookupChan (pMsg->m_available);
if (!chan) {
UNLOCK (piiu->pcas);
if ( ! chan ) {
piiu->pcas->unlock ();
return;
}
@@ -546,13 +532,13 @@ LOCAL void search_resp_action (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_
* Ignore duplicate search replies
*/
if ( chan->piiu->compareIfTCP (*chan, *pnet_addr) ) {
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
allocpiiu = constructTCPIIU (piiu->pcas, &ina, minorVersion);
if ( ! allocpiiu ) {
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -607,7 +593,8 @@ LOCAL void search_resp_action (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_
chan->claimMsg ( allocpiiu );
cacRingBufferWriteFlush ( &allocpiiu->send );
UNLOCK ( piiu->pcas );
piiu->pcas->unlock ();
}
@@ -619,7 +606,7 @@ LOCAL void beacon_action ( udpiiu * piiu,
{
struct sockaddr_in ina;
LOCK (piiu->pcas);
piiu->pcas->lock ();
/*
* this allows a fan-out server to potentially
@@ -654,7 +641,7 @@ LOCAL void beacon_action ( udpiiu * piiu,
}
piiu->pcas->beaconNotify (ina);
UNLOCK (piiu->pcas);
piiu->pcas->unlock ();
return;
}
@@ -738,9 +725,6 @@ LOCAL const pProtoStubUDP udpJumpTableCAC[] =
/*
* post_udp_msg ()
*
* LOCK should be applied when calling this routine
*
*/
int udpiiu::post_msg (const struct sockaddr_in *pnet_addr,
char *pInBuf, unsigned long blockSize)
@@ -832,7 +816,8 @@ bool udpiiu::compareIfTCP (nciu &, const sockaddr_in &) const
*/
void udpiiu::addToChanList (nciu *chan)
{
LOCK (this->pcas);
this->pcas->lock ();
/*
* add to the beginning of the list so that search requests for
* this channel will be sent first (since the retry count is zero)
@@ -847,14 +832,14 @@ void udpiiu::addToChanList (nciu *chan)
chan->retry = 0u;
this->chidList.push (*chan);
chan->piiu = this;
UNLOCK (this->pcas);
this->pcas->unlock ();
}
void udpiiu::removeFromChanList (nciu *chan)
{
tsDLIterBD<nciu> iter (chan);
LOCK (this->pcas);
this->pcas->lock ();
if ( chan->piiu->pcas->endOfBCastList == iter ) {
if ( iter.itemBefore () != tsDLIterBD<nciu>::eol () ) {
chan->piiu->pcas->endOfBCastList = iter.itemBefore ();
@@ -866,7 +851,7 @@ void udpiiu::removeFromChanList (nciu *chan)
}
chan->piiu->chidList.remove (*chan);
chan->piiu = NULL;
UNLOCK (this->pcas);
this->pcas->unlock ();
}
void udpiiu::disconnect ( nciu * /* chan */ )
@@ -982,3 +967,8 @@ int udpiiu::pushStreamMsg ( const caHdr * /* pmsg */,
ca_printf ("in pushStreamMsg () for a udp iiu?\n");
return ECA_DISCONNCHID;
}
SOCKET udpiiu::getSock () const
{
return this->sock;
}