use guard class to manage mutex locks

This commit is contained in:
Jeff Hill
2002-08-09 23:40:18 +00:00
parent a630ecefc9
commit 7cc5842830
20 changed files with 276 additions and 448 deletions
+6 -23
View File
@@ -20,7 +20,6 @@
#include "caServerIIL.h" // caServerI in line func
#include "casCoreClientIL.h" // casCoreClient in line func
#include "casEventSysIL.h" // casEventSys in line func
#include "casAsyncIOIIL.h" // casAsyncIOI in line func
#include "casCtxIL.h" // casCtx in line func
//
@@ -78,8 +77,6 @@ casAsyncIOI::casAsyncIOI (casCoreClient &clientIn) :
//
casAsyncIOI::~casAsyncIOI()
{
this->lock();
if (!this->serverDelete) {
fprintf(stderr,
"WARNING: An async IO operation was deleted prematurely\n");
@@ -93,6 +90,8 @@ casAsyncIOI::~casAsyncIOI()
"WARNING: by deleting the async IO object.\n");
}
epicsGuard < casCoreClient > guard ( this->client );
//
// pulls itself out of the event queue
// if it is installed there
@@ -100,8 +99,6 @@ casAsyncIOI::~casAsyncIOI()
if (this->inTheEventQueue) {
this->client.casEventSys::removeFromEventQueue(*this);
}
this->unlock();
}
//
@@ -110,26 +107,22 @@ casAsyncIOI::~casAsyncIOI()
//
caStatus casAsyncIOI::cbFunc(class casEventSys &)
{
casCoreClient &theClient = this->client;
caStatus status;
//
// Use the client's lock here (which is the same as the
// asynch IO's lock) here because we need to leave the lock
// applied around the destroy() call here.
//
theClient.lock();
epicsGuard < casCoreClient > guard ( this->client );
this->inTheEventQueue = FALSE;
status = this->cbFuncAsyncIO();
caStatus status = this->cbFuncAsyncIO();
if (status == S_cas_sendBlocked) {
//
// causes this op to be pushed back on the queue
//
this->inTheEventQueue = TRUE;
this->unlock();
return status;
}
else if (status != S_cas_success) {
@@ -144,8 +137,6 @@ caStatus casAsyncIOI::cbFunc(class casEventSys &)
//
this->serverDestroy();
theClient.unlock();
return S_cas_success;
}
@@ -164,7 +155,7 @@ caStatus casAsyncIOI::postIOCompletionI()
return S_cas_redundantPost;
}
this->lock();
epicsGuard < casCoreClient > guard ( this->client );
if (this->duplicate) {
errMessage(S_cas_badParameter,
@@ -173,7 +164,6 @@ caStatus casAsyncIOI::postIOCompletionI()
// dont use "this" after potentially destroying the
// object here
//
this->unlock();
this->serverDestroy();
return S_cas_redundantPost;
}
@@ -182,7 +172,6 @@ caStatus casAsyncIOI::postIOCompletionI()
// verify that they dont post completion more than once
//
if (this->posted) {
this->unlock();
return S_cas_redundantPost;
}
@@ -198,8 +187,6 @@ caStatus casAsyncIOI::postIOCompletionI()
this->inTheEventQueue = TRUE;
this->client.casEventSys::addToEventQueue(*this);
this->unlock();
return S_cas_success;
}
@@ -228,13 +215,11 @@ epicsShareFunc bool casAsyncIOI::readOP() const
//
void casAsyncIOI::serverDestroyIfReadOP()
{
casCoreClient &clientCopy = this->client;
//
// client lock used because this object's
// lock may be destroyed
//
clientCopy.lock();
epicsGuard < casCoreClient > guard ( this->client );
if (this->readOP()) {
this->serverDestroy();
@@ -244,8 +229,6 @@ void casAsyncIOI::serverDestroyIfReadOP()
// NO REF TO THIS OBJECT BELOW HERE
// BECAUSE OF THE DELETE ABOVE
//
clientCopy.unlock();
}
//
-44
View File
@@ -1,44 +0,0 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef casAsyncIOIIL_h
#define casAsyncIOIIL_h
//
// void casAsyncIOI::lock()
//
// NOTE:
// If this changed to use another lock other than the one in the
// client structure then the locking in casAsyncIOI::cbFunc()
// must be changed
//
inline void casAsyncIOI::lock()
{
client.lock();
}
//
// void casAsyncIOI::unlock()
//
inline void casAsyncIOI::unlock()
{
client.unlock();
}
#endif // casAsyncIOIIL_h
-1
View File
@@ -16,7 +16,6 @@
*/
#include "server.h"
#include "casAsyncIOIIL.h" // casAsyncIOI in line func
#include "casChannelIIL.h" // casChannelI in line func
#include "casCtxIL.h" // casCtx in line func
#include "casCoreClientIL.h" // casCoreClient in line func
-1
View File
@@ -17,7 +17,6 @@
#include "server.h"
#include "casAsyncIOIIL.h" // casAsyncIOI in line func
#include "casChannelIIL.h" // casChannelI in line func
#include "casCtxIL.h" // casCtx in line func
#include "casCoreClientIL.h" // casCoreClient in line func
+6 -9
View File
@@ -17,7 +17,6 @@
#include "server.h"
#include "casAsyncIOIIL.h" // casAsyncIOI in line func
#include "casChannelIIL.h" // casChannelI in line func
#include "casCtxIL.h" // casCtxI in line func
@@ -38,11 +37,8 @@ casAsyncReadIO::casAsyncReadIO ( const casCtx & ctx ) :
//
casAsyncReadIO::~casAsyncReadIO ()
{
this->lock();
epicsGuard < casCoreClient > guard ( this->client );
this->chan.removeAsyncIO ( *this );
this->unlock();
}
//
@@ -51,11 +47,12 @@ casAsyncReadIO::~casAsyncReadIO ()
caStatus casAsyncReadIO::postIOCompletion (caStatus completionStatusIn,
const gdd &valueRead)
{
this->lock();
this->pDD = &valueRead;
this->unlock();
{
epicsGuard < casCoreClient > guard ( this->client );
this->pDD = &valueRead;
this->completionStatus = completionStatusIn;
}
this->completionStatus = completionStatusIn;
return this->postIOCompletionI();
}
+1 -3
View File
@@ -18,7 +18,6 @@
#include "server.h"
#include "casChannelIIL.h" // casChannelI in line func
#include "casAsyncIOIIL.h" // casAsyncIOI in line func
#include "casCtxIL.h" // casCtx in line func
//
@@ -39,9 +38,8 @@ casAsyncWriteIO::casAsyncWriteIO(const casCtx &ctx) :
//
casAsyncWriteIO::~casAsyncWriteIO()
{
this->lock();
epicsGuard < casCoreClient > guard ( this->client );
this->chan.removeAsyncIO(*this);
this->unlock();
}
//
+4 -12
View File
@@ -17,7 +17,6 @@
#include "server.h"
#include "casEventSysIL.h" // casEventSys inline func
#include "casAsyncIOIIL.h" // casAsyncIOI inline func
#include "casPVIIL.h" // casPVI inline func
#include "casCtxIL.h" // casCtx inline func
@@ -43,7 +42,7 @@ void casChannelI::bindToClientI ( casCoreClient & client, casPVI & pv, caResId c
//
casChannelI::~casChannelI()
{
this->lock();
epicsGuard < casCoreClient > guard ( * this->pClient );
//
// cancel any pending asynchronous IO
@@ -79,8 +78,6 @@ casChannelI::~casChannelI()
// force PV delete if this is the last channel attached
//
this->pPV->deleteSignal();
this->unlock();
}
//
@@ -88,7 +85,7 @@ casChannelI::~casChannelI()
//
void casChannelI::clearOutstandingReads()
{
this->lock();
epicsGuard < casCoreClient > guard ( * this->pClient );
//
// cancel any pending asynchronous IO
@@ -103,8 +100,6 @@ void casChannelI::clearOutstandingReads()
iterIO->serverDestroyIfReadOP();
iterIO = tmp;
}
this->unlock();
}
//
@@ -112,7 +107,7 @@ void casChannelI::clearOutstandingReads()
//
void casChannelI::show ( unsigned level ) const
{
this->lock ();
epicsGuard < casCoreClient > guard ( * this->pClient );
tsDLIterConst <casMonitor> iter = this->monitorList.firstIter ();
if ( iter.valid () ) {
@@ -125,8 +120,6 @@ void casChannelI::show ( unsigned level ) const
}
this->show ( level );
this->unlock ();
}
//
@@ -201,7 +194,7 @@ void casChannelI::destroyClientNotify ()
//
tsDLIter <casMonitor> casChannelI::findMonitor (const caResId clientIdIn)
{
this->lock ();
epicsGuard < casCoreClient > guard ( * this->pClient );
tsDLIter <casMonitor> iter = this->monitorList.firstIter ();
while ( iter.valid () ) {
if ( clientIdIn == iter->getClientId () ) {
@@ -209,7 +202,6 @@ tsDLIter <casMonitor> casChannelI::findMonitor (const caResId clientIdIn)
}
iter++;
}
this->unlock ();
return iter;
}
+10 -34
View File
@@ -22,36 +22,17 @@
#include "casCoreClientIL.h"
#include "casEventSysIL.h"
//
// casChannelI::lock()
//
inline void casChannelI::lock() const
{
this->pClient->lock();
}
//
// casChannelI::unlock()
//
inline void casChannelI::unlock() const
{
this->pClient->unlock();
}
//
// casChannelI::postEvent()
//
inline void casChannelI::postEvent (const casEventMask &select, const gdd &event)
{
this->lock();
epicsGuard < casCoreClient > guard ( * this->pClient );
tsDLIter<casMonitor> iter = this->monitorList.firstIter ();
while ( iter.valid () ) {
iter->post (select, event);
++iter;
}
this->unlock();
}
@@ -60,13 +41,11 @@ inline void casChannelI::postEvent (const casEventMask &select, const gdd &event
//
inline void casChannelI::deleteMonitor(casMonitor &mon)
{
casRes *pRes;
this->lock();
epicsGuard < casCoreClient > guard ( * this->pClient );
this->getClient().casEventSys::removeMonitor();
this->monitorList.remove(mon);
pRes = this->getClient().getCAS().removeItem(mon);
this->unlock();
assert(&mon == (casMonitor *)pRes);
casRes *pRes = this->getClient().getCAS().removeItem(mon);
assert ( & mon == (casMonitor *) pRes );
}
//
@@ -74,11 +53,10 @@ inline void casChannelI::deleteMonitor(casMonitor &mon)
//
inline void casChannelI::addMonitor(casMonitor &mon)
{
this->lock();
epicsGuard < casCoreClient > guard ( * this->pClient );
this->monitorList.add(mon);
this->getClient().getCAS().installItem(mon);
this->getClient().casEventSys::installMonitor();
this->unlock();
}
//
@@ -100,9 +78,8 @@ inline void casChannelI::destroyNoClientNotify()
//
inline void casChannelI::installAsyncIO(casAsyncIOI &io)
{
this->lock();
this->ioInProgList.add(io);
this->unlock();
epicsGuard < casCoreClient > guard ( * this->pClient );
this->ioInProgList.add(io);
}
//
@@ -110,10 +87,9 @@ inline void casChannelI::installAsyncIO(casAsyncIOI &io)
//
inline void casChannelI::removeAsyncIO(casAsyncIOI &io)
{
this->lock();
this->ioInProgList.remove(io);
this->pPV->unregisterIO();
this->unlock();
epicsGuard < casCoreClient > guard ( * this->pClient );
this->ioInProgList.remove(io);
this->pPV->unregisterIO();
}
//
+7 -9
View File
@@ -18,7 +18,6 @@
#include "server.h"
#include "caServerIIL.h" // caServerI in line func
#include "casAsyncIOIIL.h" // casAsyncIOI in line func
#include "casEventSysIL.h" // casEventSys in line func
#include "casCtxIL.h" // casCtx in line func
#include "inBufIL.h" // inBuf in line func
@@ -43,7 +42,8 @@ casCoreClient::~casCoreClient()
errlogPrintf ("CAS: Connection Terminated\n");
}
this->lock();
epicsGuard < epicsMutex > guard ( this->mutex );
tsDLIter<casAsyncIOI> iterIO = this->ioInProgList.firstIter ();
//
@@ -58,8 +58,6 @@ casCoreClient::~casCoreClient()
iterIO->serverDestroy ();
iterIO = tmpIO;
}
this->unlock();
}
//
@@ -81,11 +79,11 @@ caStatus casCoreClient::disconnectChan(caResId)
void casCoreClient::show (unsigned level) const
{
printf ("Core client\n");
this->casEventSys::show (level);
printf ("\t%d io ops in progess\n", this->ioInProgList.count());
this->ctx.show (level);
this->epicsMutex::show (level);
printf ( "Core client\n" );
this->casEventSys::show ( level );
printf ( "\t%d io ops in progess\n", this->ioInProgList.count() );
this->ctx.show ( level );
this->mutex.show ( level );
}
//
+13 -4
View File
@@ -22,6 +22,17 @@
#include "caServerIIL.h" // caServerI in line func
#include "casCtxIL.h" // casEventSys in line func
inline void casCoreClient::lock ()
{
this->mutex.lock ();
}
inline void casCoreClient::unlock ()
{
this->mutex.unlock ();
}
//
// casCoreClient::getCAS()
//
@@ -35,9 +46,8 @@ inline caServerI &casCoreClient::getCAS() const
//
inline void casCoreClient::installAsyncIO(casAsyncIOI &ioIn)
{
this->lock();
epicsGuard < epicsMutex > guard ( this->mutex );
this->ioInProgList.add(ioIn);
this->unlock();
}
//
@@ -45,10 +55,9 @@ inline void casCoreClient::installAsyncIO(casAsyncIOI &ioIn)
//
inline void casCoreClient::removeAsyncIO(casAsyncIOI &ioIn)
{
this->lock();
epicsGuard < epicsMutex > guard ( this->mutex );
this->ioInProgList.remove(ioIn);
this->ctx.getServer()->ioBlockedList::signal();
this->unlock();
}
#endif // casCoreClientIL_h
+85 -92
View File
@@ -48,25 +48,22 @@ void casEventSys::show(unsigned level) const
//
casEventSys::~casEventSys()
{
casEvent *pE;
this->mutex.lock();
epicsGuard < epicsMutex > guard ( this->mutex );
if (this->pPurgeEvent != NULL) {
this->eventLogQue.remove(*this->pPurgeEvent);
if ( this->pPurgeEvent != NULL ) {
this->eventLogQue.remove ( *this->pPurgeEvent );
delete this->pPurgeEvent;
}
/*
* all active event blocks must be canceled first
*/
casVerify (this->numEventBlocks==0);
casVerify ( this->numEventBlocks == 0 );
while ( (pE = this->eventLogQue.get()) ) {
while ( casEvent *pE = this->eventLogQue.get () ) {
delete pE;
}
this->mutex.unlock();
}
//
@@ -74,10 +71,9 @@ casEventSys::~casEventSys()
//
void casEventSys::installMonitor()
{
this->mutex.lock();
epicsGuard < epicsMutex > guard ( this->mutex );
this->numEventBlocks++;
this->maxLogEntries += averageEventEntries;
this->mutex.unlock();
}
//
@@ -85,73 +81,71 @@ void casEventSys::installMonitor()
//
void casEventSys::removeMonitor()
{
this->mutex.lock();
epicsGuard < epicsMutex > guard ( this->mutex );
assert (this->numEventBlocks>=1u);
this->numEventBlocks--;
this->maxLogEntries -= averageEventEntries;
this->mutex.unlock();
}
//
// casEventSys::process()
//
casProcCond casEventSys::process()
casProcCond casEventSys::process ()
{
casEvent *pEvent;
caStatus status;
casProcCond cond = casProcOk;
unsigned long nAccepted = 0u;
unsigned long nAccepted = 0u;
this->mutex.lock();
{
epicsGuard < epicsMutex > guard ( this->mutex );
while (!this->dontProcess) {
while (!this->dontProcess) {
pEvent = this->eventLogQue.get();
if (pEvent==NULL) {
break;
}
casEvent *pEvent = this->eventLogQue.get ();
if ( pEvent == NULL ) {
break;
}
//
// lock must remain on until the event queue
// event is called
//
//
// lock must remain on until the event queue
// event is called
//
status = pEvent->cbFunc(*this);
if (status==S_cas_success) {
/*
* only remove it after it was accepted by the
* client
*/
nAccepted++;
}
else if (status==S_cas_sendBlocked) {
/*
* not accepted so return to the head of the list
* (we will try again later)
*/
this->pushOnToEventQueue(*pEvent);
cond = casProcOk;
break;
}
else if (status==S_cas_disconnect) {
cond = casProcDisconnect;
break;
}
else {
errMessage(status, "- unexpected error processing event");
cond = casProcDisconnect;
break;
}
}
caStatus status = pEvent->cbFunc ( *this );
if ( status == S_cas_success ) {
/*
* only remove it after it was accepted by the
* client
*/
nAccepted++;
}
else if ( status == S_cas_sendBlocked ) {
/*
* not accepted so return to the head of the list
* (we will try again later)
*/
this->pushOnToEventQueue ( *pEvent );
cond = casProcOk;
break;
}
else if ( status == S_cas_disconnect ) {
cond = casProcDisconnect;
break;
}
else {
errMessage ( status,
"- unexpected error processing event" );
cond = casProcDisconnect;
break;
}
}
/*
* call flush function if they provided one
*/
if (nAccepted > 0u) {
this->eventFlush ();
}
this->mutex.unlock();
/*
* call flush function if they provided one
*/
if ( nAccepted > 0u ) {
this->eventFlush ();
}
}
//
// allows the derived class to be informed that it
@@ -162,7 +156,7 @@ casProcCond casEventSys::process()
// the caller may be using the client's "this"
// pointer.
//
if (this->destroyPending) {
if ( this->destroyPending ) {
cond = casProcDisconnect;
}
@@ -174,28 +168,28 @@ casProcCond casEventSys::process()
//
void casEventSys::eventsOn()
{
this->mutex.lock();
{
epicsGuard < epicsMutex > guard ( this->mutex );
//
// allow multiple events for each monitor
//
this->replaceEvents = FALSE;
//
// allow multiple events for each monitor
//
this->replaceEvents = FALSE;
//
// allow the event queue to be processed
//
this->dontProcess = FALSE;
//
// allow the event queue to be processed
//
this->dontProcess = FALSE;
//
// remove purge event if it is still pending
//
if (this->pPurgeEvent != NULL) {
this->eventLogQue.remove (*this->pPurgeEvent);
delete this->pPurgeEvent;
this->pPurgeEvent = NULL;
}
this->mutex.unlock();
//
// remove purge event if it is still pending
//
if ( this->pPurgeEvent != NULL ) {
this->eventLogQue.remove ( *this->pPurgeEvent );
delete this->pPurgeEvent;
this->pPurgeEvent = NULL;
}
}
//
// wakes up the event queue consumer
@@ -208,7 +202,7 @@ void casEventSys::eventsOn()
//
caStatus casEventSys::eventsOff()
{
this->mutex.lock();
epicsGuard < epicsMutex > guard ( this->mutex );
//
// new events will replace the last event on
@@ -221,9 +215,9 @@ caStatus casEventSys::eventsOff()
// only after we have purged the event queue
// for this particular client
//
if (this->pPurgeEvent==NULL) {
if ( this->pPurgeEvent == NULL ) {
this->pPurgeEvent = new casEventPurgeEv;
if (this->pPurgeEvent==NULL) {
if ( this->pPurgeEvent == NULL ) {
//
// if there is no room for the event then immediately
// stop processing and sending events to the client
@@ -232,12 +226,10 @@ caStatus casEventSys::eventsOff()
this->dontProcess = TRUE;
}
else {
this->casEventSys::addToEventQueue(*this->pPurgeEvent);
this->casEventSys::addToEventQueue ( *this->pPurgeEvent );
}
}
this->mutex.unlock();
return S_cas_success;
}
@@ -274,12 +266,13 @@ casRes * casEventSys::lookupRes ( const caResId &, casResType )
//
// casEventPurgeEv::cbFunc()
//
caStatus casEventPurgeEv::cbFunc (casEventSys &evSys)
caStatus casEventPurgeEv::cbFunc ( casEventSys & evSys )
{
evSys.mutex.lock();
evSys.dontProcess = TRUE;
evSys.pPurgeEvent = NULL;
evSys.mutex.unlock();
{
epicsGuard < epicsMutex > guard ( evSys.mutex );
evSys.dontProcess = TRUE;
evSys.pPurgeEvent = NULL;
}
delete this;
+19 -20
View File
@@ -35,18 +35,20 @@ inline casEventSys::casEventSys () :
//
// casEventSys::addToEventQueue()
//
inline void casEventSys::addToEventQueue(casEvent &event)
inline void casEventSys::addToEventQueue ( casEvent & event )
{
this->mutex.lock();
this->eventLogQue.add(event);
this->mutex.unlock();
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->eventLogQue.add ( event );
}
//
// wake up the event queue consumer only if
// we are not supressing events to a client that
// is in flow control
//
if (!this->dontProcess) {
this->eventSignal();
if ( ! this->dontProcess ) {
this->eventSignal ();
}
}
@@ -59,37 +61,34 @@ inline void casEventSys::setDestroyPending()
//
// wakes up the event queue consumer
//
this->eventSignal();
this->eventSignal ();
}
//
// casEventSys::insertEventQueue()
//
inline void casEventSys::insertEventQueue(casEvent &insert, casEvent &prevEvent)
inline void casEventSys::insertEventQueue( casEvent & insert, casEvent & prevEvent )
{
this->mutex.lock();
this->eventLogQue.insertAfter(insert, prevEvent);
this->mutex.unlock();
epicsGuard < epicsMutex > guard ( this->mutex );
this->eventLogQue.insertAfter ( insert, prevEvent );
}
//
// casEventSys::pushOnToEventQueue()
//
inline void casEventSys::pushOnToEventQueue (casEvent &event)
inline void casEventSys::pushOnToEventQueue ( casEvent & event )
{
this->mutex.lock ();
this->eventLogQue.push (event);
this->mutex.unlock ();
epicsGuard < epicsMutex > guard ( this->mutex );
this->eventLogQue.push ( event );
}
//
// casEventSys::removeFromEventQueue()
//
inline void casEventSys::removeFromEventQueue(casEvent &event)
inline void casEventSys::removeFromEventQueue ( casEvent & event )
{
this->mutex.lock();
this->eventLogQue.remove(event);
this->mutex.unlock();
epicsGuard < epicsMutex > guard ( this->mutex );
this->eventLogQue.remove ( event );
}
//
@@ -97,7 +96,7 @@ inline void casEventSys::removeFromEventQueue(casEvent &event)
//
inline bool casEventSys::full() // X aCC 361
{
if (this->replaceEvents || this->eventLogQue.count()>=this->maxLogEntries) {
if ( this->replaceEvents || this->eventLogQue.count() >= this->maxLogEntries ) {
return true;
}
else {
+4 -13
View File
@@ -230,16 +230,13 @@ public:
caServer *getCAS() const;
protected:
casCoreClient &client;
casCoreClient & client;
//
// place notification of IO completion on the event queue
//
caStatus postIOCompletionI();
void lock();
void unlock();
private:
unsigned inTheEventQueue:1;
unsigned posted:1;
@@ -328,9 +325,6 @@ public:
epicsShareFunc virtual casResType resourceType () const;
void lock () const;
void unlock () const;
void destroyNoClientNotify ();
void destroyClientNotify ();
@@ -391,9 +385,9 @@ class casPV;
// casPVI
//
class casPVI :
public tsSLNode<casPVI>, // server resource table installation
public casRes, // server resource table installation
public ioBlockedList // list of clients io blocked on this pv
public tsSLNode<casPVI>, // server resource table installation
public casRes, // server resource table installation
public ioBlockedList // list of clients io blocked on this pv
{
public:
casPVI ();
@@ -480,9 +474,6 @@ private:
unsigned nMonAttached;
unsigned nIOAttached;
bool destroyInProgress;
inline void lock () const;
inline void unlock () const;
epicsShareFunc virtual void destroy (); // casPVI destructor noop
casPVI ( const casPVI & );
+45 -55
View File
@@ -55,22 +55,19 @@ casMonitor::casMonitor(caResId clientIdIn, casChannelI &chan,
//
casMonitor::~casMonitor()
{
casCoreClient &client = this->ciu.getClient();
this->mutex.lock();
epicsGuard < epicsMutex > guard ( this->mutex );
this->disable();
//
// remove from the event system
//
if (this->ovf) {
client.removeFromEventQueue (this->overFlowEvent);
if ( this->ovf ) {
casCoreClient &client = this->ciu.getClient();
client.removeFromEventQueue ( this->overFlowEvent );
}
this->ciu.deleteMonitor(*this);
this->mutex.unlock();
this->ciu.deleteMonitor ( * this );
}
//
@@ -78,18 +75,15 @@ casMonitor::~casMonitor()
//
void casMonitor::enable()
{
caStatus status;
this->mutex.lock();
if (!this->enabled && this->ciu.readAccess()) {
epicsGuard < epicsMutex > guard ( this->mutex );
if ( ! this->enabled && this->ciu.readAccess() ) {
this->enabled = true;
status = this->ciu.getPVI().registerEvent();
if (status) {
errMessage(status,
"Server tool failed to register event\n");
caStatus status = this->ciu.getPVI().registerEvent();
if ( status ) {
errMessage ( status,
"Server tool failed to register event\n" );
}
}
this->mutex.unlock();
}
//
@@ -97,12 +91,11 @@ void casMonitor::enable()
//
void casMonitor::disable()
{
this->mutex.lock();
if (this->enabled) {
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->enabled ) {
this->enabled = false;
this->ciu.getPVI().unregisterEvent();
}
this->mutex.unlock();
}
//
@@ -110,20 +103,18 @@ void casMonitor::disable()
//
void casMonitor::push (const smartConstGDDPointer &pNewValue)
{
epicsGuard < epicsMutex > guard ( this->mutex );
casCoreClient &client = this->ciu.getClient ();
casMonEvent *pLog;
char full;
this->mutex.lock();
client.getCAS().incrEventsPostedCounter ();
//
// get a new block if we havent exceeded quotas
//
full = ( this->nPend >= individualEventEntries )
bool full = ( this->nPend >= individualEventEntries )
|| client.casEventSys::full ();
if (!full) {
casMonEvent * pLog;
if ( ! full ) {
pLog = new casMonEvent (*this, pNewValue);
if (pLog) {
this->nPend++; // X aCC 818
@@ -133,44 +124,42 @@ void casMonitor::push (const smartConstGDDPointer &pNewValue)
pLog = NULL;
}
if (this->ovf) {
if (pLog) {
if ( this->ovf ) {
if ( pLog ) {
//
// swap values
// (ugly - but avoids purify ukn sym type problem)
// (better to create a temp event object)
//
smartConstGDDPointer pValue = this->overFlowEvent.getValue ();
if (!pValue) {
assert (0);
if ( ! pValue ) {
assert ( 0 );
}
this->overFlowEvent = *pLog;
pLog->assign (*this, pValue);
client.insertEventQueue (*pLog, this->overFlowEvent);
pLog->assign ( *this, pValue );
client.insertEventQueue ( *pLog, this->overFlowEvent );
}
else {
//
// replace the value with the current one
//
this->overFlowEvent.assign (*this, pNewValue);
this->overFlowEvent.assign ( *this, pNewValue );
}
client.removeFromEventQueue (this->overFlowEvent);
pLog = &this->overFlowEvent;
client.removeFromEventQueue ( this->overFlowEvent );
pLog = & this->overFlowEvent;
}
else if (!pLog) {
else if ( ! pLog ) {
//
// no log block
// => use the over flow block in the event structure
//
this->ovf = true;
this->overFlowEvent.assign (*this, pNewValue);
this->overFlowEvent.assign ( * this, pNewValue );
this->nPend++; // X aCC 818
pLog = &this->overFlowEvent;
}
client.addToEventQueue (*pLog);
this->mutex.unlock ();
client.addToEventQueue ( * pLog );
}
//
@@ -182,19 +171,20 @@ caStatus casMonitor::executeEvent(casMonEvent *pEV)
smartConstGDDPointer pVal;
pVal = pEV->getValue ();
if (!pVal) {
assert (0);
if ( ! pVal ) {
assert ( 0 );
}
this->mutex.lock ();
status = this->callBack (*pVal);
this->mutex.unlock ();
{
epicsGuard < epicsMutex > guard ( this->mutex );
status = this->callBack ( * pVal );
}
//
// if the event isnt accepted we will try
// again later (and the event returns to the queue)
//
if (status) {
if ( status ) {
return status;
}
@@ -207,8 +197,8 @@ caStatus casMonitor::executeEvent(casMonEvent *pEV)
// delete event object if it isnt a cache entry
// saved in the call back object
//
if (pEV == &this->overFlowEvent) {
assert (this->ovf);
if ( pEV == &this->overFlowEvent ) {
assert ( this->ovf );
this->ovf = false;
pEV->clear();
}
@@ -224,13 +214,13 @@ caStatus casMonitor::executeEvent(casMonEvent *pEV)
//
// casMonitor::show(unsigned level)
//
void casMonitor::show(unsigned level) const
void casMonitor::show ( unsigned level ) const
{
if (level>1u) {
printf(
if ( level > 1u ) {
printf (
"\tmonitor type=%u count=%lu client id=%u enabled=%u OVF=%u nPend=%u\n",
dbrType, nElem, clientId, enabled, ovf, nPend);
this->mask.show(level);
}
dbrType, nElem, clientId, enabled, ovf, nPend );
this->mask.show ( level );
}
}
+16 -24
View File
@@ -37,44 +37,42 @@ casPVI::casPVI () :
//
// casPVI::~casPVI()
//
casPVI::~casPVI()
casPVI::~casPVI ()
{
this->destroyInProgress = true;
//
// only relevant if we are attached to a server
//
if (this->pCAS!=NULL) {
if ( this->pCAS != NULL ) {
this->lock();
epicsGuard < caServerI > guard ( * this->pCAS );
this->pCAS->removeItem(*this);
this->pCAS->removeItem ( *this );
//
// delete any attached channels
//
tsDLIter <casPVListChan> iter = this->chanList.firstIter ();
tsDLIter < casPVListChan > iter = this->chanList.firstIter ();
while ( iter.valid () ) {
tsDLIter<casPVListChan> tmp = iter;
tsDLIter < casPVListChan > tmp = iter;
++tmp;
iter->destroyClientNotify ();
iter = tmp;
}
this->unlock();
}
//
// all outstanding IO should have been deleted
// when we destroyed the channels
//
casVerify (this->nIOAttached==0u);
casVerify ( this->nIOAttached == 0u );
//
// all monitors should have been deleted
// when we destroyed the channels
//
casVerify (this->nMonAttached==0u);
casVerify ( this->nMonAttached == 0u );
}
//
@@ -83,13 +81,11 @@ casPVI::~casPVI()
//
void casPVI::deleteSignal ()
{
caServerI *pLocalCAS = this->pCAS;
//
// if we are not attached to a server then the
// following steps are not relevant
//
if (pLocalCAS) {
if ( this->pCAS ) {
//
// We dont take the PV lock here because
// the PV may be destroyed and we must
@@ -101,18 +97,16 @@ void casPVI::deleteSignal ()
// lock when we add a new channel (and the
// PV lock is realy the server's lock)
//
pLocalCAS->lock();
epicsGuard < caServerI > guard ( * this->pCAS );
if (this->chanList.count()==0u) {
pLocalCAS->removeItem (*this);
if ( this->chanList.count() == 0u ) {
this->pCAS->removeItem ( *this );
this->pCAS = NULL;
this->destroy ();
//
// !! dont access self after destroy !!
//
}
pLocalCAS->unlock();
}
}
@@ -312,15 +306,14 @@ caStatus casPVI::registerEvent ()
{
caStatus status;
this->lock();
epicsGuard < caServerI > guard ( * this->pCAS );
this->nMonAttached++;
if (this->nMonAttached==1u) {
if ( this->nMonAttached == 1u ) {
status = this->interestRegister ();
}
else {
status = S_cas_success;
}
this->unlock();
return status;
}
@@ -330,16 +323,15 @@ caStatus casPVI::registerEvent ()
//
void casPVI::unregisterEvent()
{
this->lock();
epicsGuard < caServerI > guard ( * this->pCAS );
this->nMonAttached--;
//
// Dont call casPV::interestDelete() when we are in
// casPVI::~casPVI() (and casPV no longr exists)
//
if ( this->nMonAttached==0u && !this->destroyInProgress ) {
if ( this->nMonAttached == 0u && !this->destroyInProgress ) {
this->interestDelete();
}
this->unlock();
}
//
+10 -45
View File
@@ -30,56 +30,22 @@ inline caServerI *casPVI::getPCAS() const
return this->pCAS;
}
//
// casPVI::lock()
//
inline void casPVI::lock() const
{
//
// NOTE:
// if this lock becomes something else besides the
// server's lock then look carefully at the
// comment in casPVI::deleteSignal()
//
if (this->pCAS) {
this->pCAS->lock();
}
else {
fprintf (stderr, "PV lock call when not attached to server?\n");
}
}
//
// casPVI::unlock()
//
inline void casPVI::unlock() const
{
if (this->pCAS) {
this->pCAS->unlock();
}
else {
fprintf (stderr, "PV unlock call when not attached to server?\n");
}
}
//
// casPVI::installChannel()
//
inline void casPVI::installChannel(casPVListChan &chan)
inline void casPVI::installChannel ( casPVListChan & chan )
{
this->lock();
epicsGuard < caServerI > guard ( * this->pCAS );
this->chanList.add(chan);
this->unlock();
}
//
// casPVI::removeChannel()
//
inline void casPVI::removeChannel(casPVListChan &chan)
inline void casPVI::removeChannel ( casPVListChan & chan )
{
this->lock();
epicsGuard < caServerI > guard ( * this->pCAS );
this->chanList.remove(chan);
this->unlock();
}
//
@@ -93,11 +59,11 @@ inline void casPVI::unregisterIO()
//
// casPVI::bestDBRType()
//
inline caStatus casPVI::bestDBRType (unsigned &dbrType) // X aCC 361
inline caStatus casPVI::bestDBRType ( unsigned &dbrType ) // X aCC 361
{
unsigned bestAIT = this->bestExternalType();
unsigned bestAIT = this->bestExternalType ();
if (bestAIT<NELEMENTS(gddAitToDbr)&&bestAIT!=aitEnumInvalid) {
if ( bestAIT < NELEMENTS ( gddAitToDbr ) && bestAIT != aitEnumInvalid ) {
dbrType = gddAitToDbr[bestAIT];
return S_cas_success;
}
@@ -117,17 +83,16 @@ inline caStatus casPVI::bestDBRType (unsigned &dbrType) // X aCC 361
//
inline void casPVI::postEvent (const casEventMask &select, const gdd &event)
{
if (this->nMonAttached==0u) {
if ( this->nMonAttached == 0u ) {
return;
}
this->lock();
tsDLIter<casPVListChan> iter = this->chanList.firstIter ();
epicsGuard < caServerI > guard ( * this->pCAS );
tsDLIter < casPVListChan > iter = this->chanList.firstIter ();
while ( iter.valid () ) {
iter->postEvent ( select, event );
++iter;
}
this->unlock();
}
//
+21 -32
View File
@@ -40,7 +40,7 @@ casStrmClient::casStrmClient ( caServerI &serverInternal ) :
this->pHostName = new char [1u];
*this->pHostName = '\0';
this->lock ();
epicsGuard < casCoreClient > guard ( * this );
this->ctx.getServer()->installClient ( this );
@@ -50,8 +50,6 @@ casStrmClient::casStrmClient ( caServerI &serverInternal ) :
throw std::bad_alloc();
}
*this->pUserName= '\0';
this->unlock ();
}
//
@@ -59,7 +57,7 @@ casStrmClient::casStrmClient ( caServerI &serverInternal ) :
//
casStrmClient::~casStrmClient()
{
this->lock();
epicsGuard < casCoreClient > guard ( * this );
//
// remove this from the list of connected clients
@@ -83,8 +81,6 @@ casStrmClient::~casStrmClient()
iter->destroyNoClientNotify();
iter = tmp;
}
this->unlock();
}
//
@@ -855,7 +851,7 @@ caStatus casStrmClient::hostNameAction()
size-1);
pMalloc[size-1]='\0';
this->lock();
epicsGuard < casCoreClient > guard ( * this );
if (this->pHostName) {
delete [] this->pHostName;
@@ -868,8 +864,6 @@ caStatus casStrmClient::hostNameAction()
++iter;
}
this->unlock();
return S_cas_success;
}
@@ -903,7 +897,7 @@ caStatus casStrmClient::clientNameAction()
size-1);
pMalloc[size-1]='\0';
this->lock();
epicsGuard < casCoreClient > guard ( * this );
if (this->pUserName) {
delete [] this->pUserName;
@@ -915,7 +909,6 @@ caStatus casStrmClient::clientNameAction()
iter->setOwner ( this->pUserName, this->pHostName );
++iter;
}
this->unlock();
return S_cas_success;
}
@@ -977,7 +970,7 @@ caStatus casStrmClient::claimChannelAction()
// prevent problems such as the PV being deleted before the
// channel references it
//
this->lock();
epicsGuard < casCoreClient > guard ( * this );
this->asyncIOFlag = 0u;
//
@@ -1005,7 +998,6 @@ caStatus casStrmClient::claimChannelAction()
else {
status = this->createChanResponse(*mp, pvar);
}
this->unlock();
return status;
}
@@ -1293,11 +1285,11 @@ caStatus casStrmClient::eventAddAction ()
if ( status == S_cas_success ) {
pMonitor = new casClientMon(*pciu, mp->m_available,
mp->m_count, mp->m_dataType, mask, *this);
if (!pMonitor) {
status = this->sendErr(mp, ECA_ALLOCMEM, NULL);
if (status==S_cas_success) {
pMonitor = new casClientMon ( *pciu, mp->m_available,
mp->m_count, mp->m_dataType, mask, this->mutex );
if ( ! pMonitor ) {
status = this->sendErr ( mp, ECA_ALLOCMEM, NULL );
if ( status==S_cas_success ) {
//
// If we cant allocate space for a monitor then
// delete (disconnect) the channel
@@ -1485,13 +1477,14 @@ caStatus casStrmClient::readSyncAction()
// any pending asynchronous IO associated with
// a read.
//
this->lock();
tsDLIter <casChannelI> iter = this->chanList.firstIter ();
while ( iter.valid () ) {
iter->clearOutstandingReads ();
++iter;
}
this->unlock();
{
epicsGuard < casCoreClient > guard ( * this );
tsDLIter <casChannelI> iter = this->chanList.firstIter ();
while ( iter.valid () ) {
iter->clearOutstandingReads ();
++iter;
}
}
status = this->out.copyInHeader ( mp->m_cmmd, 0,
mp->m_dataType, mp->m_count,
@@ -1861,10 +1854,9 @@ inline bool caServerI::roomForNewChannel() const
//
void casStrmClient::installChannel(casChannelI &chan)
{
this->lock();
epicsGuard < casCoreClient > guard ( * this );
this->getCAS().installItem (chan);
this->chanList.add(chan);
this->unlock();
}
//
@@ -1872,13 +1864,10 @@ void casStrmClient::installChannel(casChannelI &chan)
//
void casStrmClient::removeChannel(casChannelI &chan)
{
casRes *pRes;
this->lock();
pRes = this->getCAS().removeItem(chan);
epicsGuard < casCoreClient > guard ( * this );
casRes * pRes = this->getCAS().removeItem(chan);
assert (&chan == (casChannelI *)pRes);
this->chanList.remove(chan);
this->unlock();
}
//
+3 -4
View File
@@ -147,18 +147,17 @@ const inBufCtx inBuf::pushCtx ( bufSizeT headerSize, // X aCC 361
//
// inBuf::popCtx ()
//
bufSizeT inBuf::popCtx (const inBufCtx &ctx) // X aCC 361
bufSizeT inBuf::popCtx ( const inBufCtx &ctx ) // X aCC 361
{
if ( ctx.stat==inBufCtx::pushCtxSuccess ) {
this->mutex.lock();
epicsGuard < epicsMutex > guard ( this->mutex );
bufSizeT bytesRemoved = this->nextReadIndex;
this->pBuf = ctx.pBuf;
this->bufSize = ctx.bufSize;
this->bytesInBuffer = ctx.bytesInBuffer;
this->nextReadIndex = ctx.nextReadIndex;
assert (this->ctxRecursCount>0);
assert ( this->ctxRecursCount > 0 );
this->ctxRecursCount--;
this->mutex.unlock();
return bytesRemoved;
}
else {
+19 -21
View File
@@ -217,21 +217,20 @@ outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired )
bufSizeT nBytesRequired;
outBufClient::flushCondition cond;
this->mutex.lock();
epicsGuard < epicsMutex > guard ( this->mutex );
if (this->ctxRecursCount>0) {
this->mutex.unlock();
if ( this->ctxRecursCount > 0 ) {
return outBufClient::flushNone;
}
if (spaceRequired>this->bufSize) {
if ( spaceRequired > this->bufSize ) {
nBytesRequired = this->stack;
}
else {
bufSizeT stackPermitted;
stackPermitted = this->bufSize - spaceRequired;
if (this->stack>stackPermitted) {
if ( this->stack > stackPermitted ) {
nBytesRequired = this->stack - stackPermitted;
}
else {
@@ -239,12 +238,12 @@ outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired )
}
}
cond = this->client.xSend(this->pBuf, this->stack,
nBytesRequired, nBytes);
if (cond==outBufClient::flushProgress) {
cond = this->client.xSend ( this->pBuf, this->stack,
nBytesRequired, nBytes );
if ( cond == outBufClient::flushProgress ) {
bufSizeT len;
if (nBytes >= this->stack) {
if ( nBytes >= this->stack ) {
this->stack = 0u;
}
else {
@@ -252,40 +251,39 @@ outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired )
//
// memmove() is ok with overlapping buffers
//
memmove (this->pBuf, &this->pBuf[nBytes], len);
memmove ( this->pBuf, &this->pBuf[nBytes], len );
this->stack = len;
}
if (this->client.getDebugLevel()>2u) {
if ( this->client.getDebugLevel () > 2u ) {
char buf[64];
this->client.hostName (buf, sizeof(buf));
errlogPrintf ("CAS: Sent a %d byte reply to %s\n",
nBytes, buf);
this->client.hostName ( buf, sizeof ( buf ) );
errlogPrintf ( "CAS: Sent a %d byte reply to %s\n",
nBytes, buf );
}
}
this->mutex.unlock();
return cond;
}
//
// outBuf::pushCtx ()
//
const outBufCtx outBuf::pushCtx (bufSizeT headerSize, // X aCC 361
const outBufCtx outBuf::pushCtx ( bufSizeT headerSize, // X aCC 361
bufSizeT maxBodySize,
void *&pHeader)
void *&pHeader )
{
bufSizeT totalSize = headerSize + maxBodySize;
caStatus status;
status = this->allocRawMsg (totalSize, &pHeader);
if (status!=S_cas_success) {
status = this->allocRawMsg ( totalSize, & pHeader );
if ( status != S_cas_success ) {
return outBufCtx ();
}
else if (this->ctxRecursCount>=UINT_MAX) {
else if ( this->ctxRecursCount >= UINT_MAX ) {
return outBufCtx ();
}
else {
outBufCtx result (*this);
outBufCtx result ( *this );
this->pBuf = this->pBuf + this->stack + headerSize;
this->stack = 0;
this->bufSize = maxBodySize;
+7 -2
View File
@@ -420,7 +420,7 @@ private:
// (this will eventually support direct communication
// between the client lib and the server lib)
//
class casCoreClient : public epicsMutex, public ioBlocked,
class casCoreClient : public ioBlocked,
public casEventSys {
//
@@ -450,6 +450,9 @@ public:
virtual caStatus accessRightsResponse(casChannelI *);
void lock ();
void unlock ();
//
// one virtual function for each CA request type that has
// asynchronous completion
@@ -476,11 +479,13 @@ public:
virtual ca_uint32_t datagramSequenceNumber () const;
protected:
epicsMutex mutex;
casCtx ctx;
unsigned char asyncIOFlag;
private:
tsDLList<casAsyncIOI> ioInProgList;
tsDLList < casAsyncIOI > ioInProgList;
casCoreClient ( const casCoreClient & );
casCoreClient & operator = ( const casCoreClient & );
};