Merge remote-tracking branch 'cahuge/master'

* cahuge/master:
  replace caLimitArray with EPICS_CA_AUTO_MAX_ARRAY_BYTES
  ca: fix export caLimitArray on win32
  rsrv: export variable caLimitArray
  rsrv: optional max_array_bytes
  cas: simplify buffer alloc
  ca: large array free list is optional
  rsrv: support larger than max. array bytes
  pcas: support larger than max array bytes
  ca: support alloc larger than max array bytes
This commit is contained in:
Michael Davidsaver
2017-05-02 20:06:54 -04:00
15 changed files with 232 additions and 250 deletions
+1
View File
@@ -35,6 +35,7 @@ EPICS_CA_CONN_TMO=30.0
EPICS_CA_REPEATER_PORT=5065
EPICS_CA_SERVER_PORT=5064
EPICS_CA_MAX_ARRAY_BYTES=16384
EPICS_CA_AUTO_MAX_ARRAY_BYTES=NO
EPICS_CA_BEACON_PERIOD=15.0
EPICS_CA_MAX_SEARCH_PERIOD=300.0
EPICS_CAS_BEACON_PERIOD=
+14 -7
View File
@@ -32,6 +32,7 @@
#include "envDefs.h"
#include "locationException.h"
#include "errlog.h"
#include "epicsExport.h"
#define epicsExportSharedSymbols
#include "addrList.h"
@@ -218,9 +219,15 @@ cac::cac (
throw std::bad_alloc ();
}
freeListInitPvt ( &this->tcpLargeRecvBufFreeList, this->maxRecvBytesTCP, 1 );
if ( ! this->tcpLargeRecvBufFreeList ) {
throw std::bad_alloc ();
int caLimitArray;
if(envGetBoolConfigParam(&EPICS_CA_AUTO_MAX_ARRAY_BYTES, &caLimitArray))
caLimitArray = 0;
if(caLimitArray) {
freeListInitPvt ( &this->tcpLargeRecvBufFreeList, this->maxRecvBytesTCP, 1 );
if ( ! this->tcpLargeRecvBufFreeList ) {
throw std::bad_alloc ();
}
}
unsigned bufsPerArray = this->maxRecvBytesTCP / comBuf::capacityBytes ();
if ( bufsPerArray > 1u ) {
@@ -231,9 +238,7 @@ cac::cac (
catch ( ... ) {
osiSockRelease ();
delete [] this->pUserName;
if ( this->tcpSmallRecvBufFreeList ) {
freeListCleanup ( this->tcpSmallRecvBufFreeList );
}
freeListCleanup ( this->tcpSmallRecvBufFreeList );
if ( this->tcpLargeRecvBufFreeList ) {
freeListCleanup ( this->tcpLargeRecvBufFreeList );
}
@@ -318,7 +323,9 @@ cac::~cac ()
}
freeListCleanup ( this->tcpSmallRecvBufFreeList );
freeListCleanup ( this->tcpLargeRecvBufFreeList );
if ( this->tcpLargeRecvBufFreeList ) {
freeListCleanup ( this->tcpLargeRecvBufFreeList );
}
delete [] this->pUserName;
+3 -35
View File
@@ -51,6 +51,7 @@
class netWriteNotifyIO;
class netReadNotifyIO;
class netSubscription;
class tcpiiu;
// used to control access to cac's recycle routines which
// should only be indirectly invoked by CAC when its lock
@@ -193,12 +194,6 @@ public:
const char *pformat, va_list args ) const;
double connectionTimeout ( epicsGuard < epicsMutex > & );
// buffer management
char * allocateSmallBufferTCP ();
void releaseSmallBufferTCP ( char * );
unsigned largeBufferSizeTCP () const;
char * allocateLargeBufferTCP ();
void releaseLargeBufferTCP ( char * );
unsigned maxContiguousFrames ( epicsGuard < epicsMutex > & ) const;
// misc
@@ -355,6 +350,8 @@ private:
cac ( const cac & );
cac & operator = ( const cac & );
friend class tcpiiu;
};
inline const char * cac::userNamePointer () const
@@ -385,35 +382,6 @@ inline void cac::attachToClientCtx ()
this->notify.attachToClientCtx ();
}
inline char * cac::allocateSmallBufferTCP ()
{
// this locks internally
return ( char * ) freeListMalloc ( this->tcpSmallRecvBufFreeList );
}
inline void cac::releaseSmallBufferTCP ( char *pBuf )
{
// this locks internally
freeListFree ( this->tcpSmallRecvBufFreeList, pBuf );
}
inline unsigned cac::largeBufferSizeTCP () const
{
return this->maxRecvBytesTCP;
}
inline char * cac::allocateLargeBufferTCP ()
{
// this locks internally
return ( char * ) freeListMalloc ( this->tcpLargeRecvBufFreeList );
}
inline void cac::releaseLargeBufferTCP ( char *pBuf )
{
// this locks internally
freeListFree ( this->tcpLargeRecvBufFreeList, pBuf );
}
inline unsigned cac::beaconAnomaliesSinceProgramStart (
epicsGuard < epicsMutex > & guard ) const
{
+55 -18
View File
@@ -26,6 +26,9 @@
#include <stdexcept>
#include <string>
#include <stdlib.h>
#include "errlog.h"
#define epicsExportSharedSymbols
@@ -687,7 +690,7 @@ tcpiiu::tcpiiu (
curDataBytes ( 0ul ),
comBufMemMgr ( comBufMemMgrIn ),
cacRef ( cac ),
pCurData ( cac.allocateSmallBufferTCP () ),
pCurData ( (char*) freeListMalloc(this->cacRef.tcpSmallRecvBufFreeList) ),
pSearchDest ( pSearchDestIn ),
mutex ( mutexIn ),
cbMutex ( cbMutexIn ),
@@ -711,9 +714,12 @@ tcpiiu::tcpiiu (
socketHasBeenClosed ( false ),
unresponsiveCircuit ( false )
{
if(!pCurData)
throw std::bad_alloc();
this->sock = epicsSocketCreate ( AF_INET, SOCK_STREAM, IPPROTO_TCP );
if ( this->sock == INVALID_SOCKET ) {
cac.releaseSmallBufferTCP ( this->pCurData );
freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData);
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
@@ -1023,11 +1029,14 @@ tcpiiu :: ~tcpiiu ()
// free message body cache
if ( this->pCurData ) {
if ( this->curDataMax == MAX_TCP ) {
this->cacRef.releaseSmallBufferTCP ( this->pCurData );
if ( this->curDataMax <= MAX_TCP ) {
freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData);
}
else if ( this->cacRef.tcpLargeRecvBufFreeList ) {
freeListFree(this->cacRef.tcpLargeRecvBufFreeList, this->pCurData);
}
else {
this->cacRef.releaseLargeBufferTCP ( this->pCurData );
free ( this->pCurData );
}
}
}
@@ -1197,18 +1206,46 @@ bool tcpiiu::processIncoming (
// make sure we have a large enough message body cache
//
if ( this->curMsg.m_postsize > this->curDataMax ) {
if ( this->curDataMax == MAX_TCP &&
this->cacRef.largeBufferSizeTCP() >= this->curMsg.m_postsize ) {
char * pBuf = this->cacRef.allocateLargeBufferTCP ();
if ( pBuf ) {
this->cacRef.releaseSmallBufferTCP ( this->pCurData );
this->pCurData = pBuf;
this->curDataMax = this->cacRef.largeBufferSizeTCP ();
assert (this->curMsg.m_postsize > MAX_TCP);
char * newbuf = NULL;
arrayElementCount newsize;
if ( !this->cacRef.tcpLargeRecvBufFreeList ) {
// round size up to multiple of 4K
newsize = ((this->curMsg.m_postsize-1)|0xfff)+1;
if ( this->curDataMax <= MAX_TCP ) {
// small -> large
newbuf = (char*)malloc(newsize);
} else {
// expand large to larger
newbuf = (char*)realloc(this->pCurData, newsize);
}
else {
this->printFormated ( mgr.cbGuard,
"CAC: not enough memory for message body cache (ignoring response message)\n");
} else if ( this->curMsg.m_postsize <= this->cacRef.maxRecvBytesTCP ) {
newbuf = (char*) freeListMalloc(this->cacRef.tcpLargeRecvBufFreeList);
newsize = this->cacRef.maxRecvBytesTCP;
}
if ( newbuf) {
if (this->curDataMax <= MAX_TCP) {
freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData );
} else if (this->cacRef.tcpLargeRecvBufFreeList) {
freeListFree(this->cacRef.tcpLargeRecvBufFreeList, this->pCurData );
} else {
// called realloc()
}
this->pCurData = newbuf;
this->curDataMax = newsize;
} else {
this->printFormated ( mgr.cbGuard,
"CAC: not enough memory for message body cache (ignoring response message)\n");
}
}
@@ -1426,7 +1463,7 @@ void tcpiiu::readNotifyRequest ( epicsGuard < epicsMutex > & guard,
}
arrayElementCount maxBytes;
if ( CA_V49 ( this->minorProtocolVersion ) ) {
maxBytes = this->cacRef.largeBufferSizeTCP ();
maxBytes = 0xfffffff0;
}
else {
maxBytes = MAX_TCP;
@@ -1537,7 +1574,7 @@ void tcpiiu::subscriptionRequest (
guard, CA_V413(this->minorProtocolVersion) );
arrayElementCount maxBytes;
if ( CA_V49 ( this->minorProtocolVersion ) ) {
maxBytes = this->cacRef.largeBufferSizeTCP ();
maxBytes = 0xfffffff0;
}
else {
maxBytes = MAX_TCP;
@@ -1584,7 +1621,7 @@ void tcpiiu::subscriptionUpdateRequest (
guard, CA_V413(this->minorProtocolVersion) );
arrayElementCount maxBytes;
if ( CA_V49 ( this->minorProtocolVersion ) ) {
maxBytes = this->cacRef.largeBufferSizeTCP ();
maxBytes = 0xfffffff0;
}
else {
maxBytes = MAX_TCP;
-1
View File
@@ -53,7 +53,6 @@ LIBSRCS += outBuf.cc
LIBSRCS += casCtx.cc
LIBSRCS += casEventMask.cc
LIBSRCS += ioBlocked.cc
LIBSRCS += casBufferFactory.cc
LIBSRCS += pvExistReturn.cc
LIBSRCS += pvAttachReturn.cc
LIBSRCS += caNetAddr.cc
@@ -1,104 +0,0 @@
/*************************************************************************\
* Copyright (c) 2002 The University of Chicago, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* EPICS BASE Versions 3.13.7
* and higher are distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
/*
* Author Jeffrey O. Hill
* johill@lanl.gov
* 505 665 1831
*/
#include <new>
#include "envDefs.h"
#include "freeList.h"
#include "errlog.h"
#define epicsExportSharedSymbols
#include "clientBufMemoryManager.h"
#include "caProto.h"
casBufferFactory::casBufferFactory () :
smallBufFreeList ( 0 ), largeBufFreeList ( 0 ), largeBufferSizePriv ( 0u )
{
long maxBytesAsALong;
long status = envGetLongConfigParam ( & EPICS_CA_MAX_ARRAY_BYTES, & maxBytesAsALong );
if ( status || maxBytesAsALong < 0 ) {
errlogPrintf ( "cas: EPICS_CA_MAX_ARRAY_BYTES was not a positive integer\n" );
this->largeBufferSizePriv = MAX_TCP;
}
else {
/* allow room for the protocol header so that they get the array size they requested */
static const unsigned headerSize = sizeof ( caHdr ) + 2 * sizeof ( ca_uint32_t );
ca_uint32_t maxBytes = ( unsigned ) maxBytesAsALong;
if ( maxBytes < 0xffffffff - headerSize ) {
maxBytes += headerSize;
}
else {
maxBytes = 0xffffffff;
}
if ( maxBytes < MAX_TCP ) {
errlogPrintf ( "cas: EPICS_CA_MAX_ARRAY_BYTES was rounded up to %u\n", MAX_TCP );
this->largeBufferSizePriv = MAX_TCP;
}
else {
this->largeBufferSizePriv = maxBytes;
}
}
freeListInitPvt ( & this->smallBufFreeList, MAX_MSG_SIZE, 8 );
freeListInitPvt ( & this->largeBufFreeList, this->largeBufferSizePriv, 1 );
}
casBufferFactory::~casBufferFactory ()
{
freeListCleanup ( this->smallBufFreeList );
freeListCleanup ( this->largeBufFreeList );
}
unsigned casBufferFactory::smallBufferSize () const
{
return MAX_MSG_SIZE;
}
char * casBufferFactory::newSmallBuffer ()
{
void * pBuf = freeListCalloc ( this->smallBufFreeList );
if ( ! pBuf ) {
throw std::bad_alloc();
}
return static_cast < char * > ( pBuf );
}
void casBufferFactory::destroySmallBuffer ( char * pBuf )
{
if ( pBuf ) {
freeListFree ( this->smallBufFreeList, pBuf );
}
}
unsigned casBufferFactory::largeBufferSize () const
{
return this->largeBufferSizePriv;
}
char * casBufferFactory::newLargeBuffer ()
{
void * pBuf = freeListCalloc ( this->largeBufFreeList );
if ( ! pBuf ) {
throw std::bad_alloc();
}
return static_cast < char * > ( pBuf );
}
void casBufferFactory::destroyLargeBuffer ( char * pBuf )
{
if ( pBuf ) {
freeListFree ( this->largeBufFreeList, pBuf );
}
}
+1 -1
View File
@@ -205,7 +205,7 @@ caStatus casStrmClient :: processMsg ()
if ( bytesLeft < msgSize ) {
status = S_cas_success;
if ( msgSize > this->in.bufferSize() ) {
this->in.expandBuffer ();
this->in.expandBuffer (msgSize);
// msg to large - set up message drain
if ( msgSize > this->in.bufferSize() ) {
caServerI::dumpMsg ( this->pHostName, this->pUserName, & msgTmp, 0,
@@ -13,42 +13,54 @@
* 505 665 1831
*/
#include <new>
#include <stdlib.h>
#include "epicsAssert.h"
#include "freeList.h"
#define epicsExportSharedSymbols
#include "clientBufMemoryManager.h"
#include "caProto.h"
bufSizeT clientBufMemoryManager::maxSize () const
clientBufMemoryManager::clientBufMemoryManager()
:smallBufFreeList ( 0 )
{
return bufferFactory.largeBufferSize ();
freeListInitPvt ( & this->smallBufFreeList, MAX_MSG_SIZE, 8 );
}
clientBufMemoryManager::~clientBufMemoryManager()
{
freeListCleanup ( this->smallBufFreeList );
}
casBufferParm clientBufMemoryManager::allocate ( bufSizeT newMinSize )
{
casBufferParm parm;
if ( newMinSize <= bufferFactory.smallBufferSize () ) {
parm.pBuf = bufferFactory.newSmallBuffer ();
parm.bufSize = bufferFactory.smallBufferSize ();
}
else if ( newMinSize <= bufferFactory.largeBufferSize () ) {
parm.pBuf = bufferFactory.newLargeBuffer ();
parm.bufSize = bufferFactory.largeBufferSize ();
if ( newMinSize <= MAX_MSG_SIZE ) {
parm.pBuf = (char*)freeListMalloc(this->smallBufFreeList);
parm.bufSize = MAX_MSG_SIZE;
}
else {
parm.pBuf = static_cast < char * > ( ::operator new ( newMinSize ) );
// round size up to multiple of 4K
newMinSize = ((newMinSize-1)|0xfff)+1;
parm.pBuf = (char*)malloc(newMinSize);
parm.bufSize = newMinSize;
}
if(!parm.pBuf)
throw std::bad_alloc();
return parm;
}
void clientBufMemoryManager::release ( char * pBuf, bufSizeT bufSize )
{
if ( bufSize == bufferFactory.smallBufferSize () ) {
bufferFactory.destroySmallBuffer ( pBuf );
}
else if ( bufSize == bufferFactory.largeBufferSize () ) {
bufferFactory.destroyLargeBuffer ( pBuf );
assert(pBuf);
if (bufSize <= MAX_MSG_SIZE) {
freeListFree(this->smallBufFreeList, pBuf);
}
else {
::operator delete ( pBuf );
free(pBuf);
}
}
@@ -16,22 +16,6 @@
typedef unsigned bufSizeT;
static const unsigned bufSizeT_MAX = UINT_MAX;
class casBufferFactory {
public:
casBufferFactory ();
~casBufferFactory ();
unsigned smallBufferSize () const;
char * newSmallBuffer ();
void destroySmallBuffer ( char * pBuf );
unsigned largeBufferSize () const;
char * newLargeBuffer ();
void destroyLargeBuffer ( char * pBuf );
private:
void * smallBufFreeList;
void * largeBufFreeList;
unsigned largeBufferSizePriv;
};
struct casBufferParm {
char * pBuf;
bufSizeT bufSize;
@@ -39,11 +23,15 @@ struct casBufferParm {
class clientBufMemoryManager {
public:
clientBufMemoryManager();
~clientBufMemoryManager();
//! @throws std::bad_alloc on failure
casBufferParm allocate ( bufSizeT newMinSize );
void release ( char * pBuf, bufSizeT bufSize );
bufSizeT maxSize () const;
private:
casBufferFactory bufferFactory;
void * smallBufFreeList;
};
#endif // clientBufMemoryManagerh
+13 -5
View File
@@ -13,6 +13,8 @@
* 505 665 1831
*/
#include <stdexcept>
#include <stdio.h>
#include <string.h>
@@ -155,11 +157,17 @@ bufSizeT inBuf::popCtx ( const inBufCtx &ctx )
}
}
void inBuf::expandBuffer ()
void inBuf::expandBuffer (bufSizeT needed)
{
bufSizeT max = this->memMgr.maxSize();
if ( this->bufSize < max ) {
casBufferParm bufParm = this->memMgr.allocate ( max );
if (needed > bufSize) {
casBufferParm bufParm;
try {
bufParm = this->memMgr.allocate ( needed );
} catch (std::bad_alloc& e) {
// caller must check that buffer size has expended
return;
}
bufSizeT unprocessedBytes = this->bytesPresent ();
memcpy ( bufParm.pBuf, &this->pBuf[this->nextReadIndex], unprocessedBytes );
this->bytesInBuffer = unprocessedBytes;
@@ -170,7 +178,7 @@ void inBuf::expandBuffer ()
}
}
unsigned inBuf::bufferSize () const
bufSizeT inBuf::bufferSize() const
{
return this->bufSize;
}
+2 -2
View File
@@ -82,8 +82,8 @@ public:
//
const inBufCtx pushCtx ( bufSizeT headerSize, bufSizeT bodySize );
bufSizeT popCtx ( const inBufCtx & ); // returns actual size
unsigned bufferSize () const;
void expandBuffer ();
bufSizeT bufferSize () const;
void expandBuffer (bufSizeT needed);
private:
class inBufClient & client;
class clientBufMemoryManager & memMgr;
+11 -5
View File
@@ -59,7 +59,7 @@ caStatus outBuf::allocRawMsg ( bufSizeT msgsize, void **ppMsg )
msgsize = CA_MESSAGE_ALIGN ( msgsize );
if ( msgsize > this->bufSize ) {
this->expandBuffer ();
this->expandBuffer (msgsize);
if ( msgsize > this->bufSize ) {
return S_cas_hugeRequest;
}
@@ -316,11 +316,17 @@ void outBuf::show (unsigned level) const
}
}
void outBuf::expandBuffer ()
void outBuf::expandBuffer (bufSizeT needed)
{
bufSizeT max = this->memMgr.maxSize();
if ( this->bufSize < max ) {
casBufferParm bufParm = this->memMgr.allocate ( max );
if (needed > bufSize) {
casBufferParm bufParm;
try {
bufParm = this->memMgr.allocate ( needed );
} catch (std::bad_alloc& e) {
// caller must check that buffer size has expended
return;
}
memcpy ( bufParm.pBuf, this->pBuf, this->stack );
this->memMgr.release ( this->pBuf, this->bufSize );
this->pBuf = bufParm.pBuf;
+1 -1
View File
@@ -122,7 +122,7 @@ private:
bufSizeT stack;
unsigned ctxRecursCount;
void expandBuffer ();
void expandBuffer (bufSizeT needed);
outBuf ( const outBuf & );
outBuf & operator = ( const outBuf & );
+96 -37
View File
@@ -34,6 +34,8 @@
#include "taskwd.h"
#include "cantProceed.h"
#include "epicsExport.h"
#define epicsExportSharedSymbols
#include "dbChannel.h"
#include "dbCommon.h"
@@ -467,6 +469,7 @@ int rsrv_init (void)
long maxBytesAsALong;
long status;
SOCKET *socks;
int caLimitArray;
clientQlock = epicsMutexMustCreate();
@@ -525,7 +528,14 @@ int rsrv_init (void)
rsrvSizeofLargeBufTCP = maxBytes;
}
}
freeListInitPvt ( &rsrvLargeBufFreeListTCP, rsrvSizeofLargeBufTCP, 1 );
if(envGetBoolConfigParam(&EPICS_CA_AUTO_MAX_ARRAY_BYTES, &caLimitArray))
caLimitArray = 0;
if (caLimitArray)
freeListInitPvt ( &rsrvLargeBufFreeListTCP, rsrvSizeofLargeBufTCP, 1 );
else
rsrvLargeBufFreeListTCP = NULL;
pCaBucket = bucketCreate(CAS_HASH_TABLE_SIZE);
if (!pCaBucket)
cantProceed("RSRV failed to allocate ID lookup table\n");
@@ -1012,8 +1022,10 @@ void casr (unsigned level)
freeListItemsAvail (rsrvEventFreeList);
bytes_reserved += MAX_TCP *
freeListItemsAvail ( rsrvSmallBufFreeListTCP );
bytes_reserved += rsrvSizeofLargeBufTCP *
freeListItemsAvail ( rsrvLargeBufFreeListTCP );
if(rsrvLargeBufFreeListTCP) {
bytes_reserved += rsrvSizeofLargeBufTCP *
freeListItemsAvail ( rsrvLargeBufFreeListTCP );
}
bytes_reserved += rsrvSizeOfPutNotify ( 0 ) *
freeListItemsAvail ( rsrvPutNotifyFreeList );
printf( "Free-lists total %u bytes, comprising\n",
@@ -1026,7 +1038,7 @@ void casr (unsigned level)
printf( " %u small (%u byte) buffers, %u jumbo (%u byte) buffers\n",
(unsigned int) freeListItemsAvail ( rsrvSmallBufFreeListTCP ),
MAX_TCP,
(unsigned int) freeListItemsAvail ( rsrvLargeBufFreeListTCP ),
(unsigned int)(rsrvLargeBufFreeListTCP ? freeListItemsAvail ( rsrvLargeBufFreeListTCP ) : -1),
rsrvSizeofLargeBufTCP );
printf( "Server resource id table:\n");
LOCK_CLIENTQ;
@@ -1058,7 +1070,10 @@ void destroy_client ( struct client *client )
freeListFree ( rsrvSmallBufFreeListTCP, client->send.buf );
}
else if ( client->send.type == mbtLargeTCP ) {
freeListFree ( rsrvLargeBufFreeListTCP, client->send.buf );
if(rsrvLargeBufFreeListTCP)
freeListFree ( rsrvLargeBufFreeListTCP, client->send.buf );
else
free(client->send.buf);
}
else {
errlogPrintf ( "CAS: Corrupt send buffer free list type code=%u during client cleanup?\n",
@@ -1070,7 +1085,10 @@ void destroy_client ( struct client *client )
freeListFree ( rsrvSmallBufFreeListTCP, client->recv.buf );
}
else if ( client->recv.type == mbtLargeTCP ) {
freeListFree ( rsrvLargeBufFreeListTCP, client->recv.buf );
if(rsrvLargeBufFreeListTCP)
freeListFree ( rsrvLargeBufFreeListTCP, client->recv.buf );
else
free(client->recv.buf);
}
else {
errlogPrintf ( "CAS: Corrupt recv buffer free list type code=%u during client cleanup?\n",
@@ -1301,43 +1319,84 @@ void casAttachThreadToClient ( struct client *pClient )
taskwdInsert ( pClient->tid, NULL, NULL );
}
static
void casExpandBuffer ( struct message_buffer *buf, ca_uint32_t size, int sendbuf )
{
char *newbuf = NULL;
unsigned newsize;
enum messageBufferType newtype;
assert (size > MAX_TCP);
if ( size <= buf->maxstk || buf->type == mbtUDP ) return;
/* try to alloc new buffer */
if (size <= MAX_TCP) {
return; /* shouldn't happen */
} else if(!rsrvLargeBufFreeListTCP) {
// round up to multiple of 4K
size = ((size-1)|0xfff)+1;
if (buf->type==mbtLargeTCP)
newbuf = realloc (buf->buf, size);
else
newbuf = malloc (size);
newtype = mbtLargeTCP;
newsize = size;
} else if (size <= rsrvSizeofLargeBufTCP) {
newbuf = freeListCalloc ( rsrvLargeBufFreeListTCP );
newsize = rsrvSizeofLargeBufTCP;
newtype = mbtLargeTCP;
}
if (newbuf) {
/* copy existing buffer */
if (sendbuf) {
/* send buffer uses [0, stk) */
if (!rsrvLargeBufFreeListTCP && buf->type==mbtLargeTCP) {
/* realloc already copied */
} else {
memcpy ( newbuf, buf->buf, buf->stk );
}
} else {
/* recv buffer uses [stk, cnt) */
unsigned used;
assert ( buf->cnt >= buf->stk );
used = buf->cnt - buf->stk;
/* buf->buf may be the same as newbuf if realloc() used */
memmove ( newbuf, &buf->buf[buf->stk], used );
buf->cnt = used;
buf->stk = 0;
}
/* free existing buffer */
if(buf->type==mbtSmallTCP) {
freeListFree ( rsrvSmallBufFreeListTCP, buf->buf );
} else if(buf->type==mbtLargeTCP) {
freeListFree ( rsrvLargeBufFreeListTCP, buf->buf );
} else {
/* realloc() already free()'d if necessary */
}
buf->buf = newbuf;
buf->type = newtype;
buf->maxstk = newsize;
}
}
void casExpandSendBuffer ( struct client *pClient, ca_uint32_t size )
{
if ( pClient->send.type == mbtSmallTCP && rsrvSizeofLargeBufTCP > MAX_TCP
&& size <= rsrvSizeofLargeBufTCP ) {
int spaceAvailOnFreeList = freeListItemsAvail ( rsrvLargeBufFreeListTCP ) > 0;
if ( osiSufficentSpaceInPool(rsrvSizeofLargeBufTCP) || spaceAvailOnFreeList ) {
char *pNewBuf = ( char * ) freeListCalloc ( rsrvLargeBufFreeListTCP );
if ( pNewBuf ) {
memcpy ( pNewBuf, pClient->send.buf, pClient->send.stk );
freeListFree ( rsrvSmallBufFreeListTCP, pClient->send.buf );
pClient->send.buf = pNewBuf;
pClient->send.maxstk = rsrvSizeofLargeBufTCP;
pClient->send.type = mbtLargeTCP;
}
}
}
casExpandBuffer (&pClient->send, size, 1);
}
void casExpandRecvBuffer ( struct client *pClient, ca_uint32_t size )
{
if ( pClient->recv.type == mbtSmallTCP && rsrvSizeofLargeBufTCP > MAX_TCP
&& size <= rsrvSizeofLargeBufTCP) {
int spaceAvailOnFreeList = freeListItemsAvail ( rsrvLargeBufFreeListTCP ) > 0;
if ( osiSufficentSpaceInPool(rsrvSizeofLargeBufTCP) || spaceAvailOnFreeList ) {
char *pNewBuf = ( char * ) freeListCalloc ( rsrvLargeBufFreeListTCP );
if ( pNewBuf ) {
assert ( pClient->recv.cnt >= pClient->recv.stk );
memcpy ( pNewBuf, &pClient->recv.buf[pClient->recv.stk], pClient->recv.cnt - pClient->recv.stk );
freeListFree ( rsrvSmallBufFreeListTCP, pClient->recv.buf );
pClient->recv.buf = pNewBuf;
pClient->recv.cnt = pClient->recv.cnt - pClient->recv.stk;
pClient->recv.stk = 0u;
pClient->recv.maxstk = rsrvSizeofLargeBufTCP;
pClient->recv.type = mbtLargeTCP;
}
}
}
casExpandBuffer (&pClient->recv, size, 0);
}
/*
+1
View File
@@ -48,6 +48,7 @@ epicsShareExtern const ENV_PARAM EPICS_CA_AUTO_ADDR_LIST;
epicsShareExtern const ENV_PARAM EPICS_CA_REPEATER_PORT;
epicsShareExtern const ENV_PARAM EPICS_CA_SERVER_PORT;
epicsShareExtern const ENV_PARAM EPICS_CA_MAX_ARRAY_BYTES;
epicsShareExtern const ENV_PARAM EPICS_CA_AUTO_MAX_ARRAY_BYTES;
epicsShareExtern const ENV_PARAM EPICS_CA_MAX_SEARCH_PERIOD;
epicsShareExtern const ENV_PARAM EPICS_CA_NAME_SERVERS;
epicsShareExtern const ENV_PARAM EPICS_CAS_INTF_ADDR_LIST;