large array modifications

This commit is contained in:
Jeff Hill
2002-05-29 00:06:02 +00:00
parent 8caf6b3793
commit 565a372df1
23 changed files with 1309 additions and 1173 deletions

View File

@@ -62,7 +62,8 @@ caServerI::caServerI (caServer &tool) :
adapter (tool),
debugLevel (0u),
nEventsProcessed (0u),
nEventsPosted (0u)
nEventsPosted (0u),
beaconCounter (0u)
{
caStatus status;
double maxPeriod;
@@ -239,10 +240,12 @@ void caServerI::sendBeacon()
epicsGuard < epicsMutex > locker ( this->mutex );
tsDLIterBD <casIntfOS> iter = this->intfList.firstIter ();
while ( iter.valid () ) {
iter->sendBeacon ();
iter->sendBeacon ( this->beaconCounter );
iter++;
}
}
this->beaconCounter++;
//
// double the period between beacons (but dont exceed max)

View File

@@ -37,26 +37,23 @@
//
// casAsyncReadIO::casAsyncReadIO()
//
casAsyncReadIO::casAsyncReadIO(const casCtx &ctx) :
casAsyncIOI(*ctx.getClient()),
msg(*ctx.getMsg()),
chan(*ctx.getChannel()),
pDD(NULL),
completionStatus(S_cas_internal)
casAsyncReadIO::casAsyncReadIO ( const casCtx & ctx ) :
casAsyncIOI ( *ctx.getClient() ), msg ( *ctx.getMsg() ),
chan( *ctx.getChannel () ), pDD ( NULL ), completionStatus ( S_cas_internal )
{
assert (&this->chan);
assert ( &this->chan );
this->chan.installAsyncIO(*this);
this->chan.installAsyncIO ( *this );
}
//
// casAsyncReadIO::~casAsyncReadIO()
//
casAsyncReadIO::~casAsyncReadIO()
casAsyncReadIO::~casAsyncReadIO ()
{
this->lock();
this->chan.removeAsyncIO(*this);
this->chan.removeAsyncIO ( *this );
this->unlock();
}

View File

@@ -0,0 +1,118 @@
/*
* $Id$
*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*
* Experimental Physics and Industrial Control System (EPICS)
*
* Copyright 1991, the Regents of the University of California,
* and the University of Chicago Board of Governors.
*
* This software was produced under U.S. Government contracts:
* (W-7405-ENG-36) at the Los Alamos National Laboratory,
* and (W-31-109-ENG-38) at Argonne National Laboratory.
*
* Initial development by:
* The Controls and Automation Group (AT-8)
* Ground Test Accelerator
* Accelerator Technology Division
* Los Alamos National Laboratory
*
* Co-developed with
* The Controls and Computing Group
* Accelerator Systems Division
* Advanced Photon Source
* Argonne National Laboratory
*
*/
#include <new>
#include "envdefs.h"
#include "freelist.h"
#include "server.h"
epicsSingleton < casBufferFactory > pGlobalBufferFactoryCAS;
casBufferFactory::casBufferFactory () :
smallBufFreeList ( 0 ), largeBufFreeList ( 0 ), largeBufferSizePriv ( 0u )
{
long maxBytesAsALong;
long status = envGetLongConfigParam ( & EPICS_CA_MAX_ARRAY_BYTES, & maxBytesAsALong );
if ( status || maxBytesAsALong < 0 ) {
errlogPrintf ( "cas: EPICS_CA_MAX_ARRAY_BYTES was not a positive integer\n" );
this->largeBufferSizePriv = MAX_TCP;
}
else {
/* allow room for the protocol header so that they get the array size they requested */
static const unsigned headerSize = sizeof ( caHdr ) + 2 * sizeof ( ca_uint32_t );
ca_uint32_t maxBytes = ( unsigned ) maxBytesAsALong;
if ( maxBytes < 0xffffffff - headerSize ) {
maxBytes += headerSize;
}
else {
maxBytes = 0xffffffff;
}
if ( maxBytes < MAX_TCP ) {
errlogPrintf ( "cas: EPICS_CA_MAX_ARRAY_BYTES was rounded up to %u\n", MAX_TCP );
this->largeBufferSizePriv = MAX_TCP;
}
else {
this->largeBufferSizePriv = maxBytes;
}
}
freeListInitPvt ( & this->smallBufFreeList, MAX_MSG_SIZE, 8 );
freeListInitPvt ( & this->largeBufFreeList, this->largeBufferSizePriv, 1 );
}
casBufferFactory::~casBufferFactory ()
{
freeListCleanup ( this->smallBufFreeList );
freeListCleanup ( this->largeBufFreeList );
}
unsigned casBufferFactory::smallBufferSize () const
{
return MAX_MSG_SIZE;
}
char * casBufferFactory::newSmallBuffer ()
{
void * pBuf = freeListCalloc ( this->smallBufFreeList );
if ( ! pBuf ) {
throw std::bad_alloc();
}
return static_cast < char * > ( pBuf );
}
void casBufferFactory::destroySmallBuffer ( char * pBuf )
{
if ( pBuf ) {
freeListFree ( this->smallBufFreeList, pBuf );
}
}
unsigned casBufferFactory::largeBufferSize () const
{
return this->largeBufferSizePriv;
}
char * casBufferFactory::newLargeBuffer ()
{
void * pBuf = freeListCalloc ( this->largeBufFreeList );
if ( ! pBuf ) {
throw std::bad_alloc();
}
return static_cast < char * > ( pBuf );
}
void casBufferFactory::destroyLargeBuffer ( char * pBuf )
{
if ( pBuf ) {
freeListFree ( this->largeBufFreeList, pBuf );
}
}

View File

@@ -204,4 +204,23 @@ void casChannelI::destroyClientNotify ()
this->destroy();
}
//
// casChannelI::findMonitor
// (it is reasonable to do a linear search here because
// sane clients will require only one or two monitors
// per channel)
//
tsDLIterBD <casMonitor> casChannelI::findMonitor (const caResId clientIdIn)
{
this->lock ();
tsDLIterBD <casMonitor> iter = this->monitorList.firstIter ();
while ( iter.valid () ) {
if ( clientIdIn == iter->getClientId () ) {
break;
}
iter++;
}
this->unlock ();
return iter;
}

View File

@@ -96,26 +96,6 @@ inline void casChannelI::addMonitor(casMonitor &mon)
this->unlock();
}
//
// casChannelI::findMonitor
// (it is reasonable to do a linear search here because
// sane clients will require only one or two monitors
// per channel)
//
inline tsDLIterBD <casMonitor> casChannelI::findMonitor (const caResId clientIdIn)
{
this->lock ();
tsDLIterBD <casMonitor> iter = this->monitorList.firstIter ();
while ( iter.valid () ) {
if ( clientIdIn == iter->getClientId () ) {
break;
}
iter++;
}
this->unlock ();
return iter;
}
//
// casChannelI::destroyNoClientNotify()
//

View File

@@ -51,9 +51,9 @@ casClient::pCASMsgHandler casClient::msgHandlers[CA_PROTO_LAST_CMMD+1u];
//
// casClient::casClient()
//
casClient::casClient(caServerI &serverInternal, bufSizeT ioSizeMinIn) :
casCoreClient (serverInternal),
inBuf (MAX_MSG_SIZE, ioSizeMinIn), outBuf (MAX_MSG_SIZE)
casClient::casClient ( caServerI &serverInternal, bufSizeT ioSizeMinIn ) :
casCoreClient ( serverInternal ), in ( *this, ioSizeMinIn ), out ( *this ),
minor_version_number ( 0 ), incommingBytesToDrain ( 0 )
{
//
// static member init
@@ -183,8 +183,8 @@ void casClient::show (unsigned level) const
printf ( "casClient at %p\n",
static_cast <const void *> ( this ) );
this->casCoreClient::show (level);
this->inBuf::show (level);
this->outBuf::show (level);
this->in.show (level);
this->out.show (level);
}
//
@@ -192,88 +192,122 @@ void casClient::show (unsigned level) const
//
caStatus casClient::processMsg ()
{
unsigned msgsize;
unsigned bytesLeft;
int status;
const caHdr *mp;
const char *rawMP;
// drain message that does not fit
if ( this->incommingBytesToDrain ) {
unsigned bytesLeft = this->in.bytesPresent();
if ( bytesLeft < this->incommingBytesToDrain ) {
this->in.removeMsg ( bytesLeft );
this->incommingBytesToDrain -= bytesLeft;
return S_cas_success;
}
else {
this->in.removeMsg ( this->incommingBytesToDrain );
this->incommingBytesToDrain = 0u;
}
}
//
// process any messages in the in buffer
//
status = S_cas_success;
int status = S_cas_success;
while ( (bytesLeft = this->inBuf::bytesPresent()) ) {
unsigned bytesLeft;
while ( ( bytesLeft = this->in.bytesPresent() ) ) {
caHdrLargeArray msgTmp;
unsigned msgSize;
ca_uint32_t hdrSize;
char * rawMP;
{
//
// copy as raw bytes in order to avoid
// alignment problems
//
caHdr smallHdr;
if ( bytesLeft < sizeof ( smallHdr ) ) {
break;
}
/*
* incomplete message - return success and
* wait for more bytes to arrive
*/
if (bytesLeft < sizeof(*mp)) {
status = S_cas_success;
break;
}
rawMP = this->in.msgPtr ();
memcpy ( & smallHdr, rawMP, sizeof ( smallHdr ) );
rawMP = this->inBuf::msgPtr();
this->ctx.setMsg(rawMP);
ca_uint32_t payloadSize = epicsNTOH16 ( smallHdr.m_postsize );
ca_uint32_t nElem = epicsNTOH16 ( smallHdr.m_count );
if ( payloadSize != 0xffff && nElem != 0xffff ) {
hdrSize = sizeof ( smallHdr );
}
else {
ca_uint32_t LWA[2];
hdrSize = sizeof ( smallHdr ) + sizeof ( LWA );
if ( bytesLeft < hdrSize ) {
break;
}
//
// copy as raw bytes in order to avoid
// alignment problems
//
memcpy ( LWA, rawMP + sizeof ( caHdr ), sizeof( LWA ) );
payloadSize = epicsNTOH32 ( LWA[0] );
nElem = epicsNTOH32 ( LWA[1] );
}
//
// get pointer to msg copy in local byte order
//
mp = this->ctx.getMsg();
msgTmp.m_cmmd = epicsNTOH16 ( smallHdr.m_cmmd );
msgTmp.m_postsize = payloadSize;
msgTmp.m_dataType = epicsNTOH16 ( smallHdr.m_dataType );
msgTmp.m_count = nElem;
msgTmp.m_cid = epicsNTOH32 ( smallHdr.m_cid );
msgTmp.m_available = epicsNTOH32 ( smallHdr.m_available );
msgsize = mp->m_postsize + sizeof(*mp);
/*
* incomplete message - return success and
* wait for more bytes to arrive
*/
if (msgsize > bytesLeft) {
status = S_cas_success;
break;
}
msgSize = hdrSize + payloadSize;
if ( bytesLeft < msgSize ) {
if ( msgSize > this->in.bufferSize() ) {
this->in.expandBuffer ();
// msg to large - set up message drain
if ( msgSize > this->in.bufferSize() ) {
this->dumpMsg ( & msgTmp, 0,
"The client requested transfer is greater than available "
"memory in server or EPICS_CA_MAX_ARRAY_BYTES\n" );
status = this->sendErr ( & msgTmp, ECA_TOLARGE,
"request didnt fit within the CA server's message buffer" );
this->in.removeMsg ( bytesLeft );
this->incommingBytesToDrain = msgSize - bytesLeft;
}
}
break;
}
this->ctx.setData((void *)(rawMP+sizeof(*mp)));
this->ctx.setMsg ( msgTmp, rawMP + hdrSize );
if (this->getCAS().getDebugLevel()> 2u) {
this->dumpMsg (mp, (void *)(mp+1), NULL);
}
if ( this->getCAS().getDebugLevel() > 2u ) {
this->dumpMsg ( & msgTmp, rawMP + hdrSize, 0 );
}
}
//
// Reset the context to the default
// (guarantees that previous message does not get mixed
// up with the current message)
//
this->ctx.setChannel (NULL);
this->ctx.setPV (NULL);
this->ctx.setChannel ( NULL );
this->ctx.setPV ( NULL );
//
// Check for bad protocol element
// Call protocol stub
//
if (mp->m_cmmd >= NELEMENTS(casClient::msgHandlers)){
status = this->uknownMessageAction ();
pCASMsgHandler pHandler;
if ( msgTmp.m_cmmd < NELEMENTS ( casClient::msgHandlers ) ) {
pHandler = this->casClient::msgHandlers[msgTmp.m_cmmd];
}
else {
pHandler = & casClient::uknownMessageAction;
}
status = ( this->*pHandler ) ();
if ( status ) {
break;
}
//
// Call protocol stub
//
try {
status = (this->*casClient::msgHandlers[mp->m_cmmd]) ();
if (status) {
break;
}
}
catch (...) {
this->dumpMsg (mp, this->ctx.getData(), "request resulted in C++ exception\n");
status = this->sendErr (mp, ECA_INTERNAL, "request resulted in C++ exception");
if (status) {
break;
}
break;
}
this->inBuf::removeMsg (msgsize);
this->in.removeMsg ( msgSize );
}
return status;
@@ -339,22 +373,17 @@ caStatus casClient::hostNameAction ()
//
caStatus casClient::echoAction ()
{
const caHdr *mp = this->ctx.getMsg();
void *dp = this->ctx.getData();
int status;
caHdr *reply;
status = this->outBuf::allocMsg (mp->m_postsize, &reply);
if (status) {
if (status==S_cas_hugeRequest) {
status = sendErr(mp, ECA_TOLARGE, NULL);
}
return status;
}
*reply = *mp;
memcpy((char *) (reply+1), (char *) dp, mp->m_postsize);
this->outBuf::commitMsg();
const caHdrLargeArray * mp = this->ctx.getMsg();
const void * dp = this->ctx.getData();
void * pPayloadOut;
caStatus status = this->out.copyInHeader ( mp->m_cmmd, mp->m_postsize,
mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available,
& pPayloadOut );
if ( ! status ) {
memcpy ( pPayloadOut, dp, mp->m_postsize );
this->out.commitMsg ();
}
return S_cas_success;
}
@@ -369,66 +398,48 @@ caStatus casClient::noopAction ()
// send minor protocol revision to the client
void casClient::sendVersion ()
{
caHdr * pReply;
caStatus status = this->allocMsg ( 0, &pReply );
if ( status ) {
return;
caStatus status = this->out.copyInHeader ( CA_PROTO_VERSION, 0,
0, CA_MINOR_PROTOCOL_REVISION, 0, 0, 0 );
if ( ! status ) {
this->out.commitMsg ();
}
memset ( pReply, '\0', sizeof ( *pReply ) );
pReply->m_cmmd = CA_PROTO_VERSION;
pReply->m_count = CA_MINOR_PROTOCOL_REVISION;
this->commitMsg ();
}
//
// casClient::sendErr()
//
caStatus casClient::sendErr(
const caHdr *curp,
const int reportedStatus,
const char *pformat,
...
)
caStatus casClient::sendErr ( const caHdrLargeArray *curp, const int reportedStatus,
const char *pformat, ... )
{
va_list args;
casChannelI *pciu;
unsigned size;
caHdr *reply;
char *pMsgString;
int status;
unsigned stringSize;
char msgBuf[1024]; /* allocate plenty of space for the message string */
if (pformat) {
va_start (args, pformat);
status = vsprintf (msgBuf, pformat, args);
if (status<0) {
if ( pformat ) {
va_list args;
va_start ( args, pformat );
int status = vsprintf (msgBuf, pformat, args);
if ( status < 0 ) {
errPrintf (S_cas_internal, __FILE__, __LINE__,
"bad sendErr(%s)", pformat);
size = 0u;
stringSize = 0u;
}
else {
size = 1u + (unsigned) status;
stringSize = 1u + (unsigned) status;
}
}
else {
size = 0u;
stringSize = 0u;
}
/*
* allocate plenty of space for a sprintf() buffer
*/
status = this->outBuf::allocMsg (size+sizeof(caHdr), &reply);
if (status) {
return status;
}
unsigned hdrSize = sizeof ( caHdr );
if ( ( curp->m_postsize >= 0xffff || curp->m_count >= 0xffff ) &&
CA_V49( this->minor_version_number ) ) {
hdrSize += 2 * sizeof ( ca_uint32_t );
}
reply[0] = nill_msg;
reply[0].m_cmmd = CA_PROTO_ERROR;
reply[0].m_available = reportedStatus;
switch (curp->m_cmmd) {
ca_uint32_t cid = 0u;
switch ( curp->m_cmmd ) {
case CA_PROTO_SEARCH:
reply->m_cid = curp->m_cid;
cid = curp->m_cid;
break;
case CA_PROTO_EVENT_ADD:
@@ -437,52 +448,70 @@ const char *pformat,
case CA_PROTO_READ_NOTIFY:
case CA_PROTO_WRITE:
case CA_PROTO_WRITE_NOTIFY:
/*
* Verify the channel
*/
pciu = this->resIdToChannel(curp->m_cid);
if(pciu){
reply->m_cid = pciu->getCID();
}
else{
reply->m_cid = ~0u;
}
break;
{
/*
* Verify the channel
*/
casChannelI * pciu = this->resIdToChannel ( curp->m_cid );
if ( pciu ) {
cid = pciu->getCID();
}
else{
cid = ~0u;
}
break;
}
case CA_PROTO_EVENTS_ON:
case CA_PROTO_EVENTS_OFF:
case CA_PROTO_READ_SYNC:
case CA_PROTO_SNAPSHOT:
default:
reply->m_cid = (caResId) ~0UL;
cid = (caResId) ~0UL;
break;
}
/*
* copy back the request protocol
* (in network byte order)
*/
reply[1].m_postsize = epicsHTON16 (curp->m_postsize);
reply[1].m_cmmd = epicsHTON16 (curp->m_cmmd);
reply[1].m_dataType = epicsHTON16 (curp->m_dataType);
reply[1].m_count = epicsHTON16 (curp->m_count);
reply[1].m_cid = epicsHTON32 (curp->m_cid);
reply[1].m_available = epicsHTON32 (curp->m_available);
caHdr * pReqOut;
caStatus status = this->out.copyInHeader ( CA_PROTO_ERROR,
hdrSize + stringSize, 0, 0, cid, reportedStatus,
reinterpret_cast <void **> ( & pReqOut ) );
if ( ! status ) {
char * pMsgString;
/*
* add their optional context string into the protocol
*/
if (size>0u) {
pMsgString = (char *) (reply+2u);
strncpy (pMsgString, msgBuf, size-1u);
pMsgString[size-1u] = '\0';
}
/*
* copy back the request protocol
* (in network byte order)
*/
if ( ( curp->m_postsize >= 0xffff || curp->m_count >= 0xffff ) &&
CA_V49( this->minor_version_number ) ) {
ca_uint32_t *pLW = ( ca_uint32_t * ) ( pReqOut + 1 );
pReqOut->m_cmmd = htons ( curp->m_cmmd );
pReqOut->m_postsize = htons ( 0xffff );
pReqOut->m_dataType = htons ( curp->m_dataType );
pReqOut->m_count = htons ( 0u );
pReqOut->m_cid = htonl ( curp->m_cid );
pReqOut->m_available = htonl ( curp->m_available );
pLW[0] = htonl ( curp->m_postsize );
pLW[1] = htonl ( curp->m_count );
pMsgString = ( char * ) ( pLW + 2 );
}
else {
pReqOut->m_cmmd = htons (curp->m_cmmd);
pReqOut->m_postsize = htons ( ( (ca_uint16_t) curp->m_postsize ) );
pReqOut->m_dataType = htons (curp->m_dataType);
pReqOut->m_count = htons ( ( (ca_uint16_t) curp->m_count ) );
pReqOut->m_cid = htonl (curp->m_cid);
pReqOut->m_available = htonl (curp->m_available);
pMsgString = ( char * ) ( pReqOut + 1 );
}
size += sizeof(caHdr);
assert ( size < 0xffff );
reply->m_postsize = static_cast <ca_uint16_t> ( size );
/*
* add their context string into the protocol
*/
memcpy ( pMsgString, msgBuf, stringSize );
this->outBuf::commitMsg();
this->out.commitMsg ();
}
return S_cas_success;
}
@@ -493,52 +522,44 @@ const char *pformat,
* same as sendErr() except that we convert epicsStatus
* to a string and send that additional detail
*/
caStatus casClient::sendErrWithEpicsStatus(const caHdr *pMsg,
caStatus epicsStatus, caStatus clientStatus)
caStatus casClient::sendErrWithEpicsStatus ( const caHdrLargeArray * pMsg,
caStatus epicsStatus, caStatus clientStatus )
{
long status;
char buf[0x1ff];
status = errSymFind(epicsStatus, buf);
status = errSymFind ( epicsStatus, buf );
if (status) {
sprintf(buf, "UKN error code = 0X%u\n",
epicsStatus);
sprintf ( buf, "UKN error code = 0X%u\n",
epicsStatus );
}
return this->sendErr(pMsg, clientStatus, buf);
return this->sendErr ( pMsg, clientStatus, buf );
}
/*
* casClient::logBadIdWithFileAndLineno()
*/
caStatus casClient::logBadIdWithFileAndLineno(
const caHdr *mp,
const void *dp,
const int cacStatus,
const char *pFileName,
const unsigned lineno,
const unsigned id
caStatus casClient::logBadIdWithFileAndLineno ( const caHdrLargeArray * mp,
const void * dp, const int cacStatus, const char * pFileName,
const unsigned lineno, const unsigned id
)
{
int status;
if (pFileName) {
this->dumpMsg (mp, dp,
if ( pFileName) {
this-> dumpMsg ( mp, dp,
"bad resource id in \"%s\" at line %d\n",
pFileName, lineno);
pFileName, lineno );
}
else {
this->dumpMsg (mp, dp,
"bad resource id\n");
this->dumpMsg ( mp, dp,
"bad resource id\n" );
}
status = this->sendErr (
mp,
cacStatus,
"Bad Resource ID=%u detected at %s.%d",
id,
pFileName,
lineno);
mp, cacStatus, "Bad Resource ID=%u detected at %s.%d",
id, pFileName, lineno);
return status;
}
@@ -551,21 +572,22 @@ const unsigned id
// dp arg allowed to be null
//
//
void casClient::dumpMsg(const caHdr *mp, const void *dp, const char *pFormat, ...)
void casClient::dumpMsg ( const caHdrLargeArray *mp,
const void *dp, const char *pFormat, ... )
{
casChannelI *pciu;
char pName[64u];
char pPVName[64u];
va_list theArgs;
if (pFormat) {
va_start (theArgs, pFormat);
errlogPrintf ("CAS: ");
errlogVprintf (pFormat, theArgs);
va_end (theArgs);
if ( pFormat ) {
va_start ( theArgs, pFormat );
errlogPrintf ( "CAS: " );
errlogVprintf ( pFormat, theArgs );
va_end ( theArgs );
}
this->clientHostName (pName, sizeof (pName));
this->hostName (pName, sizeof (pName));
pciu = this->resIdToChannel(mp->m_cid);
@@ -591,8 +613,8 @@ void casClient::dumpMsg(const caHdr *mp, const void *dp, const char *pFormat, ..
mp->m_postsize,
mp->m_available);
if (mp->m_cmmd==CA_PROTO_WRITE && mp->m_dataType==DBR_STRING && dp) {
errlogPrintf("CAS: The string written: %s \n", (char *)dp);
if ( mp->m_cmmd == CA_PROTO_WRITE && mp->m_dataType == DBR_STRING && dp ) {
errlogPrintf("CAS: The string written: %s \n", (char *)dp);
}
}

View File

@@ -29,6 +29,9 @@
*
* History
* $Log$
* Revision 1.8 2002/02/06 02:28:24 jhill
* fixed warnings
*
* Revision 1.7 2001/01/11 21:54:53 jhill
* accomodate Marty's osi => epics name changes
*
@@ -83,11 +86,11 @@ casClientMon::~casClientMon()
//
// casClientMon::callBack()
//
caStatus casClientMon::callBack (const smartConstGDDPointer &value)
caStatus casClientMon::callBack ( const smartConstGDDPointer &value )
{
casCoreClient &client = this->getChannel().getClient();
casCoreClient & client = this->getChannel().getClient();
caStatus status;
caHdr msg;
caHdrLargeArray msg;
//
// reconstruct the msg header
@@ -98,13 +101,13 @@ caStatus casClientMon::callBack (const smartConstGDDPointer &value)
assert ( type <= 0xffff );
msg.m_dataType = static_cast <ca_uint16_t> ( type );
unsigned long count = this->getCount();
assert ( count <= 0xffff );
msg.m_count = static_cast <ca_uint16_t> ( count );
assert ( count <= 0xffffffff );
msg.m_count = static_cast <ca_uint32_t> ( count );
msg.m_cid = this->getChannel().getSID();
msg.m_available = this->getClientId();
status = client.monitorResponse (this->getChannel(),
msg, value, S_cas_success);
status = client.monitorResponse ( this->getChannel(),
msg, value, S_cas_success );
return status;
}

View File

@@ -106,36 +106,36 @@ void casCoreClient::show (unsigned level) const
// asynchronous completion
//
caStatus casCoreClient::asyncSearchResponse (
const caNetAddr &, const caHdr &, const pvExistReturn &)
const caNetAddr &, const caHdrLargeArray &, const pvExistReturn & )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::createChanResponse (const caHdr &, const pvAttachReturn &)
caStatus casCoreClient::createChanResponse ( const caHdrLargeArray &, const pvAttachReturn & )
{
return S_casApp_noSupport;
}
caStatus casCoreClient::readResponse (casChannelI *, const caHdr &,
caStatus casCoreClient::readResponse (casChannelI *, const caHdrLargeArray &,
const smartConstGDDPointer &, const caStatus)
{
return S_casApp_noSupport;
}
caStatus casCoreClient::readNotifyResponse (casChannelI *, const caHdr &,
caStatus casCoreClient::readNotifyResponse (casChannelI *, const caHdrLargeArray &,
const smartConstGDDPointer &, const caStatus)
{
return S_casApp_noSupport;
}
caStatus casCoreClient::writeResponse (const caHdr &,
caStatus casCoreClient::writeResponse (const caHdrLargeArray &,
const caStatus)
{
return S_casApp_noSupport;
}
caStatus casCoreClient::writeNotifyResponse (const caHdr &,
caStatus casCoreClient::writeNotifyResponse (const caHdrLargeArray &,
const caStatus)
{
return S_casApp_noSupport;
}
caStatus casCoreClient::monitorResponse (casChannelI &, const caHdr &,
const smartConstGDDPointer &, const caStatus)
caStatus casCoreClient::monitorResponse ( casChannelI &, const caHdrLargeArray &,
const smartConstGDDPointer &, const caStatus )
{
return S_casApp_noSupport;
}

View File

@@ -17,15 +17,15 @@ inline casCtx::casCtx() :
//
// casCtx::getMsg()
//
inline const caHdr *casCtx::getMsg() const
inline const caHdrLargeArray * casCtx::getMsg() const
{
return &this->msg;
return & this->msg;
}
//
// casCtx::getData()
//
inline void *casCtx::getData() const
inline void * casCtx::getData() const
{
return this->pData;
}
@@ -64,29 +64,11 @@ inline casChannelI * casCtx::getChannel() const
//
// casCtx::setMsg()
// (assumes incoming message is in network byte order)
//
inline void casCtx::setMsg(const char *pBuf)
inline void casCtx::setMsg ( caHdrLargeArray &msgIn, void * pBody )
{
//
// copy as raw bytes in order to avoid
// alignment problems
//
memcpy (&this->msg, pBuf, sizeof(this->msg));
this->msg.m_cmmd = epicsNTOH16 (this->msg.m_cmmd);
this->msg.m_postsize = epicsNTOH16 (this->msg.m_postsize);
this->msg.m_dataType = epicsNTOH16 (this->msg.m_dataType);
this->msg.m_count = epicsNTOH16 (this->msg.m_count);
this->msg.m_cid = epicsNTOH32 (this->msg.m_cid);
this->msg.m_available = epicsNTOH32 (this->msg.m_available);
}
//
// casCtx::setData()
//
inline void casCtx::setData(void *p)
{
this->pData = p;
this->msg = msgIn;
this->pData = pBody;
}
//

View File

@@ -32,8 +32,8 @@
#include "server.h"
#include "caServerIIL.h" // caServerI inline func
#include "inBufIL.h" // inline functions for inBuf
#include "outBufIL.h" // inline func for outBuf
#include "dgInBufIL.h" // dgInBuf inline func
#include "casCtxIL.h" // casCtx inline func
#include "casCoreClientIL.h" // casCoreClient inline func
#include "osiPoolStatus.h" // osi pool monitoring functions
@@ -75,7 +75,7 @@ void casDGClient::show (unsigned level) const
static_cast <const void *> ( this ) );
if (level>=1u) {
char buf[64];
this->clientHostName (buf, sizeof(buf));
this->hostName (buf, sizeof(buf));
printf ("Client Host=%s\n", buf);
}
this->casClient::show (level);
@@ -86,10 +86,10 @@ void casDGClient::show (unsigned level) const
//
caStatus casDGClient::uknownMessageAction ()
{
const caHdr *mp = this->ctx.getMsg();
const caHdrLargeArray * mp = this->ctx.getMsg();
this->dumpMsg (mp, this->ctx.getData(),
"bad request code=%u in DG\n", mp->m_cmmd);
this->dumpMsg ( mp, this->ctx.getData(),
"bad request code=%u in DG\n", mp->m_cmmd );
return S_cas_internal;
}
@@ -99,25 +99,41 @@ caStatus casDGClient::uknownMessageAction ()
//
caStatus casDGClient::searchAction()
{
const caHdr *mp = this->ctx.getMsg();
void *dp = this->ctx.getData();
char *pChanName = (char *) dp;
caStatus status;
const caHdrLargeArray *mp = this->ctx.getMsg();
const char *pChanName = static_cast <char * > ( this->ctx.getData() );
caStatus status;
//
// check the sanity of the message
//
if (mp->m_postsize<=1) {
this->dumpMsg (mp, dp,
"empty PV name in UDP search request?\n");
if ( mp->m_postsize <= 1 ) {
this->dumpMsg ( mp, this->ctx.getData(),
"empty PV name extension in UDP search request?\n" );
return S_cas_success;
}
pChanName[mp->m_postsize-1] = '\0';
if (this->getCAS().getDebugLevel()>2u) {
char pName[64u];
this->clientHostName (pChanName, sizeof (pChanName));
printf("%s is searching for \"%s\"\n", pName, pChanName);
if ( pChanName[0] == '\0' ) {
this->dumpMsg ( mp, this->ctx.getData(),
"zero length PV name in UDP search request?\n" );
return S_cas_success;
}
// check for an unterminated string before calling server tool
// by searching backwards through the string (some early versions
// of the client library might not be setting the pad bytes to nill)
for ( unsigned i = mp->m_postsize-1; pChanName[i] != '\0'; i-- ) {
if ( i <= 1 ) {
this->dumpMsg ( mp, this->ctx.getData(),
"unterminated PV name in UDP search request?\n" );
return S_cas_success;
}
}
if ( this->getCAS().getDebugLevel() > 2u ) {
char pHostName[64u];
this->hostName ( pHostName, sizeof ( pHostName ) );
printf ( "\"%s\" is searching for \"%s\"\n",
pHostName, pChanName );
}
//
@@ -156,7 +172,7 @@ caStatus casDGClient::searchAction()
//
switch (pver.getStatus()) {
case pverExistsHere:
status = this->searchResponse (*mp, pver);
status = this->searchResponse (*mp, pver);
break;
case pverDoesNotExistHere:
@@ -182,18 +198,15 @@ caStatus casDGClient::searchAction()
//
// caStatus casDGClient::searchResponse()
//
caStatus casDGClient::searchResponse(const caHdr &msg,
const pvExistReturn &retVal)
caStatus casDGClient::searchResponse ( const caHdrLargeArray & msg,
const pvExistReturn & retVal )
{
caStatus status;
caHdr *search_reply;
struct sockaddr_in ina;
unsigned short *pMinorVersion;
caStatus status;
//
// normal search failure is ignored
//
if (retVal.getStatus()==pverDoesNotExistHere) {
if ( retVal.getStatus() == pverDoesNotExistHere ) {
return S_cas_success;
}
@@ -214,7 +227,7 @@ caStatus casDGClient::searchResponse(const caHdr &msg,
if ( !CA_V44(msg.m_count) ) {
if (this->getCAS().getDebugLevel()>0u) {
char pName[64u];
this->clientHostName (pName, sizeof (pName));
this->hostName (pName, sizeof (pName));
printf("client \"%s\" using EPICS R3.11 CA connect protocol was ignored\n", pName);
}
//
@@ -226,30 +239,23 @@ caStatus casDGClient::searchResponse(const caHdr &msg,
"R3.11 connect sequence from old client was ignored");
return status;
}
//
// obtain space for the reply message
//
status = this->allocMsg(sizeof(*pMinorVersion), &search_reply);
if (status) {
return status;
}
*search_reply = msg;
search_reply->m_postsize = sizeof(*pMinorVersion);
//
// cid field is abused to carry the IP
// address in CA_V48 or higher
// (this allows a CA servers to serve
// as a directory service)
//
// type field is abused to carry the IP
// data type field is abused to carry the IP
// port number here CA_V44 or higher
// (this allows multiple CA servers on one
// host)
//
if (CA_V48(msg.m_count)) {
if (retVal.addrIsValid()) {
ca_uint32_t serverAddr;
ca_uint16_t serverPort;
if ( CA_V48( msg.m_count ) ) {
struct sockaddr_in ina;
if ( retVal.addrIsValid() ) {
caNetAddr addr = retVal.getAddr();
ina = addr.getSockIP();
//
@@ -280,27 +286,30 @@ caStatus casDGClient::searchResponse(const caHdr &msg,
ina.sin_addr.s_addr = epicsNTOH32 (~0U);
}
}
search_reply->m_cid = epicsNTOH32 (ina.sin_addr.s_addr);
search_reply->m_dataType = epicsNTOH16 (ina.sin_port);
serverAddr = epicsNTOH32 (ina.sin_addr.s_addr);
serverPort = epicsNTOH16 (ina.sin_port);
}
else {
caNetAddr addr = this->serverAddress ();
struct sockaddr_in inetAddr = addr.getSockIP();
search_reply->m_cid = ~0U;
search_reply->m_dataType = epicsNTOH16 (inetAddr.sin_port);
serverAddr = ~0U;
serverPort = epicsNTOH16 ( inetAddr.sin_port );
}
search_reply->m_count = 0ul;
ca_uint16_t * pMinorVersion;
status = this->out.copyInHeader ( CA_PROTO_SEARCH,
sizeof ( *pMinorVersion ), serverPort, 0,
serverAddr, msg.m_available,
reinterpret_cast <void **> ( &pMinorVersion ) );
//
// Starting with CA V4.1 the minor version number
// is appended to the end of each search reply.
// This value is ignored by earlier clients.
//
pMinorVersion = (unsigned short *) (search_reply+1);
*pMinorVersion = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION );
this->commitMsg();
this->out.commitMsg ();
return S_cas_success;
}
@@ -310,21 +319,14 @@ caStatus casDGClient::searchResponse(const caHdr &msg,
// (only when requested by the client
// - when it isnt a reply to a broadcast)
//
caStatus casDGClient::searchFailResponse(const caHdr *mp)
caStatus casDGClient::searchFailResponse ( const caHdrLargeArray * mp )
{
int status;
caHdr *reply;
status = this->allocMsg(0u, &reply);
if(status){
return status;
}
status = this->out.copyInHeader ( CA_PROTO_NOT_FOUND, 0,
mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available, 0 );
*reply = *mp;
reply->m_cmmd = CA_PROTO_NOT_FOUND;
reply->m_postsize = 0u;
this->commitMsg();
this->out.commitMsg ();
return S_cas_success;
}
@@ -333,7 +335,7 @@ caStatus casDGClient::searchFailResponse(const caHdr *mp)
// casDGClient::sendBeacon()
// (implemented here because this has knowledge of the protocol)
//
void casDGClient::sendBeacon ()
void casDGClient::sendBeacon ( ca_uint32_t beaconNumber )
{
union {
caHdr msg;
@@ -343,8 +345,10 @@ void casDGClient::sendBeacon ()
//
// create the message
//
memset ( &buf, 0, sizeof (msg) );
msg.m_cmmd = epicsHTON16 (CA_PROTO_RSRV_IS_UP);
memset ( & buf, 0, sizeof ( msg ) );
msg.m_cmmd = epicsHTON16 ( CA_PROTO_RSRV_IS_UP );
msg.m_dataType = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION );
msg.m_cid = epicsHTON32 ( beaconNumber );
//
// send it to all addresses on the beacon list,
@@ -357,61 +361,61 @@ void casDGClient::sendBeacon ()
//
// casDGClient::xSend()
//
outBuf::flushCondition casDGClient::xSend (char *pBufIn, // X aCC 361
outBufClient::flushCondition casDGClient::xSend ( char *pBufIn, // X aCC 361
bufSizeT nBytesAvailableToSend, bufSizeT nBytesNeedToBeSent,
bufSizeT &nBytesSent)
bufSizeT &nBytesSent )
{
outBuf::flushCondition stat;
outBufClient::flushCondition stat;
bufSizeT totalBytes;
cadg *pHdr;
assert (nBytesAvailableToSend>=nBytesNeedToBeSent);
assert ( nBytesAvailableToSend >= nBytesNeedToBeSent );
totalBytes = 0;
while ( true ) {
pHdr = reinterpret_cast<cadg *>(&pBufIn[totalBytes]);
assert (totalBytes<=bufSizeT_MAX-pHdr->cadg_nBytes);
assert (totalBytes+pHdr->cadg_nBytes<=nBytesAvailableToSend);
assert ( totalBytes <= bufSizeT_MAX-pHdr->cadg_nBytes );
assert ( totalBytes+pHdr->cadg_nBytes <= nBytesAvailableToSend );
if (pHdr->cadg_addr.isValid()) {
stat = this->osdSend (reinterpret_cast<char *>(pHdr+1),
pHdr->cadg_nBytes-sizeof(*pHdr), pHdr->cadg_addr);
if (stat!=outBuf::flushProgress) {
if ( pHdr->cadg_addr.isValid() ) {
stat = this->osdSend ( reinterpret_cast<char *>(pHdr+1),
pHdr->cadg_nBytes-sizeof(*pHdr), pHdr->cadg_addr );
if ( stat != outBufClient::flushProgress ) {
break;
}
}
totalBytes += pHdr->cadg_nBytes;
if (totalBytes>=nBytesNeedToBeSent) {
if ( totalBytes >= nBytesNeedToBeSent ) {
break;
}
}
if (totalBytes) {
if ( totalBytes ) {
//
// !! this time fetch may be slowing things down !!
//
//this->lastSendTS = epicsTime::getCurrent();
nBytesSent = totalBytes;
return outBuf::flushProgress;
return outBufClient::flushProgress;
}
else {
return outBuf::flushNone;
return outBufClient::flushNone;
}
}
//
// casDGClient::xRecv ()
//
inBuf::fillCondition casDGClient::xRecv (char *pBufIn, bufSizeT nBytesToRecv, // X aCC 361
inBufClient::fillCondition casDGClient::xRecv (char *pBufIn, bufSizeT nBytesToRecv, // X aCC 361
fillParameter parm, bufSizeT &nByesRecv)
{
const char *pAfter = pBufIn + nBytesToRecv;
char *pCurBuf = pBufIn;
bufSizeT nDGBytesRecv;
inBuf::fillCondition stat;
inBufClient::fillCondition stat;
cadg *pHdr;
while (pAfter-pCurBuf >= static_cast<int>(MAX_UDP_RECV+sizeof(cadg))) {
@@ -448,8 +452,8 @@ inBuf::fillCondition casDGClient::xRecv (char *pBufIn, bufSizeT nBytesToRecv, //
// this results in many small UDP frames which unfortunately
// isnt particularly efficient
//
caStatus casDGClient::asyncSearchResponse (const caNetAddr &outAddr,
const caHdr &msg, const pvExistReturn &retVal)
caStatus casDGClient::asyncSearchResponse ( const caNetAddr & outAddr,
const caHdrLargeArray & msg, const pvExistReturn & retVal )
{
caStatus stat;
@@ -457,20 +461,20 @@ caStatus casDGClient::asyncSearchResponse (const caNetAddr &outAddr,
// start a DG context in the output protocol stream
// and grab the send lock
//
void *pRaw;
const outBufCtx outctx = this->outBuf::pushCtx
(sizeof(cadg), MAX_UDP_SEND, pRaw);
if (outctx.pushResult()!=outBufCtx::pushCtxSuccess) {
void * pRaw;
const outBufCtx outctx = this->out.pushCtx
( sizeof(cadg), MAX_UDP_SEND, pRaw );
if ( outctx.pushResult() != outBufCtx::pushCtxSuccess ) {
return S_cas_sendBlocked;
}
cadg *pRespHdr = reinterpret_cast<cadg *>(pRaw);
stat = this->searchResponse (msg, retVal);
cadg * pRespHdr = reinterpret_cast<cadg *> ( pRaw );
stat = this->searchResponse ( msg, retVal );
pRespHdr->cadg_nBytes = this->outBuf::popCtx (outctx);
if (pRespHdr->cadg_nBytes>0) {
pRespHdr->cadg_nBytes = this->out.popCtx (outctx);
if ( pRespHdr->cadg_nBytes > 0 ) {
pRespHdr->cadg_addr = outAddr;
this->outBuf::commitRawMsg (pRespHdr->cadg_nBytes);
this->out.commitRawMsg ( pRespHdr->cadg_nBytes );
}
return stat;
@@ -485,12 +489,12 @@ caStatus casDGClient::processDG ()
caStatus status;
status = S_cas_success;
while ( (bytesLeft = this->inBuf::bytesPresent()) ) {
while ( ( bytesLeft = this->in.bytesPresent() ) ) {
bufSizeT dgInBytesConsumed;
const cadg *pReqHdr = reinterpret_cast<cadg *>(this->inBuf::msgPtr ());
const cadg * pReqHdr = reinterpret_cast<cadg *> ( this->in.msgPtr () );
if (bytesLeft<sizeof(*pReqHdr)) {
this->inBuf::removeMsg (bytesLeft);
this->in.removeMsg (bytesLeft);
errlogPrintf ("casDGClient::processMsg: incomplete DG header?");
status = S_cas_internal;
break;
@@ -501,7 +505,7 @@ caStatus casDGClient::processDG ()
// and grab the send lock
//
void *pRaw;
const outBufCtx outctx = this->outBuf::pushCtx (sizeof(cadg), MAX_UDP_SEND, pRaw);
const outBufCtx outctx = this->out.pushCtx ( sizeof ( cadg ), MAX_UDP_SEND, pRaw );
if ( outctx.pushResult() != outBufCtx::pushCtxSuccess ) {
status = S_cas_sendBlocked;
break;
@@ -513,10 +517,11 @@ caStatus casDGClient::processDG ()
// select the next DG in the input stream and start processing it
//
const bufSizeT reqBodySize = pReqHdr->cadg_nBytes-sizeof (*pReqHdr);
const inBufCtx inctx = this->inBuf::pushCtx (sizeof (*pReqHdr), reqBodySize);
const inBufCtx inctx = this->in.pushCtx (sizeof (*pReqHdr), reqBodySize);
if ( inctx.pushResult() != inBufCtx::pushCtxSuccess ) {
this->inBuf::removeMsg (bytesLeft);
this->outBuf::popCtx (outctx);
this->in.removeMsg ( bytesLeft );
this->out.popCtx ( outctx );
errlogPrintf ("casDGClient::processMsg: incomplete DG?\n");
status = S_cas_internal;
break;
@@ -525,7 +530,7 @@ caStatus casDGClient::processDG ()
this->lastRecvAddr = pReqHdr->cadg_addr;
status = this->processMsg ();
dgInBytesConsumed = this->inBuf::popCtx (inctx);
dgInBytesConsumed = this->in.popCtx ( inctx );
if (dgInBytesConsumed>0) {
//
@@ -536,21 +541,21 @@ caStatus casDGClient::processDG ()
// In either case commit the DG to the protocol stream and
// release the send lock
//
pRespHdr->cadg_nBytes = this->outBuf::popCtx (outctx) + sizeof(*pRespHdr);
if (pRespHdr->cadg_nBytes>sizeof(*pRespHdr)) {
pRespHdr->cadg_nBytes = this->out.popCtx ( outctx ) + sizeof ( *pRespHdr );
if ( pRespHdr->cadg_nBytes > sizeof ( *pRespHdr ) ) {
pRespHdr->cadg_addr = pReqHdr->cadg_addr;
this->outBuf::commitRawMsg (pRespHdr->cadg_nBytes);
this->out.commitRawMsg ( pRespHdr->cadg_nBytes );
}
//
// check to see that all of the incoming UDP frame was used
//
if (dgInBytesConsumed<reqBodySize) {
if ( dgInBytesConsumed < reqBodySize ) {
//
// remove the bytes in the body that were consumed,
// but _not_ the header bytes
//
this->inBuf::removeMsg (dgInBytesConsumed);
this->in.removeMsg (dgInBytesConsumed);
//
// slide the UDP header forward and correct the byte count
@@ -558,7 +563,7 @@ caStatus casDGClient::processDG ()
{
cadg *pReqHdrMove;
cadg copy = *pReqHdr;
pReqHdrMove = reinterpret_cast <cadg *> (this->inBuf::msgPtr ());
pReqHdrMove = reinterpret_cast <cadg *> ( this->in.msgPtr () );
pReqHdrMove->cadg_addr = copy.cadg_addr;
pReqHdrMove->cadg_nBytes = copy.cadg_nBytes - dgInBytesConsumed;
}
@@ -567,7 +572,7 @@ caStatus casDGClient::processDG ()
//
// remove the header and all of the body
//
this->inBuf::removeMsg ( pReqHdr->cadg_nBytes );
this->in.removeMsg ( pReqHdr->cadg_nBytes );
}
}
@@ -595,9 +600,9 @@ caNetAddr casDGClient::fetchLastRecvAddr () const
}
//
// casDGClient::clientHostName()
// casDGClient::hostName()
//
void casDGClient::clientHostName (char *pBufIn, unsigned bufSizeIn) const
void casDGClient::hostName (char *pBufIn, unsigned bufSizeIn) const
{
this->lastRecvAddr.stringConvert (pBufIn, bufSizeIn);
}

View File

@@ -34,7 +34,7 @@
//
#include "tsDLList.h"
#include "resourceLib.h"
#define CA_MINOR_PROTOCOL_REVISION 8
#define CA_MINOR_PROTOCOL_REVISION 10
#include "caProto.h"
#include "smartGDDPointer.h"
@@ -499,3 +499,14 @@ private:
casPVI & operator = ( const casPVI & );
};
/* a modified ca header with capacity for large arrays */
struct caHdrLargeArray {
ca_uint32_t m_postsize; /* size of message extension */
ca_uint32_t m_count; /* operation data count */
ca_uint32_t m_cid; /* channel identifier */
ca_uint32_t m_available; /* protocol stub dependent */
ca_uint16_t m_dataType; /* operation data type */
ca_uint16_t m_cmmd; /* operation to be performed */
};

View File

@@ -33,10 +33,6 @@
#include "server.h"
#include "casPVIIL.h"
//
// this needs to be here (and not in dgInBufIL.h) if we
// are to avoid undefined symbols under gcc 2.7.x with -g
//
//
// casPVListChan::casPVListChan()
//

File diff suppressed because it is too large Load Diff

View File

@@ -809,7 +809,7 @@ public:
epicsShareFunc virtual void destroy ();
private:
caHdr const msg;
caHdrLargeArray const msg;
casChannelI &chan;
smartConstGDDPointer pDD;
caStatus completionStatus;
@@ -870,7 +870,7 @@ public:
epicsShareFunc virtual void destroy ();
private:
caHdr const msg;
caHdrLargeArray const msg;
casChannelI &chan;
caStatus completionStatus;
@@ -919,7 +919,7 @@ public:
epicsShareFunc virtual void destroy();
private:
caHdr const msg;
caHdrLargeArray const msg;
pvExistReturn retVal;
const caNetAddr dgOutAddr;
@@ -966,7 +966,7 @@ public:
epicsShareFunc virtual void destroy ();
private:
caHdr const msg;
caHdrLargeArray const msg;
pvAttachReturn retVal;
epicsShareFunc caStatus cbFuncAsyncIO ();

View File

@@ -1,8 +0,0 @@
#ifndef dgInBufILh
#define dgInBufILh
#include "inBufIL.h"
#endif // dgInBufILh

View File

@@ -34,25 +34,20 @@
//
// inBuf::inBuf()
//
inBuf::inBuf (bufSizeT bufSizeIn, bufSizeT ioMinSizeIn) :
bufSize (bufSizeIn),
bytesInBuffer (0u),
nextReadIndex (0u),
ioMinSize (ioMinSizeIn),
ctxRecursCount (0u)
inBuf::inBuf ( inBufClient &clientIn, bufSizeT ioMinSizeIn ) :
client ( clientIn ), pBuf ( 0 ), bufSize ( 0u ), bytesInBuffer ( 0u ), nextReadIndex ( 0u ),
ioMinSize ( ioMinSizeIn ), ctxRecursCount ( 0u )
{
if (this->ioMinSize==0) {
if ( this->ioMinSize == 0 ) {
this->ioMinSize = 1;
}
if ( this->bufSize < this->ioMinSize ) {
this->bufSize = this->ioMinSize;
if ( this->ioMinSize <= pGlobalBufferFactoryCAS->smallBufferSize () ) {
this->pBuf = pGlobalBufferFactoryCAS->newSmallBuffer ();
this->bufSize = pGlobalBufferFactoryCAS->smallBufferSize ();
}
this->pBuf = new char [this->bufSize];
if (!this->pBuf) {
this->bufSize = 0u;
throw S_cas_noMemory;
else {
this->pBuf = new char [ this->ioMinSize ];
this->bufSize = this->ioMinSize;
}
}
@@ -62,8 +57,16 @@ inBuf::inBuf (bufSizeT bufSizeIn, bufSizeT ioMinSizeIn) :
//
inBuf::~inBuf ()
{
assert (this->ctxRecursCount==0);
delete [] this->pBuf;
assert ( this->ctxRecursCount == 0 );
if ( this->bufSize == pGlobalBufferFactoryCAS->smallBufferSize () ) {
pGlobalBufferFactoryCAS->destroySmallBuffer ( this->pBuf );
}
else if ( this->bufSize == pGlobalBufferFactoryCAS->largeBufferSize () ) {
pGlobalBufferFactoryCAS->destroyLargeBuffer ( this->pBuf );
}
else {
delete [] this->pBuf;
}
}
//
@@ -81,11 +84,11 @@ void inBuf::show (unsigned level) const
//
// inBuf::fill()
//
inBuf::fillCondition inBuf::fill (fillParameter parm)
inBufClient::fillCondition inBuf::fill ( inBufClient::fillParameter parm )
{
bufSizeT bytesOpen;
bufSizeT bytesRecv;
inBuf::fillCondition stat;
inBufClient::fillCondition stat;
//
// move back any prexisting data to the start of the buffer
@@ -108,20 +111,20 @@ inBuf::fillCondition inBuf::fill (fillParameter parm)
// noop if the buffer is full
//
bytesOpen = this->bufSize - this->bytesInBuffer;
if (bytesOpen < this->ioMinSize) {
return casFillNone;
if ( bytesOpen < this->ioMinSize ) {
return inBufClient::casFillNone;
}
stat = this->xRecv (&this->pBuf[this->bytesInBuffer],
bytesOpen, parm, bytesRecv);
if (stat == casFillProgress) {
stat = this->client.xRecv ( &this->pBuf[this->bytesInBuffer],
bytesOpen, parm, bytesRecv );
if ( stat == inBufClient::casFillProgress ) {
assert (bytesRecv<=bytesOpen);
this->bytesInBuffer += bytesRecv;
if (this->getDebugLevel()>2u) {
if ( this->client.getDebugLevel() > 2u ) {
char buf[64];
this->clientHostName(buf, sizeof(buf));
this->client.hostName ( buf, sizeof ( buf ) );
printf ("CAS: incoming %u byte msg from %s\n",
bytesRecv, buf);
@@ -134,11 +137,11 @@ inBuf::fillCondition inBuf::fill (fillParameter parm)
//
// inBuf::pushCtx ()
//
const inBufCtx inBuf::pushCtx (bufSizeT headerSize, // X aCC 361
bufSizeT bodySize)
const inBufCtx inBuf::pushCtx ( bufSizeT headerSize, // X aCC 361
bufSizeT bodySize )
{
if (headerSize+bodySize>(this->bytesInBuffer - this->nextReadIndex) ||
this->ctxRecursCount==UINT_MAX) {
if ( headerSize + bodySize > ( this->bytesInBuffer - this->nextReadIndex ) ||
this->ctxRecursCount == UINT_MAX ) {
return inBufCtx ();
}
else {
@@ -159,7 +162,7 @@ const inBufCtx inBuf::pushCtx (bufSizeT headerSize, // X aCC 361
//
bufSizeT inBuf::popCtx (const inBufCtx &ctx) // X aCC 361
{
if (ctx.stat==inBufCtx::pushCtxSuccess) {
if ( ctx.stat==inBufCtx::pushCtxSuccess ) {
this->mutex.lock();
bufSizeT bytesRemoved = this->nextReadIndex;
this->pBuf = ctx.pBuf;
@@ -176,5 +179,21 @@ bufSizeT inBuf::popCtx (const inBufCtx &ctx) // X aCC 361
}
}
void inBuf::expandBuffer ()
{
if ( this->bufSize < pGlobalBufferFactoryCAS->largeBufferSize () ) {
char * pNewBuf = pGlobalBufferFactoryCAS->newLargeBuffer ();
memcpy ( pNewBuf, &this->pBuf[this->nextReadIndex], this->bytesPresent () );
this->nextReadIndex = 0u;
pGlobalBufferFactoryCAS->destroySmallBuffer ( this->pBuf );
this->pBuf = pNewBuf;
this->bufSize = pGlobalBufferFactoryCAS->largeBufferSize ();
}
}
unsigned inBuf::bufferSize () const
{
return this->bufSize;
}

View File

@@ -16,8 +16,8 @@ inline bufSizeT inBuf::bytesPresent () const
inline bufSizeT inBuf::bytesAvailable () const
{
bufSizeT bp;
bp = this->bytesPresent();
bp += this->incomingBytesPresent();
bp = this->bytesPresent ();
bp += this->client.incomingBytesPresent ();
return bp;
}
@@ -52,10 +52,10 @@ inline char *inBuf::msgPtr () const
//
// inBuf::removeMsg()
//
inline void inBuf::removeMsg (bufSizeT nBytes)
inline void inBuf::removeMsg ( bufSizeT nBytes )
{
this->nextReadIndex += nBytes;
assert (this->nextReadIndex<=this->bytesInBuffer);
assert ( this->nextReadIndex <= this->bytesInBuffer );
}
//

View File

@@ -36,14 +36,12 @@
//
// outBuf::outBuf()
//
outBuf::outBuf (unsigned bufSizeIn) :
bufSize (bufSizeIn), stack (0u), ctxRecursCount (0u)
outBuf::outBuf ( outBufClient & clientIn ) :
client ( clientIn ), bufSize ( 0 ), stack ( 0u ), ctxRecursCount ( 0u )
{
this->pBuf = new char [this->bufSize];
if (!this->pBuf) {
throw S_cas_noMemory;
}
memset (this->pBuf, '\0', this->bufSize);
this->pBuf = pGlobalBufferFactoryCAS->newSmallBuffer ();
this->bufSize = pGlobalBufferFactoryCAS->smallBufferSize ();
memset ( this->pBuf, '\0', this->bufSize );
}
//
@@ -51,42 +49,52 @@ outBuf::outBuf (unsigned bufSizeIn) :
//
outBuf::~outBuf()
{
assert (this->ctxRecursCount==0);
delete [] this->pBuf;
assert ( this->ctxRecursCount == 0 );
if ( this->bufSize == pGlobalBufferFactoryCAS->smallBufferSize () ) {
pGlobalBufferFactoryCAS->destroySmallBuffer ( this->pBuf );
}
else if ( this->bufSize == pGlobalBufferFactoryCAS->largeBufferSize () ) {
pGlobalBufferFactoryCAS->destroyLargeBuffer ( this->pBuf );
}
else if ( this->pBuf ) {
errlogPrintf (
"cas: unusual outBuf buffer size %u in destructor - probable leak\n",
this->bufSize );
}
}
//
// outBuf::allocRawMsg ()
//
caStatus outBuf::allocRawMsg (bufSizeT msgsize, void **ppMsg)
caStatus outBuf::allocRawMsg ( bufSizeT msgsize, void **ppMsg )
{
bufSizeT stackNeeded;
msgsize = CA_MESSAGE_ALIGN (msgsize);
msgsize = CA_MESSAGE_ALIGN ( msgsize );
this->mutex.lock ();
if (msgsize>this->bufSize) {
if ( msgsize > this->bufSize ) {
this->mutex.unlock ();
return S_cas_hugeRequest;
}
stackNeeded = this->bufSize - msgsize;
if (this->stack > stackNeeded) {
if ( this->stack > stackNeeded ) {
//
// Try to flush the output queue
//
this->flush (this->stack-stackNeeded);
this->flush ( this->stack-stackNeeded );
//
// If this failed then the fd is nonblocking
// and we will let select() take care of it
//
if (this->stack > stackNeeded) {
if ( this->stack > stackNeeded ) {
this->mutex.unlock();
this->sendBlockSignal();
this->client.sendBlockSignal();
return S_cas_sendBlocked;
}
}
@@ -99,63 +107,135 @@ caStatus outBuf::allocRawMsg (bufSizeT msgsize, void **ppMsg)
return S_cas_success;
}
caStatus outBuf::copyInHeader ( ca_uint16_t response, ca_uint32_t payloadSize,
ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid,
ca_uint32_t responseSpecific, void **ppPayload )
{
ca_uint32_t alignedPayloadSize = CA_MESSAGE_ALIGN ( payloadSize );
ca_uint32_t hdrSize;
if ( alignedPayloadSize < 0xffff && nElem < 0xffff ) {
hdrSize = sizeof ( caHdr );
}
else {
hdrSize = sizeof ( caHdr ) + 2 * sizeof (ca_uint32_t);
}
caHdr * pHdr;
caStatus status = this->allocRawMsg ( hdrSize + alignedPayloadSize,
reinterpret_cast < void ** > ( & pHdr ) );
if ( status != S_cas_success ) {
if ( status == S_cas_hugeRequest ) {
this->expandBuffer ();
status = this->allocRawMsg ( hdrSize + alignedPayloadSize,
reinterpret_cast < void ** > ( & pHdr ) );
if ( status != S_cas_success ) {
return status;
}
}
else {
return status;
}
}
pHdr->m_cmmd = epicsHTON16 ( response );
pHdr->m_dataType = epicsHTON16 ( dataType );
pHdr->m_cid = epicsHTON32 ( cid );
pHdr->m_available = epicsHTON32 ( responseSpecific );
char * pPayload;
if ( hdrSize == sizeof ( caHdr ) ) {
pHdr->m_postsize = epicsHTON16 ( alignedPayloadSize );
pHdr->m_count = epicsHTON16 ( nElem );
pPayload = reinterpret_cast < char * > ( pHdr + 1 );
}
else {
pHdr->m_postsize = epicsHTON16 ( 0xffff );
pHdr->m_count = epicsHTON16 ( 0 );
ca_uint32_t * pLW = reinterpret_cast <ca_uint32_t *> ( pHdr + 1 );
pLW[0] = epicsHTON32 ( alignedPayloadSize );
pLW[1] = epicsHTON32 ( nElem );
pPayload = reinterpret_cast < char * > ( pLW + 2 );
}
/* zero out pad bytes */
if ( alignedPayloadSize > payloadSize ) {
memset ( pPayload + payloadSize, '\0',
alignedPayloadSize - payloadSize );
}
if ( ppPayload ) {
*ppPayload = pPayload;
}
return status;
}
//
// outBuf::commitMsg ()
//
void outBuf::commitMsg ()
{
caHdr *mp;
unsigned diff;
ca_uint16_t extSize;
ca_uint32_t payloadSize;
ca_uint32_t elementCount;
ca_uint32_t hdrSize;
mp = (caHdr *) &this->pBuf[this->stack];
const caHdr * mp = ( caHdr * ) & this->pBuf[ this->stack ];
if ( mp->m_postsize == 0xffff || mp->m_count == 0xffff ) {
const ca_uint32_t *pLW = reinterpret_cast <const ca_uint32_t *> ( mp + 1 );
payloadSize = epicsNTOH32 ( pLW[0] );
elementCount = epicsNTOH32 ( pLW[1] );
hdrSize = sizeof ( caHdr ) + 2 * sizeof ( ca_uint32_t );
}
else {
payloadSize = epicsNTOH16 ( mp->m_postsize );
elementCount = epicsNTOH16 ( mp->m_count );
hdrSize = sizeof ( caHdr );
}
extSize = static_cast <ca_uint16_t> ( CA_MESSAGE_ALIGN (mp->m_postsize) );
this->commitRawMsg ( hdrSize + payloadSize );
//
// Guarantee that all portions of outgoing messages
// (including alignment pad) are initialized
//
diff = extSize - mp->m_postsize;
if (diff>0u) {
memset ( reinterpret_cast<char *>(mp+1) + mp->m_postsize, '\0', diff );
}
if (this->getDebugLevel()) {
if ( this->client.getDebugLevel() ) {
errlogPrintf (
"CAS Response => cmd=%d id=%x typ=%d cnt=%d psz=%d avail=%x outBuf ptr=%lx\n",
mp->m_cmmd, mp->m_cid, mp->m_dataType, mp->m_count, mp->m_postsize,
mp->m_available, (long)mp);
"CAS Response => cmd=%d id=%x typ=%d cnt=%d psz=%d avail=%x outBuf ptr=%p \n",
epicsNTOH16 ( mp->m_cmmd ), epicsNTOH32 ( mp->m_cid ),
epicsNTOH16 ( mp->m_dataType ), elementCount, payloadSize,
epicsNTOH32 ( mp->m_available ), static_cast <const void *> ( mp ) );
}
}
//
// convert to network byte order
// (data following header is handled elsewhere)
//
mp->m_cmmd = epicsHTON16 (mp->m_cmmd);
mp->m_postsize = epicsHTON16 (extSize);
mp->m_dataType = epicsHTON16 (mp->m_dataType);
mp->m_count = epicsHTON16 (mp->m_count);
mp->m_cid = epicsHTON32 (mp->m_cid);
mp->m_available = epicsHTON32 (mp->m_available);
commitRawMsg (extSize + sizeof(*mp));
//
// outBuf::commitMsg ()
//
void outBuf::commitMsg ( ca_uint32_t reducedPayloadSize )
{
caHdr * mp = ( caHdr * ) & this->pBuf[ this->stack ];
reducedPayloadSize = CA_MESSAGE_ALIGN ( reducedPayloadSize );
if ( mp->m_postsize == 0xffff || mp->m_count == 0xffff ) {
ca_uint32_t *pLW = reinterpret_cast <ca_uint32_t *> ( mp + 1 );
assert ( reducedPayloadSize <= epicsNTOH32 ( pLW[0] ) );
pLW[0] = epicsHTON32 ( reducedPayloadSize );
}
else {
assert ( reducedPayloadSize <= epicsNTOH16 ( mp->m_postsize ) );
mp->m_postsize = epicsHTON16 ( static_cast <ca_uint16_t> ( reducedPayloadSize ) );
}
this->commitMsg ();
}
//
// outBuf::flush ()
//
outBuf::flushCondition outBuf::flush (bufSizeT spaceRequired)
outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired )
{
bufSizeT nBytes;
bufSizeT nBytesRequired;
flushCondition cond;
bufSizeT nBytes;
bufSizeT nBytesRequired;
outBufClient::flushCondition cond;
this->mutex.lock();
if (this->ctxRecursCount>0) {
this->mutex.unlock();
return flushNone;
return outBufClient::flushNone;
}
if (spaceRequired>this->bufSize) {
@@ -173,9 +253,9 @@ outBuf::flushCondition outBuf::flush (bufSizeT spaceRequired)
}
}
cond = this->xSend(this->pBuf, this->stack,
cond = this->client.xSend(this->pBuf, this->stack,
nBytesRequired, nBytes);
if (cond==flushProgress) {
if (cond==outBufClient::flushProgress) {
bufSizeT len;
if (nBytes >= this->stack) {
@@ -190,9 +270,9 @@ outBuf::flushCondition outBuf::flush (bufSizeT spaceRequired)
this->stack = len;
}
if (this->getDebugLevel()>2u) {
if (this->client.getDebugLevel()>2u) {
char buf[64];
this->clientHostName (buf, sizeof(buf));
this->client.hostName (buf, sizeof(buf));
errlogPrintf ("CAS: Sent a %d byte reply to %s\n",
nBytes, buf);
}
@@ -252,8 +332,20 @@ bufSizeT outBuf::popCtx (const outBufCtx &ctx) // X aCC 361
//
void outBuf::show (unsigned level) const
{
if (level>1u) {
if ( level > 1u ) {
printf("\tUndelivered response bytes = %d\n", this->bytesPresent());
}
}
void outBuf::expandBuffer ()
{
if ( this->bufSize < pGlobalBufferFactoryCAS->largeBufferSize () ) {
char * pNewBuf = pGlobalBufferFactoryCAS->newLargeBuffer ();
memcpy ( pNewBuf, this->pBuf, this->stack );
pGlobalBufferFactoryCAS->destroySmallBuffer ( this->pBuf );
this->pBuf = pNewBuf;
this->bufSize = pGlobalBufferFactoryCAS->largeBufferSize ();
}
}

View File

@@ -52,10 +52,10 @@ inline void outBuf::clear ()
//
// (if space is avilable this leaves the send lock applied)
//
inline caStatus outBuf::allocMsg (bufSizeT extsize, caHdr **ppMsg)
{
return this->allocRawMsg (extsize + sizeof(caHdr), (void **)ppMsg);
}
//inline caStatus outBuf::allocMsg (bufSizeT extsize, caHdr **ppMsg)
//{
// return this->allocRawMsg (extsize + sizeof(caHdr), (void **)ppMsg);
//}
//
// outBuf::commitRawMsg()
@@ -63,7 +63,7 @@ inline caStatus outBuf::allocMsg (bufSizeT extsize, caHdr **ppMsg)
inline void outBuf::commitRawMsg (bufSizeT size)
{
this->stack += size;
assert (this->stack <= this->bufSize);
assert ( this->stack <= this->bufSize );
this->mutex.unlock();
}

View File

@@ -37,7 +37,7 @@
#include <stdio.h>
#if defined(epicsExportSharedSymbols)
#error suspect that libCom, ca, and gdd were not imported
# error suspect that libCom, ca, and gdd were not imported
#endif
//
@@ -55,6 +55,7 @@
#include "errMdef.h" // EPICS error codes
#include "resourceLib.h" // EPICS hashing templates
#include "errlog.h" // EPICS error logging interface
#include "epicsSingleton.h"
//
// CA
@@ -188,44 +189,38 @@ private:
class casCtx {
public:
inline casCtx();
casCtx();
//
// get
//
inline const caHdr *getMsg() const;
inline void *getData() const;
inline caServerI * getServer() const;
inline casCoreClient * getClient() const;
inline casPVI * getPV() const;
inline casChannelI * getChannel() const;
const caHdrLargeArray * getMsg() const;
void * getData() const;
caServerI * getServer() const;
casCoreClient * getClient() const;
casPVI * getPV() const;
casChannelI * getChannel() const;
//
// set
// (assumes incoming message is in network byte order)
//
inline void setMsg(const char *pBuf);
void setMsg ( caHdrLargeArray &, void * pBody );
inline void setData(void *p);
void setServer ( caServerI *p );
inline void setServer(caServerI *p);
void setClient ( casCoreClient *p );
inline void setClient(casCoreClient *p);
void setPV ( casPVI *p );
inline void setPV(casPVI *p);
void setChannel ( casChannelI *p );
inline void setChannel(casChannelI *p);
void show (unsigned level) const;
void show ( unsigned level ) const;
private:
caHdr msg; // ca message header
void *pData; // pointer to data following header
caServerI *pCAS;
casCoreClient *pClient;
casChannelI *pChannel;
casPVI *pPV;
unsigned nAsyncIO; // checks for improper use of async io
caHdrLargeArray msg; // ca message header
void *pData; // pointer to data following header
caServerI *pCAS;
casCoreClient *pClient;
casChannelI *pChannel;
casPVI *pPV;
unsigned nAsyncIO; // checks for improper use of async io
};
//
@@ -248,6 +243,37 @@ private:
bufSizeT nextReadIndex;
};
class casBufferFactory {
public:
casBufferFactory ();
~casBufferFactory ();
unsigned smallBufferSize () const;
char * newSmallBuffer ();
void destroySmallBuffer ( char * pBuf );
unsigned largeBufferSize () const;
char * newLargeBuffer ();
void destroyLargeBuffer ( char * pBuf );
private:
void * smallBufFreeList;
void * largeBufFreeList;
unsigned largeBufferSizePriv;
};
extern epicsSingleton < casBufferFactory > pGlobalBufferFactoryCAS;
class inBufClient {
public:
enum fillCondition { casFillNone, casFillProgress,
casFillDisconnect };
// this is a hack for a Solaris IP kernel feature
enum fillParameter { fpNone, fpUseBroadcastInterface };
virtual unsigned getDebugLevel () const = 0;
virtual bufSizeT incomingBytesPresent () const = 0;
virtual fillCondition xRecv ( char *pBuf, bufSizeT nBytesToRecv,
enum fillParameter parm, bufSizeT &nByesRecv ) = 0;
virtual void hostName ( char *pBuf, unsigned bufSize ) const = 0;
};
//
// inBuf
//
@@ -255,13 +281,7 @@ class inBuf {
friend class inBufCtx;
public:
enum fillCondition {casFillNone, casFillProgress,
casFillDisconnect};
// this is a hack for a Solaris IP kernel feature
enum fillParameter {fpNone, fpUseBroadcastInterface};
inBuf (bufSizeT bufSizeIn, bufSizeT ioMinSizeIn);
inBuf ( inBufClient &, bufSizeT ioMinSizeIn );
virtual ~inBuf ();
bufSizeT bytesPresent () const;
@@ -273,17 +293,15 @@ public:
//
// fill the input buffer with any incoming messages
//
fillCondition fill (enum fillParameter parm=fpNone);
inBufClient::fillCondition fill ( inBufClient::fillParameter parm = inBufClient::fpNone );
void show (unsigned level) const;
void show ( unsigned level ) const;
protected:
void removeMsg ( bufSizeT nBytes );
void clear ();
char *msgPtr () const;
void removeMsg (bufSizeT nBytes);
char * msgPtr () const;
//
// This is used to create recursive protocol stacks. A subsegment
@@ -294,22 +312,22 @@ protected:
//
// pushCtx() returns an outBufCtx to be restored by popCtx()
//
const inBufCtx pushCtx (bufSizeT headerSize, bufSizeT bodySize);
bufSizeT popCtx (const inBufCtx &); // returns actual size
const inBufCtx pushCtx ( bufSizeT headerSize, bufSizeT bodySize );
bufSizeT popCtx ( const inBufCtx & ); // returns actual size
unsigned bufferSize () const;
void expandBuffer ();
private:
epicsMutex mutex;
char *pBuf;
inBufClient & client;
char * pBuf;
bufSizeT bufSize;
bufSizeT bytesInBuffer;
bufSizeT nextReadIndex;
bufSizeT ioMinSize;
unsigned ctxRecursCount;
virtual unsigned getDebugLevel () const = 0;
virtual bufSizeT incomingBytesPresent () const = 0;
virtual fillCondition xRecv (char *pBuf, bufSizeT nBytesToRecv,
enum inBuf::fillParameter parm, bufSizeT &nByesRecv) = 0;
virtual void clientHostName (char *pBuf, unsigned bufSize) const = 0;
inBuf ( const inBuf & );
inBuf & operator = ( const inBuf & );
};
@@ -333,6 +351,16 @@ private:
bufSizeT stack;
};
class outBufClient {
public:
enum flushCondition { flushNone, flushProgress, flushDisconnect };
virtual unsigned getDebugLevel () const = 0;
virtual void sendBlockSignal () = 0;
virtual flushCondition xSend ( char *pBuf, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent ) = 0;
virtual void hostName ( char *pBuf, unsigned bufSize ) const = 0;
};
//
// outBuf
//
@@ -340,34 +368,37 @@ class outBuf {
friend class outBufCtx;
public:
enum flushCondition {flushNone, flushProgress, flushDisconnect};
outBuf (bufSizeT bufSizeIn);
outBuf ( outBufClient & );
virtual ~outBuf ();
bufSizeT bytesPresent() const;
bufSizeT bytesPresent () const;
//
// flush output queue
// (returns the number of bytes sent)
//
flushCondition flush (bufSizeT spaceRequired=bufSizeT_MAX);
outBufClient::flushCondition flush ( bufSizeT spaceRequired=bufSizeT_MAX );
void show (unsigned level) const;
protected:
unsigned bufferSize () const;
//
// allocate message buffer space
// (leaves message buffer locked)
//
caStatus allocMsg (bufSizeT extsize, caHdr **ppMsg);
caStatus copyInHeader ( ca_uint16_t response, ca_uint32_t payloadSize,
ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid,
ca_uint32_t responseSpecific, void **pPayload );
//
// allocate message buffer space
// (leaves message buffer locked)
//
caStatus allocRawMsg (bufSizeT size, void **ppMsg);
//
// commit message created with copyInHeader
//
void commitMsg ();
void commitMsg ( ca_uint32_t reducedPayloadSize );
caStatus allocRawMsg ( bufSizeT msgsize, void **ppMsg );
void commitRawMsg ( bufSizeT size );
//
// This is used to create recursive protocol stacks. A subsegment
@@ -378,37 +409,20 @@ protected:
//
// pushCtx() returns an outBufCtx to be restored by popCtx()
//
const outBufCtx pushCtx (bufSizeT headerSize, bufSizeT maxBodySize, void *&pHeader);
bufSizeT popCtx (const outBufCtx &); // returns actual size
//
// commits message allocated with allocMsg()
//
void commitMsg ();
//
// commits message allocated with allocRawMsg()
//
void commitRawMsg (bufSizeT size);
const outBufCtx pushCtx ( bufSizeT headerSize, bufSizeT maxBodySize, void *&pHeader );
bufSizeT popCtx ( const outBufCtx & ); // returns actual size
void clear ();
private:
mutable epicsMutex mutex;
char *pBuf;
outBufClient & client;
char * pBuf;
bufSizeT bufSize;
bufSizeT stack;
unsigned ctxRecursCount;
virtual unsigned getDebugLevel() const = 0;
virtual void sendBlockSignal() = 0;
//
// io dependent
//
virtual flushCondition xSend (char *pBuf, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent) = 0;
virtual void clientHostName (char *pBuf, unsigned bufSize) const = 0;
void expandBuffer ();
outBuf ( const outBuf & );
outBuf & operator = ( const outBuf & );
@@ -444,8 +458,8 @@ public:
caServerI &getCAS() const;
virtual caStatus monitorResponse (casChannelI &chan, const caHdr &msg,
const smartConstGDDPointer &pDesc, const caStatus status);
virtual caStatus monitorResponse ( casChannelI &chan, const caHdrLargeArray &msg,
const smartConstGDDPointer &pDesc, const caStatus status );
virtual caStatus accessRightsResponse(casChannelI *);
@@ -454,16 +468,16 @@ public:
// asynchronous completion
//
virtual caStatus asyncSearchResponse (
const caNetAddr &outAddr,
const caHdr &, const pvExistReturn &);
const caNetAddr & outAddr,
const caHdrLargeArray &, const pvExistReturn & );
virtual caStatus createChanResponse (
const caHdr &, const pvAttachReturn &);
const caHdrLargeArray &, const pvAttachReturn &);
virtual caStatus readResponse (
casChannelI *, const caHdr &, const smartConstGDDPointer &, const caStatus);
casChannelI *, const caHdrLargeArray &, const smartConstGDDPointer &, const caStatus);
virtual caStatus readNotifyResponse (
casChannelI *, const caHdr &, const smartConstGDDPointer &, const caStatus);
virtual caStatus writeResponse (const caHdr &, const caStatus);
virtual caStatus writeNotifyResponse (const caHdr &, const caStatus);
casChannelI *, const caHdrLargeArray &, const smartConstGDDPointer &, const caStatus);
virtual caStatus writeResponse (const caHdrLargeArray &, const caStatus);
virtual caStatus writeNotifyResponse (const caHdrLargeArray &, const caStatus);
//
// used only with DG clients
@@ -471,8 +485,8 @@ public:
virtual caNetAddr fetchLastRecvAddr () const;
protected:
casCtx ctx;
unsigned char asyncIOFlag;
casCtx ctx;
unsigned char asyncIOFlag;
private:
tsDLList<casAsyncIOI> ioInProgList;
@@ -483,17 +497,17 @@ private:
//
// casClient
//
class casClient : public casCoreClient, public inBuf, public outBuf {
class casClient : public casCoreClient, public outBufClient,
public inBufClient {
public:
typedef caStatus (casClient::*pCASMsgHandler) ();
casClient (caServerI &, bufSizeT ioMinSizeIn);
casClient ( caServerI &, bufSizeT ioMinSizeIn );
virtual ~casClient ();
virtual void show (unsigned level) const;
virtual void show ( unsigned level ) const;
caStatus sendErr (const caHdr *, const int reportedStatus,
const char *pFormat, ...);
caStatus sendErr ( const caHdrLargeArray *, const int reportedStatus,
const char *pFormat, ... );
void sendVersion ();
@@ -502,32 +516,37 @@ public:
//
// find the channel associated with a resource id
//
casChannelI *resIdToChannel (const caResId &id);
casChannelI * resIdToChannel (const caResId &id);
virtual void clientHostName (char *pBuf, unsigned bufSize) const = 0;
virtual void hostName ( char *pBuf, unsigned bufSize ) const = 0;
protected:
unsigned minor_version_number;
epicsTime lastSendTS;
epicsTime lastRecvTS;
inBuf in;
outBuf out;
unsigned minor_version_number;
unsigned incommingBytesToDrain;
epicsTime lastSendTS;
epicsTime lastRecvTS;
caStatus sendErrWithEpicsStatus(const caHdr *pMsg,
caStatus epicsStatus, caStatus clientStatus);
caStatus sendErrWithEpicsStatus ( const caHdrLargeArray *pMsg,
caStatus epicsStatus, caStatus clientStatus );
# define logBadId(MP, DP, CACSTAT, RESID) \
this->logBadIdWithFileAndLineno(MP, DP, CACSTAT, __FILE__, __LINE__, RESID)
caStatus logBadIdWithFileAndLineno(const caHdr *mp,
caStatus logBadIdWithFileAndLineno ( const caHdrLargeArray *mp,
const void *dp, const int cacStat, const char *pFileName,
const unsigned lineno, const unsigned resId);
const unsigned lineno, const unsigned resId );
caStatus processMsg();
//
// dump message to stderr
//
void dumpMsg (const caHdr *mp, const void *dp, const char *pFormat, ...);
void dumpMsg ( const caHdrLargeArray *mp, const void *dp, const char *pFormat, ... );
private:
typedef caStatus ( casClient::*pCASMsgHandler ) ();
//
// one function for each CA request type
@@ -579,6 +598,8 @@ public:
void show (unsigned level) const;
void flush ();
//
// installChannel()
//
@@ -593,15 +614,15 @@ public:
// one function for each CA request type that has
// asynchronous completion
//
virtual caStatus createChanResponse (const caHdr &, const pvAttachReturn &);
caStatus readResponse (casChannelI *pChan, const caHdr &msg,
virtual caStatus createChanResponse (const caHdrLargeArray &, const pvAttachReturn &);
caStatus readResponse (casChannelI *pChan, const caHdrLargeArray &msg,
const smartConstGDDPointer &pDesc, const caStatus status);
caStatus readNotifyResponse (casChannelI *pChan, const caHdr &msg,
caStatus readNotifyResponse (casChannelI *pChan, const caHdrLargeArray &msg,
const smartConstGDDPointer &pDesc, const caStatus status);
caStatus writeResponse (const caHdr &msg, const caStatus status);
caStatus writeNotifyResponse (const caHdr &msg, const caStatus status);
caStatus monitorResponse (casChannelI &chan, const caHdr &msg,
const smartConstGDDPointer &pDesc, const caStatus status);
caStatus writeResponse (const caHdrLargeArray &msg, const caStatus status);
caStatus writeNotifyResponse (const caHdrLargeArray &msg, const caStatus status);
caStatus monitorResponse ( casChannelI & chan, const caHdrLargeArray & msg,
const smartConstGDDPointer & pDesc, const caStatus status );
caStatus noReadAccessEvent(casClientMon *);
@@ -615,7 +636,7 @@ public:
unsigned getDebugLevel() const;
virtual void clientHostName (char *pBuf, unsigned bufSize) const = 0;
virtual void hostName (char *pBuf, unsigned bufSize) const = 0;
protected:
@@ -666,7 +687,7 @@ private:
//
// channelCreateFailed()
//
caStatus channelCreateFailed (const caHdr *mp, caStatus createStatus);
caStatus channelCreateFailed ( const caHdrLargeArray *mp, caStatus createStatus );
caStatus writeArrayData();
caStatus writeScalarData();
@@ -675,22 +696,26 @@ private:
//
// io independent send/recv
//
outBuf::flushCondition xSend (char *pBuf, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent);
inBuf::fillCondition xRecv (char *pBuf, bufSizeT nBytesToRecv,
fillParameter parm, bufSizeT &nByesRecv);
outBufClient::flushCondition xSend ( char * pBuf, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT & nBytesSent );
inBufClient::fillCondition xRecv ( char * pBuf, bufSizeT nBytesToRecv,
inBufClient::fillParameter parm, bufSizeT & nByesRecv );
virtual xBlockingStatus blockingState() const = 0;
virtual outBuf::flushCondition osdSend (const char *pBuf, bufSizeT nBytesReq,
bufSizeT &nBytesActual) = 0;
virtual inBuf::fillCondition osdRecv (char *pBuf, bufSizeT nBytesReq,
bufSizeT &nBytesActual) = 0;
virtual outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
bufSizeT & nBytesActual ) = 0;
virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
bufSizeT &nBytesActual ) = 0;
caStatus readNotifyResponseECA_XXX (casChannelI *pChan,
const caHdr &msg, const smartConstGDDPointer &pDesc, const caStatus ecaStatus);
caStatus writeNotifyResponseECA_XXX (const caHdr &msg,
const caStatus status);
caStatus readNotifyFailureResponse ( const caHdrLargeArray & msg,
const caStatus ECA_XXXX );
caStatus monitorFailureResponse ( const caHdrLargeArray & msg,
const caStatus ECA_XXXX );
caStatus writeNotifyResponseECA_XXX ( const caHdrLargeArray &msg,
const caStatus status );
casStrmClient ( const casStrmClient & );
casStrmClient & operator = ( const casStrmClient & );
@@ -712,7 +737,7 @@ public:
//
// only for use with DG io
//
void sendBeacon ();
void sendBeacon ( ca_uint32_t beaconNumber );
virtual void sendBeaconIO (char &msg, bufSizeT length,
aitUint16 &portField, aitUint32 &addrField) = 0;
@@ -721,7 +746,7 @@ public:
unsigned getDebugLevel() const;
void clientHostName (char *pBuf, unsigned bufSize) const;
void hostName (char *pBuf, unsigned bufSize) const;
caNetAddr fetchLastRecvAddr () const;
@@ -749,25 +774,25 @@ private:
//
// searchFailResponse()
//
caStatus searchFailResponse (const caHdr *pMsg);
caStatus searchFailResponse (const caHdrLargeArray *pMsg);
caStatus searchResponse (const caHdr &, const pvExistReturn &);
caStatus searchResponse (const caHdrLargeArray &, const pvExistReturn &);
caStatus asyncSearchResponse (const caNetAddr &outAddr,
const caHdr &msg, const pvExistReturn &);
caStatus asyncSearchResponse ( const caNetAddr & outAddr,
const caHdrLargeArray & msg, const pvExistReturn & );
//
// IO depen
//
outBuf::flushCondition xSend (char *pBufIn, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent);
inBuf::fillCondition xRecv (char *pBufIn, bufSizeT nBytesToRecv,
fillParameter parm, bufSizeT &nByesRecv);
outBufClient::flushCondition xSend ( char *pBufIn, bufSizeT nBytesAvailableToSend,
bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent );
inBufClient::fillCondition xRecv ( char *pBufIn, bufSizeT nBytesToRecv,
fillParameter parm, bufSizeT &nByesRecv );
virtual outBuf::flushCondition osdSend (const char *pBuf, bufSizeT nBytesReq,
const caNetAddr &addr) = 0;
virtual inBuf::fillCondition osdRecv (char *pBuf, bufSizeT nBytesReq,
fillParameter parm, bufSizeT &nBytesActual, caNetAddr &addr) = 0;
virtual outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq,
const caNetAddr & addr ) = 0;
virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq,
fillParameter parm, bufSizeT &nBytesActual, caNetAddr & addr ) = 0;
//
// cadg
@@ -915,6 +940,7 @@ private:
unsigned debugLevel;
unsigned nEventsProcessed;
unsigned nEventsPosted;
ca_uint32_t beaconCounter;
//
// predefined event types

View File

@@ -207,7 +207,7 @@ void casDGIOWakeup::show(unsigned level) const
//
void casDGIntfOS::armRecv()
{
if (!this->inBuf::full()) {
if ( ! this->in.full () ) {
if (!this->pRdReg) {
this->pRdReg = new casDGReadReg(*this);
if (!this->pRdReg) {
@@ -243,7 +243,7 @@ void casDGIntfOS::disarmRecv()
//
void casDGIntfOS::armSend()
{
if (this->outBuf::bytesPresent()==0u) {
if ( this->out.bytesPresent () == 0u ) {
return;
}
@@ -291,7 +291,7 @@ void casDGIntfOS::eventFlush()
// if there is nothing pending in the input
// queue, then flush the output queue
//
if (this->inBuf::bytesAvailable()==0u) {
if ( this->in.bytesAvailable() == 0u ) {
this->armSend ();
}
}
@@ -322,7 +322,7 @@ void casDGIntfOS::show(unsigned level) const
//
void casDGReadReg::callBack()
{
this->os.recvCB (inBuf::fpNone);
this->os.recvCB ( inBufClient::fpNone );
}
//
@@ -348,7 +348,7 @@ void casDGReadReg::show(unsigned level) const
//
void casDGBCastReadReg::callBack()
{
this->os.recvCB (inBuf::fpUseBroadcastInterface);
this->os.recvCB ( inBufClient::fpUseBroadcastInterface );
}
//
@@ -414,13 +414,13 @@ void casDGIntfOS::sendBlockSignal()
//
void casDGIntfOS::sendCB()
{
flushCondition flushCond;
outBufClient::flushCondition flushCond;
//
// attempt to flush the output buffer
//
flushCond = this->flush();
if (flushCond==flushProgress) {
flushCond = this->out.flush();
if ( flushCond == flushProgress ) {
if (this->sendBlocked) {
this->sendBlocked = false;
}
@@ -458,14 +458,14 @@ void casDGIntfOS::sendCB()
//
// casDGIntfOS::recvCB()
//
void casDGIntfOS::recvCB (inBuf::fillParameter parm)
void casDGIntfOS::recvCB ( inBufClient::fillParameter parm )
{
assert (this->pRdReg);
//
// copy in new messages
//
this->inBuf::fill (parm);
this->in.fill ( parm );
this->processInput ();
//
@@ -478,7 +478,7 @@ void casDGIntfOS::recvCB (inBuf::fillParameter parm)
// (casDGReadReg is _not_ a onceOnly fdReg -
// therefore an explicit delete is required here)
//
if (this->inBuf::full()) {
if ( this->in.full() ) {
this->disarmRecv(); // this deletes the casDGReadReg object
}
}
@@ -500,7 +500,7 @@ void casDGIntfOS::processInput()
status!=S_casApp_postponeAsyncIO) {
char pName[64u];
this->clientHostName (pName, sizeof (pName));
this->hostName (pName, sizeof (pName));
errPrintf (status, __FILE__, __LINE__,
"unexpected problem with UDP input from \"%s\"", pName);
}
@@ -511,8 +511,8 @@ void casDGIntfOS::processInput()
// input buffer then keep sending the output
// buffer until it is empty
//
if (this->outBuf::bytesPresent()>0u) {
if ( this->bytesAvailable () == 0 ) {
if ( this->out.bytesPresent() > 0u ) {
if ( this->in.bytesAvailable () == 0 ) {
this->armSend ();
}
}

View File

@@ -94,12 +94,12 @@ public:
void processInput();
private:
casDGIOWakeup ioWk;
casDGEvWakeup evWk;
casDGReadReg *pRdReg;
casDGBCastReadReg *pBCastRdReg; // fix for solaris bug
casDGWriteReg *pWtReg;
bool sendBlocked;
casDGIOWakeup ioWk;
casDGEvWakeup evWk;
casDGReadReg *pRdReg;
casDGBCastReadReg *pBCastRdReg; // fix for solaris bug
casDGWriteReg *pWtReg;
bool sendBlocked;
void armRecv ();
void armSend ();
@@ -107,7 +107,7 @@ private:
void disarmRecv ();
void disarmSend ();
void recvCB (inBuf::fillParameter parm);
void recvCB ( inBufClient::fillParameter parm );
void sendCB ();
void sendBlockSignal ();

View File

@@ -244,11 +244,11 @@ void casStreamIOWakeup::start ( casStreamOS &os )
//
inline void casStreamOS::armRecv()
{
if (!this->pRdReg) {
if (!this->inBuf::full()) {
this->pRdReg = new casStreamReadReg(*this);
if (!this->pRdReg) {
errMessage(S_cas_noMemory, "armRecv()");
if ( ! this->pRdReg ) {
if ( ! this->in.full() ) {
this->pRdReg = new casStreamReadReg ( *this );
if ( ! this->pRdReg ) {
errMessage ( S_cas_noMemory, "armRecv()" );
throw S_cas_noMemory;
}
}
@@ -270,11 +270,11 @@ inline void casStreamOS::disarmRecv()
//
inline void casStreamOS::armSend()
{
if (this->outBuf::bytesPresent()==0u) {
if ( this->out.bytesPresent () == 0u ) {
return;
}
if (!this->pWtReg) {
if ( ! this->pWtReg ) {
this->pWtReg = new casStreamWriteReg(*this);
if (!this->pWtReg) {
errMessage(S_cas_noMemory, "armSend() failed");
@@ -318,7 +318,7 @@ void casStreamOS::eventFlush()
// if there is nothing pending in the input
// queue, then flush the output queue
//
if (this->inBuf::bytesAvailable()==0u) {
if ( this->in.bytesAvailable() == 0u ) {
this->armSend ();
}
}
@@ -396,16 +396,16 @@ void casStreamReadReg::callBack ()
//
void casStreamOS::recvCB()
{
inBuf::fillCondition fillCond;
inBufClient::fillCondition fillCond;
casProcCond procCond;
assert (this->pRdReg);
assert ( this->pRdReg );
//
// copy in new messages
//
fillCond = this->fill();
if (fillCond == casFillDisconnect) {
fillCond = this->in.fill();
if ( fillCond == casFillDisconnect ) {
delete this;
}
else {
@@ -413,7 +413,7 @@ void casStreamOS::recvCB()
if (procCond == casProcDisconnect) {
delete this;
}
else if (this->inBuf::full()) {
else if ( this->in.full() ) {
//
// If there isnt any space then temporarily
// stop calling this routine until problem is resolved
@@ -470,19 +470,19 @@ void casStreamWriteReg::callBack()
//
void casStreamOS::sendCB()
{
outBuf::flushCondition flushCond;
outBufClient::flushCondition flushCond;
casProcCond procCond;
//
// attempt to flush the output buffer
//
flushCond = this->flush();
flushCond = this->out.flush ();
if (flushCond==flushProgress) {
if (this->sendBlocked) {
this->sendBlocked = FALSE;
}
}
else if (flushCond==outBuf::flushDisconnect) {
else if ( flushCond == outBufClient::flushDisconnect ) {
//
// ok to delete the client here
// because casStreamWriteReg::callBack()
@@ -552,8 +552,8 @@ void casStreamOS::sendCB()
// additional bytes may have been added since
// we flushed the out buffer
//
if (this->outBuf::bytesPresent()>0u &&
this->inBuf::bytesAvailable()==0u) {
if ( this->out.bytesPresent() > 0u &&
this->in.bytesAvailable() == 0u ) {
this->armSend();
}
}
@@ -586,7 +586,7 @@ casProcCond casStreamOS::processInput() // X aCC 361
// if there is nothing pending in the input
// queue, then flush the output queue
//
if (this->inBuf::bytesAvailable()==0u) {
if ( this->in.bytesAvailable() == 0u ) {
this->armSend ();
}
this->armRecv ();