From 47729fed4164f7c7caba10636bc2b064213f8531 Mon Sep 17 00:00:00 2001 From: Jeff Hill Date: Thu, 22 Jun 2000 23:59:51 +0000 Subject: [PATCH] added preemptive callback control reinstalled flow control --- src/ca/CASG.cpp | 8 +- src/ca/Makefile | 5 +- src/ca/access.cpp | 66 ++++++----- src/ca/caPutNotify.cpp | 4 +- src/ca/cac.cpp | 243 +++++++++++++++++++++++---------------- src/ca/cadef.h | 2 + src/ca/flow_control.cpp | 76 ------------ src/ca/iocinf.h | 49 +++++--- src/ca/nciu.cpp | 13 +-- src/ca/processThread.cpp | 64 ++++++++++- src/ca/ringBuffer.cpp | 1 - src/ca/searchTimer.cpp | 16 +-- src/ca/tcpiiu.cpp | 213 ++++++++++++++++++++++------------ src/ca/udpiiu.cpp | 74 ++++++------ 14 files changed, 480 insertions(+), 354 deletions(-) delete mode 100644 src/ca/flow_control.cpp diff --git a/src/ca/CASG.cpp b/src/ca/CASG.cpp index 6325fa74a..7143efddb 100644 --- a/src/ca/CASG.cpp +++ b/src/ca/CASG.cpp @@ -89,6 +89,7 @@ int CASG::block ( double timeout ) if ( p ) { return ECA_EVDISALLOW; } + threadPrivateSet (cacRecursionLock, &cacRecursionLock); cur_time = osiTime::getCurrent (); @@ -98,6 +99,8 @@ int CASG::block ( double timeout ) beg_time = cur_time; delay = 0.0; + this->client.enableCallbackPreemption (); + status = ECA_NORMAL; while ( 1 ) { this->mutex.lock (); @@ -135,9 +138,6 @@ int CASG::block ( double timeout ) break; } - /* - * wait for asynch notification - */ this->sem.wait ( remaining ); /* @@ -148,6 +148,8 @@ int CASG::block ( double timeout ) delay = cur_time - beg_time; } + this->client.disableCallbackPreemption (); + threadPrivateSet (cacRecursionLock, NULL); return status; diff --git a/src/ca/Makefile b/src/ca/Makefile index f603f3c72..c837d2427 100644 --- a/src/ca/Makefile +++ b/src/ca/Makefile @@ -22,7 +22,6 @@ LIBSRCS += cacServiceList.cpp LIBSRCS += access.cpp LIBSRCS += processThread.cpp LIBSRCS += iocinf.cpp -LIBSRCS += flow_control.cpp LIBSRCS += convert.cpp LIBSRCS += test_event.cpp LIBSRCS += repeater.cpp @@ -73,10 +72,12 @@ Com_DIR = $(INSTALL_LIB) caRepeater_SRCS = caRepeater.cpp PROD += caRepeater -PROD += catime acctst +PROD_DEFAULT += catime acctst catime_SRCS = catimeMain.c catime.c acctst_SRCS = acctstMain.c acctst.c +PROD_vxWorks = catime acctst + include $(TOP)/configure/RULES diff --git a/src/ca/access.cpp b/src/ca/access.cpp index 2a50a8168..3e6a60e3c 100644 --- a/src/ca/access.cpp +++ b/src/ca/access.cpp @@ -111,6 +111,11 @@ extern "C" void ca_default_exception_handler (struct exception_handler_args args * ca_task_initialize () */ int epicsShareAPI ca_task_initialize (void) +{ + return ca_context_create ( false ); +} + +epicsShareFunc int epicsShareAPI ca_context_create ( int preemptiveCallBackEnable ) { cac *pcac; @@ -125,7 +130,7 @@ int epicsShareAPI ca_task_initialize (void) return ECA_NORMAL; } - pcac = new cac; + pcac = new cac ( preemptiveCallBackEnable ? true : false ); if ( ! pcac ) { return ECA_ALLOCMEM; } @@ -179,12 +184,7 @@ int epicsShareAPI ca_modify_user_name (const char *) } -/* - * ca_task_exit() - * - * releases all resources alloc to a channel access client - */ -epicsShareFunc int epicsShareAPI ca_task_exit (void) +epicsShareFunc int epicsShareAPI ca_context_destroy (void) { cac *pcac; @@ -199,6 +199,16 @@ epicsShareFunc int epicsShareAPI ca_task_exit (void) return ECA_NORMAL; } +/* + * ca_task_exit() + * + * releases all resources alloc to a channel access client + */ +epicsShareFunc int epicsShareAPI ca_task_exit (void) +{ + return ca_context_destroy (); +} + /* * * CA_BUILD_AND_CONNECT @@ -323,13 +333,13 @@ int epicsShareAPI ca_add_exception_event (caExceptionHandler *pfunc, void *arg) cac *pcac; int caStatus; - caStatus = fetchClientContext (&pcac); + caStatus = fetchClientContext ( &pcac ); if ( caStatus != ECA_NORMAL ) { return caStatus; } - LOCK (pcac); - if (pfunc) { + pcac->lock (); + if ( pfunc ) { pcac->ca_exception_func = pfunc; pcac->ca_exception_arg = arg; } @@ -337,7 +347,7 @@ int epicsShareAPI ca_add_exception_event (caExceptionHandler *pfunc, void *arg) pcac->ca_exception_func = ca_default_exception_handler; pcac->ca_exception_arg = NULL; } - UNLOCK (pcac); + pcac->unlock (); return ECA_NORMAL; } @@ -515,9 +525,9 @@ void genLocalExcepWFL (cac *pcac, long stat, const char *ctx, const char *pFile, else if (pcac->ca_exception_func!=NULL) { args.usr = pcac->ca_exception_arg; - LOCK (pcac); + pcac->lock (); (*pcac->ca_exception_func) (args); - UNLOCK (pcac); + pcac->unlock (); } } @@ -626,20 +636,17 @@ void epicsShareAPI ca_signal_formated (long ca_status, const char *pfilenm, * (for a manager of the select system call under UNIX) * */ -int epicsShareAPI ca_add_fd_registration(CAFDHANDLER *func, void *arg) +int epicsShareAPI ca_add_fd_registration (CAFDHANDLER *func, void *arg) { - cac *pcac; - int caStatus; + cac *pcac; + int caStatus; - caStatus = fetchClientContext (&pcac); + caStatus = fetchClientContext ( &pcac ); if ( caStatus != ECA_NORMAL ) { return caStatus; } - LOCK (pcac); - pcac->ca_fd_register_func = func; - pcac->ca_fd_register_arg = arg; - UNLOCK (pcac); + pcac->registerForFileDescriptorCallBack ( func, arg ); return ECA_NORMAL; } @@ -703,14 +710,14 @@ int epicsShareAPI ca_replace_printf_handler (caPrintfFunc *ca_printf_func) return caStatus; } - LOCK (pcac); + pcac->lock (); if (ca_printf_func) { pcac->ca_printf_func = ca_printf_func; } else { pcac->ca_printf_func = epicsVprintf; } - UNLOCK (pcac); + pcac->unlock (); return ECA_NORMAL; } @@ -844,14 +851,12 @@ unsigned epicsShareAPI ca_get_ioc_connection_count () return pcac->connectionCount (); } -void netiiu::show (unsigned /* level */) const +void netiiu::show ( unsigned /* level */ ) const { - nciu *pChan; + this->pcas->lock (); - LOCK (this->pcas); - - tsDLIter iter (this->pcas->pudpiiu->chidList); - while ( ( pChan = iter () ) ) { + tsDLIterConstBD pChan ( this->chidList.first () ); + while ( pChan != pChan.eol () ) { char hostName [256]; printf( "%s native type=%d ", pChan->pName (), pChan->nativeType () ); @@ -877,7 +882,8 @@ void netiiu::show (unsigned /* level */) const printf("\n"); } - UNLOCK (this->pcas); + this->pcas->unlock (); + } epicsShareFunc int epicsShareAPI ca_channel_status (threadId /* tid */) diff --git a/src/ca/caPutNotify.cpp b/src/ca/caPutNotify.cpp index ce7f1f890..583c09938 100644 --- a/src/ca/caPutNotify.cpp +++ b/src/ca/caPutNotify.cpp @@ -57,9 +57,9 @@ LOCAL void caExtraEventLabor (void *pArg) args.status = ECA_NORMAL; } - LOCK (pcac); + pcac->lock (); (*ppnb->caUserCallback) (args); - UNLOCK (pcac); + pcac->unlock (); ppnb->busy = FALSE; } diff --git a/src/ca/cac.cpp b/src/ca/cac.cpp index 67a8f56e9..36e8e1609 100644 --- a/src/ca/cac.cpp +++ b/src/ca/cac.cpp @@ -35,13 +35,16 @@ static void cacInitRecursionLock ( void * dummy ) // // cac::cac () // -cac::cac () : - beaconTable (1024), - endOfBCastList (0), - ioTable (1024), - chanTable (1024), - sgTable (128), - pndrecvcnt (0) +cac::cac ( bool enablePreemptiveCallbackIn ) : + beaconTable ( 1024 ), + endOfBCastList ( 0 ), + ioTable ( 1024 ), + chanTable ( 1024 ), + sgTable ( 128 ), + fdRegFunc ( 0 ), + fdRegArg ( 0 ), + pndrecvcnt ( 0 ), + enablePreemptiveCallback ( enablePreemptiveCallbackIn ) { long status; static threadOnceId once = OSITHREAD_ONCE_INIT; @@ -80,24 +83,16 @@ cac::cac () : this->pudpiiu = NULL; this->ca_exception_func = ca_default_exception_handler; this->ca_exception_arg = NULL; - this->ca_fd_register_func = NULL; - this->ca_fd_register_arg = NULL; this->ca_number_iiu_in_fc = 0u; this->readSeq = 0u; - this->ca_client_lock = semMutexCreate(); - if (!this->ca_client_lock) { - throwWithLocation ( caErrorCode (ECA_ALLOCMEM) ); - } this->ca_io_done_sem = semBinaryCreate(semEmpty); if (!this->ca_io_done_sem) { - semMutexDestroy (this->ca_client_lock); throwWithLocation ( caErrorCode (ECA_ALLOCMEM) ); } this->ca_blockSem = semBinaryCreate(semEmpty); if (!this->ca_blockSem) { - semMutexDestroy (this->ca_client_lock); semBinaryDestroy (this->ca_io_done_sem); throwWithLocation ( caErrorCode (ECA_ALLOCMEM) ); } @@ -116,7 +111,6 @@ cac::cac () : len = strlen (tmp) + 1; this->ca_pUserName = (char *) malloc ( len ); if ( ! this->ca_pUserName ) { - semMutexDestroy (this->ca_client_lock); semBinaryDestroy (this->ca_io_done_sem); semBinaryDestroy (this->ca_blockSem); throwWithLocation ( caErrorCode (ECA_ALLOCMEM) ); @@ -126,8 +120,7 @@ cac::cac () : /* record the host name */ this->ca_pHostName = localHostName(); - if (!this->ca_pHostName) { - semMutexDestroy (this->ca_client_lock); + if ( ! this->ca_pHostName ) { semBinaryDestroy (this->ca_io_done_sem); semBinaryDestroy (this->ca_blockSem); free (this->ca_pUserName); @@ -151,15 +144,20 @@ cac::cac () : this->ca_server_port = envGetInetPortConfigParam (&EPICS_CA_SERVER_PORT, CA_SERVER_PORT); - this->pProcThread = new processThread (this); + // + // unfortunately, this must be created her in the + // constructor, and not on demand (only when it is needed) + // because the enable reference count must be + // maintained whenever this object exists. + // + this->pProcThread = new processThread ( this ); if ( ! this->pProcThread ) { - semMutexDestroy (this->ca_client_lock); - semBinaryDestroy (this->ca_io_done_sem); - semBinaryDestroy (this->ca_blockSem); - free (this->ca_pUserName); - free (this->ca_pHostName); throwWithLocation ( caErrorCode (ECA_ALLOCMEM) ); } + else if ( this->enablePreemptiveCallback ) { + // only after this->pProcThread is valid + this->enableCallbackPreemption (); + } } /* @@ -169,6 +167,8 @@ cac::cac () : */ cac::~cac () { + this->enableCallbackPreemption (); + // // destroy local IO channels // @@ -205,45 +205,49 @@ cac::~cac () } this->iiuListMutex.unlock (); + this->defaultMutex.lock (); + // // shutdown udp and wait for threads to exit // - if (this->pudpiiu) { + if ( this->pudpiiu ) { + if ( ! this->enablePreemptiveCallback ) { + if ( this->fdRegFunc ) { + ( *this->fdRegFunc ) + ( this->fdRegArg, this->pudpiiu->getSock (), FALSE ); + } + } delete this->pudpiiu; } - LOCK (this); - /* remove put convert block free list */ - ellFree (&this->putCvrtBuf); + ellFree ( &this->putCvrtBuf ); /* reclaim sync group resources */ this->sgTable.destroyAllEntries (); /* free select context lists */ - ellFree (&this->fdInfoFreeList); - ellFree (&this->fdInfoList); + ellFree ( &this->fdInfoFreeList ); + ellFree ( &this->fdInfoList ); /* * free user name string */ - if (this->ca_pUserName) { - free (this->ca_pUserName); + if ( this->ca_pUserName ) { + free ( this->ca_pUserName ); } /* * free host name string */ - if (this->ca_pHostName) { - free (this->ca_pHostName); + if ( this->ca_pHostName ) { + free ( this->ca_pHostName ); } this->beaconTable.destroyAllEntries (); - semBinaryDestroy (this->ca_io_done_sem); - semBinaryDestroy (this->ca_blockSem); - - semMutexDestroy (this->ca_client_lock); + semBinaryDestroy ( this->ca_io_done_sem ); + semBinaryDestroy ( this->ca_blockSem ); osiSockRelease (); @@ -252,14 +256,14 @@ cac::~cac () void cac::safeDestroyNMIU (unsigned id) { - LOCK (this); + this->defaultMutex.lock (); baseNMIU *pIOBlock = this->ioTable.lookup (id); - if (pIOBlock) { + if ( pIOBlock ) { pIOBlock->destroy (); } - UNLOCK (this); + this->defaultMutex.unlock (); } void cac::processRecvBacklog () @@ -271,6 +275,7 @@ void cac::processRecvBacklog () unsigned bytesToProcess; this->iiuListMutex.lock (); + piiu = this->iiuListRecvPending.get (); if ( ! piiu ) { this->iiuListMutex.unlock (); @@ -279,6 +284,7 @@ void cac::processRecvBacklog () piiu->recvPending = false; this->iiuListIdle.add (*piiu); + this->iiuListMutex.unlock (); if ( piiu->state == iiu_disconnected ) { @@ -338,17 +344,19 @@ void cac::cleanUpPendIO () { nciu *pchan; - LOCK (this); + this->defaultMutex.lock (); this->readSeq++; this->pndrecvcnt = 0u; - tsDLIter iter ( this->pudpiiu->chidList ); - while ( ( pchan = iter () ) ) { - pchan->connectTimeoutNotify (); + if ( this->pudpiiu ) { + tsDLIter iter ( this->pudpiiu->chidList ); + while ( ( pchan = iter () ) ) { + pchan->connectTimeoutNotify (); + } } - UNLOCK (this); + this->defaultMutex.unlock (); } unsigned cac::connectionCount () const @@ -363,8 +371,9 @@ unsigned cac::connectionCount () const void cac::show (unsigned level) const { - LOCK (this); - if (this->pudpiiu) { + this->defaultMutex.lock (); + + if ( this->pudpiiu ) { this->pudpiiu->show (level); } @@ -384,19 +393,23 @@ void cac::show (unsigned level) const this->iiuListMutex.unlock (); - UNLOCK (this); + this->defaultMutex.unlock (); } -void cac::installIIU (tcpiiu &iiu) +void cac::installIIU ( tcpiiu &iiu ) { this->iiuListMutex.lock (); iiu.recvPending = false; this->iiuListIdle.add (iiu); this->iiuListMutex.unlock (); - if (this->ca_fd_register_func) { - ( * this->ca_fd_register_func ) ( (void *) this->ca_fd_register_arg, iiu.sock, TRUE); + this->defaultMutex.lock (); + + if ( ! this->enablePreemptiveCallback && this->fdRegFunc ) { + ( * this->fdRegFunc ) + ( (void *) this->fdRegArg, iiu.getSock (), TRUE ); } + this->defaultMutex.unlock (); } void cac::signalRecvActivityIIU (tcpiiu &iiu) @@ -429,27 +442,33 @@ void cac::removeIIU (tcpiiu &iiu) { this->iiuListMutex.lock (); - if (iiu.recvPending) { + if ( iiu.recvPending ) { this->iiuListRecvPending.remove (iiu); } else { this->iiuListIdle.remove (iiu); } + if ( ! this->enablePreemptiveCallback ) { + if ( this->fdRegFunc ) { + (*this->fdRegFunc) + ((void *)this->fdRegArg, iiu.getSock (), FALSE); + } + } + this->iiuListMutex.unlock (); } /* * cac::lookupBeaconInetAddr() * - * LOCK must be applied */ bhe * cac::lookupBeaconInetAddr (const inetAddrID &ina) { bhe *pBHE; - LOCK (this); + this->defaultMutex.lock (); pBHE = this->beaconTable.lookup (ina); - UNLOCK (this); + this->defaultMutex.unlock (); return pBHE; } @@ -460,7 +479,7 @@ bhe *cac::createBeaconHashEntry (const inetAddrID &ina, const osiTime &initialTi { bhe *pBHE; - LOCK (this); + this->defaultMutex.lock (); pBHE = this->beaconTable.lookup ( ina ); if ( !pBHE ) { @@ -473,7 +492,7 @@ bhe *cac::createBeaconHashEntry (const inetAddrID &ina, const osiTime &initialTi } } - UNLOCK (this); + this->defaultMutex.unlock (); return pBHE; } @@ -491,7 +510,8 @@ void cac::beaconNotify ( const inetAddrID &addr ) return; } - LOCK ( this ); + this->defaultMutex.lock (); + /* * look for it in the hash table */ @@ -512,7 +532,7 @@ void cac::beaconNotify ( const inetAddrID &addr ) } if ( ! netChange ) { - UNLOCK (this); + this->defaultMutex.unlock (); return; } @@ -533,8 +553,7 @@ void cac::beaconNotify ( const inetAddrID &addr ) int saddr_length = sizeof(saddr); int status; - status = getsockname ( this->pudpiiu->sock, (struct sockaddr *) &saddr, - &saddr_length ); + status = getsockname ( this->pudpiiu->getSock (), (struct sockaddr *) &saddr, &saddr_length ); if ( status < 0 ) { epicsPrintf ( "CAC: getsockname () error was \"%s\"\n", SOCKERRSTR (SOCKERRNO) ); return; @@ -562,7 +581,7 @@ void cac::beaconNotify ( const inetAddrID &addr ) iter++; } - UNLOCK ( this ); + this->defaultMutex.unlock (); # if DEBUG { @@ -581,22 +600,22 @@ void cac::removeBeaconInetAddr (const inetAddrID &ina) { bhe *pBHE; - LOCK (this); + this->defaultMutex.lock (); pBHE = this->beaconTable.remove ( ina ); - UNLOCK (this); + this->defaultMutex.unlock (); assert (pBHE); } void cac::decrementOutstandingIO (unsigned seqNumber) { - LOCK (this); + this->defaultMutex.lock (); if ( this->readSeq == seqNumber ) { if ( this->pndrecvcnt > 0u ) { this->pndrecvcnt--; } } - UNLOCK (this); + this->defaultMutex.unlock (); if ( this->pndrecvcnt == 0u ) { semBinaryGive (this->ca_io_done_sem); } @@ -604,11 +623,11 @@ void cac::decrementOutstandingIO (unsigned seqNumber) void cac::decrementOutstandingIO () { - LOCK (this); + this->defaultMutex.lock (); if ( this->pndrecvcnt > 0u ) { this->pndrecvcnt--; } - UNLOCK (this); + this->defaultMutex.unlock (); if ( this->pndrecvcnt == 0u ) { semBinaryGive (this->ca_io_done_sem); } @@ -616,11 +635,11 @@ void cac::decrementOutstandingIO () void cac::incrementOutstandingIO () { - LOCK (this); + this->defaultMutex.lock (); if ( this->pndrecvcnt < UINT_MAX ) { this->pndrecvcnt++; } - UNLOCK (this); + this->defaultMutex.unlock (); } unsigned cac::readSequence () const @@ -628,7 +647,7 @@ unsigned cac::readSequence () const return this->readSeq; } -int cac::pend (double timeout, int early) +int cac::pend ( double timeout, int early ) { int status; void *p; @@ -636,16 +655,20 @@ int cac::pend (double timeout, int early) /* * dont allow recursion */ - p = threadPrivateGet (cacRecursionLock); + p = threadPrivateGet ( cacRecursionLock ); if (p) { return ECA_EVDISALLOW; } - threadPrivateSet (cacRecursionLock, &cacRecursionLock); + threadPrivateSet ( cacRecursionLock, &cacRecursionLock ); - status = this->pendPrivate (timeout, early); + this->enableCallbackPreemption (); - threadPrivateSet (cacRecursionLock, NULL); + status = this->pendPrivate ( timeout, early ); + + this->disableCallbackPreemption (); + + threadPrivateSet ( cacRecursionLock, NULL ); return status; } @@ -698,7 +721,6 @@ int cac::pendPrivate (double timeout, int early) } } - /* recv occurs in another thread */ semBinaryTakeTimeout ( this->ca_io_done_sem, remaining ); if ( this->pndrecvcnt == 0 && early ) { @@ -725,72 +747,72 @@ bool cac::ioComplete () const void cac::installIO ( baseNMIU &io ) { - LOCK (this); + this->defaultMutex.lock (); this->ioTable.add ( io ); - UNLOCK (this); + this->defaultMutex.unlock (); } void cac::uninstallIO ( baseNMIU &io ) { - LOCK (this); + this->defaultMutex.lock (); this->ioTable.remove ( io ); - UNLOCK (this); + this->defaultMutex.unlock (); } baseNMIU * cac::lookupIO (unsigned id) { - LOCK (this); + this->defaultMutex.lock (); baseNMIU * pmiu = this->ioTable.lookup ( id ); - UNLOCK (this); + this->defaultMutex.unlock (); return pmiu; } void cac::installChannel (nciu &chan) { - LOCK (this); + this->defaultMutex.lock (); this->chanTable.add ( chan ); - UNLOCK (this); + this->defaultMutex.unlock (); } void cac::uninstallChannel (nciu &chan) { - LOCK (this); + this->defaultMutex.lock (); this->chanTable.remove ( chan ); - UNLOCK (this); + this->defaultMutex.unlock (); } nciu * cac::lookupChan (unsigned id) { - LOCK (this); + this->defaultMutex.lock (); nciu * pchan = this->chanTable.lookup ( id ); - UNLOCK (this); + this->defaultMutex.unlock (); return pchan; } void cac::installCASG (CASG &sg) { - LOCK (this); + this->defaultMutex.lock (); this->sgTable.add ( sg ); - UNLOCK (this); + this->defaultMutex.unlock (); } void cac::uninstallCASG (CASG &sg) { - LOCK (this); + this->defaultMutex.lock (); this->sgTable.remove ( sg ); - UNLOCK (this); + this->defaultMutex.unlock (); } CASG * cac::lookupCASG (unsigned id) { - LOCK (this); + this->defaultMutex.lock (); CASG * psg = this->sgTable.lookup ( id ); if ( psg ) { if ( ! psg->verify () ) { psg = 0; } } - UNLOCK (this); + this->defaultMutex.unlock (); return psg; } @@ -822,10 +844,19 @@ bool cac::createChannelIO (const char *pName, cacChannel &chan) pIO = cacGlobalServiceList.createChannelIO ( pName, chan ); if ( ! pIO ) { if ( ! this->pudpiiu ) { + this->defaultMutex.lock (); this->pudpiiu = new udpiiu ( this ); if ( ! this->pudpiiu ) { + this->defaultMutex.unlock (); return false; } + if ( ! this->enablePreemptiveCallback ) { + if ( this->fdRegFunc ) { + ( *this->fdRegFunc ) + ( this->fdRegArg, this->pudpiiu->getSock (), TRUE ); + } + } + this->defaultMutex.unlock (); } nciu *pNetChan = new nciu ( this, chan, pName ); if ( pNetChan ) { @@ -850,10 +881,28 @@ bool cac::createChannelIO (const char *pName, cacChannel &chan) void cac::lock () const { - semMutexMustTake ( this->ca_client_lock ); + this->defaultMutex.lock (); } void cac::unlock () const { - semMutexGive ( this->ca_client_lock ); + this->defaultMutex.unlock (); +} + +void cac::registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg ) +{ + this->defaultMutex.lock (); + this->fdRegFunc = pFunc; + this->fdRegArg = pArg; + this->defaultMutex.unlock (); +} + +void cac::enableCallbackPreemption () +{ + this->pProcThread->enable (); +} + +void cac::disableCallbackPreemption () +{ + this->pProcThread->disable (); } diff --git a/src/ca/cadef.h b/src/ca/cadef.h index 25ffaff2c..1ced77327 100644 --- a/src/ca/cadef.h +++ b/src/ca/cadef.h @@ -188,6 +188,7 @@ epicsShareFunc enum channel_state epicsShareAPI ca_state (chid chan); /* Must be called once before calling any of the other routines */ /************************************************************************/ epicsShareFunc int epicsShareAPI ca_task_initialize (void); +epicsShareFunc int epicsShareAPI ca_context_create ( int preemptiveCallBackEnable ); /************************************************************************/ /* Remove CA facility from your task */ @@ -195,6 +196,7 @@ epicsShareFunc int epicsShareAPI ca_task_initialize (void); /* Normally called automatically at task exit */ /************************************************************************/ epicsShareFunc int epicsShareAPI ca_task_exit (void); +epicsShareFunc int epicsShareAPI ca_context_destroy (void); /************************************************************************ * anachronistic entry points * diff --git a/src/ca/flow_control.cpp b/src/ca/flow_control.cpp deleted file mode 100644 index 4e819bbc4..000000000 --- a/src/ca/flow_control.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* $Id$ */ -/* - * - * L O S A L A M O S - * Los Alamos National Laboratory - * Los Alamos, New Mexico 87545 - * - * Copyright, 1986, The Regents of the University of California. - * - * Author: Jeff Hill - */ - -#include "iocinf.h" - -/* - * FLOW CONTROL - * - * Keep track of how many times messages have - * come with out a break in between and - * suppress monitors if we are behind - * (an update is sent when we catch up) - */ - -void flow_control_on (tcpiiu *piiu) -{ - int status; - - LOCK (piiu->pcas); - - /* - * I prefer to avoid going into flow control - * as this impacts the performance of batched fetches - */ - if (piiu->contiguous_msg_count >= MAX_CONTIGUOUS_MSG_COUNT) { - if (!piiu->client_busy) { - status = piiu->busyRequestMsg (); - if (status==ECA_NORMAL) { - assert(piiu->pcas->ca_number_iiu_in_fcpcas->ca_number_iiu_in_fc++; - piiu->client_busy = TRUE; -# if defined(DEBUG) - printf("fc on\n"); -# endif - } - } - } - else { - piiu->contiguous_msg_count++; - } - - UNLOCK (piiu->pcas); - return; -} - -void flow_control_off (tcpiiu *piiu) -{ - int status; - - LOCK (piiu->pcas); - - piiu->contiguous_msg_count = 0; - if (piiu->client_busy) { - status = piiu->readyRequestMsg (); - if (status==ECA_NORMAL) { - assert (piiu->pcas->ca_number_iiu_in_fc>0u); - piiu->pcas->ca_number_iiu_in_fc--; - piiu->client_busy = FALSE; -# if defined(DEBUG) - printf("fc off\n"); -# endif - } - } - - UNLOCK (piiu->pcas); - return; -} diff --git a/src/ca/iocinf.h b/src/ca/iocinf.h index 23f0934e2..7d04069fe 100644 --- a/src/ca/iocinf.h +++ b/src/ca/iocinf.h @@ -108,9 +108,6 @@ #define MSEC_PER_SEC 1000L #define USEC_PER_SEC 1000000L -#define LOCK(PCAC) semMutexMustTake ( (PCAC)->ca_client_lock ); -#define UNLOCK(PCAC) semMutexGive ( (PCAC)->ca_client_lock ); - /* * catch when they use really large strings */ @@ -429,6 +426,7 @@ public: int pushDatagramMsg (const caHdr *pMsg, const void *pExt, ca_uint16_t extsize); void repeaterRegistrationMessage ( unsigned attemptNumber ); void flush (); + SOCKET getSock () const; osiTime recvTime; char xmitBuf[MAX_UDP]; @@ -437,7 +435,6 @@ public: repeaterSubscribeTimer repeaterSubscribeTmr; semMutexId xmitBufLock; ELLLIST dest; - SOCKET sock; semBinaryId recvThreadExitSignal; unsigned nBytesInXmitBuf; unsigned short repeaterPort; @@ -446,7 +443,8 @@ public: // exceptions class noSocket {}; class noMemory {}; -private: +private: + SOCKET sock; bool compareIfTCP (nciu &chan, const sockaddr_in &) const; }; @@ -496,6 +494,8 @@ private: const double period; }; +extern "C" void cacSendThreadTCP ( void *pParam ); + class tcpiiu : public tcpRecvWatchdog, public tcpSendWatchdog, public netiiu, public tsDLNode { public: @@ -520,6 +520,7 @@ public: void flush (); virtual void show (unsigned level) const; osiSockAddr ipAddress () const; + SOCKET getSock () const; void noopRequestMsg (); void echoRequestMsg (); @@ -542,7 +543,6 @@ public: unsigned minor_version_number; unsigned contiguous_msg_count; unsigned curMsgBytes; - SOCKET sock; iiu_conn_state state; bool client_busy; bool echoRequestPending; @@ -554,9 +554,16 @@ public: private: bool compareIfTCP (nciu &chan, const sockaddr_in &) const; int pushDatagramMsg (const caHdr *pMsg, const void *pExt, ca_uint16_t extsize); + int pushStreamMsgPrivate ( const caHdr *pmsg, const void *pext, + unsigned extsize, unsigned actualextsize ); + void flowControlOn (); + void flowControlOff (); + SOCKET sock; bool fc; static tsFreeList < class tcpiiu, 16 > freeList; + + friend void cacSendThreadTCP ( void *pParam ); }; class inetAddrID { @@ -614,9 +621,23 @@ public: ~processThread (); void entryPoint (); void signalShutDown (); + void enable (); + void disable (); private: + // + // The additional complexity associated with + // "processingDone" event and the "processing" flag + // avoid complex locking hierarchy constraints + // and therefore reduces the chance of creating + // a deadlock window during code maintenance. + // class cac *pcac; osiEvent exit; + osiEvent processingDone; + osiMutex mutex; + unsigned enableRefCount; + unsigned blockingForCompletion; + bool processing; bool shutDown; }; @@ -683,7 +704,7 @@ private: class cac : public caClient { public: - cac (); + cac ( bool enablePreemptiveCallback = false ); virtual ~cac (); void safeDestroyNMIU (unsigned id); void processRecvBacklog (); @@ -721,8 +742,11 @@ public: void uninstallCASG (CASG &); void registerService ( cacServiceIO &service ); bool createChannelIO (const char *name_str, cacChannel &chan); + void registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg ); void lock () const; void unlock () const; + void enableCallbackPreemption (); + void disableCallbackPreemption (); osiTimerQueue *pTimerQueue; ELLLIST activeCASGOP; @@ -735,8 +759,6 @@ public: caExceptionHandler *ca_exception_func; void *ca_exception_arg; caPrintfFunc *ca_printf_func; - CAFDHANDLER *ca_fd_register_func; - void *ca_fd_register_arg; char *ca_pUserName; char *ca_pHostName; resTable @@ -745,8 +767,6 @@ public: semBinaryId ca_io_done_sem; osiEvent recvActivity; semBinaryId ca_blockSem; - semMutexId ca_client_lock; - processThread *pProcThread; unsigned readSeq; unsigned ca_nextSlowBucketId; unsigned ca_number_iiu_in_fc; @@ -769,7 +789,12 @@ private: < nciu > chanTable; chronIntIdResTable < CASG > sgTable; + processThread *pProcThread; + CAFDHANDLER *fdRegFunc; + void *fdRegArg; unsigned pndrecvcnt; + bool enablePreemptiveCallback; + int pendPrivate (double timeout, int early); }; @@ -782,8 +807,6 @@ int ca_defunct (void); int ca_printf (const char *pformat, ...); int ca_vPrintf (const char *pformat, va_list args); void manage_conn (cac *pcac); -void flow_control_on (tcpiiu *piiu); -void flow_control_off (tcpiiu *piiu); epicsShareFunc void epicsShareAPI ca_repeater (void); int cac_select_io (cac *pcac, double maxDelay, int flags); diff --git a/src/ca/nciu.cpp b/src/ca/nciu.cpp index 2f5a13697..f8c9979e0 100644 --- a/src/ca/nciu.cpp +++ b/src/ca/nciu.cpp @@ -678,11 +678,10 @@ void nciu::disconnect () */ int nciu::searchMsg () { - udpiiu *pudpiiu = this->piiu->pcas->pudpiiu; int status; caHdr msg; - if ( this->piiu != static_cast (pudpiiu) ) { + if ( this->piiu != static_cast (this->piiu->pcas->pudpiiu) ) { return ECA_INTERNAL; } @@ -690,10 +689,10 @@ int nciu::searchMsg () return ECA_STRTOBIG; } - msg.m_cmmd = htons (CA_PROTO_SEARCH); + msg.m_cmmd = htons ( CA_PROTO_SEARCH ); msg.m_available = this->getId (); - msg.m_dataType = htons (DONTREPLY); - msg.m_count = htons (CA_MINOR_VERSION); + msg.m_dataType = htons ( DONTREPLY ); + msg.m_count = htons ( CA_MINOR_VERSION ); msg.m_cid = this->getId (); status = this->piiu->pushDatagramMsg (&msg, this->pNameStr, this->nameLength); @@ -704,7 +703,7 @@ int nciu::searchMsg () /* * increment the number of times we have tried to find this thisnel */ - if (this->retryretry < MAXCONNTRIES ) { this->retry++; } @@ -891,14 +890,12 @@ int nciu::subscriptionMsg ( unsigned subscriptionId, unsigned typeIn, msg.m_info.m_mask = htons ( maskIn ); msg.m_info.m_pad = 0; /* allow future use */ - this->lock (); if ( this->f_connected ) { status = this->piiu->pushStreamMsg ( &msg.m_header, &msg.m_info, true ); } else { status = ECA_NORMAL; } - this->unlock (); return status; } diff --git a/src/ca/processThread.cpp b/src/ca/processThread.cpp index 316b57a25..5853c0908 100644 --- a/src/ca/processThread.cpp +++ b/src/ca/processThread.cpp @@ -13,6 +13,7 @@ * Author Jeffrey O. Hill * johill@lanl.gov * 505 665 1831 + * */ #include @@ -20,6 +21,9 @@ processThread::processThread (cac *pcacIn) : osiThread ( "CAC-process", threadGetStackSize (threadStackSmall), threadPriorityMedium ), pcac ( pcacIn ), + enableRefCount ( 0u ), + blockingForCompletion ( 0u ), + processing ( false ), shutDown ( false ) { this->start (); @@ -38,7 +42,22 @@ void processThread::entryPoint () int status = ca_attach_context ( this->pcac ); SEVCHK ( status, "attaching to client context in process thread" ); while ( ! this->shutDown ) { - pcac->processRecvBacklog (); + + this->mutex.lock (); + if ( this->enableRefCount ) { + this->processing = true; + } + this->mutex.unlock (); + + if ( this->processing ) { + pcac->processRecvBacklog (); + } + + this->processing = false; + if ( this->blockingForCompletion ) { + this->processingDone.signal (); + } + this->pcac->recvActivity.wait (); } this->exit.signal (); @@ -50,3 +69,46 @@ void processThread::signalShutDown () this->pcac->recvActivity.signal (); } +void processThread::enable () +{ + unsigned copy; + + this->mutex.lock (); + assert ( this->enableRefCount < UINT_MAX ); + copy = this->enableRefCount; + this->enableRefCount++; + this->mutex.unlock (); + + if ( copy == 0u ) { + this->pcac->recvActivity.signal (); + } +} + +void processThread::disable () +{ + bool waitNeeded; + + this->mutex.lock (); + assert ( this->enableRefCount != 0u ); + this->enableRefCount--; + if ( this->enableRefCount == 0u && this->processing ) { + waitNeeded = true; + this->blockingForCompletion++; + } + else { + waitNeeded = false; + } + this->mutex.unlock (); + + if ( waitNeeded ) { + while ( this->processing ) { + this->processingDone.wait (); + } + this->mutex.lock (); + this->blockingForCompletion--; + this->mutex.unlock (); + if ( this->blockingForCompletion ) { + this->processingDone.signal (); + } + } +} \ No newline at end of file diff --git a/src/ca/ringBuffer.cpp b/src/ca/ringBuffer.cpp index 7c62abbe3..3a4ed8f40 100644 --- a/src/ca/ringBuffer.cpp +++ b/src/ca/ringBuffer.cpp @@ -331,7 +331,6 @@ void cacRingBufferWriteLock (ringBuffer *pBuf) bool cacRingBufferWriteLockNoBlock (ringBuffer *pBuf, unsigned bytesRequired) { semMutexMustTake (pBuf->writeLock); - if ( cacRingBufferWriteSize (pBuf) < bytesRequired ) { semMutexGive (pBuf->writeLock); return false; diff --git a/src/ca/searchTimer.cpp b/src/ca/searchTimer.cpp index 66333d454..1575d05fb 100644 --- a/src/ca/searchTimer.cpp +++ b/src/ca/searchTimer.cpp @@ -47,7 +47,7 @@ void searchTimer::reset ( double delayToNextTry ) delayToNextTry = CA_RECAST_DELAY; } - LOCK (this->iiu.pcas); + this->iiu.pcas->lock (); this->retry = 0; if ( this->period > delayToNextTry ) { reschedule = true; @@ -56,7 +56,7 @@ void searchTimer::reset ( double delayToNextTry ) reschedule = false; } this->period = CA_RECAST_DELAY; - UNLOCK (this->iiu.pcas); + this->iiu.pcas->unlock (); if ( reschedule ) { this->reschedule ( delayToNextTry ); @@ -76,7 +76,7 @@ void searchTimer::setRetryInterval (unsigned retryNo) unsigned idelay; double delay; - LOCK (this->iiu.pcas); + this->iiu.pcas->lock (); /* * set the retry number @@ -93,7 +93,7 @@ void searchTimer::setRetryInterval (unsigned retryNo) */ this->period = min (CA_RECAST_PERIOD, delay); - UNLOCK (this->iiu.pcas); + this->iiu.pcas->unlock (); debugPrintf ( ("new CA search period is %f sec\n", this->period) ); } @@ -107,7 +107,7 @@ void searchTimer::setRetryInterval (unsigned retryNo) // void searchTimer::notifySearchResponse (nciu *pChan) { - LOCK (this->iiu.pcas); + this->iiu.pcas->lock (); if ( this->retrySeqNoAtListBegin <= pChan->retrySeqNo ) { if ( this->searchResponses < ULONG_MAX ) { @@ -115,7 +115,7 @@ void searchTimer::notifySearchResponse (nciu *pChan) } } - UNLOCK (this->iiu.pcas); + this->iiu.pcas->unlock (); if ( pChan->retrySeqNo == this->retrySeqNo ) { this->reschedule (0.0); @@ -139,7 +139,7 @@ void searchTimer::expire () return; } - LOCK ( this->iiu.pcas ); + this->iiu.pcas->lock (); /* * increment the retry sequence number @@ -314,7 +314,7 @@ void searchTimer::expire () } } - UNLOCK (this->iiu.pcas); + this->iiu.pcas->unlock (); // flush out the search request buffer this->iiu.flush (); diff --git a/src/ca/tcpiiu.cpp b/src/ca/tcpiiu.cpp index d253c4323..8874f2fa0 100644 --- a/src/ca/tcpiiu.cpp +++ b/src/ca/tcpiiu.cpp @@ -145,23 +145,23 @@ LOCAL void retryPendingClaims (tcpiiu *piiu) { bool success; - LOCK (piiu->pcas); + piiu->pcas->lock (); tsDLIterBD chan ( piiu->chidList.first () ); while ( chan != chan.eol () ) { - if (!chan->claimPending) { + if ( ! chan->claimPending ) { piiu->claimRequestsPending = false; piiu->flush (); break; } // this moves the channel to the end of the list - success = chan->claimMsg (piiu); + success = chan->claimMsg ( piiu ); if ( ! success ) { piiu->flush (); break; } chan = piiu->chidList.first (); } - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); } /* @@ -291,8 +291,15 @@ void tcpiiu::recvMsg () assert ( ( (unsigned) status ) <= writeSpace ); totalBytes = (unsigned) status; + + if ( writeSpace == totalBytes ) { + this->flowControlOn (); + } + else { + this->flowControlOff (); + } - cacRingBufferWriteCommit (&this->recv, totalBytes); + cacRingBufferWriteCommit ( &this->recv, totalBytes ); // cacRingBufferWriteFlush (&this->recv); this->messageArrivalNotify (); // reschedule connection activity watchdog @@ -508,7 +515,7 @@ tcpiiu::tcpiiu (cac *pcac, const struct sockaddr_in &ina, unsigned minorVersion, */ void tcpiiu::shutdown () { - LOCK ( this->pcas ); + this->pcas->lock (); if ( this->state != iiu_disconnected ) { int status; @@ -522,7 +529,7 @@ void tcpiiu::shutdown () cacRingBufferShutDown ( &this->recv ); this->pcas->signalRecvActivityIIU ( *this ); } - UNLOCK ( this->pcas ); + this->pcas->unlock (); } // @@ -540,7 +547,7 @@ tcpiiu::~tcpiiu () this->shutdown (); - LOCK (this->pcas); + this->pcas->lock (); chanDisconnectCount = ellCount (&this->chidList); if ( chanDisconnectCount ) { @@ -554,7 +561,7 @@ tcpiiu::~tcpiiu () iter = next; } - UNLOCK (this->pcas); + this->pcas->unlock (); // wait for send and recv threads to exit semBinaryMustTake ( this->sendThreadExitSignal ); @@ -566,10 +573,6 @@ tcpiiu::~tcpiiu () this->pcas->removeBeaconInetAddr ( this->dest.ia ); - if ( this->pcas->ca_fd_register_func ) { - (*this->pcas->ca_fd_register_func) - ((void *)this->pcas->ca_fd_register_arg, this->sock, FALSE); - } socket_close (this->sock); cacRingBufferDestroy (&this->recv); @@ -596,12 +599,12 @@ bool tcpiiu::compareIfTCP ( nciu &chan, const sockaddr_in &addr ) const char rej[64]; ipAddrToA ( &addr, rej, sizeof (rej) ); - LOCK ( this->pcas ); + this->pcas->lock (); sprintf ( this->pcas->ca_sprintf_buf, "Channel: %s Accepted: %s Rejected: %s ", chan.pName (), this->host_name_str, rej ); genLocalExcep (this->pcas, ECA_DBLCHNL, this->pcas->ca_sprintf_buf); - UNLOCK ( this->pcas ); + this->pcas->unlock (); } return true; } @@ -781,7 +784,7 @@ LOCAL void write_notify_resp_action (tcpiiu *piiu) { baseNMIU *monix; - LOCK (piiu->pcas); + piiu->pcas->lock (); monix = piiu->pcas->lookupIO ( piiu->curMsg.m_available ); if (monix) { int status = ntohl (piiu->curMsg.m_cid); @@ -793,7 +796,7 @@ LOCAL void write_notify_resp_action (tcpiiu *piiu) } monix->destroy (); } - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); } /* @@ -803,7 +806,7 @@ LOCAL void read_notify_resp_action ( tcpiiu *piiu ) { baseNMIU *monix; - LOCK (piiu->pcas); + piiu->pcas->lock (); monix = piiu->pcas->lookupIO ( piiu->curMsg.m_available ); if (monix) { @@ -849,7 +852,7 @@ LOCAL void read_notify_resp_action ( tcpiiu *piiu ) monix->destroy (); } - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -864,7 +867,7 @@ LOCAL void event_resp_action (tcpiiu *piiu) /* * run the user's event handler */ - LOCK (piiu->pcas); + piiu->pcas->lock (); monix = piiu->pcas->lookupIO ( piiu->curMsg.m_available ); if ( monix ) { int v41; @@ -877,7 +880,7 @@ LOCAL void event_resp_action (tcpiiu *piiu) */ if ( ! piiu->curMsg.m_postsize ) { monix->destroy (); - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -920,7 +923,7 @@ LOCAL void event_resp_action (tcpiiu *piiu) } } - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -932,7 +935,7 @@ LOCAL void read_resp_action (tcpiiu *piiu) { baseNMIU *pIOBlock; - LOCK (piiu->pcas); + piiu->pcas->lock (); /* * verify the event id @@ -948,7 +951,7 @@ LOCAL void read_resp_action (tcpiiu *piiu) pIOBlock->destroy (); } - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -979,7 +982,7 @@ LOCAL void exception_resp_action (tcpiiu *piiu) sprintf (context, "detected by: %s", piiu->host_name_str); } - LOCK (piiu->pcas); + piiu->pcas->lock (); switch ( ntohs (req->m_cmmd) ) { case CA_PROTO_READ_NOTIFY: monix = piiu->pcas->lookupIO ( piiu->curMsg.m_available ); @@ -1048,7 +1051,7 @@ LOCAL void exception_resp_action (tcpiiu *piiu) break; } - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -1061,14 +1064,14 @@ LOCAL void access_rights_resp_action (tcpiiu *piiu) int ar; nciu *chan; - LOCK (piiu->pcas); + piiu->pcas->lock (); chan = piiu->pcas->lookupChan (piiu->curMsg.m_cid); if ( ! chan ) { /* * end up here if they delete the channel * prior to connecting */ - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -1078,7 +1081,7 @@ LOCAL void access_rights_resp_action (tcpiiu *piiu) chan->accessRightsNotify (chan->ar); - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -1090,11 +1093,11 @@ LOCAL void claim_ciu_resp_action (tcpiiu *piiu) unsigned sid; nciu *chan; - LOCK (piiu->pcas); + piiu->pcas->lock (); chan = piiu->pcas->lookupChan (piiu->curMsg.m_cid); if ( ! chan ) { - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -1107,7 +1110,7 @@ LOCAL void claim_ciu_resp_action (tcpiiu *piiu) chan->connect (*piiu, piiu->curMsg.m_dataType, piiu->curMsg.m_count, sid); - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -1119,14 +1122,14 @@ LOCAL void verifyAndDisconnectChan (tcpiiu *piiu) { nciu *chan; - LOCK (piiu->pcas); + piiu->pcas->lock (); chan = piiu->pcas->lookupChan (piiu->curMsg.m_cid); if (!chan) { /* * end up here if they delete the channel * prior to this response */ - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -1138,7 +1141,7 @@ LOCAL void verifyAndDisconnectChan (tcpiiu *piiu) * count goes to zero */ chan->disconnect (); - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -1189,9 +1192,6 @@ LOCAL const pProtoStubTCP tcpJumpTableCAC[] = /* * post_tcp_msg () - * - * LOCK should be applied when calling this routine - * */ int tcpiiu::post_msg (char *pInBuf, unsigned long blockSize) { @@ -1271,7 +1271,7 @@ int tcpiiu::post_msg (char *pInBuf, unsigned long blockSize) */ cacheSize = max ( this->curMsg.m_postsize, MAX_STRING_SIZE ); pData = (void *) calloc (1u, cacheSize); - if (!pData) { + if ( ! pData ) { return ECA_ALLOCMEM; } if (this->pCurData) { @@ -1333,27 +1333,25 @@ void tcpiiu::hostName ( char *pBuf, unsigned bufLength ) const bool tcpiiu::ca_v42_ok () const { - return CA_V42 (CA_PROTOCOL_VERSION, - this->minor_version_number); + return CA_V42 (CA_PROTOCOL_VERSION, this->minor_version_number); } bool tcpiiu::ca_v41_ok () const { - return CA_V41 (CA_PROTOCOL_VERSION, - this->minor_version_number); + return CA_V41 (CA_PROTOCOL_VERSION, this->minor_version_number); } /* * tcpiiu::pushStreamMsg () */ -int tcpiiu::pushStreamMsg (const caHdr *pmsg, - const void *pext, bool BlockingOk) +int tcpiiu::pushStreamMsg ( const caHdr *pmsg, + const void *pext, bool BlockingOk ) { caHdr msg; ca_uint16_t actualextsize; ca_uint16_t extsize; unsigned msgsize; - unsigned bytesSent; + int status; if ( pext == NULL ) { extsize = actualextsize = 0; @@ -1367,25 +1365,38 @@ int tcpiiu::pushStreamMsg (const caHdr *pmsg, } msg = *pmsg; - msg.m_postsize = htons (extsize); - msgsize = extsize + sizeof (msg); + msg.m_postsize = htons ( extsize ); + msgsize = extsize + sizeof ( msg ); - if ( ! cacRingBufferWriteLockNoBlock ( &this->send, msgsize ) ) { - if ( BlockingOk ) { - this->armSendWatchdog (); - cacRingBufferWriteLock ( &this->send ); - } - else { - return ECA_OPWILLBLOCK; - } + if ( cacRingBufferWriteLockNoBlock ( &this->send, msgsize ) ) { + status = this->pushStreamMsgPrivate ( &msg, pext, extsize, actualextsize ); + cacRingBufferWriteUnlock ( &this->send ); + return status; } + else if ( BlockingOk ) { + this->armSendWatchdog (); + this->pcas->enableCallbackPreemption (); + cacRingBufferWriteLock ( &this->send ); + status = this->pushStreamMsgPrivate ( &msg, pext, extsize, actualextsize ); + cacRingBufferWriteUnlock ( &this->send ); + this->pcas->disableCallbackPreemption (); + return status; + } + else { + return ECA_OPWILLBLOCK; + } +} + +int tcpiiu::pushStreamMsgPrivate ( const caHdr *pmsg, const void *pext, + unsigned extsize, unsigned actualextsize ) +{ + unsigned bytesSent; /* * push the header onto the ring */ - bytesSent = cacRingBufferWrite ( &this->send, &msg, sizeof (msg) ); - if ( bytesSent != sizeof (msg) ) { - cacRingBufferWriteUnlock ( &this->send ); + bytesSent = cacRingBufferWrite ( &this->send, pmsg, sizeof ( *pmsg ) ); + if ( bytesSent != sizeof ( *pmsg ) ) { return ECA_DISCONNCHID; } @@ -1397,7 +1408,6 @@ int tcpiiu::pushStreamMsg (const caHdr *pmsg, if ( extsize > 0u ) { bytesSent = cacRingBufferWrite ( &this->send, pext, actualextsize ); if ( bytesSent != actualextsize ) { - cacRingBufferWriteUnlock ( &this->send ); return ECA_DISCONNCHID; } /* @@ -1408,19 +1418,15 @@ int tcpiiu::pushStreamMsg (const caHdr *pmsg, unsigned long n; n = extsize-actualextsize; - if (n) { + if ( n ) { assert ( n <= sizeof (nullBuff) ); bytesSent = cacRingBufferWrite ( &this->send, nullBuff, n ); if ( bytesSent != n ) { - cacRingBufferWriteUnlock ( &this->send ); return ECA_DISCONNCHID; } } } } - - cacRingBufferWriteUnlock ( &this->send ); - return ECA_NORMAL; } @@ -1437,19 +1443,19 @@ int tcpiiu::pushDatagramMsg (const caHdr * /* pMsg */, */ void tcpiiu::addToChanList (nciu *chan) { - LOCK (this->pcas); + this->pcas->lock (); chan->claimPending = TRUE; this->chidList.push (*chan); chan->piiu = this; - UNLOCK (this->pcas); + this->pcas->unlock (); } void tcpiiu::removeFromChanList (nciu *chan) { - LOCK (this->pcas); + this->pcas->lock (); this->chidList.remove (*chan); chan->piiu = NULL; - UNLOCK (this->pcas); + this->pcas->unlock (); if ( this->chidList.count () == 0 ) { this->shutdown (); @@ -1458,7 +1464,7 @@ void tcpiiu::removeFromChanList (nciu *chan) void tcpiiu::disconnect (nciu *chan) { - LOCK (this->pcas); + this->pcas->lock (); this->removeFromChanList (chan); /* * try to reconnect @@ -1466,5 +1472,70 @@ void tcpiiu::disconnect (nciu *chan) assert (this->pcas->pudpiiu); this->pcas->pudpiiu->addToChanList ( chan ); this->pcas->pudpiiu->searchTmr.reset ( CA_RECAST_DELAY ); - UNLOCK (this->pcas); + this->pcas->unlock (); +} + +SOCKET tcpiiu::getSock () const +{ + return this->sock; +} + +/* + * FLOW CONTROL + * + * Keep track of how many times messages have + * come with out a break in between and + * suppress monitors if we are behind + * (an update is sent when we catch up) + */ +void tcpiiu::flowControlOn () +{ + int status; + + this->pcas->lock (); + + /* + * I prefer to avoid going into flow control + * as this impacts the performance of batched fetches + */ + if ( this->contiguous_msg_count >= MAX_CONTIGUOUS_MSG_COUNT ) { + if ( ! this->client_busy ) { + status = this->busyRequestMsg (); + if ( status == ECA_NORMAL ) { + assert ( this->pcas->ca_number_iiu_in_fc < UINT_MAX ); + this->pcas->ca_number_iiu_in_fc++; + this->client_busy = TRUE; +# if defined(DEBUG) + printf("fc on\n"); +# endif + } + } + } + else { + this->contiguous_msg_count++; + } + + this->pcas->unlock (); +} + +void tcpiiu::flowControlOff () +{ + int status; + + this->pcas->lock (); + + this->contiguous_msg_count = 0; + if ( this->client_busy ) { + status = this->readyRequestMsg (); + if ( status == ECA_NORMAL ) { + assert ( this->pcas->ca_number_iiu_in_fc > 0u ); + this->pcas->ca_number_iiu_in_fc--; + this->client_busy = FALSE; +# if defined (DEBUG) + printf("fc off\n"); +# endif + } + } + + this->pcas->unlock (); } diff --git a/src/ca/udpiiu.cpp b/src/ca/udpiiu.cpp index d4056aa27..bd0e1a273 100644 --- a/src/ca/udpiiu.cpp +++ b/src/ca/udpiiu.cpp @@ -184,7 +184,7 @@ void udpiiu::repeaterRegistrationMessage ( unsigned attemptNumber ) errnoCpy != SOCK_ECONNREFUSED ) { ca_printf ( "CAC: error sending to repeater was \"%s\"\n", - SOCKERRSTR(errnoCpy)); + SOCKERRSTR (errnoCpy) ); } } } @@ -354,7 +354,7 @@ udpiiu::udpiiu ( cac *pcac ) : priorityOfRecv = priorityOfSelf; } - tid = threadCreate ("CAC-UDP-recv", priorityOfRecv, + tid = threadCreate ("CAC-UDP", priorityOfRecv, threadGetStackSize (threadStackMedium), cacRecvThreadUDP, this); if (tid==0) { ca_printf ("CA: unable to create UDP receive thread\n"); @@ -365,10 +365,6 @@ udpiiu::udpiiu ( cac *pcac ) : } } - if ( pcac->ca_fd_register_func ) { - ( *pcac->ca_fd_register_func ) ( pcac->ca_fd_register_arg, this->sock, TRUE ); - } - if ( ! repeater_installed (this) ) { osiSpawnDetachedProcessReturn osptr; @@ -380,24 +376,16 @@ udpiiu::udpiiu ( cac *pcac ) : */ osptr = osiSpawnDetachedProcess ("CA Repeater", "caRepeater"); if ( osptr == osiSpawnDetachedProcessNoSupport ) { - unsigned priorityOfSelf = threadGetPrioritySelf (); - unsigned priorityOfRepeater; threadId tid; - threadBoolStatus tbs; - tbs = threadLowestPriorityLevelAbove ( priorityOfSelf, &priorityOfRepeater ); - if ( tbs != tbsSuccess ) { - priorityOfRepeater = priorityOfSelf; - } - - tid = threadCreate ( "CAC-repeater", priorityOfRepeater, + tid = threadCreate ( "CAC-repeater", threadPriorityLow, threadGetStackSize (threadStackMedium), caRepeaterThread, 0); - if (tid==0) { + if ( tid == 0 ) { ca_printf ("CA: unable to create CA repeater daemon thread\n"); } } - else if (osptr==osiSpawnDetachedProcessFail) { - ca_printf ("CA: unable to start CA repeater daemon detached process\n"); + else if ( osptr == osiSpawnDetachedProcessFail ) { + ca_printf ( "CA: unable to start CA repeater daemon detached process\n" ); } } } @@ -409,10 +397,12 @@ udpiiu::~udpiiu () { nciu *pChan, *pNext; + this->searchTmr.cancel (); + // closes the udp socket this->shutdown (); - LOCK (this->pcas); + this->pcas->lock (); tsDLIter iter (this->chidList); pChan = iter (); while (pChan) { @@ -420,19 +410,15 @@ udpiiu::~udpiiu () pChan->destroy (); pChan = pNext; } - UNLOCK (this->pcas); + this->pcas->unlock (); // wait for recv threads to exit semBinaryMustTake (this->recvThreadExitSignal); semMutexDestroy (this->xmitBufLock); semBinaryDestroy (this->recvThreadExitSignal); - ellFree (&this->dest); - if (this->pcas->ca_fd_register_func) { - (*this->pcas->ca_fd_register_func) - (this->pcas->ca_fd_register_arg, this->sock, FALSE); - } + ellFree (&this->dest); } /* @@ -440,7 +426,7 @@ udpiiu::~udpiiu () */ void udpiiu::shutdown () { - LOCK (this->pcas); + this->pcas->lock (); if ( ! this->shutdownCmd ) { int status; @@ -455,7 +441,7 @@ void udpiiu::shutdown () SOCKERRSTR (SOCKERRNO) ); } } - UNLOCK (this->pcas); + this->pcas->unlock (); } /* @@ -495,10 +481,10 @@ LOCAL void search_resp_action (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_ * * lock required around use of the sprintf buffer */ - LOCK (piiu->pcas); + piiu->pcas->lock (); chan = piiu->pcas->lookupChan (pMsg->m_available); - if (!chan) { - UNLOCK (piiu->pcas); + if ( ! chan ) { + piiu->pcas->unlock (); return; } @@ -546,13 +532,13 @@ LOCAL void search_resp_action (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_ * Ignore duplicate search replies */ if ( chan->piiu->compareIfTCP (*chan, *pnet_addr) ) { - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } allocpiiu = constructTCPIIU (piiu->pcas, &ina, minorVersion); if ( ! allocpiiu ) { - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -607,7 +593,8 @@ LOCAL void search_resp_action (udpiiu *piiu, caHdr *pMsg, const struct sockaddr_ chan->claimMsg ( allocpiiu ); cacRingBufferWriteFlush ( &allocpiiu->send ); - UNLOCK ( piiu->pcas ); + + piiu->pcas->unlock (); } @@ -619,7 +606,7 @@ LOCAL void beacon_action ( udpiiu * piiu, { struct sockaddr_in ina; - LOCK (piiu->pcas); + piiu->pcas->lock (); /* * this allows a fan-out server to potentially @@ -654,7 +641,7 @@ LOCAL void beacon_action ( udpiiu * piiu, } piiu->pcas->beaconNotify (ina); - UNLOCK (piiu->pcas); + piiu->pcas->unlock (); return; } @@ -738,9 +725,6 @@ LOCAL const pProtoStubUDP udpJumpTableCAC[] = /* * post_udp_msg () - * - * LOCK should be applied when calling this routine - * */ int udpiiu::post_msg (const struct sockaddr_in *pnet_addr, char *pInBuf, unsigned long blockSize) @@ -832,7 +816,8 @@ bool udpiiu::compareIfTCP (nciu &, const sockaddr_in &) const */ void udpiiu::addToChanList (nciu *chan) { - LOCK (this->pcas); + this->pcas->lock (); + /* * add to the beginning of the list so that search requests for * this channel will be sent first (since the retry count is zero) @@ -847,14 +832,14 @@ void udpiiu::addToChanList (nciu *chan) chan->retry = 0u; this->chidList.push (*chan); chan->piiu = this; - UNLOCK (this->pcas); + this->pcas->unlock (); } void udpiiu::removeFromChanList (nciu *chan) { tsDLIterBD iter (chan); - LOCK (this->pcas); + this->pcas->lock (); if ( chan->piiu->pcas->endOfBCastList == iter ) { if ( iter.itemBefore () != tsDLIterBD::eol () ) { chan->piiu->pcas->endOfBCastList = iter.itemBefore (); @@ -866,7 +851,7 @@ void udpiiu::removeFromChanList (nciu *chan) } chan->piiu->chidList.remove (*chan); chan->piiu = NULL; - UNLOCK (this->pcas); + this->pcas->unlock (); } void udpiiu::disconnect ( nciu * /* chan */ ) @@ -982,3 +967,8 @@ int udpiiu::pushStreamMsg ( const caHdr * /* pmsg */, ca_printf ("in pushStreamMsg () for a udp iiu?\n"); return ECA_DISCONNCHID; } + +SOCKET udpiiu::getSock () const +{ + return this->sock; +} \ No newline at end of file