cleaned up implementation of locks

This commit is contained in:
Jeff Hill
2001-03-23 23:00:03 +00:00
parent 02c05f38b1
commit b1d14576c5
7 changed files with 119 additions and 146 deletions

View File

@@ -34,9 +34,9 @@ class dbPutNotifyBlocker;
class dbPutNotifyIO : public cacNotifyIO {
public:
dbPutNotifyIO ( cacNotify &, dbPutNotifyBlocker & );
void cancel ();
void destroy ();
int initiate ( struct dbAddr &addr, unsigned type,
unsigned long count, const void *pValue);
unsigned long count, const void *pValue );
void completion ();
void show ( unsigned level ) const;
//void destroy ();
@@ -48,6 +48,7 @@ protected:
private:
putNotify pn;
dbPutNotifyBlocker &blocker;
void cancel ();
static tsFreeList < dbPutNotifyIO > freeList;
static epicsMutex freeListMutex;
};
@@ -58,7 +59,7 @@ extern "C" void dbSubscriptionEventCallback ( void *pPrivate, struct dbAddr *pad
class dbSubscriptionIO : public cacNotifyIO, public tsDLNode <dbSubscriptionIO> {
public:
dbSubscriptionIO ( dbChannelIO &chanIO, cacNotify &, unsigned type, unsigned long count );
void cancel ();
void destroy ();
int begin ( unsigned mask );
//void destroy ();
void show ( unsigned level ) const;
@@ -72,6 +73,7 @@ private:
dbEventSubscription es;
unsigned type;
unsigned long count;
void cancel ();
static tsFreeList < dbSubscriptionIO > freeList;
static epicsMutex freeListMutex;
friend void dbSubscriptionEventCallback ( void *pPrivate, struct dbAddr *paddr,
@@ -86,7 +88,7 @@ public:
void destroy ();
int initiatePutNotify ( cacNotify &notify, struct dbAddr &addr,
unsigned type, unsigned long count, const void *pValue );
void putNotifyDestroyNotify ();
uninstallPutNotifyIO ( dbPutNotifyIO &io );
dbChannelIO & channel () const;
void show ( unsigned level ) const;
void * operator new ( size_t size );
@@ -107,9 +109,10 @@ public:
dbChannelIO ( cacChannelNotify &notify,
const dbAddr &addr, dbServiceIO &serviceIO );
void destroy ();
void subscriptionUpdate ( unsigned type, unsigned long count,
const struct db_field_log *pfl, dbSubscriptionIO &notify );
void callReadNotify ( unsigned type, unsigned long count,
const struct db_field_log *pfl, cacNotify &notify );
dbEventSubscription subscribe ( dbSubscriptionIO &subscr, unsigned mask );
void uninstallSubscription ( dbSubscriptionIO & );
void show ( unsigned level ) const;
void * operator new ( size_t size);
void operator delete ( void *pCadaver, size_t size );
@@ -117,9 +120,7 @@ protected:
~dbChannelIO (); // allocate only from pool
private:
dbServiceIO &serviceIO;
char *pGetCallbackCache;
dbPutNotifyBlocker *pBlocker;
unsigned long getCallbackCacheSize;
tsDLList < dbSubscriptionIO > eventq;
dbAddr addr;
const char *pName () const;
@@ -134,15 +135,12 @@ private:
unsigned long nativeElementCount () const;
static tsFreeList < dbChannelIO > freeList;
static epicsMutex freeListMutex;
friend dbSubscriptionIO::dbSubscriptionIO ( dbChannelIO &chanIO,
cacNotify &, unsigned type, unsigned long count );
friend dbSubscriptionIO::~dbSubscriptionIO ();
friend class dbAutoScanLockCA;
friend class dbAutoScanLock;
};
class dbAutoScanLock {
public:
dbAutoScanLock ( dbCommon & );
dbAutoScanLock ( const dbChannelIO & );
~dbAutoScanLock ();
private:
dbAutoScanLock ( const dbAutoScanLock & );
@@ -150,21 +148,13 @@ private:
dbCommon & rCommon;
};
class dbAutoScanLockCA : public dbAutoScanLock {
public:
dbAutoScanLockCA ( dbChannelIO & );
private:
dbAutoScanLockCA ( const dbAutoScanLockCA & );
dbAutoScanLockCA & operator = ( const dbAutoScanLockCA & );
};
class dbServiceIO : public cacServiceIO {
public:
dbServiceIO ();
virtual ~dbServiceIO ();
cacChannelIO *createChannelIO ( const char *pName, cacChannelNotify & );
void subscriptionUpdate ( struct dbAddr &addr, unsigned type, unsigned long count,
const struct db_field_log *pfl, dbSubscriptionIO &notify );
void callReadNotify ( struct dbAddr &addr, unsigned type, unsigned long count,
const struct db_field_log *pfl, cacChannelIO &, cacNotify &notify );
dbEventSubscription subscribe ( struct dbAddr &addr, dbSubscriptionIO &subscr, unsigned mask );
void show ( unsigned level ) const;
private:
@@ -174,8 +164,8 @@ private:
mutable epicsMutex mutex;
};
inline dbAutoScanLock :: dbAutoScanLock ( dbCommon & dbCommonIn ) :
rCommon ( dbCommonIn )
inline dbAutoScanLock :: dbAutoScanLock ( const dbChannelIO &chan ) :
rCommon ( *chan.addr.precord )
{
dbScanLock ( &this->rCommon );
}
@@ -184,8 +174,3 @@ inline dbAutoScanLock :: ~dbAutoScanLock ()
{
dbScanUnlock ( &this->rCommon );
}
inline dbAutoScanLockCA::dbAutoScanLockCA ( dbChannelIO &chan ) :
dbAutoScanLock ( *chan.addr.precord )
{
}

View File

@@ -34,8 +34,7 @@ epicsMutex dbChannelIO::freeListMutex;
dbChannelIO::dbChannelIO ( cacChannelNotify &notify,
const dbAddr &addrIn, dbServiceIO &serviceIO ) :
cacChannelIO ( notify ), serviceIO ( serviceIO ),
pGetCallbackCache ( 0 ), pBlocker ( 0 ),
getCallbackCacheSize ( 0ul ), addr ( addrIn )
pBlocker ( 0 ), addr ( addrIn )
{
}
@@ -46,26 +45,13 @@ void dbChannelIO::initiateConnect ()
dbChannelIO::~dbChannelIO ()
{
dbAutoScanLock ( *this->addr.precord );
/*
* remove any subscriptions attached to this channel
*/
tsDLIterBD < dbSubscriptionIO > iter = this->eventq.firstIter ();
while ( iter.valid () ) {
tsDLIterBD <dbSubscriptionIO> next = iter;
next++;
iter->cancel ();
iter = next;
while ( dbSubscriptionIO *pIO = this->eventq.get () ) {
pIO->destroy ();
}
if ( this->pBlocker ) {
this->pBlocker->destroy ();
}
if ( this->pGetCallbackCache ) {
delete [] this->pGetCallbackCache;
}
}
int dbChannelIO::read ( unsigned type, unsigned long count, void *pValue )
@@ -88,37 +74,7 @@ int dbChannelIO::read ( unsigned type, unsigned long count, void *pValue )
int dbChannelIO::read ( unsigned type, unsigned long count, cacNotify &notify )
{
unsigned long size = dbr_size_n ( type, count );
if ( type > INT_MAX ) {
return ECA_BADCOUNT;
}
if ( count > INT_MAX ) {
return ECA_BADCOUNT;
}
{
dbAutoScanLock ( *this->addr.precord );
if ( this->getCallbackCacheSize < size) {
if ( this->pGetCallbackCache ) {
delete [] this->pGetCallbackCache;
}
this->pGetCallbackCache = new char [size];
if ( ! this->pGetCallbackCache ) {
this->getCallbackCacheSize = 0ul;
return ECA_ALLOCMEM;
}
this->getCallbackCacheSize = size;
}
int status = db_get_field ( &this->addr, static_cast <int> ( type ),
this->pGetCallbackCache, static_cast <int> ( count ), 0);
if ( status ) {
notify.exceptionNotify ( *this, ECA_GETFAIL,
"db_get_field () completed unsuccessfuly" );
}
else {
notify.completionNotify ( *this, type, count, this->pGetCallbackCache );
}
}
this->serviceIO.callReadNotify ( this->addr, type, count, 0, *this, notify );
notify.release ();
return ECA_NORMAL;
}
@@ -130,7 +86,7 @@ int dbChannelIO::write ( unsigned type, unsigned long count, const void *pValue
return ECA_BADCOUNT;
}
status = db_put_field ( &this->addr, type, pValue, static_cast <long> (count) );
if (status) {
if ( status ) {
return ECA_PUTFAIL;
}
else {
@@ -146,7 +102,7 @@ int dbChannelIO::write ( unsigned type, unsigned long count,
}
if ( ! this->pBlocker ) {
dbAutoScanLock ( *this->addr.precord );
dbAutoScanLock ( *this );
if ( ! this->pBlocker ) {
this->pBlocker = new dbPutNotifyBlocker ( *this );
if ( ! this->pBlocker ) {
@@ -155,35 +111,35 @@ int dbChannelIO::write ( unsigned type, unsigned long count,
}
}
// must release the lock here so that this can block
// for put notify completion without monopolizing the lock
int status = this->pBlocker->initiatePutNotify ( notify,
this->addr, type, count, pValue );
return status;
return this->pBlocker->initiatePutNotify ( notify,
this->addr, type, count, pValue );
}
int dbChannelIO::subscribe ( unsigned type, unsigned long count,
unsigned mask, cacNotify &notify, cacNotifyIO *&pReturnIO )
{
{
int status;
dbSubscriptionIO *pIO = new dbSubscriptionIO ( *this, notify, type, count );
if ( ! pIO ) {
return ECA_ALLOCMEM;
}
int status = pIO->begin ( mask );
if ( status == ECA_NORMAL ) {
pReturnIO = pIO;
if ( pIO ) {
status = pIO->begin ( mask );
if ( status == ECA_NORMAL ) {
dbAutoScanLock locker ( *this );
this->eventq.add ( *pIO );
pReturnIO = pIO;
}
else {
pIO->destroy ();
}
}
else {
pIO->cancel ();
status = ECA_ALLOCMEM;
}
return status;
}
void dbChannelIO::show ( unsigned level ) const
{
dbAutoScanLock ( *this->addr.precord );
dbAutoScanLock locker ( *this );
printf ("channel at %p attached to local database record %s\n",
static_cast <const void *> ( this ), this->addr.precord->name );
@@ -194,8 +150,6 @@ void dbChannelIO::show ( unsigned level ) const
}
if ( level > 1u ) {
this->serviceIO.show ( level - 2u );
printf ( "\tget callback cache at %p, with size %lu\n",
this->pGetCallbackCache, this->getCallbackCacheSize );
tsDLIterConstBD < dbSubscriptionIO > pItem = this->eventq.firstIter ();
while ( pItem.valid () ) {
pItem->show ( level - 2u );

View File

@@ -54,13 +54,19 @@ inline short dbChannelIO::nativeType () const
return dbDBRnewToDBRold[this->addr.field_type];
}
inline void dbChannelIO::subscriptionUpdate ( unsigned type, unsigned long count,
const struct db_field_log *pfl, dbSubscriptionIO &notify )
inline void dbChannelIO::callReadNotify ( unsigned type, unsigned long count,
const struct db_field_log *pfl, cacNotify &notify )
{
this->serviceIO.subscriptionUpdate ( this->addr, type, count, pfl, notify );
this->serviceIO.callReadNotify ( this->addr, type, count, pfl, *this, notify );
}
inline dbEventSubscription dbChannelIO::subscribe ( dbSubscriptionIO &subscr, unsigned mask )
{
return this->serviceIO.subscribe ( this->addr, subscr, mask );
}
inline void dbChannelIO::uninstallSubscription ( dbSubscriptionIO &subscr )
{
dbAutoScanLock locker ( *this );
this->eventq.remove ( subscr );
}

View File

@@ -46,16 +46,17 @@ dbPutNotifyBlocker::dbPutNotifyBlocker ( dbChannelIO &chanIn ) :
dbPutNotifyBlocker::~dbPutNotifyBlocker ()
{
dbAutoScanLockCA ( this->chan );
if ( this->pPN ) {
this->pPN->cancel ();
this->pPN->destroy ();
}
}
void dbPutNotifyBlocker::putNotifyDestroyNotify ()
dbPutNotifyBlocker::uninstallPutNotifyIO ( dbPutNotifyIO &io )
{
dbAutoScanLockCA ( this->chan );
this->pPN = 0;
dbAutoScanLock ( this->chan );
if ( &io == this->pPN ) {
this->pPN = 0;
}
}
dbChannelIO & dbPutNotifyBlocker::channel () const
@@ -72,36 +73,46 @@ int dbPutNotifyBlocker::initiatePutNotify ( cacNotify &notify,
return ECA_ALLOCMEM;
}
epicsTime begin;
bool beginTimeInit = false;
while ( true ) {
{
dbAutoScanLockCA ( this->chan );
if ( ! this->pPN ) {
dbAutoScanLock ( this->chan );
if ( this->pPN == 0 ) {
this->pPN = pIO;
int status = this->pPN->initiate ( addr, type, count, pValue );
if ( status != ECA_NORMAL ) {
pIO->cancel ();
this->pPN = 0;
}
return status;
break;
}
}
// wait for put notify in progress to complete
this->block.wait ( 1.0 );
epicsTime begin = epicsTime::getCurrent ();
if ( epicsTime::getCurrent () - begin > 30.0 ) {
pIO->cancel ();
return ECA_PUTCBINPROG;
if ( beginTimeInit ) {
if ( epicsTime::getCurrent () - begin > 30.0 ) {
pIO->destroy ();
return ECA_PUTCBINPROG;
}
}
else {
begin = epicsTime::getCurrent ();
beginTimeInit = true;
}
this->block.wait ( 1.0 );
}
int status = pIO->initiate ( addr, type, count, pValue );
if ( status != ECA_NORMAL ) {
pIO->destroy ();
dbAutoScanLock ( this->chan );
this->pPN = 0;
}
return status;
}
extern "C" void putNotifyCompletion ( putNotify *ppn )
{
dbPutNotifyBlocker *pBlocker = static_cast < dbPutNotifyBlocker * > ( ppn->usrPvt );
{
dbAutoScanLockCA ( pBlocker->chan );
pBlocker->pPN->completion ();
pBlocker->pPN->cancel ();
pBlocker->pPN->destroy ();
dbAutoScanLock ( pBlocker->chan );
pBlocker->pPN = 0;
}
pBlocker->block.signal ();

View File

@@ -33,8 +33,8 @@
#define S_db_Blocked (M_dbAccess|39)
#define S_db_Pending (M_dbAccess|37)
tsFreeList <dbPutNotifyIO> dbPutNotifyIO::freeList;
epicsMutex dbPutNotifyIO::freeListMutex;
tsFreeList < dbPutNotifyIO > dbPutNotifyIO :: freeList;
epicsMutex dbPutNotifyIO :: freeListMutex;
dbPutNotifyIO::dbPutNotifyIO ( cacNotify &notifyIn, dbPutNotifyBlocker &blockerIn ) :
cacNotifyIO ( notifyIn ), blocker ( blockerIn )
@@ -49,14 +49,18 @@ dbPutNotifyIO::~dbPutNotifyIO ()
if ( this->pn.paddr ) {
dbNotifyCancel ( &this->pn );
}
this->blocker.putNotifyDestroyNotify ();
}
void dbPutNotifyIO::cancel ()
void dbPutNotifyIO::destroy ()
{
delete this;
}
void dbPutNotifyIO::cancel ()
{
this->blocker.uninstallPutNotifyIO ( *this );
delete this;
}
cacChannelIO & dbPutNotifyIO::channelIO () const
{
@@ -78,8 +82,8 @@ int dbPutNotifyIO::initiate ( struct dbAddr &addr, unsigned type,
this->pn.nRequest = static_cast <unsigned> ( count );
this->pn.paddr = &addr;
status = this->pn.dbrType = dbPutNotifyMapType (
&this->pn, static_cast <short> ( type ) );
if (status) {
&this->pn, static_cast <short> ( type ) );
if ( status ) {
this->pn.paddr = 0;
return ECA_BADTYPE;
}
@@ -88,7 +92,7 @@ int dbPutNotifyIO::initiate ( struct dbAddr &addr, unsigned type,
if ( status && status != S_db_Pending ) {
this->pn.paddr = 0;
this->pn.status = status;
this->notify ().exceptionNotify ( this->blocker.channel (),
this->notify().exceptionNotify ( this->blocker.channel (),
ECA_PUTFAIL, "dbPutNotify() returned failure" );
}
return ECA_NORMAL;
@@ -97,21 +101,21 @@ int dbPutNotifyIO::initiate ( struct dbAddr &addr, unsigned type,
void dbPutNotifyIO::completion ()
{
if ( ! this->pn.paddr ) {
errlogPrintf ( "completion pn=%p\n", this );
errlogPrintf ( "put notify completion pn=%p?\n", this );
}
this->pn.paddr = 0;
if ( this->pn.status ) {
if ( this->pn.status == S_db_Blocked ) {
this->notify ().exceptionNotify ( this->blocker.channel (),
this->notify().exceptionNotify ( this->blocker.channel (),
ECA_PUTCBINPROG, "put notify blocked" );
}
else {
this->notify ().exceptionNotify ( this->blocker.channel (),
this->notify().exceptionNotify ( this->blocker.channel (),
ECA_PUTFAIL, "put notify unsuccessful");
}
}
else {
this->notify ().completionNotify ( this->blocker.channel () );
this->notify().completionNotify ( this->blocker.channel () );
}
}

View File

@@ -75,12 +75,25 @@ cacChannelIO *dbServiceIO::createChannelIO (
}
}
void dbServiceIO::subscriptionUpdate ( struct dbAddr &addr,
void dbServiceIO::callReadNotify ( struct dbAddr &addr,
unsigned type, unsigned long count,
const struct db_field_log *pfl, dbSubscriptionIO &io )
const struct db_field_log *pfl,
cacChannelIO &chan, cacNotify &notify )
{
unsigned long size = dbr_size_n ( type, count );
if ( type > INT_MAX ) {
notify.exceptionNotify ( chan, ECA_BADTYPE,
"type code out of range (high side)" );
return;
}
if ( count > INT_MAX ) {
notify.exceptionNotify ( chan, ECA_BADCOUNT,
"element count out of range (high side)" );
return;
}
epicsAutoMutex locker ( this->mutex );
if ( this->eventCallbackCacheSize < size) {
@@ -90,7 +103,7 @@ void dbServiceIO::subscriptionUpdate ( struct dbAddr &addr,
this->pEventCallbackCache = new char [size];
if ( ! this->pEventCallbackCache ) {
this->eventCallbackCacheSize = 0ul;
io.notify ().exceptionNotify ( io.channelIO (), ECA_ALLOCMEM,
notify.exceptionNotify ( chan, ECA_ALLOCMEM,
"unable to allocate callback cache" );
return;
}
@@ -100,11 +113,11 @@ void dbServiceIO::subscriptionUpdate ( struct dbAddr &addr,
int status = db_get_field ( &addr, static_cast <int> ( type ),
this->pEventCallbackCache, static_cast <int> ( count ), pvfl );
if ( status ) {
io.notify ().exceptionNotify ( io.channelIO (), ECA_GETFAIL,
"subscription update db_get_field () completed unsuccessfuly" );
notify.exceptionNotify ( chan, ECA_GETFAIL,
"db_get_field() completed unsuccessfuly" );
}
else {
io.notify ().completionNotify ( io.channelIO (), type,
notify.completionNotify ( chan, type,
count, this->pEventCallbackCache );
}
}

View File

@@ -36,23 +36,23 @@ dbSubscriptionIO::dbSubscriptionIO ( dbChannelIO &chanIO,
cacNotifyIO ( notifyIn ), chan ( chanIO ), es ( 0 ),
type ( typeIn ), count ( countIn )
{
dbAutoScanLockCA locker ( this->chan );
this->chan.eventq.add ( *this );
}
dbSubscriptionIO::~dbSubscriptionIO ()
{
{
dbAutoScanLockCA locker ( this->chan );
this->chan.eventq.remove ( *this );
}
if ( this->es ) {
db_cancel_event ( this->es );
}
}
void dbSubscriptionIO::destroy ()
{
delete this;
}
void dbSubscriptionIO::cancel ()
{
this->chan.uninstallSubscription ( *this );
delete this;
}
@@ -77,7 +77,7 @@ extern "C" void dbSubscriptionEventCallback ( void *pPrivate, struct dbAddr * /*
int /* eventsRemaining */, struct db_field_log *pfl )
{
dbSubscriptionIO *pIO = static_cast < dbSubscriptionIO * > ( pPrivate );
pIO->chan.subscriptionUpdate ( pIO->type, pIO->count, pfl, *pIO );
pIO->chan.callReadNotify ( pIO->type, pIO->count, pfl, pIO->notify() );
}
int dbSubscriptionIO::begin ( unsigned mask )