diff --git a/src/cas/generic/caServerI.cc b/src/cas/generic/caServerI.cc index d412aa1fe..bea408034 100644 --- a/src/cas/generic/caServerI.cc +++ b/src/cas/generic/caServerI.cc @@ -62,7 +62,8 @@ caServerI::caServerI (caServer &tool) : adapter (tool), debugLevel (0u), nEventsProcessed (0u), - nEventsPosted (0u) + nEventsPosted (0u), + beaconCounter (0u) { caStatus status; double maxPeriod; @@ -239,10 +240,12 @@ void caServerI::sendBeacon() epicsGuard < epicsMutex > locker ( this->mutex ); tsDLIterBD iter = this->intfList.firstIter (); while ( iter.valid () ) { - iter->sendBeacon (); + iter->sendBeacon ( this->beaconCounter ); iter++; } } + + this->beaconCounter++; // // double the period between beacons (but dont exceed max) diff --git a/src/cas/generic/casAsyncReadIO.cc b/src/cas/generic/casAsyncReadIO.cc index 1f340049c..bd92398c0 100644 --- a/src/cas/generic/casAsyncReadIO.cc +++ b/src/cas/generic/casAsyncReadIO.cc @@ -37,26 +37,23 @@ // // casAsyncReadIO::casAsyncReadIO() // -casAsyncReadIO::casAsyncReadIO(const casCtx &ctx) : - casAsyncIOI(*ctx.getClient()), - msg(*ctx.getMsg()), - chan(*ctx.getChannel()), - pDD(NULL), - completionStatus(S_cas_internal) +casAsyncReadIO::casAsyncReadIO ( const casCtx & ctx ) : + casAsyncIOI ( *ctx.getClient() ), msg ( *ctx.getMsg() ), + chan( *ctx.getChannel () ), pDD ( NULL ), completionStatus ( S_cas_internal ) { - assert (&this->chan); + assert ( &this->chan ); - this->chan.installAsyncIO(*this); + this->chan.installAsyncIO ( *this ); } // // casAsyncReadIO::~casAsyncReadIO() // -casAsyncReadIO::~casAsyncReadIO() +casAsyncReadIO::~casAsyncReadIO () { this->lock(); - this->chan.removeAsyncIO(*this); + this->chan.removeAsyncIO ( *this ); this->unlock(); } diff --git a/src/cas/generic/casBufferFactory.cpp b/src/cas/generic/casBufferFactory.cpp new file mode 100644 index 000000000..fdbb15ca2 --- /dev/null +++ b/src/cas/generic/casBufferFactory.cpp @@ -0,0 +1,118 @@ +/* + * $Id$ + * + * Author Jeffrey O. Hill + * johill@lanl.gov + * 505 665 1831 + * + * Experimental Physics and Industrial Control System (EPICS) + * + * Copyright 1991, the Regents of the University of California, + * and the University of Chicago Board of Governors. + * + * This software was produced under U.S. Government contracts: + * (W-7405-ENG-36) at the Los Alamos National Laboratory, + * and (W-31-109-ENG-38) at Argonne National Laboratory. + * + * Initial development by: + * The Controls and Automation Group (AT-8) + * Ground Test Accelerator + * Accelerator Technology Division + * Los Alamos National Laboratory + * + * Co-developed with + * The Controls and Computing Group + * Accelerator Systems Division + * Advanced Photon Source + * Argonne National Laboratory + * + */ + +#include + +#include "envdefs.h" +#include "freelist.h" + +#include "server.h" + +epicsSingleton < casBufferFactory > pGlobalBufferFactoryCAS; + +casBufferFactory::casBufferFactory () : + smallBufFreeList ( 0 ), largeBufFreeList ( 0 ), largeBufferSizePriv ( 0u ) +{ + long maxBytesAsALong; + long status = envGetLongConfigParam ( & EPICS_CA_MAX_ARRAY_BYTES, & maxBytesAsALong ); + if ( status || maxBytesAsALong < 0 ) { + errlogPrintf ( "cas: EPICS_CA_MAX_ARRAY_BYTES was not a positive integer\n" ); + this->largeBufferSizePriv = MAX_TCP; + } + else { + /* allow room for the protocol header so that they get the array size they requested */ + static const unsigned headerSize = sizeof ( caHdr ) + 2 * sizeof ( ca_uint32_t ); + ca_uint32_t maxBytes = ( unsigned ) maxBytesAsALong; + if ( maxBytes < 0xffffffff - headerSize ) { + maxBytes += headerSize; + } + else { + maxBytes = 0xffffffff; + } + if ( maxBytes < MAX_TCP ) { + errlogPrintf ( "cas: EPICS_CA_MAX_ARRAY_BYTES was rounded up to %u\n", MAX_TCP ); + this->largeBufferSizePriv = MAX_TCP; + } + else { + this->largeBufferSizePriv = maxBytes; + } + } + + freeListInitPvt ( & this->smallBufFreeList, MAX_MSG_SIZE, 8 ); + freeListInitPvt ( & this->largeBufFreeList, this->largeBufferSizePriv, 1 ); +} + +casBufferFactory::~casBufferFactory () +{ + freeListCleanup ( this->smallBufFreeList ); + freeListCleanup ( this->largeBufFreeList ); +} + +unsigned casBufferFactory::smallBufferSize () const +{ + return MAX_MSG_SIZE; +} + +char * casBufferFactory::newSmallBuffer () +{ + void * pBuf = freeListCalloc ( this->smallBufFreeList ); + if ( ! pBuf ) { + throw std::bad_alloc(); + } + return static_cast < char * > ( pBuf ); +} + +void casBufferFactory::destroySmallBuffer ( char * pBuf ) +{ + if ( pBuf ) { + freeListFree ( this->smallBufFreeList, pBuf ); + } +} + +unsigned casBufferFactory::largeBufferSize () const +{ + return this->largeBufferSizePriv; +} + +char * casBufferFactory::newLargeBuffer () +{ + void * pBuf = freeListCalloc ( this->largeBufFreeList ); + if ( ! pBuf ) { + throw std::bad_alloc(); + } + return static_cast < char * > ( pBuf ); +} + +void casBufferFactory::destroyLargeBuffer ( char * pBuf ) +{ + if ( pBuf ) { + freeListFree ( this->largeBufFreeList, pBuf ); + } +} diff --git a/src/cas/generic/casChannelI.cc b/src/cas/generic/casChannelI.cc index 3a706cb0b..d2802a8a5 100644 --- a/src/cas/generic/casChannelI.cc +++ b/src/cas/generic/casChannelI.cc @@ -204,4 +204,23 @@ void casChannelI::destroyClientNotify () this->destroy(); } +// +// casChannelI::findMonitor +// (it is reasonable to do a linear search here because +// sane clients will require only one or two monitors +// per channel) +// +tsDLIterBD casChannelI::findMonitor (const caResId clientIdIn) +{ + this->lock (); + tsDLIterBD iter = this->monitorList.firstIter (); + while ( iter.valid () ) { + if ( clientIdIn == iter->getClientId () ) { + break; + } + iter++; + } + this->unlock (); + return iter; +} diff --git a/src/cas/generic/casChannelIIL.h b/src/cas/generic/casChannelIIL.h index eee5daaee..a87d60029 100644 --- a/src/cas/generic/casChannelIIL.h +++ b/src/cas/generic/casChannelIIL.h @@ -96,26 +96,6 @@ inline void casChannelI::addMonitor(casMonitor &mon) this->unlock(); } -// -// casChannelI::findMonitor -// (it is reasonable to do a linear search here because -// sane clients will require only one or two monitors -// per channel) -// -inline tsDLIterBD casChannelI::findMonitor (const caResId clientIdIn) -{ - this->lock (); - tsDLIterBD iter = this->monitorList.firstIter (); - while ( iter.valid () ) { - if ( clientIdIn == iter->getClientId () ) { - break; - } - iter++; - } - this->unlock (); - return iter; -} - // // casChannelI::destroyNoClientNotify() // diff --git a/src/cas/generic/casClient.cc b/src/cas/generic/casClient.cc index f789c13b4..883f62fe9 100644 --- a/src/cas/generic/casClient.cc +++ b/src/cas/generic/casClient.cc @@ -51,9 +51,9 @@ casClient::pCASMsgHandler casClient::msgHandlers[CA_PROTO_LAST_CMMD+1u]; // // casClient::casClient() // -casClient::casClient(caServerI &serverInternal, bufSizeT ioSizeMinIn) : - casCoreClient (serverInternal), - inBuf (MAX_MSG_SIZE, ioSizeMinIn), outBuf (MAX_MSG_SIZE) +casClient::casClient ( caServerI &serverInternal, bufSizeT ioSizeMinIn ) : + casCoreClient ( serverInternal ), in ( *this, ioSizeMinIn ), out ( *this ), + minor_version_number ( 0 ), incommingBytesToDrain ( 0 ) { // // static member init @@ -183,8 +183,8 @@ void casClient::show (unsigned level) const printf ( "casClient at %p\n", static_cast ( this ) ); this->casCoreClient::show (level); - this->inBuf::show (level); - this->outBuf::show (level); + this->in.show (level); + this->out.show (level); } // @@ -192,88 +192,122 @@ void casClient::show (unsigned level) const // caStatus casClient::processMsg () { - unsigned msgsize; - unsigned bytesLeft; - int status; - const caHdr *mp; - const char *rawMP; + // drain message that does not fit + if ( this->incommingBytesToDrain ) { + unsigned bytesLeft = this->in.bytesPresent(); + if ( bytesLeft < this->incommingBytesToDrain ) { + this->in.removeMsg ( bytesLeft ); + this->incommingBytesToDrain -= bytesLeft; + return S_cas_success; + } + else { + this->in.removeMsg ( this->incommingBytesToDrain ); + this->incommingBytesToDrain = 0u; + } + } // // process any messages in the in buffer // - status = S_cas_success; + int status = S_cas_success; - while ( (bytesLeft = this->inBuf::bytesPresent()) ) { + unsigned bytesLeft; + while ( ( bytesLeft = this->in.bytesPresent() ) ) { + caHdrLargeArray msgTmp; + unsigned msgSize; + ca_uint32_t hdrSize; + char * rawMP; + { + // + // copy as raw bytes in order to avoid + // alignment problems + // + caHdr smallHdr; + if ( bytesLeft < sizeof ( smallHdr ) ) { + break; + } - /* - * incomplete message - return success and - * wait for more bytes to arrive - */ - if (bytesLeft < sizeof(*mp)) { - status = S_cas_success; - break; - } + rawMP = this->in.msgPtr (); + memcpy ( & smallHdr, rawMP, sizeof ( smallHdr ) ); - rawMP = this->inBuf::msgPtr(); - this->ctx.setMsg(rawMP); + ca_uint32_t payloadSize = epicsNTOH16 ( smallHdr.m_postsize ); + ca_uint32_t nElem = epicsNTOH16 ( smallHdr.m_count ); + if ( payloadSize != 0xffff && nElem != 0xffff ) { + hdrSize = sizeof ( smallHdr ); + } + else { + ca_uint32_t LWA[2]; + hdrSize = sizeof ( smallHdr ) + sizeof ( LWA ); + if ( bytesLeft < hdrSize ) { + break; + } + // + // copy as raw bytes in order to avoid + // alignment problems + // + memcpy ( LWA, rawMP + sizeof ( caHdr ), sizeof( LWA ) ); + payloadSize = epicsNTOH32 ( LWA[0] ); + nElem = epicsNTOH32 ( LWA[1] ); + } - // - // get pointer to msg copy in local byte order - // - mp = this->ctx.getMsg(); + msgTmp.m_cmmd = epicsNTOH16 ( smallHdr.m_cmmd ); + msgTmp.m_postsize = payloadSize; + msgTmp.m_dataType = epicsNTOH16 ( smallHdr.m_dataType ); + msgTmp.m_count = nElem; + msgTmp.m_cid = epicsNTOH32 ( smallHdr.m_cid ); + msgTmp.m_available = epicsNTOH32 ( smallHdr.m_available ); - msgsize = mp->m_postsize + sizeof(*mp); - /* - * incomplete message - return success and - * wait for more bytes to arrive - */ - if (msgsize > bytesLeft) { - status = S_cas_success; - break; - } + msgSize = hdrSize + payloadSize; + if ( bytesLeft < msgSize ) { + if ( msgSize > this->in.bufferSize() ) { + this->in.expandBuffer (); + // msg to large - set up message drain + if ( msgSize > this->in.bufferSize() ) { + this->dumpMsg ( & msgTmp, 0, + "The client requested transfer is greater than available " + "memory in server or EPICS_CA_MAX_ARRAY_BYTES\n" ); + status = this->sendErr ( & msgTmp, ECA_TOLARGE, + "request didnt fit within the CA server's message buffer" ); + this->in.removeMsg ( bytesLeft ); + this->incommingBytesToDrain = msgSize - bytesLeft; + } + } + break; + } - this->ctx.setData((void *)(rawMP+sizeof(*mp))); + this->ctx.setMsg ( msgTmp, rawMP + hdrSize ); - if (this->getCAS().getDebugLevel()> 2u) { - this->dumpMsg (mp, (void *)(mp+1), NULL); - } + if ( this->getCAS().getDebugLevel() > 2u ) { + this->dumpMsg ( & msgTmp, rawMP + hdrSize, 0 ); + } + + } // // Reset the context to the default // (guarantees that previous message does not get mixed // up with the current message) // - this->ctx.setChannel (NULL); - this->ctx.setPV (NULL); + this->ctx.setChannel ( NULL ); + this->ctx.setPV ( NULL ); // - // Check for bad protocol element + // Call protocol stub // - if (mp->m_cmmd >= NELEMENTS(casClient::msgHandlers)){ - status = this->uknownMessageAction (); + pCASMsgHandler pHandler; + if ( msgTmp.m_cmmd < NELEMENTS ( casClient::msgHandlers ) ) { + pHandler = this->casClient::msgHandlers[msgTmp.m_cmmd]; + } + else { + pHandler = & casClient::uknownMessageAction; + } + status = ( this->*pHandler ) (); + if ( status ) { break; } - // - // Call protocol stub - // - try { - status = (this->*casClient::msgHandlers[mp->m_cmmd]) (); - if (status) { - break; - } - } - catch (...) { - this->dumpMsg (mp, this->ctx.getData(), "request resulted in C++ exception\n"); - status = this->sendErr (mp, ECA_INTERNAL, "request resulted in C++ exception"); - if (status) { - break; - } - break; - } - - this->inBuf::removeMsg (msgsize); + this->in.removeMsg ( msgSize ); } return status; @@ -339,22 +373,17 @@ caStatus casClient::hostNameAction () // caStatus casClient::echoAction () { - const caHdr *mp = this->ctx.getMsg(); - void *dp = this->ctx.getData(); - int status; - caHdr *reply; - - status = this->outBuf::allocMsg (mp->m_postsize, &reply); - if (status) { - if (status==S_cas_hugeRequest) { - status = sendErr(mp, ECA_TOLARGE, NULL); - } - return status; - } - *reply = *mp; - memcpy((char *) (reply+1), (char *) dp, mp->m_postsize); - this->outBuf::commitMsg(); + const caHdrLargeArray * mp = this->ctx.getMsg(); + const void * dp = this->ctx.getData(); + void * pPayloadOut; + caStatus status = this->out.copyInHeader ( mp->m_cmmd, mp->m_postsize, + mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available, + & pPayloadOut ); + if ( ! status ) { + memcpy ( pPayloadOut, dp, mp->m_postsize ); + this->out.commitMsg (); + } return S_cas_success; } @@ -369,66 +398,48 @@ caStatus casClient::noopAction () // send minor protocol revision to the client void casClient::sendVersion () { - caHdr * pReply; - caStatus status = this->allocMsg ( 0, &pReply ); - if ( status ) { - return; + caStatus status = this->out.copyInHeader ( CA_PROTO_VERSION, 0, + 0, CA_MINOR_PROTOCOL_REVISION, 0, 0, 0 ); + if ( ! status ) { + this->out.commitMsg (); } - memset ( pReply, '\0', sizeof ( *pReply ) ); - pReply->m_cmmd = CA_PROTO_VERSION; - pReply->m_count = CA_MINOR_PROTOCOL_REVISION; - this->commitMsg (); } // // casClient::sendErr() // -caStatus casClient::sendErr( -const caHdr *curp, -const int reportedStatus, -const char *pformat, - ... -) +caStatus casClient::sendErr ( const caHdrLargeArray *curp, const int reportedStatus, + const char *pformat, ... ) { - va_list args; - casChannelI *pciu; - unsigned size; - caHdr *reply; - char *pMsgString; - int status; + unsigned stringSize; char msgBuf[1024]; /* allocate plenty of space for the message string */ - - if (pformat) { - va_start (args, pformat); - status = vsprintf (msgBuf, pformat, args); - if (status<0) { + if ( pformat ) { + va_list args; + va_start ( args, pformat ); + int status = vsprintf (msgBuf, pformat, args); + if ( status < 0 ) { errPrintf (S_cas_internal, __FILE__, __LINE__, "bad sendErr(%s)", pformat); - size = 0u; + stringSize = 0u; } else { - size = 1u + (unsigned) status; + stringSize = 1u + (unsigned) status; } } else { - size = 0u; + stringSize = 0u; } - /* - * allocate plenty of space for a sprintf() buffer - */ - status = this->outBuf::allocMsg (size+sizeof(caHdr), &reply); - if (status) { - return status; - } + unsigned hdrSize = sizeof ( caHdr ); + if ( ( curp->m_postsize >= 0xffff || curp->m_count >= 0xffff ) && + CA_V49( this->minor_version_number ) ) { + hdrSize += 2 * sizeof ( ca_uint32_t ); + } - reply[0] = nill_msg; - reply[0].m_cmmd = CA_PROTO_ERROR; - reply[0].m_available = reportedStatus; - - switch (curp->m_cmmd) { + ca_uint32_t cid = 0u; + switch ( curp->m_cmmd ) { case CA_PROTO_SEARCH: - reply->m_cid = curp->m_cid; + cid = curp->m_cid; break; case CA_PROTO_EVENT_ADD: @@ -437,52 +448,70 @@ const char *pformat, case CA_PROTO_READ_NOTIFY: case CA_PROTO_WRITE: case CA_PROTO_WRITE_NOTIFY: - /* - * Verify the channel - */ - pciu = this->resIdToChannel(curp->m_cid); - if(pciu){ - reply->m_cid = pciu->getCID(); - } - else{ - reply->m_cid = ~0u; - } - break; + { + /* + * Verify the channel + */ + casChannelI * pciu = this->resIdToChannel ( curp->m_cid ); + if ( pciu ) { + cid = pciu->getCID(); + } + else{ + cid = ~0u; + } + break; + } case CA_PROTO_EVENTS_ON: case CA_PROTO_EVENTS_OFF: case CA_PROTO_READ_SYNC: case CA_PROTO_SNAPSHOT: default: - reply->m_cid = (caResId) ~0UL; + cid = (caResId) ~0UL; break; } - /* - * copy back the request protocol - * (in network byte order) - */ - reply[1].m_postsize = epicsHTON16 (curp->m_postsize); - reply[1].m_cmmd = epicsHTON16 (curp->m_cmmd); - reply[1].m_dataType = epicsHTON16 (curp->m_dataType); - reply[1].m_count = epicsHTON16 (curp->m_count); - reply[1].m_cid = epicsHTON32 (curp->m_cid); - reply[1].m_available = epicsHTON32 (curp->m_available); + caHdr * pReqOut; + caStatus status = this->out.copyInHeader ( CA_PROTO_ERROR, + hdrSize + stringSize, 0, 0, cid, reportedStatus, + reinterpret_cast ( & pReqOut ) ); + if ( ! status ) { + char * pMsgString; - /* - * add their optional context string into the protocol - */ - if (size>0u) { - pMsgString = (char *) (reply+2u); - strncpy (pMsgString, msgBuf, size-1u); - pMsgString[size-1u] = '\0'; - } + /* + * copy back the request protocol + * (in network byte order) + */ + if ( ( curp->m_postsize >= 0xffff || curp->m_count >= 0xffff ) && + CA_V49( this->minor_version_number ) ) { + ca_uint32_t *pLW = ( ca_uint32_t * ) ( pReqOut + 1 ); + pReqOut->m_cmmd = htons ( curp->m_cmmd ); + pReqOut->m_postsize = htons ( 0xffff ); + pReqOut->m_dataType = htons ( curp->m_dataType ); + pReqOut->m_count = htons ( 0u ); + pReqOut->m_cid = htonl ( curp->m_cid ); + pReqOut->m_available = htonl ( curp->m_available ); + pLW[0] = htonl ( curp->m_postsize ); + pLW[1] = htonl ( curp->m_count ); + pMsgString = ( char * ) ( pLW + 2 ); + } + else { + pReqOut->m_cmmd = htons (curp->m_cmmd); + pReqOut->m_postsize = htons ( ( (ca_uint16_t) curp->m_postsize ) ); + pReqOut->m_dataType = htons (curp->m_dataType); + pReqOut->m_count = htons ( ( (ca_uint16_t) curp->m_count ) ); + pReqOut->m_cid = htonl (curp->m_cid); + pReqOut->m_available = htonl (curp->m_available); + pMsgString = ( char * ) ( pReqOut + 1 ); + } - size += sizeof(caHdr); - assert ( size < 0xffff ); - reply->m_postsize = static_cast ( size ); + /* + * add their context string into the protocol + */ + memcpy ( pMsgString, msgBuf, stringSize ); - this->outBuf::commitMsg(); + this->out.commitMsg (); + } return S_cas_success; } @@ -493,52 +522,44 @@ const char *pformat, * same as sendErr() except that we convert epicsStatus * to a string and send that additional detail */ -caStatus casClient::sendErrWithEpicsStatus(const caHdr *pMsg, - caStatus epicsStatus, caStatus clientStatus) +caStatus casClient::sendErrWithEpicsStatus ( const caHdrLargeArray * pMsg, + caStatus epicsStatus, caStatus clientStatus ) { long status; char buf[0x1ff]; - status = errSymFind(epicsStatus, buf); + status = errSymFind ( epicsStatus, buf ); if (status) { - sprintf(buf, "UKN error code = 0X%u\n", - epicsStatus); + sprintf ( buf, "UKN error code = 0X%u\n", + epicsStatus ); } - return this->sendErr(pMsg, clientStatus, buf); + return this->sendErr ( pMsg, clientStatus, buf ); } /* * casClient::logBadIdWithFileAndLineno() */ -caStatus casClient::logBadIdWithFileAndLineno( -const caHdr *mp, -const void *dp, -const int cacStatus, -const char *pFileName, -const unsigned lineno, -const unsigned id +caStatus casClient::logBadIdWithFileAndLineno ( const caHdrLargeArray * mp, + const void * dp, const int cacStatus, const char * pFileName, + const unsigned lineno, const unsigned id ) { int status; - if (pFileName) { - this->dumpMsg (mp, dp, + if ( pFileName) { + this-> dumpMsg ( mp, dp, "bad resource id in \"%s\" at line %d\n", - pFileName, lineno); + pFileName, lineno ); } else { - this->dumpMsg (mp, dp, - "bad resource id\n"); + this->dumpMsg ( mp, dp, + "bad resource id\n" ); } status = this->sendErr ( - mp, - cacStatus, - "Bad Resource ID=%u detected at %s.%d", - id, - pFileName, - lineno); + mp, cacStatus, "Bad Resource ID=%u detected at %s.%d", + id, pFileName, lineno); return status; } @@ -551,21 +572,22 @@ const unsigned id // dp arg allowed to be null // // -void casClient::dumpMsg(const caHdr *mp, const void *dp, const char *pFormat, ...) +void casClient::dumpMsg ( const caHdrLargeArray *mp, + const void *dp, const char *pFormat, ... ) { casChannelI *pciu; char pName[64u]; char pPVName[64u]; va_list theArgs; - if (pFormat) { - va_start (theArgs, pFormat); - errlogPrintf ("CAS: "); - errlogVprintf (pFormat, theArgs); - va_end (theArgs); + if ( pFormat ) { + va_start ( theArgs, pFormat ); + errlogPrintf ( "CAS: " ); + errlogVprintf ( pFormat, theArgs ); + va_end ( theArgs ); } - this->clientHostName (pName, sizeof (pName)); + this->hostName (pName, sizeof (pName)); pciu = this->resIdToChannel(mp->m_cid); @@ -591,8 +613,8 @@ void casClient::dumpMsg(const caHdr *mp, const void *dp, const char *pFormat, .. mp->m_postsize, mp->m_available); - if (mp->m_cmmd==CA_PROTO_WRITE && mp->m_dataType==DBR_STRING && dp) { - errlogPrintf("CAS: The string written: %s \n", (char *)dp); + if ( mp->m_cmmd == CA_PROTO_WRITE && mp->m_dataType == DBR_STRING && dp ) { + errlogPrintf("CAS: The string written: %s \n", (char *)dp); } } diff --git a/src/cas/generic/casClientMon.cc b/src/cas/generic/casClientMon.cc index 07836aa35..11bc01e6f 100644 --- a/src/cas/generic/casClientMon.cc +++ b/src/cas/generic/casClientMon.cc @@ -29,6 +29,9 @@ * * History * $Log$ + * Revision 1.8 2002/02/06 02:28:24 jhill + * fixed warnings + * * Revision 1.7 2001/01/11 21:54:53 jhill * accomodate Marty's osi => epics name changes * @@ -83,11 +86,11 @@ casClientMon::~casClientMon() // // casClientMon::callBack() // -caStatus casClientMon::callBack (const smartConstGDDPointer &value) +caStatus casClientMon::callBack ( const smartConstGDDPointer &value ) { - casCoreClient &client = this->getChannel().getClient(); + casCoreClient & client = this->getChannel().getClient(); caStatus status; - caHdr msg; + caHdrLargeArray msg; // // reconstruct the msg header @@ -98,13 +101,13 @@ caStatus casClientMon::callBack (const smartConstGDDPointer &value) assert ( type <= 0xffff ); msg.m_dataType = static_cast ( type ); unsigned long count = this->getCount(); - assert ( count <= 0xffff ); - msg.m_count = static_cast ( count ); + assert ( count <= 0xffffffff ); + msg.m_count = static_cast ( count ); msg.m_cid = this->getChannel().getSID(); msg.m_available = this->getClientId(); - status = client.monitorResponse (this->getChannel(), - msg, value, S_cas_success); + status = client.monitorResponse ( this->getChannel(), + msg, value, S_cas_success ); return status; } diff --git a/src/cas/generic/casCoreClient.cc b/src/cas/generic/casCoreClient.cc index dca3860ca..e5c46b852 100644 --- a/src/cas/generic/casCoreClient.cc +++ b/src/cas/generic/casCoreClient.cc @@ -106,36 +106,36 @@ void casCoreClient::show (unsigned level) const // asynchronous completion // caStatus casCoreClient::asyncSearchResponse ( - const caNetAddr &, const caHdr &, const pvExistReturn &) + const caNetAddr &, const caHdrLargeArray &, const pvExistReturn & ) { return S_casApp_noSupport; } -caStatus casCoreClient::createChanResponse (const caHdr &, const pvAttachReturn &) +caStatus casCoreClient::createChanResponse ( const caHdrLargeArray &, const pvAttachReturn & ) { return S_casApp_noSupport; } -caStatus casCoreClient::readResponse (casChannelI *, const caHdr &, +caStatus casCoreClient::readResponse (casChannelI *, const caHdrLargeArray &, const smartConstGDDPointer &, const caStatus) { return S_casApp_noSupport; } -caStatus casCoreClient::readNotifyResponse (casChannelI *, const caHdr &, +caStatus casCoreClient::readNotifyResponse (casChannelI *, const caHdrLargeArray &, const smartConstGDDPointer &, const caStatus) { return S_casApp_noSupport; } -caStatus casCoreClient::writeResponse (const caHdr &, +caStatus casCoreClient::writeResponse (const caHdrLargeArray &, const caStatus) { return S_casApp_noSupport; } -caStatus casCoreClient::writeNotifyResponse (const caHdr &, +caStatus casCoreClient::writeNotifyResponse (const caHdrLargeArray &, const caStatus) { return S_casApp_noSupport; } -caStatus casCoreClient::monitorResponse (casChannelI &, const caHdr &, - const smartConstGDDPointer &, const caStatus) +caStatus casCoreClient::monitorResponse ( casChannelI &, const caHdrLargeArray &, + const smartConstGDDPointer &, const caStatus ) { return S_casApp_noSupport; } diff --git a/src/cas/generic/casCtxIL.h b/src/cas/generic/casCtxIL.h index d1ecad67f..aa336aa8b 100644 --- a/src/cas/generic/casCtxIL.h +++ b/src/cas/generic/casCtxIL.h @@ -17,15 +17,15 @@ inline casCtx::casCtx() : // // casCtx::getMsg() // -inline const caHdr *casCtx::getMsg() const +inline const caHdrLargeArray * casCtx::getMsg() const { - return &this->msg; + return & this->msg; } // // casCtx::getData() // -inline void *casCtx::getData() const +inline void * casCtx::getData() const { return this->pData; } @@ -64,29 +64,11 @@ inline casChannelI * casCtx::getChannel() const // // casCtx::setMsg() -// (assumes incoming message is in network byte order) // -inline void casCtx::setMsg(const char *pBuf) +inline void casCtx::setMsg ( caHdrLargeArray &msgIn, void * pBody ) { - // - // copy as raw bytes in order to avoid - // alignment problems - // - memcpy (&this->msg, pBuf, sizeof(this->msg)); - this->msg.m_cmmd = epicsNTOH16 (this->msg.m_cmmd); - this->msg.m_postsize = epicsNTOH16 (this->msg.m_postsize); - this->msg.m_dataType = epicsNTOH16 (this->msg.m_dataType); - this->msg.m_count = epicsNTOH16 (this->msg.m_count); - this->msg.m_cid = epicsNTOH32 (this->msg.m_cid); - this->msg.m_available = epicsNTOH32 (this->msg.m_available); -} - -// -// casCtx::setData() -// -inline void casCtx::setData(void *p) -{ - this->pData = p; + this->msg = msgIn; + this->pData = pBody; } // diff --git a/src/cas/generic/casDGClient.cc b/src/cas/generic/casDGClient.cc index 7cc4457e9..5dcef7741 100644 --- a/src/cas/generic/casDGClient.cc +++ b/src/cas/generic/casDGClient.cc @@ -32,8 +32,8 @@ #include "server.h" #include "caServerIIL.h" // caServerI inline func +#include "inBufIL.h" // inline functions for inBuf #include "outBufIL.h" // inline func for outBuf -#include "dgInBufIL.h" // dgInBuf inline func #include "casCtxIL.h" // casCtx inline func #include "casCoreClientIL.h" // casCoreClient inline func #include "osiPoolStatus.h" // osi pool monitoring functions @@ -75,7 +75,7 @@ void casDGClient::show (unsigned level) const static_cast ( this ) ); if (level>=1u) { char buf[64]; - this->clientHostName (buf, sizeof(buf)); + this->hostName (buf, sizeof(buf)); printf ("Client Host=%s\n", buf); } this->casClient::show (level); @@ -86,10 +86,10 @@ void casDGClient::show (unsigned level) const // caStatus casDGClient::uknownMessageAction () { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray * mp = this->ctx.getMsg(); - this->dumpMsg (mp, this->ctx.getData(), - "bad request code=%u in DG\n", mp->m_cmmd); + this->dumpMsg ( mp, this->ctx.getData(), + "bad request code=%u in DG\n", mp->m_cmmd ); return S_cas_internal; } @@ -99,25 +99,41 @@ caStatus casDGClient::uknownMessageAction () // caStatus casDGClient::searchAction() { - const caHdr *mp = this->ctx.getMsg(); - void *dp = this->ctx.getData(); - char *pChanName = (char *) dp; - caStatus status; + const caHdrLargeArray *mp = this->ctx.getMsg(); + const char *pChanName = static_cast ( this->ctx.getData() ); + caStatus status; // // check the sanity of the message // - if (mp->m_postsize<=1) { - this->dumpMsg (mp, dp, - "empty PV name in UDP search request?\n"); + if ( mp->m_postsize <= 1 ) { + this->dumpMsg ( mp, this->ctx.getData(), + "empty PV name extension in UDP search request?\n" ); return S_cas_success; } - pChanName[mp->m_postsize-1] = '\0'; - if (this->getCAS().getDebugLevel()>2u) { - char pName[64u]; - this->clientHostName (pChanName, sizeof (pChanName)); - printf("%s is searching for \"%s\"\n", pName, pChanName); + if ( pChanName[0] == '\0' ) { + this->dumpMsg ( mp, this->ctx.getData(), + "zero length PV name in UDP search request?\n" ); + return S_cas_success; + } + + // check for an unterminated string before calling server tool + // by searching backwards through the string (some early versions + // of the client library might not be setting the pad bytes to nill) + for ( unsigned i = mp->m_postsize-1; pChanName[i] != '\0'; i-- ) { + if ( i <= 1 ) { + this->dumpMsg ( mp, this->ctx.getData(), + "unterminated PV name in UDP search request?\n" ); + return S_cas_success; + } + } + + if ( this->getCAS().getDebugLevel() > 2u ) { + char pHostName[64u]; + this->hostName ( pHostName, sizeof ( pHostName ) ); + printf ( "\"%s\" is searching for \"%s\"\n", + pHostName, pChanName ); } // @@ -156,7 +172,7 @@ caStatus casDGClient::searchAction() // switch (pver.getStatus()) { case pverExistsHere: - status = this->searchResponse (*mp, pver); + status = this->searchResponse (*mp, pver); break; case pverDoesNotExistHere: @@ -182,18 +198,15 @@ caStatus casDGClient::searchAction() // // caStatus casDGClient::searchResponse() // -caStatus casDGClient::searchResponse(const caHdr &msg, - const pvExistReturn &retVal) +caStatus casDGClient::searchResponse ( const caHdrLargeArray & msg, + const pvExistReturn & retVal ) { - caStatus status; - caHdr *search_reply; - struct sockaddr_in ina; - unsigned short *pMinorVersion; - + caStatus status; + // // normal search failure is ignored // - if (retVal.getStatus()==pverDoesNotExistHere) { + if ( retVal.getStatus() == pverDoesNotExistHere ) { return S_cas_success; } @@ -214,7 +227,7 @@ caStatus casDGClient::searchResponse(const caHdr &msg, if ( !CA_V44(msg.m_count) ) { if (this->getCAS().getDebugLevel()>0u) { char pName[64u]; - this->clientHostName (pName, sizeof (pName)); + this->hostName (pName, sizeof (pName)); printf("client \"%s\" using EPICS R3.11 CA connect protocol was ignored\n", pName); } // @@ -226,30 +239,23 @@ caStatus casDGClient::searchResponse(const caHdr &msg, "R3.11 connect sequence from old client was ignored"); return status; } - - // - // obtain space for the reply message - // - status = this->allocMsg(sizeof(*pMinorVersion), &search_reply); - if (status) { - return status; - } - - *search_reply = msg; - search_reply->m_postsize = sizeof(*pMinorVersion); + // // cid field is abused to carry the IP // address in CA_V48 or higher // (this allows a CA servers to serve // as a directory service) // - // type field is abused to carry the IP + // data type field is abused to carry the IP // port number here CA_V44 or higher // (this allows multiple CA servers on one // host) // - if (CA_V48(msg.m_count)) { - if (retVal.addrIsValid()) { + ca_uint32_t serverAddr; + ca_uint16_t serverPort; + if ( CA_V48( msg.m_count ) ) { + struct sockaddr_in ina; + if ( retVal.addrIsValid() ) { caNetAddr addr = retVal.getAddr(); ina = addr.getSockIP(); // @@ -280,27 +286,30 @@ caStatus casDGClient::searchResponse(const caHdr &msg, ina.sin_addr.s_addr = epicsNTOH32 (~0U); } } - search_reply->m_cid = epicsNTOH32 (ina.sin_addr.s_addr); - search_reply->m_dataType = epicsNTOH16 (ina.sin_port); + serverAddr = epicsNTOH32 (ina.sin_addr.s_addr); + serverPort = epicsNTOH16 (ina.sin_port); } else { caNetAddr addr = this->serverAddress (); struct sockaddr_in inetAddr = addr.getSockIP(); - search_reply->m_cid = ~0U; - search_reply->m_dataType = epicsNTOH16 (inetAddr.sin_port); + serverAddr = ~0U; + serverPort = epicsNTOH16 ( inetAddr.sin_port ); } - search_reply->m_count = 0ul; - + ca_uint16_t * pMinorVersion; + status = this->out.copyInHeader ( CA_PROTO_SEARCH, + sizeof ( *pMinorVersion ), serverPort, 0, + serverAddr, msg.m_available, + reinterpret_cast ( &pMinorVersion ) ); + // // Starting with CA V4.1 the minor version number // is appended to the end of each search reply. // This value is ignored by earlier clients. // - pMinorVersion = (unsigned short *) (search_reply+1); *pMinorVersion = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION ); - this->commitMsg(); + this->out.commitMsg (); return S_cas_success; } @@ -310,21 +319,14 @@ caStatus casDGClient::searchResponse(const caHdr &msg, // (only when requested by the client // - when it isnt a reply to a broadcast) // -caStatus casDGClient::searchFailResponse(const caHdr *mp) +caStatus casDGClient::searchFailResponse ( const caHdrLargeArray * mp ) { int status; - caHdr *reply; - status = this->allocMsg(0u, &reply); - if(status){ - return status; - } + status = this->out.copyInHeader ( CA_PROTO_NOT_FOUND, 0, + mp->m_dataType, mp->m_count, mp->m_cid, mp->m_available, 0 ); - *reply = *mp; - reply->m_cmmd = CA_PROTO_NOT_FOUND; - reply->m_postsize = 0u; - - this->commitMsg(); + this->out.commitMsg (); return S_cas_success; } @@ -333,7 +335,7 @@ caStatus casDGClient::searchFailResponse(const caHdr *mp) // casDGClient::sendBeacon() // (implemented here because this has knowledge of the protocol) // -void casDGClient::sendBeacon () +void casDGClient::sendBeacon ( ca_uint32_t beaconNumber ) { union { caHdr msg; @@ -343,8 +345,10 @@ void casDGClient::sendBeacon () // // create the message // - memset ( &buf, 0, sizeof (msg) ); - msg.m_cmmd = epicsHTON16 (CA_PROTO_RSRV_IS_UP); + memset ( & buf, 0, sizeof ( msg ) ); + msg.m_cmmd = epicsHTON16 ( CA_PROTO_RSRV_IS_UP ); + msg.m_dataType = epicsHTON16 ( CA_MINOR_PROTOCOL_REVISION ); + msg.m_cid = epicsHTON32 ( beaconNumber ); // // send it to all addresses on the beacon list, @@ -357,61 +361,61 @@ void casDGClient::sendBeacon () // // casDGClient::xSend() // -outBuf::flushCondition casDGClient::xSend (char *pBufIn, // X aCC 361 +outBufClient::flushCondition casDGClient::xSend ( char *pBufIn, // X aCC 361 bufSizeT nBytesAvailableToSend, bufSizeT nBytesNeedToBeSent, - bufSizeT &nBytesSent) + bufSizeT &nBytesSent ) { - outBuf::flushCondition stat; + outBufClient::flushCondition stat; bufSizeT totalBytes; cadg *pHdr; - assert (nBytesAvailableToSend>=nBytesNeedToBeSent); + assert ( nBytesAvailableToSend >= nBytesNeedToBeSent ); totalBytes = 0; while ( true ) { pHdr = reinterpret_cast(&pBufIn[totalBytes]); - assert (totalBytes<=bufSizeT_MAX-pHdr->cadg_nBytes); - assert (totalBytes+pHdr->cadg_nBytes<=nBytesAvailableToSend); + assert ( totalBytes <= bufSizeT_MAX-pHdr->cadg_nBytes ); + assert ( totalBytes+pHdr->cadg_nBytes <= nBytesAvailableToSend ); - if (pHdr->cadg_addr.isValid()) { - stat = this->osdSend (reinterpret_cast(pHdr+1), - pHdr->cadg_nBytes-sizeof(*pHdr), pHdr->cadg_addr); - if (stat!=outBuf::flushProgress) { + if ( pHdr->cadg_addr.isValid() ) { + stat = this->osdSend ( reinterpret_cast(pHdr+1), + pHdr->cadg_nBytes-sizeof(*pHdr), pHdr->cadg_addr ); + if ( stat != outBufClient::flushProgress ) { break; } } totalBytes += pHdr->cadg_nBytes; - if (totalBytes>=nBytesNeedToBeSent) { + if ( totalBytes >= nBytesNeedToBeSent ) { break; } } - if (totalBytes) { + if ( totalBytes ) { // // !! this time fetch may be slowing things down !! // //this->lastSendTS = epicsTime::getCurrent(); nBytesSent = totalBytes; - return outBuf::flushProgress; + return outBufClient::flushProgress; } else { - return outBuf::flushNone; + return outBufClient::flushNone; } } // // casDGClient::xRecv () // -inBuf::fillCondition casDGClient::xRecv (char *pBufIn, bufSizeT nBytesToRecv, // X aCC 361 +inBufClient::fillCondition casDGClient::xRecv (char *pBufIn, bufSizeT nBytesToRecv, // X aCC 361 fillParameter parm, bufSizeT &nByesRecv) { const char *pAfter = pBufIn + nBytesToRecv; char *pCurBuf = pBufIn; bufSizeT nDGBytesRecv; - inBuf::fillCondition stat; + inBufClient::fillCondition stat; cadg *pHdr; while (pAfter-pCurBuf >= static_cast(MAX_UDP_RECV+sizeof(cadg))) { @@ -448,8 +452,8 @@ inBuf::fillCondition casDGClient::xRecv (char *pBufIn, bufSizeT nBytesToRecv, // // this results in many small UDP frames which unfortunately // isnt particularly efficient // -caStatus casDGClient::asyncSearchResponse (const caNetAddr &outAddr, - const caHdr &msg, const pvExistReturn &retVal) +caStatus casDGClient::asyncSearchResponse ( const caNetAddr & outAddr, + const caHdrLargeArray & msg, const pvExistReturn & retVal ) { caStatus stat; @@ -457,20 +461,20 @@ caStatus casDGClient::asyncSearchResponse (const caNetAddr &outAddr, // start a DG context in the output protocol stream // and grab the send lock // - void *pRaw; - const outBufCtx outctx = this->outBuf::pushCtx - (sizeof(cadg), MAX_UDP_SEND, pRaw); - if (outctx.pushResult()!=outBufCtx::pushCtxSuccess) { + void * pRaw; + const outBufCtx outctx = this->out.pushCtx + ( sizeof(cadg), MAX_UDP_SEND, pRaw ); + if ( outctx.pushResult() != outBufCtx::pushCtxSuccess ) { return S_cas_sendBlocked; } - cadg *pRespHdr = reinterpret_cast(pRaw); - stat = this->searchResponse (msg, retVal); + cadg * pRespHdr = reinterpret_cast ( pRaw ); + stat = this->searchResponse ( msg, retVal ); - pRespHdr->cadg_nBytes = this->outBuf::popCtx (outctx); - if (pRespHdr->cadg_nBytes>0) { + pRespHdr->cadg_nBytes = this->out.popCtx (outctx); + if ( pRespHdr->cadg_nBytes > 0 ) { pRespHdr->cadg_addr = outAddr; - this->outBuf::commitRawMsg (pRespHdr->cadg_nBytes); + this->out.commitRawMsg ( pRespHdr->cadg_nBytes ); } return stat; @@ -485,12 +489,12 @@ caStatus casDGClient::processDG () caStatus status; status = S_cas_success; - while ( (bytesLeft = this->inBuf::bytesPresent()) ) { + while ( ( bytesLeft = this->in.bytesPresent() ) ) { bufSizeT dgInBytesConsumed; - const cadg *pReqHdr = reinterpret_cast(this->inBuf::msgPtr ()); + const cadg * pReqHdr = reinterpret_cast ( this->in.msgPtr () ); if (bytesLeftinBuf::removeMsg (bytesLeft); + this->in.removeMsg (bytesLeft); errlogPrintf ("casDGClient::processMsg: incomplete DG header?"); status = S_cas_internal; break; @@ -501,7 +505,7 @@ caStatus casDGClient::processDG () // and grab the send lock // void *pRaw; - const outBufCtx outctx = this->outBuf::pushCtx (sizeof(cadg), MAX_UDP_SEND, pRaw); + const outBufCtx outctx = this->out.pushCtx ( sizeof ( cadg ), MAX_UDP_SEND, pRaw ); if ( outctx.pushResult() != outBufCtx::pushCtxSuccess ) { status = S_cas_sendBlocked; break; @@ -513,10 +517,11 @@ caStatus casDGClient::processDG () // select the next DG in the input stream and start processing it // const bufSizeT reqBodySize = pReqHdr->cadg_nBytes-sizeof (*pReqHdr); - const inBufCtx inctx = this->inBuf::pushCtx (sizeof (*pReqHdr), reqBodySize); + + const inBufCtx inctx = this->in.pushCtx (sizeof (*pReqHdr), reqBodySize); if ( inctx.pushResult() != inBufCtx::pushCtxSuccess ) { - this->inBuf::removeMsg (bytesLeft); - this->outBuf::popCtx (outctx); + this->in.removeMsg ( bytesLeft ); + this->out.popCtx ( outctx ); errlogPrintf ("casDGClient::processMsg: incomplete DG?\n"); status = S_cas_internal; break; @@ -525,7 +530,7 @@ caStatus casDGClient::processDG () this->lastRecvAddr = pReqHdr->cadg_addr; status = this->processMsg (); - dgInBytesConsumed = this->inBuf::popCtx (inctx); + dgInBytesConsumed = this->in.popCtx ( inctx ); if (dgInBytesConsumed>0) { // @@ -536,21 +541,21 @@ caStatus casDGClient::processDG () // In either case commit the DG to the protocol stream and // release the send lock // - pRespHdr->cadg_nBytes = this->outBuf::popCtx (outctx) + sizeof(*pRespHdr); - if (pRespHdr->cadg_nBytes>sizeof(*pRespHdr)) { + pRespHdr->cadg_nBytes = this->out.popCtx ( outctx ) + sizeof ( *pRespHdr ); + if ( pRespHdr->cadg_nBytes > sizeof ( *pRespHdr ) ) { pRespHdr->cadg_addr = pReqHdr->cadg_addr; - this->outBuf::commitRawMsg (pRespHdr->cadg_nBytes); + this->out.commitRawMsg ( pRespHdr->cadg_nBytes ); } // // check to see that all of the incoming UDP frame was used // - if (dgInBytesConsumedinBuf::removeMsg (dgInBytesConsumed); + this->in.removeMsg (dgInBytesConsumed); // // slide the UDP header forward and correct the byte count @@ -558,7 +563,7 @@ caStatus casDGClient::processDG () { cadg *pReqHdrMove; cadg copy = *pReqHdr; - pReqHdrMove = reinterpret_cast (this->inBuf::msgPtr ()); + pReqHdrMove = reinterpret_cast ( this->in.msgPtr () ); pReqHdrMove->cadg_addr = copy.cadg_addr; pReqHdrMove->cadg_nBytes = copy.cadg_nBytes - dgInBytesConsumed; } @@ -567,7 +572,7 @@ caStatus casDGClient::processDG () // // remove the header and all of the body // - this->inBuf::removeMsg ( pReqHdr->cadg_nBytes ); + this->in.removeMsg ( pReqHdr->cadg_nBytes ); } } @@ -595,9 +600,9 @@ caNetAddr casDGClient::fetchLastRecvAddr () const } // -// casDGClient::clientHostName() +// casDGClient::hostName() // -void casDGClient::clientHostName (char *pBufIn, unsigned bufSizeIn) const +void casDGClient::hostName (char *pBufIn, unsigned bufSizeIn) const { this->lastRecvAddr.stringConvert (pBufIn, bufSizeIn); } diff --git a/src/cas/generic/casInternal.h b/src/cas/generic/casInternal.h index f3ba9a5cb..bc9450864 100644 --- a/src/cas/generic/casInternal.h +++ b/src/cas/generic/casInternal.h @@ -34,7 +34,7 @@ // #include "tsDLList.h" #include "resourceLib.h" -#define CA_MINOR_PROTOCOL_REVISION 8 +#define CA_MINOR_PROTOCOL_REVISION 10 #include "caProto.h" #include "smartGDDPointer.h" @@ -499,3 +499,14 @@ private: casPVI & operator = ( const casPVI & ); }; +/* a modified ca header with capacity for large arrays */ +struct caHdrLargeArray { + ca_uint32_t m_postsize; /* size of message extension */ + ca_uint32_t m_count; /* operation data count */ + ca_uint32_t m_cid; /* channel identifier */ + ca_uint32_t m_available; /* protocol stub dependent */ + ca_uint16_t m_dataType; /* operation data type */ + ca_uint16_t m_cmmd; /* operation to be performed */ +}; + + diff --git a/src/cas/generic/casPVListChan.cc b/src/cas/generic/casPVListChan.cc index 20e2d4347..da2b7805c 100644 --- a/src/cas/generic/casPVListChan.cc +++ b/src/cas/generic/casPVListChan.cc @@ -33,10 +33,6 @@ #include "server.h" #include "casPVIIL.h" -// -// this needs to be here (and not in dgInBufIL.h) if we -// are to avoid undefined symbols under gcc 2.7.x with -g -// // // casPVListChan::casPVListChan() // diff --git a/src/cas/generic/casStrmClient.cc b/src/cas/generic/casStrmClient.cc index 6ee671370..4954990df 100644 --- a/src/cas/generic/casStrmClient.cc +++ b/src/cas/generic/casStrmClient.cc @@ -110,16 +110,16 @@ casStrmClient::~casStrmClient() // caStatus casStrmClient::uknownMessageAction () { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray *mp = this->ctx.getMsg(); caStatus status; - this->dumpMsg (mp, this->ctx.getData(), - "bad request code from virtual circuit=%u\n", mp->m_cmmd); + this->dumpMsg ( mp, this->ctx.getData(), + "bad request code from virtual circuit=%u\n", mp->m_cmmd ); /* * most clients dont recover from this */ - status = this->sendErr (mp, ECA_INTERNAL, "Invalid Request Code"); + status = this->sendErr ( mp, ECA_INTERNAL, "Invalid Request Code" ); if (status) { return status; } @@ -136,7 +136,7 @@ caStatus casStrmClient::uknownMessageAction () // caStatus casStrmClient::verifyRequest (casChannelI *&pChan) { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray * mp = this->ctx.getMsg(); // // channel exists for this resource id ? @@ -189,8 +189,8 @@ void casStrmClient::show (unsigned level) const if (level > 1u) { printf ("\tuser %s at %s\n", this->pUserName, this->pHostName); } - this->inBuf::show(level); - this->outBuf::show(level); + this->in.show(level); + this->out.show(level); } /* @@ -198,7 +198,7 @@ void casStrmClient::show (unsigned level) const */ caStatus casStrmClient::readAction () { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray *mp = this->ctx.getMsg(); caStatus status; casChannelI *pChan; smartGDDPointer pDesc; @@ -245,70 +245,54 @@ caStatus casStrmClient::readAction () // // casStrmClient::readResponse() // -caStatus casStrmClient::readResponse (casChannelI *pChan, const caHdr &msg, - const smartConstGDDPointer &pDesc, const caStatus status) +caStatus casStrmClient::readResponse ( casChannelI * pChan, const caHdrLargeArray & msg, + const smartConstGDDPointer & pDesc, const caStatus status ) { - caHdr *reply; - unsigned size; - caStatus localStatus; - int mapDBRStatus; - int strcnt; - - if (status!=S_casApp_success) { - return this->sendErrWithEpicsStatus(&msg, status, ECA_GETFAIL); + if ( status != S_casApp_success ) { + return this->sendErrWithEpicsStatus ( & msg, status, ECA_GETFAIL ); } - size = dbr_size_n (msg.m_dataType, msg.m_count); - localStatus = this->allocMsg(size, &reply); - if (localStatus) { - if (localStatus==S_cas_hugeRequest) { - localStatus = sendErr(&msg, ECA_TOLARGE, NULL); - } - return localStatus; - } - - - // - // setup response message - // - *reply = msg; - assert ( size <= 0xffff ); - reply->m_postsize = static_cast ( size ); - reply->m_cid = pChan->getCID(); + void *pPayload; + { + unsigned payloadSize = dbr_size_n ( msg.m_dataType, msg.m_count ); + caStatus localStatus = this->out.copyInHeader ( msg.m_cmmd, payloadSize, + msg.m_dataType, msg.m_count, pChan->getCID (), + msg.m_available, & pPayload ); + if ( localStatus ) { + if ( localStatus==S_cas_hugeRequest ) { + localStatus = sendErr ( &msg, ECA_TOLARGE, NULL ); + } + return localStatus; + } + } // // convert gdd to db_access type // (places the data in network format) // - mapDBRStatus = gddMapDbr[msg.m_dataType].conv_dbr((reply+1), msg.m_count, *pDesc, pChan->enumStringTable()); - if (mapDBRStatus<0) { + int mapDBRStatus = gddMapDbr[msg.m_dataType].conv_dbr( + pPayload, msg.m_count, *pDesc, pChan->enumStringTable() ); + if ( mapDBRStatus < 0 ) { pDesc->dump(); errPrintf (S_cas_badBounds, __FILE__, __LINE__, "- get with PV=%s type=%u count=%u", pChan->getPVI().getName(), msg.m_dataType, msg.m_count); - return this->sendErrWithEpicsStatus(&msg, S_cas_badBounds, ECA_GETFAIL); + return this->sendErrWithEpicsStatus ( + &msg, S_cas_badBounds, ECA_GETFAIL ); } #ifdef CONVERSION_REQUIRED - /* use type as index into conversion jumptable */ - (* cac_dbr_cvrt[msg.m_dataType]) - ( reply + 1, - reply + 1, - TRUE, /* host -> net format */ - msg.m_count); + ( * cac_dbr_cvrt[msg.m_dataType] ) + ( pPayload, pPayload, TRUE, msg.m_count ); #endif - // - // force string message size to be the true size rounded to even - // boundary - // - if (msg.m_dataType == DBR_STRING && msg.m_count == 1u) { - /* add 1 so that the string terminator will be shipped */ - strcnt = strlen((char *)(reply + 1u)) + 1u; - assert ( strcnt <= 0xffff ); - reply->m_postsize = static_cast (strcnt); + + if ( msg.m_dataType == DBR_STRING && msg.m_count == 1u ) { + unsigned reducedPayloadSize = strlen ( static_cast < char * > ( pPayload ) ) + 1u; + this->out.commitMsg ( reducedPayloadSize ); } + else { + this->out.commitMsg (); + } - this->commitMsg (); - - return localStatus; + return S_cas_success; } // @@ -316,14 +300,14 @@ caStatus casStrmClient::readResponse (casChannelI *pChan, const caHdr &msg, // caStatus casStrmClient::readNotifyAction () { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray *mp = this->ctx.getMsg(); int status; casChannelI *pChan; smartGDDPointer pDesc; - status = this->verifyRequest (pChan); - if (status != ECA_NORMAL) { - return this->readNotifyResponseECA_XXX (NULL, *mp, NULL, status); + status = this->verifyRequest ( pChan ); + if ( status != ECA_NORMAL ) { + return this->readNotifyFailureResponse ( *mp, status ); } // @@ -331,10 +315,10 @@ caStatus casStrmClient::readNotifyAction () // if (!pChan->readAccess()) { if (CA_V41(this->minor_version_number)) { - return this->readNotifyResponseECA_XXX (NULL, *mp, NULL, ECA_NORDACCESS); + return this->readNotifyFailureResponse ( *mp, ECA_NORDACCESS ); } else { - return this->readNotifyResponse (NULL, *mp, NULL, S_cas_noRead); + return this->readNotifyResponse ( NULL, *mp, NULL, S_cas_noRead ); } } @@ -358,142 +342,95 @@ caStatus casStrmClient::readNotifyAction () // // casStrmClient::readNotifyResponse() // -caStatus casStrmClient::readNotifyResponse (casChannelI *pChan, - const caHdr &msg, const smartConstGDDPointer &pDesc, const caStatus completionStatus) +caStatus casStrmClient::readNotifyResponse ( casChannelI * pChan, + const caHdrLargeArray & msg, const smartConstGDDPointer & pDesc, const caStatus completionStatus ) { - caStatus ecaStatus; - - if (completionStatus!=S_cas_success) { - ecaStatus = ECA_GETFAIL; - } - else { - ecaStatus = ECA_NORMAL; - } - ecaStatus = this->readNotifyResponseECA_XXX (pChan, msg, pDesc, ecaStatus); - if (ecaStatus) { - return ecaStatus; - } - - // - // send independent warning exception to the client so that they - // will see the error string associated with this error code - // since the error string cant be sent with the get call back - // response (hopefully this is useful information) - // - // order is very important here because it determines that the get - // call back response is always sent, and that this warning exception - // message will be sent at most one time (in rare instances it will - // not be sent, but at least it will not be sent multiple times). - // The message is logged to the console in the rare situations when - // we are unable to send. - // - if (completionStatus!=S_cas_success) { - ecaStatus = this->sendErrWithEpicsStatus (&msg, completionStatus, ECA_NOCONVERT); - if (ecaStatus) { - errMessage (completionStatus, "<= get callback failure detail not passed to client"); + if ( completionStatus != S_cas_success ) { + caStatus ecaStatus = this->readNotifyFailureResponse ( msg, ECA_GETFAIL ); + // + // send independent warning exception to the client so that they + // will see the error string associated with this error code + // since the error string cant be sent with the get call back + // response (hopefully this is useful information) + // + // order is very important here because it determines that the get + // call back response is always sent, and that this warning exception + // message will be sent at most one time (in rare instances it will + // not be sent, but at least it will not be sent multiple times). + // The message is logged to the console in the rare situations when + // we are unable to send. + // + caStatus tmpStatus = this->sendErrWithEpicsStatus ( & msg, completionStatus, ECA_NOCONVERT ); + if ( tmpStatus ) { + errMessage ( completionStatus, "<= get callback failure detail not passed to client" ); } + return ecaStatus; } + + if ( ! pDesc ) { + errMessage ( S_cas_badParameter, + "no data in server tool asynch read resp ?" ); + return this->readNotifyFailureResponse ( msg, ECA_GETFAIL ); + } + + void *pPayload; + { + unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count ); + caStatus status = this->out.copyInHeader ( msg.m_cmmd, size, + msg.m_dataType, msg.m_count, ECA_NORMAL, + msg.m_available, & pPayload ); + if ( status ) { + if ( status == S_cas_hugeRequest ) { + status = sendErr ( & msg, ECA_TOLARGE, NULL ); + } + return status; + } + } + + // + // convert gdd to db_access type + // + int mapDBRStatus = gddMapDbr[msg.m_dataType].conv_dbr ( pPayload, + msg.m_count, *pDesc, pChan->enumStringTable() ); + if ( mapDBRStatus < 0 ) { + pDesc->dump(); + errPrintf ( S_cas_badBounds, __FILE__, __LINE__, + "- get notify with PV=%s type=%u count=%u", + pChan->getPVI().getName(), msg.m_dataType, msg.m_count ); + return this->readNotifyFailureResponse ( msg, ECA_NOCONVERT ); + } + +#ifdef CONVERSION_REQUIRED + ( * cac_dbr_cvrt[ msg.m_dataType ] ) + ( pPayload, pPayload, TRUE, msg.m_count ); +#endif + + if ( msg.m_dataType == DBR_STRING && msg.m_count == 1u ) { + unsigned reducedPayloadSize = strlen ( static_cast < char * > ( pPayload ) ) + 1u; + this->out.commitMsg ( reducedPayloadSize ); + } + else { + this->out.commitMsg (); + } + return S_cas_success; } // -// casStrmClient::readNotifyResponseECA_XXX () +// casStrmClient::readNotifyFailureResponse () // -caStatus casStrmClient::readNotifyResponseECA_XXX (casChannelI *pChan, - const caHdr &msg, const smartConstGDDPointer &pDesc, const caStatus ecaStatus) +caStatus casStrmClient::readNotifyFailureResponse ( const caHdrLargeArray & msg, const caStatus ECA_XXXX ) { - caHdr *reply; - unsigned size; - caStatus status; - - size = dbr_size_n (msg.m_dataType, msg.m_count); - status = this->allocMsg(size, &reply); - if (status) { - if (status==S_cas_hugeRequest) { - // - // All read notify responses must include a buffer of - // the size they specify - otherwise an exception - // is generated - // - status = sendErr(&msg, ECA_TOLARGE, NULL); - } - return status; + assert ( ECA_XXXX != ECA_NORMAL ); + void *pPayload; + unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count ); + caStatus status = this->out.copyInHeader ( msg.m_cmmd, size, + msg.m_dataType, msg.m_count, ECA_XXXX, + msg.m_available, & pPayload ); + if ( ! status ) { + memset ( pPayload, '\0', size ); } - - // - // setup response message - // - *reply = msg; - assert ( size <= 0xffff ); - reply->m_postsize = static_cast (size); - - // - // cid field abused to store the status here - // - if (ecaStatus == ECA_NORMAL) { - if (!pDesc) { - errMessage(S_cas_badParameter, - "because no data in server tool asynch read resp"); - reply->m_cid = ECA_GETFAIL; - } - else { - int mapDBRStatus; - // - // convert gdd to db_access type - // (places the data in network format) - // - mapDBRStatus = gddMapDbr[msg.m_dataType].conv_dbr((reply+1), msg.m_count, *pDesc, pChan->enumStringTable()); - if (mapDBRStatus<0) { - pDesc->dump(); - errPrintf (S_cas_badBounds, __FILE__, __LINE__, "- get notify with PV=%s type=%u count=%u", - pChan->getPVI().getName(), msg.m_dataType, msg.m_count); - reply->m_cid = ECA_GETFAIL; - } - else { - reply->m_cid = ECA_NORMAL; - } - } - } - else { - reply->m_cid = ecaStatus; - } - - // - // If they return non-zero status or a nill gdd ptr - // - if (reply->m_cid != ECA_NORMAL) { - // - // If the operation failed clear the response data - // area - // - memset ((char *)(reply+1), '\0', size); - } -#ifdef CONVERSION_REQUIRED - else { - - /* use type as index into conversion jumptable */ - (* cac_dbr_cvrt[msg.m_dataType]) - ( reply + 1, - reply + 1, - TRUE, /* host -> net format */ - msg.m_count); - } -#endif - - // - // force string message size to be the true size rounded to even - // boundary - // - if (msg.m_dataType == DBR_STRING && msg.m_count == 1u) { - /* add 1 so that the string terminator will be shipped */ - size_t strcnt = strlen((char *)(reply + 1u)) + 1u; - assert ( strcnt < 0xffff ); - reply->m_postsize = static_cast ( strcnt ); - } - - this->commitMsg (); - - return S_cas_success; + return status; } // @@ -541,7 +478,11 @@ static smartGDDPointer createDBRDD (unsigned dbrType, aitIndex dbrCount) // returned for DBR types // if ( dbrCount > 1 ) { - pDescRet = new gdd (*pDescRet); + gdd & gddRef = *pDescRet; + gddContainer * pContainer = (gddContainer*) & gddRef; + gdd *pDuplicate = new gdd ( pContainer ); + pDescRet = pDuplicate; + // // smart pointer class maintains the ref count from here down // @@ -592,27 +533,17 @@ static smartGDDPointer createDBRDD (unsigned dbrType, aitIndex dbrCount) pVal->setDimension ( 1u, &bds ); } else if ( pVal->isAtomic () ) { - const gddBounds* pB = pVal->getBounds (); - aitIndex bound = dbrCount; - unsigned dim; - int modAllowed; if ( pDescRet->isManaged () || pDescRet->isFlat () ) { - modAllowed = FALSE; + pDescRet = NULL; + return pDescRet; } - else { - modAllowed = TRUE; - } - - for ( dim=0u; dim < pVal->dimension (); dim++ ) { + + aitIndex bound = dbrCount; + const gddBounds* pB = pVal->getBounds (); + for ( unsigned dim = 0u; dim < pVal->dimension (); dim++ ) { if ( pB[dim].first () != 0u && pB[dim].size() != bound ) { - if ( modAllowed ) { - pVal->setBound( dim, 0u, bound ); - } - else { - pDescRet = NULL; - return pDescRet; - } + pVal->setBound( dim, 0u, bound ); } bound = 1u; } @@ -631,125 +562,105 @@ static smartGDDPointer createDBRDD (unsigned dbrType, aitIndex dbrCount) // // casStrmClient::monitorResponse () // -caStatus casStrmClient::monitorResponse (casChannelI &chan, const caHdr &msg, - const smartConstGDDPointer &pDesc, const caStatus completionStatus) +caStatus casStrmClient::monitorFailureResponse ( const caHdrLargeArray & msg, + const caStatus ECA_XXXX ) { - caStatus completionStatusCopy = completionStatus; - smartGDDPointer pDBRDD; - caHdr *pReply; - unsigned size; - caStatus status; - gddStatus gdds; - - size = dbr_size_n (msg.m_dataType, msg.m_count); - status = this->allocMsg(size, &pReply); - if (status) { - if (status==S_cas_hugeRequest) { - // - // If we cant include the data - it is a proto - // violation - so we generate an exception - // instead - // - status = sendErr (&msg, ECA_TOLARGE, - "unable to xmit event"); - } - return status; + assert ( ECA_XXXX != ECA_NORMAL ); + void *pPayload; + unsigned size = dbr_size_n ( msg.m_dataType, msg.m_count ); + caStatus status = this->out.copyInHeader ( msg.m_cmmd, size, + msg.m_dataType, msg.m_count, ECA_XXXX, + msg.m_available, & pPayload ); + if ( ! status ) { + memset ( pPayload, '\0', size ); + this->out.commitMsg (); } + return status; +} - // - // setup response message - // - *pReply = msg; - assert ( size <= 0xffff ); - pReply->m_postsize = static_cast (size); +// +// casStrmClient::monitorResponse () +// +caStatus casStrmClient::monitorResponse ( casChannelI & chan, const caHdrLargeArray & msg, + const smartConstGDDPointer & pDesc, const caStatus completionStatus ) +{ + void * pPayload; + { + ca_uint32_t size = dbr_size_n ( msg.m_dataType, msg.m_count ); + caStatus status = out.copyInHeader ( msg.m_cmmd, size, + msg.m_dataType, msg.m_count, ECA_NORMAL, + msg.m_available, & pPayload ); + if ( status ) { + if ( status == S_cas_hugeRequest ) { + status = sendErr ( & msg, ECA_TOLARGE, + "unable to xmit event" ); + } + return status; + } + } - // - // verify read access - // - if (!chan.readAccess()) { - completionStatusCopy = S_cas_noRead; + smartGDDPointer pDBRDD; + if ( ! chan.readAccess () ) { + return monitorFailureResponse ( msg, ECA_NORDACCESS ); } - - // - // cid field abused to store the status here - // - if (completionStatusCopy == S_cas_success) { - - if (!pDesc) { - completionStatusCopy = S_cas_badParameter; - } - else { - pDBRDD = createDBRDD (msg.m_dataType, msg.m_count); - if (!pDBRDD) { - completionStatusCopy = S_cas_noMemory; + else if ( completionStatus == S_cas_success ) { + pDBRDD = createDBRDD ( msg.m_dataType, msg.m_count ); + if ( ! pDBRDD ) { + return monitorFailureResponse ( msg, ECA_ALLOCMEM ); + } + else if ( pDesc.valid() ) { + gddStatus gdds = gddApplicationTypeTable:: + app_table.smartCopy ( & (*pDBRDD), & (*pDesc) ); + if ( gdds < 0 ) { + errPrintf ( S_cas_noConvert, __FILE__, __LINE__, + "no conversion between event app type=%d and DBR type=%d Element count=%d", + pDesc->applicationType (), msg.m_dataType, msg.m_count); + return monitorFailureResponse ( msg, ECA_NOCONVERT ); } - else { - gdds = gddApplicationTypeTable:: - app_table.smartCopy ( & (*pDBRDD), & (*pDesc) ); - if (gdds) { - errPrintf (status, __FILE__, __LINE__, -"no conversion between event app type=%d and DBR type=%d Element count=%d", - pDesc->applicationType(), - msg.m_dataType, - msg.m_count); - completionStatusCopy = S_cas_noConvert; - } - } - } - } - - // - // see no DD and no convert case above - // - if (completionStatusCopy == S_cas_success) { - pReply->m_cid = ECA_NORMAL; - - // - // there appears to be no success/fail - // status from this routine - // - gddMapDbr[msg.m_dataType].conv_dbr ((pReply+1), msg.m_count, *pDBRDD, chan.enumStringTable()); - -#ifdef CONVERSION_REQUIRED - /* use type as index into conversion jumptable */ - (* cac_dbr_cvrt[msg.m_dataType]) - ( pReply + 1, - pReply + 1, - TRUE, /* host -> net format */ - msg.m_count); -#endif - // - // force string message size to be the true size - // - if (msg.m_dataType == DBR_STRING && msg.m_count == 1u) { - // add 1 so that the string terminator - // will be shipped - size_t strcnt = strlen((char *)(pReply + 1u)) + 1u; - assert ( strcnt < 0xffff ); - pReply->m_postsize = static_cast ( strcnt ); - } + } + else { + errMessage ( S_cas_badParameter, "no GDD in monitor response ?" ); + return monitorFailureResponse ( msg, ECA_GETFAIL ); + } } else { - errMessage(completionStatusCopy, "- in monitor response"); - - if (completionStatusCopy== S_cas_noRead) { - pReply->m_cid = ECA_NORDACCESS; + errMessage ( completionStatus, "- in monitor response" ); + if ( completionStatus == S_cas_noRead ) { + return monitorFailureResponse ( msg, ECA_NORDACCESS ); } - else if (completionStatusCopy==S_cas_noMemory) { - pReply->m_cid = ECA_ALLOCMEM; + else if ( completionStatus == S_cas_noMemory ) { + return monitorFailureResponse ( msg, ECA_ALLOCMEM ); } else { - pReply->m_cid = ECA_GETFAIL; + return monitorFailureResponse ( msg, ECA_GETFAIL ); } - - // - // If the operation failed clear the response data - // area - // - memset ((char *)(pReply+1u), '\0', size); } - this->commitMsg (); + // + // there appears to be no success/fail + // status from this routine + // + int mapDBRStatus = gddMapDbr[msg.m_dataType].conv_dbr ( pPayload, msg.m_count, + *pDBRDD, chan.enumStringTable() ); + if ( mapDBRStatus < 0 ) { + return monitorFailureResponse ( msg, ECA_NOCONVERT ); + } + +#ifdef CONVERSION_REQUIRED + /* use type as index into conversion jumptable */ + (* cac_dbr_cvrt[msg.m_dataType]) + ( pPayload, pPayload, TRUE, msg.m_count ); +#endif + // + // force string message size to be the true size + // + if ( msg.m_dataType == DBR_STRING && msg.m_count == 1u ) { + ca_uint32_t reducedPayloadSize = strlen ( static_cast < char * > ( pPayload ) ) + 1u; + this->out.commitMsg ( reducedPayloadSize ); + } + else { + this->out.commitMsg (); + } return S_cas_success; } @@ -759,8 +670,8 @@ caStatus casStrmClient::monitorResponse (casChannelI &chan, const caHdr &msg, */ caStatus casStrmClient::writeAction() { - const caHdr *mp = this->ctx.getMsg(); - caStatus status; + const caHdrLargeArray *mp = this->ctx.getMsg(); + caStatus status; casChannelI *pChan; status = this->verifyRequest (pChan); @@ -814,7 +725,7 @@ caStatus casStrmClient::writeAction() // casStrmClient::writeResponse() // caStatus casStrmClient::writeResponse ( - const caHdr &msg, const caStatus completionStatus) + const caHdrLargeArray &msg, const caStatus completionStatus) { caStatus status; @@ -835,7 +746,7 @@ caStatus casStrmClient::writeResponse ( */ caStatus casStrmClient::writeNotifyAction() { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray *mp = this->ctx.getMsg(); int status; casChannelI *pChan; @@ -879,7 +790,7 @@ caStatus casStrmClient::writeNotifyAction() * casStrmClient::writeNotifyResponse() */ caStatus casStrmClient::writeNotifyResponse( - const caHdr &msg, const caStatus completionStatus) + const caHdrLargeArray &msg, const caStatus completionStatus) { caStatus ecaStatus; @@ -920,25 +831,17 @@ caStatus casStrmClient::writeNotifyResponse( /* * casStrmClient::writeNotifyResponseECA_XXX() */ -caStatus casStrmClient::writeNotifyResponseECA_XXX( - const caHdr &msg, const caStatus ecaStatus) +caStatus casStrmClient::writeNotifyResponseECA_XXX ( + const caHdrLargeArray & msg, const caStatus ecaStatus ) { - caHdr *preply; - caStatus opStatus; - - opStatus = this->allocMsg(0u, &preply); - if (opStatus) { - return opStatus; + caStatus status = out.copyInHeader ( msg.m_cmmd, 0, + msg.m_dataType, msg.m_count, ecaStatus, + msg.m_available, 0 ); + if ( ! status ) { + this->out.commitMsg (); } - *preply = msg; - preply->m_postsize = 0u; - preply->m_cid = ecaStatus; - - /* commit the message */ - this->commitMsg(); - - return S_cas_success; + return status; } /* @@ -946,7 +849,7 @@ caStatus casStrmClient::writeNotifyResponseECA_XXX( */ caStatus casStrmClient::hostNameAction() { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray *mp = this->ctx.getMsg(); char *pName = (char *) this->ctx.getData(); unsigned size; char *pMalloc; @@ -993,7 +896,7 @@ caStatus casStrmClient::hostNameAction() */ caStatus casStrmClient::clientNameAction() { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray *mp = this->ctx.getMsg(); char *pName = (char *) this->ctx.getData(); unsigned size; char *pMalloc; @@ -1040,7 +943,7 @@ caStatus casStrmClient::clientNameAction() */ caStatus casStrmClient::claimChannelAction() { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray *mp = this->ctx.getMsg(); char *pName = (char *) this->ctx.getData(); caServerI &cas = *this->ctx.getServer(); caStatus status; @@ -1123,14 +1026,13 @@ caStatus casStrmClient::claimChannelAction() // // LOCK must be applied // -caStatus casStrmClient::createChanResponse(const caHdr &hdr, const pvAttachReturn &pvar) +caStatus casStrmClient::createChanResponse(const caHdrLargeArray &hdr, const pvAttachReturn &pvar) { - casPVI *pPV; - casChannel *pChan; - casChannelI *pChanI; - caHdr *claim_reply; - bufSizeT nBytes; - caStatus status; + casPVI *pPV; + casChannel *pChan; + casChannelI *pChanI; + bufSizeT nBytes; + caStatus status; if (pvar.getStatus() != S_cas_success) { return this->channelCreateFailed (&hdr, pvar.getStatus()); @@ -1165,14 +1067,13 @@ caStatus casStrmClient::createChanResponse(const caHdr &hdr, const pvAttachRetur } // - // NOTE: // We are allocating enough space for both the claim // response and the access rights response so that we know for // certain that they will both be sent together. // void *pRaw; - const outBufCtx outctx = this->outBuf::pushCtx - (0, 2*sizeof(caHdr), pRaw); + const outBufCtx outctx = this->out.pushCtx + ( 0, 2 * sizeof ( caHdr ), pRaw ); if (outctx.pushResult()!=outBufCtx::pushCtxSuccess) { return S_cas_sendBlocked; } @@ -1183,7 +1084,7 @@ caStatus casStrmClient::createChanResponse(const caHdr &hdr, const pvAttachRetur this->ctx.setPV (pPV); pChan = pPV->createChannel (this->ctx, this->pUserName, this->pHostName); if (!pChan) { - this->outBuf::popCtx (outctx); + this->out.popCtx (outctx); pPV->deleteSignal(); return this->channelCreateFailed (&hdr, S_cas_noMemory); } @@ -1191,21 +1092,19 @@ caStatus casStrmClient::createChanResponse(const caHdr &hdr, const pvAttachRetur pChanI = (casChannelI *) pChan; // - // NOTE: // We are certain that the request will complete // here because we allocated enough space for this // and the claim response above. // status = casStrmClient::accessRightsResponse(pChanI); if (status) { - this->outBuf::popCtx (outctx); + this->out.popCtx (outctx); errMessage(status, "incomplete channel create?"); pChanI->destroyNoClientNotify(); return this->channelCreateFailed(&hdr, status); } // - // NOTE: // We are allocated enough space for both the claim // response and the access response so that we know for // certain that they will both be sent together. @@ -1214,31 +1113,27 @@ caStatus casStrmClient::createChanResponse(const caHdr &hdr, const pvAttachRetur // here to be certain that we are at the correct place in // the protocol buffer. // - status = this->allocMsg (0u, &claim_reply); - if (status!=S_cas_success) { - this->outBuf::popCtx (outctx); - errMessage(status, "incomplete channel create?"); + assert ( nativeType <= 0xffff ); + unsigned nativeCount = pPV->nativeCount(); + status = this->out.copyInHeader ( CA_PROTO_CLAIM_CIU, 0, + static_cast ( nativeType ), + static_cast ( nativeCount ), + hdr.m_cid, pChanI->getSID(), 0 ); + if ( status != S_cas_success ) { + this->out.popCtx ( outctx ); + errMessage ( status, "incomplete channel create?" ); pChanI->destroyNoClientNotify(); - return this->channelCreateFailed(&hdr, status); + return this->channelCreateFailed ( &hdr, status ); } - *claim_reply = nill_msg; - claim_reply->m_cmmd = CA_PROTO_CLAIM_CIU; - assert ( nativeType <= 0xffff ); - claim_reply->m_dataType = static_cast (nativeType); - unsigned nativeCount = pPV->nativeCount(); - assert ( nativeType <= 0xffff ); - claim_reply->m_count = static_cast (nativeCount); - claim_reply->m_cid = hdr.m_cid; - claim_reply->m_available = pChanI->getSID(); - this->commitMsg(); + this->out.commitMsg (); // // commit the message // - nBytes = this->outBuf::popCtx (outctx); + nBytes = this->out.popCtx (outctx); assert ( nBytes == 2*sizeof(caHdr) ); - this->outBuf::commitRawMsg (nBytes); + this->out.commitRawMsg (nBytes); return status; } @@ -1249,37 +1144,32 @@ caStatus casStrmClient::createChanResponse(const caHdr &hdr, const pvAttachRetur * If we are talking to an CA_V46 client then tell them when a channel * cant be created (instead of just disconnecting) */ -caStatus casStrmClient::channelCreateFailed( -const caHdr *mp, -caStatus createStatus) +caStatus casStrmClient::channelCreateFailed ( + const caHdrLargeArray *mp, caStatus createStatus ) { caStatus status; - caHdr *reply; - if (createStatus == S_casApp_asyncCompletion) { - errMessage(S_cas_badParameter, + if ( createStatus == S_casApp_asyncCompletion ) { + errMessage( S_cas_badParameter, "- no asynchronous IO create in pvAttach() ?"); - errMessage(S_cas_badParameter, + errMessage( S_cas_badParameter, "- or S_casApp_asyncCompletion was async IO competion code ?"); } else { - errMessage (createStatus, "- Server unable to create a new PV"); + errMessage ( createStatus, "- Server unable to create a new PV"); } - if (CA_V46(this->minor_version_number)) { - - status = allocMsg (0u, &reply); - if (status) { + if ( CA_V46( this->minor_version_number ) ) { + status = this->out.copyInHeader ( CA_PROTO_CLAIM_CIU_FAILED, 0, + 0, 0, mp->m_cid, 0, 0 ); + if ( status ) { return status; } - *reply = nill_msg; - reply->m_cmmd = CA_PROTO_CLAIM_CIU_FAILED; - reply->m_cid = mp->m_cid; - this->commitMsg(); + this->out.commitMsg (); createStatus = S_cas_success; } else { - status = this->sendErrWithEpicsStatus(mp, createStatus, ECA_ALLOCMEM); - if (status) { + status = this->sendErrWithEpicsStatus ( mp, createStatus, ECA_ALLOCMEM ); + if ( status ) { return status; } } @@ -1293,27 +1183,24 @@ caStatus createStatus) * If we are talking to an CA_V47 client then tell them when a channel * was deleted by the server tool */ -caStatus casStrmClient::disconnectChan(caResId id) +caStatus casStrmClient::disconnectChan ( caResId id ) { caStatus status; caStatus createStatus; - caHdr *reply; - if (CA_V47(this->minor_version_number)) { + if ( CA_V47 ( this->minor_version_number ) ) { - status = allocMsg (0u, &reply); - if (status) { + status = this->out.copyInHeader ( CA_PROTO_SERVER_DISCONN, 0, + 0, 0, id, 0, 0 ); + if ( status ) { return status; } - *reply = nill_msg; - reply->m_cmmd = CA_PROTO_SERVER_DISCONN; - reply->m_cid = id; - this->commitMsg(); + this->out.commitMsg (); createStatus = S_cas_success; } else { - errlogPrintf( - "Disconnecting old client because of internal channel or PV delete\n"); + errlogPrintf ( + "Disconnecting old client because of internal channel or PV delete\n"); createStatus = S_cas_disconnect; } @@ -1342,7 +1229,7 @@ caStatus casStrmClient::eventsOffAction() // caStatus casStrmClient::eventAddAction () { - const caHdr *mp = this->ctx.getMsg(); + const caHdrLargeArray *mp = this->ctx.getMsg(); struct mon_info *pMonInfo = (struct mon_info *) this->ctx.getData(); casClientMon *pMonitor; @@ -1411,10 +1298,10 @@ caStatus casStrmClient::eventAddAction () return S_cas_success; } else { - status = this->monitorResponse (*pciu, *mp, pDD, status); + status = this->monitorResponse ( *pciu, *mp, pDD, status ); } - if (status==S_cas_success) { + if ( status == S_cas_success ) { pMonitor = new casClientMon(*pciu, mp->m_available, mp->m_count, mp->m_dataType, mask, *this); @@ -1440,17 +1327,16 @@ caStatus casStrmClient::eventAddAction () // caStatus casStrmClient::clearChannelAction () { - const caHdr *mp = this->ctx.getMsg(); - void *dp = this->ctx.getData(); - caHdr *reply; - casChannelI *pciu; + const caHdrLargeArray * mp = this->ctx.getMsg(); + const void * dp = this->ctx.getData(); + casChannelI * pciu; int status; /* * Verify the channel */ - pciu = this->resIdToChannel (mp->m_cid); - if (pciu==NULL) { + pciu = this->resIdToChannel ( mp->m_cid ); + if ( pciu == NULL ) { /* * it is possible that the channel delete arrives just * after the server tool has deleted the PV so we will @@ -1461,8 +1347,8 @@ caStatus casStrmClient::clearChannelAction () * return early here if we are unable to send the warning * so that send block conditions will be handled */ - status = logBadId (mp, dp, ECA_BADCHID, mp->m_cid); - if (status) { + status = logBadId ( mp, dp, ECA_BADCHID, mp->m_cid ); + if ( status ) { return status; } // @@ -1472,26 +1358,20 @@ caStatus casStrmClient::clearChannelAction () // } - /* - * send delete confirmed message - */ - status = this->allocMsg (0u, &reply); - if (status) { - return status; - } - // - // only execute the request after we have allocated - // space for the response + // send delete confirmed message // - if (pciu) { - pciu->destroyNoClientNotify (); - } + status = this->out.copyInHeader ( mp->m_cmmd, 0, + mp->m_dataType, mp->m_count, + mp->m_cid, mp->m_available, 0 ); + if ( ! status ) { + this->out.commitMsg (); + if ( pciu ) { + pciu->destroyNoClientNotify (); + } + } - *reply = *mp; - this->commitMsg (); - - return S_cas_success; + return status; } @@ -1500,17 +1380,16 @@ caStatus casStrmClient::clearChannelAction () // caStatus casStrmClient::eventCancelAction () { - const caHdr *mp = this->ctx.getMsg (); - void *dp = this->ctx.getData (); + const caHdrLargeArray * mp = this->ctx.getMsg (); + const void * dp = this->ctx.getData (); casChannelI *pciu; - caHdr *reply; int status; /* * Verify the channel */ - pciu = this->resIdToChannel (mp->m_cid); - if (!pciu) { + pciu = this->resIdToChannel ( mp->m_cid ); + if ( ! pciu ) { /* * it is possible that the event delete arrives just * after the server tool has deleted the PV. In this @@ -1518,7 +1397,8 @@ caStatus casStrmClient::eventCancelAction () * resource id for the return message and so we must force * the client to reconnect. */ - return logBadId (mp, dp, ECA_BADCHID, mp->m_cid); + logBadId ( mp, dp, ECA_BADCHID, mp->m_cid ); + return S_cas_badResourceId; } /* @@ -1527,33 +1407,27 @@ caStatus casStrmClient::eventCancelAction () tsDLIterBD pMon = pciu->findMonitor ( mp->m_available ); if ( ! pMon.valid () ) { // - // this indicates client or server library corruption + // this indicates client or server library corruption so a + // disconnect is the best response // - return logBadId ( mp, dp, ECA_BADMONID, mp->m_cid ); + logBadId ( mp, dp, ECA_BADMONID, mp->m_available ); + return S_cas_badResourceId; } - /* - * allocate delete confirmed message - */ - status = allocMsg ( 0u, &reply ); - if ( status ) { - return status; - } - - reply->m_cmmd = CA_PROTO_EVENT_ADD; - reply->m_postsize = 0u; unsigned type = pMon->getType (); assert ( type <= 0xff ); - reply->m_dataType = static_cast ( type ); - reply->m_count = (unsigned short) pMon->getCount (); - reply->m_cid = pciu->getCID (); - reply->m_available = pMon->getClientId (); + status = this->out.copyInHeader ( CA_PROTO_EVENT_ADD, 0, + static_cast ( type ), pMon->getCount (), + pciu->getCID (), pMon->getClientId (), 0 ); + if ( ! status ) { + this->out.commitMsg (); + pMon->destroy (); + } + else { + printf ( "no room for subscription destroy response message\n" ); + } - this->commitMsg (); - - pMon->destroy (); - - return S_cas_success; + return status; } #if 0 @@ -1570,7 +1444,7 @@ caStatus casStrmClient::noReadAccessEvent(casClientMon *pMon) caHdr *reply; int status; - size = dbr_size_n (pMon->getType(), pMon->getCount()); + size = dbr_size_n ( pMon->getType(), pMon->getCount() ); falseReply.m_cmmd = CA_PROTO_EVENT_ADD; falseReply.m_postsize = size; @@ -1580,8 +1454,8 @@ caStatus casStrmClient::noReadAccessEvent(casClientMon *pMon) falseReply.m_available = pMon->getClientId(); status = this->allocMsg(size, &reply); - if (status) { - if(status == S_cas_hugeRequest){ + if ( status ) { + if( status == S_cas_hugeRequest ) { return this->sendErr(&falseReply, ECA_TOLARGE, NULL); } return status; @@ -1603,7 +1477,7 @@ caStatus casStrmClient::noReadAccessEvent(casClientMon *pMon) reply->m_postsize = size; reply->m_cid = ECA_NORDACCESS; memset((char *)(reply+1), 0, size); - this->commitMsg(); + this->commitMsg (); } return S_cas_success; @@ -1615,9 +1489,8 @@ caStatus casStrmClient::noReadAccessEvent(casClientMon *pMon) // caStatus casStrmClient::readSyncAction() { - const caHdr *mp = this->ctx.getMsg(); - int status; - caHdr *reply; + const caHdrLargeArray *mp = this->ctx.getMsg(); + int status; // // This messages indicates that the client @@ -1633,16 +1506,14 @@ caStatus casStrmClient::readSyncAction() } this->unlock(); - status = this->allocMsg ( 0u, &reply ); - if ( status ) { - return status; + status = this->out.copyInHeader ( mp->m_cmmd, 0, + mp->m_dataType, mp->m_count, + mp->m_cid, mp->m_available, 0 ); + if ( ! status ) { + this->out.commitMsg (); } - *reply = *mp; - - this->commitMsg (); - - return S_cas_success; + return status; } // @@ -1654,39 +1525,33 @@ caStatus casStrmClient::readSyncAction() // caStatus casStrmClient::accessRightsResponse(casChannelI *pciu) { - caHdr *reply; - unsigned ar; - int v41; - int status; + unsigned ar; + int v41; + int status; - /* - * noop if this is an old client - */ - v41 = CA_V41(this->minor_version_number); - if(!v41){ + // + // noop if this is an old client + // + v41 = CA_V41 ( this->minor_version_number ); + if ( ! v41 ) { return S_cas_success; } - ar = 0; /* none */ - if (pciu->readAccess()) { + ar = 0; // none + if ( pciu->readAccess() ) { ar |= CA_PROTO_ACCESS_RIGHT_READ; } - if (pciu->writeAccess()) { + if ( pciu->writeAccess() ) { ar |= CA_PROTO_ACCESS_RIGHT_WRITE; } - status = this->allocMsg(0u, &reply); - if(status){ - return status; + status = this->out.copyInHeader ( CA_PROTO_ACCESS_RIGHTS, 0, + 0, 0, pciu->getCID(), ar, 0 ); + if ( ! status ) { + this->out.commitMsg (); } - *reply = nill_msg; - reply->m_cmmd = CA_PROTO_ACCESS_RIGHTS; - reply->m_cid = pciu->getCID(); - reply->m_available = ar; - this->commitMsg(); - - return S_cas_success; + return status; } // @@ -1694,7 +1559,7 @@ caStatus casStrmClient::accessRightsResponse(casChannelI *pciu) // caStatus casStrmClient::write() { - const caHdr *pHdr = this->ctx.getMsg(); + const caHdrLargeArray *pHdr = this->ctx.getMsg(); casPVI *pPV = this->ctx.getPV(); caStatus status; @@ -1768,7 +1633,7 @@ caStatus casStrmClient::write() caStatus casStrmClient::writeScalarData() { smartGDDPointer pDD; - const caHdr *pHdr = this->ctx.getMsg(); + const caHdrLargeArray *pHdr = this->ctx.getMsg(); gddStatus gddStat; caStatus status; aitEnum type; @@ -1836,7 +1701,7 @@ caStatus casStrmClient::writeScalarData() caStatus casStrmClient::writeArrayData() { smartGDDPointer pDD; - const caHdr *pHdr = this->ctx.getMsg(); + const caHdrLargeArray *pHdr = this->ctx.getMsg(); gddDestructor *pDestructor; gddStatus gddStat; caStatus status; @@ -1930,7 +1795,7 @@ caStatus casStrmClient::writeArrayData() // caStatus casStrmClient::read (smartGDDPointer &pDescRet) { - const caHdr *pHdr = this->ctx.getMsg(); + const caHdrLargeArray *pHdr = this->ctx.getMsg(); caStatus status; pDescRet = createDBRDD (pHdr->m_dataType, pHdr->m_count); @@ -1961,7 +1826,7 @@ caStatus casStrmClient::read (smartGDDPointer &pDescRet) // async IO but dont return status // indicating so (and vise versa) // - if (this->asyncIOFlag) { + if ( this->asyncIOFlag ) { if (status!=S_casApp_asyncCompletion) { fprintf(stderr, "Application returned %d from casPV::read() - expected S_casApp_asyncCompletion\n", @@ -1969,13 +1834,13 @@ caStatus casStrmClient::read (smartGDDPointer &pDescRet) status = S_casApp_asyncCompletion; } } - else if (status == S_casApp_asyncCompletion) { + else if ( status == S_casApp_asyncCompletion ) { status = S_cas_badParameter; errMessage(status, "- expected asynch IO creation from casPV::read()"); } - if (status) { + if ( status ) { pDescRet = NULL; } @@ -2036,29 +1901,29 @@ void casStrmClient::removeChannel(casChannelI &chan) // // casStrmClient::xSend() // -outBuf::flushCondition casStrmClient::xSend (char *pBufIn, +outBufClient::flushCondition casStrmClient::xSend ( char * pBufIn, bufSizeT nBytesAvailableToSend, bufSizeT nBytesNeedToBeSent, - bufSizeT &nActualBytes) + bufSizeT & nActualBytes ) { - outBuf::flushCondition stat; + outBufClient::flushCondition stat; bufSizeT nActualBytesDelta; bufSizeT totalBytes; - assert (nBytesAvailableToSend>=nBytesNeedToBeSent); + assert ( nBytesAvailableToSend >= nBytesNeedToBeSent ); totalBytes = 0u; while ( true ) { - stat = this->osdSend (&pBufIn[totalBytes], - nBytesAvailableToSend-totalBytes, nActualBytesDelta); - if (stat != outBuf::flushProgress) { - if (totalBytes>0) { + stat = this->osdSend ( &pBufIn[totalBytes], + nBytesAvailableToSend-totalBytes, nActualBytesDelta ); + if ( stat != outBufClient::flushProgress ) { + if ( totalBytes > 0 ) { nActualBytes = totalBytes; // // !! this time fetch may be slowing things down !! // //this->lastSendTS = epicsTime::getCurrent(); - return outBuf::flushProgress; + return outBufClient::flushProgress; } else { return stat; @@ -2067,31 +1932,31 @@ outBuf::flushCondition casStrmClient::xSend (char *pBufIn, totalBytes += nActualBytesDelta; - if (totalBytes>=nBytesNeedToBeSent) { + if ( totalBytes >= nBytesNeedToBeSent ) { // // !! this time fetch may be slowing things down !! // //this->lastSendTS = epicsTime::getCurrent(); nActualBytes = totalBytes; - return outBuf::flushProgress; + return outBufClient::flushProgress; } } - return outBuf::flushDisconnect; // happy compiler + return outBufClient::flushDisconnect; // happy compiler } // // casStrmClient::xRecv() // -inBuf::fillCondition casStrmClient::xRecv(char *pBufIn, bufSizeT nBytes, - enum inBuf::fillParameter, bufSizeT &nActualBytes) +inBufClient::fillCondition casStrmClient::xRecv ( char * pBufIn, bufSizeT nBytes, + inBufClient::fillParameter, bufSizeT & nActualBytes ) { - inBuf::fillCondition stat; + inBufClient::fillCondition stat; - stat = this->osdRecv (pBufIn, nBytes, nActualBytes); + stat = this->osdRecv ( pBufIn, nBytes, nActualBytes ); // // this is used to set the time stamp for write GDD's // - this->lastRecvTS = epicsTime::getCurrent(); + this->lastRecvTS = epicsTime::getCurrent (); return stat; } @@ -2103,3 +1968,9 @@ unsigned casStrmClient::getDebugLevel() const return this->getCAS().getDebugLevel(); } +void casStrmClient::flush () +{ + this->out.flush (); +} + + diff --git a/src/cas/generic/casdef.h b/src/cas/generic/casdef.h index 51cbb6fbf..00b61482c 100644 --- a/src/cas/generic/casdef.h +++ b/src/cas/generic/casdef.h @@ -809,7 +809,7 @@ public: epicsShareFunc virtual void destroy (); private: - caHdr const msg; + caHdrLargeArray const msg; casChannelI &chan; smartConstGDDPointer pDD; caStatus completionStatus; @@ -870,7 +870,7 @@ public: epicsShareFunc virtual void destroy (); private: - caHdr const msg; + caHdrLargeArray const msg; casChannelI &chan; caStatus completionStatus; @@ -919,7 +919,7 @@ public: epicsShareFunc virtual void destroy(); private: - caHdr const msg; + caHdrLargeArray const msg; pvExistReturn retVal; const caNetAddr dgOutAddr; @@ -966,7 +966,7 @@ public: epicsShareFunc virtual void destroy (); private: - caHdr const msg; + caHdrLargeArray const msg; pvAttachReturn retVal; epicsShareFunc caStatus cbFuncAsyncIO (); diff --git a/src/cas/generic/dgInBufIL.h b/src/cas/generic/dgInBufIL.h deleted file mode 100644 index be2e0113e..000000000 --- a/src/cas/generic/dgInBufIL.h +++ /dev/null @@ -1,8 +0,0 @@ - -#ifndef dgInBufILh -#define dgInBufILh - -#include "inBufIL.h" - -#endif // dgInBufILh - diff --git a/src/cas/generic/inBuf.cc b/src/cas/generic/inBuf.cc index efe25af75..0db326fd7 100644 --- a/src/cas/generic/inBuf.cc +++ b/src/cas/generic/inBuf.cc @@ -34,25 +34,20 @@ // // inBuf::inBuf() // -inBuf::inBuf (bufSizeT bufSizeIn, bufSizeT ioMinSizeIn) : - bufSize (bufSizeIn), - bytesInBuffer (0u), - nextReadIndex (0u), - ioMinSize (ioMinSizeIn), - ctxRecursCount (0u) +inBuf::inBuf ( inBufClient &clientIn, bufSizeT ioMinSizeIn ) : + client ( clientIn ), pBuf ( 0 ), bufSize ( 0u ), bytesInBuffer ( 0u ), nextReadIndex ( 0u ), + ioMinSize ( ioMinSizeIn ), ctxRecursCount ( 0u ) { - if (this->ioMinSize==0) { + if ( this->ioMinSize == 0 ) { this->ioMinSize = 1; } - - if ( this->bufSize < this->ioMinSize ) { - this->bufSize = this->ioMinSize; + if ( this->ioMinSize <= pGlobalBufferFactoryCAS->smallBufferSize () ) { + this->pBuf = pGlobalBufferFactoryCAS->newSmallBuffer (); + this->bufSize = pGlobalBufferFactoryCAS->smallBufferSize (); } - - this->pBuf = new char [this->bufSize]; - if (!this->pBuf) { - this->bufSize = 0u; - throw S_cas_noMemory; + else { + this->pBuf = new char [ this->ioMinSize ]; + this->bufSize = this->ioMinSize; } } @@ -62,8 +57,16 @@ inBuf::inBuf (bufSizeT bufSizeIn, bufSizeT ioMinSizeIn) : // inBuf::~inBuf () { - assert (this->ctxRecursCount==0); - delete [] this->pBuf; + assert ( this->ctxRecursCount == 0 ); + if ( this->bufSize == pGlobalBufferFactoryCAS->smallBufferSize () ) { + pGlobalBufferFactoryCAS->destroySmallBuffer ( this->pBuf ); + } + else if ( this->bufSize == pGlobalBufferFactoryCAS->largeBufferSize () ) { + pGlobalBufferFactoryCAS->destroyLargeBuffer ( this->pBuf ); + } + else { + delete [] this->pBuf; + } } // @@ -81,11 +84,11 @@ void inBuf::show (unsigned level) const // // inBuf::fill() // -inBuf::fillCondition inBuf::fill (fillParameter parm) +inBufClient::fillCondition inBuf::fill ( inBufClient::fillParameter parm ) { bufSizeT bytesOpen; bufSizeT bytesRecv; - inBuf::fillCondition stat; + inBufClient::fillCondition stat; // // move back any prexisting data to the start of the buffer @@ -108,20 +111,20 @@ inBuf::fillCondition inBuf::fill (fillParameter parm) // noop if the buffer is full // bytesOpen = this->bufSize - this->bytesInBuffer; - if (bytesOpen < this->ioMinSize) { - return casFillNone; + if ( bytesOpen < this->ioMinSize ) { + return inBufClient::casFillNone; } - stat = this->xRecv (&this->pBuf[this->bytesInBuffer], - bytesOpen, parm, bytesRecv); - if (stat == casFillProgress) { + stat = this->client.xRecv ( &this->pBuf[this->bytesInBuffer], + bytesOpen, parm, bytesRecv ); + if ( stat == inBufClient::casFillProgress ) { assert (bytesRecv<=bytesOpen); this->bytesInBuffer += bytesRecv; - if (this->getDebugLevel()>2u) { + if ( this->client.getDebugLevel() > 2u ) { char buf[64]; - this->clientHostName(buf, sizeof(buf)); + this->client.hostName ( buf, sizeof ( buf ) ); printf ("CAS: incoming %u byte msg from %s\n", bytesRecv, buf); @@ -134,11 +137,11 @@ inBuf::fillCondition inBuf::fill (fillParameter parm) // // inBuf::pushCtx () // -const inBufCtx inBuf::pushCtx (bufSizeT headerSize, // X aCC 361 - bufSizeT bodySize) +const inBufCtx inBuf::pushCtx ( bufSizeT headerSize, // X aCC 361 + bufSizeT bodySize ) { - if (headerSize+bodySize>(this->bytesInBuffer - this->nextReadIndex) || - this->ctxRecursCount==UINT_MAX) { + if ( headerSize + bodySize > ( this->bytesInBuffer - this->nextReadIndex ) || + this->ctxRecursCount == UINT_MAX ) { return inBufCtx (); } else { @@ -159,7 +162,7 @@ const inBufCtx inBuf::pushCtx (bufSizeT headerSize, // X aCC 361 // bufSizeT inBuf::popCtx (const inBufCtx &ctx) // X aCC 361 { - if (ctx.stat==inBufCtx::pushCtxSuccess) { + if ( ctx.stat==inBufCtx::pushCtxSuccess ) { this->mutex.lock(); bufSizeT bytesRemoved = this->nextReadIndex; this->pBuf = ctx.pBuf; @@ -176,5 +179,21 @@ bufSizeT inBuf::popCtx (const inBufCtx &ctx) // X aCC 361 } } +void inBuf::expandBuffer () +{ + if ( this->bufSize < pGlobalBufferFactoryCAS->largeBufferSize () ) { + char * pNewBuf = pGlobalBufferFactoryCAS->newLargeBuffer (); + memcpy ( pNewBuf, &this->pBuf[this->nextReadIndex], this->bytesPresent () ); + this->nextReadIndex = 0u; + pGlobalBufferFactoryCAS->destroySmallBuffer ( this->pBuf ); + this->pBuf = pNewBuf; + this->bufSize = pGlobalBufferFactoryCAS->largeBufferSize (); + } +} + +unsigned inBuf::bufferSize () const +{ + return this->bufSize; +} diff --git a/src/cas/generic/inBufIL.h b/src/cas/generic/inBufIL.h index a645526d9..a17f52ad4 100644 --- a/src/cas/generic/inBufIL.h +++ b/src/cas/generic/inBufIL.h @@ -16,8 +16,8 @@ inline bufSizeT inBuf::bytesPresent () const inline bufSizeT inBuf::bytesAvailable () const { bufSizeT bp; - bp = this->bytesPresent(); - bp += this->incomingBytesPresent(); + bp = this->bytesPresent (); + bp += this->client.incomingBytesPresent (); return bp; } @@ -52,10 +52,10 @@ inline char *inBuf::msgPtr () const // // inBuf::removeMsg() // -inline void inBuf::removeMsg (bufSizeT nBytes) +inline void inBuf::removeMsg ( bufSizeT nBytes ) { this->nextReadIndex += nBytes; - assert (this->nextReadIndex<=this->bytesInBuffer); + assert ( this->nextReadIndex <= this->bytesInBuffer ); } // diff --git a/src/cas/generic/outBuf.cc b/src/cas/generic/outBuf.cc index 06ce7f8c0..1f2d4bdfb 100644 --- a/src/cas/generic/outBuf.cc +++ b/src/cas/generic/outBuf.cc @@ -36,14 +36,12 @@ // // outBuf::outBuf() // -outBuf::outBuf (unsigned bufSizeIn) : - bufSize (bufSizeIn), stack (0u), ctxRecursCount (0u) +outBuf::outBuf ( outBufClient & clientIn ) : + client ( clientIn ), bufSize ( 0 ), stack ( 0u ), ctxRecursCount ( 0u ) { - this->pBuf = new char [this->bufSize]; - if (!this->pBuf) { - throw S_cas_noMemory; - } - memset (this->pBuf, '\0', this->bufSize); + this->pBuf = pGlobalBufferFactoryCAS->newSmallBuffer (); + this->bufSize = pGlobalBufferFactoryCAS->smallBufferSize (); + memset ( this->pBuf, '\0', this->bufSize ); } // @@ -51,42 +49,52 @@ outBuf::outBuf (unsigned bufSizeIn) : // outBuf::~outBuf() { - assert (this->ctxRecursCount==0); - delete [] this->pBuf; + assert ( this->ctxRecursCount == 0 ); + if ( this->bufSize == pGlobalBufferFactoryCAS->smallBufferSize () ) { + pGlobalBufferFactoryCAS->destroySmallBuffer ( this->pBuf ); + } + else if ( this->bufSize == pGlobalBufferFactoryCAS->largeBufferSize () ) { + pGlobalBufferFactoryCAS->destroyLargeBuffer ( this->pBuf ); + } + else if ( this->pBuf ) { + errlogPrintf ( + "cas: unusual outBuf buffer size %u in destructor - probable leak\n", + this->bufSize ); + } } // // outBuf::allocRawMsg () // -caStatus outBuf::allocRawMsg (bufSizeT msgsize, void **ppMsg) +caStatus outBuf::allocRawMsg ( bufSizeT msgsize, void **ppMsg ) { bufSizeT stackNeeded; - msgsize = CA_MESSAGE_ALIGN (msgsize); + msgsize = CA_MESSAGE_ALIGN ( msgsize ); this->mutex.lock (); - if (msgsize>this->bufSize) { + if ( msgsize > this->bufSize ) { this->mutex.unlock (); return S_cas_hugeRequest; } stackNeeded = this->bufSize - msgsize; - if (this->stack > stackNeeded) { + if ( this->stack > stackNeeded ) { // // Try to flush the output queue // - this->flush (this->stack-stackNeeded); + this->flush ( this->stack-stackNeeded ); // // If this failed then the fd is nonblocking // and we will let select() take care of it // - if (this->stack > stackNeeded) { + if ( this->stack > stackNeeded ) { this->mutex.unlock(); - this->sendBlockSignal(); + this->client.sendBlockSignal(); return S_cas_sendBlocked; } } @@ -99,63 +107,135 @@ caStatus outBuf::allocRawMsg (bufSizeT msgsize, void **ppMsg) return S_cas_success; } +caStatus outBuf::copyInHeader ( ca_uint16_t response, ca_uint32_t payloadSize, + ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, + ca_uint32_t responseSpecific, void **ppPayload ) +{ + ca_uint32_t alignedPayloadSize = CA_MESSAGE_ALIGN ( payloadSize ); + ca_uint32_t hdrSize; + + if ( alignedPayloadSize < 0xffff && nElem < 0xffff ) { + hdrSize = sizeof ( caHdr ); + } + else { + hdrSize = sizeof ( caHdr ) + 2 * sizeof (ca_uint32_t); + } + + caHdr * pHdr; + caStatus status = this->allocRawMsg ( hdrSize + alignedPayloadSize, + reinterpret_cast < void ** > ( & pHdr ) ); + if ( status != S_cas_success ) { + if ( status == S_cas_hugeRequest ) { + this->expandBuffer (); + status = this->allocRawMsg ( hdrSize + alignedPayloadSize, + reinterpret_cast < void ** > ( & pHdr ) ); + if ( status != S_cas_success ) { + return status; + } + } + else { + return status; + } + } + + pHdr->m_cmmd = epicsHTON16 ( response ); + pHdr->m_dataType = epicsHTON16 ( dataType ); + pHdr->m_cid = epicsHTON32 ( cid ); + pHdr->m_available = epicsHTON32 ( responseSpecific ); + char * pPayload; + if ( hdrSize == sizeof ( caHdr ) ) { + pHdr->m_postsize = epicsHTON16 ( alignedPayloadSize ); + pHdr->m_count = epicsHTON16 ( nElem ); + pPayload = reinterpret_cast < char * > ( pHdr + 1 ); + } + else { + pHdr->m_postsize = epicsHTON16 ( 0xffff ); + pHdr->m_count = epicsHTON16 ( 0 ); + ca_uint32_t * pLW = reinterpret_cast ( pHdr + 1 ); + pLW[0] = epicsHTON32 ( alignedPayloadSize ); + pLW[1] = epicsHTON32 ( nElem ); + pPayload = reinterpret_cast < char * > ( pLW + 2 ); + } + + /* zero out pad bytes */ + if ( alignedPayloadSize > payloadSize ) { + memset ( pPayload + payloadSize, '\0', + alignedPayloadSize - payloadSize ); + } + + if ( ppPayload ) { + *ppPayload = pPayload; + } + + return status; +} + // // outBuf::commitMsg () // void outBuf::commitMsg () { - caHdr *mp; - unsigned diff; - ca_uint16_t extSize; + ca_uint32_t payloadSize; + ca_uint32_t elementCount; + ca_uint32_t hdrSize; - mp = (caHdr *) &this->pBuf[this->stack]; + const caHdr * mp = ( caHdr * ) & this->pBuf[ this->stack ]; + if ( mp->m_postsize == 0xffff || mp->m_count == 0xffff ) { + const ca_uint32_t *pLW = reinterpret_cast ( mp + 1 ); + payloadSize = epicsNTOH32 ( pLW[0] ); + elementCount = epicsNTOH32 ( pLW[1] ); + hdrSize = sizeof ( caHdr ) + 2 * sizeof ( ca_uint32_t ); + } + else { + payloadSize = epicsNTOH16 ( mp->m_postsize ); + elementCount = epicsNTOH16 ( mp->m_count ); + hdrSize = sizeof ( caHdr ); + } - extSize = static_cast ( CA_MESSAGE_ALIGN (mp->m_postsize) ); + this->commitRawMsg ( hdrSize + payloadSize ); - // - // Guarantee that all portions of outgoing messages - // (including alignment pad) are initialized - // - diff = extSize - mp->m_postsize; - if (diff>0u) { - memset ( reinterpret_cast(mp+1) + mp->m_postsize, '\0', diff ); - } - - if (this->getDebugLevel()) { + if ( this->client.getDebugLevel() ) { errlogPrintf ( -"CAS Response => cmd=%d id=%x typ=%d cnt=%d psz=%d avail=%x outBuf ptr=%lx\n", - mp->m_cmmd, mp->m_cid, mp->m_dataType, mp->m_count, mp->m_postsize, - mp->m_available, (long)mp); +"CAS Response => cmd=%d id=%x typ=%d cnt=%d psz=%d avail=%x outBuf ptr=%p \n", + epicsNTOH16 ( mp->m_cmmd ), epicsNTOH32 ( mp->m_cid ), + epicsNTOH16 ( mp->m_dataType ), elementCount, payloadSize, + epicsNTOH32 ( mp->m_available ), static_cast ( mp ) ); } +} - // - // convert to network byte order - // (data following header is handled elsewhere) - // - mp->m_cmmd = epicsHTON16 (mp->m_cmmd); - mp->m_postsize = epicsHTON16 (extSize); - mp->m_dataType = epicsHTON16 (mp->m_dataType); - mp->m_count = epicsHTON16 (mp->m_count); - mp->m_cid = epicsHTON32 (mp->m_cid); - mp->m_available = epicsHTON32 (mp->m_available); - - commitRawMsg (extSize + sizeof(*mp)); +// +// outBuf::commitMsg () +// +void outBuf::commitMsg ( ca_uint32_t reducedPayloadSize ) +{ + caHdr * mp = ( caHdr * ) & this->pBuf[ this->stack ]; + reducedPayloadSize = CA_MESSAGE_ALIGN ( reducedPayloadSize ); + if ( mp->m_postsize == 0xffff || mp->m_count == 0xffff ) { + ca_uint32_t *pLW = reinterpret_cast ( mp + 1 ); + assert ( reducedPayloadSize <= epicsNTOH32 ( pLW[0] ) ); + pLW[0] = epicsHTON32 ( reducedPayloadSize ); + } + else { + assert ( reducedPayloadSize <= epicsNTOH16 ( mp->m_postsize ) ); + mp->m_postsize = epicsHTON16 ( static_cast ( reducedPayloadSize ) ); + } + this->commitMsg (); } // // outBuf::flush () // -outBuf::flushCondition outBuf::flush (bufSizeT spaceRequired) +outBufClient::flushCondition outBuf::flush ( bufSizeT spaceRequired ) { - bufSizeT nBytes; - bufSizeT nBytesRequired; - flushCondition cond; + bufSizeT nBytes; + bufSizeT nBytesRequired; + outBufClient::flushCondition cond; this->mutex.lock(); if (this->ctxRecursCount>0) { this->mutex.unlock(); - return flushNone; + return outBufClient::flushNone; } if (spaceRequired>this->bufSize) { @@ -173,9 +253,9 @@ outBuf::flushCondition outBuf::flush (bufSizeT spaceRequired) } } - cond = this->xSend(this->pBuf, this->stack, + cond = this->client.xSend(this->pBuf, this->stack, nBytesRequired, nBytes); - if (cond==flushProgress) { + if (cond==outBufClient::flushProgress) { bufSizeT len; if (nBytes >= this->stack) { @@ -190,9 +270,9 @@ outBuf::flushCondition outBuf::flush (bufSizeT spaceRequired) this->stack = len; } - if (this->getDebugLevel()>2u) { + if (this->client.getDebugLevel()>2u) { char buf[64]; - this->clientHostName (buf, sizeof(buf)); + this->client.hostName (buf, sizeof(buf)); errlogPrintf ("CAS: Sent a %d byte reply to %s\n", nBytes, buf); } @@ -252,8 +332,20 @@ bufSizeT outBuf::popCtx (const outBufCtx &ctx) // X aCC 361 // void outBuf::show (unsigned level) const { - if (level>1u) { + if ( level > 1u ) { printf("\tUndelivered response bytes = %d\n", this->bytesPresent()); } } +void outBuf::expandBuffer () +{ + if ( this->bufSize < pGlobalBufferFactoryCAS->largeBufferSize () ) { + char * pNewBuf = pGlobalBufferFactoryCAS->newLargeBuffer (); + memcpy ( pNewBuf, this->pBuf, this->stack ); + pGlobalBufferFactoryCAS->destroySmallBuffer ( this->pBuf ); + this->pBuf = pNewBuf; + this->bufSize = pGlobalBufferFactoryCAS->largeBufferSize (); + } +} + + diff --git a/src/cas/generic/outBufIL.h b/src/cas/generic/outBufIL.h index b4e0d9697..2c9ff44f1 100644 --- a/src/cas/generic/outBufIL.h +++ b/src/cas/generic/outBufIL.h @@ -52,10 +52,10 @@ inline void outBuf::clear () // // (if space is avilable this leaves the send lock applied) // -inline caStatus outBuf::allocMsg (bufSizeT extsize, caHdr **ppMsg) -{ - return this->allocRawMsg (extsize + sizeof(caHdr), (void **)ppMsg); -} +//inline caStatus outBuf::allocMsg (bufSizeT extsize, caHdr **ppMsg) +//{ +// return this->allocRawMsg (extsize + sizeof(caHdr), (void **)ppMsg); +//} // // outBuf::commitRawMsg() @@ -63,7 +63,7 @@ inline caStatus outBuf::allocMsg (bufSizeT extsize, caHdr **ppMsg) inline void outBuf::commitRawMsg (bufSizeT size) { this->stack += size; - assert (this->stack <= this->bufSize); + assert ( this->stack <= this->bufSize ); this->mutex.unlock(); } diff --git a/src/cas/generic/server.h b/src/cas/generic/server.h index 1fb7470b5..489c7f8c6 100644 --- a/src/cas/generic/server.h +++ b/src/cas/generic/server.h @@ -37,7 +37,7 @@ #include #if defined(epicsExportSharedSymbols) -#error suspect that libCom, ca, and gdd were not imported +# error suspect that libCom, ca, and gdd were not imported #endif // @@ -55,6 +55,7 @@ #include "errMdef.h" // EPICS error codes #include "resourceLib.h" // EPICS hashing templates #include "errlog.h" // EPICS error logging interface +#include "epicsSingleton.h" // // CA @@ -188,44 +189,38 @@ private: class casCtx { public: - inline casCtx(); + casCtx(); // // get // - inline const caHdr *getMsg() const; - inline void *getData() const; - inline caServerI * getServer() const; - inline casCoreClient * getClient() const; - inline casPVI * getPV() const; - inline casChannelI * getChannel() const; + const caHdrLargeArray * getMsg() const; + void * getData() const; + caServerI * getServer() const; + casCoreClient * getClient() const; + casPVI * getPV() const; + casChannelI * getChannel() const; - // - // set - // (assumes incoming message is in network byte order) - // - inline void setMsg(const char *pBuf); + void setMsg ( caHdrLargeArray &, void * pBody ); - inline void setData(void *p); + void setServer ( caServerI *p ); - inline void setServer(caServerI *p); + void setClient ( casCoreClient *p ); - inline void setClient(casCoreClient *p); + void setPV ( casPVI *p ); - inline void setPV(casPVI *p); + void setChannel ( casChannelI *p ); - inline void setChannel(casChannelI *p); - - void show (unsigned level) const; + void show ( unsigned level ) const; private: - caHdr msg; // ca message header - void *pData; // pointer to data following header - caServerI *pCAS; - casCoreClient *pClient; - casChannelI *pChannel; - casPVI *pPV; - unsigned nAsyncIO; // checks for improper use of async io + caHdrLargeArray msg; // ca message header + void *pData; // pointer to data following header + caServerI *pCAS; + casCoreClient *pClient; + casChannelI *pChannel; + casPVI *pPV; + unsigned nAsyncIO; // checks for improper use of async io }; // @@ -248,6 +243,37 @@ private: bufSizeT nextReadIndex; }; +class casBufferFactory { +public: + casBufferFactory (); + ~casBufferFactory (); + unsigned smallBufferSize () const; + char * newSmallBuffer (); + void destroySmallBuffer ( char * pBuf ); + unsigned largeBufferSize () const; + char * newLargeBuffer (); + void destroyLargeBuffer ( char * pBuf ); +private: + void * smallBufFreeList; + void * largeBufFreeList; + unsigned largeBufferSizePriv; +}; + +extern epicsSingleton < casBufferFactory > pGlobalBufferFactoryCAS; + +class inBufClient { +public: + enum fillCondition { casFillNone, casFillProgress, + casFillDisconnect }; + // this is a hack for a Solaris IP kernel feature + enum fillParameter { fpNone, fpUseBroadcastInterface }; + virtual unsigned getDebugLevel () const = 0; + virtual bufSizeT incomingBytesPresent () const = 0; + virtual fillCondition xRecv ( char *pBuf, bufSizeT nBytesToRecv, + enum fillParameter parm, bufSizeT &nByesRecv ) = 0; + virtual void hostName ( char *pBuf, unsigned bufSize ) const = 0; +}; + // // inBuf // @@ -255,13 +281,7 @@ class inBuf { friend class inBufCtx; public: - enum fillCondition {casFillNone, casFillProgress, - casFillDisconnect}; - - // this is a hack for a Solaris IP kernel feature - enum fillParameter {fpNone, fpUseBroadcastInterface}; - - inBuf (bufSizeT bufSizeIn, bufSizeT ioMinSizeIn); + inBuf ( inBufClient &, bufSizeT ioMinSizeIn ); virtual ~inBuf (); bufSizeT bytesPresent () const; @@ -273,17 +293,15 @@ public: // // fill the input buffer with any incoming messages // - fillCondition fill (enum fillParameter parm=fpNone); + inBufClient::fillCondition fill ( inBufClient::fillParameter parm = inBufClient::fpNone ); - void show (unsigned level) const; + void show ( unsigned level ) const; -protected: + void removeMsg ( bufSizeT nBytes ); void clear (); - - char *msgPtr () const; - - void removeMsg (bufSizeT nBytes); + + char * msgPtr () const; // // This is used to create recursive protocol stacks. A subsegment @@ -294,22 +312,22 @@ protected: // // pushCtx() returns an outBufCtx to be restored by popCtx() // - const inBufCtx pushCtx (bufSizeT headerSize, bufSizeT bodySize); - bufSizeT popCtx (const inBufCtx &); // returns actual size + const inBufCtx pushCtx ( bufSizeT headerSize, bufSizeT bodySize ); + bufSizeT popCtx ( const inBufCtx & ); // returns actual size + + unsigned bufferSize () const; + + void expandBuffer (); private: epicsMutex mutex; - char *pBuf; + inBufClient & client; + char * pBuf; bufSizeT bufSize; bufSizeT bytesInBuffer; bufSizeT nextReadIndex; bufSizeT ioMinSize; unsigned ctxRecursCount; - virtual unsigned getDebugLevel () const = 0; - virtual bufSizeT incomingBytesPresent () const = 0; - virtual fillCondition xRecv (char *pBuf, bufSizeT nBytesToRecv, - enum inBuf::fillParameter parm, bufSizeT &nByesRecv) = 0; - virtual void clientHostName (char *pBuf, unsigned bufSize) const = 0; inBuf ( const inBuf & ); inBuf & operator = ( const inBuf & ); }; @@ -333,6 +351,16 @@ private: bufSizeT stack; }; +class outBufClient { +public: + enum flushCondition { flushNone, flushProgress, flushDisconnect }; + virtual unsigned getDebugLevel () const = 0; + virtual void sendBlockSignal () = 0; + virtual flushCondition xSend ( char *pBuf, bufSizeT nBytesAvailableToSend, + bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent ) = 0; + virtual void hostName ( char *pBuf, unsigned bufSize ) const = 0; +}; + // // outBuf // @@ -340,34 +368,37 @@ class outBuf { friend class outBufCtx; public: - enum flushCondition {flushNone, flushProgress, flushDisconnect}; - - outBuf (bufSizeT bufSizeIn); + outBuf ( outBufClient & ); virtual ~outBuf (); - bufSizeT bytesPresent() const; + bufSizeT bytesPresent () const; // // flush output queue // (returns the number of bytes sent) // - flushCondition flush (bufSizeT spaceRequired=bufSizeT_MAX); + outBufClient::flushCondition flush ( bufSizeT spaceRequired=bufSizeT_MAX ); void show (unsigned level) const; -protected: + unsigned bufferSize () const; // // allocate message buffer space // (leaves message buffer locked) // - caStatus allocMsg (bufSizeT extsize, caHdr **ppMsg); + caStatus copyInHeader ( ca_uint16_t response, ca_uint32_t payloadSize, + ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, + ca_uint32_t responseSpecific, void **pPayload ); - // - // allocate message buffer space - // (leaves message buffer locked) - // - caStatus allocRawMsg (bufSizeT size, void **ppMsg); + // + // commit message created with copyInHeader + // + void commitMsg (); + void commitMsg ( ca_uint32_t reducedPayloadSize ); + + caStatus allocRawMsg ( bufSizeT msgsize, void **ppMsg ); + void commitRawMsg ( bufSizeT size ); // // This is used to create recursive protocol stacks. A subsegment @@ -378,37 +409,20 @@ protected: // // pushCtx() returns an outBufCtx to be restored by popCtx() // - const outBufCtx pushCtx (bufSizeT headerSize, bufSizeT maxBodySize, void *&pHeader); - bufSizeT popCtx (const outBufCtx &); // returns actual size - - // - // commits message allocated with allocMsg() - // - void commitMsg (); - - // - // commits message allocated with allocRawMsg() - // - void commitRawMsg (bufSizeT size); + const outBufCtx pushCtx ( bufSizeT headerSize, bufSizeT maxBodySize, void *&pHeader ); + bufSizeT popCtx ( const outBufCtx & ); // returns actual size void clear (); private: mutable epicsMutex mutex; - char *pBuf; + outBufClient & client; + char * pBuf; bufSizeT bufSize; bufSizeT stack; unsigned ctxRecursCount; - virtual unsigned getDebugLevel() const = 0; - virtual void sendBlockSignal() = 0; - - // - // io dependent - // - virtual flushCondition xSend (char *pBuf, bufSizeT nBytesAvailableToSend, - bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent) = 0; - virtual void clientHostName (char *pBuf, unsigned bufSize) const = 0; + void expandBuffer (); outBuf ( const outBuf & ); outBuf & operator = ( const outBuf & ); @@ -444,8 +458,8 @@ public: caServerI &getCAS() const; - virtual caStatus monitorResponse (casChannelI &chan, const caHdr &msg, - const smartConstGDDPointer &pDesc, const caStatus status); + virtual caStatus monitorResponse ( casChannelI &chan, const caHdrLargeArray &msg, + const smartConstGDDPointer &pDesc, const caStatus status ); virtual caStatus accessRightsResponse(casChannelI *); @@ -454,16 +468,16 @@ public: // asynchronous completion // virtual caStatus asyncSearchResponse ( - const caNetAddr &outAddr, - const caHdr &, const pvExistReturn &); + const caNetAddr & outAddr, + const caHdrLargeArray &, const pvExistReturn & ); virtual caStatus createChanResponse ( - const caHdr &, const pvAttachReturn &); + const caHdrLargeArray &, const pvAttachReturn &); virtual caStatus readResponse ( - casChannelI *, const caHdr &, const smartConstGDDPointer &, const caStatus); + casChannelI *, const caHdrLargeArray &, const smartConstGDDPointer &, const caStatus); virtual caStatus readNotifyResponse ( - casChannelI *, const caHdr &, const smartConstGDDPointer &, const caStatus); - virtual caStatus writeResponse (const caHdr &, const caStatus); - virtual caStatus writeNotifyResponse (const caHdr &, const caStatus); + casChannelI *, const caHdrLargeArray &, const smartConstGDDPointer &, const caStatus); + virtual caStatus writeResponse (const caHdrLargeArray &, const caStatus); + virtual caStatus writeNotifyResponse (const caHdrLargeArray &, const caStatus); // // used only with DG clients @@ -471,8 +485,8 @@ public: virtual caNetAddr fetchLastRecvAddr () const; protected: - casCtx ctx; - unsigned char asyncIOFlag; + casCtx ctx; + unsigned char asyncIOFlag; private: tsDLList ioInProgList; @@ -483,17 +497,17 @@ private: // // casClient // -class casClient : public casCoreClient, public inBuf, public outBuf { +class casClient : public casCoreClient, public outBufClient, + public inBufClient { public: - typedef caStatus (casClient::*pCASMsgHandler) (); - casClient (caServerI &, bufSizeT ioMinSizeIn); + casClient ( caServerI &, bufSizeT ioMinSizeIn ); virtual ~casClient (); - virtual void show (unsigned level) const; + virtual void show ( unsigned level ) const; - caStatus sendErr (const caHdr *, const int reportedStatus, - const char *pFormat, ...); + caStatus sendErr ( const caHdrLargeArray *, const int reportedStatus, + const char *pFormat, ... ); void sendVersion (); @@ -502,32 +516,37 @@ public: // // find the channel associated with a resource id // - casChannelI *resIdToChannel (const caResId &id); + casChannelI * resIdToChannel (const caResId &id); - virtual void clientHostName (char *pBuf, unsigned bufSize) const = 0; + virtual void hostName ( char *pBuf, unsigned bufSize ) const = 0; protected: - unsigned minor_version_number; - epicsTime lastSendTS; - epicsTime lastRecvTS; + inBuf in; + outBuf out; + unsigned minor_version_number; + unsigned incommingBytesToDrain; + epicsTime lastSendTS; + epicsTime lastRecvTS; - caStatus sendErrWithEpicsStatus(const caHdr *pMsg, - caStatus epicsStatus, caStatus clientStatus); + caStatus sendErrWithEpicsStatus ( const caHdrLargeArray *pMsg, + caStatus epicsStatus, caStatus clientStatus ); # define logBadId(MP, DP, CACSTAT, RESID) \ this->logBadIdWithFileAndLineno(MP, DP, CACSTAT, __FILE__, __LINE__, RESID) - caStatus logBadIdWithFileAndLineno(const caHdr *mp, + caStatus logBadIdWithFileAndLineno ( const caHdrLargeArray *mp, const void *dp, const int cacStat, const char *pFileName, - const unsigned lineno, const unsigned resId); + const unsigned lineno, const unsigned resId ); caStatus processMsg(); // // dump message to stderr // - void dumpMsg (const caHdr *mp, const void *dp, const char *pFormat, ...); + void dumpMsg ( const caHdrLargeArray *mp, const void *dp, const char *pFormat, ... ); + private: + typedef caStatus ( casClient::*pCASMsgHandler ) (); // // one function for each CA request type @@ -579,6 +598,8 @@ public: void show (unsigned level) const; + void flush (); + // // installChannel() // @@ -593,15 +614,15 @@ public: // one function for each CA request type that has // asynchronous completion // - virtual caStatus createChanResponse (const caHdr &, const pvAttachReturn &); - caStatus readResponse (casChannelI *pChan, const caHdr &msg, + virtual caStatus createChanResponse (const caHdrLargeArray &, const pvAttachReturn &); + caStatus readResponse (casChannelI *pChan, const caHdrLargeArray &msg, const smartConstGDDPointer &pDesc, const caStatus status); - caStatus readNotifyResponse (casChannelI *pChan, const caHdr &msg, + caStatus readNotifyResponse (casChannelI *pChan, const caHdrLargeArray &msg, const smartConstGDDPointer &pDesc, const caStatus status); - caStatus writeResponse (const caHdr &msg, const caStatus status); - caStatus writeNotifyResponse (const caHdr &msg, const caStatus status); - caStatus monitorResponse (casChannelI &chan, const caHdr &msg, - const smartConstGDDPointer &pDesc, const caStatus status); + caStatus writeResponse (const caHdrLargeArray &msg, const caStatus status); + caStatus writeNotifyResponse (const caHdrLargeArray &msg, const caStatus status); + caStatus monitorResponse ( casChannelI & chan, const caHdrLargeArray & msg, + const smartConstGDDPointer & pDesc, const caStatus status ); caStatus noReadAccessEvent(casClientMon *); @@ -615,7 +636,7 @@ public: unsigned getDebugLevel() const; - virtual void clientHostName (char *pBuf, unsigned bufSize) const = 0; + virtual void hostName (char *pBuf, unsigned bufSize) const = 0; protected: @@ -666,7 +687,7 @@ private: // // channelCreateFailed() // - caStatus channelCreateFailed (const caHdr *mp, caStatus createStatus); + caStatus channelCreateFailed ( const caHdrLargeArray *mp, caStatus createStatus ); caStatus writeArrayData(); caStatus writeScalarData(); @@ -675,22 +696,26 @@ private: // // io independent send/recv // - outBuf::flushCondition xSend (char *pBuf, bufSizeT nBytesAvailableToSend, - bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent); - inBuf::fillCondition xRecv (char *pBuf, bufSizeT nBytesToRecv, - fillParameter parm, bufSizeT &nByesRecv); + outBufClient::flushCondition xSend ( char * pBuf, bufSizeT nBytesAvailableToSend, + bufSizeT nBytesNeedToBeSent, bufSizeT & nBytesSent ); + inBufClient::fillCondition xRecv ( char * pBuf, bufSizeT nBytesToRecv, + inBufClient::fillParameter parm, bufSizeT & nByesRecv ); virtual xBlockingStatus blockingState() const = 0; - virtual outBuf::flushCondition osdSend (const char *pBuf, bufSizeT nBytesReq, - bufSizeT &nBytesActual) = 0; - virtual inBuf::fillCondition osdRecv (char *pBuf, bufSizeT nBytesReq, - bufSizeT &nBytesActual) = 0; + virtual outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq, + bufSizeT & nBytesActual ) = 0; + virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq, + bufSizeT &nBytesActual ) = 0; - caStatus readNotifyResponseECA_XXX (casChannelI *pChan, - const caHdr &msg, const smartConstGDDPointer &pDesc, const caStatus ecaStatus); - caStatus writeNotifyResponseECA_XXX (const caHdr &msg, - const caStatus status); + caStatus readNotifyFailureResponse ( const caHdrLargeArray & msg, + const caStatus ECA_XXXX ); + + caStatus monitorFailureResponse ( const caHdrLargeArray & msg, + const caStatus ECA_XXXX ); + + caStatus writeNotifyResponseECA_XXX ( const caHdrLargeArray &msg, + const caStatus status ); casStrmClient ( const casStrmClient & ); casStrmClient & operator = ( const casStrmClient & ); @@ -712,7 +737,7 @@ public: // // only for use with DG io // - void sendBeacon (); + void sendBeacon ( ca_uint32_t beaconNumber ); virtual void sendBeaconIO (char &msg, bufSizeT length, aitUint16 &portField, aitUint32 &addrField) = 0; @@ -721,7 +746,7 @@ public: unsigned getDebugLevel() const; - void clientHostName (char *pBuf, unsigned bufSize) const; + void hostName (char *pBuf, unsigned bufSize) const; caNetAddr fetchLastRecvAddr () const; @@ -749,25 +774,25 @@ private: // // searchFailResponse() // - caStatus searchFailResponse (const caHdr *pMsg); + caStatus searchFailResponse (const caHdrLargeArray *pMsg); - caStatus searchResponse (const caHdr &, const pvExistReturn &); + caStatus searchResponse (const caHdrLargeArray &, const pvExistReturn &); - caStatus asyncSearchResponse (const caNetAddr &outAddr, - const caHdr &msg, const pvExistReturn &); + caStatus asyncSearchResponse ( const caNetAddr & outAddr, + const caHdrLargeArray & msg, const pvExistReturn & ); // // IO depen // - outBuf::flushCondition xSend (char *pBufIn, bufSizeT nBytesAvailableToSend, - bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent); - inBuf::fillCondition xRecv (char *pBufIn, bufSizeT nBytesToRecv, - fillParameter parm, bufSizeT &nByesRecv); + outBufClient::flushCondition xSend ( char *pBufIn, bufSizeT nBytesAvailableToSend, + bufSizeT nBytesNeedToBeSent, bufSizeT &nBytesSent ); + inBufClient::fillCondition xRecv ( char *pBufIn, bufSizeT nBytesToRecv, + fillParameter parm, bufSizeT &nByesRecv ); - virtual outBuf::flushCondition osdSend (const char *pBuf, bufSizeT nBytesReq, - const caNetAddr &addr) = 0; - virtual inBuf::fillCondition osdRecv (char *pBuf, bufSizeT nBytesReq, - fillParameter parm, bufSizeT &nBytesActual, caNetAddr &addr) = 0; + virtual outBufClient::flushCondition osdSend ( const char *pBuf, bufSizeT nBytesReq, + const caNetAddr & addr ) = 0; + virtual inBufClient::fillCondition osdRecv ( char *pBuf, bufSizeT nBytesReq, + fillParameter parm, bufSizeT &nBytesActual, caNetAddr & addr ) = 0; // // cadg @@ -915,6 +940,7 @@ private: unsigned debugLevel; unsigned nEventsProcessed; unsigned nEventsPosted; + ca_uint32_t beaconCounter; // // predefined event types diff --git a/src/cas/generic/st/casDGIntfOS.cc b/src/cas/generic/st/casDGIntfOS.cc index 90f19c293..2a0bb0c02 100644 --- a/src/cas/generic/st/casDGIntfOS.cc +++ b/src/cas/generic/st/casDGIntfOS.cc @@ -207,7 +207,7 @@ void casDGIOWakeup::show(unsigned level) const // void casDGIntfOS::armRecv() { - if (!this->inBuf::full()) { + if ( ! this->in.full () ) { if (!this->pRdReg) { this->pRdReg = new casDGReadReg(*this); if (!this->pRdReg) { @@ -243,7 +243,7 @@ void casDGIntfOS::disarmRecv() // void casDGIntfOS::armSend() { - if (this->outBuf::bytesPresent()==0u) { + if ( this->out.bytesPresent () == 0u ) { return; } @@ -291,7 +291,7 @@ void casDGIntfOS::eventFlush() // if there is nothing pending in the input // queue, then flush the output queue // - if (this->inBuf::bytesAvailable()==0u) { + if ( this->in.bytesAvailable() == 0u ) { this->armSend (); } } @@ -322,7 +322,7 @@ void casDGIntfOS::show(unsigned level) const // void casDGReadReg::callBack() { - this->os.recvCB (inBuf::fpNone); + this->os.recvCB ( inBufClient::fpNone ); } // @@ -348,7 +348,7 @@ void casDGReadReg::show(unsigned level) const // void casDGBCastReadReg::callBack() { - this->os.recvCB (inBuf::fpUseBroadcastInterface); + this->os.recvCB ( inBufClient::fpUseBroadcastInterface ); } // @@ -414,13 +414,13 @@ void casDGIntfOS::sendBlockSignal() // void casDGIntfOS::sendCB() { - flushCondition flushCond; + outBufClient::flushCondition flushCond; // // attempt to flush the output buffer // - flushCond = this->flush(); - if (flushCond==flushProgress) { + flushCond = this->out.flush(); + if ( flushCond == flushProgress ) { if (this->sendBlocked) { this->sendBlocked = false; } @@ -458,14 +458,14 @@ void casDGIntfOS::sendCB() // // casDGIntfOS::recvCB() // -void casDGIntfOS::recvCB (inBuf::fillParameter parm) +void casDGIntfOS::recvCB ( inBufClient::fillParameter parm ) { assert (this->pRdReg); // // copy in new messages // - this->inBuf::fill (parm); + this->in.fill ( parm ); this->processInput (); // @@ -478,7 +478,7 @@ void casDGIntfOS::recvCB (inBuf::fillParameter parm) // (casDGReadReg is _not_ a onceOnly fdReg - // therefore an explicit delete is required here) // - if (this->inBuf::full()) { + if ( this->in.full() ) { this->disarmRecv(); // this deletes the casDGReadReg object } } @@ -500,7 +500,7 @@ void casDGIntfOS::processInput() status!=S_casApp_postponeAsyncIO) { char pName[64u]; - this->clientHostName (pName, sizeof (pName)); + this->hostName (pName, sizeof (pName)); errPrintf (status, __FILE__, __LINE__, "unexpected problem with UDP input from \"%s\"", pName); } @@ -511,8 +511,8 @@ void casDGIntfOS::processInput() // input buffer then keep sending the output // buffer until it is empty // - if (this->outBuf::bytesPresent()>0u) { - if ( this->bytesAvailable () == 0 ) { + if ( this->out.bytesPresent() > 0u ) { + if ( this->in.bytesAvailable () == 0 ) { this->armSend (); } } diff --git a/src/cas/generic/st/casOSD.h b/src/cas/generic/st/casOSD.h index f9db1b3e2..6dbbde45f 100644 --- a/src/cas/generic/st/casOSD.h +++ b/src/cas/generic/st/casOSD.h @@ -94,12 +94,12 @@ public: void processInput(); private: - casDGIOWakeup ioWk; - casDGEvWakeup evWk; - casDGReadReg *pRdReg; - casDGBCastReadReg *pBCastRdReg; // fix for solaris bug - casDGWriteReg *pWtReg; - bool sendBlocked; + casDGIOWakeup ioWk; + casDGEvWakeup evWk; + casDGReadReg *pRdReg; + casDGBCastReadReg *pBCastRdReg; // fix for solaris bug + casDGWriteReg *pWtReg; + bool sendBlocked; void armRecv (); void armSend (); @@ -107,7 +107,7 @@ private: void disarmRecv (); void disarmSend (); - void recvCB (inBuf::fillParameter parm); + void recvCB ( inBufClient::fillParameter parm ); void sendCB (); void sendBlockSignal (); diff --git a/src/cas/generic/st/casStreamOS.cc b/src/cas/generic/st/casStreamOS.cc index 68e01635a..3560c70e4 100644 --- a/src/cas/generic/st/casStreamOS.cc +++ b/src/cas/generic/st/casStreamOS.cc @@ -244,11 +244,11 @@ void casStreamIOWakeup::start ( casStreamOS &os ) // inline void casStreamOS::armRecv() { - if (!this->pRdReg) { - if (!this->inBuf::full()) { - this->pRdReg = new casStreamReadReg(*this); - if (!this->pRdReg) { - errMessage(S_cas_noMemory, "armRecv()"); + if ( ! this->pRdReg ) { + if ( ! this->in.full() ) { + this->pRdReg = new casStreamReadReg ( *this ); + if ( ! this->pRdReg ) { + errMessage ( S_cas_noMemory, "armRecv()" ); throw S_cas_noMemory; } } @@ -270,11 +270,11 @@ inline void casStreamOS::disarmRecv() // inline void casStreamOS::armSend() { - if (this->outBuf::bytesPresent()==0u) { + if ( this->out.bytesPresent () == 0u ) { return; } - if (!this->pWtReg) { + if ( ! this->pWtReg ) { this->pWtReg = new casStreamWriteReg(*this); if (!this->pWtReg) { errMessage(S_cas_noMemory, "armSend() failed"); @@ -318,7 +318,7 @@ void casStreamOS::eventFlush() // if there is nothing pending in the input // queue, then flush the output queue // - if (this->inBuf::bytesAvailable()==0u) { + if ( this->in.bytesAvailable() == 0u ) { this->armSend (); } } @@ -396,16 +396,16 @@ void casStreamReadReg::callBack () // void casStreamOS::recvCB() { - inBuf::fillCondition fillCond; + inBufClient::fillCondition fillCond; casProcCond procCond; - assert (this->pRdReg); + assert ( this->pRdReg ); // // copy in new messages // - fillCond = this->fill(); - if (fillCond == casFillDisconnect) { + fillCond = this->in.fill(); + if ( fillCond == casFillDisconnect ) { delete this; } else { @@ -413,7 +413,7 @@ void casStreamOS::recvCB() if (procCond == casProcDisconnect) { delete this; } - else if (this->inBuf::full()) { + else if ( this->in.full() ) { // // If there isnt any space then temporarily // stop calling this routine until problem is resolved @@ -470,19 +470,19 @@ void casStreamWriteReg::callBack() // void casStreamOS::sendCB() { - outBuf::flushCondition flushCond; + outBufClient::flushCondition flushCond; casProcCond procCond; // // attempt to flush the output buffer // - flushCond = this->flush(); + flushCond = this->out.flush (); if (flushCond==flushProgress) { if (this->sendBlocked) { this->sendBlocked = FALSE; } } - else if (flushCond==outBuf::flushDisconnect) { + else if ( flushCond == outBufClient::flushDisconnect ) { // // ok to delete the client here // because casStreamWriteReg::callBack() @@ -552,8 +552,8 @@ void casStreamOS::sendCB() // additional bytes may have been added since // we flushed the out buffer // - if (this->outBuf::bytesPresent()>0u && - this->inBuf::bytesAvailable()==0u) { + if ( this->out.bytesPresent() > 0u && + this->in.bytesAvailable() == 0u ) { this->armSend(); } } @@ -586,7 +586,7 @@ casProcCond casStreamOS::processInput() // X aCC 361 // if there is nothing pending in the input // queue, then flush the output queue // - if (this->inBuf::bytesAvailable()==0u) { + if ( this->in.bytesAvailable() == 0u ) { this->armSend (); } this->armRecv ();