From 87dca19708b8ce3bd4fc5b3bab2288da83901d1d Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 18 May 2017 15:59:01 -0400 Subject: [PATCH] codec: avoid indirection when accessing buffers avoid some indirection to make this code easier to follow. move buffer lower limit to base class. --- src/remote/codec.cpp | 223 +++++++++++++++++------------------ src/remote/pv/codec.h | 20 ++-- testApp/remote/testCodec.cpp | 59 +++------ 3 files changed, 139 insertions(+), 163 deletions(-) diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 5d69b56..b3cc0c4 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -65,10 +65,16 @@ const std::size_t AbstractCodec::MAX_ENSURE_DATA_SIZE = MAX_ENSURE_SIZE/2; const std::size_t AbstractCodec::MAX_ENSURE_BUFFER_SIZE = MAX_ENSURE_SIZE; const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024; +static +size_t bufSizeSelect(size_t request) +{ + return std::max(request, MAX_TCP_RECV + AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE); +} + AbstractCodec::AbstractCodec( bool serverFlag, - std::tr1::shared_ptr const & receiveBuffer, - std::tr1::shared_ptr const & sendBuffer, + size_t sendBufferSize, + size_t receiveBufferSize, int32_t socketSendBufferSize, bool blockingProcessQueue): //PROTECTED @@ -77,48 +83,43 @@ AbstractCodec::AbstractCodec( _senderThread(0), _writeMode(PROCESS_SEND_QUEUE), _writeOpReady(false),_lowLatency(false), - _socketBuffer(receiveBuffer), - _sendBuffer(sendBuffer), + _socketBuffer(bufSizeSelect(receiveBufferSize)), + _sendBuffer(bufSizeSelect(sendBufferSize)), //PRIVATE _storedPayloadSize(0), _storedPosition(0), _startPosition(0), - _maxSendPayloadSize(0), + _maxSendPayloadSize(_sendBuffer.getSize() - 2*PVA_MESSAGE_HEADER_SIZE), // start msg + control _lastMessageStartPosition(std::numeric_limits::max()),_lastSegmentedMessageType(0), _lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0), _byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00), _clientServerFlag(serverFlag ? 0x40 : 0x00), - _socketSendBufferSize(0) + _socketSendBufferSize(socketSendBufferSize) { - if (receiveBuffer->getSize() < 2*MAX_ENSURE_SIZE) + if (_socketBuffer.getSize() < 2*MAX_ENSURE_SIZE) throw std::invalid_argument( "receiveBuffer.capacity() < 2*MAX_ENSURE_SIZE"); // require aligned buffer size //(not condition, but simplifies alignment code) - if (receiveBuffer->getSize() % PVA_ALIGNMENT != 0) + if (_socketBuffer.getSize() % PVA_ALIGNMENT != 0) throw std::invalid_argument( "receiveBuffer.capacity() % PVAConstants.PVA_ALIGNMENT != 0"); - if (sendBuffer->getSize() < 2*MAX_ENSURE_SIZE) + if (_sendBuffer.getSize() < 2*MAX_ENSURE_SIZE) throw std::invalid_argument("sendBuffer() < 2*MAX_ENSURE_SIZE"); // require aligned buffer size //(not condition, but simplifies alignment code) - if (sendBuffer->getSize() % PVA_ALIGNMENT != 0) + if (_sendBuffer.getSize() % PVA_ALIGNMENT != 0) throw std::invalid_argument( "sendBuffer() % PVAConstants.PVA_ALIGNMENT != 0"); // initialize to be empty - _socketBuffer->setPosition(_socketBuffer->getLimit()); - _startPosition = _socketBuffer->getPosition(); + _socketBuffer.setPosition(_socketBuffer.getLimit()); + _startPosition = _socketBuffer.getPosition(); // clear send - _sendBuffer->clear(); - - // start msg + control - _maxSendPayloadSize = - _sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE; - _socketSendBufferSize = socketSendBufferSize; + _sendBuffer.clear(); } @@ -142,19 +143,19 @@ void AbstractCodec::processRead() { void AbstractCodec::processHeader() { // magic code - int8_t magicCode = _socketBuffer->getByte(); + int8_t magicCode = _socketBuffer.getByte(); // version - _version = _socketBuffer->getByte(); + _version = _socketBuffer.getByte(); // flags - _flags = _socketBuffer->getByte(); + _flags = _socketBuffer.getByte(); // command - _command = _socketBuffer->getByte(); + _command = _socketBuffer.getByte(); // read payload size - _payloadSize = _socketBuffer->getInt(); + _payloadSize = _socketBuffer.getInt(); // check magic code if (magicCode != PVA_MAGIC) @@ -184,8 +185,8 @@ void AbstractCodec::processReadNormal() { } /* - hexDump("Header", (const int8*)_socketBuffer->getArray(), - _socketBuffer->getPosition(), PVA_MESSAGE_HEADER_SIZE); + hexDump("Header", (const int8*)_socketBuffer.getArray(), + _socketBuffer.getPosition(), PVA_MESSAGE_HEADER_SIZE); */ @@ -216,9 +217,9 @@ void AbstractCodec::processReadNormal() { } _storedPayloadSize = _payloadSize; - _storedPosition = _socketBuffer->getPosition(); - _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit(std::min + _storedPosition = _socketBuffer.getPosition(); + _storedLimit = _socketBuffer.getLimit(); + _socketBuffer.setLimit(std::min (_storedPosition + _storedPayloadSize, _storedLimit)); bool postProcess = true; try @@ -283,7 +284,7 @@ void AbstractCodec::postProcessApplicationMessage() // we only handle unused alignment bytes int bytesNotRead = - newPosition - _socketBuffer->getPosition(); + newPosition - _socketBuffer.getPosition(); if (bytesNotRead < PVA_ALIGNMENT) { @@ -292,7 +293,7 @@ void AbstractCodec::postProcessApplicationMessage() // due to aligned buffer size _storedPayloadSize += bytesNotRead; // reveal currently existing padding - _socketBuffer->setLimit(_storedLimit); + _socketBuffer.setLimit(_storedLimit); ensureData(bytesNotRead); _storedPayloadSize -= bytesNotRead; continue; @@ -307,8 +308,8 @@ void AbstractCodec::postProcessApplicationMessage() throw invalid_data_stream_exception( "unprocessed read buffer"); } - _socketBuffer->setLimit(_storedLimit); - _socketBuffer->setPosition(newPosition); + _socketBuffer.setLimit(_storedLimit); + _socketBuffer.setPosition(newPosition); break; } } @@ -361,7 +362,7 @@ bool AbstractCodec::readToBuffer( bool persistent) { // do we already have requiredBytes available? - std::size_t remainingBytes = _socketBuffer->getRemaining(); + std::size_t remainingBytes = _socketBuffer.getRemaining(); if (remainingBytes >= requiredBytes) { return true; } @@ -377,22 +378,22 @@ bool AbstractCodec::readToBuffer( // a new start position, we are careful to preserve alignment _startPosition = - MAX_ENSURE_SIZE + _socketBuffer->getPosition() % PVA_ALIGNMENT; + MAX_ENSURE_SIZE + _socketBuffer.getPosition() % PVA_ALIGNMENT; std::size_t endPosition = _startPosition + remainingBytes; for (std::size_t i = _startPosition; i < endPosition; i++) - _socketBuffer->putByte(i, _socketBuffer->getByte()); + _socketBuffer.putByte(i, _socketBuffer.getByte()); // update buffer to the new position - _socketBuffer->setLimit(_socketBuffer->getSize()); - _socketBuffer->setPosition(endPosition); + _socketBuffer.setLimit(_socketBuffer.getSize()); + _socketBuffer.setPosition(endPosition); // read at least requiredBytes bytes std::size_t requiredPosition = _startPosition + requiredBytes; - while (_socketBuffer->getPosition() < requiredPosition) + while (_socketBuffer.getPosition() < requiredPosition) { - int bytesRead = read(_socketBuffer.get()); + int bytesRead = read(&_socketBuffer); if (bytesRead < 0) { @@ -407,8 +408,8 @@ bool AbstractCodec::readToBuffer( else { // set pointers (aka flip) - _socketBuffer->setLimit(_socketBuffer->getPosition()); - _socketBuffer->setPosition(_startPosition); + _socketBuffer.setLimit(_socketBuffer.getPosition()); + _socketBuffer.setPosition(_startPosition); return false; } @@ -416,8 +417,8 @@ bool AbstractCodec::readToBuffer( } // set pointers (aka flip) - _socketBuffer->setLimit(_socketBuffer->getPosition()); - _socketBuffer->setPosition(_startPosition); + _socketBuffer.setLimit(_socketBuffer.getPosition()); + _socketBuffer.setPosition(_startPosition); return true; } @@ -426,7 +427,7 @@ bool AbstractCodec::readToBuffer( void AbstractCodec::ensureData(std::size_t size) { // enough of data? - if (_socketBuffer->getRemaining() >= size) + if (_socketBuffer.getRemaining() >= size) return; // to large for buffer... @@ -436,15 +437,14 @@ void AbstractCodec::ensureData(std::size_t size) { << ", but maximum " << MAX_ENSURE_DATA_SIZE << " is allowed."; LOG(logLevelWarn, "%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__); - std::string s = msg.str(); - throw std::invalid_argument(s); + throw std::invalid_argument(msg.str()); } try { // subtract what was already processed - std::size_t pos = _socketBuffer->getPosition(); + std::size_t pos = _socketBuffer.getPosition(); _storedPayloadSize -= pos - _storedPosition; // SPLIT message case @@ -460,9 +460,9 @@ void AbstractCodec::ensureData(std::size_t size) { _readMode = SPLIT; readToBuffer(size, true); _readMode = storedMode; - _storedPosition = _socketBuffer->getPosition(); - _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit( + _storedPosition = _socketBuffer.getPosition(); + _storedLimit = _socketBuffer.getLimit(); + _socketBuffer.setLimit( std::min( _storedPosition + _storedPayloadSize, _storedLimit)); @@ -483,25 +483,25 @@ void AbstractCodec::ensureData(std::size_t size) { //[0 to MAX_ENSURE_DATA_BUFFER_SIZE/2), if any // remaining is relative to payload since buffer is //bounded from outside - std::size_t remainingBytes = _socketBuffer->getRemaining(); + std::size_t remainingBytes = _socketBuffer.getRemaining(); for (std::size_t i = 0; i < remainingBytes; i++) - _socketBuffer->putByte(i, _socketBuffer->getByte()); + _socketBuffer.putByte(i, _socketBuffer.getByte()); // restore limit (there might be some data already present //and readToBuffer needs to know real limit) - _socketBuffer->setLimit(_storedLimit); + _socketBuffer.setLimit(_storedLimit); // remember alignment offset of end of the message (to be restored) std::size_t storedAlignmentOffset = - _socketBuffer->getPosition() % PVA_ALIGNMENT; + _socketBuffer.getPosition() % PVA_ALIGNMENT; // skip post-message alignment bytes if (storedAlignmentOffset > 0) { std::size_t toSkip = PVA_ALIGNMENT - storedAlignmentOffset; readToBuffer(toSkip, true); - std::size_t currentPos = _socketBuffer->getPosition(); - _socketBuffer->setPosition(currentPos + toSkip); + std::size_t currentPos = _socketBuffer.getPosition(); + _socketBuffer.setPosition(currentPos + toSkip); } // we expect segmented message, we expect header @@ -519,21 +519,21 @@ void AbstractCodec::ensureData(std::size_t size) { //segmented message) // SPLIT cannot mess with this, since start of the message, //i.e. current position, is always aligned - _socketBuffer->setPosition( - _socketBuffer->getPosition() + storedAlignmentOffset); + _socketBuffer.setPosition( + _socketBuffer.getPosition() + storedAlignmentOffset); // copy before position (i.e. start of the payload) for (int32_t i = remainingBytes - 1, - j = _socketBuffer->getPosition() - 1; i >= 0; i--, j--) - _socketBuffer->putByte(j, _socketBuffer->getByte(i)); + j = _socketBuffer.getPosition() - 1; i >= 0; i--, j--) + _socketBuffer.putByte(j, _socketBuffer.getByte(i)); - _startPosition = _socketBuffer->getPosition() - remainingBytes; - _socketBuffer->setPosition(_startPosition); + _startPosition = _socketBuffer.getPosition() - remainingBytes; + _socketBuffer.setPosition(_startPosition); _storedPayloadSize += remainingBytes - storedAlignmentOffset; _storedPosition = _startPosition; - _storedLimit = _socketBuffer->getLimit(); - _socketBuffer->setLimit( + _storedLimit = _socketBuffer.getLimit(); + _socketBuffer.setLimit( std::min( _storedPosition + _storedPayloadSize, _storedLimit)); @@ -565,23 +565,23 @@ std::size_t AbstractCodec::alignedValue( void AbstractCodec::alignData(std::size_t alignment) { std::size_t k = (alignment - 1); - std::size_t pos = _socketBuffer->getPosition(); + std::size_t pos = _socketBuffer.getPosition(); std::size_t newpos = (pos + k) & (~k); if (pos == newpos) return; - std::size_t diff = _socketBuffer->getLimit() - newpos; + std::size_t diff = _socketBuffer.getLimit() - newpos; if (diff > 0) { - _socketBuffer->setPosition(newpos); + _socketBuffer.setPosition(newpos); return; } ensureData(diff); // position has changed, recalculate - newpos = (_socketBuffer->getPosition() + k) & (~k); - _socketBuffer->setPosition(newpos); + newpos = (_socketBuffer.getPosition() + k) & (~k); + _socketBuffer.setPosition(newpos); } static const char PADDING_BYTES[] = @@ -599,7 +599,7 @@ static const char PADDING_BYTES[] = void AbstractCodec::alignBuffer(std::size_t alignment) { std::size_t k = (alignment - 1); - std::size_t pos = _sendBuffer->getPosition(); + std::size_t pos = _sendBuffer.getPosition(); std::size_t newpos = (pos + k) & (~k); if (pos == newpos) return; @@ -607,12 +607,12 @@ void AbstractCodec::alignBuffer(std::size_t alignment) { /* // there is always enough of space // since sendBuffer capacity % PVA_ALIGNMENT == 0 - _sendBuffer->setPosition(newpos); + _sendBuffer.setPosition(newpos); */ // for safety reasons we really pad (override previous message data) std::size_t padCount = newpos - pos; - _sendBuffer->put(PADDING_BYTES, 0, padCount); + _sendBuffer.put(PADDING_BYTES, 0, padCount); } @@ -624,18 +624,18 @@ void AbstractCodec::startMessage( std::numeric_limits::max(); // TODO revise this ensureBuffer( PVA_MESSAGE_HEADER_SIZE + ensureCapacity + _nextMessagePayloadOffset); - _lastMessageStartPosition = _sendBuffer->getPosition(); - _sendBuffer->putByte(PVA_MAGIC); - _sendBuffer->putByte(PVA_VERSION); - _sendBuffer->putByte( + _lastMessageStartPosition = _sendBuffer.getPosition(); + _sendBuffer.putByte(PVA_MAGIC); + _sendBuffer.putByte(PVA_VERSION); + _sendBuffer.putByte( (_lastSegmentedMessageType | _byteOrderFlag | _clientServerFlag)); // data message - _sendBuffer->putByte(command); // command - _sendBuffer->putInt(payloadSize); + _sendBuffer.putByte(command); // command + _sendBuffer.putInt(payloadSize); // apply offset if (_nextMessagePayloadOffset > 0) - _sendBuffer->setPosition( - _sendBuffer->getPosition() + _nextMessagePayloadOffset); + _sendBuffer.setPosition( + _sendBuffer.getPosition() + _nextMessagePayloadOffset); } @@ -646,11 +646,11 @@ void AbstractCodec::putControlMessage( _lastMessageStartPosition = std::numeric_limits::max(); // TODO revise this ensureBuffer(PVA_MESSAGE_HEADER_SIZE); - _sendBuffer->putByte(PVA_MAGIC); - _sendBuffer->putByte(PVA_VERSION); - _sendBuffer->putByte((0x01 | _byteOrderFlag | _clientServerFlag)); // control message - _sendBuffer->putByte(command); // command - _sendBuffer->putInt(data); // data + _sendBuffer.putByte(PVA_MAGIC); + _sendBuffer.putByte(PVA_VERSION); + _sendBuffer.putByte((0x01 | _byteOrderFlag | _clientServerFlag)); // control message + _sendBuffer.putByte(command); // command + _sendBuffer.putInt(data); // data } @@ -663,7 +663,7 @@ void AbstractCodec::endMessage(bool hasMoreSegments) { if (_lastMessageStartPosition != std::numeric_limits::max()) { - std::size_t lastPayloadBytePosition = _sendBuffer->getPosition(); + std::size_t lastPayloadBytePosition = _sendBuffer.getPosition(); // align alignBuffer(PVA_ALIGNMENT); @@ -673,7 +673,7 @@ void AbstractCodec::endMessage(bool hasMoreSegments) { lastPayloadBytePosition - _lastMessageStartPosition - PVA_MESSAGE_HEADER_SIZE; - _sendBuffer->putInt(_lastMessageStartPosition + 4, payloadSize); + _sendBuffer.putInt(_lastMessageStartPosition + 4, payloadSize); // set segmented bit if (hasMoreSegments) { @@ -681,13 +681,13 @@ void AbstractCodec::endMessage(bool hasMoreSegments) { if (_lastSegmentedMessageType == 0) { std::size_t flagsPosition = _lastMessageStartPosition + 2; - epics::pvData::int8 type = _sendBuffer->getByte(flagsPosition); + epics::pvData::int8 type = _sendBuffer.getByte(flagsPosition); // set first segment bit - _sendBuffer->putByte(flagsPosition, (type | 0x10)); + _sendBuffer.putByte(flagsPosition, (type | 0x10)); // first + last segment bit == in-between segment _lastSegmentedMessageType = type | 0x30; _lastSegmentedMessageCommand = - _sendBuffer->getByte(flagsPosition + 1); + _sendBuffer.getByte(flagsPosition + 1); } _nextMessagePayloadOffset = lastPayloadBytePosition % PVA_ALIGNMENT; } @@ -698,7 +698,7 @@ void AbstractCodec::endMessage(bool hasMoreSegments) { { std::size_t flagsPosition = _lastMessageStartPosition + 2; // set last segment bit (by clearing first segment bit) - _sendBuffer->putByte(flagsPosition, + _sendBuffer.putByte(flagsPosition, (_lastSegmentedMessageType & 0xEF)); _lastSegmentedMessageType = 0; } @@ -728,7 +728,7 @@ void AbstractCodec::endMessage(bool hasMoreSegments) { void AbstractCodec::ensureBuffer(std::size_t size) { - if (_sendBuffer->getRemaining() >= size) + if (_sendBuffer.getRemaining() >= size) return; // too large for buffer... @@ -742,7 +742,7 @@ void AbstractCodec::ensureBuffer(std::size_t size) { throw std::invalid_argument(s); } - while (_sendBuffer->getRemaining() < size) + while (_sendBuffer.getRemaining() < size) flush(false); } @@ -753,10 +753,10 @@ void AbstractCodec::flushSerializeBuffer() { void AbstractCodec::flushSendBuffer() { - _sendBuffer->flip(); + _sendBuffer.flip(); try { - send(_sendBuffer.get()); + send(&_sendBuffer); } catch (io_exception &) { try { if (isOpen()) @@ -767,7 +767,7 @@ void AbstractCodec::flushSendBuffer() { throw connection_closed_exception("Failed to send buffer."); } - _sendBuffer->clear(); + _sendBuffer.clear(); _lastMessageStartPosition = std::numeric_limits::max(); } @@ -878,7 +878,7 @@ void AbstractCodec::processSendQueue() if (sender.get() == 0) { // flush - if (_sendBuffer->getPosition() > 0) + if (_sendBuffer.getPosition() > 0) flush(true); sendCompleted(); // do not schedule sending @@ -892,7 +892,7 @@ void AbstractCodec::processSendQueue() try { processSender(sender); } catch(...) { - if (_sendBuffer->getPosition() > 0) + if (_sendBuffer.getPosition() > 0) flush(true); sendCompleted(); throw; @@ -901,7 +901,7 @@ void AbstractCodec::processSendQueue() } // flush - if (_sendBuffer->getPosition() > 0) + if (_sendBuffer.getPosition() > 0) flush(true); } @@ -926,9 +926,9 @@ void AbstractCodec::processSender( ScopedLock lock(sender); try { - _lastMessageStartPosition = _sendBuffer->getPosition(); + _lastMessageStartPosition = _sendBuffer.getPosition(); - sender->send(_sendBuffer.get(), this); + sender->send(&_sendBuffer, this); // automatic end (to set payload size) endMessage(false); @@ -961,10 +961,10 @@ void AbstractCodec::enqueueSendRequest( if (_senderThread == epicsThreadGetIdSelf() && _sendQueue.empty() && - _sendBuffer->getRemaining() >= requiredBufferSize) + _sendBuffer.getRemaining() >= requiredBufferSize) { processSender(sender); - if (_sendBuffer->getPosition() > 0) + if (_sendBuffer.getPosition() > 0) { if (_lowLatency) flush(true); @@ -984,9 +984,9 @@ void AbstractCodec::setRecipient(osiSockAddr const & sendTo) { void AbstractCodec::setByteOrder(int byteOrder) { - _socketBuffer->setEndianess(byteOrder); + _socketBuffer.setEndianess(byteOrder); // TODO sync - _sendBuffer->setEndianess(byteOrder); + _sendBuffer.setEndianess(byteOrder); _byteOrderFlag = EPICS_ENDIAN_BIG == byteOrder ? 0x80 : 0x00; } @@ -1182,16 +1182,13 @@ void BlockingTCPTransportCodec::sendBufferFull(int tries) { BlockingTCPTransportCodec::BlockingTCPTransportCodec(bool serverFlag, const Context::shared_pointer &context, SOCKET channel, const ResponseHandler::shared_pointer &responseHandler, - int32_t sendBufferSize, - int32_t receiveBufferSize, int16 priority) + size_t sendBufferSize, + size_t receiveBufferSize, int16 priority) :AbstractCodec( serverFlag, - std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( - MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + - (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))), - std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + - MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) - & (~(PVA_ALIGNMENT - 1)))), sendBufferSize, + sendBufferSize, + receiveBufferSize, + sendBufferSize, true) ,_readThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::receiveThread) .prio(epicsThreadPriorityCAServerLow) diff --git a/src/remote/pv/codec.h b/src/remote/pv/codec.h index bf9fa94..e1727af 100644 --- a/src/remote/pv/codec.h +++ b/src/remote/pv/codec.h @@ -175,8 +175,8 @@ public: AbstractCodec( bool serverFlag, - std::tr1::shared_ptr const & receiveBuffer, - std::tr1::shared_ptr const & sendBuffer, + size_t sendBufferSize, + size_t receiveBufferSize, int32_t socketSendBufferSize, bool blockingProcessQueue); @@ -260,8 +260,8 @@ protected: bool _writeOpReady; bool _lowLatency; - std::tr1::shared_ptr _socketBuffer; - std::tr1::shared_ptr _sendBuffer; + epics::pvData::ByteBuffer _socketBuffer; + epics::pvData::ByteBuffer _sendBuffer; fair_queue _sendQueue; @@ -281,7 +281,7 @@ private: std::size_t _storedLimit; std::size_t _startPosition; - std::size_t _maxSendPayloadSize; + const std::size_t _maxSendPayloadSize; std::size_t _lastMessageStartPosition; std::size_t _lastSegmentedMessageType; int8_t _lastSegmentedMessageCommand; @@ -289,7 +289,7 @@ private: epics::pvData::int8 _byteOrderFlag; epics::pvData::int8 _clientServerFlag; - int32_t _socketSendBufferSize; + const size_t _socketSendBufferSize; }; @@ -308,8 +308,8 @@ public: Context::shared_pointer const & context, SOCKET channel, ResponseHandler::shared_pointer const & responseHandler, - int32_t sendBufferSize, - int32_t receiveBufferSize, + size_t sendBufferSize, + size_t receiveBufferSize, epics::pvData::int16 priority); virtual ~BlockingTCPTransportCodec(); @@ -348,7 +348,7 @@ public: virtual void processApplicationMessage() OVERRIDE FINAL { _responseHandler->handleResponse(&_socketAddress, shared_from_this(), - _version, _command, _payloadSize, _socketBuffer.get()); + _version, _command, _payloadSize, &_socketBuffer); } @@ -366,7 +366,7 @@ public: virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL { - return _socketBuffer->getSize(); + return _socketBuffer.getSize(); } diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index c504741..058494c 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -94,8 +94,8 @@ public: bool blocking = false): AbstractCodec( false, - std::tr1::shared_ptr(new ByteBuffer(receiveBufferSize)), - std::tr1::shared_ptr(new ByteBuffer(sendBufferSize)), + sendBufferSize, + receiveBufferSize, sendBufferSize/10, blocking), _closedCount(0), @@ -171,18 +171,11 @@ public: if (_throwExceptionOnSend) throw io_exception("text IO exception"); - // we could write remaining int8_ts, but for - //test this is enought - if (buffer->getRemaining() > _writeBuffer.getRemaining()) - return 0; + size_t nmove = std::min(buffer->getRemaining(), _writeBuffer.getRemaining()); - std::size_t startPos = buffer->getPosition(); - - while(buffer->getRemaining() > 0) { + for(size_t n=0; ngetByte()); - } - - return buffer->getPosition() - startPos; + return nmove; } @@ -220,7 +213,7 @@ public: void processControlMessage() { // alignment check - if (_socketBuffer->getPosition() % PVA_ALIGNMENT != 0) + if (_socketBuffer.getPosition() % PVA_ALIGNMENT != 0) throw std::logic_error("message not aligned"); _receivedControlMessages.push_back( @@ -230,7 +223,7 @@ public: void processApplicationMessage() { // alignment check - if (_socketBuffer->getPosition() % PVA_ALIGNMENT != 0) + if (_socketBuffer.getPosition() % PVA_ALIGNMENT != 0) throw std::logic_error("message not aligned"); PVAMessage caMessage(_version, _flags, @@ -252,8 +245,8 @@ public: std::size_t pos = caMessage._payload->getPosition(); - while(_socketBuffer->getRemaining() > 0) { - caMessage._payload->putByte(_socketBuffer->getByte()); + while(_socketBuffer.getRemaining() > 0) { + caMessage._payload->putByte(_socketBuffer.getByte()); } std::size_t read = @@ -296,9 +289,9 @@ public: return _writeMode; } - std::tr1::shared_ptr getSendBuffer() + ByteBuffer* getSendBuffer() { - return _sendBuffer; + return &_sendBuffer; } const osiSockAddr* getLastReadBufferSocketAddress() @@ -454,6 +447,9 @@ public: protected: void sendBufferFull(int tries) { + testDiag("sendBufferFull tries=%d", tries); + if(tries>10) // arbitrary limit + testAbort("Stuck"); _sendBufferFullCount++; _writeOpReady = false; _writeMode = WAIT_FOR_READY_SIGNAL; @@ -2770,6 +2766,7 @@ private: TestCodec &codec): _codec(codec) {} void writePollOne() { + testDiag("In %s", CURRENT_FUNCTION); _codec.processWrite(); // this should return immediately // now we fake reading @@ -2819,7 +2816,9 @@ private: codec.breakSender(); try { codec.processSendQueue(); - } catch(sender_break&) {} + } catch(sender_break&) { + testDiag("sender_break"); + } codec.addToReadBuffer(); @@ -2916,26 +2915,6 @@ private: { testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); - try - { - // too small - TestCodec codec(1,DEFAULT_BUFFER_SIZE); - testFail("%s: too small buffer accepted", - CURRENT_FUNCTION); - } catch (std::exception &) { - // OK - } - - try - { - // too small - TestCodec codec(DEFAULT_BUFFER_SIZE,1); - testFail("%s: too small buffer accepted", - CURRENT_FUNCTION); - } catch (std::exception &) { - // OK - } - if (PVA_ALIGNMENT > 1) { try @@ -2970,7 +2949,7 @@ private: try { - codec.ensureBuffer(DEFAULT_BUFFER_SIZE+1); + codec.ensureBuffer(MAX_TCP_RECV + AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE+1); testFail("%s: too big size accepted", CURRENT_FUNCTION); } catch (std::exception &) {