moved io in progress counter to old interface

This commit is contained in:
Jeff Hill
2002-05-08 23:33:37 +00:00
parent 8289f8550b
commit 48e23429cb
3 changed files with 268 additions and 84 deletions

View File

@@ -84,10 +84,13 @@ protected:
~oldChannelNotify (); // must allocate from pool
private:
oldCAC & cacCtx;
cacChannel & io;
caCh * pConnCallBack;
void * pPrivate;
caArh * pAccessRightsFunc;
cacChannel & io;
unsigned ioSeqNo;
bool prevConnected;
bool connCallbackInProress;
void connectNotify ();
void disconnectNotify ();
void accessRightsNotify ( const caAccessRights & );
@@ -96,7 +99,6 @@ private:
unsigned type, arrayElementCount count, void *pValue );
void writeException ( int status, const char *pContext,
unsigned type, arrayElementCount count );
bool includeFirstConnectInCountOfOutstandingIO () const;
static epicsSingleton < tsFreeList < struct oldChannelNotify, 1024 > > pFreeList;
oldChannelNotify ( const oldChannelNotify & );
oldChannelNotify & operator = ( const oldChannelNotify & );
@@ -118,7 +120,7 @@ private:
oldCAC &cacCtx;
oldChannelNotify &chan;
void *pValue;
unsigned readSeq;
unsigned ioSeqNo;
unsigned type;
void completion (
unsigned type, arrayElementCount count, const void *pData);
@@ -212,21 +214,21 @@ struct oldCAC : public cacNotify
public:
oldCAC ( bool enablePreemptiveCallback = false );
virtual ~oldCAC ();
void changeExceptionEvent ( caExceptionHandler *pfunc, void *arg );
void registerForFileDescriptorCallBack ( CAFDHANDLER *pFunc, void *pArg );
void replaceErrLogHandler ( caPrintfFunc *ca_printf_func );
void registerService ( cacService &service );
void changeExceptionEvent ( caExceptionHandler * pfunc, void * arg );
void registerForFileDescriptorCallBack ( CAFDHANDLER * pFunc, void * pArg );
void replaceErrLogHandler ( caPrintfFunc * ca_printf_func );
void registerService ( cacService & service );
cacChannel & createChannel ( const char * name_str,
oldChannelNotify & chan, cacChannel::priLev pri );
void flushRequest ();
int pendIO ( const double &timeout );
int pendEvent ( const double &timeout );
int pendIO ( const double & timeout );
int pendEvent ( const double & timeout );
bool ioComplete () const;
void show ( unsigned level ) const;
unsigned connectionCount () const;
unsigned sequenceNumberOfOutstandingIO () const;
void incrementOutstandingIO ();
void decrementOutstandingIO ( unsigned sequenceNo );
void incrementOutstandingIO ( unsigned ioSeqNo );
void decrementOutstandingIO ( unsigned ioSeqNo );
void exception ( int status, const char *pContext,
const char *pFileName, unsigned lineNo );
void exception ( int status, const char *pContext,
@@ -235,7 +237,7 @@ public:
CASG * lookupCASG ( unsigned id );
void installCASG ( CASG & );
void uninstallCASG ( CASG & );
int blockForEventAndEnableCallbacks ( epicsEvent &event, double timeout );
void blockForEventAndEnableCallbacks ( epicsEvent & event, double timeout );
void selfTest ();
// perhaps these should be eliminated in deference to the exception mechanism
int printf ( const char *pformat, ... ) const;
@@ -243,14 +245,19 @@ public:
void vSignal ( int ca_status, const char *pfilenm,
int lineno, const char *pFormat, va_list args );
bool preemptiveCallbakIsEnabled () const;
epicsGuard < callbackMutex > callbackGuardFactory ();
private:
mutable oldCACMutex mutex;
epicsEvent ioDone;
cac & clientCtx;
caExceptionHandler *ca_exception_func;
void *ca_exception_arg;
caPrintfFunc *pVPrintfFunc;
CAFDHANDLER *fdRegFunc;
void *fdRegArg;
epicsGuard < callbackMutex > * pCallbackGuard;
caExceptionHandler * ca_exception_func;
void * ca_exception_arg;
caPrintfFunc * pVPrintfFunc;
CAFDHANDLER * fdRegFunc;
void * fdRegArg;
unsigned pndRecvCnt;
unsigned ioSeqNo;
// this should probably be phased out (its not OS independent)
void fdWasCreated ( int fd );
void fdWasDestroyed ( int fd );
@@ -353,7 +360,7 @@ inline bool oldChannelNotify::connected () const
inline bool oldChannelNotify::previouslyConnected () const
{
return this->io.previouslyConnected ();
return this->prevConnected;
}
inline void oldChannelNotify::hostName ( char *pBuf, unsigned bufLength ) const
@@ -457,46 +464,16 @@ inline cacChannel & oldCAC::createChannel ( const char * name_str,
return this->clientCtx.createChannel ( name_str, chan, pri );
}
inline int oldCAC::pendIO ( const double &timeout )
{
return this->clientCtx.pendIO ( timeout );
}
inline int oldCAC::pendEvent ( const double &timeout )
{
return this->clientCtx.pendEvent ( timeout );
}
inline void oldCAC::flushRequest ()
{
this->clientCtx.flushRequest ();
}
inline bool oldCAC::ioComplete () const
{
return this->clientCtx.ioComplete ();
}
inline unsigned oldCAC::connectionCount () const
{
return this->clientCtx.connectionCount ();
}
inline unsigned oldCAC::sequenceNumberOfOutstandingIO () const
{
return this->clientCtx.sequenceNumberOfOutstandingIO ();
}
inline void oldCAC::incrementOutstandingIO ()
{
this->clientCtx.incrementOutstandingIO ();
}
inline void oldCAC::decrementOutstandingIO ( unsigned sequenceNo )
{
this->clientCtx.decrementOutstandingIO ( sequenceNo );
}
inline CASG * oldCAC::lookupCASG ( unsigned id )
{
return this->clientCtx.lookupCASG ( id );
@@ -512,12 +489,6 @@ inline void oldCAC::uninstallCASG ( CASG &sg )
this->clientCtx.uninstallCASG ( sg );
}
inline int oldCAC::blockForEventAndEnableCallbacks (
epicsEvent &event, double timeout )
{
return this->clientCtx.blockForEventAndEnableCallbacks ( event, timeout );
}
inline void oldCAC::vSignal ( int ca_status, const char *pfilenm,
int lineno, const char *pFormat, va_list args )
{
@@ -535,6 +506,21 @@ inline bool oldCAC::preemptiveCallbakIsEnabled () const
return this->clientCtx.preemptiveCallbakIsEnabled ();
}
inline bool oldCAC::ioComplete () const
{
return ( this->pndRecvCnt == 0u );
}
inline unsigned oldCAC::sequenceNumberOfOutstandingIO () const
{
return this->ioSeqNo;
}
inline epicsGuard < callbackMutex > oldCAC::callbackGuardFactory ()
{
return this->clientCtx.callbackGuardFactory ();
}
inline void oldCACMutex::lock ()
{
this->mutex.lock ();

View File

@@ -18,6 +18,8 @@
# pragma warning(disable:4355)
#endif
#include <stdexcept>
#include <stdio.h>
#define epicsExportSharedSymbols
@@ -29,18 +31,24 @@ extern epicsThreadPrivateId caClientContextId;
oldCAC::oldCAC ( bool enablePreemptiveCallback ) :
clientCtx ( * new cac ( *this, enablePreemptiveCallback ) ),
ca_exception_func ( 0 ), ca_exception_arg ( 0 ),
pVPrintfFunc ( errlogVprintf ), fdRegFunc ( 0 ), fdRegArg ( 0 )
pVPrintfFunc ( errlogVprintf ), fdRegFunc ( 0 ), fdRegArg ( 0 ),
pCallbackGuard ( 0 ), pndRecvCnt ( 0u ), ioSeqNo ( 0u )
{
if ( enablePreemptiveCallback ) {
this->pCallbackGuard = new epicsGuard < callbackMutex >
( this->clientCtx.callbackGuardFactory () );
}
}
oldCAC::~oldCAC ()
{
delete this->pCallbackGuard;
delete & this->clientCtx;
}
void oldCAC::changeExceptionEvent ( caExceptionHandler *pfunc, void *arg )
{
epicsGuard < oldCACMutex > autoMutex ( this->mutex );
epicsGuard < oldCACMutex > guard ( this->mutex );
this->ca_exception_func = pfunc;
this->ca_exception_arg = arg;
// should block here until releated callback in progress completes
@@ -197,6 +205,15 @@ void oldCAC::show ( unsigned level ) const
if ( level > 0u ) {
this->mutex.show ( level - 1u );
this->clientCtx.show ( level - 1u );
::printf ( "\tpreemptive calback is %s\n",
this->pCallbackGuard ? "disabled" : "enabled" );
::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n",
this->pndRecvCnt );
::printf ( "\tthe current io sequence number is %u\n",
this->ioSeqNo );
::printf ( "IO done event:\n");
this->ioDone.show ( level - 1u );
}
}
@@ -206,5 +223,156 @@ void oldCAC::attachToClientCtx ()
epicsThreadPrivateSet ( caClientContextId, this );
}
void oldCAC::incrementOutstandingIO ( unsigned ioSeqNo )
{
if ( this->ioSeqNo == ioSeqNo ) {
epicsGuard < oldCACMutex > guard ( this->mutex );
if ( this->ioSeqNo == ioSeqNo ) {
if ( this->pndRecvCnt < UINT_MAX ) {
this->pndRecvCnt++;
}
else {
throw std::logic_error (
"oldCAC::incrementOutstandingIO() IO counter overflow" );
}
}
}
}
void oldCAC::decrementOutstandingIO ( unsigned ioSeqNo )
{
if ( this->ioSeqNo != ioSeqNo ) {
return;
}
bool signalNeeded;
{
epicsGuard < oldCACMutex > guard ( this->mutex );
if ( this->ioSeqNo == ioSeqNo ) {
if ( this->pndRecvCnt > 0u ) {
this->pndRecvCnt--;
if ( this->pndRecvCnt == 0u ) {
signalNeeded = true;
}
else {
signalNeeded = false;
}
}
else {
signalNeeded = true;
}
}
else {
signalNeeded = false;
}
}
if ( signalNeeded ) {
this->ioDone.signal ();
}
}
// !!!! This routine is only visible in the old interface - or in a new ST interface.
// !!!! In the old interface we restrict thread attach so that calls from threads
// !!!! other than the initializing thread are not allowed if preemptive callback
// !!!! is disabled. This prevents the preemptive callback lock from being released
// !!!! by other threads than the one that locked it.
//
int oldCAC::pendIO ( const double & timeout )
{
// prevent recursion nightmares by disabling calls to
// pendIO () from within a CA callback.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
return ECA_EVDISALLOW;
}
int status = ECA_NORMAL;
epicsTime beg_time = epicsTime::getCurrent ();
double remaining = timeout;
this->flushRequest ();
while ( this->pndRecvCnt > 0 ) {
if ( remaining < CAC_SIGNIFICANT_DELAY ) {
status = ECA_TIMEOUT;
break;
}
this->blockForEventAndEnableCallbacks ( this->ioDone, remaining );
double delay = epicsTime::getCurrent () - beg_time;
if ( delay < timeout ) {
remaining = timeout - delay;
}
else {
remaining = 0.0;
}
}
{
epicsGuard < oldCACMutex > guard ( this->mutex );
this->ioSeqNo++;
this->pndRecvCnt = 0u;
}
return status;
}
// !!!! This routine is only visible in the old interface - or in a new ST interface.
// !!!! In the old interface we restrict thread attach so that calls from threads
// !!!! other than the initializing thread are not allowed if preemptive callback
// !!!! is disabled. This prevents the preemptive callback lock from being released
// !!!! by other threads than the one that locked it.
//
// this routine should probably be moved to the oldCAC?
int oldCAC::pendEvent ( const double & timeout )
{
// prevent recursion nightmares by disabling calls to
// pendIO () from within a CA callback.
if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
return ECA_EVDISALLOW;
}
epicsTime current = epicsTime::getCurrent ();
this->flushRequest ();
// process at least once if preemptive callback is disabled
if ( this->pCallbackGuard ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
this->clientCtx.waitUntilNoRecvThreadsPending ();
}
double elapsed = epicsTime::getCurrent() - current;
double delay;
if ( timeout > elapsed ) {
delay = timeout - elapsed;
}
else {
delay = 0.0;
}
if ( delay >= CAC_SIGNIFICANT_DELAY ) {
if ( this->pCallbackGuard ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
epicsThreadSleep ( delay );
}
else {
epicsThreadSleep ( delay );
}
}
return ECA_TIMEOUT;
}
void oldCAC::blockForEventAndEnableCallbacks ( epicsEvent & event, double timeout )
{
if ( this->pCallbackGuard ) {
epicsGuardRelease < callbackMutex > unguard ( *this->pCallbackGuard );
event.wait ( timeout );
}
else {
event.wait ( timeout );
}
}

View File

@@ -27,10 +27,6 @@
epicsSingleton < tsFreeList < struct oldChannelNotify, 1024 > > oldChannelNotify::pFreeList;
extern "C" void cacNoopConnHandler ( struct connection_handler_args )
{
}
extern "C" void cacNoopAccesRightsHandler ( struct access_rights_handler_args )
{
}
@@ -38,15 +34,28 @@ extern "C" void cacNoopAccesRightsHandler ( struct access_rights_handler_args )
oldChannelNotify::oldChannelNotify ( oldCAC & cacIn, const char *pName,
caCh * pConnCallBackIn, void * pPrivateIn, capri priority ) :
cacCtx ( cacIn ),
pConnCallBack ( pConnCallBackIn ? pConnCallBackIn : cacNoopConnHandler ),
pConnCallBack ( pConnCallBackIn ),
pPrivate ( pPrivateIn ), pAccessRightsFunc ( cacNoopAccesRightsHandler ),
io ( cacIn.createChannel ( pName, *this, priority ) )
io ( cacIn.createChannel ( pName, *this, priority ) ),
ioSeqNo ( cacIn.sequenceNumberOfOutstandingIO () ),
prevConnected ( false )
{
// no need to worry about a connect preempting here because
// the connect sequence will not start untill initiateConnect()
// is called
if ( pConnCallBackIn == 0 ) {
this->cacCtx.incrementOutstandingIO ( cacIn.sequenceNumberOfOutstandingIO () );
}
}
oldChannelNotify::~oldChannelNotify ()
{
delete & this->io;
// no need to worry about a connect preempting here because
// the nciu as been deleted
this->cacCtx.decrementOutstandingIO ( this->ioSeqNo );
}
void oldChannelNotify::setPrivatePointer ( void *pPrivateIn )
@@ -59,15 +68,6 @@ void * oldChannelNotify::privatePointer () const
return this->pPrivate;
}
int oldChannelNotify::changeConnCallBack ( caCh *pfunc )
{
this->pConnCallBack = pfunc ? pfunc : cacNoopConnHandler;
// test for NOOP connection handler does _not_ occur here because the
// lock is not applied
this->io.notifyStateChangeFirstConnectInCountOfOutstandingIO ();
return ECA_NORMAL;
}
int oldChannelNotify::replaceAccessRightsEvent ( caArh *pfunc )
{
// The order of the following is significant to guarantee that the
@@ -87,20 +87,55 @@ int oldChannelNotify::replaceAccessRightsEvent ( caArh *pfunc )
return ECA_NORMAL;
}
int oldChannelNotify::changeConnCallBack ( caCh * pfunc )
{
epicsGuard < callbackMutex > callbackGuard =
this->cacCtx.callbackGuardFactory ();
if ( ! this->prevConnected ) {
if ( pfunc ) {
if ( ! this->pConnCallBack ) {
this->cacCtx.decrementOutstandingIO ( this->ioSeqNo );
}
}
else {
if ( this->pConnCallBack ) {
this->cacCtx.incrementOutstandingIO ( this->ioSeqNo );
}
}
}
this->pConnCallBack = pfunc;
return ECA_NORMAL;
}
void oldChannelNotify::connectNotify ()
{
struct connection_handler_args args;
args.chid = this;
args.op = CA_OP_CONN_UP;
( *this->pConnCallBack ) ( args );
this->prevConnected = true;
if ( this->pConnCallBack ) {
struct connection_handler_args args;
args.chid = this;
args.op = CA_OP_CONN_UP;
( *this->pConnCallBack ) ( args );
}
else {
this->cacCtx.decrementOutstandingIO ( this->ioSeqNo );
}
}
void oldChannelNotify::disconnectNotify ()
{
struct connection_handler_args args;
args.chid = this;
args.op = CA_OP_CONN_DOWN;
( *this->pConnCallBack ) ( args );
if ( this->pConnCallBack ) {
struct connection_handler_args args;
args.chid = this;
args.op = CA_OP_CONN_DOWN;
( *this->pConnCallBack ) ( args );
}
else {
this->cacCtx.incrementOutstandingIO ( this->ioSeqNo );
}
}
void oldChannelNotify::accessRightsNotify ( const caAccessRights &ar )
@@ -131,11 +166,6 @@ void oldChannelNotify::writeException ( int status, const char *pContext,
__FILE__, __LINE__, *this, type, count, CA_OP_PUT );
}
bool oldChannelNotify::includeFirstConnectInCountOfOutstandingIO () const
{
return ( this->pConnCallBack == cacNoopConnHandler );
}
void * oldChannelNotify::operator new ( size_t size )
{
return oldChannelNotify::pFreeList->allocate ( size );