diff --git a/src/db/Makefile b/src/db/Makefile index ce01fb3c1..49fc7e08e 100644 --- a/src/db/Makefile +++ b/src/db/Makefile @@ -63,7 +63,7 @@ LIBSRCS += dbServiceIO.cpp LIBSRCS += dbChannelIO.cpp LIBSRCS += dbPutNotifyIO.cpp LIBSRCS += dbSubscriptionIO.cpp - +LIBSRCS += dbPutNotifyBlocker.cpp LIBRARY_IOC = dbIoc DLL_LIBS = dbStaticIoc ca Com diff --git a/src/db/dbCAC.h b/src/db/dbCAC.h index 5accaa40c..e9c8910b2 100644 --- a/src/db/dbCAC.h +++ b/src/db/dbCAC.h @@ -24,9 +24,12 @@ extern "C" void putNotifyCompletion ( putNotify *ppn ); +class dbChannelIO; +class dbPutNotifyBlocker; + class dbPutNotifyIO : public cacNotifyIO { public: - dbPutNotifyIO ( cacNotify ¬ify ); + dbPutNotifyIO ( cacNotify ¬ify, dbPutNotifyBlocker &blockerIn ); int initiate ( struct dbAddr &addr, unsigned type, unsigned long count, const void *pValue); void destroy (); @@ -34,18 +37,17 @@ public: static void operator delete ( void *pCadaver, size_t size ); private: putNotify pn; + dbPutNotifyBlocker &blocker; bool ioComplete; static tsFreeList < dbPutNotifyIO > freeList; ~dbPutNotifyIO (); // must allocate out of pool friend void putNotifyCompletion ( putNotify *ppn ); }; -class dbChannelIO; - extern "C" void dbSubscriptionEventCallback ( void *pPrivate, struct dbAddr *paddr, int eventsRemaining, struct db_field_log *pfl ); -class dbSubscriptionIO : public cacNotifyIO { +class dbSubscriptionIO : public cacNotifyIO, public tsDLNode { public: dbSubscriptionIO ( dbChannelIO &chanIO, cacNotify &, unsigned type, unsigned long count ); int begin ( struct dbAddr &addr, unsigned mask ); @@ -59,18 +61,38 @@ private: unsigned long count; static tsFreeList < dbSubscriptionIO > freeList; ~dbSubscriptionIO (); // must be allocated from pool - friend void dbSubscriptionEventCallback ( void *user_arg, struct dbAddr *paddr, + friend dbPutNotifyIO::dbPutNotifyIO ( cacNotify ¬ify, dbPutNotifyBlocker &blockerIn ); + friend dbPutNotifyIO::~dbPutNotifyIO ( ); + friend void dbSubscriptionEventCallback ( void *pPrivate, struct dbAddr *paddr, int eventsRemaining, struct db_field_log *pfl ); }; class dbServiceIO; -class dbChannelIO : public cacChannelIO { +class dbPutNotifyBlocker { +public: + dbPutNotifyBlocker ( dbChannelIO &chanIn ); + void destroy (); + static void * operator new ( size_t size ); + static void operator delete ( void *pCadaver, size_t size ); +private: + osiEvent block; + dbPutNotifyIO *pPN; + dbChannelIO &chan; + + static tsFreeList < dbPutNotifyBlocker > freeList; + ~dbPutNotifyBlocker (); // must allocate out of pool + + friend dbPutNotifyIO::dbPutNotifyIO ( cacNotify ¬ify, dbPutNotifyBlocker &blockerIn ); + friend dbPutNotifyIO::~dbPutNotifyIO (); +}; + +class dbChannelIO : public cacLocalChannelIO { public: dbChannelIO ( cacChannel &chan, const dbAddr &addr, dbServiceIO &serviceIO ); void destroy (); void subscriptionUpdate ( unsigned type, unsigned long count, - const struct db_field_log *pfl, cacNotifyIO ¬ify); + const struct db_field_log *pfl, cacNotifyIO ¬ify ); dbEventSubscription subscribe ( dbSubscriptionIO &subscr, unsigned mask ); static void * operator new ( size_t size); @@ -79,7 +101,9 @@ public: private: dbServiceIO &serviceIO; char *pGetCallbackCache; + dbPutNotifyBlocker *pBlocker; unsigned long getCallbackCacheSize; + tsDLList eventq; dbAddr addr; static tsFreeList < dbChannelIO > freeList; @@ -94,13 +118,18 @@ private: int subscribe ( unsigned type, unsigned long count, unsigned mask, cacNotify ¬ify ); short nativeType () const; unsigned long nativeElementCount () const; + + friend dbSubscriptionIO::dbSubscriptionIO ( dbChannelIO &chanIO, cacNotify &, unsigned type, unsigned long count ); + friend dbSubscriptionIO::~dbSubscriptionIO (); + friend dbPutNotifyBlocker::dbPutNotifyBlocker ( dbChannelIO &chanIn ); + friend dbPutNotifyBlocker::~dbPutNotifyBlocker (); }; class dbServiceIO : public cacServiceIO { public: dbServiceIO (); virtual ~dbServiceIO (); - cacChannelIO *createChannelIO ( cacChannel &chan, const char *pName ); + cacLocalChannelIO *createChannelIO ( cacChannel &chan, const char *pName ); void subscriptionUpdate ( struct dbAddr &addr, unsigned type, unsigned long count, const struct db_field_log *pfl, cacNotifyIO ¬ify ); dbEventSubscription subscribe ( struct dbAddr &addr, dbSubscriptionIO &subscr, unsigned mask ); diff --git a/src/db/dbChannelIO.cpp b/src/db/dbChannelIO.cpp index 2d88fe946..c8ba3d4aa 100644 --- a/src/db/dbChannelIO.cpp +++ b/src/db/dbChannelIO.cpp @@ -33,14 +33,31 @@ extern "C" unsigned short dbDBRnewToDBRold[DBR_ENUM+1]; tsFreeList < dbChannelIO > dbChannelIO::freeList; dbChannelIO::dbChannelIO ( cacChannel &chan, const dbAddr &addrIn, dbServiceIO &serviceIO ) : - cacChannelIO ( chan ), serviceIO ( serviceIO ), pGetCallbackCache ( 0 ), - getCallbackCacheSize ( 0ul ), addr ( addrIn ) + cacLocalChannelIO ( chan ), serviceIO ( serviceIO ), pGetCallbackCache ( 0 ), + getCallbackCacheSize ( 0ul ), pBlocker (0), addr ( addrIn ) { + chan.attachIO ( *this ); this->connectNotify (); } dbChannelIO::~dbChannelIO () { + /* + * remove any subscriptions attached to this channel + */ + this->lock (); + tsDLIterBD iter = this->eventq.first (); + while ( iter != iter.eol () ) { + tsDLIterBD next = iter.itemAfter (); + iter->destroy (); + iter = next; + } + this->unlock (); + + if ( this->pBlocker ) { + this->pBlocker->destroy (); + } + if ( this->pGetCallbackCache ) { delete [] this->pGetCallbackCache; } @@ -139,13 +156,26 @@ int dbChannelIO::write ( unsigned type, unsigned long count, const void *pValue, cacNotify ¬ify ) { dbPutNotifyIO *pIO; + if ( count > LONG_MAX ) { return ECA_BADCOUNT; } - pIO = new dbPutNotifyIO ( notify ); + + this->lock (); + if ( ! this->pBlocker ) { + this->pBlocker = new dbPutNotifyBlocker ( *this ); + if ( ! this->pBlocker ) { + this->unlock (); + return ECA_ALLOCMEM; + } + } + this->unlock (); + + pIO = new dbPutNotifyIO ( notify, *this->pBlocker ); if ( ! pIO ) { return ECA_ALLOCMEM; } + int status = pIO->initiate ( this->addr, type, count, pValue ); if ( status != ECA_NORMAL ) { pIO->destroy (); @@ -160,6 +190,7 @@ int dbChannelIO::subscribe ( unsigned type, unsigned long count, if ( ! pIO ) { return ECA_ALLOCMEM; } + int status = pIO->begin ( this->addr, mask ); if ( status != ECA_NORMAL ) { pIO->destroy (); diff --git a/src/db/dbPutNotifyIO.cpp b/src/db/dbPutNotifyIO.cpp index 8096f88a2..4d5626e68 100644 --- a/src/db/dbPutNotifyIO.cpp +++ b/src/db/dbPutNotifyIO.cpp @@ -10,7 +10,8 @@ * Copyright, 1986, The Regents of the University of California. * * - * Author Jeffrey O. Hill + * Author: + * Jeffrey O. Hill * johill@lanl.gov * 505 665 1831 */ @@ -51,12 +52,21 @@ extern "C" void putNotifyCompletion ( putNotify *ppn ) pNotify->destroy (); } -dbPutNotifyIO::dbPutNotifyIO ( cacNotify ¬ifyIn ) : - cacNotifyIO (notifyIn), ioComplete (false) +dbPutNotifyIO::dbPutNotifyIO ( cacNotify ¬ifyIn, dbPutNotifyBlocker &blockerIn ) : + cacNotifyIO ( notifyIn ), blocker ( blockerIn ), ioComplete ( false ) { - memset (&this->pn, '\0', sizeof (this->pn)); + memset ( &this->pn, '\0', sizeof ( this->pn ) ); this->pn.userCallback = putNotifyCompletion; this->pn.usrPvt = this; + // wait for current put notify to complete + this->blocker.chan.lock (); + while ( this->blocker.pPN ) { + this->blocker.chan.unlock (); + this->blocker.block.wait ( 1.0 ); + this->blocker.chan.lock (); + } + this->blocker.pPN = this; + this->blocker.chan.unlock (); } dbPutNotifyIO::~dbPutNotifyIO () @@ -64,6 +74,8 @@ dbPutNotifyIO::~dbPutNotifyIO () if ( ! this->ioComplete ) { dbNotifyCancel ( &this->pn ); } + this->blocker.pPN = 0; + this->blocker.block.signal (); } int dbPutNotifyIO::initiate ( struct dbAddr &addr, unsigned type, diff --git a/src/db/dbServiceIO.cpp b/src/db/dbServiceIO.cpp index 62f98fddd..fdbb7be6f 100644 --- a/src/db/dbServiceIO.cpp +++ b/src/db/dbServiceIO.cpp @@ -60,7 +60,7 @@ dbServiceIO::~dbServiceIO () } } -cacChannelIO *dbServiceIO::createChannelIO ( cacChannel &chan, const char *pName ) +cacLocalChannelIO *dbServiceIO::createChannelIO ( cacChannel &chan, const char *pName ) { struct dbAddr addr; diff --git a/src/db/dbSubscriptionIO.cpp b/src/db/dbSubscriptionIO.cpp index d8ca38453..49ad646a1 100644 --- a/src/db/dbSubscriptionIO.cpp +++ b/src/db/dbSubscriptionIO.cpp @@ -34,10 +34,16 @@ dbSubscriptionIO::dbSubscriptionIO ( dbChannelIO &chanIO, cacNotifyIO ( notifyIn ), chan ( chanIO ), es ( 0 ), type ( typeIn ), count ( countIn ) { + this->chan.lock (); + this->chan.eventq.add ( *this ); + this->chan.unlock (); } dbSubscriptionIO::~dbSubscriptionIO () { + this->chan.lock (); + this->chan.eventq.remove ( *this ); + this->chan.unlock (); if ( this->es ) { db_cancel_event ( this->es ); }