changes allowing asynchronous casPV destroy

This commit is contained in:
Jeff Hill
2003-03-10 22:26:33 +00:00
parent 04f4585891
commit 8efa36b734
48 changed files with 1891 additions and 1833 deletions

View File

@@ -30,7 +30,6 @@ INC += caNetAddr.h
LIBSRCS += caServer.cc
LIBSRCS += caServerI.cc
LIBSRCS += casCoreClient.cc
LIBSRCS += casClient.cc
LIBSRCS += casDGClient.cc
LIBSRCS += casStrmClient.cc
LIBSRCS += casPV.cc
@@ -62,6 +61,7 @@ LIBSRCS += beaconTimer.cc
LIBSRCS += beaconAnomalyGovernor.cc
LIBSRCS += clientBufMemoryManager.cpp
LIBSRCS += chanIntfForPV.cc
LIBSRCS += channelDestroyEvent.cpp
LIBSRCS += casIntfOS.cc
LIBSRCS += casDGIntfOS.cc

View File

@@ -53,10 +53,10 @@ pvCreateReturn caServer::createPV (const casCtx &, const char *)
return S_casApp_pvNotFound;
}
pvAttachReturn caServer::pvAttach (const casCtx &ctx, const char *pAliasName)
pvAttachReturn caServer::pvAttach ( const casCtx &ctx, const char *pAliasName )
{
// remain backwards compatible (call deprecated routine)
return this->createPV(ctx, pAliasName);
return this->createPV ( ctx, pAliasName );
}
casEventMask caServer::registerEvent (const char *pName) // X aCC 361

View File

@@ -250,11 +250,43 @@ void caServerI::casMonitorDestroy ( casMonitor & cm )
this->casMonitorFreeList.release ( & cm );
}
//
// caServerI::dumpMsg()
//
// Debug aid - print the header part of a message.
//
// dp arg allowed to be null
//
//
void caServerI::dumpMsg ( const char * pHostName, const char * pUserName,
const caHdrLargeArray * mp, const void * dp, const char * pFormat, ... )
{
va_list theArgs;
if ( pFormat ) {
va_start ( theArgs, pFormat );
errlogPrintf ( "CAS: " );
errlogVprintf ( pFormat, theArgs );
va_end ( theArgs );
}
fprintf ( stderr,
"CAS Request: %s on %s: cmd=%u cid=%u typ=%u cnt=%u psz=%u avail=%x\n",
pUserName,
pHostName,
mp->m_cmmd,
mp->m_cid,
mp->m_dataType,
mp->m_count,
mp->m_postsize,
mp->m_available);
//if ( mp->m_cmmd == CA_PROTO_WRITE && mp->m_dataType == DBR_STRING && dp ) {
// errlogPrintf("CAS: The string written: %s \n", (char *)dp);
//}
}
casEventRegistry::~casEventRegistry()
{
this->traverse ( &casEventMaskEntry::destroy );
}

View File

@@ -32,10 +32,12 @@
#include "ioBlocked.h"
#include "caServerDefs.h"
class casStrmClient;
class beaconTimer;
class beaconAnomalyGovernor;
class casIntfOS;
class casMonitor;
class casChannelI;
class caServerI :
public caServerIO,
@@ -60,12 +62,16 @@ public:
unsigned subscriptionEventsPosted () const;
void updateEventsPostedCounter ( unsigned nNewPosts );
void generateBeaconAnomaly ();
casMonitor & casMonitorFactory ( casChannelI &,
casMonitor & casMonitorFactory ( casChannelI &,
caResId clientId, const unsigned long count,
const unsigned type, const casEventMask &,
class casMonitorCallbackInterface & );
void casMonitorDestroy ( casMonitor & );
void destroyClient ( casStrmClient & );
static void dumpMsg (
const char * pHostName, const char * pUserName,
const struct caHdrLargeArray * mp, const void * dp,
const char * pFormat, ... );
private:
clientBufMemoryManager clientBufMemMgr;
tsFreeList < casMonitor, 1024 > casMonitorFreeList;
@@ -84,8 +90,8 @@ private:
casEventMask logEvent; // DBE_LOG registerEvent("log")
casEventMask alarmEvent; // DBE_ALARM registerEvent("alarm")
caStatus attachInterface (const caNetAddr &addr, bool autoBeaconAddr,
bool addConfigAddr);
caStatus attachInterface ( const caNetAddr & addr, bool autoBeaconAddr,
bool addConfigAddr );
void sendBeacon ( ca_uint32_t beaconNo );

View File

@@ -1,173 +0,0 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef caServerIIL_h
#define caServerIIL_h
#ifdef epicsExportSharedSymbols
#define caServerIIL_h_epicsExportSharedSymbols
#undef epicsExportSharedSymbols
#endif
#include "epicsGuard.h"
#ifdef caServerIIL_h_epicsExportSharedSymbols
#define epicsExportSharedSymbols
#endif
//
// caServerI::getAdapter()
//
inline caServer *caServerI::getAdapter()
{
return &this->adapter;
}
//
// call virtual function in the interface class
//
inline caServer * caServerI::operator -> ()
{
return this->getAdapter();
}
//
// caServerI::installItem()
//
inline void caServerI::installItem(casRes &res)
{
this->chronIntIdResTable<casRes>::add(res);
}
//
// caServerI::removeItem()
//
inline casRes *caServerI::removeItem(casRes &res)
{
return this->chronIntIdResTable<casRes>::remove(res);
}
//
// caServerI::setDebugLevel()
//
inline void caServerI::setDebugLevel(unsigned debugLevelIn)
{
this->debugLevel = debugLevelIn;
}
//
// casEventMask caServerI::valueEventMask()
//
inline casEventMask caServerI::valueEventMask() const
{
return this->valueEvent;
}
//
// caServerI::logEventMask()
//
inline casEventMask caServerI::logEventMask() const
{
return this->logEvent;
}
//
// caServerI::alarmEventMask()
//
inline casEventMask caServerI::alarmEventMask() const
{
return this->alarmEvent;
}
//
// caServerI::subscriptionEventsProcessedCounter () const
//
inline unsigned caServerI::subscriptionEventsProcessed () const
{
return this->nEventsProcessed;
}
//
// caServerI::incrEventsProcessedCounter ()
//
inline void caServerI::incrEventsProcessedCounter ()
{
this->nEventsProcessed++;
}
//
// caServerI::subscriptionEventsPosted () const
//
inline unsigned caServerI::subscriptionEventsPosted () const
{
return this->nEventsPosted;
}
//
// caServerI::incEventsPostedCounter ()
//
inline void caServerI::incrEventsPostedCounter ()
{
this->nEventsPosted++;
}
inline void caServerI::lock () const
{
this->mutex.lock ();
}
inline void caServerI::unlock () const
{
this->mutex.unlock ();
}
inline casMonEvent & caServerI::casMonEventFactory ( casMonitor & monitor,
const gdd & pNewValue )
{
return * new ( this->casMonEventFreeList ) casMonEvent ( monitor, pNewValue );
}
inline void caServerI::casMonEventDestroy ( casMonEvent & monEvent )
{
monEvent.~casMonEvent ();
this->casMonEventFreeList.release ( & monEvent );
}
inline casMonitor & caServerI::casMonitorFactory (
casChannelI & chan, caResId clientId,
const unsigned long count, const unsigned type,
const casEventMask & mask,
casMonitorCallbackInterface & cb )
{
casMonitor * pMon =
new ( this->casMonitorFreeList ) casMonitor
( clientId, chan, count, type, mask, cb );
this->installItem ( *pMon );
return *pMon;
}
inline void caServerI::casMonitorDestroy ( casMonitor & cm )
{
casRes * pRes = this->removeItem ( cm );
assert ( & cm == ( casMonitor * ) pRes );
cm.~casMonitor ();
this->casMonitorFreeList.release ( & cm );
}
#endif // caServerIIL_h

View File

@@ -75,18 +75,15 @@ casAsyncIOI::~casAsyncIOI()
// o clients lock is applied when calling this
//
caStatus casAsyncIOI::cbFunc (
casCoreClient &, epicsGuard < epicsMutex > & guard )
casCoreClient &,
epicsGuard < casClientMutex > & clientGuard,
epicsGuard < evSysMutex > & evGuard )
{
caStatus status = S_cas_success;
{
this->inTheEventQueue = false;
{
epicsGuardRelease < epicsMutex > unlocker ( guard );
status = this->cbFuncAsyncIO ();
}
status = this->cbFuncAsyncIO ( clientGuard );
if ( status == S_cas_sendBlocked ) {
//
// causes this op to be pushed back on the queue

View File

@@ -23,9 +23,6 @@
#include "caHdrLargeArray.h"
#include "casCoreClient.h"
class epicsMutex;
template < class MUTEX > class epicsGuard;
class casAsyncIOI :
public tsDLNode < casAsyncIOI >,
public casEvent {
@@ -47,13 +44,17 @@ private:
// casEvent virtual call back function
// (called when IO completion event reaches top of event queue)
//
epicsShareFunc caStatus cbFunc ( casCoreClient &, epicsGuard < epicsMutex > & );
epicsShareFunc caStatus cbFunc (
casCoreClient &,
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & );
//
// derived class specific call back
// (called when IO completion event reaches top of event queue)
//
epicsShareFunc virtual caStatus cbFuncAsyncIO () = 0;
epicsShareFunc virtual caStatus cbFuncAsyncIO (
epicsGuard < casClientMutex > & ) = 0;
casAsyncIOI ( const casAsyncIOI & );
casAsyncIOI & operator = ( const casAsyncIOI & );

View File

@@ -33,13 +33,15 @@ caStatus casAsyncPVAttachIOI::postIOCompletion ( const pvAttachReturn & retValIn
return this->insertEventQueue ();
}
caStatus casAsyncPVAttachIOI::cbFuncAsyncIO ()
caStatus casAsyncPVAttachIOI::cbFuncAsyncIO (
epicsGuard < casClientMutex > & guard )
{
caStatus status;
switch ( this->msg.m_cmmd ) {
case CA_PROTO_CLAIM_CIU:
status = this->client.createChanResponse ( this->msg, this->retVal );
status = this->client.createChanResponse ( guard,
this->msg, this->retVal );
if ( status == S_cas_sendBlocked ) {
return status;
}

View File

@@ -32,7 +32,7 @@ private:
class casAsyncPVAttachIO & asyncPVAttachIO;
pvAttachReturn retVal;
caStatus cbFuncAsyncIO ();
caStatus cbFuncAsyncIO ( epicsGuard < casClientMutex > & );
casAsyncPVAttachIOI ( const casAsyncPVAttachIOI & );
casAsyncPVAttachIOI & operator = ( const casAsyncPVAttachIOI & );
};

View File

@@ -39,7 +39,8 @@ caStatus casAsyncPVExistIOI::postIOCompletion (
return this->insertEventQueue ();
}
caStatus casAsyncPVExistIOI::cbFuncAsyncIO ()
caStatus casAsyncPVExistIOI::cbFuncAsyncIO (
epicsGuard < casClientMutex > & guard )
{
caStatus status;
@@ -48,7 +49,7 @@ caStatus casAsyncPVExistIOI::cbFuncAsyncIO ()
// pass output DG address parameters
//
status = this->client.asyncSearchResponse (
this->dgOutAddr, this->msg, this->retVal,
guard, this->dgOutAddr, this->msg, this->retVal,
this->protocolRevision, this->sequenceNumber );
if ( status == S_cas_sendBlocked ) {
return status;

View File

@@ -35,7 +35,7 @@ private:
const ca_uint16_t protocolRevision;
const ca_uint32_t sequenceNumber;
caStatus cbFuncAsyncIO ();
caStatus cbFuncAsyncIO ( epicsGuard < casClientMutex > & );
casAsyncPVExistIOI ( const casAsyncPVExistIOI & );
casAsyncPVExistIOI & operator = ( const casAsyncPVExistIOI & );
};

View File

@@ -51,26 +51,28 @@ bool casAsyncReadIOI::oneShotReadOP () const
return true; // it is a read op
}
caStatus casAsyncReadIOI::cbFuncAsyncIO ()
caStatus casAsyncReadIOI::cbFuncAsyncIO (
epicsGuard < casClientMutex > & guard )
{
caStatus status;
switch ( this->msg.m_cmmd ) {
case CA_PROTO_READ:
status = client.readResponse ( &this->chan, this->msg,
*this->pDD, this->completionStatus);
status = client.readResponse (
guard, & this->chan, this->msg,
* this->pDD, this->completionStatus );
break;
case CA_PROTO_READ_NOTIFY:
status = client.readNotifyResponse ( &this->chan,
this->msg, *this->pDD,
this->completionStatus);
status = client.readNotifyResponse (
guard, & this->chan, this->msg, * this->pDD,
this->completionStatus );
break;
case CA_PROTO_EVENT_ADD:
status = client.monitorResponse ( this->chan,
this->msg, *this->pDD,
this->completionStatus);
status = client.monitorResponse (
guard, this->chan, this->msg, * this->pDD,
this->completionStatus );
break;
case CA_PROTO_CLAIM_CIU:
@@ -78,7 +80,8 @@ caStatus casAsyncReadIOI::cbFuncAsyncIO ()
status = this->chan.getPVI().bestDBRType ( nativeTypeDBR );
if ( status ) {
errMessage ( status, "best external dbr type fetch failed" );
status = client.channelCreateFailedResp ( this->msg, status );
status = client.channelCreateFailedResp (
guard, this->msg, status );
if ( status != S_cas_sendBlocked ) {
delete & this->chan;
}
@@ -94,8 +97,8 @@ caStatus casAsyncReadIOI::cbFuncAsyncIO ()
"unable to read application type \"enums\" string"
" conversion table for enumerated PV" );
}
status = client.enumPostponedCreateChanResponse ( this->chan,
this->msg, nativeTypeDBR );
status = client.enumPostponedCreateChanResponse (
guard, this->chan, this->msg, nativeTypeDBR );
this->createChannelWasSuccessful =
( status == S_cas_success );
}

View File

@@ -39,7 +39,8 @@ private:
caStatus completionStatus;
bool createChannelWasSuccessful;
epicsShareFunc bool oneShotReadOP () const;
epicsShareFunc caStatus cbFuncAsyncIO ();
epicsShareFunc caStatus cbFuncAsyncIO (
epicsGuard < casClientMutex > & );
casAsyncReadIOI ( const casAsyncReadIOI & );
casAsyncReadIOI & operator = ( const casAsyncReadIOI & );
};

View File

@@ -37,18 +37,19 @@ caStatus casAsyncWriteIOI::postIOCompletion ( caStatus completionStatusIn )
return this->insertEventQueue ();
}
caStatus casAsyncWriteIOI::cbFuncAsyncIO ()
caStatus casAsyncWriteIOI::cbFuncAsyncIO (
epicsGuard < casClientMutex > & guard )
{
caStatus status;
switch ( this->msg.m_cmmd ) {
case CA_PROTO_WRITE:
status = client.writeResponse ( this->chan,
status = client.writeResponse ( guard, this->chan,
this->msg, this->completionStatus );
break;
case CA_PROTO_WRITE_NOTIFY:
status = client.writeNotifyResponse ( this->chan,
status = client.writeNotifyResponse ( guard, this->chan,
this->msg, this->completionStatus );
break;

View File

@@ -32,7 +32,8 @@ private:
class casAsyncWriteIO & asyncWriteIO;
class casChannelI & chan;
caStatus completionStatus;
caStatus cbFuncAsyncIO ();
caStatus cbFuncAsyncIO (
epicsGuard < casClientMutex > & );
casAsyncWriteIOI ( const casAsyncWriteIOI & );
casAsyncWriteIOI & operator = ( const casAsyncWriteIOI & );
};

View File

@@ -20,12 +20,21 @@
#include "casChannelI.h"
casChannel::casChannel ( const casCtx & ctx ) :
pChanI ( new casChannelI ( *this, ctx ) )
pChanI ( 0 )
{
}
casChannel::~casChannel ()
{
if ( this->pChanI ) {
this->pChanI->casChannelDestroyNotify ( true );
}
}
void casChannel::destroyRequest ()
{
this->pChanI = 0;
this->destroy ();
}
casPV * casChannel::getPV () // X aCC 361
@@ -39,8 +48,8 @@ casPV * casChannel::getPV () // X aCC 361
}
}
void casChannel::setOwner(const char * const /* pUserName */,
const char * const /* pHostName */)
void casChannel::setOwner ( const char * const /* pUserName */,
const char * const /* pHostName */ )
{
//
// NOOP
@@ -52,34 +61,34 @@ bool casChannel::readAccess () const
return true;
}
bool casChannel::writeAccess() const
bool casChannel::writeAccess () const
{
return true;
}
bool casChannel::confirmationRequested() const
bool casChannel::confirmationRequested () const
{
return false;
}
void casChannel::show(unsigned level) const
void casChannel::show ( unsigned level ) const
{
if (level>2u) {
printf("casChannel: read access = %d\n",
this->readAccess());
printf("casChannel: write access = %d\n",
this->writeAccess());
printf("casChannel: confirmation requested = %d\n",
this->confirmationRequested());
if ( level > 2u ) {
printf ( "casChannel: read access = %d\n",
this->readAccess() );
printf ( "casChannel: write access = %d\n",
this->writeAccess() );
printf ( "casChannel: confirmation requested = %d\n",
this->confirmationRequested() );
}
}
void casChannel::destroy()
void casChannel::destroy ()
{
delete this;
}
void casChannel::postAccessRightsEvent()
void casChannel::postAccessRightsEvent ()
{
if ( this->pChanI ) {
this->pChanI->postAccessRightsEvent ();

View File

@@ -20,9 +20,9 @@
#include "casAsyncIOI.h"
casChannelI::casChannelI ( casChannel & chanIn, const casCtx & ctx ) :
chanForPV ( *ctx.getClient() ), pv ( *ctx.getPV() ),
chanIntfForPV ( *ctx.getClient() ), pv ( *ctx.getPV() ),
chan ( chanIn ), cid ( ctx.getMsg()->m_cid ),
accessRightsEvPending ( false )
serverDeletePending ( false ), accessRightsEvPending ( false )
{
}
@@ -30,7 +30,8 @@ casChannelI::~casChannelI ()
{
this->pv.destroyAllIO ( this->ioList );
this->chan.destroy ();
this->serverDeletePending = true;
this->chan.destroyRequest ();
// force PV delete if this is the last channel attached
this->pv.deleteSignal ();
@@ -39,7 +40,7 @@ casChannelI::~casChannelI ()
void casChannelI::uninstallFromPV ( casEventSys & eventSys )
{
tsDLList < casMonitor > dest;
this->chanForPV.removeSelfFromPV ( this->pv, dest );
this->removeSelfFromPV ( this->pv, dest );
while ( casMonitor * pMon = dest.get () ) {
eventSys.prepareMonitorForDestroy ( *pMon );
}
@@ -50,18 +51,20 @@ void casChannelI::show ( unsigned level ) const
printf ( "casChannelI: client id %u PV %s\n",
this->cid, this->pv.getName() );
if ( level > 0 ) {
this->chanForPV.show ( level - 1 );
this->chanIntfForPV::show ( level - 1 );
this->chan.show ( level - 1 );
}
}
caStatus casChannelI::cbFunc (
casCoreClient &, epicsGuard < epicsMutex > & guard )
casCoreClient &,
epicsGuard < casClientMutex > & clientGuard,
epicsGuard < evSysMutex > & evGuard )
{
caStatus stat = S_cas_success;
{
epicsGuardRelease < epicsMutex > guardRelease ( guard );
stat = this->chanForPV.client().accessRightsResponse ( this );
stat = this->client().accessRightsResponse (
clientGuard, this );
}
if ( stat == S_cas_success ) {
this->accessRightsEvPending = false;
@@ -69,7 +72,3 @@ caStatus casChannelI::cbFunc (
return stat;
}
void casChannelI::destroy()
{
}

View File

@@ -28,18 +28,20 @@ class casMonitor;
class casAsyncIOI;
class casChannelI : public tsDLNode < casChannelI >,
public chronIntIdRes < casChannelI >, public casEvent {
public chronIntIdRes < casChannelI >, public casEvent,
private chanIntfForPV {
public:
casChannelI ( casChannel & chan, const casCtx & ctx );
epicsShareFunc virtual ~casChannelI ();
~casChannelI ();
void casChannelDestroyNotify ( bool immediateUninstall );
const caResId getCID ();
const caResId getSID ();
void uninstallFromPV ( casEventSys & eventSys );
void installIntoPV ();
void installMonitor ( casMonitor & mon );
casMonitor * removeMonitor ( ca_uint32_t monId );
void installIO ( casAsyncIOI & );
void uninstallIO ( casAsyncIOI & );
void installMonitor ( casMonitor & mon );
casMonitor * removeMonitor ( ca_uint32_t clientIdIn );
casPVI & getPVI () const;
void clearOutstandingReads ();
void postAccessRightsEvent ();
@@ -52,14 +54,16 @@ public:
void show ( unsigned level ) const;
private:
tsDLList < casAsyncIOI > ioList;
chanIntfForPV chanForPV;
casPVI & pv;
casChannel & chan;
caResId cid; // client id
bool serverDeletePending;
bool accessRightsEvPending;
epicsShareFunc virtual void destroy ();
epicsShareFunc caStatus cbFunc (
casCoreClient &, epicsGuard < epicsMutex > & guard );
//epicsShareFunc virtual void destroy ();
caStatus cbFunc (
casCoreClient &,
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & );
casChannelI ( const casChannelI & );
casChannelI & operator = ( const casChannelI & );
};
@@ -81,7 +85,7 @@ inline const caResId casChannelI::getSID ()
inline void casChannelI::postAccessRightsEvent ()
{
this->chanForPV.client().addToEventQueue ( *this, this->accessRightsEvPending );
this->client().addToEventQueue ( *this, this->accessRightsEvPending );
}
inline const gddEnumStringTable & casChannelI::enumStringTable () const
@@ -91,12 +95,7 @@ inline const gddEnumStringTable & casChannelI::enumStringTable () const
inline void casChannelI::installIntoPV ()
{
this->pv.installChannel ( this->chanForPV );
}
inline void casChannelI::installMonitor ( casMonitor & mon )
{
this->chanForPV.installMonitor ( this->pv, mon );
this->pv.installChannel ( *this );
}
inline void casChannelI::clearOutstandingReads ()
@@ -125,11 +124,6 @@ inline bool casChannelI::confirmationRequested () const
return this->chan.confirmationRequested ();
}
inline casMonitor * casChannelI::removeMonitor ( ca_uint32_t clientIdIn )
{
return this->chanForPV.removeMonitor ( this->pv, clientIdIn );
}
inline void casChannelI::installIO ( casAsyncIOI & io )
{
this->pv.installIO ( this->ioList, io );
@@ -140,4 +134,24 @@ inline void casChannelI::uninstallIO ( casAsyncIOI & io )
this->pv.uninstallIO ( this->ioList, io );
}
inline void casChannelI::casChannelDestroyNotify (
bool immediateUninstall )
{
if ( ! this->serverDeletePending ) {
this->client().casChannelDestroyNotify (
*this, immediateUninstall );
}
}
inline void casChannelI::installMonitor ( casMonitor & mon )
{
this->chanIntfForPV::installMonitor ( this->pv, mon );
}
inline casMonitor * casChannelI::removeMonitor (
ca_uint32_t clientIdIn )
{
return this->chanIntfForPV::removeMonitor ( this->pv, clientIdIn );
}
#endif // casChannelIh

View File

@@ -1,548 +0,0 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#include <stdarg.h>
#include "caerr.h"
#include "osiWireFormat.h"
#include "db_access.h"
#define epicsExportSharedSymbols
#include "casClient.h"
static const caHdr nill_msg = {0u,0u,0u,0u,0u,0u};
//
// static declartions for class casClient
//
bool casClient::msgHandlersInit;
casClient::pCASMsgHandler casClient::msgHandlers[CA_PROTO_LAST_CMMD+1u];
//
// casClient::casClient()
//
casClient::casClient ( caServerI & serverInternal,
clientBufMemoryManager & mgrIn, bufSizeT ioSizeMinIn ) :
casCoreClient ( serverInternal ),
in ( *this, mgrIn, ioSizeMinIn ),
out ( *this, mgrIn ),
minor_version_number ( 0 ),
incommingBytesToDrain ( 0 )
{
//
// static member init
//
casClient::loadProtoJumpTable();
}
//
// casClient::loadProtoJumpTable()
//
void casClient::loadProtoJumpTable()
{
//
// Load the static protocol handler tables
//
if ( casClient::msgHandlersInit ) {
return;
}
//
// Request Protocol Jump Table
// (use of & here is more portable)
//
casClient::msgHandlers[CA_PROTO_VERSION] =
&casClient::versionAction;
casClient::msgHandlers[CA_PROTO_EVENT_ADD] =
&casClient::eventAddAction;
casClient::msgHandlers[CA_PROTO_EVENT_CANCEL] =
&casClient::eventCancelAction;
casClient::msgHandlers[CA_PROTO_READ] =
&casClient::readAction;
casClient::msgHandlers[CA_PROTO_WRITE] =
&casClient::writeAction;
casClient::msgHandlers[CA_PROTO_SNAPSHOT] =
&casClient::uknownMessageAction;
casClient::msgHandlers[CA_PROTO_SEARCH] =
&casClient::searchAction;
casClient::msgHandlers[CA_PROTO_BUILD] =
&casClient::ignoreMsgAction;
casClient::msgHandlers[CA_PROTO_EVENTS_OFF] =
&casClient::eventsOffAction;
casClient::msgHandlers[CA_PROTO_EVENTS_ON] =
&casClient::eventsOnAction;
casClient::msgHandlers[CA_PROTO_READ_SYNC] =
&casClient::readSyncAction;
casClient::msgHandlers[CA_PROTO_ERROR] =
&casClient::uknownMessageAction;
casClient::msgHandlers[CA_PROTO_CLEAR_CHANNEL] =
&casClient::clearChannelAction;
casClient::msgHandlers[CA_PROTO_RSRV_IS_UP] =
&casClient::uknownMessageAction;
casClient::msgHandlers[CA_PROTO_NOT_FOUND] =
&casClient::uknownMessageAction;
casClient::msgHandlers[CA_PROTO_READ_NOTIFY] =
&casClient::readNotifyAction;
casClient::msgHandlers[CA_PROTO_READ_BUILD] =
&casClient::ignoreMsgAction;
casClient::msgHandlers[REPEATER_CONFIRM] =
&casClient::uknownMessageAction;
casClient::msgHandlers[CA_PROTO_CLAIM_CIU] =
&casClient::claimChannelAction;
casClient::msgHandlers[CA_PROTO_WRITE_NOTIFY] =
&casClient::writeNotifyAction;
casClient::msgHandlers[CA_PROTO_CLIENT_NAME] =
&casClient::clientNameAction;
casClient::msgHandlers[CA_PROTO_HOST_NAME] =
&casClient::hostNameAction;
casClient::msgHandlers[CA_PROTO_ACCESS_RIGHTS] =
&casClient::uknownMessageAction;
casClient::msgHandlers[CA_PROTO_ECHO] =
&casClient::echoAction;
casClient::msgHandlers[REPEATER_REGISTER] =
&casClient::uknownMessageAction;
casClient::msgHandlers[CA_PROTO_CLAIM_CIU_FAILED] =
&casClient::uknownMessageAction;
casClient::msgHandlersInit = true;
}
//
// casClient::~casClient ()
//
casClient::~casClient ()
{
}
//
// casClient::show()
//
void casClient::show (unsigned level) const
{
printf ( "casClient at %p\n",
static_cast <const void *> ( this ) );
this->casCoreClient::show (level);
this->in.show (level);
this->out.show (level);
}
//
// casClient::processMsg ()
//
caStatus casClient::processMsg ()
{
int status = S_cas_success;
try {
// drain message that does not fit
if ( this->incommingBytesToDrain ) {
unsigned bytesLeft = this->in.bytesPresent();
if ( bytesLeft < this->incommingBytesToDrain ) {
this->in.removeMsg ( bytesLeft );
this->incommingBytesToDrain -= bytesLeft;
return S_cas_success;
}
else {
this->in.removeMsg ( this->incommingBytesToDrain );
this->incommingBytesToDrain = 0u;
}
}
//
// process any messages in the in buffer
//
unsigned bytesLeft;
while ( ( bytesLeft = this->in.bytesPresent() ) ) {
caHdrLargeArray msgTmp;
unsigned msgSize;
ca_uint32_t hdrSize;
char * rawMP;
{
//
// copy as raw bytes in order to avoid
// alignment problems
//
caHdr smallHdr;
if ( bytesLeft < sizeof ( smallHdr ) ) {
break;
}
rawMP = this->in.msgPtr ();
memcpy ( & smallHdr, rawMP, sizeof ( smallHdr ) );
ca_uint32_t payloadSize = epicsNTOH16 ( smallHdr.m_postsize );
ca_uint32_t nElem = epicsNTOH16 ( smallHdr.m_count );
if ( payloadSize != 0xffff && nElem != 0xffff ) {
hdrSize = sizeof ( smallHdr );
}
else {
ca_uint32_t LWA[2];
hdrSize = sizeof ( smallHdr ) + sizeof ( LWA );
if ( bytesLeft < hdrSize ) {
break;
}
//
// copy as raw bytes in order to avoid
// alignment problems
//
memcpy ( LWA, rawMP + sizeof ( caHdr ), sizeof( LWA ) );
payloadSize = epicsNTOH32 ( LWA[0] );
nElem = epicsNTOH32 ( LWA[1] );
}
msgTmp.m_cmmd = epicsNTOH16 ( smallHdr.m_cmmd );
msgTmp.m_postsize = payloadSize;
msgTmp.m_dataType = epicsNTOH16 ( smallHdr.m_dataType );
msgTmp.m_count = nElem;
msgTmp.m_cid = epicsNTOH32 ( smallHdr.m_cid );
msgTmp.m_available = epicsNTOH32 ( smallHdr.m_available );
msgSize = hdrSize + payloadSize;
if ( bytesLeft < msgSize ) {
if ( msgSize > this->in.bufferSize() ) {
this->in.expandBuffer ();
// msg to large - set up message drain
if ( msgSize > this->in.bufferSize() ) {
this->dumpMsg ( & msgTmp, 0,
"The client requested transfer is greater than available "
"memory in server or EPICS_CA_MAX_ARRAY_BYTES\n" );
status = this->sendErr ( & msgTmp, invalidResID, ECA_TOLARGE,
"client's request didnt fit within the CA server's message buffer" );
this->in.removeMsg ( bytesLeft );
this->incommingBytesToDrain = msgSize - bytesLeft;
}
}
break;
}
this->ctx.setMsg ( msgTmp, rawMP + hdrSize );
if ( this->getCAS().getDebugLevel() > 2u ) {
this->dumpMsg ( & msgTmp, rawMP + hdrSize, 0 );
}
}
//
// Reset the context to the default
// (guarantees that previous message does not get mixed
// up with the current message)
//
this->ctx.setChannel ( NULL );
this->ctx.setPV ( NULL );
//
// Call protocol stub
//
pCASMsgHandler pHandler;
if ( msgTmp.m_cmmd < NELEMENTS ( casClient::msgHandlers ) ) {
pHandler = this->casClient::msgHandlers[msgTmp.m_cmmd];
}
else {
pHandler = & casClient::uknownMessageAction;
}
status = ( this->*pHandler ) ();
if ( status ) {
break;
}
this->in.removeMsg ( msgSize );
}
}
catch ( std::bad_alloc & ) {
status = this->sendErr (
this->ctx.getMsg(), invalidResID, ECA_ALLOCMEM,
"inablility to allocate memory in "
"the server disconnected client" );
status = S_cas_noMemory;
}
catch ( std::exception & except ) {
status = this->sendErr (
this->ctx.getMsg(), invalidResID, ECA_INTERNAL,
"C++ exception \"%s\" in server "
"diconnected client",
except.what () );
status = S_cas_internal;
}
catch (...) {
status = this->sendErr (
this->ctx.getMsg(), invalidResID, ECA_INTERNAL,
"unexpected C++ exception in server "
"diconnected client" );
status = S_cas_internal;
}
return status;
}
/*
* casClient::ignoreMsgAction()
*/
caStatus casClient::ignoreMsgAction ()
{
return S_cas_success;
}
//
// what gets called if the derived class does not supply a
// message handler for the message type (and it isnt a generic
// message)
//
caStatus casClient::eventAddAction ()
{return this->uknownMessageAction ();}
caStatus casClient::eventCancelAction ()
{return this->uknownMessageAction ();}
caStatus casClient::readAction ()
{return this->uknownMessageAction ();}
caStatus casClient::readNotifyAction ()
{return this->uknownMessageAction ();}
caStatus casClient::writeAction ()
{return this->uknownMessageAction ();}
caStatus casClient::searchAction ()
{return this->uknownMessageAction ();}
caStatus casClient::eventsOffAction ()
{return this->uknownMessageAction ();}
caStatus casClient::eventsOnAction ()
{return this->uknownMessageAction ();}
caStatus casClient::readSyncAction ()
{return this->uknownMessageAction ();}
caStatus casClient::clearChannelAction ()
{return this->uknownMessageAction ();}
caStatus casClient::claimChannelAction ()
{return this->uknownMessageAction ();}
caStatus casClient::writeNotifyAction ()
{return this->uknownMessageAction ();}
caStatus casClient::clientNameAction ()
{return this->uknownMessageAction ();}
caStatus casClient::hostNameAction ()
{return this->uknownMessageAction ();}
//
// echoAction()
//
caStatus casClient::echoAction ()
{
const caHdrLargeArray * mp = this->ctx.getMsg();
const void * dp = this->ctx.getData();
void * pPayloadOut;
epicsGuard < epicsMutex > guard ( this->mutex );
caStatus status = this->out.copyInHeader ( mp->m_cmmd, mp->m_postsize,
mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available,
& pPayloadOut );
if ( ! status ) {
memcpy ( pPayloadOut, dp, mp->m_postsize );
this->out.commitMsg ();
}
return S_cas_success;
}
/*
* casClient::versionAction()
*/
caStatus casClient::versionAction ()
{
return S_cas_success;
}
// send minor protocol revision to the client
void casClient::sendVersion ()
{
epicsGuard < epicsMutex > guard ( this->mutex );
caStatus status = this->out.copyInHeader ( CA_PROTO_VERSION, 0,
0, CA_MINOR_PROTOCOL_REVISION, 0, 0, 0 );
if ( ! status ) {
this->out.commitMsg ();
}
}
//
// casClient::sendErr()
//
caStatus casClient::sendErr ( const caHdrLargeArray *curp,
ca_uint32_t cid, const int reportedStatus, const char *pformat, ... )
{
unsigned stringSize;
char msgBuf[1024]; /* allocate plenty of space for the message string */
if ( pformat ) {
va_list args;
va_start ( args, pformat );
int status = vsprintf (msgBuf, pformat, args);
if ( status < 0 ) {
errPrintf (S_cas_internal, __FILE__, __LINE__,
"bad sendErr(%s)", pformat);
stringSize = 0u;
}
else {
stringSize = 1u + (unsigned) status;
}
}
else {
stringSize = 0u;
}
unsigned hdrSize = sizeof ( caHdr );
if ( ( curp->m_postsize >= 0xffff || curp->m_count >= 0xffff ) &&
CA_V49( this->minor_version_number ) ) {
hdrSize += 2 * sizeof ( ca_uint32_t );
}
caHdr * pReqOut;
epicsGuard < epicsMutex > guard ( this->mutex );
caStatus status = this->out.copyInHeader ( CA_PROTO_ERROR,
hdrSize + stringSize, 0, 0, cid, reportedStatus,
reinterpret_cast <void **> ( & pReqOut ) );
if ( ! status ) {
char * pMsgString;
/*
* copy back the request protocol
* (in network byte order)
*/
if ( ( curp->m_postsize >= 0xffff || curp->m_count >= 0xffff ) &&
CA_V49( this->minor_version_number ) ) {
ca_uint32_t *pLW = ( ca_uint32_t * ) ( pReqOut + 1 );
pReqOut->m_cmmd = htons ( curp->m_cmmd );
pReqOut->m_postsize = htons ( 0xffff );
pReqOut->m_dataType = htons ( curp->m_dataType );
pReqOut->m_count = htons ( 0u );
pReqOut->m_cid = htonl ( curp->m_cid );
pReqOut->m_available = htonl ( curp->m_available );
pLW[0] = htonl ( curp->m_postsize );
pLW[1] = htonl ( curp->m_count );
pMsgString = ( char * ) ( pLW + 2 );
}
else {
pReqOut->m_cmmd = htons (curp->m_cmmd);
pReqOut->m_postsize = htons ( ( (ca_uint16_t) curp->m_postsize ) );
pReqOut->m_dataType = htons (curp->m_dataType);
pReqOut->m_count = htons ( ( (ca_uint16_t) curp->m_count ) );
pReqOut->m_cid = htonl (curp->m_cid);
pReqOut->m_available = htonl (curp->m_available);
pMsgString = ( char * ) ( pReqOut + 1 );
}
/*
* add their context string into the protocol
*/
memcpy ( pMsgString, msgBuf, stringSize );
this->out.commitMsg ();
}
return S_cas_success;
}
/*
* casClient::sendErrWithEpicsStatus()
*
* same as sendErr() except that we convert epicsStatus
* to a string and send that additional detail
*/
caStatus casClient::sendErrWithEpicsStatus ( const caHdrLargeArray * pMsg,
ca_uint32_t cid, caStatus epicsStatus, caStatus clientStatus )
{
char buf[0x1ff];
errSymLookup ( epicsStatus, buf, sizeof(buf) );
return this->sendErr ( pMsg, cid, clientStatus, buf );
}
/*
* casClient::logBadIdWithFileAndLineno()
*/
caStatus casClient::logBadIdWithFileAndLineno ( const caHdrLargeArray * mp,
const void * dp, const int cacStatus, const char * pFileName,
const unsigned lineno, const unsigned idIn
)
{
int status;
if ( pFileName) {
this-> dumpMsg ( mp, dp,
"bad resource id in \"%s\" at line %d\n",
pFileName, lineno );
}
else {
this->dumpMsg ( mp, dp,
"bad resource id\n" );
}
status = this->sendErr (
mp, invalidResID, cacStatus, "Bad Resource ID=%u detected at %s.%d",
idIn, pFileName, lineno);
return status;
}
//
// casClient::dumpMsg()
//
// Debug aid - print the header part of a message.
//
// dp arg allowed to be null
//
//
void casClient::dumpMsg ( const caHdrLargeArray *mp,
const void *dp, const char *pFormat, ... )
{
va_list theArgs;
if ( pFormat ) {
va_start ( theArgs, pFormat );
errlogPrintf ( "CAS: " );
errlogVprintf ( pFormat, theArgs );
va_end ( theArgs );
}
char pName[64u];
this->hostName ( pName, sizeof ( pName ) );
char pUserName[32];
this->userName ( pUserName, sizeof ( pUserName) );
char pHostName[32];
this->hostName ( pHostName, sizeof ( pHostName) );
fprintf ( stderr,
"CAS Request: %s on %s at %s: cmd=%u cid=%u typ=%u cnt=%u psz=%u avail=%x\n",
pUserName,
pHostName,
pName,
mp->m_cmmd,
mp->m_cid,
mp->m_dataType,
mp->m_count,
mp->m_postsize,
mp->m_available);
if ( mp->m_cmmd == CA_PROTO_WRITE && mp->m_dataType == DBR_STRING && dp ) {
errlogPrintf("CAS: The string written: %s \n", (char *)dp);
}
}

View File

@@ -1,115 +0,0 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#ifndef casClienth
#define casClienth
#ifdef epicsExportSharedSymbols
# define epicsExportSharedSymbols_casClienth
# undef epicsExportSharedSymbols
#endif
#include "epicsTime.h"
#ifdef epicsExportSharedSymbols_casClienth
# define epicsExportSharedSymbols
# include "shareLib.h"
#endif
#include "casCoreClient.h"
#include "inBuf.h"
#include "outBuf.h"
//
// casClient
//
// this class exists so that udp and tcp can share certain
// protocol stubs but is this worth the extra complexity ????
//
class casClient : public casCoreClient, public outBufClient,
public inBufClient {
public:
casClient ( caServerI &, clientBufMemoryManager &, bufSizeT ioMinSizeIn );
virtual ~casClient ();
virtual void show ( unsigned level ) const;
caStatus sendErr ( const caHdrLargeArray *, ca_uint32_t cid,
const int reportedStatus, const char *pFormat, ... );
ca_uint16_t protocolRevision() const {return this->minor_version_number;}
virtual void hostName ( char *pBuf, unsigned bufSize ) const = 0;
void sendVersion ();
protected:
inBuf in;
outBuf out;
ca_uint16_t minor_version_number;
unsigned incommingBytesToDrain;
epicsTime lastSendTS;
epicsTime lastRecvTS;
caStatus sendErrWithEpicsStatus ( const caHdrLargeArray *pMsg,
ca_uint32_t cid, caStatus epicsStatus, caStatus clientStatus );
# define logBadId(MP, DP, CACSTAT, RESID) \
this->logBadIdWithFileAndLineno(MP, DP, CACSTAT, __FILE__, __LINE__, RESID)
caStatus logBadIdWithFileAndLineno ( const caHdrLargeArray *mp,
const void *dp, const int cacStat, const char *pFileName,
const unsigned lineno, const unsigned resId );
caStatus processMsg();
//
// dump message to stderr
//
void dumpMsg ( const caHdrLargeArray *mp, const void *dp,
const char *pFormat, ... );
private:
typedef caStatus ( casClient::*pCASMsgHandler ) ();
//
// one function for each CA request type
//
virtual caStatus uknownMessageAction () = 0;
caStatus ignoreMsgAction ();
virtual caStatus versionAction ();
virtual caStatus eventAddAction ();
virtual caStatus eventCancelAction ();
virtual caStatus readAction ();
virtual caStatus readNotifyAction ();
virtual caStatus writeAction ();
virtual caStatus searchAction ();
virtual caStatus eventsOffAction ();
virtual caStatus eventsOnAction ();
virtual caStatus readSyncAction ();
virtual caStatus clearChannelAction ();
virtual caStatus claimChannelAction ();
virtual caStatus writeNotifyAction ();
virtual caStatus clientNameAction ();
virtual caStatus hostNameAction ();
virtual caStatus echoAction ();
virtual void userName ( char * pBuf, unsigned bufSize ) const = 0;
//
// static members
//
static void loadProtoJumpTable();
static pCASMsgHandler msgHandlers[CA_PROTO_LAST_CMMD+1u];
static bool msgHandlersInit;
casClient ( const casClient & );
casClient & operator = ( const casClient & );
};
#endif // casClienth

View File

@@ -38,12 +38,15 @@ casCoreClient::~casCoreClient()
if ( this->ctx.getServer()->getDebugLevel() > 0u ) {
errlogPrintf ( "CAS: Connection Terminated\n" );
}
}
caStatus casCoreClient::disconnectChan ( caResId )
{
printf ("Disconnect Chan issued for inappropriate client type?\n");
return S_cas_success;
// this will clean up the event queue because all
// channels have been deleted and any events left on
// the queue are there because they are going to
// execute a subscription delete
{
epicsGuard < casClientMutex > guard ( this->mutex );
this->eventSys.process ( guard );
}
}
void casCoreClient::show ( unsigned level ) const
@@ -60,54 +63,78 @@ void casCoreClient::show ( unsigned level ) const
// asynchronous completion
//
caStatus casCoreClient::asyncSearchResponse (
const caNetAddr &, const caHdrLargeArray &, const pvExistReturn &,
ca_uint16_t, ca_uint32_t )
epicsGuard < casClientMutex > &, const caNetAddr &,
const caHdrLargeArray &, const pvExistReturn &,
ca_uint16_t, ca_uint32_t )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::createChanResponse ( const caHdrLargeArray &, const pvAttachReturn & )
caStatus casCoreClient::createChanResponse (
epicsGuard < casClientMutex > &,
const caHdrLargeArray &, const pvAttachReturn & )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::readResponse ( casChannelI *, const caHdrLargeArray &,
const gdd &, const caStatus )
caStatus casCoreClient::readResponse (
epicsGuard < casClientMutex > &, casChannelI *,
const caHdrLargeArray &, const gdd &, const caStatus )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::readNotifyResponse ( casChannelI *, const caHdrLargeArray &,
const gdd &, const caStatus )
caStatus casCoreClient::readNotifyResponse (
epicsGuard < casClientMutex > &, casChannelI *,
const caHdrLargeArray &, const gdd &, const caStatus )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::writeResponse ( casChannelI &,
const caHdrLargeArray &, const caStatus )
caStatus casCoreClient::writeResponse (
epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const caStatus )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::writeNotifyResponse ( casChannelI &,
const caHdrLargeArray &, const caStatus )
caStatus casCoreClient::writeNotifyResponse (
epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const caStatus )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::monitorResponse ( casChannelI &, const caHdrLargeArray &,
const gdd &, const caStatus )
caStatus casCoreClient::monitorResponse (
epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const gdd &, const caStatus )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::accessRightsResponse ( casChannelI * )
caStatus casCoreClient::accessRightsResponse (
epicsGuard < casClientMutex > &, casChannelI * )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::enumPostponedCreateChanResponse ( casChannelI &,
caStatus casCoreClient::enumPostponedCreateChanResponse (
epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, unsigned )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::channelCreateFailedResp ( const caHdrLargeArray &,
caStatus casCoreClient::channelCreateFailedResp (
epicsGuard < casClientMutex > &, const caHdrLargeArray &,
const caStatus )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::channelDestroyNotify (
epicsGuard < casClientMutex > &,
casChannelI &, bool )
{
assert ( 0 );
return S_casApp_noSupport;
}
void casCoreClient::casChannelDestroyNotify (
casChannelI &, bool immediatedSestroyNeeded )
{
assert ( 0 );
}
caNetAddr casCoreClient::fetchLastRecvAddr () const
{
@@ -131,8 +158,8 @@ void casCoreClient::eventSignal()
{
}
caStatus casCoreClient::casMonitorCallBack ( casMonitor &,
const gdd & )
caStatus casCoreClient::casMonitorCallBack (
epicsGuard < casClientMutex > &, casMonitor &, const gdd & )
{
return S_cas_internal;
}

View File

@@ -25,6 +25,9 @@
#include "casEventSys.h"
#include "casCtx.h"
class casClientMutex : public epicsMutex {
};
//
// casCoreClient
// (this will eventually support direct communication
@@ -35,7 +38,6 @@ class casCoreClient : public ioBlocked,
public:
casCoreClient ( caServerI & serverInternal );
virtual ~casCoreClient ();
virtual caStatus disconnectChan( caResId id );
virtual void show ( unsigned level ) const;
void installAsynchIO ( class casAsyncPVAttachIOI & io );
@@ -50,30 +52,44 @@ public:
// asynchronous completion
//
virtual caStatus asyncSearchResponse (
const caNetAddr & outAddr,
epicsGuard < casClientMutex > &, const caNetAddr & outAddr,
const caHdrLargeArray &, const pvExistReturn &,
ca_uint16_t protocolRevision, ca_uint32_t sequenceNumber );
virtual caStatus createChanResponse (
epicsGuard < casClientMutex > &,
const caHdrLargeArray &, const pvAttachReturn &);
virtual caStatus readResponse (
epicsGuard < casClientMutex > &,
casChannelI *, const caHdrLargeArray &,
const gdd &, const caStatus );
virtual caStatus readNotifyResponse (
epicsGuard < casClientMutex > &,
casChannelI *, const caHdrLargeArray &,
const gdd &, const caStatus );
virtual caStatus writeResponse ( casChannelI &,
virtual caStatus writeResponse (
epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const caStatus );
virtual caStatus writeNotifyResponse ( casChannelI &,
virtual caStatus writeNotifyResponse (
epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const caStatus );
virtual caStatus monitorResponse ( casChannelI &,
virtual caStatus monitorResponse (
epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const gdd &,
const caStatus status );
virtual caStatus accessRightsResponse ( casChannelI * );
virtual caStatus accessRightsResponse (
epicsGuard < casClientMutex > &, casChannelI * );
virtual caStatus enumPostponedCreateChanResponse (
epicsGuard < casClientMutex > &,
casChannelI &, const caHdrLargeArray &,
unsigned dbrType );
virtual caStatus channelCreateFailedResp (
epicsGuard < casClientMutex > &,
const caHdrLargeArray &, const caStatus createStatus );
virtual caStatus channelDestroyNotify (
epicsGuard < casClientMutex > &,
casChannelI &, bool uninstallNeeded );
virtual void casChannelDestroyNotify (
casChannelI & chan, bool immediateDestroyNeeded );
virtual ca_uint16_t protocolRevision () const = 0;
@@ -86,8 +102,6 @@ public:
casEventSys::processStatus eventSysProcess();
void addToEventQueue ( casMonEvent & );
void removeFromEventQueue ( casMonEvent & );
caStatus addToEventQueue ( casAsyncIOI &,
bool & onTheQueue, bool & posted );
void removeFromEventQueue ( casAsyncIOI &,
@@ -96,8 +110,9 @@ public:
casChannelI &, bool & inTheEventQueue );
void enableEvents ();
void disableEvents ();
caStatus casMonitorCallBack ( casMonitor &,
const gdd & );
caStatus casMonitorCallBack (
epicsGuard < casClientMutex > &,
casMonitor &, const gdd & );
void postEvent ( tsDLList <casMonitor > &,
const casEventMask &select, const gdd &event );
@@ -110,10 +125,10 @@ public:
void destroyMonitor ( casMonitor & mon );
void casMonEventDestroy (
casMonEvent &, epicsGuard < epicsMutex > & );
casMonEvent &, epicsGuard < evSysMutex > & );
protected:
mutable epicsMutex mutex;
mutable casClientMutex mutex;
casEventSys eventSys;
casCtx ctx;
bool userStartedAsyncIO;
@@ -142,7 +157,8 @@ inline bool casCoreClient::okToStartAsynchIO ()
return false;
}
inline void casCoreClient::postEvent ( tsDLList < casMonitor > & monitorList,
inline void casCoreClient::postEvent (
tsDLList < casMonitor > & monitorList,
const casEventMask & select, const gdd & event )
{
bool signalNeeded =
@@ -150,12 +166,12 @@ inline void casCoreClient::postEvent ( tsDLList < casMonitor > & monitorList,
if ( signalNeeded ) {
this->eventSignal ();
}
}
inline casEventSys::processStatus casCoreClient::eventSysProcess ()
{
return this->eventSys.process ();
epicsGuard < casClientMutex > guard ( this->mutex );
return this->eventSys.process ( guard );
}
inline caStatus casCoreClient::addToEventQueue ( casAsyncIOI & io,
@@ -176,16 +192,6 @@ inline void casCoreClient::removeFromEventQueue (
this->eventSys.removeFromEventQueue ( io, onTheEventQueue );
}
inline void casCoreClient::addToEventQueue ( casMonEvent & ev )
{
this->eventSys.addToEventQueue ( ev );
}
inline void casCoreClient::removeFromEventQueue ( casMonEvent & ev )
{
this->eventSys.removeFromEventQueue ( ev );
}
inline void casCoreClient::addToEventQueue (
casChannelI & ev, bool & inTheEventQueue )
{
@@ -218,7 +224,7 @@ inline void casCoreClient::setDestroyPending ()
}
inline void casCoreClient::casMonEventDestroy (
casMonEvent & ev, epicsGuard < epicsMutex > & guard )
casMonEvent & ev, epicsGuard < evSysMutex > & guard )
{
this->eventSys.casMonEventDestroy ( ev, guard );
}

View File

@@ -1,70 +0,0 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef casCoreClientIL_h
#define casCoreClientIL_h
#include "caServerIIL.h" // caServerI in line func
#include "casCtxIL.h" // casEventSys in line func
inline void casCoreClient::lock ()
{
this->mutex.lock ();
}
inline void casCoreClient::unlock ()
{
this->mutex.unlock ();
}
//
// casCoreClient::getCAS()
//
inline caServerI &casCoreClient::getCAS() const
{
return *this->ctx.getServer();
}
inline bool casCoreClient::okToStartAsynchIO ()
{
if ( ! this->asyncIOFlag ) {
this->asyncIOFlag = true;
return true;
}
return false;
}
inline casMonEvent & casCoreClient::casMonEventFactory ( casMonitor & monitor,
const gdd & pNewValue )
{
return this->ctx.getServer()->casMonEventFactory ( monitor, pNewValue );
}
inline void casCoreClient::casMonEventDestroy ( casMonEvent & event )
{
this->ctx.getServer()->casMonEventDestroy ( event );
}
inline casMonitor * casCoreClient::lookupMonitor ( const caResId & idIn )
{
return this->ctx.getServer()->lookupMonitor ( idIn );
}
#endif // casCoreClientIL_h

View File

@@ -23,12 +23,52 @@
#include "casDGClient.h"
#include "osiPoolStatus.h" // osi pool monitoring functions
casDGClient::pCASMsgHandler const casDGClient::msgHandlers[] =
{
& casDGClient::versionAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::searchAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::echoAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction,
& casDGClient::uknownMessageAction
};
//
// casDGClient::casDGClient()
//
casDGClient::casDGClient ( caServerI & serverIn, clientBufMemoryManager & mgrIn ) :
casClient ( serverIn, mgrIn, MAX_UDP_RECV + sizeof ( cadg ) ),
seqNoOfReq ( 0 )
casCoreClient ( serverIn ),
in ( *this, mgrIn, MAX_UDP_RECV + sizeof ( cadg ) ),
out ( *this, mgrIn ),
seqNoOfReq ( 0 ),
minor_version_number ( 0 )
{
}
@@ -59,8 +99,10 @@ void casDGClient::show (unsigned level) const
char buf[64];
this->hostName (buf, sizeof(buf));
printf ("Client Host=%s\n", buf);
this->casCoreClient::show ( level - 1u );
this->in.show ( level - 1u );
this->out.show ( level - 1u );
}
this->casClient::show (level);
}
//
@@ -70,7 +112,10 @@ caStatus casDGClient::uknownMessageAction ()
{
const caHdrLargeArray * mp = this->ctx.getMsg();
this->dumpMsg ( mp, this->ctx.getData(),
char pHostName[64u];
this->lastRecvAddr.stringConvert ( pHostName, sizeof ( pHostName ) );
caServerI::dumpMsg ( pHostName, "?", mp, this->ctx.getData(),
"bad request code=%u in DG\n", mp->m_cmmd );
return S_cas_internal;
@@ -89,13 +134,17 @@ caStatus casDGClient::searchAction()
// check the sanity of the message
//
if ( mp->m_postsize <= 1 ) {
this->dumpMsg ( mp, this->ctx.getData(),
char pHostName[64u];
this->lastRecvAddr.stringConvert ( pHostName, sizeof ( pHostName ) );
caServerI::dumpMsg ( pHostName, "?", mp, this->ctx.getData(),
"empty PV name extension in UDP search request?\n" );
return S_cas_success;
}
if ( pChanName[0] == '\0' ) {
this->dumpMsg ( mp, this->ctx.getData(),
char pHostName[64u];
this->lastRecvAddr.stringConvert ( pHostName, sizeof ( pHostName ) );
caServerI::dumpMsg ( pHostName, "?", mp, this->ctx.getData(),
"zero length PV name in UDP search request?\n" );
return S_cas_success;
}
@@ -105,7 +154,9 @@ caStatus casDGClient::searchAction()
// of the client library might not be setting the pad bytes to nill)
for ( unsigned i = mp->m_postsize-1; pChanName[i] != '\0'; i-- ) {
if ( i <= 1 ) {
this->dumpMsg ( mp, this->ctx.getData(),
char pHostName[64u];
this->lastRecvAddr.stringConvert ( pHostName, sizeof ( pHostName ) );
caServerI::dumpMsg ( pHostName, "?", mp, this->ctx.getData(),
"unterminated PV name in UDP search request?\n" );
return S_cas_success;
}
@@ -198,11 +249,11 @@ caStatus casDGClient::searchResponse ( const caHdrLargeArray & msg,
// to a search request. This is no longer supported.
//
if ( !CA_V44(msg.m_count) ) {
if (this->getCAS().getDebugLevel()>0u) {
char pName[64u];
this->hostName (pName, sizeof (pName));
printf("client \"%s\" using EPICS R3.11 CA connect protocol was ignored\n", pName);
}
errlogPrintf (
"client \"%s\" using EPICS R3.11 CA connect protocol was ignored\n",
pName);
//
// old connect protocol was dropped when the
// new API was added to the server (they must
@@ -442,7 +493,8 @@ inBufClient::fillCondition casDGClient::xRecv (char *pBufIn, bufSizeT nBytesToRe
// this results in many small UDP frames which unfortunately
// isnt particularly efficient
//
caStatus casDGClient::asyncSearchResponse ( const caNetAddr & outAddr,
caStatus casDGClient::asyncSearchResponse (
epicsGuard < casClientMutex > &, const caNetAddr & outAddr,
const caHdrLargeArray & msg, const pvExistReturn & retVal,
ca_uint16_t protocolRevision, ca_uint32_t sequenceNumber )
{
@@ -450,8 +502,6 @@ caStatus casDGClient::asyncSearchResponse ( const caNetAddr & outAddr,
return S_cas_success;
}
epicsGuard < epicsMutex > guard ( this->mutex );
void * pRaw;
const outBufCtx outctx = this->out.pushCtx
( sizeof(cadg), MAX_UDP_SEND, pRaw );
@@ -636,11 +686,270 @@ void casDGClient::hostName ( char *pBufIn, unsigned bufSizeIn ) const
this->lastRecvAddr.stringConvert ( pBufIn, bufSizeIn );
}
void casDGClient::userName ( char * pBuf, unsigned bufSizeIn ) const
// send minor protocol revision to the client
void casDGClient::sendVersion ()
{
if ( bufSizeIn ) {
strncpy ( pBuf, "?", bufSizeIn );
pBuf[bufSizeIn - 1] = '\0';
epicsGuard < epicsMutex > guard ( this->mutex );
caStatus status = this->out.copyInHeader ( CA_PROTO_VERSION, 0,
0, CA_MINOR_PROTOCOL_REVISION, 0, 0, 0 );
if ( ! status ) {
this->out.commitMsg ();
}
}
bool casDGClient::inBufFull () const
{
epicsGuard < epicsMutex > guard ( this->mutex );
return this->in.full ();
}
void casDGClient::inBufFill ( inBufClient::fillParameter parm )
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->in.fill ( parm );
}
bufSizeT casDGClient::inBufBytesAvailable () const
{
epicsGuard < epicsMutex > guard ( this->mutex );
return this->in.bytesAvailable ();
}
bufSizeT casDGClient::outBufBytesPresent () const
{
epicsGuard < epicsMutex > guard ( this->mutex );
return this->out.bytesPresent ();
}
outBufClient::flushCondition casDGClient::flush ()
{
epicsGuard < epicsMutex > guard ( this->mutex );
return this->out.flush ();
}
//
// casDGClient::processMsg ()
// process any messages in the in buffer
//
caStatus casDGClient::processMsg ()
{
int status = S_cas_success;
try {
unsigned bytesLeft;
while ( ( bytesLeft = this->in.bytesPresent() ) ) {
caHdrLargeArray msgTmp;
unsigned msgSize;
ca_uint32_t hdrSize;
char * rawMP;
{
//
// copy as raw bytes in order to avoid
// alignment problems
//
caHdr smallHdr;
if ( bytesLeft < sizeof ( smallHdr ) ) {
break;
}
rawMP = this->in.msgPtr ();
memcpy ( & smallHdr, rawMP, sizeof ( smallHdr ) );
ca_uint32_t payloadSize = epicsNTOH16 ( smallHdr.m_postsize );
ca_uint32_t nElem = epicsNTOH16 ( smallHdr.m_count );
if ( payloadSize != 0xffff && nElem != 0xffff ) {
hdrSize = sizeof ( smallHdr );
}
else {
ca_uint32_t LWA[2];
hdrSize = sizeof ( smallHdr ) + sizeof ( LWA );
if ( bytesLeft < hdrSize ) {
break;
}
//
// copy as raw bytes in order to avoid
// alignment problems
//
memcpy ( LWA, rawMP + sizeof ( caHdr ), sizeof( LWA ) );
payloadSize = epicsNTOH32 ( LWA[0] );
nElem = epicsNTOH32 ( LWA[1] );
}
msgTmp.m_cmmd = epicsNTOH16 ( smallHdr.m_cmmd );
msgTmp.m_postsize = payloadSize;
msgTmp.m_dataType = epicsNTOH16 ( smallHdr.m_dataType );
msgTmp.m_count = nElem;
msgTmp.m_cid = epicsNTOH32 ( smallHdr.m_cid );
msgTmp.m_available = epicsNTOH32 ( smallHdr.m_available );
msgSize = hdrSize + payloadSize;
if ( bytesLeft < msgSize ) {
if ( msgSize > this->in.bufferSize() ) {
status = this->sendErr ( & msgTmp, invalidResID, ECA_TOLARGE,
"client's request didnt fit within the CA server's message buffer" );
this->in.removeMsg ( bytesLeft );
}
break;
}
this->ctx.setMsg ( msgTmp, rawMP + hdrSize );
if ( this->getCAS().getDebugLevel() > 2u ) {
char pHostName[64u];
this->lastRecvAddr.stringConvert ( pHostName, sizeof ( pHostName ) );
caServerI::dumpMsg ( pHostName, "?",
& msgTmp, rawMP + hdrSize, 0 );
}
}
//
// Reset the context to the default
// (guarantees that previous message does not get mixed
// up with the current message)
//
this->ctx.setChannel ( NULL );
this->ctx.setPV ( NULL );
//
// Call protocol stub
//
casDGClient::pCASMsgHandler pHandler;
if ( msgTmp.m_cmmd < NELEMENTS ( casDGClient::msgHandlers ) ) {
pHandler = this->casDGClient::msgHandlers[msgTmp.m_cmmd];
}
else {
pHandler = & casDGClient::uknownMessageAction;
}
status = ( this->*pHandler ) ();
if ( status ) {
break;
}
this->in.removeMsg ( msgSize );
}
}
catch ( std::bad_alloc & ) {
status = this->sendErr (
this->ctx.getMsg(), invalidResID, ECA_ALLOCMEM,
"inablility to allocate memory in "
"the server disconnected client" );
status = S_cas_noMemory;
}
catch ( std::exception & except ) {
status = this->sendErr (
this->ctx.getMsg(), invalidResID, ECA_INTERNAL,
"C++ exception \"%s\" in server "
"diconnected client",
except.what () );
status = S_cas_internal;
}
catch (...) {
status = this->sendErr (
this->ctx.getMsg(), invalidResID, ECA_INTERNAL,
"unexpected C++ exception in server "
"diconnected client" );
status = S_cas_internal;
}
return status;
}
//
// casDGClient::sendErr()
//
caStatus casDGClient::sendErr ( const caHdrLargeArray *curp,
ca_uint32_t cid, const int reportedStatus, const char *pformat, ... )
{
unsigned stringSize;
char msgBuf[1024]; /* allocate plenty of space for the message string */
if ( pformat ) {
va_list args;
va_start ( args, pformat );
int status = vsprintf ( msgBuf, pformat, args );
if ( status < 0 ) {
errPrintf (S_cas_internal, __FILE__, __LINE__,
"bad sendErr(%s)", pformat);
stringSize = 0u;
}
else {
stringSize = 1u + (unsigned) status;
}
}
else {
stringSize = 0u;
}
unsigned hdrSize = sizeof ( caHdr );
if ( ( curp->m_postsize >= 0xffff || curp->m_count >= 0xffff ) &&
CA_V49( this->minor_version_number ) ) {
hdrSize += 2 * sizeof ( ca_uint32_t );
}
caHdr * pReqOut;
epicsGuard < epicsMutex > guard ( this->mutex );
caStatus status = this->out.copyInHeader ( CA_PROTO_ERROR,
hdrSize + stringSize, 0, 0, cid, reportedStatus,
reinterpret_cast <void **> ( & pReqOut ) );
if ( ! status ) {
char * pMsgString;
/*
* copy back the request protocol
* (in network byte order)
*/
if ( ( curp->m_postsize >= 0xffff || curp->m_count >= 0xffff ) &&
CA_V49( this->minor_version_number ) ) {
ca_uint32_t *pLW = ( ca_uint32_t * ) ( pReqOut + 1 );
pReqOut->m_cmmd = htons ( curp->m_cmmd );
pReqOut->m_postsize = htons ( 0xffff );
pReqOut->m_dataType = htons ( curp->m_dataType );
pReqOut->m_count = htons ( 0u );
pReqOut->m_cid = htonl ( curp->m_cid );
pReqOut->m_available = htonl ( curp->m_available );
pLW[0] = htonl ( curp->m_postsize );
pLW[1] = htonl ( curp->m_count );
pMsgString = ( char * ) ( pLW + 2 );
}
else {
pReqOut->m_cmmd = htons (curp->m_cmmd);
pReqOut->m_postsize = htons ( ( (ca_uint16_t) curp->m_postsize ) );
pReqOut->m_dataType = htons (curp->m_dataType);
pReqOut->m_count = htons ( ( (ca_uint16_t) curp->m_count ) );
pReqOut->m_cid = htonl (curp->m_cid);
pReqOut->m_available = htonl (curp->m_available);
pMsgString = ( char * ) ( pReqOut + 1 );
}
/*
* add their context string into the protocol
*/
memcpy ( pMsgString, msgBuf, stringSize );
this->out.commitMsg ();
}
return S_cas_success;
}
//
// echoAction()
//
caStatus casDGClient::echoAction ()
{
const caHdrLargeArray * mp = this->ctx.getMsg();
const void * dp = this->ctx.getData();
void * pPayloadOut;
epicsGuard < epicsMutex > guard ( this->mutex );
caStatus status = this->out.copyInHeader ( mp->m_cmmd, mp->m_postsize,
mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available,
& pPayloadOut );
if ( ! status ) {
memcpy ( pPayloadOut, dp, mp->m_postsize );
this->out.commitMsg ();
}
return S_cas_success;
}

View File

@@ -13,86 +13,97 @@
#ifndef casDGClienth
#define casDGClienth
#include "casClient.h"
#ifdef epicsExportSharedSymbols
# define epicsExportSharedSymbols_casDGClienth
# undef epicsExportSharedSymbols
#endif
class casDGClient : public casClient {
#include "epicsTime.h"
#ifdef epicsExportSharedSymbols_casDGClienth
# define epicsExportSharedSymbols
# include "shareLib.h"
#endif
#include "casCoreClient.h"
#include "inBuf.h"
#include "outBuf.h"
class casDGClient : public casCoreClient, public outBufClient,
public inBufClient {
public:
casDGClient ( class caServerI & serverIn,
clientBufMemoryManager & );
virtual ~casDGClient ();
virtual void show (unsigned level) const;
caStatus processMsg ();
void show ( unsigned level ) const;
void sendBeacon ( ca_uint32_t beaconNumber );
virtual void sendBeaconIO ( char &msg, bufSizeT length,
aitUint16 &portField, aitUint32 &addrField ) = 0;
virtual void sendBeaconIO ( char & msg, bufSizeT length,
aitUint16 & portField, aitUint32 & addrField ) = 0;
void destroy ();
unsigned getDebugLevel () const;
void hostName ( char * pBuf, unsigned bufSize ) const;
void userName ( char * pBuf, unsigned bufSize ) const;
caNetAddr fetchLastRecvAddr () const;
virtual caNetAddr serverAddress () const = 0;
caStatus sendErr ( const caHdrLargeArray * curp,
ca_uint32_t cid, const int reportedStatus,
const char *pformat, ... );
protected:
caStatus processDG ();
bool inBufFull () const;
void inBufFill ( inBufClient::fillParameter );
bufSizeT inBufBytesAvailable () const;
bufSizeT outBufBytesPresent () const;
outBufClient::flushCondition flush ();
private:
inBuf in;
outBuf out;
caNetAddr lastRecvAddr;
epicsTime lastSendTS;
epicsTime lastRecvTS;
ca_uint32_t seqNoOfReq;
ca_uint16_t minor_version_number;
typedef caStatus ( casDGClient :: * pCASMsgHandler ) ();
static pCASMsgHandler const msgHandlers[CA_PROTO_LAST_CMMD+1u];
//
// one function for each CA request type
//
caStatus searchAction ();
caStatus uknownMessageAction ();
caStatus echoAction ();
//
// searchFailResponse()
//
caStatus searchFailResponse ( const caHdrLargeArray *pMsg );
caStatus searchResponse ( const caHdrLargeArray &,
const pvExistReturn & retVal );
caStatus asyncSearchResponse ( const caNetAddr & outAddr,
const caHdrLargeArray & msg, const pvExistReturn & retVal,
caStatus asyncSearchResponse (
epicsGuard < casClientMutex > &, const caNetAddr & outAddr,
const caHdrLargeArray &, const pvExistReturn &,
ca_uint16_t protocolRevision, ca_uint32_t sequenceNumber );
//
// IO depen
//
void sendVersion ();
outBufClient::flushCondition xSend ( char *pBufIn, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent );
inBufClient::fillCondition xRecv ( char *pBufIn, bufSizeT nBytesToRecv,
fillParameter parm, bufSizeT &nByesRecv );
virtual outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
const caNetAddr & addr ) = 0;
inBufClient::fillCondition xRecv ( char * pBufIn, bufSizeT nBytesToRecv,
fillParameter parm, bufSizeT & nByesRecv );
virtual outBufClient::flushCondition osdSend (
const char * pBuf, bufSizeT nBytesReq, const caNetAddr & addr ) = 0;
virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
fillParameter parm, bufSizeT &nBytesActual, caNetAddr & addr ) = 0;
caStatus versionAction ();
ca_uint32_t datagramSequenceNumber () const;
//
// cadg
//
ca_uint16_t protocolRevision () const;
struct cadg {
caNetAddr cadg_addr; // invalid address indicates pad
bufSizeT cadg_nBytes;
};
casDGClient ( const casDGClient & );
casDGClient & operator = ( const casDGClient & );
};
inline ca_uint16_t casDGClient::protocolRevision () const
{
return this->minor_version_number;
}
#endif // casDGClienth

View File

@@ -36,12 +36,16 @@
class casCoreClient;
class epicsMutex;
class evSysMutex;
class casClientMutex;
template < class MUTEX > class epicsGuard;
class casEvent : public tsDLNode < casEvent > {
public:
virtual caStatus cbFunc ( casCoreClient &, epicsGuard < epicsMutex > & ) = 0;
virtual caStatus cbFunc (
casCoreClient &,
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & ) = 0;
protected:
epicsShareFunc virtual ~casEvent();
};

View File

@@ -22,8 +22,9 @@
#include "casCoreClient.h"
#include "casAsyncIOI.h"
#include "casChannelI.h"
#include "channelDestroyEvent.h"
void casEventSys::show(unsigned level) const
void casEventSys::show ( unsigned level ) const
{
epicsGuard < epicsMutex > guard ( this->mutex );
printf ( "casEventSys at %p\n",
@@ -52,12 +53,6 @@ casEventSys::~casEventSys()
// o any subscription events remaining on the queue
// are pending destroy
// this will clean up the event queue because all
// channels have been deleted and any events left on
// the queue are there because they are going to
// execute a subscription delete
this->process ();
// verify above assertion is true
casVerify ( this->eventLogQue.count() == 0 );
@@ -85,13 +80,14 @@ void casEventSys::removeMonitor ()
this->maxLogEntries -= averageEventEntries;
}
casEventSys::processStatus casEventSys::process ()
casEventSys::processStatus casEventSys::process (
epicsGuard < casClientMutex > & casClientGuard )
{
casEventSys::processStatus ps;
ps.cond = casProcOk;
ps.nAccepted = 0u;
epicsGuard < epicsMutex > guard ( this->mutex );
epicsGuard < evSysMutex > evGuard ( this->mutex );
while ( ! this->dontProcess ) {
casEvent * pEvent;
@@ -103,7 +99,7 @@ casEventSys::processStatus casEventSys::process ()
}
caStatus status = pEvent->cbFunc (
this->client, guard );
this->client, casClientGuard, evGuard );
if ( status == S_cas_success ) {
ps.nAccepted++;
}
@@ -210,15 +206,14 @@ casEventPurgeEv::casEventPurgeEv ( casEventSys & evSysIn ) :
{
}
caStatus casEventPurgeEv::cbFunc ( casCoreClient &, epicsGuard < epicsMutex > & guard )
caStatus casEventPurgeEv::cbFunc (
casCoreClient &,
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & )
{
this->evSys.dontProcess = true;
this->evSys.pPurgeEvent = NULL;
{
epicsGuardRelease < epicsMutex > unklocker ( guard );
delete this;
}
delete this;
return S_cas_success;
}
@@ -255,12 +250,6 @@ bool casEventSys::addToEventQueue ( casChannelI & event, bool & inTheEventQueue
return wakeupRequired;
}
void casEventSys::removeFromEventQueue ( casMonEvent & event )
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->eventLogQue.remove ( event );
}
void casEventSys::removeFromEventQueue ( casAsyncIOI & io, bool & onTheEventQueue )
{
epicsGuard < epicsMutex > guard ( this->mutex );
@@ -270,6 +259,12 @@ void casEventSys::removeFromEventQueue ( casAsyncIOI & io, bool & onTheEventQue
}
}
void casEventSys::addToEventQueue ( channelDestroyEvent & event )
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->eventLogQue.add ( event );
}
void casEventSys::setDestroyPending ()
{
epicsGuard < epicsMutex > guard ( this->mutex );
@@ -319,7 +314,8 @@ bool casEventSys::postEvent ( tsDLList < casMonitor > & monitorList,
if ( this->eventLogQue.count() == 0 ) {
signalNeeded = true;
}
iter->installNewEventLog ( this->eventLogQue, pLog, event );
iter->installNewEventLog (
this->eventLogQue, pLog, event );
}
++iter;
}
@@ -328,7 +324,7 @@ bool casEventSys::postEvent ( tsDLList < casMonitor > & monitorList,
}
void casEventSys::casMonEventDestroy (
casMonEvent & ev, epicsGuard < epicsMutex > & guard )
casMonEvent & ev, epicsGuard < evSysMutex > & guard )
{
guard.assertIdenticalMutex ( this->mutex );
ev.~casMonEvent ();

View File

@@ -59,48 +59,42 @@ class casMonitor;
class casMonEvent;
class casCoreClient;
class evSysMutex;
template < class MUTEX > class epicsGuard;
class evSysMutex : public epicsMutex {};
class casEventSys {
public:
casEventSys ( casCoreClient & );
~casEventSys ();
void show ( unsigned level ) const;
struct processStatus {
casProcCond cond;
unsigned nAccepted;
};
processStatus process ();
processStatus process (
epicsGuard < casClientMutex > & guard );
void installMonitor ();
void removeMonitor ();
void prepareMonitorForDestroy ( casMonitor & mon );
bool postEvent ( tsDLList < casMonitor > & monitorList,
const casEventMask & select, const gdd & event );
void removeFromEventQueue ( casMonEvent & );
void addToEventQueue ( casMonEvent & );
caStatus addToEventQueue ( class casAsyncIOI &,
bool & onTheQueue, bool & posted, bool & signalNeeded );
void removeFromEventQueue ( class casAsyncIOI &,
bool & onTheEventQueue );
bool addToEventQueue (
casChannelI &, bool & inTheEventQueue );
void addToEventQueue ( class channelDestroyEvent & );
bool getNDuplicateEvents () const;
void setDestroyPending ();
void eventsOn ();
bool eventsOff ();
void casMonEventDestroy (
casMonEvent &, epicsGuard < epicsMutex > & );
casMonEvent &, epicsGuard < evSysMutex > & );
private:
mutable epicsMutex mutex;
mutable evSysMutex mutex;
tsDLList < casEvent > eventLogQue;
tsFreeList < casMonEvent, 1024, epicsMutexNOOP > casMonEventFreeList;
casCoreClient & client;
@@ -129,7 +123,9 @@ public:
private:
casEventSys & evSys;
caStatus cbFunc (
casCoreClient &, epicsGuard < epicsMutex > & guard );
casCoreClient &,
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & );
};
//

View File

@@ -23,10 +23,13 @@
#include "casCoreClient.h"
caStatus casMonEvent::cbFunc (
casCoreClient & client, epicsGuard < epicsMutex > & guard )
casCoreClient & client,
epicsGuard < casClientMutex > & clientGuard,
epicsGuard < evSysMutex > & evGuard )
{
return this->monitor.executeEvent (
client, * this, *this->pValue, guard );
client, * this, *this->pValue,
clientGuard, evGuard );
}
void casMonEvent::assign ( const gdd & valueIn )

View File

@@ -34,9 +34,6 @@
#include "casEvent.h"
class epicsMutex;
template < class MUTEX > class epicsGuard;
class casMonEvent : public casEvent {
public:
casMonEvent ( class casMonitor & monitor );
@@ -54,7 +51,10 @@ private:
smartConstGDDPointer pValue;
void * operator new ( size_t );
void operator delete ( void * );
caStatus cbFunc ( casCoreClient &, epicsGuard < epicsMutex > & guard );
caStatus cbFunc (
casCoreClient &,
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & );
casMonEvent ( const casMonEvent & );
casMonEvent & operator = ( const casMonEvent & );
};

View File

@@ -45,8 +45,9 @@ casMonitor::~casMonitor()
{
}
caStatus casMonitor::response ( casCoreClient & client,
const gdd & value )
caStatus casMonitor::response (
epicsGuard < casClientMutex > & guard,
casCoreClient & client, const gdd & value )
{
if ( this->pChannel ) {
// reconstruct request header
@@ -57,8 +58,8 @@ caStatus casMonitor::response ( casCoreClient & client,
msg.m_count = this->nElem;
msg.m_cid = this->pChannel->getSID();
msg.m_available = this->clientId;
return client.monitorResponse ( *this->pChannel,
msg, value, S_cas_success );
return client.monitorResponse (
guard, *this->pChannel, msg, value, S_cas_success );
}
else {
return S_cas_success;
@@ -102,12 +103,12 @@ void casMonitor::installNewEventLog (
caStatus casMonitor::executeEvent ( casCoreClient & client,
casMonEvent & ev, const gdd & value,
epicsGuard < epicsMutex > & guard )
epicsGuard < casClientMutex > & clientGuard,
epicsGuard < evSysMutex > & evGuard )
{
if ( this->pChannel ) {
epicsGuardRelease < epicsMutex > unguard ( guard );
caStatus status = this->callBackIntf.casMonitorCallBack (
*this, value );
clientGuard, *this, value );
if ( status != S_cas_success ) {
return status;
}
@@ -125,14 +126,16 @@ caStatus casMonitor::executeEvent ( casCoreClient & client,
this->overFlowEvent.clear ();
}
else {
client.casMonEventDestroy ( ev, guard );
client.casMonEventDestroy ( ev, evGuard );
}
if ( ! this->pChannel && this->nPend == 0 ) {
// we are careful here not to invert
// the lock hierarchy
epicsGuardRelease < epicsMutex > unguard ( guard );
client.destroyMonitor ( *this );
// we carefully avoid inverting the lock hierarchy here
epicsGuardRelease < evSysMutex > unguard ( evGuard );
{
epicsGuardRelease < casClientMutex > unguard ( clientGuard );
client.destroyMonitor ( *this );
}
}
return S_cas_success;

View File

@@ -36,9 +36,13 @@
#include "caHdrLargeArray.h"
#include "casMonEvent.h"
class casMonitor;
class casClientMutex;
class casMonitorCallbackInterface { // X aCC 655
public:
virtual caStatus casMonitorCallBack ( class casMonitor &,
virtual caStatus casMonitorCallBack (
epicsGuard < casClientMutex > &, casMonitor &,
const gdd & ) = 0;
};
@@ -55,20 +59,21 @@ public:
void installNewEventLog (
tsDLList < casEvent > & eventLogQue,
casMonEvent * pLog, const gdd & event );
caStatus response ( casCoreClient & client,
const gdd & value );
void show ( unsigned level ) const;
bool selected ( const casEventMask & select ) const;
caStatus executeEvent ( casCoreClient &,
casMonEvent &, const gdd &,
epicsGuard < epicsMutex > & );
bool matchingClientId ( caResId clientIdIn ) const;
unsigned numEventsQueued () const;
caStatus response (
epicsGuard < casClientMutex > &, casCoreClient & client,
const gdd & value );
caStatus executeEvent ( casCoreClient &,
casMonEvent &, const gdd &,
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & );
void * operator new ( size_t size,
tsFreeList < casMonitor, 1024 > & );
epicsPlacementDeleteOperator (( void *,
tsFreeList < casMonitor, 1024 > & ))
private:
casMonEvent overFlowEvent;
ca_uint32_t const nElem;

View File

@@ -18,7 +18,8 @@
#define epicsExportSharedSymbols
#include "casPVI.h"
casPV::casPV () : pPVI ( 0 )
casPV::casPV () :
pPVI ( 0 )
{
}
@@ -26,12 +27,22 @@ casPV::casPV () : pPVI ( 0 )
// This constructor is preserved for backwards compatibility only.
// Please do _not_ use this constructor.
//
casPV::casPV ( caServer & ) : pPVI ( 0 )
casPV::casPV ( caServer & ) :
pPVI ( 0 )
{
}
casPV::~casPV ()
{
if ( this->pPVI ) {
this->pPVI->casPVDestroyNotify ();
}
}
void casPV::destroyRequest ()
{
this->pPVI = 0;
this->destroy ();
}
//
@@ -48,8 +59,8 @@ void casPV::destroy ()
//
// casPV::createChannel()
//
casChannel *casPV::createChannel (const casCtx &ctx, const char * const,
const char * const )
casChannel *casPV::createChannel (
const casCtx &ctx, const char * const, const char * const )
{
return new casChannel ( ctx );
}
@@ -72,7 +83,7 @@ void casPV::interestDelete ()
//
// casPV::beginTransaction()
//
caStatus casPV::beginTransaction ()
caStatus casPV::beginTransaction ()
{
return S_casApp_success;
}

View File

@@ -29,8 +29,8 @@
#include "casMonitor.h"
casPVI::casPVI ( casPV & intf ) :
pCAS ( NULL ), pv ( intf ),
nMonAttached ( 0u ), nIOAttached ( 0u ) {}
pCAS ( NULL ), pPV ( & intf ), nMonAttached ( 0u ),
nIOAttached ( 0u ), deletePending ( false ) {}
casPVI::~casPVI ()
{
@@ -46,8 +46,8 @@ casPVI::~casPVI ()
// when we destroyed the channels
//
casVerify ( this->nIOAttached == 0u );
if (this->nIOAttached) {
errlogPrintf ( "The number of IO objected supposedly attached is %u\n", this->nIOAttached );
if ( this->nIOAttached ) {
errlogPrintf ( "The number of IO objected attached is %u\n", this->nIOAttached );
}
//
@@ -56,8 +56,24 @@ casPVI::~casPVI ()
//
casVerify ( this->nMonAttached == 0u );
this->pv.pPVI = 0;
this->pv.destroy ();
{
epicsGuard < epicsMutex > guard ( this->mutex );
this->deletePending = true;
if ( this->pPV ) {
this->pPV->destroyRequest ();
}
}
}
void casPVI::casPVDestroyNotify ()
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( ! this->deletePending ) {
tsDLIter < chanIntfForPV > iter = this->chanList.firstIter ();
while ( iter.valid() ) {
iter->casChannelDestroyNotify ( false );
}
}
}
//
@@ -93,16 +109,9 @@ void casPVI::deleteSignal ()
// !! dont access self after potential delete above !!
}
casPVI * casPVI::attachPV ( casPV & pv )
{
if ( ! pv.pPVI ) {
pv.pPVI = new ( std::nothrow ) casPVI ( pv );
}
return pv.pPVI;
}
caStatus casPVI::attachToServer ( caServerI & cas )
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->pCAS ) {
//
// currently we enforce that the PV can be attached to only
@@ -128,6 +137,8 @@ caStatus casPVI::attachToServer ( caServerI & cas )
//
caStatus casPVI::updateEnumStringTable ( casCtx & ctx )
{
epicsGuard < epicsMutex > guard ( this->mutex );
//
// keep trying to fill in the table if client disconnects
// prevented previous asynchronous IO from finishing, but if
@@ -248,7 +259,7 @@ void casPVI::updateEnumStringTableAsyncCompletion ( const gdd & resp )
}
else if ( resp.primitiveType() == aitEnumFixedString ) {
aitFixedString *pStr = (aitFixedString *) resp.dataVoid ();
for ( index = 0; index<count; index++ ) {
for ( index = 0; index < count; index++ ) {
if ( ! this->enumStrTbl.setString ( index, pStr[index].fixed_string ) ) {
errMessage ( S_cas_noMemory,
"no memory to set enumerated PV string cache" );
@@ -287,19 +298,13 @@ void casPVI::postEvent ( const casEventMask & select, const gdd & event )
caStatus casPVI::installMonitor (
casMonitor & mon, tsDLList < casMonitor > & monitorList )
{
bool newInterest = false;
{
epicsGuard < epicsMutex > guard ( this->mutex );
assert ( this->nMonAttached < UINT_MAX );
this->nMonAttached++;
if ( this->nMonAttached == 1u ) {
newInterest = true;
}
// use pv lock to protect channel's monitor list
monitorList.add ( mon );
}
if ( newInterest ) {
return this->pv.interestRegister ();
epicsGuard < epicsMutex > guard ( this->mutex );
assert ( this->nMonAttached < UINT_MAX );
this->nMonAttached++;
// use pv lock to protect channel's monitor list
monitorList.add ( mon );
if ( this->nMonAttached == 1u && this->pPV ) {
return this->pPV->interestRegister ();
}
else {
return S_cas_success;
@@ -309,37 +314,33 @@ caStatus casPVI::installMonitor (
casMonitor * casPVI::removeMonitor (
tsDLList < casMonitor > & list, ca_uint32_t clientIdIn )
{
epicsGuard < epicsMutex > guard ( this->mutex );
casMonitor * pMon = 0;
bool noInterest = false;
{
//
// (it is reasonable to do a linear search here because
// sane clients will require only one or two monitors
// per channel)
//
epicsGuard < epicsMutex > guard ( this->mutex );
tsDLIter < casMonitor > iter = list.firstIter ();
while ( iter.valid () ) {
if ( iter->matchingClientId ( clientIdIn ) ) {
list.remove ( *iter.pointer () );
assert ( this->nMonAttached > 0 );
this->nMonAttached--;
noInterest =
( this->nMonAttached == 0u );
pMon = iter.pointer ();
break;
}
iter++;
}
}
if ( noInterest ) {
this->pv.interestDelete ();
//
// (it is reasonable to do a linear search here because
// sane clients will require only one or two monitors
// per channel)
//
tsDLIter < casMonitor > iter = list.firstIter ();
while ( iter.valid () ) {
if ( iter->matchingClientId ( clientIdIn ) ) {
list.remove ( *iter.pointer () );
assert ( this->nMonAttached > 0 );
this->nMonAttached--;
pMon = iter.pointer ();
break;
}
iter++;
}
if ( this->nMonAttached == 0u && this->pPV ) {
this->pPV->interestDelete ();
}
return pMon;
}
caServer *casPVI::getExtServer () const // X aCC 361
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->pCAS ) {
return this->pCAS->getAdapter ();
}
@@ -357,13 +358,13 @@ void casPVI::show ( unsigned level ) const
printf ( "\tBest external type = %d\n", this->bestExternalType() );
}
if ( level >= 2u ) {
this->pv.show ( level - 2u );
this->pPV->show ( level - 2u );
}
}
casPV * casPVI::apiPointer ()
{
return & this->pv;
return this->pPV;
}
void casPVI::installChannel ( chanIntfForPV & chan )
@@ -376,19 +377,15 @@ void casPVI::removeChannel (
chanIntfForPV & chan, tsDLList < casMonitor > & src,
tsDLList < casMonitor > & dest )
{
bool noInterest = false;
{
epicsGuard < epicsMutex > guard ( this->mutex );
src.removeAll ( dest );
if ( dest.count() ) {
assert ( this->nMonAttached >= dest.count() );
this->nMonAttached -= dest.count ();
noInterest = ( this->nMonAttached == 0u );
}
this->chanList.remove ( chan );
epicsGuard < epicsMutex > guard ( this->mutex );
src.removeAll ( dest );
if ( dest.count() ) {
assert ( this->nMonAttached >= dest.count() );
this->nMonAttached -= dest.count ();
}
if ( noInterest ) {
this->pv.interestDelete ();
this->chanList.remove ( chan );
if ( this->nMonAttached == 0u && this->pPV ) {
this->pPV->interestDelete ();
}
}
@@ -446,6 +443,7 @@ void casPVI::uninstallIO (
caStatus casPVI::bestDBRType ( unsigned & dbrType ) // X aCC 361
{
epicsGuard < epicsMutex > guard ( this->mutex );
aitEnum bestAIT = this->bestExternalType ();
if ( bestAIT == aitEnumInvalid || bestAIT < 0 ) {
return S_cas_badType;
@@ -458,4 +456,85 @@ caStatus casPVI::bestDBRType ( unsigned & dbrType ) // X aCC 361
return S_cas_success;
}
caStatus casPVI::read ( const casCtx & ctx, gdd & prototype )
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->pPV ) {
caStatus status = this->pPV->beginTransaction ();
if ( status != S_casApp_success ) {
return status;
}
status = this->pPV->read ( ctx, prototype );
this->pPV->endTransaction ();
return status;
}
else {
return S_cas_disconnect;
}
}
caStatus casPVI::write ( const casCtx & ctx, const gdd & value )
{
if ( this->pPV ) {
caStatus status = this->pPV->beginTransaction ();
if ( status != S_casApp_success ) {
return status;
}
status = this->pPV->write ( ctx, value );
this->pPV->endTransaction ();
return status;
}
else {
return S_cas_disconnect;
}
}
casChannel * casPVI::createChannel ( const casCtx & ctx,
const char * const pUserName, const char * const pHostName )
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->pPV ) {
return this->pPV->createChannel ( ctx, pUserName, pHostName );
}
else {
return 0;
}
}
aitEnum casPVI::bestExternalType () const
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->pPV ) {
return this->pPV->bestExternalType ();
}
else {
return aitEnumInvalid;
}
}
// CA only does 1D arrays for now
aitIndex casPVI::nativeCount ()
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->pPV ) {
if ( this->pPV->maxDimension() == 0u ) {
return 1u; // scalar
}
return this->pPV->maxBound ( 0u );
}
else {
return S_cas_disconnect;
}
}
const char * casPVI::getName () const
{
epicsGuard < epicsMutex > guard ( this->mutex );
if ( this->pPV ) {
return this->pPV->getName ();
}
else {
return "<disconnected>";
}
}

View File

@@ -47,6 +47,7 @@ class casPVI :
{
public:
casPVI ( casPV & );
epicsShareFunc virtual ~casPVI ();
caServerI * getPCAS () const;
caStatus attachToServer ( caServerI & cas );
aitIndex nativeCount ();
@@ -74,97 +75,37 @@ public:
void updateEnumStringTableAsyncCompletion ( const gdd & resp );
casPV * apiPointer (); // retuns NULL if casPVI isnt a base of casPV
void show ( unsigned level ) const;
caStatus beginTransaction ();
void endTransaction ();
caStatus read ( const casCtx & ctx, gdd & prototype );
caStatus write ( const casCtx & ctx, const gdd & value );
casChannel * createChannel ( const casCtx & ctx,
const char * const pUserName, const char * const pHostName );
aitEnum bestExternalType () const;
unsigned maxDimension () const;
aitIndex maxBound ( unsigned dimension ) const;
const char * getName () const;
static casPVI * attachPV ( casPV & );
void casPVDestroyNotify ();
protected:
epicsShareFunc virtual ~casPVI ();
private:
mutable epicsMutex mutex;
tsDLList < chanIntfForPV > chanList;
gddEnumStringTable enumStrTbl;
caServerI * pCAS;
casPV & pv;
casPV * pPV;
unsigned nMonAttached;
unsigned nIOAttached;
bool deletePending;
casPVI ( const casPVI & );
casPVI & operator = ( const casPVI & );
};
inline caServerI *casPVI::getPCAS() const
inline caServerI * casPVI::getPCAS() const
{
return this->pCAS;
}
// CA only does 1D arrays for now
inline aitIndex casPVI::nativeCount ()
{
if ( this->maxDimension() == 0u ) {
return 1u; // scalar
}
return this->maxBound(0u);
}
inline const gddEnumStringTable & casPVI::enumStringTable () const
{
return this->enumStrTbl;
}
inline caStatus casPVI::beginTransaction ()
{
return this->pv.beginTransaction ();
}
inline void casPVI::endTransaction ()
{
this->pv.endTransaction ();
}
inline caStatus casPVI::read ( const casCtx & ctx, gdd & prototype )
{
return this->pv.read ( ctx, prototype );
}
inline caStatus casPVI::write ( const casCtx & ctx, const gdd & value )
{
return this->pv.write ( ctx, value );
}
inline casChannel * casPVI::createChannel ( const casCtx & ctx,
const char * const pUserName, const char * const pHostName )
{
return this->pv.createChannel ( ctx, pUserName, pHostName );
}
inline aitEnum casPVI::bestExternalType () const
{
return this->pv.bestExternalType ();
}
inline unsigned casPVI::maxDimension () const
{
return this->pv.maxDimension ();
}
inline aitIndex casPVI::maxBound ( unsigned dimension ) const
{
return this->pv.maxBound ( dimension );
}
inline const char * casPVI::getName () const
{
return this->pv.getName ();
}
#endif // casPVIh

File diff suppressed because it is too large Load Diff

View File

@@ -12,7 +12,21 @@
#ifndef casStrmClienth
#define casStrmClienth
#include "casClient.h"
#ifdef epicsExportSharedSymbols
# define epicsExportSharedSymbols_casStrmClienth
# undef epicsExportSharedSymbols
#endif
#include "epicsTime.h"
#ifdef epicsExportSharedSymbols_casStrmClienth
# define epicsExportSharedSymbols
# include "shareLib.h"
#endif
#include "casCoreClient.h"
#include "inBuf.h"
#include "outBuf.h"
enum xBlockingStatus { xIsBlocking, xIsntBlocking };
@@ -20,82 +34,108 @@ enum xBlockingStatus { xIsBlocking, xIsntBlocking };
// casStrmClient
//
class casStrmClient :
public casClient,
public tsDLNode < casStrmClient > {
public casCoreClient, public outBufClient,
public inBufClient, public tsDLNode < casStrmClient > {
public:
casStrmClient ( caServerI &, clientBufMemoryManager & );
virtual ~casStrmClient();
void show ( unsigned level ) const;
void flush ();
outBufClient::flushCondition flush ();
unsigned getDebugLevel () const;
virtual void hostName ( char * pBuf, unsigned bufSize ) const = 0;
void userName ( char * pBuf, unsigned bufSize ) const;
ca_uint16_t protocolRevision () const;
void sendVersion ();
protected:
caStatus processMsg ();
bool inBufFull () const;
bufSizeT inBufBytesAvailable () const;
inBufClient::fillCondition inBufFill ();
bufSizeT outBufBytesPresent () const;
private:
char hostNameStr [32];
inBuf in;
outBuf out;
chronIntIdResTable < casChannelI > chanTable;
tsDLList < casChannelI > chanList;
epicsTime lastSendTS;
epicsTime lastRecvTS;
char * pUserName;
char * pHostName;
unsigned incommingBytesToDrain;
ca_uint16_t minor_version_number;
caStatus createChannel ( const char * pName );
caStatus verifyRequest ( casChannelI * & pChan );
typedef caStatus ( casStrmClient :: * pCASMsgHandler )
( epicsGuard < casClientMutex > & );
static pCASMsgHandler const msgHandlers[CA_PROTO_LAST_CMMD+1u];
//
// one function for each CA request type
//
caStatus uknownMessageAction ( epicsGuard < casClientMutex > & );
caStatus ignoreMsgAction ( epicsGuard < casClientMutex > & );
caStatus versionAction ( epicsGuard < casClientMutex > & );
caStatus echoAction ( epicsGuard < casClientMutex > & );
caStatus eventAddAction ( epicsGuard < casClientMutex > & );
caStatus eventCancelAction ( epicsGuard < casClientMutex > & );
caStatus readAction ( epicsGuard < casClientMutex > & );
caStatus readNotifyAction ( epicsGuard < casClientMutex > & );
caStatus writeAction ( epicsGuard < casClientMutex > & );
caStatus eventsOffAction ( epicsGuard < casClientMutex > & );
caStatus eventsOnAction ( epicsGuard < casClientMutex > & );
caStatus readSyncAction ( epicsGuard < casClientMutex > & );
caStatus clearChannelAction ( epicsGuard < casClientMutex > & );
caStatus claimChannelAction ( epicsGuard < casClientMutex > & );
caStatus writeNotifyAction ( epicsGuard < casClientMutex > & );
caStatus clientNameAction ( epicsGuard < casClientMutex > & );
caStatus hostNameAction ( epicsGuard < casClientMutex > & );
caStatus sendErr ( epicsGuard < casClientMutex > &,
const caHdrLargeArray *curp, ca_uint32_t cid,
const int reportedStatus, const char * pformat, ... );
caStatus readNotifyFailureResponse ( epicsGuard < casClientMutex > &,
const caHdrLargeArray & msg, const caStatus ECA_XXXX );
caStatus monitorFailureResponse ( epicsGuard < casClientMutex > &,
const caHdrLargeArray & msg, const caStatus ECA_XXXX );
caStatus writeNotifyResponseECA_XXX ( epicsGuard < casClientMutex > &,
const caHdrLargeArray & msg, const caStatus status );
caStatus sendErrWithEpicsStatus ( epicsGuard < casClientMutex > &,
const caHdrLargeArray * pMsg, ca_uint32_t cid, caStatus epicsStatus,
caStatus clientStatus );
//
// one function for each CA request type that has
// asynchronous completion
//
virtual caStatus createChanResponse (
caStatus createChanResponse ( epicsGuard < casClientMutex > &,
const caHdrLargeArray &, const pvAttachReturn & );
caStatus readResponse ( casChannelI * pChan, const caHdrLargeArray & msg,
caStatus readResponse ( epicsGuard < casClientMutex > &,
casChannelI * pChan, const caHdrLargeArray & msg,
const gdd & desc, const caStatus status );
caStatus readNotifyResponse ( casChannelI *pChan, const caHdrLargeArray & msg,
caStatus readNotifyResponse ( epicsGuard < casClientMutex > &,
casChannelI *pChan, const caHdrLargeArray & msg,
const gdd & desc, const caStatus status );
caStatus writeResponse ( casChannelI &,
caStatus writeResponse ( epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray & msg, const caStatus status );
caStatus writeNotifyResponse ( casChannelI &,
caStatus writeNotifyResponse ( epicsGuard < casClientMutex > &, casChannelI &,
const caHdrLargeArray &, const caStatus status );
caStatus monitorResponse ( casChannelI & chan, const caHdrLargeArray & msg,
caStatus monitorResponse ( epicsGuard < casClientMutex > &,
casChannelI & chan, const caHdrLargeArray & msg,
const gdd & desc, const caStatus status );
caStatus enumPostponedCreateChanResponse ( casChannelI & chan,
const caHdrLargeArray & hdr, unsigned dbrType );
caStatus channelCreateFailedResp ( const caHdrLargeArray &,
const caStatus createStatus );
caStatus enumPostponedCreateChanResponse ( epicsGuard < casClientMutex > &,
casChannelI & chan, const caHdrLargeArray & hdr, unsigned dbrType );
caStatus channelCreateFailedResp ( epicsGuard < casClientMutex > &,
const caHdrLargeArray &, const caStatus createStatus );
caStatus casStrmClient::channelDestroyNotify (
epicsGuard < casClientMutex > & guard,
casChannelI &, bool uninstallNeeded );
caStatus disconnectChan ( caResId id );
unsigned getDebugLevel () const;
virtual void hostName ( char * pBuf, unsigned bufSize ) const = 0;
void userName ( char * pBuf, unsigned bufSize ) const;
caStatus accessRightsResponse (
casChannelI * pciu );
caStatus accessRightsResponse (
epicsGuard < casClientMutex > &, casChannelI * pciu );
private:
chronIntIdResTable < casChannelI > chanTable;
tsDLList < casChannelI > chanList;
char * pUserName;
char * pHostName;
//
// createChannel()
//
caStatus createChannel ( const char *pName );
//
// verify read/write requests
//
caStatus verifyRequest ( casChannelI * & pChan );
//
// one function for each CA request type
//
caStatus uknownMessageAction ();
caStatus eventAddAction ();
caStatus eventCancelAction ();
caStatus readAction ();
caStatus readNotifyAction ();
caStatus writeAction ();
caStatus eventsOffAction ();
caStatus eventsOnAction ();
caStatus readSyncAction ();
caStatus clearChannelAction ();
caStatus claimChannelAction ();
caStatus writeNotifyAction ();
caStatus clientNameAction ();
caStatus hostNameAction ();
//
// accessRightsResponse()
//
caStatus accessRightsResponse ( casChannelI * pciu );
//
// these prepare the gdd based on what is in the ca hdr
//
caStatus read ( const gdd * & pDesc );
caStatus write ();
@@ -103,9 +143,6 @@ private:
caStatus writeScalarData();
caStatus writeString();
//
// io independent send/recv
//
outBufClient::flushCondition xSend ( char * pBuf, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT & nBytesSent );
inBufClient::fillCondition xRecv ( char * pBuf, bufSizeT nBytesToRecv,
@@ -117,22 +154,29 @@ private:
bufSizeT & nBytesActual ) = 0;
virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
bufSizeT &nBytesActual ) = 0;
caStatus readNotifyFailureResponse ( const caHdrLargeArray & msg,
const caStatus ECA_XXXX );
caStatus monitorFailureResponse ( const caHdrLargeArray & msg,
const caStatus ECA_XXXX );
caStatus writeNotifyResponseECA_XXX ( const caHdrLargeArray &msg,
const caStatus status );
caStatus casMonitorCallBack ( casMonitor &,
const gdd & );
virtual void forceDisconnect () = 0;
caStatus casMonitorCallBack (
epicsGuard < casClientMutex > &, casMonitor &, const gdd & );
caStatus logBadIdWithFileAndLineno (
epicsGuard < casClientMutex > & guard, const caHdrLargeArray * mp,
const void * dp, const int cacStatus, const char * pFileName,
const unsigned lineno, const unsigned idIn );
void casChannelDestroyNotify ( casChannelI & chan,
bool immediatedSestroyNeeded );
casStrmClient ( const casStrmClient & );
casStrmClient & operator = ( const casStrmClient & );
};
#define logBadId(GUARD, MP, DP, CACSTAT, RESID) \
this->logBadIdWithFileAndLineno ( GUARD, MP, DP, \
CACSTAT, __FILE__, __LINE__, RESID )
inline ca_uint16_t casStrmClient::protocolRevision () const
{
return this->minor_version_number;
}
#endif // casStrmClienth

View File

@@ -299,6 +299,11 @@ private:
// o The destructor for this object will cancel any
// client attachment to this PV (and reclaim any resources
// allocated by the server library on its behalf)
// o If the server tool needs to asynchronously delete an object
// derived from casPV from another thread then it *must* also
// define a specialized destroy() method that prevent race conditions
// occurring when both the server library and the server tool attempt
// to destroy the same casPV derived object at the same instant.
//
// NOTE: if the server tool precreates the PV during initialization
// then it may decide to provide a "destroy()" implementation in the
@@ -406,17 +411,6 @@ public:
epicsShareFunc virtual casChannel * createChannel ( const casCtx &ctx,
const char * const pUserName, const char * const pHostName);
//
// destroy() is called
// 1) each time that a PV transitions from
// a situation where clients are attached to a situation
// where no clients are attached.
// 2) once for all PVs that exist when the server is deleted
//
// the default (base) "destroy()" executes "delete this"
//
epicsShareFunc virtual void destroy ();
//
// tbe best type for clients to use when accessing the
// value of the PV
@@ -468,6 +462,17 @@ public:
epicsShareFunc virtual unsigned maxDimension () const; // return zero if scalar
epicsShareFunc virtual aitIndex maxBound ( unsigned dimension ) const;
//
// destroy() is called
// 1) each time that a PV transitions from
// a situation where clients are attached to a situation
// where no clients are attached.
// 2) once for all PVs that exist when the server is deleted
//
// the default (base) "destroy()" executes "delete this"
//
epicsShareFunc virtual void destroy ();
//
// Server tool calls this function to post a PV event.
//
@@ -499,24 +504,20 @@ public:
// ***************
//
epicsShareFunc caServer * getCAS () const;
//
// only used when caStrmClient converts from
// casPV * to casPVI *
//
//friend class casStrmClient;
void destroyRequest ();
private:
casPVI * pPVI;
friend class casPVI; // used ony to get casPVI casPV ( const casPV & );
casPV & operator = ( const casPV & );
friend class casStrmClient;
public:
//
// This constructor has been deprecated, and is preserved for
// backwards compatibility only. Please do not use it.
//
epicsShareFunc casPV (caServer &);
epicsShareFunc casPV ( caServer & );
};
//
@@ -540,6 +541,11 @@ public:
// o The destructor for this object will cancel any
// client attachment to this channel (and reclaim any resources
// allocated by the server library on its behalf)
// o If the server tool needs to asynchronously delete an object
// derived from casChannel from another thread then it *must* also
// define a specialized destroy() method that prevent race conditions
// occurring when both the server library and the server tool attempt
// to destroy the same casChannel derived object at the same instant.
//
class casChannel {
public:
@@ -568,13 +574,12 @@ public:
// caServer::show() is called and the level is high
// enough
//
epicsShareFunc virtual void show (unsigned level) const;
epicsShareFunc virtual void show ( unsigned level ) const;
//
// destroy() is called when
// 1) there is a client initiated channel delete
// 2) there is a server tool initiaed PV delete
// 3) there is a server tool initiated server delete
// 2) there is a server tool initiated PV delete
//
// the casChannel::destroy() executes a "delete this"
//
@@ -597,11 +602,14 @@ public:
//
epicsShareFunc casPV * getPV ();
void destroyRequest ();
private:
class casChannelI * pChanI;
friend class casStrmClient; // used ony to get casChannelI
casChannel ( const casChannel & );
casChannel & operator = ( const casChannel & );
friend class casStrmClient;
};
//

View File

@@ -33,7 +33,7 @@
# include "shareLib.h"
#endif
#include "casCoreClient.h"
#include "casStrmClient.h"
#include "casPVI.h"
class casMonitor;
@@ -46,6 +46,7 @@ public:
chanIntfForPV ( class casCoreClient & );
~chanIntfForPV ();
class casCoreClient & client () const;
virtual void casChannelDestroyNotify ( bool immediateUninstall ) = 0;
void installMonitor ( casPVI & pv, casMonitor & mon );
casMonitor * removeMonitor ( casPVI &, ca_uint32_t monId );
void removeSelfFromPV ( casPVI &,

View File

@@ -16,16 +16,20 @@
*/
#ifndef casClientIL_h
#define casClientIL_h
#define epicsExportSharedSymbols
#include "channelDestroyEvent.h"
#include "casCoreClient.h"
//
// casClient::getDebugLevel()
//
//inline unsigned casClient::getDebugLevel() const
//{
// return this->ctx.getServer()->getDebugLevel();
//}
#endif // casClientIL_h
caStatus channelDestroyEvent::cbFunc (
casCoreClient & client,
epicsGuard < casClientMutex > & clientGuard,
epicsGuard < evSysMutex > & )
{
caStatus status = client.channelDestroyNotify (
clientGuard, this->chan, this->uninstallNeeded );
if ( status != S_cas_sendBlocked ) {
delete this;
}
return status;
}

View File

@@ -0,0 +1,57 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* $Id$
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#ifndef channelDestroyEventh
#define channelDestroyEventh
#ifdef epicsExportSharedSymbols
# define epicsExportSharedSymbols_channelDestroyEventh
# undef epicsExportSharedSymbols
#endif
// external headers included here
#include "caProto.h"
#ifdef epicsExportSharedSymbols_channelDestroyEventh
# define epicsExportSharedSymbols
# include "shareLib.h"
#endif
#include "casEvent.h"
class casChannelI;
class channelDestroyEvent : public casEvent {
public:
channelDestroyEvent ( casChannelI &, bool uninstallNeeded );
private:
casChannelI & chan;
bool uninstallNeeded;
caStatus cbFunc (
casCoreClient &,
epicsGuard < casClientMutex > &,
epicsGuard < evSysMutex > & );
};
inline channelDestroyEvent::channelDestroyEvent (
casChannelI & chanIn, bool uninstallNeededIn ) :
chan ( chanIn ), uninstallNeeded ( uninstallNeededIn )
{
}
#endif // channelDestroyEventh

View File

@@ -76,9 +76,9 @@ private:
casDGIntfOS::casDGIntfOS (
caServerI & serverIn, clientBufMemoryManager & memMgrIn,
const caNetAddr & addr, bool autoBeaconAddr,
bool addConfigBeaconAddr) :
casDGIntfIO (serverIn, memMgrIn, addr,
autoBeaconAddr, addConfigBeaconAddr),
bool addConfigBeaconAddr ) :
casDGIntfIO ( serverIn, memMgrIn, addr,
autoBeaconAddr, addConfigBeaconAddr ),
pRdReg ( 0 ),
pBCastRdReg ( 0 ),
pWtReg ( 0 ),
@@ -217,12 +217,12 @@ void casDGIOWakeup::show(unsigned level) const
//
void casDGIntfOS::armRecv()
{
if ( ! this->in.full () ) {
if (!this->pRdReg) {
this->pRdReg = new casDGReadReg(*this);
if ( ! this->inBufFull () ) {
if ( ! this->pRdReg ) {
this->pRdReg = new casDGReadReg ( *this );
}
if (this->validBCastFD() && this->pBCastRdReg==NULL) {
this->pBCastRdReg = new casDGBCastReadReg(*this);
if ( this->validBCastFD() && this->pBCastRdReg == NULL ) {
this->pBCastRdReg = new casDGBCastReadReg ( *this );
}
}
}
@@ -232,12 +232,10 @@ void casDGIntfOS::armRecv()
//
void casDGIntfOS::disarmRecv()
{
if (this->pRdReg) {
delete this->pRdReg;
}
if (this->pBCastRdReg) {
delete this->pBCastRdReg;
}
delete this->pRdReg;
this->pRdReg = 0;
delete this->pBCastRdReg;
this->pBCastRdReg = 0;
}
//
@@ -245,12 +243,12 @@ void casDGIntfOS::disarmRecv()
//
void casDGIntfOS::armSend()
{
if ( this->out.bytesPresent () == 0u ) {
if ( this->outBufBytesPresent () == 0u ) {
return;
}
if (!this->pWtReg) {
this->pWtReg = new casDGWriteReg(*this);
if ( ! this->pWtReg ) {
this->pWtReg = new casDGWriteReg ( *this );
}
}
@@ -259,9 +257,8 @@ void casDGIntfOS::armSend()
//
void casDGIntfOS::disarmSend ()
{
if (this->pWtReg) {
delete this->pWtReg;
}
delete this->pWtReg;
this->pWtReg = 0;
}
//
@@ -289,7 +286,7 @@ void casDGIntfOS::eventFlush()
// if there is nothing pending in the input
// queue, then flush the output queue
//
if ( this->in.bytesAvailable() == 0u ) {
if ( this->inBufBytesAvailable() == 0u ) {
this->armSend ();
}
}
@@ -301,14 +298,14 @@ void casDGIntfOS::show(unsigned level) const
{
printf ( "casDGIntfOS at %p\n",
static_cast <const void *> ( this ) );
if (this->pRdReg) {
this->pRdReg->show (level);
if ( this->pRdReg ) {
this->pRdReg->show ( level );
}
if (this->pWtReg) {
this->pWtReg->show (level);
if ( this->pWtReg ) {
this->pWtReg->show ( level );
}
if (this->pBCastRdReg) {
this->pBCastRdReg->show (level);
if ( this->pBCastRdReg ) {
this->pBCastRdReg->show ( level );
}
this->evWk.show (level);
this->ioWk.show (level);
@@ -328,7 +325,6 @@ void casDGReadReg::callBack()
//
casDGReadReg::~casDGReadReg()
{
os.pRdReg = NULL;
}
//
@@ -354,7 +350,6 @@ void casDGBCastReadReg::callBack()
//
casDGBCastReadReg::~casDGBCastReadReg()
{
os.pBCastRdReg = NULL;
}
//
@@ -372,7 +367,6 @@ void casDGBCastReadReg::show(unsigned level) const
//
casDGWriteReg::~casDGWriteReg()
{
os.pWtReg = NULL; // allow rearm (send callbacks are one shots)
}
//
@@ -380,11 +374,10 @@ casDGWriteReg::~casDGWriteReg()
//
void casDGWriteReg::callBack()
{
casDGIntfOS *pDGIOS = &this->os;
delete this; // allows rearm to occur if required
casDGIntfOS * pDGIOS = & this->os;
pDGIOS->sendCB();
//
// NO CODE HERE - see delete above
// NO CODE HERE - sendCB deletes this object
//
}
@@ -412,17 +405,18 @@ void casDGIntfOS::sendBlockSignal()
//
void casDGIntfOS::sendCB()
{
outBufClient::flushCondition flushCond;
// allows rearm to occur if required
this->disarmSend ();
//
// attempt to flush the output buffer
//
flushCond = this->out.flush();
outBufClient::flushCondition flushCond = this->flush ();
if ( flushCond != flushProgress ) {
return;
}
if (this->sendBlocked) {
if ( this->sendBlocked ) {
this->sendBlocked = false;
}
@@ -460,12 +454,12 @@ void casDGIntfOS::sendCB()
//
void casDGIntfOS::recvCB ( inBufClient::fillParameter parm )
{
assert (this->pRdReg);
assert ( this->pRdReg );
//
// copy in new messages
//
this->in.fill ( parm );
this->inBufFill ( parm );
this->processInput ();
//
@@ -478,7 +472,7 @@ void casDGIntfOS::recvCB ( inBufClient::fillParameter parm )
// (casDGReadReg is _not_ a onceOnly fdReg -
// therefore an explicit delete is required here)
//
if ( this->in.full() ) {
if ( this->inBufFull() ) {
this->disarmRecv(); // this deletes the casDGReadReg object
}
}
@@ -511,8 +505,8 @@ void casDGIntfOS::processInput()
// input buffer then keep sending the output
// buffer until it is empty
//
if ( this->out.bytesPresent() > 0u ) {
if ( this->in.bytesAvailable () == 0 ) {
if ( this->outBufBytesPresent() > 0u ) {
if ( this->inBufBytesAvailable () == 0 ) {
this->armSend ();
}
}

View File

@@ -39,9 +39,9 @@ public:
private:
casDGIOWakeup ioWk;
casDGEvWakeup evWk;
casDGReadReg *pRdReg;
casDGBCastReadReg *pBCastRdReg; // fix for solaris bug
casDGWriteReg *pWtReg;
casDGReadReg * pRdReg;
casDGBCastReadReg * pBCastRdReg; // fix for solaris bug
casDGWriteReg * pWtReg;
bool sendBlocked;
void armRecv ();

View File

@@ -42,20 +42,18 @@ casIntfOS::casIntfOS ( caServerI & casIn, clientBufMemoryManager & memMgrIn,
{
this->setNonBlocking();
this->pRdReg = new casServerReg(*this);
this->pRdReg = new casServerReg ( *this );
}
casIntfOS::~casIntfOS()
{
if (this->pRdReg) {
delete this->pRdReg;
}
delete this->pRdReg;
}
void casServerReg::callBack()
{
assert(this->os.pRdReg);
this->os.cas.connectCB(this->os);
assert ( this->os.pRdReg );
this->os.cas.connectCB ( this->os );
}
casServerReg::~casServerReg()

View File

@@ -36,8 +36,8 @@ public:
void show ( unsigned level ) const;
caNetAddr serverAddress () const;
private:
caServerI & cas;
casServerReg * pRdReg;
caServerI & cas;
casServerReg * pRdReg;
casIntfOS ( const casIntfOS & );
casIntfOS & operator = ( const casIntfOS & );

View File

@@ -58,7 +58,6 @@ inline casStreamReadReg::casStreamReadReg (casStreamOS &osIn) :
//
inline casStreamReadReg::~casStreamReadReg ()
{
os.pRdReg = NULL;
# if defined(DEBUG)
printf ("Read off %d\n", this->os.getFD());
printf ("Recv backlog %u\n",
@@ -104,7 +103,6 @@ inline casStreamWriteReg::casStreamWriteReg (casStreamOS &osIn) :
//
inline casStreamWriteReg::~casStreamWriteReg ()
{
os.pWtReg = NULL;
# if defined(DEBUG)
printf ("Write off %d\n", this->os.getFD());
printf ("Recv backlog %u\n",
@@ -248,20 +246,19 @@ void casStreamIOWakeup::start ( casStreamOS &os )
inline void casStreamOS::armRecv()
{
if ( ! this->pRdReg ) {
if ( ! this->in.full() ) {
if ( ! this->inBufFull() ) {
this->pRdReg = new casStreamReadReg ( *this );
}
}
}
//
// casStreamOS::disarmRecv()
// casStreamOS::disarmRecv ()
//
inline void casStreamOS::disarmRecv()
inline void casStreamOS::disarmRecv ()
{
if ( this->pRdReg ) {
delete this->pRdReg;
}
delete this->pRdReg;
this->pRdReg = 0;
}
//
@@ -269,7 +266,7 @@ inline void casStreamOS::disarmRecv()
//
inline void casStreamOS::armSend()
{
if ( this->out.bytesPresent () == 0u ) {
if ( this->outBufBytesPresent() == 0u ) {
return;
}
@@ -283,9 +280,8 @@ inline void casStreamOS::armSend()
//
inline void casStreamOS::disarmSend ()
{
if ( this->pWtReg ) {
delete this->pWtReg;
}
delete this->pWtReg;
this->pWtReg = 0;
}
//
@@ -313,7 +309,7 @@ void casStreamOS::eventFlush()
// if there is nothing pending in the input
// queue, then flush the output queue
//
if ( this->in.bytesAvailable() == 0u ) {
if ( this->inBufBytesAvailable() == 0u ) {
this->armSend ();
}
}
@@ -349,15 +345,15 @@ casStreamOS::~casStreamOS()
//
void casStreamOS::show ( unsigned level ) const
{
this->casStrmClient::show(level);
printf("casStreamOS at %p\n",
this->casStrmClient::show ( level );
printf ( "casStreamOS at %p\n",
static_cast <const void *> ( this ) );
printf("\tsendBlocked = %d\n", this->sendBlocked);
if (this->pWtReg) {
this->pWtReg->show(level);
printf ( "\tsendBlocked = %d\n", this->sendBlocked );
if ( this->pWtReg ) {
this->pWtReg->show ( level );
}
if (this->pRdReg) {
this->pRdReg->show(level);
if ( this->pRdReg ) {
this->pRdReg->show ( level );
}
this->evWk.show ( level );
this->ioWk.show ( level );
@@ -366,7 +362,7 @@ void casStreamOS::show ( unsigned level ) const
//
// casStreamReadReg::show()
//
void casStreamReadReg::show(unsigned level) const
void casStreamReadReg::show ( unsigned level ) const
{
this->fdReg::show ( level );
printf ( "casStreamReadReg at %p\n",
@@ -395,16 +391,16 @@ void casStreamOS::recvCB ()
//
// copy in new messages
//
inBufClient::fillCondition fillCond = this->in.fill();
inBufClient::fillCondition fillCond = this->inBufFill ();
if ( fillCond == casFillDisconnect ) {
this->getCAS().destroyClient ( *this );
}
else {
casProcCond procCond = this->processInput();
if (procCond == casProcDisconnect) {
casProcCond procCond = this->processInput ();
if ( procCond == casProcDisconnect ) {
this->getCAS().destroyClient ( *this );
}
else if ( this->in.full() ) {
else if ( this->inBufFull() ) {
//
// If there isnt any space then temporarily
// stop calling this routine until problem is resolved
@@ -415,7 +411,7 @@ void casStreamOS::recvCB ()
// (casStreamReadReg is _not_ a onceOnly fdReg -
// therefore an explicit delete is required here)
//
this->disarmRecv(); // this deletes the casStreamReadReg object
this->disarmRecv (); // this deletes the casStreamReadReg object
}
}
//
@@ -427,7 +423,7 @@ void casStreamOS::recvCB ()
//
// casStreamOS::sendBlockSignal()
//
void casStreamOS::sendBlockSignal()
void casStreamOS::sendBlockSignal ()
{
this->sendBlocked = true;
this->armSend ();
@@ -436,36 +432,37 @@ void casStreamOS::sendBlockSignal()
//
// casStreamWriteReg::show()
//
void casStreamWriteReg::show(unsigned level) const
void casStreamWriteReg::show ( unsigned level ) const
{
this->fdReg::show (level);
this->fdReg::show ( level );
printf ( "casStreamWriteReg at %p\n",
static_cast <const void *> ( this ) );
}
//
// casStreamWriteReg::callBack()
// casStreamWriteReg::callBack ()
//
void casStreamWriteReg::callBack()
{
casStreamOS *pSOS = &this->os;
delete this; // allows rearm to occur if required
pSOS->sendCB();
casStreamOS * pSOS = & this->os;
pSOS->sendCB ();
//
// NO CODE HERE - see delete above
// NO CODE HERE - sendCB deletes this object
//
}
//
// casStreamOS::sendCB()
// casStreamOS::sendCB ()
//
void casStreamOS::sendCB()
void casStreamOS::sendCB ()
{
this->disarmSend ();
//
// attempt to flush the output buffer
//
outBufClient::flushCondition flushCond = this->out.flush ();
if (flushCond==flushProgress) {
outBufClient::flushCondition flushCond = this->flush ();
if ( flushCond == flushProgress ) {
if ( this->sendBlocked ) {
this->sendBlocked = false;
}
@@ -513,10 +510,10 @@ void casStreamOS::sendCB()
}
# if defined(DEBUG)
printf ("write attempted on %d result was %d\n",
this->getFD(), flushCond);
printf ("Recv backlog %u\n", this->in.bytesPresent());
printf ("Send backlog %u\n", this->out.bytesPresent());
printf ( "write attempted on %d result was %d\n",
this->getFD(), flushCond );
printf ( "Recv backlog %u\n", this->in.bytesPresent() );
printf ( "Send backlog %u\n", this->out.bytesPresent() );
# endif
//
@@ -540,8 +537,8 @@ void casStreamOS::sendCB()
// additional bytes may have been added since
// we flushed the out buffer
//
if ( this->out.bytesPresent() > 0u &&
this->in.bytesAvailable() == 0u ) {
if ( this->outBufBytesPresent() > 0u &&
this->inBufBytesAvailable() == 0u ) {
this->armSend();
}
}
@@ -574,7 +571,7 @@ casProcCond casStreamOS::processInput() // X aCC 361
// if there is nothing pending in the input
// queue, then flush the output queue
//
if ( this->in.bytesAvailable() == 0u ) {
if ( this->inBufBytesAvailable() == 0u ) {
this->armSend ();
}
this->armRecv ();

View File

@@ -14,15 +14,13 @@
#define epicsExportSharedSymbols
#include "casStreamIO.h"
//
// casStreamIO::casStreamIO()
//
casStreamIO::casStreamIO ( caServerI & cas, clientBufMemoryManager & bufMgr,
const ioArgsToNewStreamIO & args ) :
casStrmClient ( cas, bufMgr ), sock ( args.sock ), addr ( args.addr),
blockingFlag ( xIsBlocking )
blockingFlag ( xIsBlocking ), sockHasBeenClosed ( false )
{
assert (sock>=0);
assert ( sock >= 0 );
int yes = true;
int status;
@@ -30,16 +28,12 @@ casStreamIO::casStreamIO ( caServerI & cas, clientBufMemoryManager & bufMgr,
* see TCP(4P) this seems to make unsollicited single events much
* faster. I take care of queue up as load increases.
*/
status = setsockopt(
this->sock,
IPPROTO_TCP,
TCP_NODELAY,
(char *)&yes,
sizeof(yes));
if (status<0) {
errlogPrintf(
status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY,
( char * ) & yes, sizeof ( yes ) );
if ( status < 0 ) {
errlogPrintf (
"CAS: %s TCP_NODELAY option set failed %s\n",
__FILE__, SOCKERRSTR(SOCKERRNO));
__FILE__, SOCKERRSTR(SOCKERRNO) );
throw S_cas_internal;
}
@@ -47,16 +41,12 @@ casStreamIO::casStreamIO ( caServerI & cas, clientBufMemoryManager & bufMgr,
* turn on KEEPALIVE so if the client crashes
* this task will find out and exit
*/
status = setsockopt(
sock,
SOL_SOCKET,
SO_KEEPALIVE,
(char *)&yes,
sizeof(yes));
status = setsockopt ( sock, SOL_SOCKET, SO_KEEPALIVE,
(char *) & yes, sizeof ( yes ) );
if (status<0) {
errlogPrintf(
errlogPrintf (
"CAS: %s SO_KEEPALIVE option set failed %s\n",
__FILE__, SOCKERRSTR(SOCKERRNO));
__FILE__, SOCKERRSTR(SOCKERRNO) );
throw S_cas_internal;
}
@@ -99,19 +89,15 @@ casStreamIO::casStreamIO ( caServerI & cas, clientBufMemoryManager & bufMgr,
}
//
// casStreamIO::~casStreamIO()
//
casStreamIO::~casStreamIO()
{
if (sock>=0) {
socket_close(this->sock);
if ( ! this->sockHasBeenClosed ) {
socket_close ( this->sock );
}
}
//
// casStreamIO::osdSend()
//
outBufClient::flushCondition casStreamIO::osdSend ( const char *pInBuf, bufSizeT nBytesReq,
bufSizeT &nBytesActual )
{
@@ -154,9 +140,7 @@ outBufClient::flushCondition casStreamIO::osdSend ( const char *pInBuf, bufSizeT
return outBufClient::flushProgress;
}
//
// casStreamIO::osdRecv()
//
inBufClient::fillCondition
casStreamIO::osdRecv ( char * pInBuf, bufSizeT nBytes, // X aCC 361
bufSizeT & nBytesActual )
@@ -194,9 +178,23 @@ casStreamIO::osdRecv ( char * pInBuf, bufSizeT nBytes, // X aCC 361
}
}
//
// casStreamIO::forceDisconnect()
void casStreamIO::forceDisconnect ()
{
if ( ! this->sockHasBeenClosed ) {
this->sockHasBeenClosed;
int status = ::shutdown ( this->sock, SD_BOTH );
if ( status ) {
errlogPrintf ("CAC TCP socket shutdown error was %s\n",
SOCKERRSTR (SOCKERRNO) );
}
socket_close ( this->sock );
// other wakeup will be required here when we
// switch to a threaded implementation
}
}
// casStreamIO::show()
//
void casStreamIO::osdShow (unsigned level) const
{
printf ( "casStreamIO at %p\n",
@@ -210,9 +208,7 @@ void casStreamIO::osdShow (unsigned level) const
}
}
//
// casStreamIO::xSsetNonBlocking()
//
void casStreamIO::xSetNonBlocking()
{
int status;
@@ -229,17 +225,13 @@ void casStreamIO::xSetNonBlocking()
}
}
//
// casStreamIO::blockingState()
//
xBlockingStatus casStreamIO::blockingState() const
{
return this->blockingFlag;
}
//
// casStreamIO::incomingBytesPresent()
//
bufSizeT casStreamIO::incomingBytesPresent () const // X aCC 361
{
int status;
@@ -271,17 +263,13 @@ bufSizeT casStreamIO::incomingBytesPresent () const // X aCC 361
}
}
//
// casStreamIO::hostName()
//
void casStreamIO::hostName ( char *pInBuf, unsigned bufSizeIn ) const
void casStreamIO::hostName ( char * pInBuf, unsigned bufSizeIn ) const
{
ipAddrToA ( &this->addr, pInBuf, bufSizeIn );
ipAddrToA ( & this->addr, pInBuf, bufSizeIn );
}
//
// casStreamIO:::optimumBufferSize()
//
bufSizeT casStreamIO::optimumBufferSize ()
{
@@ -309,9 +297,7 @@ printf("the tcp buf size is %d\n", size);
#endif
}
//
// casStreamIO::getFD()
//
int casStreamIO::getFD() const
{
return this->sock;

View File

@@ -27,36 +27,32 @@ public:
casStreamIO ( caServerI &, clientBufMemoryManager &,
const ioArgsToNewStreamIO & );
~casStreamIO ();
int getFD () const;
void xSetNonBlocking ();
const caNetAddr getAddr() const;
void hostName ( char *pBuf, unsigned bufSize ) const;
outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual );
inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual );
xBlockingStatus blockingState() const;
bufSizeT incomingBytesPresent() const;
static bufSizeT optimumBufferSize ();
void osdShow ( unsigned level ) const;
const caNetAddr getAddr() const
{
return caNetAddr ( this->addr );
}
private:
SOCKET sock;
struct sockaddr_in addr;
xBlockingStatus blockingFlag;
bool sockHasBeenClosed;
xBlockingStatus blockingState() const;
bufSizeT incomingBytesPresent() const;
static bufSizeT optimumBufferSize ();
void osdShow ( unsigned level ) const;
outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual );
inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual );
void forceDisconnect ();
casStreamIO ( const casStreamIO & );
casStreamIO & operator = ( const casStreamIO & );
};
inline const caNetAddr casStreamIO::getAddr() const
{
return caNetAddr ( this->addr );
}
#endif // casStreamIOh