codec: avoid indirection when accessing buffers

avoid some indirection to make this code easier to follow.
move buffer lower limit to base class.
This commit is contained in:
Michael Davidsaver
2017-05-18 15:59:01 -04:00
parent 0c1f410e63
commit 87dca19708
3 changed files with 139 additions and 163 deletions

View File

@@ -65,10 +65,16 @@ const std::size_t AbstractCodec::MAX_ENSURE_DATA_SIZE = MAX_ENSURE_SIZE/2;
const std::size_t AbstractCodec::MAX_ENSURE_BUFFER_SIZE = MAX_ENSURE_SIZE;
const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024;
static
size_t bufSizeSelect(size_t request)
{
return std::max(request, MAX_TCP_RECV + AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE);
}
AbstractCodec::AbstractCodec(
bool serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
size_t sendBufferSize,
size_t receiveBufferSize,
int32_t socketSendBufferSize,
bool blockingProcessQueue):
//PROTECTED
@@ -77,48 +83,43 @@ AbstractCodec::AbstractCodec(
_senderThread(0),
_writeMode(PROCESS_SEND_QUEUE),
_writeOpReady(false),_lowLatency(false),
_socketBuffer(receiveBuffer),
_sendBuffer(sendBuffer),
_socketBuffer(bufSizeSelect(receiveBufferSize)),
_sendBuffer(bufSizeSelect(sendBufferSize)),
//PRIVATE
_storedPayloadSize(0), _storedPosition(0), _startPosition(0),
_maxSendPayloadSize(0),
_maxSendPayloadSize(_sendBuffer.getSize() - 2*PVA_MESSAGE_HEADER_SIZE), // start msg + control
_lastMessageStartPosition(std::numeric_limits<size_t>::max()),_lastSegmentedMessageType(0),
_lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0),
_byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00),
_clientServerFlag(serverFlag ? 0x40 : 0x00),
_socketSendBufferSize(0)
_socketSendBufferSize(socketSendBufferSize)
{
if (receiveBuffer->getSize() < 2*MAX_ENSURE_SIZE)
if (_socketBuffer.getSize() < 2*MAX_ENSURE_SIZE)
throw std::invalid_argument(
"receiveBuffer.capacity() < 2*MAX_ENSURE_SIZE");
// require aligned buffer size
//(not condition, but simplifies alignment code)
if (receiveBuffer->getSize() % PVA_ALIGNMENT != 0)
if (_socketBuffer.getSize() % PVA_ALIGNMENT != 0)
throw std::invalid_argument(
"receiveBuffer.capacity() % PVAConstants.PVA_ALIGNMENT != 0");
if (sendBuffer->getSize() < 2*MAX_ENSURE_SIZE)
if (_sendBuffer.getSize() < 2*MAX_ENSURE_SIZE)
throw std::invalid_argument("sendBuffer() < 2*MAX_ENSURE_SIZE");
// require aligned buffer size
//(not condition, but simplifies alignment code)
if (sendBuffer->getSize() % PVA_ALIGNMENT != 0)
if (_sendBuffer.getSize() % PVA_ALIGNMENT != 0)
throw std::invalid_argument(
"sendBuffer() % PVAConstants.PVA_ALIGNMENT != 0");
// initialize to be empty
_socketBuffer->setPosition(_socketBuffer->getLimit());
_startPosition = _socketBuffer->getPosition();
_socketBuffer.setPosition(_socketBuffer.getLimit());
_startPosition = _socketBuffer.getPosition();
// clear send
_sendBuffer->clear();
// start msg + control
_maxSendPayloadSize =
_sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE;
_socketSendBufferSize = socketSendBufferSize;
_sendBuffer.clear();
}
@@ -142,19 +143,19 @@ void AbstractCodec::processRead() {
void AbstractCodec::processHeader() {
// magic code
int8_t magicCode = _socketBuffer->getByte();
int8_t magicCode = _socketBuffer.getByte();
// version
_version = _socketBuffer->getByte();
_version = _socketBuffer.getByte();
// flags
_flags = _socketBuffer->getByte();
_flags = _socketBuffer.getByte();
// command
_command = _socketBuffer->getByte();
_command = _socketBuffer.getByte();
// read payload size
_payloadSize = _socketBuffer->getInt();
_payloadSize = _socketBuffer.getInt();
// check magic code
if (magicCode != PVA_MAGIC)
@@ -184,8 +185,8 @@ void AbstractCodec::processReadNormal() {
}
/*
hexDump("Header", (const int8*)_socketBuffer->getArray(),
_socketBuffer->getPosition(), PVA_MESSAGE_HEADER_SIZE);
hexDump("Header", (const int8*)_socketBuffer.getArray(),
_socketBuffer.getPosition(), PVA_MESSAGE_HEADER_SIZE);
*/
@@ -216,9 +217,9 @@ void AbstractCodec::processReadNormal() {
}
_storedPayloadSize = _payloadSize;
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(std::min<std::size_t>
_storedPosition = _socketBuffer.getPosition();
_storedLimit = _socketBuffer.getLimit();
_socketBuffer.setLimit(std::min<std::size_t>
(_storedPosition + _storedPayloadSize, _storedLimit));
bool postProcess = true;
try
@@ -283,7 +284,7 @@ void AbstractCodec::postProcessApplicationMessage()
// we only handle unused alignment bytes
int bytesNotRead =
newPosition - _socketBuffer->getPosition();
newPosition - _socketBuffer.getPosition();
if (bytesNotRead < PVA_ALIGNMENT)
{
@@ -292,7 +293,7 @@ void AbstractCodec::postProcessApplicationMessage()
// due to aligned buffer size
_storedPayloadSize += bytesNotRead;
// reveal currently existing padding
_socketBuffer->setLimit(_storedLimit);
_socketBuffer.setLimit(_storedLimit);
ensureData(bytesNotRead);
_storedPayloadSize -= bytesNotRead;
continue;
@@ -307,8 +308,8 @@ void AbstractCodec::postProcessApplicationMessage()
throw invalid_data_stream_exception(
"unprocessed read buffer");
}
_socketBuffer->setLimit(_storedLimit);
_socketBuffer->setPosition(newPosition);
_socketBuffer.setLimit(_storedLimit);
_socketBuffer.setPosition(newPosition);
break;
}
}
@@ -361,7 +362,7 @@ bool AbstractCodec::readToBuffer(
bool persistent) {
// do we already have requiredBytes available?
std::size_t remainingBytes = _socketBuffer->getRemaining();
std::size_t remainingBytes = _socketBuffer.getRemaining();
if (remainingBytes >= requiredBytes) {
return true;
}
@@ -377,22 +378,22 @@ bool AbstractCodec::readToBuffer(
// a new start position, we are careful to preserve alignment
_startPosition =
MAX_ENSURE_SIZE + _socketBuffer->getPosition() % PVA_ALIGNMENT;
MAX_ENSURE_SIZE + _socketBuffer.getPosition() % PVA_ALIGNMENT;
std::size_t endPosition = _startPosition + remainingBytes;
for (std::size_t i = _startPosition; i < endPosition; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
_socketBuffer.putByte(i, _socketBuffer.getByte());
// update buffer to the new position
_socketBuffer->setLimit(_socketBuffer->getSize());
_socketBuffer->setPosition(endPosition);
_socketBuffer.setLimit(_socketBuffer.getSize());
_socketBuffer.setPosition(endPosition);
// read at least requiredBytes bytes
std::size_t requiredPosition = _startPosition + requiredBytes;
while (_socketBuffer->getPosition() < requiredPosition)
while (_socketBuffer.getPosition() < requiredPosition)
{
int bytesRead = read(_socketBuffer.get());
int bytesRead = read(&_socketBuffer);
if (bytesRead < 0)
{
@@ -407,8 +408,8 @@ bool AbstractCodec::readToBuffer(
else
{
// set pointers (aka flip)
_socketBuffer->setLimit(_socketBuffer->getPosition());
_socketBuffer->setPosition(_startPosition);
_socketBuffer.setLimit(_socketBuffer.getPosition());
_socketBuffer.setPosition(_startPosition);
return false;
}
@@ -416,8 +417,8 @@ bool AbstractCodec::readToBuffer(
}
// set pointers (aka flip)
_socketBuffer->setLimit(_socketBuffer->getPosition());
_socketBuffer->setPosition(_startPosition);
_socketBuffer.setLimit(_socketBuffer.getPosition());
_socketBuffer.setPosition(_startPosition);
return true;
}
@@ -426,7 +427,7 @@ bool AbstractCodec::readToBuffer(
void AbstractCodec::ensureData(std::size_t size) {
// enough of data?
if (_socketBuffer->getRemaining() >= size)
if (_socketBuffer.getRemaining() >= size)
return;
// to large for buffer...
@@ -436,15 +437,14 @@ void AbstractCodec::ensureData(std::size_t size) {
<< ", but maximum " << MAX_ENSURE_DATA_SIZE << " is allowed.";
LOG(logLevelWarn,
"%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__);
std::string s = msg.str();
throw std::invalid_argument(s);
throw std::invalid_argument(msg.str());
}
try
{
// subtract what was already processed
std::size_t pos = _socketBuffer->getPosition();
std::size_t pos = _socketBuffer.getPosition();
_storedPayloadSize -= pos - _storedPosition;
// SPLIT message case
@@ -460,9 +460,9 @@ void AbstractCodec::ensureData(std::size_t size) {
_readMode = SPLIT;
readToBuffer(size, true);
_readMode = storedMode;
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(
_storedPosition = _socketBuffer.getPosition();
_storedLimit = _socketBuffer.getLimit();
_socketBuffer.setLimit(
std::min<std::size_t>(
_storedPosition + _storedPayloadSize, _storedLimit));
@@ -483,25 +483,25 @@ void AbstractCodec::ensureData(std::size_t size) {
//[0 to MAX_ENSURE_DATA_BUFFER_SIZE/2), if any
// remaining is relative to payload since buffer is
//bounded from outside
std::size_t remainingBytes = _socketBuffer->getRemaining();
std::size_t remainingBytes = _socketBuffer.getRemaining();
for (std::size_t i = 0; i < remainingBytes; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
_socketBuffer.putByte(i, _socketBuffer.getByte());
// restore limit (there might be some data already present
//and readToBuffer needs to know real limit)
_socketBuffer->setLimit(_storedLimit);
_socketBuffer.setLimit(_storedLimit);
// remember alignment offset of end of the message (to be restored)
std::size_t storedAlignmentOffset =
_socketBuffer->getPosition() % PVA_ALIGNMENT;
_socketBuffer.getPosition() % PVA_ALIGNMENT;
// skip post-message alignment bytes
if (storedAlignmentOffset > 0)
{
std::size_t toSkip = PVA_ALIGNMENT - storedAlignmentOffset;
readToBuffer(toSkip, true);
std::size_t currentPos = _socketBuffer->getPosition();
_socketBuffer->setPosition(currentPos + toSkip);
std::size_t currentPos = _socketBuffer.getPosition();
_socketBuffer.setPosition(currentPos + toSkip);
}
// we expect segmented message, we expect header
@@ -519,21 +519,21 @@ void AbstractCodec::ensureData(std::size_t size) {
//segmented message)
// SPLIT cannot mess with this, since start of the message,
//i.e. current position, is always aligned
_socketBuffer->setPosition(
_socketBuffer->getPosition() + storedAlignmentOffset);
_socketBuffer.setPosition(
_socketBuffer.getPosition() + storedAlignmentOffset);
// copy before position (i.e. start of the payload)
for (int32_t i = remainingBytes - 1,
j = _socketBuffer->getPosition() - 1; i >= 0; i--, j--)
_socketBuffer->putByte(j, _socketBuffer->getByte(i));
j = _socketBuffer.getPosition() - 1; i >= 0; i--, j--)
_socketBuffer.putByte(j, _socketBuffer.getByte(i));
_startPosition = _socketBuffer->getPosition() - remainingBytes;
_socketBuffer->setPosition(_startPosition);
_startPosition = _socketBuffer.getPosition() - remainingBytes;
_socketBuffer.setPosition(_startPosition);
_storedPayloadSize += remainingBytes - storedAlignmentOffset;
_storedPosition = _startPosition;
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(
_storedLimit = _socketBuffer.getLimit();
_socketBuffer.setLimit(
std::min<std::size_t>(
_storedPosition + _storedPayloadSize, _storedLimit));
@@ -565,23 +565,23 @@ std::size_t AbstractCodec::alignedValue(
void AbstractCodec::alignData(std::size_t alignment) {
std::size_t k = (alignment - 1);
std::size_t pos = _socketBuffer->getPosition();
std::size_t pos = _socketBuffer.getPosition();
std::size_t newpos = (pos + k) & (~k);
if (pos == newpos)
return;
std::size_t diff = _socketBuffer->getLimit() - newpos;
std::size_t diff = _socketBuffer.getLimit() - newpos;
if (diff > 0)
{
_socketBuffer->setPosition(newpos);
_socketBuffer.setPosition(newpos);
return;
}
ensureData(diff);
// position has changed, recalculate
newpos = (_socketBuffer->getPosition() + k) & (~k);
_socketBuffer->setPosition(newpos);
newpos = (_socketBuffer.getPosition() + k) & (~k);
_socketBuffer.setPosition(newpos);
}
static const char PADDING_BYTES[] =
@@ -599,7 +599,7 @@ static const char PADDING_BYTES[] =
void AbstractCodec::alignBuffer(std::size_t alignment) {
std::size_t k = (alignment - 1);
std::size_t pos = _sendBuffer->getPosition();
std::size_t pos = _sendBuffer.getPosition();
std::size_t newpos = (pos + k) & (~k);
if (pos == newpos)
return;
@@ -607,12 +607,12 @@ void AbstractCodec::alignBuffer(std::size_t alignment) {
/*
// there is always enough of space
// since sendBuffer capacity % PVA_ALIGNMENT == 0
_sendBuffer->setPosition(newpos);
_sendBuffer.setPosition(newpos);
*/
// for safety reasons we really pad (override previous message data)
std::size_t padCount = newpos - pos;
_sendBuffer->put(PADDING_BYTES, 0, padCount);
_sendBuffer.put(PADDING_BYTES, 0, padCount);
}
@@ -624,18 +624,18 @@ void AbstractCodec::startMessage(
std::numeric_limits<size_t>::max(); // TODO revise this
ensureBuffer(
PVA_MESSAGE_HEADER_SIZE + ensureCapacity + _nextMessagePayloadOffset);
_lastMessageStartPosition = _sendBuffer->getPosition();
_sendBuffer->putByte(PVA_MAGIC);
_sendBuffer->putByte(PVA_VERSION);
_sendBuffer->putByte(
_lastMessageStartPosition = _sendBuffer.getPosition();
_sendBuffer.putByte(PVA_MAGIC);
_sendBuffer.putByte(PVA_VERSION);
_sendBuffer.putByte(
(_lastSegmentedMessageType | _byteOrderFlag | _clientServerFlag)); // data message
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(payloadSize);
_sendBuffer.putByte(command); // command
_sendBuffer.putInt(payloadSize);
// apply offset
if (_nextMessagePayloadOffset > 0)
_sendBuffer->setPosition(
_sendBuffer->getPosition() + _nextMessagePayloadOffset);
_sendBuffer.setPosition(
_sendBuffer.getPosition() + _nextMessagePayloadOffset);
}
@@ -646,11 +646,11 @@ void AbstractCodec::putControlMessage(
_lastMessageStartPosition =
std::numeric_limits<size_t>::max(); // TODO revise this
ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
_sendBuffer->putByte(PVA_MAGIC);
_sendBuffer->putByte(PVA_VERSION);
_sendBuffer->putByte((0x01 | _byteOrderFlag | _clientServerFlag)); // control message
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(data); // data
_sendBuffer.putByte(PVA_MAGIC);
_sendBuffer.putByte(PVA_VERSION);
_sendBuffer.putByte((0x01 | _byteOrderFlag | _clientServerFlag)); // control message
_sendBuffer.putByte(command); // command
_sendBuffer.putInt(data); // data
}
@@ -663,7 +663,7 @@ void AbstractCodec::endMessage(bool hasMoreSegments) {
if (_lastMessageStartPosition != std::numeric_limits<size_t>::max())
{
std::size_t lastPayloadBytePosition = _sendBuffer->getPosition();
std::size_t lastPayloadBytePosition = _sendBuffer.getPosition();
// align
alignBuffer(PVA_ALIGNMENT);
@@ -673,7 +673,7 @@ void AbstractCodec::endMessage(bool hasMoreSegments) {
lastPayloadBytePosition -
_lastMessageStartPosition - PVA_MESSAGE_HEADER_SIZE;
_sendBuffer->putInt(_lastMessageStartPosition + 4, payloadSize);
_sendBuffer.putInt(_lastMessageStartPosition + 4, payloadSize);
// set segmented bit
if (hasMoreSegments) {
@@ -681,13 +681,13 @@ void AbstractCodec::endMessage(bool hasMoreSegments) {
if (_lastSegmentedMessageType == 0)
{
std::size_t flagsPosition = _lastMessageStartPosition + 2;
epics::pvData::int8 type = _sendBuffer->getByte(flagsPosition);
epics::pvData::int8 type = _sendBuffer.getByte(flagsPosition);
// set first segment bit
_sendBuffer->putByte(flagsPosition, (type | 0x10));
_sendBuffer.putByte(flagsPosition, (type | 0x10));
// first + last segment bit == in-between segment
_lastSegmentedMessageType = type | 0x30;
_lastSegmentedMessageCommand =
_sendBuffer->getByte(flagsPosition + 1);
_sendBuffer.getByte(flagsPosition + 1);
}
_nextMessagePayloadOffset = lastPayloadBytePosition % PVA_ALIGNMENT;
}
@@ -698,7 +698,7 @@ void AbstractCodec::endMessage(bool hasMoreSegments) {
{
std::size_t flagsPosition = _lastMessageStartPosition + 2;
// set last segment bit (by clearing first segment bit)
_sendBuffer->putByte(flagsPosition,
_sendBuffer.putByte(flagsPosition,
(_lastSegmentedMessageType & 0xEF));
_lastSegmentedMessageType = 0;
}
@@ -728,7 +728,7 @@ void AbstractCodec::endMessage(bool hasMoreSegments) {
void AbstractCodec::ensureBuffer(std::size_t size) {
if (_sendBuffer->getRemaining() >= size)
if (_sendBuffer.getRemaining() >= size)
return;
// too large for buffer...
@@ -742,7 +742,7 @@ void AbstractCodec::ensureBuffer(std::size_t size) {
throw std::invalid_argument(s);
}
while (_sendBuffer->getRemaining() < size)
while (_sendBuffer.getRemaining() < size)
flush(false);
}
@@ -753,10 +753,10 @@ void AbstractCodec::flushSerializeBuffer() {
void AbstractCodec::flushSendBuffer() {
_sendBuffer->flip();
_sendBuffer.flip();
try {
send(_sendBuffer.get());
send(&_sendBuffer);
} catch (io_exception &) {
try {
if (isOpen())
@@ -767,7 +767,7 @@ void AbstractCodec::flushSendBuffer() {
throw connection_closed_exception("Failed to send buffer.");
}
_sendBuffer->clear();
_sendBuffer.clear();
_lastMessageStartPosition = std::numeric_limits<size_t>::max();
}
@@ -878,7 +878,7 @@ void AbstractCodec::processSendQueue()
if (sender.get() == 0)
{
// flush
if (_sendBuffer->getPosition() > 0)
if (_sendBuffer.getPosition() > 0)
flush(true);
sendCompleted(); // do not schedule sending
@@ -892,7 +892,7 @@ void AbstractCodec::processSendQueue()
try {
processSender(sender);
} catch(...) {
if (_sendBuffer->getPosition() > 0)
if (_sendBuffer.getPosition() > 0)
flush(true);
sendCompleted();
throw;
@@ -901,7 +901,7 @@ void AbstractCodec::processSendQueue()
}
// flush
if (_sendBuffer->getPosition() > 0)
if (_sendBuffer.getPosition() > 0)
flush(true);
}
@@ -926,9 +926,9 @@ void AbstractCodec::processSender(
ScopedLock lock(sender);
try {
_lastMessageStartPosition = _sendBuffer->getPosition();
_lastMessageStartPosition = _sendBuffer.getPosition();
sender->send(_sendBuffer.get(), this);
sender->send(&_sendBuffer, this);
// automatic end (to set payload size)
endMessage(false);
@@ -961,10 +961,10 @@ void AbstractCodec::enqueueSendRequest(
if (_senderThread == epicsThreadGetIdSelf() &&
_sendQueue.empty() &&
_sendBuffer->getRemaining() >= requiredBufferSize)
_sendBuffer.getRemaining() >= requiredBufferSize)
{
processSender(sender);
if (_sendBuffer->getPosition() > 0)
if (_sendBuffer.getPosition() > 0)
{
if (_lowLatency)
flush(true);
@@ -984,9 +984,9 @@ void AbstractCodec::setRecipient(osiSockAddr const & sendTo) {
void AbstractCodec::setByteOrder(int byteOrder)
{
_socketBuffer->setEndianess(byteOrder);
_socketBuffer.setEndianess(byteOrder);
// TODO sync
_sendBuffer->setEndianess(byteOrder);
_sendBuffer.setEndianess(byteOrder);
_byteOrderFlag = EPICS_ENDIAN_BIG == byteOrder ? 0x80 : 0x00;
}
@@ -1182,16 +1182,13 @@ void BlockingTCPTransportCodec::sendBufferFull(int tries) {
BlockingTCPTransportCodec::BlockingTCPTransportCodec(bool serverFlag, const Context::shared_pointer &context,
SOCKET channel, const ResponseHandler::shared_pointer &responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize, int16 priority)
size_t sendBufferSize,
size_t receiveBufferSize, int16 priority)
:AbstractCodec(
serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)(
MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) +
(PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))),
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)( MAX_TCP_RECV +
MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1))
& (~(PVA_ALIGNMENT - 1)))), sendBufferSize,
sendBufferSize,
receiveBufferSize,
sendBufferSize,
true)
,_readThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::receiveThread)
.prio(epicsThreadPriorityCAServerLow)