added proper rundown of db events and db put call backs
This commit is contained in:
+1
-1
@@ -63,7 +63,7 @@ LIBSRCS += dbServiceIO.cpp
|
||||
LIBSRCS += dbChannelIO.cpp
|
||||
LIBSRCS += dbPutNotifyIO.cpp
|
||||
LIBSRCS += dbSubscriptionIO.cpp
|
||||
|
||||
LIBSRCS += dbPutNotifyBlocker.cpp
|
||||
|
||||
LIBRARY_IOC = dbIoc
|
||||
DLL_LIBS = dbStaticIoc ca Com
|
||||
|
||||
+37
-8
@@ -24,9 +24,12 @@
|
||||
|
||||
extern "C" void putNotifyCompletion ( putNotify *ppn );
|
||||
|
||||
class dbChannelIO;
|
||||
class dbPutNotifyBlocker;
|
||||
|
||||
class dbPutNotifyIO : public cacNotifyIO {
|
||||
public:
|
||||
dbPutNotifyIO ( cacNotify ¬ify );
|
||||
dbPutNotifyIO ( cacNotify ¬ify, dbPutNotifyBlocker &blockerIn );
|
||||
int initiate ( struct dbAddr &addr, unsigned type,
|
||||
unsigned long count, const void *pValue);
|
||||
void destroy ();
|
||||
@@ -34,18 +37,17 @@ public:
|
||||
static void operator delete ( void *pCadaver, size_t size );
|
||||
private:
|
||||
putNotify pn;
|
||||
dbPutNotifyBlocker &blocker;
|
||||
bool ioComplete;
|
||||
static tsFreeList < dbPutNotifyIO > freeList;
|
||||
~dbPutNotifyIO (); // must allocate out of pool
|
||||
friend void putNotifyCompletion ( putNotify *ppn );
|
||||
};
|
||||
|
||||
class dbChannelIO;
|
||||
|
||||
extern "C" void dbSubscriptionEventCallback ( void *pPrivate, struct dbAddr *paddr,
|
||||
int eventsRemaining, struct db_field_log *pfl );
|
||||
|
||||
class dbSubscriptionIO : public cacNotifyIO {
|
||||
class dbSubscriptionIO : public cacNotifyIO, public tsDLNode <dbSubscriptionIO> {
|
||||
public:
|
||||
dbSubscriptionIO ( dbChannelIO &chanIO, cacNotify &, unsigned type, unsigned long count );
|
||||
int begin ( struct dbAddr &addr, unsigned mask );
|
||||
@@ -59,18 +61,38 @@ private:
|
||||
unsigned long count;
|
||||
static tsFreeList < dbSubscriptionIO > freeList;
|
||||
~dbSubscriptionIO (); // must be allocated from pool
|
||||
friend void dbSubscriptionEventCallback ( void *user_arg, struct dbAddr *paddr,
|
||||
friend dbPutNotifyIO::dbPutNotifyIO ( cacNotify ¬ify, dbPutNotifyBlocker &blockerIn );
|
||||
friend dbPutNotifyIO::~dbPutNotifyIO ( );
|
||||
friend void dbSubscriptionEventCallback ( void *pPrivate, struct dbAddr *paddr,
|
||||
int eventsRemaining, struct db_field_log *pfl );
|
||||
};
|
||||
|
||||
class dbServiceIO;
|
||||
|
||||
class dbChannelIO : public cacChannelIO {
|
||||
class dbPutNotifyBlocker {
|
||||
public:
|
||||
dbPutNotifyBlocker ( dbChannelIO &chanIn );
|
||||
void destroy ();
|
||||
static void * operator new ( size_t size );
|
||||
static void operator delete ( void *pCadaver, size_t size );
|
||||
private:
|
||||
osiEvent block;
|
||||
dbPutNotifyIO *pPN;
|
||||
dbChannelIO &chan;
|
||||
|
||||
static tsFreeList < dbPutNotifyBlocker > freeList;
|
||||
~dbPutNotifyBlocker (); // must allocate out of pool
|
||||
|
||||
friend dbPutNotifyIO::dbPutNotifyIO ( cacNotify ¬ify, dbPutNotifyBlocker &blockerIn );
|
||||
friend dbPutNotifyIO::~dbPutNotifyIO ();
|
||||
};
|
||||
|
||||
class dbChannelIO : public cacLocalChannelIO {
|
||||
public:
|
||||
dbChannelIO ( cacChannel &chan, const dbAddr &addr, dbServiceIO &serviceIO );
|
||||
void destroy ();
|
||||
void subscriptionUpdate ( unsigned type, unsigned long count,
|
||||
const struct db_field_log *pfl, cacNotifyIO ¬ify);
|
||||
const struct db_field_log *pfl, cacNotifyIO ¬ify );
|
||||
dbEventSubscription subscribe ( dbSubscriptionIO &subscr, unsigned mask );
|
||||
|
||||
static void * operator new ( size_t size);
|
||||
@@ -79,7 +101,9 @@ public:
|
||||
private:
|
||||
dbServiceIO &serviceIO;
|
||||
char *pGetCallbackCache;
|
||||
dbPutNotifyBlocker *pBlocker;
|
||||
unsigned long getCallbackCacheSize;
|
||||
tsDLList <dbSubscriptionIO> eventq;
|
||||
dbAddr addr;
|
||||
|
||||
static tsFreeList < dbChannelIO > freeList;
|
||||
@@ -94,13 +118,18 @@ private:
|
||||
int subscribe ( unsigned type, unsigned long count, unsigned mask, cacNotify ¬ify );
|
||||
short nativeType () const;
|
||||
unsigned long nativeElementCount () const;
|
||||
|
||||
friend dbSubscriptionIO::dbSubscriptionIO ( dbChannelIO &chanIO, cacNotify &, unsigned type, unsigned long count );
|
||||
friend dbSubscriptionIO::~dbSubscriptionIO ();
|
||||
friend dbPutNotifyBlocker::dbPutNotifyBlocker ( dbChannelIO &chanIn );
|
||||
friend dbPutNotifyBlocker::~dbPutNotifyBlocker ();
|
||||
};
|
||||
|
||||
class dbServiceIO : public cacServiceIO {
|
||||
public:
|
||||
dbServiceIO ();
|
||||
virtual ~dbServiceIO ();
|
||||
cacChannelIO *createChannelIO ( cacChannel &chan, const char *pName );
|
||||
cacLocalChannelIO *createChannelIO ( cacChannel &chan, const char *pName );
|
||||
void subscriptionUpdate ( struct dbAddr &addr, unsigned type, unsigned long count,
|
||||
const struct db_field_log *pfl, cacNotifyIO ¬ify );
|
||||
dbEventSubscription subscribe ( struct dbAddr &addr, dbSubscriptionIO &subscr, unsigned mask );
|
||||
|
||||
+34
-3
@@ -33,14 +33,31 @@ extern "C" unsigned short dbDBRnewToDBRold[DBR_ENUM+1];
|
||||
tsFreeList < dbChannelIO > dbChannelIO::freeList;
|
||||
|
||||
dbChannelIO::dbChannelIO ( cacChannel &chan, const dbAddr &addrIn, dbServiceIO &serviceIO ) :
|
||||
cacChannelIO ( chan ), serviceIO ( serviceIO ), pGetCallbackCache ( 0 ),
|
||||
getCallbackCacheSize ( 0ul ), addr ( addrIn )
|
||||
cacLocalChannelIO ( chan ), serviceIO ( serviceIO ), pGetCallbackCache ( 0 ),
|
||||
getCallbackCacheSize ( 0ul ), pBlocker (0), addr ( addrIn )
|
||||
{
|
||||
chan.attachIO ( *this );
|
||||
this->connectNotify ();
|
||||
}
|
||||
|
||||
dbChannelIO::~dbChannelIO ()
|
||||
{
|
||||
/*
|
||||
* remove any subscriptions attached to this channel
|
||||
*/
|
||||
this->lock ();
|
||||
tsDLIterBD <dbSubscriptionIO> iter = this->eventq.first ();
|
||||
while ( iter != iter.eol () ) {
|
||||
tsDLIterBD <dbSubscriptionIO> next = iter.itemAfter ();
|
||||
iter->destroy ();
|
||||
iter = next;
|
||||
}
|
||||
this->unlock ();
|
||||
|
||||
if ( this->pBlocker ) {
|
||||
this->pBlocker->destroy ();
|
||||
}
|
||||
|
||||
if ( this->pGetCallbackCache ) {
|
||||
delete [] this->pGetCallbackCache;
|
||||
}
|
||||
@@ -139,13 +156,26 @@ int dbChannelIO::write ( unsigned type, unsigned long count,
|
||||
const void *pValue, cacNotify ¬ify )
|
||||
{
|
||||
dbPutNotifyIO *pIO;
|
||||
|
||||
if ( count > LONG_MAX ) {
|
||||
return ECA_BADCOUNT;
|
||||
}
|
||||
pIO = new dbPutNotifyIO ( notify );
|
||||
|
||||
this->lock ();
|
||||
if ( ! this->pBlocker ) {
|
||||
this->pBlocker = new dbPutNotifyBlocker ( *this );
|
||||
if ( ! this->pBlocker ) {
|
||||
this->unlock ();
|
||||
return ECA_ALLOCMEM;
|
||||
}
|
||||
}
|
||||
this->unlock ();
|
||||
|
||||
pIO = new dbPutNotifyIO ( notify, *this->pBlocker );
|
||||
if ( ! pIO ) {
|
||||
return ECA_ALLOCMEM;
|
||||
}
|
||||
|
||||
int status = pIO->initiate ( this->addr, type, count, pValue );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
pIO->destroy ();
|
||||
@@ -160,6 +190,7 @@ int dbChannelIO::subscribe ( unsigned type, unsigned long count,
|
||||
if ( ! pIO ) {
|
||||
return ECA_ALLOCMEM;
|
||||
}
|
||||
|
||||
int status = pIO->begin ( this->addr, mask );
|
||||
if ( status != ECA_NORMAL ) {
|
||||
pIO->destroy ();
|
||||
|
||||
@@ -10,7 +10,8 @@
|
||||
* Copyright, 1986, The Regents of the University of California.
|
||||
*
|
||||
*
|
||||
* Author Jeffrey O. Hill
|
||||
* Author:
|
||||
* Jeffrey O. Hill
|
||||
* johill@lanl.gov
|
||||
* 505 665 1831
|
||||
*/
|
||||
@@ -51,12 +52,21 @@ extern "C" void putNotifyCompletion ( putNotify *ppn )
|
||||
pNotify->destroy ();
|
||||
}
|
||||
|
||||
dbPutNotifyIO::dbPutNotifyIO ( cacNotify ¬ifyIn ) :
|
||||
cacNotifyIO (notifyIn), ioComplete (false)
|
||||
dbPutNotifyIO::dbPutNotifyIO ( cacNotify ¬ifyIn, dbPutNotifyBlocker &blockerIn ) :
|
||||
cacNotifyIO ( notifyIn ), blocker ( blockerIn ), ioComplete ( false )
|
||||
{
|
||||
memset (&this->pn, '\0', sizeof (this->pn));
|
||||
memset ( &this->pn, '\0', sizeof ( this->pn ) );
|
||||
this->pn.userCallback = putNotifyCompletion;
|
||||
this->pn.usrPvt = this;
|
||||
// wait for current put notify to complete
|
||||
this->blocker.chan.lock ();
|
||||
while ( this->blocker.pPN ) {
|
||||
this->blocker.chan.unlock ();
|
||||
this->blocker.block.wait ( 1.0 );
|
||||
this->blocker.chan.lock ();
|
||||
}
|
||||
this->blocker.pPN = this;
|
||||
this->blocker.chan.unlock ();
|
||||
}
|
||||
|
||||
dbPutNotifyIO::~dbPutNotifyIO ()
|
||||
@@ -64,6 +74,8 @@ dbPutNotifyIO::~dbPutNotifyIO ()
|
||||
if ( ! this->ioComplete ) {
|
||||
dbNotifyCancel ( &this->pn );
|
||||
}
|
||||
this->blocker.pPN = 0;
|
||||
this->blocker.block.signal ();
|
||||
}
|
||||
|
||||
int dbPutNotifyIO::initiate ( struct dbAddr &addr, unsigned type,
|
||||
|
||||
@@ -60,7 +60,7 @@ dbServiceIO::~dbServiceIO ()
|
||||
}
|
||||
}
|
||||
|
||||
cacChannelIO *dbServiceIO::createChannelIO ( cacChannel &chan, const char *pName )
|
||||
cacLocalChannelIO *dbServiceIO::createChannelIO ( cacChannel &chan, const char *pName )
|
||||
{
|
||||
struct dbAddr addr;
|
||||
|
||||
|
||||
@@ -34,10 +34,16 @@ dbSubscriptionIO::dbSubscriptionIO ( dbChannelIO &chanIO,
|
||||
cacNotifyIO ( notifyIn ), chan ( chanIO ), es ( 0 ),
|
||||
type ( typeIn ), count ( countIn )
|
||||
{
|
||||
this->chan.lock ();
|
||||
this->chan.eventq.add ( *this );
|
||||
this->chan.unlock ();
|
||||
}
|
||||
|
||||
dbSubscriptionIO::~dbSubscriptionIO ()
|
||||
{
|
||||
this->chan.lock ();
|
||||
this->chan.eventq.remove ( *this );
|
||||
this->chan.unlock ();
|
||||
if ( this->es ) {
|
||||
db_cancel_event ( this->es );
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user