improved client scedualing fro fdManager based apps

This commit is contained in:
Jeff Hill
2003-02-20 17:27:04 +00:00
parent 077fe6e1f3
commit 3f680362bb
11 changed files with 353 additions and 217 deletions

View File

@@ -29,21 +29,92 @@
extern epicsThreadPrivateId caClientContextId;
ca_client_context::ca_client_context ( bool enablePreemptiveCallback ) :
clientCtx ( * new cac ( *this, enablePreemptiveCallback ) ),
pCallbackGuard ( 0 ), ca_exception_func ( 0 ), ca_exception_arg ( 0 ),
ca_exception_func ( 0 ), ca_exception_arg ( 0 ),
pVPrintfFunc ( errlogVprintf ), fdRegFunc ( 0 ), fdRegArg ( 0 ),
pndRecvCnt ( 0u ), ioSeqNo ( 0u )
pndRecvCnt ( 0u ), ioSeqNo ( 0u ), localPort ( 0 ),
fdRegFuncNeedsToBeCalled ( false ), noWakeupSincePend ( true )
{
if ( ! enablePreemptiveCallback ) {
this->pCallbackGuard = new epicsGuard < callbackMutex >
( this->clientCtx.callbackGuardFactory () );
static const unsigned short PORT_ANY = 0u;
this->sock = socket ( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
if ( this->sock == INVALID_SOCKET ) {
this->printf (
"ca_client_context: unable to create "
"datagram socket because = \"%s\"\n",
SOCKERRSTR (SOCKERRNO));
throwWithLocation ( noSocket () );
}
{
osiSockIoctl_t yes = true;
int status = socket_ioctl ( this->sock,
FIONBIO, & yes); // X aCC 392
if ( status < 0 ) {
socket_close ( this->sock );
this->printf (
"%s: non blocking IO set fail because \"%s\"\n",
__FILE__, SOCKERRSTR ( SOCKERRNO ) );
throwWithLocation ( noSocket () );
}
}
// force a bind to an unconstrained address so we can obtain
// the local port number below
{
osiSockAddr addr;
memset ( (char *)&addr, 0 , sizeof ( addr ) );
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_ANY );
addr.ia.sin_port = epicsHTON16 ( PORT_ANY ); // X aCC 818
int status = bind (this->sock, &addr.sa, sizeof (addr) );
if ( status < 0 ) {
socket_close (this->sock);
this->printf (
"CAC: unable to bind to an unconstrained "
"address because = \"%s\"\n",
SOCKERRSTR (SOCKERRNO));
throwWithLocation ( noSocket () );
}
}
{
osiSockAddr tmpAddr;
osiSocklen_t saddr_length = sizeof ( tmpAddr );
int status = getsockname ( this->sock, & tmpAddr.sa, & saddr_length );
if ( status < 0 ) {
socket_close ( this->sock );
this->printf ( "CAC: getsockname () error was \"%s\"\n", SOCKERRSTR (SOCKERRNO) );
throwWithLocation ( noSocket () );
}
if ( tmpAddr.sa.sa_family != AF_INET) {
socket_close ( this->sock );
this->printf ( "CAC: UDP socket was not inet addr family\n" );
throwWithLocation ( noSocket () );
}
this->localPort = epicsNTOH16 ( tmpAddr.ia.sin_port );
}
epics_auto_ptr < cac > pCAC (
new cac ( *this, enablePreemptiveCallback ) );
epics_auto_ptr < epicsGuard < callbackMutex > > pCBGuard;
if ( ! enablePreemptiveCallback ) {
pCBGuard.reset ( new epicsGuard < callbackMutex >
( pCAC->callbackGuardFactory () ) );
}
// multiple steps ensure exception safety
this->pCallbackGuard = pCBGuard;
this->pClientCtx = pCAC;
}
ca_client_context::~ca_client_context ()
{
delete this->pCallbackGuard;
delete & this->clientCtx;
if ( this->fdRegFunc ) {
( *this->fdRegFunc )
( this->fdRegArg, this->sock, false );
}
socket_close ( this->sock );
}
void ca_client_context::destroyChannel ( oldChannelNotify & chan )
@@ -101,6 +172,7 @@ void ca_client_context::registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc,
epicsGuard < ca_client_context_mutex > autoMutex ( this->mutex );
this->fdRegFunc = pFunc;
this->fdRegArg = pArg;
this->fdRegFuncNeedsToBeCalled = true;
// should block here until releated callback in progress completes
}
@@ -160,7 +232,7 @@ void ca_client_context::exception ( int stat, const char *pCtx,
( *pFunc ) ( args );
}
else {
this->clientCtx.signal ( stat, pFile, lineNo, pCtx );
this->pClientCtx->signal ( stat, pFile, lineNo, pCtx );
}
}
@@ -192,7 +264,7 @@ void ca_client_context::exception ( int status, const char *pContext,
( *pFunc ) ( args );
}
else {
this->clientCtx.signal ( status, pFileName, lineNo,
this->pClientCtx->signal ( status, pFileName, lineNo,
"op=%u, channel=%s, type=%s, count=%lu, ctx=\"%s\"",
op, ca_name ( &chan ),
dbr_type_to_text ( static_cast <int> ( type ) ),
@@ -200,34 +272,6 @@ void ca_client_context::exception ( int status, const char *pContext,
}
}
void ca_client_context::fdWasCreated ( int fd )
{
CAFDHANDLER *pFunc;
void *pArg;
{
epicsGuard < ca_client_context_mutex > autoMutex ( this->mutex );
pFunc = this->fdRegFunc;
pArg = this->fdRegArg;
}
if ( pFunc ) {
( *pFunc ) ( pArg, fd, true );
}
}
void ca_client_context::fdWasDestroyed ( int fd )
{
CAFDHANDLER *pFunc;
void *pArg;
{
epicsGuard < ca_client_context_mutex > autoMutex ( this->mutex );
pFunc = this->fdRegFunc;
pArg = this->fdRegArg;
}
if ( pFunc ) {
( *pFunc ) ( pArg, fd, false );
}
}
void ca_client_context::show ( unsigned level ) const
{
::printf ( "ca_client_context at %p pndRecvCnt=%u ioSeqNo=%u\n",
@@ -235,9 +279,9 @@ void ca_client_context::show ( unsigned level ) const
this->pndRecvCnt, this->ioSeqNo );
if ( level > 0u ) {
this->mutex.show ( level - 1u );
this->clientCtx.show ( level - 1u );
this->pClientCtx->show ( level - 1u );
::printf ( "\tpreemptive callback is %s\n",
this->pCallbackGuard ? "disabled" : "enabled" );
this->pCallbackGuard.get() ? "disabled" : "enabled" );
::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n",
this->pndRecvCnt );
::printf ( "\tthe current io sequence number is %u\n",
@@ -351,10 +395,36 @@ int ca_client_context::pendEvent ( const double & timeout )
this->flushRequest ();
{
bool cleanupNeeded = false;
{
epicsGuard < ca_client_context_mutex > guard ( this->mutex );
if ( this->fdRegFunc ) {
cleanupNeeded = true;
}
}
if ( cleanupNeeded ) {
// send short udp message to wake up a file descriptor manager
// when a message arrives
osiSockAddr tmpAddr;
osiSocklen_t addrSize = sizeof ( tmpAddr.sa );
char buf = 0;
int status = 0;
do {
status = recvfrom ( this->sock, & buf, sizeof ( buf ),
0, & tmpAddr.sa, & addrSize );
} while ( status > 0 );
{
epicsGuard < ca_client_context_mutex > guard ( this->mutex );
this->noWakeupSincePend = true;
}
}
}
// process at least once if preemptive callback is disabled
if ( this->pCallbackGuard ) {
if ( this->pCallbackGuard.get() ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
this->clientCtx.waitUntilNoRecvThreadsPending ();
this->pClientCtx->waitUntilNoRecvThreadsPending ();
}
double elapsed = epicsTime::getCurrent() - current;
@@ -368,7 +438,7 @@ int ca_client_context::pendEvent ( const double & timeout )
}
if ( delay >= CAC_SIGNIFICANT_DELAY ) {
if ( this->pCallbackGuard ) {
if ( this->pCallbackGuard.get() ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
epicsThreadSleep ( delay );
}
@@ -380,9 +450,10 @@ int ca_client_context::pendEvent ( const double & timeout )
return ECA_TIMEOUT;
}
void ca_client_context::blockForEventAndEnableCallbacks ( epicsEvent & event, double timeout )
void ca_client_context::blockForEventAndEnableCallbacks (
epicsEvent & event, const double & timeout )
{
if ( this->pCallbackGuard ) {
if ( this->pCallbackGuard.get() ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
event.wait ( timeout );
}
@@ -390,3 +461,26 @@ void ca_client_context::blockForEventAndEnableCallbacks ( epicsEvent & event, do
event.wait ( timeout );
}
}
void ca_client_context::messageArrivalNotify ()
{
bool sendNeeded = false;
{
epicsGuard < ca_client_context_mutex > guard ( this->mutex );
if ( this->fdRegFunc && this->noWakeupSincePend ) {
this->noWakeupSincePend = false;
sendNeeded = true;
}
}
if ( sendNeeded ) {
// send short udp message to wake up a file descriptor manager
// when a message arrives
osiSockAddr tmpAddr;
tmpAddr.ia.sin_family = AF_INET;
tmpAddr.ia.sin_addr.s_addr = epicsHTON32 ( INADDR_LOOPBACK );
tmpAddr.ia.sin_port = epicsHTON16 ( this->localPort );
char buf = 0;
sendto ( this->sock, & buf, sizeof ( buf ),
0, & tmpAddr.sa, sizeof ( tmpAddr.sa ) );
}
}