many, many changes
This commit is contained in:
@@ -13,50 +13,40 @@
|
||||
* Author Jeffrey O. Hill
|
||||
* johill@lanl.gov
|
||||
* 505 665 1831
|
||||
*
|
||||
* Notes:
|
||||
* 1) when rdix is equal to wtix it indicates that the entire buffer is
|
||||
* available to be read, and therefore nothing can be written.
|
||||
* 2) the byte at index rdix + 1 is the next byte to read.
|
||||
* 3) the byte at index wtix is the next byte to write.
|
||||
*/
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include "ringBuffer.h"
|
||||
|
||||
static const unsigned ringIndexMask = nElementsInRing-1;
|
||||
static const unsigned ringIndexMask = nElementsInRing - 1;
|
||||
|
||||
/*
|
||||
* cacRingBufferConstruct ()
|
||||
*/
|
||||
int cacRingBufferConstruct (ringBuffer *pBuf)
|
||||
bool cacRingBufferConstruct ( ringBuffer *pBuf )
|
||||
{
|
||||
pBuf->shutDown = 0u;
|
||||
pBuf->rdix = 0u;
|
||||
pBuf->wtix = 1u;
|
||||
|
||||
pBuf->readSignal = semBinaryCreate (semEmpty);
|
||||
if (!pBuf->readSignal) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pBuf->writeSignal = semBinaryCreate (semEmpty);
|
||||
if (!pBuf->writeSignal) {
|
||||
semBinaryDestroy (pBuf->readSignal);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pBuf->readLock = semMutexCreate ();
|
||||
if (!pBuf->readLock) {
|
||||
semBinaryDestroy (pBuf->readSignal);
|
||||
semBinaryDestroy (pBuf->writeSignal);
|
||||
return -1;
|
||||
if ( ! pBuf->readLock ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
pBuf->writeLock = semMutexCreate ();
|
||||
if (!pBuf->writeLock) {
|
||||
semBinaryDestroy (pBuf->readSignal);
|
||||
semBinaryDestroy (pBuf->writeSignal);
|
||||
semMutexDestroy (pBuf->readLock);
|
||||
return -1;
|
||||
if ( ! pBuf->writeLock ) {
|
||||
semMutexDestroy ( pBuf->readLock );
|
||||
return false;
|
||||
}
|
||||
|
||||
return 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -64,35 +54,23 @@ int cacRingBufferConstruct (ringBuffer *pBuf)
|
||||
*/
|
||||
void cacRingBufferDestroy (ringBuffer *pBuf)
|
||||
{
|
||||
semBinaryDestroy (pBuf->readSignal);
|
||||
semBinaryDestroy (pBuf->writeSignal);
|
||||
semMutexDestroy (pBuf->readLock);
|
||||
semMutexDestroy (pBuf->writeLock);
|
||||
}
|
||||
|
||||
/*
|
||||
* cacRingBufferShutDown ();
|
||||
*/
|
||||
void cacRingBufferShutDown (ringBuffer *pBuf)
|
||||
{
|
||||
pBuf->shutDown = 1u;
|
||||
semBinaryGive (pBuf->readSignal);
|
||||
semBinaryGive (pBuf->writeSignal);
|
||||
semMutexDestroy ( pBuf->readLock );
|
||||
semMutexDestroy ( pBuf->writeLock );
|
||||
}
|
||||
|
||||
/*
|
||||
* cacRingBufferReadSize ()
|
||||
*/
|
||||
static inline unsigned cacRingBufferReadSize (ringBuffer *pBuf)
|
||||
static inline unsigned cacRingBufferReadSize ( ringBuffer *pBuf )
|
||||
{
|
||||
unsigned long count;
|
||||
|
||||
if ( pBuf->wtix <= pBuf->rdix ) {
|
||||
static const unsigned bufSizeM1 = sizeof (pBuf->buf) - 1u;
|
||||
static const unsigned bufSizeM1 = sizeof ( pBuf->buf ) - 1u;
|
||||
count = ( bufSizeM1 - pBuf->rdix ) + pBuf->wtix;
|
||||
}
|
||||
else {
|
||||
count = (pBuf->wtix - pBuf->rdix) - 1u;
|
||||
count = ( pBuf->wtix - pBuf->rdix ) - 1u;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
@@ -102,11 +80,11 @@ static inline unsigned cacRingBufferReadSize (ringBuffer *pBuf)
|
||||
*/
|
||||
static inline unsigned cacRingBufferContiguousReadSize (ringBuffer *pBuf)
|
||||
{
|
||||
static const unsigned bufSizeM1 = sizeof (pBuf->buf) - 1u;
|
||||
unsigned long count;
|
||||
static const unsigned bufSizeM1 = sizeof ( pBuf->buf ) - 1u;
|
||||
unsigned long count;
|
||||
|
||||
if ( pBuf->wtix <= pBuf->rdix ) {
|
||||
if (pBuf->rdix==bufSizeM1) {
|
||||
if ( pBuf->rdix == bufSizeM1 ) {
|
||||
count = pBuf->wtix;
|
||||
}
|
||||
else {
|
||||
@@ -114,7 +92,7 @@ static inline unsigned cacRingBufferContiguousReadSize (ringBuffer *pBuf)
|
||||
}
|
||||
}
|
||||
else {
|
||||
count = (pBuf->wtix - pBuf->rdix) - 1u;
|
||||
count = ( pBuf->wtix - pBuf->rdix ) - 1u;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
@@ -122,11 +100,11 @@ static inline unsigned cacRingBufferContiguousReadSize (ringBuffer *pBuf)
|
||||
/*
|
||||
* cacRingBufferWriteSize ()
|
||||
*/
|
||||
static inline unsigned cacRingBufferWriteSize (ringBuffer *pBuf)
|
||||
static inline unsigned cacRingBufferWriteSize ( ringBuffer *pBuf )
|
||||
{
|
||||
unsigned long count;
|
||||
|
||||
if (pBuf->wtix <= pBuf->rdix) {
|
||||
if ( pBuf->wtix <= pBuf->rdix ) {
|
||||
count = pBuf->rdix - pBuf->wtix;
|
||||
}
|
||||
else {
|
||||
@@ -138,15 +116,15 @@ static inline unsigned cacRingBufferWriteSize (ringBuffer *pBuf)
|
||||
/*
|
||||
* cacRingBufferContiguousWriteSize ()
|
||||
*/
|
||||
static inline unsigned cacRingBufferContiguousWriteSize (ringBuffer *pBuf)
|
||||
static inline unsigned cacRingBufferContiguousWriteSize ( ringBuffer *pBuf )
|
||||
{
|
||||
unsigned long count;
|
||||
|
||||
if (pBuf->wtix <= pBuf->rdix) {
|
||||
if ( pBuf->wtix <= pBuf->rdix ) {
|
||||
count = pBuf->rdix - pBuf->wtix;
|
||||
}
|
||||
else {
|
||||
count = sizeof (pBuf->buf) - pBuf->wtix;
|
||||
count = sizeof ( pBuf->buf ) - pBuf->wtix;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
@@ -158,17 +136,23 @@ static inline unsigned cacRingBufferContiguousWriteSize (ringBuffer *pBuf)
|
||||
* returns the number of bytes read which may be less than
|
||||
* the number requested.
|
||||
*/
|
||||
static unsigned cacRingBufferReadPartial (ringBuffer *pRing, void *pBuf,
|
||||
unsigned nBytes)
|
||||
static unsigned cacRingBufferReadPartial ( ringBuffer *pRing, void *pBuf,
|
||||
unsigned nBytes )
|
||||
{
|
||||
unsigned totalBytes;
|
||||
|
||||
if ( pRing->wtix < pRing->rdix ) {
|
||||
static const unsigned bufSizeM1 = sizeof (pRing->buf) - 1u;
|
||||
if ( pRing->wtix <= pRing->rdix ) {
|
||||
static const unsigned bufSizeM1 = sizeof ( pRing->buf ) - 1u;
|
||||
unsigned nBytesAvail1stBlock, nBytesAvail2ndBlock;
|
||||
|
||||
nBytesAvail1stBlock = bufSizeM1 - pRing->rdix;
|
||||
nBytesAvail2ndBlock = pRing->wtix;
|
||||
if ( pRing->rdix == bufSizeM1 ) {
|
||||
nBytesAvail1stBlock = pRing->wtix;
|
||||
nBytesAvail2ndBlock = 0u;
|
||||
}
|
||||
else {
|
||||
nBytesAvail1stBlock = bufSizeM1 - pRing->rdix;
|
||||
nBytesAvail2ndBlock = pRing->wtix;
|
||||
}
|
||||
if ( nBytesAvail1stBlock >= nBytes ) {
|
||||
totalBytes = nBytes;
|
||||
memcpy ( pBuf, pRing->buf + pRing->rdix + 1u, totalBytes );
|
||||
@@ -186,53 +170,15 @@ static unsigned cacRingBufferReadPartial (ringBuffer *pRing, void *pBuf,
|
||||
pRing->rdix += totalBytes;
|
||||
pRing->rdix &= ringIndexMask;
|
||||
}
|
||||
else if ( pRing->wtix > pRing->rdix ) {
|
||||
totalBytes = (pRing->wtix - pRing->rdix) - 1;
|
||||
else {
|
||||
totalBytes = ( pRing->wtix - pRing->rdix ) - 1;
|
||||
if ( totalBytes > nBytes ) {
|
||||
totalBytes = nBytes;
|
||||
}
|
||||
memcpy (pBuf, pRing->buf+pRing->rdix+1, totalBytes);
|
||||
memcpy ( pBuf, pRing->buf+pRing->rdix+1, totalBytes );
|
||||
pRing->rdix += totalBytes;
|
||||
pRing->rdix &= ringIndexMask;
|
||||
}
|
||||
else {
|
||||
totalBytes = 0;
|
||||
}
|
||||
|
||||
return totalBytes;
|
||||
}
|
||||
|
||||
/*
|
||||
* cacRingBufferRead ()
|
||||
*
|
||||
* returns the number of bytes read which may be less than
|
||||
* the number requested.
|
||||
*/
|
||||
unsigned cacRingBufferRead (ringBuffer *pRing, void *pBuf,
|
||||
unsigned nBytes)
|
||||
{
|
||||
unsigned char *pBufTmp = (unsigned char *) pBuf;
|
||||
unsigned totalBytes = 0;
|
||||
unsigned curBytes;
|
||||
|
||||
semMutexMustTake (pRing->readLock);
|
||||
|
||||
while (totalBytes<nBytes) {
|
||||
curBytes = cacRingBufferReadPartial (pRing,
|
||||
pBufTmp+totalBytes, nBytes-totalBytes);
|
||||
if (curBytes==0) {
|
||||
semBinaryMustTake (pRing->readSignal);
|
||||
if (pRing->shutDown) {
|
||||
semMutexGive (pRing->readLock);
|
||||
return totalBytes;
|
||||
}
|
||||
}
|
||||
else {
|
||||
totalBytes += curBytes;
|
||||
}
|
||||
}
|
||||
|
||||
semMutexGive (pRing->readLock);
|
||||
|
||||
return totalBytes;
|
||||
}
|
||||
@@ -243,8 +189,8 @@ unsigned cacRingBufferRead (ringBuffer *pRing, void *pBuf,
|
||||
* returns the number of bytes written which may be less than
|
||||
* the number requested.
|
||||
*/
|
||||
static unsigned cacRingBufferWritePartial (ringBuffer *pRing,
|
||||
const void *pBuf, unsigned nBytes)
|
||||
static unsigned cacRingBufferWritePartial ( ringBuffer *pRing,
|
||||
const void *pBuf, unsigned nBytes )
|
||||
{
|
||||
unsigned totalBytes;
|
||||
|
||||
@@ -287,96 +233,37 @@ static unsigned cacRingBufferWritePartial (ringBuffer *pRing,
|
||||
return totalBytes;
|
||||
}
|
||||
|
||||
/*
|
||||
* cacRingBufferWrite ()
|
||||
*
|
||||
* returns the number of bytes written which may be less than
|
||||
* the number requested.
|
||||
*/
|
||||
unsigned cacRingBufferWrite (ringBuffer *pRing, const void *pBuf,
|
||||
unsigned nBytes)
|
||||
void cacRingBufferWriteLock ( ringBuffer *pBuf )
|
||||
{
|
||||
unsigned char *pBufTmp = (unsigned char *) pBuf;
|
||||
unsigned totalBytes = 0;
|
||||
unsigned curBytes;
|
||||
|
||||
semMutexMustTake ( pRing->writeLock );
|
||||
|
||||
while ( totalBytes < nBytes ) {
|
||||
curBytes = cacRingBufferWritePartial ( pRing,
|
||||
pBufTmp+totalBytes, nBytes-totalBytes );
|
||||
if ( curBytes == 0 ) {
|
||||
semBinaryGive ( pRing->readSignal );
|
||||
semBinaryMustTake ( pRing->writeSignal );
|
||||
if ( pRing->shutDown ) {
|
||||
semMutexGive ( pRing->writeLock );
|
||||
return totalBytes;
|
||||
}
|
||||
}
|
||||
else {
|
||||
totalBytes += curBytes;
|
||||
}
|
||||
}
|
||||
|
||||
semMutexGive ( pRing->writeLock );
|
||||
|
||||
return totalBytes;
|
||||
semMutexMustTake ( pBuf->writeLock );
|
||||
}
|
||||
|
||||
void cacRingBufferWriteLock (ringBuffer *pBuf)
|
||||
bool cacRingBufferWriteLockIfBytesAvailable ( ringBuffer *pBuf, unsigned bytesRequired )
|
||||
{
|
||||
semMutexMustTake (pBuf->writeLock);
|
||||
}
|
||||
|
||||
bool cacRingBufferWriteLockNoBlock (ringBuffer *pBuf, unsigned bytesRequired)
|
||||
{
|
||||
semMutexMustTake (pBuf->writeLock);
|
||||
semMutexMustTake ( pBuf->writeLock );
|
||||
if ( cacRingBufferWriteSize (pBuf) < bytesRequired ) {
|
||||
semMutexGive (pBuf->writeLock);
|
||||
semMutexGive ( pBuf->writeLock );
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void cacRingBufferWriteUnlock (ringBuffer *pBuf)
|
||||
void cacRingBufferWriteUnlock ( ringBuffer *pBuf )
|
||||
{
|
||||
semMutexGive (pBuf->writeLock);
|
||||
semMutexGive ( pBuf->writeLock );
|
||||
}
|
||||
|
||||
void *cacRingBufferWriteReserve (ringBuffer *pRing, unsigned *pBytesAvail)
|
||||
void *cacRingBufferWriteReserve ( ringBuffer *pRing, unsigned *pBytesAvail )
|
||||
{
|
||||
unsigned avail;
|
||||
|
||||
semMutexMustTake (pRing->writeLock);
|
||||
semMutexMustTake ( pRing->writeLock );
|
||||
|
||||
avail = cacRingBufferContiguousWriteSize (pRing);
|
||||
while (avail==0) {
|
||||
semBinaryGive (pRing->readSignal);
|
||||
semBinaryMustTake (pRing->writeSignal);
|
||||
if (pRing->shutDown) {
|
||||
semMutexGive (pRing->writeLock);
|
||||
*pBytesAvail = 0u;
|
||||
return 0;
|
||||
}
|
||||
avail = cacRingBufferContiguousWriteSize (pRing);
|
||||
}
|
||||
avail = cacRingBufferContiguousWriteSize ( pRing );
|
||||
|
||||
*pBytesAvail = avail;
|
||||
|
||||
return (void *) &pRing->buf[pRing->wtix];
|
||||
}
|
||||
|
||||
void *cacRingBufferWriteReserveNoBlock (ringBuffer *pRing, unsigned *pBytesAvail)
|
||||
{
|
||||
unsigned avail;
|
||||
|
||||
semMutexMustTake (pRing->writeLock);
|
||||
|
||||
avail = cacRingBufferContiguousWriteSize (pRing);
|
||||
|
||||
if ( avail==0 || pRing->shutDown ) {
|
||||
if ( avail == 0 ) {
|
||||
*pBytesAvail = 0u;
|
||||
semMutexGive (pRing->writeLock);
|
||||
semMutexGive ( pRing->writeLock );
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -385,76 +272,113 @@ void *cacRingBufferWriteReserveNoBlock (ringBuffer *pRing, unsigned *pBytesAvail
|
||||
return (void *) &pRing->buf[pRing->wtix];
|
||||
}
|
||||
|
||||
void cacRingBufferWriteCommit (ringBuffer *pRing, unsigned delta)
|
||||
void cacRingBufferWriteCommit ( ringBuffer *pRing, unsigned delta )
|
||||
{
|
||||
pRing->wtix += delta;
|
||||
pRing->wtix &= ringIndexMask;
|
||||
semMutexGive (pRing->writeLock);
|
||||
semMutexGive ( pRing->writeLock );
|
||||
}
|
||||
|
||||
void *cacRingBufferReadReserve (ringBuffer *pRing, unsigned *pBytesAvail)
|
||||
bool cacRingBufferWriteNoBlock ( ringBuffer *pBuf, const void *pMsg, unsigned bytesRequired )
|
||||
{
|
||||
unsigned avail;
|
||||
unsigned nBytesWritten;
|
||||
|
||||
semMutexMustTake (pRing->readLock);
|
||||
|
||||
avail = cacRingBufferContiguousReadSize (pRing);
|
||||
while (avail==0) {
|
||||
semBinaryMustTake (pRing->readSignal);
|
||||
if (pRing->shutDown) {
|
||||
semMutexGive (pRing->readLock);
|
||||
*pBytesAvail = 0u;
|
||||
return NULL;
|
||||
}
|
||||
avail = cacRingBufferContiguousReadSize (pRing);
|
||||
semMutexMustTake ( pBuf->writeLock );
|
||||
if ( cacRingBufferWriteSize ( pBuf ) < bytesRequired ) {
|
||||
semMutexGive ( pBuf->writeLock );
|
||||
return false;
|
||||
}
|
||||
|
||||
*pBytesAvail = avail;
|
||||
|
||||
return (void *) &pRing->buf[(pRing->rdix+1) & ringIndexMask];
|
||||
nBytesWritten = cacRingBufferWritePartial ( pBuf, pMsg, bytesRequired );
|
||||
semMutexGive ( pBuf->writeLock );
|
||||
return nBytesWritten == bytesRequired;
|
||||
}
|
||||
|
||||
void *cacRingBufferReadReserveNoBlock (ringBuffer *pRing, unsigned *pBytesAvail)
|
||||
bool cacRingBufferWriteMultipartMessageNoBlock ( ringBuffer *pBuf,
|
||||
const msgDescriptor *pMsgs, unsigned nMsgs )
|
||||
{
|
||||
unsigned i;
|
||||
unsigned totalBytes = 0u;
|
||||
unsigned nBytesWritten;
|
||||
|
||||
for ( i = 0u; i < nMsgs; i++ ) {
|
||||
totalBytes += pMsgs[i].length;
|
||||
}
|
||||
|
||||
semMutexMustTake ( pBuf->writeLock );
|
||||
|
||||
if ( cacRingBufferWriteSize ( pBuf ) < totalBytes ) {
|
||||
semMutexGive ( pBuf->writeLock );
|
||||
return false;
|
||||
}
|
||||
|
||||
for ( i = 0u; i < nMsgs; i++ ) {
|
||||
nBytesWritten = cacRingBufferWritePartial ( pBuf,
|
||||
pMsgs[i].pMsg, pMsgs[i].length );
|
||||
if ( nBytesWritten != pMsgs[i].length ) {
|
||||
semMutexGive ( pBuf->writeLock );
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
semMutexGive ( pBuf->writeLock );
|
||||
return true;
|
||||
}
|
||||
|
||||
unsigned cacRingBufferWrite ( ringBuffer *pBuf, const void *pMsg, unsigned nBytes )
|
||||
{
|
||||
unsigned nBytesWritten;
|
||||
|
||||
semMutexMustTake ( pBuf->writeLock );
|
||||
nBytesWritten = cacRingBufferWritePartial ( pBuf, pMsg, nBytes );
|
||||
semMutexGive ( pBuf->writeLock );
|
||||
|
||||
return nBytesWritten;
|
||||
}
|
||||
|
||||
void *cacRingBufferReadReserve ( ringBuffer *pRing, unsigned *pBytesAvail )
|
||||
{
|
||||
unsigned avail;
|
||||
|
||||
semMutexMustTake (pRing->readLock);
|
||||
semMutexMustTake ( pRing->readLock );
|
||||
|
||||
avail = cacRingBufferContiguousReadSize (pRing);
|
||||
avail = cacRingBufferContiguousReadSize ( pRing );
|
||||
|
||||
if ( avail==0 || pRing->shutDown ) {
|
||||
if ( avail == 0 ) {
|
||||
*pBytesAvail = 0u;
|
||||
semMutexGive (pRing->readLock);
|
||||
semMutexGive ( pRing->readLock );
|
||||
return NULL;
|
||||
}
|
||||
|
||||
*pBytesAvail = avail;
|
||||
|
||||
return (void *) &pRing->buf[(pRing->rdix+1) & ringIndexMask];
|
||||
return (void *) &pRing->buf[ ( pRing->rdix + 1 ) & ringIndexMask ];
|
||||
}
|
||||
|
||||
|
||||
void cacRingBufferReadCommit (ringBuffer *pRing, unsigned delta)
|
||||
void cacRingBufferReadCommit ( ringBuffer *pRing, unsigned delta )
|
||||
{
|
||||
pRing->rdix += delta;
|
||||
pRing->rdix &= ringIndexMask;
|
||||
semMutexGive (pRing->readLock);
|
||||
semMutexGive ( pRing->readLock );
|
||||
}
|
||||
|
||||
bool cacRingBufferWriteFlush (ringBuffer *pRing)
|
||||
bool cacRingBufferReadNoBlock ( ringBuffer *pBuf, void *pDest, unsigned nBytesRequired )
|
||||
{
|
||||
if ( cacRingBufferReadSize (pRing) ) {
|
||||
semBinaryGive (pRing->readSignal);
|
||||
return true;
|
||||
semMutexMustTake ( pBuf->readLock );
|
||||
unsigned available = cacRingBufferReadSize ( pBuf );
|
||||
if ( available < nBytesRequired) {
|
||||
semMutexGive ( pBuf->readLock );
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
char *pCurrent = static_cast <char *> ( pDest );
|
||||
unsigned totalBytes = cacRingBufferReadPartial ( pBuf, pCurrent, nBytesRequired );
|
||||
unsigned diff = nBytesRequired - totalBytes;
|
||||
if ( diff ) {
|
||||
totalBytes += cacRingBufferReadPartial ( pBuf, &pCurrent[totalBytes], diff );
|
||||
assert ( totalBytes == nBytesRequired );
|
||||
}
|
||||
semMutexGive ( pBuf->readLock );
|
||||
return true;
|
||||
}
|
||||
|
||||
bool cacRingBufferReadFlush (ringBuffer *pRing)
|
||||
{
|
||||
if ( cacRingBufferWriteSize (pRing) ) {
|
||||
semBinaryGive (pRing->writeSignal);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user